Callback for celery apply_async

Tags: python, celery

I use Celery in my application to run periodic tasks. Here is a simple example:

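from datetime import timedelta

from celery import group
from celery.task import periodic_task, task  # legacy decorators from the Celery 3.x era

from myqueue import Queue


@periodic_task(run_every=timedelta(minutes=1))
def process_queue():
    queue = Queue()
    uid, questions = queue.pop()
    if uid is None:
        return

    # Build signatures with .s(); calling do_stuff(q) here would run it synchronously.
    job = group(do_stuff.s(q) for q in questions)
    job.apply_async()


@task
def do_stuff(question):
    try:
        ...
    except Exception:
        ...
        raise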

As you can see in the example above, I use Celery to run the tasks asynchronously. Because the work comes from a queue, I need to call queue.fail(uid) if do_stuff raises an exception and queue.ack(uid) otherwise. It would be clean and useful to have a callback from my task for both cases: on_failure and on_success.

I have read some of the documentation, but I have never seen callbacks used with apply_async in practice. Is it possible to do that?

gakhov asked Sep 21 '12 08:09



2 Answers

Subclass the Task class and override its on_success and on_failure methods:

from celery import Task


class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        '''
        retval – The return value of the task.
        task_id – Unique id of the executed task.
        args – Original arguments for the executed task.
        kwargs – Original keyword arguments for the executed task.
        '''
        pass
        
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        '''
        exc – The exception raised by the task.
        task_id – Unique id of the failed task.
        args – Original arguments for the task that failed.
        kwargs – Original keyword arguments for the task that failed.
        einfo – Exception information for the failure, including the traceback.
        '''
        pass

Use:

@celery.task(base=CallbackTask)  # this does the trick
def add(x, y):
    return x + y

Douwe van der Meij answered Oct 25 '22 04:10


You can specify success and error callbacks via the link and link_error kwargs when you call apply_async. The Celery docs include a clear example: http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks

sussudio answered Oct 25 '22 04:10