I am using the boto3 SQS client to receive messages from an AWS SQS FIFO queue.
def consume_msgs():
    sqs = None
    try:
        sqs = boto3.client('sqs',
                           region_name=S3_BUCKET_REGION,
                           aws_access_key_id=AWS_ACCESS_KEY_ID,
                           aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    except Exception:
        logger.warning('SQS client error {}'.format(sys.exc_info()[0]))
        logger.error(traceback.format_exc())
    ### more code to process message
The application is set up as a service on EC2 using upstart. It works fine most of the time, but sometimes when I restart the service after a code change, the app exits with the following error:
2018-10-06 01:29:38,654 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,658 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,663 ERROR Traceback (most recent call last):
File "/home/ec2-user/aae_client/app/run.py", line 194, in consume_msgs
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/__init__.py", line 83, in client
return _get_default_session().client(*args, **kwargs)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/session.py", line 263, in client
aws_session_token=aws_session_token, config=config)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 851, in create_client
endpoint_resolver = self.get_component('endpoint_resolver')
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 726, in get_component
return self._components.get_component(name)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 926, in get_component
del self._deferred[name]
KeyError: 'endpoint_resolver'
Restarting the service usually fixes it, and it doesn't happen every time I restart. What is confusing is the KeyError warning preceding the actual error traceback. What exactly does this KeyError refer to? It can't be the AWS_SECRET_ACCESS_KEY, since that key never changes and it works just fine most of the time. The issue comes and goes quite randomly, so it is hard to debug. And I don't understand how this error escaped the try..except block.
Based on the comments, this seems to be related to multithreading. consume_msgs() is indeed run by multiple threads:
def process_msgs():
    for i in range(NUM_WORKERS):
        t = threading.Thread(target=consume_msgs, name='worker-%s' % i)
        t.setDaemon(True)
        t.start()
    while True:
        time.sleep(MAIN_PROCESS_SLEEP_INTERVAL)
Maybe I misunderstand some of the other answers, but in the case of multithreaded execution, I don't think that having one boto3 client object and passing it to other functions will work if those functions are executed in separate threads. I've been experiencing sporadic endpoint_resolver errors when invoking a boto3 client service, and they were stopped by following the example in the documentation and the comments on boto3 GitHub issues such as #1246 and #1592, and creating a separate session object in each thread. In my case, it meant an almost trivial change in my code, going from
client = boto3.client(variant, region_name = creds['region_name'],
                      aws_access_key_id = ...,
                      aws_secret_access_key = ...)
to
session = boto3.session.Session()
client = session.client(variant, region_name = creds['region_name'],
                        aws_access_key_id = ...,
                        aws_secret_access_key = ...)
in the function that is executed in separate threads. My reading of the OP's code for consume_msgs() is that a similar change could be made, and it would eliminate the occasional endpoint_resolver error.
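For reference, here is a minimal sketch of what that change might look like in the OP's consume_msgs(), keeping the S3_BUCKET_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and logger names from the question:
import sys
import traceback

import boto3

def consume_msgs():
    sqs = None
    try:
        # Give each worker thread its own Session instead of calling
        # boto3.client(), which goes through the shared default session.
        session = boto3.session.Session()
        sqs = session.client('sqs',
                             region_name=S3_BUCKET_REGION,
                             aws_access_key_id=AWS_ACCESS_KEY_ID,
                             aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    except Exception:
        logger.warning('SQS client error {}'.format(sys.exc_info()[0]))
        logger.error(traceback.format_exc())
    ### more code to process message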
This GitHub issue suggests creating the SQS client once at the top level (rather than inside the function):
sqs = boto3.client('sqs',
                   region_name=S3_BUCKET_REGION,
                   aws_access_key_id=AWS_ACCESS_KEY_ID,
                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

def consume_msgs():
    # code to process message
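Either approach should avoid the race. As far as I understand the boto3 docs on multithreading, low-level clients are thread safe once created, but Session objects (including the default session that boto3.client() uses under the hood) are not, so the KeyError shows up when several threads create clients from the same session at the same time. Creating the client once up front and sharing it, or giving each thread its own Session as in the previous answer, both sidestep that.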