Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Join multiple async generators in Python [duplicate]

I would like to listen for events from multiple instances of the same object and then merge this event streams to one stream. For example, if I use async generators:

class PeriodicYielder: 
    def __init__(self, period: int) -> None: 
        self.period = period 

    async def updates(self): 
        while True: 
            await asyncio.sleep(self.period)
            yield self.period

I can successfully listen for events from one instance:

async def get_updates_from_one(): 
    each_1 = PeriodicYielder(1) 
    async for n in each_1.updates(): 
        print(n)
# 1
# 1
# 1
# ...

But how can I get events from multiple async generators? In other words: how can I iterate through multiple async generators in the order they are ready to produce next value?

async def get_updates_from_multiple(): 
    each_1 = PeriodicYielder(1) 
    each_2 = PeriodicYielder(2) 
    async for n in magic_async_join_function(each_1.updates(), each_2.updates()): 
        print(n)
# 1
# 1
# 2
# 1
# 1
# 2
# ...

Is there such magic_async_join_function in stdlib or in 3rd party module?

like image 584
Andrey Semakin Avatar asked Mar 22 '19 12:03

Andrey Semakin


1 Answers

You can use wonderful aiostream library. It'll look like this:

import asyncio
from aiostream import stream


async def test1():
    for _ in range(5):
        await asyncio.sleep(0.1)
        yield 1


async def test2():
    for _ in range(5):
        await asyncio.sleep(0.2)
        yield 2


async def main():
    combine = stream.merge(test1(), test2())

    async with combine.stream() as streamer:
        async for item in streamer:
            print(item)


asyncio.run(main())

Result:

1
1
2
1
1
2
1
2
2
2
like image 161
Mikhail Gerasimov Avatar answered Nov 10 '22 00:11

Mikhail Gerasimov