12 Commits

Author SHA1 Message Date
egutierrez 4dea99a524 chore: auto-commit (1 file)
- build/

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-14 23:55:18 +02:00
egutierrez 07978fc697 Merge branch 'issue/room-history-endpoint'
Server owns the JetStream stream of persisted rooms + GET /rooms/{id}/history so
clients without JetStream (uniweb) can read the backlog over plain HTTP.
2026-06-14 19:47:05 +02:00
egutierrez bf47511f2a docs(unibus): bump to 0.16.0; document server stream ownership + history endpoint
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 19:46:56 +02:00
egutierrez 73fd89f0b9 feat(membershipd): open JetStream for the embedded node + wire it into the server
The control plane previously opened a privileged JetStream client only when
clustered or running --store kv (needJS). It now also opens one for a standalone
single-node embedded deployment (openJS = needJS || embedded), because the
embedded NATS always ships JetStream and the server needs it to own persisted
rooms' durable streams (ensure on create + serve GET /rooms/{id}/history). An
external NATS without a cluster/KV feature is unchanged (no JetStream; history
degrades to empty).

The internal service identity is generated under the same broadened condition so
the in-process JetStream connection authenticates under enforce. After NewServer
the js context is wired via SetJetStream with the control-plane KV replication
factor, so a persisted room's history is as available as its metadata.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 19:46:56 +02:00
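The broadened condition in the commit above can be read as a small predicate. This is a minimal sketch, assuming (as the diff to `main` does) that an empty `--nats-url` means the embedded NATS is in use; the function name `openJetStream` is hypothetical and not part of the codebase.

```go
package main

import "fmt"

// openJetStream reports whether the control plane should open a privileged
// JetStream client. It restates openJS = needJS || embedded from the commit.
// Hypothetical helper: the real code computes these booleans inline in main.
func openJetStream(clustered bool, storeBackend, natsURL string) bool {
	needJS := clustered || storeBackend == "kv" // cluster or KV store needs JetStream
	embedded := natsURL == ""                   // assumption: no URL means embedded NATS
	return needJS || embedded
}

func main() {
	// Standalone embedded SQLite node: JetStream is now opened.
	fmt.Println(openJetStream(false, "sqlite", ""))
	// External NATS without a cluster/KV feature: unchanged, no JetStream.
	fmt.Println(openJetStream(false, "sqlite", "nats://host:4222"))
}
```

Only the first case changes behavior relative to the old `needJS` check; the external-NATS case still reports false, which is where history degrades to empty.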
egutierrez e71063b16e feat(membership): server owns persisted rooms' stream + GET /rooms/{id}/history
The durable JetStream stream of a persisted (ModeMatrix) room was created only
by the Go client's first publish/subscribe. A client that speaks only core NATS
(the browser client uniweb, which has no JetStream) therefore never created it,
so its messages were captured nowhere and lost on reload. Move stream ownership
to the control plane and expose the backlog over plain HTTP.

- handleCreateRoom ensures the room's stream (idempotent CreateOrUpdateStream)
  BEFORE writing the room row, so the subject is captured from the first message
  whoever publishes it. Done before the store write so a stream failure leaves no
  orphan room. Skipped when no JetStream is wired (room still works, no history).
- New member-only GET /rooms/{id}/history?limit=N (default 200, hard cap 1000):
  reads the stream server-side via the modern jetstream API (Stream.Info +
  GetMsg by sequence, no consumer) and returns the last N frames oldest->newest
  as {"messages":[<base64-std of the marshaled frame>]}. The server never
  decrypts — it relays the E2E ciphertext bytes the stream already holds.
  Existence is checked first (404), then membership (403); enforce rejects an
  unsigned request with 401 before the handler runs.
- Lazy backfill: the history endpoint ensures the stream of a pre-existing
  persisted room, so it starts capturing from now on. Messages sent before the
  stream existed were never captured and are unrecoverable.
- The stream config (streamConfigForRoom) mirrors pkg/client/persist.go
  byte-for-byte plus Replicas (matched to the control-plane KV replication). It
  is copied rather than imported because pkg/client imports pkg/membership and
  the reverse would be an import cycle; the source of truth is documented in a
  comment.
- Server gains SetJetStream(js, replicas) to wire the privileged JetStream
  context and the room-stream replication factor.

Tests (history_test.go): golden (3 frames round-trip in order, decodable),
core-NATS capture (the central fix), handleCreateRoom creates the stream, limit,
empty room ([] not null), 401 unsigned, 403 non-member, 404 missing room.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 19:46:45 +02:00
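On the client side, the history envelope described in the commit above can be consumed roughly as follows. This is a sketch under stated assumptions: the type and function names (`historyResponse`, `decodeHistory`, `clampLimit`) are hypothetical, and how the server treats a zero or negative `limit` is assumed here, not taken from the source. Only the envelope shape, the standard base64 encoding, the default of 200, and the cap of 1000 come from the commit message.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// historyResponse mirrors the documented envelope:
// {"messages":[<base64-std of the marshaled frame>]}.
type historyResponse struct {
	Messages []string `json:"messages"`
}

// clampLimit applies the documented default (200) and hard cap (1000).
// Treating n <= 0 as "use the default" is an assumption.
func clampLimit(n int) int {
	if n <= 0 {
		return 200
	}
	if n > 1000 {
		return 1000
	}
	return n
}

// decodeHistory parses a response body and returns the raw frame bytes in the
// order received (oldest first). The frames are still E2E ciphertext; the
// caller decrypts them with its room keys.
func decodeHistory(body []byte) ([][]byte, error) {
	var resp historyResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, fmt.Errorf("decode history envelope: %w", err)
	}
	frames := make([][]byte, 0, len(resp.Messages))
	for i, m := range resp.Messages {
		raw, err := base64.StdEncoding.DecodeString(m)
		if err != nil {
			return nil, fmt.Errorf("decode frame %d: %w", i, err)
		}
		frames = append(frames, raw)
	}
	return frames, nil
}

func main() {
	// Two placeholder frames; real frames are marshaled, encrypted bus frames.
	body := []byte(`{"messages":["AQID","BAUG"]}`)
	frames, err := decodeHistory(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(frames), clampLimit(5000))
}
```

An empty room yields an empty slice rather than an error, matching the `[] not null` case the tests mention.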
egutierrez 3fdbb54353 Merge branch 'quick/directory-route-path'
fix: directory route registered as /directory (Caddy strips /api) — was 404 in prod
2026-06-14 16:05:06 +02:00
egutierrez 8c3ddaa294 fix(membership): register directory route as /directory, not /api/directory
Caddy strips /api via `handle_path /api/*` before forwarding to membershipd,
so the SPA's GET /api/directory arrives as GET /directory. The route was
registered with the /api prefix, so the stripped request hit no route and
returned 404 in production: the directory never resolved and uniweb fell back
to short ids. Every other control-plane route is registered without the prefix;
this aligns directory with them.

The unit test passed despite the bug because it requested /api/directory, the
same wrong path as the registration. Corrected the request paths to /directory
so the test now exercises the real production path (verified: reverting the
registration to /api/directory now makes TestDirectoryGolden fail with 404).

Bump 0.15.0 -> 0.15.1.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 16:05:00 +02:00
egutierrez e48b092135 Merge branch 'issue/names-bot-provisioning'
Integrates GET /api/directory (endpoint->handle resolution) and one-command
bot provisioning (membershipd bot add).
2026-06-14 15:39:55 +02:00
egutierrez 0b39e86ed6 docs(unibus): bump to 0.15.0; document directory + bot provisioning
Add the 'Directorio de nombres (endpoint -> handle)' and 'Provisioning de bots /
unibots' sections with an end-to-end snippet, and a capability growth log entry
for v0.15.0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 15:32:00 +02:00
egutierrez 669bad52af feat(membershipd): one-command bot provisioning (bot add)
Add `membershipd bot add --handle <name> --out <path> [--role] [--store]` to
provision a bus identity for an automated process in a single step: mint a fresh
Ed25519+X25519 identity (cs.GenerateIdentity, the same derivation worker/chat
use), register its signing key in the allowlist, and write the credentials to a
0600 file. The file is the canonical identity format read by client.LoadIdentity,
so a worker/clientcheck binary pointed at --out connects as the new user with no
extra conversion. Shares the sqlite/kv store plumbing with `user add`.

New exported pkg/client.WriteNewIdentity writes an identity in that format but
refuses to overwrite an existing file (never silently clobber private keys).

provisionBot ordering guarantees no half-provisioned bot: refuse an existing
--out before touching the store, register (an already-registered key is a clear
error, not a panic), then write credentials. Tests cover the golden path
(register + 0600 file + LoadIdentity round-trip), default role, the
already-registered error path (no file written), and the out-exists error path
(no orphan user).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 15:30:17 +02:00
egutierrez 2ba40701b2 feat(membership): add GET /api/directory for endpoint->handle resolution
Authenticated bus users (member or admin) can now map a sender's endpoint id
back to a readable handle. The endpoint is derived server-side from each user's
sign_pub with frame.EndpointID (base64url(sha256(signPub)), unpadded), matching
the bus's own construction byte-for-byte. Only active users are listed; under
enforce the existing auth middleware rejects an unauthenticated caller with 401.

Tests cover the golden path (two users -> 200 with handles + endpoints), the
auth contract (unsigned -> 401), revoked-user exclusion, and endpoint parity
against the cross-language vector from cmd/busvectors.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 15:27:38 +02:00
egutierrez 363aa97def Merge branch 'quick/proxy-ready'
2026-06-14 13:49:24 +02:00
10 changed files with 1303 additions and 4 deletions
+84 -1
@@ -2,7 +2,7 @@
name: unibus
lang: go
domain: infra
-version: 0.14.0
+version: 0.16.0
description: "Unified messaging bus over NATS+JetStream with per-room E2E encryption (reduced megolm/olm): membership/key service, client library, and demo peers."
tags: [service, messaging, nats, e2e]
uses_functions:
@@ -158,6 +158,62 @@ To point at an external NATS in production: `--nats-url nats://host:4222` in
`cybersecurity` from the registry compiles cleanly with `CGO_ENABLED=0`. It does NOT require
`fts5` or `gcc`.
## Name directory (endpoint → handle)
Every bus frame carries the sender's **endpoint id**
(`base64url(sha256(signPub))`, unpadded; see `frame.EndpointID`), not a readable
name. So that a client can show names instead of hashes, the control plane
exposes the directory route. The SPA calls it as `GET /api/directory`, but
Caddy uses `handle_path /api/*` and **strips `/api`** before forwarding to
`membershipd`, so the server registers it (like every control-plane route)
WITHOUT the prefix: `GET /directory`:
- **Auth:** the same signature middleware as the rest of the control plane
(`X-Unibus-Pub/Ts/Nonce/Sig` headers over `CanonicalRequest`). It is NOT
admin-only: any active bus user (member or admin) can read it. In
`enforce` mode, an unsigned request gets a 401 before reaching the handler.
- **Response:** `{ "members": [ { "sign_pub", "endpoint", "handle", "role" } ] }`,
only users with `status=active`. The server computes `endpoint` from
`sign_pub` with the same derivation as the bus, so it matches byte for byte the
sender id the client already has on every message.
- **CORS:** covered by the existing `--cors-origins` allowlist (same headers
as the other routes, no special case).
## Bot / unibot provisioning
Registering an identity for an automated process takes **a single command**.
Previously you had to derive a keypair by hand and pass the `sign_pub` to `user add`;
now `bot add` does it all: it mints a fresh bus identity (Ed25519 +
X25519, the same `cs.GenerateIdentity` derivation that `worker`/`chat` use),
registers its `sign_pub` in the allowlist with `handle` and `role`, and writes the
credentials to a 0600 file that the process reads to connect.
```bash
# 1. Provision the bot (local sqlite store; use --store kv against a live cluster).
membershipd bot add --handle notifier --out ./local_files/notifier.id
# provisioned bot "notifier" role=member
# sign_pub: 97d5a903...b1d4
# endpoint: HU85l2onjrK4EoTLoBfJVkGEXMw9LAjNEjPWiDS8YwM
# credentials: ./local_files/notifier.id (0600)
# 2. The process starts as that user by reading the --out file (canonical
# pkg/client.LoadIdentity format, no conversion): the demo worker consumes it directly.
worker --id-file ./local_files/notifier.id --nats-url nats://127.0.0.1:4250 \
--ctrl-url http://127.0.0.1:8470
# 3. (optional) See it in the directory / in user list.
membershipd user list
```
The credentials (`--out`) are written to the given file with 0600 permissions. It is
the bot's secret: it contains the private keys, so treat it like an SSH key
(see the Gotcha "Identidad = secreto crítico"). `bot add` refuses to overwrite an
existing `--out`, and registers the user BEFORE writing the file, so that
a failure never leaves a half-provisioned bot.
Flags: `--handle` and `--out` are required; `--role admin|member` (default member);
`--store sqlite|kv` and the remaining connection flags are identical to `user add`.
## Subject convention
```
@@ -169,6 +225,33 @@ agent.<name>.{in,out} LLM agent inbox/outbox (agent.scout.in)
## Capability growth log
- v0.16.0 (2026-06-14) - feat: the server ensures the JetStream stream of persisted
rooms + `GET /rooms/{id}/history` so that clients without JetStream (uniweb) can read
the history. (1) `handleCreateRoom` creates (idempotently, `CreateOrUpdateStream`) the
durable stream `UNIBUS_<roomID>` of a persisted room BEFORE responding, so its
subject is captured from the very first message, whether it comes from a Go client or
from a browser client that only speaks core NATS (previously only the Go client
created the stream, so uniweb's messages were lost). (2) New member-only endpoint
`GET /rooms/{id}/history?limit=N` (default 200, cap 1000): it reads the stream
server-side and returns `{messages:[<base64-std of the marshaled frame>]}` in
oldest→newest order; the server never decrypts (it relays the E2E ciphertext). Backfill
of existing persisted rooms: the endpoint lazily ensures the stream (they start
capturing from now on; messages sent before the stream existed are not recoverable). The
control plane now also opens its own JetStream context on an embedded
single-node deployment. All additive; build/vet/test green.
- v0.15.1 (2026-06-14) - fix: the directory route was registered with the /api prefix and Caddy stripped it (404 in prod); corrected to /directory.
- v0.15.0 (2026-06-14) - readable names + one-command bot provisioning.
(1) New `GET /api/directory` on the control plane: any active bus user
(member or admin), authenticated with the same Ed25519 signature as the other
routes, resolves endpoint id → handle. It returns `{members:[{sign_pub, endpoint,
handle, role}]}` for active users only; the server derives the endpoint with
`frame.EndpointID`, matching byte for byte the sender id of every frame (parity
verified against the `cmd/busvectors` vector). (2) New `membershipd bot add
--handle <name> --out <path> [--role] [--store]`: it mints an identity, registers it in
the allowlist, and writes 0600 credentials in `client.LoadIdentity` format, so
that a process (worker/clientcheck) connects as that user with no manual steps.
New exported helper `pkg/client.WriteNewIdentity` (it does not overwrite existing
files). All additive; build/vet/test green.
- v0.14.0 (2026-06-13) - prep for the browser-native client `uniweb` (issue
uniweb/0001, Phase 0), all additive and opt-in: (1) the embedded nats-server can
expose a WebSocket listener (`WebsocketConfig`) so that a browser can speak the
BIN
Binary file not shown.
+159
@@ -0,0 +1,159 @@
package main
import (
"encoding/hex"
"errors"
"flag"
"fmt"
"os"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
)
// runBotCLI implements `membershipd bot add ...`, one-command provisioning of a
// bus identity for an automated process. Where `user add` requires the operator
// to derive a keypair by hand and pass the public key, `bot add` mints the
// identity, registers its signing key in the allowlist, AND writes the bot's
// credentials to a 0600 file the process reads to connect — no manual key
// derivation, no second step. It shares the SQLite/KV store plumbing with the
// user CLI, so `--store kv` provisions against a live cluster the same way.
//
// Like the user CLI it never returns: it exits non-zero on error so it composes
// in shell scripts and systemd ExecStartPre hooks.
func runBotCLI(args []string) {
if len(args) == 0 {
botUsage()
os.Exit(2)
}
sub, rest := args[0], args[1:]
switch sub {
case "add":
botAdd(rest)
case "-h", "--help", "help":
botUsage()
os.Exit(0)
default:
fmt.Fprintf(os.Stderr, "membershipd bot: unknown subcommand %q\n\n", sub)
botUsage()
os.Exit(2)
}
}
func botUsage() {
fmt.Fprint(os.Stderr, `usage: membershipd bot add [flags]
Provision a bus identity for an automated process (a "unibot") in one command:
mint a fresh Ed25519+X25519 identity, register its signing key in the allowlist,
and write the credentials to a 0600 file the process loads to connect.
required flags:
--handle <name> human-readable name for the bot (shown in the directory)
--out <path> where to write the bot credentials (refused if it exists)
optional flags:
--role <role> admin or member (default member)
--store <kind> sqlite (local DB, default) | kv (the live cluster's allowlist)
--db <path> SQLite database path (--store sqlite; default ./local_files/unibus.db)
--store kv flags (defaults assume an on-node invocation):
--nats-url <url> cluster NATS (default nats://127.0.0.1:4250)
--internal-id-file <path> persisted internal service identity (default /opt/unibus/secrets/internal.id)
--ca <path> CA cert pinning the data-plane TLS (default /opt/unibus/tls/ca.crt)
--kv-replicas <n> KV replication factor, match the cluster (default 3)
examples:
membershipd bot add --handle notifier --out ./local_files/notifier.id
membershipd bot add --store kv --handle relay --role member --out /opt/unibus/secrets/relay.id
The --out file is the canonical identity format read by the worker/clientcheck
clients (pkg/client.LoadIdentity), so the provisioned bot connects with no extra
conversion: point the process at it (e.g. worker --id-file <path>) and it joins
the bus as this user.
`)
}
func botAdd(args []string) {
fs := flag.NewFlagSet("bot add", flag.ExitOnError)
handle := fs.String("handle", "", "human-readable bot name (required)")
role := fs.String("role", membership.RoleMember, "role: admin or member")
out := fs.String("out", "", "path to write the bot credentials, 0600 (required)")
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
kf := registerKVFlags(fs)
_ = fs.Parse(args)
if *handle == "" || *out == "" {
fmt.Fprintln(os.Stderr, "membershipd bot add: --handle and --out are required")
os.Exit(2)
}
store, kv, closeStore := resolveStore("bot add", kf, *dbPath)
defer closeStore()
signPubHex, endpoint, err := provisionBot(store, *handle, *role, *out)
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd bot add: %v\n", err)
os.Exit(1)
}
fmt.Printf("provisioned bot %q role=%s\n", *handle, *role)
fmt.Printf(" sign_pub: %s\n", signPubHex)
fmt.Printf(" endpoint: %s\n", endpoint)
fmt.Printf(" credentials: %s (0600)\n", *out)
if kv != nil {
reportKVReplication(kv.js)
}
}
// provisionBot mints a fresh bus identity and provisions it. It is the generating
// half; provisionBotWithIdentity does the registration + persistence so a test can
// inject a known identity (e.g. to exercise the already-registered error path).
func provisionBot(store membership.Store, handle, role, out string) (signPubHex, endpoint string, err error) {
id, err := cs.GenerateIdentity()
if err != nil {
return "", "", fmt.Errorf("generate bot identity: %w", err)
}
return provisionBotWithIdentity(store, id, handle, role, out)
}
// provisionBotWithIdentity registers id's signing key under handle/role and writes
// id's credentials to out. It returns the lowercase-hex signing key and the
// derived endpoint id.
//
// Ordering is deliberate so a failure never leaves a half-provisioned bot:
// 1. refuse if out already exists, BEFORE the store is touched (no orphan user);
// 2. register the user — an already-registered key is a clear error, not a panic;
// 3. only then write the 0600 credentials file.
//
// A write failure after a successful register is reported with the registered key
// so the operator can revoke it; this is the one residual non-atomic seam (a
// local admin command, acceptable per KISS).
func provisionBotWithIdentity(store membership.Store, id cs.Identity, handle, role, out string) (signPubHex, endpoint string, err error) {
if handle == "" || out == "" {
return "", "", fmt.Errorf("handle and out are required")
}
if role == "" {
role = membership.RoleMember
}
if _, statErr := os.Stat(out); statErr == nil {
return "", "", fmt.Errorf("out file %q already exists; refusing to overwrite bot credentials", out)
} else if !os.IsNotExist(statErr) {
return "", "", fmt.Errorf("stat out %q: %w", out, statErr)
}
signPubHex = hex.EncodeToString(id.SignPub)
endpoint = frame.EndpointID(id.SignPub)
if err := store.AddUser(signPubHex, handle, role); err != nil {
if errors.Is(err, membership.ErrUserExists) {
return "", "", fmt.Errorf("sign_pub %s already registered; revoke it first to replace", signPubHex)
}
return "", "", fmt.Errorf("register bot user: %w", err)
}
if err := client.WriteNewIdentity(out, id); err != nil {
return "", "", fmt.Errorf("write bot credentials to %q (user %s WAS registered — revoke it to retry): %w", out, signPubHex, err)
}
return signPubHex, endpoint, nil
}
+149
@@ -0,0 +1,149 @@
package main
import (
"encoding/hex"
"os"
"path/filepath"
"testing"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
)
// openTestStore opens a fresh SQLite membership store in a temp dir.
func openTestStore(t *testing.T) membership.Store {
t.Helper()
store, err := membership.Open(filepath.Join(t.TempDir(), "unibus.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
t.Cleanup(func() { store.Close() })
return store
}
// TestProvisionBotGolden is the happy path: provisioning a bot registers it in the
// allowlist with the right handle and role, AND writes a 0600 credentials file
// that LoadIdentity reconstructs into the same identity — so a worker/clientcheck
// binary pointed at the file connects as exactly this user with no extra step.
func TestProvisionBotGolden(t *testing.T) {
store := openTestStore(t)
out := filepath.Join(t.TempDir(), "notifier.id")
signPubHex, endpoint, err := provisionBot(store, "notifier", membership.RoleMember, out)
if err != nil {
t.Fatalf("provisionBot: %v", err)
}
// Registered in the allowlist with the right handle/role/status.
u, err := store.GetUser(signPubHex)
if err != nil {
t.Fatalf("get provisioned user: %v", err)
}
if u.Handle != "notifier" || u.Role != membership.RoleMember || u.Status != membership.StatusActive {
t.Fatalf("provisioned user row wrong: %+v", u)
}
// And it shows up in user list (the `user list` surface).
users, err := store.ListUsers()
if err != nil {
t.Fatalf("list users: %v", err)
}
found := false
for _, x := range users {
if x.SignPub == signPubHex {
found = true
}
}
if !found {
t.Fatalf("provisioned bot missing from user list: %+v", users)
}
// Credentials file exists, is 0600, and round-trips through LoadIdentity to the
// same signing key + endpoint (no-friction contract).
info, err := os.Stat(out)
if err != nil {
t.Fatalf("stat out file: %v", err)
}
if perm := info.Mode().Perm(); perm != 0o600 {
t.Fatalf("out file perms = %o, want 600", perm)
}
id, err := client.LoadIdentity(out)
if err != nil {
t.Fatalf("LoadIdentity(out): %v", err)
}
if got := hex.EncodeToString(id.SignPub); got != signPubHex {
t.Fatalf("loaded sign_pub %q != provisioned %q", got, signPubHex)
}
if got := frame.EndpointID(id.SignPub); got != endpoint {
t.Fatalf("loaded endpoint %q != reported %q", got, endpoint)
}
}
// TestProvisionBotDefaultRole: an empty role defaults to member.
func TestProvisionBotDefaultRole(t *testing.T) {
store := openTestStore(t)
out := filepath.Join(t.TempDir(), "bot.id")
signPubHex, _, err := provisionBot(store, "defrole", "", out)
if err != nil {
t.Fatalf("provisionBot: %v", err)
}
u, err := store.GetUser(signPubHex)
if err != nil {
t.Fatalf("get user: %v", err)
}
if u.Role != membership.RoleMember {
t.Fatalf("empty role should default to member, got %q", u.Role)
}
}
// TestProvisionBotSignPubAlreadyRegistered is the error path: provisioning an
// identity whose signing key is already in the allowlist fails with a clear error
// (not a panic) AND does not write a credentials file (no half-provisioned bot).
func TestProvisionBotSignPubAlreadyRegistered(t *testing.T) {
store := openTestStore(t)
// Pre-register a key, then try to provision a bot with that SAME identity.
id, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("generate identity: %v", err)
}
signPubHex := hex.EncodeToString(id.SignPub)
if err := store.AddUser(signPubHex, "preexisting", membership.RoleMember); err != nil {
t.Fatalf("pre-register: %v", err)
}
out := filepath.Join(t.TempDir(), "dup.id")
_, _, err = provisionBotWithIdentity(store, id, "dupbot", membership.RoleMember, out)
if err == nil {
t.Fatalf("provisioning an already-registered key should error")
}
if _, statErr := os.Stat(out); !os.IsNotExist(statErr) {
t.Fatalf("credentials file must NOT be written on a duplicate-key failure (stat err = %v)", statErr)
}
}
// TestProvisionBotOutExists is the other error path: an existing --out file is
// refused BEFORE the store is mutated, so the run leaves no orphan user behind.
func TestProvisionBotOutExists(t *testing.T) {
store := openTestStore(t)
out := filepath.Join(t.TempDir(), "taken.id")
if err := os.WriteFile(out, []byte("preexisting credentials"), 0o600); err != nil {
t.Fatalf("seed out file: %v", err)
}
_, _, err := provisionBot(store, "clobber", membership.RoleMember, out)
if err == nil {
t.Fatalf("provisioning over an existing out file should error")
}
// The store must be untouched: no user was registered.
users, err := store.ListUsers()
if err != nil {
t.Fatalf("list users: %v", err)
}
if len(users) != 0 {
t.Fatalf("no user should be registered when out exists, got %+v", users)
}
}
+29 -3
@@ -47,6 +47,14 @@ func main() {
runMigrateCLI(os.Args[2:])
return
}
// `membershipd bot add ...` provisions a bus identity for an automated process
// in one command (mint identity + register + write 0600 credentials). It shares
// the same trusted-host model and store plumbing as the user CLI, so it is
// dispatched here before the server flag set parses os.Args.
if len(os.Args) > 1 && os.Args[1] == "bot" {
runBotCLI(os.Args[2:])
return
}
var (
bind = flag.String("bind", "127.0.0.1", "network interface to bind the HTTP API and the embedded NATS to; use 0.0.0.0 to accept LAN/remote peers")
@@ -142,6 +150,16 @@ func main() {
decentralized := *storeBackend == "kv"
needJS := clustered || decentralized
enforce := authMode == membership.AuthEnforce
embedded := *natsURL == ""
// The control plane also needs a privileged JetStream client to OWN the durable
// per-room streams of persisted rooms (ensure the stream on room creation so the
// subject is captured from the first message — even from a JetStream-less browser
// client — and read it back for GET /rooms/{id}/history). The embedded NATS
// always ships JetStream, so open the client whenever we run embedded, even for a
// standalone SQLite node. For an EXTERNAL NATS we only reach for JetStream when a
// cluster/KV feature explicitly requires it (unchanged), so an operator-managed
// external deployment without those features behaves exactly as before.
openJS := needJS || embedded
// Internal service identity (issue 0006a): when the embedded data plane enforces
// auth, membershipd must still connect to its OWN server to manage JetStream.
@@ -151,7 +169,7 @@ func main() {
// the server is embedded), so a standalone or non-enforce node is unchanged.
var internalID cs.Identity
var internalPubHex string
-if needJS && enforce && *natsURL == "" {
+if openJS && enforce && embedded {
if *internalIDFile != "" {
// Persisted identity: load it, generating + writing it (0600) on first
// start. A stable internal key is what `user add --store kv` presents to
@@ -308,9 +326,9 @@ func main() {
// only client that can connect in this window (the holder still denies everyone
// else; the internal identity bypasses the store).
var js jetstream.JetStream
-if needJS {
+if openJS {
var internalNC *nats.Conn
-if *natsURL == "" {
+if embedded {
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
} else {
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
@@ -332,6 +350,14 @@ func main() {
}
srv := membership.NewServer(store, blobs, authMode)
// Wire the privileged JetStream context so the control plane owns persisted
// rooms' durable streams (ensure on create + serve GET /rooms/{id}/history). The
// stream replication factor matches the control-plane KV replication so a room's
// history is as available as its metadata. js is nil only for an external NATS
// without a cluster/KV feature, where history degrades to empty (see openJS).
if js != nil {
srv.SetJetStream(js, *kvReplicas)
}
// On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS
// has no per-subject ACL, so cleartext content would be readable by any
// registered peer. Forcing E2E keeps message content confidential regardless
+16
@@ -75,6 +75,22 @@ func LoadOrCreateIdentity(path string) (cs.Identity, error) {
return id, nil
}
// WriteNewIdentity writes id to path in the canonical identity-file format read
// by LoadIdentity, but REFUSES to overwrite an existing file: provisioning a new
// identity must never silently clobber another process's private keys. The file
// is created 0600 (it holds private keys). It is the write half of one-command
// bot provisioning (`membershipd bot add --out <path>`): the freshly minted
// identity it writes is exactly what LoadIdentity reconstructs, so a bot binary
// (worker/clientcheck) consumes the credentials with no extra conversion step.
func WriteNewIdentity(path string, id cs.Identity) error {
if _, err := os.Stat(path); err == nil {
return fmt.Errorf("client: identity file %q already exists; refusing to overwrite", path)
} else if !os.IsNotExist(err) {
return fmt.Errorf("client: stat identity %q: %w", path, err)
}
return saveIdentity(path, id)
}
func saveIdentity(path string, id cs.Identity) error {
if dir := filepath.Dir(path); dir != "" {
if err := os.MkdirAll(dir, 0o755); err != nil {
+155
@@ -0,0 +1,155 @@
package membership
import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"net/http"
"testing"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/frame"
)
// directory signs a GET /directory as id and decodes the response envelope. The
// path has no /api prefix: Caddy strips /api before forwarding to membershipd, so
// the route is registered (and hit here) as /directory, matching production.
func directory(t *testing.T, h *authHarness, id cs.Identity, n int) (int, directoryResp) {
t.Helper()
code, body := signedJSON(t, h, "GET", "/directory", nil, id, n)
var resp directoryResp
if code == http.StatusOK {
if err := json.Unmarshal([]byte(body), &resp); err != nil {
t.Fatalf("decode directory: %v (%s)", err, body)
}
}
return code, resp
}
// findMember returns the directory entry for a signing key (case-insensitive).
func findMember(members []directoryMember, signPub string) (directoryMember, bool) {
want := normalizeSignPub(signPub)
for _, m := range members {
if normalizeSignPub(m.SignPub) == want {
return m, true
}
}
return directoryMember{}, false
}
// TestDirectoryGolden is the happy path: an authenticated bus user (here the seed
// admin alice, plus a registered member bob) reads the directory and gets every
// active user's handle, role, and an endpoint derived server-side from the
// sign_pub with the bus's own construction (frame.EndpointID). Two users in ->
// 200 with both handles and correct endpoints.
func TestDirectoryGolden(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
bob, _ := cs.GenerateIdentity()
register(t, h, bob, "bob") // role member
bobPub := hex.EncodeToString(bob.SignPub)
code, resp := directory(t, h, h.alice, 1)
if code != http.StatusOK {
t.Fatalf("directory should be 200 for an authenticated user, got %d", code)
}
aliceRow, ok := findMember(resp.Members, h.alicePub)
if !ok {
t.Fatalf("seed admin alice missing from directory: %+v", resp.Members)
}
if aliceRow.Handle != "alice" || aliceRow.Role != RoleAdmin {
t.Fatalf("alice row wrong: %+v", aliceRow)
}
if want := frame.EndpointID(h.alice.SignPub); aliceRow.Endpoint != want {
t.Fatalf("alice endpoint = %q, want %q", aliceRow.Endpoint, want)
}
bobRow, ok := findMember(resp.Members, bobPub)
if !ok {
t.Fatalf("registered member bob missing from directory: %+v", resp.Members)
}
if bobRow.Handle != "bob" || bobRow.Role != RoleMember {
t.Fatalf("bob row wrong: %+v", bobRow)
}
if want := frame.EndpointID(bob.SignPub); bobRow.Endpoint != want {
t.Fatalf("bob endpoint = %q, want %q", bobRow.Endpoint, want)
}
}
// TestDirectoryUnauthenticatedRejected is the auth contract: under enforce an
// unsigned GET /directory is rejected with 401 by the middleware, before the
// handler ever runs — the directory is not public.
func TestDirectoryUnauthenticatedRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
req, _ := http.NewRequest("GET", h.ts.URL+"/directory", nil)
code, _ := do(t, req)
if code != http.StatusUnauthorized {
t.Fatalf("unsigned directory request under enforce should be 401, got %d", code)
}
}
// TestDirectoryExcludesRevoked: a revoked user must not appear in the directory
// (status=active filter), while active users still do.
func TestDirectoryExcludesRevoked(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
gone, _ := cs.GenerateIdentity()
register(t, h, gone, "gone")
gonePub := hex.EncodeToString(gone.SignPub)
if err := h.store.RevokeUser(gonePub); err != nil {
t.Fatalf("revoke gone: %v", err)
}
code, resp := directory(t, h, h.alice, 1)
if code != http.StatusOK {
t.Fatalf("directory should be 200, got %d", code)
}
if _, ok := findMember(resp.Members, gonePub); ok {
t.Fatalf("revoked user must not appear in directory: %+v", resp.Members)
}
if _, ok := findMember(resp.Members, h.alicePub); !ok {
t.Fatalf("active admin alice should still appear: %+v", resp.Members)
}
}
// TestDirectoryEndpointParity pins the server-side endpoint derivation to the
// cross-language parity vector emitted by cmd/busvectors (and consumed by the
// uniweb crypto.ts endpointID test): for a FIXED sign_pub the directory must
// return the exact base64url(sha256(signPub)) endpoint, byte-for-byte. The
// expected value is recomputed here independently of frame.EndpointID so the test
// fails if the handler ever diverges from the canonical construction.
func TestDirectoryEndpointParity(t *testing.T) {
// Vector from cmd/busvectors (seed 000102..1f -> Ed25519 public key).
const vectorSignPub = "03a107bff3ce10be1d70dd18e74bc09967e4d6309ba50d5f1ddc8664125531b8"
const vectorEndpoint = "Vkdap1RjR0wChd9dvyvKtz2mUTWIOem3dIGy6rEHcIw"
// Independent recomputation: base64url(sha256(raw signPub bytes)), unpadded.
raw, err := hex.DecodeString(vectorSignPub)
if err != nil {
t.Fatalf("decode vector sign_pub: %v", err)
}
sum := sha256.Sum256(raw)
if got := base64.RawURLEncoding.EncodeToString(sum[:]); got != vectorEndpoint {
t.Fatalf("vector self-check: recomputed endpoint %q != pinned %q", got, vectorEndpoint)
}
h := newAuthHarness(t, AuthEnforce)
if err := h.store.AddUser(vectorSignPub, "vectorbot", RoleMember); err != nil {
t.Fatalf("add vector user: %v", err)
}
code, resp := directory(t, h, h.alice, 1)
if code != http.StatusOK {
t.Fatalf("directory should be 200, got %d", code)
}
row, ok := findMember(resp.Members, vectorSignPub)
if !ok {
t.Fatalf("vector user missing from directory: %+v", resp.Members)
}
if row.Endpoint != vectorEndpoint {
t.Fatalf("endpoint parity broken: directory returned %q, want %q", row.Endpoint, vectorEndpoint)
}
}
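The endpoint construction the parity test pins, base64url(sha256(signPub)) without padding, can be sketched in isolation. `endpointFromSignPubHex` is a hypothetical stand-in used only for illustration; it mirrors the independent recomputation in the test and is not `frame.EndpointID` itself.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// endpointFromSignPubHex is a hypothetical helper (not part of the diff). It
// hex-decodes a signing public key, hashes the raw bytes with SHA-256, and
// encodes the digest as unpadded base64url.
func endpointFromSignPubHex(signPubHex string) (string, error) {
	raw, err := hex.DecodeString(signPubHex)
	if err != nil {
		return "", fmt.Errorf("decode sign_pub: %w", err)
	}
	sum := sha256.Sum256(raw)
	return base64.RawURLEncoding.EncodeToString(sum[:]), nil
}

func main() {
	// The sign_pub of the cmd/busvectors parity vector quoted in the test above.
	const signPub = "03a107bff3ce10be1d70dd18e74bc09967e4d6309ba50d5f1ddc8664125531b8"
	ep, err := endpointFromSignPubHex(signPub)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ep)
}
```

A SHA-256 digest is 32 bytes, so the unpadded base64url form is always 43 characters, which is a cheap sanity check for any client-side port.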
@@ -0,0 +1,198 @@
package membership
// Server-side durable history for persisted rooms (room.ModeMatrix / Persist).
//
// A persisted room's messages ride a file-backed JetStream stream named
// "UNIBUS_<roomID>" (roomStreamName, identical to pkg/client.streamName). Until
// now that stream was created only by the Go client's first publish/subscribe; a
// client that speaks only core NATS (the browser client uniweb, which has no
// JetStream) therefore never created it, so its messages were captured nowhere and
// vanished on reload. This file moves stream ownership to the server: the control
// plane ensures the stream when a persisted room is created (so capture starts
// with the first message, regardless of who publishes it) and exposes
// GET /rooms/{id}/history so a JetStream-less client can read the backlog over
// plain HTTP.
//
// The server never decrypts: each stored message is the E2E frame exactly as it
// was published (ciphertext for an encrypted room). The history endpoint returns
// those bytes verbatim (base64-encoded for JSON safety), so end-to-end encryption
// is preserved — the server only relays the bytes it already holds.
import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"strconv"
"time"
"github.com/nats-io/nats.go/jetstream"
)
const (
// defaultHistoryLimit is the number of most-recent messages returned when the
// caller does not specify ?limit.
defaultHistoryLimit = 200
// maxHistoryLimit is the hard ceiling on a single history response, so a caller
// cannot ask the server to buffer an unbounded backlog into one JSON payload.
maxHistoryLimit = 1000
// historyOpTimeout bounds each JetStream operation the history path performs
// (stream lookup/ensure, info, per-message get) so a stalled data plane cannot
// hang a control-plane request indefinitely.
historyOpTimeout = 5 * time.Second
)
// historyResp is the GET /rooms/{id}/history response envelope. messages is the
// ordered (oldest→newest) list of the room's most recent frames, each the base64
// (standard encoding) of the marshaled, still-encrypted frame as it was published.
// The key is a stable contract consumed by the browser client; do not rename it.
type historyResp struct {
Messages []string `json:"messages"`
}
// streamConfigForRoom builds the JetStream stream config for a persisted room.
//
// It MUST stay byte-for-byte compatible with pkg/client/persist.go's ensureStream
// (the original owner of this format): same name derivation (roomStreamName ==
// pkg/client.streamName), same single subject, LimitsPolicy retention, file
// storage. pkg/client is the source of truth for the format; we copy it here
// rather than import it because pkg/client imports pkg/membership and importing it
// back would be a cycle. The only addition is Replicas, matched to the cluster's
// control-plane replication so a persisted room's history is as available as its
// metadata (1 standalone, up to 3 in an HA cluster). CreateOrUpdateStream treats a
// matching config as a no-op, so the client's later ensureStream is harmless.
func streamConfigForRoom(roomID, subject string, replicas int) jetstream.StreamConfig {
if replicas < 1 {
replicas = 1
}
return jetstream.StreamConfig{
Name: roomStreamName(roomID),
Subjects: []string{subject},
Retention: jetstream.LimitsPolicy,
Storage: jetstream.FileStorage,
Replicas: replicas,
}
}
// ensureRoomStream idempotently creates (or no-ops on) the durable stream that
// captures a persisted room's subject. CreateOrUpdateStream returns the existing
// stream unchanged when the config matches, so this is safe to call on every room
// creation and on every history read (lazy backfill of pre-existing rooms).
func ensureRoomStream(ctx context.Context, js jetstream.JetStream, roomID, subject string, replicas int) error {
if _, err := js.CreateOrUpdateStream(ctx, streamConfigForRoom(roomID, subject, replicas)); err != nil {
return fmt.Errorf("membership: ensure stream for room %s: %w", roomID, err)
}
return nil
}
// readRoomHistory returns the last `limit` messages of a room's durable stream in
// chronological order (oldest→newest), each base64-encoded (standard encoding). A
// stream that does not exist yet, or that holds no messages, yields an empty slice
// (not an error): a freshly created or never-used room simply has no history. It
// reads by sequence via the stream MSG.GET API rather than binding a consumer, so
// it has no side effects on any peer's durable ack position. A gap in the sequence
// range (a purged/deleted message) is skipped rather than failing the whole read,
// so the result length is bounded by `limit` but may be smaller.
func readRoomHistory(ctx context.Context, js jetstream.JetStream, roomID string, limit int) ([]string, error) {
out := []string{}
stream, err := js.Stream(ctx, roomStreamName(roomID))
if err != nil {
if errors.Is(err, jetstream.ErrStreamNotFound) {
return out, nil
}
return nil, fmt.Errorf("membership: lookup stream for room %s: %w", roomID, err)
}
si, err := stream.Info(ctx)
if err != nil {
return nil, fmt.Errorf("membership: stream info for room %s: %w", roomID, err)
}
first, last := si.State.FirstSeq, si.State.LastSeq
if si.State.Msgs == 0 || last == 0 {
return out, nil
}
// Window of the last `limit` sequence numbers, clamped to the first stored seq.
// last >= limit guards the unsigned subtraction against underflow.
start := first
if last >= uint64(limit) {
if cand := last - uint64(limit) + 1; cand > start {
start = cand
}
}
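for seq := start; seq <= last; seq++ {
raw, err := stream.GetMsg(ctx, seq)
if err != nil {
// A cancelled or expired context fails every remaining get; surface it
// instead of returning a silently truncated history.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, fmt.Errorf("membership: read history for room %s: %w", roomID, ctxErr)
}
// A purged/deleted sequence leaves a gap; skip it rather than abort.
continue
}
out = append(out, base64.StdEncoding.EncodeToString(raw.Data))
}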
return out, nil
}
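The window arithmetic in readRoomHistory is easy to get wrong because the sequence numbers are unsigned. A minimal standalone sketch of the same computation follows. `windowStart` is a hypothetical name for illustration, and the explicit `limit > 0` guard is an addition of this sketch (the handler already guarantees a positive limit through parseHistoryLimit).

```go
package main

import "fmt"

// windowStart is a hypothetical helper (not part of the diff). It returns the
// first sequence number of a window covering the last `limit` sequences of a
// stream whose stored range is [first, last], clamped to first.
func windowStart(first, last uint64, limit int) uint64 {
	start := first
	// Only subtract when last >= limit; otherwise last-limit+1 would wrap
	// around in unsigned arithmetic.
	if limit > 0 && last >= uint64(limit) {
		if cand := last - uint64(limit) + 1; cand > start {
			start = cand
		}
	}
	return start
}

func main() {
	fmt.Println(windowStart(1, 5, 2))   // window is seq 4..5
	fmt.Println(windowStart(1, 3, 10))  // fewer messages than limit: whole stream
	fmt.Println(windowStart(4, 10, 10)) // older messages purged: clamp to first
}
```

The window is expressed in sequence numbers, not message counts, so when the range contains purged sequences the read returns fewer than `limit` messages, as the function comment notes.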
// parseHistoryLimit reads the ?limit query value. An absent, malformed, or
// non-positive value falls back to defaultHistoryLimit; a value above
// maxHistoryLimit is capped at maxHistoryLimit.
func parseHistoryLimit(q string) int {
if q == "" {
return defaultHistoryLimit
}
n, err := strconv.Atoi(q)
if err != nil || n <= 0 {
return defaultHistoryLimit
}
if n > maxHistoryLimit {
return maxHistoryLimit
}
return n
}
// handleRoomHistory serves GET /rooms/{id}/history: the last ?limit (default 200,
// hard cap 1000) messages of a persisted room, oldest→newest, each the base64 of
// the still-encrypted frame as published. The server never decrypts — it relays
// the ciphertext bytes the stream already holds, preserving E2E.
//
// Authorization mirrors the sibling room reads (/key, /members): the caller must
// be a member of the room (requireMember; allowed under AuthOff/dev where no signer
// is verified). A missing room is 404; a non-member is 403; an unsigned request
// under enforce is rejected with 401 by the auth middleware before this runs.
//
// For a persisted room the stream is ensured first (lazy backfill): a room created
// before the server managed streams begins capturing from now on. Messages sent
// before the stream existed were never captured and are unrecoverable — only
// messages from stream creation onward appear here.
func (s *Server) handleRoomHistory(w http.ResponseWriter, r *http.Request) {
roomID := r.PathValue("id")
// Existence first so a missing room is a clean 404 (the documented contract),
// distinct from a 403 for an existing room the caller is not a member of.
info, err := s.store.GetRoom(roomID)
if err != nil {
writeErr(w, http.StatusNotFound, "room not found")
return
}
if _, ok := s.requireMember(w, r, roomID); !ok {
return
}
limit := parseHistoryLimit(r.URL.Query().Get("limit"))
// No JetStream wired (e.g. an external-NATS deployment without a cluster/KV
// feature): there is no durable stream to read, so report an empty history
// rather than 500 — a client degrades to "no backlog" gracefully.
if s.js == nil {
writeJSON(w, http.StatusOK, historyResp{Messages: []string{}})
return
}
ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout)
defer cancel()
if info.Persist {
if err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas); err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
}
msgs, err := readRoomHistory(ctx, s.js, roomID, limit)
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
writeJSON(w, http.StatusOK, historyResp{Messages: msgs})
}
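On the consuming side, a client only has to parse the `messages` envelope and base64-decode each entry (standard encoding) to recover the frames exactly as published. The sketch below assumes nothing beyond that wire shape; `historyEnvelope`, `decodeHistory`, and `sampleBody` are hypothetical names for illustration, and decrypting the recovered frames is out of scope.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// historyEnvelope mirrors the documented response shape: {"messages": [...]}.
type historyEnvelope struct {
	Messages []string `json:"messages"`
}

// decodeHistory is a hypothetical client-side helper (not part of the diff).
// It returns the raw frame bytes in the order the server sent them
// (oldest to newest).
func decodeHistory(body []byte) ([][]byte, error) {
	var env historyEnvelope
	if err := json.Unmarshal(body, &env); err != nil {
		return nil, fmt.Errorf("decode history envelope: %w", err)
	}
	frames := make([][]byte, 0, len(env.Messages))
	for i, m := range env.Messages {
		raw, err := base64.StdEncoding.DecodeString(m)
		if err != nil {
			return nil, fmt.Errorf("message %d: %w", i, err)
		}
		frames = append(frames, raw)
	}
	return frames, nil
}

// sampleBody builds a two-message response body with placeholder payloads.
func sampleBody() []byte {
	env := historyEnvelope{}
	for i := 0; i < 2; i++ {
		frame := []byte(fmt.Sprintf("ciphertext-%02d", i))
		env.Messages = append(env.Messages, base64.StdEncoding.EncodeToString(frame))
	}
	body, _ := json.Marshal(env) // marshaling a slice of strings cannot fail
	return body
}

func main() {
	frames, err := decodeHistory(sampleBody())
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for i, f := range frames {
		fmt.Printf("frame %d: %d bytes\n", i, len(f))
	}
}
```

An empty `messages` array decodes to an empty slice, which matches the degraded "no backlog" case the handler returns when no JetStream is wired.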
@@ -0,0 +1,400 @@
package membership
import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// historyHarness is an enforce-mode control plane wired to a real embedded NATS
// JetStream, so the history path exercises the production code: the server ensures
// and reads actual durable streams. alice is a seeded admin (and any room's owner),
// bob is a registered user added as a room member, and carol is a registered user
// that is NOT a member of the test room (to exercise the 403 path).
type historyHarness struct {
ts *httptest.Server
store Store
js jetstream.JetStream
nc *nats.Conn
alice cs.Identity // admin + room owner
bob cs.Identity // room member
carol cs.Identity // registered, non-member
}
func newHistoryHarness(t *testing.T) *historyHarness {
t.Helper()
dir := t.TempDir()
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: filepath.Join(dir, "jetstream"),
Host: "127.0.0.1",
Port: kvFreePort(t),
})
if err != nil {
t.Fatalf("embedded nats: %v", err)
}
nc, err := nats.Connect(ns.ClientURL())
if err != nil {
ns.Shutdown()
t.Fatalf("nats connect: %v", err)
}
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
ns.Shutdown()
t.Fatalf("jetstream: %v", err)
}
store, err := Open(filepath.Join(dir, "unibus.db"))
if err != nil {
nc.Close()
ns.Shutdown()
t.Fatalf("open store: %v", err)
}
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
if err != nil {
store.Close()
nc.Close()
ns.Shutdown()
t.Fatalf("open blobs: %v", err)
}
mustID := func(name string) cs.Identity {
id, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity %s: %v", name, err)
}
return id
}
alice, bob, carol := mustID("alice"), mustID("bob"), mustID("carol")
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", RoleAdmin); err != nil {
t.Fatalf("seed admin: %v", err)
}
for _, u := range []struct {
id cs.Identity
handle string
}{{bob, "bob"}, {carol, "carol"}} {
if err := store.AddUser(hex.EncodeToString(u.id.SignPub), u.handle, RoleMember); err != nil {
t.Fatalf("register %s: %v", u.handle, err)
}
}
srv := NewServer(store, blobs, AuthEnforce)
srv.SetJetStream(js, 1)
ts := httptest.NewServer(srv)
t.Cleanup(func() {
ts.Close()
store.Close()
nc.Close()
ns.Shutdown()
ns.WaitForShutdown()
})
return &historyHarness{ts: ts, store: store, js: js, nc: nc, alice: alice, bob: bob, carol: carol}
}
// seedPersistRoom creates a persisted (Matrix-policy) room directly in the store
// with alice as owner and bob as a member, returning its id and subject. It does
// NOT create the stream — that is left to the code under test (handleCreateRoom or
// the lazy ensure in the history endpoint), which is exactly what we want to verify.
func (h *historyHarness) seedPersistRoom(t *testing.T) (roomID, subject string) {
t.Helper()
roomID = newULID()
subject = "unibus.room." + roomID
aliceEp := frame.EndpointID(h.alice.SignPub)
info := RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: aliceEp, Encrypt: true, Persist: true}
if err := h.store.CreateRoom(info, h.alice.SignPub, h.alice.KexPub, []byte("alice-sealed")); err != nil {
t.Fatalf("seed room: %v", err)
}
bobEp := frame.EndpointID(h.bob.SignPub)
bobM := Member{Endpoint: bobEp, Role: RoleMember, SignPub: h.bob.SignPub, KexPub: h.bob.KexPub}
if err := h.store.AddMember(roomID, bobM, 0, []byte("bob-sealed")); err != nil {
t.Fatalf("add member bob: %v", err)
}
return roomID, subject
}
// makeFrame builds a marshaled PUB frame whose payload identifies it, so a test can
// assert exact bytes and ordering after a round trip through the stream + endpoint.
func makeFrame(t *testing.T, subject, sender string, i int) []byte {
t.Helper()
f := frame.Frame{
Type: frame.PUB,
Subject: subject,
Sender: sender,
MsgID: fmt.Sprintf("msg-%02d", i),
Payload: []byte(fmt.Sprintf("ciphertext-%02d", i)),
}
b, err := f.Marshal()
if err != nil {
t.Fatalf("marshal frame %d: %v", i, err)
}
return b
}
// getHistory signs a GET /rooms/{id}/history request as id and returns the status,
// the raw body, and the decoded envelope. query is the raw query string (e.g.
// "limit=2") or "". The signed path includes the query because the server verifies
// the signature over r.URL.RequestURI(), which carries it.
func (h *historyHarness) getHistory(t *testing.T, id cs.Identity, roomID, query string, n int) (int, string, historyResp) {
t.Helper()
path := "/rooms/" + roomID + "/history"
if query != "" {
path += "?" + query
}
req := signedReq(t, h.ts.URL, "GET", path, nil, id, time.Now().Unix(), nonceN(n))
code, body := do(t, req)
var out historyResp
if code == 200 {
if err := json.Unmarshal([]byte(body), &out); err != nil {
t.Fatalf("decode history: %v (%s)", err, body)
}
}
return code, body, out
}
// TestCreateRoomEnsuresStream verifies handleCreateRoom creates the durable stream
// for a persisted room before responding, so capture starts at room creation.
func TestCreateRoomEnsuresStream(t *testing.T) {
h := newHistoryHarness(t)
aliceEp := frame.EndpointID(h.alice.SignPub)
reqBody := createRoomReq{
Subject: "unibus.room.created",
Policy: policyJSON{Encrypt: true, Persist: true},
Owner: endpointJSON{Endpoint: aliceEp, SignPub: h.alice.SignPub, KexPub: h.alice.KexPub},
SealedKeySelf: []byte("alice-sealed"),
}
body, _ := json.Marshal(reqBody)
req := signedReq(t, h.ts.URL, "POST", "/rooms", body, h.alice, time.Now().Unix(), nonceN(1))
code, respBody := do(t, req)
if code != 201 {
t.Fatalf("create room: want 201, got %d (%s)", code, respBody)
}
var cr createRoomResp
if err := json.Unmarshal([]byte(respBody), &cr); err != nil {
t.Fatalf("decode create resp: %v (%s)", err, respBody)
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if _, err := h.js.Stream(ctx, roomStreamName(cr.RoomID)); err != nil {
t.Fatalf("stream for created persist room should exist: %v", err)
}
}
// TestRoomHistoryGolden is the golden path: three frames published to a persisted
// room's stream come back from the endpoint base64-encoded, in chronological order,
// and decode to the exact frames that were published.
func TestRoomHistoryGolden(t *testing.T) {
h := newHistoryHarness(t)
roomID, subject := h.seedPersistRoom(t)
bobEp := frame.EndpointID(h.bob.SignPub)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
t.Fatalf("ensure stream: %v", err)
}
want := make([][]byte, 3)
for i := 0; i < 3; i++ {
want[i] = makeFrame(t, subject, bobEp, i)
// js.Publish waits for the stream ack, so the message is durably stored before
// the next iteration — no sleeps, deterministic ordering.
if _, err := h.js.Publish(ctx, subject, want[i]); err != nil {
t.Fatalf("publish %d: %v", i, err)
}
}
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 10)
if code != 200 {
t.Fatalf("history: want 200, got %d (%s)", code, raw)
}
if len(hr.Messages) != 3 {
t.Fatalf("want 3 messages, got %d (%s)", len(hr.Messages), raw)
}
for i, m := range hr.Messages {
decoded, err := base64.StdEncoding.DecodeString(m)
if err != nil {
t.Fatalf("message %d not valid base64: %v", i, err)
}
if string(decoded) != string(want[i]) {
t.Fatalf("message %d bytes mismatch (order or content)", i)
}
f, err := frame.Unmarshal(decoded)
if err != nil {
t.Fatalf("message %d does not decode to a frame: %v", i, err)
}
if f.MsgID != fmt.Sprintf("msg-%02d", i) {
t.Fatalf("message %d: want MsgID msg-%02d, got %q", i, i, f.MsgID)
}
}
}
// TestRoomHistoryCapturesCoreNATSPublish proves the central fix: a message
// published over PLAIN core NATS (as the JetStream-less browser client uniweb does)
// is captured by the server-owned stream and served by the endpoint. Without the
// server ensuring the stream, this message would be captured nowhere.
func TestRoomHistoryCapturesCoreNATSPublish(t *testing.T) {
h := newHistoryHarness(t)
roomID, subject := h.seedPersistRoom(t)
bobEp := frame.EndpointID(h.bob.SignPub)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
t.Fatalf("ensure stream: %v", err)
}
sent := makeFrame(t, subject, bobEp, 7)
if err := h.nc.Publish(subject, sent); err != nil {
t.Fatalf("core publish: %v", err)
}
if err := h.nc.Flush(); err != nil {
t.Fatalf("flush: %v", err)
}
// Core NATS publish has no stream ack; poll the stream until the message lands.
h.waitMsgs(t, roomID, 1)
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 11)
if code != 200 {
t.Fatalf("history: want 200, got %d (%s)", code, raw)
}
if len(hr.Messages) != 1 {
t.Fatalf("want 1 captured message, got %d (%s)", len(hr.Messages), raw)
}
decoded, err := base64.StdEncoding.DecodeString(hr.Messages[0])
if err != nil || string(decoded) != string(sent) {
t.Fatalf("captured core-NATS message round-trip mismatch (err=%v)", err)
}
}
// TestRoomHistoryLimit verifies ?limit caps the response to the most recent N
// messages, oldest→newest within the window.
func TestRoomHistoryLimit(t *testing.T) {
h := newHistoryHarness(t)
roomID, subject := h.seedPersistRoom(t)
bobEp := frame.EndpointID(h.bob.SignPub)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
t.Fatalf("ensure stream: %v", err)
}
for i := 0; i < 5; i++ {
if _, err := h.js.Publish(ctx, subject, makeFrame(t, subject, bobEp, i)); err != nil {
t.Fatalf("publish %d: %v", i, err)
}
}
code, raw, hr := h.getHistory(t, h.bob, roomID, "limit=2", 12)
if code != 200 {
t.Fatalf("history: want 200, got %d (%s)", code, raw)
}
if len(hr.Messages) != 2 {
t.Fatalf("limit=2 over 5 messages: want 2, got %d", len(hr.Messages))
}
// The window is the last two messages (indices 3 and 4), in order.
for off, m := range hr.Messages {
decoded, err := base64.StdEncoding.DecodeString(m)
if err != nil {
t.Fatalf("limited message %d not valid base64: %v", off, err)
}
f, err := frame.Unmarshal(decoded)
if err != nil {
t.Fatalf("limited message %d does not decode: %v", off, err)
}
want := fmt.Sprintf("msg-%02d", off+3)
if f.MsgID != want {
t.Fatalf("limited message %d: want MsgID %s, got %q", off, want, f.MsgID)
}
}
}
// TestRoomHistoryEmptyRoom verifies a persisted room with no messages returns an
// empty (non-null) array, lazily ensuring the stream on the way.
func TestRoomHistoryEmptyRoom(t *testing.T) {
h := newHistoryHarness(t)
roomID, _ := h.seedPersistRoom(t)
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 13)
if code != 200 {
t.Fatalf("history: want 200, got %d (%s)", code, raw)
}
if hr.Messages == nil {
t.Fatalf("empty room must return [] not null (%s)", raw)
}
if len(hr.Messages) != 0 {
t.Fatalf("empty room: want 0 messages, got %d", len(hr.Messages))
}
// The lazy ensure should have created the stream even though no message exists.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if _, err := h.js.Stream(ctx, roomStreamName(roomID)); err != nil {
t.Fatalf("lazy ensure should have created the stream: %v", err)
}
}
// TestRoomHistoryUnauthenticated verifies an unsigned request is rejected with 401
// under enforce, before the handler runs.
func TestRoomHistoryUnauthenticated(t *testing.T) {
h := newHistoryHarness(t)
roomID, _ := h.seedPersistRoom(t)
// No signing headers: plain GET against the enforce-mode control plane.
req, err := http.NewRequest("GET", h.ts.URL+"/rooms/"+roomID+"/history", nil)
if err != nil {
t.Fatalf("new request: %v", err)
}
code, body := do(t, req)
if code != 401 {
t.Fatalf("unauthenticated history: want 401, got %d (%s)", code, body)
}
}
// TestRoomHistoryNonMember verifies a registered user who is NOT a member of the
// room is rejected with 403.
func TestRoomHistoryNonMember(t *testing.T) {
h := newHistoryHarness(t)
roomID, _ := h.seedPersistRoom(t)
code, body, _ := h.getHistory(t, h.carol, roomID, "", 14)
if code != 403 {
t.Fatalf("non-member history: want 403, got %d (%s)", code, body)
}
}
// TestRoomHistoryRoomNotFound verifies a request for a non-existent room is a 404,
// distinct from the 403 a non-member of an existing room gets.
func TestRoomHistoryRoomNotFound(t *testing.T) {
h := newHistoryHarness(t)
code, body, _ := h.getHistory(t, h.alice, newULID(), "", 15)
if code != 404 {
t.Fatalf("missing room history: want 404, got %d (%s)", code, body)
}
}
// waitMsgs polls the room's stream until it holds at least want messages or a short
// deadline elapses, so a core-NATS publish (which carries no stream ack) is observed
// deterministically without a fixed sleep.
func (h *historyHarness) waitMsgs(t *testing.T, roomID string, want uint64) {
t.Helper()
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
st, err := h.js.Stream(ctx, roomStreamName(roomID))
if err == nil {
si, ierr := st.Info(ctx)
if ierr == nil && si.State.Msgs >= want {
cancel()
return
}
}
cancel()
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("stream for room %s never reached %d message(s)", roomID, want)
}
@@ -3,6 +3,7 @@ package membership
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
@@ -108,6 +109,21 @@ type Server struct {
// the RemoteAddr-only behavior that predates the flag. Set by the command via
// SetTrustedProxies. See clientIP.
trustedProxies trustedProxyMatcher
// js is the privileged JetStream context the server uses to own the durable
// per-room streams of persisted rooms: it ensures a room's stream on creation
// so the room's subject is captured from the first message — even from a
// JetStream-less browser client (uniweb) that speaks only core NATS — and reads
// it back for GET /rooms/{id}/history. It is wired by the command via
// SetJetStream whenever a JetStream-capable data plane is available (always for
// the embedded server). nil leaves history empty and stream-ensure a no-op,
// preserving the pre-feature behavior for a deployment without JetStream.
js jetstream.JetStream
// streamReplicas is the replication factor for the room streams the server
// creates, matched to the cluster's control-plane (KV) replication — 1 for a
// standalone node, up to 3 in an HA cluster — so a persisted room's history is
// as available as its metadata. Used only when js != nil. See SetJetStream.
streamReplicas int
}
// Posture describes the security posture a membershipd node runs with. It is
@@ -142,6 +158,19 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
return s
}
// SetJetStream wires the privileged JetStream context (and the room-stream
// replication factor) the server uses to ensure and read the durable streams of
// persisted rooms. A replicas value below 1 is clamped to 1. It must be called once
// at startup, before the server begins serving; leaving it unset keeps history
// empty and stream-ensure a no-op, the behavior for a deployment without JetStream.
func (s *Server) SetJetStream(js jetstream.JetStream, replicas int) {
if replicas < 1 {
replicas = 1
}
s.js = js
s.streamReplicas = replicas
}
// UseReplicatedNonces switches the server's anti-replay store from the
// per-process in-memory cache to a JetStream KV bucket shared across the cluster
// (issue 0003e). It MUST be called on every node of a multi-node deployment:
@@ -402,6 +431,13 @@ func (s *Server) routes() {
s.mux.HandleFunc("POST /rooms/{id}/invite", s.handleInvite)
s.mux.HandleFunc("GET /rooms/{id}/key", s.handleGetKey)
s.mux.HandleFunc("GET /rooms/{id}/members", s.handleListMembers)
// Durable message history for a persisted room, read server-side from the room's
// JetStream stream so a client without JetStream (the browser client uniweb) can
// load the backlog over plain HTTP. Member-only, like /key and /members.
// Registered without the /api prefix like every other control-plane route: Caddy
// strips /api via handle_path /api/* before forwarding, so the SPA's
// GET /api/rooms/{id}/history arrives here as GET /rooms/{id}/history.
s.mux.HandleFunc("GET /rooms/{id}/history", s.handleRoomHistory)
s.mux.HandleFunc("GET /members/{endpoint}/rooms", s.handleListMemberRooms)
s.mux.HandleFunc("POST /rooms/{id}/rekey", s.handleRekey)
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
@@ -414,6 +450,15 @@ func (s *Server) routes() {
s.mux.HandleFunc("GET /users", s.handleListUsers)
s.mux.HandleFunc("POST /users", s.handleAddUser)
s.mux.HandleFunc("POST /users/{signpub}/revoke", s.handleRevokeUser)
// Member directory — any authenticated bus user (member or admin) may map an
// endpoint id back to its human handle, so clients can render readable sender
// names instead of raw endpoint hashes. Unlike /users it is NOT admin-only and
// returns only active users; under enforce the auth middleware already rejects
// an unauthenticated caller with 401 before this handler runs (uniweb/0002).
// Registered without the /api prefix like every other control-plane route:
// Caddy strips /api via handle_path /api/* before forwarding to membershipd,
// so the SPA's GET /api/directory arrives here as GET /directory.
s.mux.HandleFunc("GET /directory", s.handleDirectory)
}
// ---- wire types -----------------------------------------------------------
@@ -512,6 +557,24 @@ type addUserReq struct {
Role string `json:"role"`
}
// directoryMember is one entry of the GET /directory response: enough for a
// client to map a message's endpoint id (which the bus stamps on every frame)
// back to a readable handle. endpoint is derived server-side from sign_pub with
// the SAME construction the bus uses (frame.EndpointID = base64url(sha256(signPub)),
// unpadded), so it matches the sender id a client already has byte-for-byte.
type directoryMember struct {
SignPub string `json:"sign_pub"`
Endpoint string `json:"endpoint"`
Handle string `json:"handle"`
Role string `json:"role"`
}
// directoryResp is the GET /directory response envelope. The members key is a
// stable contract consumed by the browser client; do not rename it.
type directoryResp struct {
Members []directoryMember `json:"members"`
}
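The endpoint derivation described in the directoryMember comment can be sketched as follows. This is a minimal illustration, assuming that frame.EndpointID does exactly what the comment states (unpadded base64url of the SHA-256 of the raw signing key); the function name endpointID and the all-zero key are hypothetical stand-ins, not the bus's actual code.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// endpointID is a hypothetical stand-in for frame.EndpointID, written from the
// comment's description: base64url(sha256(signPub)), unpadded.
func endpointID(signPub []byte) string {
	sum := sha256.Sum256(signPub)
	// RawURLEncoding is the URL-safe alphabet without '=' padding.
	return base64.RawURLEncoding.EncodeToString(sum[:])
}

func main() {
	// Placeholder 32-byte key (all zeros), used only for illustration.
	signPub := make([]byte, 32)
	id := endpointID(signPub)
	// A 32-byte digest always encodes to 43 unpadded base64 characters.
	fmt.Println(len(id))
}
```

Because the digest is always 32 bytes, every endpoint id produced this way has the same length, which gives clients a cheap sanity check before they do a directory lookup.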
// ---- helpers --------------------------------------------------------------
func writeJSON(w http.ResponseWriter, code int, v any) {
@@ -604,6 +667,21 @@ func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
SignMsgs: req.Policy.SignMsgs,
OwnerEndpoint: req.Owner.Endpoint,
}
// Own the durable stream for a persisted room (issue room-history): ensure it
// BEFORE the room row is written so the subject is captured from the very first
// message whoever publishes it — a Go client OR a JetStream-less browser client.
// Done first so a stream failure aborts cleanly with no orphan room row (the
// rare orphan empty stream it can leave is harmless and idempotently reused).
// Skipped when no JetStream is wired: the room still works, just without history.
if info.Persist && s.js != nil {
ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout)
err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas)
cancel()
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
}
if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
@@ -857,6 +935,41 @@ func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, out)
}
// handleDirectory returns the active bus user directory so a client can resolve a
// sender's endpoint id to a readable handle. Unlike handleListUsers it is NOT
// admin-only: every authenticated bus user may read it (the auth middleware has
// already verified the caller is an active user under enforce, and rejected an
// unauthenticated one with 401). Only active users are listed, and each endpoint
// is computed server-side from the user's sign_pub with frame.EndpointID — the
// exact derivation the bus stamps on every frame, so the returned endpoint matches
// the sender id a client already holds. A user with a malformed sign_pub (which
// the add path rejects, so this is defensive) is skipped rather than failing the
// whole listing.
func (s *Server) handleDirectory(w http.ResponseWriter, r *http.Request) {
users, err := s.store.ListUsers()
if err != nil {
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
return
}
out := make([]directoryMember, 0, len(users))
for _, u := range users {
if u.Status != StatusActive {
continue
}
signPub, err := hex.DecodeString(u.SignPub)
if err != nil || len(signPub) != 32 {
continue
}
out = append(out, directoryMember{
SignPub: u.SignPub,
Endpoint: frame.EndpointID(signPub),
Handle: u.Handle,
Role: u.Role,
})
}
writeJSON(w, http.StatusOK, directoryResp{Members: out})
}
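On the consuming side, a client could turn the GET /directory response into an endpoint-to-handle lookup roughly as sketched below. The JSON keys mirror the wire types in this diff; the helper handlesByEndpoint and the sample payload are hypothetical, and the real browser client (uniweb) is not written in Go, so this only illustrates the shape of the contract.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the wire types from the diff, so the sketch is self-contained.
type directoryMember struct {
	SignPub  string `json:"sign_pub"`
	Endpoint string `json:"endpoint"`
	Handle   string `json:"handle"`
	Role     string `json:"role"`
}

type directoryResp struct {
	Members []directoryMember `json:"members"`
}

// handlesByEndpoint is a hypothetical client helper: it decodes a /directory
// response body and indexes the readable handle by endpoint id.
func handlesByEndpoint(body []byte) (map[string]string, error) {
	var resp directoryResp
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, fmt.Errorf("decode directory: %w", err)
	}
	out := make(map[string]string, len(resp.Members))
	for _, m := range resp.Members {
		out[m.Endpoint] = m.Handle
	}
	return out, nil
}

func main() {
	// Made-up payload; real endpoint ids are 43-character base64url strings.
	body := []byte(`{"members":[{"sign_pub":"00","endpoint":"ep-1","handle":"alice","role":"member"}]}`)
	names, err := handlesByEndpoint(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(names["ep-1"])
}
```

A sender whose endpoint id is missing from the map (for example a revoked user, since only active users are listed) would need a fallback such as showing the raw id.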
// handleAddUser registers a new bus user from an admin-supplied Ed25519 signing
// key. It mirrors the `membershipd user add` CLI: the key must be 64-hex, the
// role must be admin or member (empty defaults to member), and re-adding an