Apache Airflow
A guide to workflow orchestration with Apache Airflow. It covers the system architecture, DAG definition, operators, connections, container-based deployment, and production patterns.
Architecture
Airflow is a programmatic workflow orchestrator that defines pipelines as Python code (DAGs). Its main components are the Scheduler, the Webserver, the Workers, and the metadata database.
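In a non-containerized install, each of these components runs as its own long-lived process. A minimal sketch of starting them with the Airflow 2.x CLI (on releases older than 2.7, airflow db init replaces airflow db migrate):

# each component is a separate long-running process
airflow db migrate            # initialize / upgrade the metadata database
airflow webserver --port 8080
airflow scheduler
airflow triggerer             # only needed when using deferrable operators
airflow celery worker         # only when running the CeleryExecutor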
Apache Airflow architecture
Airflow Cluster
┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│  Webserver   │   │  Scheduler   │   │  Triggerer   │
│ UI Dashboard │   │ Parses DAGs  │   │ Deferrable   │
│ REST API     │   │ Queues tasks │   │ Operators    │
│ Auth/RBAC    │   │ Heartbeats   │   │              │
└──────┬───────┘   └──────┬───────┘   └──────────────┘
       │                  │
┌──────┴──────────────────┴──────────────────────────┐
│           Metadata Database (PostgreSQL)            │
│ DAG runs · Task instances · Variables · Connections │
└─────────────────────────┬───────────────────────────┘
                          │
┌─────────────────────────┴───────────────────────────┐
│                       Executor                       │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐          │
│  │ Worker 1 │   │ Worker 2 │   │ Worker N │          │
│  │ (Celery) │   │ (Celery) │   │ (K8s Pod)│          │
│  └──────────┘   └──────────┘   └──────────┘          │
└──────────────────────────────────────────────────────┘
| Executor | Description | Use case |
|---|---|---|
| LocalExecutor | Runs tasks in local subprocesses | Development, small workloads |
| CeleryExecutor | Distributes tasks via Celery + Redis/RabbitMQ | Production with medium load |
| KubernetesExecutor | Each task runs as its own Kubernetes Pod | Production, full isolation |
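The executor is chosen through configuration. A minimal sketch of the two equivalent forms (the environment variable overrides airflow.cfg):

# airflow.cfg
[core]
executor = CeleryExecutor

# equivalent environment variable (AIRFLOW__<SECTION>__<KEY> convention)
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor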
DAGs & Tasks
A DAG (Directed Acyclic Graph) defines a workflow as a graph of tasks with dependencies. It is written in plain Python.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
# ── DAG configuration ──
default_args = {
"owner": "data-team",
"retries": 3,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["alerts@company.com"],
}
with DAG(
dag_id="etl_daily_pipeline",
description="Pipeline ETL diario: extrae, transforma y carga datos",
schedule="0 6 * * *", # Cada dΓa a las 6:00 UTC
start_date=datetime(2024, 1, 1),
catchup=False, # No ejecutar runs pasados
default_args=default_args,
tags=["etl", "production"],
) as dag:
    # ── Extract ──  (extract_from_api and the other python_callable helpers are assumed defined elsewhere in this module)
extract = PythonOperator(
task_id="extract_data",
python_callable=extract_from_api,
op_kwargs={"source": "production_db"},
)
    # ── Transform (group of parallel tasks) ──
with TaskGroup(group_id="transform") as transform_group:
clean = PythonOperator(
task_id="clean_data",
python_callable=clean_records,
)
enrich = PythonOperator(
task_id="enrich_data",
python_callable=enrich_with_geo,
)
validate = PythonOperator(
task_id="validate_schema",
python_callable=validate_output,
)
[clean, enrich] >> validate
    # ── Load ──
load = PythonOperator(
task_id="load_to_warehouse",
python_callable=load_to_bigquery,
)
    # ── Notify ──
notify = BashOperator(
task_id="send_notification",
        bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text":"ETL completed"}\'',
)
    # ── Dependencies ──
    extract >> transform_group >> load >> notify
Operators & Hooks
Operators encapsulate types of work. Hooks manage connections to external services. Airflow has operators for almost any cloud service.
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.external_task import ExternalTaskSensor
# ── DockerOperator: run a container ──
run_model = DockerOperator(
task_id="train_model",
image="ml-training:latest",
command="python train.py --epochs 50",
docker_url="unix:///var/run/docker.sock",
network_mode="bridge",
auto_remove=True,
environment={
"MLFLOW_TRACKING_URI": "http://mlflow:5000",
},
)
# ── PostgresOperator: run SQL ──
create_table = PostgresOperator(
task_id="create_staging_table",
postgres_conn_id="warehouse_db",
sql="""
CREATE TABLE IF NOT EXISTS staging.daily_metrics (
date DATE PRIMARY KEY,
users_active INT,
revenue DECIMAL(10,2),
processed_at TIMESTAMP DEFAULT NOW()
);
""",
)
# ── Sensor: wait for another DAG to finish ──
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_ingestion",
external_dag_id="data_ingestion",
external_task_id="final_check",
    timeout=3600,           # wait at most 1 hour
    mode="reschedule",      # releases the worker slot while waiting
)
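The snippets above only use Operators and a Sensor; a Hook can also be called directly from a Python task to reuse a stored connection. A brief sketch, reusing the warehouse_db connection id from the PostgresOperator example (the query and table are illustrative):

from airflow.providers.postgres.hooks.postgres import PostgresHook

def read_daily_metrics():
    # The hook resolves host and credentials from the "warehouse_db" connection
    hook = PostgresHook(postgres_conn_id="warehouse_db")
    rows = hook.get_records(
        "SELECT date, users_active FROM staging.daily_metrics ORDER BY date DESC LIMIT 10"
    )
    for date, users_active in rows:
        print(f"{date}: {users_active} active users")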
Deferrable Operators
Use Deferrable Operators (asyncio-based) for tasks that wait on external events. They release the worker slot while they wait, allowing higher concurrency with fewer resources.
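A minimal sketch of a deferrable sensor: TimeDeltaSensorAsync (Airflow 2.2+) hands the wait over to the Triggerer instead of holding a worker slot; many provider sensors also accept a deferrable=True flag in recent versions.

from datetime import timedelta
from airflow.sensors.time_delta import TimeDeltaSensorAsync

# The task is suspended and resumed by the Triggerer once the delta has elapsed
wait_for_window = TimeDeltaSensorAsync(
    task_id="wait_for_processing_window",
    delta=timedelta(minutes=30),
)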
Deployment with Docker
Airflow deploys easily with Docker Compose, which is ideal for development and small teams. For production, Kubernetes with the official Helm chart is recommended.
# docker-compose.yml – Airflow with CeleryExecutor
x-airflow-common: &airflow-common
image: apache/airflow:2.9.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__WEBSERVER__SECRET_KEY: 'change-me-in-production'
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
depends_on:
- postgres
- redis
services:
postgres:
image: postgres:16
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
webserver:
<<: *airflow-common
command: webserver
ports:
- "8080:8080"
scheduler:
<<: *airflow-common
command: scheduler
worker:
<<: *airflow-common
command: celery worker
deploy:
replicas: 2
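  # Assumed addition: a one-shot init service so that "docker compose up airflow-init"
  # below has something to run (the official compose file ships a richer version of this)
  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command: -c "airflow db migrate && airflow users create --username airflow --password airflow --firstname Admin --lastname User --role Admin --email admin@example.com"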
volumes:
  postgres-data:

# Initialize and start
docker compose up airflow-init      # creates the DB and the admin user
docker compose up -d                # start all the services

# Access the UI
# http://localhost:8080 (user: airflow, pass: airflow)

# Useful CLI commands
docker compose exec webserver airflow dags list
docker compose exec webserver airflow tasks test etl_daily_pipeline extract_data 2024-01-01
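For the production path mentioned above, the official Helm chart is installed roughly like this (the namespace and release name are just examples):

helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm install airflow apache-airflow/airflow --namespace airflow --create-namespace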
Production Patterns
Essential patterns for operating Airflow in production: idempotency, secrets management, alerting, and DAG testing.
from airflow.models import Variable
from airflow.hooks.base import BaseHook
# ── Variables and Secrets (never hardcode them) ──
api_key = Variable.get("api_key", deserialize_json=False)
db_conn = BaseHook.get_connection("warehouse_db")
# ── Idempotent DAG: use the logical date ──
def extract_data(**context):
    # Always use logical_date for idempotency
    execution_date = context["logical_date"]
data = api.fetch(
start=execution_date,
end=execution_date + timedelta(days=1),
)
    # Store it partitioned by date
save_to_s3(
data,
key=f"raw/{execution_date.strftime('%Y-%m-%d')}/data.parquet",
)
# ── DAG testing ──
import pytest
from airflow.models import DagBag
def test_dag_integrity():
"""Verificar que todos los DAGs se parsean sin errores."""
dag_bag = DagBag(include_examples=False)
assert len(dag_bag.import_errors) == 0, (
f"Errores de importaciΓ³n: {dag_bag.import_errors}"
)
def test_dag_has_tags():
"""Todo DAG debe tener al menos un tag."""
dag_bag = DagBag(include_examples=False)
for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"{dag_id} has no tags"
Idempotency is mandatory
Every DAG must be idempotent: running the same DAG run twice must produce the same result. Use logical_date to partition data and avoid uncontrolled side effects.
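The alerting mentioned at the start of this section is usually wired through callbacks. A minimal sketch of an on_failure_callback posting to a webhook; the slack_webhook_url Variable name is illustrative:

import requests
from airflow.models import Variable

def alert_on_failure(context):
    """Failure callback: receives the task context and posts an alert."""
    ti = context["task_instance"]
    message = f"Task {ti.task_id} in DAG {ti.dag_id} failed (run {context['run_id']})"
    requests.post(Variable.get("slack_webhook_url"), json={"text": message})

default_args = {
    "owner": "data-team",
    "on_failure_callback": alert_on_failure,  # called once per failed task instance
}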
XComs & Data Passing
XComs (cross-communications) let the tasks in a DAG exchange small pieces of data with each other. They are Airflow's native mechanism for passing results from one task to the next without using external storage.
XCom flow between tasks
Task A                   XCom Backend                   Task B
   │                           │                           │
   │  xcom_push(key, value)    │                           │
   │──────────────────────────▶│  stores the value in      │
   │                           │  the metadata DB          │
   │                           │                           │
   │                           │  xcom_pull(task_id, key)  │
   │                           │◀──────────────────────────┤
   │                           ├──────────────────────────▶│
   │                           │  returns the value        │
   │                           │                           │
Classic XCom push/pull
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data(**context):
"""Extraer datos y enviarlos via XCom."""
records = [
{"id": 1, "name": "Alice", "score": 95},
{"id": 2, "name": "Bob", "score": 87},
{"id": 3, "name": "Carol", "score": 92},
]
    # Explicit push with a custom key
context["ti"].xcom_push(key="extracted_records", value=records)
context["ti"].xcom_push(key="record_count", value=len(records))
def transform_data(**context):
"""Leer datos de la tarea anterior y transformarlos."""
ti = context["ti"]
    # Explicit pull by task_id and key
records = ti.xcom_pull(task_ids="extract", key="extracted_records")
count = ti.xcom_pull(task_ids="extract", key="record_count")
print(f"Procesando {count} registros")
# Transformar: filtrar scores > 90
top_records = [r for r in records if r["score"] > 90]
    # Push the transformed result
ti.xcom_push(key="top_records", value=top_records)
def load_data(**context):
"""Cargar los datos transformados."""
top_records = context["ti"].xcom_pull(
task_ids="transform",
key="top_records",
)
for record in top_records:
print(f"Cargando: {record['name']} (score: {record['score']})")
with DAG(
dag_id="xcom_classic_example",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
extract = PythonOperator(
task_id="extract",
python_callable=extract_data,
)
transform = PythonOperator(
task_id="transform",
python_callable=transform_data,
)
load = PythonOperator(
task_id="load",
python_callable=load_data,
)
    extract >> transform >> load
TaskFlow API with the @task decorator
The TaskFlow API (Airflow 2.0+) drastically simplifies working with XComs. The return values of functions decorated with @task are passed automatically as XComs, with no explicit push/pull calls needed.
from airflow.decorators import dag, task
from datetime import datetime, timedelta
import json
@dag(
dag_id="taskflow_etl_pipeline",
description="ETL con TaskFlow API β XCom implicito",
schedule="0 8 * * *",
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=3),
},
tags=["etl", "taskflow"],
)
def taskflow_etl():
@task()
def extract(source: str) -> dict:
"""Extraer datos de la fuente. El return se convierte en XCom."""
import requests
response = requests.get(f"https://api.example.com/{source}/data")
data = response.json()
return {
"records": data["items"],
"total": data["total"],
"source": source,
}
@task()
def validate(extracted: dict) -> dict:
"""Validar datos extraidos. Recibe XCom automaticamente."""
records = extracted["records"]
valid = [r for r in records if r.get("id") and r.get("value")]
invalid_count = len(records) - len(valid)
if invalid_count > len(records) * 0.1:
            raise ValueError(
                f"Too many invalid records: {invalid_count}/{len(records)}"
)
return {
"valid_records": valid,
"invalid_count": invalid_count,
"source": extracted["source"],
}
@task()
def transform(validated: dict) -> list:
"""Transformar registros validados."""
records = validated["valid_records"]
return [
{
"id": r["id"],
"value_normalized": r["value"] / 100.0,
"category": "high" if r["value"] > 75 else "low",
"source": validated["source"],
}
for r in records
]
@task()
def load(transformed: list) -> dict:
"""Cargar datos al warehouse. Retorna resumen."""
# Simular carga a base de datos
loaded_count = len(transformed)
print(f"Cargando {loaded_count} registros al warehouse...")
return {
"loaded_count": loaded_count,
"status": "success",
}
@task()
def notify(load_result: dict, source: str):
"""Notificar resultado del pipeline."""
print(
f"Pipeline completado: {load_result['loaded_count']} "
f"registros de '{source}' cargados exitosamente"
)
    # ── Chain the tasks (implicit XComs via the arguments) ──
extracted = extract(source="production")
validated = validate(extracted)
transformed = transform(validated)
load_result = load(transformed)
notify(load_result, source="production")
# Instantiate the DAG
taskflow_etl()
Advantages of the TaskFlow API
- Less boilerplate – no PythonOperator and no explicit xcom_push/pull
- Implicit typing – the function arguments are the XComs
- Natural reading – the DAG reads like ordinary Python code
- Compatibility – it can be mixed with classic operators (see the sketch after this list)
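On the compatibility point: a @task return value can be consumed by a classic operator through the usual XCom template, and the XComArg the call returns can be used to set the dependency. A small sketch (task ids are illustrative, and the snippet belongs inside a DAG definition):

from airflow.decorators import task
from airflow.operators.bash import BashOperator

@task()
def build_report_path() -> str:
    return "/tmp/report.csv"

report_path = build_report_path()

# The classic operator reads the TaskFlow task's return value via the XCom template
archive = BashOperator(
    task_id="archive_report",
    bash_command="gzip -k {{ ti.xcom_pull(task_ids='build_report_path') }}",
)

report_path >> archive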
XCom best practices
| Practice | Recommendation | Reason |
|---|---|---|
| Maximum size | Keep each XCom under 48 KB | Stored in the metadata DB; large values degrade performance |
| Large data | Store it in S3/GCS and pass only the path via XCom | Avoids bloating the metadata database |
| Serialization | Use native dicts/lists, avoid complex objects | XCom serializes to JSON by default |
| Custom backend | Use an S3 or GCS XCom backend for medium-sized data | Transparent to the code; values live in object storage |
| Cleanup | Purge old XCom rows periodically (e.g. with airflow db clean) | XComs accumulate and can fill up the DB |
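The custom backend row refers to swapping the default XCom storage for your own class. A hedged sketch of the general shape (the exact serialize_value signature varies across Airflow 2.x minor versions, and the object-storage upload itself is omitted):

from airflow.models.xcom import BaseXCom

class S3XComBackend(BaseXCom):
    """Skeleton of a custom XCom backend; large values would be written to object storage."""

    @staticmethod
    def serialize_value(value, **kwargs):
        # Here you would upload big payloads to S3/GCS and store only a reference
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        # Symmetric: fetch from object storage when the stored value is a reference
        return BaseXCom.deserialize_value(result)

# Activate it via configuration, e.g.:
# AIRFLOW__CORE__XCOM_BACKEND=plugins.s3_xcom_backend.S3XComBackend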
# Example: pass paths instead of large payloads
@task()
def extract_large_dataset() -> str:
"""Guardar datos en S3, retornar solo la ruta."""
import boto3
import pandas as pd
df = pd.read_sql("SELECT * FROM big_table", engine)
s3_path = f"s3://data-lake/staging/{datetime.now():%Y-%m-%d}/extract.parquet"
df.to_parquet(s3_path)
    # Pass only the path as the XCom (not the data itself)
return s3_path
@task()
def transform_large(s3_path: str) -> str:
"""Leer de S3, transformar, guardar resultado."""
import pandas as pd
df = pd.read_parquet(s3_path)
df_clean = df.dropna().drop_duplicates()
output_path = s3_path.replace("extract.parquet", "transformed.parquet")
df_clean.to_parquet(output_path)
    return output_path
Dynamic DAG generation
To create many similar DAGs (per client, region, or data source), they can be generated dynamically from an external configuration.
# dags/dynamic_dag_generator.py
import yaml
from pathlib import Path
from airflow.decorators import dag, task
from datetime import datetime, timedelta
# ── Load the configuration from YAML ──
CONFIG_PATH = Path(__file__).parent / "config" / "pipelines.yaml"
# config/pipelines.yaml:
# pipelines:
# - name: "client_acme"
# source_table: "acme.raw_events"
# schedule: "0 6 * * *"
# destination: "warehouse.acme_metrics"
# sla_hours: 2
#
# - name: "client_globex"
# source_table: "globex.raw_events"
# schedule: "0 8 * * *"
# destination: "warehouse.globex_metrics"
# sla_hours: 3
#
# - name: "client_initech"
# source_table: "initech.raw_events"
# schedule: "30 7 * * 1-5"
# destination: "warehouse.initech_metrics"
# sla_hours: 1
def create_pipeline(config: dict):
"""Factory function que genera un DAG por configuracion."""
@dag(
dag_id=f"etl_{config['name']}",
description=f"ETL pipeline para {config['name']}",
schedule=config["schedule"],
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={
"owner": "data-team",
"retries": 3,
"retry_delay": timedelta(minutes=5),
"sla": timedelta(hours=config.get("sla_hours", 4)),
},
tags=["etl", "dynamic", config["name"]],
)
def pipeline():
@task()
def extract(source_table: str) -> str:
import pandas as pd
df = pd.read_sql(f"SELECT * FROM {source_table}", engine)
path = f"s3://lake/staging/{config['name']}/extract.parquet"
df.to_parquet(path)
return path
@task()
def transform(input_path: str) -> str:
import pandas as pd
df = pd.read_parquet(input_path)
df_agg = df.groupby("date").agg(
events=("event_id", "count"),
users=("user_id", "nunique"),
).reset_index()
output = input_path.replace("extract", "transformed")
df_agg.to_parquet(output)
return output
@task()
def load(input_path: str, destination: str):
import pandas as pd
df = pd.read_parquet(input_path)
df.to_sql(destination, engine, if_exists="replace")
        # Chain the tasks
extracted = extract(config["source_table"])
transformed = transform(extracted)
load(transformed, config["destination"])
return pipeline()
# ── Generate the DAGs dynamically ──
with open(CONFIG_PATH) as f:
all_configs = yaml.safe_load(f)
for pipeline_config in all_configs["pipelines"]:
    # Each call registers a unique DAG in the module's global namespace
    globals()[f"etl_{pipeline_config['name']}"] = create_pipeline(pipeline_config)
Precautions with dynamic DAGs
- Parse time – the Scheduler re-parses every DAG file periodically. If you generate hundreds of DAGs, make sure the file executes quickly (no API or database calls at parse time).
- Configuration in files – load the config from local YAML/JSON, never from remote endpoints during parsing.
- Unique names – every dag_id must be unique. Use the client/source name as a suffix to avoid collisions; a small check for this is sketched below.
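A hedged sketch of that check, reusing the DagBag-based testing style from the production patterns section (the config path is the one assumed above):

import yaml
from airflow.models import DagBag

def test_dynamic_dags_are_registered():
    """Every entry in pipelines.yaml must yield exactly one DAG with a unique dag_id."""
    with open("dags/config/pipelines.yaml") as f:
        configs = yaml.safe_load(f)["pipelines"]
    expected_ids = [f"etl_{c['name']}" for c in configs]
    assert len(expected_ids) == len(set(expected_ids)), "duplicate dag_id generated from config"
    dag_bag = DagBag(include_examples=False)
    for dag_id in expected_ids:
        assert dag_id in dag_bag.dags, f"{dag_id} was not loaded by the DagBag"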