
RabbitMQ delayed message when there is an exception

I'm using the delayed message plugin (https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq) and it works well. I send a message with X seconds of delay, and it's processed X seconds later.

The problem is in the processing logic. On the happy path I have no issues. But if processing fails, I expect the message to be requeued and processed again after the same delay; instead, it is processed immediately.

Is there a way to requeue the message automatically with the original specified delay in case of an exception?

abullor asked Oct 26 '22 11:10



People also ask

How do you delay a message on RabbitMQ?

To delay a message, the user must publish it with the x-delay header, which accepts an integer representing the number of milliseconds the message should be delayed by RabbitMQ. It's worth noting that delay in this context means delaying message routing to queues or other exchanges.

What is delayed exchange in RabbitMQ?

The RabbitMQ delayed exchange plugin is used to implement a wait time between when a message reaches the exchange and when it is delivered to a queue. Every time a message is published, an offset in milliseconds can be specified.

Does RabbitMQ guarantee message order?

The RabbitMQ documentation states the following regarding its ordering guarantees: messages published in one channel, passing through one exchange and one queue and one outgoing channel will be received in the same order that they were sent.


1 Answer

No; the delay applies only when a message is routed through the delayed exchange, not when it is requeued after a failure. If message order is not important, you can re-publish the message to the tail of the queue.
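
If the re-published copy should wait again, one option is to send it back through the delayed exchange with a fresh x-delay header. The following is a minimal sketch of building the headers for that re-publish, under some assumptions: DelayedRetryHeaders and the "x-retry-count" header are hypothetical names (only x-delay is read by the plugin), and the delay is passed in by the application rather than read back from the received message. The actual publish call is left to whichever client you use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch only: computes the headers for re-publishing a failed message.
// It does not talk to RabbitMQ; the caller publishes with the returned headers.
final class DelayedRetryHeaders {

    // Header read by the delayed message exchange plugin (milliseconds).
    static final String DELAY_HEADER = "x-delay";

    // Hypothetical, application-defined header used to cap the number of retries.
    static final String RETRY_COUNT_HEADER = "x-retry-count";

    private DelayedRetryHeaders() {
    }

    /**
     * Returns the headers for the next attempt, or an empty Optional when the
     * message has already been retried maxRetries times.
     */
    static Optional<Map<String, Object>> nextAttempt(
            Map<String, Object> received, int delayMs, int maxRetries) {

        Object previous = received.get(RETRY_COUNT_HEADER);
        int retries = previous instanceof Number ? ((Number) previous).intValue() : 0;
        if (retries >= maxRetries) {
            return Optional.empty(); // give up: e.g. log it or route to a dead-letter queue
        }

        Map<String, Object> headers = new HashMap<>(received);
        headers.put(DELAY_HEADER, delayMs);           // same delay as the first publish
        headers.put(RETRY_COUNT_HEADER, retries + 1); // remember how often we retried
        return Optional.of(headers);
    }
}
```

In a consumer, you would acknowledge the failed delivery and publish a copy of the body with these headers to the delayed exchange; capping the count keeps a permanently failing message from looping forever.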

Or, you can configure a retry interceptor with a fixed back off.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#retry

Spring Retry provides a couple of AOP interceptors and a great deal of flexibility to specify the parameters of the retry (number of attempts, exception types, backoff algorithm, and others). Spring AMQP also provides some convenience factory beans for creating Spring Retry interceptors in a convenient form for AMQP use cases, with strongly typed callback interfaces that you can use to implement custom recovery logic. ...
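
To illustrate what a fixed back off means here, the sketch below retries a task in the calling thread and sleeps a constant interval between attempts. It is a plain-Java stand-in for the idea, not Spring Retry's API; FixedBackoffRetry, maxAttempts and backoffMs are hypothetical names chosen for this example.

```java
import java.util.concurrent.Callable;

// Sketch only: in-process retry with a constant pause between attempts.
final class FixedBackoffRetry {

    private FixedBackoffRetry() {
    }

    /**
     * Calls the task up to maxAttempts times, sleeping backoffMs after each
     * failed attempt except the last. Rethrows the last failure when all
     * attempts are used up.
     */
    static <T> T call(Callable<T> task, int maxAttempts, long backoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            }
            catch (InterruptedException e) {
                // do not retry an interrupted task; restore the flag and stop
                Thread.currentThread().interrupt();
                throw e;
            }
            catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs); // the fixed back off
                }
            }
        }
        throw last;
    }
}
```

For example, FixedBackoffRetry.call(() -> handle(message), 3, 5000L) would try a hypothetical handle method up to three times, five seconds apart. Because the waiting happens in the consumer thread, the message is held by that consumer for the whole time instead of going back to the queue.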

EDIT

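Using a RabbitTemplate instead of a message-driven listener. The receive runs in a transaction, so an exception rolls it back and the message is requeued; the scheduled method then picks it up again on its next run, 5 seconds later: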

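package com.example.demo;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;

@SpringBootApplication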
@EnableScheduling
@EnableTransactionManagement
public class So69020120Application {

    public static void main(String[] args) {
        SpringApplication.run(So69020120Application.class, args);
    }

    @Autowired
    Processor processor;

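    // Runs 5 seconds after the previous run finishes and drains the queue
    // until it is empty or a message fails.
    @Scheduled(fixedDelay = 5000)
    public void sched() {
        try {
            while (this.processor.process()) {
                // keep receiving; process() returns false when the queue is empty
            }
        }
        catch (Exception e) {
            // the rollback has requeued the failed message; it is retried on the next run
            e.printStackTrace();
        }
    }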

    @Bean
    RabbitTransactionManager transactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            IntStream.range(0,  4).forEach(i -> template.convertAndSend("queue", "good"));
            template.convertAndSend("queue", "fail");
            IntStream.range(0,  4).forEach(i -> template.convertAndSend("queue", "good"));
        };
    }

    @Bean
    Queue queue() {
        return new Queue("queue");
    }

}

@Component
class Processor {

    private final RabbitTemplate template;

    private final AtomicBoolean fail = new AtomicBoolean(true);

    Processor(RabbitTemplate template) {
        this.template = template;
        // receive on a transacted channel so a rollback returns the message to the queue
        template.setChannelTransacted(true);
    }

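    // Receives and handles one message inside a transaction.
    // Returns false when the queue is empty.
    @Transactional
    public boolean process() {
        String data = (String) template.receiveAndConvert("queue");
        if (data == null) {
            System.out.println("No More Messages");
            return false;
        }
        System.out.println(data);
        // fail only the first time "fail" is seen, so the redelivery succeeds
        if (data.equals("fail") && this.fail.getAndSet(false)) {
            throw new RuntimeException("test");
        }
        return true;
    }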

}

Output:

good
good
good
good
fail
java.lang.RuntimeException: test
    at com.example.demo.Processor.process(So69020120Application.java:86)
    at com.example.demo.Processor$$FastClassBySpringCGLIB$$6adeaa38.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.example.demo.Processor$$EnhancerBySpringCGLIB$$bad30db1.process(<generated>)
    at com.example.demo.So69020120Application.sched(So69020120Application.java:36)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
fail
good
good
good
good
No More Messages

Gary Russell answered Nov 17 '22 00:11
