"""Minimal libfibre discovery demo driven by a hand-rolled polling event loop.

The script hands libfibre a set of event-loop callbacks (post, register /
deregister fd events, timers), opens a USB discovery domain and then spins
the loop forever, logging every callback libfibre makes.
"""

from odrive.pyfibre.fibre.libfibre import *

# Imported after the wildcard import on purpose, so that these names are
# guaranteed to be the stdlib modules whatever the wildcard brings in.
import functools
import itertools
import select
import time

# Pending work. Each entry is a tuple: (due_time, callback, *args).
tasks = []
now = time.perf_counter

# Event bits accepted by _register_event and passed back to the event callback.
_EVENT_READ = 1
_EVENT_WRITE = 4
_UNSUPPORTED_EVENTS = 0xFFFFFFFF & ~(_EVENT_READ | _EVENT_WRITE)

# fd -> token of the currently active registration for that fd.
_watch_tokens = {}

# Timers that were scheduled and have neither fired nor been cancelled.
_timer_ids = itertools.count(1)
_live_timers = set()


def spin():
    """Run ten passes over the task queue, executing every task that is due.

    Each pass works on a snapshot of the queue, so tasks queued by a callback
    are first considered in the following pass. Executed tasks are removed by
    identity rather than by position, so the removal stays correct whatever
    the callbacks did to the queue in the meantime.
    """
    for _ in range(10):
        finished = []
        for task in list(tasks):
            when, callback, *args = task
            if when <= now():
                callback(*args)
                finished.append(task)
        if finished:
            # `finished` keeps the tuples alive, so their ids stay unique here.
            done = {id(task) for task in finished}
            tasks[:] = [task for task in tasks if id(task) not in done]


def _watch_is_live(fd, token):
    """Return True unless *token* belongs to a registration that was dropped.

    A token of None marks a watch that was started by calling watch_fd
    directly; such a watch is never considered dropped.
    """
    return token is None or _watch_tokens.get(fd) is token


def _deliver_event(fd, token, callback, ctx, events):
    """Invoke an fd event callback unless its watch was deregistered meanwhile."""
    if _watch_is_live(fd, token):
        callback(ctx, events)


def watch_fd(fd, mode, callback, ctx, token=None):
    """Poll a file descriptor once and reschedule itself ("coroutine" style).

    Args:
        fd: File descriptor to poll with a zero-timeout select().
        mode: "read" or "write"; any other value only reschedules the watch.
        callback: Called as callback(ctx, 1) when readable or
            callback(ctx, 4) when writable.
        ctx: Opaque value handed back to callback.
        token: Registration token assigned by _register_event. When given,
            the watch ends as soon as the fd is deregistered or registered
            again. None (the default) keeps the watch running forever.
    """
    if not _watch_is_live(fd, token):
        # Deregistered: polling on would hit a closed or reused descriptor.
        return

    if mode == "read":
        readable, _, _ = select.select([fd], [], [], 0)
        if readable:
            print(">>> read", fd)
            tasks.append(
                (now(), _deliver_event, fd, token, callback, ctx, _EVENT_READ)
            )

    if mode == "write":
        _, writable, _ = select.select([], [fd], [], 0)
        if writable:
            print(">>> write", fd)
            tasks.append(
                (now(), _deliver_event, fd, token, callback, ctx, _EVENT_WRITE)
            )

    # Keep watching: the poll is repeated on the next pass of spin().
    tasks.append((now(), watch_fd, fd, mode, callback, ctx, token))


def logcall(func):
    """Debug decorator: print the function name and arguments of every call."""

    @functools.wraps(func)
    def deco(*args):
        print("[call]", func.__name__, *args)
        return func(*args)

    return deco


def _fire_timer(timer_id, callback, ctx):
    """Run a timer callback unless the timer was cancelled before it was due."""
    if timer_id in _live_timers:
        _live_timers.discard(timer_id)
        callback(ctx)


# =========================
#  libfibre callbacks impl
# =========================

@logcall
def _post(callback, ctx):
    """Queue callback(ctx) to run on the next pass of the event loop."""
    tasks.append((now(), callback, ctx))
    return 0


