Skip to content

Instantly share code, notes, and snippets.

@darklow
Last active October 1, 2025 05:52
Show Gist options
  • Select an option

  • Save darklow/c70a8d1147f05be877c3 to your computer and use it in GitHub Desktop.

Select an option

Save darklow/c70a8d1147f05be877c3 to your computer and use it in GitHub Desktop.
Celery tasks error handling example
from celery.task import task
from my_app.models import FailedTask
@task(base=LogErrorsTask)
def some task():
return result
class LogErrorsTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
self.save_failed_task(exc, task_id, args, kwargs, einfo)
super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
def save_failed_task(self, exc, task_id, args, kwargs, traceback):
"""
:type exc: Exception
"""
task = FailedTask()
task.celery_task_id = task_id
task.full_name = self.name
task.name = self.name.split('.')[-1]
task.exception_class = exc.__class__.__name__
task.exception_msg = unicode(exc).strip()
task.traceback = unicode(traceback).strip()
task.updated_at = timezone.now()
if args:
task.args = json.dumps(list(args))
if kwargs:
task.kwargs = json.dumps(kwargs)
# Find if task with same args, name and exception already exists
# If it do, update failures count and last updated_at
#: :type: FailedTask
existing_task = FailedTask.objects.filter(
args=task.args,
kwargs=task.kwargs,
full_name=task.full_name,
exception_class=task.exception_class,
exception_msg=task.exception_msg,
)
if len(existing_task):
existing_task = existing_task[0]
existing_task.failures += 1
existing_task.updated_at = task.updated_at
existing_task.save(force_update=True,
update_fields=('updated_at', 'failures'))
else:
task.save(force_insert=True)
@AlanCoding
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment