Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Blocking queue and multi-threaded consumer, how to know when to stop

I have a single thread producer which creates some task objects which are then added into an ArrayBlockingQueue (which is of fixed size).

I also start a multi-threaded consumer. This is build as a fixed thread pool (Executors.newFixedThreadPool(threadCount);). I then submit some ConsumerWorker intances to this threadPool, each ConsumerWorker having a refference to the above mentioned ArrayBlockingQueue instance.

Each such Worker will do a take() on the queue and deal with the task.

My issue is, what's the best way to have a Worker know when there won't be any more work to be done. In other words, how do I tell the Workers that the producer has finished adding to the queue, and from this point on, each worker should stop when he sees that the Queue is empty.

What I've got now is a setup where my Producer is initialized with a callback which is triggered when he finishes it's job (of adding stuff to the queue). I also keep a list of all the ConsumerWorkers I've created and submitted to the ThreadPool. When the Producer Callback tells me that the producer is done, I can tell this to each of the workers. At this point they should simply keep checking if the queue is not empty, and when it becomes empty they should stop, thus allowing me to gracefully shutDown the ExecutorService thread pool. It's something like this

public class ConsumerWorker implements Runnable{  private BlockingQueue<Produced> inputQueue; private volatile boolean isRunning = true;  public ConsumerWorker(BlockingQueue<Produced> inputQueue) {     this.inputQueue = inputQueue; }  @Override public void run() {     //worker loop keeps taking en element from the queue as long as the producer is still running or as      //long as the queue is not empty:     while(isRunning || !inputQueue.isEmpty()) {         System.out.println("Consumer "+Thread.currentThread().getName()+" START");         try {             Object queueElement = inputQueue.take();             //process queueElement         } catch (Exception e) {             e.printStackTrace();         }     } }  //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void setRunning(boolean isRunning) {     this.isRunning = isRunning; } 

}

The problem here is that I have an obvious race condition where sometimes the producer will finish, signal it, and the ConsumerWorkers will stop BEFORE consuming everything in the queue.

My question is what's the best way to synchronize this so that it all works ok? Should I synchronize the whole part where it checks if the producer is running plus if the queue is empty plus take something from the queue in one block (on the queue object)? Should I just synchronize the update of the isRunning boolean on the ConsumerWorker instance? Any other suggestion?

UPDATE, HERE'S THE WORKING IMPLEMENTATION THAT I'VE ENDED UP USING:

public class ConsumerWorker implements Runnable{  private BlockingQueue<Produced> inputQueue;  private final static Produced POISON = new Produced(-1);   public ConsumerWorker(BlockingQueue<Produced> inputQueue) {     this.inputQueue = inputQueue; }  @Override public void run() {     //worker loop keeps taking en element from the queue as long as the producer is still running or as      //long as the queue is not empty:     while(true) {         System.out.println("Consumer "+Thread.currentThread().getName()+" START");         try {             Produced queueElement = inputQueue.take();             Thread.sleep(new Random().nextInt(100));             if(queueElement==POISON) {                 break;             }             //process queueElement         } catch (Exception e) {             e.printStackTrace();         }         System.out.println("Consumer "+Thread.currentThread().getName()+" END");     } }  //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void stopRunning() {     try {         inputQueue.put(POISON);     } catch (InterruptedException e) {         // TODO Auto-generated catch block         e.printStackTrace();     } } 

}

This was inspired heavily by JohnVint's answer below, with only some minor modifications.

=== Update due to @vendhan's comment.

Thank you for your obeservation. You are right, the first snippet of code in this question has (amongst other issues) the one where the while(isRunning || !inputQueue.isEmpty()) doesn't really make sense.

In my actual final implementation of this, I do something which is closer to your suggestion of replacing "||" (or) with "&&" (and), in the sense that each worker (consumer) now only checks if the element he's got from the list is a poison pill, and if so stops (so theoretically we can say that the worker has to be running AND the queue must not be empty).

like image 509
Shivan Dragon Avatar asked Jan 23 '12 16:01

Shivan Dragon


People also ask

What happens if blocking queue is full?

Here we have a blockingQueue that has a capacity equal to 10. It means that when a producer tries to add an element to an already full queue, depending on a method that was used to add it (offer(), add() or put()), it will block until space for inserting object becomes available. Otherwise, the operations will fail.

When should we use linked blocking queue and when array blocking queue?

ArrayBlockingQueue is bounded which means the size will never change after its creation. LinkedBlockingQueue is optionally bounded which means it can optionally have an upper bound if desired. If no upper bound is specified, Integer.

What is blocking queue in multithreading?

A blocking queue is a queue that blocks when you try to dequeue from it and the queue is empty, or if you try to enqueue items to it and the queue is already full. A thread trying to dequeue from an empty queue is blocked until some other thread inserts an item into the queue.

How blocking queue is working What kind of problem can be solved by using blocking queue?

If a producer thread tries to put an element in a full BlockingQueue, it gets blocked and stays blocked until a consumer removes an element. Similarly, if a consumer thread tries to take an element from an empty BlockingQueue, it gets blocked and remains blocked until a producer adds an element.


2 Answers

You should continue to take() from the queue. You can use a poison pill to tell the worker to stop. For example:

private final Object POISON_PILL = new Object();  @Override public void run() {     //worker loop keeps taking en element from the queue as long as the producer is still running or as      //long as the queue is not empty:     while(isRunning) {         System.out.println("Consumer "+Thread.currentThread().getName()+" START");         try {             Object queueElement = inputQueue.take();             if(queueElement == POISON_PILL) {                  inputQueue.add(POISON_PILL);//notify other threads to stop                  return;             }             //process queueElement         } catch (Exception e) {             e.printStackTrace();         }     } }  //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void finish() {     //you can also clear here if you wanted     isRunning = false;     inputQueue.add(POISON_PILL); } 
like image 99
John Vint Avatar answered Oct 14 '22 19:10

John Vint


I'd send the workers a special work packet to signal that they should shut down:

public class ConsumerWorker implements Runnable{  private static final Produced DONE = new Produced();  private BlockingQueue<Produced> inputQueue;  public ConsumerWorker(BlockingQueue<Produced> inputQueue) {     this.inputQueue = inputQueue; }  @Override public void run() {     for (;;) {         try {             Produced item = inputQueue.take();             if (item == DONE) {                 inputQueue.add(item); // keep in the queue so all workers stop                 break;             }             // process `item`         } catch (Exception e) {             e.printStackTrace();         }     } } 

}

To stop the workers, simply add ConsumerWorker.DONE to the queue.

like image 34
NPE Avatar answered Oct 14 '22 17:10

NPE