Skip to content

Instantly share code, notes, and snippets.

@byates
Last active May 26, 2025 12:42
Show Gist options
  • Save byates/f90c6437cfcf4628fd4f51ff3093219c to your computer and use it in GitHub Desktop.
Save byates/f90c6437cfcf4628fd4f51ff3093219c to your computer and use it in GitHub Desktop.
Tool to bind and unbind a DPDK driver to a network interface
#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2010-2014 Intel Corporation
#
# This utility will attempt to bind a network interface to a dpdk
# compatible driver (dpdk_default_driver). The network interface
# must be bound to the OS for this utility to work as this utility:
# 1. records the current interface parameters to a file
# 2. unbinds the interface from the OS
# 3. binds the interface to the dpdk driver
#
# When "unbinding" using this utility, you don't specify the interface
# as this utility only looks in the previously recorded file. This
# implies that only a single interface can be used at a time.
#
# usage: dpdk-bind-and-record.py [-h] [-i] [-b | -u] [-d DRIVER] [INTERFACE]
#
# Utility to bind and unbind devices from Linux kernel
#
# positional arguments:
# INTERFACE Network interface by name (ex. eth1)
#
# options:
# -h, --help show this help message and exit
# -i, --info Print the interface info
# -d, --driver Which driver to use: igb_uio (default), vfio-pci, uio_pci_generic
# -b, --bind Bind the interface to the DPDK driver
# -u, --unbind Unbind a previously bound interface
#
# Examples:
# ---------
#
# To display device info for eth1 but not bind:
# dpdk-bind-and-record.py --info eth1
#
# To unbind eth1 from its current driver and bind it to igb_uio (the default)
# dpdk-bind-and-record.py --bind eth1
#
# To unbind the last bound interface
# dpdk-bind-and-record.py --unbind
#
import sys
import os
import platform
import subprocess
import argparse
import netifaces
import json
from glob import glob
from os.path import exists, basename
from os.path import join as path_join
from pprint import pprint
# JSON file (path relative to the current working directory) where the
# parameters of the bound interface are recorded for a later unbind.
file_name_for_saved_data = "dpdk-bind-and-record.json"

# Match criteria used to select network devices from the lspci output:
# PCI class code starting with '02'. A None value places no constraint
# on that field.
network_class = {
    'Class': '02',
    'Vendor': None,
    'Device': None,
    'SVendor': None,
    'SDevice': None,
}
network_devices = [network_class]

# Ethernet devices present on the system, indexed by PCI address.
# Every value is itself a dictionary of device properties.
devices = {}

# Properties of the single device selected on the command line
# (or restored from the saved JSON file).
device = {}

# DPDK drivers this tool knows how to bind to.
dpdk_drivers = [
    "igb_uio",
    "vfio-pci",
    "uio_pci_generic",
]

# Cache of kernel module names; filled on the first module lookup.
loaded_modules = None

# State derived from the command line.
b_flag = None        # True: bind, False: unbind, None: neither requested
info_flag = False    # True when --info was given
args_dev = []        # positional device argument(s)
driver = "igb_uio"   # driver selected with --driver
def module_is_loaded(module):
    '''Return True if the kernel module "module" is loaded or built in.

    The list of modules is collected once, from the directories under
    /sys/module/ plus the running kernel's modules.builtin file, and is
    cached in the global "loaded_modules". While that cache is non-empty,
    later calls only consult it.'''
    global loaded_modules

    # The driver is called vfio-pci although sysfs lists it as vfio_pci;
    # the dashed spelling is the one stored in the cache.
    wanted = 'vfio-pci' if module == 'vfio_pci' else module

    if loaded_modules:
        return wanted in loaded_modules

    # Every directory below /sys/module/ is a module known to the kernel.
    sysfs_root = '/sys/module/'
    found = []
    for entry in os.listdir(sysfs_root):
        if os.path.isdir(os.path.join(sysfs_root, entry)):
            found.append('vfio-pci' if entry == 'vfio_pci' else entry)
    loaded_modules = found

    # Built-in modules count as loaded as well.
    builtin_list = os.path.join(
        "/lib/modules/", platform.uname().release, "modules.builtin")
    if os.path.exists(builtin_list):
        try:
            with open(builtin_list) as listing:
                loaded_modules += [
                    os.path.splitext(os.path.basename(entry))[0]
                    for entry in listing
                ]
        except IOError:
            print("Warning: cannot read list of built-in kernel modules")

    return wanted in loaded_modules
