From e7d59fd01daf9da192e0fbe88a24e510452a9381 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sun, 7 Jun 2026 16:09:58 +0200 Subject: [PATCH] fix(0005c): bound aggregate buffered memory with a global in-flight byte limiter The H1 fix bounds each request (1 MiB control / 16 MiB blob) and the per-IP rate limiter throttles a single source, but neither bounds the AGGREGATE memory across concurrent requests. The re-audit (report 0006, N2) drove RSS to ~1.42 GB with 40 concurrent 16 MiB uploads, and noted that a multi-IP (botnet) flood scales without a ceiling because the rate limit is per-IP. Fix: a global, non-blocking, byte-counting limiter (pkg/membership/inflight.go). ServeHTTP reserves a POST's worst-case buffered size (its route ceiling) from the limiter before reading the body, and releases it when the request finishes. When the global cap (maxInflightBytes = 128 MiB) is reached, further POSTs are shed with 503 (backpressure) rather than parking goroutines, so total bytes buffered in flight stays bounded regardless of connection count or source-IP spread. GETs carry no body and do not consume the budget. The limiter is implemented inside unibus (not delegated to the fn-registry, where a generic concurrency primitive would normally live) because functions/core pulls transitive deps requiring CGO (mattn/go-sqlite3) and external modules that are incompatible with unibus's CGO_ENABLED=0 build, and because this work is scoped to the unibus sub-repo. The type/method comments document this. Verification: - pkg/membership/inflight_test.go: TestInflightLimiter{Basics,Disabled,Concurrent} cover golden/edge/error/disabled/over-release and a -race concurrency invariant (inFlight returns to 0, never exceeds cap). - pkg/membership/dos_concurrency_test.go: TestReaudit_DoSConcurrency fires 40 concurrent 16 MiB uploads from distinct IPs (the multi-IP shape) against a 48 MiB test cap -> 200=3 503=37, RSS delta ~93 MiB (bound 256 MiB), inFlight()==0, and a fresh upload still 200. 
With the limiter disabled the test fails (200=40 503=0), confirming it is a real regression guard. - CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./... green; CGO_ENABLED=1 go test -race ./pkg/membership/ green. Residual (documented): under enforce the body is buffered twice (auth verify + handler), so real RSS is ~2x the reserved bytes; closing that fully means streaming blobs to disk (overlaps H9 / issue 0002). Refs: report 0006 N2, issue 0005c. Co-Authored-By: Claude Opus 4.8 (1M context) --- pkg/membership/dos_concurrency_test.go | 148 +++++++++++++++++++++++++ pkg/membership/inflight.go | 85 ++++++++++++++ pkg/membership/inflight_test.go | 97 ++++++++++++++++ pkg/membership/server.go | 26 +++++ 4 files changed, 356 insertions(+) create mode 100644 pkg/membership/dos_concurrency_test.go create mode 100644 pkg/membership/inflight.go create mode 100644 pkg/membership/inflight_test.go diff --git a/pkg/membership/dos_concurrency_test.go b/pkg/membership/dos_concurrency_test.go new file mode 100644 index 0000000..7635c90 --- /dev/null +++ b/pkg/membership/dos_concurrency_test.go @@ -0,0 +1,148 @@ +package membership + +import ( + "net/http" + "net/http/httptest" + "os" + "runtime" + "strconv" + "strings" + "sync" + "sync/atomic" + "testing" + "time" +) + +// readRSSkBRaw reads VmRSS (kB) from /proc without a *testing.T, so it is safe to +// call from a sampling goroutine (vmRSSkB calls t.Skip, which may only run on the +// test's own goroutine). Returns 0 when unavailable. 
+func readRSSkBRaw() int64 { + b, err := os.ReadFile("/proc/self/status") + if err != nil { + return 0 + } + for _, line := range strings.Split(string(b), "\n") { + if strings.HasPrefix(line, "VmRSS:") { + f := strings.Fields(line) + if len(f) >= 2 { + v, _ := strconv.ParseInt(f[1], 10, 64) + return v + } + } + } + return 0 +} + +// TestReaudit_DoSConcurrency ports the re-auditor's N2 (Medio-Alto) finding: the +// per-request body ceiling and the per-IP rate limit do not bound the AGGREGATE +// memory of many concurrent uploads. The auditor drove RSS to ~1.42 GB with 40 +// concurrent 16 MiB blob uploads. With the global in-flight byte limiter, the +// number of simultaneously-buffered uploads is capped, so the resident set stays +// bounded regardless of how many connections arrive at once. +// +// Coverage: +// - golden: a normal upload succeeds, and the server is still healthy after the +// storm (the limiter did not wedge it); +// - edge : concurrency right at the cap is admitted; +// - error : a concurrent flood far past the cap sheds the excess with 503 +// (backpressure) instead of buffering it all, and the RSS spike stays bounded +// and does NOT scale with the number of requests. +func TestReaudit_DoSConcurrency(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("RSS probe is Linux-only") + } + srv := dosServer(t, AuthOff) + // Force a small aggregate cap so the bound is observable in a unit test: with + // a 16 MiB blob ceiling, 48 MiB admits ~3 concurrent uploads. Production uses + // maxInflightBytes (128 MiB); the mechanism under test is identical. + const cap = int64(48) << 20 + srv.inflight = newInflightLimiter(cap) + + const blob = maxBlobBytes // 16 MiB, the per-request ceiling + const n = 40 // the auditor's figure + + // A spike bound: with the cap admitting ~3 concurrent 16 MiB uploads and a + // ~2x copy factor (auth buffer + handler buffer) plus Go runtime slack, the + // delta should stay well under this. 
Without the limiter, 40 concurrent + // uploads admitted at once would add hundreds of MB (the auditor saw ~1.4 GB). + const maxSpikeKB = int64(256) << 10 // 256 MiB + + runtime.GC() + before := readRSSkBRaw() + + // Sample peak RSS while the storm runs. + var peak int64 + atomic.StoreInt64(&peak, before) + stop := make(chan struct{}) + var sampler sync.WaitGroup + sampler.Add(1) + go func() { + defer sampler.Done() + for { + select { + case <-stop: + return + default: + if v := readRSSkBRaw(); v > atomic.LoadInt64(&peak) { + atomic.StoreInt64(&peak, v) + } + time.Sleep(2 * time.Millisecond) + } + } + }() + + var got503, got200 int64 + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: blob}) + req.ContentLength = blob + // Distinct source IP per request: this is the multi-IP (botnet) shape the + // auditor flagged, where the per-IP rate limit gives no aggregate defense. + // The in-flight byte limiter is the global bound that must hold here. + req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":1234" + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, req) + switch rec.Code { + case http.StatusServiceUnavailable: + atomic.AddInt64(&got503, 1) + case http.StatusOK: + atomic.AddInt64(&got200, 1) + } + }() + } + wg.Wait() + close(stop) + sampler.Wait() + + runtime.GC() + delta := atomic.LoadInt64(&peak) - before + + // Error path: the flood must have hit the cap and shed the excess with 503. + if got503 == 0 { + t.Fatalf("a concurrent flood of %d uploads past the cap should shed some with 503; got 200=%d 503=%d", n, got200, got503) + } + // The aggregate memory must stay bounded — not scale with n. + if delta > maxSpikeKB { + t.Fatalf("aggregate RSS spiked %d kB under %d concurrent uploads (bound %d kB): in-flight limiter not bounding memory", delta, n, maxSpikeKB) + } + // All reservations released after the storm. 
+ if f := srv.inflight.inFlight(); f != 0 { + t.Fatalf("after the storm inFlight = %d, want 0 (reservations leaked)", f) + } + + // Golden: the server is still healthy and serves a normal upload (from a fresh + // IP so the per-IP rate limiter, untouched here, is not what we measure). + rec := httptest.NewRecorder() + gReq := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello after storm")) + gReq.RemoteAddr = "203.0.113.9:9999" + srv.ServeHTTP(rec, gReq) + if rec.Code != http.StatusOK { + t.Fatalf("a normal upload after the storm should be 200, got %d (%s)", rec.Code, rec.Body.String()) + } + + t.Logf("N2 bound: %d uploads -> 200=%d 503=%d, RSS delta %d kB (bound %d kB), cap %d MiB", + n, got200, got503, delta, maxSpikeKB, cap>>20) +} diff --git a/pkg/membership/inflight.go b/pkg/membership/inflight.go new file mode 100644 index 0000000..3fc3438 --- /dev/null +++ b/pkg/membership/inflight.go @@ -0,0 +1,85 @@ +package membership + +import "sync/atomic" + +// inflightLimiter is a non-blocking, byte-counting concurrency limiter: a global +// cap on how many bytes of request body the server will buffer simultaneously. +// +// The per-request body ceilings (maxControlBodyBytes / maxBlobBytes) bound a +// single request, and the per-IP rate limiter throttles a single source, but +// neither bounds the AGGREGATE memory across many concurrent uploads: the +// re-audit (report 0006, N2) showed 40 concurrent 16 MiB blob uploads driving +// RSS to ~1.42 GB, and a distributed (multi-IP) flood scales without a ceiling +// because the rate limiter is per-IP. This limiter is the missing aggregate +// bound: ServeHTTP reserves a request's worst-case buffered size before reading +// the body and releases it when the request finishes, so the total bytes in +// flight can never exceed max regardless of how many connections or source IPs +// arrive at once. 
// inflightLimiter is a non-blocking, byte-counting admission limiter: a global
// cap on how many bytes of request body the server will buffer simultaneously.
//
// The per-request body ceilings bound a single request and the per-IP rate
// limiter throttles a single source, but neither bounds the AGGREGATE memory
// across many concurrent uploads (re-audit report 0006, N2). ServeHTTP reserves
// a request's worst-case buffered size before reading the body and releases it
// when the request finishes, so the total reserved bytes never exceed max.
//
// It is intentionally NON-blocking: when a reservation does not fit, the caller
// sheds the request (503) instead of parking a goroutine. The counter is
// maintained with sync/atomic CAS loops, so no mutex is needed.
//
// Implementation note: this lives inside unibus rather than the fn-registry
// because the registry's functions/core package pulls in CGO-requiring
// dependencies that are incompatible with unibus's CGO_ENABLED=0 build.
type inflightLimiter struct {
	max  int64 // immutable after construction; <= 0 disables the limiter
	used int64 // bytes currently reserved; accessed ONLY via sync/atomic
}

