Docs / Backend / Python Backend

Python Backend

GuΓ­a completa de desarrollo de backend en Python. Cubre la estructura del proyecto, SQLAlchemy + Alembic para base de datos, patrones async, testing con Pytest y despliegue en producciΓ³n con Gunicorn + Docker.

Project Structure

Una arquitectura por capas (layered architecture) separa las responsabilidades: los routers manejan HTTP, los services contienen lΓ³gica de negocio, y los repositories encapsulan el acceso a datos.

text
Arquitectura por capas

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚               HTTP Layer (Routers)                  β”‚
β”‚         ValidaciΓ³n, serializaciΓ³n, status codes      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                     β”‚ calls
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Business Layer (Services)               β”‚
β”‚        LΓ³gica de negocio, reglas, validaciones        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                     β”‚ calls
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚             Data Layer (Repositories)               β”‚
β”‚          SQLAlchemy queries, ORM mapping              β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                     β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                  Database                            β”‚
β”‚            PostgreSQL / SQLite                        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
text
Estructura de archivos recomendada

backend/
β”œβ”€β”€ app/
β”‚   β”œβ”€β”€ __init__.py
β”‚   β”œβ”€β”€ main.py                # FastAPI app factory
β”‚   β”œβ”€β”€ config.py              # pydantic-settings
β”‚   β”œβ”€β”€ database.py            # Engine, session, Base
β”‚   β”œβ”€β”€ dependencies.py        # Shared Depends()
β”‚   β”œβ”€β”€ models/                # SQLAlchemy models
β”‚   β”œβ”€β”€ schemas/               # Pydantic schemas
β”‚   β”œβ”€β”€ routers/               # API endpoints
β”‚   β”œβ”€β”€ services/              # Business logic
β”‚   β”œβ”€β”€ repositories/          # Database queries
β”‚   └── utils/                 # Helpers, decorators
β”œβ”€β”€ alembic/                   # Migrations
β”‚   β”œβ”€β”€ env.py
β”‚   └── versions/
β”œβ”€β”€ tests/
β”‚   β”œβ”€β”€ conftest.py            # Fixtures (test DB, client)
β”‚   β”œβ”€β”€ unit/
β”‚   └── integration/
β”œβ”€β”€ alembic.ini
β”œβ”€β”€ pyproject.toml
└── Dockerfile

ConfiguraciΓ³n con pydantic-settings

python
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    """Variables de entorno con validaciΓ³n y defaults."""

    # Database
    DATABASE_URL: str = "sqlite:///./dev.db"
    DB_ECHO: bool = False

    # Auth
    SECRET_KEY: str = "change-me-in-production"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # Server
    ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
    DEBUG: bool = False

    model_config = SettingsConfigDict(env_file=".env")

settings = Settings()

Database & SQLAlchemy

SQLAlchemy 2.0 usa un estilo declarativo con type annotations y sessions async-ready. Los modelos se mantienen simples y delegamos las queries a la capa de repositories.

python
# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

from app.config import settings

engine = create_engine(settings.DATABASE_URL, echo=settings.DB_ECHO)
SessionLocal = sessionmaker(bind=engine, autoflush=False)

class Base(DeclarativeBase):
    pass

# Dependency injection para FastAPI
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Modelo SQLAlchemy 2.0

python
# app/models/user.py
from datetime import datetime
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column

from app.database import Base

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(
        String(255), unique=True, index=True
    )
    hashed_password: Mapped[str] = mapped_column(String(255))
    full_name: Mapped[str] = mapped_column(String(100))
    is_active: Mapped[bool] = mapped_column(
        Boolean, default=True
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )

    def __repr__(self) -> str:
        return f"<User id={self.id} email={self.email}>"

Migrations (Alembic)

Alembic gestiona las migraciones de esquema de la base de datos de forma versionada. Cada migraciΓ³n es un script Python que puede aplicarse o revertirse.

bash
# Inicializar Alembic (solo una vez)
alembic init alembic

# Crear nueva migraciΓ³n auto-generada
alembic revision --autogenerate -m "add users table"

# Aplicar migraciones pendientes
alembic upgrade head

# Revertir ΓΊltima migraciΓ³n
alembic downgrade -1

# Ver historial de migraciones
alembic history --verbose

# Ver migraciΓ³n actual
alembic current
python
# alembic/env.py β€” ConfiguraciΓ³n clave
from app.database import Base
from app.config import settings

