Skip to content

Instantly share code, notes, and snippets.

@rangermeier
Forked from reclosedev/celery_sentinel.py
Last active April 6, 2017 12:01
Show Gist options
  • Save rangermeier/c646d2bca968adc772a04bb8ea49bc7d to your computer and use it in GitHub Desktop.
Save rangermeier/c646d2bca968adc772a04bb8ea49bc7d to your computer and use it in GitHub Desktop.
Temporary hack: support for Redis Sentinel as a result backend for Celery 4.0.2.
from celery.backends import BACKEND_ALIASES
from kombu.transport import TRANSPORT_ALIASES
from celery.backends.redis import RedisBackend
from kombu.utils import cached_property
from kombu.transport.redis import Transport, Channel
from redis import Redis
from redis.sentinel import Sentinel
class RedisSentinelBackend(RedisBackend):
    """Celery result backend that obtains its Redis client via Redis Sentinel.

    Every Sentinel-related setting can be given as a constructor argument.
    An argument that is missing (or falsy) is looked up in the app
    configuration under ``CELERY_REDIS_SENTINEL_<NAME>``; a setting that is
    absent there as well ends up as ``None`` (``0`` for
    ``min_other_sentinels``).
    """

    def __init__(self, sentinels=None, sentinel_timeout=None, socket_timeout=None,
                 min_other_sentinels=0, service_name=None, **kwargs):
        """Initialise the base Redis backend and collect Sentinel settings.

        :param sentinels: value handed to ``Sentinel`` as its list of
            sentinel nodes (config key ``SENTINELS``).
        :param sentinel_timeout: socket timeout handed to ``Sentinel``
            (config key ``SENTINEL_TIMEOUT``).
        :param socket_timeout: socket timeout for the master client
            (config key ``SOCKET_TIMEOUT``).
        :param min_other_sentinels: handed to ``Sentinel`` under the same
            name (config key ``MIN_OTHER_SENTINELS``); defaults to ``0``.
        :param service_name: name of the monitored master passed to
            ``master_for`` (config key ``SERVICE_NAME``).
        :param kwargs: forwarded unchanged to ``RedisBackend.__init__``.
        """
        super(RedisSentinelBackend, self).__init__(**kwargs)
        conf = self.app.conf

        def _get(key):
            # A missing setting is not an error: report it as None so the
            # ``or`` chains below can fall through.
            try:
                return conf['CELERY_REDIS_SENTINEL_%s' % key]
            except KeyError:
                return None

        self.sentinels = sentinels or _get("SENTINELS")
        self.sentinel_timeout = sentinel_timeout or _get("SENTINEL_TIMEOUT")
        self.socket_timeout = socket_timeout or _get("SOCKET_TIMEOUT")
        # Must stay an integer: fall back to 0 instead of None when neither
        # the argument nor the configuration provides a value.
        self.min_other_sentinels = (
            min_other_sentinels or _get("MIN_OTHER_SENTINELS") or 0
        )
        self.service_name = service_name or _get("SERVICE_NAME")

    @cached_property
    def client(self):
        """Return a ``Redis`` client for the current master of ``service_name``.

        Computed once per backend instance (``cached_property``).
        """
        # Keyword arguments on purpose: the positional parameter order of
        # ``Sentinel.__init__`` is not the same across redis-py releases, so
        # positional passing can silently bind values to the wrong parameter.
        sentinel = Sentinel(
            self.sentinels,
            password=self.password,
            socket_timeout=self.sentinel_timeout,
            min_other_sentinels=self.min_other_sentinels,
        )
        return sentinel.master_for(
            self.service_name,
            redis_class=Redis,
            socket_timeout=self.socket_timeout,
        )
class SentinelChannel(Channel):
    """Redis channel whose connection pool is supplied by Redis Sentinel."""

    # Additional transport options this channel understands.
    # NOTE(review): presumably kombu copies each listed option onto the
    # channel instance as an attribute; confirm against the kombu version
    # in use.
    from_transport_options = Channel.from_transport_options + (
        "service_name",
        "sentinels",
        "password",
        "min_other_sentinels",
        "sentinel_timeout",
    )

    # noinspection PyUnresolvedReferences
    @cached_property
    def _sentinel_managed_pool(self):
        """Return the connection pool of a master client built via Sentinel.

        Computed once per channel (``cached_property``). ``sentinels`` and
        ``service_name`` are required attributes; the remaining options are
        optional and default to ``None`` / ``0``.
        """
        # Keyword arguments on purpose: the positional parameter order of
        # ``Sentinel.__init__`` is not the same across redis-py releases, so
        # positional passing can silently bind values to the wrong parameter.
        sentinel = Sentinel(
            self.sentinels,
            password=getattr(self, "password", None),
            socket_timeout=getattr(self, "sentinel_timeout", None),
            # ``or 0`` keeps this an integer even if the option is None.
            min_other_sentinels=getattr(self, "min_other_sentinels", 0) or 0,
        )
        master = sentinel.master_for(
            self.service_name,
            self.Client,
            socket_timeout=self.socket_timeout,
        )
        return master.connection_pool

    def _get_pool(self, *args, **kwargs):
        """Return the Sentinel-managed pool instead of a plain Redis pool.

        Extra arguments are accepted and ignored.
        NOTE(review): some kombu releases appear to call ``_get_pool`` with
        an additional flag for their async pool; accepting it avoids a
        ``TypeError``, but the same pool is returned either way -- confirm
        this is acceptable for the kombu version in use.
        """
        return self._sentinel_managed_pool
class RedisSentinelTransport(Transport):
    """Redis transport that uses ``SentinelChannel`` as its channel class."""

    # The only difference from the base transport: the channel class.
    Channel = SentinelChannel
def register_celery_alias(alias="redis-sentinel"):
    """Register *alias* as both a Celery backend and a kombu transport name.

    The registered dotted paths are fixed strings, so this module has to be
    importable as ``utils.celery_sentinel`` for the alias to resolve.

    :param alias: name to add to ``BACKEND_ALIASES`` and
        ``TRANSPORT_ALIASES``; an existing entry with that name is replaced.
    """
    registrations = (
        (BACKEND_ALIASES, "utils.celery_sentinel.RedisSentinelBackend"),
        (TRANSPORT_ALIASES, "utils.celery_sentinel.RedisSentinelTransport"),
    )
    for registry, dotted_path in registrations:
        registry[alias] = dotted_path
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment