SQSListener with ThreadpoolExecutor

In the example below, I set both the core and max pool size to 1, but no messages are processed. With debug logging enabled, I can see messages being pulled from SQS, but they do not appear to be processed or deleted. When I increase the core and max pool size to 2, the messages are processed.

EDIT

I believe Spring may be allocating a thread to the receiver that reads data off the queue, which leaves no thread for the listener that processes the message. When I increased corePoolSize to 2, I saw that messages were being read off the queue. When I added another listener (for a dead letter queue), I hit the same issue: 2 threads were not sufficient, and messages were not processed. When I increased corePoolSize to 3, processing started. I assume that in this case 1 thread was allocated to read messages off the queue and each of the 2 listeners was assigned 1 thread.
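The behaviour described in this edit can be reproduced without Spring or SQS. The following is a minimal sketch under assumptions: the class and method names are hypothetical, and the "poller" is only a stand-in for a long-running receive loop that runs on the same executor as the message handlers, not the listener container's actual code. It uses a fixed-size pool with an unbounded queue, which should behave like the executor configured below.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Spring Cloud AWS.
public class PollerStarvationDemo {

    /**
     * Starts a never-ending "poller" on the pool. The poller hands one message
     * handler to the same pool and then keeps its thread occupied.
     * Returns true if the handler ran within the timeout.
     */
    public static boolean handlerRuns(int poolSize) {
        // Fixed-size pool: core == max, backed by an unbounded queue.
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch handled = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                // Submit the handler for one "message" to the same pool.
                pool.execute(handled::countDown);
                try {
                    // Simulate a polling loop that never returns its thread.
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            return handled.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size 1: handler ran = " + handlerRuns(1));
        System.out.println("pool size 2: handler ran = " + handlerRuns(2));
    }
}
```

With a pool size of 1 the handler stays in the queue because the poller never frees the only thread; with a pool size of 2 the handler gets the second thread.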

@Configuration
public class SqsListenerConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "aws.configuration")
    public ClientConfiguration clientConfiguration() {
        return new ClientConfiguration();
    }


    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {

        ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
        try {
            // Fail fast if the profile cannot be loaded. The keys themselves are
            // deliberately not printed, so they do not end up in logs.
            credentialsProvider.getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                            "Please make sure that your credentials file is at the correct " +
                            "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        return credentialsProvider;
    }


    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().
                withCredentials(awsCredentialsProvider()).
                withClientConfiguration(clientConfiguration()).
                build();
    }


    @Bean
    @ConfigurationProperties(prefix = "aws.queue")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(10);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }


    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return queueMessageHandler;
    }


    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("oaoQueueExecutor");
        executor.initialize();
        return executor;
    }


    @Bean
    public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }


}

Listener Config

    @SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {

        System.out.println(" Data = " + serviceData + " MessageId = " + messageId);

        repository.execute(serviceData);
    }
Punter Vicky asked Apr 27 '18 21:04




1 Answer

By setting corePoolSize and maximumPoolSize to the same value, you create a fixed-size thread pool. The rules are explained well in the ThreadPoolExecutor documentation, quoted further down.

Setting maxPoolSize implicitly allows tasks to be rejected once both the pool and the queue are full. However, the default queue capacity is Integer.MAX_VALUE, which for practical purposes is unbounded, so the queue never fills up.

Something to watch out for is that ThreadPoolTaskExecutor uses a ThreadPoolExecutor underneath, which has a somewhat unusual approach to queueing, described in the docs:

If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

This means that maxPoolSize is only relevant when the queue is full; otherwise the number of threads never grows beyond corePoolSize. As an example, if we submit tasks that never complete to the thread pool:

  • the first corePoolSize submissions will start a new thread each;
  • after that, all submissions go to the queue;
  • if the queue is finite and its capacity is exhausted, each submission starts a new thread, up to maxPoolSize;
  • when both the pool and the queue are full, new submissions are rejected.
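The four steps above can be observed directly with a plain java.util.concurrent.ThreadPoolExecutor. This is a small sketch, assuming a core size of 1, a max size of 2 and a queue capacity of 1 chosen purely for illustration; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing how a bounded pool reacts to each submission.
public class PoolGrowthDemo {

    /** Submits four blocking tasks (core=1, max=2, queue capacity=1) and logs each outcome. */
    public static List<String> submitFour() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so no thread becomes free while we submit.
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocked);
                    log.add("task " + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // Default policy (AbortPolicy) throws when pool and queue are both full.
                    log.add("task " + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        submitFour().forEach(System.out::println);
    }
}
```

The log shows the first task starting a thread, the second going to the queue, the third starting a second thread because the queue is full, and the fourth being rejected.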

Queuing - Read the docs

Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

  • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to be queued in cases where all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.)
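The effect of an unbounded queue can be checked with a short experiment. This is a sketch with hypothetical names, assuming a core size of 1 and a max size of 10 chosen only for illustration: however many blocking tasks are submitted, the pool should stay at one thread and the rest should wait in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: maximumPoolSize has no effect with an unbounded queue.
public class UnboundedQueueDemo {

    /** Returns the pool's thread count after submitting the given number of blocking tasks. */
    public static int threadsAfter(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Only the first task got a thread; the others were queued.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads after 5 tasks: " + threadsAfter(5));
    }
}
```

This is the situation in the question: with a core size of 1 and the default unbounded queue, a second thread is never created, regardless of the max pool size.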

To summarize:

  1. If the number of threads is less than corePoolSize, create a new thread to run the new task.
  2. If the number of threads is equal to (or greater than) corePoolSize, put the task into the queue.
  3. If the queue is full and the number of threads is less than maxPoolSize, create a new thread to run the task.
  4. If the queue is full and the number of threads is greater than or equal to maxPoolSize, reject the task.
Vikram Palakurthi answered Sep 19 '22 21:09