My application reads data from a slow I/O source, does some processing and then writes it to a local file. I've implemented this with generators like so:
import time

def io_task(x):
    print("requesting data for input %s" % x)
    time.sleep(1)  # this simulates a blocking I/O task
    return 2*x

def producer(xs):
    for x in xs:
        yield io_task(x)

def consumer(xs):
    with open('output.txt', 'w') as fp:
        for x in xs:
            print("writing %s" % x)
            fp.write(str(x) + '\n')

data = [1, 2, 3, 4, 5]
consumer(producer(data))
Now I'd like to parallelize this task with the help of asyncio, but I can't seem to figure out how. The main issue for me is to feed data directly through a generator from the producer to the consumer while letting asyncio make multiple parallel requests to io_task(x). Also, this whole async def vs. @asyncio.coroutine thing is confusing me.

Can someone show me how to build a minimal working example that uses asyncio from this sample code?

(Note: It is not OK to just make calls to io_task(), buffer the results and then write them to a file. I need a solution that works on large data sets that can exceed main memory; that's why I've been using generators so far. It is however safe to assume that the consumer is always faster than all producers combined.)
Since Python 3.6 and asynchronous generators, very few changes need to be applied to make your code compatible with asyncio.
The io_task function becomes a coroutine:
async def io_task(x):
    await asyncio.sleep(1)
    return 2*x
The producer generator becomes an asynchronous generator:
async def producer(xs):
    for x in xs:
        yield await io_task(x)
The consumer function becomes a coroutine and uses aiofiles, asynchronous context management and asynchronous iteration:
async def consumer(xs):
    async with aiofiles.open('output.txt', 'w') as fp:
        async for x in xs:
            await fp.write(str(x) + '\n')
And the main coroutine runs in an event loop:
import asyncio
import aiofiles  # third-party: pip install aiofiles

data = [1, 2, 3, 4, 5]
main = consumer(producer(data))
loop = asyncio.get_event_loop()
loop.run_until_complete(main)
loop.close()
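On Python 3.7+, the explicit loop management above can be replaced by asyncio.run, which creates and closes the event loop for you. A minimal sketch (with a trivial coroutine standing in for consumer(producer(data))):

```python
import asyncio

async def main():
    # stand-in for: await consumer(producer(data))
    return await asyncio.sleep(0, result="done")

print(asyncio.run(main()))  # done
```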
Also, you may consider using aiostream to pipeline some processing operations between the producer and the consumer.
EDIT: The different I/O tasks can easily be run concurrently on the producer side by using as_completed:
async def producer(xs):
    coros = [io_task(x) for x in xs]
    for future in asyncio.as_completed(coros):
        yield await future
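Note that with as_completed, results are yielded in completion order, not input order. A self-contained sketch assembling the pieces above (plain in-memory collection stands in for the aiofiles consumer so the example has no third-party dependencies; the timing shows the five 1-second tasks overlap instead of taking 5 seconds):

```python
import asyncio
import time

async def io_task(x):
    await asyncio.sleep(1)  # simulated slow I/O
    return 2 * x

async def producer(xs):
    # Start all I/O tasks and yield results as they complete.
    coros = [io_task(x) for x in xs]
    for future in asyncio.as_completed(coros):
        yield await future

async def consumer(xs):
    results = []
    async for x in xs:
        results.append(x)
    return results

start = time.monotonic()
results = asyncio.run(consumer(producer([1, 2, 3, 4, 5])))
elapsed = time.monotonic() - start

print(sorted(results))  # [2, 4, 6, 8, 10]
print(elapsed < 2)      # True: tasks ran concurrently, not sequentially
```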