merge: quick/add-marquez-cli-utilities - Added CLI utilities for Marquez/OpenLineage

2026-03-23 23:42:43 +01:00
12 changed files with 2541 additions and 8 deletions
+1
@@ -0,0 +1 @@
rill-data/tmp/
+54 -7
@@ -48,15 +48,27 @@ This document describes which services I can manipulate directly, which req
 ```
 - ✅ **Marquez DB**: Port 5433 (for metadata)
-### 4. **Marquez (OpenLineage)** (Medium - REST API)
+### 4. **Marquez (OpenLineage)** (Easy - Native CLI)
-- ✅ **Capability**: Send lineage events via the API
+- ✅ **Capability**: Send lineage events via the `marquez-cli` CLI
 - ✅ **Usage**: Track the origin/destination of data at each step
+- ✅ **Native CLI**: `marquez-cli` (Go binary with no dependencies)
+- **Installation**:
+```bash
+cd ~/AutomaticProyects/automatic_process/tools/marquez-cli
+make install
+```
 - **Example**:
 ```bash
-curl -X POST http://localhost:5000/api/v1/lineage \
--H "Content-Type: application/json" \
--d @lineage_event.json
+# Start a run
+marquez-cli run start -job my_pipeline -inputs "api://source"
+# Complete the run
+marquez-cli run complete -job my_pipeline -run-id <uuid> -outputs "postgres://table"
+# View lineage
+marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
 ```
+- **Documentation**: See `MARQUEZ_UTILITIES.md` for the complete guide
 ### 5. **Logs (Prometheus/Loki)** (Medium - Pushgateway/API)
 - ✅ **Prometheus**: Export metrics via Pushgateway
@@ -210,9 +222,44 @@ handlers:
 ---
-## 🎯 Required Helper Scripts
+## 🎯 Available Helper Scripts
+### ✅ `marquez-cli` (RECOMMENDED - Native Go Binary)
+**Official CLI** for managing lineage in Marquez. Installed at `~/.local/bin/marquez-cli`.
+```bash
+# Start a run
+marquez-cli run start -job my_pipeline -inputs "api://source"
+# Mark progress
+marquez-cli run running -job my_pipeline -run-id <uuid> \
+  -inputs "file:///tmp/raw.json" -outputs "file:///tmp/clean.json"
+# Complete successfully
+marquez-cli run complete -job my_pipeline -run-id <uuid> \
+  -outputs "postgres://table"
+# Mark as failed
+marquez-cli run fail -job my_pipeline -run-id <uuid>
+# View lineage
+marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
+```
+**Full documentation**: See `MARQUEZ_UTILITIES.md`
+**Usage examples**:
+- `~/dagu/scripts/examples/simple_pipeline_with_lineage.sh`
+- `~/dagu/scripts/examples/etl_to_postgres_with_lineage.sh`
+- `~/dagu/dags/example_lineage_tracking.yaml`
+---
+### Alternative: `~/dagu/scripts/log_lineage.py` (Python)
+Only if you cannot use `marquez-cli`:
-### 1. `~/dagu/scripts/log_lineage.py`
```python ```python
#!/usr/bin/env python3 #!/usr/bin/env python3
import requests import requests
+409
@@ -0,0 +1,409 @@
# Marquez CLI Utilities
Go utilities for managing **datasets**, **jobs**, and **runs** with complete lineage tracking in Marquez (OpenLineage).
---
## 📦 What Was Created?
### 1. **`marquez-cli` Binary** (Go)
A complete CLI tool for interacting with Marquez:
- ✅ Register and query **datasets**
- ✅ Register and query **jobs**
- ✅ Send **run** events (START, RUNNING, COMPLETE, FAIL)
- ✅ Query dataset **lineage**
- ✅ List resources (namespaces, jobs, datasets)
- ✅ No external dependencies (Go stdlib only)
- ✅ Static binary of ~5MB
**Location**: `~/AutomaticProyects/automatic_process/tools/marquez-cli/`
**Installation**:
```bash
cd ~/AutomaticProyects/automatic_process/tools/marquez-cli
make install
```
**Usage**:
```bash
marquez-cli help
marquez-cli version
```
---
### 2. **Example Scripts**
#### a) `simple_pipeline_with_lineage.sh`
A simple pipeline that demonstrates:
- Run ID generation
- START, RUNNING, and COMPLETE events
- Transformation tracking
- Error handling
**Location**: `~/dagu/scripts/examples/simple_pipeline_with_lineage.sh`
**Usage**:
```bash
~/dagu/scripts/examples/simple_pipeline_with_lineage.sh
```
#### b) `etl_to_postgres_with_lineage.sh`
A complete ETL with a load into PostgreSQL:
- Extract from an API
- Transform with jq
- Load into PostgreSQL
- Complete lineage of the flow
**Location**: `~/dagu/scripts/examples/etl_to_postgres_with_lineage.sh`
**Usage**:
```bash
~/dagu/scripts/examples/etl_to_postgres_with_lineage.sh
```
---
### 3. **Example DAG for Dagu**
A complete DAG with integrated lineage tracking:
- Unique Run ID generation
- Events at each pipeline step
- Error handler (FAIL event)
- Success handler
- Cleanup of temporary files
**Location**: `~/dagu/dags/example_lineage_tracking.yaml`
**Usage**:
```bash
# Run manually from the Dagu UI: http://localhost:8090
# Or from the CLI
dagu start example_lineage_tracking
```
---
## 🎯 Key Marquez Concepts
### Datasets
Represent **data sources** (tables, files, APIs, streams).
**Naming Convention**:
```
postgres://host:port/db/schema/table
clickhouse://host:port/database/table
nats://host:port/subject
file:///absolute/path
api://domain/endpoint
```
**Commands**:
```bash
# Register a dataset
marquez-cli dataset register -name "postgres://localhost:5434/postgres/public/events"
# List datasets
marquez-cli list datasets
```
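The naming convention above can be checked before a URI is ever sent to Marquez. The following is a minimal sketch that covers only the five schemes listed in this document; `is_valid_dataset_uri` is a hypothetical helper and is not part of `marquez-cli`.

```bash
#!/usr/bin/env bash
# Hypothetical helper (not part of marquez-cli): checks a dataset URI
# against the naming convention documented above.
is_valid_dataset_uri() {
  local uri="$1"
  local hostport='[A-Za-z0-9._-]+:[0-9]+'
  local seg='[^/[:space:]]+'
  local patterns=(
    "^postgres://${hostport}/${seg}/${seg}/${seg}\$"   # host:port/db/schema/table
    "^clickhouse://${hostport}/${seg}/${seg}\$"         # host:port/database/table
    "^nats://${hostport}/${seg}\$"                      # host:port/subject
    "^file:///${seg}(/${seg})*\$"                       # absolute path
    "^api://[A-Za-z0-9._-]+/${seg}(/${seg})*\$"         # domain/endpoint
  )
  local p
  for p in "${patterns[@]}"; do
    # Unquoted $p so bash treats it as a regular expression
    [[ "$uri" =~ $p ]] && return 0
  done
  return 1
}
```

Shorthand names that appear in some examples, such as `postgres://table` or `api://source`, would be rejected by this check because they lack the host, port, or path parts the convention asks for.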
---
### Jobs
Represent **processes/pipelines** that read/write datasets.
**Naming Convention**:
- Use descriptive names: `fetch_api_data`, `transform_sales`
- Avoid hyphens; use underscores
- Use the same name as the DAG in Dagu
**Commands**:
```bash
# Register a job
marquez-cli job register -name my_pipeline
# List jobs
marquez-cli list jobs
# View the runs of a job
marquez-cli job runs -name my_pipeline
```
---
### Runs
Represent **executions** of a job with specific inputs/outputs.
**Lifecycle**:
1. **START** - Execution begins
2. **RUNNING** - Progress (optional, may be sent multiple times)
3. **COMPLETE** or **FAIL** - Execution ends
**Commands**:
```bash
RUN_ID=$(uuidgen)
# START
marquez-cli run start -job my_job -run-id $RUN_ID
# RUNNING (intermediate progress)
marquez-cli run running -job my_job -run-id $RUN_ID \
  -inputs "file:///tmp/raw.json" \
  -outputs "file:///tmp/clean.json"
# COMPLETE
marquez-cli run complete -job my_job -run-id $RUN_ID \
  -outputs "postgres://table"
# FAIL (on error)
marquez-cli run fail -job my_job -run-id $RUN_ID
```
---
## 📋 Lineage Rules
### ✅ ALWAYS:
1. **Send a START event** at the beginning of the pipeline
2. **Use the same Run ID** in every event of the same run
3. **Declare ALL inputs** (APIs, files, and tables you read)
4. **Declare ALL outputs** (files, streams, and tables you write)
5. **Send a COMPLETE event** on successful completion
6. **Send a FAIL event** on errors (use a trap/handler)
7. **Use well-formed URIs** for datasets
8. **Use a consistent namespace** (`automatic-process`)
### ❌ NEVER:
1. Omit the START event
2. Forget the COMPLETE/FAIL event
3. Change the Run ID in the middle of a pipeline
4. Use ambiguous dataset names
5. Skip lineage in critical pipelines
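One way to keep START, a single Run ID, and exactly one of COMPLETE/FAIL together is to wrap the work in a function. This is a minimal sketch: `with_lineage` and the `MARQUEZ_CLI` override are hypothetical names introduced here for illustration, and only the `run start`, `run complete`, and `run fail` subcommands and the flags shown elsewhere in this document are assumed to exist.

```bash
#!/usr/bin/env bash
# Hypothetical wrapper (not part of marquez-cli): emits START, runs the
# given command, then emits COMPLETE or FAIL with the same Run ID.
# Usage: with_lineage <job> <inputs> <outputs> <command> [args...]
# MARQUEZ_CLI may be overridden (e.g. with "echo") for a dry run.
with_lineage() {
  local job="$1" inputs="$2" outputs="$3"
  shift 3
  local cli="${MARQUEZ_CLI:-marquez-cli}"
  local run_id
  # Reuse RUN_ID if the caller already generated one
  run_id="${RUN_ID:-$(uuidgen)}"

  "$cli" run start -job "$job" -run-id "$run_id" -inputs "$inputs" || return 1

  if "$@"; then
    "$cli" run complete -job "$job" -run-id "$run_id" \
      -inputs "$inputs" -outputs "$outputs"
  else
    local status=$?
    "$cli" run fail -job "$job" -run-id "$run_id"
    return "$status"
  fi
}
```

A dry run such as `MARQUEZ_CLI=echo with_lineage my_job "api://example.com/data" "file:///tmp/data.json" true` prints the commands that would be sent, which is a cheap way to confirm that every event carries the same Run ID.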
---
## 🔍 Retrieving Lineage
### Method 1: CLI
```bash
# View the lineage of a dataset
marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
# JSON format (for scripts)
marquez-cli lineage -name "postgres://table" -format json
# With greater depth
marquez-cli lineage -name "postgres://table" -depth 20
```
### Method 2: Web UI
```bash
# Open the Marquez Web UI
xdg-open http://localhost:3001
# Search by job or dataset
# View the visual lineage graph
```
### Method 3: REST API
```bash
# Fetch lineage directly from the API
curl "http://localhost:5000/api/v1/lineage?nodeId=dataset:automatic-process:postgres://table&depth=10" | jq .
# List jobs
curl http://localhost:5000/api/v1/namespaces/automatic-process/jobs | jq .
# View the runs of a job
curl http://localhost:5000/api/v1/namespaces/automatic-process/jobs/my_job/runs | jq .
```
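Dataset names contain `:` and `/`, so when the `nodeId` query parameter is built in a script it is safer to percent-encode it. The sketch below assumes the lineage endpoint and the `MARQUEZ_URL` / `MARQUEZ_NAMESPACE` defaults shown in this document; `urlencode` and `lineage_url` are hypothetical helper names, and non-ASCII input is not handled.

```bash
#!/usr/bin/env bash
# Percent-encodes every character outside the RFC 3986 unreserved set.
# ASCII input only; this is a sketch, not a full implementation.
urlencode() {
  local LC_ALL=C
  local s="$1" out="" c i
  for (( i = 0; i < ${#s}; i++ )); do
    c="${s:i:1}"
    case "$c" in
      [A-Za-z0-9._~-]) out+="$c" ;;
      *) out+=$(printf '%%%02X' "'$c") ;;
    esac
  done
  printf '%s\n' "$out"
}

# Hypothetical helper: builds the lineage URL used in the curl example above.
# Usage: lineage_url <dataset-name> [depth]
lineage_url() {
  local dataset="$1" depth="${2:-10}"
  local base="${MARQUEZ_URL:-http://localhost:5000}"
  local ns="${MARQUEZ_NAMESPACE:-automatic-process}"
  printf '%s/api/v1/lineage?nodeId=%s&depth=%s\n' \
    "$base" "$(urlencode "dataset:${ns}:${dataset}")" "$depth"
}
```

It could then be used as `curl -fsS "$(lineage_url "postgres://localhost:5434/postgres/public/events" 20)" | jq .`.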
---
## 🚀 Quick Start
### 1. Install marquez-cli
```bash
cd ~/AutomaticProyects/automatic_process/tools/marquez-cli
make install
marquez-cli version
```
### 2. Try the Example
```bash
# Run the example pipeline
~/dagu/scripts/examples/simple_pipeline_with_lineage.sh
# View the generated lineage
marquez-cli lineage -name "file:///tmp/users_clean.json"
```
### 3. Create Your First Pipeline
```bash
#!/bin/bash
set -euo pipefail
JOB_NAME="my_first_pipeline"
RUN_ID=$(uuidgen)
# Error handling: report FAIL to Marquez if any command fails
cleanup() {
  marquez-cli run fail -job "$JOB_NAME" -run-id "$RUN_ID"
}
trap cleanup ERR
# START
marquez-cli run start -job "$JOB_NAME" -run-id "$RUN_ID"
# Your work here
echo "Doing work..."
# -f makes curl exit non-zero on HTTP errors so the ERR trap fires
curl -fsS https://api.example.com/data > /tmp/data.json
# COMPLETE
marquez-cli run complete \
  -job "$JOB_NAME" \
  -run-id "$RUN_ID" \
  -inputs "api://example.com/data" \
  -outputs "file:///tmp/data.json"
echo "✓ Pipeline completed!"
```
---
## 📊 File Structure
```
~/AutomaticProyects/automatic_process/
├── tools/
│   └── marquez-cli/              # CLI source code
│       ├── main.go               # Main CLI
│       ├── openlineage.go        # API client
│       ├── go.mod                # Go module
│       ├── Makefile              # Build automation
│       ├── README.md             # Complete documentation
│       ├── QUICKSTART.md         # Quick guide
│       └── marquez-cli           # Compiled binary
├── MARQUEZ_UTILITIES.md          # This file
~/dagu/
├── scripts/
│   └── examples/
│       ├── simple_pipeline_with_lineage.sh
│       └── etl_to_postgres_with_lineage.sh
└── dags/
    └── example_lineage_tracking.yaml
```
---
## 🔧 Useful Commands
### Run Management
```bash
# Start a run with inputs/outputs
marquez-cli run start -job my_job -inputs "api://source" -outputs "file:///tmp/data"
# Mark progress
marquez-cli run running -job my_job -run-id <uuid> -inputs "file:///a" -outputs "file:///b"
# Complete successfully
marquez-cli run complete -job my_job -run-id <uuid> -outputs "postgres://table"
# Mark as failed
marquez-cli run fail -job my_job -run-id <uuid>
```
### Queries
```bash
# View all jobs
marquez-cli list jobs
# View the runs of a job
marquez-cli job runs -name my_job
# View the full lineage
marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
# View datasets
marquez-cli list datasets
```
---
## 📚 Additional Documentation
- **Full README**: `~/AutomaticProyects/automatic_process/tools/marquez-cli/README.md`
- **Quick Start**: `~/AutomaticProyects/automatic_process/tools/marquez-cli/QUICKSTART.md`
- **CLAUDE.md**: Guide to the services that can be manipulated
- **OpenLineage Spec**: https://openlineage.io/
- **Marquez Docs**: https://marquezproject.ai/
---
## 🎯 Next Steps
1. **Run the example**:
```bash
~/dagu/scripts/examples/simple_pipeline_with_lineage.sh
```
2. **Verify the lineage**:
```bash
marquez-cli lineage -name "file:///tmp/users_clean.json"
```
3. **Adapt the pattern** to your own pipelines
4. **Always follow the rules** of lineage tracking
---
## 💡 Tips
- **Use environment variables** for configuration:
```bash
export MARQUEZ_URL="http://localhost:5000"
export MARQUEZ_NAMESPACE="automatic-process"
```
- **Generate the Run ID only once** and reuse it throughout the pipeline:
```bash
RUN_ID=$(uuidgen)
```
- **Use a trap to handle errors** automatically:
```bash
trap 'marquez-cli run fail -job $JOB_NAME -run-id $RUN_ID' ERR
```
- **Verify lineage after** each execution:
```bash
marquez-cli lineage -name "postgres://table"
```
---
**Last updated**: 2026-03-23
**Version**: 1.0.0
**Author**: Lucas (@egutierrez)
+305
@@ -0,0 +1,305 @@
# 📊 Visualization Tools Configuration
## ✅ Configuration Status
All visualization tools are connected to PostgreSQL and ClickHouse:
- **Grafana** - Datasources configured via provisioning YAML
- **Metabase** - Automatic configuration via API (no UI)
- **Rill** - Sources configured in rill.yaml
---
## 🔑 Access Credentials
### Grafana
- **URL**: http://localhost:3500
- **User**: `admin`
- **Password**: `admin123`
- **Configured datasources**:
  - PostgreSQL (postgres-main:5432)
  - ClickHouse (clickhouse:8123)
  - Prometheus
  - Loki
  - Tempo
### Metabase
- **URL**: http://localhost:3200
- **User**: `admin@example.com`
- **Password**: `Admin123!@#`
- **Configured datasources**:
  - PostgreSQL Main (postgres-main:5432)
  - ClickHouse Analytics (clickhouse:8123)
  - Sample Database (H2 - demo)
### Rill
- **URL**: http://localhost:9009
- **Authentication**: Not required
- **Configured sources**:
  - `postgres_main` (postgres-main:5432)
  - `clickhouse_main` (clickhouse:9000)
---
## 🚀 Automatic Metabase Configuration
The `configure_metabase.py` script configures Metabase automatically:
### First time (initial setup):
```bash
cd /home/lucas/dagu/scripts
./configure_metabase.py
```
This script:
1. ✅ Detects whether Metabase needs initial setup
2. ✅ Creates the admin user automatically
3. ✅ Configures the PostgreSQL and ClickHouse datasources
4. ✅ Does all of this with no manual interaction
### If you reset Metabase:
```bash
# Delete Metabase data
docker-compose -f docker-compose-analytics.yml down metabase metabase-db
docker volume rm automatic_process_metabase-data
# Restart and configure automatically
docker-compose -f docker-compose-analytics.yml up -d metabase-db metabase
sleep 50 # Wait for it to start
./configure_metabase.py
```
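A fixed `sleep 50` can be too short on a slow machine and needlessly long on a fast one. A small retry loop is one alternative. The sketch below is an assumption-laden illustration: `wait_for` is a hypothetical helper, and the commented usage assumes Metabase's `/api/health` endpoint is reachable on port 3200.

```bash
#!/usr/bin/env bash
# Hypothetical helper: retries a command until it succeeds or the
# attempts run out.
# Usage: wait_for <max_attempts> <delay_seconds> <command> [args...]
wait_for() {
  local attempts="$1" delay="$2"
  shift 2
  local n
  for (( n = 1; n <= attempts; n++ )); do
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Do not sleep after the final failed attempt
    (( n < attempts )) && sleep "$delay"
  done
  return 1
}

# Example (not executed here): poll for up to 2 minutes, then configure.
#   wait_for 24 5 curl -fsS http://localhost:3200/api/health \
#     && ./configure_metabase.py
```

The function returns non-zero when every attempt fails, so the configuration script is skipped instead of running against a service that never came up.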
---
## 🔧 Configuration Files
### Grafana
**Location**: `/home/lucas/DataProyects/suite_logs/config/grafana/provisioning/datasources/datasources.yml`
The PostgreSQL and ClickHouse datasources are added automatically when Grafana is deployed.
### Rill
**Location**: `/home/lucas/AutomaticProyects/automatic_process/rill-data/rill.yaml`
```yaml
sources:
- name: postgres_main
type: sql
connector: postgres
settings:
host: postgres-main
port: 5432
database: postgres
user: postgres
password: postgres
ssl_mode: disable
raw_sql: true
- name: clickhouse_main
type: sql
connector: clickhouse
settings:
host: clickhouse
port: 9000
database: default
user: default
password: clickhouse
ssl: false
```
---
## ✅ Connectivity Verification
### Verification Script
```bash
/home/lucas/dagu/scripts/test_db_connections.sh
```
It checks:
- ✓ PostgreSQL (localhost:5434)
- ✓ ClickHouse (localhost:8123)
- ✓ Grafana (localhost:3500)
- ✓ Metabase (localhost:3200)
- ✓ Rill (localhost:9009)
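The contents of `test_db_connections.sh` are not shown in this commit. As a rough illustration of the kind of check it describes, the sketch below tests only TCP reachability, not authentication or query execution. `check_port` and `check_services` are hypothetical names, and the sketch relies on bash's `/dev/tcp` redirection and the coreutils `timeout` command.

```bash
#!/usr/bin/env bash
# Hypothetical helper: returns 0 if a TCP connection to host:port
# succeeds within 2 seconds.
check_port() {
  local host="$1" port="$2"
  timeout 2 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null
}

# Prints one status line per "name:host:port" entry and returns
# non-zero if any service is unreachable.
check_services() {
  local failed=0 entry name host port
  for entry in "$@"; do
    IFS=: read -r name host port <<< "$entry"
    if check_port "$host" "$port"; then
      printf '✓ %s (%s:%s)\n' "$name" "$host" "$port"
    else
      printf '✗ %s (%s:%s)\n' "$name" "$host" "$port"
      failed=1
    fi
  done
  return "$failed"
}
```

With the ports listed above, a call could look like `check_services PostgreSQL:localhost:5434 ClickHouse:localhost:8123 Grafana:localhost:3500 Metabase:localhost:3200 Rill:localhost:9009`.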
### Manual Verification
#### Grafana
1. Log in at http://localhost:3500
2. Go to **Connections** > **Data sources**
3. Check the "PostgreSQL" and "ClickHouse" datasources
4. Click each one and press **Test** (it should show ✓)
#### Metabase
1. Log in at http://localhost:3200
2. Go to **Admin Settings** > **Databases**
3. Check "PostgreSQL Main" and "ClickHouse Analytics"
4. The status should show "Connected"
#### Rill
1. Open http://localhost:9009
2. Open the **Sources** tab
3. Check `postgres_main` and `clickhouse_main`
4. Run a test query: `SELECT 1`
---
## 📊 Available Databases
### PostgreSQL
- **Internal host**: `postgres-main:5432`
- **External host**: `localhost:5434`
- **User**: `postgres`
- **Password**: `postgres`
- **Database**: `postgres`
- **Status**: Empty (no tables)
### ClickHouse
- **Internal host**: `clickhouse:9000` (native), `clickhouse:8123` (HTTP)
- **External host**: `localhost:9000`, `localhost:8123`
- **User**: `default`
- **Password**: `clickhouse`
- **Database**: `default`
- **Status**: Empty (no tables)
---
## 🎯 Next Steps
### 1. Create Example Tables (Optional)
#### PostgreSQL
```sql
-- Connect via psql (run this from a shell, not inside the SQL session):
--   PGPASSWORD=postgres psql -h localhost -p 5434 -U postgres -d postgres
-- Create the events table
CREATE TABLE events (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMP DEFAULT NOW(),
    event_type VARCHAR(100),
    user_id INTEGER,
    data JSONB
);
-- Insert test data
INSERT INTO events (event_type, user_id, data)
VALUES
    ('login', 1, '{"ip": "192.168.1.1"}'),
    ('purchase', 1, '{"product": "laptop", "amount": 1200}'),
    ('logout', 1, '{"duration": 3600}');
```
#### ClickHouse
```sql
-- Connect via clickhouse-client (run this from a shell, not inside the SQL session):
--   clickhouse-client --host localhost --port 9000 --user default --password clickhouse
-- Create the metrics table
CREATE TABLE metrics (
    timestamp DateTime,
    metric_name String,
    metric_value Float64,
    tags Map(String, String)
) ENGINE = MergeTree()
ORDER BY timestamp;
-- Insert test data
INSERT INTO metrics VALUES
    (now(), 'cpu_usage', 45.2, {'host': 'server1'}),
    (now(), 'memory_usage', 72.8, {'host': 'server1'}),
    (now(), 'disk_usage', 58.3, {'host': 'server1'});
```
### 2. Create Example Dashboards
#### In Grafana
1. New Dashboard → Add visualization
2. Select the "PostgreSQL" datasource
3. Query: `SELECT * FROM events ORDER BY timestamp DESC LIMIT 10`
4. Display as a table or chart
#### In Metabase
1. New → Question
2. Select "PostgreSQL Main"
3. Simple Question → Pick Table → events
4. Save and add to a dashboard
#### In Rill
1. Create a model in `rill-data/rill.yaml`:
```yaml
models:
  - name: recent_events
    sql: |
      SELECT * FROM events
      ORDER BY timestamp DESC
      LIMIT 100
    source: postgres_main
```
2. Restart Rill: `docker-compose -f docker-compose-analytics.yml restart rill`
### 3. Configure Pipelines with Dagu
See `TRANSFORMATIONS.md` for example DAGs that:
- Extract data from APIs
- Transform data
- Load into PostgreSQL/ClickHouse
- Generate automatic visualizations
---
## 🔧 Troubleshooting
### Metabase does not get configured
```bash
# Check the logs
docker logs metabase
# Reset and reconfigure
docker-compose -f docker-compose-analytics.yml down metabase metabase-db
docker volume rm automatic_process_metabase-data
docker-compose -f docker-compose-analytics.yml up -d metabase-db metabase
sleep 50
/home/lucas/dagu/scripts/configure_metabase.py
```
### Grafana cannot see the databases
```bash
# Check that Grafana is on both networks
docker inspect grafana | grep -A 10 Networks
# Should show: suite-logs_monitoring and automatic_process_default
# Test DNS from Grafana
docker exec grafana nslookup postgres-main
docker exec grafana nslookup clickhouse
```
### Rill does not load sources
```bash
# Check the configuration
cat /home/lucas/AutomaticProyects/automatic_process/rill-data/rill.yaml
# Review the logs
docker logs rill
# Restart
docker-compose -f docker-compose-analytics.yml restart rill
```
---
## 📚 References
- **Grafana Datasources**: http://localhost:3500/connections/datasources
- **Metabase Admin**: http://localhost:3200/admin/databases
- **Rill Dashboard**: http://localhost:9009
- **DBGate** (DB Manager): http://localhost:3300
- **Marquez** (Lineage): http://localhost:3001
- **Homer** (Dashboard Hub): http://localhost:8080
---
**Last updated**: 2026-03-23
**Configuration**: Automatic via scripts
+24 -1
@@ -1,2 +1,25 @@
-sources: []
+sources:
+- name: postgres_main
+type: sql
+connector: postgres
+settings:
+host: postgres-main
+port: 5432
+database: postgres
+user: postgres
+password: postgres
+ssl_mode: disable
+raw_sql: true
+- name: clickhouse_main
+type: sql
+connector: clickhouse
+settings:
+host: clickhouse
+port: 9000
+database: default
+user: default
+password: clickhouse
+ssl: false
 models: []
+49
@@ -0,0 +1,49 @@
# Makefile for marquez-cli

BINARY_NAME=marquez-cli
INSTALL_PATH=$(HOME)/.local/bin
GO=go
GOFLAGS=-ldflags="-s -w"

.PHONY: all build install clean test uninstall help

all: build

## build: Build the binary
build:
	@echo "Building $(BINARY_NAME)..."
	@$(GO) build $(GOFLAGS) -o $(BINARY_NAME) .
	@echo "✓ Binary built: ./$(BINARY_NAME)"

## install: Build and install to ~/.local/bin
install: build
	@echo "Installing $(BINARY_NAME) to $(INSTALL_PATH)..."
	@mkdir -p $(INSTALL_PATH)
	@cp $(BINARY_NAME) $(INSTALL_PATH)/
	@chmod +x $(INSTALL_PATH)/$(BINARY_NAME)
	@echo "✓ Installed to $(INSTALL_PATH)/$(BINARY_NAME)"
	@echo ""
	@echo "Make sure $(INSTALL_PATH) is in your PATH:"
	@echo " export PATH=\"\$$PATH:$(INSTALL_PATH)\""

## clean: Remove built binaries
clean:
	@echo "Cleaning..."
	@rm -f $(BINARY_NAME)
	@echo "✓ Cleaned"

## test: Run tests
test:
	@echo "Running tests..."
	@$(GO) test -v ./...

## uninstall: Remove installed binary
uninstall:
	@echo "Uninstalling $(BINARY_NAME)..."
	@rm -f $(INSTALL_PATH)/$(BINARY_NAME)
	@echo "✓ Uninstalled"

## help: Show this help
help:
	@echo "Available targets:"
	@sed -n 's/^##//p' Makefile | column -t -s ':' | sed -e 's/^/ /'
+213
@@ -0,0 +1,213 @@
# marquez-cli - Quick Start Guide
A quick guide to getting started with `marquez-cli` in your pipelines.
---
## ⚡ Quick Installation
```bash
# Build and install
cd ~/AutomaticProyects/automatic_process/tools/marquez-cli
make install
# Verify
marquez-cli version
```
---
## 🎯 Basic Usage
### 1. Complete Flow in a Script
```bash
#!/bin/bash
JOB_NAME="my_pipeline"
RUN_ID=$(uuidgen)
# START
marquez-cli run start -job $JOB_NAME -run-id $RUN_ID
# Do the work...
curl https://api.example.com/data > /tmp/data.json
# COMPLETE
marquez-cli run complete \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "api://example.com/data" \
-outputs "file:///tmp/data.json"
```
### 2. With Error Handling
```bash
#!/bin/bash
set -euo pipefail
JOB_NAME="my_pipeline"
RUN_ID=$(uuidgen)
cleanup() {
marquez-cli run fail -job $JOB_NAME -run-id $RUN_ID
}
trap cleanup ERR
marquez-cli run start -job $JOB_NAME -run-id $RUN_ID
# Your work here...
marquez-cli run complete -job $JOB_NAME -run-id $RUN_ID
```
### 3. Multi-Step Pipeline
```bash
JOB_NAME="etl_pipeline"
RUN_ID=$(uuidgen)
# START
marquez-cli run start -job $JOB_NAME -run-id $RUN_ID
# EXTRACT
curl https://api.example.com/data > /tmp/raw.json
marquez-cli run running \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "api://example.com/data" \
-outputs "file:///tmp/raw.json"
# TRANSFORM
jq '.data' /tmp/raw.json > /tmp/clean.json
marquez-cli run running \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "file:///tmp/raw.json" \
-outputs "file:///tmp/clean.json"
# LOAD
psql ... -c "COPY table FROM '/tmp/clean.json'"
marquez-cli run complete \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "file:///tmp/clean.json" \
-outputs "postgres://localhost:5434/postgres/public/table"
```
---
## 📊 Querying Lineage
```bash
# View datasets
marquez-cli list datasets
# View jobs
marquez-cli list jobs
# View the lineage of a dataset
marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
# View the runs of a job
marquez-cli job runs -name my_pipeline
```
---
## 🔧 Integration with Dagu
See the example DAG: `~/dagu/dags/example_lineage_tracking.yaml`
Basic pattern:
```yaml
env:
  - RUN_ID: ""
steps:
  - name: init
    command: echo "RUN_ID=$(uuidgen)" >> $DAGU_ENV
  - name: start
    command: marquez-cli run start -job $JOB_NAME -run-id $RUN_ID
    depends: [init]
  - name: work
    command: # your work here
    depends: [start]
  - name: complete
    command: marquez-cli run complete -job $JOB_NAME -run-id $RUN_ID
    depends: [work]
handlers:
  failure:
    - command: marquez-cli run fail -job $JOB_NAME -run-id $RUN_ID
```
---
## 🧪 Try the Example
```bash
# Run the example script
~/dagu/scripts/examples/simple_pipeline_with_lineage.sh
# View the generated lineage
marquez-cli lineage -name "file:///tmp/users_clean.json"
# Or open it in a browser
xdg-open http://localhost:3001
```
---
## 📋 Most Used Commands
| Command | Description |
|---------|-------------|
| `run start` | Start a run |
| `run complete` | Complete successfully |
| `run fail` | Mark as failed |
| `run running` | Mark progress (intermediate) |
| `lineage` | View dataset lineage |
| `list jobs` | List all jobs |
| `job runs` | View the runs of a job |
---
## 🔍 Dataset URIs
| Type | Format |
|------|--------|
| PostgreSQL | `postgres://host:port/db/schema/table` |
| ClickHouse | `clickhouse://host:port/database/table` |
| NATS | `nats://host:port/subject` |
| File | `file:///absolute/path` |
| API | `api://domain/endpoint` |
---
## ✅ Checklist
Every pipeline must:
- [ ] Send a START event at the beginning
- [ ] Send RUNNING events for intermediate transformations
- [ ] Send a COMPLETE event on successful completion
- [ ] Send a FAIL event on errors (handler)
- [ ] Use the same run-id in all events
- [ ] Declare all inputs/outputs
---
## 📚 More Information
- [Full README](./README.md)
- [OpenLineage documentation](https://openlineage.io/)
- [Marquez Web UI](http://localhost:3001)
---
**Tip**: Use `marquez-cli help` to see all available commands.
+645
@@ -0,0 +1,645 @@
# marquez-cli
**OpenLineage/Marquez CLI tool** for managing datasets, jobs, and runs with complete lineage tracking.
A Go binary with no external dependencies, ready to use in Dagu pipelines and bash scripts.
---
## 🎯 Features
- **Register datasets** in Marquez
- **Register jobs** and their runs
- **Send OpenLineage events** (START, RUNNING, COMPLETE, FAIL)
- **Query lineage** of datasets
- **List** namespaces, jobs, datasets, and runs
- **No external dependencies** (Go stdlib only)
- **Static binary**, compiled (~5MB)
---
## 📦 Installation
### From Source
```bash
cd ~/AutomaticProyects/automatic_process/tools/marquez-cli
# Build
make build
# Install to ~/.local/bin
make install
# Verify the installation
marquez-cli version
```
### Environment Variables (Optional)
```bash
export MARQUEZ_URL="http://localhost:5000"
export MARQUEZ_NAMESPACE="automatic-process"
```
---
## 🚀 Usage
### 1. Run Management
#### Start a Run
```bash
# Basic syntax
marquez-cli run start -job my_pipeline
# With inputs and outputs
marquez-cli run start \
  -job fetch_api_data \
  -inputs "api://jsonplaceholder.typicode.com/users" \
  -outputs "file:///tmp/users.json"
# With a specific run-id (to continue later)
marquez-cli run start \
  -job my_pipeline \
  -run-id "abc123-uuid-here" \
  -inputs "file:///tmp/raw.json"
```
**Output:**
```
✓ Run event sent successfully
Event Type: START
Job: automatic-process/fetch_api_data
Run ID: 3c7a4f21-1234-5678-90ab-cdef12345678
Inputs: 1 dataset(s)
Outputs: 1 dataset(s)
```
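When `run start` is called without `-run-id`, a script needs to recover the generated ID from this output in order to send later events. The sketch below assumes the `Run ID: <uuid>` line format shown above stays stable; `extract_run_id` is a hypothetical helper, not a `marquez-cli` feature.

```bash
#!/usr/bin/env bash
# Hypothetical helper: reads `marquez-cli run start` output on stdin and
# prints the Run ID. Exits non-zero if no "Run ID:" line is found.
extract_run_id() {
  awk -F': *' '
    /^[[:space:]]*Run ID:/ { print $2; found = 1; exit }
    END { if (!found) exit 1 }
  '
}

# Example (not executed here):
#   RUN_ID=$(marquez-cli run start -job my_pipeline | extract_run_id)
```

Generating the ID up front with `uuidgen` and passing it via `-run-id` avoids parsing human-readable output altogether, so it is likely the more robust choice for unattended pipelines.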
#### Mark a Run as RUNNING (Progress)
```bash
marquez-cli run running \
-job my_pipeline \
-run-id "abc123-uuid-here" \
-inputs "file:///tmp/raw.json" \
-outputs "file:///tmp/processed.json"
```
#### Complete a Run Successfully
```bash
marquez-cli run complete \
-job my_pipeline \
-run-id "abc123-uuid-here" \
-inputs "file:///tmp/raw.json" \
-outputs "postgres://localhost:5434/postgres/public/events"
```
#### Mark a Run as Failed
```bash
marquez-cli run fail \
-job my_pipeline \
-run-id "abc123-uuid-here"
```
---
### 2. Dataset Management
#### Register a Dataset
```bash
# PostgreSQL table
marquez-cli dataset register \
-name "postgres://localhost:5434/postgres/public/events"
# ClickHouse table
marquez-cli dataset register \
-name "clickhouse://localhost:8123/default/analytics"
# NATS stream
marquez-cli dataset register \
-name "nats://localhost:4222/data.raw"
# File
marquez-cli dataset register \
-name "file:///tmp/data.json"
# API endpoint
marquez-cli dataset register \
-name "api://example.com/users"
```
#### List Datasets
```bash
# In the default namespace (automatic-process)
marquez-cli dataset get
# In a specific namespace
marquez-cli dataset get -namespace my-namespace
```
**Output:**
```
Datasets in namespace 'automatic-process':
• postgres://localhost:5434/postgres/public/events [DB_TABLE]
• file:///tmp/users.json [FILE]
• api://example.com/users [API]
```
---
### 3. Job Management
#### Register a Job
```bash
marquez-cli job register -name my_pipeline
```
#### List Jobs
```bash
# In the default namespace
marquez-cli job get
# In a specific namespace
marquez-cli job get -namespace my-namespace
```
#### View the Runs of a Job
```bash
marquez-cli job runs -name my_pipeline
```
**Output:**
```
Runs for job 'automatic-process/my_pipeline':
• 3c7a4f21-1234-5678-90ab-cdef12345678 [COMPLETED] - 2026-03-23T10:30:00.000Z
• 7b8c9d0e-5678-1234-90ab-cdef12345678 [FAILED] - 2026-03-23T09:15:00.000Z
• 1a2b3c4d-9012-3456-78ab-cdef12345678 [RUNNING] - 2026-03-23T11:00:00.000Z
```
---
### 4. Querying Lineage
#### Get the Lineage of a Dataset
```bash
# Text format (human-readable)
marquez-cli lineage \
  -name "postgres://localhost:5434/postgres/public/events"
# JSON format (for scripts)
marquez-cli lineage \
  -name "postgres://localhost:5434/postgres/public/events" \
  -format json
# With a custom depth
marquez-cli lineage \
  -name "postgres://localhost:5434/postgres/public/events" \
  -depth 20
```
**Output (text format):**
```
Lineage for dataset 'automatic-process/postgres://localhost:5434/postgres/public/events':
📦 Datasets (4):
• api://jsonplaceholder.typicode.com/users
• file:///tmp/users.json
• file:///tmp/emails.json
• postgres://localhost:5434/postgres/public/user_emails
⚙️ Jobs (3):
• fetch_api
← Inputs:
- api://jsonplaceholder.typicode.com/users
→ Outputs:
- file:///tmp/users.json
• transform
← Inputs:
- file:///tmp/users.json
→ Outputs:
- file:///tmp/emails.json
• ingest_postgres
← Inputs:
- file:///tmp/emails.json
→ Outputs:
- postgres://localhost:5434/postgres/public/user_emails
```
---
### 5. Listing Resources
#### List Namespaces
```bash
marquez-cli list namespaces
```
#### List Jobs
```bash
marquez-cli list jobs
marquez-cli list jobs -namespace my-namespace
```
#### List Datasets
```bash
marquez-cli list datasets
marquez-cli list datasets -namespace my-namespace
```
---
## 🔧 Integration with Dagu
### Example: DAG with Lineage Tracking
```yaml
# ~/dagu/dags/example_with_lineage.yaml
name: example_with_lineage
description: Pipeline with lineage tracking using marquez-cli
schedule:
  - "0 */6 * * *"
env:
  - RUN_ID: ""  # Generated dynamically
steps:
  # STEP 1: Generate a unique Run ID
  - name: generate_run_id
    # `output` captures this step's stdout, so the command must print the ID
    command: uuidgen
    output: RUN_ID
  # STEP 2: START event
  - name: start_run
    command: |
      marquez-cli run start \
        -job example_with_lineage \
        -run-id $RUN_ID \
        -inputs "api://jsonplaceholder.typicode.com/users"
    depends: [generate_run_id]
  # STEP 3: Fetch data
  - name: fetch_data
    command: |
      curl -s https://jsonplaceholder.typicode.com/users > /tmp/users.json
      marquez-cli run running \
        -job example_with_lineage \
        -run-id $RUN_ID \
        -inputs "api://jsonplaceholder.typicode.com/users" \
        -outputs "file:///tmp/users.json"
    depends: [start_run]
  # STEP 4: Transform data
  - name: transform_data
    command: |
      jq '[.[] | {email: .email, name: .name}]' /tmp/users.json > /tmp/emails.json
      marquez-cli run running \
        -job example_with_lineage \
        -run-id $RUN_ID \
        -inputs "file:///tmp/users.json" \
        -outputs "file:///tmp/emails.json"
    depends: [fetch_data]
  # STEP 5: Load to PostgreSQL
  - name: load_postgres
    command: |
      psql -h localhost -p 5434 -U postgres -d postgres -c \
        "CREATE TABLE IF NOT EXISTS user_emails (email TEXT, name TEXT);"
      cat /tmp/emails.json | jq -r '.[] | [.email, .name] | @csv' | \
        psql -h localhost -p 5434 -U postgres -d postgres -c \
        "COPY user_emails FROM STDIN WITH CSV;"
      marquez-cli run running \
        -job example_with_lineage \
        -run-id $RUN_ID \
        -inputs "file:///tmp/emails.json" \
        -outputs "postgres://localhost:5434/postgres/public/user_emails"
    depends: [transform_data]
  # STEP 6: COMPLETE event
  - name: complete_run
    command: |
      marquez-cli run complete \
        -job example_with_lineage \
        -run-id $RUN_ID \
        -inputs "api://jsonplaceholder.typicode.com/users" \
        -outputs "postgres://localhost:5434/postgres/public/user_emails"
    depends: [load_postgres]
handlers:
  failure:
    - name: fail_run
      command: |
        marquez-cli run fail \
          -job example_with_lineage \
          -run-id $RUN_ID
```
---
## 🔁 Workflow Típico
### 1. Pipeline Simple (START → COMPLETE)
```bash
#!/bin/bash
# Script: ~/dagu/scripts/simple_pipeline.sh
NAMESPACE="automatic-process"
JOB_NAME="simple_pipeline"
RUN_ID=$(marquez-cli run start -job $JOB_NAME -inputs "api://source" | grep "Run ID" | awk '{print $NF}')
echo "Started run: $RUN_ID"
# Hacer el trabajo
curl -s https://api.example.com/data > /tmp/data.json
# Completar
marquez-cli run complete \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "api://source" \
-outputs "file:///tmp/data.json"
echo "Run completed: $RUN_ID"
```
### 2. Pipeline con Manejo de Errores
```bash
#!/bin/bash
# Script: ~/dagu/scripts/pipeline_with_error_handling.sh
set -euo pipefail
JOB_NAME="pipeline_with_errors"
RUN_ID=$(uuidgen)
# Cleanup function, run when any command fails
cleanup() {
marquez-cli run fail -job "$JOB_NAME" -run-id "$RUN_ID"
echo "Pipeline failed, run marked as FAILED"
}
trap cleanup ERR
# START
marquez-cli run start -job "$JOB_NAME" -run-id "$RUN_ID"
# Work
echo "Processing..."
# ... your logic here ...
# COMPLETE (only if everything succeeded)
marquez-cli run complete \
-job "$JOB_NAME" \
-run-id "$RUN_ID" \
-outputs "postgres://table"
echo "Pipeline completed successfully"
```
### 3. Multi-Step Pipeline
```bash
#!/bin/bash
JOB_NAME="multi_step_pipeline"
RUN_ID=$(uuidgen)
# START
marquez-cli run start -job $JOB_NAME -run-id $RUN_ID
# STEP 1: Extract
echo "Extracting..."
curl -s https://api.example.com/data > /tmp/raw.json
marquez-cli run running \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "api://example.com/data" \
-outputs "file:///tmp/raw.json"
# STEP 2: Transform
echo "Transforming..."
jq '.data' /tmp/raw.json > /tmp/clean.json
marquez-cli run running \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "file:///tmp/raw.json" \
-outputs "file:///tmp/clean.json"
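# STEP 3: Load
# Note: server-side COPY reads the path on the PostgreSQL server host and
# expects text or CSV rows, so adapt this command to your table and data format.
echo "Loading..."
psql -h localhost -p 5434 -U postgres -d postgres \
-c "COPY events FROM '/tmp/clean.json';"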
marquez-cli run complete \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "file:///tmp/clean.json" \
-outputs "postgres://localhost:5434/postgres/public/events"
echo "Pipeline completed"
```
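In the multi-step script, each step's output becomes the next step's input, and only the last step is reported as COMPLETE. That chaining can be sketched in Go as follows. The `step` and `event` types and `chainEvents` are hypothetical and simplified; they are not the structs used by the CLI.

```go
package main

import "fmt"

// step describes one stage of a pipeline and the dataset it produces.
type step struct {
	Name   string
	Output string
}

// event is a simplified lineage event (hypothetical type for this sketch).
type event struct {
	Type   string
	Input  string
	Output string
}

// chainEvents returns a START event followed by one event per step.
// Each step reads the previous step's output; the last step is reported
// as COMPLETE and all earlier steps as RUNNING.
func chainEvents(source string, steps []step) []event {
	events := []event{{Type: "START"}}
	input := source
	for i, s := range steps {
		eventType := "RUNNING"
		if i == len(steps)-1 {
			eventType = "COMPLETE"
		}
		events = append(events, event{Type: eventType, Input: input, Output: s.Output})
		input = s.Output
	}
	return events
}

func main() {
	steps := []step{
		{Name: "extract", Output: "file:///tmp/raw.json"},
		{Name: "transform", Output: "file:///tmp/clean.json"},
		{Name: "load", Output: "postgres://localhost:5434/postgres/public/events"},
	}
	for _, e := range chainEvents("api://example.com/data", steps) {
		fmt.Printf("%-8s %s -> %s\n", e.Type, e.Input, e.Output)
	}
}
```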
---
## 📋 Naming Conventions
### Dataset URIs
Always use descriptive URIs:
| Type | Format | Example |
|------|---------|---------|
| PostgreSQL | `postgres://host:port/db/schema/table` | `postgres://localhost:5434/postgres/public/events` |
| ClickHouse | `clickhouse://host:port/database/table` | `clickhouse://localhost:8123/default/analytics` |
| NATS | `nats://host:port/subject` | `nats://localhost:4222/data.raw` |
| File | `file:///absolute/path` | `file:///tmp/data.json` |
| API | `api://domain/endpoint` | `api://example.com/users` |
| S3 | `s3://bucket/key` | `s3://my-bucket/data/file.parquet` |
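The formats in the table can be checked before an event is sent. The sketch below is one possible validation, assuming that only the six schemes listed above are expected and that every scheme except `file` needs a host part. `checkDatasetURI` and `knownSchemes` are hypothetical names; the CLI itself does not validate dataset names.

```go
package main

import (
	"fmt"
	"net/url"
)

// knownSchemes lists the URI schemes from the naming table.
var knownSchemes = map[string]bool{
	"postgres": true, "clickhouse": true, "nats": true,
	"file": true, "api": true, "s3": true,
}

// checkDatasetURI returns an error if uri does not parse, uses a scheme
// outside the table, or lacks a host where one is expected.
func checkDatasetURI(uri string) error {
	u, err := url.Parse(uri)
	if err != nil {
		return fmt.Errorf("invalid dataset URI %q: %w", uri, err)
	}
	if !knownSchemes[u.Scheme] {
		return fmt.Errorf("dataset URI %q: unexpected scheme %q", uri, u.Scheme)
	}
	if u.Scheme != "file" && u.Host == "" {
		return fmt.Errorf("dataset URI %q: missing host", uri)
	}
	return nil
}

func main() {
	for _, uri := range []string{
		"postgres://localhost:5434/postgres/public/events",
		"file:///tmp/data.json",
		"events", // no scheme, so it is rejected
	} {
		fmt.Println(uri, "=>", checkDatasetURI(uri))
	}
}
```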
### Namespaces
- Use the `automatic-process` namespace for all your pipelines
- You can create additional namespaces for specific projects
### Job Names
- Use descriptive names: `fetch_api_data`, `transform_sales`, `load_warehouse`
- Avoid hyphens (`-`); use underscores (`_`) instead
- Keep names consistent with the DAG names in Dagu
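The job-name rules can be enforced with a small check. The pattern below is an assumption that is stricter than the text requires: it only accepts lowercase letters, digits, and underscores, starting with a letter. `validJobName` is a hypothetical helper and not part of the CLI.

```go
package main

import (
	"fmt"
	"regexp"
)

// jobNamePattern accepts names such as fetch_api_data.
// Assumption: lowercase snake_case only, which is stricter than the convention text.
var jobNamePattern = regexp.MustCompile(`^[a-z][a-z0-9_]*$`)

// validJobName reports whether name follows the snake_case convention.
func validJobName(name string) bool {
	return jobNamePattern.MatchString(name)
}

func main() {
	for _, name := range []string{"fetch_api_data", "load-warehouse"} {
		fmt.Println(name, validJobName(name))
	}
}
```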
---
## 🔍 Verify Lineage
After running your pipeline, verify the lineage:
```bash
# 1. List the jobs that have run
marquez-cli list jobs
# 2. List the runs of your job
marquez-cli job runs -name my_pipeline
# 3. List the datasets created
marquez-cli list datasets
# 4. Show the full lineage of a dataset
marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
```
You can also use the **Marquez Web UI**: http://localhost:3001
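The JSON output of the `lineage` command (`-format json`) can also be checked programmatically. The sketch below counts nodes per type, assuming the response has a top-level `graph` array whose entries carry `id` and `type` fields, which is the shape the CLI's text printer reads. The sample payload is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lineageGraph mirrors only the fields this sketch needs from a lineage response.
type lineageGraph struct {
	Graph []struct {
		ID   string `json:"id"`
		Type string `json:"type"`
	} `json:"graph"`
}

// countNodes decodes a lineage response and counts its nodes per type
// (for example DATASET and JOB).
func countNodes(raw []byte) (map[string]int, error) {
	var lg lineageGraph
	if err := json.Unmarshal(raw, &lg); err != nil {
		return nil, fmt.Errorf("decode lineage: %w", err)
	}
	counts := make(map[string]int)
	for _, n := range lg.Graph {
		counts[n.Type]++
	}
	return counts, nil
}

func main() {
	// Hypothetical sample payload.
	sample := []byte(`{"graph":[
		{"id":"dataset:automatic-process:file:///tmp/raw.json","type":"DATASET"},
		{"id":"job:automatic-process:my_pipeline","type":"JOB"}
	]}`)
	counts, err := countNodes(sample)
	fmt.Println(counts, err)
}
```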
---
## 🛠️ Development
### Build from Source
```bash
# Go to the project directory
cd ~/AutomaticProyects/automatic_process/tools/marquez-cli
# Build
make build
# Run without installing
./marquez-cli help
# Install
make install
# Remove built binaries
make clean
# Uninstall
make uninstall
```
### Project Structure
```
marquez-cli/
├── main.go # Main CLI with commands
├── openlineage.go # HTTP client and OpenLineage structs
├── go.mod # Go module
├── Makefile # Build automation
└── README.md # Documentation
```
---
## 📊 Marquez API Endpoints Used
The CLI talks to these Marquez endpoints:
| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/api/v1/lineage` | POST | Send OpenLineage events |
| `/api/v1/lineage` | GET | Get the lineage of a dataset |
| `/api/v1/namespaces` | GET | List namespaces |
| `/api/v1/namespaces/{ns}/jobs` | GET | List jobs |
| `/api/v1/namespaces/{ns}/datasets` | GET | List datasets |
| `/api/v1/namespaces/{ns}/jobs/{job}/runs` | GET | List the runs of a job |
Full documentation: https://marquezproject.github.io/marquez/openapi.html
---
## 🎯 Lineage Checklist
Use this checklist for every pipeline:
- [ ] **START event** at the beginning of the pipeline
- [ ] **RUNNING events** for each intermediate transformation
- [ ] **COMPLETE event** when the pipeline finishes successfully
- [ ] **FAIL event** when an error occurs (handler)
- [ ] Declare **ALL** inputs (APIs, files, tables)
- [ ] Declare **ALL** outputs (files, streams, tables)
- [ ] Use **well-formed URIs** for datasets
- [ ] Use the **same namespace** (`automatic-process`)
- [ ] Use the **same run-id** for every event of the same run
- [ ] Verify the lineage in the Marquez Web UI
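Part of this checklist can be checked mechanically. The sketch below inspects the ordered events of one run and reports which items are violated. It only covers the START, terminal-event, and run-id items. `runEvent` and `checkRun` are hypothetical names for illustration.

```go
package main

import "fmt"

// runEvent is a simplified view of one emitted event (hypothetical type).
type runEvent struct {
	Type  string
	RunID string
}

// checkRun returns the checklist items violated by an ordered list of events.
// An empty result means the sequence passed these checks.
func checkRun(events []runEvent) []string {
	if len(events) == 0 {
		return []string{"no events"}
	}
	var problems []string
	if events[0].Type != "START" {
		problems = append(problems, "first event is not START")
	}
	last := events[len(events)-1].Type
	if last != "COMPLETE" && last != "FAIL" {
		problems = append(problems, "last event is neither COMPLETE nor FAIL")
	}
	for _, e := range events[1:] {
		if e.RunID != events[0].RunID {
			problems = append(problems, "run ID changes between events")
			break
		}
	}
	return problems
}

func main() {
	events := []runEvent{
		{Type: "START", RunID: "a"},
		{Type: "RUNNING", RunID: "a"},
		{Type: "COMPLETE", RunID: "b"}, // different run ID: reported below
	}
	fmt.Println(checkRun(events))
}
```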
---
## 🐛 Troubleshooting
### Error: "connection refused"
```bash
# Check that Marquez is running
docker ps | grep marquez
# Start Marquez if it is not running
docker-compose -f docker-compose-marquez.yml up -d
# Test the connection
curl http://localhost:5000/api/v1/namespaces
```
### Error: "API error (status 400)"
Check that:
- The namespace exists
- The dataset URIs are well formed
- The run-id is a valid UUID
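The last item can be verified before calling the CLI. The sketch below checks the canonical 8-4-4-4-12 hexadecimal layout with a regular expression. `isUUID` is a hypothetical helper, and the check is about format only: it does not inspect the version or variant bits.

```go
package main

import (
	"fmt"
	"regexp"
)

// uuidPattern matches the canonical 8-4-4-4-12 hexadecimal form.
var uuidPattern = regexp.MustCompile(
	`^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`)

// isUUID reports whether s is laid out like a UUID.
func isUUID(s string) bool {
	return uuidPattern.MatchString(s)
}

func main() {
	fmt.Println(isUUID("123e4567-e89b-42d3-a456-426614174000"))
	fmt.Println(isUUID("my-run-1"))
}
```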
### Run-id is not consistent between steps
Use an environment variable or a temporary file:
```bash
# Option 1: Environment variable
export RUN_ID=$(uuidgen)
marquez-cli run start -job my_job -run-id "$RUN_ID"
marquez-cli run complete -job my_job -run-id "$RUN_ID"
# Option 2: Temporary file
uuidgen > /tmp/run_id.txt
RUN_ID=$(cat /tmp/run_id.txt)
marquez-cli run start -job my_job -run-id "$RUN_ID"
```
---
## 📚 Resources
- **OpenLineage Spec**: https://openlineage.io/
- **Marquez Docs**: https://marquezproject.ai/
- **Marquez Web UI**: http://localhost:3001
- **Marquez API**: http://localhost:5000/api/v1
- **Dagu Docs**: https://dagu.sh/
---
## 📝 License
MIT License - Automatic Process project
---
**Last updated**: 2026-03-23
**Version**: 1.0.0
**Author**: Lucas (@egutierrez)
+7
View File
@@ -0,0 +1,7 @@
module github.com/automatic-process/marquez-cli
go 1.21
require (
// No external dependencies - uses only Go standard library
)
+604
View File
@@ -0,0 +1,604 @@
package main
import (
"crypto/rand"
"encoding/json"
"flag"
"fmt"
"os"
"strings"
"time"
)
const (
version = "1.0.0"
defaultMarquezURL = "http://localhost:5000"
defaultNamespace = "automatic-process"
defaultProducer = "marquez-cli"
)
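// generateUUID returns a random (version 4) UUID string.
func generateUUID() string {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
// Without random bytes the run ID would not be unique, so abort.
fmt.Fprintf(os.Stderr, "Error generating UUID: %v\n", err)
os.Exit(1)
}
// Set version (4) and variant bits
b[6] = (b[6] & 0x0f) | 0x40
b[8] = (b[8] & 0x3f) | 0x80
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}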
func main() {
if len(os.Args) < 2 {
printUsage()
os.Exit(1)
}
command := os.Args[1]
switch command {
case "run":
handleRunCommand()
case "dataset":
handleDatasetCommand()
case "job":
handleJobCommand()
case "lineage":
handleLineageCommand()
case "list":
handleListCommand()
case "version":
fmt.Printf("marquez-cli version %s\n", version)
case "help", "-h", "--help":
printUsage()
default:
fmt.Fprintf(os.Stderr, "Unknown command: %s\n\n", command)
printUsage()
os.Exit(1)
}
}
func handleRunCommand() {
if len(os.Args) < 3 {
fmt.Println("Usage: marquez-cli run [start|complete|fail|running] [options]")
os.Exit(1)
}
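eventType := strings.ToUpper(os.Args[2])
// Reject anything that is not a known OpenLineage event type
switch eventType {
case EventTypeStart, EventTypeRunning, EventTypeComplete, EventTypeFail, EventTypeAbort:
default:
fmt.Fprintf(os.Stderr, "Unknown run event type: %s\n", os.Args[2])
os.Exit(1)
}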
runCmd := flag.NewFlagSet("run", flag.ExitOnError)
marquez := runCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
namespace := runCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Job namespace")
jobName := runCmd.String("job", "", "Job name (required)")
runID := runCmd.String("run-id", "", "Run ID (auto-generated if not provided)")
producer := runCmd.String("producer", defaultProducer, "Producer URI")
inputs := runCmd.String("inputs", "", "Comma-separated list of input datasets")
outputs := runCmd.String("outputs", "", "Comma-separated list of output datasets")
eventTime := runCmd.String("event-time", "", "Event time (ISO 8601, defaults to now)")
runCmd.Parse(os.Args[3:])
if *jobName == "" {
fmt.Fprintln(os.Stderr, "Error: -job is required")
os.Exit(1)
}
// Generate run ID if not provided
if *runID == "" {
*runID = generateUUID()
}
// Set event time to now if not provided
if *eventTime == "" {
*eventTime = time.Now().UTC().Format(time.RFC3339Nano)
}
client := NewMarquezClient(*marquez)
// Parse input datasets
var inputDatasets []Dataset
if *inputs != "" {
for _, input := range strings.Split(*inputs, ",") {
input = strings.TrimSpace(input)
if input != "" {
inputDatasets = append(inputDatasets, Dataset{
Namespace: *namespace,
Name: input,
})
}
}
}
// Parse output datasets
var outputDatasets []Dataset
if *outputs != "" {
for _, output := range strings.Split(*outputs, ",") {
output = strings.TrimSpace(output)
if output != "" {
outputDatasets = append(outputDatasets, Dataset{
Namespace: *namespace,
Name: output,
})
}
}
}
event := &OpenLineageEvent{
EventType: eventType,
EventTime: *eventTime,
Producer: *producer,
Job: Job{
Namespace: *namespace,
Name: *jobName,
},
Run: Run{
RunID: *runID,
},
Inputs: inputDatasets,
Outputs: outputDatasets,
}
if err := client.SendEvent(event); err != nil {
fmt.Fprintf(os.Stderr, "Error sending event: %v\n", err)
os.Exit(1)
}
fmt.Printf("✓ Run event sent successfully\n")
fmt.Printf(" Event Type: %s\n", eventType)
fmt.Printf(" Job: %s/%s\n", *namespace, *jobName)
fmt.Printf(" Run ID: %s\n", *runID)
if len(inputDatasets) > 0 {
fmt.Printf(" Inputs: %d dataset(s)\n", len(inputDatasets))
}
if len(outputDatasets) > 0 {
fmt.Printf(" Outputs: %d dataset(s)\n", len(outputDatasets))
}
}
func handleDatasetCommand() {
if len(os.Args) < 3 {
fmt.Println("Usage: marquez-cli dataset [register|get] [options]")
os.Exit(1)
}
action := os.Args[2]
switch action {
case "register":
registerDataset()
case "get":
getDataset()
default:
fmt.Fprintf(os.Stderr, "Unknown dataset action: %s\n", action)
os.Exit(1)
}
}
func registerDataset() {
dsCmd := flag.NewFlagSet("dataset register", flag.ExitOnError)
marquez := dsCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
namespace := dsCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Dataset namespace")
name := dsCmd.String("name", "", "Dataset name (required, e.g., 'postgres://table' or 'file:///path')")
jobName := dsCmd.String("job", "dataset-registration", "Job name that creates this dataset")
runID := dsCmd.String("run-id", "", "Run ID (auto-generated if not provided)")
dsCmd.Parse(os.Args[3:])
if *name == "" {
fmt.Fprintln(os.Stderr, "Error: -name is required")
os.Exit(1)
}
if *runID == "" {
*runID = generateUUID()
}
client := NewMarquezClient(*marquez)
// Create a simple event to register the dataset
event := &OpenLineageEvent{
EventType: EventTypeComplete,
EventTime: time.Now().UTC().Format(time.RFC3339Nano),
Producer: defaultProducer,
Job: Job{
Namespace: *namespace,
Name: *jobName,
},
Run: Run{
RunID: *runID,
},
Outputs: []Dataset{
{
Namespace: *namespace,
Name: *name,
},
},
}
if err := client.SendEvent(event); err != nil {
fmt.Fprintf(os.Stderr, "Error registering dataset: %v\n", err)
os.Exit(1)
}
fmt.Printf("✓ Dataset registered successfully\n")
fmt.Printf(" Namespace: %s\n", *namespace)
fmt.Printf(" Name: %s\n", *name)
}
func getDataset() {
dsCmd := flag.NewFlagSet("dataset get", flag.ExitOnError)
marquez := dsCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
namespace := dsCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Dataset namespace")
dsCmd.Parse(os.Args[3:])
client := NewMarquezClient(*marquez)
datasets, err := client.GetDatasets(*namespace)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting datasets: %v\n", err)
os.Exit(1)
}
if len(datasets) == 0 {
fmt.Printf("No datasets found in namespace '%s'\n", *namespace)
return
}
fmt.Printf("Datasets in namespace '%s':\n\n", *namespace)
for _, ds := range datasets {
name, _ := ds["name"].(string)
dsType, _ := ds["type"].(string)
fmt.Printf(" • %s [%s]\n", name, dsType)
}
}
func handleJobCommand() {
if len(os.Args) < 3 {
fmt.Println("Usage: marquez-cli job [register|get|runs] [options]")
os.Exit(1)
}
action := os.Args[2]
switch action {
case "register":
registerJob()
case "get":
getJobs()
case "runs":
getJobRuns()
default:
fmt.Fprintf(os.Stderr, "Unknown job action: %s\n", action)
os.Exit(1)
}
}
func registerJob() {
jobCmd := flag.NewFlagSet("job register", flag.ExitOnError)
marquez := jobCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
namespace := jobCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Job namespace")
name := jobCmd.String("name", "", "Job name (required)")
runID := jobCmd.String("run-id", "", "Run ID (auto-generated if not provided)")
jobCmd.Parse(os.Args[3:])
if *name == "" {
fmt.Fprintln(os.Stderr, "Error: -name is required")
os.Exit(1)
}
if *runID == "" {
*runID = generateUUID()
}
client := NewMarquezClient(*marquez)
event := &OpenLineageEvent{
EventType: EventTypeStart,
EventTime: time.Now().UTC().Format(time.RFC3339Nano),
Producer: defaultProducer,
Job: Job{
Namespace: *namespace,
Name: *name,
},
Run: Run{
RunID: *runID,
},
}
if err := client.SendEvent(event); err != nil {
fmt.Fprintf(os.Stderr, "Error registering job: %v\n", err)
os.Exit(1)
}
fmt.Printf("✓ Job registered successfully\n")
fmt.Printf(" Namespace: %s\n", *namespace)
fmt.Printf(" Name: %s\n", *name)
fmt.Printf(" Run ID: %s\n", *runID)
}
func getJobs() {
jobCmd := flag.NewFlagSet("job get", flag.ExitOnError)
marquez := jobCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
namespace := jobCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Job namespace")
jobCmd.Parse(os.Args[3:])
client := NewMarquezClient(*marquez)
jobs, err := client.GetJobs(*namespace)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting jobs: %v\n", err)
os.Exit(1)
}
if len(jobs) == 0 {
fmt.Printf("No jobs found in namespace '%s'\n", *namespace)
return
}
fmt.Printf("Jobs in namespace '%s':\n\n", *namespace)
for _, job := range jobs {
name, _ := job["name"].(string)
jobType, _ := job["type"].(string)
fmt.Printf(" • %s [%s]\n", name, jobType)
}
}
func getJobRuns() {
jobCmd := flag.NewFlagSet("job runs", flag.ExitOnError)
marquez := jobCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
namespace := jobCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Job namespace")
name := jobCmd.String("name", "", "Job name (required)")
jobCmd.Parse(os.Args[3:])
if *name == "" {
fmt.Fprintln(os.Stderr, "Error: -name is required")
os.Exit(1)
}
client := NewMarquezClient(*marquez)
runs, err := client.GetJobRuns(*namespace, *name)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting job runs: %v\n", err)
os.Exit(1)
}
if len(runs) == 0 {
fmt.Printf("No runs found for job '%s/%s'\n", *namespace, *name)
return
}
fmt.Printf("Runs for job '%s/%s':\n\n", *namespace, *name)
for _, run := range runs {
runID, _ := run["id"].(string)
state, _ := run["state"].(string)
createdAt, _ := run["createdAt"].(string)
fmt.Printf(" • %s [%s] - %s\n", runID, state, createdAt)
}
}
func handleLineageCommand() {
lineageCmd := flag.NewFlagSet("lineage", flag.ExitOnError)
marquez := lineageCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
namespace := lineageCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Dataset namespace")
name := lineageCmd.String("name", "", "Dataset name (required)")
depth := lineageCmd.Int("depth", 10, "Lineage depth")
format := lineageCmd.String("format", "text", "Output format (text|json)")
lineageCmd.Parse(os.Args[2:])
if *name == "" {
fmt.Fprintln(os.Stderr, "Error: -name is required")
os.Exit(1)
}
client := NewMarquezClient(*marquez)
lineage, err := client.GetLineage(*namespace, *name, *depth)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting lineage: %v\n", err)
os.Exit(1)
}
if *format == "json" {
jsonData, err := json.MarshalIndent(lineage, "", " ")
if err != nil {
fmt.Fprintf(os.Stderr, "Error encoding lineage: %v\n", err)
os.Exit(1)
}
fmt.Println(string(jsonData))
} else {
printLineageText(lineage, *namespace, *name)
}
}
func handleListCommand() {
if len(os.Args) < 3 {
fmt.Println("Usage: marquez-cli list [namespaces|jobs|datasets] [options]")
os.Exit(1)
}
resource := os.Args[2]
listCmd := flag.NewFlagSet("list", flag.ExitOnError)
marquez := listCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
namespace := listCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Namespace")
listCmd.Parse(os.Args[3:])
client := NewMarquezClient(*marquez)
switch resource {
case "namespaces":
namespaces, err := client.GetNamespaces()
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting namespaces: %v\n", err)
os.Exit(1)
}
fmt.Println("Namespaces:")
for _, ns := range namespaces {
name, _ := ns["name"].(string)
fmt.Printf(" • %s\n", name)
}
case "jobs":
jobs, err := client.GetJobs(*namespace)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting jobs: %v\n", err)
os.Exit(1)
}
fmt.Printf("Jobs in namespace '%s':\n", *namespace)
for _, job := range jobs {
name, _ := job["name"].(string)
fmt.Printf(" • %s\n", name)
}
case "datasets":
datasets, err := client.GetDatasets(*namespace)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting datasets: %v\n", err)
os.Exit(1)
}
fmt.Printf("Datasets in namespace '%s':\n", *namespace)
for _, ds := range datasets {
name, _ := ds["name"].(string)
fmt.Printf(" • %s\n", name)
}
default:
fmt.Fprintf(os.Stderr, "Unknown resource: %s\n", resource)
os.Exit(1)
}
}
func printLineageText(lineage map[string]interface{}, namespace, datasetName string) {
fmt.Printf("Lineage for dataset '%s/%s':\n\n", namespace, datasetName)
graph, ok := lineage["graph"].([]interface{})
if !ok || len(graph) == 0 {
fmt.Println("No lineage information found")
return
}
datasets := make(map[string]bool)
jobs := make(map[string]map[string]interface{})
for _, node := range graph {
nodeMap, ok := node.(map[string]interface{})
if !ok {
continue
}
nodeType, _ := nodeMap["type"].(string)
nodeID, _ := nodeMap["id"].(string)
if nodeType == "DATASET" {
datasets[nodeID] = true
} else if nodeType == "JOB" {
jobs[nodeID] = nodeMap
}
}
fmt.Printf("📦 Datasets (%d):\n", len(datasets))
for ds := range datasets {
fmt.Printf(" • %s\n", ds)
}
fmt.Printf("\n⚙️ Jobs (%d):\n", len(jobs))
for jobName, jobData := range jobs {
fmt.Printf(" • %s\n", jobName)
// Show inputs
if inEdges, ok := jobData["inEdges"].([]interface{}); ok && len(inEdges) > 0 {
fmt.Printf(" ← Inputs:\n")
for _, edge := range inEdges {
if edgeMap, ok := edge.(map[string]interface{}); ok {
origin, _ := edgeMap["origin"].(string)
fmt.Printf(" - %s\n", origin)
}
}
}
// Show outputs
if outEdges, ok := jobData["outEdges"].([]interface{}); ok && len(outEdges) > 0 {
fmt.Printf(" → Outputs:\n")
for _, edge := range outEdges {
if edgeMap, ok := edge.(map[string]interface{}); ok {
destination, _ := edgeMap["destination"].(string)
fmt.Printf(" - %s\n", destination)
}
}
}
fmt.Println()
}
}
func printUsage() {
usage := `marquez-cli - OpenLineage/Marquez CLI tool
USAGE:
marquez-cli <command> [subcommand] [options]
COMMANDS:
run Manage job runs
start Start a new run
complete Mark run as complete
fail Mark run as failed
running Mark run as running
dataset Manage datasets
register Register a new dataset
get List datasets in namespace
job Manage jobs
register Register a new job
get List jobs in namespace
runs Get runs for a specific job
lineage Get lineage information for a dataset
list List resources (namespaces|jobs|datasets)
version Show version
help Show this help
EXAMPLES:
# Start a run
marquez-cli run start -job my_pipeline -inputs "api://source" -outputs "postgres://table"
# Complete a run
marquez-cli run complete -job my_pipeline -run-id <uuid> -outputs "postgres://table"
# Fail a run
marquez-cli run fail -job my_pipeline -run-id <uuid>
# Register a dataset
marquez-cli dataset register -name "postgres://localhost:5434/postgres/public/events"
# Get lineage
marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
# List all jobs
marquez-cli list jobs
# Get job runs
marquez-cli job runs -name my_pipeline
ENVIRONMENT VARIABLES:
MARQUEZ_URL Marquez API URL (default: http://localhost:5000)
MARQUEZ_NAMESPACE Default namespace (default: automatic-process)
For more information, visit: https://openlineage.io/
`
fmt.Print(usage)
}
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
BIN
View File
Binary file not shown.
+230
View File
@@ -0,0 +1,230 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// OpenLineage Event Types
const (
EventTypeStart = "START"
EventTypeRunning = "RUNNING"
EventTypeComplete = "COMPLETE"
EventTypeFail = "FAIL"
EventTypeAbort = "ABORT"
)
// Dataset represents an OpenLineage dataset
type Dataset struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
Facets map[string]interface{} `json:"facets,omitempty"`
}
// Job represents an OpenLineage job
type Job struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
Facets map[string]interface{} `json:"facets,omitempty"`
}
// Run represents an OpenLineage run
type Run struct {
RunID string `json:"runId"`
Facets map[string]interface{} `json:"facets,omitempty"`
}
// OpenLineageEvent represents a complete OpenLineage event
type OpenLineageEvent struct {
EventType string `json:"eventType"`
EventTime string `json:"eventTime"`
Producer string `json:"producer"`
SchemaURL string `json:"schemaURL,omitempty"`
Job Job `json:"job"`
Run Run `json:"run"`
Inputs []Dataset `json:"inputs,omitempty"`
Outputs []Dataset `json:"outputs,omitempty"`
}
// MarquezClient handles communication with Marquez API
type MarquezClient struct {
BaseURL string
HTTPClient *http.Client
}
// NewMarquezClient creates a new Marquez API client
func NewMarquezClient(baseURL string) *MarquezClient {
return &MarquezClient{
BaseURL: baseURL,
HTTPClient: &http.Client{
Timeout: 10 * time.Second,
},
}
}
// SendEvent sends an OpenLineage event to Marquez
func (c *MarquezClient) SendEvent(event *OpenLineageEvent) error {
// Set default schema URL if not provided
if event.SchemaURL == "" {
event.SchemaURL = "https://openlineage.io/spec/1-0-5/OpenLineage.json"
}
// Set event time if not provided
if event.EventTime == "" {
event.EventTime = time.Now().UTC().Format(time.RFC3339Nano)
}
jsonData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
url := fmt.Sprintf("%s/api/v1/lineage", c.BaseURL)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.HTTPClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
}
return nil
}
// GetLineage retrieves lineage information for a dataset
func (c *MarquezClient) GetLineage(namespace, datasetName string, depth int) (map[string]interface{}, error) {
url := fmt.Sprintf("%s/api/v1/lineage?nodeId=dataset:%s:%s&depth=%d",
c.BaseURL, namespace, datasetName, depth)
resp, err := c.HTTPClient.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to get lineage: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
}
var result map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return result, nil
}
// GetNamespaces retrieves all namespaces from Marquez
func (c *MarquezClient) GetNamespaces() ([]map[string]interface{}, error) {
url := fmt.Sprintf("%s/api/v1/namespaces", c.BaseURL)
resp, err := c.HTTPClient.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to get namespaces: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
}
var result struct {
Namespaces []map[string]interface{} `json:"namespaces"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return result.Namespaces, nil
}
// GetJobs retrieves all jobs in a namespace
func (c *MarquezClient) GetJobs(namespace string) ([]map[string]interface{}, error) {
url := fmt.Sprintf("%s/api/v1/namespaces/%s/jobs", c.BaseURL, namespace)
resp, err := c.HTTPClient.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to get jobs: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
}
var result struct {
Jobs []map[string]interface{} `json:"jobs"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return result.Jobs, nil
}
// GetDatasets retrieves all datasets in a namespace
func (c *MarquezClient) GetDatasets(namespace string) ([]map[string]interface{}, error) {
url := fmt.Sprintf("%s/api/v1/namespaces/%s/datasets", c.BaseURL, namespace)
resp, err := c.HTTPClient.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to get datasets: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
}
var result struct {
Datasets []map[string]interface{} `json:"datasets"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return result.Datasets, nil
}
// GetJobRuns retrieves runs for a specific job
func (c *MarquezClient) GetJobRuns(namespace, jobName string) ([]map[string]interface{}, error) {
url := fmt.Sprintf("%s/api/v1/namespaces/%s/jobs/%s/runs", c.BaseURL, namespace, jobName)
resp, err := c.HTTPClient.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to get job runs: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
}
var result struct {
Runs []map[string]interface{} `json:"runs"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return result.Runs, nil
}