Note: this is a repost of a thread from here.
Hi all, I've got a process that consumes messages from a single SQS queue. The queue can have many messages in it and each message results in a database hit, so I wanted to thread the readers of this queue.
The basic code for each thread is:
public void run() {
    while (true) {
        ReceiveMessageRequest rmr = new ReceiveMessageRequest(queueUrl)
                .withMaxNumberOfMessages(10)
                .withWaitTimeSeconds(3);
        List<Message> messages = sqsClient.receiveMessage(rmr).getMessages();
        // process messages
        // delete messages
    }
}
What I'm seeing is that there are tons of duplicated messages between the threads. I know that I should expect a few duplicates here and there but it appears that each thread gets the same set of messages and, realistically, only one thread ever does much work.
Am I misunderstanding how to use the API, or am I doing something else wrong? The Javadocs indicate that the AmazonSQS client is thread-safe and, indeed, even creating a separate AmazonSQS client for each thread changed nothing.
Any pointers would be most appreciated. My current idea for a fix is to have a single thread reading from the SQS queue, putting each message into something like a LinkedBlockingDeque, and having the workers read from that. But I suspect that implementation will not drain the queue as fast as I'd like.
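For what it's worth, the single-reader / worker-pool design described above can be sketched with plain java.util.concurrent. Everything here is a stand-in: the Msg record substitutes for the SQS Message class, and the reader loop fakes what would be a sqsClient.receiveMessage(...) call in the real system.

```java
import java.util.concurrent.*;

public class QueueFanOut {
    // Hypothetical stand-in for com.amazonaws.services.sqs.model.Message
    record Msg(String id) {}

    // One reader feeds a bounded buffer; workerCount threads drain it.
    // Returns how many messages the workers handled.
    static int drain(int messageCount, int workerCount) throws Exception {
        BlockingQueue<Msg> buffer = new LinkedBlockingQueue<>(100); // bounded: back-pressure on the reader
        ConcurrentLinkedQueue<String> processed = new ConcurrentLinkedQueue<>();

        // Reader thread: in the real system this loop would call sqsClient.receiveMessage(...)
        Thread reader = new Thread(() -> {
            for (int i = 0; i < messageCount; i++) {
                try {
                    buffer.put(new Msg("id-" + i)); // blocks if the workers fall behind
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        reader.start();

        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        for (int w = 0; w < workerCount; w++) {
            workers.submit(() -> {
                while (true) {
                    Msg m = buffer.poll(500, TimeUnit.MILLISECONDS);
                    if (m == null) return null; // buffer idle: stop (demo only)
                    processed.add(m.id());      // real code: database hit, then DeleteMessage
                }
            });
        }

        reader.join();
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
        return processed.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drain(20, 4)); // each message handled exactly once
    }
}
```

Because each worker takes a distinct item from the buffer, no two workers ever see the same message, which is exactly the property the threaded receiveMessage loop above lacks when the visibility timeout is too short.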
A single Amazon SQS message queue can contain an unlimited number of messages. However, there is a quota of 120,000 for the number of inflight messages for a standard queue and 20,000 for a FIFO queue.
Amazon SQS is engineered to provide “at least once” delivery of all messages in its queues. Although most of the time each message will be delivered to your application exactly once, you should design your system so that processing a message more than once does not create any errors or inconsistencies.
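One common way to make processing tolerate at-least-once delivery is to track already-seen message IDs, since SQS keeps the same message ID across redeliveries. This is only a sketch: the in-memory set shown here is an assumption for illustration, and a production system would use a persistent store with a TTL instead, since an in-memory set is lost on restart and grows without bound.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DedupDemo {
    // Seen-message IDs; in production this would be a persistent store with a TTL.
    private static final Set<String> seen = ConcurrentHashMap.newKeySet();

    static boolean processOnce(String messageId) {
        // add() returns false if the ID was already present: a redelivered duplicate
        if (!seen.add(messageId)) {
            return false; // skip the duplicate
        }
        // ... do the database write here ...
        return true;
    }

    public static void main(String[] args) {
        System.out.println(processOnce("msg-1")); // true
        System.out.println(processOnce("msg-1")); // false: at-least-once redelivery
    }
}
```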
Since each message triggers a database hit, processing a batch likely takes longer than the queue's visibility timeout. The messages then become visible again before they are deleted, and the other threads receive them as duplicates. You should increase the visibility timeout of the queue so that it comfortably exceeds your worst-case processing time.
From AWS SQS documentation:
Immediately after the message is received, it remains in the queue. To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, a period of time during which Amazon SQS prevents other consuming components from receiving and processing the message.
(http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)
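The timeout can be raised either per receive call or as the queue's default. A sketch using the AWS SDK for Java v1 classes already shown in the question (the value 120 is an illustrative assumption, not a recommendation):

```java
// Raise the timeout on each receive call...
ReceiveMessageRequest rmr = new ReceiveMessageRequest(queueUrl)
        .withMaxNumberOfMessages(10)
        .withWaitTimeSeconds(3)
        .withVisibilityTimeout(120); // seconds; should exceed worst-case processing time

// ...or raise the queue's default for all consumers:
sqsClient.setQueueAttributes(new SetQueueAttributesRequest()
        .withQueueUrl(queueUrl)
        .addAttributesEntry("VisibilityTimeout", "120"));
```

Remember to delete each message promptly after processing it; otherwise it will reappear once the (longer) timeout expires and be processed again.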