Compare commits
16 Commits
bcd02716d5
...
618f6b61da
| Author | SHA1 | Date | |
|---|---|---|---|
| 618f6b61da | |||
| d483c90356 | |||
| 1bcca987a4 | |||
| 0aa2caae43 | |||
| 957b728160 | |||
| 07f4af817e | |||
| 0d56c3c81d | |||
| fb6c796059 | |||
| e502b16675 | |||
| 47ff74d837 | |||
| b81e5f26f1 | |||
| d742f91881 | |||
| 30577145ce | |||
| 01e2ee1aa0 | |||
| e7bdcc978c | |||
| 60d6a86655 |
@@ -2,7 +2,7 @@
|
|||||||
name: unibus
|
name: unibus
|
||||||
lang: go
|
lang: go
|
||||||
domain: infra
|
domain: infra
|
||||||
version: 0.4.0
|
version: 0.5.0
|
||||||
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
||||||
tags: [service, messaging, nats, e2e]
|
tags: [service, messaging, nats, e2e]
|
||||||
uses_functions:
|
uses_functions:
|
||||||
@@ -154,6 +154,19 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
|
|||||||
|
|
||||||
## Capability growth log
|
## Capability growth log
|
||||||
|
|
||||||
|
- v0.5.0 (2026-06-07) — hardening de seguridad (issue 0004) que cierra los
|
||||||
|
hallazgos de la auditoría red-team (report 0004) y lleva el veredicto de
|
||||||
|
exposición pública de "NO" a "sí-con-condiciones". Anti-DoS pre-auth
|
||||||
|
(`http.MaxBytesReader` por ruta + rechazo por `Content-Length` + rate-limit
|
||||||
|
por IP + `MaxHeaderBytes`); guard de fail-open que prohíbe arrancar con bind
|
||||||
|
público o TLS sin `--bus-auth enforce`; autorización por pertenencia en los GET
|
||||||
|
de room (metadata y clave sellada solo para miembros / el propio endpoint);
|
||||||
|
rooms cleartext deshabilitadas en bind público (contenido siempre E2E, mínimo
|
||||||
|
defensivo del data plane mientras la ACL por subject llega con 0003); TLS en el
|
||||||
|
control plane HTTP con la CA propia y cliente que exige `https` cuando hay CA;
|
||||||
|
y los medios H6/H7/H12 (owner ligado al firmante, `IsAuthorized` antes del
|
||||||
|
nonce-cache con poda O(expired) + cap, errores genéricos al cliente). Cada
|
||||||
|
hallazgo lleva su test adversarial `TestAudit_*` portado como regresión.
|
||||||
- v0.4.0 (2026-06-07) — descubrimiento de rooms: `GET /members/{endpoint}/rooms`
|
- v0.4.0 (2026-06-07) — descubrimiento de rooms: `GET /members/{endpoint}/rooms`
|
||||||
lista las rooms de un endpoint con su metadata y rol, y `client.ListMyRooms()`
|
lista las rooms de un endpoint con su metadata y rol, y `client.ListMyRooms()`
|
||||||
lo consume. El control plane es pull (no hay push de invitaciones), así que un
|
lo consume. El control plane es pull (no hay push de invitaciones), así que un
|
||||||
|
|||||||
@@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/membership"
|
||||||
|
)
|
||||||
|
|
||||||
|
// isLoopbackBind reports whether the --bind value keeps the service reachable
|
||||||
|
// only from this host. An empty bind means "all interfaces" (public), and a
|
||||||
|
// hostname we cannot resolve to a loopback literal is treated as public — the
|
||||||
|
// conservative choice, so an unusual bind never silently slips past the guard.
|
||||||
|
func isLoopbackBind(bind string) bool {
|
||||||
|
switch bind {
|
||||||
|
case "localhost":
|
||||||
|
return true
|
||||||
|
case "":
|
||||||
|
return false // empty binds every interface
|
||||||
|
}
|
||||||
|
ip := net.ParseIP(bind)
|
||||||
|
if ip == nil {
|
||||||
|
return false // a hostname we can't classify: assume public
|
||||||
|
}
|
||||||
|
return ip.IsLoopback()
|
||||||
|
}
|
||||||
|
|
||||||
|
// validateBootConfig is the fail-open guard (audit H2). It refuses any startup
|
||||||
|
// configuration that would expose the bus without enforced authentication:
|
||||||
|
//
|
||||||
|
// - a non-loopback --bind without --bus-auth enforce (the data plane and
|
||||||
|
// control plane would both accept anyone), and
|
||||||
|
// - --tls-cert/--tls-key without --bus-auth enforce (TLS encrypts the channel
|
||||||
|
// but authenticates no one — encrypted access for everybody is still open).
|
||||||
|
//
|
||||||
|
// It is a pure function of the parsed flags so the command can fail fast at
|
||||||
|
// startup and tests can assert the policy without booting a server.
|
||||||
|
func validateBootConfig(bind string, mode membership.AuthMode, tlsCert, tlsKey string) error {
|
||||||
|
if !isLoopbackBind(bind) && mode != membership.AuthEnforce {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"refusing to start: --bind %q is not loopback but --bus-auth is %q; a public bind requires --bus-auth enforce (or bind 127.0.0.1 for local dev)",
|
||||||
|
bind, mode)
|
||||||
|
}
|
||||||
|
if (tlsCert != "" || tlsKey != "") && mode != membership.AuthEnforce {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"refusing to start: --tls-cert/--tls-key set but --bus-auth is %q; TLS without enforced auth is fail-open (encrypted channel, no authentication) — set --bus-auth enforce",
|
||||||
|
mode)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,72 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/membership"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestAudit_FailOpenTLSWithoutAuth ports the auditor's H2 vector. Before the
|
||||||
|
// guard, booting with TLS on but the authenticator off ("--bind 0.0.0.0
|
||||||
|
// --tls-cert … " without enforce) produced an encrypted data plane that an
|
||||||
|
// unregistered, nkey-less client could still connect to — a fail-open config
|
||||||
|
// wearing the appearance of security. validateBootConfig now refuses it, so the
|
||||||
|
// insecure server never starts (the client therefore has nothing to connect to).
|
||||||
|
func TestAudit_FailOpenTLSWithoutAuth(t *testing.T) {
|
||||||
|
// The exact auditor configuration: public bind, TLS provided, auth off.
|
||||||
|
err := validateBootConfig("0.0.0.0", membership.AuthOff, "server.crt", "server.key")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("TLS without enforce on a public bind must be refused at startup")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "enforce") {
|
||||||
|
t.Fatalf("error should point the operator at --bus-auth enforce, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// And TLS without enforce is rejected even on loopback: TLS implies a
|
||||||
|
// security posture, so authenticating no one is always a misconfiguration.
|
||||||
|
if err := validateBootConfig("127.0.0.1", membership.AuthOff, "server.crt", "server.key"); err == nil {
|
||||||
|
t.Fatalf("TLS flags without enforce must be refused regardless of bind")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBootConfigPolicy is the full table: the golden secure-public config is
|
||||||
|
// allowed, dev loopback is allowed, and every fail-open shape is refused.
|
||||||
|
func TestBootConfigPolicy(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
bind string
|
||||||
|
mode membership.AuthMode
|
||||||
|
cert string
|
||||||
|
key string
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// Golden: the intended public production config.
|
||||||
|
{"public+enforce+tls", "0.0.0.0", membership.AuthEnforce, "s.crt", "s.key", false},
|
||||||
|
{"public+enforce+notls", "0.0.0.0", membership.AuthEnforce, "", "", false},
|
||||||
|
// Edge: local dev on loopback may stay open (no auth, no TLS).
|
||||||
|
{"loopback+off", "127.0.0.1", membership.AuthOff, "", "", false},
|
||||||
|
{"loopback-ipv6+off", "::1", membership.AuthOff, "", "", false},
|
||||||
|
{"localhost+off", "localhost", membership.AuthOff, "", "", false},
|
||||||
|
{"loopback+soft", "127.0.0.1", membership.AuthSoft, "", "", false},
|
||||||
|
// Error: public bind without enforce.
|
||||||
|
{"public+off", "0.0.0.0", membership.AuthOff, "", "", true},
|
||||||
|
{"public+soft", "0.0.0.0", membership.AuthSoft, "", "", true},
|
||||||
|
{"lan-ip+off", "192.168.1.10", membership.AuthOff, "", "", true},
|
||||||
|
{"empty-bind+off", "", membership.AuthOff, "", "", true},
|
||||||
|
// Error: TLS flags without enforce (cert or key alone is enough to trip it).
|
||||||
|
{"loopback+tlscert+off", "127.0.0.1", membership.AuthOff, "s.crt", "", true},
|
||||||
|
{"loopback+tlskey+soft", "127.0.0.1", membership.AuthSoft, "", "s.key", true},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
err := validateBootConfig(c.bind, c.mode, c.cert, c.key)
|
||||||
|
if c.wantErr && err == nil {
|
||||||
|
t.Fatalf("config %+v should be refused", c)
|
||||||
|
}
|
||||||
|
if !c.wantErr && err != nil {
|
||||||
|
t.Fatalf("config %+v should be allowed, got: %v", c, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
+43
-5
@@ -6,6 +6,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -52,6 +53,13 @@ func main() {
|
|||||||
log.Fatalf("%v", err)
|
log.Fatalf("%v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fail-open guard (audit H2): a non-loopback bind, or any TLS flag, demands
|
||||||
|
// --bus-auth enforce. This makes an insecure public startup impossible rather
|
||||||
|
// than silently exposing the bus with the appearance of security.
|
||||||
|
if err := validateBootConfig(*bind, authMode, *tlsCert, *tlsKey); err != nil {
|
||||||
|
log.Fatalf("%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
||||||
log.SetPrefix("[membershipd] ")
|
log.SetPrefix("[membershipd] ")
|
||||||
|
|
||||||
@@ -111,15 +119,45 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
srv := membership.NewServer(store, blobs, authMode)
|
srv := membership.NewServer(store, blobs, authMode)
|
||||||
|
// On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS
|
||||||
|
// has no per-subject ACL, so cleartext content would be readable by any
|
||||||
|
// registered peer. Forcing E2E keeps message content confidential regardless
|
||||||
|
// (audit H4 minimum defense; see dev/0004d-dataplane-acl.md).
|
||||||
|
if !isLoopbackBind(*bind) {
|
||||||
|
srv.RequireEncryptedRooms = true
|
||||||
|
log.Printf("cleartext rooms: DISABLED (public bind requires end-to-end encryption)")
|
||||||
|
}
|
||||||
log.Printf("control-plane auth: %s", authMode)
|
log.Printf("control-plane auth: %s", authMode)
|
||||||
addr := *bind + ":" + *httpPort
|
addr := *bind + ":" + *httpPort
|
||||||
httpSrv := &http.Server{Addr: addr, Handler: srv}
|
httpSrv := &http.Server{
|
||||||
|
Addr: addr,
|
||||||
|
Handler: srv,
|
||||||
|
// Bound request header size so a peer cannot exhaust memory with huge
|
||||||
|
// headers before any body limit applies (the body ceilings live in the
|
||||||
|
// membership middleware).
|
||||||
|
MaxHeaderBytes: membership.MaxHeaderBytes,
|
||||||
|
ReadHeaderTimeout: 10 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
log.Printf("HTTP control-plane API: http://%s", addr)
|
var serveErr error
|
||||||
log.Printf(" health: http://%s/healthz", addr)
|
if *tlsCert != "" {
|
||||||
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
// Serve the control plane over TLS with the same CA-signed cert as the
|
||||||
log.Fatalf("http server: %v", err)
|
// data plane (audit H5): metadata (subjects, pubkeys, sealed keys, the
|
||||||
|
// social graph) is no longer readable by a network MITM. The fail-open
|
||||||
|
// guard already requires --bus-auth enforce alongside these flags.
|
||||||
|
httpSrv.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
|
||||||
|
log.Printf("HTTPS control-plane API: https://%s", addr)
|
||||||
|
log.Printf(" health: https://%s/healthz", addr)
|
||||||
|
log.Printf("control-plane TLS: ON (%s)", *tlsCert)
|
||||||
|
serveErr = httpSrv.ListenAndServeTLS(*tlsCert, *tlsKey)
|
||||||
|
} else {
|
||||||
|
log.Printf("HTTP control-plane API: http://%s", addr)
|
||||||
|
log.Printf(" health: http://%s/healthz", addr)
|
||||||
|
serveErr = httpSrv.ListenAndServe()
|
||||||
|
}
|
||||||
|
if serveErr != nil && serveErr != http.ErrServerClosed {
|
||||||
|
log.Fatalf("http server: %v", serveErr)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,80 @@
|
|||||||
|
# 0004d — Data-plane access control on NATS (audit H4)
|
||||||
|
|
||||||
|
## The finding
|
||||||
|
|
||||||
|
The NATS authenticator (`pkg/busauth`) decides one thing per connection:
|
||||||
|
*is this identity registered on the bus?* It does **not** scope what a connected
|
||||||
|
client may subscribe to or publish. There is a single NATS account with no
|
||||||
|
`Permissions`, so any registered peer can subscribe to, or publish on, **any**
|
||||||
|
subject. Concretely:
|
||||||
|
|
||||||
|
- A cleartext room (`ModeNATS`) carries its payload in the clear on its subject.
|
||||||
|
A registered peer that knows or guesses the subject subscribes and reads the
|
||||||
|
content directly (the auditor's `TestAudit_NoSubjectACL`: eve, never invited,
|
||||||
|
receives `"internal: salary numbers"`).
|
||||||
|
- An encrypted room (`ModeMatrix`) keeps its **content** confidential (the
|
||||||
|
payload is AEAD ciphertext), but the **metadata of traffic** — that a subject
|
||||||
|
is active, message sizes and timing, who is publishing — is still observable by
|
||||||
|
any registered peer that subscribes to the subject.
|
||||||
|
|
||||||
|
## Why the "complete" fix does not fit here
|
||||||
|
|
||||||
|
The preferred fix is per-subject permissions derived from room membership: when a
|
||||||
|
client connects, the authenticator looks up the rooms it belongs to and grants
|
||||||
|
`Sub`/`Pub` only on those subjects. NATS supports this — `CustomClientAuthentication`
|
||||||
|
can register a `*server.User` carrying `Permissions`.
|
||||||
|
|
||||||
|
The blocker is that **NATS evaluates permissions once, at connect time, and never
|
||||||
|
re-evaluates them on a live connection.** unibus clients routinely *connect → create
|
||||||
|
or get invited to a room → publish/subscribe* within the **same** connection
|
||||||
|
(`TestSecureBusEndToEnd` does exactly this: A connects, then creates `room.secure`,
|
||||||
|
then publishes to it). Permissions frozen at connect time would not include a room
|
||||||
|
created or joined afterwards, so the legitimate owner could not publish to the room
|
||||||
|
it just made. Making per-subject ACLs work would therefore require the client to
|
||||||
|
**reconnect on every membership change**, an invasive change to the client library
|
||||||
|
and to every peer (worker, chat, mobile) — and the prompt for this issue scopes the
|
||||||
|
client changes to the minimum.
|
||||||
|
|
||||||
|
That dynamic-membership reconnection model is precisely the redesign that issue
|
||||||
|
**0003** (decentralization) already has to do: it moves the control-plane state to a
|
||||||
|
replicated JetStream KV and reworks how nodes and clients (re)establish sessions. Per
|
||||||
|
the issue's own guidance ("if a complete strategy does not fit, implement the minimum
|
||||||
|
defense and document the rest"), the full subject ACL is deferred to 0003, where the
|
||||||
|
session/permission model is being rebuilt anyway.
|
||||||
|
|
||||||
|
## The strategy implemented here: forbid cleartext rooms in public
|
||||||
|
|
||||||
|
`Server.RequireEncryptedRooms` (set by `membershipd` on any non-loopback bind)
|
||||||
|
refuses to create a cleartext (`ModeNATS`) room. Every room on a public deployment
|
||||||
|
is therefore end-to-end encrypted, so **message content stays confidential even
|
||||||
|
though the transport offers no subject isolation**: a peer that sniffs another
|
||||||
|
room's subject receives only AEAD ciphertext it has no key for.
|
||||||
|
|
||||||
|
This composes with the 0004c control-plane authorization: a non-member cannot even
|
||||||
|
learn a room's subject through the control plane (`GET /rooms/{id}` → 403), so to
|
||||||
|
sniff it an attacker must already know or guess the subject out of band.
|
||||||
|
|
||||||
|
## What this does NOT close (residual exposure, by design)
|
||||||
|
|
||||||
|
- **Traffic metadata.** A registered peer that already knows a subject can still
|
||||||
|
subscribe and observe that the subject is active, the ciphertext sizes, and the
|
||||||
|
timing/cadence of messages. It cannot read content.
|
||||||
|
- **Cross-room publish.** A registered peer can still *publish* arbitrary bytes on
|
||||||
|
any subject. In an encrypted room those bytes fail AEAD open and the signature
|
||||||
|
check (`SignMsgs`), so receivers drop them — it is a nuisance/spam vector, not a
|
||||||
|
confidentiality or integrity break.
|
||||||
|
- **WireGuard-only deployments** may still use cleartext rooms (the guard only trips
|
||||||
|
on a public bind), because the network already restricts who can reach the bus.
|
||||||
|
|
||||||
|
Closing the residual metadata exposure requires the per-subject ACL described above,
|
||||||
|
tracked for issue 0003.
|
||||||
|
|
||||||
|
## Regression evidence
|
||||||
|
|
||||||
|
- `pkg/membership` — `TestRequireEncryptedRoomsRejectsCleartext`: with
|
||||||
|
`RequireEncryptedRooms` on, `POST /rooms` for a cleartext policy returns 403 while
|
||||||
|
an encrypted-room create returns 201.
|
||||||
|
- `pkg/client` — `TestAudit_NoSubjectACL`: under the public posture, creating a
|
||||||
|
`ModeNATS` room fails; alice creates an encrypted room and publishes; eve (a
|
||||||
|
registered non-member) raw-subscribes to the subject and receives only ciphertext —
|
||||||
|
she never recovers the plaintext.
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
---
|
---
|
||||||
issue: 0004
|
issue: 0004
|
||||||
title: Hardening de seguridad — autorización, anti-DoS y confidencialidad antes de exponer público
|
title: Hardening de seguridad — autorización, anti-DoS y confidencialidad antes de exponer público
|
||||||
status: spec
|
status: done
|
||||||
created: 2026-06-07
|
created: 2026-06-07
|
||||||
|
completed: 2026-06-07
|
||||||
|
report: projects/message_bus/reports/0005-2026-06-07-unibus-security-hardening.md
|
||||||
domain: security
|
domain: security
|
||||||
scope: unibus (pkg/membership/server.go, auth.go, pkg/embeddednats, pkg/client, cmd/membershipd, deploy/tls)
|
scope: unibus (pkg/membership/server.go, auth.go, pkg/embeddednats, pkg/client, cmd/membershipd, deploy/tls)
|
||||||
depends_on: 0001 (cierra los gaps que la auditoría 0004 encontró sobre lo entregado en 0001)
|
depends_on: 0001 (cierra los gaps que la auditoría 0004 encontró sobre lo entregado en 0001)
|
||||||
|
|||||||
@@ -8,7 +8,9 @@ require (
|
|||||||
fn-registry v0.0.0-00010101000000-000000000000
|
fn-registry v0.0.0-00010101000000-000000000000
|
||||||
github.com/nats-io/nats-server/v2 v2.10.22
|
github.com/nats-io/nats-server/v2 v2.10.22
|
||||||
github.com/nats-io/nats.go v1.37.0
|
github.com/nats-io/nats.go v1.37.0
|
||||||
|
github.com/nats-io/nkeys v0.4.7
|
||||||
github.com/oklog/ulid/v2 v2.1.0
|
github.com/oklog/ulid/v2 v2.1.0
|
||||||
|
golang.org/x/time v0.7.0
|
||||||
modernc.org/sqlite v1.47.0
|
modernc.org/sqlite v1.47.0
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -19,7 +21,6 @@ require (
|
|||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
github.com/minio/highwayhash v1.0.3 // indirect
|
github.com/minio/highwayhash v1.0.3 // indirect
|
||||||
github.com/nats-io/jwt/v2 v2.5.8 // indirect
|
github.com/nats-io/jwt/v2 v2.5.8 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/ncruces/go-strftime v1.0.0 // indirect
|
github.com/ncruces/go-strftime v1.0.0 // indirect
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
@@ -29,7 +30,6 @@ require (
|
|||||||
golang.org/x/sync v0.20.0 // indirect
|
golang.org/x/sync v0.20.0 // indirect
|
||||||
golang.org/x/sys v0.44.0 // indirect
|
golang.org/x/sys v0.44.0 // indirect
|
||||||
golang.org/x/text v0.37.0 // indirect
|
golang.org/x/text v0.37.0 // indirect
|
||||||
golang.org/x/time v0.7.0 // indirect
|
|
||||||
golang.org/x/tools v0.45.0 // indirect
|
golang.org/x/tools v0.45.0 // indirect
|
||||||
modernc.org/libc v1.70.0 // indirect
|
modernc.org/libc v1.70.0 // indirect
|
||||||
modernc.org/mathutil v1.7.1 // indirect
|
modernc.org/mathutil v1.7.1 // indirect
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||||
|
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||||
|
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
@@ -47,6 +49,10 @@ golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
|
|||||||
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||||
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
|
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
|
||||||
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
|
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
|
||||||
|
golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM=
|
||||||
|
golang.org/x/tools/go/expect v0.1.1-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY=
|
||||||
|
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated h1:1h2MnaIAIXISqTFKdENegdpAgUXz6NrPEsbIeWaBRvM=
|
||||||
|
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated/go.mod h1:RVAQXBGNv1ib0J382/DPCRS/BPnsGebyM1Gj5VSDpG8=
|
||||||
modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis=
|
modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis=
|
||||||
modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
|
modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
|
||||||
modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw=
|
modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw=
|
||||||
|
|||||||
+26
-5
@@ -24,6 +24,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -67,10 +68,15 @@ type Options struct {
|
|||||||
// with an nkey to a server that does not advertise nkey auth ("nkeys not
|
// with an nkey to a server that does not advertise nkey auth ("nkeys not
|
||||||
// supported by the server"), so this is opt-in rather than always-on.
|
// supported by the server"), so this is opt-in rather than always-on.
|
||||||
UseNkey bool
|
UseNkey bool
|
||||||
// TLS, when non-nil, secures the NATS connection and pins the server to this
|
// TLS, when non-nil, secures the NATS (data plane) connection and pins the
|
||||||
// config's RootCAs (the bus's self-signed CA). Build it with
|
// server to this config's RootCAs (the bus's self-signed CA). Build it with
|
||||||
// busauth.LoadCATLSConfig(caPath). Nil keeps the connection plaintext.
|
// busauth.LoadCATLSConfig(caPath). Nil keeps the data plane plaintext.
|
||||||
TLS *tls.Config
|
TLS *tls.Config
|
||||||
|
// CtrlTLS, when non-nil, secures the HTTP control-plane connection and pins it
|
||||||
|
// to this config's RootCAs. It is separate from TLS so the two planes can be
|
||||||
|
// secured independently (a test may TLS one and not the other); production
|
||||||
|
// sets both to the same CA via Connect. Nil keeps the control plane plaintext.
|
||||||
|
CtrlTLS *tls.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
// New connects to NATS and records the control-plane URL with default Options
|
// New connects to NATS and records the control-plane URL with default Options
|
||||||
@@ -90,11 +96,19 @@ func Connect(natsURL, ctrlURL string, id cs.Identity, caPath string) (*Client, e
|
|||||||
if caPath == "" {
|
if caPath == "" {
|
||||||
return New(natsURL, ctrlURL, id)
|
return New(natsURL, ctrlURL, id)
|
||||||
}
|
}
|
||||||
|
// A CA implies the bus is TLS on BOTH planes. Refuse a plaintext control-plane
|
||||||
|
// URL: signing gives integrity, not confidentiality, so sending metadata over
|
||||||
|
// http:// when the operator provisioned a CA would silently leak it to a MITM
|
||||||
|
// (audit H5). Force https rather than silently downgrade.
|
||||||
|
if !strings.HasPrefix(ctrlURL, "https://") {
|
||||||
|
return nil, fmt.Errorf("client: control-plane URL %q must be https:// when a CA is provided", ctrlURL)
|
||||||
|
}
|
||||||
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
|
tlsCfg, err := busauth.LoadCATLSConfig(caPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("client: load CA %q: %w", caPath, err)
|
return nil, fmt.Errorf("client: load CA %q: %w", caPath, err)
|
||||||
}
|
}
|
||||||
return NewWithOptions(natsURL, ctrlURL, id, Options{UseNkey: true, TLS: tlsCfg})
|
// Pin the same CA on both planes: nkey+TLS on NATS, TLS on the HTTP control plane.
|
||||||
|
return NewWithOptions(natsURL, ctrlURL, id, Options{UseNkey: true, TLS: tlsCfg, CtrlTLS: tlsCfg})
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWithOptions is New with explicit connection options (nkey auth, and, from
|
// NewWithOptions is New with explicit connection options (nkey auth, and, from
|
||||||
@@ -125,13 +139,20 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli
|
|||||||
nc.Close()
|
nc.Close()
|
||||||
return nil, fmt.Errorf("client: init jetstream: %w", err)
|
return nil, fmt.Errorf("client: init jetstream: %w", err)
|
||||||
}
|
}
|
||||||
|
// The control-plane HTTP client pins the bus CA when CtrlTLS is set, so an
|
||||||
|
// https:// control plane is verified against the bus's own CA rather than the
|
||||||
|
// system roots (audit H5). Without it the client stays plaintext for dev.
|
||||||
|
httpClient := &http.Client{Timeout: 10 * time.Second}
|
||||||
|
if opts.CtrlTLS != nil {
|
||||||
|
httpClient.Transport = &http.Transport{TLSClientConfig: opts.CtrlTLS.Clone()}
|
||||||
|
}
|
||||||
return &Client{
|
return &Client{
|
||||||
id: id,
|
id: id,
|
||||||
endpoint: frame.EndpointID(id.SignPub),
|
endpoint: frame.EndpointID(id.SignPub),
|
||||||
nc: nc,
|
nc: nc,
|
||||||
js: js,
|
js: js,
|
||||||
ctrlURL: ctrlURL,
|
ctrlURL: ctrlURL,
|
||||||
http: &http.Client{Timeout: 10 * time.Second},
|
http: httpClient,
|
||||||
keyCache: map[string]map[int][]byte{},
|
keyCache: map[string]map[int][]byte{},
|
||||||
signCache: map[string][]byte{},
|
signCache: map[string][]byte{},
|
||||||
}, nil
|
}, nil
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ type testHarness struct {
|
|||||||
ns *server.Server
|
ns *server.Server
|
||||||
httpts *httptest.Server
|
httpts *httptest.Server
|
||||||
store *membership.Store
|
store *membership.Store
|
||||||
|
srv *membership.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
func freePort(t *testing.T) int {
|
func freePort(t *testing.T) int {
|
||||||
@@ -98,7 +99,7 @@ func bootHarness(t *testing.T, ctrlMode membership.AuthMode, natsAuth bool, nats
|
|||||||
srv := membership.NewServer(store, blobs, ctrlMode)
|
srv := membership.NewServer(store, blobs, ctrlMode)
|
||||||
httpts := httptest.NewServer(srv)
|
httpts := httptest.NewServer(srv)
|
||||||
|
|
||||||
h := &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL, ns: ns, httpts: httpts, store: store}
|
h := &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL, ns: ns, httpts: httpts, store: store, srv: srv}
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
httpts.Close()
|
httpts.Close()
|
||||||
store.Close()
|
store.Close()
|
||||||
|
|||||||
@@ -0,0 +1,87 @@
|
|||||||
|
package client_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"net/http/httptest"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||||
|
"github.com/enmanuel/unibus/pkg/client"
|
||||||
|
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||||
|
"github.com/enmanuel/unibus/pkg/membership"
|
||||||
|
"github.com/enmanuel/unibus/pkg/room"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestConnectRequiresHTTPSWithCA covers audit H5's client contract: when a CA is
|
||||||
|
// provided the control-plane URL must be https://. A signed request gives
|
||||||
|
// integrity but not confidentiality, so silently talking http:// to a bus the
|
||||||
|
// operator secured with a CA would leak all metadata to a MITM. Connect refuses
|
||||||
|
// the plaintext URL outright (error path; the scheme is checked before any
|
||||||
|
// network use, so a bogus CA path is irrelevant).
|
||||||
|
func TestConnectRequiresHTTPSWithCA(t *testing.T) {
|
||||||
|
_, err := client.Connect("nats://127.0.0.1:4222", "http://127.0.0.1:8470", mustIdentity(t), "/nonexistent/ca.crt")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Connect with a CA and an http:// control plane must be refused")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "https") {
|
||||||
|
t.Fatalf("error should point the caller at https, got: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestControlPlaneOverTLS proves the control plane works over TLS pinned to the
|
||||||
|
// bus CA (golden) and that a client lacking the CA cannot complete the handshake
|
||||||
|
// (error path) — so a network observer can neither read nor inject control-plane
|
||||||
|
// traffic. The data plane is left plaintext here to isolate the HTTP-TLS wiring.
|
||||||
|
func TestControlPlaneOverTLS(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("store: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { store.Close() })
|
||||||
|
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("blobs: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||||
|
StoreDir: filepath.Join(dir, "js"), Host: "127.0.0.1", Port: freePort(t),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("nats: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||||
|
natsURL := embeddednats.ClientURL(ns)
|
||||||
|
|
||||||
|
// An https control plane wrapping the real membership server.
|
||||||
|
ts := httptest.NewTLSServer(membership.NewServer(store, blobs, membership.AuthOff))
|
||||||
|
t.Cleanup(ts.Close)
|
||||||
|
|
||||||
|
pool := x509.NewCertPool()
|
||||||
|
pool.AddCert(ts.Certificate())
|
||||||
|
|
||||||
|
// Golden: trusting the control-plane CA, an https control-plane request works.
|
||||||
|
good, err := client.NewWithOptions(natsURL, ts.URL, mustIdentity(t),
|
||||||
|
client.Options{CtrlTLS: &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12}})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("connect with the pinned CA: %v", err)
|
||||||
|
}
|
||||||
|
defer good.Close()
|
||||||
|
if _, err := good.CreateRoom("room.tls.ctrl", room.ModeNATS); err != nil {
|
||||||
|
t.Fatalf("control plane over TLS should succeed with the pinned CA: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error path: without the CA the https handshake fails, so the request errors.
|
||||||
|
bad, err := client.NewWithOptions(natsURL, ts.URL, mustIdentity(t),
|
||||||
|
client.Options{CtrlTLS: &tls.Config{RootCAs: x509.NewCertPool(), MinVersion: tls.VersionTLS12}})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("nats connect (bad CA case): %v", err)
|
||||||
|
}
|
||||||
|
defer bad.Close()
|
||||||
|
if _, err := bad.CreateRoom("room.tls.fail", room.ModeNATS); err == nil {
|
||||||
|
t.Fatalf("a control-plane request without the CA must fail the TLS handshake")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,124 @@
|
|||||||
|
package client_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/client"
|
||||||
|
"github.com/enmanuel/unibus/pkg/frame"
|
||||||
|
"github.com/enmanuel/unibus/pkg/room"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestAudit_NoSubjectACL ports the auditor's H4 (Alto) finding under the minimum
|
||||||
|
// defense chosen for this issue (forbid cleartext rooms in public; see
|
||||||
|
// dev/0004d-dataplane-acl.md). The NATS data plane still has no per-subject ACL,
|
||||||
|
// so the guarantee we make is CONTENT confidentiality, proven three ways:
|
||||||
|
//
|
||||||
|
// error : a cleartext (ModeNATS) room cannot be created under the public posture;
|
||||||
|
// golden: a legitimate member (bob) decrypts the secret;
|
||||||
|
// edge : eve, sniffing the raw subject off the data plane, receives only
|
||||||
|
// ciphertext — she never recovers the plaintext the auditor's eve did.
|
||||||
|
func TestAudit_NoSubjectACL(t *testing.T) {
|
||||||
|
h := newHarness(t)
|
||||||
|
h.srv.RequireEncryptedRooms = true // the public posture
|
||||||
|
waitHealth(t, h.ctrlURL)
|
||||||
|
|
||||||
|
alice, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("connect alice: %v", err)
|
||||||
|
}
|
||||||
|
defer alice.Close()
|
||||||
|
|
||||||
|
// Error path: a cleartext room is refused, so no payload ever rides a subject
|
||||||
|
// in the clear for a sniffer to read (the exact vector the auditor exploited).
|
||||||
|
if _, err := alice.CreateRoom("secret.subject.payroll", room.ModeNATS); err == nil {
|
||||||
|
t.Fatalf("cleartext room must be refused on a public deployment")
|
||||||
|
}
|
||||||
|
|
||||||
|
// alice creates an encrypted room and invites bob (the legitimate reader).
|
||||||
|
const subject = "secret.subject.payroll.e2e"
|
||||||
|
const secret = "internal: salary numbers"
|
||||||
|
roomID, err := alice.CreateRoom(subject, room.ModeMatrix)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("alice create encrypted room: %v", err)
|
||||||
|
}
|
||||||
|
bob, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("connect bob: %v", err)
|
||||||
|
}
|
||||||
|
defer bob.Close()
|
||||||
|
if err := alice.Invite(roomID, bob.Endpoint()); err != nil {
|
||||||
|
t.Fatalf("alice invite bob: %v", err)
|
||||||
|
}
|
||||||
|
if err := bob.Join(roomID); err != nil {
|
||||||
|
t.Fatalf("bob join: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Golden: bob (a member) subscribes and decrypts the secret.
|
||||||
|
var bmu sync.Mutex
|
||||||
|
var bobGot []string
|
||||||
|
bobSub, err := bob.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
|
||||||
|
bmu.Lock()
|
||||||
|
bobGot = append(bobGot, string(plaintext))
|
||||||
|
bmu.Unlock()
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("bob subscribe: %v", err)
|
||||||
|
}
|
||||||
|
defer bobSub.Unsubscribe()
|
||||||
|
|
||||||
|
// Edge: eve sniffs the raw subject directly off NATS (no membership, no key).
|
||||||
|
rawEve, err := nats.Connect(h.natsURL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("eve raw connect: %v", err)
|
||||||
|
}
|
||||||
|
defer rawEve.Close()
|
||||||
|
eveGot := make(chan []byte, 8)
|
||||||
|
if _, err := rawEve.Subscribe(subject, func(m *nats.Msg) { eveGot <- m.Data }); err != nil {
|
||||||
|
t.Fatalf("eve raw subscribe: %v", err)
|
||||||
|
}
|
||||||
|
if err := rawEve.Flush(); err != nil {
|
||||||
|
t.Fatalf("eve flush: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(200 * time.Millisecond) // let both subscriptions settle
|
||||||
|
|
||||||
|
if err := alice.Publish(roomID, []byte(secret)); err != nil {
|
||||||
|
t.Fatalf("alice publish: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// bob must decrypt the secret.
|
||||||
|
if !waitFor(&bmu, &bobGot, func(rs []string) bool {
|
||||||
|
for _, r := range rs {
|
||||||
|
if r == secret {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}, 2*time.Second) {
|
||||||
|
t.Fatalf("bob (member) should decrypt the secret; got %v", snapshot(&bmu, &bobGot))
|
||||||
|
}
|
||||||
|
|
||||||
|
// eve must receive only ciphertext — never the plaintext.
|
||||||
|
select {
|
||||||
|
case data := <-eveGot:
|
||||||
|
if bytes.Contains(data, []byte(secret)) {
|
||||||
|
t.Fatalf("eve sniffed the plaintext off the data plane: %q", data)
|
||||||
|
}
|
||||||
|
f, err := frame.Unmarshal(data)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("eve received an undecodable frame: %v", err)
|
||||||
|
}
|
||||||
|
if string(f.Payload) == secret {
|
||||||
|
t.Fatalf("eve read the secret from the frame payload")
|
||||||
|
}
|
||||||
|
if len(f.Nonce) == 0 {
|
||||||
|
t.Fatalf("expected an AEAD-encrypted payload (non-empty nonce), got cleartext frame")
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
// eve receiving nothing is also a safe outcome; the assertion is only that
|
||||||
|
// she never gets the plaintext, which holds vacuously here.
|
||||||
|
}
|
||||||
|
}
|
||||||
+68
-23
@@ -11,6 +11,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
cs "fn-registry/functions/cybersecurity"
|
cs "fn-registry/functions/cybersecurity"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/frame"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AuthMode is the control-plane authentication rollout state (feature flag
|
// AuthMode is the control-plane authentication rollout state (feature flag
|
||||||
@@ -73,6 +75,11 @@ const (
|
|||||||
const (
|
const (
|
||||||
clockSkew = 30 * time.Second
|
clockSkew = 30 * time.Second
|
||||||
nonceTTL = 60 * time.Second
|
nonceTTL = 60 * time.Second
|
||||||
|
// maxNonceCacheEntries bounds the replay cache so it cannot grow without limit
|
||||||
|
// (audit H7). With IsAuthorized now gating insertion, only authorized traffic
|
||||||
|
// is cached, so this ceiling is only approached under a legitimate burst; at
|
||||||
|
// the cap the oldest nonce is evicted (its TTL is nearly up anyway).
|
||||||
|
maxNonceCacheEntries = 100_000
|
||||||
)
|
)
|
||||||
|
|
||||||
// CanonicalRequest returns the exact bytes that are signed and verified for a
|
// CanonicalRequest returns the exact bytes that are signed and verified for a
|
||||||
@@ -89,43 +96,75 @@ func CanonicalRequest(method, path, ts, nonce string, body []byte) []byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// nonceCache remembers recently-seen nonces to reject replays. It is an
|
// nonceCache remembers recently-seen nonces to reject replays. It is an
|
||||||
// in-memory map guarded by a mutex with lazy expiry — sufficient for a single
|
// in-memory store guarded by a mutex — sufficient for a single membershipd
|
||||||
// membershipd process (the spec's chosen tradeoff over a server-issued nonce
|
// process (the spec's chosen tradeoff over a server-issued nonce round-trip). A
|
||||||
// round-trip). A distributed deployment would need a shared store.
|
// distributed deployment would need a shared store (tracked for issue 0003).
|
||||||
|
//
|
||||||
|
// Pruning is O(expired), not O(n): because the TTL is constant, insertion order
|
||||||
|
// equals expiry order, so the oldest entries (front of `order`) are exactly the
|
||||||
|
// ones that expire first (audit H7 — the previous full-map scan under the mutex
|
||||||
|
// was a CPU-amplification vector). A size cap bounds memory.
|
||||||
type nonceCache struct {
|
type nonceCache struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
seen map[string]time.Time
|
seen map[string]time.Time // nonce -> expiry
|
||||||
ttl time.Duration
|
order []string // nonces in insertion order == expiry order
|
||||||
|
ttl time.Duration
|
||||||
|
cap int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newNonceCache(ttl time.Duration) *nonceCache {
|
func newNonceCache(ttl time.Duration, capacity int) *nonceCache {
|
||||||
return &nonceCache{seen: make(map[string]time.Time), ttl: ttl}
|
return &nonceCache{seen: make(map[string]time.Time), ttl: ttl, cap: capacity}
|
||||||
}
|
}
|
||||||
|
|
||||||
// rememberOrReject records nonce and returns true if it was unseen, or false if
|
// rememberOrReject records nonce and returns true if it was unseen, or false if
|
||||||
// it is a replay (still live in the cache). Expired entries are pruned lazily on
|
// it is a replay (still live in the cache).
|
||||||
// each call so the map cannot grow without bound under steady traffic.
|
|
||||||
func (n *nonceCache) rememberOrReject(nonce string, now time.Time) bool {
|
func (n *nonceCache) rememberOrReject(nonce string, now time.Time) bool {
|
||||||
n.mu.Lock()
|
n.mu.Lock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.Unlock()
|
||||||
for k, exp := range n.seen {
|
|
||||||
if exp.Before(now) {
|
// Prune expired entries from the front (oldest first). The first live entry
|
||||||
delete(n.seen, k)
|
// ends the scan — everything behind it was inserted later and is newer.
|
||||||
|
cut := 0
|
||||||
|
for cut < len(n.order) {
|
||||||
|
exp, ok := n.seen[n.order[cut]]
|
||||||
|
if !ok {
|
||||||
|
cut++ // already evicted by the cap path below
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
if !exp.Before(now) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
delete(n.seen, n.order[cut])
|
||||||
|
cut++
|
||||||
}
|
}
|
||||||
|
if cut > 0 {
|
||||||
|
n.order = append(n.order[:0], n.order[cut:]...)
|
||||||
|
}
|
||||||
|
|
||||||
if exp, ok := n.seen[nonce]; ok && !exp.Before(now) {
|
if exp, ok := n.seen[nonce]; ok && !exp.Before(now) {
|
||||||
return false
|
return false // a live replay
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Bound memory: at capacity, evict the oldest entry (its TTL is nearly up).
|
||||||
|
for len(n.seen) >= n.cap && len(n.order) > 0 {
|
||||||
|
oldest := n.order[0]
|
||||||
|
n.order = n.order[1:]
|
||||||
|
delete(n.seen, oldest)
|
||||||
|
}
|
||||||
|
|
||||||
n.seen[nonce] = now.Add(n.ttl)
|
n.seen[nonce] = now.Add(n.ttl)
|
||||||
|
n.order = append(n.order, nonce)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// authResult is what a successful authentication yields: the verified signing
|
// authResult is what a successful authentication yields: the verified signing
|
||||||
// key (hex) and the authorized user record. Handlers may use it for fine-grained
|
// key (hex), the endpoint id derived from it, and the authorized user record.
|
||||||
// authorization (e.g. role checks) in later phases.
|
// Handlers use endpoint for membership authorization (only a member of a room
|
||||||
|
// may read its metadata/keys); user is available for role checks.
|
||||||
type authResult struct {
|
type authResult struct {
|
||||||
pubHex string
|
pubHex string
|
||||||
user User
|
endpoint string
|
||||||
|
user User
|
||||||
}
|
}
|
||||||
|
|
||||||
// authenticate verifies the signature headers on r against body and the user
|
// authenticate verifies the signature headers on r against body and the user
|
||||||
@@ -168,10 +207,9 @@ func (s *Server) authenticate(r *http.Request, body []byte, now time.Time) (auth
|
|||||||
return authResult{}, fmt.Errorf("invalid signature")
|
return authResult{}, fmt.Errorf("invalid signature")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.nonces.rememberOrReject(nonce, now) {
|
// Authorize BEFORE touching the replay cache (audit H7): an unregistered
|
||||||
return authResult{}, fmt.Errorf("replayed nonce")
|
// identity can mint valid signatures for free, so caching its nonces would let
|
||||||
}
|
// it poison/grow the cache pre-auth. Only authorized identities are remembered.
|
||||||
|
|
||||||
if !s.store.IsAuthorized(pubHex) {
|
if !s.store.IsAuthorized(pubHex) {
|
||||||
return authResult{}, fmt.Errorf("identity not authorized")
|
return authResult{}, fmt.Errorf("identity not authorized")
|
||||||
}
|
}
|
||||||
@@ -181,5 +219,12 @@ func (s *Server) authenticate(r *http.Request, body []byte, now time.Time) (auth
|
|||||||
// IsAuthorized passed but the row vanished (race with revoke): fail closed.
|
// IsAuthorized passed but the row vanished (race with revoke): fail closed.
|
||||||
return authResult{}, fmt.Errorf("identity not authorized")
|
return authResult{}, fmt.Errorf("identity not authorized")
|
||||||
}
|
}
|
||||||
return authResult{pubHex: pubHex, user: user}, nil
|
|
||||||
|
// Anti-replay last: a replayed request from an authorized identity is still
|
||||||
|
// rejected here (the nonce is already live in the cache from its first use).
|
||||||
|
if !s.nonces.rememberOrReject(nonce, now) {
|
||||||
|
return authResult{}, fmt.Errorf("replayed nonce")
|
||||||
|
}
|
||||||
|
|
||||||
|
return authResult{pubHex: pubHex, endpoint: frame.EndpointID(pub), user: user}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
cs "fn-registry/functions/cybersecurity"
|
cs "fn-registry/functions/cybersecurity"
|
||||||
|
|
||||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||||
|
"github.com/enmanuel/unibus/pkg/frame"
|
||||||
)
|
)
|
||||||
|
|
||||||
// authHarness boots an in-process membershipd HTTP server in the given auth mode
|
// authHarness boots an in-process membershipd HTTP server in the given auth mode
|
||||||
@@ -88,13 +89,24 @@ func do(t *testing.T, req *http.Request) (int, string) {
|
|||||||
return resp.StatusCode, string(b)
|
return resp.StatusCode, string(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
const okPath = "/members/alice-endpoint/rooms" // always 200 with an empty list
|
// okPath is a path that authenticates and returns 200 with an empty list when
|
||||||
|
// the request carries NO membership-bound signer (AuthOff/soft/missing-headers
|
||||||
|
// tests). Under enforce, the per-endpoint room directory is now restricted to
|
||||||
|
// the signer's own endpoint (audit H3), so tests that sign as alice use
|
||||||
|
// aliceRoomsPath instead.
|
||||||
|
const okPath = "/members/alice-endpoint/rooms"
|
||||||
|
|
||||||
|
// aliceRoomsPath is alice's own room directory — the canonical "authenticated
|
||||||
|
// and authorized" 200 path under enforce after H3.
|
||||||
|
func aliceRoomsPath(h *authHarness) string {
|
||||||
|
return "/members/" + frame.EndpointID(h.alice.SignPub) + "/rooms"
|
||||||
|
}
|
||||||
|
|
||||||
// Golden: a request signed by a registered, active identity is accepted.
|
// Golden: a request signed by a registered, active identity is accepted.
|
||||||
func TestAuthGoldenAccepted(t *testing.T) {
|
func TestAuthGoldenAccepted(t *testing.T) {
|
||||||
h := newAuthHarness(t, AuthEnforce)
|
h := newAuthHarness(t, AuthEnforce)
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
code, _ := do(t, signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-golden"))
|
code, _ := do(t, signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "nonce-golden"))
|
||||||
if code != http.StatusOK {
|
if code != http.StatusOK {
|
||||||
t.Fatalf("golden signed request should be 200, got %d", code)
|
t.Fatalf("golden signed request should be 200, got %d", code)
|
||||||
}
|
}
|
||||||
@@ -116,12 +128,12 @@ func TestAuthUnregisteredRejected(t *testing.T) {
|
|||||||
func TestAuthReplayRejected(t *testing.T) {
|
func TestAuthReplayRejected(t *testing.T) {
|
||||||
h := newAuthHarness(t, AuthEnforce)
|
h := newAuthHarness(t, AuthEnforce)
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
first := signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-replay")
|
first := signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "nonce-replay")
|
||||||
if code, body := do(t, first); code != http.StatusOK {
|
if code, body := do(t, first); code != http.StatusOK {
|
||||||
t.Fatalf("first request should be 200, got %d (%s)", code, body)
|
t.Fatalf("first request should be 200, got %d (%s)", code, body)
|
||||||
}
|
}
|
||||||
// Identical ts + nonce + signature: a replay.
|
// Identical ts + nonce + signature: a replay.
|
||||||
second := signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-replay")
|
second := signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "nonce-replay")
|
||||||
if code, body := do(t, second); code != http.StatusUnauthorized {
|
if code, body := do(t, second); code != http.StatusUnauthorized {
|
||||||
t.Fatalf("replayed request should be 401, got %d (%s)", code, body)
|
t.Fatalf("replayed request should be 401, got %d (%s)", code, body)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,119 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/hex"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cs "fn-registry/functions/cybersecurity"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/frame"
|
||||||
|
)
|
||||||
|
|
||||||
|
// seedRoom inserts an encrypted room owned by alice with a sealed key for her,
|
||||||
|
// directly through the store so the test controls membership precisely. It
|
||||||
|
// returns the room id and alice's endpoint.
|
||||||
|
func seedRoom(t *testing.T, h *authHarness, subject string) (string, string) {
|
||||||
|
t.Helper()
|
||||||
|
aliceEp := frame.EndpointID(h.alice.SignPub)
|
||||||
|
roomID := newULID()
|
||||||
|
info := RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: aliceEp, Encrypt: true}
|
||||||
|
if err := h.store.CreateRoom(info, h.alice.SignPub, h.alice.KexPub, []byte("alice-sealed-key")); err != nil {
|
||||||
|
t.Fatalf("seed room: %v", err)
|
||||||
|
}
|
||||||
|
return roomID, aliceEp
|
||||||
|
}
|
||||||
|
|
||||||
|
// register adds id to the bus allowlist so its signed requests clear auth and
|
||||||
|
// reach the handler, where membership authorization (not mere registration) is
|
||||||
|
// what the test exercises.
|
||||||
|
func register(t *testing.T, h *authHarness, id cs.Identity, handle string) {
|
||||||
|
t.Helper()
|
||||||
|
if err := h.store.AddUser(hex.EncodeToString(id.SignPub), handle, RoleMember); err != nil {
|
||||||
|
t.Fatalf("register %s: %v", handle, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestAudit_HorizontalMetadataLeak ports the auditor's H3 (Alto) finding: bob is
|
||||||
|
// REGISTERED on the bus but is NOT a member of alice's room. Before the fix the
|
||||||
|
// GET endpoints checked registration, not membership, so bob could read the
|
||||||
|
// room's subject, the full member list (with everyone's public keys), alice's
|
||||||
|
// room directory, and even alice's sealed key. Now every one of those returns
|
||||||
|
// 403 to bob, while alice (owner/member) and carol (plain member) get 200.
|
||||||
|
func TestAudit_HorizontalMetadataLeak(t *testing.T) {
|
||||||
|
h := newAuthHarness(t, AuthEnforce)
|
||||||
|
roomID, aliceEp := seedRoom(t, h, "secret.subject.payroll")
|
||||||
|
|
||||||
|
// bob: registered, never invited.
|
||||||
|
bob, _ := cs.GenerateIdentity()
|
||||||
|
register(t, h, bob, "bob")
|
||||||
|
|
||||||
|
// carol: registered AND a plain (non-owner) member — the legitimate-member edge.
|
||||||
|
carol, _ := cs.GenerateIdentity()
|
||||||
|
register(t, h, carol, "carol")
|
||||||
|
carolEp := frame.EndpointID(carol.SignPub)
|
||||||
|
if err := h.store.AddMember(roomID, Member{Endpoint: carolEp, Role: RoleMember, SignPub: carol.SignPub, KexPub: carol.KexPub}, 1, []byte("carol-sealed")); err != nil {
|
||||||
|
t.Fatalf("add carol: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n := 0
|
||||||
|
get := func(id cs.Identity, path string) int {
|
||||||
|
n++
|
||||||
|
code, _ := do(t, signedReq(t, h.ts.URL, "GET", path, nil, id, time.Now().Unix(), nonceN(n)))
|
||||||
|
return code
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error path: bob (non-member) is forbidden on every room endpoint.
|
||||||
|
bobChecks := []struct {
|
||||||
|
name string
|
||||||
|
path string
|
||||||
|
}{
|
||||||
|
{"get room", "/rooms/" + roomID},
|
||||||
|
{"list members", "/rooms/" + roomID + "/members"},
|
||||||
|
{"alice room directory", "/members/" + aliceEp + "/rooms"},
|
||||||
|
{"alice sealed key", "/rooms/" + roomID + "/key?endpoint=" + aliceEp},
|
||||||
|
{"bob sealed key in alices room", "/rooms/" + roomID + "/key?endpoint=" + frame.EndpointID(bob.SignPub)},
|
||||||
|
}
|
||||||
|
for _, c := range bobChecks {
|
||||||
|
if code := get(bob, c.path); code != http.StatusForbidden {
|
||||||
|
t.Fatalf("bob (non-member) %s should be 403, got %d", c.name, code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Golden: alice (owner/member) reads her room's metadata, members, directory, key.
|
||||||
|
aliceChecks := []string{
|
||||||
|
"/rooms/" + roomID,
|
||||||
|
"/rooms/" + roomID + "/members",
|
||||||
|
"/members/" + aliceEp + "/rooms",
|
||||||
|
"/rooms/" + roomID + "/key?endpoint=" + aliceEp,
|
||||||
|
}
|
||||||
|
for _, p := range aliceChecks {
|
||||||
|
if code := get(h.alice, p); code != http.StatusOK {
|
||||||
|
t.Fatalf("alice (owner) %s should be 200, got %d", p, code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Edge: carol is a plain member, not the owner — she may still read the room.
|
||||||
|
if code := get(carol, "/rooms/"+roomID); code != http.StatusOK {
|
||||||
|
t.Fatalf("carol (member) get room should be 200, got %d", code)
|
||||||
|
}
|
||||||
|
if code := get(carol, "/rooms/"+roomID+"/members"); code != http.StatusOK {
|
||||||
|
t.Fatalf("carol (member) list members should be 200, got %d", code)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Edge: carol may fetch her OWN sealed key but not alice's.
|
||||||
|
if code := get(carol, "/rooms/"+roomID+"/key?endpoint="+carolEp); code != http.StatusOK {
|
||||||
|
t.Fatalf("carol fetching her own key should be 200, got %d", code)
|
||||||
|
}
|
||||||
|
if code := get(carol, "/rooms/"+roomID+"/key?endpoint="+aliceEp); code != http.StatusForbidden {
|
||||||
|
t.Fatalf("carol fetching alice's key should be 403, got %d", code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// nonceN yields a distinct nonce per request so the anti-replay cache never
|
||||||
|
// rejects a fresh, legitimately-different request inside one test.
|
||||||
|
func nonceN(i int) string {
|
||||||
|
return "authz-nonce-" + strconv.Itoa(i)
|
||||||
|
}
|
||||||
@@ -0,0 +1,206 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||||
|
)
|
||||||
|
|
||||||
|
// dosServer builds a Server backed by a fresh store + blob store so a test can
|
||||||
|
// drive ServeHTTP in-process (white-box) and observe its memory behavior without
|
||||||
|
// a network round trip — the same in-process technique the auditor used.
|
||||||
|
func dosServer(t *testing.T, mode AuthMode) *Server {
|
||||||
|
t.Helper()
|
||||||
|
dir := t.TempDir()
|
||||||
|
store, err := Open(filepath.Join(dir, "unibus.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("open store: %v", err)
|
||||||
|
}
|
||||||
|
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("open blobs: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { store.Close() })
|
||||||
|
return NewServer(store, blobs, mode)
|
||||||
|
}
|
||||||
|
|
||||||
|
// zeroReader yields up to remaining zero bytes without ever allocating them, so
|
||||||
|
// the test process itself never materializes a huge buffer (which would taint the
|
||||||
|
// RSS measurement we are trying to make about the SERVER).
|
||||||
|
type zeroReader struct{ remaining int64 }
|
||||||
|
|
||||||
|
func (z *zeroReader) Read(p []byte) (int, error) {
|
||||||
|
if z.remaining <= 0 {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
n := int64(len(p))
|
||||||
|
if n > z.remaining {
|
||||||
|
n = z.remaining
|
||||||
|
}
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
p[i] = 0
|
||||||
|
}
|
||||||
|
z.remaining -= n
|
||||||
|
return int(n), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// vmRSSkB reads the resident set size (kB) of this process from /proc. Linux-only;
|
||||||
|
// the caller skips on other platforms.
|
||||||
|
func vmRSSkB(t *testing.T) int64 {
|
||||||
|
t.Helper()
|
||||||
|
b, err := os.ReadFile("/proc/self/status")
|
||||||
|
if err != nil {
|
||||||
|
t.Skipf("cannot read /proc/self/status: %v", err)
|
||||||
|
}
|
||||||
|
for _, line := range strings.Split(string(b), "\n") {
|
||||||
|
if strings.HasPrefix(line, "VmRSS:") {
|
||||||
|
f := strings.Fields(line)
|
||||||
|
if len(f) >= 2 {
|
||||||
|
v, _ := strconv.ParseInt(f[1], 10, 64)
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Skip("VmRSS not present in /proc/self/status")
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestAudit_DoSBodyLimitNoAuth ports the auditor's H1 (Critical) vector: a peer
|
||||||
|
// with NO valid signature posts an oversized body. Before the fix the middleware
|
||||||
|
// io.ReadAll'd it unbounded (the auditor sent 400 MB and watched RSS jump from
|
||||||
|
// 18 MB to 898 MB). Now the request is rejected 413 and the resident set does NOT
|
||||||
|
// spike. Two shapes are covered:
|
||||||
|
//
|
||||||
|
// (1) a truthful, over-ceiling Content-Length -> rejected before any byte is read;
|
||||||
|
// (2) a lying / unknown length (chunked) -> MaxBytesReader trips mid-read,
|
||||||
|
// capping the buffered bytes at the ceiling instead of the attacker's 400 MB.
|
||||||
|
func TestAudit_DoSBodyLimitNoAuth(t *testing.T) {
|
||||||
|
if runtime.GOOS != "linux" {
|
||||||
|
t.Skip("RSS probe is Linux-only")
|
||||||
|
}
|
||||||
|
srv := dosServer(t, AuthEnforce) // enforce: the request carries no signature
|
||||||
|
|
||||||
|
const huge = int64(400) << 20 // 400 MiB — the auditor's figure
|
||||||
|
// A spike threshold an order of magnitude below the attack. The old code would
|
||||||
|
// add ~400 MB+; the fix keeps the delta to at most one bounded buffer.
|
||||||
|
const maxSpikeKB = int64(96) << 10 // 96 MiB
|
||||||
|
|
||||||
|
// Shape 1: declared Content-Length over the blob ceiling -> early 413, no read.
|
||||||
|
runtime.GC()
|
||||||
|
before := vmRSSkB(t)
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: huge})
|
||||||
|
req.ContentLength = huge
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
srv.ServeHTTP(rec, req)
|
||||||
|
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||||
|
t.Fatalf("over-declared body should be 413, got %d", rec.Code)
|
||||||
|
}
|
||||||
|
runtime.GC()
|
||||||
|
if d := vmRSSkB(t) - before; d > maxSpikeKB {
|
||||||
|
t.Fatalf("RSS spiked %d kB on a pre-declared oversized body (limit %d kB)", d, maxSpikeKB)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shape 2: unknown length (chunked-style). The middleware cannot reject by
|
||||||
|
// Content-Length, so MaxBytesReader must cap the read at maxBlobBytes.
|
||||||
|
runtime.GC()
|
||||||
|
before = vmRSSkB(t)
|
||||||
|
req = httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: huge})
|
||||||
|
req.ContentLength = -1
|
||||||
|
rec = httptest.NewRecorder()
|
||||||
|
srv.ServeHTTP(rec, req)
|
||||||
|
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||||
|
t.Fatalf("unknown-length oversized body should be 413, got %d", rec.Code)
|
||||||
|
}
|
||||||
|
runtime.GC()
|
||||||
|
if d := vmRSSkB(t) - before; d > maxSpikeKB {
|
||||||
|
t.Fatalf("RSS spiked %d kB on a chunked oversized body (limit %d kB)", d, maxSpikeKB)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBlobLimitGoldenAndBoundary covers the golden path (a normal blob is stored)
|
||||||
|
// and the boundary (a body exactly at the ceiling is accepted; one byte over by
|
||||||
|
// truthful Content-Length is rejected before buffering).
|
||||||
|
func TestBlobLimitGoldenAndBoundary(t *testing.T) {
|
||||||
|
srv := dosServer(t, AuthOff) // AuthOff: the limits apply regardless of auth mode
|
||||||
|
|
||||||
|
// Golden: a small blob is accepted and hashed.
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader("hello blob")))
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("normal blob should be 200, got %d (%s)", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Boundary: exactly at the ceiling is allowed (MaxBytesReader permits N bytes).
|
||||||
|
atLimit := strings.Repeat("a", maxBlobBytes)
|
||||||
|
rec = httptest.NewRecorder()
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/blobs", strings.NewReader(atLimit))
|
||||||
|
req.ContentLength = int64(len(atLimit))
|
||||||
|
srv.ServeHTTP(rec, req)
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("blob exactly at the ceiling should be 200, got %d", rec.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error: one byte over the ceiling (truthful Content-Length) -> 413 pre-read.
|
||||||
|
rec = httptest.NewRecorder()
|
||||||
|
req = httptest.NewRequest(http.MethodPost, "/blobs", &zeroReader{remaining: maxBlobBytes + 1})
|
||||||
|
req.ContentLength = maxBlobBytes + 1
|
||||||
|
srv.ServeHTTP(rec, req)
|
||||||
|
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||||
|
t.Fatalf("blob one byte over the ceiling should be 413, got %d", rec.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestControlBodyLimit checks the smaller JSON ceiling on a non-blob route: a body
|
||||||
|
// over maxControlBodyBytes is rejected 413 before the handler runs.
|
||||||
|
func TestControlBodyLimit(t *testing.T) {
|
||||||
|
srv := dosServer(t, AuthOff)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/rooms", &zeroReader{remaining: maxControlBodyBytes + 1})
|
||||||
|
req.ContentLength = maxControlBodyBytes + 1
|
||||||
|
srv.ServeHTTP(rec, req)
|
||||||
|
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||||
|
t.Fatalf("control body over 1 MiB should be 413, got %d", rec.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRateLimitPerIP exercises the per-IP throttle: a burst from one IP eventually
|
||||||
|
// gets 429 (error path), while a spread across distinct IPs is never throttled
|
||||||
|
// (edge — the bucket is keyed per source, not global).
|
||||||
|
func TestRateLimitPerIP(t *testing.T) {
|
||||||
|
srv := dosServer(t, AuthOff)
|
||||||
|
|
||||||
|
// Same IP: well past the burst -> at least one 429.
|
||||||
|
got429 := false
|
||||||
|
for i := 0; i < defaultRateBurst+50; i++ {
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/rooms/none", nil)
|
||||||
|
req.RemoteAddr = "203.0.113.7:5555"
|
||||||
|
srv.ServeHTTP(rec, req)
|
||||||
|
if rec.Code == http.StatusTooManyRequests {
|
||||||
|
got429 = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !got429 {
|
||||||
|
t.Fatalf("a flood from one IP should eventually be rate-limited (429)")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Distinct IPs: each gets a fresh bucket, so none is throttled.
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/rooms/none", nil)
|
||||||
|
req.RemoteAddr = "198.51.100." + strconv.Itoa(i%254+1) + ":4444"
|
||||||
|
srv.ServeHTTP(rec, req)
|
||||||
|
if rec.Code == http.StatusTooManyRequests {
|
||||||
|
t.Fatalf("distinct IPs must not share a rate bucket; IP #%d got 429", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,51 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestNonceCacheRememberPrune covers the replay/expiry behavior directly on the
|
||||||
|
// cache: a fresh nonce is accepted (golden), an immediate repeat is rejected
|
||||||
|
// (error), and after the TTL the same nonce is accepted again because its entry
|
||||||
|
// was pruned (edge).
|
||||||
|
func TestNonceCacheRememberPrune(t *testing.T) {
|
||||||
|
nc := newNonceCache(50*time.Millisecond, 1000)
|
||||||
|
base := time.Now()
|
||||||
|
|
||||||
|
if !nc.rememberOrReject("a", base) {
|
||||||
|
t.Fatalf("first sighting should be accepted")
|
||||||
|
}
|
||||||
|
if nc.rememberOrReject("a", base) {
|
||||||
|
t.Fatalf("an immediate replay should be rejected")
|
||||||
|
}
|
||||||
|
if !nc.rememberOrReject("a", base.Add(60*time.Millisecond)) {
|
||||||
|
t.Fatalf("after the TTL the nonce should be accepted again (pruned)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNonceCacheCapBounded covers the memory bound (audit H7): with a long TTL so
|
||||||
|
// nothing expires, inserting far more nonces than the cap must still keep the
|
||||||
|
// cache at or under the cap (oldest evicted), and the order queue must not drift
|
||||||
|
// from the map.
|
||||||
|
func TestNonceCacheCapBounded(t *testing.T) {
|
||||||
|
const capacity = 100
|
||||||
|
nc := newNonceCache(time.Hour, capacity)
|
||||||
|
base := time.Now()
|
||||||
|
for i := 0; i < 500; i++ {
|
||||||
|
nc.rememberOrReject("n"+strconv.Itoa(i), base)
|
||||||
|
}
|
||||||
|
|
||||||
|
nc.mu.Lock()
|
||||||
|
size := len(nc.seen)
|
||||||
|
orderLen := len(nc.order)
|
||||||
|
nc.mu.Unlock()
|
||||||
|
|
||||||
|
if size > capacity {
|
||||||
|
t.Fatalf("cache exceeded its cap: %d > %d", size, capacity)
|
||||||
|
}
|
||||||
|
if orderLen != size {
|
||||||
|
t.Fatalf("order queue drifted from the map: order=%d seen=%d", orderLen, size)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,88 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cs "fn-registry/functions/cybersecurity"
|
||||||
|
|
||||||
|
"github.com/enmanuel/unibus/pkg/frame"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestAudit_OwnerSpoof ports the auditor's H6 finding: handleCreateRoom did not
|
||||||
|
// bind the body's declared owner to the request signer, so a registered peer
|
||||||
|
// could create rooms in another identity's name. Now the owner endpoint AND the
|
||||||
|
// owner signing key must both be the authenticated signer's.
|
||||||
|
func TestAudit_OwnerSpoof(t *testing.T) {
|
||||||
|
h := newAuthHarness(t, AuthEnforce)
|
||||||
|
|
||||||
|
bob, _ := cs.GenerateIdentity()
|
||||||
|
register(t, h, bob, "bob")
|
||||||
|
bobEp := frame.EndpointID(bob.SignPub)
|
||||||
|
victim, _ := cs.GenerateIdentity()
|
||||||
|
|
||||||
|
post := func(id cs.Identity, owner endpointJSON, nonce string) int {
|
||||||
|
body, _ := json.Marshal(createRoomReq{Subject: "some.room", Owner: owner})
|
||||||
|
code, _ := do(t, signedReq(t, h.ts.URL, "POST", "/rooms", body, id, time.Now().Unix(), nonce))
|
||||||
|
return code
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error path: bob signs, body claims victim as owner -> 403.
|
||||||
|
if code := post(bob, endpointJSON{Endpoint: frame.EndpointID(victim.SignPub), SignPub: victim.SignPub, KexPub: victim.KexPub}, "spoof-1"); code != http.StatusForbidden {
|
||||||
|
t.Fatalf("owner-spoofed create should be 403, got %d", code)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Edge: bob declares his own endpoint but a foreign signing key -> 403 (the
|
||||||
|
// key, not just the endpoint string, is bound to the signer).
|
||||||
|
if code := post(bob, endpointJSON{Endpoint: bobEp, SignPub: victim.SignPub, KexPub: victim.KexPub}, "spoof-2"); code != http.StatusForbidden {
|
||||||
|
t.Fatalf("create with a foreign owner key should be 403, got %d", code)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Golden: alice creates a room owned by herself -> 201.
|
||||||
|
aliceEp := frame.EndpointID(h.alice.SignPub)
|
||||||
|
if code := post(h.alice, endpointJSON{Endpoint: aliceEp, SignPub: h.alice.SignPub, KexPub: h.alice.KexPub}, "owner-ok"); code != http.StatusCreated {
|
||||||
|
t.Fatalf("self-owned create should be 201, got %d", code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestAudit_NonceCachePoisonPreAuth ports the auditor's H7 finding: the replay
|
||||||
|
// cache was populated BEFORE the allowlist check, so any unregistered identity
|
||||||
|
// (Ed25519 keys are free) could seed nonces into it. Now IsAuthorized runs first,
|
||||||
|
// so an unauthorized identity's nonce is never cached: a repeat of the same nonce
|
||||||
|
// still fails as "not authorized", not "replayed nonce".
|
||||||
|
func TestAudit_NonceCachePoisonPreAuth(t *testing.T) {
|
||||||
|
h := newAuthHarness(t, AuthEnforce)
|
||||||
|
|
||||||
|
eve, _ := cs.GenerateIdentity() // valid signatures, NOT on the allowlist
|
||||||
|
now := time.Now().Unix()
|
||||||
|
|
||||||
|
code1, body1 := do(t, signedReq(t, h.ts.URL, "GET", "/rooms/x", nil, eve, now, "poison-nonce"))
|
||||||
|
if code1 != http.StatusUnauthorized || !strings.Contains(body1, "not authorized") {
|
||||||
|
t.Fatalf("unregistered first request should be 401 not-authorized, got %d (%s)", code1, body1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Same nonce again: if the nonce had been cached, this would report "replayed
|
||||||
|
// nonce". It must still be "not authorized" — proving the nonce was NOT cached.
|
||||||
|
code2, body2 := do(t, signedReq(t, h.ts.URL, "GET", "/rooms/x", nil, eve, now, "poison-nonce"))
|
||||||
|
if code2 != http.StatusUnauthorized {
|
||||||
|
t.Fatalf("unregistered replay should still be 401, got %d", code2)
|
||||||
|
}
|
||||||
|
if strings.Contains(body2, "replayed") {
|
||||||
|
t.Fatalf("an unauthorized identity's nonce was cached pre-auth: %s", body2)
|
||||||
|
}
|
||||||
|
if !strings.Contains(body2, "not authorized") {
|
||||||
|
t.Fatalf("second unregistered request should still be not-authorized, got: %s", body2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Positive control: an AUTHORIZED identity's replay IS still rejected, so the
|
||||||
|
// reorder did not weaken anti-replay for legitimate traffic.
|
||||||
|
if code, _ := do(t, signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "alice-live")); code != http.StatusOK {
|
||||||
|
t.Fatalf("alice's first request should be 200, got %d", code)
|
||||||
|
}
|
||||||
|
if code, body := do(t, signedReq(t, h.ts.URL, "GET", aliceRoomsPath(h), nil, h.alice, now, "alice-live")); code != http.StatusUnauthorized || !strings.Contains(body, "replayed") {
|
||||||
|
t.Fatalf("alice's replay should be 401 replayed nonce, got %d (%s)", code, body)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,93 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ipRateLimiter is a per-source-IP token-bucket rate limiter for the control
|
||||||
|
// plane. It exists to blunt pre-auth flooding: an unauthenticated peer that
|
||||||
|
// hammers the HTTP API (signature verification is not free, and io is bounded
|
||||||
|
// but still real) is throttled before it can amplify load. Like the nonceCache,
|
||||||
|
// this is transport glue specific to unibus, not a registry primitive — the
|
||||||
|
// report 0003 made the same call for the nonce cache (it would only drag a NATS
|
||||||
|
// dependency into the multi-domain registry go.mod for one helper).
|
||||||
|
//
|
||||||
|
// Each distinct IP gets its own golang.org/x/time/rate.Limiter (a standard
|
||||||
|
// token bucket already in the module graph, so no new dependency). Idle buckets
|
||||||
|
// are reaped so the map cannot grow without bound under a churn of source IPs.
|
||||||
|
type ipRateLimiter struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
buckets map[string]*ipBucket
|
||||||
|
r rate.Limit
|
||||||
|
burst int
|
||||||
|
ttl time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
type ipBucket struct {
|
||||||
|
lim *rate.Limiter
|
||||||
|
seen time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// newIPRateLimiter builds a limiter granting r tokens/second with the given
|
||||||
|
// burst per IP. ttl bounds how long an idle bucket is retained before being
|
||||||
|
// reaped. r<=0 disables limiting (Allow always true) so dev/loopback stacks are
|
||||||
|
// unaffected.
|
||||||
|
func newIPRateLimiter(r rate.Limit, burst int, ttl time.Duration) *ipRateLimiter {
|
||||||
|
return &ipRateLimiter{
|
||||||
|
buckets: make(map[string]*ipBucket),
|
||||||
|
r: r,
|
||||||
|
burst: burst,
|
||||||
|
ttl: ttl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// allow reports whether a request from ip may proceed now, consuming one token
|
||||||
|
// on success. A disabled limiter (r<=0) always allows. Reaping of stale buckets
|
||||||
|
// is amortized: it runs only when the map has grown past a small threshold, so
|
||||||
|
// the common path is a single map lookup under the mutex.
|
||||||
|
func (l *ipRateLimiter) allow(ip string, now time.Time) bool {
|
||||||
|
if l == nil || l.r <= 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
|
||||||
|
if len(l.buckets) > 1024 {
|
||||||
|
l.reapLocked(now)
|
||||||
|
}
|
||||||
|
b, ok := l.buckets[ip]
|
||||||
|
if !ok {
|
||||||
|
b = &ipBucket{lim: rate.NewLimiter(l.r, l.burst)}
|
||||||
|
l.buckets[ip] = b
|
||||||
|
}
|
||||||
|
b.seen = now
|
||||||
|
return b.lim.AllowN(now, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reapLocked drops buckets idle for longer than ttl. The caller holds l.mu.
|
||||||
|
func (l *ipRateLimiter) reapLocked(now time.Time) {
|
||||||
|
for ip, b := range l.buckets {
|
||||||
|
if now.Sub(b.seen) > l.ttl {
|
||||||
|
delete(l.buckets, ip)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// clientIP extracts the source IP of an HTTP request, stripping the port. It
|
||||||
|
// trusts the transport's RemoteAddr only (no X-Forwarded-For parsing): a public
|
||||||
|
// deployment terminates TLS at this process or behind a proxy that the operator
|
||||||
|
// controls, and honoring an attacker-supplied header would let a single IP fan
|
||||||
|
// its quota across forged identities. If parsing fails the whole RemoteAddr is
|
||||||
|
// used as the key (still a stable per-connection bucket).
|
||||||
|
func clientIP(r *http.Request) string {
|
||||||
|
host, _, err := net.SplitHostPort(r.RemoteAddr)
|
||||||
|
if err != nil {
|
||||||
|
return r.RemoteAddr
|
||||||
|
}
|
||||||
|
return host
|
||||||
|
}
|
||||||
@@ -0,0 +1,46 @@
|
|||||||
|
package membership
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestRequireEncryptedRoomsRejectsCleartext is the control-plane half of the
|
||||||
|
// audit H4 minimum defense: with RequireEncryptedRooms on (the public posture),
|
||||||
|
// creating a cleartext (ModeNATS) room is refused 403, while an encrypted room is
|
||||||
|
// created normally. This is what guarantees no message ever rides the un-ACL'd
|
||||||
|
// NATS subject in the clear on a public deployment.
|
||||||
|
func TestRequireEncryptedRoomsRejectsCleartext(t *testing.T) {
|
||||||
|
srv := dosServer(t, AuthOff)
|
||||||
|
srv.RequireEncryptedRooms = true
|
||||||
|
|
||||||
|
create := func(encrypt bool) int {
|
||||||
|
body, _ := json.Marshal(createRoomReq{
|
||||||
|
Subject: "payroll.subject",
|
||||||
|
Policy: policyJSON{Encrypt: encrypt, Persist: encrypt, SignMsgs: encrypt},
|
||||||
|
Owner: endpointJSON{Endpoint: "owner-ep", SignPub: []byte("sp"), KexPub: []byte("kp")},
|
||||||
|
SealedKeySelf: []byte("sealed"),
|
||||||
|
})
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/rooms", bytes.NewReader(body)))
|
||||||
|
return rec.Code
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error path: a cleartext room is refused.
|
||||||
|
if code := create(false); code != http.StatusForbidden {
|
||||||
|
t.Fatalf("cleartext room under RequireEncryptedRooms should be 403, got %d", code)
|
||||||
|
}
|
||||||
|
// Golden: an encrypted room is created.
|
||||||
|
if code := create(true); code != http.StatusCreated {
|
||||||
|
t.Fatalf("encrypted room should be 201, got %d", code)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Edge: with the flag OFF (loopback/dev), cleartext rooms are allowed again.
|
||||||
|
srv.RequireEncryptedRooms = false
|
||||||
|
if code := create(false); code != http.StatusCreated {
|
||||||
|
t.Fatalf("cleartext room with the flag off should be 201, got %d", code)
|
||||||
|
}
|
||||||
|
}
|
||||||
+205
-21
@@ -2,6 +2,7 @@ package membership
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
@@ -15,7 +16,35 @@ import (
|
|||||||
|
|
||||||
cs "fn-registry/functions/cybersecurity"
|
cs "fn-registry/functions/cybersecurity"
|
||||||
|
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||||
|
"github.com/enmanuel/unibus/pkg/frame"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Body-size ceilings for the control plane. They bound how much an unauthenticated
|
||||||
|
// peer can make the server buffer in RAM before the request is even authenticated
|
||||||
|
// (the signature is verified over the full body, so the body must be read — but
|
||||||
|
// not unboundedly). maxControlBodyBytes covers JSON metadata requests; /blobs gets
|
||||||
|
// a separate, larger ceiling because media ciphertext is legitimately bigger. A
|
||||||
|
// request whose declared Content-Length already exceeds its ceiling is rejected
|
||||||
|
// before a single byte is buffered.
|
||||||
|
const (
|
||||||
|
maxControlBodyBytes = 1 << 20 // 1 MiB for JSON control-plane requests
|
||||||
|
maxBlobBytes = 16 << 20 // 16 MiB for a single media blob upload
|
||||||
|
// MaxHeaderBytes caps request header size; wired into the http.Server by the
|
||||||
|
// command. Exported so the bound lives next to its body-size siblings.
|
||||||
|
MaxHeaderBytes = 1 << 20 // 1 MiB
|
||||||
|
)
|
||||||
|
|
||||||
|
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
|
||||||
|
// human/agent bus rather than a high-QPS API: a steady ~20 req/s with a burst of
|
||||||
|
// 40 absorbs a chat client's bursty polling while throttling a flood. Loopback
|
||||||
|
// dev stacks pass r<=0 to disable limiting entirely.
|
||||||
|
const (
|
||||||
|
defaultRatePerSec = rate.Limit(20)
|
||||||
|
defaultRateBurst = 40
|
||||||
|
rateBucketTTL = 10 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// Server is the HTTP control plane: the authoritative source of room metadata,
|
// Server is the HTTP control plane: the authoritative source of room metadata,
|
||||||
@@ -32,18 +61,32 @@ type Server struct {
|
|||||||
mux *http.ServeMux
|
mux *http.ServeMux
|
||||||
authMode AuthMode
|
authMode AuthMode
|
||||||
nonces *nonceCache
|
nonces *nonceCache
|
||||||
|
limiter *ipRateLimiter
|
||||||
|
|
||||||
|
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
|
||||||
|
// rooms. It is the minimum-defensive control for the data plane (audit H4):
|
||||||
|
// the embedded NATS has no per-subject ACL, so a cleartext room is readable by
|
||||||
|
// any registered peer that knows (or guesses) its subject. Forcing every room
|
||||||
|
// to be end-to-end encrypted keeps message CONTENT confidential even when the
|
||||||
|
// transport offers no subject isolation. The command sets this on a public
|
||||||
|
// (non-loopback) bind. See dev/0004d-dataplane-acl.md for the full rationale
|
||||||
|
// and the residual metadata exposure this does NOT close.
|
||||||
|
RequireEncryptedRooms bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer wires the membership store and blob store into an http.Handler. The
|
// NewServer wires the membership store and blob store into an http.Handler. The
|
||||||
// authMode selects the control-plane auth rollout state (AuthOff for callers and
|
// authMode selects the control-plane auth rollout state (AuthOff for callers and
|
||||||
// tests that have not migrated to signed requests yet).
|
// tests that have not migrated to signed requests yet). It installs a per-IP
|
||||||
|
// rate limiter with the package defaults; loopback dev behavior is unchanged
|
||||||
|
// because the burst comfortably exceeds any single client's request rate.
|
||||||
func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server {
|
func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server {
|
||||||
s := &Server{
|
s := &Server{
|
||||||
store: store,
|
store: store,
|
||||||
blobs: blobs,
|
blobs: blobs,
|
||||||
mux: http.NewServeMux(),
|
mux: http.NewServeMux(),
|
||||||
authMode: authMode,
|
authMode: authMode,
|
||||||
nonces: newNonceCache(nonceTTL),
|
nonces: newNonceCache(nonceTTL, maxNonceCacheEntries),
|
||||||
|
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
|
||||||
}
|
}
|
||||||
s.routes()
|
s.routes()
|
||||||
return s
|
return s
|
||||||
@@ -53,23 +96,54 @@ func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server
|
|||||||
// (signature verification + anti-replay + allowlist) ahead of the router
|
// (signature verification + anti-replay + allowlist) ahead of the router
|
||||||
// according to authMode, then dispatches to the matched handler.
|
// according to authMode, then dispatches to the matched handler.
|
||||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Per-IP rate limit runs first, ahead of auth and body reads, so a flood is
|
||||||
|
// shed at the cheapest possible point. The health probe is exempt so liveness
|
||||||
|
// checks are never throttled.
|
||||||
|
if !isAuthExempt(r) && !s.limiter.allow(clientIP(r), now) {
|
||||||
|
writeErr(w, http.StatusTooManyRequests, "rate limit exceeded")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cap how much body we will buffer, BEFORE reading a single byte. The ceiling
|
||||||
|
// is per-route: /blobs may legitimately carry a media ciphertext, everything
|
||||||
|
// else is small JSON. A declared Content-Length over the ceiling is rejected
|
||||||
|
// outright (no buffering); MaxBytesReader then guards against a lying or
|
||||||
|
// chunked sender by failing the read once the limit is crossed. This is the
|
||||||
|
// fix for the pre-auth DoS: without it an unauthenticated peer could make the
|
||||||
|
// server buffer an unbounded body in RAM before authenticate() ever ran.
|
||||||
|
limit := int64(maxControlBodyBytes)
|
||||||
|
if r.Method == http.MethodPost && r.URL.Path == "/blobs" {
|
||||||
|
limit = int64(maxBlobBytes)
|
||||||
|
}
|
||||||
|
if r.ContentLength > limit {
|
||||||
|
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.Body = http.MaxBytesReader(w, r.Body, limit)
|
||||||
|
|
||||||
if s.authMode == AuthOff || isAuthExempt(r) {
|
if s.authMode == AuthOff || isAuthExempt(r) {
|
||||||
s.mux.ServeHTTP(w, r)
|
s.mux.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Buffer the body so the signature can be verified over it and the handler
|
// Buffer the (now bounded) body so the signature can be verified over it and
|
||||||
// still reads it. Bodies on the control plane are small (JSON metadata or a
|
// the handler still reads it.
|
||||||
// media blob already capped upstream), so full buffering is acceptable.
|
|
||||||
body, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
|
if isBodyTooLarge(err) {
|
||||||
|
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeErr(w, http.StatusBadRequest, "read body")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_ = r.Body.Close()
|
_ = r.Body.Close()
|
||||||
r.Body = io.NopCloser(bytes.NewReader(body))
|
r.Body = io.NopCloser(bytes.NewReader(body))
|
||||||
|
|
||||||
if _, err := s.authenticate(r, body, time.Now()); err != nil {
|
res, err := s.authenticate(r, body, now)
|
||||||
|
if err != nil {
|
||||||
if s.authMode == AuthSoft {
|
if s.authMode == AuthSoft {
|
||||||
log.Printf("[auth] soft: would reject %s %s: %v", r.Method, r.URL.Path, err)
|
log.Printf("[auth] soft: would reject %s %s: %v", r.Method, r.URL.Path, err)
|
||||||
s.mux.ServeHTTP(w, r)
|
s.mux.ServeHTTP(w, r)
|
||||||
@@ -78,7 +152,53 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
|
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.mux.ServeHTTP(w, r)
|
// Carry the authenticated signer's endpoint into the handler so room handlers
|
||||||
|
// can authorize by membership (audit H3). Only set on a verified identity.
|
||||||
|
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// isBodyTooLarge reports whether err is the sentinel returned by MaxBytesReader
|
||||||
|
// when the body exceeds its limit, so the middleware can map it to 413.
|
||||||
|
func isBodyTooLarge(err error) bool {
|
||||||
|
var maxErr *http.MaxBytesError
|
||||||
|
return errors.As(err, &maxErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ctxKey is the unexported type for this package's request-context keys, so the
|
||||||
|
// values cannot collide with keys set by other packages.
|
||||||
|
type ctxKey int
|
||||||
|
|
||||||
|
const ctxSignerEndpoint ctxKey = iota
|
||||||
|
|
||||||
|
// withSigner returns a context carrying the authenticated signer's endpoint id.
|
||||||
|
func withSigner(ctx context.Context, endpoint string) context.Context {
|
||||||
|
return context.WithValue(ctx, ctxSignerEndpoint, endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
// signerEndpoint returns the authenticated signer's endpoint id and whether one
|
||||||
|
// is present. It is absent under AuthOff (no verification) and when a soft-mode
|
||||||
|
// request was let through unauthenticated — in both cases membership
|
||||||
|
// authorization is skipped, preserving dev/legacy behavior.
|
||||||
|
func signerEndpoint(r *http.Request) (string, bool) {
|
||||||
|
v, ok := r.Context().Value(ctxSignerEndpoint).(string)
|
||||||
|
return v, ok && v != ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// requireMember authorizes a room request by membership (audit H3): it returns
|
||||||
|
// the signer endpoint and true when the request may proceed, or writes 403 and
|
||||||
|
// returns false when an authenticated signer is not a member of roomID. When no
|
||||||
|
// authenticated signer is present (AuthOff/dev, or soft pass-through) it allows
|
||||||
|
// the request — membership is only enforced once the caller's identity is known.
|
||||||
|
func (s *Server) requireMember(w http.ResponseWriter, r *http.Request, roomID string) (string, bool) {
|
||||||
|
signer, ok := signerEndpoint(r)
|
||||||
|
if !ok {
|
||||||
|
return "", true
|
||||||
|
}
|
||||||
|
if _, err := s.store.GetMember(roomID, signer); err != nil {
|
||||||
|
writeErr(w, http.StatusForbidden, "forbidden: not a member of this room")
|
||||||
|
return signer, false
|
||||||
|
}
|
||||||
|
return signer, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// isAuthExempt lists requests that bypass control-plane auth even under enforce.
|
// isAuthExempt lists requests that bypass control-plane auth even under enforce.
|
||||||
@@ -188,6 +308,15 @@ func writeErr(w http.ResponseWriter, code int, msg string) {
|
|||||||
writeJSON(w, code, map[string]string{"error": msg})
|
writeJSON(w, code, map[string]string{"error": msg})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// writeServerErr logs the internal error detail and returns ONLY a generic
|
||||||
|
// message to the client (audit H12): raw store/blob errors embed SQL fragments
|
||||||
|
// and filesystem paths, which must not leak to a caller. Use it for any error
|
||||||
|
// that originates inside the server (5xx, or a not-found wrapping a store error).
|
||||||
|
func writeServerErr(w http.ResponseWriter, r *http.Request, code int, publicMsg string, err error) {
|
||||||
|
log.Printf("[handler] %s %s -> %d: %v", r.Method, r.URL.Path, code, err)
|
||||||
|
writeErr(w, code, publicMsg)
|
||||||
|
}
|
||||||
|
|
||||||
// canonicalSig returns the bytes to verify for a request: the request struct
|
// canonicalSig returns the bytes to verify for a request: the request struct
|
||||||
// re-marshaled with its Sig field cleared. The caller passes a copy with Sig
|
// re-marshaled with its Sig field cleared. The caller passes a copy with Sig
|
||||||
// already zeroed. This is symmetric with how the client signs.
|
// already zeroed. This is symmetric with how the client signs.
|
||||||
@@ -232,6 +361,24 @@ func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeErr(w, http.StatusBadRequest, "subject and owner.endpoint required")
|
writeErr(w, http.StatusBadRequest, "subject and owner.endpoint required")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Data-plane minimum defense (audit H4): on a public deployment cleartext
|
||||||
|
// rooms are disabled, so no message ever rides the un-ACL'd NATS subject in
|
||||||
|
// the clear for another registered peer to sniff.
|
||||||
|
if s.RequireEncryptedRooms && !req.Policy.Encrypt {
|
||||||
|
writeErr(w, http.StatusForbidden,
|
||||||
|
"cleartext rooms are disabled on this deployment; create an encrypted (Matrix-policy) room")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Owner binding (audit H6): the declared owner must BE the authenticated
|
||||||
|
// signer — both the endpoint id and the signing key. Otherwise a registered
|
||||||
|
// peer could create rooms in another identity's name. Enforced only when an
|
||||||
|
// authenticated signer is present (AuthOff/dev trusts the caller).
|
||||||
|
if signer, ok := signerEndpoint(r); ok {
|
||||||
|
if req.Owner.Endpoint != signer || frame.EndpointID(req.Owner.SignPub) != signer {
|
||||||
|
writeErr(w, http.StatusForbidden, "forbidden: room owner must be the authenticated signer")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
roomID := newULID()
|
roomID := newULID()
|
||||||
info := RoomInfo{
|
info := RoomInfo{
|
||||||
RoomID: roomID,
|
RoomID: roomID,
|
||||||
@@ -242,7 +389,7 @@ func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
|
|||||||
OwnerEndpoint: req.Owner.Endpoint,
|
OwnerEndpoint: req.Owner.Endpoint,
|
||||||
}
|
}
|
||||||
if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil {
|
if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil {
|
||||||
writeErr(w, http.StatusInternalServerError, err.Error())
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusCreated, createRoomResp{RoomID: roomID})
|
writeJSON(w, http.StatusCreated, createRoomResp{RoomID: roomID})
|
||||||
@@ -264,7 +411,7 @@ func (s *Server) handleInvite(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
info, err := s.store.GetRoom(roomID)
|
info, err := s.store.GetRoom(roomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr(w, http.StatusNotFound, err.Error())
|
writeServerErr(w, r, http.StatusNotFound, "room not found", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
m := Member{
|
m := Member{
|
||||||
@@ -274,7 +421,7 @@ func (s *Server) handleInvite(w http.ResponseWriter, r *http.Request) {
|
|||||||
KexPub: req.Member.KexPub,
|
KexPub: req.Member.KexPub,
|
||||||
}
|
}
|
||||||
if err := s.store.AddMember(roomID, m, info.Epoch, req.SealedKey); err != nil {
|
if err := s.store.AddMember(roomID, m, info.Epoch, req.SealedKey); err != nil {
|
||||||
writeErr(w, http.StatusInternalServerError, err.Error())
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, map[string]string{"status": "invited"})
|
writeJSON(w, http.StatusOK, map[string]string{"status": "invited"})
|
||||||
@@ -287,6 +434,20 @@ func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeErr(w, http.StatusBadRequest, "endpoint query param required")
|
writeErr(w, http.StatusBadRequest, "endpoint query param required")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// A sealed room key is sealed to one identity's X25519 key. Serving it only to
|
||||||
|
// that identity (the signer) stops a registered peer from harvesting another
|
||||||
|
// member's sealed key (audit H3). Membership is implied by owning a sealed key,
|
||||||
|
// but we also require the signer to be a member for defense in depth.
|
||||||
|
if signer, ok := signerEndpoint(r); ok {
|
||||||
|
if endpoint != signer {
|
||||||
|
writeErr(w, http.StatusForbidden, "forbidden: may only fetch your own sealed key")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err := s.store.GetMember(roomID, signer); err != nil {
|
||||||
|
writeErr(w, http.StatusForbidden, "forbidden: not a member of this room")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
epoch := 0
|
epoch := 0
|
||||||
if e := r.URL.Query().Get("epoch"); e != "" {
|
if e := r.URL.Query().Get("epoch"); e != "" {
|
||||||
if n, err := strconv.Atoi(e); err == nil {
|
if n, err := strconv.Atoi(e); err == nil {
|
||||||
@@ -300,7 +461,7 @@ func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) {
|
|||||||
"not invited to this encrypted room: no key has been sealed for your identity. Ask the room owner to invite you before joining.")
|
"not invited to this encrypted room: no key has been sealed for your identity. Ask the room owner to invite you before joining.")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeErr(w, http.StatusInternalServerError, err.Error())
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, keyResp{Epoch: ep, SealedKey: sealed})
|
writeJSON(w, http.StatusOK, keyResp{Epoch: ep, SealedKey: sealed})
|
||||||
@@ -308,9 +469,14 @@ func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
func (s *Server) handleListMembers(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleListMembers(w http.ResponseWriter, r *http.Request) {
|
||||||
roomID := r.PathValue("id")
|
roomID := r.PathValue("id")
|
||||||
|
// Membership authorization (audit H3): the member list exposes every member's
|
||||||
|
// sign_pub + kex_pub, so it must not be served to a non-member.
|
||||||
|
if _, ok := s.requireMember(w, r, roomID); !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
members, err := s.store.ListMembers(roomID)
|
members, err := s.store.ListMembers(roomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr(w, http.StatusInternalServerError, err.Error())
|
writeErr(w, http.StatusInternalServerError, "internal error")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
out := make([]memberJSON, 0, len(members))
|
out := make([]memberJSON, 0, len(members))
|
||||||
@@ -326,9 +492,15 @@ func (s *Server) handleListMemberRooms(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeErr(w, http.StatusBadRequest, "endpoint required")
|
writeErr(w, http.StatusBadRequest, "endpoint required")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// A peer may only enumerate its OWN room directory (audit H3): otherwise any
|
||||||
|
// registered identity could map another's entire social graph of rooms.
|
||||||
|
if signer, ok := signerEndpoint(r); ok && endpoint != signer {
|
||||||
|
writeErr(w, http.StatusForbidden, "forbidden: may only list your own rooms")
|
||||||
|
return
|
||||||
|
}
|
||||||
rooms, err := s.store.ListRoomsForEndpoint(endpoint)
|
rooms, err := s.store.ListRoomsForEndpoint(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr(w, http.StatusInternalServerError, err.Error())
|
writeErr(w, http.StatusInternalServerError, "internal error")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
out := make([]memberRoomJSON, 0, len(rooms))
|
out := make([]memberRoomJSON, 0, len(rooms))
|
||||||
@@ -346,9 +518,12 @@ func (s *Server) handleListMemberRooms(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
func (s *Server) handleGetRoom(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleGetRoom(w http.ResponseWriter, r *http.Request) {
|
||||||
roomID := r.PathValue("id")
|
roomID := r.PathValue("id")
|
||||||
|
if _, ok := s.requireMember(w, r, roomID); !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
info, err := s.store.GetRoom(roomID)
|
info, err := s.store.GetRoom(roomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr(w, http.StatusNotFound, err.Error())
|
writeErr(w, http.StatusNotFound, "room not found")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, roomResp{
|
writeJSON(w, http.StatusOK, roomResp{
|
||||||
@@ -378,7 +553,7 @@ func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) {
|
|||||||
// Bump epoch, then store the fresh sealed keys for the remaining members,
|
// Bump epoch, then store the fresh sealed keys for the remaining members,
|
||||||
// then remove the kicked/left members.
|
// then remove the kicked/left members.
|
||||||
if err := s.store.BumpEpoch(roomID, req.NewEpoch); err != nil {
|
if err := s.store.BumpEpoch(roomID, req.NewEpoch); err != nil {
|
||||||
writeErr(w, http.StatusInternalServerError, err.Error())
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
keys := make(map[string][]byte, len(req.Keys))
|
keys := make(map[string][]byte, len(req.Keys))
|
||||||
@@ -387,13 +562,13 @@ func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
if len(keys) > 0 {
|
if len(keys) > 0 {
|
||||||
if err := s.store.PutSealedKeys(roomID, req.NewEpoch, keys); err != nil {
|
if err := s.store.PutSealedKeys(roomID, req.NewEpoch, keys); err != nil {
|
||||||
writeErr(w, http.StatusInternalServerError, err.Error())
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, ep := range req.Remove {
|
for _, ep := range req.Remove {
|
||||||
if err := s.store.RemoveMember(roomID, ep); err != nil {
|
if err := s.store.RemoveMember(roomID, ep); err != nil {
|
||||||
writeErr(w, http.StatusInternalServerError, err.Error())
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -401,14 +576,23 @@ func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handlePutBlob(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handlePutBlob(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// The body arrives already bounded: ServeHTTP wraps it in a MaxBytesReader
|
||||||
|
// (maxBlobBytes) and rejects an over-declared Content-Length before this
|
||||||
|
// handler runs, in every auth mode. Reading here therefore cannot buffer
|
||||||
|
// more than the ceiling; a sender that lies about its length (e.g. chunked)
|
||||||
|
// trips MaxBytesReader and we map that to 413 rather than a generic 400.
|
||||||
data, err := io.ReadAll(r.Body)
|
data, err := io.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
|
if isBodyTooLarge(err) {
|
||||||
|
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeErr(w, http.StatusBadRequest, "read body")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
hash, err := s.blobs.Put(data)
|
hash, err := s.blobs.Put(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr(w, http.StatusInternalServerError, err.Error())
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, blobResp{Hash: hash})
|
writeJSON(w, http.StatusOK, blobResp{Hash: hash})
|
||||||
@@ -422,7 +606,7 @@ func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
data, err := s.blobs.Get(hash)
|
data, err := s.blobs.Get(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr(w, http.StatusNotFound, err.Error())
|
writeServerErr(w, r, http.StatusNotFound, "not found", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.Header().Set("Content-Type", "application/octet-stream")
|
w.Header().Set("Content-Type", "application/octet-stream")
|
||||||
|
|||||||
Reference in New Issue
Block a user