I have a Spring AMQP message listener running.
public class ConsumerService implements MessageListener {
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void onMessage(Message message) {
try {
testService.process(message); //This process method can throw Business Exception
} catch (BusinessException e) {
//Here we can just log the exception. How the retry attempt is made?
} catch (Exception e) {
//Here we can just log the exception. How the retry attempt is made?
}
}
}
As you can see, there could be exception coming out during process. I want to retry because of a particular error in Catch block. I cannot through exception in onMessage. How to tell RabbitMQ to there is an exception and retry?
Acknowledge the original message, and put the new message onto the queue. Then you can check the retries count each time the message goes to a worker for processing. If it fails, either create a new message with updated metadata or permanently fail the message by sending reject with requeue set to false.
In another tutorial we have explained the various exchange types and their implementation using Spring Boot. Define the pom. xml as follows- Add the spring-boot-starter-amqp dependency. Define the RabbitMQSender class which sends the message to the RabbitMQ using AmqpTemplate.
RabbitTemplate() Convenient constructor for use with setter injection. RabbitTemplate(ConnectionFactory connectionFactory) Create a rabbit template with default strategies and settings.
RabbitMQ provides a method for handling message failures in a really efficient way known as the Retry and Error Handling feature . What is Dead Message in RabbitMQ. If certain messages become undeliverable or unhandled even though when message received by the broker.
Here we enable the Spring Boot RabbitMQ retry mechanism and specify some more additional parameters: Initial interval: The message should be retried after an interval of 3s. Max-attempts: The message should be retried maximum of 6 times.
In this article we will implement a RabbitMQ Error Handling. Whenever any data in the message is transmitted that the receiver does not accept, or when a message is sent to a queue that does not exist. The message is retried and sent up to a set number of times.
If you instead want to send the failed message to a DLQ you will need either a RepublishMessageRecoverer (which publishes the failed message to a different Exchange/Queue) or a custom MessageRecoverer (which rejects the message without requeuing). In that latter case you should also set up a RabbitMQ DLQ on the queue as explained above.
Since onMessage()
doesn't allow to throw checked exceptions you can wrap the exception in a RuntimeException
and re-throw it.
try {
testService.process(message);
} catch (BusinessException e) {
throw new RuntimeException(e);
}
Note however that this may result in the message to be re-delivered indefinitely. Here is how this works:
RabbitMQ supports rejecting a message and asking the broker to requeue it. This is shown here. But RabbitMQ doesn't natively have a mechanism for retry policy, e.g. setting max retries, delay, etc.
When using Spring AMQP, "requeue on reject" is the default option. Spring's SimpleMessageListenerContainer
will by default do this when there is an unhandled exception. So in your case you just need to re-throw the caught exception. Note however that if you cannot process a message and you always throw the exception this will be re-delivered indefinitely and will result in an infinite loop.
You can override this behaviour per message by throwing a AmqpRejectAndDontRequeueException
exception, in which case the message will not be requeued.
You can also switch off the "requeue on reject" behavior of SimpleMessageListenerContainer
entirely by setting
container.setDefaultRequeueRejected(false)
When a message is rejected and not requeued it will either be lost or transferred to a DLQ, if one is set in RabbitMQ.
If you need a retry policy with max attempts, delay, etc the easiest is to setup a spring "stateless" RetryOperationsInterceptor
which will do all retries within the thread (using Thread.sleep()
) without rejecting the message on each retry (so without going back to RabbitMQ for each retry). When retries are exhausted, by default a warning will be logged and the message will be consumed. If you want to send to a DLQ you will need either a RepublishMessageRecoverer
or a custom MessageRecoverer
that rejects the message without requeuing (in that latter case you should also setup a RabbitMQ DLQ on the queue). Example with default message recoverer:
container.setAdviceChain(new Advice[] {
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2, 5000)
.build()
});
This obviously has the drawback that you will occupy the Thread for the entire duration of the retries. You also have the option to use a "stateful" RetryOperationsInterceptor
, which will send the message back to RabbitMQ for each retry, but the delay will still be implemented with Thread.sleep()
within the application, plus setting up a stateful interceptor is a bit more complicated.
Therefore, if you want retries with delays without occupying a Thread
you will need a much more involved custom solution using TTL on RabbitMQ queues. If you don't want exponential backoff (so delay doesn't increase on each retry) it's a bit simpler. To implement such a solution you basically create another queue on rabbitMQ with arguments: "x-message-ttl": <delay time in milliseconds>
and "x-dead-letter-exchange":"<name of the original queue>"
. Then on the main queue you set "x-dead-letter-exchange":"<name of the queue with the TTL>"
. So now when you reject and don't requeue a message RabbitMQ will redirect it to the second queue. When TTL expires it will be redirected to the original queue and thus redelivered to the application. So now you need a retry interceptor that rejects the message to RabbitMQ after each failure and also keeps track of the retry count. To avoid the need to keep state in the application (because if your application is clustered you need to replicate state) you can calculate the retry count from the x-death
header that RabbitMQ sets. See more info about this header here. So at that point implementing a custom interceptor is easier than customising the Spring stateful interceptor with this behaviour.
Also check the section about retries in the Spring AMQP reference.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With