16 Commits

Author SHA1 Message Date
egutierrez 618f6b61da chore(0004): close issue, bump unibus to 0.5.0, record dataplane-acl decision
Issue 0004 (security hardening) done across 0004a-0004f. app.md version 0.5.0
with the capability growth log entry; dev/0004d-dataplane-acl.md documents the
chosen minimum-defense strategy for the NATS data plane and its residual limit
(per-subject ACL deferred to 0003). Full work report in
projects/message_bus/reports/0005-2026-06-07-unibus-security-hardening.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:40:39 +02:00
egutierrez d483c90356 Merge issue/0004f-medium-fixes: owner binding, nonce-cache pre-auth, error leak (H6/H7/H12)
Owner of a created room must be the signer; the replay cache is populated only
after authorization (with bounded, O(expired) pruning); internal errors no
longer leak to clients.
2026-06-07 14:36:22 +02:00
egutierrez 1bcca987a4 test(membership): regression for H6 owner spoof and H7 nonce-cache poison
TestAudit_OwnerSpoof: a body declaring a foreign owner endpoint or signing key
is 403; a self-owned create is 201.
TestAudit_NonceCachePoisonPreAuth: an unregistered identity's repeated nonce
still fails 'not authorized' (never 'replayed'), proving it was not cached, while
an authorized identity's replay is still rejected.
Nonce cache unit tests: prune-after-TTL and cap-bounded memory.
2026-06-07 14:36:22 +02:00
egutierrez 0aa2caae43 feat(membership): owner binding, pre-auth nonce-cache fix, generic errors
Three medium audit findings.

H6 (owner spoof): handleCreateRoom now binds the body's declared owner to the
authenticated signer — both the endpoint id and the signing key must be the
signer's — so a registered peer cannot create rooms in another identity's name.
Enforced only when an authenticated signer is present.

H7 (nonce-cache poison pre-auth): IsAuthorized now runs BEFORE the replay cache
is touched, so an unregistered identity (Ed25519 keys are free) can no longer
seed nonces into it. The cache is rewritten with O(expired) pruning (insertion
order equals expiry order under a constant TTL) instead of the previous O(n)
full-map scan under the mutex, plus a size cap with oldest-eviction. This is the
prerequisite the 0003 replicated nonce store builds on.

H12 (error leak): internal store/blob errors are logged and replaced with a
generic client message via writeServerErr, so SQL fragments and filesystem paths
no longer reach the caller. Crafted 4xx messages (owner-sig, validation) are kept.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:36:22 +02:00
egutierrez 957b728160 Merge issue/0004e-control-tls: TLS on the HTTP control plane (H5)
membershipd serves https with the bus CA cert; the client pins the CA and
refuses a plaintext control plane when a CA is provided.
2026-06-07 14:30:15 +02:00
egutierrez 07f4af817e feat(client,membershipd): TLS on the HTTP control plane (H5)
Audit H5 (Alto, public). The control plane was signed but plaintext, so a
network MITM could read all metadata (subjects, endpoints, public keys, sealed
keys, blob hashes, the social graph) and drop requests. Signing gives integrity,
not confidentiality.

- membershipd serves the control plane over TLS (ListenAndServeTLS, MinVersion
  1.2) with the same CA-signed cert as the data plane when --tls-cert is set; the
  fail-open guard already requires --bus-auth enforce alongside it.
- The client gets a separate Options.CtrlTLS so the HTTP client pins the bus CA,
  independent of the NATS data-plane TLS. Connect now sets both planes' TLS from
  the one CA and REFUSES a plaintext http:// control-plane URL when a CA is
  provided, so metadata is never sent in the clear when TLS is expected.

Connect's signature is unchanged; callers (worker/chat --ca, mobile NewSession)
must pass an https:// control-plane URL when they pass a CA. Documented for the
deploy step.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:30:15 +02:00
egutierrez 0d56c3c81d Merge issue/0004d-dataplane-acl: data-plane content confidentiality (H4)
Public deployments refuse cleartext rooms, forcing E2E so a data-plane sniffer
gets only ciphertext. Per-subject ACL deferred to 0003 (documented).
2026-06-07 14:26:45 +02:00
egutierrez fb6c796059 test: regression for H4 data-plane content confidentiality
pkg/membership TestRequireEncryptedRoomsRejectsCleartext: cleartext create ->
403, encrypted -> 201, flag off -> cleartext allowed again.

pkg/client TestAudit_NoSubjectACL: under the public posture a ModeNATS room is
refused; bob (member) decrypts the secret; eve raw-subscribes to the subject off
the data plane and receives only ciphertext (non-empty AEAD nonce, no plaintext
substring) — closing the auditor's 'eve reads internal: salary numbers'.
2026-06-07 14:26:45 +02:00
egutierrez e502b16675 feat(membership): forbid cleartext rooms on public deployments (H4 min defense)
Audit H4 (Alto). The embedded NATS has a single account with no per-subject
permissions, so any registered peer can subscribe to any subject — a cleartext
(ModeNATS) room's payload is readable by anyone who knows the subject.

A complete per-subject ACL derived from membership does not fit here: NATS
evaluates a connection's permissions once at connect time and never re-evaluates
them, but unibus clients connect-then-create/join-then-publish on one connection
(TestSecureBusEndToEnd). Static permissions would forbid the owner from
publishing to a room it just created; the dynamic reconnection model belongs to
the 0003 decentralization redesign. See dev/0004d-dataplane-acl.md.

