Skip to content

Instantly share code, notes, and snippets.

@reclosedev
Last active October 23, 2015 22:52
Show Gist options
  • Save reclosedev/8809480 to your computer and use it in GitHub Desktop.
Save reclosedev/8809480 to your computer and use it in GitHub Desktop.

Revisions

  1. reclosedev revised this gist Feb 4, 2014. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion singleton_scheduler.py
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,7 @@
    from os import urandom

    from celery.beat import PersistentScheduler
    from some_place create_readis_connection # !
    from some_place import create_readis_connection # !


    class SingletonScheduler(PersistentScheduler):
  2. reclosedev created this gist Feb 4, 2014.
    59 changes: 59 additions & 0 deletions singleton_scheduler.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,59 @@
    from os import urandom

    from celery.beat import PersistentScheduler
    from some_place create_readis_connection # !


    class SingletonScheduler(PersistentScheduler):

    def __init__(self, *args, **kwargs):
    super(SingletonScheduler, self).__init__(*args, **kwargs)
    self._mutex = RedisMutex(self.__class__.__name__, create_readis_connection())
    self._mutex_ttl_ms = int(self.max_interval * 2 * 1000)

    def tick(self):
    if not self._mutex.acquire(self._mutex_ttl_ms):
    return self.max_interval
    new_tick_interval = super(SingletonScheduler, self).tick()
    self._mutex_ttl_ms = ttl_ms = int(new_tick_interval * 2 * 1000)
    self._mutex.update_ttl(ttl_ms)
    return new_tick_interval

    def close(self):
    super(SingletonScheduler, self).close()
    self._mutex.release()


    class RedisMutex(object):
    """ Distributed mutex with ttl.
    """
    def __init__(self, name, redis, token_length=16):
    self.name = name
    self.redis = redis
    self._token = urandom(token_length)
    # snippet from http://redis.io/commands/set
    self._delete_lock_cmd = self.redis.register_script("""
    if redis.call("get", KEYS[1]) == ARGV[1]
    then
    return redis.call("del", KEYS[1])
    else
    return 0
    end
    """)

    def acquire(self, ttl_ms=None):
    """ NOTE: blocking until mutex available is not implemented
    """
    token = self.redis.get(self.name)
    if token is None:
    if not self.redis.set(self.name, self._token, px=ttl_ms, nx=True):
    return False
    elif token != self._token:
    return False
    return True

    def release(self):
    self._delete_lock_cmd([self.name], [self._token])

    def update_ttl(self, ttl_ms):
    return self.redis.set(self.name, self._token, px=ttl_ms)