Last active
October 23, 2025 09:55
-
-
Save mikelgg93/d629a77cce9543ef43c30fd8b821c95e to your computer and use it in GitHub Desktop.
Run YOLO segmentation in real-time over your Neon scene camera stream with gaze.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "opencv-python", | |
| # "pupil-labs-realtime-api", | |
| # "ultralytics", | |
| # "click", | |
| # "lap", | |
| # ] | |
| # /// | |
| import click | |
| import cv2 | |
| from pupil_labs.realtime_api.simple import Device, discover_one_device | |
| from ultralytics import YOLO | |
class DetectMode(click.ParamType):
    """Custom Click type for validating the --detect argument.

    Accepts 'all', 'gazed', or a comma-separated list of integer class IDs.
    Conversion returns a tuple: (mode_string, class_list_or_None).
    """

    name = "detect_mode"

    def convert(self, value, param, ctx):
        """Convert *value* to a (mode, classes) tuple, or fail with a usage error.

        'all' / 'gazed' (case-insensitive) map to (mode, None); anything else
        is parsed as a comma-separated list of ints, e.g. '0,15,16'.
        """
        val_lower = str(value).lower()
        if val_lower in ("all", "gazed"):
            # Keyword modes carry no explicit class list.
            return (val_lower, None)
        try:
            classes = [int(p.strip()) for p in value.split(",")]
        except ValueError:
            # int() failed on at least one part (this also covers an empty
            # string, since ''.split(',') yields ['']).
            self.fail(
                f"Value '{value}' for --detect is not 'all', 'gazed', or a "
                "comma-separated list of integers.",
                param,
                ctx,
            )
        return ("specific", classes)
@click.command()
@click.option(
    "--model",
    type=click.Choice(["n", "s", "m"]),
    default="n",
    help="Specify YOLO model variant: 'n' (nano), 's' (small), 'm' (medium).",
)
@click.option("--track", is_flag=True, help="Enable YOLO tracking.")
@click.option(
    "--detect",
    default="gazed",
    type=DetectMode(),
    help="Plotting mode: 'gazed', 'all', or comma-separated specific class IDs from"
    " coco.yaml (e.g., '0,15,16').",
)
@click.option("--ip", default=None, help="IP address of the Pupil Labs device.")
@click.option("--port", default=8080, help="Port of the Pupil Labs device.")
def main(model, track, detect, ip, port):
    """YOLO-based detection over Neon streaming.

    Connects to a Pupil Labs device (direct IP or mDNS discovery), runs a
    YOLO segmentation model on each scene frame, overlays detections and the
    gaze point, and renders the eyes video into the top-left corner.
    Press ESC (or Ctrl-C) to exit.
    """
    detector = YOLO(f"yolo11{model}-seg.pt")

    device = _connect_device(ip, port)
    if device is None:
        print(
            "Could not find or connect to a device. "
            "Please check the connection or provide a valid IP address using --ip."
        )
        raise SystemExit(-1)
    print(f"Connecting to {device}...")

    mode, target_classes = detect
    print(f"Detection mode: {mode}, Target classes: {target_classes}")

    try:
        while True:
            matched = device.receive_matched_scene_and_eyes_video_frames_and_gaze()
            if not matched:
                print(
                    "Not able to find a match! Note: Pupil Invisible does not support "
                    "streaming eyes video"
                )
                continue
            gaze_x = int(matched.gaze.x)
            gaze_y = int(matched.gaze.y)

            if track:
                # persist=True keeps the tracker state between calls; without
                # it, track IDs reset on every frame of this loop.
                detection = detector.track(
                    matched.scene.bgr_pixels, classes=target_classes, persist=True
                )
            else:
                detection = detector(matched.scene.bgr_pixels, classes=target_classes)

            frame = matched.scene.bgr_pixels.copy()
            if mode in ("all", "specific"):
                frame = detection[0].plot()
            elif mode == "gazed":
                # Plot only detections whose bounding box contains the gaze point.
                for result in detection[0]:
                    if result.boxes:
                        for box in result.boxes.xyxy:
                            x1, y1, x2, y2 = box
                            if x1 < gaze_x < x2 and y1 < gaze_y < y2:
                                frame = result.plot()

            # Gaze marker (red circle) on top of the (possibly re-plotted) frame.
            cv2.circle(
                frame,
                (gaze_x, gaze_y),
                radius=10,
                color=(0, 0, 255),
                thickness=5,
            )
            # Render eyes video into the scene video (top-left corner).
            if matched.eyes is not None and matched.eyes.bgr_pixels is not None:
                height, width, _ = matched.eyes.bgr_pixels.shape
                frame[0:height, 0:width] = matched.eyes.bgr_pixels

            cv2.imshow("Scene camera with eyes and gaze overlay", frame)
            if cv2.waitKey(1) & 0xFF == 27:  # Press ESC to exit
                break
    except KeyboardInterrupt:
        print("\nScript interrupted by user.")
    finally:
        print("Stopping...")
        if device:
            device.close()  # Explicitly stop auto-update
        cv2.destroyAllWindows()


def _connect_device(ip, port):
    """Return a connected Device — via mDNS when *ip* is None — or None on failure."""
    if ip is None:
        print("IP address not provided. Attempting to discover device via mDNS...")
        try:
            device = discover_one_device(max_search_duration_seconds=10)
            if device:
                print(f"Discovered device: {device}")
            return device
        except Exception as e:
            print(f"mDNS discovery failed: {e}")
            return None
    print(f"Attempting to connect to device at {ip}:{port}...")
    try:
        return Device(address=ip, port=port)
    except Exception as e:
        print(f"Failed to connect to device at {ip}:{port}. Error: {e}")
        return None
# Script entry point: click parses CLI arguments and dispatches to main().
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment