Merge issue/0004a-dos-limit: pre-auth DoS hardening (H1)
Body-size ceilings (MaxBytesReader, per-route + Content-Length pre-reject), Server.MaxHeaderBytes, and a per-IP token-bucket rate limit shut the critical pre-auth memory-exhaustion vector. Regression test asserts a bounded RSS.
This commit is contained in:
@@ -113,7 +113,15 @@ func main() {
|
||||
srv := membership.NewServer(store, blobs, authMode)
|
||||
log.Printf("control-plane auth: %s", authMode)
|
||||
addr := *bind + ":" + *httpPort
|
||||
httpSrv := &http.Server{Addr: addr, Handler: srv}
|
||||
httpSrv := &http.Server{
|
||||
Addr: addr,
|
||||
Handler: srv,
|
||||
// Bound request header size so a peer cannot exhaust memory with huge
|
||||
// headers before any body limit applies (the body ceilings live in the
|
||||
// membership middleware).
|
||||
MaxHeaderBytes: membership.MaxHeaderBytes,
|
||||
ReadHeaderTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
go func() {
|
||||
log.Printf("HTTP control-plane API: http://%s", addr)
|
||||
|
||||
@@ -8,7 +8,9 @@ require (
|
||||
fn-registry v0.0.0-00010101000000-000000000000
|
||||
github.com/nats-io/nats-server/v2 v2.10.22
|
||||
github.com/nats-io/nats.go v1.37.0
|
||||
github.com/nats-io/nkeys v0.4.7
|
||||
github.com/oklog/ulid/v2 v2.1.0
|
||||
golang.org/x/time v0.7.0
|
||||
modernc.org/sqlite v1.47.0
|
||||
)
|
||||
|
||||
@@ -19,7 +21,6 @@ require (
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/minio/highwayhash v1.0.3 // indirect
|
||||
github.com/nats-io/jwt/v2 v2.5.8 // indirect
|
||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/ncruces/go-strftime v1.0.0 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
@@ -29,7 +30,6 @@ require (
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/sys v0.44.0 // indirect
|
||||
golang.org/x/text v0.37.0 // indirect
|
||||
golang.org/x/time v0.7.0 // indirect
|
||||
golang.org/x/tools v0.45.0 // indirect
|
||||
modernc.org/libc v1.70.0 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
@@ -47,6 +49,10 @@ golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
|
||||
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
|
||||
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
|
||||
golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM=
|
||||
golang.org/x/tools/go/expect v0.1.1-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY=
|
||||
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated h1:1h2MnaIAIXISqTFKdENegdpAgUXz6NrPEsbIeWaBRvM=
|
||||
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated/go.mod h1:RVAQXBGNv1ib0J382/DPCRS/BPnsGebyM1Gj5VSDpG8=
|
||||
modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis=
|
||||
modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
|
||||
modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw=
|
||||
|
||||
@@ -0,0 +1,206 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
)
|
||||
|
||||
// dosServer builds a Server backed by a fresh store + blob store so a test can
|
||||
// drive ServeHTTP in-process (white-box) and observe its memory behavior without
|
||||
// a network round trip — the same in-process technique the auditor used.
|
||||
func dosServer(t *testing.T, mode AuthMode) *Server {
|
||||
t.Helper()
|
||||
dir := t.TempDir()
|
||||
store, err := Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
if err != nil {
|
||||
t.Fatalf("open blobs: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
return NewServer(store, blobs, mode)
|
||||
}
|
||||
|
||||
// zeroReader yields up to remaining zero bytes without ever allocating them, so
|
||||
// the test process itself never materializes a huge buffer (which would taint the
|
||||
// RSS measurement we are trying to make about the SERVER).
|
||||
type zeroReader struct{ remaining int64 }
|
||||
|
||||
func (z *zeroReader) Read(p []byte) (int, error) {
|
||||
if z.remaining <= 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n := int64(len(p))
|
||||
if n > z.remaining {
|
||||
n = z.remaining
|
||||
}
|
||||
for i := int64(0); i < n; i++ {
|
||||
p[i] = 0
|
||||
}
|
||||
z.remaining -= n
|
||||
return int(n), nil
|
||||
}
|
||||
|
||||
// vmRSSkB reads the resident set size (kB) of this process from /proc. Linux-only;
|
||||
// the caller skips on other platforms.
|
||||
func vmRSSkB(t *testing.T) int64 {
|
||||
t.Helper()
|
||||
b, err := os.ReadFile("/proc/self/status")
|
||||
if err != nil {
|
||||
t.Skipf("cannot read /proc/self/status: %v", err)
|
||||
}
|
||||
for _, line := range strings.Split(string(b), "\n") {
|
||||
if strings.HasPrefix(line, "VmRSS:") {
|
||||
f := strings.Fields(line)
|
||||
if len(f) >= 2 {
|
||||
v, _ := strconv.ParseInt(f[1], 10, 64)
|
||||
return v
|
||||
}
|
||||
}
|
||||
}
|
||||
t.Skip("VmRSS not present in /proc/self/status")
|
||||
return 0
|
||||
}
|
||||
|
||||
// TestAudit_DoSBodyLimitNoAuth ports the auditor's H1 (Critical) vector: a peer
|
||||
// with NO valid signature posts an oversized body. Before the fix the middleware
|
||||
// io.ReadAll'd it unbounded (the auditor sent 400 MB and watched RSS jump from
|
||||
// 18 MB to 898 MB). Now the request is rejected 413 and the resident set does NOT
|
||||
// spike. Two shapes are covered:
|
||||
//
|
||||
// (1) a truthful, over-ceiling Content-Length -> rejected before any byte is read;
|
||||
// (2) a lying / unknown length (chunked) -> MaxBytesReader trips mid-read,
|
||||
// capping the buffered bytes at the ceiling instead of the attacker's 400 MB.
|
||||
func TestAudit_DoSBodyLimitNoAuth(t *testing.T) {
|
||||
if runtime.GOOS != "linux" {
|
||||
t.Skip("RSS probe is Linux-only")
|
||||
}
|
||||
srv := dosServer(t, AuthEnforce) // enforce: the request carries no signature
|
||||
|
||||
const huge = int64(400) << 20 // 400 MiB — the auditor's figure
|
||||
// A spike threshold an order of magnitude below the attack. The old code would
|
||||
// add ~400 MB+; the fix keeps the delta to at most one bounded buffer.
|
||||
const maxSpikeKB = int64(96) << 10 // 96 MiB
|
||||
|
||||
// Shape 1: declared Content-Length over the blob ceiling -> early 413, no read.
|
||||
runtime.GC()
|
||||
before := vmRSSkB(t)
|
||||
req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: huge})
|
||||
req.ContentLength = huge
|
||||
rec := httptest.NewRecorder()
|
||||
srv.ServeHTTP(rec, req)
|
||||
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||
t.Fatalf("over-declared body should be 413, got %d", rec.Code)
|
||||
}
|
||||
runtime.GC()
|
||||
if d := vmRSSkB(t) - before; d > maxSpikeKB {
|
||||
t.Fatalf("RSS spiked %d kB on a pre-declared oversized body (limit %d kB)", d, maxSpikeKB)
|
||||
}
|
||||
|
||||
// Shape 2: unknown length (chunked-style). The middleware cannot reject by
|
||||
// Content-Length, so MaxBytesReader must cap the read at maxBlobBytes.
|
||||
runtime.GC()
|
||||
before = vmRSSkB(t)
|
||||
req = httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: huge})
|
||||
req.ContentLength = -1
|
||||
rec = httptest.NewRecorder()
|
||||
srv.ServeHTTP(rec, req)
|
||||
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||
t.Fatalf("unknown-length oversized body should be 413, got %d", rec.Code)
|
||||
}
|
||||
runtime.GC()
|
||||
if d := vmRSSkB(t) - before; d > maxSpikeKB {
|
||||
t.Fatalf("RSS spiked %d kB on a chunked oversized body (limit %d kB)", d, maxSpikeKB)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBlobLimitGoldenAndBoundary covers the golden path (a normal blob is stored)
|
||||
// and the boundary (a body exactly at the ceiling is accepted; one byte over by
|
||||
// truthful Content-Length is rejected before buffering).
|
||||
func TestBlobLimitGoldenAndBoundary(t *testing.T) {
|
||||
srv := dosServer(t, AuthOff) // AuthOff: the limits apply regardless of auth mode
|
||||
|
||||
// Golden: a small blob is accepted and hashed.
|
||||
rec := httptest.NewRecorder()
|
||||
srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello blob")))
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("normal blob should be 200, got %d (%s)", rec.Code, rec.Body.String())
|
||||
}
|
||||
|
||||
// Boundary: exactly at the ceiling is allowed (MaxBytesReader permits N bytes).
|
||||
atLimit := strings.Repeat("a", maxBlobBytes)
|
||||
rec = httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader(atLimit))
|
||||
req.ContentLength = int64(len(atLimit))
|
||||
srv.ServeHTTP(rec, req)
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("blob exactly at the ceiling should be 200, got %d", rec.Code)
|
||||
}
|
||||
|
||||
// Error: one byte over the ceiling (truthful Content-Length) -> 413 pre-read.
|
||||
rec = httptest.NewRecorder()
|
||||
req = httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: maxBlobBytes + 1})
|
||||
req.ContentLength = maxBlobBytes + 1
|
||||
srv.ServeHTTP(rec, req)
|
||||
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||
t.Fatalf("blob one byte over the ceiling should be 413, got %d", rec.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// TestControlBodyLimit checks the smaller JSON ceiling on a non-blob route: a body
|
||||
// over maxControlBodyBytes is rejected 413 before the handler runs.
|
||||
func TestControlBodyLimit(t *testing.T) {
|
||||
srv := dosServer(t, AuthOff)
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodPost, "/rooms", &zeroReader{remaining: maxControlBodyBytes + 1})
|
||||
req.ContentLength = maxControlBodyBytes + 1
|
||||
srv.ServeHTTP(rec, req)
|
||||
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||
t.Fatalf("control body over 1 MiB should be 413, got %d", rec.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRateLimitPerIP exercises the per-IP throttle: a burst from one IP eventually
|
||||
// gets 429 (error path), while a spread across distinct IPs is never throttled
|
||||
// (edge — the bucket is keyed per source, not global).
|
||||
func TestRateLimitPerIP(t *testing.T) {
|
||||
srv := dosServer(t, AuthOff)
|
||||
|
||||
// Same IP: well past the burst -> at least one 429.
|
||||
got429 := false
|
||||
for i := 0; i < defaultRateBurst+50; i++ {
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodGet, "/rooms/none", nil)
|
||||
req.RemoteAddr = "203.0.113.7:5555"
|
||||
srv.ServeHTTP(rec, req)
|
||||
if rec.Code == http.StatusTooManyRequests {
|
||||
got429 = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !got429 {
|
||||
t.Fatalf("a flood from one IP should eventually be rate-limited (429)")
|
||||
}
|
||||
|
||||
// Distinct IPs: each gets a fresh bucket, so none is throttled.
|
||||
for i := 0; i < 100; i++ {
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodGet, "/rooms/none", nil)
|
||||
req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":4444"
|
||||
srv.ServeHTTP(rec, req)
|
||||
if rec.Code == http.StatusTooManyRequests {
|
||||
t.Fatalf("distinct IPs must not share a rate bucket; IP #%d got 429", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// ipRateLimiter is a per-source-IP token-bucket rate limiter for the control
|
||||
// plane. It exists to blunt pre-auth flooding: an unauthenticated peer that
|
||||
// hammers the HTTP API (signature verification is not free, and io is bounded
|
||||
// but still real) is throttled before it can amplify load. Like the nonceCache,
|
||||
// this is transport glue specific to unibus, not a registry primitive — the
|
||||
// report 0003 made the same call for the nonce cache (it would only drag a NATS
|
||||
// dependency into the multi-domain registry go.mod for one helper).
|
||||
//
|
||||
// Each distinct IP gets its own golang.org/x/time/rate.Limiter (a standard
|
||||
// token bucket already in the module graph, so no new dependency). Idle buckets
|
||||
// are reaped so the map cannot grow without bound under a churn of source IPs.
|
||||
type ipRateLimiter struct {
|
||||
mu sync.Mutex
|
||||
buckets map[string]*ipBucket
|
||||
r rate.Limit
|
||||
burst int
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
type ipBucket struct {
|
||||
lim *rate.Limiter
|
||||
seen time.Time
|
||||
}
|
||||
|
||||
// newIPRateLimiter builds a limiter granting r tokens/second with the given
|
||||
// burst per IP. ttl bounds how long an idle bucket is retained before being
|
||||
// reaped. r<=0 disables limiting (Allow always true) so dev/loopback stacks are
|
||||
// unaffected.
|
||||
func newIPRateLimiter(r rate.Limit, burst int, ttl time.Duration) *ipRateLimiter {
|
||||
return &ipRateLimiter{
|
||||
buckets: make(map[string]*ipBucket),
|
||||
r: r,
|
||||
burst: burst,
|
||||
ttl: ttl,
|
||||
}
|
||||
}
|
||||
|
||||
// allow reports whether a request from ip may proceed now, consuming one token
|
||||
// on success. A disabled limiter (r<=0) always allows. Reaping of stale buckets
|
||||
// is amortized: it runs only when the map has grown past a small threshold, so
|
||||
// the common path is a single map lookup under the mutex.
|
||||
func (l *ipRateLimiter) allow(ip string, now time.Time) bool {
|
||||
if l == nil || l.r <= 0 {
|
||||
return true
|
||||
}
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
if len(l.buckets) > 1024 {
|
||||
l.reapLocked(now)
|
||||
}
|
||||
b, ok := l.buckets[ip]
|
||||
if !ok {
|
||||
b = &ipBucket{lim: rate.NewLimiter(l.r, l.burst)}
|
||||
l.buckets[ip] = b
|
||||
}
|
||||
b.seen = now
|
||||
return b.lim.AllowN(now, 1)
|
||||
}
|
||||
|
||||
// reapLocked drops buckets idle for longer than ttl. The caller holds l.mu.
|
||||
func (l *ipRateLimiter) reapLocked(now time.Time) {
|
||||
for ip, b := range l.buckets {
|
||||
if now.Sub(b.seen) > l.ttl {
|
||||
delete(l.buckets, ip)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// clientIP extracts the source IP of an HTTP request, stripping the port. It
|
||||
// trusts the transport's RemoteAddr only (no X-Forwarded-For parsing): a public
|
||||
// deployment terminates TLS at this process or behind a proxy that the operator
|
||||
// controls, and honoring an attacker-supplied header would let a single IP fan
|
||||
// its quota across forged identities. If parsing fails the whole RemoteAddr is
|
||||
// used as the key (still a stable per-connection bucket).
|
||||
func clientIP(r *http.Request) string {
|
||||
host, _, err := net.SplitHostPort(r.RemoteAddr)
|
||||
if err != nil {
|
||||
return r.RemoteAddr
|
||||
}
|
||||
return host
|
||||
}
|
||||
@@ -15,9 +15,36 @@ import (
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
)
|
||||
|
||||
// Body-size ceilings for the control plane. They bound how much an unauthenticated
|
||||
// peer can make the server buffer in RAM before the request is even authenticated
|
||||
// (the signature is verified over the full body, so the body must be read — but
|
||||
// not unboundedly). maxControlBodyBytes covers JSON metadata requests; /blobs gets
|
||||
// a separate, larger ceiling because media ciphertext is legitimately bigger. A
|
||||
// request whose declared Content-Length already exceeds its ceiling is rejected
|
||||
// before a single byte is buffered.
|
||||
const (
|
||||
maxControlBodyBytes = 1 << 20 // 1 MiB for JSON control-plane requests
|
||||
maxBlobBytes = 16 << 20 // 16 MiB for a single media blob upload
|
||||
// MaxHeaderBytes caps request header size; wired into the http.Server by the
|
||||
// command. Exported so the bound lives next to its body-size siblings.
|
||||
MaxHeaderBytes = 1 << 20 // 1 MiB
|
||||
)
|
||||
|
||||
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
|
||||
// human/agent bus rather than a high-QPS API: a steady ~20 req/s with a burst of
|
||||
// 40 absorbs a chat client's bursty polling while throttling a flood. Loopback
|
||||
// dev stacks pass r<=0 to disable limiting entirely.
|
||||
const (
|
||||
defaultRatePerSec = rate.Limit(20)
|
||||
defaultRateBurst = 40
|
||||
rateBucketTTL = 10 * time.Minute
|
||||
)
|
||||
|
||||
// Server is the HTTP control plane: the authoritative source of room metadata,
|
||||
// membership, and per-epoch sealed keys. The data plane (messages) is NATS.
|
||||
//
|
||||
@@ -32,11 +59,14 @@ type Server struct {
|
||||
mux *http.ServeMux
|
||||
authMode AuthMode
|
||||
nonces *nonceCache
|
||||
limiter *ipRateLimiter
|
||||
}
|
||||
|
||||
// NewServer wires the membership store and blob store into an http.Handler. The
|
||||
// authMode selects the control-plane auth rollout state (AuthOff for callers and
|
||||
// tests that have not migrated to signed requests yet).
|
||||
// tests that have not migrated to signed requests yet). It installs a per-IP
|
||||
// rate limiter with the package defaults; loopback dev behavior is unchanged
|
||||
// because the burst comfortably exceeds any single client's request rate.
|
||||
func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server {
|
||||
s := &Server{
|
||||
store: store,
|
||||
@@ -44,6 +74,7 @@ func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server
|
||||
mux: http.NewServeMux(),
|
||||
authMode: authMode,
|
||||
nonces: newNonceCache(nonceTTL),
|
||||
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
|
||||
}
|
||||
s.routes()
|
||||
return s
|
||||
@@ -53,23 +84,53 @@ func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server
|
||||
// (signature verification + anti-replay + allowlist) ahead of the router
|
||||
// according to authMode, then dispatches to the matched handler.
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
now := time.Now()
|
||||
|
||||
// Per-IP rate limit runs first, ahead of auth and body reads, so a flood is
|
||||
// shed at the cheapest possible point. The health probe is exempt so liveness
|
||||
// checks are never throttled.
|
||||
if !isAuthExempt(r) && !s.limiter.allow(clientIP(r), now) {
|
||||
writeErr(w, http.StatusTooManyRequests, "rate limit exceeded")
|
||||
return
|
||||
}
|
||||
|
||||
// Cap how much body we will buffer, BEFORE reading a single byte. The ceiling
|
||||
// is per-route: /blobs may legitimately carry a media ciphertext, everything
|
||||
// else is small JSON. A declared Content-Length over the ceiling is rejected
|
||||
// outright (no buffering); MaxBytesReader then guards against a lying or
|
||||
// chunked sender by failing the read once the limit is crossed. This is the
|
||||
// fix for the pre-auth DoS: without it an unauthenticated peer could make the
|
||||
// server buffer an unbounded body in RAM before authenticate() ever ran.
|
||||
limit := int64(maxControlBodyBytes)
|
||||
if r.Method == http.MethodPost && r.URL.Path == "/blobs" {
|
||||
limit = int64(maxBlobBytes)
|
||||
}
|
||||
if r.ContentLength > limit {
|
||||
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
|
||||
return
|
||||
}
|
||||
r.Body = http.MaxBytesReader(w, r.Body, limit)
|
||||
|
||||
if s.authMode == AuthOff || isAuthExempt(r) {
|
||||
s.mux.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// Buffer the body so the signature can be verified over it and the handler
|
||||
// still reads it. Bodies on the control plane are small (JSON metadata or a
|
||||
// media blob already capped upstream), so full buffering is acceptable.
|
||||
// Buffer the (now bounded) body so the signature can be verified over it and
|
||||
// the handler still reads it.
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
|
||||
if isBodyTooLarge(err) {
|
||||
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
|
||||
return
|
||||
}
|
||||
writeErr(w, http.StatusBadRequest, "read body")
|
||||
return
|
||||
}
|
||||
_ = r.Body.Close()
|
||||
r.Body = io.NopCloser(bytes.NewReader(body))
|
||||
|
||||
if _, err := s.authenticate(r, body, time.Now()); err != nil {
|
||||
if _, err := s.authenticate(r, body, now); err != nil {
|
||||
if s.authMode == AuthSoft {
|
||||
log.Printf("[auth] soft: would reject %s %s: %v", r.Method, r.URL.Path, err)
|
||||
s.mux.ServeHTTP(w, r)
|
||||
@@ -81,6 +142,13 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
s.mux.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// isBodyTooLarge reports whether err is the sentinel returned by MaxBytesReader
|
||||
// when the body exceeds its limit, so the middleware can map it to 413.
|
||||
func isBodyTooLarge(err error) bool {
|
||||
var maxErr *http.MaxBytesError
|
||||
return errors.As(err, &maxErr)
|
||||
}
|
||||
|
||||
// isAuthExempt lists requests that bypass control-plane auth even under enforce.
|
||||
// Only the unauthenticated health probe qualifies: it carries no data and is
|
||||
// needed by load balancers / smoke checks / systemd before any identity exists.
|
||||
@@ -401,9 +469,18 @@ func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (s *Server) handlePutBlob(w http.ResponseWriter, r *http.Request) {
|
||||
// The body arrives already bounded: ServeHTTP wraps it in a MaxBytesReader
|
||||
// (maxBlobBytes) and rejects an over-declared Content-Length before this
|
||||
// handler runs, in every auth mode. Reading here therefore cannot buffer
|
||||
// more than the ceiling; a sender that lies about its length (e.g. chunked)
|
||||
// trips MaxBytesReader and we map that to 413 rather than a generic 400.
|
||||
data, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
|
||||
if isBodyTooLarge(err) {
|
||||
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
|
||||
return
|
||||
}
|
||||
writeErr(w, http.StatusBadRequest, "read body")
|
||||
return
|
||||
}
|
||||
hash, err := s.blobs.Put(data)
|
||||
|
||||
Reference in New Issue
Block a user