Skip to content

Instantly share code, notes, and snippets.

@kxxoling
Created December 3, 2017 10:31
Show Gist options
  • Select an option

  • Save kxxoling/eddab4f126ba6ea36f76b4872c98b5e2 to your computer and use it in GitHub Desktop.

Select an option

Save kxxoling/eddab4f126ba6ea36f76b4872c98b5e2 to your computer and use it in GitHub Desktop.

Revisions

  1. kxxoling created this gist Dec 3, 2017.
    108 changes: 108 additions & 0 deletions asyncloop.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,108 @@
    import datetime
    import heapq
    import types
    import time


    class Task:

    """Represent how long a coroutine should wait before starting again.
    Comparison operators are implemented for use by heapq. Two-item
    tuples unfortunately don't work because when the datetime.datetime
    instances are equal, comparison falls to the coroutine and they don't
    implement comparison methods, triggering an exception.
    Think of this as being like asyncio.Task/curio.Task.
    """

    def __init__(self, wait_until, coro):
    self.coro = coro
    self.waiting_until = wait_until

    def __eq__(self, other):
    return self.waiting_until == other.waiting_until

    def __lt__(self, other):
    return self.waiting_until < other.waiting_until


    class SleepingLoop:

    """An event loop focused on delaying execution of coroutines.
    Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
    """

    def __init__(self, *coros):
    self._new = coros
    self._waiting = []

    def run_until_complete(self):
    # Start all the coroutines.
    for coro in self._new:
    wait_for = coro.send(None)
    heapq.heappush(self._waiting, Task(wait_for, coro))
    # Keep running until there is no more work to do.
    while self._waiting:
    now = datetime.datetime.now()
    # Get the coroutine with the soonest resumption time.
    task = heapq.heappop(self._waiting)
    if now < task.waiting_until:
    # We're ahead of schedule; wait until it's time to resume.
    delta = task.waiting_until - now
    time.sleep(delta.total_seconds())
    now = datetime.datetime.now()
    try:
    # It's time to resume the coroutine.
    wait_until = task.coro.send(now)
    heapq.heappush(self._waiting, Task(wait_until, task.coro))
    except StopIteration:
    # The coroutine is done.
    pass


    @types.coroutine
    def sleep(seconds):
    """Pause a coroutine for the specified number of seconds.
    Think of this as being like asyncio.sleep()/curio.sleep().
    """
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Make all coroutines on the call stack pause; the need to use `yield`
    # necessitates this be generator-based and not an async-based coroutine.
    actual = yield wait_until
    # Resume the execution stack, sending back how long we actually waited.
    return actual - now


    async def countdown(label, length, *, delay=0):
    """Countdown a launch for `length` seconds, waiting `delay` seconds.
    This is what a user would typically write.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
    print(label, 'T-minus', length)
    waited = await sleep(1)
    length -= 1
    print(label, 'lift-off!')


    def main():
    """Start the event loop, counting down 3 separate launches.
    This is what a user would typically write.
    """
    loop = SleepingLoop(countdown('A', 5), countdown('B', 3, delay=2),
    countdown('C', 4, delay=1))
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)


    if __name__ == '__main__':
    main()