MLFlow Versioning
Sistema completo de versionado de modelos de ML con MLFlow. Cubre el tracking de experimentos, el Model Registry, comparación de modelos, y despliegue en producción con serving integrado.
Setup & Architecture
MLFlow tiene cuatro componentes principales: Tracking (métricas y params), Projects (empaquetado), Models (formato estándar) y Registry (ciclo de vida).
Arquitectura de MLFlow en producción
┌──────────────┐ ┌──────────────────────────────────────┐
│ Data │ │ MLFlow Server │
│ Scientists │────>│ │
│ (notebook) │ │ ┌────────────┐ ┌───────────────┐ │
└──────────────┘ │ │ Tracking │ │ Model │ │
│ │ Server │ │ Registry │ │
┌──────────────┐ │ └─────┬──────┘ └───────┬───────┘ │
│ CI/CD │────>│ │ │ │
│ Pipeline │ └────────┼────────────────┼───────────┘
└──────────────┘ │ │
┌────────▼──┐ ┌───────▼────────┐
│ PostgreSQL│ │ Artifact Store│
│ (metadata)│ │ (S3 / MinIO) │
└───────────┘ └────────────────┘ # Instalar MLFlow
pip install mlflow
# Iniciar servidor de tracking local
mlflow server \
--backend-store-uri sqlite:///mlflow.db \
--default-artifact-root ./mlruns \
--host 0.0.0.0 \
--port 5000
# Servidor de producción con PostgreSQL + S3
mlflow server \
--backend-store-uri postgresql://user:pass@db:5432/mlflow \
--default-artifact-root s3://mlflow-artifacts/ \
--host 0.0.0.0 \
--port 5000 Experiment Tracking
Cada run registra parámetros, métricas y artefactos. Los experimentos agrupan runs relacionados que exploran el mismo problema.
import mlflow
import mlflow.keras
# Configurar tracking server
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("image-classifier-v2")
# ── Training run con auto-tracking ──
with mlflow.start_run(run_name="efficientnet-fine-tune") as run:
# Registrar parámetros
mlflow.log_params({
"model": "EfficientNetV2B0",
"learning_rate": 1e-3,
"batch_size": 32,
"epochs": 50,
"fine_tune_layers": 20,
"augmentation": "flip+rotate+zoom",
})
# Entrenar modelo
history = model.fit(
train_dataset,
validation_data=val_dataset,
epochs=50,
callbacks=callbacks_list,
)
# Registrar métricas por epoch
for epoch, (loss, acc) in enumerate(
zip(history.history["loss"], history.history["accuracy"])
):
mlflow.log_metrics({
"train_loss": loss,
"train_accuracy": acc,
}, step=epoch)
# Registrar métricas finales del test set
test_loss, test_acc = model.evaluate(test_dataset)
mlflow.log_metrics({
"test_loss": test_loss,
"test_accuracy": test_acc,
})
# Guardar modelo como artefacto
mlflow.keras.log_model(
model,
artifact_path="model",
registered_model_name="image-classifier", # Auto-registra en Model Registry
)
# Guardar artefactos adicionales
mlflow.log_artifact("config.yaml")
mlflow.log_artifact("confusion_matrix.png")
print(f"Run ID: {run.info.run_id}") Model Registry
El Model Registry centraliza el ciclo de vida del modelo: desde la experimentación hasta la producción. Cada modelo tiene versiones con estados de transición.
Ciclo de vida del modelo en el Registry
┌─────────┐ ┌───────────┐ ┌──────────────┐ ┌──────────┐
│ None │────>│ Staging │────>│ Production │────>│ Archived │
│(default)│ │(validación)│ │(tráfico real) │ │(retirado)│
└─────────┘ └───────────┘ └──────────────┘ └──────────┘
│ │
│ ┌──────────┐ │
└───>│ Rejected │<──┘
└──────────┘ from mlflow import MlflowClient
client = MlflowClient()
# Listar versiones de un modelo
versions = client.search_model_versions("name='image-classifier'")
for v in versions:
print(f"Version {v.version} | Stage: {v.current_stage} | Run: {v.run_id}")
# Promover modelo a Staging
client.transition_model_version_stage(
name="image-classifier",
version="3",
stage="Staging",
)
# Promover a Production (reemplaza la versión anterior)
client.transition_model_version_stage(
name="image-classifier",
version="3",
stage="Production",
archive_existing_versions=True, # Archiva la versión en prod actual
) Model Comparison
MLFlow permite comparar múltiples runs para identificar qué combinación de hiperparámetros y arquitectura produce los mejores resultados.
import mlflow
# Buscar los mejores runs de un experimento
experiment = mlflow.get_experiment_by_name("image-classifier-v2")
best_runs = mlflow.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string="metrics.test_accuracy > 0.90",
order_by=["metrics.test_accuracy DESC"],
max_results=5,
)
# Tabla comparativa
print(best_runs[[
"run_id",
"params.model",
"params.learning_rate",
"metrics.test_accuracy",
"metrics.test_loss",
]].to_string())
# ── Cargar modelo de un run específico ──
best_run_id = best_runs.iloc[0].run_id
model = mlflow.keras.load_model(f"runs:/{best_run_id}/model") MLFlow UI
Accede al dashboard en http://mlflow-server:5000 para visualizar
gráficas de métricas, comparar runs side-by-side, y navegar los artefactos.
La UI es la forma más eficiente de explorar experimentos.
Deployment & Serving
MLFlow Models puede servir modelos directamente como APIs REST o empaquetarlos como contenedores Docker para despliegue en Kubernetes.
# Servir modelo directamente desde MLFlow
mlflow models serve \
-m "models:/image-classifier/Production" \
--port 5001 \
--no-conda
# Construir imagen Docker del modelo
mlflow models build-docker \
-m "models:/image-classifier/Production" \
-n "image-classifier-server" \
--enable-mlserver
# Ejecutar contenedor
docker run -p 5001:8080 image-classifier-server
# Test con cURL
curl -X POST http://localhost:5001/invocations \
-H "Content-Type: application/json" \
-d '{"inputs": [[0.1, 0.2, ...]]}' Versionado automático en CI
Integra MLFlow en tu pipeline de CI/CD: después de entrenar, registra automáticamente el modelo en el Registry. Usa GitHub Actions para promover a Staging después de validar las métricas contra un threshold definido.
Model Monitoring
Un modelo en producción se degrada con el tiempo. Los datos del mundo real cambian (data drift), nuevas categorías aparecen, y el comportamiento de usuarios evoluciona. El monitoreo continuo detecta estos problemas antes de que impacten al negocio.
Pipeline de monitoreo en producción
┌────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ Predicción │────>│ Log de datos │────>│ Drift Detection │
│ en prod │ │ (features + │ │ (batch diario) │
│ │ │ predictions) │ │ │
└────────────┘ └─────────────────┘ └────────┬─────────┘
│
┌─────────────────────────┼─────────────────┐
│ │ │
┌──────▼──────┐ ┌──────────────▼──┐ ┌─────────▼────────┐
│ Dashboard │ │ Alertas │ │ Retrain trigger │
│ (Grafana) │ │ (Slack/Email) │ │ (si drift > th) │
└─────────────┘ └─────────────────┘ └──────────────────┘ Detección de Data Drift con Evidently AI
Data drift ocurre cuando la distribución de los datos de entrada en producción difiere significativamente de los datos de entrenamiento. Evidently AI genera reportes detallados de drift por cada feature.
# Instalar Evidently AI
pip install evidently import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
)
# ── Cargar datos de referencia (training) y producción ──
reference_data = pd.read_parquet("data/training_features.parquet")
production_data = pd.read_parquet("data/production_features_last_7d.parquet")
# Definir mapping de columnas
column_mapping = ColumnMapping(
target="label",
prediction="prediction",
numerical_features=["feature_1", "feature_2", "feature_3", "feature_4"],
categorical_features=["category", "region"],
)
# ── Generar reporte de drift ──
drift_report = Report(metrics=[
DatasetDriftMetric(), # Drift general del dataset
DataDriftTable(), # Drift por cada feature
ColumnDriftMetric(column_name="feature_1"), # Detalle de feature específica
])
drift_report.run(
reference_data=reference_data,
current_data=production_data,
column_mapping=column_mapping,
)
# Guardar como HTML para revisión
drift_report.save_html("reports/drift_report.html")
# Extraer resultados programáticamente
report_dict = drift_report.as_dict()
dataset_drift = report_dict["metrics"][0]["result"]["dataset_drift"]
drift_share = report_dict["metrics"][0]["result"]["share_of_drifted_columns"]
print(f"Dataset drift detectado: {dataset_drift}")
print(f"Porcentaje de features con drift: {drift_share:.1%}") Pipeline de monitoreo automatizado
Este script se ejecuta diariamente (vía cron o Airflow) y alerta al equipo cuando detecta degradación.
import mlflow
from datetime import datetime, timedelta
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric
from evidently.test_suite import TestSuite
from evidently.tests import (
TestShareOfDriftedColumns,
TestColumnDrift,
)
def run_monitoring_pipeline(
reference_path: str,
production_path: str,
model_name: str,
drift_threshold: float = 0.3,
accuracy_threshold: float = 0.85,
):
"""Pipeline diario de monitoreo de modelo en producción."""
reference = pd.read_parquet(reference_path)
production = pd.read_parquet(production_path)
# ── Test Suite: pasa/falla automático ──
test_suite = TestSuite(tests=[
TestShareOfDriftedColumns(lt=drift_threshold),
TestColumnDrift(column_name="feature_1"),
TestColumnDrift(column_name="feature_2"),
])
test_suite.run(reference_data=reference, current_data=production)
test_results = test_suite.as_dict()
all_passed = test_results["summary"]["all_passed"]
# ── Registrar resultados en MLFlow ──
with mlflow.start_run(run_name=f"monitoring-{datetime.now():%Y%m%d}"):
mlflow.log_metrics({
"drift_share": test_results["summary"]["total_tests"],
"tests_passed": int(all_passed),
"production_samples": len(production),
})
mlflow.log_artifact("reports/drift_report.html")
# ── Alertar si hay problemas ──
if not all_passed:
send_alert(
channel="#ml-alerts",
message=f"DRIFT DETECTADO en {model_name}. "
f"Tests fallidos. Revisar reporte en MLFlow.",
)
# ── Verificar accuracy en producción (si hay labels) ──
if "ground_truth" in production.columns:
prod_accuracy = (
production["prediction"] == production["ground_truth"]
).mean()
mlflow.log_metric("production_accuracy", prod_accuracy)
if prod_accuracy < accuracy_threshold:
send_alert(
channel="#ml-alerts",
message=f"DEGRADACION en {model_name}: "
f"accuracy={prod_accuracy:.3f} < {accuracy_threshold}",
)
return all_passed
def send_alert(channel: str, message: str):
"""Enviar alerta via Slack webhook."""
import requests
webhook_url = os.environ["SLACK_WEBHOOK_URL"]
requests.post(webhook_url, json={"channel": channel, "text": message}) | Métrica de monitoreo | Qué mide | Threshold típico | Acción si se supera |
|---|---|---|---|
| PSI (Population Stability Index) | Cambio en la distribución de una feature | PSI > 0.2 | Investigar feature, posible retrain |
| KL Divergence | Divergencia entre distribuciones de referencia y prod | KL > 0.1 | Revisar pipeline de datos upstream |
| Accuracy Decay | Caída de accuracy respecto al baseline | Caída > 5% | Retrain con datos recientes |
| Prediction Drift | Cambio en la distribución de predicciones | Drift > 0.15 | Verificar data drift, posible concept drift |
| Missing Value Rate | Aumento de valores nulos en features | Aumento > 2x | Revisar fuente de datos |
| Latencia P99 | Tiempo de inferencia percentil 99 | P99 > 500ms | Optimizar modelo o escalar infra |
Labels en producción
En la mayoría de escenarios reales, los ground truth labels llegan con retraso (días o semanas). Por eso el monitoreo de data drift (sin labels) es más práctico que el monitoreo de accuracy. Configura drift detection como tu primera línea de defensa y accuracy monitoring como segunda.
Automated ML Pipeline
Un pipeline automatizado de ML elimina la intervención manual en el ciclo de vida del modelo: desde que llegan datos nuevos hasta que un modelo mejorado se despliega en producción. La combinación de MLFlow + GitHub Actions permite implementar ML CI/CD sin infraestructura adicional.
Pipeline end-to-end de ML
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Datos │────>│ Train │────>│ Evaluate │────>│ Register │────>│ Deploy │
│ nuevos │ │ modelo │ │ métricas │ │ en MLFlow│ │ (auto) │
└──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘
│ │ │ │ │
│ MLFlow run Compare vs Auto-promote Blue/Green
│ + artifacts Production si threshold o A/B test
▼ │
Validación ┌──────▼──────┐
de calidad │ Staging │──> Tests de integración
de datos └─────────────┘ GitHub Actions workflow para ML CI/CD
Este workflow se activa con un push a la rama main que modifique datos o código de entrenamiento, o manualmente via workflow_dispatch.
# .github/workflows/ml-pipeline.yml
name: ML Training & Deployment Pipeline
on:
push:
branches: [main]
paths:
- 'src/train.py'
- 'src/model.py'
- 'data/processed/**'
- 'config.yaml'
workflow_dispatch:
inputs:
force_deploy:
description: 'Forzar deploy sin validar threshold'
type: boolean
default: false
schedule:
- cron: '0 2 * * 1' # Retrain semanal los lunes a las 2am
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
MODEL_NAME: "image-classifier"
ACCURACY_THRESHOLD: "0.92"
jobs:
# ── Job 1: Validar datos ──
validate-data:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Instalar dependencias
run: pip install pandas great-expectations
- name: Validar calidad de datos
run: python src/validate_data.py
# ── Job 2: Entrenar modelo ──
train:
needs: validate-data
runs-on: [self-hosted, gpu] # Runner con GPU
outputs:
run_id: ${{ steps.train.outputs.run_id }}
test_accuracy: ${{ steps.train.outputs.test_accuracy }}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Instalar dependencias
run: pip install -r requirements.txt
- name: Entrenar modelo
id: train
run: |
python src/train.py \
--config config.yaml \
--experiment "${{ env.MODEL_NAME }}-ci" \
--run-name "ci-${{ github.sha }}"
# Capturar outputs del script
echo "run_id=$(cat /tmp/mlflow_run_id)" >> $GITHUB_OUTPUT
echo "test_accuracy=$(cat /tmp/test_accuracy)" >> $GITHUB_OUTPUT
# ── Job 3: Evaluar y decidir promoción ──
evaluate:
needs: train
runs-on: ubuntu-latest
outputs:
should_promote: ${{ steps.compare.outputs.should_promote }}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Instalar dependencias
run: pip install mlflow
- name: Comparar con modelo en producción
id: compare
run: |
python src/evaluate_and_promote.py \
--run-id "${{ needs.train.outputs.run_id }}" \
--model-name "${{ env.MODEL_NAME }}" \
--threshold "${{ env.ACCURACY_THRESHOLD }}" \
--force "${{ github.event.inputs.force_deploy || 'false' }}"
# ── Job 4: Deploy ──
deploy:
needs: [train, evaluate]
if: needs.evaluate.outputs.should_promote == 'true'
runs-on: ubuntu-latest
environment: production # Requiere aprobación manual en GitHub
steps:
- uses: actions/checkout@v4
- name: Promover a Production
run: |
python src/promote_model.py \
--model-name "${{ env.MODEL_NAME }}" \
--run-id "${{ needs.train.outputs.run_id }}" \
--stage "Production"
- name: Construir y publicar imagen Docker
run: |
mlflow models build-docker \
-m "models:/${{ env.MODEL_NAME }}/Production" \
-n "${{ env.MODEL_NAME }}-server" \
--enable-mlserver
docker tag ${{ env.MODEL_NAME }}-server \
${{ secrets.REGISTRY_URL }}/${{ env.MODEL_NAME }}:${{ github.sha }}
docker push ${{ secrets.REGISTRY_URL }}/${{ env.MODEL_NAME }}:${{ github.sha }}
- name: Deploy a Kubernetes
run: |
kubectl set image deployment/${{ env.MODEL_NAME }} \
model=${{ secrets.REGISTRY_URL }}/${{ env.MODEL_NAME }}:${{ github.sha }} Script de evaluación y auto-promoción
Este script compara el modelo recién entrenado contra el modelo actualmente en producción y decide si promoverlo.
# src/evaluate_and_promote.py
import argparse
import mlflow
from mlflow import MlflowClient
def evaluate_and_promote(run_id: str, model_name: str, threshold: float, force: bool):
"""Compara nuevo modelo vs producción y decide promoción."""
client = MlflowClient()
# ── Obtener métricas del nuevo modelo ──
new_run = client.get_run(run_id)
new_accuracy = new_run.data.metrics.get("test_accuracy", 0)
new_loss = new_run.data.metrics.get("test_loss", float("inf"))
print(f"Nuevo modelo - Accuracy: {new_accuracy:.4f}, Loss: {new_loss:.4f}")
# ── Obtener métricas del modelo en producción ──
prod_accuracy = 0.0
try:
prod_versions = client.get_latest_versions(model_name, stages=["Production"])
if prod_versions:
prod_run = client.get_run(prod_versions[0].run_id)
prod_accuracy = prod_run.data.metrics.get("test_accuracy", 0)
print(f"Modelo en producción - Accuracy: {prod_accuracy:.4f}")
else:
print("No hay modelo en producción (primer deploy)")
except Exception as e:
print(f"No se pudo obtener modelo de producción: {e}")
# ── Reglas de promoción ──
should_promote = False
reason = ""
if force:
should_promote = True
reason = "Deploy forzado via workflow_dispatch"
elif new_accuracy >= threshold and new_accuracy > prod_accuracy:
should_promote = True
reason = (
f"Nuevo modelo ({new_accuracy:.4f}) supera threshold ({threshold}) "
f"y modelo actual ({prod_accuracy:.4f})"
)
elif new_accuracy >= threshold and prod_accuracy == 0:
should_promote = True
reason = f"Primer deploy, accuracy ({new_accuracy:.4f}) >= threshold ({threshold})"
else:
reason = (
f"No cumple criterios: accuracy={new_accuracy:.4f}, "
f"threshold={threshold}, prod={prod_accuracy:.4f}"
)
print(f"\nDecisión: {'PROMOVER' if should_promote else 'NO PROMOVER'}")
print(f"Razón: {reason}")
# Registrar decisión en MLFlow
with mlflow.start_run(run_id=run_id):
mlflow.log_params({
"promotion_decision": str(should_promote),
"promotion_reason": reason,
})
# Output para GitHub Actions
import os
with open(os.environ.get("GITHUB_OUTPUT", "/dev/null"), "a") as f:
f.write(f"should_promote={'true' if should_promote else 'false'}\n")
return should_promote
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--run-id", required=True)
parser.add_argument("--model-name", required=True)
parser.add_argument("--threshold", type=float, default=0.90)
parser.add_argument("--force", type=str, default="false")
args = parser.parse_args()
evaluate_and_promote(
run_id=args.run_id,
model_name=args.model_name,
threshold=args.threshold,
force=args.force.lower() == "true",
) A/B Testing entre versiones de modelo
El A/B testing permite validar un nuevo modelo con tráfico real antes de hacer un rollout completo.
# src/ab_test_router.py
import mlflow
import numpy as np
from fastapi import FastAPI, Request
from pydantic import BaseModel
import hashlib
app = FastAPI()
# ── Cargar ambos modelos ──
model_a = mlflow.keras.load_model("models:/image-classifier/Production") # Control
model_b = mlflow.keras.load_model("models:/image-classifier/Staging") # Challenger
# Configuración del A/B test
AB_CONFIG = {
"traffic_split": 0.2, # 20% al modelo B (Staging)
"model_a_name": "production-v5",
"model_b_name": "staging-v6",
}
class PredictionRequest(BaseModel):
features: list[list[float]]
user_id: str | None = None
class PredictionResponse(BaseModel):
prediction: list[float]
model_version: str
experiment_group: str # "control" o "treatment"
def assign_group(user_id: str, split: float) -> str:
"""Asignación determinística basada en hash del user_id."""
hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
return "treatment" if (hash_val % 100) < (split * 100) else "control"
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
features = np.array(request.features)
user_id = request.user_id or str(np.random.randint(0, 100000))
# Asignar grupo de forma determinística por usuario
group = assign_group(user_id, AB_CONFIG["traffic_split"])
if group == "treatment":
prediction = model_b.predict(features)
model_version = AB_CONFIG["model_b_name"]
else:
prediction = model_a.predict(features)
model_version = AB_CONFIG["model_a_name"]
# Registrar para análisis posterior
mlflow.log_metrics({
f"{group}_predictions": 1,
})
return PredictionResponse(
prediction=prediction[0].tolist(),
model_version=model_version,
experiment_group=group,
)
@app.get("/ab-test/stats")
async def ab_stats():
"""Endpoint para verificar estadísticas del A/B test."""
return {
"config": AB_CONFIG,
"status": "running",
} Criterios para finalizar un A/B test
- Significancia estadística: al menos 1000 predicciones por grupo
- Duración mínima: 7 días para capturar patrones semanales
- Métrica primaria: definir UNA métrica de éxito antes de empezar (ej: accuracy, engagement)
- Rollback automático: si el modelo B tiene errores > 1% o latencia > 2x, revertir inmediatamente
Pipeline completo en resumen
La automatización completa sigue este flujo:
- Datos nuevos llegan (o se activa retrain semanal)
- GitHub Actions entrena, evalúa y registra en MLFlow
- Si accuracy > threshold y > modelo actual -> auto-promote a Staging
- A/B test con 20% de tráfico durante 1 semana
- Si métricas de A/B son positivas -> promote a Production
- Monitoreo continuo detecta drift y activa retrain si es necesario