I want to accomplish something like this:
results = []
for i in range(N):
data = generate_data_slowly()
res = tasks.process_data.apply_async(data)
results.append(res)
celery.collect(results).then(tasks.combine_processed_data())
ie launch asynchronous tasks over a long period of time, then schedule a dependent task that will only be executed once all earlier tasks are complete.
I've looked at things like chain
and chord
, but it seems like they only work if you can construct your task graph completely upfront.
For anyone interested, I ended up using this snippet:
@app.task(bind=True, max_retries=None)
def wait_for(self, task_id_or_ids):
try:
ready = app.AsyncResult(task_id_or_ids).ready()
except TypeError:
ready = all(app.AsyncResult(task_id).ready()
for task_id in task_id_or_ids)
if not ready:
self.retry(countdown=2**self.request.retries)
And writing the workflow something like this:
task_ids = []
for i in range(N):
task = (generate_data_slowly.si(i) |
process_data.si(i)
)
task_id = task.delay().task_id
task_ids.append(task_id)
final_task = (wait_for(task_ids) |
combine_processed_data.si()
)
final_task.delay()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With