
Create generator that yields coroutine results as the coroutines finish

Currently, I have an inefficient synchronous generator that makes many HTTP requests in sequence and yields the results. I'd like to use asyncio and aiohttp to parallelise the requests and thereby speed up this generator, but I want to keep it as an ordinary generator (not a PEP 525 async generator) so that the non-async code that calls it doesn't need to be modified. How can I create such a generator?

asked Jan 27 '17 by Mark Amery


People also ask

Are coroutines generators?

Coroutines are generators whose yield can also accept values sent in by the caller. Like generators, coroutines can pause and resume execution, which makes them useful for concurrency.
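For example, here is a minimal illustration of a generator being driven as a coroutine via send() (the averager function is purely illustrative):

def averager():
    # Each yield both produces the current running average and receives
    # the next value passed in via send().
    total, count = 0, 0
    average = None
    while True:
        value = yield average
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)            # prime the coroutine up to the first yield
print(avg.send(10))  # 10.0
print(avg.send(20))  # 15.0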

Is a Python generator a coroutine?

Python's generator functions are almost coroutines, but not quite: they allow pausing execution to produce a value, and originally did not provide for values or exceptions to be passed in when execution resumes (PEP 342 later added send() and throw() to generators to close that gap).

What differentiates a generator from a coroutine?

Coroutines and generators are very different concepts. Generators let you create functions that look like iterators to the consumer. Coroutines are an extension of the concept of traditional functions: a function hands control back to its caller exactly once, through the return statement, whereas a coroutine can suspend and hand back control many times before it finishes.

Are Python generators asynchronous?

Starting with Python 3.6 we have asynchronous generators, and are able to use yield directly inside async def coroutines.
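For contrast with the answer below, here is a sketch of what such a PEP 525 async generator looks like (the numbers and consume names are just for illustration); this is exactly the kind of generator the question wants to avoid, because callers have to consume it with async for from inside an async function:

import asyncio

async def numbers():
    for i in range(3):
        await asyncio.sleep(0.1)
        yield i

async def consume():
    async for n in numbers():
        print(n)

asyncio.run(consume())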


1 Answer

asyncio.as_completed() takes an iterable of coroutines or futures and returns an iterable of futures in the order that the input futures complete. Normally, you'd loop over its result and await the members from inside an async function...

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)

# Prints 'second', then 'third', then 'first'
asyncio.run(main())

... but for the purpose of this question, what we want is to be able to yield these results from an ordinary generator, so that normal synchronous code can consume them without ever knowing that async functions are being used under the hood. We can do that by calling loop.run_until_complete() on the futures yielded by our as_completed call...

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)

In this way, we've exposed our async code to non-async-land in a manner that doesn't require callers to define any functions as async, or to even know that ordinary_generator is using asyncio under the hood.
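For instance, a plain synchronous caller can consume it like any other iterable:

results = list(ordinary_generator())
print(results)  # ['second', 'third', 'first']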

As an alternative implementation of ordinary_generator() that offers more flexibility in some circumstances, we can repeatedly call asyncio.wait() with the FIRST_COMPLETED flag instead of looping over as_completed():

import concurrent.futures

def ordinary_generator():
    loop = asyncio.get_event_loop()
    # Wrap the coroutines in Tasks up front; modern versions of asyncio.wait()
    # no longer accept bare coroutine objects.
    pending = [loop.create_task(coro) for coro in (first(), second(), third())]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(
                pending,
                return_when=concurrent.futures.FIRST_COMPLETED
            )
        )
        for job in done:
            yield job.result()

This approach, maintaining a list of pending jobs, has the advantage that we can adapt it to add jobs to the pending list on the fly. This is useful in use cases where our async jobs can add an unpredictable number of further jobs to the queue - like a web spider that follows all links on each page that it visits.
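To make that concrete, here is a rough sketch (not part of the original answer) of that spider shape; fetch_page() and extract_links() are hypothetical helpers standing in for real aiohttp and HTML-parsing code:

import asyncio
import concurrent.futures

def spider_generator(start_urls, fetch_page, extract_links):
    loop = asyncio.get_event_loop()
    seen = set(start_urls)
    pending = [loop.create_task(fetch_page(url)) for url in start_urls]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(pending, return_when=concurrent.futures.FIRST_COMPLETED)
        )
        for job in done:
            page = job.result()
            # Schedule fetches for any links we haven't visited yet; pending
            # is a set here, as returned by asyncio.wait().
            for link in extract_links(page):
                if link not in seen:
                    seen.add(link)
                    pending.add(loop.create_task(fetch_page(link)))
            yield page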

One caveat: the approaches above assume we're calling the synchronous code from the main thread, in which case get_event_loop is guaranteed to give us a loop and we have no need to .close() it. If we want ordinary_generator to be usable from a non-main thread, especially one that may have previously had an event loop created, then life gets harder, because we can't rely on get_event_loop (it raises a RuntimeError on any non-main thread that doesn't have an event loop yet). In that case, the simplest thing I can think of to do is to spin off a new thread to run our asyncio code and communicate with it via a queue:

from queue import Queue
from threading import Thread

def ordinary_generator():
    sentinel = object()
    queue = Queue()

    def thread_entry_point():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for future in asyncio.as_completed([first(), second(), third()]):
            try:
                queue.put(loop.run_until_complete(future))
            except Exception as e:
                queue.put((sentinel, e))
                break
        loop.close()
        queue.put(sentinel)

    Thread(target=thread_entry_point).start()
    while True:
        val = queue.get()
        if val is sentinel:
            return
        if isinstance(val, tuple) and len(val) == 2 and val[0] is sentinel:
            raise val[1]
        yield val

(Combining the use of run_until_complete from the penultimate example with the use of an extra thread in the final example is left as an exercise for any reader who needs to do so.)

answered Nov 15 '22 by Mark Amery