Skip to content

Instantly share code, notes, and snippets.

@mikelgg93
Last active October 23, 2025 09:55
Show Gist options
  • Select an option

  • Save mikelgg93/d629a77cce9543ef43c30fd8b821c95e to your computer and use it in GitHub Desktop.

Select an option

Save mikelgg93/d629a77cce9543ef43c30fd8b821c95e to your computer and use it in GitHub Desktop.
Run YOLO segmentation in real-time over your Neon' scene camera stream with gaze.
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "opencv-python",
# "pupil-labs-realtime-api",
# "ultralytics",
# "click",
# "lap",
# ]
# ///
import click
import cv2
from pupil_labs.realtime_api.simple import Device, discover_one_device
from ultralytics import YOLO
class DetectMode(click.ParamType):
"""Custom Click type for validating the --detect argument.
It accepts 'all', 'gazed', or a comma-separated list of integers.
The conversion returns a tuple: (mode_string, class_list_or_None).
"""
name = "detect_mode"
def convert(self, value, param, ctx):
val_lower = str(value).lower()
if val_lower in ("all", "gazed"):
return (val_lower, None)
try:
classes = [int(p.strip()) for p in value.split(",")]
if not classes and value.strip() != "0":
self.fail(
f"'{value}' is not a valid non-empty list of class IDs.", param, ctx
)
except ValueError:
self.fail(
f"""Value '{value}' for --plot is not 'all', 'gazed', or a
comma-separated list of integers.""",
param,
ctx,
)
return ("specific", classes)
@click.command()
@click.option(
"--model",
type=click.Choice(["n", "s", "m"]),
default="n",
help="Specify YOLO model variant: 'n' (nano), 's' (small), 'm' (medium).",
)
@click.option("--track", is_flag=True, help="Enable YOLO tracking.")
@click.option(
"--detect",
default="gazed",
type=DetectMode(),
help="Plotting mode: 'gazed', 'all', or comma-separated specific class IDs from"
"coco.yaml (e.g., '0,15,16').",
)
@click.option("--ip", default=None, help="IP address of the Pupil Labs device.")
@click.option("--port", default=8080, help="Port of the Pupil Labs device.")
def main(model, track, detect, ip, port):
"""YOLO-based detection over Neon streaming."""
detector = YOLO(f"yolo11{model}-seg.pt")
device = None
if ip is None:
print("IP address not provided. Attempting to discover device via mDNS...")
try:
device = discover_one_device(max_search_duration_seconds=10)
if device:
print(f"Discovered device: {device}")
except Exception as e:
print(f"mDNS discovery failed: {e}")
else:
print(f"Attempting to connect to device at {ip}:{port}...")
try:
device = Device(address=ip, port=port)
except Exception as e:
print(f"Failed to connect to device at {ip}:{port}. Error: {e}")
if device is None:
print(
"Could not find or connect to a device. "
"Please check the connection or provide a valid IP address using --ip."
)
raise SystemExit(-1)
print(f"Connecting to {device}...")
mode, target_classes = detect
print(f"Detection mode: {mode}, Target classes: {target_classes}")
try:
while True:
matched = device.receive_matched_scene_and_eyes_video_frames_and_gaze()
if not matched:
print(
"Not able to find a match! Note: Pupil Invisible does not support "
"streaming eyes video"
)
continue
gaze_x = int(matched.gaze.x)
gaze_y = int(matched.gaze.y)
if track:
detection = detector.track(
matched.scene.bgr_pixels, classes=target_classes
)
else:
detection = detector(matched.scene.bgr_pixels, classes=target_classes)
frame = matched.scene.bgr_pixels.copy()
if mode in ["all", "specific"]:
frame = detection[0].plot()
elif mode == "gazed":
for result in detection[0]:
if result.boxes:
for box in result.boxes.xyxy:
x1, y1, x2, y2 = box
if x1 < gaze_x < x2 and y1 < gaze_y < y2:
frame = result.plot()
cv2.circle(
frame,
(int(matched.gaze.x), int(matched.gaze.y)),
radius=10,
color=(0, 0, 255),
thickness=5,
)
# Render eyes video into the scene video
if matched.eyes is not None and matched.eyes.bgr_pixels is not None:
height, width, _ = matched.eyes.bgr_pixels.shape
frame[0:height, 0:width] = matched.eyes.bgr_pixels
cv2.imshow("Scene camera with eyes and gaze overlay", frame)
if cv2.waitKey(1) & 0xFF == 27: # Press ESC to exit
break
except KeyboardInterrupt:
print("\nScript interrupted by user.")
finally:
print("Stopping...")
if device:
device.close() # Explicitly stop auto-update
cv2.destroyAllWindows()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment