From cd02a521914a40612a4eb8811d82cea962d3d772 Mon Sep 17 00:00:00 2001 From: agent Date: Wed, 3 Jun 2026 19:47:32 +0200 Subject: [PATCH] feat: initial scaffold of unibus message bus (membership service + client lib + demo peers) --- .gitignore | 15 + app.md | 153 ++++++ cmd/chat/identity.go | 9 + cmd/chat/main.go | 184 ++++++++ cmd/membershipd/main.go | 93 ++++ cmd/worker/main.go | 71 +++ go.mod | 32 ++ go.sum | 73 +++ migrations/001_init.sql | 32 ++ pkg/blobstore/blobstore.go | 81 ++++ pkg/client/client.go | 627 +++++++++++++++++++++++++ pkg/client/client_test.go | 346 ++++++++++++++ pkg/client/identity.go | 103 ++++ pkg/embeddednats/embeddednats.go | 48 ++ pkg/frame/frame.go | 89 ++++ pkg/frame/frame_test.go | 72 +++ pkg/membership/migrations/001_init.sql | 32 ++ pkg/membership/server.go | 342 ++++++++++++++ pkg/membership/store.go | 289 ++++++++++++ pkg/membership/store_test.go | 142 ++++++ pkg/membership/ulid.go | 13 + pkg/room/room.go | 42 ++ 22 files changed, 2888 insertions(+) create mode 100644 .gitignore create mode 100644 app.md create mode 100644 cmd/chat/identity.go create mode 100644 cmd/chat/main.go create mode 100644 cmd/membershipd/main.go create mode 100644 cmd/worker/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 migrations/001_init.sql create mode 100644 pkg/blobstore/blobstore.go create mode 100644 pkg/client/client.go create mode 100644 pkg/client/client_test.go create mode 100644 pkg/client/identity.go create mode 100644 pkg/embeddednats/embeddednats.go create mode 100644 pkg/frame/frame.go create mode 100644 pkg/frame/frame_test.go create mode 100644 pkg/membership/migrations/001_init.sql create mode 100644 pkg/membership/server.go create mode 100644 pkg/membership/store.go create mode 100644 pkg/membership/store_test.go create mode 100644 pkg/membership/ulid.go create mode 100644 pkg/room/room.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..902cd57 --- /dev/null +++ b/.gitignore @@ 
-0,0 +1,15 @@ +# Per-PC writable runtime state (never distributed). +local_files/ +*.db +*.db-shm +*.db-wal +jetstream/ +blobs/ +worker.id +*.id + +# Build artifacts +/membershipd +/worker +/chat +*.exe diff --git a/app.md b/app.md new file mode 100644 index 0000000..d7eb8e0 --- /dev/null +++ b/app.md @@ -0,0 +1,153 @@ +--- +name: unibus +lang: go +domain: infra +version: 0.1.0 +description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo." +tags: [service, messaging, nats, e2e] +uses_functions: + - generate_identity_go_cybersecurity + - seal_aead_go_cybersecurity + - open_aead_go_cybersecurity + - seal_key_box_go_cybersecurity + - open_key_box_go_cybersecurity + - sign_ed25519_go_cybersecurity + - verify_ed25519_go_cybersecurity +uses_types: [] +framework: "" +entry_point: "cmd/membershipd" +dir_path: "projects/message_bus/apps/unibus" +repo_url: "" +service: + port: 8420 + health_endpoint: /healthz + health_timeout_s: 3 + systemd_unit: null + systemd_scope: null + restart_policy: none + runtime: manual + pc_targets: + - lucas-linux + is_local_only: true +e2e_checks: + - id: build + cmd: "CGO_ENABLED=0 go build ./..." + timeout_s: 180 + - id: vet + cmd: "CGO_ENABLED=0 go vet ./..." + timeout_s: 120 + - id: unit + cmd: "CGO_ENABLED=0 go test ./..." + timeout_s: 180 + - id: smoke + cmd: "CGO_ENABLED=0 go build -o /tmp/unibus_membershipd ./cmd/membershipd && /tmp/unibus_membershipd --http-port 8420 --nats-port 14222 --db /tmp/unibus_smoke.db --store-dir /tmp/unibus_blobs --nats-store /tmp/unibus_js &" + health: "http://127.0.0.1:8420/healthz" + timeout_s: 30 +--- + +## Qué es + +`unibus` es un bus de mensajería unificado sobre NATS + JetStream. 
Una capa fina +encima de NATS aporta lo que NATS no tiene: membresía de rooms, invitaciones con +reparto de clave, y cifrado extremo a extremo (E2E) del payload por room, con +rotación de clave activa (forward secrecy) al expulsar a un miembro. + +Todos los participantes —procesos/workers, interfaces de chat humanas, agentes +LLM— son peers de primera clase que hablan el mismo protocolo y se diferencian +solo por las rooms a las que se unen y por lo que hacen con lo que reciben. + +### Piezas + +| Componente | Ruta | Rol | +|---|---|---| +| `membershipd` | `cmd/membershipd` | Service (control plane): metadata de rooms, directorio de miembros, reparto de claves selladas, object store de media. Arranca NATS embebido si no se pasa `--nats-url`. | +| librería cliente | `pkg/client` | La API que consume cualquier peer. Crear/unirse a rooms, invitar, publicar/suscribir, request/reply, kick con rotación de clave, media cifrada. | +| `worker` | `cmd/worker` | Peer demo: publica un contador incremental cada segundo a una room cleartext. | +| `chat` | `cmd/chat` | Peer demo: suscriptor en vivo (modo simple) + demo de cifrado E2E y forward secrecy (`--demo-encrypted`). | + +### Dos planos + +- **Control plane**: HTTP autoritativo en `membershipd`. Quién está en cada room, + sus claves públicas, y la clave de room `K` sellada por epoch para cada miembro. +- **Data plane**: NATS. Los mensajes (frames) viajan por subject; los blobs de + media NO viajan por el bus, se cifran y se suben al object store, y por NATS + solo viaja una referencia (hash + nonce). + +### Criptografía (importada del registry, NO reescrita) + +El cifrado E2E se compone de las 7 primitivas del capability group +`e2e-messaging` (`docs/capabilities/e2e-messaging.md`), importadas del paquete +`fn-registry/functions/cybersecurity`. Esta app no reimplementa ninguna +primitiva criptográfica. 
+ +## Ejemplo + +Demo end-to-end con `go run` (NATS embebido, nada que instalar): + +```bash +cd projects/message_bus/apps/unibus + +# 1. Service de membresía/claves (NATS embebido en :4222, HTTP en :8420) +go run ./cmd/membershipd + +# 2. En otra terminal: peer publicador (proceso) — publica ticks cada 1s +go run ./cmd/worker + +# 3. En otra terminal: peer suscriptor (chat humano) — imprime cada tick en vivo +go run ./cmd/chat + +# 4. Demo de cifrado E2E + forward secrecy (contra el membershipd ya corriendo): +# A crea room cifrada, invita a B, A publica (B descifra), A expulsa a B, +# A publica de nuevo en el nuevo epoch (B ya NO puede descifrar). +go run ./cmd/chat --demo-encrypted +``` + +Para apuntar a un NATS externo en producción: `--nats-url nats://host:4222` en +`membershipd`, `worker` y `chat`. + +## Cuando usarla + +- Cuando necesites un tejido de mensajería donde procesos, humanos y agentes LLM + sean peers uniformes (mismo protocolo, distinta política por room). +- Cuando quieras rooms cifradas E2E con forward secrecy (paridad con Matrix) sin + montar un Synapse: `room.ModeMatrix`. +- Cuando quieras fan-out cleartext rápido para telemetría/coordinación de + procesos: `room.ModeNATS`. +- Como sustituto de la capa de transporte Matrix de `agents_and_robots` (fase + posterior; v1 valida el bus de forma autónoma). + +## Gotchas + +- **El service NO está endurecido (v1).** No hay TLS, ni rate-limit, ni auth en + las rutas GET de lectura. Confía en la red interna. Las rutas mutantes + (`/rooms`, `/invite`, `/rekey`) sí exigen firma Ed25519 del owner sobre los + bytes canónicos de la request. Endurecer es fase posterior. +- **Identidad = secreto crítico.** El archivo de identidad (`worker.id`, + `chat.id`) contiene las claves privadas (Ed25519 + X25519). Se escribe 0600. + Perderlo = mensajes ilegibles, sin recuperación. Trátalo como una clave SSH. 
+- **Las rooms reciben un ULID fresco al crearse.** No hay "crear o unirse por + nombre": cada `CreateRoom` produce un room nuevo. Los peers demo cleartext + comparten el *subject* (NATS enruta por subject), así que worker→chat funciona + aunque cada uno tenga su propio room id mapeado al mismo subject. +- **La media no viaja por el bus.** `PublishMedia` cifra, sube al object store y + publica solo un `BlobRef`. El receptor, si ve `Frame.Blob != nil`, descarga y + descifra con `FetchMedia`. El frame de media NO lleva payload inline (su nonce + vive en `BlobRef.Nonce`); `Subscribe` no intenta descifrar payloads vacíos. +- **Forward secrecy depende del rekey.** `Kick` rota `K` a un epoch nuevo y la + re-sella solo para los miembros restantes. El expulsado pierde acceso a los + mensajes publicados después del kick, pero conserva los anteriores (las claves + de epochs pasados no se borran: cifraban datos que ya podía leer). +- **NATS embebido escribe JetStream en disco.** `--nats-store` apunta a + `local_files/jetstream`; borrarlo resetea el historial persistido. +- **Build sin CGO.** Usa el driver `modernc.org/sqlite` (pure-Go) y el paquete + `cybersecurity` del registry compila limpio con `CGO_ENABLED=0`. NO requiere + `fts5` ni `gcc`. + +## Convención de subjects + +``` +proc.<app>.<stream> telemetría/coordinación de procesos (proc.test.ticks) +rpc.<service> request/reply (rpc.indexer) +room.<name> chat humano/grupo (room.general) +agent.<name>.{in,out} inbox/outbox de agente LLM (agent.scout.in) +``` diff --git a/cmd/chat/identity.go b/cmd/chat/identity.go new file mode 100644 index 0000000..c606499 --- /dev/null +++ b/cmd/chat/identity.go @@ -0,0 +1,9 @@ +package main + +import cs "fn-registry/functions/cybersecurity" + +// newEphemeralIdentity generates an in-memory identity for the encrypted demo +// (not persisted to disk — the demo creates fresh peers each run). 
+func newEphemeralIdentity() (cs.Identity, error) { + return cs.GenerateIdentity() +} diff --git a/cmd/chat/main.go b/cmd/chat/main.go new file mode 100644 index 0000000..e224ba2 --- /dev/null +++ b/cmd/chat/main.go @@ -0,0 +1,184 @@ +// Command chat is a demo peer with two modes. +// +// Simple mode (default): joins a room subject and prints every received message +// live. It validates uniformity worker<->chat: a process publishes, a human UI +// subscribes, both speak the same protocol. +// +// Encrypted demo (--demo-encrypted): a self-contained script against a running +// membershipd that proves E2E encryption and forward secrecy. A creates an +// encrypted room, invites B, A publishes a message B can decrypt, then A kicks +// B and publishes again at the new epoch — B can no longer decrypt. Prints a +// PASS/FAIL summary of each assertion. +package main + +import ( + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/room" +) + +func main() { + var ( + natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "NATS url") + ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8420", "membershipd control-plane url") + roomSub = flag.String("room", "proc.test.ticks", "room subject to subscribe to") + idFile = flag.String("id-file", "./local_files/chat.id", "identity file path") + demoEnc = flag.Bool("demo-encrypted", false, "run the encrypted forward-secrecy demo") + ) + flag.Parse() + + log.SetFlags(log.LstdFlags | log.Lmsgprefix) + log.SetPrefix("[chat] ") + + if *demoEnc { + runEncryptedDemo(*natsURL, *ctrlURL) + return + } + runSimple(*natsURL, *ctrlURL, *roomSub, *idFile) +} + +// runSimple subscribes to a cleartext subject and prints messages live. 
+func runSimple(natsURL, ctrlURL, roomSub, idFile string) { + id, err := client.LoadOrCreateIdentity(idFile) + if err != nil { + log.Fatalf("identity: %v", err) + } + c, err := client.New(natsURL, ctrlURL, id) + if err != nil { + log.Fatalf("connect: %v", err) + } + defer c.Close() + log.Printf("endpoint: %s", c.Endpoint().ID) + + // A subscriber needs a room to resolve the subject + policy. For the + // cleartext demo each peer owns a room mapped to the shared subject; NATS + // fans out by subject so worker publishes reach this subscription. + roomID, err := c.CreateRoom(roomSub, room.ModeNATS) + if err != nil { + log.Fatalf("create room: %v", err) + } + if err := c.Join(roomID); err != nil { + log.Fatalf("join: %v", err) + } + sub, err := c.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { + fmt.Printf("[%s] %s: %s\n", f.Subject, shortID(f.Sender), string(plaintext)) + }) + if err != nil { + log.Fatalf("subscribe: %v", err) + } + defer sub.Unsubscribe() + log.Printf("subscribed to %q; waiting for messages (Ctrl-C to stop)", roomSub) + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + <-stop + log.Printf("bye") +} + +func shortID(id string) string { + if len(id) > 8 { + return id[:8] + } + return id +} + +// runEncryptedDemo proves E2E encryption + forward secrecy end-to-end. +func runEncryptedDemo(natsURL, ctrlURL string) { + log.Printf("=== encrypted forward-secrecy demo ===") + pass := true + check := func(name string, ok bool) { + status := "PASS" + if !ok { + status = "FAIL" + pass = false + } + fmt.Printf(" [%s] %s\n", status, name) + } + + // Two identities: A (owner) and B (invitee). In-memory only (demo). 
+ idA, err := newEphemeralIdentity() + must(err, "generate A identity") + idB, err := newEphemeralIdentity() + must(err, "generate B identity") + + a, err := client.New(natsURL, ctrlURL, idA) + must(err, "connect A") + defer a.Close() + b, err := client.New(natsURL, ctrlURL, idB) + must(err, "connect B") + defer b.Close() + + // A creates an encrypted room. + roomID, err := a.CreateRoom("room.test", room.ModeMatrix) + must(err, "A create room") + fmt.Printf(" room.test -> %s (E2E, persisted, signed)\n", roomID) + + // A invites B (seals K to B's X25519 key). + must(a.Invite(roomID, b.Endpoint()), "A invite B") + + // B joins (fetches + decrypts K). + must(b.Join(roomID), "B join") + + // B subscribes; capture received plaintexts. + recv := make(chan string, 4) + subB, err := b.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { + recv <- string(plaintext) + }) + must(err, "B subscribe") + defer subB.Unsubscribe() + time.Sleep(200 * time.Millisecond) // let the subscription settle + + // A publishes a message B can decrypt. + const msg1 = "hola E2E" + must(a.Publish(roomID, []byte(msg1)), "A publish msg1") + got1, ok := waitMsg(recv, 2*time.Second) + check("B decrypts pre-kick message", ok && got1 == msg1) + + // A kicks B (rotates K to a new epoch, re-sealed only for the remaining members). + must(a.Kick(roomID, b.Endpoint().ID), "A kick B") + time.Sleep(200 * time.Millisecond) + + // A publishes at the new epoch. B must NOT be able to decrypt: it was removed + // from the member list, so /key returns no key for B at the new epoch. + const msg2 = "secreto post-kick" + must(a.Publish(roomID, []byte(msg2)), "A publish msg2 (post-kick)") + got2, ok2 := waitMsg(recv, 1500*time.Millisecond) + // Forward secrecy holds if B did NOT receive the post-kick plaintext. 
+ check("B cannot decrypt post-kick message (forward secrecy)", !(ok2 && got2 == msg2)) + if ok2 { + fmt.Printf(" (unexpected: B received %q after kick)\n", got2) + } else { + fmt.Printf(" (B received nothing decryptable after kick — correct)\n") + } + + fmt.Println() + if pass { + fmt.Println("RESULT: PASS — E2E encryption and forward secrecy verified") + os.Exit(0) + } + fmt.Println("RESULT: FAIL — see assertions above") + os.Exit(1) +} + +func waitMsg(ch <-chan string, timeout time.Duration) (string, bool) { + select { + case m := <-ch: + return m, true + case <-time.After(timeout): + return "", false + } +} + +func must(err error, what string) { + if err != nil { + log.Fatalf("%s: %v", what, err) + } +} diff --git a/cmd/membershipd/main.go b/cmd/membershipd/main.go new file mode 100644 index 0000000..816e094 --- /dev/null +++ b/cmd/membershipd/main.go @@ -0,0 +1,93 @@ +// Command membershipd is the unibus control-plane service: room metadata, +// member directory, sealed key distribution, and the media blob store. The data +// plane is NATS — if --nats-url is empty it starts an embedded nats-server with +// JetStream so the whole stack runs with `go run` and nothing to install. 
+package main
+
+import (
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	server "github.com/nats-io/nats-server/v2/server"
+
+	"github.com/enmanuel/unibus/pkg/blobstore"
+	"github.com/enmanuel/unibus/pkg/embeddednats"
+	"github.com/enmanuel/unibus/pkg/membership"
+)
+
+// main configures logging and delegates to run, so that every cleanup deferred
+// inside run has executed before the process exits with a failure status.
+func main() {
+	log.SetFlags(log.LstdFlags | log.Lmsgprefix)
+	log.SetPrefix("[membershipd] ")
+
+	if err := run(); err != nil {
+		log.Fatalf("%v", err)
+	}
+	log.Printf("bye")
+}
+
+// run parses the flags, starts the data plane (an embedded NATS server unless
+// --nats-url is given) and the control plane (membership store, blob store,
+// HTTP API), then blocks until SIGINT/SIGTERM arrives or the HTTP server
+// fails. It returns a non-nil error when startup or serving fails.
+//
+// Cleanup is deferred, so it also runs on the error paths: the membership
+// store is closed first, then the embedded NATS server (if any) is stopped.
+func run() error {
+	var (
+		natsURL   = flag.String("nats-url", "", "external NATS url; empty starts an embedded server")
+		httpPort  = flag.String("http-port", "8420", "HTTP port for the control-plane API")
+		dbPath    = flag.String("db", "./local_files/unibus.db", "SQLite database path")
+		storeDir  = flag.String("store-dir", "./local_files/blobs", "blob store directory")
+		natsPort  = flag.Int("nats-port", 4222, "embedded NATS listen port (when --nats-url empty)")
+		natsStore = flag.String("nats-store", "./local_files/jetstream", "embedded JetStream store dir")
+	)
+	flag.Parse()
+
+	// Data plane: embedded or external NATS.
+	var ns *server.Server
+	natsClientURL := *natsURL
+	if natsClientURL == "" {
+		var err error
+		ns, err = embeddednats.Start(*natsStore, *natsPort)
+		if err != nil {
+			return fmt.Errorf("start embedded nats: %w", err)
+		}
+		// Registered first, therefore executed last: NATS stops after the
+		// HTTP server has shut down and the store has been closed.
+		defer func() {
+			ns.Shutdown()
+			ns.WaitForShutdown()
+		}()
+		natsClientURL = embeddednats.ClientURL(ns)
+		log.Printf("embedded NATS (JetStream) ready: %s", natsClientURL)
+	} else {
+		log.Printf("using external NATS: %s", natsClientURL)
+	}
+
+	// Control plane: SQLite store + blob store + HTTP API.
+	store, err := membership.Open(*dbPath)
+	if err != nil {
+		return fmt.Errorf("open membership store: %w", err)
+	}
+	defer store.Close()
+	log.Printf("membership store: %s", *dbPath)
+
+	blobs, err := blobstore.New(*storeDir)
+	if err != nil {
+		return fmt.Errorf("open blob store: %w", err)
+	}
+	log.Printf("blob store: %s", *storeDir)
+
+	srv := membership.NewServer(store, blobs)
+	addr := "127.0.0.1:" + *httpPort
+	httpSrv := &http.Server{
+		Addr:    addr,
+		Handler: srv,
+		// Bound the time a client may take to send its request headers so an
+		// idle connection cannot pin a goroutine forever. Body read/write
+		// timeouts are left unset because blob transfers may be large.
+		ReadHeaderTimeout: 10 * time.Second,
+	}
+
+	// The serving goroutine reports a fatal listener error instead of calling
+	// log.Fatalf itself, which would exit without running the deferred cleanup.
+	serveErr := make(chan error, 1)
+	go func() {
+		log.Printf("HTTP control-plane API: http://%s", addr)
+		log.Printf(" health: http://%s/healthz", addr)
+		if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+			serveErr <- err
+		}
+	}()
+
+	// Graceful shutdown on SIGINT/SIGTERM; abort early if the server dies.
+	stop := make(chan os.Signal, 1)
+	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
+	select {
+	case <-stop:
+	case err := <-serveErr:
+		return fmt.Errorf("http server: %w", err)
+	}
+	log.Printf("shutting down...")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if err := httpSrv.Shutdown(ctx); err != nil {
+		log.Printf("http shutdown: %v", err)
+	}
+	return nil
+}
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
new file mode 100644
index 0000000..3a64d94
--- /dev/null
+++ b/cmd/worker/main.go
@@ -0,0 +1,71 @@
+// Command worker is a demo peer: it creates (or joins) a cleartext room and
+// publishes an incrementing counter once per second, to both stdout and the
+// bus. It demonstrates that a process is a first-class bus peer, uniform with
+// the human chat client.
+package main + +import ( + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/room" +) + +func main() { + var ( + natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "NATS url") + ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8420", "membershipd control-plane url") + roomSub = flag.String("room", "proc.test.ticks", "room subject to publish to") + idFile = flag.String("id-file", "./local_files/worker.id", "identity file path") + ) + flag.Parse() + + log.SetFlags(log.LstdFlags | log.Lmsgprefix) + log.SetPrefix("[worker] ") + + id, err := client.LoadOrCreateIdentity(*idFile) + if err != nil { + log.Fatalf("identity: %v", err) + } + c, err := client.New(*natsURL, *ctrlURL, id) + if err != nil { + log.Fatalf("connect: %v", err) + } + defer c.Close() + log.Printf("endpoint: %s", c.Endpoint().ID) + + // Create the room; if it already exists we cannot recreate it under a known + // id (rooms get fresh ULIDs), so for the demo each worker run owns its room. 
+ roomID, err := c.CreateRoom(*roomSub, room.ModeNATS) + if err != nil { + log.Fatalf("create room: %v", err) + } + log.Printf("room %q -> %s (subject %s, cleartext)", *roomSub, roomID, *roomSub) + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + n := 0 + for { + select { + case <-ticker.C: + n++ + payload := fmt.Sprintf("tick %d @ %s", n, time.Now().UTC().Format(time.RFC3339)) + fmt.Println(payload) + if err := c.Publish(roomID, []byte(payload)); err != nil { + log.Printf("publish: %v", err) + } + case <-stop: + log.Printf("stopping after %d ticks", n) + return + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..18577d6 --- /dev/null +++ b/go.mod @@ -0,0 +1,32 @@ +module github.com/enmanuel/unibus + +go 1.25.0 + +replace fn-registry => ../../../../ + +require ( + fn-registry v0.0.0-00010101000000-000000000000 + github.com/nats-io/nats-server/v2 v2.10.22 + github.com/nats-io/nats.go v1.37.0 + github.com/oklog/ulid/v2 v2.1.0 + modernc.org/sqlite v1.47.0 +) + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/compress v1.18.3 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/minio/highwayhash v1.0.3 // indirect + github.com/nats-io/jwt/v2 v2.5.8 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + golang.org/x/crypto v0.51.0 // indirect + golang.org/x/sys v0.44.0 // indirect + golang.org/x/time v0.7.0 // indirect + modernc.org/libc v1.70.0 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9b819b1 --- /dev/null +++ b/go.sum @@ -0,0 +1,73 @@ 
+github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= +github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= +github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.22 h1:Yt63BGu2c3DdMoBZNcR6pjGQwk/asrKU7VX846ibxDA= +github.com/nats-io/nats-server/v2 v2.10.22/go.mod h1:X/m1ye9NYansUXYFrbcDwUi/blHkrgHh2rgCJaakonk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= 
+github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= +github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= +github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= +golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= +golang.org/x/mod v0.36.0 h1:JJjpVx6myfUsUdAzZuOSTTmRE0PfZeNWzzvKrP7amb4= +golang.org/x/mod v0.36.0/go.mod h1:moc6ELqsWcOw5Ef3xVprK5ul/MvtVvkIXLziUOICjUQ= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= +golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= +golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8= +golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0= +modernc.org/cc/v4 
v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw= +modernc.org/ccgo/v4 v4.32.0/go.mod h1:6F08EBCx5uQc38kMGl+0Nm0oWczoo1c7cgpzEry7Uc0= +modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= +modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= +modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.70.0 h1:U58NawXqXbgpZ/dcdS9kMshu08aiA6b7gusEusqzNkw= +modernc.org/libc v1.70.0/go.mod h1:OVmxFGP1CI/Z4L3E0Q3Mf1PDE0BucwMkcXjjLntvHJo= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.47.0 h1:R1XyaNpoW4Et9yly+I2EeX7pBza/w+pmYee/0HJDyKk= +modernc.org/sqlite v1.47.0/go.mod h1:hWjRO6Tj/5Ik8ieqxQybiEOUXy0NJFNp2tpvVpKlvig= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod 
h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/migrations/001_init.sql b/migrations/001_init.sql new file mode 100644 index 0000000..a571908 --- /dev/null +++ b/migrations/001_init.sql @@ -0,0 +1,32 @@ +-- 001_init.sql — initial schema for the unibus membership/key-distribution service. +-- Additive and idempotent: safe to apply repeatedly. Never modify this file; +-- schema changes go in new numbered migrations (see .claude/rules/db_migrations.md). + +CREATE TABLE IF NOT EXISTS rooms ( + room_id TEXT PRIMARY KEY, + subject TEXT NOT NULL, + key_epoch INTEGER NOT NULL DEFAULT 1, + encrypt INTEGER NOT NULL, + persist INTEGER NOT NULL, + sign_msgs INTEGER NOT NULL, + owner_endpoint TEXT NOT NULL, + created_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS members ( + room_id TEXT NOT NULL, + endpoint TEXT NOT NULL, + role TEXT NOT NULL, + joined_at TEXT NOT NULL, + sign_pub BLOB NOT NULL, + kex_pub BLOB NOT NULL, + PRIMARY KEY (room_id, endpoint) +); + +CREATE TABLE IF NOT EXISTS room_keys ( + room_id TEXT NOT NULL, + epoch INTEGER NOT NULL, + endpoint TEXT NOT NULL, + sealed_key BLOB NOT NULL, + PRIMARY KEY (room_id, epoch, endpoint) +); diff --git a/pkg/blobstore/blobstore.go b/pkg/blobstore/blobstore.go new file mode 100644 index 0000000..54c82e0 --- /dev/null +++ b/pkg/blobstore/blobstore.go @@ -0,0 +1,81 @@ +// Package blobstore is a content-addressed object store on local disk. +// +// The bus transports messages, not blobs. Media (images, files, large payloads) +// is encrypted by the client BEFORE being stored here, so the store only ever +// sees ciphertext. Objects are addressed by the sha256 hex of their (encrypted) +// bytes, which makes Put idempotent and deduplicating. 
+package blobstore
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+)
+
+// ErrInvalidHash is returned when a lookup key is not a well-formed object
+// hash, i.e. not the 64 lowercase hexadecimal characters that Put produces.
+var ErrInvalidHash = errors.New("invalid hash")
+
+// Store is a directory-backed content-addressed blob store.
+type Store struct {
+	dir string // root directory; each object is one file named by its hash
+}
+
+// New creates a Store rooted at dir, creating the directory (and any missing
+// parents) if needed.
+func New(dir string) (*Store, error) {
+	if err := os.MkdirAll(dir, 0o755); err != nil {
+		return nil, fmt.Errorf("blobstore: mkdir %q: %w", dir, err)
+	}
+	return &Store{dir: dir}, nil
+}
+
+// validHash reports whether hash has exactly the shape Put returns: the
+// lowercase hex encoding of a sha256 digest. Lookups are restricted to that
+// shape so a key can never name the store directory itself (""), a temp file
+// (".tmp-*"), or a path outside the store ("../x").
+func validHash(hash string) bool {
+	if len(hash) != hex.EncodedLen(sha256.Size) {
+		return false
+	}
+	for i := 0; i < len(hash); i++ {
+		c := hash[i]
+		if (c < '0' || c > '9') && (c < 'a' || c > 'f') {
+			return false
+		}
+	}
+	return true
+}
+
+// path returns the on-disk path for a given content hash. Callers must have
+// validated hash (or computed it themselves) before using the result.
+func (s *Store) path(hash string) string {
+	return filepath.Join(s.dir, hash)
+}
+
+// Put writes data to the store and returns its sha256 hex hash. If an object
+// with the same content already exists, Put is a no-op and returns the hash.
+func (s *Store) Put(data []byte) (string, error) {
+	sum := sha256.Sum256(data)
+	hash := hex.EncodeToString(sum[:])
+	p := s.path(hash)
+
+	if _, err := os.Stat(p); err == nil {
+		return hash, nil // already present (content-addressed: identical bytes)
+	}
+
+	// Write atomically: temp file + rename, so readers never see a partial blob.
+	tmp, err := os.CreateTemp(s.dir, ".tmp-*")
+	if err != nil {
+		return "", fmt.Errorf("blobstore: create temp: %w", err)
+	}
+	tmpName := tmp.Name()
+	if _, err := tmp.Write(data); err != nil {
+		tmp.Close()
+		os.Remove(tmpName)
+		return "", fmt.Errorf("blobstore: write temp: %w", err)
+	}
+	if err := tmp.Close(); err != nil {
+		os.Remove(tmpName)
+		return "", fmt.Errorf("blobstore: close temp: %w", err)
+	}
+	if err := os.Rename(tmpName, p); err != nil {
+		os.Remove(tmpName)
+		return "", fmt.Errorf("blobstore: rename: %w", err)
+	}
+	return hash, nil
+}
+
+// Get reads the object with the given hash. It returns an error wrapping
+// ErrInvalidHash when hash is not a well-formed object hash, and an error
+// wrapping the underlying os error when the object cannot be read.
+func (s *Store) Get(hash string) ([]byte, error) {
+	if !validHash(hash) {
+		return nil, fmt.Errorf("blobstore: get %q: %w", hash, ErrInvalidHash)
+	}
+	data, err := os.ReadFile(s.path(hash))
+	if err != nil {
+		return nil, fmt.Errorf("blobstore: get %q: %w", hash, err)
+	}
+	return data, nil
+}
+
+// Has reports whether an object with the given hash exists. A malformed hash
+// never matches, so Has("") is false even though the store directory exists.
+func (s *Store) Has(hash string) bool {
+	if !validHash(hash) {
+		return false
+	}
+	_, err := os.Stat(s.path(hash))
+	return err == nil
+}
diff --git a/pkg/client/client.go b/pkg/client/client.go
new file mode 100644
index 0000000..467d1b7
--- /dev/null
+++ b/pkg/client/client.go
@@ -0,0 +1,627 @@
+// Package client is the unibus client library: the single API that every peer
+// (process worker, human chat UI, LLM agent) uses to talk to the bus.
+//
+// It hides the two planes behind one object:
+//   - control plane (HTTP to membershipd): create/join rooms, invite, fetch
+//     sealed keys, rekey on kick.
+//   - data plane (NATS): publish/subscribe/request/reply of frames.
+//
+// In encrypted rooms it transparently seals/opens payloads with the room key K,
+// distributes K to invitees via sealed boxes, signs and verifies messages, and
+// rotates K to a new epoch on kick (forward secrecy). All crypto comes from the
+// fn-registry cybersecurity package; this library never reimplements primitives.
+package client
+
+import (
+	"bytes"
+	"crypto/rand"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"sync"
+	"time"
+
+	cs "fn-registry/functions/cybersecurity"
+
+	"github.com/enmanuel/unibus/pkg/frame"
+	"github.com/enmanuel/unibus/pkg/room"
+	"github.com/nats-io/nats.go"
+)
+
+// Endpoint is the public identity of a peer.
+type Endpoint struct {
+	ID      string
+	SignPub []byte
+	KexPub  []byte
+}
+
+// Client is a connected unibus peer.
+type Client struct {
+	id       cs.Identity
+	endpoint string
+	nc       *nats.Conn
+	ctrlURL  string
+	http     *http.Client
+
+	mu        sync.RWMutex
+	keyCache  map[string]map[int][]byte // roomID -> epoch -> K
+	signCache map[string][]byte         // sender endpoint -> sign pub (for verification)
+}
+
+// New connects to NATS and records the control-plane URL. The identity holds
+// the peer's long-term keypairs.
+func New(natsURL, ctrlURL string, id cs.Identity) (*Client, error) { + nc, err := nats.Connect(natsURL, nats.Name("unibus-client")) + if err != nil { + return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err) + } + return &Client{ + id: id, + endpoint: frame.EndpointID(id.SignPub), + nc: nc, + ctrlURL: ctrlURL, + http: &http.Client{Timeout: 10 * time.Second}, + keyCache: map[string]map[int][]byte{}, + signCache: map[string][]byte{}, + }, nil +} + +// Endpoint returns this client's public identity. +func (c *Client) Endpoint() Endpoint { + return Endpoint{ID: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub} +} + +// Close releases the NATS connection. +func (c *Client) Close() error { + c.nc.Close() + return nil +} + +// ---- key cache ------------------------------------------------------------ + +func (c *Client) cacheKey(roomID string, epoch int, k []byte) { + c.mu.Lock() + defer c.mu.Unlock() + m := c.keyCache[roomID] + if m == nil { + m = map[int][]byte{} + c.keyCache[roomID] = m + } + m[epoch] = k +} + +func (c *Client) getCachedKey(roomID string, epoch int) ([]byte, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + if m := c.keyCache[roomID]; m != nil { + k, ok := m[epoch] + return k, ok + } + return nil, false +} + +// ---- control-plane HTTP helpers ------------------------------------------ + +func (c *Client) doJSON(method, path string, body, out any) error { + var rdr io.Reader + if body != nil { + b, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("client: marshal request: %w", err) + } + rdr = bytes.NewReader(b) + } + req, err := http.NewRequest(method, c.ctrlURL+path, rdr) + if err != nil { + return fmt.Errorf("client: new request: %w", err) + } + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + resp, err := c.http.Do(req) + if err != nil { + return fmt.Errorf("client: do %s %s: %w", method, path, err) + } + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + if resp.StatusCode 
>= 300 { + return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody)) + } + if out != nil { + if err := json.Unmarshal(respBody, out); err != nil { + return fmt.Errorf("client: decode response: %w", err) + } + } + return nil +} + +// signRequest signs the canonical bytes of req (req must already have its Sig +// field cleared) with the client's Ed25519 key. It is symmetric with the +// server's verifyOwnerSig. +func (c *Client) signRequest(req any) []byte { + b, _ := json.Marshal(req) + return cs.SignEd25519(c.id.SignPriv, b) +} + +// ---- mirror of server wire types (control plane) ------------------------- + +type policyJSON struct { + Encrypt bool `json:"encrypt"` + Persist bool `json:"persist"` + SignMsgs bool `json:"sign_msgs"` +} + +type endpointJSON struct { + Endpoint string `json:"endpoint"` + SignPub []byte `json:"sign_pub"` + KexPub []byte `json:"kex_pub"` +} + +type createRoomReq struct { + Subject string `json:"subject"` + Policy policyJSON `json:"policy"` + Owner endpointJSON `json:"owner"` + SealedKeySelf []byte `json:"sealed_key_self"` +} + +type createRoomResp struct { + RoomID string `json:"room_id"` +} + +type inviteReq struct { + By string `json:"by"` + Sig []byte `json:"sig"` + Member endpointJSON `json:"member"` + SealedKey []byte `json:"sealed_key"` +} + +type keyResp struct { + Epoch int `json:"epoch"` + SealedKey []byte `json:"sealed_key"` +} + +type memberJSON struct { + Endpoint string `json:"endpoint"` + Role string `json:"role"` + SignPub []byte `json:"sign_pub"` + KexPub []byte `json:"kex_pub"` +} + +type roomResp struct { + Subject string `json:"subject"` + Epoch int `json:"epoch"` + Policy policyJSON `json:"policy"` +} + +type rekeyKey struct { + Endpoint string `json:"endpoint"` + SealedKey []byte `json:"sealed_key"` +} + +type rekeyReq struct { + By string `json:"by"` + Sig []byte `json:"sig"` + NewEpoch int `json:"new_epoch"` + Keys []rekeyKey `json:"keys"` + Remove []string `json:"remove"` +} + 
+type blobResp struct { + Hash string `json:"hash"` +} + +// ---- room operations ------------------------------------------------------ + +// newRoomKey returns 32 random bytes for a symmetric room key. +func newRoomKey() ([]byte, error) { + k := make([]byte, 32) + if _, err := rand.Read(k); err != nil { + return nil, fmt.Errorf("client: generate room key: %w", err) + } + return k, nil +} + +// CreateRoom creates a room with the given subject and policy. For encrypted +// rooms it generates K, seals K to itself, and caches K at epoch 1. +func (c *Client) CreateRoom(subject string, p room.Policy) (string, error) { + req := createRoomReq{ + Subject: subject, + Policy: policyJSON{Encrypt: p.Encrypt, Persist: p.Persist, SignMsgs: p.SignMsgs}, + Owner: endpointJSON{Endpoint: c.endpoint, SignPub: c.id.SignPub, KexPub: c.id.KexPub}, + } + var k []byte + if p.Encrypt { + var err error + if k, err = newRoomKey(); err != nil { + return "", err + } + sealed, err := cs.SealKeyBox(c.id.KexPub, k) + if err != nil { + return "", fmt.Errorf("client: seal own key: %w", err) + } + req.SealedKeySelf = sealed + } + var resp createRoomResp + if err := c.doJSON("POST", "/rooms", req, &resp); err != nil { + return "", err + } + if p.Encrypt { + c.cacheKey(resp.RoomID, 1, k) + } + return resp.RoomID, nil +} + +// Invite adds a member to a room. It seals the current-epoch room key to the +// invitee's X25519 public key and signs the request as the owner. 
+func (c *Client) Invite(roomID string, m Endpoint) error { + info, err := c.fetchRoom(roomID) + if err != nil { + return err + } + var sealed []byte + if info.Policy.Encrypt { + k, ok := c.getCachedKey(roomID, info.Epoch) + if !ok { + return fmt.Errorf("client: invite: no cached key for room %s epoch %d", roomID, info.Epoch) + } + if sealed, err = cs.SealKeyBox(m.KexPub, k); err != nil { + return fmt.Errorf("client: seal key for invitee: %w", err) + } + } + req := inviteReq{ + By: c.endpoint, + Member: endpointJSON{Endpoint: m.ID, SignPub: m.SignPub, KexPub: m.KexPub}, + SealedKey: sealed, + } + req.Sig = c.signRequest(req) // Sig is zero-valued at sign time + return c.doJSON("POST", "/rooms/"+roomID+"/invite", req, nil) +} + +// roomView is the resolved room metadata. +type roomView struct { + Subject string + Epoch int + Policy room.Policy +} + +func (c *Client) fetchRoom(roomID string) (roomView, error) { + var resp roomResp + if err := c.doJSON("GET", "/rooms/"+roomID, nil, &resp); err != nil { + return roomView{}, err + } + return roomView{ + Subject: resp.Subject, + Epoch: resp.Epoch, + Policy: room.Policy{Encrypt: resp.Policy.Encrypt, Persist: resp.Policy.Persist, SignMsgs: resp.Policy.SignMsgs}, + }, nil +} + +// fetchKey retrieves and caches the room key K for the given epoch (epoch <= 0 +// means latest). It opens the sealed box with the client's own X25519 keypair. 
+func (c *Client) fetchKey(roomID string, epoch int) ([]byte, int, error) { + if epoch > 0 { + if k, ok := c.getCachedKey(roomID, epoch); ok { + return k, epoch, nil + } + } + path := fmt.Sprintf("/rooms/%s/key?endpoint=%s", roomID, c.endpoint) + if epoch > 0 { + path += fmt.Sprintf("&epoch=%d", epoch) + } + var resp keyResp + if err := c.doJSON("GET", path, nil, &resp); err != nil { + return nil, 0, err + } + k, err := cs.OpenKeyBox(c.id.KexPub, c.id.KexPriv, resp.SealedKey) + if err != nil { + return nil, 0, fmt.Errorf("client: open room key: %w", err) + } + c.cacheKey(roomID, resp.Epoch, k) + return k, resp.Epoch, nil +} + +// Join resolves room metadata and, for encrypted rooms, fetches and caches the +// current room key. It does not subscribe to NATS (use Subscribe for that). +func (c *Client) Join(roomID string) error { + info, err := c.fetchRoom(roomID) + if err != nil { + return err + } + if info.Policy.Encrypt { + if _, _, err := c.fetchKey(roomID, info.Epoch); err != nil { + return err + } + } + return nil +} + +// signerPub returns the sign public key of a sender endpoint, fetching the +// member list once and caching it. +func (c *Client) signerPub(roomID, sender string) ([]byte, error) { + c.mu.RLock() + pub, ok := c.signCache[sender] + c.mu.RUnlock() + if ok { + return pub, nil + } + var members []memberJSON + if err := c.doJSON("GET", "/rooms/"+roomID+"/members", nil, &members); err != nil { + return nil, err + } + c.mu.Lock() + for _, m := range members { + c.signCache[m.Endpoint] = m.SignPub + } + pub, ok = c.signCache[sender] + c.mu.Unlock() + if !ok { + return nil, fmt.Errorf("client: no sign key for sender %q", sender) + } + return pub, nil +} + +// ---- data plane: publish/subscribe --------------------------------------- + +// Publish sends plaintext to a room. For encrypted rooms it seals the payload +// with the current K using the subject as AEAD additional-authenticated-data; +// for signed rooms it attaches an Ed25519 signature. 
+func (c *Client) Publish(roomID string, plaintext []byte) error { + info, err := c.fetchRoom(roomID) + if err != nil { + return err + } + f := frame.Frame{ + Type: frame.PUB, + Subject: info.Subject, + Sender: c.endpoint, + MsgID: newULID(), + Epoch: info.Epoch, + } + if info.Policy.Encrypt { + k, ep, err := c.fetchKey(roomID, info.Epoch) + if err != nil { + return err + } + nonce, ct, err := cs.SealAEAD(k, plaintext, []byte(info.Subject)) + if err != nil { + return fmt.Errorf("client: seal payload: %w", err) + } + f.Epoch, f.Nonce, f.Payload = ep, nonce, ct + } else { + f.Payload = plaintext + } + if info.Policy.SignMsgs { + f.Sig = cs.SignEd25519(c.id.SignPriv, f.SigningBytes()) + } + b, err := f.Marshal() + if err != nil { + return fmt.Errorf("client: marshal frame: %w", err) + } + return c.nc.Publish(info.Subject, b) +} + +// Subscribe subscribes to a room and invokes handler for each message with the +// decoded frame and (for encrypted rooms) the decrypted plaintext. Signature +// verification and epoch-driven key refresh happen transparently. Messages that +// fail verification or decryption are dropped (handler not called). +func (c *Client) Subscribe(roomID string, handler func(f frame.Frame, plaintext []byte)) (*nats.Subscription, error) { + info, err := c.fetchRoom(roomID) + if err != nil { + return nil, err + } + return c.nc.Subscribe(info.Subject, func(msg *nats.Msg) { + f, err := frame.Unmarshal(msg.Data) + if err != nil { + return + } + if info.Policy.SignMsgs && f.Sig != nil { + pub, err := c.signerPub(roomID, f.Sender) + if err != nil || !cs.VerifyEd25519(pub, f.SigningBytes(), f.Sig) { + return // unauthenticated frame: drop + } + } + plaintext := f.Payload + // Decrypt only inline payloads. Media frames carry their bytes in the + // blob store (referenced by f.Blob) with the nonce in BlobRef.Nonce; + // the handler decrypts those on demand via FetchMedia. A frame with an + // inline ciphertext always has a non-empty Nonce. 
+ if info.Policy.Encrypt && len(f.Nonce) > 0 && len(f.Payload) > 0 { + k, ok := c.getCachedKey(roomID, f.Epoch) + if !ok { + // Sender used a newer (or unknown) epoch: refresh K from the control plane. + k, _, err = c.fetchKey(roomID, f.Epoch) + if err != nil { + return // cannot obtain key for this epoch (e.g. we were kicked): drop + } + } + pt, err := cs.OpenAEAD(k, f.Nonce, f.Payload, []byte(info.Subject)) + if err != nil { + return // cannot decrypt (wrong epoch/kicked): drop + } + plaintext = pt + } + handler(f, plaintext) + }) +} + +// ---- request/reply (cleartext v1) ---------------------------------------- + +// Request performs a NATS request/reply on subject (cleartext in v1, intended +// for rpc.* subjects). +func (c *Client) Request(subject string, plaintext []byte, timeout time.Duration) ([]byte, error) { + msg, err := c.nc.Request(subject, plaintext, timeout) + if err != nil { + return nil, fmt.Errorf("client: request %q: %w", subject, err) + } + return msg.Data, nil +} + +// Reply registers a responder for subject; handler receives the request bytes +// and returns the reply bytes (cleartext in v1). +func (c *Client) Reply(subject string, handler func([]byte) []byte) (*nats.Subscription, error) { + return c.nc.Subscribe(subject, func(msg *nats.Msg) { + if msg.Reply == "" { + return + } + _ = c.nc.Publish(msg.Reply, handler(msg.Data)) + }) +} + +// ---- kick / forward secrecy ---------------------------------------------- + +// Kick removes a member and rotates the room key to a new epoch, re-sealing it +// only for the remaining members. The kicked member can no longer decrypt +// messages published after the rotation (forward secrecy). +func (c *Client) Kick(roomID string, endpoint string) error { + info, err := c.fetchRoom(roomID) + if err != nil { + return err + } + newEpoch := info.Epoch + 1 + + if !info.Policy.Encrypt { + // Unencrypted room: just remove the member, no key rotation needed. 
+ req := rekeyReq{By: c.endpoint, NewEpoch: newEpoch, Remove: []string{endpoint}} + req.Sig = c.signRequest(req) + return c.doJSON("POST", "/rooms/"+roomID+"/rekey", req, nil) + } + + kPrime, err := newRoomKey() + if err != nil { + return err + } + var members []memberJSON + if err := c.doJSON("GET", "/rooms/"+roomID+"/members", nil, &members); err != nil { + return err + } + var keys []rekeyKey + for _, m := range members { + if m.Endpoint == endpoint { + continue // exclude the kicked member + } + sealed, err := cs.SealKeyBox(m.KexPub, kPrime) + if err != nil { + return fmt.Errorf("client: seal new key for %q: %w", m.Endpoint, err) + } + keys = append(keys, rekeyKey{Endpoint: m.Endpoint, SealedKey: sealed}) + } + req := rekeyReq{By: c.endpoint, NewEpoch: newEpoch, Keys: keys, Remove: []string{endpoint}} + req.Sig = c.signRequest(req) + if err := c.doJSON("POST", "/rooms/"+roomID+"/rekey", req, nil); err != nil { + return err + } + c.cacheKey(roomID, newEpoch, kPrime) + return nil +} + +// ---- media (object store) ------------------------------------------------- + +// PublishMedia encrypts data with the room key, uploads the ciphertext to the +// blob store, and publishes a frame carrying only a BlobRef. Receivers whose +// handler sees f.Blob != nil should GET /blobs/{hash} and OpenAEAD it. 
+func (c *Client) PublishMedia(roomID string, data []byte) error { + info, err := c.fetchRoom(roomID) + if err != nil { + return err + } + f := frame.Frame{ + Type: frame.PUB, + Subject: info.Subject, + Sender: c.endpoint, + MsgID: newULID(), + Epoch: info.Epoch, + } + + var ciphertext []byte + var nonce []byte + if info.Policy.Encrypt { + k, ep, err := c.fetchKey(roomID, info.Epoch) + if err != nil { + return err + } + nonce, ciphertext, err = cs.SealAEAD(k, data, []byte(info.Subject)) + if err != nil { + return fmt.Errorf("client: seal media: %w", err) + } + f.Epoch = ep + } else { + ciphertext = data + } + + hash, err := c.putBlob(ciphertext) + if err != nil { + return err + } + f.Blob = &frame.BlobRef{Hash: hash, Nonce: nonce, Size: int64(len(ciphertext))} + + if info.Policy.SignMsgs { + f.Sig = cs.SignEd25519(c.id.SignPriv, f.SigningBytes()) + } + b, err := f.Marshal() + if err != nil { + return fmt.Errorf("client: marshal media frame: %w", err) + } + return c.nc.Publish(info.Subject, b) +} + +// FetchMedia downloads and (for encrypted rooms) decrypts a blob referenced by +// a received frame. It is a convenience for handlers that see f.Blob != nil. 
+func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) { + if f.Blob == nil { + return nil, fmt.Errorf("client: frame has no blob ref") + } + ct, err := c.getBlob(f.Blob.Hash) + if err != nil { + return nil, err + } + info, err := c.fetchRoom(roomID) + if err != nil { + return nil, err + } + if !info.Policy.Encrypt { + return ct, nil + } + k, ok := c.getCachedKey(roomID, f.Epoch) + if !ok { + if k, _, err = c.fetchKey(roomID, f.Epoch); err != nil { + return nil, err + } + } + return cs.OpenAEAD(k, f.Blob.Nonce, ct, []byte(info.Subject)) +} + +func (c *Client) putBlob(ciphertext []byte) (string, error) { + req, err := http.NewRequest("POST", c.ctrlURL+"/blobs", bytes.NewReader(ciphertext)) + if err != nil { + return "", fmt.Errorf("client: new blob request: %w", err) + } + req.Header.Set("Content-Type", "application/octet-stream") + resp, err := c.http.Do(req) + if err != nil { + return "", fmt.Errorf("client: put blob: %w", err) + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= 300 { + return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body)) + } + var r blobResp + if err := json.Unmarshal(body, &r); err != nil { + return "", fmt.Errorf("client: decode blob resp: %w", err) + } + return r.Hash, nil +} + +func (c *Client) getBlob(hash string) ([]byte, error) { + resp, err := c.http.Get(c.ctrlURL + "/blobs/" + hash) + if err != nil { + return nil, fmt.Errorf("client: get blob: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body)) + } + return io.ReadAll(resp.Body) +} diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go new file mode 100644 index 0000000..ff0b0fd --- /dev/null +++ b/pkg/client/client_test.go @@ -0,0 +1,346 @@ +package client_test + +import ( + "net" + "net/http" + "net/http/httptest" + "path/filepath" + "sync" + "testing" 
+ "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/enmanuel/unibus/pkg/room" + server "github.com/nats-io/nats-server/v2/server" +) + +// testHarness boots an embedded NATS server and an in-process membershipd HTTP +// server, returning their URLs and a cleanup func. +type testHarness struct { + natsURL string + ctrlURL string + ns *server.Server + httpts *httptest.Server +} + +func freePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("free port: %v", err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +} + +func newHarness(t *testing.T) *testHarness { + t.Helper() + dir := t.TempDir() + + ns, err := embeddednats.Start(filepath.Join(dir, "js"), freePort(t)) + if err != nil { + t.Fatalf("embedded nats: %v", err) + } + + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + ns.Shutdown() + t.Fatalf("membership store: %v", err) + } + blobs, err := blobstore.New(filepath.Join(dir, "blobs")) + if err != nil { + ns.Shutdown() + t.Fatalf("blob store: %v", err) + } + srv := membership.NewServer(store, blobs) + httpts := httptest.NewServer(srv) + + h := &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL, ns: ns, httpts: httpts} + t.Cleanup(func() { + httpts.Close() + store.Close() + ns.Shutdown() + ns.WaitForShutdown() + }) + return h +} + +func waitHealth(t *testing.T, ctrlURL string) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + resp, err := http.Get(ctrlURL + "/healthz") + if err == nil && resp.StatusCode == 200 { + resp.Body.Close() + return + } + if resp != nil { + resp.Body.Close() + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("membershipd never 
became healthy") +} + +func mustIdentity(t *testing.T) cs.Identity { + t.Helper() + id, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("generate identity: %v", err) + } + return id +} + +// TestE2EEncryptedForwardSecrecy is the headline test: A creates an encrypted +// room, invites B, A publishes a message B decrypts, then A kicks B and +// publishes at the new epoch — B must NOT be able to decrypt the new message. +func TestE2EEncryptedForwardSecrecy(t *testing.T) { + h := newHarness(t) + waitHealth(t, h.ctrlURL) + + a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect B: %v", err) + } + defer b.Close() + + roomID, err := a.CreateRoom("room.test", room.ModeMatrix) + if err != nil { + t.Fatalf("A create room: %v", err) + } + if err := a.Invite(roomID, b.Endpoint()); err != nil { + t.Fatalf("A invite B: %v", err) + } + if err := b.Join(roomID); err != nil { + t.Fatalf("B join: %v", err) + } + + var mu sync.Mutex + var received []string + sub, err := b.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { + mu.Lock() + received = append(received, string(plaintext)) + mu.Unlock() + }) + if err != nil { + t.Fatalf("B subscribe: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(150 * time.Millisecond) + + const msg1 = "hola E2E" + if err := a.Publish(roomID, []byte(msg1)); err != nil { + t.Fatalf("A publish msg1: %v", err) + } + + // Wait for B to receive and decrypt msg1. + if !waitFor(&mu, &received, func(rs []string) bool { + for _, r := range rs { + if r == msg1 { + return true + } + } + return false + }, 2*time.Second) { + t.Fatalf("B did not decrypt pre-kick message %q; got %v", msg1, snapshot(&mu, &received)) + } + + // A kicks B (rotates K to a new epoch, re-sealed only for the owner). 
+ if err := a.Kick(roomID, b.Endpoint().ID); err != nil { + t.Fatalf("A kick B: %v", err) + } + time.Sleep(150 * time.Millisecond) + + const msg2 = "secreto post-kick" + if err := a.Publish(roomID, []byte(msg2)); err != nil { + t.Fatalf("A publish msg2: %v", err) + } + + // Give B a chance to (fail to) decrypt; assert it never sees msg2. + time.Sleep(1 * time.Second) + for _, r := range snapshot(&mu, &received) { + if r == msg2 { + t.Fatalf("forward secrecy broken: B decrypted post-kick message %q", msg2) + } + } + + // Sanity: A itself can still decrypt at the new epoch (self-loopback via a fresh subscriber). + aSub := subscribeCollect(t, a, roomID) + defer aSub.sub.Unsubscribe() + time.Sleep(100 * time.Millisecond) + const msg3 = "owner-still-works" + if err := a.Publish(roomID, []byte(msg3)); err != nil { + t.Fatalf("A publish msg3: %v", err) + } + if !waitFor(&aSub.mu, &aSub.msgs, func(rs []string) bool { + for _, r := range rs { + if r == msg3 { + return true + } + } + return false + }, 2*time.Second) { + t.Fatalf("owner could not decrypt own message at new epoch; got %v", snapshot(&aSub.mu, &aSub.msgs)) + } +} + +// TestCleartextWorkerToChat validates the ModeNATS path: a publisher and a +// subscriber sharing a subject, no encryption, messages flow through verbatim. +func TestCleartextWorkerToChat(t *testing.T) { + h := newHarness(t) + waitHealth(t, h.ctrlURL) + + pub, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect pub: %v", err) + } + defer pub.Close() + subC, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect sub: %v", err) + } + defer subC.Close() + + const subject = "proc.test.ticks" + // Each peer owns a room mapped to the shared subject; NATS fans out by subject. 
+ pubRoom, err := pub.CreateRoom(subject, room.ModeNATS) + if err != nil { + t.Fatalf("pub create room: %v", err) + } + subRoom, err := subC.CreateRoom(subject, room.ModeNATS) + if err != nil { + t.Fatalf("sub create room: %v", err) + } + + collector := subscribeCollect(t, subC, subRoom) + defer collector.sub.Unsubscribe() + time.Sleep(150 * time.Millisecond) + + const msg = "tick 1" + if err := pub.Publish(pubRoom, []byte(msg)); err != nil { + t.Fatalf("publish: %v", err) + } + if !waitFor(&collector.mu, &collector.msgs, func(rs []string) bool { + for _, r := range rs { + if r == msg { + return true + } + } + return false + }, 2*time.Second) { + t.Fatalf("subscriber did not receive cleartext message; got %v", snapshot(&collector.mu, &collector.msgs)) + } +} + +// TestMediaBlobRoundTrip validates encrypted media via the object store. +func TestMediaBlobRoundTrip(t *testing.T) { + h := newHarness(t) + waitHealth(t, h.ctrlURL) + + a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) + if err != nil { + t.Fatalf("connect B: %v", err) + } + defer b.Close() + + roomID, err := a.CreateRoom("room.media", room.ModeMatrix) + if err != nil { + t.Fatalf("create room: %v", err) + } + if err := a.Invite(roomID, b.Endpoint()); err != nil { + t.Fatalf("invite: %v", err) + } + if err := b.Join(roomID); err != nil { + t.Fatalf("join: %v", err) + } + + gotBlob := make(chan []byte, 1) + sub, err := b.Subscribe(roomID, func(f frame.Frame, _ []byte) { + if f.Blob == nil { + return + } + data, err := b.FetchMedia(roomID, f) + if err != nil { + return + } + gotBlob <- data + }) + if err != nil { + t.Fatalf("subscribe: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(150 * time.Millisecond) + + payload := []byte("a fake image payload that should be encrypted in the store") + if err := a.PublishMedia(roomID, payload); err != nil { + 
t.Fatalf("publish media: %v", err) + } + + select { + case got := <-gotBlob: + if string(got) != string(payload) { + t.Fatalf("media mismatch: got %q want %q", got, payload) + } + case <-time.After(2 * time.Second): + t.Fatalf("B never received/decrypted the media blob") + } +} + +// ---- test helpers --------------------------------------------------------- + +type collector struct { + mu sync.Mutex + msgs []string + sub interface{ Unsubscribe() error } +} + +func subscribeCollect(t *testing.T, c *client.Client, roomID string) *collector { + t.Helper() + col := &collector{} + sub, err := c.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) { + col.mu.Lock() + col.msgs = append(col.msgs, string(plaintext)) + col.mu.Unlock() + }) + if err != nil { + t.Fatalf("subscribe: %v", err) + } + col.sub = sub + return col +} + +func waitFor(mu *sync.Mutex, slice *[]string, pred func([]string) bool, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + mu.Lock() + cp := append([]string(nil), (*slice)...) + mu.Unlock() + if pred(cp) { + return true + } + time.Sleep(25 * time.Millisecond) + } + return false +} + +func snapshot(mu *sync.Mutex, slice *[]string) []string { + mu.Lock() + defer mu.Unlock() + return append([]string(nil), (*slice)...) +} diff --git a/pkg/client/identity.go b/pkg/client/identity.go new file mode 100644 index 0000000..24c181b --- /dev/null +++ b/pkg/client/identity.go @@ -0,0 +1,103 @@ +package client + +import ( + "crypto/rand" + "encoding/base64" + "encoding/json" + "fmt" + "os" + "path/filepath" + + cs "fn-registry/functions/cybersecurity" + + "github.com/oklog/ulid/v2" +) + +// newULID returns a fresh, lexicographically-sortable message id with +// crypto/rand entropy. +func newULID() string { + return ulid.MustNew(ulid.Now(), rand.Reader).String() +} + +// identityFile is the on-disk JSON representation of an Identity. The four key +// fields are base64-encoded. 
+// +// SECURITY: this file contains the peer's long-term PRIVATE keys (SignPriv and +// KexPriv). It is written 0600. Losing it means losing the ability to decrypt +// any message addressed to this endpoint — there is no recovery. Treat it like +// an SSH private key. (Hardening with OS keyrings/HSM is a later phase.) +type identityFile struct { + SignPub string `json:"sign_pub"` + SignPriv string `json:"sign_priv"` + KexPub string `json:"kex_pub"` + KexPriv string `json:"kex_priv"` +} + +// LoadOrCreateIdentity loads the identity at path, or generates and persists a +// new one if the file does not exist. The file is written with 0600 +// permissions because it holds private keys. +func LoadOrCreateIdentity(path string) (cs.Identity, error) { + if data, err := os.ReadFile(path); err == nil { + var f identityFile + if err := json.Unmarshal(data, &f); err != nil { + return cs.Identity{}, fmt.Errorf("client: parse identity %q: %w", path, err) + } + id, err := f.toIdentity() + if err != nil { + return cs.Identity{}, fmt.Errorf("client: decode identity %q: %w", path, err) + } + return id, nil + } + + id, err := cs.GenerateIdentity() + if err != nil { + return cs.Identity{}, fmt.Errorf("client: generate identity: %w", err) + } + if err := saveIdentity(path, id); err != nil { + return cs.Identity{}, err + } + return id, nil +} + +func saveIdentity(path string, id cs.Identity) error { + if dir := filepath.Dir(path); dir != "" { + if err := os.MkdirAll(dir, 0o755); err != nil { + return fmt.Errorf("client: mkdir for identity: %w", err) + } + } + f := identityFile{ + SignPub: base64.StdEncoding.EncodeToString(id.SignPub), + SignPriv: base64.StdEncoding.EncodeToString(id.SignPriv), + KexPub: base64.StdEncoding.EncodeToString(id.KexPub), + KexPriv: base64.StdEncoding.EncodeToString(id.KexPriv), + } + data, err := json.MarshalIndent(f, "", " ") + if err != nil { + return fmt.Errorf("client: marshal identity: %w", err) + } + if err := os.WriteFile(path, data, 0o600); err != 
nil { + return fmt.Errorf("client: write identity %q: %w", path, err) + } + return nil +} + +func (f identityFile) toIdentity() (cs.Identity, error) { + dec := func(s string) ([]byte, error) { return base64.StdEncoding.DecodeString(s) } + signPub, err := dec(f.SignPub) + if err != nil { + return cs.Identity{}, err + } + signPriv, err := dec(f.SignPriv) + if err != nil { + return cs.Identity{}, err + } + kexPub, err := dec(f.KexPub) + if err != nil { + return cs.Identity{}, err + } + kexPriv, err := dec(f.KexPriv) + if err != nil { + return cs.Identity{}, err + } + return cs.Identity{SignPub: signPub, SignPriv: signPriv, KexPub: kexPub, KexPriv: kexPriv}, nil +} diff --git a/pkg/embeddednats/embeddednats.go b/pkg/embeddednats/embeddednats.go new file mode 100644 index 0000000..8707ba3 --- /dev/null +++ b/pkg/embeddednats/embeddednats.go @@ -0,0 +1,48 @@ +// Package embeddednats starts an in-process NATS server with JetStream enabled. +// +// This lets the whole unibus stack run with `go run` without installing or +// managing a separate NATS deployment. In production, point clients at an +// external NATS via the --nats-url flag instead of using this. +package embeddednats + +import ( + "fmt" + "time" + + server "github.com/nats-io/nats-server/v2/server" +) + +// Start launches an embedded nats-server with JetStream enabled, listening on +// the given port and persisting JetStream state under storeDir. It blocks until +// the server is ready to accept connections (up to 5s) and returns the running +// server. The caller is responsible for calling Shutdown on it. +func Start(storeDir string, port int) (*server.Server, error) { + opts := &server.Options{ + JetStream: true, + StoreDir: storeDir, + Port: port, + DontListen: false, + // Keep the embedded server quiet by default; the host app logs the URLs. 
+ NoLog: true, + NoSigs: true, + } + + ns, err := server.NewServer(opts) + if err != nil { + return nil, fmt.Errorf("embeddednats: new server: %w", err) + } + + go ns.Start() + + if !ns.ReadyForConnections(5 * time.Second) { + ns.Shutdown() + return nil, fmt.Errorf("embeddednats: server not ready for connections within 5s") + } + + return ns, nil +} + +// ClientURL returns a NATS connection URL for the running embedded server. +func ClientURL(ns *server.Server) string { + return ns.ClientURL() +} diff --git a/pkg/frame/frame.go b/pkg/frame/frame.go new file mode 100644 index 0000000..d70524e --- /dev/null +++ b/pkg/frame/frame.go @@ -0,0 +1,89 @@ +// Package frame defines the wire format of the unibus message bus. +// +// A Frame is the unit transported over NATS. It carries an envelope (type, +// subject, sender, message id, epoch) plus an optional payload that, in +// encrypted rooms, is an AEAD ciphertext. Frames may be signed end-to-end with +// Ed25519 so that any receiver can authenticate the sender without trusting the +// transport. +// +// v1 serializes frames as JSON for legibility and forward-compatibility. The +// canonical signing bytes are the JSON of the frame with Sig cleared, so that +// signature verification is independent of field ordering as long as the +// marshaler is deterministic (encoding/json emits struct fields in declaration +// order). +package frame + +import ( + "crypto/sha256" + "encoding/base64" + "encoding/json" +) + +// FrameType enumerates the kinds of frames that travel over the bus. +type FrameType uint8 + +const ( + // PUB is a published message to a room/subject. + PUB FrameType = iota + // INVITE announces a new member was invited (informational; the key + // distribution itself happens over the control plane). + INVITE + // JOIN announces a member joined a room. + JOIN + // LEAVE announces a member voluntarily left a room. + LEAVE + // KICK announces a member was removed from a room. 
// Wire format of the unibus message bus (v1): frames are serialized as JSON.
// The bytes that get signed are the JSON of the frame with Sig cleared;
// encoding/json emits struct fields in declaration order, so that encoding is
// deterministic for a given Frame value.

