Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to stop a thread which is blocking on a named pipe in Python?

I have a class which subclasses threading.Thread. It's sole responsibility is putting messages read from a UNIX named pipe to a queue.Queue object (so that other threads can process these values later).

Example code:

class PipeReaderThread(Thread):
    def __init__(self, results_queue, pipe_path):
        Thread.__init__(self)
        self._stop_event = Event()
        self._results_queue = results_queue
        self._pipe_path = pipe_path

    def run(self):
        while not self._stop_event.is_set():
            with open(self._pipe_path, 'r') as pipe:
                message = pipe.read()
            self._results_queue.put(message, block=True)

    def stop(self):
        self._stop_event.set()

As you can see I wanted to use a threading.Event object to stop the loop, but since the open() or read() calls on the named pipe will block (until someone opens the pipe for writing / writes to it then closes it), the thread never has the chance to stop.

I didn't want to use nonblocking mode for the named pipe, as the blocking is actually what I want in a sense that I want to wait for someone to open and write to the pipe.

With sockets I'd try something like setting a timeout flag on the socket, but I couldn't find any way of doing this for named pipes. I've also considered just killing the thread in cold blood without giving it a chance to stop gracefully, but this doesn't really feel like something I should be doing, and I don't even know if Python provides any way of doing this.

How should I stop this thread properly, so that I could call join() on it afterwards?

like image 799
krispet krispet Avatar asked Dec 09 '18 23:12

krispet krispet


People also ask

How to terminate a thread in Python?

Here comes the problem: There is no terminate or similar method in threading.Thread, so we cannot use the solution of first problem. Also, ctrl-c cannot break out the python process here (this seems is a bug of Python). We can send some siginal to the threads we want to terminate.

Is it safe to kill a thread in Python?

Killing a thread forcibly is not recommended unless it is known for sure, that doing so will not cause any leaks or deadlocks. In order to kill a thread, we use hidden function _stop () this function is not documented but might disappear in the next version of python.

What are the blocking methods of a thread?

Thread.sleep (), BlockingQueue.put (), ServerSocket.accept () are some examples of blocking methods. Most blocking methods support interruptions. They usually respond by throwing InterruptException or ClosedByInterruptionException when they detect interruptions. Let us consider an example. the code below calls Thread.sleep ().

What is threading in Linux?

Threads are executed in their own system-level thread (e.g., a POSIX thread or Windows threads) that is fully managed by the host operating system. Once started, threads run independently until the target function returns.


1 Answers

Classic way to do this is to have unnamed pipe that signals closing and to use select to know which one is to be used.

select will block until one of descriptors is ready for read and then you can use os.read which will not block in this case.

Code for demonstration (doesn't handle errors, might leak descriptors):

class PipeReaderThread(Thread):
    def __init__(self, results_queue, pipe_path):
        Thread.__init__(self)
        self._stop_pipe_r, self._stop_pipe_w = os.pipe()
        self._results_queue = results_queue
        self._pipe = os.open(pipe_path, os.O_RDONLY) # use file descriptors directly to read file in parts
        self._buffer = b''

    def run(self):
        while True:
            result = select.select([self._stop_pipe_r, self._pipe], [], [])
            if self._stop_pipe_r in result[0]:
                os.close(self._stop_pipe_r)
                os.close(self._stop_pipe_w)
                os.close(self._pipe)
                return
            self._buffer += os.read(self._pipe, 4096) # select above guarantees read is noblocking
            self._extract_messages_from_buffer() # left as an exercise

    def stop(self):
        os.write(self._stop_pipe_w, b'c')
like image 164
zch Avatar answered Oct 05 '22 04:10

zch