From 82f1f1bd589588db1af16e4f9b9b9aa1e0217bf5 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sun, 7 Jun 2026 20:26:15 +0200 Subject: [PATCH] =?UTF-8?q?feat(infra):=20parse=5Funibus=5Fhealth=20?= =?UTF-8?q?=E2=80=94=20healthz=20del=20cluster=20unibus=20=E2=86=92=20[]Pr?= =?UTF-8?q?omSample?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Función del grupo fleet-metrics que convierte la respuesta JSON del endpoint /healthz de un nodo unibus (membershipd) en series Prometheus (unibus_up, unibus_status_ok, unibus_posture_enforce/acl/tls/cluster, unibus_store_kv) con labels node/instance. Pura de transformación (impure solo por el error de unmarshal). La consume el daemon unibus_exporter del project fleet_monitoring. Con tests golden/edge/error. Co-Authored-By: Claude Opus 4.8 (1M context) --- functions/infra/parse_unibus_health.go | 67 ++++++++++++++++ functions/infra/parse_unibus_health.md | 89 +++++++++++++++++++++ functions/infra/parse_unibus_health_test.go | 67 ++++++++++++++++ 3 files changed, 223 insertions(+) create mode 100644 functions/infra/parse_unibus_health.go create mode 100644 functions/infra/parse_unibus_health.md create mode 100644 functions/infra/parse_unibus_health_test.go diff --git a/functions/infra/parse_unibus_health.go b/functions/infra/parse_unibus_health.go new file mode 100644 index 00000000..8968c7ba --- /dev/null +++ b/functions/infra/parse_unibus_health.go @@ -0,0 +1,67 @@ +package infra + +import ( + "encoding/json" + "fmt" +) + +// unibusHealth refleja la respuesta JSON del endpoint /healthz de un nodo del +// cluster de mensajería unibus (membershipd). Forma verificada en producción: +// +// {"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"} +type unibusHealth struct { + Status string `json:"status"` + Posture struct { + Enforce bool `json:"enforce"` + ACL bool `json:"acl"` + TLS bool `json:"tls"` + Cluster bool `json:"cluster"` + Store string `json:"store"` + } `json:"posture"` +} + +// ParseUnibusHealth convierte la respuesta JSON del endpoint /healthz de un nodo +// del cluster de mensajería unibus en una serie de PromSample lista para empujar +// a VictoriaMetrics, sin instrumentar el bus (solo lee su endpoint de salud). +// +// node es el nombre lógico del nodo (p.ej. "magnus"); se adjunta a cada serie +// como las labels "node" e "instance" para distinguir los nodos cuando un único +// exporter scrapea varios. La función SOLO debe llamarse cuando el nodo +// respondió: el caso "no responde" (unibus_up=0) lo emite el llamador, no esta +// función, porque sin cuerpo no hay nada que parsear. +// +// Devuelve siete series por nodo: +// - unibus_up = 1 (si el body parseó, el nodo respondió) +// - unibus_status_ok = 1 si status=="ok", si no 0 +// - unibus_posture_enforce / _acl / _tls / _cluster = 1/0 según el booleano +// - unibus_store_kv = 1 si posture.store=="kv", si no 0 +// +// Si el body no es JSON válido con la forma esperada, devuelve (nil, error). +func ParseUnibusHealth(node string, body []byte) ([]PromSample, error) { + var h unibusHealth + if err := json.Unmarshal(body, &h); err != nil { + return nil, fmt.Errorf("parse unibus healthz for node %q: %w", node, err) + } + b2f := func(b bool) float64 { + if b { + return 1 + } + return 0 + } + mk := func(name string, v float64) PromSample { + return PromSample{ + Name: name, + Labels: map[string]string{"node": node, "instance": node}, + Value: v, + } + } + return []PromSample{ + mk("unibus_up", 1), + mk("unibus_status_ok", b2f(h.Status == "ok")), + mk("unibus_posture_enforce", b2f(h.Posture.Enforce)), + mk("unibus_posture_acl", b2f(h.Posture.ACL)), + mk("unibus_posture_tls", b2f(h.Posture.TLS)), + mk("unibus_posture_cluster", b2f(h.Posture.Cluster)), + mk("unibus_store_kv", b2f(h.Posture.Store == "kv")), + }, nil +} diff --git a/functions/infra/parse_unibus_health.md b/functions/infra/parse_unibus_health.md new file mode 100644 index 00000000..481c43f6 --- /dev/null +++ b/functions/infra/parse_unibus_health.md @@ -0,0 +1,89 @@ +--- +name: parse_unibus_health +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func ParseUnibusHealth(node string, body []byte) ([]PromSample, error)" +description: "Convierte la respuesta JSON del endpoint /healthz de un nodo del cluster de mensajería unibus (membershipd) en una serie de PromSample lista para empujar a VictoriaMetrics, sin instrumentar el bus: solo lee su endpoint de salud. Adjunta a cada serie las labels node e instance (= nombre lógico del nodo) para distinguir los nodos cuando un único exporter scrapea varios. Emite siete series por nodo: unibus_up, unibus_status_ok, unibus_posture_enforce/acl/tls/cluster y unibus_store_kv. Devuelve error si el body no es JSON válido con la forma esperada." +tags: [prometheus, metrics, unibus, nats, healthz, posture, fleet-metrics, infra, monitoring] +uses_functions: [] +uses_types: ["PromSample_go_infra"] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: ["encoding/json", "fmt"] +params: + - name: node + desc: "nombre lógico del nodo (p.ej. \"magnus\"); se adjunta como labels node e instance a cada serie" + - name: body + desc: "cuerpo JSON crudo devuelto por GET https://:8470/healthz, forma {\"posture\":{enforce,acl,tls,cluster bool; store string},\"status\":string}" +output: "slice de 7 PromSample con labels {node,instance}: unibus_up=1, unibus_status_ok (1 si status==ok), unibus_posture_enforce/acl/tls/cluster (1/0), unibus_store_kv (1 si posture.store==kv). Error si el body no es JSON válido." +tested: true +test_file_path: "functions/infra/parse_unibus_health_test.go" +tests: + - "TestParseUnibusHealthGolden" + - "TestParseUnibusHealthDegraded" + - "TestParseUnibusHealthInvalid" +--- + +# parse_unibus_health + +Función pura de transformación (clasificada `impure` solo porque devuelve `error` al +fallar el unmarshal; no hace I/O ni red) que traduce la salud de un nodo del bus de +mensajería **unibus** a métricas Prometheus. Pertenece al grupo de capacidad +`fleet-metrics`: se compone con `format_prom_exposition_go_infra` (serializar) y +`push_prom_remote_go_infra` (empujar a VictoriaMetrics). + +El endpoint `/healthz` de cada nodo (`membershipd`) responde, verificado en producción: + +```json +{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"} +``` + +## Ejemplo + +```go +package main + +import ( + "fmt" + "time" + + "fn-registry/functions/infra" +) + +func main() { + body := []byte(`{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}`) + samples, err := infra.ParseUnibusHealth("magnus", body) + if err != nil { + panic(err) + } + // Serializa y (en un exporter real) empuja a VictoriaMetrics. + fmt.Print(infra.FormatPromExposition(samples, time.Now().UnixMilli())) + // unibus_up{instance="magnus",node="magnus"} 1 ... + // unibus_posture_enforce{instance="magnus",node="magnus"} 1 ... +} +``` + +## Cuando usarla + +Úsala dentro de un exporter que monitoriza el cluster unibus: tras hacer +`GET https://:8470/healthz` con la CA del cluster, pasa el cuerpo a esta función +para obtener las series del nodo. Llámala **solo cuando el nodo respondió**; si el GET +falla (timeout, TLS, no-2xx), emite tú `unibus_up=0` para ese nodo, porque sin cuerpo +no hay nada que parsear. + +## Gotchas + +- No emite `unibus_up=0`: ese caso (nodo caído) es responsabilidad del llamador, que sabe + si el GET falló. Esta función siempre emite `unibus_up=1` porque solo se la llama con un + cuerpo recibido. +- Las labels `node` e `instance` toman el mismo valor (el nombre lógico del nodo). El + `push_prom_remote_go_infra` añadiría `instance` vía `extra_label` por igual a todas las + series del body; por eso aquí ya se fija `instance` por-serie, para que cada nodo unibus + conserve su identidad cuando un solo exporter empuja los de varios nodos en un único POST. +- Solo lee la posture y el status que hoy expone `/healthz`. Métricas profundas de + NATS/JetStream (msgs/s, conexiones, RAFT leader por stream) NO salen de aquí: requieren + el monitoring embebido de NATS (puerto 8222), que en producción está cerrado. diff --git a/functions/infra/parse_unibus_health_test.go b/functions/infra/parse_unibus_health_test.go new file mode 100644 index 00000000..d7ee6b36 --- /dev/null +++ b/functions/infra/parse_unibus_health_test.go @@ -0,0 +1,67 @@ +package infra + +import "testing" + +// golden: nodo seguro con la posture homogénea esperada en producción. +func TestParseUnibusHealthGolden(t *testing.T) { + body := []byte(`{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}`) + got, err := ParseUnibusHealth("magnus", body) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + want := map[string]float64{ + "unibus_up": 1, + "unibus_status_ok": 1, + "unibus_posture_enforce": 1, + "unibus_posture_acl": 1, + "unibus_posture_tls": 1, + "unibus_posture_cluster": 1, + "unibus_store_kv": 1, + } + if len(got) != len(want) { + t.Fatalf("got %d samples, want %d", len(got), len(want)) + } + for _, s := range got { + w, ok := want[s.Name] + if !ok { + t.Errorf("unexpected sample %q", s.Name) + continue + } + if s.Value != w { + t.Errorf("%s = %v, want %v", s.Name, s.Value, w) + } + if s.Labels["node"] != "magnus" || s.Labels["instance"] != "magnus" { + t.Errorf("%s labels = %v, want node=instance=magnus", s.Name, s.Labels) + } + } +} + +// edge: nodo degradado (posture todo false, store distinto de kv, status != ok). +func TestParseUnibusHealthDegraded(t *testing.T) { + body := []byte(`{"posture":{"enforce":false,"acl":false,"tls":false,"cluster":false,"store":"sqlite"},"status":"degraded"}`) + got, err := ParseUnibusHealth("homer", body) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + want := map[string]float64{ + "unibus_up": 1, + "unibus_status_ok": 0, + "unibus_posture_enforce": 0, + "unibus_posture_acl": 0, + "unibus_posture_tls": 0, + "unibus_posture_cluster": 0, + "unibus_store_kv": 0, + } + for _, s := range got { + if s.Value != want[s.Name] { + t.Errorf("%s = %v, want %v", s.Name, s.Value, want[s.Name]) + } + } +} + +// error path: body que no es JSON válido devuelve error, no panic. +func TestParseUnibusHealthInvalid(t *testing.T) { + if _, err := ParseUnibusHealth("datardos", []byte("not json at all")); err == nil { + t.Fatal("expected error for invalid body, got nil") + } +}