Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

how can I asynchronously map/filter an asynchronous iterable?

Let's say I have an asynchronous iterable that I can pass over using async for, how then can I then map and filter it to a new asynchronous iterator? The following code which is an adaptation of how I'd do the same thing with a synchronous iterable doesn't work, since yield isn't allowed inside async defs.

async def mapfilter(aiterable, p, func):
    async for payload in aiterable:
        if p(payload):

            # This part isn't allowed, but hopefully it should be clear
            # what I'm trying to accomplish.
            yield func(payload)
like image 740
Te-jé Rodgers Avatar asked Mar 18 '16 20:03

Te-jé Rodgers


People also ask

Can you use async in map function?

map() algorithm applies an async callback to each element of an array, creating promises as it does. However, the returned result by . map() is no promise, but an array of promises.

How do you use async await in DART map?

map( (i) async => await foo(i) // Returns a Future, not an int ); You are printing are the Futures returned by (i) async => await foo(i) . Those Futures complete when the chain of Futures within them complete. When the Timer fires: foo() completes, then await foo(i) , then your mapping function.

Can I use async in Map Javascript?

The map function An async version needs to do two things. First, it needs to map every item to a Promise with the new value, which is what adding async before the function does. And second, it needs to wait for all the Promises then collect the results in an Array.

How do I run asynchronously in Python?

To run an async function (coroutine) you have to call it using an Event Loop. Event Loops: You can think of Event Loop as functions to run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses. Example 1: Event Loop example to run async Function to run a single async function: Python3.


2 Answers

A recently published PEP draft (PEP 525), whose support is scheduled for Python 3.6, proposes to allow Asynchronous Generators with the same syntax you came up with.

Meanwhile, you can also use the asyncio_extras library mentioned by CryingCyclops in its comment if you don't want to deal with the asynchronous iterator boilerplate.

From the docs:

@async_generator
async def mygenerator(websites):
    for website in websites:
        page = await http_fetch(website)
        await yield_async(page)

async def fetch_pages():
    websites = ('http://foo.bar', 'http://example.org')
    async for sanitized_page in mygenerator(websites):
        print(sanitized_page)

There is also the async_generator library which supports yield from constructs.

like image 127
lbonn Avatar answered Oct 24 '22 05:10

lbonn


You can't use yield inside coroutines. To implement your idea, only way I see is to implement Asynchronous Iterator. If I'm right, something like that:

class MapFilter:
    def __init__(self, aiterable, p, func):
        self.aiterable = aiterable
        self.p = p
        self.func = func

    async def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            payload = await self.aiterable.__anext__()  # StopAsyncIteration would be raise here on no new values
            if self.p(payload):
                return self.func(payload)

Let's test it. Here's complete example with helper arange class (I took it from here):

import asyncio


class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration


class MapFilter:
    def __init__(self, aiterable, p, func):
        self.aiterable = aiterable
        self.p = p
        self.func = func

    async def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            payload = await self.aiterable.__anext__()
            if self.p(payload):
                return self.func(payload)


async def main():
    aiterable = arange(5)
    p = lambda x: bool(x>2)
    func = lambda x: x*2

    async for i in MapFilter(aiterable, p, func):
        print(i)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

6
8
like image 30
Mikhail Gerasimov Avatar answered Oct 24 '22 04:10

Mikhail Gerasimov