Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python multiprocessing - Capturing signals to restart child processes or shut down parent process

I am using the multiprocessing library to spawn two child processes. I would like to ensure that as long as the parent process is alive, if the child processes die (receive a SIGKILL or SIGTERM), that they are restarted automatically. On the other hand, if the parent process receives a SIGTERM/SIGINT, I want it to terminate all child processes then exit.

This is how I approached the problem:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN
from functools import partial
import multiprocessing
import setproctitle

class HelloWorld(multiprocessing.Process):
    def __init__(self):
        super(HelloWorld, self).__init__()

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("helloProcess")

        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()

        self.counter = 1

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("counterProcess")

        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(helloProcess, counterProcess, signum, frame):

    print multiprocessing.active_children()
    print "helloProcess: ", helloProcess
    print "counterProcess: ", counterProcess

    if signum == 17:

        print "helloProcess: ", helloProcess.is_alive()

        if not helloProcess.is_alive():
            print "Restarting helloProcess"

            helloProcess = HelloWorld()
            helloProcess.start()

        print "counterProcess: ", counterProcess.is_alive()

        if not counterProcess.is_alive():
            print "Restarting counterProcess"

            counterProcess = Counter()
            counterProcess.start()

    else:

        if helloProcess.is_alive():
            print "Stopping helloProcess"
            helloProcess.terminate()

        if counterProcess.is_alive():
            print "Stopping counterProcess"
            counterProcess.terminate()

        sys.exit(0)



if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))

    multiprocessing.active_children()

If I send a SIGKILL to the counterProcess, it will restart correctly. However, sending a SIGKILL to the helloProcess also restarts the counterProcess instead of the helloProcess?

If I send a SIGTERM to the parent process, the parent will exit, but the child processes become orphans and continue on. How do I correct this behavior?

like image 990
Python Novice Avatar asked Nov 06 '16 19:11

Python Novice


2 Answers

There are several issues with the code, so I'm going to go over them in sequentailly.

If I send a SIGKILL to the counterProcess, it will restart correctly. However, sending a SIGKILL to the helloProcess also restarts the counterProcess instead of the helloProcess?

This peculiar behavior is most likely due to lack of blocking call in your main process, since multiprocessing.active_children() doesn't really act as one. I can't really explain the exact reason why program behaves the way it does, but adding blocking call in __main__ function, eg.

while True:
    time.sleep(1)

addresses the issue.

Another pretty serious issue is the way you pass objects into handler:

helloProcess = HelloWorld()
...
partial(signal_handler, helloProcess, counterProcess)

which is obsolate, considering you create new objects inside:

if not helloProcess.is_alive():
    print "Restarting helloProcess"

    helloProcess = HelloWorld()
    helloProcess.start()

Note that both objects use different aliases for HelloWorld() objects. The partial object is bound to alias in __main__ function, while the object in callback is bound to its local scope alias. Therefore, by assigning new object to local scope alias you don't really influence the object that callback is bound to (it is still bound to object created in __main__ scope).

You can fix it by rebinding your signal callback with new objects same way in callback scope:

def signal_handler(...):
    ...
    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))
    ...

However, this leads to another trap, because now every child process will inherit the callback from parent and access it each time it receives signals. To fix it, you can temporarily set signal handlers to default right before creating child process:

for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
    signal(signame, SIG_DFL)

Finally, you may want to squelch any signal coming from your child processes before terminating them, otherwise they would trigger callback again:

signal(SIGCHLD, SIG_IGN)

Note that you ma want to redesign architecture of your application and utilise some of the features multiprocessing provides.

Final code:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN, SIG_DFL
from functools import partial
import multiprocessing
#import setproctitle

class HelloWorld(multiprocessing.Process):
    def __init__(self):
        super(HelloWorld, self).__init__()

        # ignore, let parent handle it
        #signal(SIGTERM, SIG_IGN)

    def run(self):

        #setproctitle.setproctitle("helloProcess")

        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()

        self.counter = 1

        # ignore, let parent handle it
        #signal(SIGTERM, SIG_IGN)

    def run(self):

        #setproctitle.setproctitle("counterProcess")

        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(helloProcess, counterProcess, signum, frame):

    print multiprocessing.active_children()
    print "helloProcess: ", helloProcess
    print "counterProcess: ", counterProcess

    print "current_process: ", multiprocessing.current_process()

    if signum == 17:

        # Since each new child inherits current signal handler,
        # temporarily set it to default before spawning new child.
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, SIG_DFL)

        print "helloProcess: ", helloProcess.is_alive()

        if not helloProcess.is_alive():
            print "Restarting helloProcess"

            helloProcess = HelloWorld()
            helloProcess.start()

        print "counterProcess: ", counterProcess.is_alive()

        if not counterProcess.is_alive():
            print "Restarting counterProcess"

            counterProcess = Counter()
            counterProcess.start()

        # After new children are spawned, revert to old signal handling policy.
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, partial(signal_handler, helloProcess, counterProcess))


    else:

        # Ignore any signal that child communicates before quit   
        signal(SIGCHLD, SIG_IGN) 

        if helloProcess.is_alive():
            print "Stopping helloProcess"
            helloProcess.terminate()

        if counterProcess.is_alive():
            print "Stopping counterProcess"
            counterProcess.terminate()

        sys.exit(0)



