diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8933312 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +rill-data/tmp/ diff --git a/CLAUDE.md b/CLAUDE.md index b175ed8..cc25b94 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -48,15 +48,27 @@ Este documento describe qué servicios puedo manipular directamente, cuáles req ``` - ✅ **Marquez DB**: Puerto 5433 (para metadata) -### 4. **Marquez (OpenLineage)** (Medio - API REST) -- ✅ **Capacidad**: Enviar eventos de lineage via API +### 4. **Marquez (OpenLineage)** (Fácil - CLI Nativo) +- ✅ **Capacidad**: Enviar eventos de lineage via CLI `marquez-cli` - ✅ **Uso**: Rastrear origen/destino de datos en cada paso +- ✅ **CLI Nativo**: `marquez-cli` (binario Go sin dependencias) +- **Instalación**: + ```bash + cd ~/AutomaticProyects/automatic_process/tools/marquez-cli + make install + ``` - **Ejemplo**: ```bash - curl -X POST http://localhost:5000/api/v1/lineage \ - -H "Content-Type: application/json" \ - -d @lineage_event.json + # Iniciar un run + marquez-cli run start -job my_pipeline -inputs "api://source" + + # Completar run + marquez-cli run complete -job my_pipeline -run-id <RUN_ID> -outputs "postgres://table" + + # Ver lineage + marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events" ``` +- **Documentación**: Ver `MARQUEZ_UTILITIES.md` para guía completa ### 5. **Logs (Prometheus/Loki)** (Medio - Pushgateway/API) - ✅ **Prometheus**: Exportar métricas vía Pushgateway @@ -210,9 +222,44 @@ handlers: --- -## 🎯 Scripts Helper Necesarios +## 🎯 Scripts Helper Disponibles + +### ✅ `marquez-cli` (RECOMENDADO - Binario Go Nativo) + +**CLI oficial** para gestionar lineage en Marquez. Instalado en `~/.local/bin/marquez-cli`. 
+ +```bash +# Iniciar run +marquez-cli run start -job my_pipeline -inputs "api://source" + +# Marcar progreso +marquez-cli run running -job my_pipeline -run-id <RUN_ID> \ + -inputs "file:///tmp/raw.json" -outputs "file:///tmp/clean.json" + +# Completar exitosamente +marquez-cli run complete -job my_pipeline -run-id <RUN_ID> \ + -outputs "postgres://table" + +# Marcar como fallido +marquez-cli run fail -job my_pipeline -run-id <RUN_ID> + +# Ver lineage +marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events" +``` + +**Documentación completa**: Ver `MARQUEZ_UTILITIES.md` + +**Ejemplos de uso**: +- `~/dagu/scripts/examples/simple_pipeline_with_lineage.sh` +- `~/dagu/scripts/examples/etl_to_postgres_with_lineage.sh` +- `~/dagu/dags/example_lineage_tracking.yaml` + +--- + +### Alternativa: `~/dagu/scripts/log_lineage.py` (Python) + +Solo si no puedes usar `marquez-cli`: -### 1. `~/dagu/scripts/log_lineage.py` ```python #!/usr/bin/env python3 import requests diff --git a/MARQUEZ_UTILITIES.md b/MARQUEZ_UTILITIES.md new file mode 100644 index 0000000..6823c76 --- /dev/null +++ b/MARQUEZ_UTILITIES.md @@ -0,0 +1,409 @@ +# Marquez CLI Utilities + +Utilidades en Go para gestionar **datasets**, **jobs** y **runs** con lineage tracking completo en Marquez (OpenLineage). + +--- + +## 📦 ¿Qué se Creó? + +### 1. **Binario `marquez-cli`** (Go) + +Herramienta CLI completa para interactuar con Marquez: + +- ✅ Registrar y consultar **datasets** +- ✅ Registrar y consultar **jobs** +- ✅ Enviar eventos de **runs** (START, RUNNING, COMPLETE, FAIL) +- ✅ Consultar **lineage** de datasets +- ✅ Listar recursos (namespaces, jobs, datasets) +- ✅ Sin dependencias externas (solo Go stdlib) +- ✅ Binario estático de ~5MB + +**Ubicación**: `~/AutomaticProyects/automatic_process/tools/marquez-cli/` + +**Instalación**: +```bash +cd ~/AutomaticProyects/automatic_process/tools/marquez-cli +make install +``` + +**Uso**: +```bash +marquez-cli help +marquez-cli version +``` + +--- + +### 2. 
**Scripts de Ejemplo** + +#### a) `simple_pipeline_with_lineage.sh` + +Pipeline simple que demuestra: +- Generación de Run ID +- Eventos START, RUNNING, COMPLETE +- Tracking de transformaciones +- Manejo de errores + +**Ubicación**: `~/dagu/scripts/examples/simple_pipeline_with_lineage.sh` + +**Uso**: +```bash +~/dagu/scripts/examples/simple_pipeline_with_lineage.sh +``` + +#### b) `etl_to_postgres_with_lineage.sh` + +ETL completo con carga a PostgreSQL: +- Extract desde API +- Transform con jq +- Load a PostgreSQL +- Lineage completo del flujo + +**Ubicación**: `~/dagu/scripts/examples/etl_to_postgres_with_lineage.sh` + +**Uso**: +```bash +~/dagu/scripts/examples/etl_to_postgres_with_lineage.sh +``` + +--- + +### 3. **DAG de Ejemplo para Dagu** + +DAG completo con lineage tracking integrado: +- Generación de Run ID único +- Eventos en cada paso del pipeline +- Handler de errores (FAIL event) +- Handler de éxito +- Cleanup de archivos temporales + +**Ubicación**: `~/dagu/dags/example_lineage_tracking.yaml` + +**Uso**: +```bash +# Ejecutar manualmente desde Dagu UI +http://localhost:8090 + +# O desde CLI +dagu start example_lineage_tracking +``` + +--- + +## 🎯 Conceptos Clave de Marquez + +### Datasets + +Representan **fuentes de datos** (tablas, archivos, APIs, streams). + +**Naming Convention**: +``` +postgres://host:port/db/schema/table +clickhouse://host:port/database/table +nats://host:port/subject +file:///absolute/path +api://domain/endpoint +``` + +**Comandos**: +```bash +# Registrar dataset +marquez-cli dataset register -name "postgres://localhost:5434/postgres/public/events" + +# Listar datasets +marquez-cli list datasets +``` + +--- + +### Jobs + +Representan **procesos/pipelines** que leen/escriben datasets. 
+ +**Naming Convention**: +- Usa nombres descriptivos: `fetch_api_data`, `transform_sales` +- Evita guiones, usa guiones bajos +- Mismo nombre que el DAG en Dagu + +**Comandos**: +```bash +# Registrar job +marquez-cli job register -name my_pipeline + +# Listar jobs +marquez-cli list jobs + +# Ver runs de un job +marquez-cli job runs -name my_pipeline +``` + +--- + +### Runs + +Representan **ejecuciones** de un job con inputs/outputs específicos. + +**Lifecycle**: +1. **START** - Inicio de ejecución +2. **RUNNING** - Progreso (opcional, múltiples) +3. **COMPLETE** o **FAIL** - Finalización + +**Comandos**: +```bash +RUN_ID=$(uuidgen) + +# START +marquez-cli run start -job my_job -run-id $RUN_ID + +# RUNNING (progreso intermedio) +marquez-cli run running -job my_job -run-id $RUN_ID \ + -inputs "file:///tmp/raw.json" \ + -outputs "file:///tmp/clean.json" + +# COMPLETE +marquez-cli run complete -job my_job -run-id $RUN_ID \ + -outputs "postgres://table" + +# FAIL (en caso de error) +marquez-cli run fail -job my_job -run-id $RUN_ID +``` + +--- + +## 📋 Reglas de Lineage + +### ✅ SIEMPRE debes: + +1. **Enviar evento START** al inicio del pipeline +2. **Usar el mismo Run ID** en todos los eventos del mismo run +3. **Declarar TODOS los inputs** (APIs, archivos, tablas que lees) +4. **Declarar TODOS los outputs** (archivos, streams, tablas que escribes) +5. **Enviar evento COMPLETE** al finalizar exitosamente +6. **Enviar evento FAIL** si hay errores (usar trap/handler) +7. **Usar URIs bien formados** para datasets +8. **Usar namespace consistente** (`automatic-process`) + +### ❌ NUNCA debes: + +1. Omitir el evento START +2. Olvidar el evento COMPLETE/FAIL +3. Cambiar el Run ID en medio del pipeline +4. Usar nombres ambiguos para datasets +5. 
Saltarte el lineage en pipelines críticos + +--- + +## 🔍 Recuperar Lineage + +### Método 1: CLI + +```bash +# Ver lineage de un dataset +marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events" + +# Formato JSON (para scripts) +marquez-cli lineage -name "postgres://table" -format json + +# Con más profundidad +marquez-cli lineage -name "postgres://table" -depth 20 +``` + +### Método 2: Web UI + +```bash +# Abrir Marquez Web UI +xdg-open http://localhost:3001 + +# Buscar por job o dataset +# Ver grafo visual de lineage +``` + +### Método 3: API REST + +```bash +# Ver lineage directo desde API +curl "http://localhost:5000/api/v1/lineage?nodeId=dataset:automatic-process:postgres://table&depth=10" | jq . + +# Listar jobs +curl http://localhost:5000/api/v1/namespaces/automatic-process/jobs | jq . + +# Ver runs de un job +curl http://localhost:5000/api/v1/namespaces/automatic-process/jobs/my_job/runs | jq . +``` + +--- + +## 🚀 Quick Start + +### 1. Instalar marquez-cli + +```bash +cd ~/AutomaticProyects/automatic_process/tools/marquez-cli +make install +marquez-cli version +``` + +### 2. Probar con Ejemplo + +```bash +# Ejecutar pipeline de ejemplo +~/dagu/scripts/examples/simple_pipeline_with_lineage.sh + +# Ver lineage generado +marquez-cli lineage -name "file:///tmp/users_clean.json" +``` + +### 3. Crear tu Primer Pipeline + +```bash +#!/bin/bash +set -euo pipefail + +JOB_NAME="my_first_pipeline" +RUN_ID=$(uuidgen) + +# Manejo de errores +cleanup() { + marquez-cli run fail -job $JOB_NAME -run-id $RUN_ID +} +trap cleanup ERR + +# START +marquez-cli run start -job $JOB_NAME -run-id $RUN_ID + +# Tu trabajo aquí +echo "Doing work..." +curl https://api.example.com/data > /tmp/data.json + +# COMPLETE +marquez-cli run complete \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -inputs "api://example.com/data" \ + -outputs "file:///tmp/data.json" + +echo "✓ Pipeline completed!" 
+``` + +--- + +## 📊 Estructura de Archivos + +``` +~/AutomaticProyects/automatic_process/ +├── tools/ +│ └── marquez-cli/ # Código fuente del CLI +│ ├── main.go # CLI principal +│ ├── openlineage.go # Cliente API +│ ├── go.mod # Módulo Go +│ ├── Makefile # Build automation +│ ├── README.md # Documentación completa +│ ├── QUICKSTART.md # Guía rápida +│ └── marquez-cli # Binario compilado +│ +├── MARQUEZ_UTILITIES.md # Este archivo +│ +~/dagu/ +├── scripts/ +│ └── examples/ +│ ├── simple_pipeline_with_lineage.sh +│ └── etl_to_postgres_with_lineage.sh +│ +└── dags/ + └── example_lineage_tracking.yaml +``` + +--- + +## 🔧 Comandos Útiles + +### Gestión de Runs + +```bash +# Iniciar run con inputs/outputs +marquez-cli run start -job my_job -inputs "api://source" -outputs "file:///tmp/data" + +# Marcar progreso +marquez-cli run running -job my_job -run-id <RUN_ID> -inputs "file:///a" -outputs "file:///b" + +# Completar exitosamente +marquez-cli run complete -job my_job -run-id <RUN_ID> -outputs "postgres://table" + +# Marcar como fallido +marquez-cli run fail -job my_job -run-id <RUN_ID> +``` + +### Consultas + +```bash +# Ver todos los jobs +marquez-cli list jobs + +# Ver runs de un job +marquez-cli job runs -name my_job + +# Ver lineage completo +marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events" + +# Ver datasets +marquez-cli list datasets +``` + +--- + +## 📚 Documentación Adicional + +- **README Completo**: `~/AutomaticProyects/automatic_process/tools/marquez-cli/README.md` +- **Quick Start**: `~/AutomaticProyects/automatic_process/tools/marquez-cli/QUICKSTART.md` +- **CLAUDE.md**: Guía de servicios manipulables +- **OpenLineage Spec**: https://openlineage.io/ +- **Marquez Docs**: https://marquezproject.ai/ + +--- + +## 🎯 Próximos Pasos + +1. **Ejecuta el ejemplo**: + ```bash + ~/dagu/scripts/examples/simple_pipeline_with_lineage.sh + ``` + +2. **Verifica el lineage**: + ```bash + marquez-cli lineage -name "file:///tmp/users_clean.json" + ``` + +3. 
**Adapta el patrón** a tus propios pipelines + +4. **Siempre sigue las reglas** de lineage tracking + +--- + +## 💡 Tips + +- **Usa variables de entorno** para configuración: + ```bash + export MARQUEZ_URL="http://localhost:5000" + export MARQUEZ_NAMESPACE="automatic-process" + ``` + +- **Genera Run ID una sola vez** y reutilízalo en todo el pipeline: + ```bash + RUN_ID=$(uuidgen) + ``` + +- **Usa trap para manejar errores** automáticamente: + ```bash + trap 'marquez-cli run fail -job $JOB_NAME -run-id $RUN_ID' ERR + ``` + +- **Verifica lineage después** de cada ejecución: + ```bash + marquez-cli lineage -name "postgres://table" + ``` + +--- + +**Última actualización**: 2026-03-23 +**Versión**: 1.0.0 +**Autor**: Lucas (@egutierrez) diff --git a/VISUALIZATION_SETUP.md b/VISUALIZATION_SETUP.md new file mode 100644 index 0000000..df4485e --- /dev/null +++ b/VISUALIZATION_SETUP.md @@ -0,0 +1,305 @@ +# 📊 Configuración de Herramientas de Visualización + +## ✅ Estado de Configuración + +Todas las herramientas de visualización están conectadas a PostgreSQL y ClickHouse: + +- ✅ **Grafana** - Datasources configurados via provisioning YAML +- ✅ **Metabase** - Configuración automática via API (sin UI) +- ✅ **Rill** - Sources configurados en rill.yaml + +--- + +## 🔑 Credenciales de Acceso + +### Grafana +- **URL**: http://localhost:3500 +- **Usuario**: `admin` +- **Contraseña**: `admin123` +- **Datasources configurados**: + - PostgreSQL (postgres-main:5432) + - ClickHouse (clickhouse:8123) + - Prometheus + - Loki + - Tempo + +### Metabase +- **URL**: http://localhost:3200 +- **Usuario**: `admin@example.com` +- **Contraseña**: `Admin123!@#` +- **Datasources configurados**: + - PostgreSQL Main (postgres-main:5432) + - ClickHouse Analytics (clickhouse:8123) + - Sample Database (H2 - demo) + +### Rill +- **URL**: http://localhost:9009 +- **Autenticación**: No requiere +- **Sources configurados**: + - `postgres_main` (postgres-main:5432) + - `clickhouse_main` (clickhouse:9000) + +--- 
+ +## 🚀 Configuración Automática de Metabase + +El script `configure_metabase.py` configura automáticamente Metabase: + +### Primera vez (setup inicial): +```bash +cd /home/lucas/dagu/scripts +./configure_metabase.py +``` + +Este script: +1. ✅ Detecta si Metabase necesita setup inicial +2. ✅ Crea usuario admin automáticamente +3. ✅ Configura datasources de PostgreSQL y ClickHouse +4. ✅ Todo sin interacción manual + +### Si reseteas Metabase: +```bash +# Eliminar datos de Metabase +docker-compose -f docker-compose-analytics.yml down metabase metabase-db +docker volume rm automatic_process_metabase-data + +# Reiniciar y configurar automáticamente +docker-compose -f docker-compose-analytics.yml up -d metabase-db metabase +sleep 50 # Esperar a que inicie +./configure_metabase.py +``` + +--- + +## 🔧 Archivos de Configuración + +### Grafana +**Ubicación**: `/home/lucas/DataProyects/suite_logs/config/grafana/provisioning/datasources/datasources.yml` + +Datasources PostgreSQL y ClickHouse añadidos automáticamente al desplegar Grafana. + +### Rill +**Ubicación**: `/home/lucas/AutomaticProyects/automatic_process/rill-data/rill.yaml` + +```yaml +sources: + - name: postgres_main + type: sql + connector: postgres + settings: + host: postgres-main + port: 5432 + database: postgres + user: postgres + password: postgres + ssl_mode: disable + raw_sql: true + + - name: clickhouse_main + type: sql + connector: clickhouse + settings: + host: clickhouse + port: 9000 + database: default + user: default + password: clickhouse + ssl: false +``` + +--- + +## ✅ Verificación de Conectividad + +### Script de Verificación +```bash +/home/lucas/dagu/scripts/test_db_connections.sh +``` + +Verifica: +- ✓ PostgreSQL (localhost:5434) +- ✓ ClickHouse (localhost:8123) +- ✓ Grafana (localhost:3500) +- ✓ Metabase (localhost:3200) +- ✓ Rill (localhost:9009) + +### Verificación Manual + +#### Grafana +1. Login en http://localhost:3500 +2. Ir a **Connections** > **Data sources** +3. 
Verificar datasources "PostgreSQL" y "ClickHouse" +4. Hacer clic en cada uno y presionar **Test** (debe mostrar ✓) + +#### Metabase +1. Login en http://localhost:3200 +2. Ir a **Admin Settings** > **Databases** +3. Verificar "PostgreSQL Main" y "ClickHouse Analytics" +4. Estado debe mostrar "Connected" + +#### Rill +1. Abrir http://localhost:9009 +2. Ver pestaña **Sources** +3. Verificar `postgres_main` y `clickhouse_main` +4. Ejecutar query de prueba: `SELECT 1` + +--- + +## 📊 Bases de Datos Disponibles + +### PostgreSQL +- **Host interno**: `postgres-main:5432` +- **Host externo**: `localhost:5434` +- **Usuario**: `postgres` +- **Contraseña**: `postgres` +- **Base de datos**: `postgres` +- **Estado**: Vacía (sin tablas) + +### ClickHouse +- **Host interno**: `clickhouse:9000` (native), `clickhouse:8123` (HTTP) +- **Host externo**: `localhost:9000`, `localhost:8123` +- **Usuario**: `default` +- **Contraseña**: `clickhouse` +- **Base de datos**: `default` +- **Estado**: Vacía (sin tablas) + +--- + +## 🎯 Próximos Pasos + +### 1. 
Crear Tablas de Ejemplo (Opcional) + +#### PostgreSQL +```sql +-- Conectar vía psql +PGPASSWORD=postgres psql -h localhost -p 5434 -U postgres -d postgres + +-- Crear tabla de eventos +CREATE TABLE events ( + id SERIAL PRIMARY KEY, + timestamp TIMESTAMP DEFAULT NOW(), + event_type VARCHAR(100), + user_id INTEGER, + data JSONB +); + +-- Insertar datos de prueba +INSERT INTO events (event_type, user_id, data) +VALUES + ('login', 1, '{"ip": "192.168.1.1"}'), + ('purchase', 1, '{"product": "laptop", "amount": 1200}'), + ('logout', 1, '{"duration": 3600}'); +``` + +#### ClickHouse +```sql +-- Conectar vía clickhouse-client +clickhouse-client --host localhost --port 9000 --user default --password clickhouse + +-- Crear tabla de métricas +CREATE TABLE metrics ( + timestamp DateTime, + metric_name String, + metric_value Float64, + tags Map(String, String) +) ENGINE = MergeTree() +ORDER BY timestamp; + +-- Insertar datos de prueba +INSERT INTO metrics VALUES + (now(), 'cpu_usage', 45.2, {'host': 'server1'}), + (now(), 'memory_usage', 72.8, {'host': 'server1'}), + (now(), 'disk_usage', 58.3, {'host': 'server1'}); +``` + +### 2. Crear Dashboards de Ejemplo + +#### En Grafana +1. New Dashboard → Add visualization +2. Seleccionar datasource "PostgreSQL" +3. Query: `SELECT * FROM events ORDER BY timestamp DESC LIMIT 10` +4. Visualizar como tabla o gráfico + +#### En Metabase +1. New → Question +2. Seleccionar "PostgreSQL Main" +3. Simple Question → Pick Table → events +4. Guardar y añadir a dashboard + +#### En Rill +1. Crear modelo en `rill-data/rill.yaml`: +```yaml +models: + - name: recent_events + sql: | + SELECT * FROM events + ORDER BY timestamp DESC + LIMIT 100 + source: postgres_main +``` +2. Restart Rill: `docker-compose -f docker-compose-analytics.yml restart rill` + +### 3. 
Configurar Pipelines con Dagu + +Ver `TRANSFORMATIONS.md` para ejemplos de DAGs que: +- Extraigan datos de APIs +- Transformen datos +- Carguen a PostgreSQL/ClickHouse +- Generen visualizaciones automáticas + +--- + +## 🔧 Troubleshooting + +### Metabase no se configura +```bash +# Verificar logs +docker logs metabase + +# Resetear y reconfigurar +docker-compose -f docker-compose-analytics.yml down metabase metabase-db +docker volume rm automatic_process_metabase-data +docker-compose -f docker-compose-analytics.yml up -d metabase-db metabase +sleep 50 +/home/lucas/dagu/scripts/configure_metabase.py +``` + +### Grafana no ve las bases de datos +```bash +# Verificar que Grafana está en ambas redes +docker inspect grafana | grep -A 10 Networks + +# Debe mostrar: suite-logs_monitoring y automatic_process_default + +# Probar DNS desde Grafana +docker exec grafana nslookup postgres-main +docker exec grafana nslookup clickhouse +``` + +### Rill no carga sources +```bash +# Verificar configuración +cat /home/lucas/AutomaticProyects/automatic_process/rill-data/rill.yaml + +# Revisar logs +docker logs rill + +# Reiniciar +docker-compose -f docker-compose-analytics.yml restart rill +``` + +--- + +## 📚 Referencias + +- **Grafana Datasources**: http://localhost:3500/connections/datasources +- **Metabase Admin**: http://localhost:3200/admin/databases +- **Rill Dashboard**: http://localhost:9009 +- **DBGate** (DB Manager): http://localhost:3300 +- **Marquez** (Lineage): http://localhost:3001 +- **Homer** (Dashboard Hub): http://localhost:8080 + +--- + +**Última actualización**: 2026-03-23 +**Configuración**: Automática via scripts diff --git a/rill-data/rill.yaml b/rill-data/rill.yaml index 469f266..f5b8b2e 100755 --- a/rill-data/rill.yaml +++ b/rill-data/rill.yaml @@ -1,2 +1,25 @@ -sources: [] +sources: + - name: postgres_main + type: sql + connector: postgres + settings: + host: postgres-main + port: 5432 + database: postgres + user: postgres + password: postgres + ssl_mode: 
disable + raw_sql: true + + - name: clickhouse_main + type: sql + connector: clickhouse + settings: + host: clickhouse + port: 9000 + database: default + user: default + password: clickhouse + ssl: false + models: [] diff --git a/tools/marquez-cli/Makefile b/tools/marquez-cli/Makefile new file mode 100644 index 0000000..c5dd1ac --- /dev/null +++ b/tools/marquez-cli/Makefile @@ -0,0 +1,49 @@ +# Makefile for marquez-cli + +BINARY_NAME=marquez-cli +INSTALL_PATH=$(HOME)/.local/bin +GO=go +GOFLAGS=-ldflags="-s -w" + +.PHONY: all build install clean test help + +all: build + +## build: Build the binary +build: + @echo "Building $(BINARY_NAME)..." + @$(GO) build $(GOFLAGS) -o $(BINARY_NAME) . + @echo "✓ Binary built: ./$(BINARY_NAME)" + +## install: Build and install to ~/.local/bin +install: build + @echo "Installing $(BINARY_NAME) to $(INSTALL_PATH)..." + @mkdir -p $(INSTALL_PATH) + @cp $(BINARY_NAME) $(INSTALL_PATH)/ + @chmod +x $(INSTALL_PATH)/$(BINARY_NAME) + @echo "✓ Installed to $(INSTALL_PATH)/$(BINARY_NAME)" + @echo "" + @echo "Make sure $(INSTALL_PATH) is in your PATH:" + @echo " export PATH=\"\$$PATH:$(INSTALL_PATH)\"" + +## clean: Remove built binaries +clean: + @echo "Cleaning..." + @rm -f $(BINARY_NAME) + @echo "✓ Cleaned" + +## test: Run tests +test: + @echo "Running tests..." + @$(GO) test -v ./... + +## uninstall: Remove installed binary +uninstall: + @echo "Uninstalling $(BINARY_NAME)..." + @rm -f $(INSTALL_PATH)/$(BINARY_NAME) + @echo "✓ Uninstalled" + +## help: Show this help +help: + @echo "Available targets:" + @sed -n 's/^##//p' Makefile | column -t -s ':' | sed -e 's/^/ /' diff --git a/tools/marquez-cli/QUICKSTART.md b/tools/marquez-cli/QUICKSTART.md new file mode 100644 index 0000000..f4ace6f --- /dev/null +++ b/tools/marquez-cli/QUICKSTART.md @@ -0,0 +1,213 @@ +# marquez-cli - Quick Start Guide + +Guía rápida para empezar a usar `marquez-cli` en tus pipelines. 
+ +--- + +## ⚡ Instalación Rápida + +```bash +# Compilar e instalar +cd ~/AutomaticProyects/automatic_process/tools/marquez-cli +make install + +# Verificar +marquez-cli version +``` + +--- + +## 🎯 Uso Básico + +### 1. Flujo Completo en un Script + +```bash +#!/bin/bash +JOB_NAME="my_pipeline" +RUN_ID=$(uuidgen) + +# START +marquez-cli run start -job $JOB_NAME -run-id $RUN_ID + +# Hacer trabajo... +curl https://api.example.com/data > /tmp/data.json + +# COMPLETE +marquez-cli run complete \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -inputs "api://example.com/data" \ + -outputs "file:///tmp/data.json" +``` + +### 2. Con Manejo de Errores + +```bash +#!/bin/bash +set -euo pipefail + +JOB_NAME="my_pipeline" +RUN_ID=$(uuidgen) + +cleanup() { + marquez-cli run fail -job $JOB_NAME -run-id $RUN_ID +} +trap cleanup ERR + +marquez-cli run start -job $JOB_NAME -run-id $RUN_ID + +# Tu trabajo aquí... + +marquez-cli run complete -job $JOB_NAME -run-id $RUN_ID +``` + +### 3. Pipeline Multi-Paso + +```bash +JOB_NAME="etl_pipeline" +RUN_ID=$(uuidgen) + +# START +marquez-cli run start -job $JOB_NAME -run-id $RUN_ID + +# EXTRACT +curl https://api.example.com/data > /tmp/raw.json +marquez-cli run running \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -inputs "api://example.com/data" \ + -outputs "file:///tmp/raw.json" + +# TRANSFORM +jq '.data' /tmp/raw.json > /tmp/clean.json +marquez-cli run running \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -inputs "file:///tmp/raw.json" \ + -outputs "file:///tmp/clean.json" + +# LOAD +psql ... 
-c "COPY table FROM '/tmp/clean.json'" +marquez-cli run complete \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -inputs "file:///tmp/clean.json" \ + -outputs "postgres://localhost:5434/postgres/public/table" +``` + +--- + +## 📊 Consultar Lineage + +```bash +# Ver datasets +marquez-cli list datasets + +# Ver jobs +marquez-cli list jobs + +# Ver lineage de un dataset +marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events" + +# Ver runs de un job +marquez-cli job runs -name my_pipeline +``` + +--- + +## 🔧 Integración con Dagu + +Ver DAG de ejemplo: `~/dagu/dags/example_lineage_tracking.yaml` + +Patrón básico: + +```yaml +env: + - RUN_ID: "" + +steps: + - name: init + command: echo "RUN_ID=$(uuidgen)" >> $DAGU_ENV + + - name: start + command: marquez-cli run start -job $JOB_NAME -run-id $RUN_ID + depends: [init] + + - name: work + command: # tu trabajo aquí + depends: [start] + + - name: complete + command: marquez-cli run complete -job $JOB_NAME -run-id $RUN_ID + depends: [work] + +handlers: + failure: + - command: marquez-cli run fail -job $JOB_NAME -run-id $RUN_ID +``` + +--- + +## 🧪 Probar con Ejemplo + +```bash +# Ejecutar script de ejemplo +~/dagu/scripts/examples/simple_pipeline_with_lineage.sh + +# Ver lineage generado +marquez-cli lineage -name "file:///tmp/users_clean.json" + +# O abrir en navegador +xdg-open http://localhost:3001 +``` + +--- + +## 📋 Comandos Más Usados + +| Comando | Descripción | +|---------|-------------| +| `run start` | Iniciar un run | +| `run complete` | Completar exitosamente | +| `run fail` | Marcar como fallido | +| `run running` | Marcar progreso (intermedio) | +| `lineage` | Ver lineage de dataset | +| `list jobs` | Listar todos los jobs | +| `job runs` | Ver runs de un job | + +--- + +## 🔍 URIs de Datasets + +| Tipo | Formato | +|------|---------| +| PostgreSQL | `postgres://host:port/db/schema/table` | +| ClickHouse | `clickhouse://host:port/database/table` | +| NATS | `nats://host:port/subject` | +| Archivo | 
`file:///absolute/path` | +| API | `api://domain/endpoint` | + +--- + +## ✅ Checklist + +Cada pipeline debe: + +- [ ] Enviar evento START al inicio +- [ ] Enviar eventos RUNNING en transformaciones intermedias +- [ ] Enviar evento COMPLETE al finalizar exitosamente +- [ ] Enviar evento FAIL si hay errores (handler) +- [ ] Usar el mismo run-id en todos los eventos +- [ ] Declarar todos los inputs/outputs + +--- + +## 📚 Más Información + +- [README completo](./README.md) +- [Documentación de OpenLineage](https://openlineage.io/) +- [Marquez Web UI](http://localhost:3001) + +--- + +**Tip**: Usa `marquez-cli help` para ver todos los comandos disponibles. diff --git a/tools/marquez-cli/README.md b/tools/marquez-cli/README.md new file mode 100644 index 0000000..7cbd561 --- /dev/null +++ b/tools/marquez-cli/README.md @@ -0,0 +1,645 @@ +# marquez-cli + +**OpenLineage/Marquez CLI tool** para gestionar datasets, jobs y runs con lineage tracking completo. + +Binario escrito en Go sin dependencias externas, listo para usar en pipelines de Dagu y scripts bash. + +--- + +## 🎯 Características + +- ✅ **Registrar datasets** en Marquez +- ✅ **Registrar jobs** y sus runs +- ✅ **Enviar eventos OpenLineage** (START, RUNNING, COMPLETE, FAIL) +- ✅ **Consultar lineage** de datasets +- ✅ **Listar** namespaces, jobs, datasets y runs +- ✅ **Sin dependencias** externas (solo Go stdlib) +- ✅ **Binario estático** compilado (~5MB) + +--- + +## 📦 Instalación + +### Desde el Código Fuente + +```bash +cd ~/AutomaticProyects/automatic_process/tools/marquez-cli + +# Compilar +make build + +# Instalar en ~/.local/bin +make install + +# Verificar instalación +marquez-cli version +``` + +### Variables de Entorno (Opcional) + +```bash +export MARQUEZ_URL="http://localhost:5000" +export MARQUEZ_NAMESPACE="automatic-process" +``` + +--- + +## 🚀 Uso + +### 1. 
Gestión de Runs + +#### Iniciar un Run + +```bash +# Sintaxis básica +marquez-cli run start -job my_pipeline + +# Con inputs y outputs +marquez-cli run start \ + -job fetch_api_data \ + -inputs "api://jsonplaceholder.typicode.com/users" \ + -outputs "file:///tmp/users.json" + +# Con run-id específico (para continuar después) +marquez-cli run start \ + -job my_pipeline \ + -run-id "abc123-uuid-here" \ + -inputs "file:///tmp/raw.json" +``` + +**Salida:** +``` +✓ Run event sent successfully + Event Type: START + Job: automatic-process/fetch_api_data + Run ID: 3c7a4f21-1234-5678-90ab-cdef12345678 + Inputs: 1 dataset(s) + Outputs: 1 dataset(s) +``` + +#### Marcar Run como RUNNING (Progreso) + +```bash +marquez-cli run running \ + -job my_pipeline \ + -run-id "abc123-uuid-here" \ + -inputs "file:///tmp/raw.json" \ + -outputs "file:///tmp/processed.json" +``` + +#### Completar un Run Exitosamente + +```bash +marquez-cli run complete \ + -job my_pipeline \ + -run-id "abc123-uuid-here" \ + -inputs "file:///tmp/raw.json" \ + -outputs "postgres://localhost:5434/postgres/public/events" +``` + +#### Marcar Run como Fallido + +```bash +marquez-cli run fail \ + -job my_pipeline \ + -run-id "abc123-uuid-here" +``` + +--- + +### 2. 
Gestión de Datasets + +#### Registrar un Dataset + +```bash +# PostgreSQL table +marquez-cli dataset register \ + -name "postgres://localhost:5434/postgres/public/events" + +# ClickHouse table +marquez-cli dataset register \ + -name "clickhouse://localhost:8123/default/analytics" + +# NATS stream +marquez-cli dataset register \ + -name "nats://localhost:4222/data.raw" + +# Archivo +marquez-cli dataset register \ + -name "file:///tmp/data.json" + +# API endpoint +marquez-cli dataset register \ + -name "api://example.com/users" +``` + +#### Listar Datasets + +```bash +# En el namespace por defecto (automatic-process) +marquez-cli dataset get + +# En un namespace específico +marquez-cli dataset get -namespace my-namespace +``` + +**Salida:** +``` +Datasets in namespace 'automatic-process': + + • postgres://localhost:5434/postgres/public/events [DB_TABLE] + • file:///tmp/users.json [FILE] + • api://example.com/users [API] +``` + +--- + +### 3. Gestión de Jobs + +#### Registrar un Job + +```bash +marquez-cli job register -name my_pipeline +``` + +#### Listar Jobs + +```bash +# En el namespace por defecto +marquez-cli job get + +# En un namespace específico +marquez-cli job get -namespace my-namespace +``` + +#### Ver Runs de un Job + +```bash +marquez-cli job runs -name my_pipeline +``` + +**Salida:** +``` +Runs for job 'automatic-process/my_pipeline': + + • 3c7a4f21-1234-5678-90ab-cdef12345678 [COMPLETED] - 2026-03-23T10:30:00.000Z + • 7b8c9d0e-5678-1234-90ab-cdef12345678 [FAILED] - 2026-03-23T09:15:00.000Z + • 1a2b3c4d-9012-3456-78ab-cdef12345678 [RUNNING] - 2026-03-23T11:00:00.000Z +``` + +--- + +### 4. 
Consultar Lineage + +#### Obtener Lineage de un Dataset + +```bash +# Formato texto (legible) +marquez-cli lineage \ + -name "postgres://localhost:5434/postgres/public/events" + +# Formato JSON (para scripts) +marquez-cli lineage \ + -name "postgres://localhost:5434/postgres/public/events" \ + -format json + +# Con profundidad personalizada +marquez-cli lineage \ + -name "postgres://localhost:5434/postgres/public/events" \ + -depth 20 +``` + +**Salida (formato texto):** +``` +Lineage for dataset 'automatic-process/postgres://localhost:5434/postgres/public/events': + +📦 Datasets (4): + • api://jsonplaceholder.typicode.com/users + • file:///tmp/users.json + • file:///tmp/emails.json + • postgres://localhost:5434/postgres/public/user_emails + +⚙️ Jobs (3): + • fetch_api + ← Inputs: + - api://jsonplaceholder.typicode.com/users + → Outputs: + - file:///tmp/users.json + + • transform + ← Inputs: + - file:///tmp/users.json + → Outputs: + - file:///tmp/emails.json + + • ingest_postgres + ← Inputs: + - file:///tmp/emails.json + → Outputs: + - postgres://localhost:5434/postgres/public/user_emails +``` + +--- + +### 5. 
Listar Recursos + +#### Listar Namespaces + +```bash +marquez-cli list namespaces +``` + +#### Listar Jobs + +```bash +marquez-cli list jobs +marquez-cli list jobs -namespace my-namespace +``` + +#### Listar Datasets + +```bash +marquez-cli list datasets +marquez-cli list datasets -namespace my-namespace +``` + +--- + +## 🔧 Integración con Dagu + +### Ejemplo: DAG con Lineage Tracking + +```yaml +# ~/dagu/dags/example_with_lineage.yaml +name: example_with_lineage +description: Pipeline con lineage tracking usando marquez-cli + +schedule: + - "0 */6 * * *" + +env: + - RUN_ID: "" # Se genera dinámicamente + +steps: + # PASO 1: Generar Run ID único + - name: generate_run_id + command: echo "RUN_ID=$(uuidgen)" >> $DAGU_ENV + output: RUN_ID + + # PASO 2: START event + - name: start_run + command: | + marquez-cli run start \ + -job example_with_lineage \ + -run-id $RUN_ID \ + -inputs "api://jsonplaceholder.typicode.com/users" + depends: [generate_run_id] + + # PASO 3: Fetch data + - name: fetch_data + command: | + curl -s https://jsonplaceholder.typicode.com/users > /tmp/users.json + + marquez-cli run running \ + -job example_with_lineage \ + -run-id $RUN_ID \ + -inputs "api://jsonplaceholder.typicode.com/users" \ + -outputs "file:///tmp/users.json" + depends: [start_run] + + # PASO 4: Transform data + - name: transform_data + command: | + jq '[.[] | {email: .email, name: .name}]' /tmp/users.json > /tmp/emails.json + + marquez-cli run running \ + -job example_with_lineage \ + -run-id $RUN_ID \ + -inputs "file:///tmp/users.json" \ + -outputs "file:///tmp/emails.json" + depends: [fetch_data] + + # PASO 5: Load to PostgreSQL + - name: load_postgres + command: | + psql -h localhost -p 5434 -U postgres -d postgres -c \ + "CREATE TABLE IF NOT EXISTS user_emails (email TEXT, name TEXT);" + + cat /tmp/emails.json | jq -r '.[] | [.email, .name] | @csv' | \ + psql -h localhost -p 5434 -U postgres -d postgres -c \ + "COPY user_emails FROM STDIN WITH CSV;" + + marquez-cli run 
running \ + -job example_with_lineage \ + -run-id $RUN_ID \ + -inputs "file:///tmp/emails.json" \ + -outputs "postgres://localhost:5434/postgres/public/user_emails" + depends: [transform_data] + + # PASO 6: COMPLETE event + - name: complete_run + command: | + marquez-cli run complete \ + -job example_with_lineage \ + -run-id $RUN_ID \ + -inputs "api://jsonplaceholder.typicode.com/users" \ + -outputs "postgres://localhost:5434/postgres/public/user_emails" + depends: [load_postgres] + +handlers: + failure: + - name: fail_run + command: | + marquez-cli run fail \ + -job example_with_lineage \ + -run-id $RUN_ID +``` + +--- + +## 🔁 Workflow Típico + +### 1. Pipeline Simple (START → COMPLETE) + +```bash +#!/bin/bash +# Script: ~/dagu/scripts/simple_pipeline.sh + +NAMESPACE="automatic-process" +JOB_NAME="simple_pipeline" +RUN_ID=$(marquez-cli run start -job $JOB_NAME -inputs "api://source" | grep "Run ID" | awk '{print $NF}') + +echo "Started run: $RUN_ID" + +# Hacer el trabajo +curl -s https://api.example.com/data > /tmp/data.json + +# Completar +marquez-cli run complete \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -inputs "api://source" \ + -outputs "file:///tmp/data.json" + +echo "Run completed: $RUN_ID" +``` + +### 2. Pipeline con Manejo de Errores + +```bash +#!/bin/bash +# Script: ~/dagu/scripts/pipeline_with_error_handling.sh + +set -euo pipefail + +JOB_NAME="pipeline_with_errors" +RUN_ID=$(uuidgen) + +# Función de cleanup en caso de error +cleanup() { + marquez-cli run fail -job $JOB_NAME -run-id $RUN_ID + echo "Pipeline failed, run marked as FAILED" +} + +trap cleanup ERR + +# START +marquez-cli run start -job $JOB_NAME -run-id $RUN_ID + +# Trabajo +echo "Processing..." +# ... tu lógica aquí ... + +# COMPLETE (solo si todo fue exitoso) +marquez-cli run complete \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -outputs "postgres://table" + +echo "Pipeline completed successfully" +``` + +### 3. 
Pipeline Multi-Paso + +```bash +#!/bin/bash + +JOB_NAME="multi_step_pipeline" +RUN_ID=$(uuidgen) + +# START +marquez-cli run start -job $JOB_NAME -run-id $RUN_ID + +# PASO 1: Extract +echo "Extracting..." +curl -s https://api.example.com/data > /tmp/raw.json +marquez-cli run running \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -inputs "api://example.com/data" \ + -outputs "file:///tmp/raw.json" + +# PASO 2: Transform +echo "Transforming..." +jq '.data' /tmp/raw.json > /tmp/clean.json +marquez-cli run running \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -inputs "file:///tmp/raw.json" \ + -outputs "file:///tmp/clean.json" + +# PASO 3: Load +echo "Loading..." +psql -h localhost -p 5434 -U postgres -d postgres \ + -c "COPY events FROM '/tmp/clean.json';" +marquez-cli run complete \ + -job $JOB_NAME \ + -run-id $RUN_ID \ + -inputs "file:///tmp/clean.json" \ + -outputs "postgres://localhost:5434/postgres/public/events" + +echo "Pipeline completed" +``` + +--- + +## 📋 Convenciones de Naming + +### Dataset URIs + +Usa siempre URIs descriptivos: + +| Tipo | Formato | Ejemplo | +|------|---------|---------| +| PostgreSQL | `postgres://host:port/db/schema/table` | `postgres://localhost:5434/postgres/public/events` | +| ClickHouse | `clickhouse://host:port/database/table` | `clickhouse://localhost:8123/default/analytics` | +| NATS | `nats://host:port/subject` | `nats://localhost:4222/data.raw` | +| Archivo | `file:///absolute/path` | `file:///tmp/data.json` | +| API | `api://domain/endpoint` | `api://example.com/users` | +| S3 | `s3://bucket/key` | `s3://my-bucket/data/file.parquet` | + +### Namespaces + +- Usa el namespace `automatic-process` para todos tus pipelines +- Puedes crear namespaces adicionales para proyectos específicos + +### Job Names + +- Usa nombres descriptivos: `fetch_api_data`, `transform_sales`, `load_warehouse` +- Evita guiones (`-`), usa guiones bajos (`_`) +- Mantén consistencia con los nombres de DAGs en Dagu + +--- + +## 🔍 Verificar Lineage + +Después de 
ejecutar tu pipeline, verifica el lineage: + +```bash +# 1. Ver jobs ejecutados +marquez-cli list jobs + +# 2. Ver runs de tu job +marquez-cli job runs -name my_pipeline + +# 3. Ver datasets creados +marquez-cli list datasets + +# 4. Ver lineage completo de un dataset +marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events" +``` + +También puedes usar la **Web UI de Marquez**: http://localhost:3001 + +--- + +## 🛠️ Desarrollo + +### Compilar desde el Código Fuente + +```bash +# Clonar el proyecto +cd ~/AutomaticProyects/automatic_process/tools/marquez-cli + +# Compilar +make build + +# Ejecutar sin instalar +./marquez-cli help + +# Instalar +make install + +# Limpiar binarios +make clean + +# Desinstalar +make uninstall +``` + +### Estructura del Proyecto + +``` +marquez-cli/ +├── main.go # CLI principal con comandos +├── openlineage.go # Cliente HTTP y estructuras OpenLineage +├── go.mod # Módulo de Go +├── Makefile # Build automation +└── README.md # Documentación +``` + +--- + +## 📊 API de Marquez Utilizada + +El CLI interactúa con estos endpoints de Marquez: + +| Endpoint | Método | Uso | +|----------|--------|-----| +| `/api/v1/lineage` | POST | Enviar eventos OpenLineage | +| `/api/v1/lineage` | GET | Obtener lineage de dataset | +| `/api/v1/namespaces` | GET | Listar namespaces | +| `/api/v1/namespaces/{ns}/jobs` | GET | Listar jobs | +| `/api/v1/namespaces/{ns}/datasets` | GET | Listar datasets | +| `/api/v1/namespaces/{ns}/jobs/{job}/runs` | GET | Listar runs de job | + +Documentación completa: https://marquezproject.github.io/marquez/openapi.html + +--- + +## 🎯 Checklist de Lineage + +Usa esta checklist en cada pipeline: + +- [ ] **START event** al inicio del pipeline +- [ ] **RUNNING events** en cada transformación intermedia +- [ ] **COMPLETE event** al finalizar exitosamente +- [ ] **FAIL event** si hay errores (handler) +- [ ] Declarar **TODOS** los inputs (APIs, archivos, tablas) +- [ ] Declarar **TODOS** los outputs (archivos, 
streams, tablas) +- [ ] Usar **URIs bien formados** para datasets +- [ ] Usar **mismo namespace** (`automatic-process`) +- [ ] Usar **mismo run-id** en todos los eventos del mismo run +- [ ] Verificar lineage en Marquez Web UI + +--- + +## 🐛 Troubleshooting + +### Error: "connection refused" + +```bash +# Verificar que Marquez esté corriendo +docker ps | grep marquez + +# Iniciar Marquez si no está corriendo +docker-compose -f docker-compose-marquez.yml up -d + +# Probar conexión +curl http://localhost:5000/api/v1/namespaces +``` + +### Error: "API error (status 400)" + +Revisa que: +- El namespace existe +- Los URIs de datasets estén bien formados +- El run-id sea un UUID válido + +### Run-id no es consistente entre pasos + +Usa una variable de entorno o archivo temporal: + +```bash +# Opción 1: Variable de entorno +export RUN_ID=$(uuidgen) +marquez-cli run start -job my_job -run-id $RUN_ID +marquez-cli run complete -job my_job -run-id $RUN_ID + +# Opción 2: Archivo temporal +uuidgen > /tmp/run_id.txt +RUN_ID=$(cat /tmp/run_id.txt) +marquez-cli run start -job my_job -run-id $RUN_ID +``` + +--- + +## 📚 Recursos + +- **OpenLineage Spec**: https://openlineage.io/ +- **Marquez Docs**: https://marquezproject.ai/ +- **Marquez Web UI**: http://localhost:3001 +- **Marquez API**: http://localhost:5000/api/v1 +- **Dagu Docs**: https://dagu.sh/ + +--- + +## 📝 Licencia + +MIT License - Proyecto Automatic Process + +--- + +**Última actualización**: 2026-03-23 +**Versión**: 1.0.0 +**Autor**: Lucas (@egutierrez) diff --git a/tools/marquez-cli/go.mod b/tools/marquez-cli/go.mod new file mode 100644 index 0000000..06f2608 --- /dev/null +++ b/tools/marquez-cli/go.mod @@ -0,0 +1,7 @@ +module github.com/automatic-process/marquez-cli + +go 1.21 + +require ( + // No external dependencies - uses only Go standard library +) diff --git a/tools/marquez-cli/main.go b/tools/marquez-cli/main.go new file mode 100644 index 0000000..1f019e3 --- /dev/null +++ b/tools/marquez-cli/main.go @@ -0,0 
// Consolidated, formatted source of the marquez-cli tool: the command-line
// front end (tools/marquez-cli/main.go) followed by the OpenLineage/Marquez
// HTTP client (tools/marquez-cli/openlineage.go). Both belong to the
// executable's main package.

