Python Backend
GuΓa completa de desarrollo de backend en Python. Cubre la estructura del proyecto, SQLAlchemy + Alembic para base de datos, patrones async, testing con Pytest y despliegue en producciΓ³n con Gunicorn + Docker.
Project Structure
Una arquitectura por capas (layered architecture) separa las responsabilidades: los routers manejan HTTP, los services contienen lΓ³gica de negocio, y los repositories encapsulan el acceso a datos.
Arquitectura por capas
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β HTTP Layer (Routers) β
β ValidaciΓ³n, serializaciΓ³n, status codes β
ββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββ
β calls
ββββββββββββββββββββββΌβββββββββββββββββββββββββββββββββ
β Business Layer (Services) β
β LΓ³gica de negocio, reglas, validaciones β
ββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββ
β calls
ββββββββββββββββββββββΌβββββββββββββββββββββββββββββββββ
β Data Layer (Repositories) β
β SQLAlchemy queries, ORM mapping β
ββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββΌβββββββββββββββββββββββββββββββββ
β Database β
β PostgreSQL / SQLite β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ Estructura de archivos recomendada
backend/
βββ app/
β βββ __init__.py
β βββ main.py # FastAPI app factory
β βββ config.py # pydantic-settings
β βββ database.py # Engine, session, Base
β βββ dependencies.py # Shared Depends()
β βββ models/ # SQLAlchemy models
β βββ schemas/ # Pydantic schemas
β βββ routers/ # API endpoints
β βββ services/ # Business logic
β βββ repositories/ # Database queries
β βββ utils/ # Helpers, decorators
βββ alembic/ # Migrations
β βββ env.py
β βββ versions/
βββ tests/
β βββ conftest.py # Fixtures (test DB, client)
β βββ unit/
β βββ integration/
βββ alembic.ini
βββ pyproject.toml
βββ Dockerfile ConfiguraciΓ³n con pydantic-settings
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Variables de entorno con validaciΓ³n y defaults."""
# Database
DATABASE_URL: str = "sqlite:///./dev.db"
DB_ECHO: bool = False
# Auth
SECRET_KEY: str = "change-me-in-production"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Server
ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
DEBUG: bool = False
model_config = SettingsConfigDict(env_file=".env")
settings = Settings() Database & SQLAlchemy
SQLAlchemy 2.0 usa un estilo declarativo con type annotations y sessions async-ready. Los modelos se mantienen simples y delegamos las queries a la capa de repositories.
# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from app.config import settings
engine = create_engine(settings.DATABASE_URL, echo=settings.DB_ECHO)
SessionLocal = sessionmaker(bind=engine, autoflush=False)
class Base(DeclarativeBase):
pass
# Dependency injection para FastAPI
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close() Modelo SQLAlchemy 2.0
# app/models/user.py
from datetime import datetime
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(
String(255), unique=True, index=True
)
hashed_password: Mapped[str] = mapped_column(String(255))
full_name: Mapped[str] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(
Boolean, default=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
def __repr__(self) -> str:
return f"<User id={self.id} email={self.email}>" Migrations (Alembic)
Alembic gestiona las migraciones de esquema de la base de datos de forma versionada. Cada migraciΓ³n es un script Python que puede aplicarse o revertirse.
# Inicializar Alembic (solo una vez)
alembic init alembic
# Crear nueva migraciΓ³n auto-generada
alembic revision --autogenerate -m "add users table"
# Aplicar migraciones pendientes
alembic upgrade head
# Revertir ΓΊltima migraciΓ³n
alembic downgrade -1
# Ver historial de migraciones
alembic history --verbose
# Ver migraciΓ³n actual
alembic current # alembic/env.py β ConfiguraciΓ³n clave
from app.database import Base
from app.config import settings
# IMPORTANTE: importar TODOS los models para que
# autogenerate detecte los cambios
from app.models import user, project # noqa
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
target_metadata = Base.metadata No olvides importar los models
Si no importas un model en alembic/env.py, Alembic no detectarΓ‘
los cambios en ese modelo y no generarΓ‘ las migraciones correspondientes.
Async Patterns
Python async/await permite manejar miles de conexiones concurrentes con un solo hilo. FastAPI soporta endpoints async def nativamente con asyncio.
# Async database session con SQLAlchemy
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
)
async_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/mydb",
pool_size=20,
max_overflow=10,
)
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_async_db():
async with AsyncSessionLocal() as session:
yield session
# --- Endpoint async ---
@router.get("/users")
async def list_users(db: AsyncSession = Depends(get_async_db)):
result = await db.execute(select(User).limit(25))
return result.scalars().all()
# --- Tareas concurrentes ---
import asyncio
async def fetch_all_data():
# Ejecutar 3 queries en paralelo
users, projects, stats = await asyncio.gather(
get_users(),
get_projects(),
get_stats(),
)
return {"users": users, "projects": projects, "stats": stats} asyncpg vs psycopg
Para PostgreSQL async, usa asyncpg (driver nativo, mΓ‘s rΓ‘pido) o
psycopg[binary] (versiΓ³n 3 con soporte async). Ambos funcionan con
SQLAlchemy async.
Testing (Pytest)
Pytest es el estΓ‘ndar para testing en Python. La clave es usar fixtures para manejar el estado de prueba (base de datos de test, cliente HTTP, etc.).
# tests/conftest.py β Fixtures compartidas
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import Base, get_db
TEST_DB = "sqlite:///./test.db"
engine = create_engine(TEST_DB)
TestSession = sessionmaker(bind=engine)
@pytest.fixture(autouse=True)
def setup_database():
"""Crea las tablas antes de cada test, las borra despuΓ©s."""
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture
def db():
session = TestSession()
try:
yield session
finally:
session.close()
@pytest.fixture
def client(db):
def override_db():
yield db
app.dependency_overrides[get_db] = override_db
with TestClient(app) as c:
yield c
app.dependency_overrides.clear()
# --- Test ---
def test_create_user(client):
response = client.post("/api/v1/users", json={
"email": "test@example.com",
"full_name": "Test User",
"password": "securepass123",
})
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert "password" not in data # Nunca exponer password # Ejecutar tests
pytest tests/ -v --cov=app --cov-report=term-missing
# Solo tests unitarios
pytest tests/unit/ -v
# Con output detallado en caso de fallo
pytest tests/ -v --tb=short -x # -x para parar al primer fallo Production Deployment
En producciΓ³n, usamos Gunicorn como process manager con workers uvicorn para manejar mΓΊltiples peticiones concurrentes.
# Comando de producciΓ³n
gunicorn app.main:app \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--timeout 120 \
--access-logfile - \
--error-logfile -
# Regla: workers = (2 Γ CPU cores) + 1
# Para 2 cores β 5 workers Checklist de producciΓ³n
DEBUG=Falseen las variables de entornoSECRET_KEYgenerado de forma segura (>=32 chars)- Base de datos PostgreSQL (no SQLite)
- Migraciones ejecutadas (
alembic upgrade head) - CORS configurado con orΓgenes especΓficos
- Health check endpoint en
/health - Logging estructurado (JSON) para sistemas de observabilidad
Caching con Redis Performance
Redis es la solucion mas popular como capa de cache en aplicaciones Python. Reduce la carga sobre la base de datos almacenando resultados de consultas frecuentes en memoria, con tiempos de acceso inferiores a 1 ms. La libreria redis-py ofrece soporte sincrono y asincrono.
Instalacion y conexion basica
# Instalar redis-py con soporte async
pip install redis[hiredis]
# Verificar que Redis esta corriendo
redis-cli ping # Debe responder: PONG # app/cache.py β Conexion a Redis
import redis.asyncio as redis
from app.config import settings
# Pool de conexiones async (reutilizable en toda la app)
redis_pool = redis.ConnectionPool.from_url(
settings.REDIS_URL, # "redis://localhost:6379/0"
max_connections=20,
decode_responses=True,
)
async def get_redis() -> redis.Redis:
"""Dependency injection para FastAPI."""
client = redis.Redis(connection_pool=redis_pool)
try:
yield client
finally:
await client.aclose() Patron de cache con decorador
Un decorador reutilizable que cachea automaticamente el resultado de cualquier funcion async. Soporta TTL configurable y generacion automatica de claves.
# app/utils/cache_decorator.py
import json
import functools
import hashlib
from typing import Optional
import redis.asyncio as redis
from app.cache import redis_pool
def cached(prefix: str, ttl: int = 300, key_builder=None):
"""
Decorador de cache para funciones async.
Args:
prefix: Prefijo para la clave en Redis (ej: "users", "products")
ttl: Tiempo de vida en segundos (default: 5 minutos)
key_builder: Funcion custom para generar la clave
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Generar clave unica basada en argumentos
if key_builder:
cache_key = key_builder(*args, **kwargs)
else:
raw = f"{prefix}:{args}:{sorted(kwargs.items())}"
hashed = hashlib.md5(raw.encode()).hexdigest()
cache_key = f"cache:{prefix}:{hashed}"
client = redis.Redis(connection_pool=redis_pool)
try:
# Intentar leer del cache
cached_value = await client.get(cache_key)
if cached_value is not None:
return json.loads(cached_value)
# Cache miss: ejecutar funcion original
result = await func(*args, **kwargs)
# Guardar resultado en cache
await client.setex(
cache_key,
ttl,
json.dumps(result, default=str),
)
return result
finally:
await client.aclose()
# Metodo para invalidar cache manualmente
wrapper.invalidate = lambda key: _invalidate(prefix, key)
return wrapper
return decorator
async def _invalidate(prefix: str, key: str):
"""Invalida una clave especifica del cache."""
client = redis.Redis(connection_pool=redis_pool)
try:
await client.delete(f"cache:{prefix}:{key}")
finally:
await client.aclose() Uso del decorador en servicios
# app/services/user_service.py
from app.utils.cache_decorator import cached
@cached(prefix="users", ttl=600)
async def get_user_profile(user_id: int) -> dict:
"""Obtiene el perfil completo de un usuario (cacheado 10 min)."""
user = await db.execute(
select(User).where(User.id == user_id)
)
row = user.scalar_one_or_none()
if not row:
return None
return {
"id": row.id,
"email": row.email,
"full_name": row.full_name,
"role": row.role,
}
@cached(prefix="stats", ttl=120)
async def get_dashboard_stats() -> dict:
"""Estadisticas del dashboard (cacheadas 2 min)."""
total_users = await db.scalar(select(func.count(User.id)))
active_users = await db.scalar(
select(func.count(User.id)).where(User.is_active == True)
)
return {
"total_users": total_users,
"active_users": active_users,
} Cache como dependencia de FastAPI
# app/routers/users.py
from fastapi import APIRouter, Depends
import redis.asyncio as redis
from app.cache import get_redis
from app.schemas.user import UserRead
router = APIRouter(prefix="/users")
@router.get("/{user_id}", response_model=UserRead)
async def get_user(
user_id: int,
cache: redis.Redis = Depends(get_redis),
db: AsyncSession = Depends(get_async_db),
):
# 1. Intentar cache primero
cache_key = f"user:{user_id}"
cached = await cache.get(cache_key)
if cached:
return json.loads(cached)
# 2. Cache miss β consultar DB
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# 3. Guardar en cache con TTL de 5 minutos
user_data = UserRead.model_validate(user).model_dump()
await cache.setex(cache_key, 300, json.dumps(user_data, default=str))
return user_data
@router.patch("/{user_id}", response_model=UserRead)
async def update_user(
user_id: int,
user_in: UserUpdate,
cache: redis.Redis = Depends(get_redis),
db: AsyncSession = Depends(get_async_db),
):
# ... actualizar en DB ...
# Invalidar cache despues de modificar
await cache.delete(f"user:{user_id}")
await cache.delete("cache:stats:*") # Invalidar stats tambien
return updated_user Estrategias de invalidacion de cache
| Estrategia | Descripcion | Caso de uso | Complejidad |
|---|---|---|---|
| TTL (Time to Live) | La clave expira automaticamente tras N segundos | Datos que toleran ligera desactualizacion (stats, catalogo) | Baja |
| Write-through | Al escribir en DB, se actualiza tambien el cache | Datos que deben estar siempre frescos (perfil de usuario) | Media |
| Write-behind | Se escribe primero al cache y luego async a la DB | Contadores, metricas de alta frecuencia | Alta |
| Invalidacion por patron | Borrar todas las claves que matchean un patron con SCAN | Invalidar todo el cache de un recurso tras un cambio masivo | Media |
| Event-driven | Pub/Sub de Redis notifica cambios y los listeners invalidan | Sistemas distribuidos con multiples instancias | Alta |
TTL recomendados por tipo de dato
Datos de configuracion: 3600s (1 hora). Listados paginados: 60-120s. Perfil de usuario: 300-600s. Estadisticas de dashboard: 30-120s. Datos en tiempo real: no cachear o TTL de 5-10s.
Evitar cache stampede
Cuando muchas peticiones llegan al mismo tiempo y el cache acaba de expirar, todas golpean la DB simultaneamente. Soluciones: usar lock distribuido (solo un proceso reconstruye el cache), TTL con jitter (agregar segundos aleatorios al TTL), o refresh anticipado (renovar el cache antes de que expire).
Structured Logging Observability
El logging estructurado en formato JSON es esencial para sistemas en produccion. Permite buscar, filtrar y agregar logs de forma eficiente en herramientas como ELK Stack (Elasticsearch + Logstash + Kibana), Grafana Loki o Datadog. La libreria structlog es el estandar en Python para logging estructurado.
Instalacion
pip install structlog python-json-logger Configuracion de structlog
# app/logging_config.py
import logging
import structlog
from app.config import settings
def setup_logging():
"""Configura structlog para toda la aplicacion."""
# Procesadores que transforman cada entrada de log
shared_processors = [
structlog.contextvars.merge_contextvars, # Variables de contexto (request_id)
structlog.stdlib.add_log_level, # Agrega campo "level"
structlog.stdlib.add_logger_name, # Agrega campo "logger"
structlog.processors.TimeStamper(fmt="iso"), # Timestamp ISO 8601
structlog.processors.StackInfoRenderer(), # Stack trace si aplica
structlog.processors.UnicodeDecoder(), # Decodificar unicode
]
if settings.DEBUG:
# En desarrollo: output bonito y colorido en consola
structlog.configure(
processors=shared_processors + [
structlog.dev.ConsoleRenderer(colors=True),
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
else:
# En produccion: JSON puro para sistemas de observabilidad
structlog.configure(
processors=shared_processors + [
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
# Configurar logging estandar de Python para librerias de terceros
logging.basicConfig(
format="%(message)s",
level=logging.INFO if not settings.DEBUG else logging.DEBUG,
) Middleware de Request ID
Cada peticion HTTP recibe un identificador unico que se propaga a traves de todos los logs generados durante esa peticion. Esto permite rastrear el flujo completo de una request en los logs.
# app/middleware/logging_middleware.py
import uuid
import time
import structlog
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
logger = structlog.get_logger()
class RequestLoggingMiddleware(BaseHTTPMiddleware):
"""Middleware que asigna request_id y registra cada peticion."""
async def dispatch(self, request: Request, call_next) -> Response:
# Generar o extraer request_id del header
request_id = request.headers.get(
"X-Request-ID", str(uuid.uuid4())
)
# Bind de variables al contexto de structlog
# Todos los logs dentro de esta request incluiran estos campos
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
request_id=request_id,
method=request.method,
path=request.url.path,
client_ip=request.client.host if request.client else "unknown",
)
start_time = time.perf_counter()
logger.info(
"request_started",
query_params=str(request.query_params),
user_agent=request.headers.get("user-agent", ""),
)
try:
response = await call_next(request)
duration_ms = (time.perf_counter() - start_time) * 1000
logger.info(
"request_completed",
status_code=response.status_code,
duration_ms=round(duration_ms, 2),
)
# Propagar request_id en la respuesta
response.headers["X-Request-ID"] = request_id
return response
except Exception as exc:
duration_ms = (time.perf_counter() - start_time) * 1000
logger.error(
"request_failed",
error=str(exc),
error_type=type(exc).__name__,
duration_ms=round(duration_ms, 2),
exc_info=True,
)
raise Registrar el middleware en la aplicacion
# app/main.py
from app.logging_config import setup_logging
from app.middleware.logging_middleware import RequestLoggingMiddleware
# Configurar logging ANTES de crear la app
setup_logging()
app = FastAPI(title="Cookbooks API")
app.add_middleware(RequestLoggingMiddleware) Uso en servicios y repositorios
# app/services/user_service.py
import structlog
logger = structlog.get_logger()
async def create_user(db: AsyncSession, user_in: UserCreate) -> User:
logger.info("creating_user", email=user_in.email, role=user_in.role)
existing = await db.execute(
select(User).where(User.email == user_in.email)
)
if existing.scalar_one_or_none():
logger.warning("user_email_conflict", email=user_in.email)
raise HTTPException(status_code=409, detail="Email already registered")
user = User(**user_in.model_dump())
db.add(user)
await db.commit()
logger.info("user_created", user_id=user.id, email=user.email)
return user Ejemplo de output JSON en produccion
{
"event": "request_completed",
"level": "info",
"logger": "app.middleware.logging_middleware",
"timestamp": "2025-06-15T14:23:01.456789Z",
"request_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"method": "GET",
"path": "/api/v1/users/42",
"client_ip": "192.168.1.100",
"status_code": 200,
"duration_ms": 12.45
} Niveles de log y cuando usarlos
| Nivel | Uso | Ejemplos | Visible en produccion |
|---|---|---|---|
DEBUG | Informacion detallada para diagnostico durante desarrollo | Queries SQL, payloads completos, valores de variables | No (solo en desarrollo) |
INFO | Eventos normales del flujo de la aplicacion | Request completada, usuario creado, cache hit/miss | Si |
WARNING | Situacion inesperada que no impide la operacion | Rate limit cercano, cache miss frecuente, retry de conexion | Si |
ERROR | Error que impide completar una operacion especifica | Fallo en DB, servicio externo caido, validacion inesperada | Si + alerta |
CRITICAL | Error grave que puede comprometer toda la aplicacion | Pool de DB agotado, disco lleno, error de configuracion fatal | Si + alerta inmediata |
Integracion con ELK Stack y Grafana Loki
# docker-compose.logging.yml β Stack de observabilidad
version: "3.8"
services:
# --- Grafana Loki (alternativa ligera a ELK) ---
loki:
image: grafana/loki:2.9.0
ports:
- "3100:3100"
volumes:
- loki-data:/loki
# --- Promtail (recolector de logs para Loki) ---
promtail:
image: grafana/promtail:2.9.0
volumes:
- /var/log:/var/log
- ./promtail-config.yml:/etc/promtail/config.yml
command: -config.file=/etc/promtail/config.yml
# --- Grafana (visualizacion) ---
grafana:
image: grafana/grafana:10.0.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
loki-data: # promtail-config.yml
server:
http_listen_port: 9080
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: fastapi
docker_sd_configs:
- host: unix:///var/run/docker.sock
relabel_configs:
- source_labels: ['__meta_docker_container_name']
target_label: 'container'
pipeline_stages:
- json:
expressions:
level: level
request_id: request_id
event: event
- labels:
level:
event: Correlacion de logs distribuidos
El campo request_id es la clave para rastrear una peticion a traves de multiples servicios. Propaga el header X-Request-ID en llamadas entre microservicios para mantener la trazabilidad completa. En Grafana, filtra por request_id para ver todos los logs de una peticion.
Rendimiento del logging
structlog con cache_logger_on_first_use=True evita reconstruir el pipeline de procesadores en cada llamada. En produccion, usa nivel INFO como minimo para evitar el overhead de mensajes DEBUG. El formato JSON agrega ~5% de overhead comparado con texto plano, pero la capacidad de busqueda lo compensa.