Docs / ML / MLFlow Versioning

MLFlow Versioning

Sistema completo de versionado de modelos de ML con MLFlow. Cubre el tracking de experimentos, el Model Registry, comparación de modelos, y despliegue en producción con serving integrado.

Setup & Architecture

MLFlow tiene cuatro componentes principales: Tracking (métricas y params), Projects (empaquetado), Models (formato estándar) y Registry (ciclo de vida).

text
Arquitectura de MLFlow en producción

┌──────────────┐     ┌──────────────────────────────────────┐
│  Data        │     │         MLFlow Server                │
│  Scientists  │────>│                                      │
│  (notebook)  │     │  ┌────────────┐  ┌───────────────┐  │
└──────────────┘     │  │  Tracking  │  │   Model       │  │
                     │  │  Server    │  │   Registry    │  │
┌──────────────┐     │  └─────┬──────┘  └───────┬───────┘  │
│  CI/CD       │────>│        │                │           │
│  Pipeline   │     └────────┼────────────────┼───────────┘
└──────────────┘              │                │
                     ┌────────▼──┐     ┌───────▼────────┐
                     │ PostgreSQL│     │  Artifact Store│
                     │ (metadata)│     │  (S3 / MinIO)  │
                     └───────────┘     └────────────────┘
bash
# Instalar MLFlow
pip install mlflow

# Iniciar servidor de tracking local
mlflow server \
  --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root ./mlruns \
  --host 0.0.0.0 \
  --port 5000

# Servidor de producción con PostgreSQL + S3
mlflow server \
  --backend-store-uri postgresql://user:pass@db:5432/mlflow \
  --default-artifact-root s3://mlflow-artifacts/ \
  --host 0.0.0.0 \
  --port 5000

Experiment Tracking

Cada run registra parámetros, métricas y artefactos. Los experimentos agrupan runs relacionados que exploran el mismo problema.

python
import mlflow
import mlflow.keras

# Configurar tracking server
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("image-classifier-v2")

# ── Training run con auto-tracking ──
with mlflow.start_run(run_name="efficientnet-fine-tune") as run:
    # Registrar parámetros
    mlflow.log_params({
        "model": "EfficientNetV2B0",
        "learning_rate": 1e-3,
        "batch_size": 32,
        "epochs": 50,
        "fine_tune_layers": 20,
        "augmentation": "flip+rotate+zoom",
    })

    # Entrenar modelo
    history = model.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=50,
        callbacks=callbacks_list,
    )

    # Registrar métricas por epoch
    for epoch, (loss, acc) in enumerate(
        zip(history.history["loss"], history.history["accuracy"])
    ):
        mlflow.log_metrics({
            "train_loss": loss,
            "train_accuracy": acc,
        }, step=epoch)

    # Registrar métricas finales del test set
    test_loss, test_acc = model.evaluate(test_dataset)
    mlflow.log_metrics({
        "test_loss": test_loss,
        "test_accuracy": test_acc,
    })

    # Guardar modelo como artefacto
    mlflow.keras.log_model(
        model,
        artifact_path="model",
        registered_model_name="image-classifier",  # Auto-registra en Model Registry
    )

    # Guardar artefactos adicionales
    mlflow.log_artifact("config.yaml")
    mlflow.log_artifact("confusion_matrix.png")

    print(f"Run ID: {run.info.run_id}")

Model Registry

El Model Registry centraliza el ciclo de vida del modelo: desde la experimentación hasta la producción. Cada modelo tiene versiones con estados de transición.

text
Ciclo de vida del modelo en el Registry

 ┌─────────┐     ┌───────────┐     ┌──────────────┐     ┌──────────┐
 │  None   │────>│ Staging   │────>│ Production   │────>│ Archived │
 │(default)│     │(validación)│    │(tráfico real) │     │(retirado)│
 └─────────┘     └───────────┘     └──────────────┘     └──────────┘
                       │                   │
                       │    ┌──────────┐   │
                       └───>│ Rejected │<──┘
                            └──────────┘
