I have a class that takes objects from a BlockingQueue
and processes them by calling take()
in a continuous loop. At some point I know that no more objects will be added to the queue. How do I interrupt the take()
method so that it stops blocking?
Here's the class that processes the objects:
public class MyObjHandler implements Runnable { private final BlockingQueue<MyObj> queue; public class MyObjHandler(BlockingQueue queue) { this.queue = queue; } public void run() { try { while (true) { MyObj obj = queue.take(); // process obj here // ... } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
And here's the method that uses this class to process objects:
public void testHandler() { BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjectHandler handler = new MyObjectHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler // to stop waiting for more objects? }
A blocked thread can be interrupted by calling the interrupt() method of Thread class. This interrupt is a pure Java mechanism and is neither CPU nor operating system level interrupt. The interrupt() method does not interrupt a running thread i.e. thread which is in RUNNABLE state.
concurrent. BlockingQueue , represents a queue which is thread safe to put elements into, and take elements out of from. In other words, multiple threads can be inserting and taking elements concurrently from a Java BlockingQueue , without any concurrency issues arising.
If interrupting the thread is not an option, another is to place a "marker" or "command" object on the queue that would be recognized as such by MyObjHandler and break out of the loop.
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjectHandler handler = new MyObjectHandler(queue); Thread thread = new Thread(handler); thread.start(); for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } thread.interrupt();
However, if you do this, the thread might be interrupted while there are still items in the queue, waiting to be processed. You might want to consider using poll
instead of take
, which will allow the processing thread to timeout and terminate when it has waited for a while with no new input.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With