Minimum defense implemented: Server.RequireEncryptedRooms (set by membershipd on
any non-loopback bind) refuses to create cleartext rooms, so every room on a
public deployment is end-to-end encrypted. Message CONTENT stays confidential
even with no subject isolation; residual traffic-metadata exposure is documented
and tracked for 0003.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:26:45 +02:00
egutierrez 47ff74d837 Merge issue/0004c-membership-authz: membership-based authorization (H3)
Room metadata, member lists, room directories and sealed keys are now served
only to members of the room (and a sealed key only to its own endpoint),
closing the horizontal metadata leak.
2026-06-07 14:21:55 +02:00
egutierrez b81e5f26f1 feat(membership): authorize room reads by membership, not registration
Audit H3 (Alto). 'Authorized' meant 'registered in the allowlist', not 'member
of the room', so any registered peer could read another room's subject, its
full member list (every member's sign_pub + kex_pub), any endpoint's room
directory, and even another member's sealed key.

The middleware now carries the authenticated signer's endpoint id into the
handler via request context. Room handlers enforce membership:
  - GET /rooms/{id} and /rooms/{id}/members require the signer to be a member;
  - GET /rooms/{id}/key serves the sealed key only to its own endpoint
    (endpoint == signer) and only to a member;
  - GET /members/{endpoint}/rooms is restricted to the signer's own endpoint.

Authorization is skipped only when no authenticated signer is present (AuthOff
dev, or a soft-mode pass-through), preserving legacy/dev behavior. Internal
errors no longer echo store messages to the client on these paths.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:21:55 +02:00
egutierrez d742f91881 Merge issue/0004b-failopen-guard: close the fail-open startup (H2)
A non-loopback bind, or any TLS flag, now requires --bus-auth enforce or the
service refuses to start.
2026-06-07 14:17:37 +02:00
egutierrez 30577145ce feat(membershipd): refuse fail-open startup configs
Audit H2 (Alto). The binary defaulted to --bus-auth off, the NATS nkey
authenticator only turned on under enforce, and TLS was an independent flag.
Booting --bind 0.0.0.0 --tls-cert … without --bus-auth enforce left both
planes open while looking secure.

validateBootConfig is a pure guard, called right after flag parsing, that
log.Fatals on two insecure shapes:
  - a non-loopback --bind without --bus-auth enforce, and
  - --tls-cert/--tls-key without --bus-auth enforce.

An insecure public startup is now impossible (the process exits), so a
fail-open data plane never comes up for an unregistered client to reach.
TestAudit_FailOpenTLSWithoutAuth plus a full policy table cover golden
(public+enforce, dev loopback) and every refused shape.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:17:37 +02:00
egutierrez 01e2ee1aa0 Merge issue/0004a-dos-limit: pre-auth DoS hardening (H1)
Body-size ceilings (MaxBytesReader, per-route + Content-Length pre-reject),
Server.MaxHeaderBytes, and a per-IP token-bucket rate limit shut the critical
pre-auth memory-exhaustion vector. Regression test asserts a bounded RSS.
2026-06-07 14:16:13 +02:00
egutierrez e7bdcc978c test(membership): regression for H1 pre-auth DoS body limit
Ports the auditor's TestAudit_DoSBodyLimitNoAuth: an unsigned oversized POST
to /blobs is now rejected 413 without the resident set spiking (measured via
/proc/self/status, delta bounded to <96 MiB vs the attack's 400 MB+). Covers
both a truthful over-ceiling Content-Length (rejected pre-read) and a chunked
unknown-length sender (MaxBytesReader caps the read). Plus golden (normal blob
stored), boundary (exactly at the ceiling accepted), the 1 MiB control-plane
ceiling, and the per-IP rate limit (flood -> 429, distinct IPs not throttled).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:16:04 +02:00
egutierrez 60d6a86655 feat(membership): bound request bodies and add per-IP rate limit
Pre-auth DoS hardening (audit H1, Critical). The control-plane middleware
read the request body with io.ReadAll before authenticating and with no size
cap, so an unauthenticated peer could force the server to buffer an arbitrary
body in RAM (the auditor sent 400 MB and watched RSS climb to ~898 MB).

- ServeHTTP now caps the buffered body before reading: a per-route ceiling
  (1 MiB JSON, 16 MiB /blobs) rejects an over-declared Content-Length outright
  and wraps the body in http.MaxBytesReader so a lying/chunked sender trips at
  the ceiling instead of unbounded.
- handlePutBlob maps the MaxBytesReader cutoff to 413 in every auth mode.
- Per-IP token-bucket rate limiter (golang.org/x/time/rate, already in the
  module graph) sheds floods before auth or body reads. Loopback dev stacks are
  unaffected (burst >> any single client's rate). Kept in-package as transport
  glue, not promoted to the registry, mirroring the nonceCache decision in 0003.
- membershipd sets http.Server.MaxHeaderBytes and ReadHeaderTimeout.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:16:04 +02:00
21 changed files with 1401 additions and 63 deletions
+14 -1
View File
@@ -2,7 +2,7 @@
name: unibus
lang: go
domain: infra
version: 0.4.0
version: 0.5.0
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
tags: [service, messaging, nats, e2e]
uses_functions:
@@ -154,6 +154,19 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
## Capability growth log
- v0.5.0 (2026-06-07) — hardening de seguridad (issue 0004) que cierra los
hallazgos de la auditoría red-team (report 0004) y lleva el veredicto de
exposición pública de "NO" a "sí-con-condiciones". Anti-DoS pre-auth
(`http.MaxBytesReader` por ruta + rechazo por `Content-Length` + rate-limit
por IP + `MaxHeaderBytes`); guard de fail-open que prohíbe arrancar con bind
público o TLS sin `--bus-auth enforce`; autorización por pertenencia en los GET
de room (metadata y clave sellada solo para miembros / el propio endpoint);
rooms cleartext deshabilitadas en bind público (contenido siempre E2E, mínimo
defensivo del data plane mientras la ACL por subject llega con 0003); TLS en el
control plane HTTP con la CA propia y cliente que exige `https` cuando hay CA;
y los medios H6/H7/H12 (owner ligado al firmante, `IsAuthorized` antes del
nonce-cache con poda O(expired) + cap, errores genéricos al cliente). Cada
hallazgo lleva su test adversarial `TestAudit_*` portado como regresión.
- v0.4.0 (2026-06-07) — descubrimiento de rooms: `GET /members/{endpoint}/rooms`
lista las rooms de un endpoint con su metadata y rol, y `client.ListMyRooms()`
lo consume. El control plane es pull (no hay push de invitaciones), así que un
+50
View File
@@ -0,0 +1,50 @@
package main
import (
"fmt"
"net"
"github.com/enmanuel/unibus/pkg/membership"
)
// isLoopbackBind reports whether the --bind value keeps the service reachable
// only from this host. An empty bind means "all interfaces" (public), and a
// hostname we cannot resolve to a loopback literal is treated as public — the
// conservative choice, so an unusual bind never silently slips past the guard.
func isLoopbackBind(bind string) bool {
switch bind {
case "localhost":
return true
case "":
return false // empty binds every interface
}
ip := net.ParseIP(bind)
if ip == nil {
return false // a hostname we can't classify: assume public
}
return ip.IsLoopback()
}
// validateBootConfig is the fail-open guard (audit H2). It refuses any startup
// configuration that would expose the bus without enforced authentication:
//
// - a non-loopback --bind without --bus-auth enforce (the data plane and
// control plane would both accept anyone), and
// - --tls-cert/--tls-key without --bus-auth enforce (TLS encrypts the channel
// but authenticates no one — encrypted access for everybody is still open).
//
// It is a pure function of the parsed flags so the command can fail fast at
// startup and tests can assert the policy without booting a server.
func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey string) error {
if !isLoopbackBind(bind) && mode != membership.AuthEnforce {
return fmt.Errorf(
"refusing to start: --bind %q is not loopback but --bus-auth is %q; a public bind requires --bus-auth enforce (or bind 127.0.0.1 for local dev)",
bind, mode)
}
if (tlsCert != "" || tlsKey != "") && mode != membership.AuthEnforce {
return fmt.Errorf(
"refusing to start: --tls-cert/--tls-key set but --bus-auth is %q; TLS without enforced auth is fail-open (encrypted channel, no authentication) — set --bus-auth enforce",
mode)
}
return nil
}
+72
View File
@@ -0,0 +1,72 @@
package main
import (
"strings"
"testing"
"github.com/enmanuel/unibus/pkg/membership"
)
// TestAudit_FailOpenTLSWithoutAuth ports the auditor's H2 vector. Before the
// guard, booting with TLS on but the authenticator off ("--bind 0.0.0.0
// --tls-cert … " without enforce) produced an encrypted data plane that an
// unregistered, nkey-less client could still connect to — a fail-open config
// wearing the appearance of security. validateBootConfig now refuses it, so the
// insecure server never starts (the client therefore has nothing to connect to).
func TestAudit_FailOpenTLSWithoutAuth(t *testing.T) {
// The exact auditor configuration: public bind, TLS provided, auth off.
err := validateBootConfig("0.0.0.0", membership.AuthOff, "server.crt", "server.key")
if err == nil {
t.Fatalf("TLS without enforce on a public bind must be refused at startup")
}
if !strings.Contains(err.Error(), "enforce") {
t.Fatalf("error should point the operator at --bus-auth enforce, got: %v", err)
}
// And TLS without enforce is rejected even on loopback: TLS implies a
// security posture, so authenticating no one is always a misconfiguration.
if err := validateBootConfig("127.0.0.1", membership.AuthOff, "server.crt", "server.key"); err == nil {
t.Fatalf("TLS flags without enforce must be refused regardless of bind")
}
}
// TestBootConfigPolicy is the full table: the golden secure-public config is
// allowed, dev loopback is allowed, and every fail-open shape is refused.
func TestBootConfigPolicy(t *testing.T) {
cases := []struct {
name string
bind string
mode membership.AuthMode
cert string
key string
wantErr bool
}{
// Golden: the intended public production config.
{"public+enforce+tls", "0.0.0.0", membership.AuthEnforce, "s.crt", "s.key", false},
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", false},
// Edge: local dev on loopback may stay open (no auth, no TLS).
{"loopback+off", "127.0.0.1", membership.AuthOff, "", "", false},
{"loopback-ipv6+off", "::1", membership.AuthOff, "", "", false},
{"localhost+off", "localhost", membership.AuthOff, "", "", false},
{"loopback+soft", "127.0.0.1", membership.AuthSoft, "", "", false},
// Error: public bind without enforce.
{"public+off", "0.0.0.0", membership.AuthOff, "", "", true},
{"public+soft", "0.0.0.0", membership.AuthSoft, "", "", true},
{"lan-ip+off", "192.168.1.10", membership.AuthOff, "", "", true},
{"empty-bind+off", "", membership.AuthOff, "", "", true},
// Error: TLS flags without enforce (cert or key alone is enough to trip it).
{"loopback+tlscert+off", "127.0.0.1", membership.AuthOff, "s.crt", "", true},
{"loopback+tlskey+soft", "127.0.0.1", membership.AuthSoft, "", "s.key", true},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
err := validateBootConfig(c.bind, c.mode, c.cert, c.key)
if c.wantErr && err == nil {
t.Fatalf("config %+v should be refused", c)
}
if !c.wantErr && err != nil {
t.Fatalf("config %+v should be allowed, got: %v", c, err)
}
})
}
}
+43 -5
View File
@@ -6,6 +6,7 @@ package main
import (
"context"
"crypto/tls"
"flag"
"log"
"net/http"
@@ -52,6 +53,13 @@ func main() {
log.Fatalf("%v", err)
}
// Fail-open guard (audit H2): a non-loopback bind, or any TLS flag, demands
// --bus-auth enforce. This makes an insecure public startup impossible rather
// than silently exposing the bus with the appearance of security.
if err := validateBootConfig(*bind, authMode, *tlsCert, *tlsKey); err != nil {
log.Fatalf("%v", err)
}
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[membershipd] ")
@@ -111,15 +119,45 @@ func main() {
}
srv := membership.NewServer(store, blobs, authMode)
// On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS
// has no per-subject ACL, so cleartext content would be readable by any
// registered peer. Forcing E2E keeps message content confidential regardless
// (audit H4 minimum defense; see dev/0004d-dataplane-acl.md).
if !isLoopbackBind(*bind) {
srv.RequireEncryptedRooms = true
log.Printf("cleartext rooms: DISABLED (public bind requires end-to-end encryption)")
}
log.Printf("control-plane auth: %s", authMode)
addr := *bind + ":" + *httpPort
httpSrv := &http.Server{Addr: addr, Handler: srv}
httpSrv := &http.Server{
Addr: addr,
Handler: srv,
// Bound request header size so a peer cannot exhaust memory with huge
// headers before any body limit applies (the body ceilings live in the
// membership middleware).
MaxHeaderBytes: membership.MaxHeaderBytes,
ReadHeaderTimeout: 10 * time.Second,
}
go func() {
log.Printf("HTTP control-plane API: http://%s", addr)
log.Printf(" health: http://%s/healthz", addr)
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("http server: %v", err)
var serveErr error
if *tlsCert != "" {
// Serve the control plane over TLS with the same CA-signed cert as the
// data plane (audit H5): metadata (subjects, pubkeys, sealed keys, the
// social graph) is no longer readable by a network MITM. The fail-open
// guard already requires --bus-auth enforce alongside these flags.
httpSrv.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
log.Printf("HTTPS control-plane API: https://%s", addr)
log.Printf(" health: https://%s/healthz", addr)
log.Printf("control-plane TLS: ON (%s)", *tlsCert)
serveErr = httpSrv.ListenAndServeTLS(*tlsCert, *tlsKey)
} else {
log.Printf("HTTP control-plane API: http://%s", addr)
log.Printf(" health: http://%s/healthz", addr)
serveErr = httpSrv.ListenAndServe()
}
if serveErr != nil && serveErr != http.ErrServerClosed {
log.Fatalf("http server: %v", serveErr)
}
}()
+80
View File
@@ -0,0 +1,80 @@
# 0004d — Data-plane access control on NATS (audit H4)
## The finding
The NATS authenticator (`pkg/busauth`) decides one thing per connection:
*is this identity registered on the bus?* It does **not** scope what a connected
client may subscribe to or publish. There is a single NATS account with no
`Permissions`, so any registered peer can subscribe to, or publish on, **any**
subject. Concretely:
- A cleartext room (`ModeNATS`) carries its payload in the clear on its subject.
A registered peer that knows or guesses the subject subscribes and reads the
content directly (the auditor's `TestAudit_NoSubjectACL`: eve, never invited,
receives `"internal: salary numbers"`).
- An encrypted room (`ModeMatrix`) keeps its **content** confidential (the
payload is AEAD ciphertext), but the **metadata of traffic** — that a subject
is active, message sizes and timing, who is publishing — is still observable by
any registered peer that subscribes to the subject.
## Why the "complete" fix does not fit here
The preferred fix is per-subject permissions derived from room membership: when a
client connects, the authenticator looks up the rooms it belongs to and grants
`Sub`/`Pub` only on those subjects. NATS supports this — `CustomClientAuthentication`
can register a `*server.User` carrying `Permissions`.
The blocker is that **NATS evaluates permissions once, at connect time, and never
re-evaluates them on a live connection.** unibus clients routinely *connect → create
or get invited to a room → publish/subscribe* within the **same** connection
(`TestSecureBusEndToEnd` does exactly this: A connects, then creates `room.secure`,
then publishes to it). Permissions frozen at connect time would not include a room
created or joined afterwards, so the legitimate owner could not publish to the room
it just made. Making per-subject ACLs work would therefore require the client to
**reconnect on every membership change**, an invasive change to the client library
and to every peer (worker, chat, mobile) — and the prompt for this issue scopes the
client changes to the minimum.
That dynamic-membership reconnection model is precisely the redesign that issue
**0003** (decentralization) already has to do: it moves the control-plane state to a
replicated JetStream KV and reworks how nodes and clients (re)establish sessions. Per
the issue's own guidance ("if a complete strategy does not fit, implement the minimum
defense and document the rest"), the full subject ACL is deferred to 0003, where the
session/permission model is being rebuilt anyway.
## The strategy implemented here: forbid cleartext rooms in public
`Server.RequireEncryptedRooms` (set by `membershipd` on any non-loopback bind)
refuses to create a cleartext (`ModeNATS`) room. Every room on a public deployment
is therefore end-to-end encrypted, so **message content stays confidential even
though the transport offers no subject isolation**: a peer that sniffs another
room's subject receives only AEAD ciphertext it has no key for.
This composes with the 0004c control-plane authorization: a non-member cannot even
learn a room's subject through the control plane (`GET /rooms/{id}` → 403), so to
sniff it an attacker must already know or guess the subject out of band.
## What this does NOT close (residual exposure, by design)
- **Traffic metadata.** A registered peer that already knows a subject can still
subscribe and observe that the subject is active, the ciphertext sizes, and the
timing/cadence of messages. It cannot read content.
- **Cross-room publish.** A registered peer can still *publish* arbitrary bytes on
any subject. In an encrypted room those bytes fail AEAD open and the signature
check (`SignMsgs`), so receivers drop them — it is a nuisance/spam vector, not a
confidentiality or integrity break.
- **WireGuard-only deployments** may still use cleartext rooms (the guard only trips
on a public bind), because the network already restricts who can reach the bus.
Closing the residual metadata exposure requires the per-subject ACL described above,
tracked for issue 0003.
## Regression evidence
- `pkg/membership``TestRequireEncryptedRoomsRejectsCleartext`: with
`RequireEncryptedRooms` on, `POST /rooms` for a cleartext policy returns 403 while
an encrypted-room create returns 201.
- `pkg/client``TestAudit_NoSubjectACL`: under the public posture, creating a
`ModeNATS` room fails; alice creates an encrypted room and publishes; eve (a
registered non-member) raw-subscribes to the subject and receives only ciphertext —
she never recovers the plaintext.
+3 -1
View File
@@ -1,8 +1,10 @@
---
issue: 0004
title: Hardening de seguridad — autorización, anti-DoS y confidencialidad antes de exponer público
status: spec
status: done
created: 2026-06-07
completed: 2026-06-07
report: projects/message_bus/reports/0005-2026-06-07-unibus-security-hardening.md
domain: security
scope: unibus (pkg/membership/server.go, auth.go, pkg/embeddednats, pkg/client, cmd/membershipd, deploy/tls)
depends_on: 0001 (cierra los gaps que la auditoría 0004 encontró sobre lo entregado en 0001)
+2 -2
View File
@@ -8,7 +8,9 @@ require (
fn-registry v0.0.0-00010101000000-000000000000
github.com/nats-io/nats-server/v2 v2.10.22
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7
github.com/oklog/ulid/v2 v2.1.0
golang.org/x/time v0.7.0
modernc.org/sqlite v1.47.0
)
@@ -19,7 +21,6 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/nats-io/jwt/v2 v2.5.8 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
@@ -29,7 +30,6 @@ require (
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.37.0 // indirect
golang.org/x/time v0.7.0 // indirect
golang.org/x/tools v0.45.0 // indirect
modernc.org/libc v1.70.0 // indirect
modernc.org/mathutil v1.7.1 // indirect
+6
View File
@@ -1,5 +1,7 @@
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
@@ -47,6 +49,10 @@ golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM=
golang.org/x/tools/go/expect v0.1.1-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY=
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated h1:1h2MnaIAIXISqTFKdENegdpAgUXz6NrPEsbIeWaBRvM=
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated/go.mod h1:RVAQXBGNv1ib0J382/DPCRS/BPnsGebyM1Gj5VSDpG8=
modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis=
modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw=
+26 -5
View File
@@ -24,6 +24,7 @@ import (
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"
@@ -67,10 +68,15 @@ type Options struct {
// with an nkey to a server that does not advertise nkey auth ("nkeys not
// supported by the server"), so this is opt-in rather than always-on.
UseNkey bool
// TLS, when non-nil, secures the NATS connection and pins the server to this
// config's RootCAs (the bus's self-signed CA). Build it with
// busauth.LoadCATLSConfig(caPath). Nil keeps the connection plaintext.
// TLS, when non-nil, secures the NATS (data plane) connection and pins the
// server to this config's RootCAs (the bus's self-signed CA). Build it with
// busauth.LoadCATLSConfig(caPath). Nil keeps the data plane plaintext.
TLS *tls.Config
// CtrlTLS, when non-nil, secures the HTTP control-plane connection and pins it
// to this config's RootCAs. It is separate from TLS so the two planes can be
// secured independently (a test may TLS one and not the other); production
// sets both to the same CA via Connect. Nil keeps the control plane plaintext.
CtrlTLS *tls.Config
}
// New connects to NATS and records the control-plane URL with default Options
@@ -90,11 +96,19 @@ func Connect(natsURL, ctrlURL string, id cs.Identity, caPath string) (*Client, e
if caPath == "" {
return New(natsURL, ctrlURL, id)
}
// A CA implies the bus is TLS on BOTH planes. Refuse a plaintext control-plane
// URL: signing gives integrity, not confidentiality, so sending metadata over
// http:// when the operator provisioned a CA would silently leak it to a MITM
// (audit H5). Force https rather than silently downgrade.
if !strings.HasPrefix(ctrlURL, "https://") {
return nil, fmt.Errorf("client: control-plane URL %q must be https:// when a CA is provided", ctrlURL)
}
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
if err != nil {
return nil, fmt.Errorf("client: load CA %q: %w", caPath, err)
}
return NewWithOptions(natsURL, ctrlURL, id, Options{UseNkey: true, TLS: tlsCfg})
// Pin the same CA on both planes: nkey+TLS on NATS, TLS on the HTTP control plane.
return NewWithOptions(natsURL, ctrlURL, id, Options{UseNkey: true, TLS: tlsCfg, CtrlTLS: tlsCfg})
}
// NewWithOptions is New with explicit connection options (nkey auth, and, from
@@ -125,13 +139,20 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli
nc.Close()
return nil, fmt.Errorf("client: init jetstream: %w", err)
}
// The control-plane HTTP client pins the bus CA when CtrlTLS is set, so an
// https:// control plane is verified against the bus's own CA rather than the
// system roots (audit H5). Without it the client stays plaintext for dev.
httpClient := &http.Client{Timeout: 10 * time.Second}
if opts.CtrlTLS != nil {
httpClient.Transport = &http.Transport{TLSClientConfig: opts.CtrlTLS.Clone()}
}
return &Client{
id: id,
endpoint: frame.EndpointID(id.SignPub),
nc: nc,
js: js,
ctrlURL: ctrlURL,
http: &http.Client{Timeout: 10 * time.Second},
http: httpClient,
keyCache: map[string]map[int][]byte{},
signCache: map[string][]byte{},
}, nil
+2 -1
View File
@@ -32,6 +32,7 @@ type testHarness struct {
ns *server.Server
httpts *httptest.Server
store *membership.Store
srv *membership.Server
}
func freePort(t *testing.T) int {
@@ -98,7 +99,7 @@ func bootHarness(t *testing.T, ctrlMode membership.AuthMode, natsAuth bool, nats
srv := membership.NewServer(store, blobs, ctrlMode)
httpts := httptest.NewServer(srv)
h := &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL, ns: ns, httpts: httpts, store: store}
h := &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL, ns: ns, httpts: httpts, store: store, srv: srv}
t.Cleanup(func() {
httpts.Close()
store.Close()
+87
View File
@@ -0,0 +1,87 @@
package client_test
import (
"crypto/tls"
"crypto/x509"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/enmanuel/unibus/pkg/room"
)
// TestConnectRequiresHTTPSWithCA covers audit H5's client contract: when a CA is
// provided the control-plane URL must be https://. A signed request gives
// integrity but not confidentiality, so silently talking http:// to a bus the
// operator secured with a CA would leak all metadata to a MITM. Connect refuses
// the plaintext URL outright (error path; the scheme is checked before any
// network use, so a bogus CA path is irrelevant).
func TestConnectRequiresHTTPSWithCA(t *testing.T) {
_, err := client.Connect("nats://127.0.0.1:4222", "http://127.0.0.1:8470", mustIdentity(t), "/nonexistent/ca.crt")
if err == nil {
t.Fatalf("Connect with a CA and an http:// control plane must be refused")
}
if !strings.Contains(err.Error(), "https") {
t.Fatalf("error should point the caller at https, got: %v", err)
}
}
// TestControlPlaneOverTLS proves the control plane works over TLS pinned to the
// bus CA (golden) and that a client lacking the CA cannot complete the handshake
// (error path) — so a network observer can neither read nor inject control-plane
// traffic. The data plane is left plaintext here to isolate the HTTP-TLS wiring.
func TestControlPlaneOverTLS(t *testing.T) {
dir := t.TempDir()
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("store: %v", err)
}
t.Cleanup(func() { store.Close() })
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
if err != nil {
t.Fatalf("blobs: %v", err)
}
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: filepath.Join(dir, "js"), Host: "127.0.0.1", Port: freePort(t),
})
if err != nil {
t.Fatalf("nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
natsURL := embeddednats.ClientURL(ns)
// An https control plane wrapping the real membership server.
ts := httptest.NewTLSServer(membership.NewServer(store, blobs, membership.AuthOff))
t.Cleanup(ts.Close)
pool := x509.NewCertPool()
pool.AddCert(ts.Certificate())
// Golden: trusting the control-plane CA, an https control-plane request works.
good, err := client.NewWithOptions(natsURL, ts.URL, mustIdentity(t),
client.Options{CtrlTLS: &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12}})
if err != nil {
t.Fatalf("connect with the pinned CA: %v", err)
}
defer good.Close()
if _, err := good.CreateRoom("room.tls.ctrl", room.ModeNATS); err != nil {
t.Fatalf("control plane over TLS should succeed with the pinned CA: %v", err)
}
// Error path: without the CA the https handshake fails, so the request errors.
bad, err := client.NewWithOptions(natsURL, ts.URL, mustIdentity(t),
client.Options{CtrlTLS: &tls.Config{RootCAs: x509.NewCertPool(), MinVersion: tls.VersionTLS12}})
if err != nil {
t.Fatalf("nats connect (bad CA case): %v", err)
}
defer bad.Close()
if _, err := bad.CreateRoom("room.tls.fail", room.ModeNATS); err == nil {
t.Fatalf("a control-plane request without the CA must fail the TLS handshake")
}
}
+124
View File
@@ -0,0 +1,124 @@
package client_test
import (
"bytes"
"sync"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/room"
"github.com/nats-io/nats.go"
)
// TestAudit_NoSubjectACL ports the auditor's H4 (Alto) finding under the minimum
// defense chosen for this issue (forbid cleartext rooms in public; see
// dev/0004d-dataplane-acl.md). The NATS data plane still has no per-subject ACL,
// so the guarantee we make is CONTENT confidentiality, proven three ways:
//
// error : a cleartext (ModeNATS) room cannot be created under the public posture;
// golden: a legitimate member (bob) decrypts the secret;
// edge : eve, sniffing the raw subject off the data plane, receives only
// ciphertext — she never recovers the plaintext the auditor's eve did.
func TestAudit_NoSubjectACL(t *testing.T) {
h := newHarness(t)
h.srv.RequireEncryptedRooms = true // the public posture
waitHealth(t, h.ctrlURL)
alice, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect alice: %v", err)
}
defer alice.Close()
// Error path: a cleartext room is refused, so no payload ever rides a subject
// in the clear for a sniffer to read (the exact vector the auditor exploited).
if _, err := alice.CreateRoom("secret.subject.payroll", room.ModeNATS); err == nil {
t.Fatalf("cleartext room must be refused on a public deployment")
}
// alice creates an encrypted room and invites bob (the legitimate reader).
const subject = "secret.subject.payroll.e2e"
const secret = "internal: salary numbers"
roomID, err := alice.CreateRoom(subject, room.ModeMatrix)
if err != nil {
t.Fatalf("alice create encrypted room: %v", err)
}
bob, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect bob: %v", err)
}
defer bob.Close()
if err := alice.Invite(roomID, bob.Endpoint()); err != nil {
t.Fatalf("alice invite bob: %v", err)
}
if err := bob.Join(roomID); err != nil {
t.Fatalf("bob join: %v", err)
}
// Golden: bob (a member) subscribes and decrypts the secret.
var bmu sync.Mutex
var bobGot []string
bobSub, err := bob.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
bmu.Lock()
bobGot = append(bobGot, string(plaintext))
bmu.Unlock()
})
if err != nil {
t.Fatalf("bob subscribe: %v", err)
}
defer bobSub.Unsubscribe()
// Edge: eve sniffs the raw subject directly off NATS (no membership, no key).
rawEve, err := nats.Connect(h.natsURL)
if err != nil {
t.Fatalf("eve raw connect: %v", err)
}
defer rawEve.Close()
eveGot := make(chan []byte, 8)
if _, err := rawEve.Subscribe(subject, func(m *nats.Msg) { eveGot <- m.Data }); err != nil {
t.Fatalf("eve raw subscribe: %v", err)
}
if err := rawEve.Flush(); err != nil {
t.Fatalf("eve flush: %v", err)
}
time.Sleep(200 * time.Millisecond) // let both subscriptions settle
if err := alice.Publish(roomID, []byte(secret)); err != nil {
t.Fatalf("alice publish: %v", err)
}
// bob must decrypt the secret.
if !waitFor(&bmu, &bobGot, func(rs []string) bool {
for _, r := range rs {
if r == secret {
return true
}
}
return false
}, 2*time.Second) {
t.Fatalf("bob (member) should decrypt the secret; got %v", snapshot(&bmu, &bobGot))
}
// eve must receive only ciphertext — never the plaintext.
select {
case data := <-eveGot:
if bytes.Contains(data, []byte(secret)) {
t.Fatalf("eve sniffed the plaintext off the data plane: %q", data)
}
f, err := frame.Unmarshal(data)
if err != nil {
t.Fatalf("eve received an undecodable frame: %v", err)
}
if string(f.Payload) == secret {
t.Fatalf("eve read the secret from the frame payload")
}
if len(f.Nonce) == 0 {
t.Fatalf("expected an AEAD-encrypted payload (non-empty nonce), got cleartext frame")
}
case <-time.After(2 * time.Second):
// eve receiving nothing is also a safe outcome; the assertion is only that
// she never gets the plaintext, which holds vacuously here.
}
}
+68 -23
View File
@@ -11,6 +11,8 @@ import (
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/frame"
)
// AuthMode is the control-plane authentication rollout state (feature flag
@@ -73,6 +75,11 @@ const (
const (
clockSkew = 30 * time.Second
nonceTTL = 60 * time.Second
// maxNonceCacheEntries bounds the replay cache so it cannot grow without limit
// (audit H7). With IsAuthorized now gating insertion, only authorized traffic
// is cached, so this ceiling is only approached under a legitimate burst; at
// the cap the oldest nonce is evicted (its TTL is nearly up anyway).
maxNonceCacheEntries = 100_000
)
// CanonicalRequest returns the exact bytes that are signed and verified for a
@@ -89,43 +96,75 @@ func CanonicalRequest(method, path, ts, nonce string, body []byte) []byte {
}
// nonceCache remembers recently-seen nonces to reject replays. It is an
// in-memory map guarded by a mutex with lazy expiry — sufficient for a single
// membershipd process (the spec's chosen tradeoff over a server-issued nonce
// round-trip). A distributed deployment would need a shared store.
// in-memory store guarded by a mutex — sufficient for a single membershipd
// process (the spec's chosen tradeoff over a server-issued nonce round-trip). A
// distributed deployment would need a shared store (tracked for issue 0003).
//
// Pruning is O(expired), not O(n): because the TTL is constant, insertion order
// equals expiry order, so the oldest entries (front of `order`) are exactly the
// ones that expire first (audit H7 — the previous full-map scan under the mutex
// was a CPU-amplification vector). A size cap bounds memory.
type nonceCache struct {
mu sync.Mutex
seen map[string]time.Time
ttl time.Duration
mu sync.Mutex
seen map[string]time.Time // nonce -> expiry
order []string // nonces in insertion order == expiry order
ttl time.Duration
cap int
}
func newNonceCache(ttl time.Duration) *nonceCache {
return &nonceCache{seen: make(map[string]time.Time), ttl: ttl}
func newNonceCache(ttl time.Duration, capacity int) *nonceCache {
return &nonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity}
}
// rememberOrReject records nonce and returns true if it was unseen, or false if
// it is a replay (still live in the cache). Expired entries are pruned lazily on
// each call so the map cannot grow without bound under steady traffic.
// it is a replay (still live in the cache).
func (n *nonceCache) rememberOrReject(nonce string, now time.Time) bool {
n.mu.Lock()
defer n.mu.Unlock()
for k, exp := range n.seen {
if exp.Before(now) {
delete(n.seen, k)
// Prune expired entries from the front (oldest first). The first live entry
// ends the scan — everything behind it was inserted later and is newer.
cut := 0
for cut < len(n.order) {
exp, ok := n.seen[n.order[cut]]
if !ok {
cut++ // already evicted by the cap path below
continue
}
if !exp.Before(now) {
break
}
delete(n.seen, n.order[cut])
cut++
}
if cut > 0 {
n.order = append(n.order[:0], n.order[cut:]...)
}
if exp, ok := n.seen[nonce]; ok && !exp.Before(now) {
return false
return false // a live replay
}
// Bound memory: at capacity, evict the oldest entry (its TTL is nearly up).
for len(n.seen) >= n.cap && len(n.order) > 0 {
oldest := n.order[0]
n.order = n.order[1:]
delete(n.seen, oldest)
}
n.seen[nonce] = now.Add(n.ttl)
n.order = append(n.order, nonce)
return true
}
// authResult is what a successful authentication yields: the verified signing
// key (hex) and the authorized user record. Handlers may use it for fine-grained
// authorization (e.g. role checks) in later phases.
// key (hex), the endpoint id derived from it, and the authorized user record.
// Handlers use endpoint for membership authorization (only a member of a room
// may read its metadata/keys); user is available for role checks.
type authResult struct {
pubHex string
user User
pubHex string
endpoint string
user User
}
// authenticate verifies the signature headers on r against body and the user
@@ -168,10 +207,9 @@ func (s *Server) authenticate(r *http.Request, body []byte, now time.Time) (auth
return authResult{}, fmt.Errorf("invalid signature")
}
if !s.nonces.rememberOrReject(nonce, now) {
return authResult{}, fmt.Errorf("replayed nonce")
}
// Authorize BEFORE touching the replay cache (audit H7): an unregistered
// identity can mint valid signatures for free, so caching its nonces would let
// it poison/grow the cache pre-auth. Only authorized identities are remembered.
if !s.store.IsAuthorized(pubHex) {
return authResult{}, fmt.Errorf("identity not authorized")
}
@@ -181,5 +219,12 @@ func (s *Server) authenticate(r *http.Request, body []byte, now time.Time) (auth
// IsAuthorized passed but the row vanished (race with revoke): fail closed.
return authResult{}, fmt.Errorf("identity not authorized")
}
return authResult{pubHex: pubHex, user: user}, nil
// Anti-replay last: a replayed request from an authorized identity is still
// rejected here (the nonce is already live in the cache from its first use).
if !s.nonces.rememberOrReject(nonce, now) {
return authResult{}, fmt.Errorf("replayed nonce")
}
return authResult{pubHex: pubHex, endpoint: frame.EndpointID(pub), user: user}, nil
}
+16 -4
View File
@@ -15,6 +15,7 @@ import (
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/frame"
)
// authHarness boots an in-process membershipd HTTP server in the given auth mode
@@ -88,13 +89,24 @@ func do(t *testing.T, req *http.Request) (int, string) {
return resp.StatusCode, string(b)
}
const okPath = "/members/alice-endpoint/rooms" // always 200 with an empty list
// okPath is a path that authenticates and returns 200 with an empty list when
// the request carries NO membership-bound signer (AuthOff/soft/missing-headers
// tests). Under enforce, the per-endpoint room directory is now restricted to
// the signer's own endpoint (audit H3), so tests that sign as alice use
// aliceRoomsPath instead.
const okPath = "/members/alice-endpoint/rooms"
// aliceRoomsPath is alice's own room directory — the canonical "authenticated
// and authorized" 200 path under enforce after H3.
func aliceRoomsPath(h *authHarness) string {
return "/members/" + frame.EndpointID(h.alice.SignPub) + "/rooms"
}
// Golden: a request signed by a registered, active identity is accepted.
func TestAuthGoldenAccepted(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
now := time.Now().Unix()
code, _ := do(t, signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-golden"))
code, _ := do(t, signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "nonce-golden"))
if code != http.StatusOK {
t.Fatalf("golden signed request should be 200, got %d", code)
}
@@ -116,12 +128,12 @@ func TestAuthUnregisteredRejected(t *testing.T) {
func TestAuthReplayRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
now := time.Now().Unix()
first := signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-replay")
first := signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "nonce-replay")
if code, body := do(t, first); code != http.StatusOK {
t.Fatalf("first request should be 200, got %d (%s)", code, body)
}
// Identical ts + nonce + signature: a replay.
second := signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-replay")
second := signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "nonce-replay")
if code, body := do(t, second); code != http.StatusUnauthorized {
t.Fatalf("replayed request should be 401, got %d (%s)", code, body)
}
+119
View File
@@ -0,0 +1,119 @@
package membership
import (
"encoding/hex"
"net/http"
"strconv"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/frame"
)
// seedRoom inserts an encrypted room owned by alice with a sealed key for her,
// directly through the store so the test controls membership precisely. It
// returns the room id and alice's endpoint.
func seedRoom(t *testing.T, h *authHarness, subject string) (string, string) {
t.Helper()
aliceEp := frame.EndpointID(h.alice.SignPub)
roomID := newULID()
info := RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: aliceEp, Encrypt: true}
if err := h.store.CreateRoom(info, h.alice.SignPub, h.alice.KexPub, []byte("alice-sealed-key")); err != nil {
t.Fatalf("seed room: %v", err)
}
return roomID, aliceEp
}
// register adds id to the bus allowlist so its signed requests clear auth and
// reach the handler, where membership authorization (not mere registration) is
// what the test exercises.
func register(t *testing.T, h *authHarness, id cs.Identity, handle string) {
t.Helper()
if err := h.store.AddUser(hex.EncodeToString(id.SignPub), handle, RoleMember); err != nil {
t.Fatalf("register %s: %v", handle, err)
}
}
// TestAudit_HorizontalMetadataLeak ports the auditor's H3 (Alto) finding: bob is
// REGISTERED on the bus but is NOT a member of alice's room. Before the fix the
// GET endpoints checked registration, not membership, so bob could read the
// room's subject, the full member list (with everyone's public keys), alice's
// room directory, and even alice's sealed key. Now every one of those returns
// 403 to bob, while alice (owner/member) and carol (plain member) get 200.
func TestAudit_HorizontalMetadataLeak(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
roomID, aliceEp := seedRoom(t, h, "secret.subject.payroll")
// bob: registered, never invited.
bob, _ := cs.GenerateIdentity()
register(t, h, bob, "bob")
// carol: registered AND a plain (non-owner) member — the legitimate-member edge.
carol, _ := cs.GenerateIdentity()
register(t, h, carol, "carol")
carolEp := frame.EndpointID(carol.SignPub)
if err := h.store.AddMember(roomID, Member{Endpoint: carolEp, Role: RoleMember, SignPub: carol.SignPub, KexPub: carol.KexPub}, 1, []byte("carol-sealed")); err != nil {
t.Fatalf("add carol: %v", err)
}
n := 0
get := func(id cs.Identity, path string) int {
n++
code, _ := do(t, signedReq(t, h.ts.URL, "GET", path, nil, id, time.Now().Unix(), nonceN(n)))
return code
}
// Error path: bob (non-member) is forbidden on every room endpoint.
bobChecks := []struct {
name string
path string
}{
{"get room", "/rooms/" + roomID},
{"list members", "/rooms/" + roomID + "/members"},
{"alice room directory", "/members/" + aliceEp + "/rooms"},
{"alice sealed key", "/rooms/" + roomID + "/key?endpoint=" + aliceEp},
{"bob sealed key in alices room", "/rooms/" + roomID + "/key?endpoint=" + frame.EndpointID(bob.SignPub)},
}
for _, c := range bobChecks {
if code := get(bob, c.path); code != http.StatusForbidden {
t.Fatalf("bob (non-member) %s should be 403, got %d", c.name, code)
}
}
// Golden: alice (owner/member) reads her room's metadata, members, directory, key.
aliceChecks := []string{
"/rooms/" + roomID,
"/rooms/" + roomID + "/members",
"/members/" + aliceEp + "/rooms",
"/rooms/" + roomID + "/key?endpoint=" + aliceEp,
}
for _, p := range aliceChecks {
if code := get(h.alice, p); code != http.StatusOK {
t.Fatalf("alice (owner) %s should be 200, got %d", p, code)
}
}
// Edge: carol is a plain member, not the owner — she may still read the room.
if code := get(carol, "/rooms/"+roomID); code != http.StatusOK {
t.Fatalf("carol (member) get room should be 200, got %d", code)
}
if code := get(carol, "/rooms/"+roomID+"/members"); code != http.StatusOK {
t.Fatalf("carol (member) list members should be 200, got %d", code)
}
// Edge: carol may fetch her OWN sealed key but not alice's.
if code := get(carol, "/rooms/"+roomID+"/key?endpoint="+carolEp); code != http.StatusOK {
t.Fatalf("carol fetching her own key should be 200, got %d", code)
}
if code := get(carol, "/rooms/"+roomID+"/key?endpoint="+aliceEp); code != http.StatusForbidden {
t.Fatalf("carol fetching alice's key should be 403, got %d", code)
}
}
// nonceN yields a distinct nonce per request so the anti-replay cache never
// rejects a fresh, legitimately-different request inside one test.
func nonceN(i int) string {
return "authz-nonce-" + strconv.Itoa(i)
}
+206
View File
@@ -0,0 +1,206 @@
package membership
import (
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"testing"
"github.com/enmanuel/unibus/pkg/blobstore"
)
// dosServer builds a Server backed by a fresh store + blob store so a test can
// drive ServeHTTP in-process (white-box) and observe its memory behavior without
// a network round trip — the same in-process technique the auditor used.
func dosServer(t *testing.T, mode AuthMode) *Server {
t.Helper()
dir := t.TempDir()
store, err := Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
if err != nil {
t.Fatalf("open blobs: %v", err)
}
t.Cleanup(func() { store.Close() })
return NewServer(store, blobs, mode)
}
// zeroReader yields up to remaining zero bytes without ever allocating them, so
// the test process itself never materializes a huge buffer (which would taint the
// RSS measurement we are trying to make about the SERVER).
type zeroReader struct{ remaining int64 }
func (z *zeroReader) Read(p []byte) (int, error) {
if z.remaining <= 0 {
return 0, io.EOF
}
n := int64(len(p))
if n > z.remaining {
n = z.remaining
}
for i := int64(0); i < n; i++ {
p[i] = 0
}
z.remaining -= n
return int(n), nil
}
// vmRSSkB reads the resident set size (kB) of this process from /proc. Linux-only;
// the caller skips on other platforms.
func vmRSSkB(t *testing.T) int64 {
t.Helper()
b, err := os.ReadFile("/proc/self/status")
if err != nil {
t.Skipf("cannot read /proc/self/status: %v", err)
}
for _, line := range strings.Split(string(b), "\n") {
if strings.HasPrefix(line, "VmRSS:") {
f := strings.Fields(line)
if len(f) >= 2 {
v, _ := strconv.ParseInt(f[1], 10, 64)
return v
}
}
}
t.Skip("VmRSS not present in /proc/self/status")
return 0
}
// TestAudit_DoSBodyLimitNoAuth ports the auditor's H1 (Critical) vector: a peer
// with NO valid signature posts an oversized body. Before the fix the middleware
// io.ReadAll'd it unbounded (the auditor sent 400 MB and watched RSS jump from
// 18 MB to 898 MB). Now the request is rejected 413 and the resident set does NOT
// spike. Two shapes are covered:
//
// (1) a truthful, over-ceiling Content-Length -> rejected before any byte is read;
// (2) a lying / unknown length (chunked) -> MaxBytesReader trips mid-read,
// capping the buffered bytes at the ceiling instead of the attacker's 400 MB.
func TestAudit_DoSBodyLimitNoAuth(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("RSS probe is Linux-only")
}
srv := dosServer(t, AuthEnforce) // enforce: the request carries no signature
const huge = int64(400) << 20 // 400 MiB — the auditor's figure
// A spike threshold an order of magnitude below the attack. The old code would
// add ~400 MB+; the fix keeps the delta to at most one bounded buffer.
const maxSpikeKB = int64(96) << 10 // 96 MiB
// Shape 1: declared Content-Length over the blob ceiling -> early 413, no read.
runtime.GC()
before := vmRSSkB(t)
req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: huge})
req.ContentLength = huge
rec := httptest.NewRecorder()
srv.ServeHTTP(rec, req)
if rec.Code != http.StatusRequestEntityTooLarge {
t.Fatalf("over-declared body should be 413, got %d", rec.Code)
}
runtime.GC()
if d := vmRSSkB(t) - before; d > maxSpikeKB {
t.Fatalf("RSS spiked %d kB on a pre-declared oversized body (limit %d kB)", d, maxSpikeKB)
}
// Shape 2: unknown length (chunked-style). The middleware cannot reject by
// Content-Length, so MaxBytesReader must cap the read at maxBlobBytes.
runtime.GC()
before = vmRSSkB(t)
req = httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: huge})
req.ContentLength = -1
rec = httptest.NewRecorder()
srv.ServeHTTP(rec, req)
if rec.Code != http.StatusRequestEntityTooLarge {
t.Fatalf("unknown-length oversized body should be 413, got %d", rec.Code)
}
runtime.GC()
if d := vmRSSkB(t) - before; d > maxSpikeKB {
t.Fatalf("RSS spiked %d kB on a chunked oversized body (limit %d kB)", d, maxSpikeKB)
}
}
// TestBlobLimitGoldenAndBoundary covers the golden path (a normal blob is stored)
// and the boundary (a body exactly at the ceiling is accepted; one byte over by
// truthful Content-Length is rejected before buffering).
func TestBlobLimitGoldenAndBoundary(t *testing.T) {
srv := dosServer(t, AuthOff) // AuthOff: the limits apply regardless of auth mode
// Golden: a small blob is accepted and hashed.
rec := httptest.NewRecorder()
srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello blob")))
if rec.Code != http.StatusOK {
t.Fatalf("normal blob should be 200, got %d (%s)", rec.Code, rec.Body.String())
}
// Boundary: exactly at the ceiling is allowed (MaxBytesReader permits N bytes).
atLimit := strings.Repeat("a", maxBlobBytes)
rec = httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader(atLimit))
req.ContentLength = int64(len(atLimit))
srv.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("blob exactly at the ceiling should be 200, got %d", rec.Code)
}
// Error: one byte over the ceiling (truthful Content-Length) -> 413 pre-read.
rec = httptest.NewRecorder()
req = httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: maxBlobBytes + 1})
req.ContentLength = maxBlobBytes + 1
srv.ServeHTTP(rec, req)
if rec.Code != http.StatusRequestEntityTooLarge {
t.Fatalf("blob one byte over the ceiling should be 413, got %d", rec.Code)
}
}
// TestControlBodyLimit checks the smaller JSON ceiling on a non-blob route: a body
// over maxControlBodyBytes is rejected 413 before the handler runs.
func TestControlBodyLimit(t *testing.T) {
srv := dosServer(t, AuthOff)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/rooms", &zeroReader{remaining: maxControlBodyBytes + 1})
req.ContentLength = maxControlBodyBytes + 1
srv.ServeHTTP(rec, req)
if rec.Code != http.StatusRequestEntityTooLarge {
t.Fatalf("control body over 1 MiB should be 413, got %d", rec.Code)
}
}
// TestRateLimitPerIP exercises the per-IP throttle: a burst from one IP eventually
// gets 429 (error path), while a spread across distinct IPs is never throttled
// (edge — the bucket is keyed per source, not global).
func TestRateLimitPerIP(t *testing.T) {
srv := dosServer(t, AuthOff)
// Same IP: well past the burst -> at least one 429.
got429 := false
for i := 0; i < defaultRateBurst+50; i++ {
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/rooms/none", nil)
req.RemoteAddr = "203.0.113.7:5555"
srv.ServeHTTP(rec, req)
if rec.Code == http.StatusTooManyRequests {
got429 = true
break
}
}
if !got429 {
t.Fatalf("a flood from one IP should eventually be rate-limited (429)")
}
// Distinct IPs: each gets a fresh bucket, so none is throttled.
for i := 0; i < 100; i++ {
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/rooms/none", nil)
req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":4444"
srv.ServeHTTP(rec, req)
if rec.Code == http.StatusTooManyRequests {
t.Fatalf("distinct IPs must not share a rate bucket; IP #%d got 429", i)
}
}
}
+51
View File
@@ -0,0 +1,51 @@
package membership
import (
"strconv"
"testing"
"time"
)
// TestNonceCacheRememberPrune covers the replay/expiry behavior directly on the
// cache: a fresh nonce is accepted (golden), an immediate repeat is rejected
// (error), and after the TTL the same nonce is accepted again because its entry
// was pruned (edge).
func TestNonceCacheRememberPrune(t *testing.T) {
nc := newNonceCache(50*time.Millisecond, 1000)
base := time.Now()
if !nc.rememberOrReject("a", base) {
t.Fatalf("first sighting should be accepted")
}
if nc.rememberOrReject("a", base) {
t.Fatalf("an immediate replay should be rejected")
}
if !nc.rememberOrReject("a", base.Add(60*time.Millisecond)) {
t.Fatalf("after the TTL the nonce should be accepted again (pruned)")
}
}
// TestNonceCacheCapBounded covers the memory bound (audit H7): with a long TTL so
// nothing expires, inserting far more nonces than the cap must still keep the
// cache at or under the cap (oldest evicted), and the order queue must not drift
// from the map.
func TestNonceCacheCapBounded(t *testing.T) {
const capacity = 100
nc := newNonceCache(time.Hour, capacity)
base := time.Now()
for i := 0; i < 500; i++ {
nc.rememberOrReject("n"+strconv.Itoa(i), base)
}
nc.mu.Lock()
size := len(nc.seen)
orderLen := len(nc.order)
nc.mu.Unlock()
if size > capacity {
t.Fatalf("cache exceeded its cap: %d > %d", size, capacity)
}
if orderLen != size {
t.Fatalf("order queue drifted from the map: order=%d seen=%d", orderLen, size)
}
}
+88
View File
@@ -0,0 +1,88 @@
package membership
import (
"encoding/json"
"net/http"
"strings"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/frame"
)
// TestAudit_OwnerSpoof ports the auditor's H6 finding: handleCreateRoom did not
// bind the body's declared owner to the request signer, so a registered peer
// could create rooms in another identity's name. Now the owner endpoint AND the
// owner signing key must both be the authenticated signer's.
func TestAudit_OwnerSpoof(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
bob, _ := cs.GenerateIdentity()
register(t, h, bob, "bob")
bobEp := frame.EndpointID(bob.SignPub)
victim, _ := cs.GenerateIdentity()
post := func(id cs.Identity, owner endpointJSON, nonce string) int {
body, _ := json.Marshal(createRoomReq{Subject: "some.room", Owner: owner})
code, _ := do(t, signedReq(t, h.ts.URL, "POST", "/rooms", body, id, time.Now().Unix(), nonce))
return code
}
// Error path: bob signs, body claims victim as owner -> 403.
if code := post(bob, endpointJSON{Endpoint: frame.EndpointID(victim.SignPub), SignPub: victim.SignPub, KexPub: victim.KexPub}, "spoof-1"); code != http.StatusForbidden {
t.Fatalf("owner-spoofed create should be 403, got %d", code)
}
// Edge: bob declares his own endpoint but a foreign signing key -> 403 (the
// key, not just the endpoint string, is bound to the signer).
if code := post(bob, endpointJSON{Endpoint: bobEp, SignPub: victim.SignPub, KexPub: victim.KexPub}, "spoof-2"); code != http.StatusForbidden {
t.Fatalf("create with a foreign owner key should be 403, got %d", code)
}
// Golden: alice creates a room owned by herself -> 201.
aliceEp := frame.EndpointID(h.alice.SignPub)
if code := post(h.alice, endpointJSON{Endpoint: aliceEp, SignPub: h.alice.SignPub, KexPub: h.alice.KexPub}, "owner-ok"); code != http.StatusCreated {
t.Fatalf("self-owned create should be 201, got %d", code)
}
}
// TestAudit_NonceCachePoisonPreAuth ports the auditor's H7 finding: the replay
// cache was populated BEFORE the allowlist check, so any unregistered identity
// (Ed25519 keys are free) could seed nonces into it. Now IsAuthorized runs first,
// so an unauthorized identity's nonce is never cached: a repeat of the same nonce
// still fails as "not authorized", not "replayed nonce".
func TestAudit_NonceCachePoisonPreAuth(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
eve, _ := cs.GenerateIdentity() // valid signatures, NOT on the allowlist
now := time.Now().Unix()
code1, body1 := do(t, signedReq(t, h.ts.URL, "GET", "/rooms/x", nil, eve, now, "poison-nonce"))
if code1 != http.StatusUnauthorized || !strings.Contains(body1, "not authorized") {
t.Fatalf("unregistered first request should be 401 not-authorized, got %d (%s)", code1, body1)
}
// Same nonce again: if the nonce had been cached, this would report "replayed
// nonce". It must still be "not authorized" — proving the nonce was NOT cached.
code2, body2 := do(t, signedReq(t, h.ts.URL, "GET", "/rooms/x", nil, eve, now, "poison-nonce"))
if code2 != http.StatusUnauthorized {
t.Fatalf("unregistered replay should still be 401, got %d", code2)
}
if strings.Contains(body2, "replayed") {
t.Fatalf("an unauthorized identity's nonce was cached pre-auth: %s", body2)
}
if !strings.Contains(body2, "not authorized") {
t.Fatalf("second unregistered request should still be not-authorized, got: %s", body2)
}
// Positive control: an AUTHORIZED identity's replay IS still rejected, so the
// reorder did not weaken anti-replay for legitimate traffic.
if code, _ := do(t, signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "alice-live")); code != http.StatusOK {
t.Fatalf("alice's first request should be 200, got %d", code)
}
if code, body := do(t, signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "alice-live")); code != http.StatusUnauthorized || !strings.Contains(body, "replayed") {
t.Fatalf("alice's replay should be 401 replayed nonce, got %d (%s)", code, body)
}
}
+93
View File
@@ -0,0 +1,93 @@
package membership
import (
"net"
"net/http"
"sync"
"time"
"golang.org/x/time/rate"
)
// ipRateLimiter is a per-source-IP token-bucket rate limiter for the control
// plane. It exists to blunt pre-auth flooding: an unauthenticated peer that
// hammers the HTTP API (signature verification is not free, and io is bounded
// but still real) is throttled before it can amplify load. Like the nonceCache,
// this is transport glue specific to unibus, not a registry primitive — the
// report 0003 made the same call for the nonce cache (it would only drag a NATS
// dependency into the multi-domain registry go.mod for one helper).
//
// Each distinct IP gets its own golang.org/x/time/rate.Limiter (a standard
// token bucket already in the module graph, so no new dependency). Idle buckets
// are reaped so the map cannot grow without bound under a churn of source IPs.
type ipRateLimiter struct {
mu sync.Mutex
buckets map[string]*ipBucket
r rate.Limit
burst int
ttl time.Duration
}
type ipBucket struct {
lim *rate.Limiter
seen time.Time
}
// newIPRateLimiter builds a limiter granting r tokens/second with the given
// burst per IP. ttl bounds how long an idle bucket is retained before being
// reaped. r<=0 disables limiting (Allow always true) so dev/loopback stacks are
// unaffected.
func newIPRateLimiter(r rate.Limit, burst int, ttl time.Duration) *ipRateLimiter {
return &ipRateLimiter{
buckets: make(map[string]*ipBucket),
r: r,
burst: burst,
ttl: ttl,
}
}
// allow reports whether a request from ip may proceed now, consuming one token
// on success. A disabled limiter (r<=0) always allows. Reaping of stale buckets
// is amortized: it runs only when the map has grown past a small threshold, so
// the common path is a single map lookup under the mutex.
func (l *ipRateLimiter) allow(ip string, now time.Time) bool {
if l == nil || l.r <= 0 {
return true
}
l.mu.Lock()
defer l.mu.Unlock()
if len(l.buckets) > 1024 {
l.reapLocked(now)
}
b, ok := l.buckets[ip]
if !ok {
b = &ipBucket{lim: rate.NewLimiter(l.r, l.burst)}
l.buckets[ip] = b
}
b.seen = now
return b.lim.AllowN(now, 1)
}
// reapLocked drops buckets idle for longer than ttl. The caller holds l.mu.
func (l *ipRateLimiter) reapLocked(now time.Time) {
for ip, b := range l.buckets {
if now.Sub(b.seen) > l.ttl {
delete(l.buckets, ip)
}
}
}
// clientIP extracts the source IP of an HTTP request, stripping the port. It
// trusts the transport's RemoteAddr only (no X-Forwarded-For parsing): a public
// deployment terminates TLS at this process or behind a proxy that the operator
// controls, and honoring an attacker-supplied header would let a single IP fan
// its quota across forged identities. If parsing fails the whole RemoteAddr is
// used as the key (still a stable per-connection bucket).
func clientIP(r *http.Request) string {
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return r.RemoteAddr
}
return host
}
+46
View File
@@ -0,0 +1,46 @@
package membership
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
// TestRequireEncryptedRoomsRejectsCleartext is the control-plane half of the
// audit H4 minimum defense: with RequireEncryptedRooms on (the public posture),
// creating a cleartext (ModeNATS) room is refused 403, while an encrypted room is
// created normally. This is what guarantees no message ever rides the un-ACL'd
// NATS subject in the clear on a public deployment.
func TestRequireEncryptedRoomsRejectsCleartext(t *testing.T) {
srv := dosServer(t, AuthOff)
srv.RequireEncryptedRooms = true
create := func(encrypt bool) int {
body, _ := json.Marshal(createRoomReq{
Subject: "payroll.subject",
Policy: policyJSON{Encrypt: encrypt, Persist: encrypt, SignMsgs: encrypt},
Owner: endpointJSON{Endpoint: "owner-ep", SignPub: []byte("sp"), KexPub: []byte("kp")},
SealedKeySelf: []byte("sealed"),
})
rec := httptest.NewRecorder()
srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/rooms", bytes.NewReader(body)))
return rec.Code
}
// Error path: a cleartext room is refused.
if code := create(false); code != http.StatusForbidden {
t.Fatalf("cleartext room under RequireEncryptedRooms should be 403, got %d", code)
}
// Golden: an encrypted room is created.
if code := create(true); code != http.StatusCreated {
t.Fatalf("encrypted room should be 201, got %d", code)
}
// Edge: with the flag OFF (loopback/dev), cleartext rooms are allowed again.
srv.RequireEncryptedRooms = false
if code := create(false); code != http.StatusCreated {
t.Fatalf("cleartext room with the flag off should be 201, got %d", code)
}
}
+205 -21
View File
@@ -2,6 +2,7 @@ package membership
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
@@ -15,7 +16,35 @@ import (
cs "fn-registry/functions/cybersecurity"
"golang.org/x/time/rate"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/frame"
)
// Body-size ceilings for the control plane. They bound how much an unauthenticated
// peer can make the server buffer in RAM before the request is even authenticated
// (the signature is verified over the full body, so the body must be read — but
// not unboundedly). maxControlBodyBytes covers JSON metadata requests; /blobs gets
// a separate, larger ceiling because media ciphertext is legitimately bigger. A
// request whose declared Content-Length already exceeds its ceiling is rejected
// before a single byte is buffered.
const (
maxControlBodyBytes = 1 << 20 // 1 MiB for JSON control-plane requests
maxBlobBytes = 16 << 20 // 16 MiB for a single media blob upload
// MaxHeaderBytes caps request header size; wired into the http.Server by the
// command. Exported so the bound lives next to its body-size siblings.
MaxHeaderBytes = 1 << 20 // 1 MiB
)
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
// human/agent bus rather than a high-QPS API: a steady ~20 req/s with a burst of
// 40 absorbs a chat client's bursty polling while throttling a flood. Loopback
// dev stacks pass r<=0 to disable limiting entirely.
const (
defaultRatePerSec = rate.Limit(20)
defaultRateBurst = 40
rateBucketTTL = 10 * time.Minute
)
// Server is the HTTP control plane: the authoritative source of room metadata,
@@ -32,18 +61,32 @@ type Server struct {
mux *http.ServeMux
authMode AuthMode
nonces *nonceCache
limiter *ipRateLimiter
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
// rooms. It is the minimum-defensive control for the data plane (audit H4):
// the embedded NATS has no per-subject ACL, so a cleartext room is readable by
// any registered peer that knows (or guesses) its subject. Forcing every room
// to be end-to-end encrypted keeps message CONTENT confidential even when the
// transport offers no subject isolation. The command sets this on a public
// (non-loopback) bind. See dev/0004d-dataplane-acl.md for the full rationale
// and the residual metadata exposure this does NOT close.
RequireEncryptedRooms bool
}
// NewServer wires the membership store and blob store into an http.Handler. The
// authMode selects the control-plane auth rollout state (AuthOff for callers and
// tests that have not migrated to signed requests yet).
// tests that have not migrated to signed requests yet). It installs a per-IP
// rate limiter with the package defaults; loopback dev behavior is unchanged
// because the burst comfortably exceeds any single client's request rate.
func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server {
s := &Server{
store: store,
blobs: blobs,
mux: http.NewServeMux(),
authMode: authMode,
nonces: newNonceCache(nonceTTL),
nonces: newNonceCache(nonceTTL, maxNonceCacheEntries),
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
}
s.routes()
return s
@@ -53,23 +96,54 @@ func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server
// (signature verification + anti-replay + allowlist) ahead of the router
// according to authMode, then dispatches to the matched handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
now := time.Now()
// Per-IP rate limit runs first, ahead of auth and body reads, so a flood is
// shed at the cheapest possible point. The health probe is exempt so liveness
// checks are never throttled.
if !isAuthExempt(r) && !s.limiter.allow(clientIP(r), now) {
writeErr(w, http.StatusTooManyRequests, "rate limit exceeded")
return
}
// Cap how much body we will buffer, BEFORE reading a single byte. The ceiling
// is per-route: /blobs may legitimately carry a media ciphertext, everything
// else is small JSON. A declared Content-Length over the ceiling is rejected
// outright (no buffering); MaxBytesReader then guards against a lying or
// chunked sender by failing the read once the limit is crossed. This is the
// fix for the pre-auth DoS: without it an unauthenticated peer could make the
// server buffer an unbounded body in RAM before authenticate() ever ran.
limit := int64(maxControlBodyBytes)
if r.Method == http.MethodPost && r.URL.Path == "/blobs" {
limit = int64(maxBlobBytes)
}
if r.ContentLength > limit {
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
return
}
r.Body = http.MaxBytesReader(w, r.Body, limit)
if s.authMode == AuthOff || isAuthExempt(r) {
s.mux.ServeHTTP(w, r)
return
}
// Buffer the body so the signature can be verified over it and the handler
// still reads it. Bodies on the control plane are small (JSON metadata or a
// media blob already capped upstream), so full buffering is acceptable.
// Buffer the (now bounded) body so the signature can be verified over it and
// the handler still reads it.
body, err := io.ReadAll(r.Body)
if err != nil {
writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
if isBodyTooLarge(err) {
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
return
}
writeErr(w, http.StatusBadRequest, "read body")
return
}
_ = r.Body.Close()
r.Body = io.NopCloser(bytes.NewReader(body))
if _, err := s.authenticate(r, body, time.Now()); err != nil {
res, err := s.authenticate(r, body, now)
if err != nil {
if s.authMode == AuthSoft {
log.Printf("[auth] soft: would reject %s %s: %v", r.Method, r.URL.Path, err)
s.mux.ServeHTTP(w, r)
@@ -78,7 +152,53 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
return
}
s.mux.ServeHTTP(w, r)
// Carry the authenticated signer's endpoint into the handler so room handlers
// can authorize by membership (audit H3). Only set on a verified identity.
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint)))
}
// isBodyTooLarge reports whether err is the sentinel returned by MaxBytesReader
// when the body exceeds its limit, so the middleware can map it to 413.
func isBodyTooLarge(err error) bool {
var maxErr *http.MaxBytesError
return errors.As(err, &maxErr)
}
// ctxKey is the unexported type for this package's request-context keys, so the
// values cannot collide with keys set by other packages.
type ctxKey int
const ctxSignerEndpoint ctxKey = iota
// withSigner returns a context carrying the authenticated signer's endpoint id.
func withSigner(ctx context.Context, endpoint string) context.Context {
return context.WithValue(ctx, ctxSignerEndpoint, endpoint)
}
// signerEndpoint returns the authenticated signer's endpoint id and whether one
// is present. It is absent under AuthOff (no verification) and when a soft-mode
// request was let through unauthenticated — in both cases membership
// authorization is skipped, preserving dev/legacy behavior.
func signerEndpoint(r *http.Request) (string, bool) {
v, ok := r.Context().Value(ctxSignerEndpoint).(string)
return v, ok && v != ""
}
// requireMember authorizes a room request by membership (audit H3): it returns
// the signer endpoint and true when the request may proceed, or writes 403 and
// returns false when an authenticated signer is not a member of roomID. When no
// authenticated signer is present (AuthOff/dev, or soft pass-through) it allows
// the request — membership is only enforced once the caller's identity is known.
func (s *Server) requireMember(w http.ResponseWriter, r *http.Request, roomID string) (string, bool) {
signer, ok := signerEndpoint(r)
if !ok {
return "", true
}
if _, err := s.store.GetMember(roomID, signer); err != nil {
writeErr(w, http.StatusForbidden, "forbidden: not a member of this room")
return signer, false
}
return signer, true
}
// isAuthExempt lists requests that bypass control-plane auth even under enforce.
@@ -188,6 +308,15 @@ func writeErr(w http.ResponseWriter, code int, msg string) {
writeJSON(w, code, map[string]string{"error": msg})
}
// writeServerErr logs the internal error detail and returns ONLY a generic
// message to the client (audit H12): raw store/blob errors embed SQL fragments
// and filesystem paths, which must not leak to a caller. Use it for any error
// that originates inside the server (5xx, or a not-found wrapping a store error).
func writeServerErr(w http.ResponseWriter, r *http.Request, code int, publicMsg string, err error) {
log.Printf("[handler] %s %s -> %d: %v", r.Method, r.URL.Path, code, err)
writeErr(w, code, publicMsg)
}
// canonicalSig returns the bytes to verify for a request: the request struct
// re-marshaled with its Sig field cleared. The caller passes a copy with Sig
// already zeroed. This is symmetric with how the client signs.
@@ -232,6 +361,24 @@ func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
writeErr(w, http.StatusBadRequest, "subject and owner.endpoint required")
return
}
// Data-plane minimum defense (audit H4): on a public deployment cleartext
// rooms are disabled, so no message ever rides the un-ACL'd NATS subject in
// the clear for another registered peer to sniff.
if s.RequireEncryptedRooms && !req.Policy.Encrypt {
writeErr(w, http.StatusForbidden,
"cleartext rooms are disabled on this deployment; create an encrypted (Matrix-policy) room")
return
}
// Owner binding (audit H6): the declared owner must BE the authenticated
// signer — both the endpoint id and the signing key. Otherwise a registered
// peer could create rooms in another identity's name. Enforced only when an
// authenticated signer is present (AuthOff/dev trusts the caller).
if signer, ok := signerEndpoint(r); ok {
if req.Owner.Endpoint != signer || frame.EndpointID(req.Owner.SignPub) != signer {
writeErr(w, http.StatusForbidden, "forbidden: room owner must be the authenticated signer")
return
}
}
roomID := newULID()
info := RoomInfo{
RoomID: roomID,
@@ -242,7 +389,7 @@ func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
OwnerEndpoint: req.Owner.Endpoint,
}
if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusCreated, createRoomResp{RoomID: roomID})
@@ -264,7 +411,7 @@ func (s *Server) handleInvite(w http.ResponseWriter, r *http.Request) {
}
info, err := s.store.GetRoom(roomID)
if err != nil {
writeErr(w, http.StatusNotFound, err.Error())
writeServerErr(w, r, http.StatusNotFound, "room not found", err)
return
}
m := Member{
@@ -274,7 +421,7 @@ func (s *Server) handleInvite(w http.ResponseWriter, r *http.Request) {
KexPub: req.Member.KexPub,
}
if err := s.store.AddMember(roomID, m, info.Epoch, req.SealedKey); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "invited"})
@@ -287,6 +434,20 @@ func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) {
writeErr(w, http.StatusBadRequest, "endpoint query param required")
return
}
// A sealed room key is sealed to one identity's X25519 key. Serving it only to
// that identity (the signer) stops a registered peer from harvesting another
// member's sealed key (audit H3). Membership is implied by owning a sealed key,
// but we also require the signer to be a member for defense in depth.
if signer, ok := signerEndpoint(r); ok {
if endpoint != signer {
writeErr(w, http.StatusForbidden, "forbidden: may only fetch your own sealed key")
return
}
if _, err := s.store.GetMember(roomID, signer); err != nil {
writeErr(w, http.StatusForbidden, "forbidden: not a member of this room")
return
}
}
epoch := 0
if e := r.URL.Query().Get("epoch"); e != "" {
if n, err := strconv.Atoi(e); err == nil {
@@ -300,7 +461,7 @@ func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) {
"not invited to this encrypted room: no key has been sealed for your identity. Ask the room owner to invite you before joining.")
return
}
writeErr(w, http.StatusInternalServerError, err.Error())
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusOK, keyResp{Epoch: ep, SealedKey: sealed})
@@ -308,9 +469,14 @@ func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleListMembers(w http.ResponseWriter, r *http.Request) {
roomID := r.PathValue("id")
// Membership authorization (audit H3): the member list exposes every member's
// sign_pub + kex_pub, so it must not be served to a non-member.
if _, ok := s.requireMember(w, r, roomID); !ok {
return
}
members, err := s.store.ListMembers(roomID)
if err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
writeErr(w, http.StatusInternalServerError, "internal error")
return
}
out := make([]memberJSON, 0, len(members))
@@ -326,9 +492,15 @@ func (s *Server) handleListMemberRooms(w http.ResponseWriter, r *http.Request) {
writeErr(w, http.StatusBadRequest, "endpoint required")
return
}
// A peer may only enumerate its OWN room directory (audit H3): otherwise any
// registered identity could map another's entire social graph of rooms.
if signer, ok := signerEndpoint(r); ok && endpoint != signer {
writeErr(w, http.StatusForbidden, "forbidden: may only list your own rooms")
return
}
rooms, err := s.store.ListRoomsForEndpoint(endpoint)
if err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
writeErr(w, http.StatusInternalServerError, "internal error")
return
}
out := make([]memberRoomJSON, 0, len(rooms))
@@ -346,9 +518,12 @@ func (s *Server) handleListMemberRooms(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleGetRoom(w http.ResponseWriter, r *http.Request) {
roomID := r.PathValue("id")
if _, ok := s.requireMember(w, r, roomID); !ok {
return
}
info, err := s.store.GetRoom(roomID)
if err != nil {
writeErr(w, http.StatusNotFound, err.Error())
writeErr(w, http.StatusNotFound, "room not found")
return
}
writeJSON(w, http.StatusOK, roomResp{
@@ -378,7 +553,7 @@ func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) {
// Bump epoch, then store the fresh sealed keys for the remaining members,
// then remove the kicked/left members.
if err := s.store.BumpEpoch(roomID, req.NewEpoch); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
keys := make(map[string][]byte, len(req.Keys))
@@ -387,13 +562,13 @@ func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) {
}
if len(keys) > 0 {
if err := s.store.PutSealedKeys(roomID, req.NewEpoch, keys); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
}
for _, ep := range req.Remove {
if err := s.store.RemoveMember(roomID, ep); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
}
@@ -401,14 +576,23 @@ func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) handlePutBlob(w http.ResponseWriter, r *http.Request) {
// The body arrives already bounded: ServeHTTP wraps it in a MaxBytesReader
// (maxBlobBytes) and rejects an over-declared Content-Length before this
// handler runs, in every auth mode. Reading here therefore cannot buffer
// more than the ceiling; a sender that lies about its length (e.g. chunked)
// trips MaxBytesReader and we map that to 413 rather than a generic 400.
data, err := io.ReadAll(r.Body)
if err != nil {
writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
if isBodyTooLarge(err) {
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
return
}
writeErr(w, http.StatusBadRequest, "read body")
return
}
hash, err := s.blobs.Put(data)
if err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusOK, blobResp{Hash: hash})
@@ -422,7 +606,7 @@ func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) {
}
data, err := s.blobs.Get(hash)
if err != nil {
writeErr(w, http.StatusNotFound, err.Error())
writeServerErr(w, r, http.StatusNotFound, "not found", err)
return
}
w.Header().Set("Content-Type", "application/octet-stream")