
Python 3.6: async version of islice?

I'm trying to do something like this:

import asyncio
from itertools import islice

async def generate_numbers(n):
    for x in range(n):
        yield x


async def consume_numbers(n):
    async for x in generate_numbers(n):
        print(x)

async def consume_some_numbers(n, m):
    # <-- This doesn't work: islice doesn't recognize async iterators as iterators.
    async for x in islice(generate_numbers(n), m):
        print(x)


loop = asyncio.get_event_loop()
loop.run_until_complete(consume_numbers(10))
loop.run_until_complete(consume_some_numbers(10,5))

Is there a way to make this work, or at least get similar functionality?

Andrew Spott asked Feb 21 '17 21:02


2 Answers

Here is an attempt at an asyncio-friendly islice (and enumerate):

import asyncio
import sys

import random


async def aenumerate(aiterable):
    i = 0
    async for x in aiterable:
        yield i, x
        i += 1


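async def aislice(aiterable, *args):
    s = slice(*args)
    # Compare against None explicitly: `s.stop or sys.maxsize` would treat a
    # stop of 0 as "no limit", whereas islice(it, 0) yields nothing.
    start = 0 if s.start is None else s.start
    stop = sys.maxsize if s.stop is None else s.stop
    step = 1 if s.step is None else s.step
    it = iter(range(start, stop, step))
    try:
        nexti = next(it)
    except StopIteration:
        return
    async for i, element in aenumerate(aiterable):
        if i == nexti:
            yield element
            try:
                nexti = next(it)
            except StopIteration:
                return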


async def generate_numbers(n):
    for x in range(n):
        await asyncio.sleep(random.uniform(0.1, 0.4))
        yield x


async def consume_numbers(tag, n):
    print(tag, "start")
    async for x in generate_numbers(n):
        print(tag, x)
    print(tag, "done")


async def consume_some_numbers(tag, n, a, b, step=1):
    print(tag, "start")
    async for x in aislice(generate_numbers(n), a, b, step):
        print(tag, x)
    print(tag, "done")


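async def main():
    # asyncio.gather accepts coroutines directly; passing bare coroutines to
    # asyncio.wait was deprecated in Python 3.8 and removed in 3.11.
    await asyncio.gather(
        consume_numbers("A", 5),
        consume_numbers("B", 10),
        consume_some_numbers("C", 10, 0, 5),
        consume_some_numbers("D", 30, 3, 20, 4),
        consume_some_numbers("E", 10, 3, 8, 2),
    )


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()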

This has not been tested in a real-world application; comments welcome :-)
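One way to gain some confidence in an aislice like the one above is to compare its output with itertools.islice on an ordinary range. The sketch below is self-contained and carries its own compact copy of aislice that tests for None explicitly, so a stop of 0 yields nothing, just as islice does. The names agen, collect and check are hypothetical helpers introduced only for this comparison, and negative indices are not handled.

```python
import asyncio
import sys
from itertools import islice


async def agen(n):
    """Async counterpart of range(n); stands in for a real async source."""
    for x in range(n):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield x


async def aislice(aiterable, *args):
    """Sketch of an async islice (non-negative start/stop/step only)."""
    s = slice(*args)
    start = 0 if s.start is None else s.start
    stop = sys.maxsize if s.stop is None else s.stop
    step = 1 if s.step is None else s.step
    wanted = iter(range(start, stop, step))
    nexti = next(wanted, None)
    if nexti is None:  # empty slice: do not touch the source at all
        return
    i = 0
    async for element in aiterable:
        if i == nexti:
            yield element
            nexti = next(wanted, None)
            if nexti is None:  # slice exhausted: stop consuming the source
                return
        i += 1


async def collect(aiterable):
    """Drain an async iterable into a list."""
    return [x async for x in aiterable]


async def check():
    """Compare aislice with itertools.islice for several argument shapes."""
    cases = [(5,), (0,), (3, 8), (3, 20, 4), (3, 8, 2), (2, None), (None, None, 3)]
    for args in cases:
        expected = list(islice(range(10), *args))
        actual = await collect(aislice(agen(10), *args))
        assert actual == expected, (args, actual, expected)
    return len(cases)


loop = asyncio.new_event_loop()
try:
    checked = loop.run_until_complete(check())
    # Finalize source generators that were abandoned by an early return.
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()
print("cases checked:", checked)
```

Note that stopping early leaves the source generator suspended, which is why the sketch calls loop.shutdown_asyncgens() before closing the loop.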

Udi answered Sep 26 '22 03:09


The aiostream library provides generator-based operators for asynchronous iteration. See this example using stream.take:

import asyncio
from aiostream import stream

async def generate_numbers(n):
    for x in range(n):
        yield x

async def consume_some_numbers(n,m):
    async for x in stream.take(generate_numbers(n), m):
        print(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(consume_some_numbers(10, 5))

All the stream operators return an enhanced asynchronous iterable providing extra features such as slicing support. Consider the following example:

import asyncio
from aiostream import stream

async def main():
    xs = stream.count()
    ys = xs[5:10:2]
    async for y in ys:
        print(y)  # Prints 5, 7, 9

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

See more examples in this demonstration and the documentation.
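To give a rough idea of how slicing on an asynchronous iterable can work without any third-party dependency, here is a minimal standard-library sketch. It is not aiostream's implementation: SliceableAsyncIter and count are hypothetical names made up for this illustration, and only non-negative slices with a positive step are supported.

```python
import asyncio
import sys


class SliceableAsyncIter:
    """Hypothetical wrapper adding [start:stop:step] to an async iterable."""

    def __init__(self, aiterable):
        self._aiterable = aiterable

    def __aiter__(self):
        # Delegate iteration to the wrapped async iterable.
        return self._aiterable.__aiter__()

    def __getitem__(self, key):
        if not isinstance(key, slice):
            raise TypeError("only slices are supported in this sketch")
        start = 0 if key.start is None else key.start
        stop = sys.maxsize if key.stop is None else key.stop
        step = 1 if key.step is None else key.step
        if start < 0 or stop < 0 or step <= 0:
            raise ValueError("start/stop must be >= 0 and step must be > 0")
        # Return another wrapper so that slices can be chained.
        return SliceableAsyncIter(self._sliced(start, stop, step))

    async def _sliced(self, start, stop, step):
        target = start  # index of the next element to emit
        if target >= stop:
            return
        index = 0
        async for item in self._aiterable:
            if index == target:
                yield item
                target += step
                if target >= stop:  # nothing left to emit: stop early
                    return
            index += 1


async def count():
    """Endless async counter: 0, 1, 2, ..."""
    n = 0
    while True:
        await asyncio.sleep(0)
        yield n
        n += 1


async def main():
    xs = SliceableAsyncIter(count())
    return [y async for y in xs[5:10:2]]


loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(main())
    # The endless source is abandoned mid-iteration; finalize it explicitly.
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()
print(result)  # [5, 7, 9]
```

Because the slice stops pulling from the source as soon as the last requested index has been emitted, it also terminates on an endless source such as count.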

Vincent answered Sep 25 '22 03:09