Is it possible to extend the visibility timeout of a message that is in flight?
See:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html.
Section: Changing a Message's Visibility Timeout.
http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sqs/AmazonSQSClient.html#changeMessageVisibility-com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest-
In summary, I want to extend the visibility timeout that was initially set for a given message while that message is in flight.
For example, once 15 seconds have passed, I want to extend the timeout by another 20 seconds. The Javadoc linked above has a better example.
As I understand the links above, this is possible on the Amazon side.
Below are my current settings:
SqsMessageDrivenChannelAdapter adapter =
new SqsMessageDrivenChannelAdapter(queue);
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
adapter.setMaxNumberOfMessages(1);
adapter.setSendTimeout(2000);
adapter.setVisibilityTimeout(200);
adapter.setWaitTimeOut(20);
Is it possible to extend this timeout?
OK. Looks like I see your point.
We can change the visibility timeout of a particular message using this API:
AmazonSQS.changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout)
For this purpose, the downstream flow needs access to an (injected) AmazonSQS bean and has to extract the relevant headers from the Message:
@Autowired
AmazonSQS amazonSqs;
@Autowired
ResourceIdResolver resourceIdResolver;
...
MessageHeaders headers = message.getHeaders();
DestinationResolver<String> destinationResolver = new DynamicQueueUrlDestinationResolver(this.amazonSqs, this.resourceIdResolver);
// MessageHeaders.get(key) returns Object, so request the header values as String explicitly
String queueUrl = destinationResolver.resolveDestination(headers.get(AwsHeaders.QUEUE, String.class));
String receiptHandle = headers.get(AwsHeaders.RECEIPT_HANDLE, String.class);
// The timeout is given in seconds
this.amazonSqs.changeMessageVisibility(queueUrl, receiptHandle, YOUR_DESIRED_VISIBILITY_TIMEOUT);
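One detail from the SQS documentation linked in the question is worth spelling out: the value passed to changeMessageVisibility is not added to the original timeout. It starts counting from the moment of the call. The sketch below only illustrates that arithmetic with the numbers from the question (200 seconds configured on the adapter, a call with 20 seconds made 15 seconds after receipt). The class and method names are made up for illustration and nothing here talks to AWS.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, for illustration only: models when a message becomes
// visible again after a changeMessageVisibility call.
final class VisibilityDeadline {

    // The new timeout counts from the time of the change call,
    // not from the time the message was received.
    static Instant deadlineAfterChange(Instant changeCallTime, int visibilityTimeoutSeconds) {
        return changeCallTime.plusSeconds(visibilityTimeoutSeconds);
    }

    public static void main(String[] args) {
        Instant received = Instant.parse("2020-01-01T00:00:00Z");
        Instant originalDeadline = received.plusSeconds(200);  // adapter.setVisibilityTimeout(200)
        Instant changeCall = received.plusSeconds(15);         // 15 seconds into processing
        Instant newDeadline = deadlineAfterChange(changeCall, 20);

        // Seconds between receipt and the new deadline
        System.out.println(Duration.between(received, newDeadline).getSeconds()); // prints 35
        // With these numbers the call shortens the timeout instead of extending it
        System.out.println(newDeadline.isBefore(originalDeadline));               // prints true
    }
}
```

So to really extend the timeout, the value passed has to be larger than the time still remaining on the current one.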
That said, I agree that we should provide this as an out-of-the-box feature. It could be something similar to QueueMessageAcknowledgment as a new header, or even just one more changeMessageVisibility method on that one.
Please raise a GH issue for the Spring Cloud AWS project on the matter, with a link to this SO topic.
Spring Cloud AWS supports this starting with version 2.0. Injecting a Visibility parameter into your SQS listener method does the trick:
@SqsListener(value = "my-sqs-queue")
void onMessageReceived(@Payload String payload, Visibility visibility) {
...
var extension = visibility.extend(20);
...
}
Note that extend works asynchronously and returns a Future. So if you want to be sure, further down the processing, that the visibility of the message has really been extended on the AWS side, either block on the Future using extension.get() or poll it with extension.isDone().
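As a rough sketch of the blocking variant: a bare extension.get() waits indefinitely, so a bounded wait may be preferable. The example below uses a CompletableFuture as a stand-in for the Future returned by visibility.extend(20). The names ExtensionFutureDemo, fakeExtend and awaitExtension are hypothetical and not part of Spring Cloud AWS.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ExtensionFutureDemo {

    // Hypothetical stand-in for visibility.extend(seconds); the real call
    // sends a request to SQS, this one completes without doing anything.
    static Future<?> fakeExtend(int seconds) {
        return CompletableFuture.runAsync(() -> { /* pretend to call SQS */ });
    }

    // Waits up to timeoutMillis for the extension to be confirmed.
    // Returns false if it failed, timed out, or the wait was interrupted.
    static boolean awaitExtension(Future<?> extension, long timeoutMillis) {
        try {
            extension.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        Future<?> extension = fakeExtend(20);
        if (awaitExtension(extension, 1000)) {
            System.out.println("visibility extended, safe to continue");
        } else {
            System.out.println("extension not confirmed, message may reappear");
        }
    }
}
```

In a real listener, the Future would come from visibility.extend(20), and a false result is a signal that the message may become visible to other consumers before processing finishes.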