
Data buffering in a multithreaded Java application

I have a multithreaded application with one producer thread and several consumer threads. The data is stored in a shared thread-safe collection and flushed to a database once the buffer holds enough data.

From the Javadocs:

BlockingQueue<E>

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

take()

Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

My questions:

  1. Is there another collection that has an E[] take(int n) method? BlockingQueue's take() waits until a single element is available; what I want is for it to wait until 100 or 200 elements are available.
  2. Alternatively, is there another method I could use to address the problem without polling?
Kshitiz Sharma asked Jun 07 '12 10:06


4 Answers

I think the only way is to either extend some implementation of BlockingQueue or create some kind of utility method using take:

// Blocks until exactly max elements have been moved from queue into to.
public <E> void take(BlockingQueue<E> queue, List<E> to, int max)
        throws InterruptedException {

    for (int i = 0; i < max; i++)
        to.add(queue.take());
}
dacwe answered Oct 17 '22 12:10


The drainTo method isn't exactly what you're looking for, but would it serve your purpose?

http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection, int)
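To make the difference from the requested take(int n) concrete: drainTo never blocks. It moves whatever is currently queued, up to the given maximum, and returns how many elements it moved. A minimal sketch, where the class DrainToDemo and the helper drainAvailable are made-up names for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainToDemo {
    // Hypothetical helper: moves at most max queued elements into a new list.
    static <E> List<E> drainAvailable(BlockingQueue<E> queue, int max) {
        List<E> batch = new ArrayList<>();
        queue.drainTo(batch, max); // returns immediately, even if the queue is empty
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.add(1);
        queue.add(2);
        queue.add(3);

        // Only three elements are queued, so the batch holds three, not 100.
        System.out.println(drainAvailable(queue, 100));
        // The queue is now empty; the call still returns at once with an empty list.
        System.out.println(drainAvailable(queue, 100));
    }
}
```

So drainTo alone gives "up to n" semantics; waiting for n elements needs a blocking call such as take() around it.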

EDIT

You could implement a slightly more performant blocking batch take, one that waits until min elements have been collected, using a combination of take and drainTo:

public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException
{
  int drained = 0;
  do 
  {
    if (queue.size() > 0)
      drained += queue.drainTo(list, min - drained);
    else
    {
      list.add(queue.take());
      drained++;
    }
  }
  while (drained < min);
}
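For illustration, here is how a helper of this kind might be used from a consumer. This is only a sketch under assumptions: takeBatch is a hypothetical name for a variant of the method above that returns the batch, the batch size of 100 comes from the question, and the println stands in for the database write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BatchConsumerDemo {
    // Hypothetical variant of the method above: blocks until exactly
    // batchSize elements have been moved from the queue into the result.
    static <E> List<E> takeBatch(BlockingQueue<E> queue, int batchSize)
            throws InterruptedException {
        List<E> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize) {
            batch.add(queue.take());                        // wait for at least one element
            queue.drainTo(batch, batchSize - batch.size()); // then grab what is already queued
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        // Producer: adds 250 items from another thread.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 250; i++) {
                queue.add(i);
            }
        });
        producer.start();

        // Consumer: two full batches of 100; the remaining 50 items stay queued.
        for (int b = 0; b < 2; b++) {
            List<Integer> batch = takeBatch(queue, 100);
            System.out.println("flushing " + batch.size() + " items"); // stand-in for the database write
        }
        producer.join();
        System.out.println("left in queue: " + queue.size());
    }
}
```

One caveat with this approach: a partially filled batch lives in the consumer's local list while it waits, so if production stops, those elements are never flushed. With several consumers, each may also hold part of what would otherwise be one full batch.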
SimonC answered Oct 17 '22 13:10


I am not sure whether the standard library has a class with a take(int n)-style method, but you should be able to wrap a standard BlockingQueue implementation to add that function without too much hassle.

An alternative would be to trigger the flush at the point where you put elements into the collection: once the size reaches a threshold you set, that put triggers the flush.
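A minimal sketch of that threshold idea, assuming a hypothetical ThresholdBuffer class whose flusher callback stands in for the database write (none of these names come from the standard library):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ThresholdBuffer<T> {
    private final int threshold;
    private final Consumer<List<T>> flusher; // e.g. a batch insert into the database
    private List<T> buffer = new ArrayList<>();

    ThresholdBuffer(int threshold, Consumer<List<T>> flusher) {
        this.threshold = threshold;
        this.flusher = flusher;
    }

    // Called by the thread(s) adding data. The add that reaches the
    // threshold swaps in a fresh buffer and hands the full one to the flusher.
    void add(T item) {
        List<T> full = null;
        synchronized (this) {
            buffer.add(item);
            if (buffer.size() >= threshold) {
                full = buffer;
                buffer = new ArrayList<>();
            }
        }
        if (full != null) {
            flusher.accept(full); // runs outside the lock so other adders are not blocked
        }
    }
}
```

In this sketch the flush runs on whichever thread performed the triggering add. If the database write is slow, handing the full list to an ExecutorService instead would keep that thread from stalling.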

posdef answered Oct 17 '22 13:10


This should be a thread-safe queue that lets you block until an arbitrary number of elements can be taken. More eyes to verify that the threading code is correct would be welcome.

package mybq;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ChunkyBlockingQueue<T> {
    protected final LinkedList<T> q = new LinkedList<T>();
    protected final Object lock = new Object();

    public void add(T t) {
        synchronized (lock) {
            q.add(t);
            lock.notifyAll();
        }
    }

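    public List<T> take(int numElements) throws InterruptedException {
        synchronized (lock) {
            // Let InterruptedException propagate: catching it and re-setting the
            // interrupt flag inside the loop would make the next wait() throw
            // immediately, so the thread would spin instead of blocking.
            while (q.size() < numElements) {
                lock.wait();
            }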
            ArrayList<T> l = new ArrayList<T>(numElements);
            l.addAll(q.subList(0, numElements));
            q.subList(0, numElements).clear();
            return l;
        }
    }
}
Zarkonnen answered Oct 17 '22 14:10