5f3bc84696
Añadido binario CLI en Go para gestionar datasets, jobs y runs en Marquez. Características: - Enviar eventos OpenLineage (START, RUNNING, COMPLETE, FAIL) - Registrar y consultar datasets - Registrar y consultar jobs y runs - Consultar lineage de datasets con formato texto/JSON - Listar recursos (namespaces, jobs, datasets) - Sin dependencias externas (solo Go stdlib) - Binario estático compilado de ~5MB Archivos: - tools/marquez-cli/main.go: CLI principal con comandos - tools/marquez-cli/openlineage.go: Cliente HTTP y estructuras OpenLineage - tools/marquez-cli/go.mod: Módulo de Go - tools/marquez-cli/Makefile: Build automation - tools/marquez-cli/README.md: Documentación completa - tools/marquez-cli/QUICKSTART.md: Guía rápida de uso Instalación: make install en ~/.local/bin/marquez-cli
231 lines
6.4 KiB
Go
231 lines
6.4 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"time"
|
|
)
|
|
|
|
// OpenLineage Event Types
|
|
const (
|
|
EventTypeStart = "START"
|
|
EventTypeRunning = "RUNNING"
|
|
EventTypeComplete = "COMPLETE"
|
|
EventTypeFail = "FAIL"
|
|
EventTypeAbort = "ABORT"
|
|
)
|
|
|
|
// Dataset represents an OpenLineage dataset
|
|
type Dataset struct {
|
|
Namespace string `json:"namespace"`
|
|
Name string `json:"name"`
|
|
Facets map[string]interface{} `json:"facets,omitempty"`
|
|
}
|
|
|
|
// Job represents an OpenLineage job
|
|
type Job struct {
|
|
Namespace string `json:"namespace"`
|
|
Name string `json:"name"`
|
|
Facets map[string]interface{} `json:"facets,omitempty"`
|
|
}
|
|
|
|
// Run represents an OpenLineage run
|
|
type Run struct {
|
|
RunID string `json:"runId"`
|
|
Facets map[string]interface{} `json:"facets,omitempty"`
|
|
}
|
|
|
|
// OpenLineageEvent represents a complete OpenLineage event
|
|
type OpenLineageEvent struct {
|
|
EventType string `json:"eventType"`
|
|
EventTime string `json:"eventTime"`
|
|
Producer string `json:"producer"`
|
|
SchemaURL string `json:"schemaURL,omitempty"`
|
|
Job Job `json:"job"`
|
|
Run Run `json:"run"`
|
|
Inputs []Dataset `json:"inputs,omitempty"`
|
|
Outputs []Dataset `json:"outputs,omitempty"`
|
|
}
|
|
|
|
// MarquezClient handles communication with Marquez API
|
|
type MarquezClient struct {
|
|
BaseURL string
|
|
HTTPClient *http.Client
|
|
}
|
|
|
|
// NewMarquezClient creates a new Marquez API client
|
|
func NewMarquezClient(baseURL string) *MarquezClient {
|
|
return &MarquezClient{
|
|
BaseURL: baseURL,
|
|
HTTPClient: &http.Client{
|
|
Timeout: 10 * time.Second,
|
|
},
|
|
}
|
|
}
|
|
|
|
// SendEvent sends an OpenLineage event to Marquez
|
|
func (c *MarquezClient) SendEvent(event *OpenLineageEvent) error {
|
|
// Set default schema URL if not provided
|
|
if event.SchemaURL == "" {
|
|
event.SchemaURL = "https://openlineage.io/spec/1-0-5/OpenLineage.json"
|
|
}
|
|
|
|
// Set event time if not provided
|
|
if event.EventTime == "" {
|
|
event.EventTime = time.Now().UTC().Format(time.RFC3339Nano)
|
|
}
|
|
|
|
jsonData, err := json.Marshal(event)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal event: %w", err)
|
|
}
|
|
|
|
url := fmt.Sprintf("%s/api/v1/lineage", c.BaseURL)
|
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := c.HTTPClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send request: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetLineage retrieves lineage information for a dataset
|
|
func (c *MarquezClient) GetLineage(namespace, datasetName string, depth int) (map[string]interface{}, error) {
|
|
url := fmt.Sprintf("%s/api/v1/lineage?nodeId=dataset:%s:%s&depth=%d",
|
|
c.BaseURL, namespace, datasetName, depth)
|
|
|
|
resp, err := c.HTTPClient.Get(url)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get lineage: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
var result map[string]interface{}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// GetNamespaces retrieves all namespaces from Marquez
|
|
func (c *MarquezClient) GetNamespaces() ([]map[string]interface{}, error) {
|
|
url := fmt.Sprintf("%s/api/v1/namespaces", c.BaseURL)
|
|
|
|
resp, err := c.HTTPClient.Get(url)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get namespaces: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
var result struct {
|
|
Namespaces []map[string]interface{} `json:"namespaces"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return result.Namespaces, nil
|
|
}
|
|
|
|
// GetJobs retrieves all jobs in a namespace
|
|
func (c *MarquezClient) GetJobs(namespace string) ([]map[string]interface{}, error) {
|
|
url := fmt.Sprintf("%s/api/v1/namespaces/%s/jobs", c.BaseURL, namespace)
|
|
|
|
resp, err := c.HTTPClient.Get(url)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get jobs: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
var result struct {
|
|
Jobs []map[string]interface{} `json:"jobs"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return result.Jobs, nil
|
|
}
|
|
|
|
// GetDatasets retrieves all datasets in a namespace
|
|
func (c *MarquezClient) GetDatasets(namespace string) ([]map[string]interface{}, error) {
|
|
url := fmt.Sprintf("%s/api/v1/namespaces/%s/datasets", c.BaseURL, namespace)
|
|
|
|
resp, err := c.HTTPClient.Get(url)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get datasets: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
var result struct {
|
|
Datasets []map[string]interface{} `json:"datasets"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return result.Datasets, nil
|
|
}
|
|
|
|
// GetJobRuns retrieves runs for a specific job
|
|
func (c *MarquezClient) GetJobRuns(namespace, jobName string) ([]map[string]interface{}, error) {
|
|
url := fmt.Sprintf("%s/api/v1/namespaces/%s/jobs/%s/runs", c.BaseURL, namespace, jobName)
|
|
|
|
resp, err := c.HTTPClient.Get(url)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get job runs: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
var result struct {
|
|
Runs []map[string]interface{} `json:"runs"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return result.Runs, nil
|
|
}
|