import (
	"bytes"
	"crypto/rand"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"sort"
	"strconv"
	"strings"
	"time"
)

// Build-time defaults; the URL and namespace can be overridden through the
// MARQUEZ_URL / MARQUEZ_NAMESPACE environment variables or the matching flags.
const (
	version           = "1.0.0"
	defaultMarquezURL = "http://localhost:5000"
	defaultNamespace  = "automatic-process"
	defaultProducer   = "marquez-cli"
)

// OpenLineage run event types.
const (
	EventTypeStart    = "START"
	EventTypeRunning  = "RUNNING"
	EventTypeComplete = "COMPLETE"
	EventTypeFail     = "FAIL"
	EventTypeAbort    = "ABORT"
)

// runUsage is the one-line synopsis shown for malformed "run" invocations.
const runUsage = "Usage: marquez-cli run [start|running|complete|fail|abort] [options]"

// ---------------------------------------------------------------------------
// Small shared helpers
// ---------------------------------------------------------------------------

// fatalf writes a formatted message to stderr and exits with status 1.
func fatalf(format string, args ...any) {
	fmt.Fprintf(os.Stderr, format, args...)
	os.Exit(1)
}

// getEnv returns the value of the environment variable key, or defaultValue
// when the variable is unset or empty.
func getEnv(key, defaultValue string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	return defaultValue
}

