 

Parallelize generators with asyncio

My application reads data from a slow I/O source, does some processing, and then writes it to a local file. I've implemented this with generators like so:

import time

def io_task(x):
    print("requesting data for input %s" % x)
    time.sleep(1)   # this simulates a blocking I/O task
    return 2*x

def producer(xs):
    for x in xs:
        yield io_task(x)

def consumer(xs):
    with open('output.txt', 'w') as fp:
        for x in xs:
            print("writing %s" % x)
            fp.write(str(x) + '\n')

data = [1,2,3,4,5]
consumer(producer(data))

Now I'd like to parallelize this task with the help of asyncio, but I can't seem to figure out how. The main issue for me is to directly feed data through a generator from the producer to the consumer while letting asyncio make multiple parallel requests to io_task(x). Also, this whole async def vs. @asyncio.coroutine thing is confusing me.

Can someone show me how to build a minimal working example that uses asyncio from this sample code?

(Note: It is not ok to just make calls to io_task(), buffer the results and then write them to a file. I need a solution that works on large data sets that can exceed the main memory, that's why I've been using generators so far. It is however safe to assume that the consumer is always faster than all producers combined)

klamann asked Oct 17 '17



1 Answer

Since Python 3.6 introduced asynchronous generators (PEP 525), very few changes are needed to make your code compatible with asyncio.

The io_task function becomes a coroutine, with asyncio.sleep as the non-blocking counterpart of time.sleep:

import asyncio

async def io_task(x):
    await asyncio.sleep(1)   # non-blocking stand-in for the slow I/O call
    return 2*x

The producer generator becomes an asynchronous generator. Note that this version still awaits each io_task before requesting the next one, so the requests do not overlap yet (see the edit below for that):

async def producer(xs):
    for x in xs:
        yield await io_task(x)

The consumer function becomes a coroutine and uses aiofiles (a third-party package: pip install aiofiles), asynchronous context management and asynchronous iteration:

import aiofiles

async def consumer(xs):
    async with aiofiles.open('output.txt', 'w') as fp:
        async for x in xs:
            await fp.write(str(x) + '\n')

And the main coroutine runs in an event loop:

data = [1,2,3,4,5]
main = consumer(producer(data))
loop = asyncio.get_event_loop()
loop.run_until_complete(main)
loop.close()

On Python 3.7+, the last three lines can be replaced by a single asyncio.run(main).
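Putting it together, here is one complete, runnable sketch using only the standard library: a plain blocking open/write stands in for aiofiles (acceptable here since the question states the consumer is always faster than the producers), and asyncio.run is used as the Python 3.7+ entry point:

```python
import asyncio

async def io_task(x):
    await asyncio.sleep(0.1)  # non-blocking stand-in for the slow I/O request
    return 2 * x

async def producer(xs):
    # async generator: yields results one at a time, so memory
    # usage stays bounded even for very large inputs
    for x in xs:
        yield await io_task(x)

async def consumer(xs):
    # plain blocking file I/O; swap in aiofiles for fully async writes
    with open('output.txt', 'w') as fp:
        async for x in xs:
            print("writing %s" % x)
            fp.write(str(x) + '\n')

data = [1, 2, 3, 4, 5]
asyncio.run(consumer(producer(data)))
```
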

Also, you may consider using aiostream to pipeline some processing operations between the producer and the consumer.


EDIT: The different I/O tasks can easily be run concurrently on the producer side by using as_completed. Note that as_completed schedules all the tasks at once and yields them in completion order, so the results may no longer come out in input order:

async def producer(xs):
    coros = [io_task(x) for x in xs]
    for future in asyncio.as_completed(coros):
        yield await future
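Since creating one task per input conflicts with the larger-than-memory requirement, a chunked variant can bound how many tasks exist at once while still overlapping the requests within each chunk (a sketch, not part of the original answer; the chunk_size parameter is my own):

```python
import asyncio
import itertools

async def io_task(x):
    await asyncio.sleep(0.1)  # non-blocking stand-in for the slow I/O request
    return 2 * x

async def producer(xs, chunk_size=3):
    # Pull chunk_size items at a time from the (possibly huge) input,
    # so at most chunk_size tasks and results are in memory at once.
    it = iter(xs)
    while True:
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:
            break
        # Run the chunk's I/O tasks concurrently; results within a
        # chunk arrive in completion order, not input order.
        for future in asyncio.as_completed([io_task(x) for x in chunk]):
            yield await future

async def collect(agen):
    return [x async for x in agen]

results = asyncio.run(collect(producer(range(5))))
print(sorted(results))
```
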
Vincent answered Oct 14 '22