"""I/O throughput test for asyncio.Queue.

A producer coroutine puts QFILL copies of TICK into a queue bounded at
QSIZE items while a consumer coroutine takes them out again; the total
elapsed time and the items-per-second rate are printed at the end.

Sample runs (timings depend on the machine):

QSIZE = 300000
QFILL = 300000
FORCE_SWITCH = False

    enqueue(): Put 100000 items.
    enqueue(): Put 200000 items.
    enqueue(): Put 300000 items.
    dequeue(): Get 100000 items.
    dequeue(): Get 200000 items.
    dequeue(): Get 300000 items.
    asyncio.Queue put() and get() 300000 items.
    0.67 seconds, 448386.61 items per second.

QSIZE = 100000
QFILL = 300000
FORCE_SWITCH = False

    enqueue(): Put 100000 items.
    dequeue(): Get 100000 items.
    enqueue(): Put 200000 items.
    dequeue(): Get 200000 items.
    enqueue(): Put 300000 items.
    dequeue(): Get 300000 items.
    asyncio.Queue put() and get() 300000 items.
    0.68 seconds, 442956.25 items per second.

QSIZE = 300000
QFILL = 300000
FORCE_SWITCH = True

    enqueue(): Put 100000 items.
    dequeue(): Get 100000 items.
    enqueue(): Put 200000 items.
    dequeue(): Get 200000 items.
    enqueue(): Put 300000 items.
    dequeue(): Get 300000 items.
    asyncio.Queue put() and get() 300000 items.
    0.68 seconds, 440712.40 items per second.
"""

import asyncio
import time

# Maximum number of items the queue may hold (asyncio.Queue maxsize).
QSIZE = 300000
# Total number of items pushed through the queue in one run.
QFILL = 300000
# When True, each coroutine yields to the event loop after every item.
FORCE_SWITCH = False
# A progress line is printed every OUTPUT_INTERVAL items.
OUTPUT_INTERVAL = 100000

# Payload template; every enqueued item is a fresh copy of this dict.
TICK = {
    'security_id': '2330.TW',
    'open': 599.05,
    'close': 600.15,
    'high': 612.05,
    'low': 593.75,
    '5d': 595.25,
    '10d': 590.85,
    '20d': 588.65,
    '60d': 550.50,
    '120d': 510.45,
    '240d': 450.15,
}


async def enqueue():
    """Put QFILL copies of TICK into the module-level queue ``q``.

    ``q`` is created by main(); this coroutine only reads the name, so no
    ``global`` declaration is needed.
    """
    for count in range(1, QFILL + 1):
        # put() suspends this coroutine only while the queue is full.
        await q.put(TICK.copy())
        if count % OUTPUT_INTERVAL == 0:
            print('enqueue(): Put %d items.' % count)
        if FORCE_SWITCH:
            # sleep(0) hands control back to the event loop so the other
            # coroutine gets a turn after every single item.
            await asyncio.sleep(0)


async def dequeue():
    """Take QFILL items out of the module-level queue ``q`` and discard them."""
    for count in range(1, QFILL + 1):
        # get() suspends this coroutine only while the queue is empty.
        await q.get()
        if count % OUTPUT_INTERVAL == 0:
            print('dequeue(): Get %d items.' % count)
        if FORCE_SWITCH:
            await asyncio.sleep(0)


async def main():
    """Run the consumer and producer concurrently and report the throughput."""
    global q
    q = asyncio.Queue(maxsize=QSIZE)

    # perf_counter() is monotonic and high-resolution; time.time() can be
    # coarse or jump, which distorts (or zeroes) short measurements.
    begin = time.perf_counter()
    await asyncio.gather(dequeue(), enqueue())
    elapsed = time.perf_counter() - begin

    # Guard the division in case a tiny run measures as zero elapsed time.
    rate = QFILL / elapsed if elapsed > 0 else float('inf')
    print('asyncio.Queue put() and get() %d items.' % QFILL)
    print('%.2f seconds, %.2f items per second.' % (elapsed, rate))


if __name__ == '__main__':
    asyncio.run(main())