Docs / Agentic AI / CrewAI Multi-Agent

CrewAI Multi-Agent

Guía para construir sistemas multi-agente con CrewAI. Cubre la definición de agentes especializados, tareas, herramientas, flujos de trabajo secuenciales e jerárquicos, y memoria compartida.

CrewAI Overview

CrewAI permite crear equipos de agentes de IA que colaboran para resolver tareas complejas. Cada agente tiene un rol, objetivo, backstory y herramientas específicas.

text
Arquitectura CrewAI

┌──────────────────────────────────────────────────────────┐
│                       CREW                               │
│                                                          │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │ Agent        │  │ Agent        │  │ Agent        │   │
│  │ Researcher   │  │ Writer       │  │ Reviewer     │   │
│  │              │  │              │  │              │   │
│  │ Tools:       │  │ Tools:       │  │ Tools:       │   │
│  │ - Search     │  │ - FileWrite  │  │ - CodeExec   │   │
│  │ - Scrape     │  │ - Format     │  │ - Lint       │   │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘   │
│         │                 │                  │           │
│  ┌──────▼─────────────────▼──────────────────▼───────┐   │
│  │              Task Pipeline                          │   │
│  │  Task 1 (Research) → Task 2 (Write) → Task 3 (QA)│   │
│  └───────────────────────────────────────────────────┘   │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │  Process: sequential | hierarchical                  │  │
│  │  Memory:  shared context between agents              │  │
│  │  LLM:    GPT-4o / Llama / Mixtral                    │  │
│  └────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘
bash
# Instalación
pip install crewai crewai-tools

Agents & Roles

Cada agente tiene un rol claro, un objetivo, y una backstory que define su personalidad y expertise.

python
from crewai import Agent
from crewai_tools import SerperDevTool, ScrapeWebsiteTool

# ── Agente investigador ──
researcher = Agent(
    role="Senior Tech Researcher",
    goal="Investigar las últimas tendencias y mejores prácticas en {topic}",
    backstory="""Eres un investigador técnico senior con 10 años de experiencia
    analizando tecnologías emergentes. Eres meticuloso y siempre citas fuentes.
    Tu especialidad es separar el hype de lo realmente útil.""",
    tools=[SerperDevTool(), ScrapeWebsiteTool()],
    verbose=True,
    memory=True,
    max_iter=5,
)

# ── Agente escritor ──
writer = Agent(
    role="Technical Writer",
    goal="Crear documentación clara y completa sobre {topic}",
    backstory="""Eres un escritor técnico que transforma investigaciones complejas
    en guías prácticas y bien estructuradas. Priorizas ejemplos de código
    funcionales sobre explicaciones teóricas.""",
    verbose=True,
    memory=True,
)

# ── Agente revisor ──
reviewer = Agent(
    role="Code Reviewer & QA",
    goal="Revisar la documentación para asegurar precisión técnica y calidad",
    backstory="""Eres un ingeniero principal conocido por tu atención al detalle.
    Detectas errores técnicos, inconsistencias y sugieres mejoras.
    Nunca apruebas algo que no esté listo para producción.""",
    verbose=True,
    allow_delegation=True,  # Puede pedir correcciones al writer
)

Backstory matters

La backstory es crucial. Un agente con backstory “eres un junior aprendiendo” produce resultados muy diferentes a “eres un senior con 15 años de experiencia”. Sé específico.

Tasks & Workflows

Las Tasks definen el trabajo a realizar. Se asignan a agentes y se encadenan en pipelines secuenciales o jerárquicos.

python
from crewai import Task, Crew, Process

# ── Definir tareas ──
research_task = Task(
    description="""Investiga {topic}:
    1. Estado actual de la tecnología
    2. Principales frameworks y herramientas
    3. Casos de uso en producción
    4. Mejores prácticas y anti-patterns
    Incluye URLs de fuentes verificadas.""",
    expected_output="Documento de investigación con al menos 5 fuentes citadas",
    agent=researcher,
)

write_task = Task(
    description="""Basándote en la investigación, crea una guía técnica:
    - Estructura clara con secciones y subsecciones
    - Ejemplos de código funcionales (Python)
    - Tabla comparativa de herramientas
    - Sección de troubleshooting""",
    expected_output="Guía técnica lista para publicar en formato Markdown",
    agent=writer,
    context=[research_task],  # Recibe output del researcher
)

