I've been working on a small Spring Boot application that receives messages from Amazon SQS. However I foresee that processing these messages may fail, so that's why I thought adding a dead letter queue would be a good idea.
There is a problem though: when the processing fails (which I force by throwing an Exception for some of the messages) it is not reattempted later on and it's not moved to the dead letter queue. I am struggling to find the issue, since there doesn't seem to much info on it.
However if I look at Amazon's documentation, they seem to be able to do it, but without using the Spring Boot annotations. Is there any way I can make the code below work transactional without writing too much of the JMS code myself?
This is the current configuration that I am using.
@Configuration
public class AWSConfiguration {
@Value("${aws.sqs.endpoint}")
private String endpoint;
@Value("${aws.iam.key}")
private String iamKey;
@Value("${aws.iam.secret}")
private String iamSecret;
@Value("${aws.sqs.queue}")
private String queue;
@Bean
public JmsTemplate createJMSTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(getSQSConnectionFactory());
jmsTemplate.setDefaultDestinationName(queue);
jmsTemplate.setDeliveryPersistent(true);
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
return jmsTemplate;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getSQSConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}
@Bean
public JmsTransactionManager jmsTransactionManager() {
return new JmsTransactionManager(getSQSConnectionFactory());
}
@Bean
public ConnectionFactory getSQSConnectionFactory() {
return SQSConnectionFactory.builder()
.withAWSCredentialsProvider(awsCredentialsProvider)
.withEndpoint(endpoint)
.withNumberOfMessagesToPrefetch(10).build();
}
private final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(iamKey, iamSecret);
}
@Override
public void refresh() {
}
};
}
And finally the receiving end:
@Service
public class QueueReceiver {
private static final String EXPERIMENTAL_QUEUE = "${aws.sqs.queue}";
@JmsListener(destination = EXPERIMENTAL_QUEUE)
public void receiveSegment(String jsonSegment) throws IOException {
Segment segment = Segment.fromJSON(jsonSegment);
if(segment.shouldFail()) {
throw new IOException("This segment is expected to fail");
}
System.out.println(segment.getText());
}
}
You can greatly simplify your configuration by leveraging Spring Cloud AWS.
MessageHandler
@Service
public class MessageHandler {
@SqsListener(value = "test-queue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void queueListener(String msg, Acknowledgment acknowledgment){
System.out.println("message: " + msg);
if(/*successful*/){
acknowledgment.acknowledge();
}
}
}
The example shown above is all you need to receive messages. This assumes you've created an sqs queue with an associated dead letter queue. If you're messages aren't acknowledged, then they will be retried again until they reach the maximum # of receives. Then it will be forwarded to the dead letter queue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With