4 Commits

Author SHA1 Message Date
egutierrez e769836b0d feat(pipelines): auto-commit con 1 cambios
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-08 11:33:13 +02:00
egutierrez 93756fbd0c chore: auto-commit (1 archivos)
- .claude/settings.local.json

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-08 11:28:02 +02:00
egutierrez 0a6d1b8d17 feat(infra): auto-commit con 6 cambios
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-08 01:57:00 +02:00
egutierrez 82f1f1bd58 feat(infra): parse_unibus_health — healthz del cluster unibus → []PromSample
Función del grupo fleet-metrics que convierte la respuesta JSON del endpoint /healthz
de un nodo unibus (membershipd) en series Prometheus (unibus_up, unibus_status_ok,
unibus_posture_enforce/acl/tls/cluster, unibus_store_kv) con labels node/instance.
Pura de transformación (impure solo por el error de unmarshal). La consume el daemon
unibus_exporter del project fleet_monitoring. Con tests golden/edge/error.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:26:15 +02:00
11 changed files with 866 additions and 21 deletions
+2 -1
View File
@@ -2,7 +2,8 @@
"permissions": {
"allow": [
"Bash(CGO_ENABLED=1 go test *)",
"Bash(sqlite3 *)"
"Bash(sqlite3 *)",
"Read(//home/enmanuel/.claude/**)"
]
},
"enabledMcpjsonServers": [
+5 -20
View File
@@ -72,28 +72,13 @@ full_git_push() {
echo " [skip] GITEA_URL/GITEA_TOKEN no disponibles — omitiendo auto-init" >&2
fi
# Redescubrir repos tras posibles inicializaciones
# Redescubrir repos tras posibles inicializaciones.
# El repo de config de Claude (dataforge/repo_Claude, al que apuntan los
# symlinks de ~/.claude/) vive en fn_registry/external/repo_Claude, asi que
# discover_git_repos ya lo encuentra y pasa por scan-secrets/commit/push
# como un repo mas. No necesita tratamiento especial.
repos=$(discover_git_repos "$registry_root")
# --- Paso 1c: Incluir el repo de configuracion de Claude ---
# Los archivos de ~/.claude/ (settings.json, commands, skills, CLAUDE.md...)
# son symlinks a un repo git externo (dataforge/repo_Claude). Lo resolvemos
# de forma portable siguiendo el symlink de settings.json — sin hardcodear
# el path, que difiere entre PCs. Si resuelve a un repo git, lo anadimos a
# la lista para que pase por scan-secrets + auto-commit + push como los demas.
local claude_repo=""
if [[ -L "$HOME/.claude/settings.json" ]]; then
local _claude_settings_real
_claude_settings_real=$(readlink -f "$HOME/.claude/settings.json" 2>/dev/null || true)
if [[ -n "$_claude_settings_real" ]]; then
claude_repo=$(git -C "$(dirname "$_claude_settings_real")" rev-parse --show-toplevel 2>/dev/null || true)
fi
fi
if [[ -n "$claude_repo" && -d "$claude_repo/.git" ]]; then
echo "[1c] Incluyendo repo de config Claude: $claude_repo" >&2
repos="$repos"$'\n'"$claude_repo"
fi
# --- Paso 2: Escanear secrets ---
echo "" >&2
echo "[2/6] Escaneando secrets en dirty trees..." >&2
+150
View File
@@ -0,0 +1,150 @@
package infra
import (
"encoding/json"
"fmt"
"strings"
"time"
)
// natsVarz refleja los campos relevantes de la respuesta JSON del endpoint
// /varz del monitoring HTTP embebido de un nats-server (puerto 8222, loopback).
// Solo se mapean los campos que producen series; el resto se ignora.
type natsVarz struct {
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
Connections int `json:"connections"`
SlowConsumers int `json:"slow_consumers"`
Subscriptions int `json:"subscriptions"`
Mem int64 `json:"mem"`
Start string `json:"start"`
}
// natsConnz refleja los campos relevantes de /connz.
type natsConnz struct {
NumConnections int `json:"num_connections"`
}
// natsStreamDetail refleja un stream dentro de account_details[].stream_detail[].
type natsStreamDetail struct {
Name string `json:"name"`
Cluster struct {
Leader string `json:"leader"`
} `json:"cluster"`
State struct {
Messages int64 `json:"messages"`
Bytes int64 `json:"bytes"`
} `json:"state"`
}
// natsJsz refleja los campos relevantes de /jsz?streams=1.
type natsJsz struct {
Streams int64 `json:"streams"`
Messages int64 `json:"messages"`
Bytes int64 `json:"bytes"`
Memory int64 `json:"memory"`
Storage int64 `json:"storage"`
AccountDetails []struct {
StreamDetail []natsStreamDetail `json:"stream_detail"`
} `json:"account_details"`
}
// ParseNatsMonitor convierte las respuestas JSON del endpoint de monitoring HTTP
// embebido de un nats-server (puerto 8222, loopback) en una serie de PromSample
// lista para empujar a VictoriaMetrics. Es la hermana de ParseUnibusHealth para
// las métricas server-level de NATS/JetStream (msgs/s, conexiones, KV bucket
// msgs, RAFT leader por stream, memoria). La consume el unibus_exporter de
// fleet_monitoring en modo scraper local por nodo.
//
// node es el nombre lógico del nodo (p.ej. "magnus"); se adjunta a CADA serie
// como las labels "node" e "instance" para distinguir los nodos cuando un único
// exporter scrapea varios.
//
// varz, connz y jsz son los cuerpos crudos de GET /varz, GET /connz y
// GET /jsz?streams=1 respectivamente:
// - varz es el core: si NO parsea como JSON válido devuelve (nil, error).
// - connz y jsz son best-effort: si vienen vacíos o no parsean, sus series se
// omiten sin abortar (no error), para que el scraper resista que un endpoint
// falle. nats_connections cae a varz.connections cuando connz no parsea.
func ParseNatsMonitor(node string, varz, connz, jsz []byte) ([]PromSample, error) {
var v natsVarz
if err := json.Unmarshal(varz, &v); err != nil {
return nil, fmt.Errorf("parse nats varz for node %q: %w", node, err)
}
// mk construye un PromSample con las labels base {node, instance} más, de
// forma opcional, labels extra (clave/valor alternados). Las labels base no
// se pueden sobreescribir desde extra.
mk := func(name string, val float64, extra ...string) PromSample {
labels := map[string]string{"node": node, "instance": node}
for i := 0; i+1 < len(extra); i += 2 {
labels[extra[i]] = extra[i+1]
}
return PromSample{Name: name, Labels: labels, Value: val}
}
out := []PromSample{
mk("nats_msgs_in_total", float64(v.InMsgs)),
mk("nats_msgs_out_total", float64(v.OutMsgs)),
mk("nats_bytes_in_total", float64(v.InBytes)),
mk("nats_bytes_out_total", float64(v.OutBytes)),
}
// nats_connections: prefiere connz.num_connections; si connz no parsea, cae
// a varz.connections para no perder la serie.
connections := float64(v.Connections)
if len(connz) > 0 {
var c natsConnz
if err := json.Unmarshal(connz, &c); err == nil {
connections = float64(c.NumConnections)
}
}
out = append(out,
mk("nats_connections", connections),
mk("nats_slow_consumers", float64(v.SlowConsumers)),
mk("nats_mem_bytes", float64(v.Mem)),
mk("nats_subscriptions", float64(v.Subscriptions)),
)
// nats_server_start_seconds: epoch (segundos Unix) del campo start (RFC3339).
// Proxy de reinicios del nats-server: un cambio de este valor = el server
// reinició. Si el parse de la fecha falla, se omite la serie (no se aborta).
if t, err := time.Parse(time.RFC3339, v.Start); err == nil {
out = append(out, mk("nats_server_start_seconds", float64(t.Unix())))
}
// jsz es best-effort: si vacío o inválido, se omiten todas sus series.
if len(jsz) > 0 {
var j natsJsz
if err := json.Unmarshal(jsz, &j); err == nil {
out = append(out,
mk("nats_jetstream_streams", float64(j.Streams)),
mk("nats_jetstream_messages", float64(j.Messages)),
mk("nats_jetstream_bytes", float64(j.Bytes)),
mk("nats_jetstream_memory_bytes", float64(j.Memory)),
mk("nats_jetstream_storage_bytes", float64(j.Storage)),
)
for _, acc := range j.AccountDetails {
for _, sd := range acc.StreamDetail {
out = append(out,
mk("nats_stream_messages", float64(sd.State.Messages), "stream", sd.Name),
mk("nats_stream_bytes", float64(sd.State.Bytes), "stream", sd.Name),
)
leader := 0.0
if sd.Cluster.Leader == node {
leader = 1
}
out = append(out, mk("nats_jetstream_raft_leader", leader, "stream", sd.Name))
if bucket, ok := strings.CutPrefix(sd.Name, "KV_"); ok {
out = append(out, mk("kv_bucket_msgs", float64(sd.State.Messages), "bucket", bucket))
}
}
}
}
}
return out, nil
}
+119
View File
@@ -0,0 +1,119 @@
---
name: parse_nats_monitor
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func ParseNatsMonitor(node string, varz, connz, jsz []byte) ([]PromSample, error)"
description: "Convierte las respuestas JSON del endpoint de monitoring HTTP embebido de un nats-server (puerto 8222, loopback) en una serie de PromSample lista para empujar a VictoriaMetrics. Hermana de ParseUnibusHealth pero para las métricas server-level de NATS/JetStream: msgs/s, bytes, conexiones, slow consumers, memoria RSS, start epoch (proxy de reinicios), streams/messages/bytes/memory/storage de JetStream, y por stream nats_stream_messages/bytes, nats_jetstream_raft_leader y kv_bucket_msgs para los buckets KV_. Adjunta labels node e instance a cada serie. varz es el core (error si no parsea); connz y jsz son best-effort (se omiten sin abortar). La consume el unibus_exporter de fleet_monitoring como scraper local por nodo."
tags: [prometheus, metrics, nats, jetstream, monitoring, varz, connz, jsz, kv, raft, fleet-metrics, infra]
uses_functions: []
uses_types: ["PromSample_go_infra"]
returns: []
returns_optional: true
error_type: "error_go_core"
imports: ["encoding/json", "fmt", "strings", "time"]
params:
- name: node
desc: "nombre lógico del nodo (p.ej. \"magnus\"); se adjunta como labels node e instance a CADA serie y se compara con cluster.leader de cada stream para nats_jetstream_raft_leader"
- name: varz
desc: "cuerpo JSON crudo de GET http://127.0.0.1:8222/varz; core de la función (in_msgs, out_msgs, in_bytes, out_bytes, connections, slow_consumers, subscriptions, mem, start). Si no parsea, la función devuelve error"
- name: connz
desc: "cuerpo JSON crudo de GET http://127.0.0.1:8222/connz; best-effort (num_connections). Si vacío o inválido, nats_connections cae a varz.connections sin abortar"
- name: jsz
desc: "cuerpo JSON crudo de GET http://127.0.0.1:8222/jsz?streams=1; best-effort (streams, messages, bytes, memory, storage y account_details[].stream_detail[]). Si vacío o inválido, se omiten sus series sin abortar. Necesita ?streams=1 para traer stream_detail"
output: "slice de PromSample con labels base {node,instance}: nats_msgs_in/out_total, nats_bytes_in/out_total, nats_connections, nats_slow_consumers, nats_mem_bytes, nats_subscriptions, nats_server_start_seconds (omitida si start no parsea), nats_jetstream_streams/messages/bytes/memory_bytes/storage_bytes; y por stream nats_stream_messages{stream}, nats_stream_bytes{stream}, nats_jetstream_raft_leader{stream} (1 si cluster.leader==node) y, para streams KV_, kv_bucket_msgs{bucket} con el prefijo KV_ recortado. Error solo si varz no es JSON válido."
tested: true
test_file_path: "functions/infra/parse_nats_monitor_test.go"
tests:
- "TestParseNatsMonitorGolden"
- "TestParseNatsMonitorEmptyJsz"
- "TestParseNatsMonitorInvalidConnz"
- "TestParseNatsMonitorInvalidVarz"
---
# parse_nats_monitor
Función de transformación (clasificada `impure` porque devuelve `error` al fallar el
unmarshal del core; no hace I/O ni red por sí misma) que traduce las métricas
server-level de un **nats-server** a series Prometheus. Es la hermana de
`parse_unibus_health_go_infra`: aquella lee el `/healthz` de `membershipd` (posture),
esta lee el monitoring embebido de NATS (puerto 8222) para las métricas profundas que
`/healthz` no expone: msgs/s, conexiones, RAFT leader por stream, memoria, KV buckets.
Pertenece al grupo de capacidad `fleet-metrics`: se compone con
`format_prom_exposition_go_infra` (serializar) y `push_prom_remote_go_infra` (empujar a
VictoriaMetrics). La consume el `unibus_exporter` de `fleet_monitoring` en modo scraper
local por nodo, que hace los tres GET y le pasa los cuerpos crudos.
## Ejemplo
```go
package main
import (
"fmt"
"io"
"net/http"
"time"
"fn-registry/functions/infra"
)
func get(url string) []byte {
resp, err := http.Get(url)
if err != nil {
return nil // best-effort: connz/jsz pueden faltar
}
defer resp.Body.Close()
b, _ := io.ReadAll(resp.Body)
return b
}
func main() {
base := "http://127.0.0.1:8222"
varz := get(base + "/varz")
connz := get(base + "/connz")
jsz := get(base + "/jsz?streams=1")
samples, err := infra.ParseNatsMonitor("magnus", varz, connz, jsz)
if err != nil {
panic(err) // varz es el core: sin él no hay métricas
}
fmt.Print(infra.FormatPromExposition(samples, time.Now().UnixMilli()))
// nats_msgs_in_total{instance="magnus",node="magnus"} 17 ...
// kv_bucket_msgs{bucket="UNIBUS_users",instance="magnus",node="magnus"} 2 ...
// nats_jetstream_raft_leader{instance="magnus",node="magnus",stream="KV_UNIBUS_users"} 1 ...
}
```
## Cuando usarla
Úsala dentro de un exporter que monitoriza un nats-server con el monitoring HTTP
embebido activado (`http: 127.0.0.1:8222` en la config de NATS): tras hacer
`GET /varz`, `GET /connz` y `GET /jsz?streams=1` contra loopback, pasa los tres cuerpos
crudos a esta función para obtener todas las series server-level del nodo. Llámala como
scraper local por nodo (cada nodo expone su 8222 solo en loopback), no centralizado.
## Gotchas
- **Impura por contrato**: solo devuelve `error` si `varz` no es JSON válido (es el core).
`connz` y `jsz` son **best-effort**: si vienen vacíos o no parsean, sus series se omiten
sin abortar. Esto hace al scraper resistente a que un endpoint falle de forma puntual.
- **Monitoring loopback-only sin auth**: el puerto 8222 de NATS no tiene autenticación; por
eso debe bindearse a `127.0.0.1` y scrapearse localmente en cada nodo, nunca exponerse a
la red. El push agregado a VictoriaMetrics lo hace el exporter, no esta función.
- **`/jsz` necesita `?streams=1`** para traer `account_details[].stream_detail[]`. Sin ese
parámetro el cuerpo trae los totales pero no el detalle por stream, y entonces no salen
`nats_stream_*`, `nats_jetstream_raft_leader` ni `kv_bucket_msgs`.
- **`nats_connections`**: prefiere `connz.num_connections`; si `connz` no parsea, cae a
`varz.connections` para no perder la serie.
- **RAFT leader en standalone**: en un nats-server sin clúster, el objeto `cluster` puede
faltar o `leader` venir vacío; en ese caso `nats_jetstream_raft_leader` sale 0 salvo que
`cluster.leader == node`. Es esperado: en standalone no hay quorum RAFT real.
- **`kv_bucket_msgs`** solo se emite para streams cuyo nombre empieza por `KV_`, recortando
el prefijo (stream `KV_UNIBUS_users` → bucket `UNIBUS_users`).
- **`nats_server_start_seconds`** es el epoch Unix del campo `start` (RFC3339): sirve como
proxy de reinicios (un cambio de valor = el server reinició). Si el campo no parsea como
fecha válida, la serie se omite en lugar de abortar.
+160
View File
@@ -0,0 +1,160 @@
package infra
import (
"os"
"testing"
)
// findNatsSample devuelve el primer PromSample cuyo Name coincide y cuyos labels
// extra (clave/valor alternados) están todos presentes con el valor esperado.
// El segundo retorno indica si se encontró.
func findNatsSample(samples []PromSample, name string, labels ...string) (PromSample, bool) {
for _, s := range samples {
if s.Name != name {
continue
}
match := true
for i := 0; i+1 < len(labels); i += 2 {
if s.Labels[labels[i]] != labels[i+1] {
match = false
break
}
}
if match {
return s, true
}
}
return PromSample{}, false
}
func mustRead(t *testing.T, path string) []byte {
t.Helper()
b, err := os.ReadFile(path)
if err != nil {
t.Fatalf("read fixture %s: %v", path, err)
}
return b
}
// golden: fixtures reales de un nats-server 2.11.15, node="probe" (== el leader
// de los streams), valores concretos verificados a mano.
func TestParseNatsMonitorGolden(t *testing.T) {
varz := mustRead(t, "testdata/nats_varz.json")
connz := mustRead(t, "testdata/nats_connz.json")
jsz := mustRead(t, "testdata/nats_jsz.json")
got, err := ParseNatsMonitor("probe", varz, connz, jsz)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
want := map[string]float64{
"nats_msgs_in_total": 17,
"nats_msgs_out_total": 17,
"nats_mem_bytes": 18288640,
"nats_jetstream_streams": 3,
"nats_connections": 1,
"nats_jetstream_messages": 6,
}
for name, w := range want {
s, ok := findNatsSample(got, name)
if !ok {
t.Errorf("missing sample %q", name)
continue
}
if s.Value != w {
t.Errorf("%s = %v, want %v", name, s.Value, w)
}
if s.Labels["node"] != "probe" || s.Labels["instance"] != "probe" {
t.Errorf("%s labels = %v, want node=instance=probe", name, s.Labels)
}
}
// kv_bucket_msgs por cada KV bucket (prefijo KV_ recortado).
for bucket, w := range map[string]float64{
"UNIBUS_users": 2,
"UNIBUS_rooms": 2,
"UNIBUS_members": 2,
} {
s, ok := findNatsSample(got, "kv_bucket_msgs", "bucket", bucket)
if !ok {
t.Errorf("missing kv_bucket_msgs{bucket=%q}", bucket)
continue
}
if s.Value != w {
t.Errorf("kv_bucket_msgs{bucket=%q} = %v, want %v", bucket, s.Value, w)
}
}
// raft leader: probe == node, así que el stream KV_UNIBUS_users tiene leader=1.
s, ok := findNatsSample(got, "nats_jetstream_raft_leader", "stream", "KV_UNIBUS_users")
if !ok {
t.Fatal("missing nats_jetstream_raft_leader{stream=KV_UNIBUS_users}")
}
if s.Value != 1 {
t.Errorf("nats_jetstream_raft_leader{stream=KV_UNIBUS_users} = %v, want 1", s.Value)
}
// stream_detail también emite nats_stream_messages con label stream completo.
if s, ok := findNatsSample(got, "nats_stream_messages", "stream", "KV_UNIBUS_users"); !ok || s.Value != 2 {
t.Errorf("nats_stream_messages{stream=KV_UNIBUS_users} = %v ok=%v, want 2", s.Value, ok)
}
// nats_server_start_seconds presente (start es RFC3339 válido).
if _, ok := findNatsSample(got, "nats_server_start_seconds"); !ok {
t.Error("missing nats_server_start_seconds (start is a valid RFC3339)")
}
}
// edge: jsz sin streams ni account_details. No produce series kv_bucket_msgs ni
// nats_stream_*, pero sí las de varz/connz y las jetstream top-level (en 0).
func TestParseNatsMonitorEmptyJsz(t *testing.T) {
varz := mustRead(t, "testdata/nats_varz.json")
connz := mustRead(t, "testdata/nats_connz.json")
jsz := []byte(`{"streams":0,"account_details":[]}`)
got, err := ParseNatsMonitor("probe", varz, connz, jsz)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if _, ok := findNatsSample(got, "kv_bucket_msgs", "bucket", "UNIBUS_users"); ok {
t.Error("did not expect kv_bucket_msgs with empty account_details")
}
if _, ok := findNatsSample(got, "nats_stream_messages"); ok {
t.Error("did not expect nats_stream_messages with empty account_details")
}
// varz/connz siguen presentes.
if s, ok := findNatsSample(got, "nats_msgs_in_total"); !ok || s.Value != 17 {
t.Errorf("nats_msgs_in_total = %v ok=%v, want 17", s.Value, ok)
}
if s, ok := findNatsSample(got, "nats_connections"); !ok || s.Value != 1 {
t.Errorf("nats_connections = %v ok=%v, want 1", s.Value, ok)
}
}
// edge: connz inválido. No es error; nats_connections cae a varz.connections (1).
// varz/jsz siguen produciendo sus series.
func TestParseNatsMonitorInvalidConnz(t *testing.T) {
varz := mustRead(t, "testdata/nats_varz.json")
jsz := mustRead(t, "testdata/nats_jsz.json")
got, err := ParseNatsMonitor("probe", varz, []byte("not json"), jsz)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// fallback a varz.connections (= 1).
if s, ok := findNatsSample(got, "nats_connections"); !ok || s.Value != 1 {
t.Errorf("nats_connections = %v ok=%v, want 1 (fallback varz.connections)", s.Value, ok)
}
// jsz sigue vivo.
if s, ok := findNatsSample(got, "nats_jetstream_streams"); !ok || s.Value != 3 {
t.Errorf("nats_jetstream_streams = %v ok=%v, want 3", s.Value, ok)
}
}
// error path: varz inválido devuelve error no-nil (es el core, sin él no hay nada).
func TestParseNatsMonitorInvalidVarz(t *testing.T) {
if _, err := ParseNatsMonitor("probe", []byte("{{{"), nil, nil); err == nil {
t.Fatal("expected error for invalid varz, got nil")
}
}
+67
View File
@@ -0,0 +1,67 @@
package infra
import (
"encoding/json"
"fmt"
)
// unibusHealth refleja la respuesta JSON del endpoint /healthz de un nodo del
// cluster de mensajería unibus (membershipd). Forma verificada en producción:
//
// {"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}
type unibusHealth struct {
Status string `json:"status"`
Posture struct {
Enforce bool `json:"enforce"`
ACL bool `json:"acl"`
TLS bool `json:"tls"`
Cluster bool `json:"cluster"`
Store string `json:"store"`
} `json:"posture"`
}
// ParseUnibusHealth convierte la respuesta JSON del endpoint /healthz de un nodo
// del cluster de mensajería unibus en una serie de PromSample lista para empujar
// a VictoriaMetrics, sin instrumentar el bus (solo lee su endpoint de salud).
//
// node es el nombre lógico del nodo (p.ej. "magnus"); se adjunta a cada serie
// como las labels "node" e "instance" para distinguir los nodos cuando un único
// exporter scrapea varios. La función SOLO debe llamarse cuando el nodo
// respondió: el caso "no responde" (unibus_up=0) lo emite el llamador, no esta
// función, porque sin cuerpo no hay nada que parsear.
//
// Devuelve siete series por nodo:
// - unibus_up = 1 (si el body parseó, el nodo respondió)
// - unibus_status_ok = 1 si status=="ok", si no 0
// - unibus_posture_enforce / _acl / _tls / _cluster = 1/0 según el booleano
// - unibus_store_kv = 1 si posture.store=="kv", si no 0
//
// Si el body no es JSON válido con la forma esperada, devuelve (nil, error).
func ParseUnibusHealth(node string, body []byte) ([]PromSample, error) {
var h unibusHealth
if err := json.Unmarshal(body, &h); err != nil {
return nil, fmt.Errorf("parse unibus healthz for node %q: %w", node, err)
}
b2f := func(b bool) float64 {
if b {
return 1
}
return 0
}
mk := func(name string, v float64) PromSample {
return PromSample{
Name: name,
Labels: map[string]string{"node": node, "instance": node},
Value: v,
}
}
return []PromSample{
mk("unibus_up", 1),
mk("unibus_status_ok", b2f(h.Status == "ok")),
mk("unibus_posture_enforce", b2f(h.Posture.Enforce)),
mk("unibus_posture_acl", b2f(h.Posture.ACL)),
mk("unibus_posture_tls", b2f(h.Posture.TLS)),
mk("unibus_posture_cluster", b2f(h.Posture.Cluster)),
mk("unibus_store_kv", b2f(h.Posture.Store == "kv")),
}, nil
}
+89
View File
@@ -0,0 +1,89 @@
---
name: parse_unibus_health
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func ParseUnibusHealth(node string, body []byte) ([]PromSample, error)"
description: "Convierte la respuesta JSON del endpoint /healthz de un nodo del cluster de mensajería unibus (membershipd) en una serie de PromSample lista para empujar a VictoriaMetrics, sin instrumentar el bus: solo lee su endpoint de salud. Adjunta a cada serie las labels node e instance (= nombre lógico del nodo) para distinguir los nodos cuando un único exporter scrapea varios. Emite siete series por nodo: unibus_up, unibus_status_ok, unibus_posture_enforce/acl/tls/cluster y unibus_store_kv. Devuelve error si el body no es JSON válido con la forma esperada."
tags: [prometheus, metrics, unibus, nats, healthz, posture, fleet-metrics, infra, monitoring]
uses_functions: []
uses_types: ["PromSample_go_infra"]
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["encoding/json", "fmt"]
params:
- name: node
desc: "nombre lógico del nodo (p.ej. \"magnus\"); se adjunta como labels node e instance a cada serie"
- name: body
desc: "cuerpo JSON crudo devuelto por GET https://<nodo>:8470/healthz, forma {\"posture\":{enforce,acl,tls,cluster bool; store string},\"status\":string}"
output: "slice de 7 PromSample con labels {node,instance}: unibus_up=1, unibus_status_ok (1 si status==ok), unibus_posture_enforce/acl/tls/cluster (1/0), unibus_store_kv (1 si posture.store==kv). Error si el body no es JSON válido."
tested: true
test_file_path: "functions/infra/parse_unibus_health_test.go"
tests:
- "TestParseUnibusHealthGolden"
- "TestParseUnibusHealthDegraded"
- "TestParseUnibusHealthInvalid"
---
# parse_unibus_health
Función pura de transformación (clasificada `impure` solo porque devuelve `error` al
fallar el unmarshal; no hace I/O ni red) que traduce la salud de un nodo del bus de
mensajería **unibus** a métricas Prometheus. Pertenece al grupo de capacidad
`fleet-metrics`: se compone con `format_prom_exposition_go_infra` (serializar) y
`push_prom_remote_go_infra` (empujar a VictoriaMetrics).
El endpoint `/healthz` de cada nodo (`membershipd`) responde, verificado en producción:
```json
{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}
```
## Ejemplo
```go
package main
import (
"fmt"
"time"
"fn-registry/functions/infra"
)
func main() {
body := []byte(`{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}`)
samples, err := infra.ParseUnibusHealth("magnus", body)
if err != nil {
panic(err)
}
// Serializa y (en un exporter real) empuja a VictoriaMetrics.
fmt.Print(infra.FormatPromExposition(samples, time.Now().UnixMilli()))
// unibus_up{instance="magnus",node="magnus"} 1 ...
// unibus_posture_enforce{instance="magnus",node="magnus"} 1 ...
}
```
## Cuando usarla
Úsala dentro de un exporter que monitoriza el cluster unibus: tras hacer
`GET https://<nodo>:8470/healthz` con la CA del cluster, pasa el cuerpo a esta función
para obtener las series del nodo. Llámala **solo cuando el nodo respondió**; si el GET
falla (timeout, TLS, no-2xx), emite tú `unibus_up=0` para ese nodo, porque sin cuerpo
no hay nada que parsear.
## Gotchas
- No emite `unibus_up=0`: ese caso (nodo caído) es responsabilidad del llamador, que sabe
si el GET falló. Esta función siempre emite `unibus_up=1` porque solo se la llama con un
cuerpo recibido.
- Las labels `node` e `instance` toman el mismo valor (el nombre lógico del nodo). El
`push_prom_remote_go_infra` añadiría `instance` vía `extra_label` por igual a todas las
series del body; por eso aquí ya se fija `instance` por-serie, para que cada nodo unibus
conserve su identidad cuando un solo exporter empuja los de varios nodos en un único POST.
- Solo lee la posture y el status que hoy expone `/healthz`. Métricas profundas de
NATS/JetStream (msgs/s, conexiones, RAFT leader por stream) NO salen de aquí: requieren
el monitoring embebido de NATS (puerto 8222), que en producción está cerrado.
@@ -0,0 +1,67 @@
package infra
import "testing"
// golden: nodo seguro con la posture homogénea esperada en producción.
func TestParseUnibusHealthGolden(t *testing.T) {
body := []byte(`{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}`)
got, err := ParseUnibusHealth("magnus", body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
want := map[string]float64{
"unibus_up": 1,
"unibus_status_ok": 1,
"unibus_posture_enforce": 1,
"unibus_posture_acl": 1,
"unibus_posture_tls": 1,
"unibus_posture_cluster": 1,
"unibus_store_kv": 1,
}
if len(got) != len(want) {
t.Fatalf("got %d samples, want %d", len(got), len(want))
}
for _, s := range got {
w, ok := want[s.Name]
if !ok {
t.Errorf("unexpected sample %q", s.Name)
continue
}
if s.Value != w {
t.Errorf("%s = %v, want %v", s.Name, s.Value, w)
}
if s.Labels["node"] != "magnus" || s.Labels["instance"] != "magnus" {
t.Errorf("%s labels = %v, want node=instance=magnus", s.Name, s.Labels)
}
}
}
// edge: nodo degradado (posture todo false, store distinto de kv, status != ok).
func TestParseUnibusHealthDegraded(t *testing.T) {
body := []byte(`{"posture":{"enforce":false,"acl":false,"tls":false,"cluster":false,"store":"sqlite"},"status":"degraded"}`)
got, err := ParseUnibusHealth("homer", body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
want := map[string]float64{
"unibus_up": 1,
"unibus_status_ok": 0,
"unibus_posture_enforce": 0,
"unibus_posture_acl": 0,
"unibus_posture_tls": 0,
"unibus_posture_cluster": 0,
"unibus_store_kv": 0,
}
for _, s := range got {
if s.Value != want[s.Name] {
t.Errorf("%s = %v, want %v", s.Name, s.Value, want[s.Name])
}
}
}
// error path: body que no es JSON válido devuelve error, no panic.
func TestParseUnibusHealthInvalid(t *testing.T) {
if _, err := ParseUnibusHealth("datardos", []byte("not json at all")); err == nil {
t.Fatal("expected error for invalid body, got nil")
}
}
+30
View File
@@ -0,0 +1,30 @@
{
"server_id": "NC23B47RQSJYPX5AIUC5CA3ND5RLCYREKSAFLM65MLBY5PBRIXPAFL7O",
"now": "2026-06-07T19:02:25.326833943Z",
"num_connections": 1,
"total": 1,
"offset": 0,
"limit": 1024,
"connections": [
{
"cid": 5,
"kind": "Client",
"type": "nats",
"ip": "127.0.0.1",
"port": 52734,
"start": "2026-06-07T21:02:24.812382826+02:00",
"last_activity": "2026-06-07T21:02:24.821005187+02:00",
"rtt": "623µs",
"uptime": "0s",
"idle": "0s",
"pending_bytes": 0,
"in_msgs": 17,
"out_msgs": 17,
"in_bytes": 1304,
"out_bytes": 3905,
"subscriptions": 2,
"lang": "go",
"version": "1.49.0"
}
]
}
+97
View File
@@ -0,0 +1,97 @@
{
"memory": 0,
"storage": 310,
"reserved_memory": 0,
"reserved_storage": 0,
"accounts": 1,
"ha_assets": 0,
"api": {
"level": 1,
"total": 6,
"errors": 0
},
"server_id": "NC23B47RQSJYPX5AIUC5CA3ND5RLCYREKSAFLM65MLBY5PBRIXPAFL7O",
"now": "2026-06-07T19:02:25.327216549Z",
"config": {
"max_memory": 3221225472,
"max_storage": 546399169536,
"store_dir": "/tmp/natsprobe4019469486/jetstream",
"sync_interval": 120000000000
},
"limits": {},
"streams": 3,
"consumers": 0,
"messages": 6,
"bytes": 310,
"account_details": [
{
"name": "$G",
"id": "$G",
"memory": 0,
"storage": 310,
"reserved_memory": 18446744073709551615,
"reserved_storage": 18446744073709551615,
"accounts": 0,
"ha_assets": 0,
"api": {
"level": 0,
"total": 6,
"errors": 0
},
"stream_detail": [
{
"name": "KV_UNIBUS_rooms",
"created": "2026-06-07T19:02:24.8170934Z",
"cluster": {
"leader": "probe"
},
"state": {
"messages": 2,
"bytes": 102,
"first_seq": 1,
"first_ts": "2026-06-07T19:02:24.817910599Z",
"last_seq": 2,
"last_ts": "2026-06-07T19:02:24.818011867Z",
"num_subjects": 2,
"consumer_count": 0
}
},
{
"name": "KV_UNIBUS_members",
"created": "2026-06-07T19:02:24.818494147Z",
"cluster": {
"leader": "probe"
},
"state": {
"messages": 2,
"bytes": 106,
"first_seq": 1,
"first_ts": "2026-06-07T19:02:24.81917932Z",
"last_seq": 2,
"last_ts": "2026-06-07T19:02:24.819283444Z",
"num_subjects": 2,
"consumer_count": 0
}
},
{
"name": "KV_UNIBUS_users",
"created": "2026-06-07T19:02:24.814500069Z",
"cluster": {
"leader": "probe"
},
"state": {
"messages": 2,
"bytes": 102,
"first_seq": 1,
"first_ts": "2026-06-07T19:02:24.81638123Z",
"last_seq": 2,
"last_ts": "2026-06-07T19:02:24.816570377Z",
"num_subjects": 2,
"consumer_count": 0
}
}
]
}
],
"total": 1
}
+80
View File
@@ -0,0 +1,80 @@
{
"server_id": "NC23B47RQSJYPX5AIUC5CA3ND5RLCYREKSAFLM65MLBY5PBRIXPAFL7O",
"server_name": "probe",
"version": "2.11.15",
"proto": 1,
"go": "go1.26.4",
"host": "127.0.0.1",
"port": 14260,
"max_connections": 65536,
"ping_interval": 120000000000,
"ping_max": 2,
"http_host": "127.0.0.1",
"http_port": 8222,
"http_base_path": "",
"https_port": 0,
"auth_timeout": 2,
"max_control_line": 4096,
"max_payload": 1048576,
"max_pending": 67108864,
"cluster": {},
"gateway": {},
"leaf": {},
"mqtt": {},
"websocket": {},
"jetstream": {
"config": {
"max_memory": 3221225472,
"max_storage": 546399169536,
"store_dir": "/tmp/natsprobe4019469486/jetstream",
"sync_interval": 120000000000
},
"stats": {
"memory": 0,
"storage": 310,
"reserved_memory": 0,
"reserved_storage": 0,
"accounts": 1,
"ha_assets": 0,
"api": {
"level": 1,
"total": 6,
"errors": 0
}
},
"limits": {}
},
"tls_timeout": 2,
"write_deadline": 10000000000,
"start": "2026-06-07T19:02:24.785745698Z",
"now": "2026-06-07T19:02:25.325501038Z",
"uptime": "0s",
"mem": 18288640,
"cores": 24,
"gomaxprocs": 24,
"gomemlimit": 4294967296,
"cpu": 0,
"connections": 1,
"total_connections": 1,
"routes": 0,
"remotes": 0,
"leafnodes": 0,
"in_msgs": 17,
"out_msgs": 17,
"in_bytes": 1304,
"out_bytes": 3905,
"slow_consumers": 0,
"subscriptions": 75,
"http_req_stats": {
"/varz": 1
},
"config_load_time": "2026-06-07T19:02:24.785745698Z",
"config_digest": "",
"system_account": "$SYS",
"slow_consumer_stats": {
"clients": 0,
"routes": 0,
"gateways": 0,
"leafs": 0
}
}