import json
import logging
import time
from typing import Any, List, Optional, Union

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.job import Job
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from celery import Celery

# APScheduler and Celery must be installed in order to use this code.
# In addition, you must override the get_tasks, get_celery_application and
# update_periodic_tasks_from_database functions.

logger = logging.getLogger(__name__)


class PeriodicTask:
    """
    Description of one periodic task to be registered with the scheduler.

    ``args``, ``kwargs`` and ``cron`` may be given either as native objects
    (list, dict, CronTrigger) or as JSON-encoded strings, e.g. when they are
    read straight from a database column; ``run`` handles both forms.
    """

    def __init__(
        self,
        name: str,
        task: str,
        args: Union[list, str, None],
        kwargs: Union[dict, str, None],
        cron: Union[CronTrigger, dict, str],
    ) -> None:
        self.name = name      # used as the APScheduler job id
        self.task = task      # used as the APScheduler job name
        self.args = args
        self.kwargs = kwargs
        self.cron = cron


def get_tasks() -> List[PeriodicTask]:
    """
    Adjust this function to get the list of periodic tasks from a
    file/database/whatever.

    :return: the periodic tasks to register with the scheduler
    """
    return [
        PeriodicTask(name='test task', task='tasks.test_task', args=[], kwargs={},
                     cron=CronTrigger(minute=1))
    ]


def get_celery_application() -> Celery:
    """
    Adjust this function to return the Celery object of your project.

    The placeholder implementation returns None, so until it is overridden
    every dispatch attempt in CeleryPoolExecutor fails and is logged as a
    warning.

    :return: the Celery application used to send tasks
    """
    pass


def update_periodic_tasks_from_database(scheduler: BackgroundScheduler) -> None:
    """
    Adjust this function to update changed tasks from a file/database/whatever,
    using scheduler.get_job to get the job, and then update it as you want.

    :param scheduler: the running scheduler whose jobs should be refreshed
    :return:
    """
    pass


class CeleryPoolExecutor(ThreadPoolExecutor):
    """
    A threaded pool executor that dispatches Celery tasks instead of running
    the jobs itself.
    """

    def _do_submit_job(self, job: Job, run_times: list) -> None:
        """
        Send the job to Celery and report the outcome back to APScheduler.

        A failure to send the task is only logged as a warning and the job is
        still reported as successful (best effort); any other failure is
        reported through ``_run_job_error``.

        :param job: the APScheduler job that is due
        :param run_times: the run times supplied by the scheduler (unused)
        :return:
        """
        try:
            logger.info('About to start task %s (Known next run time: %s)',
                        job.name, job.next_run_time)
            try:
                # NOTE(review): the task name is always prefixed with
                # 'api.tasks.'; with the sample get_tasks() entry this yields
                # 'api.tasks.tasks.test_task' - confirm this matches the names
                # your Celery workers register.
                get_celery_application().send_task(
                    'api.tasks.{task_name}'.format(task_name=job.name),
                    args=job.args,
                    kwargs=job.kwargs,
                )
            except Exception as e:
                logger.warning('Could not send task through celery. Exception: %r', e)
            self._run_job_success(job.id, [])
        except Exception as e:
            logger.error('Failed to start task %s by scheduler. Exception was: %r',
                         job.name, e)
            # Pass the real traceback: APScheduler forwards this value as part
            # of ``exc_info`` to its logger, and a list is not a valid traceback.
            self._run_job_error(job.id, e, e.__traceback__)


def dummy_func(*args, **kwargs) -> None:
    """
    A dummy task required by APScheduler as a mandatory field.

    We are using APScheduler only to schedule Celery tasks, and not as a job
    runner, therefore we provide this dummy function instead of a real one.

    :param args: ignored
    :param kwargs: ignored
    :return:
    """
    pass


def _decode_json_field(value: Any) -> Optional[Any]:
    """
    Return ``value`` decoded from JSON when it is a string/bytes, unchanged
    when it is already a native object, and None when it is empty.
    """
    if not value:
        return None
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


def _build_trigger(cron: Any) -> CronTrigger:
    """
    Build a CronTrigger from a CronTrigger instance, a dict of CronTrigger
    keyword arguments, or a JSON string encoding such a dict.

    :raises ValueError: if ``cron`` does not describe any cron fields
    """
    if isinstance(cron, CronTrigger):
        return cron
    fields = _decode_json_field(cron)
    if not isinstance(fields, dict):
        # Refuse an empty/invalid definition rather than silently building a
        # trigger out of defaults.
        raise ValueError('Invalid cron definition: {!r}'.format(cron))
    return CronTrigger(**fields)


def run(poll_interval: float = 5) -> None:
    """
    Start the scheduler, register all periodic tasks and keep refreshing them
    until the process is interrupted or an error occurs.

    :param poll_interval: seconds to sleep between two calls to
        update_periodic_tasks_from_database
    :return:
    """
    scheduler = BackgroundScheduler()
    scheduler.add_jobstore(MemoryJobStore(), "default")
    scheduler.add_executor(CeleryPoolExecutor(), 'default')
    scheduler.remove_all_jobs()

    # Populate scheduler tasks by the database
    for periodic_task in get_tasks():
        scheduler.add_job(
            func=dummy_func,
            trigger=_build_trigger(periodic_task.cron),
            args=_decode_json_field(periodic_task.args),
            kwargs=_decode_json_field(periodic_task.kwargs),
            name=periodic_task.task,
            coalesce=True,
            id=periodic_task.name,
        )

    try:
        scheduler.start()
        logger.info('Scheduler started')
        while True:
            update_periodic_tasks_from_database(scheduler)
            time.sleep(poll_interval)
    except (KeyboardInterrupt, SystemExit):
        # Normal way of stopping the process; fall through to the shutdown.
        pass
    except Exception:
        logger.exception('Scheduler loop failed')
    finally:
        logger.info('Shutting down scheduler')
        # shutdown() raises if the scheduler never started successfully.
        if scheduler.running:
            scheduler.shutdown()


if __name__ == "__main__":
    run()