12 Commits

Author SHA1 Message Date
egutierrez df3b62a601 Merge quick/0005-bump-close: unibus 0.7.0 + close issue 0005 2026-06-07 16:17:41 +02:00
egutierrez 6976537842 chore(0005): bump unibus to 0.7.0, close issue 0005 (hardening 2)
Hardening 2 (issue 0005, fases 0005a-0005e) cierra los hallazgos nuevos de la
re-auditoría red-team (report 0006): bump de nats-server + toolchain (16 CVEs ->
0 alcanzables), drop de frames sin firma en rooms SignMsgs, limiter global de
bytes en vuelo contra el DoS por concurrencia, TLS obligatorio en bind publico, y
cableado de la ACL por subject que cierra el wildcard metadata leak. Detalle por
fase en el capability growth log del app.md y en el report 0007.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 16:17:41 +02:00
egutierrez a4bbe8209b Merge issue/0005e-acl-wire: wire per-subject ACL into membershipd (audit H4) 2026-06-07 16:15:52 +02:00
egutierrez 87ef52cc80 fix(0005e): wire per-subject ACL into membershipd (close H4 wildcard metadata leak)
The per-subject data-plane ACL existed since 0003e (membership.SubjectACLFor +
busauth.NewNkeyAuthenticatorACL, unit-tested in TestSubjectACLIsolation) but the
binary never used it: cmd/membershipd installed the plain NewNkeyAuthenticator, so
in production a registered NON-member could open a raw NATS connection,
Subscribe(">"), and harvest every room's subject plus JetStream stream/advisory
activity (payload stayed E2E ciphertext, metadata leaked) — the re-audit's H4
vector (report 0006).

Fix:
- New busauth.PermissionsFromSubjects adapts a subject-deriving function into the
  PermissionsFunc the ACL authenticator expects (subjects granted as both the
  publish and subscribe allow set; a derivation error fails closed). It lives in
  busauth so membership stays free of the nats-server dependency.
- cmd/membershipd, under enforce, now installs
  NewNkeyAuthenticatorACL(store.IsAuthorized,
    PermissionsFromSubjects(membership.SubjectACLFor(store)))
  so every connection is confined to the subjects of the rooms it belongs to plus
  the client-infra subjects.
- pkg/membership/acl_test.go's helper now delegates to the production wiring
  (PermissionsFromSubjects) instead of a test-only reimplementation, so the tests
  exercise the real path.

Verification (pkg/membership/acl_test.go):
- TestReaudit_H4_WildcardMetadataLeak: a non-member's Subscribe(">") and any
  foreign-subject subscribe raise permission violations; the member still pub/subs
  her own room and the non-member captures nothing. With the plain authenticator
  (the pre-0005e wiring) the test fails ("wildcard metadata leak still open"),
  confirming the wiring is what closes it.
- TestSubjectACLIsolation / TestRefreshSessionGainsNewRoom still green.
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  green.

Residual (documented): the client-infra grant includes "$JS.API.>", shared by all
peers so per-connection JetStream works; a peer that subscribes specifically to
"$JS.API.>" can still observe stream-management requests whose subjects embed the
room-derived stream name. Fully closing that needs NATS accounts/permissions per
identity (deferred to the 0003 decentralization line). Operational note: NATS
freezes permissions at connect time, so clients must client.RefreshSession after a
membership change to gain a new room's subject; cmd/chat and cmd/worker do not yet
call it, a functional gap to close before an enforce+ACL deployment.

Refs: report 0006 H4, issue 0005e.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 16:15:52 +02:00
egutierrez a2ec78c81d Merge issue/0005d-tls-guard: require TLS on public bind (audit N4) 2026-06-07 16:11:45 +02:00
egutierrez d01da9d396 fix(0005d): require TLS on a public bind (close N4 plaintext control plane)
The H2 guard refused "public bind without enforce" and "TLS flags without
enforce", but it still ALLOWED a public bind with enforce and no --tls-cert: the
control plane then served metadata (subjects, pubkeys, sealed keys, the social
graph) over plaintext HTTP publicly, so audit H5 reappeared as the N4 gap (TLS
was a capability, not a requirement; report 0006).

Fix: validateBootConfig now also refuses a non-loopback --bind unless both
--tls-cert and --tls-key are set. Public deployments must serve HTTPS; loopback
dev is unaffected (no TLS still allowed there).

Verification (cmd/membershipd/config_test.go):
- TestGap_PublicEnforceNoTLS: validateBootConfig("0.0.0.0", enforce, "", "")
  now returns an error mentioning --tls-cert (golden public+enforce+TLS allowed;
  edge loopback-without-TLS still allowed).
- TestBootConfigPolicy table updated: public+enforce+notls / +certonly / +keyonly
  and lan-ip+enforce+notls are now refused; public+enforce+tls and
  loopback+enforce+tls allowed.
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  green.

Refs: report 0006 N4, issue 0005d.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 16:11:45 +02:00
egutierrez db8618ddc3 Merge issue/0005c-inflight: global in-flight byte limiter bounds aggregate memory (audit N2) 2026-06-07 16:09:58 +02:00
egutierrez e7d59fd01d fix(0005c): bound aggregate buffered memory with a global in-flight byte limiter
The H1 fix bounds each request (1 MiB control / 16 MiB blob) and the per-IP rate
limiter throttles a single source, but neither bounds the AGGREGATE memory across
concurrent requests. The re-audit (report 0006, N2) drove RSS to ~1.42 GB with 40
concurrent 16 MiB uploads, and noted that a multi-IP (botnet) flood scales without
a ceiling because the rate limit is per-IP.

Fix: a global, non-blocking, byte-counting limiter (pkg/membership/inflight.go).
ServeHTTP reserves a POST's worst-case buffered size (its route ceiling) from the
limiter before reading the body, and releases it when the request finishes. When
the global cap (maxInflightBytes = 128 MiB) is reached, further POSTs are shed
with 503 (backpressure) rather than parking goroutines, so total bytes buffered
in flight stays bounded regardless of connection count or source-IP spread. GETs
carry no body and do not consume the budget.

The limiter is implemented inside unibus (not delegated to the fn-registry, where
a generic concurrency primitive would normally live) because functions/core pulls
transitive deps requiring CGO (mattn/go-sqlite3) and external modules that are
incompatible with unibus's CGO_ENABLED=0 build, and because this work is scoped
to the unibus sub-repo. The type/method comments document this.

Verification:
- pkg/membership/inflight_test.go: TestInflightLimiter{Basics,Disabled,Concurrent}
  cover golden/edge/error/disabled/over-release and a -race concurrency invariant
  (inFlight returns to 0, never exceeds cap).
