Combining asyncio with a multi-worker ProcessPoolExecutor

Is it possible to take a blocking function such as work and have it run concurrently in a ProcessPoolExecutor that has more than one worker?

import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor

num_jobs = 4
queue = asyncio.Queue()
executor = ProcessPoolExecutor(max_workers=num_jobs)
loop = asyncio.get_event_loop()

def work():
    sleep(1)

async def producer():
    for i in range(num_jobs):
        results = await loop.run_in_executor(executor, work)
        await queue.put(results)

async def consumer():
    completed = 0
    while completed < num_jobs:
        job = await queue.get()
        completed += 1

s = time()
loop.run_until_complete(asyncio.gather(producer(), consumer()))
print("duration", time() - s)

Running the above on a machine with more than 4 cores takes ~4 seconds. How would you write producer such that the above example takes only ~1 second?

Chris Seymour asked Jun 30 '18 17:06

2 Answers

The problem is in the producer. Instead of allowing the jobs to run in the background, it waits for each job to finish, thus serializing them. If you rewrite producer to look like this (and leave consumer unchanged), you get the expected 1s duration:

async def producer():
    for i in range(num_jobs):
        fut = loop.run_in_executor(executor, work)
        fut.add_done_callback(lambda f: queue.put_nowait(f.result()))
user4815162342 answered Oct 19 '22 13:10
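
For readers who want to try the callback approach end to end, here is a minimal self-contained sketch. It is not part of the original answer. It assumes work simply sleeps for one second, and the names run_jobs and fn, the returned "done" value, and the use of asyncio.run with a __main__ guard are illustrative choices. The queue is created inside the running loop and the executor is passed in as a parameter.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def work():
    """Blocking stand-in for a real job (assumed to take about 1 second)."""
    time.sleep(1)
    return "done"


async def producer(executor, queue, num_jobs, fn=work):
    loop = asyncio.get_running_loop()
    for _ in range(num_jobs):
        # Submit without awaiting, so every job starts right away.
        fut = loop.run_in_executor(executor, fn)
        # The callback runs in the event loop once the job has finished.
        # If the job raised, f.result() re-raises here and nothing is queued.
        fut.add_done_callback(lambda f: queue.put_nowait(f.result()))


async def consumer(queue, num_jobs):
    results = []
    while len(results) < num_jobs:
        results.append(await queue.get())
    return results


async def run_jobs(executor, num_jobs, fn=work):
    queue = asyncio.Queue()  # created inside the running loop
    _, results = await asyncio.gather(
        producer(executor, queue, num_jobs, fn),
        consumer(queue, num_jobs),
    )
    return results


if __name__ == "__main__":
    num_jobs = 4
    start = time.time()
    with ProcessPoolExecutor(max_workers=num_jobs) as executor:
        results = asyncio.run(run_jobs(executor, num_jobs))
    print("duration", time.time() - start)
```

The __main__ guard matters on platforms where worker processes are started by re-importing the main module. Passing the executor in also makes it easy to swap in a ThreadPoolExecutor for experiments.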


await loop.run_in_executor(executor, work) suspends producer until work completes. The event loop itself is not blocked, but the next job is not submitted until the previous one has finished, so only one function runs at a time.

To run jobs concurrently, you could use asyncio.as_completed:

async def producer():
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    for f in asyncio.as_completed(tasks):
        results = await f
        await queue.put(results)
vaultah answered Oct 19 '22 12:10
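
To see that asyncio.as_completed hands results back in completion order rather than submission order, the sketch below gives each job a different duration. It is an illustration, not the answerer's code: the seconds parameter of work, the durations list, and the main and fn names are assumptions made for the example.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def work(seconds):
    """Hypothetical blocking job: sleeps, then reports how long it slept."""
    time.sleep(seconds)
    return seconds


async def producer(executor, queue, durations, fn=work):
    loop = asyncio.get_running_loop()
    # Submit every job first; extra positional args are passed on to fn.
    futures = [loop.run_in_executor(executor, fn, d) for d in durations]
    # as_completed yields awaitables in the order the jobs finish.
    for next_done in asyncio.as_completed(futures):
        await queue.put(await next_done)


async def main(executor, durations, fn=work):
    queue = asyncio.Queue()
    await producer(executor, queue, durations, fn)
    # Drain the queue; every result has been queued by now.
    return [queue.get_nowait() for _ in durations]


if __name__ == "__main__":
    durations = [0.6, 0.2, 0.4]
    with ProcessPoolExecutor(max_workers=len(durations)) as executor:
        print(asyncio.run(main(executor, durations)))
```

With one worker per job, the shortest job is expected to be queued first, whatever its position in the durations list.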
