
How to interrupt a BlockingQueue which is blocking on take()?

I have a class that takes objects from a BlockingQueue and processes them by calling take() in a continuous loop. At some point I know that no more objects will be added to the queue. How do I interrupt the take() method so that it stops blocking?

Here's the class that processes the objects:

import java.util.concurrent.BlockingQueue;

public class MyObjHandler implements Runnable {

    private final BlockingQueue<MyObj> queue;

    public MyObjHandler(BlockingQueue<MyObj> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            while (true) {
                MyObj obj = queue.take();
                // process obj here
                // ...
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag and let run() return
            Thread.currentThread().interrupt();
        }
    }
}

And here's the method that uses this class to process objects:

public void testHandler() throws InterruptedException {

    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);

    MyObjHandler handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
        queue.put(i.next());
    }

    // what code should go here to tell the handler
    // to stop waiting for more objects?
}
Asked by MCS on May 01 '09 at 17:05



2 Answers

If interrupting the thread is not an option, another approach is to place a "marker" or "command" object on the queue. MyObjHandler recognizes it as such and breaks out of the loop.
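A minimal sketch of that marker-object idea (often called a "poison pill"), in case it helps. The names `POISON_PILL`, `PoisonPillHandler`, `PoisonPillDemo` and the stand-in `MyObj` class are assumptions made up for illustration; they are not from the question. The handler compares each taken element against the marker by identity and leaves the loop when it sees it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the question's MyObj type.
class MyObj {
    final String name;
    MyObj(String name) { this.name = name; }
}

class PoisonPillHandler implements Runnable {
    // Marker instance (an assumption for this sketch); compared by identity, never processed.
    static final MyObj POISON_PILL = new MyObj("<poison pill>");

    private final BlockingQueue<MyObj> queue;
    private final AtomicInteger processed = new AtomicInteger();

    PoisonPillHandler(BlockingQueue<MyObj> queue) {
        this.queue = queue;
    }

    int processedCount() {
        return processed.get();
    }

    @Override
    public void run() {
        try {
            while (true) {
                MyObj obj = queue.take();
                if (obj == POISON_PILL) {
                    break; // producer signalled that no more objects are coming
                }
                processed.incrementAndGet(); // process obj here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class PoisonPillDemo {
    // Queues `count` objects followed by the marker, waits for the handler
    // to finish, and returns how many real objects it processed.
    static int runDemo(int count) {
        BlockingQueue<MyObj> queue = new ArrayBlockingQueue<>(100);
        PoisonPillHandler handler = new PoisonPillHandler(queue);
        Thread thread = new Thread(handler);
        thread.start();
        try {
            for (int i = 0; i < count; i++) {
                queue.put(new MyObj("obj-" + i));
            }
            queue.put(PoisonPillHandler.POISON_PILL); // tell the handler to stop
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handler.processedCount();
    }

    public static void main(String[] args) {
        System.out.println("processed = " + runDemo(5));
    }
}
```

Because the marker goes through the same FIFO queue as the real objects, everything queued before it is processed before the handler stops. With several consumer threads you would need one marker per consumer.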

Answered by Chris Thornhill on Sep 27 '22 at 20:09


Keep a reference to the handler's thread and interrupt it once everything has been queued:

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
}
thread.interrupt();

However, if you do this, the thread might be interrupted while there are still items in the queue waiting to be processed. You might want to consider using poll with a timeout instead of take, which allows the processing thread to time out and terminate once it has waited for a while with no new input.
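Here is a rough sketch of the poll variant, under some assumptions: `PollingHandler`, `PollTimeoutDemo`, the `idleTimeoutMillis` parameter and the stand-in `MyObj` class are hypothetical names, and the timeout value is arbitrary. `BlockingQueue.poll(long, TimeUnit)` returns null when the wait elapses with nothing available, and the handler treats that as the signal to stop.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the question's MyObj type.
class MyObj {
    final String name;
    MyObj(String name) { this.name = name; }
}

class PollingHandler implements Runnable {
    private final BlockingQueue<MyObj> queue;
    private final long idleTimeoutMillis; // assumed tuning knob, not from the question
    private final AtomicInteger processed = new AtomicInteger();

    PollingHandler(BlockingQueue<MyObj> queue, long idleTimeoutMillis) {
        this.queue = queue;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    int processedCount() {
        return processed.get();
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Waits up to the timeout; returns null if nothing arrived in time.
                MyObj obj = queue.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
                if (obj == null) {
                    break; // idle for too long: assume no more input is coming
                }
                processed.incrementAndGet(); // process obj here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class PollTimeoutDemo {
    // Fills the queue first (so the result does not depend on producer timing),
    // then lets the handler drain it and exit after one idle timeout.
    static int runDemo(int count, long idleTimeoutMillis) {
        BlockingQueue<MyObj> queue = new ArrayBlockingQueue<>(100);
        for (int i = 0; i < count; i++) {
            queue.add(new MyObj("obj-" + i)); // add() throws if full, so count must be <= 100
        }
        PollingHandler handler = new PollingHandler(queue, idleTimeoutMillis);
        Thread thread = new Thread(handler);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handler.processedCount();
    }

    public static void main(String[] args) {
        System.out.println("processed = " + runDemo(3, 100));
    }
}
```

The trade-off is that a producer that pauses for longer than the timeout will make the handler quit early, so the timeout has to be chosen with the producer's worst-case gap in mind.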

Answered by erickson on Sep 27 '22 at 21:09