fix(cluster): make the JetStream control-plane survive a cold multi-node start
Bringing up the 3-node cluster from clean stores never converged: every node looped on `open KV bucket "UNIBUS_rooms" (replicas=1): context deadline exceeded`. Three independent defects in the clustered bootstrap path, none of which surface on a single node (where JetStream is ready instantly), caused it: 1. embeddednats: route connection pooling (nats-server 2.10 default pool of 3) churned with "duplicate route"/"client closed" reconnects on the small cluster, interrupting the meta-group RAFT heartbeats and forcing perpetual leader re-elections. Set Cluster.PoolSize = -1 (single route per peer). 2. embeddednats: the cluster nodes are Docker hosts, so NATS advertised the docker bridge IPs (172.x / 10.0.x) to peers, which then tried to dial those private, mutually-unreachable addresses. Set Cluster.NoAdvertise = true so only the explicit public-IP routes are used. Also added a UNIBUS_NATS_DEBUG env toggle (off by default) that enables the embedded server's logger and loopback monitoring port for debugging the route/meta layer. 3. membership.OpenJetStream: a KV op is a NATS request/reply; on a cold cluster the op was published once, before the node had contact with the meta leader, so the request was dropped and the single long-context call just blocked until timeout. Retry each bucket op with short per-attempt contexts until it succeeds or an overall bootstrap budget (120s) is exhausted, so it lands once the meta settles. With these the cluster forms cleanly, creates the KV buckets, scales R1->R3 in place, and survives loss of one node (quorum 2/3). Verified on magnus+homer+datardos. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
@@ -106,6 +107,13 @@ func StartHostAuth(storeDir, host string, port int, auth server.Authentication)
|
||||
// blocks until the server is ready to accept connections (up to 5s) and returns
|
||||
// the running server; the caller must Shutdown it.
|
||||
func StartServer(cfg ServerConfig) (*server.Server, error) {
|
||||
// Diagnostic toggle: UNIBUS_NATS_DEBUG=1 enables the embedded nats-server's own
|
||||
// logger (route/RAFT/JetStream errors), which is otherwise silenced. Off by
|
||||
// default so production behavior is unchanged; only set it when debugging the
|
||||
// cluster route layer.
|
||||
debugLevel := os.Getenv("UNIBUS_NATS_DEBUG")
|
||||
debugNATS := debugLevel == "1" || debugLevel == "2"
|
||||
traceNATS := debugLevel == "2"
|
||||
opts := &server.Options{
|
||||
JetStream: true,
|
||||
StoreDir: cfg.StoreDir,
|
||||
@@ -114,8 +122,17 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
|
||||
ServerName: cfg.ServerName,
|
||||
DontListen: false,
|
||||
// Keep the embedded server quiet by default; the host app logs the URLs.
|
||||
NoLog: true,
|
||||
NoSigs: true,
|
||||
NoLog: !debugNATS,
|
||||
Debug: debugNATS,
|
||||
Trace: traceNATS,
|
||||
Logtime: true,
|
||||
NoSigs: true,
|
||||
}
|
||||
if debugNATS {
|
||||
// Expose the nats-server monitoring endpoint (loopback) so the operator can
|
||||
// inspect /jsz, /routez, /varz while debugging the cluster meta-group.
|
||||
opts.HTTPHost = "127.0.0.1"
|
||||
opts.HTTPPort = 8222
|
||||
}
|
||||
if cfg.Auth != nil {
|
||||
opts.CustomClientAuthentication = cfg.Auth
|
||||
@@ -141,6 +158,10 @@ func StartServer(cfg ServerConfig) (*server.Server, error) {
|
||||
return nil, fmt.Errorf("embeddednats: new server: %w", err)
|
||||
}
|
||||
|
||||
if debugNATS {
|
||||
ns.ConfigureLogger()
|
||||
}
|
||||
|
||||
go ns.Start()
|
||||
|
||||
if !ns.ReadyForConnections(5 * time.Second) {
|
||||
@@ -162,6 +183,21 @@ func applyClusterOpts(opts *server.Options, c *ClusterConfig) error {
|
||||
Port: c.Port,
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
// Disable route connection pooling (nats-server 2.10+ defaults to a pool of
|
||||
// 3 connections per peer). On a small cluster the pool churns with
|
||||
// "duplicate route"/"client closed" reconnects that interrupt the meta-group
|
||||
// RAFT heartbeats, causing perpetual leader re-elections so the JetStream
|
||||
// meta never becomes current and stream/KV creation hangs (issue 0006g).
|
||||
// PoolSize=-1 forces the classic single route per peer, which is stable for
|
||||
// the 3-node unibus cluster.
|
||||
PoolSize: -1,
|
||||
// NoAdvertise stops the server from gossiping its locally-discovered IPs to
|
||||
// peers. The cluster nodes are Docker hosts, so without this NATS advertises
|
||||
// the docker bridge addresses (172.x / 10.0.x) as reachable routes; peers
|
||||
// then try to dial those private, mutually-unreachable IPs, churning the
|
||||
// route layer and destabilizing the JetStream meta-group. With NoAdvertise
|
||||
// the nodes use ONLY the explicit public-IP routes we configure (issue 0006g).
|
||||
NoAdvertise: true,
|
||||
}
|
||||
if c.TLS != nil {
|
||||
opts.Cluster.TLSConfig = c.TLS
|
||||
|
||||
@@ -85,8 +85,18 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
|
||||
if opTimeout <= 0 {
|
||||
opTimeout = defaultKVOpTime
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
// Bootstrap budget for creating/opening the buckets. On a single node JetStream
|
||||
// is ready the instant the server starts, so the first attempt succeeds. On a
|
||||
// COLD multi-node cluster the JetStream meta-group must first elect a leader and
|
||||
// each node must establish contact with it before its $JS.API responds. A KV
|
||||
// op is a NATS request/reply: if it is published before the node's JetStream is
|
||||
// ready the request is dropped (not queued), and a single long-context call then
|
||||
// just blocks until it times out (issue 0006g). So we RETRY each bucket op with
|
||||
// short per-attempt contexts until it succeeds or the overall bootstrap budget
|
||||
// is exhausted; once the cluster is ready the next retry lands and the buckets
|
||||
// are created, after which they persist and every node opens them quickly.
|
||||
bootstrapBudget := 120 * time.Second
|
||||
deadline := time.Now().Add(bootstrapBudget)
|
||||
|
||||
s := &jetstreamStore{opTimeout: opTimeout}
|
||||
for _, b := range []struct {
|
||||
@@ -99,14 +109,27 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) {
|
||||
{bucketRoomKeys, &s.keys},
|
||||
{bucketUsers, &s.users},
|
||||
} {
|
||||
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
|
||||
Bucket: b.name,
|
||||
Replicas: cfg.Replicas,
|
||||
History: 1,
|
||||
Storage: jetstream.FileStorage,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err)
|
||||
var kv jetstream.KeyValue
|
||||
var lastErr error
|
||||
for {
|
||||
opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{
|
||||
Bucket: b.name,
|
||||
Replicas: cfg.Replicas,
|
||||
History: 1,
|
||||
Storage: jetstream.FileStorage,
|
||||
})
|
||||
cancel()
|
||||
if lastErr == nil {
|
||||
break
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr)
|
||||
}
|
||||
// JetStream not ready yet (no meta leader / request dropped). Wait and
|
||||
// re-publish the op; in a cluster cold start this lands once the meta
|
||||
// group settles.
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
*b.dst = kv
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user