Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Yield from Async Generator in Python AsyncIO

I have a simple class that leverages an async generator to retrieve a list of URLs:

import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """

        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()

                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """

        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()

When I execute this main part of the code:

if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()

The log prints out:

[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]

Since responses is an async generator, I expect it to yield one response from the async generator (which should only send the request upon actually yielding), send a separate request to the endpoint with no x parameter, and then yield the next response from the async generator. This should flip back and forth between a request with an x parameter and a request with no parameters. Instead, it is yielding all responses from the async generator with an x parameter and then followed by all of the https requests that have no parameters.

Something similar happens when I do:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()

And the log prints:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

Instead, what I want is:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

There are times when I want to retrieve all of the responses first before doing anything else. However, there are also times when I want to interject and make intermediate requests before yielding the next item from the generator (i.e., the generator returns results from paginated search results and I want to process further links from each page before moving onto the next page).

What do I need to change to achieve the required result?

like image 662
slaw Avatar asked May 15 '18 17:05

slaw


People also ask

What is an asynchronous generator in Python?

September 22, 2021 ‐ 1 min read. Asynchronous generator functions are part of Python version 3.6, they were introduced by PEP-525. Asynchronous generator functions are much like regular asynchronous functions except that they contain the yield keyword in the function body.

What is yield in generator function Python?

What Is Yield In Python? The Yield keyword in Python is similar to a return statement used for returning values or objects in Python. However, there is a slight difference. The yield statement returns a generator object to the one who calls the function which contains yield, instead of simply returning a value.

Does await yield Python?

Correct OP answer: No, await (per se) does not yield to the event loop, yield yields to the event loop, hence for the case given: "(2) We jump directly into send_message ".

Is async IO asynchronous?

asyncio is a library to write concurrent code using the async/await syntax. asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.


1 Answers

Leaving aside the technical question of whether responses is an async generator (it's not, as Python uses the term), your problem lies in as_completed. as_completed starts a bunch of coroutines in parallel and provides means to obtain their results as they complete. That the futures run in parallel is not exactly obvious from the documentation (improved in later versions), but it makes sense if you consider that the original concurrent.futures.as_completed works on thread-based futures which revolve around parallel execution. Conceptually, the same is true of asyncio futures.

Your code obtains only the first (fastest-arriving) result and then start doing something else, also using asyncio. The remaining coroutines passed to as_completed are not frozen up merely because no one is collecting their results - they are doing their jobs in the background, and once done are ready to be awaited (in your case by the code inside as_completed, which you access using loop.run_until_complete()). I would venture to guess that the URL without parameters takes longer to retrieve than the URL with just the parameter x, which is why it gets printed after all other coroutines.

In other words, those log lines being printed means that asyncio is doing its job and providing the parallel execution you requested! If you don't want parallel execution, then don't ask for it, execute them serially:

def get_routes(self, urls):
    for url in urls:
        yield loop.run_until_complete(self._get_url(url))

But this is a poor way of using asyncio - its main loop is non-reentrant, so to ensure composability, you almost certainly want the loop to be spun just once once at top-level. This is typically done with a construct like loop.run_until_complete(main()) or loop.run_forever(). As Martijn pointed out, you could achieve that, while retaining the nice generator API, by making get_routes an actual async generator:

async def get_routes(self, urls):
    for url in urls:
        result = await self._get_url(url)
        yield result

Now you can have a main() coroutine that looks like this:

async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

loop.run_until_complete(main())
like image 64
user4815162342 Avatar answered Sep 17 '22 11:09

user4815162342