feat(0006c): wire the decentralized control-plane KV store (--store kv)

0003 built the JetStream KV store (jetstreamStore) but the binary never selected
it: membership.Open (SQLite) was hardcoded and OpenJetStream was only reached by
migrate-to-kv. This completes the wiring so a node actually serves its control
plane from the replicated KV.

- New flag --store kv|sqlite (default sqlite). kv opens the JetStream KV control
  plane over the privileged internal connection; sqlite is the unchanged baseline
  (branch-by-abstraction: the full suite's SQLite paths are untouched).
- Bootstrap cycle resolved with storeHolder: the authenticator consults the holder
  (fail-closed until set), so it can be built before the KV store exists. The KV
  store opens after NATS is up and is published into the holder. The only client
  that can connect in that window is the internal identity, which the
  authenticator recognizes by its public key and admits without consulting the
  store. In SQLite mode the store is set before StartServer, so the window does
  not exist.
- needJS now covers --store kv as well as --cluster-name; the JetStream client is
  shared by the KV store and the replicated nonce bucket.
- feature_flags.json: decentralized wiring documented as complete, realized via
  --store kv (opt-in per deploy; default stays sqlite).

Fail-closed preserved: jetstreamStore.IsAuthorized already denies on any backend
error; the holder denies while unset.

Tests:
- TestStoreHolderFailClosed: empty holder denies; serves after set.
- TestKVStoreBootstrapUnderEnforce: end-to-end decentralized boot — KV-seeded user
  authenticates over nkey under enforce; outsider denied.
- TestKVStoreDecentralizedConsistency: a room/user created on one node's KV store
  is visible to another's (ends the per-node SQLite divergence, audit 0008 N5).

Verified with CGO_ENABLED=0: go build, go vet and go test are all green;
govulncheck reports 0 reachable vulnerabilities.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-07 17:14:20 +02:00
parent d1e1a478f8
commit e9ad719424
5 changed files with 343 additions and 35 deletions
+77 -34
View File
@@ -73,6 +73,11 @@ func main() {
// `nats stream update --replicas 3` when the third node joins).
kvReplicas = flag.Int("kv-replicas", 1, "JetStream replication factor for the shared nonce/KV buckets (1..3)")
caFile = flag.String("ca", "", "bus CA cert; only used to pin TLS on the internal JetStream connection to an EXTERNAL --nats-url (the embedded server uses an in-process connection that needs no CA)")
// Control-plane store backend (issue 0006c, feature flag decentralized):
// "sqlite" (default) keeps the local single-node SQLite control plane;
// "kv" puts rooms/members/keys/users in replicated JetStream KV so any node
// in the cluster serves the same state.
storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)")
)
flag.Parse()
@@ -80,6 +85,9 @@ func main() {
if err != nil {
log.Fatalf("%v", err)
}
if *storeBackend != "sqlite" && *storeBackend != "kv" {
log.Fatalf("--store must be \"sqlite\" or \"kv\", got %q", *storeBackend)
}
// Fail-open guard (audit H2): a non-loopback bind, or any TLS flag, demands
// --bus-auth enforce. This makes an insecure public startup impossible rather
@@ -97,12 +105,13 @@ func main() {
log.SetPrefix("[membershipd] ")
// A clustered node shares its control plane with peers, so it needs a JetStream
// client to manage the replicated nonce bucket (issue 0006a). needJS will also
// be true under --store kv (issue 0006c), where the control-plane state lives in
// JetStream KV. A standalone single-node deployment needs none of this and keeps
// the in-process, in-memory behavior unchanged.
// client to manage the replicated nonce bucket (issue 0006a). --store kv (issue
// 0006c) also needs JetStream, for the control-plane KV itself. A standalone
// single-node SQLite deployment needs none of this and keeps the in-process,
// in-memory behavior unchanged.
clustered := *clusterName != ""
needJS := clustered
decentralized := *storeBackend == "kv"
needJS := clustered || decentralized
enforce := authMode == membership.AuthEnforce
// Internal service identity (issue 0006a): when the embedded data plane enforces
@@ -121,14 +130,29 @@ func main() {
internalPubHex = hex.EncodeToString(internalID.SignPub)
}
// Control plane store first: the NATS authenticator consults IsAuthorized, so
// the store must exist before the embedded server starts.
store, err := membership.Open(*dbPath)
if err != nil {
log.Fatalf("open membership store: %v", err)
// The authenticator consults the store through a holder so it can be built
// before the store exists: with --store kv the JetStream KV store opens only
// after NATS is up (the bootstrap cycle). In the default SQLite path the store
// is opened and set into the holder right here, before the server starts, so
// behavior is identical to the pre-0006c baseline. `store` is the final store
// used by the HTTP server (set below for the KV path).
holder := &storeHolder{}
var store membership.Store
if !decentralized {
store, err = membership.Open(*dbPath)
if err != nil {
log.Fatalf("open membership store: %v", err)
}
holder.set(store)
log.Printf("membership store: sqlite %s", *dbPath)
}
defer store.Close()
log.Printf("membership store: %s", *dbPath)
// Close whichever store ends up final (SQLite closes its file; the JetStream KV
// store's Close is a no-op — its NATS connection is closed separately).
defer func() {
if store != nil {
store.Close()
}
}()
blobs, err := blobstore.New(*storeDir)
if err != nil {
@@ -182,8 +206,8 @@ func main() {
// NATS freezes permissions at connect time, so a peer that joins a room
// after connecting must client.RefreshSession to gain that room's subject.
cfg.Auth = busauth.NewNkeyAuthenticatorACLInternal(
store.IsAuthorized,
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
holder.IsAuthorized,
busauth.PermissionsFromSubjects(holder.subjectACL),
internalPubHex,
)
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
@@ -209,6 +233,38 @@ func main() {
log.Printf("using external NATS: %s", natsClientURL)
}
// JetStream client + decentralized store (issue 0006a/c). needJS is set for a
// clustered node (shared nonce bucket) and for --store kv (the KV control
// plane). Open the privileged JetStream client first (in-process for the
// embedded server, a plain client for external NATS), then — for --store kv —
// open the replicated KV store and publish it into the holder so the
// authenticator and HTTP server serve from it. The privileged connection is the
// only client that can connect in this window (the holder still denies everyone
// else; the internal identity bypasses the store).
var js jetstream.JetStream
if needJS {
var internalNC *nats.Conn
if *natsURL == "" {
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
} else {
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
}
if err != nil {
log.Fatalf("internal JetStream connection (required by --cluster-name/--store kv): %v", err)
}
defer internalNC.Close()
if decentralized {
kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: *kvReplicas})
if err != nil {
log.Fatalf("open decentralized control-plane KV store: %v", err)
}
store = kvStore
holder.set(store)
log.Printf("membership store: jetstream KV (replicas=%d)", *kvReplicas)
}
}
srv := membership.NewServer(store, blobs, authMode)
// On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS
// has no per-subject ACL, so cleartext content would be readable by any
@@ -220,30 +276,17 @@ func main() {
}
// Replicated anti-replay (issue 0006a, audit 0008 N3): a clustered node MUST
// share its nonce store across the cluster via JetStream KV, or a request
// accepted on one node can be replayed to another. Open a privileged JetStream
// client (in-process for the embedded server, a plain client for an external
// NATS) and wire the shared nonce bucket. This is a HARD requirement: if the
// bucket cannot be created the node refuses to start rather than run with a
// per-process cache that leaves the replay hole open.
// share its nonce store across the cluster, or a request accepted on one node
// can be replayed to another. HARD requirement: if the bucket cannot be created
// the node refuses to start rather than run with a per-process cache that leaves
// the replay hole open.
if needJS {
var (
internalNC *nats.Conn
js jetstream.JetStream
)
if *natsURL == "" {
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
} else {
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
}
if err != nil {
log.Fatalf("internal JetStream connection (required by --cluster-name): %v", err)
}
defer internalNC.Close()
if err := wireReplicatedNonces(srv, js, clustered, *kvReplicas); err != nil {
log.Fatalf("%v", err)
}
log.Printf("anti-replay: replicated nonce bucket %q (replicas=%d) — cluster-safe", "KV_UNIBUS_nonces", *kvReplicas)
if clustered {
log.Printf("anti-replay: replicated nonce bucket \"KV_UNIBUS_nonces\" (replicas=%d) — cluster-safe", *kvReplicas)
}
}
log.Printf("control-plane auth: %s", authMode)