 

Robust way to manage and kill any process

I am writing code to run experiments in parallel. I don't have control over what the experiments do; they might use subprocess.Popen or check_output to run one or more additional child processes.

I have two requirements: I want to be able to kill experiments that exceed a timeout, and I want to kill experiments upon KeyboardInterrupt.

Most ways of terminating a process don't make sure that all of its subprocesses are killed as well. This is obviously a problem when hundreds of experiments run one after another and each spawns child processes that stay alive after the timeout fires and the experiment is supposedly killed.

The way I am dealing with this now is to store experiment configurations in a database, generate code that loads and runs experiments from the command line, call these commands via subprocess.Popen(cmd, shell=True, start_new_session=True), and kill them using os.killpg on timeout.
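Stripped of the database bookkeeping, the core of that pattern looks roughly like this (a minimal sketch; the command, timeout, and function name are placeholders, and it assumes a POSIX system):

```python
import os
import signal
import subprocess

def run_with_timeout(cmd, timeout):
    # start_new_session=True makes the child a session (and process group)
    # leader, so killing its group also takes down any grandchildren
    proc = subprocess.Popen(cmd, shell=True, start_new_session=True)
    try:
        proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # signal the whole group, not just the direct child
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        proc.communicate()
    return proc.returncode
```

A fast command returns its normal exit code, while one that overruns the timeout comes back with a negative return code (killed by signal).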

My main question then is: calling these experiments via the command line feels cumbersome, so is there a way to run the code directly via multiprocessing.Process(target=fn) while achieving the same effect as start_new_session=True + os.killpg upon timeout and KeyboardInterrupt?

<file1>
def run_exp(config):
    do work
    return result

if __name__ == "__main__":
    save_exp(run_exp(load_config(sys.argv)))

<file2>
def monitor(queue):
    active = set()  # active process ids
    while True:
        msg = queue.get()
        if msg == "sentinel":
            <loop over active ids and kill them with os.killpg>
        else:
            <add or remove id from active set>


def worker(args):
    id, queue = args
    command = f"python <file1> {id}"
    with subprocess.Popen(command, shell=True, ..., start_new_session=True) as process:
        try:
            queue.put(f"start {process.pid}")
            process.communicate(timeout=timeout)
        except TimeoutExpired:
            os.killpg(process.pid, signal.SIGINT)  # send signal to the process group
            process.communicate()
        finally:
            queue.put(f"done {process.pid}")

def main():
    <save configs => c_ids>
    queue = manager.Queue()
    process = Process(target=monitor, args=(queue,))
    process.start()

    def clean_exit():
        queue.put("sentinel")
        <terminate pool and monitor process>

    r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
    atexit.register(clean_exit)
    r.wait()
    <terminate pool and monitor process>

I posted a skeleton of the code that details the approach of starting processes via the command line and killing them. An additional complication of that version of my approach is that when the KeyboardInterrupt arrives, the queue is already torn down (for lack of a better word) and communicating with the monitor process is impossible (the sentinel message never arrives). Instead I have to resort to writing process ids to a file and reading the file back in the main process to kill the still-running processes. If you know a way to work around this queue issue, I'd be eager to learn about it.

Samuel asked Feb 04 '19



2 Answers

I think the problem is that you are passing the subprocess pid directly, while os.killpg needs the process group id, and that you used signal.SIGINT where signal.SIGTERM is more appropriate. Try this: instead of this line:

os.killpg(process.pid, signal.SIGINT)

use this line:

os.killpg(os.getpgid(process.pid), signal.SIGTERM) 
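As a quick sanity check of that fix (assuming a POSIX system): with start_new_session=True the child becomes its own session and group leader, so os.getpgid(process.pid) equals process.pid, and killpg with SIGTERM terminates the whole group.

```python
import os
import signal
import subprocess
import time

proc = subprocess.Popen("sleep 60", shell=True, start_new_session=True)
time.sleep(0.2)  # give the child a moment to start
pgid = os.getpgid(proc.pid)
print(pgid == proc.pid)  # True: a session leader leads its own group
os.killpg(pgid, signal.SIGTERM)
proc.wait()
print(proc.returncode)   # -15, i.e. terminated by SIGTERM
```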
Siyavash vaez afshar answered Oct 19 '22


One way to handle this is with a try/except block.
If the KeyboardInterrupt arrives in main(), you could try this:

def main():
    try:
        <save configs => c_ids>
        queue = manager.Queue()
        process = Process(target=monitor, args=(queue,))
        process.start()

        def clean_exit():
            queue.put("sentinel")
            <terminate pool and monitor process>

        r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
        atexit.register(clean_exit)
        r.wait()
        <terminate pool and monitor process>
    except KeyboardInterrupt:
        # clean up here, or continue only the processes you want to keep
        pass

Hopefully this will be helpful.

Amazing Things Around You answered Oct 19 '22