Skip to content

Instantly share code, notes, and snippets.

@xznhj8129
Created May 25, 2025 13:16
Show Gist options
  • Select an option

  • Save xznhj8129/871198b3b64caf3f11fbd71d35fe80aa to your computer and use it in GitHub Desktop.

Select an option

Save xznhj8129/871198b3b64caf3f11fbd71d35fe80aa to your computer and use it in GitHub Desktop.

Revisions

  1. xznhj8129 created this gist May 25, 2025.
    98 changes: 98 additions & 0 deletions ali_opflow.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,98 @@
    #!/usr/bin/env python3

    import serial
    import time
    import struct

    # ------------------
    # Configuration
    # ------------------
    SERIAL_PORT = '/dev/ttyUSB0'
    BAUD_RATE = 115200

    SYNC_HEADER = b'\x99\x77\x0e' # 3-byte header
    FRAME_SIZE = 13 # total bytes per frame

    def extract_frames(buffer: bytes) -> tuple[list[bytes], bytes]:
    """
    Extract all 13-byte frames that start with 99 77 0E from 'buffer'.
    Returns (list_of_frames, leftover_bytes).
    """
    frames = []
    i = 0
    while i <= len(buffer) - FRAME_SIZE:
    if buffer[i:i+3] == SYNC_HEADER:
    candidate = buffer[i:i+FRAME_SIZE]
    frames.append(candidate)
    i += FRAME_SIZE
    else:
    i += 1
    leftover = buffer[i:]
    return frames, leftover

    def decode_frame(frame: bytes):
    """
    Attempt to interpret the 13-byte frame as:
    header[0..2] = 0x99 0x77 0x0e
    data[3..4] = deltaX (signed 16-bit, little-endian)
    data[5..6] = deltaY (signed 16-bit, little-endian)
    data[7] = quality? (8-bit)
    data[8..12] = always zero in your logs
    Returns a dict with the decoded fields.
    """
    # sanity check: must be 13 bytes total
    assert len(frame) == FRAME_SIZE, "Frame is not 13 bytes"

    xr = frame[3:5] # 2 bytes
    yr = frame[5:7] # 2 bytes
    d3 = frame[7] # 1 byte
    qual = d3 / 255.0

    # Little-endian signed 16:
    (x,) = struct.unpack('<h', xr)
    (y,) = struct.unpack('<h', yr)

    xw = x * qual
    yw = y * qual

    return [x, y, d3]

    def main():
    print(f"Opening {SERIAL_PORT} at {BAUD_RATE} baud...\n")
    ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0.1)

    buffer = b''
    xsum = 0
    ysum = 0
    print("Reading data and decoding frames (Ctrl+C to stop)...\n")
    try:
    while True:
    chunk = ser.read(ser.in_waiting or 1)
    if chunk:
    buffer += chunk
    frames, buffer = extract_frames(buffer)
    for f in frames:
    decoded = decode_frame(f)

    hex_frame = ' '.join(f'{b:02X}' for b in f)
    xsum+=decoded[0]
    ysum+=decoded[1]
    print(f" Decoded:\t X: {decoded[0]}\t "
    f"Y: {decoded[1]}\t\t "
    f"Xsum: {xsum}\t\t "
    f"Ysum: {ysum}\t\t "
    f"?: {decoded[2]}\t ")

    time.sleep(0.01)

    except KeyboardInterrupt:
    print("\nUser interrupted. Stopping...\n")

    finally:
    if ser.is_open:
    ser.close()
    print("Serial port closed.")

    if __name__ == "__main__":
    main()