I have an SQS queue that is constantly being populated by a data consumer and I am now trying to create the service that will pull this data from SQS using Python's boto.
The way I designed it is that I will have 10-20 threads all trying to read messages from the SQS queue and then doing what they have to do on the data (business logic), before going back to the queue to get the next batch of data once they're done. If there's no data they will just wait until some data is available.
I have two areas I'm not sure about with this design
Thanks
Depending on your application's needs, you might have to use short or long polling to receive messages. Amazon SQS doesn't automatically delete a message after retrieving it for you, in case you don't successfully receive the message (for example, if the consumers fail or you lose connectivity).
Long polling helps reduce the cost of using Amazon SQS by cutting down on empty responses (when there are no messages available for a ReceiveMessage request) and false empty responses (when messages are available but aren't included in a response).
To purge a queue, log in to the AWS Management Console and choose Amazon SQS. Then, select a queue, and choose “Purge Queue” from the Queue Actions menu. The queue will then be cleared of all messages. You can also purge queues using the AWS SDKs or command-line tools.
From SQS FAQs: In almost all cases, Amazon SQS long polling is preferable to short polling. Long-polling requests let your queue consumers receive messages as soon as they arrive in your queue while reducing the number of empty ReceiveMessageResponse instances returned.
The long-polling capability of the receive_message() method is the most efficient way to poll SQS. If that returns without any messages, I would recommend a short delay before retrying, especially if you have multiple readers. You may even want to use an incremental delay, so that each subsequent empty read waits a bit longer, just so you don't end up getting throttled by AWS.
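To make the long-poll-plus-backoff idea concrete, here is a minimal sketch rather than a definitive implementation. It assumes a boto3-style SQS client (one exposing receive_message and delete_message with keyword arguments). The names poll, backoff_delays, handle and max_empty_reads are made up for illustration, and the delay values are arbitrary.

```python
import time


def backoff_delays(base=1.0, factor=2.0, cap=30.0):
    """Yield a growing delay for each consecutive empty read, capped at `cap`."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor


def poll(sqs, queue_url, handle, max_empty_reads=None, sleep=time.sleep):
    """Long-poll the queue and call handle(message) for every message.

    `sqs` is assumed to be a boto3-style SQS client. The loop stops after
    `max_empty_reads` consecutive empty reads, or runs forever if it is None.
    """
    delays = backoff_delays()
    empty_reads = 0
    while max_empty_reads is None or empty_reads < max_empty_reads:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,  # largest batch SQS returns per request
            WaitTimeSeconds=20,      # long polling; 20 seconds is the SQS maximum
        )
        messages = response.get("Messages", [])
        if not messages:
            # Empty read: wait a bit longer each time before retrying.
            empty_reads += 1
            sleep(next(delays))
            continue
        # Got data: reset the backoff so the next empty read starts small again.
        empty_reads = 0
        delays = backoff_delays()
        for message in messages:
            handle(message)
            # Delete only after handle() returned without raising.
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message["ReceiptHandle"],
            )
```

With boto3 installed, each worker thread could run something like poll(boto3.client('sqs'), queue_url, my_handler), where queue_url and my_handler are your own values.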
And yes, you do have to delete the message after you have read it, or it will reappear in the queue. This can actually be very useful when a worker reads a message and then fails before it can fully process it. In that case, the message becomes visible again and is read by another worker. You also want to make sure the visibility timeout of the messages is long enough that the worker has time to process the message before it automatically reappears on the queue. If necessary, your workers can extend the timeout while they are processing, if it is taking longer than expected.
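As a rough illustration of deleting only on success and extending the timeout during long work, here is a sketch under the same assumption of a boto3-style client (change_message_visibility and delete_message). The function process_message and the idea of splitting the work into steps are hypothetical, chosen only to give the worker a natural point at which to extend the timeout.

```python
def process_message(sqs, queue_url, message, steps, visibility_timeout=60):
    """Run each step on the message body, keeping the message hidden meanwhile.

    `sqs` is assumed to be a boto3-style SQS client and `steps` a list of
    callables taking the message body. Returns True if the message was
    processed and deleted, False if a step failed.
    """
    receipt = message["ReceiptHandle"]
    try:
        for step in steps:
            # Restart the visibility window before each step so the message
            # does not reappear on the queue while we are still working on it.
            sqs.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=receipt,
                VisibilityTimeout=visibility_timeout,
            )
            step(message["Body"])
    except Exception:
        # No delete: once the timeout expires, SQS makes the message visible
        # again and another worker can pick it up.
        return False
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt)
    return True
```

Swallowing every exception is a simplification for the sketch; a real worker would probably log the failure before moving on.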
If you want a simple way to set up a listener that includes automatic deletion of messages when they're finished being processed, and automatic pushing of exceptions to a specified queue, you can use the pySqsListener package.
You can set up a listener like this:
from sqs_listener import SqsListener

class MyListener(SqsListener):
    def handle_message(self, body, attributes, messages_attributes):
        # run_my_function is a placeholder for your own processing code
        run_my_function(body['param1'], body['param2'])

listener = MyListener('my-message-queue', 'my-error-queue')
listener.listen()
There is a flag to switch from short polling to long polling - it's all documented in the README file.
Disclaimer: I am the author of said package.