
How to properly use concurrent.futures with asyncio

I am prototyping a FastAPI app with an endpoint that will launch a long-running process using the subprocess module. The obvious solution is to use concurrent.futures with a ProcessPoolExecutor; however, I am unable to get the behavior I want. Code sample:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import subprocess as sb
import time
import random

pool = ProcessPoolExecutor(5)

def long_task(s):
    # Blocking work intended to run in a pool worker process.
    print("started")
    time.sleep(random.randrange(5, 15))
    sb.check_output(["touch", str(s)])
    print("done")

async def async_task():
    loop = asyncio.get_event_loop()
    print("started")
    # Submit 10 jobs to the 5-worker pool.
    tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
    while True:
        print("in async task")
        done, _ = await asyncio.wait(tasks, timeout=1)
        for task in done:
            await task
        await asyncio.sleep(1)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_task())


if __name__ == "__main__":
    main()

On the surface this sample works fine, but the spawned processes are not stopped after execution completes - I can see all of the Python processes in ps aux | grep python. Shouldn't awaiting a completed task stop it? In the end I do not care much about the result of the execution; it should just happen in the background and exit cleanly, without any hanging processes.

Asked by Timofey Bakunin

1 Answer

You must shut down the ProcessPoolExecutor when you are done using it, either by explicitly calling its shutdown() method or by using it as a context manager. I used the context manager approach.
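
For comparison, here is a minimal sketch of the explicit shutdown() variant (my own illustration, not part of the original answer); the work function is hypothetical. shutdown(wait=True) blocks until all pending work has finished and then terminates the worker processes:

import time
from concurrent.futures import ProcessPoolExecutor

def work(n):
    # Hypothetical blocking job, for illustration only.
    time.sleep(1)
    return n * n

if __name__ == "__main__":
    pool = ProcessPoolExecutor(5)
    futures = [pool.submit(work, i) for i in range(10)]
    results = [f.result() for f in futures]  # block until each job finishes
    # Explicit shutdown: waits for pending work, then ends the worker processes.
    pool.shutdown(wait=True)
    print(results)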

I don't know what subprocess.check_output does, so I commented it out.

I also replaced your infinite loop with a single call to asyncio.gather, which yields to the event loop until all of the submitted tasks have finished.

I'm on Windows, so to observe the creation and deletion of processes I watched the Windows Task Manager. The program creates 5 worker processes and closes them again when the ProcessPoolExecutor context manager exits.

import asyncio
from concurrent.futures import ProcessPoolExecutor
# import subprocess as sb
import time
import random

def long_task(s):
    # Runs in a worker process, so blocking calls are fine here.
    print("started")
    time.sleep(random.randrange(5, 15))
    # sb.check_output(["touch", str(s)])
    print("done", s)

async def async_task():
    loop = asyncio.get_event_loop()  # asyncio.get_running_loop() in modern code
    print("started")
    # On exit, the context manager calls pool.shutdown(wait=True),
    # which terminates the worker processes.
    with ProcessPoolExecutor(5) as pool:
        tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
        # gather yields to the event loop until every task completes.
        await asyncio.gather(*tasks)
    print("Completely done")

def main():
    asyncio.run(async_task())

if __name__ == "__main__":
    main()

Answered by Paul Cornelius
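
Since the question mentions FastAPI: as a follow-up, here is a minimal sketch (my own, not from the answer) of how the same idea could be wired into a FastAPI app, assuming a recent FastAPI with lifespan support. The /run endpoint and the long_task body are hypothetical:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager

from fastapi import FastAPI

def long_task(s):
    # Blocking work runs in a pool worker, not in the event loop.
    time.sleep(5)
    print("done", s)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create the pool once at startup; shut it down (terminating the
    # worker processes) when the application exits.
    app.state.pool = ProcessPoolExecutor(5)
    yield
    app.state.pool.shutdown(wait=True)

app = FastAPI(lifespan=lifespan)

@app.post("/run/{s}")
async def run(s: int):
    loop = asyncio.get_running_loop()
    # Fire-and-forget: schedule the job without blocking the request.
    loop.run_in_executor(app.state.pool, long_task, s)
    return {"status": "scheduled"}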


