Today I want to integrate with AWS Cognito. I use Python SDK interface - boto3.
In the docs I can find the method to sign up account, but I can't find authenticate user. doc: https://boto3.readthedocs.io/en/latest/reference/services/cognito-idp.html
My question is, Maybe this method is not implemented? So, if this method is not implemented. Maybe someone created a method for auth, for AWS cognito?
Thanks guys :)
2.1.Go to AWS Cognito service and click “Manage Identity Pools”. 2. Enter “Identity pool name”, expand the “Authentication providers” section and select “Cognito” tab. This is where the Cognito authentication provider will be registered with the Identity pool.
An authorization code grant is a code parameter that Amazon Cognito appends to your redirect URL. Your app can exchange the code with the Token endpoint for access, ID, and refresh tokens. As a security best practice, and to receive refresh tokens for your users, use an authorization code grant in your app.
Writing a custom authorizer is what you need to. I used serverless to accomplish this because it offers the ability to cross-compile the native libs required to run on the lambda. I have created a comprehensive example that should help here.
The basics:
You will need something to verify the token. I use python-jose:
def get_claims(event, context):
token = event['authorizationToken'][7:]
# get the kid from the headers prior to verification
headers = jwt.get_unverified_headers(token)
kid = headers['kid']
# search for the kid in the downloaded public keys
key_index = -1
for i in range(len(keys)):
if kid == keys[i]['kid']:
key_index = i
break
if key_index == -1:
print('Public key not found in jwks.json')
return False
# construct the public key
public_key = jwk.construct(keys[key_index])
# get the last two sections of the token,
# message and signature (encoded in base64)
message, encoded_signature = str(token).rsplit('.', 1)
# decode the signature
decoded_signature = base64url_decode(encoded_signature.encode('utf-8'))
# verify the signature
if not public_key.verify(message.encode("utf8"), decoded_signature):
print('Signature verification failed')
return False
print('Signature successfully verified')
# since we passed the verification, we can now safely
# use the unverified claims
claims = jwt.get_unverified_claims(token)
# additionally we can verify the token expiration
if time.time() > claims['exp']:
print('Token is expired')
return False
# and the Audience (use claims['client_id'] if verifying an access token)
if 'aud' in claims and claims['aud'] != app_client_id:
print('Token was not issued for this audience')
return False
# now we can use the claims
return claimsenter code here
Secondly you need the authorizer to return a policy based on the claims, in this example all paths are allowed, but it can be refined based on the claims if you wish:
def authorize(event, context):
print("Client token: " + event['authorizationToken'])
print("Method ARN: " + event['methodArn'])
"""
validate the incoming token
and produce the principal user identifier associated with the token
this could be accomplished in a number of ways:
1. Call out to OAuth provider
2. Decode a JWT token inline
3. Lookup in a self-managed DB
"""
token = event['authorizationToken'][7:]
unverified_claims = jwt.get_unverified_claims(token)
print json.dumps(unverified_claims)
principalId = jwt.get_unverified_claims(token).get('username')
"""
you can send a 401 Unauthorized response to the client by failing like so:
raise Exception('Unauthorized')
if the token is valid, a policy must be generated which will allow or deny access to the client
if access is denied, the client will recieve a 403 Access Denied response
if access is allowed, API Gateway will proceed with the backend integration configured on the method that was called
this function must generate a policy that is associated with the recognized principal user identifier.
depending on your use case, you might store policies in a DB, or generate them on the fly
keep in mind, the policy is cached for 5 minutes by default (TTL is configurable in the authorizer)
and will apply to subsequent calls to any method/resource in the RestApi
made with the same token
the example policy below denies access to all resources in the RestApi
"""
tmp = event['methodArn'].split(':')
apiGatewayArnTmp = tmp[5].split('/')
awsAccountId = tmp[4]
policy = AuthPolicy(principalId, awsAccountId)
policy.restApiId = apiGatewayArnTmp[0]
policy.region = tmp[3]
policy.stage = apiGatewayArnTmp[1]
try:
print 'getting claims'
#verified = verify_token(jwt_token,'access_token','access')
claims = get_claims(event, context)
print json.dumps(claims)
if claims != False:
print 'a'
policy.allowAllMethods()
else:
policy.denyAllMethods()
except:
policy.denyAllMethods()
"""policy.allowMethod(HttpVerb.GET, "/pets/*")"""
# Finally, build the policy
authResponse = policy.build()
# new! -- add additional key-value pairs associated with the authenticated principal
# these are made available by APIGW like so: $context.authorizer.<key>
# additional context is cached
context = {
'key': 'value', # $context.authorizer.key -> value
'number' : 1,
'bool' : True
}
# context['arr'] = ['foo'] <- this is invalid, APIGW will not accept it
# context['obj'] = {'foo':'bar'} <- also invalid
authResponse['context'] = context
return authResponse
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With