import hashlib
import hmac
import traceback
from datetime import timedelta
from http import HTTPStatus

from flask import Flask, jsonify, request
from flask_jwt_extended import JWTManager, jwt_required, get_jwt_identity, create_access_token
# Flask application object and the JWT extension bound to it.
app = Flask(__name__)
jwt = JWTManager(app)

# JWT settings.
# NOTE(review): the signing key is hard-coded in source; confirm it is
# replaced by a secret loaded from the environment outside of demos.
app.config.update(
    JWT_SECRET_KEY="very-secret1234567890",
    JWT_ACCESS_TOKEN_EXPIRES=timedelta(minutes=15),
    JWT_REFRESH_TOKEN_EXPIRES=timedelta(days=30),
)

# Host and port for the server.
host, port = "localhost", 5000

# In-memory user list standing in for a database. Passwords are stored as
# the SHA-256 hex digest of the plain-text password.
test_password = "test_password"
db = [
    dict(
        username="test_user",
        email="test_email.gmail.com",
        password=hashlib.sha256(test_password.encode()).hexdigest(),
    ),
]
@app.route('/login', methods=['POST'])
def login():
    """Authenticate a user by email and password and issue a JWT access token.

    Expects a JSON object body with non-empty string fields ``email`` and
    ``password``. The email is matched case-insensitively against ``db``.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with ``{"username": ..., "token": ...}`` on success;
        * 400 with ``{"error": ...}`` when the body is missing, is not a JSON
          object, lacks a usable field, or the credentials do not match;
        * 500 with ``{"result": {"error": "Server error"}}`` on an
          unexpected failure.
    """
    # silent=True yields None for a missing, malformed or non-JSON body
    # instead of raising an HTTP error that would surface as a 500.
    json_data = request.get_json(silent=True)
    if not isinstance(json_data, dict):
        response = jsonify(error="'email' and 'password' are required")
        return response, HTTPStatus.BAD_REQUEST

    email = json_data.get("email")
    password = json_data.get("password")
    # Reject non-string values too: .lower()/.encode() below need str.
    if not (isinstance(email, str) and isinstance(password, str) and email and password):
        response = jsonify(error="'email' and 'password' are required")
        return response, HTTPStatus.BAD_REQUEST

    try:
        # Look the user up by case-insensitive email.
        wanted_email = email.lower()
        user = next((u for u in db if u["email"].lower() == wanted_email), None)

        # NOTE(review): unsalted SHA-256 is weak for password storage, but it
        # has to match how the entries in `db` were hashed.
        encoded_password = hashlib.sha256(password.encode()).hexdigest()

        # Constant-time comparison of the two hex digests.
        if user is None or not hmac.compare_digest(user["password"], encoded_password):
            response = jsonify(error="Wrong credentials")
            return response, HTTPStatus.BAD_REQUEST

        # Generate the JWT and return it together with the username.
        access_token = create_access_token(identity=user["username"])
        response = jsonify(username=user["username"], token=access_token)
        return response, HTTPStatus.OK
    except Exception:
        # Request boundary: log the traceback, answer with a generic error.
        app.logger.exception("Unexpected error while handling /login")
        response = jsonify(result={"error": "Server error"})
        return response, HTTPStatus.INTERNAL_SERVER_ERROR
@app.route('/secured_page', methods=['GET'])
@jwt_required()
def __create_participant():
    """Return a greeting that names the identity stored in the request's JWT.

    Responds with 200 and ``{"message": ...}``; if anything raises while the
    response is built, the error is printed and a 500 with
    ``{"result": {"error": "Server error"}}`` is returned instead.
    """
    try:
        identity = get_jwt_identity()
        greeting = "You are logged in as {}".format(identity)
        return jsonify(message=greeting), HTTPStatus.OK
    except Exception as exc:
        print(f"Error: {exc}")
        print(traceback.format_exc())
        failure = jsonify(result={"error": "Server error"})
        return failure, HTTPStatus.INTERNAL_SERVER_ERROR
# Start the server only when this file is executed directly.
# NOTE(review): debug=True turns on Flask's debug mode; confirm this entry
# point is never used to serve a deployed application.
if __name__ == '__main__':
    app.run(host=host, port=port, debug=True)
You can use a library such as python-jose for the JWT functionality, but you have to implement
jwt_required and create_access_token yourself. For example:
create_access_token
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying *data* plus an ``exp`` expiry claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object
            is left unmodified.
        expires_delta: Lifetime of the token. Defaults to 15 minutes when
            omitted.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # SECRET_KEY and ALGORITHM are not defined in this snippet; the
    # surrounding module has to provide them.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is naive and deprecated.
    to_encode["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
jwt_required (instead of writing a decorator, you write a function and declare it as a dependency of your route):
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


async def jwt_required(token: str = Depends(oauth2_scheme)):
    """Validate the bearer token and return the user it identifies.

    Decodes *token* with ``SECRET_KEY``/``ALGORITHM``, reads the username
    from the ``sub`` claim and looks the user up with ``get_user``.
    ``SECRET_KEY``, ``ALGORITHM`` and ``get_user`` are not defined in this
    snippet; the surrounding module has to provide them.

    Returns:
        The object returned by ``get_user``. Routes that list this function
        only under ``dependencies=[...]`` ignore the value; routes that take
        it as a parameter via ``Depends`` receive the user.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user that ``get_user`` does not find.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Only the decode call can raise JWTError, so keep the try minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        raise credentials_exception from err

    username = payload.get("sub")
    if username is None:
        raise credentials_exception

    user = get_user(username=username)
    if user is None:
        raise credentials_exception
    return user
# Each entry of `dependencies` must be wrapped in Depends(...); a bare
# callable is not accepted by FastAPI.
@app.get('/secured_page', dependencies=[Depends(jwt_required)])
def __create_participant():
    ...
If you love us, you can donate via PayPal or buy us a coffee so we can maintain and grow. Thank you!
Donate Us With