diff --git a/cmd/membershipd/main.go b/cmd/membershipd/main.go
index 23a444b..f7bdc5d 100644
--- a/cmd/membershipd/main.go
+++ b/cmd/membershipd/main.go
@@ -113,7 +113,19 @@ func main() {
 	srv := membership.NewServer(store, blobs, authMode)
 	log.Printf("control-plane auth: %s", authMode)
 	addr := *bind + ":" + *httpPort
-	httpSrv := &http.Server{Addr: addr, Handler: srv}
+	httpSrv := &http.Server{
+		Addr:    addr,
+		Handler: srv,
+		// Bound request header size so a peer cannot exhaust memory with huge
+		// headers before any body limit applies (the body ceilings live in the
+		// membership middleware).
+		MaxHeaderBytes:    membership.MaxHeaderBytes,
+		ReadHeaderTimeout: 10 * time.Second,
+		// Reap idle keep-alive connections. With IdleTimeout unset net/http falls
+		// back to ReadTimeout, which is also unset here, so an idle peer could
+		// otherwise hold its connection open indefinitely.
+		IdleTimeout: 2 * time.Minute,
+	}
 
 	go func() {
 		log.Printf("HTTP control-plane API: http://%s", addr)
diff --git a/go.mod b/go.mod
index c990d4c..c6ef254 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,9 @@ require (
 	fn-registry v0.0.0-00010101000000-000000000000
 	github.com/nats-io/nats-server/v2 v2.10.22
 	github.com/nats-io/nats.go v1.37.0
+	github.com/nats-io/nkeys v0.4.7
 	github.com/oklog/ulid/v2 v2.1.0
+	golang.org/x/time v0.7.0
 	modernc.org/sqlite v1.47.0
 )
 
@@ -19,7 +21,6 @@ require (
 	github.com/mattn/go-isatty v0.0.20 // indirect
 	github.com/minio/highwayhash v1.0.3 // indirect
 	github.com/nats-io/jwt/v2 v2.5.8 // indirect
-	github.com/nats-io/nkeys v0.4.7 // indirect
 	github.com/nats-io/nuid v1.0.1 // indirect
 	github.com/ncruces/go-strftime v1.0.0 // indirect
 	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
@@ -29,7 +30,6 @@ require (
 	golang.org/x/sync v0.20.0 // indirect
 	golang.org/x/sys v0.44.0 // indirect
 	golang.org/x/text v0.37.0 // indirect
-	golang.org/x/time v0.7.0 // indirect
 	golang.org/x/tools v0.45.0 // indirect
 	modernc.org/libc v1.70.0 // indirect
 	modernc.org/mathutil v1.7.1 // indirect
diff --git a/go.sum b/go.sum
index 960e1a7..7a566ff 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,7 @@
 github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -47,6 +49,10 @@ golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8= golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0= +golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM= +golang.org/x/tools/go/expect v0.1.1-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY= +golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated h1:1h2MnaIAIXISqTFKdENegdpAgUXz6NrPEsbIeWaBRvM= +golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated/go.mod h1:RVAQXBGNv1ib0J382/DPCRS/BPnsGebyM1Gj5VSDpG8= modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw= diff --git a/pkg/membership/dos_test.go b/pkg/membership/dos_test.go new file mode 100644 index 0000000..aa03eea --- /dev/null +++ b/pkg/membership/dos_test.go @@ -0,0 +1,206 @@ +package membership + +import ( + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "testing" + + "github.com/enmanuel/unibus/pkg/blobstore" +) + +// dosServer builds a Server backed by a fresh store + blob store so a 
test can +// drive ServeHTTP in-process (white-box) and observe its memory behavior without +// a network round trip — the same in-process technique the auditor used. +func dosServer(t *testing.T, mode AuthMode) *Server { + t.Helper() + dir := t.TempDir() + store, err := Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("open store: %v", err) + } + blobs, err := blobstore.New(filepath.Join(dir, "blobs")) + if err != nil { + t.Fatalf("open blobs: %v", err) + } + t.Cleanup(func() { store.Close() }) + return NewServer(store, blobs, mode) +} + +// zeroReader yields up to remaining zero bytes without ever allocating them, so +// the test process itself never materializes a huge buffer (which would taint the +// RSS measurement we are trying to make about the SERVER). +type zeroReader struct{ remaining int64 } + +func (z *zeroReader) Read(p []byte) (int, error) { + if z.remaining <= 0 { + return 0, io.EOF + } + n := int64(len(p)) + if n > z.remaining { + n = z.remaining + } + for i := int64(0); i < n; i++ { + p[i] = 0 + } + z.remaining -= n + return int(n), nil +} + +// vmRSSkB reads the resident set size (kB) of this process from /proc. Linux-only; +// the caller skips on other platforms. +func vmRSSkB(t *testing.T) int64 { + t.Helper() + b, err := os.ReadFile("/proc/self/status") + if err != nil { + t.Skipf("cannot read /proc/self/status: %v", err) + } + for _, line := range strings.Split(string(b), "\n") { + if strings.HasPrefix(line, "VmRSS:") { + f := strings.Fields(line) + if len(f) >= 2 { + v, _ := strconv.ParseInt(f[1], 10, 64) + return v + } + } + } + t.Skip("VmRSS not present in /proc/self/status") + return 0 +} + +// TestAudit_DoSBodyLimitNoAuth ports the auditor's H1 (Critical) vector: a peer +// with NO valid signature posts an oversized body. Before the fix the middleware +// io.ReadAll'd it unbounded (the auditor sent 400 MB and watched RSS jump from +// 18 MB to 898 MB). 
Now the request is rejected 413 and the resident set does NOT +// spike. Two shapes are covered: +// +// (1) a truthful, over-ceiling Content-Length -> rejected before any byte is read; +// (2) a lying / unknown length (chunked) -> MaxBytesReader trips mid-read, +// capping the buffered bytes at the ceiling instead of the attacker's 400 MB. +func TestAudit_DoSBodyLimitNoAuth(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("RSS probe is Linux-only") + } + srv := dosServer(t, AuthEnforce) // enforce: the request carries no signature + + const huge = int64(400) << 20 // 400 MiB — the auditor's figure + // A spike threshold an order of magnitude below the attack. The old code would + // add ~400 MB+; the fix keeps the delta to at most one bounded buffer. + const maxSpikeKB = int64(96) << 10 // 96 MiB + + // Shape 1: declared Content-Length over the blob ceiling -> early 413, no read. + runtime.GC() + before := vmRSSkB(t) + req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: huge}) + req.ContentLength = huge + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, req) + if rec.Code != http.StatusRequestEntityTooLarge { + t.Fatalf("over-declared body should be 413, got %d", rec.Code) + } + runtime.GC() + if d := vmRSSkB(t) - before; d > maxSpikeKB { + t.Fatalf("RSS spiked %d kB on a pre-declared oversized body (limit %d kB)", d, maxSpikeKB) + } + + // Shape 2: unknown length (chunked-style). The middleware cannot reject by + // Content-Length, so MaxBytesReader must cap the read at maxBlobBytes. 
+ runtime.GC() + before = vmRSSkB(t) + req = httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: huge}) + req.ContentLength = -1 + rec = httptest.NewRecorder() + srv.ServeHTTP(rec, req) + if rec.Code != http.StatusRequestEntityTooLarge { + t.Fatalf("unknown-length oversized body should be 413, got %d", rec.Code) + } + runtime.GC() + if d := vmRSSkB(t) - before; d > maxSpikeKB { + t.Fatalf("RSS spiked %d kB on a chunked oversized body (limit %d kB)", d, maxSpikeKB) + } +} + +// TestBlobLimitGoldenAndBoundary covers the golden path (a normal blob is stored) +// and the boundary (a body exactly at the ceiling is accepted; one byte over by +// truthful Content-Length is rejected before buffering). +func TestBlobLimitGoldenAndBoundary(t *testing.T) { + srv := dosServer(t, AuthOff) // AuthOff: the limits apply regardless of auth mode + + // Golden: a small blob is accepted and hashed. + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello blob"))) + if rec.Code != http.StatusOK { + t.Fatalf("normal blob should be 200, got %d (%s)", rec.Code, rec.Body.String()) + } + + // Boundary: exactly at the ceiling is allowed (MaxBytesReader permits N bytes). + atLimit := strings.Repeat("a", maxBlobBytes) + rec = httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader(atLimit)) + req.ContentLength = int64(len(atLimit)) + srv.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("blob exactly at the ceiling should be 200, got %d", rec.Code) + } + + // Error: one byte over the ceiling (truthful Content-Length) -> 413 pre-read. 
+ rec = httptest.NewRecorder() + req = httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: maxBlobBytes + 1}) + req.ContentLength = maxBlobBytes + 1 + srv.ServeHTTP(rec, req) + if rec.Code != http.StatusRequestEntityTooLarge { + t.Fatalf("blob one byte over the ceiling should be 413, got %d", rec.Code) + } +} + +// TestControlBodyLimit checks the smaller JSON ceiling on a non-blob route: a body +// over maxControlBodyBytes is rejected 413 before the handler runs. +func TestControlBodyLimit(t *testing.T) { + srv := dosServer(t, AuthOff) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/rooms", &zeroReader{remaining: maxControlBodyBytes + 1}) + req.ContentLength = maxControlBodyBytes + 1 + srv.ServeHTTP(rec, req) + if rec.Code != http.StatusRequestEntityTooLarge { + t.Fatalf("control body over 1 MiB should be 413, got %d", rec.Code) + } +} + +// TestRateLimitPerIP exercises the per-IP throttle: a burst from one IP eventually +// gets 429 (error path), while a spread across distinct IPs is never throttled +// (edge — the bucket is keyed per source, not global). +func TestRateLimitPerIP(t *testing.T) { + srv := dosServer(t, AuthOff) + + // Same IP: well past the burst -> at least one 429. + got429 := false + for i := 0; i < defaultRateBurst+50; i++ { + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/rooms/none", nil) + req.RemoteAddr = "203.0.113.7:5555" + srv.ServeHTTP(rec, req) + if rec.Code == http.StatusTooManyRequests { + got429 = true + break + } + } + if !got429 { + t.Fatalf("a flood from one IP should eventually be rate-limited (429)") + } + + // Distinct IPs: each gets a fresh bucket, so none is throttled. + for i := 0; i < 100; i++ { + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/rooms/none", nil) + req.RemoteAddr = "198.51.100." 
+ strconv.Itoa(i%254+1) + ":4444"
+		srv.ServeHTTP(rec, req)
+		if rec.Code == http.StatusTooManyRequests {
+			t.Fatalf("distinct IPs must not share a rate bucket; IP #%d got 429", i)
+		}
+	}
+}
diff --git a/pkg/membership/ratelimit.go b/pkg/membership/ratelimit.go
new file mode 100644
index 0000000..02b8116
--- /dev/null
+++ b/pkg/membership/ratelimit.go
@@ -0,0 +1,107 @@
+package membership
+
+import (
+	"net"
+	"net/http"
+	"sync"
+	"time"
+
+	"golang.org/x/time/rate"
+)
+
+// ipRateLimiter is a per-source-IP token-bucket rate limiter for the control
+// plane. It exists to blunt pre-auth flooding: an unauthenticated peer that
+// hammers the HTTP API (signature verification is not free, and io is bounded
+// but still real) is throttled before it can amplify load. Like the nonceCache,
+// this is transport glue specific to unibus, not a registry primitive — the
+// report 0003 made the same call for the nonce cache (it would only drag a NATS
+// dependency into the multi-domain registry go.mod for one helper).
+//
+// Each distinct IP gets its own golang.org/x/time/rate.Limiter (a standard
+// token bucket already in the module graph, so no new dependency). Idle buckets
+// are reaped so the map cannot grow without bound under a churn of source IPs.
+//
+// lastReap throttles the reaper itself: a full sweep is O(len(buckets)) under
+// mu, so it must not run on every request once the map is large.
+type ipRateLimiter struct {
+	mu       sync.Mutex
+	buckets  map[string]*ipBucket
+	r        rate.Limit
+	burst    int
+	ttl      time.Duration
+	lastReap time.Time // when reapLocked last ran; zero before the first sweep
+}
+
+type ipBucket struct {
+	lim  *rate.Limiter
+	seen time.Time
+}
+
+// newIPRateLimiter builds a limiter granting r tokens/second with the given
+// burst per IP. ttl bounds how long an idle bucket is retained before being
+// reaped. r<=0 disables limiting (allow always returns true) so dev/loopback
+// stacks are unaffected.
+func newIPRateLimiter(r rate.Limit, burst int, ttl time.Duration) *ipRateLimiter {
+	return &ipRateLimiter{
+		buckets: make(map[string]*ipBucket),
+		r:       r,
+		burst:   burst,
+		ttl:     ttl,
+	}
+}
+
+// allow reports whether a request from ip may proceed now, consuming one token
+// on success. A nil or disabled limiter (r<=0) always allows.
+//
+// Reaping of stale buckets is amortized: it is considered only once the map
+// holds more than reapThreshold entries, and then runs at most once per ttl/2.
+// A bucket cannot go stale sooner than ttl after its last use, so sweeping more
+// often frees nothing extra, while an ungated sweep would cost every request a
+// full O(len(buckets)) scan under mu for as long as more than reapThreshold IPs
+// stay live — the very flood this limiter is meant to absorb.
+func (l *ipRateLimiter) allow(ip string, now time.Time) bool {
+	if l == nil || l.r <= 0 {
+		return true
+	}
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	const reapThreshold = 1024
+	if len(l.buckets) > reapThreshold && now.Sub(l.lastReap) >= l.ttl/2 {
+		l.reapLocked(now)
+	}
+	b, ok := l.buckets[ip]
+	if !ok {
+		b = &ipBucket{lim: rate.NewLimiter(l.r, l.burst)}
+		l.buckets[ip] = b
+	}
+	b.seen = now
+	return b.lim.AllowN(now, 1)
+}
+
+// reapLocked drops buckets idle for longer than ttl and records the sweep time
+// in lastReap. The caller holds l.mu.
+func (l *ipRateLimiter) reapLocked(now time.Time) {
+	for ip, b := range l.buckets {
+		if now.Sub(b.seen) > l.ttl {
+			delete(l.buckets, ip)
+		}
+	}
+	l.lastReap = now
+}
+
+// clientIP extracts the source IP of an HTTP request, stripping the port. It
+// trusts the transport's RemoteAddr only (no X-Forwarded-For parsing): a public
+// deployment terminates TLS at this process or behind a proxy that the operator
+// controls, and honoring an attacker-supplied header would let a single IP fan
+// its quota across forged identities. If parsing fails the whole RemoteAddr is
+// used as the key (still a stable per-connection bucket).
+// NOTE(review): behind such a proxy RemoteAddr is the proxy's own address, so
+// every client shares one bucket — confirm that is the intended trade-off.
+func clientIP(r *http.Request) string {
+	host, _, err := net.SplitHostPort(r.RemoteAddr)
+	if err != nil {
+		return r.RemoteAddr
+	}
+	return host
+}
diff --git a/pkg/membership/server.go b/pkg/membership/server.go
index 47b9058..50c0465 100644
--- a/pkg/membership/server.go
+++ b/pkg/membership/server.go
@@ -15,9 +15,36 @@ import (
 
 	cs "fn-registry/functions/cybersecurity"
 
+	"golang.org/x/time/rate"
+
 	"github.com/enmanuel/unibus/pkg/blobstore"
 )
 
