fix(0005c): bound aggregate buffered memory with a global in-flight byte limiter

The H1 fix bounds each request (1 MiB control / 16 MiB blob) and the per-IP rate
limiter throttles a single source, but neither bounds the AGGREGATE memory across
concurrent requests. The re-audit (report 0006, N2) drove RSS to ~1.42 GB with 40
concurrent 16 MiB uploads, and noted that a multi-IP (botnet) flood scales without
a ceiling because the rate limit is per-IP.

Fix: a global, non-blocking, byte-counting limiter (pkg/membership/inflight.go).
ServeHTTP reserves a POST's worst-case buffered size (its route ceiling) from the
limiter before reading the body, and releases it when the request finishes. When
the global cap (maxInflightBytes = 128 MiB) is reached, further POSTs are shed
with 503 (backpressure) rather than parking goroutines, so total bytes buffered
in flight stays bounded regardless of connection count or source-IP spread. GETs
carry no body and do not consume the budget.

The limiter is implemented inside unibus (not delegated to the fn-registry, where
a generic concurrency primitive would normally live) because functions/core pulls
transitive deps requiring CGO (mattn/go-sqlite3) and external modules that are
incompatible with unibus's CGO_ENABLED=0 build, and because this work is scoped
to the unibus sub-repo. The type/method comments document this.

Verification:
- pkg/membership/inflight_test.go: TestInflightLimiter{Basics,Disabled,Concurrent}
  cover golden/edge/error/disabled/over-release and a -race concurrency invariant
  (inFlight returns to 0, never exceeds cap).
- pkg/membership/dos_concurrency_test.go: TestReaudit_DoSConcurrency fires 40
  concurrent 16 MiB uploads from distinct IPs (the multi-IP shape) against a 48 MiB
  test cap -> 200=3 503=37, RSS delta ~93 MiB (bound 256 MiB), inFlight()==0, and a
  fresh upload still 200. With the limiter disabled the test fails (200=40 503=0),
  confirming it is a real regression guard.
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  green;
  CGO_ENABLED=1 go test -race ./pkg/membership/ green.

Residual (documented): under enforce the body is buffered twice (auth verify +
handler), so real RSS is ~2x the reserved bytes; closing that fully means
streaming blobs to disk (overlaps H9 / issue 0002).

Refs: report 0006 N2, issue 0005c.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-07 16:09:58 +02:00
parent 0f79708338
commit e7d59fd01d
4 changed files with 356 additions and 0 deletions
+148
View File
@@ -0,0 +1,148 @@
package membership
import (
"net/http"
"net/http/httptest"
"os"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// readRSSkBRaw reads VmRSS (kB) from /proc without a *testing.T, so it is safe to
// call from a sampling goroutine (vmRSSkB calls t.Skip, which may only run on the
// test's own goroutine). Returns 0 when unavailable.
func readRSSkBRaw() int64 {
b, err := os.ReadFile("/proc/self/status")
if err != nil {
return 0
}
for _, line := range strings.Split(string(b), "\n") {
if strings.HasPrefix(line, "VmRSS:") {
f := strings.Fields(line)
if len(f) >= 2 {
v, _ := strconv.ParseInt(f[1], 10, 64)
return v
}
}
}
return 0
}
// TestReaudit_DoSConcurrency ports the re-auditor's N2 (Medio-Alto) finding: the
// per-request body ceiling and the per-IP rate limit do not bound the AGGREGATE
// memory of many concurrent uploads. The auditor drove RSS to ~1.42 GB with 40
// concurrent 16 MiB blob uploads. With the global in-flight byte limiter, the
// number of simultaneously-buffered uploads is capped, so the resident set stays
// bounded regardless of how many connections arrive at once.
//
// Coverage:
// - golden: a normal upload succeeds, and the server is still healthy after the
// storm (the limiter did not wedge it);
// - edge : concurrency right at the cap is admitted;
// - error : a concurrent flood far past the cap sheds the excess with 503
// (backpressure) instead of buffering it all, and the RSS spike stays bounded
// and does NOT scale with the number of requests.
func TestReaudit_DoSConcurrency(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("RSS probe is Linux-only")
}
srv := dosServer(t, AuthOff)
// Force a small aggregate cap so the bound is observable in a unit test: with
// a 16 MiB blob ceiling, 48 MiB admits ~3 concurrent uploads. Production uses
// maxInflightBytes (128 MiB); the mechanism under test is identical.
const cap = int64(48) << 20
srv.inflight = newInflightLimiter(cap)
const blob = maxBlobBytes // 16 MiB, the per-request ceiling
const n = 40 // the auditor's figure
// A spike bound: with the cap admitting ~3 concurrent 16 MiB uploads and a
// ~2x copy factor (auth buffer + handler buffer) plus Go runtime slack, the
// delta should stay well under this. Without the limiter, 40 concurrent
// uploads admitted at once would add hundreds of MB (the auditor saw ~1.4 GB).
const maxSpikeKB = int64(256) << 10 // 256 MiB
runtime.GC()
before := readRSSkBRaw()
// Sample peak RSS while the storm runs.
var peak int64
atomic.StoreInt64(&peak, before)
stop := make(chan struct{})
var sampler sync.WaitGroup
sampler.Add(1)
go func() {
defer sampler.Done()
for {
select {
case <-stop:
return
default:
if v := readRSSkBRaw(); v > atomic.LoadInt64(&peak) {
atomic.StoreInt64(&peak, v)
}
time.Sleep(2 * time.Millisecond)
}
}
}()
var got503, got200 int64
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: blob})
req.ContentLength = blob
// Distinct source IP per request: this is the multi-IP (botnet) shape the
// auditor flagged, where the per-IP rate limit gives no aggregate defense.
// The in-flight byte limiter is the global bound that must hold here.
req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":1234"
rec := httptest.NewRecorder()
srv.ServeHTTP(rec, req)
switch rec.Code {
case http.StatusServiceUnavailable:
atomic.AddInt64(&got503, 1)
case http.StatusOK:
atomic.AddInt64(&got200, 1)
}
}()
}
wg.Wait()
close(stop)
sampler.Wait()
runtime.GC()
delta := atomic.LoadInt64(&peak) - before
// Error path: the flood must have hit the cap and shed the excess with 503.
if got503 == 0 {
t.Fatalf("a concurrent flood of %d uploads past the cap should shed some with 503; got 200=%d 503=%d", n, got200, got503)
}
// The aggregate memory must stay bounded — not scale with n.
if delta > maxSpikeKB {
t.Fatalf("aggregate RSS spiked %d kB under %d concurrent uploads (bound %d kB): in-flight limiter not bounding memory", delta, n, maxSpikeKB)
}
// All reservations released after the storm.
if f := srv.inflight.inFlight(); f != 0 {
t.Fatalf("after the storm inFlight = %d, want 0 (reservations leaked)", f)
}
// Golden: the server is still healthy and serves a normal upload (from a fresh
// IP so the per-IP rate limiter, untouched here, is not what we measure).
rec := httptest.NewRecorder()
gReq := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello after storm"))
gReq.RemoteAddr = "203.0.113.9:9999"
srv.ServeHTTP(rec, gReq)
if rec.Code != http.StatusOK {
t.Fatalf("a normal upload after the storm should be 200, got %d (%s)", rec.Code, rec.Body.String())
}
t.Logf("N2 bound: %d uploads -> 200=%d 503=%d, RSS delta %d kB (bound %d kB), cap %d MiB",
n, got200, got503, delta, maxSpikeKB, cap>>20)
}
+85
View File
@@ -0,0 +1,85 @@
package membership
import "sync/atomic"
// inflightLimiter is a non-blocking, byte-counting concurrency limiter: a global
// cap on how many bytes of request body the server will buffer simultaneously.
//
// The per-request body ceilings (maxControlBodyBytes / maxBlobBytes) bound a
// single request, and the per-IP rate limiter throttles a single source, but
// neither bounds the AGGREGATE memory across many concurrent uploads: the
// re-audit (report 0006, N2) showed 40 concurrent 16 MiB blob uploads driving
// RSS to ~1.42 GB, and a distributed (multi-IP) flood scales without a ceiling
// because the rate limiter is per-IP. This limiter is the missing aggregate
// bound: ServeHTTP reserves a request's worst-case buffered size before reading
// the body and releases it when the request finishes, so the total bytes in
// flight can never exceed max regardless of how many connections or source IPs
// arrive at once.
//
// It is intentionally NON-blocking: when a reservation does not fit, the caller
// sheds the request with backpressure (503) rather than parking a goroutine,
// which would let an attacker exhaust goroutines/connections instead of RAM. The
// counter is maintained with sync/atomic (a CAS loop), so it is safe for
// concurrent use without a mutex.
//
// Implementation note: this lives inside unibus rather than the fn-registry
// (where a generic concurrency primitive would normally belong) because the
// registry's functions/core package pulls in transitive dependencies that
// require CGO (mattn/go-sqlite3) and external modules, which are incompatible
// with unibus's CGO_ENABLED=0 build, and because this work is scoped to the
// unibus sub-repo.
type inflightLimiter struct {
max int64 // immutable after construction; <= 0 disables the limiter
used int64 // bytes currently reserved; accessed ONLY via sync/atomic
}
// newInflightLimiter builds a limiter with a cap of maxBytes bytes in flight.
// maxBytes <= 0 disables the cap (tryAcquire always grants), which is the
// loopback/dev posture where an aggregate memory ceiling is not wanted.
func newInflightLimiter(maxBytes int64) *inflightLimiter {
return &inflightLimiter{max: maxBytes}
}
// tryAcquire reserves n bytes without blocking. It returns true and reserves the
// bytes when they fit within the cap (used+n <= max), or false (reserving
// nothing) when they do not. n <= 0 is granted without reserving, and a disabled
// limiter (max <= 0) always grants. Safe for concurrent use.
func (l *inflightLimiter) tryAcquire(n int64) bool {
if l.max <= 0 || n <= 0 {
return true
}
for {
cur := atomic.LoadInt64(&l.used)
if cur+n > l.max {
return false
}
if atomic.CompareAndSwapInt64(&l.used, cur, cur+n) {
return true
}
}
}
// release returns n previously reserved bytes. It must be paired with a
// tryAcquire that granted. A disabled limiter or n <= 0 is a no-op. The counter
// never drops below zero (a defensive clamp against an accidental double release).
func (l *inflightLimiter) release(n int64) {
if l.max <= 0 || n <= 0 {
return
}
for {
cur := atomic.LoadInt64(&l.used)
nv := cur - n
if nv < 0 {
nv = 0
}
if atomic.CompareAndSwapInt64(&l.used, cur, nv) {
return
}
}
}
// inFlight returns the bytes currently reserved. It is observability for tests
// and metrics.
func (l *inflightLimiter) inFlight() int64 {
return atomic.LoadInt64(&l.used)
}
+97
View File
@@ -0,0 +1,97 @@
package membership
import (
"sync"
"testing"
)
// TestInflightLimiterBasics covers the limiter contract: granting within the cap
// (golden), the exact boundary (edge), refusal over the cap without mutating the
// counter (error), the disabled mode, and the defensive clamp on over-release.
func TestInflightLimiterBasics(t *testing.T) {
l := newInflightLimiter(100)
// Golden: a reservation within the cap is granted and reflected.
if !l.tryAcquire(60) {
t.Fatalf("acquire 60 within cap 100 should grant")
}
if l.inFlight() != 60 {
t.Fatalf("inFlight = %d, want 60", l.inFlight())
}
// Edge: exactly reaching the cap (60+40 == 100) is granted.
if !l.tryAcquire(40) {
t.Fatalf("acquire to the exact cap should grant")
}
if l.inFlight() != 100 {
t.Fatalf("inFlight = %d, want 100", l.inFlight())
}
// Error: one more byte over the full cap is refused, and the counter is left
// untouched (a refused reservation reserves nothing).
if l.tryAcquire(1) {
t.Fatalf("acquire over a full cap must be refused")
}
if l.inFlight() != 100 {
t.Fatalf("a refused acquire must not change inFlight; got %d", l.inFlight())
}
// Release frees capacity again.
l.release(100)
if l.inFlight() != 0 {
t.Fatalf("inFlight after full release = %d, want 0", l.inFlight())
}
// Defensive: an over-release never drives the counter negative.
l.release(50)
if l.inFlight() != 0 {
t.Fatalf("over-release must clamp at 0; got %d", l.inFlight())
}
}
// TestInflightLimiterDisabled verifies that a non-positive cap disables the
// limiter: every reservation is granted and nothing is tracked (the loopback/dev
// posture).
func TestInflightLimiterDisabled(t *testing.T) {
for _, max := range []int64{0, -1} {
l := newInflightLimiter(max)
if !l.tryAcquire(1 << 30) {
t.Fatalf("disabled limiter (max=%d) must always grant", max)
}
if l.inFlight() != 0 {
t.Fatalf("disabled limiter must not track usage; got %d", l.inFlight())
}
l.release(1 << 30) // no-op, must not panic
}
}
// TestInflightLimiterConcurrent hammers the limiter from many goroutines with
// equal-sized acquire/release pairs and asserts the invariant never breaks: the
// counter returns to 0 and never exceeds the cap. Run with -race for the memory
// model guarantee.
func TestInflightLimiterConcurrent(t *testing.T) {
const cap = 1000
const chunk = 7
l := newInflightLimiter(cap)
var wg sync.WaitGroup
for g := 0; g < 64; g++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 2000; i++ {
if l.tryAcquire(chunk) {
if f := l.inFlight(); f > cap {
t.Errorf("inFlight %d exceeded cap %d", f, cap)
return
}
l.release(chunk)
}
}
}()
}
wg.Wait()
if l.inFlight() != 0 {
t.Fatalf("after all goroutines, inFlight = %d, want 0", l.inFlight())
}
}
+26
View File
@@ -35,6 +35,14 @@ const (
// MaxHeaderBytes caps request header size; wired into the http.Server by the
// command. Exported so the bound lives next to its body-size siblings.
MaxHeaderBytes = 1 << 20 // 1 MiB
// maxInflightBytes is the GLOBAL cap on request-body bytes buffered across all
// concurrent requests (audit N2). The per-request ceilings above bound one
// request; this bounds the sum, so a concurrent (even multi-IP) flood of
// max-size uploads cannot drive the resident set without limit. 128 MiB allows
// ~8 concurrent 16 MiB blob uploads or ~128 concurrent control requests before
// further POSTs are shed with 503 — generous for an interactive bus, bounded
// for an attacker.
maxInflightBytes = 128 << 20 // 128 MiB
)
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
@@ -62,6 +70,7 @@ type Server struct {
authMode AuthMode
nonces nonceStore
limiter *ipRateLimiter
inflight *inflightLimiter
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
// rooms. It is the minimum-defensive control for the data plane (audit H4):
@@ -87,6 +96,7 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
authMode: authMode,
nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries),
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
inflight: newInflightLimiter(maxInflightBytes),
}
s.routes()
return s
@@ -139,6 +149,22 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
r.Body = http.MaxBytesReader(w, r.Body, limit)
// Aggregate memory bound (audit N2): the per-request ceiling above and the
// per-IP rate limit do not cap the TOTAL bytes buffered across concurrent
// requests. A POST reserves its worst-case buffered size (its route ceiling)
// from a global limiter before the body is read, and is shed with 503 when the
// cap is reached, so the resident set stays bounded under a concurrent (even
// multi-IP) upload flood instead of growing linearly with the number of
// connections. Reservation is released when the request finishes. Only POSTs
// buffer a body; GETs carry none, so they do not consume the budget.
if r.Method == http.MethodPost {
if !s.inflight.tryAcquire(limit) {
writeErr(w, http.StatusServiceUnavailable, "server busy: too many concurrent uploads in flight")
return
}
defer s.inflight.release(limit)
}
if s.authMode == AuthOff || isAuthExempt(r) {
s.mux.ServeHTTP(w, r)
return