Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e769836b0d | |||
| 93756fbd0c | |||
| 0a6d1b8d17 | |||
| 82f1f1bd58 |
@@ -2,7 +2,8 @@
|
||||
"permissions": {
|
||||
"allow": [
|
||||
"Bash(CGO_ENABLED=1 go test *)",
|
||||
"Bash(sqlite3 *)"
|
||||
"Bash(sqlite3 *)",
|
||||
"Read(//home/enmanuel/.claude/**)"
|
||||
]
|
||||
},
|
||||
"enabledMcpjsonServers": [
|
||||
|
||||
@@ -72,28 +72,13 @@ full_git_push() {
|
||||
echo " [skip] GITEA_URL/GITEA_TOKEN no disponibles — omitiendo auto-init" >&2
|
||||
fi
|
||||
|
||||
# Redescubrir repos tras posibles inicializaciones
|
||||
# Redescubrir repos tras posibles inicializaciones.
|
||||
# El repo de config de Claude (dataforge/repo_Claude, al que apuntan los
|
||||
# symlinks de ~/.claude/) vive en fn_registry/external/repo_Claude, asi que
|
||||
# discover_git_repos ya lo encuentra y pasa por scan-secrets/commit/push
|
||||
# como un repo mas. No necesita tratamiento especial.
|
||||
repos=$(discover_git_repos "$registry_root")
|
||||
|
||||
# --- Paso 1c: Incluir el repo de configuracion de Claude ---
|
||||
# Los archivos de ~/.claude/ (settings.json, commands, skills, CLAUDE.md...)
|
||||
# son symlinks a un repo git externo (dataforge/repo_Claude). Lo resolvemos
|
||||
# de forma portable siguiendo el symlink de settings.json — sin hardcodear
|
||||
# el path, que difiere entre PCs. Si resuelve a un repo git, lo anadimos a
|
||||
# la lista para que pase por scan-secrets + auto-commit + push como los demas.
|
||||
local claude_repo=""
|
||||
if [[ -L "$HOME/.claude/settings.json" ]]; then
|
||||
local _claude_settings_real
|
||||
_claude_settings_real=$(readlink -f "$HOME/.claude/settings.json" 2>/dev/null || true)
|
||||
if [[ -n "$_claude_settings_real" ]]; then
|
||||
claude_repo=$(git -C "$(dirname "$_claude_settings_real")" rev-parse --show-toplevel 2>/dev/null || true)
|
||||
fi
|
||||
fi
|
||||
if [[ -n "$claude_repo" && -d "$claude_repo/.git" ]]; then
|
||||
echo "[1c] Incluyendo repo de config Claude: $claude_repo" >&2
|
||||
repos="$repos"$'\n'"$claude_repo"
|
||||
fi
|
||||
|
||||
# --- Paso 2: Escanear secrets ---
|
||||
echo "" >&2
|
||||
echo "[2/6] Escaneando secrets en dirty trees..." >&2
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// natsVarz refleja los campos relevantes de la respuesta JSON del endpoint
|
||||
// /varz del monitoring HTTP embebido de un nats-server (puerto 8222, loopback).
|
||||
// Solo se mapean los campos que producen series; el resto se ignora.
|
||||
type natsVarz struct {
|
||||
InMsgs int64 `json:"in_msgs"`
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
Connections int `json:"connections"`
|
||||
SlowConsumers int `json:"slow_consumers"`
|
||||
Subscriptions int `json:"subscriptions"`
|
||||
Mem int64 `json:"mem"`
|
||||
Start string `json:"start"`
|
||||
}
|
||||
|
||||
// natsConnz refleja los campos relevantes de /connz.
|
||||
type natsConnz struct {
|
||||
NumConnections int `json:"num_connections"`
|
||||
}
|
||||
|
||||
// natsStreamDetail refleja un stream dentro de account_details[].stream_detail[].
|
||||
type natsStreamDetail struct {
|
||||
Name string `json:"name"`
|
||||
Cluster struct {
|
||||
Leader string `json:"leader"`
|
||||
} `json:"cluster"`
|
||||
State struct {
|
||||
Messages int64 `json:"messages"`
|
||||
Bytes int64 `json:"bytes"`
|
||||
} `json:"state"`
|
||||
}
|
||||
|
||||
// natsJsz refleja los campos relevantes de /jsz?streams=1.
|
||||
type natsJsz struct {
|
||||
Streams int64 `json:"streams"`
|
||||
Messages int64 `json:"messages"`
|
||||
Bytes int64 `json:"bytes"`
|
||||
Memory int64 `json:"memory"`
|
||||
Storage int64 `json:"storage"`
|
||||
AccountDetails []struct {
|
||||
StreamDetail []natsStreamDetail `json:"stream_detail"`
|
||||
} `json:"account_details"`
|
||||
}
|
||||
|
||||
// ParseNatsMonitor convierte las respuestas JSON del endpoint de monitoring HTTP
|
||||
// embebido de un nats-server (puerto 8222, loopback) en una serie de PromSample
|
||||
// lista para empujar a VictoriaMetrics. Es la hermana de ParseUnibusHealth para
|
||||
// las métricas server-level de NATS/JetStream (msgs/s, conexiones, KV bucket
|
||||
// msgs, RAFT leader por stream, memoria). La consume el unibus_exporter de
|
||||
// fleet_monitoring en modo scraper local por nodo.
|
||||
//
|
||||
// node es el nombre lógico del nodo (p.ej. "magnus"); se adjunta a CADA serie
|
||||
// como las labels "node" e "instance" para distinguir los nodos cuando un único
|
||||
// exporter scrapea varios.
|
||||
//
|
||||
// varz, connz y jsz son los cuerpos crudos de GET /varz, GET /connz y
|
||||
// GET /jsz?streams=1 respectivamente:
|
||||
// - varz es el core: si NO parsea como JSON válido devuelve (nil, error).
|
||||
// - connz y jsz son best-effort: si vienen vacíos o no parsean, sus series se
|
||||
// omiten sin abortar (no error), para que el scraper resista que un endpoint
|
||||
// falle. nats_connections cae a varz.connections cuando connz no parsea.
|
||||
func ParseNatsMonitor(node string, varz, connz, jsz []byte) ([]PromSample, error) {
|
||||
var v natsVarz
|
||||
if err := json.Unmarshal(varz, &v); err != nil {
|
||||
return nil, fmt.Errorf("parse nats varz for node %q: %w", node, err)
|
||||
}
|
||||
|
||||
// mk construye un PromSample con las labels base {node, instance} más, de
|
||||
// forma opcional, labels extra (clave/valor alternados). Las labels base no
|
||||
// se pueden sobreescribir desde extra.
|
||||
mk := func(name string, val float64, extra ...string) PromSample {
|
||||
labels := map[string]string{"node": node, "instance": node}
|
||||
for i := 0; i+1 < len(extra); i += 2 {
|
||||
labels[extra[i]] = extra[i+1]
|
||||
}
|
||||
return PromSample{Name: name, Labels: labels, Value: val}
|
||||
}
|
||||
|
||||
out := []PromSample{
|
||||
mk("nats_msgs_in_total", float64(v.InMsgs)),
|
||||
mk("nats_msgs_out_total", float64(v.OutMsgs)),
|
||||
mk("nats_bytes_in_total", float64(v.InBytes)),
|
||||
mk("nats_bytes_out_total", float64(v.OutBytes)),
|
||||
}
|
||||
|
||||
// nats_connections: prefiere connz.num_connections; si connz no parsea, cae
|
||||
// a varz.connections para no perder la serie.
|
||||
connections := float64(v.Connections)
|
||||
if len(connz) > 0 {
|
||||
var c natsConnz
|
||||
if err := json.Unmarshal(connz, &c); err == nil {
|
||||
connections = float64(c.NumConnections)
|
||||
}
|
||||
}
|
||||
out = append(out,
|
||||
mk("nats_connections", connections),
|
||||
mk("nats_slow_consumers", float64(v.SlowConsumers)),
|
||||
mk("nats_mem_bytes", float64(v.Mem)),
|
||||
mk("nats_subscriptions", float64(v.Subscriptions)),
|
||||
)
|
||||
|
||||
// nats_server_start_seconds: epoch (segundos Unix) del campo start (RFC3339).
|
||||
// Proxy de reinicios del nats-server: un cambio de este valor = el server
|
||||
// reinició. Si el parse de la fecha falla, se omite la serie (no se aborta).
|
||||
if t, err := time.Parse(time.RFC3339, v.Start); err == nil {
|
||||
out = append(out, mk("nats_server_start_seconds", float64(t.Unix())))
|
||||
}
|
||||
|
||||
// jsz es best-effort: si vacío o inválido, se omiten todas sus series.
|
||||
if len(jsz) > 0 {
|
||||
var j natsJsz
|
||||
if err := json.Unmarshal(jsz, &j); err == nil {
|
||||
out = append(out,
|
||||
mk("nats_jetstream_streams", float64(j.Streams)),
|
||||
mk("nats_jetstream_messages", float64(j.Messages)),
|
||||
mk("nats_jetstream_bytes", float64(j.Bytes)),
|
||||
mk("nats_jetstream_memory_bytes", float64(j.Memory)),
|
||||
mk("nats_jetstream_storage_bytes", float64(j.Storage)),
|
||||
)
|
||||
for _, acc := range j.AccountDetails {
|
||||
for _, sd := range acc.StreamDetail {
|
||||
out = append(out,
|
||||
mk("nats_stream_messages", float64(sd.State.Messages), "stream", sd.Name),
|
||||
mk("nats_stream_bytes", float64(sd.State.Bytes), "stream", sd.Name),
|
||||
)
|
||||
leader := 0.0
|
||||
if sd.Cluster.Leader == node {
|
||||
leader = 1
|
||||
}
|
||||
out = append(out, mk("nats_jetstream_raft_leader", leader, "stream", sd.Name))
|
||||
|
||||
if bucket, ok := strings.CutPrefix(sd.Name, "KV_"); ok {
|
||||
out = append(out, mk("kv_bucket_msgs", float64(sd.State.Messages), "bucket", bucket))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
---
|
||||
name: parse_nats_monitor
|
||||
kind: function
|
||||
lang: go
|
||||
domain: infra
|
||||
version: "1.0.0"
|
||||
purity: impure
|
||||
signature: "func ParseNatsMonitor(node string, varz, connz, jsz []byte) ([]PromSample, error)"
|
||||
description: "Convierte las respuestas JSON del endpoint de monitoring HTTP embebido de un nats-server (puerto 8222, loopback) en una serie de PromSample lista para empujar a VictoriaMetrics. Hermana de ParseUnibusHealth pero para las métricas server-level de NATS/JetStream: msgs/s, bytes, conexiones, slow consumers, memoria RSS, start epoch (proxy de reinicios), streams/messages/bytes/memory/storage de JetStream, y por stream nats_stream_messages/bytes, nats_jetstream_raft_leader y kv_bucket_msgs para los buckets KV_. Adjunta labels node e instance a cada serie. varz es el core (error si no parsea); connz y jsz son best-effort (se omiten sin abortar). La consume el unibus_exporter de fleet_monitoring como scraper local por nodo."
|
||||
tags: [prometheus, metrics, nats, jetstream, monitoring, varz, connz, jsz, kv, raft, fleet-metrics, infra]
|
||||
uses_functions: []
|
||||
uses_types: ["PromSample_go_infra"]
|
||||
returns: []
|
||||
returns_optional: true
|
||||
error_type: "error_go_core"
|
||||
imports: ["encoding/json", "fmt", "strings", "time"]
|
||||
params:
|
||||
- name: node
|
||||
desc: "nombre lógico del nodo (p.ej. \"magnus\"); se adjunta como labels node e instance a CADA serie y se compara con cluster.leader de cada stream para nats_jetstream_raft_leader"
|
||||
- name: varz
|
||||
desc: "cuerpo JSON crudo de GET http://127.0.0.1:8222/varz; core de la función (in_msgs, out_msgs, in_bytes, out_bytes, connections, slow_consumers, subscriptions, mem, start). Si no parsea, la función devuelve error"
|
||||
- name: connz
|
||||
desc: "cuerpo JSON crudo de GET http://127.0.0.1:8222/connz; best-effort (num_connections). Si vacío o inválido, nats_connections cae a varz.connections sin abortar"
|
||||
- name: jsz
|
||||
desc: "cuerpo JSON crudo de GET http://127.0.0.1:8222/jsz?streams=1; best-effort (streams, messages, bytes, memory, storage y account_details[].stream_detail[]). Si vacío o inválido, se omiten sus series sin abortar. Necesita ?streams=1 para traer stream_detail"
|
||||
output: "slice de PromSample con labels base {node,instance}: nats_msgs_in/out_total, nats_bytes_in/out_total, nats_connections, nats_slow_consumers, nats_mem_bytes, nats_subscriptions, nats_server_start_seconds (omitida si start no parsea), nats_jetstream_streams/messages/bytes/memory_bytes/storage_bytes; y por stream nats_stream_messages{stream}, nats_stream_bytes{stream}, nats_jetstream_raft_leader{stream} (1 si cluster.leader==node) y, para streams KV_, kv_bucket_msgs{bucket} con el prefijo KV_ recortado. Error solo si varz no es JSON válido."
|
||||
tested: true
|
||||
test_file_path: "functions/infra/parse_nats_monitor_test.go"
|
||||
tests:
|
||||
- "TestParseNatsMonitorGolden"
|
||||
- "TestParseNatsMonitorEmptyJsz"
|
||||
- "TestParseNatsMonitorInvalidConnz"
|
||||
- "TestParseNatsMonitorInvalidVarz"
|
||||
---
|
||||
|
||||
# parse_nats_monitor
|
||||
|
||||
Función de transformación (clasificada `impure` porque devuelve `error` al fallar el
|
||||
unmarshal del core; no hace I/O ni red por sí misma) que traduce las métricas
|
||||
server-level de un **nats-server** a series Prometheus. Es la hermana de
|
||||
`parse_unibus_health_go_infra`: aquella lee el `/healthz` de `membershipd` (posture),
|
||||
esta lee el monitoring embebido de NATS (puerto 8222) para las métricas profundas que
|
||||
`/healthz` no expone: msgs/s, conexiones, RAFT leader por stream, memoria, KV buckets.
|
||||
|
||||
Pertenece al grupo de capacidad `fleet-metrics`: se compone con
|
||||
`format_prom_exposition_go_infra` (serializar) y `push_prom_remote_go_infra` (empujar a
|
||||
VictoriaMetrics). La consume el `unibus_exporter` de `fleet_monitoring` en modo scraper
|
||||
local por nodo, que hace los tres GET y le pasa los cuerpos crudos.
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"fn-registry/functions/infra"
|
||||
)
|
||||
|
||||
func get(url string) []byte {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return nil // best-effort: connz/jsz pueden faltar
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return b
|
||||
}
|
||||
|
||||
func main() {
|
||||
base := "http://127.0.0.1:8222"
|
||||
varz := get(base + "/varz")
|
||||
connz := get(base + "/connz")
|
||||
jsz := get(base + "/jsz?streams=1")
|
||||
|
||||
samples, err := infra.ParseNatsMonitor("magnus", varz, connz, jsz)
|
||||
if err != nil {
|
||||
panic(err) // varz es el core: sin él no hay métricas
|
||||
}
|
||||
fmt.Print(infra.FormatPromExposition(samples, time.Now().UnixMilli()))
|
||||
// nats_msgs_in_total{instance="magnus",node="magnus"} 17 ...
|
||||
// kv_bucket_msgs{bucket="UNIBUS_users",instance="magnus",node="magnus"} 2 ...
|
||||
// nats_jetstream_raft_leader{instance="magnus",node="magnus",stream="KV_UNIBUS_users"} 1 ...
|
||||
}
|
||||
```
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
Úsala dentro de un exporter que monitoriza un nats-server con el monitoring HTTP
|
||||
embebido activado (`http: 127.0.0.1:8222` en la config de NATS): tras hacer
|
||||
`GET /varz`, `GET /connz` y `GET /jsz?streams=1` contra loopback, pasa los tres cuerpos
|
||||
crudos a esta función para obtener todas las series server-level del nodo. Llámala como
|
||||
scraper local por nodo (cada nodo expone su 8222 solo en loopback), no centralizado.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- **Impura por contrato**: solo devuelve `error` si `varz` no es JSON válido (es el core).
|
||||
`connz` y `jsz` son **best-effort**: si vienen vacíos o no parsean, sus series se omiten
|
||||
sin abortar. Esto hace al scraper resistente a que un endpoint falle de forma puntual.
|
||||
- **Monitoring loopback-only sin auth**: el puerto 8222 de NATS no tiene autenticación; por
|
||||
eso debe bindearse a `127.0.0.1` y scrapearse localmente en cada nodo, nunca exponerse a
|
||||
la red. El push agregado a VictoriaMetrics lo hace el exporter, no esta función.
|
||||
- **`/jsz` necesita `?streams=1`** para traer `account_details[].stream_detail[]`. Sin ese
|
||||
parámetro el cuerpo trae los totales pero no el detalle por stream, y entonces no salen
|
||||
`nats_stream_*`, `nats_jetstream_raft_leader` ni `kv_bucket_msgs`.
|
||||
- **`nats_connections`**: prefiere `connz.num_connections`; si `connz` no parsea, cae a
|
||||
`varz.connections` para no perder la serie.
|
||||
- **RAFT leader en standalone**: en un nats-server sin clúster, el objeto `cluster` puede
|
||||
faltar o `leader` venir vacío; en ese caso `nats_jetstream_raft_leader` sale 0 salvo que
|
||||
`cluster.leader == node`. Es esperado: en standalone no hay quorum RAFT real.
|
||||
- **`kv_bucket_msgs`** solo se emite para streams cuyo nombre empieza por `KV_`, recortando
|
||||
el prefijo (stream `KV_UNIBUS_users` → bucket `UNIBUS_users`).
|
||||
- **`nats_server_start_seconds`** es el epoch Unix del campo `start` (RFC3339): sirve como
|
||||
proxy de reinicios (un cambio de valor = el server reinició). Si el campo no parsea como
|
||||
fecha válida, la serie se omite en lugar de abortar.
|
||||
@@ -0,0 +1,160 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// findNatsSample devuelve el primer PromSample cuyo Name coincide y cuyos labels
|
||||
// extra (clave/valor alternados) están todos presentes con el valor esperado.
|
||||
// El segundo retorno indica si se encontró.
|
||||
func findNatsSample(samples []PromSample, name string, labels ...string) (PromSample, bool) {
|
||||
for _, s := range samples {
|
||||
if s.Name != name {
|
||||
continue
|
||||
}
|
||||
match := true
|
||||
for i := 0; i+1 < len(labels); i += 2 {
|
||||
if s.Labels[labels[i]] != labels[i+1] {
|
||||
match = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if match {
|
||||
return s, true
|
||||
}
|
||||
}
|
||||
return PromSample{}, false
|
||||
}
|
||||
|
||||
func mustRead(t *testing.T, path string) []byte {
|
||||
t.Helper()
|
||||
b, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
t.Fatalf("read fixture %s: %v", path, err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// golden: fixtures reales de un nats-server 2.11.15, node="probe" (== el leader
|
||||
// de los streams), valores concretos verificados a mano.
|
||||
func TestParseNatsMonitorGolden(t *testing.T) {
|
||||
varz := mustRead(t, "testdata/nats_varz.json")
|
||||
connz := mustRead(t, "testdata/nats_connz.json")
|
||||
jsz := mustRead(t, "testdata/nats_jsz.json")
|
||||
|
||||
got, err := ParseNatsMonitor("probe", varz, connz, jsz)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
want := map[string]float64{
|
||||
"nats_msgs_in_total": 17,
|
||||
"nats_msgs_out_total": 17,
|
||||
"nats_mem_bytes": 18288640,
|
||||
"nats_jetstream_streams": 3,
|
||||
"nats_connections": 1,
|
||||
"nats_jetstream_messages": 6,
|
||||
}
|
||||
for name, w := range want {
|
||||
s, ok := findNatsSample(got, name)
|
||||
if !ok {
|
||||
t.Errorf("missing sample %q", name)
|
||||
continue
|
||||
}
|
||||
if s.Value != w {
|
||||
t.Errorf("%s = %v, want %v", name, s.Value, w)
|
||||
}
|
||||
if s.Labels["node"] != "probe" || s.Labels["instance"] != "probe" {
|
||||
t.Errorf("%s labels = %v, want node=instance=probe", name, s.Labels)
|
||||
}
|
||||
}
|
||||
|
||||
// kv_bucket_msgs por cada KV bucket (prefijo KV_ recortado).
|
||||
for bucket, w := range map[string]float64{
|
||||
"UNIBUS_users": 2,
|
||||
"UNIBUS_rooms": 2,
|
||||
"UNIBUS_members": 2,
|
||||
} {
|
||||
s, ok := findNatsSample(got, "kv_bucket_msgs", "bucket", bucket)
|
||||
if !ok {
|
||||
t.Errorf("missing kv_bucket_msgs{bucket=%q}", bucket)
|
||||
continue
|
||||
}
|
||||
if s.Value != w {
|
||||
t.Errorf("kv_bucket_msgs{bucket=%q} = %v, want %v", bucket, s.Value, w)
|
||||
}
|
||||
}
|
||||
|
||||
// raft leader: probe == node, así que el stream KV_UNIBUS_users tiene leader=1.
|
||||
s, ok := findNatsSample(got, "nats_jetstream_raft_leader", "stream", "KV_UNIBUS_users")
|
||||
if !ok {
|
||||
t.Fatal("missing nats_jetstream_raft_leader{stream=KV_UNIBUS_users}")
|
||||
}
|
||||
if s.Value != 1 {
|
||||
t.Errorf("nats_jetstream_raft_leader{stream=KV_UNIBUS_users} = %v, want 1", s.Value)
|
||||
}
|
||||
|
||||
// stream_detail también emite nats_stream_messages con label stream completo.
|
||||
if s, ok := findNatsSample(got, "nats_stream_messages", "stream", "KV_UNIBUS_users"); !ok || s.Value != 2 {
|
||||
t.Errorf("nats_stream_messages{stream=KV_UNIBUS_users} = %v ok=%v, want 2", s.Value, ok)
|
||||
}
|
||||
|
||||
// nats_server_start_seconds presente (start es RFC3339 válido).
|
||||
if _, ok := findNatsSample(got, "nats_server_start_seconds"); !ok {
|
||||
t.Error("missing nats_server_start_seconds (start is a valid RFC3339)")
|
||||
}
|
||||
}
|
||||
|
||||
// edge: jsz sin streams ni account_details. No produce series kv_bucket_msgs ni
|
||||
// nats_stream_*, pero sí las de varz/connz y las jetstream top-level (en 0).
|
||||
func TestParseNatsMonitorEmptyJsz(t *testing.T) {
|
||||
varz := mustRead(t, "testdata/nats_varz.json")
|
||||
connz := mustRead(t, "testdata/nats_connz.json")
|
||||
jsz := []byte(`{"streams":0,"account_details":[]}`)
|
||||
|
||||
got, err := ParseNatsMonitor("probe", varz, connz, jsz)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if _, ok := findNatsSample(got, "kv_bucket_msgs", "bucket", "UNIBUS_users"); ok {
|
||||
t.Error("did not expect kv_bucket_msgs with empty account_details")
|
||||
}
|
||||
if _, ok := findNatsSample(got, "nats_stream_messages"); ok {
|
||||
t.Error("did not expect nats_stream_messages with empty account_details")
|
||||
}
|
||||
// varz/connz siguen presentes.
|
||||
if s, ok := findNatsSample(got, "nats_msgs_in_total"); !ok || s.Value != 17 {
|
||||
t.Errorf("nats_msgs_in_total = %v ok=%v, want 17", s.Value, ok)
|
||||
}
|
||||
if s, ok := findNatsSample(got, "nats_connections"); !ok || s.Value != 1 {
|
||||
t.Errorf("nats_connections = %v ok=%v, want 1", s.Value, ok)
|
||||
}
|
||||
}
|
||||
|
||||
// edge: connz inválido. No es error; nats_connections cae a varz.connections (1).
|
||||
// varz/jsz siguen produciendo sus series.
|
||||
func TestParseNatsMonitorInvalidConnz(t *testing.T) {
|
||||
varz := mustRead(t, "testdata/nats_varz.json")
|
||||
jsz := mustRead(t, "testdata/nats_jsz.json")
|
||||
|
||||
got, err := ParseNatsMonitor("probe", varz, []byte("not json"), jsz)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
// fallback a varz.connections (= 1).
|
||||
if s, ok := findNatsSample(got, "nats_connections"); !ok || s.Value != 1 {
|
||||
t.Errorf("nats_connections = %v ok=%v, want 1 (fallback varz.connections)", s.Value, ok)
|
||||
}
|
||||
// jsz sigue vivo.
|
||||
if s, ok := findNatsSample(got, "nats_jetstream_streams"); !ok || s.Value != 3 {
|
||||
t.Errorf("nats_jetstream_streams = %v ok=%v, want 3", s.Value, ok)
|
||||
}
|
||||
}
|
||||
|
||||
// error path: varz inválido devuelve error no-nil (es el core, sin él no hay nada).
|
||||
func TestParseNatsMonitorInvalidVarz(t *testing.T) {
|
||||
if _, err := ParseNatsMonitor("probe", []byte("{{{"), nil, nil); err == nil {
|
||||
t.Fatal("expected error for invalid varz, got nil")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// unibusHealth refleja la respuesta JSON del endpoint /healthz de un nodo del
|
||||
// cluster de mensajería unibus (membershipd). Forma verificada en producción:
|
||||
//
|
||||
// {"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}
|
||||
type unibusHealth struct {
|
||||
Status string `json:"status"`
|
||||
Posture struct {
|
||||
Enforce bool `json:"enforce"`
|
||||
ACL bool `json:"acl"`
|
||||
TLS bool `json:"tls"`
|
||||
Cluster bool `json:"cluster"`
|
||||
Store string `json:"store"`
|
||||
} `json:"posture"`
|
||||
}
|
||||
|
||||
// ParseUnibusHealth convierte la respuesta JSON del endpoint /healthz de un nodo
|
||||
// del cluster de mensajería unibus en una serie de PromSample lista para empujar
|
||||
// a VictoriaMetrics, sin instrumentar el bus (solo lee su endpoint de salud).
|
||||
//
|
||||
// node es el nombre lógico del nodo (p.ej. "magnus"); se adjunta a cada serie
|
||||
// como las labels "node" e "instance" para distinguir los nodos cuando un único
|
||||
// exporter scrapea varios. La función SOLO debe llamarse cuando el nodo
|
||||
// respondió: el caso "no responde" (unibus_up=0) lo emite el llamador, no esta
|
||||
// función, porque sin cuerpo no hay nada que parsear.
|
||||
//
|
||||
// Devuelve siete series por nodo:
|
||||
// - unibus_up = 1 (si el body parseó, el nodo respondió)
|
||||
// - unibus_status_ok = 1 si status=="ok", si no 0
|
||||
// - unibus_posture_enforce / _acl / _tls / _cluster = 1/0 según el booleano
|
||||
// - unibus_store_kv = 1 si posture.store=="kv", si no 0
|
||||
//
|
||||
// Si el body no es JSON válido con la forma esperada, devuelve (nil, error).
|
||||
func ParseUnibusHealth(node string, body []byte) ([]PromSample, error) {
|
||||
var h unibusHealth
|
||||
if err := json.Unmarshal(body, &h); err != nil {
|
||||
return nil, fmt.Errorf("parse unibus healthz for node %q: %w", node, err)
|
||||
}
|
||||
b2f := func(b bool) float64 {
|
||||
if b {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
}
|
||||
mk := func(name string, v float64) PromSample {
|
||||
return PromSample{
|
||||
Name: name,
|
||||
Labels: map[string]string{"node": node, "instance": node},
|
||||
Value: v,
|
||||
}
|
||||
}
|
||||
return []PromSample{
|
||||
mk("unibus_up", 1),
|
||||
mk("unibus_status_ok", b2f(h.Status == "ok")),
|
||||
mk("unibus_posture_enforce", b2f(h.Posture.Enforce)),
|
||||
mk("unibus_posture_acl", b2f(h.Posture.ACL)),
|
||||
mk("unibus_posture_tls", b2f(h.Posture.TLS)),
|
||||
mk("unibus_posture_cluster", b2f(h.Posture.Cluster)),
|
||||
mk("unibus_store_kv", b2f(h.Posture.Store == "kv")),
|
||||
}, nil
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
---
|
||||
name: parse_unibus_health
|
||||
kind: function
|
||||
lang: go
|
||||
domain: infra
|
||||
version: "1.0.0"
|
||||
purity: impure
|
||||
signature: "func ParseUnibusHealth(node string, body []byte) ([]PromSample, error)"
|
||||
description: "Convierte la respuesta JSON del endpoint /healthz de un nodo del cluster de mensajería unibus (membershipd) en una serie de PromSample lista para empujar a VictoriaMetrics, sin instrumentar el bus: solo lee su endpoint de salud. Adjunta a cada serie las labels node e instance (= nombre lógico del nodo) para distinguir los nodos cuando un único exporter scrapea varios. Emite siete series por nodo: unibus_up, unibus_status_ok, unibus_posture_enforce/acl/tls/cluster y unibus_store_kv. Devuelve error si el body no es JSON válido con la forma esperada."
|
||||
tags: [prometheus, metrics, unibus, nats, healthz, posture, fleet-metrics, infra, monitoring]
|
||||
uses_functions: []
|
||||
uses_types: ["PromSample_go_infra"]
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: "error_go_core"
|
||||
imports: ["encoding/json", "fmt"]
|
||||
params:
|
||||
- name: node
|
||||
desc: "nombre lógico del nodo (p.ej. \"magnus\"); se adjunta como labels node e instance a cada serie"
|
||||
- name: body
|
||||
desc: "cuerpo JSON crudo devuelto por GET https://<nodo>:8470/healthz, forma {\"posture\":{enforce,acl,tls,cluster bool; store string},\"status\":string}"
|
||||
output: "slice de 7 PromSample con labels {node,instance}: unibus_up=1, unibus_status_ok (1 si status==ok), unibus_posture_enforce/acl/tls/cluster (1/0), unibus_store_kv (1 si posture.store==kv). Error si el body no es JSON válido."
|
||||
tested: true
|
||||
test_file_path: "functions/infra/parse_unibus_health_test.go"
|
||||
tests:
|
||||
- "TestParseUnibusHealthGolden"
|
||||
- "TestParseUnibusHealthDegraded"
|
||||
- "TestParseUnibusHealthInvalid"
|
||||
---
|
||||
|
||||
# parse_unibus_health
|
||||
|
||||
Función pura de transformación (clasificada `impure` solo porque devuelve `error` al
|
||||
fallar el unmarshal; no hace I/O ni red) que traduce la salud de un nodo del bus de
|
||||
mensajería **unibus** a métricas Prometheus. Pertenece al grupo de capacidad
|
||||
`fleet-metrics`: se compone con `format_prom_exposition_go_infra` (serializar) y
|
||||
`push_prom_remote_go_infra` (empujar a VictoriaMetrics).
|
||||
|
||||
El endpoint `/healthz` de cada nodo (`membershipd`) responde, verificado en producción:
|
||||
|
||||
```json
|
||||
{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}
|
||||
```
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"fn-registry/functions/infra"
|
||||
)
|
||||
|
||||
func main() {
|
||||
body := []byte(`{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}`)
|
||||
samples, err := infra.ParseUnibusHealth("magnus", body)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// Serializa y (en un exporter real) empuja a VictoriaMetrics.
|
||||
fmt.Print(infra.FormatPromExposition(samples, time.Now().UnixMilli()))
|
||||
// unibus_up{instance="magnus",node="magnus"} 1 ...
|
||||
// unibus_posture_enforce{instance="magnus",node="magnus"} 1 ...
|
||||
}
|
||||
```
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
Úsala dentro de un exporter que monitoriza el cluster unibus: tras hacer
|
||||
`GET https://<nodo>:8470/healthz` con la CA del cluster, pasa el cuerpo a esta función
|
||||
para obtener las series del nodo. Llámala **solo cuando el nodo respondió**; si el GET
|
||||
falla (timeout, TLS, no-2xx), emite tú `unibus_up=0` para ese nodo, porque sin cuerpo
|
||||
no hay nada que parsear.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- No emite `unibus_up=0`: ese caso (nodo caído) es responsabilidad del llamador, que sabe
|
||||
si el GET falló. Esta función siempre emite `unibus_up=1` porque solo se la llama con un
|
||||
cuerpo recibido.
|
||||
- Las labels `node` e `instance` toman el mismo valor (el nombre lógico del nodo). El
|
||||
`push_prom_remote_go_infra` añadiría `instance` vía `extra_label` por igual a todas las
|
||||
series del body; por eso aquí ya se fija `instance` por-serie, para que cada nodo unibus
|
||||
conserve su identidad cuando un solo exporter empuja los de varios nodos en un único POST.
|
||||
- Solo lee la posture y el status que hoy expone `/healthz`. Métricas profundas de
|
||||
NATS/JetStream (msgs/s, conexiones, RAFT leader por stream) NO salen de aquí: requieren
|
||||
el monitoring embebido de NATS (puerto 8222), que en producción está cerrado.
|
||||
@@ -0,0 +1,67 @@
|
||||
package infra
|
||||
|
||||
import "testing"
|
||||
|
||||
// golden: nodo seguro con la posture homogénea esperada en producción.
|
||||
func TestParseUnibusHealthGolden(t *testing.T) {
|
||||
body := []byte(`{"posture":{"enforce":true,"acl":true,"tls":true,"cluster":true,"store":"kv"},"status":"ok"}`)
|
||||
got, err := ParseUnibusHealth("magnus", body)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
want := map[string]float64{
|
||||
"unibus_up": 1,
|
||||
"unibus_status_ok": 1,
|
||||
"unibus_posture_enforce": 1,
|
||||
"unibus_posture_acl": 1,
|
||||
"unibus_posture_tls": 1,
|
||||
"unibus_posture_cluster": 1,
|
||||
"unibus_store_kv": 1,
|
||||
}
|
||||
if len(got) != len(want) {
|
||||
t.Fatalf("got %d samples, want %d", len(got), len(want))
|
||||
}
|
||||
for _, s := range got {
|
||||
w, ok := want[s.Name]
|
||||
if !ok {
|
||||
t.Errorf("unexpected sample %q", s.Name)
|
||||
continue
|
||||
}
|
||||
if s.Value != w {
|
||||
t.Errorf("%s = %v, want %v", s.Name, s.Value, w)
|
||||
}
|
||||
if s.Labels["node"] != "magnus" || s.Labels["instance"] != "magnus" {
|
||||
t.Errorf("%s labels = %v, want node=instance=magnus", s.Name, s.Labels)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// edge: nodo degradado (posture todo false, store distinto de kv, status != ok).
|
||||
func TestParseUnibusHealthDegraded(t *testing.T) {
|
||||
body := []byte(`{"posture":{"enforce":false,"acl":false,"tls":false,"cluster":false,"store":"sqlite"},"status":"degraded"}`)
|
||||
got, err := ParseUnibusHealth("homer", body)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
want := map[string]float64{
|
||||
"unibus_up": 1,
|
||||
"unibus_status_ok": 0,
|
||||
"unibus_posture_enforce": 0,
|
||||
"unibus_posture_acl": 0,
|
||||
"unibus_posture_tls": 0,
|
||||
"unibus_posture_cluster": 0,
|
||||
"unibus_store_kv": 0,
|
||||
}
|
||||
for _, s := range got {
|
||||
if s.Value != want[s.Name] {
|
||||
t.Errorf("%s = %v, want %v", s.Name, s.Value, want[s.Name])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// error path: body que no es JSON válido devuelve error, no panic.
|
||||
func TestParseUnibusHealthInvalid(t *testing.T) {
|
||||
if _, err := ParseUnibusHealth("datardos", []byte("not json at all")); err == nil {
|
||||
t.Fatal("expected error for invalid body, got nil")
|
||||
}
|
||||
}
|
||||
+30
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"server_id": "NC23B47RQSJYPX5AIUC5CA3ND5RLCYREKSAFLM65MLBY5PBRIXPAFL7O",
|
||||
"now": "2026-06-07T19:02:25.326833943Z",
|
||||
"num_connections": 1,
|
||||
"total": 1,
|
||||
"offset": 0,
|
||||
"limit": 1024,
|
||||
"connections": [
|
||||
{
|
||||
"cid": 5,
|
||||
"kind": "Client",
|
||||
"type": "nats",
|
||||
"ip": "127.0.0.1",
|
||||
"port": 52734,
|
||||
"start": "2026-06-07T21:02:24.812382826+02:00",
|
||||
"last_activity": "2026-06-07T21:02:24.821005187+02:00",
|
||||
"rtt": "623µs",
|
||||
"uptime": "0s",
|
||||
"idle": "0s",
|
||||
"pending_bytes": 0,
|
||||
"in_msgs": 17,
|
||||
"out_msgs": 17,
|
||||
"in_bytes": 1304,
|
||||
"out_bytes": 3905,
|
||||
"subscriptions": 2,
|
||||
"lang": "go",
|
||||
"version": "1.49.0"
|
||||
}
|
||||
]
|
||||
}
|
||||
+97
@@ -0,0 +1,97 @@
|
||||
{
|
||||
"memory": 0,
|
||||
"storage": 310,
|
||||
"reserved_memory": 0,
|
||||
"reserved_storage": 0,
|
||||
"accounts": 1,
|
||||
"ha_assets": 0,
|
||||
"api": {
|
||||
"level": 1,
|
||||
"total": 6,
|
||||
"errors": 0
|
||||
},
|
||||
"server_id": "NC23B47RQSJYPX5AIUC5CA3ND5RLCYREKSAFLM65MLBY5PBRIXPAFL7O",
|
||||
"now": "2026-06-07T19:02:25.327216549Z",
|
||||
"config": {
|
||||
"max_memory": 3221225472,
|
||||
"max_storage": 546399169536,
|
||||
"store_dir": "/tmp/natsprobe4019469486/jetstream",
|
||||
"sync_interval": 120000000000
|
||||
},
|
||||
"limits": {},
|
||||
"streams": 3,
|
||||
"consumers": 0,
|
||||
"messages": 6,
|
||||
"bytes": 310,
|
||||
"account_details": [
|
||||
{
|
||||
"name": "$G",
|
||||
"id": "$G",
|
||||
"memory": 0,
|
||||
"storage": 310,
|
||||
"reserved_memory": 18446744073709551615,
|
||||
"reserved_storage": 18446744073709551615,
|
||||
"accounts": 0,
|
||||
"ha_assets": 0,
|
||||
"api": {
|
||||
"level": 0,
|
||||
"total": 6,
|
||||
"errors": 0
|
||||
},
|
||||
"stream_detail": [
|
||||
{
|
||||
"name": "KV_UNIBUS_rooms",
|
||||
"created": "2026-06-07T19:02:24.8170934Z",
|
||||
"cluster": {
|
||||
"leader": "probe"
|
||||
},
|
||||
"state": {
|
||||
"messages": 2,
|
||||
"bytes": 102,
|
||||
"first_seq": 1,
|
||||
"first_ts": "2026-06-07T19:02:24.817910599Z",
|
||||
"last_seq": 2,
|
||||
"last_ts": "2026-06-07T19:02:24.818011867Z",
|
||||
"num_subjects": 2,
|
||||
"consumer_count": 0
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "KV_UNIBUS_members",
|
||||
"created": "2026-06-07T19:02:24.818494147Z",
|
||||
"cluster": {
|
||||
"leader": "probe"
|
||||
},
|
||||
"state": {
|
||||
"messages": 2,
|
||||
"bytes": 106,
|
||||
"first_seq": 1,
|
||||
"first_ts": "2026-06-07T19:02:24.81917932Z",
|
||||
"last_seq": 2,
|
||||
"last_ts": "2026-06-07T19:02:24.819283444Z",
|
||||
"num_subjects": 2,
|
||||
"consumer_count": 0
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "KV_UNIBUS_users",
|
||||
"created": "2026-06-07T19:02:24.814500069Z",
|
||||
"cluster": {
|
||||
"leader": "probe"
|
||||
},
|
||||
"state": {
|
||||
"messages": 2,
|
||||
"bytes": 102,
|
||||
"first_seq": 1,
|
||||
"first_ts": "2026-06-07T19:02:24.81638123Z",
|
||||
"last_seq": 2,
|
||||
"last_ts": "2026-06-07T19:02:24.816570377Z",
|
||||
"num_subjects": 2,
|
||||
"consumer_count": 0
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"total": 1
|
||||
}
|
||||
+80
@@ -0,0 +1,80 @@
|
||||
{
|
||||
"server_id": "NC23B47RQSJYPX5AIUC5CA3ND5RLCYREKSAFLM65MLBY5PBRIXPAFL7O",
|
||||
"server_name": "probe",
|
||||
"version": "2.11.15",
|
||||
"proto": 1,
|
||||
"go": "go1.26.4",
|
||||
"host": "127.0.0.1",
|
||||
"port": 14260,
|
||||
"max_connections": 65536,
|
||||
"ping_interval": 120000000000,
|
||||
"ping_max": 2,
|
||||
"http_host": "127.0.0.1",
|
||||
"http_port": 8222,
|
||||
"http_base_path": "",
|
||||
"https_port": 0,
|
||||
"auth_timeout": 2,
|
||||
"max_control_line": 4096,
|
||||
"max_payload": 1048576,
|
||||
"max_pending": 67108864,
|
||||
"cluster": {},
|
||||
"gateway": {},
|
||||
"leaf": {},
|
||||
"mqtt": {},
|
||||
"websocket": {},
|
||||
"jetstream": {
|
||||
"config": {
|
||||
"max_memory": 3221225472,
|
||||
"max_storage": 546399169536,
|
||||
"store_dir": "/tmp/natsprobe4019469486/jetstream",
|
||||
"sync_interval": 120000000000
|
||||
},
|
||||
"stats": {
|
||||
"memory": 0,
|
||||
"storage": 310,
|
||||
"reserved_memory": 0,
|
||||
"reserved_storage": 0,
|
||||
"accounts": 1,
|
||||
"ha_assets": 0,
|
||||
"api": {
|
||||
"level": 1,
|
||||
"total": 6,
|
||||
"errors": 0
|
||||
}
|
||||
},
|
||||
"limits": {}
|
||||
},
|
||||
"tls_timeout": 2,
|
||||
"write_deadline": 10000000000,
|
||||
"start": "2026-06-07T19:02:24.785745698Z",
|
||||
"now": "2026-06-07T19:02:25.325501038Z",
|
||||
"uptime": "0s",
|
||||
"mem": 18288640,
|
||||
"cores": 24,
|
||||
"gomaxprocs": 24,
|
||||
"gomemlimit": 4294967296,
|
||||
"cpu": 0,
|
||||
"connections": 1,
|
||||
"total_connections": 1,
|
||||
"routes": 0,
|
||||
"remotes": 0,
|
||||
"leafnodes": 0,
|
||||
"in_msgs": 17,
|
||||
"out_msgs": 17,
|
||||
"in_bytes": 1304,
|
||||
"out_bytes": 3905,
|
||||
"slow_consumers": 0,
|
||||
"subscriptions": 75,
|
||||
"http_req_stats": {
|
||||
"/varz": 1
|
||||
},
|
||||
"config_load_time": "2026-06-07T19:02:24.785745698Z",
|
||||
"config_digest": "",
|
||||
"system_account": "$SYS",
|
||||
"slow_consumer_stats": {
|
||||
"clients": 0,
|
||||
"routes": 0,
|
||||
"gateways": 0,
|
||||
"leafs": 0
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user