review_task = Task(
    description="""Revisa la guía técnica:
    - Verifica que los ejemplos de código sean correctos
    - Asegura consistencia terminológica
    - Valida que las fuentes sean actuales (2024+)
    - Score final del 1 al 10""",
    expected_output="Reporte de revisión con score y lista de correcciones",
    agent=reviewer,
    context=[write_task],
)

# ── Crear y ejecutar crew ──
crew = Crew(
    agents=[researcher, writer, reviewer],
    tasks=[research_task, write_task, review_task],
    process=Process.sequential,     # Ejecutar en orden
    verbose=True,
    memory=True,                    # Memoria compartida entre agentes
)

result = crew.kickoff(inputs={"topic": "Kubernetes Operators"})
print(result)
ProcessDescripciónCaso de uso
sequentialTareas se ejecutan en orden, cada una recibe el output anteriorPipelines lineales (research → write → review)
hierarchicalUn agente manager coordina y delega tareas dinámicamenteTareas complejas con decisiones en runtime

Custom Tools

Las herramientas dan a los agentes acceso al mundo real: APIs, bases de datos, sistemas de archivos, etc. CrewAI soporta tools propios y de LangChain.

python
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import httpx

# ── Tool personalizado: consultar API interna ──
class APIQueryInput(BaseModel):
    endpoint: str = Field(description="Endpoint de la API (ej: /users, /orders)")
    method: str = Field(default="GET", description="HTTP method")

class InternalAPITool(BaseTool):
    name: str = "internal_api"
    description: str = "Consulta la API interna del backend. Usa para obtener datos de producción."
    args_schema = APIQueryInput

    def _run(self, endpoint: str, method: str = "GET") -> str:
        base = "https://api.internal.company.com"
        response = httpx.request(method, f"{base}{endpoint}")
        return response.text

# ── Tool para ejecutar SQL de solo lectura ──
class ReadOnlySQLTool(BaseTool):
    name: str = "sql_query"
    description: str = "Ejecuta queries SQL de solo lectura en la base de datos."

    def _run(self, query: str) -> str:
        if any(kw in query.upper() for kw in ["DROP", "DELETE", "UPDATE", "INSERT"]):
            return "ERROR: Solo queries SELECT permitidas"
        # Ejecutar query segura...
        return execute_query(query)

Production Patterns

Patrones para llevar crews de agentes a producción: gestión de errores, timeouts, costes y observabilidad.

python
from crewai import Crew, Process
import logging

# ── Configuración production-ready ──
crew = Crew(
    agents=[researcher, writer, reviewer],
    tasks=[research_task, write_task, review_task],
    process=Process.sequential,
    verbose=False,        # Sin logs verbosos en prod
    memory=True,
    max_rpm=30,           # Rate limit para API calls
    share_crew=False,      # No compartir telemetría
    full_output=True,      # Devolver output de todas las tasks
)

# ── Wrapper con error handling ──
async def run_crew_safe(topic: str) -> dict:
    try:
        result = crew.kickoff(inputs={"topic": topic})
        return {
            "status": "success",
            "output": result.raw,
            "tasks": [t.output.raw for t in result.tasks_output],
            "token_usage": result.token_usage,
        }
    except Exception as e:
        logging.error(f"Crew failed: {e}")
        return {"status": "error", "error": str(e)}

Costes de API

Un crew con 3 agentes y 3 tasks secuenciales puede hacer 15-30+ llamadas a la API del LLM por ejecución. Monitoriza token_usage y configura max_rpm para controlar costes.

Monitoring & Scaling

Para usar CrewAI en producción, necesitas monitorización de cada ejecución (tokens, tiempo, errores por agente), estimación de costes en tiempo real, y una estrategia de escalado para servir múltiples crews concurrentes desde una API web.

text
Arquitectura de monitorización y escalado

┌────────────────────────────────────────────────────────────┐
│                    FastAPI Web Service                      │
│                                                            │
│  POST /crew/run  ──▶  Celery Task Queue                   │
│  GET  /crew/{id} ──▶  Redis (resultados)                  │
│                                                            │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Callbacks & Monitoring                   │   │
│  │  on_task_start → log + metrics                       │   │
│  │  on_task_end   → tokens + cost + duration            │   │
│  │  on_agent_action → tool_calls tracking               │   │
│  └─────────────────────────────────────────────────────┘   │
└────────────────┬───────────────────────────────────────────┘

    ┌────────────▼────────────┐
    │  Celery Workers (N)     │
    │  Cada worker ejecuta    │
    │  un crew independiente  │
    └────────────┬────────────┘

    ┌────────────▼────────────┐
    │  Redis                  │
    │  - Task queue           │
    │  - Results store        │
    │  - Metrics buffer       │
    └─────────────────────────┘