def check_dpdk_modules():
    '''Reduce "dpdk_drivers" to the supported drivers that are loaded.

    Exits with an error when none of the supported drivers is loaded and a
    bind or unbind was requested (b_flag is not None).'''
    global dpdk_drivers

    # Probe every supported driver before deciding anything.
    available = [name for name in dpdk_drivers if module_is_loaded(name)]

    if not available and b_flag is not None:
        sys.exit("ERROR: no supported DPDK kernel modules (drivers) are loaded")

    # From here on only loaded drivers are considered.
    dpdk_drivers = available
def has_driver(dev_id):
    '''Tell whether the device "dev_id" has a non-empty "Driver_str" entry
    in the global "devices" dictionary.'''
    entry = devices[dev_id]
    if "Driver_str" not in entry:
        return False
    return entry["Driver_str"] != ""
def get_pci_device_details(dev_id, probe_lspci):
    '''Collect additional details for the PCI device "dev_id".

    When "probe_lspci" is true, every field reported by "lspci -vmmks" is
    stored under "<field>_str". The returned dictionary always contains
    "Interface" (comma separated names found in a "net" directory below
    the device's sysfs node, or ""), "Ssh_if" (False) and "Active" ("").'''
    details = {}

    if probe_lspci:
        raw = subprocess.check_output(["lspci", "-vmmks", dev_id])
        for record in raw.splitlines():
            if record:
                key, text = record.decode("utf8").split("\t", 1)
                details[key.strip(":") + "_str"] = text

    # Look for kernel network interface names attached to this device.
    details["Interface"] = ""
    sysfs_node = "/sys/bus/pci/devices/%s/" % dev_id
    for base, dirs, _ in os.walk(sysfs_node):
        if "net" in dirs:
            names = os.listdir(os.path.join(base, "net"))
            details["Interface"] = ",".join(names)
            break

    # Defaults only; the caller flags interfaces that appear in the
    # routing table.
    details["Ssh_if"] = False
    details["Active"] = ""
    return details
def build_dict_of_all_devices(devices_type):
    '''Fill the global "devices" dictionary from the output of lspci.

    Keys are PCI addresses (domain:bus:slot.func); each value is a
    dictionary of properties for one device that matches "devices_type".'''
    global devices
    global dpdk_drivers

    # Pass 1: parse the machine readable lspci listing (numeric IDs and
    # strings). Records are separated by blank lines.
    record = {}
    listing = subprocess.check_output(["lspci", "-Dvmmnnk"])
    for raw_line in listing.splitlines():
        if raw_line:
            field, text = raw_line.decode("utf8").split("\t", 1)
            field = field.rstrip(":")
            parts = text.rsplit(' ', 1)
            if parts:
                # descriptive text goes to <field>_str
                record[field + '_str'] = parts[0]
            # the trailing word, without brackets, is the numeric ID
            record[field] = parts[-1].rstrip("]").lstrip("[")
            continue

        # A blank line ends the current record.
        if device_type_match(record, devices_type):
            # "Driver"/"Module" are renamed so all text keys end in _str
            if "Driver" in record:
                record["Driver_str"] = record.pop("Driver")
            if "Module" in record:
                record["Module_str"] = record.pop("Module")
            devices[record["Slot"]] = dict(record)
        record = {}

    # Pass 2: collect the interfaces named in the routing table so they can
    # be flagged as active (possibly carrying our ssh session) below.
    is_network = devices_type == network_devices
    if is_network:
        routes = subprocess.check_output(["ip", "-o", "route"]).decode()
        # routes for 169.254 are ignored
        kept = [ln for ln in routes.splitlines()
                if not ln.startswith("169.254")]
        words = "\n".join(kept).split()
        ssh_if = [after for word, after in zip(words, words[1:])
                  if word == "dev"]

    # Pass 3: extend every matching entry with the extra details.
    for slot in devices.keys():
        if not device_type_match(devices[slot], devices_type):
            continue

        info = devices[slot].copy()
        devices[slot] = info
        # lspci does not need to be probed again here
        info.update(get_pci_device_details(slot, False).items())

        if is_network:
            if_names = info["Interface"].split(",")
            if any(name in if_names for name in ssh_if):
                info["Ssh_if"] = True
                info["Active"] = "*Active*"

        # list the DPDK drivers among the modules supporting the device
        if "Module_str" in info:
            for name in dpdk_drivers:
                if name not in info["Module_str"]:
                    info["Module_str"] = info["Module_str"] + ",%s" % name
        else:
            info["Module_str"] = ",".join(dpdk_drivers)

        # the driver in use must not be repeated in the module list
        if has_driver(slot):
            module_names = info["Module_str"].split(",")
            if info["Driver_str"] in module_names:
                module_names.remove(info["Driver_str"])
                info["Module_str"] = ",".join(module_names)
