28 Commits

Author SHA1 Message Date
agent bcd02716d5 docs(issues): queue 0002 (media v2), 0003 (decentralization HA), 0004 (security hardening)
Specs for the next three bus issues, derived from this session:
- 0002 media v2: chunking, mimetype, object store GC, exposure in clients.
- 0003 decentralization/HA: NATS cluster magnus+homer (R1→R3), control plane
  SQLite→JetStream KV, quorum, failover. Third node = homer (141.94.69.66).
- 0004 hardening: closes the findings of the red-team audit (report 0004):
  pre-auth DoS, fail-open, membership-based authorization, NATS ACL, control-plane TLS.
2026-06-07 14:04:33 +02:00
egutierrez 484a07d6fd Merge issue/0001e-migrate-clients: secure-by-default clients, bus-auth enforce
Phase 0001e of issue 0001. client.Connect(caPath) is the single seam every
peer uses: with the bundled ca.crt it connects with TLS + nkey and signs the
control plane (enforce); without it, legacy plaintext dev. worker/chat gain
--ca, the mobile NewSession gains caPath, membershipd gains --tls-cert/--tls-key
and turns on the nkey authenticator under enforce. dev/feature_flags.json
declares the target state (bus-auth enforce, bus-tls on); the gateway and
unibots migrations are documented as notes (dev/0001e-remaining-clients.md).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:49:42 +02:00
egutierrez 04e27518af test(client): secure bus end-to-end (auth + TLS + E2E together)
TestSecureBusEndToEnd boots the server with control-plane enforce, NATS nkey
auth, and TLS all on; two registered peers connect with nkey+TLS, A creates a
Matrix room, invites B, publishes, and B decrypts — proving the three layers
compose. This is the headline golden of issue 0001.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:49:19 +02:00
egutierrez 6b0916f1fa docs(0001e): note remaining gateway and unibots migration
The web gateway (playground) and unibots (in the agents repo) are not migrated
here: the gateway stays a local dev tool at AuthOff, and the bot transport
lives outside this sub-repo. dev/0001e-remaining-clients.md records exactly
what each needs (client.Connect with ca.crt, identity registered in the
allowlist) and the operator server flags for phase 0001f.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:49:19 +02:00
egutierrez 87dbc421cd chore(flags): flip bus-auth to enforce and bus-tls on (target state)
Declares the project's target rollout: bus-auth enforce, bus-tls enabled.
Flags are declarative; the operator activates them at deploy via membershipd
--bus-auth/--tls-cert/--tls-key. CLI defaults stay off so dev and tests run
unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:49:19 +02:00
egutierrez b647779521 feat(membershipd): enable NATS nkey auth (enforce) and TLS flags
Opens the store before NATS so the authenticator can consult IsAuthorized.
Under --bus-auth enforce the embedded NATS gets the nkey authenticator (only
allowlisted identities connect); --tls-cert/--tls-key make it present the
server certificate and require TLS. External NATS manages its own auth/TLS.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:49:19 +02:00
egutierrez 74c8d4f941 feat(client,cmd,mobile): connect securely via client.Connect(caPath)
client.Connect is the single migration seam: a non-empty caPath connects with
TLS pinned to the bus CA plus nkey auth (matching enforce + bus-tls), an empty
caPath keeps the legacy plaintext dev connection; control-plane requests are
signed either way. worker and chat gain a --ca flag; the gomobile NewSession
gains a caPath parameter so the Android app bundles ca.crt and connects
securely. Every peer now flows through one code path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:49:19 +02:00
egutierrez 2ccd11b68c Merge issue/0001d-tls: TLS on the NATS data plane (self-signed CA)
Phase 0001d of issue 0001. embeddednats grows a ServerConfig with an optional
TLS config; the client can pin the bus's self-signed CA via Options.TLS built
from busauth.LoadCATLSConfig. deploy/tls/generate-certs.sh mints the CA and a
server cert (SAN: public IP, WG IP, om, localhost) — only the public ca.crt is
versioned, private keys are gitignored. A client trusting the CA completes the
handshake; one without it fails. TLS stays off until phase 0001e wires it in.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:44:22 +02:00
egutierrez 75939a192c test: TLS data plane end to end + CA/keypair loaders
client/tls_test: mints a throwaway CA + server cert in-memory; a client
pinning the CA completes the handshake and operates (golden), a client
without the CA fails the handshake (error path). busauth/tls_test: golden
load of a CA PEM and a server keypair, plus error paths (missing file,
non-PEM). Harness body extracted to bootHarness(ctrlMode, natsAuth, natsTLS).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:44:13 +02:00
egutierrez 1b56f14c20 feat(deploy/tls): self-signed CA + server cert generator
generate-certs.sh mints the bus CA and a NATS server certificate whose SANs
cover the public IP (135.125.201.30), the WireGuard IP (10.42.0.1), the om
hostname, and localhost/127.0.0.1 for on-host smoke tests (all overridable via
env). Only the public ca.crt is committed; ca.key, server.key and server.crt
are gitignored and distributed out of band. README documents generation, use
and rotation.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:44:13 +02:00
egutierrez 2786ae2dde feat(busauth,client): pin the bus CA over TLS
busauth.LoadCATLSConfig turns a ca.crt path into a *tls.Config trusting only
that private CA (clients must pin it; the system roots would reject a
self-signed server cert). busauth.ServerTLSConfig loads the server keypair.
client.Options gains TLS; NewWithOptions calls nats.Secure when set, so the
data-plane connection is encrypted and the server pinned.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:44:13 +02:00
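The CA pinning described in the commit above can be sketched with the standard library alone. This is a minimal illustration, not the body of `busauth.LoadCATLSConfig`: `caTLSConfig` and `mintThrowawayCA` are hypothetical names, and the sketch takes PEM bytes rather than a file path so it can run without touching disk.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"errors"
	"fmt"
	"math/big"
	"time"
)

// caTLSConfig returns a client TLS config whose only trust anchor is the CA
// in caPEM. Hypothetical stand-in for busauth.LoadCATLSConfig.
func caTLSConfig(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificate found in CA PEM")
	}
	// Setting RootCAs replaces the system roots for this connection.
	return &tls.Config{RootCAs: pool}, nil
}

// mintThrowawayCA creates a self-signed P-256 CA in memory (demo/test only).
func mintThrowawayCA() ([]byte, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "throwaway-ca"},
		NotBefore:             time.Now().Add(-time.Minute),
		NotAfter:              time.Now().Add(time.Hour),
		IsCA:                  true,
		BasicConstraintsValid: true,
		KeyUsage:              x509.KeyUsageCertSign,
	}
	// Self-signed: the template is its own parent.
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, err
	}
	return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der}), nil
}

func main() {
	caPEM, err := mintThrowawayCA()
	if err != nil {
		panic(err)
	}
	cfg, err := caTLSConfig(caPEM)
	fmt.Println("pinned config built:", cfg != nil, "err:", err)

	_, err = caTLSConfig([]byte("not a PEM file"))
	fmt.Println("non-PEM input:", err)
}
```

Because the pool contains only the bus CA, a server certificate signed by any other authority fails verification, which is the pinning behavior the commit describes.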
egutierrez 6d3d6d2562 refactor(embeddednats): ServerConfig with optional TLS
Collapses Start/StartHost/StartHostAuth onto StartServer(ServerConfig) so
auth and a TLS config can be set without growing the parameter list further.
When TLS is set the server presents the certificate and requires TLS on the
data plane; the wrappers preserve the existing no-auth/no-TLS behavior.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:44:13 +02:00
egutierrez 217daae472 Merge issue/0001c-nats-nkey: NATS nkey authentication
Phase 0001c of issue 0001. The data plane now authenticates with each peer's
Ed25519 identity reused as a NATS nkey: busauth converts the identity to an
nkey and back, embeddednats installs a CustomClientAuthentication that verifies
the nkey signature and checks the user allowlist on every connection (live
revocation, no restart), and the client opts into nkey via NewWithOptions. The
embedded server stays open by default so dev stacks and existing tests are
unaffected.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:37:59 +02:00
egutierrez 00058ea0af test(client): NATS nkey auth end to end
Harness gains newHarnessFull(ctrlMode, natsAuth) wiring the nkey authenticator
to the user allowlist; NATS auth and HTTP auth are independent so each plane
is tested in isolation. TestNatsNkeyAuth: registered peer connects with nkey
and operates (golden); unregistered peer and no-nkey client refused at connect
(error paths); peer revoked at runtime refused on its next connection without
a restart (edge).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:37:59 +02:00
egutierrez 1630f6f163 feat(client): opt-in nkey NATS connection via NewWithOptions
nats.go refuses to connect with an nkey to a server that does not advertise
nkey auth, so the connection cannot blindly always present one. New keeps the
legacy plain connection; NewWithOptions(Options{UseNkey:true}) presents the
peer's identity-derived nkey. NewWithOptions is the single place the data-plane
connection is built, so every peer gets identical behavior from the same
Options (TLS fields arrive in phase 0001d).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:37:59 +02:00
egutierrez b09bafe242 feat(embeddednats): nkey CustomClientAuthentication against the allowlist
busauth.NewNkeyAuthenticator verifies a client's nkey signature over the
server nonce (decoding like nats-server: raw-url then std base64), maps the
nkey to its Ed25519 hex, and consults an injected IsAuthorized predicate.
Checking on every connection (rather than a static Options.Nkeys map) means
revoking a user denies its next connection with no restart. embeddednats
gains StartHostAuth(auth) and sets AlwaysEnableNonce so the server advertises
the nonce nkey clients need; Start/StartHost stay open (auth=nil) for dev.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:37:46 +02:00
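The signature decoding order mentioned in the commit above (raw-url first, then std base64) might look like the following sketch. `decodeSig` and `encodeBoth` are illustrative names, not functions from busauth or nats-server.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// decodeSig tries unpadded URL-safe base64 first and falls back to standard
// base64, the order the commit message attributes to nats-server.
func decodeSig(s string) ([]byte, error) {
	if b, err := base64.RawURLEncoding.DecodeString(s); err == nil {
		return b, nil
	}
	return base64.StdEncoding.DecodeString(s)
}

// encodeBoth returns the same bytes in both encodings (demo helper).
func encodeBoth(b []byte) (rawURL, std string) {
	return base64.RawURLEncoding.EncodeToString(b), base64.StdEncoding.EncodeToString(b)
}

func main() {
	// These two bytes encode differently in the two alphabets, and the
	// standard form carries padding, so the fallback path is exercised.
	rawURL, std := encodeBoth([]byte{0xfb, 0xff})
	a, errA := decodeSig(rawURL)
	b, errB := decodeSig(std)
	fmt.Println(rawURL, std, a, b, errA, errB)
}
```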
egutierrez 413dd61041 feat(busauth): Ed25519<->NATS nkey conversion with round-trip test
A NATS nkey is an Ed25519 keypair, so the bus reuses each peer's signing
identity for the data plane instead of minting new key material. ClientNkey
derives the user nkey public string and a nonce-signing callback from the
peer's Ed25519 private key (its first 32 bytes are the nkey seed);
SignPubHexFromNkey maps a presented nkey back to the allowlist's hex key;
NkeyPublicFromSignPub is the public-only derivation.

This is NATS-specific transport glue kept in the app, not promoted to the
registry, to avoid pulling nats-io/nkeys into the multi-domain registry
module. The dedicated round-trip test runs first (spec requirement): it
proves the nkey signature equals the identity's raw Ed25519 signature and
that the nkey maps back to the identity's hex.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:37:46 +02:00
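The key reuse described in the commit above rests on one property of Go's `crypto/ed25519`: the 64-byte private key is the 32-byte seed followed by the public key. The sketch below shows only that property with the standard library; it does not call `nats-io/nkeys`, and `demoKey`, `seedOf`, `pubHexOf` and `roundTrip` are hypothetical helpers, not the busauth functions.

```go
package main

import (
	"bytes"
	"crypto/ed25519"
	"encoding/hex"
	"fmt"
)

// demoKey derives a key from a fixed all-zero seed so the example is
// repeatable. Never use a constant seed outside a demo.
func demoKey() ed25519.PrivateKey {
	return ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize))
}

// seedOf returns the 32-byte seed: the first half of the private key.
func seedOf(priv ed25519.PrivateKey) []byte {
	return priv[:ed25519.SeedSize]
}

// pubHexOf renders the public half as lowercase hex.
func pubHexOf(priv ed25519.PrivateKey) string {
	return hex.EncodeToString(priv.Public().(ed25519.PublicKey))
}

// roundTrip rebuilds the key from its seed and checks that both the public
// key and a signature over the same message match the original identity.
func roundTrip(priv ed25519.PrivateKey) bool {
	rebuilt := ed25519.NewKeyFromSeed(seedOf(priv))
	msg := []byte("server nonce")
	samePub := pubHexOf(rebuilt) == pubHexOf(priv)
	sameSig := bytes.Equal(ed25519.Sign(rebuilt, msg), ed25519.Sign(priv, msg))
	return samePub && sameSig
}

func main() {
	priv := demoKey()
	fmt.Println("seed bytes:", len(seedOf(priv)))
	fmt.Println("public hex chars:", len(pubHexOf(priv)))
	fmt.Println("round trip ok:", roundTrip(priv))
}
```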
egutierrez 89e0d0e64a Merge issue/0001b-ctrl-auth: signed control-plane auth + anti-replay
Phase 0001b of issue 0001. The control plane (membershipd HTTP) now supports
the bus-auth rollout off->soft->enforce: clients sign every request with their
Ed25519 identity (headers over method/path/ts/nonce/sha256(body)); the server
verifies the signature, clock skew (+/-30s), nonce replay (60s TTL cache), and
the user allowlist. Revocation denies access on the next request without a
restart. Default stays off so master keeps working.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:32:06 +02:00
egutierrez 2130eaa44d test: control-plane auth middleware + end-to-end enforce
membership/auth_test: golden (signed+registered accepted), error paths
(unregistered 401, replayed nonce 401, clock skew 401, tampered body 401,
missing headers 401), exemptions (healthz, soft allows, off no-op).
client_test: end-to-end with the real client against an enforce server —
registered peer accepted, unregistered rejected, revoked peer denied without
a server restart.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:31:58 +02:00
egutierrez 567e604fc7 chore(playground): pass AuthOff to NewServer (gateway not yet migrated)
The local dev gateway has not adopted signed requests; tracked for phase
0001e. Keeps it working while the NewServer signature gains the auth mode.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:31:58 +02:00
egutierrez 0f8a38d62b feat(membershipd): --bus-auth flag selects control-plane auth mode
Maps off|soft|enforce to membership.AuthMode and wires it into NewServer.
Defaults to off so existing deployments are unaffected until the operator
opts into the rollout.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:31:58 +02:00
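A flag-to-mode mapping like the one described above could be sketched as follows. The string-backed `AuthMode` type and the `AuthSoft` constant name are assumptions for illustration; the diff only confirms `membership.ParseAuthMode` and `membership.AuthEnforce`.

```go
package main

import "fmt"

// AuthMode is the control-plane rollout state (representation assumed).
type AuthMode string

const (
	AuthOff     AuthMode = "off"
	AuthSoft    AuthMode = "soft" // constant name assumed
	AuthEnforce AuthMode = "enforce"
)

// parseAuthMode maps a flag value to an AuthMode and rejects anything else,
// so a typo fails at startup instead of silently disabling auth.
func parseAuthMode(s string) (AuthMode, error) {
	switch m := AuthMode(s); m {
	case AuthOff, AuthSoft, AuthEnforce:
		return m, nil
	default:
		return "", fmt.Errorf("invalid --bus-auth %q (want off|soft|enforce)", s)
	}
}

func main() {
	for _, v := range []string{"off", "soft", "enforce", "strict"} {
		m, err := parseAuthMode(v)
		fmt.Printf("%q -> mode=%q err=%v\n", v, m, err)
	}
}
```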
egutierrez e0ef3a27cc feat(client): sign every control-plane request (transport auth headers)
doJSON, putBlob and getBlob now go through newSignedRequest, which attaches
X-Unibus-Pub/Ts/Nonce/Sig signing membership.CanonicalRequest with the peer's
Ed25519 key. GETs are signed too so the server can authenticate the caller
uniformly under enforce. The payload-level owner signature (invite/rekey)
is unchanged and coexists with this transport-level signature.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:31:50 +02:00
egutierrez 3e39e23fe0 feat(membership): signed control-plane auth middleware + anti-replay
Adds the bus-auth rollout (off|soft|enforce) to the control plane. The
middleware verifies an Ed25519 request signature over CanonicalRequest
(method, request-URI, ts, nonce, sha256(body)), checks the timestamp is
within +/-30s, rejects replayed nonces via an in-memory TTL cache (60s), and
requires the signer to be an active user in the allowlist. soft logs
rejections but lets requests through so clients can migrate without an
outage; off is the legacy no-op default. /healthz is exempt so health probes
work before any identity exists. CanonicalRequest is exported as the single
source of truth shared with the client.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:31:50 +02:00
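The checks listed in the commit above (signature over the canonical request, +/-30s skew, 60s nonce cache) can be sketched as below. This is not the real middleware: the newline-joined canonical form, the Unix-seconds timestamp, the `/demo` path and every identifier here are assumptions, and the allowlist lookup is left out.

```go
package main

import (
	"crypto/ed25519"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"
)

const (
	maxSkew  = 30 * time.Second // +/-30s window from the commit message
	nonceTTL = 60 * time.Second // replay cache lifetime from the commit message
)

// signedReq carries the fields the signature covers, plus the signature.
type signedReq struct {
	method, uri, nonce string
	ts                 int64 // Unix seconds (assumed unit)
	body, sig          []byte
}

// canonical joins method, URI, ts, nonce and sha256(body). The separator and
// hex digest are assumptions, not the real CanonicalRequest format.
func canonical(r signedReq) []byte {
	sum := sha256.Sum256(r.body)
	return []byte(strings.Join([]string{
		r.method, r.uri, strconv.FormatInt(r.ts, 10), r.nonce, hex.EncodeToString(sum[:]),
	}, "\n"))
}

// nonceCache remembers each nonce until its TTL passes.
type nonceCache struct {
	mu   sync.Mutex
	seen map[string]time.Time // nonce -> expiry
}

func newNonceCache() *nonceCache { return &nonceCache{seen: map[string]time.Time{}} }

// firstUse records the nonce and reports false when it is a live duplicate.
func (c *nonceCache) firstUse(nonce string, now time.Time) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	for n, exp := range c.seen { // sweep expired entries
		if now.After(exp) {
			delete(c.seen, n)
		}
	}
	if _, dup := c.seen[nonce]; dup {
		return false
	}
	c.seen[nonce] = now.Add(nonceTTL)
	return true
}

// verify checks the signature, then clock skew, then nonce replay.
func verify(pub ed25519.PublicKey, r signedReq, cache *nonceCache, now time.Time) error {
	if !ed25519.Verify(pub, canonical(r), r.sig) {
		return errors.New("bad signature")
	}
	if d := now.Sub(time.Unix(r.ts, 0)); d < -maxSkew || d > maxSkew {
		return errors.New("timestamp outside allowed skew")
	}
	if !cache.firstUse(r.nonce, now) {
		return errors.New("replayed nonce")
	}
	return nil
}

// demo signs one request, then verifies it fresh, replayed, and tampered.
func demo() (fresh, replayed, tampered error) {
	priv := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // fixed demo key only
	pub := priv.Public().(ed25519.PublicKey)
	now := time.Unix(1700000000, 0)
	req := signedReq{method: "POST", uri: "/demo", nonce: "n1", ts: now.Unix(), body: []byte("{}")}
	req.sig = ed25519.Sign(priv, canonical(req))
	cache := newNonceCache()
	fresh = verify(pub, req, cache, now)
	replayed = verify(pub, req, cache, now)
	req.body, req.nonce = []byte(`{"x":1}`), "n2" // signature no longer matches
	tampered = verify(pub, req, cache, now)
	return fresh, replayed, tampered
}

func main() {
	fresh, replayed, tampered := demo()
	fmt.Println(fresh, replayed, tampered)
}
```

Verifying the signature before touching the nonce cache means an unauthenticated caller cannot fill the cache with junk nonces; whether the real middleware orders the checks this way is not stated in the commit.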
egutierrez e9711bf74b Merge issue/0001a-users: bus user allowlist (store + CLI + migration 002)
Phase 0001a of issue 0001 (bus auth + TLS). Adds the users table, store CRUD
(AddUser/GetUser/ListUsers/RevokeUser/IsAuthorized/HasAdmin), the local
'membershipd user' admin CLI for seeding the first admin, and the bus-auth /
bus-tls feature flags (both off). No behavior change yet: the allowlist is
not consulted until phase 0001b wires the control-plane middleware.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:23:29 +02:00
egutierrez 822982b71b test(membership): cover user store golden/edge/error paths
Golden: add -> get -> IsAuthorized true, admin seeded. Edge: empty role
defaults to member, case-insensitive hex lookup, list ordering, revoke
denies authorization and stamps revoked_at. Error: duplicate key
(ErrUserExists), invalid role, empty sign_pub, unknown user not authorized,
revoke of unknown/already-revoked. Plus users-table migration idempotency.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:23:23 +02:00
egutierrez ddc6cabc24 feat(flags): declare bus-auth and bus-tls feature flags (off)
bus-auth carries the off -> soft -> enforce rollout state; bus-tls is a
boolean. Both start disabled so master keeps compiling and passing tests
while the auth/TLS code lands behind them across phases 0001a-0001e.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:23:23 +02:00
egutierrez 0d7ab22d4a feat(membershipd): add 'user add/list/revoke' local admin CLI
Local administration surface for the user allowlist, dispatched before the
server flag set parses os.Args. It opens the SQLite store directly with no
network or auth: running on the bus host is trusted by design, which is how
the first admin is seeded (breaking the chicken-egg of needing an admin to
add an admin). Validates that sign-pub is a 32-byte Ed25519 key in hex and
tolerates the sign-pub positional appearing before or after --db.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:23:16 +02:00
egutierrez c5387028e0 feat(membership): add 002_users.sql migration and user CRUD store
Bus-level user allowlist (issue 0001a): the authoritative directory of
Ed25519 signing identities permitted to use the bus, independent of room
membership. Migration is additive and mirrored byte-for-byte between the
module-root migrations/ and the embedded pkg/membership/migrations/.

Store adds AddUser/GetUser/ListUsers/RevokeUser/IsAuthorized/HasAdmin.
IsAuthorized is the single fail-closed predicate both the control plane and
the NATS data plane will consult, so revocation is a status flip that denies
access on both without a restart.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:23:11 +02:00
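The fail-closed behavior of `IsAuthorized` can be illustrated with an in-memory stand-in for the SQLite store. The `allowlist` type, its methods other than `IsAuthorized`, and the `"active"`/`"revoked"` status strings are assumptions made for this sketch.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// allowlist is an in-memory stand-in for the users table.
type allowlist struct {
	mu     sync.RWMutex
	status map[string]string // lowercase sign_pub hex -> "active" | "revoked"
}

func newAllowlist() *allowlist { return &allowlist{status: map[string]string{}} }

// add registers a key as active; keys are normalized to lowercase hex.
func (a *allowlist) add(signPubHex string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.status[strings.ToLower(signPubHex)] = "active"
}

// revoke flips a known key to revoked; unknown keys are left untouched.
func (a *allowlist) revoke(signPubHex string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	key := strings.ToLower(signPubHex)
	if _, ok := a.status[key]; ok {
		a.status[key] = "revoked"
	}
}

// IsAuthorized is fail-closed: unknown, revoked, and empty keys all return
// false, because a missing map entry yields the empty string.
func (a *allowlist) IsAuthorized(signPubHex string) bool {
	a.mu.RLock()
	defer a.mu.RUnlock()
	return a.status[strings.ToLower(signPubHex)] == "active"
}

func main() {
	a := newAllowlist()
	a.add("AB12")
	fmt.Println(a.IsAuthorized("ab12"), a.IsAuthorized("ffff"))
	a.revoke("ab12")
	fmt.Println(a.IsAuthorized("AB12"))
}
```

Because both planes call the same predicate on every request or connection, a single status flip is enough to deny a user everywhere without a restart.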
31 changed files with 2612 additions and 78 deletions
+13 -12
@@ -27,11 +27,12 @@ import (
func main() {
var (
-natsURL = flag.String("nats-url", "nats://127.0.0.1:4250", "NATS url")
-ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "membershipd control-plane url")
-roomSub = flag.String("room", "proc.test.ticks", "room subject to subscribe to")
-idFile = flag.String("id-file", "./local_files/chat.id", "identity file path")
-demoEnc = flag.Bool("demo-encrypted", false, "run the encrypted forward-secrecy demo")
+natsURL = flag.String("nats-url", "nats://127.0.0.1:4250", "NATS url")
+ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "membershipd control-plane url")
+roomSub = flag.String("room", "proc.test.ticks", "room subject to subscribe to")
+idFile = flag.String("id-file", "./local_files/chat.id", "identity file path")
+demoEnc = flag.Bool("demo-encrypted", false, "run the encrypted forward-secrecy demo")
+caFile = flag.String("ca", "", "path to the bus CA cert (ca.crt); set to connect with TLS + nkey to a secured bus")
)
flag.Parse()
@@ -39,19 +40,19 @@ func main() {
log.SetPrefix("[chat] ")
if *demoEnc {
-runEncryptedDemo(*natsURL, *ctrlURL)
+runEncryptedDemo(*natsURL, *ctrlURL, *caFile)
return
}
-runSimple(*natsURL, *ctrlURL, *roomSub, *idFile)
+runSimple(*natsURL, *ctrlURL, *roomSub, *idFile, *caFile)
}
// runSimple subscribes to a cleartext subject and prints messages live.
-func runSimple(natsURL, ctrlURL, roomSub, idFile string) {
+func runSimple(natsURL, ctrlURL, roomSub, idFile, caFile string) {
id, err := client.LoadOrCreateIdentity(idFile)
if err != nil {
log.Fatalf("identity: %v", err)
}
-c, err := client.New(natsURL, ctrlURL, id)
+c, err := client.Connect(natsURL, ctrlURL, id, caFile)
if err != nil {
log.Fatalf("connect: %v", err)
}
@@ -91,7 +92,7 @@ func shortID(id string) string {
}
// runEncryptedDemo proves E2E encryption + forward secrecy end-to-end.
-func runEncryptedDemo(natsURL, ctrlURL string) {
+func runEncryptedDemo(natsURL, ctrlURL, caFile string) {
log.Printf("=== encrypted forward-secrecy demo ===")
pass := true
check := func(name string, ok bool) {
@@ -109,10 +110,10 @@ func runEncryptedDemo(natsURL, ctrlURL string) {
idB, err := newEphemeralIdentity()
must(err, "generate B identity")
-a, err := client.New(natsURL, ctrlURL, idA)
+a, err := client.Connect(natsURL, ctrlURL, idA, caFile)
must(err, "connect A")
defer a.Close()
-b, err := client.New(natsURL, ctrlURL, idB)
+b, err := client.Connect(natsURL, ctrlURL, idB, caFile)
must(err, "connect B")
defer b.Close()
+63 -20
@@ -17,11 +17,22 @@ import (
server "github.com/nats-io/nats-server/v2/server"
"github.com/enmanuel/unibus/pkg/blobstore"
+"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/membership"
)
func main() {
+// Subcommand dispatch: `membershipd user ...` is the local administration CLI
+// (seed/list/revoke bus users) and must be handled before the server flag set
+// parses os.Args. Running the CLI on the bus host is trusted by design (whoever
+// has a shell there already controls the service), which is how the first admin
+// is seeded without a chicken-egg auth problem.
+if len(os.Args) > 1 && os.Args[1] == "user" {
+runUserCLI(os.Args[2:])
+return
+}
var (
bind = flag.String("bind", "127.0.0.1", "network interface to bind the HTTP API and the embedded NATS to; use 0.0.0.0 to accept LAN/remote peers")
natsURL = flag.String("nats-url", "", "external NATS url; empty starts an embedded server")
@@ -30,31 +41,22 @@ func main() {
storeDir = flag.String("store-dir", "./local_files/blobs", "blob store directory")
natsPort = flag.Int("nats-port", 4250, "embedded NATS listen port (when --nats-url empty)")
natsStore = flag.String("nats-store", "./local_files/jetstream", "embedded JetStream store dir")
+busAuth = flag.String("bus-auth", "off", "control-plane auth rollout: off|soft|enforce (feature flag bus-auth)")
+tlsCert = flag.String("tls-cert", "", "path to the NATS server certificate (deploy/tls/server.crt); enables TLS on the embedded data plane")
+tlsKey = flag.String("tls-key", "", "path to the NATS server private key (deploy/tls/server.key); required with --tls-cert")
)
flag.Parse()
+authMode, err := membership.ParseAuthMode(*busAuth)
+if err != nil {
+log.Fatalf("%v", err)
+}
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[membershipd] ")
-// Data plane: embedded or external NATS.
-var ns *server.Server
-natsClientURL := *natsURL
-if natsClientURL == "" {
-var err error
-// Bind the embedded NATS to the same interface as the HTTP API so a single
-// --bind flag governs reachability: 127.0.0.1 keeps the whole stack
-// loopback-only; 0.0.0.0 exposes both planes to the LAN.
-ns, err = embeddednats.StartHost(*natsStore, *bind, *natsPort)
-if err != nil {
-log.Fatalf("start embedded nats: %v", err)
-}
-natsClientURL = embeddednats.ClientURL(ns)
-log.Printf("embedded NATS (JetStream) ready: %s", natsClientURL)
-} else {
-log.Printf("using external NATS: %s", natsClientURL)
-}
-// Control plane: SQLite store + blob store + HTTP API.
+// Control plane store first: the NATS authenticator consults IsAuthorized, so
+// the store must exist before the embedded server starts.
store, err := membership.Open(*dbPath)
if err != nil {
log.Fatalf("open membership store: %v", err)
@@ -68,7 +70,48 @@ func main() {
}
log.Printf("blob store: %s", *storeDir)
-srv := membership.NewServer(store, blobs)
+// Data plane: embedded or external NATS. For the embedded server, enforce
+// turns on the nkey authenticator (only allowlisted identities may connect)
+// and --tls-cert/--tls-key turn on TLS. An external NATS manages its own
+// auth/TLS, so those flags do not apply to it.
+var ns *server.Server
+natsClientURL := *natsURL
+if natsClientURL == "" {
+cfg := embeddednats.ServerConfig{
+// Bind the embedded NATS to the same interface as the HTTP API so a
+// single --bind flag governs reachability: 127.0.0.1 keeps the whole
+// stack loopback-only; 0.0.0.0 exposes both planes to the LAN.
+StoreDir: *natsStore,
+Host: *bind,
+Port: *natsPort,
+}
+if authMode == membership.AuthEnforce {
+cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
+log.Printf("NATS nkey authentication: ON (enforce)")
+}
+if *tlsCert != "" || *tlsKey != "" {
+if *tlsCert == "" || *tlsKey == "" {
+log.Fatalf("--tls-cert and --tls-key must be set together")
+}
+tlsCfg, err := busauth.ServerTLSConfig(*tlsCert, *tlsKey)
+if err != nil {
+log.Fatalf("load NATS TLS: %v", err)
+}
+cfg.TLS = tlsCfg
+log.Printf("NATS TLS: ON (%s)", *tlsCert)
+}
+ns, err = embeddednats.StartServer(cfg)
+if err != nil {
+log.Fatalf("start embedded nats: %v", err)
+}
+natsClientURL = embeddednats.ClientURL(ns)
+log.Printf("embedded NATS (JetStream) ready: %s", natsClientURL)
+} else {
+log.Printf("using external NATS: %s", natsClientURL)
+}
+srv := membership.NewServer(store, blobs, authMode)
+log.Printf("control-plane auth: %s", authMode)
addr := *bind + ":" + *httpPort
httpSrv := &http.Server{Addr: addr, Handler: srv}
+178
@@ -0,0 +1,178 @@
package main
import (
"encoding/hex"
"flag"
"fmt"
"os"
"strings"
"text/tabwriter"
"github.com/enmanuel/unibus/pkg/membership"
)
// runUserCLI implements `membershipd user <add|list|revoke> ...`, the local
// administration surface for the bus user allowlist. It opens the SQLite store
// directly (no network, no auth): it is meant to run on the bus host, where
// shell access already implies full control. This is the seam that seeds the
// first admin, breaking the chicken-egg of "you need an admin to add an admin".
//
// On error the function exits the process with a non-zero status, so it
// composes cleanly in shell scripts and systemd ExecStartPre hooks; on success
// it returns and main exits normally.
func runUserCLI(args []string) {
if len(args) == 0 {
userUsage()
os.Exit(2)
}
sub, rest := args[0], args[1:]
switch sub {
case "add":
userAdd(rest)
case "list":
userList(rest)
case "revoke":
userRevoke(rest)
case "-h", "--help", "help":
userUsage()
os.Exit(0)
default:
fmt.Fprintf(os.Stderr, "membershipd user: unknown subcommand %q\n\n", sub)
userUsage()
os.Exit(2)
}
}
func userUsage() {
fmt.Fprint(os.Stderr, `usage: membershipd user <command> [flags]
commands:
add Register a bus user from their Ed25519 signing public key
list List all registered users
revoke Revoke a user (denies access on both planes immediately)
examples:
membershipd user add --handle alice --sign-pub <64-hex> --role admin
membershipd user list
membershipd user revoke <64-hex>
common flags:
--db <path> SQLite database path (default ./local_files/unibus.db)
`)
}
const defaultDBPath = "./local_files/unibus.db"
// openStore opens the membership store at path, exiting on failure. Migrations
// (including 002_users.sql) are applied by membership.Open, so a fresh database
// gets the users table on first use of the CLI.
func openStore(path string) *membership.Store {
store, err := membership.Open(path)
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd user: open store %q: %v\n", path, err)
os.Exit(1)
}
return store
}
// validateSignPubHex ensures the key is exactly a 32-byte Ed25519 public key in
// hex (64 hex chars). Catching this here turns a silent "authorized nobody" into
// an explicit error at seed time.
func validateSignPubHex(signPub string) error {
b, err := hex.DecodeString(signPub)
if err != nil {
return fmt.Errorf("sign-pub is not valid hex: %w", err)
}
if len(b) != 32 {
return fmt.Errorf("sign-pub must be a 32-byte Ed25519 public key (64 hex chars), got %d bytes", len(b))
}
return nil
}
func userAdd(args []string) {
fs := flag.NewFlagSet("user add", flag.ExitOnError)
handle := fs.String("handle", "", "human-readable user name (required)")
signPub := fs.String("sign-pub", "", "Ed25519 signing public key in hex (required)")
role := fs.String("role", membership.RoleMember, "role: admin or member")
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
_ = fs.Parse(args)
if *handle == "" || *signPub == "" {
fmt.Fprintln(os.Stderr, "membershipd user add: --handle and --sign-pub are required")
os.Exit(2)
}
if err := validateSignPubHex(*signPub); err != nil {
fmt.Fprintf(os.Stderr, "membershipd user add: %v\n", err)
os.Exit(2)
}
store := openStore(*dbPath)
defer store.Close()
if err := store.AddUser(*signPub, *handle, *role); err != nil {
fmt.Fprintf(os.Stderr, "membershipd user add: %v\n", err)
os.Exit(1)
}
fmt.Printf("added user %q (%s) role=%s\n", *handle, *signPub, *role)
}
func userList(args []string) {
fs := flag.NewFlagSet("user list", flag.ExitOnError)
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
_ = fs.Parse(args)
store := openStore(*dbPath)
defer store.Close()
users, err := store.ListUsers()
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd user list: %v\n", err)
os.Exit(1)
}
if len(users) == 0 {
fmt.Println("(no users)")
return
}
w := tabwriter.NewWriter(os.Stdout, 0, 2, 2, ' ', 0)
fmt.Fprintln(w, "HANDLE\tROLE\tSTATUS\tSIGN_PUB\tCREATED")
for _, u := range users {
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", u.Handle, u.Role, u.Status, u.SignPub, u.CreatedAt)
}
_ = w.Flush()
}
func userRevoke(args []string) {
fs := flag.NewFlagSet("user revoke", flag.ExitOnError)
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
// Go's flag package stops at the first non-flag argument, so `revoke <key>
// --db path` would otherwise leave --db unparsed. Pull a leading positional
// (the sign-pub) off the front before parsing so both `revoke <key> --db p`
// and `revoke --db p <key>` work for the operator.
var signPub string
if len(args) > 0 && !strings.HasPrefix(args[0], "-") {
signPub, args = args[0], args[1:]
}
_ = fs.Parse(args)
if signPub == "" {
if rest := fs.Args(); len(rest) == 1 {
signPub = rest[0]
}
}
if signPub == "" {
fmt.Fprintln(os.Stderr, "membershipd user revoke: exactly one <sign-pub> argument required")
os.Exit(2)
}
if err := validateSignPubHex(signPub); err != nil {
fmt.Fprintf(os.Stderr, "membershipd user revoke: %v\n", err)
os.Exit(2)
}
store := openStore(*dbPath)
defer store.Close()
if err := store.RevokeUser(signPub); err != nil {
fmt.Fprintf(os.Stderr, "membershipd user revoke: %v\n", err)
os.Exit(1)
}
fmt.Printf("revoked user %s\n", signPub)
}
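The positional-before-flags handling in `userRevoke` can be isolated into a small, testable function. The sketch below mirrors the approach shown in the diff but is not the committed code: `parseRevoke` is a hypothetical name, and it returns an error instead of calling `os.Exit` so it can be exercised without a database.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"io"
	"strings"
)

// parseRevoke accepts the sign-pub either before or after the flags. Go's
// flag package stops at the first non-flag argument, so a leading positional
// is pulled off before parsing.
func parseRevoke(args []string) (signPub, db string, err error) {
	fs := flag.NewFlagSet("user revoke", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep the demo quiet on parse errors
	dbPath := fs.String("db", "./local_files/unibus.db", "SQLite database path")

	if len(args) > 0 && !strings.HasPrefix(args[0], "-") {
		signPub, args = args[0], args[1:]
	}
	if err = fs.Parse(args); err != nil {
		return "", "", err
	}
	// No leading positional: accept exactly one trailing positional instead.
	if signPub == "" && fs.NArg() == 1 {
		signPub = fs.Arg(0)
	}
	if signPub == "" {
		return "", "", errors.New("exactly one <sign-pub> argument required")
	}
	return signPub, *dbPath, nil
}

func main() {
	for _, args := range [][]string{
		{"abc", "--db", "p"},
		{"--db", "p", "abc"},
		{},
	} {
		key, db, err := parseRevoke(args)
		fmt.Printf("%v -> key=%q db=%q err=%v\n", args, key, db, err)
	}
}
```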
+2 -1
@@ -23,6 +23,7 @@ func main() {
ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "membershipd control-plane url")
roomSub = flag.String("room", "proc.test.ticks", "room subject to publish to")
idFile = flag.String("id-file", "./local_files/worker.id", "identity file path")
+caFile = flag.String("ca", "", "path to the bus CA cert (ca.crt); set to connect with TLS + nkey to a secured bus")
)
flag.Parse()
@@ -33,7 +34,7 @@ func main() {
if err != nil {
log.Fatalf("identity: %v", err)
}
-c, err := client.New(*natsURL, *ctrlURL, id)
+c, err := client.Connect(*natsURL, *ctrlURL, id, *caFile)
if err != nil {
log.Fatalf("connect: %v", err)
}
+6
@@ -0,0 +1,6 @@
# Private keys and the deploy-specific server certificate never go to git.
# Only the public CA certificate (ca.crt) is versioned, because clients embed it.
*.key
*.csr
*.srl
server.crt
+56
@@ -0,0 +1,56 @@
# Bus TLS — self-signed CA and server certificate
The unibus data plane (NATS) is encrypted with TLS using the project's own
self-signed CA. The bus is exposed publicly, protected by auth + TLS, so the CA
is private (not Let's Encrypt) and every client we control embeds the public
`ca.crt`; the server presents `server.crt`/`server.key`.
## Files
| File | Secret? | Goes where |
|---|---|---|
| `ca.crt` | no (public) | versioned in git; embedded/distributed to every client |
| `ca.key` | **yes** | stays on the machine that mints certs; gitignored |
| `server.crt` | no | deployed to the bus host; gitignored (deploy-specific SANs) |
| `server.key` | **yes** | deployed to the bus host over a secure channel; gitignored |
Only `ca.crt` is committed. `ca.key`, `server.key`, `server.crt`, and any
`*.csr`/`*.srl` are gitignored — see `.gitignore`.
## Generate
```bash
cd deploy/tls
./generate-certs.sh # CA (if missing) + server cert with default SANs
./generate-certs.sh --force # also regenerate the CA (invalidates pinned clients)
```
The server certificate's SANs cover the public IP, the WireGuard IP, the om
hostname, plus `localhost`/`127.0.0.1` for on-host smoke tests. Override the
defaults via environment variables:
```bash
UNIBUS_PUBLIC_IP=135.125.201.30 UNIBUS_WG_IP=10.42.0.1 UNIBUS_HOSTNAME=om ./generate-certs.sh
```
Verify the SANs:
```bash
openssl x509 -in server.crt -noout -text | grep -A1 'Subject Alternative Name'
```
## Use
- **Server** (`membershipd`, phase 0001e): point it at `server.crt`/`server.key`
so the embedded NATS presents the certificate and requires TLS. Built with
`busauth.ServerTLSConfig(certPath, keyPath)`.
- **Clients** (Go peers, mobile binding, gateway): pin `ca.crt` with
`busauth.LoadCATLSConfig(caPath)` and pass the result as `client.Options.TLS`.
## Rotation
The CA is long-lived (10 years). Rotate the server certificate (825 days) by
re-running `generate-certs.sh` (without `--force`) and redeploying
`server.crt`/`server.key`; clients are unaffected because they pin the CA, not
the server cert. Rotating the CA (`--force`) requires redistributing `ca.crt` to
every client.
+11
@@ -0,0 +1,11 @@
-----BEGIN CERTIFICATE-----
MIIBfTCCASOgAwIBAgIUW2HZJDDlixxw/DgNP/IDIrJ7MeMwCgYIKoZIzj0EAwIw
FDESMBAGA1UEAwwJdW5pYnVzLWNhMB4XDTI2MDYwNzEwNDIyNloXDTM2MDYwNDEw
NDIyNlowFDESMBAGA1UEAwwJdW5pYnVzLWNhMFkwEwYHKoZIzj0CAQYIKoZIzj0D
AQcDQgAEe2by5l9dcEbqKB11yJtPIH9S/01XNhuFnBB/IpDevO2fWLLV+muqoB8C
ADH1wKleq8jF5D0sSlK2DCuYrjAjPqNTMFEwHQYDVR0OBBYEFABX+UI7bXICRF4l
WmmDR/rUtxnrMB8GA1UdIwQYMBaAFABX+UI7bXICRF4lWmmDR/rUtxnrMA8GA1Ud
EwEB/wQFMAMBAf8wCgYIKoZIzj0EAwIDSAAwRQIgCAeOYTKvA6SBB8xMdMdqNrp1
20OPyi2BwFovW6vTCLMCIQC1qRi8SGRHTui8BVqIvp/DFJaZ/U8ocAg/qedLdy+R
/w==
-----END CERTIFICATE-----
+64
@@ -0,0 +1,64 @@
#!/usr/bin/env bash
#
# generate-certs.sh — mint the unibus bus's self-signed CA and the NATS server
# certificate. Run once on a trusted machine; distribute ca.crt to clients and
# server.crt/server.key to the bus host (server.key by a secure channel, never
# git). Re-running regenerates the server cert; pass --force to also regenerate
# the CA (which invalidates every client that pinned the old ca.crt).
#
# SANs cover the public IP, the WireGuard IP, the om hostname, plus localhost so
# the operator can smoke-test the TLS handshake on the box. Override via env:
# UNIBUS_PUBLIC_IP (default 135.125.201.30)
# UNIBUS_WG_IP (default 10.42.0.1)
# UNIBUS_HOSTNAME (default om)
#
# Key material: EC P-256 (widely supported by Go's crypto/tls and nats-server).
set -euo pipefail
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$DIR"
PUBLIC_IP="${UNIBUS_PUBLIC_IP:-135.125.201.30}"
WG_IP="${UNIBUS_WG_IP:-10.42.0.1}"
HOSTNAME_OM="${UNIBUS_HOSTNAME:-om}"
DAYS_CA=3650
DAYS_SRV=825
force=0
[[ "${1:-}" == "--force" ]] && force=1
# --- CA (long-lived; only the cert is public) ---
if [[ ! -f ca.crt || ! -f ca.key || $force -eq 1 ]]; then
  echo "==> generating CA"
  openssl ecparam -name prime256v1 -genkey -noout -out ca.key
  chmod 600 ca.key
  openssl req -x509 -new -key ca.key -sha256 -days "$DAYS_CA" \
    -subj "/CN=unibus-ca" -out ca.crt
else
  echo "==> reusing existing CA (pass --force to regenerate)"
fi
# --- server certificate, signed by the CA, with the bus SANs ---
echo "==> generating server certificate (SAN: $PUBLIC_IP, $WG_IP, $HOSTNAME_OM, localhost, 127.0.0.1)"
openssl ecparam -name prime256v1 -genkey -noout -out server.key
chmod 600 server.key
openssl req -new -key server.key -subj "/CN=unibus-bus" -out server.csr
cat > server.ext <<EOF
subjectAltName=IP:${PUBLIC_IP},IP:${WG_IP},DNS:${HOSTNAME_OM},DNS:localhost,IP:127.0.0.1
extendedKeyUsage=serverAuth
keyUsage=digitalSignature,keyEncipherment
EOF
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
-sha256 -days "$DAYS_SRV" -extfile server.ext -out server.crt
rm -f server.csr server.ext ca.srl
echo "==> done:"
echo " ca.crt -> embed/distribute to every client (public)"
echo " server.crt -> deploy to the bus host"
echo " server.key -> deploy to the bus host over a secure channel (NEVER git)"
echo
echo "verify SANs with:"
echo " openssl x509 -in server.crt -noout -text | grep -A1 'Subject Alternative Name'"
+55
@@ -0,0 +1,55 @@
# Issue 0001e — remaining client migrations (notes, NOT implemented)
Phase 0001e migrated the first-class Go clients and the mobile binding to the
secure connection path (`client.Connect(caPath)` → TLS + nkey; control-plane
requests are always signed). Two consumers are intentionally **left as notes**
because they live outside this sub-repo or need their own coordination:
## 1. Web gateway (`playground/server.go`)
The playground is a local dev gateway that embeds its own membershipd
(`membership.NewServer(..., AuthOff)`) and an open embedded NATS, and connects
browser sessions through an in-process client. To run it against a **secured**
bus it would need:
- Connect its internal client via `client.Connect(natsURL, ctrlURL, id, caPath)`
with the bundled `ca.crt` (it currently builds the client without options).
- If it should itself enforce auth on the browser-facing side, start its
embedded membershipd with an auth mode and its embedded NATS with
`embeddednats.StartServer(ServerConfig{Auth: ..., TLS: ...})` — but a local
dev gateway typically stays open and only the *upstream* bus is secured.
- The gateway's own bus identity must be registered in the upstream allowlist
(`membershipd user add`).
Decision: left at `AuthOff` + plaintext for now (local dev tool). Migrate when
the gateway is pointed at the public bus.
## 2. unibots (`shell/transportunibus`, in the agents repo — NOT this sub-repo)
The bot transport lives in the `agents_and_robots` / message_bus consumer, not
in `dataforge/unibus`. To talk to the secured bus it must, after recompiling
against this `pkg/client`:
- Switch its connect call to `client.Connect(natsURL, ctrlURL, id, caPath)`,
passing the path to the bundled `ca.crt`.
- Ship `ca.crt` alongside the bot binary (read-only) and point `caPath` at it.
- Register each bot's identity (`hex(SignPub)`) in the bus allowlist via
`membershipd user add --handle <bot> --sign-pub <hex>` on the bus host.
- Run as `systemd --user` with `caPath` set, per the deploy plan (0001f).
No code change is possible from this sub-repo; this is the contract the bot
transport consumes.
## Server enablement (operator, phase 0001f)
`membershipd` now accepts:
- `--bus-auth enforce` — verify signed control-plane requests AND turn on the
NATS nkey authenticator (only allowlisted identities connect).
- `--tls-cert deploy/tls/server.crt --tls-key deploy/tls/server.key` — present
the server certificate and require TLS on the embedded NATS.
`dev/feature_flags.json` now declares both `bus-auth: enforce` and
`bus-tls: enabled` as the project's target state. The flags are declarative;
the operator activates them at deploy time with the flags above. The CLI
defaults remain off so local dev and the test suite are unaffected.
+19
@@ -0,0 +1,19 @@
{
  "flags": {
    "bus-auth": {
      "enabled": true,
      "state": "enforce",
      "issue": "0001",
      "description": "Signed control-plane auth + NATS nkey auth. Rollout: off -> soft (verify+log, allow) -> enforce (reject). 'enabled' mirrors state!=off. Server opts in via membershipd --bus-auth; clients via client.Connect(caPath).",
      "added": "2026-06-07",
      "enabled_at": "2026-06-07"
    },
    "bus-tls": {
      "enabled": true,
      "issue": "0001",
      "description": "TLS on the NATS data plane using the project's self-signed CA (deploy/tls/). Server opts in via membershipd --tls-cert/--tls-key; clients pin ca.crt via client.Connect(caPath).",
      "added": "2026-06-07",
      "enabled_at": "2026-06-07"
    }
  }
}
+146
@@ -0,0 +1,146 @@
---
issue: 0002
title: Media v2 - large files (chunking), metadata, object store GC, exposure in clients
status: spec
created: 2026-06-07
domain: media
scope: unibus (pkg/blobstore, pkg/frame, pkg/client, pkg/membership) + clients (mobile binding, web gateway, unibots)
depends_on: 0001 (the signed control-plane auth must cover /blobs before media is exposed)
---
# Objective
Sending files (images, audio, video) already works in v1, but with limits that make
it unworkable for large video and awkward for clients to use. This issue brings
media to production quality: large files in chunks, type/name metadata, garbage
collection of the object store, and exposure in the frontends.
# Context - how media v1 works (today)
`PublishMedia(roomID, data []byte)` encrypts the **entire** file with the room key
(`SealAEAD`), uploads it **whole** to the object store (`pkg/blobstore`,
content-addressed by hash) through the control plane (`POST /blobs`), and publishes
only a `frame.BlobRef{Hash, Nonce, Size}` reference on the bus. `FetchMedia` downloads
the ciphertext by hash (`GET /blobs/{hash}`) and decrypts it. The binary never travels
over NATS; the bus carries only the reference. The object store holds only ciphertext
(true E2E). This is correct and simple, but:
| v1 limitation | Consequence |
|---|---|
| Whole file in RAM (encrypted and uploaded in one go) | images/audio are fine; large video (hundreds of MB or GB) exhausts memory |
| `BlobRef` carries only hash+nonce+size | the receiver does not know the mimetype/filename and cannot render the file properly |
| No resumable upload | if the upload of a large file fails, it restarts from scratch |
| Object store without GC | content-addressed blobs grow indefinitely, with no refcount or TTL |
| `mobile/` exposes only `Publish` (text) | a photo cannot be sent from the phone |
| Web gateway has no media endpoints | the SPA cannot upload or download files |
Out of scope for this issue (it would be a separate one): **live streaming** (video
calls, real-time audio). That is not a blob model; it requires WebRTC signalled over the bus.
# Design
## Piece 1 - Chunking large files
Split the file into fixed-size chunks (proposal: 4 MB), encrypt **each chunk**
independently with the room key (one nonce per chunk), and upload each chunk as its
own content-addressed blob. The reference changes from a single blob to a manifest
of chunks.
- `frame.BlobRef` evolves (compatibly) to support a list of chunks:
```
BlobRef{
    Hash   string     // hash of the manifest (or of the single blob when there are no chunks)
    Nonce  []byte     // nonce of the manifest / of the single blob
    Size   int64      // total plaintext size
    Chunks []ChunkRef // empty for small files (v1 path untouched)
}
ChunkRef{ Hash string; Nonce []byte; Size int64 } // per encrypted chunk
```
- `PublishMediaStream(roomID string, r io.Reader, meta MediaMeta) (BlobRef, error)`:
reads from the `io.Reader` in chunks (the whole file is never loaded into RAM),
encrypts and uploads each chunk, and builds the manifest. The v1
`PublishMedia([]byte)` stays as a shortcut for small files (no chunks).
- `FetchMediaStream(roomID, BlobRef) (io.ReadCloser, error)`: downloads and decrypts
chunks on demand, exposing an `io.Reader` (progressive download, not everything in RAM).
- Chunk upload/download with bounded parallelism (for example 4 at a time) for throughput.
## Piece 2 - Metadata (mimetype + filename)
Add the fields `Mime string` and `Name string` to `BlobRef` (or to an encrypted
sidecar), so that the receiver knows how to render the file (inline image,
audio/video player, download icon). Because `Name`/`Mime` can be sensitive, they
travel **inside the encrypted field** of the frame, not in the clear. Mimetype
detection is by sniffing the first chunk plus the file extension.
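One way to combine sniffing and the extension, sketched with the Go standard library: sniff the first bytes with `http.DetectContentType` and fall back to the extension only when sniffing is inconclusive. The function name `detectMime` and the precedence order are assumptions, not something the spec fixes.

```go
package main

import (
	"fmt"
	"mime"
	"net/http"
	"path/filepath"
)

// detectMime sniffs the first chunk and falls back to the file extension
// when the content is not recognised (hypothetical helper).
func detectMime(firstChunk []byte, name string) string {
	sniffed := http.DetectContentType(firstChunk) // inspects at most 512 bytes
	if sniffed != "application/octet-stream" {
		return sniffed
	}
	if byExt := mime.TypeByExtension(filepath.Ext(name)); byExt != "" {
		return byExt
	}
	return sniffed
}

func main() {
	png := []byte("\x89PNG\r\n\x1a\n")
	fmt.Println(detectMime(png, "photo.bin")) // prints image/png
}
```

Preferring the sniffed type over the extension means a renamed file is still rendered by its real content; the opposite order would be equally easy to implement.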
## Piece 3 - Garbage collection of the object store
Today blobs are never deleted. Introduce a refcount or a sweep:
- **Refcount per reference**: a `blob_refs(hash, room_id, msg_id)` table in the
control plane; when a message in an ephemeral room expires, or when the history of
a persistent room is purged, decrement and delete the blob when the count reaches zero.
- **TTL alternative**: blobs from ephemeral rooms get a TTL; blobs from persistent
rooms live as long as the message lives in JetStream.
- A `membershipd blobs gc [--dry-run]` command for a manual sweep + a disk-usage
metric. It must be idempotent and safe (never delete a blob that is still referenced).
## Piece 4 - Expose media in the clients
- **Mobile binding** (`mobile/unibus.go`): `SendFile(roomID, path, mime)` and
`FetchFile(roomID, frameJSON) -> path` (writes to a local file in the app sandbox
and returns the path; large []byte values are not passed over the gomobile bridge).
- **Web gateway** (`playground/server.go`): `POST /api/media` (multipart, streamed
to the store) and `GET /api/media/{room}/{hash}` (decrypted download with the
`Content-Type`/`Content-Disposition` headers derived from the metadata).
- **unibots**: a `send_file` tool so that a bot can attach files.
# Technical decisions
| Decision | Chosen | Alternative | Reason |
|---|---|---|---|
| Chunk size | 4 MB | 1 MB / 16 MB | balance between RAM and manifest overhead |
| Per-chunk encryption | independent nonce per chunk, same room key | re-encrypt everything | allows partial download/deletion and parallelism |
| Sensitive metadata | inside the encrypted frame | in the clear in BlobRef | filename/mime can leak information |
| GC | refcount in the control plane | TTL only | precise; does not delete what is still referenced |
| v1 compatibility | empty `Chunks` = v1 path | break the format | do not break media already sent |
# Phases (TBD, branches `issue/0002x-*`)
1. **0002a - BlobRef with chunks (compatible).** Extend the type + marshalling tests
with empty `Chunks` (v1) and with chunks (v2). No client changes yet.
2. **0002b - PublishMediaStream / FetchMediaStream.** Streaming API in
`pkg/client` over `io.Reader`/`io.ReadCloser`, per-chunk encryption, bounded parallel
upload/download. Tests with a file larger than the chunk size.
3. **0002c - mime+name metadata** (in the encrypted field) + sniffing.
4. **0002d - object store GC.** Refcount + `membershipd blobs gc` + tests for
"do not delete referenced / delete orphan".
5. **0002e - expose in clients.** Mobile binding (`SendFile`/`FetchFile`), web
gateway (`/api/media`), `send_file` tool in unibots.
# Definition of Done (executable evidence)
- **Golden:** sending and receiving a small image (v1 path, no chunks) still
works; sending and receiving a 50 MB file in chunks works without loading 50 MB
into RAM (measure RSS during the operation).
- **Edge:** a file whose size is an exact multiple of the chunk size; a 1-byte file;
a file just below and just above the chunking threshold.
- **Error path:** corrupt/undecryptable chunk → clear error, no panic; `blobs gc`
with a blob that is still referenced → does NOT delete it (assert).
- `CGO_ENABLED=0 go test ./...` green.
# Risks and mitigations
| Risk | Mitigation |
|---|---|
| Breaking v1 media already sent | empty `Chunks` preserves the v1 path; compatibility tests |
| GC deletes a blob that is still referenced | refcount + conservative sweep + `--dry-run` by default in CI |
| gomobile bridge with large []byte values | the binding works with file paths, not in-memory buffers |
| Chunk parallelism saturates the control plane | concurrency limit (4) + the auth hardening from issue 0001 |
# Relationship to other issues
- **0001 (security)** is a prerequisite: the signed control-plane auth must cover
`POST/GET /blobs` before media is exposed publicly; otherwise anyone can fill the
store or download someone else's ciphertext.
- **Live streaming** (future, not this issue): video calls/real-time audio =
WebRTC with the bus as the signalling channel; a different model from blobs.
+195
@@ -0,0 +1,195 @@
---
issue: 0003
title: Decentralization / high availability - NATS cluster + replicated JetStream + control plane without SPOF
status: spec
created: 2026-06-07
domain: infra
scope: unibus (pkg/embeddednats, pkg/membership, pkg/blobstore, pkg/client, cmd/membershipd) + multi-node deployment
depends_on: 0001 (cluster auth and client auth go together with the hardening)
---
# Objective
The loss of one server must **not leave the bus out of service**. Today unibus is a
single `membershipd` (with embedded NATS + local SQLite): if that host dies, there is
no bus. This issue moves unibus to a **decentralized / high-availability** model using
the native capabilities of NATS: a multi-node cluster, replicated JetStream (RAFT), and
control-plane state kept outside the local SQLite. **This is not federation**
(multiple operators with distinct domains); it removes the single point of failure
within a single administrative domain that we control.
# Key quorum requirement (infrastructure decision)
JetStream replicates with RAFT, which needs a **majority (quorum)** to commit
writes. The consequences are harsh and must be accepted from the design stage:
| Nodes | Replicas | Tolerates loss of | Note |
|---|---|---|---|
| 1 | R1 | 0 | current situation (SPOF) |
| 2 | R2 | **0** | if one node falls, quorum is lost and writes block. NOT suitable for HA |
| **3** | **R3** | **1** | real minimum for "if one server falls, we keep going" |
| 5 | R5 | 2 | higher tolerance |
**So the user's goal ("if my server fails, we do not lose service") requires 3
JetStream nodes.** Servers available today: **magnus** and **homer** (both OVH VPS).
The third has yet to be obtained.
| Node | Public IP | Status | Notes |
|---|---|---|---|
| magnus | (in pass: `MAGNUS_ovh_ssh_ROOT`) | available, **loaded** | runs coolify, minio, postgres, authentik, portainer, dagu; check resources first |
| homer | `141.94.69.66` | available, up | creds in pass (`vps_ovhcloud_SSH_SERVER_HOMER_-_root`, `vps_SSH_SERVER_HOMER_dataherrero`); it used to run coolify |
| node 3 | n/a | **pending** | obtain a third always-on VPS, or reuse om/datardos if they are freed up |
Preparation before deploying each node: register the SSH alias + key, join the
WireGuard network, and review/lighten the existing load (coolify, etc.).
## Rollout R1 → R3: work with 2 nodes today, HA with 3 tomorrow
Quorum is not "switched off"; what is controlled is the **number of replicas** of each stream/KV:
| Replicas | Quorum | Tolerates | Works with |
|---|---|---|---|
| R1 | none (1 copy) | 0 failures | 1-2 nodes, no blocking |
| R3 | 2 of 3 | 1 failure | 3 nodes |
- **Current phase (magnus + homer):** deploy with streams/KV at **R1** (flag
`decentralized: off`). The bus is fully operational, with no fault tolerance
yet. Option: streams at **R2** to duplicate the data on both nodes
(durability/live backup), accepting that writes need both nodes until the 3rd
node arrives.
- **When node 3 joins:** scale live with `nats stream edit <stream> --replicas 3`
(likewise for KV/Object Store) + add the node to the cluster + flag `decentralized: on`.
**Real HA, with no downtime, no rewrite, and no data migration.**
- **Two-node warning:** do NOT run the JetStream meta-group on 2 nodes as if it
were HA. Its quorum is 2, and losing one node blocks stream management. With 2
servers, the recommended model is **magnus as primary (R1) + homer as 2nd node/replica**,
scaling to R3 once the third node is available.
While there are only 2 nodes: the **ephemeral data plane** (core NATS, `ModeNATS`
rooms) does tolerate the loss of one node (clients reconnect to the other), but
**persistent rooms and the control plane** (which need quorum) do not. The issue is
truly deployed only once there are 3 nodes.
# Context - why it is a SPOF today
- `pkg/embeddednats` starts a **standalone** NATS (no cluster).
- `pkg/membership` stores rooms/members/room_keys/users in a **SQLite database local**
to the process.
- `pkg/blobstore` stores blobs on the **local disk** of the process.
- The client (`pkg/client`) connects to **one** NATS URL and **one** control-plane URL.
Everything lives on one host. That host is the single point of failure.
# Design
## Piece 1 - NATS cluster (replicated data plane)
`pkg/embeddednats` gains cluster options: `server.Options.Cluster` (name +
host/port for routes) and `Routes` (the other nodes). Each `membershipd` starts its
embedded NATS clustered with the others. JetStream is enabled with `Replicas: 3` on
streams and KV. Auth between nodes (routes) uses its own credentials (not the client
ones), and the routes also use TLS (reusing the CA from issue 0001).
## Piece 2 - Control plane without local state (SQLite → JetStream KV)
This is the heart of the issue. Today `pkg/membership.Store` is SQLite. Using
**branch-by-abstraction**, a `Store` interface is introduced with two implementations:
- `sqliteStore`: the current one (it stays the default while the flag is off; useful
for a single node / development).
- `jetstreamStore`: new. Rooms, members, room_keys and users (the table from issue
0001) live in **JetStream KV** (R3 replicated buckets). Any node reads/writes
the same state; RAFT guarantees consistency. The HTTP control plane becomes
effectively **stateless**: any `membershipd` can serve any request
because the state is in the replicated KV.
Flag `decentralized` (off → on). Initial data migration from SQLite to KV uses a
`membershipd migrate-to-kv` command (idempotent). Room keys stay sealed exactly as
before; only **where they are stored** changes, not the encryption.
## Piece 3 - Replicated blobs (object store → NATS Object Store)
`pkg/blobstore` gains an implementation on top of **NATS Object Store** (built on
JetStream, replicated R3) in addition to the local-disk one. Blobs (already ciphertext,
E2E) become available from any node. This fits with the GC from issue 0002.
## Piece 4 - Client with failover
`pkg/client`: accept a **list** of NATS seeds and a **list** of control-plane
URLs. `nats.go` already does reconnect/failover between cluster servers natively
(`nats.Servers([...])`, `nats.MaxReconnects(-1)`). The HTTP control plane is tried in
order with retries. This way, if a node falls, the client reconnects to another one
transparently.
## Piece 5 - Multi-node deployment
3 `membershipd` nodes, each with its embedded NATS in the cluster, JetStream R3, and the
same `ca.crt`/route credentials. systemd on each VPS. Clients receive the list
of the 3 endpoints. Per-node health/observability (`/healthz` + JetStream
metrics: RAFT leader, replica lag).
# Technical decisions
| Decision | Chosen | Alternative | Reason |
|---|---|---|---|
| Number of quorum nodes | 3 (R3) | 2 (R2) | 2 does not tolerate the loss of one; 3 is the real minimum for HA |
| Control-plane state | replicated JetStream KV | hand-replicated SQLite / external Postgres | KV ships with NATS, same RAFT as JetStream, zero extra infrastructure |
| Store migration | branch-by-abstraction (`Store` interface, two impls, flag) | direct rewrite | master never breaks; sqlite stays for 1 node/dev |
| Blobs | NATS Object Store | shared disk / S3 | natively replicated, no external dependency |
| Client failover | seed list + native nats.go reconnect | external load balancer | less infrastructure; nats.go already does it |
| Multi-operator federation | **out of scope** | n/a | not the goal; a different class of problem (trust between domains) |
# Phases (TBD, branches `issue/0003x-*`)
1. **0003a - NATS cluster.** Cluster/routes options + route TLS in
`pkg/embeddednats`; start 2-3 local nodes in e2e tests and verify that a
subject published on one reaches a subscriber on another.
2. **0003b - Store interface + jetstreamStore (KV).** Abstract `pkg/membership.Store`;
implement rooms/members/room_keys/users on JetStream KV R3; consistency
tests. Flag `decentralized: off`.
3. **0003c - migrate-to-kv.** Idempotent SQLite → KV command + parity test
(same state before/after).
4. **0003d - blobs in Object Store.** `pkg/blobstore` implementation on replicated
NATS Object Store.
5. **0003e - client failover.** Seed list + ctrl-url list + reconnect;
a test that kills the node the client is connected to and verifies it keeps operating.
6. **0003f - 3-node deployment** (human). 3 VPS in a cluster, JetStream R3, flag
`decentralized: on`. Real chaos test: kill a node in production and check that
the service continues.
# Definition of Done (executable evidence)
- **Golden:** 3 nodes in a cluster; a client publishes on one node and another client
subscribed on a different node receives it; creating a room + inviting works from any node.
- **Edge:** a client is connected to node A; **node A is killed**; the client
reconnects to B automatically and keeps publishing/receiving without losing the session.
- **Error path (chaos):** kill 1 of 3 nodes → the control plane keeps accepting
writes (quorum 2/3); kill 2 of 3 → writes block (quorum lost; expected and
documented behaviour, not corruption).
- `CGO_ENABLED=0 go test ./...` green, including an in-process multi-node e2e test.
# Risks and mitigations
| Risk | Mitigation |
|---|---|
| Only 2 nodes available → no real quorum | explicit prerequisite of 3 nodes before 0003f; until then, the deployment stays standalone |
| Inter-VPS latency affects RAFT | nodes in the same region or with a good network; measure; R3 tolerates moderate latencies |
| SQLite→KV migration loses data | idempotent command + parity test + SQLite backup beforehand |
| Network partition (split-brain) | RAFT prevents it: the side without quorum blocks writes and does not diverge |
| Operational complexity of 3 nodes | JetStream observability (leader, lag) + per-node `/healthz` + runbook in deploy/ |
# Recommended order relative to other issues
1. **0001 (security)** first: client auth (nkey) and the CA/TLS are reused
for the cluster routes. Deploying decentralized without auth would open several
unprotected public endpoints.
2. **0003 (this one)** next: once the bus is secure, replicate it across 3 nodes.
3. **0002 (media v2)** is orthogonal; its object store fits with piece 3 (replicated
blobs) once both are in place.
# Out of scope
- Federation between different operators/domains (a different class of problem; it
requires a trust protocol between domains).
- Multi-tenant / NATS accounts per organization.
- Dynamic auto-scaling of nodes.
+144
@@ -0,0 +1,144 @@
---
issue: 0004
title: Security hardening - authorization, anti-DoS and confidentiality before public exposure
status: spec
created: 2026-06-07
domain: security
scope: unibus (pkg/membership/server.go, auth.go, pkg/embeddednats, pkg/client, cmd/membershipd, deploy/tls)
depends_on: 0001 (closes the gaps that audit 0004 found in what 0001 delivered)
blocks: 0001f (public deploy) and 0003f (decentralized deploy)
source: projects/message_bus/reports/0004-2026-06-07-unibus-security-audit.md
---
# Objective
The red-team audit (report 0004) concluded that the bus's **authentication** is solid,
but **authorization, availability and metadata confidentiality** are missing, which is
exactly what a *public* bus needs. Verdict: **do NOT expose publicly today**. This issue
closes the blocking findings (1 critical + 4 high) and the relevant medium ones, so that
the 0001f deploy (public) and then 0003 (decentralized) are safe.
Each phase corresponds to one finding in report 0004. The **DoD of each phase is to port
the auditor's adversarial test** (`TestAudit_*`) and verify that it now yields the SAFE
result (where the attack used to succeed, it is now rejected).
# Phases (TBD, branches `issue/0004x-*`, one per finding)
## 0004a - H1 (Critical): body limit + pre-auth anti-DoS
**Problem:** `Server.ServeHTTP` calls `io.ReadAll(r.Body)` **with no limit and before**
`authenticate()`; `handlePutBlob` repeats the unbounded `io.ReadAll`. A 400 MB body sent
without credentials → 898 MB RSS → OOM with just a few connections.
**Fix:**
- `http.MaxBytesReader` in the middleware **before** the `io.ReadAll` (control-plane limit,
for example 1 MB).
- A separate, larger limit for `/blobs`, with early rejection by `Content-Length` before
buffering; ideally stream to disk instead of RAM.
- `Server.MaxHeaderBytes` tuned.
- Rate limiting per IP (and per identity after auth). Reuse/create a function from the
registry if applicable (delegate to `fn-constructor` if it is generic).
**DoD:** a test that sends an unsigned body larger than the limit → `413`/`401` **without**
an RSS spike (measure `/proc/self/status` before/after, bounded delta). Golden (normal body
passes) + edge (exactly at the limit) + error (exceeds → cheap rejection).
## 0004b - H2 (High): close the configuration fail-open
**Problem:** the default is `--bus-auth off`; the NATS nkey is only enabled under
`enforce`; TLS is an independent flag. `--bind 0.0.0.0 --tls-cert …` **without**
`--bus-auth enforce` leaves the bus open while looking secure.
**Fix:**
- If `--bind` is not loopback ⇒ require `--bus-auth enforce` (otherwise `log.Fatal` with a
clear message).
- `--tls-cert`/`--tls-key` without `--bus-auth enforce` ⇒ startup error.
- An insecure startup is impossible or, at the very least, noisy and rejected.
**DoD:** port `TestAudit_FailOpenTLSWithoutAuth` → the public-without-enforce startup now
fails; an unregistered client does NOT connect. Golden (loopback dev bind still allowed) +
error (public bind without enforce aborts).
## 0004c - H3 (High): membership-based authorization in the control plane
**Problem:** "authorized" = "registered", not "member". Room GETs do not check
membership: `/rooms/{id}`, `/rooms/{id}/members` (exposes everyone's `sign_pub`+`kex_pub`),
`/members/{endpoint}/rooms`, and `/rooms/{id}/key?endpoint=X` (returns someone else's `sealed_key`).
**Fix:**
- Every room handler queries `members` and requires the signer (`X-Unibus-Pub`
endpoint) to be a member.
- `/rooms/{id}/key` serves the sealed key only **for the signer itself** (`endpoint ==
signer`), never for a third party.
- `/members/{endpoint}/rooms` only if `endpoint == signer`.
- Do not expose the full member list to non-members.
**DoD:** port `TestAudit_HorizontalMetadataLeak` → bob (not a member) now receives `403`
on all of them. Golden (legitimate member has access) + edge (owner has access) + error (non-member 403).
## 0004d - H4 (High): access control in the NATS data plane
**Problem:** the nkey authenticator only decides "registered yes/no"; there are no
per-subject permissions. Any registered user can subscribe/publish on any subject;
`ModeNATS` rooms (cleartext) are exposed between users.
**Fix (choose and document the strategy):**
- Preferred: NATS `Permissions` per identity (subjects the user may sub/pub),
derived from room membership; or
- Unpredictable subjects (not derivable from the name) + server-side membership
verification; or
- Forbid `ModeNATS` in public deployments (always force E2E) as a defensive minimum.
**DoD:** port `TestAudit_NoSubjectACL` → eve (not invited) NO longer receives the message
from someone else's room. Document the chosen strategy and its limits.
## 0004e - H5 (High, public): TLS on the control plane
**Problem:** HTTP `:8470` is signed but **without TLS** → metadata (subjects, endpoints,
pubkeys, sealed keys, blob hashes, social graph) is readable by a MITM on the public network.
**Fix:**
- Serve the control plane over TLS with the same private CA (or document a TLS
reverse proxy in front).
- The client requires `https` when it is given a CA (`client.Connect(caPath)` ⇒ control
plane over TLS as well).
**DoD:** client against an `https` control plane with the CA → OK; against `http` with an
expected CA → rejects; an observer does not see the metadata (argued + scheme test).
## 0004f - medium findings: owner binding, nonce cache, error leak
- **H6** `handleCreateRoom`: require `Owner.Endpoint == frame.EndpointID(X-Unibus-Pub)` and
`Owner.SignPub == pub`. (Port `TestAudit_OwnerSpoof` → now 403.)
- **H7** move `IsAuthorized` **before** touching the `nonceCache` (do not cache nonces from
unauthorized callers); prune by expiry bucket/heap instead of O(n) under a global mutex; cap
the size. (Port `TestAudit_NonceCachePoisonPreAuth`.) **Note:** this fix is a prerequisite
for the switch to a replicated nonce cache in issue 0003.
- **H12** generic error messages to the client; detail only in the log (do not leak paths/SQL).
# Out of scope for this issue (queued in others)
- **H9** (blob quota/GC) → issue 0002 (media v2) already covers it.
- **H10** (12-byte AEAD nonce → XChaCha or rekey by volume) → low, future; open a separate
issue if very high-volume rooms are needed.
- **H11** (owner signature without nonce/ts) → covered in practice by the `enforce` envelope;
document the dependency. Strengthen it if `enforce` is relaxed.
- **H8** (CA custody: generate on om, `ca.key` off the PC) → operational task of the
0001f/0003f deploy, not a code task.
- **govulncheck** on nats-server/nats.go/modernc → a separate CI step.
# Global Definition of Done
- The four blocking adversarial tests from report 0004 (bounded DoS, fail-open
closed, horizontal leak 403, data-plane ACL) ported as regression tests and green.
- `CGO_ENABLED=0 go build ./...` + `go vet ./...` + `go test ./...` green.
- Re-evaluation: after the hardening, the public-exposure verdict moves from "NO" to
"yes, with operational conditions" (CA in custody, Restart=always). Record this in a new
report or as an addendum to 0004.
# Order relative to other issues
1. **0004 (this one)** first: it makes the bus safe to expose.
2. **0003 (decentralization)** next: it absorbs the nonce-cache→replicated KV change (built on
0004f-H7), the cluster route auth, and the fail-open guard across N nodes.
3. **0002 (media v2)** is orthogonal; it includes the blob quota/GC (H9).
+22
@@ -0,0 +1,22 @@
-- 002_users.sql — bus-level user directory (issue 0001a).
--
-- The authoritative allowlist of identities permitted to use the bus, independent
-- of room membership. A user is identified by its Ed25519 signing public key (the
-- same key that derives the endpoint via frame.EndpointID); roles gate admin-only
-- control-plane operations; status enables revocation without deleting history.
--
-- Additive and idempotent: safe to apply repeatedly. Never modify this file;
-- further schema changes go in new numbered migrations (see
-- .claude/rules/db_migrations.md). The embedded copy under
-- pkg/membership/migrations/002_users.sql mirrors this file byte-for-byte.
CREATE TABLE IF NOT EXISTS users (
sign_pub TEXT PRIMARY KEY, -- Ed25519 public key in lowercase hex (peer identity)
handle TEXT NOT NULL, -- human-readable name (unique recommended, not enforced as PK)
role TEXT NOT NULL DEFAULT 'member', -- 'admin' | 'member'
status TEXT NOT NULL DEFAULT 'active', -- 'active' | 'revoked'
created_at TEXT NOT NULL,
revoked_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_users_status ON users(status);
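The revocation model described in the migration header (flip `status` rather than delete the row) can be sketched in Go as follows. This is a minimal in-memory illustration, not the project's `membership.Store`: the `allowlist` type and its methods are hypothetical names introduced here, and a real store would back them with the `users` table.

```go
package main

import (
	"fmt"
	"sync"
)

// user mirrors the columns of the users table that matter for authorization.
type user struct {
	handle string
	role   string // "admin" | "member"
	status string // "active" | "revoked"
}

// allowlist is a HYPOTHETICAL in-memory stand-in for the users table, keyed by
// the lowercase-hex Ed25519 public key (the sign_pub column).
type allowlist struct {
	mu    sync.RWMutex
	users map[string]user
}

func newAllowlist() *allowlist { return &allowlist{users: map[string]user{}} }

// add registers an identity as active, like an INSERT with the default status.
func (a *allowlist) add(signPubHex, handle, role string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.users[signPubHex] = user{handle: handle, role: role, status: "active"}
}

// revoke flips status instead of deleting the row, so history is preserved.
// It reports whether the identity was known.
func (a *allowlist) revoke(signPubHex string) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	u, ok := a.users[signPubHex]
	if !ok {
		return false
	}
	u.status = "revoked"
	a.users[signPubHex] = u
	return true
}

// isAuthorized is true only for known identities whose status is active.
func (a *allowlist) isAuthorized(signPubHex string) bool {
	a.mu.RLock()
	defer a.mu.RUnlock()
	u, ok := a.users[signPubHex]
	return ok && u.status == "active"
}

func main() {
	al := newAllowlist()
	al.add("ab12", "alice", "member")
	fmt.Println("active user authorized:", al.isAuthorized("ab12"))
	al.revoke("ab12")
	fmt.Println("revoked user authorized:", al.isAuthorized("ab12"))
	fmt.Println("unknown user authorized:", al.isAuthorized("ffff"))
}
```

Because authorization is a lookup at call time, a revocation is visible to the very next check without restarting anything, which is the property the later nkey authenticator relies on.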
+8 -4
@@ -44,14 +44,18 @@ func GenerateIdentity(path string) error {
}
// NewSession loads the identity at idPath and connects to the bus. natsURL is
-// the data plane (for example nats://host:4250) and ctrlURL is the control
-// plane HTTP endpoint (for example http://host:8470).
-func NewSession(idPath, natsURL, ctrlURL string) (*Session, error) {
+// the data plane (for example tls://host:4250) and ctrlURL is the control plane
+// HTTP endpoint (for example http://host:8470). caPath is the path to the bus
+// CA certificate (ca.crt) bundled with the app: when set, the session connects
+// securely (TLS pinned to that CA + nkey authentication on the data plane),
+// matching a bus running with auth + TLS. Pass an empty caPath to connect in
+// plaintext to an unsecured (dev) bus.
+func NewSession(idPath, natsURL, ctrlURL, caPath string) (*Session, error) {
id, err := client.LoadOrCreateIdentity(idPath)
if err != nil {
return nil, err
}
-c, err := client.New(natsURL, ctrlURL, id)
+c, err := client.Connect(natsURL, ctrlURL, id, caPath)
if err != nil {
return nil, err
}
+57
@@ -0,0 +1,57 @@
package busauth
import (
"encoding/base64"
server "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nkeys"
)
// nkeyAuthenticator is a NATS server.Authentication that authorizes a client by
// verifying the nkey signature over the server-presented nonce and then
// consulting the bus user allowlist. Authorization is checked on every new
// connection via the injected predicate (not a static Options.Nkeys map), so
// revoking a user denies its next connection without restarting the server.
type nkeyAuthenticator struct {
// isAuthorized reports whether the lowercase-hex Ed25519 public key behind an
// nkey belongs to an active bus user. Injected (membership.Store.IsAuthorized)
// so this package stays free of the store dependency.
isAuthorized func(signPubHex string) bool
}
// NewNkeyAuthenticator builds a NATS custom authenticator backed by isAuthorized.
// Pass it to embeddednats so the data plane only accepts registered identities.
func NewNkeyAuthenticator(isAuthorized func(signPubHex string) bool) server.Authentication {
return &nkeyAuthenticator{isAuthorized: isAuthorized}
}
// Check verifies the client's nkey signature against the nonce the server
// presented, then maps the nkey to its allowlist key and checks authorization.
// Any malformed input or failed verification yields false (fail closed). The
// signature decoding mirrors nats-server's own (raw-url base64, then std base64
// fallback) so genuine clients using nats.Nkey are accepted unchanged.
func (a *nkeyAuthenticator) Check(c server.ClientAuthentication) bool {
opts := c.GetOpts()
if opts.Nkey == "" {
return false
}
sig, err := base64.RawURLEncoding.DecodeString(opts.Sig)
if err != nil {
sig, err = base64.StdEncoding.DecodeString(opts.Sig)
if err != nil {
return false
}
}
pub, err := nkeys.FromPublicKey(opts.Nkey)
if err != nil {
return false
}
if err := pub.Verify(c.GetNonce(), sig); err != nil {
return false
}
signPubHex, err := SignPubHexFromNkey(opts.Nkey)
if err != nil {
return false
}
return a.isAuthorized(signPubHex)
}
+76
@@ -0,0 +1,76 @@
// Package busauth bridges a unibus peer's Ed25519 identity to NATS nkey
// authentication. A NATS nkey IS an Ed25519 keypair, so the bus reuses the
// peer's existing signing identity for the data plane instead of minting new
// key material — one identity authenticates both planes (HTTP request signatures
// and NATS connections), keyed in the user allowlist by the same Ed25519 public
// key.
//
// This is transport glue specific to NATS + unibus, not a general-purpose
// registry primitive: it deliberately lives in the app to avoid pulling
// github.com/nats-io/nkeys into the multi-domain registry module. The Ed25519
// signing/verification it relies on comes from the registry cybersecurity
// package; this package never reimplements a primitive.
package busauth
import (
"crypto/ed25519"
"encoding/hex"
"fmt"
"github.com/nats-io/nkeys"
)
// ClientNkey derives, from a peer's Ed25519 private key, the NATS user nkey
// public string ("U...") and a signature callback suitable for
// nats.Nkey(pub, sign). The callback signs the server-presented nonce with the
// same Ed25519 key, so the server can verify it and map it back to the bus user.
//
// signPriv must be a 64-byte Ed25519 private key (as produced by the registry's
// GenerateIdentity). Its first 32 bytes are the seed nkeys needs.
func ClientNkey(signPriv []byte) (pub string, sign func([]byte) ([]byte, error), err error) {
if len(signPriv) != ed25519.PrivateKeySize {
return "", nil, fmt.Errorf("busauth: signPriv must be %d bytes, got %d", ed25519.PrivateKeySize, len(signPriv))
}
seed := ed25519.PrivateKey(signPriv).Seed() // 32-byte Ed25519 seed
kp, err := nkeys.FromRawSeed(nkeys.PrefixByteUser, seed)
if err != nil {
return "", nil, fmt.Errorf("busauth: derive nkey from seed: %w", err)
}
pub, err = kp.PublicKey()
if err != nil {
return "", nil, fmt.Errorf("busauth: nkey public key: %w", err)
}
sign = func(nonce []byte) ([]byte, error) {
return kp.Sign(nonce)
}
return pub, sign, nil
}
// NkeyPublicFromSignPub derives the NATS user nkey public string from a 32-byte
// Ed25519 public key. It is the inverse view of the identity used by callers
// that have only the public key (e.g. to display or pre-register an nkey).
func NkeyPublicFromSignPub(signPub []byte) (string, error) {
if len(signPub) != ed25519.PublicKeySize {
return "", fmt.Errorf("busauth: signPub must be %d bytes, got %d", ed25519.PublicKeySize, len(signPub))
}
pub, err := nkeys.Encode(nkeys.PrefixByteUser, signPub)
if err != nil {
return "", fmt.Errorf("busauth: encode nkey public: %w", err)
}
return string(pub), nil
}
// SignPubHexFromNkey decodes a NATS user nkey public string ("U...") back to the
// lowercase hex of its 32-byte Ed25519 public key — the identity key used to
// look a peer up in the bus user allowlist. The server calls this to map the
// nkey a client presented to the users table.
func SignPubHexFromNkey(nkeyPub string) (string, error) {
raw, err := nkeys.Decode(nkeys.PrefixByteUser, []byte(nkeyPub))
if err != nil {
return "", fmt.Errorf("busauth: decode nkey %q: %w", nkeyPub, err)
}
if len(raw) != ed25519.PublicKeySize {
return "", fmt.Errorf("busauth: decoded nkey is %d bytes, want %d", len(raw), ed25519.PublicKeySize)
}
return hex.EncodeToString(raw), nil
}
+85
@@ -0,0 +1,85 @@
package busauth
import (
"bytes"
"crypto/ed25519"
"encoding/hex"
"testing"
cs "fn-registry/functions/cybersecurity"
"github.com/nats-io/nkeys"
)
// TestNkeyRoundTrip is the dedicated sign/verify round-trip the spec requires
// BEFORE the NATS server depends on this conversion. It proves three things end
// to end: (1) ClientNkey produces a signature callback whose output verifies
// under the derived nkey public key; (2) that signature is exactly the Ed25519
// signature of the same identity (the nkey is the same key, not a new one);
// (3) the nkey public string maps back to the identity's Ed25519 hex, which is
// the key the allowlist is indexed by.
func TestNkeyRoundTrip(t *testing.T) {
id, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
pub, sign, err := ClientNkey(id.SignPriv)
if err != nil {
t.Fatalf("ClientNkey: %v", err)
}
// (1) The callback's signature over a server-style nonce verifies under the
// public nkey, exactly as the NATS server will verify it.
nonce := []byte("server-presented-nonce-1234567890")
sig, err := sign(nonce)
if err != nil {
t.Fatalf("sign: %v", err)
}
kpPub, err := nkeys.FromPublicKey(pub)
if err != nil {
t.Fatalf("FromPublicKey: %v", err)
}
if err := kpPub.Verify(nonce, sig); err != nil {
t.Fatalf("nkey verify failed: %v", err)
}
// (2) The signature is the very same bytes as a raw Ed25519 sign with the
// identity's private key — confirming no separate key material was minted.
want := ed25519.Sign(ed25519.PrivateKey(id.SignPriv), nonce)
if !bytes.Equal(sig, want) {
t.Fatalf("nkey signature differs from Ed25519 signature of the same identity")
}
// (3) The nkey public maps back to the identity's Ed25519 hex (allowlist key).
gotHex, err := SignPubHexFromNkey(pub)
if err != nil {
t.Fatalf("SignPubHexFromNkey: %v", err)
}
if gotHex != hex.EncodeToString(id.SignPub) {
t.Fatalf("nkey->hex mismatch: got %s want %s", gotHex, hex.EncodeToString(id.SignPub))
}
// And NkeyPublicFromSignPub is consistent with ClientNkey's public.
pub2, err := NkeyPublicFromSignPub(id.SignPub)
if err != nil {
t.Fatalf("NkeyPublicFromSignPub: %v", err)
}
if pub2 != pub {
t.Fatalf("public nkey mismatch between derivations: %s vs %s", pub2, pub)
}
}
// Error path: a wrong-length private key is rejected, not silently misused.
func TestClientNkeyBadKey(t *testing.T) {
if _, _, err := ClientNkey([]byte("too-short")); err == nil {
t.Fatalf("expected error for short private key")
}
}
// Error path: a non-nkey string does not decode to an allowlist key.
func TestSignPubHexFromNkeyBad(t *testing.T) {
if _, err := SignPubHexFromNkey("not-a-real-nkey"); err == nil {
t.Fatalf("expected error decoding a bogus nkey")
}
}
+37
@@ -0,0 +1,37 @@
package busauth
import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
)
// LoadCATLSConfig builds a *tls.Config that trusts ONLY the given CA certificate
// (PEM file), for a bus client pinning the project's self-signed CA. Because the
// bus uses a private CA rather than a public one, clients must pin it explicitly;
// trusting the system roots would reject the server cert. This is the single
// helper every client (Go peers, the mobile binding, the gateway) uses to turn a
// ca.crt path into a connection config.
func LoadCATLSConfig(caPEMPath string) (*tls.Config, error) {
pem, err := os.ReadFile(caPEMPath)
if err != nil {
return nil, fmt.Errorf("busauth: read CA %q: %w", caPEMPath, err)
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(pem) {
return nil, fmt.Errorf("busauth: CA %q contains no valid PEM certificate", caPEMPath)
}
return &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12}, nil
}
// ServerTLSConfig loads the bus NATS server's certificate and private key (PEM
// files) into a *tls.Config to present to clients. The private key never leaves
// the host; only the CA cert travels to clients.
func ServerTLSConfig(certPEMPath, keyPEMPath string) (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(certPEMPath, keyPEMPath)
if err != nil {
return nil, fmt.Errorf("busauth: load server keypair: %w", err)
}
return &tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12}, nil
}
+95
@@ -0,0 +1,95 @@
package busauth
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"os"
"path/filepath"
"testing"
"time"
)
// writeSelfSigned writes a self-signed cert + key PEM pair to dir and returns
// their paths. It is enough to exercise both LoadCATLSConfig (reads the cert as
// a CA) and ServerTLSConfig (reads the cert+key as a server keypair).
func writeSelfSigned(t *testing.T, dir string) (certPath, keyPath string) {
t.Helper()
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
t.Fatalf("key: %v", err)
}
tmpl := &x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "unibus-tls-test"},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: time.Now().Add(time.Hour),
IsCA: true,
KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature,
BasicConstraintsValid: true,
}
der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
if err != nil {
t.Fatalf("cert: %v", err)
}
certPath = filepath.Join(dir, "cert.pem")
keyPath = filepath.Join(dir, "key.pem")
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der})
if err := os.WriteFile(certPath, certPEM, 0o644); err != nil {
t.Fatalf("write cert: %v", err)
}
keyDER, err := x509.MarshalECPrivateKey(key)
if err != nil {
t.Fatalf("marshal key: %v", err)
}
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyDER})
if err := os.WriteFile(keyPath, keyPEM, 0o600); err != nil {
t.Fatalf("write key: %v", err)
}
return certPath, keyPath
}
// Golden: a valid CA PEM loads into a config with a non-empty RootCAs pool, and
// a valid keypair loads into a config presenting one certificate.
func TestLoadTLSConfigsGolden(t *testing.T) {
dir := t.TempDir()
certPath, keyPath := writeSelfSigned(t, dir)
caCfg, err := LoadCATLSConfig(certPath)
if err != nil {
t.Fatalf("LoadCATLSConfig: %v", err)
}
if caCfg.RootCAs == nil {
t.Fatalf("expected a populated RootCAs pool")
}
srvCfg, err := ServerTLSConfig(certPath, keyPath)
if err != nil {
t.Fatalf("ServerTLSConfig: %v", err)
}
if len(srvCfg.Certificates) != 1 {
t.Fatalf("expected exactly one server certificate, got %d", len(srvCfg.Certificates))
}
}
// Error path: missing file, and a file that is not valid PEM.
func TestLoadTLSConfigsErrors(t *testing.T) {
if _, err := LoadCATLSConfig("/no/such/ca.crt"); err == nil {
t.Fatalf("expected error for missing CA file")
}
dir := t.TempDir()
junk := filepath.Join(dir, "junk.crt")
if err := os.WriteFile(junk, []byte("not a pem"), 0o644); err != nil {
t.Fatalf("write junk: %v", err)
}
if _, err := LoadCATLSConfig(junk); err == nil {
t.Fatalf("expected error for non-PEM CA file")
}
if _, err := ServerTLSConfig("/no/such/server.crt", "/no/such/server.key"); err == nil {
t.Fatalf("expected error for missing server keypair")
}
}
+112 -11
@@ -16,16 +16,22 @@ import (
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/enmanuel/unibus/pkg/room"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
@@ -52,10 +58,62 @@ type Client struct {
signCache map[string][]byte // sender endpoint -> sign pub (for verification)
}
-// New connects to NATS and records the control-plane URL. The identity holds
-// the peer's long-term keypairs.
+// Options configures how a client connects to the bus. The zero value is the
+// legacy behavior: a plain NATS connection with no nkey and no TLS — what dev
+// stacks and a not-yet-secured server expect. Secured deployments set these.
+type Options struct {
+// UseNkey authenticates the NATS connection with the peer's Ed25519 identity
+// reused as a NATS nkey. It MUST match the server: nats.go refuses to connect
+// with an nkey to a server that does not advertise nkey auth ("nkeys not
+// supported by the server"), so this is opt-in rather than always-on.
+UseNkey bool
+// TLS, when non-nil, secures the NATS connection and pins the server to this
+// config's RootCAs (the bus's self-signed CA). Build it with
+// busauth.LoadCATLSConfig(caPath). Nil keeps the connection plaintext.
+TLS *tls.Config
+}
+// New connects to NATS and records the control-plane URL with default Options
+// (no nkey, no TLS). The identity holds the peer's long-term keypairs.
func New(natsURL, ctrlURL string, id cs.Identity) (*Client, error) {
-nc, err := nats.Connect(natsURL, nats.Name("unibus-client"))
+return NewWithOptions(natsURL, ctrlURL, id, Options{})
+}
+// Connect is the single migration seam every peer (worker, chat, mobile,
+// gateway) uses to pick its security posture from one input: the CA path. With
+// a non-empty caPath it connects securely — TLS pinned to that CA plus nkey
+// authentication on the data plane — matching a bus running with bus-auth
+// enforce + bus-tls. With an empty caPath it falls back to the legacy plaintext,
+// no-nkey connection for local dev against an unsecured bus. The control-plane
+// HTTP requests are signed in both cases (that signing is unconditional).
+func Connect(natsURL, ctrlURL string, id cs.Identity, caPath string) (*Client, error) {
+if caPath == "" {
+return New(natsURL, ctrlURL, id)
+}
+tlsCfg, err := busauth.LoadCATLSConfig(caPath)
+if err != nil {
+return nil, fmt.Errorf("client: load CA %q: %w", caPath, err)
+}
+return NewWithOptions(natsURL, ctrlURL, id, Options{UseNkey: true, TLS: tlsCfg})
+}
+// NewWithOptions is New with explicit connection options (nkey auth, and, from
+// phase 0001d, TLS). It is the single place the data-plane connection is built,
+// so every peer (worker, chat, mobile, gateway) gets identical behavior by
+// passing the same Options.
+func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Client, error) {
+natsOpts := []nats.Option{nats.Name("unibus-client")}
+if opts.UseNkey {
+nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv)
+if err != nil {
+return nil, fmt.Errorf("client: derive nkey: %w", err)
+}
+natsOpts = append(natsOpts, nats.Nkey(nkeyPub, nkeySign))
+}
+if opts.TLS != nil {
+natsOpts = append(natsOpts, nats.Secure(opts.TLS))
+}
+nc, err := nats.Connect(natsURL, natsOpts...)
if err != nil {
return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err)
}
@@ -116,17 +174,17 @@ func (c *Client) getCachedKey(roomID string, epoch int) ([]byte, bool) {
// ---- control-plane HTTP helpers ------------------------------------------
func (c *Client) doJSON(method, path string, body, out any) error {
-var rdr io.Reader
+var bodyBytes []byte
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("client: marshal request: %w", err)
}
-rdr = bytes.NewReader(b)
+bodyBytes = b
}
-req, err := http.NewRequest(method, c.ctrlURL+path, rdr)
+req, err := c.newSignedRequest(method, path, bodyBytes)
if err != nil {
-return fmt.Errorf("client: new request: %w", err)
+return err
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
@@ -158,12 +216,51 @@ func (c *Client) doJSON(method, path string, body, out any) error {
// signRequest signs the canonical bytes of req (req must already have its Sig
// field cleared) with the client's Ed25519 key. It is symmetric with the
-// server's verifyOwnerSig.
+// server's verifyOwnerSig. This is the PAYLOAD-level owner signature that
+// authorizes room operations (invite/rekey) by ownership — distinct from the
+// transport-level request signature applied by newSignedRequest below, which
+// authenticates the caller's identity on every request.
func (c *Client) signRequest(req any) []byte {
b, _ := json.Marshal(req)
return cs.SignEd25519(c.id.SignPriv, b)
}
+// newSignedRequest builds an *http.Request to the control plane and attaches the
+// transport authentication headers (X-Unibus-Pub/Ts/Nonce/Sig) signing the
+// canonical request bytes with this peer's Ed25519 key. path is the request URI
+// (path plus any query); body is the raw request body (nil for GET). The server
+// (membership.authenticate) verifies these headers under the bus-auth flag.
+//
+// Signing happens on every request — including GETs — so that under enforce the
+// server can authenticate the caller and reject unregistered or revoked
+// identities uniformly. The canonical construction is the single source of truth
+// in membership.CanonicalRequest, shared by both sides.
+func (c *Client) newSignedRequest(method, path string, body []byte) (*http.Request, error) {
+var rdr io.Reader
+if body != nil {
+rdr = bytes.NewReader(body)
+}
+req, err := http.NewRequest(method, c.ctrlURL+path, rdr)
+if err != nil {
+return nil, fmt.Errorf("client: new request: %w", err)
+}
+ts := strconv.FormatInt(time.Now().Unix(), 10)
+nonceRaw := make([]byte, 16)
+if _, err := rand.Read(nonceRaw); err != nil {
+return nil, fmt.Errorf("client: generate nonce: %w", err)
+}
+nonce := base64.StdEncoding.EncodeToString(nonceRaw)
+canonical := membership.CanonicalRequest(method, path, ts, nonce, body)
+sig := cs.SignEd25519(c.id.SignPriv, canonical)
+req.Header.Set("X-Unibus-Pub", hex.EncodeToString(c.id.SignPub))
+req.Header.Set("X-Unibus-Ts", ts)
+req.Header.Set("X-Unibus-Nonce", nonce)
+req.Header.Set("X-Unibus-Sig", base64.StdEncoding.EncodeToString(sig))
+return req, nil
+}
// ---- mirror of server wire types (control plane) -------------------------
type policyJSON struct {
@@ -769,9 +866,9 @@ func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) {
}
func (c *Client) putBlob(ciphertext []byte) (string, error) {
-req, err := http.NewRequest("POST", c.ctrlURL+"/blobs", bytes.NewReader(ciphertext))
+req, err := c.newSignedRequest("POST", "/blobs", ciphertext)
if err != nil {
-return "", fmt.Errorf("client: new blob request: %w", err)
+return "", err
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := c.http.Do(req)
@@ -791,7 +888,11 @@ func (c *Client) putBlob(ciphertext []byte) (string, error) {
}
func (c *Client) getBlob(hash string) ([]byte, error) {
-resp, err := c.http.Get(c.ctrlURL + "/blobs/" + hash)
+req, err := c.newSignedRequest("GET", "/blobs/"+hash, nil)
+if err != nil {
+return nil, err
+}
+resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("client: get blob: %w", err)
}
+144 -9
@@ -1,10 +1,13 @@
package client_test
import (
"crypto/tls"
"encoding/hex"
"net"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@@ -12,6 +15,7 @@ import (
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
@@ -27,6 +31,7 @@ type testHarness struct {
ctrlURL string
ns *server.Server
httpts *httptest.Server
store *membership.Store
}
func freePort(t *testing.T) int {
@@ -39,29 +44,61 @@ func freePort(t *testing.T) int {
return l.Addr().(*net.TCPAddr).Port
}
-func newHarness(t *testing.T) *testHarness {
+func newHarness(t *testing.T) *testHarness { return newHarnessFull(t, membership.AuthOff, false) }
+// newHarnessMode is newHarness with an explicit control-plane auth mode and the
+// NATS data plane left open (no nkey auth), so HTTP-auth tests can use a plain
+// client.New that does not present an nkey.
+func newHarnessMode(t *testing.T, mode membership.AuthMode) *testHarness {
+return newHarnessFull(t, mode, false)
+}
+// newHarnessFull boots the embedded NATS (optionally with the nkey authenticator
+// backed by the user allowlist) and the membershipd HTTP server in ctrlMode.
+// natsAuth and ctrlMode are independent on purpose: an HTTP-enforce test can
+// keep NATS open, and an nkey test can keep HTTP off, mirroring how the rollout
+// flags compose. The store is created before NATS so the authenticator can
+// consult IsAuthorized for live revocation.
+func newHarnessFull(t *testing.T, ctrlMode membership.AuthMode, natsAuth bool) *testHarness {
+return bootHarness(t, ctrlMode, natsAuth, nil)
+}
+// bootHarness is the shared body: a store, an embedded NATS (optionally with the
+// nkey authenticator and/or TLS), and the membershipd HTTP server in ctrlMode.
+func bootHarness(t *testing.T, ctrlMode membership.AuthMode, natsAuth bool, natsTLS *tls.Config) *testHarness {
t.Helper()
dir := t.TempDir()
-ns, err := embeddednats.Start(filepath.Join(dir, "js"), freePort(t))
+store, err := membership.Open(filepath.Join(dir, "unibus.db"))
if err != nil {
+t.Fatalf("membership store: %v", err)
+}
+cfg := embeddednats.ServerConfig{
+StoreDir: filepath.Join(dir, "js"),
+Host: "127.0.0.1",
+Port: freePort(t),
+TLS: natsTLS,
+}
+if natsAuth {
+cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
+}
+ns, err := embeddednats.StartServer(cfg)
+if err != nil {
+store.Close()
t.Fatalf("embedded nats: %v", err)
}
-store, err := membership.Open(filepath.Join(dir, "unibus.db"))
-if err != nil {
-ns.Shutdown()
-t.Fatalf("membership store: %v", err)
-}
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
if err != nil {
ns.Shutdown()
store.Close()
t.Fatalf("blob store: %v", err)
}
-srv := membership.NewServer(store, blobs)
+srv := membership.NewServer(store, blobs, ctrlMode)
httpts := httptest.NewServer(srv)
-h := &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL, ns: ns, httpts: httpts}
+h := &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL, ns: ns, httpts: httpts, store: store}
t.Cleanup(func() {
httpts.Close()
store.Close()
@@ -71,6 +108,15 @@ func newHarness(t *testing.T) *testHarness {
return h
}
// registerClient adds a peer's signing identity to the bus allowlist so its
// signed control-plane requests pass under enforce.
func registerClient(t *testing.T, h *testHarness, c *client.Client, handle, role string) {
t.Helper()
if err := h.store.AddUser(hex.EncodeToString(c.Endpoint().SignPub), handle, role); err != nil {
t.Fatalf("register %s: %v", handle, err)
}
}
func waitHealth(t *testing.T, ctrlURL string) {
t.Helper()
deadline := time.Now().Add(3 * time.Second)
@@ -455,6 +501,95 @@ func TestListMyRoomsDiscovery(t *testing.T) {
}
}
// TestControlPlaneAuthEnforceE2E closes the loop end to end with the production
// client against a server in enforce mode: a registered peer's signed requests
// are accepted (golden), and an unregistered peer is rejected with 401 on its
// first control-plane call (error path). This proves the client's real
// signature construction matches the server's verification.
func TestControlPlaneAuthEnforceE2E(t *testing.T) {
h := newHarnessMode(t, membership.AuthEnforce)
waitHealth(t, h.ctrlURL)
a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect A: %v", err)
}
defer a.Close()
registerClient(t, h, a, "alice", membership.RoleAdmin)
// Golden: registered peer's signed request is accepted.
if _, err := a.CreateRoom("room.enforced", room.ModeNATS); err != nil {
t.Fatalf("registered peer should create a room under enforce: %v", err)
}
// Error path: an unregistered peer is rejected on its first control-plane call.
b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect B: %v", err)
}
defer b.Close()
_, err = b.CreateRoom("room.denied", room.ModeNATS)
if err == nil {
t.Fatalf("unregistered peer must be rejected under enforce")
}
if !strings.Contains(err.Error(), "401") && !strings.Contains(strings.ToLower(err.Error()), "unauthorized") {
t.Fatalf("expected a 401/unauthorized error, got %v", err)
}
// Revocation takes effect without restart: revoke A, its next request fails.
if err := h.store.RevokeUser(hex.EncodeToString(a.Endpoint().SignPub)); err != nil {
t.Fatalf("revoke A: %v", err)
}
if _, err := a.CreateRoom("room.after-revoke", room.ModeNATS); err == nil {
t.Fatalf("revoked peer must be rejected without a server restart")
}
}
// TestNatsNkeyAuth exercises the data-plane authenticator: with NATS nkey auth
// on, a registered peer connecting with its nkey is accepted and can publish
// (golden); an unregistered peer is refused at connect time (error path); and a
// peer revoked while the server runs is refused on its NEXT connection, proving
// revocation without a restart (edge).
func TestNatsNkeyAuth(t *testing.T) {
h := newHarnessFull(t, membership.AuthOff, true) // NATS auth on; HTTP off to isolate the data plane
waitHealth(t, h.ctrlURL)
idA := mustIdentity(t)
if err := h.store.AddUser(hex.EncodeToString(idA.SignPub), "alice", membership.RoleMember); err != nil {
t.Fatalf("register A: %v", err)
}
// Golden: registered peer connects with its nkey and uses the bus.
a, err := client.NewWithOptions(h.natsURL, h.ctrlURL, idA, client.Options{UseNkey: true})
if err != nil {
t.Fatalf("registered peer should connect with nkey: %v", err)
}
defer a.Close()
if _, err := a.CreateRoom("room.nkey", room.ModeNATS); err != nil {
t.Fatalf("registered peer should operate: %v", err)
}
// Error path: an unregistered identity is refused at connect time.
idB := mustIdentity(t)
if _, err := client.NewWithOptions(h.natsURL, h.ctrlURL, idB, client.Options{UseNkey: true}); err == nil {
t.Fatalf("unregistered peer must be refused by the NATS authenticator")
}
// Error path: presenting no nkey to an auth-required server is refused.
if _, err := client.NewWithOptions(h.natsURL, h.ctrlURL, idB, client.Options{UseNkey: false}); err == nil {
t.Fatalf("a client without an nkey must be refused when the server requires auth")
}
// Edge: revoke A while the server runs; A's NEXT connection is refused even
// though an already-open connection (a) is unaffected. No server restart.
if err := h.store.RevokeUser(hex.EncodeToString(idA.SignPub)); err != nil {
t.Fatalf("revoke A: %v", err)
}
if _, err := client.NewWithOptions(h.natsURL, h.ctrlURL, idA, client.Options{UseNkey: true}); err == nil {
t.Fatalf("revoked peer must be refused on a new connection without a restart")
}
}
// ---- test helpers ---------------------------------------------------------
type collector struct {
+185
@@ -0,0 +1,185 @@
package client_test
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/hex"
"encoding/pem"
"math/big"
"net"
"sync"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/enmanuel/unibus/pkg/room"
)
// genTestCA mints a throwaway self-signed CA plus a server certificate (SAN
// 127.0.0.1 / localhost) signed by it, mirroring deploy/tls/generate-certs.sh
// without shelling out to openssl. It returns the server's *tls.Config (cert it
// presents) and the CA pool a client must trust to complete the handshake.
func genTestCA(t *testing.T) (server *tls.Config, caPool *x509.CertPool) {
t.Helper()
// --- CA ---
caKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
t.Fatalf("ca key: %v", err)
}
caTmpl := &x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "unibus-test-ca"},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: time.Now().Add(24 * time.Hour),
IsCA: true,
KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature,
BasicConstraintsValid: true,
}
caDER, err := x509.CreateCertificate(rand.Reader, caTmpl, caTmpl, &caKey.PublicKey, caKey)
if err != nil {
t.Fatalf("ca cert: %v", err)
}
caCert, err := x509.ParseCertificate(caDER)
if err != nil {
t.Fatalf("parse ca: %v", err)
}
// --- server cert signed by the CA ---
srvKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
t.Fatalf("server key: %v", err)
}
srvTmpl := &x509.Certificate{
SerialNumber: big.NewInt(2),
Subject: pkix.Name{CommonName: "unibus-test-server"},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: time.Now().Add(24 * time.Hour),
KeyUsage: x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
DNSNames: []string{"localhost"},
IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1)},
}
srvDER, err := x509.CreateCertificate(rand.Reader, srvTmpl, caCert, &srvKey.PublicKey, caKey)
if err != nil {
t.Fatalf("server cert: %v", err)
}
srvCertPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: srvDER})
srvKeyDER, err := x509.MarshalECPrivateKey(srvKey)
if err != nil {
t.Fatalf("marshal server key: %v", err)
}
srvKeyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: srvKeyDER})
srvPair, err := tls.X509KeyPair(srvCertPEM, srvKeyPEM)
if err != nil {
t.Fatalf("server keypair: %v", err)
}
pool := x509.NewCertPool()
pool.AddCert(caCert)
return &tls.Config{Certificates: []tls.Certificate{srvPair}, MinVersion: tls.VersionTLS12}, pool
}
// TestNatsTLS validates the TLS data plane: a client trusting the bus CA
// completes the handshake and uses the bus (golden); a client that does NOT
// trust the CA fails the handshake (error path).
func TestNatsTLS(t *testing.T) {
serverTLS, caPool := genTestCA(t)
h := bootHarness(t, membership.AuthOff, false, serverTLS)
waitHealth(t, h.ctrlURL)
// Golden: client pinning the CA connects over TLS and operates.
clientTLS := &tls.Config{RootCAs: caPool, MinVersion: tls.VersionTLS12}
a, err := client.NewWithOptions(h.natsURL, h.ctrlURL, mustIdentity(t), client.Options{TLS: clientTLS})
if err != nil {
t.Fatalf("client trusting the CA should complete the TLS handshake: %v", err)
}
defer a.Close()
if _, err := a.CreateRoom("room.tls", room.ModeNATS); err != nil {
t.Fatalf("TLS client should operate on the bus: %v", err)
}
// Error path: a client that does not trust the CA fails the handshake. Use an
// empty pool (system roots would also reject this private CA, but an empty
// pool makes the intent explicit and avoids depending on the host's roots).
badTLS := &tls.Config{RootCAs: x509.NewCertPool(), MinVersion: tls.VersionTLS12}
if _, err := client.NewWithOptions(h.natsURL, h.ctrlURL, mustIdentity(t), client.Options{TLS: badTLS}); err == nil {
t.Fatalf("client without the CA must fail the TLS handshake")
}
}
// TestSecureBusEndToEnd is the headline golden of issue 0001: with ALL three
// layers active at once — control-plane request signing (enforce), NATS nkey
// auth, and TLS — two registered peers run an encrypted room end to end. A
// creates a Matrix-policy room, invites B, A publishes and B decrypts. This
// proves the layers compose: signed HTTP control plane + authenticated,
// encrypted data plane + E2E room content.
func TestSecureBusEndToEnd(t *testing.T) {
serverTLS, caPool := genTestCA(t)
h := bootHarness(t, membership.AuthEnforce, true, serverTLS)
waitHealth(t, h.ctrlURL)
clientTLS := &tls.Config{RootCAs: caPool, MinVersion: tls.VersionTLS12}
secure := func(t *testing.T, handle string) *client.Client {
t.Helper()
id := mustIdentity(t)
if err := h.store.AddUser(hex.EncodeToString(id.SignPub), handle, membership.RoleMember); err != nil {
t.Fatalf("register %s: %v", handle, err)
}
c, err := client.NewWithOptions(h.natsURL, h.ctrlURL, id, client.Options{UseNkey: true, TLS: clientTLS})
if err != nil {
t.Fatalf("connect %s securely: %v", handle, err)
}
return c
}
a := secure(t, "alice")
defer a.Close()
b := secure(t, "bob")
defer b.Close()
roomID, err := a.CreateRoom("room.secure", room.ModeMatrix)
if err != nil {
t.Fatalf("A create encrypted room over secure bus: %v", err)
}
if err := a.Invite(roomID, b.Endpoint()); err != nil {
t.Fatalf("A invite B: %v", err)
}
if err := b.Join(roomID); err != nil {
t.Fatalf("B join: %v", err)
}
var mu sync.Mutex
var got []string
sub, err := b.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
mu.Lock()
got = append(got, string(plaintext))
mu.Unlock()
})
if err != nil {
t.Fatalf("B subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(150 * time.Millisecond)
const msg = "message over the secure bus (auth+TLS+E2E)"
if err := a.Publish(roomID, []byte(msg)); err != nil {
t.Fatalf("A publish: %v", err)
}
if !waitFor(&mu, &got, func(rs []string) bool {
for _, r := range rs {
if r == msg {
return true
}
}
return false
}, 2*time.Second) {
t.Fatalf("B did not receive/decrypt the message over the secured bus; got %v", snapshot(&mu, &got))
}
}
+49 -12
@@ -6,22 +6,33 @@
package embeddednats
import (
"crypto/tls"
"fmt"
"time"
server "github.com/nats-io/nats-server/v2/server"
)
-// Start launches an embedded nats-server with JetStream enabled, listening on
-// the given port and persisting JetStream state under storeDir. The listen host
-// is left at the nats-server default ("0.0.0.0", all interfaces). It blocks
-// until the server is ready to accept connections (up to 5s) and returns the
-// running server. The caller is responsible for calling Shutdown on it.
-//
-// Start is a thin backward-compatible wrapper over StartHost; callers that need
-// to control the bind interface (loopback vs LAN) should use StartHost directly.
// ServerConfig is the full set of knobs for the embedded NATS server. The zero
// value (empty StoreDir aside) yields a dev-friendly server: JetStream on, bound
// to all interfaces, no client auth, no TLS. Secured deployments set Auth and
// TLS; tests set Host to loopback and a free Port.
type ServerConfig struct {
StoreDir string // JetStream store directory
Host string // bind interface; "" = nats-server default ("0.0.0.0")
Port int // listen port
// Auth, when non-nil, is installed as CustomClientAuthentication so the data
// plane only accepts approved clients (nkey signature + bus allowlist).
Auth server.Authentication
// TLS, when non-nil, makes the server present a certificate and require TLS
// on the data plane. Clients must trust the issuing CA (see busauth).
TLS *tls.Config
}
// Start is a thin backward-compatible wrapper: embedded JetStream server on the
// default interface, no auth, no TLS.
func Start(storeDir string, port int) (*server.Server, error) {
-return StartHost(storeDir, "", port)
return StartServer(ServerConfig{StoreDir: storeDir, Port: port})
}
// StartHost is Start with explicit control over the bind interface. host selects
@@ -30,16 +41,42 @@ func Start(storeDir string, port int) (*server.Server, error) {
// to expose it to the LAN so remote peers (phones, other PCs) can connect. An
// empty host falls back to the nats-server default ("0.0.0.0", all interfaces).
func StartHost(storeDir, host string, port int) (*server.Server, error) {
return StartServer(ServerConfig{StoreDir: storeDir, Host: host, Port: port})
}
// StartHostAuth is StartHost with an optional custom client authenticator. When
// auth is non-nil only clients the authenticator approves may connect; when nil
// the server accepts any client (legacy, network-trusted behavior).
func StartHostAuth(storeDir, host string, port int, auth server.Authentication) (*server.Server, error) {
return StartServer(ServerConfig{StoreDir: storeDir, Host: host, Port: port, Auth: auth})
}
// StartServer launches an embedded nats-server with JetStream from cfg. It
// blocks until the server is ready to accept connections (up to 5s) and returns
// the running server; the caller must Shutdown it.
func StartServer(cfg ServerConfig) (*server.Server, error) {
opts := &server.Options{
JetStream: true,
-StoreDir: storeDir,
-Host: host,
-Port: port,
StoreDir: cfg.StoreDir,
Host: cfg.Host,
Port: cfg.Port,
DontListen: false,
// Keep the embedded server quiet by default; the host app logs the URLs.
NoLog: true,
NoSigs: true,
}
if cfg.Auth != nil {
opts.CustomClientAuthentication = cfg.Auth
// A CustomClientAuthentication alone does not make the server advertise a
// nonce in its INFO line, and nats.go refuses to connect with an nkey to a
// server that does not ("nkeys not supported by the server"). Forcing the
// nonce makes nkey clients sign the challenge our authenticator verifies.
opts.AlwaysEnableNonce = true
}
if cfg.TLS != nil {
opts.TLSConfig = cfg.TLS
opts.TLS = true
}
ns, err := server.NewServer(opts)
if err != nil {
+185
@@ -0,0 +1,185 @@
package membership
import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"fmt"
"net/http"
"strconv"
"sync"
"time"
cs "fn-registry/functions/cybersecurity"
)
// AuthMode is the control-plane authentication rollout state (feature flag
// bus-auth). It governs how the HTTP middleware treats a request whose signature
// is missing, invalid, replayed, skewed, or from an unregistered identity.
//
// AuthOff — do not verify anything (legacy behavior; default).
// AuthSoft — verify and LOG rejections, but let the request through. Lets
// clients migrate to signing without an outage.
// AuthEnforce — reject unauthenticated requests with 401.
type AuthMode int
const (
AuthOff AuthMode = iota
AuthSoft
AuthEnforce
)
func (m AuthMode) String() string {
switch m {
case AuthOff:
return "off"
case AuthSoft:
return "soft"
case AuthEnforce:
return "enforce"
default:
return "unknown"
}
}
// ParseAuthMode maps the bus-auth flag string to an AuthMode.
func ParseAuthMode(s string) (AuthMode, error) {
switch s {
case "off", "":
return AuthOff, nil
case "soft":
return AuthSoft, nil
case "enforce":
return AuthEnforce, nil
default:
return AuthOff, fmt.Errorf("membership: invalid bus-auth mode %q (want off|soft|enforce)", s)
}
}
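One way the mode string could reach `ParseAuthMode` is through a command-line flag. The sketch below is an assumption, not something this commit shows: it adds a hypothetical `Set` method so `*AuthMode` satisfies `flag.Value`, and a hypothetical `--bus-auth` flag parsed by `parseArgs`. The type, `String`, and `ParseAuthMode` are repeated only to keep the example self-contained.

```go
package main

import (
	"flag"
	"fmt"
	"io"
)

type AuthMode int

const (
	AuthOff AuthMode = iota
	AuthSoft
	AuthEnforce
)

func (m AuthMode) String() string {
	switch m {
	case AuthOff:
		return "off"
	case AuthSoft:
		return "soft"
	case AuthEnforce:
		return "enforce"
	default:
		return "unknown"
	}
}

func ParseAuthMode(s string) (AuthMode, error) {
	switch s {
	case "off", "":
		return AuthOff, nil
	case "soft":
		return AuthSoft, nil
	case "enforce":
		return AuthEnforce, nil
	default:
		return AuthOff, fmt.Errorf("invalid bus-auth mode %q (want off|soft|enforce)", s)
	}
}

// Set is a hypothetical addition: together with String it makes *AuthMode a
// flag.Value, so the mode can be bound directly to a flag.
func (m *AuthMode) Set(s string) error {
	parsed, err := ParseAuthMode(s)
	if err != nil {
		return err
	}
	*m = parsed
	return nil
}

// parseArgs parses a hypothetical --bus-auth flag; an absent flag leaves the
// legacy default (off).
func parseArgs(args []string) (AuthMode, error) {
	mode := AuthOff
	fs := flag.NewFlagSet("membershipd", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage text out of the demo output
	fs.Var(&mode, "bus-auth", "control-plane auth mode: off|soft|enforce")
	if err := fs.Parse(args); err != nil {
		return AuthOff, err
	}
	return mode, nil
}

func main() {
	for _, args := range [][]string{nil, {"--bus-auth=enforce"}, {"--bus-auth=bogus"}} {
		mode, err := parseArgs(args)
		fmt.Println(mode, err)
	}
}
```

Returning `AuthOff` alongside the error mirrors `ParseAuthMode`. A caller that wants to fail closed on a typo should treat the error as fatal rather than continue with the returned mode.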
// Control-plane signature headers. The client signs the canonical bytes of the
// request and presents these; the server reconstructs the canonical bytes and
// verifies. See CanonicalRequest for the exact byte layout.
const (
hdrPub = "X-Unibus-Pub" // signer Ed25519 public key, lowercase hex
hdrTs = "X-Unibus-Ts" // unix seconds (string)
hdrNonce = "X-Unibus-Nonce" // 16 random bytes, std base64
hdrSig = "X-Unibus-Sig" // Ed25519 signature over canonical, std base64
)
// Anti-replay parameters. A request is accepted only if its timestamp is within
// clockSkew of now; nonces are remembered for nonceTTL so a captured request
// cannot be replayed inside its acceptance window. nonceTTL must be >= the full
// acceptance window (2*clockSkew) so a replay can never outlive its memory.
const (
clockSkew = 30 * time.Second
nonceTTL = 60 * time.Second
)
// CanonicalRequest returns the exact bytes that are signed and verified for a
// control-plane request:
//
// method "\n" path "\n" ts "\n" nonce "\n" hex(sha256(body))
//
// path is the request URI (path plus raw query) so query parameters (endpoint,
// epoch) are covered by the signature. It is exported so the client library and
// tests sign with the identical construction — the one place this format lives.
func CanonicalRequest(method, path, ts, nonce string, body []byte) []byte {
sum := sha256.Sum256(body)
return []byte(method + "\n" + path + "\n" + ts + "\n" + nonce + "\n" + hex.EncodeToString(sum[:]))
}
// nonceCache remembers recently-seen nonces to reject replays. It is an
// in-memory map guarded by a mutex with lazy expiry — sufficient for a single
// membershipd process (the spec's chosen tradeoff over a server-issued nonce
// round-trip). A distributed deployment would need a shared store.
type nonceCache struct {
mu sync.Mutex
seen map[string]time.Time
ttl time.Duration
}
func newNonceCache(ttl time.Duration) *nonceCache {
return &nonceCache{seen: make(map[string]time.Time), ttl: ttl}
}
// rememberOrReject records nonce and returns true if it was unseen, or false if
// it is a replay (still live in the cache). Expired entries are pruned lazily on
// each call so the map cannot grow without bound under steady traffic.
func (n *nonceCache) rememberOrReject(nonce string, now time.Time) bool {
n.mu.Lock()
defer n.mu.Unlock()
for k, exp := range n.seen {
if exp.Before(now) {
delete(n.seen, k)
}
}
if exp, ok := n.seen[nonce]; ok && !exp.Before(now) {
return false
}
n.seen[nonce] = now.Add(n.ttl)
return true
}
// authResult is what a successful authentication yields: the verified signing
// key (hex) and the authorized user record. Handlers may use it for fine-grained
// authorization (e.g. role checks) in later phases.
type authResult struct {
pubHex string
user User
}
// authenticate verifies the signature headers on r against body and the user
// allowlist. It returns an error describing the first failing check; the
// middleware decides whether that error blocks (enforce) or only logs (soft).
//
// Order matters: cheap, non-cryptographic checks (header presence, key shape,
// clock skew) run first; the Ed25519 verification runs before the replay cache
// is touched so an attacker cannot poison the cache with unsigned nonces; the
// allowlist lookup runs last.
func (s *Server) authenticate(r *http.Request, body []byte, now time.Time) (authResult, error) {
pubHex := r.Header.Get(hdrPub)
ts := r.Header.Get(hdrTs)
nonce := r.Header.Get(hdrNonce)
sigB64 := r.Header.Get(hdrSig)
if pubHex == "" || ts == "" || nonce == "" || sigB64 == "" {
return authResult{}, fmt.Errorf("missing auth headers")
}
pub, err := hex.DecodeString(pubHex)
if err != nil || len(pub) != 32 {
return authResult{}, fmt.Errorf("malformed %s (want 32-byte Ed25519 hex)", hdrPub)
}
tsInt, err := strconv.ParseInt(ts, 10, 64)
if err != nil {
return authResult{}, fmt.Errorf("malformed %s", hdrTs)
}
if d := now.Unix() - tsInt; d > int64(clockSkew/time.Second) || d < -int64(clockSkew/time.Second) {
return authResult{}, fmt.Errorf("timestamp out of range (skew %ds)", d)
}
sig, err := base64.StdEncoding.DecodeString(sigB64)
if err != nil {
return authResult{}, fmt.Errorf("malformed %s", hdrSig)
}
canonical := CanonicalRequest(r.Method, r.URL.RequestURI(), ts, nonce, body)
if !cs.VerifyEd25519(pub, canonical, sig) {
return authResult{}, fmt.Errorf("invalid signature")
}
if !s.nonces.rememberOrReject(nonce, now) {
return authResult{}, fmt.Errorf("replayed nonce")
}
if !s.store.IsAuthorized(pubHex) {
return authResult{}, fmt.Errorf("identity not authorized")
}
user, err := s.store.GetUser(pubHex)
if err != nil {
// IsAuthorized passed but the row vanished (race with revoke): fail closed.
return authResult{}, fmt.Errorf("identity not authorized")
}
return authResult{pubHex: pubHex, user: user}, nil
}
+194
@@ -0,0 +1,194 @@
package membership
import (
"bytes"
"encoding/base64"
"encoding/hex"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"strconv"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/blobstore"
)
// authHarness boots an in-process membershipd HTTP server in the given auth mode
// with a fresh store + blob store, and seeds one active admin ("alice").
type authHarness struct {
ts *httptest.Server
store *Store
alice cs.Identity
alicePub string // hex
}
func newAuthHarness(t *testing.T, mode AuthMode) *authHarness {
t.Helper()
dir := t.TempDir()
store, err := Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
if err != nil {
t.Fatalf("open blobs: %v", err)
}
alice, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
alicePub := hex.EncodeToString(alice.SignPub)
if err := store.AddUser(alicePub, "alice", RoleAdmin); err != nil {
t.Fatalf("seed admin: %v", err)
}
srv := NewServer(store, blobs, mode)
ts := httptest.NewServer(srv)
t.Cleanup(func() {
ts.Close()
store.Close()
})
return &authHarness{ts: ts, store: store, alice: alice, alicePub: alicePub}
}
// signedReq builds a control-plane request signed by id, with explicit ts/nonce
// so tests can force skew and replay. It signs via the same CanonicalRequest the
// production client uses, so the test verifies the real wire contract.
func signedReq(t *testing.T, base, method, path string, body []byte, id cs.Identity, ts int64, nonce string) *http.Request {
t.Helper()
var rdr io.Reader
if body != nil {
rdr = bytes.NewReader(body)
}
req, err := http.NewRequest(method, base+path, rdr)
if err != nil {
t.Fatalf("new request: %v", err)
}
tss := strconv.FormatInt(ts, 10)
canonical := CanonicalRequest(method, path, tss, nonce, body)
sig := cs.SignEd25519(id.SignPriv, canonical)
req.Header.Set(hdrPub, hex.EncodeToString(id.SignPub))
req.Header.Set(hdrTs, tss)
req.Header.Set(hdrNonce, nonce)
req.Header.Set(hdrSig, base64.StdEncoding.EncodeToString(sig))
return req
}
func do(t *testing.T, req *http.Request) (int, string) {
t.Helper()
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("do request: %v", err)
}
defer resp.Body.Close()
b, _ := io.ReadAll(resp.Body)
return resp.StatusCode, string(b)
}
const okPath = "/members/alice-endpoint/rooms" // always 200 with an empty list
// Golden: a request signed by a registered, active identity is accepted.
func TestAuthGoldenAccepted(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
now := time.Now().Unix()
code, _ := do(t, signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-golden"))
if code != http.StatusOK {
t.Fatalf("golden signed request should be 200, got %d", code)
}
}
// Error path: a structurally valid signature from an identity that is NOT in the
// allowlist is rejected with 401.
func TestAuthUnregisteredRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
bob, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
now := time.Now().Unix()
code, body := do(t, signedReq(t, h.ts.URL, "GET", okPath, nil, bob, now, "nonce-bob"))
if code != http.StatusUnauthorized {
t.Fatalf("unregistered identity should be 401, got %d (%s)", code, body)
}
}
// Error path: replaying a captured request (same nonce + signature) is rejected.
func TestAuthReplayRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
now := time.Now().Unix()
first := signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-replay")
if code, body := do(t, first); code != http.StatusOK {
t.Fatalf("first request should be 200, got %d (%s)", code, body)
}
// Identical ts + nonce + signature: a replay.
second := signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-replay")
if code, body := do(t, second); code != http.StatusUnauthorized {
t.Fatalf("replayed request should be 401, got %d (%s)", code, body)
}
}
// Error path: a timestamp outside the ±30s window is rejected even with a valid
// signature (defends against long-delayed captured requests).
func TestAuthClockSkewRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
stale := time.Now().Unix() - 120
code, body := do(t, signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, stale, "nonce-skew"))
if code != http.StatusUnauthorized {
t.Fatalf("clock-skewed request should be 401, got %d (%s)", code, body)
}
}
// Error path: tampering the body after signing invalidates the signature.
func TestAuthTamperedBodyRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
now := time.Now().Unix()
req := signedReq(t, h.ts.URL, "POST", "/rooms", []byte(`{"subject":"x"}`), h.alice, now, "nonce-tamper")
// Swap the body for different bytes the signature does not cover.
req.Body = io.NopCloser(bytes.NewReader([]byte(`{"subject":"evil"}`)))
req.ContentLength = int64(len(`{"subject":"evil"}`))
code, body := do(t, req)
if code != http.StatusUnauthorized {
t.Fatalf("tampered body should be 401, got %d (%s)", code, body)
}
}
// Error path: missing auth headers under enforce are rejected.
func TestAuthMissingHeadersRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
req, _ := http.NewRequest("GET", h.ts.URL+okPath, nil)
code, _ := do(t, req)
if code != http.StatusUnauthorized {
t.Fatalf("unsigned request under enforce should be 401, got %d", code)
}
}
// Exemption: the health probe bypasses auth even under enforce.
func TestAuthHealthExempt(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
req, _ := http.NewRequest("GET", h.ts.URL+"/healthz", nil)
code, _ := do(t, req)
if code != http.StatusOK {
t.Fatalf("/healthz must be reachable without auth, got %d", code)
}
}
// Soft mode: an unauthenticated request is logged but allowed through, so
// clients can migrate without an outage.
func TestAuthSoftAllowsUnauthenticated(t *testing.T) {
h := newAuthHarness(t, AuthSoft)
req, _ := http.NewRequest("GET", h.ts.URL+okPath, nil)
code, _ := do(t, req)
if code != http.StatusOK {
t.Fatalf("soft mode should allow unsigned request, got %d", code)
}
}
// Off mode (default for legacy callers): no verification at all.
func TestAuthOffNoVerification(t *testing.T) {
h := newAuthHarness(t, AuthOff)
req, _ := http.NewRequest("GET", h.ts.URL+okPath, nil)
code, _ := do(t, req)
if code != http.StatusOK {
t.Fatalf("off mode should allow unsigned request, got %d", code)
}
}
+22
@@ -0,0 +1,22 @@
-- 002_users.sql — bus-level user directory (issue 0001a).
--
-- The authoritative allowlist of identities permitted to use the bus, independent
-- of room membership. A user is identified by its Ed25519 signing public key (the
-- same key that derives the endpoint via frame.EndpointID); roles gate admin-only
-- control-plane operations; status enables revocation without deleting history.
--
-- Additive and idempotent: safe to apply repeatedly. Never modify this file;
-- further schema changes go in new numbered migrations (see
-- .claude/rules/db_migrations.md). The embedded copy under
-- pkg/membership/migrations/002_users.sql mirrors this file byte-for-byte.
CREATE TABLE IF NOT EXISTS users (
sign_pub TEXT PRIMARY KEY, -- Ed25519 public key in lowercase hex (peer identity)
handle TEXT NOT NULL, -- human-readable name (unique recommended, not enforced as PK)
role TEXT NOT NULL DEFAULT 'member', -- 'admin' | 'member'
status TEXT NOT NULL DEFAULT 'active', -- 'active' | 'revoked'
created_at TEXT NOT NULL,
revoked_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_users_status ON users(status);
+57 -8
@@ -1,14 +1,17 @@
package membership
import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"strconv"
"strings"
"time"
cs "fn-registry/functions/cybersecurity"
@@ -24,20 +27,66 @@ import (
// rate limiting, and read endpoints (GET) are unauthenticated. Hardening
// (mTLS, capabilities, rate limits) is a later phase.
type Server struct {
-store *Store
-blobs *blobstore.Store
-mux *http.ServeMux
store *Store
blobs *blobstore.Store
mux *http.ServeMux
authMode AuthMode
nonces *nonceCache
}
-// NewServer wires the membership store and blob store into an http.Handler.
-func NewServer(store *Store, blobs *blobstore.Store) *Server {
-s := &Server{store: store, blobs: blobs, mux: http.NewServeMux()}
// NewServer wires the membership store and blob store into an http.Handler. The
// authMode selects the control-plane auth rollout state (AuthOff for callers and
// tests that have not migrated to signed requests yet).
func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server {
s := &Server{
store: store,
blobs: blobs,
mux: http.NewServeMux(),
authMode: authMode,
nonces: newNonceCache(nonceTTL),
}
s.routes()
return s
}
-// ServeHTTP satisfies http.Handler.
-func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) }
// ServeHTTP satisfies http.Handler. It runs the control-plane auth middleware
// (signature verification + anti-replay + allowlist) ahead of the router
// according to authMode, then dispatches to the matched handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.authMode == AuthOff || isAuthExempt(r) {
s.mux.ServeHTTP(w, r)
return
}
// Buffer the body so the signature can be verified over it and the handler
// still reads it. Bodies on the control plane are small (JSON metadata or a
// media blob already capped upstream), so full buffering is acceptable.
body, err := io.ReadAll(r.Body)
if err != nil {
writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
return
}
_ = r.Body.Close()
r.Body = io.NopCloser(bytes.NewReader(body))
if _, err := s.authenticate(r, body, time.Now()); err != nil {
if s.authMode == AuthSoft {
log.Printf("[auth] soft: would reject %s %s: %v", r.Method, r.URL.Path, err)
s.mux.ServeHTTP(w, r)
return
}
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
return
}
s.mux.ServeHTTP(w, r)
}
// isAuthExempt lists requests that bypass control-plane auth even under enforce.
// Only the unauthenticated health probe qualifies: it carries no data and is
// needed by load balancers / smoke checks / systemd before any identity exists.
func isAuthExempt(r *http.Request) bool {
return r.Method == http.MethodGet && r.URL.Path == "/healthz"
}
func (s *Server) routes() {
s.mux.HandleFunc("GET /healthz", s.handleHealth)
+164
@@ -0,0 +1,164 @@
package membership
import (
"database/sql"
"errors"
"fmt"
"strings"
)
// User roles and statuses. They are stored as free text in the users table so
// new values can be introduced without a schema change; these constants name
// the ones the code reasons about today.
const (
RoleAdmin = "admin"
RoleMember = "member"
StatusActive = "active"
StatusRevoked = "revoked"
)
// ErrUserExists is returned by AddUser when a user with the same sign_pub is
// already registered. Callers that want upsert semantics should branch on it.
var ErrUserExists = errors.New("membership: user already exists")
// User is a bus-level identity in the allowlist: the Ed25519 signing public key
// that authenticates a peer on both the control plane (request signatures) and
// the data plane (NATS nkey), plus its role and revocation status. SignPub is
// the lowercase hex of the 32-byte Ed25519 public key — the same key that
// derives the endpoint id via frame.EndpointID.
type User struct {
SignPub string // Ed25519 public key, lowercase hex
Handle string
Role string // RoleAdmin | RoleMember
Status string // StatusActive | StatusRevoked
CreatedAt string
RevokedAt string // empty unless revoked
}
// normalizeSignPub lowercases the hex key so lookups are case-insensitive: the
// primary key is stored lowercase and every query normalizes its input the same
// way, so a caller passing uppercase hex still matches.
func normalizeSignPub(signPub string) string {
return strings.ToLower(strings.TrimSpace(signPub))
}
// AddUser inserts a new bus user. role defaults to RoleMember when empty. It
// returns ErrUserExists if the sign_pub is already registered (the caller may
// choose to revoke+re-add or ignore). handle and signPub must be non-empty.
func (s *Store) AddUser(signPub, handle, role string) error {
signPub = normalizeSignPub(signPub)
if signPub == "" || handle == "" {
return fmt.Errorf("membership: AddUser: sign_pub and handle required")
}
if role == "" {
role = RoleMember
}
if role != RoleAdmin && role != RoleMember {
return fmt.Errorf("membership: AddUser: invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember)
}
_, err := s.db.Exec(
`INSERT INTO users (sign_pub, handle, role, status, created_at) VALUES (?, ?, ?, ?, ?)`,
signPub, handle, role, StatusActive, nowRFC3339(),
)
if err != nil {
// modernc.org/sqlite surfaces a UNIQUE/PRIMARY KEY violation as a message
// containing "UNIQUE constraint failed"; translate it into a typed error so
// callers do not have to string-match.
if strings.Contains(err.Error(), "UNIQUE constraint") || strings.Contains(err.Error(), "PRIMARY KEY") {
return ErrUserExists
}
return fmt.Errorf("membership: insert user: %w", err)
}
return nil
}
// GetUser returns the user with the given signing public key. It returns
// sql.ErrNoRows (wrapped) when there is no such user.
func (s *Store) GetUser(signPub string) (User, error) {
signPub = normalizeSignPub(signPub)
var u User
var revoked sql.NullString
err := s.db.QueryRow(
`SELECT sign_pub, handle, role, status, created_at, revoked_at FROM users WHERE sign_pub = ?`,
signPub,
).Scan(&u.SignPub, &u.Handle, &u.Role, &u.Status, &u.CreatedAt, &revoked)
if err != nil {
return User{}, fmt.Errorf("membership: get user %q: %w", signPub, err)
}
u.RevokedAt = revoked.String
return u, nil
}
// ListUsers returns every user ordered by handle then sign_pub (stable output).
func (s *Store) ListUsers() ([]User, error) {
rows, err := s.db.Query(
`SELECT sign_pub, handle, role, status, created_at, revoked_at FROM users ORDER BY handle, sign_pub`,
)
if err != nil {
return nil, fmt.Errorf("membership: list users: %w", err)
}
defer rows.Close()
var out []User
for rows.Next() {
var u User
var revoked sql.NullString
if err := rows.Scan(&u.SignPub, &u.Handle, &u.Role, &u.Status, &u.CreatedAt, &revoked); err != nil {
return nil, fmt.Errorf("membership: scan user: %w", err)
}
u.RevokedAt = revoked.String
out = append(out, u)
}
return out, rows.Err()
}
// RevokeUser marks a user as revoked and stamps revoked_at. Revocation is a
// status flip (not a delete) so the identity stays auditable and IsAuthorized
// immediately denies it on both planes. Revoking an unknown or already-revoked
// user returns an error, because the UPDATE matches no active row.
func (s *Store) RevokeUser(signPub string) error {
signPub = normalizeSignPub(signPub)
res, err := s.db.Exec(
`UPDATE users SET status = ?, revoked_at = ? WHERE sign_pub = ? AND status = ?`,
StatusRevoked, nowRFC3339(), signPub, StatusActive,
)
if err != nil {
return fmt.Errorf("membership: revoke user %q: %w", signPub, err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("membership: revoke user %q: rows affected: %w", signPub, err)
}
if n == 0 {
return fmt.Errorf("membership: revoke user %q: no active user with that key", signPub)
}
return nil
}
// IsAuthorized reports whether signPub belongs to an active (non-revoked) bus
// user. It is the single authorization predicate consulted by both the control
// plane (HTTP request middleware) and the data plane (NATS nkey authenticator),
// so revoking a user denies access on both without restarting anything. An
// unknown key, a revoked key, or any query error all yield false (fail closed).
func (s *Store) IsAuthorized(signPub string) bool {
signPub = normalizeSignPub(signPub)
if signPub == "" {
return false
}
var one int
err := s.db.QueryRow(
`SELECT 1 FROM users WHERE sign_pub = ? AND status = ?`, signPub, StatusActive,
).Scan(&one)
return err == nil && one == 1
}
// HasAdmin reports whether at least one active admin exists. The control plane
// uses it to gate user-management endpoints: until the host operator seeds the
// first admin via the local CLI, those endpoints stay closed (chicken-egg).
func (s *Store) HasAdmin() bool {
var one int
err := s.db.QueryRow(
`SELECT 1 FROM users WHERE role = ? AND status = ? LIMIT 1`, RoleAdmin, StatusActive,
).Scan(&one)
return err == nil && one == 1
}
+164
@@ -0,0 +1,164 @@
package membership
import (
"errors"
"strings"
"testing"
)
// Valid-shape Ed25519 public keys in hex (64 hex chars). The bytes are
// arbitrary: the store treats sign_pub as an opaque identifier and only the CLI
// validates the length, so any 64-hex string round-trips through the store.
const (
pubAlice = "1111111111111111111111111111111111111111111111111111111111111111"
pubBob = "2222222222222222222222222222222222222222222222222222222222222222"
)
// Golden: add a user, read it back, and confirm it authorizes.
func TestAddGetIsAuthorized(t *testing.T) {
s := openTestStore(t)
if err := s.AddUser(pubAlice, "alice", RoleAdmin); err != nil {
t.Fatalf("AddUser: %v", err)
}
u, err := s.GetUser(pubAlice)
if err != nil {
t.Fatalf("GetUser: %v", err)
}
if u.Handle != "alice" || u.Role != RoleAdmin || u.Status != StatusActive {
t.Fatalf("GetUser mismatch: %+v", u)
}
if u.CreatedAt == "" {
t.Fatalf("CreatedAt not stamped")
}
if u.RevokedAt != "" {
t.Fatalf("RevokedAt should be empty for an active user, got %q", u.RevokedAt)
}
if !s.IsAuthorized(pubAlice) {
t.Fatalf("active user should be authorized")
}
if !s.HasAdmin() {
t.Fatalf("HasAdmin should be true after seeding an admin")
}
}
// Edge: an empty role defaults to member; case-insensitive lookup; list order.
func TestAddDefaultsAndListing(t *testing.T) {
s := openTestStore(t)
if err := s.AddUser(pubBob, "bob", ""); err != nil {
t.Fatalf("AddUser bob: %v", err)
}
u, err := s.GetUser(pubBob)
if err != nil {
t.Fatalf("GetUser bob: %v", err)
}
if u.Role != RoleMember {
t.Fatalf("empty role should default to member, got %q", u.Role)
}
// Adding bob (a member only) must not make HasAdmin true.
if s.HasAdmin() {
t.Fatalf("HasAdmin should be false with only a member registered")
}
// Lookup is case-insensitive: uppercase hex matches the lowercase-stored key.
if !s.IsAuthorized(strings.ToUpper(pubBob)) {
t.Fatalf("IsAuthorized should be case-insensitive on the hex key")
}
if err := s.AddUser(pubAlice, "alice", RoleAdmin); err != nil {
t.Fatalf("AddUser alice: %v", err)
}
users, err := s.ListUsers()
if err != nil {
t.Fatalf("ListUsers: %v", err)
}
// Ordered by handle: alice before bob.
if len(users) != 2 || users[0].Handle != "alice" || users[1].Handle != "bob" {
t.Fatalf("ListUsers order/content wrong: %+v", users)
}
}
// Edge: revocation flips status, stamps revoked_at, and denies authorization on
// the spot: the property both planes rely on for revoke-without-restart.
func TestRevokeDeniesAuthorization(t *testing.T) {
	s := openTestStore(t)
	if err := s.AddUser(pubAlice, "alice", RoleMember); err != nil {
		t.Fatalf("AddUser: %v", err)
	}
	if !s.IsAuthorized(pubAlice) {
		t.Fatalf("precondition: user should be authorized before revoke")
	}
	if err := s.RevokeUser(pubAlice); err != nil {
		t.Fatalf("RevokeUser: %v", err)
	}
	if s.IsAuthorized(pubAlice) {
		t.Fatalf("revoked user must NOT be authorized")
	}
	u, err := s.GetUser(pubAlice)
	if err != nil {
		t.Fatalf("GetUser after revoke: %v", err)
	}
	if u.Status != StatusRevoked || u.RevokedAt == "" {
		t.Fatalf("revoke should set status=revoked and stamp revoked_at, got %+v", u)
	}
}
// Error path: duplicate key, invalid role, empty sign_pub, unknown user,
// revoke of unknown or already-revoked user.
func TestUserErrorPaths(t *testing.T) {
	s := openTestStore(t)
	if err := s.AddUser(pubAlice, "alice", RoleAdmin); err != nil {
		t.Fatalf("AddUser: %v", err)
	}
	// Duplicate sign_pub -> typed ErrUserExists.
	if err := s.AddUser(pubAlice, "alice2", RoleMember); !errors.Is(err, ErrUserExists) {
		t.Fatalf("duplicate AddUser should return ErrUserExists, got %v", err)
	}
	// Invalid role rejected.
	if err := s.AddUser(pubBob, "bob", "superuser"); err == nil {
		t.Fatalf("invalid role should error")
	}
	// Empty sign_pub rejected (an empty handle is not exercised here).
	if err := s.AddUser("", "nobody", RoleMember); err == nil {
		t.Fatalf("empty sign_pub should error")
	}
	// Unknown user is not authorized (fail closed) and GetUser errors.
	if s.IsAuthorized(pubBob) {
		t.Fatalf("unknown user must not be authorized")
	}
	if _, err := s.GetUser(pubBob); err == nil {
		t.Fatalf("GetUser of unknown user should error")
	}
	// Revoking an unknown (or already-revoked) user errors (no active row).
	if err := s.RevokeUser(pubBob); err == nil {
		t.Fatalf("revoking unknown user should error")
	}
	if err := s.RevokeUser(pubAlice); err != nil {
		t.Fatalf("first revoke should succeed: %v", err)
	}
	if err := s.RevokeUser(pubAlice); err == nil {
		t.Fatalf("second revoke of same user should error (already revoked)")
	}
}
// Migration safety: the users table and its index exist after Open, and the
// users migration is idempotent on re-apply (mirrors TestMigrationsCreateSchema).
func TestUsersMigrationIdempotent(t *testing.T) {
	s := openTestStore(t)
	var name string
	if err := s.db.QueryRow(
		`SELECT name FROM sqlite_master WHERE type='table' AND name='users'`,
	).Scan(&name); err != nil {
		t.Fatalf("users table not created: %v", err)
	}
	if err := s.db.QueryRow(
		`SELECT name FROM sqlite_master WHERE type='index' AND name='idx_users_status'`,
	).Scan(&name); err != nil {
		t.Fatalf("idx_users_status not created: %v", err)
	}
	if err := s.applyMigrations(); err != nil {
		t.Fatalf("re-apply migrations: %v", err)
	}
}
+4 -1
@@ -860,7 +860,10 @@ func main() {
 		ns.Shutdown()
 		log.Fatalf("open blob store: %v", err)
 	}
-	ctrlSrv := &http.Server{Addr: ctrlAddr, Handler: membership.NewServer(store, blobs)}
+	// AuthOff: the playground is a local dev gateway that has not migrated to
+	// signed control-plane requests or a secured upstream bus yet. What it would
+	// need is written up in dev/0001e-remaining-clients.md (issue 0001, phase 0001e).
+	ctrlSrv := &http.Server{Addr: ctrlAddr, Handler: membership.NewServer(store, blobs, membership.AuthOff)}
 	go func() {
 		if err := ctrlSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
 			log.Fatalf("control plane: %v", err)