egutierrez ea84a8e1f8 refactor: remove Temporal in favor of Dagu for transformations
Temporal was overkill for our typical data pipelines.

Changes:
- Removed docker-compose-temporal.yml and its configuration
- Removed Temporal from the Homer dashboard
- Updated README and CLAUDE.md to drop references to Temporal
- Added complete documentation for transformations with Dagu

Dagu is sufficient because:
- Workflows finish in minutes, not days
- Transformations are simple to medium complexity (Python/SQL)
- We don't need to pause/resume workflows
- Lower overhead, and simpler to maintain

If we need long-running workflows or complex state in the future,
we can bring Temporal back.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-23 22:58:53 +01:00


Transformations with Dagu

A complete guide to running data transformations with Dagu.


Dagu CAN do transformations

Dagu runs any script or command, so it can handle:

  • SQL transformations
  • Python/Pandas transformations
  • Aggregations and calculations
  • Data cleaning
  • Data enrichment
  • Complex joins
  • Near-real-time (micro-batch) transformations on short schedules

🎯 Pattern 1: Python/Pandas transformations

Example: Cleaning and aggregation

# ~/dagu/dags/transform_sales.yaml
name: transform_sales_data
schedule: "0 2 * * *"  # Every day at 2 AM

steps:
  # 1. Extract from PostgreSQL
  - name: extract
    command: |
      python <<EOF
      import pandas as pd
      from sqlalchemy import create_engine

      engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')
      # Previous day's sales: at 2 AM, CURRENT_DATE would only cover the first two hours
      df = pd.read_sql('SELECT * FROM raw_sales WHERE date = CURRENT_DATE - 1', engine)
      df.to_parquet('/tmp/raw_sales.parquet')
      EOF

  # 2. Transform: cleaning
  - name: clean
    command: |
      python <<EOF
      import pandas as pd

      df = pd.read_parquet('/tmp/raw_sales.parquet')

      # Clean the data
      df = df.dropna(subset=['customer_id', 'amount'])
      df['amount'] = df['amount'].astype(float)
      df['date'] = pd.to_datetime(df['date'])

      # Drop duplicates
      df = df.drop_duplicates(subset=['transaction_id'])

      df.to_parquet('/tmp/clean_sales.parquet')
      print(f"Cleaned {len(df)} records")
      EOF
    depends: [extract]

  # 3. Transform: aggregations
  - name: aggregate
    command: |
      python <<EOF
      import pandas as pd

      df = pd.read_parquet('/tmp/clean_sales.parquet')

      # Aggregate per customer
      customer_summary = df.groupby('customer_id').agg({
          'amount': ['sum', 'mean', 'count'],
          'date': 'max'
      }).reset_index()

      customer_summary.columns = ['customer_id', 'total_spent', 'avg_spent', 'num_purchases', 'last_purchase']

      customer_summary.to_parquet('/tmp/customer_summary.parquet')
      print(f"Aggregated {len(customer_summary)} customers")
      EOF
    depends: [clean]

  # 4. Load into PostgreSQL
  - name: load
    command: |
      python <<EOF
      import pandas as pd
      from sqlalchemy import create_engine

      df = pd.read_parquet('/tmp/customer_summary.parquet')
      engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')

      df.to_sql('customer_summary', engine, if_exists='replace', index=False)
      print(f"Loaded {len(df)} records to customer_summary table")
      EOF
    depends: [aggregate]

  # 5. Log lineage
  - name: lineage
    command: |
      python ~/dagu/scripts/log_lineage.py \
        --event COMPLETE \
        --source postgres://raw_sales \
        --target postgres://customer_summary \
        --job transform_sales_data
    depends: [load]
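The lineage step calls ~/dagu/scripts/log_lineage.py, but this guide does not include that script. Below is a minimal sketch of what it could look like. It assumes the script emits OpenLineage-style run events; the COMPLETE event type suggests this, but it remains an assumption. The job namespace, the producer URI and the way postgres://table is split into namespace and name are all hypothetical choices. The sketch prints the event as JSON and does not send it to a lineage backend.

```python
import argparse
import json
import uuid
from datetime import datetime, timezone
from typing import List, Optional

# Hypothetical values: adjust to whatever your lineage backend expects
PRODUCER = "https://example.com/dagu-lineage-sketch"
JOB_NAMESPACE = "dagu"


def dataset(uri: str) -> dict:
    """Split 'postgres://raw_sales' into a namespace ('postgres') and a name ('raw_sales')."""
    namespace, sep, name = uri.partition("://")
    if not sep or not name:
        raise ValueError(f"expected scheme://name, got {uri!r}")
    return {"namespace": namespace, "name": name}


def build_event(event: str, source: str, target: str, job: str,
                run_id: Optional[str] = None) -> dict:
    """Build an OpenLineage-style run event for one source -> target hop."""
    return {
        "eventType": event,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id or str(uuid.uuid4())},
        "job": {"namespace": JOB_NAMESPACE, "name": job},
        "inputs": [dataset(source)],
        "outputs": [dataset(target)],
        "producer": PRODUCER,
    }


def main(argv: Optional[List[str]] = None) -> None:
    parser = argparse.ArgumentParser(description="Emit a lineage event (sketch)")
    parser.add_argument("--event", required=True,
                        choices=["START", "RUNNING", "COMPLETE", "ABORT", "FAIL"])
    parser.add_argument("--source", required=True)
    parser.add_argument("--target", required=True)
    parser.add_argument("--job", required=True)
    args = parser.parse_args(argv)
    # Printed to stdout so it ends up in the Dagu step log; shipping it elsewhere is left out
    print(json.dumps(build_event(args.event, args.source, args.target, args.job)))

# When saved as log_lineage.py, end the file with:
# if __name__ == "__main__":
#     main()
```

Keeping build_event separate from argument parsing lets you unit-test the event shape without running the CLI.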

🎯 Pattern 2: SQL transformations (dbt-style)

Example: Incremental transformation

# ~/dagu/dags/transform_orders.yaml
name: transform_orders
schedule: "*/15 * * * *"  # Every 15 minutes

env:
  - DB_URL: postgresql://postgres:postgres@localhost:5434/postgres

steps:
  # 1. Staging - Raw to Clean
  - name: stage_orders
    command: |
      psql "$DB_URL" -v ON_ERROR_STOP=1 <<SQL
      -- Create the staging table if it does not exist
      CREATE TABLE IF NOT EXISTS stg_orders (
        order_id BIGINT PRIMARY KEY,
        customer_id BIGINT,
        amount DECIMAL(10,2),
        status VARCHAR(50),
        created_at TIMESTAMPTZ,
        processed_at TIMESTAMPTZ DEFAULT NOW()
      );

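      -- Incremental insert. Filtering on created_at only picks up new orders:
      -- status changes to existing orders would need an updated_at column on raw_orders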
      INSERT INTO stg_orders (order_id, customer_id, amount, status, created_at)
      SELECT
        order_id,
        customer_id,
        amount::DECIMAL(10,2),
        LOWER(TRIM(status)) as status,
        created_at
      FROM raw_orders
      WHERE created_at > (SELECT COALESCE(MAX(created_at), '1970-01-01') FROM stg_orders)
      ON CONFLICT (order_id) DO UPDATE SET
        amount = EXCLUDED.amount,
        status = EXCLUDED.status,
        processed_at = NOW();
      SQL

  # 2. Transform: compute metrics
  - name: calc_metrics
    command: |
      psql "$DB_URL" -v ON_ERROR_STOP=1 <<SQL
      -- Daily metrics table
      CREATE TABLE IF NOT EXISTS daily_metrics (
        date DATE PRIMARY KEY,
        total_orders INT,
        total_revenue DECIMAL(12,2),
        avg_order_value DECIMAL(10,2),
        completed_orders INT,
        cancelled_orders INT,
        updated_at TIMESTAMPTZ DEFAULT NOW()
      );

      -- Upsert metrics (the last 7 days are recomputed on every run)
      INSERT INTO daily_metrics (date, total_orders, total_revenue, avg_order_value, completed_orders, cancelled_orders)
      SELECT
        DATE(created_at) as date,
        COUNT(*) as total_orders,
        SUM(amount) as total_revenue,
        AVG(amount) as avg_order_value,
        COUNT(*) FILTER (WHERE status = 'completed') as completed_orders,
        COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled_orders
      FROM stg_orders
      WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
      GROUP BY DATE(created_at)
      ON CONFLICT (date) DO UPDATE SET
        total_orders = EXCLUDED.total_orders,
        total_revenue = EXCLUDED.total_revenue,
        avg_order_value = EXCLUDED.avg_order_value,
        completed_orders = EXCLUDED.completed_orders,
        cancelled_orders = EXCLUDED.cancelled_orders,
        updated_at = NOW();
      SQL
    depends: [stage_orders]

  # 3. Transform: historical snapshot
  - name: snapshot
    command: |
      psql "$DB_URL" -v ON_ERROR_STOP=1 <<SQL
      -- Snapshot table
      CREATE TABLE IF NOT EXISTS order_snapshots (
        snapshot_id SERIAL PRIMARY KEY,
        order_id BIGINT,
        status VARCHAR(50),
        amount DECIMAL(10,2),
        snapshot_at TIMESTAMPTZ DEFAULT NOW()
      );

      -- Snapshot the orders that are still in progress
      INSERT INTO order_snapshots (order_id, status, amount)
      SELECT order_id, status, amount
      FROM stg_orders
      WHERE status IN ('pending', 'processing');
      SQL
    depends: [calc_metrics]
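You can check the watermark-plus-upsert logic of stage_orders without a PostgreSQL server. The minimal sketch below runs the same shape of query against an in-memory SQLite database through Python's sqlite3 module (upserts need SQLite 3.24 or later). It is a stand-in, not the production SQL: types are simplified to REAL and ISO-8601 text instead of DECIMAL and TIMESTAMPTZ, and the sample rows are made up.

```python
import sqlite3

SCHEMA = """
CREATE TABLE raw_orders (order_id INTEGER, customer_id INTEGER, amount REAL,
                         status TEXT, created_at TEXT);
CREATE TABLE stg_orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER,
                         amount REAL, status TEXT, created_at TEXT);
"""

# Same shape as the stage_orders step: created_at watermark plus upsert.
# SQLite needs the WHERE clause on the SELECT to parse INSERT ... SELECT ... ON CONFLICT.
STAGE_SQL = """
INSERT INTO stg_orders (order_id, customer_id, amount, status, created_at)
SELECT order_id, customer_id, ROUND(amount, 2), LOWER(TRIM(status)), created_at
FROM raw_orders
WHERE created_at > (SELECT COALESCE(MAX(created_at), '1970-01-01') FROM stg_orders)
ON CONFLICT (order_id) DO UPDATE SET
    amount = excluded.amount,
    status = excluded.status
"""


def stage_orders(conn: sqlite3.Connection) -> int:
    """Run one incremental staging pass; return the number of rows inserted or updated."""
    before = conn.total_changes
    with conn:  # commit on success, roll back on error
        conn.execute(STAGE_SQL)
    return conn.total_changes - before
```

Calling stage_orders twice in a row writes nothing the second time. That property is what makes it safe to retry the 15-minute schedule.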

🎯 Pattern 3: Multi-table transformation with joins

Example: Enrich data from multiple sources

# ~/dagu/dags/enrich_customer_data.yaml
name: enrich_customer_data
schedule: "0 3 * * *"

steps:
  # 1. Extract and combine multiple sources
  - name: merge_sources
    command: |
      python <<EOF
      import pandas as pd
      from sqlalchemy import create_engine

      engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')

      # Load several tables
      customers = pd.read_sql('SELECT * FROM customers', engine)
      orders = pd.read_sql('SELECT * FROM orders WHERE created_at >= CURRENT_DATE - 30', engine)
      reviews = pd.read_sql('SELECT * FROM reviews', engine)

      # Order aggregates
      order_stats = orders.groupby('customer_id').agg({
          'order_id': 'count',
          'amount': ['sum', 'mean'],
          'created_at': 'max'
      }).reset_index()
      order_stats.columns = ['customer_id', 'total_orders', 'total_spent', 'avg_order', 'last_order']

      # Review aggregates
      review_stats = reviews.groupby('customer_id').agg({
          'rating': 'mean',
          'review_id': 'count'
      }).reset_index()
      review_stats.columns = ['customer_id', 'avg_rating', 'total_reviews']

      # Merge everything (left joins keep customers without orders or reviews)
      enriched = customers.merge(order_stats, on='customer_id', how='left')
      enriched = enriched.merge(review_stats, on='customer_id', how='left')

      # Compute the segment (customers without orders have NaN spend and end up as 'New')
      spent = enriched['total_spent'].fillna(0)
      enriched['segment'] = 'New'
      enriched.loc[spent > 100, 'segment'] = 'Regular'
      enriched.loc[spent > 1000, 'segment'] = 'VIP'

      enriched.to_parquet('/tmp/enriched_customers.parquet')
      EOF

  # 2. Load the enriched data
  - name: load_enriched
    command: |
      python <<EOF
      import pandas as pd
      from sqlalchemy import create_engine

      df = pd.read_parquet('/tmp/enriched_customers.parquet')
      engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')

      df.to_sql('enriched_customers', engine, if_exists='replace', index=False)
      EOF
    depends: [merge_sources]
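The segment rule is easy to get wrong at the edges: a spend of exactly 100 or 1000, or customers with no orders, whose total_spent is NaN after the left merge. Below is a minimal sketch of the same thresholds as a plain Python function that can be unit-tested on its own. The constant names are hypothetical.

```python
import math
from typing import Optional

# Same thresholds as the merge_sources step (hypothetical constant names)
VIP_THRESHOLD = 1000
REGULAR_THRESHOLD = 100


def segment(total_spent: Optional[float]) -> str:
    """Classify a customer; missing spend (no orders after the left merge) counts as 'New'."""
    if total_spent is None or math.isnan(total_spent):
        return "New"
    if total_spent > VIP_THRESHOLD:
        return "VIP"
    if total_spent > REGULAR_THRESHOLD:
        return "Regular"
    return "New"
```

Both bounds are strict, so a customer who spent exactly 1000 is 'Regular', and one who spent exactly 100 is 'New'.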

🎯 Pattern 4: Incremental transformation (changes only)

Example: Simplified CDC (Change Data Capture)

# ~/dagu/dags/incremental_transform.yaml
name: incremental_transform
schedule: "*/5 * * * *"  # Every 5 minutes

steps:
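  # 1. Detect changes
  - name: detect_changes
    command: |
      python <<EOF
      import os
      import pandas as pd
      from sqlalchemy import create_engine, text

      engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')

      # Delete files left by a previous run so later steps never reprocess stale data
      for path in ('/tmp/new_data.parquet', '/tmp/transformed_changes.parquet'):
          if os.path.exists(path):
              os.remove(path)

      # Last watermark (epoch while transformed_data is still empty)
      last_sync = pd.read_sql(
          "SELECT COALESCE(MAX(updated_at), '1970-01-01') AS last_sync FROM transformed_data",
          engine
      ).iloc[0]['last_sync']

      # Only new/modified records, with a bound parameter instead of an f-string
      new_data = pd.read_sql(
          text("SELECT * FROM raw_data WHERE updated_at > :last_sync"),
          engine,
          params={'last_sync': last_sync},
      )

      if len(new_data) > 0:
          new_data.to_parquet('/tmp/new_data.parquet')
          print(f"Found {len(new_data)} new/changed records")
      else:
          print("No changes detected")
      EOF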

  # 2. Transform only the changes
  - name: transform_changes
    command: |
      python <<EOF
      import os
      import sys
      import pandas as pd

      # Nothing to transform when detect_changes found no changes
      if not os.path.exists('/tmp/new_data.parquet'):
          sys.exit(0)

      df = pd.read_parquet('/tmp/new_data.parquet')

      # Apply transformations
      # Note: this normalizes against the max of the current batch, not of the whole table
      df['normalized_value'] = df['value'] / df['value'].max()
      df['category'] = df['type'].map({
          'A': 'Category 1',
          'B': 'Category 2',
          'C': 'Category 3'
      })

      df.to_parquet('/tmp/transformed_changes.parquet')
      EOF
    depends: [detect_changes]

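  # 3. Upsert the changes
  - name: upsert_changes
    command: |
      python <<EOF
      import os
      import sys
      import pandas as pd
      from sqlalchemy import create_engine, text

      if not os.path.exists('/tmp/transformed_changes.parquet'):
          sys.exit(0)

      df = pd.read_parquet('/tmp/transformed_changes.parquet')
      engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')

      # ON CONFLICT upsert with bound parameters (engine.execute was removed in SQLAlchemy 2.0).
      # updated_at stores the source timestamp so the next watermark compares like with like.
      upsert = text("""
          INSERT INTO transformed_data (id, value, category, updated_at)
          VALUES (:id, :value, :category, :updated_at)
          ON CONFLICT (id) DO UPDATE SET
            value = EXCLUDED.value,
            category = EXCLUDED.category,
            updated_at = EXCLUDED.updated_at
      """)
      rows = [
          {'id': r['id'], 'value': r['normalized_value'],
           'category': r['category'], 'updated_at': r['updated_at']}
          for r in df.to_dict('records')
      ]
      with engine.begin() as conn:  # one transaction, committed on success
          conn.execute(upsert, rows)

      print(f"Upserted {len(rows)} records")
      EOF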
    depends: [transform_changes]
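Below is a minimal sketch of one detect, transform and upsert pass against an in-memory SQLite database. It assumes raw_data has id, value, type and updated_at columns; the DAG uses these names, but the full schema is not shown. The sketch stores the source updated_at in transformed_data, so the watermark always compares source timestamps. It leaves out the normalization step.

```python
import sqlite3

SCHEMA = """
CREATE TABLE raw_data (id INTEGER PRIMARY KEY, value REAL, type TEXT, updated_at TEXT);
CREATE TABLE transformed_data (id INTEGER PRIMARY KEY, value REAL, category TEXT, updated_at TEXT);
"""

CATEGORIES = {"A": "Category 1", "B": "Category 2", "C": "Category 3"}

UPSERT_SQL = """
INSERT INTO transformed_data (id, value, category, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
    value = excluded.value,
    category = excluded.category,
    updated_at = excluded.updated_at
"""


def sync_changes(conn: sqlite3.Connection) -> int:
    """Run one detect -> transform -> upsert pass and return the number of rows synced."""
    # Watermark: the newest *source* timestamp already copied (epoch if empty)
    (last_sync,) = conn.execute(
        "SELECT COALESCE(MAX(updated_at), '1970-01-01') FROM transformed_data"
    ).fetchone()
    changes = conn.execute(
        "SELECT id, value, type, updated_at FROM raw_data WHERE updated_at > ?",
        (last_sync,),
    ).fetchall()
    # Transform in Python; unknown types become NULL, much like pandas .map gives NaN
    rows = [(row_id, value, CATEGORIES.get(kind), ts) for row_id, value, kind, ts in changes]
    with conn:  # commit on success, roll back on error
        conn.executemany(UPSERT_SQL, rows)
    return len(rows)
```

A strict > comparison can still skip a row that is committed later with exactly the watermark timestamp. If that matters, compare with >= instead and let the upsert absorb the rows that get re-read.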

🎯 Pattern 5: Transformation with ClickHouse (analytics)

Example: Heavy aggregations

# ~/dagu/dags/analytics_clickhouse.yaml
name: analytics_transform
schedule: "0 4 * * *"

steps:
  # 1. Transform and load into ClickHouse
  - name: load_to_clickhouse
    command: |
      python <<EOF
      import pandas as pd
      from sqlalchemy import create_engine
      from clickhouse_driver import Client

      # Extract from PostgreSQL
      pg = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')
      df = pd.read_sql('SELECT * FROM events WHERE date = CURRENT_DATE', pg)

      # Transform
      df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
      df['day_of_week'] = pd.to_datetime(df['timestamp']).dt.dayofweek

      # Load into ClickHouse
      ch = Client('localhost', port=9000)

      ch.execute('''
          CREATE TABLE IF NOT EXISTS events_analytics (
              event_id UInt64,
              user_id UInt64,
              event_type String,
              timestamp DateTime,
              hour UInt8,
              day_of_week UInt8,
              value Float64
          ) ENGINE = MergeTree()
          ORDER BY (event_type, timestamp)
      ''')

      # Insert
      ch.execute(
          'INSERT INTO events_analytics VALUES',
          df.to_dict('records')
      )
      EOF

  # 2. Aggregate in ClickHouse (very fast)
  # Rebuilt on every run: CREATE TABLE IF NOT EXISTS ... AS SELECT only fills the table the first time
  - name: aggregate
    command: |
      clickhouse-client --multiquery --query "
      DROP TABLE IF EXISTS hourly_stats;
      CREATE TABLE hourly_stats
      ENGINE = MergeTree()
      ORDER BY (event_type, hour)
      AS SELECT
          event_type,
          hour,
          day_of_week,
          COUNT(*) as event_count,
          AVG(value) as avg_value,
          SUM(value) as total_value
      FROM events_analytics
      WHERE timestamp >= today()
      GROUP BY event_type, hour, day_of_week
      "
    depends: [load_to_clickhouse]
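The load_to_clickhouse step sends the whole day in a single ch.execute call. If a day's events grow large, splitting the insert keeps each request bounded. Below is a minimal sketch under that assumption. The insert callable is passed in (for example, clickhouse_driver's Client.execute), so the batching can be tested without a server. The default batch size is an arbitrary placeholder, not a tuned value.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def batched(records: Iterable[dict], size: int) -> Iterator[List[dict]]:
    """Yield lists of at most `size` records (itertools.batched needs Python 3.12+)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(records)
    while batch := list(islice(it, size)):
        yield batch


def insert_in_batches(execute: Callable[[str, List[dict]], object], query: str,
                      records: Iterable[dict], size: int = 50_000) -> int:
    """Send `records` through `execute(query, batch)` in chunks; return the rows sent."""
    sent = 0
    for batch in batched(records, size):
        execute(query, batch)
        sent += len(batch)
    return sent
```

Inside the DAG step this would replace the single insert with insert_in_batches(ch.execute, 'INSERT INTO events_analytics VALUES', df.to_dict('records')).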

🎯 Pattern 6: Transformation with complex dependencies

Example: DAG with several transformations running in parallel

# ~/dagu/dags/complex_transform.yaml
name: complex_multi_transform
schedule: "0 1 * * *"

steps:
  # Initial step: extraction
  - name: extract
    command: python ~/dagu/scripts/extract_data.py

  # Parallel transformations
  - name: transform_customers
    command: python ~/dagu/scripts/transform_customers.py
    depends: [extract]

  - name: transform_products
    command: python ~/dagu/scripts/transform_products.py
    depends: [extract]

  - name: transform_orders
    command: python ~/dagu/scripts/transform_orders.py
    depends: [extract]

  # Join everything
  - name: join_all
    command: python ~/dagu/scripts/join_datasets.py
    depends: [transform_customers, transform_products, transform_orders]

  # Compute the final metrics
  - name: calc_metrics
    command: python ~/dagu/scripts/calculate_metrics.py
    depends: [join_all]

  # Load into the destinations
  - name: load_postgres
    command: python ~/dagu/scripts/load_postgres.py
    depends: [calc_metrics]

  - name: load_clickhouse
    command: python ~/dagu/scripts/load_clickhouse.py
    depends: [calc_metrics]
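To see which steps of complex_multi_transform can run at the same time, group them into waves: a step becomes ready once all of its depends have finished. The sketch below does this with Python's graphlib module (3.9+). It shows what the depends graph allows, not how Dagu schedules internally. Real concurrency also depends on any parallelism limit configured in Dagu.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Dict, List, Set

# The depends lists from complex_multi_transform, written as step -> prerequisites
DEPENDS: Dict[str, Set[str]] = {
    "extract": set(),
    "transform_customers": {"extract"},
    "transform_products": {"extract"},
    "transform_orders": {"extract"},
    "join_all": {"transform_customers", "transform_products", "transform_orders"},
    "calc_metrics": {"join_all"},
    "load_postgres": {"calc_metrics"},
    "load_clickhouse": {"calc_metrics"},
}


def waves(depends: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into waves; every step in a wave could run in parallel."""
    sorter = TopologicalSorter(depends)
    sorter.prepare()  # raises graphlib.CycleError on circular depends
    result = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        result.append(ready)
        sorter.done(*ready)  # finish the whole wave before asking for the next one
    return result
```

For this DAG, waves(DEPENDS) gives five waves: extract, then the three transforms, then join_all, then calc_metrics, then both loads.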

💡 Best practices

1. Use intermediate files

/tmp/raw_data.parquet
/tmp/clean_data.parquet
/tmp/transformed_data.parquet
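Fixed names under /tmp collide when two runs of the same DAG overlap, or when two DAGs pick the same file name. Below is a minimal sketch of a per-run directory. It assumes every step of a run can read the same run identifier from an environment variable. The variable name DAG_RUN_ID is an assumption: check which variables your Dagu version exports, or set one yourself.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional

# Assumption: every step of a run sees the same run id in this variable.
# The name is hypothetical; check what your Dagu version exports, or set it yourself.
RUN_ID_ENV = "DAG_RUN_ID"


def run_dir(run_id: Optional[str] = None, base: Optional[str] = None) -> Path:
    """Return (creating it if needed) the directory for this run's intermediate files."""
    run_id = run_id or os.environ.get(RUN_ID_ENV)
    if not run_id:
        raise RuntimeError(f"pass run_id or set {RUN_ID_ENV} so all steps share one directory")
    path = Path(base or tempfile.gettempdir()) / "dagu-runs" / run_id
    path.mkdir(parents=True, exist_ok=True)
    return path
```

A step would then write df.to_parquet(run_dir() / "clean_data.parquet"), and cleanup only has to delete that one directory.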

2. Validate between steps

# Validate before continuing
assert len(df) > 0, "No data to process"
assert df['amount'].sum() > 0, "Invalid amounts"
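Python removes assert statements when it runs with -O, so a check that must always run is safer as an explicit exception. An uncaught exception makes the interpreter exit with status 1, and Dagu marks a step as failed when its command exits non-zero, so the DAG stops at the bad batch. Below is a minimal sketch that works on a list of row dicts (for example, df.to_dict('records')). The required column names come from Pattern 1, and ValidationError is a hypothetical name.

```python
from typing import Iterable, List, Mapping


class ValidationError(Exception):
    """Raised when a batch must not move on to the next step."""


def validate_batch(rows: List[Mapping], required: Iterable[str] = ("customer_id", "amount")) -> None:
    """Collect every problem in the batch, then raise once with all of them."""
    required = tuple(required)
    errors: List[str] = []
    if not rows:
        errors.append("no data to process")
    for i, row in enumerate(rows):
        missing = [col for col in required if row.get(col) is None]
        if missing:
            errors.append(f"row {i}: missing {', '.join(missing)}")
    # Same rule as the assert above: the batch total must be positive
    if rows and sum(float(row.get("amount") or 0) for row in rows) <= 0:
        errors.append("total amount is not positive")
    if errors:
        raise ValidationError("; ".join(errors))
```

Collecting all errors before raising puts every problem in one step log, so a single failed run shows everything that needs fixing.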

3. Structured logs

import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logging.info("processed records=%d elapsed_s=%.2f", len(df), elapsed)
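If a log pipeline needs to parse logs field by field, one option is a JSON formatter on top of the standard logging module. Below is a minimal sketch. The field names (ts, level, logger, msg) and passing extra fields through extra={"fields": ...} are conventions chosen here, not a standard.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        # Structured fields passed as logger.info(..., extra={"fields": {...}})
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, default=str)


def get_logger(name: str) -> logging.Logger:
    """Return a logger that writes JSON lines to stderr; safe to call repeatedly."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False  # avoid duplicate lines if the root logger is configured too
    return logger
```

In a step you would write get_logger("transform_sales_data").info("processed", extra={"fields": {"records": len(df), "elapsed_s": round(elapsed, 2)}}).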

4. Idempotency

-- Use an UPSERT instead of a plain INSERT (the conflict target needs a unique constraint)
INSERT ... ON CONFLICT (key) DO UPDATE SET col = EXCLUDED.col
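One way to check idempotency is to run a step twice and compare the resulting state. Below is a minimal sketch using SQLite, which accepts the same ON CONFLICT ... DO UPDATE form as PostgreSQL (SQLite 3.24+). The table names and values are made up for the demo.

```python
import sqlite3
from typing import Callable, List, Tuple


def is_idempotent(step: Callable[[], object], snapshot: Callable[[], List[Tuple]]) -> bool:
    """Run `step` twice and report whether the second run left the state unchanged."""
    step()
    after_first = snapshot()
    step()
    return snapshot() == after_first


UPSERT_DAILY = (
    "INSERT INTO daily_metrics (date, total_orders) VALUES ('2026-03-23', 42) "
    "ON CONFLICT (date) DO UPDATE SET total_orders = excluded.total_orders"
)


def demo() -> Tuple[bool, bool]:
    """Compare the upsert with a plain INSERT into a table without a key."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE daily_metrics (date TEXT PRIMARY KEY, total_orders INTEGER)")
    conn.execute("CREATE TABLE daily_metrics_log (date TEXT, total_orders INTEGER)")
    upsert_ok = is_idempotent(
        lambda: conn.execute(UPSERT_DAILY),
        lambda: conn.execute("SELECT * FROM daily_metrics").fetchall(),
    )
    plain_ok = is_idempotent(
        lambda: conn.execute("INSERT INTO daily_metrics_log VALUES ('2026-03-23', 42)"),
        lambda: conn.execute("SELECT * FROM daily_metrics_log").fetchall(),
    )
    return upsert_ok, plain_ok
```

demo() returns (True, False): the upsert converges to one row, while the plain INSERT adds a second copy on the rerun.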

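5. Cleanup

# handlerOn.exit runs when the DAG finishes, whether it succeeded or failed.
# List this DAG's files explicitly: /tmp/*.parquet would also delete other DAGs' files.
handlerOn:
  exit:
    command: rm -f /tmp/raw_data.parquet /tmp/clean_data.parquet /tmp/transformed_data.parquet

steps:
  # ... your steps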

🆚 Dagu vs dbt

| Feature | Dagu | dbt |
|---|---|---|
| SQL transforms | Yes | Yes (better) |
| Python transforms | Yes (better) | ⚠️ Limited |
| Scheduling | Built-in | External |
| Lineage | ⚠️ Manual | Automatic |
| Testing | ⚠️ Manual | Built-in |
| Docs | ⚠️ Manual | Automatic |

Recommendation:

  • Use Dagu for end-to-end pipelines
  • Consider dbt if you write a lot of SQL and want automatic lineage (Dagu can still schedule dbt itself as a step)

🎯 Summary

Dagu CAN do transformations:

  • Python/Pandas (cleaning, aggregations)
  • SQL (staging, metrics, joins)
  • Incremental transformations
  • Multi-table with complex joins
  • Parallel (several transforms at once)
  • ClickHouse (heavy analytics)

You DON'T need Temporal for:

  • Simple/medium transformations
  • Typical ETL (Extract → Transform → Load)
  • Pipelines that finish in under an hour
  • SQL or Pandas aggregations

You only need Temporal if:

  • A transformation takes more than an hour and must survive worker restarts mid-run
  • You need to pause/resume workflows
  • You have a very complex state machine
  • You need distributed compensations (saga-style rollbacks)

Last updated: 2026-03-23