// addCommonFlags registers the -marquez and -namespace flags every
// subcommand shares. nsUsage is the help text shown for -namespace.
func addCommonFlags(fs *flag.FlagSet, nsUsage string) (marquezURL, namespace *string) {
	marquezURL = fs.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
	namespace = fs.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), nsUsage)
	return marquezURL, namespace
}

// parseFlags parses args into fs.
func parseFlags(fs *flag.FlagSet, args []string) {
	// Every flag set here is created with flag.ExitOnError, so Parse
	// terminates the process itself on bad input; the error is always nil.
	_ = fs.Parse(args)
}

// stringField returns m[key] when it holds a string and "" otherwise.
func stringField(m map[string]any, key string) string {
	s, _ := m[key].(string)
	return s
}

// generateUUID returns a random (version 4, RFC 4122 variant) UUID in the
// canonical 8-4-4-4-12 lowercase hexadecimal form.
func generateUUID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		// Without entropy no usable run ID can be produced.
		panic(fmt.Sprintf("marquez-cli: reading random bytes: %v", err))
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}

// normalizeEventType maps a "run" subcommand (case-insensitive) to its
// OpenLineage event type, rejecting anything that is not a known type so
// that typos are caught locally instead of being sent to the server.
func normalizeEventType(s string) (string, error) {
	switch t := strings.ToUpper(strings.TrimSpace(s)); t {
	case EventTypeStart, EventTypeRunning, EventTypeComplete, EventTypeFail, EventTypeAbort:
		return t, nil
	default:
		return "", fmt.Errorf("unknown run event %q (expected start, running, complete, fail or abort)", s)
	}
}

