5f3bc84696
Añadido binario CLI en Go para gestionar datasets, jobs y runs en Marquez. Características: - Enviar eventos OpenLineage (START, RUNNING, COMPLETE, FAIL) - Registrar y consultar datasets - Registrar y consultar jobs y runs - Consultar lineage de datasets con formato texto/JSON - Listar recursos (namespaces, jobs, datasets) - Sin dependencias externas (solo Go stdlib) - Binario estático compilado de ~5MB Archivos: - tools/marquez-cli/main.go: CLI principal con comandos - tools/marquez-cli/openlineage.go: Cliente HTTP y estructuras OpenLineage - tools/marquez-cli/go.mod: Módulo de Go - tools/marquez-cli/Makefile: Build automation - tools/marquez-cli/README.md: Documentación completa - tools/marquez-cli/QUICKSTART.md: Guía rápida de uso Instalación: make install en ~/.local/bin/marquez-cli
marquez-cli
OpenLineage/Marquez CLI tool para gestionar datasets, jobs y runs con lineage tracking completo.
Binario escrito en Go sin dependencias externas, listo para usar en pipelines de Dagu y scripts bash.
🎯 Características
- ✅ Registrar datasets en Marquez
- ✅ Registrar jobs y sus runs
- ✅ Enviar eventos OpenLineage (START, RUNNING, COMPLETE, FAIL)
- ✅ Consultar lineage de datasets
- ✅ Listar namespaces, jobs, datasets y runs
- ✅ Sin dependencias externas (solo Go stdlib)
- ✅ Binario estático compilado (~5MB)
📦 Instalación
Desde el Código Fuente
cd ~/AutomaticProyects/automatic_process/tools/marquez-cli
# Compilar
make build
# Instalar en ~/.local/bin
make install
# Verificar instalación
marquez-cli version
Variables de Entorno (Opcional)
export MARQUEZ_URL="http://localhost:5000"
export MARQUEZ_NAMESPACE="automatic-process"
🚀 Uso
1. Gestión de Runs
Iniciar un Run
# Sintaxis básica
marquez-cli run start -job my_pipeline
# Con inputs y outputs
marquez-cli run start \
-job fetch_api_data \
-inputs "api://jsonplaceholder.typicode.com/users" \
-outputs "file:///tmp/users.json"
# Con run-id específico (para continuar después)
marquez-cli run start \
-job my_pipeline \
-run-id "abc123-uuid-here" \
-inputs "file:///tmp/raw.json"
Salida:
✓ Run event sent successfully
Event Type: START
Job: automatic-process/fetch_api_data
Run ID: 3c7a4f21-1234-5678-90ab-cdef12345678
Inputs: 1 dataset(s)
Outputs: 1 dataset(s)
Marcar Run como RUNNING (Progreso)
marquez-cli run running \
-job my_pipeline \
-run-id "abc123-uuid-here" \
-inputs "file:///tmp/raw.json" \
-outputs "file:///tmp/processed.json"
Completar un Run Exitosamente
marquez-cli run complete \
-job my_pipeline \
-run-id "abc123-uuid-here" \
-inputs "file:///tmp/raw.json" \
-outputs "postgres://localhost:5434/postgres/public/events"
Marcar Run como Fallido
marquez-cli run fail \
-job my_pipeline \
-run-id "abc123-uuid-here"
2. Gestión de Datasets
Registrar un Dataset
# PostgreSQL table
marquez-cli dataset register \
-name "postgres://localhost:5434/postgres/public/events"
# ClickHouse table
marquez-cli dataset register \
-name "clickhouse://localhost:8123/default/analytics"
# NATS stream
marquez-cli dataset register \
-name "nats://localhost:4222/data.raw"
# Archivo
marquez-cli dataset register \
-name "file:///tmp/data.json"
# API endpoint
marquez-cli dataset register \
-name "api://example.com/users"
Listar Datasets
# En el namespace por defecto (automatic-process)
marquez-cli dataset get
# En un namespace específico
marquez-cli dataset get -namespace my-namespace
Salida:
Datasets in namespace 'automatic-process':
• postgres://localhost:5434/postgres/public/events [DB_TABLE]
• file:///tmp/users.json [FILE]
• api://example.com/users [API]
3. Gestión de Jobs
Registrar un Job
marquez-cli job register -name my_pipeline
Listar Jobs
# En el namespace por defecto
marquez-cli job get
# En un namespace específico
marquez-cli job get -namespace my-namespace
Ver Runs de un Job
marquez-cli job runs -name my_pipeline
Salida:
Runs for job 'automatic-process/my_pipeline':
• 3c7a4f21-1234-5678-90ab-cdef12345678 [COMPLETED] - 2026-03-23T10:30:00.000Z
• 7b8c9d0e-5678-1234-90ab-cdef12345678 [FAILED] - 2026-03-23T09:15:00.000Z
• 1a2b3c4d-9012-3456-78ab-cdef12345678 [RUNNING] - 2026-03-23T11:00:00.000Z
4. Consultar Lineage
Obtener Lineage de un Dataset
# Formato texto (legible)
marquez-cli lineage \
-name "postgres://localhost:5434/postgres/public/events"
# Formato JSON (para scripts)
marquez-cli lineage \
-name "postgres://localhost:5434/postgres/public/events" \
-format json
# Con profundidad personalizada
marquez-cli lineage \
-name "postgres://localhost:5434/postgres/public/events" \
-depth 20
Salida (formato texto):
Lineage for dataset 'automatic-process/postgres://localhost:5434/postgres/public/events':
📦 Datasets (4):
• api://jsonplaceholder.typicode.com/users
• file:///tmp/users.json
• file:///tmp/emails.json
• postgres://localhost:5434/postgres/public/user_emails
⚙️ Jobs (3):
• fetch_api
← Inputs:
- api://jsonplaceholder.typicode.com/users
→ Outputs:
- file:///tmp/users.json
• transform
← Inputs:
- file:///tmp/users.json
→ Outputs:
- file:///tmp/emails.json
• ingest_postgres
← Inputs:
- file:///tmp/emails.json
→ Outputs:
- postgres://localhost:5434/postgres/public/user_emails
5. Listar Recursos
Listar Namespaces
marquez-cli list namespaces
Listar Jobs
marquez-cli list jobs
marquez-cli list jobs -namespace my-namespace
Listar Datasets
marquez-cli list datasets
marquez-cli list datasets -namespace my-namespace
🔧 Integración con Dagu
Ejemplo: DAG con Lineage Tracking
# ~/dagu/dags/example_with_lineage.yaml
name: example_with_lineage
description: Pipeline con lineage tracking usando marquez-cli
schedule:
- "0 */6 * * *"
env:
- RUN_ID: "" # Se genera dinámicamente
steps:
# PASO 1: Generar Run ID único
- name: generate_run_id
command: echo "RUN_ID=$(uuidgen)" >> $DAGU_ENV
output: RUN_ID
# PASO 2: START event
- name: start_run
command: |
marquez-cli run start \
-job example_with_lineage \
-run-id $RUN_ID \
-inputs "api://jsonplaceholder.typicode.com/users"
depends: [generate_run_id]
# PASO 3: Fetch data
- name: fetch_data
command: |
curl -s https://jsonplaceholder.typicode.com/users > /tmp/users.json
marquez-cli run running \
-job example_with_lineage \
-run-id $RUN_ID \
-inputs "api://jsonplaceholder.typicode.com/users" \
-outputs "file:///tmp/users.json"
depends: [start_run]
# PASO 4: Transform data
- name: transform_data
command: |
jq '[.[] | {email: .email, name: .name}]' /tmp/users.json > /tmp/emails.json
marquez-cli run running \
-job example_with_lineage \
-run-id $RUN_ID \
-inputs "file:///tmp/users.json" \
-outputs "file:///tmp/emails.json"
depends: [fetch_data]
# PASO 5: Load to PostgreSQL
- name: load_postgres
command: |
psql -h localhost -p 5434 -U postgres -d postgres -c \
"CREATE TABLE IF NOT EXISTS user_emails (email TEXT, name TEXT);"
cat /tmp/emails.json | jq -r '.[] | [.email, .name] | @csv' | \
psql -h localhost -p 5434 -U postgres -d postgres -c \
"COPY user_emails FROM STDIN WITH CSV;"
marquez-cli run running \
-job example_with_lineage \
-run-id $RUN_ID \
-inputs "file:///tmp/emails.json" \
-outputs "postgres://localhost:5434/postgres/public/user_emails"
depends: [transform_data]
# PASO 6: COMPLETE event
- name: complete_run
command: |
marquez-cli run complete \
-job example_with_lineage \
-run-id $RUN_ID \
-inputs "api://jsonplaceholder.typicode.com/users" \
-outputs "postgres://localhost:5434/postgres/public/user_emails"
depends: [load_postgres]
handlers:
failure:
- name: fail_run
command: |
marquez-cli run fail \
-job example_with_lineage \
-run-id $RUN_ID
🔁 Workflow Típico
1. Pipeline Simple (START → COMPLETE)
#!/bin/bash
# Script: ~/dagu/scripts/simple_pipeline.sh
NAMESPACE="automatic-process"
JOB_NAME="simple_pipeline"
RUN_ID=$(marquez-cli run start -job $JOB_NAME -inputs "api://source" | grep "Run ID" | awk '{print $NF}')
echo "Started run: $RUN_ID"
# Hacer el trabajo
curl -s https://api.example.com/data > /tmp/data.json
# Completar
marquez-cli run complete \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "api://source" \
-outputs "file:///tmp/data.json"
echo "Run completed: $RUN_ID"
2. Pipeline con Manejo de Errores
#!/bin/bash
# Script: ~/dagu/scripts/pipeline_with_error_handling.sh
set -euo pipefail
JOB_NAME="pipeline_with_errors"
RUN_ID=$(uuidgen)
# Función de cleanup en caso de error
cleanup() {
marquez-cli run fail -job $JOB_NAME -run-id $RUN_ID
echo "Pipeline failed, run marked as FAILED"
}
trap cleanup ERR
# START
marquez-cli run start -job $JOB_NAME -run-id $RUN_ID
# Trabajo
echo "Processing..."
# ... tu lógica aquí ...
# COMPLETE (solo si todo fue exitoso)
marquez-cli run complete \
-job $JOB_NAME \
-run-id $RUN_ID \
-outputs "postgres://table"
echo "Pipeline completed successfully"
3. Pipeline Multi-Paso
#!/bin/bash
JOB_NAME="multi_step_pipeline"
RUN_ID=$(uuidgen)
# START
marquez-cli run start -job $JOB_NAME -run-id $RUN_ID
# PASO 1: Extract
echo "Extracting..."
curl -s https://api.example.com/data > /tmp/raw.json
marquez-cli run running \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "api://example.com/data" \
-outputs "file:///tmp/raw.json"
# PASO 2: Transform
echo "Transforming..."
jq '.data' /tmp/raw.json > /tmp/clean.json
marquez-cli run running \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "file:///tmp/raw.json" \
-outputs "file:///tmp/clean.json"
# PASO 3: Load
echo "Loading..."
psql -h localhost -p 5434 -U postgres -d postgres \
-c "COPY events FROM '/tmp/clean.json';"
marquez-cli run complete \
-job $JOB_NAME \
-run-id $RUN_ID \
-inputs "file:///tmp/clean.json" \
-outputs "postgres://localhost:5434/postgres/public/events"
echo "Pipeline completed"
📋 Convenciones de Naming
Dataset URIs
Usa siempre URIs descriptivos:
| Tipo | Formato | Ejemplo |
|---|---|---|
| PostgreSQL | postgres://host:port/db/schema/table |
postgres://localhost:5434/postgres/public/events |
| ClickHouse | clickhouse://host:port/database/table |
clickhouse://localhost:8123/default/analytics |
| NATS | nats://host:port/subject |
nats://localhost:4222/data.raw |
| Archivo | file:///absolute/path |
file:///tmp/data.json |
| API | api://domain/endpoint |
api://example.com/users |
| S3 | s3://bucket/key |
s3://my-bucket/data/file.parquet |
Namespaces
- Usa el namespace
automatic-processpara todos tus pipelines - Puedes crear namespaces adicionales para proyectos específicos
Job Names
- Usa nombres descriptivos:
fetch_api_data,transform_sales,load_warehouse - Evita guiones (
-), usa guiones bajos (_) - Mantén consistencia con los nombres de DAGs en Dagu
🔍 Verificar Lineage
Después de ejecutar tu pipeline, verifica el lineage:
# 1. Ver jobs ejecutados
marquez-cli list jobs
# 2. Ver runs de tu job
marquez-cli job runs -name my_pipeline
# 3. Ver datasets creados
marquez-cli list datasets
# 4. Ver lineage completo de un dataset
marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"
También puedes usar la Web UI de Marquez: http://localhost:3001
🛠️ Desarrollo
Compilar desde el Código Fuente
# Clonar el proyecto
cd ~/AutomaticProyects/automatic_process/tools/marquez-cli
# Compilar
make build
# Ejecutar sin instalar
./marquez-cli help
# Instalar
make install
# Limpiar binarios
make clean
# Desinstalar
make uninstall
Estructura del Proyecto
marquez-cli/
├── main.go # CLI principal con comandos
├── openlineage.go # Cliente HTTP y estructuras OpenLineage
├── go.mod # Módulo de Go
├── Makefile # Build automation
└── README.md # Documentación
📊 API de Marquez Utilizada
El CLI interactúa con estos endpoints de Marquez:
| Endpoint | Método | Uso |
|---|---|---|
/api/v1/lineage |
POST | Enviar eventos OpenLineage |
/api/v1/lineage |
GET | Obtener lineage de dataset |
/api/v1/namespaces |
GET | Listar namespaces |
/api/v1/namespaces/{ns}/jobs |
GET | Listar jobs |
/api/v1/namespaces/{ns}/datasets |
GET | Listar datasets |
/api/v1/namespaces/{ns}/jobs/{job}/runs |
GET | Listar runs de job |
Documentación completa: https://marquezproject.github.io/marquez/openapi.html
🎯 Checklist de Lineage
Usa esta checklist en cada pipeline:
- START event al inicio del pipeline
- RUNNING events en cada transformación intermedia
- COMPLETE event al finalizar exitosamente
- FAIL event si hay errores (handler)
- Declarar TODOS los inputs (APIs, archivos, tablas)
- Declarar TODOS los outputs (archivos, streams, tablas)
- Usar URIs bien formados para datasets
- Usar mismo namespace (
automatic-process) - Usar mismo run-id en todos los eventos del mismo run
- Verificar lineage en Marquez Web UI
🐛 Troubleshooting
Error: "connection refused"
# Verificar que Marquez esté corriendo
docker ps | grep marquez
# Iniciar Marquez si no está corriendo
docker-compose -f docker-compose-marquez.yml up -d
# Probar conexión
curl http://localhost:5000/api/v1/namespaces
Error: "API error (status 400)"
Revisa que:
- El namespace existe
- Los URIs de datasets estén bien formados
- El run-id sea un UUID válido
Run-id no es consistente entre pasos
Usa una variable de entorno o archivo temporal:
# Opción 1: Variable de entorno
export RUN_ID=$(uuidgen)
marquez-cli run start -job my_job -run-id $RUN_ID
marquez-cli run complete -job my_job -run-id $RUN_ID
# Opción 2: Archivo temporal
uuidgen > /tmp/run_id.txt
RUN_ID=$(cat /tmp/run_id.txt)
marquez-cli run start -job my_job -run-id $RUN_ID
📚 Recursos
- OpenLineage Spec: https://openlineage.io/
- Marquez Docs: https://marquezproject.ai/
- Marquez Web UI: http://localhost:3001
- Marquez API: http://localhost:5000/api/v1
- Dagu Docs: https://dagu.sh/
📝 Licencia
MIT License - Proyecto Automatic Process
Última actualización: 2026-03-23 Versión: 1.0.0 Autor: Lucas (@egutierrez)