// FrameType enumerates the kinds of frames that travel over the bus.
type FrameType uint8

// Frame kinds. The numeric value is what travels in the "t" field of the JSON
// encoding, so existing values must not be renumbered.
const (
	// PUB is a published message to a room/subject.
	PUB FrameType = iota
	// INVITE announces a new member was invited (informational; the key
	// distribution itself happens over the control plane).
	INVITE
	// JOIN announces a member joined a room.
	JOIN
	// LEAVE announces a member voluntarily left a room.
	LEAVE
	// KICK announces a member was removed from a room.
	KICK
	// ACK acknowledges receipt of a previous frame.
	ACK
)

// BlobRef references an out-of-band encrypted blob stored in the object store.
// The bus never carries blob bytes; only this reference travels over NATS.
type BlobRef struct {
	Hash  string `json:"h"`  // sha256 hex of the ciphertext in the blob store
	Nonce []byte `json:"n"`  // AEAD nonce used to encrypt the blob
	Size  int64  `json:"sz"` // size in bytes of the ciphertext
}

// Frame is the unit of transport on the unibus message bus.
type Frame struct {
	Type    FrameType `json:"t"`
	Subject string    `json:"s"`
	Sender  string    `json:"from"`          // endpoint id = EndpointID(signPub)
	MsgID   string    `json:"id"`            // ULID
	Epoch   int       `json:"e"`             // epoch of the room key K used to encrypt
	Nonce   []byte    `json:"n,omitempty"`   // AEAD nonce (encrypted rooms only)
	Payload []byte    `json:"p,omitempty"`   // AEAD ciphertext (or cleartext if the room does not encrypt)
	Blob    *BlobRef  `json:"b,omitempty"`   // reference to an out-of-band blob, if any
	Sig     []byte    `json:"sig,omitempty"` // Ed25519 signature over SigningBytes()
}

