#!/usr/bin/env python3 import sys, pty, os, time, threading, queue, argparse ap = argparse.ArgumentParser() parser = argparse.ArgumentParser( description='Tests the input latency of a shell', usage='usage: shell-latency-test-infinite.py [--sleep] [--] argv-to-execute...', ) parser.add_argument('--sleep', action='store_true', help='''only type once every 1s instead of constantly - results in higher observed latency, probably due to OS overhead''') parser.add_argument('argv_to_execute', nargs='+', help='shell command to execute') args = parser.parse_args() # Spawn the shell in a pty which we own the controlling end of. child_pid, fd = pty.fork() if child_pid == 0: # This is the child process. Exec the shell. os.execvp(args.argv_to_execute[0], args.argv_to_execute) raise Exception("execvp failed") # Otherwise, we're the parent. data_queue_main = queue.SimpleQueue() data_queue_position_requests = queue.SimpleQueue() data_queues = [data_queue_main, data_queue_position_requests] def read_thread(): '''Constantly read input from the shell and send it to each of the queues.''' while new_data := os.read(fd, 1048576): for q in data_queues: q.put(new_data) threading.Thread(target=read_thread, daemon=True).start() def position_request_thread(): '''Handle position requests.''' buf = b'' while True: new_data = data_queue_position_requests.get() buf = buf[-3:] + new_data for _ in range(buf.count(b'\x1b[6n')): os.write(fd, b'\x1b[1;1R') threading.Thread(target=position_request_thread, daemon=True).start() def join_queue(q): '''Remove all bytes objects currently in the queue and return them concatenated.''' ret = b'' while True: try: ret += q.get_nowait() except queue.Empty: break return ret # Wait one second to ensure the shell is done initializing. time.sleep(1) startup_data = join_queue(data_queue_main) print('startup:', startup_data) durations = [] durations_clear_time = time.time() i = 0 while True: # Did we get any extra data, suggesting the below assumption is violated? junk = join_queue(data_queue_main) if junk: print('junk:', junk) # Alternately input 'a' and input a backspace to erase the 'a'. os.write(fd, b'\b' if i & 1 else b'a') # The timing isn't exact (e.g. there might be a delay between the write # and calculating the time here), but any error is tens of microseconds # at most. time_pre = time.time() # Assume the first read contains the full response to the input and the # shell won't be sending anything more. If we didn't assume this, we # couldn't immediately continue with more keystrokes. new_data = data_queue_main.get() duration_us = (time.time() - time_pre) * 1_000_000 durations.append(duration_us) if args.sleep: time.sleep(1) total_duration = time.time() - durations_clear_time if total_duration >= 1.0 or args.sleep: average_duration_us = sum(durations) / len(durations) print(f'{len(durations)} rounds in {total_duration:2.1f}s, average latency {average_duration_us:5,.0f}us') durations = [] durations_clear_time = time.time() i += 1