 

How to merge async generators into a vanilla generator in Python 3.5+

I'm having trouble combining async generators and actually running them. This is because the only way I've found to run them is through an event loop, which returns an iterable and not a generator. Let me illustrate this with a simple example:

Let's say I have a function google_search that searches Google by scraping it (I'm not using the API on purpose). It takes a search string and returns a generator of search results. This generator doesn't end when the page is over; the function continues on to the next page. Therefore google_search returns a nearly endless generator (it will technically always end, but you can often get millions of hits for a Google search).

def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup
    # to parse the resulting html and yield search results
    # Assume this function works
    ......

Okay, so now I want to make a function that allows me to iterate over multiple google_search generators. I'd like something like this:

def google_searches(*search_strings):
    # zip the individual result generators together, one result per search
    for results in zip(*(google_search(query) for query in search_strings)):
        yield results

This way I can use a simple for loop to unwind google_searches and get my results. The above code works, but it is very slow for any reasonably large number of searches: it sends a request for the first search, then the second, and so forth, and only then yields results. I would like to speed this up (a lot). My first idea is to change google_searches to an async function (I am using Python 3.6.3 and can use async/await). That produces an async generator, which is fine, but I can only run it in another async function or an event loop. And running it in an event loop with loop.run_until_complete(asyncio.gather(...)) returns a list of results instead of a normal generator, which defeats the purpose, as there are probably far too many search results to hold in a list.

How can I make the google_searches function faster (preferably with async code, but anything is welcome) by executing the requests asynchronously, while still having it be a vanilla generator? Thanks in advance!

Asked by Max Smith on Dec 31 '17


2 Answers

The accepted answer waits for one result from EACH async generator before calling the generators again. If data doesn't arrive at the exact same pace, that may be a problem. The solution below takes multiple async iterables (generators or not) and iterates all of them simultaneously in multiple coroutines. Each coroutine puts its results into an asyncio.Queue, which is then iterated by the client code:

Iterator code:

import asyncio
from async_timeout import timeout

class MergeAsyncIterator:
    def __init__(self, *it, timeout=60, maxsize=0):
        # Wrap each async iterable in a coroutine that drains it into a shared queue.
        self._it = [self.iter_coro(i) for i in it]
        self.timeout = timeout
        self._futures = []
        self._queue = asyncio.Queue(maxsize=maxsize)

    def __aiter__(self):
        # Schedule one draining task per source iterable.
        for it in self._it:
            f = asyncio.ensure_future(it)
            self._futures.append(f)
        return self

    async def __anext__(self):
        # Stop once every source is exhausted and the queue has been drained.
        if all(f.done() for f in self._futures) and self._queue.empty():
            raise StopAsyncIteration
        # Wait up to `timeout` seconds for the next item; a timeout or
        # cancellation ends the iteration.
        with timeout(self.timeout):
            try:
                return await self._queue.get()
            except asyncio.CancelledError:
                raise StopAsyncIteration

    def iter_coro(self, it):
        if not hasattr(it, '__aiter__'):
            raise ValueError('Object passed must be an AsyncIterable')
        return self.aiter_to_queue(it)

    async def aiter_to_queue(self, ait):
        # Drain one async iterable into the shared queue, yielding control
        # after each item so the other draining tasks get a turn.
        async for i in ait:
            await self._queue.put(i)
            await asyncio.sleep(0)

Sample client code:

import random
import asyncio
from datetime import datetime

async def myaiter(name):
    for i in range(5):
        n = random.randint(0, 3)
        await asyncio.sleep(0.1 + n)
        yield (name, n)
    yield (name, 'DONE')

async def main():
    aiters = [myaiter(i) for i in 'abc']
    async for i in MergeAsyncIterator(*aiters, timeout=3):
        print(datetime.now().strftime('%H:%M:%S.%f'), i)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Output:

14:48:28.638975 ('a', 1)
14:48:29.638822 ('b', 2)
14:48:29.741651 ('b', 0)
14:48:29.742013 ('a', 1)
14:48:30.639588 ('c', 3)
14:48:31.742705 ('c', 1)
14:48:31.847440 ('b', 2)
14:48:31.847828 ('a', 2)
14:48:31.847960 ('c', 0)
14:48:32.950166 ('c', 1)
14:48:33.948791 ('a', 2)
14:48:34.949339 ('b', 3)
14:48:35.055487 ('c', 2)
14:48:35.055928 ('c', 'DONE')
14:48:36.049977 ('a', 2)
14:48:36.050481 ('a', 'DONE')
14:48:37.050415 ('b', 2)
14:48:37.050966 ('b', 'DONE')

PS: The code above uses the async_timeout third-party library.
PS2: The aiostream library does the same as the above code and much more.
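For reference, a minimal sketch of the aiostream approach, assuming the myaiter helper from the sample client code above; stream.merge interleaves items from several async iterables as each one produces them:

import asyncio
from aiostream import stream

async def merged_main():
    # Merge the three async generators; items are yielded as soon as any
    # source produces one, similar to MergeAsyncIterator above.
    aiters = [myaiter(i) for i in 'abc']
    async with stream.merge(*aiters).stream() as merged:
        async for item in merged:
            print(item)

loop = asyncio.get_event_loop()
loop.run_until_complete(merged_main())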

Answered by Gustavo Bezerra


def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup

This is a plain synchronous generator. You would be able to use requests inside it, but if you want to use the asynchronous aiohttp, you need an asynchronous generator defined with async def.

When it comes to iterating over multiple async generators, it gets more interesting. You can't use the plain zip, since it works with plain iterables, not async iterables. So you should implement your own (one that also supports iterating concurrently).

I made a little prototype that I think does what you want:

import asyncio
import aiohttp
import time


# async versions of some builtins:
async def anext(aiterator):
    return await aiterator.__anext__()


def aiter(aiterable):
    return aiterable.__aiter__()


async def azip(*iterables):
    iterators = [aiter(it) for it in iterables]
    while iterators:
        # Advance every iterator concurrently; an exhausted iterator shows up
        # as a StopAsyncIteration instance thanks to return_exceptions=True.
        results = await asyncio.gather(
            *[anext(it) for it in iterators],
            return_exceptions=True,
        )
        # Stop at the shortest iterable, like the builtin zip.
        if any(isinstance(r, StopAsyncIteration) for r in results):
            break
        yield tuple(results)


# emulating grabbing:
async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def google_search(search_string):
    for i in range(999):  # big async generator
        url = 'http://httpbin.org/delay/{}'.format(i)  # increase delay to better see concurrency
        j = await request(url)
        yield search_string + ' ' + str(i)


async def google_searches(*search_strings):
    async for results in azip(*[google_search(s) for s in search_strings]):
        for result in results:
            yield result


# test it works:
async def main():
    async for result in google_searches('first', 'second', 'third'):
        print(result, int(time.time()))


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

Output:

first 0 1514759561
second 0 1514759561
third 0 1514759561
first 1 1514759562
second 1 1514759562
third 1 1514759562
first 2 1514759564
second 2 1514759564
third 2 1514759564
first 3 1514759567
second 3 1514759567
third 3 1514759567

The timestamps show that the different searches run concurrently.
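Since the question asked for a vanilla generator, one possible bridge is to drive the event loop one __anext__ call at a time from synchronous code. This is only a sketch; to_sync_generator is a hypothetical helper, not part of asyncio:

import asyncio

def to_sync_generator(async_gen, loop=None):
    # Hypothetical helper: pull items out of an async generator by running
    # the event loop for one __anext__ call at a time.
    loop = loop or asyncio.get_event_loop()
    while True:
        try:
            yield loop.run_until_complete(async_gen.__anext__())
        except StopAsyncIteration:
            break

# Plain for loop over the merged searches:
# for result in to_sync_generator(google_searches('first', 'second', 'third')):
#     print(result)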

Answered by Mikhail Gerasimov