In the example below, I am setting the max and core pool size to 1, but no messages are being processed. When I enable debug logging, I can see the messages being pulled from SQS, but they do not appear to be processed or deleted. However, when I increase the core and max pool size to 2, the messages are processed.
EDIT
I believe Spring may be allocating a thread to the receiver that reads messages off the queue, and hence it cannot allocate a thread to the listener that processes the message. When I increased the corePoolSize to 2, I saw that messages were being read off the queue. When I added another listener (for the dead letter queue), I ran into the same issue: 2 threads were not sufficient and the messages were not being processed. When I increased the corePoolSize to 3, it started processing the messages. I assume that in this case 1 thread was allocated to read messages off the queue and the 2 listeners were assigned 1 thread each.
@Configuration
public class SqsListenerConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "aws.configuration")
    public ClientConfiguration clientConfiguration() {
        return new ClientConfiguration();
    }

    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {
        ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
        try {
            credentialsProvider.getCredentials();
            System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
            System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                    "Please make sure that your credentials file is at the correct " +
                    "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        return credentialsProvider;
    }

    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard()
                .withCredentials(awsCredentialsProvider())
                .withClientConfiguration(clientConfiguration())
                .build();
    }

    @Bean
    @ConfigurationProperties(prefix = "aws.queue")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(10);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }

    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return queueMessageHandler;
    }

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("oaoQueueExecutor");
        executor.initialize();
        return executor;
    }

    @Bean
    public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }
}
Listener Config
@SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData,
                      @Header("MessageId") String messageId,
                      @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
    System.out.println(" Data = " + serviceData + " MessageId = " + messageId);
    repository.execute(serviceData);
}
keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
ThreadPoolExecutor is an ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using the Executors factory methods. It also provides various utility methods to check current thread statistics and to control them.
The default configuration is a core pool size of 1, with unlimited max pool size and unlimited queue capacity. This is roughly equivalent to Executors.newSingleThreadExecutor().
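Those defaults can be verified with a tiny standalone check (my own sketch, not from the original post; it assumes only Spring's ThreadPoolTaskExecutor on the classpath):

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class DefaultsCheck {
    public static void main(String[] args) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.initialize();
        // Defaults: a single core thread and an effectively unlimited max pool size / queue.
        System.out.println("corePoolSize = " + executor.getCorePoolSize()); // 1
        System.out.println("maxPoolSize  = " + executor.getMaxPoolSize());  // 2147483647 (Integer.MAX_VALUE)
        executor.shutdown();
    }
}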
By setting corePoolSize and maximumPoolSize to the same value, you create a fixed-size thread pool. A very good explanation of the rules is documented here.

Setting maxPoolSize implicitly allows for tasks to get dropped. However, the default queue capacity is Integer.MAX_VALUE, which, for practical purposes, is infinity.
Something to watch out for is that ThreadPoolTaskExecutor uses a ThreadPoolExecutor underneath, which has a somewhat unusual approach to queueing, described in the docs:

If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

This means that maxPoolSize is only relevant when the queue is full; otherwise the number of threads will never grow beyond corePoolSize.
As an example, if we submit tasks that never complete to the thread pool: the first corePoolSize submissions will each start a new thread; subsequent submissions go into the queue; only once the queue is full are additional threads created, up to maxPoolSize.

Queuing - Read the docs:
Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to be queued in cases where all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.)
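A small standalone demo (my own sketch, not part of the quoted docs) of that unbounded-queue behavior: with the default queue capacity, the pool never grows past corePoolSize, no matter how large maxPoolSize is. The class name, task count and sleep durations are arbitrary:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class UnboundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(10); // has no effect here: the default queue is unbounded
        executor.setThreadNamePrefix("demo-");
        executor.initialize();

        // Submit 20 long-running tasks; only corePoolSize (2) threads are ever created,
        // the remaining 18 tasks wait in the (effectively infinite) queue.
        for (int i = 0; i < 20; i++) {
            executor.execute(() -> {
                try { Thread.sleep(5_000); } catch (InterruptedException ignored) { }
            });
        }

        Thread.sleep(500); // give the pool a moment to spin up its threads
        System.out.println("pool size = " + executor.getPoolSize());                              // prints 2
        System.out.println("queued    = " + executor.getThreadPoolExecutor().getQueue().size()); // prints 18
        executor.shutdown();
    }
}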
So the underlying ThreadPoolExecutor behaves as follows:

- If fewer than corePoolSize threads are running, create a new Thread to run a new task.
- If corePoolSize or more threads are running, put the task into the queue.
- If the queue is full and fewer than maxPoolSize threads are running, create a new thread to run tasks in.
- If the queue is full and maxPoolSize threads are already running, reject the task.
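To illustrate those rules concretely, here is a hedged standalone sketch using a bounded queue; the pool sizes, queue capacity and 5-second sleep are arbitrary choices for the demo:

import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class BoundedQueueDemo {
    public static void main(String[] args) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(1); // a bounded queue is what makes maxPoolSize (and rejection) relevant
        executor.setThreadNamePrefix("demo-");
        executor.initialize();

        Runnable longTask = () -> {
            try { Thread.sleep(5_000); } catch (InterruptedException ignored) { }
        };

        executor.execute(longTask); // task 1: fewer than corePoolSize threads -> new thread
        executor.execute(longTask); // task 2: core thread busy -> goes into the queue
        executor.execute(longTask); // task 3: queue full, pool below maxPoolSize -> second thread
        try {
            executor.execute(longTask); // task 4: queue full and pool at maxPoolSize -> rejected
        } catch (TaskRejectedException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        System.out.println("pool size = " + executor.getPoolSize()); // prints 2
        executor.shutdown();
    }
}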