There are 2 sqs listener in my project. I want one of them to have the same setting and one of them different setting. The only value I want to change is maxNumberOfMessages.
What is the most practical way to do this ? ı want set different maxNumberOfMessages value for one of listener.
this is my config ;
@Bean
public AWSCredentialsProvider awsCredentialsProvider(@Value("${cloud.aws.profile}") String profile,
@Value("${cloud.aws.region.static}") String region,
@Value("${cloud.aws.roleArn}") String role,
@Value("${cloud.aws.user}") String user) {
...
return new AWSStaticCredentialsProvider(sessionCredentials);
}
@Bean
@Primary
@Qualifier("amazonSQSAsync")
public AmazonSQSAsync amazonSQSAsync(@Value("${cloud.aws.region.static}") String region, AWSCredentialsProvider awsCredentialsProvider) {
return AmazonSQSAsyncClientBuilder.standard()
.withCredentials(awsCredentialsProvider)
.withRegion(region)
.build();
}
@Bean
@Primary
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSqs);
factory.setMaxNumberOfMessages(1);
factory.setWaitTimeOut(10);
factory.setQueueMessageHandler(new SqsQueueMessageHandler());
return factory;
}
This is listener;
@SqsListener(value = "${messaging.queue.blabla.source}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(Message message, Acknowledgment acknowledgment, @Header("MessageId") String messageId) {
log.info("Message Received");
try {
....
acknowledgment.acknowledge().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage());
}
}
You can have multiple consumers, but you have to remember that each message can be processed only once. It means that you can have multiple instances of the same consumer, but you can't read the same message from one queue in two different components. Each of these components should use a separate SQS queue.
Q: Do Amazon SQS FIFO queues support multiple producers? Yes. One or more producers can send messages to a FIFO queue. Messages are stored in the order that they were successfully received by Amazon SQS.
The QueueMessagingTemplate contains many convenience methods to send a message. There are send methods that specify the destination using a QueueMessageChannel object and those that specify the destination using a string which is going to be resolved against the SQS API.
SQS is a regional service, that is highly available within a single region. There is no cross-region replication capability. You can definitely access the queue from different regions, just initialize the sqs client with the correct destination region.
Following hack worked for me (if each listener listens to different queue)
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
return new SimpleMessageListenerContainerFactory() {
@Override
public SimpleMessageListenerContainer createSimpleMessageListenerContainer() {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer() {
@Override
protected void startQueue(String queueName, QueueAttributes queueAttributes) {
// A place to configure queue based maxNumberOfMessages
try {
if (queueName.endsWith(".fifo")) {
FieldUtils.writeField(queueAttributes, "maxNumberOfMessages", 1, true);
}
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
super.startQueue(queueName, queueAttributes);
}
};
simpleMessageListenerContainer.setAmazonSqs(amazonSqs);
return simpleMessageListenerContainer;
}
};
}
ı found the solution and share on example repo on github. github link
if ı add @EnableAsync annotation on listener class and @Async annotation to handler method my problem is solving :)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With