Callbacks para monitorización detallada

CrewAI emite eventos en cada paso de la ejecución. Los callbacks permiten capturar métricas granulares: tokens por agente, duración por task, herramientas usadas y errores.

python
# monitoring/callbacks.py
import time
import logging
from dataclasses import dataclass, field
from crewai.utilities.events import (
    crewai_event_bus,
    AgentExecutionStarted,
    AgentExecutionCompleted,
    TaskEvaluationStarted,
    TaskEvaluationCompleted,
    ToolUsageStarted,
    ToolUsageFinished,
    ToolUsageError,
    CrewKickoffStarted,
    CrewKickoffCompleted,
)

logger = logging.getLogger("crewai.monitor")

@dataclass
class CrewMetrics:
    """Métricas acumuladas de una ejecución del crew."""
    crew_id: str
    start_time: float = 0.0
    end_time: float = 0.0
    total_tokens: int = 0
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_cost_usd: float = 0.0
    tasks_completed: int = 0
    tasks_failed: int = 0
    tool_calls: int = 0
    tool_errors: int = 0
    agent_metrics: dict = field(default_factory=dict)
    task_durations: list = field(default_factory=list)

    @property
    def total_duration(self) -> float:
        return self.end_time - self.start_time if self.end_time else 0.0

# ── Precios por modelo (USD por 1M tokens) ──
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
}

class CrewMonitor:
    """Monitor de ejecución de crews con callbacks."""

    def __init__(self, crew_id: str, model: str = "gpt-4o"):
        self.metrics = CrewMetrics(crew_id=crew_id)
        self.model = model
        self._task_start_times: dict[str, float] = {}
        self._register_callbacks()

    def _register_callbacks(self):
        """Registrar listeners en el event bus de CrewAI."""

        @crewai_event_bus.on(CrewKickoffStarted)
        def on_crew_start(source, event):
            self.metrics.start_time = time.time()
            logger.info(f"[{self.metrics.crew_id}] Crew iniciado")

        @crewai_event_bus.on(CrewKickoffCompleted)
        def on_crew_complete(source, event):
            self.metrics.end_time = time.time()
            duration = self.metrics.total_duration
            logger.info(
                f"[{self.metrics.crew_id}] Crew completado en {duration:.1f}s | "
                f"Tokens: {self.metrics.total_tokens} | "
                f"Coste: ${self.metrics.total_cost_usd:.4f}"
            )

        @crewai_event_bus.on(AgentExecutionStarted)
        def on_agent_start(source, event):
            agent_role = event.agent.role
            if agent_role not in self.metrics.agent_metrics:
                self.metrics.agent_metrics[agent_role] = {
                    "tokens": 0, "tool_calls": 0, "errors": 0, "duration": 0.0,
                }
            logger.info(f"[{self.metrics.crew_id}] Agente '{agent_role}' iniciado")

        @crewai_event_bus.on(AgentExecutionCompleted)
        def on_agent_complete(source, event):
            agent_role = event.agent.role
            token_usage = getattr(event, "token_usage", None)
            if token_usage:
                prompt_t = token_usage.get("prompt_tokens", 0)
                completion_t = token_usage.get("completion_tokens", 0)
                total_t = prompt_t + completion_t

                self.metrics.prompt_tokens += prompt_t
                self.metrics.completion_tokens += completion_t
                self.metrics.total_tokens += total_t
                self.metrics.agent_metrics[agent_role]["tokens"] += total_t

                # Calcular coste
                pricing = MODEL_PRICING.get(self.model, MODEL_PRICING["gpt-4o"])
                cost = (prompt_t / 1_000_000 * pricing["input"] +
                        completion_t / 1_000_000 * pricing["output"])
                self.metrics.total_cost_usd += cost

        @crewai_event_bus.on(TaskEvaluationStarted)
        def on_task_start(source, event):
            task_desc = event.task.description[:50]
            self._task_start_times[task_desc] = time.time()

        @crewai_event_bus.on(TaskEvaluationCompleted)
        def on_task_complete(source, event):
            task_desc = event.task.description[:50]
            start = self._task_start_times.get(task_desc, time.time())
            duration = time.time() - start
            self.metrics.task_durations.append({
                "task": task_desc,
                "duration": duration,
            })
            self.metrics.tasks_completed += 1

        @crewai_event_bus.on(ToolUsageStarted)
        def on_tool_start(source, event):
            self.metrics.tool_calls += 1

        @crewai_event_bus.on(ToolUsageError)
        def on_tool_error(source, event):
            self.metrics.tool_errors += 1
            logger.warning(
                f"[{self.metrics.crew_id}] Tool error: {event.error}"
            )

    def get_report(self) -> dict:
        """Generar reporte de la ejecución."""
        return {
            "crew_id": self.metrics.crew_id,
            "duration_seconds": self.metrics.total_duration,
            "tokens": {
                "total": self.metrics.total_tokens,
                "prompt": self.metrics.prompt_tokens,
                "completion": self.metrics.completion_tokens,
            },
            "cost_usd": round(self.metrics.total_cost_usd, 4),
            "tasks": {
                "completed": self.metrics.tasks_completed,
                "failed": self.metrics.tasks_failed,
            },
            "tools": {
                "calls": self.metrics.tool_calls,
                "errors": self.metrics.tool_errors,
            },
            "agent_breakdown": self.metrics.agent_metrics,
            "task_durations": self.metrics.task_durations,
        }

