Skip to content

Instantly share code, notes, and snippets.

@rask
Last active April 24, 2018 15:56
Show Gist options
  • Save rask/1ec45c0dca341cfdeafc21d95409cf67 to your computer and use it in GitHub Desktop.
Save rask/1ec45c0dca341cfdeafc21d95409cf67 to your computer and use it in GitHub Desktop.

Revisions

  1. rask revised this gist Apr 24, 2018. 1 changed file with 30 additions and 0 deletions.
    30 changes: 30 additions & 0 deletions example.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    #!/usr/bin/env python
    import asyncio
    from keyed_pool import KeyedPool


    async def produce(pool):
    await pool.put('foo', 'bar')
    await pool.put('hello', 'world')
    await asyncio.sleep(4)
    await pool.put('foo', 'baz')


    async def consume(pool):
    await asyncio.sleep(2)
    fooval = await pool.get('foo')
    helloval = await pool.get('hello')
    await asyncio.sleep(3)
    newfooval = await pool.get('foo')

    print('{} {} {}'.format(fooval, helloval, newfooval))


    if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    pool = KeyedPool(loop)

    tasks = asyncio.gather(produce(pool), consume(pool))

    loop.run_until_complete(tasks)
  2. rask created this gist Apr 24, 2018.
    136 changes: 136 additions & 0 deletions keyed_pool.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,136 @@
    import asyncio
    import collections


    class KeyCollision(KeyError):
    pass


    class KeyedPool():
    """
    A pool from which items can be awaited by key. References to asyncio.Queue
    are prominent in this one.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop):
    """
    Inits.
    """
    self._max_size = 1024
    self._loop = loop
    self._pool = {}
    self._getters = collections.deque()
    self._putters = collections.deque()

    def _do_next(self, waiters):
    """
    Trigger an action for a collection of waiting Futures.
    """
    while waiters:
    waiter = waiters.popleft()
    if not waiter.done():
    waiter.set_result(None)
    break

    def has_key(self, key: str) -> bool:
    """
    Check if a key is defined for this pool.
    """
    return key in self._pool.keys()

    def has_items(self) -> int:
    """
    Check if the pool has any items.
    """
    return bool(self._pool.keys())

    def is_empty(self):
    """
    Check if the pool is empty.
    """
    return not self.has_items()

    def is_full(self):
    """
    Is the pool full?
    """
    return self._max_size <= len(self._pool)

    async def put(self, key: str, item):
    """
    Put a new item into the pool.
    """
    while self.is_full():
    putter = self._loop.create_future()
    self._putters.append(putter)

    try:
    await putter
    except:
    putter.cancel() # Just in case putter is not done yet.
    try:
    # Clean self._putters from canceled putters.
    self._putters.remove(putter)
    except ValueError:
    # The putter could be removed from self._putters by a
    # previous get_nowait call.
    pass
    if not self.is_full() and not putter.cancelled():
    # We were woken up by get_nowait(), but can't take
    # the call. Wake up the next in line.
    self._do_next(self._putters)
    raise

    if self.has_key(key):
    raise KeyCollision()

    self._put(key, item)

    def _put(self, key: str, item):
    """
    Actually put.
    """
    self._pool[key] = item
    self._do_next(self._getters)

    async def get(self, key: str):
    """
    Get an item from the pool by key.
    """
    while self.is_empty() or not self.has_key(key):
    getter = self._loop.create_future()
    self._getters.append(getter)

    try:
    await getter
    except:
    getter.cancel() # Just in case getter is not done yet.
    try:
    # Clean self._getters from canceled getters.
    self._getters.remove(getter)
    except ValueError:
    # The getter could be removed from self._getters by a
    # previous put_nowait call.
    pass
    if not self.is_empty() and not getter.cancelled():
    # We were woken up by put_nowait(), but can't take
    # the call. Wake up the next in line.
    self._do_next(self._getters)
    raise

    return self._get(key)

    def _get(self, key: str):
    """
    Actually get.
    """
    p = self._pool

    item = p.get(key)
    del p[key]

    self._pool = p

    self._do_next(self._putters)

    return item