Created
November 12, 2016 19:12
-
-
Save IrSent/5e4820f6b187d3654967b55e27d5d204 to your computer and use it in GitHub Desktop.
Revisions
-
IrSent created this gist
Nov 12, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,22 @@ # Safe Queue Celery App Settings from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('some_project', broker='amqp://', backend='amqp://', include=['some_project.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) app.conf.update( enable_utc=True, timezone='Europe/Kiev', ) if __name__ == '__main__': app.start() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,51 @@ # Sentry # By default CELERYD_HIJACK_ROOT_LOGGER = True # Is important variable that allows Celery to overlap other custom logging handlers CELERYD_HIJACK_ROOT_LOGGER = False LOGGING = { 'handlers': { 'celery_sentry_handler': { 'level': 'ERROR', 'class': 'core.log.handlers.CelerySentryHandler' } }, 'loggers': { 'celery': { 'handlers': ['celery_sentry_handler'], 'level': 'ERROR', 'propagate': False, }, } } # ... # # Prioritize your tasks! # CELERY_QUEUES = ( Queue('high', Exchange('high'), routing_key='high'), Queue('normal', Exchange('normal'), routing_key='normal'), Queue('low', Exchange('low'), routing_key='low'), ) CELERY_DEFAULT_QUEUE = 'normal' CELERY_DEFAULT_EXCHANGE = 'normal' CELERY_DEFAULT_ROUTING_KEY = 'normal' CELERY_ROUTES = { # -- HIGH PRIORITY QUEUE -- # 'myapp.tasks.check_payment_status': {'queue': 'high'}, # -- LOW PRIORITY QUEUE -- # 'myapp.tasks.close_session': {'queue': 'low'}, } # Keep result only if you really need them: CELERY_IGNORE_RESULT = False # In all other cases it is better to have place somewhere in db CELERY_IGNORE_RESULT = True # ... This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,20 @@ # Launch your workers celery worker -E -l INFO -n worker.high -Q high celery worker -E -l INFO -n worker.normal -Q normal celery worker -E -l INFO -n worker.low -Q low # This worker will accept tasks if for example all other high queue workers are busy celery worker -E -l INFO -n worker.whatever # Use FLOWER to monitor your Celery app `https://github.com/mher/flower`, `https://flower.readthedocs.io/` $ pip install flower # Launch the server and open http://localhost:5555 $ flower -A some_project --port=5555 # Or, launch from Celery $ celery flower -A proj --address=127.0.0.1 --port=5555 # Broker URL and other configuration options can be passed through the standard Celery options $ celery flower -A proj --broker=amqp://guest:guest@localhost:5672// This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,22 @@ # Keep your tasks in tasks.py small and readable # Don't place your business logic in it, just import them here from __future__ import absolute_import, unicode_literals from .celery import app from .utils import generate_report, send_email # BTW bind=True is helpful to restart this task in case it is failed @app.task(bind=True) def send_report(self): filename = generate_report() try: send_email(subject, message, attachments=[filename]) except SomeError as exc: # Retry in 5 minutes. raise self.retry(countdown=60 * 5, exc=exc) # set time limits to avoid worker hangs @app.task(bind=True, soft_time_limit=120, time_limit=180) def some_long_task_with_high_risk_of_failure(): pass