12 Commits

Author SHA1 Message Date
egutierrez df3b62a601 Merge quick/0005-bump-close: unibus 0.7.0 + close issue 0005 2026-06-07 16:17:41 +02:00
egutierrez 6976537842 chore(0005): bump unibus to 0.7.0, close issue 0005 (hardening 2)
Hardening 2 (issue 0005, fases 0005a-0005e) cierra los hallazgos nuevos de la
re-auditoría red-team (report 0006): bump de nats-server + toolchain (16 CVEs ->
0 alcanzables), drop de frames sin firma en rooms SignMsgs, limiter global de
bytes en vuelo contra el DoS por concurrencia, TLS obligatorio en bind publico, y
cableado de la ACL por subject que cierra el wildcard metadata leak. Detalle por
fase en el capability growth log del app.md y en el report 0007.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 16:17:41 +02:00
egutierrez a4bbe8209b Merge issue/0005e-acl-wire: wire per-subject ACL into membershipd (audit H4) 2026-06-07 16:15:52 +02:00
egutierrez 87ef52cc80 fix(0005e): wire per-subject ACL into membershipd (close H4 wildcard metadata leak)
The per-subject data-plane ACL existed since 0003e (membership.SubjectACLFor +
busauth.NewNkeyAuthenticatorACL, unit-tested in TestSubjectACLIsolation) but the
binary never used it: cmd/membershipd installed the plain NewNkeyAuthenticator, so
in production a registered NON-member could open a raw NATS connection,
Subscribe(">"), and harvest every room's subject plus JetStream stream/advisory
activity (payload stayed E2E ciphertext, metadata leaked) — the re-audit's H4
vector (report 0006).

Fix:
- New busauth.PermissionsFromSubjects adapts a subject-deriving function into the
  PermissionsFunc the ACL authenticator expects (subjects granted as both the
  publish and subscribe allow set; a derivation error fails closed). It lives in
  busauth so membership stays free of the nats-server dependency.
- cmd/membershipd, under enforce, now installs
  NewNkeyAuthenticatorACL(store.IsAuthorized,
    PermissionsFromSubjects(membership.SubjectACLFor(store)))
  so every connection is confined to the subjects of the rooms it belongs to plus
  the client-infra subjects.
- pkg/membership/acl_test.go's helper now delegates to the production wiring
  (PermissionsFromSubjects) instead of a test-only reimplementation, so the tests
  exercise the real path.

Verification (pkg/membership/acl_test.go):
- TestReaudit_H4_WildcardMetadataLeak: a non-member's Subscribe(">") and any
  foreign-subject subscribe raise permission violations; the member still pub/subs
  her own room and the non-member captures nothing. With the plain authenticator
  (the pre-0005e wiring) the test fails ("wildcard metadata leak still open"),
  confirming the wiring is what closes it.
- TestSubjectACLIsolation / TestRefreshSessionGainsNewRoom still green.
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  green.

Residual (documented): the client-infra grant includes "$JS.API.>", shared by all
peers so per-connection JetStream works; a peer that subscribes specifically to
"$JS.API.>" can still observe stream-management requests whose subjects embed the
room-derived stream name. Fully closing that needs NATS accounts/permissions per
identity (deferred to the 0003 decentralization line). Operational note: NATS
freezes permissions at connect time, so clients must client.RefreshSession after a
membership change to gain a new room's subject; cmd/chat and cmd/worker do not yet
call it, a functional gap to close before an enforce+ACL deployment.

Refs: report 0006 H4, issue 0005e.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 16:15:52 +02:00
egutierrez a2ec78c81d Merge issue/0005d-tls-guard: require TLS on public bind (audit N4) 2026-06-07 16:11:45 +02:00
egutierrez d01da9d396 fix(0005d): require TLS on a public bind (close N4 plaintext control plane)
The H2 guard refused "public bind without enforce" and "TLS flags without
enforce", but it still ALLOWED a public bind with enforce and no --tls-cert: the
control plane then served metadata (subjects, pubkeys, sealed keys, the social
graph) over plaintext HTTP publicly, so audit H5 reappeared as the N4 gap (TLS
was a capability, not a requirement; report 0006).

Fix: validateBootConfig now also refuses a non-loopback --bind unless both
--tls-cert and --tls-key are set. Public deployments must serve HTTPS; loopback
dev is unaffected (no TLS still allowed there).

Verification (cmd/membershipd/config_test.go):
- TestGap_PublicEnforceNoTLS: validateBootConfig("0.0.0.0", enforce, "", "")
  now returns an error mentioning --tls-cert (golden public+enforce+TLS allowed;
  edge loopback-without-TLS still allowed).
- TestBootConfigPolicy table updated: public+enforce+notls / +certonly / +keyonly
  and lan-ip+enforce+notls are now refused; public+enforce+tls and
  loopback+enforce+tls allowed.
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  green.

Refs: report 0006 N4, issue 0005d.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 16:11:45 +02:00
egutierrez db8618ddc3 Merge issue/0005c-inflight: global in-flight byte limiter bounds aggregate memory (audit N2) 2026-06-07 16:09:58 +02:00
egutierrez e7d59fd01d fix(0005c): bound aggregate buffered memory with a global in-flight byte limiter
The H1 fix bounds each request (1 MiB control / 16 MiB blob) and the per-IP rate
limiter throttles a single source, but neither bounds the AGGREGATE memory across
concurrent requests. The re-audit (report 0006, N2) drove RSS to ~1.42 GB with 40
concurrent 16 MiB uploads, and noted that a multi-IP (botnet) flood scales without
a ceiling because the rate limit is per-IP.

Fix: a global, non-blocking, byte-counting limiter (pkg/membership/inflight.go).
ServeHTTP reserves a POST's worst-case buffered size (its route ceiling) from the
limiter before reading the body, and releases it when the request finishes. When
the global cap (maxInflightBytes = 128 MiB) is reached, further POSTs are shed
with 503 (backpressure) rather than parking goroutines, so total bytes buffered
in flight stays bounded regardless of connection count or source-IP spread. GETs
carry no body and do not consume the budget.

