Skip to content

Instantly share code, notes, and snippets.

@cjerdonek
Created June 27, 2017 00:55
Show Gist options
  • Save cjerdonek/858e1467f768ee045849ea81ddb47901 to your computer and use it in GitHub Desktop.
Save cjerdonek/858e1467f768ee045849ea81ddb47901 to your computer and use it in GitHub Desktop.

Revisions

  1. cjerdonek created this gist Jun 27, 2017.
    85 changes: 85 additions & 0 deletions test-rw.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,85 @@
    import asyncio
    import random


    NO_READERS_EVENT = asyncio.Event()
    NO_WRITERS_EVENT = asyncio.Event()
    WRITE_LOCK = asyncio.Lock()


    class State:
    reader_count = 0
    mock_file_data = 'initial'


    async def read_file():
    data = State.mock_file_data
    print(f'read: {data}')


    async def write_file(data):
    print(f'writing: {data}')
    State.mock_file_data = data
    await asyncio.sleep(0.5)


    async def write(data):
    async with WRITE_LOCK:
    NO_WRITERS_EVENT.clear()
    # Wait for the readers to finish.
    await NO_READERS_EVENT.wait()
    # Do the file write.
    await write_file(data)

    # Awaken waiting readers.
    NO_WRITERS_EVENT.set()


    async def read():
    while True:
    await NO_WRITERS_EVENT.wait()
    # Check the writer_lock again in case a new writer has
    # started writing.
    if WRITE_LOCK.locked():
    print(f'cannot read: still writing: {State.mock_file_data!r}')
    else:
    # Otherwise, we can do the read.
    break

    State.reader_count += 1
    if State.reader_count == 1:
    NO_READERS_EVENT.clear()
    # Do the file read.
    await read_file()
    State.reader_count -= 1
    if State.reader_count == 0:
    # Awaken any waiting writer.
    NO_READERS_EVENT.set()


    async def delayed(coro):
    await asyncio.sleep(random.random())
    await coro


    async def test_synchronization():
    NO_READERS_EVENT.set()
    NO_WRITERS_EVENT.set()

    coros = [
    read(),
    read(),
    read(),
    read(),
    read(),
    read(),
    write('apple'),
    write('banana'),
    ]
    # Add a delay before each coroutine for variety.
    coros = [delayed(coro) for coro in coros]
    await asyncio.gather(*coros)


    if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(test_synchronization())