I'm working on an application whose workflow is managed by passing messages in SQS, using boto.
My SQS queue is growing gradually, and I have no way to check how many elements it is supposed to contain.
Now I have a daemon that periodically polls the queue and checks whether I have a fixed-size set of elements. For example, consider the following "queue":
q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]
Now I want to check if I have "msg1_comp1", "msg2_comp1" and "msg3_comp1" in the queue together at some point in time, but I don't know the size of the queue.
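Once the message bodies have been collected into a Python list, the check itself can be a set comparison. This is a minimal sketch, assuming the bodies are plain strings as in the example above; `has_all` is a hypothetical helper name and not part of boto:

```python
def has_all(bodies, required):
    """Return True if every required body appears somewhere in bodies."""
    return set(required) <= set(bodies)


# Example data copied from the "queue" shown above.
q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]
wanted = ["msg1_comp1", "msg2_comp1", "msg3_comp1"]

print(has_all(q, wanted))  # True
```

The hard part remains getting every body out of SQS in the first place, which is what the rest of the question is about.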
After looking through the API, it seems you can either get only 1 element, or a fixed number of elements in the queue, but not all:
>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10
One suggestion from the answers is to fetch, say, 10 messages in a loop until nothing comes back. But messages in SQS have a visibility timeout: polling elements from the queue does not actually remove them, it only makes them invisible for a short period of time.
Is there a simple way to get all messages in the queue, without knowing how many there are?
To receive messages from an SQS queue in Python, you can use the Boto3 receive_message() method. It retrieves one or more messages (up to 10) from the specified SQS queue.
To receive and delete a message (console): Open the Amazon SQS console at https://console.aws.amazon.com/sqs/. In the navigation pane, choose Queues. On the Queues page, choose a queue. Choose Send and receive messages.
A single Amazon SQS message queue can contain an unlimited number of messages. However, there is a quota on the number of in-flight messages (messages received by a consumer but not yet deleted): 120,000 for a standard queue and 20,000 for a FIFO queue.
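If the goal is only to know roughly how many messages are waiting or in flight, the queue attributes may be enough. The sketch below assumes a boto3 SQS client (for example `boto3.client('sqs')`) and a queue URL you already have; `approximate_counts` is a hypothetical helper name. The attributes are approximate by design, so treat the numbers as estimates rather than exact counts:

```python
def approximate_counts(client, queue_url):
    """Return (visible, in_flight) message counts as reported by SQS.

    `client` is assumed to be a boto3 SQS client; both values are estimates.
    """
    response = client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",            # waiting to be received
            "ApproximateNumberOfMessagesNotVisible",  # received, not yet deleted
        ],
    )
    attrs = response["Attributes"]  # SQS returns the values as strings
    return (
        int(attrs["ApproximateNumberOfMessages"]),
        int(attrs["ApproximateNumberOfMessagesNotVisible"]),
    )
```

The second number is the one that counts against the in-flight quota mentioned above.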
It is not possible to 'search' messages in an Amazon SQS queue. A ReceiveMessage() call will retrieve one or more messages, but there is no ability to control which messages will be returned.
Put your call to q.get_messages(n) inside a while loop:
all_messages = []
rs = q.get_messages(10)
while len(rs) > 0:
    all_messages.extend(rs)
    rs = q.get_messages(10)
Additionally, dump won't support more than 10 messages either:
def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
    """Utility function to dump the messages in a queue to a file
    NOTE: Page size must be < 10 else SQS errors"""
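The same loop can be sketched against boto3. This is only an illustration under stated assumptions: `client` is taken to be a boto3 SQS client, `queue_url` is a URL you already have, and `drain_queue` is a hypothetical helper name. Two caveats apply. With short polling, an empty response does not prove the queue is empty, so the sketch uses long polling via WaitTimeSeconds. And the visibility timeout needs to be longer than the whole drain takes, otherwise messages received early can become visible again and be collected twice:

```python
def drain_queue(client, queue_url, visibility_timeout=60, wait_seconds=5):
    """Collect messages until a long-poll receive comes back empty.

    Messages are only hidden, not deleted; they reappear in the queue
    once `visibility_timeout` seconds have passed.
    """
    collected = []
    while True:
        response = client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,                # SQS maximum per call
            VisibilityTimeout=visibility_timeout,  # hide received messages
            WaitTimeSeconds=wait_seconds,          # long polling
        )
        # The "Messages" key is absent when nothing was received.
        batch = response.get("Messages", [])
        if not batch:
            break
        collected.extend(batch)
    return collected
```

Each returned item is a dict with keys such as "MessageId", "Body" and "ReceiptHandle", so the bodies can be inspected without deleting anything.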
I've been working with AWS SQS queues to provide instant notifications, so I need to be processing all of the messages in real time. The following code will help you to efficiently dequeue (all) messages and handle any errors when removing.
Note: receiving a message does not remove it from the queue; you need to delete it explicitly. I'm using the boto3 AWS SDK for Python, the json library, and the following default values:
import boto3
import json
region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
                     aws_access_key_id=aws_access_key_id,
                     aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
    messages_to_delete = []
    for message in queue.receive_messages(
            MaxNumberOfMessages=max_queue_messages):
        # process message body
        body = json.loads(message.body)
        message_bodies.append(body)
        # add message to delete
        messages_to_delete.append({
            'Id': message.message_id,
            'ReceiptHandle': message.receipt_handle
        })
    # if you don't receive any notifications the
    # messages_to_delete list will be empty
    if len(messages_to_delete) == 0:
        break
    # delete messages to remove them from SQS queue
    # handle any errors
    else:
        delete_response = queue.delete_messages(
            Entries=messages_to_delete)
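The code above stores delete_response but never inspects it. A batch delete can partially fail, and the response lists the failed entries under the 'Failed' key. Here is a minimal sketch of checking that; `failed_deletes` is a hypothetical helper name and the sample response is made up for illustration:

```python
def failed_deletes(delete_response):
    """Return (id, code, message) for each entry SQS could not delete."""
    return [
        (entry["Id"], entry.get("Code"), entry.get("Message"))
        for entry in delete_response.get("Failed", [])
    ]


# Illustrative response only; real values come from queue.delete_messages().
sample_response = {
    "Successful": [{"Id": "a"}],
    "Failed": [
        {
            "Id": "b",
            "SenderFault": True,
            "Code": "ReceiptHandleIsInvalid",
            "Message": "example failure text",
        }
    ],
}

for entry_id, code, text in failed_deletes(sample_response):
    print("could not delete %s: %s (%s)" % (entry_id, code, text))
```

Messages that fail to delete become visible again after their visibility timeout, so a later pass of the loop may receive them a second time.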