# Transformaciones con Dagu Guía completa de cómo hacer transformaciones de datos con Dagu. --- ## ✅ Dagu PUEDE hacer transformaciones Dagu ejecuta **cualquier script o comando**, por lo que puede hacer: - ✅ Transformaciones SQL - ✅ Transformaciones Python/Pandas - ✅ Agregaciones y cálculos - ✅ Limpieza de datos - ✅ Enriquecimiento de datos - ✅ Joins complejos - ✅ Transformaciones en streaming --- ## 🎯 Patrón 1: Transformaciones Python/Pandas ### Ejemplo: Limpieza y agregación ```yaml # ~/dagu/dags/transform_sales.yaml name: transform_sales_data schedule: "0 2 * * *" # Cada día a las 2 AM steps: # 1. Extract desde PostgreSQL - name: extract command: | python < (SELECT COALESCE(MAX(created_at), '1970-01-01') FROM stg_orders) ON CONFLICT (order_id) DO UPDATE SET amount = EXCLUDED.amount, status = EXCLUDED.status, processed_at = NOW(); SQL # 2. Transform - Calcular métricas - name: calc_metrics command: | psql $DB_URL <= CURRENT_DATE - INTERVAL '7 days' GROUP BY DATE(created_at) ON CONFLICT (date) DO UPDATE SET total_orders = EXCLUDED.total_orders, total_revenue = EXCLUDED.total_revenue, avg_order_value = EXCLUDED.avg_order_value, completed_orders = EXCLUDED.completed_orders, cancelled_orders = EXCLUDED.cancelled_orders, updated_at = NOW(); SQL depends: [stage_orders] # 3. Transform - Snapshot histórico - name: snapshot command: | psql $DB_URL <= CURRENT_DATE - 30', engine) reviews = pd.read_sql('SELECT * FROM reviews', engine) # Agregaciones de órdenes order_stats = orders.groupby('customer_id').agg({ 'order_id': 'count', 'amount': ['sum', 'mean'], 'created_at': 'max' }).reset_index() order_stats.columns = ['customer_id', 'total_orders', 'total_spent', 'avg_order', 'last_order'] # Agregaciones de reviews review_stats = reviews.groupby('customer_id').agg({ 'rating': 'mean', 'review_id': 'count' }).reset_index() review_stats.columns = ['customer_id', 'avg_rating', 'total_reviews'] # Merge todo enriched = customers.merge(order_stats, on='customer_id', how='left') enriched = enriched.merge(review_stats, on='customer_id', how='left') # Calcular segmento enriched['segment'] = enriched.apply(lambda x: 'VIP' if x['total_spent'] > 1000 else 'Regular' if x['total_spent'] > 100 else 'New', axis=1 ) enriched.to_parquet('/tmp/enriched_customers.parquet') EOF # 2. Load enriquecido - name: load_enriched command: | python < '{last_sync}' """, engine) if len(new_data) > 0: new_data.to_parquet('/tmp/new_data.parquet') print(f"Found {len(new_data)} new/changed records") else: print("No changes detected") exit(0) EOF # 2. Transformar solo cambios - name: transform_changes command: | python <= today() GROUP BY event_type, hour, day_of_week " depends: [load_to_clickhouse] ``` --- ## 🎯 Patrón 6: Transformación con Dependencias Complejas ### Ejemplo: DAG con múltiples transformaciones en paralelo ```yaml # ~/dagu/dags/complex_transform.yaml name: complex_multi_transform schedule: "0 1 * * *" steps: # Paso inicial - Extracción - name: extract command: python ~/dagu/scripts/extract_data.py # Transformaciones en paralelo - name: transform_customers command: python ~/dagu/scripts/transform_customers.py depends: [extract] - name: transform_products command: python ~/dagu/scripts/transform_products.py depends: [extract] - name: transform_orders command: python ~/dagu/scripts/transform_orders.py depends: [extract] # Join todo - name: join_all command: python ~/dagu/scripts/join_datasets.py depends: [transform_customers, transform_products, transform_orders] # Calcular métricas finales - name: calc_metrics command: python ~/dagu/scripts/calculate_metrics.py depends: [join_all] # Cargar a destinos - name: load_postgres command: python ~/dagu/scripts/load_postgres.py depends: [calc_metrics] - name: load_clickhouse command: python ~/dagu/scripts/load_clickhouse.py depends: [calc_metrics] ``` --- ## 💡 Buenas Prácticas ### 1. Usa archivos intermedios ```bash /tmp/raw_data.parquet /tmp/clean_data.parquet /tmp/transformed_data.parquet ``` ### 2. Validaciones entre pasos ```python # Validar antes de continuar assert len(df) > 0, "No data to process" assert df['amount'].sum() > 0, "Invalid amounts" ``` ### 3. Logs estructurados ```python import logging logging.info(f"Processed {len(df)} records in {elapsed:.2f}s") ``` ### 4. Idempotencia ```sql -- Usar UPSERT en lugar de INSERT INSERT ... ON CONFLICT DO UPDATE ``` ### 5. Cleanup ```yaml steps: # ... tus pasos - name: cleanup command: rm -f /tmp/*.parquet continueOn: failure: true ``` --- ## 🆚 Dagu vs dbt | Feature | Dagu | dbt | |---------|------|-----| | SQL transforms | ✅ Sí | ✅ Sí (mejor) | | Python transforms | ✅ Sí (mejor) | ⚠️ Limitado | | Scheduling | ✅ Built-in | ❌ Externo | | Lineage | ⚠️ Manual | ✅ Automático | | Testing | ⚠️ Manual | ✅ Built-in | | Docs | ⚠️ Manual | ✅ Automático | **Recomendación**: - Usa **Dagu** para pipelines end-to-end - Considera **dbt** si haces mucho SQL y quieres lineage automático --- ## 🎯 Resumen **Dagu PUEDE hacer transformaciones:** - ✅ Python/Pandas (limpieza, agregaciones) - ✅ SQL (staging, métricas, joins) - ✅ Transformaciones incrementales - ✅ Multi-tabla con joins complejos - ✅ Paralelo (múltiples transforms a la vez) - ✅ ClickHouse (analítica pesada) **NO necesitas Temporal para:** - ❌ Transformaciones simples/medias - ❌ ETL típico (Extract → Transform → Load) - ❌ Pipelines que terminan en < 1 hora - ❌ Agregaciones SQL o Pandas **SÍ necesitas Temporal solo si:** - ✅ Transformación tarda > 1 hora - ✅ Necesitas pausar/reanudar - ✅ State machine muy complejo - ✅ Compensaciones distribuidas --- **Última actualización**: 2026-03-23