Lab 24 — AIOps Incident Remediation
Flujo AIOps end-to-end: una alerta de producción es diagnosticada por la plataforma de observabilidad, Architect lee el diagnóstico vía MCP, aplica un hotfix mínimo, ejecuta tests, y crea un PR con trazabilidad OTel completa.
Setup
Nivel: Full-Stack
Duración estimada: 40 minutos. Features: MCP, run, guardrails, reports, budget, OTel.
mkdir -p ~/architect-labs/lab-24 && cd ~/architect-labs/lab-24
git init && mkdir -p src tests mcp-servers incidents reports

Crear servicio con bugs latentes
src/queue_processor.py
import time
from collections import deque
from typing import Any, Optional
from dataclasses import dataclass, field
@dataclass
class QueueItem:
    """A single unit of work tracked by QueueProcessor."""

    id: str                 # caller-supplied unique identifier
    payload: Any            # opaque work payload
    retries: int = 0        # number of nacks so far
    max_retries: int = 3    # nacks allowed before the item is dead-lettered
    created_at: float = field(default_factory=time.time)  # creation timestamp (epoch seconds)
class QueueProcessor:
    """In-memory FIFO work queue with ack/nack semantics and a dead-letter list.

    Item lifecycle: enqueue() -> dequeue() moves the item into the in-flight
    dict ``_processing`` -> ack() on success, or nack() to retry / dead-letter.

    NOTE: this lab version contains a DELIBERATE memory leak in ack(); it is
    the root cause of incident INC-5001 that the lab remediates. Do not "fix"
    it by hand — the exercise is for Architect to do it.
    """

    def __init__(self, max_size: int = 10000):
        self._queue: deque = deque()                   # items awaiting delivery (FIFO)
        self._processing: dict[str, QueueItem] = {}    # in-flight items keyed by item id
        self._dead_letter: list[QueueItem] = []        # items that exhausted their retries
        self.max_size = max_size                       # backpressure limit for enqueue()
        self.processed_count = 0                       # running total of successful acks

    def enqueue(self, item_id: str, payload: Any) -> bool:
        """Append a new item; returns False when the queue is at capacity."""
        if len(self._queue) >= self.max_size:
            return False
        self._queue.append(QueueItem(id=item_id, payload=payload))
        return True

    def dequeue(self) -> Optional[QueueItem]:
        """Pop the oldest item and mark it in-flight; returns None when empty."""
        if not self._queue:
            return None
        item = self._queue.popleft()
        # BUG (deliberate): items are added to _processing here, but ack()
        # below never removes them, so the dict grows without bound.
        self._processing[item.id] = item
        return item

    def ack(self, item_id: str) -> bool:
        """Acknowledge successful processing."""
        if item_id in self._processing:
            # BUG (deliberate): missing `del self._processing[item_id]`.
            # This causes the memory leak diagnosed as INC-5001.
            self.processed_count += 1
            return True
        return False

    def nack(self, item_id: str) -> bool:
        """Negative acknowledge — retry or dead letter."""
        item = self._processing.get(item_id)
        if item is None:
            return False
        del self._processing[item_id]
        item.retries += 1
        if item.retries >= item.max_retries:
            self._dead_letter.append(item)
        else:
            # Requeue at the FRONT so the retry happens promptly.
            self._queue.appendleft(item)
        return True

    def get_stats(self) -> dict:
        """Snapshot of queue/in-flight/dead-letter sizes and the ack total."""
        return {
            "queue_size": len(self._queue),
            "processing": len(self._processing),
            "dead_letter": len(self._dead_letter),
            "processed_total": self.processed_count
        }

    def get_processing_items(self) -> list[str]:
        """Returns IDs of items currently being processed."""
        return list(self._processing.keys())

    def cleanup_stale(self, max_age_seconds: int = 300) -> int:
        """Remove items stuck in processing for too long.

        Age is measured from item creation (created_at), not from dequeue.
        Stale items are moved to the dead-letter list; returns how many.
        """
        now = time.time()
        stale_ids = [
            item_id for item_id, item in self._processing.items()
            if now - item.created_at > max_age_seconds
        ]
        for item_id in stale_ids:
            item = self._processing.pop(item_id)
            self._dead_letter.append(item)
        return len(stale_ids)