Uso del monitor con un crew

python
# run_monitored_crew.py
from crewai import Crew, Process, Agent, Task
from monitoring.callbacks import CrewMonitor
import uuid

# ── Crear crew con monitorización ──
def run_monitored_crew(topic: str) -> dict:
    crew_id = f"crew-{uuid.uuid4().hex[:8]}"
    monitor = CrewMonitor(crew_id=crew_id, model="gpt-4o")

    crew = Crew(
        agents=[researcher, writer, reviewer],
        tasks=[research_task, write_task, review_task],
        process=Process.sequential,
        verbose=False,
        memory=True,
        max_rpm=30,
    )

    result = crew.kickoff(inputs={"topic": topic})

    # Obtener reporte de métricas
    report = monitor.get_report()
    print(f"\n{'='*50}")
    print(f"Crew: {report['crew_id']}")
    print(f"Duración: {report['duration_seconds']:.1f}s")
    print(f"Tokens totales: {report['tokens']['total']:,}")
    print(f"Coste estimado: ${report['cost_usd']}")
    print(f"Tasks completadas: {report['tasks']['completed']}")
    print(f"Tool calls: {report['tools']['calls']} ({report['tools']['errors']} errores)")
    print(f"\nDesglose por agente:")
    for agent_name, metrics in report["agent_breakdown"].items():
        print(f"  {agent_name}: {metrics['tokens']:,} tokens, {metrics['tool_calls']} tools")

    return {"result": result.raw, "metrics": report}

Ejecución asíncrona con FastAPI + Celery

Para servir crews desde una API web, la ejecución debe ser asíncrona: el endpoint acepta el request, encola el trabajo en Celery, y el cliente consulta el estado por polling o webhook.

python
# api/main.py — FastAPI con ejecución asíncrona de crews
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery import Celery
import redis
import json
import uuid

app = FastAPI(title="CrewAI API Service")

# ── Celery para tareas en background ──
celery_app = Celery(
    "crews",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1",
)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.task_time_limit = 600      # 10 min máximo por crew
celery_app.conf.task_soft_time_limit = 540  # Aviso a los 9 min

# Redis para estado y resultados
redis_client = redis.Redis(host="redis", port=6379, db=2, decode_responses=True)

class CrewRequest(BaseModel):
    topic: str
    process: str = "sequential"    # sequential | hierarchical
    max_rpm: int = 30
    webhook_url: str | None = None  # URL para notificar al completar

class CrewStatusResponse(BaseModel):
    crew_id: str
    status: str  # queued, running, completed, failed
    result: dict | None = None
    metrics: dict | None = None

