
How to block until a BlockingQueue is empty?

I'm looking for a way to block until a BlockingQueue is empty.

I know that, in a multithreaded environment, as long as there are producers putting items into the BlockingQueue, there can be situations in which the queue becomes empty and a few nanoseconds later it is full of items.

But, if there's only one producer, then it may want to wait (and block) until the queue is empty after it has stopped putting items into the queue.

Java/Pseudocode:

    // Producer code
    BlockingQueue queue = new BlockingQueue();

    while (having some tasks to do) {
        queue.put(task);
    }

    queue.waitUntilEmpty(); // <-- how to do this?

    print("Done");

Do you have any idea?

EDIT: I know that wrapping BlockingQueue and using an extra condition would do the trick; I'm just asking whether there are pre-made solutions and/or better alternatives.

gd1 asked Mar 03 '13 10:03



2 Answers

A simple solution using wait() and notify():

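    // Producer: wait until the queue is empty, then add the next task
    synchronized (queue) {
        while (!queue.isEmpty()) {
            queue.wait(); // releases the monitor while waiting for the queue to become empty
        }
        queue.put(task);
    }

    // Consumer: take outside the monitor so that a blocked take()
    // cannot lock the producer out, then signal if the queue is now empty
    Task task = queue.take();
    synchronized (queue) {
        if (queue.isEmpty()) {
            queue.notify(); // notify the producer
        }
    }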
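For readers who want something they can compile, here is a minimal sketch of the same wait/notify idea packaged as a wrapper, roughly what the question's EDIT describes. The names `DrainableQueue`, `waitUntilEmpty` and `DrainDemo` are hypothetical and not part of `java.util.concurrent`; the sketch assumes a single producer and an unbounded `LinkedBlockingQueue`, so `put` never blocks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper (an assumption, not a JDK class): lets a producer
// block until consumers have removed every queued item.
class DrainableQueue<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Object lock = new Object();

    void put(T item) throws InterruptedException {
        queue.put(item); // unbounded queue, so this does not block
    }

    T take() throws InterruptedException {
        T item = queue.take(); // block outside the monitor
        synchronized (lock) {
            if (queue.isEmpty()) {
                lock.notifyAll(); // wake a producer waiting for "empty"
            }
        }
        return item;
    }

    void waitUntilEmpty() throws InterruptedException {
        synchronized (lock) {
            while (!queue.isEmpty()) { // loop guards against spurious wakeups
                lock.wait();
            }
        }
    }
}

class DrainDemo {
    // Puts `tasks` items, waits for the queue to drain, and returns
    // how many items the consumer thread handled.
    static int run(int tasks) throws InterruptedException {
        DrainableQueue<Integer> queue = new DrainableQueue<>();
        AtomicInteger consumed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // interrupted by the producer: stop consuming
            }
        });
        consumer.start();

        for (int i = 0; i < tasks; i++) {
            queue.put(i);
        }
        queue.waitUntilEmpty(); // returns once every item has been taken

        consumer.interrupt();
        consumer.join(); // let the consumer finish counting its last item
        return consumed.get();
    }
}
```

Note that `waitUntilEmpty()` only guarantees that every item has been taken, not that it has been processed; the demo joins the consumer thread before reading the counter for that reason.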
niculare answered Nov 10 '22 06:11


I understand that you may already have a number of threads actively polling or taking from the queue, but something about your flow/design still doesn't feel right to me.

An empty queue doesn't mean that the previously added tasks are finished; some of the items could take ages to process, so checking for emptiness is not very useful.

So forget about blocking on the BlockingQueue; you can treat it like any other collection. Translate the items into a Collection of Callable and use ExecutorService.invokeAll().

    Collection<Item> queue = ...
    Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>();

    for (Item item : queue) {
        tasks.add(new Callable<Result>() {

            @Override
            public Result call() throws Exception {
                // process the item ...

                return result;
            }
        });
    }

    // look at the results, add timeout for invokeAll if necessary
    List<Future<Result>> results = executorService.invokeAll(tasks);

    // done

This approach gives you full control over how long your producer waits, as well as proper exception handling.
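As a self-contained illustration of the invokeAll() approach, the sketch below may help. The class name `InvokeAllDemo`, the method `processAll`, the pool size of 4, and the "work" of squaring each integer are all assumptions made for the example; substitute your own item and result types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {
    // Hypothetical processing step: squares each item on a small thread pool.
    static List<Integer> processAll(List<Integer> items)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer item : items) {
                tasks.add(() -> item * item); // stand-in for real work
            }

            // invokeAll blocks until every task has completed and returns
            // the futures in the same order as the submitted tasks.
            List<Future<Integer>> futures = pool.invokeAll(tasks);

            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                // get() rethrows a task failure wrapped in ExecutionException
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown(); // stop accepting new tasks; all submitted work is done
        }
    }
}
```

Because invokeAll() returns only after all tasks are done, the caller knows the work is finished rather than merely dequeued. There is also an overload that takes a timeout and a TimeUnit if the producer should not wait indefinitely.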

Henry Wong answered Nov 10 '22 05:11