Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

spring cloud aws multiple sqs listener

There are 2 sqs listener in my project. I want one of them to have the same setting and one of them different setting. The only value I want to change is maxNumberOfMessages.

What is the most practical way to do this ? ı want set different maxNumberOfMessages value for one of listener.

this is my config ;

@Bean
public AWSCredentialsProvider awsCredentialsProvider(@Value("${cloud.aws.profile}") String profile,
                                                     @Value("${cloud.aws.region.static}") String region,
                                                     @Value("${cloud.aws.roleArn}") String role,
                                                     @Value("${cloud.aws.user}") String user) {
    ...

    return new AWSStaticCredentialsProvider(sessionCredentials);
}

@Bean
@Primary
@Qualifier("amazonSQSAsync")
public AmazonSQSAsync amazonSQSAsync(@Value("${cloud.aws.region.static}") String region, AWSCredentialsProvider awsCredentialsProvider) {
    return AmazonSQSAsyncClientBuilder.standard()
            .withCredentials(awsCredentialsProvider)
            .withRegion(region)
            .build();
}

@Bean
@Primary
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSqs);
    factory.setMaxNumberOfMessages(1);
    factory.setWaitTimeOut(10);
    factory.setQueueMessageHandler(new SqsQueueMessageHandler());
    return factory;
}

This is listener;

@SqsListener(value = "${messaging.queue.blabla.source}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(Message message, Acknowledgment acknowledgment, @Header("MessageId") String messageId) {
    log.info("Message Received");

    try {
        ....
        acknowledgment.acknowledge().get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    } catch (Exception ex) {
        throw new RuntimeException(ex.getMessage());
    }
}
like image 466
Rhmn61 Avatar asked Mar 14 '19 13:03

Rhmn61


People also ask

Can AWS SQS have multiple consumers?

You can have multiple consumers, but you have to remember that each message can be processed only once. It means that you can have multiple instances of the same consumer, but you can't read the same message from one queue in two different components. Each of these components should use a separate SQS queue.

Can SQS have multiple producers?

Q: Do Amazon SQS FIFO queues support multiple producers? Yes. One or more producers can send messages to a FIFO queue. Messages are stored in the order that they were successfully received by Amazon SQS.

What is QueueMessagingTemplate?

The QueueMessagingTemplate contains many convenience methods to send a message. There are send methods that specify the destination using a QueueMessageChannel object and those that specify the destination using a string which is going to be resolved against the SQS API.

Is AWS SQS multi region?

SQS is a regional service, that is highly available within a single region. There is no cross-region replication capability. You can definitely access the queue from different regions, just initialize the sqs client with the correct destination region.


2 Answers

Following hack worked for me (if each listener listens to different queue)

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {

    return new SimpleMessageListenerContainerFactory() {
        @Override
        public SimpleMessageListenerContainer createSimpleMessageListenerContainer() {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer() {
                @Override
                protected void startQueue(String queueName, QueueAttributes queueAttributes) {
                    
                    // A place to configure queue based maxNumberOfMessages
                    
                    try {
                        if (queueName.endsWith(".fifo")) {
                            FieldUtils.writeField(queueAttributes, "maxNumberOfMessages", 1, true);
                        }
                    } catch (IllegalAccessException e) {
                        throw new RuntimeException(e);
                    }
                    super.startQueue(queueName, queueAttributes);
                }
            };
            simpleMessageListenerContainer.setAmazonSqs(amazonSqs);
            return simpleMessageListenerContainer;
        }
    };
}
like image 89
Sushant Avatar answered Nov 04 '22 00:11

Sushant


ı found the solution and share on example repo on github. github link

if ı add @EnableAsync annotation on listener class and @Async annotation to handler method my problem is solving :)

like image 27
Rhmn61 Avatar answered Nov 03 '22 23:11

Rhmn61