
FastAPI는 OAuth2 및 JWT(Json Web Token)를 사용하여 강력하고 안전한 인증 시스템을 쉽게 구축할 수 있도록 지원합니다.
다음 섹션에서는 FastAPI에서 OAuth2와 JWT를 사용하여 인증 시스템을 구현하는 방법과 보안 강화를 위한 추가 팁을 제공하겠습니다.
기본 설정
필요한 라이브러리 설치
먼저 FastAPI와 관련된 라이브러리를 설치합니다.
pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt]
주요 라이브러리 설명:
- python-jose: JWT 생성 및 검증을 위한 라이브러리
- passlib: 비밀번호 해싱을 위한 라이브러리
- bcrypt: 안전한 비밀번호 해싱 알고리즘
FastAPI 애플리케이션 구성
기본 FastAPI 애플리케이션을 설정합니다.
# main.py
from fastapi import FastAPI
app = FastAPI(
title="FastAPI JWT Authentication",
description="OAuth2와 JWT를 활용한 인증 시스템",
version="1.0.0"
)
OAuth2 및 JWT 인증 구현
사용자 모델 및 비밀번호 해싱
사용자 모델을 정의하고 비밀번호를 해싱하는 유틸리티 함수를 작성합니다.
from pydantic import BaseModel, EmailStr
from passlib.context import CryptContext
from typing import Optional
class User(BaseModel):
username: str
email: Optional[EmailStr] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
class UserCreate(BaseModel):
username: str
password: str
email: EmailStr
full_name: Optional[str] = None
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# 비밀번호 해싱 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
JWT 생성 및 검증
JWT를 생성하고 검증하는 함수를 구현합니다.
from datetime import datetime, timedelta
from jose import JWTError, jwt
from fastapi import HTTPException, status
import os
# 환경 변수에서 비밀 키 가져오기
SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key_change_in_production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire, "iat": datetime.utcnow()})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> Optional[dict]:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
token_data = TokenData(username=username)
return payload
except JWTError:
return None
def decode_token(token: str) -> str:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
return username
OAuth2 설정
OAuth2PasswordBearer를 사용하여 인증을 설정합니다.
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 데이터베이스 시뮬레이션 (실제 환경에서는 데이터베이스 사용)
fake_users_db = {
"testuser": {
"username": "testuser",
"full_name": "Test User",
"email": "testuser@example.com",
"hashed_password": hash_password("password123"),
"disabled": False,
},
"admin": {
"username": "admin",
"full_name": "Administrator",
"email": "admin@example.com",
"hashed_password": hash_password("admin123"),
"disabled": False,
}
}
def get_user(username: str) -> Optional[UserInDB]:
if username in fake_users_db:
user_dict = fake_users_db[username]
return UserInDB(**user_dict)
return None
def authenticate_user(username: str, password: str) -> Optional[UserInDB]:
user = get_user(username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
로그인 엔드포인트 구현
사용자가 로그인할 수 있는 엔드포인트를 구현합니다.
from fastapi import Depends, HTTPException, status
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/register", response_model=dict)
async def register_user(user_data: UserCreate):
# 사용자 중복 확인
if get_user(user_data.username):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# 새 사용자 생성
hashed_password = hash_password(user_data.password)
fake_users_db[user_data.username] = {
"username": user_data.username,
"full_name": user_data.full_name,
"email": user_data.email,
"hashed_password": hashed_password,
"disabled": False,
}
return {"message": "User registered successfully"}
보호된 엔드포인트 구현
JWT 인증을 요구하는 보호된 엔드포인트를 구현합니다.
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [
{"item_id": "Foo", "owner": current_user.username},
{"item_id": "Bar", "owner": current_user.username}
]
@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_active_user)):
return {
"message": f"Hello {current_user.username}, this is a protected route!",
"user_info": current_user
}
고급 인증 기능
리프레시 토큰 구현
from datetime import timedelta
REFRESH_TOKEN_EXPIRE_DAYS = 7
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str
def create_refresh_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
@app.post("/token", response_model=TokenResponse)
async def login_with_refresh_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
refresh_token = create_refresh_token(data={"sub": user.username})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
@app.post("/refresh", response_model=Token)
async def refresh_access_token(refresh_token: str):
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
token_type: str = payload.get("type")
if username is None or token_type != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
user = get_user(username)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
역할 기반 접근 제어 (RBAC)
from enum import Enum
class UserRole(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
class UserWithRole(User):
role: UserRole
def get_current_user_with_role(token: str = Depends(oauth2_scheme)) -> UserWithRole:
# JWT에서 사용자 정보와 역할 추출
payload = verify_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
username = payload.get("sub")
user = get_user(username)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
# 실제 환경에서는 데이터베이스에서 역할 조회
role = UserRole.ADMIN if username == "admin" else UserRole.USER
return UserWithRole(**user.dict(), role=role)
def require_role(required_role: UserRole):
def role_checker(current_user: UserWithRole = Depends(get_current_user_with_role)):
if current_user.role != required_role:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions"
)
return current_user
return role_checker
@app.get("/admin/users")
async def get_all_users(admin_user: UserWithRole = Depends(require_role(UserRole.ADMIN))):
return {"users": list(fake_users_db.keys()), "admin": admin_user.username}
보안 강화를 위한 추가 팁
환경 변수 관리
import os
from pydantic import BaseSettings
class Settings(BaseSettings):
secret_key: str = "your_secret_key_change_in_production"
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
refresh_token_expire_days: int = 7
class Config:
env_file = ".env"
settings = Settings()
# .env 파일 예시
"""
SECRET_KEY=super_secret_key_for_production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
"""
CORS 및 보안 헤더 설정
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
# CORS 설정
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # 프론트엔드 도메인
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
# 신뢰할 수 있는 호스트만 허용
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["localhost", "*.example.com"]
)
# 보안 헤더 미들웨어
@app.middleware("http")
async def add_security_headers(request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
return response
레이트 리미팅
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/token")
@limiter.limit("5/minute") # 분당 5회 로그인 시도 제한
async def login_with_rate_limit(
request: Request,
form_data: OAuth2PasswordRequestForm = Depends()
):
# 로그인 로직...
pass
주요 보안 고려사항:
- 비밀 키 관리: SECRET_KEY는 안전하게 관리해야 하며, 환경 변수나 비밀 관리 시스템을 사용하여 저장하는 것이 좋습니다
- HTTPS 사용: 프로덕션 환경에서는 HTTPS를 사용하여 데이터 전송을 암호화해야 합니다
- JWT 만료 시간 설정: JWT의 만료 시간을 적절히 설정하여 보안을 강화합니다. 필요에 따라 리프레시 토큰을 구현할 수 있습니다
- 비밀번호 정책: 강력한 비밀번호 정책을 적용하고, 비밀번호를 주기적으로 변경하도록 유도합니다
- 로그 및 모니터링: 인증 관련 로그를 기록하고, 비정상적인 활동을 모니터링하여 보안 사고를 예방합니다
결론
FastAPI를 사용하여 OAuth2 및 JWT를 활용한 인증 시스템을 쉽게 구현할 수 있습니다.
위의 예제를 기반으로 다양한 기능을 추가하여 보다 안전하고 강력한 인증 시스템을 구축할 수 있습니다.
핵심 포인트:
- OAuth2PasswordBearer와 JWT 조합으로 표준 인증 구현
- 비밀번호 해싱으로 사용자 데이터 보호
- 리프레시 토큰으로 사용자 경험 개선
- 역할 기반 접근 제어로 세밀한 권한 관리
- 환경 변수와 보안 헤더로 보안 강화
FastAPI의 강력한 보안 기능을 활용하면 엔터프라이즈급 웹 애플리케이션에서 요구하는 높은 수준의 보안을 쉽게 구현할 수 있습니다.
FastAPI 성능 최적화 실전 가이드: 워커 설정부터 캐싱·DB·비동기 작업까지
FastAPI는 기본적으로 매우 빠른 웹 프레임워크지만, 실제 서비스 환경에서는 추가적인 성능 최적화가 필수입니다. 아래에 FastAPI 애플리케이션의 성능을 극대화할 수 있는 실전 팁을 정리합니다.
hoosfa.tistory.com
'IT기술 > 파이썬 (python)' 카테고리의 다른 글
| FastAPI 완벽 가이드: 현대적이고 고성능 Python 웹 프레임워크 (0) | 2025.07.06 |
|---|---|
| FastAPI로 머신러닝 모델 서빙 완벽 가이드: Docker와 함께하는 프로덕션 배포 (0) | 2025.07.05 |
| FastAPI 성능 최적화 실전 가이드: 워커 설정부터 캐싱·DB·비동기 작업까지 (0) | 2025.04.30 |
| [FastAPI] 대규모 프로젝트 설계 가이드: 모듈화, 의존성 주입, 라우터 분리 (0) | 2025.04.28 |
| [FastAPI] RESTful API와 GraphQL API – 차이점, 구현, 선택 가이드 (0) | 2025.04.27 |