I created a session in HornetQ with one consumer, then added 4 messages to the queue using a producer. After that I created a new consumer.
Will this consumer know about the older messages?
If not, is it possible to configure this in XML?
The new consumer I created was not able to get the previous messages. I just want to confirm whether this behavior is correct. I didn't find any help in the documentation.
Below is the code snippet:
TextMessage receivedMessage = (TextMessage)consumer.receive();
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
//consumer.close();
MessageConsumer newConsumer = session.createConsumer(orderQueue);
receivedMessage = (TextMessage)newConsumer.receive();
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
If I uncomment the consumer.close() line it works fine.
My hornetq-jms.xml:
<connection-factory name="NettyConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="/XAConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
<connection-factory name="NettyConnectionFactory">
<xa>false</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="/ConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
<entries>
<entry name="/XAThroughputConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
<xa>false</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
<entries>
<entry name="/ThroughputConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
Code snippet for the connection factory:
TransportConfiguration transportConfiguration = new
TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
Queue orderQueue = HornetQJMSClient.createQueue("MutationPipelineQueue");
Code for getTransportConfiguration():
private synchronized static TransportConfiguration getTransportConfiguration() {
HashMap<String, TransportConfiguration> transportConfigurationMap = new HashMap<String, TransportConfiguration>();
TransportConfiguration tc = transportConfigurationMap.get("machinename:5455");
if(tc == null){
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,"machinename");
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,Integer.valueOf("5455"));
tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
transportConfigurationMap.put("machinename:5455", tc);
}
return tc;
}
Yes, it will know about your old messages. But in this case your old consumer was still open, so it keeps caching messages in its buffer until you close it, or until you set consumer-window-size to 0.
Most messaging systems cache ahead on the consumer, so the next time you call receive on it the message is already there, ready to be received.
However, if your consumer is slow and you don't have that many messages, the messages will sit in that client's buffer until you close the consumer.
For fast consumers in production it is best to always cache ahead: it improves throughput, which would otherwise be limited by your network latency.
With HornetQ you can cope with slow consumers by setting consumer-window-size=0.
http://docs.jboss.org/hornetq/2.3.0.beta3/docs/user-manual/html/flow-control.html#flow-control.consumer.window
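To make the buffering effect concrete, here is a toy model in plain Java. It is only a sketch and not how HornetQ is implemented: ToyConsumer, PrefetchDemo and secondConsumerSees are made-up names, the window is counted in messages (the real consumer-window-size is, as far as I know, a size in bytes), and the consumer pulls messages on receive() instead of having the server push them. It sends 4 messages, reads one with the first consumer, then checks what a second consumer gets.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of client-side consumer buffering. Hypothetical, not HornetQ code. */
class PrefetchDemo {

    /** A consumer that pulls up to `window` messages into a local buffer. */
    static final class ToyConsumer {
        private final Deque<String> serverQueue;
        private final Deque<String> buffer = new ArrayDeque<>();
        private final int window; // counted in messages here; 0 means "do not buffer ahead"

        ToyConsumer(Deque<String> serverQueue, int window) {
            this.serverQueue = serverQueue;
            this.window = window;
        }

        /** Returns the next message, or null if nothing is available to this consumer. */
        String receive() {
            // Always fetch at least the one message being asked for,
            // and at most `window` messages when buffering is enabled.
            int want = Math.max(window, 1);
            while (buffer.size() < want && !serverQueue.isEmpty()) {
                buffer.addLast(serverQueue.pollFirst());
            }
            return buffer.pollFirst();
        }

        /** Closing hands the undelivered buffered messages back, in their original order. */
        void close() {
            while (!buffer.isEmpty()) {
                serverQueue.addFirst(buffer.pollLast());
            }
        }
    }

    /** Sends 4 messages, reads one with consumer 1, then returns what consumer 2 receives. */
    static String secondConsumerSees(int window, boolean closeFirst) {
        Deque<String> queue = new ArrayDeque<>();
        for (int i = 1; i <= 4; i++) {
            queue.addLast("order-" + i);
        }
        ToyConsumer c1 = new ToyConsumer(queue, window);
        ToyConsumer c2 = new ToyConsumer(queue, window);
        c1.receive();          // consumer 1 takes order-1 and may buffer the rest
        if (closeFirst) {
            c1.close();        // buffered messages go back to the queue
        }
        return c2.receive();
    }

    public static void main(String[] args) {
        System.out.println(secondConsumerSees(10, false)); // buffering on, consumer 1 left open
        System.out.println(secondConsumerSees(10, true));  // buffering on, consumer 1 closed
        System.out.println(secondConsumerSees(0, false));  // no buffering
    }
}
```

In this model, with a window larger than the backlog the second consumer gets nothing until the first one is closed, while with a window of 0 it gets the next message straight away. That matches what the question describes.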
If you are instantiating the connection factory through a JNDI lookup:
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
<!-- We set the consumer window size to 0, which means messages are not buffered at all
on the client side -->
<consumer-window-size>0</consumer-window-size>
</connection-factory>
Or, if you are instantiating your connection factory directly, you must set consumerWindowSize on the instance:
TransportConfiguration transportConfiguration = new
TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
cf.setConsumerWindowSize(0); // <<<<<< here
This is a running example from the HornetQ distribution at examples/jms/no-consumer-buffering. It's exactly the same as your code snippet and it works every time:
// Step 5. Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 6. Create a JMS Message Producer
MessageProducer producer = session.createProducer(queue);
// Step 7. Create a JMS MessageConsumer
MessageConsumer consumer1 = session.createConsumer(queue);
// Step 8. Start the connection
connection.start();
// Step 9. Send 10 messages to the queue
final int numMessages = 10;
for (int i = 0; i < numMessages; i++)
{
TextMessage message = session.createTextMessage("This is text message: " + i);
producer.send(message);
}
System.out.println("Sent messages");
// Step 10. Create another consumer on the same queue
MessageConsumer consumer2 = session.createConsumer(queue);
// Step 11. Consume three messages from consumer2
for (int i = 0; i < 3; i++)
{
TextMessage message = (TextMessage)consumer2.receive(2000);
System.out.println("Consumed message from consumer2: " + message.getText());
}
As you can see in this example, old messages are received.
Anything different from that is a misconfiguration in your system. Perhaps you didn't set the right connection factory?
BTW: on ActiveMQ you can use the prefetch limit to control the same behaviour:
http://activemq.apache.org/what-is-the-prefetch-limit-for.html
This question is an exact duplicate of "JMS queue with multiple consumers".
As for retroactive messages, that's another ActiveMQ feature, and it applies only to topics: a subscription is created with old messages already on it.
The functionality you're looking for is provided through durable subscriptions. This is part of the standard JMS spec. I'm sure you will find it if you look through the HornetQ documentation for your version. Also, here is a good example of a Java client using a JMS durable subscription in HornetQ.