+// Body-size ceilings for the control plane. They bound how much an unauthenticated
+// peer can make the server buffer in RAM before the request is even authenticated
+// (the signature is verified over the full body, so the body must be read — but
+// not unboundedly). maxControlBodyBytes covers JSON metadata requests; /blobs gets
+// a separate, larger ceiling because media ciphertext is legitimately bigger. A
+// request whose declared Content-Length already exceeds its ceiling is rejected
+// before a single byte is buffered.
+const (
+	maxControlBodyBytes = 1 << 20  // 1 MiB for JSON control-plane requests
+	maxBlobBytes        = 16 << 20 // 16 MiB for a single media blob upload
+	// MaxHeaderBytes caps request header size; wired into the http.Server by the
+	// command. Exported so the bound lives next to its body-size siblings.
+	MaxHeaderBytes = 1 << 20 // 1 MiB (same value as http.DefaultMaxHeaderBytes)
+)
+
+// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
+// human/agent bus rather than a high-QPS API: ~20 req/s steady with a burst of
+// 40 absorbs a chat client's bursty polling while throttling a flood. r<=0 turns
+// a limiter off, but NOTE(review): NewServer always passes these defaults.
+const (
+	defaultRatePerSec = rate.Limit(20)
+	defaultRateBurst  = 40
+	rateBucketTTL     = 10 * time.Minute
+)
+
 // Server is the HTTP control plane: the authoritative source of room metadata,
 // membership, and per-epoch sealed keys. The data plane (messages) is NATS.
