I have a piece of multi-threaded code: 3 threads that poll data from SQS and add it to a Python queue, and 5 threads that take messages from the Python queue, process them, and send them to a back-end system.
Here is the code:
import Queue
import threading
import time

from urllib3 import PoolManager

# sqs_queue (a boto SQS queue handle) and backend_endpoint (the back-end URL)
# are defined elsewhere in the script.

python_queue = Queue.Queue()

class GetDataFromSQS(threading.Thread):
    """Threaded SQS poller"""
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue

    def run(self):
        while True:
            time.sleep(0.5)  # sleep for a short while before querying again
            try:
                msgs = sqs_queue.get_messages(10)
                if not msgs:
                    print "sqs is empty now!"
                for msg in msgs:
                    # place each message block from sqs into python queue for processing
                    self.python_queue.put(msg)
                    print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize()
                    # delete from sqs
                    sqs_queue.delete_message(msg)
            except Exception as e:
                print "Exception in GetDataFromSQS :: " + str(e)

class ProcessSQSMsgs(threading.Thread):
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue
        self.pool_manager = PoolManager(num_pools=6)

    def run(self):
        while True:
            # grabs the next message to be parsed from the python queue
            python_queue_msg = self.python_queue.get()
            try:
                processMsgAndSendToBackend(python_queue_msg, self.pool_manager)
            except Exception as e:
                print "Error parsing:: " + str(e)
            finally:
                self.python_queue.task_done()

def processMsgAndSendToBackend(msg, pool_manager):
    if msg != "":
        ###### All the code related to processing the msg
        for individualValue in processedMsg:
            try:
                response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue)
                if response is None:
                    print "Error"
                else:
                    response.release_conn()
            except Exception as e:
                print "Exception! Post data to backend: " + str(e)

def startMyPython():
    # spawn a pool of threads, and pass them the queue instance
    for i in range(3):
        sqsThread = GetDataFromSQS(python_queue)
        sqsThread.start()

    for j in range(5):
        parseThread = ProcessSQSMsgs(python_queue)
        # parseThread.setDaemon(True)
        parseThread.start()

    # wait on the queue until everything has been processed
    python_queue.join()
    # python_queue.close() -- should i do this?

startMyPython()
The problem: 3 of the Python workers die randomly (monitored using top -p -H) once every few days, and everything is fine again if I kill the process and restart the script. I suspect the workers that vanish are the 3 GetDataFromSQS threads. And because GetDataFromSQS dies, the other 5 workers, although still running, always sleep since there is no data in the Python queue. I am not sure what I am doing wrong here, as I am pretty new to Python and followed this tutorial for creating the queuing logic and threads: http://www.ibm.com/developerworks/aix/library/au-threadingpython/
Thanks in advance for your help. I hope I have explained my problem clearly.
In almost all cases, Amazon SQS long polling is preferable to short polling. Long-polling requests let your queue consumers receive messages as soon as they arrive in your queue while reducing the number of empty ReceiveMessageResponse instances returned.
To receive messages from an SQS queue in Python, you can use the Boto3 receive_message() method, which retrieves one or more messages (up to 10) from the specified queue.
By default, queues use short polling. With short polling, the ReceiveMessage request queries only a subset of the servers (based on a weighted random distribution) to find messages that are available to include in the response. Amazon SQS sends the response right away, even if the query found no messages.
The maximum long polling wait time is 20 seconds. Long polling helps reduce the cost of using Amazon SQS by reducing the number of empty responses (when there are no messages available for a ReceiveMessage request) and eliminating false empty responses (when messages are available but aren't included in a response).
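The question uses the older boto SDK, but with Boto3 a long-polling consumer loop might look like the following sketch; the queue URL and the process() call are placeholders, not part of the original code:

    import boto3

    sqs = boto3.client('sqs')
    queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'  # placeholder

    while True:
        # Long poll: wait up to 20 seconds for messages instead of returning immediately
        resp = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        for msg in resp.get('Messages', []):
            process(msg['Body'])  # placeholder for your own processing logic
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])

With WaitTimeSeconds set, ReceiveMessage only returns early when messages arrive, so the explicit time.sleep(0.5) between polls is no longer needed.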
The problem with the thread hanging was related to getting a handle on the SQS queue. I used IAM for managing credentials and the boto SDK for connecting to SQS.
The root cause of this issue was that the boto package was reading the auth metadata from AWS, and that call was failing once in a while.
The fix is to edit the boto config, increasing the number of attempts made for the auth call to AWS:
[Boto]
metadata_service_num_attempts = 5
( https://groups.google.com/forum/#!topic/boto-users/1yX24WG3g1E )
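For reference, a minimal sketch of what that config file could look like; the file path and the extra timeout setting are assumptions, since boto reads /etc/boto.cfg and ~/.boto by default:

    # ~/.boto (also read from /etc/boto.cfg)
    [Boto]
    metadata_service_num_attempts = 5
    metadata_service_timeout = 3.0   # assumption: allow each metadata call a bit more time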