I'm using Celery from a webapp to start a task hierarchy.
I'm using the following tasks:
task_a
task_b
task_c
notify_user
A Django view starts several task_a instances. Each of them does some processing and then starts several task_b instances. Each of those, in turn, does some processing and then starts several task_c instances.
My goal is to execute all the tasks, and to run a callback function as soon as the entire hierarchy has finished. Additionally, I want to be able to pass data from the lowest tasks to the top level.
- When the entire hierarchy has finished, the notify_user callback function should be called.
- The notify_user callback function needs access to data from the task_c tasks.
- All the tasks should be non-blocking, so task_b should not wait for all the task_c subtasks to finish.
What would be the right way to achieve the goal stated above?
The solution turned out to be the dynamic task feature provided in this pull request: https://github.com/celery/celery/pull/817. With it, each task can return a group of subtasks, which then replaces the original task in the queue.
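To illustrate the idea of a task replacing itself with a group of subtasks, here is a small pure-Python model. It is not Celery's API: `Group` and `run` are hypothetical names used only for this sketch, and it runs everything synchronously, so it shows how results get stitched together, not how workers schedule them. A task either returns a plain value or a `Group` of further task calls; the runner expands the group in place, so the parent's result becomes the list of its children's results.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Tuple


@dataclass
class Group:
    """Hypothetical stand-in for 'a group of subtasks' returned by a task."""
    calls: List[Tuple[Callable[..., Any], tuple]] = field(default_factory=list)


def run(func: Callable[..., Any], *args: Any) -> Any:
    """Run a task; if it returns a Group, replace it with the group's results."""
    outcome = func(*args)
    if isinstance(outcome, Group):
        # The task is "replaced": its result becomes the list of subtask results.
        return [run(f, *a) for f, a in outcome.calls]
    return outcome


def task_c(value):
    # Leaf task: does some (trivial) processing and returns a value.
    return value.lower()


def task_b(leaves):
    # Instead of waiting on its children, task_b hands back a group of task_c calls.
    return Group([(task_c, (leaf,)) for leaf in leaves])


def task_a(branches):
    # Likewise, task_a hands back a group of task_b calls.
    return Group([(task_b, (branch,)) for branch in branches])


print(run(task_a, [["C1", "C2"], ["C3"]]))
```

Running it prints `[['c1', 'c2'], ['c3']]`: the nesting of the returned groups is preserved in the final result.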
Suppose you have these tasks:
from celery import Celery, group

celery = Celery(
    broker="amqp://test:test@localhost:5672/test"
)
# Chords (group | callback) need a result backend to track when the group is done
celery.conf.update(
    CELERY_RESULT_BACKEND="mongodb",
)

@celery.task
def task_a(result):
    print('task_a:', result)
    return result

@celery.task
def task_b(result):
    print('task_b:', result)
    return result

@celery.task
def task_c(result):
    print('task_c:', result)
    return result

@celery.task
def notify_user(result):
    print(result)
    return result
For given input data (as you drew it):
tree = [
[["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]
]
You can do:
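a_group = []
for ia, a in enumerate(tree):
    print("A%s:" % ia)
    b_group = []
    for ib, b in enumerate(a):
        print(" - B%s:" % ib)
        for c in b:
            print('   -', c)
        # One chord per B: run all of its task_c, then task_b on their results
        c_group = group([task_c.s(c) for c in b])
        b_group.append(c_group | task_b.s())
    # One chord per A: run all of its B chords, then task_a on their results
    a_group.append(group(b_group) | task_a.s())
# Top level: run all A chords, then notify_user with the nested results
final_task = group(a_group) | notify_user.s()
# Send the whole workflow to the broker
final_task.delay()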
Its representation is (don't read it, it's ugly :)
[[[__main__.task_c('C1'), __main__.task_c('C2'), __main__.task_c('C3')] | __main__.task_b(), [__main__.task_c('C4'), __main__.task_c('C5')] | __main__.task_b()] | __main__.task_a(), [[__main__.task_c('C6'), __main__.task_c('C7'), __main__.task_c('C8')] | __main__.task_b(), [__main__.task_c('C9')] | __main__.task_b()] | __main__.task_a()] | __main__.notify_user()
And the data passed into notify_user would be:
[[['C1', 'C2', 'C3'], ['C4', 'C5']], [['C6', 'C7', 'C8'], ['C9']]]
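To see why notify_user receives exactly that nested list, here is a small synchronous pure-Python model of `group(...) | callback`, where the callback is called once with the list of the group's results in group order. `Sig`, `Chord` and `evaluate` are hypothetical helpers for this sketch, not Celery classes, and the model ignores brokers and workers entirely; every task is the identity function, as in the answer.

```python
from typing import Any, Callable, List, Union


class Sig:
    """Hypothetical stand-in for a task signature: a function plus its arguments."""
    def __init__(self, func: Callable[..., Any], *args: Any) -> None:
        self.func, self.args = func, args


class Chord:
    """Hypothetical stand-in for group(header) | body."""
    def __init__(self, header: List["Node"], body: Callable[[list], Any]) -> None:
        self.header, self.body = header, body


Node = Union[Sig, Chord]


def evaluate(node: Node) -> Any:
    if isinstance(node, Sig):
        return node.func(*node.args)
    # The body gets a single argument: the header results, in header order.
    return node.body([evaluate(child) for child in node.header])


def identity(result):
    # task_a, task_b, task_c and notify_user in the answer all return their input.
    return result


tree = [[["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]]

# Same nesting as the workflow built above: C chords inside B chords inside A chords.
final_task = Chord(
    [Chord([Chord([Sig(identity, c) for c in b], identity) for b in a], identity)
     for a in tree],
    identity,
)
print(evaluate(final_task))
```

Each level of chords adds one level of list nesting, which is why the result mirrors the shape of the input tree.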
Everything is run via callbacks (chords), so no task ever waits for other tasks.
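One common way to fire a callback once a group has finished, without any member blocking on the others, is a completion counter: each member reports its result when done, and whichever finishes last triggers the callback. The sketch below models that idea with standard-library threads; `CountdownJoin` is a hypothetical name, and this is a simplified illustration of the general technique, not Celery's actual chord implementation (which depends on the result backend). Results are stored by index, so the callback sees them in submission order regardless of finishing order.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


class CountdownJoin:
    """Collects N results; the call that delivers the last one fires the callback."""
    def __init__(self, size: int, callback: Callable[[List[Any]], None]) -> None:
        self._results: List[Any] = [None] * size
        self._remaining = size
        self._lock = threading.Lock()
        self._callback = callback

    def complete(self, index: int, value: Any) -> None:
        with self._lock:
            self._results[index] = value
            self._remaining -= 1
            fire = self._remaining == 0
        if fire:
            # No member waited for the others; the last finisher triggers the callback.
            self._callback(self._results)


def task_c(value: str) -> str:
    return value


done = threading.Event()
received: List[Any] = []


def notify_user(results: List[Any]) -> None:
    received.append(results)
    done.set()


leaves = ["C1", "C2", "C3"]
join = CountdownJoin(len(leaves), notify_user)
with ThreadPoolExecutor(max_workers=3) as pool:
    for i, leaf in enumerate(leaves):
        future = pool.submit(task_c, leaf)
        # Bind i now; the done-callback runs when this future finishes.
        future.add_done_callback(lambda f, i=i: join.complete(i, f.result()))

# Only this demo script waits, so it can print; the tasks themselves never block.
done.wait(timeout=5)
print(received)
```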