Bug latente
El método ack() incrementa processed_count pero nunca elimina el item de self._processing. Esto causa un memory leak que en producción provoca OOM después de procesar miles de items.
tests/test_queue.py
import pytest
from src.queue_processor import QueueProcessor
@pytest.fixture
def processor():
    """Fresh, small-capacity QueueProcessor for each test."""
    proc = QueueProcessor(max_size=100)
    return proc
def test_enqueue_dequeue(processor):
    """Round-trip: an enqueued item comes back with id and payload intact."""
    assert processor.enqueue("1", {"data": "test"}) is True
    item = processor.dequeue()
    assert item.id == "1"
    # Also pin the payload — the original only checked the id.
    assert item.payload == {"data": "test"}
    # The queue is now drained; dequeue on empty returns None.
    assert processor.dequeue() is None
def test_ack_removes_from_processing(processor):
    """A successful ack must take the item out of the in-flight set."""
    processor.enqueue("1", {"data": "test"})
    processor.dequeue()
    in_flight = processor.get_processing_items()
    assert len(in_flight) == 1
    processor.ack("1")
    # After ack, item should be removed from processing
    assert processor.get_processing_items() == []
def test_memory_leak_simulation(processor):
    """Simulate 1000 processed items and verify memory doesn't grow."""
    # The fixture caps the queue at max_size=100, so bulk-enqueueing 1000
    # items silently drops 900 (enqueue returns False) and the drain loop
    # then crashes on dequeue() returning None. Process in lockstep instead.
    for i in range(1000):
        assert processor.enqueue(str(i), {"data": i})
        item = processor.dequeue()
        processor.ack(item.id)
    stats = processor.get_stats()
    # processing should be 0 after all items are ack'd
    assert stats["processing"] == 0
    assert stats["processed_total"] == 1000
def test_nack_retry(processor):
    """A nack below max_retries puts the item back at the queue front."""
    processor.enqueue("1", {"data": "test"})
    processor.dequeue()  # original bound this to an unused local
    assert processor.nack("1") is True
    assert len(processor._queue) == 1
