I don't understand how SQS QueueListener works.
This is my config:
/**
 * AWS Credentials Bean
 */
@Bean
public AWSCredentials awsCredentials() {
    return new BasicAWSCredentials(accessKey, secretAccessKey);
}

/**
 * AWS Client Bean
 */
@Bean
public AmazonSQS amazonSQSAsyncClient() {
    AmazonSQS sqsClient = new AmazonSQSClient(awsCredentials());
    sqsClient.setRegion(Region.getRegion(Regions.US_EAST_1));
    return sqsClient;
}

/**
 * AWS Connection Factory
 */
@Bean
public SQSConnectionFactory connectionFactory() {
    SQSConnectionFactory.Builder factoryBuilder = new SQSConnectionFactory.Builder(
            Region.getRegion(Regions.US_EAST_1));
    factoryBuilder.setAwsCredentialsProvider(new AWSCredentialsProvider() {
        @Override
        public AWSCredentials getCredentials() {
            return awsCredentials();
        }

        @Override
        public void refresh() {
        }
    });
    return factoryBuilder.build();
}

/**
 * Registering QueueListener for queueName
 */
@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer() {
    DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
    messageListenerContainer.setConnectionFactory(connectionFactory());
    messageListenerContainer.setDestinationName(queueName);
    messageListenerContainer.setMessageListener(new MessageListenerAdapter(new LabQueueListener()));
    messageListenerContainer.setErrorHandler(new QueueListenerErrorHandler());
    messageListenerContainer.setTaskExecutor(Executors.newFixedThreadPool(3));
    return messageListenerContainer;
}
As you can see, I have configured my DefaultMessageListenerContainer with Executors.newFixedThreadPool(3).
This way I expect to have 3 tasks executing concurrently in my queue listener at any one time.
This is my listener logic:
public class QueueListener {
    public void handleMessage(String messageContent) {
        try {
            logger.info(String.format("message received: %s", messageContent));
            logger.info("wait 30 sec");
            Thread.sleep(1000 * 30);
            logger.info("done");
        } catch (Throwable th) {
            throw new QueueListenerException(messageContent, th);
        }
    }
}
Right now each handleMessage call blocks for 30 seconds (Thread.sleep(1000 * 30);) and only 1 handleMessage call executes at a time.
What am I doing wrong?
How can I get handleMessage to be invoked concurrently?
With the current configuration I expect 3 handleMessage calls to run simultaneously.
The task executor only supplies threads; the number of consumers is a separate setting on DefaultMessageListenerContainer, and it defaults to 1. You can enable concurrent execution in the DefaultMessageListenerContainer bean by adding messageListenerContainer.setConcurrency("3-10");
This means the container starts with 3 concurrent consumers and can scale up to 10.
Alternatively, you can set a fixed number of concurrent consumers with messageListenerContainer.setConcurrentConsumers(3);
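As a rough sketch of how the two options above could be applied, the helper below builds a container with a concurrency range. The class name ListenerContainers and the method concurrentContainer are hypothetical names chosen for illustration. The javax.jms import assumes an older Spring version (newer ones use jakarta.jms). The sketch leaves the container's default task executor in place; if you keep Executors.newFixedThreadPool(3) from the question, the pool would most likely cap the container at 3 consumers regardless of the upper limit, so the pool should probably be at least as large as the maximum.

```java
import javax.jms.ConnectionFactory;

import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;

// Hypothetical helper, not part of Spring or the AWS SDK.
public final class ListenerContainers {

    private ListenerContainers() {
    }

    public static DefaultMessageListenerContainer concurrentContainer(
            ConnectionFactory connectionFactory, String queueName, Object delegate) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName(queueName);
        // The adapter calls handleMessage(...) on the delegate by default.
        container.setMessageListener(new MessageListenerAdapter(delegate));
        // "3-10": start with 3 concurrent consumers, allow up to 10.
        container.setConcurrency("3-10");
        // Fixed-size alternative to the line above:
        // container.setConcurrentConsumers(3);
        return container;
    }
}
```

In the question's configuration this would be called from the @Bean method, for example as concurrentContainer(connectionFactory(), queueName, new LabQueueListener()), with the error handler set on the returned container as before.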
Refer: https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setConcurrency-java.lang.String-
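To illustrate why a pool of 3 threads alone does not produce 3 concurrent handleMessage calls, here is a small standalone simulation using only java.util.concurrent. It is not Spring code and does not reproduce the container's internals. The class ConsumerConcurrencyDemo and the method peakConcurrency are made up for this example, and an in-memory queue stands in for SQS. Each "consumer" is a polling loop submitted to the pool; the method reports the highest number of handlers that were running at the same time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerConcurrencyDemo {

    /**
     * Submits `consumers` polling loops to a fixed pool of `poolSize` threads
     * and returns the peak number of messages being handled at once.
     */
    public static int peakConcurrency(int consumers, int poolSize, int messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add("message-" + i);
        }

        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);

        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                // One consumer loop: take a message, "handle" it, repeat.
                while (queue.poll() != null) {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(200); // stands in for the 30 s of work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        active.decrementAndGet();
                    }
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("1 consumer, pool of 3: peak = " + peakConcurrency(1, 3, 6));
        System.out.println("3 consumers, pool of 3: peak = " + peakConcurrency(3, 3, 6));
    }
}
```

With a single consumer loop the peak stays at 1 even though two pool threads sit idle, which matches the behaviour described in the question. With three loops the peak should normally reach 3. Raising the consumer count on the container plays the role of submitting more loops here.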