I use Celery in my application to run periodic tasks. Here is a simple example:
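
    from datetime import timedelta

    from celery import group
    from celery.task import periodic_task, task  # Celery 3.x-style API

    from myqueue import Queue


    @periodic_task(run_every=timedelta(minutes=1))
    def process_queue():
        queue = Queue()
        uid, questions = queue.pop()
        if uid is None:
            return

        # Build signatures with .s(); calling do_stuff(q) would run it inline.
        job = group(do_stuff.s(q) for q in questions)
        job.apply_async()


    @task
    def do_stuff(question):
        try:
            ...
        except:
            ...
            raise
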
As you can see in the example above, I use Celery to run async tasks, but since this is a queue, I need to call queue.fail(uid) if do_stuff raises an exception, or queue.ack(uid) otherwise. In this situation it would be clean and useful to have a callback from my task in both cases: on_failure and on_success.
I have read some of the documentation, but I have never seen callbacks used with apply_async in practice. Is it possible to do that?
This way, you delegate queue creation to Celery. You can pass any queue name to apply_async and Celery will handle it, provided a worker is consuming from that queue. If a worker is started without an explicit list of queues, it listens only on the default queue.
If the task isn't registered in the current process you can use send_task() to call the task by name instead. So delay is clearly convenient, but if you want to set additional execution options you have to use apply_async .
According to the Celery docs on tasks, you call a task synchronously with the apply() method rather than apply_async(). The docs also note that if the CELERY_ALWAYS_EAGER setting is set, apply_async() is replaced by a local apply() call.
Subclass the Task class and override the on_success and on_failure methods:

    from celery import Task


    class CallbackTask(Task):
        def on_success(self, retval, task_id, args, kwargs):
            '''
            retval – The return value of the task.
            task_id – Unique id of the executed task.
            args – Original arguments for the executed task.
            kwargs – Original keyword arguments for the executed task.
            '''
            pass

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            '''
            exc – The exception raised by the task.
            task_id – Unique id of the failed task.
            args – Original arguments for the task that failed.
            kwargs – Original keyword arguments for the task that failed.
            einfo – ExceptionInfo instance containing the traceback.
            '''
            pass

Use:

    @celery.task(base=CallbackTask)  # this does the trick
    def add(x, y):
        return x + y

You can specify success and error callbacks via the link and link_error kwargs when you call apply_async. The Celery docs include a clear example: http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks