UPDATED QUESTION FOR CLARITY:
Suppose I have two processing generator functions:
def gen1():  # just for examples,
    yield 1  # yields actually carry
    yield 2  # different computation weight
    yield 3  # in my case

def gen2():
    yield 4
    yield 5
    yield 6
I can chain them with itertools:
from itertools import chain
mix = chain(gen1(), gen2())
and then I can create another generator function object with it:
def mix_yield():
    for item in mix:
        yield item
or, if I just want to call next(mix), it's there.
My question is: how can I do the equivalent in asynchronous code?
Because I need it to:
- return in yield (one by one), or with a next iterator
- yield the fastest-resolved item first (async)

PREV. UPDATE:
After experimenting and researching, I found the aiostream library, which describes itself as an async version of itertools, so this is what I did:
import asyncio
from aiostream import stream

async def gen1():
    await asyncio.sleep(0)
    yield 1
    await asyncio.sleep(0)
    yield 2
    await asyncio.sleep(0)
    yield 3

async def gen2():
    await asyncio.sleep(0)
    yield 4
    await asyncio.sleep(0)
    yield 5
    await asyncio.sleep(0)
    yield 6

a_mix = stream.combine.merge(gen1(), gen2())

async def a_mix_yield():
    async for item in a_mix:  # a_mix is an async iterable, so plain "for" won't work
        yield item
but I still can't do next(a_mix):
    TypeError: 'merge' object is not an iterator
or next(await a_mix):
    raise StreamEmpty()
I can still turn it into a list, though:
print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
So one goal is completed, with one more to go:
- return in yield (one by one), or with a next iterator (still open)
- yield the fastest-resolved item first (async) (done, via merge)
Python's next built-in function is just a convenient way of invoking the underlying __next__ method on the object. The async equivalent of __next__ is the __anext__ method on the async iterator. Before Python 3.10 there was no anext global function in the standard library (the aiostream library provides one, and Python 3.10 added a built-in anext), but one could easily write it:
async def anext(aiterator):
    return await aiterator.__anext__()
But the savings are so small that, in the rare situations where this is needed, one may as well invoke __anext__ directly. The async iterator is in turn obtained from an async iterable by calling __aiter__ (analogous to the __iter__ provided by regular iterables). Async iteration driven manually looks like this:
a_iterator = obj.__aiter__() # regular method
elem1 = await a_iterator.__anext__() # async method
elem2 = await a_iterator.__anext__() # async method
...
__anext__ will raise StopAsyncIteration when no more elements are available. To loop over async iterators, use async for.
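A minimal, self-contained sketch of the same manual protocol using only the standard library (no aiostream). The generator numbers and the helper my_anext are hypothetical names chosen for illustration; on Python 3.10+ the built-in anext() does the same job as my_anext.

```python
import asyncio

async def numbers():
    # Hypothetical async generator standing in for gen1/gen2
    for n in (1, 2, 3):
        await asyncio.sleep(0)
        yield n

async def my_anext(aiterator):
    # Same idea as the anext() helper above, renamed so it does not
    # shadow the Python 3.10+ built-in
    return await aiterator.__anext__()

async def main():
    it = numbers().__aiter__()      # an async generator returns itself here
    first = await my_anext(it)      # 1
    second = await it.__anext__()   # 2
    rest = [x async for x in it]    # async for drains what is left: [3]
    exhausted = False
    try:
        await it.__anext__()
    except StopAsyncIteration:
        exhausted = True            # raised once there are no more items
    return first, second, rest, exhausted

result = asyncio.run(main())
print(result)  # (1, 2, [3], True)
```

Note that async for handles StopAsyncIteration for you, just as a regular for loop handles StopIteration.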
Here is a runnable example, based on your code, using both __anext__ and async for to exhaust the stream set up with aiostream.stream.combine.merge:
import asyncio
from aiostream import stream

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.run(main())
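merge interleaves items as soon as each one is ready. If what you want is the strict one-after-the-other behaviour of itertools.chain from the question, a plain async generator is enough. The helper name achain is hypothetical, and this is a standard-library sketch, not aiostream's own implementation:

```python
import asyncio

async def gen1():
    for n in (1, 2, 3):
        await asyncio.sleep(0)
        yield n

async def gen2():
    for n in (4, 5, 6):
        await asyncio.sleep(0)
        yield n

async def achain(*agens):
    # Hypothetical async counterpart of itertools.chain:
    # exhaust each async iterable fully before moving on to the next one
    for agen in agens:
        async for item in agen:
            yield item

async def main():
    mix = achain(gen1(), gen2())
    first = await mix.__anext__()   # pull a single item, like next(mix)
    rest = [x async for x in mix]   # then drain the remainder in order
    return first, rest

first, rest = asyncio.run(main())
print(first, rest)  # 1 [2, 3, 4, 5, 6]
```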
I came across this answer and looked at the aiostream library. Here is the code I came up with to merge multiple async generators. It does not use any third-party library.
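import asyncio
from typing import Any, AsyncGenerator, Set

async def merge_generators(gens: Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
    # Map each in-flight __anext__() task to the generator it came from
    pending_tasks = {asyncio.ensure_future(g.__anext__()): g for g in gens}
    try:
        while pending_tasks:
            # Wake up as soon as at least one generator has produced a value
            done, _ = await asyncio.wait(pending_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
            for d in done:
                dg = pending_tasks.pop(d)
                try:
                    result = d.result()
                except StopAsyncIteration:
                    continue  # this generator is exhausted; drop it
                # Request the next item from the same generator before yielding
                pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
                yield result
    finally:
        # If the consumer stops early, don't leave tasks running in the background
        for t in pending_tasks:
            t.cancel()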
Hope this helps you and let me know if there are any bugs in this.
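A usage sketch for merge_generators, pulling one item on demand (like next) and then draining the rest with async for. The function body is a lightly tidied copy of the one above so the block runs on its own; the ticker generator and its delays are hypothetical, chosen so that the faster generator's items should come out first.

```python
import asyncio
from typing import Any, AsyncGenerator, Set

async def merge_generators(gens: Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
    # One pending __anext__() task per generator
    pending = {asyncio.ensure_future(g.__anext__()): g for g in gens}
    try:
        while pending:
            done, _ = await asyncio.wait(pending.keys(), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                gen = pending.pop(task)
                try:
                    item = task.result()
                except StopAsyncIteration:
                    continue  # this generator is finished
                pending[asyncio.ensure_future(gen.__anext__())] = gen
                yield item
    finally:
        for task in pending:
            task.cancel()

async def ticker(values, delay):
    # Hypothetical generator: emits each value after `delay` seconds
    for v in values:
        await asyncio.sleep(delay)
        yield v

async def main():
    merged = merge_generators({ticker([1, 2, 3], 0.1), ticker([4, 5, 6], 0.01)})
    first = await merged.__anext__()   # one item on demand, like next()
    rest = [x async for x in merged]   # remaining items, fastest first
    return [first] + rest

items = asyncio.run(main())
print(items)
```

Each generator's own order is preserved; only the interleaving between generators depends on timing.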