from os import urandom

from celery.beat import PersistentScheduler

# ! NOTE(review): 'some_place' looks like a placeholder -- confirm the real
# module that provides the Redis connection factory.
from some_place import create_readis_connection  # !


class SingletonScheduler(PersistentScheduler):
    """Persistent beat scheduler that only ticks while holding a Redis mutex.

    Each instance creates a ``RedisMutex`` named after its class. ``tick``
    does real work only when the mutex can be acquired; otherwise it just
    reports ``max_interval`` so the caller waits and tries again later.
    """

    def __init__(self, *args, **kwargs):
        super(SingletonScheduler, self).__init__(*args, **kwargs)
        self._mutex = RedisMutex(self.__class__.__name__,
                                 create_readis_connection())
        # Hold the lock for twice the longest tick interval, in milliseconds.
        self._mutex_ttl_ms = int(self.max_interval * 2 * 1000)

    def tick(self, *args, **kwargs):
        """Run one scheduler tick if this instance owns the mutex.

        Any arguments are forwarded unchanged to the parent ``tick``.

        Returns:
            ``self.max_interval`` when the mutex is held by someone else,
            otherwise whatever the parent ``tick`` returned.
        """
        if not self._mutex.acquire(self._mutex_ttl_ms):
            return self.max_interval

        new_tick_interval = super(SingletonScheduler, self).tick(*args, **kwargs)

        # A zero/None interval would yield a non-positive PX value, which
        # Redis rejects; keep the previous TTL in that case.
        ttl_ms = int(new_tick_interval * 2 * 1000) if new_tick_interval else 0
        if ttl_ms > 0:
            self._mutex_ttl_ms = ttl_ms
        self._mutex.update_ttl(self._mutex_ttl_ms)
        return new_tick_interval

    def close(self):
        """Close the scheduler and release the mutex.

        The mutex is released even when the parent ``close`` raises, so a
        failing shutdown does not leave the lock held until its TTL expires.
        """
        try:
            super(SingletonScheduler, self).close()
        finally:
            self._mutex.release()


class RedisMutex(object):
    """Distributed mutex with ttl.

    Ownership is represented by a Redis key called ``name`` whose value is a
    random token generated per instance. Only the instance whose token is
    stored under the key may release or refresh the mutex.

    Args:
        name: Redis key used for the mutex.
        redis: Redis client exposing ``get``, ``set`` and ``register_script``.
        token_length: Number of random bytes in the ownership token.
    """

    def __init__(self, name, redis, token_length=16):
        self.name = name
        self.redis = redis
        self._token = urandom(token_length)

        # snippet from http://redis.io/commands/set
        # Delete the key only if it still carries our token.
        self._delete_lock_cmd = self.redis.register_script("""
            if redis.call("get", KEYS[1]) == ARGV[1]
            then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
        """)

        # (Re)write our token with a new TTL, but only when the key is ours
        # or absent -- never overwrite a token held by another owner.
        # An empty ARGV[2] means "no expiry".
        self._refresh_lock_cmd = self.redis.register_script("""
            local current = redis.call("get", KEYS[1])
            if current == ARGV[1] or not current
            then
                if ARGV[2] == ""
                then
                    redis.call("set", KEYS[1], ARGV[1])
                else
                    redis.call("set", KEYS[1], ARGV[1], "PX", ARGV[2])
                end
                return 1
            else
                return 0
            end
        """)

    def acquire(self, ttl_ms=None):
        """Try to take the mutex without blocking.

        NOTE: blocking until mutex available is not implemented

        Args:
            ttl_ms: Expiry in milliseconds applied when the key is created;
                ``None`` creates the key without an expiry.

        Returns:
            True if this instance holds the mutex (newly created or already
            stored under our token), False otherwise. An already-held mutex
            does not get its TTL refreshed here; use ``update_ttl`` for that.
        """
        holder = self.redis.get(self.name)
        if holder is None:
            # nx=True guards against another client creating the key between
            # the GET above and this SET.
            return bool(self.redis.set(self.name, self._token,
                                       px=ttl_ms, nx=True))
        return holder == self._token

    def release(self):
        """Delete the mutex key if, and only if, it holds our token."""
        self._delete_lock_cmd([self.name], [self._token])

    def update_ttl(self, ttl_ms):
        """Reset the mutex expiry, provided nobody else owns it.

        If the key has expired in the meantime it is re-created with our
        token. If another owner's token is stored, nothing is changed.

        Args:
            ttl_ms: New expiry in milliseconds; ``None`` stores the key
                without an expiry.

        Returns:
            True if the key now holds our token with the requested TTL,
            False if the mutex belongs to another owner.
        """
        ttl_arg = "" if ttl_ms is None else ttl_ms
        return bool(self._refresh_lock_cmd([self.name],
                                           [self._token, ttl_arg]))