Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery: Callback after task hierarchy

I'm using Celery from a webapp to start a task hierarchy.

Tasks

I'm using the following tasks:

  • task_a
  • task_b
  • task_c
  • notify_user

A Django view starts several task_a instances. Each of them does some processing and then starts several task_b instances. And each of those does some processing and then starts several task_c instances.

To visualize:

Tree

Goals

My goal is to execute all the tasks, and to run a callback function as soon as the entire hierarchy has finished. Additionally, I want to be able to pass data from the lowest tasks to the top level.

  1. The view should just "kick off" the tasks and then return.
  2. Each subtask depends on the parent task. The parent task does not depend directly on the child task. After a parent task has started all the child tasks, it can be stopped.
  3. Everything can be parallelized, as long as the parent task runs before the child task is started.
  4. After all the tasks have finished, the notify_user callback function should be called.
  5. The notify_user callback function needs access to data from the task_cs.

All the tasks should be non-blocking, so task_b should not wait for all the task_c subtasks to finish.

What would be the right way to achieve the goal stated above?

like image 935
Danilo Bargen Avatar asked May 24 '13 14:05

Danilo Bargen


2 Answers

The solution turned out to be the dynamic task feature provided in this pull request: https://github.com/celery/celery/pull/817. With this, each task can return a group of subtasks, which will then replace the original taks in the queue.

like image 175
Danilo Bargen Avatar answered Nov 09 '22 08:11

Danilo Bargen


Suppose you have these tasks:

celery = Celery(
    broker="amqp://test:test@localhost:5672/test"
)
celery.conf.update(
    CELERY_RESULT_BACKEND = "mongodb",
)


@celery.task
def task_a(result):
    print 'task_a:', result
    return result

@celery.task
def task_b(result):
    print 'task_b:', result
    return result

@celery.task
def task_c(result):
    print 'task_c:', result
    return result

@celery.task
def notify_user(result):
    print result
    return result

For a given input data (as you drawn it):

    tree = [
        [["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]
    ]

You can do:

    a_group = []
    for ia, a in enumerate(tree):
        print "A%s:" % ia
        b_group = []
        for ib, b in enumerate(a):
            print " - B%s:" % ib
            for c in b:
                print '   -', c

            c_group = group([task_c.s(c) for c in b])

            b_group.append(c_group | task_b.s())

        a_group.append(group(b_group) | task_a.s())

    final_task = group(a_group) | notify_user.s()

It's representation is (don't read it, it's ugly :)

[[[__main__.task_c('C1'), __main__.task_c('C2'), __main__.task_c('C3')] | __main__.task_b(), [__main__.task_c('C4'), __main__.task_c('C5')] | __main__.task_b()] | __main__.task_a(), [[__main__.task_c('C6'), __main__.task_c('C7'), __main__.task_c('C8')] | __main__.task_b(), [__main__.task_c('C9')] | __main__.task_b()] | __main__.task_a()] | __main__.notify_user()

And the data passed into notify_user would be:

[[['C1', 'C2', 'C3'], ['C4', 'C5']], [['C6', 'C7', 'C8'], ['C9']]]

Everything is run via callbacks (chords) so there are no tasks waiting for other tasks.

like image 20
ionelmc Avatar answered Nov 09 '22 09:11

ionelmc