Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to get all enqueued messages in ActiveMQ?

Tags:

java

activemq

I want to build a simple consumer program (in java) to get all messages stocked in an ActiveMQ subject. I have a producer which send TextMessage in the queue.

But I don't know how to start to write my consumer for retrieving old messages and wait for new one.

If you have an example, thanks!

This is my Producer: http://pastebin.com/uRy9D8mY

This is my Consumer: http://pastebin.com/bZh4r66e

When I run my producer before my consumer, then run the consumer, I got nothing. When I run my consumer then my producer, I add 72 messages in the queue but my consumer got only 24 message...

like image 981
kdelemme Avatar asked May 29 '12 08:05

kdelemme


2 Answers

I suggest reading this tutorial (as does Apache ActiveMQ) SUN Jms tutorial

There are many ways to write JMS/ActiveMQ programs, using various frameworks such as Spring, or by using plain java.

Essentially, write a listener class like this:

public class MyListener implements MessageListener{
   public void onMessage(Message message){
      // Read and handle message here.
   }
}

Since you already are producing message, I assume you have connection up and running.

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer("MyQueue");
listener = new MyListener ();
consumer.setMessageListener(listener);
connection.start(); 
// At this point, messages should arrive from the queue to your listener.

Then there are some error handling code not included in this example, but you should be able to figure it out with the help of the tutorial and JMS documentation.

like image 178
Petter Nordlander Avatar answered Sep 20 '22 21:09

Petter Nordlander


Using the code given below you can read all the messages en-queued in the queue.

  • In this code the while loop is an unending loop it will iterate all the messages in the queue.
  • Once there is no messages in the queue it will wait for 5 seconds then automatically stops the connection and breaks the loop.

If you required an unending consumer which will read all the messages whenever newly added to the queue, then remove the else part, so the program will not terminate.

    ConnectionFactory factory =  new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection con = factory.createConnection();
    Session session =   con.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("tmp_queue2");
    MessageConsumer consumer = session.createConsumer(queue);
    con.start();     
    while (true) {     
        Message msg = consumer.receive(5000); 
        if (msg instanceof TextMessage) {
            TextMessage tm = (TextMessage) msg;
            System.out.println(tm.getText());       
        }
        else{
            System.out.println("Queue Empty"); 
            con.stop();
            break;
        }
    }

Hope this consumer program will helps people who were new to ActiveMQ.

like image 39
Karthikeyan KR Avatar answered Sep 22 '22 21:09

Karthikeyan KR