Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

type() function doesn't return correct result for a boto3 sqs object?

I'm using Python 3's type hinting syntax, and I'm writing a small AWS application that uses SQS. I'm trying to hint the type of the Queue. This is how I obtain the type of the Queue:

>>> import boto3
>>> session = boto3.session.Session(
>>>     aws_access_key_id=AWS_ACCESS_KEY,
>>>     aws_secret_access_key=AWS_SECRET_KEY,
>>>     region_name='us-west-2'
>>> )
>>> sqs = session.resource('sqs')
>>> queue=sqs.Queue(AWS_QUEUE_URL)
>>> 
>>> type(queue)
<class 'boto3.resources.factory.sqs.Queue'>

And I write my type-hinted function like this:

def get_session() -> boto3.resources.factory.sqs.Queue:
    ...

But I get an error:

AttributeError: module 'boto3.resources.factory' has no attribute 'sqs'

I've looked through the package myself using dir(...). It seems that factory does not contain sqs, indeed. Thus, I have two questions:

  • Why is type returning this nonexistent class?
  • How can I find the right type of this object?
like image 377
Newb Avatar asked Mar 30 '17 21:03

Newb


People also ask

How to send a message to the SQS queue from boto3?

We will be using the receive_message method from Boto3 to send a message to the SQS queue. Some of the important parameters to keep in mind while using this method: MaxNumberOfMessages: The maximum number of messages to retrieve. WaitTimeSeconds: Amount of time to wait for a message to arrive in the queue. Useful for long-polling of messages.

How does boto3 work with Amazon SQS APIs?

Here’s how we can instantiate the Boto3 SQS client to start working with Amazon SQS APIs: Amazon SQS provides an HTTP API over which applications can submit and read messages out of a messaging queue. An SQS queue works like a buffer between the application components that receive data and those components that process the data in your system.

How do I delete a queue in boto3?

The get_queue_url () method returns a queue URL. To delete an SQS queue, we will use the delete_queue () method from the Boto3 library. Note: when you delete a queue, any existing messages in the queue will disappear. The delete a queue operation might take up to 60 seconds to complete.

How to change SQS queue attributes after it has been created?

There is often a need to change SQS attributes like the VisiblityTimeout or DelaySeconds after the queue has already been created. We can use the set_queue_attributes method to change the attributes. How to remove all messages from a SQS queue? Boto3 provides the purge_queue method to delete all messages from the Queue.


1 Answers

The class of sqs.Queue appears to be generated on the fly every time it's called:

>>> import boto3
>>> session = boto3.session.Session(aws_access_key_id='foo', aws_secret_access_key='bar', region_name='us-west-2')
>>> sqs = session.resource('sqs')
>>> sqs.Queue
<bound method sqs.ServiceResource.Queue of sqs.ServiceResource()>
>>> q = sqs.Queue('blah')
>>> type(q)
<class 'boto3.resources.factory.sqs.Queue'>
>>> q2 = sqs.Queue('bluh')
>>> type(q) == type(q2)
False

So that's kind of a bad design choice on boto's end. I think it means it's not possible to reasonably type annotate it, even with forward references.

Your best bet is to give a type hint for the common base class of all these dynamic classes, boto3.resources.base.ServiceResource:

>>> type(q).__bases__
(<class 'boto3.resources.base.ServiceResource'>,)
like image 81
Nik Haldimann Avatar answered Nov 14 '22 23:11

Nik Haldimann