Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to immediately release threads waiting on a BlockingQueue

Consider a BlockingQueue and a few threads waiting on poll(long, TimeUnit) (possibly also on on take()).

Now the queue is empty and it is desired to notify the waiting threads that they can stop waiting. The expected behaviour is to have either null returned or the declared InterruptedException thrown.

Object.notify() won't work for LinkedBlockingQueue as the threads are waiting on an internal lock.

Any straightforward way?

like image 411
Joel Shemtov Avatar asked Jul 22 '10 07:07

Joel Shemtov


2 Answers

Javadoc for the BlockingQueue suggests a good way:

A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.

like image 194
unbeli Avatar answered Sep 17 '22 22:09

unbeli


The conventional way is to interrupt the threads, but this of course requires that they handle interruptions properly.

This means to catch and handle InterruptedExceptions properly around blocking methods, and to check (and act upon) the interrupted flag regularly otherwise.

There is nothing in the API or language specification that ties interruption to any specific cancellation semantics, but in practice, using interruption for anything but cancellation is fragile and difficult to sustain in larger applications. [...]

Interruption is usually the most sensible way to implement cancellation.

Says Java Concurrency in Practice in section 7.1.1. An example of handling interruption properly, from same (this is a producer thread, not a consumer, but that difference is negligible in the current context):

class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /*  Allow thread to exit  */
        }
    }
    public void cancel() { interrupt(); }
}

An alternative solution would be to set the timeout parameter of poll reasonably low, so that the thread wakes up regularly and can notice interruptions quick enough. Still I believe it is always good practice to handle InterruptedException explicitly according to your specific thread cancellation policy.

like image 28
Péter Török Avatar answered Sep 19 '22 22:09

Péter Török