#!/usr/bin/env python3 import os import pty import select import socket import serial import threading import termios import fcntl import sys import time # === CONFIG === TCP_HOST = "host.docker.internal" TCP_PORT = 33329 BAUDRATE = 115200 PTY_NAME = "/dev/ttyS0" def create_virtual_serial(): master_fd, slave_fd = pty.openpty() pty_name = os.ttyname(slave_fd) # Force PTY name if needed (e.g., /dev/ttyESP32 symlink) try: if os.path.exists(PTY_NAME): os.remove(PTY_NAME) os.symlink(pty_name, PTY_NAME) print(f"[client] Linked PTY: {PTY_NAME} → {pty_name}") except Exception as e: print(f"[client] Failed to link PTY: {e}") return None, None, None return master_fd, slave_fd, pty_name def proxy_loop(master_fd, ser): print("[client] Starting proxy loop...") try: while True: rlist, _, _ = select.select([master_fd, ser], [], []) if master_fd in rlist: data = os.read(master_fd, 1024) if data: ser.write(data) if ser in rlist: data = ser.read(ser.in_waiting or 1) if data: os.write(master_fd, data) except Exception as e: print(f"[client] Proxy loop error: {e}") def monitor_control_lines(master_fd, ser): print("[client] Monitoring DTR/RTS...") prev_dtr = None prev_rts = None while True: try: # Read current state from the PTY buf = fcntl.ioctl(master_fd, termios.TIOCMGET, b"\x00\x00\x00\x00") state = int.from_bytes(buf, byteorder='little') dtr = bool(state & termios.TIOCM_DTR) rts = bool(state & termios.TIOCM_RTS) if dtr != prev_dtr: ser.dtr = dtr print(f"[client] DTR changed: {dtr}") prev_dtr = dtr if rts != prev_rts: ser.rts = rts print(f"[client] RTS changed: {rts}") prev_rts = rts time.sleep(0.05) except Exception as e: print(f"[client] Error monitoring DTR/RTS: {e}") break def main(): print(f"[client] Connecting to {TCP_HOST}:{TCP_PORT}...") try: ser = serial.serial_for_url(f"socket://{TCP_HOST}:{TCP_PORT}", baudrate=BAUDRATE, timeout=0) except Exception as e: print(f"[client] Could not open TCP socket: {e}") return master_fd, slave_fd, pty_name = create_virtual_serial() if master_fd is None: return proxy_thread = threading.Thread(target=proxy_loop, args=(master_fd, ser), daemon=True) control_thread = threading.Thread(target=monitor_control_lines, args=(master_fd, ser), daemon=True) proxy_thread.start() control_thread.start() proxy_thread.join() control_thread.join() if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n[client] Exiting.")