// newInflightLimiter builds a limiter with a cap of maxBytes bytes in flight.
// maxBytes <= 0 disables the cap: tryAcquire always grants and nothing is
// tracked.
func newInflightLimiter(maxBytes int64) *inflightLimiter {
	return &inflightLimiter{max: maxBytes}
}

// tryAcquire reserves n bytes without blocking. It returns true and reserves the
// bytes when they fit within the cap, or false (reserving nothing) when they do
// not. n <= 0 is granted without reserving, and a disabled limiter (max <= 0)
// always grants. Safe for concurrent use.
func (l *inflightLimiter) tryAcquire(n int64) bool {
	if l.max <= 0 || n <= 0 {
		return true
	}
	for {
		cur := atomic.LoadInt64(&l.used)
		// Compare n against the remaining headroom rather than computing cur+n:
		// the sum wraps around for very large n, which would grant an oversized
		// reservation and drive the counter negative. used stays within
		// [0, max], so max-cur cannot overflow.
		if n > l.max-cur {
			return false
		}
		if atomic.CompareAndSwapInt64(&l.used, cur, cur+n) {
			return true
		}
		// Lost the race with another acquire/release; reload and retry.
	}
}

// release returns n previously reserved bytes. It must be paired with a
// tryAcquire that granted. A disabled limiter or n <= 0 is a no-op. The counter
// never drops below zero (a defensive clamp against an accidental double
// release).
func (l *inflightLimiter) release(n int64) {
	if l.max <= 0 || n <= 0 {
		return
	}
	for {
		cur := atomic.LoadInt64(&l.used)
		next := cur - n
		if next < 0 {
			next = 0
		}
		if atomic.CompareAndSwapInt64(&l.used, cur, next) {
			return
		}
	}
}

