I have a multithreaded application with one producer thread and several consumer threads. The data is stored in a shared thread-safe collection and flushed to a database once there is sufficient data in the buffer.
From the javadocs -
BlockingQueue<E>
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
take()
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
My questions -
- Is there another collection that has an E[] take(int n) method? That is, BlockingQueue waits until a single element is available; what I want is for it to wait until 100 or 200 elements are available.
- Alternatively, is there another method I could use to address the problem without polling?
I think the only way is to either extend some implementation of BlockingQueue or create some kind of utility method using take:
public <E> void take(BlockingQueue<E> queue, List<E> to, int max)
        throws InterruptedException {
    for (int i = 0; i < max; i++)
        to.add(queue.take());
}
The drainTo method isn't exactly what you're looking for, since it only removes the elements that are already available and does not block, but would it serve your purpose?
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection, int)
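To make the non-blocking behaviour of drainTo concrete, here is a minimal sketch. The class name DrainToDemo and the helper drainAvailable are made up for illustration; only BlockingQueue.drainTo(Collection, int) comes from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the names are illustrative only.
final class DrainToDemo {
    // Moves up to max elements that are in the queue right now; never waits.
    static <E> List<E> drainAvailable(BlockingQueue<E> queue, int max) {
        List<E> batch = new ArrayList<>();
        queue.drainTo(batch, max); // returns the number of elements transferred
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.add(1);
        queue.add(2);
        // Only two elements are present, so the batch holds two, not 100.
        System.out.println(drainAvailable(queue, 100)); // prints [1, 2]
    }
}
```

So drainTo alone gives you "up to n", not "wait for n"; an empty queue simply yields an empty batch.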
EDIT
You could implement a slightly more performant blocking batch take, one that waits until min elements have been collected, using a combination of take and drainTo:
public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException
{
    int drained = 0;
    do
    {
        if (queue.size() > 0)
            // grab whatever is ready, but never more than is still needed
            drained += queue.drainTo(list, min - drained);
        else
        {
            // nothing ready: block until a single element arrives
            list.add(queue.take());
            drained++;
        }
    }
    while (drained < min);
}
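For what it's worth, here is a rough sketch of how a consumer might use a helper of this kind to flush in fixed-size batches. It is a variant rather than the code above: it always blocks on take first and then drains the rest. The names BatchConsumerSketch and takeBatch are hypothetical, and the "flush" is just a print standing in for the database write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; names are illustrative, not part of any library.
final class BatchConsumerSketch {
    // Blocks until exactly min elements have been collected, then returns them.
    static <E> List<E> takeBatch(BlockingQueue<E> queue, int min) throws InterruptedException {
        List<E> batch = new ArrayList<>(min);
        while (batch.size() < min) {
            batch.add(queue.take()); // blocks while the queue is empty
            int remaining = min - batch.size();
            if (remaining > 0) {
                queue.drainTo(batch, remaining); // takes what is ready, without waiting
            }
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 200; i++) {
                queue.add(i);
            }
        });
        producer.start();
        for (int n = 0; n < 2; n++) {
            List<Integer> batch = takeBatch(queue, 100);
            // Stand-in for the real database flush.
            System.out.println("flushing " + batch.size() + " elements");
        }
        producer.join();
    }
}
```

One caveat: with several consumers calling this at once, each can end up holding a partial batch while waiting for more, so elements may sit unflushed longer than you expect.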
I am not sure whether there's a class in the standard library with a take(int n)-style method, but you should be able to wrap the default BlockingQueue to add that function without too much hassle, don't you think?
An alternative would be to trigger the action at the point where you put elements into the collection: once the size reaches a threshold you set, that put triggers the flush.
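A minimal sketch of that threshold idea, assuming the flush can be expressed as a callback. ThresholdBuffer and its flusher parameter are hypothetical names; in a real application the callback would be the database write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical class illustrating threshold-triggered flushing.
final class ThresholdBuffer<T> {
    private final int threshold;
    private final Consumer<List<T>> flusher; // assumed stand-in for the database write
    private List<T> buffer = new ArrayList<>();

    ThresholdBuffer(int threshold, Consumer<List<T>> flusher) {
        this.threshold = threshold;
        this.flusher = flusher;
    }

    void add(T item) {
        List<T> full = null;
        synchronized (this) {
            buffer.add(item);
            if (buffer.size() >= threshold) {
                // Swap in a fresh buffer so other threads can keep adding.
                full = buffer;
                buffer = new ArrayList<>();
            }
        }
        if (full != null) {
            flusher.accept(full); // runs on the adding thread, outside the lock
        }
    }
}
```

Usage would look something like new ThresholdBuffer<Integer>(100, batch -> save(batch)), where save is whatever writes to your database. Note the flush runs on the thread that crossed the threshold; if that is too slow for your producer, you could hand the full list to an executor instead.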
So this should be a thread-safe queue that lets you block on taking an arbitrary number of elements. More eyes to verify that the threading code is correct would be welcome.
package mybq;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ChunkyBlockingQueue<T> {
    protected final LinkedList<T> q = new LinkedList<T>();
    protected final Object lock = new Object();

    public void add(T t) {
        synchronized (lock) {
            q.add(t);
            lock.notifyAll(); // wake takers so they can re-check the size
        }
    }

    // Blocks until numElements are queued, then removes and returns them in order.
    public List<T> take(int numElements) throws InterruptedException {
        synchronized (lock) {
            while (q.size() < numElements) {
                // wait() releases the lock while blocked; an interrupt
                // propagates to the caller instead of spinning in this loop
                lock.wait();
            }
            List<T> head = q.subList(0, numElements);
            ArrayList<T> l = new ArrayList<T>(head);
            head.clear(); // removes the taken elements from q
            return l;
        }
    }
}
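If you would rather use the java.util.concurrent.locks primitives than wait/notifyAll, the same idea can be sketched roughly as follows. ChunkyQueueSketch is a hypothetical name; this is an untuned illustration under the assumption that null elements are never added (ArrayDeque rejects them).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant built on Lock/Condition instead of intrinsic locks.
final class ChunkyQueueSketch<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition grown = lock.newCondition();

    void add(T t) {
        lock.lock();
        try {
            items.add(t);
            grown.signalAll(); // let every waiting taker re-check the size
        } finally {
            lock.unlock();
        }
    }

    // Blocks until n elements are queued, then removes and returns them in order.
    List<T> take(int n) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() < n) {
                grown.await(); // releases the lock while waiting
            }
            List<T> out = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                out.add(items.poll());
            }
            return out;
        } finally {
            lock.unlock();
        }
    }
}
```

Here take declares InterruptedException so that a blocked consumer can be shut down cleanly by interrupting its thread.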