Docs / REST API / FastAPI

FastAPI REST

Guía completa para construir APIs REST de alto rendimiento con FastAPI. Cubre desde la estructura inicial del proyecto hasta autenticación, validación con Pydantic, operaciones CRUD y manejo de errores en producción.

Project Setup Essentials

FastAPI es un framework moderno de Python basado en type hints estandar. Utiliza Starlette para la parte web y Pydantic para la validacion de datos, logrando un rendimiento comparable a Node.js y Go.

Estructura del proyecto

text
Estructura recomendada para un proyecto FastAPI

api-project/
├── app/
│   ├── __init__.py
│   ├── main.py              # Entry point, CORS, lifespan
│   ├── config.py            # Settings con pydantic-settings
│   ├── database.py          # Engine, SessionLocal, Base
│   ├── models/              # SQLAlchemy models
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── project.py
│   ├── schemas/             # Pydantic schemas
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── project.py
│   ├── routers/             # APIRouter modules
│   │   ├── __init__.py
│   │   ├── users.py
│   │   └── projects.py
│   ├── services/            # Business logic layer
│   │   └── user_service.py
│   └── middleware/          # Custom middleware
│       └── auth.py
├── tests/
├── requirements.txt
├── Dockerfile
└── .env

Instalacion y dependencias

bash
# Crear entorno virtual
python -m venv venv
source venv/bin/activate  # Linux/Mac
venv\Scripts\activate     # Windows

# Instalar dependencias core
pip install fastapi[standard] uvicorn[standard]
pip install sqlalchemy alembic pydantic-settings
pip install python-jose[cryptography] passlib[bcrypt]

# Iniciar servidor de desarrollo
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

Entry point (main.py)

python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.config import settings
from app.database import engine, Base
from app.routers import users, projects

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: crear tablas, inicializar conexiones
    Base.metadata.create_all(bind=engine)
    yield
    # Shutdown: cerrar conexiones
    engine.dispose()

app = FastAPI(
    title="Cookbooks API",
    version="1.4.2",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Registrar routers
app.include_router(users.router, prefix="/api/v1", tags=["users"])
app.include_router(projects.router, prefix="/api/v1", tags=["projects"])

Routing & Endpoints

FastAPI usa APIRouter para organizar endpoints en modulos. Cada router agrupa operaciones relacionadas con un recurso especifico.

python
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session

from app.database import get_db
from app.schemas.user import UserCreate, UserRead, UserUpdate
from app.services import user_service

router = APIRouter(prefix="/users")

# GET /api/v1/users — Listar con paginación
@router.get("/", response_model=list[UserRead])
async def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(25, ge=1, le=100),
    db: Session = Depends(get_db),
):
    return user_service.get_users(db, skip=skip, limit=limit)

# GET /api/v1/users/{user_id} — Detalle
@router.get("/{user_id}", response_model=UserRead)
async def get_user(user_id: int, db: Session = Depends(get_db)):
    user = user_service.get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# POST /api/v1/users — Crear
@router.post("/", response_model=UserRead, status_code=status.HTTP_201_CREATED)
async def create_user(user_in: UserCreate, db: Session = Depends(get_db)):
    return user_service.create_user(db, user_in)

# PATCH /api/v1/users/{user_id} — Actualizar parcial
@router.patch("/{user_id}", response_model=UserRead)
async def update_user(
    user_id: int,
    user_in: UserUpdate,
    db: Session = Depends(get_db),
):
    return user_service.update_user(db, user_id, user_in)

# DELETE /api/v1/users/{user_id}
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, db: Session = Depends(get_db)):
    user_service.delete_user(db, user_id)
MetodoEndpointDescripcionStatus Code
GET/usersListar usuarios con paginacion200
GET/users/{id}Obtener usuario por ID200 / 404
POST/usersCrear nuevo usuario201
PATCH/users/{id}Actualizacion parcial200 / 404
DELETE/users/{id}Eliminar usuario204 / 404

Pydantic Schemas