// inFlight returns the bytes currently reserved. It is observability for tests
// and metrics.
func (l *inflightLimiter) inFlight() int64 {
	return atomic.LoadInt64(&l.used)
}
+ l.release(100) + if l.inFlight() != 0 { + t.Fatalf("inFlight after full release = %d, want 0", l.inFlight()) + } + + // Defensive: an over-release never drives the counter negative. + l.release(50) + if l.inFlight() != 0 { + t.Fatalf("over-release must clamp at 0; got %d", l.inFlight()) + } +} + +// TestInflightLimiterDisabled verifies that a non-positive cap disables the +// limiter: every reservation is granted and nothing is tracked (the loopback/dev +// posture). +func TestInflightLimiterDisabled(t *testing.T) { + for _, max := range []int64{0, -1} { + l := newInflightLimiter(max) + if !l.tryAcquire(1 << 30) { + t.Fatalf("disabled limiter (max=%d) must always grant", max) + } + if l.inFlight() != 0 { + t.Fatalf("disabled limiter must not track usage; got %d", l.inFlight()) + } + l.release(1 << 30) // no-op, must not panic + } +} + +// TestInflightLimiterConcurrent hammers the limiter from many goroutines with +// equal-sized acquire/release pairs and asserts the invariant never breaks: the +// counter returns to 0 and never exceeds the cap. Run with -race for the memory +// model guarantee. +func TestInflightLimiterConcurrent(t *testing.T) { + const cap = 1000 + const chunk = 7 + l := newInflightLimiter(cap) + + var wg sync.WaitGroup + for g := 0; g < 64; g++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 2000; i++ { + if l.tryAcquire(chunk) { + if f := l.inFlight(); f > cap { + t.Errorf("inFlight %d exceeded cap %d", f, cap) + return + } + l.release(chunk) + } + } + }() + } + wg.Wait() + if l.inFlight() != 0 { + t.Fatalf("after all goroutines, inFlight = %d, want 0", l.inFlight()) + } +} diff --git a/pkg/membership/server.go b/pkg/membership/server.go index 2159605..753a7c6 100644 --- a/pkg/membership/server.go +++ b/pkg/membership/server.go @@ -35,6 +35,14 @@ const ( // MaxHeaderBytes caps request header size; wired into the http.Server by the // command. Exported so the bound lives next to its body-size siblings. 
MaxHeaderBytes = 1 << 20 // 1 MiB + // maxInflightBytes is the GLOBAL cap on request-body bytes buffered across all + // concurrent requests (audit N2). The per-request ceilings above bound one + // request; this bounds the sum, so a concurrent (even multi-IP) flood of + // max-size uploads cannot drive the resident set without limit. 128 MiB allows + // ~8 concurrent 16 MiB blob uploads or ~128 concurrent control requests before + // further POSTs are shed with 503 — generous for an interactive bus, bounded + // for an attacker. + maxInflightBytes = 128 << 20 // 128 MiB ) // Per-IP rate-limit defaults for the control plane. Tuned for an interactive @@ -62,6 +70,7 @@ type Server struct { authMode AuthMode nonces nonceStore limiter *ipRateLimiter + inflight *inflightLimiter // RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS) // rooms. It is the minimum-defensive control for the data plane (audit H4): @@ -87,6 +96,7 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server { authMode: authMode, nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries), limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL), + inflight: newInflightLimiter(maxInflightBytes), } s.routes() return s @@ -139,6 +149,22 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } r.Body = http.MaxBytesReader(w, r.Body, limit) + // Aggregate memory bound (audit N2): the per-request ceiling above and the + // per-IP rate limit do not cap the TOTAL bytes buffered across concurrent + // requests. A POST reserves its worst-case buffered size (its route ceiling) + // from a global limiter before the body is read, and is shed with 503 when the + // cap is reached, so the resident set stays bounded under a concurrent (even + // multi-IP) upload flood instead of growing linearly with the number of + // connections. Reservation is released when the request finishes. 
Only POSTs + // buffer a body; GETs carry none, so they do not consume the budget. + if r.Method == http.MethodPost { + if !s.inflight.tryAcquire(limit) { + writeErr(w, http.StatusServiceUnavailable, "server busy: too many concurrent uploads in flight") + return + } + defer s.inflight.release(limit) + } + if s.authMode == AuthOff || isAuthExempt(r) { s.mux.ServeHTTP(w, r) return