Files
egutierrez ea84a8e1f8 refactor: remove Temporal in favor of Dagu for transformations
Temporal era overkill para nuestros pipelines de datos típicos.

Cambios:
- Eliminado docker-compose-temporal.yml y configuración
- Removido Temporal de Homer dashboard
- Actualizado README y CLAUDE.md sin referencias a Temporal
- Añadida documentación completa de transformaciones con Dagu

Dagu es suficiente porque:
- Workflows terminan en minutos, no días
- Transformaciones simples/medias (Python/SQL)
- No necesitamos pausar/reanudar workflows
- Menor overhead y más simple de mantener

Si en el futuro necesitamos workflows de larga duración o state complejo,
podemos volver a levantar Temporal.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-23 22:58:53 +01:00

524 lines
14 KiB
Markdown

# Automatic Process - Suite Completa de Datos
Plataforma completa de ingesta, procesamiento y visualización de datos con lineage tracking automático.
---
## 🎯 Arquitectura
```
┌─────────────────────────────────────────────────────────────────┐
│ DATA PIPELINE STACK │
└─────────────────────────────────────────────────────────────────┘
📅 SCHEDULING 🔄 MESSAGING 💾 STORAGE
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Dagu │────────→ │ NATS │────────→ │PostgreSQL│
│ (Native) │ │JetStream │ │ClickHouse│
└────┬─────┘ └──────────┘ └──────────┘
│ │
│ ⚙️ TRANSFORMATIONS │
└────────────────────────────────────────────┘
📊 LINEAGE 📈 VISUALIZATION
┌──────────┐ ┌──────────┐
│ Marquez │ │ Grafana │
│OpenLineage│ │ Metabase │
└──────────┘ │ Rill │
↑ └──────────┘
🔍 MONITORING
┌──────────┐
│Prometheus│
│ Loki │
│ Alloy │
└──────────┘
```
---
## 🚀 Quick Start
### 1. Iniciar Todos los Servicios
```bash
# Core services
docker-compose up -d
# Analytics
docker-compose -f docker-compose-analytics.yml up -d
# Databases
docker-compose -f docker-compose-databases.yml up -d
# Lineage
docker-compose -f docker-compose-marquez.yml up -d
# Messaging
docker-compose -f docker-compose-nats.yml up -d
# Orchestration (Dagu ya está corriendo como systemd)
systemctl --user status dagu.service
```
### 2. Acceder al Dashboard
**Homer Dashboard**: http://localhost:8080
Desde ahí puedes acceder a todos los servicios.
---
## 📦 Servicios Disponibles
### 🎨 Visualization
| Servicio | Puerto | Descripción |
|----------|--------|-------------|
| **Grafana** | 3500 | Dashboards y alertas |
| **Metabase** | 3200 | Business Intelligence |
| **Rill** | 9009 | BI Dashboard moderno |
### 📊 Monitoring
| Servicio | Puerto | Descripción |
|----------|--------|-------------|
| **Prometheus** | 9090 | Métricas y alertas |
| **Loki** | 3100 | Agregación de logs |
| **Alloy** | 12345 | Colector de telemetría |
### 🔄 Orchestration & Transformations
| Servicio | Puerto | Descripción |
|----------|--------|-------------|
| **Dagu** | 8090 | DAG Scheduler & Data Transformations (nativo WSL) |
### 📨 Messaging
| Servicio | Puerto | Descripción |
|----------|--------|-------------|
| **NATS JetStream** | 4222/8222 | Message broker |
### 💾 Databases
| Servicio | Puerto | Descripción |
|----------|--------|-------------|
| **PostgreSQL** | 5434 | Base de datos relacional |
| **ClickHouse** | 8123/9000 | Base de datos analítica |
| **DBGate** | 3300 | Database management UI |
### 🗺️ Data Lineage
| Servicio | Puerto | Descripción |
|----------|--------|-------------|
| **Marquez** | 3001/5000 | OpenLineage tracking |
---
## 🏗️ Crear un Pipeline de Datos
### Ejemplo: Ingestión desde API
#### 1. Crear el script de recolección
```python
# ~/dagu/scripts/fetch_api_data.py
#!/usr/bin/env python3
import requests
import json
from datetime import datetime
def fetch_data():
response = requests.get('https://api.example.com/data')
data = response.json()
# Guardar temporalmente
with open('/tmp/api_data.json', 'w') as f:
json.dump(data, f)
# Log a Marquez
log_lineage('START', 'api.example.com', '/tmp/api_data.json')
if __name__ == '__main__':
fetch_data()
```
#### 2. Crear el DAG en Dagu
```yaml
# ~/dagu/dags/api_ingestion.yaml
name: api_ingestion
description: Ingesta datos desde API cada hora
schedule:
- "0 * * * *" # Cada hora
env:
- NATS_URL: nats://localhost:4222
- POSTGRES_URL: postgresql://postgres:postgres@localhost:5434/postgres
steps:
# 1. Fetch data from API
- name: fetch
command: python ~/dagu/scripts/fetch_api_data.py
# 2. Validate data
- name: validate
command: |
python ~/dagu/scripts/validate_schema.py \
--input /tmp/api_data.json
depends: [fetch]
# 3. Publish to NATS
- name: publish_nats
command: |
nats pub data.api.raw \
"$(cat /tmp/api_data.json)" \
--server=$NATS_URL
depends: [validate]
# 4. Consume and ingest to PostgreSQL
- name: ingest_postgres
command: |
python ~/dagu/scripts/nats_to_postgres.py \
--stream data.api.raw \
--table api_events
depends: [publish_nats]
# 5. Push metrics
- name: metrics
command: |
python ~/dagu/scripts/push_metrics.py \
--job api_ingestion \
--success true
depends: [ingest_postgres]
handlers:
failure:
- name: alert
command: |
echo "Pipeline failed!" | \
curl -X POST http://localhost:9093/api/v1/alerts
```
#### 3. Monitorear en Grafana
1. Ir a http://localhost:3500
2. Crear dashboard con:
- Query a Prometheus: `job_success{job="api_ingestion"}`
- Logs de Loki: `{job="dagu"} |= "api_ingestion"`
#### 4. Verificar Lineage en Marquez
1. Ir a http://localhost:3001
2. Buscar job: `api_ingestion`
3. Ver el grafo completo de datos:
```
api.example.com → /tmp/api_data.json → NATS → PostgreSQL
```
---
## 📝 Scripts Helper Incluidos
### `~/dagu/scripts/log_lineage.py`
Envía eventos OpenLineage a Marquez
```bash
python ~/dagu/scripts/log_lineage.py \
--event START \
--source api.example.com \
--target /tmp/data.json \
--job my_pipeline
```
### `~/dagu/scripts/push_metrics.py`
Publica métricas a Prometheus
```bash
python ~/dagu/scripts/push_metrics.py \
--job my_pipeline \
--success true \
--duration 45
```
### `~/dagu/scripts/publish_to_nats.sh`
Publica mensajes a NATS JetStream
```bash
./~/dagu/scripts/publish_to_nats.sh data.stream data.json
```
### `~/dagu/scripts/nats_to_postgres.py`
Consume de NATS e ingesta a PostgreSQL
```bash
python ~/dagu/scripts/nats_to_postgres.py \
--stream data.raw \
--table events \
--batch-size 100
```
---
## 🎯 Casos de Uso
### 1. ETL desde API a Warehouse
```
API → Dagu (fetch) → NATS → PostgreSQL → Grafana
Marquez (lineage tracking)
```
### 2. Stream Processing en Tiempo Real
```
IoT Devices → NATS → Dagu (transform) → ClickHouse → Rill
Marquez
```
### 3. Reporting Diario
```
Dagu (schedule) → PostgreSQL (query) → Metabase (dashboard) → Email
Marquez
```
---
## 🔧 Configuración
### NATS JetStream
```bash
# Crear stream
nats stream add DATA_STREAM \
--subjects "data.*" \
--storage file \
--retention limits \
--max-age 7d
# Ver estado
nats stream ls
nats stream info DATA_STREAM
```
### PostgreSQL
```bash
# Conectar
psql -h localhost -p 5434 -U postgres -d postgres
# Crear tabla
CREATE TABLE events (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ DEFAULT NOW(),
source VARCHAR(255),
data JSONB,
lineage_job VARCHAR(255)
);
```
### ClickHouse
```bash
# Conectar
clickhouse-client --host localhost --port 9000
# Crear tabla
CREATE TABLE events (
timestamp DateTime,
source String,
data String,
lineage_job String
) ENGINE = MergeTree()
ORDER BY timestamp;
```
---
## 📊 Monitoring
### Ver Métricas en Prometheus
```
http://localhost:9090
Queries útiles:
- job_success{job="*"}
- job_duration_seconds{job="*"}
- rate(job_executions_total[5m])
```
### Ver Logs en Grafana
```
http://localhost:3500 → Explore → Loki
Queries útiles:
- {job="dagu"}
- {job="dagu"} |= "error"
- {job="dagu"} |= "api_ingestion"
```
### Ver Lineage en Marquez
```
http://localhost:3001
Buscar:
- Jobs: api_ingestion, data_transform
- Datasets: /tmp/api_data.json, postgres://events
- Runs: últimas ejecuciones
```
---
## 🚨 Troubleshooting
### Dagu no responde
```bash
# Ver logs
journalctl --user -u dagu.service -f
# Reiniciar
systemctl --user restart dagu.service
```
### NATS no conecta
```bash
# Ver estado
docker logs nats
# Verificar puerto
nats server ping nats://localhost:4222
```
### Base de datos no accesible
```bash
# PostgreSQL
docker logs postgres-main
# ClickHouse
docker logs clickhouse
```
### Marquez no registra eventos
```bash
# Ver logs
docker logs marquez
# Probar API manualmente
curl http://localhost:5000/api/v1/namespaces
```
---
## 📚 Documentación Adicional
- **CLAUDE.md**: Guía técnica de manipulación de servicios
- **TRANSFORMATIONS.md**: Guía completa de transformaciones con Dagu
- **~/dagu/README.md**: Documentación específica de Dagu
- **Dagu Docs**: https://dagu.sh/
- **OpenLineage Spec**: https://openlineage.io/
- **NATS Docs**: https://docs.nats.io/
---
## 🤝 Contribuir
### Añadir un nuevo pipeline
1. Crear script en `~/dagu/scripts/`
2. Crear DAG en `~/dagu/dags/`
3. Añadir lineage tracking
4. Crear dashboard en Grafana
5. Documentar en este README
### Añadir un nuevo servicio
1. Crear `docker-compose-<servicio>.yml`
2. Añadir a Homer en `homer/assets/config.yml`
3. Documentar puertos y configuración
4. Actualizar CLAUDE.md si necesita manipulación especial
---
## 📊 Arquitectura Detallada
```
┌─────────────┐
│ DAGU │
│ Scheduler │
└──────┬──────┘
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Script │ │ Script │ │ Script │
│ Fetch │ │ Transform │ │ Export │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
└──────────────┼──────────────┘
┌────▼────┐
│ NATS │
│JetStream│
└────┬────┘
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│PostgreSQL│ │ClickHouse│ │ Dagu │
│ │ │ │ │Transform │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└─────────────┼─────────────┘
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Grafana │ │Metabase │ │ Rill │
└─────────┘ └─────────┘ └─────────┘
Lineage Tracking (Marquez)
┌────────────────────────────────┐
│ API → NATS → DB → Visualization│
└────────────────────────────────┘
```
---
## 🔐 Credenciales
| Servicio | Usuario | Password | Puerto |
|----------|---------|----------|--------|
| PostgreSQL | postgres | postgres | 5434 |
| ClickHouse | default | clickhouse | 8123 |
| Marquez DB | marquez | marquez | 5433 |
| Metabase DB | metabase | metabase | (interno) |
| NATS | nats | nats123 | 4222 |
**⚠️ IMPORTANTE**: Cambiar passwords en producción
---
## 📈 Roadmap
- [ ] Añadir dbt para transformaciones SQL
- [ ] Integrar Airflow como alternativa a Dagu
- [ ] Añadir Kafka como alternativa a NATS
- [ ] Implementar data quality con Great Expectations
- [ ] Dashboard unificado de lineage + monitoring
- [ ] CI/CD para pipelines de datos
- [ ] Disaster recovery y backups automáticos
---
**Última actualización**: 2026-03-23
**Versión**: 1.0.0
**Mantenedor**: Lucas (@egutierrez)
---
## 📞 Soporte
Para issues y preguntas:
- Gitea: https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/automatic-process
- Claude Assistant: Disponible 24/7 para gestión de pipelines