IT기술/파이썬 (python)

FastAPI OAuth2와 JWT 인증 완벽 가이드: 안전한 웹 애플리케이션 구축하기

후스파 2025. 7. 4. 15:18
반응형

FastAPI는 OAuth2 및 JWT(Json Web Token)를 사용하여 강력하고 안전한 인증 시스템을 쉽게 구축할 수 있도록 지원합니다.

다음 섹션에서는 FastAPI에서 OAuth2와 JWT를 사용하여 인증 시스템을 구현하는 방법과 보안 강화를 위한 추가 팁을 제공하겠습니다.


기본 설정

필요한 라이브러리 설치

먼저 FastAPI와 관련된 라이브러리를 설치합니다.

pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt]

주요 라이브러리 설명:

  • python-jose: JWT 생성 및 검증을 위한 라이브러리
  • passlib: 비밀번호 해싱을 위한 라이브러리
  • bcrypt: 안전한 비밀번호 해싱 알고리즘

FastAPI 애플리케이션 구성

기본 FastAPI 애플리케이션을 설정합니다.

# main.py
from fastapi import FastAPI

app = FastAPI(
    title="FastAPI JWT Authentication",
    description="OAuth2와 JWT를 활용한 인증 시스템",
    version="1.0.0"
)

OAuth2 및 JWT 인증 구현

사용자 모델 및 비밀번호 해싱

사용자 모델을 정의하고 비밀번호를 해싱하는 유틸리티 함수를 작성합니다.

from pydantic import BaseModel, EmailStr
from passlib.context import CryptContext
from typing import Optional

class User(BaseModel):
    username: str
    email: Optional[EmailStr] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

class UserCreate(BaseModel):
    username: str
    password: str
    email: EmailStr
    full_name: Optional[str] = None

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

# 비밀번호 해싱 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

JWT 생성 및 검증

JWT를 생성하고 검증하는 함수를 구현합니다.

from datetime import datetime, timedelta
from jose import JWTError, jwt
from fastapi import HTTPException, status
import os

# 환경 변수에서 비밀 키 가져오기
SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key_change_in_production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode.update({"exp": expire, "iat": datetime.utcnow()})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str) -> Optional[dict]:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            return None
        token_data = TokenData(username=username)
        return payload
    except JWTError:
        return None

def decode_token(token: str) -> str:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    return username

OAuth2 설정

OAuth2PasswordBearer를 사용하여 인증을 설정합니다.

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 데이터베이스 시뮬레이션 (실제 환경에서는 데이터베이스 사용)
fake_users_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "testuser@example.com",
        "hashed_password": hash_password("password123"),
        "disabled": False,
    },
    "admin": {
        "username": "admin",
        "full_name": "Administrator",
        "email": "admin@example.com",
        "hashed_password": hash_password("admin123"),
        "disabled": False,
    }
}

def get_user(username: str) -> Optional[UserInDB]:
    if username in fake_users_db:
        user_dict = fake_users_db[username]
        return UserInDB(**user_dict)
    return None

def authenticate_user(username: str, password: str) -> Optional[UserInDB]:
    user = get_user(username)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

로그인 엔드포인트 구현

사용자가 로그인할 수 있는 엔드포인트를 구현합니다.

from fastapi import Depends, HTTPException, status

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

@app.post("/register", response_model=dict)
async def register_user(user_data: UserCreate):
    # 사용자 중복 확인
    if get_user(user_data.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered"
        )

    # 새 사용자 생성
    hashed_password = hash_password(user_data.password)
    fake_users_db[user_data.username] = {
        "username": user_data.username,
        "full_name": user_data.full_name,
        "email": user_data.email,
        "hashed_password": hashed_password,
        "disabled": False,
    }

    return {"message": "User registered successfully"}

보호된 엔드포인트 구현

JWT 인증을 요구하는 보호된 엔드포인트를 구현합니다.

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception

    user = get_user(username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

@app.get("/users/me/items")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [
        {"item_id": "Foo", "owner": current_user.username},
        {"item_id": "Bar", "owner": current_user.username}
    ]

@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_active_user)):
    return {
        "message": f"Hello {current_user.username}, this is a protected route!",
        "user_info": current_user
    }

고급 인증 기능

리프레시 토큰 구현

from datetime import timedelta

REFRESH_TOKEN_EXPIRE_DAYS = 7

class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str

def create_refresh_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

@app.post("/token", response_model=TokenResponse)
async def login_with_refresh_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    refresh_token = create_refresh_token(data={"sub": user.username})

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

