I am trying to implement JMS in my project. I am using ActiveMQ as the provider, with persistent queues. The following code retrieves messages from ActiveMQ:
    conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
    conn.start();
    session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(queue);
    ObjectMessage obj = (ObjectMessage) consumer.receiveNoWait();
This code sometimes returns data and sometimes returns null, even when the ActiveMQ admin console shows a non-zero number of pending messages. I read a number of articles, and a few people mentioned that the JMS API does not guarantee that you will get a message every time, and that you have to code accordingly. In my scenario I depend on the queue: as soon as it returns null, I kill the process. So I modified the code as follows.
Instead of calling receiveNoWait, I now call receive after first checking, via a QueueBrowser, whether the queue has a message. The modified code follows:
    public static <T> T retrieveObjectFromQueue(Queue queue, Class<T> clazz) {
        synchronized (queue) {
            // Check with a QueueBrowser first, so receive() is only called when a message is visible.
            if (!queueHasMoreElements(queue))
                return null;
            Connection conn = null;
            Session session = null;
            try {
                conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
                conn.start();
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumer = session.createConsumer(queue);
                // Blocks until a message is delivered to this consumer.
                ObjectMessage obj = (ObjectMessage) consumer.receive();
                return clazz.cast(obj.getObject());
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                closeSessionAndConnection(session, conn);
            }
        }
    }

    public static boolean queueHasMoreElements(Queue queue) {
        Connection conn = null;
        Session session = null;
        try {
            conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
            conn.start();
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueBrowser browser = session.createBrowser(queue);
            Enumeration<?> enumeration = browser.getEnumeration();
            return enumeration.hasMoreElements();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            closeSessionAndConnection(session, conn);
        }
    }
Now my code gets stuck after processing around 20-30 messages, and again I can see pending messages in the admin console. Running in debug mode, I found that after retrieving 20-30 messages the code blocks at consumer.receive(). That would be expected if the queue were empty, but the admin console shows many messages in the queue.
I am using JDBC (MySQL) as the persistent store for ActiveMQ. The configuration XML is the one from the ActiveMQ configuration examples (activemq/examples/conf/activemq-jdbc-performance.xml).
I am using tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 as the ActiveMQ URL.
Please let me know what I am doing wrong. I am using Java 8 and Apache ActiveMQ 5.13.1.
Thanks
The JMS specification also does not mandate that a QueueBrowser will return all messages in a Queue. You might only get a snapshot of what is there when it starts, depending on a lot of factors.
You are trying to impose semantics on JMS that it does not guarantee. You can try setting the prefetch to zero, which causes the client to poll the broker for a message and wait until the broker tells it whether there is one. You might still get nothing if the message hasn't yet hit the Queue when you poll, though; that's just something you need to handle.
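To illustrate the prefetch suggestion: with the broker URL from the question, only the value of queuePrefetch would change (first line). The second line is an alternative that, as far as I know, ActiveMQ also accepts: a destination option appended to the queue name, which sets the prefetch for that one queue only. MY.QUEUE is a placeholder name, not something taken from the question.

```text
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0
MY.QUEUE?consumer.prefetchSize=0
```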
You can also use the timed receive method and set a timeout for how long you are willing to wait before returning and terminating your application.
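A minimal sketch of the timed-receive idea follows. To keep it runnable without a broker, the loop is written against a small hypothetical interface, TimedReceiver, whose single method has the same shape as MessageConsumer.receive(long): it returns the next item, or null if nothing arrived within the timeout. The class name TimedDrain, the drain method, and the timeout values are all assumptions made for illustration. The demo in main uses a java.util.concurrent.BlockingQueue as a stand-in for the JMS queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class TimedDrain {

    /** Hypothetical stand-in with the same shape as MessageConsumer.receive(long). */
    @FunctionalInterface
    interface TimedReceiver<T> {
        // Returns the next item, or null if none arrived within timeoutMillis.
        T receive(long timeoutMillis) throws Exception;
    }

    /**
     * Keeps receiving until one timed receive returns null, i.e. until the
     * source stayed silent for timeoutMillis. A null is treated as "nothing
     * arrived in time", not as proof that the queue is empty.
     */
    static <T> List<T> drain(TimedReceiver<T> receiver, long timeoutMillis) throws Exception {
        List<T> received = new ArrayList<>();
        T next;
        while ((next = receiver.receive(timeoutMillis)) != null) {
            received.add(next);
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the JMS queue, pre-filled with three items.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        // poll(timeout, unit) also returns null on timeout, like a timed JMS receive.
        List<String> items = drain(t -> queue.poll(t, TimeUnit.MILLISECONDS), 100L);
        System.out.println(items);
    }
}
```

With a real consumer, the call would look something like TimedDrain.drain(consumer::receive, 5000L), where consumer is a single MessageConsumer kept open for the whole loop and the result is a List<Message>. The five-second timeout is an arbitrary choice; pick whatever wait is acceptable before your application gives up.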