From e9ad7194246008d39a2fbe36c68ce087396dfc60 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sun, 7 Jun 2026 17:14:20 +0200 Subject: [PATCH] feat(0006c): wire the decentralized control-plane KV store (--store kv) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 0003 built the JetStream KV store (jetstreamStore) but the binary never selected it: membership.Open (SQLite) was hardcoded and OpenJetStream was only reached by migrate-to-kv. This completes the wiring so a node actually serves its control plane from the replicated KV. - New flag --store kv|sqlite (default sqlite). kv opens the JetStream KV control plane over the privileged internal connection; sqlite is the unchanged baseline (branch-by-abstraction: the full suite's SQLite paths are untouched). - Bootstrap cycle resolved with storeHolder: the authenticator consults the holder (fail-closed until set), so it can be built before the KV store exists. The KV store opens after NATS is up and is published into the holder. The only client that can connect in that window is the internal identity, which bypasses the store by key. In SQLite mode the store is set before StartServer, so the window does not exist. - needJS now covers --store kv as well as --cluster-name; the JetStream client is shared by the KV store and the replicated nonce bucket. - feature_flags.json: decentralized wiring documented as complete, realized via --store kv (opt-in per deploy; default stays sqlite). Fail-closed preserved: jetstreamStore.IsAuthorized already denies on any backend error; the holder denies while unset. Tests: - TestStoreHolderFailClosed: empty holder denies; serves after set. - TestKVStoreBootstrapUnderEnforce: end-to-end decentralized boot — KV-seeded user authenticates over nkey under enforce; outsider denied. - TestKVStoreDecentralizedConsistency: a room/user created on one node's KV store is visible to another's (ends the per-node SQLite divergence, audit 0008 N5). CGO_ENABLED=0 go build/vet/test green; govulncheck 0 reachable. Co-Authored-By: Claude Opus 4.8 (1M context) --- cmd/membershipd/kv_store_test.go | 154 +++++++++++++++++++++++++++ cmd/membershipd/main.go | 111 +++++++++++++------ cmd/membershipd/store_holder.go | 60 +++++++++++ cmd/membershipd/store_holder_test.go | 51 +++++++++ dev/feature_flags.json | 2 +- 5 files changed, 343 insertions(+), 35 deletions(-) create mode 100644 cmd/membershipd/kv_store_test.go create mode 100644 cmd/membershipd/store_holder.go create mode 100644 cmd/membershipd/store_holder_test.go diff --git a/cmd/membershipd/kv_store_test.go b/cmd/membershipd/kv_store_test.go new file mode 100644 index 0000000..43d0749 --- /dev/null +++ b/cmd/membershipd/kv_store_test.go @@ -0,0 +1,154 @@ +package main + +// Wiring tests for issue 0006c: --store kv selects the replicated JetStream KV +// control plane, the authenticator serves from it through the storeHolder, and a +// new node sees state created by another (the divergence that per-node SQLite +// caused — audit 0008 N5 — is gone). Branch-by-abstraction is verified elsewhere +// (the SQLite default path is the unchanged baseline covered by the existing +// suite). + +import ( + "encoding/hex" + "testing" + "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// TestKVStoreBootstrapUnderEnforce drives the exact decentralized boot the binary +// performs: build the authenticator over an empty holder, start NATS, open the +// privileged internal connection, open the KV store, publish it into the holder, +// then a real bus user (seeded into the KV store) authenticates over nkey. This +// proves the bootstrap cycle is broken correctly — the KV-backed control plane +// authorizes live clients under enforce. +func TestKVStoreBootstrapUnderEnforce(t *testing.T) { + internalID, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("internal identity: %v", err) + } + holder := &storeHolder{} + auth := busauth.NewNkeyAuthenticatorACLInternal( + holder.IsAuthorized, + busauth.PermissionsFromSubjects(holder.subjectACL), + hex.EncodeToString(internalID.SignPub), + ) + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth, + }) + if err != nil { + t.Fatalf("nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + + // Privileged internal connection opens the KV store while the holder still + // denies every normal client. + intNC, js, err := connectInternalJS(ns, internalID, true) + if err != nil { + t.Fatalf("connectInternalJS: %v", err) + } + t.Cleanup(intNC.Close) + kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second}) + if err != nil { + t.Fatalf("open kv store: %v", err) + } + holder.set(kvStore) + + // Seed a bus user into the KV control plane. + alice, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("alice: %v", err) + } + if err := kvStore.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleMember); err != nil { + t.Fatalf("seed alice: %v", err) + } + + // alice authenticates over nkey — authorized via the KV store through the holder. + pub, sign, err := busauth.ClientNkey(alice.SignPriv) + if err != nil { + t.Fatalf("alice nkey: %v", err) + } + aliceNC, err := nats.Connect(ns.ClientURL(), nats.Nkey(pub, sign), nats.MaxReconnects(0), nats.Timeout(2*time.Second)) + if err != nil { + t.Fatalf("alice (KV-authorized) must connect under enforce: %v", err) + } + aliceNC.Close() + + // An outsider not in the KV store is denied (fail closed). + outsider, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("outsider: %v", err) + } + opub, osign, err := busauth.ClientNkey(outsider.SignPriv) + if err != nil { + t.Fatalf("outsider nkey: %v", err) + } + if oc, err := nats.Connect(ns.ClientURL(), nats.Nkey(opub, osign), nats.MaxReconnects(0), nats.Timeout(2*time.Second)); err == nil { + oc.Close() + t.Fatalf("an outsider absent from the KV store must be rejected") + } +} + +// TestKVStoreDecentralizedConsistency: a room/user created via one node's KV store +// is immediately visible to another node's KV store over the same JetStream — the +// shared, replicated control plane that ends the per-node SQLite divergence. +func TestKVStoreDecentralizedConsistency(t *testing.T) { + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), + }) + if err != nil { + t.Fatalf("nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + + open := func() membership.Store { + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + t.Fatalf("connect: %v", err) + } + t.Cleanup(nc.Close) + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("jetstream: %v", err) + } + st, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second}) + if err != nil { + t.Fatalf("open kv: %v", err) + } + return st + } + nodeA := open() + nodeB := open() + + owner, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("owner: %v", err) + } + ownerPub := hex.EncodeToString(owner.SignPub) + if err := nodeA.AddUser(ownerPub, "owner", membership.RoleAdmin); err != nil { + t.Fatalf("nodeA add user: %v", err) + } + if err := nodeA.CreateRoom( + membership.RoomInfo{RoomID: "ROOMX", Subject: "room.shared.x", OwnerEndpoint: "owner-ep"}, + owner.SignPub, owner.KexPub, nil, + ); err != nil { + t.Fatalf("nodeA create room: %v", err) + } + + // nodeB (a different connection, same buckets) sees both immediately. + if !nodeB.IsAuthorized(ownerPub) { + t.Fatalf("nodeB must see the user created on nodeA (decentralized state divergence)") + } + got, err := nodeB.GetRoom("ROOMX") + if err != nil { + t.Fatalf("nodeB must see the room created on nodeA: %v", err) + } + if got.Subject != "room.shared.x" { + t.Fatalf("nodeB read wrong room subject: %q", got.Subject) + } +} diff --git a/cmd/membershipd/main.go b/cmd/membershipd/main.go index 0e54553..dfa7057 100644 --- a/cmd/membershipd/main.go +++ b/cmd/membershipd/main.go @@ -73,6 +73,11 @@ func main() { // `nats stream update --replicas 3` when the third node joins). kvReplicas = flag.Int("kv-replicas", 1, "JetStream replication factor for the shared nonce/KV buckets (1..3)") caFile = flag.String("ca", "", "bus CA cert; only used to pin TLS on the internal JetStream connection to an EXTERNAL --nats-url (the embedded server uses an in-process connection that needs no CA)") + // Control-plane store backend (issue 0006c, feature flag decentralized): + // "sqlite" (default) keeps the local single-node SQLite control plane; + // "kv" puts rooms/members/keys/users in replicated JetStream KV so any node + // in the cluster serves the same state. + storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)") ) flag.Parse() @@ -80,6 +85,9 @@ func main() { if err != nil { log.Fatalf("%v", err) } + if *storeBackend != "sqlite" && *storeBackend != "kv" { + log.Fatalf("--store must be \"sqlite\" or \"kv\", got %q", *storeBackend) + } // Fail-open guard (audit H2): a non-loopback bind, or any TLS flag, demands // --bus-auth enforce. This makes an insecure public startup impossible rather @@ -97,12 +105,13 @@ func main() { log.SetPrefix("[membershipd] ") // A clustered node shares its control plane with peers, so it needs a JetStream - // client to manage the replicated nonce bucket (issue 0006a). needJS will also - // be true under --store kv (issue 0006c), where the control-plane state lives in - // JetStream KV. A standalone single-node deployment needs none of this and keeps - // the in-process, in-memory behavior unchanged. + // client to manage the replicated nonce bucket (issue 0006a). --store kv (issue + // 0006c) also needs JetStream, for the control-plane KV itself. A standalone + // single-node SQLite deployment needs none of this and keeps the in-process, + // in-memory behavior unchanged. clustered := *clusterName != "" - needJS := clustered + decentralized := *storeBackend == "kv" + needJS := clustered || decentralized enforce := authMode == membership.AuthEnforce // Internal service identity (issue 0006a): when the embedded data plane enforces @@ -121,14 +130,29 @@ func main() { internalPubHex = hex.EncodeToString(internalID.SignPub) } - // Control plane store first: the NATS authenticator consults IsAuthorized, so - // the store must exist before the embedded server starts. - store, err := membership.Open(*dbPath) - if err != nil { - log.Fatalf("open membership store: %v", err) + // The authenticator consults the store through a holder so it can be built + // before the store exists: with --store kv the JetStream KV store opens only + // after NATS is up (the bootstrap cycle). In the default SQLite path the store + // is opened and set into the holder right here, before the server starts, so + // behavior is identical to the pre-0006c baseline. `store` is the final store + // used by the HTTP server (set below for the KV path). + holder := &storeHolder{} + var store membership.Store + if !decentralized { + store, err = membership.Open(*dbPath) + if err != nil { + log.Fatalf("open membership store: %v", err) + } + holder.set(store) + log.Printf("membership store: sqlite %s", *dbPath) } - defer store.Close() - log.Printf("membership store: %s", *dbPath) + // Close whichever store ends up final (SQLite closes its file; the JetStream KV + // store's Close is a no-op — its NATS connection is closed separately). + defer func() { + if store != nil { + store.Close() + } + }() blobs, err := blobstore.New(*storeDir) if err != nil { @@ -182,8 +206,8 @@ func main() { // NATS freezes permissions at connect time, so a peer that joins a room // after connecting must client.RefreshSession to gain that room's subject. cfg.Auth = busauth.NewNkeyAuthenticatorACLInternal( - store.IsAuthorized, - busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)), + holder.IsAuthorized, + busauth.PermissionsFromSubjects(holder.subjectACL), internalPubHex, ) log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)") @@ -209,6 +233,38 @@ func main() { log.Printf("using external NATS: %s", natsClientURL) } + // JetStream client + decentralized store (issue 0006a/c). needJS is set for a + // clustered node (shared nonce bucket) and for --store kv (the KV control + // plane). Open the privileged JetStream client first (in-process for the + // embedded server, a plain client for external NATS), then — for --store kv — + // open the replicated KV store and publish it into the holder so the + // authenticator and HTTP server serve from it. The privileged connection is the + // only client that can connect in this window (the holder still denies everyone + // else; the internal identity bypasses the store). + var js jetstream.JetStream + if needJS { + var internalNC *nats.Conn + if *natsURL == "" { + internalNC, js, err = connectInternalJS(ns, internalID, enforce) + } else { + internalNC, js, err = connectExternalJS(natsClientURL, *caFile) + } + if err != nil { + log.Fatalf("internal JetStream connection (required by --cluster-name/--store kv): %v", err) + } + defer internalNC.Close() + + if decentralized { + kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: *kvReplicas}) + if err != nil { + log.Fatalf("open decentralized control-plane KV store: %v", err) + } + store = kvStore + holder.set(store) + log.Printf("membership store: jetstream KV (replicas=%d)", *kvReplicas) + } + } + srv := membership.NewServer(store, blobs, authMode) // On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS // has no per-subject ACL, so cleartext content would be readable by any @@ -220,30 +276,17 @@ func main() { } // Replicated anti-replay (issue 0006a, audit 0008 N3): a clustered node MUST - // share its nonce store across the cluster via JetStream KV, or a request - // accepted on one node can be replayed to another. Open a privileged JetStream - // client (in-process for the embedded server, a plain client for an external - // NATS) and wire the shared nonce bucket. This is a HARD requirement: if the - // bucket cannot be created the node refuses to start rather than run with a - // per-process cache that leaves the replay hole open. + // share its nonce store across the cluster, or a request accepted on one node + // can be replayed to another. HARD requirement: if the bucket cannot be created + // the node refuses to start rather than run with a per-process cache that leaves + // the replay hole open. if needJS { - var ( - internalNC *nats.Conn - js jetstream.JetStream - ) - if *natsURL == "" { - internalNC, js, err = connectInternalJS(ns, internalID, enforce) - } else { - internalNC, js, err = connectExternalJS(natsClientURL, *caFile) - } - if err != nil { - log.Fatalf("internal JetStream connection (required by --cluster-name): %v", err) - } - defer internalNC.Close() if err := wireReplicatedNonces(srv, js, clustered, *kvReplicas); err != nil { log.Fatalf("%v", err) } - log.Printf("anti-replay: replicated nonce bucket %q (replicas=%d) — cluster-safe", "KV_UNIBUS_nonces", *kvReplicas) + if clustered { + log.Printf("anti-replay: replicated nonce bucket \"KV_UNIBUS_nonces\" (replicas=%d) — cluster-safe", *kvReplicas) + } } log.Printf("control-plane auth: %s", authMode) diff --git a/cmd/membershipd/store_holder.go b/cmd/membershipd/store_holder.go new file mode 100644 index 0000000..493d1a8 --- /dev/null +++ b/cmd/membershipd/store_holder.go @@ -0,0 +1,60 @@ +package main + +import ( + "fmt" + "sync" + + "github.com/enmanuel/unibus/pkg/membership" +) + +// storeHolder is a concurrency-safe slot for the control-plane store, used to +// break the decentralized bootstrap cycle (issue 0006c): the NATS authenticator +// must be built BEFORE the embedded server starts, but the JetStream KV store can +// only be opened AFTER NATS is up (it needs a JetStream client). The authenticator +// therefore consults the holder instead of a concrete store. +// +// Fail-closed by construction: until the store is set, IsAuthorized denies and +// SubjectACL errors, so any client connecting in the startup window is rejected. +// The only connection expected in that window is membershipd's own internal +// service identity, which the authenticator recognizes by key and lets through +// without consulting the store at all. In the SQLite (default) path the store is +// set before StartServer, so the window does not exist and behavior is identical +// to the pre-0006c baseline. +type storeHolder struct { + mu sync.RWMutex + s membership.Store +} + +func (h *storeHolder) set(s membership.Store) { + h.mu.Lock() + h.s = s + h.mu.Unlock() +} + +func (h *storeHolder) get() membership.Store { + h.mu.RLock() + defer h.mu.RUnlock() + return h.s +} + +// IsAuthorized reports whether signPubHex is an active bus user, denying while the +// store is not yet set (fail closed). It is the predicate the nkey authenticator +// uses for every connecting client. +func (h *storeHolder) IsAuthorized(signPubHex string) bool { + s := h.get() + if s == nil { + return false + } + return s.IsAuthorized(signPubHex) +} + +// subjectACL derives the per-subject permissions for signPubHex via the live +// store, erroring (so the caller fails closed and denies the connection) while the +// store is not yet set. +func (h *storeHolder) subjectACL(signPubHex string) ([]string, error) { + s := h.get() + if s == nil { + return nil, fmt.Errorf("control-plane store not ready") + } + return membership.SubjectACLFor(s)(signPubHex) +} diff --git a/cmd/membershipd/store_holder_test.go b/cmd/membershipd/store_holder_test.go new file mode 100644 index 0000000..27b35bc --- /dev/null +++ b/cmd/membershipd/store_holder_test.go @@ -0,0 +1,51 @@ +package main + +import ( + "encoding/hex" + "path/filepath" + "testing" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/membership" +) + +// TestStoreHolderFailClosed: an empty holder denies everything (the bootstrap +// window before the store is set), and starts serving once a store is published. +func TestStoreHolderFailClosed(t *testing.T) { + h := &storeHolder{} + + // Empty: deny + error (fail closed). + if h.IsAuthorized("anything") { + t.Fatalf("empty holder must deny IsAuthorized") + } + if _, err := h.subjectACL("anything"); err == nil { + t.Fatalf("empty holder must error from subjectACL (fail closed)") + } + + // After set: serves from the real store. + store, err := membership.Open(filepath.Join(t.TempDir(), "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + id, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("identity: %v", err) + } + pub := hex.EncodeToString(id.SignPub) + if err := store.AddUser(pub, "alice", membership.RoleMember); err != nil { + t.Fatalf("add user: %v", err) + } + h.set(store) + + if !h.IsAuthorized(pub) { + t.Fatalf("after set, an active user must be authorized") + } + if _, err := h.subjectACL(pub); err != nil { + t.Fatalf("after set, subjectACL must succeed: %v", err) + } + if h.IsAuthorized("deadbeef") { + t.Fatalf("a non-user must not be authorized") + } +} diff --git a/dev/feature_flags.json b/dev/feature_flags.json index f58b826..686de2f 100644 --- a/dev/feature_flags.json +++ b/dev/feature_flags.json @@ -18,7 +18,7 @@ "decentralized": { "enabled": false, "issue": "0003", - "description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default OFF, jetstreamStore ON). The route cluster (0003a) and the KV store (0003b) ship behind this flag; the membershipd boot wiring that selects the KV store completes with the session/reconnect redesign (0003e) and is activated on the multi-node deploy (0003f). OFF keeps the single-node SQLite control plane unchanged.", + "description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default, jetstreamStore opt-in). The route cluster (0003a) and the KV store (0003b) shipped behind this flag; the membershipd boot wiring that selects the store is COMPLETE since issue 0006c and is realized at runtime with the server flag --store kv|sqlite (default sqlite). The internal-identity bootstrap (0006a) lets membershipd open the KV store on its own embedded NATS under enforce. Per-deploy opt-in: a node joins the decentralized control plane by starting with --store kv (and --cluster-name for HA). OFF (--store sqlite) keeps the single-node SQLite control plane unchanged.", "added": "2026-06-07", "enabled_at": null }