
Aws integration spring: Extend Visibility Timeout

Is it possible to extend the visibility timeout of a message that is in flight?

See:

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html

Section: Changing a Message's Visibility Timeout.

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sqs/AmazonSQSClient.html#changeMessageVisibility-com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest-

In summary, I want to be able to extend the initially set visibility timeout for a given message that is in flight.

For example, if 15 seconds have passed, I then want to extend the timeout by another 20 seconds. There is a better example in the Javadoc linked above.

From my understanding of the links above, you can do this on the Amazon side.

Below are my current settings:

  SqsMessageDrivenChannelAdapter adapter =
  new SqsMessageDrivenChannelAdapter(queue);
  adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
  adapter.setMaxNumberOfMessages(1);
  adapter.setSendTimeout(2000);
  adapter.setVisibilityTimeout(200);
  adapter.setWaitTimeOut(20);

Is it possible to extend this timeout?

user101010101 asked Jul 13 '16 15:07


2 Answers

OK, I think I see your point.

We can change the visibility timeout of a particular message using this API:

AmazonSQS.changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout)

For this purpose, in the downstream flow you have to get access to (inject) the AmazonSQS bean and extract the special headers from the Message:

@Autowired
AmazonSQS amazonSqs;

@Autowired
ResourceIdResolver resourceIdResolver;
...


MessageHeaders headers = message.getHeaders();

DestinationResolver<String> destinationResolver = new DynamicQueueUrlDestinationResolver(this.amazonSqs, this.resourceIdResolver);

// headers.get(key) returns Object, so ask for the String type explicitly
String queueUrl = destinationResolver.resolveDestination(headers.get(AwsHeaders.QUEUE, String.class));

String receiptHandle = headers.get(AwsHeaders.RECEIPT_HANDLE, String.class);

// The new timeout is given in seconds
amazonSqs.changeMessageVisibility(queueUrl, receiptHandle, YOUR_DESIRED_VISIBILITY_TIMEOUT);
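One thing to keep in mind when choosing YOUR_DESIRED_VISIBILITY_TIMEOUT: per the SQS documentation, the new timeout starts counting from the moment changeMessageVisibility is called; it is not added to whatever time remains. The total visibility timeout of a message is also capped at 12 hours from first receipt. The sketch below only models that arithmetic with the numbers from the question (200 s initial timeout, 15 s elapsed, 20 s extra). VisibilityMath and its methods are hypothetical helpers made up for illustration, not part of the AWS SDK or Spring.

```java
import java.time.Duration;

// Hypothetical helper (not an AWS or Spring class): models SQS visibility arithmetic.
final class VisibilityMath {

    // SQS limits the total visibility timeout to 12 hours from first receipt.
    static final Duration MAX_TOTAL = Duration.ofHours(12);

    /**
     * Seconds after receipt at which the message becomes visible again when
     * changeMessageVisibility(newTimeoutSeconds) is called elapsedSeconds after receipt.
     */
    static long visibleAgainAt(long elapsedSeconds, int newTimeoutSeconds) {
        long total = elapsedSeconds + newTimeoutSeconds;
        if (newTimeoutSeconds < 0 || total > MAX_TOTAL.getSeconds()) {
            throw new IllegalArgumentException("timeout outside the range SQS accepts");
        }
        return total;
    }

    /**
     * Value to pass to changeMessageVisibility so the message stays invisible
     * extraSeconds longer than its original timeout would have allowed.
     */
    static int timeoutToExtendBy(int originalTimeoutSeconds, long elapsedSeconds, int extraSeconds) {
        long remaining = Math.max(0, originalTimeoutSeconds - elapsedSeconds);
        return Math.toIntExact(remaining + extraSeconds);
    }

    public static void main(String[] args) {
        // Passing 20 after 15 s actually shortens the 200 s timeout.
        System.out.println(visibleAgainAt(15, 20));
        // To really gain 20 s on top of the original 200 s, pass remaining + 20.
        int value = timeoutToExtendBy(200, 15, 20);
        System.out.println(value + " -> visible again at " + visibleAgainAt(15, value));
    }
}
```

So with adapter.setVisibilityTimeout(200), calling changeMessageVisibility with 20 after 15 seconds would make the message visible again sooner, not later; passing the remaining time plus the desired extension is what lengthens it.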

That said, I agree that we should provide this as an out-of-the-box feature. It could be something similar to QueueMessageAcknowledgment, exposed as a new header, or even just one more changeMessageVisibility method added to QueueMessageAcknowledgment itself.

Please raise a GitHub issue for the Spring Cloud AWS project on the matter, with a link to this SO topic.

Artem Bilan answered Sep 28 '22 04:09


Spring Cloud AWS supports this starting with version 2.0. Injecting a Visibility parameter into your SQS listener method does the trick:

  @SqsListener(value = "my-sqs-queue")
  void onMessageReceived(@Payload String payload, Visibility visibility) {
    ...
    var extension = visibility.extend(20);
    ...
  }

Note that extend works asynchronously and returns a Future. So if you want to be sure, further down the processing, that the visibility of the message has really been extended on the AWS side, either block on the Future using extension.get() or poll it with extension.isDone().
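If blocking indefinitely on extension.get() is a concern, the standard Future API also offers get with a timeout. Below is a minimal sketch that uses only java.util.concurrent; ExtensionWait and confirmExtended are hypothetical names, and the CompletableFuture in main merely stands in for whatever visibility.extend(20) returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits a bounded time for the asynchronous extension to finish.
final class ExtensionWait {

    /** Returns true only if the future completed successfully within timeoutMillis. */
    static boolean confirmExtended(Future<?> extension, long timeoutMillis) {
        try {
            extension.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still pending; the message may become visible again
        } catch (ExecutionException e) {
            return false; // the underlying call failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for the Future returned by visibility.extend(20).
        Future<?> extension = CompletableFuture.completedFuture(null);
        System.out.println(confirmExtended(extension, 500));
    }
}
```

Inside the listener this would be called as confirmExtended(visibility.extend(20), 500), with the 500 ms budget being an arbitrary value chosen for the example.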

Stefan Haberl answered Sep 28 '22 04:09