36f4ba0eaf
Add WebsocketConfig to ServerConfig so the embedded nats-server can expose an additional WebSocket port (nats.ws) alongside the TCP data plane. This lets a browser SPA speak the NATS protocol directly, the way native TCP peers (Go, Kotlin/Android) already do — the first enabler for uniweb becoming a browser-native client with no Go gateway (issue uniweb/0001, Phase 0). The client authenticator applies to WebSocket connections too, so this adds a transport, not a trust bypass. Plain ws:// is served whenever no TLS certificate is configured (intended for loopback dev only); a certificate yields wss://. An empty AllowedOrigins enforces same-origin. A nil WebsocketConfig keeps the server TCP-only, so existing single-node and cluster deployments are unchanged. Tests: the WebSocket listener opens and completes the upgrade handshake (101); no listener opens when WebsocketConfig is nil.
304 lines
14 KiB
Go
304 lines
14 KiB
Go
// Package embeddednats starts an in-process NATS server with JetStream enabled.
|
|
//
|
|
// This lets the whole unibus stack run with `go run` without installing or
|
|
// managing a separate NATS deployment. In production, point clients at an
|
|
// external NATS via the --nats-url flag instead of using this.
|
|
package embeddednats
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"time"
|
|
|
|
server "github.com/nats-io/nats-server/v2/server"
|
|
)
|
|
|
|
// ClusterConfig configures the route layer that links several embedded NATS
// servers into a single cluster (issue 0003a). It is the data-plane side of
// high availability: with a cluster, a client subject published on one node is
// forwarded to subscribers connected to any other node, and (with JetStream
// replicas > 1) streams/KV are RAFT-replicated across nodes so the loss of one
// node does not lose the bus.
//
// The route layer is a SEPARATE trust boundary from the client data plane: it
// carries server-to-server traffic, so it authenticates NODES, not bus users.
// Never reuse the nkey client authenticator here. Routes are secured with their
// own shared secret (Username/Password -> NATS Cluster.Authorization) and their
// own mutual TLS (TLS, built from the bus CA with busauth.RouteTLSConfig): a
// node without the cluster secret and a CA-signed node certificate cannot join
// the cluster nor inject messages into it.
//
// Besides the fields below, StartServer always disables route connection
// pooling (a single route per peer) and route address advertisement for a
// cluster, so peers dial only the explicit Routes configured here (issue 0006g).
type ClusterConfig struct {
	// Name is the cluster name; it MUST be identical on every node or the
	// servers refuse to gossip routes to each other.
	Name string
	// Host and Port are the route listener (server-to-server), distinct from the
	// client Host/Port. Use a free, non-client port (e.g. 6250).
	Host string
	Port int
	// Routes are the nats-route URLs of the OTHER nodes, e.g.
	// "nats://user:pass@10.0.0.2:6250". When the route layer is password
	// protected each URL must carry the same userinfo as the local Username /
	// Password so this node authenticates outbound to its peers. Because of
	// that userinfo, treat these strings as secrets (do not log them). Each
	// entry is parsed as a URL at startup and a parse failure aborts
	// StartServer.
	Routes []string
	// Username and Password gate the route listener (NATS Cluster.Authorization).
	// A peer (or impostor) that connects to this node's route port without these
	// credentials is rejected, so it never becomes a route. Empty disables route
	// auth (dev / trusted-network only).
	Username string
	Password string
	// TLS, when non-nil, secures the route connections with mutual TLS. Build it
	// with busauth.RouteTLSConfig(cert, key, ca): the server presents its node
	// certificate AND requires+verifies the connecting node's certificate against
	// the bus CA, so an unsigned impostor cannot establish a route even with the
	// right password. When set, the route TLS handshake timeout is raised to 5s
	// to tolerate peers that are still booting. Nil keeps routes plaintext
	// (dev / WireGuard-only).
	TLS *tls.Config
}
|
|
|
|
// ServerConfig is the full set of knobs for the embedded NATS server. The zero
// value (empty StoreDir aside) yields a dev-friendly server: JetStream on, bound
// to all interfaces, no client auth, no TLS, standalone (no cluster), and no
// WebSocket listener. Secured deployments set Auth and TLS; HA deployments set
// ServerName + Cluster; browser-facing deployments set Websocket; tests set
// Host to loopback and a free Port.
type ServerConfig struct {
	StoreDir string // JetStream store directory
	Host     string // bind interface; "" = nats-server default ("0.0.0.0")
	Port     int    // client (TCP data plane) listen port
	// ServerName is this node's unique name within the cluster. JetStream's RAFT
	// layer requires a stable, unique name per node to form its meta-group; leave
	// it empty for a standalone server (nats-server then auto-generates one).
	ServerName string
	// Auth, when non-nil, is installed as CustomClientAuthentication so the data
	// plane only accepts approved clients (nkey signature + bus allowlist).
	// StartServer also forces a nonce into the server INFO so nkey clients have
	// a challenge to sign.
	Auth server.Authentication
	// TLS, when non-nil, makes the server present a certificate and require TLS
	// on the data plane. Clients must trust the issuing CA (see busauth). It is
	// NOT applied to the WebSocket listener; that one is secured only by
	// Websocket.TLS.
	TLS *tls.Config
	// Cluster, when non-nil, joins this server to a route cluster for high
	// availability (issue 0003a). Nil keeps the server standalone (the legacy
	// single-node behavior).
	Cluster *ClusterConfig
	// Websocket, when non-nil, opens an ADDITIONAL WebSocket listener on the
	// embedded nats-server so browser clients (nats.ws) can reach the data plane
	// directly, the same way native TCP peers (Go, Kotlin) do (issue uniweb/0001).
	// Native TCP clients are unaffected: the WebSocket listener is a separate port
	// layered on top of the existing TCP listener, and the client authenticator
	// (Auth) applies to both. Nil keeps the server TCP-only (legacy behavior).
	Websocket *WebsocketConfig
}
|
|
|
|
// WebsocketConfig configures the embedded nats-server's WebSocket listener so a
// browser can speak the NATS protocol over ws:// or wss://. A browser cannot
// open a raw TCP socket, so this is the only way the SPA reaches the data plane
// without a Go gateway in between.
//
// Security: the transport is selected by TLS alone. With TLS set the listener
// serves wss://; with TLS nil StartServer serves plain ws:// (it sets
// nats-server's NoTLS itself), so plaintext is what you get whenever no
// certificate is supplied. Keep plain ws:// to a loopback dev stack (bind Host
// to "127.0.0.1") and supply TLS for anything reachable from the network.
// NOTE(review): browsers block ws:// from https:// pages (mixed content), but a
// page served over plain http:// can still open ws:// to a LAN host, so the
// browser is not a reliable guard rail against plaintext.
//
// The WebSocket upgrade also enforces an Origin allowlist (browser same-origin
// policy); AllowedOrigins must list the SPA's origins or the browser handshake
// is refused.
type WebsocketConfig struct {
	// Host is the bind interface for the WebSocket listener; "" lets nats-server
	// pick its default. Use "127.0.0.1" to keep it loopback-only in dev.
	Host string
	// Port is the WebSocket listen port (e.g. 8480). Required (non-zero) for the
	// listener to open.
	Port int
	// NoTLS records the intent to serve plain ws:// instead of wss://
	// (loopback/dev only).
	// NOTE(review): StartServer does not read this field. It serves plain ws://
	// whenever TLS is nil, whether or not NoTLS is set, and wss:// whenever TLS
	// is set, so this flag currently has no effect.
	NoTLS bool
	// TLS, when set, serves wss:// with this certificate. Needed whenever the
	// listener is reachable off loopback. It is independent of ServerConfig.TLS:
	// the data-plane certificate is not reused here.
	TLS *tls.Config
	// AllowedOrigins is the allowlist of browser Origin headers permitted to
	// upgrade the WebSocket. Empty = same-origin only (StartServer then enables
	// nats-server SameOrigin). Never use a wildcard in production; list the
	// exact SPA origins.
	// NOTE(review): nats-server's same-origin check presumably compares the port
	// too, so an SPA served from a different port than Port likely needs an
	// explicit entry here — confirm against the nats-server version in use.
	// Origin checks only constrain browsers (a non-browser client can send any
	// Origin); Auth, not this list, is the access control.
	AllowedOrigins []string
}
|
|
|
|
// Start is a thin backward-compatible wrapper: embedded JetStream server on the
|
|
// default interface, no auth, no TLS.
|
|
func Start(storeDir string, port int) (*server.Server, error) {
|
|
return StartServer(ServerConfig{StoreDir: storeDir, Port: port})
|
|
}
|
|
|
|
// StartHost is Start with explicit control over the bind interface. host selects
|
|
// which network interface the data plane listens on: pass "127.0.0.1" to keep
|
|
// NATS loopback-only (the safe default for a single-host dev stack) or "0.0.0.0"
|
|
// to expose it to the LAN so remote peers (phones, other PCs) can connect. An
|
|
// empty host falls back to the nats-server default ("0.0.0.0", all interfaces).
|
|
func StartHost(storeDir, host string, port int) (*server.Server, error) {
|
|
return StartServer(ServerConfig{StoreDir: storeDir, Host: host, Port: port})
|
|
}
|
|
|
|
// StartHostAuth is StartHost with an optional custom client authenticator. When
|
|
// auth is non-nil only clients the authenticator approves may connect; when nil
|
|
// the server accepts any client (legacy, network-trusted behavior).
|
|
func StartHostAuth(storeDir, host string, port int, auth server.Authentication) (*server.Server, error) {
|
|
return StartServer(ServerConfig{StoreDir: storeDir, Host: host, Port: port, Auth: auth})
|
|
}
|
|
|
|
// natsLogOpts derives the embedded nats-server logging and monitoring flags
// from two environment toggles. It performs no I/O (the caller passes the raw
// env values), which keeps the decoupling between the toggles unit-testable.
//
//   - debugEnv (UNIBUS_NATS_DEBUG): "1" turns on the nats-server logger
//     (route/RAFT/JetStream errors); "2" does the same and adds protocol
//     tracing. Any other value, including "", leaves the server silent (NoLog),
//     which keeps production behavior unchanged.
//   - monitorEnv (UNIBUS_NATS_MONITOR): "1" asks for the monitoring HTTP
//     endpoint (loopback only) so a local metrics scraper can read /varz,
//     /connz and /jsz.
//
// The coupling is deliberately one-way. Monitoring never enables the debug
// log, because that log writes room subjects and routing metadata to journald
// in clear and would regress the hardened posture (issue 0007). Debug mode,
// however, still implies monitoring, so existing debugging workflows keep
// working.
func natsLogOpts(debugEnv, monitorEnv string) (noLog, debug, trace, monitor bool) {
	switch debugEnv {
	case "2":
		debug, trace = true, true
	case "1":
		debug = true
	}
	monitor = debug || monitorEnv == "1"
	noLog = !debug
	return noLog, debug, trace, monitor
}
|
|
|
|
// StartServer launches an embedded nats-server with JetStream from cfg. It
|
|
// blocks until the server is ready to accept connections (up to 5s) and returns
|
|
// the running server; the caller must Shutdown it.
|
|
func StartServer(cfg ServerConfig) (*server.Server, error) {
|
|
// Map the two independent env toggles to the nats-server logging + monitoring
|
|
// flags. See natsLogOpts for the decoupling rationale (issue 0007).
|
|
noLog, debugNATS, traceNATS, monitorNATS := natsLogOpts(
|
|
os.Getenv("UNIBUS_NATS_DEBUG"), os.Getenv("UNIBUS_NATS_MONITOR"))
|
|
opts := &server.Options{
|
|
JetStream: true,
|
|
StoreDir: cfg.StoreDir,
|
|
Host: cfg.Host,
|
|
Port: cfg.Port,
|
|
ServerName: cfg.ServerName,
|
|
DontListen: false,
|
|
// Keep the embedded server quiet by default; the host app logs the URLs.
|
|
NoLog: noLog,
|
|
Debug: debugNATS,
|
|
Trace: traceNATS,
|
|
Logtime: true,
|
|
NoSigs: true,
|
|
}
|
|
if monitorNATS {
|
|
// Expose the nats-server monitoring endpoint on LOOPBACK ONLY (never public):
|
|
// the operator (or a local metrics scraper) inspects /varz, /connz, /jsz,
|
|
// /routez. The 127.0.0.1 bind is mandatory because this endpoint has no auth;
|
|
// it must stay unreachable from the network.
|
|
opts.HTTPHost = "127.0.0.1"
|
|
opts.HTTPPort = 8222
|
|
}
|
|
if cfg.Auth != nil {
|
|
opts.CustomClientAuthentication = cfg.Auth
|
|
// A CustomClientAuthentication alone does not make the server advertise a
|
|
// nonce in its INFO line, and nats.go refuses to connect with an nkey to a
|
|
// server that does not ("nkeys not supported by the server"). Forcing the
|
|
// nonce makes nkey clients sign the challenge our authenticator verifies.
|
|
opts.AlwaysEnableNonce = true
|
|
}
|
|
if cfg.TLS != nil {
|
|
opts.TLSConfig = cfg.TLS
|
|
opts.TLS = true
|
|
}
|
|
|
|
if cfg.Websocket != nil {
|
|
// Layer a WebSocket listener on top of the TCP data plane so browser
|
|
// clients (nats.ws) can connect. The client authenticator (opts.*Auth above)
|
|
// applies to WebSocket connections too, so a browser still has to pass the
|
|
// nkey + allowlist check; this only adds a transport, not a trust bypass.
|
|
ws := server.WebsocketOpts{
|
|
Host: cfg.Websocket.Host,
|
|
Port: cfg.Websocket.Port,
|
|
AllowedOrigins: cfg.Websocket.AllowedOrigins,
|
|
}
|
|
if cfg.Websocket.TLS != nil {
|
|
ws.TLSConfig = cfg.Websocket.TLS
|
|
} else {
|
|
// No certificate: plain ws:// (loopback/dev only). Browsers refuse this
|
|
// off-loopback, which is the intended guard rail.
|
|
ws.NoTLS = true
|
|
}
|
|
// Empty AllowedOrigins means "same-origin only": tell nats-server to enforce
|
|
// it rather than defaulting to accept-any-origin.
|
|
ws.SameOrigin = len(cfg.Websocket.AllowedOrigins) == 0
|
|
opts.Websocket = ws
|
|
}
|
|
|
|
if cfg.Cluster != nil {
|
|
if err := applyClusterOpts(opts, cfg.Cluster); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
ns, err := server.NewServer(opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("embeddednats: new server: %w", err)
|
|
}
|
|
|
|
if debugNATS {
|
|
ns.ConfigureLogger()
|
|
}
|
|
|
|
go ns.Start()
|
|
|
|
if !ns.ReadyForConnections(5 * time.Second) {
|
|
ns.Shutdown()
|
|
return nil, fmt.Errorf("embeddednats: server not ready for connections within 5s")
|
|
}
|
|
|
|
return ns, nil
|
|
}
|
|
|
|
// applyClusterOpts translates a ClusterConfig into the nats-server route options
|
|
// on opts: the cluster listener (name + host/port + shared-secret auth + mutual
|
|
// TLS) and the outbound routes to the other nodes. A malformed route URL is a
|
|
// configuration error and aborts startup rather than silently dropping a peer.
|
|
func applyClusterOpts(opts *server.Options, c *ClusterConfig) error {
|
|
opts.Cluster = server.ClusterOpts{
|
|
Name: c.Name,
|
|
Host: c.Host,
|
|
Port: c.Port,
|
|
Username: c.Username,
|
|
Password: c.Password,
|
|
// Disable route connection pooling (nats-server 2.10+ defaults to a pool of
|
|
// 3 connections per peer). On a small cluster the pool churns with
|
|
// "duplicate route"/"client closed" reconnects that interrupt the meta-group
|
|
// RAFT heartbeats, causing perpetual leader re-elections so the JetStream
|
|
// meta never becomes current and stream/KV creation hangs (issue 0006g).
|
|
// PoolSize=-1 forces the classic single route per peer, which is stable for
|
|
// the 3-node unibus cluster.
|
|
PoolSize: -1,
|
|
// NoAdvertise stops the server from gossiping its locally-discovered IPs to
|
|
// peers. The cluster nodes are Docker hosts, so without this NATS advertises
|
|
// the docker bridge addresses (172.x / 10.0.x) as reachable routes; peers
|
|
// then try to dial those private, mutually-unreachable IPs, churning the
|
|
// route layer and destabilizing the JetStream meta-group. With NoAdvertise
|
|
// the nodes use ONLY the explicit public-IP routes we configure (issue 0006g).
|
|
NoAdvertise: true,
|
|
}
|
|
if c.TLS != nil {
|
|
opts.Cluster.TLSConfig = c.TLS
|
|
// A generous handshake budget: route TLS does a mutual handshake and the
|
|
// peer may still be booting. The default 2s can flap on a cold cluster.
|
|
opts.Cluster.TLSTimeout = 5.0
|
|
}
|
|
for _, r := range c.Routes {
|
|
u, err := url.Parse(r)
|
|
if err != nil {
|
|
return fmt.Errorf("embeddednats: parse route %q: %w", r, err)
|
|
}
|
|
opts.Routes = append(opts.Routes, u)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ClientURL returns a NATS connection URL for the running embedded server.
|
|
func ClientURL(ns *server.Server) string {
|
|
return ns.ClientURL()
|
|
}
|