- pkg/membership/dos_concurrency_test.go: TestReaudit_DoSConcurrency fires 40
  concurrent 16 MiB uploads from distinct IPs (the multi-IP shape) against a 48 MiB
  test cap -> 200=3 503=37, RSS delta ~93 MiB (bound 256 MiB), inFlight()==0, and a
  fresh upload still 200. With the limiter disabled the test fails (200=40 503=0),
  confirming it is a real regression guard.
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  green;
  CGO_ENABLED=1 go test -race ./pkg/membership/ green.

Residual (documented): under enforce the body is buffered twice (auth verify +
handler), so real RSS is ~2x the reserved bytes; closing that fully means
streaming blobs to disk (overlaps H9 / issue 0002).

Refs: report 0006 N2, issue 0005c.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 16:09:58 +02:00
egutierrez 0f79708338 Merge issue/0005b-sig-nil: drop unsigned frames in SignMsgs rooms (audit N3) 2026-06-07 15:58:10 +02:00
egutierrez ef3af6dfd1 fix(0005b): drop unsigned frames in SignMsgs rooms (close sig-nil spoof)
client.processFrame verified a frame's signature only when one was present
(`info.Policy.SignMsgs && f.Sig != nil`). In a room whose policy REQUIRES
per-message signatures, an attacker with data-plane access could publish a raw
frame with Sig==nil and a forged Sender, and the receiver accepted it as
authentic because the verification block was skipped (audit N3, report 0006).
On a signed-but-cleartext room any peer that knows the subject could thus
impersonate any sender.

Fix: in a SignMsgs room a missing signature is itself a rejection. processFrame
now drops any frame with Sig==nil before attempting verification:

    if info.Policy.SignMsgs {
        if f.Sig == nil { return }   // signature required but absent: drop
        // verify ...
    }

Non-signed rooms (ModeNATS) are unaffected: unsigned frames there are still
delivered, so the plain-NATS path is unchanged.

Verification (pkg/client/sig_nil_spoof_test.go, TestReaudit_SigNilSpoof):
- golden: a properly signed frame from a member is delivered.
- error : an unsigned frame with a forged Sender in a SignMsgs room is dropped
  (the test fails with "SIG-NIL SPOOF: receiver accepted ..." when the fix is
  reverted, confirming it is a real regression guard).
- edge  : a non-signed room still delivers an unsigned frame.
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  green.

Refs: report 0006 N3, issue 0005b.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 15:58:10 +02:00
egutierrez 88b47912bd Merge issue/0005a-cve-bump: bump nats-server to v2.11.15 + go1.26.4 (16 CVEs -> 0 reachable) 2026-06-07 15:55:32 +02:00
egutierrez a3ac58fb70 fix(0005a): bump nats-server v2.10.22->v2.11.15 + toolchain go1.26.4 (close 16 CVEs)
govulncheck reported 16 reachable vulnerabilities (re-audit finding N1, report 0006):
14 in github.com/nats-io/nats-server/v2@v2.10.22 -- the embedded NATS server, which
is exposed to the internet in the chosen deployment -- and 2 in the Go standard
library (GO-2026-5039 net/textproto, GO-2026-5037 crypto/x509).

Changes:
- go get github.com/nats-io/nats-server/v2@v2.11.15 (covers all 14 server CVEs;
  pulls nats.go v1.49.0, nkeys v0.4.15, jwt v2.8.1, klauspost/compress v1.18.4
  and friends transitively).
- go directive 1.25.0 -> 1.26.4 so the toolchain ships the two stdlib fixes.

This is a go.mod/go.sum change justified purely by CVE remediation; it is the
explicit exception to the "do not touch deps" rule for a CVE bump.

Verification:
- CGO_ENABLED=0 go build ./... && go vet ./... && go test -count=1 ./...  -> green,
  including the 0003 multi-node cluster/JetStream e2e in pkg/embeddednats, so the
  server bump did not break the cluster or the durable plane.
- govulncheck ./...  -> "No vulnerabilities found" (0 reachable; the 13 that remain
  are in required-but-not-called modules).

