SQS message visibility timeout being set to 0 when an exception is thrown in a @JmsListener

I have a simple Spring Boot service that listens to an AWS SQS queue using JmsTemplate. Everything works as expected when the message is handled properly.

I am using CLIENT_ACKNOWLEDGE, so when an exception is thrown during processing, the message is received again. However, the Default Visibility Timeout setting on the SQS queue is being ignored and the message is received again immediately.

The SQS queue is configured with a 30 second Default Visibility Timeout and a re-drive policy of 20 receives before putting the message on a DLQ.
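To put numbers on what "respecting the visibility timeout" should look like with this queue, here is a back-of-the-envelope sketch in plain Java (no AWS SDK). It assumes every receive fails and the consumer polls again as soon as the message becomes visible; the class and method names are hypothetical. Under those assumptions, the 20th delivery cannot happen before 19 × 30 = 570 seconds, whereas with a per-message timeout of 0 all 20 deliveries can happen almost back to back.

```java
/**
 * Hypothetical helper: earliest possible delivery times for a message that
 * keeps failing, assuming the consumer re-polls the moment it becomes visible.
 */
public class RedriveTiming {

    /** Earliest time (in seconds) at which the n-th delivery can happen. */
    static long earliestDeliverySeconds(int delivery, long visibilityTimeoutSeconds) {
        // Delivery 1 happens at t = 0; each later delivery must wait for the
        // previous delivery's visibility timeout to expire.
        return (long) (delivery - 1) * visibilityTimeoutSeconds;
    }

    public static void main(String[] args) {
        int maxReceiveCount = 20; // re-drive policy from the question
        System.out.println("30 s timeout: 20th delivery at t >= "
                + earliestDeliverySeconds(maxReceiveCount, 30) + " s");
        System.out.println(" 0 s timeout: 20th delivery at t >= "
                + earliestDeliverySeconds(maxReceiveCount, 0) + " s");
    }
}
```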

I have disabled the service and used the SQS Console to verify that the Default Visibility Timeout is properly set. I have also tried adding the JMS Message to the method signature and performing manual validation.

Here is code for the JMS Configuration:

@Configuration
@EnableJms
class JmsConfig
{
    private static final Logger LOG = LoggerFactory.getLogger(JmsConfig.class);

    @Bean
    @Conditional(AWSEnvironmentCondition.class)
    public SQSConnectionFactory connectionFactory(@Value("${AWS_REGION}") String awsRegion)
    {
        return new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                                  .withRegion(Regions.fromName(awsRegion))
                                  .withCredentials(new DefaultAWSCredentialsProviderChain())
        );
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory)
    {
        DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setConcurrency("3-10");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setErrorHandler(defaultErrorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler defaultErrorHandler()
    {
        return new ErrorHandler()
        {
            @Override
            public void handleError(Throwable throwable)
            {
                LOG.error("JMS message listener error: {}", throwable.getMessage());
            }
        };
    }

    @Bean
    public JmsTemplate defaultJmsTemplate(ConnectionFactory connectionFactory)
    {
        return new JmsTemplate(connectionFactory);
    }
}

And here is code for the Listener:

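import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
import javax.validation.ConstraintViolation;
import javax.validation.ValidationException;
import javax.validation.Validator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MessagingListener
{
    private static final Logger LOG = LoggerFactory.getLogger(MessagingListener.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    private MessageService _messageService;

    @Autowired
    private Validator _validator;

    @JmsListener(destination = "myqueue")
    public void receiveMessage(String messageJson)
    {
        try
        {
            LOG.info("Received message");

            // The following line throws an IOException if the message is not JSON.
            MyMessage myMessage = MAPPER.readValue(messageJson, MyMessage.class);

            Set<ConstraintViolation<MyMessage>> violations = _validator.validate(myMessage);
            if (!violations.isEmpty())
            {
                String errorMessage = violations.stream()
                        .map(v -> String.join(" : ", v.getPropertyPath().iterator().next().getName(),
                                v.getMessage()))
                        .collect(Collectors.joining(", "));
                LOG.error("Exception occurred while validating the model, details: {}", errorMessage);
                throw new ValidationException(errorMessage);
            }
        }
        catch (IOException e)
        {
            LOG.error("Error parsing message", e);
            throw new ValidationException("Error parsing message, details: " + e.getMessage());
        }
    }
}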

When a message is placed on the SQS queue with either invalid JSON or JSON that does not pass validation, the message is received 20 times very quickly and then ends up on the DLQ. What needs to be done so that the Default Visibility Timeout setting in SQS is respected?

asked May 20 '19 at 21:05 by Robert P


1 Answer

When the listener throws an exception, the visibility timeout of the failed message is set to 0 via ChangeMessageVisibility, so SQS redelivers the message immediately, even though the queue has a different default visibility timeout.
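To make the effect concrete, here is a toy in-memory model of SQS visibility in plain Java. It is not the AWS SDK, and the class and method names are hypothetical: a received message stays hidden until its deadline passes, and `changeVisibility` stands in for ChangeMessageVisibility. With the queue default of 30 seconds the message is still hidden one second later, but once its per-message timeout is changed to 0 it can be received again at once.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of SQS message visibility (hypothetical, NOT the AWS SDK).
 * Time is passed in explicitly as seconds to keep the model deterministic.
 */
public class ToyVisibilityQueue {
    private final long defaultVisibilitySeconds;
    // message id -> time (in seconds) at which the message becomes visible again
    private final Map<String, Long> visibleAt = new HashMap<>();

    public ToyVisibilityQueue(long defaultVisibilitySeconds) {
        this.defaultVisibilitySeconds = defaultVisibilitySeconds;
    }

    public void send(String id, long now) {
        visibleAt.put(id, now);
    }

    /** Returns true and hides the message for the default timeout if it is visible at {@code now}. */
    public boolean receive(String id, long now) {
        Long at = visibleAt.get(id);
        if (at == null || at > now) {
            return false; // unknown/deleted, or still in flight
        }
        visibleAt.put(id, now + defaultVisibilitySeconds);
        return true;
    }

    /** Analogue of ChangeMessageVisibility: restart this message's timer with a new timeout. */
    public void changeVisibility(String id, long now, long timeoutSeconds) {
        visibleAt.computeIfPresent(id, (k, v) -> now + timeoutSeconds);
    }

    public static void main(String[] args) {
        ToyVisibilityQueue queue = new ToyVisibilityQueue(30);
        queue.send("m1", 0);
        queue.receive("m1", 0);                      // first delivery, hidden until t = 30
        System.out.println(queue.receive("m1", 1));  // false: queue default respected
        queue.changeVisibility("m1", 1, 0);          // what a negative acknowledgement does
        System.out.println(queue.receive("m1", 1));  // true: redelivered immediately
    }
}
```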

How does that happen?

In simplified form, Spring JMS's AbstractMessageListenerContainer does this:

try {
    invokeListener(session, message); // This is your @JMSListener method
}
catch (JMSException | RuntimeException | Error ex) {
    rollbackOnExceptionIfNecessary(session, ex);
    throw ex;
}
commitIfNecessary(session, message);

Inside the rollbackOnExceptionIfNecessary method, session.recover() is invoked because:

  1. session.getTransacted() is always false, since SQS does not support transactions.
  2. isClientAcknowledge(session) will return true because you're using CLIENT_ACKNOWLEDGE mode.

Finally, recover() of SQSSession negatively acknowledges the message, which sets the visibility timeout of that specific message to 0 and causes SQS to redeliver it immediately.
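The decision described above can be summarized with a small, self-contained Java model. This is a paraphrase of the branching for illustration, not Spring's actual source; the enum and method names are hypothetical, and the acknowledge-mode constants are copied from the values defined in `javax.jms.Session` so the snippet has no dependencies.

```java
/**
 * Simplified, hypothetical model of what rollbackOnExceptionIfNecessary
 * decides after a listener throws (paraphrased, not Spring's code).
 */
public class RollbackDecision {

    enum Action { ROLLBACK, RECOVER, NONE }

    // Same values as javax.jms.Session, duplicated to stay self-contained.
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;

    static Action onListenerException(boolean transacted, int acknowledgeMode) {
        if (transacted) {
            return Action.ROLLBACK; // transacted session: roll back instead of recovering
        }
        if (acknowledgeMode == CLIENT_ACKNOWLEDGE) {
            return Action.RECOVER;  // session.recover(); on SQSSession this is a negative ack
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // SQS sessions are never transacted, and the question uses CLIENT_ACKNOWLEDGE:
        System.out.println(onListenerException(false, CLIENT_ACKNOWLEDGE)); // RECOVER
    }
}
```

In the question's setup the first argument is always false, so the acknowledge mode alone decides whether the message gets recovered.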

The easiest way to override this behavior is to implement a CustomJmsListenerContainerFactory and a CustomMessageListenerContainer and use them instead of DefaultJmsListenerContainerFactory and DefaultMessageListenerContainer.

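// CustomMessageListenerContainer.java
import javax.jms.JMSException;
import javax.jms.Session;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class CustomMessageListenerContainer extends DefaultMessageListenerContainer {

    @Override
    protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException {
        // Do nothing: skipping session.recover() means the message is not
        // negatively acknowledged, so its visibility timeout stays the same.
    }
}

// CustomJmsListenerContainerFactory.java
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class CustomJmsListenerContainerFactory extends DefaultJmsListenerContainerFactory {

    @Override
    protected DefaultMessageListenerContainer createContainerInstance() {
        return new CustomMessageListenerContainer();
    }
}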

Then register it as a Spring bean, either with @Component or as you did in JmsConfig:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // and set other stuff on factory
    return factory;
}

NOTE:
If your application consumes other types of message sources alongside SQS via JMS, use a separate container and container factory for them so that rollbackOnExceptionIfNecessary keeps its default behavior there.

answered Oct 04 '22 at 09:10 by sedooe