Skip to content

Instantly share code, notes, and snippets.

@jpic
Last active April 8, 2025 08:19
Show Gist options
  • Save jpic/6b9df2a08906eda4dbf7057bdd055179 to your computer and use it in GitHub Desktop.
Save jpic/6b9df2a08906eda4dbf7057bdd055179 to your computer and use it in GitHub Desktop.

Revisions

  1. jpic revised this gist Apr 8, 2025. 2 changed files with 107 additions and 55 deletions.
    55 changes: 0 additions & 55 deletions proxy.py
    Original file line number Diff line number Diff line change
    @@ -1,55 +0,0 @@
    # Save as socks_server.py on your LOCAL machine
    import socket
    import threading
    from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler

    class SOCKSProxyHandler(StreamRequestHandler):
    def handle(self):
    # SOCKS5 greeting (no authentication)
    self.connection.recv(2) # Version and number of methods
    self.connection.send(b'\x05\x00') # Version 5, no auth

    # Request details
    data = self.connection.recv(4)
    if len(data) < 4 or data[0] != 5:
    return
    cmd, _, atyp = data[1], data[2], data[3]

    if cmd != 1: # Only support CONNECT
    self.connection.send(b'\x05\x07\x00\x01') # Command not supported
    return

    # Get destination address
    if atyp == 1: # IPv4
    dest_addr = socket.inet_ntoa(self.connection.recv(4))
    elif atyp == 3: # Domain name
    addr_len = self.connection.recv(1)[0]
    dest_addr = self.connection.recv(addr_len).decode('utf-8')
    else:
    return
    dest_port = int.from_bytes(self.connection.recv(2), 'big')

    # Connect to destination
    remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
    remote.connect((dest_addr, dest_port))
    self.connection.send(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00') # Success
    # Relay data
    def forward(src, dst):
    while True:
    data = src.recv(4096)
    if not data:
    break
    dst.send(data)
    threading.Thread(target=forward, args=(self.connection, remote)).start()
    forward(remote, self.connection)
    except Exception:
    self.connection.send(b'\x05\x04\x00\x01') # Host unreachable

    class ThreadingTCPServer(ThreadingMixIn, TCPServer):
    pass

    if __name__ == "__main__":
    server = ThreadingTCPServer(('127.0.0.1', 1080), SOCKSProxyHandler)
    print("SOCKS proxy running on localhost:1080")
    server.serve_forever()
    107 changes: 107 additions & 0 deletions socks.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,107 @@
    # Save as proxy2.py on your LOCAL machine
    import socket
    import threading
    from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler

    class SOCKSProxyHandler(StreamRequestHandler):
    def handle(self):
    print("New connection")
    # Read initial byte to determine SOCKS version
    version = self.connection.recv(1)
    print(f"Received version: {version}")

    if version == b'\x04': # SOCKS4
    self.handle_socks4()
    elif version == b'\x05': # SOCKS5
    self.handle_socks5()
    else:
    print("Unsupported SOCKS version")
    return

    def handle_socks4(self):
    print("Handling SOCKS4")
    cmd = self.connection.recv(1) # CONNECT (0x01)
    if cmd != b'\x01':
    print("SOCKS4: Only CONNECT supported")
    return
    port = int.from_bytes(self.connection.recv(2), 'big')
    addr = socket.inet_ntoa(self.connection.recv(4))
    user_id = b''
    while True: # Read until null-terminated user ID
    byte = self.connection.recv(1)
    if byte == b'\x00':
    break
    user_id += byte
    print(f"SOCKS4: Connecting to {addr}:{port}, user_id={user_id.decode('ascii')}")

    remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
    remote.connect((addr, port))
    # SOCKS4 reply: success (0x5A)
    self.connection.send(b'\x00\x5A' + self.connection.recv(2) + socket.inet_aton('0.0.0.0'))
    self.forward_data(remote)
    except Exception as e:
    print(f"SOCKS4 error: {e}")
    self.connection.send(b'\x00\x5B' + self.connection.recv(2) + socket.inet_aton('0.0.0.0')) # Failed
    finally:
    remote.close()

    def handle_socks5(self):
    print("Handling SOCKS5")
    # SOCKS5 greeting
    nmethods = self.connection.recv(1)[0]
    methods = self.connection.recv(nmethods)
    print(f"Auth methods: {methods}")
    self.connection.send(b'\x05\x00') # No authentication

    # SOCKS5 request
    request = self.connection.recv(4)
    print(f"Received request: {request}")
    if len(request) < 4 or request[0] != 5 or request[1] != 1:
    print("Invalid SOCKS5 request")
    self.connection.send(b'\x05\x07\x00\x01') # Command not supported
    return

    atyp = request[3]
    if atyp == 1: # IPv4
    dest_addr = socket.inet_ntoa(self.connection.recv(4))
    elif atyp == 3: # Domain name
    addr_len = self.connection.recv(1)[0]
    dest_addr = self.connection.recv(addr_len).decode('utf-8')
    else:
    print("Unsupported address type")
    self.connection.send(b'\x05\x08\x00\x01')
    return

    dest_port = int.from_bytes(self.connection.recv(2), 'big')
    print(f"SOCKS5: Connecting to {dest_addr}:{dest_port}")

    remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
    remote.connect((dest_addr, dest_port))
    self.connection.send(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00') # Success
    self.forward_data(remote)
    except Exception as e:
    print(f"SOCKS5 error: {e}")
    self.connection.send(b'\x05\x04\x00\x01') # Host unreachable
    finally:
    remote.close()

    def forward_data(self, remote):
    print("Forwarding data")
    def forward(src, dst):
    while True:
    data = src.recv(4096)
    if not data:
    break
    dst.send(data)
    threading.Thread(target=forward, args=(self.connection, remote)).start()
    forward(remote, self.connection)

    class ThreadingTCPServer(ThreadingMixIn, TCPServer):
    pass

    if __name__ == "__main__":
    server = ThreadingTCPServer(('127.0.0.1', 1337), SOCKSProxyHandler)
    print("SOCKS proxy running on localhost:1337")
    server.serve_forever()
  2. jpic revised this gist Apr 8, 2025. 1 changed file with 48 additions and 63 deletions.
    111 changes: 48 additions & 63 deletions proxy.py
    Original file line number Diff line number Diff line change
    @@ -1,70 +1,55 @@
    # Save as socks_server.py on your LOCAL machine
    import socket
    import threading
    from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler

    # Configuration
    PROXY_HOST = '127.0.0.1' # Proxy runs locally
    PROXY_PORT = 8888 # Port to listen on
    BUFFER_SIZE = 4096 # Size of data chunks

    def handle_client(client_socket):
    # Receive the request from the client
    request = client_socket.recv(BUFFER_SIZE)

    if not request:
    client_socket.close()
    return

    # Parse the first line of the request to get method, host, and port
    first_line = request.split(b'\n')[0]
    method, url, protocol = first_line.split(b' ')

    # Extract host and port from the request
    host_start = request.find(b'Host: ') + 6
    host_end = request.find(b'\r\n', host_start)
    host_line = request[host_start:host_end].decode('utf-8')

    host = host_line.split(':')[0]
    port = int(host_line.split(':')[1]) if ':' in host_line else 80

    # Connect to the target server
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
    server_socket.connect((host, port))
    server_socket.send(request) # Forward the client's request
    class SOCKSProxyHandler(StreamRequestHandler):
    def handle(self):
    # SOCKS5 greeting (no authentication)
    self.connection.recv(2) # Version and number of methods
    self.connection.send(b'\x05\x00') # Version 5, no auth

    # Relay the server's response back to the client
    while True:
    response = server_socket.recv(BUFFER_SIZE)
    if len(response) > 0:
    client_socket.send(response) # Send response to client
    else:
    break # No more data, exit loop

    except Exception as e:
    print(f"Error: {e}")

    finally:
    # Clean up sockets
    server_socket.close()
    client_socket.close()

    def start_proxy():
    # Create the proxy server socket
    proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    proxy_socket.bind((PROXY_HOST, PROXY_PORT))
    proxy_socket.listen(10) # Allow up to 10 queued connections

    print(f"Proxy server running on {PROXY_HOST}:{PROXY_PORT}")

    while True:
    # Accept incoming client connections
    client_socket, addr = proxy_socket.accept()
    print(f"Connection from {addr}")
    # Request details
    data = self.connection.recv(4)
    if len(data) < 4 or data[0] != 5:
    return
    cmd, _, atyp = data[1], data[2], data[3]

    if cmd != 1: # Only support CONNECT
    self.connection.send(b'\x05\x07\x00\x01') # Command not supported
    return

    # Handle each client in a separate thread
    client_thread = threading.Thread(target=handle_client, args=(client_socket,))
    client_thread.start()
    # Get destination address
    if atyp == 1: # IPv4
    dest_addr = socket.inet_ntoa(self.connection.recv(4))
    elif atyp == 3: # Domain name
    addr_len = self.connection.recv(1)[0]
    dest_addr = self.connection.recv(addr_len).decode('utf-8')
    else:
    return
    dest_port = int.from_bytes(self.connection.recv(2), 'big')

    # Connect to destination
    remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
    remote.connect((dest_addr, dest_port))
    self.connection.send(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00') # Success
    # Relay data
    def forward(src, dst):
    while True:
    data = src.recv(4096)
    if not data:
    break
    dst.send(data)
    threading.Thread(target=forward, args=(self.connection, remote)).start()
    forward(remote, self.connection)
    except Exception:
    self.connection.send(b'\x05\x04\x00\x01') # Host unreachable

    class ThreadingTCPServer(ThreadingMixIn, TCPServer):
    pass

    if __name__ == "__main__":
    start_proxy()
    server = ThreadingTCPServer(('127.0.0.1', 1080), SOCKSProxyHandler)
    print("SOCKS proxy running on localhost:1080")
    server.serve_forever()
  3. jpic created this gist Apr 8, 2025.
    70 changes: 70 additions & 0 deletions proxy.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,70 @@
    import socket
    import threading

    # Configuration
    PROXY_HOST = '127.0.0.1' # Proxy runs locally
    PROXY_PORT = 8888 # Port to listen on
    BUFFER_SIZE = 4096 # Size of data chunks

    def handle_client(client_socket):
    # Receive the request from the client
    request = client_socket.recv(BUFFER_SIZE)

    if not request:
    client_socket.close()
    return

    # Parse the first line of the request to get method, host, and port
    first_line = request.split(b'\n')[0]
    method, url, protocol = first_line.split(b' ')

    # Extract host and port from the request
    host_start = request.find(b'Host: ') + 6
    host_end = request.find(b'\r\n', host_start)
    host_line = request[host_start:host_end].decode('utf-8')

    host = host_line.split(':')[0]
    port = int(host_line.split(':')[1]) if ':' in host_line else 80

    # Connect to the target server
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
    server_socket.connect((host, port))
    server_socket.send(request) # Forward the client's request

    # Relay the server's response back to the client
    while True:
    response = server_socket.recv(BUFFER_SIZE)
    if len(response) > 0:
    client_socket.send(response) # Send response to client
    else:
    break # No more data, exit loop

    except Exception as e:
    print(f"Error: {e}")

    finally:
    # Clean up sockets
    server_socket.close()
    client_socket.close()

    def start_proxy():
    # Create the proxy server socket
    proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    proxy_socket.bind((PROXY_HOST, PROXY_PORT))
    proxy_socket.listen(10) # Allow up to 10 queued connections

    print(f"Proxy server running on {PROXY_HOST}:{PROXY_PORT}")

    while True:
    # Accept incoming client connections
    client_socket, addr = proxy_socket.accept()
    print(f"Connection from {addr}")

    # Handle each client in a separate thread
    client_thread = threading.Thread(target=handle_client, args=(client_socket,))
    client_thread.start()

    if __name__ == "__main__":
    start_proxy()