// parseDatasets turns a comma-separated list of dataset names into Dataset
// values in the given namespace. Surrounding whitespace is trimmed and empty
// entries are skipped; an empty list yields a nil slice.
func parseDatasets(namespace, list string) []Dataset {
	var out []Dataset
	for _, name := range strings.Split(list, ",") {
		if name = strings.TrimSpace(name); name != "" {
			out = append(out, Dataset{Namespace: namespace, Name: name})
		}
	}
	return out
}

// ---------------------------------------------------------------------------
// Command dispatch
// ---------------------------------------------------------------------------

func main() {
	if len(os.Args) < 2 {
		printUsage()
		os.Exit(1)
	}

	switch command := os.Args[1]; command {
	case "run":
		handleRunCommand()
	case "dataset":
		handleDatasetCommand()
	case "job":
		handleJobCommand()
	case "lineage":
		handleLineageCommand()
	case "list":
		handleListCommand()
	case "version":
		fmt.Printf("marquez-cli version %s\n", version)
	case "help", "-h", "--help":
		printUsage()
	default:
		fmt.Fprintf(os.Stderr, "Unknown command: %s\n\n", command)
		printUsage()
		os.Exit(1)
	}
}

// handleRunCommand implements "marquez-cli run <event> [options]": it builds
// one OpenLineage run event from the flags and posts it to Marquez.
func handleRunCommand() {
	if len(os.Args) < 3 {
		fatalf("%s\n", runUsage)
	}
	eventType, err := normalizeEventType(os.Args[2])
	if err != nil {
		fatalf("Error: %v\n%s\n", err, runUsage)
	}

	runCmd := flag.NewFlagSet("run", flag.ExitOnError)
	marquez, namespace := addCommonFlags(runCmd, "Job namespace")
	jobName := runCmd.String("job", "", "Job name (required)")
	runID := runCmd.String("run-id", "", "Run ID (auto-generated if not provided)")
	producer := runCmd.String("producer", defaultProducer, "Producer URI")
	inputs := runCmd.String("inputs", "", "Comma-separated list of input datasets")
	outputs := runCmd.String("outputs", "", "Comma-separated list of output datasets")
	eventTime := runCmd.String("event-time", "", "Event time (ISO 8601, defaults to now)")
	parseFlags(runCmd, os.Args[3:])

	if *jobName == "" {
		fatalf("Error: -job is required\n")
	}
	if *runID == "" {
		*runID = generateUUID()
	}
	if *eventTime == "" {
		*eventTime = time.Now().UTC().Format(time.RFC3339Nano)
	}

	inputDatasets := parseDatasets(*namespace, *inputs)
	outputDatasets := parseDatasets(*namespace, *outputs)

	event := &OpenLineageEvent{
		EventType: eventType,
		EventTime: *eventTime,
		Producer:  *producer,
		Job:       Job{Namespace: *namespace, Name: *jobName},
		Run:       Run{RunID: *runID},
		Inputs:    inputDatasets,
		Outputs:   outputDatasets,
	}
	if err := NewMarquezClient(*marquez).SendEvent(event); err != nil {
		fatalf("Error sending event: %v\n", err)
	}

	// Scripts extract the ID from the "Run ID:" line, so keep that label.
	fmt.Println("✓ Run event sent successfully")
	fmt.Printf("  Event Type: %s\n", eventType)
	fmt.Printf("  Job: %s/%s\n", *namespace, *jobName)
	fmt.Printf("  Run ID: %s\n", *runID)
	if len(inputDatasets) > 0 {
		fmt.Printf("  Inputs: %d dataset(s)\n", len(inputDatasets))
	}
	if len(outputDatasets) > 0 {
		fmt.Printf("  Outputs: %d dataset(s)\n", len(outputDatasets))
	}
}

