Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is newly created client knows about older messages in hornetq?

Tags:

java

jms

hornetq

I have created session in hornetQ with one consumer, then I have added 4 messages in queue using producer. After this I have created new consumer.

Will this consumer knows about older messages?

If no, is it possible to configure it in XML?

I have create new consumer which was not able to get previous messages. I just wanted to confirm whether this behavior is correct or not? I didn't find any help in the documentation.

Below is the code snippet :

TextMessage receivedMessage = (TextMessage)consumer.receive(); 
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
//consumer.close();
MessageConsumer newConsumer = session.createConsumer(orderQueue);
receivedMessage = (TextMessage)newConsumer.receive();
receivedMessage.acknowledge();  
System.out.println("Got order: " + receivedMessage.getText());


If I uncomment the consumer.close() line it works fine.
My hornetq-jms.xml

<connection-factory name="NettyConnectionFactory">
      <xa>true</xa>
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="/XAConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

   <connection-factory name="NettyConnectionFactory">
      <xa>false</xa>
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="/ConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

   <connection-factory name="NettyThroughputConnectionFactory">
      <xa>true</xa>
      <connectors>
         <connector-ref connector-name="netty-throughput"/>
      </connectors>
      <entries>
         <entry name="/XAThroughputConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

   <connection-factory name="NettyThroughputConnectionFactory">
      <xa>false</xa>
      <connectors>
         <connector-ref connector-name="netty-throughput"/>
      </connectors>
      <entries>
         <entry name="/ThroughputConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

Code snippet of connection factory

TransportConfiguration transportConfiguration = new
                TransportConfiguration(NettyConnectorFactory.class.getName());
        HornetQConnectionFactory cf =
                HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
        Queue orderQueue = HornetQJMSClient.createQueue("MutationPipelineQueue");

Code for getTransportConfiguration() :

private synchronized static TransportConfiguration getTransportConfiguration() {
        HashMap<String, TransportConfiguration> transportConfigurationMap = new HashMap<String, TransportConfiguration>();
        TransportConfiguration tc = transportConfigurationMap.get("machinename:5455");
        if(tc == null){
            Map<String, Object> connectionParams = new HashMap<String, Object>();
            connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,"machinename");
            connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,Integer.valueOf("5455"));
            tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
            transportConfigurationMap.put("machinename:5455", tc);
        }
        return tc;
like image 413
Sachin Avatar asked Dec 27 '22 08:12

Sachin


2 Answers

Yes it will know about your old messages. But on this case you had your old consumer still open, so that consumer will be caching messages on its buffer unless you close it, or you change consumer-window-size = 0.

Most Message Systems will cache ahead on the consumer so next time you call receive on the consumer the message will be ready to be received.

However, if your consumer is slow and you don't have that many messages the message will be on the client's buffer until you close that consumer.

For fast consumers in production the best is always to cache ahead as that will improve your throughput that would be limited by your network latency without caching.

On the HornetQ case you could cope with slow consumers by setting consumer-window-size=0.

http://docs.jboss.org/hornetq/2.3.0.beta3/docs/user-manual/html/flow-control.html#flow-control.consumer.window

In the case you are instantiating the connection factory through JNDI lookup:

   <connection-factory name="ConnectionFactory">
      <connectors>
         <connector-ref connector-name="netty-connector"/>
      </connectors>
      <entries>
         <entry name="ConnectionFactory"/>
      </entries>

      <!-- We set the consumer window size to 0, which means messages are not buffered at all
      on the client side -->
      <consumer-window-size>0</consumer-window-size>

   </connection-factory>

Or on the case of you directly instantiating your connection factory, you must set consumerWindowSize at the instance:

TransportConfiguration transportConfiguration = new
                TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
cf.setConsumerWindowSize(0) // <<<<<< here

This is a running example from the HornetQ distribution at examples/jms/no-consumer-buffering. It's exactly the same as your code snippet and it works every time:

 // Step 5. Create a JMS Session
 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 // Step 6. Create a JMS Message Producer
 MessageProducer producer = session.createProducer(queue);

 // Step 7. Create a JMS MessageConsumer

 MessageConsumer consumer1 = session.createConsumer(queue);

 // Step 8. Start the connection

 connection.start();

 // Step 9. Send 10 messages to the queue

 final int numMessages = 10;

 for (int i = 0; i < numMessages; i++)
 {
    TextMessage message = session.createTextMessage("This is text message: " + i);

    producer.send(message);
 }


 System.out.println("Sent messages");

 // Step 10. Create another consumer on the same queue

 MessageConsumer consumer2 = session.createConsumer(queue);

 // Step 11. Consume three messages from consumer2

 for (int i = 0; i < 3; i++)
 {
    TextMessage message = (TextMessage)consumer2.receive(2000);

    System.out.println("Consumed message from consumer2: " + message.getText());
 }

And as you can see on this example old messages are being received.

Anything different than that is a misconfiguration from your system. Perhaps you didn't set the right connection factory?

BTW: On ActiveMQ you can manage the prefetch limit to manage the same behaviour:

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

This question is an exact duplication of JMS queue with multiple consumers

As for retroactive messages that's another feature on ActiveMQ that only applies to Topics, a subscription being created with old messages on it.

like image 179
Clebert Suconic Avatar answered Jan 31 '23 06:01

Clebert Suconic


The functionality you're looking for is provided through Durable subscriptions. This is part of standard JMS spec. I'm sure you will find it if you look through HornetQ documentation for your version. Also, here is a good example of a Java client using JMS Durable subscription in HornetQ.

like image 21
mazaneicha Avatar answered Jan 31 '23 06:01

mazaneicha