My Python script contains a loop that uses subprocess to run commands outside the script. Each subprocess is independent. I listen for the returned message in case there's an error; I can't ignore the result of the subprocess. Here's the script without asyncio (I've replaced my computationally expensive call with sleep):
from subprocess import PIPE # https://docs.python.org/3/library/subprocess.html
import subprocess

def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    for index in range(val):
        print("index = "+str(index))
        go_do_something(index)

# run the script
my_long_func(3) # launch three tasks
I think I could use asyncio to speed up this activity since the Python script is waiting on the external subprocess to complete. I think threading or multiprocessing are not necessary, though they could also result in faster execution. Using a task queue (e.g., Celery) is another option.
I tried implementing the asyncio approach, but am missing something since the following attempt doesn't change the overall execution time:
import asyncio
from subprocess import PIPE # https://docs.python.org/3/library/subprocess.html
import subprocess

async def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    # https://docs.python.org/3/library/asyncio-eventloop.html
    loop = asyncio.get_event_loop()
    tasks = []
    for index in range(val):
        task = go_do_something(index)
        tasks.append(task)
    # https://docs.python.org/3/library/asyncio-task.html
    tasks = asyncio.gather(*tasks)
    loop.run_until_complete(tasks)
    loop.close()
    return

my_long_func(3) # launch three tasks
If I want to monitor the output of each subprocess but not wait while each subprocess runs, can I benefit from asyncio? Or does this situation require something like multiprocessing or Celery?
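Declaring a function async does not make subprocess.run non-blocking: the call still blocks the event loop until the command finishes, so the tasks run one after another. Try launching the commands through asyncio's own subprocess API instead of the subprocess module.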
Define a run() function:
import asyncio

async def run(cmd: str):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stderr=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')
Then you may call it as you would call any async function:
asyncio.run(run('sleep 2'))
#=>
['sleep 2' exited with 0]
The example is taken from the official asyncio subprocess documentation.
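The call above runs a single command. As a minimal sketch of how the same idea might extend to several independent commands, the coroutines can be handed to asyncio.gather so that all subprocesses are started before any of them is awaited. The run_all() helper and the (returncode, stdout, stderr) return shape are my own illustration, not part of the documentation example:

```python
import asyncio
import time


async def run(cmd: str) -> tuple[int, str, str]:
    """Run one shell command and return (returncode, stdout, stderr)."""
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()


async def run_all(cmds: list[str]) -> list[tuple[int, str, str]]:
    """Hypothetical helper: start every command, then wait for all of them."""
    return await asyncio.gather(*(run(cmd) for cmd in cmds))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(run_all(["sleep 2"] * 3))
    elapsed = time.perf_counter() - start
    for returncode, _stdout, stderr in results:
        # Same check as in the question: look for "error" on stderr.
        if returncode != 0 or "error" in stderr:
            print("command failed:", stderr)
    print(f"3 commands finished in {elapsed:.1f}s")
```

Because the event loop is free while each child process runs, the total time should be close to that of the slowest command rather than the sum of all of them.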
@ronginat pointed me to https://asyncio.readthedocs.io/en/latest/subprocess.html, which I was able to adapt to my situation:
import asyncio

async def run_command(*args) -> str:
    # Create subprocess
    process = await asyncio.create_subprocess_exec(
        *args,
        # stdout must be a pipe to be accessible as process.stdout
        stdout=asyncio.subprocess.PIPE)
    # Wait for the subprocess to finish
    # (stderr is not piped here, so the second value is None)
    stdout, stderr = await process.communicate()
    # Return stdout
    return stdout.decode().strip()

async def go_do_something(index: int) -> str:
    print('index=',index)
    res = await run_command('sleep','2')
    return res

def my_long_func(val: int) -> None:
    task_list = []
    for indx in range(val):
        task_list.append( go_do_something(indx) )
    loop = asyncio.get_event_loop()
    commands = asyncio.gather(*task_list)
    reslt = loop.run_until_complete(commands)
    print(reslt)
    loop.close()

my_long_func(3) # launch three tasks
The total time of execution is just over 2 seconds even though there are three sleeps of duration 2 seconds. And I get the stdout from each subprocess.
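The original synchronous script also checked stderr and used timeout=20, which the adaptation above does not carry over. A possible sketch of adding both back, assuming asyncio.wait_for is an acceptable stand-in for the timeout argument of subprocess.run and using asyncio.run in place of the manual event loop handling; the tuple return value and the bool result of go_do_something are my own choices for illustration:

```python
import asyncio


async def run_command(*args: str, timeout: float = 20.0) -> tuple[int, str, str]:
    """Run one command; return (returncode, stdout, stderr) as text."""
    process = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,  # capture stderr as well as stdout
    )
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
    except asyncio.TimeoutError:
        process.kill()        # do not leave the child running
        await process.wait()  # reap it before re-raising
        raise
    return process.returncode, stdout.decode(), stderr.decode()


async def go_do_something(index: int) -> bool:
    """Return True if the command for this index ran without an error."""
    try:
        returncode, _stdout, stderr = await run_command("sleep", "2")
    except asyncio.TimeoutError:
        print(f"timeout for {index}")
        return False
    if returncode != 0 or "error" in stderr:
        print(f"error for {index}")
        return False
    return True


async def my_long_func(val: int) -> list[bool]:
    # Start all commands at once and collect one result per index.
    return await asyncio.gather(*(go_do_something(i) for i in range(val)))


if __name__ == "__main__":
    print(asyncio.run(my_long_func(3)))  # launch three tasks
```

Handling the timeout inside go_do_something keeps one slow command from discarding the results of the others.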