I have a JMS client which produces messages and sends them over a JMS queue to its single consumer.
What I want is more than one consumer getting those messages. The first thing that comes to my mind is converting the queue to a topic, so current and new consumers can subscribe and get the same message delivered to all of them.
This will obviously involve modifying the current client code on both the producer and the consumer side.
I would like to also look at other options like creating a second queue, so that I don't have to modify the existing consumer. I believe there are advantages in this approach like (correct me if I am wrong) balancing the load between two different queues rather than one, which might have a positive impact on performance.
I would like to get advice on these options and any pros and cons that you see. Any feedback is highly appreciated.
Support for multiple-consumer queues is a Message Queue feature (the JMS specification defines messaging behavior in the case of only one consumer accessing a queue). When multiple consumers access a queue, the load-balancing among them takes into account each consumer's capacity and message processing rate.
concurrency. The number of concurrent sessions/consumers to start for each listener. Can either be a simple number indicating the maximum number (e.g. "5") or a range indicating the lower as well as the upper limit (e.g. "3-5"). Note that a specified minimum is just a hint and might be ignored at runtime.
A message consumer is an object that is created by a session and used for receiving messages sent to a destination. It implements the MessageConsumer interface. A message consumer allows a JMS client to register interest in a destination with a JMS provider.
The JMS IQ Manager is set to fully concurrent FIFO processing. However, each Collaboration on each integration server retrieves messages as they come in, and is able to commit them unrestricted to the queue.
You have a few options as you stated.
If you convert it to a topic, then to get the same effect you will need to make the consumers durable subscribers. One thing a queue offers is that messages are kept while your consumer isn't alive; a non-durable topic subscriber misses anything published while it is disconnected. The details will depend on the MQ system you are using.
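To make the point about offline consumers concrete, here is a small in-memory sketch. It is not JMS code: `TopicModel` and `Subscription` are made-up names used only for illustration, and the model assumes that a subscription marked `durable` keeps receiving messages while its consumer is offline, whereas a non-durable one does not.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration only; none of these types are JMS classes.
class TopicModel {

    // One subscription: whether it is durable, whether its consumer
    // is currently connected, and the messages stored for it so far.
    static final class Subscription {
        final boolean durable;
        boolean online = true;
        final List<String> received = new ArrayList<>();

        Subscription(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    Subscription subscribe(String name, boolean durable) {
        Subscription subscription = new Subscription(durable);
        subscriptions.put(name, subscription);
        return subscription;
    }

    // Every subscription gets its own copy of the message, unless its
    // consumer is offline and the subscription is not durable.
    void publish(String message) {
        for (Subscription subscription : subscriptions.values()) {
            if (subscription.online || subscription.durable) {
                subscription.received.add(message);
            }
        }
    }
}
```

If both subscribers go offline for one message, only the durable one ends up with all of them; the other has a gap. That gap is what you would be accepting by moving from a queue to a topic without durable subscriptions.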
If you want to stick with queues, you will create a queue for each consumer and a dispatcher that listens on the original queue and copies every message into each consumer's queue.
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3
Pros of Topics
Cons of Topics
Pros of Queues
Cons of Queues
When developing a Messaging System I prefer topics as it gives me the most power, but seeing as you are already using Queues it would require you to change how your system works to implement Topics instead.
Design and Implementation of Queue System with multiple consumers
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3
Source
Keep in mind there are other things you'll need to take care of, such as proper exception handling, reconnecting to the broker and queues if you lose your connection, etc. This is just designed to give you an idea of how to accomplish what I described.
In a real system I probably wouldn't exit at the first exception. I would allow the system to continue operating as best it could and log errors. As it stands in this code, if putting a message in a single consumer's queue fails, the whole dispatcher will stop.
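One possible shape for that "keep going and log" behaviour is sketched below. To keep it self-contained it does not use JMS types: `FanOut` and `Sender` are hypothetical names, and `Sender.send` merely stands in for a call such as `producer.send(queue, message)`. Treat it as a sketch of the idea, not a drop-in replacement for the dispatcher loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

class FanOut {
    private static final Logger LOG = Logger.getLogger(FanOut.class.getName());

    // Hypothetical stand-in for producer.send(queue, message).
    interface Sender {
        void send(String destination, String message) throws Exception;
    }

    // Tries every destination in turn. A failure on one destination is
    // logged and recorded, and the loop carries on with the next one.
    // Returns the names of the destinations that could not be reached.
    static List<String> sendToAll(Sender sender, List<String> destinations, String message) {
        List<String> failed = new ArrayList<>();
        for (String destination : destinations) {
            try {
                sender.send(destination, message);
            } catch (Exception ex) {
                LOG.log(Level.WARNING, "Could not deliver to " + destination, ex);
                failed.add(destination);
            }
        }
        return failed;
    }
}
```

The returned list is there so the caller can decide what to do with the destinations that missed the message, for example retry later. That part is deliberately left out here.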
Dispatcher.java
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static final long QUEUE_WAIT_TIME = 1000;

    // volatile: written by the thread calling stop(), read by the dispatcher thread
    private volatile boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param sourceQueue
     *      Name of the queue the dispatcher reads from.
     * @param consumerQueues
     *      Names of the queues each message is copied to.
     */
    public Dispatcher(
            QueueConnectionFactory factory,
            String sourceQueue,
            String[] consumerQueues) {
        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {
            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {
        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {
                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }
        } catch (JMSException ex) {
            // Do wonderful things here
        } finally {
            // Close everything in reverse order of creation;
            // failures while closing are ignored.
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}
Main.java
QueueConnectionFactory factory = ...;

Dispatcher dispatcher = new Dispatcher(
        factory,
        "Queue_Original",
        new String[]{
            "Consumer_Queue_1",
            "Consumer_Queue_2",
            "Consumer_Queue_3"});
dispatcher.start();
You may not have to modify the code; it depends on how you wrote it.
For example, if your code sends messages using MessageProducer rather than QueueSender, then it will work for topics as well as queues. Similarly if you used MessageConsumer rather than QueueReceiver.
Essentially, it is good practice in JMS applications to use non-specific interfaces to interact with the JMS system, such as MessageProducer, MessageConsumer, Destination, etc. If that's the case, it's a "mere" matter of configuration.