I'm attempting to create a list of coroutines that can be passed to asyncio.gather()
However, I want to attach arguments to these coroutines as I append them to the list.
My current method shown below uses functools.partial. Unfortunately asyncio.gather doesn't accept a partial function, which makes sense.
What doesn't make sense to me is how to find a solution.
Example code:
async def test(arg1):
print(arg1)
statements = []
function = functools.partial(test, "hello world")
statements.append(function)
results = await asyncio.gather(*statements)
So how do I attach arguments to a function so that it can still be passed to asyncio.gather?
*EDIT
It seems I was being rather silly.
My solution was rather simple, don't use functools.partial and just append the coroutine straight to the list.
Code:
async def test(arg1):
print(arg1)
async def main():
statements = []
statements.append(test("hello_world"))
results = await asyncio.gather(*statements)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
If you need to run a sequence of async
function calls only once, you can simply store their immediately returned coroutine objects in a list, pass the unpacked list to asyncio.gather
and then await
to collect final return values from coroutines.
async def upper_cased(value: str) -> str:
await asyncio.sleep(1)
return value.upper()
coroutines = [
upper_cased("h"),
upper_cased("e"),
upper_cased("l"),
upper_cased("l"),
upper_cased("o"),
upper_cased(" "),
upper_cased("w"),
upper_cased("o"),
upper_cased("r"),
upper_cased("l"),
upper_cased("d"),
]
print("".join(await asyncio.gather(*coroutines)))
# prints HELLO WORLD
This works because unlike ordinary functions, calling an async
function (without awaiting) doesn't start executing it, it just returns an object that contains all the necessary details of the function call including argument values. This object can be used later to complete its execution in different ways, one of them is using await
.
If you need to run the same sequence of async
functions (with predetermined arguments) multiple times, simply storing their return values in a list and then passing the unpacked list to asyncio.gather
will not work.
This is because coroutine objects can only be awaited once. Awaiting them again when they have already returned values will raise a RuntimeError
.
async def random_cased(value: str) -> str:
await asyncio.sleep(1)
if random.randint(0, 1) == 1:
return value.upper()
return value.lower()
coroutines = [
random_cased("h"),
random_cased("e"),
random_cased("l"),
random_cased("l"),
random_cased("o"),
random_cased(" "),
random_cased("w"),
random_cased("o"),
random_cased("r"),
random_cased("l"),
random_cased("d"),
]
print("".join(await asyncio.gather(*coroutines)))
# prints HeLlO wOrLd (or any other random cased variation)
print("".join(await asyncio.gather(*coroutines)))
# raises RuntimeError (cannot reuse already awaited coroutine)
For this case, you can use functools.partial
objects to store both the async
functions and their argument values.
partial_funcs = [
functools.partial(random_cased, "h"),
functools.partial(random_cased, "e"),
functools.partial(random_cased, "l"),
functools.partial(random_cased, "l"),
functools.partial(random_cased, "o"),
functools.partial(random_cased, " "),
functools.partial(random_cased, "w"),
functools.partial(random_cased, "o"),
functools.partial(random_cased, "r"),
functools.partial(random_cased, "l"),
functools.partial(random_cased, "d"),
]
print("".join(await asyncio.gather(*[func() for func in partial_funcs])))
# prints HeLlO wOrLd (or any other random cased variation)
print("".join(await asyncio.gather(*[func() for func in partial_funcs])))
# prints HeLlO wOrLd (or any other random cased variation)
print("".join(await asyncio.gather(*[func() for func in partial_funcs])))
# prints HeLlO wOrLd (or any other random cased variation)
You can also use other containers instead of functools.partial
if you want. For example, a list for storing functions and another list for storing their arguments.
However, I want to attach arguments to these coroutines as I append them to the list.
You can simply call the coroutine functions with their requisite arguments. Unlike ordinary functions (but similar to generators), calling the coroutine function doesn't start executing it, it just returns an object that can be used to drive its execution later. This object already captures the invocation arguments and can be added to the list passed to asyncio.gather
.
For example:
statements = []
statements.append(test("hello world"))
# ...
results = await asyncio.gather(*statements)
Note: This will not work in case you need to gather results from the functions multiple times. See this answer for a solution to this special case.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With