
Celery: Chaining tasks with multiple arguments

Tags: python, celery

The Celery documentation tells me that if multiple tasks are chained together, the result of the first task will be the first argument of the next. My problem is that I can't get this to work when a task returns multiple results.

Example:

@task
def get_comments(url):
    # get the comments and the submission and return them as 2 objects
    return comments, submission

@task
def render_template(threadComments, submission):
    # render the objects into an HTML file
    # does not return anything
    pass

Now, if I call them in a chain like (get_comments(url) | render_template()).apply_async(), Python raises a TypeError: render_template() takes exactly 2 arguments (0 given).

I can see that the results are not unwrapped and applied to the arguments. If I only call get_comments, I can do:

result = get_comments(url)
arg1, arg2 = result

and get both results.

Lett1 asked Jan 18 '13 09:01



1 Answer

There are two mistakes here.

First, don't call get_comments() and render_template() directly. Use the .s() task method to create signatures instead:

(get_comments.s(url) | render_template.s()).apply_async()

In your version, both functions run immediately, and you then try to chain their return values. The TypeError comes from render_template() being called with no arguments before any chain exists.
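The difference between calling a function and deferring the call can be sketched in plain Python. This is only a loose analogy and not Celery itself: functools.partial stands in for a signature such as get_comments.s(url), and the function bodies, the calls list, and the URL are hypothetical placeholders.

```python
from functools import partial

calls = []  # records every time get_comments actually runs


def get_comments(url):
    # Hypothetical stand-in for the real task body.
    calls.append(url)
    return ["a comment"], "a submission"


def render_template(thread_comments, submission):
    # Hypothetical stand-in for the real task body.
    return "rendered {} comment(s) for {}".format(len(thread_comments), submission)


# Calling the function directly runs it right away, with no arguments,
# so the TypeError is raised before anything could be chained.
try:
    render_template()
    error = None
except TypeError as exc:
    error = exc

# A deferred call only stores the function and its arguments.
deferred = partial(get_comments, "http://example.com/thread")
ran_before_call = len(calls)      # nothing has run yet

comments, submission = deferred()  # the body runs only now
ran_after_call = len(calls)
```

A Celery signature behaves similarly in that creating it runs nothing. The work starts only when the chain is applied.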

Second, your first task doesn't actually return "two results". It returns a single tuple containing both objects, and that tuple is passed to the second task as one argument.

Therefore, you should rewrite your second task as:

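@task
def render_template(comments_and_submission):
    # the previous task's return value arrives as one object; unpack it here
    comments, submission = comments_and_submission
    # render the objects into an HTML file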

If you fix these, it should work.
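To see why the two-parameter version fails, here is a minimal plain-Python model of the forwarding rule described above. It is a sketch, not Celery's implementation: run_chain, start, render_two_args, the function bodies, and the URL are all hypothetical, and the only behavior modeled is that each step receives the previous return value as one positional argument.

```python
def get_comments(url):
    # Hypothetical body: build two objects and return them as ONE tuple.
    comments = ["comment on " + url]
    submission = "submission at " + url
    return comments, submission


def render_two_args(thread_comments, submission):
    # Original shape of the second task: expects two separate arguments.
    return "{}: {} comment(s)".format(submission, len(thread_comments))


def render_template(comments_and_submission):
    # Fixed shape: accept the single object and unpack it inside the task.
    comments, submission = comments_and_submission
    return render_two_args(comments, submission)


def run_chain(first_call, *steps):
    # Hypothetical helper modeling the chaining rule: each step gets the
    # previous return value as a single positional argument.
    result = first_call()
    for step in steps:
        result = step(result)
    return result


def start():
    return get_comments("http://example.com")


# The two-parameter version receives one tuple and fails.
try:
    run_chain(start, render_two_args)
    failure = None
except TypeError as exc:
    failure = exc

# The one-parameter version unpacks the tuple and succeeds.
html = run_chain(start, render_template)
```

If the result is serialized as JSON on its way between workers, the tuple arrives as a list, and the unpacking line works the same way.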

Roman Imankulov answered Oct 05 '22 01:10