Refs: report 0006 N1, issue 0005a.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 15:55:32 +02:00
15 changed files with 759 additions and 45 deletions
+24 -1
View File
@@ -2,7 +2,7 @@
name: unibus name: unibus
lang: go lang: go
domain: infra domain: infra
version: 0.6.0 version: 0.7.0
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo." description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
tags: [service, messaging, nats, e2e] tags: [service, messaging, nats, e2e]
uses_functions: uses_functions:
@@ -154,6 +154,29 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
## Capability growth log ## Capability growth log
- v0.7.0 (2026-06-07) — hardening de seguridad 2 (issue 0005, fases 0005a0005e)
que cierra los hallazgos nuevos de la re-auditoría red-team (report 0006) y
lleva el veredicto de exposición pública a "sí-con-condiciones". (0005a) Bump de
`github.com/nats-io/nats-server/v2` v2.10.22→v2.11.15 y de la toolchain a
go1.26.4: `govulncheck ./...` pasa de 16 vulnerabilidades alcanzables (14 del
servidor NATS embebido + 2 de la stdlib) a 0. (0005b) `client.processFrame`
ahora descarta cualquier frame sin firma en una room `SignMsgs` (antes verificaba
solo si la firma venía presente, lo que permitía suplantar `Sender` con
`Sig==nil`). (0005c) Nuevo limiter global de bytes en vuelo
(`pkg/membership.inflightLimiter`) que acota la memoria agregada que el control
plane bufferiza bajo concurrencia (el límite por-request y el rate-limit por-IP
no acotaban el total): un flood concurrente multi-IP se descarta con 503 en vez
de crecer sin techo (el RSS deja de escalar con N). (0005d) El guard de arranque
`validateBootConfig` ahora exige `--tls-cert/--tls-key` en bind no-loopback (un
control plane público sin TLS servía metadata en claro). (0005e) Se cablea por
fin en `membershipd` la ACL por subject que ya existía huérfana desde 0003e
(`busauth.NewNkeyAuthenticatorACL` + nuevo adaptador `busauth.PermissionsFromSubjects`
sobre `membership.SubjectACLFor`): un registrado no-miembro ya no puede
`Subscribe(">")` y captar los subjects/advisories de rooms ajenas. Residuales
documentados: `$JS.API.>` sigue compartido (cierre completo = NATS accounts por
identidad, diferido) y los clientes deben `RefreshSession` tras cambios de
membresía (chat/worker aún no lo hacen). El comportamiento de un solo nodo no
cambia y master sigue verde.
- v0.6.0 (2026-06-07) — descentralización / alta disponibilidad (issue 0003, - v0.6.0 (2026-06-07) — descentralización / alta disponibilidad (issue 0003,
fases 0003a0003e), report 0006. El servidor NATS embebido gana soporte de fases 0003a0003e), report 0006. El servidor NATS embebido gana soporte de
cluster con routes autenticadas (secreto de cluster) y TLS mutuo de nodo cluster con routes autenticadas (secreto de cluster) y TLS mutuo de nodo
+10 -2
View File
@@ -43,9 +43,12 @@ func isLoopbackBind(bind string) bool {
// configuration that would expose the bus without enforced authentication: // configuration that would expose the bus without enforced authentication:
// //
// - a non-loopback --bind without --bus-auth enforce (the data plane and // - a non-loopback --bind without --bus-auth enforce (the data plane and
// control plane would both accept anyone), and // control plane would both accept anyone),
// - --tls-cert/--tls-key without --bus-auth enforce (TLS encrypts the channel // - --tls-cert/--tls-key without --bus-auth enforce (TLS encrypts the channel
// but authenticates no one — encrypted access for everybody is still open). // but authenticates no one — encrypted access for everybody is still open), and
// - a non-loopback --bind WITHOUT --tls-cert/--tls-key (the control plane would
// serve metadata over plaintext HTTP publicly — audit H5 reappearing, the N4
// gap the re-audit found: TLS was available but not mandatory).
// //
// It is a pure function of the parsed flags so the command can fail fast at // It is a pure function of the parsed flags so the command can fail fast at
// startup and tests can assert the policy without booting a server. // startup and tests can assert the policy without booting a server.
@@ -60,6 +63,11 @@ func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey s
"refusing to start: --tls-cert/--tls-key set but --bus-auth is %q; TLS without enforced auth is fail-open (encrypted channel, no authentication) — set --bus-auth enforce", "refusing to start: --tls-cert/--tls-key set but --bus-auth is %q; TLS without enforced auth is fail-open (encrypted channel, no authentication) — set --bus-auth enforce",
mode) mode)
} }
if !isLoopbackBind(bind) && (tlsCert == "" || tlsKey == "") {
return fmt.Errorf(
"refusing to start: --bind %q is not loopback but --tls-cert/--tls-key are not both set; a public control plane must serve HTTPS or its metadata (subjects, pubkeys, sealed keys, the social graph) travels in cleartext to a network MITM (audit H5/N4) — provide a CA-signed --tls-cert/--tls-key, or bind 127.0.0.1 for local dev",
bind)
}
return nil return nil
} }
+33 -2
View File
@@ -30,6 +30,31 @@ func TestAudit_FailOpenTLSWithoutAuth(t *testing.T) {
} }
} }
// TestGap_PublicEnforceNoTLS ports the re-auditor's N4 gap: the H2 guard refused
// "public without enforce" and "TLS without enforce", but ALLOWED a public bind
// with enforce and NO --tls-cert, so the control plane served metadata over
// plaintext HTTP publicly (H5 reappearing). The guard now refuses it.
func TestGap_PublicEnforceNoTLS(t *testing.T) {
// The exact auditor configuration: public bind, enforce on, no TLS cert/key.
err := validateBootConfig("0.0.0.0", membership.AuthEnforce, "", "")
if err == nil {
t.Fatalf("public bind + enforce + NO --tls-cert must be refused: the control plane would serve plaintext HTTP publicly (audit N4)")
}
if !strings.Contains(err.Error(), "tls-cert") {
t.Fatalf("error should point the operator at --tls-cert/--tls-key, got: %v", err)
}
// Golden: the same public+enforce config WITH a cert/key is allowed.
if err := validateBootConfig("0.0.0.0", membership.AuthEnforce, "server.crt", "server.key"); err != nil {
t.Fatalf("public + enforce + TLS is the intended production config, got: %v", err)
}
// Edge: loopback without TLS stays allowed (local dev is not a public exposure).
if err := validateBootConfig("127.0.0.1", membership.AuthOff, "", ""); err != nil {
t.Fatalf("loopback dev without TLS must remain allowed, got: %v", err)
}
}
// TestBootConfigPolicy is the full table: the golden secure-public config is // TestBootConfigPolicy is the full table: the golden secure-public config is
// allowed, dev loopback is allowed, and every fail-open shape is refused. // allowed, dev loopback is allowed, and every fail-open shape is refused.
func TestBootConfigPolicy(t *testing.T) { func TestBootConfigPolicy(t *testing.T) {
@@ -41,19 +66,25 @@ func TestBootConfigPolicy(t *testing.T) {
key string key string
wantErr bool wantErr bool
}{ }{
// Golden: the intended public production config. // Golden: the intended public production config — enforce AND TLS.
{"public+enforce+tls", "0.0.0.0", membership.AuthEnforce, "s.crt", "s.key", false}, {"public+enforce+tls", "0.0.0.0", membership.AuthEnforce, "s.crt", "s.key", false},
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", false},
// Edge: local dev on loopback may stay open (no auth, no TLS). // Edge: local dev on loopback may stay open (no auth, no TLS).
{"loopback+off", "127.0.0.1", membership.AuthOff, "", "", false}, {"loopback+off", "127.0.0.1", membership.AuthOff, "", "", false},
{"loopback-ipv6+off", "::1", membership.AuthOff, "", "", false}, {"loopback-ipv6+off", "::1", membership.AuthOff, "", "", false},
{"localhost+off", "localhost", membership.AuthOff, "", "", false}, {"localhost+off", "localhost", membership.AuthOff, "", "", false},
{"loopback+soft", "127.0.0.1", membership.AuthSoft, "", "", false}, {"loopback+soft", "127.0.0.1", membership.AuthSoft, "", "", false},
// Edge: loopback with full enforce+TLS is also fine.
{"loopback+enforce+tls", "127.0.0.1", membership.AuthEnforce, "s.crt", "s.key", false},
// Error: public bind without enforce. // Error: public bind without enforce.
{"public+off", "0.0.0.0", membership.AuthOff, "", "", true}, {"public+off", "0.0.0.0", membership.AuthOff, "", "", true},
{"public+soft", "0.0.0.0", membership.AuthSoft, "", "", true}, {"public+soft", "0.0.0.0", membership.AuthSoft, "", "", true},
{"lan-ip+off", "192.168.1.10", membership.AuthOff, "", "", true}, {"lan-ip+off", "192.168.1.10", membership.AuthOff, "", "", true},
{"empty-bind+off", "", membership.AuthOff, "", "", true}, {"empty-bind+off", "", membership.AuthOff, "", "", true},
// Error (N4): public bind + enforce but NO TLS -> plaintext control plane.
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", true},
{"public+enforce+certonly", "0.0.0.0", membership.AuthEnforce, "s.crt", "", true},
{"public+enforce+keyonly", "0.0.0.0", membership.AuthEnforce, "", "s.key", true},
{"lan-ip+enforce+notls", "192.168.1.10", membership.AuthEnforce, "", "", true},
// Error: TLS flags without enforce (cert or key alone is enough to trip it). // Error: TLS flags without enforce (cert or key alone is enough to trip it).
{"loopback+tlscert+off", "127.0.0.1", membership.AuthOff, "s.crt", "", true}, {"loopback+tlscert+off", "127.0.0.1", membership.AuthOff, "s.crt", "", true},
{"loopback+tlskey+soft", "127.0.0.1", membership.AuthSoft, "", "s.key", true}, {"loopback+tlskey+soft", "127.0.0.1", membership.AuthSoft, "", "s.key", true},
+12 -2
View File
@@ -138,8 +138,18 @@ func main() {
log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes)) log.Printf("cluster: %q node %q, route port %d, %d peer route(s)", *clusterName, *serverName, *clusterPort, len(cc.Routes))
} }
if authMode == membership.AuthEnforce { if authMode == membership.AuthEnforce {
cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized) // Per-subject data-plane ACL (audit H4 / N4 residual): the authenticator
log.Printf("NATS nkey authentication: ON (enforce)") // authorizes by the bus allowlist AND confines each connection to the
// subjects of the rooms it belongs to (plus client-infra subjects). This
// closes the wildcard metadata leak where a registered non-member could
// Subscribe(">") and harvest every room's subject and JetStream activity.
// NATS freezes permissions at connect time, so a peer that joins a room
// after connecting must client.RefreshSession to gain that room's subject.
cfg.Auth = busauth.NewNkeyAuthenticatorACL(
store.IsAuthorized,
busauth.PermissionsFromSubjects(membership.SubjectACLFor(store)),
)
log.Printf("NATS nkey authentication: ON (enforce, per-subject ACL)")
} }
if *tlsCert != "" || *tlsKey != "" { if *tlsCert != "" || *tlsKey != "" {
if *tlsCert == "" || *tlsKey == "" { if *tlsCert == "" || *tlsKey == "" {
+2 -1
View File
@@ -1,8 +1,9 @@
--- ---
issue: 0005 issue: 0005
title: Hardening 2 — CVEs, spoof por firma omitida, DoS por concurrencia, TLS forzado (re-auditoría) title: Hardening 2 — CVEs, spoof por firma omitida, DoS por concurrencia, TLS forzado (re-auditoría)
status: spec status: done
created: 2026-06-07 created: 2026-06-07
completed: 2026-06-07
domain: security domain: security
scope: unibus (go.mod, pkg/client, pkg/membership/server.go, cmd/membershipd/config.go, pkg/embeddednats, pkg/blobstore) scope: unibus (go.mod, pkg/client, pkg/membership/server.go, cmd/membershipd/config.go, pkg/embeddednats, pkg/blobstore)
depends_on: 0001, 0004 (cierra los hallazgos NUEVOS de la re-auditoría sobre lo entregado) depends_on: 0001, 0004 (cierra los hallazgos NUEVOS de la re-auditoría sobre lo entregado)
+10 -9
View File
@@ -1,26 +1,28 @@
module github.com/enmanuel/unibus module github.com/enmanuel/unibus
go 1.25.0 go 1.26.4
replace fn-registry => ../../../../ replace fn-registry => ../../../../
require ( require (
fn-registry v0.0.0-00010101000000-000000000000 fn-registry v0.0.0-00010101000000-000000000000
github.com/nats-io/nats-server/v2 v2.10.22 github.com/nats-io/nats-server/v2 v2.11.15
github.com/nats-io/nats.go v1.37.0 github.com/nats-io/nats.go v1.49.0
github.com/nats-io/nkeys v0.4.7 github.com/nats-io/nkeys v0.4.15
github.com/oklog/ulid/v2 v2.1.0 github.com/oklog/ulid/v2 v2.1.0
golang.org/x/time v0.7.0 golang.org/x/time v0.15.0
modernc.org/sqlite v1.47.0 modernc.org/sqlite v1.47.0
) )
require ( require (
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op // indirect
github.com/dustin/go-humanize v1.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/go-tpm v0.9.8 // indirect
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.18.3 // indirect github.com/klauspost/compress v1.18.4 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/highwayhash v1.0.3 // indirect github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
github.com/nats-io/jwt/v2 v2.5.8 // indirect github.com/nats-io/jwt/v2 v2.8.1 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
@@ -29,7 +31,6 @@ require (
golang.org/x/mod v0.36.0 // indirect golang.org/x/mod v0.36.0 // indirect
golang.org/x/sync v0.20.0 // indirect golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.37.0 // indirect
golang.org/x/tools v0.45.0 // indirect golang.org/x/tools v0.45.0 // indirect
modernc.org/libc v1.70.0 // indirect modernc.org/libc v1.70.0 // indirect
modernc.org/mathutil v1.7.1 // indirect modernc.org/mathutil v1.7.1 // indirect
+18 -16
View File
@@ -1,27 +1,31 @@
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op h1:kpBdlEPbRvff0mDD1gk7o9BhI16b9p5yYAXRlidpqJE=
github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo=
github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= github.com/nats-io/jwt/v2 v2.8.1 h1:V0xpGuD/N8Mi+fQNDynXohVvp7ZztevW5io8CUWlPmU=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= github.com/nats-io/jwt/v2 v2.8.1/go.mod h1:nWnOEEiVMiKHQpnAy4eXlizVEtSfzacZ1Q43LIRavZg=
github.com/nats-io/nats-server/v2 v2.10.22 h1:Yt63BGu2c3DdMoBZNcR6pjGQwk/asrKU7VX846ibxDA= github.com/nats-io/nats-server/v2 v2.11.15 h1:StSf9TINInaZtr4oww2+kXmfwa9SkN//g/LwS19/UJ0=
github.com/nats-io/nats-server/v2 v2.10.22/go.mod h1:X/m1ye9NYansUXYFrbcDwUi/blHkrgHh2rgCJaakonk= github.com/nats-io/nats-server/v2 v2.11.15/go.mod h1:zwhv8Y0PE3KHyKgznJc/9Xoai638SaJd83zzJ5GJn74=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
@@ -43,10 +47,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8= golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0= golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM= golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM=
+26
View File
@@ -0,0 +1,26 @@
package busauth
import server "github.com/nats-io/nats-server/v2/server"
// PermissionsFromSubjects adapts a subject-deriving function (e.g.
// membership.SubjectACLFor, which maps an identity to the subjects of the rooms
// it belongs to plus the client infrastructure subjects) into the PermissionsFunc
// the ACL authenticator expects. The derived subjects are granted as BOTH the
// publish and subscribe allow set, so a connection can only pub/sub on the
// subjects it is entitled to. A derivation error is propagated so the caller
// fails closed (denies the connection) rather than granting open access.
//
// This is the production wiring for the per-subject data-plane ACL (issue 0003e,
// audit H4): membershipd passes PermissionsFromSubjects(membership.SubjectACLFor(
// store)) to NewNkeyAuthenticatorACL. It lives in busauth (not membership) so the
// membership package stays free of the nats-server dependency.
func PermissionsFromSubjects(derive func(signPubHex string) ([]string, error)) PermissionsFunc {
return func(signPubHex string) (*server.Permissions, error) {
subjects, err := derive(signPubHex)
if err != nil {
return nil, err
}
sp := &server.SubjectPermission{Allow: subjects}
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
}
}
+11 -1
View File
@@ -799,7 +799,17 @@ func (c *Client) processFrame(roomID string, info roomView, data []byte, handler
if err != nil { if err != nil {
return return
} }
if info.Policy.SignMsgs && f.Sig != nil { // A room with SignMsgs REQUIRES a signature, so an unsigned frame is
// unauthenticated and must be dropped — not silently accepted. The previous
// `&& f.Sig != nil` guard verified the signature only when one was present, so
// an attacker with data-plane access could publish a frame with Sig==nil and a
// forged Sender and have the receiver accept it as authentic in a room that
// demands signatures (audit N3, report 0006). Requiring the signature first
// closes that spoof.
if info.Policy.SignMsgs {
if f.Sig == nil {
return // signature required by room policy but absent: drop
}
pub, err := c.signerPub(roomID, f.Sender) pub, err := c.signerPub(roomID, f.Sender)
if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) { if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) {
return // unauthenticated frame: drop return // unauthenticated frame: drop
+154
View File
@@ -0,0 +1,154 @@
package client_test
import (
"sync"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
"github.com/nats-io/nats.go"
)
// TestReaudit_SigNilSpoof ports the re-auditor's N3 (Alto) finding: in a room
// that REQUIRES per-message signatures, an attacker with data-plane access
// publishes a raw frame with Sig==nil and a forged Sender. Before the fix
// processFrame verified the signature only when one was present
// (`SignMsgs && f.Sig != nil`), so the receiver accepted the unsigned, forged
// frame as authentic. The fix drops any unsigned frame in a SignMsgs room.
//
// Coverage:
// - golden: a properly signed frame from a real member IS delivered;
// - error : an unsigned frame with a forged Sender in a SignMsgs room is DROPPED;
// - edge : a room WITHOUT SignMsgs still delivers an unsigned frame (the drop
// is specific to signed rooms, not a blanket reject of unsigned frames).
func TestReaudit_SigNilSpoof(t *testing.T) {
h := newHarness(t)
waitHealth(t, h.ctrlURL)
alice, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect alice: %v", err)
}
defer alice.Close()
bob, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect bob: %v", err)
}
defer bob.Close()
// A signed-but-NOT-encrypted room: SignMsgs enforces authorship, and the lack
// of encryption is exactly the case the auditor flagged as Alto (any peer with
// the subject can forge a sender if signatures are not strictly required).
const subject = "room.signed.spoof"
signedPolicy := room.Policy{Encrypt: false, Persist: false, SignMsgs: true}
roomID, err := alice.CreateRoom(subject, signedPolicy)
if err != nil {
t.Fatalf("alice create signed room: %v", err)
}
if err := alice.Invite(roomID, bob.Endpoint()); err != nil {
t.Fatalf("alice invite bob: %v", err)
}
if err := bob.Join(roomID); err != nil {
t.Fatalf("bob join: %v", err)
}
var mu sync.Mutex
var got []string
sub, err := bob.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
mu.Lock()
got = append(got, string(plaintext))
mu.Unlock()
})
if err != nil {
t.Fatalf("bob subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(150 * time.Millisecond)
// Attacker: a raw NATS connection (the dev harness leaves the data plane open),
// no identity, forged Sender, NO signature.
const spoofMsg = "I am totally the victim"
rawAtk, err := nats.Connect(h.natsURL)
if err != nil {
t.Fatalf("attacker raw connect: %v", err)
}
defer rawAtk.Close()
spoof := frame.Frame{
Type: frame.PUB,
Subject: subject,
Sender: "victim-forged-endpoint",
MsgID: "spoof-1",
Epoch: 1,
Payload: []byte(spoofMsg),
// Sig intentionally nil — this is the attack.
}
sb, err := spoof.Marshal()
if err != nil {
t.Fatalf("marshal spoof: %v", err)
}
if err := rawAtk.Publish(subject, sb); err != nil {
t.Fatalf("attacker publish: %v", err)
}
_ = rawAtk.Flush()
// Golden: alice's properly signed frame must be delivered.
const goodMsg = "authentic from alice"
if err := alice.Publish(roomID, []byte(goodMsg)); err != nil {
t.Fatalf("alice publish: %v", err)
}
if !waitFor(&mu, &got, func(rs []string) bool {
for _, r := range rs {
if r == goodMsg {
return true
}
}
return false
}, 2*time.Second) {
t.Fatalf("a properly signed frame should be delivered; got %v", snapshot(&mu, &got))
}
// Error path: the unsigned, forged frame must NEVER reach the handler.
for _, r := range snapshot(&mu, &got) {
if r == spoofMsg {
t.Fatalf("SIG-NIL SPOOF: receiver accepted an unsigned frame with a forged Sender in a SignMsgs room")
}
}
// Edge: a room WITHOUT SignMsgs still delivers an unsigned raw frame, proving
// the drop is scoped to signed rooms and did not break the plain-NATS path.
const subjectOpen = "room.open.nosig"
openRoom, err := alice.CreateRoom(subjectOpen, room.ModeNATS)
if err != nil {
t.Fatalf("alice create open room: %v", err)
}
openCol := subscribeCollect(t, alice, openRoom)
defer openCol.sub.Unsubscribe()
time.Sleep(150 * time.Millisecond)
const openMsg = "unsigned but allowed here"
openFrame := frame.Frame{
Type: frame.PUB,
Subject: subjectOpen,
Sender: "anyone",
MsgID: "open-1",
Payload: []byte(openMsg),
// no Sig — fine in a non-signed room
}
ob, _ := openFrame.Marshal()
if err := rawAtk.Publish(subjectOpen, ob); err != nil {
t.Fatalf("publish open frame: %v", err)
}
_ = rawAtk.Flush()
if !waitFor(&openCol.mu, &openCol.msgs, func(rs []string) bool {
for _, r := range rs {
if r == openMsg {
return true
}
}
return false
}, 2*time.Second) {
t.Fatalf("an unsigned frame in a non-signed room should be delivered; got %v", snapshot(&openCol.mu, &openCol.msgs))
}
}
+103 -11
View File
@@ -39,18 +39,12 @@ func mustID(t *testing.T) cs.Identity {
return id return id
} }
// aclPermsFunc adapts membership.SubjectACLFor into the busauth.PermissionsFunc // aclPermsFunc builds the per-subject PermissionsFunc the ACL authenticator
// the ACL authenticator expects (same Allow set for publish and subscribe). // expects. It delegates to the SAME production wiring membershipd uses
// (busauth.PermissionsFromSubjects over membership.SubjectACLFor), so this test
// exercises the real path rather than a test-only reimplementation.
func aclPermsFunc(store membership.Store) busauth.PermissionsFunc { func aclPermsFunc(store membership.Store) busauth.PermissionsFunc {
derive := membership.SubjectACLFor(store) return busauth.PermissionsFromSubjects(membership.SubjectACLFor(store))
return func(signPubHex string) (*server.Permissions, error) {
subs, err := derive(signPubHex)
if err != nil {
return nil, err
}
sp := &server.SubjectPermission{Allow: subs}
return &server.Permissions{Publish: sp, Subscribe: sp}, nil
}
} }
// startACLNats boots an embedded NATS whose authenticator confines each peer to // startACLNats boots an embedded NATS whose authenticator confines each peer to
@@ -219,6 +213,104 @@ func TestSubjectACLIsolation(t *testing.T) {
} }
} }
// TestReaudit_H4_WildcardMetadataLeak ports the re-auditor's H4 vector. Before
// the per-subject ACL was WIRED into membershipd (it existed in pkg/membership and
// pkg/busauth but the binary used the plain NewNkeyAuthenticator), a registered
// NON-member could open a raw NATS connection, Subscribe(">"), and capture every
// room's subject plus JetStream stream/advisory activity — the payload stayed E2E
// ciphertext, but the metadata leaked. With NewNkeyAuthenticatorACL wired via the
// production path (busauth.PermissionsFromSubjects(membership.SubjectACLFor)), a
// non-member is confined to the client-infra subjects, so the wildcard and any
// foreign room subject are denied.
//
// Coverage:
// - error : a non-member's Subscribe(">") raises a permission violation;
// - edge : a non-member subscribing to another room's exact subject is denied;
// - golden: the member still pub/subs her own room, and the non-member never
// captures that traffic.
//
// Residual (DOCUMENTED, not closed here): the client-infra grant includes
// "$JS.API.>", shared by all peers so per-connection JetStream works. A peer that
// subscribes specifically to "$JS.API.>" can still observe stream-management
// requests whose subjects embed the stream name derived from a room id. Fully
// closing that needs NATS accounts/permissions isolation per identity (deferred to
// the 0003 decentralization line). The high-impact leak the auditor exploited —
// the room subject itself and JetStream advisories captured via "Subscribe(\">\")"
// — is closed.
func TestReaudit_H4_WildcardMetadataLeak(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
alice, eve := mustID(t), mustID(t)
aliceEP := frame.EndpointID(alice.SignPub)
mustAddUser(t, store, alice, "alice")
mustAddUser(t, store, eve, "eve") // eve is REGISTERED but never a member of alice's room
const subject = "room.e2e.confidential"
mustCreateRoom(t, store, "ROOMA", subject, aliceEP, alice)
srv := startACLNats(t, store)
url := srv.ClientURL()
eveErr := make(chan error, 8)
eveNC := nkeyConn(t, url, eve, eveErr)
eveAll := make(chan *nats.Msg, 16)
// Error: eve's wildcard subscription is rejected. nats.go creates the local sub
// object and the server rejects it asynchronously (delivered to ErrorHandler).
if _, err := eveNC.Subscribe(">", func(m *nats.Msg) { eveAll <- m }); err != nil {
t.Fatalf("eve sub >: %v", err)
}
_ = eveNC.Flush()
if e := waitErr(eveErr, 1*time.Second); e == nil {
t.Fatalf("a non-member's Subscribe(\">\") must raise a permissions violation (wildcard metadata leak still open)")
}
// Edge: eve subscribing to the foreign room's EXACT subject is also denied.
drain(eveErr)
if _, err := eveNC.Subscribe(subject, func(m *nats.Msg) { eveAll <- m }); err != nil {
t.Fatalf("eve sub subject: %v", err)
}
_ = eveNC.Flush()
if e := waitErr(eveErr, 1*time.Second); e == nil {
t.Fatalf("a non-member subscribing to another room's subject must be denied")
}
// Golden: alice (the member) pub/subs her own room with no violation, and eve
// never captured the traffic despite her (rejected) wildcard.
aliceErr := make(chan error, 4)
aliceNC := nkeyConn(t, url, alice, aliceErr)
aliceGot := make(chan string, 4)
if _, err := aliceNC.Subscribe(subject, func(m *nats.Msg) { aliceGot <- string(m.Data) }); err != nil {
t.Fatalf("alice sub own room: %v", err)
}
_ = aliceNC.Flush()
if e := waitErr(aliceErr, 300*time.Millisecond); e != nil {
t.Fatalf("alice subscribing to her OWN room raised an error: %v", e)
}
if err := aliceNC.Publish(subject, []byte("members-only metadata")); err != nil {
t.Fatalf("alice publish: %v", err)
}
_ = aliceNC.Flush()
select {
case got := <-aliceGot:
if got != "members-only metadata" {
t.Fatalf("alice got %q", got)
}
case <-time.After(2 * time.Second):
t.Fatalf("alice did not receive her own room's message")
}
select {
case m := <-eveAll:
t.Fatalf("eve captured room traffic despite the ACL: subject=%q data=%q", m.Subject, m.Data)
case <-time.After(500 * time.Millisecond):
// good: eve captured nothing
}
}
// TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path: // TestRefreshSessionGainsNewRoom is the "permissions refreshed on join" path:
// alice is not in room B, so her connection has no permission for its subject; // alice is not in room B, so her connection has no permission for its subject;
// after she is added to room B and calls RefreshSession, the reconnect // after she is added to room B and calls RefreshSession, the reconnect
+148
View File
@@ -0,0 +1,148 @@
package membership
import (
"net/http"
"net/http/httptest"
"os"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// readRSSkBRaw reads VmRSS (kB) from /proc without a *testing.T, so it is safe to
// call from a sampling goroutine (vmRSSkB calls t.Skip, which may only run on the
// test's own goroutine). Returns 0 when unavailable.
func readRSSkBRaw() int64 {
b, err := os.ReadFile("/proc/self/status")
if err != nil {
return 0
}
for _, line := range strings.Split(string(b), "\n") {
if strings.HasPrefix(line, "VmRSS:") {
f := strings.Fields(line)
if len(f) >= 2 {
v, _ := strconv.ParseInt(f[1], 10, 64)
return v
}
}
}
return 0
}
// TestReaudit_DoSConcurrency ports the re-auditor's N2 (Medio-Alto) finding: the
// per-request body ceiling and the per-IP rate limit do not bound the AGGREGATE
// memory of many concurrent uploads. The auditor drove RSS to ~1.42 GB with 40
// concurrent 16 MiB blob uploads. With the global in-flight byte limiter, the
// number of simultaneously-buffered uploads is capped, so the resident set stays
// bounded regardless of how many connections arrive at once.
//
// Coverage:
// - golden: a normal upload succeeds, and the server is still healthy after the
// storm (the limiter did not wedge it);
// - edge : concurrency right at the cap is admitted;
// - error : a concurrent flood far past the cap sheds the excess with 503
// (backpressure) instead of buffering it all, and the RSS spike stays bounded
// and does NOT scale with the number of requests.
func TestReaudit_DoSConcurrency(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("RSS probe is Linux-only")
}
srv := dosServer(t, AuthOff)
// Force a small aggregate cap so the bound is observable in a unit test: with
// a 16 MiB blob ceiling, 48 MiB admits ~3 concurrent uploads. Production uses
// maxInflightBytes (128 MiB); the mechanism under test is identical.
const cap = int64(48) << 20
srv.inflight = newInflightLimiter(cap)
const blob = maxBlobBytes // 16 MiB, the per-request ceiling
const n = 40 // the auditor's figure
// A spike bound: with the cap admitting ~3 concurrent 16 MiB uploads and a
// ~2x copy factor (auth buffer + handler buffer) plus Go runtime slack, the
// delta should stay well under this. Without the limiter, 40 concurrent
// uploads admitted at once would add hundreds of MB (the auditor saw ~1.4 GB).
const maxSpikeKB = int64(256) << 10 // 256 MiB
runtime.GC()
before := readRSSkBRaw()
// Sample peak RSS while the storm runs.
var peak int64
atomic.StoreInt64(&peak, before)
stop := make(chan struct{})
var sampler sync.WaitGroup
sampler.Add(1)
go func() {
defer sampler.Done()
for {
select {
case <-stop:
return
default:
if v := readRSSkBRaw(); v > atomic.LoadInt64(&peak) {
atomic.StoreInt64(&peak, v)
}
time.Sleep(2 * time.Millisecond)
}
}
}()
var got503, got200 int64
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: blob})
req.ContentLength = blob
// Distinct source IP per request: this is the multi-IP (botnet) shape the
// auditor flagged, where the per-IP rate limit gives no aggregate defense.
// The in-flight byte limiter is the global bound that must hold here.
req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":1234"
rec := httptest.NewRecorder()
srv.ServeHTTP(rec, req)
switch rec.Code {
case http.StatusServiceUnavailable:
atomic.AddInt64(&got503, 1)
case http.StatusOK:
atomic.AddInt64(&got200, 1)
}
}()
}
wg.Wait()
close(stop)
sampler.Wait()
runtime.GC()
delta := atomic.LoadInt64(&peak) - before
// Error path: the flood must have hit the cap and shed the excess with 503.
if got503 == 0 {
t.Fatalf("a concurrent flood of %d uploads past the cap should shed some with 503; got 200=%d 503=%d", n, got200, got503)
}
// The aggregate memory must stay bounded — not scale with n.
if delta > maxSpikeKB {
t.Fatalf("aggregate RSS spiked %d kB under %d concurrent uploads (bound %d kB): in-flight limiter not bounding memory", delta, n, maxSpikeKB)
}
// All reservations released after the storm.
if f := srv.inflight.inFlight(); f != 0 {
t.Fatalf("after the storm inFlight = %d, want 0 (reservations leaked)", f)
}
// Golden: the server is still healthy and serves a normal upload (from a fresh
// IP so the per-IP rate limiter, untouched here, is not what we measure).
rec := httptest.NewRecorder()
gReq := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello after storm"))
gReq.RemoteAddr = "203.0.113.9:9999"
srv.ServeHTTP(rec, gReq)
if rec.Code != http.StatusOK {
t.Fatalf("a normal upload after the storm should be 200, got %d (%s)", rec.Code, rec.Body.String())
}
t.Logf("N2 bound: %d uploads -> 200=%d 503=%d, RSS delta %d kB (bound %d kB), cap %d MiB",
n, got200, got503, delta, maxSpikeKB, cap>>20)
}
+85
View File
@@ -0,0 +1,85 @@
package membership
import "sync/atomic"
// inflightLimiter is a non-blocking, byte-counting concurrency limiter: a global
// cap on how many bytes of request body the server will buffer simultaneously.
//
// The per-request body ceilings (maxControlBodyBytes / maxBlobBytes) bound a
// single request, and the per-IP rate limiter throttles a single source, but
// neither bounds the AGGREGATE memory across many concurrent uploads: the
// re-audit (report 0006, N2) showed 40 concurrent 16 MiB blob uploads driving
// RSS to ~1.42 GB, and a distributed (multi-IP) flood scales without a ceiling
// because the rate limiter is per-IP. This limiter is the missing aggregate
// bound: ServeHTTP reserves a request's worst-case buffered size before reading
// the body and releases it when the request finishes, so the total bytes in
// flight can never exceed max regardless of how many connections or source IPs
// arrive at once.
//
// It is intentionally NON-blocking: when a reservation does not fit, the caller
// sheds the request with backpressure (503) rather than parking a goroutine,
// which would let an attacker exhaust goroutines/connections instead of RAM. The
// counter is maintained with sync/atomic (a CAS loop), so it is safe for
// concurrent use without a mutex.
//
// Implementation note: this lives inside unibus rather than the fn-registry
// (where a generic concurrency primitive would normally belong) because the
// registry's functions/core package pulls in transitive dependencies that
// require CGO (mattn/go-sqlite3) and external modules, which are incompatible
// with unibus's CGO_ENABLED=0 build, and because this work is scoped to the
// unibus sub-repo.
type inflightLimiter struct {
max int64 // immutable after construction; <= 0 disables the limiter
used int64 // bytes currently reserved; accessed ONLY via sync/atomic
}
// newInflightLimiter builds a limiter with a cap of maxBytes bytes in flight.
// maxBytes <= 0 disables the cap (tryAcquire always grants), which is the
// loopback/dev posture where an aggregate memory ceiling is not wanted.
func newInflightLimiter(maxBytes int64) *inflightLimiter {
return &inflightLimiter{max: maxBytes}
}
// tryAcquire reserves n bytes without blocking. It returns true and reserves the
// bytes when they fit within the cap (used+n <= max), or false (reserving
// nothing) when they do not. n <= 0 is granted without reserving, and a disabled
// limiter (max <= 0) always grants. Safe for concurrent use.
func (l *inflightLimiter) tryAcquire(n int64) bool {
if l.max <= 0 || n <= 0 {
return true
}
for {
cur := atomic.LoadInt64(&l.used)
if cur+n > l.max {
return false
}
if atomic.CompareAndSwapInt64(&l.used, cur, cur+n) {
return true
}
}
}
// release returns n previously reserved bytes. It must be paired with a
// tryAcquire that granted. A disabled limiter or n <= 0 is a no-op. The counter
// never drops below zero (a defensive clamp against an accidental double release).
func (l *inflightLimiter) release(n int64) {
if l.max <= 0 || n <= 0 {
return
}
for {
cur := atomic.LoadInt64(&l.used)
nv := cur - n
if nv < 0 {
nv = 0
}
if atomic.CompareAndSwapInt64(&l.used, cur, nv) {
return
}
}
}
// inFlight returns the bytes currently reserved. It is observability for tests
// and metrics.
func (l *inflightLimiter) inFlight() int64 {
return atomic.LoadInt64(&l.used)
}
+97
View File
@@ -0,0 +1,97 @@
package membership
import (
"sync"
"testing"
)
// TestInflightLimiterBasics covers the limiter contract: granting within the cap
// (golden), the exact boundary (edge), refusal over the cap without mutating the
// counter (error), the disabled mode, and the defensive clamp on over-release.
func TestInflightLimiterBasics(t *testing.T) {
l := newInflightLimiter(100)
// Golden: a reservation within the cap is granted and reflected.
if !l.tryAcquire(60) {
t.Fatalf("acquire 60 within cap 100 should grant")
}
if l.inFlight() != 60 {
t.Fatalf("inFlight = %d, want 60", l.inFlight())
}
// Edge: exactly reaching the cap (60+40 == 100) is granted.
if !l.tryAcquire(40) {
t.Fatalf("acquire to the exact cap should grant")
}
if l.inFlight() != 100 {
t.Fatalf("inFlight = %d, want 100", l.inFlight())
}
// Error: one more byte over the full cap is refused, and the counter is left
// untouched (a refused reservation reserves nothing).
if l.tryAcquire(1) {
t.Fatalf("acquire over a full cap must be refused")
}
if l.inFlight() != 100 {
t.Fatalf("a refused acquire must not change inFlight; got %d", l.inFlight())
}
// Release frees capacity again.
l.release(100)
if l.inFlight() != 0 {
t.Fatalf("inFlight after full release = %d, want 0", l.inFlight())
}
// Defensive: an over-release never drives the counter negative.
l.release(50)
if l.inFlight() != 0 {
t.Fatalf("over-release must clamp at 0; got %d", l.inFlight())
}
}
// TestInflightLimiterDisabled verifies that a non-positive cap disables the
// limiter: every reservation is granted and nothing is tracked (the loopback/dev
// posture).
func TestInflightLimiterDisabled(t *testing.T) {
for _, max := range []int64{0, -1} {
l := newInflightLimiter(max)
if !l.tryAcquire(1 << 30) {
t.Fatalf("disabled limiter (max=%d) must always grant", max)
}
if l.inFlight() != 0 {
t.Fatalf("disabled limiter must not track usage; got %d", l.inFlight())
}
l.release(1 << 30) // no-op, must not panic
}
}
// TestInflightLimiterConcurrent hammers the limiter from many goroutines with
// equal-sized acquire/release pairs and asserts the invariant never breaks: the
// counter returns to 0 and never exceeds the cap. Run with -race for the memory
// model guarantee.
func TestInflightLimiterConcurrent(t *testing.T) {
const cap = 1000
const chunk = 7
l := newInflightLimiter(cap)
var wg sync.WaitGroup
for g := 0; g < 64; g++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 2000; i++ {
if l.tryAcquire(chunk) {
if f := l.inFlight(); f > cap {
t.Errorf("inFlight %d exceeded cap %d", f, cap)
return
}
l.release(chunk)
}
}
}()
}
wg.Wait()
if l.inFlight() != 0 {
t.Fatalf("after all goroutines, inFlight = %d, want 0", l.inFlight())
}
}
+26
View File
@@ -35,6 +35,14 @@ const (
// MaxHeaderBytes caps request header size; wired into the http.Server by the // MaxHeaderBytes caps request header size; wired into the http.Server by the
// command. Exported so the bound lives next to its body-size siblings. // command. Exported so the bound lives next to its body-size siblings.
MaxHeaderBytes = 1 << 20 // 1 MiB MaxHeaderBytes = 1 << 20 // 1 MiB
// maxInflightBytes is the GLOBAL cap on request-body bytes buffered across all
// concurrent requests (audit N2). The per-request ceilings above bound one
// request; this bounds the sum, so a concurrent (even multi-IP) flood of
// max-size uploads cannot drive the resident set without limit. 128 MiB allows
// ~8 concurrent 16 MiB blob uploads or ~128 concurrent control requests before
// further POSTs are shed with 503 — generous for an interactive bus, bounded
// for an attacker.
maxInflightBytes = 128 << 20 // 128 MiB
) )
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive // Per-IP rate-limit defaults for the control plane. Tuned for an interactive
@@ -62,6 +70,7 @@ type Server struct {
authMode AuthMode authMode AuthMode
nonces nonceStore nonces nonceStore
limiter *ipRateLimiter limiter *ipRateLimiter
inflight *inflightLimiter
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS) // RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
// rooms. It is the minimum-defensive control for the data plane (audit H4): // rooms. It is the minimum-defensive control for the data plane (audit H4):
@@ -87,6 +96,7 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
authMode: authMode, authMode: authMode,
nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries), nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries),
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL), limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
inflight: newInflightLimiter(maxInflightBytes),
} }
s.routes() s.routes()
return s return s
@@ -139,6 +149,22 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
r.Body = http.MaxBytesReader(w, r.Body, limit) r.Body = http.MaxBytesReader(w, r.Body, limit)
// Aggregate memory bound (audit N2): the per-request ceiling above and the
// per-IP rate limit do not cap the TOTAL bytes buffered across concurrent
// requests. A POST reserves its worst-case buffered size (its route ceiling)
// from a global limiter before the body is read, and is shed with 503 when the
// cap is reached, so the resident set stays bounded under a concurrent (even
// multi-IP) upload flood instead of growing linearly with the number of
// connections. Reservation is released when the request finishes. Only POSTs
// buffer a body; GETs carry none, so they do not consume the budget.
if r.Method == http.MethodPost {
if !s.inflight.tryAcquire(limit) {
writeErr(w, http.StatusServiceUnavailable, "server busy: too many concurrent uploads in flight")
return
}
defer s.inflight.release(limit)
}
if s.authMode == AuthOff || isAuthExempt(r) { if s.authMode == AuthOff || isAuthExempt(r) {
s.mux.ServeHTTP(w, r) s.mux.ServeHTTP(w, r)
return return