Let's say I have an asynchronous iterable that I can iterate over using async for. How can I then map and filter it into a new asynchronous iterator? The following code, an adaptation of how I'd do the same thing with a synchronous iterable, doesn't work, since yield isn't allowed inside async def functions (as of Python 3.5).
async def mapfilter(aiterable, p, func):
    async for payload in aiterable:
        if p(payload):
            # This part isn't allowed, but hopefully it should be clear
            # what I'm trying to accomplish.
            yield func(payload)
PEP 525, which was a draft when this answer was written and has since been implemented in Python 3.6, allows asynchronous generators with the same syntax you came up with.
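For reference, here is a minimal sketch of what the PEP 525 syntax looks like in practice, assuming Python 3.6 or later for the async generators and 3.7 or later for asyncio.run. The arange source and the collect helper are hypothetical names introduced only for illustration:

```python
import asyncio


async def arange(n):
    """Hypothetical async source: yields 0..n-1 with a yield point per item."""
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def mapfilter(aiterable, p, func):
    """Yield func(item) for every item of aiterable that satisfies p."""
    async for payload in aiterable:
        if p(payload):
            yield func(payload)


async def collect():
    """Hypothetical helper: drain the mapped and filtered stream into a list."""
    results = []
    async for value in mapfilter(arange(5), lambda x: x > 2, lambda x: x * 2):
        results.append(value)
    return results


print(asyncio.run(collect()))  # prints [6, 8]
```

Because mapfilter is itself an async generator, it can be chained with other async generators or consumed directly with async for.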
Meanwhile, you can also use the asyncio_extras library, mentioned by CryingCyclops in their comment, if you don't want to deal with the asynchronous iterator boilerplate.
From the docs:
@async_generator
async def mygenerator(websites):
    for website in websites:
        page = await http_fetch(website)
        await yield_async(page)

async def fetch_pages():
    websites = ('http://foo.bar', 'http://example.org')
    async for sanitized_page in mygenerator(websites):
        print(sanitized_page)
There is also the async_generator library, which supports yield from constructs.
You can't use yield inside coroutines (as of Python 3.5). The only way I see to implement your idea is to write an asynchronous iterator. If I'm right, something like this:
class MapFilter:
    def __init__(self, aiterable, p, func):
        self.aiterable = aiterable
        self.p = p
        self.func = func

    def __aiter__(self):
        # Plain method: Python 3.7+ requires __aiter__ to return the iterator directly.
        return self

    async def __anext__(self):
        while True:
            # StopAsyncIteration is raised here when there are no more values.
            payload = await self.aiterable.__anext__()
            if self.p(payload):
                return self.func(payload)
Let's test it. Here's a complete example with a helper arange class (I took it from here):
import asyncio


class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        # Plain method: Python 3.7+ requires __aiter__ to return the iterator directly.
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration


class MapFilter:
    def __init__(self, aiterable, p, func):
        self.aiterable = aiterable
        self.p = p
        self.func = func

    def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            # Propagates StopAsyncIteration once the source is exhausted.
            payload = await self.aiterable.__anext__()
            if self.p(payload):
                return self.func(payload)


async def main():
    aiterable = arange(5)
    p = lambda x: x > 2
    func = lambda x: x * 2
    async for i in MapFilter(aiterable, p, func):
        print(i)


if __name__ == "__main__":
    asyncio.run(main())  # asyncio.run requires Python 3.7+
Output:
6
8
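On Python 3.6 and later, the same map-and-filter can probably also be expressed without a class, as an asynchronous generator expression (PEP 530). A minimal sketch, assuming Python 3.7+ for asyncio.run; ticker is a hypothetical stand-in for the arange helper above:

```python
import asyncio


async def ticker(n):
    """Hypothetical async source: yields 0..n-1 with a yield point per item."""
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def main():
    # Async generator expression: lazily keeps x > 2 and maps it to x * 2.
    doubled = (x * 2 async for x in ticker(5) if x > 2)
    # Async list comprehension drains the lazy stream into a list.
    return [value async for value in doubled]


print(asyncio.run(main()))  # prints [6, 8]
```

The expression is lazy in the same way as MapFilter: nothing is pulled from the source until the result is iterated.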