I have a problem with spawning asynchronous subprocesses with timeout in Python 3.
What I want to achieve: I want to spawn multiple processes asynchronously, without waiting for their results, but I also want to be assured that every spawned process ends within a given timeout.
I have found similar questions here: Using module 'subprocess' with timeout and Asynchronous background processes in Python?, but they do not solve my issue.
My code looks like this. I have a Command class as suggested in Using module 'subprocess' with timeout:
import shlex
import subprocess
import threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print('Thread started')
            args = shlex.split(self.cmd)
            self.process = subprocess.Popen(args, shell=True)
            self.process.communicate()
            print('Thread finished')

        thread = threading.Thread(target=target)
        thread.start()
        thread.join(timeout)
        if thread.is_alive():
            print('Terminating process')
            self.process.terminate()
            thread.join()
and then when I want to spawn subprocesses:
for system in systems:
    for service in to_spawn_system_info:
        command_str = "cd {0} && python proc_ip.py {1} {2} 0 2>>{3}".format(
            home_dir, service, system, service_log_dir)
        command = Command(command_str)
        command.run(timeout=60)
When I run this, the script seems to wait for each command to spawn and finish before starting the next one. I get:
Thread started
Thread finished
Thread started
Thread finished
Thread started
Thread finished
Thread started
Thread finished
So my question is: what am I doing wrong? I am starting to wonder whether it is even possible to spawn a process and limit its execution time with a timeout.
Why do I need this? The spawner script will run from cron. It will be executed every 10 minutes and has to spawn about 20 subprocesses. I want to guarantee that every subprocess ends before the script is run again by cron.
In Python 3, subprocess.run() accepts a timeout argument, which is passed on to Popen.communicate(). If the timeout expires, the child process is killed and waited for, and the TimeoutExpired exception is re-raised after the child has terminated. With a bare Popen.communicate(timeout=...) call, by contrast, the child is not killed automatically: you have to catch TimeoutExpired, kill the process yourself, and call communicate() again to reap it. (There is also the asyncio subprocess API, whose Process.wait() is a coroutine rather than a blocking call, but note that asyncio's Process class is not thread-safe; see the "Concurrency and multithreading" section of the asyncio docs.)
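The Popen.communicate(timeout=...) pattern described above can be sketched like this (a Python sleep stands in for the real proc_ip.py command; this is illustrative, not the asker's actual workload):

```python
import subprocess
import sys

# Spawn a child that would run longer than our timeout.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
try:
    proc.communicate(timeout=2)   # raises TimeoutExpired after 2 seconds
except subprocess.TimeoutExpired:
    proc.kill()                   # the child is NOT killed automatically
    proc.communicate()            # reap it so no zombie is left behind
    print('child timed out and was killed')
```

Note that communicate() still blocks the calling thread for up to the full timeout, so on its own this does not make the spawning asynchronous.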
As mentioned previously, the call to process.communicate() is making your code wait for the completion of the subprocess. However, if you just remove the communicate() call, the thread will exit immediately after spawning the process, causing your thread.join() call to exit too soon, and you'll kill off the subprocess prematurely. To do what you want without polling or busy waiting, you can set a timer that will kill the process (and runner thread) after a timeout if the process has not yet finished:
import shlex
import subprocess
import threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print('Thread started')
            # May want/need to skip the shlex.split() when using shell=True
            # See Popen() constructor docs on 'shell' argument for more detail.
            args = shlex.split(self.cmd)
            self.process = subprocess.Popen(args, shell=True)
            self.timer.start()
            self.process.wait()
            self.timer.cancel()

        def timer_callback():
            print('Terminating process (timed out)')
            self.process.terminate()

        thread = threading.Thread(target=target)
        self.timer = threading.Timer(timeout, timer_callback)
        thread.start()
Use threads that start and end independently of one another. This method would be useful if you knew all the commands you wanted to run ahead of time. Here is an example...
from threading import Thread
import subprocess
import queue            # named 'Queue' in Python 2
import multiprocessing

class Command(object):
    def __init__(self, cmds):
        self.cmds = cmds

    def run_cmds(self):
        cmd_queue = queue.Queue()
        for cmd in self.cmds:
            cmd_queue.put(cmd)

        available_threads = multiprocessing.cpu_count()
        for x in range(available_threads):
            t = Thread(target=self.run_cmd, args=(cmd_queue,))
            t.daemon = True
            t.start()

        cmd_queue.join()

    def run_cmd(self, cmd_queue):
        while True:
            try:
                # a blocking get() would hang forever once the queue is empty
                cmd = cmd_queue.get(block=False)
            except queue.Empty:
                break
            print('Thread started')
            process = subprocess.Popen(cmd, shell=True)
            process.communicate()
            print('Thread finished')
            cmd_queue.task_done()

# create the list of commands you want to run
cmds = ['cd /home/nater/Desktop'] * 5
# create the class
c = Command(cmds)
# run them...
c.run_cmds()
This would print....
Thread started
Thread started
Thread started
Thread startedThread finished
Thread started
Thread finishedThread finished
Thread finished
Thread finished
As you can see from the output, the subprocesses start and end independently of one another, and no subprocess waits for another to finish, because they are all run in different threads. Naturally, you could add timeouts and whatever else you want; this is just a simple example. It assumes you know all the commands you want to run ahead of time. If you want to add a per-thread timeout, see epicbrew's answer; you could incorporate his timer approach into this example if you wanted to.
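Folding a per-command timeout into the queue-based worker might look roughly like this (Python 3; short Python sleeps stand in for real commands, and the worker function, command list, and timeout value are illustrative names, not from the original answer):

```python
import queue
import subprocess
import sys
import threading

def worker(cmd_queue, timeout):
    """Drain the queue, giving each command at most `timeout` seconds."""
    while True:
        try:
            cmd = cmd_queue.get(block=False)
        except queue.Empty:
            break
        proc = subprocess.Popen(cmd)
        try:
            proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()          # communicate() does not kill on timeout
            proc.communicate()   # reap the killed child
        cmd_queue.task_done()

# Four quick stand-in commands, processed by two worker threads.
cmds = [[sys.executable, "-c", "import time; time.sleep(0.1)"]] * 4
cmd_queue = queue.Queue()
for cmd in cmds:
    cmd_queue.put(cmd)

threads = [threading.Thread(target=worker, args=(cmd_queue, 5)) for _ in range(2)]
for t in threads:
    t.start()
cmd_queue.join()   # returns once every command has finished or timed out
```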