
Thread polling SQS and adding messages to a Python queue for processing dies

I have a piece of multithreaded code: 3 threads poll data from SQS and add it to a Python queue, and 5 threads take the messages from the Python queue, process them, and send them to a back-end system.

Here is the code:

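import Queue
import threading
import time

from urllib3 import PoolManager

# sqs_queue (a boto SQS queue) and backend_endpoint are assumed to be defined elsewhere.
python_queue = Queue.Queue()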

class GetDataFromSQS(threading.Thread):
    """Polls SQS and puts each message on the shared Python queue."""
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue

    def run(self):
        while True:
            time.sleep(0.5)  # sleep for half a second before querying again
            try:
                msgs = sqs_queue.get_messages(10)
                if not msgs:
                    print "sqs is empty now"
                    continue
                for msg in msgs:
                    #place each message block from sqs into python queue for processing
                    self.python_queue.put(msg)
                    print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize()
                    #delete from sqs
                    sqs_queue.delete_message(msg)
            except Exception as e:
                # str(e) is required: "..." + e raises TypeError and would kill the thread
                print "Exception in GetDataFromSQS :: " + str(e)


class ProcessSQSMsgs(threading.Thread):
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue
        self.pool_manager = PoolManager(num_pools=6)

    def run(self):
        while True:
            #grabs the next message to be parsed from the python queue (blocks until one is available)
            python_queue_msg = self.python_queue.get()
            try:
                processMsgAndSendToBackend(python_queue_msg, self.pool_manager)
            except Exception as e:
                print "Error parsing:: " + str(e)
            finally:
                self.python_queue.task_done()

def processMsgAndSendToBackend(msg, pool_manager):
    if msg != "":
        ###### All the code related to processing the msg
        for individualValue in processedMsg:
            try:
                response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue)
                if response is None:
                    print "Error"
                else:
                    response.release_conn()
            except Exception as e:
                print "Exception! Post data to backend: " + str(e)


def startMyPython():
    #spawn a pool of threads, and pass them queue instance
    for i in range(3):
        sqsThread = GetDataFromSQS(python_queue)
        sqsThread.start()

    for j in range(5):
        parseThread = ProcessSQSMsgs(python_queue)
        #parseThread.setDaemon(True)
        parseThread.start()

    #wait on the queue until everything has been processed
    python_queue.join()
    # python_queue.close() -- should i do this?

startMyPython()

The problem: 3 Python workers die at random once every few days (monitored using top -p -H), and everything is fine again if I kill the process and restart the script. I suspect the workers that vanish are the 3 GetDataFromSQS threads. Because the GetDataFromSQS threads die, the other 5 workers keep running but always sleep, as there is no data in the Python queue. I am not sure what I am doing wrong here; I am pretty new to Python and followed this tutorial to create the queuing logic and threads: http://www.ibm.com/developerworks/aix/library/au-threadingpython/

Thanks in advance for your help. I hope I have explained my problem clearly.

Codedroid asked Sep 01 '14 06:09


1 Answer

The hanging threads were related to getting a handle on the SQS queue. I use IAM for managing credentials and the boto SDK for connecting to SQS.

The root cause was that the boto package reads the auth metadata from AWS, and that call was failing once in a while.
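
An intermittent failure like this can end a polling thread for good if an exception escapes run(), including a second exception raised while reporting the first (in Python, concatenating a string with an exception object raises TypeError). The following is only a minimal sketch of that effect, not the original code: make_flaky_fetch and Poller are hypothetical names, and the fake fetch stands in for the SQS call by failing on its second invocation.

```python
import logging
import threading

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("poller")


def make_flaky_fetch():
    """Hypothetical stand-in for the SQS call: only the second call fails."""
    state = {"calls": 0}

    def fetch():
        state["calls"] += 1
        if state["calls"] == 2:
            raise IOError("could not fetch credentials")
        return ["msg-%d" % state["calls"]]

    return fetch


class Poller(threading.Thread):
    def __init__(self, fetch, rounds, safe_reporting):
        threading.Thread.__init__(self)
        self.fetch = fetch
        self.rounds = rounds
        self.safe_reporting = safe_reporting
        self.received = []

    def run(self):
        for _ in range(self.rounds):
            try:
                self.received.extend(self.fetch())
            except Exception as e:
                if self.safe_reporting:
                    # Logs the traceback and lets the loop continue.
                    log.exception("fetch failed, will try again next round")
                else:
                    # str + exception raises TypeError inside the handler;
                    # it escapes run() and the thread ends for good.
                    print("Exception :: " + e)


fragile = Poller(make_flaky_fetch(), rounds=4, safe_reporting=False)
robust = Poller(make_flaky_fetch(), rounds=4, safe_reporting=True)
for t in (fragile, robust):
    t.start()
for t in (fragile, robust):
    t.join()

print(fragile.received)
print(robust.received)
```

In this sketch the fragile poller stops after its first message, while the robust one logs the failure and keeps collecting messages on later rounds.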

The fix is to edit the boto config to increase the number of attempts made for the auth call to AWS:

[Boto]
metadata_service_num_attempts = 5

( https://groups.google.com/forum/#!topic/boto-users/1yX24WG3g1E )
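
The setting above makes boto retry internally. The same idea can be sketched in application code as a rough illustration; this is not how boto implements it. call_with_retries and flaky_metadata_lookup are hypothetical names, and the fake lookup simply fails twice before succeeding.

```python
import time


def call_with_retries(func, num_attempts=5, delay=0.0):
    """Call func() up to num_attempts times and re-raise the last error."""
    last_error = None
    for attempt in range(1, num_attempts + 1):
        try:
            return func()
        except Exception as exc:
            last_error = exc
            if attempt < num_attempts:
                # Wait a little longer after each failed attempt.
                time.sleep(delay * attempt)
    raise last_error


calls = {"count": 0}


def flaky_metadata_lookup():
    """Hypothetical stand-in for the auth call: fails on the first two calls."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise IOError("metadata service timed out")
    return "credentials"


result = call_with_retries(flaky_metadata_lookup, num_attempts=5)
print("%s after %d calls" % (result, calls["count"]))
```

With a single attempt the first timeout would propagate to the caller; with five attempts the two transient failures are absorbed.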

Codedroid answered Oct 23 '22 10:10