Does anybody know why java's BlockingQueue does not have a putAll method? Is there a problem with such a method? Any good ways around this problem without having to completely re-implement BlockingQueue?
BlockingQueue is a java Queue that support operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.
It's also worth mentioning that the BlockingQueue interface also provides us with ways of blocking when adding to full queues. However, a PriorityBlockingQueue is unbounded. This means that it will never be full, thus it will always possible to add new elements.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
for (Item item : items) {
queue.put(item);
}
3 lines, not sure thats totally re-implementing blocking queue.
I imagine it wants you to put 1 by 1 in case threads are waiting to read from it, they aren't waiting for you to finish reading them all.
I found the same issue with ArrayBlockingQueue
. We wanted some additional methods that were missing:
void putAll(Collection<? extends E> c)
throws InterruptedException
int drainAtLeastOneTo(@OutputParam Collection<? super E> c)
throws InterruptedException
int drainAtLeastOneTo(@OutputParam Collection<? super E> c, int maxElements)
throws InterruptedException
Some people advocate to use BlockingQueue<List<E>>
, but that requires malloc'ing lists. Sometimes you want to avoid it. Also, that assumes you want producer and consumer to use the same "chunk" sizes.
Regarding draining: Again, you may want to have a mismatch between chunk size of producer and consumer. So producer might insert single items, but consumer works on batches. drainTo()
does not block, so drainAtLeastOneTo()
was a solution.
In the end, we copied the default impl of ArrayBlockingQueue
and added the methods directly. Again, a drawback is that you need to operate on a concrete type, instead of interface BlockingQueue
.
You may also consider using the famed (infamous?) LMAX Disruptor
, but the model is quite different from a standard BlockingQueue
, as you do not control when items are consumed.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With