Files
automatic-process/CLAUDE.md
T
egutierrez ea84a8e1f8 refactor: remove Temporal in favor of Dagu for transformations
Temporal era overkill para nuestros pipelines de datos típicos.

Cambios:
- Eliminado docker-compose-temporal.yml y configuración
- Removido Temporal de Homer dashboard
- Actualizado README y CLAUDE.md sin referencias a Temporal
- Añadida documentación completa de transformaciones con Dagu

Dagu es suficiente porque:
- Workflows terminan en minutos, no días
- Transformaciones simples/medias (Python/SQL)
- No necesitamos pausar/reanudar workflows
- Menor overhead y más simple de mantener

Si en el futuro necesitamos workflows de larga duración o state complejo,
podemos volver a levantar Temporal.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-23 22:58:53 +01:00

9.1 KiB

CLAUDE.md - Guía de Manipulación de Servicios

🎯 Propósito

Este documento describe qué servicios puedo manipular directamente, cuáles requieren MCPs, y cómo interactuar con cada uno para construir pipelines de datos.


Servicios que PUEDO Manipular Directamente

1. Dagu (Fácil - Acceso Total)

  • Capacidad: Crear, modificar y eliminar workflows (DAGs)
  • Ubicación: ~/dagu/dags/*.yaml
  • Uso: Scheduling, lanzamiento de scripts, orchestración básica
  • Ejemplo:
    name: ingest_data
    schedule: "0 * * * *"  # Cada hora
    steps:
      - name: fetch
        command: python ~/scripts/fetch_data.py
      - name: publish
        command: ~/scripts/publish_to_nats.sh
        depends: [fetch]
    

2. NATS JetStream (Medio - API REST/CLI)

  • Capacidad: Publicar mensajes, crear streams, suscripciones
  • ⚠️ Limitación: Requiero usar nats CLI o scripts con la API
  • Uso: Message broker, event streaming, pub/sub
  • Acceso:
    • Puerto 4222: Cliente NATS
    • Puerto 8222: HTTP Monitoring API
  • Ejemplo (vía Dagu):
    # Publicar a NATS
    nats pub data.ingested "$(cat data.json)" --server=nats://localhost:4222
    

3. Bases de Datos (Fácil - SQL Directo)

  • PostgreSQL: Puerto 5434
    psql -h localhost -p 5434 -U postgres -d postgres -c "INSERT INTO..."
    
  • ClickHouse: Puertos 8123 (HTTP), 9000 (Native)
    curl -X POST 'http://localhost:8123/' -d "INSERT INTO table VALUES..."
    
  • Marquez DB: Puerto 5433 (para metadata)

4. Marquez (OpenLineage) (Medio - API REST)

  • Capacidad: Enviar eventos de lineage via API
  • Uso: Rastrear origen/destino de datos en cada paso
  • Ejemplo:
    curl -X POST http://localhost:5000/api/v1/lineage \
      -H "Content-Type: application/json" \
      -d @lineage_event.json
    

5. Logs (Prometheus/Loki) (Medio - Pushgateway/API)

  • Prometheus: Exportar métricas vía Pushgateway
  • Loki: Enviar logs vía HTTP API
  • Uso: Monitoreo, alertas, debugging

Servicios que NECESITAN MCP

1. Grafana (Dashboards/Datasources)

  • Problema: Crear dashboards complejos requiere UI o API compleja
  • 🔧 Solución: MCP de Grafana
    • Crear datasources programáticamente
    • Generar dashboards desde templates
    • Configurar alertas
  • Sin MCP puedo: Usar datasources existentes manualmente

2. Metabase (Queries/Dashboards)

  • Problema: Crear questions/dashboards es vía UI
  • 🔧 Solución: MCP de Metabase
    • Crear queries SQL desde código
    • Generar dashboards automáticamente
    • Configurar filtros y parámetros
  • Sin MCP puedo: Ejecutar queries manualmente en la UI

3. Rill (Dashboards Modernos)

  • Problema: Configuración específica de modelos y dashboards
  • 🔧 Solución: MCP de Rill o manipular archivos YAML
  • Sin MCP puedo: Editar archivos en ~/rill-data/ si conozco la estructura

🏗️ Arquitectura de Datos Propuesta

Flujo Completo (SIEMPRE con Lineage)

┌──────────┐
│  DAGU    │ ← Scheduling (cron, manual)
│ (Native) │
└────┬─────┘
     │
     ├─→ [PASO 1: RECOLECCIÓN]
     │   ├─→ Script Python/Bash
     │   ├─→ API calls, scraping, etc.
     │   └─→ 📝 Log a Marquez (source: API)
     │
     ├─→ [PASO 2: VALIDACIÓN]
     │   ├─→ Schema validation
     │   ├─→ Data quality checks
     │   └─→ 📝 Log a Marquez (transformation)
     │
     ├─→ [PASO 3: PUBLICACIÓN A NATS]
     │   ├─→ NATS JetStream (stream: raw_data)
     │   ├─→ Formato: JSON events
     │   └─→ 📝 Log a Marquez (target: NATS)
     │
     ├─→ [PASO 4: CONSUMO E INGESTA]
     │   ├─→ Consumer NATS → PostgreSQL
     │   ├─→ Consumer NATS → ClickHouse
     │   └─→ 📝 Log a Marquez (target: DB)
     │
     ├─→ [PASO 5: TRANSFORMACIÓN (en Dagu)]
     │   ├─→ Python/Pandas o SQL
     │   ├─→ Agregaciones, cálculos
     │   └─→ 📝 Log a Marquez (transformation)
     │
     └─→ [PASO 6: LOGS & MONITORING]
         ├─→ Prometheus: Métricas (éxito, fallos, tiempo)
         ├─→ Loki: Logs estructurados
         └─→ Grafana: Dashboards en tiempo real

📋 Template de DAG con Lineage

name: data_pipeline_template
description: Template para pipelines con lineage completo

tags:
  - data-pipeline
  - lineage
  - production

env:
  - MARQUEZ_URL: http://localhost:5000
  - NATS_URL: nats://localhost:4222
  - POSTGRES_URL: postgresql://postgres:postgres@localhost:5434/postgres

schedule:
  - "0 */6 * * *"  # Cada 6 horas

