
Multiple blocking queues, single consumer

I have multiple BlockingQueues containing messages to be sent. Is it possible to have fewer consumers than queues? I don't want to loop over the queues and keep polling them (busy waiting) and I don't want a thread for every queue. Instead, I would like to have one thread that is awoken when a message is available on any of the queues.

Alex asked Mar 06 '12 16:03



3 Answers

The LinkedBlockingMultiQueue does what you are asking for. It does not allow the consumer to block on arbitrary BlockingQueues, but you can create "sub queues" from a single "multi queue" and achieve the same effect. Producers offer to the sub queues, and consumers block on the single multi queue, waiting for an element from any of them.

It also supports priorities, that is, taking elements from some queues before considering others.

Example:

LinkedBlockingMultiQueue<Integer, String> q = new LinkedBlockingMultiQueue<>();
q.addSubQueue(1 /* key */, 10 /* priority */);
q.addSubQueue(2 /* key */, 10 /* priority */);
LinkedBlockingMultiQueue<Integer, String>.SubQueue sq1 = q.getSubQueue(1);
LinkedBlockingMultiQueue<Integer, String>.SubQueue sq2 = q.getSubQueue(2);

Then you can offer and poll:

sq1.offer("x1");
q.poll(); // "x1"
sq2.offer("x2");
q.poll(); // "x2"

Disclaimer: I am the author of the library.

Mariano Barrios answered Oct 20 '22 07:10


One trick is to have a queue of queues: a single blocking queue that all consumers block on. Whenever you enqueue an item into one of your BlockingQueues, you also enqueue that BlockingQueue itself on the shared queue. So you would have something like:

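// Java cannot create generic arrays directly, so create a raw array and cast it; fill each slot before use.
@SuppressWarnings("unchecked")
BlockingQueue<WorkItem>[] producers = (BlockingQueue<WorkItem>[]) new BlockingQueue[NUM_PRODUCERS];
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new LinkedBlockingQueue<>(); // any concrete implementation works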

Then when you get a new work item:

void addWorkItem(int queueIndex, WorkItem workItem) {
    assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number";
    //Note: You may want to make the two operations a single atomic operation
    producers[queueIndex].add(workItem);
    producerProducer.add(producers[queueIndex]);
}

Now your consumers can all block on the producerProducer. I am not sure how valuable this strategy would be, but it does accomplish what you want.
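The consumer side is not shown above, so here is a minimal, self-contained sketch of the whole idea. It is an illustration under assumptions, not the answerer's code: the class name QueueOfQueues, the method takeAny, and the use of String payloads in place of WorkItem are all made up for the example. The source queue gets the item first and the shared queue gets the notification second, so a consumer that wakes up always finds an item waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper for illustration; names are not from the original answer.
class QueueOfQueues {
    // One queue per producer (String stands in for WorkItem).
    private final List<BlockingQueue<String>> producers = new ArrayList<>();
    // One entry per pending item: the source queue that the item sits in.
    private final BlockingQueue<BlockingQueue<String>> ready = new LinkedBlockingQueue<>();

    QueueOfQueues(int numProducers) {
        for (int i = 0; i < numProducers; i++) {
            producers.add(new LinkedBlockingQueue<>());
        }
    }

    void addWorkItem(int queueIndex, String item) {
        BlockingQueue<String> source = producers.get(queueIndex);
        source.add(item);   // publish the item first...
        ready.add(source);  // ...then announce which queue holds it
    }

    // Blocks until some source queue has an item, then returns that item.
    String takeAny() throws InterruptedException {
        BlockingQueue<String> source = ready.take();
        // Every entry in 'ready' was added after its item, so this does not wait.
        return source.take();
    }
}
```

A single consumer thread can then loop on takeAny() and sleep until any producer adds work. Items come out in the order they were announced, so this sketch gives no queue priority over another.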

mindvirus answered Oct 20 '22 08:10


Polymorphism

This can be solved with polymorphism. Imagine you have two kinds of jobs, JobOne and JobTwo, that a single consumer should process. Define a common interface called Job:

interface Job {
    ...
}

Then implement each job with this interface:

class JobOne implements Job {
    ...
}

class JobTwo implements Job {
    ...
}

Now you can use Job as the element type of a single blocking queue that holds both kinds of jobs. BlockingQueue is an interface, so instantiate a concrete implementation such as LinkedBlockingQueue:

BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
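
To make the single-queue approach concrete, here is a minimal sketch under assumptions. The run() method, its String return value, the SharedJobQueue wrapper class, and the consume helper are all hypothetical; the answer leaves the interface body elided. It shows one consumer draining a mixed queue with take(), which blocks while the queue is empty rather than polling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper class for illustration only.
class SharedJobQueue {
    // Common type for everything the consumer handles; run() is an assumed method.
    public interface Job {
        String run();
    }

    public static class JobOne implements Job {
        @Override
        public String run() { return "one"; }
    }

    public static class JobTwo implements Job {
        @Override
        public String run() { return "two"; }
    }

    // Takes exactly 'count' jobs, blocking whenever the queue is empty,
    // and collects each job's result in arrival order.
    public static List<String> consume(BlockingQueue<Job> queue, int count)
            throws InterruptedException {
        List<String> results = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            results.add(queue.take().run());
        }
        return results;
    }

    // Convenience factory so callers do not pick the implementation themselves.
    public static BlockingQueue<Job> newQueue() {
        return new LinkedBlockingQueue<>();
    }
}
```

This only applies when the producers can share one queue. If the separate queues must stay separate, for example for per-queue capacity limits, a different approach is needed.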
Amir Fo answered Oct 20 '22 07:10