My question is probably pretty basic, but I still can't find a solution in the official docs. I have defined a Celery chain inside my Django application, performing a set of tasks that depend on each other:
chain(
    tasks.apply_fetching_decision.s(x, y),
    tasks.retrieve_public_info.s(z, x, y),
    tasks.public_adapter.s(),
)()
Obviously the second and the third tasks need the output of the parent, that's why I used a chain.
Now the question: I need to programmatically revoke the 2nd and the 3rd tasks if a test condition in the 1st task fails. How can I do it in a clean way? I know I can revoke the tasks of a chain from within the method where I have defined the chain (see this question and this doc), but inside the first task I have no visibility of the subsequent tasks or of the chain itself.
My current solution is to skip the computation inside the subsequent tasks based on result of the previous task:
@shared_task
def retrieve_public_info(result, x, y):
    if not result:
        return []
    ...

@shared_task
def public_adapter(result, z, x, y):
    for r in result:
        ...
But this "workaround" has some flaws:
I haven't played too much with passing references to the chain into the tasks, for fear of messing things up. I admit I also haven't tried the exception-throwing approach, because I think that choosing not to proceed through the chain can be a functional (thus non-exceptional) scenario...
Thanks for helping!
If a task is revoked, the workers ignore it and do not execute it. If you don't use persistent revokes, the task can still be executed after a worker restart. revoke has a terminate option, which is False by default. If you need to kill a task that is already executing, set terminate to True.
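As a rough sketch of the terminate option described above: revoke is reached through app.control on a Celery application. The helper name cancel_task and its kill_running flag are made up for this illustration, and app is assumed to be your celery.Celery instance; it is passed in rather than imported so the snippet does not depend on any particular project layout.

```python
def cancel_task(app, task_id, kill_running=False):
    """Ask the workers to discard the task with id `task_id`.

    `app` is assumed to be a celery.Celery instance. `cancel_task` and
    `kill_running` are hypothetical names used only in this sketch.
    """
    # terminate=False (Celery's default) only stops a task that has not
    # started yet from being executed.
    # terminate=True also terminates the worker process that is already
    # executing the task.
    app.control.revoke(task_id, terminate=kill_running)
```

Usage would look like cancel_task(app, result.id, kill_running=True), where result is the AsyncResult returned when the task was queued.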
I think I found the answer to this issue: this does seem to be the right way to proceed. I wonder why such a common scenario is not documented anywhere, though.
For completeness, here is the basic code snippet:
@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    # do_stuff
    if end_chain:
        self.request.callbacks[:] = []
    ....
I implemented a more elegant way to cope with the issue and I want to share it with you. I am using a decorator called revoke_chain_authority, so that the chain can be revoked automatically without rewriting the code I previously described.
from functools import wraps


class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        Exception.__init__(self, "")
        # Now for your custom code...
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return:
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner
This decorator can be used on a shared task as follows:
@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...
    if condition:
        raise RevokeChainRequested(False)
Please note the use of @wraps. It is necessary to preserve the name and signature of the original function; otherwise they are lost and Celery gets confused about which wrapped task to call (e.g. it will always call the first registered function instead of the right one).
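The effect of @wraps can be checked without Celery at all. The sketch below uses two throwaway decorators (without_wraps and with_wraps, names invented here) and compares the __name__ of the decorated functions. Celery builds a task's default name from the function's module and __name__, which is presumably why tasks wrapped without @wraps end up colliding.

```python
from functools import wraps


def without_wraps(func):
    # Plain wrapper: the returned function keeps its own name, "inner".
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


def with_wraps(func):
    # wraps() copies __name__, __doc__, __module__ and so on from func onto inner.
    @wraps(func)
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


@without_wraps
def first_task():
    return 1


@without_wraps
def second_task():
    return 2


@with_wraps
def third_task():
    return 3


# Both functions decorated without wraps() report the same name, so a
# name-based registry cannot tell them apart.
print(first_task.__name__, second_task.__name__)  # inner inner
print(third_task.__name__)                        # third_task
```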
As of Celery 4.0, what I found to be working is to remove the remaining tasks from the current task instance's request using the statement:
self.request.chain = None
Let's say you have a chain of tasks a.s() | b.s() | c.s(). You can only access the self variable inside a task if you bind the task by passing bind=True as an argument to the task's decorator.
@app.task(name='main.a', bind=True)
def a(self):
    if something_happened:
        self.request.chain = None
If something_happened is truthy, b and c wouldn't be executed.
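The two answers clear different attributes: self.request.callbacks in the older answer and self.request.chain for Celery 4.0 and later. A hedged sketch of a helper that covers both is below. The function name stop_chain is made up here, and the stand-in requests built with SimpleNamespace only mimic the two attributes mentioned in this thread so the helper can be tried without a broker; they are not Celery's real request class. As far as I know, on Celery 4+ emptying callbacks also drops callbacks attached with link, which may or may not be what you want.

```python
from types import SimpleNamespace


def stop_chain(request):
    """Remove the follow-up tasks attached to a task request.

    Hypothetical helper: call it as stop_chain(self.request) inside a
    task declared with bind=True.
    """
    # Celery 4.0 and later: the answer above sets request.chain to None.
    if getattr(request, "chain", None):
        request.chain = None
    # Older releases: the earlier answer empties request.callbacks in place.
    if getattr(request, "callbacks", None):
        request.callbacks[:] = []


# Stand-in objects that mimic only the two attributes used above.
new_style = SimpleNamespace(chain=["b", "c"], callbacks=None)
old_style = SimpleNamespace(callbacks=["b", "c"])

stop_chain(new_style)
stop_chain(old_style)

print(new_style.chain)      # None
print(old_style.callbacks)  # []
```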