Compare commits
12 Commits
fb0291ad8a
...
df3b62a601
| Author | SHA1 | Date | |
|---|---|---|---|
| df3b62a601 | |||
| 6976537842 | |||
| a4bbe8209b | |||
| 87ef52cc80 | |||
| a2ec78c81d | |||
| d01da9d396 | |||
| db8618ddc3 | |||
| e7d59fd01d | |||
| 0f79708338 | |||
| ef3af6dfd1 | |||
| 88b47912bd | |||
| a3ac58fb70 |
@@ -2,7 +2,7 @@
|
|||||||
name: unibus
|
name: unibus
|
||||||
lang: go
|
lang: go
|
||||||
domain: infra
|
domain: infra
|
||||||
version: 0.6.0
|
version: 0.7.0
|
||||||
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
||||||
tags: [service, messaging, nats, e2e]
|
tags: [service, messaging, nats, e2e]
|
||||||
uses_functions:
|
uses_functions:
|
||||||
@@ -154,6 +154,29 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
|
|||||||
|
|
||||||
## Capability growth log
|
## Capability growth log
|
||||||
|
|
||||||
|
- v0.7.0 (2026-06-07) — hardening de seguridad 2 (issue 0005, fases 0005a–0005e)
|
||||||
|
que cierra los hallazgos nuevos de la re-auditoría red-team (report 0006) y
|
||||||
|
lleva el veredicto de exposición pública a "sí-con-condiciones". (0005a) Bump de
|
||||||
|
`github.com/nats-io/nats-server/v2` v2.10.22→v2.11.15 y de la toolchain a
|
||||||
|
go1.26.4: `govulncheck ./...` pasa de 16 vulnerabilidades alcanzables (14 del
|
||||||
|
servidor NATS embebido + 2 de la stdlib) a 0. (0005b) `client.processFrame`
|
||||||
|
ahora descarta cualquier frame sin firma en una room `SignMsgs` (antes verificaba
|
||||||
|
solo si la firma venía presente, lo que permitía suplantar `Sender` con
|
||||||
|
`Sig==nil`). (0005c) Nuevo limiter global de bytes en vuelo
|
||||||
|
(`pkg/membership.inflightLimiter`) que acota la memoria agregada que el control
|
||||||
|
plane bufferiza bajo concurrencia (el límite por-request y el rate-limit por-IP
|
||||||
|
no acotaban el total): un flood concurrente multi-IP se descarta con 503 en vez
|
||||||
|
de crecer sin techo (el RSS deja de escalar con N). (0005d) El guard de arranque
|
||||||
|
`validateBootConfig` ahora exige `--tls-cert/--tls-key` en bind no-loopback (un
|
||||||
|
control plane público sin TLS servía metadata en claro). (0005e) Se cablea por
|
||||||
|
fin en `membershipd` la ACL por subject que ya existía huérfana desde 0003e
|
||||||
|
(`busauth.NewNkeyAuthenticatorACL` + nuevo adaptador `busauth.PermissionsFromSubjects`
|
||||||
|
sobre `membership.SubjectACLFor`): un registrado no-miembro ya no puede
|
||||||
|
`Subscribe(">")` y captar los subjects/advisories de rooms ajenas. Residuales
|
||||||
|
documentados: `$JS.API.>` sigue compartido (cierre completo = NATS accounts por
|
||||||
|
identidad, diferido) y los clientes deben `RefreshSession` tras cambios de
|
||||||
|
membresía (chat/worker aún no lo hacen). El comportamiento de un solo nodo no
|
||||||
|
cambia y master sigue verde.
|
||||||
- v0.6.0 (2026-06-07) — descentralización / alta disponibilidad (issue 0003,
|
- v0.6.0 (2026-06-07) — descentralización / alta disponibilidad (issue 0003,
|
||||||
fases 0003a–0003e), report 0006. El servidor NATS embebido gana soporte de
|
fases 0003a–0003e), report 0006. El servidor NATS embebido gana soporte de
|
||||||
cluster con routes autenticadas (secreto de cluster) y TLS mutuo de nodo
|
cluster con routes autenticadas (secreto de cluster) y TLS mutuo de nodo
|
||||||
|
|||||||
@@ -43,9 +43,12 @@ func isLoopbackBind(bind string) bool {
|
|||||||
// configuration that would expose the bus without enforced authentication:
|
// configuration that would expose the bus without enforced authentication:
|
||||||
//
|
//
|
||||||
// - a non-loopback --bind without --bus-auth enforce (the data plane and
|
// - a non-loopback --bind without --bus-auth enforce (the data plane and
|
||||||
// control plane would both accept anyone), and
|
// control plane would both accept anyone),
|
||||||
// - --tls-cert/--tls-key without --bus-auth enforce (TLS encrypts the channel
|
// - --tls-cert/--tls-key without --bus-auth enforce (TLS encrypts the channel
|
||||||
// but authenticates no one — encrypted access for everybody is still open).
|
// but authenticates no one — encrypted access for everybody is still open), and
|
||||||
|
// - a non-loopback --bind WITHOUT --tls-cert/--tls-key (the control plane would
|
||||||
|
// serve metadata over plaintext HTTP publicly — audit H5 reappearing, the N4
|
||||||
|
// gap the re-audit found: TLS was available but not mandatory).
|
||||||
//
|
//
|
||||||
// It is a pure function of the parsed flags so the command can fail fast at
|
// It is a pure function of the parsed flags so the command can fail fast at
|
||||||
// startup and tests can assert the policy without booting a server.
|
// startup and tests can assert the policy without booting a server.
|
||||||
@@ -60,6 +63,11 @@ func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey s
|
|||||||
"refusing to start: --tls-cert/--tls-key set but --bus-auth is %q; TLS without enforced auth is fail-open (encrypted channel, no authentication) — set --bus-auth enforce",
|
"refusing to start: --tls-cert/--tls-key set but --bus-auth is %q; TLS without enforced auth is fail-open (encrypted channel, no authentication) — set --bus-auth enforce",
|
||||||
mode)
|
mode)
|
||||||
}
|
}
|
||||||
|
if !isLoopbackBind(bind) && (tlsCert == "" || tlsKey == "") {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"refusing to start: --bind %q is not loopback but --tls-cert/--tls-key are not both set; a public control plane must serve HTTPS or its metadata (subjects, pubkeys, sealed keys, the social graph) travels in cleartext to a network MITM (audit H5/N4) — provide a CA-signed --tls-cert/--tls-key, or bind 127.0.0.1 for local dev",
|
||||||
|
bind)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -30,6 +30,31 @@ func TestAudit_FailOpenTLSWithoutAuth(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestGap_PublicEnforceNoTLS ports the re-auditor's N4 gap: the H2 guard refused
|
||||||
|
// "public without enforce" and "TLS without enforce", but ALLOWED a public bind
|
||||||
|
// with enforce and NO --tls-cert, so the control plane served metadata over
|
||||||
|
// plaintext HTTP publicly (H5 reappearing). The guard now refuses it.
|
||||||
|
func TestGap_PublicEnforceNoTLS(t *testing.T) {
|
||||||
|
// The exact auditor configuration: public bind, enforce on, no TLS cert/key.
|
||||||
|
err := validateBootConfig("0.0.0.0", membership.AuthEnforce, "", "")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("public bind + enforce + NO --tls-cert must be refused: the control plane would serve plaintext HTTP publicly (audit N4)")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "tls-cert") {
|
||||||
|
t.Fatalf("error should point the operator at --tls-cert/--tls-key, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Golden: the same public+enforce config WITH a cert/key is allowed.
|
||||||
|
if err := validateBootConfig("0.0.0.0", membership.AuthEnforce, "server.crt", "server.key"); err != nil {
|
||||||
|
t.Fatalf("public + enforce + TLS is the intended production config, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Edge: loopback without TLS stays allowed (local dev is not a public exposure).
|
||||||
|
if err := validateBootConfig("127.0.0.1", membership.AuthOff, "", ""); err != nil {
|
||||||
|
t.Fatalf("loopback dev without TLS must remain allowed, got: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestBootConfigPolicy is the full table: the golden secure-public config is
|
// TestBootConfigPolicy is the full table: the golden secure-public config is
|
||||||
// allowed, dev loopback is allowed, and every fail-open shape is refused.
|
// allowed, dev loopback is allowed, and every fail-open shape is refused.
|
||||||
func TestBootConfigPolicy(t *testing.T) {
|
func TestBootConfigPolicy(t *testing.T) {
|
||||||
@@ -41,19 +66,25 @@ func TestBootConfigPolicy(t *testing.T) {
|
|||||||
key string
|
key string
|
||||||
wantErr bool
|
wantErr bool
|
||||||
}{
|
}{
|
||||||
// Golden: the intended public production config.
|
// Golden: the intended public production config — enforce AND TLS.
|
||||||
{"public+enforce+tls", "0.0.0.0", membership.AuthEnforce, "s.crt", "s.key", false},
|
{"public+enforce+tls", "0.0.0.0", membership.AuthEnforce, "s.crt", "s.key", false},
|
||||||
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", false},
|
|
||||||
// Edge: local dev on loopback may stay open (no auth, no TLS).
|
// Edge: local dev on loopback may stay open (no auth, no TLS).
|
||||||
{"loopback+off", "127.0.0.1", membership.AuthOff, "", "", false},
|
{"loopback+off", "127.0.0.1", membership.AuthOff, "", "", false},
|
||||||
{"loopback-ipv6+off", "::1", membership.AuthOff, "", "", false},
|
{"loopback-ipv6+off", "::1", membership.AuthOff, "", "", false},
|
||||||
{"localhost+off", "localhost", membership.AuthOff, "", "", false},
|
{"localhost+off", "localhost", membership.AuthOff, "", "", false},
|
||||||
{"loopback+soft", "127.0.0.1", membership.AuthSoft, "", "", false},
|
{"loopback+soft", "127.0.0.1", membership.AuthSoft, "", "", false},
|
||||||
|
// Edge: loopback with full enforce+TLS is also fine.
|
||||||
|
{"loopback+enforce+tls", "127.0.0.1", membership.AuthEnforce, "s.crt", "s.key", false},
|
||||||
// Error: public bind without enforce.
|
// Error: public bind without enforce.
|
||||||
{"public+off", "0.0.0.0", membership.AuthOff, "", "", true},
|
{"public+off", "0.0.0.0", membership.AuthOff, "", "", true},
|
||||||
{"public+soft", "0.0.0.0", membership.AuthSoft, "", "", true},
|
{"public+soft", "0.0.0.0", membership.AuthSoft, "", "", true},
|
||||||
{"lan-ip+off", "192.168.1.10", membership.AuthOff, "", "", true},
|
{"lan-ip+off", "192.168.1.10", membership.AuthOff, "", "", true},
|
||||||
{"empty-bind+off", "", membership.AuthOff, "", "", true},
|
{"empty-bind+off", "", membership.AuthOff, "", "", true},
|
||||||
|
// Error (N4): public bind + enforce but NO TLS -> plaintext control plane.
|
||||||
|
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", true},
|
||||||
|
{"public+enforce+certonly", "0.0.0.0", membership.AuthEnforce, "s.crt", "", true},
|
||||||
|
{"public+enforce+keyonly", "0.0.0.0", membership.AuthEnforce, "", "s.key", true},
|
||||||
|
{"lan-ip+enforce+notls", "192.168.1.10", membership.AuthEnforce, "", "", true},
|
||||||
// Error: TLS flags without enforce (cert or key alone is enough to trip it).
|
// Error: TLS flags without enforce (cert or key alone is enough to trip it).
|
||||||
{"loopback+tlscert+off", "127.0.0.1", membership.AuthOff, "s.crt", "", true},
|
{"loopback+tlscert+off", "127.0.0.1", membership.AuthOff, "s.crt", "", true},
|
||||||
{"loopback+tlskey+soft", "127.0.0.1", membership.AuthSoft, "", "s.key", true},
|
{"loopback+tlskey+soft", "127.0.0.1", membership.AuthSoft, "", "s.key", true},
|
||||||
|
|||||||
+12
-2
@@ -138,8 +138,18 @@ func main() {
|
|||||||
log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes))
|
log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes))
|
||||||
}
|
}
|
||||||
if authMode == membership.AuthEnforce {
|
if authMode == membership.AuthEnforce {
|
||||||
cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
|
// Per-subject data-plane ACL (audit H4 / N4 residual): the authenticator
|
||||||
log.Printf("NATS nkey authentication: ON (enforce)")
|
// authorizes by the bus allowlist AND confines each connection to the
|
||||||
|
// subjects of the rooms it belongs to (plus client-infra subjects). This
|
||||||
|
// closes the wildcard metadata leak where a registered non-member could
|
||||||
|
// Subscribe(">") and harvest every room's subject and JetStream activity.
|
||||||
|
// NATS freezes permissions at connect time, so a peer that joins a room
|
||||||
|
// after connecting must client.RefreshSession to gain that room's subject.
|
||||||
|
cfg.Auth = busauth.NewNkeyAuthenticatorACL(
|
||||||
|
store.IsAuthorized,
|
||||||
|
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
|
||||||
|
)
|
||||||
|
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
|
||||||
}
|
}
|
||||||
if *tlsCert != "" || *tlsKey != "" {
|
if *tlsCert != "" || *tlsKey != "" {
|
||||||
if *tlsCert == "" || *tlsKey == "" {
|
if *tlsCert == "" || *tlsKey == "" {
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
---
|
---
|
||||||
issue: 0005
|
issue: 0005
|
||||||
title: Hardening 2 — CVEs, spoof por firma omitida, DoS por concurrencia, TLS forzado (re-auditoría)
|
title: Hardening 2 — CVEs, spoof por firma omitida, DoS por concurrencia, TLS forzado (re-auditoría)
|
||||||
status: spec
|
status: done
|
||||||
created: 2026-06-07
|
created: 2026-06-07
|
||||||
|
completed: 2026-06-07
|
||||||
domain: security
|
domain: security
|
||||||
scope: unibus (go.mod, pkg/client, pkg/membership/server.go, cmd/membershipd/config.go, pkg/embeddednats, pkg/blobstore)
|
scope: unibus (go.mod, pkg/client, pkg/membership/server.go, cmd/membershipd/config.go, pkg/embeddednats, pkg/blobstore)
|
||||||
depends_on: 0001, 0004 (cierra los hallazgos NUEVOS de la re-auditoría sobre lo entregado)
|
depends_on: 0001, 0004 (cierra los hallazgos NUEVOS de la re-auditoría sobre lo entregado)
|
||||||
|
|||||||
@@ -1,26 +1,28 @@
|
|||||||
module github.com/enmanuel/unibus
|
module github.com/enmanuel/unibus
|
||||||
|
|
||||||
go 1.25.0
|
go 1.26.4
|
||||||
|
|
||||||
replace fn-registry => ../../../../
|
replace fn-registry => ../../../../
|
||||||
|
|
||||||
require (
|
require (
|
||||||
fn-registry v0.0.0-00010101000000-000000000000
|
fn-registry v0.0.0-00010101000000-000000000000
|
||||||
github.com/nats-io/nats-server/v2 v2.10.22
|
github.com/nats-io/nats-server/v2 v2.11.15
|
||||||
github.com/nats-io/nats.go v1.37.0
|
github.com/nats-io/nats.go v1.49.0
|
||||||
github.com/nats-io/nkeys v0.4.7
|
github.com/nats-io/nkeys v0.4.15
|
||||||
github.com/oklog/ulid/v2 v2.1.0
|
github.com/oklog/ulid/v2 v2.1.0
|
||||||
golang.org/x/time v0.7.0
|
golang.org/x/time v0.15.0
|
||||||
modernc.org/sqlite v1.47.0
|
modernc.org/sqlite v1.47.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op // indirect
|
||||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
|
github.com/google/go-tpm v0.9.8 // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/klauspost/compress v1.18.3 // indirect
|
github.com/klauspost/compress v1.18.4 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
github.com/minio/highwayhash v1.0.3 // indirect
|
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
|
||||||
github.com/nats-io/jwt/v2 v2.5.8 // indirect
|
github.com/nats-io/jwt/v2 v2.8.1 // indirect
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/ncruces/go-strftime v1.0.0 // indirect
|
github.com/ncruces/go-strftime v1.0.0 // indirect
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
@@ -29,7 +31,6 @@ require (
|
|||||||
golang.org/x/mod v0.36.0 // indirect
|
golang.org/x/mod v0.36.0 // indirect
|
||||||
golang.org/x/sync v0.20.0 // indirect
|
golang.org/x/sync v0.20.0 // indirect
|
||||||
golang.org/x/sys v0.44.0 // indirect
|
golang.org/x/sys v0.44.0 // indirect
|
||||||
golang.org/x/text v0.37.0 // indirect
|
|
||||||
golang.org/x/tools v0.45.0 // indirect
|
golang.org/x/tools v0.45.0 // indirect
|
||||||
modernc.org/libc v1.70.0 // indirect
|
modernc.org/libc v1.70.0 // indirect
|
||||||
modernc.org/mathutil v1.7.1 // indirect
|
modernc.org/mathutil v1.7.1 // indirect
|
||||||
|
|||||||
@@ -1,27 +1,31 @@
|
|||||||
|
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op h1:kpBdlEPbRvff0mDD1gk7o9BhI16b9p5yYAXRlidpqJE=
|
||||||
|
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
|
||||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
|
github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo=
|
||||||
|
github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
|
||||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
||||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||||
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
|
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
|
||||||
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
|
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
|
||||||
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
|
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
|
||||||
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
|
github.com/nats-io/jwt/v2 v2.8.1 h1:V0xpGuD/N8Mi+fQNDynXohVvp7ZztevW5io8CUWlPmU=
|
||||||
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
|
github.com/nats-io/jwt/v2 v2.8.1/go.mod h1:nWnOEEiVMiKHQpnAy4eXlizVEtSfzacZ1Q43LIRavZg=
|
||||||
github.com/nats-io/nats-server/v2 v2.10.22 h1:Yt63BGu2c3DdMoBZNcR6pjGQwk/asrKU7VX846ibxDA=
|
github.com/nats-io/nats-server/v2 v2.11.15 h1:StSf9TINInaZtr4oww2+kXmfwa9SkN//g/LwS19/UJ0=
|
||||||
github.com/nats-io/nats-server/v2 v2.10.22/go.mod h1:X/m1ye9NYansUXYFrbcDwUi/blHkrgHh2rgCJaakonk=
|
github.com/nats-io/nats-server/v2 v2.11.15/go.mod h1:zwhv8Y0PE3KHyKgznJc/9Xoai638SaJd83zzJ5GJn74=
|
||||||
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
|
github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE=
|
||||||
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw=
|
||||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4=
|
||||||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs=
|
||||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
|
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
|
||||||
@@ -43,10 +47,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
|||||||
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
|
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
|
||||||
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||||
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
|
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
|
||||||
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
|
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
|
||||||
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
|
|
||||||
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
|
||||||
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
|
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
|
||||||
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
|
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
|
||||||
golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM=
|
golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM=
|
||||||
|
|||||||
@@ -0,0 +1,26 @@
|
|||||||
|
package busauth
|
||||||
|
|
||||||
|
import server "github.com/nats-io/nats-server/v2/server"
|
||||||
|
|
||||||
|
// PermissionsFromSubjects adapts a subject-deriving function (e.g.
|
||||||
|
// membership.SubjectACLFor, which maps an identity to the subjects of the rooms
|
||||||
|
// it belongs to plus the client infrastructure subjects) into the PermissionsFunc
|
||||||
|
// the ACL authenticator expects. The derived subjects are granted as BOTH the
|
||||||
|
// publish and subscribe allow set, so a connection can only pub/sub on the
|
||||||
|
// subjects it is entitled to. A derivation error is propagated so the caller
|
||||||
|
// fails closed (denies the connection) rather than granting open access.
|
||||||
|
//
|
||||||
|
// This is the production wiring for the per-subject data-plane ACL (issue 0003e,
|
||||||
|
// audit H4): membershipd passes PermissionsFromSubjects(membership.SubjectACLFor(
|
||||||
|
// store)) to NewNkeyAuthenticatorACL. It lives in busauth (not membership) so the
|
||||||
|
// membership package stays free of the nats-server dependency.
|
||||||
|
func PermissionsFromSubjects(derive func(signPubHex string) ([]string, error)) PermissionsFunc {
|
||||||
|
return func(signPubHex string) (*server.Permissions, error) {
|
||||||
|
subjects, err := derive(signPubHex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
sp := &server.SubjectPermission{Allow: subjects}
|
||||||
|
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
+11
-1
@@ -799,7 +799,17 @@ func (c *Client) processFrame(roomID string, info roomView, data []byte, handler
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if info.Policy.SignMsgs && f.Sig != nil {
|
// A room with SignMsgs REQUIRES a signature, so an unsigned frame is
|
||||||
|
// unauthenticated and must be dropped — not silently accepted. The previous
|
||||||
|
// `&& f.Sig != nil` guard verified the signature only when one was present, so
|
||||||
|
// an attacker with data-plane access could publish a frame with Sig==nil and a
|
||||||
|
// forged Sender and have the receiver accept it as authentic in a room that
|
||||||
|
// demands signatures (audit N3, report 0006). Requiring the signature first
|
||||||
|
// closes that spoof.
|
||||||
|
if info.Policy.SignMsgs {
|
||||||
|
if f.Sig == nil {
|
||||||
|
return // signature required by room policy but absent: drop
|
||||||
|
}
|
||||||
pub, err := c.signerPub(roomID, f.Sender)
|
pub, err := c.signerPub(roomID, f.Sender)
|
||||||
if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) {
|
if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) {
|
||||||
return // unauthenticated frame: drop
|
return // unauthenticated frame: drop
|
||||||
|
|||||||
@@ -0,0 +1,154 @@
|
|||||||
|
package client_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/client"
|
||||||
|
"github.com/enmanuel/unibus/pkg/frame"
|
||||||
|
"github.com/enmanuel/unibus/pkg/room"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestReaudit_SigNilSpoof ports the re-auditor's N3 (Alto) finding: in a room
|
||||||
|
// that REQUIRES per-message signatures, an attacker with data-plane access
|
||||||
|
// publishes a raw frame with Sig==nil and a forged Sender. Before the fix
|
||||||
|
// processFrame verified the signature only when one was present
|
||||||
|
// (`SignMsgs && f.Sig != nil`), so the receiver accepted the unsigned, forged
|
||||||
|
// frame as authentic. The fix drops any unsigned frame in a SignMsgs room.
|
||||||
|
//
|
||||||
|
// Coverage:
|
||||||
|
// - golden: a properly signed frame from a real member IS delivered;
|
||||||
|
// - error : an unsigned frame with a forged Sender in a SignMsgs room is DROPPED;
|
||||||
|
// - edge : a room WITHOUT SignMsgs still delivers an unsigned frame (the drop
|
||||||
|
// is specific to signed rooms, not a blanket reject of unsigned frames).
|
||||||
|
func TestReaudit_SigNilSpoof(t *testing.T) {
|
||||||
|
h := newHarness(t)
|
||||||
|
waitHealth(t, h.ctrlURL)
|
||||||
|
|
||||||
|
alice, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("connect alice: %v", err)
|
||||||
|
}
|
||||||
|
defer alice.Close()
|
||||||
|
bob, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("connect bob: %v", err)
|
||||||
|
}
|
||||||
|
defer bob.Close()
|
||||||
|
|
||||||
|
// A signed-but-NOT-encrypted room: SignMsgs enforces authorship, and the lack
|
||||||
|
// of encryption is exactly the case the auditor flagged as Alto (any peer with
|
||||||
|
// the subject can forge a sender if signatures are not strictly required).
|
||||||
|
const subject = "room.signed.spoof"
|
||||||
|
signedPolicy := room.Policy{Encrypt: false, Persist: false, SignMsgs: true}
|
||||||
|
roomID, err := alice.CreateRoom(subject, signedPolicy)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("alice create signed room: %v", err)
|
||||||
|
}
|
||||||
|
if err := alice.Invite(roomID, bob.Endpoint()); err != nil {
|
||||||
|
t.Fatalf("alice invite bob: %v", err)
|
||||||
|
}
|
||||||
|
if err := bob.Join(roomID); err != nil {
|
||||||
|
t.Fatalf("bob join: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var mu sync.Mutex
|
||||||
|
var got []string
|
||||||
|
sub, err := bob.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
|
||||||
|
mu.Lock()
|
||||||
|
got = append(got, string(plaintext))
|
||||||
|
mu.Unlock()
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("bob subscribe: %v", err)
|
||||||
|
}
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
time.Sleep(150 * time.Millisecond)
|
||||||
|
|
||||||
|
// Attacker: a raw NATS connection (the dev harness leaves the data plane open),
|
||||||
|
// no identity, forged Sender, NO signature.
|
||||||
|
const spoofMsg = "I am totally the victim"
|
||||||
|
rawAtk, err := nats.Connect(h.natsURL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("attacker raw connect: %v", err)
|
||||||
|
}
|
||||||
|
defer rawAtk.Close()
|
||||||
|
spoof := frame.Frame{
|
||||||
|
Type: frame.PUB,
|
||||||
|
Subject: subject,
|
||||||
|
Sender: "victim-forged-endpoint",
|
||||||
|
MsgID: "spoof-1",
|
||||||
|
Epoch: 1,
|
||||||
|
Payload: []byte(spoofMsg),
|
||||||
|
// Sig intentionally nil — this is the attack.
|
||||||
|
}
|
||||||
|
sb, err := spoof.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("marshal spoof: %v", err)
|
||||||
|
}
|
||||||
|
if err := rawAtk.Publish(subject, sb); err != nil {
|
||||||
|
t.Fatalf("attacker publish: %v", err)
|
||||||
|
}
|
||||||
|
_ = rawAtk.Flush()
|
||||||
|
|
||||||
|
// Golden: alice's properly signed frame must be delivered.
|
||||||
|
const goodMsg = "authentic from alice"
|
||||||
|
if err := alice.Publish(roomID, []byte(goodMsg)); err != nil {
|
||||||
|
t.Fatalf("alice publish: %v", err)
|
||||||
|
}
|
||||||
|
if !waitFor(&mu, &got, func(rs []string) bool {
|
||||||
|
for _, r := range rs {
|
||||||
|
if r == goodMsg {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}, 2*time.Second) {
|
||||||
|
t.Fatalf("a properly signed frame should be delivered; got %v", snapshot(&mu, &got))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error path: the unsigned, forged frame must NEVER reach the handler.
|
||||||
|
for _, r := range snapshot(&mu, &got) {
|
||||||
|
if r == spoofMsg {
|
||||||
|
t.Fatalf("SIG-NIL SPOOF: receiver accepted an unsigned frame with a forged Sender in a SignMsgs room")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Edge: a room WITHOUT SignMsgs still delivers an unsigned raw frame, proving
|
||||||
|
// the drop is scoped to signed rooms and did not break the plain-NATS path.
|
||||||
|
const subjectOpen = "room.open.nosig"
|
||||||
|
openRoom, err := alice.CreateRoom(subjectOpen, room.ModeNATS)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("alice create open room: %v", err)
|
||||||
|
}
|
||||||
|
openCol := subscribeCollect(t, alice, openRoom)
|
||||||
|
defer openCol.sub.Unsubscribe()
|
||||||
|
time.Sleep(150 * time.Millisecond)
|
||||||
|
|
||||||
|
const openMsg = "unsigned but allowed here"
|
||||||
|
openFrame := frame.Frame{
|
||||||
|
Type: frame.PUB,
|
||||||
|
Subject: subjectOpen,
|
||||||
|
Sender: "anyone",
|
||||||
|
MsgID: "open-1",
|
||||||
|
Payload: []byte(openMsg),
|
||||||
|
// no Sig — fine in a non-signed room
|
||||||
|
}
|
||||||
|
ob, _ := openFrame.Marshal()
|
||||||
|
if err := rawAtk.Publish(subjectOpen, ob); err != nil {
|
||||||
|
t.Fatalf("publish open frame: %v", err)
|
||||||
|
}
|
||||||
|
_ = rawAtk.Flush()
|
||||||
|
if !waitFor(&openCol.mu, &openCol.msgs, func(rs []string) bool {
|
||||||
|
for _, r := range rs {
|
||||||
|
if r == openMsg {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}, 2*time.Second) {
|
||||||
|
t.Fatalf("an unsigned frame in a non-signed room should be delivered; got %v", snapshot(&openCol.mu, &openCol.msgs))
|
||||||
|
}
|
||||||
|
}
|
||||||
+103
-11
@@ -39,18 +39,12 @@ func mustID(t *testing.T) cs.Identity {
|
|||||||
return id
|
return id
|
||||||
}
|
}
|
||||||
|
|
||||||
// aclPermsFunc adapts membership.SubjectACLFor into the busauth.PermissionsFunc
|
// aclPermsFunc builds the per-subject PermissionsFunc the ACL authenticator
|
||||||
// the ACL authenticator expects (same Allow set for publish and subscribe).
|
// expects. It delegates to the SAME production wiring membershipd uses
|
||||||
|
// (busauth.PermissionsFromSubjects over membership.SubjectACLFor), so this test
|
||||||
|
// exercises the real path rather than a test-only reimplementation.
|
||||||
func aclPermsFunc(store membership.Store) busauth.PermissionsFunc {
|
func aclPermsFunc(store membership.Store) busauth.PermissionsFunc {
|
||||||
derive := membership.SubjectACLFor(store)
|
return busauth.PermissionsFromSubjects(membership.SubjectACLFor(store))
|
||||||
return func(signPubHex string) (*server.Permissions, error) {
|
|
||||||
subs, err := derive(signPubHex)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
sp := &server.SubjectPermission{Allow: subs}
|
|
||||||
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// startACLNats boots an embedded NATS whose authenticator confines each peer to
|
// startACLNats boots an embedded NATS whose authenticator confines each peer to
|
||||||
@@ -219,6 +213,104 @@ func TestSubjectACLIsolation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestReaudit_H4_WildcardMetadataLeak ports the re-auditor's H4 vector. Before
|
||||||
|
// the per-subject ACL was WIRED into membershipd (it existed in pkg/membership and
|
||||||
|
// pkg/busauth but the binary used the plain NewNkeyAuthenticator), a registered
|
||||||
|
// NON-member could open a raw NATS connection, Subscribe(">"), and capture every
|
||||||
|
// room's subject plus JetStream stream/advisory activity — the payload stayed E2E
|
||||||
|
// ciphertext, but the metadata leaked. With NewNkeyAuthenticatorACL wired via the
|
||||||
|
// production path (busauth.PermissionsFromSubjects(membership.SubjectACLFor)), a
|
||||||
|
// non-member is confined to the client-infra subjects, so the wildcard and any
|
||||||
|
// foreign room subject are denied.
|
||||||
|
//
|
||||||
|
// Coverage:
|
||||||
|
// - error : a non-member's Subscribe(">") raises a permission violation;
|
||||||
|
// - edge : a non-member subscribing to another room's exact subject is denied;
|
||||||
|
// - golden: the member still pub/subs her own room, and the non-member never
|
||||||
|
// captures that traffic.
|
||||||
|
//
|
||||||
|
// Residual (DOCUMENTED, not closed here): the client-infra grant includes
|
||||||
|
// "$JS.API.>", shared by all peers so per-connection JetStream works. A peer that
|
||||||
|
// subscribes specifically to "$JS.API.>" can still observe stream-management
|
||||||
|
// requests whose subjects embed the stream name derived from a room id. Fully
|
||||||
|
// closing that needs NATS accounts/permissions isolation per identity (deferred to
|
||||||
|
// the 0003 decentralization line). The high-impact leak the auditor exploited —
|
||||||
|
// the room subject itself and JetStream advisories captured via "Subscribe(\">\")"
|
||||||
|
// — is closed.
|
||||||
|
func TestReaudit_H4_WildcardMetadataLeak(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("store: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { store.Close() })
|
||||||
|
|
||||||
|
alice, eve := mustID(t), mustID(t)
|
||||||
|
aliceEP := frame.EndpointID(alice.SignPub)
|
||||||
|
mustAddUser(t, store, alice, "alice")
|
||||||
|
mustAddUser(t, store, eve, "eve") // eve is REGISTERED but never a member of alice's room
|
||||||
|
const subject = "room.e2e.confidential"
|
||||||
|
mustCreateRoom(t, store, "ROOMA", subject, aliceEP, alice)
|
||||||
|
|
||||||
|
srv := startACLNats(t, store)
|
||||||
|
url := srv.ClientURL()
|
||||||
|
|
||||||
|
eveErr := make(chan error, 8)
|
||||||
|
eveNC := nkeyConn(t, url, eve, eveErr)
|
||||||
|
eveAll := make(chan *nats.Msg, 16)
|
||||||
|
|
||||||
|
// Error: eve's wildcard subscription is rejected. nats.go creates the local sub
|
||||||
|
// object and the server rejects it asynchronously (delivered to ErrorHandler).
|
||||||
|
if _, err := eveNC.Subscribe(">", func(m *nats.Msg) { eveAll <- m }); err != nil {
|
||||||
|
t.Fatalf("eve sub >: %v", err)
|
||||||
|
}
|
||||||
|
_ = eveNC.Flush()
|
||||||
|
if e := waitErr(eveErr, 1*time.Second); e == nil {
|
||||||
|
t.Fatalf("a non-member's Subscribe(\">\") must raise a permissions violation (wildcard metadata leak still open)")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Edge: eve subscribing to the foreign room's EXACT subject is also denied.
|
||||||
|
drain(eveErr)
|
||||||
|
if _, err := eveNC.Subscribe(subject, func(m *nats.Msg) { eveAll <- m }); err != nil {
|
||||||
|
t.Fatalf("eve sub subject: %v", err)
|
||||||
|
}
|
||||||
|
_ = eveNC.Flush()
|
||||||
|
if e := waitErr(eveErr, 1*time.Second); e == nil {
|
||||||
|
t.Fatalf("a non-member subscribing to another room's subject must be denied")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Golden: alice (the member) pub/subs her own room with no violation, and eve
|
||||||
|
// never captured the traffic despite her (rejected) wildcard.
|
||||||
|
aliceErr := make(chan error, 4)
|
||||||
|
aliceNC := nkeyConn(t, url, alice, aliceErr)
|
||||||
|
aliceGot := make(chan string, 4)
|
||||||
|
if _, err := aliceNC.Subscribe(subject, func(m *nats.Msg) { aliceGot <- string(m.Data) }); err != nil {
|
||||||
|
t.Fatalf("alice sub own room: %v", err)
|
||||||
|
}
|
||||||
|
_ = aliceNC.Flush()
|
||||||
|
if e := waitErr(aliceErr, 300*time.Millisecond); e != nil {
|
||||||
|
t.Fatalf("alice subscribing to her OWN room raised an error: %v", e)
|
||||||
|
}
|
||||||
|
if err := aliceNC.Publish(subject, []byte("members-only metadata")); err != nil {
|
||||||
|
t.Fatalf("alice publish: %v", err)
|
||||||
|
}
|
||||||
|
_ = aliceNC.Flush()
|
||||||
|
select {
|
||||||
|
case got := <-aliceGot:
|
||||||
|
if got != "members-only metadata" {
|
||||||
|
t.Fatalf("alice got %q", got)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatalf("alice did not receive her own room's message")
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case m := <-eveAll:
|
||||||
|
t.Fatalf("eve captured room traffic despite the ACL: subject=%q data=%q", m.Subject, m.Data)
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
// good: eve captured nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path:
|
// TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path:
|
||||||
// alice is not in room B, so her connection has no permission for its subject;
|
// alice is not in room B, so her connection has no permission for its subject;
|
||||||
// after she is added to room B and calls RefreshSession, the reconnect
|
// after she is added to room B and calls RefreshSession, the reconnect
|
||||||
|
|||||||
@@ -0,0 +1,148 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// readRSSkBRaw reads VmRSS (kB) from /proc without a *testing.T, so it is safe to
|
||||||
|
// call from a sampling goroutine (vmRSSkB calls t.Skip, which may only run on the
|
||||||
|
// test's own goroutine). Returns 0 when unavailable.
|
||||||
|
func readRSSkBRaw() int64 {
|
||||||
|
b, err := os.ReadFile("/proc/self/status")
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
for _, line := range strings.Split(string(b), "\n") {
|
||||||
|
if strings.HasPrefix(line, "VmRSS:") {
|
||||||
|
f := strings.Fields(line)
|
||||||
|
if len(f) >= 2 {
|
||||||
|
v, _ := strconv.ParseInt(f[1], 10, 64)
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestReaudit_DoSConcurrency ports the re-auditor's N2 (Medio-Alto) finding: the
|
||||||
|
// per-request body ceiling and the per-IP rate limit do not bound the AGGREGATE
|
||||||
|
// memory of many concurrent uploads. The auditor drove RSS to ~1.42 GB with 40
|
||||||
|
// concurrent 16 MiB blob uploads. With the global in-flight byte limiter, the
|
||||||
|
// number of simultaneously-buffered uploads is capped, so the resident set stays
|
||||||
|
// bounded regardless of how many connections arrive at once.
|
||||||
|
//
|
||||||
|
// Coverage:
|
||||||
|
// - golden: a normal upload succeeds, and the server is still healthy after the
|
||||||
|
// storm (the limiter did not wedge it);
|
||||||
|
// - edge : concurrency right at the cap is admitted;
|
||||||
|
// - error : a concurrent flood far past the cap sheds the excess with 503
|
||||||
|
// (backpressure) instead of buffering it all, and the RSS spike stays bounded
|
||||||
|
// and does NOT scale with the number of requests.
|
||||||
|
func TestReaudit_DoSConcurrency(t *testing.T) {
|
||||||
|
if runtime.GOOS != "linux" {
|
||||||
|
t.Skip("RSS probe is Linux-only")
|
||||||
|
}
|
||||||
|
srv := dosServer(t, AuthOff)
|
||||||
|
// Force a small aggregate cap so the bound is observable in a unit test: with
|
||||||
|
// a 16 MiB blob ceiling, 48 MiB admits ~3 concurrent uploads. Production uses
|
||||||
|
// maxInflightBytes (128 MiB); the mechanism under test is identical.
|
||||||
|
const cap = int64(48) << 20
|
||||||
|
srv.inflight = newInflightLimiter(cap)
|
||||||
|
|
||||||
|
const blob = maxBlobBytes // 16 MiB, the per-request ceiling
|
||||||
|
const n = 40 // the auditor's figure
|
||||||
|
|
||||||
|
// A spike bound: with the cap admitting ~3 concurrent 16 MiB uploads and a
|
||||||
|
// ~2x copy factor (auth buffer + handler buffer) plus Go runtime slack, the
|
||||||
|
// delta should stay well under this. Without the limiter, 40 concurrent
|
||||||
|
// uploads admitted at once would add hundreds of MB (the auditor saw ~1.4 GB).
|
||||||
|
const maxSpikeKB = int64(256) << 10 // 256 MiB
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
|
before := readRSSkBRaw()
|
||||||
|
|
||||||
|
// Sample peak RSS while the storm runs.
|
||||||
|
var peak int64
|
||||||
|
atomic.StoreInt64(&peak, before)
|
||||||
|
stop := make(chan struct{})
|
||||||
|
var sampler sync.WaitGroup
|
||||||
|
sampler.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer sampler.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stop:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
if v := readRSSkBRaw(); v > atomic.LoadInt64(&peak) {
|
||||||
|
atomic.StoreInt64(&peak, v)
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var got503, got200 int64
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: blob})
|
||||||
|
req.ContentLength = blob
|
||||||
|
// Distinct source IP per request: this is the multi-IP (botnet) shape the
|
||||||
|
// auditor flagged, where the per-IP rate limit gives no aggregate defense.
|
||||||
|
// The in-flight byte limiter is the global bound that must hold here.
|
||||||
|
req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":1234"
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
srv.ServeHTTP(rec, req)
|
||||||
|
switch rec.Code {
|
||||||
|
case http.StatusServiceUnavailable:
|
||||||
|
atomic.AddInt64(&got503, 1)
|
||||||
|
case http.StatusOK:
|
||||||
|
atomic.AddInt64(&got200, 1)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
close(stop)
|
||||||
|
sampler.Wait()
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
|
delta := atomic.LoadInt64(&peak) - before
|
||||||
|
|
||||||
|
// Error path: the flood must have hit the cap and shed the excess with 503.
|
||||||
|
if got503 == 0 {
|
||||||
|
t.Fatalf("a concurrent flood of %d uploads past the cap should shed some with 503; got 200=%d 503=%d", n, got200, got503)
|
||||||
|
}
|
||||||
|
// The aggregate memory must stay bounded — not scale with n.
|
||||||
|
if delta > maxSpikeKB {
|
||||||
|
t.Fatalf("aggregate RSS spiked %d kB under %d concurrent uploads (bound %d kB): in-flight limiter not bounding memory", delta, n, maxSpikeKB)
|
||||||
|
}
|
||||||
|
// All reservations released after the storm.
|
||||||
|
if f := srv.inflight.inFlight(); f != 0 {
|
||||||
|
t.Fatalf("after the storm inFlight = %d, want 0 (reservations leaked)", f)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Golden: the server is still healthy and serves a normal upload (from a fresh
|
||||||
|
// IP so the per-IP rate limiter, untouched here, is not what we measure).
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
gReq := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello after storm"))
|
||||||
|
gReq.RemoteAddr = "203.0.113.9:9999"
|
||||||
|
srv.ServeHTTP(rec, gReq)
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("a normal upload after the storm should be 200, got %d (%s)", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("N2 bound: %d uploads -> 200=%d 503=%d, RSS delta %d kB (bound %d kB), cap %d MiB",
|
||||||
|
n, got200, got503, delta, maxSpikeKB, cap>>20)
|
||||||
|
}
|
||||||
@@ -0,0 +1,85 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import "sync/atomic"
|
||||||
|
|
||||||
|
// inflightLimiter is a non-blocking, byte-counting concurrency limiter: a global
|
||||||
|
// cap on how many bytes of request body the server will buffer simultaneously.
|
||||||
|
//
|
||||||
|
// The per-request body ceilings (maxControlBodyBytes / maxBlobBytes) bound a
|
||||||
|
// single request, and the per-IP rate limiter throttles a single source, but
|
||||||
|
// neither bounds the AGGREGATE memory across many concurrent uploads: the
|
||||||
|
// re-audit (report 0006, N2) showed 40 concurrent 16 MiB blob uploads driving
|
||||||
|
// RSS to ~1.42 GB, and a distributed (multi-IP) flood scales without a ceiling
|
||||||
|
// because the rate limiter is per-IP. This limiter is the missing aggregate
|
||||||
|
// bound: ServeHTTP reserves a request's worst-case buffered size before reading
|
||||||
|
// the body and releases it when the request finishes, so the total bytes in
|
||||||
|
// flight can never exceed max regardless of how many connections or source IPs
|
||||||
|
// arrive at once.
|
||||||
|
//
|
||||||
|
// It is intentionally NON-blocking: when a reservation does not fit, the caller
|
||||||
|
// sheds the request with backpressure (503) rather than parking a goroutine,
|
||||||
|
// which would let an attacker exhaust goroutines/connections instead of RAM. The
|
||||||
|
// counter is maintained with sync/atomic (a CAS loop), so it is safe for
|
||||||
|
// concurrent use without a mutex.
|
||||||
|
//
|
||||||
|
// Implementation note: this lives inside unibus rather than the fn-registry
|
||||||
|
// (where a generic concurrency primitive would normally belong) because the
|
||||||
|
// registry's functions/core package pulls in transitive dependencies that
|
||||||
|
// require CGO (mattn/go-sqlite3) and external modules, which are incompatible
|
||||||
|
// with unibus's CGO_ENABLED=0 build, and because this work is scoped to the
|
||||||
|
// unibus sub-repo.
|
||||||
|
type inflightLimiter struct {
|
||||||
|
max int64 // immutable after construction; <= 0 disables the limiter
|
||||||
|
used int64 // bytes currently reserved; accessed ONLY via sync/atomic
|
||||||
|
}
|
||||||
|
|
||||||
|
// newInflightLimiter builds a limiter with a cap of maxBytes bytes in flight.
|
||||||
|
// maxBytes <= 0 disables the cap (tryAcquire always grants), which is the
|
||||||
|
// loopback/dev posture where an aggregate memory ceiling is not wanted.
|
||||||
|
func newInflightLimiter(maxBytes int64) *inflightLimiter {
|
||||||
|
return &inflightLimiter{max: maxBytes}
|
||||||
|
}
|
||||||
|
|
||||||
|
// tryAcquire reserves n bytes without blocking. It returns true and reserves the
|
||||||
|
// bytes when they fit within the cap (used+n <= max), or false (reserving
|
||||||
|
// nothing) when they do not. n <= 0 is granted without reserving, and a disabled
|
||||||
|
// limiter (max <= 0) always grants. Safe for concurrent use.
|
||||||
|
func (l *inflightLimiter) tryAcquire(n int64) bool {
|
||||||
|
if l.max <= 0 || n <= 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
cur := atomic.LoadInt64(&l.used)
|
||||||
|
if cur+n > l.max {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if atomic.CompareAndSwapInt64(&l.used, cur, cur+n) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// release returns n previously reserved bytes. It must be paired with a
|
||||||
|
// tryAcquire that granted. A disabled limiter or n <= 0 is a no-op. The counter
|
||||||
|
// never drops below zero (a defensive clamp against an accidental double release).
|
||||||
|
func (l *inflightLimiter) release(n int64) {
|
||||||
|
if l.max <= 0 || n <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
cur := atomic.LoadInt64(&l.used)
|
||||||
|
nv := cur - n
|
||||||
|
if nv < 0 {
|
||||||
|
nv = 0
|
||||||
|
}
|
||||||
|
if atomic.CompareAndSwapInt64(&l.used, cur, nv) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// inFlight returns the bytes currently reserved. It is observability for tests
|
||||||
|
// and metrics.
|
||||||
|
func (l *inflightLimiter) inFlight() int64 {
|
||||||
|
return atomic.LoadInt64(&l.used)
|
||||||
|
}
|
||||||
@@ -0,0 +1,97 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestInflightLimiterBasics covers the limiter contract: granting within the cap
|
||||||
|
// (golden), the exact boundary (edge), refusal over the cap without mutating the
|
||||||
|
// counter (error), the disabled mode, and the defensive clamp on over-release.
|
||||||
|
func TestInflightLimiterBasics(t *testing.T) {
|
||||||
|
l := newInflightLimiter(100)
|
||||||
|
|
||||||
|
// Golden: a reservation within the cap is granted and reflected.
|
||||||
|
if !l.tryAcquire(60) {
|
||||||
|
t.Fatalf("acquire 60 within cap 100 should grant")
|
||||||
|
}
|
||||||
|
if l.inFlight() != 60 {
|
||||||
|
t.Fatalf("inFlight = %d, want 60", l.inFlight())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Edge: exactly reaching the cap (60+40 == 100) is granted.
|
||||||
|
if !l.tryAcquire(40) {
|
||||||
|
t.Fatalf("acquire to the exact cap should grant")
|
||||||
|
}
|
||||||
|
if l.inFlight() != 100 {
|
||||||
|
t.Fatalf("inFlight = %d, want 100", l.inFlight())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error: one more byte over the full cap is refused, and the counter is left
|
||||||
|
// untouched (a refused reservation reserves nothing).
|
||||||
|
if l.tryAcquire(1) {
|
||||||
|
t.Fatalf("acquire over a full cap must be refused")
|
||||||
|
}
|
||||||
|
if l.inFlight() != 100 {
|
||||||
|
t.Fatalf("a refused acquire must not change inFlight; got %d", l.inFlight())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release frees capacity again.
|
||||||
|
l.release(100)
|
||||||
|
if l.inFlight() != 0 {
|
||||||
|
t.Fatalf("inFlight after full release = %d, want 0", l.inFlight())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Defensive: an over-release never drives the counter negative.
|
||||||
|
l.release(50)
|
||||||
|
if l.inFlight() != 0 {
|
||||||
|
t.Fatalf("over-release must clamp at 0; got %d", l.inFlight())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestInflightLimiterDisabled verifies that a non-positive cap disables the
|
||||||
|
// limiter: every reservation is granted and nothing is tracked (the loopback/dev
|
||||||
|
// posture).
|
||||||
|
func TestInflightLimiterDisabled(t *testing.T) {
|
||||||
|
for _, max := range []int64{0, -1} {
|
||||||
|
l := newInflightLimiter(max)
|
||||||
|
if !l.tryAcquire(1 << 30) {
|
||||||
|
t.Fatalf("disabled limiter (max=%d) must always grant", max)
|
||||||
|
}
|
||||||
|
if l.inFlight() != 0 {
|
||||||
|
t.Fatalf("disabled limiter must not track usage; got %d", l.inFlight())
|
||||||
|
}
|
||||||
|
l.release(1 << 30) // no-op, must not panic
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestInflightLimiterConcurrent hammers the limiter from many goroutines with
|
||||||
|
// equal-sized acquire/release pairs and asserts the invariant never breaks: the
|
||||||
|
// counter returns to 0 and never exceeds the cap. Run with -race for the memory
|
||||||
|
// model guarantee.
|
||||||
|
func TestInflightLimiterConcurrent(t *testing.T) {
|
||||||
|
const cap = 1000
|
||||||
|
const chunk = 7
|
||||||
|
l := newInflightLimiter(cap)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for g := 0; g < 64; g++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := 0; i < 2000; i++ {
|
||||||
|
if l.tryAcquire(chunk) {
|
||||||
|
if f := l.inFlight(); f > cap {
|
||||||
|
t.Errorf("inFlight %d exceeded cap %d", f, cap)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
l.release(chunk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
if l.inFlight() != 0 {
|
||||||
|
t.Fatalf("after all goroutines, inFlight = %d, want 0", l.inFlight())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -35,6 +35,14 @@ const (
|
|||||||
// MaxHeaderBytes caps request header size; wired into the http.Server by the
|
// MaxHeaderBytes caps request header size; wired into the http.Server by the
|
||||||
// command. Exported so the bound lives next to its body-size siblings.
|
// command. Exported so the bound lives next to its body-size siblings.
|
||||||
MaxHeaderBytes = 1 << 20 // 1 MiB
|
MaxHeaderBytes = 1 << 20 // 1 MiB
|
||||||
|
// maxInflightBytes is the GLOBAL cap on request-body bytes buffered across all
|
||||||
|
// concurrent requests (audit N2). The per-request ceilings above bound one
|
||||||
|
// request; this bounds the sum, so a concurrent (even multi-IP) flood of
|
||||||
|
// max-size uploads cannot drive the resident set without limit. 128 MiB allows
|
||||||
|
// ~8 concurrent 16 MiB blob uploads or ~128 concurrent control requests before
|
||||||
|
// further POSTs are shed with 503 — generous for an interactive bus, bounded
|
||||||
|
// for an attacker.
|
||||||
|
maxInflightBytes = 128 << 20 // 128 MiB
|
||||||
)
|
)
|
||||||
|
|
||||||
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
|
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
|
||||||
@@ -62,6 +70,7 @@ type Server struct {
|
|||||||
authMode AuthMode
|
authMode AuthMode
|
||||||
nonces nonceStore
|
nonces nonceStore
|
||||||
limiter *ipRateLimiter
|
limiter *ipRateLimiter
|
||||||
|
inflight *inflightLimiter
|
||||||
|
|
||||||
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
|
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
|
||||||
// rooms. It is the minimum-defensive control for the data plane (audit H4):
|
// rooms. It is the minimum-defensive control for the data plane (audit H4):
|
||||||
@@ -87,6 +96,7 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
|
|||||||
authMode: authMode,
|
authMode: authMode,
|
||||||
nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries),
|
nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries),
|
||||||
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
|
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
|
||||||
|
inflight: newInflightLimiter(maxInflightBytes),
|
||||||
}
|
}
|
||||||
s.routes()
|
s.routes()
|
||||||
return s
|
return s
|
||||||
@@ -139,6 +149,22 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
r.Body = http.MaxBytesReader(w, r.Body, limit)
|
r.Body = http.MaxBytesReader(w, r.Body, limit)
|
||||||
|
|
||||||
|
// Aggregate memory bound (audit N2): the per-request ceiling above and the
|
||||||
|
// per-IP rate limit do not cap the TOTAL bytes buffered across concurrent
|
||||||
|
// requests. A POST reserves its worst-case buffered size (its route ceiling)
|
||||||
|
// from a global limiter before the body is read, and is shed with 503 when the
|
||||||
|
// cap is reached, so the resident set stays bounded under a concurrent (even
|
||||||
|
// multi-IP) upload flood instead of growing linearly with the number of
|
||||||
|
// connections. Reservation is released when the request finishes. Only POSTs
|
||||||
|
// buffer a body; GETs carry none, so they do not consume the budget.
|
||||||
|
if r.Method == http.MethodPost {
|
||||||
|
if !s.inflight.tryAcquire(limit) {
|
||||||
|
writeErr(w, http.StatusServiceUnavailable, "server busy: too many concurrent uploads in flight")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer s.inflight.release(limit)
|
||||||
|
}
|
||||||
|
|
||||||
if s.authMode == AuthOff || isAuthExempt(r) {
|
if s.authMode == AuthOff || isAuthExempt(r) {
|
||||||
s.mux.ServeHTTP(w, r)
|
s.mux.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user