I want to build a simple consumer program (in Java) that gets all the messages stored in an ActiveMQ queue. I have a producer which sends TextMessage objects to the queue.
But I don't know how to start writing a consumer that retrieves the old messages and then waits for new ones.
If you have an example, thanks!
This is my Producer: http://pastebin.com/uRy9D8mY
This is my Consumer: http://pastebin.com/bZh4r66e
When I run my producer first and then my consumer, I get nothing. When I run my consumer first and then my producer, I add 72 messages to the queue but my consumer receives only 24 of them...
I suggest reading the Sun JMS tutorial, which Apache ActiveMQ also recommends.
There are many ways to write JMS/ActiveMQ programs, using various frameworks such as Spring, or by using plain Java.
Essentially, write a listener class like this:
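import javax.jms.Message;
import javax.jms.MessageListener;

public class MyListener implements MessageListener {
    // Called by the JMS provider for each message delivered to the consumer.
    public void onMessage(Message message) {
        // Read and handle message here.
    }
}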
Since you are already producing messages, I assume you have a connection up and running.
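Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// createConsumer takes a Destination, not a String, so create the queue object first.
Queue queue = session.createQueue("MyQueue");
MessageConsumer consumer = session.createConsumer(queue);
MessageListener listener = new MyListener();
consumer.setMessageListener(listener);
connection.start();
// At this point, messages should arrive from the queue to your listener.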
Error handling is not included in this example, but you should be able to work it out with the help of the tutorial and the JMS documentation.
Using the code given below you can read all the messages currently enqueued in the queue.
If you need a never-ending consumer that also reads messages whenever they are added to the queue, remove the else part so the program does not terminate.
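// Requires: import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory;
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("tmp_queue2");
MessageConsumer consumer = session.createConsumer(queue);
con.start(); // delivery does not begin until the connection is started
while (true) {
    // receive returns null if no message arrives within 5000 ms
    Message msg = consumer.receive(5000);
    if (msg instanceof TextMessage) {
        TextMessage tm = (TextMessage) msg;
        System.out.println(tm.getText());
    } else if (msg != null) {
        // A non-text message is not a sign of an empty queue; skip it.
        System.out.println("Skipping non-text message");
    } else {
        System.out.println("Queue Empty");
        con.close(); // also closes the session and the consumer
        break;
    }
}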
I hope this consumer program helps people who are new to ActiveMQ.
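If you want to try the "read everything, then stop once nothing arrives within the timeout" loop without a broker, here is a minimal sketch. It is not JMS code: a java.util.concurrent.BlockingQueue stands in for the ActiveMQ queue, and the class name DrainUntilIdle and the method drain are made up for this illustration. The point it shows is the same as in the consumer above: poll with a timeout returns null when nothing arrives in time, just as MessageConsumer.receive(timeout) does, so null is the signal to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a JMS consumer loop; no ActiveMQ involved.
public class DrainUntilIdle {

    // Collects messages until none arrives within timeoutMillis.
    public static List<String> drain(BlockingQueue<String> queue, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        try {
            while (true) {
                // Like receive(timeout): blocks up to the timeout, null if nothing came.
                String msg = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break; // idle for the whole timeout: treat the queue as empty
                }
                received.add(msg);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was read so far.
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // "Old" messages that were queued before the consumer started.
        queue.add("old-1");
        queue.add("old-2");

        // A producer that adds one more message while the consumer is waiting.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.add("new-1");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Old messages come back first, then the loop waits for late arrivals.
        System.out.println(drain(queue, 1000));
        producer.join();
    }
}
```

As with the JMS version, replacing the break with a continue (or dropping the null check's exit) turns this into a consumer that never terminates and keeps waiting for new messages.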