// Command unibus_exporter probes the /healthz endpoint of every node of the // unibus messaging cluster (NATS+JetStream) on a fixed interval and pushes the // resulting cluster/posture metrics to a VictoriaMetrics / Prometheus-compatible // ingest endpoint. // // It does NOT instrument the bus: it only reads each node's public /healthz over // TLS (verified with the cluster CA) and turns the JSON posture into metrics. The // heavy lifting is three registry functions in fn-registry/functions/infra: // // - ParseUnibusHealth -> []infra.PromSample (per-node up/posture/store) // - FormatPromExposition -> Prometheus exposition text // - PushPromRemote -> POST the text with basic auth + an extra "job" label // // A single exporter scrapes all nodes, so the "node" and "instance" labels are // attached per series (by ParseUnibusHealth) rather than via the push's // extra_label, which would apply one value to the whole batch. package main import ( "context" "crypto/tls" "crypto/x509" "flag" "fmt" "io" "log" "net/http" "os" "os/signal" "syscall" "time" "fn-registry/functions/infra" ) func main() { configPath := flag.String("config", "", "path to JSON config file") once := flag.Bool("once", false, "scrape and push a single time, then exit (useful for testing)") flag.Parse() cfg, err := loadConfig(*configPath) if err != nil { log.Fatalf("config: %v", err) } client, err := newClient(cfg) if err != nil { log.Fatalf("tls client: %v", err) } log.Printf("unibus_exporter starting: nodes=%d hub=%q interval=%ds", len(cfg.Nodes), cfg.HubURL, cfg.IntervalSec) if *once { if err := scrapeAndPush(cfg, client); err != nil { log.Fatalf("push: %v", err) } return } ctx, cancel := context.WithCancel(context.Background()) defer cancel() stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) go func() { <-stop log.Print("shutting down") cancel() }() ticker := time.NewTicker(time.Duration(cfg.IntervalSec) * time.Second) defer ticker.Stop() // Push once right away so the cluster shows up immediately, then on each tick. if err := scrapeAndPush(cfg, client); err != nil { log.Printf("push error: %v", err) } for { select { case <-ticker.C: if err := scrapeAndPush(cfg, client); err != nil { log.Printf("push error: %v", err) } case <-ctx.Done(): return } } } // newClient builds an HTTP client that verifies the unibus nodes' TLS against the // cluster CA loaded from cfg.CACertPath. We never disable verification: a wrong // or missing CA must fail loudly, not silently trust the endpoint. func newClient(cfg Config) (*http.Client, error) { pem, err := os.ReadFile(cfg.CACertPath) if err != nil { return nil, fmt.Errorf("read CA %q: %w", cfg.CACertPath, err) } pool := x509.NewCertPool() if !pool.AppendCertsFromPEM(pem) { return nil, fmt.Errorf("no certificates parsed from CA %q", cfg.CACertPath) } return &http.Client{ Timeout: time.Duration(cfg.TimeoutSec) * time.Second, Transport: &http.Transport{ TLSClientConfig: &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12}, }, }, nil } // scrapeAndPush runs one full cycle: probe every node, build the metric samples // and push them to VictoriaMetrics in a single request. func scrapeAndPush(cfg Config, client *http.Client) error { var samples []infra.PromSample // Cluster-wide gauge: the configured cluster size. Live nodes = sum(unibus_up). samples = append(samples, infra.PromSample{ Name: "unibus_cluster_size", Value: float64(len(cfg.Nodes)), }) for _, node := range cfg.Nodes { samples = append(samples, probeNode(node, client)...) } body := infra.FormatPromExposition(samples, time.Now().UnixMilli()) if err := infra.PushPromRemote(cfg.HubURL, cfg.User, cfg.Pass, body, cfg.Labels); err != nil { return err } log.Printf("pushed %d samples for %d nodes", len(samples), len(cfg.Nodes)) return nil } // probeNode does a single GET /healthz and turns the result into samples. // On any failure it emits unibus_up=0 + unibus_scrape_error=1 for the node so a // down node is visible in Grafana rather than just absent. On success it delegates // the body parsing to the registry function and adds scrape_error=0. // Either way it emits unibus_scrape_duration_seconds for the node. func probeNode(node Node, client *http.Client) []infra.PromSample { labels := map[string]string{"node": node.Name, "instance": node.Name} start := time.Now() body, err := getHealth(client, node.URL) elapsed := time.Since(start).Seconds() dur := infra.PromSample{Name: "unibus_scrape_duration_seconds", Labels: labels, Value: elapsed} if err != nil { log.Printf("node %s: scrape error: %v", node.Name, err) return []infra.PromSample{ {Name: "unibus_up", Labels: labels, Value: 0}, {Name: "unibus_scrape_error", Labels: labels, Value: 1}, dur, } } samples, perr := infra.ParseUnibusHealth(node.Name, body) if perr != nil { log.Printf("node %s: parse error: %v", node.Name, perr) return []infra.PromSample{ {Name: "unibus_up", Labels: labels, Value: 0}, {Name: "unibus_scrape_error", Labels: labels, Value: 1}, dur, } } samples = append(samples, infra.PromSample{Name: "unibus_scrape_error", Labels: labels, Value: 0}, dur) return samples } // getHealth performs the HTTP GET and returns the body when the status is 2xx. func getHealth(client *http.Client, url string) ([]byte, error) { resp, err := client.Get(url) if err != nil { return nil, err } defer resp.Body.Close() body, err := io.ReadAll(io.LimitReader(resp.Body, 64*1024)) if err != nil { return nil, err } if resp.StatusCode < 200 || resp.StatusCode >= 300 { return nil, fmt.Errorf("status %d: %s", resp.StatusCode, string(body)) } return body, nil }