The limiter is implemented inside unibus (not delegated to the fn-registry, where
a generic concurrency primitive would normally live) because functions/core pulls
transitive deps requiring CGO (mattn/go-sqlite3) and external modules that are
incompatible with unibus's CGO_ENABLED=0 build, and because this work is scoped
to the unibus sub-repo. The type/method comments document this.

Verification:
- pkg/membership/inflight_test.go: TestInflightLimiter{Basics,Disabled,Concurrent}
  cover golden/edge/error/disabled/over-release and a -race concurrency invariant
  (inFlight returns to 0, never exceeds cap).
- pkg/membership/dos_concurrency_test.go: TestReaudit_DoSConcurrency fires 40
  concurrent 16 MiB uploads from distinct IPs (the multi-IP shape) against a 48 MiB
  test cap -> 200=3 503=37, RSS delta ~93 MiB (bound 256 MiB), inFlight()==0, and a
  fresh upload still 200. With the limiter disabled the test fails (200=40 503=0),
  confirming it is a real regression guard.
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  green;
  CGO_ENABLED=1 go test -race ./pkg/membership/ green.

Residual (documented): under enforce the body is buffered twice (auth verify +
handler), so real RSS is ~2x the reserved bytes; closing that fully means
streaming blobs to disk (overlaps H9 / issue 0002).

Refs: report 0006 N2, issue 0005c.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 16:09:58 +02:00
egutierrez 0f79708338 Merge issue/0005b-sig-nil: drop unsigned frames in SignMsgs rooms (audit N3) 2026-06-07 15:58:10 +02:00
egutierrez ef3af6dfd1 fix(0005b): drop unsigned frames in SignMsgs rooms (close sig-nil spoof)
client.processFrame verified a frame's signature only when one was present
(`info.Policy.SignMsgs && f.Sig != nil`). In a room whose policy REQUIRES
per-message signatures, an attacker with data-plane access could publish a raw
frame with Sig==nil and a forged Sender, and the receiver accepted it as
authentic because the verification block was skipped (audit N3, report 0006).
On a signed-but-cleartext room any peer that knows the subject could thus
impersonate any sender.

Fix: in a SignMsgs room a missing signature is itself a rejection. processFrame
now drops any frame with Sig==nil before attempting verification:

    if info.Policy.SignMsgs {
        if f.Sig == nil { return }   // signature required but absent: drop
        // verify ...
    }

Non-signed rooms (ModeNATS) are unaffected: unsigned frames there are still
delivered, so the plain-NATS path is unchanged.

Verification (pkg/client/sig_nil_spoof_test.go, TestReaudit_SigNilSpoof):
- golden: a properly signed frame from a member is delivered.
- error : an unsigned frame with a forged Sender in a SignMsgs room is dropped
  (the test fails with "SIG-NIL SPOOF: receiver accepted ..." when the fix is
  reverted, confirming it is a real regression guard).
- edge  : a non-signed room still delivers an unsigned frame.
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  green.

Refs: report 0006 N3, issue 0005b.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 15:58:10 +02:00
egutierrez 88b47912bd Merge issue/0005a-cve-bump: bump nats-server to v2.11.15 + go1.26.4 (16 CVEs -> 0 reachable) 2026-06-07 15:55:32 +02:00
egutierrez a3ac58fb70 fix(0005a): bump nats-server v2.10.22->v2.11.15 + toolchain go1.26.4 (close 16 CVEs)
govulncheck reported 16 reachable vulnerabilities (re-audit finding N1, report 0006):
14 in github.com/nats-io/nats-server/v2@v2.10.22 -- the embedded NATS server, which
is exposed to the internet in the chosen deployment -- and 2 in the Go standard
library (GO-2026-5039 net/textproto, GO-2026-5037 crypto/x509).

Changes:
- go get github.com/nats-io/nats-server/v2@v2.11.15 (covers all 14 server CVEs;
  pulls nats.go v1.49.0, nkeys v0.4.15, jwt v2.8.1, klauspost/compress v1.18.4
  and friends transitively).
- go directive 1.25.0 -> 1.26.4 so the toolchain ships the two stdlib fixes.

This is a go.mod/go.sum change justified purely by CVE remediation; it is the
explicit exception to the "do not touch deps" rule for a CVE bump.

Verification:
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  -> green,
  including the 0003 multi-node cluster/JetStream e2e in pkg/embeddednats, so the
  server bump did not break the cluster or the durable plane.
- govulncheck ./...  -> "No vulnerabilities found" (0 reachable; the 13 that remain
  are in required-but-not-called modules).

