Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use Ack or Nack in Spring AMQP

I am new to Spring AMQP. I am having an application which is a producer sending messages to the other application which is a consumer.

Once the consumer receives the message, we will do validation of the data.

If the data is proper we have to ACK and message should be removed from the Queue. If the data is improper we have to NACK(Negative Acknowledge) the data so that it will be re-queued in RabbitMQ.

I came across

**factory.setDefaultRequeueRejected(false);**( It will not requeue the message at all)

**factory.setDefaultRequeueRejected(true);**( It will requeue the message when exception occurs)

But my case i will acknowledge the message based on validation. Then it should remove the message. If NACK then requeue the message.

I have read in RabbitMQ website

The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them

How to achieve the above scenario? Please provide me some examples.

I tried a small Program

       logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));

Message is not re queuing for different exception factory.setDefaultRequeueRejected(true)

09:46:38,854 ERROR [stderr] (SimpleAsyncTaskExecutor-1) org.activiti.engine.ActivitiObjectNotFoundException: no processes deployed with key 'WF89012'

09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler] (SimpleAsyncTaskExecutor-1) Received from Error Queue: {ERROR=Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Transaction marked as rollbackOnly}

like image 318
Chandan Avatar asked Sep 16 '16 11:09

Chandan


People also ask

How does Spring AMQP work?

The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also provides support for Message-driven POJOs with a "listener container".

How does RabbitMQ Ack work?

Whether the mechanism is used is decided at the time consumer subscribes. Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit ("manual") client acknowledgement is received.

What is RabbitAdmin?

The RabbitAdmin component can declare exchanges, queues and bindings on startup. It does this lazily, through a ConnectionListener, so if the broker is not present on startup it doesn't matter.

What is RabbitTemplate?

RabbitTemplate() Convenient constructor for use with setter injection. RabbitTemplate​(ConnectionFactory connectionFactory) Create a rabbit template with default strategies and settings.


1 Answers

See the documentation.

By default, (with defaultRequeueRejected=true) the container will ack the message (causing it to be removed) if the listener exits normally or reject (and requeue) it if the listener throws an exception.

If the listener (or error handler) throws an AmqpRejectAndDontRequeueException, the default behavior is overridden and the message is discarded (or routed to a DLX/DLQ if so configured) - the container calls basicReject(false) instead of basicReject(true).

So, if your validation fails, throw an AmqpRejectAndDontRequeueException. Or, configure your listener with a custom error handler to convert your exception to an AmqpRejectAndDontRequeueException.

That is described in this answer.

If you really want to take responsibility for acking yourself, set the acknowledge mode to MANUAL and use a ChannelAwareMessageListener or this technique if you are using a @RabbitListener.

But most people just let the container take care of things (once they understand what's going on). Generally, using manual acks is for special use cases, such as deferring acks, or early acking.

EDIT

There was a mistake in the answer I pointed you to (now fixed); you have to look at the cause of the ListenerExecutionFailedException. I just tested this and it works as expected...

@SpringBootApplication
public class So39530787Application {

    private static final String QUEUE = "So39530787";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend(QUEUE, "foo");
        template.convertAndSend(QUEUE, "bar");
        template.convertAndSend(QUEUE, "baz");
        So39530787Application bean = context.getBean(So39530787Application.class);
        bean.latch.await(10, TimeUnit.SECONDS);
        System.out.println("Expect 1 foo:"  + bean.fooCount);
        System.out.println("Expect 3 bar:"  + bean.barCount);
        System.out.println("Expect 1 baz:"  + bean.bazCount);
        context.close();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(new ConditionalRejectingErrorHandler(
                t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException));
        return factory;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE, false, false, true);
    }
    private int fooCount;

    private int barCount;

    private int bazCount;

    private final CountDownLatch latch = new CountDownLatch(5);

    @RabbitListener(queues = QUEUE)
    public void handle(String in) throws Exception {
        System.out.println(in);
        latch.countDown();
        if ("foo".equals(in) && ++this.fooCount < 3) {
            throw new FooException();
        }
        else if ("bar".equals(in) && ++this.barCount < 3) {
            throw new BarException();
        }
        else if ("baz".equals(in)) {
            this.bazCount++;
        }
    }

    @SuppressWarnings("serial")
    public static class FooException extends Exception { }

    @SuppressWarnings("serial")
    public static class BarException extends Exception { }

}

Result:

Expect 1 foo:1
Expect 3 bar:3
Expect 1 baz:1
like image 127
Gary Russell Avatar answered Oct 15 '22 02:10

Gary Russell