// Marshal serializes the frame to its wire representation (JSON in v1).
func (f Frame) Marshal() ([]byte, error) {
	return json.Marshal(f)
}

// Unmarshal parses a wire representation back into a Frame.
//
// On failure it returns the zero Frame together with the decoder's error.
// encoding/json keeps decoding after a type mismatch and leaves the fields it
// did manage to decode populated; handing such a half-filled frame to callers
// would let unauthenticated garbage look like envelope data, so it is dropped.
func Unmarshal(b []byte) (Frame, error) {
	var f Frame
	if err := json.Unmarshal(b, &f); err != nil {
		return Frame{}, err
	}
	return f, nil
}

// EndpointID derives a stable, transport-agnostic endpoint identifier from an
// Ed25519 signing public key: base64url(sha256(signPub)), unpadded.
func EndpointID(signPub []byte) string {
	sum := sha256.Sum256(signPub)
	return base64.RawURLEncoding.EncodeToString(sum[:])
}

// SigningBytes returns the canonical bytes that are signed and verified. The
// signature covers the entire frame except the Sig field itself, so Sig is
// cleared before marshaling. The receiver is a value, so the caller's frame
// keeps its signature.
//
// The marshal error is intentionally discarded: Frame is a plain struct of
// JSON-serializable fields, so json.Marshal cannot fail here.
func (f Frame) SigningBytes() []byte {
	f.Sig = nil
	b, _ := json.Marshal(f)
	return b
}
+func (f Frame) SigningBytes() []byte { + f.Sig = nil + b, _ := json.Marshal(f) + return b +} diff --git a/pkg/frame/frame_test.go b/pkg/frame/frame_test.go new file mode 100644 index 0000000..12e121d --- /dev/null +++ b/pkg/frame/frame_test.go @@ -0,0 +1,72 @@ +package frame + +import ( + "bytes" + "testing" +) + +func TestMarshalUnmarshalRoundTrip(t *testing.T) { + orig := Frame{ + Type: PUB, + Subject: "room.general", + Sender: "abc123", + MsgID: "01J000000000000000000000", + Epoch: 3, + Nonce: []byte{1, 2, 3, 4}, + Payload: []byte("ciphertext-bytes"), + Blob: &BlobRef{Hash: "deadbeef", Nonce: []byte{9, 8, 7}, Size: 42}, + Sig: []byte{0xaa, 0xbb}, + } + + b, err := orig.Marshal() + if err != nil { + t.Fatalf("Marshal: %v", err) + } + got, err := Unmarshal(b) + if err != nil { + t.Fatalf("Unmarshal: %v", err) + } + + if got.Type != orig.Type || got.Subject != orig.Subject || got.Sender != orig.Sender || + got.MsgID != orig.MsgID || got.Epoch != orig.Epoch { + t.Fatalf("envelope mismatch: got %+v want %+v", got, orig) + } + if !bytes.Equal(got.Nonce, orig.Nonce) || !bytes.Equal(got.Payload, orig.Payload) || !bytes.Equal(got.Sig, orig.Sig) { + t.Fatalf("byte fields mismatch") + } + if got.Blob == nil || got.Blob.Hash != orig.Blob.Hash || got.Blob.Size != orig.Blob.Size || + !bytes.Equal(got.Blob.Nonce, orig.Blob.Nonce) { + t.Fatalf("blob ref mismatch: %+v", got.Blob) + } +} + +func TestEndpointIDDeterministic(t *testing.T) { + pub := []byte("some-ed25519-public-key-bytes-32") + a := EndpointID(pub) + b := EndpointID(pub) + if a != b { + t.Fatalf("EndpointID not deterministic: %q != %q", a, b) + } + if a == "" { + t.Fatalf("EndpointID returned empty string") + } + // Different inputs must produce different ids. 
+ if EndpointID([]byte("other-key")) == a { + t.Fatalf("EndpointID collision for different inputs") + } +} + +func TestSigningBytesExcludesSig(t *testing.T) { + withSig := Frame{Type: PUB, Subject: "s", Sender: "x", MsgID: "id", Epoch: 1, Payload: []byte("p"), Sig: []byte{1, 2, 3}} + noSig := withSig + noSig.Sig = nil + + if !bytes.Equal(withSig.SigningBytes(), noSig.SigningBytes()) { + t.Fatalf("SigningBytes should be identical regardless of Sig field") + } + // And SigningBytes must not be affected by mutating Sig afterward (value receiver). + sb := withSig.SigningBytes() + if bytes.Contains(sb, []byte{1, 2, 3}) { + t.Fatalf("SigningBytes leaked the Sig bytes") + } +} diff --git a/pkg/membership/migrations/001_init.sql b/pkg/membership/migrations/001_init.sql new file mode 100644 index 0000000..a571908 --- /dev/null +++ b/pkg/membership/migrations/001_init.sql @@ -0,0 +1,32 @@ +-- 001_init.sql — initial schema for the unibus membership/key-distribution service. +-- Additive and idempotent: safe to apply repeatedly. Never modify this file; +-- schema changes go in new numbered migrations (see .claude/rules/db_migrations.md). 
+ +CREATE TABLE IF NOT EXISTS rooms ( + room_id TEXT PRIMARY KEY, + subject TEXT NOT NULL, + key_epoch INTEGER NOT NULL DEFAULT 1, + encrypt INTEGER NOT NULL, + persist INTEGER NOT NULL, + sign_msgs INTEGER NOT NULL, + owner_endpoint TEXT NOT NULL, + created_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS members ( + room_id TEXT NOT NULL, + endpoint TEXT NOT NULL, + role TEXT NOT NULL, + joined_at TEXT NOT NULL, + sign_pub BLOB NOT NULL, + kex_pub BLOB NOT NULL, + PRIMARY KEY (room_id, endpoint) +); + +CREATE TABLE IF NOT EXISTS room_keys ( + room_id TEXT NOT NULL, + epoch INTEGER NOT NULL, + endpoint TEXT NOT NULL, + sealed_key BLOB NOT NULL, + PRIMARY KEY (room_id, epoch, endpoint) +); diff --git a/pkg/membership/server.go b/pkg/membership/server.go new file mode 100644 index 0000000..7bff786 --- /dev/null +++ b/pkg/membership/server.go @@ -0,0 +1,342 @@ +package membership + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "strings" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/blobstore" +) + +// Server is the HTTP control plane: the authoritative source of room metadata, +// membership, and per-epoch sealed keys. The data plane (messages) is NATS. +// +// Auth model (v1): the room-scoped mutating endpoints (invite, rekey) require an +// Ed25519 signature from the room owner over the canonical bytes of the request +// (the request body with the "sig" field cleared). Room creation (POST /rooms) +// and blob upload (POST /blobs) are not signature-checked. v1 trusts the +// internal network: there is no TLS, no rate limiting, and read endpoints (GET) +// are unauthenticated. Hardening (mTLS, capabilities, rate limits) is a later +// phase. +// +// NOTE(review): the signed bytes cover only the request body, not the room id +// taken from the URL path; confirm whether replaying a signed request against +// another room of the same owner is acceptable. +type Server struct { + store *Store + blobs *blobstore.Store + mux *http.ServeMux +} + +// NewServer wires the membership store and blob store into an http.Handler. +// It registers all routes on a fresh ServeMux before returning. +func NewServer(store *Store, blobs *blobstore.Store) *Server { + s := &Server{store: store, blobs: blobs, mux: http.NewServeMux()} + s.routes() + return s +} + +// ServeHTTP satisfies http.Handler.
+func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) } + +func (s *Server) routes() { + s.mux.HandleFunc("GET /healthz", s.handleHealth) + s.mux.HandleFunc("POST /rooms", s.handleCreateRoom) + s.mux.HandleFunc("POST /rooms/{id}/invite", s.handleInvite) + s.mux.HandleFunc("GET /rooms/{id}/key", s.handleGetKey) + s.mux.HandleFunc("GET /rooms/{id}/members", s.handleListMembers) + s.mux.HandleFunc("POST /rooms/{id}/rekey", s.handleRekey) + s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom) + s.mux.HandleFunc("POST /blobs", s.handlePutBlob) + s.mux.HandleFunc("GET /blobs/{hash}", s.handleGetBlob) +} + +// ---- wire types ----------------------------------------------------------- + +type policyJSON struct { + Encrypt bool `json:"encrypt"` + Persist bool `json:"persist"` + SignMsgs bool `json:"sign_msgs"` +} + +type endpointJSON struct { + Endpoint string `json:"endpoint"` + SignPub []byte `json:"sign_pub"` + KexPub []byte `json:"kex_pub"` +} + +type createRoomReq struct { + Subject string `json:"subject"` + Policy policyJSON `json:"policy"` + Owner endpointJSON `json:"owner"` + SealedKeySelf []byte `json:"sealed_key_self"` +} + +type createRoomResp struct { + RoomID string `json:"room_id"` +} + +type inviteReq struct { + By string `json:"by"` // owner endpoint id + Sig []byte `json:"sig"` // Ed25519 over canonical(request with sig cleared) + Member endpointJSON `json:"member"` + SealedKey []byte `json:"sealed_key"` +} + +type keyResp struct { + Epoch int `json:"epoch"` + SealedKey []byte `json:"sealed_key"` +} + +type memberJSON struct { + Endpoint string `json:"endpoint"` + Role string `json:"role"` + SignPub []byte `json:"sign_pub"` + KexPub []byte `json:"kex_pub"` +} + +type roomResp struct { + Subject string `json:"subject"` + Epoch int `json:"epoch"` + Policy policyJSON `json:"policy"` +} + +type rekeyKey struct { + Endpoint string `json:"endpoint"` + SealedKey []byte `json:"sealed_key"` +} + +type rekeyReq struct { + By 
// blobResp is the reply to POST /blobs.
type blobResp struct {
	Hash string `json:"hash"`
}