def test_dead_letter_after_max_retries(processor):
processor.enqueue("1", {"data": "test"})
for _ in range(4):
item = processor.dequeue()
processor.nack(item.id)
assert len(processor._dead_letter) == 1 MCP Server mock (observability platform)
mcp-servers/observability.py
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
# Canned incident database served by the mock observability MCP server.
# INC-5001 mirrors the deliberate ack() memory leak in src/queue_processor.py,
# including an RCA payload precise enough for Architect to locate the fix.
INCIDENTS = {
    "INC-5001": {
        "id": "INC-5001",
        "title": "Memory leak in QueueProcessor",
        "severity": "P1",
        "status": "investigating",
        "root_cause_analysis": {
            "component": "QueueProcessor",
            "file": "src/queue_processor.py",
            "function": "ack",
            "diagnosis": (
                "Memory leak: ack() increments processed_count but never "
                "removes items from self._processing dict. Over time, "
                "_processing grows unboundedly causing OOM."
            ),
            "evidence": [
                "Processing dict size grew from 0 to 50,000 in 2 hours",
                "Memory usage increased linearly with throughput",
                "No items ever removed from _processing on successful ack"
            ],
            "suggested_fix": (
                "Add 'del self._processing[item_id]' before incrementing "
                "processed_count in ack() method"
            )
        },
        "logs": [
            "[ERROR] OOM killer invoked on queue-processor pod",
            "[WARN] QueueProcessor._processing size: 50432 (expected: <100)",
            "[INFO] Last successful ack: item-49999, processing dict not shrinking"
        ]
    }
}
class Handler(BaseHTTPRequestHandler):
    """Minimal JSON-RPC-over-HTTP handler mocking an MCP observability server.

    Supports two methods: "tools/list" (advertises get_incident,
    get_incident_logs and acknowledge_fix) and "tools/call" (dispatches on
    tool name against the module-level INCIDENTS table). Any other method
    yields an empty result. Every response is wrapped in a JSON-RPC 2.0
    envelope echoing the request id.
    """

    def do_POST(self):
        # Robustness fix: the original indexed self.headers['Content-Length']
        # and crashed on requests without a body. Treat missing/zero length
        # as an empty JSON object.
        length = int(self.headers.get("Content-Length") or 0)
        raw = self.rfile.read(length) if length else b"{}"
        body = json.loads(raw)
        method = body.get("method", "")
        params = body.get("params", {})
        if method == "tools/list":
            result = {"tools": [
                {
                    "name": "get_incident",
                    "description": "Get incident details and RCA",
                    "inputSchema": {
                        "type": "object",
                        "properties": {"incident_id": {"type": "string"}},
                        "required": ["incident_id"]
                    }
                },
                {
                    "name": "get_incident_logs",
                    "description": "Get logs for an incident",
                    "inputSchema": {
                        "type": "object",
                        "properties": {"incident_id": {"type": "string"}},
                        "required": ["incident_id"]
                    }
                },
                {
                    "name": "acknowledge_fix",
                    "description": "Mark incident as fix applied",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "incident_id": {"type": "string"},
                            "fix_summary": {"type": "string"}
                        },
                        "required": ["incident_id", "fix_summary"]
                    }
                }
            ]}
        elif method == "tools/call":
            name = params.get("name", "")
            args = params.get("arguments", {})
            if name == "get_incident":
                # Unknown ids serialize as JSON null — acceptable for a mock.
                inc = INCIDENTS.get(args["incident_id"])
                result = {"content": [{"type": "text",
                                       "text": json.dumps(inc, indent=2)}]}
            elif name == "get_incident_logs":
                inc = INCIDENTS.get(args["incident_id"])
                logs = inc["logs"] if inc else []
                result = {"content": [{"type": "text",
                                       "text": json.dumps(logs, indent=2)}]}
            elif name == "acknowledge_fix":
                # State is not persisted; the mock just confirms the call.
                result = {"content": [{"type": "text",
                                       "text": f"Incident {args['incident_id']} marked as fix_applied"}]}
            else:
                result = {"error": "unknown tool"}
        else:
            result = {}
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        response = {"jsonrpc": "2.0", "id": body.get("id"), "result": result}
        self.wfile.write(json.dumps(response).encode())

    def log_message(self, *a):
        # Silence the default per-request stderr logging to keep lab output clean.
        pass
# Entry point: serve the mock MCP endpoint in the foreground on port 8092
# (must match the `mcp.servers[].url` in .architect.yaml).
if __name__ == "__main__":
    print("Observability MCP on :8092")
    HTTPServer(("localhost", 8092), Handler).serve_forever()
Configuración
.architect.yaml
# LLM backend: routed through a local LiteLLM proxy.
llm:
  model: openai/gpt-4.1
  api_base: http://localhost:4000/v1
  api_key_env: LITELLM_API_KEY

# Observability platform exposed to Architect via MCP (mock server on :8092).
mcp:
  servers:
    - name: observability
      url: http://localhost:8092/mcp
      auth:
        type: none

# Ultra-strict guardrails: a hotfix must stay minimal and must never touch
# secrets, production config, deploy manifests or lockfiles.
guardrails:
  max_files_modified: 3
  protected_files:
    - "*.env*"
    - "config/production.*"
    - "k8s/**"
    - "Dockerfile*"
    - "*.lock"
  blocked_commands:
    - "kubectl apply"
    - "docker push"
  code_rules:
    - pattern: 'eval\('
      severity: block

# Hard cost ceiling per run (USD).
costs:
  budget_usd: 1.00

# OTel traces exported as JSON files under traces/.
telemetry:
  enabled: true
  exporter: json-file
  endpoint: traces/
