 

How to fail the chain if one of its subtasks raises an exception

I have faced a pretty strange issue with celery:

There is a chain of tasks, and one of them raises an exception and retries several times:

chain = (err.si(1) | err.si(2))
result = chain.apply_async()
result.state
result.get()

here is the code of the task:

@celery.task(base=MyTask)
def err(x):
    try:
        if x < 3:
            raise Exception
        else:
            return x + 1
    except Exception as exp:
        print "retrying"
        raise err.retry(args=[x], exc=exp, countdown=5, max_retries=3)

The problem is that although the task in the chain raises an exception, result.state stays 'PENDING' and .get() just freezes.

I have tried to fail the task once it reaches the maximum number of retries:

class MyTask(celery.Task):
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self.max_retries == self.request.retries:
            self.state = states.FAILURE

But although the task is marked FAILED when executed separately, executing it in a chain gives the same result: PENDING and a frozen get().

I expected the chain to fail as soon as any of its tasks fails, and .get() on the result to raise the exception thrown by the task.
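For reference, the fail-the-chain-after-retries behaviour I expect can be sketched without celery at all. This is a plain-Python stand-in (the helpers `simulated_retry` and `run_chain` are hypothetical, not celery APIs), just to pin down the intended semantics:

```python
# Pure-Python sketch: retry a failing step up to max_retries times,
# then let the final exception propagate so the surrounding "chain"
# of steps stops early instead of hanging.

def simulated_retry(func, arg, max_retries=3):
    """Call func(arg); on failure retry up to max_retries, then re-raise."""
    for attempt in range(max_retries + 1):
        try:
            return func(arg)
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted -> this step (and the chain) fails

def err(x):
    if x < 3:
        raise ValueError("x too small")
    return x + 1

def run_chain(steps, initial):
    """Feed each step's result into the next; any exception aborts the chain."""
    value = initial
    for step in steps:
        value = simulated_retry(step, value)
    return value

# err(1) fails on every attempt, so the chain raises instead of freezing:
try:
    run_chain([err, err], 1)
except ValueError:
    print("chain failed as expected")
```

With a passing input the chain threads results through as usual: `run_chain([err, err], 3)` returns 5.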

_UPDATE_ Stack trace given by apply_async with CELERY_ALWAYS_EAGER=True:

result = chain.apply_async()

Exception                                 Traceback (most recent call last)
<ipython-input-4-81202b369b5f> in <module>()
----> 1 result = chain.apply_async()

lib/python2.7/site-packages/celery/canvas.pyc in apply_async(self, args, kwargs,     **options)
    147         # For callbacks: extra args are prepended to the stored args.
    148         args, kwargs, options = self._merge(args, kwargs, options)
--> 149         return self.type.apply_async(args, kwargs, **options)
    150 
    151     def append_to_list_option(self, key, value):

/lib/python2.7/site-packages/celery/app/builtins.pyc in apply_async(self, args, kwargs, group_id, chord, task_id, **options)
    232                 task_id=None, **options):
    233             if self.app.conf.CELERY_ALWAYS_EAGER:
--> 234                 return self.apply(args, kwargs, **options)
    235             options.pop('publisher', None)
    236             tasks, results = self.prepare_steps(args, kwargs['tasks'])

lib/python2.7/site-packages/celery/app/builtins.pyc in apply(self, args, kwargs, subtask, **options)
    249             last, fargs = None, args  # fargs passed to first task only
    250             for task in kwargs['tasks']:
--> 251                 res = subtask(task).clone(fargs).apply(last and (last.get(), ))
    252                 res.parent, last, fargs = last, res, None
    253             return last

lib/python2.7/site-packages/celery/result.pyc in get(self, timeout, propagate, **kwargs)
    677         elif self.state in states.PROPAGATE_STATES:
    678             if propagate:
--> 679                 raise self.result
    680             return self.result
    681     wait = get

Exception: 
Alex Luberg asked Sep 30 '12 11:09

1 Answer

When you have a chain:

>>> ch = a.s() | b.s() | c.s()
>>> res = ch()
>>> res.get()

Calling the chain will generate unique ids for all of the tasks in the chain, send the messages, and return the last result in the chain.

So when you do res.get() you are simply trying to retrieve the result of the last task in the chain.

It will also decorate the results with parent attributes, which you can traverse to get the progress of the chain:

>>> res                # result of c.s()
>>> res.parent         # result of b.s()
>>> res.parent.parent  # result of a.s()

If you want to check for errors along the way you can do:

def nodes(node):
    while node.parent:
        yield node
        node = node.parent
    yield node


values = [node.get(timeout=1) for node in reversed(list(nodes(res)))]
value = values[-1]
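To see the traversal order concretely, here is the same `nodes` walk over a minimal stand-in for the result objects (the `FakeResult` class is hypothetical, just enough to carry a `parent` link and a `get()`, not celery's AsyncResult):

```python
class FakeResult:
    """Minimal stand-in for a chain result: a value plus a parent link."""
    def __init__(self, value, parent=None):
        self.value = value
        self.parent = parent

    def get(self, timeout=None):
        return self.value

def nodes(node):
    # Walk from the last result back to the first via .parent
    while node.parent:
        yield node
        node = node.parent
    yield node

# Build a -> b -> c the way a chain would: res is the *last* result.
a = FakeResult("a-result")
b = FakeResult("b-result", parent=a)
res = FakeResult("c-result", parent=b)

# reversed() restores submission order, so the first task comes first.
values = [n.get(timeout=1) for n in reversed(list(nodes(res)))]
print(values)  # ['a-result', 'b-result', 'c-result']
value = values[-1]  # 'c-result', the chain's final result
```

Because each `get()` here would raise if its task had failed, walking the list this way surfaces the first error along the chain instead of blocking on the final result.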
asksol answered Oct 19 '22 07:10