How to let a Python thread finish gracefully

I'm doing a project involving data collection and logging. I have 2 threads running, a collection thread and a logging thread, both started in main. I'm trying to allow the program to be terminated gracefully with Ctrl-C.

I'm using a threading.Event to signal the threads to end their respective loops. It works fine for stopping the sim_collectData method, but it doesn't seem to properly stop the logData thread. The "Collection terminated" print statement is never executed, and the program just stalls (it doesn't end, it just sits there).

The second while loop in logData is to make sure everything in the queue is logged. The goal is for Ctrl-C to stop the collection thread immediately, then allow the logging thread to finish emptying the queue, and only then fully terminate the program. (Right now, the data is just being printed out - eventually it's going to be logged to a database).

I don't understand why the second thread never terminates. I'm basing what I've done on this answer: Stopping a thread after a certain amount of time. What am I missing?

import Queue
import random
import threading
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue, stop_event):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while not stop_event.is_set():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

    # if the stop event is received and the previous loop terminates,
    # finish logging the rest of the items in the queue.
    print "Collection terminated. Logging remaining data to database..."
    while not input_queue.empty():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1
    return


def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue, stop_event))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()
Isaac Dontje Lindell asked Jul 09 '13 17:07



3 Answers

The problem is that your logger is waiting on d = input_queue.get() and will not check the event. One solution is to skip the event completely and invent a unique message that tells the logger to stop. When you get a signal, send that message to the queue.

import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    input_queue.put(None)
    return

def logData(input_queue):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while True:
        d = input_queue.get()
        if d is None:
            input_queue.task_done()
            return
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()
tdelaney answered Oct 15 '22 00:10


I'm not an expert in threading, but in your logData function the first d = input_queue.get() is blocking, i.e., if the queue is empty it will sit and wait forever until a queue message is received. This is likely why the logData thread never terminates: it's sitting waiting forever for a queue message.

Refer to the [Python docs] to change this to a non-blocking queue read: use .get(False) or .get_nowait() - but either will require some exception handling for cases when the queue is empty.
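
As a minimal sketch of the non-blocking read described above, here is one way the exception handling might look. It is written for Python 3, where the module is named queue rather than Queue, and the helper name drain_nowait is hypothetical (it is not part of the question's code).

```python
import queue


def drain_nowait(q):
    """Return every item currently in q without blocking (hypothetical helper)."""
    items = []
    while True:
        try:
            # get_nowait() is equivalent to get(False): it raises
            # queue.Empty instead of blocking when nothing is available.
            item = q.get_nowait()
        except queue.Empty:
            break
        items.append(item)
        q.task_done()
    return items


q = queue.Queue()
for n in range(3):
    q.put("DATA: sample " + str(n))

drained = drain_nowait(q)
print(drained)
```

An empty queue only means nothing is available right now, so in a long-running logger this would still need to be combined with a check of the stop event.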

Raceyman answered Oct 15 '22 01:10


You are calling a blocking get on your input_queue with no timeout. In either section of logData, if you call input_queue.get() and the queue is empty, it will block indefinitely, preventing the logging_thread from reaching completion.

To fix, you will want to call input_queue.get_nowait() or pass a timeout to input_queue.get().

Here is my suggestion:

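def logData(input_queue, stop_event):
    n = 0

    # Note: the loop exits as soon as stop_event is set, so any items
    # still in the queue at that point are not logged.
    while not stop_event.is_set():
        try:
            # get_nowait() raises Queue.Empty instead of blocking
            d = input_queue.get_nowait()
            if d.startswith("DATA:"):
                print "LOG: " + d
                n += 1
        except Queue.Empty:
            # nothing to log yet; sleep briefly, then re-check the event
            time.sleep(1)
    return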

You are also signalling the threads to terminate, but not waiting for them to do so. Consider doing this in your main function.

try:
    while True:
        time.sleep(10)
except (KeyboardInterrupt, SystemExit):
    stop_event.set()
    collection_thread.join()
    logging_thread.join()
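
The other option mentioned above, passing a timeout to get(), could be sketched roughly as follows. This is only an illustration under some assumptions: it uses Python 3 (queue instead of Queue), the name log_data and the logged list are stand-ins introduced here, and the 0.1 second timeout is arbitrary. Unlike a loop that exits the moment the event is set, this variant returns only once the event is set and a read has timed out on an empty queue, so queued items are drained first.

```python
import queue
import threading


def log_data(input_queue, stop_event, logged):
    """Log items until stop_event is set AND the queue has run dry."""
    while True:
        try:
            # Wake up periodically so the stop event is re-checked even
            # when no data arrives.
            d = input_queue.get(timeout=0.1)
        except queue.Empty:
            if stop_event.is_set():
                return  # producer has stopped and nothing is left to log
            continue
        logged.append(d)  # stand-in for printing or a database write
        input_queue.task_done()


input_queue = queue.Queue()
stop_event = threading.Event()
logged = []

logging_thread = threading.Thread(
    target=log_data, args=(input_queue, stop_event, logged))
logging_thread.start()

for n in range(5):
    input_queue.put("DATA: sample " + str(n))

stop_event.set()       # what the Ctrl-C handler would do
logging_thread.join()  # wait for the logger to finish draining
print(len(logged))
```

In this sketch all items are queued before the event is set. In the real program the collection thread could still add one last item just after the event is set, so joining the collection thread before relying on the logger's empty-queue check is the more cautious ordering.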
rrhartjr answered Oct 15 '22 00:10