Los schemas definen la forma de los datos de entrada y salida. Pydantic v2 usa model_validator y field_validator para validaciones complejas. Separamos en Create, Read y Update para controlar que campos se exponen en cada operacion.

python
from pydantic import BaseModel, EmailStr, field_validator, ConfigDict
from datetime import datetime
from typing import Optional

class UserBase(BaseModel):
    """Campos compartidos entre Create y Read."""
    email: EmailStr
    full_name: str
    role: str = "viewer"

    @field_validator("role")
    @classmethod
    def validate_role(cls, v: str) -> str:
        allowed = {"admin", "editor", "viewer"}
        if v not in allowed:
            raise ValueError(f"Role must be one of {allowed}")
        return v

class UserCreate(UserBase):
    """Schema para POST /users — incluye password."""
    password: str

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
        if len(v) < 8:
            raise ValueError("Password must be >= 8 characters")
        return v

class UserRead(UserBase):
    """Schema para respuestas — nunca expone password."""
    id: int
    created_at: datetime
    model_config = ConfigDict(from_attributes=True)

class UserUpdate(BaseModel):
    """Schema para PATCH — todos opcionales."""
    email: Optional[EmailStr] = None
    full_name: Optional[str] = None
    role: Optional[str] = None

from_attributes=True

Con from_attributes=True (antes orm_mode), Pydantic puede leer directamente desde objetos SQLAlchemy sin necesidad de convertir a dict.

CRUD Operations

La capa de servicios encapsula la logica de negocio y las operaciones de base de datos. Los routers solo se encargan de HTTP; la logica real vive en services/.

python
# app/services/user_service.py
from sqlalchemy.orm import Session
from fastapi import HTTPException
from passlib.context import CryptContext

from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate

pwd_context = CryptContext(schemes=["bcrypt"])

def get_users(db: Session, skip: int = 0, limit: int = 25):
    return db.query(User).offset(skip).limit(limit).all()

def get_user(db: Session, user_id: int):
    return db.query(User).filter(User.id == user_id).first()

def create_user(db: Session, user_in: UserCreate):
    # Verificar email unico
    existing = db.query(User).filter(User.email == user_in.email).first()
    if existing:
        raise HTTPException(status_code=409, detail="Email already registered")

    hashed = pwd_context.hash(user_in.password)
    user = User(
        email=user_in.email,
        full_name=user_in.full_name,
        role=user_in.role,
        hashed_password=hashed,
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user

def update_user(db: Session, user_id: int, user_in: UserUpdate):
    user = get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    update_data = user_in.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(user, field, value)

    db.commit()
    db.refresh(user)
    return user

def delete_user(db: Session, user_id: int):
    user = get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    db.delete(user)
    db.commit()

exclude_unset vs exclude_none

Usa exclude_unset=True en PATCH, no exclude_none. Asi distingues entre “el campo no fue enviado” y “el campo fue enviado como null”.

Auth & Middleware

La autenticacion usa JWT (JSON Web Tokens) con la libreria python-jose. El token se envia en el header Authorization: Bearer <token>.

python
# app/middleware/auth.py
from datetime import datetime, timedelta
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

from app.config import settings

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")

def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=30))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256")

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    return user_id

# Usar en endpoints protegidos:
@router.get("/me")
async def read_profile(user_id: str = Depends(get_current_user)):
    return {"user_id": user_id}

Flujo de autenticacion

  1. El cliente envia credenciales a POST /auth/login
  2. El servidor valida y retorna un JWT
  3. El cliente incluye el JWT en cada peticion subsecuente
  4. get_current_user valida el token automaticamente via Depends

Error Handling

FastAPI permite registrar exception handlers globales que capturan errores y los transforman en respuestas JSON consistentes. Esto asegura que el cliente siempre reciba un formato predecible.

python
# app/main.py — Exception handlers
from fastapi import Request
from fastapi.responses import JSONResponse
from pydantic import ValidationError

# Error de validacion personalizado
@app.exception_handler(ValidationError)
async def validation_handler(request: Request, exc: ValidationError):
    return JSONResponse(
        status_code=422,
        content={
            "error": "validation_error",
            "detail": exc.errors(),
            "body": exc.body if hasattr(exc, "body") else None,
        },
    )

