I'm trying to do something like this:
import asyncio
from itertools import islice
async def generate_numbers(n):
for x in range(n):
yield x
async def consume_numbers(n):
async for x in generate_numbers(n):
print(x)
async def consume_some_numbers(n,m):
async for x in islice(generate_numbers(n),m): #<-- This doesn't work. islice doesn't recognize async iterators as iterators.
print(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(consume_numbers(10))
loop.run_until_complete(consume_some_numbers(10,5))
Is there a way to make this work, or at least get similar functionality?
Here is an attempt to implement asyncio friendly islice (and enumerate):
import asyncio
import sys
import random
async def aenumerate(aiterable):
i = 0
async for x in aiterable:
yield i, x
i += 1
async def aislice(aiterable, *args):
s = slice(*args)
it = iter(range(s.start or 0, s.stop or sys.maxsize, s.step or 1))
try:
nexti = next(it)
except StopIteration:
return
async for i, element in aenumerate(aiterable):
if i == nexti:
yield element
try:
nexti = next(it)
except StopIteration:
return
async def generate_numbers(n):
for x in range(n):
await asyncio.sleep(random.uniform(0.1, 0.4))
yield x
async def consume_numbers(tag, n):
print(tag, "start")
async for x in generate_numbers(n):
print(tag, x)
print(tag, "done")
async def consume_some_numbers(tag, n, a, b, step=1):
print(tag, "start")
async for x in aislice(generate_numbers(n), a, b, step):
print(tag, x)
print(tag, "done")
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
consume_numbers("A", 5),
consume_numbers("B", 10),
consume_some_numbers("C", 10, 0, 5),
consume_some_numbers("D", 30, 3, 20, 4),
consume_some_numbers("E", 10, 3, 8, 2),
]))
loop.close()
This was not tested in a real world application, comments welcome :-)
The aiostream library provides generator-based operators for asynchronous iteration. See this example using stream.take:
import asyncio
from aiostream import stream
async def generate_numbers(n):
for x in range(n):
yield x
async def consume_some_numbers(n,m):
async for x in stream.take(generate_numbers(n), m):
print(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(consume_some_numbers(10, 5))
All the stream operators return an enhanced asynchronous iterable providing extra features such as slicing support. Consider the following example:
import asyncio
from aiostream import stream
async def main():
xs = stream.count()
ys = xs[5:10:2]
async for y in ys:
print(y) # Prints 5, 7, 9
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
See more examples in this demonstration and the documentation.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With