@Bean
RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
template.setMessageConverter(messageConverter);
template.setExchange(amqpProperties.getRabbitMqTopicExchangeName());
return template;
}
@Bean
@Conditional (OperationsCondition.class)
SimpleMessageListenerContainer opsMessageListenerContainer() {
return listenerContainer(amqpProperties.getRabbitMqOperationsQueue(),
amqpProperties.getInitialRabbitOperationsConsumerCount(),
amqpProperties.getMaximumRabbitOperationsConsumerCount(),
opsReceiver());
}
@Bean
@Conditional (OperationsCondition.class)
OperationsListener opsReceiver() {
return new OperationsListener();
}
private SimpleMessageListenerContainer listenerContainer(String queue,
int initConsumers,int maxConsumers, MessageListener listener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames(queue);
container.setMessageListener(listener);
container.setConcurrentConsumers(initConsumers);
container.setMaxConcurrentConsumers(maxConsumers);
container.setMessageConverter(messageConverter);
return container;
}
Message listener is:
public class OperationsListener implements MessageListener
{
public static final Logger logger = Logger.getInstance(OperationsListener.class);
@Autowired (required=true)
private OperationsProcessor processor;
@Autowired (required=true)
private ObjectMapper objectMapper;
public void onMessage(Message message)
{
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setJsonObjectMapper(objectMapper);
OperationsMessage request = (OperationsMessage)converter.fromMessage(message);
processor.createMessage(request);
//This is throwing a JPA database exception
processor.createOperation(request);
}
}
processor.createOperation() is throwing an exception due to database issue. Problem is that message listener is going in a loop and message keeps coming back.
My processor class:
@Component
@Transactional (propagation = Propagation.REQUIRES_NEW)
public class OperationsProcessor
{
...............
public void createOperation(OperationsMessage message)
{
try
{
.............
.............
//this call throws exception.
opsRepo.create(operation,null);
}
catch (Exception e)
{
logger.error(e);
}
}
}
opsRepo.create throws an exception. Even though i am catching error, i was hoping that message doesn't gets sent again by spring amqp. Not sure why same message keeps coming back.
EDIT:
I think i found some pointers on how to deal with this. The cause is that spring is requeing events upon failure and this is the default nature. Found an helpful thread here and here.
To confirm what you have found, this is clearly documented in the reference manual section "Exception Handling".
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With