 

Python+Celery: Chaining jobs?

Tags:

python

celery

The Celery documentation suggests that it's a bad idea to have tasks wait on the results of other tasks… But the suggested solution (see the “good” heading) leaves something to be desired. Specifically, there's no clear way of getting the subtask's result back to the caller (also, it's kind of ugly).

So, is there any way of “chaining” jobs, so the caller gets the result of the final job? E.g., to use the add example:

>>> add3 = add.subtask(args=(3, ))
>>> add.delay(1, 2, callback=add3).get()
6

Alternately, is it OK to return instances of Result? For example:

@task
def add(x, y, callback=None):
    result = x + y
    if callback:
        return subtask(callback).delay(result)
    return result

This would let the result of the “final” job in the chain be retrieved with a simple:

result = add.delay(1, 2, callback=add3)
while isinstance(result, Result):
    result = result.get()
print "result:", result
David Wolever asked Oct 10 '10

People also ask

What do you use Python Celery for?

It allows you to offload work from your Python app. Once you integrate Celery into your app, you can send time-intensive tasks to Celery's task queue. That way, your web app can continue to respond quickly to users while Celery completes expensive operations asynchronously in the background.

What is a Celery chain?

Celery chains allow you to modularise your application and reuse common Celery tasks.

What is Shared_task in Celery?

The "shared_task" decorator allows creating Celery tasks for reusable apps, since it doesn't need an instance of the Celery app. It is also an easier way to define a task, as you don't need to import the Celery app instance.


1 Answer

You can do it with a celery chain. See https://celery.readthedocs.org/en/latest/userguide/canvas.html#chains

import time

@task()
def add(a, b):
    time.sleep(5)  # simulate long-running processing
    return a + b

Chaining job:

# import chain
from celery import chain

# the result of the first add job will be
# the first argument of the second add job
ret = chain(add.s(1, 2), add.s(3)).apply_async()

# another way to express a chain, using pipes
ret2 = (add.s(1, 2) | add.s(3)).apply_async()

...

# check ret status to get the result
if ret.status == u'SUCCESS':
    print "result:", ret.get()
ax003d answered Sep 23 '22