I'm doing a project involving data collection and logging. I have two threads running, a collection thread and a logging thread, both started in main. I'm trying to allow the program to be terminated gracefully with Ctrl-C.
I'm using a threading.Event to signal to the threads to end their respective loops. It works fine to stop the sim_collectData function, but it doesn't seem to properly stop the logData thread. The "Collection terminated" print statement is never executed, and the program just stalls (it doesn't end, it just sits there).
The second while loop in logData is there to make sure everything in the queue is logged. The goal is for Ctrl-C to stop the collection thread immediately, then allow the logging thread to finish emptying the queue, and only then fully terminate the program. (Right now the data is just being printed out; eventually it will be logged to a database.)
I don't understand why the second thread never terminates. I'm basing what I've done on this answer: Stopping a thread after a certain amount of time. What am I missing?
import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware.
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue, stop_event):
    n = 0
    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while not stop_event.is_set():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1
    # if the stop event is received and the previous loop terminates,
    # finish logging the rest of the items in the queue.
    print "Collection terminated. Logging remaining data to database..."
    while not input_queue.empty():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1
    return

def main():
    input_queue = Queue.Queue()
    stop_event = threading.Event() # used to signal termination to the threads
    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."
    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue, stop_event))
    logging_thread.start()
    print "Done."
    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()
The problem is that your logger is blocked in d = input_queue.get() and never gets back to checking the event. One solution is to skip the event in the logger completely and use a unique message (a sentinel, here None) that tells the logger to stop. When you get the stop signal, put that message on the queue.
import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware.
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    input_queue.put(None)
    return

def logData(input_queue):
    n = 0
    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while True:
        d = input_queue.get()
        if d is None:
            input_queue.task_done()
            return
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = Queue.Queue()
    stop_event = threading.Event() # used to signal termination to the threads
    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."
    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."
    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()
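For readers on Python 3, the same sentinel idea can be sketched as follows. This is only an illustration under a few assumptions, not a drop-in replacement: the names collect, log_data, run_demo and SENTINEL are made up for this example, the module is queue rather than Queue, a short sleep stands in for waiting on Ctrl-C, and the logger appends to a list instead of printing so the result is easy to inspect.

```python
import queue
import threading
import time

SENTINEL = None  # hypothetical marker meaning "no more data will follow"


def collect(input_queue, stop_event, interval=0.01):
    """Produce items until stop_event is set, then enqueue the sentinel."""
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: sample " + str(n))
        n += 1
        stop_event.wait(interval)  # sleeps, but returns early once the event is set
    input_queue.put(SENTINEL)


def log_data(input_queue, logged):
    """Consume items until the sentinel arrives; a blocking get() is safe here."""
    while True:
        d = input_queue.get()
        if d is SENTINEL:
            return
        logged.append(d)


def run_demo(run_for=0.05):
    input_queue = queue.Queue()
    stop_event = threading.Event()
    logged = []
    collector = threading.Thread(target=collect, args=(input_queue, stop_event))
    logger = threading.Thread(target=log_data, args=(input_queue, logged))
    collector.start()
    logger.start()
    time.sleep(run_for)  # stand-in for the main loop that waits for Ctrl-C
    stop_event.set()
    collector.join()
    logger.join()  # returns only after the sentinel has been consumed
    return logged, input_queue.empty()
```

Because the sentinel is the last thing the single producer enqueues, the logger has processed every earlier item by the time it sees it, so nothing is left behind in the queue.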
I'm not an expert in threading, but in your logData function the first d = input_queue.get() is blocking, i.e., if the queue is empty it will sit and wait until a queue message is received. This is likely why the logData thread never terminates: it's waiting forever for a queue message.
Refer to the Python docs to change this to a non-blocking queue read: use .get(False) or .get_nowait(). Either will require some exception handling for the case when the queue is empty.
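A minimal sketch of a non-blocking read with that exception handling, written for Python 3 (where the module is named queue; in Python 2 it is Queue). The helper name try_get is hypothetical and exists only for this example.

```python
import queue


def try_get(input_queue):
    """Return (True, item) if an item was available, else (False, None)."""
    try:
        return True, input_queue.get_nowait()  # equivalent to get(False)
    except queue.Empty:
        # Raised immediately when the queue has nothing to hand out.
        return False, None


q = queue.Queue()
q.put("DATA: example")
first = try_get(q)   # the queued item is returned
second = try_get(q)  # the queue is now empty, so no item is returned
```

A loop built on this should sleep or otherwise wait when nothing is available; otherwise it will spin and waste CPU.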
You are calling a blocking get on your input_queue with no timeout. In either section of logData, if you call input_queue.get() and the queue is empty, it will block indefinitely, preventing the logging_thread from reaching completion.
To fix this, call input_queue.get_nowait() or pass a timeout to input_queue.get().
Here is my suggestion:
def logData(input_queue, stop_event):
    n = 0
    while not stop_event.is_set():
        try:
            d = input_queue.get_nowait()
            if d.startswith("DATA:"):
                print "LOG: " + d
                n += 1
        except Queue.Empty:
            time.sleep(1)
    return
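The timeout variant mentioned above could look like the sketch below. It is written for Python 3 and makes some assumptions of its own: the function name log_data, the poll_interval parameter and the logged list (used instead of printing) are illustrative. It also adds a final drain so that items still queued when the event is set get logged, which is what the question asks for.

```python
import queue
import threading


def log_data(input_queue, stop_event, logged, poll_interval=0.05):
    """Log items until stop_event is set, then drain whatever is left."""
    while not stop_event.is_set():
        try:
            # Wake up at least every poll_interval seconds to re-check the event.
            d = input_queue.get(timeout=poll_interval)
        except queue.Empty:
            continue
        logged.append(d)
    # Stop was requested: empty the queue without blocking.
    while True:
        try:
            d = input_queue.get_nowait()
        except queue.Empty:
            break
        logged.append(d)


q = queue.Queue()
stop = threading.Event()
out = []
for i in range(3):
    q.put("DATA: " + str(i))
stop.set()  # stop is requested before the logger even starts
t = threading.Thread(target=log_data, args=(q, stop, out))
t.start()
t.join(timeout=5)
```

The drain is only complete if the producer has already stopped putting items. A collector that enqueues one last item after the event is set can still race with it, which is why the sentinel approach is the more robust way to guarantee nothing is lost.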
You are also signalling the threads to terminate, but not waiting for them to do so. Consider doing this in your main function:
try:
    while True:
        time.sleep(10)
except (KeyboardInterrupt, SystemExit):
    stop_event.set()
    collection_thread.join()
    logging_thread.join()