
boto3: how can I authenticate a user with Cognito?

I want to integrate with AWS Cognito using boto3, the Python SDK.

In the docs I can find the method to sign up an account, but I can't find one to authenticate a user. Docs: https://boto3.readthedocs.io/en/latest/reference/services/cognito-idp.html

Is this method simply not implemented? If it isn't, has anyone written their own authentication method for AWS Cognito?

Thanks guys :)

Konrad Mariusz Kur asked Jun 18 '16 14:06




1 Answer

What you need is a custom authorizer. I used serverless to build mine because it can cross-compile the native libraries required to run on Lambda. I have created a comprehensive example that should help here.
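For the sign-in step the question asks about, boto3's cognito-idp client does expose `initiate_auth`. The following is a minimal sketch, not a definitive implementation: the function name `sign_in` is hypothetical, and it assumes the app client has the USER_PASSWORD_AUTH flow enabled and no client secret.

```python
def sign_in(client, client_id, username, password):
    """Exchange a username and password for Cognito tokens.

    `client` is a boto3 "cognito-idp" client. Assumes the app client allows
    USER_PASSWORD_AUTH and has no client secret (a secret would additionally
    require a SECRET_HASH entry in AuthParameters).
    """
    response = client.initiate_auth(
        AuthFlow="USER_PASSWORD_AUTH",
        AuthParameters={"USERNAME": username, "PASSWORD": password},
        ClientId=client_id,
    )
    # A challenge (for example NEW_PASSWORD_REQUIRED) means sign-in is not
    # finished yet, so the response carries no tokens.
    if "ChallengeName" in response:
        raise RuntimeError("Challenge required: " + response["ChallengeName"])
    # Dict with AccessToken, IdToken, RefreshToken, ExpiresIn and TokenType.
    return response["AuthenticationResult"]
```

With real credentials this would be called as `sign_in(boto3.client("cognito-idp", region_name="us-east-1"), app_client_id, "user", "password")`, where the region is a placeholder. The returned `IdToken` or `AccessToken` is the value a custom authorizer would later receive in the Authorization header.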

The basics:

You will need something to verify the token. I use python-jose:

import time

from jose import jwk, jwt
from jose.utils import base64url_decode

# `keys` (the "keys" list from the user pool's jwks.json) and `app_client_id`
# are assumed to be defined at module level.


def get_claims(event, context):
    # strip the leading "Bearer " (7 characters) from the header value
    token = event['authorizationToken'][7:]
    # get the kid from the headers prior to verification
    headers = jwt.get_unverified_headers(token)
    kid = headers['kid']
    # search for the kid in the downloaded public keys
    key_index = -1
    for i in range(len(keys)):
        if kid == keys[i]['kid']:
            key_index = i
            break
    if key_index == -1:
        print('Public key not found in jwks.json')
        return False
    # construct the public key
    public_key = jwk.construct(keys[key_index])
    # split the token into the signed message (header.payload)
    # and the signature (encoded in base64url)
    message, encoded_signature = str(token).rsplit('.', 1)
    # decode the signature
    decoded_signature = base64url_decode(encoded_signature.encode('utf-8'))
    # verify the signature
    if not public_key.verify(message.encode('utf-8'), decoded_signature):
        print('Signature verification failed')
        return False
    print('Signature successfully verified')
    # since the signature is verified, the claims read with
    # get_unverified_claims can now be trusted
    claims = jwt.get_unverified_claims(token)
    # additionally we can verify the token expiration
    if time.time() > claims['exp']:
        print('Token is expired')
        return False
    # and the audience (use claims['client_id'] if verifying an access token)
    if 'aud' in claims and claims['aud'] != app_client_id:
        print('Token was not issued for this audience')
        return False
    # now we can use the claims
    return claims
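The function above reads a module-level `keys` list that the snippet does not define. A minimal sketch of one way to populate it, assuming the standard Cognito JWKS location for a user pool; the helper names `jwks_url`, `parse_jwks`, `load_keys` and `find_key` are hypothetical, and the region and pool ID are values you supply:

```python
import json
import urllib.request


def jwks_url(region, user_pool_id):
    """Build the public JWKS URL for a Cognito user pool."""
    return "https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json".format(
        region, user_pool_id
    )


def parse_jwks(document):
    """Return the list of keys from a JWKS JSON document (str or bytes)."""
    return json.loads(document)["keys"]


def load_keys(region, user_pool_id):
    """Download the pool's public signing keys (performs a network request)."""
    with urllib.request.urlopen(jwks_url(region, user_pool_id)) as response:
        return parse_jwks(response.read())


def find_key(keys, kid):
    """Return the JWK whose 'kid' matches, or None when there is no match."""
    return next((key for key in keys if key["kid"] == kid), None)
```

Calling `load_keys` once at module import, outside the handler, lets warm Lambda invocations reuse the downloaded keys instead of fetching them on every request.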

Secondly, the authorizer needs to return a policy based on the claims. In this example all paths are allowed, but the policy can be refined based on the claims if you wish:

import json

from jose import jwt

# AuthPolicy (and HttpVerb) are assumed to come from the AWS API Gateway
# custom-authorizer Python blueprint.


def authorize(event, context):
    print("Client token: " + event['authorizationToken'])
    print("Method ARN: " + event['methodArn'])

    """
    validate the incoming token
    and produce the principal user identifier associated with the token
    this could be accomplished in a number of ways:
    1. Call out to OAuth provider
    2. Decode a JWT token inline
    3. Lookup in a self-managed DB
    """

    token = event['authorizationToken'][7:]
    unverified_claims = jwt.get_unverified_claims(token)
    print(json.dumps(unverified_claims))
    principalId = unverified_claims.get('username')

    """
    you can send a 401 Unauthorized response to the client by failing like so:
    raise Exception('Unauthorized')
    if the token is valid, a policy must be generated which will allow or deny access to the client
    if access is denied, the client will receive a 403 Access Denied response
    if access is allowed, API Gateway will proceed with the backend integration configured on the method that was called
    this function must generate a policy that is associated with the recognized principal user identifier.
    depending on your use case, you might store policies in a DB, or generate them on the fly
    keep in mind, the policy is cached for 5 minutes by default (TTL is configurable in the authorizer)
    and will apply to subsequent calls to any method/resource in the RestApi
    made with the same token
    the example policy below allows all methods in the RestApi when the token
    verifies and denies them otherwise
    """

    tmp = event['methodArn'].split(':')
    apiGatewayArnTmp = tmp[5].split('/')
    awsAccountId = tmp[4]
    policy = AuthPolicy(principalId, awsAccountId)
    policy.restApiId = apiGatewayArnTmp[0]
    policy.region = tmp[3]
    policy.stage = apiGatewayArnTmp[1]
    try:
        print('getting claims')
        #verified = verify_token(jwt_token,'access_token','access')
        claims = get_claims(event, context)
        print(json.dumps(claims))
        if claims is not False:
            policy.allowAllMethods()
        else:
            policy.denyAllMethods()
    except Exception:
        # any failure during verification denies access
        policy.denyAllMethods()

    """policy.allowMethod(HttpVerb.GET, "/pets/*")"""
    # Finally, build the policy (outside the try/except so it always runs)
    authResponse = policy.build()
    # new! -- add additional key-value pairs associated with the authenticated principal
    # these are made available by APIGW like so: $context.authorizer.<key>
    # additional context is cached
    context = {
        'key': 'value',  # $context.authorizer.key -> value
        'number': 1,
        'bool': True
    }
    # context['arr'] = ['foo'] <- this is invalid, APIGW will not accept it
    # context['obj'] = {'foo':'bar'} <- also invalid
    authResponse['context'] = context
    return authResponse
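The `AuthPolicy` helper above is not defined in the snippet. As a rough sketch of what an authorizer ultimately hands back to API Gateway, the following builds the response by hand; `parse_method_arn` and `build_response` are hypothetical names, and this is an illustration under those assumptions rather than the helper's actual implementation.

```python
def parse_method_arn(method_arn):
    """Split an API Gateway method ARN into its parts.

    Expected shape:
    arn:aws:execute-api:<region>:<account>:<apiId>/<stage>/<verb>/<path>
    """
    parts = method_arn.split(":")
    api_parts = parts[5].split("/")
    return {
        "region": parts[3],
        "account_id": parts[4],
        "rest_api_id": api_parts[0],
        "stage": api_parts[1],
    }


def build_response(principal_id, effect, resource, context=None):
    """Build an authorizer response with a single Allow or Deny statement."""
    if effect not in ("Allow", "Deny"):
        raise ValueError("effect must be 'Allow' or 'Deny'")
    # API Gateway only accepts flat string, number or boolean context values.
    for key, value in (context or {}).items():
        if not isinstance(value, (str, int, float, bool)):
            raise TypeError("context value for %r must be str, number or bool" % key)
    response = {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "execute-api:Invoke",
                    "Effect": effect,
                    "Resource": resource,
                }
            ],
        },
    }
    if context:
        response["context"] = context
    return response
```

To refine access based on the claims, the `resource` argument can be narrowed from a wildcard such as `arn:aws:execute-api:<region>:<account>:<apiId>/<stage>/*/*` to specific verbs and paths.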
Clay Graham answered Sep 28 '22 11:09
