Skip to content

Instantly share code, notes, and snippets.

@satyajeetkrjha
Forked from dabeaz/aproducer.py
Created October 14, 2023 02:56
Show Gist options
  • Save satyajeetkrjha/b892ba5379db6d71d1161a061c005ac1 to your computer and use it in GitHub Desktop.
Save satyajeetkrjha/b892ba5379db6d71d1161a061c005ac1 to your computer and use it in GitHub Desktop.

Revisions

  1. @dabeaz dabeaz created this gist Oct 17, 2019.
    89 changes: 89 additions & 0 deletions aproducer.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,89 @@
    # aproducer.py
    #
    # Async Producer-consumer problem.
    # Challenge: How to implement the same functionality, but no threads.

    import time
    from collections import deque
    import heapq

    class Scheduler:
    def __init__(self):
    self.ready = deque() # Functions ready to execute
    self.sleeping = [] # Sleeping functions
    self.sequence = 0

    def call_soon(self, func):
    self.ready.append(func)

    def call_later(self, delay, func):
    self.sequence += 1
    deadline = time.time() + delay # Expiration time
    # Priority queue
    heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
    while self.ready or self.sleeping:
    if not self.ready:
    # Find the nearest deadline
    deadline, _, func = heapq.heappop(self.sleeping)
    delta = deadline - time.time()
    if delta > 0:
    time.sleep(delta)
    self.ready.append(func)

    while self.ready:
    func = self.ready.popleft()
    func()

    sched = Scheduler() # Behind scenes scheduler object

    # -----

    class AsyncQueue:
    def __init__(self):
    self.items = deque()
    self.waiting = deque() # All getters waiting for data

    def put(self, item):
    self.items.append(item)
    if self.waiting:
    func = self.waiting.popleft()
    # Do we call it right away? No. Schedule it to be called.
    sched.call_soon(func)

    def get(self, callback):
    # Wait until an item is available. Then return it
    if self.items:
    callback(self.items.popleft())
    else:
    self.waiting.append(lambda: self.get(callback))

    def producer(q, count):
    def _run(n):
    if n < count:
    print('Producing', n)
    q.put(n)
    sched.call_later(1, lambda: _run(n+1))
    else:
    print('Producer done')
    q.put(None)
    _run(0)

    def consumer(q):
    def _consume(item):
    if item is None:
    print('Consumer done')
    else:
    print('Consuming', item)
    sched.call_soon(lambda: consumer(q))
    q.get(callback=_consume)

    q = AsyncQueue()
    sched.call_soon(lambda: producer(q, 10))
    sched.call_soon(lambda: consumer(q,))
    sched.run()




    119 changes: 119 additions & 0 deletions aproducer_error.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,119 @@
    # aproducer_error.py
    #
    # Example of returning results + errors in callback based code.

    import time
    from collections import deque
    import heapq

    class Scheduler:
    def __init__(self):
    self.ready = deque() # Functions ready to execute
    self.sleeping = [] # Sleeping functions
    self.sequence = 0

    def call_soon(self, func):
    self.ready.append(func)

    def call_later(self, delay, func):
    self.sequence += 1
    deadline = time.time() + delay # Expiration time
    # Priority queue
    heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
    while self.ready or self.sleeping:
    if not self.ready:
    # Find the nearest deadline
    deadline, _, func = heapq.heappop(self.sleeping)
    delta = deadline - time.time()
    if delta > 0:
    time.sleep(delta)
    self.ready.append(func)

    while self.ready:
    func = self.ready.popleft()
    func()

    sched = Scheduler() # Behind scenes scheduler object

    # -----

    # Class used to communicate both a normal value or an exception
    class Result:
    def __init__(self, value=None, exc=None):
    self.value = value
    self.exc = exc

    def result(self):
    if self.exc:
    raise self.exc
    else:
    return self.value

    class AsyncQueue:
    def __init__(self):
    self.items = deque()
    self.waiting = deque() # All getters waiting for data
    self._closed = False # Can queue be used anymore?

    def close(self):
    self._closed = True
    if self.waiting and not self.items:
    for func in self.waiting:
    sched.call_soon(func)

    def put(self, item):
    if self._closed:
    raise QueueClosed()

    self.items.append(item)
    if self.waiting:
    func = self.waiting.popleft()
    # Do we call it right away?
    sched.call_soon(func)

    def get(self, callback):
    # Wait until an item is available. Then return it
    # Question: How does a closed queue interact with get()
    if self.items:
    callback(Result(value=self.items.popleft())) # Good result
    else:
    # No items available (must wait)
    if self._closed:
    callback(Result(exc=QueueClosed())) # Error result
    else:
    self.waiting.append(lambda: self.get(callback))

    class QueueClosed(Exception):
    pass

    def producer(q, count):
    def _run(n):
    if n < count:
    print('Producing', n)
    q.put(n)
    sched.call_later(1, lambda: _run(n+1))
    else:
    print('Producer done')
    q.close() # Means no more items will be produced
    _run(0)

    def consumer(q):
    def _consume(result):
    try:
    item = result.result()
    print('Consuming', item)
    sched.call_soon(lambda: consumer(q))
    except QueueClosed:
    print("Consumer done")
    q.get(callback=_consume)

    q = AsyncQueue()
    sched.call_soon(lambda: producer(q, 10))
    sched.call_soon(lambda: consumer(q,))
    sched.run()




    55 changes: 55 additions & 0 deletions asynco.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,55 @@
    # asynco.py
    #
    # A basic asynchronous scheduler with support for time

    import time
    from collections import deque
    import heapq

    class Scheduler:
    def __init__(self):
    self.ready = deque() # Functions ready to execute
    self.sleeping = [] # Sleeping functions
    self.sequence = 0 # Used to break ties in priority queue

    def call_soon(self, func):
    self.ready.append(func)

    def call_later(self, delay, func):
    self.sequence += 1
    deadline = time.time() + delay # Expiration time
    heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
    while self.ready or self.sleeping:
    if not self.ready:
    # Find the nearest deadline
    deadline, _, func = heapq.heappop(self.sleeping)
    delta = deadline - time.time()
    if delta > 0:
    time.sleep(delta)
    self.ready.append(func)

    while self.ready:
    func = self.ready.popleft()
    func()

    sched = Scheduler() # Behind scenes scheduler object

    def countdown(n):
    if n > 0:
    print('Down', n)
    # time.sleep(4)
    sched.call_later(4, lambda: countdown(n-1))

    def countup(stop):
    def _run(x):
    if x < stop:
    print('Up', x)
    # time.sleep(1)
    sched.call_later(1, lambda: _run(x+1))
    _run(0)

    sched.call_soon(lambda: countdown(5))
    sched.call_soon(lambda: countup(20))
    sched.run()
    136 changes: 136 additions & 0 deletions coro_callback.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,136 @@
    # coro_callback.py
    #
    # An example of how to implement coroutine based concurrency layered
    # on top of a callback-based scheduler.

    import time
    from collections import deque
    import heapq

    # Callback based scheduler (from earlier)
    class Scheduler:
    def __init__(self):
    self.ready = deque() # Functions ready to execute
    self.sleeping = [] # Sleeping functions
    self.sequence = 0

    def call_soon(self, func):
    self.ready.append(func)

    def call_later(self, delay, func):
    self.sequence += 1
    deadline = time.time() + delay # Expiration time
    # Priority queue
    heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
    while self.ready or self.sleeping:
    if not self.ready:
    # Find the nearest deadline
    deadline, _, func = heapq.heappop(self.sleeping)
    delta = deadline - time.time()
    if delta > 0:
    time.sleep(delta)
    self.ready.append(func)

    while self.ready:
    func = self.ready.popleft()
    func()

    # Coroutine-based functions
    def new_task(self, coro):
    self.ready.append(Task(coro)) # Wrapped coroutine

    async def sleep(self, delay):
    self.call_later(delay, self.current)
    self.current = None
    await switch() # Switch to a new task

    # Class that wraps a coroutine--making it look like a callback
    class Task:
    def __init__(self, coro):
    self.coro = coro # "Wrapped coroutine"

    # Make it look like a callback
    def __call__(self):
    try:
    # Driving the coroutine as before
    sched.current = self
    self.coro.send(None)
    if sched.current:
    sched.ready.append(self)
    except StopIteration:
    pass

    class Awaitable:
    def __await__(self):
    yield

    def switch():
    return Awaitable()

    sched = Scheduler() # Background scheduler object

    # ----------------

    class AsyncQueue:
    def __init__(self):
    self.items = deque()
    self.waiting = deque()

    async def put(self, item):
    self.items.append(item)
    if self.waiting:
    sched.ready.append(self.waiting.popleft())

    async def get(self):
    if not self.items:
    self.waiting.append(sched.current) # Put myself to sleep
    sched.current = None # "Disappear"
    await switch() # Switch to another task
    return self.items.popleft()

    # Coroutine-based tasks
    async def producer(q, count):
    for n in range(count):
    print('Producing', n)
    await q.put(n)
    await sched.sleep(1)

    print('Producer done')
    await q.put(None) # "Sentinel" to shut down

    async def consumer(q):
    while True:
    item = await q.get()
    if item is None:
    break
    print('Consuming', item)
    print('Consumer done')

    q = AsyncQueue()
    sched.new_task(producer(q, 10))
    sched.new_task(consumer(q))

    # Call-back based tasks
    def countdown(n):
    if n > 0:
    print('Down', n)
    # time.sleep(4) # Blocking call (nothing else can run)
    sched.call_later(4, lambda: countdown(n-1))

    def countup(stop):
    def _run(x):
    if x < stop:
    print('Up', x)
    # time.sleep(1)
    sched.call_later(1, lambda: _run(x+1))
    _run(0)

    sched.call_soon(lambda: countdown(5))
    sched.call_soon(lambda: countup(20))
    sched.run()




    30 changes: 30 additions & 0 deletions example.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    # Build Your Own Async
    #
    # David Beazley (@dabeaz)
    # https://www.dabeaz.com
    #
    # Originally presented at PyCon India, Chennai, October 14, 2019

    import time

    def countdown(n):
    while n > 0:
    print('Down', n)
    time.sleep(1)
    n -= 1

    def countup(stop):
    x = 0
    while x < stop:
    print('Up', x)
    time.sleep(1)
    x += 1

    # Example of sequential execution
    countdown(5)
    countup(5)

    # Example of concurrent execution (via threads)
    import threading
    threading.Thread(target=countdown, args=(5,)).start()
    threading.Thread(target=countup, args=(5,)).start()
    146 changes: 146 additions & 0 deletions io_scheduler.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,146 @@
    # io_scheduler.py
    #
    # An example of implementing I/O operations in the scheduler

    import time
    from collections import deque
    import heapq
    from select import select

    # Callback based scheduler (from earlier)
    class Scheduler:
    def __init__(self):
    self.ready = deque() # Functions ready to execute
    self.sleeping = [] # Sleeping functions
    self.sequence = 0
    self._read_waiting = { }
    self._write_waiting = { }

    def call_soon(self, func):
    self.ready.append(func)

    def call_later(self, delay, func):
    self.sequence += 1
    deadline = time.time() + delay # Expiration time
    # Priority queue
    heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def read_wait(self, fileno, func):
    # Trigger func() when fileno is readable
    self._read_waiting[fileno] = func

    def write_wait(self, fileno, func):
    # Trigger func() when fileno is writeable
    self._write_waiting[fileno] = func

    def run(self):
    while (self.ready or self.sleeping or self._read_waiting or self._write_waiting):
    if not self.ready:
    # Find the nearest deadline
    if self.sleeping:
    deadline, _, func = self.sleeping[0]
    timeout = deadline - time.time()
    if timeout < 0:
    timeout = 0
    else:
    timeout = None # Wait forever

    # Wait for I/O (and sleep)
    can_read, can_write, _ = select(self._read_waiting,
    self._write_waiting, [], timeout)

    for fd in can_read:
    self.ready.append(self._read_waiting.pop(fd))
    for fd in can_write:
    self.ready.append(self._write_waiting.pop(fd))

    # Check for sleeping tasks
    now = time.time()
    while self.sleeping:
    if now > self.sleeping[0][0]:
    self.ready.append(heapq.heappop(self.sleeping)[2])
    else:
    break

    while self.ready:
    func = self.ready.popleft()
    func()

    def new_task(self, coro):
    self.ready.append(Task(coro)) # Wrapped coroutine

    async def sleep(self, delay):
    self.call_later(delay, self.current)
    self.current = None
    await switch() # Switch to a new task

    async def recv(self, sock, maxbytes):
    self.read_wait(sock, self.current)
    self.current = None
    await switch()
    return sock.recv(maxbytes)

    async def send(self, sock, data):
    self.write_wait(sock, self.current)
    self.current = None
    await switch()
    return sock.send(data)

    async def accept(self, sock):
    self.read_wait(sock, self.current)
    self.current = None
    await switch()
    return sock.accept()

    class Task:
    def __init__(self, coro):
    self.coro = coro # "Wrapped coroutine"

    # Make it look like a callback
    def __call__(self):
    try:
    # Driving the coroutine as before
    sched.current = self
    self.coro.send(None)
    if sched.current:
    sched.ready.append(self)
    except StopIteration:
    pass


    class Awaitable:
    def __await__(self):
    yield

    def switch():
    return Awaitable()

    sched = Scheduler() # Background scheduler object

    # ----------------

    from socket import *
    async def tcp_server(addr):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(1)
    while True:
    client, addr = await sched.accept(sock)
    print('Connection from', addr)
    sched.new_task(echo_handler(client))

    async def echo_handler(sock):
    while True:
    data = await sched.recv(sock, 10000)
    if not data:
    break
    await sched.send(sock, b'Got:' + data)
    print('Connection closed')
    sock.close()

    sched.new_task(tcp_server(('', 30000)))
    sched.run()




    30 changes: 30 additions & 0 deletions producer.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    # producer.py
    #
    # Producer-consumer problem with threads

    import queue
    import threading
    import time

    def producer(q, count):
    for n in range(count):
    print('Producing', n)
    q.put(n)
    time.sleep(1)

    print('Producer done')
    q.put(None) # "Sentinel" to shut down

    def consumer(q):
    while True:
    item = q.get()
    if item is None:
    break
    print('Consuming', item)
    print('Consumer done')

    q = queue.Queue() # Thread-safe queue
    threading.Thread(target=producer, args=(q, 10)).start()
    threading.Thread(target=consumer, args=(q,)).start()


    74 changes: 74 additions & 0 deletions yieldo.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,74 @@
    # yieldo.py
    #
    # Example of a coroutine-based scheduler

    import time
    from collections import deque
    import heapq

    # Plumbing that makes the "await" statement work. We provide
    # a single function "switch" that is used by the schedule to
    # switch tasks.

    class Awaitable:
    def __await__(self):
    yield

    def switch():
    return Awaitable()

    class Scheduler:
    def __init__(self):
    self.ready = deque()
    self.sleeping = [ ]
    self.current = None # Currently executing generator
    self.sequence = 0

    async def sleep(self, delay):
    deadline = time.time() + delay
    self.sequence += 1
    heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
    self.current = None # "Disappear"
    await switch() # Switch tasks

    def new_task(self, coro):
    self.ready.append(coro)

    def run(self):
    while self.ready or self.sleeping:
    if not self.ready:
    deadline, _, coro = heapq.heappop(self.sleeping)
    delta = deadline - time.time()
    if delta > 0:
    time.sleep(delta)
    self.ready.append(coro)

    self.current = self.ready.popleft()
    # Drive as a generator
    try:
    self.current.send(None) # Send to a coroutine
    if self.current:
    self.ready.append(self.current)
    except StopIteration:
    pass

    sched = Scheduler() # Background scheduler object

    # ---- Example code

    async def countdown(n):
    while n > 0:
    print('Down', n)
    await sched.sleep(4)
    n -= 1

    async def countup(stop):
    x = 0
    while x < stop:
    print('Up', x)
    await sched.sleep(1)
    x += 1

    sched.new_task(countdown(5))
    sched.new_task(countup(20))
    sched.run()
    96 changes: 96 additions & 0 deletions yproducer.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,96 @@
    # yproducer.py
    #
    # Coroutine producer-consumer problem. With async-await

    import time
    from collections import deque
    import heapq

    class Awaitable:
    def __await__(self):
    yield

    def switch():
    return Awaitable()

    class Scheduler:
    def __init__(self):
    self.ready = deque()
    self.sleeping = [ ]
    self.current = None # Currently executing generator
    self.sequence = 0

    async def sleep(self, delay):
    deadline = time.time() + delay
    self.sequence += 1
    heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
    self.current = None # "Disappear"
    await switch() # Switch tasks

    def new_task(self, coro):
    self.ready.append(coro)

    def run(self):
    while self.ready or self.sleeping:
    if not self.ready:
    deadline, _, coro = heapq.heappop(self.sleeping)
    delta = deadline - time.time()
    if delta > 0:
    time.sleep(delta)
    self.ready.append(coro)

    self.current = self.ready.popleft()
    # Drive as a generator
    try:
    self.current.send(None) # Send to a coroutine
    if self.current:
    self.ready.append(self.current)
    except StopIteration:
    pass

    sched = Scheduler() # Background scheduler object

    # ----------------

    class AsyncQueue:
    def __init__(self):
    self.items = deque()
    self.waiting = deque()

    async def put(self, item):
    self.items.append(item)
    if self.waiting:
    sched.ready.append(self.waiting.popleft())

    async def get(self):
    if not self.items:
    self.waiting.append(sched.current) # Put myself to sleep
    sched.current = None # "Disappear"
    await switch() # Switch to another task
    return self.items.popleft()

    async def producer(q, count):
    for n in range(count):
    print('Producing', n)
    await q.put(n)
    await sched.sleep(1)

    print('Producer done')
    await q.put(None) # "Sentinel" to shut down

    async def consumer(q):
    while True:
    item = await q.get()
    if item is None:
    break
    print('Consuming', item)
    print('Consumer done')

    q = AsyncQueue()
    sched.new_task(producer(q, 10))
    sched.new_task(consumer(q))
    sched.run()




    111 changes: 111 additions & 0 deletions yproducer_error.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,111 @@
    # yproducer_error.py
    #
    # Example of error handling with coroutines

    import time
    from collections import deque
    import heapq

    class Awaitable:
    def __await__(self):
    yield

    def switch():
    return Awaitable()

    class Scheduler:
    def __init__(self):
    self.ready = deque()
    self.sleeping = [ ]
    self.current = None # Currently executing generator
    self.sequence = 0

    async def sleep(self, delay):
    deadline = time.time() + delay
    self.sequence += 1
    heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
    self.current = None # "Disappear"
    await switch() # Switch tasks

    def new_task(self, coro):
    self.ready.append(coro)

    def run(self):
    while self.ready or self.sleeping:
    if not self.ready:
    deadline, _, coro = heapq.heappop(self.sleeping)
    delta = deadline - time.time()
    if delta > 0:
    time.sleep(delta)
    self.ready.append(coro)

    self.current = self.ready.popleft()
    # Drive as a generator
    try:
    self.current.send(None) # Send to a coroutine
    if self.current:
    self.ready.append(self.current)
    except StopIteration:
    pass

    sched = Scheduler() # Background scheduler object

    # ----------------

    class QueueClosed(Exception):
    pass

    class AsyncQueue:
    def __init__(self):
    self.items = deque()
    self.waiting = deque()
    self._closed = False

    def close(self):
    self._closed = True
    if self.waiting and not self.items:
    sched.ready.append(self.waiting.popleft()) # Reschedule waiting tasks

    async def put(self, item):
    if self._closed:
    raise QueueClosed()

    self.items.append(item)
    if self.waiting:
    sched.ready.append(self.waiting.popleft())

    async def get(self):
    while not self.items:
    if self._closed:
    raise QueueClosed()
    self.waiting.append(sched.current) # Put myself to sleep
    sched.current = None # "Disappear"
    await switch() # Switch to another task

    return self.items.popleft()

    async def producer(q, count):
    for n in range(count):
    print('Producing', n)
    await q.put(n)
    await sched.sleep(1)

    print('Producer done')
    q.close()

    async def consumer(q):
    try:
    while True:
    item = await q.get()
    print('Consuming', item)
    except QueueClosed:
    print('Consumer done')

    q = AsyncQueue()
    sched.new_task(producer(q, 10))
    sched.new_task(consumer(q))
    sched.run()