#!/usr/bin/env python3
"""Schedule a Ray actor method to be invoked periodically by another actor."""
import asyncio
import logging
import time

import ray


@ray.remote
class Worker:
    """Ray actor that keeps an in-memory counter and logs it on demand."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.counter = 0
        self.logger.warning("init")

    def cycle(self):
        """Log the current counter value, then increment it by one."""
        self.logger.warning("counter is %s", self.counter)
        self.counter += 1


@ray.remote
class Interval:
    """
    Ray actor that repeatedly calls a function from an asyncio task.

    based on https://phoolish-philomath.com/asynchronous-task-scheduling-in-python.html
    """

    def __init__(self):
        # Most recently scheduled task. Starts as None so that
        # cancel_scheduled_task is safe to call before anything is scheduled.
        self.task = None

    async def run_periodically(self, wait_time, func, *args):
        """
        Helper for schedule_task_periodically.

        Calls the given function indefinitely, sleeping between calls. The
        return value of func is discarded (it is not awaited).

        :param wait_time: seconds to wait between iterations of func
        :param func: the function that will be run
        :param args: any args that need to be provided to func
        :raises Exception: whatever func raises; the loop stops at that point
        """
        while True:
            try:
                func(*args)
            except Exception:
                # Nothing awaits this task while it is running, so without
                # this log a failing func would end the loop silently.
                logging.getLogger(__name__).exception(
                    "periodic call failed; stopping the loop")
                raise
            await asyncio.sleep(wait_time)

    def schedule_task_periodically(self, wait_time, func, *args):
        """
        Schedule a function to run periodically as an asyncio.Task.

        The created task is stored on ``self.task``; nothing is returned.
        If a task was scheduled earlier it is NOT cancelled here: it keeps
        running, but this actor no longer holds a reference to it.

        :param wait_time: interval (in seconds)
        :param func: the function that will be run
        :param args: any args needed to be provided to func
        """
        # NOTE(review): asyncio.create_task needs a running event loop in the
        # calling thread -- confirm Ray executes this sync method on the
        # actor's event loop for the Ray version in use.
        self.task = asyncio.create_task(
            self.run_periodically(wait_time, func, *args))

    async def cancel_scheduled_task(self, task=None):
        """
        Gracefully cancel the task stored on this actor, if any.

        Does nothing when no task has been scheduled (or it was already
        cancelled through this method).

        :param task: ignored; kept (now optional) for backward compatibility.
            The task that gets cancelled is always ``self.task``.
        """
        pending = self.task
        if pending is None:
            return
        # Drop the reference before awaiting so a task scheduled while we wait
        # is not overwritten afterwards.
        self.task = None
        pending.cancel()
        try:
            await pending
        except asyncio.CancelledError:
            pass


if __name__ == "__main__":
    ray.init(address="auto", _redis_password="5241590000000000")
    interval = Interval.options(lifetime="detached", name="interval").remote()
    worker = Worker.remote()
    # Wait for the scheduling call to finish so that a failure is raised here
    # instead of being lost when the driver exits.
    # NOTE(review): `worker` is not detached, so it presumably goes away when
    # this driver exits while the detached `interval` keeps running -- confirm
    # the intended lifetime of the worker.
    ray.get(interval.schedule_task_periodically.remote(2, worker.cycle.remote))