python
from mlflow import MlflowClient

client = MlflowClient()

# Listar versiones de un modelo
versions = client.search_model_versions("name='image-classifier'")
for v in versions:
    print(f"Version {v.version} | Stage: {v.current_stage} | Run: {v.run_id}")

# Promover modelo a Staging
client.transition_model_version_stage(
    name="image-classifier",
    version="3",
    stage="Staging",
)

# Promover a Production (reemplaza la versión anterior)
client.transition_model_version_stage(
    name="image-classifier",
    version="3",
    stage="Production",
    archive_existing_versions=True,  # Archiva la versión en prod actual
)

Model Comparison

MLFlow permite comparar múltiples runs para identificar qué combinación de hiperparámetros y arquitectura produce los mejores resultados.

python
import mlflow

# Buscar los mejores runs de un experimento
experiment = mlflow.get_experiment_by_name("image-classifier-v2")

best_runs = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    filter_string="metrics.test_accuracy > 0.90",
    order_by=["metrics.test_accuracy DESC"],
    max_results=5,
)

# Tabla comparativa
print(best_runs[[
    "run_id",
    "params.model",
    "params.learning_rate",
    "metrics.test_accuracy",
    "metrics.test_loss",
]].to_string())

# ── Cargar modelo de un run específico ──
best_run_id = best_runs.iloc[0].run_id
model = mlflow.keras.load_model(f"runs:/{best_run_id}/model")

MLFlow UI

Accede al dashboard en http://mlflow-server:5000 para visualizar gráficas de métricas, comparar runs side-by-side, y navegar los artefactos. La UI es la forma más eficiente de explorar experimentos.

Deployment & Serving

MLFlow Models puede servir modelos directamente como APIs REST o empaquetarlos como contenedores Docker para despliegue en Kubernetes.

bash
# Servir modelo directamente desde MLFlow
mlflow models serve \
  -m "models:/image-classifier/Production" \
  --port 5001 \
  --no-conda

# Construir imagen Docker del modelo
mlflow models build-docker \
  -m "models:/image-classifier/Production" \
  -n "image-classifier-server" \
  --enable-mlserver

# Ejecutar contenedor
docker run -p 5001:8080 image-classifier-server

# Test con cURL
curl -X POST http://localhost:5001/invocations \
  -H "Content-Type: application/json" \
  -d '{"inputs": [[0.1, 0.2, ...]]}'

Versionado automático en CI

Integra MLFlow en tu pipeline de CI/CD: después de entrenar, registra automáticamente el modelo en el Registry. Usa GitHub Actions para promover a Staging después de validar las métricas contra un threshold definido.

Model Monitoring

Un modelo en producción se degrada con el tiempo. Los datos del mundo real cambian (data drift), nuevas categorías aparecen, y el comportamiento de usuarios evoluciona. El monitoreo continuo detecta estos problemas antes de que impacten al negocio.

text
Pipeline de monitoreo en producción

┌────────────┐     ┌─────────────────┐     ┌──────────────────┐
│ Predicción │────>│  Log de datos   │────>│  Drift Detection │
│ en prod    │     │  (features +    │     │  (batch diario)  │
│            │     │   predictions)  │     │                  │
└────────────┘     └─────────────────┘     └────────┬─────────┘

                          ┌─────────────────────────┼─────────────────┐
                          │                         │                 │
                   ┌──────▼──────┐   ┌──────────────▼──┐   ┌─────────▼────────┐
                   │ Dashboard   │   │  Alertas        │   │  Retrain trigger │
                   │ (Grafana)   │   │  (Slack/Email)  │   │  (si drift > th) │
                   └─────────────┘   └─────────────────┘   └──────────────────┘

Detección de Data Drift con Evidently AI

Data drift ocurre cuando la distribución de los datos de entrada en producción difiere significativamente de los datos de entrenamiento. Evidently AI genera reportes detallados de drift por cada feature.

