"""Experiments in tagging an async stream with "schedule" / "timeout" events.

Three alternative watchdog pipes (`timeout`, `timeout2`, `timeout3`) turn a
source stream into a stream of reason strings: "schedule" whenever the source
produced an item, "timeout" whenever it stayed silent for `duration` seconds.
"""

import asyncio
import json
from datetime import datetime

# NOTE: aiohttp and jq are not referenced by the code below; the imports are
# kept because other tooling or later edits may rely on them.
import aiohttp
import jq
from aiostream import stream as aiozip
from pipe import Pipe


async def schedule():
    """Yield ``None`` forever, pausing 1, 1, 20, 1, 1 seconds in a cycle.

    Each yielded value is the result of ``asyncio.sleep(interval)``, i.e.
    ``None``; only the timing of the items is meaningful.
    """
    while True:
        for interval in (1, 1, 20, 1, 1):
            yield await asyncio.sleep(interval)


@Pipe
async def timeout(stream, duration):
    """Watchdog built on a shared flag and ``aiostream`` merge.

    A timer generator wakes every `duration` seconds and emits "timeout" if
    no source item cleared the flag since its previous wake-up; every source
    item emits "schedule".

    Args:
        stream: async iterable to watch.
        duration: timer period in seconds.

    Yields:
        "schedule" for each source item, "timeout" for each idle timer tick.
    """
    idle = False
    exhausted = False

    async def timer():
        nonlocal idle
        # Stop ticking once the source is finished; otherwise the merged
        # stream would keep emitting "timeout" forever after exhaustion.
        while not exhausted:
            if idle:
                yield "timeout"
            idle = True
            await asyncio.sleep(duration)

    async def process():
        nonlocal idle, exhausted
        try:
            async for _ in stream:
                idle = False
                yield "schedule"
        finally:
            exhausted = True

    combine = aiozip.merge(timer(), process())
    async with combine.stream() as streamer:
        async for item in streamer:
            yield item


@Pipe
async def timeout2(stream, duration):
    """Watchdog that waits on the next source item with a timeout.

    The pending ``anext`` call is kept alive across timeouts. Wrapping it in
    ``asyncio.wait_for`` would cancel it on every timeout, which finalises an
    async-generator source and makes the following ``anext`` fail.

    Args:
        stream: async iterator to watch.
        duration: maximum silence in seconds before "timeout" is emitted.

    Yields:
        "schedule" for each source item, "timeout" for each idle period.
    """
    pending = None
    try:
        while True:
            if pending is None:
                # ensure_future accepts any awaitable, including the object
                # returned by anext() on an async generator.
                pending = asyncio.ensure_future(anext(stream))
            done, _ = await asyncio.wait({pending}, timeout=duration)
            if not done:
                yield "timeout"
                continue
            finished, pending = pending, None
            try:
                finished.result()
            except StopAsyncIteration:
                # Source exhausted: end this generator normally instead of
                # letting StopAsyncIteration turn into a RuntimeError.
                return
            yield "schedule"
    finally:
        # Do not leave a dangling request on the source if we are closed
        # while still waiting for the next item.
        if pending is not None:
            pending.cancel()


@Pipe
async def timeout3(stream, duration):
    """Watchdog that drains the source into a queue from a background task.

    Args:
        stream: async iterable to watch.
        duration: maximum silence in seconds before "timeout" is emitted.

    Yields:
        "schedule" for each source item, "timeout" for each idle period.

    Raises:
        Whatever the source raised, re-raised at the next idle check.
    """
    queue = asyncio.Queue(1)
    end = object()  # unique marker: the source finished normally

    async def drain(aiter):
        async for item in aiter:
            await queue.put(item)
        await queue.put(end)

    task = asyncio.create_task(drain(stream))
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), duration)
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is only an alias of the builtin
                # TimeoutError from Python 3.11 on; name it explicitly.
                if task.done():
                    # Surfaces an exception from the drain task, if any.
                    task.result()
                yield "timeout"
                continue
            if item is end:
                break
            yield "schedule"
    finally:
        task.cancel()


@Pipe
async def date(stream):
    """Pair every incoming reason with the current local timestamp.

    Yields:
        ``[datetime, reason]`` lists.
    """
    async for reason in stream:
        now = datetime.now()
        yield [now, reason]


@Pipe
async def display(stream):
    """Print every item as one JSON line.

    Values that JSON cannot encode (such as ``datetime``) are rendered with
    ``str``. This function has no ``yield``, so piping into it produces a
    coroutine, which is what ``asyncio.run`` receives below.
    """
    async for item in stream:
        print(json.dumps(item, default=str))


# Runs at module level, as in the original script; with the endless
# `schedule` source this does not return.
asyncio.run(schedule() | timeout(1) | date | display)