Skip to content

Instantly share code, notes, and snippets.

@AndBondStyle
Created January 13, 2024 22:44
Show Gist options
  • Select an option

  • Save AndBondStyle/76170571d28412a8ca2f8afbe06c48c1 to your computer and use it in GitHub Desktop.

Select an option

Save AndBondStyle/76170571d28412a8ca2f8afbe06c48c1 to your computer and use it in GitHub Desktop.

Revisions

  1. AndBondStyle created this gist Jan 13, 2024.
    148 changes: 148 additions & 0 deletions libfibre.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,148 @@
    from odrive.pyfibre.fibre.libfibre import *
    import select

    tasks = [] # time, callback, args...
    now = time.perf_counter

    # super stupid "event loop" implementation
    def spin():
    for _ in range(10):
    delete = []
    for i, task in enumerate(tasks.copy()):
    when, callback, *args = task
    if when <= now():
    # print("-->", callback)
    callback(*args)
    # print("<--")
    delete.append(i)
    for i in reversed(delete):
    del tasks[i]

    # "coroutine" that watches file descriptior for read/write access
    def watch_fd(fd, mode, callback, ctx):
    if mode == "read":
    rlist, _, _ = select.select([fd], [], [], 0)
    if rlist:
    print(">>> read", fd)
    tasks.append((now(), callback, ctx, 1))
    # return
    if mode == "write":
    _, wlist, _ = select.select([], [fd], [], 0)
    if wlist:
    print(">>> write", fd)
    tasks.append((now(), callback, ctx, 4))
    # return
    tasks.append((now(), watch_fd, fd, mode, callback, ctx))

    # debug decorator
    def logcall(func):
    def deco(*args):
    print("[call]", func.__name__, *args)
    return func(*args)
    return deco

    # =========================
    # libfibre callbacks impl
    # =========================

    @logcall
    def _post(callback, ctx):
    tasks.append((now(), callback, ctx))
    return 0

    @logcall
    def _register_event(event_fd, events, callback, ctx):
    assert not events & 0xfffffffa
    if events & 1:
    tasks.append((now(), watch_fd, event_fd, "read", callback, ctx))
    if events & 4:
    tasks.append((now(), watch_fd, event_fd, "write", callback, ctx))
    return 0

    @logcall
    def _deregister_event(event_fd):
    return 0

    @logcall
    def _call_later(delay, callback, ctx):
    return 0

    @logcall
    def _cancel_timer(timer_id):
    return 0

    @logcall
    def _on_found_object(ctx, obj, intf):
    pass

    @logcall
    def _on_lost_object(ctx, obj):
    pass

    @logcall
    def _on_discovery_stopped(ctx, result):
    pass

    @logcall
    def _on_attribute_added(ctx, attr, name, name_length, subintf, subintf_name, subintf_name_length):
    pass

    @logcall
    def _on_attribute_removed(ctx, attr):
    pass

    @logcall
    def _on_function_added(ctx, func, name, name_length, input_names, input_codecs, output_names, output_codecs):
    pass

    @logcall
    def _on_function_removed(ctx, func):
    pass

    @logcall
    def _on_call_completed(ctx, status, tx_end, rx_end, tx_buf, tx_len, rx_buf, rx_len):
    pass

    c_post = PostSignature(_post)
    c_register_event = RegisterEventSignature(_register_event)
    c_deregister_event = DeregisterEventSignature(_deregister_event)
    c_call_later = CallLaterSignature(_call_later)
    c_cancel_timer = CancelTimerSignature(_cancel_timer)
    c_on_found_object = OnFoundObjectSignature(_on_found_object)
    c_on_lost_object = OnLostObjectSignature(_on_lost_object)
    c_on_discovery_stopped = OnStoppedSignature(_on_discovery_stopped)
    c_on_attribute_added = OnAttributeAddedSignature(_on_attribute_added)
    c_on_attribute_removed = OnAttributeRemovedSignature(_on_attribute_removed)
    c_on_function_added = OnFunctionAddedSignature(_on_function_added)
    c_on_function_removed = OnFunctionRemovedSignature(_on_function_removed)
    c_on_call_completed = OnCallCompletedSignature(_on_call_completed)

    event_loop = LibFibreEventLoop()
    event_loop.post = c_post
    event_loop.register_event = c_register_event
    event_loop.deregister_event = c_deregister_event
    event_loop.call_later = c_call_later
    event_loop.cancel_timer = c_cancel_timer

    # entrypoint

    ctx = c_void_p(libfibre_open(event_loop))
    assert ctx

    spin()

    path = 'usb:idVendor=0x1209,idProduct=0x0D32,bInterfaceClass=0,bInterfaceSubClass=1,bInterfaceProtocol=0'
    buf = path.encode('ascii')
    domain_handle = libfibre_open_domain(ctx, buf, len(buf))
    print(domain_handle)

    spin()

    discovery_handle = c_void_p(0)
    discovery_id = 0
    libfibre_start_discovery(domain_handle, byref(discovery_handle), c_on_found_object, c_on_lost_object, c_on_discovery_stopped, discovery_id)

    # "event loop"
    while True:
    spin()
    time.sleep(0.001)