Merge quick/cluster-coldstart-fixes: 3-node cluster cold-start fixes + real topology
This commit is contained in:
@@ -2,10 +2,10 @@
|
||||
#
|
||||
# This file is SOURCED by generate-cluster-certs.sh and deploy-cluster.sh.
|
||||
#
|
||||
# HUMAN: fill in every <PLACEHOLDER> with the real value before running the
|
||||
# HUMAN: fill in every placeholder with the real value before running the
|
||||
# scripts. The public IPs known at authoring time are pre-filled; the WireGuard
|
||||
# mesh IPs and magnus's public IP must be supplied. The scripts refuse to run
|
||||
# while any <PLACEHOLDER> remains.
|
||||
# while any unfilled placeholder remains.
|
||||
|
||||
# Cluster identity (must be identical on every node).
|
||||
CLUSTER_NAME="unibus"
|
||||
@@ -16,7 +16,7 @@ CLUSTER_USER="unibus-cluster"
|
||||
# KV/nonce replication factor. START AT 1 for the initial 1->3 rollout, then raise
|
||||
# to 3 IN PLACE (see README "Scale to R3") once all three nodes have joined. Only
|
||||
# set this to 3 here after the third node is up and you re-run the KV update.
|
||||
KV_REPLICAS=1
|
||||
KV_REPLICAS=3
|
||||
|
||||
# Ports (same on every node; the route port is server-to-server only).
|
||||
NATS_CLIENT_PORT=4250
|
||||
@@ -30,15 +30,28 @@ SSH_USER="root"
|
||||
# Which address family the inter-node routes use. "wg" builds --routes from the
|
||||
# WireGuard mesh IPs (private server-to-server links, preferred); "public" uses
|
||||
# the public IPs. The route layer is always mutual-TLS regardless.
|
||||
ROUTE_NETWORK="wg"
|
||||
#
|
||||
# DEPLOY DECISION (2026-06-07): set to "public". No WireGuard mesh exists between
|
||||
# the three cluster nodes — homer and datardos do not even have the `wg` binary
|
||||
# installed, and om's only WG peers are the operator's personal PCs, not the VPS.
|
||||
# Rather than stand up a fresh mesh blindly, the routes go over the public IPs,
|
||||
# still protected by the separate cluster route CA (mutual-TLS). On magnus (the
|
||||
# only node with ufw active) the route port 6250 is restricted to the homer and
|
||||
# datardos public IPs; homer/datardos run ufw inactive (Docker hosts) and rely on
|
||||
# the route mutual-TLS for 6250.
|
||||
ROUTE_NETWORK="public"
|
||||
|
||||
# One row per node: NAME SSH_HOST PUBLIC_IP WG_IP
|
||||
# NAME -> --server-name and the per-node cert filenames (unique).
|
||||
# SSH_HOST -> the `ssh <SSH_HOST>` alias (see ~/.ssh/config).
|
||||
# SSH_HOST -> the `ssh ALIAS` alias (see ~/.ssh/config).
|
||||
# PUBLIC_IP -> public address; goes in the cert SANs (client-facing data plane).
|
||||
# WG_IP -> WireGuard mesh address; cert SAN + route target when ROUTE_NETWORK=wg.
|
||||
# NOTE: with ROUTE_NETWORK=public and no WireGuard mesh, the WG_IP column is set to
|
||||
# each node's public IP so the cert SAN covers the address actually used by the
|
||||
# public routes and no unfilled placeholder remains (scripts refuse to run otherwise).
|
||||
# magnus == organic-machine.com == om (135.125.201.30); SSH alias `magnus` enters as root.
|
||||
CLUSTER_NODES=(
|
||||
"magnus magnus <MAGNUS_PUBLIC_IP> <MAGNUS_WG_IP>"
|
||||
"homer homer 141.94.69.66 <HOMER_WG_IP>"
|
||||
"datardos dd 51.91.100.142 <DATARDOS_WG_IP>"
|
||||
"magnus magnus 135.125.201.30 135.125.201.30"
|
||||
"homer homer 141.94.69.66 141.94.69.66"
|
||||
"datardos dd 51.91.100.142 51.91.100.142"
|
||||
)
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
@@ -106,6 +107,13 @@ func StartHostAuth(storeDir, host string, port int, auth server.Authentication)
|
||||
// blocks until the server is ready to accept connections (up to 5s) and returns
|
||||
// the running server; the caller must Shutdown it.
|
||||
func StartServer(cfg ServerConfig) (*server.Server, error) {
|
||||
// Diagnostic toggle: UNIBUS_NATS_DEBUG=1 enables the embedded nats-server's own
|
||||
// logger (route/RAFT/JetStream errors), which is otherwise silenced. Off by
|
||||
// default so production behavior is unchanged; only set it when debugging the
|
||||
// cluster route layer.
|
||||
debugLevel := os.Getenv("UNIBUS_NATS_DEBUG")
|
||||
debugNATS := debugLevel == "1" || debugLevel == "2"
|
||||
traceNATS := debugLevel == "2"
|
||||
opts := &server.Options{
|
||||
JetStream: true,
|
||||
StoreDir: cfg.StoreDir,
|
||||
@@ -114,8 +122,17 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
|
||||
ServerName: cfg.ServerName,
|
||||
DontListen: false,
|
||||
// Keep the embedded server quiet by default; the host app logs the URLs.
|
||||
NoLog: true,
|
||||
NoSigs: true,
|
||||
NoLog: !debugNATS,
|
||||
Debug: debugNATS,
|
||||
Trace: traceNATS,
|
||||
Logtime: true,
|
||||
NoSigs: true,
|
||||
}
|
||||
if debugNATS {
|
||||
// Expose the nats-server monitoring endpoint (loopback) so the operator can
|
||||
// inspect /jsz, /routez, /varz while debugging the cluster meta-group.
|
||||
opts.HTTPHost = "127.0.0.1"
|
||||
opts.HTTPPort = 8222
|
||||
}
|
||||
if cfg.Auth != nil {
|
||||
opts.CustomClientAuthentication = cfg.Auth
|
||||
@@ -141,6 +158,10 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
|
||||
return nil, fmt.Errorf("embeddednats: new server: %w", err)
|
||||
}
|
||||
|
||||
if debugNATS {
|
||||
ns.ConfigureLogger()
|
||||
}
|
||||
|
||||
go ns.Start()
|
||||
|
||||
if !ns.ReadyForConnections(5 * time.Second) {
|
||||
@@ -162,6 +183,21 @@ func applyClusterOpts(opts *server.Options, c *ClusterConfig) error {
|
||||
Port: c.Port,
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
// Disable route connection pooling (nats-server 2.10+ defaults to a pool of
|
||||
// 3 connections per peer). On a small cluster the pool churns with
|
||||
// "duplicate route"/"client closed" reconnects that interrupt the meta-group
|
||||
// RAFT heartbeats, causing perpetual leader re-elections so the JetStream
|
||||
// meta never becomes current and stream/KV creation hangs (issue 0006g).
|
||||
// PoolSize=-1 forces the classic single route per peer, which is stable for
|
||||
// the 3-node unibus cluster.
|
||||
PoolSize: -1,
|
||||
// NoAdvertise stops the server from gossiping its locally-discovered IPs to
|
||||
// peers. The cluster nodes are Docker hosts, so without this NATS advertises
|
||||
// the docker bridge addresses (172.x / 10.0.x) as reachable routes; peers
|
||||
// then try to dial those private, mutually-unreachable IPs, churning the
|
||||
// route layer and destabilizing the JetStream meta-group. With NoAdvertise
|
||||
// the nodes use ONLY the explicit public-IP routes we configure (issue 0006g).
|
||||
NoAdvertise: true,
|
||||
}
|
||||
if c.TLS != nil {
|
||||
opts.Cluster.TLSConfig = c.TLS
|
||||
|
||||
@@ -85,8 +85,18 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
|
||||
if opTimeout <= 0 {
|
||||
opTimeout = defaultKVOpTime
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
// Bootstrap budget for creating/opening the buckets. On a single node JetStream
|
||||
// is ready the instant the server starts, so the first attempt succeeds. On a
|
||||
// COLD multi-node cluster the JetStream meta-group must first elect a leader and
|
||||
// each node must establish contact with it before its $JS.API responds. A KV
|
||||
// op is a NATS request/reply: if it is published before the node's JetStream is
|
||||
// ready the request is dropped (not queued), and a single long-context call then
|
||||
// just blocks until it times out (issue 0006g). So we RETRY each bucket op with
|
||||
// short per-attempt contexts until it succeeds or the overall bootstrap budget
|
||||
// is exhausted; once the cluster is ready the next retry lands and the buckets
|
||||
// are created, after which they persist and every node opens them quickly.
|
||||
bootstrapBudget := 120 * time.Second
|
||||
deadline := time.Now().Add(bootstrapBudget)
|
||||
|
||||
s := &jetstreamStore{opTimeout: opTimeout}
|
||||
for _, b := range []struct {
|
||||
@@ -99,14 +109,27 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
|
||||
{bucketRoomKeys, &s.keys},
|
||||
{bucketUsers, &s.users},
|
||||
} {
|
||||
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
|
||||
Bucket: b.name,
|
||||
Replicas: cfg.Replicas,
|
||||
History: 1,
|
||||
Storage: jetstream.FileStorage,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err)
|
||||
var kv jetstream.KeyValue
|
||||
var lastErr error
|
||||
for {
|
||||
opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{
|
||||
Bucket: b.name,
|
||||
Replicas: cfg.Replicas,
|
||||
History: 1,
|
||||
Storage: jetstream.FileStorage,
|
||||
})
|
||||
cancel()
|
||||
if lastErr == nil {
|
||||
break
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr)
|
||||
}
|
||||
// JetStream not ready yet (no meta leader / request dropped). Wait and
|
||||
// re-publish the op; in a cluster cold start this lands once the meta
|
||||
// group settles.
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
*b.dst = kv
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user