I'm looking for a way to block until a BlockingQueue
is empty.
I know that, in a multithreaded environment, as long as there are producers putting items into the BlockingQueue
, there can be situations in which the queue becomes empty and a few nanoseconds later it is full of items.
But, if there's only one producer, then it may want to wait (and block) until the queue is empty after it has stopped putting items into the queue.
Java/Pseudocode:
// Producer code BlockingQueue queue = new BlockingQueue(); while (having some tasks to do) { queue.put(task); } queue.waitUntilEmpty(); // <-- how to do this? print("Done");
Do you have any idea?
EDIT: I know that wrapping BlockingQueue
and using an extra condition would do the trick, I'm just asking if there are some pre-made solutions and/or better alternatives.
Here we have a blockingQueue that has a capacity equal to 10. It means that when a producer tries to add an element to an already full queue, depending on a method that was used to add it (offer(), add() or put()), it will block until space for inserting object becomes available. Otherwise, the operations will fail.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
A simple solution using wait()
and notify()
:
// Producer: synchronized(queue) { while (!queue.isEmpty()) queue.wait(); //wait for the queue to become empty queue.put(); } //Consumer: synchronized(queue) { queue.get(); if (queue.isEmpty()) queue.notify(); // notify the producer }
I understand you could already have bunch of threads actively polling or taking the queue but I still feel not quite right about your flow/design.
The queue becomes empty doesn't mean the previously added tasks are finished, somes of the items could take ages to process, so it is not too useful to check for empty.
So what you should do is forget about the BlockingQueue
, you can use it as any other collections. Translate the items into a Collections
of Callable
and make use of the ExecutorService.invokeAll()
.
Collection<Item> queue = ... Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>(); for (Item item : queue) { tasks.add(new Callable<Result>() { @Override public Result call() throws Exception { // process the item ... return result; } }); } // look at the results, add timeout for invokeAll if necessary List<Future<Result>> results = executorService.invokeAll(tasks); // done
This approach will give you full control of how long your producer could wait and proper exception handling.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With