Signal handling in multi-threaded Python

This should be very simple, and I'm surprised that I haven't been able to find this question already answered on Stack Overflow.

I have a daemon-like program that needs to respond to the SIGTERM and SIGINT signals in order to work well with Upstart. I read that the best way to do this is to run the program's main loop in a thread separate from the main thread and let the main thread handle the signals. When a signal is received, the signal handler should tell the main loop to exit by setting a sentinel flag that the main loop checks routinely.

I've tried doing this but it is not working the way I expected. See the code below:

    from threading import Thread
    import signal
    import time
    import sys

    stop_requested = False

    def sig_handler(signum, frame):
        sys.stdout.write("handling signal: %s\n" % signum)
        sys.stdout.flush()

        global stop_requested
        stop_requested = True

    def run():
        sys.stdout.write("run started\n")
        sys.stdout.flush()
        while not stop_requested:
            time.sleep(2)

        sys.stdout.write("run exited\n")
        sys.stdout.flush()

    signal.signal(signal.SIGTERM, sig_handler)
    signal.signal(signal.SIGINT, sig_handler)

    t = Thread(target=run)
    t.start()
    t.join()
    sys.stdout.write("join completed\n")
    sys.stdout.flush()

I tested this in the following two ways:

1)

    $ python main.py > output.txt &
    [2] 3204
    $ kill -15 3204

2)

    $ python main.py
    ctrl+c

In both cases I expect this written to the output:

    run started
    handling signal: 15
    run exited
    join completed

In the first case the program exits but all I see is:

run started 

In the second case the SIGINT signal sent by ctrl+c is seemingly ignored and the program doesn't exit.

What am I missing here?

stuckintheshuck asked Sep 05 '14 00:09



2 Answers

The problem is that, as explained in Execution of Python signal handlers:

A Python signal handler does not get executed inside the low-level (C) signal handler. Instead, the low-level signal handler sets a flag which tells the virtual machine to execute the corresponding Python signal handler at a later point (for example at the next bytecode instruction).

A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.

Your main thread is blocked on threading.Thread.join, which ultimately means it's blocked in C on a pthread_join call. Of course that's not a "long-running calculation", it's a block on a syscall… but nevertheless, until that call finishes, your signal handler can't run.

And, while on some platforms pthread_join will fail with EINTR on a signal, on others it won't. On Linux, I believe it depends on whether you select BSD-style or default siginterrupt behavior, but the default is no.
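The deferred, main-thread execution of Python signal handlers described above can be checked with a small experiment. This is only a sketch, assuming CPython 3 on a POSIX system (SIGUSR1 does not exist on Windows); the function name `demo_handler_thread` and the choice of SIGUSR1 are illustrative and not part of the original question.

```python
import os
import signal
import threading
import time


def demo_handler_thread():
    """Send SIGUSR1 from a worker thread and report which thread ran the handler."""
    seen = []

    def handler(signum, frame):
        # Record the name of the Python thread that executes the handler.
        seen.append(threading.current_thread().name)

    # signal.signal() may only be called from the main thread.
    previous = signal.signal(signal.SIGUSR1, handler)
    try:
        worker = threading.Thread(
            target=lambda: os.kill(os.getpid(), signal.SIGUSR1),
            name="worker",
        )
        worker.start()
        # Keep the main thread returning to the interpreter in short slices,
        # so the pending Python-level handler gets a chance to run.
        deadline = time.time() + 5
        while not seen and time.time() < deadline:
            time.sleep(0.01)
        worker.join()
    finally:
        # Put back whatever handler was installed before the experiment.
        signal.signal(signal.SIGUSR1, previous)
    return seen
```

Calling `demo_handler_thread()` from the main thread should report the main thread's name even though the signal was sent from the worker, which is why a main thread stuck in an uninterruptible C call never gets to run the handler.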


So, what can you do about it?

Well, I'm pretty sure the changes to signal handling in Python 3.3 actually changed the default behavior on Linux so you won't need to do anything if you upgrade; just run under 3.3+ and your code will work as you're expecting. At least it does for me with CPython 3.4 on OS X and 3.3 on Linux. (If I'm wrong about this, I'm not sure whether it's a bug in CPython or not, so you may want to raise it on python-list rather than opening an issue…)

On the other hand, pre-3.3, the signal module definitely doesn't expose the tools you'd need to fix this problem yourself. So, if you can't upgrade to 3.3, the solution is to wait on something interruptible, like a Condition or an Event. The child thread notifies the event right before it quits, and the main thread waits on the event before it joins the child thread. This is definitely hacky. And I can't find anything that guarantees it will make a difference; it just happens to work for me in various builds of CPython 2.7 and 3.2 on OS X and 2.6 and 2.7 on Linux…
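A minimal sketch of that Event-based workaround follows. It is an assumption-laden illustration, not the answerer's code: the names `run_until_signalled`, `finished`, and `poll_interval` are made up here, and the waits use a timeout on the assumption that short timed waits let the main thread return to the interpreter often enough for the pending handler to run.

```python
import signal
import threading


def run_until_signalled(poll_interval=0.2):
    """Run a worker until SIGTERM or SIGINT arrives, then shut down in order."""
    stop_requested = threading.Event()  # set by the signal handler
    finished = threading.Event()        # set by the worker right before it exits
    log = []

    def sig_handler(signum, frame):
        log.append("handling signal: %d" % signum)
        stop_requested.set()

    def run():
        log.append("run started")
        # A timed Event.wait doubles as a sleep that ends early on stop.
        while not stop_requested.wait(poll_interval):
            pass
        log.append("run exited")
        finished.set()

    # Handlers must be installed from the main thread.
    old_term = signal.signal(signal.SIGTERM, sig_handler)
    old_int = signal.signal(signal.SIGINT, sig_handler)
    try:
        t = threading.Thread(target=run)
        t.start()
        # Wait on the event in short slices instead of blocking in join().
        while not finished.wait(poll_interval):
            pass
        t.join()  # returns promptly: the worker has already signalled completion
        log.append("join completed")
    finally:
        signal.signal(signal.SIGTERM, old_term)
        signal.signal(signal.SIGINT, old_int)
    return log
```

The function blocks until the process receives SIGTERM or SIGINT (for example from `kill -15 <pid>` or ctrl+c) and then returns the ordered list of messages, which mirrors the output the question expected.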

abarnert answered Oct 13 '22 09:10


abarnert's answer was spot on. I'm still using Python 2.7 however. In order to solve this problem for myself I wrote an InterruptableThread class.

Right now it doesn't allow passing additional arguments to the thread target. Join doesn't accept a timeout parameter either. This is just because I don't need to do that. You can add it if you want. You will probably want to remove the output statements if you use this yourself. They are just there as a way of commenting and testing.

    import threading
    import signal
    import sys


    class InvalidOperationException(Exception):
        pass


    # noinspection PyClassHasNoInit
    class GlobalInterruptableThreadHandler:
        # Threads that should be told to stop when a signal arrives.
        threads = []
        initialized = False

        @staticmethod
        def initialize():
            # Install the shared handler for both termination signals.
            signal.signal(signal.SIGTERM, GlobalInterruptableThreadHandler.sig_handler)
            signal.signal(signal.SIGINT, GlobalInterruptableThreadHandler.sig_handler)
            GlobalInterruptableThreadHandler.initialized = True

        @staticmethod
        def add_thread(thread):
            # Signal handlers can only be registered from the main thread.
            if threading.current_thread().name != 'MainThread':
                raise InvalidOperationException("InterruptableThread objects may only be started from the Main thread.")

            if not GlobalInterruptableThreadHandler.initialized:
                GlobalInterruptableThreadHandler.initialize()

            GlobalInterruptableThreadHandler.threads.append(thread)

        @staticmethod
        def sig_handler(signum, frame):
            sys.stdout.write("handling signal: %s\n" % signum)
            sys.stdout.flush()

            # Ask every registered thread to stop, then forget them.
            for thread in GlobalInterruptableThreadHandler.threads:
                thread.stop()

            GlobalInterruptableThreadHandler.threads = []


    class InterruptableThread:
        def __init__(self, target=None):
            self.stop_requested = threading.Event()
            # With a target, pass this object to it; otherwise run self.run().
            self.t = threading.Thread(target=target, args=[self]) if target else threading.Thread(target=self.run)

        def run(self):
            pass

        def start(self):
            GlobalInterruptableThreadHandler.add_thread(self)
            self.t.start()

        def stop(self):
            self.stop_requested.set()

        def is_stop_requested(self):
            return self.stop_requested.is_set()

        def join(self):
            try:
                # Join in one-second slices so the main thread stays interruptible.
                while self.t.is_alive():
                    self.t.join(timeout=1)
            except (KeyboardInterrupt, SystemExit):
                self.stop_requested.set()
                self.t.join()

            sys.stdout.write("join completed\n")
            sys.stdout.flush()

The class can be used two different ways. You can sub-class InterruptableThread:

    import time
    import sys
    from interruptable_thread import InterruptableThread

    class Foo(InterruptableThread):
        def __init__(self):
            InterruptableThread.__init__(self)

        def run(self):
            sys.stdout.write("run started\n")
            sys.stdout.flush()
            while not self.is_stop_requested():
                time.sleep(2)

            sys.stdout.write("run exited\n")
            sys.stdout.flush()

    foo = Foo()
    foo2 = Foo()
    foo.start()
    foo2.start()
    foo.join()
    foo2.join()

    # Report completion only after both threads have been joined.
    sys.stdout.write("all exited\n")
    sys.stdout.flush()

Or you can use it more like the way threading.Thread works. The target function has to take the InterruptableThread object as a parameter though.

    import time
    import sys
    from interruptable_thread import InterruptableThread

    def run(t):
        sys.stdout.write("run started\n")
        sys.stdout.flush()
        while not t.is_stop_requested():
            time.sleep(2)

        sys.stdout.write("run exited\n")
        sys.stdout.flush()

    t1 = InterruptableThread(run)
    t2 = InterruptableThread(run)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    sys.stdout.write("all exited\n")
    sys.stdout.flush()

Do with it what you will.
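For readers who do want the two missing features mentioned earlier (extra arguments for the target and a timeout on join), here is one possible sketch. It is a hypothetical standalone variant, not the class above: the name `StoppableWorker` and the `poll_interval` parameter are invented for illustration, and the signal registration done by GlobalInterruptableThreadHandler is left out to keep it short.

```python
import threading
import time


class StoppableWorker(object):
    """Hypothetical variant that forwards args/kwargs and supports join(timeout)."""

    def __init__(self, target, args=(), kwargs=None):
        self.stop_requested = threading.Event()
        # The worker object is passed first, followed by the caller's arguments.
        self._thread = threading.Thread(
            target=target, args=(self,) + tuple(args), kwargs=kwargs or {}
        )

    def start(self):
        self._thread.start()

    def stop(self):
        self.stop_requested.set()

    def is_stop_requested(self):
        return self.stop_requested.is_set()

    def join(self, timeout=None, poll_interval=1.0):
        """Join in short slices; return True if the thread finished in time."""
        deadline = None if timeout is None else time.time() + timeout
        while self._thread.is_alive():
            wait = poll_interval
            if deadline is not None:
                wait = min(wait, deadline - time.time())
                if wait <= 0:
                    return False  # timed out; the thread is still running
            self._thread.join(wait)
        return True
```

A target would then be written as `def work(worker, label, count=0): ...` and started with `StoppableWorker(work, args=("a",), kwargs={"count": 3})`. Returning a boolean from `join` is a design choice made here so callers can tell a timeout from a clean exit; `threading.Thread.join` itself always returns None.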

stuckintheshuck answered Oct 13 '22 09:10