
Concurrent SQS queue listeners

I don't understand how SQS QueueListener works.

This is my config:

    /**
     * AWS Credentials Bean
     */
    @Bean
    public AWSCredentials awsCredentials() {
        return new BasicAWSCredentials(accessKey, secretAccessKey);
    }

    /**
     * AWS Client Bean
     */
    @Bean
    public AmazonSQS amazonSQSAsyncClient() {
        AmazonSQS sqsClient = new AmazonSQSClient(awsCredentials());
        sqsClient.setRegion(Region.getRegion(Regions.US_EAST_1));
        return sqsClient;
    }

    /**
     * AWS Connection Factory
     */
    @Bean
    public SQSConnectionFactory connectionFactory() {
        SQSConnectionFactory.Builder factoryBuilder = new SQSConnectionFactory.Builder(
                Region.getRegion(Regions.US_EAST_1));
        factoryBuilder.setAwsCredentialsProvider(new AWSCredentialsProvider() {

            @Override
            public AWSCredentials getCredentials() {
                return awsCredentials();
            }

            @Override
            public void refresh() {
            }

        });
        return factoryBuilder.build();
    }

    /**
     * Registering QueueListener for queueName
     */
    @Bean
    public DefaultMessageListenerContainer defaultMessageListenerContainer() {
        DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
        messageListenerContainer.setConnectionFactory(connectionFactory());
        messageListenerContainer.setDestinationName(queueName);
        messageListenerContainer.setMessageListener(new MessageListenerAdapter(new LabQueueListener()));
        messageListenerContainer.setErrorHandler(new QueueListenerErrorHandler());
        messageListenerContainer.setTaskExecutor(Executors.newFixedThreadPool(3));

        return messageListenerContainer;
    }

As you can see, I have configured my DefaultMessageListenerContainer with Executors.newFixedThreadPool(3).

With this I expect three messages to be handled concurrently in my queue listener.

This is my listener logic:

    public class QueueListener {

        public void handleMessage(String messageContent) {
            try {
                logger.info(String.format("message received: %s", messageContent));
                logger.info("wait 30 sec");
                Thread.sleep(1000 * 30);
                logger.info("done");
            } catch (Throwable th) {
                throw new QueueListenerException(messageContent, th);
            }
        }
    }

Right now each handleMessage call blocks for 30 seconds (Thread.sleep(1000 * 30)), and only one handleMessage call runs at a time.

What am I doing wrong? How can I get handleMessage invoked concurrently? With the current configuration I expect 3 handleMessage calls to run simultaneously.

asked Nov 09 '22 06:11 by alexanoid


1 Answer

The task executor only supplies threads; the number of concurrent consumers is a separate setting on DefaultMessageListenerContainer, and it defaults to 1, which is why only one handleMessage runs at a time. You can change this in the container bean by adding messageListenerContainer.setConcurrency("3-10"); the container then starts with 3 concurrent consumers and scales up to 10 under load.
Alternatively, set a fixed number of consumers with messageListenerContainer.setConcurrentConsumers(3);
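
As a sketch of how this might look in the question's configuration, the container bean could be written as below. It assumes the `connectionFactory()`, `queueName`, `LabQueueListener` and `QueueListenerErrorHandler` from the question are available in the same configuration class. The pool size of 10 is an assumption, chosen so that the executor has a thread for every consumer the container may start.

```java
import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;

// Fragment: belongs inside the question's existing configuration class.
@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());   // bean from the question
    container.setDestinationName(queueName);               // field from the question
    container.setMessageListener(new MessageListenerAdapter(new LabQueueListener()));
    container.setErrorHandler(new QueueListenerErrorHandler());

    // Start with 3 consumers and allow scaling up to 10.
    // For a fixed number, use container.setConcurrentConsumers(3) instead.
    container.setConcurrency("3-10");

    // Assumption: size the pool to the upper concurrency limit.
    container.setTaskExecutor(Executors.newFixedThreadPool(10));

    return container;
}
```

Each consumer occupies an executor thread while it is receiving, so a pool smaller than the upper concurrency limit would likely cap the number of active consumers at the pool size. Leaving out setTaskExecutor altogether lets the container fall back to its default SimpleAsyncTaskExecutor.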

Refer: https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setConcurrency-java.lang.String-
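
To illustrate why a 3-thread pool on its own does not give 3 concurrent handlers, here is a plain-JDK model of the situation. It is not Spring code: the class `ConsumerPoolSketch`, its method names, and the 100 ms handling time are all hypothetical stand-ins. The pool plays the role of the task executor, and each submitted loop plays the role of one consumer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model: the pool supplies threads, consumer loops use them. */
public class ConsumerPoolSketch {

    /**
     * Runs `consumers` receive loops on a pool of `poolSize` threads until
     * `messages` messages are handled, and returns the highest number of
     * handlers that were observed running at the same time.
     */
    public static int peakConcurrency(int poolSize, int consumers, int messages)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger remaining = new AtomicInteger(messages); // stand-in for the queue
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                // One consumer: take messages one by one until none are left.
                while (remaining.getAndDecrement() > 0) {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(100); // stands in for the slow handleMessage
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        active.decrementAndGet();
                    }
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Same 3-thread pool in both runs; only the consumer count differs.
        System.out.println("1 consumer : peak = " + peakConcurrency(3, 1, 6));
        System.out.println("3 consumers: peak = " + peakConcurrency(3, 3, 6));
    }
}
```

With a single consumer loop the peak stays at 1 no matter how many threads the pool has, which mirrors the behaviour described in the question. Raising the consumer count is what lets the extra pool threads do any work.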

answered Nov 14 '22 22:11 by DPancs