# Transformations with Dagu

A complete guide to data transformations with Dagu.

---

## ✅ Dagu CAN do transformations

Dagu runs **any script or command**, so it can handle:

- ✅ SQL transformations
- ✅ Python/Pandas transformations
- ✅ Aggregations and calculations
- ✅ Data cleaning
- ✅ Data enrichment
- ✅ Complex joins
- ✅ Near-real-time micro-batches (frequent schedules, e.g. every 5 minutes)
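
Dagu judges a step only by its process exit code, so a transformation script should exit non-zero whenever it fails. A minimal sketch of such a step script; the JSON input/output files and the `transform` function are hypothetical placeholders for your own logic.

```python
import json
import sys


def transform(records):
    """Hypothetical transformation: keep rows that have an amount and cast it to float."""
    return [dict(r, amount=float(r["amount"])) for r in records if r.get("amount") is not None]


def main(argv):
    """Read JSON records from argv[1], write the result to argv[2], return an exit code."""
    if len(argv) != 3:
        print("usage: step.py INPUT OUTPUT", file=sys.stderr)
        return 2
    try:
        with open(argv[1]) as f:
            records = json.load(f)
        result = transform(records)
        with open(argv[2], "w") as f:
            json.dump(result, f)
    except (OSError, ValueError, TypeError, AttributeError) as exc:
        # Any non-zero exit code makes Dagu mark the step as failed
        print(f"step failed: {exc}", file=sys.stderr)
        return 1
    print(f"transformed {len(result)} records")
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))
```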

---

## 🎯 Pattern 1: Python/Pandas Transformations

### Example: Cleaning and aggregation

```yaml
# ~/dagu/dags/transform_sales.yaml
name: transform_sales_data
schedule: "0 2 * * *"  # Every day at 2 AM

env:
  - DB_URL: postgresql://postgres:postgres@localhost:5434/postgres

steps:
  # 1. Extract from PostgreSQL
  - name: extract
    script: |
      python <<'EOF'
      import os
      import pandas as pd
      from sqlalchemy import create_engine

      engine = create_engine(os.environ['DB_URL'])
      # The 2 AM run processes the previous full day
      df = pd.read_sql('SELECT * FROM raw_sales WHERE date = CURRENT_DATE - 1', engine)
      df.to_parquet('/tmp/raw_sales.parquet')
      EOF

  # 2. Transform: cleaning
  - name: clean
    script: |
      python <<'EOF'
      import pandas as pd

      df = pd.read_parquet('/tmp/raw_sales.parquet')

      # Drop incomplete rows and normalize types
      df = df.dropna(subset=['customer_id', 'amount'])
      df['amount'] = df['amount'].astype(float)
      df['date'] = pd.to_datetime(df['date'])

      # Remove duplicates
      df = df.drop_duplicates(subset=['transaction_id'])

      df.to_parquet('/tmp/clean_sales.parquet')
      print(f"Cleaned {len(df)} records")
      EOF
    depends: [extract]

  # 3. Transform: per-customer aggregation
  - name: aggregate
    script: |
      python <<'EOF'
      import pandas as pd

      df = pd.read_parquet('/tmp/clean_sales.parquet')

      # Named aggregation produces flat column names directly
      customer_summary = (
          df.groupby('customer_id')
            .agg(
                total_spent=('amount', 'sum'),
                avg_spent=('amount', 'mean'),
                num_purchases=('amount', 'count'),
                last_purchase=('date', 'max'),
            )
            .reset_index()
      )

      customer_summary.to_parquet('/tmp/customer_summary.parquet')
      print(f"Aggregated {len(customer_summary)} customers")
      EOF
    depends: [clean]

  # 4. Load into PostgreSQL
  - name: load
    script: |
      python <<'EOF'
      import os
      import pandas as pd
      from sqlalchemy import create_engine

      df = pd.read_parquet('/tmp/customer_summary.parquet')
      engine = create_engine(os.environ['DB_URL'])

      # 'replace' rewrites the whole table on every run
      df.to_sql('customer_summary', engine, if_exists='replace', index=False)
      print(f"Loaded {len(df)} records to customer_summary table")
      EOF
    depends: [aggregate]

  # 5. Log lineage (folded into a single command line)
  - name: lineage
    command: >-
      python ~/dagu/scripts/log_lineage.py
      --event COMPLETE
      --source postgres://raw_sales
      --target postgres://customer_summary
      --job transform_sales_data
    depends: [load]
```
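
The clean and aggregate steps are pure functions of their input, which makes them easy to unit-test outside Dagu. A stdlib-only sketch of the same logic, assuming each sales record is a dict with the Pattern 1 columns and an ISO-format `date` string; it mirrors the pandas code for testing purposes and is not a replacement for it.

```python
from datetime import date


def clean(records):
    """Drop rows missing customer_id/amount, cast types, keep the first row per transaction_id.

    Records are assumed to be dicts keyed by the Pattern 1 column names.
    """
    seen = set()
    out = []
    for r in records:
        if r.get("customer_id") is None or r.get("amount") is None:
            continue  # same as dropna(subset=['customer_id', 'amount'])
        if r["transaction_id"] in seen:
            continue  # same as drop_duplicates (keeps the first occurrence)
        seen.add(r["transaction_id"])
        out.append({**r, "amount": float(r["amount"]), "date": date.fromisoformat(r["date"])})
    return out


def aggregate(rows):
    """Per-customer total, mean, count and latest purchase date."""
    acc = {}
    for r in rows:
        a = acc.setdefault(
            r["customer_id"],
            {"total_spent": 0.0, "num_purchases": 0, "last_purchase": r["date"]},
        )
        a["total_spent"] += r["amount"]
        a["num_purchases"] += 1
        a["last_purchase"] = max(a["last_purchase"], r["date"])
    return {
        cid: {**a, "avg_spent": a["total_spent"] / a["num_purchases"]}
        for cid, a in acc.items()
    }
```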

---

## 🎯 Pattern 2: SQL Transformations (dbt-style)

### Example: Incremental transformation

```yaml
# ~/dagu/dags/transform_orders.yaml
name: transform_orders
schedule: "*/15 * * * *"  # Every 15 minutes

env:
  - DB_URL: postgresql://postgres:postgres@localhost:5434/postgres

steps:
  # 1. Staging: raw to clean
  - name: stage_orders
    # ON_ERROR_STOP makes psql exit non-zero on a SQL error, so Dagu marks the step as failed
    script: |
      psql -v ON_ERROR_STOP=1 "$DB_URL" <<'SQL'
      -- Create the staging table if it does not exist
      CREATE TABLE IF NOT EXISTS stg_orders (
          order_id BIGINT PRIMARY KEY,
          customer_id BIGINT,
          amount DECIMAL(10,2),
          status VARCHAR(50),
          created_at TIMESTAMPTZ,
          processed_at TIMESTAMPTZ DEFAULT NOW()
      );

      -- Incremental insert: only orders newer than the latest staged one.
      -- Note: this created_at watermark does not pick up later status changes
      -- on existing orders; that needs an updated_at column in raw_orders.
      INSERT INTO stg_orders (order_id, customer_id, amount, status, created_at)
      SELECT
          order_id,
          customer_id,
          amount::DECIMAL(10,2),
          LOWER(TRIM(status)) AS status,
          created_at
      FROM raw_orders
      WHERE created_at > (SELECT COALESCE(MAX(created_at), '1970-01-01') FROM stg_orders)
      ON CONFLICT (order_id) DO UPDATE SET
          amount = EXCLUDED.amount,
          status = EXCLUDED.status,
          processed_at = NOW();
      SQL

  # 2. Transform: daily metrics
  - name: calc_metrics
    script: |
      psql -v ON_ERROR_STOP=1 "$DB_URL" <<'SQL'
      -- Daily metrics table
      CREATE TABLE IF NOT EXISTS daily_metrics (
          date DATE PRIMARY KEY,
          total_orders INT,
          total_revenue DECIMAL(12,2),
          avg_order_value DECIMAL(10,2),
          completed_orders INT,
          cancelled_orders INT,
          updated_at TIMESTAMPTZ DEFAULT NOW()
      );

      -- Upsert metrics, recomputing the last 7 days on every run
      INSERT INTO daily_metrics (date, total_orders, total_revenue, avg_order_value, completed_orders, cancelled_orders)
      SELECT
          DATE(created_at) AS date,
          COUNT(*) AS total_orders,
          SUM(amount) AS total_revenue,
          AVG(amount) AS avg_order_value,
          COUNT(*) FILTER (WHERE status = 'completed') AS completed_orders,
          COUNT(*) FILTER (WHERE status = 'cancelled') AS cancelled_orders
      FROM stg_orders
      WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
      GROUP BY DATE(created_at)
      ON CONFLICT (date) DO UPDATE SET
          total_orders = EXCLUDED.total_orders,
          total_revenue = EXCLUDED.total_revenue,
          avg_order_value = EXCLUDED.avg_order_value,
          completed_orders = EXCLUDED.completed_orders,
          cancelled_orders = EXCLUDED.cancelled_orders,
          updated_at = NOW();
      SQL
    depends: [stage_orders]

  # 3. Transform: historical snapshot
  - name: snapshot
    script: |
      psql -v ON_ERROR_STOP=1 "$DB_URL" <<'SQL'
      -- Snapshot table
      CREATE TABLE IF NOT EXISTS order_snapshots (
          snapshot_id SERIAL PRIMARY KEY,
          order_id BIGINT,
          status VARCHAR(50),
          amount DECIMAL(10,2),
          snapshot_at TIMESTAMPTZ DEFAULT NOW()
      );

      -- Snapshot orders that are still in progress (appends on every run)
      INSERT INTO order_snapshots (order_id, status, amount)
      SELECT order_id, status, amount
      FROM stg_orders
      WHERE status IN ('pending', 'processing');
      SQL
    depends: [calc_metrics]
```

---

## 🎯 Pattern 3: Multi-Table Transformation with Joins

### Example: Enriching data from multiple sources

```yaml
# ~/dagu/dags/enrich_customer_data.yaml
name: enrich_customer_data
schedule: "0 3 * * *"

env:
  - DB_URL: postgresql://postgres:postgres@localhost:5434/postgres

steps:
  # 1. Extract and combine multiple sources
  - name: merge_sources
    script: |
      python <<'EOF'
      import os
      import numpy as np
      import pandas as pd
      from sqlalchemy import create_engine

      engine = create_engine(os.environ['DB_URL'])

      # Load the source tables
      customers = pd.read_sql('SELECT * FROM customers', engine)
      orders = pd.read_sql(
          "SELECT * FROM orders WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'", engine
      )
      reviews = pd.read_sql('SELECT * FROM reviews', engine)

      # Order aggregates per customer
      order_stats = orders.groupby('customer_id').agg(
          total_orders=('order_id', 'count'),
          total_spent=('amount', 'sum'),
          avg_order=('amount', 'mean'),
          last_order=('created_at', 'max'),
      ).reset_index()

      # Review aggregates per customer
      review_stats = reviews.groupby('customer_id').agg(
          avg_rating=('rating', 'mean'),
          total_reviews=('review_id', 'count'),
      ).reset_index()

      # Left joins keep customers without orders or reviews (their stats are NaN)
      enriched = customers.merge(order_stats, on='customer_id', how='left')
      enriched = enriched.merge(review_stats, on='customer_id', how='left')

      # Segment (vectorized); NaN totals fail both conditions and fall back to 'New'
      enriched['segment'] = np.select(
          [enriched['total_spent'] > 1000, enriched['total_spent'] > 100],
          ['VIP', 'Regular'],
          default='New',
      )

      enriched.to_parquet('/tmp/enriched_customers.parquet')
      EOF

  # 2. Load the enriched table
  - name: load_enriched
    script: |
      python <<'EOF'
      import os
      import pandas as pd
      from sqlalchemy import create_engine

      df = pd.read_parquet('/tmp/enriched_customers.parquet')
      engine = create_engine(os.environ['DB_URL'])

      df.to_sql('enriched_customers', engine, if_exists='replace', index=False)
      EOF
    depends: [merge_sources]
```
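
After the left joins, customers without orders in the window have no `total_spent`, and the pandas step puts them in `New` because comparisons with NaN are false. A stdlib sketch of the same left-join-and-segment rule with the thresholds above (1000 and 100) makes that explicit; the dict-based inputs are an assumption for illustration.

```python
def segment(total_spent):
    """Same thresholds as the pandas step; customers without orders count as 'New'."""
    if total_spent is None:
        return "New"
    if total_spent > 1000:
        return "VIP"
    if total_spent > 100:
        return "Regular"
    return "New"


def enrich(customers, order_totals):
    """Left join: every customer is kept; a missing total stays None.

    customers: list of dicts with 'customer_id' (assumed shape)
    order_totals: dict customer_id -> total_spent (assumed shape)
    """
    rows = []
    for c in customers:
        total = order_totals.get(c["customer_id"])
        rows.append({**c, "total_spent": total, "segment": segment(total)})
    return rows
```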

---

## 🎯 Pattern 4: Incremental Transformation (Changes Only)

### Example: Simplified CDC (Change Data Capture)

```yaml
# ~/dagu/dags/incremental_transform.yaml
name: incremental_transform
schedule: "*/5 * * * *"  # Every 5 minutes

env:
  - DB_URL: postgresql://postgres:postgres@localhost:5434/postgres

steps:
  # 1. Detect changes
  - name: detect_changes
    script: |
      python <<'EOF'
      import os
      import pandas as pd
      from sqlalchemy import create_engine, text

      # Clear the previous run's intermediates so later steps never reprocess stale data
      for path in ('/tmp/new_data.parquet', '/tmp/transformed_changes.parquet'):
          if os.path.exists(path):
              os.remove(path)

      engine = create_engine(os.environ['DB_URL'])

      # High-water mark: latest source updated_at already loaded (epoch on the first run)
      with engine.connect() as conn:
          last_sync = conn.execute(text(
              "SELECT COALESCE(MAX(updated_at), '1970-01-01') FROM transformed_data"
          )).scalar_one()

      # Only new or modified records, with a bound parameter instead of string formatting
      new_data = pd.read_sql(
          text("SELECT * FROM raw_data WHERE updated_at > :last_sync"),
          engine,
          params={'last_sync': last_sync},
      )

      if len(new_data) > 0:
          new_data.to_parquet('/tmp/new_data.parquet')
          print(f"Found {len(new_data)} new/changed records")
      else:
          print("No changes detected")
      EOF

  # 2. Transform only the changes
  - name: transform_changes
    script: |
      python <<'EOF'
      import os
      import sys
      import pandas as pd

      if not os.path.exists('/tmp/new_data.parquet'):
          sys.exit(0)  # nothing changed in this run

      df = pd.read_parquet('/tmp/new_data.parquet')

      # Apply transformations (normalized against this batch's maximum only)
      df['normalized_value'] = df['value'] / df['value'].max()
      df['category'] = df['type'].map({
          'A': 'Category 1',
          'B': 'Category 2',
          'C': 'Category 3',
      })

      df.to_parquet('/tmp/transformed_changes.parquet')
      EOF
    depends: [detect_changes]

  # 3. Upsert the changes
  - name: upsert_changes
    script: |
      python <<'EOF'
      import os
      import sys
      import pandas as pd
      from sqlalchemy import create_engine, text

      if not os.path.exists('/tmp/transformed_changes.parquet'):
          sys.exit(0)

      df = pd.read_parquet('/tmp/transformed_changes.parquet')
      engine = create_engine(os.environ['DB_URL'])

      # Store the source updated_at (not NOW()) so it is a valid watermark for the next run
      rows = df[['id', 'normalized_value', 'category', 'updated_at']].to_dict('records')

      # One transaction; a list of parameter dicts runs as executemany
      with engine.begin() as conn:
          conn.execute(
              text("""
                  INSERT INTO transformed_data (id, value, category, updated_at)
                  VALUES (:id, :normalized_value, :category, :updated_at)
                  ON CONFLICT (id) DO UPDATE SET
                      value = EXCLUDED.value,
                      category = EXCLUDED.category,
                      updated_at = EXCLUDED.updated_at
              """),
              rows,
          )

      print(f"Upserted {len(rows)} records")
      EOF
    depends: [transform_changes]
```
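
The steps in this DAG communicate through files, so a file left over from an earlier run can be mistaken for fresh output. A small sketch of the publish/consume handshake used above: the producer deletes the previous output before deciding whether to write, and the consumer treats a missing file as "nothing to do". JSON stands in for Parquet so the sketch runs with the standard library only; both function names are hypothetical.

```python
import json
import os


def publish_batch(path, rows):
    """Replace the previous batch: remove any stale file, then write only if there are rows."""
    if os.path.exists(path):
        os.remove(path)
    if rows:
        with open(path, "w") as f:
            json.dump(rows, f)
    return len(rows)


def consume_batch(path):
    """Return the published rows, or None when the producer found no changes."""
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)
```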

---

## 🎯 Pattern 5: Transformation with ClickHouse (Analytics)

### Example: Heavy aggregations

```yaml
# ~/dagu/dags/analytics_clickhouse.yaml
name: analytics_transform
schedule: "0 4 * * *"

env:
  - DB_URL: postgresql://postgres:postgres@localhost:5434/postgres

steps:
  # 1. Transform and load into ClickHouse
  - name: load_to_clickhouse
    script: |
      python <<'EOF'
      import os
      import pandas as pd
      from sqlalchemy import create_engine
      from clickhouse_driver import Client

      # Extract from PostgreSQL: the 4 AM run covers the previous full day
      pg = create_engine(os.environ['DB_URL'])
      df = pd.read_sql('SELECT * FROM events WHERE date = CURRENT_DATE - 1', pg)

      # Transform: time features (day_of_week: Monday=0 ... Sunday=6)
      df['timestamp'] = pd.to_datetime(df['timestamp'])
      df['hour'] = df['timestamp'].dt.hour
      df['day_of_week'] = df['timestamp'].dt.dayofweek

      # Load into ClickHouse
      ch = Client('localhost', port=9000)

      ch.execute('''
          CREATE TABLE IF NOT EXISTS events_analytics (
              event_id UInt64,
              user_id UInt64,
              event_type String,
              timestamp DateTime,
              hour UInt8,
              day_of_week UInt8,
              value Float64
          ) ENGINE = MergeTree()
          ORDER BY (event_type, timestamp)
      ''')

      # Insert only the table's columns, as tuples in column order.
      # MergeTree does not deduplicate: re-running the same day inserts the rows again.
      cols = ['event_id', 'user_id', 'event_type', 'timestamp', 'hour', 'day_of_week', 'value']
      ch.execute(
          f"INSERT INTO events_analytics ({', '.join(cols)}) VALUES",
          list(df[cols].itertuples(index=False, name=None)),
      )
      EOF

  # 2. Aggregate inside ClickHouse (fast)
  - name: aggregate
    # Rebuild on every run: CREATE TABLE IF NOT EXISTS ... AS SELECT would only fill it once
    script: |
      clickhouse-client --multiquery --query "
        DROP TABLE IF EXISTS hourly_stats;
        CREATE TABLE hourly_stats
        ENGINE = MergeTree()
        ORDER BY (event_type, hour)
        AS SELECT
            event_type,
            hour,
            day_of_week,
            COUNT(*) AS event_count,
            AVG(value) AS avg_value,
            SUM(value) AS total_value
        FROM events_analytics
        WHERE toDate(timestamp) = yesterday()
        GROUP BY event_type, hour, day_of_week
      "
    depends: [load_to_clickhouse]
```
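
For small fixtures it can help to check the ClickHouse results against an independent computation. A stdlib sketch that derives `hour` and `day_of_week` the same way as the pandas step (Monday = 0, matching `datetime.weekday()`) and groups like the `hourly_stats` query; the event dict shape is an assumption.

```python
from collections import defaultdict
from datetime import datetime


def hourly_stats(events):
    """Group by (event_type, hour, day_of_week) -> (event_count, avg_value, total_value).

    Each event is assumed to be a dict with 'event_type', 'timestamp' (datetime) and 'value'.
    """
    groups = defaultdict(list)
    for e in events:
        ts = e["timestamp"]
        groups[(e["event_type"], ts.hour, ts.weekday())].append(e["value"])
    return {
        key: (len(vals), sum(vals) / len(vals), sum(vals))
        for key, vals in groups.items()
    }
```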

---

## 🎯 Pattern 6: Transformation with Complex Dependencies

### Example: DAG with multiple parallel transformations

```yaml
# ~/dagu/dags/complex_transform.yaml
name: complex_multi_transform
schedule: "0 1 * * *"

steps:
  # Initial step: extraction
  - name: extract
    command: python ~/dagu/scripts/extract_data.py

  # Parallel transformations (each depends only on extract)
  - name: transform_customers
    command: python ~/dagu/scripts/transform_customers.py
    depends: [extract]

  - name: transform_products
    command: python ~/dagu/scripts/transform_products.py
    depends: [extract]

  - name: transform_orders
    command: python ~/dagu/scripts/transform_orders.py
    depends: [extract]

  # Join everything (waits for all three transforms)
  - name: join_all
    command: python ~/dagu/scripts/join_datasets.py
    depends: [transform_customers, transform_products, transform_orders]

  # Compute final metrics
  - name: calc_metrics
    command: python ~/dagu/scripts/calculate_metrics.py
    depends: [join_all]

  # Load into destinations (in parallel)
  - name: load_postgres
    command: python ~/dagu/scripts/load_postgres.py
    depends: [calc_metrics]

  - name: load_clickhouse
    command: python ~/dagu/scripts/load_clickhouse.py
    depends: [calc_metrics]
```
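
Dagu starts a step as soon as every step in its `depends` list has finished, so steps that share the same dependencies can run in parallel. To preview which steps can run together, a sketch using Python's `graphlib` (3.9+) that groups the DAG above into waves; it only mirrors the dependency graph and is not Dagu's scheduler.

```python
from graphlib import TopologicalSorter

# Step name -> steps it depends on (copied from complex_transform.yaml above)
DEPENDS = {
    "extract": [],
    "transform_customers": ["extract"],
    "transform_products": ["extract"],
    "transform_orders": ["extract"],
    "join_all": ["transform_customers", "transform_products", "transform_orders"],
    "calc_metrics": ["join_all"],
    "load_postgres": ["calc_metrics"],
    "load_clickhouse": ["calc_metrics"],
}


def waves(depends):
    """Return groups of steps whose dependencies are all satisfied by earlier groups."""
    sorter = TopologicalSorter(depends)
    sorter.prepare()
    result = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        result.append(ready)
        sorter.done(*ready)  # mark the whole wave finished before asking for the next
    return result
```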

---

## 💡 Best Practices

### 1. Use intermediate files

```text
/tmp/raw_data.parquet
/tmp/clean_data.parquet
/tmp/transformed_data.parquet
```
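
Fixed paths like these are shared by every run, so two overlapping runs can overwrite each other, and a reader can see a half-written file if a step dies mid-write. A sketch of two mitigations: a directory per run and an atomic write through `os.replace`. The directory layout is hypothetical, and the `run_id` argument is an assumption: take it from whatever run identifier your Dagu version exposes to steps.

```python
import os
import tempfile


def run_dir(base, dag_name, run_id):
    """Per-run directory such as <base>/<dag_name>/<run_id>/ (hypothetical layout)."""
    path = os.path.join(base, dag_name, run_id)
    os.makedirs(path, exist_ok=True)
    return path


def atomic_write_bytes(path, data):
    """Write to a temp file in the same directory, then rename it over the target."""
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path), suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)  # atomic rename when both paths are on the same filesystem
    except BaseException:
        os.remove(tmp)
        raise
```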
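
### 2. Validate between steps

```python
# df: the DataFrame produced by the previous step.
# Explicit checks instead of `assert`: asserts are skipped when Python runs with -O.
if df.empty:
    raise ValueError("No data to process")
if df['amount'].sum() <= 0:
    raise ValueError("Invalid amounts")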
```
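
Beyond aggregate checks, row-level checks catch problems before they reach the load step. A stdlib sketch that collects every problem in a batch and raises once, so the step fails (non-zero exit) with a complete message; the default column names follow Pattern 1, and the function name is hypothetical.

```python
def validate_batch(records, required=("customer_id", "amount"), key="transaction_id"):
    """Raise ValueError listing an empty batch, missing required fields and duplicate keys."""
    problems = []
    if not records:
        problems.append("no data to process")
    seen = set()
    for i, r in enumerate(records):
        missing = [c for c in required if r.get(c) is None]
        if missing:
            problems.append(f"row {i}: missing {', '.join(missing)}")
        k = r.get(key)
        if k is not None:
            if k in seen:
                problems.append(f"row {i}: duplicate {key} {k!r}")
            seen.add(k)
    if problems:
        raise ValueError("; ".join(problems))
```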
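
### 3. Structured logs

```python
import logging
import time

# Without basicConfig, logging.info() is dropped (the root logger defaults to WARNING)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

start = time.perf_counter()
# ... transformation producing df ...
elapsed = time.perf_counter() - start
logging.info("Processed %d records in %.2fs", len(df), elapsed)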
```
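
A formatted message is readable but not really structured. If the logs are shipped somewhere that parses them, a minimal stdlib sketch that emits one JSON object per line and merges extra fields passed via `extra={"fields": {...}}`; the `fields` key is a convention of this sketch, not of the `logging` module.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Format each log record as a single JSON line."""

    def format(self, record):
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            **getattr(record, "fields", {}),  # extra fields, if the caller passed any
        }
        return json.dumps(payload, default=str)


def get_logger(name):
    """Return a logger that writes JSON lines to stderr, configured only once."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False  # avoid duplicate lines through the root logger
    return logger
```

Usage: `get_logger("transform_sales").info("processed", extra={"fields": {"records": 120, "elapsed_s": 1.84}})`.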

### 4. Idempotency

```sql
-- Use an upsert instead of a plain INSERT so reruns do not duplicate rows
INSERT ... ON CONFLICT (key_column) DO UPDATE SET column = EXCLUDED.column;
```
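
To check that a load step really is idempotent, run it twice and compare the result. A sketch with an in-memory SQLite database (which accepts the same `ON CONFLICT ... DO UPDATE` form since 3.24), loading the same daily metrics twice; the table is a reduced, hypothetical version of `daily_metrics`.

```python
import sqlite3

UPSERT = """
INSERT INTO daily_metrics (date, total_orders) VALUES (?, ?)
ON CONFLICT (date) DO UPDATE SET total_orders = excluded.total_orders
"""


def load(conn, rows):
    """Upsert rows in one transaction and return the table contents."""
    with conn:
        conn.executemany(UPSERT, rows)
    return conn.execute("SELECT date, total_orders FROM daily_metrics ORDER BY date").fetchall()
```

With a plain INSERT, the second load would fail on the primary key (or silently duplicate rows if the table had none).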
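
### 5. Cleanup

Run cleanup as an exit handler so it also runs when a step fails; a regular step that depends on a failed step is never executed.

```yaml
steps:
  # ... your steps

handlerOn:
  exit:
    # Delete only this DAG's intermediate files; a /tmp/*.parquet glob
    # would also remove files belonging to other DAGs
    command: rm -f /tmp/raw_sales.parquet /tmp/clean_sales.parquet /tmp/customer_summary.parquet
```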

---

## 🆚 Dagu vs dbt

| Feature | Dagu | dbt |
|---------|------|-----|
| SQL transforms | ✅ Yes | ✅ Yes (better) |
| Python transforms | ✅ Yes (better) | ⚠️ Limited |
| Scheduling | ✅ Built-in | ❌ External (dbt Core) |
| Lineage | ⚠️ Manual | ✅ Automatic |
| Testing | ⚠️ Manual | ✅ Built-in |
| Docs | ⚠️ Manual | ✅ Automatic |

**Recommendation**:

- Use **Dagu** for end-to-end pipelines
- Consider **dbt** if you write a lot of SQL and want automatic lineage

---

## 🎯 Summary

**Dagu CAN do transformations:**

- ✅ Python/Pandas (cleaning, aggregations)
- ✅ SQL (staging, metrics, joins)
- ✅ Incremental transformations
- ✅ Multi-table with complex joins
- ✅ Parallel (multiple transforms at once)
- ✅ ClickHouse (heavy analytics)

**You do NOT need Temporal for:**

- ❌ Simple to medium transformations
- ❌ Typical ETL (Extract → Transform → Load)
- ❌ Pipelines that finish in under 1 hour
- ❌ SQL or Pandas aggregations

**You only need Temporal if:**

- ✅ A transformation takes more than 1 hour
- ✅ You need to pause/resume
- ✅ You have a very complex state machine
- ✅ You need distributed compensations

---

**Last updated**: 2026-03-23