I use celery to launch task sets that look like this:
I aggregate results of these tasks into single answer, then do something with this answer --- like store to the database, save to special result file and so on. Basically after tasks done executing I have to call function that has following signature:
def callback(result_file_name, task_result_list):
#store in file
def callback(entity_key, task_result_list):
#store in db
For now step 1. is done in Celery queue and step 2 is done outside celery:
tasks = []
# add taksks to tasks list
task_group = group()
task_group.tasks = tasks
result = task_group.apply_async()
res = result.join()
# Aggregate results
# Save results to file, database whatever
This approach is cumbersome since I have to stop a single thread until all tasks are performed (which can take couple of hours).
I would like to somehow move step 2 to celery also --- esentially I would need to add a callback to entire taskset (as far as I know it is unsupported in Celery) or submit a task that is executed after all these subtasks.
Does anyone have idea how to do it? I use it in the django enviorment so I can store some state in the database.
I'cant use chords straight forwardly because chords enable me to create callbacks that look this way:
def callback(task_result_list):
#store in file
there is no obvious way to pass additional parameters to callback (especially because these callbacks can't be local functions).
I can store results using TaskSetMeta
but this entity has no status field --- so even if I would add a signal to TaskSetMeta i'd have to pool task results which could have siginificant overhead.
Well answer was really straightforward, and I can indeed use chords --- and additional parameters (like report file name and so on) must be passed as kwargs.
Here is chord task:
@task
def print_and_sum(to_sum, file_name):
print file_name
print sum(to_sum)
return file_name, sum(to_sum)
Here is how to instantiate it:
subtasks = [...]
result = chord(subtasks)(print_and_sum.subtask(kwargs={'file_name' : 'report_file.csv'}))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With