Celery: How to ignore task result in chord or chain?

I'm using Celery and have several tasks that need to be executed in order.

For example I have this task:

@celery.task
def tprint(word):
    print(word)

And I want to do something like this:

>>> chain(tprint.s('a') | tprint.s('b'))() 

Then I get TypeError: tprint() takes exactly 1 argument (2 given).

The same happens with chord, where I need a task to be executed after a group of tasks has finished:

>>> chord([tprint.s('a'), tprint.s('b')])(tprint.s('c')) 

So how do I deal with this situation? I don't care about the result of each task, but they need to be executed in order.


Adding a second parameter doesn't work:

@celery.task
def tprint(word, ignore=None):
    print(word)

>>> chain(tprint.s('a', 0) | tprint.s('b'))()

This will print out 'a' and 'None'.

asked Nov 29 '12 15:11 by lxyu


2 Answers

Celery has built-in support for ignoring the parent's result in chains and the other workflow primitives: immutable subtasks. Use the .si() shortcut in place of .s(); it is shorthand for calling .subtask() with immutable=True.

More details here: http://docs.celeryproject.org/en/master/userguide/canvas.html#immutability

answered Oct 06 '22 04:10 by Vlad Frolov


One possible solution has already been posted, but I'd like to add further clarification and an alternative solution (in some cases a superior one).

The error you're seeing, which indicates that your task's signature needs to account for a second parameter, occurs because, when tasks are called in a chain, Celery automatically passes each task's result as the first argument of the following task.

From the docs:

Tasks can be linked together, which in practice means adding a callback task:

>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4

The linked task will be applied with the result of its parent task as the first argument

Therefore, in your case, you could rewrite your task like this:

@celery.task
def tprint(result, word):
    print(word)

If you're not going to do anything with the result, you may as well ignore it, by changing the decorator thus:

@celery.task(ignore_result=True) 

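Note, however, that ignore_result only tells Celery not to store the task's return value in the result backend. The parent's return value is still passed to the next task in the chain, so on its own this does not remove the need to change the task signature (or to use an immutable signature).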

answered Oct 06 '22 03:10 by ygesher