I'm using Celery and have several tasks that need to be executed in order.
For example I have this task:
    @celery.task
    def tprint(word):
        print(word)
And I want to do something like this:
>>> chain(tprint.s('a') | tprint.s('b'))()
Then I get: TypeError: tprint() takes exactly 1 argument (2 given).
The same happens with chord, where I need a task to be executed after a group of tasks:
>>> chord([tprint.s('a'), tprint.s('b')])(tprint.s('c'))
So how should I deal with this? I don't care about the result of each task, but they need to be executed in order.
Adding a second parameter won't work:
    @celery.task
    def tprint(word, ignore=None):
        print(word)

    >>> chain(tprint.s('a', 0) | tprint.s('b'))()
This will print out 'a' and 'None'.
There is built-in functionality for ignoring the previous task's result in chains and other primitives: immutable subtasks (signatures). You can use the .si() shortcut instead of .s(), or .subtask(immutable=True).
More details here: http://docs.celeryproject.org/en/master/userguide/canvas.html#immutability
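To make the difference concrete without a broker, here is a toy, pure-Python model of the argument-passing rule. It is a sketch, not Celery's implementation, and the names Sig, s, si, run_chain and run_chord are hypothetical stand-ins: a mutable signature gets the parent's result prepended to its own arguments, while an immutable one is called with only its own arguments.

```python
# Toy model of how a Celery chain/chord passes results; NOT Celery's code.
# Sig, s(), si(), run_chain() and run_chord() are hypothetical stand-ins.
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple

printed: List[Any] = []  # records what tprint "prints" so results are checkable


def tprint(word):
    printed.append(word)


@dataclass
class Sig:
    func: Callable[..., Any]
    args: Tuple[Any, ...]
    immutable: bool = False

    def call(self, *parent):
        # Mutable: the parent result is prepended to the signature's own args.
        # Immutable: the parent result is dropped, only own args are used.
        if self.immutable:
            return self.func(*self.args)
        return self.func(*parent, *self.args)


def s(func, *args):
    return Sig(func, args)


def si(func, *args):
    return Sig(func, args, immutable=True)


def run_chain(*sigs):
    result = sigs[0].call()  # the first task has no parent
    for sig in sigs[1:]:
        result = sig.call(result)
    return result


def run_chord(header, body):
    # Celery runs the header in parallel; this toy runs it sequentially.
    results = [sig.call() for sig in header]
    return body.call(results)  # the body receives the list of header results


run_chain(si(tprint, 'a'), si(tprint, 'b'))
run_chord([si(tprint, 'a'), si(tprint, 'b')], si(tprint, 'c'))
print(printed)  # ['a', 'b', 'a', 'b', 'c']
```

With mutable signatures, run_chain(s(tprint, 'a'), s(tprint, 'b')) ends up calling tprint(None, 'b') and raises the same kind of TypeError as in the question. In real Celery code the immutable versions would be chain(tprint.si('a') | tprint.si('b'))() and chord([tprint.si('a'), tprint.si('b')])(tprint.si('c')).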
One possible solution has already been posted, but I'd like to add further clarification and an alternative solution (and in some cases a superior one).
The error you're seeing, which indicates that your task's signature needs to account for a second parameter, occurs because when tasks are called in a chain, Celery automatically passes each task's result as the first argument of the following task.
From the docs:
Tasks can be linked together, which in practice means adding a callback task:
    >>> res = add.apply_async((2, 2), link=mul.s(16))
    >>> res.get()
    4
The linked task will be applied with the result of its parent task as the first argument.
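In plain Python terms, the docs example behaves roughly like the toy function below. This is a sketch, and apply_linked is a hypothetical helper, not a Celery API: the parent's return value is what res.get() gives you, and the callback is invoked with that value prepended to its own arguments.

```python
# Toy model of link=: NOT Celery code; apply_linked is a hypothetical helper.
def add(x, y):
    return x + y


def mul(x, y):
    return x * y


def apply_linked(task, args, link_func, link_args):
    """Run task(*args), then call link_func with the parent's result first."""
    parent_result = task(*args)
    link_result = link_func(parent_result, *link_args)
    return parent_result, link_result


parent, linked = apply_linked(add, (2, 2), mul, (16,))
print(parent)  # 4, what res.get() returns in the docs example
print(linked)  # 64, i.e. mul(4, 16)
```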
Therefore, in your case, you could rewrite your task like this:
    @celery.task
    def tprint(result, word):
        print(word)
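One caveat with this signature: the first task in a chain has no parent, so tprint.s('a') would call tprint('a') and fail with a missing word argument. Passing an explicit placeholder for the first task, e.g. tprint.s(None, 'a'), avoids that. The toy model below (a hypothetical run_chain helper, not Celery's implementation) mimics the argument passing to show the effect:

```python
# Toy model of a Celery chain's argument passing; NOT Celery's implementation.
# run_chain() is a hypothetical helper; each step is (func, own_args).
printed = []


def tprint(result, word):
    # 'result' receives the previous task's return value and is ignored.
    printed.append(word)
    return None


def run_chain(*steps):
    result = None
    for i, (func, args) in enumerate(steps):
        # Every task after the first gets the parent's result prepended.
        result = func(*args) if i == 0 else func(result, *args)
    return result


# The first step has no parent, so it passes an explicit placeholder.
run_chain((tprint, (None, 'a')), (tprint, ('b',)))
print(printed)  # ['a', 'b']
```

In real Celery code this corresponds to chain(tprint.s(None, 'a') | tprint.s('b'))().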
If you're not going to do anything with the result, you may as well ignore it by changing the decorator like this:
@celery.task(ignore_result=True)
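Note, however, that ignore_result=True only stops Celery from storing the result in the result backend; the return value is still passed to the next task in a chain, so this alone does not let you keep the original task signature.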