package membership import ( "net/http" "net/http/httptest" "os" "runtime" "strconv" "strings" "sync" "sync/atomic" "testing" "time" ) // readRSSkBRaw reads VmRSS (kB) from /proc without a *testing.T, so it is safe to // call from a sampling goroutine (vmRSSkB calls t.Skip, which may only run on the // test's own goroutine). Returns 0 when unavailable. func readRSSkBRaw() int64 { b, err := os.ReadFile("/proc/self/status") if err != nil { return 0 } for _, line := range strings.Split(string(b), "\n") { if strings.HasPrefix(line, "VmRSS:") { f := strings.Fields(line) if len(f) >= 2 { v, _ := strconv.ParseInt(f[1], 10, 64) return v } } } return 0 } // TestReaudit_DoSConcurrency ports the re-auditor's N2 (Medio-Alto) finding: the // per-request body ceiling and the per-IP rate limit do not bound the AGGREGATE // memory of many concurrent uploads. The auditor drove RSS to ~1.42 GB with 40 // concurrent 16 MiB blob uploads. With the global in-flight byte limiter, the // number of simultaneously-buffered uploads is capped, so the resident set stays // bounded regardless of how many connections arrive at once. // // Coverage: // - golden: a normal upload succeeds, and the server is still healthy after the // storm (the limiter did not wedge it); // - edge : concurrency right at the cap is admitted; // - error : a concurrent flood far past the cap sheds the excess with 503 // (backpressure) instead of buffering it all, and the RSS spike stays bounded // and does NOT scale with the number of requests. func TestReaudit_DoSConcurrency(t *testing.T) { if runtime.GOOS != "linux" { t.Skip("RSS probe is Linux-only") } srv := dosServer(t, AuthOff) // Force a small aggregate cap so the bound is observable in a unit test: with // a 16 MiB blob ceiling, 48 MiB admits ~3 concurrent uploads. Production uses // maxInflightBytes (128 MiB); the mechanism under test is identical. const cap = int64(48) << 20 srv.inflight = newInflightLimiter(cap) const blob = maxBlobBytes // 16 MiB, the per-request ceiling const n = 40 // the auditor's figure // A spike bound: with the cap admitting ~3 concurrent 16 MiB uploads and a // ~2x copy factor (auth buffer + handler buffer) plus Go runtime slack, the // delta should stay well under this. Without the limiter, 40 concurrent // uploads admitted at once would add hundreds of MB (the auditor saw ~1.4 GB). const maxSpikeKB = int64(256) << 10 // 256 MiB runtime.GC() before := readRSSkBRaw() // Sample peak RSS while the storm runs. var peak int64 atomic.StoreInt64(&peak, before) stop := make(chan struct{}) var sampler sync.WaitGroup sampler.Add(1) go func() { defer sampler.Done() for { select { case <-stop: return default: if v := readRSSkBRaw(); v > atomic.LoadInt64(&peak) { atomic.StoreInt64(&peak, v) } time.Sleep(2 * time.Millisecond) } } }() var got503, got200 int64 var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: blob}) req.ContentLength = blob // Distinct source IP per request: this is the multi-IP (botnet) shape the // auditor flagged, where the per-IP rate limit gives no aggregate defense. // The in-flight byte limiter is the global bound that must hold here. req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":1234" rec := httptest.NewRecorder() srv.ServeHTTP(rec, req) switch rec.Code { case http.StatusServiceUnavailable: atomic.AddInt64(&got503, 1) case http.StatusOK: atomic.AddInt64(&got200, 1) } }() } wg.Wait() close(stop) sampler.Wait() runtime.GC() delta := atomic.LoadInt64(&peak) - before // Error path: the flood must have hit the cap and shed the excess with 503. if got503 == 0 { t.Fatalf("a concurrent flood of %d uploads past the cap should shed some with 503; got 200=%d 503=%d", n, got200, got503) } // The aggregate memory must stay bounded — not scale with n. if delta > maxSpikeKB { t.Fatalf("aggregate RSS spiked %d kB under %d concurrent uploads (bound %d kB): in-flight limiter not bounding memory", delta, n, maxSpikeKB) } // All reservations released after the storm. if f := srv.inflight.inFlight(); f != 0 { t.Fatalf("after the storm inFlight = %d, want 0 (reservations leaked)", f) } // Golden: the server is still healthy and serves a normal upload (from a fresh // IP so the per-IP rate limiter, untouched here, is not what we measure). rec := httptest.NewRecorder() gReq := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello after storm")) gReq.RemoteAddr = "203.0.113.9:9999" srv.ServeHTTP(rec, gReq) if rec.Code != http.StatusOK { t.Fatalf("a normal upload after the storm should be 200, got %d (%s)", rec.Code, rec.Body.String()) } t.Logf("N2 bound: %d uploads -> 200=%d 503=%d, RSS delta %d kB (bound %d kB), cap %d MiB", n, got200, got503, delta, maxSpikeKB, cap>>20) }