My Celery task raises a custom exception, NonTransientProcessingError, which is then re-raised by AsyncResult.get(). In tasks.py:
from celery import shared_task

class NonTransientProcessingError(Exception):
    pass

@shared_task()
def throw_exception():
    raise NonTransientProcessingError('Error raised by POC model for test purposes')
In the Python console:
from my_app.tasks import *
r = throw_exception.apply_async()
try:
    r.get()
except NonTransientProcessingError as e:
    print('caught NonTrans in type specific except clause')
But my custom exception is my_app.tasks.NonTransientProcessingError, whereas the exception raised by AsyncResult.get() is celery.backends.base.NonTransientProcessingError, so my except clause fails.
Traceback (most recent call last):
  File "<input>", line 4, in <module>
  File "/...venv/lib/python3.5/site-packages/celery/result.py", line 175, in get
    raise meta['result']
celery.backends.base.NonTransientProcessingError: Error raised by POC model for test purposes
If I catch the exception within the task, it works fine. It is only when the exception is raised to the .get() call that it is renamed.
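A minimal sketch of why the type-specific clause misses. It assumes (the post does not confirm this) that the result backend rebuilds the exception from its serialized name as a brand-new class in celery.backends.base instead of importing my_app.tasks. The helper rebuild_exception below is a hypothetical stand-in written for illustration, not a Celery API:

```python
class NonTransientProcessingError(Exception):
    """The locally defined exception, as in tasks.py."""


def rebuild_exception(name, message, module="celery.backends.base"):
    # Hypothetical stand-in for a backend that only has the exception's
    # name and message: it creates a fresh class with the same name.
    cls = type(name, (Exception,), {"__module__": module})
    return cls(message)


def classify(exc):
    """Report which except clause ends up handling exc."""
    try:
        raise exc
    except NonTransientProcessingError:
        return "type-specific"
    except Exception:
        return "generic"


original = NonTransientProcessingError("boom")
rebuilt = rebuild_exception("NonTransientProcessingError", "boom")

print(classify(original))
print(classify(rebuilt))
print(type(rebuilt).__module__, type(rebuilt).__name__)
```

The two classes share a name but are unrelated objects, and an except clause matches on class identity and inheritance, never on the name.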
How can I raise a custom exception and catch it correctly?
I have confirmed that the same happens when I define a Task class and raise the custom exception in its on_failure method. The following does work:
try:
    r.get()
except Exception as e:
    if type(e).__name__ == 'NonTransientProcessingError':
        print('specific exception identified')
    else:
        print('caught generic but not identified')
Outputs:
specific exception identified
But this can't be the best way of doing this? Ideally I'd like to catch exception superclasses for categories of behaviour.
I'm using Django 1.8.6, Python 3.5 and Celery 3.1.18, with a Redis 3.1.18, Python redis lib 2.10.3 backend.
If you look at the Celery docs on tasks, you will see that to call a task synchronously you use the apply() method rather than apply_async(). The docs also note that if the CELERY_ALWAYS_EAGER setting is set, apply_async() is replaced by a local apply() call instead.
import celery
from celery import shared_task

class NonTransientProcessingError(Exception):
    pass

class CeleryTask(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if isinstance(exc, NonTransientProcessingError):
            """
            deal with NonTransientProcessingError
            """
            pass

    def run(self, *args, **kwargs):
        pass

@shared_task(base=CeleryTask)
def add(x, y):
    raise NonTransientProcessingError
Use a base Task with an on_failure callback to handle the custom exception inside the worker, where it still has its original class.
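On the calling side, the name check from the question could be generalised so that exception superclasses can be caught again. The following is only a sketch under assumptions: ProcessingError, KNOWN_ERRORS and localize are hypothetical names introduced here, and it relies on the exception arriving from .get() with the original class name and args preserved.

```python
class ProcessingError(Exception):
    """Hypothetical superclass for one category of failures."""


class NonTransientProcessingError(ProcessingError):
    pass


# Hypothetical registry: exception class name -> locally defined class.
KNOWN_ERRORS = {
    cls.__name__: cls
    for cls in (ProcessingError, NonTransientProcessingError)
}


def localize(exc):
    """Return an instance of the local class with the same name as exc.

    Exceptions whose name is not registered, or which already are
    instances of the local class, are returned unchanged.
    """
    local_cls = KNOWN_ERRORS.get(type(exc).__name__)
    if local_cls is None or isinstance(exc, local_cls):
        return exc
    return local_cls(*exc.args)


# Intended use around a result object r (not executed here):
#
#     try:
#         r.get()
#     except Exception as e:
#         raise localize(e)
```

Wrapping r.get() this way lets an outer except ProcessingError clause catch the whole category, at the cost of maintaining the registry by hand and of name collisions between unrelated exceptions.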