Last active
October 1, 2025 05:52
-
-
Save darklow/c70a8d1147f05be877c3 to your computer and use it in GitHub Desktop.
Revisions
-
darklow revised this gist
Dec 16, 2016 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,4 @@ from celery import Task from celery.task import task from my_app.models import FailedTask from django.db import models -
darklow revised this gist
Oct 2, 2014 . 1 changed file with 43 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,6 @@ from celery.task import task from my_app.models import FailedTask from django.db import models @task(base=LogErrorsTask) def some task(): @@ -47,3 +48,45 @@ def save_failed_task(self, exc, task_id, args, kwargs, traceback): update_fields=('updated_at', 'failures')) else: task.save(force_insert=True) class FailedTask(models.Model): created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(null=True, blank=True) name = models.CharField(max_length=125) full_name = models.TextField() args = models.TextField(null=True, blank=True) kwargs = models.TextField(null=True, blank=True) exception_class = models.TextField() exception_msg = models.TextField() traceback = models.TextField(null=True, blank=True) celery_task_id = models.CharField(max_length=36) failures = models.PositiveSmallIntegerField(default=1) class Meta: ordering = ('-updated_at',) def __unicode__(self): return '%s %s [%s]' % (self.name, self.args, self.exception_class) def retry_and_delete(self, inline=False): import importlib # Import real module and function mod_name, func_name = self.full_name.rsplit('.', 1) mod = importlib.import_module(mod_name) func = getattr(mod, func_name) args = json.loads(self.args) if self.args else () kwargs = json.loads(self.kwargs) if self.kwargs else {} if inline: try: res = func(*args, **kwargs) self.delete() return res except Exception as e: raise e self.delete() return func.delay(*args, **kwargs) -
darklow renamed this gist
Oct 2, 2014 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
darklow created this gist
Oct 2, 2014 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,49 @@ from celery.task import task from my_app.models import FailedTask @task(base=LogErrorsTask) def some task(): return result class LogErrorsTask(Task): def on_failure(self, exc, task_id, args, kwargs, einfo): self.save_failed_task(exc, task_id, args, kwargs, einfo) super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo) def save_failed_task(self, exc, task_id, args, kwargs, traceback): """ :type exc: Exception """ task = FailedTask() task.celery_task_id = task_id task.full_name = self.name task.name = self.name.split('.')[-1] task.exception_class = exc.__class__.__name__ task.exception_msg = unicode(exc).strip() task.traceback = unicode(traceback).strip() task.updated_at = timezone.now() if args: task.args = json.dumps(list(args)) if kwargs: task.kwargs = json.dumps(kwargs) # Find if task with same args, name and exception already exists # If it do, update failures count and last updated_at #: :type: FailedTask existing_task = FailedTask.objects.filter( args=task.args, kwargs=task.kwargs, full_name=task.full_name, exception_class=task.exception_class, exception_msg=task.exception_msg, ) if len(existing_task): existing_task = existing_task[0] existing_task.failures += 1 existing_task.updated_at = task.updated_at existing_task.save(force_update=True, update_fields=('updated_at', 'failures')) else: task.save(force_insert=True)