bash
# Instalar Evidently AI
pip install evidently
python
import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import (
    DatasetDriftMetric,
    DataDriftTable,
    ColumnDriftMetric,
)

# ── Cargar datos de referencia (training) y producción ──
reference_data = pd.read_parquet("data/training_features.parquet")
production_data = pd.read_parquet("data/production_features_last_7d.parquet")

# Definir mapping de columnas
column_mapping = ColumnMapping(
    target="label",
    prediction="prediction",
    numerical_features=["feature_1", "feature_2", "feature_3", "feature_4"],
    categorical_features=["category", "region"],
)

# ── Generar reporte de drift ──
drift_report = Report(metrics=[
    DatasetDriftMetric(),      # Drift general del dataset
    DataDriftTable(),          # Drift por cada feature
    ColumnDriftMetric(column_name="feature_1"),  # Detalle de feature específica
])

drift_report.run(
    reference_data=reference_data,
    current_data=production_data,
    column_mapping=column_mapping,
)

# Guardar como HTML para revisión
drift_report.save_html("reports/drift_report.html")

# Extraer resultados programáticamente
report_dict = drift_report.as_dict()
dataset_drift = report_dict["metrics"][0]["result"]["dataset_drift"]
drift_share = report_dict["metrics"][0]["result"]["share_of_drifted_columns"]

print(f"Dataset drift detectado: {dataset_drift}")
print(f"Porcentaje de features con drift: {drift_share:.1%}")

Pipeline de monitoreo automatizado

Este script se ejecuta diariamente (vía cron o Airflow) y alerta al equipo cuando detecta degradación.

python
import mlflow
from datetime import datetime, timedelta
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric
from evidently.test_suite import TestSuite
from evidently.tests import (
    TestShareOfDriftedColumns,
    TestColumnDrift,
)

def run_monitoring_pipeline(
    reference_path: str,
    production_path: str,
    model_name: str,
    drift_threshold: float = 0.3,
    accuracy_threshold: float = 0.85,
):
    """Pipeline diario de monitoreo de modelo en producción."""

    reference = pd.read_parquet(reference_path)
    production = pd.read_parquet(production_path)

    # ── Test Suite: pasa/falla automático ──
    test_suite = TestSuite(tests=[
        TestShareOfDriftedColumns(lt=drift_threshold),
        TestColumnDrift(column_name="feature_1"),
        TestColumnDrift(column_name="feature_2"),
    ])

    test_suite.run(reference_data=reference, current_data=production)
    test_results = test_suite.as_dict()

    all_passed = test_results["summary"]["all_passed"]

    # ── Registrar resultados en MLFlow ──
    with mlflow.start_run(run_name=f"monitoring-{datetime.now():%Y%m%d}"):
        mlflow.log_metrics({
            "drift_share": test_results["summary"]["total_tests"],
            "tests_passed": int(all_passed),
            "production_samples": len(production),
        })
        mlflow.log_artifact("reports/drift_report.html")

    # ── Alertar si hay problemas ──
    if not all_passed:
        send_alert(
            channel="#ml-alerts",
            message=f"DRIFT DETECTADO en {model_name}. "
                    f"Tests fallidos. Revisar reporte en MLFlow.",
        )

    # ── Verificar accuracy en producción (si hay labels) ──
    if "ground_truth" in production.columns:
        prod_accuracy = (
            production["prediction"] == production["ground_truth"]
        ).mean()

        mlflow.log_metric("production_accuracy", prod_accuracy)

        if prod_accuracy < accuracy_threshold:
            send_alert(
                channel="#ml-alerts",
                message=f"DEGRADACION en {model_name}: "
                        f"accuracy={prod_accuracy:.3f} < {accuracy_threshold}",
            )

    return all_passed

