diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..b175ed8 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,338 @@ +# CLAUDE.md - Guía de Manipulación de Servicios + +## 🎯 Propósito + +Este documento describe qué servicios puedo manipular directamente, cuáles requieren MCPs, y cómo interactuar con cada uno para construir pipelines de datos. + +--- + +## ✅ Servicios que PUEDO Manipular Directamente + +### 1. **Dagu** (Fácil - Acceso Total) +- ✅ **Capacidad**: Crear, modificar y eliminar workflows (DAGs) +- ✅ **Ubicación**: `~/dagu/dags/*.yaml` +- ✅ **Uso**: Scheduling, lanzamiento de scripts, orchestración básica +- **Ejemplo**: + ```yaml + name: ingest_data + schedule: "0 * * * *" # Cada hora + steps: + - name: fetch + command: python ~/scripts/fetch_data.py + - name: publish + command: ~/scripts/publish_to_nats.sh + depends: [fetch] + ``` + +### 2. **NATS JetStream** (Medio - API REST/CLI) +- ✅ **Capacidad**: Publicar mensajes, crear streams, suscripciones +- ⚠️ **Limitación**: Requiero usar `nats` CLI o scripts con la API +- ✅ **Uso**: Message broker, event streaming, pub/sub +- **Acceso**: + - Puerto 4222: Cliente NATS + - Puerto 8222: HTTP Monitoring API +- **Ejemplo (vía Dagu)**: + ```bash + # Publicar a NATS + nats pub data.ingested "$(cat data.json)" --server=nats://localhost:4222 + ``` + +### 3. **Bases de Datos** (Fácil - SQL Directo) +- ✅ **PostgreSQL**: Puerto 5434 + ```bash + psql -h localhost -p 5434 -U postgres -d postgres -c "INSERT INTO..." + ``` +- ✅ **ClickHouse**: Puertos 8123 (HTTP), 9000 (Native) + ```bash + curl -X POST 'http://localhost:8123/' -d "INSERT INTO table VALUES..." + ``` +- ✅ **Marquez DB**: Puerto 5433 (para metadata) + +### 4. **Marquez (OpenLineage)** (Medio - API REST) +- ✅ **Capacidad**: Enviar eventos de lineage via API +- ✅ **Uso**: Rastrear origen/destino de datos en cada paso +- **Ejemplo**: + ```bash + curl -X POST http://localhost:5000/api/v1/lineage \ + -H "Content-Type: application/json" \ + -d @lineage_event.json + ``` + +### 5. **Logs (Prometheus/Loki)** (Medio - Pushgateway/API) +- ✅ **Prometheus**: Exportar métricas vía Pushgateway +- ✅ **Loki**: Enviar logs vía HTTP API +- ✅ **Uso**: Monitoreo, alertas, debugging + +--- + +## ❌ Servicios que NECESITAN MCP + +### 1. **Grafana** (Dashboards/Datasources) +- ❌ **Problema**: Crear dashboards complejos requiere UI o API compleja +- 🔧 **Solución**: MCP de Grafana + - Crear datasources programáticamente + - Generar dashboards desde templates + - Configurar alertas +- **Sin MCP puedo**: Usar datasources existentes manualmente + +### 2. **Metabase** (Queries/Dashboards) +- ❌ **Problema**: Crear questions/dashboards es vía UI +- 🔧 **Solución**: MCP de Metabase + - Crear queries SQL desde código + - Generar dashboards automáticamente + - Configurar filtros y parámetros +- **Sin MCP puedo**: Ejecutar queries manualmente en la UI + +### 3. **Rill** (Dashboards Modernos) +- ❌ **Problema**: Configuración específica de modelos y dashboards +- 🔧 **Solución**: MCP de Rill o manipular archivos YAML +- **Sin MCP puedo**: Editar archivos en `~/rill-data/` si conozco la estructura + +--- + +## 🏗️ Arquitectura de Datos Propuesta + +### Flujo Completo (SIEMPRE con Lineage) + +``` +┌──────────┐ +│ DAGU │ ← Scheduling (cron, manual) +│ (Native) │ +└────┬─────┘ + │ + ├─→ [PASO 1: RECOLECCIÓN] + │ ├─→ Script Python/Bash + │ ├─→ API calls, scraping, etc. + │ └─→ 📝 Log a Marquez (source: API) + │ + ├─→ [PASO 2: VALIDACIÓN] + │ ├─→ Schema validation + │ ├─→ Data quality checks + │ └─→ 📝 Log a Marquez (transformation) + │ + ├─→ [PASO 3: PUBLICACIÓN A NATS] + │ ├─→ NATS JetStream (stream: raw_data) + │ ├─→ Formato: JSON events + │ └─→ 📝 Log a Marquez (target: NATS) + │ + ├─→ [PASO 4: CONSUMO E INGESTA] + │ ├─→ Consumer NATS → PostgreSQL + │ ├─→ Consumer NATS → ClickHouse + │ └─→ 📝 Log a Marquez (target: DB) + │ + ├─→ [PASO 5: TRANSFORMACIÓN (en Dagu)] + │ ├─→ Python/Pandas o SQL + │ ├─→ Agregaciones, cálculos + │ └─→ 📝 Log a Marquez (transformation) + │ + └─→ [PASO 6: LOGS & MONITORING] + ├─→ Prometheus: Métricas (éxito, fallos, tiempo) + ├─→ Loki: Logs estructurados + └─→ Grafana: Dashboards en tiempo real +``` + +--- + +## 📋 Template de DAG con Lineage + +```yaml +name: data_pipeline_template +description: Template para pipelines con lineage completo + +tags: + - data-pipeline + - lineage + - production + +env: + - MARQUEZ_URL: http://localhost:5000 + - NATS_URL: nats://localhost:4222 + - POSTGRES_URL: postgresql://postgres:postgres@localhost:5434/postgres + +schedule: + - "0 */6 * * *" # Cada 6 horas + +steps: + # 1. FETCH DATA + - name: fetch_data + command: | + python ~/dagu/scripts/fetch_data.py \ + --output /tmp/raw_data.json \ + --log-lineage + + # 2. VALIDATE + - name: validate_data + command: | + python ~/dagu/scripts/validate.py \ + --input /tmp/raw_data.json \ + --log-lineage + depends: [fetch_data] + + # 3. PUBLISH TO NATS + - name: publish_to_nats + command: | + nats pub data.raw \ + "$(cat /tmp/raw_data.json)" \ + --server=$NATS_URL + + # Log lineage + python ~/dagu/scripts/log_lineage.py \ + --event publish \ + --source /tmp/raw_data.json \ + --target nats://data.raw + depends: [validate_data] + + # 4. INGEST TO POSTGRES + - name: ingest_postgres + command: | + python ~/dagu/scripts/ingest_postgres.py \ + --nats-stream data.raw \ + --table raw_events \ + --log-lineage + depends: [publish_to_nats] + + # 5. SEND METRICS + - name: log_metrics + command: | + python ~/dagu/scripts/push_metrics.py \ + --job data_pipeline_template \ + --success true + depends: [ingest_postgres] + +handlers: + failure: + - name: alert_failure + command: | + python ~/dagu/scripts/push_metrics.py \ + --job data_pipeline_template \ + --success false +``` + +--- + +## 🎯 Scripts Helper Necesarios + +### 1. `~/dagu/scripts/log_lineage.py` +```python +#!/usr/bin/env python3 +import requests +import json +from datetime import datetime + +def log_openlineage_event(event_type, source, target, job_name): + """Envía evento OpenLineage a Marquez""" + event = { + "eventType": event_type, # START, COMPLETE, FAIL + "eventTime": datetime.utcnow().isoformat() + "Z", + "producer": "dagu://pipeline", + "job": { + "namespace": "automatic-process", + "name": job_name + }, + "inputs": [{"namespace": "automatic-process", "name": source}], + "outputs": [{"namespace": "automatic-process", "name": target}] + } + + requests.post( + "http://localhost:5000/api/v1/lineage", + json=event + ) +``` + +### 2. `~/dagu/scripts/push_metrics.py` +```python +#!/usr/bin/env python3 +from prometheus_client import CollectorRegistry, Gauge, push_to_gateway + +def push_metrics(job_name, success): + """Push métricas a Prometheus Pushgateway""" + registry = CollectorRegistry() + g = Gauge('job_success', 'Job success status', registry=registry) + g.set(1 if success else 0) + + push_to_gateway( + 'localhost:9091', + job=job_name, + registry=registry + ) +``` + +### 3. `~/dagu/scripts/publish_to_nats.sh` +```bash +#!/bin/bash +# Publicar a NATS JetStream +nats pub "$1" "$(cat $2)" --server=nats://localhost:4222 +``` + +--- + +## 🚀 Primeros Pasos + +### 1. Instalar CLIs necesarios +```bash +# NATS CLI +curl -sf https://binaries.nats.dev/nats-io/natscli/nats@latest | sh +``` + +### 2. Crear directorio de scripts +```bash +mkdir -p ~/dagu/scripts +chmod +x ~/dagu/scripts/*.{py,sh} +``` + +### 3. Configurar variables de entorno +```bash +# Añadir a ~/.bashrc +export MARQUEZ_URL=http://localhost:5000 +export NATS_URL=nats://localhost:4222 +export POSTGRES_URL=postgresql://postgres:postgres@localhost:5434/postgres +export CLICKHOUSE_URL=http://localhost:8123 +``` + +--- + +## 📊 MCPs Recomendados (Futuro) + +### Prioridad Alta +1. **Grafana MCP** - Automatizar dashboards +2. **PostgreSQL MCP** - Queries complejas y migraciones +3. **ClickHouse MCP** - Queries analíticas + +### Prioridad Media +4. **Metabase MCP** - BI self-service + +### Prioridad Baja +5. **Rill MCP** - Dashboards modernos + +--- + +## 📝 Checklist para Cada Pipeline + +Cuando crees un pipeline, SIEMPRE: + +- [ ] Define el schedule en Dagu +- [ ] Log inicio en Marquez (START event) +- [ ] Valida datos antes de procesar +- [ ] Publica a NATS para desacoplar +- [ ] Log cada transformación en Marquez +- [ ] Ingesta a bases de datos +- [ ] Log fin en Marquez (COMPLETE event) +- [ ] Push métricas a Prometheus +- [ ] Envía logs estructurados a Loki +- [ ] Maneja errores (FAIL event a Marquez) + +--- + +## 🔗 URLs de Servicios + +- **Dagu**: http://localhost:8090 +- **NATS Monitoring**: http://localhost:8222 +- **Marquez**: http://localhost:3001 +- **Grafana**: http://localhost:3500 +- **Prometheus**: http://localhost:9090 +- **DBGate**: http://localhost:3300 + +--- + +**Última actualización**: 2026-03-23 +**Mantenedor**: Claude (Assistant) diff --git a/README.md b/README.md new file mode 100644 index 0000000..2eaa669 --- /dev/null +++ b/README.md @@ -0,0 +1,523 @@ +# Automatic Process - Suite Completa de Datos + +Plataforma completa de ingesta, procesamiento y visualización de datos con lineage tracking automático. + +--- + +## 🎯 Arquitectura + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ DATA PIPELINE STACK │ +└─────────────────────────────────────────────────────────────────┘ + +📅 SCHEDULING 🔄 MESSAGING 💾 STORAGE +┌──────────┐ ┌──────────┐ ┌──────────┐ +│ Dagu │────────→ │ NATS │────────→ │PostgreSQL│ +│ (Native) │ │JetStream │ │ClickHouse│ +└────┬─────┘ └──────────┘ └──────────┘ + │ │ + │ ⚙️ TRANSFORMATIONS │ + └────────────────────────────────────────────┘ + │ + ↓ + 📊 LINEAGE 📈 VISUALIZATION + ┌──────────┐ ┌──────────┐ + │ Marquez │ │ Grafana │ + │OpenLineage│ │ Metabase │ + └──────────┘ │ Rill │ + ↑ └──────────┘ + │ + 🔍 MONITORING + ┌──────────┐ + │Prometheus│ + │ Loki │ + │ Alloy │ + └──────────┘ +``` + +--- + +## 🚀 Quick Start + +### 1. Iniciar Todos los Servicios + +```bash +# Core services +docker-compose up -d + +# Analytics +docker-compose -f docker-compose-analytics.yml up -d + +# Databases +docker-compose -f docker-compose-databases.yml up -d + +# Lineage +docker-compose -f docker-compose-marquez.yml up -d + +# Messaging +docker-compose -f docker-compose-nats.yml up -d + +# Orchestration (Dagu ya está corriendo como systemd) +systemctl --user status dagu.service +``` + +### 2. Acceder al Dashboard + +**Homer Dashboard**: http://localhost:8080 + +Desde ahí puedes acceder a todos los servicios. + +--- + +## 📦 Servicios Disponibles + +### 🎨 Visualization +| Servicio | Puerto | Descripción | +|----------|--------|-------------| +| **Grafana** | 3500 | Dashboards y alertas | +| **Metabase** | 3200 | Business Intelligence | +| **Rill** | 9009 | BI Dashboard moderno | + +### 📊 Monitoring +| Servicio | Puerto | Descripción | +|----------|--------|-------------| +| **Prometheus** | 9090 | Métricas y alertas | +| **Loki** | 3100 | Agregación de logs | +| **Alloy** | 12345 | Colector de telemetría | + +### 🔄 Orchestration & Transformations +| Servicio | Puerto | Descripción | +|----------|--------|-------------| +| **Dagu** | 8090 | DAG Scheduler & Data Transformations (nativo WSL) | + +### 📨 Messaging +| Servicio | Puerto | Descripción | +|----------|--------|-------------| +| **NATS JetStream** | 4222/8222 | Message broker | + +### 💾 Databases +| Servicio | Puerto | Descripción | +|----------|--------|-------------| +| **PostgreSQL** | 5434 | Base de datos relacional | +| **ClickHouse** | 8123/9000 | Base de datos analítica | +| **DBGate** | 3300 | Database management UI | + +### 🗺️ Data Lineage +| Servicio | Puerto | Descripción | +|----------|--------|-------------| +| **Marquez** | 3001/5000 | OpenLineage tracking | + +--- + +## 🏗️ Crear un Pipeline de Datos + +### Ejemplo: Ingestión desde API + +#### 1. Crear el script de recolección + +```python +# ~/dagu/scripts/fetch_api_data.py +#!/usr/bin/env python3 +import requests +import json +from datetime import datetime + +def fetch_data(): + response = requests.get('https://api.example.com/data') + data = response.json() + + # Guardar temporalmente + with open('/tmp/api_data.json', 'w') as f: + json.dump(data, f) + + # Log a Marquez + log_lineage('START', 'api.example.com', '/tmp/api_data.json') + +if __name__ == '__main__': + fetch_data() +``` + +#### 2. Crear el DAG en Dagu + +```yaml +# ~/dagu/dags/api_ingestion.yaml +name: api_ingestion +description: Ingesta datos desde API cada hora + +schedule: + - "0 * * * *" # Cada hora + +env: + - NATS_URL: nats://localhost:4222 + - POSTGRES_URL: postgresql://postgres:postgres@localhost:5434/postgres + +steps: + # 1. Fetch data from API + - name: fetch + command: python ~/dagu/scripts/fetch_api_data.py + + # 2. Validate data + - name: validate + command: | + python ~/dagu/scripts/validate_schema.py \ + --input /tmp/api_data.json + depends: [fetch] + + # 3. Publish to NATS + - name: publish_nats + command: | + nats pub data.api.raw \ + "$(cat /tmp/api_data.json)" \ + --server=$NATS_URL + depends: [validate] + + # 4. Consume and ingest to PostgreSQL + - name: ingest_postgres + command: | + python ~/dagu/scripts/nats_to_postgres.py \ + --stream data.api.raw \ + --table api_events + depends: [publish_nats] + + # 5. Push metrics + - name: metrics + command: | + python ~/dagu/scripts/push_metrics.py \ + --job api_ingestion \ + --success true + depends: [ingest_postgres] + +handlers: + failure: + - name: alert + command: | + echo "Pipeline failed!" | \ + curl -X POST http://localhost:9093/api/v1/alerts +``` + +#### 3. Monitorear en Grafana + +1. Ir a http://localhost:3500 +2. Crear dashboard con: + - Query a Prometheus: `job_success{job="api_ingestion"}` + - Logs de Loki: `{job="dagu"} |= "api_ingestion"` + +#### 4. Verificar Lineage en Marquez + +1. Ir a http://localhost:3001 +2. Buscar job: `api_ingestion` +3. Ver el grafo completo de datos: + ``` + api.example.com → /tmp/api_data.json → NATS → PostgreSQL + ``` + +--- + +## 📝 Scripts Helper Incluidos + +### `~/dagu/scripts/log_lineage.py` +Envía eventos OpenLineage a Marquez + +```bash +python ~/dagu/scripts/log_lineage.py \ + --event START \ + --source api.example.com \ + --target /tmp/data.json \ + --job my_pipeline +``` + +### `~/dagu/scripts/push_metrics.py` +Publica métricas a Prometheus + +```bash +python ~/dagu/scripts/push_metrics.py \ + --job my_pipeline \ + --success true \ + --duration 45 +``` + +### `~/dagu/scripts/publish_to_nats.sh` +Publica mensajes a NATS JetStream + +```bash +./~/dagu/scripts/publish_to_nats.sh data.stream data.json +``` + +### `~/dagu/scripts/nats_to_postgres.py` +Consume de NATS e ingesta a PostgreSQL + +```bash +python ~/dagu/scripts/nats_to_postgres.py \ + --stream data.raw \ + --table events \ + --batch-size 100 +``` + +--- + +## 🎯 Casos de Uso + +### 1. ETL desde API a Warehouse +``` +API → Dagu (fetch) → NATS → PostgreSQL → Grafana + ↓ + Marquez (lineage tracking) +``` + +### 2. Stream Processing en Tiempo Real +``` +IoT Devices → NATS → Dagu (transform) → ClickHouse → Rill + ↓ + Marquez +``` + +### 3. Reporting Diario +``` +Dagu (schedule) → PostgreSQL (query) → Metabase (dashboard) → Email + ↓ + Marquez +``` + +--- + +## 🔧 Configuración + +### NATS JetStream + +```bash +# Crear stream +nats stream add DATA_STREAM \ + --subjects "data.*" \ + --storage file \ + --retention limits \ + --max-age 7d + +# Ver estado +nats stream ls +nats stream info DATA_STREAM +``` + +### PostgreSQL + +```bash +# Conectar +psql -h localhost -p 5434 -U postgres -d postgres + +# Crear tabla +CREATE TABLE events ( + id SERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ DEFAULT NOW(), + source VARCHAR(255), + data JSONB, + lineage_job VARCHAR(255) +); +``` + +### ClickHouse + +```bash +# Conectar +clickhouse-client --host localhost --port 9000 + +# Crear tabla +CREATE TABLE events ( + timestamp DateTime, + source String, + data String, + lineage_job String +) ENGINE = MergeTree() +ORDER BY timestamp; +``` + +--- + +## 📊 Monitoring + +### Ver Métricas en Prometheus +``` +http://localhost:9090 + +Queries útiles: +- job_success{job="*"} +- job_duration_seconds{job="*"} +- rate(job_executions_total[5m]) +``` + +### Ver Logs en Grafana +``` +http://localhost:3500 → Explore → Loki + +Queries útiles: +- {job="dagu"} +- {job="dagu"} |= "error" +- {job="dagu"} |= "api_ingestion" +``` + +### Ver Lineage en Marquez +``` +http://localhost:3001 + +Buscar: +- Jobs: api_ingestion, data_transform +- Datasets: /tmp/api_data.json, postgres://events +- Runs: últimas ejecuciones +``` + +--- + +## 🚨 Troubleshooting + +### Dagu no responde +```bash +# Ver logs +journalctl --user -u dagu.service -f + +# Reiniciar +systemctl --user restart dagu.service +``` + +### NATS no conecta +```bash +# Ver estado +docker logs nats + +# Verificar puerto +nats server ping nats://localhost:4222 +``` + +### Base de datos no accesible +```bash +# PostgreSQL +docker logs postgres-main + +# ClickHouse +docker logs clickhouse +``` + +### Marquez no registra eventos +```bash +# Ver logs +docker logs marquez + +# Probar API manualmente +curl http://localhost:5000/api/v1/namespaces +``` + +--- + +## 📚 Documentación Adicional + +- **CLAUDE.md**: Guía técnica de manipulación de servicios +- **TRANSFORMATIONS.md**: Guía completa de transformaciones con Dagu +- **~/dagu/README.md**: Documentación específica de Dagu +- **Dagu Docs**: https://dagu.sh/ +- **OpenLineage Spec**: https://openlineage.io/ +- **NATS Docs**: https://docs.nats.io/ + +--- + +## 🤝 Contribuir + +### Añadir un nuevo pipeline + +1. Crear script en `~/dagu/scripts/` +2. Crear DAG en `~/dagu/dags/` +3. Añadir lineage tracking +4. Crear dashboard en Grafana +5. Documentar en este README + +### Añadir un nuevo servicio + +1. Crear `docker-compose-.yml` +2. Añadir a Homer en `homer/assets/config.yml` +3. Documentar puertos y configuración +4. Actualizar CLAUDE.md si necesita manipulación especial + +--- + +## 📊 Arquitectura Detallada + +``` + ┌─────────────┐ + │ DAGU │ + │ Scheduler │ + └──────┬──────┘ + │ + ┌──────────────┼──────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌───────────┐ ┌───────────┐ ┌───────────┐ + │ Script │ │ Script │ │ Script │ + │ Fetch │ │ Transform │ │ Export │ + └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ + │ │ │ + └──────────────┼──────────────┘ + │ + ┌────▼────┐ + │ NATS │ + │JetStream│ + └────┬────┘ + │ + ┌──────────────┼──────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌──────────┐ ┌──────────┐ ┌──────────┐ + │PostgreSQL│ │ClickHouse│ │ Dagu │ + │ │ │ │ │Transform │ + └────┬─────┘ └────┬─────┘ └────┬─────┘ + │ │ │ + └─────────────┼─────────────┘ + │ + ┌────────────┼────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌─────────┐ ┌─────────┐ ┌─────────┐ + │ Grafana │ │Metabase │ │ Rill │ + └─────────┘ └─────────┘ └─────────┘ + + Lineage Tracking (Marquez) + ┌────────────────────────────────┐ + │ API → NATS → DB → Visualization│ + └────────────────────────────────┘ +``` + +--- + +## 🔐 Credenciales + +| Servicio | Usuario | Password | Puerto | +|----------|---------|----------|--------| +| PostgreSQL | postgres | postgres | 5434 | +| ClickHouse | default | clickhouse | 8123 | +| Marquez DB | marquez | marquez | 5433 | +| Metabase DB | metabase | metabase | (interno) | +| NATS | nats | nats123 | 4222 | + +**⚠️ IMPORTANTE**: Cambiar passwords en producción + +--- + +## 📈 Roadmap + +- [ ] Añadir dbt para transformaciones SQL +- [ ] Integrar Airflow como alternativa a Dagu +- [ ] Añadir Kafka como alternativa a NATS +- [ ] Implementar data quality con Great Expectations +- [ ] Dashboard unificado de lineage + monitoring +- [ ] CI/CD para pipelines de datos +- [ ] Disaster recovery y backups automáticos + +--- + +**Última actualización**: 2026-03-23 +**Versión**: 1.0.0 +**Mantenedor**: Lucas (@egutierrez) + +--- + +## 📞 Soporte + +Para issues y preguntas: +- Gitea: https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/automatic-process +- Claude Assistant: Disponible 24/7 para gestión de pipelines diff --git a/TRANSFORMATIONS.md b/TRANSFORMATIONS.md new file mode 100644 index 0000000..86f8a36 --- /dev/null +++ b/TRANSFORMATIONS.md @@ -0,0 +1,582 @@ +# Transformaciones con Dagu + +Guía completa de cómo hacer transformaciones de datos con Dagu. + +--- + +## ✅ Dagu PUEDE hacer transformaciones + +Dagu ejecuta **cualquier script o comando**, por lo que puede hacer: +- ✅ Transformaciones SQL +- ✅ Transformaciones Python/Pandas +- ✅ Agregaciones y cálculos +- ✅ Limpieza de datos +- ✅ Enriquecimiento de datos +- ✅ Joins complejos +- ✅ Transformaciones en streaming + +--- + +## 🎯 Patrón 1: Transformaciones Python/Pandas + +### Ejemplo: Limpieza y agregación + +```yaml +# ~/dagu/dags/transform_sales.yaml +name: transform_sales_data +schedule: "0 2 * * *" # Cada día a las 2 AM + +steps: + # 1. Extract desde PostgreSQL + - name: extract + command: | + python < (SELECT COALESCE(MAX(created_at), '1970-01-01') FROM stg_orders) + ON CONFLICT (order_id) DO UPDATE SET + amount = EXCLUDED.amount, + status = EXCLUDED.status, + processed_at = NOW(); + SQL + + # 2. Transform - Calcular métricas + - name: calc_metrics + command: | + psql $DB_URL <= CURRENT_DATE - INTERVAL '7 days' + GROUP BY DATE(created_at) + ON CONFLICT (date) DO UPDATE SET + total_orders = EXCLUDED.total_orders, + total_revenue = EXCLUDED.total_revenue, + avg_order_value = EXCLUDED.avg_order_value, + completed_orders = EXCLUDED.completed_orders, + cancelled_orders = EXCLUDED.cancelled_orders, + updated_at = NOW(); + SQL + depends: [stage_orders] + + # 3. Transform - Snapshot histórico + - name: snapshot + command: | + psql $DB_URL <= CURRENT_DATE - 30', engine) + reviews = pd.read_sql('SELECT * FROM reviews', engine) + + # Agregaciones de órdenes + order_stats = orders.groupby('customer_id').agg({ + 'order_id': 'count', + 'amount': ['sum', 'mean'], + 'created_at': 'max' + }).reset_index() + order_stats.columns = ['customer_id', 'total_orders', 'total_spent', 'avg_order', 'last_order'] + + # Agregaciones de reviews + review_stats = reviews.groupby('customer_id').agg({ + 'rating': 'mean', + 'review_id': 'count' + }).reset_index() + review_stats.columns = ['customer_id', 'avg_rating', 'total_reviews'] + + # Merge todo + enriched = customers.merge(order_stats, on='customer_id', how='left') + enriched = enriched.merge(review_stats, on='customer_id', how='left') + + # Calcular segmento + enriched['segment'] = enriched.apply(lambda x: + 'VIP' if x['total_spent'] > 1000 else + 'Regular' if x['total_spent'] > 100 else + 'New', axis=1 + ) + + enriched.to_parquet('/tmp/enriched_customers.parquet') + EOF + + # 2. Load enriquecido + - name: load_enriched + command: | + python < '{last_sync}' + """, engine) + + if len(new_data) > 0: + new_data.to_parquet('/tmp/new_data.parquet') + print(f"Found {len(new_data)} new/changed records") + else: + print("No changes detected") + exit(0) + EOF + + # 2. Transformar solo cambios + - name: transform_changes + command: | + python <= today() + GROUP BY event_type, hour, day_of_week + " + depends: [load_to_clickhouse] +``` + +--- + +## 🎯 Patrón 6: Transformación con Dependencias Complejas + +### Ejemplo: DAG con múltiples transformaciones en paralelo + +```yaml +# ~/dagu/dags/complex_transform.yaml +name: complex_multi_transform +schedule: "0 1 * * *" + +steps: + # Paso inicial - Extracción + - name: extract + command: python ~/dagu/scripts/extract_data.py + + # Transformaciones en paralelo + - name: transform_customers + command: python ~/dagu/scripts/transform_customers.py + depends: [extract] + + - name: transform_products + command: python ~/dagu/scripts/transform_products.py + depends: [extract] + + - name: transform_orders + command: python ~/dagu/scripts/transform_orders.py + depends: [extract] + + # Join todo + - name: join_all + command: python ~/dagu/scripts/join_datasets.py + depends: [transform_customers, transform_products, transform_orders] + + # Calcular métricas finales + - name: calc_metrics + command: python ~/dagu/scripts/calculate_metrics.py + depends: [join_all] + + # Cargar a destinos + - name: load_postgres + command: python ~/dagu/scripts/load_postgres.py + depends: [calc_metrics] + + - name: load_clickhouse + command: python ~/dagu/scripts/load_clickhouse.py + depends: [calc_metrics] +``` + +--- + +## 💡 Buenas Prácticas + +### 1. Usa archivos intermedios +```bash +/tmp/raw_data.parquet +/tmp/clean_data.parquet +/tmp/transformed_data.parquet +``` + +### 2. Validaciones entre pasos +```python +# Validar antes de continuar +assert len(df) > 0, "No data to process" +assert df['amount'].sum() > 0, "Invalid amounts" +``` + +### 3. Logs estructurados +```python +import logging +logging.info(f"Processed {len(df)} records in {elapsed:.2f}s") +``` + +### 4. Idempotencia +```sql +-- Usar UPSERT en lugar de INSERT +INSERT ... ON CONFLICT DO UPDATE +``` + +### 5. Cleanup +```yaml +steps: + # ... tus pasos + + - name: cleanup + command: rm -f /tmp/*.parquet + continueOn: + failure: true +``` + +--- + +## 🆚 Dagu vs dbt + +| Feature | Dagu | dbt | +|---------|------|-----| +| SQL transforms | ✅ Sí | ✅ Sí (mejor) | +| Python transforms | ✅ Sí (mejor) | ⚠️ Limitado | +| Scheduling | ✅ Built-in | ❌ Externo | +| Lineage | ⚠️ Manual | ✅ Automático | +| Testing | ⚠️ Manual | ✅ Built-in | +| Docs | ⚠️ Manual | ✅ Automático | + +**Recomendación**: +- Usa **Dagu** para pipelines end-to-end +- Considera **dbt** si haces mucho SQL y quieres lineage automático + +--- + +## 🎯 Resumen + +**Dagu PUEDE hacer transformaciones:** +- ✅ Python/Pandas (limpieza, agregaciones) +- ✅ SQL (staging, métricas, joins) +- ✅ Transformaciones incrementales +- ✅ Multi-tabla con joins complejos +- ✅ Paralelo (múltiples transforms a la vez) +- ✅ ClickHouse (analítica pesada) + +**NO necesitas Temporal para:** +- ❌ Transformaciones simples/medias +- ❌ ETL típico (Extract → Transform → Load) +- ❌ Pipelines que terminan en < 1 hora +- ❌ Agregaciones SQL o Pandas + +**SÍ necesitas Temporal solo si:** +- ✅ Transformación tarda > 1 hora +- ✅ Necesitas pausar/reanudar +- ✅ State machine muy complejo +- ✅ Compensaciones distribuidas + +--- + +**Última actualización**: 2026-03-23 diff --git a/docker-compose-temporal.yml b/docker-compose-temporal.yml deleted file mode 100644 index 4e952f2..0000000 --- a/docker-compose-temporal.yml +++ /dev/null @@ -1,64 +0,0 @@ -services: - temporal-postgresql: - image: postgres:15 - container_name: temporal-db - environment: - POSTGRES_USER: temporal - POSTGRES_PASSWORD: temporal - POSTGRES_DB: temporal - ports: - - "5435:5432" - volumes: - - temporal-postgres-data:/var/lib/postgresql/data - healthcheck: - test: ["CMD", "pg_isready", "-U", "temporal"] - interval: 5s - timeout: 5s - retries: 5 - restart: unless-stopped - - temporal: - image: temporalio/auto-setup:latest - container_name: temporal - depends_on: - temporal-postgresql: - condition: service_healthy - environment: - - DB=postgres12 - - DB_PORT=5432 - - POSTGRES_USER=temporal - - POSTGRES_PWD=temporal - - POSTGRES_SEEDS=temporal-postgresql - - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml - ports: - - "7233:7233" - volumes: - - ./temporal-dynamicconfig:/etc/temporal/config/dynamicconfig - restart: unless-stopped - - temporal-ui: - image: temporalio/ui:latest - container_name: temporal-ui - depends_on: - - temporal - environment: - - TEMPORAL_ADDRESS=temporal:7233 - - TEMPORAL_CORS_ORIGINS=http://localhost:3400 - ports: - - "3400:8080" - restart: unless-stopped - - temporal-admin-tools: - image: temporalio/admin-tools:latest - container_name: temporal-admin-tools - depends_on: - - temporal - environment: - - TEMPORAL_ADDRESS=temporal:7233 - - TEMPORAL_CLI_ADDRESS=temporal:7233 - stdin_open: true - tty: true - restart: unless-stopped - -volumes: - temporal-postgres-data: diff --git a/homer/assets/config.yml b/homer/assets/config.yml index 2be6c89..4b2bdc1 100644 --- a/homer/assets/config.yml +++ b/homer/assets/config.yml @@ -95,16 +95,9 @@ services: - name: "Orchestration" icon: "fas fa-code-branch" items: - - name: "Temporal UI" - logo: "http://localhost:3400/favicon.ico" - subtitle: "Workflow Orchestration" - tag: "orchestration" - url: "http://localhost:3400" - target: "_blank" - - name: "Dagu" logo: "http://localhost:8090/assets/favicon.ico" - subtitle: "DAG Scheduler - Local Scripts" + subtitle: "DAG Scheduler & Transformations" tag: "orchestration" url: "http://localhost:8090" target: "_blank" diff --git a/temporal-dynamicconfig/development-sql.yaml b/temporal-dynamicconfig/development-sql.yaml deleted file mode 100644 index 88c0ec6..0000000 --- a/temporal-dynamicconfig/development-sql.yaml +++ /dev/null @@ -1,20 +0,0 @@ -# Temporal dynamic configuration for development -system.forceSearchAttributesCacheRefreshOnRead: - - value: true - constraints: {} - -frontend.enableUpdateWorkflowExecution: - - value: true - constraints: {} - -history.enableParentClosePolicyWorker: - - value: true - constraints: {} - -system.enableActivityEagerExecution: - - value: true - constraints: {} - -frontend.enableExecuteMultiOperation: - - value: true - constraints: {}