Skip to content

Instantly share code, notes, and snippets.

@stancel
Forked from Scherlac/get_tds_cert.py
Created August 12, 2021 23:30
Show Gist options
  • Save stancel/4ac209f76ac51bd3e99603dcb62876fe to your computer and use it in GitHub Desktop.
Save stancel/4ac209f76ac51bd3e99603dcb62876fe to your computer and use it in GitHub Desktop.

Revisions

  1. @Scherlac Scherlac revised this gist Feb 27, 2021. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions get_tds_cert.py
    Original file line number Diff line number Diff line change
    @@ -41,8 +41,8 @@ def recv_tdspacket(sock):
    tdspacket = tdspbuf
    header = {}

    tdspacket += sock.recv(4096)
    for i in range(0,5):
    tdspacket += sock.recv(4096)
    print("\n# get_tdspacket: {}, tdspacket len: {} ".format(i, len(tdspacket)))
    if len(tdspacket) >= 8:
    header = read_header(tdspacket[:8])
    @@ -59,7 +59,7 @@ def recv_tdspacket(sock):
    print("Usage: {} <hostname> <port>".format(sys.argv[0]))
    sys.exit(1)

    hostname = sys.argv[1]
    hostname = sys.argv[1]
    port = int(sys.argv[2])


  2. @lnattrass lnattrass revised this gist Feb 14, 2017. No changes.
  3. @lnattrass lnattrass revised this gist Feb 14, 2017. 1 changed file with 8 additions and 13 deletions.
    21 changes: 8 additions & 13 deletions get_tds_cert.py
    Original file line number Diff line number Diff line change
    @@ -52,24 +52,17 @@ def recv_tdspacket(sock):
    print("# Remaining tdspbuf length: {}\n".format(len(tdspbuf)))
    return header, tdspacket[8:header['length']]



    sleep(0.05)





    # Ensure we have a commandline
    if len(sys.argv) != 3:
    print("Usage: {} <hostname> <port>".format(sys.argv[0]))
    sys.exit(1)

    hostname = sys.argv[1]
    hostname = sys.argv[1]
    port = int(sys.argv[2])



    # Setup SSL
    if hasattr(ssl, 'PROTOCOL_TLS'):
    sslProto = ssl.PROTOCOL_TLS
    @@ -95,9 +88,12 @@ def recv_tdspacket(sock):
    # Send the first TDS PRELOGIN message
    s.send(prelogin_msg)

    # Get the response and ignore. We will try to negotiate encryption anyway.
    # Get the response and ignore. We will try to negotiate encryption anyway.
    header, data = recv_tdspacket(s)

    while header['status']==0:
    header, ext_data = recv_tdspacket(s)
    data += ext_data


    print("# Starting TLS handshake loop..")
    # Craft the packet
    @@ -120,8 +116,7 @@ def recv_tdspacket(sock):
    while header['status']==0:
    header, ext_data = recv_tdspacket(s)
    data += ext_data
    break


    tls_in_buf.write(data)

    print("# Handshake did not complete / exiting")
    print("# Handshake did not complete / exiting")
  4. @lnattrass lnattrass revised this gist Feb 14, 2017. 1 changed file with 75 additions and 43 deletions.
    118 changes: 75 additions & 43 deletions get_tds_cert.py
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,9 @@
    import sys
    import pprint
    import struct
    import socket
    import ssl
    from time import sleep

    # Standard "HELLO" message for TDS
    prelogin_msg = bytearray([ 0x12, 0x01, 0x00, 0x2f, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x1a, 0x00, 0x06, 0x01, 0x00, 0x20,
    @@ -17,19 +19,64 @@ def prep_header(data):
    data_head = prelogin_head + total_len.to_bytes(2, 'big')
    data_head += bytearray([ 0x00, 0x00, 0x01, 0x00])
    return data_head + data

    def read_header(data):
    if len(data) != 8:
    raise ValueError("prelogin header is > 8-bytes", data)

    format = ">bbhhbb"
    sct = struct.Struct(format)
    unpacked = sct.unpack(data)
    return { "type": unpacked[0],
    "status": unpacked[1],
    "length": unpacked[2],
    "channel": unpacked[3],
    "packet": unpacked[4],
    "window": unpacked[5]
    }

    tdspbuf = bytearray()
    def recv_tdspacket(sock):
    global tdspbuf
    tdspacket = tdspbuf
    header = {}

    tdspacket += sock.recv(4096)
    for i in range(0,5):
    print("\n# get_tdspacket: {}, tdspacket len: {} ".format(i, len(tdspacket)))
    if len(tdspacket) >= 8:
    header = read_header(tdspacket[:8])
    print("# Header: ", header)
    if len(tdspacket) >= header['length']:
    tdspbuf = tdspacket[header['length']:]
    print("# Remaining tdspbuf length: {}\n".format(len(tdspbuf)))
    return header, tdspacket[8:header['length']]



    sleep(0.05)





    # Ensure we have a commandline
    if len(sys.argv) != 3:
    print("Usage: {} <hostname> <port>".format(sys.argv[0]))
    exit(1)
    sys.exit(1)

    hostname = sys.argv[1]
    port = int(sys.argv[2])



    # Setup SSL
    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    if hasattr(ssl, 'PROTOCOL_TLS'):
    sslProto = ssl.PROTOCOL_TLS
    else:
    sslProto = ssl.PROTOCOL_SSLv23

    sslctx = ssl.SSLContext(sslProto)
    sslctx.check_hostname = False
    tls_in_buf = ssl.MemoryBIO()
    tls_out_buf = ssl.MemoryBIO()
    @@ -40,7 +87,7 @@ def prep_header(data):
    # create an INET, STREAMing socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setblocking(0)
    s.settimeout(3)
    s.settimeout(1)

    # Connect to the SQL Server
    s.connect(( hostname, port ))
    @@ -49,47 +96,32 @@ def prep_header(data):
    s.send(prelogin_msg)

    # Get the response and ignore. We will try to negotiate encryption anyway.
    resp = s.recv(2048)
    header, data = recv_tdspacket(s)

    print("# Starting TLS handshake..")
    # Craft the packet
    try:
    tlssock.do_handshake()
    except ssl.SSLWantReadError as err:
    # TLS wants to keep shaking hands, but because we're controlling the R/W buffers it throws an exception
    print("# Shaking.")
    pass

    # Get CLIENT Hello message
    client_hello = tls_out_buf.read()

    # Forward to SQL
    s.sendall(prep_header(client_hello))

    # Get response
    resp = s.recv(2048)

    # Shuffle to the SSL object (ignore Prelogin Header... again..)
    tls_in_buf.write(resp[8:])

    # Keep shaking hands
    try:
    print("# Starting TLS handshake loop..")
    # Craft the packet
    for i in range(0,5):
    try:
    tlssock.do_handshake()
    except ssl.SSLWantReadError as err:
    print("# Handshake completed, dumping certificates")
    peercert = ssl.DER_cert_to_PEM_cert(tlssock.getpeercert(True))
    print(peercert)
    sys.exit(0)
    except ssl.SSLWantReadError as err:
    # TLS wants to keep shaking hands, but because we're controlling the R/W buffers it throws an exception
    print("# Shaking..")
    pass

    # Shuffle data again
    handshake = tls_out_buf.read()
    s.sendall(prep_header(handshake))

    resp = s.recv(2048)
    tls_in_buf.write(resp[8:])

    # If we were successful we should have a good handshake now
    tlssock.do_handshake()
    print("# Handshake completed, dumping certificates")

    peercert = ssl.DER_cert_to_PEM_cert(tlssock.getpeercert(True))
    print(peercert)
    print("# Shaking ({}/5)".format(i))

    tls_data = tls_out_buf.read()
    s.sendall(prep_header(tls_data))
    # TDS Packets can be split over two frames, each with their own headers.
    # We have to concat these for TLS to handle nego properly
    header, data = recv_tdspacket(s)
    while header['status']==0:
    header, ext_data = recv_tdspacket(s)
    data += ext_data
    break

    tls_in_buf.write(data)

    print("# Handshake did not complete / exiting")
  5. @lnattrass lnattrass created this gist Feb 14, 2017.
    95 changes: 95 additions & 0 deletions get_tds_cert.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,95 @@
    import sys
    import pprint
    import socket
    import ssl

    # Standard "HELLO" message for TDS
    prelogin_msg = bytearray([ 0x12, 0x01, 0x00, 0x2f, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x1a, 0x00, 0x06, 0x01, 0x00, 0x20,
    0x00, 0x01, 0x02, 0x00, 0x21, 0x00, 0x01, 0x03, 0x00, 0x22, 0x00, 0x04, 0x04, 0x00, 0x26, 0x00,
    0x01, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 ])

    # Prep Header function
    def prep_header(data):
    data_len = len(data)
    prelogin_head = bytearray([ 0x12, 0x01 ])
    header_len = 8
    total_len = header_len + data_len
    data_head = prelogin_head + total_len.to_bytes(2, 'big')
    data_head += bytearray([ 0x00, 0x00, 0x01, 0x00])
    return data_head + data

    # Ensure we have a commandline
    if len(sys.argv) != 3:
    print("Usage: {} <hostname> <port>".format(sys.argv[0]))
    exit(1)

    hostname = sys.argv[1]
    port = int(sys.argv[2])



    # Setup SSL
    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    sslctx.check_hostname = False
    tls_in_buf = ssl.MemoryBIO()
    tls_out_buf = ssl.MemoryBIO()

    # Create the SSLObj connected to the tls_in_buf and tls_out_buf
    tlssock = sslctx.wrap_bio(tls_in_buf, tls_out_buf)

    # create an INET, STREAMing socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setblocking(0)
    s.settimeout(3)

    # Connect to the SQL Server
    s.connect(( hostname, port ))

    # Send the first TDS PRELOGIN message
    s.send(prelogin_msg)

    # Get the response and ignore. We will try to negotiate encryption anyway.
    resp = s.recv(2048)

    print("# Starting TLS handshake..")
    # Craft the packet
    try:
    tlssock.do_handshake()
    except ssl.SSLWantReadError as err:
    # TLS wants to keep shaking hands, but because we're controlling the R/W buffers it throws an exception
    print("# Shaking.")
    pass

    # Get CLIENT Hello message
    client_hello = tls_out_buf.read()

    # Forward to SQL
    s.sendall(prep_header(client_hello))

    # Get response
    resp = s.recv(2048)

    # Shuffle to the SSL object (ignore Prelogin Header... again..)
    tls_in_buf.write(resp[8:])

    # Keep shaking hands
    try:
    tlssock.do_handshake()
    except ssl.SSLWantReadError as err:
    # TLS wants to keep shaking hands, but because we're controlling the R/W buffers it throws an exception
    print("# Shaking..")
    pass

    # Shuffle data again
    handshake = tls_out_buf.read()
    s.sendall(prep_header(handshake))

    resp = s.recv(2048)
    tls_in_buf.write(resp[8:])

    # If we were successful we should have a good handshake now
    tlssock.do_handshake()
    print("# Handshake completed, dumping certificates")

    peercert = ssl.DER_cert_to_PEM_cert(tlssock.getpeercert(True))
    print(peercert)