I have multiple BlockingQueues containing messages to be sent. Is it possible to have fewer consumers than queues? I don't want to loop over the queues and keep polling them (busy waiting) and I don't want a thread for every queue. Instead, I would like to have one thread that is awoken when a message is available on any of the queues.
concurrent. BlockingQueue , represents a queue which is thread safe to put elements into, and take elements out of from. In other words, multiple threads can be inserting and taking elements concurrently from a Java BlockingQueue , without any concurrency issues arising.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
Here we have a blockingQueue that has a capacity equal to 10. It means that when a producer tries to add an element to an already full queue, depending on a method that was used to add it (offer(), add() or put()), it will block until space for inserting object becomes available. Otherwise, the operations will fail.
Put() Implementation in Blocking Queue This implementation is very similar to enQueue() method. Once the capacity is reached, the thread is blocked or else it's a simple enQueue operation using LinkedList. Once the element is queued, we notify in case other waiting threads are blocked due to an empty queue.
The LinkedBlockingMultiQueue does what you are asking for. It does not allow the consumer to block on arbitrary BlockingQueues, but it is possible to create "sub queues" from a single "multi queue" and achieve the same effect. Producers offer in the sub queues, and consumers can block themselves polling the single multi queue, waiting for any element.
It also supports priorities, that is, taking elements from some queues before considering others.
Example:
LinkedBlockingMultiQueue<Int, String> q = new LinkedBlockingMultiQueue<>();
q.addSubQueue(1 /* key */, 10 /* priority */);
q.addSubQueue(2 /* key */, 10 /* priority */);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq1 = q.getSubQueue(1);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq2 = q.getSubQueue(2);
Then you can offer and poll:
sq1.offer("x1");
q.poll(); // "x1"
sq2.offer("x2");
q.poll(); // "x2"
Disclaimer: I am the author of the library.
One trick that you could do is to have a queue of queues. So what you'd do is have a single blocking queue which all threads subscribe to. Then when you enqueue something into one of your BlockingQueues, you also enqueue your blocking queue on this single queue. So you would have something like:
BlockingQueue<WorkItem> producers[] = new BlockingQueue<WorkItem>[NUM_PRODUCERS];
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new BlockingQueue<BlockingQueue<WorkItem>>();
Then when you get a new work item:
void addWorkItem(int queueIndex, WorkItem workItem) {
assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number";
//Note: You may want to make the two operations a single atomic operation
producers[queueIndex].add(workItem);
producerProducer.add(producers[queueIndex]);
}
Now your consumers can all block on the producerProducer. I am not sure how valuable this strategy would be, but it does accomplish what you want.
This can be solved using polymorphism, imagine you have two jobs called JobOne
and JobTwo
and you are going to consume these by a consumer. You should define an interface called jobs like:
interface Jobs {
...
}
And then you can implement you jobs by this interface like:
class JobOne implements Job {
...
}
class JobTwo implements Job {
...
}
Now you can use Jobs
for your blocking queue to add two kinds of jobs to consume.
BlockingQueue<Jobs> Jobs queue = new BlockingQueue<Jobs>();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With