refactor: remove Temporal in favor of Dagu for transformations
Temporal era overkill para nuestros pipelines de datos típicos. Cambios: - Eliminado docker-compose-temporal.yml y configuración - Removido Temporal de Homer dashboard - Actualizado README y CLAUDE.md sin referencias a Temporal - Añadida documentación completa de transformaciones con Dagu Dagu es suficiente porque: - Workflows terminan en minutos, no días - Transformaciones simples/medias (Python/SQL) - No necesitamos pausar/reanudar workflows - Menor overhead y más simple de mantener Si en el futuro necesitamos workflows de larga duración o state complejo, podemos volver a levantar Temporal. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,338 @@
|
|||||||
|
# CLAUDE.md - Guía de Manipulación de Servicios
|
||||||
|
|
||||||
|
## 🎯 Propósito
|
||||||
|
|
||||||
|
Este documento describe qué servicios puedo manipular directamente, cuáles requieren MCPs, y cómo interactuar con cada uno para construir pipelines de datos.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## ✅ Servicios que PUEDO Manipular Directamente
|
||||||
|
|
||||||
|
### 1. **Dagu** (Fácil - Acceso Total)
|
||||||
|
- ✅ **Capacidad**: Crear, modificar y eliminar workflows (DAGs)
|
||||||
|
- ✅ **Ubicación**: `~/dagu/dags/*.yaml`
|
||||||
|
- ✅ **Uso**: Scheduling, lanzamiento de scripts, orchestración básica
|
||||||
|
- **Ejemplo**:
|
||||||
|
```yaml
|
||||||
|
name: ingest_data
|
||||||
|
schedule: "0 * * * *" # Cada hora
|
||||||
|
steps:
|
||||||
|
- name: fetch
|
||||||
|
command: python ~/scripts/fetch_data.py
|
||||||
|
- name: publish
|
||||||
|
command: ~/scripts/publish_to_nats.sh
|
||||||
|
depends: [fetch]
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. **NATS JetStream** (Medio - API REST/CLI)
|
||||||
|
- ✅ **Capacidad**: Publicar mensajes, crear streams, suscripciones
|
||||||
|
- ⚠️ **Limitación**: Requiero usar `nats` CLI o scripts con la API
|
||||||
|
- ✅ **Uso**: Message broker, event streaming, pub/sub
|
||||||
|
- **Acceso**:
|
||||||
|
- Puerto 4222: Cliente NATS
|
||||||
|
- Puerto 8222: HTTP Monitoring API
|
||||||
|
- **Ejemplo (vía Dagu)**:
|
||||||
|
```bash
|
||||||
|
# Publicar a NATS
|
||||||
|
nats pub data.ingested "$(cat data.json)" --server=nats://localhost:4222
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. **Bases de Datos** (Fácil - SQL Directo)
|
||||||
|
- ✅ **PostgreSQL**: Puerto 5434
|
||||||
|
```bash
|
||||||
|
psql -h localhost -p 5434 -U postgres -d postgres -c "INSERT INTO..."
|
||||||
|
```
|
||||||
|
- ✅ **ClickHouse**: Puertos 8123 (HTTP), 9000 (Native)
|
||||||
|
```bash
|
||||||
|
curl -X POST 'http://localhost:8123/' -d "INSERT INTO table VALUES..."
|
||||||
|
```
|
||||||
|
- ✅ **Marquez DB**: Puerto 5433 (para metadata)
|
||||||
|
|
||||||
|
### 4. **Marquez (OpenLineage)** (Medio - API REST)
|
||||||
|
- ✅ **Capacidad**: Enviar eventos de lineage via API
|
||||||
|
- ✅ **Uso**: Rastrear origen/destino de datos en cada paso
|
||||||
|
- **Ejemplo**:
|
||||||
|
```bash
|
||||||
|
curl -X POST http://localhost:5000/api/v1/lineage \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d @lineage_event.json
|
||||||
|
```
|
||||||
|
|
||||||
|
### 5. **Logs (Prometheus/Loki)** (Medio - Pushgateway/API)
|
||||||
|
- ✅ **Prometheus**: Exportar métricas vía Pushgateway
|
||||||
|
- ✅ **Loki**: Enviar logs vía HTTP API
|
||||||
|
- ✅ **Uso**: Monitoreo, alertas, debugging
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## ❌ Servicios que NECESITAN MCP
|
||||||
|
|
||||||
|
### 1. **Grafana** (Dashboards/Datasources)
|
||||||
|
- ❌ **Problema**: Crear dashboards complejos requiere UI o API compleja
|
||||||
|
- 🔧 **Solución**: MCP de Grafana
|
||||||
|
- Crear datasources programáticamente
|
||||||
|
- Generar dashboards desde templates
|
||||||
|
- Configurar alertas
|
||||||
|
- **Sin MCP puedo**: Usar datasources existentes manualmente
|
||||||
|
|
||||||
|
### 2. **Metabase** (Queries/Dashboards)
|
||||||
|
- ❌ **Problema**: Crear questions/dashboards es vía UI
|
||||||
|
- 🔧 **Solución**: MCP de Metabase
|
||||||
|
- Crear queries SQL desde código
|
||||||
|
- Generar dashboards automáticamente
|
||||||
|
- Configurar filtros y parámetros
|
||||||
|
- **Sin MCP puedo**: Ejecutar queries manualmente en la UI
|
||||||
|
|
||||||
|
### 3. **Rill** (Dashboards Modernos)
|
||||||
|
- ❌ **Problema**: Configuración específica de modelos y dashboards
|
||||||
|
- 🔧 **Solución**: MCP de Rill o manipular archivos YAML
|
||||||
|
- **Sin MCP puedo**: Editar archivos en `~/rill-data/` si conozco la estructura
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🏗️ Arquitectura de Datos Propuesta
|
||||||
|
|
||||||
|
### Flujo Completo (SIEMPRE con Lineage)
|
||||||
|
|
||||||
|
```
|
||||||
|
┌──────────┐
|
||||||
|
│ DAGU │ ← Scheduling (cron, manual)
|
||||||
|
│ (Native) │
|
||||||
|
└────┬─────┘
|
||||||
|
│
|
||||||
|
├─→ [PASO 1: RECOLECCIÓN]
|
||||||
|
│ ├─→ Script Python/Bash
|
||||||
|
│ ├─→ API calls, scraping, etc.
|
||||||
|
│ └─→ 📝 Log a Marquez (source: API)
|
||||||
|
│
|
||||||
|
├─→ [PASO 2: VALIDACIÓN]
|
||||||
|
│ ├─→ Schema validation
|
||||||
|
│ ├─→ Data quality checks
|
||||||
|
│ └─→ 📝 Log a Marquez (transformation)
|
||||||
|
│
|
||||||
|
├─→ [PASO 3: PUBLICACIÓN A NATS]
|
||||||
|
│ ├─→ NATS JetStream (stream: raw_data)
|
||||||
|
│ ├─→ Formato: JSON events
|
||||||
|
│ └─→ 📝 Log a Marquez (target: NATS)
|
||||||
|
│
|
||||||
|
├─→ [PASO 4: CONSUMO E INGESTA]
|
||||||
|
│ ├─→ Consumer NATS → PostgreSQL
|
||||||
|
│ ├─→ Consumer NATS → ClickHouse
|
||||||
|
│ └─→ 📝 Log a Marquez (target: DB)
|
||||||
|
│
|
||||||
|
├─→ [PASO 5: TRANSFORMACIÓN (en Dagu)]
|
||||||
|
│ ├─→ Python/Pandas o SQL
|
||||||
|
│ ├─→ Agregaciones, cálculos
|
||||||
|
│ └─→ 📝 Log a Marquez (transformation)
|
||||||
|
│
|
||||||
|
└─→ [PASO 6: LOGS & MONITORING]
|
||||||
|
├─→ Prometheus: Métricas (éxito, fallos, tiempo)
|
||||||
|
├─→ Loki: Logs estructurados
|
||||||
|
└─→ Grafana: Dashboards en tiempo real
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📋 Template de DAG con Lineage
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
name: data_pipeline_template
|
||||||
|
description: Template para pipelines con lineage completo
|
||||||
|
|
||||||
|
tags:
|
||||||
|
- data-pipeline
|
||||||
|
- lineage
|
||||||
|
- production
|
||||||
|
|
||||||
|
env:
|
||||||
|
- MARQUEZ_URL: http://localhost:5000
|
||||||
|
- NATS_URL: nats://localhost:4222
|
||||||
|
- POSTGRES_URL: postgresql://postgres:postgres@localhost:5434/postgres
|
||||||
|
|
||||||
|
schedule:
|
||||||
|
- "0 */6 * * *" # Cada 6 horas
|
||||||
|
|
||||||
|
steps:
|
||||||
|
# 1. FETCH DATA
|
||||||
|
- name: fetch_data
|
||||||
|
command: |
|
||||||
|
python ~/dagu/scripts/fetch_data.py \
|
||||||
|
--output /tmp/raw_data.json \
|
||||||
|
--log-lineage
|
||||||
|
|
||||||
|
# 2. VALIDATE
|
||||||
|
- name: validate_data
|
||||||
|
command: |
|
||||||
|
python ~/dagu/scripts/validate.py \
|
||||||
|
--input /tmp/raw_data.json \
|
||||||
|
--log-lineage
|
||||||
|
depends: [fetch_data]
|
||||||
|
|
||||||
|
# 3. PUBLISH TO NATS
|
||||||
|
- name: publish_to_nats
|
||||||
|
command: |
|
||||||
|
nats pub data.raw \
|
||||||
|
"$(cat /tmp/raw_data.json)" \
|
||||||
|
--server=$NATS_URL
|
||||||
|
|
||||||
|
# Log lineage
|
||||||
|
python ~/dagu/scripts/log_lineage.py \
|
||||||
|
--event publish \
|
||||||
|
--source /tmp/raw_data.json \
|
||||||
|
--target nats://data.raw
|
||||||
|
depends: [validate_data]
|
||||||
|
|
||||||
|
# 4. INGEST TO POSTGRES
|
||||||
|
- name: ingest_postgres
|
||||||
|
command: |
|
||||||
|
python ~/dagu/scripts/ingest_postgres.py \
|
||||||
|
--nats-stream data.raw \
|
||||||
|
--table raw_events \
|
||||||
|
--log-lineage
|
||||||
|
depends: [publish_to_nats]
|
||||||
|
|
||||||
|
# 5. SEND METRICS
|
||||||
|
- name: log_metrics
|
||||||
|
command: |
|
||||||
|
python ~/dagu/scripts/push_metrics.py \
|
||||||
|
--job data_pipeline_template \
|
||||||
|
--success true
|
||||||
|
depends: [ingest_postgres]
|
||||||
|
|
||||||
|
handlers:
|
||||||
|
failure:
|
||||||
|
- name: alert_failure
|
||||||
|
command: |
|
||||||
|
python ~/dagu/scripts/push_metrics.py \
|
||||||
|
--job data_pipeline_template \
|
||||||
|
--success false
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Scripts Helper Necesarios
|
||||||
|
|
||||||
|
### 1. `~/dagu/scripts/log_lineage.py`
|
||||||
|
```python
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
def log_openlineage_event(event_type, source, target, job_name):
|
||||||
|
"""Envía evento OpenLineage a Marquez"""
|
||||||
|
event = {
|
||||||
|
"eventType": event_type, # START, COMPLETE, FAIL
|
||||||
|
"eventTime": datetime.utcnow().isoformat() + "Z",
|
||||||
|
"producer": "dagu://pipeline",
|
||||||
|
"job": {
|
||||||
|
"namespace": "automatic-process",
|
||||||
|
"name": job_name
|
||||||
|
},
|
||||||
|
"inputs": [{"namespace": "automatic-process", "name": source}],
|
||||||
|
"outputs": [{"namespace": "automatic-process", "name": target}]
|
||||||
|
}
|
||||||
|
|
||||||
|
requests.post(
|
||||||
|
"http://localhost:5000/api/v1/lineage",
|
||||||
|
json=event
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. `~/dagu/scripts/push_metrics.py`
|
||||||
|
```python
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
|
||||||
|
|
||||||
|
def push_metrics(job_name, success):
|
||||||
|
"""Push métricas a Prometheus Pushgateway"""
|
||||||
|
registry = CollectorRegistry()
|
||||||
|
g = Gauge('job_success', 'Job success status', registry=registry)
|
||||||
|
g.set(1 if success else 0)
|
||||||
|
|
||||||
|
push_to_gateway(
|
||||||
|
'localhost:9091',
|
||||||
|
job=job_name,
|
||||||
|
registry=registry
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. `~/dagu/scripts/publish_to_nats.sh`
|
||||||
|
```bash
|
||||||
|
#!/bin/bash
|
||||||
|
# Publicar a NATS JetStream
|
||||||
|
nats pub "$1" "$(cat $2)" --server=nats://localhost:4222
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🚀 Primeros Pasos
|
||||||
|
|
||||||
|
### 1. Instalar CLIs necesarios
|
||||||
|
```bash
|
||||||
|
# NATS CLI
|
||||||
|
curl -sf https://binaries.nats.dev/nats-io/natscli/nats@latest | sh
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Crear directorio de scripts
|
||||||
|
```bash
|
||||||
|
mkdir -p ~/dagu/scripts
|
||||||
|
chmod +x ~/dagu/scripts/*.{py,sh}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Configurar variables de entorno
|
||||||
|
```bash
|
||||||
|
# Añadir a ~/.bashrc
|
||||||
|
export MARQUEZ_URL=http://localhost:5000
|
||||||
|
export NATS_URL=nats://localhost:4222
|
||||||
|
export POSTGRES_URL=postgresql://postgres:postgres@localhost:5434/postgres
|
||||||
|
export CLICKHOUSE_URL=http://localhost:8123
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📊 MCPs Recomendados (Futuro)
|
||||||
|
|
||||||
|
### Prioridad Alta
|
||||||
|
1. **Grafana MCP** - Automatizar dashboards
|
||||||
|
2. **PostgreSQL MCP** - Queries complejas y migraciones
|
||||||
|
3. **ClickHouse MCP** - Queries analíticas
|
||||||
|
|
||||||
|
### Prioridad Media
|
||||||
|
4. **Metabase MCP** - BI self-service
|
||||||
|
|
||||||
|
### Prioridad Baja
|
||||||
|
5. **Rill MCP** - Dashboards modernos
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📝 Checklist para Cada Pipeline
|
||||||
|
|
||||||
|
Cuando crees un pipeline, SIEMPRE:
|
||||||
|
|
||||||
|
- [ ] Define el schedule en Dagu
|
||||||
|
- [ ] Log inicio en Marquez (START event)
|
||||||
|
- [ ] Valida datos antes de procesar
|
||||||
|
- [ ] Publica a NATS para desacoplar
|
||||||
|
- [ ] Log cada transformación en Marquez
|
||||||
|
- [ ] Ingesta a bases de datos
|
||||||
|
- [ ] Log fin en Marquez (COMPLETE event)
|
||||||
|
- [ ] Push métricas a Prometheus
|
||||||
|
- [ ] Envía logs estructurados a Loki
|
||||||
|
- [ ] Maneja errores (FAIL event a Marquez)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🔗 URLs de Servicios
|
||||||
|
|
||||||
|
- **Dagu**: http://localhost:8090
|
||||||
|
- **NATS Monitoring**: http://localhost:8222
|
||||||
|
- **Marquez**: http://localhost:3001
|
||||||
|
- **Grafana**: http://localhost:3500
|
||||||
|
- **Prometheus**: http://localhost:9090
|
||||||
|
- **DBGate**: http://localhost:3300
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Última actualización**: 2026-03-23
|
||||||
|
**Mantenedor**: Claude (Assistant)
|
||||||
@@ -0,0 +1,523 @@
|
|||||||
|
# Automatic Process - Suite Completa de Datos
|
||||||
|
|
||||||
|
Plataforma completa de ingesta, procesamiento y visualización de datos con lineage tracking automático.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Arquitectura
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────────────────────────────────────────────────────────┐
|
||||||
|
│ DATA PIPELINE STACK │
|
||||||
|
└─────────────────────────────────────────────────────────────────┘
|
||||||
|
|
||||||
|
📅 SCHEDULING 🔄 MESSAGING 💾 STORAGE
|
||||||
|
┌──────────┐ ┌──────────┐ ┌──────────┐
|
||||||
|
│ Dagu │────────→ │ NATS │────────→ │PostgreSQL│
|
||||||
|
│ (Native) │ │JetStream │ │ClickHouse│
|
||||||
|
└────┬─────┘ └──────────┘ └──────────┘
|
||||||
|
│ │
|
||||||
|
│ ⚙️ TRANSFORMATIONS │
|
||||||
|
└────────────────────────────────────────────┘
|
||||||
|
│
|
||||||
|
↓
|
||||||
|
📊 LINEAGE 📈 VISUALIZATION
|
||||||
|
┌──────────┐ ┌──────────┐
|
||||||
|
│ Marquez │ │ Grafana │
|
||||||
|
│OpenLineage│ │ Metabase │
|
||||||
|
└──────────┘ │ Rill │
|
||||||
|
↑ └──────────┘
|
||||||
|
│
|
||||||
|
🔍 MONITORING
|
||||||
|
┌──────────┐
|
||||||
|
│Prometheus│
|
||||||
|
│ Loki │
|
||||||
|
│ Alloy │
|
||||||
|
└──────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🚀 Quick Start
|
||||||
|
|
||||||
|
### 1. Iniciar Todos los Servicios
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Core services
|
||||||
|
docker-compose up -d
|
||||||
|
|
||||||
|
# Analytics
|
||||||
|
docker-compose -f docker-compose-analytics.yml up -d
|
||||||
|
|
||||||
|
# Databases
|
||||||
|
docker-compose -f docker-compose-databases.yml up -d
|
||||||
|
|
||||||
|
# Lineage
|
||||||
|
docker-compose -f docker-compose-marquez.yml up -d
|
||||||
|
|
||||||
|
# Messaging
|
||||||
|
docker-compose -f docker-compose-nats.yml up -d
|
||||||
|
|
||||||
|
# Orchestration (Dagu ya está corriendo como systemd)
|
||||||
|
systemctl --user status dagu.service
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Acceder al Dashboard
|
||||||
|
|
||||||
|
**Homer Dashboard**: http://localhost:8080
|
||||||
|
|
||||||
|
Desde ahí puedes acceder a todos los servicios.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📦 Servicios Disponibles
|
||||||
|
|
||||||
|
### 🎨 Visualization
|
||||||
|
| Servicio | Puerto | Descripción |
|
||||||
|
|----------|--------|-------------|
|
||||||
|
| **Grafana** | 3500 | Dashboards y alertas |
|
||||||
|
| **Metabase** | 3200 | Business Intelligence |
|
||||||
|
| **Rill** | 9009 | BI Dashboard moderno |
|
||||||
|
|
||||||
|
### 📊 Monitoring
|
||||||
|
| Servicio | Puerto | Descripción |
|
||||||
|
|----------|--------|-------------|
|
||||||
|
| **Prometheus** | 9090 | Métricas y alertas |
|
||||||
|
| **Loki** | 3100 | Agregación de logs |
|
||||||
|
| **Alloy** | 12345 | Colector de telemetría |
|
||||||
|
|
||||||
|
### 🔄 Orchestration & Transformations
|
||||||
|
| Servicio | Puerto | Descripción |
|
||||||
|
|----------|--------|-------------|
|
||||||
|
| **Dagu** | 8090 | DAG Scheduler & Data Transformations (nativo WSL) |
|
||||||
|
|
||||||
|
### 📨 Messaging
|
||||||
|
| Servicio | Puerto | Descripción |
|
||||||
|
|----------|--------|-------------|
|
||||||
|
| **NATS JetStream** | 4222/8222 | Message broker |
|
||||||
|
|
||||||
|
### 💾 Databases
|
||||||
|
| Servicio | Puerto | Descripción |
|
||||||
|
|----------|--------|-------------|
|
||||||
|
| **PostgreSQL** | 5434 | Base de datos relacional |
|
||||||
|
| **ClickHouse** | 8123/9000 | Base de datos analítica |
|
||||||
|
| **DBGate** | 3300 | Database management UI |
|
||||||
|
|
||||||
|
### 🗺️ Data Lineage
|
||||||
|
| Servicio | Puerto | Descripción |
|
||||||
|
|----------|--------|-------------|
|
||||||
|
| **Marquez** | 3001/5000 | OpenLineage tracking |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🏗️ Crear un Pipeline de Datos
|
||||||
|
|
||||||
|
### Ejemplo: Ingestión desde API
|
||||||
|
|
||||||
|
#### 1. Crear el script de recolección
|
||||||
|
|
||||||
|
```python
|
||||||
|
# ~/dagu/scripts/fetch_api_data.py
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
def fetch_data():
|
||||||
|
response = requests.get('https://api.example.com/data')
|
||||||
|
data = response.json()
|
||||||
|
|
||||||
|
# Guardar temporalmente
|
||||||
|
with open('/tmp/api_data.json', 'w') as f:
|
||||||
|
json.dump(data, f)
|
||||||
|
|
||||||
|
# Log a Marquez
|
||||||
|
log_lineage('START', 'api.example.com', '/tmp/api_data.json')
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
fetch_data()
|
||||||
|
```
|
||||||
|
|
||||||
|
#### 2. Crear el DAG en Dagu
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# ~/dagu/dags/api_ingestion.yaml
|
||||||
|
name: api_ingestion
|
||||||
|
description: Ingesta datos desde API cada hora
|
||||||
|
|
||||||
|
schedule:
|
||||||
|
- "0 * * * *" # Cada hora
|
||||||
|
|
||||||
|
env:
|
||||||
|
- NATS_URL: nats://localhost:4222
|
||||||
|
- POSTGRES_URL: postgresql://postgres:postgres@localhost:5434/postgres
|
||||||
|
|
||||||
|
steps:
|
||||||
|
# 1. Fetch data from API
|
||||||
|
- name: fetch
|
||||||
|
command: python ~/dagu/scripts/fetch_api_data.py
|
||||||
|
|
||||||
|
# 2. Validate data
|
||||||
|
- name: validate
|
||||||
|
command: |
|
||||||
|
python ~/dagu/scripts/validate_schema.py \
|
||||||
|
--input /tmp/api_data.json
|
||||||
|
depends: [fetch]
|
||||||
|
|
||||||
|
# 3. Publish to NATS
|
||||||
|
- name: publish_nats
|
||||||
|
command: |
|
||||||
|
nats pub data.api.raw \
|
||||||
|
"$(cat /tmp/api_data.json)" \
|
||||||
|
--server=$NATS_URL
|
||||||
|
depends: [validate]
|
||||||
|
|
||||||
|
# 4. Consume and ingest to PostgreSQL
|
||||||
|
- name: ingest_postgres
|
||||||
|
command: |
|
||||||
|
python ~/dagu/scripts/nats_to_postgres.py \
|
||||||
|
--stream data.api.raw \
|
||||||
|
--table api_events
|
||||||
|
depends: [publish_nats]
|
||||||
|
|
||||||
|
# 5. Push metrics
|
||||||
|
- name: metrics
|
||||||
|
command: |
|
||||||
|
python ~/dagu/scripts/push_metrics.py \
|
||||||
|
--job api_ingestion \
|
||||||
|
--success true
|
||||||
|
depends: [ingest_postgres]
|
||||||
|
|
||||||
|
handlers:
|
||||||
|
failure:
|
||||||
|
- name: alert
|
||||||
|
command: |
|
||||||
|
echo "Pipeline failed!" | \
|
||||||
|
curl -X POST http://localhost:9093/api/v1/alerts
|
||||||
|
```
|
||||||
|
|
||||||
|
#### 3. Monitorear en Grafana
|
||||||
|
|
||||||
|
1. Ir a http://localhost:3500
|
||||||
|
2. Crear dashboard con:
|
||||||
|
- Query a Prometheus: `job_success{job="api_ingestion"}`
|
||||||
|
- Logs de Loki: `{job="dagu"} |= "api_ingestion"`
|
||||||
|
|
||||||
|
#### 4. Verificar Lineage en Marquez
|
||||||
|
|
||||||
|
1. Ir a http://localhost:3001
|
||||||
|
2. Buscar job: `api_ingestion`
|
||||||
|
3. Ver el grafo completo de datos:
|
||||||
|
```
|
||||||
|
api.example.com → /tmp/api_data.json → NATS → PostgreSQL
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📝 Scripts Helper Incluidos
|
||||||
|
|
||||||
|
### `~/dagu/scripts/log_lineage.py`
|
||||||
|
Envía eventos OpenLineage a Marquez
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python ~/dagu/scripts/log_lineage.py \
|
||||||
|
--event START \
|
||||||
|
--source api.example.com \
|
||||||
|
--target /tmp/data.json \
|
||||||
|
--job my_pipeline
|
||||||
|
```
|
||||||
|
|
||||||
|
### `~/dagu/scripts/push_metrics.py`
|
||||||
|
Publica métricas a Prometheus
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python ~/dagu/scripts/push_metrics.py \
|
||||||
|
--job my_pipeline \
|
||||||
|
--success true \
|
||||||
|
--duration 45
|
||||||
|
```
|
||||||
|
|
||||||
|
### `~/dagu/scripts/publish_to_nats.sh`
|
||||||
|
Publica mensajes a NATS JetStream
|
||||||
|
|
||||||
|
```bash
|
||||||
|
./~/dagu/scripts/publish_to_nats.sh data.stream data.json
|
||||||
|
```
|
||||||
|
|
||||||
|
### `~/dagu/scripts/nats_to_postgres.py`
|
||||||
|
Consume de NATS e ingesta a PostgreSQL
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python ~/dagu/scripts/nats_to_postgres.py \
|
||||||
|
--stream data.raw \
|
||||||
|
--table events \
|
||||||
|
--batch-size 100
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Casos de Uso
|
||||||
|
|
||||||
|
### 1. ETL desde API a Warehouse
|
||||||
|
```
|
||||||
|
API → Dagu (fetch) → NATS → PostgreSQL → Grafana
|
||||||
|
↓
|
||||||
|
Marquez (lineage tracking)
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Stream Processing en Tiempo Real
|
||||||
|
```
|
||||||
|
IoT Devices → NATS → Dagu (transform) → ClickHouse → Rill
|
||||||
|
↓
|
||||||
|
Marquez
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Reporting Diario
|
||||||
|
```
|
||||||
|
Dagu (schedule) → PostgreSQL (query) → Metabase (dashboard) → Email
|
||||||
|
↓
|
||||||
|
Marquez
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🔧 Configuración
|
||||||
|
|
||||||
|
### NATS JetStream
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Crear stream
|
||||||
|
nats stream add DATA_STREAM \
|
||||||
|
--subjects "data.*" \
|
||||||
|
--storage file \
|
||||||
|
--retention limits \
|
||||||
|
--max-age 7d
|
||||||
|
|
||||||
|
# Ver estado
|
||||||
|
nats stream ls
|
||||||
|
nats stream info DATA_STREAM
|
||||||
|
```
|
||||||
|
|
||||||
|
### PostgreSQL
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Conectar
|
||||||
|
psql -h localhost -p 5434 -U postgres -d postgres
|
||||||
|
|
||||||
|
# Crear tabla
|
||||||
|
CREATE TABLE events (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
timestamp TIMESTAMPTZ DEFAULT NOW(),
|
||||||
|
source VARCHAR(255),
|
||||||
|
data JSONB,
|
||||||
|
lineage_job VARCHAR(255)
|
||||||
|
);
|
||||||
|
```
|
||||||
|
|
||||||
|
### ClickHouse
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Conectar
|
||||||
|
clickhouse-client --host localhost --port 9000
|
||||||
|
|
||||||
|
# Crear tabla
|
||||||
|
CREATE TABLE events (
|
||||||
|
timestamp DateTime,
|
||||||
|
source String,
|
||||||
|
data String,
|
||||||
|
lineage_job String
|
||||||
|
) ENGINE = MergeTree()
|
||||||
|
ORDER BY timestamp;
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📊 Monitoring
|
||||||
|
|
||||||
|
### Ver Métricas en Prometheus
|
||||||
|
```
|
||||||
|
http://localhost:9090
|
||||||
|
|
||||||
|
Queries útiles:
|
||||||
|
- job_success{job="*"}
|
||||||
|
- job_duration_seconds{job="*"}
|
||||||
|
- rate(job_executions_total[5m])
|
||||||
|
```
|
||||||
|
|
||||||
|
### Ver Logs en Grafana
|
||||||
|
```
|
||||||
|
http://localhost:3500 → Explore → Loki
|
||||||
|
|
||||||
|
Queries útiles:
|
||||||
|
- {job="dagu"}
|
||||||
|
- {job="dagu"} |= "error"
|
||||||
|
- {job="dagu"} |= "api_ingestion"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Ver Lineage en Marquez
|
||||||
|
```
|
||||||
|
http://localhost:3001
|
||||||
|
|
||||||
|
Buscar:
|
||||||
|
- Jobs: api_ingestion, data_transform
|
||||||
|
- Datasets: /tmp/api_data.json, postgres://events
|
||||||
|
- Runs: últimas ejecuciones
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🚨 Troubleshooting
|
||||||
|
|
||||||
|
### Dagu no responde
|
||||||
|
```bash
|
||||||
|
# Ver logs
|
||||||
|
journalctl --user -u dagu.service -f
|
||||||
|
|
||||||
|
# Reiniciar
|
||||||
|
systemctl --user restart dagu.service
|
||||||
|
```
|
||||||
|
|
||||||
|
### NATS no conecta
|
||||||
|
```bash
|
||||||
|
# Ver estado
|
||||||
|
docker logs nats
|
||||||
|
|
||||||
|
# Verificar puerto
|
||||||
|
nats server ping nats://localhost:4222
|
||||||
|
```
|
||||||
|
|
||||||
|
### Base de datos no accesible
|
||||||
|
```bash
|
||||||
|
# PostgreSQL
|
||||||
|
docker logs postgres-main
|
||||||
|
|
||||||
|
# ClickHouse
|
||||||
|
docker logs clickhouse
|
||||||
|
```
|
||||||
|
|
||||||
|
### Marquez no registra eventos
|
||||||
|
```bash
|
||||||
|
# Ver logs
|
||||||
|
docker logs marquez
|
||||||
|
|
||||||
|
# Probar API manualmente
|
||||||
|
curl http://localhost:5000/api/v1/namespaces
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📚 Documentación Adicional
|
||||||
|
|
||||||
|
- **CLAUDE.md**: Guía técnica de manipulación de servicios
|
||||||
|
- **TRANSFORMATIONS.md**: Guía completa de transformaciones con Dagu
|
||||||
|
- **~/dagu/README.md**: Documentación específica de Dagu
|
||||||
|
- **Dagu Docs**: https://dagu.sh/
|
||||||
|
- **OpenLineage Spec**: https://openlineage.io/
|
||||||
|
- **NATS Docs**: https://docs.nats.io/
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🤝 Contribuir
|
||||||
|
|
||||||
|
### Añadir un nuevo pipeline
|
||||||
|
|
||||||
|
1. Crear script en `~/dagu/scripts/`
|
||||||
|
2. Crear DAG en `~/dagu/dags/`
|
||||||
|
3. Añadir lineage tracking
|
||||||
|
4. Crear dashboard en Grafana
|
||||||
|
5. Documentar en este README
|
||||||
|
|
||||||
|
### Añadir un nuevo servicio
|
||||||
|
|
||||||
|
1. Crear `docker-compose-<servicio>.yml`
|
||||||
|
2. Añadir a Homer en `homer/assets/config.yml`
|
||||||
|
3. Documentar puertos y configuración
|
||||||
|
4. Actualizar CLAUDE.md si necesita manipulación especial
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📊 Arquitectura Detallada
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────┐
|
||||||
|
│ DAGU │
|
||||||
|
│ Scheduler │
|
||||||
|
└──────┬──────┘
|
||||||
|
│
|
||||||
|
┌──────────────┼──────────────┐
|
||||||
|
│ │ │
|
||||||
|
▼ ▼ ▼
|
||||||
|
┌───────────┐ ┌───────────┐ ┌───────────┐
|
||||||
|
│ Script │ │ Script │ │ Script │
|
||||||
|
│ Fetch │ │ Transform │ │ Export │
|
||||||
|
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
|
||||||
|
│ │ │
|
||||||
|
└──────────────┼──────────────┘
|
||||||
|
│
|
||||||
|
┌────▼────┐
|
||||||
|
│ NATS │
|
||||||
|
│JetStream│
|
||||||
|
└────┬────┘
|
||||||
|
│
|
||||||
|
┌──────────────┼──────────────┐
|
||||||
|
│ │ │
|
||||||
|
▼ ▼ ▼
|
||||||
|
┌──────────┐ ┌──────────┐ ┌──────────┐
|
||||||
|
│PostgreSQL│ │ClickHouse│ │ Dagu │
|
||||||
|
│ │ │ │ │Transform │
|
||||||
|
└────┬─────┘ └────┬─────┘ └────┬─────┘
|
||||||
|
│ │ │
|
||||||
|
└─────────────┼─────────────┘
|
||||||
|
│
|
||||||
|
┌────────────┼────────────┐
|
||||||
|
│ │ │
|
||||||
|
▼ ▼ ▼
|
||||||
|
┌─────────┐ ┌─────────┐ ┌─────────┐
|
||||||
|
│ Grafana │ │Metabase │ │ Rill │
|
||||||
|
└─────────┘ └─────────┘ └─────────┘
|
||||||
|
|
||||||
|
Lineage Tracking (Marquez)
|
||||||
|
┌────────────────────────────────┐
|
||||||
|
│ API → NATS → DB → Visualization│
|
||||||
|
└────────────────────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🔐 Credenciales
|
||||||
|
|
||||||
|
| Servicio | Usuario | Password | Puerto |
|
||||||
|
|----------|---------|----------|--------|
|
||||||
|
| PostgreSQL | postgres | postgres | 5434 |
|
||||||
|
| ClickHouse | default | clickhouse | 8123 |
|
||||||
|
| Marquez DB | marquez | marquez | 5433 |
|
||||||
|
| Metabase DB | metabase | metabase | (interno) |
|
||||||
|
| NATS | nats | nats123 | 4222 |
|
||||||
|
|
||||||
|
**⚠️ IMPORTANTE**: Cambiar passwords en producción
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📈 Roadmap
|
||||||
|
|
||||||
|
- [ ] Añadir dbt para transformaciones SQL
|
||||||
|
- [ ] Integrar Airflow como alternativa a Dagu
|
||||||
|
- [ ] Añadir Kafka como alternativa a NATS
|
||||||
|
- [ ] Implementar data quality con Great Expectations
|
||||||
|
- [ ] Dashboard unificado de lineage + monitoring
|
||||||
|
- [ ] CI/CD para pipelines de datos
|
||||||
|
- [ ] Disaster recovery y backups automáticos
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Última actualización**: 2026-03-23
|
||||||
|
**Versión**: 1.0.0
|
||||||
|
**Mantenedor**: Lucas (@egutierrez)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📞 Soporte
|
||||||
|
|
||||||
|
Para issues y preguntas:
|
||||||
|
- Gitea: https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/automatic-process
|
||||||
|
- Claude Assistant: Disponible 24/7 para gestión de pipelines
|
||||||
@@ -0,0 +1,582 @@
|
|||||||
|
# Transformaciones con Dagu
|
||||||
|
|
||||||
|
Guía completa de cómo hacer transformaciones de datos con Dagu.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## ✅ Dagu PUEDE hacer transformaciones
|
||||||
|
|
||||||
|
Dagu ejecuta **cualquier script o comando**, por lo que puede hacer:
|
||||||
|
- ✅ Transformaciones SQL
|
||||||
|
- ✅ Transformaciones Python/Pandas
|
||||||
|
- ✅ Agregaciones y cálculos
|
||||||
|
- ✅ Limpieza de datos
|
||||||
|
- ✅ Enriquecimiento de datos
|
||||||
|
- ✅ Joins complejos
|
||||||
|
- ✅ Transformaciones en streaming
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Patrón 1: Transformaciones Python/Pandas
|
||||||
|
|
||||||
|
### Ejemplo: Limpieza y agregación
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# ~/dagu/dags/transform_sales.yaml
|
||||||
|
name: transform_sales_data
|
||||||
|
schedule: "0 2 * * *" # Cada día a las 2 AM
|
||||||
|
|
||||||
|
steps:
|
||||||
|
# 1. Extract desde PostgreSQL
|
||||||
|
- name: extract
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
|
engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')
|
||||||
|
df = pd.read_sql('SELECT * FROM raw_sales WHERE date = CURRENT_DATE', engine)
|
||||||
|
df.to_parquet('/tmp/raw_sales.parquet')
|
||||||
|
EOF
|
||||||
|
|
||||||
|
# 2. Transform - Limpieza
|
||||||
|
- name: clean
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
df = pd.read_parquet('/tmp/raw_sales.parquet')
|
||||||
|
|
||||||
|
# Limpiar datos
|
||||||
|
df = df.dropna(subset=['customer_id', 'amount'])
|
||||||
|
df['amount'] = df['amount'].astype(float)
|
||||||
|
df['date'] = pd.to_datetime(df['date'])
|
||||||
|
|
||||||
|
# Remover duplicados
|
||||||
|
df = df.drop_duplicates(subset=['transaction_id'])
|
||||||
|
|
||||||
|
df.to_parquet('/tmp/clean_sales.parquet')
|
||||||
|
print(f"Cleaned {len(df)} records")
|
||||||
|
EOF
|
||||||
|
depends: [extract]
|
||||||
|
|
||||||
|
# 3. Transform - Agregaciones
|
||||||
|
- name: aggregate
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
df = pd.read_parquet('/tmp/clean_sales.parquet')
|
||||||
|
|
||||||
|
# Agregación por cliente
|
||||||
|
customer_summary = df.groupby('customer_id').agg({
|
||||||
|
'amount': ['sum', 'mean', 'count'],
|
||||||
|
'date': 'max'
|
||||||
|
}).reset_index()
|
||||||
|
|
||||||
|
customer_summary.columns = ['customer_id', 'total_spent', 'avg_spent', 'num_purchases', 'last_purchase']
|
||||||
|
|
||||||
|
customer_summary.to_parquet('/tmp/customer_summary.parquet')
|
||||||
|
print(f"Aggregated {len(customer_summary)} customers")
|
||||||
|
EOF
|
||||||
|
depends: [clean]
|
||||||
|
|
||||||
|
# 4. Load a PostgreSQL
|
||||||
|
- name: load
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
|
df = pd.read_parquet('/tmp/customer_summary.parquet')
|
||||||
|
engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')
|
||||||
|
|
||||||
|
df.to_sql('customer_summary', engine, if_exists='replace', index=False)
|
||||||
|
print(f"Loaded {len(df)} records to customer_summary table")
|
||||||
|
EOF
|
||||||
|
depends: [aggregate]
|
||||||
|
|
||||||
|
# 5. Log lineage
|
||||||
|
- name: lineage
|
||||||
|
command: |
|
||||||
|
python ~/dagu/scripts/log_lineage.py \
|
||||||
|
--event COMPLETE \
|
||||||
|
--source postgres://raw_sales \
|
||||||
|
--target postgres://customer_summary \
|
||||||
|
--job transform_sales_data
|
||||||
|
depends: [load]
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Patrón 2: Transformaciones SQL (dbt-style)
|
||||||
|
|
||||||
|
### Ejemplo: Transformación incremental
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# ~/dagu/dags/transform_orders.yaml
|
||||||
|
name: transform_orders
|
||||||
|
schedule: "*/15 * * * *" # Cada 15 minutos
|
||||||
|
|
||||||
|
env:
|
||||||
|
- DB_URL: postgresql://postgres:postgres@localhost:5434/postgres
|
||||||
|
|
||||||
|
steps:
|
||||||
|
# 1. Staging - Raw to Clean
|
||||||
|
- name: stage_orders
|
||||||
|
command: |
|
||||||
|
psql $DB_URL <<SQL
|
||||||
|
-- Crear tabla staging si no existe
|
||||||
|
CREATE TABLE IF NOT EXISTS stg_orders (
|
||||||
|
order_id BIGINT PRIMARY KEY,
|
||||||
|
customer_id BIGINT,
|
||||||
|
amount DECIMAL(10,2),
|
||||||
|
status VARCHAR(50),
|
||||||
|
created_at TIMESTAMPTZ,
|
||||||
|
processed_at TIMESTAMPTZ DEFAULT NOW()
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Insert incremental
|
||||||
|
INSERT INTO stg_orders (order_id, customer_id, amount, status, created_at)
|
||||||
|
SELECT
|
||||||
|
order_id,
|
||||||
|
customer_id,
|
||||||
|
amount::DECIMAL(10,2),
|
||||||
|
LOWER(TRIM(status)) as status,
|
||||||
|
created_at
|
||||||
|
FROM raw_orders
|
||||||
|
WHERE created_at > (SELECT COALESCE(MAX(created_at), '1970-01-01') FROM stg_orders)
|
||||||
|
ON CONFLICT (order_id) DO UPDATE SET
|
||||||
|
amount = EXCLUDED.amount,
|
||||||
|
status = EXCLUDED.status,
|
||||||
|
processed_at = NOW();
|
||||||
|
SQL
|
||||||
|
|
||||||
|
# 2. Transform - Calcular métricas
|
||||||
|
- name: calc_metrics
|
||||||
|
command: |
|
||||||
|
psql $DB_URL <<SQL
|
||||||
|
-- Tabla de métricas diarias
|
||||||
|
CREATE TABLE IF NOT EXISTS daily_metrics (
|
||||||
|
date DATE PRIMARY KEY,
|
||||||
|
total_orders INT,
|
||||||
|
total_revenue DECIMAL(12,2),
|
||||||
|
avg_order_value DECIMAL(10,2),
|
||||||
|
completed_orders INT,
|
||||||
|
cancelled_orders INT,
|
||||||
|
updated_at TIMESTAMPTZ DEFAULT NOW()
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Upsert métricas
|
||||||
|
INSERT INTO daily_metrics (date, total_orders, total_revenue, avg_order_value, completed_orders, cancelled_orders)
|
||||||
|
SELECT
|
||||||
|
DATE(created_at) as date,
|
||||||
|
COUNT(*) as total_orders,
|
||||||
|
SUM(amount) as total_revenue,
|
||||||
|
AVG(amount) as avg_order_value,
|
||||||
|
COUNT(*) FILTER (WHERE status = 'completed') as completed_orders,
|
||||||
|
COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled_orders
|
||||||
|
FROM stg_orders
|
||||||
|
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
|
||||||
|
GROUP BY DATE(created_at)
|
||||||
|
ON CONFLICT (date) DO UPDATE SET
|
||||||
|
total_orders = EXCLUDED.total_orders,
|
||||||
|
total_revenue = EXCLUDED.total_revenue,
|
||||||
|
avg_order_value = EXCLUDED.avg_order_value,
|
||||||
|
completed_orders = EXCLUDED.completed_orders,
|
||||||
|
cancelled_orders = EXCLUDED.cancelled_orders,
|
||||||
|
updated_at = NOW();
|
||||||
|
SQL
|
||||||
|
depends: [stage_orders]
|
||||||
|
|
||||||
|
# 3. Transform - Snapshot histórico
|
||||||
|
- name: snapshot
|
||||||
|
command: |
|
||||||
|
psql $DB_URL <<SQL
|
||||||
|
-- Tabla de snapshots
|
||||||
|
CREATE TABLE IF NOT EXISTS order_snapshots (
|
||||||
|
snapshot_id SERIAL PRIMARY KEY,
|
||||||
|
order_id BIGINT,
|
||||||
|
status VARCHAR(50),
|
||||||
|
amount DECIMAL(10,2),
|
||||||
|
snapshot_at TIMESTAMPTZ DEFAULT NOW()
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Insertar snapshot de órdenes en progreso
|
||||||
|
INSERT INTO order_snapshots (order_id, status, amount)
|
||||||
|
SELECT order_id, status, amount
|
||||||
|
FROM stg_orders
|
||||||
|
WHERE status IN ('pending', 'processing');
|
||||||
|
SQL
|
||||||
|
depends: [calc_metrics]
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Patrón 3: Transformación Multi-Tabla con Joins
|
||||||
|
|
||||||
|
### Ejemplo: Enriquecer datos con múltiples fuentes
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# ~/dagu/dags/enrich_customer_data.yaml
|
||||||
|
name: enrich_customer_data
|
||||||
|
schedule: "0 3 * * *"
|
||||||
|
|
||||||
|
steps:
|
||||||
|
# 1. Extract y combinar múltiples fuentes
|
||||||
|
- name: merge_sources
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
|
engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')
|
||||||
|
|
||||||
|
# Cargar múltiples tablas
|
||||||
|
customers = pd.read_sql('SELECT * FROM customers', engine)
|
||||||
|
orders = pd.read_sql('SELECT * FROM orders WHERE created_at >= CURRENT_DATE - 30', engine)
|
||||||
|
reviews = pd.read_sql('SELECT * FROM reviews', engine)
|
||||||
|
|
||||||
|
# Agregaciones de órdenes
|
||||||
|
order_stats = orders.groupby('customer_id').agg({
|
||||||
|
'order_id': 'count',
|
||||||
|
'amount': ['sum', 'mean'],
|
||||||
|
'created_at': 'max'
|
||||||
|
}).reset_index()
|
||||||
|
order_stats.columns = ['customer_id', 'total_orders', 'total_spent', 'avg_order', 'last_order']
|
||||||
|
|
||||||
|
# Agregaciones de reviews
|
||||||
|
review_stats = reviews.groupby('customer_id').agg({
|
||||||
|
'rating': 'mean',
|
||||||
|
'review_id': 'count'
|
||||||
|
}).reset_index()
|
||||||
|
review_stats.columns = ['customer_id', 'avg_rating', 'total_reviews']
|
||||||
|
|
||||||
|
# Merge todo
|
||||||
|
enriched = customers.merge(order_stats, on='customer_id', how='left')
|
||||||
|
enriched = enriched.merge(review_stats, on='customer_id', how='left')
|
||||||
|
|
||||||
|
# Calcular segmento
|
||||||
|
enriched['segment'] = enriched.apply(lambda x:
|
||||||
|
'VIP' if x['total_spent'] > 1000 else
|
||||||
|
'Regular' if x['total_spent'] > 100 else
|
||||||
|
'New', axis=1
|
||||||
|
)
|
||||||
|
|
||||||
|
enriched.to_parquet('/tmp/enriched_customers.parquet')
|
||||||
|
EOF
|
||||||
|
|
||||||
|
# 2. Load enriquecido
|
||||||
|
- name: load_enriched
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
|
df = pd.read_parquet('/tmp/enriched_customers.parquet')
|
||||||
|
engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')
|
||||||
|
|
||||||
|
df.to_sql('enriched_customers', engine, if_exists='replace', index=False)
|
||||||
|
EOF
|
||||||
|
depends: [merge_sources]
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Patrón 4: Transformación Incremental (Solo cambios)
|
||||||
|
|
||||||
|
### Ejemplo: CDC (Change Data Capture) simplificado
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# ~/dagu/dags/incremental_transform.yaml
|
||||||
|
name: incremental_transform
|
||||||
|
schedule: "*/5 * * * *" # Cada 5 minutos
|
||||||
|
|
||||||
|
steps:
|
||||||
|
# 1. Identificar cambios
|
||||||
|
- name: detect_changes
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
|
engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')
|
||||||
|
|
||||||
|
# Última marca de agua
|
||||||
|
last_sync = pd.read_sql(
|
||||||
|
"SELECT MAX(updated_at) as last_sync FROM transformed_data",
|
||||||
|
engine
|
||||||
|
).iloc[0]['last_sync']
|
||||||
|
|
||||||
|
# Solo registros nuevos/modificados
|
||||||
|
new_data = pd.read_sql(f"""
|
||||||
|
SELECT * FROM raw_data
|
||||||
|
WHERE updated_at > '{last_sync}'
|
||||||
|
""", engine)
|
||||||
|
|
||||||
|
if len(new_data) > 0:
|
||||||
|
new_data.to_parquet('/tmp/new_data.parquet')
|
||||||
|
print(f"Found {len(new_data)} new/changed records")
|
||||||
|
else:
|
||||||
|
print("No changes detected")
|
||||||
|
exit(0)
|
||||||
|
EOF
|
||||||
|
|
||||||
|
# 2. Transformar solo cambios
|
||||||
|
- name: transform_changes
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
if not os.path.exists('/tmp/new_data.parquet'):
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
df = pd.read_parquet('/tmp/new_data.parquet')
|
||||||
|
|
||||||
|
# Aplicar transformaciones
|
||||||
|
df['normalized_value'] = df['value'] / df['value'].max()
|
||||||
|
df['category'] = df['type'].map({
|
||||||
|
'A': 'Category 1',
|
||||||
|
'B': 'Category 2',
|
||||||
|
'C': 'Category 3'
|
||||||
|
})
|
||||||
|
|
||||||
|
df.to_parquet('/tmp/transformed_changes.parquet')
|
||||||
|
EOF
|
||||||
|
depends: [detect_changes]
|
||||||
|
|
||||||
|
# 3. Upsert cambios
|
||||||
|
- name: upsert_changes
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
|
if not os.path.exists('/tmp/transformed_changes.parquet'):
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
df = pd.read_parquet('/tmp/transformed_changes.parquet')
|
||||||
|
engine = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')
|
||||||
|
|
||||||
|
# Usar ON CONFLICT para upsert
|
||||||
|
for _, row in df.iterrows():
|
||||||
|
engine.execute(f"""
|
||||||
|
INSERT INTO transformed_data (id, value, category, updated_at)
|
||||||
|
VALUES ({row['id']}, {row['normalized_value']}, '{row['category']}', NOW())
|
||||||
|
ON CONFLICT (id) DO UPDATE SET
|
||||||
|
value = EXCLUDED.value,
|
||||||
|
category = EXCLUDED.category,
|
||||||
|
updated_at = NOW()
|
||||||
|
""")
|
||||||
|
|
||||||
|
print(f"Upserted {len(df)} records")
|
||||||
|
EOF
|
||||||
|
depends: [transform_changes]
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Patrón 5: Transformación con ClickHouse (Analítica)
|
||||||
|
|
||||||
|
### Ejemplo: Agregaciones pesadas
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# ~/dagu/dags/analytics_clickhouse.yaml
|
||||||
|
name: analytics_transform
|
||||||
|
schedule: "0 4 * * *"
|
||||||
|
|
||||||
|
steps:
|
||||||
|
# 1. Transformar y cargar a ClickHouse
|
||||||
|
- name: load_to_clickhouse
|
||||||
|
command: |
|
||||||
|
python <<EOF
|
||||||
|
import pandas as pd
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
from clickhouse_driver import Client
|
||||||
|
|
||||||
|
# Extract de PostgreSQL
|
||||||
|
pg = create_engine('postgresql://postgres:postgres@localhost:5434/postgres')
|
||||||
|
df = pd.read_sql('SELECT * FROM events WHERE date = CURRENT_DATE', pg)
|
||||||
|
|
||||||
|
# Transform
|
||||||
|
df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
|
||||||
|
df['day_of_week'] = pd.to_datetime(df['timestamp']).dt.dayofweek
|
||||||
|
|
||||||
|
# Load a ClickHouse
|
||||||
|
ch = Client('localhost', port=9000)
|
||||||
|
|
||||||
|
ch.execute('''
|
||||||
|
CREATE TABLE IF NOT EXISTS events_analytics (
|
||||||
|
event_id UInt64,
|
||||||
|
user_id UInt64,
|
||||||
|
event_type String,
|
||||||
|
timestamp DateTime,
|
||||||
|
hour UInt8,
|
||||||
|
day_of_week UInt8,
|
||||||
|
value Float64
|
||||||
|
) ENGINE = MergeTree()
|
||||||
|
ORDER BY (event_type, timestamp)
|
||||||
|
''')
|
||||||
|
|
||||||
|
# Insert
|
||||||
|
ch.execute(
|
||||||
|
'INSERT INTO events_analytics VALUES',
|
||||||
|
df.to_dict('records')
|
||||||
|
)
|
||||||
|
EOF
|
||||||
|
|
||||||
|
# 2. Agregación en ClickHouse (super rápido)
|
||||||
|
- name: aggregate
|
||||||
|
command: |
|
||||||
|
clickhouse-client --query "
|
||||||
|
CREATE TABLE IF NOT EXISTS hourly_stats
|
||||||
|
ENGINE = MergeTree()
|
||||||
|
ORDER BY (event_type, hour)
|
||||||
|
AS SELECT
|
||||||
|
event_type,
|
||||||
|
hour,
|
||||||
|
day_of_week,
|
||||||
|
COUNT(*) as event_count,
|
||||||
|
AVG(value) as avg_value,
|
||||||
|
SUM(value) as total_value
|
||||||
|
FROM events_analytics
|
||||||
|
WHERE timestamp >= today()
|
||||||
|
GROUP BY event_type, hour, day_of_week
|
||||||
|
"
|
||||||
|
depends: [load_to_clickhouse]
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Patrón 6: Transformación con Dependencias Complejas
|
||||||
|
|
||||||
|
### Ejemplo: DAG con múltiples transformaciones en paralelo
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# ~/dagu/dags/complex_transform.yaml
|
||||||
|
name: complex_multi_transform
|
||||||
|
schedule: "0 1 * * *"
|
||||||
|
|
||||||
|
steps:
|
||||||
|
# Paso inicial - Extracción
|
||||||
|
- name: extract
|
||||||
|
command: python ~/dagu/scripts/extract_data.py
|
||||||
|
|
||||||
|
# Transformaciones en paralelo
|
||||||
|
- name: transform_customers
|
||||||
|
command: python ~/dagu/scripts/transform_customers.py
|
||||||
|
depends: [extract]
|
||||||
|
|
||||||
|
- name: transform_products
|
||||||
|
command: python ~/dagu/scripts/transform_products.py
|
||||||
|
depends: [extract]
|
||||||
|
|
||||||
|
- name: transform_orders
|
||||||
|
command: python ~/dagu/scripts/transform_orders.py
|
||||||
|
depends: [extract]
|
||||||
|
|
||||||
|
# Join todo
|
||||||
|
- name: join_all
|
||||||
|
command: python ~/dagu/scripts/join_datasets.py
|
||||||
|
depends: [transform_customers, transform_products, transform_orders]
|
||||||
|
|
||||||
|
# Calcular métricas finales
|
||||||
|
- name: calc_metrics
|
||||||
|
command: python ~/dagu/scripts/calculate_metrics.py
|
||||||
|
depends: [join_all]
|
||||||
|
|
||||||
|
# Cargar a destinos
|
||||||
|
- name: load_postgres
|
||||||
|
command: python ~/dagu/scripts/load_postgres.py
|
||||||
|
depends: [calc_metrics]
|
||||||
|
|
||||||
|
- name: load_clickhouse
|
||||||
|
command: python ~/dagu/scripts/load_clickhouse.py
|
||||||
|
depends: [calc_metrics]
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 💡 Buenas Prácticas
|
||||||
|
|
||||||
|
### 1. Usa archivos intermedios
|
||||||
|
```bash
|
||||||
|
/tmp/raw_data.parquet
|
||||||
|
/tmp/clean_data.parquet
|
||||||
|
/tmp/transformed_data.parquet
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Validaciones entre pasos
|
||||||
|
```python
|
||||||
|
# Validar antes de continuar
|
||||||
|
assert len(df) > 0, "No data to process"
|
||||||
|
assert df['amount'].sum() > 0, "Invalid amounts"
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Logs estructurados
|
||||||
|
```python
|
||||||
|
import logging
|
||||||
|
logging.info(f"Processed {len(df)} records in {elapsed:.2f}s")
|
||||||
|
```
|
||||||
|
|
||||||
|
### 4. Idempotencia
|
||||||
|
```sql
|
||||||
|
-- Usar UPSERT en lugar de INSERT
|
||||||
|
INSERT ... ON CONFLICT DO UPDATE
|
||||||
|
```
|
||||||
|
|
||||||
|
### 5. Cleanup
|
||||||
|
```yaml
|
||||||
|
steps:
|
||||||
|
# ... tus pasos
|
||||||
|
|
||||||
|
- name: cleanup
|
||||||
|
command: rm -f /tmp/*.parquet
|
||||||
|
continueOn:
|
||||||
|
failure: true
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🆚 Dagu vs dbt
|
||||||
|
|
||||||
|
| Feature | Dagu | dbt |
|
||||||
|
|---------|------|-----|
|
||||||
|
| SQL transforms | ✅ Sí | ✅ Sí (mejor) |
|
||||||
|
| Python transforms | ✅ Sí (mejor) | ⚠️ Limitado |
|
||||||
|
| Scheduling | ✅ Built-in | ❌ Externo |
|
||||||
|
| Lineage | ⚠️ Manual | ✅ Automático |
|
||||||
|
| Testing | ⚠️ Manual | ✅ Built-in |
|
||||||
|
| Docs | ⚠️ Manual | ✅ Automático |
|
||||||
|
|
||||||
|
**Recomendación**:
|
||||||
|
- Usa **Dagu** para pipelines end-to-end
|
||||||
|
- Considera **dbt** si haces mucho SQL y quieres lineage automático
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎯 Resumen
|
||||||
|
|
||||||
|
**Dagu PUEDE hacer transformaciones:**
|
||||||
|
- ✅ Python/Pandas (limpieza, agregaciones)
|
||||||
|
- ✅ SQL (staging, métricas, joins)
|
||||||
|
- ✅ Transformaciones incrementales
|
||||||
|
- ✅ Multi-tabla con joins complejos
|
||||||
|
- ✅ Paralelo (múltiples transforms a la vez)
|
||||||
|
- ✅ ClickHouse (analítica pesada)
|
||||||
|
|
||||||
|
**NO necesitas Temporal para:**
|
||||||
|
- ❌ Transformaciones simples/medias
|
||||||
|
- ❌ ETL típico (Extract → Transform → Load)
|
||||||
|
- ❌ Pipelines que terminan en < 1 hora
|
||||||
|
- ❌ Agregaciones SQL o Pandas
|
||||||
|
|
||||||
|
**SÍ necesitas Temporal solo si:**
|
||||||
|
- ✅ Transformación tarda > 1 hora
|
||||||
|
- ✅ Necesitas pausar/reanudar
|
||||||
|
- ✅ State machine muy complejo
|
||||||
|
- ✅ Compensaciones distribuidas
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Última actualización**: 2026-03-23
|
||||||
@@ -1,64 +0,0 @@
|
|||||||
services:
|
|
||||||
temporal-postgresql:
|
|
||||||
image: postgres:15
|
|
||||||
container_name: temporal-db
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: temporal
|
|
||||||
POSTGRES_PASSWORD: temporal
|
|
||||||
POSTGRES_DB: temporal
|
|
||||||
ports:
|
|
||||||
- "5435:5432"
|
|
||||||
volumes:
|
|
||||||
- temporal-postgres-data:/var/lib/postgresql/data
|
|
||||||
healthcheck:
|
|
||||||
test: ["CMD", "pg_isready", "-U", "temporal"]
|
|
||||||
interval: 5s
|
|
||||||
timeout: 5s
|
|
||||||
retries: 5
|
|
||||||
restart: unless-stopped
|
|
||||||
|
|
||||||
temporal:
|
|
||||||
image: temporalio/auto-setup:latest
|
|
||||||
container_name: temporal
|
|
||||||
depends_on:
|
|
||||||
temporal-postgresql:
|
|
||||||
condition: service_healthy
|
|
||||||
environment:
|
|
||||||
- DB=postgres12
|
|
||||||
- DB_PORT=5432
|
|
||||||
- POSTGRES_USER=temporal
|
|
||||||
- POSTGRES_PWD=temporal
|
|
||||||
- POSTGRES_SEEDS=temporal-postgresql
|
|
||||||
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
|
|
||||||
ports:
|
|
||||||
- "7233:7233"
|
|
||||||
volumes:
|
|
||||||
- ./temporal-dynamicconfig:/etc/temporal/config/dynamicconfig
|
|
||||||
restart: unless-stopped
|
|
||||||
|
|
||||||
temporal-ui:
|
|
||||||
image: temporalio/ui:latest
|
|
||||||
container_name: temporal-ui
|
|
||||||
depends_on:
|
|
||||||
- temporal
|
|
||||||
environment:
|
|
||||||
- TEMPORAL_ADDRESS=temporal:7233
|
|
||||||
- TEMPORAL_CORS_ORIGINS=http://localhost:3400
|
|
||||||
ports:
|
|
||||||
- "3400:8080"
|
|
||||||
restart: unless-stopped
|
|
||||||
|
|
||||||
temporal-admin-tools:
|
|
||||||
image: temporalio/admin-tools:latest
|
|
||||||
container_name: temporal-admin-tools
|
|
||||||
depends_on:
|
|
||||||
- temporal
|
|
||||||
environment:
|
|
||||||
- TEMPORAL_ADDRESS=temporal:7233
|
|
||||||
- TEMPORAL_CLI_ADDRESS=temporal:7233
|
|
||||||
stdin_open: true
|
|
||||||
tty: true
|
|
||||||
restart: unless-stopped
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
temporal-postgres-data:
|
|
||||||
@@ -95,16 +95,9 @@ services:
|
|||||||
- name: "Orchestration"
|
- name: "Orchestration"
|
||||||
icon: "fas fa-code-branch"
|
icon: "fas fa-code-branch"
|
||||||
items:
|
items:
|
||||||
- name: "Temporal UI"
|
|
||||||
logo: "http://localhost:3400/favicon.ico"
|
|
||||||
subtitle: "Workflow Orchestration"
|
|
||||||
tag: "orchestration"
|
|
||||||
url: "http://localhost:3400"
|
|
||||||
target: "_blank"
|
|
||||||
|
|
||||||
- name: "Dagu"
|
- name: "Dagu"
|
||||||
logo: "http://localhost:8090/assets/favicon.ico"
|
logo: "http://localhost:8090/assets/favicon.ico"
|
||||||
subtitle: "DAG Scheduler - Local Scripts"
|
subtitle: "DAG Scheduler & Transformations"
|
||||||
tag: "orchestration"
|
tag: "orchestration"
|
||||||
url: "http://localhost:8090"
|
url: "http://localhost:8090"
|
||||||
target: "_blank"
|
target: "_blank"
|
||||||
|
|||||||
@@ -1,20 +0,0 @@
|
|||||||
# Temporal dynamic configuration for development
|
|
||||||
system.forceSearchAttributesCacheRefreshOnRead:
|
|
||||||
- value: true
|
|
||||||
constraints: {}
|
|
||||||
|
|
||||||
frontend.enableUpdateWorkflowExecution:
|
|
||||||
- value: true
|
|
||||||
constraints: {}
|
|
||||||
|
|
||||||
history.enableParentClosePolicyWorker:
|
|
||||||
- value: true
|
|
||||||
constraints: {}
|
|
||||||
|
|
||||||
system.enableActivityEagerExecution:
|
|
||||||
- value: true
|
|
||||||
constraints: {}
|
|
||||||
|
|
||||||
frontend.enableExecuteMultiOperation:
|
|
||||||
- value: true
|
|
||||||
constraints: {}
|
|
||||||
Reference in New Issue
Block a user