// handleDatasetCommand dispatches "marquez-cli dataset <action>".
func handleDatasetCommand() {
	if len(os.Args) < 3 {
		fatalf("Usage: marquez-cli dataset [register|get] [options]\n")
	}

	switch action := os.Args[2]; action {
	case "register":
		registerDataset()
	case "get":
		getDataset()
	default:
		fatalf("Unknown dataset action: %s\n", action)
	}
}

// registerDataset makes a dataset known to Marquez by sending a COMPLETE
// event whose only output is that dataset.
func registerDataset() {
	dsCmd := flag.NewFlagSet("dataset register", flag.ExitOnError)
	marquez, namespace := addCommonFlags(dsCmd, "Dataset namespace")
	name := dsCmd.String("name", "", "Dataset name (required, e.g., 'postgres://table' or 'file:///path')")
	jobName := dsCmd.String("job", "dataset-registration", "Job name that creates this dataset")
	runID := dsCmd.String("run-id", "", "Run ID (auto-generated if not provided)")
	parseFlags(dsCmd, os.Args[3:])

	if *name == "" {
		fatalf("Error: -name is required\n")
	}
	if *runID == "" {
		*runID = generateUUID()
	}

	event := &OpenLineageEvent{
		EventType: EventTypeComplete,
		EventTime: time.Now().UTC().Format(time.RFC3339Nano),
		Producer:  defaultProducer,
		Job:       Job{Namespace: *namespace, Name: *jobName},
		Run:       Run{RunID: *runID},
		Outputs:   []Dataset{{Namespace: *namespace, Name: *name}},
	}
	if err := NewMarquezClient(*marquez).SendEvent(event); err != nil {
		fatalf("Error registering dataset: %v\n", err)
	}

	fmt.Println("✓ Dataset registered successfully")
	fmt.Printf("  Namespace: %s\n", *namespace)
	fmt.Printf("  Name: %s\n", *name)
}