# IMPORTANTE: importar TODOS los models para que
# autogenerate detecte los cambios
from app.models import user, project  # noqa

config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
target_metadata = Base.metadata

No olvides importar los models

Si no importas un model en alembic/env.py, Alembic no detectarΓ‘ los cambios en ese modelo y no generarΓ‘ las migraciones correspondientes.

Async Patterns

Python async/await permite manejar miles de conexiones concurrentes con un solo hilo. FastAPI soporta endpoints async def nativamente con asyncio.

python
# Async database session con SQLAlchemy
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)

async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    pool_size=20,
    max_overflow=10,
)

AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

async def get_async_db():
    async with AsyncSessionLocal() as session:
        yield session

# --- Endpoint async ---
@router.get("/users")
async def list_users(db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User).limit(25))
    return result.scalars().all()

# --- Tareas concurrentes ---
import asyncio

async def fetch_all_data():
    # Ejecutar 3 queries en paralelo
    users, projects, stats = await asyncio.gather(
        get_users(),
        get_projects(),
        get_stats(),
    )
    return {"users": users, "projects": projects, "stats": stats}

asyncpg vs psycopg

Para PostgreSQL async, usa asyncpg (driver nativo, mΓ‘s rΓ‘pido) o psycopg[binary] (versiΓ³n 3 con soporte async). Ambos funcionan con SQLAlchemy async.

Testing (Pytest)

Pytest es el estΓ‘ndar para testing en Python. La clave es usar fixtures para manejar el estado de prueba (base de datos de test, cliente HTTP, etc.).

python
# tests/conftest.py β€” Fixtures compartidas
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.main import app
from app.database import Base, get_db

TEST_DB = "sqlite:///./test.db"
engine = create_engine(TEST_DB)
TestSession = sessionmaker(bind=engine)

@pytest.fixture(autouse=True)
def setup_database():
    """Crea las tablas antes de cada test, las borra despuΓ©s."""
    Base.metadata.create_all(bind=engine)
    yield
    Base.metadata.drop_all(bind=engine)

@pytest.fixture
def db():
    session = TestSession()
    try:
        yield session
    finally:
        session.close()

@pytest.fixture
def client(db):
    def override_db():
        yield db
    app.dependency_overrides[get_db] = override_db
    with TestClient(app) as c:
        yield c
    app.dependency_overrides.clear()

# --- Test ---
def test_create_user(client):
    response = client.post("/api/v1/users", json={
        "email": "test@example.com",
        "full_name": "Test User",
        "password": "securepass123",
    })
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert "password" not in data  # Nunca exponer password
bash
# Ejecutar tests
pytest tests/ -v --cov=app --cov-report=term-missing

# Solo tests unitarios
pytest tests/unit/ -v

# Con output detallado en caso de fallo
pytest tests/ -v --tb=short -x  # -x para parar al primer fallo

Production Deployment

En producciΓ³n, usamos Gunicorn como process manager con workers uvicorn para manejar mΓΊltiples peticiones concurrentes.

bash
# Comando de producciΓ³n
gunicorn app.main:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --access-logfile - \
  --error-logfile -

# Regla: workers = (2 Γ— CPU cores) + 1
# Para 2 cores β†’ 5 workers

Checklist de producciΓ³n

  1. DEBUG=False en las variables de entorno
  2. SECRET_KEY generado de forma segura (>=32 chars)
  3. Base de datos PostgreSQL (no SQLite)
  4. Migraciones ejecutadas (alembic upgrade head)
  5. CORS configurado con orΓ­genes especΓ­ficos
  6. Health check endpoint en /health
  7. Logging estructurado (JSON) para sistemas de observabilidad

Caching con Redis Performance

Redis es la solucion mas popular como capa de cache en aplicaciones Python. Reduce la carga sobre la base de datos almacenando resultados de consultas frecuentes en memoria, con tiempos de acceso inferiores a 1 ms. La libreria redis-py ofrece soporte sincrono y asincrono.

Instalacion y conexion basica

bash
# Instalar redis-py con soporte async
pip install redis[hiredis]

# Verificar que Redis esta corriendo
redis-cli ping  # Debe responder: PONG
python
# app/cache.py β€” Conexion a Redis
import redis.asyncio as redis
from app.config import settings

