-
-
Save satyajeetkrjha/b892ba5379db6d71d1161a061c005ac1 to your computer and use it in GitHub Desktop.
Revisions
-
dabeaz created this gist
Oct 17, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,89 @@ # aproducer.py # # Async Producer-consumer problem. # Challenge: How to implement the same functionality, but no threads. import time from collections import deque import heapq class Scheduler: def __init__(self): self.ready = deque() # Functions ready to execute self.sleeping = [] # Sleeping functions self.sequence = 0 def call_soon(self, func): self.ready.append(func) def call_later(self, delay, func): self.sequence += 1 deadline = time.time() + delay # Expiration time # Priority queue heapq.heappush(self.sleeping, (deadline, self.sequence, func)) def run(self): while self.ready or self.sleeping: if not self.ready: # Find the nearest deadline deadline, _, func = heapq.heappop(self.sleeping) delta = deadline - time.time() if delta > 0: time.sleep(delta) self.ready.append(func) while self.ready: func = self.ready.popleft() func() sched = Scheduler() # Behind scenes scheduler object # ----- class AsyncQueue: def __init__(self): self.items = deque() self.waiting = deque() # All getters waiting for data def put(self, item): self.items.append(item) if self.waiting: func = self.waiting.popleft() # Do we call it right away? No. Schedule it to be called. sched.call_soon(func) def get(self, callback): # Wait until an item is available. Then return it if self.items: callback(self.items.popleft()) else: self.waiting.append(lambda: self.get(callback)) def producer(q, count): def _run(n): if n < count: print('Producing', n) q.put(n) sched.call_later(1, lambda: _run(n+1)) else: print('Producer done') q.put(None) _run(0) def consumer(q): def _consume(item): if item is None: print('Consumer done') else: print('Consuming', item) sched.call_soon(lambda: consumer(q)) q.get(callback=_consume) q = AsyncQueue() sched.call_soon(lambda: producer(q, 10)) sched.call_soon(lambda: consumer(q,)) sched.run() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,119 @@ # aproducer_error.py # # Example of returning results + errors in callback based code. import time from collections import deque import heapq class Scheduler: def __init__(self): self.ready = deque() # Functions ready to execute self.sleeping = [] # Sleeping functions self.sequence = 0 def call_soon(self, func): self.ready.append(func) def call_later(self, delay, func): self.sequence += 1 deadline = time.time() + delay # Expiration time # Priority queue heapq.heappush(self.sleeping, (deadline, self.sequence, func)) def run(self): while self.ready or self.sleeping: if not self.ready: # Find the nearest deadline deadline, _, func = heapq.heappop(self.sleeping) delta = deadline - time.time() if delta > 0: time.sleep(delta) self.ready.append(func) while self.ready: func = self.ready.popleft() func() sched = Scheduler() # Behind scenes scheduler object # ----- # Class used to communicate both a normal value or an exception class Result: def __init__(self, value=None, exc=None): self.value = value self.exc = exc def result(self): if self.exc: raise self.exc else: return self.value class AsyncQueue: def __init__(self): self.items = deque() self.waiting = deque() # All getters waiting for data self._closed = False # Can queue be used anymore? def close(self): self._closed = True if self.waiting and not self.items: for func in self.waiting: sched.call_soon(func) def put(self, item): if self._closed: raise QueueClosed() self.items.append(item) if self.waiting: func = self.waiting.popleft() # Do we call it right away? sched.call_soon(func) def get(self, callback): # Wait until an item is available. Then return it # Question: How does a closed queue interact with get() if self.items: callback(Result(value=self.items.popleft())) # Good result else: # No items available (must wait) if self._closed: callback(Result(exc=QueueClosed())) # Error result else: self.waiting.append(lambda: self.get(callback)) class QueueClosed(Exception): pass def producer(q, count): def _run(n): if n < count: print('Producing', n) q.put(n) sched.call_later(1, lambda: _run(n+1)) else: print('Producer done') q.close() # Means no more items will be produced _run(0) def consumer(q): def _consume(result): try: item = result.result() print('Consuming', item) sched.call_soon(lambda: consumer(q)) except QueueClosed: print("Consumer done") q.get(callback=_consume) q = AsyncQueue() sched.call_soon(lambda: producer(q, 10)) sched.call_soon(lambda: consumer(q,)) sched.run() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,55 @@ # asynco.py # # A basic asynchronous scheduler with support for time import time from collections import deque import heapq class Scheduler: def __init__(self): self.ready = deque() # Functions ready to execute self.sleeping = [] # Sleeping functions self.sequence = 0 # Used to break ties in priority queue def call_soon(self, func): self.ready.append(func) def call_later(self, delay, func): self.sequence += 1 deadline = time.time() + delay # Expiration time heapq.heappush(self.sleeping, (deadline, self.sequence, func)) def run(self): while self.ready or self.sleeping: if not self.ready: # Find the nearest deadline deadline, _, func = heapq.heappop(self.sleeping) delta = deadline - time.time() if delta > 0: time.sleep(delta) self.ready.append(func) while self.ready: func = self.ready.popleft() func() sched = Scheduler() # Behind scenes scheduler object def countdown(n): if n > 0: print('Down', n) # time.sleep(4) sched.call_later(4, lambda: countdown(n-1)) def countup(stop): def _run(x): if x < stop: print('Up', x) # time.sleep(1) sched.call_later(1, lambda: _run(x+1)) _run(0) sched.call_soon(lambda: countdown(5)) sched.call_soon(lambda: countup(20)) sched.run() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,136 @@ # coro_callback.py # # An example of how to implement coroutine based concurrency layered # on top of a callback-based scheduler. import time from collections import deque import heapq # Callback based scheduler (from earlier) class Scheduler: def __init__(self): self.ready = deque() # Functions ready to execute self.sleeping = [] # Sleeping functions self.sequence = 0 def call_soon(self, func): self.ready.append(func) def call_later(self, delay, func): self.sequence += 1 deadline = time.time() + delay # Expiration time # Priority queue heapq.heappush(self.sleeping, (deadline, self.sequence, func)) def run(self): while self.ready or self.sleeping: if not self.ready: # Find the nearest deadline deadline, _, func = heapq.heappop(self.sleeping) delta = deadline - time.time() if delta > 0: time.sleep(delta) self.ready.append(func) while self.ready: func = self.ready.popleft() func() # Coroutine-based functions def new_task(self, coro): self.ready.append(Task(coro)) # Wrapped coroutine async def sleep(self, delay): self.call_later(delay, self.current) self.current = None await switch() # Switch to a new task # Class that wraps a coroutine--making it look like a callback class Task: def __init__(self, coro): self.coro = coro # "Wrapped coroutine" # Make it look like a callback def __call__(self): try: # Driving the coroutine as before sched.current = self self.coro.send(None) if sched.current: sched.ready.append(self) except StopIteration: pass class Awaitable: def __await__(self): yield def switch(): return Awaitable() sched = Scheduler() # Background scheduler object # ---------------- class AsyncQueue: def __init__(self): self.items = deque() self.waiting = deque() async def put(self, item): self.items.append(item) if self.waiting: sched.ready.append(self.waiting.popleft()) async def get(self): if not self.items: self.waiting.append(sched.current) # Put myself to sleep sched.current = None # "Disappear" await switch() # Switch to another task return self.items.popleft() # Coroutine-based tasks async def producer(q, count): for n in range(count): print('Producing', n) await q.put(n) await sched.sleep(1) print('Producer done') await q.put(None) # "Sentinel" to shut down async def consumer(q): while True: item = await q.get() if item is None: break print('Consuming', item) print('Consumer done') q = AsyncQueue() sched.new_task(producer(q, 10)) sched.new_task(consumer(q)) # Call-back based tasks def countdown(n): if n > 0: print('Down', n) # time.sleep(4) # Blocking call (nothing else can run) sched.call_later(4, lambda: countdown(n-1)) def countup(stop): def _run(x): if x < stop: print('Up', x) # time.sleep(1) sched.call_later(1, lambda: _run(x+1)) _run(0) sched.call_soon(lambda: countdown(5)) sched.call_soon(lambda: countup(20)) sched.run() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,30 @@ # Build Your Own Async # # David Beazley (@dabeaz) # https://www.dabeaz.com # # Originally presented at PyCon India, Chennai, October 14, 2019 import time def countdown(n): while n > 0: print('Down', n) time.sleep(1) n -= 1 def countup(stop): x = 0 while x < stop: print('Up', x) time.sleep(1) x += 1 # Example of sequential execution countdown(5) countup(5) # Example of concurrent execution (via threads) import threading threading.Thread(target=countdown, args=(5,)).start() threading.Thread(target=countup, args=(5,)).start() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,146 @@ # io_scheduler.py # # An example of implementing I/O operations in the scheduler import time from collections import deque import heapq from select import select # Callback based scheduler (from earlier) class Scheduler: def __init__(self): self.ready = deque() # Functions ready to execute self.sleeping = [] # Sleeping functions self.sequence = 0 self._read_waiting = { } self._write_waiting = { } def call_soon(self, func): self.ready.append(func) def call_later(self, delay, func): self.sequence += 1 deadline = time.time() + delay # Expiration time # Priority queue heapq.heappush(self.sleeping, (deadline, self.sequence, func)) def read_wait(self, fileno, func): # Trigger func() when fileno is readable self._read_waiting[fileno] = func def write_wait(self, fileno, func): # Trigger func() when fileno is writeable self._write_waiting[fileno] = func def run(self): while (self.ready or self.sleeping or self._read_waiting or self._write_waiting): if not self.ready: # Find the nearest deadline if self.sleeping: deadline, _, func = self.sleeping[0] timeout = deadline - time.time() if timeout < 0: timeout = 0 else: timeout = None # Wait forever # Wait for I/O (and sleep) can_read, can_write, _ = select(self._read_waiting, self._write_waiting, [], timeout) for fd in can_read: self.ready.append(self._read_waiting.pop(fd)) for fd in can_write: self.ready.append(self._write_waiting.pop(fd)) # Check for sleeping tasks now = time.time() while self.sleeping: if now > self.sleeping[0][0]: self.ready.append(heapq.heappop(self.sleeping)[2]) else: break while self.ready: func = self.ready.popleft() func() def new_task(self, coro): self.ready.append(Task(coro)) # Wrapped coroutine async def sleep(self, delay): self.call_later(delay, self.current) self.current = None await switch() # Switch to a new task async def recv(self, sock, maxbytes): self.read_wait(sock, self.current) self.current = None await switch() return sock.recv(maxbytes) async def send(self, sock, data): self.write_wait(sock, self.current) self.current = None await switch() return sock.send(data) async def accept(self, sock): self.read_wait(sock, self.current) self.current = None await switch() return sock.accept() class Task: def __init__(self, coro): self.coro = coro # "Wrapped coroutine" # Make it look like a callback def __call__(self): try: # Driving the coroutine as before sched.current = self self.coro.send(None) if sched.current: sched.ready.append(self) except StopIteration: pass class Awaitable: def __await__(self): yield def switch(): return Awaitable() sched = Scheduler() # Background scheduler object # ---------------- from socket import * async def tcp_server(addr): sock = socket(AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(1) while True: client, addr = await sched.accept(sock) print('Connection from', addr) sched.new_task(echo_handler(client)) async def echo_handler(sock): while True: data = await sched.recv(sock, 10000) if not data: break await sched.send(sock, b'Got:' + data) print('Connection closed') sock.close() sched.new_task(tcp_server(('', 30000))) sched.run() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,30 @@ # producer.py # # Producer-consumer problem with threads import queue import threading import time def producer(q, count): for n in range(count): print('Producing', n) q.put(n) time.sleep(1) print('Producer done') q.put(None) # "Sentinel" to shut down def consumer(q): while True: item = q.get() if item is None: break print('Consuming', item) print('Consumer done') q = queue.Queue() # Thread-safe queue threading.Thread(target=producer, args=(q, 10)).start() threading.Thread(target=consumer, args=(q,)).start() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,74 @@ # yieldo.py # # Example of a coroutine-based scheduler import time from collections import deque import heapq # Plumbing that makes the "await" statement work. We provide # a single function "switch" that is used by the schedule to # switch tasks. class Awaitable: def __await__(self): yield def switch(): return Awaitable() class Scheduler: def __init__(self): self.ready = deque() self.sleeping = [ ] self.current = None # Currently executing generator self.sequence = 0 async def sleep(self, delay): deadline = time.time() + delay self.sequence += 1 heapq.heappush(self.sleeping, (deadline, self.sequence, self.current)) self.current = None # "Disappear" await switch() # Switch tasks def new_task(self, coro): self.ready.append(coro) def run(self): while self.ready or self.sleeping: if not self.ready: deadline, _, coro = heapq.heappop(self.sleeping) delta = deadline - time.time() if delta > 0: time.sleep(delta) self.ready.append(coro) self.current = self.ready.popleft() # Drive as a generator try: self.current.send(None) # Send to a coroutine if self.current: self.ready.append(self.current) except StopIteration: pass sched = Scheduler() # Background scheduler object # ---- Example code async def countdown(n): while n > 0: print('Down', n) await sched.sleep(4) n -= 1 async def countup(stop): x = 0 while x < stop: print('Up', x) await sched.sleep(1) x += 1 sched.new_task(countdown(5)) sched.new_task(countup(20)) sched.run() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,96 @@ # yproducer.py # # Coroutine producer-consumer problem. With async-await import time from collections import deque import heapq class Awaitable: def __await__(self): yield def switch(): return Awaitable() class Scheduler: def __init__(self): self.ready = deque() self.sleeping = [ ] self.current = None # Currently executing generator self.sequence = 0 async def sleep(self, delay): deadline = time.time() + delay self.sequence += 1 heapq.heappush(self.sleeping, (deadline, self.sequence, self.current)) self.current = None # "Disappear" await switch() # Switch tasks def new_task(self, coro): self.ready.append(coro) def run(self): while self.ready or self.sleeping: if not self.ready: deadline, _, coro = heapq.heappop(self.sleeping) delta = deadline - time.time() if delta > 0: time.sleep(delta) self.ready.append(coro) self.current = self.ready.popleft() # Drive as a generator try: self.current.send(None) # Send to a coroutine if self.current: self.ready.append(self.current) except StopIteration: pass sched = Scheduler() # Background scheduler object # ---------------- class AsyncQueue: def __init__(self): self.items = deque() self.waiting = deque() async def put(self, item): self.items.append(item) if self.waiting: sched.ready.append(self.waiting.popleft()) async def get(self): if not self.items: self.waiting.append(sched.current) # Put myself to sleep sched.current = None # "Disappear" await switch() # Switch to another task return self.items.popleft() async def producer(q, count): for n in range(count): print('Producing', n) await q.put(n) await sched.sleep(1) print('Producer done') await q.put(None) # "Sentinel" to shut down async def consumer(q): while True: item = await q.get() if item is None: break print('Consuming', item) print('Consumer done') q = AsyncQueue() sched.new_task(producer(q, 10)) sched.new_task(consumer(q)) sched.run() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,111 @@ # yproducer_error.py # # Example of error handling with coroutines import time from collections import deque import heapq class Awaitable: def __await__(self): yield def switch(): return Awaitable() class Scheduler: def __init__(self): self.ready = deque() self.sleeping = [ ] self.current = None # Currently executing generator self.sequence = 0 async def sleep(self, delay): deadline = time.time() + delay self.sequence += 1 heapq.heappush(self.sleeping, (deadline, self.sequence, self.current)) self.current = None # "Disappear" await switch() # Switch tasks def new_task(self, coro): self.ready.append(coro) def run(self): while self.ready or self.sleeping: if not self.ready: deadline, _, coro = heapq.heappop(self.sleeping) delta = deadline - time.time() if delta > 0: time.sleep(delta) self.ready.append(coro) self.current = self.ready.popleft() # Drive as a generator try: self.current.send(None) # Send to a coroutine if self.current: self.ready.append(self.current) except StopIteration: pass sched = Scheduler() # Background scheduler object # ---------------- class QueueClosed(Exception): pass class AsyncQueue: def __init__(self): self.items = deque() self.waiting = deque() self._closed = False def close(self): self._closed = True if self.waiting and not self.items: sched.ready.append(self.waiting.popleft()) # Reschedule waiting tasks async def put(self, item): if self._closed: raise QueueClosed() self.items.append(item) if self.waiting: sched.ready.append(self.waiting.popleft()) async def get(self): while not self.items: if self._closed: raise QueueClosed() self.waiting.append(sched.current) # Put myself to sleep sched.current = None # "Disappear" await switch() # Switch to another task return self.items.popleft() async def producer(q, count): for n in range(count): print('Producing', n) await q.put(n) await sched.sleep(1) print('Producer done') q.close() async def consumer(q): try: while True: item = await q.get() print('Consuming', item) except QueueClosed: print('Consumer done') q = AsyncQueue() sched.new_task(producer(q, 10)) sched.new_task(consumer(q)) sched.run()