Created
November 8, 2024 11:54
-
-
Save fordnox/a3825f710b87e644771a4d98a48441fc to your computer and use it in GitHub Desktop.
Revisions
-
fordnox created this gist
Nov 8, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,50 @@ import asyncio import signal async def hourly_task(): try: while True: print("Running hourly task") await asyncio.sleep(3) # Sleep for 1 hour except asyncio.CancelledError: print("Hourly task canceled") # Perform any cleanup if needed async def half_hourly_task(): try: while True: print("Running half-hourly task") await asyncio.sleep(1) # Sleep for 30 minutes except asyncio.CancelledError: print("Half-hourly task canceled") # Perform any cleanup if needed def shutdown(loop, tasks): print("Received SIGTERM. Cancelling tasks...") for task in tasks: task.cancel() # Cancel each task loop.stop() # Stop the loop after tasks are canceled # Run the tasks with asyncio loop = asyncio.new_event_loop() try: # Create tasks only once here task1 = loop.create_task(hourly_task()) task2 = loop.create_task(half_hourly_task()) tasks = [task1, task2] # Set up SIGTERM handler to cancel tasks loop.add_signal_handler(signal.SIGTERM, shutdown, loop, tasks) loop.add_signal_handler(signal.SIGINT, shutdown, loop, tasks) # Run the loop until the tasks are done or canceled loop.run_forever() for t in [t for t in tasks if not (t.done() or t.cancelled())]: # give canceled tasks the last chance to run loop.run_until_complete(t) except asyncio.CancelledError: print("Tasks were cancelled") finally: loop.close()