# Error generico (500)
@app.exception_handler(Exception)
async def generic_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=500,
        content={
            "error": "internal_server_error",
            "detail": "An unexpected error occurred",
        },
    )

Formato estandar de error

json
{
  "error": "not_found",
  "detail": "User with id 42 not found",
  "status_code": 404,
  "timestamp": "2025-01-15T10:30:00Z"
}

OpenAPI automatico

FastAPI genera documentacion Swagger/OpenAPI en /docs automaticamente. Cada endpoint, schema y error handler se refleja sin configuracion adicional.

Background Tasks Async

En aplicaciones reales, muchas operaciones no necesitan completarse antes de responder al cliente: enviar emails, generar reportes, procesar imagenes o sincronizar datos con servicios externos. FastAPI ofrece BackgroundTasks para tareas ligeras y se integra con Celery para trabajos pesados y distribuidos.

BackgroundTasks nativo de FastAPI

BackgroundTasks ejecuta funciones despues de enviar la respuesta HTTP. Es ideal para tareas rapidas que no requieren reintentos ni persistencia.

python
# app/routers/notifications.py
from fastapi import APIRouter, BackgroundTasks, Depends
from sqlalchemy.orm import Session

from app.database import get_db
from app.services.email_service import send_welcome_email
from app.services.audit_service import log_user_action

router = APIRouter(prefix="/notifications")

def send_email_task(email: str, subject: str, body: str):
    """Tarea en background para enviar email."""
    # Esta funcion se ejecuta DESPUES de enviar la respuesta
    import smtplib
    from email.message import EmailMessage

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = "noreply@cookbooks.dev"
    msg["To"] = email
    msg.set_content(body)

    with smtplib.SMTP("smtp.example.com", 587) as server:
        server.starttls()
        server.login("user", "password")
        server.send_message(msg)

@router.post("/welcome")
async def send_welcome(
    user_email: str,
    background_tasks: BackgroundTasks,
):
    # Encolar multiples tareas en background
    background_tasks.add_task(
        send_email_task,
        email=user_email,
        subject="Bienvenido a Cookbooks",
        body="Gracias por registrarte en nuestra plataforma.",
    )
    background_tasks.add_task(
        log_user_action,
        action="welcome_email_sent",
        email=user_email,
    )

    # La respuesta se envia INMEDIATAMENTE
    return {"message": "Welcome email queued", "email": user_email}

Integracion con Celery para trabajos pesados

Cuando las tareas requieren reintentos, programacion, o ejecucion distribuida, Celery es la solucion estandar. Usa Redis o RabbitMQ como broker de mensajes.

python
# app/worker.py — Configuracion de Celery
from celery import Celery

celery_app = Celery(
    "cookbooks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    task_track_started=True,
    task_acks_late=True,           # ACK despues de ejecutar (mas seguro)
    worker_prefetch_multiplier=1,  # Una tarea a la vez por worker
)

# --- Definir tareas ---
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def generate_report(self, user_id: int, report_type: str):
    """Genera un reporte PDF pesado."""
    try:
        # Simulacion de tarea pesada (5-30 segundos)
        from app.services.report_service import build_report
        result = build_report(user_id, report_type)
        return {"status": "completed", "file_url": result.url}
    except Exception as exc:
        # Reintentar con backoff exponencial
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

@celery_app.task
def cleanup_temp_files():
    """Tarea programada para limpiar archivos temporales."""
    import glob, os
    for f in glob.glob("/tmp/cookbooks_*.tmp"):
        os.remove(f)

Patron de polling para estado de tareas

El cliente envia una tarea, recibe un task_id, y consulta periodicamente el estado. Este patron es comun para operaciones que tardan mas de unos segundos.

python
# app/routers/reports.py
from fastapi import APIRouter, HTTPException
from celery.result import AsyncResult

from app.worker import celery_app, generate_report

router = APIRouter(prefix="/reports")

