 

Using asyncio to wait for results from subprocess

My Python script contains a loop that uses subprocess to run commands outside the script. Each subprocess is independent. I listen for the returned message in case there's an error; I can't ignore the result of the subprocess. Here's the script without asyncio (I've replaced my computationally expensive call with sleep):

from subprocess import PIPE  # https://docs.python.org/3/library/subprocess.html
import subprocess

def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep", "2"], stdout=PIPE, stderr=PIPE, timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    for index in range(val):
        print("index = "+str(index))
        go_do_something(index)

# run the script
my_long_func(3) # launch three tasks

I think I could use asyncio to speed up this activity, since the Python script is mostly waiting on the external subprocesses to complete. I think threading or multiprocessing aren't strictly necessary, though either could also yield faster execution. Using a task queue (e.g., Celery) is another option.
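For comparison, here is a thread-pool sketch of the same loop with no asyncio at all, using `concurrent.futures` from the standard library. It keeps the blocking `subprocess.run` call but overlaps the waits across worker threads (the `sleep` stand-in and three tasks are carried over from the example above):

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from subprocess import PIPE

def go_do_something(index: int) -> str:
    # Each worker thread blocks on its own subprocess,
    # so the waits overlap instead of running back-to-back.
    process = subprocess.run(["sleep", "2"], stdout=PIPE, stderr=PIPE, timeout=20)
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for " + str(index))
    return process.stdout.decode("utf-8")

with ThreadPoolExecutor(max_workers=3) as pool:
    # map() preserves input order; total wall time is roughly one sleep, not three
    results = list(pool.map(go_do_something, range(3)))
```

This is the least invasive change: the subprocess-handling code stays synchronous, and only the loop is parallelized.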

I tried implementing the asyncio approach, but am missing something since the following attempt doesn't change the overall execution time:

import asyncio
from subprocess import PIPE  # https://docs.python.org/3/library/subprocess.html
import subprocess


async def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep", "2"], stdout=PIPE, stderr=PIPE, timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    # https://docs.python.org/3/library/asyncio-eventloop.html
    loop = asyncio.get_event_loop()
    tasks = []
    for index in range(val):
        task = go_do_something(index)
        tasks.append(task)
    # https://docs.python.org/3/library/asyncio-task.html
    tasks = asyncio.gather(*tasks)
    loop.run_until_complete(tasks)
    loop.close()
    return

my_long_func(3) # launch three tasks

If I want to monitor the output of each subprocess but not wait while each subprocess runs, can I benefit from asyncio? Or does this situation require something like multiprocessing or Celery?

Ben asked Mar 11 '26


2 Answers

Try executing the commands using asyncio instead of subprocess.

Define a run() function:

import asyncio

async def run(cmd: str):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stderr=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE
    )

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

Then you can run it from synchronous code with asyncio.run():

asyncio.run(run('sleep 2'))

#=>

['sleep 2' exited with 0]

The example was taken from the official asyncio subprocess documentation.
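To run several commands concurrently rather than one at a time, the same pattern can be fanned out with asyncio.gather. A self-contained sketch (the three `sleep 2` commands stand in for the real work, as in the question):

```python
import asyncio

async def run(cmd: str) -> str:
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    print(f'[{cmd!r} exited with {proc.returncode}]')
    return stdout.decode()

async def main():
    # gather() schedules all three subprocesses before awaiting any of them,
    # so the total wall time is roughly one sleep, not three.
    return await asyncio.gather(*(run('sleep 2') for _ in range(3)))

asyncio.run(main())
```

Unlike the blocking `subprocess.run` in the question's attempt, `create_subprocess_shell` and `communicate()` are awaitable, so the event loop can switch between subprocesses while they run.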

ronginat answered Mar 16 '26


@ronginat pointed me to https://asyncio.readthedocs.io/en/latest/subprocess.html, which I was able to adapt to my situation:

import asyncio

async def run_command(*args):
    # Create subprocess
    process = await asyncio.create_subprocess_exec(
        *args,
        # stdout must be a pipe to be accessible as process.stdout
        stdout=asyncio.subprocess.PIPE)
    # Wait for the subprocess to finish; stderr is None since it isn't piped
    stdout, _ = await process.communicate()
    # Return decoded stdout
    return stdout.decode().strip()

async def go_do_something(index: int) -> str:
    print('index =', index)
    res = await run_command('sleep', '2')
    return res

def my_long_func(val: int) -> None:
    task_list = []
    for index in range(val):
        task_list.append(go_do_something(index))
    loop = asyncio.get_event_loop()
    commands = asyncio.gather(*task_list)
    results = loop.run_until_complete(commands)
    print(results)
    loop.close()

my_long_func(3) # launch three tasks

The total time of execution is just over 2 seconds even though there are three sleeps of duration 2 seconds. And I get the stdout from each subprocess.
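The claim about wall time can be checked directly by timing the run with `time.monotonic`. A sketch of that measurement, using the same `sleep 2` commands and `asyncio.run` in place of manual loop management:

```python
import asyncio
import time

async def run_command(*args) -> str:
    process = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE)
    stdout, _ = await process.communicate()
    return stdout.decode().strip()

async def main(val: int):
    # Launch all subprocesses concurrently
    return await asyncio.gather(*(run_command('sleep', '2') for _ in range(val)))

start = time.monotonic()
asyncio.run(main(3))
elapsed = time.monotonic() - start
print(f'elapsed: {elapsed:.1f}s')  # roughly 2 s for three concurrent 2-second sleeps
```

If the three tasks ran sequentially, `elapsed` would be about 6 seconds instead.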

Ben answered Mar 16 '26


