How to wrap asyncio with iterator

I have the following simplified code:

async def asynchronous_function(*args, **kwds):
    statement = await prepare(query)
    async with conn.transaction():
        async for record in statement.cursor():
            ??? yield record ???

...

class Foo:

    def __iter__(self):
        records = ??? asynchronous_function ???
        yield from records

...

x = Foo()
for record in x:
    ...

I don't know how to fill in the ??? above. I want to yield the record data, but it's really not obvious how to wrap asyncio code.

Brian Bruggeman asked Mar 13 '19 00:03



2 Answers

While it is true that asyncio is intended to be used across the board, sometimes it is simply impossible to immediately convert a large piece of software (with all its dependencies) to async. Fortunately there are ways to combine legacy synchronous code with newly written asyncio portions. A straightforward way to do so is by running the event loop in a dedicated thread, and using asyncio.run_coroutine_threadsafe to submit tasks to it.
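A minimal sketch of that pattern, assuming Python 3.7+: a loop runs forever in a daemon thread, and synchronous code submits a coroutine to it and blocks on the returned concurrent.futures.Future. The coroutine fetch_value is a hypothetical stand-in for real asynchronous work.

```python
import asyncio
import threading

# A dedicated event loop running forever in a daemon thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def fetch_value(x):
    # Hypothetical stand-in for real asynchronous work (e.g. a DB query).
    await asyncio.sleep(0.1)
    return x * 2

# Submit the coroutine from synchronous code. This returns a
# concurrent.futures.Future; result() blocks until the coroutine finishes.
future = asyncio.run_coroutine_threadsafe(fetch_value(21), loop)
print(future.result(timeout=5))  # prints 42
```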

With those low-level tools you can write a generic adapter to turn any asynchronous iterator into a synchronous one. For example:

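import asyncio
import queue
import threading

# create a dedicated asyncio loop that runs in a background thread to
# serve our asyncio needs (new_event_loop() avoids relying on
# get_event_loop(), whose implicit loop creation is deprecated)
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()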

def wrap_async_iter(ait):
    """Wrap an asynchronous iterator into a synchronous one"""
    q = queue.Queue()
    _END = object()

    def yield_queue_items():
        while True:
            next_item = q.get()
            if next_item is _END:
                break
            yield next_item
        # After observing _END we know the aiter_to_queue coroutine has
        # completed.  Invoke result() for side effect - if an exception
        # was raised by the async iterator, it will be propagated here.
        async_result.result()

    async def aiter_to_queue():
        try:
            async for item in ait:
                q.put(item)
        finally:
            q.put(_END)

    async_result = asyncio.run_coroutine_threadsafe(aiter_to_queue(), loop)
    return yield_queue_items()

Your code then only needs to call wrap_async_iter to turn an async iterator into a sync one:

async def mock_records():
    for i in range(3):
        yield i
        await asyncio.sleep(1)

for record in wrap_async_iter(mock_records()):
    print(record)

In your case, Foo.__iter__ would use yield from wrap_async_iter(asynchronous_function(...)).
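A self-contained sketch of that suggestion, with the adapter repeated so the snippet runs on its own. The asynchronous_function here is a hypothetical mock that yields dicts instead of reading from a database cursor; a real version would keep the question's prepare/transaction/cursor logic.

```python
import asyncio
import queue
import threading

# Background event loop serving all asyncio work.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

def wrap_async_iter(ait):
    """Wrap an asynchronous iterator into a synchronous one."""
    q = queue.Queue()
    _END = object()

    async def aiter_to_queue():
        try:
            async for item in ait:
                q.put(item)
        finally:
            q.put(_END)  # always signal completion, even on error

    async_result = asyncio.run_coroutine_threadsafe(aiter_to_queue(), loop)

    def yield_queue_items():
        while True:
            item = q.get()
            if item is _END:
                break
            yield item
        async_result.result()  # re-raise any exception from the async iterator

    return yield_queue_items()

# Hypothetical mock of the question's async generator.
async def asynchronous_function(*args, **kwds):
    for i in range(3):
        await asyncio.sleep(0.01)
        yield {"id": i}

class Foo:
    def __iter__(self):
        yield from wrap_async_iter(asynchronous_function())

records = list(Foo())
print(records)  # [{'id': 0}, {'id': 1}, {'id': 2}]
```

One caveat of this design: if the caller stops iterating early (e.g. with break), the background coroutine keeps draining the async iterator into the queue until it is exhausted.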

user4815162342 answered Nov 02 '22 09:11



If you want to receive all records from an async generator, you can use async for or, for brevity, an asynchronous comprehension:

async def asynchronous_function(*args, **kwds):
    # ...
    yield record


async def aget_records():
    records = [
        record 
        async for record 
        in asynchronous_function()
    ]
    return records

If you want to get the result of an asynchronous function synchronously (i.e. blocking), you can simply run it in an asyncio event loop:

def get_records():
    records = asyncio.run(aget_records())
    return records
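Combining the pieces of this answer for the question's Foo class might look like the sketch below, again with a hypothetical mock in place of the real database generator.

```python
import asyncio

# Hypothetical mock of the question's async generator.
async def asynchronous_function(*args, **kwds):
    for i in range(3):
        await asyncio.sleep(0.01)
        yield {"id": i}

async def aget_records():
    # Asynchronous comprehension: collects every yielded record into a list.
    return [record async for record in asynchronous_function()]

def get_records():
    # Blocks until the event loop has run the whole coroutine.
    return asyncio.run(aget_records())

class Foo:
    def __iter__(self):
        # All records are fetched up front, then iterated synchronously.
        yield from get_records()

print(list(Foo()))  # [{'id': 0}, {'id': 1}, {'id': 2}]
```

Unlike a streaming adapter, this collects every record into a list before the first one is yielded, and asyncio.run raises RuntimeError if called from a thread where an event loop is already running.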

Note, however, that once you run a coroutine to completion this way, you lose the ability to run it concurrently with other coroutines, and with it the benefits asyncio provides.

As Vincent already pointed out in the comments, asyncio is not a magic wand that makes code faster; it is a tool that can sometimes be used to run different I/O tasks concurrently with low overhead.
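To illustrate what "concurrently with low overhead" means, here is a sketch in which three simulated I/O waits (asyncio.sleep standing in for network or database calls) overlap on a single thread. The fake_io helper is hypothetical.

```python
import asyncio
import time

async def fake_io(delay):
    # Simulated I/O wait; asyncio.sleep hands control back to the event loop.
    await asyncio.sleep(delay)
    return delay

async def main():
    start = time.perf_counter()
    # gather runs the three waits concurrently on one thread.
    results = await asyncio.gather(fake_io(1), fake_io(1), fake_io(1))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)            # [1, 1, 1]
print(f"{elapsed:.1f}s")  # roughly 1 second, not 3
```

The speed-up comes only from overlapping the waits; CPU-bound work inside the coroutines would not finish any faster.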

You may be interested in reading this answer to see the main idea behind asyncio.

Mikhail Gerasimov answered Nov 02 '22 08:11
