實作 JWT 身份驗證與權限控制
在先前的章節,我們的 API 是完全「裸奔」的。任何人只要知道你的 API 網址,就可以隨便讀取或刪除資料。 如果這是一個商業系統,我們必須確保:
- 知道呼叫 API 的人是誰。
- 確定他有權限執行這個操作。
在現代 Web 開發中(尤其前後端分離的架構),最主流的解決方案就是 JWT (JSON Web Token)。
1. 什麼是 JWT?為什麼不用 Cookie/Session?
過去傳統的伺服器會把使用者的登入狀態(Session)存在伺服器記憶體裡。當使用者變多,伺服器就會爆炸;而且如果你有多台伺服器,Session 很難共享。
JWT 是「無狀態 (Stateless)」的。
當使用者用帳號密碼登入成功後,伺服器會發放一張帶有數位簽章的「通行證 (Token)」。
這張通行證是一長串亂碼(例如 eyJhbG...),裡面記錄了「這個人是誰」以及「什麼時候過期」。
此後,前端每次發送 API 請求時,都會在 HTTP Header 裡帶上這張 Token。伺服器只要驗證數位簽章是否正確,就知道能不能放行。
2. 密碼加密 (Hashing)
在發放通行證之前,我們必須先處理註冊流程。
絕對、絕對不可以把明文密碼存進資料庫! 如果資料庫外洩,所有使用者的密碼就曝光了。我們必須使用雜湊 (Hash) 演算法(如 bcrypt)來加密。
先安裝套件:
pip install passlib[bcrypt] python-jose[cryptography]
在專案中建立一個 security.py:
from passlib.context import CryptContext
# 指定使用 bcrypt 演算法
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 將明文密碼加密
def get_password_hash(password: str):
return pwd_context.hash(password)
# 驗證使用者輸入的密碼是否與資料庫中的 Hash 值吻合
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
現在,當使用者註冊時,把 get_password_hash(user.password) 寫入資料庫即可。
3. 實作登入路由與發放 JWT
接下來,我們要實作登入功能。使用者傳送 Email 和密碼,我們核對正確後,簽發一張 JWT 給他。
from datetime import datetime, timedelta
from jose import jwt
SECRET_KEY = "超級無敵無敵機密的長字串千萬不要外流" # 請存放在 .env 變數中
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 30 分鐘後過期
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
# 簽發 JWT
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
實作登入 API:
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
# 1. 到資料庫尋找使用者 (這裡假設 form_data.username 是 email)
user = db.query(models.User).filter(models.User.email == form_data.username).first()
# 2. 如果找不到人,或密碼驗證失敗
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="帳號或密碼錯誤",
headers={"WWW-Authenticate": "Bearer"},
)
# 3. 驗證成功,產生 Token (把使用者的 email 塞進去)
access_token = create_access_token(data={"sub": user.email})
return {"access_token": access_token, "token_type": "bearer"}
[!TIP] 為什麼要用
OAuth2PasswordRequestForm? 這是 FastAPI 內建的依賴工具。當你使用它時,FastAPI 知道這是一個登入入口,它會自動把你的 Swagger 文件右上角加上一個綠色的Authorize按鈕!你可以直接在文件畫面上進行登入測試,超級方便。
4. 建立路由守衛 (Route Guard)
Token 發放出去了,我們要怎麼保護其他的 API,確保只有拿著有效 Token 的人才能呼叫呢? 我們同樣使用 FastAPI 最強大的功能:Dependency Injection (依賴注入)。
寫一個依賴函式來解析 Token:
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
# 告訴 FastAPI 從哪裡拿 Token,這也會幫忙生出 Swagger 的鎖頭圖示
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="無法驗證憑證",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 解開 Token 的簽章
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
except JWTError:
raise credentials_exception
# 去資料庫把這個使用者撈出來
user = db.query(models.User).filter(models.User.email == email).first()
if user is None:
raise credentials_exception
return user
5. 套用保護層
現在,保護一個 API 就像變魔術一樣簡單!
只要把 get_current_user 放在參數裡當作依賴:
@app.get("/users/me")
async def read_users_me(current_user: models.User = Depends(get_current_user)):
# 只要程式能走到這裡,代表 Token 一定是有效的,而且我們已經拿到使用者物件了!
return {"email": current_user.email, "is_active": current_user.is_active}
任何沒有夾帶有效 Token 的請求,在進到這段程式碼之前,就會直接被 FastAPI 擋下來並回傳 401 Unauthorized。
到目前為止,你的微服務已經具備了完整的資料庫儲存與資安防護能力。 最後一步,我們要把它推上雲端,讓全世界的人都能呼叫你的 API!