from datetime import datetime
import logging

from redis import Redis
from rq_scheduler import Scheduler
from rq_scheduler.utils import to_unix

logging.basicConfig(level=logging.DEBUG)


class CustomRQScheduler(Scheduler):
    '''
    Same as the core rq_scheduler.Scheduler, but with the enqueue_job
    method overridden to enqueue a new job for each run instead of
    re-using the same job ID.

    This fixes race conditions around overlapping jobs and allows you to
    manage expiration of results yourself. See the GitHub issue for
    details: https://github.com/ui/rq-scheduler/pull/43

    This works with rq-scheduler==0.5.0. Be careful about running with a
    version other than that (it relies on that version's internal
    ``scheduled_jobs_key`` sorted set and the private ``_zadd`` helper).
    '''

    def enqueue_job(self, job):
        """
        Move a scheduled job to a queue.

        In addition, it also puts the job back into the scheduler if it
        is an interval/repeat job that still has runs remaining.
        """
        self.log.debug('Pushing {0} to {1}'.format(job.id, job.origin))

        interval = job.meta.get('interval', None)
        repeat = job.meta.get('repeat', None)

        # If job is a repeated job, decrement counter
        if repeat:
            job.meta['repeat'] = int(repeat) - 1
        job.enqueued_at = datetime.utcnow()
        job.save()

        # Enqueue a *fresh* job (new ID) rather than re-enqueueing the
        # scheduled job itself -- this is the whole point of the override.
        queue = self.get_queue_for_job(job)
        queue.enqueue_call(job.func, job.args, job.kwargs,
                           job.timeout, job.result_ttl)

        # Remove this run from the scheduler's sorted set of pending jobs.
        self.connection.zrem(self.scheduled_jobs_key, job.id)

        if interval:
            # If this is a repeat job and the counter has reached 0,
            # don't schedule another run.
            if repeat is not None and job.meta['repeat'] == 0:
                return
            # Re-schedule the next run `interval` seconds from now.
            # NOTE: _zadd is a private rq-scheduler 0.5.0 helper; naive
            # utcnow() matches what to_unix expects in that version.
            self.connection._zadd(self.scheduled_jobs_key,
                                  to_unix(datetime.utcnow()) + int(interval),
                                  job.id)


if __name__ == "__main__":
    s = CustomRQScheduler(connection=Redis(), interval=5)
    s.run()