def device_type_match(dev, devices_type):
    '''Return True if "dev" satisfies at least one entry of "devices_type".

    An entry matches when the first two characters of dev["Class"] equal
    the entry's "Class" and the number of matching fields equals the number
    of non-None fields in the entry. Field values may be comma separated
    lists of alternatives.'''
    for spec in devices_type:
        required = sum(1 for wanted in spec.values() if wanted is not None)

        if dev["Class"][0:2] != spec["Class"]:
            continue

        hits = 1  # the class itself
        for key, wanted in spec.items():
            if key == 'Class' or not wanted:
                continue
            hits += sum(1 for candidate in wanted.split(',')
                        if candidate.strip(' ') == dev[key])

        # every non-None parameter has to be accounted for
        if hits == required:
            return True
    return False
def pci_from_dev_name(dev_name):
    '''Translate a user supplied device "name" into the PCI address
    (domain:bus:slot.func) used as key of the "devices" dictionary.

    Accepted forms are a full PCI address, an address without the domain
    part, or an interface name such as eth1. Raises ValueError when no
    device matches.'''
    # already a valid key
    if dev_name in devices:
        return dev_name

    # a key that only lacks the domain part
    with_domain = "0000:" + dev_name
    if with_domain in devices:
        return with_domain

    # an interface name, e.g. eth1
    for entry in devices.values():
        if dev_name in entry["Interface"].split(","):
            return entry["Slot"]

    raise ValueError("Unknown device: %s. "
                     "Please specify device in \"bus:slot.func\" format" % dev_name)