# Pool de conexiones async (reutilizable en toda la app)
redis_pool = redis.ConnectionPool.from_url(
    settings.REDIS_URL,  # "redis://localhost:6379/0"
    max_connections=20,
    decode_responses=True,
)

async def get_redis() -> redis.Redis:
    """Dependency injection para FastAPI."""
    client = redis.Redis(connection_pool=redis_pool)
    try:
        yield client
    finally:
        await client.aclose()

Patron de cache con decorador

Un decorador reutilizable que cachea automaticamente el resultado de cualquier funcion async. Soporta TTL configurable y generacion automatica de claves.

python
# app/utils/cache_decorator.py
import json
import functools
import hashlib
from typing import Optional

import redis.asyncio as redis

from app.cache import redis_pool

def cached(prefix: str, ttl: int = 300, key_builder=None):
    """
    Decorador de cache para funciones async.

    Args:
        prefix: Prefijo para la clave en Redis (ej: "users", "products")
        ttl: Tiempo de vida en segundos (default: 5 minutos)
        key_builder: Funcion custom para generar la clave
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Generar clave unica basada en argumentos
            if key_builder:
                cache_key = key_builder(*args, **kwargs)
            else:
                raw = f"{prefix}:{args}:{sorted(kwargs.items())}"
                hashed = hashlib.md5(raw.encode()).hexdigest()
                cache_key = f"cache:{prefix}:{hashed}"

            client = redis.Redis(connection_pool=redis_pool)
            try:
                # Intentar leer del cache
                cached_value = await client.get(cache_key)
                if cached_value is not None:
                    return json.loads(cached_value)

                # Cache miss: ejecutar funcion original
                result = await func(*args, **kwargs)

                # Guardar resultado en cache
                await client.setex(
                    cache_key,
                    ttl,
                    json.dumps(result, default=str),
                )
                return result
            finally:
                await client.aclose()

        # Metodo para invalidar cache manualmente
        wrapper.invalidate = lambda key: _invalidate(prefix, key)
        return wrapper
    return decorator

async def _invalidate(prefix: str, key: str):
    """Invalida una clave especifica del cache."""
    client = redis.Redis(connection_pool=redis_pool)
    try:
        await client.delete(f"cache:{prefix}:{key}")
    finally:
        await client.aclose()

Uso del decorador en servicios

python
# app/services/user_service.py
from app.utils.cache_decorator import cached

@cached(prefix="users", ttl=600)
async def get_user_profile(user_id: int) -> dict:
    """Obtiene el perfil completo de un usuario (cacheado 10 min)."""
    user = await db.execute(
        select(User).where(User.id == user_id)
    )
    row = user.scalar_one_or_none()
    if not row:
        return None
    return {
        "id": row.id,
        "email": row.email,
        "full_name": row.full_name,
        "role": row.role,
    }

@cached(prefix="stats", ttl=120)
async def get_dashboard_stats() -> dict:
    """Estadisticas del dashboard (cacheadas 2 min)."""
    total_users = await db.scalar(select(func.count(User.id)))
    active_users = await db.scalar(
        select(func.count(User.id)).where(User.is_active == True)
    )
    return {
        "total_users": total_users,
        "active_users": active_users,
    }

Cache como dependencia de FastAPI

python
# app/routers/users.py
from fastapi import APIRouter, Depends
import redis.asyncio as redis

from app.cache import get_redis
from app.schemas.user import UserRead

router = APIRouter(prefix="/users")

@router.get("/{user_id}", response_model=UserRead)
async def get_user(
    user_id: int,
    cache: redis.Redis = Depends(get_redis),
    db: AsyncSession = Depends(get_async_db),
):
    # 1. Intentar cache primero
    cache_key = f"user:{user_id}"
    cached = await cache.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Cache miss β†’ consultar DB
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # 3. Guardar en cache con TTL de 5 minutos
    user_data = UserRead.model_validate(user).model_dump()
    await cache.setex(cache_key, 300, json.dumps(user_data, default=str))

    return user_data

@router.patch("/{user_id}", response_model=UserRead)
async def update_user(
    user_id: int,
    user_in: UserUpdate,
    cache: redis.Redis = Depends(get_redis),
    db: AsyncSession = Depends(get_async_db),
):
    # ... actualizar en DB ...

    # Invalidar cache despues de modificar
    await cache.delete(f"user:{user_id}")
    await cache.delete("cache:stats:*")  # Invalidar stats tambien

    return updated_user

Estrategias de invalidacion de cache

EstrategiaDescripcionCaso de usoComplejidad
TTL (Time to Live)La clave expira automaticamente tras N segundosDatos que toleran ligera desactualizacion (stats, catalogo)Baja
Write-throughAl escribir en DB, se actualiza tambien el cacheDatos que deben estar siempre frescos (perfil de usuario)Media
Write-behindSe escribe primero al cache y luego async a la DBContadores, metricas de alta frecuenciaAlta
Invalidacion por patronBorrar todas las claves que matchean un patron con SCANInvalidar todo el cache de un recurso tras un cambio masivoMedia
Event-drivenPub/Sub de Redis notifica cambios y los listeners invalidanSistemas distribuidos con multiples instanciasAlta

TTL recomendados por tipo de dato

Datos de configuracion: 3600s (1 hora). Listados paginados: 60-120s. Perfil de usuario: 300-600s. Estadisticas de dashboard: 30-120s. Datos en tiempo real: no cachear o TTL de 5-10s.

Evitar cache stampede

Cuando muchas peticiones llegan al mismo tiempo y el cache acaba de expirar, todas golpean la DB simultaneamente. Soluciones: usar lock distribuido (solo un proceso reconstruye el cache), TTL con jitter (agregar segundos aleatorios al TTL), o refresh anticipado (renovar el cache antes de que expire).

Structured Logging Observability

El logging estructurado en formato JSON es esencial para sistemas en produccion. Permite buscar, filtrar y agregar logs de forma eficiente en herramientas como ELK Stack (Elasticsearch + Logstash + Kibana), Grafana Loki o Datadog. La libreria structlog es el estandar en Python para logging estructurado.

Instalacion

bash
pip install structlog python-json-logger

Configuracion de structlog

python
# app/logging_config.py
import logging
import structlog
from app.config import settings

def setup_logging():
    """Configura structlog para toda la aplicacion."""

    # Procesadores que transforman cada entrada de log
    shared_processors = [
        structlog.contextvars.merge_contextvars,     # Variables de contexto (request_id)
        structlog.stdlib.add_log_level,              # Agrega campo "level"
        structlog.stdlib.add_logger_name,            # Agrega campo "logger"
        structlog.processors.TimeStamper(fmt="iso"), # Timestamp ISO 8601
        structlog.processors.StackInfoRenderer(),    # Stack trace si aplica
        structlog.processors.UnicodeDecoder(),       # Decodificar unicode
    ]

    if settings.DEBUG:
        # En desarrollo: output bonito y colorido en consola
        structlog.configure(
            processors=shared_processors + [
                structlog.dev.ConsoleRenderer(colors=True),
            ],
            wrapper_class=structlog.stdlib.BoundLogger,
            context_class=dict,
            logger_factory=structlog.PrintLoggerFactory(),
            cache_logger_on_first_use=True,
        )
    else:
        # En produccion: JSON puro para sistemas de observabilidad
        structlog.configure(
            processors=shared_processors + [
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            wrapper_class=structlog.stdlib.BoundLogger,
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            cache_logger_on_first_use=True,
        )

    # Configurar logging estandar de Python para librerias de terceros
    logging.basicConfig(
        format="%(message)s",
        level=logging.INFO if not settings.DEBUG else logging.DEBUG,
    )

Middleware de Request ID

Cada peticion HTTP recibe un identificador unico que se propaga a traves de todos los logs generados durante esa peticion. Esto permite rastrear el flujo completo de una request en los logs.

python
# app/middleware/logging_middleware.py
import uuid
import time
import structlog
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

logger = structlog.get_logger()

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Middleware que asigna request_id y registra cada peticion."""

    async def dispatch(self, request: Request, call_next) -> Response:
        # Generar o extraer request_id del header
        request_id = request.headers.get(
            "X-Request-ID", str(uuid.uuid4())
        )

        # Bind de variables al contexto de structlog
        # Todos los logs dentro de esta request incluiran estos campos
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host if request.client else "unknown",
        )

        start_time = time.perf_counter()

        logger.info(
            "request_started",
            query_params=str(request.query_params),
            user_agent=request.headers.get("user-agent", ""),
        )

        try:
            response = await call_next(request)
            duration_ms = (time.perf_counter() - start_time) * 1000

            logger.info(
                "request_completed",
                status_code=response.status_code,
                duration_ms=round(duration_ms, 2),
            )

            # Propagar request_id en la respuesta
            response.headers["X-Request-ID"] = request_id
            return response

        except Exception as exc:
            duration_ms = (time.perf_counter() - start_time) * 1000
            logger.error(
                "request_failed",
                error=str(exc),
                error_type=type(exc).__name__,
                duration_ms=round(duration_ms, 2),
                exc_info=True,
            )
            raise

