My question is very similar to Combining asyncio with a multi-worker ProcessPoolExecutor; however, a slight change (I believe it's the async for) makes the excellent answers there unusable for me.
I am trying the following MWE:
import concurrent.futures
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay+1)
    return('EXECUTOR: Completed blocking task number ' + str(delay+1))

async def non_blocking(loop):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        async for i in mygen():
            print('MASTER: Sending to executor blocking task number ' + str(i+1))
            result = await loop.run_in_executor(executor, blocking, i)
            print(result)
            print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i+1))

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))
The output from this, as expected, is not asynchronous:
MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
I would like to adjust the code so that the tasks run in two concurrent processes and their output is printed as it becomes available. The desired output is:
MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
I understand from Combining asyncio with a multi-worker ProcessPoolExecutor that, as things stand, my await loop.run_in_executor() call is blocking. I don't know how to replace it in a way that allows the async for loop to move on to the next generated value while the executor finishes its work. Note that I am not using asyncio.gather as in their example.
If you want to have a maximum of two processes running your tasks, the simplest way to achieve that is to create the executor with max_workers=2. Then you can submit tasks as fast as possible, i.e. proceed with the next iteration of the async for loop without waiting for the previous task to finish. You can gather the results of all tasks at the end, to ensure that exceptions don't go unnoticed (and possibly to get the actual results).
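The key point is that loop.run_in_executor returns an awaitable future: awaiting it immediately serializes the work, while holding on to the future (or wrapping the call in a task) lets the async for loop move on right away. A minimal sketch of that scheduling pattern, gathering the executor futures directly (the full example below instead wraps each call in a task so it can also print the per-task MASTER messages):

futures = []
async for i in mygen():
    # schedule the blocking call in the pool, but do not await it yet
    futures.append(loop.run_in_executor(executor, blocking, i))
# wait for all of the scheduled work in one place
results = await asyncio.gather(*futures)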
The following code produces the expected output:
from concurrent.futures import ProcessPoolExecutor
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay+1)
    return('EXECUTOR: Completed blocking task number ' + str(delay+1))

async def run_blocking(executor, task_no, delay):
    print('MASTER: Sending to executor blocking task number '
          + str(task_no))
    result = await loop.run_in_executor(executor, blocking, delay)
    print(result)
    print('MASTER: Well done executor - you seem to have completed '
          'blocking task number ' + str(task_no))

async def non_blocking(loop):
    tasks = []
    with ProcessPoolExecutor(max_workers=2) as executor:
        async for i in mygen():
            # spawn the task and let it run in the background
            tasks.append(asyncio.create_task(
                run_blocking(executor, i + 1, i)))
        # if there was an exception, retrieve it now
        await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))
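On Python 3.7+ the same idea can be written without passing the loop around explicitly, and asyncio.as_completed can be used to print each result as soon as its worker process finishes. A sketch, reusing the mygen and blocking definitions above (it only prints the executor's return values, not the extra MASTER completion messages):

async def non_blocking():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = []
        async for i in mygen():
            print('MASTER: Sending to executor blocking task number ' + str(i + 1))
            # schedule the blocking call without awaiting it yet
            futures.append(loop.run_in_executor(executor, blocking, i))
        # print each result as soon as it becomes available
        for next_result in asyncio.as_completed(futures):
            print(await next_result)

asyncio.run(non_blocking())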