Skip to content

Instantly share code, notes, and snippets.

@lost-theory
Last active August 29, 2015 14:08
Show Gist options
  • Select an option

  • Save lost-theory/2e94ae20643ac32a6773 to your computer and use it in GitHub Desktop.

Select an option

Save lost-theory/2e94ae20643ac32a6773 to your computer and use it in GitHub Desktop.

Revisions

  1. lost-theory revised this gist Nov 7, 2014. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions rq_custom_scheduler.py
    Original file line number Diff line number Diff line change
    @@ -15,6 +15,9 @@ class CustomRQScheduler(Scheduler):
    expiration of results by yourself.
    See GitHub issue for details: https://github.com/ui/rq-scheduler/pull/43
    This works with rq-scheduler==0.5.0. Be careful about running with a version
    other than that.
    '''
    def enqueue_job(self, job):
    """
  2. lost-theory created this gist Nov 7, 2014.
    50 changes: 50 additions & 0 deletions rq_custom_scheduler.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,50 @@
    from datetime import datetime
    import logging

    logging.basicConfig(level=logging.DEBUG)

    from redis import Redis
    from rq_scheduler import Scheduler
    from rq_scheduler.utils import to_unix

    class CustomRQScheduler(Scheduler):
    '''
    Same as the core rq_scheduler.Scheduler, but with the enqueue_job method
    overridden to enqueue new jobs for each run instead of re-using the same job
    ID. This fixes race conditions around overlapping jobs and allows you to manage
    expiration of results by yourself.
    See GitHub issue for details: https://github.com/ui/rq-scheduler/pull/43
    '''
    def enqueue_job(self, job):
    """
    Move a scheduled job to a queue. In addition, it also does puts the job
    back into the scheduler if needed.
    """
    self.log.debug('Pushing {0} to {1}'.format(job.id, job.origin))

    interval = job.meta.get('interval', None)
    repeat = job.meta.get('repeat', None)

    # If job is a repeated job, decrement counter
    if repeat:
    job.meta['repeat'] = int(repeat) - 1
    job.enqueued_at = datetime.utcnow()
    job.save()

    queue = self.get_queue_for_job(job)
    queue.enqueue_call(job.func, job.args, job.kwargs, job.timeout, job.result_ttl)
    self.connection.zrem(self.scheduled_jobs_key, job.id)

    if interval:
    # If this is a repeat job and counter has reached 0, don't repeat
    if repeat is not None:
    if job.meta['repeat'] == 0:
    return
    self.connection._zadd(self.scheduled_jobs_key,
    to_unix(datetime.utcnow()) + int(interval),
    job.id)

    if __name__ == "__main__":
    s = CustomRQScheduler(connection=Redis(), interval=5)
    s.run()