From 37c778ca9adf2bb0f7e405c6e254cff388e3b3e9 Mon Sep 17 00:00:00 2001 From: agent Date: Sun, 7 Jun 2026 15:21:45 +0200 Subject: [PATCH] feat(0003e/2): replicated anti-replay nonce store on JetStream KV MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The per-process nonce cache breaks anti-replay under multi-node failover (audit 0004): a request captured on one node can be replayed to a DIFFERENT node whose local cache never saw the nonce, and is accepted. This makes the nonce state shared so a replay is rejected cluster-wide. pkg/membership: - nonceStore is now an interface. The in-memory cache is renamed memNonceCache (still the default, single-node behavior). - kvNonceStore (new) claims each nonce with an atomic KV Create on a shared bucket: first sight wins (accept), any later sight on any node rejects (replay). A backend error fails CLOSED (reject), so a KV outage never silently disables anti-replay. The bucket carries a TTL = nonceTTL (2*clockSkew) so a key expires exactly when its replay window closes; raw base64 nonces are mapped to KV-safe keys via sha256-hex. - Server.UseReplicatedNonces(js, replicas) swaps the store on a node; every node in a cluster calls it. NewServer still defaults to the in-memory cache (master behavior unchanged). Test (DoD error path — the issue's cross-node replay case): - TestReplicatedNonceRejectsCrossNodeReplay: two membershipd nodes share one KV bucket; a request accepted (200) on node A, replayed with the same ts+nonce to node B, is rejected (401) — and replaying to A again is rejected too. --- pkg/membership/auth.go | 27 +++++-- pkg/membership/nonce_cache_test.go | 4 +- pkg/membership/nonce_kv.go | 77 +++++++++++++++++++ pkg/membership/nonce_kv_test.go | 117 +++++++++++++++++++++++++++++ pkg/membership/server.go | 21 +++++- 5 files changed, 234 insertions(+), 12 deletions(-) create mode 100644 pkg/membership/nonce_kv.go create mode 100644 pkg/membership/nonce_kv_test.go diff --git a/pkg/membership/auth.go b/pkg/membership/auth.go index c4cf354..e151bf4 100644 --- a/pkg/membership/auth.go +++ b/pkg/membership/auth.go @@ -95,16 +95,27 @@ func CanonicalRequest(method, path, ts, nonce string, body []byte) []byte { return []byte(method + "\n" + path + "\n" + ts + "\n" + nonce + "\n" + hex.EncodeToString(sum[:])) } -// nonceCache remembers recently-seen nonces to reject replays. It is an -// in-memory store guarded by a mutex — sufficient for a single membershipd -// process (the spec's chosen tradeoff over a server-issued nonce round-trip). A -// distributed deployment would need a shared store (tracked for issue 0003). +// nonceStore is the anti-replay backend: rememberOrReject records a nonce and +// reports whether it was unseen (true -> accept) or already seen (false -> +// reject the replay). It is an interface (issue 0003e) so the single-node +// in-memory cache can be swapped for a replicated KV store: a per-process cache +// is BROKEN under multi-node failover (a request captured and replayed to a +// DIFFERENT node whose cache never saw the nonce would be accepted), so a +// cluster MUST share the nonce state. Every implementation fails CLOSED — a +// backend it cannot reach rejects rather than admits. +type nonceStore interface { + rememberOrReject(nonce string, now time.Time) bool +} + +// memNonceCache remembers recently-seen nonces to reject replays. It is an +// in-memory store guarded by a mutex — sufficient for a SINGLE membershipd +// process. A clustered deployment uses kvNonceStore instead (issue 0003e). // // Pruning is O(expired), not O(n): because the TTL is constant, insertion order // equals expiry order, so the oldest entries (front of `order`) are exactly the // ones that expire first (audit H7 — the previous full-map scan under the mutex // was a CPU-amplification vector). A size cap bounds memory. -type nonceCache struct { +type memNonceCache struct { mu sync.Mutex seen map[string]time.Time // nonce -> expiry order []string // nonces in insertion order == expiry order @@ -112,13 +123,13 @@ type nonceCache struct { cap int } -func newNonceCache(ttl time.Duration, capacity int) *nonceCache { - return &nonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity} +func newMemNonceCache(ttl time.Duration, capacity int) *memNonceCache { + return &memNonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity} } // rememberOrReject records nonce and returns true if it was unseen, or false if // it is a replay (still live in the cache). -func (n *nonceCache) rememberOrReject(nonce string, now time.Time) bool { +func (n *memNonceCache) rememberOrReject(nonce string, now time.Time) bool { n.mu.Lock() defer n.mu.Unlock() diff --git a/pkg/membership/nonce_cache_test.go b/pkg/membership/nonce_cache_test.go index 0ff102a..18ce0dc 100644 --- a/pkg/membership/nonce_cache_test.go +++ b/pkg/membership/nonce_cache_test.go @@ -11,7 +11,7 @@ import ( // (error), and after the TTL the same nonce is accepted again because its entry // was pruned (edge). func TestNonceCacheRememberPrune(t *testing.T) { - nc := newNonceCache(50*time.Millisecond, 1000) + nc := newMemNonceCache(50*time.Millisecond, 1000) base := time.Now() if !nc.rememberOrReject("a", base) { @@ -31,7 +31,7 @@ func TestNonceCacheRememberPrune(t *testing.T) { // from the map. func TestNonceCacheCapBounded(t *testing.T) { const capacity = 100 - nc := newNonceCache(time.Hour, capacity) + nc := newMemNonceCache(time.Hour, capacity) base := time.Now() for i := 0; i < 500; i++ { nc.rememberOrReject("n"+strconv.Itoa(i), base) diff --git a/pkg/membership/nonce_kv.go b/pkg/membership/nonce_kv.go new file mode 100644 index 0000000..07f91e1 --- /dev/null +++ b/pkg/membership/nonce_kv.go @@ -0,0 +1,77 @@ +package membership + +// kvNonceStore is the replicated anti-replay backend (issue 0003e): seen nonces +// live in a JetStream KV bucket shared by every node, with a per-key TTL so they +// expire on their own. This closes the multi-node replay hole the auditor +// flagged: the per-process memNonceCache let an attacker replay a captured +// request to a DIFFERENT node, whose local cache never saw the nonce. With the +// shared bucket the first node to see a nonce wins the atomic Create, and every +// other node rejects the replay. + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +const bucketNonces = "UNIBUS_nonces" + +type kvNonceStore struct { + kv jetstream.KeyValue + opTimeout time.Duration +} + +// newKVNonceStore creates (or opens) the replicated nonce bucket. ttl is the +// per-key expiry — it must be >= the request acceptance window (2*clockSkew) so +// a replay can never outlive its memory, exactly like the in-memory cache's TTL. +func newKVNonceStore(js jetstream.JetStream, ttl time.Duration, replicas int, opTimeout time.Duration) (*kvNonceStore, error) { + if replicas <= 0 { + replicas = 1 + } + if opTimeout <= 0 { + opTimeout = defaultKVOpTime + } + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: bucketNonces, + TTL: ttl, + Replicas: replicas, + History: 1, + Storage: jetstream.FileStorage, + }) + if err != nil { + return nil, fmt.Errorf("membership: open nonce KV bucket (replicas=%d): %w", replicas, err) + } + return &kvNonceStore{kv: kv, opTimeout: opTimeout}, nil +} + +// nonceKVKey maps a raw nonce (std-base64, which contains '+' '/' '=' that KV +// keys forbid) to a KV-safe token: the hex of its sha256. Deterministic, so the +// same nonce always maps to the same key, and collision-free in practice. +func nonceKVKey(nonce string) string { + sum := sha256.Sum256([]byte(nonce)) + return hex.EncodeToString(sum[:]) +} + +// rememberOrReject atomically claims the nonce: Create succeeds only if the key +// is absent, so the first sight returns true (accept) and any later sight (a +// replay, on this or any other node sharing the bucket) returns false. A backend +// error fails CLOSED — reject — so a KV outage never silently disables +// anti-replay. The TTL on the bucket expires the key, reopening the window. +func (s *kvNonceStore) rememberOrReject(nonce string, _ time.Time) bool { + ctx, cancel := context.WithTimeout(context.Background(), s.opTimeout) + defer cancel() + if _, err := s.kv.Create(ctx, nonceKVKey(nonce), nil); err != nil { + if errors.Is(err, jetstream.ErrKeyExists) { + return false // replay: already claimed + } + return false // backend unreachable: fail closed + } + return true // first sight: accept +} diff --git a/pkg/membership/nonce_kv_test.go b/pkg/membership/nonce_kv_test.go new file mode 100644 index 0000000..2f9a88f --- /dev/null +++ b/pkg/membership/nonce_kv_test.go @@ -0,0 +1,117 @@ +package membership + +import ( + "crypto/rand" + "encoding/base64" + "encoding/hex" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// TestReplicatedNonceRejectsCrossNodeReplay is the issue's mandated error path: +// with the shared KV nonce store, a request accepted on node A is rejected as a +// replay when the SAME signed bytes are sent to node B. This closes the +// multi-node replay hole that the per-process cache left open. +func TestReplicatedNonceRejectsCrossNodeReplay(t *testing.T) { + // One NATS+JetStream backing the shared nonce bucket. + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), Host: "127.0.0.1", Port: kvFreePort(t), + }) + if err != nil { + t.Fatalf("nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() }) + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + t.Fatalf("connect: %v", err) + } + t.Cleanup(nc.Close) + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("jetstream: %v", err) + } + + // One shared SQLite store (simulating the replicated control-plane state) and + // two membershipd servers (two nodes) that BOTH use the shared KV nonce store. + dir := t.TempDir() + store, err := Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + alice, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("identity: %v", err) + } + alicePub := hex.EncodeToString(alice.SignPub) + if err := store.AddUser(alicePub, "alice", RoleAdmin); err != nil { + t.Fatalf("add alice: %v", err) + } + blobs, _ := blobstore.New(filepath.Join(dir, "blobs")) + + mkNode := func() *httptest.Server { + srv := NewServer(store, blobs, AuthEnforce) + if err := srv.UseReplicatedNonces(js, 1); err != nil { + t.Fatalf("UseReplicatedNonces: %v", err) + } + return httptest.NewServer(srv) + } + nodeA := mkNode() + t.Cleanup(nodeA.Close) + nodeB := mkNode() + t.Cleanup(nodeB.Close) + + // Build ONE signed request (fixed ts+nonce) and send the identical bytes to + // both nodes. Authenticated path: alice listing her own rooms (200, empty). + ts := time.Now().Unix() + nonceRaw := make([]byte, 16) + if _, err := rand.Read(nonceRaw); err != nil { + t.Fatalf("nonce: %v", err) + } + nonce := base64.StdEncoding.EncodeToString(nonceRaw) + path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms" + + reqA := signedReq(t, nodeA.URL, "GET", path, nil, alice, ts, nonce) + respA, err := http.DefaultClient.Do(reqA) + if err != nil { + t.Fatalf("do A: %v", err) + } + respA.Body.Close() + if respA.StatusCode != http.StatusOK { + t.Fatalf("node A first use: status %d, want 200 (auth should pass, nonce fresh)", respA.StatusCode) + } + + // Replay the SAME ts+nonce to node B: the shared bucket already holds the + // nonce, so node B must reject it. + reqB := signedReq(t, nodeB.URL, "GET", path, nil, alice, ts, nonce) + respB, err := http.DefaultClient.Do(reqB) + if err != nil { + t.Fatalf("do B: %v", err) + } + respB.Body.Close() + if respB.StatusCode != http.StatusUnauthorized { + t.Fatalf("cross-node replay to node B: status %d, want 401 (replayed nonce)", respB.StatusCode) + } + + // And replaying to node A again is likewise rejected (same bucket). + reqA2 := signedReq(t, nodeA.URL, "GET", path, nil, alice, ts, nonce) + respA2, err := http.DefaultClient.Do(reqA2) + if err != nil { + t.Fatalf("do A2: %v", err) + } + respA2.Body.Close() + if respA2.StatusCode != http.StatusUnauthorized { + t.Fatalf("replay to node A: status %d, want 401", respA2.StatusCode) + } +} diff --git a/pkg/membership/server.go b/pkg/membership/server.go index 1b5758b..2159605 100644 --- a/pkg/membership/server.go +++ b/pkg/membership/server.go @@ -19,6 +19,7 @@ import ( "github.com/enmanuel/unibus/pkg/blobstore" "github.com/enmanuel/unibus/pkg/frame" + "github.com/nats-io/nats.go/jetstream" ) // Body-size ceilings for the control plane. They bound how much an unauthenticated @@ -59,7 +60,7 @@ type Server struct { blobs blobstore.Store mux *http.ServeMux authMode AuthMode - nonces *nonceCache + nonces nonceStore limiter *ipRateLimiter // RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS) @@ -84,13 +85,29 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server { blobs: blobs, mux: http.NewServeMux(), authMode: authMode, - nonces: newNonceCache(nonceTTL, maxNonceCacheEntries), + nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries), limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL), } s.routes() return s } +// UseReplicatedNonces switches the server's anti-replay store from the +// per-process in-memory cache to a JetStream KV bucket shared across the cluster +// (issue 0003e). It MUST be called on every node of a multi-node deployment: +// otherwise a request captured on one node can be replayed to another whose +// local cache never saw the nonce. replicas is the bucket's replication factor +// (R1..R3). The TTL matches the in-memory cache (nonceTTL = 2*clockSkew), so a +// replay can never outlive its memory. +func (s *Server) UseReplicatedNonces(js jetstream.JetStream, replicas int) error { + ns, err := newKVNonceStore(js, nonceTTL, replicas, 0) + if err != nil { + return err + } + s.nonces = ns + return nil +} + // ServeHTTP satisfies http.Handler. It runs the control-plane auth middleware // (signature verification + anti-replay + allowlist) ahead of the router // according to authMode, then dispatches to the matched handler.