steps:
  # 1. FETCH DATA
  - name: fetch_data
    command: |
      python ~/dagu/scripts/fetch_data.py \
        --output /tmp/raw_data.json \
        --log-lineage

  # 2. VALIDATE
  - name: validate_data
    command: |
      python ~/dagu/scripts/validate.py \
        --input /tmp/raw_data.json \
        --log-lineage
    depends: [fetch_data]

  # 3. PUBLISH TO NATS
  - name: publish_to_nats
    command: |
      nats pub data.raw \
        "$(cat /tmp/raw_data.json)" \
        --server=$NATS_URL

      # Log lineage
      python ~/dagu/scripts/log_lineage.py \
        --event publish \
        --source /tmp/raw_data.json \
        --target nats://data.raw
    depends: [validate_data]

  # 4. INGEST TO POSTGRES
  - name: ingest_postgres
    command: |
      python ~/dagu/scripts/ingest_postgres.py \
        --nats-stream data.raw \
        --table raw_events \
        --log-lineage
    depends: [publish_to_nats]

  # 5. SEND METRICS
  - name: log_metrics
    command: |
      python ~/dagu/scripts/push_metrics.py \
        --job data_pipeline_template \
        --success true
    depends: [ingest_postgres]

handlers:
  failure:
    - name: alert_failure
      command: |
        python ~/dagu/scripts/push_metrics.py \
          --job data_pipeline_template \
          --success false

🎯 Scripts Helper Necesarios

1. ~/dagu/scripts/log_lineage.py

#!/usr/bin/env python3
import requests
import json
from datetime import datetime

def log_openlineage_event(event_type, source, target, job_name):
    """Envía evento OpenLineage a Marquez"""
    event = {
        "eventType": event_type,  # START, COMPLETE, FAIL
        "eventTime": datetime.utcnow().isoformat() + "Z",
        "producer": "dagu://pipeline",
        "job": {
            "namespace": "automatic-process",
            "name": job_name
        },
        "inputs": [{"namespace": "automatic-process", "name": source}],
        "outputs": [{"namespace": "automatic-process", "name": target}]
    }

    requests.post(
        "http://localhost:5000/api/v1/lineage",
        json=event
    )

2. ~/dagu/scripts/push_metrics.py

#!/usr/bin/env python3
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

def push_metrics(job_name, success):
    """Push métricas a Prometheus Pushgateway"""
    registry = CollectorRegistry()
    g = Gauge('job_success', 'Job success status', registry=registry)
    g.set(1 if success else 0)

    push_to_gateway(
        'localhost:9091',
        job=job_name,
        registry=registry
    )

3. ~/dagu/scripts/publish_to_nats.sh

#!/bin/bash
# Publicar a NATS JetStream
nats pub "$1" "$(cat $2)" --server=nats://localhost:4222

🚀 Primeros Pasos

1. Instalar CLIs necesarios

# NATS CLI
curl -sf https://binaries.nats.dev/nats-io/natscli/nats@latest | sh

2. Crear directorio de scripts

mkdir -p ~/dagu/scripts
chmod +x ~/dagu/scripts/*.{py,sh}

3. Configurar variables de entorno

# Añadir a ~/.bashrc
export MARQUEZ_URL=http://localhost:5000
export NATS_URL=nats://localhost:4222
export POSTGRES_URL=postgresql://postgres:postgres@localhost:5434/postgres
export CLICKHOUSE_URL=http://localhost:8123

📊 MCPs Recomendados (Futuro)

Prioridad Alta

  1. Grafana MCP - Automatizar dashboards
  2. PostgreSQL MCP - Queries complejas y migraciones
  3. ClickHouse MCP - Queries analíticas

Prioridad Media

  1. Metabase MCP - BI self-service

Prioridad Baja

  1. Rill MCP - Dashboards modernos

📝 Checklist para Cada Pipeline

Cuando crees un pipeline, SIEMPRE:

  • Define el schedule en Dagu
  • Log inicio en Marquez (START event)
  • Valida datos antes de procesar
  • Publica a NATS para desacoplar
  • Log cada transformación en Marquez
  • Ingesta a bases de datos
  • Log fin en Marquez (COMPLETE event)
  • Push métricas a Prometheus
  • Envía logs estructurados a Loki
  • Maneja errores (FAIL event a Marquez)

🔗 URLs de Servicios


Última actualización: 2026-03-23 Mantenedor: Claude (Assistant)