Working on replacing my implementation of a server query tool that uses a ThreadPoolExecutor with fully asynchronous calls using asyncio and aiohttp. Most of the transition is straightforward since network calls are non-blocking IO; it's the saving of the responses that has me in a conundrum.
All the examples I've found, including the docs for both libraries, use asyncio.gather(), which collects all the awaitable results. In my case, these results can be files many GB in size, and I don't want to store them in memory.
What's an appropriate way to solve this? Is it to use asyncio.as_completed() and then:

for f in asyncio.as_completed(aws):
    earliest_result = await f
    # Assumes `loop` is defined under the `if __name__` block outside the coroutine
    loop = get_event_loop()
    # Run the blocking IO in an executor and write to file
    _ = await loop.run_in_executor(None, save_result, earliest_result)
Doesn't this introduce a thread (assuming I use a ThreadPoolExecutor by default), thus making this an asynchronous, multi-threaded program rather than an asynchronous, single-threaded one?
Further, does this ensure only one earliest_result is being written to file at any time? I don't want the call to await loop.run_in_executor(...) to still be running when another result comes in and I try to write to the same file; I could limit with a semaphore, I suppose.
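Concretely, the pattern I have in mind looks something like the sketch below. fetch and save_result are stand-ins for my real network call and blocking file writer, and the semaphore makes the "one write at a time" guarantee explicit (in this sequential loop the awaits already serialize the writes, but the lock keeps that true if the body is ever moved into separate tasks):

```python
import asyncio
import time

# Hypothetical stand-ins: fetch() simulates a network call and
# save_result() simulates blocking file IO.
async def fetch(i):
    await asyncio.sleep(0.01 * i)
    return f"result-{i}"

def save_result(result):
    time.sleep(0.01)  # blocking write, runs in the executor's worker thread
    return f"saved {result}"

async def main():
    write_lock = asyncio.Semaphore(1)  # at most one write in flight
    loop = asyncio.get_running_loop()
    saved = []
    for f in asyncio.as_completed([fetch(i) for i in range(3)]):
        earliest_result = await f
        async with write_lock:
            saved.append(
                await loop.run_in_executor(None, save_result, earliest_result)
            )
    return saved

print(asyncio.run(main()))
```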
I'd suggest making use of the aiohttp streaming API: write your responses directly to disk instead of RAM, and return file names rather than the responses themselves from gather(). Doing so won't use much memory at all. This is a small demo of what I mean:
import asyncio

import aiofiles
from aiohttp import ClientSession


async def make_request(session, url):
    response = await session.request(method="GET", url=url)
    filename = url.split('/')[-1]
    # Open the file once and stream the body to disk chunk by chunk,
    # so the full response never sits in memory
    async with aiofiles.open(filename, "wb") as f:
        async for data in response.content.iter_chunked(1024):
            await f.write(data)
    return filename


async def main():
    urls = ['https://github.com/Tinche/aiofiles',
            'https://github.com/aio-libs/aiohttp']
    async with ClientSession() as session:
        coros = [make_request(session, url) for url in urls]
        result_files = await asyncio.gather(*coros)
        print(result_files)


asyncio.run(main())
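If you have many URLs, you may also want to cap how many requests are in flight at once. A minimal sketch of that, where download is a hypothetical stand-in for make_request above (no real network IO, just a sleep):

```python
import asyncio

# Illustrative only: cap concurrent "downloads" with a semaphore so
# thousands of URLs don't all open connections at the same time.
async def download(sem, name):
    async with sem:
        await asyncio.sleep(0.01)  # stands in for the streaming request
        return name

async def main():
    sem = asyncio.Semaphore(2)  # at most 2 requests in flight
    names = [f"file-{i}" for i in range(5)]
    return await asyncio.gather(*(download(sem, n) for n in names))

print(asyncio.run(main()))
```

gather() still returns the results in the order the coroutines were passed in, regardless of completion order.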