def send_alert(channel: str, message: str):
    """Enviar alerta via Slack webhook."""
    import requests
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    requests.post(webhook_url, json={"channel": channel, "text": message})
Métrica de monitoreoQué mideThreshold típicoAcción si se supera
PSI (Population Stability Index)Cambio en la distribución de una featurePSI > 0.2Investigar feature, posible retrain
KL DivergenceDivergencia entre distribuciones de referencia y prodKL > 0.1Revisar pipeline de datos upstream
Accuracy DecayCaída de accuracy respecto al baselineCaída > 5%Retrain con datos recientes
Prediction DriftCambio en la distribución de prediccionesDrift > 0.15Verificar data drift, posible concept drift
Missing Value RateAumento de valores nulos en featuresAumento > 2xRevisar fuente de datos
Latencia P99Tiempo de inferencia percentil 99P99 > 500msOptimizar modelo o escalar infra

Labels en producción

En la mayoría de escenarios reales, los ground truth labels llegan con retraso (días o semanas). Por eso el monitoreo de data drift (sin labels) es más práctico que el monitoreo de accuracy. Configura drift detection como tu primera línea de defensa y accuracy monitoring como segunda.

Automated ML Pipeline

Un pipeline automatizado de ML elimina la intervención manual en el ciclo de vida del modelo: desde que llegan datos nuevos hasta que un modelo mejorado se despliega en producción. La combinación de MLFlow + GitHub Actions permite implementar ML CI/CD sin infraestructura adicional.

text
Pipeline end-to-end de ML

┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
│  Datos   │────>│  Train   │────>│ Evaluate │────>│ Register │────>│  Deploy  │
│  nuevos  │     │  modelo  │     │ métricas │     │ en MLFlow│     │  (auto)  │
└──────────┘     └──────────┘     └──────────┘     └──────────┘     └──────────┘
     │                │                │                │                │
     │           MLFlow run       Compare vs       Auto-promote     Blue/Green
     │           + artifacts      Production       si threshold     o A/B test
     ▼                                                 │
  Validación                                    ┌──────▼──────┐
  de calidad                                    │  Staging    │──> Tests de integración
  de datos                                      └─────────────┘

GitHub Actions workflow para ML CI/CD

Este workflow se activa con un push a la rama main que modifique datos o código de entrenamiento, o manualmente via workflow_dispatch.

yaml
# .github/workflows/ml-pipeline.yml
name: ML Training & Deployment Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'src/train.py'
      - 'src/model.py'
      - 'data/processed/**'
      - 'config.yaml'
  workflow_dispatch:
    inputs:
      force_deploy:
        description: 'Forzar deploy sin validar threshold'
        type: boolean
        default: false
  schedule:
    - cron: '0 2 * * 1'  # Retrain semanal los lunes a las 2am

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  MODEL_NAME: "image-classifier"
  ACCURACY_THRESHOLD: "0.92"