// getDataset lists the datasets of a namespace together with their type.
func getDataset() {
	dsCmd := flag.NewFlagSet("dataset get", flag.ExitOnError)
	marquez, namespace := addCommonFlags(dsCmd, "Dataset namespace")
	parseFlags(dsCmd, os.Args[3:])

	datasets, err := NewMarquezClient(*marquez).GetDatasets(*namespace)
	if err != nil {
		fatalf("Error getting datasets: %v\n", err)
	}
	if len(datasets) == 0 {
		fmt.Printf("No datasets found in namespace '%s'\n", *namespace)
		return
	}

	fmt.Printf("Datasets in namespace '%s':\n\n", *namespace)
	for _, ds := range datasets {
		fmt.Printf("  • %s [%s]\n", stringField(ds, "name"), stringField(ds, "type"))
	}
}

// handleJobCommand dispatches "marquez-cli job <action>".
func handleJobCommand() {
	if len(os.Args) < 3 {
		fatalf("Usage: marquez-cli job [register|get|runs] [options]\n")
	}

	switch action := os.Args[2]; action {
	case "register":
		registerJob()
	case "get":
		getJobs()
	case "runs":
		getJobRuns()
	default:
		fatalf("Unknown job action: %s\n", action)
	}
}

// registerJob makes a job known to Marquez by sending a START event for it.
func registerJob() {
	jobCmd := flag.NewFlagSet("job register", flag.ExitOnError)
	marquez, namespace := addCommonFlags(jobCmd, "Job namespace")
	name := jobCmd.String("name", "", "Job name (required)")
	runID := jobCmd.String("run-id", "", "Run ID (auto-generated if not provided)")
	parseFlags(jobCmd, os.Args[3:])

	if *name == "" {
		fatalf("Error: -name is required\n")
	}
	if *runID == "" {
		*runID = generateUUID()
	}

	event := &OpenLineageEvent{
		EventType: EventTypeStart,
		EventTime: time.Now().UTC().Format(time.RFC3339Nano),
		Producer:  defaultProducer,
		Job:       Job{Namespace: *namespace, Name: *name},
		Run:       Run{RunID: *runID},
	}
	if err := NewMarquezClient(*marquez).SendEvent(event); err != nil {
		fatalf("Error registering job: %v\n", err)
	}

	fmt.Println("✓ Job registered successfully")
	fmt.Printf("  Namespace: %s\n", *namespace)
	fmt.Printf("  Name: %s\n", *name)
	fmt.Printf("  Run ID: %s\n", *runID)
}

