Agents in Containers
GuΓa para containerizar agentes de IA autΓ³nomos. Cubre la arquitectura de un agente como microservicio, comunicaciΓ³n via mensajerΓa, persistencia de estado, health checks y escalado horizontal en Kubernetes.
Agent Architecture
Un agente de IA en producciΓ³n es un microservicio que recibe tareas, razona, ejecuta tools y devuelve resultados. Al containerizarlo, lo tratamos como cualquier otro servicio: desplegable, escalable y observable.
Arquitectura de un agente containerizado
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Agent Container β
β β
β ββββββββββββββββββ ββββββββββββββββ βββββββββββββββ β
β β Agent Core β β Tool Registryβ β Memory/Stateβ β
β β (reasoning) β β β β β β
β β LLM calls β β - Search β β - Redis β β
β β Planning β β - SQL Query β β - SQLite β β
β β Execution β β - API Call β β - Files β β
β βββββββββ¬βββββββββ ββββββββ¬ββββββββ ββββββββ¬βββββββ β
β β β β β
β βββββββββΌβββββββββββββββββββΌβββββββββββββββββββΌβββββββ β
β β FastAPI / gRPC Server β β
β β /task /status /health β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββ β
ββββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββΌβββββββββββββββββ
β β β
ββββββΌββββββ βββββββΌβββββ ββββββββΌβββββββ
β RabbitMQ β β LLM API β β PostgreSQL β
β (tasks) β β(OpenAI/ β β (results) β
β β β Ollama) β β β
ββββββββββββ ββββββββββββ βββββββββββββββ Containerizing an Agent
El Dockerfile del agente incluye el runtime de Python, las dependencias del framework de agentes, y configura un servidor HTTP para recibir tareas.
# Dockerfile para un agente de IA
FROM python:3.12-slim AS base
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
WORKDIR /agent
# Copiar dependencies primero (cache-friendly)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copiar cΓ³digo del agente
COPY . .
# Crear usuario no-root
RUN useradd -r agent && chown -R agent:agent /agent
USER agent
EXPOSE 8080
HEALTHCHECK --interval=30s --timeout=5s \
CMD curl -f http://localhost:8080/health || exit 1
CMD ["uvicorn", "agent.server:app", "--host", "0.0.0.0", "--port", "8080"] Servidor del agente
# agent/server.py
import asyncio
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from agent.core import AgentRunner
app = FastAPI(title="AI Agent Service")
runner = AgentRunner()
class TaskRequest(BaseModel):
task_id: str
prompt: str
context: dict = {}
max_iterations: int = 10
class TaskResponse(BaseModel):
task_id: str
status: str
result: str | None = None
@app.post("/task", response_model=TaskResponse)
async def submit_task(req: TaskRequest, bg: BackgroundTasks):
"""Enviar tarea al agente (async, devuelve inmediatamente)."""
bg.add_task(runner.execute, req.task_id, req.prompt, req.context)
return TaskResponse(task_id=req.task_id, status="accepted")
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
return runner.get_status(task_id)
@app.get("/health")
async def health():
return {"status": "healthy", "agent": runner.name} Message-driven Communication
Para sistemas multi-agente, la comunicaciΓ³n por colas de mensajes (RabbitMQ, Redis Streams) desacopla los agentes y permite escalado independiente.
# agent/consumer.py β Consumir tareas desde RabbitMQ
import aio_pika
import json
from agent.core import AgentRunner
async def start_consumer():
connection = await aio_pika.connect_robust(
"amqp://user:pass@rabbitmq:5672/"
)
channel = await connection.channel()
await channel.set_qos(prefetch_count=1) # Procesar 1 tarea a la vez
queue = await channel.declare_queue("agent-tasks", durable=True)
runner = AgentRunner()
async def process_message(message: aio_pika.IncomingMessage):
async with message.process():
task = json.loads(message.body)
result = await runner.execute(
task["task_id"],
task["prompt"],
task.get("context", {}),
)
# Publicar resultado en cola de respuestas
await channel.default_exchange.publish(
aio_pika.Message(json.dumps(result).encode()),
routing_key="agent-results",
)
await queue.consume(process_message) K8s Deployment
Desplegar agentes en Kubernetes permite escalar horizontalmente. Cada agente es un pod que consume tareas de la cola y puede autoescalar segΓΊn la carga.
# agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-agent
spec:
replicas: 3
selector:
matchLabels:
app: ai-agent
template:
spec:
containers:
- name: agent
image: registry.io/ai-agent:1.0.0
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "1000m"
memory: "1Gi"
env:
- name: LLM_API_KEY
valueFrom:
secretKeyRef:
name: llm-secrets
key: api-key
- name: RABBITMQ_URL
value: "amqp://user:pass@rabbitmq:5672/"
livenessProbe:
httpGet:
path: /health
port: 8080
---
# HPA β Autoescalar basado en la longitud de la cola
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: agent-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: ai-agent
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70 KEDA para queue-based scaling
Usa KEDA (Kubernetes Event-Driven Autoscaling) para escalar basado en la longitud de la cola RabbitMQ: mΓ‘s mensajes pendientes β mΓ‘s pods agente.
Observability
Los agentes de IA necesitan observabilidad especial: ademΓ‘s de mΓ©tricas estΓ‘ndar, necesitamos tracking de tokens consumidos, latency por tool y trazas de reasoning.
| MΓ©trica | Tipo | DescripciΓ³n |
|---|---|---|
agent_tasks_total | Counter | Total de tareas procesadas |
agent_task_duration_seconds | Histogram | DuraciΓ³n de la ejecuciΓ³n completa |
agent_llm_tokens_total | Counter | Tokens consumidos (input + output) |
agent_tool_calls_total | Counter | Llamadas por tool |
agent_errors_total | Counter | Errores por tipo |
agent_iterations | Histogram | NΓΊmero de pasos de reasoning por tarea |
Structured logging
Usa JSON logging para que Loki/EFK pueda filtrar por
task_id, agent_name, tool_name y
llm_model. Cada iteraciΓ³n del agente debe loguear su paso de
reasoning para depuraciΓ³n.
Multi-Agent Orchestration
Cuando el sistema crece a mΓΊltiples agentes especializados, se necesita un orquestador que reciba las tareas, analice su naturaleza y las enrute al agente adecuado. Este patrΓ³n evita tener un βsuper-agenteβ monolΓtico y permite escalar cada especialista de forma independiente.
Arquitectura de orquestaciΓ³n multi-agente
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Orchestrator Agent β
β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 1. Recibe tarea β β
β β 2. Clasifica intent β {code_review, data, devops} β β
β β 3. Consulta registry de agentes disponibles β β
β β 4. Enruta al especialista (via Redis pub/sub) β β
β β 5. Agrega resultados β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
ββββββββββββ¬βββββββββββββββ¬βββββββββββββββ¬ββββββββββββββββββββββ
β β β
ββββββββΌβββββββββββββββΌβββββββββββββββΌβββββββ
β Code Review ββ Data Agent ββ DevOps β
β Agent ββ ββ Agent β
β (3 replicas)ββ(2 replicas) ββ(2 replicas) β
ββββββββ¬βββββββββββββββ¬βββββββββββββββ¬βββββββ
β β β
ββββββββΌβββββββββββββββΌβββββββββββββββΌβββββββ
β Redis (shared state + pub/sub) β
βββββββββββββββββββββββββββββββββββββββββββββ Estado compartido via Redis
Los agentes necesitan compartir estado para coordinar tareas complejas que involucran a varios especialistas. Redis funciona como el almacΓ©n de estado compartido, cola de mensajes y sistema de descubrimiento.
# orchestrator/shared_state.py
import redis.asyncio as redis
import json
from datetime import datetime, timedelta
class SharedAgentState:
"""Estado compartido entre agentes via Redis."""
def __init__(self, redis_url: str = "redis://redis:6379"):
self.redis = redis.from_url(redis_url, decode_responses=True)
async def register_agent(self, agent_id: str, capabilities: list[str]):
"""Registrar un agente especialista con sus capacidades."""
agent_info = {
"agent_id": agent_id,
"capabilities": capabilities,
"status": "available",
"registered_at": datetime.utcnow().isoformat(),
}
# Hash con info del agente
await self.redis.hset(f"agent:{agent_id}", mapping={
k: json.dumps(v) if isinstance(v, list) else v
for k, v in agent_info.items()
})
# Γndice de capacidades β agentes
for cap in capabilities:
await self.redis.sadd(f"capability:{cap}", agent_id)
# TTL como heartbeat β el agente renueva cada 30s
await self.redis.expire(f"agent:{agent_id}", 60)
async def find_agents_for_task(self, required_capability: str) -> list[str]:
"""Encontrar agentes disponibles con la capacidad requerida."""
agent_ids = await self.redis.smembers(f"capability:{required_capability}")
available = []
for agent_id in agent_ids:
status = await self.redis.hget(f"agent:{agent_id}", "status")
if status == "available":
available.append(agent_id)
return available
async def set_task_result(self, task_id: str, agent_id: str, result: dict):
"""Almacenar resultado de una sub-tarea."""
await self.redis.hset(f"task:{task_id}:results", agent_id, json.dumps(result))
await self.redis.expire(f"task:{task_id}:results", 3600)
async def get_all_results(self, task_id: str) -> dict:
"""Obtener todos los resultados parciales de una tarea."""
raw = await self.redis.hgetall(f"task:{task_id}:results")
return {k: json.loads(v) for k, v in raw.items()} Circuit breaker para APIs de LLM
Las APIs de LLM son el punto de fallo mas critico. Un circuit breaker evita cascadas de errores cuando la API esta caida, abriendo el circuito tras N fallos consecutivos y redirigiendo a un modelo de fallback.
# orchestrator/circuit_breaker.py
import asyncio
import time
from enum import Enum
from dataclasses import dataclass, field
class CircuitState(Enum):
CLOSED = "closed" # Funcionando normal
OPEN = "open" # Cortado β no envΓa requests
HALF_OPEN = "half_open" # Probando si se recuperΓ³
@dataclass
class CircuitBreaker:
"""Circuit breaker para llamadas a APIs de LLM."""
failure_threshold: int = 5 # Fallos para abrir circuito
recovery_timeout: float = 60.0 # Segundos antes de probar recovery
half_open_max_calls: int = 2 # Calls en half-open para validar
state: CircuitState = field(default=CircuitState.CLOSED)
failure_count: int = field(default=0)
last_failure_time: float = field(default=0.0)
half_open_calls: int = field(default=0)
async def call(self, func, *args, **kwargs):
"""Ejecutar funciΓ³n protegida por circuit breaker."""
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise CircuitBreakerOpen(
f"Circuito abierto. Reintentando en "
f"{self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
)
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
if self.state == CircuitState.HALF_OPEN:
self.half_open_calls += 1
if self.half_open_calls >= self.half_open_max_calls:
self.state = CircuitState.CLOSED
self.failure_count = 0
else:
self.failure_count = 0
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
class CircuitBreakerOpen(Exception):
pass Orquestador completo con enrutamiento inteligente
El orquestador actua como punto de entrada unico: clasifica cada tarea, la enruta al agente especialista correcto, espera los resultados y los agrega en una respuesta unificada.
# orchestrator/main.py
import asyncio
import json
import redis.asyncio as redis
from fastapi import FastAPI
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from orchestrator.shared_state import SharedAgentState
from orchestrator.circuit_breaker import CircuitBreaker, CircuitBreakerOpen
app = FastAPI(title="Agent Orchestrator")
state = SharedAgentState()
llm_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
# LLM para clasificaciΓ³n (modelo rΓ‘pido y barato)
classifier_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# LLM de fallback si el principal falla
fallback_llm = ChatOpenAI(
model="gpt-4o-mini",
base_url="http://ollama:11434/v1", # Ollama local como fallback
api_key="not-needed",
)
class OrchestratorRequest(BaseModel):
task_id: str
prompt: str
context: dict = {}
# ββ Clasificar intent de la tarea ββ
async def classify_task(prompt: str) -> str:
"""Usar LLM rΓ‘pido para determinar quΓ© agente necesita esta tarea."""
classification_prompt = f"""Clasifica esta tarea en UNA categorΓa:
- code_review: revisiΓ³n de cΓ³digo, bugs, mejoras
- data_analysis: consultas SQL, anΓ‘lisis de datos, reportes
- devops: infraestructura, deployment, monitoring
- general: cualquier otra cosa
Tarea: {prompt}
Responde SOLO con la categorΓa, sin explicaciΓ³n."""
try:
response = await llm_breaker.call(
classifier_llm.ainvoke, classification_prompt
)
return response.content.strip().lower()
except CircuitBreakerOpen:
# Fallback a modelo local si la API principal estΓ‘ caΓda
response = await fallback_llm.ainvoke(classification_prompt)
return response.content.strip().lower()
# ββ Enrutar tarea al agente especialista ββ
async def route_to_specialist(task_id: str, category: str, prompt: str, context: dict):
"""Publicar tarea en el canal Redis del agente especialista."""
r = redis.from_url("redis://redis:6379", decode_responses=True)
# Buscar agentes disponibles para esta capacidad
agents = await state.find_agents_for_task(category)
if not agents:
raise ValueError(f"No hay agentes disponibles para: {category}")
# Publicar tarea en canal del tipo de agente
task_payload = json.dumps({
"task_id": task_id,
"prompt": prompt,
"context": context,
"assigned_to": agents[0], # Round-robin simple
})
await r.publish(f"tasks:{category}", task_payload)
return agents[0]
# ββ Endpoint principal del orquestador ββ
@app.post("/orchestrate")
async def orchestrate(req: OrchestratorRequest):
"""Recibe tarea, clasifica, enruta y devuelve resultado."""
# 1. Clasificar
category = await classify_task(req.prompt)
# 2. Enrutar al especialista
try:
assigned_agent = await route_to_specialist(
req.task_id, category, req.prompt, req.context
)
except ValueError as e:
return {"task_id": req.task_id, "status": "no_agent", "error": str(e)}
# 3. Esperar resultado (con timeout)
try:
result = await asyncio.wait_for(
wait_for_result(req.task_id, assigned_agent),
timeout=120.0,
)
return {"task_id": req.task_id, "status": "completed", "result": result}
except asyncio.TimeoutError:
return {"task_id": req.task_id, "status": "timeout"}
async def wait_for_result(task_id: str, agent_id: str) -> dict:
"""Polling de resultados hasta que el agente complete."""
while True:
results = await state.get_all_results(task_id)
if agent_id in results:
return results[agent_id]
await asyncio.sleep(1.0)
# ββ Health check con estado de agentes ββ
@app.get("/health")
async def health():
r = redis.from_url("redis://redis:6379", decode_responses=True)
agent_keys = [k async for k in r.scan_iter("agent:*")]
return {
"status": "healthy",
"circuit_breaker": llm_breaker.state.value,
"registered_agents": len(agent_keys),
} Docker Compose para el sistema multi-agente
# docker-compose.yaml
services:
orchestrator:
build: ./orchestrator
ports:
- "8080:8080"
environment:
- REDIS_URL=redis://redis:6379
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- redis
code-review-agent:
build: ./agents/code-review
deploy:
replicas: 3
environment:
- REDIS_URL=redis://redis:6379
- AGENT_CAPABILITIES=code_review
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- redis
data-agent:
build: ./agents/data-analysis
deploy:
replicas: 2
environment:
- REDIS_URL=redis://redis:6379
- AGENT_CAPABILITIES=data_analysis
- DATABASE_URL=postgresql://user:pass@postgres:5432/analytics
devops-agent:
build: ./agents/devops
deploy:
replicas: 2
environment:
- REDIS_URL=redis://redis:6379
- AGENT_CAPABILITIES=devops
- KUBECONFIG=/etc/kube/config
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
ollama:
image: ollama/ollama:latest
ports:
- "11434:11434"
volumes:
- ollama-models:/root/.ollama
volumes:
redis-data:
ollama-models: | PatrΓ³n | CuΓ‘ndo usarlo | Complejidad |
|---|---|---|
| Orquestador centralizado | Cuando el flujo de tareas es predecible y hay pocos tipos de agentes | Media |
| Service mesh (Istio/Linkerd) | Cuando los agentes necesitan comunicarse directamente entre sΓ con mTLS | Alta |
| Event-driven (Redis pub/sub) | Cuando los agentes son independientes y procesan eventos asΓncronamente | Baja |
| JerΓ‘rquico (manager agent) | Cuando las tareas requieren descomposiciΓ³n dinΓ‘mica en sub-tareas | Alta |
Circuit breaker es obligatorio
Sin circuit breaker, un fallo en la API de OpenAI puede derribar todo el sistema multi-agente. Cada agente y el orquestador deben tener su propio circuit breaker configurado. Usa un modelo local (Ollama) como fallback para mantener disponibilidad parcial cuando la API externa estΓ‘ caΓda.