Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Getting SQS dead letter queue to work with Spring Boot and JMS

I've been working on a small Spring Boot application that receives messages from Amazon SQS. However I foresee that processing these messages may fail, so that's why I thought adding a dead letter queue would be a good idea.

There is a problem though: when the processing fails (which I force by throwing an Exception for some of the messages) it is not reattempted later on and it's not moved to the dead letter queue. I am struggling to find the issue, since there doesn't seem to much info on it.

However if I look at Amazon's documentation, they seem to be able to do it, but without using the Spring Boot annotations. Is there any way I can make the code below work transactional without writing too much of the JMS code myself?

This is the current configuration that I am using.

@Configuration
public class AWSConfiguration {

    @Value("${aws.sqs.endpoint}")
    private String endpoint;

    @Value("${aws.iam.key}")
    private String iamKey;

    @Value("${aws.iam.secret}")
    private String iamSecret;

    @Value("${aws.sqs.queue}")
    private String queue;

    @Bean
    public JmsTemplate createJMSTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(getSQSConnectionFactory());
        jmsTemplate.setDefaultDestinationName(queue);
        jmsTemplate.setDeliveryPersistent(true);
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        return jmsTemplate;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(getSQSConnectionFactory());
        factory.setConcurrency("1-1");
        return factory;
    }

    @Bean
    public JmsTransactionManager jmsTransactionManager() {
        return new JmsTransactionManager(getSQSConnectionFactory());
    }

    @Bean
    public ConnectionFactory getSQSConnectionFactory() {
        return SQSConnectionFactory.builder()
                .withAWSCredentialsProvider(awsCredentialsProvider)
                .withEndpoint(endpoint)
                .withNumberOfMessagesToPrefetch(10).build();
    }

    private final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() {
        @Override
        public AWSCredentials getCredentials() {
            return new BasicAWSCredentials(iamKey, iamSecret);
        }
        @Override
        public void refresh() {
        }
    };
}

And finally the receiving end:

@Service
public class QueueReceiver {

    private static final String EXPERIMENTAL_QUEUE = "${aws.sqs.queue}";

    @JmsListener(destination = EXPERIMENTAL_QUEUE)
    public void receiveSegment(String jsonSegment) throws IOException {
        Segment segment = Segment.fromJSON(jsonSegment);
        if(segment.shouldFail()) {
            throw new IOException("This segment is expected to fail");
        }
        System.out.println(segment.getText());
    }

}
like image 285
Bram Vandewalle Avatar asked Dec 14 '16 13:12

Bram Vandewalle


1 Answers

Spring Cloud AWS

You can greatly simplify your configuration by leveraging Spring Cloud AWS.

MessageHandler

@Service
public class MessageHandler {

    @SqsListener(value = "test-queue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void queueListener(String msg, Acknowledgment acknowledgment){
        System.out.println("message: " + msg);
        if(/*successful*/){
            acknowledgment.acknowledge();
        }
    }
}

The example shown above is all you need to receive messages. This assumes you've created an sqs queue with an associated dead letter queue. If you're messages aren't acknowledged, then they will be retried again until they reach the maximum # of receives. Then it will be forwarded to the dead letter queue.

like image 111
Kyle Anderson Avatar answered Oct 29 '22 22:10

Kyle Anderson