// getJobs lists the jobs of a namespace together with their type.
func getJobs() {
	jobCmd := flag.NewFlagSet("job get", flag.ExitOnError)
	marquez, namespace := addCommonFlags(jobCmd, "Job namespace")
	parseFlags(jobCmd, os.Args[3:])

	jobs, err := NewMarquezClient(*marquez).GetJobs(*namespace)
	if err != nil {
		fatalf("Error getting jobs: %v\n", err)
	}
	if len(jobs) == 0 {
		fmt.Printf("No jobs found in namespace '%s'\n", *namespace)
		return
	}

	fmt.Printf("Jobs in namespace '%s':\n\n", *namespace)
	for _, job := range jobs {
		fmt.Printf("  • %s [%s]\n", stringField(job, "name"), stringField(job, "type"))
	}
}

// getJobRuns lists the runs recorded for one job.
func getJobRuns() {
	jobCmd := flag.NewFlagSet("job runs", flag.ExitOnError)
	marquez, namespace := addCommonFlags(jobCmd, "Job namespace")
	name := jobCmd.String("name", "", "Job name (required)")
	parseFlags(jobCmd, os.Args[3:])

	if *name == "" {
		fatalf("Error: -name is required\n")
	}

	runs, err := NewMarquezClient(*marquez).GetJobRuns(*namespace, *name)
	if err != nil {
		fatalf("Error getting job runs: %v\n", err)
	}
	if len(runs) == 0 {
		fmt.Printf("No runs found for job '%s/%s'\n", *namespace, *name)
		return
	}

	fmt.Printf("Runs for job '%s/%s':\n\n", *namespace, *name)
	for _, run := range runs {
		fmt.Printf("  • %s [%s] - %s\n",
			stringField(run, "id"), stringField(run, "state"), stringField(run, "createdAt"))
	}
}

// handleLineageCommand implements "marquez-cli lineage": it fetches the
// lineage graph of a dataset and prints it as text or as indented JSON.
func handleLineageCommand() {
	lineageCmd := flag.NewFlagSet("lineage", flag.ExitOnError)
	marquez, namespace := addCommonFlags(lineageCmd, "Dataset namespace")
	name := lineageCmd.String("name", "", "Dataset name (required)")
	depth := lineageCmd.Int("depth", 10, "Lineage depth")
	format := lineageCmd.String("format", "text", "Output format (text|json)")
	parseFlags(lineageCmd, os.Args[2:])

	if *name == "" {
		fatalf("Error: -name is required\n")
	}

	lineage, err := NewMarquezClient(*marquez).GetLineage(*namespace, *name, *depth)
	if err != nil {
		fatalf("Error getting lineage: %v\n", err)
	}

	switch *format {
	case "json":
		jsonData, err := json.MarshalIndent(lineage, "", "  ")
		if err != nil {
			fatalf("Error encoding lineage: %v\n", err)
		}
		fmt.Println(string(jsonData))
	default:
		// Any other value falls back to the human-readable form.
		printLineageText(lineage, *namespace, *name)
	}
}

// handleListCommand implements "marquez-cli list <resource>" for
// namespaces, jobs and datasets.
func handleListCommand() {
	if len(os.Args) < 3 {
		fatalf("Usage: marquez-cli list [namespaces|jobs|datasets] [options]\n")
	}

	resource := os.Args[2]
	listCmd := flag.NewFlagSet("list", flag.ExitOnError)
	marquez, namespace := addCommonFlags(listCmd, "Namespace")
	parseFlags(listCmd, os.Args[3:])

	client := NewMarquezClient(*marquez)

	var (
		items   []map[string]any
		heading string
		err     error
	)
	switch resource {
	case "namespaces":
		heading = "Namespaces:"
		items, err = client.GetNamespaces()
	case "jobs":
		heading = fmt.Sprintf("Jobs in namespace '%s':", *namespace)
		items, err = client.GetJobs(*namespace)
	case "datasets":
		heading = fmt.Sprintf("Datasets in namespace '%s':", *namespace)
		items, err = client.GetDatasets(*namespace)
	default:
		fatalf("Unknown resource: %s\n", resource)
	}
	if err != nil {
		fatalf("Error getting %s: %v\n", resource, err)
	}

	fmt.Println(heading)
	for _, item := range items {
		fmt.Printf("  • %s\n", stringField(item, "name"))
	}
}

// printLineageText prints a human-readable summary of a lineage graph, as
// returned by GetLineage, to standard output.
func printLineageText(lineage map[string]any, namespace, datasetName string) {
	writeLineageText(os.Stdout, lineage, namespace, datasetName)
}

// writeLineageText renders the lineage summary to w. Dataset and job node
// IDs are listed in sorted order so the output is stable between runs.
func writeLineageText(w io.Writer, lineage map[string]any, namespace, datasetName string) {
	fmt.Fprintf(w, "Lineage for dataset '%s/%s':\n\n", namespace, datasetName)

	graph, ok := lineage["graph"].([]any)
	if !ok || len(graph) == 0 {
		fmt.Fprintln(w, "No lineage information found")
		return
	}

	var datasetIDs, jobIDs []string
	seenDatasets := make(map[string]struct{})
	jobs := make(map[string]map[string]any)
	for _, node := range graph {
		nodeMap, ok := node.(map[string]any)
		if !ok {
			continue
		}
		id := stringField(nodeMap, "id")
		switch stringField(nodeMap, "type") {
		case "DATASET":
			if _, dup := seenDatasets[id]; !dup {
				seenDatasets[id] = struct{}{}
				datasetIDs = append(datasetIDs, id)
			}
		case "JOB":
			if _, dup := jobs[id]; !dup {
				jobIDs = append(jobIDs, id)
			}
			jobs[id] = nodeMap
		}
	}
	sort.Strings(datasetIDs)
	sort.Strings(jobIDs)

	fmt.Fprintf(w, "📦 Datasets (%d):\n", len(datasetIDs))
	for _, id := range datasetIDs {
		fmt.Fprintf(w, "  • %s\n", id)
	}

	fmt.Fprintf(w, "\n⚙️ Jobs (%d):\n", len(jobIDs))
	for _, id := range jobIDs {
		fmt.Fprintf(w, "  • %s\n", id)
		writeEdges(w, "    ← Inputs:", jobs[id]["inEdges"], "origin")
		writeEdges(w, "    → Outputs:", jobs[id]["outEdges"], "destination")
		fmt.Fprintln(w)
	}
}