//
@@ -32,11 +59,14 @@ type Server struct {
 	mux      *http.ServeMux
 	authMode AuthMode
 	nonces   *nonceCache
+	limiter  *ipRateLimiter
 }
 
 // NewServer wires the membership store and blob store into an http.Handler. The
 // authMode selects the control-plane auth rollout state (AuthOff for callers and
-// tests that have not migrated to signed requests yet).
+// tests that have not migrated to signed requests yet). It installs a per-IP
+// rate limiter with the package defaults; loopback dev behavior is unchanged
+// because the burst comfortably exceeds any single client's request rate.
 func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server {
 	s := &Server{
 		store:    store,
@@ -44,6 +74,7 @@ func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server
 		mux:      http.NewServeMux(),
 		authMode: authMode,
 		nonces:   newNonceCache(nonceTTL),
+		limiter:  newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
 	}
 	s.routes()
 	return s
@@ -53,23 +84,57 @@ func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server
 // (signature verification + anti-replay + allowlist) ahead of the router
 // according to authMode, then dispatches to the matched handler.
 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	now := time.Now()
+
+	// Per-IP rate limit runs first, ahead of auth and body reads, so a flood is
+	// shed at the cheapest possible point. The health probe is exempt so liveness
+	// checks are never throttled.
+	if !isAuthExempt(r) && !s.limiter.allow(clientIP(r), now) {
+		writeErr(w, http.StatusTooManyRequests, "rate limit exceeded")
+		return
+	}
+
+	// Cap how much body we will buffer, BEFORE reading a single byte. The ceiling
+	// is per-route: /blobs may legitimately carry a media ciphertext, everything
+	// else is small JSON. A declared Content-Length over the ceiling is rejected
+	// outright (no buffering); MaxBytesReader then guards against a lying or
+	// chunked sender by failing the read once the limit is crossed. This is the
+	// fix for the pre-auth DoS: without it an unauthenticated peer could make the
+	// server buffer an unbounded body in RAM before authenticate() ever ran.
+	limit := int64(maxControlBodyBytes)
+	if r.Method == http.MethodPost && r.URL.Path == "/blobs" {
+		limit = int64(maxBlobBytes)
+	}
+	if r.ContentLength > limit {
+		writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
+		return
+	}
+	r.Body = http.MaxBytesReader(w, r.Body, limit)
+
 	if s.authMode == AuthOff || isAuthExempt(r) {
 		s.mux.ServeHTTP(w, r)
 		return
 	}
 
-	// Buffer the body so the signature can be verified over it and the handler
-	// still reads it. Bodies on the control plane are small (JSON metadata or a
-	// media blob already capped upstream), so full buffering is acceptable.
+	// Buffer the (now bounded) body so the signature can be verified over it and
+	// the handler still reads it.
 	body, err := io.ReadAll(r.Body)
 	if err != nil {
-		writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
+		if isBodyTooLarge(err) {
+			writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
+			return
+		}
+		writeErr(w, http.StatusBadRequest, "read body")
 		return
 	}
 	_ = r.Body.Close()
 	r.Body = io.NopCloser(bytes.NewReader(body))
 
+	// Authenticate against a fresh clock reading, not the `now` captured on
+	// entry: the body read above is paced by the client, so by this point `now`
+	// can be arbitrarily stale, and a stale reference time weakens whatever
+	// freshness / anti-replay window authenticate enforces with it.
 	if _, err := s.authenticate(r, body, time.Now()); err != nil {
 		if s.authMode == AuthSoft {
 			log.Printf("[auth] soft: would reject %s %s: %v", r.Method, r.URL.Path, err)
 			s.mux.ServeHTTP(w, r)
@@ -81,6 +146,13 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	s.mux.ServeHTTP(w, r)
 }
 
+// isBodyTooLarge reports whether err is (or wraps) the *http.MaxBytesError that
+// MaxBytesReader returns once a body exceeds its limit, so callers can answer 413.
+func isBodyTooLarge(err error) bool {
+	var maxErr *http.MaxBytesError
+	return errors.As(err, &maxErr)
+}
+
 // isAuthExempt lists requests that bypass control-plane auth even under enforce.
 // Only the unauthenticated health probe qualifies: it carries no data and is
 // needed by load balancers / smoke checks / systemd before any identity exists.
@@ -401,9 +473,18 @@ func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) {
 }
 
 func (s *Server) handlePutBlob(w http.ResponseWriter, r *http.Request) {
+	// The body arrives already bounded: ServeHTTP wraps it in a MaxBytesReader
+	// (maxBlobBytes) and rejects an over-declared Content-Length before this
+	// handler runs, in every auth mode. Reading here therefore cannot buffer
+	// more than the ceiling; a sender that lies about its length (e.g. chunked)
+	// trips MaxBytesReader and we map that to 413 rather than a generic 400.
 	data, err := io.ReadAll(r.Body)
 	if err != nil {
-		writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
+		if isBodyTooLarge(err) {
+			writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
+			return
+		}
+		writeErr(w, http.StatusBadRequest, "read body")
 		return
 	}
 	hash, err := s.blobs.Put(data)