@app.post("/refresh", response_model=Token)
async def refresh_access_token(refresh_token: str):
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        token_type: str = payload.get("type")

        if username is None or token_type != "refresh":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid refresh token"
            )

        user = get_user(username)
        if user is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found"
            )

        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            data={"sub": username}, expires_delta=access_token_expires
        )

        return {"access_token": access_token, "token_type": "bearer"}

    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token"
        )

역할 기반 접근 제어 (RBAC)

from enum import Enum

class UserRole(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"

class UserWithRole(User):
    role: UserRole

def get_current_user_with_role(token: str = Depends(oauth2_scheme)) -> UserWithRole:
    # JWT에서 사용자 정보와 역할 추출
    payload = verify_token(token)
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )

    username = payload.get("sub")
    user = get_user(username)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found"
        )

    # 실제 환경에서는 데이터베이스에서 역할 조회
    role = UserRole.ADMIN if username == "admin" else UserRole.USER

    return UserWithRole(**user.dict(), role=role)

def require_role(required_role: UserRole):
    def role_checker(current_user: UserWithRole = Depends(get_current_user_with_role)):
        if current_user.role != required_role:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions"
            )
        return current_user
    return role_checker

@app.get("/admin/users")
async def get_all_users(admin_user: UserWithRole = Depends(require_role(UserRole.ADMIN))):
    return {"users": list(fake_users_db.keys()), "admin": admin_user.username}

보안 강화를 위한 추가 팁

환경 변수 관리

import os
from pydantic import BaseSettings

class Settings(BaseSettings):
    secret_key: str = "your_secret_key_change_in_production"
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7

    class Config:
        env_file = ".env"

settings = Settings()

# .env 파일 예시
"""
SECRET_KEY=super_secret_key_for_production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
"""

CORS 및 보안 헤더 설정

from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

# CORS 설정
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],  # 프론트엔드 도메인
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

# 신뢰할 수 있는 호스트만 허용
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["localhost", "*.example.com"]
)

# 보안 헤더 미들웨어
@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    return response

레이트 리미팅

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/token")
@limiter.limit("5/minute")  # 분당 5회 로그인 시도 제한
async def login_with_rate_limit(
    request: Request,
    form_data: OAuth2PasswordRequestForm = Depends()
):
    # 로그인 로직...
    pass

주요 보안 고려사항:

  • 비밀 키 관리: SECRET_KEY는 안전하게 관리해야 하며, 환경 변수나 비밀 관리 시스템을 사용하여 저장하는 것이 좋습니다
  • HTTPS 사용: 프로덕션 환경에서는 HTTPS를 사용하여 데이터 전송을 암호화해야 합니다
  • JWT 만료 시간 설정: JWT의 만료 시간을 적절히 설정하여 보안을 강화합니다. 필요에 따라 리프레시 토큰을 구현할 수 있습니다
  • 비밀번호 정책: 강력한 비밀번호 정책을 적용하고, 비밀번호를 주기적으로 변경하도록 유도합니다
  • 로그 및 모니터링: 인증 관련 로그를 기록하고, 비정상적인 활동을 모니터링하여 보안 사고를 예방합니다

결론

FastAPI를 사용하여 OAuth2 및 JWT를 활용한 인증 시스템을 쉽게 구현할 수 있습니다.

위의 예제를 기반으로 다양한 기능을 추가하여 보다 안전하고 강력한 인증 시스템을 구축할 수 있습니다.

핵심 포인트:

  • OAuth2PasswordBearer와 JWT 조합으로 표준 인증 구현
  • 비밀번호 해싱으로 사용자 데이터 보호
  • 리프레시 토큰으로 사용자 경험 개선
  • 역할 기반 접근 제어로 세밀한 권한 관리
  • 환경 변수와 보안 헤더로 보안 강화

FastAPI의 강력한 보안 기능을 활용하면 엔터프라이즈급 웹 애플리케이션에서 요구하는 높은 수준의 보안을 쉽게 구현할 수 있습니다.

 

 

 

FastAPI 성능 최적화 실전 가이드: 워커 설정부터 캐싱·DB·비동기 작업까지

FastAPI는 기본적으로 매우 빠른 웹 프레임워크지만, 실제 서비스 환경에서는 추가적인 성능 최적화가 필수입니다. 아래에 FastAPI 애플리케이션의 성능을 극대화할 수 있는 실전 팁을 정리합니다.

hoosfa.tistory.com

 

반응형