diff --git a/cmd/membershipd/config.go b/cmd/membershipd/config.go index 1c1886b..a593382 100644 --- a/cmd/membershipd/config.go +++ b/cmd/membershipd/config.go @@ -3,10 +3,24 @@ package main import ( "fmt" "net" + "strings" "github.com/enmanuel/unibus/pkg/membership" ) +// splitRoutes parses the comma-separated --routes flag into a clean slice of +// route URLs, dropping empty entries and surrounding whitespace so a trailing +// comma or a spaced list does not yield a bogus empty route. +func splitRoutes(csv string) []string { + var out []string + for _, r := range strings.Split(csv, ",") { + if r = strings.TrimSpace(r); r != "" { + out = append(out, r) + } + } + return out +} + // isLoopbackBind reports whether the --bind value keeps the service reachable // only from this host. An empty bind means "all interfaces" (public), and a // hostname we cannot resolve to a loopback literal is treated as public — the @@ -48,3 +62,42 @@ func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey s } return nil } + +// validateClusterConfig guards the cluster route layer (issue 0003a). The route +// layer is a server-to-server trust boundary distinct from the client data +// plane: leaving it open lets anyone who reaches the route port join the cluster +// or inject messages into the whole bus (audit 0004, "auth of the cluster +// routes"). So on a public (non-loopback) bind, a cluster MUST carry both a +// shared route secret AND mutual route TLS. It is a pure function of the parsed +// flags. An empty clusterName means "no cluster" (standalone) and is always +// allowed. +// +// The three route-TLS paths are all-or-nothing (mutual TLS needs the node cert, +// its key, and the CA together), independent of the bind, so a partial TLS +// config never silently degrades to plaintext routes. +func validateClusterConfig(clusterName, bind, user, pass, rtCert, rtKey, rtCA string) error { + rtAny := rtCert != "" || rtKey != "" || rtCA != "" + rtAll := rtCert != "" && rtKey != "" && rtCA != "" + if rtAny && !rtAll { + return fmt.Errorf( + "refusing to start: --route-tls-cert/--route-tls-key/--route-tls-ca must be set together (mutual route TLS needs all three)") + } + if clusterName == "" { + return nil // standalone: no route layer to secure + } + if isLoopbackBind(bind) { + return nil // loopback cluster is dev-only and unreachable from outside + } + // Public cluster: demand a route secret and mutual route TLS. + if user == "" || pass == "" { + return fmt.Errorf( + "refusing to start: cluster %q on public bind %q requires --cluster-user and --cluster-pass; an unauthenticated route port lets anyone join the cluster", + clusterName, bind) + } + if !rtAll { + return fmt.Errorf( + "refusing to start: cluster %q on public bind %q requires mutual route TLS (--route-tls-cert/--route-tls-key/--route-tls-ca); plaintext routes expose server-to-server traffic and admit unsigned nodes", + clusterName, bind) + } + return nil +} diff --git a/cmd/membershipd/config_test.go b/cmd/membershipd/config_test.go index 50f4739..8adbb1c 100644 --- a/cmd/membershipd/config_test.go +++ b/cmd/membershipd/config_test.go @@ -70,3 +70,63 @@ func TestBootConfigPolicy(t *testing.T) { }) } } + +// TestClusterConfigPolicy is the cluster route guard (issue 0003a): a standalone +// server is always fine; a loopback cluster is dev-only and unguarded; a public +// cluster demands both a route secret and complete mutual route TLS; and the +// route-TLS flags are all-or-nothing regardless of bind. +func TestClusterConfigPolicy(t *testing.T) { + const c, k, ca = "node.crt", "node.key", "ca.crt" + cases := []struct { + name string + clusterName, bind string + user, pass string + rtCert, rtKey, rtCA string + wantErr bool + }{ + // Standalone (no cluster name) is always allowed, even on a public bind. + {"standalone-public", "", "0.0.0.0", "", "", "", "", "", false}, + // Loopback dev cluster: unguarded (unreachable from outside). + {"loopback-cluster-bare", "unibus", "127.0.0.1", "", "", "", "", "", false}, + // Golden: full public HA config. + {"public-full", "unibus", "0.0.0.0", "u", "p", c, k, ca, false}, + // Error: public cluster without a route secret. + {"public-no-secret", "unibus", "0.0.0.0", "", "", c, k, ca, true}, + {"public-half-secret", "unibus", "0.0.0.0", "u", "", c, k, ca, true}, + // Error: public cluster without mutual route TLS. + {"public-no-tls", "unibus", "10.0.0.1", "u", "p", "", "", "", true}, + // Error: partial route-TLS flags trip regardless of bind. + {"loopback-partial-tls", "unibus", "127.0.0.1", "", "", c, "", "", true}, + {"standalone-partial-tls", "", "127.0.0.1", "", "", c, k, "", true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := validateClusterConfig(tc.clusterName, tc.bind, tc.user, tc.pass, tc.rtCert, tc.rtKey, tc.rtCA) + if tc.wantErr && err == nil { + t.Fatalf("cluster config %+v should be refused", tc) + } + if !tc.wantErr && err != nil { + t.Fatalf("cluster config %+v should be allowed, got: %v", tc, err) + } + }) + } +} + +func TestSplitRoutes(t *testing.T) { + cases := []struct { + in string + want int + }{ + {"", 0}, + {"nats://a:1", 1}, + {"nats://a:1,nats://b:2", 2}, + {" nats://a:1 , nats://b:2 ", 2}, // spaces trimmed + {"nats://a:1,,", 1}, // empty entries dropped + {",", 0}, + } + for _, c := range cases { + if got := splitRoutes(c.in); len(got) != c.want { + t.Fatalf("splitRoutes(%q) = %v (len %d), want len %d", c.in, got, len(got), c.want) + } + } +} diff --git a/cmd/membershipd/main.go b/cmd/membershipd/main.go index 629e699..ee509f5 100644 --- a/cmd/membershipd/main.go +++ b/cmd/membershipd/main.go @@ -45,6 +45,16 @@ func main() { busAuth = flag.String("bus-auth", "off", "control-plane auth rollout: off|soft|enforce (feature flag bus-auth)") tlsCert = flag.String("tls-cert", "", "PATH to the NATS server certificate (deploy/tls/server.crt); enables TLS on the embedded data plane") tlsKey = flag.String("tls-key", "", "path to the NATS server private key (deploy/tls/server.key); required with --tls-cert") + // Cluster (issue 0003a): empty --cluster-name keeps the server standalone. + clusterName = flag.String("cluster-name", "", "NATS cluster name (identical on every node); empty = standalone, no HA") + serverName = flag.String("server-name", "", "unique node name within the cluster (required by JetStream RAFT when clustered)") + clusterPort = flag.Int("cluster-port", 6250, "route listener port for server-to-server cluster traffic") + routesCSV = flag.String("routes", "", "comma-separated nats-route URLs of the OTHER nodes, e.g. nats://user:pass@10.0.0.2:6250") + clusterUser = flag.String("cluster-user", "", "shared route secret username (gates the route listener)") + clusterPass = flag.String("cluster-pass", "", "shared route secret password") + routeTLSCert = flag.String("route-tls-cert", "", "this node's route certificate (CA-signed); enables mutual route TLS with --route-tls-key/--route-tls-ca") + routeTLSKey = flag.String("route-tls-key", "", "this node's route private key") + routeTLSCA = flag.String("route-tls-ca", "", "bus CA that signs every node's route certificate (deploy/tls/ca.crt)") ) flag.Parse() @@ -59,6 +69,11 @@ func main() { if err := validateBootConfig(*bind, authMode, *tlsCert, *tlsKey); err != nil { log.Fatalf("%v", err) } + // Cluster route guard (issue 0003a): a public cluster needs a route secret + // and mutual route TLS, and the route-TLS flags are all-or-nothing. + if err := validateClusterConfig(*clusterName, *bind, *clusterUser, *clusterPass, *routeTLSCert, *routeTLSKey, *routeTLSCA); err != nil { + log.Fatalf("%v", err) + } log.SetFlags(log.LstdFlags | log.Lmsgprefix) log.SetPrefix("[membershipd] ") @@ -89,9 +104,31 @@ func main() { // Bind the embedded NATS to the same interface as the HTTP API so a // single --bind flag governs reachability: 127.0.0.1 keeps the whole // stack loopback-only; 0.0.0.0 exposes both planes to the LAN. - StoreDir: *natsStore, - Host: *bind, - Port: *natsPort, + StoreDir: *natsStore, + Host: *bind, + Port: *natsPort, + ServerName: *serverName, + } + // Cluster (issue 0003a): with a cluster name, join the route layer for HA. + if *clusterName != "" { + cc := &embeddednats.ClusterConfig{ + Name: *clusterName, + Host: *bind, + Port: *clusterPort, + Routes: splitRoutes(*routesCSV), + Username: *clusterUser, + Password: *clusterPass, + } + if *routeTLSCert != "" { + rtls, err := busauth.RouteTLSConfig(*routeTLSCert, *routeTLSKey, *routeTLSCA) + if err != nil { + log.Fatalf("load route TLS: %v", err) + } + cc.TLS = rtls + log.Printf("cluster route TLS: ON (mutual, CA %s)", *routeTLSCA) + } + cfg.Cluster = cc + log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes)) } if authMode == membership.AuthEnforce { cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized) diff --git a/pkg/busauth/tls.go b/pkg/busauth/tls.go index 47e4fcf..b8de511 100644 --- a/pkg/busauth/tls.go +++ b/pkg/busauth/tls.go @@ -35,3 +35,41 @@ func ServerTLSConfig(certPEMPath, keyPEMPath string) (*tls.Config, error) { } return &tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12}, nil } + +// RouteTLSConfig builds the mutual-TLS config for the NATS CLUSTER route layer +// (issue 0003a). Unlike the client data plane, where the server presents a cert +// and only the client verifies it, routes are server-to-server: each node both +// presents its own node certificate AND verifies the connecting node's +// certificate against the bus CA. So this single config carries: +// +// - Certificates: this node's CA-signed certificate (presented in both the +// server and the client role of a route handshake), +// - RootCAs: the bus CA, to verify the certificate of a node we dial out to, +// - ClientCAs + ClientAuth=RequireAndVerifyClientCert: the bus CA, to verify +// the certificate of a node dialing in. +// +// The effect: a node that lacks a certificate signed by the bus CA cannot +// establish a route in either direction, even if it knows the cluster password. +// Reuse the same CA as the client data plane (deploy/tls) but a per-node cert +// whose SAN covers that node's route address. +func RouteTLSConfig(certPEMPath, keyPEMPath, caPEMPath string) (*tls.Config, error) { + cert, err := tls.LoadX509KeyPair(certPEMPath, keyPEMPath) + if err != nil { + return nil, fmt.Errorf("busauth: load route keypair: %w", err) + } + pem, err := os.ReadFile(caPEMPath) + if err != nil { + return nil, fmt.Errorf("busauth: read route CA %q: %w", caPEMPath, err) + } + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM(pem) { + return nil, fmt.Errorf("busauth: route CA %q contains no valid PEM certificate", caPEMPath) + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: pool, + ClientCAs: pool, + ClientAuth: tls.RequireAndVerifyClientCert, + MinVersion: tls.VersionTLS12, + }, nil +} diff --git a/pkg/embeddednats/cluster_test.go b/pkg/embeddednats/cluster_test.go new file mode 100644 index 0000000..e391b46 --- /dev/null +++ b/pkg/embeddednats/cluster_test.go @@ -0,0 +1,344 @@ +package embeddednats_test + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "math/big" + "net" + "os" + "path/filepath" + "testing" + "time" + + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/nats-io/nats.go" + server "github.com/nats-io/nats-server/v2/server" +) + +// freePort returns an OS-assigned free TCP port on loopback. +func freePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("free port: %v", err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +} + +// startNode boots a clustered embedded NATS node. peerRoutePorts are the route +// ports of the OTHER nodes; user/pass gate the route layer (empty disables it); +// routeTLS, when non-nil, secures the routes with mutual TLS. +func startNode(t *testing.T, name string, clientPort, routePort int, peerRoutePorts []int, user, pass string, routeTLS *clusterTLS) *server.Server { + t.Helper() + routes := make([]string, 0, len(peerRoutePorts)) + for _, p := range peerRoutePorts { + // Carry the cluster credentials in the route URL so this node + // authenticates outbound to its peers' route listeners. + if user != "" { + routes = append(routes, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", user, pass, p)) + } else { + routes = append(routes, fmt.Sprintf("nats://127.0.0.1:%d", p)) + } + } + cc := &embeddednats.ClusterConfig{ + Name: "unibus-test", + Host: "127.0.0.1", + Port: routePort, + Routes: routes, + Username: user, + Password: pass, + } + if routeTLS != nil { + cfg, err := busauth.RouteTLSConfig(routeTLS.cert, routeTLS.key, routeTLS.ca) + if err != nil { + t.Fatalf("route TLS for %s: %v", name, err) + } + cc.TLS = cfg + } + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), + Host: "127.0.0.1", + Port: clientPort, + ServerName: name, + Cluster: cc, + }) + if err != nil { + t.Fatalf("start node %s: %v", name, err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + return ns +} + +// waitRoutes waits until ns has at least want established routes, or fails. +func waitRoutes(t *testing.T, ns *server.Server, want int) { + t.Helper() + deadline := time.Now().Add(8 * time.Second) + for time.Now().Before(deadline) { + if ns.NumRoutes() >= want { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("node %q never reached %d routes (have %d)", ns.Name(), want, ns.NumRoutes()) +} + +// stableRouteCount waits for ns's route count to stop changing (the NATS route +// pool opens several connections per peer asynchronously) and returns it, so a +// test can use it as a baseline that an impostor must not increase. +func stableRouteCount(t *testing.T, ns *server.Server) int { + t.Helper() + prev := -1 + stableSince := time.Now() + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + n := ns.NumRoutes() + if n != prev { + prev = n + stableSince = time.Now() + } else if time.Since(stableSince) >= 750*time.Millisecond { + return n + } + time.Sleep(50 * time.Millisecond) + } + return prev +} + +// pubSubAcrossNodes connects a subscriber to subURL and a publisher to pubURL, +// publishes one message on subject, and reports whether it arrived within 3s. +// This proves the cluster forwards client subjects between nodes. +func pubSubAcrossNodes(t *testing.T, subURL, pubURL, subject, payload string) bool { + t.Helper() + subConn, err := nats.Connect(subURL) + if err != nil { + t.Fatalf("subscriber connect %s: %v", subURL, err) + } + defer subConn.Close() + got := make(chan string, 1) + if _, err := subConn.Subscribe(subject, func(m *nats.Msg) { + select { + case got <- string(m.Data): + default: + } + }); err != nil { + t.Fatalf("subscribe: %v", err) + } + if err := subConn.Flush(); err != nil { + t.Fatalf("flush sub: %v", err) + } + + pubConn, err := nats.Connect(pubURL) + if err != nil { + t.Fatalf("publisher connect %s: %v", pubURL, err) + } + defer pubConn.Close() + // Retry the publish for a moment: route interest propagation across the + // cluster is asynchronous, so the very first publish can race the gossip. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if err := pubConn.Publish(subject, []byte(payload)); err != nil { + t.Fatalf("publish: %v", err) + } + _ = pubConn.Flush() + select { + case v := <-got: + return v == payload + case <-time.After(100 * time.Millisecond): + } + } + return false +} + +// --- golden: two-node cluster forwards client subjects across nodes ---------- + +func TestClusterForwardsAcrossNodes(t *testing.T) { + rp0, rp1 := freePort(t), freePort(t) + n0 := startNode(t, "n0", freePort(t), rp0, []int{rp1}, "clusteruser", "clusterpass", nil) + n1 := startNode(t, "n1", freePort(t), rp1, []int{rp0}, "clusteruser", "clusterpass", nil) + + waitRoutes(t, n0, 1) + waitRoutes(t, n1, 1) + + if !pubSubAcrossNodes(t, n0.ClientURL(), n1.ClientURL(), "test.cross", "hello-cluster") { + t.Fatalf("subject published on n1 did not reach subscriber on n0") + } +} + +// --- edge: three-node cluster (HA shape) forwards between non-adjacent nodes -- + +func TestClusterThreeNodesForward(t *testing.T) { + rp0, rp1, rp2 := freePort(t), freePort(t), freePort(t) + n0 := startNode(t, "n0", freePort(t), rp0, []int{rp1, rp2}, "u", "p", nil) + n1 := startNode(t, "n1", freePort(t), rp1, []int{rp0, rp2}, "u", "p", nil) + n2 := startNode(t, "n2", freePort(t), rp2, []int{rp0, rp1}, "u", "p", nil) + + waitRoutes(t, n0, 2) + waitRoutes(t, n1, 2) + waitRoutes(t, n2, 2) + + // Publish on n2, subscribe on n0: a message must traverse the cluster. + if !pubSubAcrossNodes(t, n0.ClientURL(), n2.ClientURL(), "test.ha", "three-node") { + t.Fatalf("subject published on n2 did not reach subscriber on n0") + } +} + +// --- error: a node with the wrong cluster password is rejected as a route ----- + +func TestClusterRejectsBadRouteAuth(t *testing.T) { + rp0, rp1 := freePort(t), freePort(t) + good := startNode(t, "good", freePort(t), rp0, []int{rp1}, "secret", "right", nil) + _ = startNode(t, "peer", freePort(t), rp1, []int{rp0}, "secret", "right", nil) + waitRoutes(t, good, 1) + // Let the route pool settle so the baseline count is stable (NATS opens a + // pool of route connections per peer, so NumRoutes counts connections, not + // distinct peers). + base := stableRouteCount(t, good) + + // Impostor knows the addresses but not the cluster password. It tries to + // route to `good`; the route handshake must be rejected, so the impostor + // never establishes a route. + impostor := startNode(t, "impostor", freePort(t), freePort(t), []int{rp0}, "secret", "WRONG", nil) + + // Give the route layer ample time to (fail to) connect, then assert it never + // formed: the impostor has zero routes, and `good`'s route count is unchanged + // (it did not accept a route from the impostor). + time.Sleep(2 * time.Second) + if n := impostor.NumRoutes(); n != 0 { + t.Fatalf("impostor with wrong cluster password formed %d routes, want 0", n) + } + if n := good.NumRoutes(); n != base { + t.Fatalf("legit node route count changed from %d to %d after impostor attempt (it accepted the impostor)", base, n) + } +} + +// --- golden (TLS): mutual-TLS routes forward across nodes --------------------- + +func TestClusterMutualTLSForwards(t *testing.T) { + ca, caKey := genCA(t) + dir := t.TempDir() + tlsA := writeNodeCert(t, dir, "a", ca, caKey) + tlsB := writeNodeCert(t, dir, "b", ca, caKey) + + rp0, rp1 := freePort(t), freePort(t) + n0 := startNode(t, "n0", freePort(t), rp0, []int{rp1}, "u", "p", tlsA) + n1 := startNode(t, "n1", freePort(t), rp1, []int{rp0}, "u", "p", tlsB) + + waitRoutes(t, n0, 1) + waitRoutes(t, n1, 1) + + if !pubSubAcrossNodes(t, n0.ClientURL(), n1.ClientURL(), "test.tls", "mtls-ok") { + t.Fatalf("subject did not cross the mutual-TLS cluster") + } +} + +// --- error (TLS): a node whose cert is not signed by the bus CA cannot join --- + +func TestClusterRejectsUnsignedNode(t *testing.T) { + ca, caKey := genCA(t) + dir := t.TempDir() + tlsGood := writeNodeCert(t, dir, "good", ca, caKey) + tlsPeer := writeNodeCert(t, dir, "peer", ca, caKey) + + // The impostor signs its node cert with a DIFFERENT CA, and pins only that + // CA. The legit nodes' RequireAndVerifyClientCert against the bus CA rejects + // it; the impostor likewise rejects the legit node's cert. No route forms. + otherCA, otherKey := genCA(t) + tlsImpostor := writeNodeCert(t, dir, "impostor", otherCA, otherKey) + + rp0, rp1 := freePort(t), freePort(t) + good := startNode(t, "good", freePort(t), rp0, []int{rp1}, "u", "p", tlsGood) + _ = startNode(t, "peer", freePort(t), rp1, []int{rp0}, "u", "p", tlsPeer) + waitRoutes(t, good, 1) + base := stableRouteCount(t, good) + + impostor := startNode(t, "impostor", freePort(t), freePort(t), []int{rp0}, "u", "p", tlsImpostor) + time.Sleep(2 * time.Second) + if n := impostor.NumRoutes(); n != 0 { + t.Fatalf("impostor with unsigned cert formed %d routes, want 0", n) + } + if n := good.NumRoutes(); n != base { + t.Fatalf("legit node route count changed from %d to %d after unsigned impostor attempt (it accepted the impostor)", base, n) + } +} + +// --- cert helpers ------------------------------------------------------------ + +type clusterTLS struct{ cert, key, ca string } // PEM file paths + +// genCA creates a self-signed ECDSA CA certificate and its key. +func genCA(t *testing.T) (*x509.Certificate, *ecdsa.PrivateKey) { + t.Helper() + key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + t.Fatalf("gen CA key: %v", err) + } + tmpl := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{CommonName: "unibus-test-CA"}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature, + BasicConstraintsValid: true, + IsCA: true, + } + der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key) + if err != nil { + t.Fatalf("create CA cert: %v", err) + } + caCert, err := x509.ParseCertificate(der) + if err != nil { + t.Fatalf("parse CA cert: %v", err) + } + return caCert, key +} + +// writeNodeCert issues a node certificate signed by ca (SAN 127.0.0.1/::1, +// usable as both server and client) and writes cert/key/ca PEM files, returning +// their paths for RouteTLSConfig. +func writeNodeCert(t *testing.T, dir, name string, ca *x509.Certificate, caKey *ecdsa.PrivateKey) *clusterTLS { + t.Helper() + key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + t.Fatalf("gen node key: %v", err) + } + tmpl := &x509.Certificate{ + SerialNumber: big.NewInt(time.Now().UnixNano()), + Subject: pkix.Name{CommonName: name}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + IPAddresses: []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")}, + DNSNames: []string{"localhost"}, + } + der, err := x509.CreateCertificate(rand.Reader, tmpl, ca, &key.PublicKey, caKey) + if err != nil { + t.Fatalf("create node cert: %v", err) + } + certPath := filepath.Join(dir, name+".crt") + keyPath := filepath.Join(dir, name+".key") + caPath := filepath.Join(dir, name+"-ca.crt") + + writePEM(t, certPath, "CERTIFICATE", der) + keyDER, err := x509.MarshalECPrivateKey(key) + if err != nil { + t.Fatalf("marshal node key: %v", err) + } + writePEM(t, keyPath, "EC PRIVATE KEY", keyDER) + writePEM(t, caPath, "CERTIFICATE", ca.Raw) + return &clusterTLS{cert: certPath, key: keyPath, ca: caPath} +} + +func writePEM(t *testing.T, path, blockType string, der []byte) { + t.Helper() + b := pem.EncodeToMemory(&pem.Block{Type: blockType, Bytes: der}) + if err := os.WriteFile(path, b, 0o600); err != nil { + t.Fatalf("write %s: %v", path, err) + } +} diff --git a/pkg/embeddednats/embeddednats.go b/pkg/embeddednats/embeddednats.go index 114352a..1c96e9f 100644 --- a/pkg/embeddednats/embeddednats.go +++ b/pkg/embeddednats/embeddednats.go @@ -8,25 +8,76 @@ package embeddednats import ( "crypto/tls" "fmt" + "net/url" "time" server "github.com/nats-io/nats-server/v2/server" ) +// ClusterConfig configures the route layer that links several embedded NATS +// servers into a single cluster (issue 0003a). It is the data-plane side of +// high availability: with a cluster, a client subject published on one node is +// forwarded to subscribers connected to any other node, and (with JetStream +// replicas > 1) streams/KV are RAFT-replicated across nodes so the loss of one +// node does not lose the bus. +// +// The route layer is a SEPARATE trust boundary from the client data plane: it +// carries server-to-server traffic, so it authenticates NODES, not bus users. +// Never reuse the nkey client authenticator here. Routes are secured with their +// own shared secret (Username/Password -> NATS Cluster.Authorization) and their +// own mutual TLS (TLS, built from the bus CA with busauth.RouteTLSConfig): a +// node without the cluster secret and a CA-signed node certificate cannot join +// the cluster nor inject messages into it. +type ClusterConfig struct { + // Name is the cluster name; it MUST be identical on every node or the + // servers refuse to gossip routes to each other. + Name string + // Host and Port are the route listener (server-to-server), distinct from the + // client Host/Port. Use a free, non-client port (e.g. 6250). + Host string + Port int + // Routes are the nats-route URLs of the OTHER nodes, e.g. + // "nats://user:pass@10.0.0.2:6250". When the route layer is password + // protected each URL must carry the same userinfo as the local Username / + // Password so this node authenticates outbound to its peers. + Routes []string + // Username and Password gate the route listener (NATS Cluster.Authorization). + // A peer (or impostor) that connects to this node's route port without these + // credentials is rejected, so it never becomes a route. Empty disables route + // auth (dev / trusted-network only). + Username string + Password string + // TLS, when non-nil, secures the route connections with mutual TLS. Build it + // with busauth.RouteTLSConfig(cert, key, ca): the server presents its node + // certificate AND requires+verifies the connecting node's certificate against + // the bus CA, so an unsigned impostor cannot establish a route even with the + // right password. Nil keeps routes plaintext (dev / WireGuard-only). + TLS *tls.Config +} + // ServerConfig is the full set of knobs for the embedded NATS server. The zero // value (empty StoreDir aside) yields a dev-friendly server: JetStream on, bound -// to all interfaces, no client auth, no TLS. Secured deployments set Auth and -// TLS; tests set Host to loopback and a free Port. +// to all interfaces, no client auth, no TLS, standalone (no cluster). Secured +// deployments set Auth and TLS; HA deployments set ServerName + Cluster; tests +// set Host to loopback and a free Port. type ServerConfig struct { StoreDir string // JetStream store directory Host string // bind interface; "" = nats-server default ("0.0.0.0") Port int // listen port + // ServerName is this node's unique name within the cluster. JetStream's RAFT + // layer requires a stable, unique name per node to form its meta-group; leave + // it empty for a standalone server (nats-server then auto-generates one). + ServerName string // Auth, when non-nil, is installed as CustomClientAuthentication so the data // plane only accepts approved clients (nkey signature + bus allowlist). Auth server.Authentication // TLS, when non-nil, makes the server present a certificate and require TLS // on the data plane. Clients must trust the issuing CA (see busauth). TLS *tls.Config + // Cluster, when non-nil, joins this server to a route cluster for high + // availability (issue 0003a). Nil keeps the server standalone (the legacy + // single-node behavior). + Cluster *ClusterConfig } // Start is a thin backward-compatible wrapper: embedded JetStream server on the @@ -60,6 +111,7 @@ func StartServer(cfg ServerConfig) (*server.Server, error) { StoreDir: cfg.StoreDir, Host: cfg.Host, Port: cfg.Port, + ServerName: cfg.ServerName, DontListen: false, // Keep the embedded server quiet by default; the host app logs the URLs. NoLog: true, @@ -78,6 +130,12 @@ func StartServer(cfg ServerConfig) (*server.Server, error) { opts.TLS = true } + if cfg.Cluster != nil { + if err := applyClusterOpts(opts, cfg.Cluster); err != nil { + return nil, err + } + } + ns, err := server.NewServer(opts) if err != nil { return nil, fmt.Errorf("embeddednats: new server: %w", err) @@ -93,6 +151,34 @@ func StartServer(cfg ServerConfig) (*server.Server, error) { return ns, nil } +// applyClusterOpts translates a ClusterConfig into the nats-server route options +// on opts: the cluster listener (name + host/port + shared-secret auth + mutual +// TLS) and the outbound routes to the other nodes. A malformed route URL is a +// configuration error and aborts startup rather than silently dropping a peer. +func applyClusterOpts(opts *server.Options, c *ClusterConfig) error { + opts.Cluster = server.ClusterOpts{ + Name: c.Name, + Host: c.Host, + Port: c.Port, + Username: c.Username, + Password: c.Password, + } + if c.TLS != nil { + opts.Cluster.TLSConfig = c.TLS + // A generous handshake budget: route TLS does a mutual handshake and the + // peer may still be booting. The default 2s can flap on a cold cluster. + opts.Cluster.TLSTimeout = 5.0 + } + for _, r := range c.Routes { + u, err := url.Parse(r) + if err != nil { + return fmt.Errorf("embeddednats: parse route %q: %w", r, err) + } + opts.Routes = append(opts.Routes, u) + } + return nil +} + // ClientURL returns a NATS connection URL for the running embedded server. func ClientURL(ns *server.Server) string { return ns.ClientURL()