@router.post("/", status_code=202)
async def request_report(user_id: int, report_type: str = "monthly"):
    """Encola la generacion de un reporte. Retorna task_id."""
    task = generate_report.delay(user_id, report_type)
    return {
        "task_id": task.id,
        "status": "queued",
        "poll_url": f"/api/v1/reports/status/{task.id}",
    }

@router.get("/status/{task_id}")
async def get_report_status(task_id: str):
    """Consultar el estado de una tarea de Celery."""
    result = AsyncResult(task_id, app=celery_app)

    if result.state == "PENDING":
        return {"task_id": task_id, "status": "pending"}
    elif result.state == "STARTED":
        return {"task_id": task_id, "status": "processing"}
    elif result.state == "SUCCESS":
        return {
            "task_id": task_id,
            "status": "completed",
            "result": result.result,
        }
    elif result.state == "FAILURE":
        return {
            "task_id": task_id,
            "status": "failed",
            "error": str(result.info),
        }
    else:
        return {"task_id": task_id, "status": result.state.lower()}
CaracteristicaBackgroundTasksCelery
ComplejidadMinima, incluido en FastAPIRequiere broker (Redis/RabbitMQ)
ReintentosNo soportadoReintentos con backoff configurable
PersistenciaNo, se pierde si el proceso muereSi, las tareas persisten en el broker
MonitoreoNo incluidoFlower, resultado por task_id
Uso idealEmails, logs, notificaciones rapidasReportes, procesamiento de archivos, ETL

Cuando usar cada opcion

Usa BackgroundTasks para operaciones que tardan menos de 5 segundos y no necesitan reintentos (emails, logs, webhooks). Usa Celery cuando necesites reintentos, programacion (crontab), o ejecucion en workers separados del proceso principal.

File Upload & Download I/O

Gestionar archivos es una necesidad comun en APIs REST: subir imagenes de perfil, adjuntar documentos, exportar reportes CSV o servir archivos generados. FastAPI proporciona UploadFile para uploads y StreamingResponse / FileResponse para descargas eficientes.

Subida de archivos con validacion

UploadFile usa un buffer en memoria para archivos pequenos y cambia automaticamente a disco temporal para archivos grandes. Siempre se debe validar el tipo MIME y el tamano antes de procesar.

python
# app/routers/files.py
import os
import uuid
import aiofiles
from fastapi import APIRouter, UploadFile, File, HTTPException, Depends
from fastapi.responses import FileResponse, StreamingResponse

from app.middleware.auth import get_current_user

router = APIRouter(prefix="/files")

# Configuracion
UPLOAD_DIR = "uploads"
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB
ALLOWED_MIME_TYPES = {
    "image/jpeg", "image/png", "image/webp",
    "application/pdf",
    "text/csv",
}

os.makedirs(UPLOAD_DIR, exist_ok=True)

async def validate_file(file: UploadFile) -> UploadFile:
    """Valida tipo MIME y tamano del archivo."""
    # Validar tipo MIME
    if file.content_type not in ALLOWED_MIME_TYPES:
        raise HTTPException(
            status_code=415,
            detail=f"Tipo no soportado: {file.content_type}. "
                   f"Permitidos: {', '.join(ALLOWED_MIME_TYPES)}",
        )

    # Validar tamano leyendo en chunks
    size = 0
    while chunk := await file.read(8192):
        size += len(chunk)
        if size > MAX_FILE_SIZE:
            raise HTTPException(
                status_code=413,
                detail=f"Archivo excede el limite de "
                       f"{MAX_FILE_SIZE // (1024*1024)} MB",
            )

    # Rebobinar el archivo para que pueda leerse de nuevo
    await file.seek(0)
    return file

@router.post("/upload")
async def upload_file(
    file: UploadFile = File(..., description="Archivo a subir (max 10 MB)"),
    user_id: str = Depends(get_current_user),
):
    """Sube un archivo con validacion de tipo y tamano."""
    await validate_file(file)

    # Generar nombre unico para evitar colisiones
    ext = os.path.splitext(file.filename)[1]
    unique_name = f"{uuid.uuid4().hex}{ext}"
    file_path = os.path.join(UPLOAD_DIR, unique_name)

    # Escribir archivo de forma asincrona
    async with aiofiles.open(file_path, "wb") as out:
        while chunk := await file.read(8192):
            await out.write(chunk)

    return {
        "filename": unique_name,
        "original_name": file.filename,
        "size_bytes": os.path.getsize(file_path),
        "content_type": file.content_type,
        "download_url": f"/api/v1/files/download/{unique_name}",
    }

Subida de multiples archivos

python
@router.post("/upload-multiple")
async def upload_multiple(
    files: list[UploadFile] = File(
        ..., description="Hasta 5 archivos simultaneos"
    ),
):
    """Sube multiples archivos en una sola peticion."""
    if len(files) > 5:
        raise HTTPException(
            status_code=400,
            detail="Maximo 5 archivos por peticion",
        )

    results = []
    for file in files:
        await validate_file(file)
        ext = os.path.splitext(file.filename)[1]
        unique_name = f"{uuid.uuid4().hex}{ext}"
        file_path = os.path.join(UPLOAD_DIR, unique_name)

        async with aiofiles.open(file_path, "wb") as out:
            while chunk := await file.read(8192):
                await out.write(chunk)

        results.append({
            "filename": unique_name,
            "original_name": file.filename,
            "size_bytes": os.path.getsize(file_path),
        })

    return {"uploaded": len(results), "files": results}

Descarga directa y streaming de archivos grandes

python
@router.get("/download/{filename}")
async def download_file(filename: str):
    """Descarga directa de un archivo."""
    file_path = os.path.join(UPLOAD_DIR, filename)

    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="Archivo no encontrado")

    return FileResponse(
        path=file_path,
        filename=filename,
        media_type="application/octet-stream",
    )

@router.get("/stream/{filename}")
async def stream_large_file(filename: str):
    """Streaming para archivos grandes (CSV, logs, exports)."""
    file_path = os.path.join(UPLOAD_DIR, filename)

    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="Archivo no encontrado")

    async def file_iterator():
        async with aiofiles.open(file_path, "rb") as f:
            while chunk := await f.read(64 * 1024):  # 64 KB chunks
                yield chunk

    file_size = os.path.getsize(file_path)
    return StreamingResponse(
        file_iterator(),
        media_type="application/octet-stream",
        headers={
            "Content-Disposition": f'attachment; filename="{filename}"',
            "Content-Length": str(file_size),
        },
    )

Generacion y descarga de CSV en streaming

python
import csv
import io
from fastapi.responses import StreamingResponse

@router.get("/export/users")
async def export_users_csv(db: Session = Depends(get_db)):
    """Exporta todos los usuarios como CSV en streaming."""
    users = db.query(User).all()

    def csv_generator():
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["id", "email", "full_name", "role", "created_at"])
        yield output.getvalue()
        output.seek(0)
        output.truncate(0)

        for user in users:
            writer.writerow([
                user.id, user.email, user.full_name,
                user.role, user.created_at.isoformat(),
            ])
            yield output.getvalue()
            output.seek(0)
            output.truncate(0)

    return StreamingResponse(
        csv_generator(),
        media_type="text/csv",
        headers={
            "Content-Disposition": 'attachment; filename="users_export.csv"'
        },
    )
MetodoEndpointDescripcionStatus Code
POST/files/uploadSubir un archivo (max 10 MB)200 / 413 / 415
POST/files/upload-multipleSubir hasta 5 archivos200 / 400
GET/files/download/{filename}Descarga directa200 / 404
GET/files/stream/{filename}Streaming para archivos grandes200 / 404
GET/files/export/usersExportar usuarios como CSV200

Seguridad en uploads

Nunca confies solo en la extension del archivo. Valida siempre el content_type, limita el tamano maximo y genera nombres unicos con uuid para evitar colisiones y ataques de path traversal. En produccion, considera almacenar archivos en S3 o GCS en lugar del sistema de archivos local.

END OF DOCUMENT

¿Necesitas más? Volver a la Librería →