 

Synchronous generator in asyncio

I have the following scenario:

  1. I have a blocking, synchronous generator
  2. I have a non-blocking, async function

I would like to run the blocking generator (executed in a ThreadPoolExecutor) and the async function concurrently on the event loop. How do I achieve this?

The following code prints only the output from the generator, never the output from the sleep function.

Thanks!

from concurrent.futures import ThreadPoolExecutor

import numpy as np
import asyncio
import time


def f():
    while True:
        r = np.random.randint(0, 3)
        time.sleep(r)
        yield r


async def gen():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    gen = await loop.run_in_executor(executor, f)
    for item in gen:
        print(item)
        print('Inside generator')


async def sleep():
    while True:
        await asyncio.sleep(1)
        print('Inside async sleep')


async def combine():
    await asyncio.gather(sleep(), gen())


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(combine())


if __name__ == '__main__':
    main()
asked Oct 31 '25 15:10 by abhinavkulkarni



1 Answer

run_in_executor doesn't work on generators because it is designed for blocking functions. A generator function is a valid function, but it returns immediately when called, handing back a generator object that the caller is expected to exhaust through repeated calls to next. (This is what Python's for loop does under the hood.) In the question's code, the blocking time.sleep calls therefore run inside the for loop on the event loop thread, which is why the sleep coroutine never gets a chance to print. To use a blocking generator from async code, you have two choices:

  • wrap each step of the iteration (each individual call to next) in a separate call to run_in_executor, or
  • start a for loop in a separate thread and use a queue to transfer the objects to an async consumer.
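
Before picking one, it may help to confirm where the blocking actually happens. The following is a minimal sketch, not part of the original code: blocking_gen and demo are hypothetical names, and the 0.05 second sleep is an arbitrary stand-in for real blocking work. It times the run_in_executor call separately from the iteration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_gen():
    # Hypothetical stand-in for the blocking generator in the question.
    for i in range(3):
        time.sleep(0.05)  # runs only when next() is called on the generator
        yield i


async def demo():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        start = time.monotonic()
        # Calling the generator function in the executor only creates the
        # generator object; none of its body has run yet.
        gen_obj = await loop.run_in_executor(executor, blocking_gen)
        creation_time = time.monotonic() - start

        start = time.monotonic()
        # The blocking sleeps happen here, in the event loop thread.
        items = list(gen_obj)
        iteration_time = time.monotonic() - start
    return creation_time, iteration_time, items


creation_time, iteration_time, items = asyncio.run(demo())
print(f"creating the generator took {creation_time:.3f}s")
print(f"iterating it took {iteration_time:.3f}s, items={items}")
```

The executor call should finish almost instantly, while nearly all of the elapsed time is spent in the plain loop over the generator, which is the part that stalls the event loop.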

Either approach can be abstracted into a function that accepts an iterator and returns an equivalent async iterator. This is an implementation of the second approach:

import asyncio, threading

def async_wrap_iter(it):
    """Wrap blocking iterator into an asynchronous one"""
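    # Must be called while the event loop is running (e.g. from a coroutine),
    # so that the worker thread can schedule q.put() on that same loop.
    loop = asyncio.get_running_loop()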
    q = asyncio.Queue(1)
    exception = None
    _END = object()

    async def yield_queue_items():
        while True:
            next_item = await q.get()
            if next_item is _END:
                break
            yield next_item
        if exception is not None:
            # the iterator has raised, propagate the exception
            raise exception

    def iter_to_queue():
        nonlocal exception
        try:
            for item in it:
                # This runs outside the event loop thread, so we
                # must use thread-safe API to talk to the queue.
                asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
        except Exception as e:
            exception = e
        finally:
            asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()

    threading.Thread(target=iter_to_queue).start()
    return yield_queue_items()

async_wrap_iter can be tested with a trivial sync iterator that uses time.sleep() to block and an async heartbeat function to prove that the event loop is running:

# async_wrap_iter definition as above

import time

def test_iter():
    for i in range(5):
        yield i
        time.sleep(1)

async def test():
    ait = async_wrap_iter(test_iter())
    async for i in ait:
        print(i)

async def heartbeat():
    while True:
        print('alive')
        await asyncio.sleep(.1)

async def main():
    asyncio.create_task(heartbeat())
    await test()

asyncio.run(main())
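
For comparison, the first approach (one run_in_executor call per call to next) could be sketched roughly as follows. This is an illustration under assumptions rather than a tested drop-in: async_iter_per_step, _next_or_done, blocking_iter and demo_per_step are hypothetical names, and the default executor is used for brevity. A sentinel is returned instead of letting StopIteration escape, because StopIteration cannot be set as the exception of a Future.

```python
import asyncio
import time

_DONE = object()  # sentinel marking exhaustion of the iterator


def _next_or_done(it):
    # Runs in a worker thread; returns the sentinel instead of raising
    # StopIteration, which cannot be propagated through a Future.
    return next(it, _DONE)


async def async_iter_per_step(it):
    """Hypothetical helper: run each next() call in the default executor."""
    loop = asyncio.get_running_loop()
    while True:
        item = await loop.run_in_executor(None, _next_or_done, it)
        if item is _DONE:
            break
        yield item


def blocking_iter():
    # Hypothetical blocking iterator used only for this demonstration.
    for i in range(3):
        time.sleep(0.05)
        yield i


async def demo_per_step():
    beats = 0

    async def heartbeat():
        # Counts how often the event loop gets to run during the iteration.
        nonlocal beats
        while True:
            beats += 1
            await asyncio.sleep(0.01)

    hb = asyncio.create_task(heartbeat())
    items = [i async for i in async_iter_per_step(blocking_iter())]
    hb.cancel()
    return items, beats


items, beats = asyncio.run(demo_per_step())
print(items, beats)
```

Compared with the queue-based version, this variant needs no dedicated thread and does no work once the consumer stops asking for items, at the cost of one executor round trip per item. Other exceptions raised by the iterator propagate to the consumer through the awaited future.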
answered Nov 03 '25 01:11 by user4815162342



