Docs / DevOps / Apache Airflow

Apache Airflow

GuΓ­a de orquestaciΓ³n de workflows con Apache Airflow. Cubre la arquitectura del sistema, definiciΓ³n de DAGs, operadores, conexiones, despliegue en contenedores y patrones de producciΓ³n.

Arquitectura

Airflow es un orquestador de workflows programΓ‘ticos que define pipelines como cΓ³digo Python (DAGs). Sus componentes principales son el Scheduler, Webserver, Workers y la base de datos de metadatos.

text
Arquitectura de Apache Airflow

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Airflow Cluster                          β”‚
β”‚                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚  Webserver   β”‚    β”‚  Scheduler   β”‚    β”‚  Triggerer   β”‚  β”‚
β”‚  β”‚              β”‚    β”‚              β”‚    β”‚              β”‚  β”‚
β”‚  β”‚  UI Dashboardβ”‚    β”‚  Parsea DAGs β”‚    β”‚  Deferrable  β”‚  β”‚
β”‚  β”‚  REST API    β”‚    β”‚  Asigna tasksβ”‚    β”‚  Operators   β”‚  β”‚
β”‚  β”‚  Auth/RBAC   β”‚    β”‚  Heartbeats  β”‚    β”‚              β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚         β”‚                   β”‚                               β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚              Metadata Database (PostgreSQL)             β”‚  β”‚
β”‚  β”‚  DAG runs Β· Task instances Β· Variables Β· Connections  β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚                          β”‚                                  β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚              Executor                                  β”‚  β”‚
β”‚  β”‚                                                       β”‚  β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”            β”‚  β”‚
β”‚  β”‚  β”‚ Worker 1 β”‚  β”‚ Worker 2 β”‚  β”‚ Worker N β”‚            β”‚  β”‚
β”‚  β”‚  β”‚ (Celery) β”‚  β”‚ (Celery) β”‚  β”‚ (K8s Pod)β”‚            β”‚  β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜            β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
ExecutorDescripciΓ³nCaso de uso
LocalExecutorEjecuta tasks en subprocesos localesDesarrollo, cargas pequeΓ±as
CeleryExecutorDistribuye tasks via Celery + Redis/RabbitMQProducciΓ³n con carga media
KubernetesExecutorCada task es un Pod de KubernetesProducciΓ³n, aislamiento total

DAGs & Tasks

Un DAG (Directed Acyclic Graph) define un workflow como un grafo de tareas con dependencias. Se escribe en Python puro.

python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

# ── ConfiguraciΓ³n del DAG ──
default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["alerts@company.com"],
}

with DAG(
    dag_id="etl_daily_pipeline",
    description="Pipeline ETL diario: extrae, transforma y carga datos",
    schedule="0 6 * * *",           # Cada dΓ­a a las 6:00 UTC
    start_date=datetime(2024, 1, 1),
    catchup=False,                   # No ejecutar runs pasados
    default_args=default_args,
    tags=["etl", "production"],
) as dag:

    # ── Extract ──
    extract = PythonOperator(
        task_id="extract_data",
        python_callable=extract_from_api,
        op_kwargs={"source": "production_db"},
    )

    # ── Transform (grupo de tareas paralelas) ──
    with TaskGroup(group_id="transform") as transform_group:
        clean = PythonOperator(
            task_id="clean_data",
            python_callable=clean_records,
        )
        enrich = PythonOperator(
            task_id="enrich_data",
            python_callable=enrich_with_geo,
        )
        validate = PythonOperator(
            task_id="validate_schema",
            python_callable=validate_output,
        )
        [clean, enrich] >> validate

    # ── Load ──
    load = PythonOperator(
        task_id="load_to_warehouse",
        python_callable=load_to_bigquery,
    )

    # ── Notify ──
    notify = BashOperator(
        task_id="send_notification",
        bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text":"ETL completado"}\'',
    )

    # ── Dependencias ──
    extract >> transform_group >> load >> notify

Operators & Hooks

Los Operators encapsulan tipos de trabajo. Los Hooks gestionan conexiones a servicios externos. Airflow tiene operadores para casi cualquier servicio cloud.

python
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.external_task import ExternalTaskSensor

# ── DockerOperator: ejecutar contenedor ──
run_model = DockerOperator(
    task_id="train_model",
    image="ml-training:latest",
    command="python train.py --epochs 50",
    docker_url="unix:///var/run/docker.sock",
    network_mode="bridge",
    auto_remove=True,
    environment={
        "MLFLOW_TRACKING_URI": "http://mlflow:5000",
    },
)

