Writing web responses to file in an asynchronous program

Working on replacing my implementation of a server query tool that uses ThreadPoolExecutors with all asynchronous calls using asyncio and aiohttp. Most of the transition is straightforward since network calls are non-blocking IO; it's the saving of the responses that has me in a conundrum.

All the examples I am using, even the docs for both libraries, use asyncio.gather() which collects all the awaitable results. In my case, these results can be files in the many GB range, and I don't want to store them in memory.

What's an appropriate way to solve this? Is it to use asyncio.as_completed() and then:

for f in asyncio.as_completed(aws):
    earliest_result = await f
    # Assumes the loop was started under the `if __name__` block outside this coroutine
    loop = asyncio.get_event_loop()
    # Run the blocking IO in an executor and write to file
    _ = await loop.run_in_executor(None, save_result, earliest_result)

Doesn't this introduce a thread (assuming I use a ThreadPoolExecutor by default), thus making this an asynchronous, multi-threaded program instead of an asynchronous, single-threaded one?
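A quick way to check that assumption with a stdlib-only sketch: run a stand-in for the blocking write through the default executor and see which thread it lands on.

```python
import asyncio
import threading

def runs_on_main_thread():
    # Blocking stand-in for a file write; reports which thread ran it.
    return threading.current_thread() is threading.main_thread()

async def main():
    loop = asyncio.get_running_loop()
    # None -> the default ThreadPoolExecutor, so the callable is
    # dispatched to a worker thread while the event loop keeps running.
    return await loop.run_in_executor(None, runs_on_main_thread)

# asyncio.run(main()) returns False: the callable ran on a pool thread,
# so the program is multi-threaded for the duration of the write.
```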

Further, does this ensure only one earliest_result is being written to file at any time? I don't want the call to await loop.run_in_executor(...) to be running, then another result comes in and I try to write to the same file; I could limit with a semaphore, I suppose.
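A minimal sketch of that locking idea, assuming `aws` is the list of awaitables and `save_result` is the blocking writer (both hypothetical names from the snippet above):

```python
import asyncio

async def drain(aws, save_result):
    loop = asyncio.get_running_loop()
    lock = asyncio.Lock()

    async def handle(aw):
        result = await aw
        # The lock guarantees at most one executor write is in
        # flight, even though results are handled concurrently.
        async with lock:
            await loop.run_in_executor(None, save_result, result)

    await asyncio.gather(*(handle(aw) for aw in aws))
```

Note that in the exact shape above (`await f` then `await loop.run_in_executor(...)` inside a plain `for` loop) the writes are already serialized, because the loop body finishes one write before requesting the next completed future; the lock only matters once results are handled concurrently.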

pstatix asked Nov 06 '22
1 Answer

I'd suggest making use of the aiohttp streaming API. Write your responses directly to disk instead of RAM, and return file names instead of the responses themselves from gather. Doing so won't use much memory at all. Here is a small demo of what I mean:

import asyncio

import aiofiles
from aiohttp import ClientSession


async def make_request(session, url):
    filename = url.split('/')[-1]
    # Stream the body to disk chunk by chunk; the context manager
    # releases the connection when the download finishes.
    async with session.get(url) as response:
        async with aiofiles.open(filename, "wb") as f:
            async for chunk in response.content.iter_chunked(1024):
                await f.write(chunk)
    return filename


async def main():
    urls = ['https://github.com/Tinche/aiofiles',
            'https://github.com/aio-libs/aiohttp']
    async with ClientSession() as session:
        coros = [make_request(session, url) for url in urls]
        result_files = await asyncio.gather(*coros)
    print(result_files)


asyncio.run(main())
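If the URL list is large, the gather above starts every download at once. To address the semaphore question from the original post, a stdlib-only sketch of bounded concurrency (wrap each make_request-style coroutine before gathering):

```python
import asyncio

async def bounded(semaphore, coro):
    # Acquire before awaiting, so at most `limit` coroutines run at once.
    async with semaphore:
        return await coro

async def gather_bounded(coros, limit=5):
    semaphore = asyncio.Semaphore(limit)
    return await asyncio.gather(*(bounded(semaphore, c) for c in coros))
```

Results still come back in submission order, as with a plain gather; only the number of simultaneously running downloads is capped.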
merrydeath answered Nov 14 '22