Created
June 27, 2017 00:55
-
-
Save cjerdonek/858e1467f768ee045849ea81ddb47901 to your computer and use it in GitHub Desktop.
Revisions
-
cjerdonek created this gist
Jun 27, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,85 @@ import asyncio import random NO_READERS_EVENT = asyncio.Event() NO_WRITERS_EVENT = asyncio.Event() WRITE_LOCK = asyncio.Lock() class State: reader_count = 0 mock_file_data = 'initial' async def read_file(): data = State.mock_file_data print(f'read: {data}') async def write_file(data): print(f'writing: {data}') State.mock_file_data = data await asyncio.sleep(0.5) async def write(data): async with WRITE_LOCK: NO_WRITERS_EVENT.clear() # Wait for the readers to finish. await NO_READERS_EVENT.wait() # Do the file write. await write_file(data) # Awaken waiting readers. NO_WRITERS_EVENT.set() async def read(): while True: await NO_WRITERS_EVENT.wait() # Check the writer_lock again in case a new writer has # started writing. if WRITE_LOCK.locked(): print(f'cannot read: still writing: {State.mock_file_data!r}') else: # Otherwise, we can do the read. break State.reader_count += 1 if State.reader_count == 1: NO_READERS_EVENT.clear() # Do the file read. await read_file() State.reader_count -= 1 if State.reader_count == 0: # Awaken any waiting writer. NO_READERS_EVENT.set() async def delayed(coro): await asyncio.sleep(random.random()) await coro async def test_synchronization(): NO_READERS_EVENT.set() NO_WRITERS_EVENT.set() coros = [ read(), read(), read(), read(), read(), read(), write('apple'), write('banana'), ] # Add a delay before each coroutine for variety. coros = [delayed(coro) for coro in coros] await asyncio.gather(*coros) if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(test_synchronization())