I would like to listen for events from multiple instances of the same object and then merge these event streams into one stream. For example, if I use async generators:
import asyncio

class PeriodicYielder:
    def __init__(self, period: int) -> None:
        self.period = period

    async def updates(self):
        # Emit the period value once every `period` seconds, forever.
        while True:
            await asyncio.sleep(self.period)
            yield self.period
I can successfully listen for events from one instance:
async def get_updates_from_one():
    each_1 = PeriodicYielder(1)
    async for n in each_1.updates():
        print(n)
        # 1
        # 1
        # 1
        # ...
But how can I get events from multiple async generators? In other words: how can I iterate over multiple async generators in the order in which they become ready to produce their next value?
async def get_updates_from_multiple():
    each_1 = PeriodicYielder(1)
    each_2 = PeriodicYielder(2)
    async for n in magic_async_join_function(each_1.updates(), each_2.updates()):
        print(n)
        # 1
        # 1
        # 2
        # 1
        # 1
        # 2
        # ...
Is there such a magic_async_join_function in the stdlib or in a third-party module?
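You can use the wonderful aiostream library. Its stream.merge combines several async iterables and yields items as soon as any of them produces one. It'll look like this: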
import asyncio
from aiostream import stream

async def test1():
    for _ in range(5):
        await asyncio.sleep(0.1)
        yield 1

async def test2():
    for _ in range(5):
        await asyncio.sleep(0.2)
        yield 2

async def main():
    combine = stream.merge(test1(), test2())
    async with combine.stream() as streamer:
        async for item in streamer:
            print(item)

asyncio.run(main())
Result:
1
1
2
1
1
2
1
2
2
2
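If you would rather not add a dependency, the same idea can be sketched with the standard library alone. This is only a minimal sketch, not how aiostream implements it: each source gets its own task that pushes items into a shared asyncio.Queue, and the merged generator yields whatever arrives first. The names merge, ticker and _DONE are made up for this illustration. Note one simplification: an exception raised inside a source just ends that source here instead of being re-raised to the consumer.

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")

_DONE = object()  # hypothetical sentinel: marks that one source is exhausted


async def merge(*sources: AsyncIterator[T]) -> AsyncIterator[T]:
    """Yield items from all sources in the order they become available."""
    queue: asyncio.Queue = asyncio.Queue()

    async def drain(source: AsyncIterator[T]) -> None:
        # Forward every item of one source into the shared queue.
        try:
            async for item in source:
                queue.put_nowait(item)
        finally:
            # Always signal completion, even on error or cancellation.
            queue.put_nowait(_DONE)

    tasks = [asyncio.create_task(drain(s)) for s in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
    finally:
        # Stop the producer tasks if the consumer leaves the loop early.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def ticker(value, period: float, count: int):
    # Hypothetical finite source: yield `value` every `period` seconds.
    for _ in range(count):
        await asyncio.sleep(period)
        yield value


async def main() -> None:
    async for item in merge(ticker(1, 0.02, 3), ticker(2, 0.05, 2)):
        print(item)


if __name__ == "__main__":
    asyncio.run(main())
```

The queue is unbounded, so a fast source can run ahead of a slow consumer. Passing a maxsize to asyncio.Queue and awaiting queue.put would add backpressure, at the cost of a slightly more careful shutdown.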