I have a class which subclasses threading.Thread. Its sole responsibility is putting messages read from a UNIX named pipe into a queue.Queue object (so that other threads can process these values later).
Example code:
    from threading import Thread, Event

    class PipeReaderThread(Thread):
        def __init__(self, results_queue, pipe_path):
            Thread.__init__(self)
            self._stop_event = Event()
            self._results_queue = results_queue
            self._pipe_path = pipe_path

        def run(self):
            while not self._stop_event.is_set():
                # open() blocks until a writer opens the pipe;
                # read() blocks until that writer closes it
                with open(self._pipe_path, 'r') as pipe:
                    message = pipe.read()
                    self._results_queue.put(message, block=True)

        def stop(self):
            self._stop_event.set()
As you can see, I wanted to use a threading.Event object to stop the loop, but since the open() or read() calls on the named pipe will block (until someone opens the pipe for writing, or writes to it and then closes it), the thread never has the chance to stop.
I didn't want to use nonblocking mode for the named pipe, as the blocking is actually what I want, in the sense that I want to wait for someone to open and write to the pipe.
With sockets I'd try something like setting a timeout flag on the socket, but I couldn't find any way of doing this for named pipes. I've also considered just killing the thread in cold blood without giving it a chance to stop gracefully, but this doesn't really feel like something I should be doing, and I don't even know if Python provides any way of doing this.
How should I stop this thread properly, so that I could call join() on it afterwards?
The classic way to do this is to have an unnamed pipe that signals closing, and to use select to find out which of the two pipes is ready. select will block until one of the descriptors is ready for reading, and then you can use os.read, which will not block in this case.
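To see the wake-up mechanism in isolation, here is a minimal sketch that uses two anonymous pipes only: one stands in for the data source and one carries the stop signal. The helper name wait_for_event and the payloads are made up for illustration.

```python
import os
import select

# Two anonymous pipes: one stands in for the data source, one for the stop signal.
data_r, data_w = os.pipe()
stop_r, stop_w = os.pipe()


def wait_for_event():
    """Block until either pipe is readable and report which one it was."""
    readable, _, _ = select.select([stop_r, data_r], [], [])
    if stop_r in readable:
        return "stop"
    # select reported data_r as readable, so this read will not block
    return os.read(data_r, 4096)


os.write(data_w, b"hello")
first = wait_for_event()   # woken up by the data pipe

os.write(stop_w, b"x")
second = wait_for_event()  # woken up by the stop pipe

for fd in (data_r, data_w, stop_r, stop_w):
    os.close(fd)
```

The same select call serves both purposes: it waits for data as long as needed, yet a single byte written to the stop pipe wakes it up immediately.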
Code for demonstration of the full thread class (doesn't handle errors, might leak descriptors):
    import os
    import select
    from threading import Thread

    class PipeReaderThread(Thread):
        def __init__(self, results_queue, pipe_path):
            Thread.__init__(self)
            # Unnamed pipe used only to wake up select() when stop() is called
            self._stop_pipe_r, self._stop_pipe_w = os.pipe()
            self._results_queue = results_queue
            # Use file descriptors directly so the pipe can be read in parts.
            # Note: this blocking open waits here until a writer opens the pipe.
            self._pipe = os.open(pipe_path, os.O_RDONLY)
            self._buffer = b''

        def run(self):
            while True:
                result = select.select([self._stop_pipe_r, self._pipe], [], [])
                if self._stop_pipe_r in result[0]:
                    os.close(self._stop_pipe_r)
                    os.close(self._stop_pipe_w)
                    os.close(self._pipe)
                    return
                # select above guarantees this read will not block
                self._buffer += os.read(self._pipe, 4096)
                self._extract_messages_from_buffer()  # left as an exercise

        def stop(self):
            os.write(self._stop_pipe_w, b'c')
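For completeness, here is a fuller sketch of the same idea that can be run end to end. Several things in it are assumptions rather than part of the code above: messages are taken to be newline-terminated, the FIFO is opened with O_NONBLOCK so that opening never waits for a writer (the waiting still happens in select), and the FIFO is reopened when the writer closes its end. The class name StoppablePipeReader, the demo function and the FIFO path are hypothetical. The reopen-on-EOF behaviour relies on how Linux reports FIFO readiness and may differ on other systems.

```python
import os
import select
import tempfile
from queue import Queue
from threading import Thread


class StoppablePipeReader(Thread):
    """Reads newline-terminated messages from a FIFO until stop() is called."""

    def __init__(self, results_queue, pipe_path):
        super().__init__()
        self._results_queue = results_queue
        self._pipe_path = pipe_path
        self._stop_r, self._stop_w = os.pipe()
        self._buffer = b""

    def _open_fifo(self):
        # O_NONBLOCK only keeps os.open from waiting for a writer;
        # the actual waiting is done by select() in run().
        return os.open(self._pipe_path, os.O_RDONLY | os.O_NONBLOCK)

    def run(self):
        fifo = self._open_fifo()
        try:
            while True:
                readable, _, _ = select.select([self._stop_r, fifo], [], [])
                if self._stop_r in readable:
                    return
                try:
                    chunk = os.read(fifo, 4096)
                except BlockingIOError:
                    continue  # a new writer connected but has not written yet
                if not chunk:
                    # EOF: the writer closed its end. Open a fresh descriptor
                    # before closing the old one so no data is dropped.
                    old, fifo = fifo, self._open_fifo()
                    os.close(old)
                    continue
                self._buffer += chunk
                # Everything before the last newline is complete messages
                *lines, self._buffer = self._buffer.split(b"\n")
                for line in lines:
                    self._results_queue.put(line.decode())
        finally:
            os.close(fifo)
            os.close(self._stop_r)
            os.close(self._stop_w)

    def stop(self):
        os.write(self._stop_w, b"x")  # wakes up select() in run()


def demo():
    """Create a temporary FIFO, send two messages, then stop the reader."""
    results = Queue()
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.fifo")  # hypothetical path
        os.mkfifo(path)
        reader = StoppablePipeReader(results, path)
        reader.start()
        for text in ("hello\n", "world\n"):
            # Opening for writing blocks until the reader has the FIFO open
            with open(path, "w") as writer:
                writer.write(text)
        messages = [results.get(timeout=5), results.get(timeout=5)]
        reader.stop()
        reader.join(timeout=5)
        return messages, reader.is_alive()
```

Calling demo() should return the two messages in order together with False for is_alive(), which shows that join() returns once stop() has been called even though no writer is connected at that point.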