feat(infra): parse_unibus_health — healthz del cluster unibus → []PromSample
Función del grupo fleet-metrics que convierte la respuesta JSON del endpoint /healthz de un nodo unibus (membershipd) en series Prometheus (unibus_up, unibus_status_ok, unibus_posture_enforce/acl/tls/cluster, unibus_store_kv) con labels node/instance. Pura de transformación (impure solo por el error de unmarshal). La consume el daemon unibus_exporter del project fleet_monitoring. Con tests golden/edge/error. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,67 @@
|
|||||||
|
package infra
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// unibusHealth refleja la respuesta JSON del endpoint /healthz de un nodo del
|
||||||
|
// cluster de mensajería unibus (membershipd). Forma verificada en producción:
|
||||||
|
//
|
||||||
|
// {"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}
|
||||||
|
type unibusHealth struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
Posture struct {
|
||||||
|
Enforce bool `json:"enforce"`
|
||||||
|
ACL bool `json:"acl"`
|
||||||
|
TLS bool `json:"tls"`
|
||||||
|
Cluster bool `json:"cluster"`
|
||||||
|
Store string `json:"store"`
|
||||||
|
} `json:"posture"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseUnibusHealth convierte la respuesta JSON del endpoint /healthz de un nodo
|
||||||
|
// del cluster de mensajería unibus en una serie de PromSample lista para empujar
|
||||||
|
// a VictoriaMetrics, sin instrumentar el bus (solo lee su endpoint de salud).
|
||||||
|
//
|
||||||
|
// node es el nombre lógico del nodo (p.ej. "magnus"); se adjunta a cada serie
|
||||||
|
// como las labels "node" e "instance" para distinguir los nodos cuando un único
|
||||||
|
// exporter scrapea varios. La función SOLO debe llamarse cuando el nodo
|
||||||
|
// respondió: el caso "no responde" (unibus_up=0) lo emite el llamador, no esta
|
||||||
|
// función, porque sin cuerpo no hay nada que parsear.
|
||||||
|
//
|
||||||
|
// Devuelve siete series por nodo:
|
||||||
|
// - unibus_up = 1 (si el body parseó, el nodo respondió)
|
||||||
|
// - unibus_status_ok = 1 si status=="ok", si no 0
|
||||||
|
// - unibus_posture_enforce / _acl / _tls / _cluster = 1/0 según el booleano
|
||||||
|
// - unibus_store_kv = 1 si posture.store=="kv", si no 0
|
||||||
|
//
|
||||||
|
// Si el body no es JSON válido con la forma esperada, devuelve (nil, error).
|
||||||
|
func ParseUnibusHealth(node string, body []byte) ([]PromSample, error) {
|
||||||
|
var h unibusHealth
|
||||||
|
if err := json.Unmarshal(body, &h); err != nil {
|
||||||
|
return nil, fmt.Errorf("parse unibus healthz for node %q: %w", node, err)
|
||||||
|
}
|
||||||
|
b2f := func(b bool) float64 {
|
||||||
|
if b {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
mk := func(name string, v float64) PromSample {
|
||||||
|
return PromSample{
|
||||||
|
Name: name,
|
||||||
|
Labels: map[string]string{"node": node, "instance": node},
|
||||||
|
Value: v,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return []PromSample{
|
||||||
|
mk("unibus_up", 1),
|
||||||
|
mk("unibus_status_ok", b2f(h.Status == "ok")),
|
||||||
|
mk("unibus_posture_enforce", b2f(h.Posture.Enforce)),
|
||||||
|
mk("unibus_posture_acl", b2f(h.Posture.ACL)),
|
||||||
|
mk("unibus_posture_tls", b2f(h.Posture.TLS)),
|
||||||
|
mk("unibus_posture_cluster", b2f(h.Posture.Cluster)),
|
||||||
|
mk("unibus_store_kv", b2f(h.Posture.Store == "kv")),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,89 @@
|
|||||||
|
---
|
||||||
|
name: parse_unibus_health
|
||||||
|
kind: function
|
||||||
|
lang: go
|
||||||
|
domain: infra
|
||||||
|
version: "1.0.0"
|
||||||
|
purity: impure
|
||||||
|
signature: "func ParseUnibusHealth(node string, body []byte) ([]PromSample, error)"
|
||||||
|
description: "Convierte la respuesta JSON del endpoint /healthz de un nodo del cluster de mensajería unibus (membershipd) en una serie de PromSample lista para empujar a VictoriaMetrics, sin instrumentar el bus: solo lee su endpoint de salud. Adjunta a cada serie las labels node e instance (= nombre lógico del nodo) para distinguir los nodos cuando un único exporter scrapea varios. Emite siete series por nodo: unibus_up, unibus_status_ok, unibus_posture_enforce/acl/tls/cluster y unibus_store_kv. Devuelve error si el body no es JSON válido con la forma esperada."
|
||||||
|
tags: [prometheus, metrics, unibus, nats, healthz, posture, fleet-metrics, infra, monitoring]
|
||||||
|
uses_functions: []
|
||||||
|
uses_types: ["PromSample_go_infra"]
|
||||||
|
returns: []
|
||||||
|
returns_optional: false
|
||||||
|
error_type: "error_go_core"
|
||||||
|
imports: ["encoding/json", "fmt"]
|
||||||
|
params:
|
||||||
|
- name: node
|
||||||
|
desc: "nombre lógico del nodo (p.ej. \"magnus\"); se adjunta como labels node e instance a cada serie"
|
||||||
|
- name: body
|
||||||
|
desc: "cuerpo JSON crudo devuelto por GET https://<nodo>:8470/healthz, forma {\"posture\":{enforce,acl,tls,cluster bool; store string},\"status\":string}"
|
||||||
|
output: "slice de 7 PromSample con labels {node,instance}: unibus_up=1, unibus_status_ok (1 si status==ok), unibus_posture_enforce/acl/tls/cluster (1/0), unibus_store_kv (1 si posture.store==kv). Error si el body no es JSON válido."
|
||||||
|
tested: true
|
||||||
|
test_file_path: "functions/infra/parse_unibus_health_test.go"
|
||||||
|
tests:
|
||||||
|
- "TestParseUnibusHealthGolden"
|
||||||
|
- "TestParseUnibusHealthDegraded"
|
||||||
|
- "TestParseUnibusHealthInvalid"
|
||||||
|
---
|
||||||
|
|
||||||
|
# parse_unibus_health
|
||||||
|
|
||||||
|
Función pura de transformación (clasificada `impure` solo porque devuelve `error` al
|
||||||
|
fallar el unmarshal; no hace I/O ni red) que traduce la salud de un nodo del bus de
|
||||||
|
mensajería **unibus** a métricas Prometheus. Pertenece al grupo de capacidad
|
||||||
|
`fleet-metrics`: se compone con `format_prom_exposition_go_infra` (serializar) y
|
||||||
|
`push_prom_remote_go_infra` (empujar a VictoriaMetrics).
|
||||||
|
|
||||||
|
El endpoint `/healthz` de cada nodo (`membershipd`) responde, verificado en producción:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Ejemplo
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"fn-registry/functions/infra"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
body := []byte(`{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}`)
|
||||||
|
samples, err := infra.ParseUnibusHealth("magnus", body)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
// Serializa y (en un exporter real) empuja a VictoriaMetrics.
|
||||||
|
fmt.Print(infra.FormatPromExposition(samples, time.Now().UnixMilli()))
|
||||||
|
// unibus_up{instance="magnus",node="magnus"} 1 ...
|
||||||
|
// unibus_posture_enforce{instance="magnus",node="magnus"} 1 ...
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Cuando usarla
|
||||||
|
|
||||||
|
Úsala dentro de un exporter que monitoriza el cluster unibus: tras hacer
|
||||||
|
`GET https://<nodo>:8470/healthz` con la CA del cluster, pasa el cuerpo a esta función
|
||||||
|
para obtener las series del nodo. Llámala **solo cuando el nodo respondió**; si el GET
|
||||||
|
falla (timeout, TLS, no-2xx), emite tú `unibus_up=0` para ese nodo, porque sin cuerpo
|
||||||
|
no hay nada que parsear.
|
||||||
|
|
||||||
|
## Gotchas
|
||||||
|
|
||||||
|
- No emite `unibus_up=0`: ese caso (nodo caído) es responsabilidad del llamador, que sabe
|
||||||
|
si el GET falló. Esta función siempre emite `unibus_up=1` porque solo se la llama con un
|
||||||
|
cuerpo recibido.
|
||||||
|
- Las labels `node` e `instance` toman el mismo valor (el nombre lógico del nodo). El
|
||||||
|
`push_prom_remote_go_infra` añadiría `instance` vía `extra_label` por igual a todas las
|
||||||
|
series del body; por eso aquí ya se fija `instance` por-serie, para que cada nodo unibus
|
||||||
|
conserve su identidad cuando un solo exporter empuja los de varios nodos en un único POST.
|
||||||
|
- Solo lee la posture y el status que hoy expone `/healthz`. Métricas profundas de
|
||||||
|
NATS/JetStream (msgs/s, conexiones, RAFT leader por stream) NO salen de aquí: requieren
|
||||||
|
el monitoring embebido de NATS (puerto 8222), que en producción está cerrado.
|
||||||
@@ -0,0 +1,67 @@
|
|||||||
|
package infra
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
// golden: nodo seguro con la posture homogénea esperada en producción.
|
||||||
|
func TestParseUnibusHealthGolden(t *testing.T) {
|
||||||
|
body := []byte(`{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}`)
|
||||||
|
got, err := ParseUnibusHealth("magnus", body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
want := map[string]float64{
|
||||||
|
"unibus_up": 1,
|
||||||
|
"unibus_status_ok": 1,
|
||||||
|
"unibus_posture_enforce": 1,
|
||||||
|
"unibus_posture_acl": 1,
|
||||||
|
"unibus_posture_tls": 1,
|
||||||
|
"unibus_posture_cluster": 1,
|
||||||
|
"unibus_store_kv": 1,
|
||||||
|
}
|
||||||
|
if len(got) != len(want) {
|
||||||
|
t.Fatalf("got %d samples, want %d", len(got), len(want))
|
||||||
|
}
|
||||||
|
for _, s := range got {
|
||||||
|
w, ok := want[s.Name]
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("unexpected sample %q", s.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if s.Value != w {
|
||||||
|
t.Errorf("%s = %v, want %v", s.Name, s.Value, w)
|
||||||
|
}
|
||||||
|
if s.Labels["node"] != "magnus" || s.Labels["instance"] != "magnus" {
|
||||||
|
t.Errorf("%s labels = %v, want node=instance=magnus", s.Name, s.Labels)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// edge: nodo degradado (posture todo false, store distinto de kv, status != ok).
|
||||||
|
func TestParseUnibusHealthDegraded(t *testing.T) {
|
||||||
|
body := []byte(`{"posture":{"enforce":false,"acl":false,"tls":false,"cluster":false,"store":"sqlite"},"status":"degraded"}`)
|
||||||
|
got, err := ParseUnibusHealth("homer", body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
want := map[string]float64{
|
||||||
|
"unibus_up": 1,
|
||||||
|
"unibus_status_ok": 0,
|
||||||
|
"unibus_posture_enforce": 0,
|
||||||
|
"unibus_posture_acl": 0,
|
||||||
|
"unibus_posture_tls": 0,
|
||||||
|
"unibus_posture_cluster": 0,
|
||||||
|
"unibus_store_kv": 0,
|
||||||
|
}
|
||||||
|
for _, s := range got {
|
||||||
|
if s.Value != want[s.Name] {
|
||||||
|
t.Errorf("%s = %v, want %v", s.Name, s.Value, want[s.Name])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// error path: body que no es JSON válido devuelve error, no panic.
|
||||||
|
func TestParseUnibusHealthInvalid(t *testing.T) {
|
||||||
|
if _, err := ParseUnibusHealth("datardos", []byte("not json at all")); err == nil {
|
||||||
|
t.Fatal("expected error for invalid body, got nil")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user