Skip to content

Instantly share code, notes, and snippets.

@IrSent
Created November 12, 2016 19:12
Show Gist options
  • Select an option

  • Save IrSent/5e4820f6b187d3654967b55e27d5d204 to your computer and use it in GitHub Desktop.

Select an option

Save IrSent/5e4820f6b187d3654967b55e27d5d204 to your computer and use it in GitHub Desktop.

Revisions

  1. IrSent created this gist Nov 12, 2016.
    22 changes: 22 additions & 0 deletions celery.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,22 @@
    # Safe Queue Celery App Settings

    from __future__ import absolute_import, unicode_literals
    from celery import Celery

    app = Celery('some_project',
    broker='amqp://',
    backend='amqp://',
    include=['some_project.tasks'])

    # Optional configuration, see the application user guide.
    app.conf.update(
    result_expires=3600,
    )

    app.conf.update(
    enable_utc=True,
    timezone='Europe/Kiev',
    )

    if __name__ == '__main__':
    app.start()
    51 changes: 51 additions & 0 deletions settings.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,51 @@

    # Sentry

    # By default CELERYD_HIJACK_ROOT_LOGGER = True
    # Is important variable that allows Celery to overlap other custom logging handlers
    CELERYD_HIJACK_ROOT_LOGGER = False

    LOGGING = {
    'handlers': {
    'celery_sentry_handler': {
    'level': 'ERROR',
    'class': 'core.log.handlers.CelerySentryHandler'
    }
    },

    'loggers': {
    'celery': {
    'handlers': ['celery_sentry_handler'],
    'level': 'ERROR',
    'propagate': False,
    },
    }
    }

    # ...
    #
    # Prioritize your tasks!
    #

    CELERY_QUEUES = (
    Queue('high', Exchange('high'), routing_key='high'),
    Queue('normal', Exchange('normal'), routing_key='normal'),
    Queue('low', Exchange('low'), routing_key='low'),
    )

    CELERY_DEFAULT_QUEUE = 'normal'
    CELERY_DEFAULT_EXCHANGE = 'normal'
    CELERY_DEFAULT_ROUTING_KEY = 'normal'

    CELERY_ROUTES = {
    # -- HIGH PRIORITY QUEUE -- #
    'myapp.tasks.check_payment_status': {'queue': 'high'},
    # -- LOW PRIORITY QUEUE -- #
    'myapp.tasks.close_session': {'queue': 'low'},
    }

    # Keep result only if you really need them: CELERY_IGNORE_RESULT = False
    # In all other cases it is better to have place somewhere in db
    CELERY_IGNORE_RESULT = True

    # ...
    20 changes: 20 additions & 0 deletions shell_commands
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,20 @@
    # Launch your workers
    celery worker -E -l INFO -n worker.high -Q high
    celery worker -E -l INFO -n worker.normal -Q normal
    celery worker -E -l INFO -n worker.low -Q low

    # This worker will accept tasks if for example all other high queue workers are busy
    celery worker -E -l INFO -n worker.whatever


    # Use FLOWER to monitor your Celery app `https://github.com/mher/flower`, `https://flower.readthedocs.io/`
    $ pip install flower

    # Launch the server and open http://localhost:5555
    $ flower -A some_project --port=5555

    # Or, launch from Celery
    $ celery flower -A proj --address=127.0.0.1 --port=5555

    # Broker URL and other configuration options can be passed through the standard Celery options
    $ celery flower -A proj --broker=amqp://guest:guest@localhost:5672//
    22 changes: 22 additions & 0 deletions tasks.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,22 @@
    # Keep your tasks in tasks.py small and readable
    # Don't place your business logic in it, just import them here

    from __future__ import absolute_import, unicode_literals
    from .celery import app

    from .utils import generate_report, send_email

    # BTW bind=True is helpful to restart this task in case it is failed
    @app.task(bind=True)
    def send_report(self):
    filename = generate_report()
    try:
    send_email(subject, message, attachments=[filename])
    except SomeError as exc:
    # Retry in 5 minutes.
    raise self.retry(countdown=60 * 5, exc=exc)

    # set time limits to avoid worker hangs
    @app.task(bind=True, soft_time_limit=120, time_limit=180)
    def some_long_task_with_high_risk_of_failure():
    pass