
How to make worker threads quit after work is finished in a multithreaded producer-consumer pattern?

I am implementing a multithreaded producer-consumer pattern using Queue.Queue in Python 2.7, and I am trying to figure out how to make the consumers, i.e. the worker threads, stop once all required work is done.

See the second comment by Martin James to this answer: https://stackoverflow.com/a/19369877/1175080

Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then commits suicide.

But this does not work for me. See the following code for example.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

This program hangs after all three workers have read the exit indicator, i.e. -1, from the queue: each worker requeues -1 before exiting, so the last requeued -1 is never consumed or marked done, and q.join() never returns.

I came up with the following working solution, in which I send one -1 exit indicator per worker via the queue, so that each worker sees one and exits. But having to send a separate exit indicator for each worker feels a little ugly.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

I have two questions.

  1. Can the method of sending a single exit indicator for all threads (as explained in the second comment of https://stackoverflow.com/a/19369877/1175080 by Martin James) even work?
  2. If the answer to the previous question is "No", is there a way to solve the problem in a way that I don't have to send a separate exit indicator for each worker thread?
Lone Learner asked Jul 18 '17 14:07



2 Answers

Don't treat stopping as a special kind of task.

Use an Event instead, and have the workers call q.get with a timeout so that they never block indefinitely.

import Queue
import threading

stopping = threading.Event()

def worker(n, q, timeout=1):
    # run until the master thread indicates we're done
    while not stopping.is_set():
        try:
            # don't block indefinitely so we can return to the top
            # of the loop and check the stopping event
            data = q.get(True, timeout)
        # raised by q.get if we reach the timeout on an empty queue
        except Queue.Empty:
            continue
        # process data here, then mark the item as done
        q.task_done()

def master():
    ...

    print 'waiting for workers to finish'
    q.join()
    stopping.set()
    print 'done'
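
The snippet above leaves master() elided. As a rough, self-contained sketch of the same idea (not the answerer's exact code), the version below passes the event in as an argument, records items in a `results` list guarded by a lock, and joins the threads at the end. The names `results`, `lock`, `num_workers` and `num_items` are assumptions made for illustration, and the import shim is only there so the sketch runs on Python 3 as well as 2.7.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7, as used in the question


def worker(n, q, stopping, results, lock, timeout=0.1):
    # Poll with a short timeout so the stop flag is re-checked regularly.
    while not stopping.is_set():
        try:
            data = q.get(True, timeout)
        except queue.Empty:
            continue
        with lock:
            results.append((n, data))  # stand-in for real processing
        q.task_done()


def master(num_workers=3, num_items=10):
    q = queue.Queue()
    stopping = threading.Event()
    results, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(i, q, stopping, results, lock))
        for i in range(num_workers)
    ]
    for t in threads:
        t.start()
    for i in range(num_items):
        q.put(i)
    q.join()        # returns once every item has been marked done
    stopping.set()  # only now tell the idle workers to leave their loops
    for t in threads:
        t.join()    # wait until every worker has actually exited
    return sorted(data for _, data in results), threads
```

The order matters here: setting the event before q.join() returns could let workers exit while items are still queued. The cost of this design is that each idle worker wakes up once per timeout interval just to check the flag.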
Travis Mehlinger answered Oct 10 '22 06:10


Can the method of sending a single exit indicator for all threads (as explained in the second comment of https://stackoverflow.com/a/19369877/1175080 by Martin James) even work?

As you have noticed, it can't work with the code you have: every worker that reads the exit indicator puts it back, so the last worker to exit leaves one more item in the queue, and q.join() ends up waiting on a queue that will never be fully processed.

If the answer to the previous question is "No", is there a way to solve the problem in a way that I don't have to send a separate exit indicator for each worker thread?

You can join the threads instead of the queue:

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
    for t in threads:
        t.start()
    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    for t in threads:
        t.join()
    print 'done'

master()
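
To see where the single exit indicator ends up with this approach, here is a small sketch under a few assumptions: the sleeps and prints are dropped so it finishes quickly, processed items are collected in a hypothetical `seen` list, and task_done() is left out because nothing calls q.join(). The import shim lets it run on Python 3 as well as 2.7.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7, as used in the question

STOP = -1  # same exit indicator as in the question


def worker(q, seen, lock):
    while True:
        data = q.get()
        if data == STOP:
            q.put(STOP)  # pass the indicator on to the next worker
            break
        with lock:
            seen.append(data)  # stand-in for real processing


def run(num_workers=3, num_items=10):
    q = queue.Queue()
    seen, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(q, seen, lock))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    for i in range(num_items):
        q.put(i)
    q.put(STOP)  # one indicator for all workers
    for t in threads:
        t.join()  # wait for the threads, not for the queue
    # The last worker to exit requeued STOP and nobody is left to take it.
    return sorted(seen), q.qsize()
```

run() returns the processed items together with the final queue size, which is 1: the requeued indicator is still sitting in the queue. That leftover item is harmless when joining the threads, but it is exactly what keeps q.join() from returning in the original program.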

As the Queue documentation explains, a non-blocking get raises Queue.Empty when the queue is empty, so if you already know all the data to process, you can fill the queue first and then spawn the threads:

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        try:
            # With block=False, get() raises Queue.Empty immediately;
            # a timeout argument would be ignored.
            data = q.get(block=False)
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
        except Queue.Empty:
            break


def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

Here you have a live example
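
The fill-first approach can also be sketched in a form that runs on both Python 2.7 and 3. This is an illustrative variant, not the answerer's code: the sleeps and prints are removed, and the `seen` list, `lock`, and function name `run` are assumptions introduced for the example.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7, as used in the question


def worker(q, seen, lock):
    while True:
        try:
            data = q.get(block=False)  # raises queue.Empty when nothing is left
        except queue.Empty:
            break                      # an empty queue is the exit signal
        with lock:
            seen.append(data)          # stand-in for real processing
        q.task_done()


def run(num_workers=3, num_items=10):
    q = queue.Queue()
    # Queue everything before any worker starts.
    for i in range(num_items):
        q.put(i)
    seen, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(q, seen, lock))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    q.join()          # every item has been marked done
    for t in threads:
        t.join()      # every worker has seen the empty queue and exited
    return sorted(seen)
```

This only suits the case where all work is known up front. A worker exits the first time it finds the queue empty, so if the producer were still adding items (as in the question, where master() sleeps between puts), workers could quit early and leave later items unprocessed.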

Netwave answered Oct 10 '22 04:10