// ---- helpers --------------------------------------------------------------

// writeJSON sends v as a JSON response with the given status code. The body is
// the JSON encoding of v followed by a newline.
//
// v is encoded before anything is written: once WriteHeader has run the status
// can no longer be changed, so encoding first is the only way to avoid
// answering with a success status and an empty or truncated body. If v cannot
// be encoded the response becomes a 500 carrying a JSON error object.
func writeJSON(w http.ResponseWriter, code int, v any) {
	body, err := json.Marshal(v)
	if err != nil {
		code = http.StatusInternalServerError
		body = []byte(`{"error":"encode response"}`)
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	// A failed write means the client went away; there is nobody left to tell.
	_, _ = w.Write(append(body, '\n'))
}

// writeErr sends a JSON error object of the form {"error": msg} with the given
// status code.
func writeErr(w http.ResponseWriter, code int, msg string) {
	writeJSON(w, code, map[string]string{"error": msg})
}

// canonicalSig returns the bytes to verify for a request: the request struct
// re-marshaled with its Sig field cleared. The caller passes a copy with Sig
// already zeroed. This is symmetric with how the client signs.
//
// The marshal error is discarded; the request types passed here are plain
// structs of strings, ints and byte slices, which json.Marshal always encodes.
func canonicalSig(v any) []byte {
	b, _ := json.Marshal(v)
	return b
}
+func (s *Server) verifyOwnerSig(roomID, by string, sig, canonical []byte) (Member, error) { + info, err := s.store.GetRoom(roomID) + if err != nil { + return Member{}, fmt.Errorf("room not found") + } + if by != info.OwnerEndpoint { + return Member{}, fmt.Errorf("requester %q is not the room owner", by) + } + owner, err := s.store.GetMember(roomID, by) + if err != nil { + return Member{}, fmt.Errorf("owner member not found") + } + if !cs.VerifyEd25519(owner.SignPub, canonical, sig) { + return Member{}, fmt.Errorf("invalid owner signature") + } + return owner, nil +} + +// ---- handlers ------------------------------------------------------------- + +func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) { + var req createRoomReq + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) + return + } + if req.Subject == "" || req.Owner.Endpoint == "" { + writeErr(w, http.StatusBadRequest, "subject and owner.endpoint required") + return + } + roomID := newULID() + info := RoomInfo{ + RoomID: roomID, + Subject: req.Subject, + Encrypt: req.Policy.Encrypt, + Persist: req.Policy.Persist, + SignMsgs: req.Policy.SignMsgs, + OwnerEndpoint: req.Owner.Endpoint, + } + if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusCreated, createRoomResp{RoomID: roomID}) +} + +func (s *Server) handleInvite(w http.ResponseWriter, r *http.Request) { + roomID := r.PathValue("id") + var req inviteReq + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) + return + } + // Canonical bytes = the request with Sig cleared. 
+ sig := req.Sig + req.Sig = nil + if _, err := s.verifyOwnerSig(roomID, req.By, sig, canonicalSig(req)); err != nil { + writeErr(w, http.StatusForbidden, err.Error()) + return + } + info, err := s.store.GetRoom(roomID) + if err != nil { + writeErr(w, http.StatusNotFound, err.Error()) + return + } + m := Member{ + Endpoint: req.Member.Endpoint, + Role: "member", + SignPub: req.Member.SignPub, + KexPub: req.Member.KexPub, + } + if err := s.store.AddMember(roomID, m, info.Epoch, req.SealedKey); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "invited"}) +} + +func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) { + roomID := r.PathValue("id") + endpoint := r.URL.Query().Get("endpoint") + if endpoint == "" { + writeErr(w, http.StatusBadRequest, "endpoint query param required") + return + } + epoch := 0 + if e := r.URL.Query().Get("epoch"); e != "" { + if n, err := strconv.Atoi(e); err == nil { + epoch = n + } + } + ep, sealed, err := s.store.GetSealedKey(roomID, endpoint, epoch) + if err != nil { + writeErr(w, http.StatusNotFound, err.Error()) + return + } + writeJSON(w, http.StatusOK, keyResp{Epoch: ep, SealedKey: sealed}) +} + +func (s *Server) handleListMembers(w http.ResponseWriter, r *http.Request) { + roomID := r.PathValue("id") + members, err := s.store.ListMembers(roomID) + if err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + out := make([]memberJSON, 0, len(members)) + for _, m := range members { + out = append(out, memberJSON{Endpoint: m.Endpoint, Role: m.Role, SignPub: m.SignPub, KexPub: m.KexPub}) + } + writeJSON(w, http.StatusOK, out) +} + +func (s *Server) handleGetRoom(w http.ResponseWriter, r *http.Request) { + roomID := r.PathValue("id") + info, err := s.store.GetRoom(roomID) + if err != nil { + writeErr(w, http.StatusNotFound, err.Error()) + return + } + writeJSON(w, http.StatusOK, roomResp{ + Subject: 
info.Subject, + Epoch: info.Epoch, + Policy: policyJSON{Encrypt: info.Encrypt, Persist: info.Persist, SignMsgs: info.SignMsgs}, + }) +} + +func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) { + roomID := r.PathValue("id") + var req rekeyReq + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) + return + } + sig := req.Sig + req.Sig = nil + if _, err := s.verifyOwnerSig(roomID, req.By, sig, canonicalSig(req)); err != nil { + writeErr(w, http.StatusForbidden, err.Error()) + return + } + if req.NewEpoch <= 0 { + writeErr(w, http.StatusBadRequest, "new_epoch must be > 0") + return + } + // Bump epoch, then store the fresh sealed keys for the remaining members, + // then remove the kicked/left members. + if err := s.store.BumpEpoch(roomID, req.NewEpoch); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + keys := make(map[string][]byte, len(req.Keys)) + for _, k := range req.Keys { + keys[k.Endpoint] = k.SealedKey + } + if len(keys) > 0 { + if err := s.store.PutSealedKeys(roomID, req.NewEpoch, keys); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + } + for _, ep := range req.Remove { + if err := s.store.RemoveMember(roomID, ep); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + } + writeJSON(w, http.StatusOK, map[string]any{"status": "rekeyed", "epoch": req.NewEpoch}) +} + +func (s *Server) handlePutBlob(w http.ResponseWriter, r *http.Request) { + data, err := io.ReadAll(r.Body) + if err != nil { + writeErr(w, http.StatusBadRequest, "read body: "+err.Error()) + return + } + hash, err := s.blobs.Put(data) + if err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, blobResp{Hash: hash}) +} + +func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) { + hash := r.PathValue("hash") + if 
strings.ContainsAny(hash, "/\\.") { + writeErr(w, http.StatusBadRequest, "invalid hash") + return + } + data, err := s.blobs.Get(hash) + if err != nil { + writeErr(w, http.StatusNotFound, err.Error()) + return + } + w.Header().Set("Content-Type", "application/octet-stream") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(data) +} diff --git a/pkg/membership/store.go b/pkg/membership/store.go new file mode 100644 index 0000000..dfe34c9 --- /dev/null +++ b/pkg/membership/store.go @@ -0,0 +1,289 @@ +// Package membership implements the authoritative control plane of unibus: +// room metadata, member directory, and per-epoch sealed room keys. +// +// The data plane (actual messages) is NATS; this package owns the SQLite-backed +// state that NATS cannot: who is in a room, what their public keys are, and the +// encrypted room key K each member needs to participate at a given epoch. +// +// Migrations are embedded and applied idempotently on Open. The embedded copy +// under pkg/membership/migrations mirrors the module-root migrations/ directory +// (kept in sync); both are additive-only per .claude/rules/db_migrations.md. +package membership + +import ( + "database/sql" + "embed" + "fmt" + "io/fs" + "sort" + "strings" + "time" + + // modernc.org/sqlite registers the pure-Go "sqlite" driver (no CGO). + _ "modernc.org/sqlite" +) + +//go:embed migrations/*.sql +var migrationsFS embed.FS + +// Member is a participant of a room with their published public keys. +type Member struct { + Endpoint string `json:"endpoint"` + Role string `json:"role"` + SignPub []byte `json:"sign_pub"` + KexPub []byte `json:"kex_pub"` +} + +// RoomInfo is the metadata of a room. +type RoomInfo struct { + RoomID string + Subject string + Epoch int + Encrypt bool + Persist bool + SignMsgs bool + OwnerEndpoint string +} + +// Store is the SQLite-backed membership/key store. 
+type Store struct { + db *sql.DB +} + +// Open opens (creating if needed) the SQLite database at path and applies all +// embedded migrations idempotently. +func Open(path string) (*Store, error) { + // _pragma busy_timeout avoids spurious "database is locked" under concurrent + // HTTP handlers; foreign_keys kept off — we manage referential integrity in code. + dsn := fmt.Sprintf("file:%s?_pragma=busy_timeout(5000)&_pragma=journal_mode(WAL)", path) + db, err := sql.Open("sqlite", dsn) + if err != nil { + return nil, fmt.Errorf("membership: open db: %w", err) + } + if err := db.Ping(); err != nil { + db.Close() + return nil, fmt.Errorf("membership: ping db: %w", err) + } + s := &Store{db: db} + if err := s.applyMigrations(); err != nil { + db.Close() + return nil, err + } + return s, nil +} + +// Close closes the underlying database. +func (s *Store) Close() error { return s.db.Close() } + +// applyMigrations runs every embedded migration in lexical order, tolerating +// the "already applied" errors that SQLite's non-idempotent DDL produces. +func (s *Store) applyMigrations() error { + files, err := fs.Glob(migrationsFS, "migrations/*.sql") + if err != nil { + return fmt.Errorf("membership: glob migrations: %w", err) + } + sort.Strings(files) + for _, f := range files { + b, err := migrationsFS.ReadFile(f) + if err != nil { + return fmt.Errorf("membership: read %s: %w", f, err) + } + if _, err := s.db.Exec(string(b)); err != nil { + msg := err.Error() + if !strings.Contains(msg, "duplicate column") && !strings.Contains(msg, "already exists") { + return fmt.Errorf("membership: apply %s: %w", f, err) + } + } + } + return nil +} + +func nowRFC3339() string { return time.Now().UTC().Format(time.RFC3339Nano) } + +// CreateRoom inserts a room at epoch 1, registers the owner as a member with +// role "owner", and stores the owner's sealed key for epoch 1. Idempotent +// inserts are not used: a duplicate room_id returns an error. 
+func (s *Store) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error { + tx, err := s.db.Begin() + if err != nil { + return fmt.Errorf("membership: begin: %w", err) + } + defer tx.Rollback() + + now := nowRFC3339() + if _, err := tx.Exec( + `INSERT INTO rooms (room_id, subject, key_epoch, encrypt, persist, sign_msgs, owner_endpoint, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + info.RoomID, info.Subject, 1, + b2i(info.Encrypt), b2i(info.Persist), b2i(info.SignMsgs), + info.OwnerEndpoint, now, + ); err != nil { + return fmt.Errorf("membership: insert room: %w", err) + } + + if _, err := tx.Exec( + `INSERT INTO members (room_id, endpoint, role, joined_at, sign_pub, kex_pub) + VALUES (?, ?, 'owner', ?, ?, ?)`, + info.RoomID, info.OwnerEndpoint, now, ownerSignPub, ownerKexPub, + ); err != nil { + return fmt.Errorf("membership: insert owner member: %w", err) + } + + if info.Encrypt { + if _, err := tx.Exec( + `INSERT INTO room_keys (room_id, epoch, endpoint, sealed_key) VALUES (?, 1, ?, ?)`, + info.RoomID, info.OwnerEndpoint, ownerSealedKey, + ); err != nil { + return fmt.Errorf("membership: insert owner key: %w", err) + } + } + + return tx.Commit() +} + +// GetRoom returns room metadata (including current epoch). +func (s *Store) GetRoom(roomID string) (RoomInfo, error) { + var info RoomInfo + var enc, per, sgn int + err := s.db.QueryRow( + `SELECT room_id, subject, key_epoch, encrypt, persist, sign_msgs, owner_endpoint + FROM rooms WHERE room_id = ?`, roomID, + ).Scan(&info.RoomID, &info.Subject, &info.Epoch, &enc, &per, &sgn, &info.OwnerEndpoint) + if err != nil { + return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, err) + } + info.Encrypt, info.Persist, info.SignMsgs = enc != 0, per != 0, sgn != 0 + return info, nil +} + +// AddMember inserts a member at the given role and stores their sealed key for +// the supplied epoch. 
+func (s *Store) AddMember(roomID string, m Member, epoch int, sealedKey []byte) error { + tx, err := s.db.Begin() + if err != nil { + return fmt.Errorf("membership: begin: %w", err) + } + defer tx.Rollback() + + now := nowRFC3339() + if _, err := tx.Exec( + `INSERT INTO members (room_id, endpoint, role, joined_at, sign_pub, kex_pub) + VALUES (?, ?, ?, ?, ?, ?)`, + roomID, m.Endpoint, m.Role, now, m.SignPub, m.KexPub, + ); err != nil { + return fmt.Errorf("membership: insert member: %w", err) + } + if len(sealedKey) > 0 { + if _, err := tx.Exec( + `INSERT INTO room_keys (room_id, epoch, endpoint, sealed_key) VALUES (?, ?, ?, ?)`, + roomID, epoch, m.Endpoint, sealedKey, + ); err != nil { + return fmt.Errorf("membership: insert member key: %w", err) + } + } + return tx.Commit() +} + +// GetMember returns a single member of a room. +func (s *Store) GetMember(roomID, endpoint string) (Member, error) { + var m Member + err := s.db.QueryRow( + `SELECT endpoint, role, sign_pub, kex_pub FROM members WHERE room_id = ? AND endpoint = ?`, + roomID, endpoint, + ).Scan(&m.Endpoint, &m.Role, &m.SignPub, &m.KexPub) + if err != nil { + return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, err) + } + return m, nil +} + +// ListMembers returns all members of a room ordered by endpoint. +func (s *Store) ListMembers(roomID string) ([]Member, error) { + rows, err := s.db.Query( + `SELECT endpoint, role, sign_pub, kex_pub FROM members WHERE room_id = ? ORDER BY endpoint`, + roomID, + ) + if err != nil { + return nil, fmt.Errorf("membership: list members %q: %w", roomID, err) + } + defer rows.Close() + + var out []Member + for rows.Next() { + var m Member + if err := rows.Scan(&m.Endpoint, &m.Role, &m.SignPub, &m.KexPub); err != nil { + return nil, fmt.Errorf("membership: scan member: %w", err) + } + out = append(out, m) + } + return out, rows.Err() +} + +// GetSealedKey returns the sealed room key for an endpoint at a given epoch. 
+// If epoch <= 0, the latest epoch for that endpoint is returned. +func (s *Store) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) { + var ep int + var sealed []byte + var err error + if epoch <= 0 { + err = s.db.QueryRow( + `SELECT epoch, sealed_key FROM room_keys + WHERE room_id = ? AND endpoint = ? ORDER BY epoch DESC LIMIT 1`, + roomID, endpoint, + ).Scan(&ep, &sealed) + } else { + err = s.db.QueryRow( + `SELECT epoch, sealed_key FROM room_keys + WHERE room_id = ? AND endpoint = ? AND epoch = ?`, + roomID, endpoint, epoch, + ).Scan(&ep, &sealed) + } + if err != nil { + return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err) + } + return ep, sealed, nil +} + +// PutSealedKeys stores a batch of sealed keys for the given epoch (endpoint -> +// sealed bytes), upserting on conflict so a rekey can overwrite stale entries. +func (s *Store) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error { + tx, err := s.db.Begin() + if err != nil { + return fmt.Errorf("membership: begin: %w", err) + } + defer tx.Rollback() + for endpoint, sealed := range keys { + if _, err := tx.Exec( + `INSERT INTO room_keys (room_id, epoch, endpoint, sealed_key) VALUES (?, ?, ?, ?) + ON CONFLICT(room_id, epoch, endpoint) DO UPDATE SET sealed_key = excluded.sealed_key`, + roomID, epoch, endpoint, sealed, + ); err != nil { + return fmt.Errorf("membership: put sealed key for %q: %w", endpoint, err) + } + } + return tx.Commit() +} + +// BumpEpoch sets the room's current key_epoch to newEpoch. +func (s *Store) BumpEpoch(roomID string, newEpoch int) error { + if _, err := s.db.Exec(`UPDATE rooms SET key_epoch = ? WHERE room_id = ?`, newEpoch, roomID); err != nil { + return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err) + } + return nil +} + +// RemoveMember deletes a member from a room. Their sealed keys for past epochs +// are left intact (they encrypt only data that member could already read). 
+func (s *Store) RemoveMember(roomID, endpoint string) error { + if _, err := s.db.Exec(`DELETE FROM members WHERE room_id = ? AND endpoint = ?`, roomID, endpoint); err != nil { + return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err) + } + return nil +} + +func b2i(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/pkg/membership/store_test.go b/pkg/membership/store_test.go new file mode 100644 index 0000000..1e88c9e --- /dev/null +++ b/pkg/membership/store_test.go @@ -0,0 +1,142 @@ +package membership + +import ( + "bytes" + "path/filepath" + "testing" +) + +func openTestStore(t *testing.T) *Store { + t.Helper() + path := filepath.Join(t.TempDir(), "test.db") + s, err := Open(path) + if err != nil { + t.Fatalf("Open: %v", err) + } + t.Cleanup(func() { s.Close() }) + return s +} + +func TestMigrationsCreateSchema(t *testing.T) { + s := openTestStore(t) + // Verify the three tables exist by querying sqlite_master. + for _, tbl := range []string{"rooms", "members", "room_keys"} { + var name string + err := s.db.QueryRow( + `SELECT name FROM sqlite_master WHERE type='table' AND name=?`, tbl, + ).Scan(&name) + if err != nil { + t.Fatalf("table %q not created: %v", tbl, err) + } + } + // Re-applying migrations must be idempotent (no error on a populated db). + if err := s.applyMigrations(); err != nil { + t.Fatalf("re-apply migrations: %v", err) + } +} + +func TestRoomMemberKeyRoundTrip(t *testing.T) { + s := openTestStore(t) + + owner := "owner-ep" + roomID := "room-1" + info := RoomInfo{ + RoomID: roomID, + Subject: "room.test", + Encrypt: true, + Persist: true, + SignMsgs: true, + OwnerEndpoint: owner, + } + ownerSealed := []byte("owner-sealed-key-epoch1") + if err := s.CreateRoom(info, []byte("owner-sign"), []byte("owner-kex"), ownerSealed); err != nil { + t.Fatalf("CreateRoom: %v", err) + } + + // GetRoom returns epoch 1 and the policy. 
+ got, err := s.GetRoom(roomID) + if err != nil { + t.Fatalf("GetRoom: %v", err) + } + if got.Epoch != 1 || !got.Encrypt || !got.Persist || !got.SignMsgs || got.OwnerEndpoint != owner { + t.Fatalf("GetRoom mismatch: %+v", got) + } + + // Owner sealed key at epoch 1 (latest). + ep, sealed, err := s.GetSealedKey(roomID, owner, 0) + if err != nil { + t.Fatalf("GetSealedKey owner: %v", err) + } + if ep != 1 || !bytes.Equal(sealed, ownerSealed) { + t.Fatalf("owner sealed key mismatch: epoch=%d sealed=%q", ep, sealed) + } + + // Add member at epoch 1. + member := Member{Endpoint: "member-ep", Role: "member", SignPub: []byte("m-sign"), KexPub: []byte("m-kex")} + memberSealed := []byte("member-sealed-epoch1") + if err := s.AddMember(roomID, member, 1, memberSealed); err != nil { + t.Fatalf("AddMember: %v", err) + } + + gotMember, err := s.GetMember(roomID, "member-ep") + if err != nil { + t.Fatalf("GetMember: %v", err) + } + if gotMember.Role != "member" || !bytes.Equal(gotMember.SignPub, []byte("m-sign")) { + t.Fatalf("GetMember mismatch: %+v", gotMember) + } + + members, err := s.ListMembers(roomID) + if err != nil { + t.Fatalf("ListMembers: %v", err) + } + if len(members) != 2 { + t.Fatalf("expected 2 members, got %d", len(members)) + } + + // Bump to epoch 2 and store new keys only for the owner (simulating a kick of member-ep). + if err := s.BumpEpoch(roomID, 2); err != nil { + t.Fatalf("BumpEpoch: %v", err) + } + newKeys := map[string][]byte{owner: []byte("owner-sealed-epoch2")} + if err := s.PutSealedKeys(roomID, 2, newKeys); err != nil { + t.Fatalf("PutSealedKeys: %v", err) + } + if err := s.RemoveMember(roomID, "member-ep"); err != nil { + t.Fatalf("RemoveMember: %v", err) + } + + got, err = s.GetRoom(roomID) + if err != nil { + t.Fatalf("GetRoom after bump: %v", err) + } + if got.Epoch != 2 { + t.Fatalf("expected epoch 2, got %d", got.Epoch) + } + + // Owner now has a fresh sealed key at epoch 2 (latest). 
+ ep, sealed, err = s.GetSealedKey(roomID, owner, 0) + if err != nil { + t.Fatalf("GetSealedKey owner epoch2: %v", err) + } + if ep != 2 || !bytes.Equal(sealed, []byte("owner-sealed-epoch2")) { + t.Fatalf("owner epoch2 key mismatch: epoch=%d sealed=%q", ep, sealed) + } + + // The removed member is gone. + if _, err := s.GetMember(roomID, "member-ep"); err == nil { + t.Fatalf("expected error getting removed member") + } + + // The kicked member has no key at epoch 2 (was excluded from the rekey). + if _, _, err := s.GetSealedKey(roomID, "member-ep", 2); err == nil { + t.Fatalf("kicked member should have no key at epoch 2") + } + members, err = s.ListMembers(roomID) + if err != nil { + t.Fatalf("ListMembers after remove: %v", err) + } + if len(members) != 1 || members[0].Endpoint != owner { + t.Fatalf("expected only owner remaining, got %+v", members) + } +} diff --git a/pkg/membership/ulid.go b/pkg/membership/ulid.go new file mode 100644 index 0000000..29a11cc --- /dev/null +++ b/pkg/membership/ulid.go @@ -0,0 +1,13 @@ +package membership + +import ( + "crypto/rand" + + "github.com/oklog/ulid/v2" +) + +// newULID returns a fresh, lexicographically-sortable unique id used for room +// ids. It uses crypto/rand entropy so ids are unguessable and collision-free. +func newULID() string { + return ulid.MustNew(ulid.Now(), rand.Reader).String() +} diff --git a/pkg/room/room.go b/pkg/room/room.go new file mode 100644 index 0000000..c75ce8b --- /dev/null +++ b/pkg/room/room.go @@ -0,0 +1,42 @@ +// Package room models the policy and identity of a unibus room. +// +// A room is a logical channel on the bus. Its Policy decides whether messages +// are encrypted end-to-end, persisted to history, and signed per-message. Two +// canonical policies are provided: +// +// - ModeNATS: cleartext, ephemeral, unsigned. The "plain NATS" experience — +// fast fan-out for telemetry, process coordination, and RPC where the +// transport boundary is already trusted. 
// Package room models the policy and identity of a unibus room.
//
// A room is a logical channel on the bus. Its Policy decides whether messages
// are encrypted end-to-end, persisted to history, and signed per-message. Two
// canonical policies are provided:
//
//   - ModeNATS: cleartext, ephemeral, unsigned. The "plain NATS" experience:
//     fast fan-out for telemetry, process coordination, and RPC where the
//     transport boundary is already trusted.
//   - ModeMatrix: encrypted, persisted, signed. The "Matrix-like" experience:
//     E2E confidentiality with forward secrecy (key rotation on leave/kick),
//     durable history, and per-message authorship signatures.
//
// Subject naming convention (the address space of the bus). Upper-case words
// are placeholders, reconstructed from the examples:
//
//	proc.GROUP.NAME       process/worker telemetry & coordination (e.g. proc.test.ticks)
//	rpc.SERVICE           request/reply endpoints (e.g. rpc.indexer)
//	room.NAME             human/group chat rooms (e.g. room.general)
//	agent.NAME.{in,out}   LLM agent inbox/outbox (e.g. agent.scout.in)

// Policy controls how a room treats its messages.
type Policy struct {
	Encrypt  bool // payload is AEAD-encrypted with the room key K
	Persist  bool // messages are kept in durable history (JetStream)
	SignMsgs bool // each message carries an Ed25519 signature over its canonical bytes
}

// The two canonical room policies.
var (
	// ModeNATS is cleartext, ephemeral, unsigned: plain NATS semantics.
	ModeNATS = Policy{Encrypt: false, Persist: false, SignMsgs: false}

	// ModeMatrix is encrypted, persisted, signed: Matrix-like E2E semantics.
	ModeMatrix = Policy{Encrypt: true, Persist: true, SignMsgs: true}
)

// Room is the in-memory view of a room: its identity, transport subject, the
// current key epoch, and its policy.
type Room struct {
	ID      string
	Subject string
	Epoch   int
	Policy  Policy
}