Last active
April 24, 2018 15:56
-
-
Save rask/1ec45c0dca341cfdeafc21d95409cf67 to your computer and use it in GitHub Desktop.
Revisions
-
rask revised this gist
Apr 24, 2018 . 1 changed file with 30 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,30 @@ #!/usr/bin/env python import asyncio from keyed_pool import KeyedPool async def produce(pool): await pool.put('foo', 'bar') await pool.put('hello', 'world') await asyncio.sleep(4) await pool.put('foo', 'baz') async def consume(pool): await asyncio.sleep(2) fooval = await pool.get('foo') helloval = await pool.get('hello') await asyncio.sleep(3) newfooval = await pool.get('foo') print('{} {} {}'.format(fooval, helloval, newfooval)) if __name__ == "__main__": loop = asyncio.get_event_loop() pool = KeyedPool(loop) tasks = asyncio.gather(produce(pool), consume(pool)) loop.run_until_complete(tasks) -
rask created this gist
Apr 24, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,136 @@ import asyncio import collections class KeyCollision(KeyError): pass class KeyedPool(): """ A pool from which items can be awaited by key. References to asyncio.Queue are prominent in this one. """ def __init__(self, loop: asyncio.AbstractEventLoop): """ Inits. """ self._max_size = 1024 self._loop = loop self._pool = {} self._getters = collections.deque() self._putters = collections.deque() def _do_next(self, waiters): """ Trigger an action for a collection of waiting Futures. """ while waiters: waiter = waiters.popleft() if not waiter.done(): waiter.set_result(None) break def has_key(self, key: str) -> bool: """ Check if a key is defined for this pool. """ return key in self._pool.keys() def has_items(self) -> int: """ Check if the pool has any items. """ return bool(self._pool.keys()) def is_empty(self): """ Check if the pool is empty. """ return not self.has_items() def is_full(self): """ Is the pool full? """ return self._max_size <= len(self._pool) async def put(self, key: str, item): """ Put a new item into the pool. """ while self.is_full(): putter = self._loop.create_future() self._putters.append(putter) try: await putter except: putter.cancel() # Just in case putter is not done yet. try: # Clean self._putters from canceled putters. self._putters.remove(putter) except ValueError: # The putter could be removed from self._putters by a # previous get_nowait call. pass if not self.is_full() and not putter.cancelled(): # We were woken up by get_nowait(), but can't take # the call. Wake up the next in line. self._do_next(self._putters) raise if self.has_key(key): raise KeyCollision() self._put(key, item) def _put(self, key: str, item): """ Actually put. """ self._pool[key] = item self._do_next(self._getters) async def get(self, key: str): """ Get an item from the pool by key. """ while self.is_empty() or not self.has_key(key): getter = self._loop.create_future() self._getters.append(getter) try: await getter except: getter.cancel() # Just in case getter is not done yet. try: # Clean self._getters from canceled getters. self._getters.remove(getter) except ValueError: # The getter could be removed from self._getters by a # previous put_nowait call. pass if not self.is_empty() and not getter.cancelled(): # We were woken up by put_nowait(), but can't take # the call. Wake up the next in line. self._do_next(self._getters) raise return self._get(key) def _get(self, key: str): """ Actually get. """ p = self._pool item = p.get(key) del p[key] self._pool = p self._do_next(self._putters) return item