FastAPI REST
Guía completa para construir APIs REST de alto rendimiento con FastAPI. Cubre desde la estructura inicial del proyecto hasta autenticación, validación con Pydantic, operaciones CRUD y manejo de errores en producción.
Project Setup Essentials
FastAPI es un framework moderno de Python basado en type hints estandar.
Utiliza Starlette para la parte web y
Pydantic para la validacion de datos, logrando un
rendimiento comparable a Node.js y Go.
Estructura del proyecto
Estructura recomendada para un proyecto FastAPI
api-project/
├── app/
│ ├── __init__.py
│ ├── main.py # Entry point, CORS, lifespan
│ ├── config.py # Settings con pydantic-settings
│ ├── database.py # Engine, SessionLocal, Base
│ ├── models/ # SQLAlchemy models
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── project.py
│ ├── schemas/ # Pydantic schemas
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── project.py
│ ├── routers/ # APIRouter modules
│ │ ├── __init__.py
│ │ ├── users.py
│ │ └── projects.py
│ ├── services/ # Business logic layer
│ │ └── user_service.py
│ └── middleware/ # Custom middleware
│ └── auth.py
├── tests/
├── requirements.txt
├── Dockerfile
└── .env Instalacion y dependencias
# Crear entorno virtual
python -m venv venv
source venv/bin/activate # Linux/Mac
venv\Scripts\activate # Windows
# Instalar dependencias core
pip install fastapi[standard] uvicorn[standard]
pip install sqlalchemy alembic pydantic-settings
pip install python-jose[cryptography] passlib[bcrypt]
# Iniciar servidor de desarrollo
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 Entry point (main.py)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.database import engine, Base
from app.routers import users, projects
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: crear tablas, inicializar conexiones
Base.metadata.create_all(bind=engine)
yield
# Shutdown: cerrar conexiones
engine.dispose()
app = FastAPI(
title="Cookbooks API",
version="1.4.2",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc",
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Registrar routers
app.include_router(users.router, prefix="/api/v1", tags=["users"])
app.include_router(projects.router, prefix="/api/v1", tags=["projects"]) Routing & Endpoints
FastAPI usa APIRouter para organizar endpoints en modulos.
Cada router agrupa operaciones relacionadas con un recurso especifico.
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.schemas.user import UserCreate, UserRead, UserUpdate
from app.services import user_service
router = APIRouter(prefix="/users")
# GET /api/v1/users — Listar con paginación
@router.get("/", response_model=list[UserRead])
async def list_users(
skip: int = Query(0, ge=0),
limit: int = Query(25, ge=1, le=100),
db: Session = Depends(get_db),
):
return user_service.get_users(db, skip=skip, limit=limit)
# GET /api/v1/users/{user_id} — Detalle
@router.get("/{user_id}", response_model=UserRead)
async def get_user(user_id: int, db: Session = Depends(get_db)):
user = user_service.get_user(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# POST /api/v1/users — Crear
@router.post("/", response_model=UserRead, status_code=status.HTTP_201_CREATED)
async def create_user(user_in: UserCreate, db: Session = Depends(get_db)):
return user_service.create_user(db, user_in)
# PATCH /api/v1/users/{user_id} — Actualizar parcial
@router.patch("/{user_id}", response_model=UserRead)
async def update_user(
user_id: int,
user_in: UserUpdate,
db: Session = Depends(get_db),
):
return user_service.update_user(db, user_id, user_in)
# DELETE /api/v1/users/{user_id}
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, db: Session = Depends(get_db)):
user_service.delete_user(db, user_id) | Metodo | Endpoint | Descripcion | Status Code |
|---|---|---|---|
| GET | /users | Listar usuarios con paginacion | 200 |
| GET | /users/{id} | Obtener usuario por ID | 200 / 404 |
| POST | /users | Crear nuevo usuario | 201 |
| PATCH | /users/{id} | Actualizacion parcial | 200 / 404 |
| DELETE | /users/{id} | Eliminar usuario | 204 / 404 |
Pydantic Schemas
Los schemas definen la forma de los datos de entrada y salida. Pydantic v2 usa
model_validator y field_validator
para validaciones complejas. Separamos en Create, Read y
Update para controlar que campos se exponen en cada operacion.
from pydantic import BaseModel, EmailStr, field_validator, ConfigDict
from datetime import datetime
from typing import Optional
class UserBase(BaseModel):
"""Campos compartidos entre Create y Read."""
email: EmailStr
full_name: str
role: str = "viewer"
@field_validator("role")
@classmethod
def validate_role(cls, v: str) -> str:
allowed = {"admin", "editor", "viewer"}
if v not in allowed:
raise ValueError(f"Role must be one of {allowed}")
return v
class UserCreate(UserBase):
"""Schema para POST /users — incluye password."""
password: str
@field_validator("password")
@classmethod
def validate_password(cls, v: str) -> str:
if len(v) < 8:
raise ValueError("Password must be >= 8 characters")
return v
class UserRead(UserBase):
"""Schema para respuestas — nunca expone password."""
id: int
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class UserUpdate(BaseModel):
"""Schema para PATCH — todos opcionales."""
email: Optional[EmailStr] = None
full_name: Optional[str] = None
role: Optional[str] = None from_attributes=True
Con from_attributes=True (antes orm_mode), Pydantic puede
leer directamente desde objetos SQLAlchemy sin necesidad de convertir a dict.
CRUD Operations
La capa de servicios encapsula la logica de negocio y las operaciones de base de datos.
Los routers solo se encargan de HTTP; la logica real vive en services/.
# app/services/user_service.py
from sqlalchemy.orm import Session
from fastapi import HTTPException
from passlib.context import CryptContext
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
pwd_context = CryptContext(schemes=["bcrypt"])
def get_users(db: Session, skip: int = 0, limit: int = 25):
return db.query(User).offset(skip).limit(limit).all()
def get_user(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def create_user(db: Session, user_in: UserCreate):
# Verificar email unico
existing = db.query(User).filter(User.email == user_in.email).first()
if existing:
raise HTTPException(status_code=409, detail="Email already registered")
hashed = pwd_context.hash(user_in.password)
user = User(
email=user_in.email,
full_name=user_in.full_name,
role=user_in.role,
hashed_password=hashed,
)
db.add(user)
db.commit()
db.refresh(user)
return user
def update_user(db: Session, user_id: int, user_in: UserUpdate):
user = get_user(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
update_data = user_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(user, field, value)
db.commit()
db.refresh(user)
return user
def delete_user(db: Session, user_id: int):
user = get_user(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
db.delete(user)
db.commit() exclude_unset vs exclude_none
Usa exclude_unset=True en PATCH, no exclude_none.
Asi distingues entre “el campo no fue enviado” y “el campo fue enviado como null”.
Auth & Middleware
La autenticacion usa JWT (JSON Web Tokens) con la libreria
python-jose. El token se envia en el header
Authorization: Bearer <token>.
# app/middleware/auth.py
from datetime import datetime, timedelta
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from app.config import settings
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=30))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256")
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
return user_id
# Usar en endpoints protegidos:
@router.get("/me")
async def read_profile(user_id: str = Depends(get_current_user)):
return {"user_id": user_id} Flujo de autenticacion
- El cliente envia credenciales a
POST /auth/login - El servidor valida y retorna un JWT
- El cliente incluye el JWT en cada peticion subsecuente
get_current_uservalida el token automaticamente viaDepends
Error Handling
FastAPI permite registrar exception handlers globales que capturan errores y los transforman en respuestas JSON consistentes. Esto asegura que el cliente siempre reciba un formato predecible.
# app/main.py — Exception handlers
from fastapi import Request
from fastapi.responses import JSONResponse
from pydantic import ValidationError
# Error de validacion personalizado
@app.exception_handler(ValidationError)
async def validation_handler(request: Request, exc: ValidationError):
return JSONResponse(
status_code=422,
content={
"error": "validation_error",
"detail": exc.errors(),
"body": exc.body if hasattr(exc, "body") else None,
},
)
# Error generico (500)
@app.exception_handler(Exception)
async def generic_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=500,
content={
"error": "internal_server_error",
"detail": "An unexpected error occurred",
},
) Formato estandar de error
{
"error": "not_found",
"detail": "User with id 42 not found",
"status_code": 404,
"timestamp": "2025-01-15T10:30:00Z"
} OpenAPI automatico
FastAPI genera documentacion Swagger/OpenAPI en /docs automaticamente.
Cada endpoint, schema y error handler se refleja sin configuracion adicional.
Background Tasks Async
En aplicaciones reales, muchas operaciones no necesitan completarse antes de responder al cliente: enviar emails, generar reportes, procesar imagenes o sincronizar datos con servicios externos. FastAPI ofrece BackgroundTasks para tareas ligeras y se integra con Celery para trabajos pesados y distribuidos.
BackgroundTasks nativo de FastAPI
BackgroundTasks ejecuta funciones despues de enviar la respuesta HTTP. Es ideal para tareas rapidas que no requieren reintentos ni persistencia.
# app/routers/notifications.py
from fastapi import APIRouter, BackgroundTasks, Depends
from sqlalchemy.orm import Session
from app.database import get_db
from app.services.email_service import send_welcome_email
from app.services.audit_service import log_user_action
router = APIRouter(prefix="/notifications")
def send_email_task(email: str, subject: str, body: str):
"""Tarea en background para enviar email."""
# Esta funcion se ejecuta DESPUES de enviar la respuesta
import smtplib
from email.message import EmailMessage
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = "noreply@cookbooks.dev"
msg["To"] = email
msg.set_content(body)
with smtplib.SMTP("smtp.example.com", 587) as server:
server.starttls()
server.login("user", "password")
server.send_message(msg)
@router.post("/welcome")
async def send_welcome(
user_email: str,
background_tasks: BackgroundTasks,
):
# Encolar multiples tareas en background
background_tasks.add_task(
send_email_task,
email=user_email,
subject="Bienvenido a Cookbooks",
body="Gracias por registrarte en nuestra plataforma.",
)
background_tasks.add_task(
log_user_action,
action="welcome_email_sent",
email=user_email,
)
# La respuesta se envia INMEDIATAMENTE
return {"message": "Welcome email queued", "email": user_email} Integracion con Celery para trabajos pesados
Cuando las tareas requieren reintentos, programacion, o ejecucion distribuida, Celery es la solucion estandar. Usa Redis o RabbitMQ como broker de mensajes.
# app/worker.py — Configuracion de Celery
from celery import Celery
celery_app = Celery(
"cookbooks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
celery_app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
timezone="UTC",
task_track_started=True,
task_acks_late=True, # ACK despues de ejecutar (mas seguro)
worker_prefetch_multiplier=1, # Una tarea a la vez por worker
)
# --- Definir tareas ---
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def generate_report(self, user_id: int, report_type: str):
"""Genera un reporte PDF pesado."""
try:
# Simulacion de tarea pesada (5-30 segundos)
from app.services.report_service import build_report
result = build_report(user_id, report_type)
return {"status": "completed", "file_url": result.url}
except Exception as exc:
# Reintentar con backoff exponencial
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
@celery_app.task
def cleanup_temp_files():
"""Tarea programada para limpiar archivos temporales."""
import glob, os
for f in glob.glob("/tmp/cookbooks_*.tmp"):
os.remove(f) Patron de polling para estado de tareas
El cliente envia una tarea, recibe un task_id, y consulta periodicamente el estado. Este patron es comun para operaciones que tardan mas de unos segundos.
# app/routers/reports.py
from fastapi import APIRouter, HTTPException
from celery.result import AsyncResult
from app.worker import celery_app, generate_report
router = APIRouter(prefix="/reports")
@router.post("/", status_code=202)
async def request_report(user_id: int, report_type: str = "monthly"):
"""Encola la generacion de un reporte. Retorna task_id."""
task = generate_report.delay(user_id, report_type)
return {
"task_id": task.id,
"status": "queued",
"poll_url": f"/api/v1/reports/status/{task.id}",
}
@router.get("/status/{task_id}")
async def get_report_status(task_id: str):
"""Consultar el estado de una tarea de Celery."""
result = AsyncResult(task_id, app=celery_app)
if result.state == "PENDING":
return {"task_id": task_id, "status": "pending"}
elif result.state == "STARTED":
return {"task_id": task_id, "status": "processing"}
elif result.state == "SUCCESS":
return {
"task_id": task_id,
"status": "completed",
"result": result.result,
}
elif result.state == "FAILURE":
return {
"task_id": task_id,
"status": "failed",
"error": str(result.info),
}
else:
return {"task_id": task_id, "status": result.state.lower()} | Caracteristica | BackgroundTasks | Celery |
|---|---|---|
| Complejidad | Minima, incluido en FastAPI | Requiere broker (Redis/RabbitMQ) |
| Reintentos | No soportado | Reintentos con backoff configurable |
| Persistencia | No, se pierde si el proceso muere | Si, las tareas persisten en el broker |
| Monitoreo | No incluido | Flower, resultado por task_id |
| Uso ideal | Emails, logs, notificaciones rapidas | Reportes, procesamiento de archivos, ETL |
Cuando usar cada opcion
Usa BackgroundTasks para operaciones que tardan menos de 5 segundos y no necesitan reintentos (emails, logs, webhooks). Usa Celery cuando necesites reintentos, programacion (crontab), o ejecucion en workers separados del proceso principal.
File Upload & Download I/O
Gestionar archivos es una necesidad comun en APIs REST: subir imagenes de perfil, adjuntar documentos, exportar reportes CSV o servir archivos generados. FastAPI proporciona UploadFile para uploads y StreamingResponse / FileResponse para descargas eficientes.
Subida de archivos con validacion
UploadFile usa un buffer en memoria para archivos pequenos y cambia automaticamente a disco temporal para archivos grandes. Siempre se debe validar el tipo MIME y el tamano antes de procesar.
# app/routers/files.py
import os
import uuid
import aiofiles
from fastapi import APIRouter, UploadFile, File, HTTPException, Depends
from fastapi.responses import FileResponse, StreamingResponse
from app.middleware.auth import get_current_user
router = APIRouter(prefix="/files")
# Configuracion
UPLOAD_DIR = "uploads"
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
ALLOWED_MIME_TYPES = {
"image/jpeg", "image/png", "image/webp",
"application/pdf",
"text/csv",
}
os.makedirs(UPLOAD_DIR, exist_ok=True)
async def validate_file(file: UploadFile) -> UploadFile:
"""Valida tipo MIME y tamano del archivo."""
# Validar tipo MIME
if file.content_type not in ALLOWED_MIME_TYPES:
raise HTTPException(
status_code=415,
detail=f"Tipo no soportado: {file.content_type}. "
f"Permitidos: {', '.join(ALLOWED_MIME_TYPES)}",
)
# Validar tamano leyendo en chunks
size = 0
while chunk := await file.read(8192):
size += len(chunk)
if size > MAX_FILE_SIZE:
raise HTTPException(
status_code=413,
detail=f"Archivo excede el limite de "
f"{MAX_FILE_SIZE // (1024*1024)} MB",
)
# Rebobinar el archivo para que pueda leerse de nuevo
await file.seek(0)
return file
@router.post("/upload")
async def upload_file(
file: UploadFile = File(..., description="Archivo a subir (max 10 MB)"),
user_id: str = Depends(get_current_user),
):
"""Sube un archivo con validacion de tipo y tamano."""
await validate_file(file)
# Generar nombre unico para evitar colisiones
ext = os.path.splitext(file.filename)[1]
unique_name = f"{uuid.uuid4().hex}{ext}"
file_path = os.path.join(UPLOAD_DIR, unique_name)
# Escribir archivo de forma asincrona
async with aiofiles.open(file_path, "wb") as out:
while chunk := await file.read(8192):
await out.write(chunk)
return {
"filename": unique_name,
"original_name": file.filename,
"size_bytes": os.path.getsize(file_path),
"content_type": file.content_type,
"download_url": f"/api/v1/files/download/{unique_name}",
} Subida de multiples archivos
@router.post("/upload-multiple")
async def upload_multiple(
files: list[UploadFile] = File(
..., description="Hasta 5 archivos simultaneos"
),
):
"""Sube multiples archivos en una sola peticion."""
if len(files) > 5:
raise HTTPException(
status_code=400,
detail="Maximo 5 archivos por peticion",
)
results = []
for file in files:
await validate_file(file)
ext = os.path.splitext(file.filename)[1]
unique_name = f"{uuid.uuid4().hex}{ext}"
file_path = os.path.join(UPLOAD_DIR, unique_name)
async with aiofiles.open(file_path, "wb") as out:
while chunk := await file.read(8192):
await out.write(chunk)
results.append({
"filename": unique_name,
"original_name": file.filename,
"size_bytes": os.path.getsize(file_path),
})
return {"uploaded": len(results), "files": results} Descarga directa y streaming de archivos grandes
@router.get("/download/{filename}")
async def download_file(filename: str):
"""Descarga directa de un archivo."""
file_path = os.path.join(UPLOAD_DIR, filename)
if not os.path.isfile(file_path):
raise HTTPException(status_code=404, detail="Archivo no encontrado")
return FileResponse(
path=file_path,
filename=filename,
media_type="application/octet-stream",
)
@router.get("/stream/{filename}")
async def stream_large_file(filename: str):
"""Streaming para archivos grandes (CSV, logs, exports)."""
file_path = os.path.join(UPLOAD_DIR, filename)
if not os.path.isfile(file_path):
raise HTTPException(status_code=404, detail="Archivo no encontrado")
async def file_iterator():
async with aiofiles.open(file_path, "rb") as f:
while chunk := await f.read(64 * 1024): # 64 KB chunks
yield chunk
file_size = os.path.getsize(file_path)
return StreamingResponse(
file_iterator(),
media_type="application/octet-stream",
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
"Content-Length": str(file_size),
},
) Generacion y descarga de CSV en streaming
import csv
import io
from fastapi.responses import StreamingResponse
@router.get("/export/users")
async def export_users_csv(db: Session = Depends(get_db)):
"""Exporta todos los usuarios como CSV en streaming."""
users = db.query(User).all()
def csv_generator():
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["id", "email", "full_name", "role", "created_at"])
yield output.getvalue()
output.seek(0)
output.truncate(0)
for user in users:
writer.writerow([
user.id, user.email, user.full_name,
user.role, user.created_at.isoformat(),
])
yield output.getvalue()
output.seek(0)
output.truncate(0)
return StreamingResponse(
csv_generator(),
media_type="text/csv",
headers={
"Content-Disposition": 'attachment; filename="users_export.csv"'
},
) | Metodo | Endpoint | Descripcion | Status Code |
|---|---|---|---|
| POST | /files/upload | Subir un archivo (max 10 MB) | 200 / 413 / 415 |
| POST | /files/upload-multiple | Subir hasta 5 archivos | 200 / 400 |
| GET | /files/download/{filename} | Descarga directa | 200 / 404 |
| GET | /files/stream/{filename} | Streaming para archivos grandes | 200 / 404 |
| GET | /files/export/users | Exportar usuarios como CSV | 200 |
Seguridad en uploads
Nunca confies solo en la extension del archivo. Valida siempre el content_type, limita el tamano maximo y genera nombres unicos con uuid para evitar colisiones y ataques de path traversal. En produccion, considera almacenar archivos en S3 o GCS en lugar del sistema de archivos local.