Skip to content

Instantly share code, notes, and snippets.

@xznhj8129
Created May 25, 2025 13:16
Show Gist options
  • Select an option

  • Save xznhj8129/871198b3b64caf3f11fbd71d35fe80aa to your computer and use it in GitHub Desktop.

Select an option

Save xznhj8129/871198b3b64caf3f11fbd71d35fe80aa to your computer and use it in GitHub Desktop.
Aliexpress E88 drone Optical Flow module protocol reverse engineering
#!/usr/bin/env python3
import serial
import time
import struct
# ------------------
# Configuration
# ------------------
# Serial device to open and the line speed to open it at.
SERIAL_PORT = "/dev/ttyUSB0"
BAUD_RATE = 115_200
# Every frame begins with this 3-byte sync header.
SYNC_HEADER = bytes((0x99, 0x77, 0x0E))
# Total length of one frame in bytes, header included.
FRAME_SIZE = 13
def extract_frames(buffer: bytes) -> tuple[list[bytes], bytes]:
    """
    Split 'buffer' into complete frames that start with SYNC_HEADER.

    Each frame is FRAME_SIZE bytes long, header included. Bytes that precede
    a header are skipped. Only the header is checked; the remaining frame
    bytes are taken as-is.

    Returns (list_of_frames, leftover_bytes). 'leftover_bytes' is the
    unconsumed tail of 'buffer' (always shorter than FRAME_SIZE once any
    scanning happened); prepend it to the next chunk of input so a frame
    split across two reads is not lost.
    """
    frames = []
    pos = 0
    # Highest index at which a complete frame can still begin.
    last_start = len(buffer) - FRAME_SIZE
    # Bound the search so that a match always starts at or before last_start.
    search_end = last_start + len(SYNC_HEADER)
    while pos <= last_start:
        hit = buffer.find(SYNC_HEADER, pos, search_end)
        if hit < 0:
            # No further complete frame: everything up to last_start has been
            # examined, so only the final FRAME_SIZE - 1 bytes are kept.
            pos = last_start + 1
            break
        frames.append(buffer[hit:hit + FRAME_SIZE])
        pos = hit + FRAME_SIZE
    return frames, buffer[pos:]
def decode_frame(frame: bytes) -> list[int]:
    """
    Decode one frame into its raw fields.

    Layout being assumed (reverse-engineered, not confirmed):
    frame[0..2]  = header 0x99 0x77 0x0e (not re-checked here)
    frame[3..4]  = deltaX (signed 16-bit, little-endian)
    frame[5..6]  = deltaY (signed 16-bit, little-endian)
    frame[7]     = unknown, possibly a quality value (unsigned 8-bit)
    frame[8..12] = not decoded; reported as always zero in captured logs

    Returns the list [deltaX, deltaY, byte7].
    Raises ValueError if 'frame' is not exactly FRAME_SIZE bytes long.
    """
    # Explicit check instead of assert, so it still runs under "python -O".
    if len(frame) != FRAME_SIZE:
        raise ValueError(f"Frame is not {FRAME_SIZE} bytes")
    # Two consecutive little-endian signed 16-bit values starting at offset 3.
    x, y = struct.unpack_from('<hh', frame, 3)
    return [x, y, frame[7]]
def main():
    """
    Read the serial stream, decode every frame and print running totals.

    Opens SERIAL_PORT at BAUD_RATE, passes the received bytes through
    extract_frames() and decode_frame(), and prints each X/Y delta together
    with the accumulated X and Y sums. Runs until interrupted with Ctrl+C;
    the port is closed on the way out.
    """
    print(f"Opening {SERIAL_PORT} at {BAUD_RATE} baud...\n")
    ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0.1)
    buffer = b''
    xsum = 0
    ysum = 0
    print("Reading data and decoding frames (Ctrl+C to stop)...\n")
    try:
        while True:
            # Read everything already pending, or ask for a single byte so
            # the call waits on the port's read timeout instead of spinning.
            chunk = ser.read(ser.in_waiting or 1)
            if chunk:
                buffer += chunk
                # The unconsumed tail is kept for the next read.
                frames, buffer = extract_frames(buffer)
                for frame in frames:
                    x, y, raw = decode_frame(frame)
                    xsum += x
                    ysum += y
                    print(f" Decoded:\t X: {x}\t "
                          f"Y: {y}\t\t "
                          f"Xsum: {xsum}\t\t "
                          f"Ysum: {ysum}\t\t "
                          f"?: {raw}\t ")
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("\nUser interrupted. Stopping...\n")
    finally:
        if ser.is_open:
            ser.close()
        print("Serial port closed.")
# Run the reader only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment