diff --git a/deploy/cluster/nodes.env b/deploy/cluster/nodes.env index 4cdb8e07..3a0886d0 100644 --- a/deploy/cluster/nodes.env +++ b/deploy/cluster/nodes.env @@ -2,10 +2,10 @@ # # This file is SOURCED by generate-cluster-certs.sh and deploy-cluster.sh. # -# HUMAN: fill in every with the real value before running the +# HUMAN: fill in every placeholder with the real value before running the # scripts. The public IPs known at authoring time are pre-filled; the WireGuard # mesh IPs and magnus's public IP must be supplied. The scripts refuse to run -# while any remains. +# while any unfilled placeholder remains. # Cluster identity (must be identical on every node). CLUSTER_NAME="unibus" @@ -16,7 +16,7 @@ CLUSTER_USER="unibus-cluster" # KV/nonce replication factor. START AT 1 for the initial 1->3 rollout, then raise # to 3 IN PLACE (see README "Scale to R3") once all three nodes have joined. Only # set this to 3 here after the third node is up and you re-run the KV update. -KV_REPLICAS=1 +KV_REPLICAS=3 # Ports (same on every node; the route port is server-to-server only). NATS_CLIENT_PORT=4250 @@ -30,15 +30,28 @@ SSH_USER="root" # Which address family the inter-node routes use. "wg" builds --routes from the # WireGuard mesh IPs (private server-to-server links, preferred); "public" uses # the public IPs. The route layer is always mutual-TLS regardless. -ROUTE_NETWORK="wg" +# +# DEPLOY DECISION (2026-06-07): set to "public". No WireGuard mesh exists between +# the three cluster nodes — homer and datardos do not even have the `wg` binary +# installed, and om's only WG peers are the operator's personal PCs, not the VPS. +# Rather than stand up a fresh mesh blindly, the routes go over the public IPs, +# still protected by the separate cluster route CA (mutual-TLS). On magnus (the +# only node with ufw active) the route port 6250 is restricted to the homer and +# datardos public IPs; homer/datardos run ufw inactive (Docker hosts) and rely on +# the route mutual-TLS for 6250. +ROUTE_NETWORK="public" # One row per node: NAME SSH_HOST PUBLIC_IP WG_IP # NAME -> --server-name and the per-node cert filenames (unique). -# SSH_HOST -> the `ssh ` alias (see ~/.ssh/config). +# SSH_HOST -> the `ssh ALIAS` alias (see ~/.ssh/config). # PUBLIC_IP -> public address; goes in the cert SANs (client-facing data plane). # WG_IP -> WireGuard mesh address; cert SAN + route target when ROUTE_NETWORK=wg. +# NOTE: with ROUTE_NETWORK=public and no WireGuard mesh, the WG_IP column is set to +# each node's public IP so the cert SAN covers the address actually used by the +# public routes and no unfilled placeholder remains (scripts refuse to run otherwise). +# magnus == organic-machine.com == om (135.125.201.30); SSH alias `magnus` enters as root. CLUSTER_NODES=( - "magnus magnus " - "homer homer 141.94.69.66 " - "datardos dd 51.91.100.142 " + "magnus magnus 135.125.201.30 135.125.201.30" + "homer homer 141.94.69.66 141.94.69.66" + "datardos dd 51.91.100.142 51.91.100.142" ) diff --git a/pkg/embeddednats/embeddednats.go b/pkg/embeddednats/embeddednats.go index 1c96e9f1..26cdd10f 100644 --- a/pkg/embeddednats/embeddednats.go +++ b/pkg/embeddednats/embeddednats.go @@ -9,6 +9,7 @@ import ( "crypto/tls" "fmt" "net/url" + "os" "time" server "github.com/nats-io/nats-server/v2/server" @@ -106,6 +107,13 @@ func StartHostAuth(storeDir, host string, port int, auth server.Authentication) // blocks until the server is ready to accept connections (up to 5s) and returns // the running server; the caller must Shutdown it. func StartServer(cfg ServerConfig) (*server.Server, error) { + // Diagnostic toggle: UNIBUS_NATS_DEBUG=1 enables the embedded nats-server's own + // logger (route/RAFT/JetStream errors), which is otherwise silenced. Off by + // default so production behavior is unchanged; only set it when debugging the + // cluster route layer. + debugLevel := os.Getenv("UNIBUS_NATS_DEBUG") + debugNATS := debugLevel == "1" || debugLevel == "2" + traceNATS := debugLevel == "2" opts := &server.Options{ JetStream: true, StoreDir: cfg.StoreDir, @@ -114,8 +122,17 @@ func StartServer(cfg ServerConfig) (*server.Server, error) { ServerName: cfg.ServerName, DontListen: false, // Keep the embedded server quiet by default; the host app logs the URLs. - NoLog: true, - NoSigs: true, + NoLog: !debugNATS, + Debug: debugNATS, + Trace: traceNATS, + Logtime: true, + NoSigs: true, + } + if debugNATS { + // Expose the nats-server monitoring endpoint (loopback) so the operator can + // inspect /jsz, /routez, /varz while debugging the cluster meta-group. + opts.HTTPHost = "127.0.0.1" + opts.HTTPPort = 8222 } if cfg.Auth != nil { opts.CustomClientAuthentication = cfg.Auth @@ -141,6 +158,10 @@ func StartServer(cfg ServerConfig) (*server.Server, error) { return nil, fmt.Errorf("embeddednats: new server: %w", err) } + if debugNATS { + ns.ConfigureLogger() + } + go ns.Start() if !ns.ReadyForConnections(5 * time.Second) { @@ -162,6 +183,21 @@ func applyClusterOpts(opts *server.Options, c *ClusterConfig) error { Port: c.Port, Username: c.Username, Password: c.Password, + // Disable route connection pooling (nats-server 2.10+ defaults to a pool of + // 3 connections per peer). On a small cluster the pool churns with + // "duplicate route"/"client closed" reconnects that interrupt the meta-group + // RAFT heartbeats, causing perpetual leader re-elections so the JetStream + // meta never becomes current and stream/KV creation hangs (issue 0006g). + // PoolSize=-1 forces the classic single route per peer, which is stable for + // the 3-node unibus cluster. + PoolSize: -1, + // NoAdvertise stops the server from gossiping its locally-discovered IPs to + // peers. The cluster nodes are Docker hosts, so without this NATS advertises + // the docker bridge addresses (172.x / 10.0.x) as reachable routes; peers + // then try to dial those private, mutually-unreachable IPs, churning the + // route layer and destabilizing the JetStream meta-group. With NoAdvertise + // the nodes use ONLY the explicit public-IP routes we configure (issue 0006g). + NoAdvertise: true, } if c.TLS != nil { opts.Cluster.TLSConfig = c.TLS diff --git a/pkg/membership/jetstream_store.go b/pkg/membership/jetstream_store.go index 74f561ef..264f1294 100644 --- a/pkg/membership/jetstream_store.go +++ b/pkg/membership/jetstream_store.go @@ -85,8 +85,18 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) { if opTimeout <= 0 { opTimeout = defaultKVOpTime } - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() + // Bootstrap budget for creating/opening the buckets. On a single node JetStream + // is ready the instant the server starts, so the first attempt succeeds. On a + // COLD multi-node cluster the JetStream meta-group must first elect a leader and + // each node must establish contact with it before its $JS.API responds. A KV + // op is a NATS request/reply: if it is published before the node's JetStream is + // ready the request is dropped (not queued), and a single long-context call then + // just blocks until it times out (issue 0006g). So we RETRY each bucket op with + // short per-attempt contexts until it succeeds or the overall bootstrap budget + // is exhausted; once the cluster is ready the next retry lands and the buckets + // are created, after which they persist and every node opens them quickly. + bootstrapBudget := 120 * time.Second + deadline := time.Now().Add(bootstrapBudget) s := &jetstreamStore{opTimeout: opTimeout} for _, b := range []struct { @@ -99,14 +109,27 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) { {bucketRoomKeys, &s.keys}, {bucketUsers, &s.users}, } { - kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{ - Bucket: b.name, - Replicas: cfg.Replicas, - History: 1, - Storage: jetstream.FileStorage, - }) - if err != nil { - return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err) + var kv jetstream.KeyValue + var lastErr error + for { + opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{ + Bucket: b.name, + Replicas: cfg.Replicas, + History: 1, + Storage: jetstream.FileStorage, + }) + cancel() + if lastErr == nil { + break + } + if time.Now().After(deadline) { + return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr) + } + // JetStream not ready yet (no meta leader / request dropped). Wait and + // re-publish the op; in a cluster cold start this lands once the meta + // group settles. + time.Sleep(1 * time.Second) } *b.dst = kv }