diff --git a/pkg/embeddednats/embeddednats.go b/pkg/embeddednats/embeddednats.go index 1c96e9f1..26cdd10f 100644 --- a/pkg/embeddednats/embeddednats.go +++ b/pkg/embeddednats/embeddednats.go @@ -9,6 +9,7 @@ import ( "crypto/tls" "fmt" "net/url" + "os" "time" server "github.com/nats-io/nats-server/v2/server" @@ -106,6 +107,13 @@ func StartHostAuth(storeDir, host string, port int, auth server.Authentication) // blocks until the server is ready to accept connections (up to 5s) and returns // the running server; the caller must Shutdown it. func StartServer(cfg ServerConfig) (*server.Server, error) { + // Diagnostic toggle: UNIBUS_NATS_DEBUG=1 enables the embedded nats-server's own + // logger (route/RAFT/JetStream errors), which is otherwise silenced. Off by + // default so production behavior is unchanged; only set it when debugging the + // cluster route layer. + debugLevel := os.Getenv("UNIBUS_NATS_DEBUG") + debugNATS := debugLevel == "1" || debugLevel == "2" + traceNATS := debugLevel == "2" opts := &server.Options{ JetStream: true, StoreDir: cfg.StoreDir, @@ -114,8 +122,17 @@ func StartServer(cfg ServerConfig) (*server.Server, error) { ServerName: cfg.ServerName, DontListen: false, // Keep the embedded server quiet by default; the host app logs the URLs. - NoLog: true, - NoSigs: true, + NoLog: !debugNATS, + Debug: debugNATS, + Trace: traceNATS, + Logtime: true, + NoSigs: true, + } + if debugNATS { + // Expose the nats-server monitoring endpoint (loopback) so the operator can + // inspect /jsz, /routez, /varz while debugging the cluster meta-group. + opts.HTTPHost = "127.0.0.1" + opts.HTTPPort = 8222 } if cfg.Auth != nil { opts.CustomClientAuthentication = cfg.Auth @@ -141,6 +158,10 @@ func StartServer(cfg ServerConfig) (*server.Server, error) { return nil, fmt.Errorf("embeddednats: new server: %w", err) } + if debugNATS { + ns.ConfigureLogger() + } + go ns.Start() if !ns.ReadyForConnections(5 * time.Second) { @@ -162,6 +183,21 @@ func applyClusterOpts(opts *server.Options, c *ClusterConfig) error { Port: c.Port, Username: c.Username, Password: c.Password, + // Disable route connection pooling (nats-server 2.10+ defaults to a pool of + // 3 connections per peer). On a small cluster the pool churns with + // "duplicate route"/"client closed" reconnects that interrupt the meta-group + // RAFT heartbeats, causing perpetual leader re-elections so the JetStream + // meta never becomes current and stream/KV creation hangs (issue 0006g). + // PoolSize=-1 forces the classic single route per peer, which is stable for + // the 3-node unibus cluster. + PoolSize: -1, + // NoAdvertise stops the server from gossiping its locally-discovered IPs to + // peers. The cluster nodes are Docker hosts, so without this NATS advertises + // the docker bridge addresses (172.x / 10.0.x) as reachable routes; peers + // then try to dial those private, mutually-unreachable IPs, churning the + // route layer and destabilizing the JetStream meta-group. With NoAdvertise + // the nodes use ONLY the explicit public-IP routes we configure (issue 0006g). + NoAdvertise: true, } if c.TLS != nil { opts.Cluster.TLSConfig = c.TLS diff --git a/pkg/membership/jetstream_store.go b/pkg/membership/jetstream_store.go index 74f561ef..264f1294 100644 --- a/pkg/membership/jetstream_store.go +++ b/pkg/membership/jetstream_store.go @@ -85,8 +85,18 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) { if opTimeout <= 0 { opTimeout = defaultKVOpTime } - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() + // Bootstrap budget for creating/opening the buckets. On a single node JetStream + // is ready the instant the server starts, so the first attempt succeeds. On a + // COLD multi-node cluster the JetStream meta-group must first elect a leader and + // each node must establish contact with it before its $JS.API responds. A KV + // op is a NATS request/reply: if it is published before the node's JetStream is + // ready the request is dropped (not queued), and a single long-context call then + // just blocks until it times out (issue 0006g). So we RETRY each bucket op with + // short per-attempt contexts until it succeeds or the overall bootstrap budget + // is exhausted; once the cluster is ready the next retry lands and the buckets + // are created, after which they persist and every node opens them quickly. + bootstrapBudget := 120 * time.Second + deadline := time.Now().Add(bootstrapBudget) s := &jetstreamStore{opTimeout: opTimeout} for _, b := range []struct { @@ -99,14 +109,27 @@ func OpenJetStream(js jetstream.JetStream, cfg JetStreamConfig) (Store, error) { {bucketRoomKeys, &s.keys}, {bucketUsers, &s.users}, } { - kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{ - Bucket: b.name, - Replicas: cfg.Replicas, - History: 1, - Storage: jetstream.FileStorage, - }) - if err != nil { - return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d): %w", b.name, cfg.Replicas, err) + var kv jetstream.KeyValue + var lastErr error + for { + opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + kv, lastErr = js.CreateOrUpdateKeyValue(opCtx, jetstream.KeyValueConfig{ + Bucket: b.name, + Replicas: cfg.Replicas, + History: 1, + Storage: jetstream.FileStorage, + }) + cancel() + if lastErr == nil { + break + } + if time.Now().After(deadline) { + return nil, fmt.Errorf("membership: open KV bucket %q (replicas=%d) after %s: %w", b.name, cfg.Replicas, bootstrapBudget, lastErr) + } + // JetStream not ready yet (no meta leader / request dropped). Wait and + // re-publish the op; in a cluster cold start this lands once the meta + // group settles. + time.Sleep(1 * time.Second) } *b.dst = kv }