Deep Learning Keras
Guía completa de desarrollo de modelos de deep learning con Keras y TensorFlow. Desde la configuración del entorno con GPU hasta el entrenamiento, evaluación, exportación y serving de modelos en producción.
Setup & Environment
Keras 3 es multi-backend: funciona con TensorFlow, PyTorch y JAX. La instalación estándar usa TensorFlow como backend por defecto.
# Crear entorno con conda (recomendado para GPU)
conda create -n ml python=3.11 -y
conda activate ml
# Instalar TensorFlow con soporte GPU
pip install tensorflow[and-cuda]
# O instalar Keras 3 standalone
pip install keras
# Dependencias de datos y visualización
pip install numpy pandas matplotlib scikit-learn
# Verificar GPU disponible
python -c "import tensorflow as tf; print(tf.config.list_physical_devices('GPU'))" Estructura de un proyecto de ML
ml-project/
├── data/
│ ├── raw/ # Datos originales (nunca modificar)
│ ├── processed/ # Datos preprocesados
│ └── splits/ # train/val/test
├── models/ # Modelos exportados (.keras, .h5)
├── notebooks/ # Exploración y experimentación
├── src/
│ ├── data_pipeline.py # Carga y preprocesamiento
│ ├── model.py # Definición de arquitectura
│ ├── train.py # Training loop
│ ├── evaluate.py # Evaluación y métricas
│ └── predict.py # Inferencia
├── config.yaml # Hiperparámetros
├── Dockerfile
└── requirements.txt Model Architectures
Keras ofrece tres APIs para construir modelos: Sequential (lineal), Functional (DAG) y Subclassing (máxima flexibilidad).
Sequential API — CNN para clasificación
import keras
from keras import layers
def build_cnn(input_shape=(224, 224, 3), num_classes=10):
model = keras.Sequential([
# Bloque 1: Extracción de features
layers.Conv2D(32, (3, 3), activation="relu", input_shape=input_shape),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
# Bloque 2: Features más complejas
layers.Conv2D(64, (3, 3), activation="relu"),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
# Bloque 3: Features de alto nivel
layers.Conv2D(128, (3, 3), activation="relu"),
layers.BatchNormalization(),
layers.GlobalAveragePooling2D(),
# Clasificador
layers.Dense(256, activation="relu"),
layers.Dropout(0.5),
layers.Dense(num_classes, activation="softmax"),
])
return model Functional API — Transfer Learning
def build_transfer_model(num_classes=10, fine_tune_layers=20):
"""EfficientNet como feature extractor + clasificador personalizado."""
# Base pre-entrenada (ImageNet)
base = keras.applications.EfficientNetV2B0(
include_top=False,
weights="imagenet",
input_shape=(224, 224, 3),
)
# Congelar capas base (solo entrenar el clasificador)
base.trainable = False
# Functional API
inputs = keras.Input(shape=(224, 224, 3))
x = base(inputs, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation="relu")(x)
x = layers.Dropout(0.3)(x)
outputs = layers.Dense(num_classes, activation="softmax")(x)
model = keras.Model(inputs, outputs)
# Fine-tuning: descongelar últimas N capas
for layer in base.layers[-fine_tune_layers:]:
layer.trainable = True
return model Transfer Learning workflow
- Entrenar primero con
base.trainable = False(3-5 epochs) - Descongelar últimas capas ->
base.layers[-20:].trainable = True - Fine-tune con learning rate bajo (
1e-5) durante 5-10 epochs más
Training Pipeline
Un pipeline de entrenamiento robusto incluye data augmentation, callbacks para early stopping, checkpointing y logging con TensorBoard.
import keras
from keras import layers, callbacks
# ── Data Augmentation ──
augmentation = keras.Sequential([
layers.RandomFlip("horizontal"),
layers.RandomRotation(0.1),
layers.RandomZoom(0.1),
layers.RandomContrast(0.1),
])
# ── Compilación ──
model = build_transfer_model(num_classes=10)
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
loss="categorical_crossentropy",
metrics=["accuracy", keras.metrics.F1Score(average="macro")],
)
# ── Callbacks ──
cb = [
callbacks.EarlyStopping(
monitor="val_loss",
patience=10,
restore_best_weights=True,
),
callbacks.ModelCheckpoint(
filepath="models/best_model.keras",
save_best_only=True,
monitor="val_accuracy",
),
callbacks.ReduceLROnPlateau(
monitor="val_loss",
factor=0.5,
patience=5,
min_lr=1e-7,
),
callbacks.TensorBoard(log_dir="./logs"),
]
# ── Entrenamiento ──
history = model.fit(
train_dataset,
validation_data=val_dataset,
epochs=50,
callbacks=cb,
verbose=1,
) Evaluation & Metrics
La evaluación va más allá del accuracy: en problemas desbalanceados, las métricas como F1-score, precision y recall son críticas.
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
# Evaluar en test set
test_loss, test_acc, test_f1 = model.evaluate(test_dataset)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")
print(f"Test F1: {test_f1:.4f}")
# Predicciones detalladas
y_pred = model.predict(test_dataset)
y_pred_classes = np.argmax(y_pred, axis=1)
# Classification report completo
print(classification_report(
y_true,
y_pred_classes,
target_names=class_names,
)) | Métrica | Cuándo usarla | Fórmula |
|---|---|---|
| Accuracy | Datasets balanceados | TP+TN / Total |
| Precision | Minimizar falsos positivos | TP / (TP+FP) |
| Recall | Minimizar falsos negativos | TP / (TP+FN) |
| F1-Score | Balance entre precision y recall | 2·P·R / (P+R) |
| AUC-ROC | Clasificación binaria probabilística | Área bajo ROC |
Model Export & Serving
El modelo entrenado se exporta en formato .keras (nativo) o SavedModel (TensorFlow Serving). Para producción, TensorFlow Serving o TFLite son las opciones estándar.
# Exportar modelo
model.save("models/classifier.keras") # Formato Keras 3
model.export("models/classifier_savedmodel") # TF SavedModel
# Cargar modelo
loaded = keras.models.load_model("models/classifier.keras")
# ── Convertir a TFLite (edge/mobile) ──
import tensorflow as tf
converter = tf.lite.TFLiteConverter.from_saved_model(
"models/classifier_savedmodel"
)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
with open("models/classifier.tflite", "wb") as f:
f.write(tflite_model) FastAPI serving endpoint
# Servir modelo con FastAPI
import keras
import numpy as np
from fastapi import FastAPI, UploadFile
from PIL import Image
app = FastAPI()
model = keras.models.load_model("models/classifier.keras")
@app.post("/predict")
async def predict(file: UploadFile):
img = Image.open(file.file).resize((224, 224))
arr = np.array(img) / 255.0
arr = np.expand_dims(arr, axis=0)
predictions = model.predict(arr)
class_idx = int(np.argmax(predictions[0]))
confidence = float(predictions[0][class_idx])
return {
"class": class_names[class_idx],
"confidence": round(confidence, 4),
} Modelo en memoria
Carga el modelo una sola vez al iniciar el servidor (no por petición). Para múltiples workers, usa TensorFlow Serving que gestiona la memoria del modelo de forma eficiente y soporta versionado automático.
Hyperparameter Tuning
La búsqueda de hiperparámetros es una de las tareas más importantes y costosas en ML. Keras Tuner automatiza este proceso probando combinaciones de hiperparámetros de forma inteligente, evitando la búsqueda manual trial-and-error.
# Instalar Keras Tuner y MLFlow para tracking
pip install keras-tuner mlflow Definir el espacio de búsqueda
El primer paso es crear una función build_model que recibe un objeto hp (hyperparameters) y devuelve un modelo compilado. Cada llamada a hp.Int, hp.Float o hp.Choice define un rango de valores a explorar.
import keras_tuner as kt
import keras
from keras import layers
def build_model(hp):
"""Model builder con espacio de búsqueda de hiperparámetros."""
model = keras.Sequential()
# Número de bloques convolucionales (2, 3 o 4)
num_blocks = hp.Int("num_conv_blocks", min_value=2, max_value=4, step=1)
for i in range(num_blocks):
filters = hp.Choice(f"filters_{i}", values=[32, 64, 128, 256])
model.add(layers.Conv2D(filters, (3, 3), activation="relu", padding="same"))
model.add(layers.BatchNormalization())
if hp.Boolean(f"use_pooling_{i}"):
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.GlobalAveragePooling2D())
# Capas densas
dense_units = hp.Int("dense_units", min_value=64, max_value=512, step=64)
model.add(layers.Dense(dense_units, activation="relu"))
# Dropout rate
dropout_rate = hp.Float("dropout", min_value=0.1, max_value=0.5, step=0.1)
model.add(layers.Dropout(dropout_rate))
model.add(layers.Dense(10, activation="softmax"))
# Learning rate
learning_rate = hp.Float(
"learning_rate", min_value=1e-5, max_value=1e-2, sampling="log"
)
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
loss="categorical_crossentropy",
metrics=["accuracy"],
)
return model Ejecutar la búsqueda
Keras Tuner ofrece tres estrategias principales. El tuner.search() funciona igual que model.fit() pero prueba múltiples configuraciones automáticamente.
# ── Estrategia 1: Hyperband (recomendado para empezar) ──
tuner = kt.Hyperband(
build_model,
objective="val_accuracy",
max_epochs=50,
factor=3, # Reducción agresiva de trials malos
hyperband_iterations=2, # Repeticiones del bracket completo
directory="tuner_results",
project_name="cnn_tuning",
overwrite=True,
)
# ── Estrategia 2: Bayesiana (cuando cada trial es costoso) ──
# tuner = kt.BayesianOptimization(
# build_model,
# objective="val_accuracy",
# max_trials=30,
# directory="tuner_results",
# project_name="cnn_bayesian",
# )
# Ejecutar búsqueda
tuner.search(
train_dataset,
validation_data=val_dataset,
epochs=50,
callbacks=[
keras.callbacks.EarlyStopping(monitor="val_loss", patience=5),
],
)
# Obtener los mejores hiperparámetros
best_hps = tuner.get_best_hyperparameters(num_trials=3)
for i, hp in enumerate(best_hps):
print(f"\n--- Top {i+1} ---")
print(f" Conv blocks: {hp.get('num_conv_blocks')}")
print(f" Dense units: {hp.get('dense_units')}")
print(f" Dropout: {hp.get('dropout')}")
print(f" Learning rate: {hp.get('learning_rate'):.6f}")
# Entrenar el mejor modelo con todos los epochs
best_model = tuner.get_best_models(num_models=1)[0]
best_model.summary() Integración con MLFlow
Registrar cada trial de tuning en MLFlow permite comparar visualmente todas las combinaciones y mantener un historial completo.
import mlflow
class MLFlowTunerCallback(keras.callbacks.Callback):
"""Callback que registra cada trial del tuner en MLFlow."""
def __init__(self, trial_id, hyperparameters):
self.trial_id = trial_id
self.hyperparameters = hyperparameters
def on_train_begin(self, logs=None):
mlflow.start_run(run_name=f"tuner-trial-{self.trial_id}", nested=True)
mlflow.log_params(self.hyperparameters)
def on_epoch_end(self, epoch, logs=None):
mlflow.log_metrics(
{k: v for k, v in logs.items()},
step=epoch,
)
def on_train_end(self, logs=None):
mlflow.end_run()
# Uso con el tuner (dentro de un run padre de MLFlow)
mlflow.set_experiment("hyperparameter-tuning")
with mlflow.start_run(run_name="keras-tuner-search"):
mlflow.log_params({
"strategy": "Hyperband",
"max_epochs": 50,
"objective": "val_accuracy",
})
tuner.search(
train_dataset,
validation_data=val_dataset,
epochs=50,
)
# Registrar los mejores hiperparámetros encontrados
best_hp = tuner.get_best_hyperparameters(1)[0]
mlflow.log_params({
f"best_{k}": v for k, v in best_hp.values.items()
}) | Estrategia | Mecanismo | Tamaño de dataset ideal | Trials recomendados | Cuándo usarla |
|---|---|---|---|---|
| RandomSearch | Prueba combinaciones aleatorias | Cualquiera | 50-100+ | Exploración inicial rápida del espacio |
| BayesianOptimization | Modelo probabilístico guía la búsqueda | Medio-Grande | 20-50 | Trials costosos, datasets grandes |
| Hyperband | Early stopping adaptativo de trials malos | Cualquiera | Auto (brackets) | Mejor balance velocidad/calidad general |
Flujo recomendado de tuning
- Empezar con RandomSearch (50 trials) para entender el espacio de búsqueda
- Refinar con BayesianOptimization en los rangos más prometedores
- Para producción, usar Hyperband que descarta configuraciones malas tempranamente y ahorra GPU
Mixed Precision & Optimization
El entrenamiento con precisión mixta (mixed precision) utiliza float16 para la mayoría de operaciones y float32 solo donde es necesario para estabilidad numérica. Esto reduce el uso de memoria GPU y puede acelerar el entrenamiento hasta 2x en GPUs con Tensor Cores (NVIDIA V100, A100, RTX 3000+).
Configurar Mixed Precision
import keras
import tensorflow as tf
# ── Activar mixed precision globalmente ──
keras.mixed_precision.set_global_policy("mixed_float16")
# Verificar la política activa
policy = keras.mixed_precision.global_policy()
print(f"Compute dtype: {policy.compute_dtype}") # float16
print(f"Variable dtype: {policy.variable_dtype}") # float32
# ── Construir modelo (el modelo usa float16 automáticamente) ──
model = keras.Sequential([
layers.Conv2D(64, (3, 3), activation="relu", input_shape=(224, 224, 3)),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(128, (3, 3), activation="relu"),
layers.BatchNormalization(),
layers.GlobalAveragePooling2D(),
layers.Dense(256, activation="relu"),
layers.Dropout(0.3),
# IMPORTANTE: la última capa debe ser float32 para estabilidad numérica
layers.Dense(10, dtype="float32"),
layers.Activation("softmax", dtype="float32"),
])
# Compilar con loss scaling automático (maneja underflow en float16)
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
loss="categorical_crossentropy",
metrics=["accuracy"],
)
print(f"Parámetros totales: {model.count_params():,}") Compilación XLA para mayor velocidad
XLA (Accelerated Linear Algebra) compila operaciones de TensorFlow en kernels optimizados, fusionando operaciones y eliminando overhead.
# ── Activar JIT compilation con XLA ──
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
loss="categorical_crossentropy",
metrics=["accuracy"],
jit_compile=True, # Activa XLA
)
# Alternativa: decorar funciones individuales
@tf.function(jit_compile=True)
def train_step(images, labels):
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = loss_fn(predictions, labels)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss Pipeline de datos optimizado con tf.data
El cuello de botella más común en entrenamiento no es la GPU sino la carga de datos. Un pipeline tf.data bien configurado mantiene la GPU alimentada constantemente.
import tensorflow as tf
AUTOTUNE = tf.data.AUTOTUNE
BATCH_SIZE = 64
def parse_image(filename, label):
"""Lee y preprocesa una imagen desde disco."""
image = tf.io.read_file(filename)
image = tf.image.decode_jpeg(image, channels=3)
image = tf.image.resize(image, [224, 224])
image = tf.cast(image, tf.float32) / 255.0
return image, label
def augment(image, label):
"""Data augmentation en GPU (más rápido que en CPU)."""
image = tf.image.random_flip_left_right(image)
image = tf.image.random_brightness(image, 0.1)
image = tf.image.random_contrast(image, 0.9, 1.1)
return image, label
# ── Pipeline optimizado ──
train_ds = (
tf.data.Dataset.from_tensor_slices((train_files, train_labels))
.shuffle(buffer_size=10000) # Aleatorizar orden
.map(parse_image, num_parallel_calls=AUTOTUNE) # Parsear en paralelo
.cache() # Cachear en memoria después del primer epoch
.map(augment, num_parallel_calls=AUTOTUNE) # Augmentation en paralelo
.batch(BATCH_SIZE)
.prefetch(AUTOTUNE) # Prefetch del siguiente batch mientras GPU entrena
)
val_ds = (
tf.data.Dataset.from_tensor_slices((val_files, val_labels))
.map(parse_image, num_parallel_calls=AUTOTUNE)
.cache() # Datos de validación no cambian, siempre cachear
.batch(BATCH_SIZE)
.prefetch(AUTOTUNE)
)
# Verificar el throughput del pipeline
import time
start = time.time()
for batch in train_ds.take(100):
pass
elapsed = time.time() - start
print(f"100 batches en {elapsed:.2f}s ({100/elapsed:.0f} batches/seg)") Optimización de memoria para modelos grandes
Cuando el modelo no cabe en GPU, estas técnicas ayudan a reducir el consumo de memoria sin sacrificar demasiada calidad.
# ── Gradient accumulation (simular batch sizes grandes) ──
accumulation_steps = 4 # Equivale a batch_size * 4
@tf.function
def train_step_accumulated(dataset_iterator):
"""Acumula gradientes de múltiples mini-batches."""
accumulated_gradients = [
tf.zeros_like(v) for v in model.trainable_variables
]
total_loss = 0.0
for _ in range(accumulation_steps):
images, labels = next(dataset_iterator)
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = loss_fn(predictions, labels) / accumulation_steps
gradients = tape.gradient(loss, model.trainable_variables)
accumulated_gradients = [
ag + g for ag, g in zip(accumulated_gradients, gradients)
]
total_loss += loss
optimizer.apply_gradients(
zip(accumulated_gradients, model.trainable_variables)
)
return total_loss
# ── Limitar memoria GPU (evitar OOM en máquinas compartidas) ──
gpus = tf.config.list_physical_devices("GPU")
if gpus:
# Opción 1: Crecimiento dinámico (usa solo lo necesario)
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Opción 2: Límite fijo (ej: 6GB de 8GB disponibles)
# tf.config.set_logical_device_configuration(
# gpus[0],
# [tf.config.LogicalDeviceConfiguration(memory_limit=6144)]
# ) | Optimización | Speedup típico | Reducción de memoria | Requisitos | Complejidad |
|---|---|---|---|---|
| Mixed Precision (float16) | 1.5-2x | ~40% | GPU con Tensor Cores | Baja (1 línea de config) |
| XLA jit_compile | 1.2-1.5x | Variable | TensorFlow backend | Baja (1 flag) |
| tf.data pipeline | 2-5x en carga | Configurable (cache) | Ninguno | Media |
| Gradient accumulation | N/A (misma velocidad) | Proporcional a steps | Ninguno | Media |
| Mixed + XLA + tf.data | 3-5x combinado | ~50% | GPU moderna + TF | Media |
Compatibilidad de Mixed Precision
Mixed precision requiere GPUs con Tensor Cores (NVIDIA Volta o posterior: V100, T4, A100, RTX serie 20/30/40). En GPUs sin Tensor Cores el rendimiento puede ser peor que float32 puro. Verifica siempre con un benchmark antes de aplicar en producción.
Orden recomendado de optimización
- tf.data pipeline — es gratis y siempre mejora el throughput
- Mixed precision — una línea de código, gran impacto
- XLA compilation — beneficio variable, probar con tu modelo específico
- Gradient accumulation — solo si necesitas batch sizes mayores que tu GPU permite