I have faced a pretty strange issue with celery:
There is a chain of tasks, and one of them gives an exception and does several retries
chain = (err.si(1) | err.si(2))
result = chain.apply_async()
result.state
result.get()
here is the code of the task:
@celery.task(base=MyTask)
def err(x):
try:
if x < 3:
raise Exception
else:
return x+1
except Exception as exp:
print "retrying"
raise err.retry(args=[x],exc=exp,countdown=5,max_retries=3)
The thing is that although the task in chain gives an exception, but the result.state keeps being 'PENDING' and .get() just freezes.
I have tried to fail the task in case it reaches maximum retries value:
class MyTask(celery.Task):
abstract = True
def after_return(self, status, retval, task_id, args, kwargs, einfo):
if self.max_retries == self.request.retries:
self.state = states.FAILURE
But although executed separately task is getting marked as FAILED, executing in chain gives same result - PENDING & Freezed get.
I expected that the chain will get failed once any of it's tasks will get failed and .get of the result should produce the exception thrown from the task.
_UPDATE_ Stack trace given by apply_async with ALWAYS_EAGER=True
result = chain.apply_async()
Exception
Traceback (most recent call last)
<ipython-input-4-81202b369b5f> in <module>()
----> 1 result = chain.apply_async()
lib/python2.7/site-packages/celery/canvas.pyc in apply_async(self, args, kwargs, **options)
147 # For callbacks: extra args are prepended to the stored args.
148 args, kwargs, options = self._merge(args, kwargs, options)
--> 149 return self.type.apply_async(args, kwargs, **options)
150
151 def append_to_list_option(self, key, value):
/lib/python2.7/site-packages/celery/app/builtins.pyc in apply_async(self, args, kwargs, group_id, chord, task_id, **options)
232 task_id=None, **options):
233 if self.app.conf.CELERY_ALWAYS_EAGER:
--> 234 return self.apply(args, kwargs, **options)
235 options.pop('publisher', None)
236 tasks, results = self.prepare_steps(args, kwargs['tasks'])
lib/python2.7/site-packages/celery/app/builtins.pyc in apply(self, args, kwargs, subtask, **options)
249 last, fargs = None, args # fargs passed to first task only
250 for task in kwargs['tasks']:
--> 251 res = subtask(task).clone(fargs).apply(last and (last.get(), ))
252 res.parent, last, fargs = last, res, None
253 return last
lib/python2.7/site-packages/celery/result.pyc in get(self, timeout, propagate, **kwargs)
677 elif self.state in states.PROPAGATE_STATES:
678 if propagate:
--> 679 raise self.result
680 return self.result
681 wait = get
Exception:
When you have a chain:
>>> c = a.s() | b.s() | c.s()
>>> res = c()
>>> res.get()
Calling the chain will generate unique id's for all of the task in the chain, send the messages and return the last result in the chain.
So when you do res.get()
you are simple trying to retrieve the result of the last task in the chain.
It will also decorate the results with parent
attributes, which you can traverse to get the progress of the chain:
>>> res # result of c.s()
>>> res.parent # result of b.s()
>>> res.parent.parent # result of a.s()
If you want to check for errors along the way you can do:
def nodes(node):
while node.parent:
yield node
node = node.parent
yield node
values = [node.get(timeout=1) for node in reversed(list(nodes(res)))]
value = values[-1]
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With