JWT and OAuth2 Authentication with FastAPI


Introduction

Authentication and authorization are essential concepts in API development. While building any API, it is vital to authenticate and authorize users before they access any resources. People often confuse these two terms. The difference between them can be understood by asking two simple questions.

  1. Who are you?

  2. What can you do?

Authentication answers the first question, and authorization answers the second. Authentication verifies that a user is who they claim to be, while authorization determines which resources that user may access. There are various ways to implement authentication and authorization, such as:

  1. API Keys

  2. Bearer Authentication

  3. Basic Authentication

  4. OAuth2
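The two questions above can be sketched as two separate checks. This is only an illustration: the `USERS` and `PERMISSIONS` dictionaries and both function names are made up for this example, and the plain-text password comparison is there only to keep the sketch short.

```python
# Hypothetical in-memory data, for illustration only.
USERS = {"joe@xyz.com": "s3cret"}  # who exists, and their credentials
PERMISSIONS = {"joe@xyz.com": {"read_posts", "write_posts"}}  # what each user may do


def authenticate(email: str, password: str) -> bool:
    """Authentication: is the caller really who they claim to be?"""
    # Plain-text comparison only to keep the sketch short; never do this in a real app.
    return USERS.get(email) == password


def authorize(email: str, action: str) -> bool:
    """Authorization: may this (already identified) user perform the action?"""
    return action in PERMISSIONS.get(email, set())


if authenticate("joe@xyz.com", "s3cret") and authorize("joe@xyz.com", "write_posts"):
    print("joe may write posts")
```

A request normally passes the first check once (at login) and the second check on every protected action.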

So, in this article, we will learn how to implement OAuth2 along with JWT in FastAPI. But before that, let's understand what OAuth2 and JWT are.

What are OAuth2 and JWT?

OAuth2 (Open Authorization)

OAuth2 is an open standard for authorization. In simple terms, OAuth provides client applications with delegated access to resources. It can be used in web apps, APIs, mobile apps, etc. OAuth2 is the successor to OAuth 1.0a and is not backward compatible with it, so OAuth2 is the version that is widely adopted today.

OAuth2 uses access tokens to authorize clients to access server resources on behalf of users. An access token is a piece of data that grants specific, limited access to data on a resource server.
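In practice, a client usually presents its access token in the Authorization header using the Bearer scheme. Here is a minimal sketch with the standard library; the URL and the token value are placeholders, and no request is actually sent.

```python
from urllib.request import Request


def build_authorized_request(url: str, access_token: str) -> Request:
    """Attach an access token as a bearer token (the request is only built, not sent)."""
    return Request(url, headers={"Authorization": f"Bearer {access_token}"})


# Placeholder URL and token, for illustration only.
request = build_authorized_request("http://localhost:8000/posts/me", "example-token")
print(request.get_header("Authorization"))  # Bearer example-token
```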

JWT (JSON Web Token)

JWT, on the other hand, is an open standard (RFC 7519) for securely transmitting information between parties as a JSON object. The token is signed, which means the data inside a JWT can be verified and trusted. Various algorithms can be used to sign JWTs, such as HMAC with SHA-256 (HS256) or RSA with SHA-256 (RS256).

Signed tokens don't ensure data privacy. The header and payload are only encoded, not encrypted, so anyone who intercepts a JWT can read its contents. However, if someone tampers with the data in transit, the signature no longer matches and the server rejects the token.

A typical JWT has three parts: header, payload and signature. In a standard JWT structure, these three parts are separated by periods.

xxxxx.yyyyy.zzzzz

The header of a JWT holds the type of token and algorithm used for signing.

For example { "alg": "HS256", "typ": "JWT" }

This data is Base64Url encoded to form the first part of JWT.

The payload contains the claims, that is, statements about the user plus any additional data, such as the subject (sub), the expiration time (exp), the issuer (iss), etc.

{ "sub": "1234567890", "name": "John Doe", "admin": true }

This is also base64Url encoded to form the second part.
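To make the encoding step concrete, here is a small sketch using only the standard library. The helper names `b64url_encode` and `b64url_decode` are made up for this example. It also shows why a signed token is not private: the payload can be decoded without any key.

```python
import base64
import json


def b64url_encode(data: bytes) -> str:
    """Base64Url-encode and strip the '=' padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding, then decode."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


header = {"alg": "HS256", "typ": "JWT"}
payload = {"sub": "1234567890", "name": "John Doe", "admin": True}

encoded_header = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
encoded_payload = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())

# Anyone holding the token can read the claims back; no secret is involved.
print(json.loads(b64url_decode(encoded_payload)))
```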

The final part is the signature. To create it, you take the encoded header, the encoded payload and a secret key, and sign them with the algorithm specified in the header.

HMACSHA256( base64UrlEncode(header) + "." + base64UrlEncode(payload), secret)

This signature is the central idea of JWT-based authentication. It proves that the token was issued by someone who holds the secret key and that its content has not been changed since.
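The signing formula above can be tried out with Python's standard library. The following is a sketch of HS256 signing and verification for illustration, not a replacement for a JWT library; the function names and the secret are made up for this example.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64Url-encode without padding."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(header: dict, payload: dict, secret: str) -> str:
    """Return header.payload.signature, signed with HMAC-SHA256."""
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: str) -> bool:
    """Recompute the signature over header.payload and compare in constant time."""
    try:
        signing_input, signature = token.rsplit(".", 1)
    except ValueError:
        return False
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(b64url(expected).encode(), signature.encode())


token = sign_hs256({"alg": "HS256", "typ": "JWT"}, {"sub": "1234567890"}, "my-secret")
print(verify_hs256(token, "my-secret"))     # True
print(verify_hs256(token, "wrong-secret"))  # False
```

Changing even one character of the header or payload changes the expected signature, which is why a tampered token is rejected.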

As we saw earlier, OAuth2 uses access tokens, but it does not mandate a particular token format. Hence, we can use JWTs as OAuth2 access tokens, which lets the server verify each token through its signature.

OAuth2 and JWT with FastAPI

Well, enough chit-chat. Let's get down to business. Here is what we will be doing.

We will create a simple API with FastAPI and SQLite to create and store text posts, and add OAuth2 and JWT functionality for authentication and authorisation. So, let's get started.

As with any Python project, first, create a virtual environment with any of your favourite tools.

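python -m venv JWT_Auth

Activate the virtual environment.

source JWT_Auth/bin/activate

Install the libraries to get started with the project. Save the following pinned dependencies in a requirements.txt file and install them with pip install -r requirements.txt.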

anyio==3.6.2
bcrypt==4.0.1
cffi==1.15.1
click==8.1.3
cryptography==39.0.0
dnspython==2.2.1
ecdsa==0.18.0
email-validator==1.3.0
fastapi==0.88.0
greenlet==2.0.1
h11==0.14.0
idna==3.4
passlib==1.7.4
pyasn1==0.4.8
pycparser==2.21
pydantic==1.10.4
python-decouple==3.6
python-jose==3.3.0
python-multipart==0.0.5
rsa==4.9
six==1.16.0
sniffio==1.3.0
SQLAlchemy==1.4.46
starlette==0.22.0
typing_extensions==4.4.0
uvicorn==0.20.0

Create a main.py in your environment. This will be the driver code. Create a simple API endpoint and set up our uvicorn server.

from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.get('/')
def welcome():
    return "Welcome"

if __name__ == "__main__":
    # start the uvicorn server on port 8000
    uvicorn.run("main:app", host="0.0.0.0", port=8000, log_level="info")

Execute the file by running the Python script.

python main.py

Then open http://localhost:8000 in your browser.

One thing that sets FastAPI apart from other frameworks is its integrated Swagger UI.

To open Swagger UI, go to the localhost:8000/docs endpoint.

Now, in a different file (datamodels.py here), define the pydantic models. For this project, we will have two different schemas: a user schema and a post schema.

from pydantic import BaseModel, Field, EmailStr

# request body for creating a post
class post_schema(BaseModel):
    title: str = Field(...)
    body: str = Field(...)

# request body for signing up a user
class user_schema(BaseModel):
    fullname: str = Field(...)
    email: EmailStr = Field(...)
    password: str = Field(...)

    class Config:
        schema_extra = {
            "example": {
                "fullname": "Joe Doe",
                "email": "joe@xyz.com",
                "password": "any"
            }
        }

The next step is to create a database schema to store the data. To achieve this, we will use the SQLAlchemy ORM (Object Relational Mapper) to interact with our SQLite database. Put this code in a separate file (database.py here).

from sqlalchemy import create_engine, Column, Integer, String, TEXT
from sqlalchemy.orm import declarative_base, sessionmaker

engine = create_engine("sqlite:///user_database.db")

Base = declarative_base()

SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)

class user_details(Base):
    __tablename__ = 'user_details_table'
    email = Column(String(256), primary_key=True)
    name = Column(String(256))
    password_hashed = Column(String(60))  # a bcrypt hash is 60 characters long

class user_blogs(Base):
    __tablename__ = 'User_blogs'
    id = Column(Integer, primary_key=True)
    user_email = Column(String(256))  # email of the user who wrote the post
    title = Column(TEXT)
    body = Column(TEXT)

Now in our main.py file, we will define the API path for users to sign up. We want the users to enter their name, email and password in the sign-up form, so we will use the user_schema model that we defined earlier.

Passwords should never be stored as plain text, so we will hash the password before storing it in the database. For that, we will use passlib's CryptContext. Then we will add and commit the data to our database and return a message.

from fastapi import FastAPI, Depends, status
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from datamodels import user_schema, post_schema
from database import engine, Base, SessionLocal, user_details

Base.metadata.create_all(engine)  # creates the tables if they do not exist yet
crypto_context = CryptContext(schemes=["bcrypt"])

app = FastAPI()

def get_session():
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

@app.get('/')
def welcome():
    return "Welcome"

@app.post("/posts/sign_up", tags=['User'], status_code=status.HTTP_201_CREATED)
async def user_signUp(user: user_schema, session : Session = Depends(get_session)):
    user.password = crypto_context.hash(user.password)
    user_val = user_details(email=user.email, name = user.fullname, password_hashed = user.password)

    session.add(user_val)
    session.commit()

    return f"Mr/Mrs {user.fullname} welcome!"
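To get a feel for what hashing and verifying a password involves, here is a standard-library sketch. It uses PBKDF2 from hashlib instead of bcrypt, so it is not what passlib does internally; the function names, the storage format and the iteration count are assumptions made for this example.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100_000  # assumed work factor for this sketch


def hash_password(password: str) -> str:
    """Hash the password with a fresh random salt; returns 'salt$digest' in hex."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Re-hash the candidate password with the stored salt and compare."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("any")
print(verify_password("any", stored))    # True
print(verify_password("wrong", stored))  # False
```

Because of the random salt, hashing the same password twice gives two different stored values, yet both verify correctly. The database never needs to hold the original password.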

Next, we will create an endpoint for JWT generation. For this, we will use FastAPI's OAuth2PasswordRequestForm as a dependency. It gives us a form object with attributes such as username, password, scopes, client_id and client_secret. Here, we only need the username and password.

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.orm import Session

from datamodels import user_schema, post_schema
from database import engine, Base, SessionLocal, user_details
from auth_handler import encode_jwt, decode_jwt

Base.metadata.create_all(engine)  # creates the tables if they do not exist yet
crypto_context = CryptContext(schemes=["bcrypt"])

app = FastAPI()

def get_session():
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

# fetches user details from the database as a user_schema model, or None if the user is unknown
def get_user(userid: str):
    with Session(bind=engine, expire_on_commit=False) as session:
        stmt = select(user_details).where(user_details.email == userid)
        obj = session.execute(stmt).scalars().first()
    if obj is None:
        return None
    return user_schema(fullname=obj.name, email=obj.email, password=obj.password_hashed)

def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not crypto_context.verify(password, user.password):
        return False
    return user

@app.get('/')
def welcome():
    return "Welcome"

@app.post("/posts/sign_up", tags=['User'], status_code=status.HTTP_201_CREATED)
async def user_signUp(user: user_schema, session : Session = Depends(get_session)):
    user.password = crypto_context.hash(user.password)
    user_val = user_details(email=user.email, name = user.fullname, password_hashed = user.password)

    session.add(user_val)
    session.commit()

    return f"Mr/Mrs {user.fullname} welcome!"

@app.post("/posts/token", tags=['User'])
async def user_login(user_details: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(user_details.username, user_details.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    else:
        access_token = encode_jwt(user.email)
        return {"access_token": access_token, "token_type": "bearer"}
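Note that OAuth2PasswordRequestForm reads form data, not JSON. As a rough sketch of what a client has to send to the token route, the body is form-encoded with a username and a password field; in this project the username field carries the user's email. The helper name below is made up for this example.

```python
from urllib.parse import parse_qs, urlencode


def build_token_request_body(username: str, password: str) -> str:
    """Form-encode the credentials (Content-Type: application/x-www-form-urlencoded)."""
    return urlencode({"username": username, "password": password})


body = build_token_request_body("joe@xyz.com", "any")
print(body)            # username=joe%40xyz.com&password=any
print(parse_qs(body))  # the server side sees the decoded fields again
```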

Now, create a different file (auth_handler.py here) to create and decode JWTs. But before that, create a secret key to sign the JWT. There are several ways to create a secret key. Here, we will use Python's secrets module.

import secrets
print(secrets.token_hex(16))  # 16 random bytes, printed as 32 hex characters

Now, create a .env file and save the values there.

secret = "367e3ee5b04132137894aeafcb3979f9"
algorithm = 'HS256'

We will now create two functions for encoding and decoding JWT.

from jose import jwt, JWTError
from decouple import config
import time
from fastapi import HTTPException

secret_key = config('secret')    # signing key read from .env
algorithm = config('algorithm')  # signing algorithm read from .env

def encode_jwt(email: str):
    # JSON payload with the user's email and an expiration time 180 seconds from now
    payload = {'user': email, 'expires': time.time() + 180}
    return jwt.encode(payload, key=secret_key, algorithm=algorithm)

def decode_jwt(token: str):
    try:
        decoded_token = jwt.decode(token, key=secret_key, algorithms=[algorithm])
    except JWTError:
        # malformed token or invalid signature
        raise HTTPException(status_code=401, detail="Invalid token")
    # return the claims only while the token is still valid, otherwise None
    if decoded_token.get("expires", 0) >= time.time():
        return decoded_token
    return None

Our user login and signup routes are now complete.

Create Protected Route

Now, we will create a protected route that only authenticated users can access. This will allow them to publish text posts. We can achieve this by using FastAPI's out-of-the-box support for dependency injection. Read more about it in the FastAPI documentation.

Create a new route for writing posts.

@app.post("/posts", tags=['posts'], status_code=status.HTTP_201_CREATED)
async def write_post(post: post_schema, session: Session = Depends(get_session), user: user_schema = Depends(current_user)):
    # store the post under the email of the authenticated user
    new_post = user_blogs(user_email=user.email, title=post.title, body=post.body)
    session.add(new_post)
    session.commit()

    return "post added"

You can see there's a dependency, current_user. It decodes the token sent with the request and identifies the user behind it, rejecting the request if the token is invalid or has expired.

Now, let's define current_user().

Oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/posts/token')
async def current_user(token: str = Depends(Oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,   
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    decoded_token = decode_jwt(token)
    if not decoded_token:
        raise credentials_exception

    user = get_user(decoded_token['user'])

    return user

Add this function to main.py. current_user() depends on OAuth2PasswordBearer, which takes the URL of the route responsible for generating the JWT (tokenUrl). In Swagger UI, a protected route asks for the user's email and password; they are sent to that token endpoint to fetch a JWT, which the Swagger UI page then keeps in memory.

Each subsequent request to the protected endpoints then carries the token in its Authorization header, where OAuth2PasswordBearer can read it.
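As a rough approximation of that parsing step (this is a sketch, not FastAPI's actual code), extracting the token from the header could look like the following. The function name `extract_bearer_token` is made up for this example.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value, else None."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("Basic dXNlcjpwYXNz"))  # None
```

When no usable token is found, a protected route should answer with 401 Unauthorized, which is what the credentials_exception above expresses.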

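from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.orm import Session

from datamodels import user_schema, post_schema
from database import engine, Base, SessionLocal, user_details
from auth_handler import encode_jwt, decode_jwt

Base.metadata.create_all(engine)  # creates the tables if they do not exist yet
crypto_context = CryptContext(schemes=["bcrypt"])
Oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/token')

app = FastAPI()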

def get_session():
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

# fetches user details from the database as a user_schema model, or None if the user is unknown
def get_user(userid: str):
    with Session(bind=engine, expire_on_commit=False) as session:
        stmt = select(user_details).where(user_details.email == userid)
        obj = session.execute(stmt).scalars().first()
    if obj is None:
        return None
    return user_schema(fullname=obj.name, email=obj.email, password=obj.password_hashed)

def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not crypto_context.verify(password, user.password):
        return False
    return user

async def current_user(token: str = Depends(Oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,   
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    decoded_token = decode_jwt(token)
    if not decoded_token:
        raise credentials_exception

    user = get_user(decoded_token['user'])

    return user

@app.get('/')
def welcome():
    return "Welcome"

@app.post("/posts/sign_up", tags=['User'], status_code=status.HTTP_201_CREATED)
async def user_signUp(user: user_schema, session : Session = Depends(get_session)):
    user.password = crypto_context.hash(user.password)
    user_val = user_details(email=user.email, name = user.fullname, password_hashed = user.password)

    session.add(user_val)
    session.commit()

    return f"Mr/Mrs {user.fullname} welcome!"

@app.post("/token", tags=['User'])
async def user_login(user_details: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(user_details.username, user_details.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    else:
        access_token = encode_jwt(user.email)
        return {"access_token": access_token, "token_type": "bearer"}

We can also add another protected route to view posts from an authenticated user.

@app.get("/posts/me", tags=['posts'])
async def see_posts(user: user_schema = Depends(current_user), session: Session = Depends(get_session)):
    # select only the posts written by the authenticated user
    stmt = select(user_blogs).where(user_blogs.user_email == user.email)
    posts = session.execute(stmt).scalars().all()

    return [{'title': post.title, 'body': post.body} for post in posts]

This will search the database and return the posts of the authenticated user, looked up by their email.
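For readers who prefer to see the lookup as plain SQL, here is a sketch of a roughly equivalent query using the standard-library sqlite3 module and an in-memory database. The sample rows and the helper name `posts_for` are made up for this example; the table and column names follow the user_blogs model.

```python
import sqlite3

# In-memory database with the same columns as the user_blogs model.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE User_blogs (id INTEGER PRIMARY KEY, user_email TEXT, title TEXT, body TEXT)"
)
conn.executemany(
    "INSERT INTO User_blogs (user_email, title, body) VALUES (?, ?, ?)",
    [
        ("joe@xyz.com", "First", "Hello"),
        ("ann@xyz.com", "Other", "Not Joe's post"),
        ("joe@xyz.com", "Second", "Again"),
    ],
)


def posts_for(connection: sqlite3.Connection, email: str) -> list:
    """Return the posts of one user, filtered by email with a parameterized query."""
    rows = connection.execute(
        "SELECT title, body FROM User_blogs WHERE user_email = ? ORDER BY id",
        (email,),
    )
    return [{"title": title, "body": body} for title, body in rows]


print(posts_for(conn, "joe@xyz.com"))
```

The email used in the filter comes from the verified token, not from user input, so one user cannot request another user's posts.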

Now, let's have a look at our main.py file.

from fastapi import FastAPI, status, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from auth_handler import encode_jwt, decode_jwt
from passlib.context import CryptContext
import uvicorn

from datamodels import user_schema, post_schema
from database import engine, Base, user_details, user_blogs, SessionLocal
from sqlalchemy.orm import Session
from sqlalchemy import select

Base.metadata.create_all(engine)  # creates the tables if they do not exist yet

app = FastAPI()

crypto_context = CryptContext(schemes=["bcrypt"])
# tokenUrl must match the path of the login route defined below
Oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/posts/token')

def get_session():
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

# fetches user details from the database as a user_schema model, or None if the user is unknown
def get_user(userid: str):
    with Session(bind=engine, expire_on_commit=False) as session:
        stmt = select(user_details).where(user_details.email == userid)
        obj = session.execute(stmt).scalars().first()
    if obj is None:
        return None
    return user_schema(fullname=obj.name, email=obj.email, password=obj.password_hashed)


def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not crypto_context.verify(password, user.password):
        return False
    return user

async def current_user(token: str = Depends(Oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,   
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    decoded_token = decode_jwt(token)
    if not decoded_token:
        raise credentials_exception    
    user = get_user(decoded_token['user'])    
    return user

@app.get('/')
def welcome():
    return "FastAPI JWT (JSON Web Token) Authentication"


@app.get("/posts/me", tags=['posts'])
async def see_posts(user: user_schema = Depends(current_user), session: Session = Depends(get_session)):
    # select only the posts written by the authenticated user
    stmt = select(user_blogs).where(user_blogs.user_email == user.email)
    posts = session.execute(stmt).scalars().all()

    return [{'title': post.title, 'body': post.body} for post in posts]



@app.post("/posts", tags=['posts'], status_code=status.HTTP_201_CREATED)
async def write_post(post: post_schema, session: Session = Depends(get_session), user: user_schema = Depends(current_user)):
    # store the post under the email of the authenticated user
    new_post = user_blogs(user_email=user.email, title=post.title, body=post.body)
    session.add(new_post)
    session.commit()

    return "post added"

@app.get('/users', tags=['User'])
async def users_in_db(session: Session = Depends(get_session)):
    users = session.query(user_details).all()
    # password hashes are deliberately left out of the response
    return [{'user': user.email, 'name': user.name} for user in users]

from sqlalchemy.exc import IntegrityError

@app.post("/posts/sign_up", tags=['User'], status_code=status.HTTP_201_CREATED)
async def user_signUp(user: user_schema, session: Session = Depends(get_session)):
    user.password = crypto_context.hash(user.password)
    user_val = user_details(email=user.email, name=user.fullname, password_hashed=user.password)
    try:
        session.add(user_val)
        session.commit()
    except IntegrityError:
        # the email is the primary key, so a duplicate sign-up fails here
        session.rollback()
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="user already exists")
    return f"Mr/Mrs {user.fullname} welcome!"


@app.post("/posts/token", tags=['User'])
async def user_login(user_details: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(user_details.username, user_details.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    else:
        access_token = encode_jwt(user.email)
        return {"access_token": access_token, "token_type": "bearer"}

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, log_level="info", reload=True)

Run the script to start the uvicorn server and go to localhost:8000/docs. We will have a nice and clear Swagger UI.

The complete code can be found here on my GitHub.

You can also use Replit.

EndNote

So, this was all about implementing OAuth2 and JWT in FastAPI. Thanks for reading, I hope you liked the article.