mkdir -p traces
git add -A && git commit -m "initial: queue processor with memory leak"

Paso 1: Verificar que los tests fallan
export PYTHONPATH=.
pytest tests/ -v
# test_ack_removes_from_processing y test_memory_leak_simulation fallan

Paso 2: Arrancar MCP server
python mcp-servers/observability.py &
echo "Observability MCP running"

Paso 3: Script de incident remediation
incidents/auto-fix.sh
#!/bin/bash
# incidents/auto-fix.sh — auto-remediate a diagnosed production incident.
# Usage: auto-fix.sh <INCIDENT_ID>     e.g.: auto-fix.sh INC-5001
#
# Fixes over the original: guard against a missing incident id (the script
# is meant to be invoked from alerting hooks) and exit non-zero on failure
# so the calling pipeline can detect and escalate it.

INCIDENT_ID=$1
if [ -z "$INCIDENT_ID" ]; then
    echo "Usage: $0 <INCIDENT_ID>" >&2
    exit 1
fi

echo "Processing incident: $INCIDENT_ID"
architect run \
  "Un incidente de producción ha sido diagnosticado. \
Incident ID: ${INCIDENT_ID}. \
Usa la tool MCP get_incident para obtener el diagnóstico completo. \
Usa get_incident_logs para ver los logs. \
Identifica el archivo y función afectados. \
Aplica el fix MÍNIMO necesario. \
Ejecuta solo los tests del módulo afectado. \
Al terminar, usa acknowledge_fix para marcar el incidente." \
  --config .architect.yaml \
  --confirm-mode yolo \
  --budget 1.00 \
  --report-file "incidents/${INCIDENT_ID}-report.json"
EXIT_CODE=$?

if [ $EXIT_CODE -eq 0 ]; then
    echo "OK: Fix applied for $INCIDENT_ID"
    # Capture the remediation on a dedicated hotfix branch for PR review.
    git checkout -b "hotfix/${INCIDENT_ID}"
    git add -A
    git commit -m "hotfix(${INCIDENT_ID}): auto-remediation via architect"
    echo "Branch created: hotfix/${INCIDENT_ID}"
else
    echo "FAIL: Auto-fix failed for $INCIDENT_ID — escalating"
    exit 1
fi
chmod +x incidents/auto-fix.sh
Consejo
Este script puede integrarse con PagerDuty, OpsGenie o cualquier herramienta de alertas. Cuando llega un incidente P1 con diagnóstico automático, el script se ejecuta y crea un hotfix branch.
Paso 4: Ejecutar
bash incidents/auto-fix.sh INC-5001

Paso 5: Verificar
# Tests pasan
pytest tests/ -v
# Ver qué cambió
git diff main -- src/queue_processor.py
# Ver report
cat incidents/INC-5001-report.json | python -m json.tool | head -20
# Ver traza OTel
ls traces/
cat traces/*.json | python -m json.tool | head -30
# Verificar que el fix es mínimo
git diff --stat main
# Solo 1 archivo, pocas líneas

Paso 6: Verificar guardrails
# Intentar fix que toque demasiados archivos
architect run "Refactoriza todo el sistema de colas" \
--config .architect.yaml --confirm-mode yolo
# max_files_modified: 3 limita el alcance

Cleanup
kill %1 2>/dev/null

Resumen
Arquitectura completa
Alert → Observability MCP → architect run → Fix code → Run tests → Create PR
                                 |
                                 +→ (fail) Escalate to on-call

| Componente | Rol |
|---|---|
| MCP | Lee diagnóstico de la plataforma AIOps |
| run | Ejecución única: un incidente, un fix |
| Guardrails | Ultra-estrictos: max 3 archivos, no deploy |
| Budget | $1.00 hard limit |
| OTel | Traza linkeable al incidente |
| Report | JSON para post-mortem |
Felicidades
Has completado todos los 24 Architect Labs. Desde un simple architect run hasta arquitecturas completas de AIOps con MCP, OTel, y guardrails de producción.