
Clean Python multiprocess termination dependent on an exit flag

I am attempting to create a program using multiple processes, and I would like to cleanly terminate all the spawned processes if errors occur. Below I've written out some pseudo-code for what I think I need to do, but I don't know the best way to communicate to all the processes that an error has occurred and that they should terminate.

I think I should be using classes for this sort of thing but I'm quite new to Python so I'm just trying to get my head around the basics first.

import multiprocessing

exitFlag = True

# Function for each process to run
def url_thread_worker():
    global exitFlag
    # while exitFlag:
    try:
        pass  # do something
    except Exception:
        # we've run into a problem, we need to kill all the spawned
        # processes and cleanly exit the program
        exitFlag = False

def processStarter():
    process_1 = multiprocessing.Process(name="Process-1", target=url_thread_worker, args=())
    process_2 = multiprocessing.Process(name="Process-2", target=url_thread_worker, args=())

    process_1.start()
    process_2.start()


if __name__ == '__main__':
    processStarter()

Thanks in advance

Daniel Pilch asked Nov 14 '13 15:11


2 Answers

Here's my suggestion:

import multiprocessing
import threading
import time

def good_worker():
    print("[GoodWorker] Starting")
    time.sleep(4)
    print("[GoodWorker] all good")

def bad_worker():
    print("[BadWorker] Starting")
    time.sleep(2)
    raise Exception("ups!")

class MyProcManager(object):
    def __init__(self):
        self.procs = []
        self.errors_flag = False
        self._threads = []
        self._lock = threading.Lock()

    def terminate_all(self):
        # Terminate every managed process that is still running.
        with self._lock:
            for p in self.procs:
                if p.is_alive():
                    print("Terminating %s" % p)
                    p.terminate()

    def launch_proc(self, func, args=(), kwargs=None):
        # Each process gets its own wrapper thread that waits for it to end.
        # kwargs defaults to None to avoid a shared mutable default dict.
        t = threading.Thread(target=self._proc_thread_runner,
                             args=(func, args, kwargs or {}))
        self._threads.append(t)
        t.start()

    def _proc_thread_runner(self, func, args, kwargs):
        p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
        # Register and start under the lock so terminate_all() never
        # iterates over self.procs while it is being modified.
        with self._lock:
            self.procs.append(p)
            p.start()
        p.join()
        # exitcode > 0 means the process exited with an error; a process
        # stopped by terminate() has a negative exitcode and is ignored.
        if p.exitcode > 0:
            self.errors_flag = True
            self.terminate_all()

    def wait(self):
        # Block until every wrapper thread (and so every process) is done.
        for t in self._threads:
            t.join()

if __name__ == '__main__':
    proc_manager = MyProcManager()
    proc_manager.launch_proc(good_worker)
    proc_manager.launch_proc(good_worker)
    proc_manager.launch_proc(bad_worker)
    proc_manager.wait()
    if proc_manager.errors_flag:
        print("Errors flag is set: some process crashed")
    else:
        print("Everything closed cleanly")

You need a wrapper thread for each process that waits for it to end. When a process ends, check its exitcode: a value > 0 means it exited with an error, for example because it raised an unhandled exception. In that case, call terminate_all() to close all the remaining active processes. The wrapper threads will finish too, since each one is only waiting on its own process.
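
To see why the exitcode check can tell a crashed process apart from one that was terminated, here is a minimal sketch. The helper name collect_exit_codes and both worker functions are made up for illustration; only Process, terminate(), join() and exitcode come from multiprocessing.

```python
import multiprocessing
import time


def crashing_worker():
    # An unhandled exception makes the child exit with a positive code.
    raise RuntimeError("simulated failure")


def sleeping_worker():
    # Sleeps long enough for the parent to terminate it first.
    time.sleep(30)


def collect_exit_codes():
    """Hypothetical helper: return (crashed exitcode, terminated exitcode)."""
    crashed = multiprocessing.Process(target=crashing_worker)
    sleeper = multiprocessing.Process(target=sleeping_worker)
    crashed.start()
    sleeper.start()

    crashed.join()       # wait for the crash to happen
    sleeper.terminate()  # stop the healthy process from the parent
    sleeper.join()
    return crashed.exitcode, sleeper.exitcode


if __name__ == "__main__":
    crashed_code, terminated_code = collect_exit_codes()
    print("crashed worker exitcode: %r" % crashed_code)
    print("terminated worker exitcode: %r" % terminated_code)
```

The crashed worker reports a positive exitcode, while the terminated one reports a negative value (the negated signal number), so a check like exitcode > 0 does not fire again for processes that were stopped on purpose.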

Also, you're completely free to call proc_manager.terminate_all() from your own code whenever you want, for example from a different thread that checks some flags.

Hope it's good for your case.

PS: By the way, in your original code you used something like a global exit_flag. You can never have a "global" exit_flag in multiprocessing, because it simply isn't global: you are using separate processes with separate memory spaces. That only works in threaded environments, where state can be shared. If you need it in multiprocessing, you must use explicit communication between processes (Pipe and Queue accomplish that) or something like shared memory objects.
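
A small sketch of that difference, assuming multiprocessing.Value as the shared memory object. The names plain_flag, clear_flags and run_flag_demo are invented for this example.

```python
import multiprocessing

plain_flag = True  # ordinary module-level global; each process has its own copy


def clear_flags(shared_flag):
    global plain_flag
    plain_flag = False     # only changes the child's private copy
    shared_flag.value = 0  # written to shared memory, visible to the parent


def run_flag_demo():
    """Hypothetical helper: return both flags as the parent sees them."""
    shared_flag = multiprocessing.Value("i", 1)  # shared C int, initially 1
    child = multiprocessing.Process(target=clear_flags, args=(shared_flag,))
    child.start()
    child.join()
    return plain_flag, shared_flag.value


if __name__ == "__main__":
    print(run_flag_demo())  # prints (True, 0)
```

The parent still sees plain_flag as True after the child has finished, but it does see the change made through the shared Value.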

abranches answered Oct 10 '22 13:10


If you want your child processes to be terminated automatically when the parent process exits, you could make them daemonic (set .daemon = True before .start()). That is, if the parent detects an error, it may just quit and the children will be taken care of.
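
As a rough sketch of the daemonic approach (the worker function and the start_daemon_workers helper are hypothetical, and the half-second sleep just stands in for the parent doing work before it hits an error):

```python
import multiprocessing
import time


def worker():
    # Runs forever; it relies on the parent's exit to be stopped.
    while True:
        time.sleep(0.1)


def start_daemon_workers(count):
    """Hypothetical helper: start `count` daemonic worker processes."""
    procs = []
    for i in range(count):
        p = multiprocessing.Process(target=worker, name="Worker-%d" % i)
        p.daemon = True  # must be set before start()
        p.start()
        procs.append(p)
    return procs


if __name__ == "__main__":
    procs = start_daemon_workers(2)
    time.sleep(0.5)
    print("main detected an error and is exiting")
    # No join() here: when the main process exits normally, it terminates
    # its daemonic children, so the script ends instead of hanging.
```

Keep in mind that daemonic children are simply terminated, so they get no chance to run their own cleanup code.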

If you want the children to clean up after themselves, you could use multiprocessing.Event() as a global flag:

import multiprocessing
import time

def event_func(event):
    print('\t%r is waiting' % multiprocessing.current_process())
    event.wait()  # blocks until the parent calls event.set()
    print('\t%r has woken up' % multiprocessing.current_process())

if __name__ == '__main__':
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    for p in processes:
        p.join()
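
Applied to the original question, the same Event could serve as the exit flag that each worker polls in its loop. The following is only a sketch under assumptions: the worker, its fail_after parameter, the results queue and run_workers are all made up here, and time.sleep stands in for real work.

```python
import multiprocessing
import time


def worker(name, stop_event, results, fail_after=None):
    """Loop until stop_event is set; optionally fail after a few iterations."""
    iterations = 0
    try:
        while not stop_event.is_set():
            if fail_after is not None and iterations >= fail_after:
                raise RuntimeError("simulated failure in %s" % name)
            time.sleep(0.05)  # stand-in for one unit of real work
            iterations += 1
    except Exception:
        stop_event.set()  # tell every other worker to stop
        results.put((name, "failed"))
    else:
        # The loop ended because the flag was set; cleanup code goes here.
        results.put((name, "stopped cleanly"))


def run_workers():
    """Hypothetical helper: run two good workers and one failing worker."""
    stop_event = multiprocessing.Event()
    results = multiprocessing.Queue()
    procs = [
        multiprocessing.Process(target=worker, args=("good-1", stop_event, results)),
        multiprocessing.Process(target=worker, args=("good-2", stop_event, results)),
        multiprocessing.Process(target=worker, args=("bad", stop_event, results, 3)),
    ]
    for p in procs:
        p.start()
    # Read one result per worker before joining, so the queue is drained.
    outcomes = dict(results.get(timeout=10) for _ in procs)
    for p in procs:
        p.join()
    return outcomes


if __name__ == "__main__":
    print(run_workers())
```

Here nobody calls terminate(): the failing worker sets the event, the others notice it on their next loop iteration and exit on their own, which leaves them room to clean up first.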

jfs answered Oct 10 '22 13:10