Docs / Agentic AI / Agents in Containers

Agents in Containers

GuΓ­a para containerizar agentes de IA autΓ³nomos. Cubre la arquitectura de un agente como microservicio, comunicaciΓ³n via mensajerΓ­a, persistencia de estado, health checks y escalado horizontal en Kubernetes.

Agent Architecture

Un agente de IA en producciΓ³n es un microservicio que recibe tareas, razona, ejecuta tools y devuelve resultados. Al containerizarlo, lo tratamos como cualquier otro servicio: desplegable, escalable y observable.

text
Arquitectura de un agente containerizado

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Agent Container                        β”‚
β”‚                                                          β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚  Agent Core    β”‚  β”‚ Tool Registryβ”‚  β”‚ Memory/Stateβ”‚  β”‚
β”‚  β”‚  (reasoning)   β”‚  β”‚              β”‚  β”‚             β”‚  β”‚
β”‚  β”‚  LLM calls     β”‚  β”‚  - Search    β”‚  β”‚  - Redis    β”‚  β”‚
β”‚  β”‚  Planning      β”‚  β”‚  - SQL Query β”‚  β”‚  - SQLite   β”‚  β”‚
β”‚  β”‚  Execution     β”‚  β”‚  - API Call  β”‚  β”‚  - Files    β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚          β”‚                  β”‚                  β”‚         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚              FastAPI / gRPC Server                  β”‚  β”‚
β”‚  β”‚         /task    /status    /health                 β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                           β”‚
          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
          β”‚                β”‚                β”‚
     β”Œβ”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”
     β”‚ RabbitMQ β”‚   β”‚ LLM API  β”‚   β”‚ PostgreSQL β”‚
     β”‚ (tasks)  β”‚   β”‚(OpenAI/  β”‚   β”‚  (results)  β”‚
     β”‚          β”‚   β”‚ Ollama)  β”‚   β”‚             β”‚
     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Containerizing an Agent

El Dockerfile del agente incluye el runtime de Python, las dependencias del framework de agentes, y configura un servidor HTTP para recibir tareas.

dockerfile
# Dockerfile para un agente de IA
FROM python:3.12-slim AS base

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

WORKDIR /agent

# Copiar dependencies primero (cache-friendly)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copiar cΓ³digo del agente
COPY . .

# Crear usuario no-root
RUN useradd -r agent && chown -R agent:agent /agent
USER agent

EXPOSE 8080

HEALTHCHECK --interval=30s --timeout=5s \
  CMD curl -f http://localhost:8080/health || exit 1

CMD ["uvicorn", "agent.server:app", "--host", "0.0.0.0", "--port", "8080"]

Servidor del agente

python
# agent/server.py
import asyncio
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from agent.core import AgentRunner

app = FastAPI(title="AI Agent Service")
runner = AgentRunner()

class TaskRequest(BaseModel):
    task_id: str
    prompt: str
    context: dict = {}
    max_iterations: int = 10

class TaskResponse(BaseModel):
    task_id: str
    status: str
    result: str | None = None

@app.post("/task", response_model=TaskResponse)
async def submit_task(req: TaskRequest, bg: BackgroundTasks):
    """Enviar tarea al agente (async, devuelve inmediatamente)."""
    bg.add_task(runner.execute, req.task_id, req.prompt, req.context)
    return TaskResponse(task_id=req.task_id, status="accepted")

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    return runner.get_status(task_id)

@app.get("/health")
async def health():
    return {"status": "healthy", "agent": runner.name}

Message-driven Communication

Para sistemas multi-agente, la comunicaciΓ³n por colas de mensajes (RabbitMQ, Redis Streams) desacopla los agentes y permite escalado independiente.

python
# agent/consumer.py β€” Consumir tareas desde RabbitMQ
import aio_pika
import json
from agent.core import AgentRunner

async def start_consumer():
    connection = await aio_pika.connect_robust(
        "amqp://user:pass@rabbitmq:5672/"
    )
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)  # Procesar 1 tarea a la vez

    queue = await channel.declare_queue("agent-tasks", durable=True)
    runner = AgentRunner()

    async def process_message(message: aio_pika.IncomingMessage):
        async with message.process():
            task = json.loads(message.body)
            result = await runner.execute(
                task["task_id"],
                task["prompt"],
                task.get("context", {}),
            )
            # Publicar resultado en cola de respuestas
            await channel.default_exchange.publish(
                aio_pika.Message(json.dumps(result).encode()),
                routing_key="agent-results",
            )

    await queue.consume(process_message)

