Implementing JWT Authentication and Permission Control

In previous chapters, our API was completely "exposed." Anyone who knew your API URL could freely read or delete data. In a commercial system, we must be able to do two things:

  1. Know who is calling the API.
  2. Confirm that the caller has permission to perform the operation.

In modern web development (especially in frontend-backend separated architectures), the most mainstream solution is JWT (JSON Web Token).

1. What is JWT? Why Not Use Cookie/Session?

In a traditional setup, each user's login state (the Session) is stored in server memory. Memory use grows with the number of logged-in users, and once you run multiple servers, every one of them needs access to the same Session data, which makes scaling harder.

JWT is "stateless." When a user successfully logs in with their credentials, the server issues a digitally signed "pass (Token)." This pass looks like a long string of gibberish (e.g., eyJhbG...), but it is Base64url-encoded JSON that records "who this person is" and "when it expires." The Token is signed, not encrypted, so anyone holding it can read its contents; never put secrets in it. Thereafter, the frontend includes this Token in an HTTP header with every API request. The server only needs to verify that the signature is correct and the Token has not expired to decide whether to grant access.
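
To make the "signed pass" idea concrete, here is a minimal sketch of HS256 signing and verification using only the standard library. The function names (`sign_token`, `signature_is_valid`) are made up for this illustration; in the rest of the chapter python-jose does this work, and this sketch skips expiry checks entirely.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, the way JWT segments are encoded."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_token(payload: dict, secret: str) -> str:
    """Build header.payload.signature with HMAC-SHA256 (HS256)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def signature_is_valid(token: str, secret: str) -> bool:
    """Recompute the signature over header.payload and compare in constant time."""
    try:
        signing_input, signature = token.rsplit(".", 1)
    except ValueError:
        return False  # no dot at all, so this is not a JWT
    expected = hmac.new(
        secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()
    return hmac.compare_digest(
        b64url(expected).encode("utf-8"), signature.encode("utf-8")
    )


token = sign_token({"sub": "alice@example.com"}, "demo-secret")
print(token)
print(signature_is_valid(token, "demo-secret"))
```

The server keeps no per-user state here: anyone who holds the secret can check the token, and changing a single character of the payload invalidates the signature.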

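2. Password Hashing

Before issuing the pass, we must first handle the registration process. Never, ever store plaintext passwords in the database! If the database is leaked, all users' passwords are exposed. Instead, we store a hash produced by a password-hashing algorithm such as bcrypt. Hashing is not encryption: it is one-way, so the original password cannot be recovered from the stored value, only checked against it.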

First, install the packages:

pip install "passlib[bcrypt]" "python-jose[cryptography]"

Create a security.py in your project:

from passlib.context import CryptContext

# Specify the use of bcrypt algorithm
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Hash a plaintext password (bcrypt generates and embeds a random salt)
def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

# Verify that the user-input password matches the hashed value in the database
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

Now, when a user registers, store the result of get_password_hash(user.password) in the database instead of the plaintext password.
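
passlib hides the salt and the work factor inside the hash string, which makes it easy to miss what is going on. The sketch below shows the same idea with PBKDF2 from the standard library. This is a swapped-in technique for illustration only, not bcrypt and not a replacement for the passlib code above; the names and the iteration count are assumptions.

```python
import hashlib
import hmac
import os
from typing import Optional, Tuple

ITERATIONS = 100_000  # assumed work factor for this illustration only


def hash_with_salt(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]:
    """Return (salt, digest). A fresh random salt is generated when none is given."""
    if salt is None:
        salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return salt, digest


def check_password(password: str, salt: bytes, expected: bytes) -> bool:
    """Re-hash the candidate with the stored salt and compare in constant time."""
    _, digest = hash_with_salt(password, salt)
    return hmac.compare_digest(digest, expected)


salt_a, digest_a = hash_with_salt("hunter2")
salt_b, digest_b = hash_with_salt("hunter2")
print(digest_a != digest_b)                           # same password, different salts
print(check_password("hunter2", salt_a, digest_a))    # correct password
print(check_password("hunter3", salt_a, digest_a))    # wrong password
```

Because each user gets a different salt, two users with the same password end up with different stored values, and a leaked table cannot be attacked with one precomputed lookup.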

3. Implementing Login Route and Issuing JWT

Next, we'll implement the login functionality. The user sends their Email and password, and upon verification, we issue a JWT to them.

from datetime import datetime, timedelta, timezone
from jose import jwt

SECRET_KEY = "super_secret_long_string_never_leak_this" # Store this in .env variables
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token expires after 30 minutes

def create_access_token(data: dict) -> str:
    to_encode = data.copy()
    # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})

    # Sign the claims and serialize them as a JWT
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
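
The comment on SECRET_KEY says to keep it out of the source code. One way to do that, sketched below, is to read it from an environment variable at startup and refuse to run if it is missing or too short. The variable name `JWT_SECRET_KEY`, the helper name, and the 32-character minimum are assumptions made for this example; how the variable gets set (a .env loader, the shell, or the hosting platform) is up to you.

```python
import os
from typing import Mapping

MIN_SECRET_LENGTH = 32  # assumed minimum for this sketch


def load_secret_key(env: Mapping[str, str] = os.environ, name: str = "JWT_SECRET_KEY") -> str:
    """Read the signing key from the environment and fail fast if it is unusable."""
    secret = env.get(name, "")
    if len(secret) < MIN_SECRET_LENGTH:
        raise RuntimeError(
            f"{name} must be set to at least {MIN_SECRET_LENGTH} characters"
        )
    return secret


# Passing a dict instead of os.environ keeps the example self-contained
print(load_secret_key({"JWT_SECRET_KEY": "x" * 40}) == "x" * 40)
```

A suitable random value can be generated once with `python -c "import secrets; print(secrets.token_urlsafe(32))"`.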

Implement the login API:

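from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

# Assumption: app, models, and get_db come from the earlier chapters;
# verify_password and create_access_token are the helpers defined above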

@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    # 1. Look up the user in the database (assuming form_data.username is email)
    user = db.query(models.User).filter(models.User.email == form_data.username).first()
    
    # 2. If user not found or password verification fails
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
        
    # 3. On successful verification, generate Token (embed the user's email)
    access_token = create_access_token(data={"sub": user.email})
    
    return {"access_token": access_token, "token_type": "bearer"}

[!TIP] Why use OAuth2PasswordRequestForm? It is a dependency class that ships with FastAPI and parses the form fields (username, password) defined by the OAuth2 password flow. Together with the OAuth2PasswordBearer scheme declared in the next section, it lets the green Authorize button at the top right of your Swagger docs log in against this endpoint, so you can test login directly on the docs page.
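
From the client's side, the flow has two steps: post the credentials as form data to /token, then send the returned Token in the Authorization header. The sketch below only builds the two requests with the standard library so the wire format is visible. `BASE_URL` and the helper names are assumptions for this example; passing the request objects to `urllib.request.urlopen` would actually send them.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # assumption: local development server


def build_login_request(email: str, password: str) -> Request:
    """OAuth2PasswordRequestForm expects form fields, not JSON."""
    body = urlencode({"username": email, "password": password}).encode("ascii")
    return Request(
        f"{BASE_URL}/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_authorized_request(path: str, access_token: str) -> Request:
    """Attach the token as a Bearer credential on a normal GET request."""
    return Request(
        f"{BASE_URL}{path}",
        headers={"Authorization": f"Bearer {access_token}"},
    )


login = build_login_request("alice@example.com", "s3cret")
print(login.get_method(), login.full_url, login.data)

me = build_authorized_request("/users/me", "eyJhbG...")
print(me.get_header("Authorization"))
```

Note that the email goes in the field named username, because that is the field name the OAuth2 password flow defines.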

4. Creating Route Guards

The Token is issued, but how do we protect other APIs to ensure only those with valid Tokens can call them? We again use FastAPI's most powerful feature: Dependency Injection.

Write a dependency function to parse the Token:

from fastapi.security import OAuth2PasswordBearer
from jose import JWTError

# Tell FastAPI where to get the Token; this also helps generate the lock icon in Swagger
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    
    try:
        # Verify the signature and expiry, then decode the payload
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email = payload.get("sub")
        if email is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
        
    # Fetch the user from the database
    user = db.query(models.User).filter(models.User.email == email).first()
    if user is None:
        raise credentials_exception
    return user
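
Before get_current_user runs, oauth2_scheme has to pull the Token out of the Authorization header. The function below is a simplified sketch of that step, written for illustration; it is not FastAPI's own code, and the name `extract_bearer_token` is made up.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value.

    Returns None when the header is missing, uses another scheme, or has no token.
    """
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer abc.def.ghi"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
print(extract_bearer_token(None))
```

When no usable token is found, the real dependency answers with 401 Unauthorized, which is why a route that depends on get_current_user never sees an anonymous request.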

5. Applying the Protection Layer

Now, protecting an API is as easy as magic! Just include get_current_user as a dependency in the parameters:

@app.get("/users/me")
async def read_users_me(current_user: models.User = Depends(get_current_user)):
    # If the code reaches here, the Token is valid, and we already have the user object!
    return {"email": current_user.email, "is_active": current_user.is_active}

Any request without a valid Token is rejected by the get_current_user dependency before the route body runs, and the client receives a 401 Unauthorized response.

At this point, your microservice has full database storage and security capabilities. The final step is to deploy it to the cloud, making your API accessible to the world!

JWT Token Structure

A JWT consists of three Base64url-encoded parts separated by dots:

header.payload.signature

Header

{
  "alg": "HS256",
  "typ": "JWT"
}

Payload (Claims)

{
  "sub": "1234567890",
  "username": "alice",
  "role": "admin",
  "iat": 1516239022,
  "exp": 1516242622
}
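
Because the header and payload are only encoded, any holder of the token can read them. The sketch below (helper names are made up for this example) round-trips the payload shown above through Base64url and reads the token lifetime from its iat and exp claims.

```python
import base64
import json


def encode_segment(obj: dict) -> str:
    """Serialize to compact JSON and Base64url-encode without padding."""
    raw = json.dumps(obj, separators=(",", ":")).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def decode_segment(segment: str) -> dict:
    """Restore the stripped '=' padding, then decode the JSON inside."""
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def token_lifetime_seconds(payload: dict) -> int:
    """Seconds between the issued-at and expiry timestamps."""
    return payload["exp"] - payload["iat"]


payload = {
    "sub": "1234567890",
    "username": "alice",
    "role": "admin",
    "iat": 1516239022,
    "exp": 1516242622,
}
segment = encode_segment(payload)
print(segment)
print(decode_segment(segment) == payload)
print(token_lifetime_seconds(payload))  # 3600, i.e. one hour
```

No key is involved in any of these steps, which is why the payload must never carry passwords or other secrets. Only the signature segment depends on SECRET_KEY.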

Standard Claims

| Claim | Full Name | Purpose |
|-------|-----------|---------|
| sub | Subject | User identifier |
| iat | Issued At | Token creation timestamp |
| exp | Expiration | Token expiry timestamp |
| iss | Issuer | Who issued the token |
| aud | Audience | Intended recipient |
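
A signature check alone does not say whether a token is still current or meant for this API; the claims have to be checked as well. The following is a hand-written sketch of those checks on an already-decoded payload. The names `ClaimError` and `validate_claims` and the example issuer and audience values are assumptions. python-jose's jwt.decode already checks exp, and it accepts `audience` and `issuer` arguments, so in practice the library can do this for you.

```python
import time
from typing import Optional


class ClaimError(ValueError):
    """Raised when a decoded payload fails a claim check."""


def validate_claims(
    payload: dict,
    *,
    issuer: str,
    audience: str,
    now: Optional[float] = None,
    leeway: float = 0,
) -> None:
    """Check exp, iat, iss, and aud on a payload whose signature was already verified."""
    now = time.time() if now is None else now

    exp = payload.get("exp")
    if exp is None or now > exp + leeway:
        raise ClaimError("token is expired or has no exp claim")

    iat = payload.get("iat")
    if iat is not None and iat > now + leeway:
        raise ClaimError("token was issued in the future")

    if payload.get("iss") != issuer:
        raise ClaimError("unexpected issuer")

    # aud may be a single string or a list of strings
    aud = payload.get("aud")
    audiences = [aud] if isinstance(aud, str) else list(aud or [])
    if audience not in audiences:
        raise ClaimError("token is not intended for this audience")


claims = {
    "sub": "1234567890",
    "iat": 1516239022,
    "exp": 1516242622,
    "iss": "auth.example.com",   # hypothetical issuer
    "aud": "orders-api",         # hypothetical audience
}
validate_claims(claims, issuer="auth.example.com", audience="orders-api", now=1516240000)
print("claims accepted")
```

The `now` parameter exists so the check can be tested with a fixed clock; in a request handler it would be left at its default.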

Password Hashing

Never store passwords in plain text. Use passlib with bcrypt.

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

Complete Auth Flow

Login Endpoint

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from datetime import datetime, timedelta

SECRET_KEY = "your-secret-key"  # Placeholder: load a long random value from the environment
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    # The database session must be injected; without it, db is undefined here
    user = db.query(User).filter(User.username == form_data.username).first()
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": str(user.id), "username": user.username},
        expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

Token Creation

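from datetime import datetime, timedelta, timezone
from typing import Optional

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc)
    if expires_delta:
        expire = now + expires_delta
    else:
        expire = now + timedelta(minutes=15)

    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt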

Dependency: Get Current User

def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"}
    )
    
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        sub = payload.get("sub")
        if sub is None:
            raise credentials_exception
        # "sub" was stored as str(user.id) at login; a non-numeric value means a bad token
        user_id = int(sub)
    except (JWTError, ValueError):
        raise credentials_exception

    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise credentials_exception
    return user

Protected Route

@app.get("/users/me", response_model=UserResponse)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

@app.get("/admin/dashboard")
async def admin_dashboard(current_user: User = Depends(get_current_user)):
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")
    return {"message": "Welcome to the admin dashboard"}
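
Repeating the role check inside every admin route gets tedious. A common refactoring is a small factory that returns a checker for a given set of roles. The sketch below shows the logic without FastAPI so it runs on its own: `User`, `Forbidden`, and `require_role` are hypothetical names, and `Forbidden` stands in for HTTPException(status_code=403). In the real application, the checker's parameter would be declared as `current_user: User = Depends(get_current_user)` and a route would use `Depends(require_role("admin"))`.

```python
from dataclasses import dataclass
from typing import Callable


class Forbidden(Exception):
    """Stand-in for HTTPException(status_code=403) in this sketch."""


@dataclass
class User:
    """Hypothetical minimal user; the real model comes from the database layer."""
    email: str
    role: str


def require_role(*allowed: str) -> Callable[[User], User]:
    """Return a checker that passes the user through only if their role is allowed."""

    def checker(current_user: User) -> User:
        if current_user.role not in allowed:
            raise Forbidden(f"requires one of: {', '.join(allowed)}")
        return current_user

    return checker


admin_only = require_role("admin")
print(admin_only(User(email="root@example.com", role="admin")).email)

try:
    admin_only(User(email="bob@example.com", role="viewer"))
except Forbidden as exc:
    print("rejected:", exc)
```

Authentication failures stay 401 (who are you?), while a known user with the wrong role gets 403 (you may not do this).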

Token Best Practices

| Practice | Why |
|----------|-----|
| Short expiry (15-30 min) | Limits damage if token is stolen |
| Use refresh tokens | Allows seamless re-authentication |
| Store in httpOnly cookies | Prevents XSS token theft |
| Rotate SECRET_KEY periodically | Reduces risk of compromised keys |
| Validate aud claim | Ensures token is for your API |
| Rate-limit login endpoint | Prevents brute force attacks |
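
Short-lived access tokens are usually paired with a longer-lived refresh token that the client exchanges for a new access token. The sketch below shows one possible design: opaque random refresh tokens, stored only as hashes, and rotated on every use. All names, the in-memory dictionary, and the seven-day lifetime are assumptions for illustration; a real service would keep the records in its database.

```python
import hashlib
import secrets
import time
from typing import Dict, Optional, Tuple

REFRESH_TTL_SECONDS = 7 * 24 * 3600  # assumed lifetime: 7 days

# Maps SHA-256(token) -> (user_id, expires_at). In-memory stand-in for a DB table.
_refresh_store: Dict[str, Tuple[str, float]] = {}


def _digest(token: str) -> str:
    """Store only a hash so a leaked table does not expose usable tokens."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


def issue_refresh_token(user_id: str, now: Optional[float] = None) -> str:
    """Create a random opaque token and remember who it belongs to."""
    now = time.time() if now is None else now
    token = secrets.token_urlsafe(32)
    _refresh_store[_digest(token)] = (user_id, now + REFRESH_TTL_SECONDS)
    return token


def rotate_refresh_token(token: str, now: Optional[float] = None) -> Optional[Tuple[str, str]]:
    """Consume a refresh token; return (user_id, new_refresh_token) or None if invalid."""
    now = time.time() if now is None else now
    record = _refresh_store.pop(_digest(token), None)  # single use: removed on lookup
    if record is None:
        return None
    user_id, expires_at = record
    if now >= expires_at:
        return None
    return user_id, issue_refresh_token(user_id, now)


first = issue_refresh_token("42")
result = rotate_refresh_token(first)
print(result is not None and result[0] == "42")   # valid once
print(rotate_refresh_token(first))                # None: already used
```

After a successful rotation, the endpoint would call create_access_token for the returned user ID and hand both tokens back to the client.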

Summary

JWT authentication provides stateless, secure API access control. Combined with password hashing and OAuth2, it forms the security backbone of modern microservices.

Key takeaways:

  • JWT = header.payload.signature, signed (not encrypted) with a secret key
  • Always hash passwords with bcrypt, never store plain text
  • OAuth2PasswordBearer extracts Bearer tokens from requests
  • get_current_user dependency validates tokens and returns the user
  • Standard claims: sub (user ID), exp (expiry), iat (issued at)
  • Use short token expiry (15-30 min) + refresh tokens
  • Protect routes by adding current_user: User = Depends(get_current_user)
  • Role-based access: check current_user.role for admin routes

What's Next: Docker and Deployment

The next chapter covers Dockerizing your API and deploying to Render.
