
Lab 24 — AIOps Incident Remediation

End-to-end AIOps flow: a production alert is diagnosed by the observability platform, Architect reads the diagnosis via MCP, applies a minimal hotfix, runs the tests, and creates a PR with full OTel traceability.

Setup

Level: Full-Stack

Estimated duration: 40 minutes. Features: MCP, run, guardrails, reports, budget, OTel.

bash
mkdir -p ~/architect-labs/lab-24 && cd ~/architect-labs/lab-24
git init && mkdir -p src tests mcp-servers incidents reports

Create a service with latent bugs

src/queue_processor.py

python
import time
from collections import deque
from typing import Any, Optional
from dataclasses import dataclass, field

@dataclass
class QueueItem:
    id: str
    payload: Any
    retries: int = 0
    max_retries: int = 3
    created_at: float = field(default_factory=time.time)

class QueueProcessor:
    def __init__(self, max_size: int = 10000):
        self._queue: deque = deque()
        self._processing: dict[str, QueueItem] = {}
        self._dead_letter: list[QueueItem] = []
        self.max_size = max_size
        self.processed_count = 0

    def enqueue(self, item_id: str, payload: Any) -> bool:
        if len(self._queue) >= self.max_size:
            return False
        self._queue.append(QueueItem(id=item_id, payload=payload))
        return True

    def dequeue(self) -> Optional[QueueItem]:
        if not self._queue:
            return None
        item = self._queue.popleft()
        # BUG: Memory leak — items added to _processing but never removed
        self._processing[item.id] = item
        return item

    def ack(self, item_id: str) -> bool:
        """Acknowledge successful processing."""
        if item_id in self._processing:
            # BUG: Missing del self._processing[item_id]
            # This causes memory leak as _processing grows unboundedly
            self.processed_count += 1
            return True
        return False

    def nack(self, item_id: str) -> bool:
        """Negative acknowledge — retry or dead letter."""
        item = self._processing.get(item_id)
        if item is None:
            return False
        del self._processing[item_id]
        item.retries += 1
        if item.retries >= item.max_retries:
            self._dead_letter.append(item)
        else:
            self._queue.appendleft(item)
        return True

    def get_stats(self) -> dict:
        return {
            "queue_size": len(self._queue),
            "processing": len(self._processing),
            "dead_letter": len(self._dead_letter),
            "processed_total": self.processed_count
        }

    def get_processing_items(self) -> list[str]:
        """Returns IDs of items currently being processed."""
        return list(self._processing.keys())

    def cleanup_stale(self, max_age_seconds: int = 300) -> int:
        """Remove items stuck in processing for too long."""
        now = time.time()
        stale_ids = [
            item_id for item_id, item in self._processing.items()
            if now - item.created_at > max_age_seconds
        ]
        for item_id in stale_ids:
            item = self._processing.pop(item_id)
            self._dead_letter.append(item)
        return len(stale_ids)

Latent bug

The ack() method increments processed_count but never removes the item from self._processing, so the dict grows without bound: a memory leak that in production causes an OOM after thousands of items have been processed.
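For reference, the one-line remediation the incident's suggested_fix points at looks like this. The sketch below inlines a minimal stand-alone reproduction (not the full QueueProcessor) so the corrected behavior can be checked in isolation:

```python
class FixedAckSketch:
    """Minimal stand-in with the corrected ack(): the item is removed
    from _processing before the counter is incremented."""

    def __init__(self):
        self._processing: dict[str, object] = {}
        self.processed_count = 0

    def ack(self, item_id: str) -> bool:
        if item_id in self._processing:
            del self._processing[item_id]  # the missing line: release the item
            self.processed_count += 1
            return True
        return False

p = FixedAckSketch()
p._processing["1"] = object()  # simulate a dequeued item
p.ack("1")
print(len(p._processing), p.processed_count)  # → 0 1
```

With this change, test_ack_removes_from_processing and test_memory_leak_simulation below both pass.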

tests/test_queue.py

python
import pytest
from src.queue_processor import QueueProcessor

@pytest.fixture
def processor():
    return QueueProcessor(max_size=100)

def test_enqueue_dequeue(processor):
    processor.enqueue("1", {"data": "test"})
    item = processor.dequeue()
    assert item.id == "1"

def test_ack_removes_from_processing(processor):
    processor.enqueue("1", {"data": "test"})
    processor.dequeue()
    assert len(processor.get_processing_items()) == 1
    processor.ack("1")
    # After ack, item should be removed from processing
    assert len(processor.get_processing_items()) == 0

def test_memory_leak_simulation(processor):
    """Simulate 1000 processed items and verify memory doesn't grow."""
    for i in range(1000):
        processor.enqueue(str(i), {"data": i})
    for i in range(1000):
        item = processor.dequeue()
        processor.ack(item.id)
    stats = processor.get_stats()
    # processing should be 0 after all items are ack'd
    assert stats["processing"] == 0
    assert stats["processed_total"] == 1000

def test_nack_retry(processor):
    processor.enqueue("1", {"data": "test"})
    item = processor.dequeue()
    processor.nack("1")
    assert len(processor._queue) == 1

def test_dead_letter_after_max_retries(processor):
    processor.enqueue("1", {"data": "test"})
    for _ in range(3):
        item = processor.dequeue()
        processor.nack(item.id)
    assert len(processor._dead_letter) == 1

Mock MCP server (observability platform)

mcp-servers/observability.py

python
import json
from http.server import HTTPServer, BaseHTTPRequestHandler

INCIDENTS = {
    "INC-5001": {
        "id": "INC-5001",
        "title": "Memory leak in QueueProcessor",
        "severity": "P1",
        "status": "investigating",
        "root_cause_analysis": {
            "component": "QueueProcessor",
            "file": "src/queue_processor.py",
            "function": "ack",
            "diagnosis": (
                "Memory leak: ack() increments processed_count but never "
                "removes items from self._processing dict. Over time, "
                "_processing grows unboundedly causing OOM."
            ),
            "evidence": [
                "Processing dict size grew from 0 to 50,000 in 2 hours",
                "Memory usage increased linearly with throughput",
                "No items ever removed from _processing on successful ack"
            ],
            "suggested_fix": (
                "Add 'del self._processing[item_id]' before incrementing "
                "processed_count in ack() method"
            )
        },
        "logs": [
            "[ERROR] OOM killer invoked on queue-processor pod",
            "[WARN] QueueProcessor._processing size: 50432 (expected: <100)",
            "[INFO] Last successful ack: item-49999, processing dict not shrinking"
        ]
    }
}

class Handler(BaseHTTPRequestHandler):
    def do_POST(self):
        body = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
        method = body.get("method", "")
        params = body.get("params", {})

        if method == "tools/list":
            result = {"tools": [
                {
                    "name": "get_incident",
                    "description": "Get incident details and RCA",
                    "inputSchema": {
                        "type": "object",
                        "properties": {"incident_id": {"type": "string"}},
                        "required": ["incident_id"]
                    }
                },
                {
                    "name": "get_incident_logs",
                    "description": "Get logs for an incident",
                    "inputSchema": {
                        "type": "object",
                        "properties": {"incident_id": {"type": "string"}},
                        "required": ["incident_id"]
                    }
                },
                {
                    "name": "acknowledge_fix",
                    "description": "Mark incident as fix applied",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "incident_id": {"type": "string"},
                            "fix_summary": {"type": "string"}
                        },
                        "required": ["incident_id", "fix_summary"]
                    }
                }
            ]}
        elif method == "tools/call":
            name = params.get("name", "")
            args = params.get("arguments", {})
            if name == "get_incident":
                inc = INCIDENTS.get(args["incident_id"])
                result = {"content": [{"type": "text",
                    "text": json.dumps(inc, indent=2)}]}
            elif name == "get_incident_logs":
                inc = INCIDENTS.get(args["incident_id"])
                logs = inc["logs"] if inc else []
                result = {"content": [{"type": "text",
                    "text": json.dumps(logs, indent=2)}]}
            elif name == "acknowledge_fix":
                result = {"content": [{"type": "text",
                    "text": f"Incident {args['incident_id']} marked as fix_applied"}]}
            else:
                result = {"error": "unknown tool"}
        else:
            result = {}

        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        response = {"jsonrpc": "2.0", "id": body.get("id"), "result": result}
        self.wfile.write(json.dumps(response).encode())

    def log_message(self, *a):
        pass

if __name__ == "__main__":
    print("Observability MCP on :8092")
    HTTPServer(("localhost", 8092), Handler).serve_forever()

Configuration

.architect.yaml

yaml
llm:
  model: openai/gpt-4.1
  api_base: http://localhost:4000/v1
  api_key_env: LITELLM_API_KEY

mcp:
  servers:
    - name: observability
      url: http://localhost:8092/mcp
      auth:
        type: none

guardrails:
  max_files_modified: 3
  protected_files:
    - "*.env*"
    - "config/production.*"
    - "k8s/**"
    - "Dockerfile*"
    - "*.lock"
  blocked_commands:
    - "kubectl apply"
    - "docker push"
  code_rules:
    - pattern: 'eval\('
      severity: block

costs:
  budget_usd: 1.00

telemetry:
  enabled: true
  exporter: json-file
  endpoint: traces/

bash
mkdir -p traces
git add -A && git commit -m "initial: queue processor with memory leak"

Step 1: Verify that the tests fail

bash
export PYTHONPATH=.
pytest tests/ -v
# test_ack_removes_from_processing and test_memory_leak_simulation fail

Step 2: Start the MCP server

bash
python mcp-servers/observability.py &
echo "Observability MCP running"
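Before pointing Architect at the server, it helps to see the JSON-RPC envelope the mock endpoint expects. A minimal sketch that builds a tools/call request body (the shape mirrors what Handler.do_POST parses above; no network call is made here):

```python
import json

def make_tool_call(request_id: int, tool: str, arguments: dict) -> dict:
    # JSON-RPC 2.0 envelope: the handler reads body["method"],
    # params["name"], and params["arguments"].
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    }

req = make_tool_call(1, "get_incident", {"incident_id": "INC-5001"})
print(json.dumps(req))
```

The same payload can be sent by hand to sanity-check the running server, e.g. `curl -s -X POST http://localhost:8092/mcp -d "$(python -c '...')"` with the JSON above as the body.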

Step 3: Incident remediation script

incidents/auto-fix.sh

bash
#!/bin/bash
INCIDENT_ID="${1:?usage: auto-fix.sh <incident-id>}"

echo "Processing incident: $INCIDENT_ID"

architect run \
  "A production incident has been diagnosed. \
   Incident ID: ${INCIDENT_ID}. \
   Use the MCP tool get_incident to fetch the full diagnosis. \
   Use get_incident_logs to inspect the logs. \
   Identify the affected file and function. \
   Apply the MINIMAL fix required. \
   Run only the tests for the affected module. \
   When done, call acknowledge_fix to mark the incident." \
  --config .architect.yaml \
  --confirm-mode yolo \
  --budget 1.00 \
  --report-file "incidents/${INCIDENT_ID}-report.json"

EXIT_CODE=$?

if [ $EXIT_CODE -eq 0 ]; then
    echo "OK: Fix applied for $INCIDENT_ID"
    git checkout -b "hotfix/${INCIDENT_ID}"
    git add -A
    git commit -m "hotfix(${INCIDENT_ID}): auto-remediation via architect"
    echo "Branch created: hotfix/${INCIDENT_ID}"
else
    echo "FAIL: Auto-fix failed for $INCIDENT_ID — escalating"
fi
bash
chmod +x incidents/auto-fix.sh

Tip

This script can be hooked up to PagerDuty, OpsGenie, or any other alerting tool: when a P1 incident arrives with an automated diagnosis, the script runs and produces a hotfix branch.
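A minimal sketch of such an integration: a webhook receiver that triggers auto-fix.sh for qualifying alerts. The payload field names (incident_id, severity) and port 8093 are assumptions; real PagerDuty/OpsGenie payloads differ, so adapt the extraction to your tool's schema:

```python
import json
import subprocess
from http.server import HTTPServer, BaseHTTPRequestHandler

def should_auto_fix(alert: dict) -> bool:
    # Only act on P1 incidents that already carry an automated diagnosis.
    # Field names are hypothetical; map them to your alerting tool's payload.
    return alert.get("severity") == "P1" and "incident_id" in alert

class AlertHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        alert = json.loads(self.rfile.read(int(self.headers["Content-Length"])))
        if should_auto_fix(alert):
            # Fire-and-forget: the script creates the hotfix branch itself.
            subprocess.Popen(["incidents/auto-fix.sh", alert["incident_id"]])
        self.send_response(202)
        self.end_headers()

# To serve: HTTPServer(("localhost", 8093), AlertHandler).serve_forever()
```

Returning 202 immediately keeps the webhook fast; the remediation runs asynchronously and reports through the branch and the JSON report.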

Step 4: Run it

bash
bash incidents/auto-fix.sh INC-5001

Step 5: Verify

bash
# Tests pass
pytest tests/ -v

# See what changed
git diff main -- src/queue_processor.py

# Inspect the report
cat incidents/INC-5001-report.json | python -m json.tool | head -20

# Inspect the OTel trace
ls traces/
cat traces/*.json | python -m json.tool | head -30

# Verify the fix is minimal
git diff --stat main
# Only 1 file, a few lines
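The report JSON can feed the post-mortem directly. A sketch of a one-line summarizer, assuming hypothetical field names (cost_usd, files_modified, tests_passed); check the actual schema of your architect version's --report-file output and adjust the keys:

```python
# Field names below are assumptions, not architect's documented report schema.
# Load the real file with: json.load(open("incidents/INC-5001-report.json"))
def summarize_report(report: dict) -> str:
    files = ", ".join(report.get("files_modified", []))
    status = "OK" if report.get("tests_passed") else "FAILED"
    return (f"cost ${report.get('cost_usd', 0.0):.2f} | "
            f"files: {files or 'none'} | tests {status}")

sample = {"cost_usd": 0.42,
          "files_modified": ["src/queue_processor.py"],
          "tests_passed": True}
print(summarize_report(sample))
```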

Step 6: Verify the guardrails

bash
# Attempt a fix that touches too many files
architect run "Refactor the entire queue system" \
  --config .architect.yaml --confirm-mode yolo
# max_files_modified: 3 limits the scope

Cleanup

bash
kill %1 2>/dev/null

Summary

Complete architecture

text
Alert → Observability MCP → architect run → Fix code → Run tests → Create PR
                                  |  (fail)
                                  +→ Escalate to on-call
Component     Role
MCP           Reads the diagnosis from the AIOps platform
run           One-shot execution: one incident, one fix
Guardrails    Ultra-strict: max 3 files modified, no deploys
Budget        $1.00 hard limit
OTel          Trace linkable to the incident
Report        JSON for the post-mortem
Congratulations

You have completed all 24 Architect Labs: from a simple architect run to full AIOps architectures with MCP, OTel, and production-grade guardrails.