if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))

    while True:
        print multiprocessing.active_children()
        time.sleep(1)
like image 179
thorhunter Avatar answered Oct 22 '22 11:10

thorhunter


To recreate dead children from signal.SIGCHLD handler, the mother must call one of os.wait functions, because Process.is_alive doesn't work here.
Though possible, it's complicated, because signal.SIGCHLD is delivered to mother when one of it's children status changes f.e. signal.SIGSTOP, signal.SIGCONT or any other terminating signals are received by the child.
So the signal.SIGCHLD handler must differentiate between theses states of the child. Just merely recreating children when signal.SIGCHLD delivered may create more children than necessary.

The following code uses os.waitpid with os.WNOHANG to make it non-blocking and os.WUNTRACED and os.WCONTINUED for learning if signal.SIGCHLD is from signal.SIGSTOP or signal.SIGCONT.
os.waitpid doesn't work, i.e. returns (0, 0) if any of the Process instance is printed, i.e str(Process()) before you call os.waitpid.

import sys
import time
from signal import signal, pause, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_DFL
import multiprocessing
import os

class HelloWorld(multiprocessing.Process):
    def run(self):
        # reset SIGTERM to default for Process.terminate to work
        signal(SIGTERM, SIG_DFL)
        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()
        self.counter = 1

    def run(self):
        # reset SIGTERM to default for Process.terminate to work
        signal(SIGTERM, SIG_DFL)
        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(signum, _):
    global helloProcess, counterProcess

    if signum == SIGCHLD:
        pid, status = os.waitpid(-1, os.WNOHANG|os.WUNTRACED|os.WCONTINUED)
        if os.WIFCONTINUED(status) or os.WIFSTOPPED(status):
            return
        if os.WIFSIGNALED(status) or os.WIFEXITED(status):
            if helloProcess.pid == pid:
                print("Restarting helloProcess")
                helloProcess = HelloWorld()
                helloProcess.start()

            elif counterProcess.pid == pid:
                print("Restarting counterProcess")
                counterProcess = Counter()
                counterProcess.start()

    else:
        # mother shouldn't be notified when it terminates children
        signal(SIGCHLD, SIG_DFL)
        if helloProcess.is_alive():
            print("Stopping helloProcess")
            helloProcess.terminate()

        if counterProcess.is_alive():
            print("Stopping counterProcess")
            counterProcess.terminate()

        sys.exit(0)

if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, signal_handler)

    while True:
        pause()

The following code recreates dead children without using signal.SIGCHLD. So it's simpler than the former.
Having created two children, mother process sets a signal handler named term_child for SIGINT, SIGTERM, SIGQUIT. term_child terminates and joins each child upon invocation.

The mother process keeps checking if children are alive, and recreates them if necessary in the while loop.

Because every child inherits signal handlers from mother, the SIGINT handler should be reset to its default value for Process.terminate to work

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT
import multiprocessing

class HelloWorld(multiprocessing.Process):    
    def run(self):
        signal(SIGTERM, SIG_DFL)
        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()
        self.counter = 1

    def run(self):
        signal(SIGTERM, SIG_DFL)
        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1

def term_child(_, __):
    for child in children:
        child.terminate()
        child.join()
    sys.exit(0)

if __name__ == '__main__':

    children = [HelloWorld(), Counter()]
    for child in children:
        child.start()

    for signame in (SIGINT, SIGTERM, SIGQUIT):
        signal(signame, term_child)

    while True:
        for i, child in enumerate(children):
            if not child.is_alive():
                children[i] = type(child)()
                children[i].start()
        time.sleep(1)
like image 38
Nizam Mohamed Avatar answered Oct 22 '22 10:10

Nizam Mohamed