Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to get all messages in Amazon SQS queue using boto library in Python?

I'm working on an application whose workflow is managed by passing messages in SQS, using boto.

My SQS queue is growing gradually, and I have no way to check how many elements it is supposed to contain.

Now I have a daemon that periodically polls the queue, and checks if i have a fixed-size set of elements. For example, consider the following "queue":

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]

Now I want to check if I have "msg1_comp1", "msg2_comp1" and "msg3_comp1" in the queue together at some point in time, but I don't know the size of the queue.

After looking through the API, it seems you can either get only 1 element, or a fixed number of elements in the queue, but not all:

>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10

A suggestion proposed in the answers would be to get for example 10 messages in a loop until I get nothing back, but messages in SQS have a visibility timeout, meaning that if I poll elements from the queue, they won't be really removed, they will only be invisible for a short period of time.

Is there a simple way to get all messages in the queue, without knowing how many there are?

like image 936
Charles Menguy Avatar asked Apr 16 '12 20:04

Charles Menguy


People also ask

How do I read messages from SQS queue in Python?

To receive a message from an AWS SQS Python Queue, you can use receive_message() Boto3 method. The receive_message() SQS Python method retrieves one or more messages (up to 10), from your specified SQS Queue.

How do I retrieve messages from SQS?

To receive and delete a message (console)Open the Amazon SQS console at https://console.aws.amazon.com/sqs/ . In the navigation pane, choose Queues. On the Queues page, choose a queue. Choose Send and receive messages.

How many messages can be read from SQS at once?

A single Amazon SQS message queue can contain an unlimited number of messages. However, there is a quota of 120,000 for the number of inflight messages for a standard queue and 20,000 for a FIFO queue.

Can you query an SQS queue?

It is not possible to 'search' messages in an Amazon SQS queue. A ReceiveMessage() call will retrieve one or more messages, but there is no ability to control which messages will be returned.


2 Answers

Put your call to q.get_messages(n) inside while loop:

all_messages=[]
rs=q.get_messages(10)
while len(rs)>0:
    all_messages.extend(rs)
    rs=q.get_messages(10)

Additionally, dump won't support more than 10 messages either:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
    """Utility function to dump the messages in a queue to a file
    NOTE: Page size must be < 10 else SQS errors"""
like image 117
AJ. Avatar answered Sep 22 '22 01:09

AJ.


I've been working with AWS SQS queues to provide instant notifications, so I need to be processing all of the messages in real time. The following code will help you to efficiently dequeue (all) messages and handle any errors when removing.

Note: to remove messages off the queue you need to delete them. I'm using the updated boto3 AWS python SDK, json library, and the following default values:

import boto3
import json

region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
    messages_to_delete = []
    for message in queue.receive_messages(
            MaxNumberOfMessages=max_queue_messages):
        # process message body
        body = json.loads(message.body)
        message_bodies.append(body)
        # add message to delete
        messages_to_delete.append({
            'Id': message.message_id,
            'ReceiptHandle': message.receipt_handle
        })

    # if you don't receive any notifications the
    # messages_to_delete list will be empty
    if len(messages_to_delete) == 0:
        break
    # delete messages to remove them from SQS queue
    # handle any errors
    else:
        delete_response = queue.delete_messages(
                Entries=messages_to_delete)
like image 45
Timothy Liu Avatar answered Sep 19 '22 01:09

Timothy Liu