# ── Task de Celery que ejecuta el crew ──
@celery_app.task(bind=True, name="run_crew")
def run_crew_task(self, crew_id: str, topic: str, process: str, max_rpm: int):
    """Ejecutar crew en un worker de Celery."""
    from crewai import Crew, Process as CrewProcess
    from monitoring.callbacks import CrewMonitor
    import httpx

    # Actualizar estado
    redis_client.hset(f"crew:{crew_id}", "status", "running")

    monitor = CrewMonitor(crew_id=crew_id, model="gpt-4o")

    try:
        crew = Crew(
            agents=[researcher, writer, reviewer],
            tasks=[research_task, write_task, review_task],
            process=CrewProcess.sequential if process == "sequential" else CrewProcess.hierarchical,
            verbose=False,
            memory=True,
            max_rpm=max_rpm,
        )

        result = crew.kickoff(inputs={"topic": topic})
        report = monitor.get_report()

        # Guardar resultado en Redis
        redis_client.hset(f"crew:{crew_id}", mapping={
            "status": "completed",
            "result": json.dumps({"output": result.raw}),
            "metrics": json.dumps(report),
        })
        redis_client.expire(f"crew:{crew_id}", 86400)  # TTL 24h

        return {"crew_id": crew_id, "status": "completed"}

    except Exception as e:
        redis_client.hset(f"crew:{crew_id}", mapping={
            "status": "failed",
            "error": str(e),
        })
        raise

# ── Endpoints de la API ──
@app.post("/crew/run", response_model=CrewStatusResponse)
async def start_crew(req: CrewRequest):
    """Iniciar ejecución de un crew (asíncrono)."""
    crew_id = f"crew-{uuid.uuid4().hex[:8]}"

    # Guardar estado inicial
    redis_client.hset(f"crew:{crew_id}", mapping={
        "status": "queued",
        "topic": req.topic,
    })

    # Encolar en Celery
    run_crew_task.delay(crew_id, req.topic, req.process, req.max_rpm)

    return CrewStatusResponse(crew_id=crew_id, status="queued")

@app.get("/crew/{crew_id}", response_model=CrewStatusResponse)
async def get_crew_status(crew_id: str):
    """Consultar estado de un crew."""
    data = redis_client.hgetall(f"crew:{crew_id}")
    if not data:
        raise HTTPException(status_code=404, detail="Crew not found")

    return CrewStatusResponse(
        crew_id=crew_id,
        status=data.get("status", "unknown"),
        result=json.loads(data["result"]) if "result" in data else None,
        metrics=json.loads(data["metrics"]) if "metrics" in data else None,
    )

@app.get("/crews/stats")
async def get_stats():
    """Estadísticas globales de ejecución."""
    keys = redis_client.keys("crew:*")
    stats = {"total": len(keys), "queued": 0, "running": 0, "completed": 0, "failed": 0}
    for key in keys:
        status = redis_client.hget(key, "status")
        if status in stats:
            stats[status] += 1
    return stats

Docker Compose para el stack completo

yaml
# docker-compose.yaml
services:
  api:
    build: .
    command: uvicorn api.main:app --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_HOST=redis
    depends_on:
      - redis

  worker:
    build: .
    command: celery -A api.main.celery_app worker --loglevel=info --concurrency=4
    deploy:
      replicas: 2   # 2 workers × 4 concurrency = 8 crews simultáneos
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_HOST=redis
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

volumes:
  redis-data:
MétricaQué monitorizarAlerta recomendada
Tokens por crewTotal, prompt y completion por ejecuciónAlertar si supera 50K tokens por crew
Coste por ejecuciónUSD estimado por crew basado en modeloAlertar si una ejecución supera $0.50
DuraciónTiempo total y por task individualAlertar si supera 5 minutos
Tool errorsRatio de errores de tools por crewAlertar si error rate supera 20%
Queue depthCrews en cola esperando workerAlertar si supera 10 crews encolados
Worker healthWorkers activos vs necesariosAlertar si hay menos de 2 workers

Concurrency de Celery workers

Cada worker de Celery puede ejecutar N crews simultáneos con --concurrency=N. Pero cuidado: cada crew hace múltiples llamadas a la API del LLM. Si tienes 4 workers con concurrency=4, puedes tener hasta 16 crews ejecutando a la vez, lo que puede saturar tu rate limit de OpenAI. Ajusta max_rpm del crew y --concurrency del worker en proporción a tu rate limit disponible.

END OF DOCUMENT

¿Necesitas más? Volver a la Librería →