Merge issue/0003a-cluster: NATS cluster routes (auth + mutual TLS)
This commit is contained in:
@@ -3,10 +3,24 @@ package main
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/enmanuel/unibus/pkg/membership"
|
"github.com/enmanuel/unibus/pkg/membership"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// splitRoutes converts the raw --routes flag value into the list of peer route
// URLs. The value is split on commas, every piece is whitespace-trimmed, and
// pieces that end up empty are discarded, so "a,,b", " a , b " and "a," never
// produce a blank route. When nothing usable remains the result is a nil slice.
func splitRoutes(csv string) []string {
	var routes []string
	fields := strings.Split(csv, ",")
	for i := range fields {
		route := strings.TrimSpace(fields[i])
		if route == "" {
			continue // trailing comma, doubled comma, or whitespace-only entry
		}
		routes = append(routes, route)
	}
	return routes
}
|
||||||
|
|
||||||
// isLoopbackBind reports whether the --bind value keeps the service reachable
|
// isLoopbackBind reports whether the --bind value keeps the service reachable
|
||||||
// only from this host. An empty bind means "all interfaces" (public), and a
|
// only from this host. An empty bind means "all interfaces" (public), and a
|
||||||
// hostname we cannot resolve to a loopback literal is treated as public — the
|
// hostname we cannot resolve to a loopback literal is treated as public — the
|
||||||
@@ -48,3 +62,42 @@ func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey s
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validateClusterConfig guards the cluster route layer (issue 0003a). The route
|
||||||
|
// layer is a server-to-server trust boundary distinct from the client data
|
||||||
|
// plane: leaving it open lets anyone who reaches the route port join the cluster
|
||||||
|
// or inject messages into the whole bus (audit 0004, "auth of the cluster
|
||||||
|
// routes"). So on a public (non-loopback) bind, a cluster MUST carry both a
|
||||||
|
// shared route secret AND mutual route TLS. It is a pure function of the parsed
|
||||||
|
// flags. An empty clusterName means "no cluster" (standalone) and is always
|
||||||
|
// allowed.
|
||||||
|
//
|
||||||
|
// The three route-TLS paths are all-or-nothing (mutual TLS needs the node cert,
|
||||||
|
// its key, and the CA together), independent of the bind, so a partial TLS
|
||||||
|
// config never silently degrades to plaintext routes.
|
||||||
|
func validateClusterConfig(clusterName, bind, user, pass, rtCert, rtKey, rtCA string) error {
|
||||||
|
rtAny := rtCert != "" || rtKey != "" || rtCA != ""
|
||||||
|
rtAll := rtCert != "" && rtKey != "" && rtCA != ""
|
||||||
|
if rtAny && !rtAll {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"refusing to start: --route-tls-cert/--route-tls-key/--route-tls-ca must be set together (mutual route TLS needs all three)")
|
||||||
|
}
|
||||||
|
if clusterName == "" {
|
||||||
|
return nil // standalone: no route layer to secure
|
||||||
|
}
|
||||||
|
if isLoopbackBind(bind) {
|
||||||
|
return nil // loopback cluster is dev-only and unreachable from outside
|
||||||
|
}
|
||||||
|
// Public cluster: demand a route secret and mutual route TLS.
|
||||||
|
if user == "" || pass == "" {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"refusing to start: cluster %q on public bind %q requires --cluster-user and --cluster-pass; an unauthenticated route port lets anyone join the cluster",
|
||||||
|
clusterName, bind)
|
||||||
|
}
|
||||||
|
if !rtAll {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"refusing to start: cluster %q on public bind %q requires mutual route TLS (--route-tls-cert/--route-tls-key/--route-tls-ca); plaintext routes expose server-to-server traffic and admit unsigned nodes",
|
||||||
|
clusterName, bind)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -70,3 +70,63 @@ func TestBootConfigPolicy(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestClusterConfigPolicy is the cluster route guard (issue 0003a): a standalone
|
||||||
|
// server is always fine; a loopback cluster is dev-only and unguarded; a public
|
||||||
|
// cluster demands both a route secret and complete mutual route TLS; and the
|
||||||
|
// route-TLS flags are all-or-nothing regardless of bind.
|
||||||
|
func TestClusterConfigPolicy(t *testing.T) {
|
||||||
|
const c, k, ca = "node.crt", "node.key", "ca.crt"
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
clusterName, bind string
|
||||||
|
user, pass string
|
||||||
|
rtCert, rtKey, rtCA string
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// Standalone (no cluster name) is always allowed, even on a public bind.
|
||||||
|
{"standalone-public", "", "0.0.0.0", "", "", "", "", "", false},
|
||||||
|
// Loopback dev cluster: unguarded (unreachable from outside).
|
||||||
|
{"loopback-cluster-bare", "unibus", "127.0.0.1", "", "", "", "", "", false},
|
||||||
|
// Golden: full public HA config.
|
||||||
|
{"public-full", "unibus", "0.0.0.0", "u", "p", c, k, ca, false},
|
||||||
|
// Error: public cluster without a route secret.
|
||||||
|
{"public-no-secret", "unibus", "0.0.0.0", "", "", c, k, ca, true},
|
||||||
|
{"public-half-secret", "unibus", "0.0.0.0", "u", "", c, k, ca, true},
|
||||||
|
// Error: public cluster without mutual route TLS.
|
||||||
|
{"public-no-tls", "unibus", "10.0.0.1", "u", "p", "", "", "", true},
|
||||||
|
// Error: partial route-TLS flags trip regardless of bind.
|
||||||
|
{"loopback-partial-tls", "unibus", "127.0.0.1", "", "", c, "", "", true},
|
||||||
|
{"standalone-partial-tls", "", "127.0.0.1", "", "", c, k, "", true},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
err := validateClusterConfig(tc.clusterName, tc.bind, tc.user, tc.pass, tc.rtCert, tc.rtKey, tc.rtCA)
|
||||||
|
if tc.wantErr && err == nil {
|
||||||
|
t.Fatalf("cluster config %+v should be refused", tc)
|
||||||
|
}
|
||||||
|
if !tc.wantErr && err != nil {
|
||||||
|
t.Fatalf("cluster config %+v should be allowed, got: %v", tc, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSplitRoutes(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
in string
|
||||||
|
want int
|
||||||
|
}{
|
||||||
|
{"", 0},
|
||||||
|
{"nats://a:1", 1},
|
||||||
|
{"nats://a:1,nats://b:2", 2},
|
||||||
|
{" nats://a:1 , nats://b:2 ", 2}, // spaces trimmed
|
||||||
|
{"nats://a:1,,", 1}, // empty entries dropped
|
||||||
|
{",", 0},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
if got := splitRoutes(c.in); len(got) != c.want {
|
||||||
|
t.Fatalf("splitRoutes(%q) = %v (len %d), want len %d", c.in, got, len(got), c.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
+40
-3
@@ -45,6 +45,16 @@ func main() {
|
|||||||
busAuth = flag.String("bus-auth", "off", "control-plane auth rollout: off|soft|enforce (feature flag bus-auth)")
|
busAuth = flag.String("bus-auth", "off", "control-plane auth rollout: off|soft|enforce (feature flag bus-auth)")
|
||||||
tlsCert = flag.String("tls-cert", "", "PATH to the NATS server certificate (deploy/tls/server.crt); enables TLS on the embedded data plane")
|
tlsCert = flag.String("tls-cert", "", "PATH to the NATS server certificate (deploy/tls/server.crt); enables TLS on the embedded data plane")
|
||||||
tlsKey = flag.String("tls-key", "", "path to the NATS server private key (deploy/tls/server.key); required with --tls-cert")
|
tlsKey = flag.String("tls-key", "", "path to the NATS server private key (deploy/tls/server.key); required with --tls-cert")
|
||||||
|
// Cluster (issue 0003a): empty --cluster-name keeps the server standalone.
|
||||||
|
clusterName = flag.String("cluster-name", "", "NATS cluster name (identical on every node); empty = standalone, no HA")
|
||||||
|
serverName = flag.String("server-name", "", "unique node name within the cluster (required by JetStream RAFT when clustered)")
|
||||||
|
clusterPort = flag.Int("cluster-port", 6250, "route listener port for server-to-server cluster traffic")
|
||||||
|
routesCSV = flag.String("routes", "", "comma-separated nats-route URLs of the OTHER nodes, e.g. nats://user:pass@10.0.0.2:6250")
|
||||||
|
clusterUser = flag.String("cluster-user", "", "shared route secret username (gates the route listener)")
|
||||||
|
clusterPass = flag.String("cluster-pass", "", "shared route secret password")
|
||||||
|
routeTLSCert = flag.String("route-tls-cert", "", "this node's route certificate (CA-signed); enables mutual route TLS with --route-tls-key/--route-tls-ca")
|
||||||
|
routeTLSKey = flag.String("route-tls-key", "", "this node's route private key")
|
||||||
|
routeTLSCA = flag.String("route-tls-ca", "", "bus CA that signs every node's route certificate (deploy/tls/ca.crt)")
|
||||||
)
|
)
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
@@ -59,6 +69,11 @@ func main() {
|
|||||||
if err := validateBootConfig(*bind, authMode, *tlsCert, *tlsKey); err != nil {
|
if err := validateBootConfig(*bind, authMode, *tlsCert, *tlsKey); err != nil {
|
||||||
log.Fatalf("%v", err)
|
log.Fatalf("%v", err)
|
||||||
}
|
}
|
||||||
|
// Cluster route guard (issue 0003a): a public cluster needs a route secret
|
||||||
|
// and mutual route TLS, and the route-TLS flags are all-or-nothing.
|
||||||
|
if err := validateClusterConfig(*clusterName, *bind, *clusterUser, *clusterPass, *routeTLSCert, *routeTLSKey, *routeTLSCA); err != nil {
|
||||||
|
log.Fatalf("%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
||||||
log.SetPrefix("[membershipd] ")
|
log.SetPrefix("[membershipd] ")
|
||||||
@@ -89,9 +104,31 @@ func main() {
|
|||||||
// Bind the embedded NATS to the same interface as the HTTP API so a
|
// Bind the embedded NATS to the same interface as the HTTP API so a
|
||||||
// single --bind flag governs reachability: 127.0.0.1 keeps the whole
|
// single --bind flag governs reachability: 127.0.0.1 keeps the whole
|
||||||
// stack loopback-only; 0.0.0.0 exposes both planes to the LAN.
|
// stack loopback-only; 0.0.0.0 exposes both planes to the LAN.
|
||||||
StoreDir: *natsStore,
|
StoreDir: *natsStore,
|
||||||
Host: *bind,
|
Host: *bind,
|
||||||
Port: *natsPort,
|
Port: *natsPort,
|
||||||
|
ServerName: *serverName,
|
||||||
|
}
|
||||||
|
// Cluster (issue 0003a): with a cluster name, join the route layer for HA.
|
||||||
|
if *clusterName != "" {
|
||||||
|
cc := &embeddednats.ClusterConfig{
|
||||||
|
Name: *clusterName,
|
||||||
|
Host: *bind,
|
||||||
|
Port: *clusterPort,
|
||||||
|
Routes: splitRoutes(*routesCSV),
|
||||||
|
Username: *clusterUser,
|
||||||
|
Password: *clusterPass,
|
||||||
|
}
|
||||||
|
if *routeTLSCert != "" {
|
||||||
|
rtls, err := busauth.RouteTLSConfig(*routeTLSCert, *routeTLSKey, *routeTLSCA)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("load route TLS: %v", err)
|
||||||
|
}
|
||||||
|
cc.TLS = rtls
|
||||||
|
log.Printf("cluster route TLS: ON (mutual, CA %s)", *routeTLSCA)
|
||||||
|
}
|
||||||
|
cfg.Cluster = cc
|
||||||
|
log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes))
|
||||||
}
|
}
|
||||||
if authMode == membership.AuthEnforce {
|
if authMode == membership.AuthEnforce {
|
||||||
cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
|
cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
|
||||||
|
|||||||
@@ -35,3 +35,41 @@ func ServerTLSConfig(certPEMPath, keyPEMPath string) (*tls.Config, error) {
|
|||||||
}
|
}
|
||||||
return &tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12}, nil
|
return &tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RouteTLSConfig assembles the mutual-TLS configuration used by the NATS
// CLUSTER route layer (issue 0003a). Routes are server-to-server, so unlike the
// client data plane (server presents, client verifies) every node both presents
// its own certificate and verifies its peer's certificate against the bus CA.
// The returned config therefore sets:
//
//   - Certificates: the node key pair loaded from certPEMPath/keyPEMPath,
//     presented in both the server and the client role of a route handshake;
//   - RootCAs: the CA pool read from caPEMPath, used to verify nodes we dial;
//   - ClientCAs with ClientAuth=RequireAndVerifyClientCert: the same pool,
//     used to verify nodes that dial in;
//   - MinVersion: TLS 1.2.
//
// The effect is that a node without a certificate signed by the bus CA cannot
// establish a route in either direction, even if it knows the cluster password.
// The intent is to reuse the client data plane's CA (deploy/tls) with a
// per-node certificate whose SAN covers that node's route address.
//
// It returns an error when the key pair cannot be loaded, the CA file cannot
// be read, or the CA file contains no parsable PEM certificate.
func RouteTLSConfig(certPEMPath, keyPEMPath, caPEMPath string) (*tls.Config, error) {
	nodeCert, err := tls.LoadX509KeyPair(certPEMPath, keyPEMPath)
	if err != nil {
		return nil, fmt.Errorf("busauth: load route keypair: %w", err)
	}

	caPEM, err := os.ReadFile(caPEMPath)
	if err != nil {
		return nil, fmt.Errorf("busauth: read route CA %q: %w", caPEMPath, err)
	}
	busCAs := x509.NewCertPool()
	if ok := busCAs.AppendCertsFromPEM(caPEM); !ok {
		return nil, fmt.Errorf("busauth: route CA %q contains no valid PEM certificate", caPEMPath)
	}

	cfg := &tls.Config{
		Certificates: []tls.Certificate{nodeCert},
		RootCAs:      busCAs, // verifies peers we dial out to
		ClientCAs:    busCAs, // verifies peers dialing in
		ClientAuth:   tls.RequireAndVerifyClientCert,
		MinVersion:   tls.VersionTLS12,
	}
	return cfg, nil
}
|
||||||
|
|||||||
@@ -0,0 +1,344 @@
|
|||||||
|
package embeddednats_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"crypto/elliptic"
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/x509"
|
||||||
|
"crypto/x509/pkix"
|
||||||
|
"encoding/pem"
|
||||||
|
"fmt"
|
||||||
|
"math/big"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/busauth"
|
||||||
|
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
server "github.com/nats-io/nats-server/v2/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
// freePort asks the OS for an unused TCP port on 127.0.0.1 by listening on
// port 0, reads the assigned port, and releases the listener before returning.
// The test is failed immediately if the listen call errors.
func freePort(t *testing.T) int {
	t.Helper()
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("free port: %v", err)
	}
	defer listener.Close()
	addr := listener.Addr().(*net.TCPAddr)
	return addr.Port
}
|
||||||
|
|
||||||
|
// startNode boots a clustered embedded NATS node. peerRoutePorts are the route
|
||||||
|
// ports of the OTHER nodes; user/pass gate the route layer (empty disables it);
|
||||||
|
// routeTLS, when non-nil, secures the routes with mutual TLS.
|
||||||
|
func startNode(t *testing.T, name string, clientPort, routePort int, peerRoutePorts []int, user, pass string, routeTLS *clusterTLS) *server.Server {
|
||||||
|
t.Helper()
|
||||||
|
routes := make([]string, 0, len(peerRoutePorts))
|
||||||
|
for _, p := range peerRoutePorts {
|
||||||
|
// Carry the cluster credentials in the route URL so this node
|
||||||
|
// authenticates outbound to its peers' route listeners.
|
||||||
|
if user != "" {
|
||||||
|
routes = append(routes, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", user, pass, p))
|
||||||
|
} else {
|
||||||
|
routes = append(routes, fmt.Sprintf("nats://127.0.0.1:%d", p))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cc := &embeddednats.ClusterConfig{
|
||||||
|
Name: "unibus-test",
|
||||||
|
Host: "127.0.0.1",
|
||||||
|
Port: routePort,
|
||||||
|
Routes: routes,
|
||||||
|
Username: user,
|
||||||
|
Password: pass,
|
||||||
|
}
|
||||||
|
if routeTLS != nil {
|
||||||
|
cfg, err := busauth.RouteTLSConfig(routeTLS.cert, routeTLS.key, routeTLS.ca)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("route TLS for %s: %v", name, err)
|
||||||
|
}
|
||||||
|
cc.TLS = cfg
|
||||||
|
}
|
||||||
|
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||||
|
StoreDir: t.TempDir(),
|
||||||
|
Host: "127.0.0.1",
|
||||||
|
Port: clientPort,
|
||||||
|
ServerName: name,
|
||||||
|
Cluster: cc,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("start node %s: %v", name, err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||||
|
return ns
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitRoutes waits until ns has at least want established routes, or fails.
|
||||||
|
func waitRoutes(t *testing.T, ns *server.Server, want int) {
|
||||||
|
t.Helper()
|
||||||
|
deadline := time.Now().Add(8 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if ns.NumRoutes() >= want {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
t.Fatalf("node %q never reached %d routes (have %d)", ns.Name(), want, ns.NumRoutes())
|
||||||
|
}
|
||||||
|
|
||||||
|
// stableRouteCount waits for ns's route count to stop changing (the NATS route
|
||||||
|
// pool opens several connections per peer asynchronously) and returns it, so a
|
||||||
|
// test can use it as a baseline that an impostor must not increase.
|
||||||
|
func stableRouteCount(t *testing.T, ns *server.Server) int {
|
||||||
|
t.Helper()
|
||||||
|
prev := -1
|
||||||
|
stableSince := time.Now()
|
||||||
|
deadline := time.Now().Add(5 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
n := ns.NumRoutes()
|
||||||
|
if n != prev {
|
||||||
|
prev = n
|
||||||
|
stableSince = time.Now()
|
||||||
|
} else if time.Since(stableSince) >= 750*time.Millisecond {
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
return prev
|
||||||
|
}
|
||||||
|
|
||||||
|
// pubSubAcrossNodes connects a subscriber to subURL and a publisher to pubURL,
|
||||||
|
// publishes one message on subject, and reports whether it arrived within 3s.
|
||||||
|
// This proves the cluster forwards client subjects between nodes.
|
||||||
|
func pubSubAcrossNodes(t *testing.T, subURL, pubURL, subject, payload string) bool {
|
||||||
|
t.Helper()
|
||||||
|
subConn, err := nats.Connect(subURL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("subscriber connect %s: %v", subURL, err)
|
||||||
|
}
|
||||||
|
defer subConn.Close()
|
||||||
|
got := make(chan string, 1)
|
||||||
|
if _, err := subConn.Subscribe(subject, func(m *nats.Msg) {
|
||||||
|
select {
|
||||||
|
case got <- string(m.Data):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("subscribe: %v", err)
|
||||||
|
}
|
||||||
|
if err := subConn.Flush(); err != nil {
|
||||||
|
t.Fatalf("flush sub: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pubConn, err := nats.Connect(pubURL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("publisher connect %s: %v", pubURL, err)
|
||||||
|
}
|
||||||
|
defer pubConn.Close()
|
||||||
|
// Retry the publish for a moment: route interest propagation across the
|
||||||
|
// cluster is asynchronous, so the very first publish can race the gossip.
|
||||||
|
deadline := time.Now().Add(3 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if err := pubConn.Publish(subject, []byte(payload)); err != nil {
|
||||||
|
t.Fatalf("publish: %v", err)
|
||||||
|
}
|
||||||
|
_ = pubConn.Flush()
|
||||||
|
select {
|
||||||
|
case v := <-got:
|
||||||
|
return v == payload
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- golden: two-node cluster forwards client subjects across nodes ----------
|
||||||
|
|
||||||
|
func TestClusterForwardsAcrossNodes(t *testing.T) {
|
||||||
|
rp0, rp1 := freePort(t), freePort(t)
|
||||||
|
n0 := startNode(t, "n0", freePort(t), rp0, []int{rp1}, "clusteruser", "clusterpass", nil)
|
||||||
|
n1 := startNode(t, "n1", freePort(t), rp1, []int{rp0}, "clusteruser", "clusterpass", nil)
|
||||||
|
|
||||||
|
waitRoutes(t, n0, 1)
|
||||||
|
waitRoutes(t, n1, 1)
|
||||||
|
|
||||||
|
if !pubSubAcrossNodes(t, n0.ClientURL(), n1.ClientURL(), "test.cross", "hello-cluster") {
|
||||||
|
t.Fatalf("subject published on n1 did not reach subscriber on n0")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- edge: three-node cluster (HA shape) forwards between non-adjacent nodes --
|
||||||
|
|
||||||
|
func TestClusterThreeNodesForward(t *testing.T) {
|
||||||
|
rp0, rp1, rp2 := freePort(t), freePort(t), freePort(t)
|
||||||
|
n0 := startNode(t, "n0", freePort(t), rp0, []int{rp1, rp2}, "u", "p", nil)
|
||||||
|
n1 := startNode(t, "n1", freePort(t), rp1, []int{rp0, rp2}, "u", "p", nil)
|
||||||
|
n2 := startNode(t, "n2", freePort(t), rp2, []int{rp0, rp1}, "u", "p", nil)
|
||||||
|
|
||||||
|
waitRoutes(t, n0, 2)
|
||||||
|
waitRoutes(t, n1, 2)
|
||||||
|
waitRoutes(t, n2, 2)
|
||||||
|
|
||||||
|
// Publish on n2, subscribe on n0: a message must traverse the cluster.
|
||||||
|
if !pubSubAcrossNodes(t, n0.ClientURL(), n2.ClientURL(), "test.ha", "three-node") {
|
||||||
|
t.Fatalf("subject published on n2 did not reach subscriber on n0")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- error: a node with the wrong cluster password is rejected as a route -----
|
||||||
|
|
||||||
|
func TestClusterRejectsBadRouteAuth(t *testing.T) {
|
||||||
|
rp0, rp1 := freePort(t), freePort(t)
|
||||||
|
good := startNode(t, "good", freePort(t), rp0, []int{rp1}, "secret", "right", nil)
|
||||||
|
_ = startNode(t, "peer", freePort(t), rp1, []int{rp0}, "secret", "right", nil)
|
||||||
|
waitRoutes(t, good, 1)
|
||||||
|
// Let the route pool settle so the baseline count is stable (NATS opens a
|
||||||
|
// pool of route connections per peer, so NumRoutes counts connections, not
|
||||||
|
// distinct peers).
|
||||||
|
base := stableRouteCount(t, good)
|
||||||
|
|
||||||
|
// Impostor knows the addresses but not the cluster password. It tries to
|
||||||
|
// route to `good`; the route handshake must be rejected, so the impostor
|
||||||
|
// never establishes a route.
|
||||||
|
impostor := startNode(t, "impostor", freePort(t), freePort(t), []int{rp0}, "secret", "WRONG", nil)
|
||||||
|
|
||||||
|
// Give the route layer ample time to (fail to) connect, then assert it never
|
||||||
|
// formed: the impostor has zero routes, and `good`'s route count is unchanged
|
||||||
|
// (it did not accept a route from the impostor).
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
if n := impostor.NumRoutes(); n != 0 {
|
||||||
|
t.Fatalf("impostor with wrong cluster password formed %d routes, want 0", n)
|
||||||
|
}
|
||||||
|
if n := good.NumRoutes(); n != base {
|
||||||
|
t.Fatalf("legit node route count changed from %d to %d after impostor attempt (it accepted the impostor)", base, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- golden (TLS): mutual-TLS routes forward across nodes ---------------------
|
||||||
|
|
||||||
|
func TestClusterMutualTLSForwards(t *testing.T) {
|
||||||
|
ca, caKey := genCA(t)
|
||||||
|
dir := t.TempDir()
|
||||||
|
tlsA := writeNodeCert(t, dir, "a", ca, caKey)
|
||||||
|
tlsB := writeNodeCert(t, dir, "b", ca, caKey)
|
||||||
|
|
||||||
|
rp0, rp1 := freePort(t), freePort(t)
|
||||||
|
n0 := startNode(t, "n0", freePort(t), rp0, []int{rp1}, "u", "p", tlsA)
|
||||||
|
n1 := startNode(t, "n1", freePort(t), rp1, []int{rp0}, "u", "p", tlsB)
|
||||||
|
|
||||||
|
waitRoutes(t, n0, 1)
|
||||||
|
waitRoutes(t, n1, 1)
|
||||||
|
|
||||||
|
if !pubSubAcrossNodes(t, n0.ClientURL(), n1.ClientURL(), "test.tls", "mtls-ok") {
|
||||||
|
t.Fatalf("subject did not cross the mutual-TLS cluster")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- error (TLS): a node whose cert is not signed by the bus CA cannot join ---
|
||||||
|
|
||||||
|
func TestClusterRejectsUnsignedNode(t *testing.T) {
|
||||||
|
ca, caKey := genCA(t)
|
||||||
|
dir := t.TempDir()
|
||||||
|
tlsGood := writeNodeCert(t, dir, "good", ca, caKey)
|
||||||
|
tlsPeer := writeNodeCert(t, dir, "peer", ca, caKey)
|
||||||
|
|
||||||
|
// The impostor signs its node cert with a DIFFERENT CA, and pins only that
|
||||||
|
// CA. The legit nodes' RequireAndVerifyClientCert against the bus CA rejects
|
||||||
|
// it; the impostor likewise rejects the legit node's cert. No route forms.
|
||||||
|
otherCA, otherKey := genCA(t)
|
||||||
|
tlsImpostor := writeNodeCert(t, dir, "impostor", otherCA, otherKey)
|
||||||
|
|
||||||
|
rp0, rp1 := freePort(t), freePort(t)
|
||||||
|
good := startNode(t, "good", freePort(t), rp0, []int{rp1}, "u", "p", tlsGood)
|
||||||
|
_ = startNode(t, "peer", freePort(t), rp1, []int{rp0}, "u", "p", tlsPeer)
|
||||||
|
waitRoutes(t, good, 1)
|
||||||
|
base := stableRouteCount(t, good)
|
||||||
|
|
||||||
|
impostor := startNode(t, "impostor", freePort(t), freePort(t), []int{rp0}, "u", "p", tlsImpostor)
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
if n := impostor.NumRoutes(); n != 0 {
|
||||||
|
t.Fatalf("impostor with unsigned cert formed %d routes, want 0", n)
|
||||||
|
}
|
||||||
|
if n := good.NumRoutes(); n != base {
|
||||||
|
t.Fatalf("legit node route count changed from %d to %d after unsigned impostor attempt (it accepted the impostor)", base, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- cert helpers ------------------------------------------------------------
|
||||||
|
|
||||||
|
// clusterTLS bundles the PEM file paths that busauth.RouteTLSConfig needs for
// one node.
type clusterTLS struct {
	cert string // path to the node certificate PEM
	key  string // path to the node private key PEM
	ca   string // path to the CA certificate PEM
}
|
||||||
|
|
||||||
|
// genCA generates a fresh P-256 ECDSA key and a self-signed CA certificate for
// it (CommonName "unibus-test-CA", serial 1, valid from one hour ago until 24
// hours from now, usable for certificate signing). It returns the parsed
// certificate together with its private key, and fails the test if key
// generation, certificate creation, or parsing fails.
func genCA(t *testing.T) (*x509.Certificate, *ecdsa.PrivateKey) {
	t.Helper()

	caKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		t.Fatalf("gen CA key: %v", err)
	}

	template := x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "unibus-test-CA"},
		NotBefore:             time.Now().Add(-time.Hour),
		NotAfter:              time.Now().Add(24 * time.Hour),
		KeyUsage:              x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature,
		BasicConstraintsValid: true,
		IsCA:                  true,
	}
	// Self-signed: the template is both subject and issuer.
	rawCert, err := x509.CreateCertificate(rand.Reader, &template, &template, &caKey.PublicKey, caKey)
	if err != nil {
		t.Fatalf("create CA cert: %v", err)
	}

	parsed, err := x509.ParseCertificate(rawCert)
	if err != nil {
		t.Fatalf("parse CA cert: %v", err)
	}
	return parsed, caKey
}
|
||||||
|
|
||||||
|
// writeNodeCert issues a node certificate signed by ca (SAN 127.0.0.1/::1,
|
||||||
|
// usable as both server and client) and writes cert/key/ca PEM files, returning
|
||||||
|
// their paths for RouteTLSConfig.
|
||||||
|
func writeNodeCert(t *testing.T, dir, name string, ca *x509.Certificate, caKey *ecdsa.PrivateKey) *clusterTLS {
|
||||||
|
t.Helper()
|
||||||
|
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("gen node key: %v", err)
|
||||||
|
}
|
||||||
|
tmpl := &x509.Certificate{
|
||||||
|
SerialNumber: big.NewInt(time.Now().UnixNano()),
|
||||||
|
Subject: pkix.Name{CommonName: name},
|
||||||
|
NotBefore: time.Now().Add(-time.Hour),
|
||||||
|
NotAfter: time.Now().Add(24 * time.Hour),
|
||||||
|
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
|
||||||
|
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
|
||||||
|
IPAddresses: []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")},
|
||||||
|
DNSNames: []string{"localhost"},
|
||||||
|
}
|
||||||
|
der, err := x509.CreateCertificate(rand.Reader, tmpl, ca, &key.PublicKey, caKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create node cert: %v", err)
|
||||||
|
}
|
||||||
|
certPath := filepath.Join(dir, name+".crt")
|
||||||
|
keyPath := filepath.Join(dir, name+".key")
|
||||||
|
caPath := filepath.Join(dir, name+"-ca.crt")
|
||||||
|
|
||||||
|
writePEM(t, certPath, "CERTIFICATE", der)
|
||||||
|
keyDER, err := x509.MarshalECPrivateKey(key)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("marshal node key: %v", err)
|
||||||
|
}
|
||||||
|
writePEM(t, keyPath, "EC PRIVATE KEY", keyDER)
|
||||||
|
writePEM(t, caPath, "CERTIFICATE", ca.Raw)
|
||||||
|
return &clusterTLS{cert: certPath, key: keyPath, ca: caPath}
|
||||||
|
}
|
||||||
|
|
||||||
|
// writePEM PEM-encodes der under the given block type (for example
// "CERTIFICATE") and writes the result to path with mode 0600. A write failure
// fails the test.
func writePEM(t *testing.T, path, blockType string, der []byte) {
	t.Helper()
	block := pem.Block{Type: blockType, Bytes: der}
	encoded := pem.EncodeToMemory(&block)
	err := os.WriteFile(path, encoded, 0o600)
	if err != nil {
		t.Fatalf("write %s: %v", path, err)
	}
}
|
||||||
@@ -8,25 +8,76 @@ package embeddednats
|
|||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
server "github.com/nats-io/nats-server/v2/server"
|
server "github.com/nats-io/nats-server/v2/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ClusterConfig describes the route layer that joins several embedded NATS
// servers into one cluster (issue 0003a). This is the data-plane half of high
// availability: in a cluster, a subject published by a client on one node is
// forwarded to subscribers on any other node, and (with JetStream replicas > 1)
// streams/KV are RAFT-replicated so losing one node does not lose the bus.
//
// The route layer is a trust boundary SEPARATE from the client data plane. It
// carries server-to-server traffic and therefore authenticates NODES, not bus
// users; never reuse the nkey client authenticator here. Routes get their own
// shared secret (Username/Password, mapped to NATS Cluster.Authorization) and
// their own mutual TLS (the TLS field, built from the bus CA with
// busauth.RouteTLSConfig). A node lacking the cluster secret and a CA-signed
// node certificate can neither join the cluster nor inject messages into it.
type ClusterConfig struct {
	// Name is the cluster name. It MUST be identical on every node; servers
	// with different names refuse to gossip routes to each other.
	Name string

	// Host and Port identify the route (server-to-server) listener, which is
	// distinct from the client Host/Port. Pick a free port that is not the
	// client port (e.g. 6250).
	Host string
	Port int

	// Routes lists the nats-route URLs of the OTHER nodes, for example
	// "nats://user:pass@10.0.0.2:6250". When the route layer is password
	// protected, each URL must carry the same userinfo as the local
	// Username/Password so this node can authenticate outbound to its peers.
	Routes []string

	// Username and Password gate the route listener (NATS
	// Cluster.Authorization). A peer, or an impostor, that connects to this
	// node's route port without these credentials is rejected and never
	// becomes a route. Leaving them empty disables route auth (dev /
	// trusted-network only).
	Username, Password string

	// TLS, when non-nil, secures route connections with mutual TLS. Build it
	// with busauth.RouteTLSConfig(cert, key, ca): the server presents its node
	// certificate AND requires and verifies the connecting node's certificate
	// against the bus CA, so an unsigned impostor cannot establish a route even
	// with the right password. Nil keeps routes plaintext (dev /
	// WireGuard-only).
	TLS *tls.Config
}
|
||||||
|
|
||||||
// ServerConfig is the full set of knobs for the embedded NATS server. The zero
|
// ServerConfig is the full set of knobs for the embedded NATS server. The zero
|
||||||
// value (empty StoreDir aside) yields a dev-friendly server: JetStream on, bound
|
// value (empty StoreDir aside) yields a dev-friendly server: JetStream on, bound
|
||||||
// to all interfaces, no client auth, no TLS. Secured deployments set Auth and
|
// to all interfaces, no client auth, no TLS, standalone (no cluster). Secured
|
||||||
// TLS; tests set Host to loopback and a free Port.
|
// deployments set Auth and TLS; HA deployments set ServerName + Cluster; tests
|
||||||
|
// set Host to loopback and a free Port.
|
||||||
type ServerConfig struct {
|
type ServerConfig struct {
|
||||||
StoreDir string // JetStream store directory
|
StoreDir string // JetStream store directory
|
||||||
Host string // bind interface; "" = nats-server default ("0.0.0.0")
|
Host string // bind interface; "" = nats-server default ("0.0.0.0")
|
||||||
Port int // listen port
|
Port int // listen port
|
||||||
|
// ServerName is this node's unique name within the cluster. JetStream's RAFT
|
||||||
|
// layer requires a stable, unique name per node to form its meta-group; leave
|
||||||
|
// it empty for a standalone server (nats-server then auto-generates one).
|
||||||
|
ServerName string
|
||||||
// Auth, when non-nil, is installed as CustomClientAuthentication so the data
|
// Auth, when non-nil, is installed as CustomClientAuthentication so the data
|
||||||
// plane only accepts approved clients (nkey signature + bus allowlist).
|
// plane only accepts approved clients (nkey signature + bus allowlist).
|
||||||
Auth server.Authentication
|
Auth server.Authentication
|
||||||
// TLS, when non-nil, makes the server present a certificate and require TLS
|
// TLS, when non-nil, makes the server present a certificate and require TLS
|
||||||
// on the data plane. Clients must trust the issuing CA (see busauth).
|
// on the data plane. Clients must trust the issuing CA (see busauth).
|
||||||
TLS *tls.Config
|
TLS *tls.Config
|
||||||
|
// Cluster, when non-nil, joins this server to a route cluster for high
|
||||||
|
// availability (issue 0003a). Nil keeps the server standalone (the legacy
|
||||||
|
// single-node behavior).
|
||||||
|
Cluster *ClusterConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start is a thin backward-compatible wrapper: embedded JetStream server on the
|
// Start is a thin backward-compatible wrapper: embedded JetStream server on the
|
||||||
@@ -60,6 +111,7 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
|
|||||||
StoreDir: cfg.StoreDir,
|
StoreDir: cfg.StoreDir,
|
||||||
Host: cfg.Host,
|
Host: cfg.Host,
|
||||||
Port: cfg.Port,
|
Port: cfg.Port,
|
||||||
|
ServerName: cfg.ServerName,
|
||||||
DontListen: false,
|
DontListen: false,
|
||||||
// Keep the embedded server quiet by default; the host app logs the URLs.
|
// Keep the embedded server quiet by default; the host app logs the URLs.
|
||||||
NoLog: true,
|
NoLog: true,
|
||||||
@@ -78,6 +130,12 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
|
|||||||
opts.TLS = true
|
opts.TLS = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.Cluster != nil {
|
||||||
|
if err := applyClusterOpts(opts, cfg.Cluster); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ns, err := server.NewServer(opts)
|
ns, err := server.NewServer(opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("embeddednats: new server: %w", err)
|
return nil, fmt.Errorf("embeddednats: new server: %w", err)
|
||||||
@@ -93,6 +151,34 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
|
|||||||
return ns, nil
|
return ns, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// applyClusterOpts translates a ClusterConfig into the nats-server route options
|
||||||
|
// on opts: the cluster listener (name + host/port + shared-secret auth + mutual
|
||||||
|
// TLS) and the outbound routes to the other nodes. A malformed route URL is a
|
||||||
|
// configuration error and aborts startup rather than silently dropping a peer.
|
||||||
|
func applyClusterOpts(opts *server.Options, c *ClusterConfig) error {
|
||||||
|
opts.Cluster = server.ClusterOpts{
|
||||||
|
Name: c.Name,
|
||||||
|
Host: c.Host,
|
||||||
|
Port: c.Port,
|
||||||
|
Username: c.Username,
|
||||||
|
Password: c.Password,
|
||||||
|
}
|
||||||
|
if c.TLS != nil {
|
||||||
|
opts.Cluster.TLSConfig = c.TLS
|
||||||
|
// A generous handshake budget: route TLS does a mutual handshake and the
|
||||||
|
// peer may still be booting. The default 2s can flap on a cold cluster.
|
||||||
|
opts.Cluster.TLSTimeout = 5.0
|
||||||
|
}
|
||||||
|
for _, r := range c.Routes {
|
||||||
|
u, err := url.Parse(r)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("embeddednats: parse route %q: %w", r, err)
|
||||||
|
}
|
||||||
|
opts.Routes = append(opts.Routes, u)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// ClientURL returns a NATS connection URL for the running embedded server.
|
// ClientURL returns a NATS connection URL for the running embedded server.
|
||||||
func ClientURL(ns *server.Server) string {
|
func ClientURL(ns *server.Server) string {
|
||||||
return ns.ClientURL()
|
return ns.ClientURL()
|
||||||
|
|||||||
Reference in New Issue
Block a user