I'm working on a software upgrade system using celery. I have a use case that I'm struggling to implement cleanly. Here are my jobs:
device_software_updates(device_id)
returns a list of software updates that need to be installed on a device
installed_device_software(device_id)
returns the software modules that are currently installed on a device
latest_device_software(device_id)
returns the latest software versions available for a device
software_updates(installed_software, latest_software)
returns the latest software modules that are not installed
In pure python, the implementation of device_software_updates might look like
def device_software_updates(device_id):
return software_updates(installed_device_software(device_id),
latest_device_software(device_id))
What is the cleanest way to implement this in Celery 3.0? I'd like to do something using groups. My current implementation looks like this:
def device_software_updates(device_id):
return (
group(installed_device_software.s(device_id),
latest_device_software.s(device_id)) |
software_updates.s()
)()
Unfortunately, this means that the argspec of software_updates is software_updates(arg_list)
which is not ideal.
I believe that using a chord would be the right way to handle this.
According the the Celery documentation at http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups,
A chord is a task that only executes after all of the tasks in a taskset have finished executing.
...
A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.
Here is a broken down, line by line example (from the Celery documentation)
callback = tsum.subtask()
header = [add.subtask((i, i)) for i in xrange(100)]
result = chord(header)(callback)
result.get()
In your case, you could do something similar, like:
@celery.task
def device_software_updates():
callback = software_updates.subtask()
header = [
installed_device_software.subtask(device_id),
latest_device_software.s(device_id)
]
result = chord(header)(callback)
return result.get()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With