JWT認証と権限制御の実装
前の章節では、私たちのAPIは完全に「無防備」でした。APIのURLを知っている誰もが、自由にデータを読み取ったり削除したりできました。 もしこれが商用システムなら、以下のことを保証する必要があります:
- APIを呼び出している人が誰かを知っていること。
- その人が操作を実行する権限を持っていることを確認すること。
現代のWeb開発(特にフロントエンドとバックエンドが分離したアーキテクチャ)では、最も主流な解決策が JWT(JSON Web Token) です。
1. JWTとは?なぜCookie/Sessionを使わないのか?
従来のサーバーでは、ユーザーのログイン状態(Session)をサーバーのメモリに保存していました。ユーザーが増えるとサーバーがクラッシュしやすく、複数のサーバーがある場合、Sessionを共有するのが困難でした。
JWTは「ステートレス(Stateless)」です。
ユーザーがIDとパスワードでログインに成功すると、サーバーはデジタル署名付きの「通行証(Token)」を発行します。
この通行証は長い文字列(例:eyJhbG...)で、「この人が誰か」と「いつ期限切れになるか」が記録されています。
その後、フロントエンドはAPIリクエストを送信するたびに、HTTPヘッダーにこのTokenを添付します。サーバーはデジタル署名が正しいかどうかを検証するだけで、通過を許可できるか判断できます。
2. パスワードの暗号化(Hashing)
通行証を発行する前に、まず登録プロセスを処理する必要があります。
絶対に、絶対に平文のパスワードをデータベースに保存してはいけません! データベースが漏洩すると、すべてのユーザーのパスワードが暴露されます。bcryptのようなハッシュアルゴリズムを使用して暗号化する必要があります。
まずパッケージをインストールします:
pip install passlib[bcrypt] python-jose[cryptography]
���ロジェクトにsecurity.pyを作成します:
from passlib.context import CryptContext
# bcryptアルゴリズムを指定
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 平文パスワードを暗号化
def get_password_hash(password: str):
return pwd_context.hash(password)
# ユーザーが入力したパスワードがデータベースのハッシュ値と一致するか検証
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
これで、ユーザーが登録する際にget_password_hash(user.password)をデータベースに書き込むだけで済みます。
3. ログインルートとJWT発行の実装
次に、ログイン機能を実装します。ユーザーがEmailとパスワードを送信し、確認が成功したらJWTを発行します。
from datetime import datetime, timedelta
from jose import jwt
SECRET_KEY = "超機密の長い文字列を絶対に漏らさないでください" # .env変数に保存すること
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Tokenは30分後に期限切れ
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
# JWTを発行
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
ログインAPIを実装:
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
# 1. データベースからユーザーを検索(form_data.usernameはemailと仮定)
user = db.query(models.User).filter(models.User.email == form_data.username).first()
# 2. ユーザーが見つからないか、パスワード検証失敗
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="アカウントまたはパスワードが間違っています",
headers={"WWW-Authenticate": "Bearer"},
)
# 3. 検証成功、Tokenを生成(ユーザーのemailを埋め込む)
access_token = create_access_token(data={"sub": user.email})
return {"access_token": access_token, "token_type": "bearer"}
[!TIP] なぜ
OAuth2PasswordRequestFormを使うのか? これはFastAPIの組み込み依存ツールです。これを使用すると、FastAPIはこれがログインエンドポイントであることを認識し、Swaggerドキュメントの右上に緑色のAuthorizeボタンを自動的に追加します!ドキュメント画面で直接ログインテストができ、非常に便利です。
4. ルートガード(Route Guard)の作成
Tokenが発行されましたが、他のAPIを保護し、有効なTokenを持っている人だけが呼び出せるようにするにはどうすればよいでしょうか? ここでもFastAPIの最も強力な機能である**Dependency Injection(依存性注入)**を使用します。
Tokenを解析する依存関数を作成:
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
# FastAPIにTokenの取得場所を教える(Swaggerの鍵アイコンも生成)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="認証情報を検証できません",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# Tokenの署名を解読
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
except JWTError:
raise credentials_exception
# データベースからこのユーザーを取得
user = db.query(models.User).filter(models.User.email == email).first()
if user is None:
raise credentials_exception
return user
5. 保護層の適用
これで、APIを保護するのは魔法のように簡単です!
get_current_userを依存関係としてパラメータに配置するだけ:
@app.get("/users/me")
async def read_users_me(current_user: models.User = Depends(get_current_user)):
# このコードに到達できるということは、Tokenが有効で、ユーザーオブジェクトが取得できているということ!
return {"email": current_user.email, "is_active": current_user.is_active}
有効なTokenが含まれていないリクエストは、このコードに到達する前にFastAPIによってブロックされ、401 Unauthorizedが返されます。
ここまでで、あなたのマイクロサービスは完全なデータベースストレージとセキュリティ保護機能を備えました。 最後のステップは、これをクラウドにデプロイし、世界中の誰もがあなたのAPIを呼び出せるようにすることです!