K8s Deployment

Desplegar agentes en Kubernetes permite escalar horizontalmente. Cada agente es un pod que consume tareas de la cola y puede autoescalar segΓΊn la carga.

yaml
# agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ai-agent
  template:
    spec:
      containers:
        - name: agent
          image: registry.io/ai-agent:1.0.0
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1Gi"
          env:
            - name: LLM_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-secrets
                  key: api-key
            - name: RABBITMQ_URL
              value: "amqp://user:pass@rabbitmq:5672/"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
---
# HPA β€” Autoescalar basado en la longitud de la cola
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-agent
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

KEDA para queue-based scaling

Usa KEDA (Kubernetes Event-Driven Autoscaling) para escalar basado en la longitud de la cola RabbitMQ: mΓ‘s mensajes pendientes β†’ mΓ‘s pods agente.

Observability

Los agentes de IA necesitan observabilidad especial: ademΓ‘s de mΓ©tricas estΓ‘ndar, necesitamos tracking de tokens consumidos, latency por tool y trazas de reasoning.

MΓ©tricaTipoDescripciΓ³n
agent_tasks_totalCounterTotal de tareas procesadas
agent_task_duration_secondsHistogramDuraciΓ³n de la ejecuciΓ³n completa
agent_llm_tokens_totalCounterTokens consumidos (input + output)
agent_tool_calls_totalCounterLlamadas por tool
agent_errors_totalCounterErrores por tipo
agent_iterationsHistogramNΓΊmero de pasos de reasoning por tarea

Structured logging

Usa JSON logging para que Loki/EFK pueda filtrar por task_id, agent_name, tool_name y llm_model. Cada iteraciΓ³n del agente debe loguear su paso de reasoning para depuraciΓ³n.

Multi-Agent Orchestration

Cuando el sistema crece a mΓΊltiples agentes especializados, se necesita un orquestador que reciba las tareas, analice su naturaleza y las enrute al agente adecuado. Este patrΓ³n evita tener un β€œsuper-agente” monolΓ­tico y permite escalar cada especialista de forma independiente.

text
Arquitectura de orquestaciΓ³n multi-agente

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                     Orchestrator Agent                        β”‚
β”‚                                                              β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚  1. Recibe tarea                                       β”‚  β”‚
β”‚  β”‚  2. Clasifica intent β†’ {code_review, data, devops}     β”‚  β”‚
β”‚  β”‚  3. Consulta registry de agentes disponibles           β”‚  β”‚
β”‚  β”‚  4. Enruta al especialista (via Redis pub/sub)         β”‚  β”‚
β”‚  β”‚  5. Agrega resultados                                  β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
           β”‚              β”‚              β”‚
    β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”
    β”‚ Code Review β”‚β”‚ Data Agent  β”‚β”‚ DevOps      β”‚
    β”‚ Agent       β”‚β”‚             β”‚β”‚ Agent       β”‚
    β”‚ (3 replicas)β”‚β”‚(2 replicas) β”‚β”‚(2 replicas) β”‚
    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
           β”‚              β”‚              β”‚
    β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”
    β”‚          Redis (shared state + pub/sub)    β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Estado compartido via Redis

Los agentes necesitan compartir estado para coordinar tareas complejas que involucran a varios especialistas. Redis funciona como el almacΓ©n de estado compartido, cola de mensajes y sistema de descubrimiento.

python
# orchestrator/shared_state.py
import redis.asyncio as redis
import json
from datetime import datetime, timedelta

