 

AWS SQS Java. Not all messages are retrieved from the SQS queue

I have tried several approaches to retrieve all messages from an SQS queue using the AWS SDK for Java, to no avail. I have read about the distributed nature of AWS SQS and that messages are stored on different servers. What I do not understand is why this architecture is not hidden from the end user. What do I have to do in Java code to retrieve all messages and be 100% sure that none was missed?

I tried this with "Long Polling":

    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }
    System.out.println();

And this with Request Batching / Client-Side Buffering:

    // Create the basic Amazon SQS async client
    AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();

    // Create the buffered client
    AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

    CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");

    CreateQueueResult res = bufferedSqs.createQueue(createRequest);

    SendMessageRequest request = new SendMessageRequest();
    String body = "test message_" + System.currentTimeMillis();
    request.setMessageBody( body );
    request.setQueueUrl(res.getQueueUrl());

    SendMessageResult sendResult = bufferedSqs.sendMessage(request);

    ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
        .withMaxNumberOfMessages(10)
        .withQueueUrl(queueUrl);
    ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

    List<Message> messages = rx.getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }

But I am still unable to retrieve all messages.

Any ideas?

The AWS forum has been silent on my post.

lk7777 asked Jun 24 '15 12:06




4 Answers

When receiving messages from an SQS queue, you need to repeatedly call sqs:ReceiveMessage.

On each call to sqs:ReceiveMessage, you will get 0 or more messages from the queue which you'll need to iterate through. For each message, you'll also need to call sqs:DeleteMessage to remove the message from the queue when you're done processing each message.

Add a loop around your "Long Polling" sample above to receive all messages.

for (;;) {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }
    System.out.println();
}

Also note that you may receive the same message more than once. So make your processing safe to repeat for the same message, or detect repeated messages.
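One way to detect a repeated message is to remember the IDs already handled. The following is only a minimal sketch under stated assumptions: `SeenMessageFilter` is a hypothetical helper (not part of the AWS SDK), the IDs are plain strings standing in for what `message.getMessageId()` returns, and the set lives in memory, so it only helps within a single process.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the AWS SDK: remembers which message IDs
// have already been processed so that a redelivered message can be skipped.
final class SeenMessageFilter {
    private final Set<String> seenIds = new HashSet<>();

    // Returns true the first time an ID is offered, false on every repeat.
    boolean firstTime(String messageId) {
        return seenIds.add(messageId);
    }

    public static void main(String[] args) {
        SeenMessageFilter filter = new SeenMessageFilter();
        // Simulated deliveries: "id-1" arrives twice.
        String[] deliveredIds = {"id-1", "id-2", "id-1"};
        List<String> processed = new ArrayList<>();
        for (String id : deliveredIds) {
            if (filter.firstTime(id)) {
                processed.add(id); // the real work would happen here
            }
        }
        System.out.println(processed);
    }
}
```

With several consumers, or across restarts, the set of seen IDs would have to live in shared storage instead; making the processing itself safe to repeat avoids that bookkeeping.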

Matt Houser answered Sep 21 '22 19:09


I was facing the same issue: only one message was being returned. I then tried receiveMessageRequest.setMaxNumberOfMessages(10), which lets me retrieve up to 10 messages per call in a loop.

Since my queue has more than 500 records, this is what I did:

    // Assumes credentials, queuename (the queue URL), number_of_message_ (1-10)
    // and wait_time_second_ (0-20) are defined elsewhere in the class.
    List<String> messagelist = new ArrayList<>();

    try
    {
        AmazonSQS sqs = new AmazonSQSClient(credentials);
        Region usWest2 = Region.getRegion(Regions.US_WEST_2);
        sqs.setRegion(usWest2);
        boolean flag = true;

        while(flag)
        {
            // Ask for up to number_of_message_ messages, long polling for wait_time_second_ seconds
            ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queuename)
                    .withMaxNumberOfMessages(number_of_message_)
                    .withWaitTimeSeconds(wait_time_second_);
            List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();

            for (Message message : messages)
            {
                messagelist.add(message.getBody());

                // Delete the message so it is not delivered again after the visibility timeout
                String messageReceiptHandle = message.getReceiptHandle();
                sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queuename).withReceiptHandle(messageReceiptHandle));
            }
            // An empty response ends the loop
            if (messages.isEmpty())
            {
                flag = false;
            }
        }
    }
    catch (AmazonServiceException ase) {
        ase.printStackTrace();
    } catch (AmazonClientException ace) {
        ace.printStackTrace();
    }

    // Return outside of finally so that unexpected exceptions are not silently swallowed
    return messagelist;

I read records from SQS, save them into a String list, and then delete each record from the queue.

In the end I have all the data from the queue in a list.

Shally Dhar answered Sep 23 '22 19:09


An SQS queue is not a database. You can't read all the messages into a list like you are trying to do. There is no beginning and no end to the queue. You poll the queue and ask for some messages, and it returns some messages if any exist.

If you want a method that can return the entire dataset, then SQS is not the right tool; a traditional database might be better in that case.

E.J. Brennan answered Sep 23 '22 19:09


Long polling waits if there is no message in the queue. This means that if you call ReceiveMessage with long polling in a loop, you will get all messages that are currently visible. When a response contains 0 messages, you have already received all of them.
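The loop described here can be sketched without the SDK. This is only an illustration: `Receiver` is a hypothetical stand-in for one long-polling ReceiveMessage call, and `fakeQueue` is an in-memory substitute used to exercise the loop, not a model of SQS itself.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class DrainLoopSketch {
    // Hypothetical stand-in for one long-polling ReceiveMessage call:
    // returns at most maxMessages bodies, or an empty list when none are available.
    interface Receiver {
        List<String> receive(int maxMessages);
    }

    // Keeps calling receive until a call comes back empty, collecting everything seen.
    static List<String> drain(Receiver receiver, int maxMessages) {
        List<String> all = new ArrayList<>();
        while (true) {
            List<String> batch = receiver.receive(maxMessages);
            if (batch.isEmpty()) {
                return all;
            }
            all.addAll(batch);
        }
    }

    // In-memory fake queue, used only to exercise the loop.
    static Receiver fakeQueue(int messageCount) {
        Deque<String> pending = new ArrayDeque<>();
        for (int i = 0; i < messageCount; i++) {
            pending.add("message-" + i);
        }
        return maxMessages -> {
            List<String> batch = new ArrayList<>();
            while (batch.size() < maxMessages && !pending.isEmpty()) {
                batch.add(pending.poll());
            }
            return batch;
        };
    }

    public static void main(String[] args) {
        // 25 messages are fetched in batches of at most 10.
        List<String> drained = drain(fakeQueue(25), 10);
        System.out.println(drained.size());
    }
}
```

Against a real queue, each received message would also be deleted after processing, as the other answers point out; the sketch leaves that out to keep the stop condition in focus.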

You mentioned that you also used the web console. The web console works the same way as calling the API with the SDK. This means that when you receive and view messages in the console, those messages are invisible to other clients until the visibility timeout expires. That is probably the reason why you don't see the messages.

See more information about visibility timeout:

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html
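To make the effect of the visibility timeout concrete, here is a toy in-memory model. It is a sketch for illustration only, not how SQS is implemented: `VisibilityTimeoutModel` and its methods are hypothetical names, time is passed in explicitly as milliseconds, and messages are keyed by body for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of visibility timeout; not how SQS is implemented.
final class VisibilityTimeoutModel {
    private final long timeoutMillis;
    // Message body -> time (ms) at which the message becomes visible again.
    private final Map<String, Long> visibleAt = new LinkedHashMap<>();

    VisibilityTimeoutModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // A newly sent message is visible immediately.
    void send(String body) {
        visibleAt.put(body, 0L);
    }

    // Returns every currently visible message and hides each one until now + timeout.
    List<String> receive(long nowMillis) {
        List<String> received = new ArrayList<>();
        for (Map.Entry<String, Long> entry : visibleAt.entrySet()) {
            if (entry.getValue() <= nowMillis) {
                received.add(entry.getKey());
                entry.setValue(nowMillis + timeoutMillis);
            }
        }
        return received;
    }

    // Deleting removes the message for good.
    void delete(String body) {
        visibleAt.remove(body);
    }

    public static void main(String[] args) {
        VisibilityTimeoutModel queue = new VisibilityTimeoutModel(30_000);
        queue.send("hello");
        System.out.println(queue.receive(1_000));  // first receiver (e.g. the console) gets the message
        System.out.println(queue.receive(2_000));  // second receiver gets nothing: message is in flight
        System.out.println(queue.receive(31_000)); // timeout passed without a delete: visible again
    }
}
```

In this model, a message viewed by one receiver and never deleted reappears after the timeout, which matches the behaviour described for the console.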

Lubo Sach answered Sep 19 '22 19:09