jobs:
  # ── Job 1: Validar datos ──
  validate-data:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Instalar dependencias
        run: pip install pandas great-expectations

      - name: Validar calidad de datos
        run: python src/validate_data.py

  # ── Job 2: Entrenar modelo ──
  train:
    needs: validate-data
    runs-on: [self-hosted, gpu]  # Runner con GPU
    outputs:
      run_id: ${{ steps.train.outputs.run_id }}
      test_accuracy: ${{ steps.train.outputs.test_accuracy }}
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Instalar dependencias
        run: pip install -r requirements.txt

      - name: Entrenar modelo
        id: train
        run: |
          python src/train.py \
            --config config.yaml \
            --experiment "${{ env.MODEL_NAME }}-ci" \
            --run-name "ci-${{ github.sha }}"

          # Capturar outputs del script
          echo "run_id=$(cat /tmp/mlflow_run_id)" >> $GITHUB_OUTPUT
          echo "test_accuracy=$(cat /tmp/test_accuracy)" >> $GITHUB_OUTPUT

  # ── Job 3: Evaluar y decidir promoción ──
  evaluate:
    needs: train
    runs-on: ubuntu-latest
    outputs:
      should_promote: ${{ steps.compare.outputs.should_promote }}
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Instalar dependencias
        run: pip install mlflow

      - name: Comparar con modelo en producción
        id: compare
        run: |
          python src/evaluate_and_promote.py \
            --run-id "${{ needs.train.outputs.run_id }}" \
            --model-name "${{ env.MODEL_NAME }}" \
            --threshold "${{ env.ACCURACY_THRESHOLD }}" \
            --force "${{ github.event.inputs.force_deploy || 'false' }}"

  # ── Job 4: Deploy ──
  deploy:
    needs: [train, evaluate]
    if: needs.evaluate.outputs.should_promote == 'true'
    runs-on: ubuntu-latest
    environment: production  # Requiere aprobación manual en GitHub
    steps:
      - uses: actions/checkout@v4

      - name: Promover a Production
        run: |
          python src/promote_model.py \
            --model-name "${{ env.MODEL_NAME }}" \
            --run-id "${{ needs.train.outputs.run_id }}" \
            --stage "Production"

      - name: Construir y publicar imagen Docker
        run: |
          mlflow models build-docker \
            -m "models:/${{ env.MODEL_NAME }}/Production" \
            -n "${{ env.MODEL_NAME }}-server" \
            --enable-mlserver

          docker tag ${{ env.MODEL_NAME }}-server \
            ${{ secrets.REGISTRY_URL }}/${{ env.MODEL_NAME }}:${{ github.sha }}

          docker push ${{ secrets.REGISTRY_URL }}/${{ env.MODEL_NAME }}:${{ github.sha }}

      - name: Deploy a Kubernetes
        run: |
          kubectl set image deployment/${{ env.MODEL_NAME }} \
            model=${{ secrets.REGISTRY_URL }}/${{ env.MODEL_NAME }}:${{ github.sha }}

Script de evaluación y auto-promoción

Este script compara el modelo recién entrenado contra el modelo actualmente en producción y decide si promoverlo.

python
# src/evaluate_and_promote.py
import argparse
import mlflow
from mlflow import MlflowClient

def evaluate_and_promote(run_id: str, model_name: str, threshold: float, force: bool):
    """Compara nuevo modelo vs producción y decide promoción."""
    client = MlflowClient()

    # ── Obtener métricas del nuevo modelo ──
    new_run = client.get_run(run_id)
    new_accuracy = new_run.data.metrics.get("test_accuracy", 0)
    new_loss = new_run.data.metrics.get("test_loss", float("inf"))

    print(f"Nuevo modelo - Accuracy: {new_accuracy:.4f}, Loss: {new_loss:.4f}")

    # ── Obtener métricas del modelo en producción ──
    prod_accuracy = 0.0
    try:
        prod_versions = client.get_latest_versions(model_name, stages=["Production"])
        if prod_versions:
            prod_run = client.get_run(prod_versions[0].run_id)
            prod_accuracy = prod_run.data.metrics.get("test_accuracy", 0)
            print(f"Modelo en producción - Accuracy: {prod_accuracy:.4f}")
        else:
            print("No hay modelo en producción (primer deploy)")
    except Exception as e:
        print(f"No se pudo obtener modelo de producción: {e}")

    # ── Reglas de promoción ──
    should_promote = False
    reason = ""

    if force:
        should_promote = True
        reason = "Deploy forzado via workflow_dispatch"
    elif new_accuracy >= threshold and new_accuracy > prod_accuracy:
        should_promote = True
        reason = (
            f"Nuevo modelo ({new_accuracy:.4f}) supera threshold ({threshold}) "
            f"y modelo actual ({prod_accuracy:.4f})"
        )
    elif new_accuracy >= threshold and prod_accuracy == 0:
        should_promote = True
        reason = f"Primer deploy, accuracy ({new_accuracy:.4f}) >= threshold ({threshold})"
    else:
        reason = (
            f"No cumple criterios: accuracy={new_accuracy:.4f}, "
            f"threshold={threshold}, prod={prod_accuracy:.4f}"
        )

    print(f"\nDecisión: {'PROMOVER' if should_promote else 'NO PROMOVER'}")
    print(f"Razón: {reason}")

    # Registrar decisión en MLFlow
    with mlflow.start_run(run_id=run_id):
        mlflow.log_params({
            "promotion_decision": str(should_promote),
            "promotion_reason": reason,
        })

    # Output para GitHub Actions
    import os
    with open(os.environ.get("GITHUB_OUTPUT", "/dev/null"), "a") as f:
        f.write(f"should_promote={'true' if should_promote else 'false'}\n")

    return should_promote

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--run-id", required=True)
    parser.add_argument("--model-name", required=True)
    parser.add_argument("--threshold", type=float, default=0.90)
    parser.add_argument("--force", type=str, default="false")
    args = parser.parse_args()

    evaluate_and_promote(
        run_id=args.run_id,
        model_name=args.model_name,
        threshold=args.threshold,
        force=args.force.lower() == "true",
    )

