Assume there is 1 producer P and 2 consumers, C1 and C2, and 2 queues, Q1 and Q2, each with a fixed capacity.
P produces items and puts them into Q1 and Q2 alternately. Each item is produced for a specific consumer and cannot be consumed by the other one. How can I implement the following in Java: after I start the 3 threads, if Q1 is empty, thread C1 blocks until it is notified that there is something in Q1, and likewise for C2 and Q2. P blocks only when both Q1 and Q2 are full, until it is notified that either Q1 or Q2 is no longer full.
I was thinking of using BlockingQueue, which blocks a consumer when its queue is empty. The problem is that the producer will be blocked as soon as either queue is full, not only when both are. Is there a data structure in Java that solves this problem?
Update
I've come up with a solution myself, but I'm not sure whether it is efficient. We can still have 2 BlockingQueues. When a consumer takes an item from its queue, it uses BlockingQueue.take(), so it blocks when there is no item in the queue. When the producer adds an item to either queue, it uses BlockingQueue.offer(), so it is never blocked by this operation and gets 'false' if the queue is full. In addition, we keep an AtomicInteger to indicate the number of queues that are not full. Every time producer P tries to put an item in a queue and gets false back, we decrease the AtomicInteger by 1. When it reaches 0, the producer calls AtomicInteger.wait(). Every time a consumer takes an item from its queue, it also examines the AtomicInteger. When it is 0, the consumer increases it by 1 and calls AtomicInteger.notify().
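A minimal sketch of this idea, under stated assumptions rather than a definitive implementation. It keeps the two BlockingQueues with take() and offer(), but swaps the AtomicInteger counter for a plain lock object: wait() and notify() may only be called while holding the object's monitor, so here the producer retries offer() inside a synchronized block and each consumer notifies inside one after take(). The class name TwoQueueDemo, the methods produce and consume, and the String item format are made up for illustration, and the sketch assumes a single producer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class TwoQueueDemo {
    // Monitor the producer waits on while both queues are full.
    private final Object lock = new Object();
    private final List<BlockingQueue<String>> queues = new ArrayList<>();
    private int next = 0; // index of the queue to try first (alternates)

    TwoQueueDemo(int capacity) {
        queues.add(new ArrayBlockingQueue<String>(capacity)); // Q1, read by C1
        queues.add(new ArrayBlockingQueue<String>(capacity)); // Q2, read by C2
    }

    // Producer side: blocks only while BOTH queues are full.
    // Returns the index (0 or 1) of the queue that accepted the item.
    int produce(int seq) throws InterruptedException {
        synchronized (lock) {
            while (true) {
                for (int k = 0; k < 2; k++) {
                    int i = (next + k) % 2;
                    // The item is created for consumer i only.
                    if (queues.get(i).offer("C" + (i + 1) + "-item-" + seq)) {
                        next = (i + 1) % 2; // start with the other queue next time
                        return i;
                    }
                }
                lock.wait(); // both offers failed: sleep until a consumer notifies
            }
        }
    }

    // Consumer side: take() blocks while queue i is empty.
    String consume(int i) throws InterruptedException {
        String item = queues.get(i).take();
        synchronized (lock) {
            lock.notifyAll(); // a slot was freed, wake the producer if it waits
        }
        return item;
    }

    public static void main(String[] args) throws Exception {
        TwoQueueDemo demo = new TwoQueueDemo(2);
        for (int c = 0; c < 2; c++) {
            final int i = c;
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        System.out.println("C" + (i + 1) + " got " + demo.consume(i));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true); // let the JVM exit once main returns
            t.start();
        }
        for (int seq = 0; seq < 10; seq++) {
            demo.produce(seq);
        }
        Thread.sleep(200); // give the consumers time to drain the queues
    }
}
```

Because the producer holds the lock from its failed offer() calls until it enters wait(), a consumer's notifyAll() cannot slip in between the two, which is the lost-wakeup problem an unsynchronized counter check would have.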
Please let me know whether this solution makes sense.
Thanks a lot!
Support for multiple-consumer queues is a Message Queue feature (the JMS specification defines messaging behavior in the case of only one consumer accessing a queue). When multiple consumers access a queue, the load-balancing among them takes into account each consumer's capacity and message processing rate.
Solution: The producer either goes to sleep or discards data if the buffer is full. The next time the consumer removes an item from the buffer, it notifies the producer, which starts filling the buffer again. In the same manner, the consumer can go to sleep if it finds the buffer empty; the next time the producer adds an item, it wakes the sleeping consumer.
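The sleep-and-notify scheme described above can be sketched for a single bounded buffer with Java's built-in monitors. This is only an illustration: the class name BoundedBuffer and its methods are hypothetical, and it implements the "go to sleep" variant, not the "discard data" one.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Producer: sleeps while the buffer is full.
    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // releases the monitor until a consumer notifies
        }
        items.addLast(item);
        notifyAll(); // wake consumers that found the buffer empty
    }

    // Consumer: sleeps while the buffer is empty.
    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.removeFirst();
        notifyAll(); // wake a producer that found the buffer full
        return item;
    }

    synchronized int size() {
        return items.size();
    }

    public static void main(String[] args) throws Exception {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread consumer = new Thread(() -> {
            try {
                for (int n = 0; n < 5; n++) {
                    System.out.println("took " + buffer.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int n = 0; n < 5; n++) {
            buffer.put(n); // blocks whenever two items are already queued
        }
        consumer.join();
    }
}
```

The wait() calls sit in while loops rather than if statements so that a thread rechecks the condition after waking, which guards against spurious wakeups and against another thread changing the buffer first.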
Yes, we can have multiple producers for a single queue. Multiple producers can also publish messages at the same time.
Re: Multiple producer one consumer
Yes, you can.
Have you considered a Striped Executor Service? It would let you solve your problem and put your consumers into a pool, which would be much more efficient.