I am writing an application which will execute a group of several synchronous chains of tasks asynchronously.
In other words, I might have the pipeline foo(a, b, c) -> boo(a, b, c) for some list of bs.
My understanding is that I should create a chain foo(a, b, c) | boo(a, b, c) for each b in this list. These chains then form a Celery group, which can be applied asynchronously.
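As an illustration of the intended layout (a plain-Python sketch with no broker; the function names here just mirror the tasks below, and run_chain emulates how a chain feeds each task's return value forward):

```python
# Plain-Python sketch: one sequential chain per `b`, all chains side by side
# as a group. `run_chain` stands in for Celery's chain, which passes each
# task's return value to the next task as its first argument.
def foo(a, b, c):
    return b

def boo(a, b, c):
    return b

def run_chain(a, b, c):
    # foo -> boo: boo's first argument is foo's result
    result = foo(a, b, c)
    return boo(result, b, c)

bs = ["hello", "world"]
group_results = [run_chain("whatever", b, "baz") for b in bs]
print(group_results)  # ['hello', 'world']
```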
My code to do this is below:
my_app.py
#!/usr/bin/env python3
import functools
import time

from celery import chain, group, Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
app = Celery("my_app", broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def foo(a, b, c):
    logger.info("foo from {0}!".format(b))
    return b

@app.task
def boo(a, b, c):
    logger.info("boo from {0}!".format(b))
    return b

def break_up_tasks(tasks):
    try:
        first_task, *remaining_tasks = tasks
    except ValueError as e:
        first_task, remaining_tasks = [], []
    return first_task, remaining_tasks

def do_tasks(a, bs, c, opts):
    tasks = [foo, boo]
    # There should be an option for each task
    if len(opts) != len(tasks):
        raise ValueError("There should be {0} provided options".format(len(tasks)))
    # Create a list of tasks that should be included per the list of options' boolean values
    tasks = [task for opt, task in zip(opts, tasks) if opt]
    first_task, remaining_tasks = break_up_tasks(tasks)
    # If there are no tasks, we're done.
    if not first_task:
        return
    chains = (
        functools.reduce(
            # `a` should be provided by `apply_async`'s `args` kwarg
            # `b` should be provided by previous partials in chain
            lambda x, y: x | y.s(c),
            remaining_tasks, first_task.s(a, b, c)
        ) for b in bs
    )
    g = group(*chains)
    res = g.apply_async(args=(a,), queue="default")
    print("Applied async... waiting for termination.")
    total_tasks = len(tasks)
    while not res.ready():
        print("Waiting... {0}/{1} tasks complete".format(res.completed_count(), total_tasks))
        time.sleep(1)

if __name__ == "__main__":
    a = "whatever"
    bs = ["hello", "world"]
    c = "baz"
    opts = [
        # do "foo"
        True,
        # do "boo"
        True
    ]
    do_tasks(a, bs, c, opts)
Running celery
celery worker -A my_app -l info -c 5 -Q default
What I'm finding, though, is that when I run the above, my client waits in an infinite loop because boo fails with a missing argument:
TypeError: boo() missing 1 required positional argument: 'c'
My understanding is that apply_async will provide the args kwarg to each chain, and that previous links in the chain will provide their return values to subsequent links. Why is boo not receiving the arguments properly? I'm sure these tasks aren't well written, as this is my first foray into Celery. If you have other suggestions, I'm happy to entertain them.
After debugging your code (I'm new to Celery too! :) ) I've learned that each chained task receives the result of the previous task in the chain as its first argument. With that said, I believe the solution to your problem is to add the missing second argument (b) to the y.s in the reduce:
chains = (
    functools.reduce(
        # the previous task's result fills the first parameter (`a`)
        # `b` and `c` come from the partial signature
        lambda x, y: x | y.s(b, c),  # <- here is the 'new guy'
        remaining_tasks, first_task.s(a, b, c)
    ) for b in bs
)
Hope it helps.
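To see why this fixes the error, here is a broker-free sketch (plain Python; call_with_prev_result is a hypothetical helper emulating the observed behavior that a chain prepends the previous task's result to a signature's stored arguments):

```python
# Emulate chain semantics: a signature stores partial args, and the chain
# prepends the previous task's result when invoking the next task.
def boo(a, b, c):
    return b

def call_with_prev_result(func, partial_args, prev_result):
    # the previous result becomes the first positional argument
    return func(prev_result, *partial_args)

prev = "hello"  # result of foo

# boo.s(c): only one stored arg -> boo gets (prev, c) and is short one argument
try:
    call_with_prev_result(boo, ("baz",), prev)
except TypeError as e:
    print(e)  # boo() missing 1 required positional argument: 'c'

# boo.s(b, c): two stored args -> boo gets (prev, b, c) and succeeds
print(call_with_prev_result(boo, ("hello", "baz"), prev))  # hello
```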