Running a task after all tasks have been completed

I'm writing an application which needs to run a series of tasks in parallel and then a single task with the results of all the tasks run:

# `celery` is the application's Celery instance
@celery.task
def power(value, expo):
    return value ** expo

@celery.task
def amass(values):
    print(values)

It's a very contrived and oversimplified example, but hopefully the point comes across well. Basically, I have many items which need to run through power, but I only want to run amass on the results from all of the tasks. All of this should happen asynchronously, and I don't need anything back from the amass method.

Does anyone know how to set this up in Celery so that everything is executed asynchronously and a single callback with a list of the results is called after all is said and done?

I've set up this example to run with a chord, as Alexander Afanasiev recommended:

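from time import sleep
import random

from celery import chord

tasks = []

for i in range(10):
    # .s() only builds a signature; nothing is sent to the workers yet
    tasks.append(power.s(i, 2))
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms

callback = amass.s()

# the header tasks are dispatched only here, when the chord is called
r = chord(tasks)(callback)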

Unfortunately, in the above example, all of the tasks in the tasks list are started only when the chord is called. Is there a way for each task to start on its own, and then add a callback to the group that runs when everything has finished?

asked Apr 30 '13 at 20:04 by Naftuli Kay



2 Answers

Here's a solution which worked for my purposes:

tasks.py:

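from time import sleep
import random

# `celery` is the application's Celery instance

@celery.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms
    return value ** expo

@celery.task
def amass(results, task_ids):
    # task ids are passed instead of AsyncResult objects so that the
    # arguments serialize with any serializer (including JSON)
    pending = []
    for task_id in task_ids:
        task = celery.AsyncResult(task_id)
        if task.ready():
            # read the stored value; newer Celery versions raise an
            # error when .get() is called inside a task
            results.append(task.result)
        else:
            pending.append(task_id)

    if pending:
        # re-send this task to execute at least 1 second from now;
        # countdown is an execution option, so it must go through
        # apply_async() (delay() would pass it to amass as an argument)
        amass.apply_async((results, pending), countdown=1)
    else:
        # we're done
        print(results)

Use Case:

from tasks import power, amass

tasks = []

for i in range(10):
    tasks.append(power.delay(i, 2))

amass.delay([], [task.id for task in tasks])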

This starts all of the tasks asynchronously as soon as possible. Once they have all been posted to the queue, the amass task is posted as well, and it keeps re-posting itself (with a one-second countdown) until all of the other tasks have completed.
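The polling idea itself is not specific to Celery. As a rough in-process analogy (not Celery code), the sketch below swaps the broker and workers for Python's standard `concurrent.futures.ThreadPoolExecutor` and uses `threading.Timer` in place of `countdown`. Every `power` call starts as soon as it is submitted, and the hypothetical helper `amass_when_done` re-schedules itself until nothing is pending, then calls the callback once with all the results. The 50 ms `interval` is an arbitrary stand-in for the one-second countdown.

```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def power(value, expo):
    time.sleep(random.randint(10, 100) / 1000.0)  # simulate 10-100 ms of work
    return value ** expo


def amass_when_done(futures, callback, interval=0.05, results=None):
    """Collect results from finished futures; if any are still pending,
    schedule another check `interval` seconds later (the analogue of
    re-sending amass with a countdown). Calls callback(results) once."""
    results = [] if results is None else results
    pending = []
    for fut in futures:
        if fut.done():
            results.append(fut.result())
        else:
            pending.append(fut)

    if pending:
        timer = threading.Timer(
            interval, amass_when_done, args=(pending, callback, interval, results)
        )
        timer.start()
    else:
        callback(results)


def run_demo(n=10):
    done = threading.Event()
    collected = []

    def on_all_done(results):
        collected.extend(results)
        done.set()

    with ThreadPoolExecutor(max_workers=4) as pool:
        # each task starts running as soon as it is submitted
        futures = [pool.submit(power, i, 2) for i in range(n)]
        # the polling "callback" is attached after the tasks are already running
        amass_when_done(futures, on_all_done)
        done.wait(timeout=10)
    # results are appended in the order they are observed, so sort them
    return sorted(collected)


print(run_demo())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The trade-off is the same as in the Celery version: polling is simple and lets the tasks start immediately, at the cost of up to one `interval` of extra latency and repeated checks while tasks are still running.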

answered Oct 24 '22 at 23:10 by Naftuli Kay


Celery has plenty of tools for most of the workflows you can imagine.

It seems you need to use a chord. Here's a quote from the docs:

A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.
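To make the header/body semantics concrete, here is an in-process analogy using the standard library rather than Celery. The helper `chord_like` and its `(func, args)` header format are hypothetical, for illustration only. The header tasks run in parallel, a shared counter tracks how many are still outstanding, and whichever task finishes last calls the body exactly once with every result.

```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def chord_like(pool, header, body):
    """Submit every (func, args) pair in `header` to `pool` and call
    body(results) exactly once, after the last header task finishes.

    Results are stored by header position, so `body` receives them in
    header order even when tasks finish out of order.
    """
    if not header:
        body([])
        return
    results = [None] * len(header)
    remaining = [len(header)]  # outstanding tasks, guarded by `lock`
    lock = threading.Lock()

    def make_on_done(index):
        def on_done(fut):
            results[index] = fut.result()
            with lock:
                remaining[0] -= 1
                is_last = remaining[0] == 0
            if is_last:
                # only the final task to complete gets here
                body(results)
        return on_done

    for index, (func, args) in enumerate(header):
        pool.submit(func, *args).add_done_callback(make_on_done(index))


def power(value, expo):
    time.sleep(random.randint(10, 100) / 1000.0)  # finish in random order
    return value ** expo


def run_demo(n=10):
    done = threading.Event()
    collected = []

    def amass(values):
        collected.extend(values)
        done.set()

    with ThreadPoolExecutor(max_workers=4) as pool:
        chord_like(pool, [(power, (i, 2)) for i in range(n)], amass)
        done.wait(timeout=10)
    return collected


print(run_demo())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Unlike the polling approach, nothing re-checks on a timer here: completion itself drives the body. As with a Celery chord, though, the header tasks start only when `chord_like` is called.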

answered Oct 25 '22 at 01:10 by alecxe