I am writing code to run experiments in parallel. I don't have control over what the experiments do; they might use subprocess.Popen or check_output to run one or more additional child processes.
I have two conditions: I want to be able to kill experiments that exceed a timeout, and I want to kill experiments upon KeyboardInterrupt.
Most ways of terminating a process don't ensure that all of its subprocesses are killed as well. This is obviously a problem if hundreds of experiments are run one after the other and each spawns child processes that stay alive after the timeout occurred and the experiment was supposedly killed.
The way I am dealing with this now is to include code that stores experiment configurations in a database, generate code that loads and runs experiments from the command line, and then call these commands via subprocess.Popen(cmd, shell=True, start_new_session=True), killing them with os.killpg on timeout.
My main question then is: calling these experiments via the command line feels cumbersome, so is there a way to call code directly via multiprocessing.Process(target=fn) and achieve the same effect as start_new_session=True + os.killpg upon timeout and KeyboardInterrupt?
<file1>
import sys

def run_exp(config):
    <do work>
    return result

if __name__ == "__main__":
    save_exp(run_exp(load_config(sys.argv)))
<file2>
import atexit
import os
import signal
import subprocess
from multiprocessing import Process

def monitor(queue):
    active = set()  # active process ids
    while True:
        msg = queue.get()
        if msg == "sentinel":
            <loop over active ids and kill them with os.killpg>
        else:
            <add or remove id from active set>

def worker(args):
    id, queue = args
    command = f"python <file1> {id}"
    with subprocess.Popen(command, shell=True, ..., start_new_session=True) as process:
        try:
            queue.put(f"start {process.pid}")
            process.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            os.killpg(process.pid, signal.SIGINT)  # send signal to the process group
            process.communicate()
        finally:
            queue.put(f"done {process.pid}")

def main():
    <save configs => c_ids>
    queue = manager.Queue()
    process = Process(target=monitor, args=(queue,))
    process.start()

    def clean_exit():
        queue.put("sentinel")
        <terminate pool and monitor process>

    r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
    atexit.register(clean_exit)
    r.wait()
    <terminate pool and monitor process>
I posted a skeleton of the code that details the approach of starting processes via the command line and killing them. An additional complication of this version of my approach is that when the KeyboardInterrupt arrives, the queue has already been terminated (for lack of a better word) and communicating with the monitor process is impossible (the sentinel message never arrives). Instead I have to resort to writing process ids to a file and reading the file back in the main process to kill the still-running processes. If you know a way to work around this queue issue, I'd be eager to learn about it.
The killall command kills processes by name. By default it sends a TERM signal, and a single call can kill multiple processes: if more than one process is running with that name, all of them will be killed.
If it is a process group you want to kill, just use the kill(1) command, but instead of giving it a process number, give it the negation of the group number. For example, to kill every process in group 5112, use kill -TERM -- -5112.
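From Python, the same thing can be done with os.killpg. Below is a minimal sketch, assuming a POSIX system with sh and sleep available. The helper names kill_group and demo are made up for illustration and are not part of the question's code.

```python
import os
import signal
import subprocess


def kill_group(pgid: int, sig: int = signal.SIGTERM) -> None:
    """Send `sig` to every process in group `pgid` (like `kill -TERM -- -pgid`)."""
    try:
        os.killpg(pgid, sig)
    except ProcessLookupError:
        pass  # the group no longer exists


def demo() -> int:
    """Start a shell with a background child in a new group, then kill the group."""
    # start_new_session=True puts the shell (and everything it spawns)
    # into a fresh session and process group whose id is the shell's pid.
    proc = subprocess.Popen(
        ["sh", "-c", "sleep 30 & sleep 30"],
        start_new_session=True,
    )
    kill_group(proc.pid)
    # A negative return code means "terminated by that signal number".
    return proc.wait(timeout=10)


if __name__ == "__main__":
    print(demo())
```

Because the signal goes to the group rather than to one pid, the background sleep started by the shell receives it as well.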
I think the problem is that you are storing the subprocess pid, whereas to kill the group you need the id of its process group, and that you used signal.SIGINT where I think it should be signal.SIGTERM. Try this. Instead of this line:
os.killpg(process.pid, signal.SIGINT)
use this line:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
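One thing worth checking before swapping the call: as far as I understand, a child started with start_new_session=True becomes the leader of a new session and process group, so its group id should equal its pid, and both lines would then target the same group (only the signal differs). A quick way to check this, assuming a POSIX system with a sleep binary; group_ids is a hypothetical helper name:

```python
import os
import subprocess


def group_ids(start_new_session: bool) -> tuple[int, int]:
    """Return (pid, pgid) of a short-lived child process."""
    with subprocess.Popen(
        ["sleep", "5"], start_new_session=start_new_session
    ) as proc:
        try:
            return proc.pid, os.getpgid(proc.pid)
        finally:
            proc.kill()  # do not wait for the sleep to finish


if __name__ == "__main__":
    pid, pgid = group_ids(start_new_session=True)
    print("new session: pid == pgid ->", pid == pgid)
    pid, pgid = group_ids(start_new_session=False)
    print("same session: pgid == parent's pgid ->", pgid == os.getpgid(0))
```

Without start_new_session=True the child stays in the parent's group, so os.killpg on that group would also signal the parent.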
I think one way to avoid this is to use a try/except block.
If the KeyboardInterrupt arrives in main(), you could try this:
def main():
    try:
        <save configs => c_ids>
        queue = manager.Queue()
        process = Process(target=monitor, args=(queue,))
        process.start()

        def clean_exit():
            queue.put("sentinel")
            <terminate pool and monitor process>

        r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
        atexit.register(clean_exit)
        r.wait()
        <terminate pool and monitor process>
    except KeyboardInterrupt as e:
        pass
        # put the code that should keep running after the interrupt here
Hope this helps.
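On the main question (getting the start_new_session=True + os.killpg behaviour with multiprocessing.Process), one approach that might work is to have the child call os.setsid() itself before it runs the experiment, and to kill the group from the parent in a finally block so that it also runs on KeyboardInterrupt. This is only a sketch under assumptions: POSIX, the fork start method, and the names run_with_timeout, _in_new_session, _kill_group and slow_experiment are invented for illustration.

```python
import multiprocessing as mp
import os
import signal
import subprocess


def _in_new_session(fn, args):
    """Child entry point: start a new session, then run the experiment."""
    os.setsid()  # child becomes leader of a new session and process group
    fn(*args)


def _kill_group(proc):
    """Kill proc's process group, falling back to proc itself."""
    try:
        os.killpg(proc.pid, signal.SIGKILL)  # pgid == pid after os.setsid()
    except ProcessLookupError:
        # No such group: everything has exited already, or the child
        # had not reached os.setsid() yet.
        if proc.is_alive():
            proc.kill()
    proc.join()


def run_with_timeout(fn, args=(), timeout=None):
    """Run fn(*args) in a child process.

    Returns True if it finished within `timeout` seconds. The child's
    process group is killed on timeout, on KeyboardInterrupt, and after a
    normal finish (to catch leftover grandchildren).
    """
    ctx = mp.get_context("fork")  # assumption: POSIX with fork available
    proc = ctx.Process(target=_in_new_session, args=(fn, args))
    proc.start()
    try:
        proc.join(timeout)
        return not proc.is_alive()
    finally:
        _kill_group(proc)


def slow_experiment():
    """Hypothetical experiment that spawns its own child process."""
    subprocess.run(["sleep", "30"])


if __name__ == "__main__":
    print(run_with_timeout(slow_experiment, timeout=1.0))
```

Two caveats: os.setsid() also detaches the child from the controlling terminal, so Ctrl-C only reaches the parent, which is why the cleanup lives in the parent's finally block. And killing the group after a normal finish relies on the pid not having been reused in the meantime, which is unlikely but not impossible.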