# ── PostgresOperator: ejecutar SQL ──
create_table = PostgresOperator(
    task_id="create_staging_table",
    postgres_conn_id="warehouse_db",
    sql="""
    CREATE TABLE IF NOT EXISTS staging.daily_metrics (
        date DATE PRIMARY KEY,
        users_active INT,
        revenue DECIMAL(10,2),
        processed_at TIMESTAMP DEFAULT NOW()
    );
    """,
)

# ── Sensor: esperar a que otro DAG termine ──
wait_for_upstream = ExternalTaskSensor(
    task_id="wait_for_ingestion",
    external_dag_id="data_ingestion",
    external_task_id="final_check",
    timeout=3600,       # 1 hora mΓ‘ximo de espera
    mode="reschedule",  # Libera el worker mientras espera
)

Deferrable Operators

Usa Deferrable Operators (asyncio-based) para tareas que esperan eventos externos. Liberan el worker slot mientras esperan, permitiendo mayor concurrencia con menos recursos.

Despliegue con Docker

Airflow se despliega fΓ‘cilmente con Docker Compose, ideal para desarrollo y equipos pequeΓ±os. Para producciΓ³n, se recomienda Kubernetes con el Helm chart oficial.

yaml
# docker-compose.yml β€” Airflow con CeleryExecutor
x-airflow-common: &airflow-common
  image: apache/airflow:2.9.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__WEBSERVER__SECRET_KEY: 'change-me-in-production'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  depends_on:
    - postgres
    - redis

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"

  scheduler:
    <<: *airflow-common
    command: scheduler

  worker:
    <<: *airflow-common
    command: celery worker
    deploy:
      replicas: 2

volumes:
  postgres-data:
bash
# Inicializar y arrancar
docker compose up airflow-init    # Crea DB + usuario admin
docker compose up -d              # Arrancar todos los servicios

# Acceder al UI
# http://localhost:8080 (user: airflow, pass: airflow)

# CLI ΓΊtil
docker compose exec webserver airflow dags list
docker compose exec webserver airflow tasks test etl_daily_pipeline extract_data 2024-01-01

Patrones de ProducciΓ³n

Patrones esenciales para operar Airflow en producciΓ³n: idempotencia, gestiΓ³n de secrets, alertas y testing de DAGs.

python
from airflow.models import Variable
from airflow.hooks.base import BaseHook

# ── Variables y Secrets (nunca hardcodear) ──
api_key = Variable.get("api_key", deserialize_json=False)
db_conn = BaseHook.get_connection("warehouse_db")

# ── DAG idempotente: usar execution_date ──
def extract_data(**context):
    # Siempre usar logical_date para idempotencia
    execution_date = context["logical_date"]
    data = api.fetch(
        start=execution_date,
        end=execution_date + timedelta(days=1),
    )
    # Guardar con particiΓ³n por fecha
    save_to_s3(
        data,
        key=f"raw/{execution_date.strftime('%Y-%m-%d')}/data.parquet",
    )

# ── Testing de DAGs ──
import pytest
from airflow.models import DagBag

def test_dag_integrity():
    """Verificar que todos los DAGs se parsean sin errores."""
    dag_bag = DagBag(include_examples=False)
    assert len(dag_bag.import_errors) == 0, (
        f"Errores de importaciΓ³n: {dag_bag.import_errors}"
    )

def test_dag_has_tags():
    """Todo DAG debe tener al menos un tag."""
    dag_bag = DagBag(include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"{dag_id} no tiene tags"

Idempotencia obligatoria

Todo DAG debe ser idempotente: ejecutar el mismo DAG run dos veces debe producir el mismo resultado. Usa logical_date para particionar datos y evita side effects no controlados.

XComs & Data Passing

Los XComs (Cross-Communications) permiten que las tareas de un DAG intercambien pequenos datos entre si. Son el mecanismo nativo de Airflow para pasar resultados de una tarea a la siguiente sin usar almacenamiento externo.

text
Flujo de XComs entre tareas

  Task A                    XCom Backend                  Task B
    β”‚                           β”‚                            β”‚
    β”‚  xcom_push(key, value)    β”‚                            β”‚
    β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Άβ”‚  Almacena en               β”‚
    β”‚                           β”‚  metadata DB               β”‚
    β”‚                           β”‚                            β”‚
    β”‚                           β”‚  xcom_pull(task_id, key)   β”‚
    β”‚                           │◀────────────────────────────
    β”‚                           β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Άβ”‚
    β”‚                           β”‚  Retorna value             β”‚
    β”‚                           β”‚                            β”‚

XCom push/pull clasico

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data(**context):
    """Extraer datos y enviarlos via XCom."""
    records = [
        {"id": 1, "name": "Alice", "score": 95},
        {"id": 2, "name": "Bob", "score": 87},
        {"id": 3, "name": "Carol", "score": 92},
    ]
    # Push explicito con clave personalizada
    context["ti"].xcom_push(key="extracted_records", value=records)
    context["ti"].xcom_push(key="record_count", value=len(records))

def transform_data(**context):
    """Leer datos de la tarea anterior y transformarlos."""
    ti = context["ti"]

    # Pull explicito por task_id y key
    records = ti.xcom_pull(task_ids="extract", key="extracted_records")
    count = ti.xcom_pull(task_ids="extract", key="record_count")

    print(f"Procesando {count} registros")

    # Transformar: filtrar scores > 90
    top_records = [r for r in records if r["score"] > 90]

    # Push del resultado transformado
    ti.xcom_push(key="top_records", value=top_records)

def load_data(**context):
    """Cargar los datos transformados."""
    top_records = context["ti"].xcom_pull(
        task_ids="transform",
        key="top_records",
    )
    for record in top_records:
        print(f"Cargando: {record['name']} (score: {record['score']})")

with DAG(
    dag_id="xcom_classic_example",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_data,
    )
    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_data,
    )
    load = PythonOperator(
        task_id="load",
        python_callable=load_data,
    )

    extract >> transform >> load

