
"Closing" a blocking queue

I'm using java.util.concurrent.BlockingQueue in a very simple producer-consumer scenario. For example, this pseudocode depicts the consumer part:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                // do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

So far so good. In the javadoc of the blocking queue I read:

A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.

Unfortunately, because of the generics in use and the nature of ComplexObject, it's not trivial to push a "poison object" into the queue. So this "common tactic" is not really convenient in my scenario.

My question is: what other good tactics/patterns can I use to "close" the queue?

Thank you!

Lachezar Balev asked Mar 21 '11 13:03


2 Answers

If you have a handle to the consumer thread, you can interrupt it. That stops the consumer, provided its loop checks the interrupted flag as in the code below (the while(true) loop in the question would keep spinning after the interrupt). I would not expect the producer to have this handle; it would probably have to call back to the program controller somehow to let it know it's done. Then the controller would interrupt the consumer thread.

You can always finish doing work before obeying the interrupt. For instance:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);
            } catch (InterruptedException e) {
                // Set interrupted flag so the loop guard sees it.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        // (Requires java.util.LinkedList; the list must be initialized
        // before it is passed to drainTo.)
        final LinkedList<ComplexObject> remainingObjects = new LinkedList<ComplexObject>();
        myBlockingQueue.drainTo(remainingObjects);
        for (ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

I would actually prefer that to somehow poisoning the queue anyway. If you want to kill the thread, ask the thread to kill itself.

(It's nice to see someone handling InterruptedException properly.)
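To make the controller side concrete, here is a minimal sketch of the whole round trip: a consumer loop like the one above, a producer, and a controller that interrupts the consumer once production is finished. The class name InterruptShutdownDemo and the method runOnce are made up for illustration, and String stands in for ComplexObject; treat it as one possible arrangement, not the only one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; String stands in for ComplexObject.
class InterruptShutdownDemo {

    /** Queues all items, interrupts the consumer, and returns what it processed. */
    static List<String> runOnce(List<String> items) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Written only by the consumer thread, read only after join().
        final List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    processed.add(queue.take());
                } catch (InterruptedException e) {
                    // Restore the flag so the loop guard ends the loop.
                    Thread.currentThread().interrupt();
                }
            }
            // Finish the work that was queued before the interrupt arrived.
            queue.drainTo(processed);
        });
        consumer.start();

        // Producer part: the queue is unbounded here, so add() cannot fail.
        for (String item : items) {
            queue.add(item);
        }

        // Controller part: production is done, ask the consumer to stop.
        consumer.interrupt();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumer", e);
        }
        return processed;
    }
}
```

Calling InterruptShutdownDemo.runOnce(Arrays.asList("a", "b", "c")) should return the same three items in order: whatever the consumer has not taken by the time the interrupt arrives is picked up by drainTo.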


There seems to be some contention about the handling of interruptions here. First, I would like everyone to read this article.

Now, with the understanding that no one actually read that, here's the deal. A thread receives an InterruptedException only from a blocking call: either it was blocked at the time of the interrupt, or it enters a blocking method while its interrupted flag is still set. In that case the flag is cleared when the exception is thrown, so Thread.interrupted() will return false. If the thread was not blocking, it will NOT receive this exception, and instead Thread.interrupted() will return true. Therefore, your loop guard should absolutely, no matter what, check the thread's interrupted status, or otherwise risk missing an interruption to the thread.

So, since you are checking the interrupted status no matter what, and you are forced to catch InterruptedException (and should be dealing with it even if you weren't forced to), you now have two code areas which handle the same event, thread interruption. One way to handle this is to normalize them into one condition, meaning either the boolean state check can throw the exception, or the exception can set the boolean state. I choose the latter.


Edit: Note that the static Thread#interrupted method clears the interrupted status of the current thread.
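The difference between the two status checks is easy to see in a small experiment. The sketch below is illustrative only (InterruptFlagDemo and its method names are invented here): it interrupts the current thread and then compares Thread.currentThread().isInterrupted() with the static Thread.interrupted(). The second method also shows that, with a LinkedBlockingQueue at least, take() throws when the flag is already set, and that the flag is cleared once the exception is thrown.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for the two ways of reading the interrupted status.
class InterruptFlagDemo {

    /** Returns { isInterrupted(), first interrupted(), second interrupted() }. */
    static boolean[] flagBehaviour() {
        Thread.currentThread().interrupt();
        boolean viaInstance = Thread.currentThread().isInterrupted(); // true, flag left set
        boolean firstStatic = Thread.interrupted();                   // true, and clears the flag
        boolean secondStatic = Thread.interrupted();                  // false, already cleared
        return new boolean[] { viaInstance, firstStatic, secondStatic };
    }

    /**
     * Returns true if take() throws because the flag was set beforehand
     * and the flag is cleared by the time the exception is caught.
     */
    static boolean takeThrowsWhenFlagAlreadySet() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("item"); // the queue is not empty, so take() would not need to wait
        Thread.currentThread().interrupt();
        try {
            queue.take();
            return false;
        } catch (InterruptedException e) {
            // The exception consumed the flag; this is why the catch block
            // in the consumer above has to set it again.
            return !Thread.currentThread().isInterrupted();
        }
    }
}
```

This is why the consumer's loop guard uses isInterrupted(): it only reads the flag, whereas a guard based on Thread.interrupted() would reset it as a side effect.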

jdmichal answered Sep 17 '22 20:09


Another idea for making this simple:

class ComplexObject implements QueueableComplexObject {
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override
    public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject {
    INSTANCE;

    @Override
    public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject {
    public ComplexObject asComplexObject();
}

Then use BlockingQueue<QueueableComplexObject> as the queue. When you wish to end the queue's processing, do queue.offer(NullComplexObject.INSTANCE). On the consumer side, do

boolean ok = true;
while (ok) {
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you want to swallow interrupts or propagate them
 * as in your original post
 */

No instanceof required, and you don't have to construct a fake ComplexObject which may be expensive/difficult depending on its implementation.
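For reference, here is a self-contained sketch of this pattern running end to end. All names in it (Payload, QueueablePayload, EndOfStream, SentinelShutdownDemo) are stand-ins invented for the example, with Payload playing the role of ComplexObject. It also assumes a single consumer and uses put() rather than offer() for the sentinel, since offer() on a bounded, full queue returns false and the end marker would be lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins for QueueableComplexObject / ComplexObject / NullComplexObject.
interface QueueablePayload {
    Payload asPayload();
}

class Payload implements QueueablePayload {
    final String name;
    Payload(String name) { this.name = name; }
    @Override public Payload asPayload() { return this; }
}

enum EndOfStream implements QueueablePayload {
    INSTANCE;
    @Override public Payload asPayload() { return null; } // null marks the end of the stream
}

class SentinelShutdownDemo {

    /** Queues one Payload per name, then the sentinel; returns the names the consumer saw. */
    static List<String> runOnce(List<String> names) {
        BlockingQueue<QueueablePayload> queue = new LinkedBlockingQueue<>();
        // Written only by the consumer thread, read only after join().
        List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Payload payload = queue.take().asPayload();
                    if (payload == null) {
                        break; // sentinel reached, no more items will arrive
                    }
                    processed.add(payload.name);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // propagate by restoring the flag
            }
        });
        consumer.start();

        try {
            for (String name : names) {
                queue.put(new Payload(name));
            }
            queue.put(EndOfStream.INSTANCE); // "close" the queue
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during demo", e);
        }
        return processed;
    }
}
```

With several consumers on the same queue, each consumer needs to see an end marker, so the producer would have to enqueue one sentinel per consumer (or each consumer would have to put the sentinel back before exiting).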

Jason S answered Sep 18 '22 20:09