Last active
          May 26, 2025 12:42 
        
      - 
      
- 
        Save byates/f90c6437cfcf4628fd4f51ff3093219c to your computer and use it in GitHub Desktop. 
Revisions
- 
        byates revised this gist May 26, 2025 . 1 changed file with 37 additions and 0 deletions.There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,6 +2,43 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2010-2014 Intel Corporation # # This utility will attempt to bind a network interface to a dpdk # compatible driver (dpdk_default_driver). The network interface # must be bound to the OS for this utility to work as this utility: # 1. records the current interface parameters to a file # 2. unbinds the interface from the OS # 3. binds the interface to the dpdk driver # # When "unbinding" using this utility, you don't specify the interface # as this utility only looks in the previously recorded file. This # implies that only a single interface can be used at a time. # # usage: dpdk-bind-and-record.py [-h] [-i INTERFACE] [-b INTERFACE] [-u] # # Utility to bind and unbind devices from Linux kernel # # positional arguments: # INTERFACE Network interface by name (ex. eth1) # # options: # -h, --help show this help message and exit # -i, --info Print the interface info #. -d, --driver which driver to use: igb_uio, vfio-pci, uio_pci_generic # -b, --bind Bind the interface to the DPDK driver # -u, --unbind Unbind a previously bound interface # # Examples: # --------- # # To display device info for eth1 but not bind: # dpdk-bind-and-record.py --info eth1 # # To bind eth1 from the current driver and move to use igb_pci # dpdk-bind-and-record.py --bind eth1 # # To unbind the last bound interface # dpdk-bind-and-record.py --unbind # import sys import os 
- 
        byates revised this gist May 26, 2025 . 1 changed file with 10 additions and 39 deletions.There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,42 +2,6 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2010-2014 Intel Corporation # import sys import os @@ -65,15 +29,14 @@ device = {} # list of supported DPDK drivers dpdk_drivers = ["igb_uio", "vfio-pci", "uio_pci_generic"] # list of currently loaded kernel modules loaded_modules = None # command-line arg flags b_flag = None info_flag = False args_dev = [] driver = "igb_uio" # check if a specific kernel module is loaded def module_is_loaded(module): @@ -283,6 +246,7 @@ def parse_args(): global b_flag global info_flag global args_dev global driver parser = argparse.ArgumentParser( description='Utility to bind and unbind devices from Linux kernel', @@ -315,6 +279,12 @@ def parse_args(): '--unbind', action='store_true', help="Unbind a device (equivalent to \"-b none\")") parser.add_argument( '-d', '--driver', type=str, default='igb_uio', help="Set the driver to use: vfio-pci, igb_uio (default)") parser.add_argument( 'devices', metavar='DEVICE', @@ -329,6 +299,7 @@ def parse_args(): if opt.bind or opt.unbind: b_flag = opt.bind args_dev = opt.devices driver = opt.driver if (b_flag is None) and (not info_flag): print("Error: No action specified for devices. " @@ -541,7 +512,7 @@ def do_arg_actions(): if b_flag is not None: if b_flag: save_device_details() bind_one(device["pci"], driver, True) else: if bind_one(device["pci"], device["driver"], False): os.remove(file_name_for_saved_data) 
