Celery: clean way of revoking the entire chain from within a task

My question is probably pretty basic, but I still can't find a solution in the official docs. I have defined a Celery chain inside my Django application, performing a set of tasks that depend on each other:

chain(  tasks.apply_fetching_decision.s(x, y),
        tasks.retrieve_public_info.s(z, x, y),
        tasks.public_adapter.s())()

Obviously the second and the third tasks need the output of the parent, that's why I used a chain.

Now the question: I need to programmatically revoke the 2nd and the 3rd tasks if a test condition in the 1st task fails. How can I do it in a clean way? I know I can revoke the tasks of a chain from within the method where I have defined the chain (see this question and this doc), but inside the first task I have no visibility of the subsequent tasks nor of the chain itself.
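
For reference, the caller-side revocation mentioned above can be sketched roughly as follows. This is a minimal sketch, assuming the object returned by calling the chain behaves like Celery's AsyncResult, i.e. it exposes .parent (the previous link, or None) and .revoke(). The names revoke_chain and FakeResult are hypothetical; FakeResult is only a stand-in so the sketch runs without a broker.

```python
def revoke_chain(last_result):
    """Revoke a result and every ancestor reachable through .parent.

    `last_result` is assumed to behave like celery.result.AsyncResult:
    it exposes `.id`, `.parent` (previous link or None) and `.revoke()`.
    """
    revoked_ids = []
    node = last_result
    while node is not None:
        node.revoke()
        revoked_ids.append(node.id)
        node = node.parent
    return revoked_ids


class FakeResult:
    """Hypothetical stand-in for AsyncResult (illustration only)."""

    def __init__(self, task_id, parent=None):
        self.id = task_id
        self.parent = parent
        self.revoked = False

    def revoke(self):
        self.revoked = True


# Three linked results, mimicking what a three-task chain would return.
first = FakeResult("t1")
second = FakeResult("t2", parent=first)
third = FakeResult("t3", parent=second)
print(revoke_chain(third))
```

This only works where the chain's result object is available, which is exactly the limitation described above: the first task itself never sees that object.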

Temporary solution

My current solution is to skip the computation inside the subsequent tasks based on result of the previous task:

@shared_task
def retrieve_public_info(result, x, y):
   if not result:
      return []
   ...

@shared_task
def public_adapter(result, z, x, y):
   for r in result:
       ...

But this "workaround" has some flaws:

  • Adds unnecessary logic to each task (based on predecessor's result), compromising reuse
  • Still executes the subsequent tasks, with all the resulting overhead

I haven't experimented much with passing references to the chain into the tasks, for fear of messing things up. I also admit I haven't tried the exception-throwing approach, because I think that choosing not to proceed through the chain can be a functional (and thus non-exceptional) scenario...

Thanks for helping!

Alex Gidan asked May 21 '14 21:05


2 Answers

I think I found the answer to this issue: this seems to be the right way to proceed, indeed. I wonder why such a common scenario is not documented anywhere, though.

For completeness I post the basic code snapshot:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    #do_stuff
    if end_chain:
        self.request.callbacks[:] = []
    ....

Update

I implemented a more elegant way to cope with the issue and I want to share it with you. I am using a decorator called revoke_chain_authority, so that a task can revoke the chain automatically without repeating the code I described previously.

from functools import wraps

class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        Exception.__init__(self, "")

        # Now for your custom code...
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return:
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner

This decorator can be used on a shared task as follows:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...

    if condition:
        raise RevokeChainRequested(False)
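
To see what the decorator does without starting a worker, here is a minimal sketch that exercises the same logic against a fake bound task. It is an illustration under stated assumptions, not Celery itself: the fake object only models self.request.callbacks, make_fake_task is a hypothetical helper, and the condition on latitude/longitude is made up for the example.

```python
from functools import wraps
from types import SimpleNamespace


class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        super().__init__("")
        self.return_value = return_value


def revoke_chain_authority(task_func):
    @wraps(task_func)
    def inner(self, *args, **kwargs):
        try:
            return task_func(self, *args, **kwargs)
        except RevokeChainRequested as e:
            # Empty the callback list in place so nothing is scheduled next.
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value
    return inner


@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    # Hypothetical condition, for illustration only.
    if latitude is None or longitude is None:
        raise RevokeChainRequested(False)
    return True


def make_fake_task():
    # Stand-in for a bound task: only `request.callbacks` is modelled.
    return SimpleNamespace(
        request=SimpleNamespace(callbacks=["next_task_signature"])
    )


stopped = make_fake_task()
result_stopped = apply_fetching_decision(stopped, None, 2.0)

continued = make_fake_task()
result_continued = apply_fetching_decision(continued, 1.0, 2.0)

print(result_stopped, stopped.request.callbacks)
print(result_continued, continued.request.callbacks)
```

In the first call the exception is swallowed, the callbacks list is emptied and False is returned; in the second call the callbacks are left untouched.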

Please note the use of @wraps. It is needed to preserve the name and other metadata of the original function. Without it, every decorated task would appear under the name of the wrapper (inner), and Celery would make a mess when resolving the right wrapped task (e.g. it would always call the first registered function instead of the right one).
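
The effect of @wraps can be illustrated with plain Python. The sketch below uses a toy registry keyed by function name; it is a hypothetical stand-in chosen to mimic the symptom described above and is not Celery's actual registration code. The names plain_decorator, wrapping_decorator and registry are made up for the example.

```python
from functools import wraps


def plain_decorator(func):
    # No @wraps: the returned function is always named "inner".
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


def wrapping_decorator(func):
    # With @wraps: the returned function keeps func's name and docstring.
    @wraps(func)
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


@plain_decorator
def task_a():
    return "a"


@plain_decorator
def task_b():
    return "b"


@wrapping_decorator
def task_c():
    return "c"


# Toy name-keyed registry (hypothetical): the first function registered
# under a given name wins, later ones with the same name are ignored.
registry = {}
for func in (task_a, task_b, task_c):
    registry.setdefault(func.__name__, func)

print(sorted(registry))
print(registry["inner"]())
```

Here task_a and task_b collide under the name inner, so looking up that name always runs task_a, while task_c keeps its own entry.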

Alex Gidan answered Oct 08 '22 21:10


As of Celery 4.0, what I found to be working is to remove the remaining tasks from the current task instance's request using the statement:

self.request.chain = None

Let's say you have a chain of tasks a.s() | b.s() | c.s(). You can only access the self variable inside a task if you bind the task by passing bind=True as argument to the tasks' decorator.

@app.task(name='main.a', bind=True)
def a(self):
  if something_happened:
    self.request.chain = None

If something_happened is truthy, b and c wouldn't be executed.
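
Since the two answers clear different attributes (request.callbacks in the older answer, request.chain as of Celery 4.0), a small helper that handles both might look like the sketch below. The helper name stop_chain is hypothetical, and the request objects are SimpleNamespace stand-ins so the example runs without Celery; how a real request populates these attributes depends on the Celery version in use.

```python
from types import SimpleNamespace


def stop_chain(request):
    """Drop whatever continuation the given request object carries."""
    # Older approach from the first answer: remaining links are callbacks.
    if getattr(request, "callbacks", None):
        request.callbacks[:] = []
    # Approach reported for Celery 4.0+: remaining links live in `chain`.
    if getattr(request, "chain", None):
        request.chain = None


# Stand-in request objects (illustration only).
old_style = SimpleNamespace(callbacks=["b", "c"])
new_style = SimpleNamespace(callbacks=None, chain=["c", "b"])

stop_chain(old_style)
stop_chain(new_style)
print(old_style.callbacks, new_style.chain)
```

Inside a bound task this would be called as stop_chain(self.request) at the point where the chain should end.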

Bruno Henrique answered Oct 08 '22 20:10