|
|
@@ -1,27 +1,37 @@ |
|
|
""" |
|
|
This module adds Redis Sentinel transport support to Celery. |
|
|
Current version of celery doesn't support Redis sentinel client, which is must have for automatic failover. |
|
|
|
|
|
To use it:: |
|
|
This module adds Redis Sentinel backend support to Celery. |
|
|
Current version (4.0.2) of Celery supports Redis sentinel only as broker but |
|
|
not as result backend. |
|
|
|
|
|
To use it in Django:: |
|
|
import register_celery_alias |
|
|
register_celery_alias("redis-sentinel") |
|
|
|
|
|
celery = Celery(..., broker="redis-sentinel://...", backend="redis-sentinel://...") |
|
|
register_celery_alias('redis-sentinel') |
|
|
app = Celery('my_app') |
|
|
app.config_from_object('django.conf:settings', namespace='CELERY') |
|
|
|
|
|
Example config:: |
|
|
CELERY_RESULT_BACKEND = 'redis-sentinel://...' |
|
|
CELERY_REDIS_SENTINEL_SENTINELS = [('sentinel-host01', '26379'), |
|
|
('sentinel-host02', '26379'), |
|
|
('sentinel-host03', '26379'), |
|
|
] |
|
|
CELERY_REDIS_SENTINEL_SERVICE_NAME = 'mymaster' |
|
|
CELERY_REDIS_SENTINEL_DB = 2 |
|
|
|
|
|
Stripped down version of https://gist.github.com/reclosedev/8460004 |
|
|
""" |
|
|
from celery.backends import BACKEND_ALIASES |
|
|
from kombu.transport import TRANSPORT_ALIASES |
|
|
from celery.app.backends import BACKEND_ALIASES |
|
|
from celery.backends.redis import RedisBackend |
|
|
from kombu.utils import cached_property |
|
|
from kombu.transport.redis import Transport, Channel |
|
|
from redis import Redis |
|
|
from redis import StrictRedis |
|
|
from redis.sentinel import Sentinel |
|
|
|
|
|
|
|
|
class RedisSentinelBackend(RedisBackend): |
|
|
|
|
|
def __init__(self, sentinels=None, sentinel_timeout=None, socket_timeout=None, |
|
|
min_other_sentinels=0, service_name=None, **kwargs): |
|
|
def __init__(self, sentinels=None, sentinel_timeout=None, |
|
|
socket_timeout=None, service_name=None, min_other_sentinels=0, |
|
|
sentinel_db=0, **kwargs): |
|
|
super(RedisSentinelBackend, self).__init__(**kwargs) |
|
|
conf = self.app.conf |
|
|
|
|
|
@@ -31,50 +41,34 @@ def _get(key): |
|
|
except KeyError: |
|
|
pass |
|
|
|
|
|
self.sentinels = sentinels or _get("SENTINELS") |
|
|
self.sentinel_timeout = sentinel_timeout or _get("SENTINEL_TIMEOUT") |
|
|
self.socket_timeout = socket_timeout or _get("SOCKET_TIMEOUT") |
|
|
self.min_other_sentinels = min_other_sentinels or _get("MIN_OTHER_SENTINELS") |
|
|
self.service_name = service_name or _get("SERVICE_NAME") |
|
|
self.sentinels = sentinels or _get('SENTINELS') |
|
|
self.sentinel_timeout = sentinel_timeout or _get('SENTINEL_TIMEOUT') |
|
|
self.socket_timeout = socket_timeout or _get('SOCKET_TIMEOUT') |
|
|
self.service_name = service_name or _get('SERVICE_NAME') |
|
|
self.min_other_sentinels = min_other_sentinels |
|
|
if self.min_other_sentinels == 0 and _get('MIN_OTHER_SENTINELS') != 0: |
|
|
self.min_other_sentinels = _get('MIN_OTHER_SENTINELS') |
|
|
self.sentinel_db = sentinel_db |
|
|
if self.sentinel_db == 0 and _get('DB') != 0: |
|
|
self.sentinel_db = _get('DB') |
|
|
|
|
|
@cached_property |
|
|
def client(self): |
|
|
sentinel = Sentinel(self.sentinels, min_other_sentinels=self.min_other_sentinels, |
|
|
password=self.password, sentinel_kwargs={"socket_timeout": self.sentinel_timeout}) |
|
|
return sentinel.master_for(self.service_name, redis_class=Redis, socket_timeout=self.socket_timeout) |
|
|
|
|
|
|
|
|
class SentinelChannel(Channel): |
|
|
|
|
|
from_transport_options = Channel.from_transport_options + ( |
|
|
"service_name", |
|
|
"sentinels", |
|
|
"password", |
|
|
"min_other_sentinels", |
|
|
"sentinel_timeout", |
|
|
) |
|
|
|
|
|
#noinspection PyUnresolvedReferences |
|
|
@cached_property |
|
|
def _sentinel_managed_pool(self): |
|
|
|
|
|
sentinel = Sentinel( |
|
|
self.sentinels, |
|
|
min_other_sentinels=getattr(self, "min_other_sentinels", 0), |
|
|
password=getattr(self, "password", None), |
|
|
sentinel_kwargs={"socket_timeout": getattr(self, "sentinel_timeout", None)}, |
|
|
min_other_sentinels=self.min_other_sentinels, |
|
|
password=self.password, |
|
|
sentinel_kwargs={ |
|
|
'socket_timeout': self.sentinel_timeout, |
|
|
} |
|
|
) |
|
|
return sentinel.master_for( |
|
|
self.service_name, |
|
|
redis_class=StrictRedis, |
|
|
db=self.sentinel_db, |
|
|
socket_timeout=self.socket_timeout |
|
|
) |
|
|
return sentinel.master_for(self.service_name, self.Client, |
|
|
socket_timeout=self.socket_timeout).connection_pool |
|
|
|
|
|
def _get_pool(self): |
|
|
return self._sentinel_managed_pool |
|
|
|
|
|
|
|
|
class RedisSentinelTransport(Transport): |
|
|
Channel = SentinelChannel |
|
|
|
|
|
|
|
|
def register_celery_alias(alias="redis-sentinel"): |
|
|
BACKEND_ALIASES[alias] = "utils.celery_sentinel.RedisSentinelBackend" |
|
|
TRANSPORT_ALIASES[alias] = "utils.celery_sentinel.RedisSentinelTransport" |
|
|
def register_celery_alias(alias='redis-sentinel'): |
|
|
BACKEND_ALIASES[alias] = 'utils.celery_sentinel.RedisSentinelBackend' |