I would like to listen for events from multiple instances of the same object and then merge this event streams to one stream. For example, if I use async generators:
class PeriodicYielder: 
    def __init__(self, period: int) -> None: 
        self.period = period 
    async def updates(self): 
        while True: 
            await asyncio.sleep(self.period)
            yield self.period
I can successfully listen for events from one instance:
async def get_updates_from_one(): 
    each_1 = PeriodicYielder(1) 
    async for n in each_1.updates(): 
        print(n)
# 1
# 1
# 1
# ...
But how can I get events from multiple async generators? In other words: how can I iterate through multiple async generators in the order they are ready to produce next value?
async def get_updates_from_multiple(): 
    each_1 = PeriodicYielder(1) 
    each_2 = PeriodicYielder(2) 
    async for n in magic_async_join_function(each_1.updates(), each_2.updates()): 
        print(n)
# 1
# 1
# 2
# 1
# 1
# 2
# ...
Is there such magic_async_join_function in stdlib or in 3rd party module?
You can use wonderful aiostream library. It'll look like this:
import asyncio
from aiostream import stream
async def test1():
    for _ in range(5):
        await asyncio.sleep(0.1)
        yield 1
async def test2():
    for _ in range(5):
        await asyncio.sleep(0.2)
        yield 2
async def main():
    combine = stream.merge(test1(), test2())
    async with combine.stream() as streamer:
        async for item in streamer:
            print(item)
asyncio.run(main())
Result:
1
1
2
1
1
2
1
2
2
2
                        If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With