TaskFlow API con @task decorator

La TaskFlow API (Airflow 2.0+) simplifica drasticamente el uso de XComs. Los valores de retorno de funciones decoradas con @task se pasan automaticamente como XComs, sin necesidad de push/pull explicito.

python
from airflow.decorators import dag, task
from datetime import datetime, timedelta
import json

@dag(
    dag_id="taskflow_etl_pipeline",
    description="ETL con TaskFlow API β€” XCom implicito",
    schedule="0 8 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "owner": "data-team",
        "retries": 2,
        "retry_delay": timedelta(minutes=3),
    },
    tags=["etl", "taskflow"],
)
def taskflow_etl():

    @task()
    def extract(source: str) -> dict:
        """Extraer datos de la fuente. El return se convierte en XCom."""
        import requests
        response = requests.get(f"https://api.example.com/{source}/data")
        data = response.json()
        return {
            "records": data["items"],
            "total": data["total"],
            "source": source,
        }

    @task()
    def validate(extracted: dict) -> dict:
        """Validar datos extraidos. Recibe XCom automaticamente."""
        records = extracted["records"]
        valid = [r for r in records if r.get("id") and r.get("value")]
        invalid_count = len(records) - len(valid)

        if invalid_count > len(records) * 0.1:
            raise ValueError(
                f"Demasiados registros invalidos: {invalid_count}/{len(records)}"
            )

        return {
            "valid_records": valid,
            "invalid_count": invalid_count,
            "source": extracted["source"],
        }

    @task()
    def transform(validated: dict) -> list:
        """Transformar registros validados."""
        records = validated["valid_records"]
        return [
            {
                "id": r["id"],
                "value_normalized": r["value"] / 100.0,
                "category": "high" if r["value"] > 75 else "low",
                "source": validated["source"],
            }
            for r in records
        ]

    @task()
    def load(transformed: list) -> dict:
        """Cargar datos al warehouse. Retorna resumen."""
        # Simular carga a base de datos
        loaded_count = len(transformed)
        print(f"Cargando {loaded_count} registros al warehouse...")

        return {
            "loaded_count": loaded_count,
            "status": "success",
        }

    @task()
    def notify(load_result: dict, source: str):
        """Notificar resultado del pipeline."""
        print(
            f"Pipeline completado: {load_result['loaded_count']} "
            f"registros de '{source}' cargados exitosamente"
        )

    # ── Encadenar tareas (XCom implicito via argumentos) ──
    extracted = extract(source="production")
    validated = validate(extracted)
    transformed = transform(validated)
    load_result = load(transformed)
    notify(load_result, source="production")

# Instanciar el DAG
taskflow_etl()

Ventajas de TaskFlow API

  1. Menos boilerplate β€” no necesitas PythonOperator ni xcom_push/pull
  2. Tipado implicito β€” los argumentos de funcion son los XComs
  3. Lectura natural β€” el DAG se lee como codigo Python normal
  4. Compatibilidad β€” se puede mezclar con operadores clasicos

Buenas practicas con XComs

