I have a single threaded ActiveMQ consumer written in Java. All I'm trying to do is receive() a messsage from the queue, attempt to send it to a web service, and if it succeeds acknowledge() it. If the web service call fails, I want the message to stay on the queue and be resent after some timeout.
It's more or less working, except for the resending part: each time I restart my consumer, it gets one message for each that's still on the queue, but after failing to send them, the messages are never resent.
My code looks like:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true); // ????
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
destination = session.createQueue(subject); //???
consumer = session.createConsumer(destination);
//consumer.setMessageListener(this); // message listener had same behaviour
}
private void process() {
while(true) {
System.out.println("Waiting...");
try {
Message message = consumer.receive();
onMessage(message);
} catch (JMSException e) {
e.printStackTrace();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(!client.sendMessage(msg)) {
System.out.println("Webservice call failed. Keeping message");
//message.
} else {
message.acknowledge();
}
if (transacted) {
if ((messagesReceived % batch) == 0) {
System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
session.commit();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
I'm not currently using transactions (maybe I should be?).
I'm sure I'm missing something easy and will be slapping my forehead soon but I can't seem to figure out how this is supposed to work. Thanks!
EDIT: Can't answer this myself as not enough rep:
OK, after some more experimentation, it turns out transactions are the only way to do this. Here is the new code:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(1000L);
policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true);
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(client.sendMessage(msg)) {
if(transacted) {
System.out.println("Call succeeded - committing message");
session.commit();
}
//message.acknowledge();
} else {
if(transacted) {
System.out.println("Webservice call failed. Rolling back message");
session.rollback();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Now, the message is being resent every 1000ms as specified in the Redelivery Policy.
Hope this helps someone else! :)
You don't have to use transactions, CLIENT_ACK/Session.recover() will work as well...
Messages are redelivered to a client when any of the following occurs:
- A transacted session is used and rollback() is called.
- A transacted session is closed before commit is called.
- A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.
see http://activemq.apache.org/message-redelivery-and-dlq-handling.html
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With