 

How to pass an Asyncio coroutine to a multiprocessing process?

I have developed an application that works well in "asynchronous mode" and performs calculations. When the calculations meet certain criteria, a Telegram bot function is called to send the results, while the calculations continue in an infinite loop.

It works fine, but it's too slow.

Either my knowledge in asyncio is insufficient, or the functionality of this library is not capable of what I need to accomplish. So, I started to think about how to speed up the process and came to the conclusion that I need to run the main chain of functions that performs the calculations in multiple processes.

However, I faced an issue where I cannot get asynchronous code to work inside the processes, no matter how hard I tried. Moreover, I need to add separate asynchronous work for the Telegram bot somewhere in my application. I have tried using multiprocessing and aioprocessing, but nothing seems to work.

import asyncio


async def main():
    try:
        pairs = [[..., ...], [..., ...], [..., ...], [..., ...], [..., ...]]
        tasks_run = [func(pair) for pair in pairs]
        await asyncio.gather(*tasks_run)
    except Exception:
        logger.exception('main')


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [loop.create_task(dp.start_polling()), loop.create_task(main())]
    try:
        loop.run_until_complete(asyncio.gather(*tasks))
    except KeyboardInterrupt:
        pass
    finally:
        for task in tasks:
            task.cancel()
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

I need to create a separate process for each pair and run the coroutine func(pair) in it. Is it possible to use multiprocessing and asyncio together? If yes, how can I do that? If not, how can I solve my problem differently?

By the way, in if __name__ == '__main__', two tasks are created. One runs a Telegram bot, and the other runs the calculation chain. And something tells me that adding process execution will somehow break the interaction between the Telegram bot and the calculations themselves.

Please don't judge me too harshly, I've only been programming for six months.

asked Sep 20 '25 by xScr3amox

1 Answer

If you need to run CPU-intensive computations in parallel from inside an asyncio event loop, you can use loop.run_in_executor() together with a process pool:

import asyncio
from time import sleep, perf_counter
from concurrent.futures import ProcessPoolExecutor


def heavy_work(i: int) -> int:
    """CPU intensive function"""
    sleep(1.0)
    return i


async def main():
    start = perf_counter()
    loop = asyncio.get_running_loop()

    with ProcessPoolExecutor() as pool:
        r1, r2 = await asyncio.gather(
            loop.run_in_executor(pool, heavy_work, 1),
            loop.run_in_executor(pool, heavy_work, 2),
        )

        print(r1, r2)

    elapsed = perf_counter() - start
    print(f"Time: {elapsed:.2f} s")


if __name__ == "__main__":
    asyncio.run(main())

On a two-core computer you'll see that the elapsed time is approximately one second instead of the two seconds it would have taken if run serially.
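Applied to the question's setup, the same pattern can distribute one pair per worker process. Since a coroutine object cannot be pickled and sent to another process, each worker instead starts its own event loop with asyncio.run(). This is a minimal sketch: the func coroutine below is a hypothetical stand-in for the question's actual calculation code.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


async def func(pair):
    # Stand-in for the question's coroutine: CPU work plus async I/O.
    await asyncio.sleep(0.1)
    return sum(pair)


def run_pair(pair):
    # Each worker process runs its own event loop for the coroutine.
    # Must be defined at module level so it can be pickled.
    return asyncio.run(func(pair))


async def main():
    pairs = [[1, 2], [3, 4], [5, 6]]
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, run_pair, pair) for pair in pairs)
        )
    print(results)  # prints [3, 7, 11]


if __name__ == "__main__":
    asyncio.run(main())
```

The Telegram bot task can keep running in the parent process's event loop alongside main(), since run_in_executor never blocks the loop.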

Alternatively, if you need to offload IO-bound operations or functions that release the GIL, you can use a ThreadPoolExecutor, which has lower overhead and avoids pickling issues.
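The thread-pool variant has the same shape as the process-pool example above; io_bound_work here is a placeholder standing in for any blocking call such as a network or disk read:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def io_bound_work(i: int) -> int:
    """Blocking call that releases the GIL (e.g. a network or disk read)."""
    sleep(0.5)
    return i


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        r1, r2 = await asyncio.gather(
            loop.run_in_executor(pool, io_bound_work, 1),
            loop.run_in_executor(pool, io_bound_work, 2),
        )
    print(r1, r2)  # prints 1 2


if __name__ == "__main__":
    asyncio.run(main())
```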

As a final note, you may have noticed that the function heavy_work is not async. This is intentional: async functions should not contain CPU-intensive work, because it would block the event loop. If you want to make such a function async, you have to offload the CPU-intensive part to a process pool. Here is an example (reusing the imports from above) with a global shared process pool:

SHARED_PROCESS_POOL = ProcessPoolExecutor()

async def heavy_work(i: int) -> int:
    loop = asyncio.get_running_loop()
    
    # Isolated CPU-intensive work
    await loop.run_in_executor(SHARED_PROCESS_POOL, sleep, 1.0)

    return i

The same thing with a thread pool is much simpler, since you can use the dedicated asyncio.to_thread() function (available since Python 3.9).
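For comparison, here is the thread-pool version of the async wrapper, using asyncio.to_thread so no executor needs to be created or managed explicitly:

```python
import asyncio
from time import sleep


async def heavy_io(i: int) -> int:
    # asyncio.to_thread runs the blocking call in the default thread
    # pool and suspends this coroutine until it finishes.
    await asyncio.to_thread(sleep, 0.5)
    return i


async def main():
    results = await asyncio.gather(heavy_io(1), heavy_io(2))
    print(results)  # prints [1, 2]


if __name__ == "__main__":
    asyncio.run(main())
```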

answered Sep 22 '25 by Louis Lac