PracticaRecomendacionMotivo
Tamano maximoMenos de 48 KB por XComSe almacena en la metadata DB; valores grandes degradan rendimiento
Datos grandesGuardar en S3/GCS, pasar solo la ruta via XComEvita saturar la base de datos de metadatos
SerializaciΓ³nUsar dict/list nativos, evitar objetos complejosXCom serializa a JSON por defecto
Custom backendUsar XCom backend de S3 o GCS para datos medianosTransparente para el codigo, almacena en object storage
LimpiezaConfigurar xcom_cleanup en el schedulerLos XComs se acumulan y pueden llenar la DB
python
# Ejemplo: pasar rutas en lugar de datos grandes
@task()
def extract_large_dataset() -> str:
    """Guardar datos en S3, retornar solo la ruta."""
    import boto3
    import pandas as pd

    df = pd.read_sql("SELECT * FROM big_table", engine)

    s3_path = f"s3://data-lake/staging/{datetime.now():%Y-%m-%d}/extract.parquet"
    df.to_parquet(s3_path)

    # Solo pasar la ruta como XCom (no los datos)
    return s3_path

@task()
def transform_large(s3_path: str) -> str:
    """Leer de S3, transformar, guardar resultado."""
    import pandas as pd

    df = pd.read_parquet(s3_path)
    df_clean = df.dropna().drop_duplicates()

    output_path = s3_path.replace("extract.parquet", "transformed.parquet")
    df_clean.to_parquet(output_path)
    return output_path

Generacion dinamica de DAGs

Para crear multiples DAGs similares (por cliente, region, fuente de datos), se pueden generar dinamicamente desde una configuracion externa.

python
# dags/dynamic_dag_generator.py
import yaml
from pathlib import Path
from airflow.decorators import dag, task
from datetime import datetime, timedelta

# ── Cargar configuracion desde YAML ──
CONFIG_PATH = Path(__file__).parent / "config" / "pipelines.yaml"

# config/pipelines.yaml:
# pipelines:
#   - name: "client_acme"
#     source_table: "acme.raw_events"
#     schedule: "0 6 * * *"
#     destination: "warehouse.acme_metrics"
#     sla_hours: 2
#
#   - name: "client_globex"
#     source_table: "globex.raw_events"
#     schedule: "0 8 * * *"
#     destination: "warehouse.globex_metrics"
#     sla_hours: 3
#
#   - name: "client_initech"
#     source_table: "initech.raw_events"
#     schedule: "30 7 * * 1-5"
#     destination: "warehouse.initech_metrics"
#     sla_hours: 1

def create_pipeline(config: dict):
    """Factory function que genera un DAG por configuracion."""

    @dag(
        dag_id=f"etl_{config['name']}",
        description=f"ETL pipeline para {config['name']}",
        schedule=config["schedule"],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        default_args={
            "owner": "data-team",
            "retries": 3,
            "retry_delay": timedelta(minutes=5),
            "sla": timedelta(hours=config.get("sla_hours", 4)),
        },
        tags=["etl", "dynamic", config["name"]],
    )
    def pipeline():

        @task()
        def extract(source_table: str) -> str:
            import pandas as pd
            df = pd.read_sql(f"SELECT * FROM {source_table}", engine)
            path = f"s3://lake/staging/{config['name']}/extract.parquet"
            df.to_parquet(path)
            return path

        @task()
        def transform(input_path: str) -> str:
            import pandas as pd
            df = pd.read_parquet(input_path)
            df_agg = df.groupby("date").agg(
                events=("event_id", "count"),
                users=("user_id", "nunique"),
            ).reset_index()
            output = input_path.replace("extract", "transformed")
            df_agg.to_parquet(output)
            return output

        @task()
        def load(input_path: str, destination: str):
            import pandas as pd
            df = pd.read_parquet(input_path)
            df.to_sql(destination, engine, if_exists="replace")

        # Encadenar
        extracted = extract(config["source_table"])
        transformed = transform(extracted)
        load(transformed, config["destination"])

    return pipeline()

# ── Generar DAGs dinamicamente ──
with open(CONFIG_PATH) as f:
    all_configs = yaml.safe_load(f)

for pipeline_config in all_configs["pipelines"]:
    # Cada llamada crea un DAG unico en el namespace global
    globals()[f"etl_{pipeline_config['name']}"] = create_pipeline(pipeline_config)

Precauciones con DAGs dinamicos

  1. Tiempo de parseo β€” el Scheduler parsea todos los DAGs periodicamente. Si generas cientos de DAGs, asegurate de que el archivo se ejecuta rapido (no llamadas a APIs o DBs).
  2. Configuracion en archivos β€” carga la config desde YAML/JSON local, nunca desde endpoints remotos durante el parseo.
  3. Nombre unico β€” cada dag_id debe ser unico. Usa el nombre del cliente/fuente como sufijo para evitar colisiones.
END OF DOCUMENT

ΒΏNecesitas mΓ‘s? Volver a la LibrerΓ­a →