A/B Testing entre versiones de modelo

El A/B testing permite validar un nuevo modelo con tráfico real antes de hacer un rollout completo.

python
# src/ab_test_router.py
import mlflow
import numpy as np
from fastapi import FastAPI, Request
from pydantic import BaseModel
import hashlib

app = FastAPI()

# ── Cargar ambos modelos ──
model_a = mlflow.keras.load_model("models:/image-classifier/Production")   # Control
model_b = mlflow.keras.load_model("models:/image-classifier/Staging")      # Challenger

# Configuración del A/B test
AB_CONFIG = {
    "traffic_split": 0.2,     # 20% al modelo B (Staging)
    "model_a_name": "production-v5",
    "model_b_name": "staging-v6",
}

class PredictionRequest(BaseModel):
    features: list[list[float]]
    user_id: str | None = None

class PredictionResponse(BaseModel):
    prediction: list[float]
    model_version: str
    experiment_group: str  # "control" o "treatment"

def assign_group(user_id: str, split: float) -> str:
    """Asignación determinística basada en hash del user_id."""
    hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    return "treatment" if (hash_val % 100) < (split * 100) else "control"

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    features = np.array(request.features)
    user_id = request.user_id or str(np.random.randint(0, 100000))

    # Asignar grupo de forma determinística por usuario
    group = assign_group(user_id, AB_CONFIG["traffic_split"])

    if group == "treatment":
        prediction = model_b.predict(features)
        model_version = AB_CONFIG["model_b_name"]
    else:
        prediction = model_a.predict(features)
        model_version = AB_CONFIG["model_a_name"]

    # Registrar para análisis posterior
    mlflow.log_metrics({
        f"{group}_predictions": 1,
    })

    return PredictionResponse(
        prediction=prediction[0].tolist(),
        model_version=model_version,
        experiment_group=group,
    )

@app.get("/ab-test/stats")
async def ab_stats():
    """Endpoint para verificar estadísticas del A/B test."""
    return {
        "config": AB_CONFIG,
        "status": "running",
    }

Criterios para finalizar un A/B test

  1. Significancia estadística: al menos 1000 predicciones por grupo
  2. Duración mínima: 7 días para capturar patrones semanales
  3. Métrica primaria: definir UNA métrica de éxito antes de empezar (ej: accuracy, engagement)
  4. Rollback automático: si el modelo B tiene errores > 1% o latencia > 2x, revertir inmediatamente

Pipeline completo en resumen

La automatización completa sigue este flujo:

  1. Datos nuevos llegan (o se activa retrain semanal)
  2. GitHub Actions entrena, evalúa y registra en MLFlow
  3. Si accuracy > threshold y > modelo actual -> auto-promote a Staging
  4. A/B test con 20% de tráfico durante 1 semana
  5. Si métricas de A/B son positivas -> promote a Production
  6. Monitoreo continuo detecta drift y activa retrain si es necesario
END OF DOCUMENT

¿Necesitas más? Volver a la Librería →