Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Appending to merged async generators in Python

I'm trying to merge a bunch of asynchronous generators in Python 3.7 while still adding new async generators on iteration. I'm currently using aiostream to merge my generators:

from asyncio import sleep, run
from aiostream.stream import merge

async def go():
    """Yield 0, then 50 and 100, sleeping one second before each later value."""
    yield 0
    for value in (50, 100):
        await sleep(1)
        yield value

async def main():
    """Print every value produced by three merged ``go()`` generators."""
    merged = merge(*(go() for _ in range(3)))
    async for value in merged:
        print(value)

# Start the event loop and run main() only when executed as a script.
if __name__ == '__main__':
    run(main())

However, I need to be able to continue adding to the running tasks once the loop has begun. Something like this:

from asyncio import sleep, run
from aiostream.stream import merge

async def go():
    """Produce 0, 50 and 100 in order, with a one-second pause between values."""
    values = iter((0, 50, 100))
    yield next(values)
    for value in values:
        await sleep(1)
        yield value

async def main():
    """Sketch of the desired usage: extend the merged stream while iterating it."""
    tasks = merge(go(), go(), go())

    async for v in tasks:
        if v == 50:
            # Wished-for call (illustrative): feed one more generator into
            # the merge that is already being iterated.
            tasks.merge(go())
        print(v)

# Start the event loop and run main() only when executed as a script.
if __name__ == '__main__':
    run(main())

The closest I've got to this is using the aiostream library but maybe this can also be written fairly neatly with just the native asyncio standard library.

like image 580
freebie Avatar asked Jul 18 '18 10:07

freebie


2 Answers

Here is an implementation that should work efficiently even with a large number of async iterators:

class merge:
    """Merge several async iterators into one, allowing sources to be added later.

    Iterating a ``merge`` instance yields each value as the corresponding
    source's pending ``__anext__()`` future completes.  Additional sources
    can be attached while iteration is in progress via :meth:`append_iter`.

    The class refers to the ``asyncio`` module, which must be imported in
    the enclosing module.
    """

    def __init__(self, *iterables):
        # Sources whose next ``__anext__()`` call has not been scheduled yet.
        self._iterables = list(iterables)
        # Set when a pending future completes or a new source is appended.
        self._wakeup = asyncio.Event()

    def _add_iters(self, next_futs, on_done):
        """Schedule ``__anext__()`` for every queued source and empty the queue.

        Each created future is registered in ``next_futs`` (future -> iterator)
        with ``on_done`` attached as its done-callback.  Returns ``next_futs``.
        """
        for it in self._iterables:
            it = it.__aiter__()
            nfut = asyncio.ensure_future(it.__anext__())
            nfut.add_done_callback(on_done)
            next_futs[nfut] = it
        del self._iterables[:]
        return next_futs

    async def __aiter__(self):
        """Yield values from all sources until every one of them is exhausted.

        Exceptions other than ``StopAsyncIteration`` raised by a source
        propagate to the caller.
        """
        done = {}       # completed future -> its iterator, awaiting processing
        next_futs = {}  # pending future -> its iterator

        def on_done(nfut):
            done[nfut] = next_futs.pop(nfut)
            self._wakeup.set()

        self._add_iters(next_futs, on_done)
        try:
            # Also loop while ``done`` is non-empty: futures may complete
            # while we are suspended at ``yield`` and must not be dropped.
            while next_futs or done:
                await self._wakeup.wait()
                self._wakeup.clear()
                # Snapshot and reset ``done`` before yielding.  The consumer
                # may await between items, letting ``on_done`` callbacks
                # mutate ``done``; iterating the live dict would then raise
                # "dictionary changed size during iteration", and clearing it
                # afterwards would discard the late results.
                batch = list(done.items())
                done.clear()
                for nfut, it in batch:
                    try:
                        ret = nfut.result()
                    except StopAsyncIteration:
                        # This source is exhausted; do not re-arm it.
                        continue
                    # Queue the source so its next item gets requested.
                    self._iterables.append(it)
                    yield ret
                if self._iterables:
                    self._add_iters(next_futs, on_done)
        finally:
            # if the generator exits with an exception, or if the caller stops
            # iterating, make sure our callbacks are removed
            for nfut in next_futs:
                nfut.remove_done_callback(on_done)

    def append_iter(self, new_iter):
        """Add another async iterable to the merge and wake the running loop."""
        self._iterables.append(new_iter)
        self._wakeup.set()

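The only change required for your sample code is that the method is named append_iter, not merge. Note that the class refers to the asyncio module directly, so it also needs import asyncio at the top of the file.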

like image 119
user4815162342 Avatar answered Sep 21 '22 12:09

user4815162342


This can be done using stream.flatten with an asyncio queue to store the new generators.

import asyncio
from aiostream import stream, pipe

async def main():
    """Flatten generators pulled from a queue, enqueueing one more on each 50.

    NOTE(review): the source stream keeps calling ``queue.get`` in a cycle,
    so this presumably waits forever once the queue is drained - confirm.
    """
    queue = asyncio.Queue()
    # Seed the queue with three generators.
    for _ in range(3):
        await queue.put(go())

    # Repeatedly fetch a generator from the queue and flatten up to five
    # of them concurrently.
    zs = stream.call(queue.get) | pipe.cycle() | pipe.flatten(task_limit=5)
    async with zs.stream() as streamer:
        async for item in streamer:
            if item == 50:
                await queue.put(go())
            print(item)

Notice that you may tune the number of tasks that can run at the same time using the task_limit argument. Also note that zs can be elegantly defined using the pipe syntax:

zs = stream.call(queue.get) | pipe.cycle() | pipe.flatten(task_limit=5)

Disclaimer: I am the project maintainer.

like image 34
Vincent Avatar answered Sep 19 '22 12:09

Vincent