 

Using many consumers in SQS Queue

I know that it is possible to consume an SQS queue using multiple threads. I would like to guarantee that each message will be consumed once. I know that it is possible to change the visibility timeout of a message, e.g., to make it equal to my processing time. If my process spends more time than the visibility timeout (e.g., due to a slow connection), another thread can consume the same message.

What is the best approach to guarantee that a message will be processed once?

p.magalhaes asked May 26 '16 22:05


People also ask

Can you have multiple consumers for an SQS queue?

By design, Amazon SQS FIFO queues don't serve messages from the same message group to more than one consumer at a time. However, if your FIFO queue has multiple message groups, you can take advantage of parallel consumers, allowing Amazon SQS to serve messages from different message groups to different consumers.

How many consumers can consume messages sent over an SQS FIFO queue?

A FIFO SQS queue has an important limit compared to standard SQS queues. The number of inflight messages is limited to 20,000. That means that if you've got consumers that are currently processing 20,000 messages, then the next receive message request you make won't return anything.

Is there any size limit for messages in SQS?

The maximum is 262,144 bytes (256 KiB). To send messages larger than 256 KB, you can use the Amazon SQS Extended Client Library for Java . This library allows you to send an Amazon SQS message that contains a reference to a message payload in Amazon S3.

How large can SQS messages be, and what happens when that limit is exceeded?

Each SQS queue is limited to 120,000 inflight messages, or 20,000 if it is a FIFO queue. When sending a message to a queue with too many inflight messages, SQS returns the "OverLimit" error message.


1 Answer

What is the best approach to guarantee that a message will be processed once?

You're asking for a guarantee - you won't get one. You can reduce the probability of a message being processed more than once to a very small amount, but you won't get a guarantee.

I'll explain why, along with strategies for reducing duplication.

Where does duplication come from?

  1. When you put a message in SQS, SQS might actually receive that message more than once
    • For example: a minor network hiccup while sending the message caused a transient error that was automatically retried - from the sender's perspective the send failed once and succeeded once, but SQS received both copies.
  2. SQS can internally generate duplicates
    • Similar to the first example - there are a lot of computers handling messages under the covers, and SQS needs to make sure nothing gets lost - messages are stored on multiple servers, and this can result in duplication.

For the most part, by taking advantage of the SQS message visibility timeout, the chances of duplication from these sources are already pretty small - like a fraction of a percent small.
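One common way to keep the visibility timeout working for you when processing can be slow is a "heartbeat" that periodically extends the timeout while the handler runs, and deletes the message only after success. The sketch below assumes a boto3-style SQS client (`change_message_visibility` and `delete_message` are the real boto3 call names); the queue URL, handler, and timing values are illustrative:

```python
import threading

def process_with_heartbeat(sqs, queue_url, message, handler,
                           timeout=30, heartbeat_every=10):
    """Keep extending the message's visibility timeout while `handler` runs,
    so a slow consumer doesn't let the message reappear mid-processing."""
    stop = threading.Event()

    def heartbeat():
        while not stop.wait(heartbeat_every):
            # Reset the clock; the message stays invisible to other consumers.
            sqs.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=message["ReceiptHandle"],
                VisibilityTimeout=timeout,
            )

    t = threading.Thread(target=heartbeat, daemon=True)
    t.start()
    try:
        handler(message)
        # Delete only after successful processing; if handler raises,
        # the message becomes visible again after the timeout and is retried.
        sqs.delete_message(QueueUrl=queue_url,
                           ReceiptHandle=message["ReceiptHandle"])
    finally:
        stop.set()
        t.join()
```

Note this narrows the duplication window but doesn't close it: a crash between a successful `handler` call and `delete_message` still causes a redelivery.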

If processing duplicates really isn't that bad (strive to make your message consumption idempotent!), I'd consider this good enough - reducing chances of duplication further is complicated and potentially expensive...


What can your application do to reduce duplication further?

Ok, here we go down the rabbit hole... at a high level, you will want to assign unique ids to your messages, and check against an atomic cache of ids that are in progress or completed before starting processing:

  1. Make sure your messages have unique identifiers provided at insertion time
    • Without this, you'll have no way of telling duplicates apart.
  2. Handle duplication at the 'end of the line' for messages.
    • If your message receiver needs to send messages off-box for further processing, then it can be another source of duplication (for similar reasons to above)
  3. You'll need somewhere to atomically store and check these unique ids (and flush them after some timeout). There are two important states: "InProgress" and "Completed"
    • InProgress entries should have a timeout based on how fast you need to recover in case of processing failure.
    • Completed entries should have a timeout based on how long you want your deduplication window
    • The simplest is probably a Guava cache, but would only be good for a single processing app. If you have a lot of messages or distributed consumption, consider a database for this job (with a background process to sweep for expired entries)
  4. Before processing the message, attempt to store the messageId in "InProgress". If it's already there, stop - you just handled a duplicate.
  5. Check if the message is "Completed" (and stop if it's there)
  6. Your thread now has an exclusive lock on that messageId - Process your message
  7. Mark the messageId as "Completed" - As long as this messageId stays here, you won't process any duplicates for that messageId.
    • You likely can't afford infinite storage though.
  8. Remove the messageId from "InProgress" (or just let it expire from here)

Some notes

  • Keep in mind that the chance of a duplicate without all of that is already pretty low. Depending on how much time and money deduplication of messages is worth to you, feel free to skip or modify any of the steps
    • For example, you could leave out "InProgress", but that opens up the small chance of two threads working on a duplicated message at the same time (the second one starting before the first has "Completed" it)
  • Your deduplication window is as long as you can keep messageIds in "Completed". Since you likely can't afford infinite storage, make this last at least 2x your SQS message visibility timeout; the chances of duplication after that are reduced (on top of the already very low chances, but still not guaranteed).
  • Even with all this, there is still a chance of duplication - all the precautions and SQS message visibility timeouts help reduce this chance to very small, but the chance is still there:
    • Your app can crash/hang/do a very long GC right after processing the message, but before the messageId is "Completed" (maybe you're using a database for this storage and the connection to it is down)
    • In this case, the "InProgress" entry will eventually expire, and another thread could process this message (either after the SQS visibility timeout also expires or because SQS had a duplicate in it).
Krease answered Oct 13 '22 21:10