Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How To Abort Threads that Pull Items from a Queue Using Ctrl+C In Python

I've implemented a threaded application in Python. At runtime I want to catch the Ctrl+C signal (SIGINT) and exit the program. To do that, I've registered a handler called exit_gracefully, which also takes care of stopping the threads in a more controlled way. However, it does not seem to work: the handler appears never to be called.

Here's the example I'm working with:

import Queue
import threading
import signal
import sys
import time

# Module-level state shared across the script.
# NOTE(review): main() builds its own local queue, so this global queue
# looks unused by the code shown here.
queue = Queue.Queue()
# Registry of worker threads: filled by main(), walked by exit_gracefully().
workers = []

def callback(id, item):
    """Print ``"<id>: <item>"`` and then pause for one second."""
    message = "{}: {}".format(id, item)
    print(message)
    time.sleep(1)

def exit_gracefully(signum, frame):
    """Signal handler: tell every registered worker to stop, then exit.

    ``signum`` and ``frame`` are the standard signal-handler arguments; neither
    is used. The process exits with status 1 via ``sys.exit``.
    """
    for notice in ("Ctrl+C was pressed. Shutting threads down ...",
                   "Stopping workers ..."):
        print(notice)
    for task in workers:
        task.stop()
    sys.exit(1)


class ThreadedTask(threading.Thread):
    """Worker thread that takes items from a queue and passes each to callbacks.

    The thread runs until ``stop()`` is called. It waits for items with a short
    timeout instead of blocking indefinitely, so a stop request is noticed even
    when the queue is empty.
    """

    # Longest time (seconds) a single ``get`` may block before the stop flag
    # is checked again.
    POLL_INTERVAL = 0.1

    def __init__(self, id, queue, callbacks):
        """Create the worker.

        id        -- identifier for this worker; stored as a string.
        queue     -- queue to consume from (needs ``get(timeout=...)`` and
                     ``task_done()``).
        callbacks -- iterable of callables invoked as ``callback(id, item)``.
        """
        threading.Thread.__init__(self)
        self._stop_event = threading.Event()
        self.id = str(id)
        self.queue = queue
        self.callbacks = callbacks
        self._stopped = False

    def run(self):
        """Process queue items until the worker has been asked to stop."""
        while not self.stopped():
            try:
                # A blocking get() with no timeout would never return once the
                # queue is drained, so the loop condition could not be re-read.
                item = self.queue.get(timeout=self.POLL_INTERVAL)
            except Queue.Empty:
                continue
            try:
                for callback in self.callbacks:
                    callback(self.id, item)
            finally:
                # Always account for the item, even if a callback raised;
                # otherwise a queue.join() elsewhere would wait forever.
                self.queue.task_done()

    def stop(self):
        """Request that the worker exit after its current item."""
        self._stop_event.set()
        self._stopped = True

    def stopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stop_event.is_set() or self._stopped


def main(input_file, thread_count, callbacks):
    """Load lines from a file into a queue and process them with worker threads.

    input_file   -- path of a text file; each line (without its newline)
                    becomes one queue item.
    thread_count -- number of ThreadedTask workers to create.
    callbacks    -- callables handed to every worker.

    Workers are registered in the module-level ``workers`` list. The function
    returns once every queued item has been marked done.
    """
    print("Initializing queue ...")
    queue = Queue.Queue()

    print("Parsing '{}' ...".format(input_file))
    with open(input_file) as f:
        for line in f:
            queue.put(line.replace("\n", ""))

    print("Initializing {} threads ...".format(thread_count))
    for id in range(thread_count):
        worker = ThreadedTask(id, queue, callbacks)
        # Daemon threads do not keep the interpreter alive after sys.exit().
        worker.daemon = True
        workers.append(worker)

    print("Starting {} threads ...".format(thread_count))
    for worker in workers:
        worker.start()

    # Calling queue.join() directly in the main thread blocks without a
    # timeout; on Python 2 that wait cannot be interrupted, so a SIGINT
    # handler would never get to run. Do the blocking join in a helper thread
    # and wait for it in short, interruptible slices instead.
    waiter = threading.Thread(target=queue.join)
    waiter.daemon = True
    waiter.start()
    while waiter.is_alive():
        waiter.join(0.1)


if __name__ == '__main__':
    # Register the Ctrl+C handler before main() creates any worker thread.
    signal.signal(signal.SIGINT, exit_gracefully)
    print("Starting main ...")
    input_file, thread_count = "list.txt", 10
    callbacks = [callback]
    main(input_file, thread_count, callbacks)

If you want to try the example above you may generate some test-data first:

seq 1 10000 > list.txt

Any help is appreciated!

like image 909
rednammoc Avatar asked Oct 24 '25 18:10

rednammoc


1 Answer

Here's a solution that seems to work.

One issue is that Queue.get() will ignore SIGINT unless a timeout is set. That's documented here: https://bugs.python.org/issue1360.

Another issue is that Queue.join() also seems to ignore SIGINT. I worked around that by polling the queue in a loop to see if it's empty.

These issues appear to have been fixed in Python 3.

I also added a shared event that's used in the SIGINT handler to tell all the threads to shut down.

import Queue
import signal
import sys
import threading
import time


def callback(id, item):
    """Print ``'<id>: <item>'`` and then sleep for one second."""
    line = '{}: {}'.format(id, item)
    print(line)
    time.sleep(1)


class ThreadedTask(threading.Thread):
    """Worker thread that feeds queue items to callbacks until told to stop.

    Note that ``run_event`` acts as a stop flag: the loop in ``run`` ends once
    the event is set.
    """

    def __init__(self, id, queue, run_event, callbacks):
        """Store the worker id, the shared queue, the stop event and callbacks."""
        super(ThreadedTask, self).__init__()
        self.callbacks = callbacks
        self.run_event = run_event
        self.queue = queue
        self.id = id

    def run(self):
        """Poll the queue every 0.1 s; pass each item to every callback."""
        pending = self.queue
        while not self.run_event.is_set():
            try:
                job = pending.get(timeout=0.1)
            except Queue.Empty:
                # Nothing arrived within the timeout; re-check the stop event.
                continue
            for handler in self.callbacks:
                handler(self.id, job)
            pending.task_done()


def main():
    """Process every line of ``list.txt`` with ten worker threads.

    A SIGINT handler is installed that stops the workers and exits with
    status 0. Otherwise the function returns after the queue has been emptied
    and all workers have shut down.
    """
    queue = Queue.Queue()
    run_event = threading.Event()
    workers = []

    def stop():
        """Set the shared event and wait for every worker to finish."""
        run_event.set()
        for worker in workers:
            # Allow worker threads to shut down completely
            worker.join()

    def sigint_handler(signum, frame):
        """Handle Ctrl+C: stop the workers, then exit."""
        print('\nShutting down...')
        stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, sigint_handler)

    callbacks = [callback]

    for worker_id in range(1, 11):
        workers.append(ThreadedTask(worker_id, queue, run_event, callbacks))

    for worker in workers:
        worker.start()

    try:
        with open('list.txt') as fp:
            for line in fp:
                queue.put(line.strip())

        # Poll instead of calling queue.join() so that the main thread keeps
        # returning to the interpreter and the SIGINT handler can run.
        while not queue.empty():
            time.sleep(0.1)
    finally:
        # Gracefully shut down threads after all items are consumed from the
        # queue. This also runs when an error occurs (for example the input
        # file is missing): the workers are not daemon threads, so leaving
        # them running would keep the process alive forever.
        stop()


# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()

Donate For Us

If you love us, you can donate via PayPal or buy us a coffee so we can maintain and grow the site. Thank you!