Skip to content

Instantly share code, notes, and snippets.

@darklow
Last active October 1, 2025 05:52
Show Gist options
  • Select an option

  • Save darklow/c70a8d1147f05be877c3 to your computer and use it in GitHub Desktop.

Select an option

Save darklow/c70a8d1147f05be877c3 to your computer and use it in GitHub Desktop.

Revisions

  1. darklow revised this gist Dec 16, 2016. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions celery_tasks_error_handling.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,4 @@
    from celery import Task
    from celery.task import task
    from my_app.models import FailedTask
    from django.db import models
  2. darklow revised this gist Oct 2, 2014. 1 changed file with 43 additions and 0 deletions.
    43 changes: 43 additions & 0 deletions celery_tasks_error_handling.py
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,6 @@
    from celery.task import task
    from my_app.models import FailedTask
    from django.db import models

    @task(base=LogErrorsTask)
    def some task():
    @@ -47,3 +48,45 @@ def save_failed_task(self, exc, task_id, args, kwargs, traceback):
    update_fields=('updated_at', 'failures'))
    else:
    task.save(force_insert=True)


    class FailedTask(models.Model):
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(null=True, blank=True)
    name = models.CharField(max_length=125)
    full_name = models.TextField()
    args = models.TextField(null=True, blank=True)
    kwargs = models.TextField(null=True, blank=True)
    exception_class = models.TextField()
    exception_msg = models.TextField()
    traceback = models.TextField(null=True, blank=True)
    celery_task_id = models.CharField(max_length=36)
    failures = models.PositiveSmallIntegerField(default=1)

    class Meta:
    ordering = ('-updated_at',)

    def __unicode__(self):
    return '%s %s [%s]' % (self.name, self.args, self.exception_class)

    def retry_and_delete(self, inline=False):

    import importlib

    # Import real module and function
    mod_name, func_name = self.full_name.rsplit('.', 1)
    mod = importlib.import_module(mod_name)
    func = getattr(mod, func_name)

    args = json.loads(self.args) if self.args else ()
    kwargs = json.loads(self.kwargs) if self.kwargs else {}
    if inline:
    try:
    res = func(*args, **kwargs)
    self.delete()
    return res
    except Exception as e:
    raise e

    self.delete()
    return func.delay(*args, **kwargs)
  3. darklow renamed this gist Oct 2, 2014. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  4. darklow created this gist Oct 2, 2014.
    49 changes: 49 additions & 0 deletions sentry_error_handling.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,49 @@
    from celery.task import task
    from my_app.models import FailedTask

    @task(base=LogErrorsTask)
    def some task():
    return result

    class LogErrorsTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
    self.save_failed_task(exc, task_id, args, kwargs, einfo)
    super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def save_failed_task(self, exc, task_id, args, kwargs, traceback):
    """
    :type exc: Exception
    """
    task = FailedTask()
    task.celery_task_id = task_id
    task.full_name = self.name
    task.name = self.name.split('.')[-1]
    task.exception_class = exc.__class__.__name__
    task.exception_msg = unicode(exc).strip()
    task.traceback = unicode(traceback).strip()
    task.updated_at = timezone.now()

    if args:
    task.args = json.dumps(list(args))
    if kwargs:
    task.kwargs = json.dumps(kwargs)

    # Find if task with same args, name and exception already exists
    # If it do, update failures count and last updated_at
    #: :type: FailedTask
    existing_task = FailedTask.objects.filter(
    args=task.args,
    kwargs=task.kwargs,
    full_name=task.full_name,
    exception_class=task.exception_class,
    exception_msg=task.exception_msg,
    )

    if len(existing_task):
    existing_task = existing_task[0]
    existing_task.failures += 1
    existing_task.updated_at = task.updated_at
    existing_task.save(force_update=True,
    update_fields=('updated_at', 'failures'))
    else:
    task.save(force_insert=True)