Registrar el middleware en la aplicacion

python
# app/main.py
from app.logging_config import setup_logging
from app.middleware.logging_middleware import RequestLoggingMiddleware

# Configurar logging ANTES de crear la app
setup_logging()

app = FastAPI(title="Cookbooks API")
app.add_middleware(RequestLoggingMiddleware)

Uso en servicios y repositorios

python
# app/services/user_service.py
import structlog

logger = structlog.get_logger()

async def create_user(db: AsyncSession, user_in: UserCreate) -> User:
    logger.info("creating_user", email=user_in.email, role=user_in.role)

    existing = await db.execute(
        select(User).where(User.email == user_in.email)
    )
    if existing.scalar_one_or_none():
        logger.warning("user_email_conflict", email=user_in.email)
        raise HTTPException(status_code=409, detail="Email already registered")

    user = User(**user_in.model_dump())
    db.add(user)
    await db.commit()

    logger.info("user_created", user_id=user.id, email=user.email)
    return user

Ejemplo de output JSON en produccion

json
{
  "event": "request_completed",
  "level": "info",
  "logger": "app.middleware.logging_middleware",
  "timestamp": "2025-06-15T14:23:01.456789Z",
  "request_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "method": "GET",
  "path": "/api/v1/users/42",
  "client_ip": "192.168.1.100",
  "status_code": 200,
  "duration_ms": 12.45
}

