I'm having trouble combining async generators and actually running them. The only way I've found to run them is through an event loop, which returns an iterable rather than a generator. Let me illustrate this with a simple example:
Let's say I have a function google_search that searches Google by scraping it (I'm deliberately not using the API). It takes a search string and returns a generator of search results. The generator doesn't end when a page is exhausted; the function moves on to the next page. So google_search returns a nearly endless generator (it will technically always end, but a Google search often gets millions of hits).
def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup
    # to parse the resulting html and yield search results
    # Assume this function works
    ......
Okay, so now I want to make a function that allows me to iterate over multiple google_search generators. I'd like something like this:
def google_searches(*search_strings):
    # Unpack with * so zip() receives each generator as its own argument.
    for results in zip(*(google_search(query) for query in search_strings)):
        yield results
This way I can use a simple for loop to unwind google_searches and get my results. The code above works, but it is very slow for any reasonably large number of searches: it sends the request for the first search, then the second, and so on, and only then yields results. I would like to speed this up (a lot). My first idea was to make google_searches an async function (I am using Python 3.6.3, so async/await is available). That creates an async generator, which is fine, but I can only run it inside another async function or an event loop. And running it in an event loop with loop.run_until_complete(asyncio.gather(...)) returns a list of results instead of a normal generator, which defeats the purpose, as there are probably far too many search results to hold in a list.
How can I make the google_searches function faster (preferably with async code, but anything is welcome) by executing the requests asynchronously while still having it be a vanilla generator? Thanks in advance!
The accepted answer waits for one result from each async generator before calling the generators again. If the data doesn't arrive at the same pace from every source, that may be a problem. The solution below takes multiple async iterables (generators or not) and iterates over all of them simultaneously, each in its own coroutine. Each coroutine puts its results into an asyncio.Queue, which is then iterated by the client code:
Iterator code:
import asyncio
from async_timeout import timeout


class MergeAsyncIterator:
    def __init__(self, *it, timeout=60, maxsize=0):
        # One producer coroutine per source async iterable.
        self._it = [self.iter_coro(i) for i in it]
        self.timeout = timeout
        self._futures = []
        # Shared queue that every producer writes to.
        self._queue = asyncio.Queue(maxsize=maxsize)

    def __aiter__(self):
        # Schedule all producers so they run concurrently.
        for it in self._it:
            f = asyncio.ensure_future(it)
            self._futures.append(f)
        return self

    async def __anext__(self):
        # Finished once every producer is done and the queue is drained.
        if all(f.done() for f in self._futures) and self._queue.empty():
            raise StopAsyncIteration
        # Recent async_timeout releases require "async with" here.
        async with timeout(self.timeout):
            try:
                return await self._queue.get()
            except asyncio.CancelledError:
                raise StopAsyncIteration

    def iter_coro(self, it):
        if not hasattr(it, '__aiter__'):
            raise ValueError('Object passed must be an AsyncIterable')
        return self.aiter_to_queue(it)

    async def aiter_to_queue(self, ait):
        async for i in ait:
            await self._queue.put(i)
            await asyncio.sleep(0)  # yield control to the other producers
Sample client code:
import random
import asyncio
from datetime import datetime


async def myaiter(name):
    for i in range(5):
        n = random.randint(0, 3)
        await asyncio.sleep(0.1 + n)
        yield (name, n)
    yield (name, 'DONE')


async def main():
    aiters = [myaiter(i) for i in 'abc']
    async for i in MergeAsyncIterator(*aiters, timeout=3):
        print(datetime.now().strftime('%H:%M:%S.%f'), i)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Output:
14:48:28.638975 ('a', 1)
14:48:29.638822 ('b', 2)
14:48:29.741651 ('b', 0)
14:48:29.742013 ('a', 1)
14:48:30.639588 ('c', 3)
14:48:31.742705 ('c', 1)
14:48:31.847440 ('b', 2)
14:48:31.847828 ('a', 2)
14:48:31.847960 ('c', 0)
14:48:32.950166 ('c', 1)
14:48:33.948791 ('a', 2)
14:48:34.949339 ('b', 3)
14:48:35.055487 ('c', 2)
14:48:35.055928 ('c', 'DONE')
14:48:36.049977 ('a', 2)
14:48:36.050481 ('a', 'DONE')
14:48:37.050415 ('b', 2)
14:48:37.050966 ('b', 'DONE')
PS: The code above uses the async_timeout third-party library.
PS2: The aiostream library does the same as the above code and much more.
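The queue-based merge above can also be sketched with the standard library alone. The version below is an illustration, not this answer's implementation: the names merge, drain and ticker are hypothetical, it assumes Python 3.7+ for asyncio.run, and it swaps the timeout for a per-source sentinel so that iteration ends exactly when every source is exhausted.

```python
import asyncio


async def merge(*aiterables, maxsize=0):
    """Yield items from several async iterables as soon as each one arrives."""
    queue = asyncio.Queue(maxsize=maxsize)
    done = object()  # sentinel: one source has finished

    async def drain(aiterable):
        # Producer: copy one source into the shared queue.
        try:
            async for item in aiterable:
                await queue.put((item, None))
        except asyncio.CancelledError:
            raise  # the consumer stopped early; report nothing
        except Exception as exc:
            await queue.put((done, exc))  # forward the error to the consumer
        else:
            await queue.put((done, None))

    tasks = [asyncio.ensure_future(drain(a)) for a in aiterables]
    remaining = len(tasks)
    try:
        while remaining:
            item, exc = await queue.get()
            if exc is not None:
                raise exc
            if item is done:
                remaining -= 1
            else:
                yield item
    finally:
        # Stop producers that are still running (no-op for finished ones).
        for task in tasks:
            task.cancel()


async def ticker(name, delay, count):
    # Hypothetical stand-in for a slow source such as a paginated search.
    for i in range(count):
        await asyncio.sleep(delay)
        yield (name, i)


async def demo():
    sources = [ticker('fast', 0.01, 3), ticker('slow', 0.05, 2)]
    return [item async for item in merge(*sources)]


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Passing a positive maxsize gives backpressure: each producer pauses at queue.put until the consumer catches up, which matters when the sources are nearly endless.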
def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup
This is a plain synchronous generator. You can use requests inside it, but if you want to use the asynchronous aiohttp, you need an asynchronous generator, defined with async def.
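The difference between the two kinds of generator can be seen in a minimal sketch. The function names here are hypothetical, and asyncio.sleep(0) merely stands in for an aiohttp request:

```python
import asyncio
import inspect


def sync_numbers():
    # Plain generator: consumed with a normal for loop, cannot await.
    for i in range(3):
        yield i


async def async_numbers():
    # Async generator: may await between yields, consumed with "async for".
    for i in range(3):
        await asyncio.sleep(0)  # stand-in for an awaited network call
        yield i


async def collect():
    # "async for" (here as an async comprehension) only works in a coroutine.
    return [i async for i in async_numbers()]


if __name__ == '__main__':
    print(inspect.isgeneratorfunction(sync_numbers))
    print(inspect.isasyncgenfunction(async_numbers))
    loop = asyncio.new_event_loop()
    try:
        print(loop.run_until_complete(collect()))
    finally:
        loop.close()
```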
Iterating over multiple async generators is more interesting. You can't use the plain zip, since it works with plain iterables, not async iterables. So you have to implement your own version (one that also supports iterating concurrently).
I made a little prototype that I think does what you want:
import asyncio
import aiohttp
import time


# async versions of some builtins:
async def anext(aiterator):
    return await aiterator.__anext__()


def aiter(aiterable):
    return aiterable.__aiter__()


async def azip(*iterables):
    iterators = [aiter(it) for it in iterables]
    while iterators:
        # Advance all iterators concurrently.
        results = await asyncio.gather(
            *[anext(it) for it in iterators],
            return_exceptions=True,
        )
        # Like zip(), stop as soon as any iterator is exhausted.
        if any(isinstance(r, StopAsyncIteration) for r in results):
            return
        yield tuple(results)


# emulating grabbing:
async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def google_search(search_string):
    for i in range(999):  # big async generator
        url = 'http://httpbin.org/delay/{}'.format(i)  # increase delay to better see concurrency
        j = await request(url)
        yield search_string + ' ' + str(i)


async def google_searches(*search_strings):
    async for results in azip(*[google_search(s) for s in search_strings]):
        for result in results:
            yield result


# test it works:
async def main():
    async for result in google_searches('first', 'second', 'third'):
        print(result, int(time.time()))


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()
Output:
first 0 1514759561
second 0 1514759561
third 0 1514759561
first 1 1514759562
second 1 1514759562
third 1 1514759562
first 2 1514759564
second 2 1514759564
third 2 1514759564
first 3 1514759567
second 3 1514759567
third 3 1514759567
The timestamps show that the different searches run concurrently.
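The question also asks for a vanilla generator. One possible bridge, sketched here under assumptions rather than taken from the answer, is to drive the event loop one item at a time from synchronous code. The names iter_over_async and countdown are hypothetical. The wrapper must be called from code that is not already running inside an event loop, and background tasks only make progress while the loop is being run for the next item.

```python
import asyncio


def iter_over_async(agen, loop):
    """Expose an async generator as a plain, lazy generator."""
    ait = agen.__aiter__()
    try:
        while True:
            try:
                # Run the loop only until the next item is available.
                item = loop.run_until_complete(ait.__anext__())
            except StopAsyncIteration:
                return
            yield item
    finally:
        # Give the async generator a chance to run its cleanup code.
        loop.run_until_complete(ait.aclose())


async def countdown(n):
    # Hypothetical stand-in for an async generator such as google_searches.
    while n > 0:
        await asyncio.sleep(0.01)
        yield n
        n -= 1


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        for value in iter_over_async(countdown(3), loop):
            print(value)
    finally:
        loop.close()
```

Because each item is fetched on demand, nothing is collected into a list, so the caller can stop early with break without ever materializing the remaining results.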