Files
fn_registry/functions/infra/parse_nats_monitor.go
T
egutierrez 0a6d1b8d17 feat(infra): auto-commit con 6 cambios
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-08 01:57:00 +02:00

151 lines
5.4 KiB
Go

package infra
import (
"encoding/json"
"fmt"
"strings"
"time"
)
// natsVarz refleja los campos relevantes de la respuesta JSON del endpoint
// /varz del monitoring HTTP embebido de un nats-server (puerto 8222, loopback).
// Solo se mapean los campos que producen series; el resto se ignora.
type natsVarz struct {
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
Connections int `json:"connections"`
SlowConsumers int `json:"slow_consumers"`
Subscriptions int `json:"subscriptions"`
Mem int64 `json:"mem"`
Start string `json:"start"`
}
// natsConnz refleja los campos relevantes de /connz.
type natsConnz struct {
NumConnections int `json:"num_connections"`
}
// natsStreamDetail refleja un stream dentro de account_details[].stream_detail[].
type natsStreamDetail struct {
Name string `json:"name"`
Cluster struct {
Leader string `json:"leader"`
} `json:"cluster"`
State struct {
Messages int64 `json:"messages"`
Bytes int64 `json:"bytes"`
} `json:"state"`
}
// natsJsz refleja los campos relevantes de /jsz?streams=1.
type natsJsz struct {
Streams int64 `json:"streams"`
Messages int64 `json:"messages"`
Bytes int64 `json:"bytes"`
Memory int64 `json:"memory"`
Storage int64 `json:"storage"`
AccountDetails []struct {
StreamDetail []natsStreamDetail `json:"stream_detail"`
} `json:"account_details"`
}
// ParseNatsMonitor convierte las respuestas JSON del endpoint de monitoring HTTP
// embebido de un nats-server (puerto 8222, loopback) en una serie de PromSample
// lista para empujar a VictoriaMetrics. Es la hermana de ParseUnibusHealth para
// las métricas server-level de NATS/JetStream (msgs/s, conexiones, KV bucket
// msgs, RAFT leader por stream, memoria). La consume el unibus_exporter de
// fleet_monitoring en modo scraper local por nodo.
//
// node es el nombre lógico del nodo (p.ej. "magnus"); se adjunta a CADA serie
// como las labels "node" e "instance" para distinguir los nodos cuando un único
// exporter scrapea varios.
//
// varz, connz y jsz son los cuerpos crudos de GET /varz, GET /connz y
// GET /jsz?streams=1 respectivamente:
// - varz es el core: si NO parsea como JSON válido devuelve (nil, error).
// - connz y jsz son best-effort: si vienen vacíos o no parsean, sus series se
// omiten sin abortar (no error), para que el scraper resista que un endpoint
// falle. nats_connections cae a varz.connections cuando connz no parsea.
func ParseNatsMonitor(node string, varz, connz, jsz []byte) ([]PromSample, error) {
var v natsVarz
if err := json.Unmarshal(varz, &v); err != nil {
return nil, fmt.Errorf("parse nats varz for node %q: %w", node, err)
}
// mk construye un PromSample con las labels base {node, instance} más, de
// forma opcional, labels extra (clave/valor alternados). Las labels base no
// se pueden sobreescribir desde extra.
mk := func(name string, val float64, extra ...string) PromSample {
labels := map[string]string{"node": node, "instance": node}
for i := 0; i+1 < len(extra); i += 2 {
labels[extra[i]] = extra[i+1]
}
return PromSample{Name: name, Labels: labels, Value: val}
}
out := []PromSample{
mk("nats_msgs_in_total", float64(v.InMsgs)),
mk("nats_msgs_out_total", float64(v.OutMsgs)),
mk("nats_bytes_in_total", float64(v.InBytes)),
mk("nats_bytes_out_total", float64(v.OutBytes)),
}
// nats_connections: prefiere connz.num_connections; si connz no parsea, cae
// a varz.connections para no perder la serie.
connections := float64(v.Connections)
if len(connz) > 0 {
var c natsConnz
if err := json.Unmarshal(connz, &c); err == nil {
connections = float64(c.NumConnections)
}
}
out = append(out,
mk("nats_connections", connections),
mk("nats_slow_consumers", float64(v.SlowConsumers)),
mk("nats_mem_bytes", float64(v.Mem)),
mk("nats_subscriptions", float64(v.Subscriptions)),
)
// nats_server_start_seconds: epoch (segundos Unix) del campo start (RFC3339).
// Proxy de reinicios del nats-server: un cambio de este valor = el server
// reinició. Si el parse de la fecha falla, se omite la serie (no se aborta).
if t, err := time.Parse(time.RFC3339, v.Start); err == nil {
out = append(out, mk("nats_server_start_seconds", float64(t.Unix())))
}
// jsz es best-effort: si vacío o inválido, se omiten todas sus series.
if len(jsz) > 0 {
var j natsJsz
if err := json.Unmarshal(jsz, &j); err == nil {
out = append(out,
mk("nats_jetstream_streams", float64(j.Streams)),
mk("nats_jetstream_messages", float64(j.Messages)),
mk("nats_jetstream_bytes", float64(j.Bytes)),
mk("nats_jetstream_memory_bytes", float64(j.Memory)),
mk("nats_jetstream_storage_bytes", float64(j.Storage)),
)
for _, acc := range j.AccountDetails {
for _, sd := range acc.StreamDetail {
out = append(out,
mk("nats_stream_messages", float64(sd.State.Messages), "stream", sd.Name),
mk("nats_stream_bytes", float64(sd.State.Bytes), "stream", sd.Name),
)
leader := 0.0
if sd.Cluster.Leader == node {
leader = 1
}
out = append(out, mk("nats_jetstream_raft_leader", leader, "stream", sd.Name))
if bucket, ok := strings.CutPrefix(sd.Name, "KV_"); ok {
out = append(out, mk("kv_bucket_msgs", float64(sd.State.Messages), "bucket", bucket))
}
}
}
}
}
return out, nil
}