diff --git a/cmd/membershipd/kv_store_test.go b/cmd/membershipd/kv_store_test.go new file mode 100644 index 0000000..43d0749 --- /dev/null +++ b/cmd/membershipd/kv_store_test.go @@ -0,0 +1,154 @@ +package main + +// Wiring tests for issue 0006c: --store kv selects the replicated JetStream KV +// control plane, the authenticator serves from it through the storeHolder, and a +// new node sees state created by another (the divergence that per-node SQLite +// caused — audit 0008 N5 — is gone). Branch-by-abstraction is verified elsewhere +// (the SQLite default path is the unchanged baseline covered by the existing +// suite). + +import ( + "encoding/hex" + "testing" + "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// TestKVStoreBootstrapUnderEnforce drives the exact decentralized boot the binary +// performs: build the authenticator over an empty holder, start NATS, open the +// privileged internal connection, open the KV store, publish it into the holder, +// then a real bus user (seeded into the KV store) authenticates over nkey. This +// proves the bootstrap cycle is broken correctly — the KV-backed control plane +// authorizes live clients under enforce. 
+func TestKVStoreBootstrapUnderEnforce(t *testing.T) { + internalID, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("internal identity: %v", err) + } + holder := &storeHolder{} + auth := busauth.NewNkeyAuthenticatorACLInternal( + holder.IsAuthorized, + busauth.PermissionsFromSubjects(holder.subjectACL), + hex.EncodeToString(internalID.SignPub), + ) + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), Auth: auth, + }) + if err != nil { + t.Fatalf("nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + + // Privileged internal connection opens the KV store while the holder still + // denies every normal client. + intNC, js, err := connectInternalJS(ns, internalID, true) + if err != nil { + t.Fatalf("connectInternalJS: %v", err) + } + t.Cleanup(intNC.Close) + kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second}) + if err != nil { + t.Fatalf("open kv store: %v", err) + } + holder.set(kvStore) + + // Seed a bus user into the KV control plane. + alice, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("alice: %v", err) + } + if err := kvStore.AddUser(hex.EncodeToString(alice.SignPub), "alice", membership.RoleMember); err != nil { + t.Fatalf("seed alice: %v", err) + } + + // alice authenticates over nkey — authorized via the KV store through the holder. + pub, sign, err := busauth.ClientNkey(alice.SignPriv) + if err != nil { + t.Fatalf("alice nkey: %v", err) + } + aliceNC, err := nats.Connect(ns.ClientURL(), nats.Nkey(pub, sign), nats.MaxReconnects(0), nats.Timeout(2*time.Second)) + if err != nil { + t.Fatalf("alice (KV-authorized) must connect under enforce: %v", err) + } + aliceNC.Close() + + // An outsider not in the KV store is denied (fail closed). 
+ outsider, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("outsider: %v", err) + } + opub, osign, err := busauth.ClientNkey(outsider.SignPriv) + if err != nil { + t.Fatalf("outsider nkey: %v", err) + } + if oc, err := nats.Connect(ns.ClientURL(), nats.Nkey(opub, osign), nats.MaxReconnects(0), nats.Timeout(2*time.Second)); err == nil { + oc.Close() + t.Fatalf("an outsider absent from the KV store must be rejected") + } +} + +// TestKVStoreDecentralizedConsistency: a room/user created via one node's KV store +// is immediately visible to another node's KV store over the same JetStream — the +// shared, replicated control plane that ends the per-node SQLite divergence. +func TestKVStoreDecentralizedConsistency(t *testing.T) { + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: freePort(t), + }) + if err != nil { + t.Fatalf("nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + + open := func() membership.Store { + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + t.Fatalf("connect: %v", err) + } + t.Cleanup(nc.Close) + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("jetstream: %v", err) + } + st, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: 1, OpTimeout: 3 * time.Second}) + if err != nil { + t.Fatalf("open kv: %v", err) + } + return st + } + nodeA := open() + nodeB := open() + + owner, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("owner: %v", err) + } + ownerPub := hex.EncodeToString(owner.SignPub) + if err := nodeA.AddUser(ownerPub, "owner", membership.RoleAdmin); err != nil { + t.Fatalf("nodeA add user: %v", err) + } + if err := nodeA.CreateRoom( + membership.RoomInfo{RoomID: "ROOMX", Subject: "room.shared.x", OwnerEndpoint: "owner-ep"}, + owner.SignPub, owner.KexPub, nil, + ); err != nil { + t.Fatalf("nodeA create room: %v", err) + } + + // nodeB (a different connection, same buckets) sees both 
immediately. + if !nodeB.IsAuthorized(ownerPub) { + t.Fatalf("nodeB must see the user created on nodeA (decentralized state divergence)") + } + got, err := nodeB.GetRoom("ROOMX") + if err != nil { + t.Fatalf("nodeB must see the room created on nodeA: %v", err) + } + if got.Subject != "room.shared.x" { + t.Fatalf("nodeB read wrong room subject: %q", got.Subject) + } +} diff --git a/cmd/membershipd/main.go b/cmd/membershipd/main.go index 0e54553..dfa7057 100644 --- a/cmd/membershipd/main.go +++ b/cmd/membershipd/main.go @@ -73,6 +73,11 @@ func main() { // `nats stream update --replicas 3` when the third node joins). kvReplicas = flag.Int("kv-replicas", 1, "JetStream replication factor for the shared nonce/KV buckets (1..3)") caFile = flag.String("ca", "", "bus CA cert; only used to pin TLS on the internal JetStream connection to an EXTERNAL --nats-url (the embedded server uses an in-process connection that needs no CA)") + // Control-plane store backend (issue 0006c, feature flag decentralized): + // "sqlite" (default) keeps the local single-node SQLite control plane; + // "kv" puts rooms/members/keys/users in replicated JetStream KV so any node + // in the cluster serves the same state. + storeBackend = flag.String("store", "sqlite", "control-plane store backend: sqlite (default, single-node) | kv (replicated JetStream, decentralized)") ) flag.Parse() @@ -80,6 +85,9 @@ func main() { if err != nil { log.Fatalf("%v", err) } + if *storeBackend != "sqlite" && *storeBackend != "kv" { + log.Fatalf("--store must be \"sqlite\" or \"kv\", got %q", *storeBackend) + } // Fail-open guard (audit H2): a non-loopback bind, or any TLS flag, demands // --bus-auth enforce. This makes an insecure public startup impossible rather @@ -97,12 +105,13 @@ func main() { log.SetPrefix("[membershipd] ") // A clustered node shares its control plane with peers, so it needs a JetStream - // client to manage the replicated nonce bucket (issue 0006a). 
needJS will also - // be true under --store kv (issue 0006c), where the control-plane state lives in - // JetStream KV. A standalone single-node deployment needs none of this and keeps - // the in-process, in-memory behavior unchanged. + // client to manage the replicated nonce bucket (issue 0006a). --store kv (issue + // 0006c) also needs JetStream, for the control-plane KV itself. A standalone + // single-node SQLite deployment needs none of this and keeps the in-process, + // in-memory behavior unchanged. clustered := *clusterName != "" - needJS := clustered + decentralized := *storeBackend == "kv" + needJS := clustered || decentralized enforce := authMode == membership.AuthEnforce // Internal service identity (issue 0006a): when the embedded data plane enforces @@ -121,14 +130,29 @@ func main() { internalPubHex = hex.EncodeToString(internalID.SignPub) } - // Control plane store first: the NATS authenticator consults IsAuthorized, so - // the store must exist before the embedded server starts. - store, err := membership.Open(*dbPath) - if err != nil { - log.Fatalf("open membership store: %v", err) + // The authenticator consults the store through a holder so it can be built + // before the store exists: with --store kv the JetStream KV store opens only + // after NATS is up (the bootstrap cycle). In the default SQLite path the store + // is opened and set into the holder right here, before the server starts, so + // behavior is identical to the pre-0006c baseline. `store` is the final store + // used by the HTTP server (set below for the KV path). 
+ holder := &storeHolder{} + var store membership.Store + if !decentralized { + store, err = membership.Open(*dbPath) + if err != nil { + log.Fatalf("open membership store: %v", err) + } + holder.set(store) + log.Printf("membership store: sqlite %s", *dbPath) } - defer store.Close() - log.Printf("membership store: %s", *dbPath) + // Close whichever store ends up final (SQLite closes its file; the JetStream KV + // store's Close is a no-op — its NATS connection is closed separately). + defer func() { + if store != nil { + store.Close() + } + }() blobs, err := blobstore.New(*storeDir) if err != nil { @@ -182,8 +206,8 @@ func main() { // NATS freezes permissions at connect time, so a peer that joins a room // after connecting must client.RefreshSession to gain that room's subject. cfg.Auth = busauth.NewNkeyAuthenticatorACLInternal( - store.IsAuthorized, - busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)), + holder.IsAuthorized, + busauth.PermissionsFromSubjects(holder.subjectACL), internalPubHex, ) log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)") @@ -209,6 +233,38 @@ func main() { log.Printf("using external NATS: %s", natsClientURL) } + // JetStream client + decentralized store (issue 0006a/c). needJS is set for a + // clustered node (shared nonce bucket) and for --store kv (the KV control + // plane). Open the privileged JetStream client first (in-process for the + // embedded server, a plain client for external NATS), then — for --store kv — + // open the replicated KV store and publish it into the holder so the + // authenticator and HTTP server serve from it. The privileged connection is the + // only client that can connect in this window (the holder still denies everyone + // else; the internal identity bypasses the store). 
+ var js jetstream.JetStream + if needJS { + var internalNC *nats.Conn + if *natsURL == "" { + internalNC, js, err = connectInternalJS(ns, internalID, enforce) + } else { + internalNC, js, err = connectExternalJS(natsClientURL, *caFile) + } + if err != nil { + log.Fatalf("internal JetStream connection (required by --cluster-name/--store kv): %v", err) + } + defer internalNC.Close() + + if decentralized { + kvStore, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: *kvReplicas}) + if err != nil { + log.Fatalf("open decentralized control-plane KV store: %v", err) + } + store = kvStore + holder.set(store) + log.Printf("membership store: jetstream KV (replicas=%d)", *kvReplicas) + } + } + srv := membership.NewServer(store, blobs, authMode) // On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS // has no per-subject ACL, so cleartext content would be readable by any @@ -220,30 +276,17 @@ func main() { } // Replicated anti-replay (issue 0006a, audit 0008 N3): a clustered node MUST - // share its nonce store across the cluster via JetStream KV, or a request - // accepted on one node can be replayed to another. Open a privileged JetStream - // client (in-process for the embedded server, a plain client for an external - // NATS) and wire the shared nonce bucket. This is a HARD requirement: if the - // bucket cannot be created the node refuses to start rather than run with a - // per-process cache that leaves the replay hole open. + // share its nonce store across the cluster, or a request accepted on one node + // can be replayed to another. HARD requirement: if the bucket cannot be created + // the node refuses to start rather than run with a per-process cache that leaves + // the replay hole open. 
if needJS { - var ( - internalNC *nats.Conn - js jetstream.JetStream - ) - if *natsURL == "" { - internalNC, js, err = connectInternalJS(ns, internalID, enforce) - } else { - internalNC, js, err = connectExternalJS(natsClientURL, *caFile) - } - if err != nil { - log.Fatalf("internal JetStream connection (required by --cluster-name): %v", err) - } - defer internalNC.Close() if err := wireReplicatedNonces(srv, js, clustered, *kvReplicas); err != nil { log.Fatalf("%v", err) } - log.Printf("anti-replay: replicated nonce bucket %q (replicas=%d) — cluster-safe", "KV_UNIBUS_nonces", *kvReplicas) + if clustered { + log.Printf("anti-replay: replicated nonce bucket \"KV_UNIBUS_nonces\" (replicas=%d) — cluster-safe", *kvReplicas) + } } log.Printf("control-plane auth: %s", authMode) diff --git a/cmd/membershipd/store_holder.go b/cmd/membershipd/store_holder.go new file mode 100644 index 0000000..493d1a8 --- /dev/null +++ b/cmd/membershipd/store_holder.go @@ -0,0 +1,60 @@ +package main + +import ( + "fmt" + "sync" + + "github.com/enmanuel/unibus/pkg/membership" +) + +// storeHolder is a concurrency-safe slot for the control-plane store, used to +// break the decentralized bootstrap cycle (issue 0006c): the NATS authenticator +// must be built BEFORE the embedded server starts, but the JetStream KV store can +// only be opened AFTER NATS is up (it needs a JetStream client). The authenticator +// therefore consults the holder instead of a concrete store. +// +// Fail-closed by construction: until the store is set, IsAuthorized denies and +// subjectACL errors, so any client connecting in the startup window is rejected. +// The only connection expected in that window is membershipd's own internal +// service identity, which the authenticator recognizes by key and lets through +// without consulting the store at all. In the SQLite (default) path the store is +// set before StartServer, so the window does not exist and behavior is identical +// to the pre-0006c baseline. 
+type storeHolder struct { + mu sync.RWMutex + s membership.Store +} + +func (h *storeHolder) set(s membership.Store) { + h.mu.Lock() + h.s = s + h.mu.Unlock() +} + +func (h *storeHolder) get() membership.Store { + h.mu.RLock() + defer h.mu.RUnlock() + return h.s +} + +// IsAuthorized reports whether signPubHex is an active bus user, denying while the +// store is not yet set (fail closed). It is the predicate the nkey authenticator +// uses for every connecting client. +func (h *storeHolder) IsAuthorized(signPubHex string) bool { + s := h.get() + if s == nil { + return false + } + return s.IsAuthorized(signPubHex) +} + +// subjectACL derives the per-subject permissions for signPubHex via the live +// store, erroring (so the caller fails closed and denies the connection) while the +// store is not yet set. +func (h *storeHolder) subjectACL(signPubHex string) ([]string, error) { + s := h.get() + if s == nil { + return nil, fmt.Errorf("control-plane store not ready") + } + return membership.SubjectACLFor(s)(signPubHex) +} diff --git a/cmd/membershipd/store_holder_test.go b/cmd/membershipd/store_holder_test.go new file mode 100644 index 0000000..27b35bc --- /dev/null +++ b/cmd/membershipd/store_holder_test.go @@ -0,0 +1,51 @@ +package main + +import ( + "encoding/hex" + "path/filepath" + "testing" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/membership" +) + +// TestStoreHolderFailClosed: an empty holder denies everything (the bootstrap +// window before the store is set), and starts serving once a store is published. +func TestStoreHolderFailClosed(t *testing.T) { + h := &storeHolder{} + + // Empty: deny + error (fail closed). + if h.IsAuthorized("anything") { + t.Fatalf("empty holder must deny IsAuthorized") + } + if _, err := h.subjectACL("anything"); err == nil { + t.Fatalf("empty holder must error from subjectACL (fail closed)") + } + + // After set: serves from the real store. 
+ store, err := membership.Open(filepath.Join(t.TempDir(), "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + id, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("identity: %v", err) + } + pub := hex.EncodeToString(id.SignPub) + if err := store.AddUser(pub, "alice", membership.RoleMember); err != nil { + t.Fatalf("add user: %v", err) + } + h.set(store) + + if !h.IsAuthorized(pub) { + t.Fatalf("after set, an active user must be authorized") + } + if _, err := h.subjectACL(pub); err != nil { + t.Fatalf("after set, subjectACL must succeed: %v", err) + } + if h.IsAuthorized("deadbeef") { + t.Fatalf("a non-user must not be authorized") + } +} diff --git a/dev/feature_flags.json b/dev/feature_flags.json index f58b826..686de2f 100644 --- a/dev/feature_flags.json +++ b/dev/feature_flags.json @@ -18,7 +18,7 @@ "decentralized": { "enabled": false, "issue": "0003", - "description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default OFF, jetstreamStore ON). The route cluster (0003a) and the KV store (0003b) ship behind this flag; the membershipd boot wiring that selects the KV store completes with the session/reconnect redesign (0003e) and is activated on the multi-node deploy (0003f). OFF keeps the single-node SQLite control plane unchanged.", + "description": "Control-plane state on replicated JetStream KV instead of local SQLite (branch-by-abstraction membership.Store: sqliteStore default, jetstreamStore opt-in). The route cluster (0003a) and the KV store (0003b) shipped behind this flag; the membershipd boot wiring that selects the store is COMPLETE since issue 0006c and is realized at runtime with the server flag --store kv|sqlite (default sqlite). The internal-identity bootstrap (0006a) lets membershipd open the KV store on its own embedded NATS under enforce. 
Per-deploy opt-in: a node joins the decentralized control plane by starting with --store kv (and --cluster-name for HA). OFF (--store sqlite) keeps the single-node SQLite control plane unchanged.", "added": "2026-06-07", "enabled_at": null }