Files
egutierrez 2ca0b1169f docs: update CLAUDE.md with marquez-cli reference
Actualizada la sección de Marquez en CLAUDE.md para referenciar marquez-cli.

Cambios:
- Cambió categoría de "Medio - API REST" a "Fácil - CLI Nativo"
- Añadida información sobre instalación de marquez-cli
- Añadidos ejemplos de uso del CLI (run start/complete/fail, lineage)
- Referencia a MARQUEZ_UTILITIES.md para guía completa
- Actualizada sección de Scripts Helper con marquez-cli como RECOMENDADO

El CLI nativo simplifica la gestión de lineage en comparación con curl/Python.
2026-03-23 23:41:14 +01:00

386 lines
10 KiB
Markdown

# CLAUDE.md - Guía de Manipulación de Servicios
## 🎯 Propósito
Este documento describe qué servicios puedo manipular directamente, cuáles requieren MCPs, y cómo interactuar con cada uno para construir pipelines de datos.
---
## ✅ Servicios que PUEDO Manipular Directamente
### 1. **Dagu** (Fácil - Acceso Total)
-**Capacidad**: Crear, modificar y eliminar workflows (DAGs)
-**Ubicación**: `~/dagu/dags/*.yaml`
-**Uso**: Scheduling, lanzamiento de scripts, orchestración básica
- **Ejemplo**:
```yaml
name: ingest_data
schedule: "0 * * * *" # Cada hora
steps:
- name: fetch
command: python ~/scripts/fetch_data.py
- name: publish
command: ~/scripts/publish_to_nats.sh
depends: [fetch]
```
### 2. **NATS JetStream** (Medio - API REST/CLI)
- ✅ **Capacidad**: Publicar mensajes, crear streams, suscripciones
- ⚠️ **Limitación**: Requiero usar `nats` CLI o scripts con la API
- ✅ **Uso**: Message broker, event streaming, pub/sub
- **Acceso**:
- Puerto 4222: Cliente NATS
- Puerto 8222: HTTP Monitoring API
- **Ejemplo (vía Dagu)**:
```bash
# Publicar a NATS
nats pub data.ingested "$(cat data.json)" --server=nats://localhost:4222
```
### 3. **Bases de Datos** (Fácil - SQL Directo)
- ✅ **PostgreSQL**: Puerto 5434
```bash
psql -h localhost -p 5434 -U postgres -d postgres -c "INSERT INTO..."
```
- ✅ **ClickHouse**: Puertos 8123 (HTTP), 9000 (Native)
```bash
curl -X POST 'http://localhost:8123/' -d "INSERT INTO table VALUES..."
```
- ✅ **Marquez DB**: Puerto 5433 (para metadata)
### 4. **Marquez (OpenLineage)** (Fácil - CLI Nativo)
- ✅ **Capacidad**: Enviar eventos de lineage via CLI `marquez-cli`
- ✅ **Uso**: Rastrear origen/destino de datos en cada paso
- ✅ **CLI Nativo**: `marquez-cli` (binario Go sin dependencias)
- **Instalación**:
```bash
cd ~/AutomaticProyects/automatic_process/tools/marquez-cli
make install
```
- **Ejemplo**:
```bash
# Iniciar un run
marquez-cli run start -job my_pipeline -inputs "api://source"
# Completar run
marquez-cli run complete -job my_pipeline -run-id <uuid> -outputs "postgres://table"
# Ver lineage
marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
```
- **Documentación**: Ver `MARQUEZ_UTILITIES.md` para guía completa
### 5. **Logs (Prometheus/Loki)** (Medio - Pushgateway/API)
- ✅ **Prometheus**: Exportar métricas vía Pushgateway
- ✅ **Loki**: Enviar logs vía HTTP API
- ✅ **Uso**: Monitoreo, alertas, debugging
---
## ❌ Servicios que NECESITAN MCP
### 1. **Grafana** (Dashboards/Datasources)
- ❌ **Problema**: Crear dashboards complejos requiere UI o API compleja
- 🔧 **Solución**: MCP de Grafana
- Crear datasources programáticamente
- Generar dashboards desde templates
- Configurar alertas
- **Sin MCP puedo**: Usar datasources existentes manualmente
### 2. **Metabase** (Queries/Dashboards)
- ❌ **Problema**: Crear questions/dashboards es vía UI
- 🔧 **Solución**: MCP de Metabase
- Crear queries SQL desde código
- Generar dashboards automáticamente
- Configurar filtros y parámetros
- **Sin MCP puedo**: Ejecutar queries manualmente en la UI
### 3. **Rill** (Dashboards Modernos)
- ❌ **Problema**: Configuración específica de modelos y dashboards
- 🔧 **Solución**: MCP de Rill o manipular archivos YAML
- **Sin MCP puedo**: Editar archivos en `~/rill-data/` si conozco la estructura
---
## 🏗️ Arquitectura de Datos Propuesta
### Flujo Completo (SIEMPRE con Lineage)
```
┌──────────┐
│ DAGU │ ← Scheduling (cron, manual)
│ (Native) │
└────┬─────┘
├─→ [PASO 1: RECOLECCIÓN]
│ ├─→ Script Python/Bash
│ ├─→ API calls, scraping, etc.
│ └─→ 📝 Log a Marquez (source: API)
├─→ [PASO 2: VALIDACIÓN]
│ ├─→ Schema validation
│ ├─→ Data quality checks
│ └─→ 📝 Log a Marquez (transformation)
├─→ [PASO 3: PUBLICACIÓN A NATS]
│ ├─→ NATS JetStream (stream: raw_data)
│ ├─→ Formato: JSON events
│ └─→ 📝 Log a Marquez (target: NATS)
├─→ [PASO 4: CONSUMO E INGESTA]
│ ├─→ Consumer NATS → PostgreSQL
│ ├─→ Consumer NATS → ClickHouse
│ └─→ 📝 Log a Marquez (target: DB)
├─→ [PASO 5: TRANSFORMACIÓN (en Dagu)]
│ ├─→ Python/Pandas o SQL
│ ├─→ Agregaciones, cálculos
│ └─→ 📝 Log a Marquez (transformation)
└─→ [PASO 6: LOGS & MONITORING]
├─→ Prometheus: Métricas (éxito, fallos, tiempo)
├─→ Loki: Logs estructurados
└─→ Grafana: Dashboards en tiempo real
```
---
## 📋 Template de DAG con Lineage
```yaml
name: data_pipeline_template
description: Template para pipelines con lineage completo
tags:
- data-pipeline
- lineage
- production
env:
- MARQUEZ_URL: http://localhost:5000
- NATS_URL: nats://localhost:4222
- POSTGRES_URL: postgresql://postgres:postgres@localhost:5434/postgres
schedule:
- "0 */6 * * *" # Cada 6 horas
steps:
# 1. FETCH DATA
- name: fetch_data
command: |
python ~/dagu/scripts/fetch_data.py \
--output /tmp/raw_data.json \
--log-lineage
# 2. VALIDATE
- name: validate_data
command: |
python ~/dagu/scripts/validate.py \
--input /tmp/raw_data.json \
--log-lineage
depends: [fetch_data]
# 3. PUBLISH TO NATS
- name: publish_to_nats
command: |
nats pub data.raw \
"$(cat /tmp/raw_data.json)" \
--server=$NATS_URL
# Log lineage
python ~/dagu/scripts/log_lineage.py \
--event publish \
--source /tmp/raw_data.json \
--target nats://data.raw
depends: [validate_data]
# 4. INGEST TO POSTGRES
- name: ingest_postgres
command: |
python ~/dagu/scripts/ingest_postgres.py \
--nats-stream data.raw \
--table raw_events \
--log-lineage
depends: [publish_to_nats]
# 5. SEND METRICS
- name: log_metrics
command: |
python ~/dagu/scripts/push_metrics.py \
--job data_pipeline_template \
--success true
depends: [ingest_postgres]
handlers:
failure:
- name: alert_failure
command: |
python ~/dagu/scripts/push_metrics.py \
--job data_pipeline_template \
--success false
```
---
## 🎯 Scripts Helper Disponibles
### ✅ `marquez-cli` (RECOMENDADO - Binario Go Nativo)
**CLI oficial** para gestionar lineage en Marquez. Instalado en `~/.local/bin/marquez-cli`.
```bash
# Iniciar run
marquez-cli run start -job my_pipeline -inputs "api://source"
# Marcar progreso
marquez-cli run running -job my_pipeline -run-id <uuid> \
-inputs "file:///tmp/raw.json" -outputs "file:///tmp/clean.json"
# Completar exitosamente
marquez-cli run complete -job my_pipeline -run-id <uuid> \
-outputs "postgres://table"
# Marcar como fallido
marquez-cli run fail -job my_pipeline -run-id <uuid>
# Ver lineage
marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
```
**Documentación completa**: Ver `MARQUEZ_UTILITIES.md`
**Ejemplos de uso**:
- `~/dagu/scripts/examples/simple_pipeline_with_lineage.sh`
- `~/dagu/scripts/examples/etl_to_postgres_with_lineage.sh`
- `~/dagu/dags/example_lineage_tracking.yaml`
---
### Alternativa: `~/dagu/scripts/log_lineage.py` (Python)
Solo si no puedes usar `marquez-cli`:
```python
#!/usr/bin/env python3
import requests
import json
from datetime import datetime
def log_openlineage_event(event_type, source, target, job_name):
"""Envía evento OpenLineage a Marquez"""
event = {
"eventType": event_type, # START, COMPLETE, FAIL
"eventTime": datetime.utcnow().isoformat() + "Z",
"producer": "dagu://pipeline",
"job": {
"namespace": "automatic-process",
"name": job_name
},
"inputs": [{"namespace": "automatic-process", "name": source}],
"outputs": [{"namespace": "automatic-process", "name": target}]
}
requests.post(
"http://localhost:5000/api/v1/lineage",
json=event
)
```
### 2. `~/dagu/scripts/push_metrics.py`
```python
#!/usr/bin/env python3
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
def push_metrics(job_name, success):
"""Push métricas a Prometheus Pushgateway"""
registry = CollectorRegistry()
g = Gauge('job_success', 'Job success status', registry=registry)
g.set(1 if success else 0)
push_to_gateway(
'localhost:9091',
job=job_name,
registry=registry
)
```
### 3. `~/dagu/scripts/publish_to_nats.sh`
```bash
#!/bin/bash
# Publicar a NATS JetStream
nats pub "$1" "$(cat $2)" --server=nats://localhost:4222
```
---
## 🚀 Primeros Pasos
### 1. Instalar CLIs necesarios
```bash
# NATS CLI
curl -sf https://binaries.nats.dev/nats-io/natscli/nats@latest | sh
```
### 2. Crear directorio de scripts
```bash
mkdir -p ~/dagu/scripts
chmod +x ~/dagu/scripts/*.{py,sh}
```
### 3. Configurar variables de entorno
```bash
# Añadir a ~/.bashrc
export MARQUEZ_URL=http://localhost:5000
export NATS_URL=nats://localhost:4222
export POSTGRES_URL=postgresql://postgres:postgres@localhost:5434/postgres
export CLICKHOUSE_URL=http://localhost:8123
```
---
## 📊 MCPs Recomendados (Futuro)
### Prioridad Alta
1. **Grafana MCP** - Automatizar dashboards
2. **PostgreSQL MCP** - Queries complejas y migraciones
3. **ClickHouse MCP** - Queries analíticas
### Prioridad Media
4. **Metabase MCP** - BI self-service
### Prioridad Baja
5. **Rill MCP** - Dashboards modernos
---
## 📝 Checklist para Cada Pipeline
Cuando crees un pipeline, SIEMPRE:
- [ ] Define el schedule en Dagu
- [ ] Log inicio en Marquez (START event)
- [ ] Valida datos antes de procesar
- [ ] Publica a NATS para desacoplar
- [ ] Log cada transformación en Marquez
- [ ] Ingesta a bases de datos
- [ ] Log fin en Marquez (COMPLETE event)
- [ ] Push métricas a Prometheus
- [ ] Envía logs estructurados a Loki
- [ ] Maneja errores (FAIL event a Marquez)
---
## 🔗 URLs de Servicios
- **Dagu**: http://localhost:8090
- **NATS Monitoring**: http://localhost:8222
- **Marquez**: http://localhost:3001
- **Grafana**: http://localhost:3500
- **Prometheus**: http://localhost:9090
- **DBGate**: http://localhost:3300
---
**Última actualización**: 2026-03-23
**Mantenedor**: Claude (Assistant)