Niveles de log y cuando usarlos

NivelUsoEjemplosVisible en produccion
DEBUGInformacion detallada para diagnostico durante desarrolloQueries SQL, payloads completos, valores de variablesNo (solo en desarrollo)
INFOEventos normales del flujo de la aplicacionRequest completada, usuario creado, cache hit/missSi
WARNINGSituacion inesperada que no impide la operacionRate limit cercano, cache miss frecuente, retry de conexionSi
ERRORError que impide completar una operacion especificaFallo en DB, servicio externo caido, validacion inesperadaSi + alerta
CRITICALError grave que puede comprometer toda la aplicacionPool de DB agotado, disco lleno, error de configuracion fatalSi + alerta inmediata

Integracion con ELK Stack y Grafana Loki

yaml
# docker-compose.logging.yml β€” Stack de observabilidad
version: "3.8"
services:
  # --- Grafana Loki (alternativa ligera a ELK) ---
  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    volumes:
      - loki-data:/loki

  # --- Promtail (recolector de logs para Loki) ---
  promtail:
    image: grafana/promtail:2.9.0
    volumes:
      - /var/log:/var/log
      - ./promtail-config.yml:/etc/promtail/config.yml
    command: -config.file=/etc/promtail/config.yml

  # --- Grafana (visualizacion) ---
  grafana:
    image: grafana/grafana:10.0.0
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

volumes:
  loki-data:
yaml
# promtail-config.yml
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: fastapi
    docker_sd_configs:
      - host: unix:///var/run/docker.sock
    relabel_configs:
      - source_labels: ['__meta_docker_container_name']
        target_label: 'container'
    pipeline_stages:
      - json:
          expressions:
            level: level
            request_id: request_id
            event: event
      - labels:
          level:
          event:

Correlacion de logs distribuidos

El campo request_id es la clave para rastrear una peticion a traves de multiples servicios. Propaga el header X-Request-ID en llamadas entre microservicios para mantener la trazabilidad completa. En Grafana, filtra por request_id para ver todos los logs de una peticion.

Rendimiento del logging

structlog con cache_logger_on_first_use=True evita reconstruir el pipeline de procesadores en cada llamada. En produccion, usa nivel INFO como minimo para evitar el overhead de mensajes DEBUG. El formato JSON agrega ~5% de overhead comparado con texto plano, pero la capacidad de busqueda lo compensa.

END OF DOCUMENT

ΒΏNecesitas mΓ‘s? Volver a la LibrerΓ­a →