I am interested in a data structure identical to the Java BlockingQueue, except that it must be able to batch objects in the queue. In other words, I would like the producer to be able to put objects into the queue, but have the consumer block on take()
until the queue reaches a certain size (the batch size).
Then, once the queue has reached the batch size, the producer must block on put()
until the consumer has consumed all of the elements in the queue (at which point the producer starts producing again and the consumer blocks until the batch size is reached again).
Does a similar data structure exist? Or should I write it (which I don't mind)? I just don't want to waste my time if there is something out there.
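To make the semantics concrete, here is roughly what I have in mind (just a sketch to illustrate the behaviour; the class and method names are mine and the code is untested):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BatchingQueue<E> {
    private final int batchSize;
    private final Queue<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition batchReady = lock.newCondition(); // signalled when size == batchSize
    private final Condition drained = lock.newCondition();    // signalled when the batch is consumed

    public BatchingQueue(int batchSize) {
        this.batchSize = batchSize;
    }

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() >= batchSize)   // batch full: wait until the consumer drains it
                drained.await();
            items.add(e);
            if (items.size() == batchSize)
                batchReady.signal();            // wake the single consumer
        } finally {
            lock.unlock();
        }
    }

    public List<E> takeBatch() throws InterruptedException {
        lock.lock();
        try {
            while (items.size() < batchSize)    // block until a full batch is available
                batchReady.await();
            List<E> batch = new ArrayList<>(items);
            items.clear();
            drained.signalAll();                // let blocked producers continue
            return batch;
        } finally {
            lock.unlock();
        }
    }
}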
UPDATE
Maybe to clarify things a bit:
The situation will always be as follows. There can be multiple producers adding items to the queue, but there will never be more than one consumer taking items from the queue.
Now, the problem is that there are several of these setups running in parallel and in series. In other words, producers produce items for multiple queues, while consumers can, in their own right, also be producers. This can be more easily thought of as a directed graph of producers, consumer-producers, and finally consumers.
The reason that producers should block until the queues are empty (@Peter Lawrey) is that each of these will be running in its own thread. If you leave them to simply produce as space becomes available, you end up with too many threads trying to process too many things at once.
Maybe coupling this with an execution service could solve the problem?
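For what it's worth, the kind of wiring I am imagining looks something like the sketch below (purely illustrative; the stage logic, queue capacities and names are all made up):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipelineSketch {
    public static void main(String[] args) {
        BlockingQueue<Integer> produced = new ArrayBlockingQueue<>(100);
        BlockingQueue<Integer> transformed = new ArrayBlockingQueue<>(100);
        ExecutorService pool = Executors.newFixedThreadPool(3); // caps how many stages run at once

        // producer stage
        pool.submit(() -> {
            try {
                for (int i = 0; i < 1_000; i++) produced.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // consumer-producer stage: consumes one queue, feeds the next
        pool.submit(() -> {
            try {
                while (true) transformed.put(produced.take() * 2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // final consumer stage
        pool.submit(() -> {
            try {
                while (true) System.out.println(transformed.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}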
BlockingQueue is a Java Queue that supports operations that wait for the queue to become non-empty when retrieving or removing an element, and wait for space to become available in the queue when adding an element.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
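For example, put() blocks when a bounded queue is full and take() blocks when it is empty. A minimal illustration of those semantics (not from the original post, just an assumed demo):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); // bounded: capacity 2

        new Thread(() -> {
            try {
                queue.put("a");
                queue.put("b");
                queue.put("c"); // blocks here until the consumer takes an element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        Thread.sleep(100);                // give the producer time to fill the queue
        System.out.println(queue.take()); // frees a slot, unblocking the producer
        System.out.println(queue.take());
        System.out.println(queue.take()); // blocks until "c" has been put
    }
}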
The put() implementation in a blocking queue is very similar to an enQueue() method: once the capacity is reached, the thread is blocked; otherwise it's a simple enqueue operation using a LinkedList. Once the element is queued, we notify in case other waiting threads are blocked on an empty queue.
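A hand-rolled put()/take() along those lines might look like the following sketch (the class name, capacity handling and use of notifyAll() are my assumptions):

import java.util.LinkedList;

public class SimpleBlockingQueue<E> {
    private final LinkedList<E> queue = new LinkedList<>();
    private final int capacity;

    public SimpleBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(E e) throws InterruptedException {
        while (queue.size() == capacity)
            wait();               // block while the queue is full
        queue.addLast(e);         // otherwise a plain enqueue on the LinkedList
        notifyAll();              // wake any consumers blocked on an empty queue
    }

    public synchronized E take() throws InterruptedException {
        while (queue.isEmpty())
            wait();               // block while the queue is empty
        E e = queue.removeFirst();
        notifyAll();              // wake any producers blocked on a full queue
        return e;
    }
}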
Each consumer takes an element from the BlockingQueue using the take() method, so it blocks until there is an element in the queue. After taking an Integer from the queue, it checks whether the message is a poison pill; if so, the thread finishes its execution.
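A consumer loop along those lines could look like this sketch (POISON_PILL is an assumed sentinel value, not something defined in the original post):

import java.util.concurrent.BlockingQueue;

public class PoisonPillConsumer implements Runnable {
    static final Integer POISON_PILL = Integer.MIN_VALUE; // assumed sentinel value

    private final BlockingQueue<Integer> queue;

    PoisonPillConsumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer message = queue.take();  // blocks until an element is available
                if (POISON_PILL.equals(message)) // sentinel: stop this consumer
                    return;
                System.out.println("consumed " + message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}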
I would suggest you use BlockingQueue.drainTo(Collection, int). You can use it with take() to ensure you get a minimum number of elements.
The advantage of this approach is that your batch size grows dynamically with the workload and the producer doesn't have to block when the consumer is busy, i.e. it self-optimises for latency and throughput.
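Something along these lines should work (a sketch; nextBatch and maxBatch are names I am assuming):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

public class BatchConsumer {
    // Returns a batch of at least 1 and at most maxBatch elements.
    static <E> List<E> nextBatch(BlockingQueue<E> queue, int maxBatch) throws InterruptedException {
        List<E> batch = new ArrayList<>();
        batch.add(queue.take());            // block until at least one element is available
        queue.drainTo(batch, maxBatch - 1); // then drain whatever is already queued, up to the limit
        return batch;
    }
}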
To implement exactly as asked (which I think is a bad idea) you can use a SynchronousQueue with a busy consuming thread.
i.e. the consuming thread does a
list.clear();
// take() blocks until an element is available, so this loop waits
// until the batch has grown to the required size
while (list.size() < required)
    list.add(queue.take());
// process list
The producer will block whenever the consumer is busy.
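Put together, the whole arrangement might look something like this (illustrative only; the batch size, item counts and class name are my assumptions):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

public class SynchronousBatching {
    public static void main(String[] args) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(); // no internal capacity
        int required = 5;

        // Producer: each put() blocks until the consumer's take() accepts it,
        // so the producer is paused whenever the consumer is busy processing.
        new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) queue.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // Busy consumer: gathers `required` elements, then processes them
        // (runs until interrupted).
        new Thread(() -> {
            List<Integer> list = new ArrayList<>();
            try {
                while (true) {
                    list.clear();
                    while (list.size() < required) list.add(queue.take());
                    System.out.println("processing batch " + list);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}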