I would like to start a large number of HTTP requests and collect their results once all of them have returned. Sending the requests in a non-blocking fashion is possible with asyncio, but I have problems collecting their results.
I'm aware of solutions such as aiohttp that are made for this specific problem. But the HTTP requests are just an example; my question is how to use asyncio correctly.
On the server side, I have Flask, which answers every request to localhost/ with "Hello World!", but it waits 0.1 seconds before answering. In all my examples, I'm sending 10 requests. Synchronous code should take about 1 second; an asynchronous version could do it in 0.1 seconds.
On the client side I want to spin up many requests at the same time and collect their results. I'm trying to do this in three different ways. Since asyncio needs an executor to work around blocking code, all of the approaches call loop.run_in_executor.
This code is shared between them:
import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

def request_sync():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text
Approach 1:
Use asyncio.gather() on a list of tasks and then run_until_complete. After reading Asyncio.gather vs asyncio.wait, it seemed like gather would wait on the results. But it doesn't. So this code returns instantly, without waiting for the requests to finish.
If I use a blocking function here, this works. Why can't I use an async function?
# approach 1
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_async))  # <---- using async function!
gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}")  # 0.003
# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_sync))  # <---- using sync function
gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}")  # 0.112
Python even warns me that the coroutine "request_async" was never awaited.
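The warning looks like this (exact wording can vary between Python versions):

RuntimeWarning: coroutine 'request_async' was never awaited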
At this point, I have a working solution: using a normal (not async) function in an executor. But I would like to have a solution that works with async function definitions, because I would like to use await inside them (in this simple example that is not necessary, but if I move more code to asyncio, I'm sure it will become important).
Approach 2:
Python warns me that my coroutines are never awaited. So let's await them. Approach 2 wraps all the code in an outer async function and awaits the result of the gathering. Same problem: it also returns instantly (and gives the same warning):
# approach 2
async def main():
    tasks = []
    for i in range(10):
        tasks.append(loop.run_in_executor(None, request_async))
    gathered_tasks = asyncio.gather(*tasks)
    return await gathered_tasks  # <-------- here I'm waiting on the coroutine

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.0036
This really confused me. I'm waiting on the result of gather. Intuitively, that should be propagated to the coroutines that I'm gathering. But Python still complains that my coroutine is never awaited.
I read some more and found: How could I use requests in asyncio? This is pretty much exactly my example: combining requests and asyncio. Which brings me to approach 3:
Approach 3:
Same structure as approach 2, but wait on each task that was given to run_in_executor() individually (surely this counts as awaiting the coroutine):
# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():
    tasks = []
    for i in range(10):
        task = loop.run_in_executor(None, request_async)
        tasks.append(task)
    responses = []
    for task in tasks:
        response = await task
        responses.append(response)
    return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.004578
My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results?
Answer:
"I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results?"
The answer is that you're not supposed to have blocking code in your coroutines. If you must have it, you have to isolate it using run_in_executor. So the correct way to write request_async using requests is:
async def request_async():
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, request_sync)
Passing request_async to run_in_executor doesn't make sense, because the entire point of run_in_executor is to invoke a sync function in a different thread. If you give it a coroutine function, it will happily call it (in another thread) and provide the returned coroutine object as the "result". That's like passing a generator to code that expects an ordinary function: yes, it will call the generator function just fine, but it won't know what to do with the returned object.
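You can see this with a tiny self-contained example (the body is a stand-in; it doesn't matter what the coroutine would do):

async def request_async():
    return "Hello World!"  # stand-in body

obj = request_async()  # calling a coroutine function only creates a coroutine object
print(type(obj))       # <class 'coroutine'>
obj.close()            # close it so Python doesn't warn that it was never awaited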
In general, you cannot just put async in front of def and expect to get a usable coroutine. A coroutine must not block, except by awaiting other asynchronous code.
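To make that rule concrete, here is a sketch contrasting a blocking "coroutine" with a properly asynchronous one (using asyncio.sleep as a stand-in for real async work):

import asyncio
import time

async def bad_coroutine():
    time.sleep(0.1)  # blocks the whole event loop; nothing else runs meanwhile
    return "done"

async def good_coroutine():
    await asyncio.sleep(0.1)  # suspends only this coroutine; the loop runs others
    return "done"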
Once you have a usable request_async, you can collect its results like this:
async def main():
    coros = [request_async() for _i in range(10)]
    results = await asyncio.gather(*coros)
    return results

results = loop.run_until_complete(main())
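For reference, here is the whole corrected client in one self-contained sketch; with the default thread pool, the ten requests should overlap and finish in roughly 0.1 seconds against the server described above (exact timings will vary):

import asyncio
from time import perf_counter

import requests

def request_sync():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

async def request_async():
    loop = asyncio.get_event_loop()
    # isolate the blocking call in the default ThreadPoolExecutor
    return await loop.run_in_executor(None, request_sync)

async def main():
    coros = [request_async() for _i in range(10)]
    return await asyncio.gather(*coros)

loop = asyncio.get_event_loop()
start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # roughly 0.1 if the server is running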