How do you ensure a Celery chord callback gets called with failed subtasks?

Tags: python, celery

I am using a chord in Celery so that a callback runs once a group of parallel tasks has finished executing. Specifically, I have a group of functions that wrap calls to an external API. I want to wait for all of these to return before I process the results and update my database in the chord callback. I would like the callback to execute when all of the API calls have finished, regardless of their status.

My problem is that the callback function is only called if none of the group's subtasks raises an exception. If any subtask does raise an exception, the optional error handler registered with on_error() is called instead, with a string representation of the chord's task_id. The remaining tasks in the group continue executing, but the callback is never called.

I'll illustrate this with an example below:

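from random import randint

from celery import group

# Assumes `app` is the Celery application instance.


@app.task
def maybe_succeed():
    divisor = randint(0, 10)  # 0 is possible, so this sometimes raises ZeroDivisionError
    return 1 / divisor


@app.task
def master_task():
    g = group([maybe_succeed.s() for _ in range(100)])
    c = g | chord_callback.s()  # piping a group into a task signature creates a chord
    return c.delay()


@app.task
def chord_callback(results):
    print('Made it here!')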

In the above example, calling master_task() runs all of the tasks in the group, but the callback is never called, because at least one of the maybe_succeed() calls will fail (unless you're super lucky!).
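To put a number on "super lucky": randint(0, 10) includes both endpoints, so each maybe_succeed() call fails with probability 1/11, and the callback only runs if all 100 independent calls succeed. A quick calculation, assuming the 100 calls are independent:

```python
from fractions import Fraction

# randint(0, 10) returns one of 11 equally likely values;
# only 0 makes 1 / divisor raise ZeroDivisionError.
p_single_success = Fraction(10, 11)

# The 100 calls in the group are independent, so the chance that
# none of them fails (and the callback runs) is the product.
p_all_succeed = p_single_success ** 100

print(f"{float(p_all_succeed):.2e}")  # 7.26e-05, roughly 1 in 13,800
```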


Right now, I'm dealing with this problem by catching all exceptions in my equivalent of maybe_succeed() so that the chord never fails. I guess this is a fine solution, though it doesn't feel right.
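A minimal sketch of that catch-everything workaround, written as a reusable decorator instead of a try/except inside each task. The names never_fail, split_results and divide are hypothetical, and the usage runs the functions directly (no Celery) just to show the data flow; in a real project never_fail would sit underneath @app.task.

```python
import functools
from typing import Any, Callable, Dict, List, Tuple


def never_fail(func: Callable[..., Any]) -> Callable[..., Dict[str, Any]]:
    """Hypothetical helper: report exceptions as data instead of raising them.

    Intended to sit underneath the task decorator:

        @app.task
        @never_fail
        def maybe_succeed(): ...

    functools.wraps keeps the wrapped function's __name__ and __module__,
    which Celery's default task naming relies on.
    """
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        try:
            return {"ok": True, "value": func(*args, **kwargs)}
        except Exception as exc:  # deliberately broad so the chord header never fails
            return {"ok": False, "error": repr(exc)}
    return wrapper


def split_results(results: List[Dict[str, Any]]) -> Tuple[List[Any], List[str]]:
    """What a chord callback could do with the list of wrapped results."""
    successes = [r["value"] for r in results if r["ok"]]
    failures = [r["error"] for r in results if not r["ok"]]
    return successes, failures


# Usage without Celery, to show the data flow:
@never_fail
def divide(divisor: int) -> float:
    return 1 / divisor


successes, failures = split_results([divide(d) for d in (1, 0, 2)])
print(successes)  # [1.0, 0.5]
print(failures)   # one entry describing the ZeroDivisionError
```

The trade-off is that Celery now records every wrapped task as having succeeded, so retries, failure monitoring and result-state checks no longer see the errors; the callback becomes responsible for handling them.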

So, my question is: is there a way to have a Celery chord callback execute regardless of the return status of its group's subtasks?

asked Oct 18 '17 01:10 by Michael DiStefano



1 Answer

You could try calling the original callback from the errback:

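from celery import chord

# Assumes `celery` is the Celery application instance.


@celery.task
def plus(x, y):
    print(f'Running plus {x}, {y}')
    return x + y


@celery.task
def failure():
    print('Running failure')
    raise ValueError('BAD')


@celery.task
def callme(stuff):
    print('Callback')
    print(f'Callback arg: {stuff}')


@celery.task
def on_chord_error(task_id, extra_info):
    # Errback: receives a task id, followed by the partial argument
    # bound below ('extra info').
    print('ON ERROR CALLBACK')
    print(f'Task ID: {task_id}')
    print(f'Extra info: {extra_info}')
    # Call the original callback ourselves. Note that it gets extra_info,
    # not the results of the successful header tasks.
    callme.delay(extra_info)


@celery.task
def chord_test():
    tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
    # Attach the errback to the chord callback; it runs if a header task fails.
    callback = callme.s().on_error(on_chord_error.s('extra info'))
    chord(tasks)(callback)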

Running chord_test produces the following worker log:

Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task: tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7]  ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
  File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
    ret = j(timeout=3.0, propagate=True)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
    interval=interval, no_ack=no_ack, on_interval=on_interval,
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
    self.maybe_throw(callback=callback)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
    reraise(type(exc), exc, tb)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
    raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info
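The last lines of that log show callme running with the errback's 'extra info' argument, not with the results [2, 4, 6] of the plus tasks that succeeded. The following is a hypothetical, synchronous plain-Python model of that control flow (run_chord is an illustration only, not Celery's implementation): every header task runs, the callback gets the result list only when nothing failed, and otherwise the errback gets a task id plus its partial arguments.

```python
import functools
from typing import Any, Callable, List, Optional


def run_chord(
    header: List[Callable[[], Any]],
    callback: Callable[[List[Any]], Any],
    errback: Optional[Callable[[str], Any]] = None,
    task_id: str = "task-id",
) -> Any:
    """Hypothetical model of chord semantics (not Celery code)."""
    results: List[Any] = []
    failed = False
    for task in header:
        try:
            results.append(task())
        except Exception:
            failed = True  # the remaining header tasks still run
    if not failed:
        return callback(results)  # normal path: callback gets every result
    if errback is not None:
        return errback(task_id)  # error path: a task id (plus partial args)
    return None


received: List[Any] = []


def callme(stuff: Any) -> None:
    received.append(stuff)


def failure() -> int:
    raise ValueError("BAD")


def on_chord_error(task_id: str, extra_info: str) -> None:
    # Mirrors the answer: forward the partial argument to the original callback.
    callme(extra_info)


header = [lambda: 1 + 1, lambda: 2 + 2, failure, lambda: 3 + 3]
run_chord(header, callme, functools.partial(on_chord_error, extra_info="extra info"))
print(received)  # ['extra info'], not [2, 4, 6]
```

So if the callback also needs the successful results, the errback has to obtain them some other way (for example, from the result backend); the answer's code does not do that.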
answered Oct 27 '22 11:10 by user9538
