Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save Curiouspaul1/ad0e3f55d00ed191f5f7636b17d49e5f to your computer and use it in GitHub Desktop.
Save Curiouspaul1/ad0e3f55d00ed191f5f7636b17d49e5f to your computer and use it in GitHub Desktop.

Revisions

  1. @amitripshtos amitripshtos created this gist Aug 22, 2018.
    117 changes: 117 additions & 0 deletions alternative-scheduler-for-celery.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,117 @@
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.executors.pool import ThreadPoolExecutor
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.job import Job
    import json
    import logging
    from apscheduler.triggers.cron import CronTrigger
    import time
    from celery import Celery
    from typing import List

    # You must install APScheduler and celery in order to use that code
    # In addition, you must override get_tasks, get_celery_application, update_periodic_tasks_from_database methods

    logger = logging.getLogger(__name__)


    class PeriodicTask:
    def __init__(self, name: str, task: str, args: list, kwargs: dict, cron: CronTrigger) -> None:
    self.name = name
    self.task = task
    self.args = args
    self.kwargs = kwargs
    self.cron = cron


    def get_tasks() -> List[PeriodicTask]:
    """
    Adjust this method to get list of periodic tasks from a file/database/whatever
    :return:
    """
    return [
    PeriodicTask(name='test task', task='tasks.test_task', args=[], kwargs={}, cron=CronTrigger(minute=1))
    ]


    def get_celery_application() -> Celery:
    """
    Adjust this method to get the Celery object in your project.
    :return:
    """
    pass


    def update_periodic_tasks_from_database(scheduler: BackgroundScheduler) -> None:
    """
    Adjust this method to update changed tasks from file/database/whatever using scheduler.get_job to get the job, and then you can update it as you want.
    :param scheduler:
    :return:
    """
    pass


    class CeleryPoolExecutor(ThreadPoolExecutor):
    """
    A threaded pool executor that will dispatch celery tasks instead running the tasks itself.
    """
    def _do_submit_job(self, job: Job, run_times: int) -> None:
    try:
    logger.info('About to start task {} (Known next run time: {})'.format(job.name, job.next_run_time))
    try:
    get_celery_application().send_task('api.tasks.{task_name}'.format(task_name=job.name), args=job.args, kwargs=job.kwargs)
    except Exception as e:
    logger.warning('Could not send task through celery. Exception: {}'.format(repr(e)))
    self._run_job_success(job.id, [])
    except Exception as e:
    logger.error('Failed to start task {} by scheduler. Exception was: {}'.format(job.name, repr(e)))
    self._run_job_error(job.id, e, [])


    def dummy_func(*args, **kwargs) -> None:
    """
    A dummy task required for APScheduler as a mandatory field.
    We are using APScheduler to only schedule celery tasks, and not as a job runner,
    therefore we will provide this dummy function instead a real one.
    :param args:
    :param kwargs:
    :return:
    """
    pass


    def run():
    scheduler = BackgroundScheduler()
    scheduler.add_jobstore(MemoryJobStore(), "default")
    scheduler.add_executor(CeleryPoolExecutor(), 'default')

    scheduler.remove_all_jobs()

    # Populate scheduler tasks by the database
    for periodic_task in get_tasks():
    scheduler.add_job(
    func=dummy_func,
    trigger=CronTrigger(**json.loads(periodic_task.cron)),
    args=json.loads(periodic_task.args) if periodic_task.args else None,
    kwargs=json.loads(periodic_task.kwargs) if periodic_task.kwargs else None,
    name=periodic_task.task,
    coalesce=True,
    id=periodic_task.name,
    )

    logger.info('Scheduler started')
    try:
    scheduler.start()
    while True:
    update_periodic_tasks_from_database(scheduler)
    time.sleep(5)
    except Exception as e:
    logger.info('Shutting down scheduler')
    scheduler.shutdown()


    if __name__ == "__main__":
    run()