def parse_args():
    '''Parse the command line and store the result in module globals.

    Sets "info_flag" (--info), "b_flag" (True for --bind, False for
    --unbind, left untouched when neither is given), "driver" (--driver)
    and "args_dev" (the single device name, or the empty list when no
    device was given).

    Exits with status 1 when no action is requested, or when --bind or
    --info is not accompanied by exactly one device.'''
    global b_flag
    global info_flag
    global args_dev
    global driver

    parser = argparse.ArgumentParser(
        description='Utility to bind and unbind devices from Linux kernel',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
---------

To display device info for eth1 but not bind:
        %(prog)s --info eth1

To bind eth1 to the default DPDK driver (igb_uio):
        %(prog)s --bind eth1

To bind eth1 to vfio-pci instead:
        %(prog)s --bind --driver vfio-pci eth1

To unbind the last bound interface:
        %(prog)s --unbind
""")
    parser.add_argument(
        '-i',
        '--info',
        action='store_true',
        help="Print the device info")
    # --bind and --unbind are plain flags; the driver is chosen with --driver
    bind_group = parser.add_mutually_exclusive_group()
    bind_group.add_argument(
        '-b',
        '--bind',
        action='store_true',
        help="Bind the device to the DPDK driver selected with --driver")
    bind_group.add_argument(
        '-u',
        '--unbind',
        action='store_true',
        help="Unbind the previously bound device and restore its "
             "recorded driver")
    parser.add_argument(
        '-d',
        '--driver',
        type=str,
        default='igb_uio',
        help="Set the driver to use: igb_uio (default), vfio-pci, "
             "uio_pci_generic")
    parser.add_argument(
        'devices',
        metavar='DEVICE',
        nargs='*',
        help="""
Device specified by interface name.
""")

    opt = parser.parse_args()

    if opt.info:
        info_flag = True
    if opt.bind or opt.unbind:
        # True means bind, False means unbind
        b_flag = opt.bind
    args_dev = opt.devices
    driver = opt.driver

    if (b_flag is None) and (not info_flag):
        print("Error: No action specified for devices. "
              "Please give a --bind, --unbind or --info option",
              file=sys.stderr)
        parser.print_usage()
        sys.exit(1)

    # --unbind takes its device from the saved file, so only --bind and
    # --info require a device on the command line.
    if (b_flag or info_flag) and not args_dev:
        print("Error: No devices specified.", file=sys.stderr)
        parser.print_usage()
        sys.exit(1)

    if (b_flag or info_flag) and (len(args_dev) != 1):
        print("Error: Only one device may be specified.", file=sys.stderr)
        parser.print_usage()
        sys.exit(1)

    if args_dev:
        args_dev = args_dev[0]
def check_device():
    '''Verify that the selected device is one of the system's network
    interfaces; otherwise print the valid interfaces and exit with 1.'''
    known = netifaces.interfaces()
    if args_dev in known:
        return

    print("Error: %s is not a valid network interface." % (args_dev), file=sys.stderr)
    print(" Valid interfaces are:"+str(known))
    sys.exit(1)
def extract_device_details():
    '''Fill the global dictionary "device" with the details of the selected
    interface: name, PCI address, current driver, MAC address, IPv4 address
    and netmask.

    The IPv4 address and netmask are stored as empty strings when the
    interface has no IPv4 address configured.'''
    dev_pci = pci_from_dev_name(args_dev)

    # Query the interface once and reuse the answer for MAC and IPv4.
    addresses = netifaces.ifaddresses(args_dev)

    # An interface without an IPv4 address has no AF_INET entry; fall back
    # to an empty record instead of failing with KeyError.
    ipv4_entries = addresses.get(netifaces.AF_INET) or [{}]
    ipv4 = ipv4_entries[0]

    device["device"] = args_dev
    device["pci"] = devices[dev_pci]["Slot_str"]
    device["driver"] = devices[dev_pci]["Driver_str"]
    device["mac"] = addresses[netifaces.AF_LINK][0]["addr"]
    device["ipv4"] = ipv4.get("addr", "")
    device["netmask"] = ipv4.get("netmask", "")
def show_status():
    '''Print the recorded details of the selected device, one per line.'''
    rows = (
        ("Device : ", "device"),
        ("PCI : ", "pci"),
        ("Driver : ", "driver"),
        ("MAC : ", "mac"),
        ("IP : ", "ipv4"),
        ("Netmask : ", "netmask"),
    )
    for label, key in rows:
        print(label + device[key])
def save_device_details():
    '''Store the "device" dictionary as indented JSON in the file named by
    "file_name_for_saved_data", replacing any previous content.'''
    with open(file_name_for_saved_data, 'w', encoding='utf-8') as out_file:
        json.dump(device, out_file, ensure_ascii=False, indent=4)
def read_device_details_from_file():
    '''Load the global "device" dictionary from the JSON file named by
    "file_name_for_saved_data".

    Exits with an error message when the file does not exist or does not
    contain valid JSON.'''
    global device
    try:
        # same encoding as used when the file is written
        with open(file_name_for_saved_data, encoding='utf-8') as f:
            device = json.load(f)
    except FileNotFoundError:
        sys.exit("ERROR: File '" + file_name_for_saved_data +
                 "' not found. Can't auto unbind.")
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        # a damaged record must not end in a raw traceback
        sys.exit("ERROR: File '%s' is not valid JSON (%s). Can't auto unbind."
                 % (file_name_for_saved_data, err))
def unbind_one(dev_id, force):
    '''Unbind the device identified by "dev_id" from its current driver.

    Nothing is done when the device has no driver, or when the device is
    flagged as active in the routing table ("Ssh_if") and "force" is false.
    Exits with an error message when the driver's sysfs "unbind" file
    cannot be opened or written.'''
    dev = devices[dev_id]
    if not has_driver(dev_id):
        print("Notice: %s %s %s is not currently managed by any driver" %
              (dev["Slot"], dev["Device_str"], dev["Interface"]), file=sys.stderr)
        return

    # prevent us disconnecting ourselves
    if dev["Ssh_if"] and not force:
        print("Warning: routing table indicates that interface %s is active. "
              "Skipping unbind" % dev_id, file=sys.stderr)
        return

    # announced only once the unbind is really going to happen
    print("Info: unbinding %s from device %s" % (dev["Driver_str"], dev_id))

    # write to /sys to unbind
    filename = "/sys/bus/pci/drivers/%s/unbind" % dev["Driver_str"]
    try:
        f = open(filename, "a")
    except OSError as err:
        sys.exit("Error: unbind failed for %s - Cannot open %s: %s" %
                 (dev_id, filename, err))
    try:
        # "with" closes the handle even when the write is rejected; the
        # buffered write may only reach the kernel when the file is closed
        with f:
            f.write(dev_id)
    except OSError as err:
        sys.exit("Error: unbind failed for %s - Cannot write to %s: %s" %
                 (dev_id, filename, err))
def bind_one(dev_id, driver, force) -> bool:
    '''Bind the device given by "dev_id" to the driver "driver". If the device
    is already bound to a different driver, it will be unbound first.

    Returns True when the device ends up bound to "driver". Returns False
    when nothing was changed (device flagged as active and "force" is false,
    or already bound to "driver") or when the bind failed; after a failure
    an attempt is made to re-bind the previously used driver.

    Exits the program when the driver_override file cannot be reset after a
    successful bind.'''
    dev = devices[dev_id]
    saved_driver = None  # used to rollback any unbind in case of failure

    def restore_previous_driver():
        # Best effort: give the device back to the driver it had before.
        # The nested call sees no current driver, so it cannot recurse
        # any further.
        if saved_driver is not None:
            bind_one(dev_id, saved_driver, force)

    # prevent disconnection of our ssh session
    if dev["Ssh_if"] and not force:
        print("Warning: routing table indicates that interface %s is active. "
              "Not modifying" % dev_id, file=sys.stderr)
        return False

    # unbind any existing drivers we don't want
    if has_driver(dev_id):
        if dev["Driver_str"] == driver:
            print("Notice: %s already bound to driver %s, skipping" %
                  (dev_id, driver), file=sys.stderr)
            return False
        saved_driver = dev["Driver_str"]
        unbind_one(dev_id, force)
        dev["Driver_str"] = ""  # clear driver string

    print("Info: binding device %s to driver %s" % (dev_id, driver))

    # For kernels >= 3.15 driver_override can be used to specify the driver
    # for a device rather than relying on the driver to provide a positive
    # match of the device. The existing process of looking up
    # the vendor and device ID, adding them to the driver new_id,
    # will erroneously bind other devices too which has the additional burden
    # of unbinding those devices
    if driver in dpdk_drivers:
        filename = "/sys/bus/pci/devices/%s/driver_override" % dev_id
        if exists(filename):
            try:
                f = open(filename, "w")
            except OSError as err:
                print("Error[1]: bind failed for %s - Cannot open %s: %s"
                      % (dev_id, filename, err), file=sys.stderr)
                restore_previous_driver()
                return False
            try:
                # "with" guarantees the handle is closed on failure too
                with f:
                    f.write("%s" % driver)
            except OSError as err:
                print("Error: bind failed for %s - Cannot write driver %s to "
                      "PCI ID: %s" % (dev_id, driver, err), file=sys.stderr)
                restore_previous_driver()
                return False
        # For kernels < 3.15 use new_id to add PCI id's to the driver
        else:
            filename = "/sys/bus/pci/drivers/%s/new_id" % driver
            try:
                f = open(filename, "w")
            except OSError as err:
                print("Error[2]: bind failed for %s - Cannot open %s: %s"
                      % (dev_id, filename, err), file=sys.stderr)
                restore_previous_driver()
                return False
            try:
                with f:
                    # Convert Device and Vendor Id to int to write to new_id
                    f.write("%04x %04x" % (int(dev["Vendor"], 16),
                                           int(dev["Device"], 16)))
            except OSError as err:
                print("Error: bind failed for %s - Cannot write new PCI ID to "
                      "driver %s: %s" % (dev_id, driver, err), file=sys.stderr)
                restore_previous_driver()
                return False

    # do the bind by writing to /sys
    filename = "/sys/bus/pci/drivers/%s/bind" % driver
    try:
        f = open(filename, "a")
    except OSError as err:
        print("Error[3]: bind failed for %s - Cannot open %s: %s"
              % (dev_id, filename, err), file=sys.stderr)
        restore_previous_driver()
        return False
    try:
        with f:
            f.write(dev_id)
    except OSError as err:
        # for some reason, closing dev_id after adding a new PCI ID to new_id
        # results in IOError. however, if the device was successfully bound,
        # we don't care for any errors and can safely ignore IOError
        tmp = get_pci_device_details(dev_id, True)
        if tmp.get("Driver_str") != driver:
            print("Error: bind failed for %s - Cannot bind to driver %s: %s"
                  % (dev_id, driver, err), file=sys.stderr)
            restore_previous_driver()
            return False
        # The device is bound after all: carry on as for a normal success
        # so that driver_override is reset and True is returned.

    # For kernels > 3.15 driver_override is used to bind a device to a driver.
    # Before unbinding it, overwrite driver_override with empty string so that
    # the device can be bound to any other driver
    filename = "/sys/bus/pci/devices/%s/driver_override" % dev_id
    if exists(filename):
        try:
            f = open(filename, "w")
        except OSError as err:
            sys.exit("Error: unbind failed for %s - Cannot open %s: %s"
                     % (dev_id, filename, err))
        try:
            with f:
                f.write("\00")
        except OSError as err:
            sys.exit("Error: unbind failed for %s - Cannot write %s: %s"
                     % (dev_id, filename, err))
    return True
def do_arg_actions():
    '''Carry out what the command line asked for: show the device details,
    then bind the device to the selected DPDK driver or re-bind it to the
    recorded driver.'''
    if info_flag:
        show_status()

    # neither --bind nor --unbind was given
    if b_flag is None:
        return

    if b_flag:
        # record the current parameters before the interface is taken over
        save_device_details()
        bind_one(device["pci"], driver, True)
    elif bind_one(device["pci"], device["driver"], False):
        # recorded driver restored: the record is no longer needed
        os.remove(file_name_for_saved_data)
def main():
    '''Entry point: check the prerequisites, parse the command line, gather
    the device information and perform the requested action.'''
    # root privileges are required
    if os.geteuid() != 0:
        sys.exit("You must run this script with SUDO or be root")

    # lspci has to be installed; the output of "which" is discarded
    lspci_lookup = subprocess.call(['which', 'lspci'],
                                   stdout=subprocess.DEVNULL,
                                   stderr=subprocess.DEVNULL)
    if lspci_lookup != 0:
        sys.exit("'lspci' not found - please install 'pciutils'")

    parse_args()
    check_dpdk_modules()
    build_dict_of_all_devices(network_devices)

    if b_flag or info_flag:
        # bind / info: work on the interface named on the command line
        check_device()
        extract_device_details()
    else:
        # unbind: work on the interface recorded by the last bind
        read_device_details_from_file()

    do_arg_actions()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment