
Spring AMQP (Rabbit) Listener goes in a loop in case of exception

@Bean
RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
    template.setMessageConverter(messageConverter);
    template.setExchange(amqpProperties.getRabbitMqTopicExchangeName());
    return template;
}

@Bean
@Conditional(OperationsCondition.class)
SimpleMessageListenerContainer opsMessageListenerContainer() {
    return listenerContainer(amqpProperties.getRabbitMqOperationsQueue(), 
            amqpProperties.getInitialRabbitOperationsConsumerCount(), 
            amqpProperties.getMaximumRabbitOperationsConsumerCount(),
            opsReceiver());
}

@Bean
@Conditional(OperationsCondition.class)
OperationsListener opsReceiver() {
    return new OperationsListener();
}

private SimpleMessageListenerContainer listenerContainer(String queue,
        int initConsumers, int maxConsumers, MessageListener listener)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueueNames(queue);
    container.setMessageListener(listener);
    container.setConcurrentConsumers(initConsumers);
    container.setMaxConcurrentConsumers(maxConsumers);
    container.setMessageConverter(messageConverter);
    return container;
}

The message listener is:

public class OperationsListener  implements MessageListener
{
    public static final Logger logger = Logger.getInstance(OperationsListener.class);

    @Autowired (required=true)
    private OperationsProcessor processor;
    @Autowired (required=true)
    private ObjectMapper objectMapper;

    public void onMessage(Message message)
    {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();        
        converter.setJsonObjectMapper(objectMapper);
        OperationsMessage request = (OperationsMessage)converter.fromMessage(message);
        processor.createMessage(request);
        //This is throwing a JPA database exception
        processor.createOperation(request);
    }
}

processor.createOperation() throws an exception because of a database issue. The problem is that the message listener goes into a loop: the same message keeps being redelivered.

My processor class:

@Component
@Transactional (propagation = Propagation.REQUIRES_NEW) 
public class OperationsProcessor
{
...............

    public void createOperation(OperationsMessage message)
    {
            try
            {
                .............
                .............
                //this call throws exception.
                opsRepo.create(operation,null);
            }
            catch (Exception e)
            {
                logger.error(e);
            }

    }
}

opsRepo.create throws an exception. Since I am catching the exception, I was hoping that Spring AMQP would not deliver the message again. I am not sure why the same message keeps coming back.

EDIT:

I think I found some pointers on how to deal with this. The cause is that Spring AMQP requeues a message when the listener fails, and this is the default behavior. I found helpful threads here and here.
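To illustrate that default, here is a simplified plain-Java model of a container that requeues a message whenever the listener throws. It is only a sketch and not Spring AMQP code: `RequeueModel`, `drain`, `RejectAndDontRequeue`, and the `maxDeliveries` cap are hypothetical names introduced for illustration, and the cap exists only so the demonstration terminates.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

class RequeueModel {

    /** Hypothetical marker exception: "reject this message and do not requeue it". */
    static class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    /**
     * Delivers messages from the head of the queue until it is empty or
     * maxDeliveries attempts have been made. Returns the number of attempts.
     */
    static int drain(Deque<String> queue, Consumer<String> listener,
                     boolean defaultRequeueRejected, int maxDeliveries) {
        int deliveries = 0;
        while (!queue.isEmpty() && deliveries < maxDeliveries) {
            String message = queue.pollFirst();
            deliveries++;
            try {
                listener.accept(message);
            } catch (RejectAndDontRequeue e) {
                // Explicit rejection: the message is dropped, never requeued.
            } catch (RuntimeException e) {
                if (defaultRequeueRejected) {
                    // Default behavior in this model: put the message back,
                    // so the same message is delivered again.
                    queue.addFirst(message);
                }
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("op-1");
        // A listener that always fails, standing in for the database error.
        Consumer<String> failing = m -> {
            throw new IllegalStateException("database error");
        };
        System.out.println("requeue on:  " + drain(queue, failing, true, 5) + " deliveries");

        queue.clear();
        queue.add("op-1");
        System.out.println("requeue off: " + drain(queue, failing, false, 5) + " deliveries");
    }
}
```

With requeueing enabled, the failing message is redelivered until the cap is hit, which is the loop described above. With requeueing disabled, or when the listener throws the marker exception, the message is delivered once and then dropped.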

Sannu asked Mar 24 '17 21:03



1 Answer

To confirm what you found: this behavior is documented in the "Exception Handling" section of the Spring AMQP reference manual.
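As a rough sketch of the two usual ways to stop the redelivery, the fragment below either turns off requeueing for the whole container with `setDefaultRequeueRejected(false)`, or has the listener throw `AmqpRejectAndDontRequeueException` for a specific message. The class name `RequeueOptions` and the helper names `noRequeueContainer` and `rejectingListener` are hypothetical; it assumes Spring AMQP is on the classpath and is not taken from the asker's project.

```java
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

public class RequeueOptions {

    // Option 1 (container-wide): when the listener throws, the message is
    // rejected without being put back on the queue.
    static SimpleMessageListenerContainer noRequeueContainer(
            ConnectionFactory connectionFactory, String queue, MessageListener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queue);
        container.setMessageListener(listener);
        container.setDefaultRequeueRejected(false);
        return container;
    }

    // Option 2 (per message): wrap an existing listener and convert its
    // failures into an exception that tells the container not to requeue.
    static MessageListener rejectingListener(MessageListener delegate) {
        return message -> {
            try {
                delegate.onMessage(message);
            } catch (RuntimeException e) {
                throw new AmqpRejectAndDontRequeueException(
                        "Processing failed; rejecting without requeue", e);
            }
        };
    }
}
```

In both cases a rejected message is discarded by the broker unless the queue is configured with a dead-letter exchange, so a dead-letter setup is worth considering if failed messages need to be inspected later.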

Gary Russell answered Sep 30 '22 15:09
