Compare commits
12 Commits
fb0291ad8a
...
df3b62a601
| Author | SHA1 | Date | |
|---|---|---|---|
| df3b62a601 | |||
| 6976537842 | |||
| a4bbe8209b | |||
| 87ef52cc80 | |||
| a2ec78c81d | |||
| d01da9d396 | |||
| db8618ddc3 | |||
| e7d59fd01d | |||
| 0f79708338 | |||
| ef3af6dfd1 | |||
| 88b47912bd | |||
| a3ac58fb70 |
@@ -2,7 +2,7 @@
|
||||
name: unibus
|
||||
lang: go
|
||||
domain: infra
|
||||
version: 0.6.0
|
||||
version: 0.7.0
|
||||
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
||||
tags: [service, messaging, nats, e2e]
|
||||
uses_functions:
|
||||
@@ -154,6 +154,29 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v0.7.0 (2026-06-07) — hardening de seguridad 2 (issue 0005, fases 0005a–0005e)
|
||||
que cierra los hallazgos nuevos de la re-auditoría red-team (report 0006) y
|
||||
lleva el veredicto de exposición pública a "sí-con-condiciones". (0005a) Bump de
|
||||
`github.com/nats-io/nats-server/v2` v2.10.22→v2.11.15 y de la toolchain a
|
||||
go1.26.4: `govulncheck ./...` pasa de 16 vulnerabilidades alcanzables (14 del
|
||||
servidor NATS embebido + 2 de la stdlib) a 0. (0005b) `client.processFrame`
|
||||
ahora descarta cualquier frame sin firma en una room `SignMsgs` (antes verificaba
|
||||
solo si la firma venía presente, lo que permitía suplantar `Sender` con
|
||||
`Sig==nil`). (0005c) Nuevo limiter global de bytes en vuelo
|
||||
(`pkg/membership.inflightLimiter`) que acota la memoria agregada que el control
|
||||
plane bufferiza bajo concurrencia (el límite por-request y el rate-limit por-IP
|
||||
no acotaban el total): un flood concurrente multi-IP se descarta con 503 en vez
|
||||
de crecer sin techo (el RSS deja de escalar con N). (0005d) El guard de arranque
|
||||
`validateBootConfig` ahora exige `--tls-cert/--tls-key` en bind no-loopback (un
|
||||
control plane público sin TLS servía metadata en claro). (0005e) Se cablea por
|
||||
fin en `membershipd` la ACL por subject que ya existía huérfana desde 0003e
|
||||
(`busauth.NewNkeyAuthenticatorACL` + nuevo adaptador `busauth.PermissionsFromSubjects`
|
||||
sobre `membership.SubjectACLFor`): un registrado no-miembro ya no puede
|
||||
`Subscribe(">")` y captar los subjects/advisories de rooms ajenas. Residuales
|
||||
documentados: `$JS.API.>` sigue compartido (cierre completo = NATS accounts por
|
||||
identidad, diferido) y los clientes deben `RefreshSession` tras cambios de
|
||||
membresía (chat/worker aún no lo hacen). El comportamiento de un solo nodo no
|
||||
cambia y master sigue verde.
|
||||
- v0.6.0 (2026-06-07) — descentralización / alta disponibilidad (issue 0003,
|
||||
fases 0003a–0003e), report 0006. El servidor NATS embebido gana soporte de
|
||||
cluster con routes autenticadas (secreto de cluster) y TLS mutuo de nodo
|
||||
|
||||
@@ -43,9 +43,12 @@ func isLoopbackBind(bind string) bool {
|
||||
// configuration that would expose the bus without enforced authentication:
|
||||
//
|
||||
// - a non-loopback --bind without --bus-auth enforce (the data plane and
|
||||
// control plane would both accept anyone), and
|
||||
// control plane would both accept anyone),
|
||||
// - --tls-cert/--tls-key without --bus-auth enforce (TLS encrypts the channel
|
||||
// but authenticates no one — encrypted access for everybody is still open).
|
||||
// but authenticates no one — encrypted access for everybody is still open), and
|
||||
// - a non-loopback --bind WITHOUT --tls-cert/--tls-key (the control plane would
|
||||
// serve metadata over plaintext HTTP publicly — audit H5 reappearing, the N4
|
||||
// gap the re-audit found: TLS was available but not mandatory).
|
||||
//
|
||||
// It is a pure function of the parsed flags so the command can fail fast at
|
||||
// startup and tests can assert the policy without booting a server.
|
||||
@@ -60,6 +63,11 @@ func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey s
|
||||
"refusing to start: --tls-cert/--tls-key set but --bus-auth is %q; TLS without enforced auth is fail-open (encrypted channel, no authentication) — set --bus-auth enforce",
|
||||
mode)
|
||||
}
|
||||
if !isLoopbackBind(bind) && (tlsCert == "" || tlsKey == "") {
|
||||
return fmt.Errorf(
|
||||
"refusing to start: --bind %q is not loopback but --tls-cert/--tls-key are not both set; a public control plane must serve HTTPS or its metadata (subjects, pubkeys, sealed keys, the social graph) travels in cleartext to a network MITM (audit H5/N4) — provide a CA-signed --tls-cert/--tls-key, or bind 127.0.0.1 for local dev",
|
||||
bind)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -30,6 +30,31 @@ func TestAudit_FailOpenTLSWithoutAuth(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestGap_PublicEnforceNoTLS ports the re-auditor's N4 gap: the H2 guard refused
|
||||
// "public without enforce" and "TLS without enforce", but ALLOWED a public bind
|
||||
// with enforce and NO --tls-cert, so the control plane served metadata over
|
||||
// plaintext HTTP publicly (H5 reappearing). The guard now refuses it.
|
||||
func TestGap_PublicEnforceNoTLS(t *testing.T) {
|
||||
// The exact auditor configuration: public bind, enforce on, no TLS cert/key.
|
||||
err := validateBootConfig("0.0.0.0", membership.AuthEnforce, "", "")
|
||||
if err == nil {
|
||||
t.Fatalf("public bind + enforce + NO --tls-cert must be refused: the control plane would serve plaintext HTTP publicly (audit N4)")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "tls-cert") {
|
||||
t.Fatalf("error should point the operator at --tls-cert/--tls-key, got: %v", err)
|
||||
}
|
||||
|
||||
// Golden: the same public+enforce config WITH a cert/key is allowed.
|
||||
if err := validateBootConfig("0.0.0.0", membership.AuthEnforce, "server.crt", "server.key"); err != nil {
|
||||
t.Fatalf("public + enforce + TLS is the intended production config, got: %v", err)
|
||||
}
|
||||
|
||||
// Edge: loopback without TLS stays allowed (local dev is not a public exposure).
|
||||
if err := validateBootConfig("127.0.0.1", membership.AuthOff, "", ""); err != nil {
|
||||
t.Fatalf("loopback dev without TLS must remain allowed, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBootConfigPolicy is the full table: the golden secure-public config is
|
||||
// allowed, dev loopback is allowed, and every fail-open shape is refused.
|
||||
func TestBootConfigPolicy(t *testing.T) {
|
||||
@@ -41,19 +66,25 @@ func TestBootConfigPolicy(t *testing.T) {
|
||||
key string
|
||||
wantErr bool
|
||||
}{
|
||||
// Golden: the intended public production config.
|
||||
// Golden: the intended public production config — enforce AND TLS.
|
||||
{"public+enforce+tls", "0.0.0.0", membership.AuthEnforce, "s.crt", "s.key", false},
|
||||
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", false},
|
||||
// Edge: local dev on loopback may stay open (no auth, no TLS).
|
||||
{"loopback+off", "127.0.0.1", membership.AuthOff, "", "", false},
|
||||
{"loopback-ipv6+off", "::1", membership.AuthOff, "", "", false},
|
||||
{"localhost+off", "localhost", membership.AuthOff, "", "", false},
|
||||
{"loopback+soft", "127.0.0.1", membership.AuthSoft, "", "", false},
|
||||
// Edge: loopback with full enforce+TLS is also fine.
|
||||
{"loopback+enforce+tls", "127.0.0.1", membership.AuthEnforce, "s.crt", "s.key", false},
|
||||
// Error: public bind without enforce.
|
||||
{"public+off", "0.0.0.0", membership.AuthOff, "", "", true},
|
||||
{"public+soft", "0.0.0.0", membership.AuthSoft, "", "", true},
|
||||
{"lan-ip+off", "192.168.1.10", membership.AuthOff, "", "", true},
|
||||
{"empty-bind+off", "", membership.AuthOff, "", "", true},
|
||||
// Error (N4): public bind + enforce but NO TLS -> plaintext control plane.
|
||||
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", true},
|
||||
{"public+enforce+certonly", "0.0.0.0", membership.AuthEnforce, "s.crt", "", true},
|
||||
{"public+enforce+keyonly", "0.0.0.0", membership.AuthEnforce, "", "s.key", true},
|
||||
{"lan-ip+enforce+notls", "192.168.1.10", membership.AuthEnforce, "", "", true},
|
||||
// Error: TLS flags without enforce (cert or key alone is enough to trip it).
|
||||
{"loopback+tlscert+off", "127.0.0.1", membership.AuthOff, "s.crt", "", true},
|
||||
{"loopback+tlskey+soft", "127.0.0.1", membership.AuthSoft, "", "s.key", true},
|
||||
|
||||
+12
-2
@@ -138,8 +138,18 @@ func main() {
|
||||
log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes))
|
||||
}
|
||||
if authMode == membership.AuthEnforce {
|
||||
cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
|
||||
log.Printf("NATS nkey authentication: ON (enforce)")
|
||||
// Per-subject data-plane ACL (audit H4 / N4 residual): the authenticator
|
||||
// authorizes by the bus allowlist AND confines each connection to the
|
||||
// subjects of the rooms it belongs to (plus client-infra subjects). This
|
||||
// closes the wildcard metadata leak where a registered non-member could
|
||||
// Subscribe(">") and harvest every room's subject and JetStream activity.
|
||||
// NATS freezes permissions at connect time, so a peer that joins a room
|
||||
// after connecting must client.RefreshSession to gain that room's subject.
|
||||
cfg.Auth = busauth.NewNkeyAuthenticatorACL(
|
||||
store.IsAuthorized,
|
||||
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
|
||||
)
|
||||
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
|
||||
}
|
||||
if *tlsCert != "" || *tlsKey != "" {
|
||||
if *tlsCert == "" || *tlsKey == "" {
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
---
|
||||
issue: 0005
|
||||
title: Hardening 2 — CVEs, spoof por firma omitida, DoS por concurrencia, TLS forzado (re-auditoría)
|
||||
status: spec
|
||||
status: done
|
||||
created: 2026-06-07
|
||||
completed: 2026-06-07
|
||||
domain: security
|
||||
scope: unibus (go.mod, pkg/client, pkg/membership/server.go, cmd/membershipd/config.go, pkg/embeddednats, pkg/blobstore)
|
||||
depends_on: 0001, 0004 (cierra los hallazgos NUEVOS de la re-auditoría sobre lo entregado)
|
||||
|
||||
@@ -1,26 +1,28 @@
|
||||
module github.com/enmanuel/unibus
|
||||
|
||||
go 1.25.0
|
||||
go 1.26.4
|
||||
|
||||
replace fn-registry => ../../../../
|
||||
|
||||
require (
|
||||
fn-registry v0.0.0-00010101000000-000000000000
|
||||
github.com/nats-io/nats-server/v2 v2.10.22
|
||||
github.com/nats-io/nats.go v1.37.0
|
||||
github.com/nats-io/nkeys v0.4.7
|
||||
github.com/nats-io/nats-server/v2 v2.11.15
|
||||
github.com/nats-io/nats.go v1.49.0
|
||||
github.com/nats-io/nkeys v0.4.15
|
||||
github.com/oklog/ulid/v2 v2.1.0
|
||||
golang.org/x/time v0.7.0
|
||||
golang.org/x/time v0.15.0
|
||||
modernc.org/sqlite v1.47.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/google/go-tpm v0.9.8 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/klauspost/compress v1.18.3 // indirect
|
||||
github.com/klauspost/compress v1.18.4 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/minio/highwayhash v1.0.3 // indirect
|
||||
github.com/nats-io/jwt/v2 v2.5.8 // indirect
|
||||
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
|
||||
github.com/nats-io/jwt/v2 v2.8.1 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/ncruces/go-strftime v1.0.0 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
@@ -29,7 +31,6 @@ require (
|
||||
golang.org/x/mod v0.36.0 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/sys v0.44.0 // indirect
|
||||
golang.org/x/text v0.37.0 // indirect
|
||||
golang.org/x/tools v0.45.0 // indirect
|
||||
modernc.org/libc v1.70.0 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
|
||||
@@ -1,27 +1,31 @@
|
||||
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op h1:kpBdlEPbRvff0mDD1gk7o9BhI16b9p5yYAXRlidpqJE=
|
||||
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo=
|
||||
github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
|
||||
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
|
||||
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
|
||||
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
|
||||
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
|
||||
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
|
||||
github.com/nats-io/nats-server/v2 v2.10.22 h1:Yt63BGu2c3DdMoBZNcR6pjGQwk/asrKU7VX846ibxDA=
|
||||
github.com/nats-io/nats-server/v2 v2.10.22/go.mod h1:X/m1ye9NYansUXYFrbcDwUi/blHkrgHh2rgCJaakonk=
|
||||
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
|
||||
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
|
||||
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
|
||||
github.com/nats-io/jwt/v2 v2.8.1 h1:V0xpGuD/N8Mi+fQNDynXohVvp7ZztevW5io8CUWlPmU=
|
||||
github.com/nats-io/jwt/v2 v2.8.1/go.mod h1:nWnOEEiVMiKHQpnAy4eXlizVEtSfzacZ1Q43LIRavZg=
|
||||
github.com/nats-io/nats-server/v2 v2.11.15 h1:StSf9TINInaZtr4oww2+kXmfwa9SkN//g/LwS19/UJ0=
|
||||
github.com/nats-io/nats-server/v2 v2.11.15/go.mod h1:zwhv8Y0PE3KHyKgznJc/9Xoai638SaJd83zzJ5GJn74=
|
||||
github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE=
|
||||
github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw=
|
||||
github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4=
|
||||
github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
|
||||
@@ -43,10 +47,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
|
||||
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
|
||||
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
|
||||
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
|
||||
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
|
||||
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
|
||||
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
|
||||
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
|
||||
golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM=
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package busauth
|
||||
|
||||
import server "github.com/nats-io/nats-server/v2/server"
|
||||
|
||||
// PermissionsFromSubjects adapts a subject-deriving function (e.g.
|
||||
// membership.SubjectACLFor, which maps an identity to the subjects of the rooms
|
||||
// it belongs to plus the client infrastructure subjects) into the PermissionsFunc
|
||||
// the ACL authenticator expects. The derived subjects are granted as BOTH the
|
||||
// publish and subscribe allow set, so a connection can only pub/sub on the
|
||||
// subjects it is entitled to. A derivation error is propagated so the caller
|
||||
// fails closed (denies the connection) rather than granting open access.
|
||||
//
|
||||
// This is the production wiring for the per-subject data-plane ACL (issue 0003e,
|
||||
// audit H4): membershipd passes PermissionsFromSubjects(membership.SubjectACLFor(
|
||||
// store)) to NewNkeyAuthenticatorACL. It lives in busauth (not membership) so the
|
||||
// membership package stays free of the nats-server dependency.
|
||||
func PermissionsFromSubjects(derive func(signPubHex string) ([]string, error)) PermissionsFunc {
|
||||
return func(signPubHex string) (*server.Permissions, error) {
|
||||
subjects, err := derive(signPubHex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sp := &server.SubjectPermission{Allow: subjects}
|
||||
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
|
||||
}
|
||||
}
|
||||
+11
-1
@@ -799,7 +799,17 @@ func (c *Client) processFrame(roomID string, info roomView, data []byte, handler
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if info.Policy.SignMsgs && f.Sig != nil {
|
||||
// A room with SignMsgs REQUIRES a signature, so an unsigned frame is
|
||||
// unauthenticated and must be dropped — not silently accepted. The previous
|
||||
// `&& f.Sig != nil` guard verified the signature only when one was present, so
|
||||
// an attacker with data-plane access could publish a frame with Sig==nil and a
|
||||
// forged Sender and have the receiver accept it as authentic in a room that
|
||||
// demands signatures (audit N3, report 0006). Requiring the signature first
|
||||
// closes that spoof.
|
||||
if info.Policy.SignMsgs {
|
||||
if f.Sig == nil {
|
||||
return // signature required by room policy but absent: drop
|
||||
}
|
||||
pub, err := c.signerPub(roomID, f.Sender)
|
||||
if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) {
|
||||
return // unauthenticated frame: drop
|
||||
|
||||
@@ -0,0 +1,154 @@
|
||||
package client_test
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/room"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// TestReaudit_SigNilSpoof ports the re-auditor's N3 (Alto) finding: in a room
|
||||
// that REQUIRES per-message signatures, an attacker with data-plane access
|
||||
// publishes a raw frame with Sig==nil and a forged Sender. Before the fix
|
||||
// processFrame verified the signature only when one was present
|
||||
// (`SignMsgs && f.Sig != nil`), so the receiver accepted the unsigned, forged
|
||||
// frame as authentic. The fix drops any unsigned frame in a SignMsgs room.
|
||||
//
|
||||
// Coverage:
|
||||
// - golden: a properly signed frame from a real member IS delivered;
|
||||
// - error : an unsigned frame with a forged Sender in a SignMsgs room is DROPPED;
|
||||
// - edge : a room WITHOUT SignMsgs still delivers an unsigned frame (the drop
|
||||
// is specific to signed rooms, not a blanket reject of unsigned frames).
|
||||
func TestReaudit_SigNilSpoof(t *testing.T) {
|
||||
h := newHarness(t)
|
||||
waitHealth(t, h.ctrlURL)
|
||||
|
||||
alice, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
||||
if err != nil {
|
||||
t.Fatalf("connect alice: %v", err)
|
||||
}
|
||||
defer alice.Close()
|
||||
bob, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
||||
if err != nil {
|
||||
t.Fatalf("connect bob: %v", err)
|
||||
}
|
||||
defer bob.Close()
|
||||
|
||||
// A signed-but-NOT-encrypted room: SignMsgs enforces authorship, and the lack
|
||||
// of encryption is exactly the case the auditor flagged as Alto (any peer with
|
||||
// the subject can forge a sender if signatures are not strictly required).
|
||||
const subject = "room.signed.spoof"
|
||||
signedPolicy := room.Policy{Encrypt: false, Persist: false, SignMsgs: true}
|
||||
roomID, err := alice.CreateRoom(subject, signedPolicy)
|
||||
if err != nil {
|
||||
t.Fatalf("alice create signed room: %v", err)
|
||||
}
|
||||
if err := alice.Invite(roomID, bob.Endpoint()); err != nil {
|
||||
t.Fatalf("alice invite bob: %v", err)
|
||||
}
|
||||
if err := bob.Join(roomID); err != nil {
|
||||
t.Fatalf("bob join: %v", err)
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
var got []string
|
||||
sub, err := bob.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
|
||||
mu.Lock()
|
||||
got = append(got, string(plaintext))
|
||||
mu.Unlock()
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("bob subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
// Attacker: a raw NATS connection (the dev harness leaves the data plane open),
|
||||
// no identity, forged Sender, NO signature.
|
||||
const spoofMsg = "I am totally the victim"
|
||||
rawAtk, err := nats.Connect(h.natsURL)
|
||||
if err != nil {
|
||||
t.Fatalf("attacker raw connect: %v", err)
|
||||
}
|
||||
defer rawAtk.Close()
|
||||
spoof := frame.Frame{
|
||||
Type: frame.PUB,
|
||||
Subject: subject,
|
||||
Sender: "victim-forged-endpoint",
|
||||
MsgID: "spoof-1",
|
||||
Epoch: 1,
|
||||
Payload: []byte(spoofMsg),
|
||||
// Sig intentionally nil — this is the attack.
|
||||
}
|
||||
sb, err := spoof.Marshal()
|
||||
if err != nil {
|
||||
t.Fatalf("marshal spoof: %v", err)
|
||||
}
|
||||
if err := rawAtk.Publish(subject, sb); err != nil {
|
||||
t.Fatalf("attacker publish: %v", err)
|
||||
}
|
||||
_ = rawAtk.Flush()
|
||||
|
||||
// Golden: alice's properly signed frame must be delivered.
|
||||
const goodMsg = "authentic from alice"
|
||||
if err := alice.Publish(roomID, []byte(goodMsg)); err != nil {
|
||||
t.Fatalf("alice publish: %v", err)
|
||||
}
|
||||
if !waitFor(&mu, &got, func(rs []string) bool {
|
||||
for _, r := range rs {
|
||||
if r == goodMsg {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, 2*time.Second) {
|
||||
t.Fatalf("a properly signed frame should be delivered; got %v", snapshot(&mu, &got))
|
||||
}
|
||||
|
||||
// Error path: the unsigned, forged frame must NEVER reach the handler.
|
||||
for _, r := range snapshot(&mu, &got) {
|
||||
if r == spoofMsg {
|
||||
t.Fatalf("SIG-NIL SPOOF: receiver accepted an unsigned frame with a forged Sender in a SignMsgs room")
|
||||
}
|
||||
}
|
||||
|
||||
// Edge: a room WITHOUT SignMsgs still delivers an unsigned raw frame, proving
|
||||
// the drop is scoped to signed rooms and did not break the plain-NATS path.
|
||||
const subjectOpen = "room.open.nosig"
|
||||
openRoom, err := alice.CreateRoom(subjectOpen, room.ModeNATS)
|
||||
if err != nil {
|
||||
t.Fatalf("alice create open room: %v", err)
|
||||
}
|
||||
openCol := subscribeCollect(t, alice, openRoom)
|
||||
defer openCol.sub.Unsubscribe()
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
const openMsg = "unsigned but allowed here"
|
||||
openFrame := frame.Frame{
|
||||
Type: frame.PUB,
|
||||
Subject: subjectOpen,
|
||||
Sender: "anyone",
|
||||
MsgID: "open-1",
|
||||
Payload: []byte(openMsg),
|
||||
// no Sig — fine in a non-signed room
|
||||
}
|
||||
ob, _ := openFrame.Marshal()
|
||||
if err := rawAtk.Publish(subjectOpen, ob); err != nil {
|
||||
t.Fatalf("publish open frame: %v", err)
|
||||
}
|
||||
_ = rawAtk.Flush()
|
||||
if !waitFor(&openCol.mu, &openCol.msgs, func(rs []string) bool {
|
||||
for _, r := range rs {
|
||||
if r == openMsg {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, 2*time.Second) {
|
||||
t.Fatalf("an unsigned frame in a non-signed room should be delivered; got %v", snapshot(&openCol.mu, &openCol.msgs))
|
||||
}
|
||||
}
|
||||
+103
-11
@@ -39,18 +39,12 @@ func mustID(t *testing.T) cs.Identity {
|
||||
return id
|
||||
}
|
||||
|
||||
// aclPermsFunc adapts membership.SubjectACLFor into the busauth.PermissionsFunc
|
||||
// the ACL authenticator expects (same Allow set for publish and subscribe).
|
||||
// aclPermsFunc builds the per-subject PermissionsFunc the ACL authenticator
|
||||
// expects. It delegates to the SAME production wiring membershipd uses
|
||||
// (busauth.PermissionsFromSubjects over membership.SubjectACLFor), so this test
|
||||
// exercises the real path rather than a test-only reimplementation.
|
||||
func aclPermsFunc(store membership.Store) busauth.PermissionsFunc {
|
||||
derive := membership.SubjectACLFor(store)
|
||||
return func(signPubHex string) (*server.Permissions, error) {
|
||||
subs, err := derive(signPubHex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sp := &server.SubjectPermission{Allow: subs}
|
||||
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
|
||||
}
|
||||
return busauth.PermissionsFromSubjects(membership.SubjectACLFor(store))
|
||||
}
|
||||
|
||||
// startACLNats boots an embedded NATS whose authenticator confines each peer to
|
||||
@@ -219,6 +213,104 @@ func TestSubjectACLIsolation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestReaudit_H4_WildcardMetadataLeak ports the re-auditor's H4 vector. Before
|
||||
// the per-subject ACL was WIRED into membershipd (it existed in pkg/membership and
|
||||
// pkg/busauth but the binary used the plain NewNkeyAuthenticator), a registered
|
||||
// NON-member could open a raw NATS connection, Subscribe(">"), and capture every
|
||||
// room's subject plus JetStream stream/advisory activity — the payload stayed E2E
|
||||
// ciphertext, but the metadata leaked. With NewNkeyAuthenticatorACL wired via the
|
||||
// production path (busauth.PermissionsFromSubjects(membership.SubjectACLFor)), a
|
||||
// non-member is confined to the client-infra subjects, so the wildcard and any
|
||||
// foreign room subject are denied.
|
||||
//
|
||||
// Coverage:
|
||||
// - error : a non-member's Subscribe(">") raises a permission violation;
|
||||
// - edge : a non-member subscribing to another room's exact subject is denied;
|
||||
// - golden: the member still pub/subs her own room, and the non-member never
|
||||
// captures that traffic.
|
||||
//
|
||||
// Residual (DOCUMENTED, not closed here): the client-infra grant includes
|
||||
// "$JS.API.>", shared by all peers so per-connection JetStream works. A peer that
|
||||
// subscribes specifically to "$JS.API.>" can still observe stream-management
|
||||
// requests whose subjects embed the stream name derived from a room id. Fully
|
||||
// closing that needs NATS accounts/permissions isolation per identity (deferred to
|
||||
// the 0003 decentralization line). The high-impact leak the auditor exploited —
|
||||
// the room subject itself and JetStream advisories captured via "Subscribe(\">\")"
|
||||
// — is closed.
|
||||
func TestReaudit_H4_WildcardMetadataLeak(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
|
||||
alice, eve := mustID(t), mustID(t)
|
||||
aliceEP := frame.EndpointID(alice.SignPub)
|
||||
mustAddUser(t, store, alice, "alice")
|
||||
mustAddUser(t, store, eve, "eve") // eve is REGISTERED but never a member of alice's room
|
||||
const subject = "room.e2e.confidential"
|
||||
mustCreateRoom(t, store, "ROOMA", subject, aliceEP, alice)
|
||||
|
||||
srv := startACLNats(t, store)
|
||||
url := srv.ClientURL()
|
||||
|
||||
eveErr := make(chan error, 8)
|
||||
eveNC := nkeyConn(t, url, eve, eveErr)
|
||||
eveAll := make(chan *nats.Msg, 16)
|
||||
|
||||
// Error: eve's wildcard subscription is rejected. nats.go creates the local sub
|
||||
// object and the server rejects it asynchronously (delivered to ErrorHandler).
|
||||
if _, err := eveNC.Subscribe(">", func(m *nats.Msg) { eveAll <- m }); err != nil {
|
||||
t.Fatalf("eve sub >: %v", err)
|
||||
}
|
||||
_ = eveNC.Flush()
|
||||
if e := waitErr(eveErr, 1*time.Second); e == nil {
|
||||
t.Fatalf("a non-member's Subscribe(\">\") must raise a permissions violation (wildcard metadata leak still open)")
|
||||
}
|
||||
|
||||
// Edge: eve subscribing to the foreign room's EXACT subject is also denied.
|
||||
drain(eveErr)
|
||||
if _, err := eveNC.Subscribe(subject, func(m *nats.Msg) { eveAll <- m }); err != nil {
|
||||
t.Fatalf("eve sub subject: %v", err)
|
||||
}
|
||||
_ = eveNC.Flush()
|
||||
if e := waitErr(eveErr, 1*time.Second); e == nil {
|
||||
t.Fatalf("a non-member subscribing to another room's subject must be denied")
|
||||
}
|
||||
|
||||
// Golden: alice (the member) pub/subs her own room with no violation, and eve
|
||||
// never captured the traffic despite her (rejected) wildcard.
|
||||
aliceErr := make(chan error, 4)
|
||||
aliceNC := nkeyConn(t, url, alice, aliceErr)
|
||||
aliceGot := make(chan string, 4)
|
||||
if _, err := aliceNC.Subscribe(subject, func(m *nats.Msg) { aliceGot <- string(m.Data) }); err != nil {
|
||||
t.Fatalf("alice sub own room: %v", err)
|
||||
}
|
||||
_ = aliceNC.Flush()
|
||||
if e := waitErr(aliceErr, 300*time.Millisecond); e != nil {
|
||||
t.Fatalf("alice subscribing to her OWN room raised an error: %v", e)
|
||||
}
|
||||
if err := aliceNC.Publish(subject, []byte("members-only metadata")); err != nil {
|
||||
t.Fatalf("alice publish: %v", err)
|
||||
}
|
||||
_ = aliceNC.Flush()
|
||||
select {
|
||||
case got := <-aliceGot:
|
||||
if got != "members-only metadata" {
|
||||
t.Fatalf("alice got %q", got)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("alice did not receive her own room's message")
|
||||
}
|
||||
select {
|
||||
case m := <-eveAll:
|
||||
t.Fatalf("eve captured room traffic despite the ACL: subject=%q data=%q", m.Subject, m.Data)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
// good: eve captured nothing
|
||||
}
|
||||
}
|
||||
|
||||
// TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path:
|
||||
// alice is not in room B, so her connection has no permission for its subject;
|
||||
// after she is added to room B and calls RefreshSession, the reconnect
|
||||
|
||||
@@ -0,0 +1,148 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// readRSSkBRaw reports the process's resident set size in kB, taken from the
// "VmRSS:" line of /proc/self/status.
//
// It deliberately takes no *testing.T so that it can be polled from a sampling
// goroutine (the t-based probe, vmRSSkB, calls t.Skip, which is only legal on
// the test's own goroutine).
//
// It returns 0 when the file cannot be read, when no usable VmRSS line is
// present, or when the value does not parse as a decimal integer.
func readRSSkBRaw() int64 {
	raw, err := os.ReadFile("/proc/self/status")
	if err != nil {
		return 0
	}
	for _, ln := range strings.Split(string(raw), "\n") {
		if !strings.HasPrefix(ln, "VmRSS:") {
			continue
		}
		// Expected shape: "VmRSS:   <number> kB"; the number is the 2nd field.
		fields := strings.Fields(ln)
		if len(fields) < 2 {
			continue
		}
		// A parse failure is treated the same as "unavailable": the probe is
		// best-effort, so the error is intentionally dropped.
		kb, _ := strconv.ParseInt(fields[1], 10, 64)
		return kb
	}
	return 0
}
|
||||
|
||||
// TestReaudit_DoSConcurrency ports the re-auditor's N2 (Medio-Alto) finding: the
|
||||
// per-request body ceiling and the per-IP rate limit do not bound the AGGREGATE
|
||||
// memory of many concurrent uploads. The auditor drove RSS to ~1.42 GB with 40
|
||||
// concurrent 16 MiB blob uploads. With the global in-flight byte limiter, the
|
||||
// number of simultaneously-buffered uploads is capped, so the resident set stays
|
||||
// bounded regardless of how many connections arrive at once.
|
||||
//
|
||||
// Coverage:
|
||||
// - golden: a normal upload succeeds, and the server is still healthy after the
|
||||
// storm (the limiter did not wedge it);
|
||||
// - edge : concurrency right at the cap is admitted;
|
||||
// - error : a concurrent flood far past the cap sheds the excess with 503
|
||||
// (backpressure) instead of buffering it all, and the RSS spike stays bounded
|
||||
// and does NOT scale with the number of requests.
|
||||
func TestReaudit_DoSConcurrency(t *testing.T) {
|
||||
if runtime.GOOS != "linux" {
|
||||
t.Skip("RSS probe is Linux-only")
|
||||
}
|
||||
srv := dosServer(t, AuthOff)
|
||||
// Force a small aggregate cap so the bound is observable in a unit test: with
|
||||
// a 16 MiB blob ceiling, 48 MiB admits ~3 concurrent uploads. Production uses
|
||||
// maxInflightBytes (128 MiB); the mechanism under test is identical.
|
||||
const cap = int64(48) << 20
|
||||
srv.inflight = newInflightLimiter(cap)
|
||||
|
||||
const blob = maxBlobBytes // 16 MiB, the per-request ceiling
|
||||
const n = 40 // the auditor's figure
|
||||
|
||||
// A spike bound: with the cap admitting ~3 concurrent 16 MiB uploads and a
|
||||
// ~2x copy factor (auth buffer + handler buffer) plus Go runtime slack, the
|
||||
// delta should stay well under this. Without the limiter, 40 concurrent
|
||||
// uploads admitted at once would add hundreds of MB (the auditor saw ~1.4 GB).
|
||||
const maxSpikeKB = int64(256) << 10 // 256 MiB
|
||||
|
||||
runtime.GC()
|
||||
before := readRSSkBRaw()
|
||||
|
||||
// Sample peak RSS while the storm runs.
|
||||
var peak int64
|
||||
atomic.StoreInt64(&peak, before)
|
||||
stop := make(chan struct{})
|
||||
var sampler sync.WaitGroup
|
||||
sampler.Add(1)
|
||||
go func() {
|
||||
defer sampler.Done()
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
default:
|
||||
if v := readRSSkBRaw(); v > atomic.LoadInt64(&peak) {
|
||||
atomic.StoreInt64(&peak, v)
|
||||
}
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var got503, got200 int64
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < n; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: blob})
|
||||
req.ContentLength = blob
|
||||
// Distinct source IP per request: this is the multi-IP (botnet) shape the
|
||||
// auditor flagged, where the per-IP rate limit gives no aggregate defense.
|
||||
// The in-flight byte limiter is the global bound that must hold here.
|
||||
req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":1234"
|
||||
rec := httptest.NewRecorder()
|
||||
srv.ServeHTTP(rec, req)
|
||||
switch rec.Code {
|
||||
case http.StatusServiceUnavailable:
|
||||
atomic.AddInt64(&got503, 1)
|
||||
case http.StatusOK:
|
||||
atomic.AddInt64(&got200, 1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
close(stop)
|
||||
sampler.Wait()
|
||||
|
||||
runtime.GC()
|
||||
delta := atomic.LoadInt64(&peak) - before
|
||||
|
||||
// Error path: the flood must have hit the cap and shed the excess with 503.
|
||||
if got503 == 0 {
|
||||
t.Fatalf("a concurrent flood of %d uploads past the cap should shed some with 503; got 200=%d 503=%d", n, got200, got503)
|
||||
}
|
||||
// The aggregate memory must stay bounded — not scale with n.
|
||||
if delta > maxSpikeKB {
|
||||
t.Fatalf("aggregate RSS spiked %d kB under %d concurrent uploads (bound %d kB): in-flight limiter not bounding memory", delta, n, maxSpikeKB)
|
||||
}
|
||||
// All reservations released after the storm.
|
||||
if f := srv.inflight.inFlight(); f != 0 {
|
||||
t.Fatalf("after the storm inFlight = %d, want 0 (reservations leaked)", f)
|
||||
}
|
||||
|
||||
// Golden: the server is still healthy and serves a normal upload (from a fresh
|
||||
// IP so the per-IP rate limiter, untouched here, is not what we measure).
|
||||
rec := httptest.NewRecorder()
|
||||
gReq := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello after storm"))
|
||||
gReq.RemoteAddr = "203.0.113.9:9999"
|
||||
srv.ServeHTTP(rec, gReq)
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("a normal upload after the storm should be 200, got %d (%s)", rec.Code, rec.Body.String())
|
||||
}
|
||||
|
||||
t.Logf("N2 bound: %d uploads -> 200=%d 503=%d, RSS delta %d kB (bound %d kB), cap %d MiB",
|
||||
n, got200, got503, delta, maxSpikeKB, cap>>20)
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
package membership
|
||||
|
||||
import "sync/atomic"
|
||||
|
||||
// inflightLimiter is a non-blocking, byte-counting admission limiter: a global
// cap on how many bytes of request body the server is willing to buffer at the
// same time.
//
// The per-request body ceilings (maxControlBodyBytes / maxBlobBytes) bound one
// request and the per-IP rate limiter throttles one source, but neither bounds
// the AGGREGATE memory across many concurrent uploads: the re-audit (report
// 0006, N2) showed 40 concurrent 16 MiB blob uploads driving RSS to ~1.42 GB,
// and a distributed (multi-IP) flood scales without a ceiling because the rate
// limiter is per-IP. This limiter is that aggregate bound: ServeHTTP reserves a
// request's worst-case buffered size before reading the body and releases it
// when the request finishes, so the total reserved can never exceed max no
// matter how many connections or source IPs arrive at once.
//
// It is intentionally NON-blocking: when a reservation does not fit, the caller
// sheds the request with backpressure (503) instead of parking a goroutine,
// which would let an attacker exhaust goroutines/connections instead of RAM.
// The counter is a typed atomic updated with a CAS loop, so all methods are
// safe for concurrent use without a mutex.
//
// Implementation note: this lives inside unibus rather than the fn-registry
// (where a generic concurrency primitive would normally belong) because the
// registry's functions/core package pulls in transitive dependencies that
// require CGO (mattn/go-sqlite3) and external modules, which are incompatible
// with unibus's CGO_ENABLED=0 build, and because this work is scoped to the
// unibus sub-repo.
type inflightLimiter struct {
	max int64 // immutable after construction; <= 0 disables the limiter

	// used is the number of bytes currently reserved. Invariant while the
	// limiter is enabled: 0 <= used <= max. The typed atomic makes non-atomic
	// access impossible and guarantees 64-bit alignment on 32-bit platforms.
	used atomic.Int64
}

// newInflightLimiter builds a limiter that allows at most maxBytes bytes in
// flight. maxBytes <= 0 disables the cap (tryAcquire always grants and nothing
// is tracked), which is the loopback/dev posture where an aggregate memory
// ceiling is not wanted.
func newInflightLimiter(maxBytes int64) *inflightLimiter {
	return &inflightLimiter{max: maxBytes}
}

// tryAcquire reserves n bytes without blocking. It returns true and records the
// reservation when it fits within the cap (used+n <= max); otherwise it returns
// false and reserves nothing. n <= 0 is granted without reserving, and a
// disabled limiter (max <= 0) always grants. Safe for concurrent use.
func (l *inflightLimiter) tryAcquire(n int64) bool {
	if l.max <= 0 || n <= 0 {
		return true
	}
	for {
		cur := l.used.Load()
		// Compare n against the remaining headroom instead of computing
		// cur+n first: for a very large n the sum would wrap around to a
		// negative int64, pass a "cur+n > max" check, and corrupt the counter.
		// Since 0 <= cur <= max, max-cur cannot overflow.
		if n > l.max-cur {
			return false
		}
		if l.used.CompareAndSwap(cur, cur+n) {
			return true
		}
		// Lost a race with another acquire/release; re-read and retry.
	}
}

// release returns n previously reserved bytes. It must be paired with a
// tryAcquire that granted. A disabled limiter or n <= 0 is a no-op. The counter
// never drops below zero: an accidental double/over release is clamped at 0
// rather than poisoning later admission decisions.
func (l *inflightLimiter) release(n int64) {
	if l.max <= 0 || n <= 0 {
		return
	}
	for {
		cur := l.used.Load()
		next := cur - n
		if next < 0 {
			next = 0
		}
		if l.used.CompareAndSwap(cur, next) {
			return
		}
	}
}

// inFlight returns the number of bytes currently reserved. It exists for
// observability (tests and metrics); the value may be stale by the time the
// caller looks at it when other goroutines are acquiring or releasing.
func (l *inflightLimiter) inFlight() int64 {
	return l.used.Load()
}
|
||||
@@ -0,0 +1,97 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestInflightLimiterBasics covers the limiter contract: granting within the cap
|
||||
// (golden), the exact boundary (edge), refusal over the cap without mutating the
|
||||
// counter (error), the disabled mode, and the defensive clamp on over-release.
|
||||
func TestInflightLimiterBasics(t *testing.T) {
|
||||
l := newInflightLimiter(100)
|
||||
|
||||
// Golden: a reservation within the cap is granted and reflected.
|
||||
if !l.tryAcquire(60) {
|
||||
t.Fatalf("acquire 60 within cap 100 should grant")
|
||||
}
|
||||
if l.inFlight() != 60 {
|
||||
t.Fatalf("inFlight = %d, want 60", l.inFlight())
|
||||
}
|
||||
|
||||
// Edge: exactly reaching the cap (60+40 == 100) is granted.
|
||||
if !l.tryAcquire(40) {
|
||||
t.Fatalf("acquire to the exact cap should grant")
|
||||
}
|
||||
if l.inFlight() != 100 {
|
||||
t.Fatalf("inFlight = %d, want 100", l.inFlight())
|
||||
}
|
||||
|
||||
// Error: one more byte over the full cap is refused, and the counter is left
|
||||
// untouched (a refused reservation reserves nothing).
|
||||
if l.tryAcquire(1) {
|
||||
t.Fatalf("acquire over a full cap must be refused")
|
||||
}
|
||||
if l.inFlight() != 100 {
|
||||
t.Fatalf("a refused acquire must not change inFlight; got %d", l.inFlight())
|
||||
}
|
||||
|
||||
// Release frees capacity again.
|
||||
l.release(100)
|
||||
if l.inFlight() != 0 {
|
||||
t.Fatalf("inFlight after full release = %d, want 0", l.inFlight())
|
||||
}
|
||||
|
||||
// Defensive: an over-release never drives the counter negative.
|
||||
l.release(50)
|
||||
if l.inFlight() != 0 {
|
||||
t.Fatalf("over-release must clamp at 0; got %d", l.inFlight())
|
||||
}
|
||||
}
|
||||
|
||||
// TestInflightLimiterDisabled verifies that a non-positive cap disables the
|
||||
// limiter: every reservation is granted and nothing is tracked (the loopback/dev
|
||||
// posture).
|
||||
func TestInflightLimiterDisabled(t *testing.T) {
|
||||
for _, max := range []int64{0, -1} {
|
||||
l := newInflightLimiter(max)
|
||||
if !l.tryAcquire(1 << 30) {
|
||||
t.Fatalf("disabled limiter (max=%d) must always grant", max)
|
||||
}
|
||||
if l.inFlight() != 0 {
|
||||
t.Fatalf("disabled limiter must not track usage; got %d", l.inFlight())
|
||||
}
|
||||
l.release(1 << 30) // no-op, must not panic
|
||||
}
|
||||
}
|
||||
|
||||
// TestInflightLimiterConcurrent hammers the limiter from many goroutines with
|
||||
// equal-sized acquire/release pairs and asserts the invariant never breaks: the
|
||||
// counter returns to 0 and never exceeds the cap. Run with -race for the memory
|
||||
// model guarantee.
|
||||
func TestInflightLimiterConcurrent(t *testing.T) {
|
||||
const cap = 1000
|
||||
const chunk = 7
|
||||
l := newInflightLimiter(cap)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for g := 0; g < 64; g++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 2000; i++ {
|
||||
if l.tryAcquire(chunk) {
|
||||
if f := l.inFlight(); f > cap {
|
||||
t.Errorf("inFlight %d exceeded cap %d", f, cap)
|
||||
return
|
||||
}
|
||||
l.release(chunk)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if l.inFlight() != 0 {
|
||||
t.Fatalf("after all goroutines, inFlight = %d, want 0", l.inFlight())
|
||||
}
|
||||
}
|
||||
@@ -35,6 +35,14 @@ const (
|
||||
// MaxHeaderBytes caps request header size; wired into the http.Server by the
|
||||
// command. Exported so the bound lives next to its body-size siblings.
|
||||
MaxHeaderBytes = 1 << 20 // 1 MiB
|
||||
// maxInflightBytes is the GLOBAL cap on request-body bytes buffered across all
|
||||
// concurrent requests (audit N2). The per-request ceilings above bound one
|
||||
// request; this bounds the sum, so a concurrent (even multi-IP) flood of
|
||||
// max-size uploads cannot drive the resident set without limit. 128 MiB allows
|
||||
// ~8 concurrent 16 MiB blob uploads or ~128 concurrent control requests before
|
||||
// further POSTs are shed with 503 — generous for an interactive bus, bounded
|
||||
// for an attacker.
|
||||
maxInflightBytes = 128 << 20 // 128 MiB
|
||||
)
|
||||
|
||||
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
|
||||
@@ -62,6 +70,7 @@ type Server struct {
|
||||
authMode AuthMode
|
||||
nonces nonceStore
|
||||
limiter *ipRateLimiter
|
||||
inflight *inflightLimiter
|
||||
|
||||
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
|
||||
// rooms. It is the minimum-defensive control for the data plane (audit H4):
|
||||
@@ -87,6 +96,7 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
|
||||
authMode: authMode,
|
||||
nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries),
|
||||
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
|
||||
inflight: newInflightLimiter(maxInflightBytes),
|
||||
}
|
||||
s.routes()
|
||||
return s
|
||||
@@ -139,6 +149,22 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
r.Body = http.MaxBytesReader(w, r.Body, limit)
|
||||
|
||||
// Aggregate memory bound (audit N2): the per-request ceiling above and the
|
||||
// per-IP rate limit do not cap the TOTAL bytes buffered across concurrent
|
||||
// requests. A POST reserves its worst-case buffered size (its route ceiling)
|
||||
// from a global limiter before the body is read, and is shed with 503 when the
|
||||
// cap is reached, so the resident set stays bounded under a concurrent (even
|
||||
// multi-IP) upload flood instead of growing linearly with the number of
|
||||
// connections. Reservation is released when the request finishes. Only POSTs
|
||||
// buffer a body; GETs carry none, so they do not consume the budget.
|
||||
if r.Method == http.MethodPost {
|
||||
if !s.inflight.tryAcquire(limit) {
|
||||
writeErr(w, http.StatusServiceUnavailable, "server busy: too many concurrent uploads in flight")
|
||||
return
|
||||
}
|
||||
defer s.inflight.release(limit)
|
||||
}
|
||||
|
||||
if s.authMode == AuthOff || isAuthExempt(r) {
|
||||
s.mux.ServeHTTP(w, r)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user