Web Security

FastAPI Security Guide: Auth, Input Validation, and OWASP Best Practices

Secure FastAPI applications with OAuth2PasswordBearer, Pydantic validation, CORS configuration, SQL injection prevention with SQLAlchemy, and rate limiting.

March 9, 20266 min readShipSafer Team

FastAPI Security Guide: Auth, Input Validation, and OWASP Best Practices

FastAPI's type system and automatic validation give it a security advantage over many frameworks — but authentication, authorization, and infrastructure-level controls still require explicit implementation. This guide covers a production-ready security setup for FastAPI applications.

Authentication with OAuth2 and JWT

FastAPI ships with OAuth2PasswordBearer, a dependency that extracts Bearer tokens from the Authorization header. Combine it with PyJWT for a complete JWT authentication flow:

pip install python-jose[cryptography] passlib[bcrypt]
# app/auth/jwt.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

SECRET_KEY = os.environ["JWT_SECRET_KEY"]   # min 32 chars, random
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire, "iat": datetime.utcnow()})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = await get_user_by_id(user_id)
    if user is None:
        raise credentials_exception
    return user

Use the dependency in your routes:

from fastapi import APIRouter, Depends

router = APIRouter()

@router.get("/profile")
async def get_profile(current_user: User = Depends(get_current_user)):
    return current_user

@router.delete("/account")
async def delete_account(current_user: User = Depends(get_current_user)):
    # current_user is authenticated and validated
    await delete_user(current_user.id)
    return {"message": "Account deleted"}

Role-Based Authorization

Add a role-check dependency:

from functools import partial
from typing import set

def require_role(*roles: str):
    async def role_checker(current_user: User = Depends(get_current_user)) -> User:
        if current_user.role not in roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions",
            )
        return current_user
    return role_checker

# Usage
@router.delete("/users/{user_id}")
async def delete_user_admin(
    user_id: str,
    admin: User = Depends(require_role("admin", "super_admin")),
):
    await delete_user(user_id)

Pydantic Input Validation

FastAPI uses Pydantic models for request body validation. Define strict schemas with appropriate constraints:

from pydantic import BaseModel, EmailStr, Field, field_validator
import re

class UserCreateRequest(BaseModel):
    email: EmailStr
    password: str = Field(min_length=12, max_length=128)
    name: str = Field(min_length=1, max_length=100)
    role: str = Field(default="user")

    @field_validator("password")
    @classmethod
    def password_strength(cls, v: str) -> str:
        if not re.search(r"[A-Z]", v):
            raise ValueError("Password must contain an uppercase letter")
        if not re.search(r"[a-z]", v):
            raise ValueError("Password must contain a lowercase letter")
        if not re.search(r"\d", v):
            raise ValueError("Password must contain a digit")
        if not re.search(r"[!@#$%^&*]", v):
            raise ValueError("Password must contain a special character")
        return v

    @field_validator("role")
    @classmethod
    def validate_role(cls, v: str) -> str:
        allowed = {"user", "editor"}
        if v not in allowed:
            raise ValueError(f"Role must be one of: {allowed}")
        return v

For query parameters, use Query with constraints:

from fastapi import Query

@router.get("/users")
async def list_users(
    page: int = Query(default=1, ge=1),
    limit: int = Query(default=20, ge=1, le=100),
    search: str = Query(default=None, max_length=100),
):
    ...

SQL Injection Prevention with SQLAlchemy

SQLAlchemy's ORM and Core both use parameterized queries when used correctly:

from sqlalchemy.orm import Session
from sqlalchemy import select, text

# Safe — ORM parameterizes automatically
stmt = select(User).where(User.email == email)
user = session.scalars(stmt).first()

# Safe — Core with bound parameters
stmt = select(User).where(User.name.ilike(f"%{search}%"))

# Safe raw SQL with text() — always use bound parameters
stmt = text("SELECT * FROM users WHERE email = :email")
result = session.execute(stmt, {"email": email})

# DANGEROUS — string formatting in raw SQL
stmt = text(f"SELECT * FROM users WHERE email = '{email}'")  # SQL injection

Never use Python f-strings or .format() to build SQL queries.

CORS Configuration

FastAPI's CORS middleware must be configured with an explicit origin allowlist:

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://app.example.com",
        "https://www.example.com",
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
    allow_headers=["Content-Type", "Authorization"],
    max_age=3600,
)

Never use allow_origins=["*"] in combination with allow_credentials=True — this is both a FastAPI validation error and a security vulnerability.

Rate Limiting with slowapi

pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@router.post("/auth/login")
@limiter.limit("5/minute")
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
    # rate limited to 5 attempts per minute per IP
    ...

For user-based rate limiting (post-authentication):

def get_user_id(request: Request) -> str:
    # Extract user ID from JWT for per-user limiting
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload.get("sub", get_remote_address(request))
    except JWTError:
        return get_remote_address(request)

user_limiter = Limiter(key_func=get_user_id)

@router.post("/ai/generate")
@user_limiter.limit("10/hour")
async def generate(request: Request, current_user: User = Depends(get_current_user)):
    ...

Security Headers Middleware

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Permissions-Policy"] = "camera=(), microphone=()"
        response.headers["Strict-Transport-Security"] = (
            "max-age=63072000; includeSubDomains; preload"
        )
        return response

app.add_middleware(SecurityHeadersMiddleware)

Secrets Management

from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    jwt_secret_key: str
    database_url: str
    redis_url: str
    debug: bool = False

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

@lru_cache()
def get_settings() -> Settings:
    return Settings()

# Usage
settings = get_settings()

Pydantic Settings will raise a validation error at startup if any required environment variable is missing — fail fast rather than discovering a missing secret at runtime.

Error Handling

FastAPI's default validation error responses are safe, but custom exception handlers should never expose internal details:

from fastapi import Request
from fastapi.responses import JSONResponse

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error("Unhandled exception", exc_info=exc, extra={
        "path": request.url.path,
        "method": request.method,
    })
    return JSONResponse(
        status_code=500,
        content={"detail": "An internal error occurred"},
    )

Security Checklist

  • OAuth2PasswordBearer with JWT and bcrypt (rounds >= 12)
  • Role-based authorization dependency on admin routes
  • Pydantic models with Field constraints on all inputs
  • Query() constraints on all query parameters
  • SQLAlchemy ORM or text() with bound parameters — no f-strings in SQL
  • CORS configured with explicit origin allowlist, no wildcard with credentials
  • Rate limiting on auth endpoints and expensive operations
  • SecurityHeadersMiddleware applied
  • pydantic_settings for type-validated environment variables
  • Global exception handler returns generic 500 messages
  • pip-audit or safety check in CI pipeline

Check Your Security Score — Free

See exactly how your domain scores on DMARC, TLS, HTTP headers, and 25+ other automated security checks in under 60 seconds.