Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to ask RabbitMQ to retry when business Exception occurs in Spring Asynchronous MessageListener use case

I have a Spring AMQP message listener running.

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}

As you can see, there could be exception coming out during process. I want to retry because of a particular error in Catch block. I cannot through exception in onMessage. How to tell RabbitMQ to there is an exception and retry?

like image 967
Santosh Avatar asked May 02 '16 10:05

Santosh


People also ask

How do you handle error in RabbitMQ?

Acknowledge the original message, and put the new message onto the queue. Then you can check the retries count each time the message goes to a worker for processing. If it fails, either create a new message with updated metadata or permanently fail the message by sending reject with requeue set to false.

How does RabbitMQ connect to spring boot?

In another tutorial we have explained the various exchange types and their implementation using Spring Boot. Define the pom. xml as follows- Add the spring-boot-starter-amqp dependency. Define the RabbitMQSender class which sends the message to the RabbitMQ using AmqpTemplate.

What is RabbitTemplate?

RabbitTemplate() Convenient constructor for use with setter injection. RabbitTemplate​(ConnectionFactory connectionFactory) Create a rabbit template with default strategies and settings.

What is Retry message in RabbitMQ?

RabbitMQ provides a method for handling message failures in a really efficient way known as the Retry and Error Handling feature . What is Dead Message in RabbitMQ. If certain messages become undeliverable or unhandled even though when message received by the broker.

What are the RabbitMQ retry parameters in Spring Boot?

Here we enable the Spring Boot RabbitMQ retry mechanism and specify some more additional parameters: Initial interval: The message should be retried after an interval of 3s. Max-attempts: The message should be retried maximum of 6 times.

What is RabbitMQ error handling?

In this article we will implement a RabbitMQ Error Handling. Whenever any data in the message is transmitted that the receiver does not accept, or when a message is sent to a queue that does not exist. The message is retried and sent up to a set number of times.

How to send a failed message to a RabbitMQ DLQ?

If you instead want to send the failed message to a DLQ you will need either a RepublishMessageRecoverer (which publishes the failed message to a different Exchange/Queue) or a custom MessageRecoverer (which rejects the message without requeuing). In that latter case you should also set up a RabbitMQ DLQ on the queue as explained above.


1 Answers

Since onMessage() doesn't allow to throw checked exceptions you can wrap the exception in a RuntimeException and re-throw it.

try {
    testService.process(message);
} catch (BusinessException e) {
    throw new RuntimeException(e);
}

Note however that this may result in the message to be re-delivered indefinitely. Here is how this works:

RabbitMQ supports rejecting a message and asking the broker to requeue it. This is shown here. But RabbitMQ doesn't natively have a mechanism for retry policy, e.g. setting max retries, delay, etc.

When using Spring AMQP, "requeue on reject" is the default option. Spring's SimpleMessageListenerContainer will by default do this when there is an unhandled exception. So in your case you just need to re-throw the caught exception. Note however that if you cannot process a message and you always throw the exception this will be re-delivered indefinitely and will result in an infinite loop.

You can override this behaviour per message by throwing a AmqpRejectAndDontRequeueException exception, in which case the message will not be requeued.

You can also switch off the "requeue on reject" behavior of SimpleMessageListenerContainer entirely by setting

container.setDefaultRequeueRejected(false) 

When a message is rejected and not requeued it will either be lost or transferred to a DLQ, if one is set in RabbitMQ.

If you need a retry policy with max attempts, delay, etc the easiest is to setup a spring "stateless" RetryOperationsInterceptor which will do all retries within the thread (using Thread.sleep()) without rejecting the message on each retry (so without going back to RabbitMQ for each retry). When retries are exhausted, by default a warning will be logged and the message will be consumed. If you want to send to a DLQ you will need either a RepublishMessageRecoverer or a custom MessageRecoverer that rejects the message without requeuing (in that latter case you should also setup a RabbitMQ DLQ on the queue). Example with default message recoverer:

container.setAdviceChain(new Advice[] {
        org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                .stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2, 5000)
                .build()
});

This obviously has the drawback that you will occupy the Thread for the entire duration of the retries. You also have the option to use a "stateful" RetryOperationsInterceptor, which will send the message back to RabbitMQ for each retry, but the delay will still be implemented with Thread.sleep() within the application, plus setting up a stateful interceptor is a bit more complicated.

Therefore, if you want retries with delays without occupying a Thread you will need a much more involved custom solution using TTL on RabbitMQ queues. If you don't want exponential backoff (so delay doesn't increase on each retry) it's a bit simpler. To implement such a solution you basically create another queue on rabbitMQ with arguments: "x-message-ttl": <delay time in milliseconds> and "x-dead-letter-exchange":"<name of the original queue>". Then on the main queue you set "x-dead-letter-exchange":"<name of the queue with the TTL>". So now when you reject and don't requeue a message RabbitMQ will redirect it to the second queue. When TTL expires it will be redirected to the original queue and thus redelivered to the application. So now you need a retry interceptor that rejects the message to RabbitMQ after each failure and also keeps track of the retry count. To avoid the need to keep state in the application (because if your application is clustered you need to replicate state) you can calculate the retry count from the x-death header that RabbitMQ sets. See more info about this header here. So at that point implementing a custom interceptor is easier than customising the Spring stateful interceptor with this behaviour.

Also check the section about retries in the Spring AMQP reference.

like image 78
Nazaret K. Avatar answered Sep 17 '22 14:09

Nazaret K.