Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Generating an access token programmatically with Django OAuth2 Toolkit

I'm using Python Social Auth and Django OAuth Toolkit to manage my user accounts and restrict access to my REST API.

I can create a token for users who sign up manually with my app by using the regular password-grant request:

curl -X POST -d "grant_type=password&username=<user_name>&password=<password>" -u"<client_id>:<client_secret>" http://localhost:8000/o/token/

But when I register my users with PSA by their access token, I'd like to create an OAuth2 Toolkit token for my own app and return it as JSON to the client so it can use it for making requests to my API.

Presently, I generate the token simply by using generate_token from oauthlib. Is that good practice? Should I take other factors into consideration?

from oauthlib.common import generate_token

...

@psa('social:complete')
def register_by_access_token(request, backend):
    """Authenticate a user from a social access token and issue a new API token.

    This view expects an ``access_token`` GET parameter. If it's needed,
    ``request.backend`` and ``request.strategy`` will be loaded with the
    current backend and strategy.

    On success the user is logged in, any access tokens they already hold
    for the "myapp" application are deleted, and a new token valid for 365
    days is stored.

    Returns:
        ``"OK"`` when ``do_auth`` yields a user, ``"ERROR"`` otherwise.
    """
    social_token = request.GET.get('access_token')
    user = request.backend.do_auth(social_token)

    if not user:
        return "ERROR"

    login(request, user)
    app = Application.objects.get(name="myapp")

    # Remove every previous token for this user/application pair. Filtering
    # (rather than get() wrapped in a bare except) copes with zero, one or
    # several existing rows without hiding unrelated errors. Each row is
    # deleted through its own delete() call, as before.
    for old in AccessToken.objects.filter(user=user, application=app):
        old.delete()

    # The token value is freshly generated, so a lookup could never match an
    # existing row: create the record directly instead of get_or_create.
    AccessToken.objects.create(user=user,
                               application=app,
                               expires=now() + timedelta(days=365),
                               token=generate_token())

    return "OK"  # I will eventually return JSON with the token
like image 882
Felix D. Avatar asked Apr 05 '15 15:04

Felix D.


1 Answer

I've recently used https://github.com/PhilipGarnero/django-rest-framework-social-oauth2 for this purpose like user Felix D. suggested. Below is my implementation:

class TokenHandler:
    """Obtain an access token for this API on behalf of ``request.user``.

    The newest stored ``AccessToken`` is reused while it is still valid.
    Otherwise the token is refreshed, or the user's social-provider token is
    exchanged for a new one, by POSTing to the URLs registered under the
    names ``'token'`` and ``'convert_token'``.

    NOTE: the helper methods return the token string on success but a
    ``{'error': ...}`` dict when the user has no social account, so callers
    of ``handle_token`` may receive either.
    """

    # Lazy queryset with the OAuth application's credentials. Indexing an
    # unevaluated queryset issues a query, so each method reads it once.
    application = Application.objects.filter(name=APPLICATION_NAME).values('client_id', 'client_secret')

    # Seconds to wait for the token endpoints. Without a timeout,
    # requests.post() can block indefinitely.
    request_timeout = 10

    def handle_token(self, request):
        """
        Gets the latest token (to access my API) and if it's expired, check to see if the social token has expired.
        If the social token has expired, then the user must log back in to access the API. If it hasn't expired,
        (my) token is refreshed.

        Returns the token, or the string 'no_valid_token' when no usable
        token can be produced.
        """
        try:
            # Fetch the newest row once; indexing the queryset repeatedly
            # would run one database query per access.
            latest = AccessToken.objects.filter(user=request.user)\
                .order_by('-id').values('token', 'expires')[0]
            if latest['expires'] < datetime.now(timezone.utc):
                if not self.social_token_is_expired(request):
                    token = self.refresh_token(request)
                else:
                    token = 'no_valid_token'
            else:
                token = latest['token']
        except IndexError:
            # Happens when there are no old tokens to check. It is also
            # raised by the helpers above when the user has no social-auth
            # row or no refresh token; all of these fall back to converting
            # the social token.
            token = self.convert_social_token(request)
        except TypeError:  # happens when an anonymous user attempts to get a token for the API
            token = 'no_valid_token'
        return token

    def convert_social_token(self, request):
        """Exchange the user's social-provider token for an API access token.

        Returns the ``access_token`` value from the endpoint's JSON response,
        or an ``{'error': ...}`` dict when the user has no social-auth record.

        Raises:
            KeyError: if the stored ``extra_data`` has no ``access_token``
                (or ``id_token`` for the ``azuread-oauth2`` backend), or the
                response JSON has no ``access_token``.
        """
        grant_type = 'convert_token'
        credentials = self.application[0]
        client_id = credentials['client_id']
        client_secret = credentials['client_secret']
        try:
            # filter(...)[0] raises IndexError, never DoesNotExist, when
            # nothing matches, so that is the only case handled here.
            user_social_auth = request.user.social_auth.filter(user=request.user).values('provider', 'extra_data')[0]
        except IndexError:
            return {'error': 'You must use an OAuth account to access the API.'}

        backend = user_social_auth['provider']
        token = user_social_auth['extra_data']['access_token']
        url = get_base_url(request) + reverse('convert_token')
        fields = {'client_id': client_id, 'client_secret': client_secret, 'grant_type': grant_type,
                  'backend': backend,
                  'token': token}
        if backend == 'azuread-oauth2':
            fields['id_token'] = user_social_auth['extra_data']['id_token']
        response = requests.post(url, data=fields, timeout=self.request_timeout)
        response_dict = json.loads(response.text)
        return response_dict['access_token']

    def refresh_token(self, request):
        """Use the user's newest refresh token to obtain a new access token.

        Returns the ``access_token`` value from the endpoint's JSON response.

        Raises:
            IndexError: if the user has no stored refresh token. This is what
                the original code did in practice, because filter(...)[0]
                never raises ``DoesNotExist``. ``handle_token`` relies on it
                to fall back to ``convert_social_token``.
            KeyError: if the response JSON has no ``access_token``.
        """
        grant_type = 'refresh_token'
        credentials = self.application[0]
        client_id = credentials['client_id']
        client_secret = credentials['client_secret']

        refresh_token_object = RefreshToken.objects.filter(user=request.user).order_by('-id').values('token')[0]
        token = refresh_token_object['token']
        url = get_base_url(request) + reverse('token')
        fields = {'client_id': client_id, 'client_secret': client_secret, 'grant_type': grant_type,
                  'refresh_token': token}
        response = requests.post(url, data=fields, timeout=self.request_timeout)
        response_dict = json.loads(response.text)

        return response_dict['access_token']

    @staticmethod
    def social_token_is_expired(request):
        """Return True when the user's social token has expired.

        Compares ``extra_data['expires_on']``, converted to a float, with the
        current POSIX timestamp.

        Raises:
            IndexError: if the user has no social-auth record.
        """
        user_social_auth = UserSocialAuth.objects.filter(user=request.user).values('provider', 'extra_data')[0]
        try:
            # Timezone-aware "now", consistent with handle_token.
            return float(user_social_auth['extra_data']['expires_on']) <= datetime.now(timezone.utc).timestamp()
        except KeyError:  # social API did not provide an expiration
            return True  # if our token is expired and social API did not provide a time, we do consider them expired
like image 165
DragonBobZ Avatar answered Oct 12 '22 00:10

DragonBobZ