@logcall
def _register_event(event_fd, events, callback, ctx):
    """Start watching event_fd for the requested read (1) / write (4) events.

    Registering an fd again replaces its previous registration.
    """
    assert not events & _UNSUPPORTED_EVENTS

    token = object()
    _watch_tokens[event_fd] = token

    if events & _EVENT_READ:
        tasks.append((now(), watch_fd, event_fd, "read", callback, ctx, token))

    if events & _EVENT_WRITE:
        tasks.append((now(), watch_fd, event_fd, "write", callback, ctx, token))

    return 0


@logcall
def _deregister_event(event_fd):
    """Stop watching event_fd; its queued polls and events become no-ops."""
    _watch_tokens.pop(event_fd, None)
    return 0


@logcall
def _call_later(delay, callback, ctx):
    """Schedule callback(ctx) to run once `delay` has elapsed.

    Returns a non-zero timer id that can be passed to _cancel_timer.
    """
    # NOTE(review): delay is treated as seconds, the unit of
    # time.perf_counter() - confirm against the libfibre event loop contract.
    timer_id = next(_timer_ids)
    _live_timers.add(timer_id)
    tasks.append((now() + delay, _fire_timer, timer_id, callback, ctx))
    return timer_id


@logcall
def _cancel_timer(timer_id):
    """Cancel a timer created by _call_later; unknown ids are ignored."""
    _live_timers.discard(timer_id)
    return 0


@logcall
def _on_found_object(ctx, obj, intf):
    """Discovery callback; only logged (via logcall)."""
    pass


@logcall
def _on_lost_object(ctx, obj):
    """Discovery callback; only logged (via logcall)."""
    pass


@logcall
def _on_discovery_stopped(ctx, result):
    """Discovery callback; only logged (via logcall)."""
    pass


@logcall
def _on_attribute_added(ctx, attr, name, name_length, subintf, subintf_name, subintf_name_length):
    """Callback stub; only logged (via logcall)."""
    pass


@logcall
def _on_attribute_removed(ctx, attr):
    """Callback stub; only logged (via logcall)."""
    pass


@logcall
def _on_function_added(ctx, func, name, name_length, input_names, input_codecs, output_names, output_codecs):
    """Callback stub; only logged (via logcall)."""
    pass


@logcall
def _on_function_removed(ctx, func):
    """Callback stub; only logged (via logcall)."""
    pass


@logcall
def _on_call_completed(ctx, status, tx_end, rx_end, tx_buf, tx_len, rx_buf, rx_len):
    """Callback stub; only logged (via logcall)."""
    pass


# The ctypes wrappers are kept in module-level names so they stay referenced
# for as long as native code may call them.
c_post = PostSignature(_post)
c_register_event = RegisterEventSignature(_register_event)
c_deregister_event = DeregisterEventSignature(_deregister_event)
c_call_later = CallLaterSignature(_call_later)
c_cancel_timer = CancelTimerSignature(_cancel_timer)
c_on_found_object = OnFoundObjectSignature(_on_found_object)
c_on_lost_object = OnLostObjectSignature(_on_lost_object)
c_on_discovery_stopped = OnStoppedSignature(_on_discovery_stopped)
c_on_attribute_added = OnAttributeAddedSignature(_on_attribute_added)
c_on_attribute_removed = OnAttributeRemovedSignature(_on_attribute_removed)
c_on_function_added = OnFunctionAddedSignature(_on_function_added)
c_on_function_removed = OnFunctionRemovedSignature(_on_function_removed)
c_on_call_completed = OnCallCompletedSignature(_on_call_completed)

event_loop = LibFibreEventLoop()
event_loop.post = c_post
event_loop.register_event = c_register_event
event_loop.deregister_event = c_deregister_event
event_loop.call_later = c_call_later
event_loop.cancel_timer = c_cancel_timer


# entrypoint

ctx = c_void_p(libfibre_open(event_loop))
assert ctx

spin()

path = 'usb:idVendor=0x1209,idProduct=0x0D32,bInterfaceClass=0,bInterfaceSubClass=1,bInterfaceProtocol=0'
buf = path.encode('ascii')
domain_handle = libfibre_open_domain(ctx, buf, len(buf))
print(domain_handle)

spin()

discovery_handle = c_void_p(0)
discovery_id = 0
libfibre_start_discovery(
    domain_handle,
    byref(discovery_handle),
    c_on_found_object,
    c_on_lost_object,
    c_on_discovery_stopped,
    discovery_id,
)

# "event loop"
while True:
    spin()
    time.sleep(0.001)