Is it possible to take a blocking function such as work
and have it run concurrently in a ProcessPoolExecutor
that has more than one worker?
import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor

num_jobs = 4
queue = asyncio.Queue()
executor = ProcessPoolExecutor(max_workers=num_jobs)
loop = asyncio.get_event_loop()

def work():
    sleep(1)

async def producer():
    for i in range(num_jobs):
        results = await loop.run_in_executor(executor, work)
        await queue.put(results)

async def consumer():
    completed = 0
    while completed < num_jobs:
        job = await queue.get()
        completed += 1

s = time()
loop.run_until_complete(asyncio.gather(producer(), consumer()))
print("duration", time() - s)
Running the above on a machine with more than 4 cores takes ~4 seconds. How would you write producer
such that the above example takes only ~1 second?
The problem is in producer. Instead of allowing the jobs to run in the background, it waits for each job to finish, thus serializing them. If you rewrite producer to look like this (and leave consumer unchanged), you get the expected 1s duration:
async def producer():
    for i in range(num_jobs):
        fut = loop.run_in_executor(executor, work)
        fut.add_done_callback(lambda f: queue.put_nowait(f.result()))
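This works because loop.run_in_executor returns an asyncio future, and add_done_callback schedules the callback on the event loop, where queue.put_nowait is safe to call. The producer now returns as soon as all four jobs are submitted; the consumer's counter is what keeps the program running until they all finish.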
await loop.run_in_executor(executor, work) suspends the producer coroutine until work completes, so only one job runs at a time.
To run jobs concurrently, you could use asyncio.as_completed:
async def producer():
    # Submit every job before awaiting any of them, so all four
    # run in the pool at the same time.
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    # Note: the loop= argument to as_completed was removed in Python 3.10,
    # and it is not needed here.
    for f in asyncio.as_completed(tasks):
        results = await f
        await queue.put(results)
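For completeness, here is a self-contained sketch of the same approach in the modern asyncio style (asyncio.run instead of module-level get_event_loop/run_until_complete, and an executor managed by a with block). It is an illustration under those assumptions, not the original answer's code:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

num_jobs = 4

def work():
    # Stand-in for a CPU-bound or otherwise blocking call.
    sleep(1)

async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    async def producer(executor):
        # Submit all jobs up front so they run on parallel workers.
        tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
        for f in asyncio.as_completed(tasks):
            await queue.put(await f)

    async def consumer():
        for _ in range(num_jobs):
            await queue.get()

    with ProcessPoolExecutor(max_workers=num_jobs) as executor:
        await asyncio.gather(producer(executor), consumer())

if __name__ == "__main__":  # required for process pools on spawn-based platforms
    s = time()
    asyncio.run(main())
    print("duration", time() - s)

On a machine with at least four cores this should print a duration of roughly one second, matching the expectation in the question.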