- 
        byates created this gist Mar 10, 2025 .There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,573 @@ #!/usr/bin/env python3 # SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2010-2014 Intel Corporation # # This utility will attempt to bind a network interface to a dpdk # compatible driver (dpdk_default_driver). The network interface # must be bound to the OS for this utility to work as this utility: # 1. records the current interface parameters to a file # 2. unbinds the interface from the OS # 3. binds the interface to the dpdk driver # # When "unbinding" using this utility, you don't specify the interface # as this utility only looks in the previously recorded file. This # implies that only a single interface can be used at a time. # # usage: dpdk-bind-and-record.py [-h] [-i INTERFACE] [-b INTERFACE] [-u] # # Utility to bind and unbind devices from Linux kernel # # positional arguments: # INTERFACE Network interface by name (ex. eth1) # # options: # -h, --help show this help message and exit # -i, --info Print the interface info # -b, --bind Bind the interface to the DPDK driver # -u, --unbind Unbind a previously bound interface # # Examples: # --------- # # To display device info for eth1 but not bind: # dpdk-bind-and-record.py --info eth1 # # To bind eth1 from the current driver and move to use igb_pci # dpdk-bind-and-record.py --bind eth1 # # To unbind the last bound interface # dpdk-bind-and-record.py --unbind # import sys import os import platform import subprocess import argparse import netifaces import json from glob import glob from os.path import exists, basename from os.path import join as path_join from pprint import pprint file_name_for_saved_data = "dpdk-bind-and-record.json" network_class = {'Class': '02', 'Vendor': None, 'Device': None, 'SVendor': None, 'SDevice': None} network_devices = [network_class] # global dict ethernet devices present. Dictionary indexed by PCI address. # Each device within this is itself a dictionary of device properties devices = {} # global dict for the selected device device = {} # list of supported DPDK drivers dpdk_drivers = ["igb_uio", "vfio-pci", "uio_pci_generic"] # which driver to use dpdk_default_driver = "igb_uio" # list of currently loaded kernel modules loaded_modules = None # command-line arg flags b_flag = None info_flag = False args_dev = [] # check if a specific kernel module is loaded def module_is_loaded(module): global loaded_modules if module == 'vfio_pci': module = 'vfio-pci' if loaded_modules: return module in loaded_modules # Get list of sysfs modules (both built-in and dynamically loaded) sysfs_path = '/sys/module/' # Get the list of directories in sysfs_path sysfs_mods = [m for m in os.listdir(sysfs_path) if os.path.isdir(os.path.join(sysfs_path, m))] # special case for vfio_pci (module is named vfio-pci, # but its .ko is named vfio_pci) sysfs_mods = [a if a != 'vfio_pci' else 'vfio-pci' for a in sysfs_mods] loaded_modules = sysfs_mods # add built-in modules as loaded release = platform.uname().release filename = os.path.join("/lib/modules/", release, "modules.builtin") if os.path.exists(filename): try: with open(filename) as f: loaded_modules += [os.path.splitext(os.path.basename(mod))[0] for mod in f] except IOError: print("Warning: cannot read list of built-in kernel modules") return module in loaded_modules def check_dpdk_modules(): '''Checks that at least one DPDK driver is loaded''' global dpdk_drivers # list of supported modules mods = [{"Name": driver, "Found": False} for driver in dpdk_drivers] # Check all modules to see if they are loaded for mod in mods: if module_is_loaded(mod["Name"]): mod["Found"] = True # check if we have at least one loaded module if True not in [mod["Found"] for mod in mods] and b_flag is not None: sys.exit("ERROR: no supported DPDK kernel modules (drivers) are loaded") # change DPDK driver list to only contain drivers that are loaded dpdk_drivers = [mod["Name"] for mod in mods if mod["Found"]] def has_driver(dev_id): '''return true if a device is assigned to a driver. False otherwise''' return ("Driver_str" in devices[dev_id]) and (devices[dev_id]["Driver_str"] != "") def get_pci_device_details(dev_id, probe_lspci): '''This function gets additional details for a PCI device''' device = {} if probe_lspci: extra_info = subprocess.check_output(["lspci", "-vmmks", dev_id]).splitlines() # parse lspci details for line in extra_info: if not line: continue name, value = line.decode("utf8").split("\t", 1) name = name.strip(":") + "_str" device[name] = value # check for a unix interface name device["Interface"] = "" for base, dirs, _ in os.walk("/sys/bus/pci/devices/%s/" % dev_id): if "net" in dirs: device["Interface"] = \ ",".join(os.listdir(os.path.join(base, "net"))) break # check if a port is used for ssh connection device["Ssh_if"] = False device["Active"] = "" return device def build_dict_of_all_devices(devices_type): '''This function populates the "devices" dictionary. The keys used are the pci addresses (domain:bus:slot.func). The values are themselves dictionaries - one for each NIC.''' global devices global dpdk_drivers # first loop through and read details for all devices # request machine readable format, with numeric IDs and String dev = {} dev_lines = subprocess.check_output(["lspci", "-Dvmmnnk"]).splitlines() for dev_line in dev_lines: if not dev_line: if device_type_match(dev, devices_type): # Replace "Driver" with "Driver_str" to have consistency of # of dictionary key names if "Driver" in dev.keys(): dev["Driver_str"] = dev.pop("Driver") if "Module" in dev.keys(): dev["Module_str"] = dev.pop("Module") # use dict to make copy of dev devices[dev["Slot"]] = dict(dev) # Clear previous device's data dev = {} else: name, value = dev_line.decode("utf8").split("\t", 1) value_list = value.rsplit(' ', 1) if value_list: # String stored in <name>_str dev[name.rstrip(":") + '_str'] = value_list[0] # Numeric IDs dev[name.rstrip(":")] = value_list[len(value_list) - 1] \ .rstrip("]").lstrip("[") if devices_type == network_devices: # check what is the interface if any for an ssh connection if # any to this host, so we can mark it later. ssh_if = [] route = subprocess.check_output(["ip", "-o", "route"]) # filter out all lines for 169.254 routes route = "\n".join(filter(lambda ln: not ln.startswith("169.254"), route.decode().splitlines())) rt_info = route.split() for i in range(len(rt_info) - 1): if rt_info[i] == "dev": ssh_if.append(rt_info[i + 1]) # based on the basic info, get extended text details for d in devices.keys(): if not device_type_match(devices[d], devices_type): continue # get additional info and add it to existing data devices[d] = devices[d].copy() # No need to probe lspci devices[d].update(get_pci_device_details(d, False).items()) if devices_type == network_devices: for _if in ssh_if: if _if in devices[d]["Interface"].split(","): devices[d]["Ssh_if"] = True devices[d]["Active"] = "*Active*" break # add igb_uio to list of supporting modules if needed if "Module_str" in devices[d]: for driver in dpdk_drivers: if driver not in devices[d]["Module_str"]: devices[d]["Module_str"] = \ devices[d]["Module_str"] + ",%s" % driver else: devices[d]["Module_str"] = ",".join(dpdk_drivers) # make sure the driver and module strings do not have any duplicates if has_driver(d): modules = devices[d]["Module_str"].split(",") if devices[d]["Driver_str"] in modules: modules.remove(devices[d]["Driver_str"]) devices[d]["Module_str"] = ",".join(modules) def device_type_match(dev, devices_type): for i in range(len(devices_type)): param_count = len( [x for x in devices_type[i].values() if x is not None]) match_count = 0 if dev["Class"][0:2] == devices_type[i]["Class"]: match_count = match_count + 1 for key in devices_type[i].keys(): if key != 'Class' and devices_type[i][key]: value_list = devices_type[i][key].split(',') for value in value_list: if value.strip(' ') == dev[key]: match_count = match_count + 1 # count must be the number of non None parameters to match if match_count == param_count: return True return False def pci_from_dev_name(dev_name): '''Take a device "name" - a string passed in by user to identify a NIC device, and determine the device id - i.e. the domain:bus:slot.func - for it, which can then be used to index into the devices array''' # check if it's already a suitable index if dev_name in devices: return dev_name # check if it's an index just missing the domain part if "0000:" + dev_name in devices: return "0000:" + dev_name # check if it's an interface name, e.g. eth1 for d in devices.keys(): if dev_name in devices[d]["Interface"].split(","): return devices[d]["Slot"] # if nothing else matches - error raise ValueError("Unknown device: %s. " "Please specify device in \"bus:slot.func\" format" % dev_name) def parse_args(): '''Parses the command-line arguments given by the user and takes the appropriate action for each''' global b_flag global info_flag global args_dev parser = argparse.ArgumentParser( description='Utility to bind and unbind devices from Linux kernel', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: --------- To display device info for eth1 but not bind: %(prog)s --info eth1 To bind eth1 from the current driver and move to use vfio-pci %(prog)s --bind=vfio-pci eth1 """) parser.add_argument( '-i', '--info', action='store_true', help="Print the device info") bind_group = parser.add_mutually_exclusive_group() bind_group.add_argument( '-b', '--bind', action='store_true', help="Select the driver to use or \"none\" to unbind the device") bind_group.add_argument( '-u', '--unbind', action='store_true', help="Unbind a device (equivalent to \"-b none\")") parser.add_argument( 'devices', metavar='DEVICE', nargs='*', help=""" Device specified by interface name. """) opt = parser.parse_args() if opt.info: info_flag = True if opt.bind or opt.unbind: b_flag = opt.bind args_dev = opt.devices if (b_flag is None) and (not info_flag): print("Error: No action specified for devices. " "Please give a --bind, --ubind or --info option", file=sys.stderr) parser.print_usage() sys.exit(1) if (b_flag or info_flag) and not args_dev: print("Error: No devices specified.", file=sys.stderr) parser.print_usage() sys.exit(1) if (b_flag or info_flag) and (len(args_dev) != 1): print("Error: Only one device may be specified.", file=sys.stderr) parser.print_usage() sys.exit(1) if args_dev: args_dev = args_dev[0] def check_device(): '''Make sure the selected device is actually one of the system interfaces. ''' if not args_dev in netifaces.interfaces(): print("Error: %s is not a valid network interface." % (args_dev), file=sys.stderr) print(" Valid interfaces are:"+str(netifaces.interfaces())) sys.exit(1) pass def extract_device_details(): '''Creats a disctionary called 'device' which holds all the interesting details about the selected device. ''' dev_pci = pci_from_dev_name(args_dev) IPv4=netifaces.ifaddresses(args_dev)[netifaces.AF_INET][0] device["device"] = args_dev device["pci"] = devices[dev_pci]["Slot_str"] device["driver"] = devices[dev_pci]["Driver_str"] device["mac"] = netifaces.ifaddresses(args_dev)[netifaces.AF_LINK][0]["addr"] device["ipv4"] = IPv4["addr"] device["netmask"] = IPv4["netmask"] def show_status(): '''Shows the details for the selected device''' print("Device : "+device["device"]) print("PCI : "+device["pci"]) print("Driver : "+device["driver"]) print("MAC : "+device["mac"]) print("IP : "+device["ipv4"]) print("Netmask : "+device["netmask"]) def save_device_details(): '''Writes device details to json file''' global device with open(file_name_for_saved_data, 'w', encoding='utf-8') as f: json.dump(device, f, ensure_ascii=False, indent=4) def read_device_details_from_file(): '''Reads device details from json file''' global device try: with open(file_name_for_saved_data) as f: device = json.load(f) except FileNotFoundError: sys.exit("ERROR: File '"+file_name_for_saved_data+" not found. Can't auto unbind.") def unbind_one(dev_id, force): '''Unbind the device identified by "dev_id" from its current driver''' dev = devices[dev_id] if not has_driver(dev_id): print("Notice: %s %s %s is not currently managed by any driver" % (dev["Slot"], dev["Device_str"], dev["Interface"]), file=sys.stderr) return print("Info: unbinding %s from device %s" % (dev["Driver_str"], dev_id)) # prevent us disconnecting ourselves if dev["Ssh_if"] and not force: print("Warning: routing table indicates that interface %s is active. " "Skipping unbind" % dev_id, file=sys.stderr) return # write to /sys to unbind filename = "/sys/bus/pci/drivers/%s/unbind" % dev["Driver_str"] try: f = open(filename, "a") except OSError as err: sys.exit("Error: unbind failed for %s - Cannot open %s: %s" % (dev_id, filename, err)) f.write(dev_id) f.close() def bind_one(dev_id, driver, force) -> bool: '''Bind the device given by "dev_id" to the driver "driver". If the device is already bound to a different driver, it will be unbound first''' dev = devices[dev_id] saved_driver = None # used to rollback any unbind in case of failure # prevent disconnection of our ssh session if dev["Ssh_if"] and not force: print("Warning: routing table indicates that interface %s is active. " "Not modifying" % dev_id, file=sys.stderr) return False # unbind any existing drivers we don't want if has_driver(dev_id): if dev["Driver_str"] == driver: print("Notice: %s already bound to driver %s, skipping" % (dev_id, driver), file=sys.stderr) return False saved_driver = dev["Driver_str"] unbind_one(dev_id, force) dev["Driver_str"] = "" # clear driver string print("Info: binding device %s to driver %s" % (dev_id, driver)) # For kernels >= 3.15 driver_override can be used to specify the driver # for a device rather than relying on the driver to provide a positive # match of the device. The existing process of looking up # the vendor and device ID, adding them to the driver new_id, # will erroneously bind other devices too which has the additional burden # of unbinding those devices if driver in dpdk_drivers: filename = "/sys/bus/pci/devices/%s/driver_override" % dev_id if exists(filename): try: f = open(filename, "w") except OSError as err: print("Error[1]: bind failed for %s - Cannot open %s: %s" % (dev_id, filename, err), file=sys.stderr) return False try: f.write("%s" % driver) f.close() except OSError as err: print("Error: bind failed for %s - Cannot write driver %s to " "PCI ID: %s" % (dev_id, driver, err), file=sys.stderr) return False # For kernels < 3.15 use new_id to add PCI id's to the driver else: filename = "/sys/bus/pci/drivers/%s/new_id" % driver try: f = open(filename, "w") except OSError as err: print("Error[2]: bind failed for %s - Cannot open %s: %s" % (dev_id, filename, err), file=sys.stderr) return False try: # Convert Device and Vendor Id to int to write to new_id f.write("%04x %04x" % (int(dev["Vendor"], 16), int(dev["Device"], 16))) f.close() except OSError as err: print("Error: bind failed for %s - Cannot write new PCI ID to " "driver %s: %s" % (dev_id, driver, err), file=sys.stderr) return False # do the bind by writing to /sys filename = "/sys/bus/pci/drivers/%s/bind" % driver try: f = open(filename, "a") except OSError as err: print("Error[3]: bind failed for %s - Cannot open %s: %s" % (dev_id, filename, err), file=sys.stderr) if saved_driver is not None: # restore any previous driver bind_one(dev_id, saved_driver, force) return False try: f.write(dev_id) f.close() except OSError as err: # for some reason, closing dev_id after adding a new PCI ID to new_id # results in IOError. however, if the device was successfully bound, # we don't care for any errors and can safely ignore IOError tmp = get_pci_device_details(dev_id, True) if "Driver_str" in tmp and tmp["Driver_str"] == driver: return print("Error: bind failed for %s - Cannot bind to driver %s: %s" % (dev_id, driver, err), file=sys.stderr) if saved_driver is not None: # restore any previous driver bind_one(dev_id, saved_driver, force) return False # For kernels > 3.15 driver_override is used to bind a device to a driver. # Before unbinding it, overwrite driver_override with empty string so that # the device can be bound to any other driver filename = "/sys/bus/pci/devices/%s/driver_override" % dev_id if exists(filename): try: f = open(filename, "w") except OSError as err: sys.exit("Error: unbind failed for %s - Cannot open %s: %s" % (dev_id, filename, err)) try: f.write("\00") f.close() except OSError as err: sys.exit("Error: unbind failed for %s - Cannot write %s: %s" % (dev_id, filename, err)) return True def do_arg_actions(): '''do the actual action requested by the user''' global b_flag global info_flag global args_dev if info_flag: show_status() if b_flag is not None: if b_flag: save_device_details() bind_one(device["pci"], dpdk_default_driver, True) else: if bind_one(device["pci"], device["driver"], False): os.remove(file_name_for_saved_data) def main(): '''program main function''' # check to make sure we have the right permissions if os.geteuid() != 0: sys.exit("You must run this script with SUDO or be root") # check if lspci is installed, suppress any output with open(os.devnull, 'w') as devnull: ret = subprocess.call(['which', 'lspci'], stdout=devnull, stderr=devnull) if ret != 0: sys.exit("'lspci' not found - please install 'pciutils'") parse_args() check_dpdk_modules() build_dict_of_all_devices(network_devices) if ((b_flag is not None) and b_flag) or info_flag: check_device() extract_device_details() else: read_device_details_from_file() do_arg_actions() if __name__ == "__main__": main()