// writeEdges prints heading followed by the endpointKey field of every edge
// in edges; it prints nothing when edges is not a non-empty list.
func writeEdges(w io.Writer, heading string, edges any, endpointKey string) {
	list, ok := edges.([]any)
	if !ok || len(list) == 0 {
		return
	}
	fmt.Fprintln(w, heading)
	for _, e := range list {
		if edge, ok := e.(map[string]any); ok {
			fmt.Fprintf(w, "      - %s\n", stringField(edge, endpointKey))
		}
	}
}

// printUsage prints the top-level help text to standard output.
func printUsage() {
	usage := `marquez-cli - OpenLineage/Marquez CLI tool

USAGE:
  marquez-cli <command> [subcommand] [options]

COMMANDS:
  run                 Manage job runs
    start             Start a new run
    running           Mark run as running
    complete          Mark run as complete
    fail              Mark run as failed
    abort             Mark run as aborted

  dataset             Manage datasets
    register          Register a new dataset
    get               List datasets in namespace

  job                 Manage jobs
    register          Register a new job
    get               List jobs in namespace
    runs              Get runs for a specific job

  lineage             Get lineage information for a dataset
  list                List resources (namespaces|jobs|datasets)
  version             Show version
  help                Show this help

EXAMPLES:
  # Start a run
  marquez-cli run start -job my_pipeline -inputs "api://source" -outputs "postgres://table"

  # Complete a run
  marquez-cli run complete -job my_pipeline -run-id <run-id> -outputs "postgres://table"

  # Fail a run
  marquez-cli run fail -job my_pipeline -run-id <run-id>

  # Register a dataset
  marquez-cli dataset register -name "postgres://localhost:5434/postgres/public/events"

  # Get lineage
  marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"

  # List all jobs
  marquez-cli list jobs

  # Get job runs
  marquez-cli job runs -name my_pipeline

ENVIRONMENT VARIABLES:
  MARQUEZ_URL         Marquez API URL (default: http://localhost:5000)
  MARQUEZ_NAMESPACE   Default namespace (default: automatic-process)

For more information, visit: https://openlineage.io/
`
	fmt.Print(usage)
}

// ---------------------------------------------------------------------------
// OpenLineage model
// ---------------------------------------------------------------------------

// Dataset represents an OpenLineage dataset.
type Dataset struct {
	Namespace string         `json:"namespace"`
	Name      string         `json:"name"`
	Facets    map[string]any `json:"facets,omitempty"`
}

// Job represents an OpenLineage job.
type Job struct {
	Namespace string         `json:"namespace"`
	Name      string         `json:"name"`
	Facets    map[string]any `json:"facets,omitempty"`
}

// Run represents an OpenLineage run.
type Run struct {
	RunID  string         `json:"runId"`
	Facets map[string]any `json:"facets,omitempty"`
}

// OpenLineageEvent represents a complete OpenLineage run event.
type OpenLineageEvent struct {
	EventType string    `json:"eventType"`
	EventTime string    `json:"eventTime"`
	Producer  string    `json:"producer"`
	SchemaURL string    `json:"schemaURL,omitempty"`
	Job       Job       `json:"job"`
	Run       Run       `json:"run"`
	Inputs    []Dataset `json:"inputs,omitempty"`
	Outputs   []Dataset `json:"outputs,omitempty"`
}

// ---------------------------------------------------------------------------
// Marquez HTTP client
// ---------------------------------------------------------------------------

// MarquezClient handles communication with the Marquez REST API.
type MarquezClient struct {
	BaseURL    string
	HTTPClient *http.Client
}

// NewMarquezClient creates a client for the Marquez instance at baseURL
// (trailing slashes are dropped) with a 10-second request timeout.
func NewMarquezClient(baseURL string) *MarquezClient {
	return &MarquezClient{
		BaseURL:    strings.TrimRight(baseURL, "/"),
		HTTPClient: &http.Client{Timeout: 10 * time.Second},
	}
}

// httpClient returns the configured HTTP client, falling back to
// http.DefaultClient for a MarquezClient built without one.
func (c *MarquezClient) httpClient() *http.Client {
	if c.HTTPClient != nil {
		return c.HTTPClient
	}
	return http.DefaultClient
}

// endpoint builds BaseURL + "/api/v1" + one path element per segment. Each
// segment is path-escaped, so names containing "/" or "?" (namespaces and
// dataset names are often URIs) stay a single path element.
func (c *MarquezClient) endpoint(segments ...string) string {
	var b strings.Builder
	b.WriteString(strings.TrimRight(c.BaseURL, "/"))
	b.WriteString("/api/v1")
	for _, s := range segments {
		b.WriteByte('/')
		b.WriteString(url.PathEscape(s))
	}
	return b.String()
}

// lineageURL builds the lineage query URL for a dataset node. The node ID is
// query-escaped so that "&", "#", "?" and similar characters in the dataset
// name cannot truncate or split the query string.
func (c *MarquezClient) lineageURL(namespace, datasetName string, depth int) string {
	q := url.Values{}
	q.Set("nodeId", "dataset:"+namespace+":"+datasetName)
	q.Set("depth", strconv.Itoa(depth))
	return c.endpoint("lineage") + "?" + q.Encode()
}

// apiError converts a non-success response into an error that carries the
// status code and (a bounded prefix of) the response body.
func apiError(resp *http.Response) error {
	// Best effort: the status code alone is still useful if the read fails.
	body, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
	return fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
}

// getJSON performs a GET on target and decodes a 200 response body into out.
// what names the resource for the transport-error message.
func (c *MarquezClient) getJSON(what, target string, out any) error {
	resp, err := c.httpClient().Get(target)
	if err != nil {
		return fmt.Errorf("failed to get %s: %w", what, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return apiError(resp)
	}
	if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
		return fmt.Errorf("failed to decode response: %w", err)
	}
	return nil
}

// SendEvent posts an OpenLineage event to Marquez. A missing SchemaURL or
// EventTime on event is filled in with a default before sending. Any
// non-2xx response is returned as an error.
func (c *MarquezClient) SendEvent(event *OpenLineageEvent) error {
	if event == nil {
		return errors.New("event is nil")
	}
	if event.SchemaURL == "" {
		event.SchemaURL = "https://openlineage.io/spec/1-0-5/OpenLineage.json"
	}
	if event.EventTime == "" {
		event.EventTime = time.Now().UTC().Format(time.RFC3339Nano)
	}

	jsonData, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, c.endpoint("lineage"), bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.httpClient().Do(req)
	if err != nil {
		return fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return apiError(resp)
	}
	return nil
}

// GetLineage retrieves the lineage graph around a dataset, up to depth.
func (c *MarquezClient) GetLineage(namespace, datasetName string, depth int) (map[string]any, error) {
	var result map[string]any
	if err := c.getJSON("lineage", c.lineageURL(namespace, datasetName, depth), &result); err != nil {
		return nil, err
	}
	return result, nil
}

// GetNamespaces retrieves all namespaces from Marquez.
func (c *MarquezClient) GetNamespaces() ([]map[string]any, error) {
	var result struct {
		Namespaces []map[string]any `json:"namespaces"`
	}
	if err := c.getJSON("namespaces", c.endpoint("namespaces"), &result); err != nil {
		return nil, err
	}
	return result.Namespaces, nil
}

// GetJobs retrieves all jobs in a namespace.
func (c *MarquezClient) GetJobs(namespace string) ([]map[string]any, error) {
	var result struct {
		Jobs []map[string]any `json:"jobs"`
	}
	if err := c.getJSON("jobs", c.endpoint("namespaces", namespace, "jobs"), &result); err != nil {
		return nil, err
	}
	return result.Jobs, nil
}

// GetDatasets retrieves all datasets in a namespace.
func (c *MarquezClient) GetDatasets(namespace string) ([]map[string]any, error) {
	var result struct {
		Datasets []map[string]any `json:"datasets"`
	}
	if err := c.getJSON("datasets", c.endpoint("namespaces", namespace, "datasets"), &result); err != nil {
		return nil, err
	}
	return result.Datasets, nil
}

// GetJobRuns retrieves the runs of a specific job.
func (c *MarquezClient) GetJobRuns(namespace, jobName string) ([]map[string]any, error) {
	var result struct {
		Runs []map[string]any `json:"runs"`
	}
	target := c.endpoint("namespaces", namespace, "jobs", jobName, "runs")
	if err := c.getJSON("job runs", target, &result); err != nil {
		return nil, err
	}
	return result.Runs, nil
}