Skip to content

Instantly share code, notes, and snippets.

@fordnox
Created November 8, 2024 11:54
Show Gist options
  • Save fordnox/a3825f710b87e644771a4d98a48441fc to your computer and use it in GitHub Desktop.
Save fordnox/a3825f710b87e644771a4d98a48441fc to your computer and use it in GitHub Desktop.

Revisions

  1. fordnox created this gist Nov 8, 2024.
    50 changes: 50 additions & 0 deletions async_loop.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,50 @@
    import asyncio
    import signal

    async def hourly_task():
    try:
    while True:
    print("Running hourly task")
    await asyncio.sleep(3) # Sleep for 1 hour
    except asyncio.CancelledError:
    print("Hourly task canceled")
    # Perform any cleanup if needed

    async def half_hourly_task():
    try:
    while True:
    print("Running half-hourly task")
    await asyncio.sleep(1) # Sleep for 30 minutes
    except asyncio.CancelledError:
    print("Half-hourly task canceled")
    # Perform any cleanup if needed

    def shutdown(loop, tasks):
    print("Received SIGTERM. Cancelling tasks...")
    for task in tasks:
    task.cancel() # Cancel each task

    loop.stop() # Stop the loop after tasks are canceled

    # Run the tasks with asyncio
    loop = asyncio.new_event_loop()

    try:
    # Create tasks only once here
    task1 = loop.create_task(hourly_task())
    task2 = loop.create_task(half_hourly_task())
    tasks = [task1, task2]

    # Set up SIGTERM handler to cancel tasks
    loop.add_signal_handler(signal.SIGTERM, shutdown, loop, tasks)
    loop.add_signal_handler(signal.SIGINT, shutdown, loop, tasks)

    # Run the loop until the tasks are done or canceled
    loop.run_forever()
    for t in [t for t in tasks if not (t.done() or t.cancelled())]:
    # give canceled tasks the last chance to run
    loop.run_until_complete(t)
    except asyncio.CancelledError:
    print("Tasks were cancelled")
    finally:
    loop.close()