 

Combining job results in celery

I'm working on a software upgrade system using Celery. I have a use case that I'm struggling to implement cleanly. Here are my jobs:

device_software_updates(device_id)

returns a list of software updates that need to be installed on a device

installed_device_software(device_id)

returns the software modules that are currently installed on a device

latest_device_software(device_id)

returns the latest software versions available for a device

software_updates(installed_software, latest_software)

returns the latest software modules that are not installed
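To make the data flow concrete, here is a minimal pure-Python sketch of what software_updates might compute. The data shapes are assumptions, not from the question: both inputs are taken to be dicts mapping a module name to a version string, and a module counts as an update when it is missing from the device or its installed version differs from the latest one.

```python
def software_updates(installed_software, latest_software):
    """Return (module, version) pairs that are available but not installed.

    Assumes both arguments are dicts mapping module name -> version string.
    A module needs an update if it is absent from installed_software or
    its installed version differs from the latest version.
    """
    return sorted(
        (name, version)
        for name, version in latest_software.items()
        if installed_software.get(name) != version
    )


# Hypothetical example data.
installed = {"kernel": "3.2", "firmware": "1.0"}
latest = {"kernel": "3.4", "firmware": "1.0", "bootloader": "2.1"}
print(software_updates(installed, latest))  # [('bootloader', '2.1'), ('kernel', '3.4')]
```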

In pure Python, device_software_updates might be implemented like this:

def device_software_updates(device_id):
    return software_updates(installed_device_software(device_id),
                            latest_device_software(device_id))

What is the cleanest way to implement this in Celery 3.0? I'd like to do something using groups. My current implementation looks like this:

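from celery import group

def device_software_updates(device_id):
    # Run both lookups as a group, then pipe their results into software_updates
    return (
        group(installed_device_software.s(device_id),
              latest_device_software.s(device_id)) |
        software_updates.s()
    )()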

Unfortunately, this means the argspec of software_updates becomes software_updates(arg_list), which is not ideal.
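One way to keep the two-argument signature is to leave the real logic in a plain function and register only a thin adapter as the callback, since a task chained after a group (like a chord body) is called with a single list holding the header results in header order. The sketch below is plain Python so it runs without a broker; spread_results, compare, and software_updates_body are hypothetical names, not part of Celery, and compare uses placeholder logic.

```python
import functools


def spread_results(func):
    """Adapt func(a, b, ...) so it can be called as func([a, b, ...]).

    A chord body (or a task chained after a group) receives one positional
    argument: the list of header results. This wrapper unpacks that list
    back into separate positional arguments.
    """
    @functools.wraps(func)
    def wrapper(results):
        return func(*results)
    return wrapper


def compare(installed_software, latest_software):
    # Placeholder logic: module names present in latest but not installed.
    return sorted(set(latest_software) - set(installed_software))


# What you would register as the callback task (hypothetical name).
software_updates_body = spread_results(compare)

# Simulate the single list argument a chord body receives.
header_results = [("kernel", "firmware"), ("kernel", "firmware", "bootloader")]
print(software_updates_body(header_results))  # ['bootloader']
```

Under this assumption, compare keeps its (installed_software, latest_software) argspec and can be unit-tested directly, while only the adapter would be wrapped with celery.task; check that combination against your Celery version before relying on it.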

jfocht asked Aug 23 '12 23:08



1 Answer

I believe that using a chord would be the right way to handle this.

According to the Celery documentation at http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups:

A chord is a task that only executes after all of the tasks in a taskset have finished executing.

...

A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.

Here is an example from the Celery documentation, broken down line by line:

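from celery import chord

# add and tsum are tasks defined in the Celery docs' example
callback = tsum.subtask()
header = [add.subtask((i, i)) for i in range(100)]
result = chord(header)(callback)
result.get()  # 9900 with the docs' add and tsum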
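To see what that example computes without running a broker, here is a rough local analogy built on concurrent.futures. It is not how Celery implements chords (Celery coordinates through a broker and result backend). Assuming, as in the Celery docs, that add returns the sum of its two arguments and tsum sums a list, the header produces 0, 2, 4, ..., 198 and the body adds them up to 9900. run_chord is a hypothetical helper name.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    # Plain-function stand-in for the docs' add task.
    return x + y


def tsum(numbers):
    # Plain-function stand-in for the docs' tsum task: sums a list.
    return sum(numbers)


def run_chord(header, body):
    """Run every (func, args) pair in header concurrently, then call body once.

    Like a chord, body receives a single list of header results, ordered to
    match the header rather than by completion time.
    """
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        results = [future.result() for future in futures]  # waits for each
    return body(results)


header = [(add, (i, i)) for i in range(100)]
print(run_chord(header, tsum))  # 9900
```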

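In your case, you could do something similar. Note that the chord body still receives the header results as a single list, in header order, so software_updates is called as software_updates([installed, latest]):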

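from celery import chord

def device_software_updates(device_id):
    # Header: both lookups run in parallel
    header = [
        installed_device_software.s(device_id),
        latest_device_software.s(device_id),
    ]
    # Body: runs once the header is done, called with [installed, latest]
    callback = software_updates.s()
    result = chord(header)(callback)
    # Block here for the final value; avoid calling .get() inside another task
    return result.get()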
Peter Kirby answered Oct 02 '22 22:10
