Created
December 3, 2017 10:31
-
-
Save kxxoling/eddab4f126ba6ea36f76b4872c98b5e2 to your computer and use it in GitHub Desktop.
Revisions
-
kxxoling created this gist
Dec 3, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,108 @@ import datetime import heapq import types import time class Task: """Represent how long a coroutine should wait before starting again. Comparison operators are implemented for use by heapq. Two-item tuples unfortunately don't work because when the datetime.datetime instances are equal, comparison falls to the coroutine and they don't implement comparison methods, triggering an exception. Think of this as being like asyncio.Task/curio.Task. """ def __init__(self, wait_until, coro): self.coro = coro self.waiting_until = wait_until def __eq__(self, other): return self.waiting_until == other.waiting_until def __lt__(self, other): return self.waiting_until < other.waiting_until class SleepingLoop: """An event loop focused on delaying execution of coroutines. Think of this as being like asyncio.BaseEventLoop/curio.Kernel. """ def __init__(self, *coros): self._new = coros self._waiting = [] def run_until_complete(self): # Start all the coroutines. for coro in self._new: wait_for = coro.send(None) heapq.heappush(self._waiting, Task(wait_for, coro)) # Keep running until there is no more work to do. while self._waiting: now = datetime.datetime.now() # Get the coroutine with the soonest resumption time. task = heapq.heappop(self._waiting) if now < task.waiting_until: # We're ahead of schedule; wait until it's time to resume. delta = task.waiting_until - now time.sleep(delta.total_seconds()) now = datetime.datetime.now() try: # It's time to resume the coroutine. wait_until = task.coro.send(now) heapq.heappush(self._waiting, Task(wait_until, task.coro)) except StopIteration: # The coroutine is done. pass @types.coroutine def sleep(seconds): """Pause a coroutine for the specified number of seconds. Think of this as being like asyncio.sleep()/curio.sleep(). """ now = datetime.datetime.now() wait_until = now + datetime.timedelta(seconds=seconds) # Make all coroutines on the call stack pause; the need to use `yield` # necessitates this be generator-based and not an async-based coroutine. actual = yield wait_until # Resume the execution stack, sending back how long we actually waited. return actual - now async def countdown(label, length, *, delay=0): """Countdown a launch for `length` seconds, waiting `delay` seconds. This is what a user would typically write. """ print(label, 'waiting', delay, 'seconds before starting countdown') delta = await sleep(delay) print(label, 'starting after waiting', delta) while length: print(label, 'T-minus', length) waited = await sleep(1) length -= 1 print(label, 'lift-off!') def main(): """Start the event loop, counting down 3 separate launches. This is what a user would typically write. """ loop = SleepingLoop(countdown('A', 5), countdown('B', 3, delay=2), countdown('C', 4, delay=1)) start = datetime.datetime.now() loop.run_until_complete() print('Total elapsed time is', datetime.datetime.now() - start) if __name__ == '__main__': main()