Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring cloud SQS - Polling interval

Listening to a AWS SQS queue, using spring cloud as follows:

@SqsListener(value = "${queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void queueListener(String message, @Headers Map<String, Object> sqsHeaders) {
    // code
}

Spring config:

<aws-messaging:annotation-driven-queue-listener
    max-number-of-messages="10" wait-time-out="20" visibility-timeout="3600"
    amazon-sqs="awsSqsClient" />

AwsSqsClient:

@Bean
public com.amazonaws.services.sqs.AmazonSQSAsyncClient awsSqsClient() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    return new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), executorService);
}

This works fine.

Configured 10 threads to process these messages in SQS client as you can see above code. This is also working fine, at any point of time maximum 10 messages are processed.

The issue is, I couldn't figure-out a way to control the polling interval. By default spring polls once all threads are free.

i.e. consider the following example

  1. Around 3 messages are delivered to Queue
  2. Spring polls the queue and get 3 messages
  3. 3 messages are processing each message take roughly about 20 minutues

In the meantime there are around 25 messages delivered to queue. Spring is NOT polling the queue until all the 3 messages delivered earlier completed. Esentially as per example above Spring polls only after 20 minutes though there are 7 threads still free!!

Any idea how we can control this polling? i.e. Poll should start if there are any threads free and should not wait until all threads become free

like image 805
yottabrain Avatar asked Mar 08 '16 06:03

yottabrain


1 Answers

Your listener can load messages into your Spring app and submit them to another thread pool along with Acknowledgement and Visibility objects (if you want to control both).

Once messages are submitted to this thread pool, your listener can load more data. You can control the concurrency by adjusting thread pool settings.

Your listener's method signature will be similar to one below:

@SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(YourCustomPOJO pojo,
                   @Headers Map<String, Object> headers,
                   Acknowledgment acknowledgment,
                   Visibility visibility) throws Exception {
...... Send pojo to worker thread and return

A worker thread then will acknowledge the successful processing

acknowledgment.acknowledge().get();

Make sure your message visibility is set to a value that is greater than your highest processing time (use some timeout to limit execution time).

like image 177
Moose on the Loose Avatar answered Sep 25 '22 04:09

Moose on the Loose