5f3bc84696
Añadido binario CLI en Go para gestionar datasets, jobs y runs en Marquez. Características: - Enviar eventos OpenLineage (START, RUNNING, COMPLETE, FAIL) - Registrar y consultar datasets - Registrar y consultar jobs y runs - Consultar lineage de datasets con formato texto/JSON - Listar recursos (namespaces, jobs, datasets) - Sin dependencias externas (solo Go stdlib) - Binario estático compilado de ~5MB Archivos: - tools/marquez-cli/main.go: CLI principal con comandos - tools/marquez-cli/openlineage.go: Cliente HTTP y estructuras OpenLineage - tools/marquez-cli/go.mod: Módulo de Go - tools/marquez-cli/Makefile: Build automation - tools/marquez-cli/README.md: Documentación completa - tools/marquez-cli/QUICKSTART.md: Guía rápida de uso Instalación: make install en ~/.local/bin/marquez-cli
605 lines
15 KiB
Go
605 lines
15 KiB
Go
package main
|
|
|
|
import (
	"crypto/rand"
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"sort"
	"strings"
	"time"
)
|
|
|
|
// Defaults and identification strings shared by all subcommands.
const (
	// version is the string printed by the "version" command.
	version = "1.0.0"

	// defaultMarquezURL is the fallback for the -marquez flag when the
	// MARQUEZ_URL environment variable is unset or empty.
	defaultMarquezURL = "http://localhost:5000"

	// defaultNamespace is the fallback for the -namespace flag when the
	// MARQUEZ_NAMESPACE environment variable is unset or empty.
	defaultNamespace = "automatic-process"

	// defaultProducer is the producer value placed on events this tool emits
	// (and the default of the "run" command's -producer flag).
	defaultProducer = "marquez-cli"
)
|
|
|
|
// generateUUID returns a random version-4 UUID in the canonical 8-4-4-4-12
// lowercase hexadecimal form.
//
// The 16 bytes come from crypto/rand. If the random source cannot be read, the
// error is printed to stderr and the process exits with status 1 instead of
// silently returning a predictable identifier built from an unfilled buffer.
func generateUUID() string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		fmt.Fprintf(os.Stderr, "Error generating UUID: %v\n", err)
		os.Exit(1)
	}

	// Version 4 lives in the high nibble of byte 6; the variant marker
	// (binary 10) lives in the two top bits of byte 8.
	b[6] = (b[6] & 0x0f) | 0x40
	b[8] = (b[8] & 0x3f) | 0x80

	// %x on a byte slice prints exactly two hex digits per byte.
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}
|
|
|
|
func main() {
|
|
if len(os.Args) < 2 {
|
|
printUsage()
|
|
os.Exit(1)
|
|
}
|
|
|
|
command := os.Args[1]
|
|
|
|
switch command {
|
|
case "run":
|
|
handleRunCommand()
|
|
case "dataset":
|
|
handleDatasetCommand()
|
|
case "job":
|
|
handleJobCommand()
|
|
case "lineage":
|
|
handleLineageCommand()
|
|
case "list":
|
|
handleListCommand()
|
|
case "version":
|
|
fmt.Printf("marquez-cli version %s\n", version)
|
|
case "help", "-h", "--help":
|
|
printUsage()
|
|
default:
|
|
fmt.Fprintf(os.Stderr, "Unknown command: %s\n\n", command)
|
|
printUsage()
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func handleRunCommand() {
|
|
if len(os.Args) < 3 {
|
|
fmt.Println("Usage: marquez-cli run [start|complete|fail|running] [options]")
|
|
os.Exit(1)
|
|
}
|
|
|
|
eventType := strings.ToUpper(os.Args[2])
|
|
runCmd := flag.NewFlagSet("run", flag.ExitOnError)
|
|
|
|
marquez := runCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
|
|
namespace := runCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Job namespace")
|
|
jobName := runCmd.String("job", "", "Job name (required)")
|
|
runID := runCmd.String("run-id", "", "Run ID (auto-generated if not provided)")
|
|
producer := runCmd.String("producer", defaultProducer, "Producer URI")
|
|
inputs := runCmd.String("inputs", "", "Comma-separated list of input datasets")
|
|
outputs := runCmd.String("outputs", "", "Comma-separated list of output datasets")
|
|
eventTime := runCmd.String("event-time", "", "Event time (ISO 8601, defaults to now)")
|
|
|
|
runCmd.Parse(os.Args[3:])
|
|
|
|
if *jobName == "" {
|
|
fmt.Fprintln(os.Stderr, "Error: -job is required")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Generate run ID if not provided
|
|
if *runID == "" {
|
|
*runID = generateUUID()
|
|
}
|
|
|
|
// Set event time to now if not provided
|
|
if *eventTime == "" {
|
|
*eventTime = time.Now().UTC().Format(time.RFC3339Nano)
|
|
}
|
|
|
|
client := NewMarquezClient(*marquez)
|
|
|
|
// Parse input datasets
|
|
var inputDatasets []Dataset
|
|
if *inputs != "" {
|
|
for _, input := range strings.Split(*inputs, ",") {
|
|
input = strings.TrimSpace(input)
|
|
if input != "" {
|
|
inputDatasets = append(inputDatasets, Dataset{
|
|
Namespace: *namespace,
|
|
Name: input,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Parse output datasets
|
|
var outputDatasets []Dataset
|
|
if *outputs != "" {
|
|
for _, output := range strings.Split(*outputs, ",") {
|
|
output = strings.TrimSpace(output)
|
|
if output != "" {
|
|
outputDatasets = append(outputDatasets, Dataset{
|
|
Namespace: *namespace,
|
|
Name: output,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
event := &OpenLineageEvent{
|
|
EventType: eventType,
|
|
EventTime: *eventTime,
|
|
Producer: *producer,
|
|
Job: Job{
|
|
Namespace: *namespace,
|
|
Name: *jobName,
|
|
},
|
|
Run: Run{
|
|
RunID: *runID,
|
|
},
|
|
Inputs: inputDatasets,
|
|
Outputs: outputDatasets,
|
|
}
|
|
|
|
if err := client.SendEvent(event); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error sending event: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
fmt.Printf("✓ Run event sent successfully\n")
|
|
fmt.Printf(" Event Type: %s\n", eventType)
|
|
fmt.Printf(" Job: %s/%s\n", *namespace, *jobName)
|
|
fmt.Printf(" Run ID: %s\n", *runID)
|
|
if len(inputDatasets) > 0 {
|
|
fmt.Printf(" Inputs: %d dataset(s)\n", len(inputDatasets))
|
|
}
|
|
if len(outputDatasets) > 0 {
|
|
fmt.Printf(" Outputs: %d dataset(s)\n", len(outputDatasets))
|
|
}
|
|
}
|
|
|
|
func handleDatasetCommand() {
|
|
if len(os.Args) < 3 {
|
|
fmt.Println("Usage: marquez-cli dataset [register|get] [options]")
|
|
os.Exit(1)
|
|
}
|
|
|
|
action := os.Args[2]
|
|
|
|
switch action {
|
|
case "register":
|
|
registerDataset()
|
|
case "get":
|
|
getDataset()
|
|
default:
|
|
fmt.Fprintf(os.Stderr, "Unknown dataset action: %s\n", action)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func registerDataset() {
|
|
dsCmd := flag.NewFlagSet("dataset register", flag.ExitOnError)
|
|
|
|
marquez := dsCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
|
|
namespace := dsCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Dataset namespace")
|
|
name := dsCmd.String("name", "", "Dataset name (required, e.g., 'postgres://table' or 'file:///path')")
|
|
jobName := dsCmd.String("job", "dataset-registration", "Job name that creates this dataset")
|
|
runID := dsCmd.String("run-id", "", "Run ID (auto-generated if not provided)")
|
|
|
|
dsCmd.Parse(os.Args[3:])
|
|
|
|
if *name == "" {
|
|
fmt.Fprintln(os.Stderr, "Error: -name is required")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if *runID == "" {
|
|
*runID = generateUUID()
|
|
}
|
|
|
|
client := NewMarquezClient(*marquez)
|
|
|
|
// Create a simple event to register the dataset
|
|
event := &OpenLineageEvent{
|
|
EventType: EventTypeComplete,
|
|
EventTime: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Producer: defaultProducer,
|
|
Job: Job{
|
|
Namespace: *namespace,
|
|
Name: *jobName,
|
|
},
|
|
Run: Run{
|
|
RunID: *runID,
|
|
},
|
|
Outputs: []Dataset{
|
|
{
|
|
Namespace: *namespace,
|
|
Name: *name,
|
|
},
|
|
},
|
|
}
|
|
|
|
if err := client.SendEvent(event); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error registering dataset: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
fmt.Printf("✓ Dataset registered successfully\n")
|
|
fmt.Printf(" Namespace: %s\n", *namespace)
|
|
fmt.Printf(" Name: %s\n", *name)
|
|
}
|
|
|
|
func getDataset() {
|
|
dsCmd := flag.NewFlagSet("dataset get", flag.ExitOnError)
|
|
|
|
marquez := dsCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
|
|
namespace := dsCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Dataset namespace")
|
|
|
|
dsCmd.Parse(os.Args[3:])
|
|
|
|
client := NewMarquezClient(*marquez)
|
|
|
|
datasets, err := client.GetDatasets(*namespace)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error getting datasets: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if len(datasets) == 0 {
|
|
fmt.Printf("No datasets found in namespace '%s'\n", *namespace)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("Datasets in namespace '%s':\n\n", *namespace)
|
|
for _, ds := range datasets {
|
|
name, _ := ds["name"].(string)
|
|
dsType, _ := ds["type"].(string)
|
|
fmt.Printf(" • %s [%s]\n", name, dsType)
|
|
}
|
|
}
|
|
|
|
func handleJobCommand() {
|
|
if len(os.Args) < 3 {
|
|
fmt.Println("Usage: marquez-cli job [register|get|runs] [options]")
|
|
os.Exit(1)
|
|
}
|
|
|
|
action := os.Args[2]
|
|
|
|
switch action {
|
|
case "register":
|
|
registerJob()
|
|
case "get":
|
|
getJobs()
|
|
case "runs":
|
|
getJobRuns()
|
|
default:
|
|
fmt.Fprintf(os.Stderr, "Unknown job action: %s\n", action)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func registerJob() {
|
|
jobCmd := flag.NewFlagSet("job register", flag.ExitOnError)
|
|
|
|
marquez := jobCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
|
|
namespace := jobCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Job namespace")
|
|
name := jobCmd.String("name", "", "Job name (required)")
|
|
runID := jobCmd.String("run-id", "", "Run ID (auto-generated if not provided)")
|
|
|
|
jobCmd.Parse(os.Args[3:])
|
|
|
|
if *name == "" {
|
|
fmt.Fprintln(os.Stderr, "Error: -name is required")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if *runID == "" {
|
|
*runID = generateUUID()
|
|
}
|
|
|
|
client := NewMarquezClient(*marquez)
|
|
|
|
event := &OpenLineageEvent{
|
|
EventType: EventTypeStart,
|
|
EventTime: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Producer: defaultProducer,
|
|
Job: Job{
|
|
Namespace: *namespace,
|
|
Name: *name,
|
|
},
|
|
Run: Run{
|
|
RunID: *runID,
|
|
},
|
|
}
|
|
|
|
if err := client.SendEvent(event); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error registering job: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
fmt.Printf("✓ Job registered successfully\n")
|
|
fmt.Printf(" Namespace: %s\n", *namespace)
|
|
fmt.Printf(" Name: %s\n", *name)
|
|
fmt.Printf(" Run ID: %s\n", *runID)
|
|
}
|
|
|
|
func getJobs() {
|
|
jobCmd := flag.NewFlagSet("job get", flag.ExitOnError)
|
|
|
|
marquez := jobCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
|
|
namespace := jobCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Job namespace")
|
|
|
|
jobCmd.Parse(os.Args[3:])
|
|
|
|
client := NewMarquezClient(*marquez)
|
|
|
|
jobs, err := client.GetJobs(*namespace)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error getting jobs: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if len(jobs) == 0 {
|
|
fmt.Printf("No jobs found in namespace '%s'\n", *namespace)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("Jobs in namespace '%s':\n\n", *namespace)
|
|
for _, job := range jobs {
|
|
name, _ := job["name"].(string)
|
|
jobType, _ := job["type"].(string)
|
|
fmt.Printf(" • %s [%s]\n", name, jobType)
|
|
}
|
|
}
|
|
|
|
func getJobRuns() {
|
|
jobCmd := flag.NewFlagSet("job runs", flag.ExitOnError)
|
|
|
|
marquez := jobCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
|
|
namespace := jobCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Job namespace")
|
|
name := jobCmd.String("name", "", "Job name (required)")
|
|
|
|
jobCmd.Parse(os.Args[3:])
|
|
|
|
if *name == "" {
|
|
fmt.Fprintln(os.Stderr, "Error: -name is required")
|
|
os.Exit(1)
|
|
}
|
|
|
|
client := NewMarquezClient(*marquez)
|
|
|
|
runs, err := client.GetJobRuns(*namespace, *name)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error getting job runs: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if len(runs) == 0 {
|
|
fmt.Printf("No runs found for job '%s/%s'\n", *namespace, *name)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("Runs for job '%s/%s':\n\n", *namespace, *name)
|
|
for _, run := range runs {
|
|
runID, _ := run["id"].(string)
|
|
state, _ := run["state"].(string)
|
|
createdAt, _ := run["createdAt"].(string)
|
|
fmt.Printf(" • %s [%s] - %s\n", runID, state, createdAt)
|
|
}
|
|
}
|
|
|
|
func handleLineageCommand() {
|
|
lineageCmd := flag.NewFlagSet("lineage", flag.ExitOnError)
|
|
|
|
marquez := lineageCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
|
|
namespace := lineageCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Dataset namespace")
|
|
name := lineageCmd.String("name", "", "Dataset name (required)")
|
|
depth := lineageCmd.Int("depth", 10, "Lineage depth")
|
|
format := lineageCmd.String("format", "text", "Output format (text|json)")
|
|
|
|
lineageCmd.Parse(os.Args[2:])
|
|
|
|
if *name == "" {
|
|
fmt.Fprintln(os.Stderr, "Error: -name is required")
|
|
os.Exit(1)
|
|
}
|
|
|
|
client := NewMarquezClient(*marquez)
|
|
|
|
lineage, err := client.GetLineage(*namespace, *name, *depth)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error getting lineage: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if *format == "json" {
|
|
jsonData, _ := json.MarshalIndent(lineage, "", " ")
|
|
fmt.Println(string(jsonData))
|
|
} else {
|
|
printLineageText(lineage, *namespace, *name)
|
|
}
|
|
}
|
|
|
|
func handleListCommand() {
|
|
if len(os.Args) < 3 {
|
|
fmt.Println("Usage: marquez-cli list [namespaces|jobs|datasets] [options]")
|
|
os.Exit(1)
|
|
}
|
|
|
|
resource := os.Args[2]
|
|
listCmd := flag.NewFlagSet("list", flag.ExitOnError)
|
|
|
|
marquez := listCmd.String("marquez", getEnv("MARQUEZ_URL", defaultMarquezURL), "Marquez URL")
|
|
namespace := listCmd.String("namespace", getEnv("MARQUEZ_NAMESPACE", defaultNamespace), "Namespace")
|
|
|
|
listCmd.Parse(os.Args[3:])
|
|
|
|
client := NewMarquezClient(*marquez)
|
|
|
|
switch resource {
|
|
case "namespaces":
|
|
namespaces, err := client.GetNamespaces()
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error getting namespaces: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
fmt.Println("Namespaces:")
|
|
for _, ns := range namespaces {
|
|
name, _ := ns["name"].(string)
|
|
fmt.Printf(" • %s\n", name)
|
|
}
|
|
|
|
case "jobs":
|
|
jobs, err := client.GetJobs(*namespace)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error getting jobs: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
fmt.Printf("Jobs in namespace '%s':\n", *namespace)
|
|
for _, job := range jobs {
|
|
name, _ := job["name"].(string)
|
|
fmt.Printf(" • %s\n", name)
|
|
}
|
|
|
|
case "datasets":
|
|
datasets, err := client.GetDatasets(*namespace)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error getting datasets: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
fmt.Printf("Datasets in namespace '%s':\n", *namespace)
|
|
for _, ds := range datasets {
|
|
name, _ := ds["name"].(string)
|
|
fmt.Printf(" • %s\n", name)
|
|
}
|
|
|
|
default:
|
|
fmt.Fprintf(os.Stderr, "Unknown resource: %s\n", resource)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// printLineageText writes a human-readable summary of a lineage response to
// stdout.
//
// lineage is expected to hold a "graph" list of node objects; when that key is
// missing, not a list, or empty, "No lineage information found" is printed.
// Nodes whose "type" is "DATASET" are listed by "id"; nodes whose "type" is
// "JOB" are listed by "id" together with the "origin" of each entry in
// "inEdges" and the "destination" of each entry in "outEdges". Nodes of any
// other shape or type are skipped.
//
// Dataset and job IDs are printed in sorted order so that repeated calls on
// the same response produce identical output (map iteration order is random).
// namespace and datasetName are only used in the heading line.
func printLineageText(lineage map[string]interface{}, namespace, datasetName string) {
	fmt.Printf("Lineage for dataset '%s/%s':\n\n", namespace, datasetName)

	graph, ok := lineage["graph"].([]interface{})
	if !ok || len(graph) == 0 {
		fmt.Println("No lineage information found")
		return
	}

	// Keyed by node ID so that a repeated ID is listed only once.
	datasets := make(map[string]bool)
	jobs := make(map[string]map[string]interface{})

	for _, node := range graph {
		nodeMap, ok := node.(map[string]interface{})
		if !ok {
			continue
		}

		// Missing or non-string "type"/"id" fields read as "".
		nodeType, _ := nodeMap["type"].(string)
		nodeID, _ := nodeMap["id"].(string)

		switch nodeType {
		case "DATASET":
			datasets[nodeID] = true
		case "JOB":
			jobs[nodeID] = nodeMap
		}
	}

	datasetIDs := make([]string, 0, len(datasets))
	for id := range datasets {
		datasetIDs = append(datasetIDs, id)
	}
	sort.Strings(datasetIDs)

	jobIDs := make([]string, 0, len(jobs))
	for id := range jobs {
		jobIDs = append(jobIDs, id)
	}
	sort.Strings(jobIDs)

	fmt.Printf("📦 Datasets (%d):\n", len(datasetIDs))
	for _, id := range datasetIDs {
		fmt.Printf(" • %s\n", id)
	}

	fmt.Printf("\n⚙️ Jobs (%d):\n", len(jobIDs))
	for _, id := range jobIDs {
		job := jobs[id]
		fmt.Printf(" • %s\n", id)
		printLineageEdges(" ← Inputs:\n", job["inEdges"], "origin")
		printLineageEdges(" → Outputs:\n", job["outEdges"], "destination")
		fmt.Println()
	}
}

// printLineageEdges prints heading followed by one line per edge object,
// showing the string stored under key. Nothing is printed when edges is not a
// non-empty list; list entries that are not objects are skipped.
func printLineageEdges(heading string, edges interface{}, key string) {
	list, ok := edges.([]interface{})
	if !ok || len(list) == 0 {
		return
	}

	fmt.Print(heading)
	for _, edge := range list {
		edgeMap, ok := edge.(map[string]interface{})
		if !ok {
			continue
		}
		// A missing or non-string endpoint is printed as an empty string.
		endpoint, _ := edgeMap[key].(string)
		fmt.Printf(" - %s\n", endpoint)
	}
}
|
|
|
|
// printUsage writes the CLI help text to stdout: the command and subcommand
// overview, a few example invocations, and the environment variables that
// supply the default Marquez URL and namespace.
//
// Subcommands are indented beneath the command they belong to and the
// descriptions are column-aligned, so the command hierarchy is readable.
func printUsage() {
	usage := `marquez-cli - OpenLineage/Marquez CLI tool

USAGE:
  marquez-cli <command> [subcommand] [options]

COMMANDS:
  run                 Manage job runs
    start             Start a new run
    complete          Mark run as complete
    fail              Mark run as failed
    running           Mark run as running

  dataset             Manage datasets
    register          Register a new dataset
    get               List datasets in namespace

  job                 Manage jobs
    register          Register a new job
    get               List jobs in namespace
    runs              Get runs for a specific job

  lineage             Get lineage information for a dataset
  list                List resources (namespaces|jobs|datasets)
  version             Show version
  help                Show this help

EXAMPLES:
  # Start a run
  marquez-cli run start -job my_pipeline -inputs "api://source" -outputs "postgres://table"

  # Complete a run
  marquez-cli run complete -job my_pipeline -run-id <uuid> -outputs "postgres://table"

  # Fail a run
  marquez-cli run fail -job my_pipeline -run-id <uuid>

  # Register a dataset
  marquez-cli dataset register -name "postgres://localhost:5434/postgres/public/events"

  # Get lineage
  marquez-cli lineage -name "postgres://localhost:5434/postgres/public/events"

  # List all jobs
  marquez-cli list jobs

  # Get job runs
  marquez-cli job runs -name my_pipeline

ENVIRONMENT VARIABLES:
  MARQUEZ_URL         Marquez API URL (default: http://localhost:5000)
  MARQUEZ_NAMESPACE   Default namespace (default: automatic-process)

For more information, visit: https://openlineage.io/
`
	fmt.Print(usage)
}
|
|
|
|
// getEnv returns the value of the environment variable key, or defaultValue
// when the variable is unset or set to the empty string.
func getEnv(key, defaultValue string) string {
	value := os.Getenv(key)
	if value == "" {
		return defaultValue
	}
	return value
}
|