
Combining asyncio with a multi-worker ProcessPoolExecutor and async for

My question is very similar to "Combining asyncio with a multi-worker ProcessPoolExecutor"; however, a slight change (I believe it's the async for) makes the excellent answers there unusable for me.

I am trying the following MWE:

import concurrent.futures
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay+1)
    return('EXECUTOR: Completed blocking task number ' + str(delay+1))

async def non_blocking(loop):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        async for i in mygen():
            print('MASTER: Sending to executor blocking task number ' + str(i+1))
            result = await loop.run_in_executor(executor, blocking, i)
            print(result)
            print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i+1))

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))

As expected, the output shows the tasks running one after another rather than concurrently:

MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2 
EXECUTOR: Completed blocking task number 2 
MASTER: Well done executor - you seem to have completed blocking task number 2

I would like to adjust the code so that the tasks are running in two concurrent processes and printing the output as it becomes available. Desired output is:

MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2

I understand from "Combining asyncio with a multi-worker ProcessPoolExecutor" that, as things stand, awaiting loop.run_in_executor() inside the loop holds up the iteration: the coroutine is suspended until the executor returns. I don't know how to replace it in a way that allows the async for to move on to the next generated value while the executor finishes its work. Note that I am not using asyncio.gather as in their example.

asked May 05 '19 15:05 by adrug



1 Answer

If you want to have a maximum of two processes running your tasks, the simplest way to achieve that is to create the executor with max_workers=2. Then you can submit tasks as fast as possible, i.e. proceed with the next iteration of async for without waiting for the previous task to finish. You can gather the results of all tasks at the end, to ensure the exceptions don't go unnoticed (and possibly to get the actual results).
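To make the "without waiting" part concrete, here is a minimal sketch (not taken from the original answer) that times both approaches. The names work, one_at_a_time and all_at_once, and the UNIT sleep length, are assumptions introduced for illustration; both coroutines accept any concurrent.futures executor.

```python
import asyncio
import time
from concurrent.futures import Executor, ProcessPoolExecutor

UNIT = 0.2  # assumed sleep length in seconds, kept short for the demo


def work(n: int) -> int:
    # Stand-in for a blocking job; module-level so it can be pickled.
    time.sleep(UNIT)
    return n


async def one_at_a_time(executor: Executor, count: int) -> float:
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    for n in range(count):
        # Awaiting right away suspends this coroutine until the job is done,
        # so the next job is not submitted before then.
        await loop.run_in_executor(executor, work, n)
    return time.perf_counter() - start


async def all_at_once(executor: Executor, count: int) -> float:
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Submit everything first; each call returns a future without waiting.
    futures = [loop.run_in_executor(executor, work, n) for n in range(count)]
    await asyncio.gather(*futures)
    return time.perf_counter() - start


def main() -> None:
    with ProcessPoolExecutor(max_workers=2) as executor:
        print('one at a time:', asyncio.run(one_at_a_time(executor, 2)))
        print('all at once:  ', asyncio.run(all_at_once(executor, 2)))


if __name__ == '__main__':
    main()
```

The second figure should come out noticeably smaller than the first, since the two sleeps overlap; process start-up adds some overhead to whichever call runs first.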

The following code produces the expected output:

from concurrent.futures import ProcessPoolExecutor
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay+1)
    return('EXECUTOR: Completed blocking task number ' + str(delay+1))

async def run_blocking(executor, task_no, delay):
    # look up the running loop here instead of relying on a global variable
    loop = asyncio.get_running_loop()
    print('MASTER: Sending to executor blocking task number '
          + str(task_no))
    result = await loop.run_in_executor(executor, blocking, delay)
    print(result)
    print('MASTER: Well done executor - you seem to have completed '
          'blocking task number ' + str(task_no))

async def non_blocking(loop):
    tasks = []
    with ProcessPoolExecutor(max_workers=2) as executor:
        async for i in mygen():
            # spawn the task and let it run in the background
            tasks.append(asyncio.create_task(
                run_blocking(executor, i + 1, i)))
        # if there was an exception, retrieve it now
        await asyncio.gather(*tasks)

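if __name__ == '__main__':
    # the guard is needed where workers are started via "spawn" (Windows, macOS)
    loop = asyncio.new_event_loop()
    loop.run_until_complete(non_blocking(loop))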
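
If the results should be handled as each one arrives rather than gathered at the end, one possible variant is sketched below. It is not part of the original answer: collect_as_completed and UNIT_SECONDS are names introduced here, and the sleep is scaled down so the demo finishes quickly. It relies on loop.run_in_executor() returning a future immediately and on asyncio.as_completed() yielding awaitables in order of completion.

```python
import asyncio
import time
from concurrent.futures import Executor, ProcessPoolExecutor

UNIT_SECONDS = 0.1  # assumed time unit, shortened compared with the question


async def mygen(u: int = 2):
    # Same values as the async generator in the question.
    for i in range(u):
        yield i


def blocking(delay: int) -> str:
    # Runs in a worker; module-level so it can be pickled.
    time.sleep((delay + 1) * UNIT_SECONDS)
    return f'EXECUTOR: Completed blocking task number {delay + 1}'


async def collect_as_completed(executor: Executor, u: int = 2) -> list:
    loop = asyncio.get_running_loop()
    futures = []
    async for i in mygen(u):
        print(f'MASTER: Sending to executor blocking task number {i + 1}')
        # Not awaited here, so the async for moves straight on.
        futures.append(loop.run_in_executor(executor, blocking, i))
    results = []
    # Handle each result as soon as its job finishes.
    for finished in asyncio.as_completed(futures):
        result = await finished  # re-raises any exception from the worker
        print(result)
        results.append(result)
    return results


def main() -> None:
    with ProcessPoolExecutor(max_workers=2) as executor:
        asyncio.run(collect_as_completed(executor))


if __name__ == '__main__':
    main()
```

Because every future is eventually awaited inside the as_completed loop, an exception raised in a worker still surfaces in the caller, just as it would with gather.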
answered Nov 01 '22 14:11 by user4815162342