Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can you catch a custom exception from Celery worker, or stop it being prefixed with `celery.backends.base`?

My Celery task raises a custom exception NonTransientProcessingError, which is then caught by AsyncResult.get(). Tasks.py:

class NonTransientProcessingError(Exception):
    pass

@shared_task()
def throw_exception():
    raise NonTransientProcessingError('Error raised by POC model for test purposes')

In the Python console:

from my_app.tasks import *
r = throw_exception.apply_async()
try:
    r.get()
except NonTransientProcessingError as e:
    print('caught NonTrans in type specific except clause')

But my custom exception is my_app.tasks.NonTransientProcessingError, whereas the exception raised by AsyncResult.get() is celery.backends.base.NonTransientProcessingError, so my except clause fails.

Traceback (most recent call last):
  File "<input>", line 4, in <module>
  File "/...venv/lib/python3.5/site-packages/celery/result.py", line 175, in get
    raise meta['result']
celery.backends.base.NonTransientProcessingError: Error raised by POC model for test purposes

If I catch the exception within the task, it works fine. It is only when the exception is raised to the .get() call that it is renamed.

How can I raise a custom exception and catch it correctly?

I have confirmed that the same happens when I define a Task class and raise the custom exception in its on_failure method. The following does work:

try:
    r.get()
except Exception as e:
    if type(e).__name__ == 'NonTransientProcessingError':
        print('specific exception identified')
    else:
        print('caught generic but not identified')

Outputs:

specific exception identified

But this can't be the best way of doing this? Ideally I'd like to catch exception superclasses for categories of behaviour.

I'm using Django 1.8.6, Python 3.5 and Celery 3.1.18, with a Redis 3.1.18, Python redis lib 2.10.3 backend.

like image 802
Chris Avatar asked Jan 31 '16 13:01

Chris


People also ask

How do you call celery synchronously?

If you look at the celery DOCS on tasks you see that to call a task synchronosuly, you use the apply() method as opposed to the apply_async() method. The DOCS also note that: If the CELERY_ALWAYS_EAGER setting is set, it will be replaced by a local apply() call instead.

Where are celery logs stored?

Default is /var/log/celery/%n%I.


1 Answers

import celery
from celery import shared_task


class NonTransientProcessingError(Exception):
    pass


class CeleryTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if isinstance(exc, NonTransientProcessingError):
            """
            deal with NonTransientProcessingError
            """
            pass

    def run(self, *args, **kwargs):
        pass


@shared_task(base=CeleryTask)
def add(x, y):
    raise NonTransientProcessingError

Use a base Task with on_failure callback to catch custom exception.

like image 103
gushitong Avatar answered Nov 05 '22 20:11

gushitong