Currently, I have an inefficient synchronous generator that makes many HTTP requests in sequence and yields the results. I'd like to use asyncio and aiohttp to parallelise the requests and thereby speed up this generator, but I want to keep it as an ordinary generator (not a PEP 525 async generator) so that the non-async code that calls it doesn't need to be modified. How can I create such a generator?
Coroutines can pause and resume execution, which is what makes them great for concurrency, and unlike a plain generator's yield, a coroutine's yield can also accept values sent in by the caller. Python's generator functions are almost coroutines, but not quite, in that they allow pausing execution to produce a value, but originally did not provide for values or exceptions to be passed in when execution resumes (PEP 342's generator.send() and generator.throw() closed that gap). Conceptually, though, generators and coroutines remain distinct: generators let you create functions that look like iterators to the consumer, while coroutines extend the concept of a traditional function, which hands control back to its caller exactly once, through the return statement.
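As a minimal sketch of that distinction: a plain generator only produces values, but generator.send() lets the caller pass a value back in, resuming the generator with that value as the result of the yield expression:

def running_total():
    total = 0
    while True:
        value = yield total  # pauses here; resumes when a value is sent in
        total += value

gen = running_total()
next(gen)           # prime the generator: run up to the first yield
print(gen.send(5))  # 5
print(gen.send(3))  # 8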
Starting with Python 3.6, we have asynchronous generators (PEP 525) and are able to use yield directly inside coroutines (async def functions).
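For contrast, here's a minimal sketch of such an async generator; this is exactly what the question wants to avoid, since consuming it requires async for inside async code:

import asyncio

async def countdown(n):
    # An async generator: yield directly inside an async def function
    while n > 0:
        await asyncio.sleep(0.1)
        yield n
        n -= 1

async def main():
    async for value in countdown(3):
        print(value)  # prints 3, then 2, then 1

asyncio.run(main())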
asyncio.as_completed() takes an iterable of coroutines or futures and returns an iterable of futures in the order that the input futures complete. Normally, you'd loop over its result and await the members from inside an async function...
import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)
    # Prints 'second', then 'third', then 'first'

asyncio.run(main())
... but for the purpose of this question, what we want is to be able to yield these results from an ordinary generator, so that normal synchronous code can consume them without ever knowing that async functions are being used under the hood. We can do that by calling loop.run_until_complete() on the futures yielded by our as_completed call...
import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)
In this way, we've exposed our async code to non-async-land in a manner that doesn't require callers to define any functions as async, or to even know that ordinary_generator is using asyncio under the hood.
As an alternative implementation of ordinary_generator() that offers more flexibility in some circumstances, we can repeatedly call asyncio.wait() with the FIRST_COMPLETED flag instead of looping over as_completed():
import asyncio

def ordinary_generator():
    loop = asyncio.get_event_loop()
    # Wrap the coroutines in Tasks; asyncio.wait() no longer accepts
    # bare coroutines as of Python 3.11
    pending = {loop.create_task(coro) for coro in (first(), second(), third())}
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(
                pending,
                return_when=asyncio.FIRST_COMPLETED
            )
        )
        for job in done:
            yield job.result()
This approach, maintaining a set of pending jobs, has the advantage that we can adapt it to add jobs to the pending set on the fly, as sketched below. This is useful in use cases where our async jobs can add an unpredictable number of further jobs to the queue, like a web spider that follows all links on each page that it visits.
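Here's a rough, self-contained sketch of that adaptation. fetch_page() is a hypothetical stand-in for a real aiohttp request plus link extraction; the point is just that jobs completed by asyncio.wait() can schedule new jobs into the same pending set:

import asyncio

async def fetch_page(url, depth):
    # Hypothetical stand-in for a real aiohttp request; pretend each page
    # links to two more pages, down to a fixed depth
    await asyncio.sleep(0.1)
    links = [f"{url}/{i}" for i in range(2)] if depth > 0 else []
    return url, [fetch_page(link, depth - 1) for link in links]

def spider_generator(seed_urls, depth=2):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    pending = {loop.create_task(fetch_page(url, depth)) for url in seed_urls}
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        )
        for job in done:
            url, new_jobs = job.result()
            # Schedule newly-discovered jobs alongside those still pending
            pending |= {loop.create_task(coro) for coro in new_jobs}
            yield url
    loop.close()

for url in spider_generator(['https://example.com']):
    print(url)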
One caveat: the approaches above assume we're calling the synchronous code from the main thread, in which case get_event_loop is guaranteed to give us a loop and we've got no need to .close it. (Note, though, that recent Python versions deprecate this implicit loop creation by get_event_loop(); there you may prefer to create and manage a loop explicitly with asyncio.new_event_loop().) If we want ordinary_generator to be usable from a non-main thread, especially one that may have previously had an event loop created, then life gets harder, because we can't rely on get_event_loop (it raises a RuntimeError on any non-main thread that doesn't have an event loop yet). In that case the simplest thing I can think to do is to spin off a new thread to run our asyncio code, and communicate with it via a queue:
import asyncio
from queue import Queue
from threading import Thread

def ordinary_generator():
    sentinel = object()  # unique marker meaning "the worker thread is done"
    queue = Queue()

    def thread_entry_point():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for future in asyncio.as_completed([first(), second(), third()]):
            try:
                queue.put(loop.run_until_complete(future))
            except Exception as e:
                queue.put((sentinel, e))
                break
        loop.close()
        queue.put(sentinel)

    Thread(target=thread_entry_point).start()
    while True:
        val = queue.get()
        if val is sentinel:
            return
        if isinstance(val, tuple) and len(val) == 2 and val[0] is sentinel:
            # The worker hit an exception; re-raise it in the caller's thread
            raise val[1]
        yield val
(Combining the use of run_until_complete from the penultimate example with the use of an extra thread in the final example is left as an exercise for any reader who needs to do so.)
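For anyone who'd like a starting point for that exercise, here's a rough sketch of how the combination might look, assuming the same first/second/third coroutines as above:

import asyncio
from queue import Queue
from threading import Thread

def ordinary_generator():
    sentinel = object()
    queue = Queue()

    def thread_entry_point():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Maintain a pending set so that jobs could be added on the fly
        pending = {loop.create_task(c) for c in (first(), second(), third())}
        try:
            while pending:
                done, pending = loop.run_until_complete(
                    asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                )
                for job in done:
                    queue.put(job.result())
        except Exception as e:
            queue.put((sentinel, e))
        finally:
            loop.close()
            queue.put(sentinel)

    Thread(target=thread_entry_point).start()
    while True:
        val = queue.get()
        if val is sentinel:
            return
        if isinstance(val, tuple) and len(val) == 2 and val[0] is sentinel:
            raise val[1]
        yield val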