feat(0003e/2): replicated anti-replay nonce store on JetStream KV
The per-process nonce cache breaks anti-replay under multi-node failover (audit 0004): a request captured on one node can be replayed to a DIFFERENT node whose local cache never saw the nonce, and is accepted. This makes the nonce state shared so a replay is rejected cluster-wide. pkg/membership: - nonceStore is now an interface. The in-memory cache is renamed memNonceCache (still the default, single-node behavior). - kvNonceStore (new) claims each nonce with an atomic KV Create on a shared bucket: first sight wins (accept), any later sight on any node rejects (replay). A backend error fails CLOSED (reject), so a KV outage never silently disables anti-replay. The bucket carries a TTL = nonceTTL (2*clockSkew) so a key expires exactly when its replay window closes; raw base64 nonces are mapped to KV-safe keys via sha256-hex. - Server.UseReplicatedNonces(js, replicas) swaps the store on a node; every node in a cluster calls it. NewServer still defaults to the in-memory cache (master behavior unchanged). Test (DoD error path — the issue's cross-node replay case): - TestReplicatedNonceRejectsCrossNodeReplay: two membershipd nodes share one KV bucket; a request accepted (200) on node A, replayed with the same ts+nonce to node B, is rejected (401) — and replaying to A again is rejected too.
This commit is contained in:
+19
-8
@@ -95,16 +95,27 @@ func CanonicalRequest(method, path, ts, nonce string, body []byte) []byte {
|
|||||||
return []byte(method + "\n" + path + "\n" + ts + "\n" + nonce + "\n" + hex.EncodeToString(sum[:]))
|
return []byte(method + "\n" + path + "\n" + ts + "\n" + nonce + "\n" + hex.EncodeToString(sum[:]))
|
||||||
}
|
}
|
||||||
|
|
||||||
// nonceCache remembers recently-seen nonces to reject replays. It is an
|
// nonceStore is the anti-replay backend: rememberOrReject records a nonce and
|
||||||
// in-memory store guarded by a mutex — sufficient for a single membershipd
|
// reports whether it was unseen (true -> accept) or already seen (false ->
|
||||||
// process (the spec's chosen tradeoff over a server-issued nonce round-trip). A
|
// reject the replay). It is an interface (issue 0003e) so the single-node
|
||||||
// distributed deployment would need a shared store (tracked for issue 0003).
|
// in-memory cache can be swapped for a replicated KV store: a per-process cache
|
||||||
|
// is BROKEN under multi-node failover (a request captured and replayed to a
|
||||||
|
// DIFFERENT node whose cache never saw the nonce would be accepted), so a
|
||||||
|
// cluster MUST share the nonce state. Every implementation fails CLOSED — a
|
||||||
|
// backend it cannot reach rejects rather than admits.
|
||||||
|
type nonceStore interface {
|
||||||
|
rememberOrReject(nonce string, now time.Time) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// memNonceCache remembers recently-seen nonces to reject replays. It is an
|
||||||
|
// in-memory store guarded by a mutex — sufficient for a SINGLE membershipd
|
||||||
|
// process. A clustered deployment uses kvNonceStore instead (issue 0003e).
|
||||||
//
|
//
|
||||||
// Pruning is O(expired), not O(n): because the TTL is constant, insertion order
|
// Pruning is O(expired), not O(n): because the TTL is constant, insertion order
|
||||||
// equals expiry order, so the oldest entries (front of `order`) are exactly the
|
// equals expiry order, so the oldest entries (front of `order`) are exactly the
|
||||||
// ones that expire first (audit H7 — the previous full-map scan under the mutex
|
// ones that expire first (audit H7 — the previous full-map scan under the mutex
|
||||||
// was a CPU-amplification vector). A size cap bounds memory.
|
// was a CPU-amplification vector). A size cap bounds memory.
|
||||||
type nonceCache struct {
|
type memNonceCache struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
seen map[string]time.Time // nonce -> expiry
|
seen map[string]time.Time // nonce -> expiry
|
||||||
order []string // nonces in insertion order == expiry order
|
order []string // nonces in insertion order == expiry order
|
||||||
@@ -112,13 +123,13 @@ type nonceCache struct {
|
|||||||
cap int
|
cap int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newNonceCache(ttl time.Duration, capacity int) *nonceCache {
|
func newMemNonceCache(ttl time.Duration, capacity int) *memNonceCache {
|
||||||
return &nonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity}
|
return &memNonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity}
|
||||||
}
|
}
|
||||||
|
|
||||||
// rememberOrReject records nonce and returns true if it was unseen, or false if
|
// rememberOrReject records nonce and returns true if it was unseen, or false if
|
||||||
// it is a replay (still live in the cache).
|
// it is a replay (still live in the cache).
|
||||||
func (n *nonceCache) rememberOrReject(nonce string, now time.Time) bool {
|
func (n *memNonceCache) rememberOrReject(nonce string, now time.Time) bool {
|
||||||
n.mu.Lock()
|
n.mu.Lock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.Unlock()
|
||||||
|
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import (
|
|||||||
// (error), and after the TTL the same nonce is accepted again because its entry
|
// (error), and after the TTL the same nonce is accepted again because its entry
|
||||||
// was pruned (edge).
|
// was pruned (edge).
|
||||||
func TestNonceCacheRememberPrune(t *testing.T) {
|
func TestNonceCacheRememberPrune(t *testing.T) {
|
||||||
nc := newNonceCache(50*time.Millisecond, 1000)
|
nc := newMemNonceCache(50*time.Millisecond, 1000)
|
||||||
base := time.Now()
|
base := time.Now()
|
||||||
|
|
||||||
if !nc.rememberOrReject("a", base) {
|
if !nc.rememberOrReject("a", base) {
|
||||||
@@ -31,7 +31,7 @@ func TestNonceCacheRememberPrune(t *testing.T) {
|
|||||||
// from the map.
|
// from the map.
|
||||||
func TestNonceCacheCapBounded(t *testing.T) {
|
func TestNonceCacheCapBounded(t *testing.T) {
|
||||||
const capacity = 100
|
const capacity = 100
|
||||||
nc := newNonceCache(time.Hour, capacity)
|
nc := newMemNonceCache(time.Hour, capacity)
|
||||||
base := time.Now()
|
base := time.Now()
|
||||||
for i := 0; i < 500; i++ {
|
for i := 0; i < 500; i++ {
|
||||||
nc.rememberOrReject("n"+strconv.Itoa(i), base)
|
nc.rememberOrReject("n"+strconv.Itoa(i), base)
|
||||||
|
|||||||
@@ -0,0 +1,77 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
// kvNonceStore is the replicated anti-replay backend (issue 0003e): seen nonces
|
||||||
|
// live in a JetStream KV bucket shared by every node, with a per-key TTL so they
|
||||||
|
// expire on their own. This closes the multi-node replay hole the auditor
|
||||||
|
// flagged: the per-process memNonceCache let an attacker replay a captured
|
||||||
|
// request to a DIFFERENT node, whose local cache never saw the nonce. With the
|
||||||
|
// shared bucket the first node to see a nonce wins the atomic Create, and every
|
||||||
|
// other node rejects the replay.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
|
)
|
||||||
|
|
||||||
|
const bucketNonces = "UNIBUS_nonces"
|
||||||
|
|
||||||
|
type kvNonceStore struct {
|
||||||
|
kv jetstream.KeyValue
|
||||||
|
opTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// newKVNonceStore creates (or opens) the replicated nonce bucket. ttl is the
|
||||||
|
// per-key expiry — it must be >= the request acceptance window (2*clockSkew) so
|
||||||
|
// a replay can never outlive its memory, exactly like the in-memory cache's TTL.
|
||||||
|
func newKVNonceStore(js jetstream.JetStream, ttl time.Duration, replicas int, opTimeout time.Duration) (*kvNonceStore, error) {
|
||||||
|
if replicas <= 0 {
|
||||||
|
replicas = 1
|
||||||
|
}
|
||||||
|
if opTimeout <= 0 {
|
||||||
|
opTimeout = defaultKVOpTime
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
|
||||||
|
Bucket: bucketNonces,
|
||||||
|
TTL: ttl,
|
||||||
|
Replicas: replicas,
|
||||||
|
History: 1,
|
||||||
|
Storage: jetstream.FileStorage,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("membership: open nonce KV bucket (replicas=%d): %w", replicas, err)
|
||||||
|
}
|
||||||
|
return &kvNonceStore{kv: kv, opTimeout: opTimeout}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// nonceKVKey maps a raw nonce (std-base64, which contains '+' '/' '=' that KV
|
||||||
|
// keys forbid) to a KV-safe token: the hex of its sha256. Deterministic, so the
|
||||||
|
// same nonce always maps to the same key, and collision-free in practice.
|
||||||
|
func nonceKVKey(nonce string) string {
|
||||||
|
sum := sha256.Sum256([]byte(nonce))
|
||||||
|
return hex.EncodeToString(sum[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
// rememberOrReject atomically claims the nonce: Create succeeds only if the key
|
||||||
|
// is absent, so the first sight returns true (accept) and any later sight (a
|
||||||
|
// replay, on this or any other node sharing the bucket) returns false. A backend
|
||||||
|
// error fails CLOSED — reject — so a KV outage never silently disables
|
||||||
|
// anti-replay. The TTL on the bucket expires the key, reopening the window.
|
||||||
|
func (s *kvNonceStore) rememberOrReject(nonce string, _ time.Time) bool {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), s.opTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if _, err := s.kv.Create(ctx, nonceKVKey(nonce), nil); err != nil {
|
||||||
|
if errors.Is(err, jetstream.ErrKeyExists) {
|
||||||
|
return false // replay: already claimed
|
||||||
|
}
|
||||||
|
return false // backend unreachable: fail closed
|
||||||
|
}
|
||||||
|
return true // first sight: accept
|
||||||
|
}
|
||||||
@@ -0,0 +1,117 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/hex"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cs "fn-registry/functions/cybersecurity"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||||
|
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||||
|
"github.com/enmanuel/unibus/pkg/frame"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestReplicatedNonceRejectsCrossNodeReplay is the issue's mandated error path:
|
||||||
|
// with the shared KV nonce store, a request accepted on node A is rejected as a
|
||||||
|
// replay when the SAME signed bytes are sent to node B. This closes the
|
||||||
|
// multi-node replay hole that the per-process cache left open.
|
||||||
|
func TestReplicatedNonceRejectsCrossNodeReplay(t *testing.T) {
|
||||||
|
// One NATS+JetStream backing the shared nonce bucket.
|
||||||
|
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||||
|
StoreDir: t.TempDir(), Host: "127.0.0.1", Port: kvFreePort(t),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("nats: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||||
|
nc, err := nats.Connect(ns.ClientURL())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("connect: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(nc.Close)
|
||||||
|
js, err := jetstream.New(nc)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("jetstream: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// One shared SQLite store (simulating the replicated control-plane state) and
|
||||||
|
// two membershipd servers (two nodes) that BOTH use the shared KV nonce store.
|
||||||
|
dir := t.TempDir()
|
||||||
|
store, err := Open(filepath.Join(dir, "unibus.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("store: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { store.Close() })
|
||||||
|
alice, err := cs.GenerateIdentity()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("identity: %v", err)
|
||||||
|
}
|
||||||
|
alicePub := hex.EncodeToString(alice.SignPub)
|
||||||
|
if err := store.AddUser(alicePub, "alice", RoleAdmin); err != nil {
|
||||||
|
t.Fatalf("add alice: %v", err)
|
||||||
|
}
|
||||||
|
blobs, _ := blobstore.New(filepath.Join(dir, "blobs"))
|
||||||
|
|
||||||
|
mkNode := func() *httptest.Server {
|
||||||
|
srv := NewServer(store, blobs, AuthEnforce)
|
||||||
|
if err := srv.UseReplicatedNonces(js, 1); err != nil {
|
||||||
|
t.Fatalf("UseReplicatedNonces: %v", err)
|
||||||
|
}
|
||||||
|
return httptest.NewServer(srv)
|
||||||
|
}
|
||||||
|
nodeA := mkNode()
|
||||||
|
t.Cleanup(nodeA.Close)
|
||||||
|
nodeB := mkNode()
|
||||||
|
t.Cleanup(nodeB.Close)
|
||||||
|
|
||||||
|
// Build ONE signed request (fixed ts+nonce) and send the identical bytes to
|
||||||
|
// both nodes. Authenticated path: alice listing her own rooms (200, empty).
|
||||||
|
ts := time.Now().Unix()
|
||||||
|
nonceRaw := make([]byte, 16)
|
||||||
|
if _, err := rand.Read(nonceRaw); err != nil {
|
||||||
|
t.Fatalf("nonce: %v", err)
|
||||||
|
}
|
||||||
|
nonce := base64.StdEncoding.EncodeToString(nonceRaw)
|
||||||
|
path := "/members/" + frame.EndpointID(alice.SignPub) + "/rooms"
|
||||||
|
|
||||||
|
reqA := signedReq(t, nodeA.URL, "GET", path, nil, alice, ts, nonce)
|
||||||
|
respA, err := http.DefaultClient.Do(reqA)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("do A: %v", err)
|
||||||
|
}
|
||||||
|
respA.Body.Close()
|
||||||
|
if respA.StatusCode != http.StatusOK {
|
||||||
|
t.Fatalf("node A first use: status %d, want 200 (auth should pass, nonce fresh)", respA.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replay the SAME ts+nonce to node B: the shared bucket already holds the
|
||||||
|
// nonce, so node B must reject it.
|
||||||
|
reqB := signedReq(t, nodeB.URL, "GET", path, nil, alice, ts, nonce)
|
||||||
|
respB, err := http.DefaultClient.Do(reqB)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("do B: %v", err)
|
||||||
|
}
|
||||||
|
respB.Body.Close()
|
||||||
|
if respB.StatusCode != http.StatusUnauthorized {
|
||||||
|
t.Fatalf("cross-node replay to node B: status %d, want 401 (replayed nonce)", respB.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
// And replaying to node A again is likewise rejected (same bucket).
|
||||||
|
reqA2 := signedReq(t, nodeA.URL, "GET", path, nil, alice, ts, nonce)
|
||||||
|
respA2, err := http.DefaultClient.Do(reqA2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("do A2: %v", err)
|
||||||
|
}
|
||||||
|
respA2.Body.Close()
|
||||||
|
if respA2.StatusCode != http.StatusUnauthorized {
|
||||||
|
t.Fatalf("replay to node A: status %d, want 401", respA2.StatusCode)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -19,6 +19,7 @@ import (
|
|||||||
|
|
||||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||||
"github.com/enmanuel/unibus/pkg/frame"
|
"github.com/enmanuel/unibus/pkg/frame"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Body-size ceilings for the control plane. They bound how much an unauthenticated
|
// Body-size ceilings for the control plane. They bound how much an unauthenticated
|
||||||
@@ -59,7 +60,7 @@ type Server struct {
|
|||||||
blobs blobstore.Store
|
blobs blobstore.Store
|
||||||
mux *http.ServeMux
|
mux *http.ServeMux
|
||||||
authMode AuthMode
|
authMode AuthMode
|
||||||
nonces *nonceCache
|
nonces nonceStore
|
||||||
limiter *ipRateLimiter
|
limiter *ipRateLimiter
|
||||||
|
|
||||||
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
|
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
|
||||||
@@ -84,13 +85,29 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
|
|||||||
blobs: blobs,
|
blobs: blobs,
|
||||||
mux: http.NewServeMux(),
|
mux: http.NewServeMux(),
|
||||||
authMode: authMode,
|
authMode: authMode,
|
||||||
nonces: newNonceCache(nonceTTL, maxNonceCacheEntries),
|
nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries),
|
||||||
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
|
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
|
||||||
}
|
}
|
||||||
s.routes()
|
s.routes()
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UseReplicatedNonces switches the server's anti-replay store from the
|
||||||
|
// per-process in-memory cache to a JetStream KV bucket shared across the cluster
|
||||||
|
// (issue 0003e). It MUST be called on every node of a multi-node deployment:
|
||||||
|
// otherwise a request captured on one node can be replayed to another whose
|
||||||
|
// local cache never saw the nonce. replicas is the bucket's replication factor
|
||||||
|
// (R1..R3). The TTL matches the in-memory cache (nonceTTL = 2*clockSkew), so a
|
||||||
|
// replay can never outlive its memory.
|
||||||
|
func (s *Server) UseReplicatedNonces(js jetstream.JetStream, replicas int) error {
|
||||||
|
ns, err := newKVNonceStore(js, nonceTTL, replicas, 0)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.nonces = ns
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// ServeHTTP satisfies http.Handler. It runs the control-plane auth middleware
|
// ServeHTTP satisfies http.Handler. It runs the control-plane auth middleware
|
||||||
// (signature verification + anti-replay + allowlist) ahead of the router
|
// (signature verification + anti-replay + allowlist) ahead of the router
|
||||||
// according to authMode, then dispatches to the matched handler.
|
// according to authMode, then dispatches to the matched handler.
|
||||||
|
|||||||
Reference in New Issue
Block a user