Refs: report 0006 N1, issue 0005a.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 15:55:32 +02:00
15 changed files with 759 additions and 45 deletions
+24 -1
View File
@@ -2,7 +2,7 @@
name: unibus
lang: go
domain: infra
version: 0.6.0
version: 0.7.0
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
tags: [service, messaging, nats, e2e]
uses_functions:
@@ -154,6 +154,29 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
## Capability growth log
- v0.7.0 (2026-06-07) — hardening de seguridad 2 (issue 0005, fases 0005a0005e)
que cierra los hallazgos nuevos de la re-auditoría red-team (report 0006) y
lleva el veredicto de exposición pública a "sí-con-condiciones". (0005a) Bump de
`github.com/nats-io/nats-server/v2` v2.10.22→v2.11.15 y de la toolchain a
go1.26.4: `govulncheck ./...` pasa de 16 vulnerabilidades alcanzables (14 del
servidor NATS embebido + 2 de la stdlib) a 0. (0005b) `client.processFrame`
ahora descarta cualquier frame sin firma en una room `SignMsgs` (antes verificaba
solo si la firma venía presente, lo que permitía suplantar `Sender` con
`Sig==nil`). (0005c) Nuevo limiter global de bytes en vuelo
(`pkg/membership.inflightLimiter`) que acota la memoria agregada que el control
plane bufferiza bajo concurrencia (el límite por-request y el rate-limit por-IP
no acotaban el total): un flood concurrente multi-IP se descarta con 503 en vez
de crecer sin techo (el RSS deja de escalar con N). (0005d) El guard de arranque
`validateBootConfig` ahora exige `--tls-cert/--tls-key` en bind no-loopback (un
control plane público sin TLS servía metadata en claro). (0005e) Se cablea por
fin en `membershipd` la ACL por subject que ya existía huérfana desde 0003e
(`busauth.NewNkeyAuthenticatorACL` + nuevo adaptador `busauth.PermissionsFromSubjects`
sobre `membership.SubjectACLFor`): un registrado no-miembro ya no puede
`Subscribe(">")` y captar los subjects/advisories de rooms ajenas. Residuales
documentados: `$JS.API.>` sigue compartido (cierre completo = NATS accounts por
identidad, diferido) y los clientes deben `RefreshSession` tras cambios de
membresía (chat/worker aún no lo hacen). El comportamiento de un solo nodo no
cambia y master sigue verde.
- v0.6.0 (2026-06-07) — descentralización / alta disponibilidad (issue 0003,
fases 0003a0003e), report 0006. El servidor NATS embebido gana soporte de
cluster con routes autenticadas (secreto de cluster) y TLS mutuo de nodo
+10 -2
View File
@@ -43,9 +43,12 @@ func isLoopbackBind(bind string) bool {
// configuration that would expose the bus without enforced authentication:
//
// - a non-loopback --bind without --bus-auth enforce (the data plane and
// control plane would both accept anyone), and
// control plane would both accept anyone),
// - --tls-cert/--tls-key without --bus-auth enforce (TLS encrypts the channel
// but authenticates no one — encrypted access for everybody is still open).
// but authenticates no one — encrypted access for everybody is still open), and
// - a non-loopback --bind WITHOUT --tls-cert/--tls-key (the control plane would
// serve metadata over plaintext HTTP publicly — audit H5 reappearing, the N4
// gap the re-audit found: TLS was available but not mandatory).
//
// It is a pure function of the parsed flags so the command can fail fast at
// startup and tests can assert the policy without booting a server.
@@ -60,6 +63,11 @@ func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey s
"refusing to start: --tls-cert/--tls-key set but --bus-auth is %q; TLS without enforced auth is fail-open (encrypted channel, no authentication) — set --bus-auth enforce",
mode)
}
if !isLoopbackBind(bind) && (tlsCert == "" || tlsKey == "") {
return fmt.Errorf(
"refusing to start: --bind %q is not loopback but --tls-cert/--tls-key are not both set; a public control plane must serve HTTPS or its metadata (subjects, pubkeys, sealed keys, the social graph) travels in cleartext to a network MITM (audit H5/N4) — provide a CA-signed --tls-cert/--tls-key, or bind 127.0.0.1 for local dev",
bind)
}
return nil
}
+33 -2
View File
@@ -30,6 +30,31 @@ func TestAudit_FailOpenTLSWithoutAuth(t *testing.T) {
}
}
// TestGap_PublicEnforceNoTLS ports the re-auditor's N4 gap: the H2 guard refused
// "public without enforce" and "TLS without enforce", but ALLOWED a public bind
// with enforce and NO --tls-cert, so the control plane served metadata over
// plaintext HTTP publicly (H5 reappearing). The guard now refuses it.
func TestGap_PublicEnforceNoTLS(t *testing.T) {
// The exact auditor configuration: public bind, enforce on, no TLS cert/key.
err := validateBootConfig("0.0.0.0", membership.AuthEnforce, "", "")
if err == nil {
t.Fatalf("public bind + enforce + NO --tls-cert must be refused: the control plane would serve plaintext HTTP publicly (audit N4)")
}
if !strings.Contains(err.Error(), "tls-cert") {
t.Fatalf("error should point the operator at --tls-cert/--tls-key, got: %v", err)
}
// Golden: the same public+enforce config WITH a cert/key is allowed.
if err := validateBootConfig("0.0.0.0", membership.AuthEnforce, "server.crt", "server.key"); err != nil {
t.Fatalf("public + enforce + TLS is the intended production config, got: %v", err)
}
// Edge: loopback without TLS stays allowed (local dev is not a public exposure).
if err := validateBootConfig("127.0.0.1", membership.AuthOff, "", ""); err != nil {
t.Fatalf("loopback dev without TLS must remain allowed, got: %v", err)
}
}
// TestBootConfigPolicy is the full table: the golden secure-public config is
// allowed, dev loopback is allowed, and every fail-open shape is refused.
func TestBootConfigPolicy(t *testing.T) {
@@ -41,19 +66,25 @@ func TestBootConfigPolicy(t *testing.T) {
key string
wantErr bool
}{
// Golden: the intended public production config.
// Golden: the intended public production config — enforce AND TLS.
{"public+enforce+tls", "0.0.0.0", membership.AuthEnforce, "s.crt", "s.key", false},
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", false},
// Edge: local dev on loopback may stay open (no auth, no TLS).
{"loopback+off", "127.0.0.1", membership.AuthOff, "", "", false},
{"loopback-ipv6+off", "::1", membership.AuthOff, "", "", false},
{"localhost+off", "localhost", membership.AuthOff, "", "", false},
{"loopback+soft", "127.0.0.1", membership.AuthSoft, "", "", false},
// Edge: loopback with full enforce+TLS is also fine.
{"loopback+enforce+tls", "127.0.0.1", membership.AuthEnforce, "s.crt", "s.key", false},
// Error: public bind without enforce.
{"public+off", "0.0.0.0", membership.AuthOff, "", "", true},
{"public+soft", "0.0.0.0", membership.AuthSoft, "", "", true},
{"lan-ip+off", "192.168.1.10", membership.AuthOff, "", "", true},
{"empty-bind+off", "", membership.AuthOff, "", "", true},
// Error (N4): public bind + enforce but NO TLS -> plaintext control plane.
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", true},
{"public+enforce+certonly", "0.0.0.0", membership.AuthEnforce, "s.crt", "", true},
{"public+enforce+keyonly", "0.0.0.0", membership.AuthEnforce, "", "s.key", true},
{"lan-ip+enforce+notls", "192.168.1.10", membership.AuthEnforce, "", "", true},
// Error: TLS flags without enforce (cert or key alone is enough to trip it).
{"loopback+tlscert+off", "127.0.0.1", membership.AuthOff, "s.crt", "", true},
{"loopback+tlskey+soft", "127.0.0.1", membership.AuthSoft, "", "s.key", true},
+12 -2
View File
@@ -138,8 +138,18 @@ func main() {
log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes))
}
if authMode == membership.AuthEnforce {
cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
log.Printf("NATS nkey authentication: ON (enforce)")
// Per-subject data-plane ACL (audit H4 / N4 residual): the authenticator
// authorizes by the bus allowlist AND confines each connection to the
// subjects of the rooms it belongs to (plus client-infra subjects). This
// closes the wildcard metadata leak where a registered non-member could
// Subscribe(">") and harvest every room's subject and JetStream activity.
// NATS freezes permissions at connect time, so a peer that joins a room
// after connecting must client.RefreshSession to gain that room's subject.
cfg.Auth = busauth.NewNkeyAuthenticatorACL(
store.IsAuthorized,
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
)
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
}
if *tlsCert != "" || *tlsKey != "" {
if *tlsCert == "" || *tlsKey == "" {
+2 -1
View File
@@ -1,8 +1,9 @@
---
issue: 0005
title: Hardening 2 — CVEs, spoof por firma omitida, DoS por concurrencia, TLS forzado (re-auditoría)
status: spec
status: done
created: 2026-06-07
completed: 2026-06-07
domain: security
scope: unibus (go.mod, pkg/client, pkg/membership/server.go, cmd/membershipd/config.go, pkg/embeddednats, pkg/blobstore)
depends_on: 0001, 0004 (cierra los hallazgos NUEVOS de la re-auditoría sobre lo entregado)
+10 -9
View File
@@ -1,26 +1,28 @@
module github.com/enmanuel/unibus
go 1.25.0
go 1.26.4
replace fn-registry => ../../../../
require (
fn-registry v0.0.0-00010101000000-000000000000
github.com/nats-io/nats-server/v2 v2.10.22
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nats-server/v2 v2.11.15
github.com/nats-io/nats.go v1.49.0
github.com/nats-io/nkeys v0.4.15
github.com/oklog/ulid/v2 v2.1.0
golang.org/x/time v0.7.0
golang.org/x/time v0.15.0
modernc.org/sqlite v1.47.0
)
require (
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/go-tpm v0.9.8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.18.3 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/nats-io/jwt/v2 v2.5.8 // indirect
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
github.com/nats-io/jwt/v2 v2.8.1 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
@@ -29,7 +31,6 @@ require (
golang.org/x/mod v0.36.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.37.0 // indirect
golang.org/x/tools v0.45.0 // indirect
modernc.org/libc v1.70.0 // indirect
modernc.org/mathutil v1.7.1 // indirect
+18 -16
View File
@@ -1,27 +1,31 @@
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op h1:kpBdlEPbRvff0mDD1gk7o9BhI16b9p5yYAXRlidpqJE=
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo=
github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.22 h1:Yt63BGu2c3DdMoBZNcR6pjGQwk/asrKU7VX846ibxDA=
github.com/nats-io/nats-server/v2 v2.10.22/go.mod h1:X/m1ye9NYansUXYFrbcDwUi/blHkrgHh2rgCJaakonk=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.8.1 h1:V0xpGuD/N8Mi+fQNDynXohVvp7ZztevW5io8CUWlPmU=
github.com/nats-io/jwt/v2 v2.8.1/go.mod h1:nWnOEEiVMiKHQpnAy4eXlizVEtSfzacZ1Q43LIRavZg=
github.com/nats-io/nats-server/v2 v2.11.15 h1:StSf9TINInaZtr4oww2+kXmfwa9SkN//g/LwS19/UJ0=
github.com/nats-io/nats-server/v2 v2.11.15/go.mod h1:zwhv8Y0PE3KHyKgznJc/9Xoai638SaJd83zzJ5GJn74=
github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE=
github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw=
github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4=
github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
@@ -43,10 +47,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM=
+26
View File
@@ -0,0 +1,26 @@
package busauth
import server "github.com/nats-io/nats-server/v2/server"
// PermissionsFromSubjects adapts a subject-deriving function (e.g.
// membership.SubjectACLFor, which maps an identity to the subjects of the rooms
// it belongs to plus the client infrastructure subjects) into the PermissionsFunc
// the ACL authenticator expects. The derived subjects are granted as BOTH the
// publish and subscribe allow set, so a connection can only pub/sub on the
// subjects it is entitled to. A derivation error is propagated so the caller
// fails closed (denies the connection) rather than granting open access.
//
// This is the production wiring for the per-subject data-plane ACL (issue 0003e,
// audit H4): membershipd passes PermissionsFromSubjects(membership.SubjectACLFor(
// store)) to NewNkeyAuthenticatorACL. It lives in busauth (not membership) so the
// membership package stays free of the nats-server dependency.
func PermissionsFromSubjects(derive func(signPubHex string) ([]string, error)) PermissionsFunc {
return func(signPubHex string) (*server.Permissions, error) {
subjects, err := derive(signPubHex)
if err != nil {
return nil, err
}
sp := &server.SubjectPermission{Allow: subjects}
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
}
}
+11 -1
View File
@@ -799,7 +799,17 @@ func (c *Client) processFrame(roomID string, info roomView, data []byte, handler
if err != nil {
return
}
if info.Policy.SignMsgs && f.Sig != nil {
// A room with SignMsgs REQUIRES a signature, so an unsigned frame is
// unauthenticated and must be dropped — not silently accepted. The previous
// `&& f.Sig != nil` guard verified the signature only when one was present, so
// an attacker with data-plane access could publish a frame with Sig==nil and a
// forged Sender and have the receiver accept it as authentic in a room that
// demands signatures (audit N3, report 0006). Requiring the signature first
// closes that spoof.
if info.Policy.SignMsgs {
if f.Sig == nil {
return // signature required by room policy but absent: drop
}
pub, err := c.signerPub(roomID, f.Sender)
if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) {
return // unauthenticated frame: drop
+154
View File
@@ -0,0 +1,154 @@
package client_test
import (
"sync"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
"github.com/nats-io/nats.go"
)
// TestReaudit_SigNilSpoof ports the re-auditor's N3 (Alto) finding: in a room
// that REQUIRES per-message signatures, an attacker with data-plane access
// publishes a raw frame with Sig==nil and a forged Sender. Before the fix
// processFrame verified the signature only when one was present
// (`SignMsgs && f.Sig != nil`), so the receiver accepted the unsigned, forged
// frame as authentic. The fix drops any unsigned frame in a SignMsgs room.
//
// Coverage:
// - golden: a properly signed frame from a real member IS delivered;
// - error : an unsigned frame with a forged Sender in a SignMsgs room is DROPPED;
// - edge : a room WITHOUT SignMsgs still delivers an unsigned frame (the drop
// is specific to signed rooms, not a blanket reject of unsigned frames).
func TestReaudit_SigNilSpoof(t *testing.T) {
h := newHarness(t)
waitHealth(t, h.ctrlURL)
alice, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect alice: %v", err)
}
defer alice.Close()
bob, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect bob: %v", err)
}
defer bob.Close()
// A signed-but-NOT-encrypted room: SignMsgs enforces authorship, and the lack
// of encryption is exactly the case the auditor flagged as Alto (any peer with
// the subject can forge a sender if signatures are not strictly required).
const subject = "room.signed.spoof"
signedPolicy := room.Policy{Encrypt: false, Persist: false, SignMsgs: true}
roomID, err := alice.CreateRoom(subject, signedPolicy)
if err != nil {
t.Fatalf("alice create signed room: %v", err)
}
if err := alice.Invite(roomID, bob.Endpoint()); err != nil {
t.Fatalf("alice invite bob: %v", err)
}
if err := bob.Join(roomID); err != nil {
t.Fatalf("bob join: %v", err)
}
var mu sync.Mutex
var got []string
sub, err := bob.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
mu.Lock()
got = append(got, string(plaintext))
mu.Unlock()
})
if err != nil {
t.Fatalf("bob subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(150 * time.Millisecond)
// Attacker: a raw NATS connection (the dev harness leaves the data plane open),
// no identity, forged Sender, NO signature.
const spoofMsg = "I am totally the victim"
rawAtk, err := nats.Connect(h.natsURL)
if err != nil {
t.Fatalf("attacker raw connect: %v", err)
}
defer rawAtk.Close()
spoof := frame.Frame{
Type: frame.PUB,
Subject: subject,
Sender: "victim-forged-endpoint",
MsgID: "spoof-1",
Epoch: 1,
Payload: []byte(spoofMsg),
// Sig intentionally nil — this is the attack.
}
sb, err := spoof.Marshal()
if err != nil {
t.Fatalf("marshal spoof: %v", err)
}
if err := rawAtk.Publish(subject, sb); err != nil {
t.Fatalf("attacker publish: %v", err)
}
_ = rawAtk.Flush()
// Golden: alice's properly signed frame must be delivered.
const goodMsg = "authentic from alice"
if err := alice.Publish(roomID, []byte(goodMsg)); err != nil {
t.Fatalf("alice publish: %v", err)
}
if !waitFor(&mu, &got, func(rs []string) bool {
for _, r := range rs {
if r == goodMsg {
return true
}
}
return false
}, 2*time.Second) {
t.Fatalf("a properly signed frame should be delivered; got %v", snapshot(&mu, &got))
}
// Error path: the unsigned, forged frame must NEVER reach the handler.
for _, r := range snapshot(&mu, &got) {
if r == spoofMsg {
t.Fatalf("SIG-NIL SPOOF: receiver accepted an unsigned frame with a forged Sender in a SignMsgs room")
}
}
// Edge: a room WITHOUT SignMsgs still delivers an unsigned raw frame, proving
// the drop is scoped to signed rooms and did not break the plain-NATS path.
const subjectOpen = "room.open.nosig"
openRoom, err := alice.CreateRoom(subjectOpen, room.ModeNATS)
if err != nil {
t.Fatalf("alice create open room: %v", err)
}
openCol := subscribeCollect(t, alice, openRoom)
defer openCol.sub.Unsubscribe()
time.Sleep(150 * time.Millisecond)
const openMsg = "unsigned but allowed here"
openFrame := frame.Frame{
Type: frame.PUB,
Subject: subjectOpen,
Sender: "anyone",
MsgID: "open-1",
Payload: []byte(openMsg),
// no Sig — fine in a non-signed room
}
ob, _ := openFrame.Marshal()
if err := rawAtk.Publish(subjectOpen, ob); err != nil {
t.Fatalf("publish open frame: %v", err)
}
_ = rawAtk.Flush()
if !waitFor(&openCol.mu, &openCol.msgs, func(rs []string) bool {
for _, r := range rs {
if r == openMsg {
return true
}
}
return false
}, 2*time.Second) {
t.Fatalf("an unsigned frame in a non-signed room should be delivered; got %v", snapshot(&openCol.mu, &openCol.msgs))
}
}
+103 -11
View File
@@ -39,18 +39,12 @@ func mustID(t *testing.T) cs.Identity {
return id
}
// aclPermsFunc adapts membership.SubjectACLFor into the busauth.PermissionsFunc
// the ACL authenticator expects (same Allow set for publish and subscribe).
// aclPermsFunc builds the per-subject PermissionsFunc the ACL authenticator
// expects. It delegates to the SAME production wiring membershipd uses
// (busauth.PermissionsFromSubjects over membership.SubjectACLFor), so this test
// exercises the real path rather than a test-only reimplementation.
func aclPermsFunc(store membership.Store) busauth.PermissionsFunc {
derive := membership.SubjectACLFor(store)
return func(signPubHex string) (*server.Permissions, error) {
subs, err := derive(signPubHex)
if err != nil {
return nil, err
}
sp := &server.SubjectPermission{Allow: subs}
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
}
return busauth.PermissionsFromSubjects(membership.SubjectACLFor(store))
}
// startACLNats boots an embedded NATS whose authenticator confines each peer to
@@ -219,6 +213,104 @@ func TestSubjectACLIsolation(t *testing.T) {
}
}
// TestReaudit_H4_WildcardMetadataLeak ports the re-auditor's H4 vector. Before
// the per-subject ACL was WIRED into membershipd (it existed in pkg/membership and
// pkg/busauth but the binary used the plain NewNkeyAuthenticator), a registered
// NON-member could open a raw NATS connection, Subscribe(">"), and capture every
// room's subject plus JetStream stream/advisory activity — the payload stayed E2E
// ciphertext, but the metadata leaked. With NewNkeyAuthenticatorACL wired via the
// production path (busauth.PermissionsFromSubjects(membership.SubjectACLFor)), a
// non-member is confined to the client-infra subjects, so the wildcard and any
// foreign room subject are denied.
//
// Coverage:
// - error : a non-member's Subscribe(">") raises a permission violation;
// - edge : a non-member subscribing to another room's exact subject is denied;
// - golden: the member still pub/subs her own room, and the non-member never
// captures that traffic.
//
// Residual (DOCUMENTED, not closed here): the client-infra grant includes
// "$JS.API.>", shared by all peers so per-connection JetStream works. A peer that
// subscribes specifically to "$JS.API.>" can still observe stream-management
// requests whose subjects embed the stream name derived from a room id. Fully
// closing that needs NATS accounts/permissions isolation per identity (deferred to
// the 0003 decentralization line). The high-impact leak the auditor exploited —
// the room subject itself and JetStream advisories captured via "Subscribe(\">\")"
// — is closed.
func TestReaudit_H4_WildcardMetadataLeak(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
alice, eve := mustID(t), mustID(t)
aliceEP := frame.EndpointID(alice.SignPub)
mustAddUser(t, store, alice, "alice")
mustAddUser(t, store, eve, "eve") // eve is REGISTERED but never a member of alice's room
const subject = "room.e2e.confidential"
mustCreateRoom(t, store, "ROOMA", subject, aliceEP, alice)
srv := startACLNats(t, store)
url := srv.ClientURL()
eveErr := make(chan error, 8)
eveNC := nkeyConn(t, url, eve, eveErr)
eveAll := make(chan *nats.Msg, 16)
// Error: eve's wildcard subscription is rejected. nats.go creates the local sub
// object and the server rejects it asynchronously (delivered to ErrorHandler).
if _, err := eveNC.Subscribe(">", func(m *nats.Msg) { eveAll <- m }); err != nil {
t.Fatalf("eve sub >: %v", err)
}
_ = eveNC.Flush()
if e := waitErr(eveErr, 1*time.Second); e == nil {
t.Fatalf("a non-member's Subscribe(\">\") must raise a permissions violation (wildcard metadata leak still open)")
}
// Edge: eve subscribing to the foreign room's EXACT subject is also denied.
drain(eveErr)
if _, err := eveNC.Subscribe(subject, func(m *nats.Msg) { eveAll <- m }); err != nil {
t.Fatalf("eve sub subject: %v", err)
}
_ = eveNC.Flush()
if e := waitErr(eveErr, 1*time.Second); e == nil {
t.Fatalf("a non-member subscribing to another room's subject must be denied")
}
// Golden: alice (the member) pub/subs her own room with no violation, and eve
// never captured the traffic despite her (rejected) wildcard.
aliceErr := make(chan error, 4)
aliceNC := nkeyConn(t, url, alice, aliceErr)
aliceGot := make(chan string, 4)
if _, err := aliceNC.Subscribe(subject, func(m *nats.Msg) { aliceGot <- string(m.Data) }); err != nil {
t.Fatalf("alice sub own room: %v", err)
}
_ = aliceNC.Flush()
if e := waitErr(aliceErr, 300*time.Millisecond); e != nil {
t.Fatalf("alice subscribing to her OWN room raised an error: %v", e)
}
if err := aliceNC.Publish(subject, []byte("members-only metadata")); err != nil {
t.Fatalf("alice publish: %v", err)
}
_ = aliceNC.Flush()
select {
case got := <-aliceGot:
if got != "members-only metadata" {
t.Fatalf("alice got %q", got)
}
case <-time.After(2 * time.Second):
t.Fatalf("alice did not receive her own room's message")
}
select {
case m := <-eveAll:
t.Fatalf("eve captured room traffic despite the ACL: subject=%q data=%q", m.Subject, m.Data)
case <-time.After(500 * time.Millisecond):
// good: eve captured nothing
}
}
// TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path:
// alice is not in room B, so her connection has no permission for its subject;
// after she is added to room B and calls RefreshSession, the reconnect
+148
View File
@@ -0,0 +1,148 @@
package membership
import (
"net/http"
"net/http/httptest"
"os"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// readRSSkBRaw reads VmRSS (kB) from /proc without a *testing.T, so it is safe to
// call from a sampling goroutine (vmRSSkB calls t.Skip, which may only run on the
// test's own goroutine). Returns 0 when unavailable.
func readRSSkBRaw() int64 {
b, err := os.ReadFile("/proc/self/status")
if err != nil {
return 0
}
for _, line := range strings.Split(string(b), "\n") {
if strings.HasPrefix(line, "VmRSS:") {
f := strings.Fields(line)
if len(f) >= 2 {
v, _ := strconv.ParseInt(f[1], 10, 64)
return v
}
}
}
return 0
}
// TestReaudit_DoSConcurrency ports the re-auditor's N2 (Medio-Alto) finding: the
// per-request body ceiling and the per-IP rate limit do not bound the AGGREGATE
// memory of many concurrent uploads. The auditor drove RSS to ~1.42 GB with 40
// concurrent 16 MiB blob uploads. With the global in-flight byte limiter, the
// number of simultaneously-buffered uploads is capped, so the resident set stays
// bounded regardless of how many connections arrive at once.
//
// Coverage:
// - golden: a normal upload succeeds, and the server is still healthy after the
// storm (the limiter did not wedge it);
// - edge : concurrency right at the cap is admitted;
// - error : a concurrent flood far past the cap sheds the excess with 503
// (backpressure) instead of buffering it all, and the RSS spike stays bounded
// and does NOT scale with the number of requests.
func TestReaudit_DoSConcurrency(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("RSS probe is Linux-only")
}
srv := dosServer(t, AuthOff)
// Force a small aggregate cap so the bound is observable in a unit test: with
// a 16 MiB blob ceiling, 48 MiB admits ~3 concurrent uploads. Production uses
// maxInflightBytes (128 MiB); the mechanism under test is identical.
const cap = int64(48) << 20
srv.inflight = newInflightLimiter(cap)
const blob = maxBlobBytes // 16 MiB, the per-request ceiling
const n = 40 // the auditor's figure
// A spike bound: with the cap admitting ~3 concurrent 16 MiB uploads and a
// ~2x copy factor (auth buffer + handler buffer) plus Go runtime slack, the
// delta should stay well under this. Without the limiter, 40 concurrent
// uploads admitted at once would add hundreds of MB (the auditor saw ~1.4 GB).
const maxSpikeKB = int64(256) << 10 // 256 MiB
runtime.GC()
before := readRSSkBRaw()
// Sample peak RSS while the storm runs.
var peak int64
atomic.StoreInt64(&peak, before)
stop := make(chan struct{})
var sampler sync.WaitGroup
sampler.Add(1)
go func() {
defer sampler.Done()
for {
select {
case <-stop:
return
default:
if v := readRSSkBRaw(); v > atomic.LoadInt64(&peak) {
atomic.StoreInt64(&peak, v)
}
time.Sleep(2 * time.Millisecond)
}
}
}()
var got503, got200 int64
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: blob})
req.ContentLength = blob
// Distinct source IP per request: this is the multi-IP (botnet) shape the
// auditor flagged, where the per-IP rate limit gives no aggregate defense.
// The in-flight byte limiter is the global bound that must hold here.
req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":1234"
rec := httptest.NewRecorder()
srv.ServeHTTP(rec, req)
switch rec.Code {
case http.StatusServiceUnavailable:
atomic.AddInt64(&got503, 1)
case http.StatusOK:
atomic.AddInt64(&got200, 1)
}
}()
}
wg.Wait()
close(stop)
sampler.Wait()
runtime.GC()
delta := atomic.LoadInt64(&peak) - before
// Error path: the flood must have hit the cap and shed the excess with 503.
if got503 == 0 {
t.Fatalf("a concurrent flood of %d uploads past the cap should shed some with 503; got 200=%d 503=%d", n, got200, got503)
}
// The aggregate memory must stay bounded — not scale with n.
if delta > maxSpikeKB {
t.Fatalf("aggregate RSS spiked %d kB under %d concurrent uploads (bound %d kB): in-flight limiter not bounding memory", delta, n, maxSpikeKB)
}
// All reservations released after the storm.
if f := srv.inflight.inFlight(); f != 0 {
t.Fatalf("after the storm inFlight = %d, want 0 (reservations leaked)", f)
}
// Golden: the server is still healthy and serves a normal upload (from a fresh
// IP so the per-IP rate limiter, untouched here, is not what we measure).
rec := httptest.NewRecorder()
gReq := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello after storm"))
gReq.RemoteAddr = "203.0.113.9:9999"
srv.ServeHTTP(rec, gReq)
if rec.Code != http.StatusOK {
t.Fatalf("a normal upload after the storm should be 200, got %d (%s)", rec.Code, rec.Body.String())
}
t.Logf("N2 bound: %d uploads -> 200=%d 503=%d, RSS delta %d kB (bound %d kB), cap %d MiB",
n, got200, got503, delta, maxSpikeKB, cap>>20)
}
+85
View File
@@ -0,0 +1,85 @@
package membership
import "sync/atomic"
// inflightLimiter is a non-blocking, byte-counting concurrency limiter: a global
// cap on how many bytes of request body the server will buffer simultaneously.
//
// The per-request body ceilings (maxControlBodyBytes / maxBlobBytes) bound a
// single request, and the per-IP rate limiter throttles a single source, but
// neither bounds the AGGREGATE memory across many concurrent uploads: the
// re-audit (report 0006, N2) showed 40 concurrent 16 MiB blob uploads driving
// RSS to ~1.42 GB, and a distributed (multi-IP) flood scales without a ceiling
// because the rate limiter is per-IP. This limiter is the missing aggregate
// bound: ServeHTTP reserves a request's worst-case buffered size before reading
// the body and releases it when the request finishes, so the total bytes in
// flight can never exceed max regardless of how many connections or source IPs
// arrive at once.
//
// It is intentionally NON-blocking: when a reservation does not fit, the caller
// sheds the request with backpressure (503) rather than parking a goroutine,
// which would let an attacker exhaust goroutines/connections instead of RAM. The
// counter is maintained with sync/atomic (a CAS loop), so it is safe for
// concurrent use without a mutex.
//
// Implementation note: this lives inside unibus rather than the fn-registry
// (where a generic concurrency primitive would normally belong) because the
// registry's functions/core package pulls in transitive dependencies that
// require CGO (mattn/go-sqlite3) and external modules, which are incompatible
// with unibus's CGO_ENABLED=0 build, and because this work is scoped to the
// unibus sub-repo.
type inflightLimiter struct {
max int64 // immutable after construction; <= 0 disables the limiter
used int64 // bytes currently reserved; accessed ONLY via sync/atomic
}
// newInflightLimiter builds a limiter with a cap of maxBytes bytes in flight.
// maxBytes <= 0 disables the cap (tryAcquire always grants), which is the
// loopback/dev posture where an aggregate memory ceiling is not wanted.
func newInflightLimiter(maxBytes int64) *inflightLimiter {
return &inflightLimiter{max: maxBytes}
}
// tryAcquire reserves n bytes without blocking. It returns true and reserves the
// bytes when they fit within the cap (used+n <= max), or false (reserving
// nothing) when they do not. n <= 0 is granted without reserving, and a disabled
// limiter (max <= 0) always grants. Safe for concurrent use.
func (l *inflightLimiter) tryAcquire(n int64) bool {
if l.max <= 0 || n <= 0 {
return true
}
for {
cur := atomic.LoadInt64(&l.used)
if cur+n > l.max {
return false
}
if atomic.CompareAndSwapInt64(&l.used, cur, cur+n) {
return true
}
}
}
// release returns n previously reserved bytes. It must be paired with a
// tryAcquire that granted. A disabled limiter or n <= 0 is a no-op. The counter
// never drops below zero (a defensive clamp against an accidental double release).
func (l *inflightLimiter) release(n int64) {
if l.max <= 0 || n <= 0 {
return
}
for {
cur := atomic.LoadInt64(&l.used)
nv := cur - n
if nv < 0 {
nv = 0
}
if atomic.CompareAndSwapInt64(&l.used, cur, nv) {
return
}
}
}
// inFlight returns the bytes currently reserved. It is observability for tests
// and metrics.
func (l *inflightLimiter) inFlight() int64 {
return atomic.LoadInt64(&l.used)
}
+97
View File
@@ -0,0 +1,97 @@
package membership
import (
"sync"
"testing"
)
// TestInflightLimiterBasics covers the limiter contract: granting within the cap
// (golden), the exact boundary (edge), refusal over the cap without mutating the
// counter (error), the disabled mode, and the defensive clamp on over-release.
func TestInflightLimiterBasics(t *testing.T) {
l := newInflightLimiter(100)
// Golden: a reservation within the cap is granted and reflected.
if !l.tryAcquire(60) {
t.Fatalf("acquire 60 within cap 100 should grant")
}
if l.inFlight() != 60 {
t.Fatalf("inFlight = %d, want 60", l.inFlight())
}
// Edge: exactly reaching the cap (60+40 == 100) is granted.
if !l.tryAcquire(40) {
t.Fatalf("acquire to the exact cap should grant")
}
if l.inFlight() != 100 {
t.Fatalf("inFlight = %d, want 100", l.inFlight())
}
// Error: one more byte over the full cap is refused, and the counter is left
// untouched (a refused reservation reserves nothing).
if l.tryAcquire(1) {
t.Fatalf("acquire over a full cap must be refused")
}
if l.inFlight() != 100 {
t.Fatalf("a refused acquire must not change inFlight; got %d", l.inFlight())
}
// Release frees capacity again.
l.release(100)
if l.inFlight() != 0 {
t.Fatalf("inFlight after full release = %d, want 0", l.inFlight())
}
// Defensive: an over-release never drives the counter negative.
l.release(50)
if l.inFlight() != 0 {
t.Fatalf("over-release must clamp at 0; got %d", l.inFlight())
}
}
// TestInflightLimiterDisabled verifies that a non-positive cap disables the
// limiter: every reservation is granted and nothing is tracked (the loopback/dev
// posture).
func TestInflightLimiterDisabled(t *testing.T) {
for _, max := range []int64{0, -1} {
l := newInflightLimiter(max)
if !l.tryAcquire(1 << 30) {
t.Fatalf("disabled limiter (max=%d) must always grant", max)
}
if l.inFlight() != 0 {
t.Fatalf("disabled limiter must not track usage; got %d", l.inFlight())
}
l.release(1 << 30) // no-op, must not panic
}
}
// TestInflightLimiterConcurrent hammers the limiter from many goroutines with
// equal-sized acquire/release pairs and asserts the invariant never breaks: the
// counter returns to 0 and never exceeds the cap. Run with -race for the memory
// model guarantee.
func TestInflightLimiterConcurrent(t *testing.T) {
const cap = 1000
const chunk = 7
l := newInflightLimiter(cap)
var wg sync.WaitGroup
for g := 0; g < 64; g++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 2000; i++ {
if l.tryAcquire(chunk) {
if f := l.inFlight(); f > cap {
t.Errorf("inFlight %d exceeded cap %d", f, cap)
return
}
l.release(chunk)
}
}
}()
}
wg.Wait()
if l.inFlight() != 0 {
t.Fatalf("after all goroutines, inFlight = %d, want 0", l.inFlight())
}
}
+26
View File
@@ -35,6 +35,14 @@ const (
// MaxHeaderBytes caps request header size; wired into the http.Server by the
// command. Exported so the bound lives next to its body-size siblings.
MaxHeaderBytes = 1 << 20 // 1 MiB
// maxInflightBytes is the GLOBAL cap on request-body bytes buffered across all
// concurrent requests (audit N2). The per-request ceilings above bound one
// request; this bounds the sum, so a concurrent (even multi-IP) flood of
// max-size uploads cannot drive the resident set without limit. 128 MiB allows
// ~8 concurrent 16 MiB blob uploads or ~128 concurrent control requests before
// further POSTs are shed with 503 — generous for an interactive bus, bounded
// for an attacker.
maxInflightBytes = 128 << 20 // 128 MiB
)
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
@@ -62,6 +70,7 @@ type Server struct {
authMode AuthMode
nonces nonceStore
limiter *ipRateLimiter
inflight *inflightLimiter
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
// rooms. It is the minimum-defensive control for the data plane (audit H4):
@@ -87,6 +96,7 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
authMode: authMode,
nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries),
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
inflight: newInflightLimiter(maxInflightBytes),
}
s.routes()
return s
@@ -139,6 +149,22 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
r.Body = http.MaxBytesReader(w, r.Body, limit)
// Aggregate memory bound (audit N2): the per-request ceiling above and the
// per-IP rate limit do not cap the TOTAL bytes buffered across concurrent
// requests. A POST reserves its worst-case buffered size (its route ceiling)
// from a global limiter before the body is read, and is shed with 503 when the
// cap is reached, so the resident set stays bounded under a concurrent (even
// multi-IP) upload flood instead of growing linearly with the number of
// connections. Reservation is released when the request finishes. Only POSTs
// buffer a body; GETs carry none, so they do not consume the budget.
if r.Method == http.MethodPost {
if !s.inflight.tryAcquire(limit) {
writeErr(w, http.StatusServiceUnavailable, "server busy: too many concurrent uploads in flight")
return
}
defer s.inflight.release(limit)
}
if s.authMode == AuthOff || isAuthExempt(r) {
s.mux.ServeHTTP(w, r)
return