Created
January 13, 2024 22:44
-
-
Save AndBondStyle/76170571d28412a8ca2f8afbe06c48c1 to your computer and use it in GitHub Desktop.
Revisions
-
AndBondStyle created this gist
Jan 13, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,148 @@ from odrive.pyfibre.fibre.libfibre import * import select tasks = [] # time, callback, args... now = time.perf_counter # super stupid "event loop" implementation def spin(): for _ in range(10): delete = [] for i, task in enumerate(tasks.copy()): when, callback, *args = task if when <= now(): # print("-->", callback) callback(*args) # print("<--") delete.append(i) for i in reversed(delete): del tasks[i] # "coroutine" that watches file descriptior for read/write access def watch_fd(fd, mode, callback, ctx): if mode == "read": rlist, _, _ = select.select([fd], [], [], 0) if rlist: print(">>> read", fd) tasks.append((now(), callback, ctx, 1)) # return if mode == "write": _, wlist, _ = select.select([], [fd], [], 0) if wlist: print(">>> write", fd) tasks.append((now(), callback, ctx, 4)) # return tasks.append((now(), watch_fd, fd, mode, callback, ctx)) # debug decorator def logcall(func): def deco(*args): print("[call]", func.__name__, *args) return func(*args) return deco # ========================= # libfibre callbacks impl # ========================= @logcall def _post(callback, ctx): tasks.append((now(), callback, ctx)) return 0 @logcall def _register_event(event_fd, events, callback, ctx): assert not events & 0xfffffffa if events & 1: tasks.append((now(), watch_fd, event_fd, "read", callback, ctx)) if events & 4: tasks.append((now(), watch_fd, event_fd, "write", callback, ctx)) return 0 @logcall def _deregister_event(event_fd): return 0 @logcall def _call_later(delay, callback, ctx): return 0 @logcall def _cancel_timer(timer_id): return 0 @logcall def _on_found_object(ctx, obj, intf): pass @logcall def _on_lost_object(ctx, obj): pass @logcall def _on_discovery_stopped(ctx, result): pass @logcall def _on_attribute_added(ctx, attr, name, name_length, subintf, subintf_name, subintf_name_length): pass @logcall def _on_attribute_removed(ctx, attr): pass @logcall def _on_function_added(ctx, func, name, name_length, input_names, input_codecs, output_names, output_codecs): pass @logcall def _on_function_removed(ctx, func): pass @logcall def _on_call_completed(ctx, status, tx_end, rx_end, tx_buf, tx_len, rx_buf, rx_len): pass c_post = PostSignature(_post) c_register_event = RegisterEventSignature(_register_event) c_deregister_event = DeregisterEventSignature(_deregister_event) c_call_later = CallLaterSignature(_call_later) c_cancel_timer = CancelTimerSignature(_cancel_timer) c_on_found_object = OnFoundObjectSignature(_on_found_object) c_on_lost_object = OnLostObjectSignature(_on_lost_object) c_on_discovery_stopped = OnStoppedSignature(_on_discovery_stopped) c_on_attribute_added = OnAttributeAddedSignature(_on_attribute_added) c_on_attribute_removed = OnAttributeRemovedSignature(_on_attribute_removed) c_on_function_added = OnFunctionAddedSignature(_on_function_added) c_on_function_removed = OnFunctionRemovedSignature(_on_function_removed) c_on_call_completed = OnCallCompletedSignature(_on_call_completed) event_loop = LibFibreEventLoop() event_loop.post = c_post event_loop.register_event = c_register_event event_loop.deregister_event = c_deregister_event event_loop.call_later = c_call_later event_loop.cancel_timer = c_cancel_timer # entrypoint ctx = c_void_p(libfibre_open(event_loop)) assert ctx spin() path = 'usb:idVendor=0x1209,idProduct=0x0D32,bInterfaceClass=0,bInterfaceSubClass=1,bInterfaceProtocol=0' buf = path.encode('ascii') domain_handle = libfibre_open_domain(ctx, buf, len(buf)) print(domain_handle) spin() discovery_handle = c_void_p(0) discovery_id = 0 libfibre_start_discovery(domain_handle, byref(discovery_handle), c_on_found_object, c_on_lost_object, c_on_discovery_stopped, discovery_id) # "event loop" while True: spin() time.sleep(0.001)