Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java BlockingQueue with batching?

Tags:

I am interested in a data structure identical to the Java BlockingQueue, with the exception that it must be able to batch objects in the queue. In other words, I would like the producer to be able to put objects into the queue, but have the consumer block on take() untill the queue reaches a certain size (the batch size).

Then, once the queue has reached the batch size, the producer must block on put() untill the consumer has consumed all of the elements in the queue (in which case the producer will start producing again and the consumer block untill the batch is reached again).

Does a similar data structure exist? Or should I write it (which I don't mind), I just don't want to waste my time if there is something out there.


UPDATE

Maybe to clarify things a bit:

The situation will always be as follows. There can be multiple producers adding items to the queue, but there will never be more than one consumer taking items from the queue.

Now, the problem is that there are multiple of these setups in parallel and serial. In other words, producers produce items for multiple queues, while consumers in their own right can also be producers. This can be more easily thought of as a directed graph of producers, consumer-producers, and finally consumers.

The reason that producers should block until the queues are empty (@Peter Lawrey) is because each of these will be running in a thread. If you leave them to simply produce as space becomes available, you will end up with a situation where you have too many threads trying to process too many things at once.

Maybe coupling this with an execution service could solve the problem?

like image 392
Nico Huysamen Avatar asked Feb 27 '12 14:02

Nico Huysamen


People also ask

What is BlockingQueue in Java with example?

BlockingQueue is a java Queue that support operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.

Is Java BlockingQueue thread safe?

BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.

How is BlockingQueue implemented in Java?

Put() Implementation in Blocking Queue This implementation is very similar to enQueue() method. Once the capacity is reached, the thread is blocked or else it's a simple enQueue operation using LinkedList. Once the element is queued, we notify in case other waiting threads are blocked due to an empty queue.

What are the consumer methods available for a BlockingQueue?

Each consumer will take an element from a BlockingQueue using take() method so it will block until there is an element in a queue. After taking an Integer from a queue it checks if the message is a poison pill, if yes then execution of a thread is finished.


1 Answers

I would suggest you use BlockingQueue.drainTo(Collection, int). You can use it with take() to ensure you get a minimum number of elements.

The advantage of using this approach is that your batch size grows dynamically with the workload and the producer doesn't have to block when the consumer is busy. i.e. it self optimises for latency and throughput.


To implement exactly as asked (which I think is a bad idea) you can use a SynchronousQueue with a busy consuming thread.

i.e. the consuming thread does a

 list.clear();
 while(list.size() < required) list.add(queue.take());
 // process list.

The producer will block when ever the consumer is busy.

like image 82
Peter Lawrey Avatar answered Nov 10 '22 02:11

Peter Lawrey