I am using Python
3.5.1 with Requests
2.9.1. My use case as follows: I need to authenticate (get a token) from service T and use it as value of the Authorization
header when making requests to a resource server R. The token expires at some point and a new one needs to be fetched.
I have an application using requests
, which when started first fetches a token and remembers it - sets in the Session
used for requests to R. From then on for 3 minutes, everything works as a charm. After 3 minutes, I get unauthorized
responses as the token is invalid.
I use Session
for all requests except for the one to authenticate; this call updates the Authorization
header on the Session
for other requests to use.
I created code to re-authenticate automatically when unauthorized
is detected, using the response
hook (set only on the Session
), here is the code:
def __hook(self, res, *args, **kwargs):
if res.status_code == requests.codes.unauthorized:
print('Token expired, refreshing')
self.auth() # sets the token on self.__session
req = res.request
print('Resending request', req.method, req.url, req.headers)
req.headers['Authorization'] = self.__session.headers['Authorization'] # why is it needed?
return self.__session.send(res.request)
Basically, it even works. There are a couple of issues, though:
Why is it necessary to re-set the Authorization
header on the request, even though the session is updated and is used to resend the original request? Without the line, the application will just keep on refreshing the token as the new one is never used, this can be seen in the output, the token is the original one which caused the automatic refresh.
How can I make the code more robust, i.e. prevent endless recursion (I am not sure whether it is possible in reality, but with the line setting the Authorization
header on the retried request it will just go on an on)? I was thinking of setting a custom header, and if the hooks discovers the failed request has it, it would not re-authenticate and resend. Is there anything better?
Edit: it turns out it is possible to get an (almost) endless loop (it is recursive, after all) if the configuration is wrong: the tokens are taken for one environment (like STAGING), but the resource server is from another (TEST) - the auth
request will succeed but the token is actually incorrect for the resource server. For the time being I implemented the 'special' header solution mentioned above.
Is this a good approach in general or is there anything better suited for the task in requests
?
Here's the same solution but without using a class
session = requests.Session()
session.headers.update({"Authorization": f"Bearer deliberate-wrong-token"})
def refresh_token(r, *args, **kwargs):
if r.status_code == 401:
logger.info("Fetching new token as the previous token expired")
token = get_token()
session.headers.update({"Authorization": f"Bearer {token}"})
r.request.headers["Authorization"] = session.headers["Authorization"]
return session.send(r.request, verify=False)
session.hooks['response'].append(refresh_token)
The eventual code I came up with to solve this. It is more complete than OP. It exposes a session attribute you can use for authenticated requests.
The reason why you have to recall auth is because .send
just sends the PreparedRequest.
If you don't like the REATTEMPT header I added to prevent infinite recursion. You can also de-register the hook like OP mentioned (but you have to register it again for use in the future), or in my coded class instead do the new request with self._session
instead of self.session
. (self._session
is just a session without hooks.)
import requests
from urllib.parse import urljoin
class JWTAuth:
def __init__(self, user, passw, base):
self.base = base
self.user, self.passw = user, passw
self._session = requests.Session() # Session for tokens
self.authenticate()
self.session = requests.Session() # Authenticated session
self.session.auth = self.auth
self.session.hooks['response'].append(self.reauth)
def abs_url(self, path):
"""Combine the base url with a path."""
return urljoin(self.base, path)
def auth(self, req):
"""Just set the authentication token, on every request."""
req.headers['Authorization'] = f'JWT {self.access}'
return req
def reauth(self, res, *args, **kwargs):
"""Hook to re-authenticate whenever authentication expires."""
if res.status_code == requests.codes.unauthorized:
if res.request.headers.get('REATTEMPT'):
res.raise_for_status()
self.refresh_auth()
req = res.request
req.headers['REATTEMPT'] = 1
req = self.session.auth(req)
res = self.session.send(req)
return res
def refresh_auth(self):
"""Use the refresh token to get a new access token."""
res = self._session.post(self.abs_url("api/token/refresh/"), data={"refresh": self.refresh})
if res.status_code == 200:
self.access = res.json()['access']
else:
# Token expired -> re-authenticate
self.authenticate()
def authenticate(self):
res = self._session.post(
self.abs_url("api/token/"),
data={"username": self.user, "password": self.passw},
)
res.raise_for_status()
data = res.json()
self.refresh, self.access = data['refresh'], data['access']
So I got in touch with the Requests author discussing an unrelated issue and asked about this, and it turns out this approach is Ok. The only changed I did was to drop the custom header (it does work and is also Ok) and now I just deregister the response hook within the hook before re-sending the request. This breaks the loop and all is nice.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With