Skip to content

Instantly share code, notes, and snippets.

@reclosedev
Last active April 24, 2019 00:01
Show Gist options
  • Save reclosedev/8460004 to your computer and use it in GitHub Desktop.
Save reclosedev/8460004 to your computer and use it in GitHub Desktop.

Revisions

  1. reclosedev revised this gist Apr 16, 2015. 1 changed file with 18 additions and 4 deletions.
    22 changes: 18 additions & 4 deletions celery_sentinel.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,14 @@
    """
    This module adds Redis Sentinel transport support to Celery.
    Current version of celery doesn't support Redis sentinel client, which is must have for automatic failover.
    To use it::
    import register_celery_alias
    register_celery_alias("redis-sentinel")
    celery = Celery(..., broker="redis-sentinel://...", backend="redis-sentinel://...")
    """
    from celery.backends import BACKEND_ALIASES
    from kombu.transport import TRANSPORT_ALIASES
    from celery.backends.redis import RedisBackend
    @@ -28,7 +39,8 @@ def _get(key):

    @cached_property
    def client(self):
    sentinel = Sentinel(self.sentinels, self.password, self.sentinel_timeout, self.min_other_sentinels)
    sentinel = Sentinel(self.sentinels, min_other_sentinels=self.min_other_sentinels,
    password=self.password, sentinel_kwargs={"socket_timeout": self.sentinel_timeout})
    return sentinel.master_for(self.service_name, redis_class=Redis, socket_timeout=self.socket_timeout)


    @@ -45,10 +57,12 @@ class SentinelChannel(Channel):
    #noinspection PyUnresolvedReferences
    @cached_property
    def _sentinel_managed_pool(self):

    sentinel = Sentinel(
    self.sentinels, getattr(self, "password", None),
    getattr(self, "sentinel_timeout", None),
    getattr(self, "min_other_sentinels", 0)
    self.sentinels,
    min_other_sentinels=getattr(self, "min_other_sentinels", 0),
    password=getattr(self, "password", None),
    sentinel_kwargs={"socket_timeout": getattr(self, "sentinel_timeout", None)},
    )
    return sentinel.master_for(self.service_name, self.Client,
    socket_timeout=self.socket_timeout).connection_pool
  2. reclosedev created this gist Jan 16, 2014.
    66 changes: 66 additions & 0 deletions celery_sentinel.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,66 @@
    from celery.backends import BACKEND_ALIASES
    from kombu.transport import TRANSPORT_ALIASES
    from celery.backends.redis import RedisBackend
    from kombu.utils import cached_property
    from kombu.transport.redis import Transport, Channel
    from redis import Redis
    from redis.sentinel import Sentinel


    class RedisSentinelBackend(RedisBackend):

    def __init__(self, sentinels=None, sentinel_timeout=None, socket_timeout=None,
    min_other_sentinels=0, service_name=None, **kwargs):
    super(RedisSentinelBackend, self).__init__(**kwargs)
    conf = self.app.conf

    def _get(key):
    try:
    return conf['CELERY_REDIS_SENTINEL_%s' % key]
    except KeyError:
    pass

    self.sentinels = sentinels or _get("SENTINELS")
    self.sentinel_timeout = sentinel_timeout or _get("SENTINEL_TIMEOUT")
    self.socket_timeout = socket_timeout or _get("SOCKET_TIMEOUT")
    self.min_other_sentinels = min_other_sentinels or _get("MIN_OTHER_SENTINELS")
    self.service_name = service_name or _get("SERVICE_NAME")

    @cached_property
    def client(self):
    sentinel = Sentinel(self.sentinels, self.password, self.sentinel_timeout, self.min_other_sentinels)
    return sentinel.master_for(self.service_name, redis_class=Redis, socket_timeout=self.socket_timeout)


    class SentinelChannel(Channel):

    from_transport_options = Channel.from_transport_options + (
    "service_name",
    "sentinels",
    "password",
    "min_other_sentinels",
    "sentinel_timeout",
    )

    #noinspection PyUnresolvedReferences
    @cached_property
    def _sentinel_managed_pool(self):
    sentinel = Sentinel(
    self.sentinels, getattr(self, "password", None),
    getattr(self, "sentinel_timeout", None),
    getattr(self, "min_other_sentinels", 0)
    )
    return sentinel.master_for(self.service_name, self.Client,
    socket_timeout=self.socket_timeout).connection_pool

    def _get_pool(self):
    return self._sentinel_managed_pool


    class RedisSentinelTransport(Transport):
    Channel = SentinelChannel


    def register_celery_alias(alias="redis-sentinel"):
    BACKEND_ALIASES[alias] = "utils.celery_sentinel.RedisSentinelBackend"
    TRANSPORT_ALIASES[alias] = "utils.celery_sentinel.RedisSentinelTransport"