I would like to have Celery tasks that depend on the results of 2 or more other tasks. I have looked into Python+Celery: Chaining jobs? and http://pypi.python.org/pypi/celery-tasktree, but those only work when a task has a single dependent task.
I know about TaskSet, but there does not seem to be a way to instantly execute a callback when TaskSetResult.ready() becomes True. What I have in mind right now is a periodic task that polls TaskSetResult.ready() every few [milli]seconds or so and fires the callback once it returns True, but that sounds rather inelegant to me.
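For concreteness, here is a rough sketch of that polling idea, assuming the old TaskSet/TaskSetResult API; the task names poll_taskset and on_all_done and the 2-second countdown are placeholders I made up, not anything from an existing project:

# Sketch of the polling idea: a task that re-schedules itself until every
# subtask of the TaskSet has finished, then fires a callback task.
from celery.task import task
from celery.result import AsyncResult, TaskSetResult

@task
def on_all_done(results):
    # Placeholder callback: whatever should happen once all subtasks finish.
    return results

@task
def poll_taskset(setid, task_ids):
    # Rebuild the TaskSetResult from the taskset id and the subtask ids.
    result = TaskSetResult(setid, [AsyncResult(tid) for tid in task_ids])
    if result.ready():
        # Fire the callback with the joined results.
        on_all_done.delay(result.join())
    else:
        # Not ready yet: check again in a couple of seconds.
        poll_taskset.apply_async(args=[setid, task_ids], countdown=2)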
Any suggestions?
mrbox is right: you can retry until the results are ready. However, it is not so clear in the docs that when you retry you have to pass the setid and the subtask ids along, and that to recover them you have to use the map function. Below is some sample code to explain what I mean.
# Celery 2.x/3.0-era imports; this is the run() method of a Task subclass.
from celery.task.sets import TaskSet
from celery.result import AsyncResult, TaskSetResult

def run(self, setid=None, subtasks=None, **kwargs):
    if not setid or not subtasks:
        # First time this task runs: launch the subtasks.
        # ... prepare slices and folder_name here (omitted) ...
        tasks = []
        for slice in slices:
            tasks.append(uploadTrackSlice.subtask((slice, folder_name)))
        job = TaskSet(tasks=tasks)
        task_set_result = job.apply_async()
        setid = task_set_result.taskset_id
        subtasks = [result.task_id for result in task_set_result.subtasks]
        # retry() raises, so execution stops here; setid and the subtask ids
        # are passed along so the next attempt can rebuild the result set.
        self.retry(exc=Exception("Result not ready"), args=[setid, subtasks])

    # This is a retry: rebuild the TaskSetResult and check the results.
    tasks_result = TaskSetResult(setid, map(AsyncResult, subtasks))
    if not tasks_result.ready():
        self.retry(exc=Exception("Result not ready"), args=[setid, subtasks])
    else:
        if tasks_result.successful():
            return tasks_result.join()
        else:
            raise Exception("Some of the tasks failed")
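For completeness, the run() method above belongs inside a Task subclass; something along these lines (the class name is made up, and the retry timing values are only examples). Setting default_retry_delay and max_retries controls how often and for how long the task keeps polling:

from celery.task import Task

class JoinTrackSlices(Task):
    default_retry_delay = 5   # seconds between retries, i.e. how often the TaskSet is polled
    max_retries = 100         # give up eventually instead of retrying forever

    def run(self, setid=None, subtasks=None, **kwargs):
        pass  # body as shown above

The first call (with no arguments) launches the subtasks; every retry after that only rebuilds the TaskSetResult and checks whether they have finished.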