Skip to content

Instantly share code, notes, and snippets.

@hansthen
Last active May 25, 2025 19:14
Show Gist options
  • Select an option

  • Save hansthen/4844fb643bb742edc0868f9a0d49d527 to your computer and use it in GitHub Desktop.

Select an option

Save hansthen/4844fb643bb742edc0868f9a0d49d527 to your computer and use it in GitHub Desktop.

Revisions

  1. hansthen revised this gist May 25, 2025. No changes.
  2. hansthen revised this gist May 25, 2025. 1 changed file with 29 additions and 4 deletions.
    33 changes: 29 additions & 4 deletions timeout.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    import aiohttp
    mport aiohttp
    import asyncio
    from datetime import datetime
    from aiostream import stream as aiozip
    @@ -10,12 +10,12 @@

    async def schedule():
    while True:
    for interval in [1, 1, 5, 1, 1]:
    for interval in [1, 1, 20, 1, 1]:
    yield
    await asyncio.sleep(interval)

    @Pipe
    async def timeout(stream, interval):
    async def timeout(stream, duration):

    flag = False

    @@ -25,7 +25,7 @@ async def timer():
    if flag:
    yield "timeout"
    flag = True
    await asyncio.sleep(interval)
    await asyncio.sleep(duration)

    async def process():
    nonlocal flag
    @@ -38,6 +38,31 @@ async def process():
    async for item in streamer:
    yield item

    @Pipe
    async def timeout2(stream, duration):
    while True:
    try:
    item = await asyncio.wait_for(anext(stream), duration)
    except TimeoutError:
    yield "timeout"
    else:
    yield "schedule"

    @Pipe
    async def timeout3(stream, duration):
    queue = asyncio.Queue(1)
    async def drain(aiter):
    async for item in aiter:
    await queue.put(item)
    task = asyncio.create_task(drain(stream))

    while not task.done():
    try:
    await asyncio.wait_for(queue.get(), duration)
    yield "schedule"
    except TimeoutError:
    yield "timeout"

    @Pipe
    async def date(stream):
    async for reason in stream:
  3. hansthen created this gist May 25, 2025.
    53 changes: 53 additions & 0 deletions timeout.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,53 @@
    import aiohttp
    import asyncio
    from datetime import datetime
    from aiostream import stream as aiozip

    from pipe import Pipe
    import json
    import jq


    async def schedule():
    while True:
    for interval in [1, 1, 5, 1, 1]:
    yield
    await asyncio.sleep(interval)

    @Pipe
    async def timeout(stream, interval):

    flag = False

    async def timer():
    nonlocal flag
    while True:
    if flag:
    yield "timeout"
    flag = True
    await asyncio.sleep(interval)

    async def process():
    nonlocal flag
    async for _ in stream:
    flag = False
    yield "schedule"

    combine = aiozip.merge(timer(), process())
    async with combine.stream() as streamer:
    async for item in streamer:
    yield item

    @Pipe
    async def date(stream):
    async for reason in stream:
    data = datetime.now()
    yield [data, reason]

    @Pipe
    async def display(stream):
    async for item in stream:
    print(json.dumps(item, default=str))


    asyncio.run(schedule() | timeout(1) | date | display)