
Chunks of async_generator

I can split an iterator into chunks as follows:

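import itertools

def get_chunks_it(l, n):
    """Chunk an iterable `l` into pieces of size `n`.
    Args:
        l (Iterable[Any]): the iterable to split
        n (int): size of each chunk
    Returns:
        Generator[Iterator[Any]]: one lazy iterator per chunk
    """
    iterator = iter(l)
    for first in iterator:
        # Each chunk is the first item plus up to n - 1 more from the shared iterator
        yield itertools.chain([first], itertools.islice(iterator, n - 1))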
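
As a quick check of the synchronous version, here is a minimal sketch that runs it over `range(10)` with a chunk size of 3. The chunks are lazy and share one underlying iterator, so this example assumes each chunk is fully consumed (here with `list`) before the next one is requested.

```python
import itertools

def get_chunks_it(l, n):
    iterator = iter(l)
    for first in iterator:
        yield itertools.chain([first], itertools.islice(iterator, n - 1))

# Materialise every chunk before moving on to the next one.
chunks = [list(chunk) for chunk in get_chunks_it(range(10), 3)]
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```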

Now let's say I have an asynchronous generator (Python 3.6):

import asyncio

async def generator():
    for i in range(0, 10):
        yield i
        await asyncio.sleep(1)

How could I get chunks of the resulting async_generator (say of size 3, yielding [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]) so that I could write:

async for chunk in get_chunk_it_async(generator(), 3):
    print(chunk)

Orelus asked Jun 19 '17 09:06


2 Answers

You can use aiostream.stream.chunks:

from aiostream import stream

async def main():
    async for x in stream.chunks(generator(), 3):
        print(x)

Output:

[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]

See the project page and the documentation for further information.

Disclaimer: I am the project maintainer.

Vincent answered Nov 15 '22 23:11


This is slightly complicated by the lack of an aiter() function in Python 3.6 (a built-in aiter() was only added in Python 3.10). The standard library has no async versions of the itertools objects either.

Define your own:

try:
    aiter
except NameError:
    # not yet a built-in, define our own shim for now
    from inspect import isawaitable as _isawaitable
    def aiter(ob, _isawaitable=_isawaitable):
        obtype = type(ob)  # magic methods are looked up on the type
        if not hasattr(obtype, '__aiter__'):
            raise TypeError(f'{obtype.__name__!r} object is not async iterable')
        async_iter = obtype.__aiter__(ob)
        if _isawaitable(async_iter):
            # PEP 492 allowed for __aiter__ to be a coroutine, but 525 reverses this again
            raise TypeError(f'{obtype.__name__!r} object is not async iterable')
        return async_iter
    del _isawaitable
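
To illustrate the protocol the shim relies on, here is a minimal sketch that steps through an async generator by hand, looking up `__aiter__` and `__anext__` on the type as the shim above does. The `numbers` and `step_manually` names are made up for this example, and `asyncio.run` assumes Python 3.7 or later.

```python
import asyncio

async def numbers():
    # Small async generator used only for this demonstration.
    for i in range(3):
        yield i

async def step_manually():
    source = numbers()
    # Special methods are looked up on the type, not the instance.
    iterator = type(source).__aiter__(source)
    seen = []
    while True:
        try:
            seen.append(await type(iterator).__anext__(iterator))
        except StopAsyncIteration:
            # Raised by __anext__ once the source is exhausted.
            break
    return seen

result = asyncio.run(step_manually())
print(result)  # [0, 1, 2]
```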

Next, you need to define async islice and chain objects:

class achain():
    """Chain over multiple async iterators"""
    def __init__(self, *async_iterables):
        self._source = iter(async_iterables)
        self._active = None
    def __aiter__(self):
        return self
    async def __anext__(self):
        if self._source is None:
            # we are all done, nothing more to produce
            raise StopAsyncIteration
        if self._active is None:
            # get next async iterable to loop over
            ait = next(self._source, None)
            if ait is None:
                # we are all done, nothing more to produce
                self._source = None
                raise StopAsyncIteration
            self._active = aiter(ait)
        try:
            return await type(self._active).__anext__(self._active)
        except StopAsyncIteration:
            # exhausted, recurse
            self._active = None
            return await self.__anext__()

class aslice():
    """Slice an async iterator"""
    def __init__(self, ait, start, stop=None, step=1):
        if stop is None:
            start, stop = 0, start
        self._ait = ait
        self._next, self._stop, self._step = start, stop, step
        self._cnt = 0
    def __aiter__(self):
        return self
    async def __anext__(self):
        ait, stop = self._ait, self._stop
        if ait is None:
            raise StopAsyncIteration
        anext = type(ait).__anext__
        while self._cnt < self._next:
            try:
                await anext(ait)
            except StopAsyncIteration:
                self._ait = None
                raise
            self._cnt += 1
        if stop is not None and self._cnt >= stop:
            self._ait = None
            raise StopAsyncIteration
        try:
            item = await anext(ait)
        except StopAsyncIteration:
            self._ait = None
            raise
        self._cnt += 1
        self._next += self._step
        return item

With those in place, simply add async in the right places:

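async def get_chunks_it(l, n):
    """Chunk an async iterable `l` into pieces of size `n`.
    Args:
        l (AsyncIterable[Any]): the async iterable to split
        n (int): size of each chunk
    Returns:
        AsyncGenerator[AsyncIterator[Any]]: one async iterator per chunk
    """
    iterator = aiter(l)
    async for first in iterator:
        async def afirst():
            yield first
        # Call afirst() so that achain receives an async iterable, not a function
        yield achain(afirst(), aslice(iterator, n - 1))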
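
Note that the function above yields async iterators rather than lists, so each chunk has to be consumed with `async for` before the next one is requested. If plain lists are what you want, as in the question, a simpler alternative is to buffer items in an async generator. The following is a minimal sketch of that different technique, not the approach above: `chunked_lists` is a hypothetical name, the generator's delay is shortened to `sleep(0)`, and `asyncio.run` assumes Python 3.7 or later.

```python
import asyncio

async def generator():
    # Same shape as the generator in the question, with the delay shortened.
    for i in range(0, 10):
        yield i
        await asyncio.sleep(0)

async def chunked_lists(aiterable, n):
    """Hypothetical helper: yield lists of up to n items from an async iterable."""
    chunk = []
    async for item in aiterable:
        chunk.append(item)
        if len(chunk) == n:
            yield chunk
            chunk = []
    if chunk:
        # Emit the final, possibly shorter, chunk.
        yield chunk

async def main():
    return [chunk async for chunk in chunked_lists(generator(), 3)]

chunks = asyncio.run(main())
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

The trade-off is that each chunk is held in memory in full before it is yielded, which is usually fine for small chunk sizes.
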

Martijn Pieters answered Nov 15 '22 21:11