Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Processing SQS queues with boto

I have a python script using the boto library on ec2 instance which is part of an autoscaling group. The script processes messages from a SQS queue:

import boto
from boto.sqs.message import Message

conn = boto.connect_sqs()
q = conn.create_queue('queue-name')

while (qin.count() > 0):
    m = q.get_messages()
    #do something with the message

Does using the while statement make sense? Does count() update in real time as:

  1. other instances take messages off the queue (or am I going to double up)
  2. new messages are added to the queue (or will I miss them?)

How do I make this script constantly listen for new additions to the queue, even while the queue is empty?

In this question Processing items in SQS queue with a php script it was mentioned that 'sqs ruby client library has a method "poll" which continuously polls the queue and on receiving a message in the queue passes it on to a block'. Is there an equivalent in Python?

It has also been suggested that the SNS could be used to notify the scripts of the message queue status but I do not see how you could configure a responsive system with SNS as the metric alarms are not fine grained enough.

like image 558
waigani Avatar asked Jun 21 '12 04:06

waigani


People also ask

How do I read messages from SQS queue in Python?

To receive a message from an AWS SQS Python Queue, you can use receive_message() Boto3 method. The receive_message() SQS Python method retrieves one or more messages (up to 10), from your specified SQS Queue.

Can you query an SQS queue?

You can query and explore your data using the QueueSample event type, with a provider value of SqsQueue . For more on how to use your data, see Understand integration data.


3 Answers

You shouldn't rely on the count for a queue as it's only meant to provide an approximate count and is not guaranteed to be accurate.

If you want to just keep polling forever, just do this:

while 1:
    messages = q.get_messages()
    # do something with messages
    time.sleep(N)

I have added the call to time.sleep to introduce a delay in the loop. The value of N should be at least one second and could be considerably more, depending on how quickly you expect new messages to appear in your queue. If you don't put some sort of delay in the loop you will probably start getting throttled by the service.

To avoid a message getting read multiple times, you should try to adjust the visibility timeout of the queue to be a value greater than the time it takes you to process a message and then make sure you delete the message when processing has completed.

like image 197
garnaat Avatar answered Oct 14 '22 12:10

garnaat


Example:

# wait_time_seconds count only 1 request in x seconds (0 - 20)
# num_messages get x messages in same request (1 - 10)
while 1:
    logger.info("... waiting messages ...")
    messages = queue_in.get_messages(wait_time_seconds=20, num_messages=10)
    for message in messages:
        logger.info('message: %s' % (message,))
        queue_in.delete_message(message)
like image 30
raulfortes Avatar answered Oct 14 '22 11:10

raulfortes


  1. When you pull a message from SQS, the message becomes invisible and unreachable by other queue queries (edit - invisibility can be set between 0 and 12 hrs).
  2. You will have to get the queue again each time new messages are added, but this should not be a problem - that's why the queuing service exists in the first place.

If you want to constantly poll the queue, try what's called Long Polling - you can have a continuous poll for up to 20 seconds that returns when the queue is populated.

Hope that's helpful, otherwise poke around the boto sqs documentation.

like image 20
tyleha Avatar answered Oct 14 '22 12:10

tyleha