Skip to content

Instantly share code, notes, and snippets.

@rangermeier
Forked from reclosedev/celery_sentinel.py
Last active April 6, 2017 12:01
Show Gist options
  • Save rangermeier/c646d2bca968adc772a04bb8ea49bc7d to your computer and use it in GitHub Desktop.
Save rangermeier/c646d2bca968adc772a04bb8ea49bc7d to your computer and use it in GitHub Desktop.

Revisions

  1. rangermeier revised this gist Apr 6, 2017. 1 changed file with 46 additions and 52 deletions.
    98 changes: 46 additions & 52 deletions celery_sentinel.py
    Original file line number Diff line number Diff line change
    @@ -1,27 +1,37 @@
    """
    This module adds Redis Sentinel transport support to Celery.
    Current version of celery doesn't support Redis sentinel client, which is must have for automatic failover.
    To use it::
    This module adds Redis Sentinel backend support to Celery.
    Current version (4.0.2) of Celery supports Redis sentinel only as broker but
    not as result backend.
    To use it in Django::
    import register_celery_alias
    register_celery_alias("redis-sentinel")
    celery = Celery(..., broker="redis-sentinel://...", backend="redis-sentinel://...")
    register_celery_alias('redis-sentinel')
    app = Celery('my_app')
    app.config_from_object('django.conf:settings', namespace='CELERY')
    Example config::
    CELERY_RESULT_BACKEND = 'redis-sentinel://...'
    CELERY_REDIS_SENTINEL_SENTINELS = [('sentinel-host01', '26379'),
    ('sentinel-host02', '26379'),
    ('sentinel-host03', '26379'),
    ]
    CELERY_REDIS_SENTINEL_SERVICE_NAME = 'mymaster'
    CELERY_REDIS_SENTINEL_DB = 2
    Stripped down version of https://gist.github.com/reclosedev/8460004
    """
    from celery.backends import BACKEND_ALIASES
    from kombu.transport import TRANSPORT_ALIASES
    from celery.app.backends import BACKEND_ALIASES
    from celery.backends.redis import RedisBackend
    from kombu.utils import cached_property
    from kombu.transport.redis import Transport, Channel
    from redis import Redis
    from redis import StrictRedis
    from redis.sentinel import Sentinel


    class RedisSentinelBackend(RedisBackend):

    def __init__(self, sentinels=None, sentinel_timeout=None, socket_timeout=None,
    min_other_sentinels=0, service_name=None, **kwargs):
    def __init__(self, sentinels=None, sentinel_timeout=None,
    socket_timeout=None, service_name=None, min_other_sentinels=0,
    sentinel_db=0, **kwargs):
    super(RedisSentinelBackend, self).__init__(**kwargs)
    conf = self.app.conf

    @@ -31,50 +41,34 @@ def _get(key):
    except KeyError:
    pass

    self.sentinels = sentinels or _get("SENTINELS")
    self.sentinel_timeout = sentinel_timeout or _get("SENTINEL_TIMEOUT")
    self.socket_timeout = socket_timeout or _get("SOCKET_TIMEOUT")
    self.min_other_sentinels = min_other_sentinels or _get("MIN_OTHER_SENTINELS")
    self.service_name = service_name or _get("SERVICE_NAME")
    self.sentinels = sentinels or _get('SENTINELS')
    self.sentinel_timeout = sentinel_timeout or _get('SENTINEL_TIMEOUT')
    self.socket_timeout = socket_timeout or _get('SOCKET_TIMEOUT')
    self.service_name = service_name or _get('SERVICE_NAME')
    self.min_other_sentinels = min_other_sentinels
    if self.min_other_sentinels == 0 and _get('MIN_OTHER_SENTINELS') != 0:
    self.min_other_sentinels = _get('MIN_OTHER_SENTINELS')
    self.sentinel_db = sentinel_db
    if self.sentinel_db == 0 and _get('DB') != 0:
    self.sentinel_db = _get('DB')

    @cached_property
    def client(self):
    sentinel = Sentinel(self.sentinels, min_other_sentinels=self.min_other_sentinels,
    password=self.password, sentinel_kwargs={"socket_timeout": self.sentinel_timeout})
    return sentinel.master_for(self.service_name, redis_class=Redis, socket_timeout=self.socket_timeout)


    class SentinelChannel(Channel):

    from_transport_options = Channel.from_transport_options + (
    "service_name",
    "sentinels",
    "password",
    "min_other_sentinels",
    "sentinel_timeout",
    )

    #noinspection PyUnresolvedReferences
    @cached_property
    def _sentinel_managed_pool(self):

    sentinel = Sentinel(
    self.sentinels,
    min_other_sentinels=getattr(self, "min_other_sentinels", 0),
    password=getattr(self, "password", None),
    sentinel_kwargs={"socket_timeout": getattr(self, "sentinel_timeout", None)},
    min_other_sentinels=self.min_other_sentinels,
    password=self.password,
    sentinel_kwargs={
    'socket_timeout': self.sentinel_timeout,
    }
    )
    return sentinel.master_for(
    self.service_name,
    redis_class=StrictRedis,
    db=self.sentinel_db,
    socket_timeout=self.socket_timeout
    )
    return sentinel.master_for(self.service_name, self.Client,
    socket_timeout=self.socket_timeout).connection_pool

    def _get_pool(self):
    return self._sentinel_managed_pool


    class RedisSentinelTransport(Transport):
    Channel = SentinelChannel


    def register_celery_alias(alias="redis-sentinel"):
    BACKEND_ALIASES[alias] = "utils.celery_sentinel.RedisSentinelBackend"
    TRANSPORT_ALIASES[alias] = "utils.celery_sentinel.RedisSentinelTransport"
    def register_celery_alias(alias='redis-sentinel'):
    BACKEND_ALIASES[alias] = 'utils.celery_sentinel.RedisSentinelBackend'
  2. @reclosedev reclosedev revised this gist Apr 16, 2015. 1 changed file with 18 additions and 4 deletions.
    22 changes: 18 additions & 4 deletions celery_sentinel.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,14 @@
    """
    This module adds Redis Sentinel transport support to Celery.
    Current version of celery doesn't support Redis sentinel client, which is must have for automatic failover.
    To use it::
    import register_celery_alias
    register_celery_alias("redis-sentinel")
    celery = Celery(..., broker="redis-sentinel://...", backend="redis-sentinel://...")
    """
    from celery.backends import BACKEND_ALIASES
    from kombu.transport import TRANSPORT_ALIASES
    from celery.backends.redis import RedisBackend
    @@ -28,7 +39,8 @@ def _get(key):

    @cached_property
    def client(self):
    sentinel = Sentinel(self.sentinels, self.password, self.sentinel_timeout, self.min_other_sentinels)
    sentinel = Sentinel(self.sentinels, min_other_sentinels=self.min_other_sentinels,
    password=self.password, sentinel_kwargs={"socket_timeout": self.sentinel_timeout})
    return sentinel.master_for(self.service_name, redis_class=Redis, socket_timeout=self.socket_timeout)


    @@ -45,10 +57,12 @@ class SentinelChannel(Channel):
    #noinspection PyUnresolvedReferences
    @cached_property
    def _sentinel_managed_pool(self):

    sentinel = Sentinel(
    self.sentinels, getattr(self, "password", None),
    getattr(self, "sentinel_timeout", None),
    getattr(self, "min_other_sentinels", 0)
    self.sentinels,
    min_other_sentinels=getattr(self, "min_other_sentinels", 0),
    password=getattr(self, "password", None),
    sentinel_kwargs={"socket_timeout": getattr(self, "sentinel_timeout", None)},
    )
    return sentinel.master_for(self.service_name, self.Client,
    socket_timeout=self.socket_timeout).connection_pool
  3. @reclosedev reclosedev created this gist Jan 16, 2014.
    66 changes: 66 additions & 0 deletions celery_sentinel.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,66 @@
    from celery.backends import BACKEND_ALIASES
    from kombu.transport import TRANSPORT_ALIASES
    from celery.backends.redis import RedisBackend
    from kombu.utils import cached_property
    from kombu.transport.redis import Transport, Channel
    from redis import Redis
    from redis.sentinel import Sentinel


    class RedisSentinelBackend(RedisBackend):

    def __init__(self, sentinels=None, sentinel_timeout=None, socket_timeout=None,
    min_other_sentinels=0, service_name=None, **kwargs):
    super(RedisSentinelBackend, self).__init__(**kwargs)
    conf = self.app.conf

    def _get(key):
    try:
    return conf['CELERY_REDIS_SENTINEL_%s' % key]
    except KeyError:
    pass

    self.sentinels = sentinels or _get("SENTINELS")
    self.sentinel_timeout = sentinel_timeout or _get("SENTINEL_TIMEOUT")
    self.socket_timeout = socket_timeout or _get("SOCKET_TIMEOUT")
    self.min_other_sentinels = min_other_sentinels or _get("MIN_OTHER_SENTINELS")
    self.service_name = service_name or _get("SERVICE_NAME")

    @cached_property
    def client(self):
    sentinel = Sentinel(self.sentinels, self.password, self.sentinel_timeout, self.min_other_sentinels)
    return sentinel.master_for(self.service_name, redis_class=Redis, socket_timeout=self.socket_timeout)


    class SentinelChannel(Channel):

    from_transport_options = Channel.from_transport_options + (
    "service_name",
    "sentinels",
    "password",
    "min_other_sentinels",
    "sentinel_timeout",
    )

    #noinspection PyUnresolvedReferences
    @cached_property
    def _sentinel_managed_pool(self):
    sentinel = Sentinel(
    self.sentinels, getattr(self, "password", None),
    getattr(self, "sentinel_timeout", None),
    getattr(self, "min_other_sentinels", 0)
    )
    return sentinel.master_for(self.service_name, self.Client,
    socket_timeout=self.socket_timeout).connection_pool

    def _get_pool(self):
    return self._sentinel_managed_pool


    class RedisSentinelTransport(Transport):
    Channel = SentinelChannel


    def register_celery_alias(alias="redis-sentinel"):
    BACKEND_ALIASES[alias] = "utils.celery_sentinel.RedisSentinelBackend"
    TRANSPORT_ALIASES[alias] = "utils.celery_sentinel.RedisSentinelTransport"