
Merging async iterables in python3

Is there a good way, or a well-supported library, for merging async iterators in python3?

The desired behavior is basically the same as that of merging observables in reactivex.

That is, in the normal case, if I'm merging two async iterators, I want the resulting async iterator to yield results chronologically. An error in one of the iterators should derail the merged iterator.

[Image: Merging Observables]

(Source: http://reactivex.io/documentation/operators/merge.html)

This is my best attempt, but it seems like the kind of problem that might already have a standard solution:

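import asyncio
from collections import namedtuple


async def drain(stream, q, sentinel=None):
    # Forward every item from one stream into the shared queue.
    try:
        async for item in stream:
            await q.put(item)
        if sentinel is not None:
            # Signal that this stream finished normally.
            await q.put(sentinel)
    except Exception as e:
        # Hand the error to the consumer, which re-raises it.
        await q.put(e)


async def merge(*streams):
    q = asyncio.Queue()
    sentinel = namedtuple("QueueClosed", ["truthy"])(True)

    # One background task per stream, all feeding the same queue.
    tasks = {
        asyncio.ensure_future(drain(stream, q, sentinel)) for stream in streams
    }

    try:
        remaining = len(streams)
        while remaining > 0:
            result = await q.get()
            if result is sentinel:
                remaining -= 1
                continue
            if isinstance(result, Exception):
                raise result
            yield result
    finally:
        # Stop any drain tasks still running after an error or early exit.
        for task in tasks:
            task.cancel()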


if __name__ == "__main__":

    # Example: Should print:
    #   1
    #   2
    #   3
    #   4

    async def gen():
        yield 1
        await asyncio.sleep(1.5)
        yield 3

    async def gen2():
        await asyncio.sleep(1)
        yield 2
        await asyncio.sleep(1)
        yield 4

    async def go():
        async for x in merge(gen(), gen2()):
            print(x)

    # asyncio.run() (Python 3.7+) creates and closes the event loop itself.
    asyncio.run(go())
samfrances asked Feb 16 '18 16:02


1 Answer

You can use aiostream.stream.merge:

from aiostream import stream

async def go():
    async for x in stream.merge(gen(), gen2()):
        print(x)

More examples are available in the aiostream documentation and in this answer.
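
If adding a dependency is not an option, similar behavior can be approximated with the standard library alone. The following is only a minimal sketch under stated assumptions, not how aiostream implements it: the names `merge` and `_next` are chosen here for illustration, and each input is assumed to be an async iterator. It keeps one pending `__anext__()` request per iterator and waits on all of them with `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)`, so items come out in the order they become available and the first error ends the merged iterator.

```python
import asyncio


async def _next(it):
    # Hypothetical helper: wrap __anext__() in a coroutine so it can run as a task.
    return await it.__anext__()


async def merge(*streams):
    """Yield items from several async iterators as each one becomes available."""
    iterators = [stream.__aiter__() for stream in streams]
    # Map each pending "next item" task back to the iterator it belongs to.
    pending = {asyncio.ensure_future(_next(it)): it for it in iterators}
    try:
        while pending:
            done, _ = await asyncio.wait(
                list(pending), return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                it = pending.pop(task)
                try:
                    item = task.result()
                except StopAsyncIteration:
                    # This iterator is exhausted; stop polling it.
                    continue
                # Any other exception propagates here and derails the merge.
                # Request the following item before handing this one out.
                pending[asyncio.ensure_future(_next(it))] = it
                yield item
    finally:
        # Cancel outstanding requests after an error or an early exit.
        for task in pending:
            task.cancel()
```

Called as `merge(gen(), gen2())` with the generators from the question, this should yield 1, 2, 3, 4. Unlike the queue-based attempt, it never reads ahead by more than one item per source, at the cost of creating a new task for every item.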

Vincent answered Nov 10 '22 17:11