Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to set a Message Handler programmatically in Spring Cloud AWS SQS?

maybe someone has an idea to my following problem:

I am currently on a project, where i want to use the AWS SQS with Spring Cloud integration. For the receiver part i want to provide a API, where a user can register a "message handler" on a queue, which is an interface and will contain the user's business logic, e.g.

MyAwsSqsReceiver receiver = new MyAwsSqsReceiver();
receiver.register("a-queue-name", new MessageHandler(){
  @Override
  public void handle(String message){
    //... business logic for the received message
  }
});

I found examples, e.g. https://codemason.me/2016/03/12/amazon-aws-sqs-with-spring-cloud/ and read the docu http://cloud.spring.io/spring-cloud-aws/spring-cloud-aws.html#_sqs_support

But the only thing i found there to "connect" a functionality for processing a incoming message is a annotation on a method, e.g. @SqsListener or @MessageMapping.

These annotations are fixed to a certain queue-name, though. So now i am at a loss, how to dynamically "connect" my provided "MessageHandler" (from my API) to the incoming message for the specified queuename.

In the Config the example there is a SimpleMessageListenerContainer, which gets a QueueMessageHandler set, but this QueueMessageHandler does not seem to be the right place to set my handler or to override its methods and provide my own subclass of QueueMessageHandler.

I already did something like this with the Spring Amqp integration and RabbitMq and thought, that it would be also similar here with AWS SQS.

Does anyone have an idea, how to accomplish this?

thx + bye, Ximon

EDIT:

I found, that Spring JMS could actually do that, e.g. www.javacodegeeks.com/2016/02/aws-sqs-spring-jms-integration.html. Does anybody know, what consequences using JMS protocol has here, good or bad?

like image 625
Ximon Avatar asked Sep 28 '16 18:09

Ximon


People also ask

How are messages Access in Amazon SQS?

Amazon SQS begins to poll servers to find messages in the queue. The progress bar on the right side of the Receive messages section displays the polling duration. The Messages section displays a list of the received messages. For each message, the list displays the message ID, sent date, size, and receive count.

How do I configure the maximum message size for SQS?

The maximum is 262,144 bytes (256 KiB). To send messages larger than 256 KB, you can use the Amazon SQS Extended Client Library for Java . This library allows you to send an Amazon SQS message that contains a reference to a message payload in Amazon S3. The maximum payload size is 2 GB.


1 Answers

I am facing the same issue.

I am trying to go in an unusual way where I set up an Aws client bean at build time and then instead of using sqslistener annotation to consume from the specific queue I use the scheduled annotation which I can programmatically pool (each 10 secs in my case) from which queue I want to consume.

I did the example that iterates over queues defined in properties and then consumes from each one.

Client Bean:

@Bean
@Primary
public AmazonSQSAsync awsSqsClient() {
    return AmazonSQSAsyncClientBuilder
            .standard()
            .withRegion(Regions.EU_WEST_1.getName())
            .build();
}

Consumer:

// injected in the constructor
private final AmazonSQSAsync awsSqsClient;

@Scheduled(fixedDelay = 10000)
public void pool() {
    properties.getSqsQueues()
            .forEach(queue -> {
                val receiveMessageRequest = new ReceiveMessageRequest(queue)
                        .withWaitTimeSeconds(10)
                        .withMaxNumberOfMessages(10);

                // reading the messages
                val result = awsSqsClient.receiveMessage(receiveMessageRequest);
                val sqsMessages = result.getMessages();
                log.info("Received Message on queue {}: message = {}", queue, sqsMessages.toString());

                // deleting the messages
                sqsMessages.forEach(message -> {
                    val deleteMessageRequest = new DeleteMessageRequest(queue, message.getReceiptHandle());
                    awsSqsClient.deleteMessage(deleteMessageRequest);
                });
            });
}

Just to clarify, in my case, I need multiple queues, one for each tenant, with the queue URL for each one passed in a property file. Of course, in your case, you could get the queue names from another source, maybe a ThreadLocal which has the queues you have created in runtime.

If you wish, you can also try the JMS approach where you create message consumers and add a listener to each one you wish (See the doc Aws Jms documentation).

like image 65
Guilherme Alencar Avatar answered Oct 04 '22 06:10

Guilherme Alencar