class SharedAgentState:
    """Estado compartido entre agentes via Redis."""

    def __init__(self, redis_url: str = "redis://redis:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)

    async def register_agent(self, agent_id: str, capabilities: list[str]):
        """Registrar un agente especialista con sus capacidades."""
        agent_info = {
            "agent_id": agent_id,
            "capabilities": capabilities,
            "status": "available",
            "registered_at": datetime.utcnow().isoformat(),
        }
        # Hash con info del agente
        await self.redis.hset(f"agent:{agent_id}", mapping={
            k: json.dumps(v) if isinstance(v, list) else v
            for k, v in agent_info.items()
        })
        # Índice de capacidades β†’ agentes
        for cap in capabilities:
            await self.redis.sadd(f"capability:{cap}", agent_id)
        # TTL como heartbeat β€” el agente renueva cada 30s
        await self.redis.expire(f"agent:{agent_id}", 60)

    async def find_agents_for_task(self, required_capability: str) -> list[str]:
        """Encontrar agentes disponibles con la capacidad requerida."""
        agent_ids = await self.redis.smembers(f"capability:{required_capability}")
        available = []
        for agent_id in agent_ids:
            status = await self.redis.hget(f"agent:{agent_id}", "status")
            if status == "available":
                available.append(agent_id)
        return available

    async def set_task_result(self, task_id: str, agent_id: str, result: dict):
        """Almacenar resultado de una sub-tarea."""
        await self.redis.hset(f"task:{task_id}:results", agent_id, json.dumps(result))
        await self.redis.expire(f"task:{task_id}:results", 3600)

    async def get_all_results(self, task_id: str) -> dict:
        """Obtener todos los resultados parciales de una tarea."""
        raw = await self.redis.hgetall(f"task:{task_id}:results")
        return {k: json.loads(v) for k, v in raw.items()}

Circuit breaker para APIs de LLM

Las APIs de LLM son el punto de fallo mas critico. Un circuit breaker evita cascadas de errores cuando la API esta caida, abriendo el circuito tras N fallos consecutivos y redirigiendo a un modelo de fallback.

python
# orchestrator/circuit_breaker.py
import asyncio
import time
from enum import Enum
from dataclasses import dataclass, field

class CircuitState(Enum):
    CLOSED = "closed"       # Funcionando normal
    OPEN = "open"           # Cortado β€” no envΓ­a requests
    HALF_OPEN = "half_open" # Probando si se recuperΓ³

@dataclass
class CircuitBreaker:
    """Circuit breaker para llamadas a APIs de LLM."""
    failure_threshold: int = 5          # Fallos para abrir circuito
    recovery_timeout: float = 60.0      # Segundos antes de probar recovery
    half_open_max_calls: int = 2        # Calls en half-open para validar

    state: CircuitState = field(default=CircuitState.CLOSED)
    failure_count: int = field(default=0)
    last_failure_time: float = field(default=0.0)
    half_open_calls: int = field(default=0)

    async def call(self, func, *args, **kwargs):
        """Ejecutar funciΓ³n protegida por circuit breaker."""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitBreakerOpen(
                    f"Circuito abierto. Reintentando en "
                    f"{self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
                )

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

class CircuitBreakerOpen(Exception):
    pass

Orquestador completo con enrutamiento inteligente

El orquestador actua como punto de entrada unico: clasifica cada tarea, la enruta al agente especialista correcto, espera los resultados y los agrega en una respuesta unificada.

python
# orchestrator/main.py
import asyncio
import json
import redis.asyncio as redis
from fastapi import FastAPI
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from orchestrator.shared_state import SharedAgentState
from orchestrator.circuit_breaker import CircuitBreaker, CircuitBreakerOpen

app = FastAPI(title="Agent Orchestrator")
state = SharedAgentState()
llm_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

# LLM para clasificaciΓ³n (modelo rΓ‘pido y barato)
classifier_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# LLM de fallback si el principal falla
fallback_llm = ChatOpenAI(
    model="gpt-4o-mini",
    base_url="http://ollama:11434/v1",  # Ollama local como fallback
    api_key="not-needed",
)

class OrchestratorRequest(BaseModel):
    task_id: str
    prompt: str
    context: dict = {}

# ── Clasificar intent de la tarea ──
async def classify_task(prompt: str) -> str:
    """Usar LLM rΓ‘pido para determinar quΓ© agente necesita esta tarea."""
    classification_prompt = f"""Clasifica esta tarea en UNA categorΓ­a:
    - code_review: revisiΓ³n de cΓ³digo, bugs, mejoras
    - data_analysis: consultas SQL, anΓ‘lisis de datos, reportes
    - devops: infraestructura, deployment, monitoring
    - general: cualquier otra cosa

    Tarea: {prompt}

    Responde SOLO con la categorΓ­a, sin explicaciΓ³n."""

    try:
        response = await llm_breaker.call(
            classifier_llm.ainvoke, classification_prompt
        )
        return response.content.strip().lower()
    except CircuitBreakerOpen:
        # Fallback a modelo local si la API principal estΓ‘ caΓ­da
        response = await fallback_llm.ainvoke(classification_prompt)
        return response.content.strip().lower()

# ── Enrutar tarea al agente especialista ──
async def route_to_specialist(task_id: str, category: str, prompt: str, context: dict):
    """Publicar tarea en el canal Redis del agente especialista."""
    r = redis.from_url("redis://redis:6379", decode_responses=True)

    # Buscar agentes disponibles para esta capacidad
    agents = await state.find_agents_for_task(category)
    if not agents:
        raise ValueError(f"No hay agentes disponibles para: {category}")

    # Publicar tarea en canal del tipo de agente
    task_payload = json.dumps({
        "task_id": task_id,
        "prompt": prompt,
        "context": context,
        "assigned_to": agents[0],  # Round-robin simple
    })
    await r.publish(f"tasks:{category}", task_payload)
    return agents[0]

# ── Endpoint principal del orquestador ──
@app.post("/orchestrate")
async def orchestrate(req: OrchestratorRequest):
    """Recibe tarea, clasifica, enruta y devuelve resultado."""
    # 1. Clasificar
    category = await classify_task(req.prompt)

    # 2. Enrutar al especialista
    try:
        assigned_agent = await route_to_specialist(
            req.task_id, category, req.prompt, req.context
        )
    except ValueError as e:
        return {"task_id": req.task_id, "status": "no_agent", "error": str(e)}

    # 3. Esperar resultado (con timeout)
    try:
        result = await asyncio.wait_for(
            wait_for_result(req.task_id, assigned_agent),
            timeout=120.0,
        )
        return {"task_id": req.task_id, "status": "completed", "result": result}
    except asyncio.TimeoutError:
        return {"task_id": req.task_id, "status": "timeout"}

async def wait_for_result(task_id: str, agent_id: str) -> dict:
    """Polling de resultados hasta que el agente complete."""
    while True:
        results = await state.get_all_results(task_id)
        if agent_id in results:
            return results[agent_id]
        await asyncio.sleep(1.0)

# ── Health check con estado de agentes ──
@app.get("/health")
async def health():
    r = redis.from_url("redis://redis:6379", decode_responses=True)
    agent_keys = [k async for k in r.scan_iter("agent:*")]
    return {
        "status": "healthy",
        "circuit_breaker": llm_breaker.state.value,
        "registered_agents": len(agent_keys),
    }

Docker Compose para el sistema multi-agente

yaml
# docker-compose.yaml
services:
  orchestrator:
    build: ./orchestrator
    ports:
      - "8080:8080"
    environment:
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis

  code-review-agent:
    build: ./agents/code-review
    deploy:
      replicas: 3
    environment:
      - REDIS_URL=redis://redis:6379
      - AGENT_CAPABILITIES=code_review
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis

  data-agent:
    build: ./agents/data-analysis
    deploy:
      replicas: 2
    environment:
      - REDIS_URL=redis://redis:6379
      - AGENT_CAPABILITIES=data_analysis
      - DATABASE_URL=postgresql://user:pass@postgres:5432/analytics

  devops-agent:
    build: ./agents/devops
    deploy:
      replicas: 2
    environment:
      - REDIS_URL=redis://redis:6379
      - AGENT_CAPABILITIES=devops
      - KUBECONFIG=/etc/kube/config

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama-models:/root/.ollama

volumes:
  redis-data:
  ollama-models:
PatrΓ³nCuΓ‘ndo usarloComplejidad
Orquestador centralizadoCuando el flujo de tareas es predecible y hay pocos tipos de agentesMedia
Service mesh (Istio/Linkerd)Cuando los agentes necesitan comunicarse directamente entre sΓ­ con mTLSAlta
Event-driven (Redis pub/sub)Cuando los agentes son independientes y procesan eventos asΓ­ncronamenteBaja
JerΓ‘rquico (manager agent)Cuando las tareas requieren descomposiciΓ³n dinΓ‘mica en sub-tareasAlta

Circuit breaker es obligatorio

Sin circuit breaker, un fallo en la API de OpenAI puede derribar todo el sistema multi-agente. Cada agente y el orquestador deben tener su propio circuit breaker configurado. Usa un modelo local (Ollama) como fallback para mantener disponibilidad parcial cuando la API externa estΓ‘ caΓ­da.

END OF DOCUMENT

ΒΏNecesitas mΓ‘s? Volver a la LibrerΓ­a →