CrewAI Multi-Agent
Guía para construir sistemas multi-agente con CrewAI. Cubre la definición de agentes especializados, tareas, herramientas, flujos de trabajo secuenciales e jerárquicos, y memoria compartida.
CrewAI Overview
CrewAI permite crear equipos de agentes de IA que colaboran para resolver tareas complejas. Cada agente tiene un rol, objetivo, backstory y herramientas específicas.
Arquitectura CrewAI
┌──────────────────────────────────────────────────────────┐
│ CREW │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Agent │ │ Agent │ │ Agent │ │
│ │ Researcher │ │ Writer │ │ Reviewer │ │
│ │ │ │ │ │ │ │
│ │ Tools: │ │ Tools: │ │ Tools: │ │
│ │ - Search │ │ - FileWrite │ │ - CodeExec │ │
│ │ - Scrape │ │ - Format │ │ - Lint │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ┌──────▼─────────────────▼──────────────────▼───────┐ │
│ │ Task Pipeline │ │
│ │ Task 1 (Research) → Task 2 (Write) → Task 3 (QA)│ │
│ └───────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Process: sequential | hierarchical │ │
│ │ Memory: shared context between agents │ │
│ │ LLM: GPT-4o / Llama / Mixtral │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘ # Instalación
pip install crewai crewai-tools Agents & Roles
Cada agente tiene un rol claro, un objetivo, y una backstory que define su personalidad y expertise.
from crewai import Agent
from crewai_tools import SerperDevTool, ScrapeWebsiteTool
# ── Agente investigador ──
researcher = Agent(
role="Senior Tech Researcher",
goal="Investigar las últimas tendencias y mejores prácticas en {topic}",
backstory="""Eres un investigador técnico senior con 10 años de experiencia
analizando tecnologías emergentes. Eres meticuloso y siempre citas fuentes.
Tu especialidad es separar el hype de lo realmente útil.""",
tools=[SerperDevTool(), ScrapeWebsiteTool()],
verbose=True,
memory=True,
max_iter=5,
)
# ── Agente escritor ──
writer = Agent(
role="Technical Writer",
goal="Crear documentación clara y completa sobre {topic}",
backstory="""Eres un escritor técnico que transforma investigaciones complejas
en guías prácticas y bien estructuradas. Priorizas ejemplos de código
funcionales sobre explicaciones teóricas.""",
verbose=True,
memory=True,
)
# ── Agente revisor ──
reviewer = Agent(
role="Code Reviewer & QA",
goal="Revisar la documentación para asegurar precisión técnica y calidad",
backstory="""Eres un ingeniero principal conocido por tu atención al detalle.
Detectas errores técnicos, inconsistencias y sugieres mejoras.
Nunca apruebas algo que no esté listo para producción.""",
verbose=True,
allow_delegation=True, # Puede pedir correcciones al writer
) Backstory matters
La backstory es crucial. Un agente con backstory “eres un junior aprendiendo” produce resultados muy diferentes a “eres un senior con 15 años de experiencia”. Sé específico.
Tasks & Workflows
Las Tasks definen el trabajo a realizar. Se asignan a agentes y se encadenan en pipelines secuenciales o jerárquicos.
from crewai import Task, Crew, Process
# ── Definir tareas ──
research_task = Task(
description="""Investiga {topic}:
1. Estado actual de la tecnología
2. Principales frameworks y herramientas
3. Casos de uso en producción
4. Mejores prácticas y anti-patterns
Incluye URLs de fuentes verificadas.""",
expected_output="Documento de investigación con al menos 5 fuentes citadas",
agent=researcher,
)
write_task = Task(
description="""Basándote en la investigación, crea una guía técnica:
- Estructura clara con secciones y subsecciones
- Ejemplos de código funcionales (Python)
- Tabla comparativa de herramientas
- Sección de troubleshooting""",
expected_output="Guía técnica lista para publicar en formato Markdown",
agent=writer,
context=[research_task], # Recibe output del researcher
)
review_task = Task(
description="""Revisa la guía técnica:
- Verifica que los ejemplos de código sean correctos
- Asegura consistencia terminológica
- Valida que las fuentes sean actuales (2024+)
- Score final del 1 al 10""",
expected_output="Reporte de revisión con score y lista de correcciones",
agent=reviewer,
context=[write_task],
)
# ── Crear y ejecutar crew ──
crew = Crew(
agents=[researcher, writer, reviewer],
tasks=[research_task, write_task, review_task],
process=Process.sequential, # Ejecutar en orden
verbose=True,
memory=True, # Memoria compartida entre agentes
)
result = crew.kickoff(inputs={"topic": "Kubernetes Operators"})
print(result) | Process | Descripción | Caso de uso |
|---|---|---|
sequential | Tareas se ejecutan en orden, cada una recibe el output anterior | Pipelines lineales (research → write → review) |
hierarchical | Un agente manager coordina y delega tareas dinámicamente | Tareas complejas con decisiones en runtime |
Custom Tools
Las herramientas dan a los agentes acceso al mundo real: APIs, bases de datos, sistemas de archivos, etc. CrewAI soporta tools propios y de LangChain.
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import httpx
# ── Tool personalizado: consultar API interna ──
class APIQueryInput(BaseModel):
endpoint: str = Field(description="Endpoint de la API (ej: /users, /orders)")
method: str = Field(default="GET", description="HTTP method")
class InternalAPITool(BaseTool):
name: str = "internal_api"
description: str = "Consulta la API interna del backend. Usa para obtener datos de producción."
args_schema = APIQueryInput
def _run(self, endpoint: str, method: str = "GET") -> str:
base = "https://api.internal.company.com"
response = httpx.request(method, f"{base}{endpoint}")
return response.text
# ── Tool para ejecutar SQL de solo lectura ──
class ReadOnlySQLTool(BaseTool):
name: str = "sql_query"
description: str = "Ejecuta queries SQL de solo lectura en la base de datos."
def _run(self, query: str) -> str:
if any(kw in query.upper() for kw in ["DROP", "DELETE", "UPDATE", "INSERT"]):
return "ERROR: Solo queries SELECT permitidas"
# Ejecutar query segura...
return execute_query(query) Production Patterns
Patrones para llevar crews de agentes a producción: gestión de errores, timeouts, costes y observabilidad.
from crewai import Crew, Process
import logging
# ── Configuración production-ready ──
crew = Crew(
agents=[researcher, writer, reviewer],
tasks=[research_task, write_task, review_task],
process=Process.sequential,
verbose=False, # Sin logs verbosos en prod
memory=True,
max_rpm=30, # Rate limit para API calls
share_crew=False, # No compartir telemetría
full_output=True, # Devolver output de todas las tasks
)
# ── Wrapper con error handling ──
async def run_crew_safe(topic: str) -> dict:
try:
result = crew.kickoff(inputs={"topic": topic})
return {
"status": "success",
"output": result.raw,
"tasks": [t.output.raw for t in result.tasks_output],
"token_usage": result.token_usage,
}
except Exception as e:
logging.error(f"Crew failed: {e}")
return {"status": "error", "error": str(e)} Costes de API
Un crew con 3 agentes y 3 tasks secuenciales puede hacer 15-30+
llamadas a la API del LLM por ejecución. Monitoriza
token_usage y configura max_rpm para controlar costes.
Monitoring & Scaling
Para usar CrewAI en producción, necesitas monitorización de cada ejecución (tokens, tiempo, errores por agente), estimación de costes en tiempo real, y una estrategia de escalado para servir múltiples crews concurrentes desde una API web.
Arquitectura de monitorización y escalado
┌────────────────────────────────────────────────────────────┐
│ FastAPI Web Service │
│ │
│ POST /crew/run ──▶ Celery Task Queue │
│ GET /crew/{id} ──▶ Redis (resultados) │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Callbacks & Monitoring │ │
│ │ on_task_start → log + metrics │ │
│ │ on_task_end → tokens + cost + duration │ │
│ │ on_agent_action → tool_calls tracking │ │
│ └─────────────────────────────────────────────────────┘ │
└────────────────┬───────────────────────────────────────────┘
│
┌────────────▼────────────┐
│ Celery Workers (N) │
│ Cada worker ejecuta │
│ un crew independiente │
└────────────┬────────────┘
│
┌────────────▼────────────┐
│ Redis │
│ - Task queue │
│ - Results store │
│ - Metrics buffer │
└─────────────────────────┘ Callbacks para monitorización detallada
CrewAI emite eventos en cada paso de la ejecución. Los callbacks permiten capturar métricas granulares: tokens por agente, duración por task, herramientas usadas y errores.
# monitoring/callbacks.py
import time
import logging
from dataclasses import dataclass, field
from crewai.utilities.events import (
crewai_event_bus,
AgentExecutionStarted,
AgentExecutionCompleted,
TaskEvaluationStarted,
TaskEvaluationCompleted,
ToolUsageStarted,
ToolUsageFinished,
ToolUsageError,
CrewKickoffStarted,
CrewKickoffCompleted,
)
logger = logging.getLogger("crewai.monitor")
@dataclass
class CrewMetrics:
"""Métricas acumuladas de una ejecución del crew."""
crew_id: str
start_time: float = 0.0
end_time: float = 0.0
total_tokens: int = 0
prompt_tokens: int = 0
completion_tokens: int = 0
total_cost_usd: float = 0.0
tasks_completed: int = 0
tasks_failed: int = 0
tool_calls: int = 0
tool_errors: int = 0
agent_metrics: dict = field(default_factory=dict)
task_durations: list = field(default_factory=list)
@property
def total_duration(self) -> float:
return self.end_time - self.start_time if self.end_time else 0.0
# ── Precios por modelo (USD por 1M tokens) ──
MODEL_PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
}
class CrewMonitor:
"""Monitor de ejecución de crews con callbacks."""
def __init__(self, crew_id: str, model: str = "gpt-4o"):
self.metrics = CrewMetrics(crew_id=crew_id)
self.model = model
self._task_start_times: dict[str, float] = {}
self._register_callbacks()
def _register_callbacks(self):
"""Registrar listeners en el event bus de CrewAI."""
@crewai_event_bus.on(CrewKickoffStarted)
def on_crew_start(source, event):
self.metrics.start_time = time.time()
logger.info(f"[{self.metrics.crew_id}] Crew iniciado")
@crewai_event_bus.on(CrewKickoffCompleted)
def on_crew_complete(source, event):
self.metrics.end_time = time.time()
duration = self.metrics.total_duration
logger.info(
f"[{self.metrics.crew_id}] Crew completado en {duration:.1f}s | "
f"Tokens: {self.metrics.total_tokens} | "
f"Coste: ${self.metrics.total_cost_usd:.4f}"
)
@crewai_event_bus.on(AgentExecutionStarted)
def on_agent_start(source, event):
agent_role = event.agent.role
if agent_role not in self.metrics.agent_metrics:
self.metrics.agent_metrics[agent_role] = {
"tokens": 0, "tool_calls": 0, "errors": 0, "duration": 0.0,
}
logger.info(f"[{self.metrics.crew_id}] Agente '{agent_role}' iniciado")
@crewai_event_bus.on(AgentExecutionCompleted)
def on_agent_complete(source, event):
agent_role = event.agent.role
token_usage = getattr(event, "token_usage", None)
if token_usage:
prompt_t = token_usage.get("prompt_tokens", 0)
completion_t = token_usage.get("completion_tokens", 0)
total_t = prompt_t + completion_t
self.metrics.prompt_tokens += prompt_t
self.metrics.completion_tokens += completion_t
self.metrics.total_tokens += total_t
self.metrics.agent_metrics[agent_role]["tokens"] += total_t
# Calcular coste
pricing = MODEL_PRICING.get(self.model, MODEL_PRICING["gpt-4o"])
cost = (prompt_t / 1_000_000 * pricing["input"] +
completion_t / 1_000_000 * pricing["output"])
self.metrics.total_cost_usd += cost
@crewai_event_bus.on(TaskEvaluationStarted)
def on_task_start(source, event):
task_desc = event.task.description[:50]
self._task_start_times[task_desc] = time.time()
@crewai_event_bus.on(TaskEvaluationCompleted)
def on_task_complete(source, event):
task_desc = event.task.description[:50]
start = self._task_start_times.get(task_desc, time.time())
duration = time.time() - start
self.metrics.task_durations.append({
"task": task_desc,
"duration": duration,
})
self.metrics.tasks_completed += 1
@crewai_event_bus.on(ToolUsageStarted)
def on_tool_start(source, event):
self.metrics.tool_calls += 1
@crewai_event_bus.on(ToolUsageError)
def on_tool_error(source, event):
self.metrics.tool_errors += 1
logger.warning(
f"[{self.metrics.crew_id}] Tool error: {event.error}"
)
def get_report(self) -> dict:
"""Generar reporte de la ejecución."""
return {
"crew_id": self.metrics.crew_id,
"duration_seconds": self.metrics.total_duration,
"tokens": {
"total": self.metrics.total_tokens,
"prompt": self.metrics.prompt_tokens,
"completion": self.metrics.completion_tokens,
},
"cost_usd": round(self.metrics.total_cost_usd, 4),
"tasks": {
"completed": self.metrics.tasks_completed,
"failed": self.metrics.tasks_failed,
},
"tools": {
"calls": self.metrics.tool_calls,
"errors": self.metrics.tool_errors,
},
"agent_breakdown": self.metrics.agent_metrics,
"task_durations": self.metrics.task_durations,
} Uso del monitor con un crew
# run_monitored_crew.py
from crewai import Crew, Process, Agent, Task
from monitoring.callbacks import CrewMonitor
import uuid
# ── Crear crew con monitorización ──
def run_monitored_crew(topic: str) -> dict:
crew_id = f"crew-{uuid.uuid4().hex[:8]}"
monitor = CrewMonitor(crew_id=crew_id, model="gpt-4o")
crew = Crew(
agents=[researcher, writer, reviewer],
tasks=[research_task, write_task, review_task],
process=Process.sequential,
verbose=False,
memory=True,
max_rpm=30,
)
result = crew.kickoff(inputs={"topic": topic})
# Obtener reporte de métricas
report = monitor.get_report()
print(f"\n{'='*50}")
print(f"Crew: {report['crew_id']}")
print(f"Duración: {report['duration_seconds']:.1f}s")
print(f"Tokens totales: {report['tokens']['total']:,}")
print(f"Coste estimado: ${report['cost_usd']}")
print(f"Tasks completadas: {report['tasks']['completed']}")
print(f"Tool calls: {report['tools']['calls']} ({report['tools']['errors']} errores)")
print(f"\nDesglose por agente:")
for agent_name, metrics in report["agent_breakdown"].items():
print(f" {agent_name}: {metrics['tokens']:,} tokens, {metrics['tool_calls']} tools")
return {"result": result.raw, "metrics": report} Ejecución asíncrona con FastAPI + Celery
Para servir crews desde una API web, la ejecución debe ser asíncrona: el endpoint acepta el request, encola el trabajo en Celery, y el cliente consulta el estado por polling o webhook.
# api/main.py — FastAPI con ejecución asíncrona de crews
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery import Celery
import redis
import json
import uuid
app = FastAPI(title="CrewAI API Service")
# ── Celery para tareas en background ──
celery_app = Celery(
"crews",
broker="redis://redis:6379/0",
backend="redis://redis:6379/1",
)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.task_time_limit = 600 # 10 min máximo por crew
celery_app.conf.task_soft_time_limit = 540 # Aviso a los 9 min
# Redis para estado y resultados
redis_client = redis.Redis(host="redis", port=6379, db=2, decode_responses=True)
class CrewRequest(BaseModel):
topic: str
process: str = "sequential" # sequential | hierarchical
max_rpm: int = 30
webhook_url: str | None = None # URL para notificar al completar
class CrewStatusResponse(BaseModel):
crew_id: str
status: str # queued, running, completed, failed
result: dict | None = None
metrics: dict | None = None
# ── Task de Celery que ejecuta el crew ──
@celery_app.task(bind=True, name="run_crew")
def run_crew_task(self, crew_id: str, topic: str, process: str, max_rpm: int):
"""Ejecutar crew en un worker de Celery."""
from crewai import Crew, Process as CrewProcess
from monitoring.callbacks import CrewMonitor
import httpx
# Actualizar estado
redis_client.hset(f"crew:{crew_id}", "status", "running")
monitor = CrewMonitor(crew_id=crew_id, model="gpt-4o")
try:
crew = Crew(
agents=[researcher, writer, reviewer],
tasks=[research_task, write_task, review_task],
process=CrewProcess.sequential if process == "sequential" else CrewProcess.hierarchical,
verbose=False,
memory=True,
max_rpm=max_rpm,
)
result = crew.kickoff(inputs={"topic": topic})
report = monitor.get_report()
# Guardar resultado en Redis
redis_client.hset(f"crew:{crew_id}", mapping={
"status": "completed",
"result": json.dumps({"output": result.raw}),
"metrics": json.dumps(report),
})
redis_client.expire(f"crew:{crew_id}", 86400) # TTL 24h
return {"crew_id": crew_id, "status": "completed"}
except Exception as e:
redis_client.hset(f"crew:{crew_id}", mapping={
"status": "failed",
"error": str(e),
})
raise
# ── Endpoints de la API ──
@app.post("/crew/run", response_model=CrewStatusResponse)
async def start_crew(req: CrewRequest):
"""Iniciar ejecución de un crew (asíncrono)."""
crew_id = f"crew-{uuid.uuid4().hex[:8]}"
# Guardar estado inicial
redis_client.hset(f"crew:{crew_id}", mapping={
"status": "queued",
"topic": req.topic,
})
# Encolar en Celery
run_crew_task.delay(crew_id, req.topic, req.process, req.max_rpm)
return CrewStatusResponse(crew_id=crew_id, status="queued")
@app.get("/crew/{crew_id}", response_model=CrewStatusResponse)
async def get_crew_status(crew_id: str):
"""Consultar estado de un crew."""
data = redis_client.hgetall(f"crew:{crew_id}")
if not data:
raise HTTPException(status_code=404, detail="Crew not found")
return CrewStatusResponse(
crew_id=crew_id,
status=data.get("status", "unknown"),
result=json.loads(data["result"]) if "result" in data else None,
metrics=json.loads(data["metrics"]) if "metrics" in data else None,
)
@app.get("/crews/stats")
async def get_stats():
"""Estadísticas globales de ejecución."""
keys = redis_client.keys("crew:*")
stats = {"total": len(keys), "queued": 0, "running": 0, "completed": 0, "failed": 0}
for key in keys:
status = redis_client.hget(key, "status")
if status in stats:
stats[status] += 1
return stats Docker Compose para el stack completo
# docker-compose.yaml
services:
api:
build: .
command: uvicorn api.main:app --host 0.0.0.0 --port 8000
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_HOST=redis
depends_on:
- redis
worker:
build: .
command: celery -A api.main.celery_app worker --loglevel=info --concurrency=4
deploy:
replicas: 2 # 2 workers × 4 concurrency = 8 crews simultáneos
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_HOST=redis
depends_on:
- redis
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
volumes:
redis-data: | Métrica | Qué monitorizar | Alerta recomendada |
|---|---|---|
| Tokens por crew | Total, prompt y completion por ejecución | Alertar si supera 50K tokens por crew |
| Coste por ejecución | USD estimado por crew basado en modelo | Alertar si una ejecución supera $0.50 |
| Duración | Tiempo total y por task individual | Alertar si supera 5 minutos |
| Tool errors | Ratio de errores de tools por crew | Alertar si error rate supera 20% |
| Queue depth | Crews en cola esperando worker | Alertar si supera 10 crews encolados |
| Worker health | Workers activos vs necesarios | Alertar si hay menos de 2 workers |
Concurrency de Celery workers
Cada worker de Celery puede ejecutar N crews simultáneos con --concurrency=N.
Pero cuidado: cada crew hace múltiples llamadas a la API del LLM. Si tienes
4 workers con concurrency=4, puedes tener hasta 16 crews ejecutando a la vez,
lo que puede saturar tu rate limit de OpenAI. Ajusta max_rpm del crew y
--concurrency del worker en proporción a tu rate limit disponible.