92da0c0b0a
Compone parse_unibus_health + format_prom_exposition + push_prom_remote del registry (grupo fleet-metrics). Un solo exporter scrapea los 3 nodos por IP pública con la CA del cluster; labels node/instance por serie. Config JSON con secretos fuera de argv. Incluye systemd unit y unibus.example.json. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
181 lines
5.7 KiB
Go
181 lines
5.7 KiB
Go
// Command unibus_exporter probes the /healthz endpoint of every node of the
|
|
// unibus messaging cluster (NATS+JetStream) on a fixed interval and pushes the
|
|
// resulting cluster/posture metrics to a VictoriaMetrics / Prometheus-compatible
|
|
// ingest endpoint.
|
|
//
|
|
// It does NOT instrument the bus: it only reads each node's public /healthz over
|
|
// TLS (verified with the cluster CA) and turns the JSON posture into metrics. The
|
|
// heavy lifting is three registry functions in fn-registry/functions/infra:
|
|
//
|
|
// - ParseUnibusHealth -> []infra.PromSample (per-node up/posture/store)
|
|
// - FormatPromExposition -> Prometheus exposition text
|
|
// - PushPromRemote -> POST the text with basic auth + an extra "job" label
|
|
//
|
|
// A single exporter scrapes all nodes, so the "node" and "instance" labels are
|
|
// attached per series (by ParseUnibusHealth) rather than via the push's
|
|
// extra_label, which would apply one value to the whole batch.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"fn-registry/functions/infra"
|
|
)
|
|
|
|
func main() {
|
|
configPath := flag.String("config", "", "path to JSON config file")
|
|
once := flag.Bool("once", false, "scrape and push a single time, then exit (useful for testing)")
|
|
flag.Parse()
|
|
|
|
cfg, err := loadConfig(*configPath)
|
|
if err != nil {
|
|
log.Fatalf("config: %v", err)
|
|
}
|
|
client, err := newClient(cfg)
|
|
if err != nil {
|
|
log.Fatalf("tls client: %v", err)
|
|
}
|
|
log.Printf("unibus_exporter starting: nodes=%d hub=%q interval=%ds", len(cfg.Nodes), cfg.HubURL, cfg.IntervalSec)
|
|
|
|
if *once {
|
|
if err := scrapeAndPush(cfg, client); err != nil {
|
|
log.Fatalf("push: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
stop := make(chan os.Signal, 1)
|
|
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
|
|
go func() {
|
|
<-stop
|
|
log.Print("shutting down")
|
|
cancel()
|
|
}()
|
|
|
|
ticker := time.NewTicker(time.Duration(cfg.IntervalSec) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Push once right away so the cluster shows up immediately, then on each tick.
|
|
if err := scrapeAndPush(cfg, client); err != nil {
|
|
log.Printf("push error: %v", err)
|
|
}
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if err := scrapeAndPush(cfg, client); err != nil {
|
|
log.Printf("push error: %v", err)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// newClient builds an HTTP client that verifies the unibus nodes' TLS against the
|
|
// cluster CA loaded from cfg.CACertPath. We never disable verification: a wrong
|
|
// or missing CA must fail loudly, not silently trust the endpoint.
|
|
func newClient(cfg Config) (*http.Client, error) {
|
|
pem, err := os.ReadFile(cfg.CACertPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read CA %q: %w", cfg.CACertPath, err)
|
|
}
|
|
pool := x509.NewCertPool()
|
|
if !pool.AppendCertsFromPEM(pem) {
|
|
return nil, fmt.Errorf("no certificates parsed from CA %q", cfg.CACertPath)
|
|
}
|
|
return &http.Client{
|
|
Timeout: time.Duration(cfg.TimeoutSec) * time.Second,
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// scrapeAndPush runs one full cycle: probe every node, build the metric samples
|
|
// and push them to VictoriaMetrics in a single request.
|
|
func scrapeAndPush(cfg Config, client *http.Client) error {
|
|
var samples []infra.PromSample
|
|
|
|
// Cluster-wide gauge: the configured cluster size. Live nodes = sum(unibus_up).
|
|
samples = append(samples, infra.PromSample{
|
|
Name: "unibus_cluster_size",
|
|
Value: float64(len(cfg.Nodes)),
|
|
})
|
|
|
|
for _, node := range cfg.Nodes {
|
|
samples = append(samples, probeNode(node, client)...)
|
|
}
|
|
|
|
body := infra.FormatPromExposition(samples, time.Now().UnixMilli())
|
|
if err := infra.PushPromRemote(cfg.HubURL, cfg.User, cfg.Pass, body, cfg.Labels); err != nil {
|
|
return err
|
|
}
|
|
log.Printf("pushed %d samples for %d nodes", len(samples), len(cfg.Nodes))
|
|
return nil
|
|
}
|
|
|
|
// probeNode does a single GET <node>/healthz and turns the result into samples.
|
|
// On any failure it emits unibus_up=0 + unibus_scrape_error=1 for the node so a
|
|
// down node is visible in Grafana rather than just absent. On success it delegates
|
|
// the body parsing to the registry function and adds scrape_error=0.
|
|
// Either way it emits unibus_scrape_duration_seconds for the node.
|
|
func probeNode(node Node, client *http.Client) []infra.PromSample {
|
|
labels := map[string]string{"node": node.Name, "instance": node.Name}
|
|
start := time.Now()
|
|
body, err := getHealth(client, node.URL)
|
|
elapsed := time.Since(start).Seconds()
|
|
|
|
dur := infra.PromSample{Name: "unibus_scrape_duration_seconds", Labels: labels, Value: elapsed}
|
|
|
|
if err != nil {
|
|
log.Printf("node %s: scrape error: %v", node.Name, err)
|
|
return []infra.PromSample{
|
|
{Name: "unibus_up", Labels: labels, Value: 0},
|
|
{Name: "unibus_scrape_error", Labels: labels, Value: 1},
|
|
dur,
|
|
}
|
|
}
|
|
|
|
samples, perr := infra.ParseUnibusHealth(node.Name, body)
|
|
if perr != nil {
|
|
log.Printf("node %s: parse error: %v", node.Name, perr)
|
|
return []infra.PromSample{
|
|
{Name: "unibus_up", Labels: labels, Value: 0},
|
|
{Name: "unibus_scrape_error", Labels: labels, Value: 1},
|
|
dur,
|
|
}
|
|
}
|
|
samples = append(samples, infra.PromSample{Name: "unibus_scrape_error", Labels: labels, Value: 0}, dur)
|
|
return samples
|
|
}
|
|
|
|
// getHealth performs an HTTP GET on url with client and returns the response
// body when the status code is 2xx.
//
// The body is capped at 64 KiB. A larger response is reported as an error
// instead of being silently truncated; a truncated JSON document would
// otherwise surface later as a confusing parse failure.
//
// For non-2xx responses, the error carries the status code and a short, quoted
// excerpt of the body. This keeps log lines single-line and bounded even when a
// node returns a large error page.
func getHealth(client *http.Client, url string) ([]byte, error) {
	const (
		maxBody    = 64 * 1024 // largest /healthz body accepted
		maxExcerpt = 512       // bytes of an error body echoed into the error
	)

	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Read one byte past the cap so an oversized body can be told apart from
	// one that is exactly maxBody long.
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxBody+1))
	if err != nil {
		return nil, err
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		excerpt := body
		if len(excerpt) > maxExcerpt {
			excerpt = excerpt[:maxExcerpt]
		}
		return nil, fmt.Errorf("status %d: %q", resp.StatusCode, excerpt)
	}
	if len(body) > maxBody {
		return nil, fmt.Errorf("response body exceeds %d bytes", maxBody)
	}
	return body, nil
}
|