
BlockingQueue and putAll

Does anybody know why Java's BlockingQueue does not have a putAll method? Is there a problem with such a method? Are there any good ways around this without having to completely re-implement BlockingQueue?

Jeff Wu asked Jul 02 '10 01:07


People also ask

What is a BlockingQueue?

BlockingQueue is a Java Queue that supports operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.

What is the difference between PriorityBlockingQueue and a normal BlockingQueue?

The BlockingQueue interface provides ways of blocking when adding to a full queue. A PriorityBlockingQueue, however, is unbounded. This means that it is never full, so it is always possible to add new elements.

Is BlockingQueue thread-safe?

BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.


2 Answers

for (Item item : items) {
    queue.put(item);
}

That's three lines; I'm not sure that counts as completely re-implementing BlockingQueue.

I imagine the API wants you to put items one by one so that threads waiting to read from the queue can start consuming right away, instead of waiting for you to finish adding the whole batch.
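If you need this in more than one place, the loop can be wrapped in a small static helper. This is only a sketch: the class name QueueUtil and the method putAll are made up for illustration and are not part of java.util.concurrent. Note that it is not atomic. Other producers can interleave their own elements, and if the thread is interrupted partway through, the items already added stay in the queue.

```java
import java.util.Collection;
import java.util.concurrent.BlockingQueue;

// Hypothetical utility class; nothing like this ships with the JDK.
final class QueueUtil {
    private QueueUtil() {}

    /**
     * Puts every element of items into queue, in iteration order,
     * blocking on each put() while the queue is full.
     * Not atomic: consumers may take elements while this is still running.
     */
    static <E> void putAll(BlockingQueue<? super E> queue, Collection<? extends E> items)
            throws InterruptedException {
        for (E item : items) {
            queue.put(item);
        }
    }
}
```

Usage would look like QueueUtil.putAll(queue, items), and it works with any BlockingQueue implementation since it only relies on put().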

bwawok answered Oct 02 '22 08:10


I found the same issue with ArrayBlockingQueue. We wanted some additional methods that were missing:

  • void putAll(Collection<? extends E> c) throws InterruptedException
  • int drainAtLeastOneTo(@OutputParam Collection<? super E> c) throws InterruptedException
  • int drainAtLeastOneTo(@OutputParam Collection<? super E> c, int maxElements) throws InterruptedException

Some people advocate using BlockingQueue<List<E>>, but that requires allocating a list for every batch, which you sometimes want to avoid. It also assumes that the producer and the consumer want to use the same "chunk" size.
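For reference, the BlockingQueue<List<E>> approach looks roughly like the sketch below. The class and method names are invented for the example. Each put() hands over a whole list, so the consumer always receives exactly the chunk the producer built, and every batch costs one list allocation.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical consumer-side helper for a queue whose elements are batches.
final class BatchQueueDemo {
    private BatchQueueDemo() {}

    /** Blocks until one batch is available, then returns the sum of its values. */
    static int sumNextBatch(BlockingQueue<List<Integer>> queue) throws InterruptedException {
        List<Integer> batch = queue.take(); // one take() yields the producer's whole chunk
        int sum = 0;
        for (int value : batch) {
            sum += value;
        }
        return sum;
    }
}
```

The producer side is simply queue.put(List.of(1, 2, 3)), or any other list it has built.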

Regarding draining: again, the chunk sizes of the producer and the consumer may not match. The producer might insert single items while the consumer works on batches. drainTo() does not block, even when the queue is empty, so drainAtLeastOneTo() was our solution.
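If you would rather stay on the BlockingQueue interface, similar behaviour can be approximated by combining take() with drainTo(). This is a sketch under that assumption, not the code we added to our copy of ArrayBlockingQueue, and the class name Drains is made up. The two calls are separate operations, so the batch is not taken atomically, but each caller is still guaranteed at least one element.

```java
import java.util.Collection;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper built only on the public BlockingQueue API.
final class Drains {
    private Drains() {}

    /**
     * Blocks until at least one element is available, then moves up to
     * maxElements elements (including the first) into out.
     * Returns the number of elements transferred.
     */
    static <E> int drainAtLeastOneTo(BlockingQueue<E> queue, Collection<? super E> out, int maxElements)
            throws InterruptedException {
        if (maxElements < 1) {
            throw new IllegalArgumentException("maxElements must be at least 1");
        }
        out.add(queue.take());                          // blocking part: wait for the first element
        return 1 + queue.drainTo(out, maxElements - 1); // non-blocking part: grab whatever else is there
    }
}
```

A consumer loop can then call Drains.drainAtLeastOneTo(queue, buffer, 100) and process the buffer as one batch, whatever chunk size the producer used.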

In the end, we copied the standard implementation of ArrayBlockingQueue and added the methods directly. A drawback is that you then need to work with a concrete type instead of the BlockingQueue interface.

You may also consider using the famed (infamous?) LMAX Disruptor, but the model is quite different from a standard BlockingQueue, as you do not control when items are consumed.

kevinarpe answered Oct 02 '22 07:10