Last active
August 29, 2015 14:08
-
-
Save lost-theory/2e94ae20643ac32a6773 to your computer and use it in GitHub Desktop.
Revisions
-
lost-theory revised this gist
Nov 7, 2014 . 1 changed file with 3 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -15,6 +15,9 @@ class CustomRQScheduler(Scheduler): expiration of results by yourself. See GitHub issue for details: https://github.com/ui/rq-scheduler/pull/43 This works with rq-scheduler==0.5.0. Be careful about running with a version other than that. ''' def enqueue_job(self, job): """ -
lost-theory created this gist
Nov 7, 2014 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,50 @@ from datetime import datetime import logging logging.basicConfig(level=logging.DEBUG) from redis import Redis from rq_scheduler import Scheduler from rq_scheduler.utils import to_unix class CustomRQScheduler(Scheduler): ''' Same as the core rq_scheduler.Scheduler, but with the enqueue_job method overridden to enqueue new jobs for each run instead of re-using the same job ID. This fixes race conditions around overlapping jobs and allows you to manage expiration of results by yourself. See GitHub issue for details: https://github.com/ui/rq-scheduler/pull/43 ''' def enqueue_job(self, job): """ Move a scheduled job to a queue. In addition, it also does puts the job back into the scheduler if needed. """ self.log.debug('Pushing {0} to {1}'.format(job.id, job.origin)) interval = job.meta.get('interval', None) repeat = job.meta.get('repeat', None) # If job is a repeated job, decrement counter if repeat: job.meta['repeat'] = int(repeat) - 1 job.enqueued_at = datetime.utcnow() job.save() queue = self.get_queue_for_job(job) queue.enqueue_call(job.func, job.args, job.kwargs, job.timeout, job.result_ttl) self.connection.zrem(self.scheduled_jobs_key, job.id) if interval: # If this is a repeat job and counter has reached 0, don't repeat if repeat is not None: if job.meta['repeat'] == 0: return self.connection._zadd(self.scheduled_jobs_key, to_unix(datetime.utcnow()) + int(interval), job.id) if __name__ == "__main__": s = CustomRQScheduler(connection=Redis(), interval=5) s.run()