I have some code that needs to run against several other systems that may hang or have problems not under my control. I would like to use python's multiprocessing to spawn child processes to run independent of the main program and then when they hang or have problems terminate them, but I am not sure of the best way to go about this.
When terminate is called it does kill the child process, but the child then becomes a defunct zombie that is not released until the process object is gone. The example code below, where the loop never ends, works to kill the child and allows a respawn when run() is called again, but it does not seem like a good way of going about this (i.e. multiprocessing.Process() would be better in __init__()).
Anyone have a suggestion?
import multiprocessing
import time

class Process(object):
    def __init__(self):
        self.thing = Thing()
        self.running_flag = multiprocessing.Value("i", 1)

    def run(self):
        self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
        self.process.start()
        print self.process.pid

    def pause_resume(self):
        self.running_flag.value = not self.running_flag.value

    def terminate(self):
        # kills the child, but it stays <defunct> until it is joined
        self.process.terminate()

class Thing(object):
    def __init__(self):
        self.count = 1

    def worker(self, running_flag):
        while True:
            if running_flag.value:
                self.do_work()

    def do_work(self):
        print "working {0} ...".format(self.count)
        self.count += 1
        time.sleep(1)
You can stop a process immediately with its terminate() method. Here it is used to stop a child process, created from a target function, before it finishes executing. On Unix, terminate() sends SIGTERM; on Windows it calls TerminateProcess(). Exit handlers and finally clauses in the child are not run.
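A process can also be killed by calling Process.kill() (new in Python 3.7), which works like terminate() but sends SIGKILL on Unix. The call only stops the target process, not its child processes, which are simply left orphaned. The method is called on a multiprocessing.Process instance.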
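A minimal sketch of the usual escalation, assuming Python 3.7+ (for kill()) on a Unix-like system: try terminate() first, give the child a short grace period with join(timeout=...), and fall back to kill() only if it is still alive. `stubborn` and `stop_process` are hypothetical names; `stubborn` deliberately ignores SIGTERM so that the kill() branch is actually exercised.

```python
import multiprocessing as mp
import signal
import time


def stubborn(ready):
    # Hypothetical worker that ignores SIGTERM, so terminate() alone cannot stop it
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    ready.set()  # tell the parent the handler is installed
    while True:
        time.sleep(1)


def stop_process(p, grace=1.0):
    """Ask politely with terminate(), then force with kill(); always join()."""
    p.terminate()          # SIGTERM on Unix
    p.join(timeout=grace)  # give the child a chance to exit on its own
    if p.is_alive():
        p.kill()           # SIGKILL on Unix: cannot be caught or ignored
        p.join()           # reap the child so it does not linger as a zombie
    return p.exitcode      # -N if the child was ended by signal N


if __name__ == "__main__":
    ready = mp.Event()
    p = mp.Process(target=stubborn, args=(ready,))
    p.start()
    ready.wait()
    print(stop_process(p, grace=0.5))  # -9 on Linux (-signal.SIGKILL)
```

A child that does not ignore SIGTERM stops at the first step, and its exit code is -signal.SIGTERM instead.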
To stop a Python script running in a terminal, press Ctrl + C (this works on a Mac as well). If you want to pause the process and put it in the background, press Ctrl + Z (at least on Linux). Then, if you want to kill it, run kill %n, where "n" is the job number shown next to “Stopped” when you pressed Ctrl + Z.
The os.kill() function in Python sends a specified signal to the process with a specified process ID.
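On Unix, the same effect as terminate() can be had with os.kill() and the child's pid, which is essentially what Process.terminate() does there. A sketch, assuming a Unix-like system and a hypothetical `sleeper` worker; join() is still needed afterwards to reap the child.

```python
import multiprocessing as mp
import os
import signal
import time


def sleeper():
    # Hypothetical worker standing in for a call that hangs
    time.sleep(1000)


if __name__ == "__main__":
    p = mp.Process(target=sleeper)
    p.start()
    os.kill(p.pid, signal.SIGTERM)  # send SIGTERM to the child by pid
    p.join()                        # reap the child so no zombie remains
    print(p.exitcode)               # -15 on Linux (-signal.SIGTERM)
```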
You might run the child processes as daemons in the background.
process.daemon = True
Any errors and hangs (or an infinite loop) in a daemon process will not affect the main process, and it will only be terminated once the main process exits.
This works for simple problems, but once you have many daemon children that hang, they accumulate and hold on to memory and other resources until the parent exits, without any explicit control over them.
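A minimal sketch of the daemon approach, assuming Python 3.3+ (where Process accepts a `daemon` keyword argument); `hang` is a hypothetical worker standing in for a call that never returns.

```python
import multiprocessing as mp
import time


def hang():
    # Hypothetical worker that never finishes on its own
    time.sleep(1000)


if __name__ == "__main__":
    p = mp.Process(target=hang, daemon=True)  # the daemon flag must be set before start()
    p.start()
    print("daemon child", p.pid, "alive:", p.is_alive())
    # No join() here: at interpreter exit, multiprocessing terminates daemonic
    # children instead of waiting for them, so the script does not hang.
```

Nothing stops or joins the child while the parent is still running, which is the drawback described above: hung daemon children pile up until the parent exits.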
The best way is to set up a Queue through which all the child processes communicate with the parent process, so that we can join them and clean up nicely. Here is some simple code that checks whether a child process is hanging (simulated with time.sleep(1000)) and sends a message to the queue for the main process to act on:
import multiprocessing as mp
import time
import Queue as queue  # Python 2 name; in Python 3 the module is called queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print "working {0} ...".format(count)
            count += 1
            q.put(count)  # heartbeat: shows the worker is still alive
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print "hanging..."
                time.sleep(1000)

def watchdog(q):
    """
    Check the queue for updates and send a kill request
    when the child process hasn't sent anything for too long.
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty:
            print "[WATCHDOG]: Maybe WORKER is slacking"
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()
    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))
    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True
    workr.start()
    print "[MAIN]: starting process P1"
    wdog.start()
    # Poll the queue (the watchdog reads the same queue, so which
    # process receives a given message is not guaranteed)
    while True:
        msg = q.get()
        if msg == "KILL WORKER":
            print "[MAIN]: Terminating slacking WORKER"
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print "[MAIN]: WORKER is a goner"
                workr.join(timeout=1.0)
                print "[MAIN]: Joined WORKER successfully!"
                q.close()
                break  # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()
Without terminating the worker first, an attempt to join() it from the main process would block forever, since the worker never finishes.
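A variation on the same idea, sketched under the assumption that a single reader is enough: the main process itself waits on a dedicated heartbeat queue with a timeout, so no separate watchdog process competes for messages. On silence it terminates and joins the worker, then starts a brand-new Process object, since a Process can only be started once. A fresh queue is created per worker because the multiprocessing docs warn that terminating a process that is using a queue can leave that queue corrupted. `worker`, `supervise`, `hang_after`, `timeout` and `max_restarts` are hypothetical names for illustration.

```python
import multiprocessing as mp
import queue
import time


def worker(heartbeat, hang_after):
    # Hypothetical worker: one heartbeat per unit of work, then it "hangs"
    for i in range(hang_after):
        heartbeat.put(i)
        time.sleep(0.1)
    time.sleep(1000)  # simulate a call that never returns


def supervise(timeout=1.0, max_restarts=1, hang_after=3):
    """Restart the worker whenever it stops sending heartbeats; return the restart count."""
    restarts = 0
    while True:
        heartbeat = mp.Queue()
        p = mp.Process(target=worker, args=(heartbeat, hang_after))
        p.start()
        try:
            while True:
                heartbeat.get(timeout=timeout)  # raises queue.Empty on silence
        except queue.Empty:
            print("[MAIN]: no heartbeat, terminating pid", p.pid)
            p.terminate()
            p.join()  # reap the child: no zombie left behind
            heartbeat.close()
        if restarts == max_restarts:
            return restarts
        restarts += 1


if __name__ == "__main__":
    print(supervise())
```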
The way Python multiprocessing handles processes is a bit confusing.
From the multiprocessing programming guidelines:
Joining zombie processes
On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process’s Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.
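A small sketch of the behaviour the quoted guideline describes, assuming Python 3: once a child has exited, calling multiprocessing.active_children() joins it as a side effect, so it no longer appears in the returned list and its exitcode is set. `quick_task` is a hypothetical placeholder job.

```python
import multiprocessing as mp
import time
from multiprocessing.connection import wait


def quick_task():
    # Hypothetical short-lived job that returns immediately
    return None


if __name__ == "__main__":
    p = mp.Process(target=quick_task)
    p.start()
    wait([p.sentinel])  # blocks until the child has exited, without joining it
    time.sleep(0.1)     # small margin for the OS to finish tearing the child down
    alive = mp.active_children()  # side effect: joins children that have finished
    print(p in alive, p.exitcode)  # False 0
```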
To keep a process from becoming a zombie, call its join() method after you kill it.
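Applied to the question's wrapper class, a minimal sketch (Python 3, Unix; `Supervisor` and `count_forever` are hypothetical names, and the worker is reduced to a plain function) creates a new multiprocessing.Process on every run() call, since a Process object can only be started once, and joins right after terminate() so the child is reaped immediately instead of when the Process object is garbage-collected.

```python
import multiprocessing as mp
import time


def count_forever(running_flag):
    # Hypothetical stand-in for Thing.worker: works only while the flag is set
    count = 1
    while True:
        if running_flag.value:
            count += 1
        time.sleep(0.1)  # avoid busy-spinning while paused


class Supervisor:
    def __init__(self):
        self.running_flag = mp.Value("i", 1)
        self.process = None

    def run(self):
        # A Process can only be start()ed once, so build a fresh one each time
        self.process = mp.Process(target=count_forever, args=(self.running_flag,))
        self.process.start()
        return self.process.pid

    def pause_resume(self):
        self.running_flag.value = 0 if self.running_flag.value else 1

    def terminate(self, grace=1.0):
        if self.process is None:
            return None
        self.process.terminate()
        self.process.join(timeout=grace)  # reaps the child: no <defunct> entry remains
        return self.process.exitcode


if __name__ == "__main__":
    s = Supervisor()
    s.run()
    print(s.terminate())  # -15 on Linux: ended by SIGTERM and already reaped
    s.run()               # respawn with a brand-new Process object
    print(s.terminate())  # -15 again
```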
If you want a simpler way to deal with the hanging calls in your system, you can take a look at pebble.
(I don't have enough reputation points to comment, so here is a full answer.)
@PieOhPah: thank you for this very nice example.
Unfortunately there is just one little flaw that doesn't let the watchdog kill the worker:
if msg == "KILL WATCHDOG":
it should be:
if msg == "KILL WORKER":
So the code becomes (with print updated for Python 3):
import multiprocessing as mp
import time
import queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print("working {0} ...".format(count))
            count += 1
            q.put(count)  # heartbeat: shows the worker is still alive
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print("hanging...")
                time.sleep(1000)

def watchdog(q):
    """
    Check the queue for updates and send a kill request
    when the child process hasn't sent anything for too long.
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty:
            print("[WATCHDOG]: Maybe WORKER is slacking")
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()
    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))
    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True
    workr.start()
    print("[MAIN]: starting process P1")
    wdog.start()
    # Poll the queue
    while True:
        msg = q.get()
        # if msg == "KILL WATCHDOG":
        if msg == "KILL WORKER":
            print("[MAIN]: Terminating slacking WORKER")
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print("[MAIN]: WORKER is a goner")
                workr.join(timeout=1.0)
                print("[MAIN]: Joined WORKER successfully!")
                q.close()
                break  # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()