Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4dea99a524 | |||
| 07978fc697 | |||
| bf47511f2a | |||
| 73fd89f0b9 | |||
| e71063b16e | |||
| 3fdbb54353 | |||
| 8c3ddaa294 | |||
| e48b092135 | |||
| 0b39e86ed6 | |||
| 669bad52af | |||
| 2ba40701b2 | |||
| 363aa97def |
@@ -2,7 +2,7 @@
|
||||
name: unibus
|
||||
lang: go
|
||||
domain: infra
|
||||
version: 0.14.0
|
||||
version: 0.16.0
|
||||
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
|
||||
tags: [service, messaging, nats, e2e]
|
||||
uses_functions:
|
||||
@@ -158,6 +158,62 @@ Para apuntar a un NATS externo en producción: `--nats-url nats://host:4222` en
|
||||
`cybersecurity` del registry compila limpio con `CGO_ENABLED=0`. NO requiere
|
||||
`fts5` ni `gcc`.
|
||||
|
||||
## Directorio de nombres (endpoint → handle)
|
||||
|
||||
Cada frame del bus lleva el **endpoint id** del remitente
|
||||
(`base64url(sha256(signPub))`, sin padding — `frame.EndpointID`), no un nombre
|
||||
legible. Para que un cliente muestre nombres en vez de hashes, el control-plane
|
||||
expone la ruta del directorio. La SPA la llama como `GET /api/directory`, pero
|
||||
Caddy hace `handle_path /api/*` y **stripea `/api`** antes de reenviar a
|
||||
`membershipd`, así que el servidor la registra (como todas las rutas del
|
||||
control-plane) SIN el prefijo: `GET /directory`:
|
||||
|
||||
- **Auth:** el mismo middleware de firma que el resto del control-plane
|
||||
(cabeceras `X-Unibus-Pub/Ts/Nonce/Sig` sobre `CanonicalRequest`). NO es
|
||||
admin-only: cualquier usuario activo del bus (member o admin) puede leerlo. En
|
||||
modo `enforce`, una request sin firmar recibe 401 antes de llegar al handler.
|
||||
- **Respuesta** `{ "members": [ { "sign_pub", "endpoint", "handle", "role" } ] }`,
|
||||
solo usuarios `status=active`. El `endpoint` lo computa el servidor desde el
|
||||
`sign_pub` con la misma derivación que el bus, así que casa byte a byte con el
|
||||
sender id que el cliente ya tiene en cada mensaje.
|
||||
- CORS: cubierto por la allowlist `--cors-origins` existente (mismas cabeceras
|
||||
que el resto de rutas, sin caso especial).
|
||||
|
||||
## Provisioning de bots / unibots
|
||||
|
||||
Dar de alta una identidad para un proceso automatizado es **un solo comando**.
|
||||
Antes había que derivar un keypair a mano y pasar el `sign_pub` a `user add`;
|
||||
ahora `bot add` lo hace todo: mintea una identidad de bus fresca (Ed25519 +
|
||||
X25519, la misma derivación `cs.GenerateIdentity` que usan `worker`/`chat`),
|
||||
registra su `sign_pub` en el allowlist con `handle` y `role`, y escribe las
|
||||
credenciales a un fichero 0600 que el proceso lee para conectar.
|
||||
|
||||
```bash
|
||||
# 1. Provisionar el bot (store sqlite local; usa --store kv contra un cluster vivo).
|
||||
membershipd bot add --handle notifier --out ./local_files/notifier.id
|
||||
# provisioned bot "notifier" role=member
|
||||
# sign_pub: 97d5a903...b1d4
|
||||
# endpoint: HU85l2onjrK4EoTLoBfJVkGEXMw9LAjNEjPWiDS8YwM
|
||||
# credentials: ./local_files/notifier.id (0600)
|
||||
|
||||
# 2. El proceso arranca como ese usuario leyendo el --out (formato canónico
|
||||
# pkg/client.LoadIdentity, sin conversión): el worker demo lo consume directo.
|
||||
worker --id-file ./local_files/notifier.id --nats-url nats://127.0.0.1:4250 \
|
||||
--ctrl-url http://127.0.0.1:8470
|
||||
|
||||
# 3. (opcional) Verlo en el directorio / en user list.
|
||||
membershipd user list
|
||||
```
|
||||
|
||||
Las credenciales (`--out`) quedan en el fichero indicado, con permisos 0600. Es
|
||||
el secreto del bot: contiene las claves privadas, trátalo como una clave SSH
|
||||
(ver Gotcha "Identidad = secreto crítico"). `bot add` rehúsa sobrescribir un
|
||||
`--out` existente, y registra al usuario ANTES de escribir el fichero, de modo
|
||||
que un fallo nunca deja un bot a medias.
|
||||
|
||||
Flags: `--handle` y `--out` obligatorios; `--role admin|member` (default member);
|
||||
`--store sqlite|kv` y el resto de flags de conexión idénticos a `user add`.
|
||||
|
||||
## Convención de subjects
|
||||
|
||||
```
|
||||
@@ -169,6 +225,33 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v0.16.0 (2026-06-14) — feat: el server asegura el stream JetStream de las rooms
|
||||
persist + `GET /rooms/{id}/history` para que clientes sin JetStream (uniweb) lean
|
||||
el histórico. (1) `handleCreateRoom` crea (idempotente, `CreateOrUpdateStream`) el
|
||||
stream durable `UNIBUS_<roomID>` de una room persist ANTES de responder, así su
|
||||
subject se captura desde el minuto cero venga el mensaje de un cliente Go o de un
|
||||
cliente browser que solo habla core NATS (antes el stream lo creaba solo el cliente
|
||||
Go, así que los mensajes de uniweb se perdían). (2) Nuevo endpoint member-only
|
||||
`GET /rooms/{id}/history?limit=N` (default 200, cap 1000): lee el stream
|
||||
server-side y devuelve `{messages:[<base64-std del frame marshalado>]}` en orden
|
||||
oldest→newest; el server jamás descifra (relay del ciphertext E2E). Backfill de
|
||||
rooms persist existentes: lazy-ensure del stream en el endpoint (empiezan a
|
||||
capturar desde ahora; los mensajes previos al stream no son recuperables). El
|
||||
control plane abre ahora su propio contexto JetStream también en single-node
|
||||
embebido. Todo aditivo; build/vet/test verdes.
|
||||
- v0.15.1 (2026-06-14) — fix: la ruta del directorio se registraba con prefijo /api y Caddy lo stripeaba (404 en prod); corregida a /directory.
|
||||
- v0.15.0 (2026-06-14) — nombres legibles + provisioning de bots de un comando.
|
||||
(1) Nuevo `GET /api/directory` en el control-plane: cualquier usuario activo del
|
||||
bus (member o admin), autenticado con la misma firma Ed25519 que el resto de
|
||||
rutas, resuelve endpoint id → handle. Devuelve `{members:[{sign_pub, endpoint,
|
||||
handle, role}]}` solo de usuarios activos; el endpoint lo deriva el servidor con
|
||||
`frame.EndpointID`, casando byte a byte con el sender id de cada frame (paridad
|
||||
verificada contra el vector de `cmd/busvectors`). (2) Nuevo `membershipd bot add
|
||||
--handle <name> --out <path> [--role] [--store]`: mintea identidad, la registra en
|
||||
el allowlist y escribe credenciales 0600 en formato `client.LoadIdentity`, de modo
|
||||
que un proceso (worker/clientcheck) conecta como ese usuario sin pasos manuales.
|
||||
Nuevo helper exportado `pkg/client.WriteNewIdentity` (no sobrescribe ficheros
|
||||
existentes). Todo aditivo; build/vet/test verdes.
|
||||
- v0.14.0 (2026-06-13) — prep para el cliente browser-nativo `uniweb` (issue
|
||||
uniweb/0001, Fase 0), todo aditivo y opt-in: (1) el nats-server embebido puede
|
||||
exponer un listener WebSocket (`WebsocketConfig`) para que un navegador hable el
|
||||
|
||||
Executable
BIN
Binary file not shown.
@@ -0,0 +1,159 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
|
||||
// runBotCLI implements `membershipd bot add ...`, one-command provisioning of a
|
||||
// bus identity for an automated process. Where `user add` requires the operator
|
||||
// to derive a keypair by hand and pass the public key, `bot add` mints the
|
||||
// identity, registers its signing key in the allowlist, AND writes the bot's
|
||||
// credentials to a 0600 file the process reads to connect — no manual key
|
||||
// derivation, no second step. It shares the SQLite/KV store plumbing with the
|
||||
// user CLI, so `--store kv` provisions against a live cluster the same way.
|
||||
//
|
||||
// Like the user CLI it never returns: it exits non-zero on error so it composes
|
||||
// in shell scripts and systemd ExecStartPre hooks.
|
||||
func runBotCLI(args []string) {
|
||||
if len(args) == 0 {
|
||||
botUsage()
|
||||
os.Exit(2)
|
||||
}
|
||||
sub, rest := args[0], args[1:]
|
||||
switch sub {
|
||||
case "add":
|
||||
botAdd(rest)
|
||||
case "-h", "--help", "help":
|
||||
botUsage()
|
||||
os.Exit(0)
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "membershipd bot: unknown subcommand %q\n\n", sub)
|
||||
botUsage()
|
||||
os.Exit(2)
|
||||
}
|
||||
}
|
||||
|
||||
func botUsage() {
|
||||
fmt.Fprint(os.Stderr, `usage: membershipd bot add [flags]
|
||||
|
||||
Provision a bus identity for an automated process (a "unibot") in one command:
|
||||
mint a fresh Ed25519+X25519 identity, register its signing key in the allowlist,
|
||||
and write the credentials to a 0600 file the process loads to connect.
|
||||
|
||||
required flags:
|
||||
--handle <name> human-readable name for the bot (shown in the directory)
|
||||
--out <path> where to write the bot credentials (refused if it exists)
|
||||
|
||||
optional flags:
|
||||
--role <role> admin or member (default member)
|
||||
--store <kind> sqlite (local DB, default) | kv (the live cluster's allowlist)
|
||||
--db <path> SQLite database path (--store sqlite; default ./local_files/unibus.db)
|
||||
|
||||
--store kv flags (defaults assume an on-node invocation):
|
||||
--nats-url <url> cluster NATS (default nats://127.0.0.1:4250)
|
||||
--internal-id-file <path> persisted internal service identity (default /opt/unibus/secrets/internal.id)
|
||||
--ca <path> CA cert pinning the data-plane TLS (default /opt/unibus/tls/ca.crt)
|
||||
--kv-replicas <n> KV replication factor, match the cluster (default 3)
|
||||
|
||||
examples:
|
||||
membershipd bot add --handle notifier --out ./local_files/notifier.id
|
||||
membershipd bot add --store kv --handle relay --role member --out /opt/unibus/secrets/relay.id
|
||||
|
||||
The --out file is the canonical identity format read by the worker/clientcheck
|
||||
clients (pkg/client.LoadIdentity), so the provisioned bot connects with no extra
|
||||
conversion: point the process at it (e.g. worker --id-file <path>) and it joins
|
||||
the bus as this user.
|
||||
`)
|
||||
}
|
||||
|
||||
func botAdd(args []string) {
|
||||
fs := flag.NewFlagSet("bot add", flag.ExitOnError)
|
||||
handle := fs.String("handle", "", "human-readable bot name (required)")
|
||||
role := fs.String("role", membership.RoleMember, "role: admin or member")
|
||||
out := fs.String("out", "", "path to write the bot credentials, 0600 (required)")
|
||||
dbPath := fs.String("db", defaultDBPath, "SQLite database path")
|
||||
kf := registerKVFlags(fs)
|
||||
_ = fs.Parse(args)
|
||||
|
||||
if *handle == "" || *out == "" {
|
||||
fmt.Fprintln(os.Stderr, "membershipd bot add: --handle and --out are required")
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
store, kv, closeStore := resolveStore("bot add", kf, *dbPath)
|
||||
defer closeStore()
|
||||
|
||||
signPubHex, endpoint, err := provisionBot(store, *handle, *role, *out)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "membershipd bot add: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("provisioned bot %q role=%s\n", *handle, *role)
|
||||
fmt.Printf(" sign_pub: %s\n", signPubHex)
|
||||
fmt.Printf(" endpoint: %s\n", endpoint)
|
||||
fmt.Printf(" credentials: %s (0600)\n", *out)
|
||||
if kv != nil {
|
||||
reportKVReplication(kv.js)
|
||||
}
|
||||
}
|
||||
|
||||
// provisionBot mints a fresh bus identity and provisions it. It is the generating
|
||||
// half; provisionBotWithIdentity does the registration + persistence so a test can
|
||||
// inject a known identity (e.g. to exercise the already-registered error path).
|
||||
func provisionBot(store membership.Store, handle, role, out string) (signPubHex, endpoint string, err error) {
|
||||
id, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("generate bot identity: %w", err)
|
||||
}
|
||||
return provisionBotWithIdentity(store, id, handle, role, out)
|
||||
}
|
||||
|
||||
// provisionBotWithIdentity registers id's signing key under handle/role and writes
|
||||
// id's credentials to out. It returns the lowercase-hex signing key and the
|
||||
// derived endpoint id.
|
||||
//
|
||||
// Ordering is deliberate so a failure never leaves a half-provisioned bot:
|
||||
// 1. refuse if out already exists, BEFORE the store is touched (no orphan user);
|
||||
// 2. register the user — an already-registered key is a clear error, not a panic;
|
||||
// 3. only then write the 0600 credentials file.
|
||||
//
|
||||
// A write failure after a successful register is reported with the registered key
|
||||
// so the operator can revoke it; this is the one residual non-atomic seam (a
|
||||
// local admin command, acceptable per KISS).
|
||||
func provisionBotWithIdentity(store membership.Store, id cs.Identity, handle, role, out string) (signPubHex, endpoint string, err error) {
|
||||
if handle == "" || out == "" {
|
||||
return "", "", fmt.Errorf("handle and out are required")
|
||||
}
|
||||
if role == "" {
|
||||
role = membership.RoleMember
|
||||
}
|
||||
if _, statErr := os.Stat(out); statErr == nil {
|
||||
return "", "", fmt.Errorf("out file %q already exists; refusing to overwrite bot credentials", out)
|
||||
} else if !os.IsNotExist(statErr) {
|
||||
return "", "", fmt.Errorf("stat out %q: %w", out, statErr)
|
||||
}
|
||||
|
||||
signPubHex = hex.EncodeToString(id.SignPub)
|
||||
endpoint = frame.EndpointID(id.SignPub)
|
||||
|
||||
if err := store.AddUser(signPubHex, handle, role); err != nil {
|
||||
if errors.Is(err, membership.ErrUserExists) {
|
||||
return "", "", fmt.Errorf("sign_pub %s already registered; revoke it first to replace", signPubHex)
|
||||
}
|
||||
return "", "", fmt.Errorf("register bot user: %w", err)
|
||||
}
|
||||
if err := client.WriteNewIdentity(out, id); err != nil {
|
||||
return "", "", fmt.Errorf("write bot credentials to %q (user %s WAS registered — revoke it to retry): %w", out, signPubHex, err)
|
||||
}
|
||||
return signPubHex, endpoint, nil
|
||||
}
|
||||
@@ -0,0 +1,149 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
)
|
||||
|
||||
// openTestStore opens a fresh SQLite membership store in a temp dir.
|
||||
func openTestStore(t *testing.T) membership.Store {
|
||||
t.Helper()
|
||||
store, err := membership.Open(filepath.Join(t.TempDir(), "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
return store
|
||||
}
|
||||
|
||||
// TestProvisionBotGolden is the happy path: provisioning a bot registers it in the
|
||||
// allowlist with the right handle and role, AND writes a 0600 credentials file
|
||||
// that LoadIdentity reconstructs into the same identity — so a worker/clientcheck
|
||||
// binary pointed at the file connects as exactly this user with no extra step.
|
||||
func TestProvisionBotGolden(t *testing.T) {
|
||||
store := openTestStore(t)
|
||||
out := filepath.Join(t.TempDir(), "notifier.id")
|
||||
|
||||
signPubHex, endpoint, err := provisionBot(store, "notifier", membership.RoleMember, out)
|
||||
if err != nil {
|
||||
t.Fatalf("provisionBot: %v", err)
|
||||
}
|
||||
|
||||
// Registered in the allowlist with the right handle/role/status.
|
||||
u, err := store.GetUser(signPubHex)
|
||||
if err != nil {
|
||||
t.Fatalf("get provisioned user: %v", err)
|
||||
}
|
||||
if u.Handle != "notifier" || u.Role != membership.RoleMember || u.Status != membership.StatusActive {
|
||||
t.Fatalf("provisioned user row wrong: %+v", u)
|
||||
}
|
||||
|
||||
// And it shows up in user list (the `user list` surface).
|
||||
users, err := store.ListUsers()
|
||||
if err != nil {
|
||||
t.Fatalf("list users: %v", err)
|
||||
}
|
||||
found := false
|
||||
for _, x := range users {
|
||||
if x.SignPub == signPubHex {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatalf("provisioned bot missing from user list: %+v", users)
|
||||
}
|
||||
|
||||
// Credentials file exists, is 0600, and round-trips through LoadIdentity to the
|
||||
// same signing key + endpoint (no-friction contract).
|
||||
info, err := os.Stat(out)
|
||||
if err != nil {
|
||||
t.Fatalf("stat out file: %v", err)
|
||||
}
|
||||
if perm := info.Mode().Perm(); perm != 0o600 {
|
||||
t.Fatalf("out file perms = %o, want 600", perm)
|
||||
}
|
||||
id, err := client.LoadIdentity(out)
|
||||
if err != nil {
|
||||
t.Fatalf("LoadIdentity(out): %v", err)
|
||||
}
|
||||
if got := hex.EncodeToString(id.SignPub); got != signPubHex {
|
||||
t.Fatalf("loaded sign_pub %q != provisioned %q", got, signPubHex)
|
||||
}
|
||||
if got := frame.EndpointID(id.SignPub); got != endpoint {
|
||||
t.Fatalf("loaded endpoint %q != reported %q", got, endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProvisionBotDefaultRole: an empty role defaults to member.
|
||||
func TestProvisionBotDefaultRole(t *testing.T) {
|
||||
store := openTestStore(t)
|
||||
out := filepath.Join(t.TempDir(), "bot.id")
|
||||
signPubHex, _, err := provisionBot(store, "defrole", "", out)
|
||||
if err != nil {
|
||||
t.Fatalf("provisionBot: %v", err)
|
||||
}
|
||||
u, err := store.GetUser(signPubHex)
|
||||
if err != nil {
|
||||
t.Fatalf("get user: %v", err)
|
||||
}
|
||||
if u.Role != membership.RoleMember {
|
||||
t.Fatalf("empty role should default to member, got %q", u.Role)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProvisionBotSignPubAlreadyRegistered is the error path: provisioning an
|
||||
// identity whose signing key is already in the allowlist fails with a clear error
|
||||
// (not a panic) AND does not write a credentials file (no half-provisioned bot).
|
||||
func TestProvisionBotSignPubAlreadyRegistered(t *testing.T) {
|
||||
store := openTestStore(t)
|
||||
|
||||
// Pre-register a key, then try to provision a bot with that SAME identity.
|
||||
id, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("generate identity: %v", err)
|
||||
}
|
||||
signPubHex := hex.EncodeToString(id.SignPub)
|
||||
if err := store.AddUser(signPubHex, "preexisting", membership.RoleMember); err != nil {
|
||||
t.Fatalf("pre-register: %v", err)
|
||||
}
|
||||
|
||||
out := filepath.Join(t.TempDir(), "dup.id")
|
||||
_, _, err = provisionBotWithIdentity(store, id, "dupbot", membership.RoleMember, out)
|
||||
if err == nil {
|
||||
t.Fatalf("provisioning an already-registered key should error")
|
||||
}
|
||||
if _, statErr := os.Stat(out); !os.IsNotExist(statErr) {
|
||||
t.Fatalf("credentials file must NOT be written on a duplicate-key failure (stat err = %v)", statErr)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProvisionBotOutExists is the other error path: an existing --out file is
|
||||
// refused BEFORE the store is mutated, so the run leaves no orphan user behind.
|
||||
func TestProvisionBotOutExists(t *testing.T) {
|
||||
store := openTestStore(t)
|
||||
out := filepath.Join(t.TempDir(), "taken.id")
|
||||
if err := os.WriteFile(out, []byte("preexisting credentials"), 0o600); err != nil {
|
||||
t.Fatalf("seed out file: %v", err)
|
||||
}
|
||||
|
||||
_, _, err := provisionBot(store, "clobber", membership.RoleMember, out)
|
||||
if err == nil {
|
||||
t.Fatalf("provisioning over an existing out file should error")
|
||||
}
|
||||
// The store must be untouched: no user was registered.
|
||||
users, err := store.ListUsers()
|
||||
if err != nil {
|
||||
t.Fatalf("list users: %v", err)
|
||||
}
|
||||
if len(users) != 0 {
|
||||
t.Fatalf("no user should be registered when out exists, got %+v", users)
|
||||
}
|
||||
}
|
||||
+29
-3
@@ -47,6 +47,14 @@ func main() {
|
||||
runMigrateCLI(os.Args[2:])
|
||||
return
|
||||
}
|
||||
// `membershipd bot add ...` provisions a bus identity for an automated process
|
||||
// in one command (mint identity + register + write 0600 credentials). It shares
|
||||
// the same trusted-host model and store plumbing as the user CLI, so it is
|
||||
// dispatched here before the server flag set parses os.Args.
|
||||
if len(os.Args) > 1 && os.Args[1] == "bot" {
|
||||
runBotCLI(os.Args[2:])
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
bind = flag.String("bind", "127.0.0.1", "network interface to bind the HTTP API and the embedded NATS to; use 0.0.0.0 to accept LAN/remote peers")
|
||||
@@ -142,6 +150,16 @@ func main() {
|
||||
decentralized := *storeBackend == "kv"
|
||||
needJS := clustered || decentralized
|
||||
enforce := authMode == membership.AuthEnforce
|
||||
embedded := *natsURL == ""
|
||||
// The control plane also needs a privileged JetStream client to OWN the durable
|
||||
// per-room streams of persisted rooms (ensure the stream on room creation so the
|
||||
// subject is captured from the first message — even from a JetStream-less browser
|
||||
// client — and read it back for GET /rooms/{id}/history). The embedded NATS
|
||||
// always ships JetStream, so open the client whenever we run embedded, even for a
|
||||
// standalone SQLite node. For an EXTERNAL NATS we only reach for JetStream when a
|
||||
// cluster/KV feature explicitly requires it (unchanged), so an operator-managed
|
||||
// external deployment without those features behaves exactly as before.
|
||||
openJS := needJS || embedded
|
||||
|
||||
// Internal service identity (issue 0006a): when the embedded data plane enforces
|
||||
// auth, membershipd must still connect to its OWN server to manage JetStream.
|
||||
@@ -151,7 +169,7 @@ func main() {
|
||||
// the server is embedded), so a standalone or non-enforce node is unchanged.
|
||||
var internalID cs.Identity
|
||||
var internalPubHex string
|
||||
if needJS && enforce && *natsURL == "" {
|
||||
if openJS && enforce && embedded {
|
||||
if *internalIDFile != "" {
|
||||
// Persisted identity: load it, generating + writing it (0600) on first
|
||||
// start. A stable internal key is what `user add --store kv` presents to
|
||||
@@ -308,9 +326,9 @@ func main() {
|
||||
// only client that can connect in this window (the holder still denies everyone
|
||||
// else; the internal identity bypasses the store).
|
||||
var js jetstream.JetStream
|
||||
if needJS {
|
||||
if openJS {
|
||||
var internalNC *nats.Conn
|
||||
if *natsURL == "" {
|
||||
if embedded {
|
||||
internalNC, js, err = connectInternalJS(ns, internalID, enforce)
|
||||
} else {
|
||||
internalNC, js, err = connectExternalJS(natsClientURL, *caFile)
|
||||
@@ -332,6 +350,14 @@ func main() {
|
||||
}
|
||||
|
||||
srv := membership.NewServer(store, blobs, authMode)
|
||||
// Wire the privileged JetStream context so the control plane owns persisted
|
||||
// rooms' durable streams (ensure on create + serve GET /rooms/{id}/history). The
|
||||
// stream replication factor matches the control-plane KV replication so a room's
|
||||
// history is as available as its metadata. js is nil only for an external NATS
|
||||
// without a cluster/KV feature, where history degrades to empty (see openJS).
|
||||
if js != nil {
|
||||
srv.SetJetStream(js, *kvReplicas)
|
||||
}
|
||||
// On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS
|
||||
// has no per-subject ACL, so cleartext content would be readable by any
|
||||
// registered peer. Forcing E2E keeps message content confidential regardless
|
||||
|
||||
@@ -75,6 +75,22 @@ func LoadOrCreateIdentity(path string) (cs.Identity, error) {
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// WriteNewIdentity writes id to path in the canonical identity-file format read
|
||||
// by LoadIdentity, but REFUSES to overwrite an existing file: provisioning a new
|
||||
// identity must never silently clobber another process's private keys. The file
|
||||
// is created 0600 (it holds private keys). It is the write half of one-command
|
||||
// bot provisioning (`membershipd bot add --out <path>`): the freshly minted
|
||||
// identity it writes is exactly what LoadIdentity reconstructs, so a bot binary
|
||||
// (worker/clientcheck) consumes the credentials with no extra conversion step.
|
||||
func WriteNewIdentity(path string, id cs.Identity) error {
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
return fmt.Errorf("client: identity file %q already exists; refusing to overwrite", path)
|
||||
} else if !os.IsNotExist(err) {
|
||||
return fmt.Errorf("client: stat identity %q: %w", path, err)
|
||||
}
|
||||
return saveIdentity(path, id)
|
||||
}
|
||||
|
||||
func saveIdentity(path string, id cs.Identity) error {
|
||||
if dir := filepath.Dir(path); dir != "" {
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
|
||||
@@ -0,0 +1,155 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
)
|
||||
|
||||
// directory signs a GET /directory as id and decodes the response envelope. The
|
||||
// path has no /api prefix: Caddy strips /api before forwarding to membershipd, so
|
||||
// the route is registered (and hit here) as /directory, matching production.
|
||||
func directory(t *testing.T, h *authHarness, id cs.Identity, n int) (int, directoryResp) {
|
||||
t.Helper()
|
||||
code, body := signedJSON(t, h, "GET", "/directory", nil, id, n)
|
||||
var resp directoryResp
|
||||
if code == http.StatusOK {
|
||||
if err := json.Unmarshal([]byte(body), &resp); err != nil {
|
||||
t.Fatalf("decode directory: %v (%s)", err, body)
|
||||
}
|
||||
}
|
||||
return code, resp
|
||||
}
|
||||
|
||||
// findMember returns the directory entry for a signing key (case-insensitive).
|
||||
func findMember(members []directoryMember, signPub string) (directoryMember, bool) {
|
||||
want := normalizeSignPub(signPub)
|
||||
for _, m := range members {
|
||||
if normalizeSignPub(m.SignPub) == want {
|
||||
return m, true
|
||||
}
|
||||
}
|
||||
return directoryMember{}, false
|
||||
}
|
||||
|
||||
// TestDirectoryGolden is the happy path: an authenticated bus user (here the seed
|
||||
// admin alice, plus a registered member bob) reads the directory and gets every
|
||||
// active user's handle, role, and an endpoint derived server-side from the
|
||||
// sign_pub with the bus's own construction (frame.EndpointID). Two users in ->
|
||||
// 200 with both handles and correct endpoints.
|
||||
func TestDirectoryGolden(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
|
||||
bob, _ := cs.GenerateIdentity()
|
||||
register(t, h, bob, "bob") // role member
|
||||
bobPub := hex.EncodeToString(bob.SignPub)
|
||||
|
||||
code, resp := directory(t, h, h.alice, 1)
|
||||
if code != http.StatusOK {
|
||||
t.Fatalf("directory should be 200 for an authenticated user, got %d", code)
|
||||
}
|
||||
|
||||
aliceRow, ok := findMember(resp.Members, h.alicePub)
|
||||
if !ok {
|
||||
t.Fatalf("seed admin alice missing from directory: %+v", resp.Members)
|
||||
}
|
||||
if aliceRow.Handle != "alice" || aliceRow.Role != RoleAdmin {
|
||||
t.Fatalf("alice row wrong: %+v", aliceRow)
|
||||
}
|
||||
if want := frame.EndpointID(h.alice.SignPub); aliceRow.Endpoint != want {
|
||||
t.Fatalf("alice endpoint = %q, want %q", aliceRow.Endpoint, want)
|
||||
}
|
||||
|
||||
bobRow, ok := findMember(resp.Members, bobPub)
|
||||
if !ok {
|
||||
t.Fatalf("registered member bob missing from directory: %+v", resp.Members)
|
||||
}
|
||||
if bobRow.Handle != "bob" || bobRow.Role != RoleMember {
|
||||
t.Fatalf("bob row wrong: %+v", bobRow)
|
||||
}
|
||||
if want := frame.EndpointID(bob.SignPub); bobRow.Endpoint != want {
|
||||
t.Fatalf("bob endpoint = %q, want %q", bobRow.Endpoint, want)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDirectoryUnauthenticatedRejected is the auth contract: under enforce an
|
||||
// unsigned GET /directory is rejected with 401 by the middleware, before the
|
||||
// handler ever runs — the directory is not public.
|
||||
func TestDirectoryUnauthenticatedRejected(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
req, _ := http.NewRequest("GET", h.ts.URL+"/directory", nil)
|
||||
code, _ := do(t, req)
|
||||
if code != http.StatusUnauthorized {
|
||||
t.Fatalf("unsigned directory request under enforce should be 401, got %d", code)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDirectoryExcludesRevoked: a revoked user must not appear in the directory
|
||||
// (status=active filter), while active users still do.
|
||||
func TestDirectoryExcludesRevoked(t *testing.T) {
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
|
||||
gone, _ := cs.GenerateIdentity()
|
||||
register(t, h, gone, "gone")
|
||||
gonePub := hex.EncodeToString(gone.SignPub)
|
||||
if err := h.store.RevokeUser(gonePub); err != nil {
|
||||
t.Fatalf("revoke gone: %v", err)
|
||||
}
|
||||
|
||||
code, resp := directory(t, h, h.alice, 1)
|
||||
if code != http.StatusOK {
|
||||
t.Fatalf("directory should be 200, got %d", code)
|
||||
}
|
||||
if _, ok := findMember(resp.Members, gonePub); ok {
|
||||
t.Fatalf("revoked user must not appear in directory: %+v", resp.Members)
|
||||
}
|
||||
if _, ok := findMember(resp.Members, h.alicePub); !ok {
|
||||
t.Fatalf("active admin alice should still appear: %+v", resp.Members)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDirectoryEndpointParity pins the server-side endpoint derivation to the
|
||||
// cross-language parity vector emitted by cmd/busvectors (and consumed by the
|
||||
// uniweb crypto.ts endpointID test): for a FIXED sign_pub the directory must
|
||||
// return the exact base64url(sha256(signPub)) endpoint, byte-for-byte. The
|
||||
// expected value is recomputed here independently of frame.EndpointID so the test
|
||||
// fails if the handler ever diverges from the canonical construction.
|
||||
func TestDirectoryEndpointParity(t *testing.T) {
|
||||
// Vector from cmd/busvectors (seed 000102..1f -> Ed25519 public key).
|
||||
const vectorSignPub = "03a107bff3ce10be1d70dd18e74bc09967e4d6309ba50d5f1ddc8664125531b8"
|
||||
const vectorEndpoint = "Vkdap1RjR0wChd9dvyvKtz2mUTWIOem3dIGy6rEHcIw"
|
||||
|
||||
// Independent recomputation: base64url(sha256(raw signPub bytes)), unpadded.
|
||||
raw, err := hex.DecodeString(vectorSignPub)
|
||||
if err != nil {
|
||||
t.Fatalf("decode vector sign_pub: %v", err)
|
||||
}
|
||||
sum := sha256.Sum256(raw)
|
||||
if got := base64.RawURLEncoding.EncodeToString(sum[:]); got != vectorEndpoint {
|
||||
t.Fatalf("vector self-check: recomputed endpoint %q != pinned %q", got, vectorEndpoint)
|
||||
}
|
||||
|
||||
h := newAuthHarness(t, AuthEnforce)
|
||||
if err := h.store.AddUser(vectorSignPub, "vectorbot", RoleMember); err != nil {
|
||||
t.Fatalf("add vector user: %v", err)
|
||||
}
|
||||
|
||||
code, resp := directory(t, h, h.alice, 1)
|
||||
if code != http.StatusOK {
|
||||
t.Fatalf("directory should be 200, got %d", code)
|
||||
}
|
||||
row, ok := findMember(resp.Members, vectorSignPub)
|
||||
if !ok {
|
||||
t.Fatalf("vector user missing from directory: %+v", resp.Members)
|
||||
}
|
||||
if row.Endpoint != vectorEndpoint {
|
||||
t.Fatalf("endpoint parity broken: directory returned %q, want %q", row.Endpoint, vectorEndpoint)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,198 @@
|
||||
package membership
|
||||
|
||||
// Server-side durable history for persisted rooms (room.ModeMatrix / Persist).
|
||||
//
|
||||
// A persisted room's messages ride a file-backed JetStream stream named
|
||||
// "UNIBUS_<roomID>" (roomStreamName, identical to pkg/client.streamName). Until
|
||||
// now that stream was created only by the Go client's first publish/subscribe; a
|
||||
// client that speaks only core NATS (the browser client uniweb, which has no
|
||||
// JetStream) therefore never created it, so its messages were captured nowhere and
|
||||
// vanished on reload. This file moves stream ownership to the server: the control
|
||||
// plane ensures the stream when a persisted room is created (so capture starts at
|
||||
// minute zero whoever publishes) and exposes GET /rooms/{id}/history so a
|
||||
// JetStream-less client can read the backlog over plain HTTP.
|
||||
//
|
||||
// The server never decrypts: each stored message is the E2E frame exactly as it
|
||||
// was published (ciphertext for an encrypted room). The history endpoint returns
|
||||
// those bytes verbatim (base64-encoded for JSON safety), so end-to-end encryption
|
||||
// is preserved — the server only relays the bytes it already holds.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
const (
|
||||
// defaultHistoryLimit is the number of most-recent messages returned when the
|
||||
// caller does not specify ?limit.
|
||||
defaultHistoryLimit = 200
|
||||
// maxHistoryLimit is the hard ceiling on a single history response, so a caller
|
||||
// cannot ask the server to buffer an unbounded backlog into one JSON payload.
|
||||
maxHistoryLimit = 1000
|
||||
// historyOpTimeout bounds each JetStream operation the history path performs
|
||||
// (stream lookup/ensure, info, per-message get) so a stalled data plane cannot
|
||||
// hang a control-plane request indefinitely.
|
||||
historyOpTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// historyResp is the GET /rooms/{id}/history response envelope. messages is the
|
||||
// ordered (oldest→newest) list of the room's most recent frames, each the base64
|
||||
// (standard encoding) of the marshaled, still-encrypted frame as it was published.
|
||||
// The key is a stable contract consumed by the browser client; do not rename it.
|
||||
type historyResp struct {
|
||||
Messages []string `json:"messages"`
|
||||
}
|
||||
|
||||
// streamConfigForRoom builds the JetStream stream config for a persisted room.
|
||||
//
|
||||
// It MUST stay byte-for-byte compatible with pkg/client/persist.go's ensureStream
|
||||
// (the original owner of this format): same name derivation (roomStreamName ==
|
||||
// pkg/client.streamName), same single subject, LimitsPolicy retention, file
|
||||
// storage. pkg/client is the source of truth for the format; we copy it here
|
||||
// rather than import it because pkg/client imports pkg/membership and importing it
|
||||
// back would be a cycle. The only addition is Replicas, matched to the cluster's
|
||||
// control-plane replication so a persisted room's history is as available as its
|
||||
// metadata (1 standalone, up to 3 in an HA cluster). CreateOrUpdateStream treats a
|
||||
// matching config as a no-op, so the client's later ensureStream is harmless.
|
||||
func streamConfigForRoom(roomID, subject string, replicas int) jetstream.StreamConfig {
|
||||
if replicas < 1 {
|
||||
replicas = 1
|
||||
}
|
||||
return jetstream.StreamConfig{
|
||||
Name: roomStreamName(roomID),
|
||||
Subjects: []string{subject},
|
||||
Retention: jetstream.LimitsPolicy,
|
||||
Storage: jetstream.FileStorage,
|
||||
Replicas: replicas,
|
||||
}
|
||||
}
|
||||
|
||||
// ensureRoomStream idempotently creates (or no-ops on) the durable stream that
|
||||
// captures a persisted room's subject. CreateOrUpdateStream returns the existing
|
||||
// stream unchanged when the config matches, so this is safe to call on every room
|
||||
// creation and on every history read (lazy backfill of pre-existing rooms).
|
||||
func ensureRoomStream(ctx context.Context, js jetstream.JetStream, roomID, subject string, replicas int) error {
|
||||
if _, err := js.CreateOrUpdateStream(ctx, streamConfigForRoom(roomID, subject, replicas)); err != nil {
|
||||
return fmt.Errorf("membership: ensure stream for room %s: %w", roomID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// readRoomHistory returns the last `limit` messages of a room's durable stream in
|
||||
// chronological order (oldest→newest), each base64-encoded (standard encoding). A
|
||||
// stream that does not exist yet, or that holds no messages, yields an empty slice
|
||||
// (not an error): a freshly created or never-used room simply has no history. It
|
||||
// reads by sequence via the stream MSG.GET API rather than binding a consumer, so
|
||||
// it has no side effects on any peer's durable ack position. A gap in the sequence
|
||||
// range (a purged/deleted message) is skipped rather than failing the whole read,
|
||||
// so the result length is bounded by `limit` but may be smaller.
|
||||
func readRoomHistory(ctx context.Context, js jetstream.JetStream, roomID string, limit int) ([]string, error) {
|
||||
out := []string{}
|
||||
stream, err := js.Stream(ctx, roomStreamName(roomID))
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrStreamNotFound) {
|
||||
return out, nil
|
||||
}
|
||||
return nil, fmt.Errorf("membership: lookup stream for room %s: %w", roomID, err)
|
||||
}
|
||||
si, err := stream.Info(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: stream info for room %s: %w", roomID, err)
|
||||
}
|
||||
first, last := si.State.FirstSeq, si.State.LastSeq
|
||||
if si.State.Msgs == 0 || last == 0 {
|
||||
return out, nil
|
||||
}
|
||||
// Window of the last `limit` sequence numbers, clamped to the first stored seq.
|
||||
// last >= limit guards the unsigned subtraction against underflow.
|
||||
start := first
|
||||
if last >= uint64(limit) {
|
||||
if cand := last - uint64(limit) + 1; cand > start {
|
||||
start = cand
|
||||
}
|
||||
}
|
||||
for seq := start; seq <= last; seq++ {
|
||||
raw, err := stream.GetMsg(ctx, seq)
|
||||
if err != nil {
|
||||
// A purged/deleted sequence leaves a gap; skip it rather than abort.
|
||||
continue
|
||||
}
|
||||
out = append(out, base64.StdEncoding.EncodeToString(raw.Data))
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// parseHistoryLimit reads the ?limit query value, applying the default when it is
|
||||
// absent and clamping out-of-range / malformed values to [1, maxHistoryLimit].
|
||||
func parseHistoryLimit(q string) int {
|
||||
if q == "" {
|
||||
return defaultHistoryLimit
|
||||
}
|
||||
n, err := strconv.Atoi(q)
|
||||
if err != nil || n <= 0 {
|
||||
return defaultHistoryLimit
|
||||
}
|
||||
if n > maxHistoryLimit {
|
||||
return maxHistoryLimit
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// handleRoomHistory serves GET /rooms/{id}/history: the last ?limit (default 200,
|
||||
// hard cap 1000) messages of a persisted room, oldest→newest, each the base64 of
|
||||
// the still-encrypted frame as published. The server never decrypts — it relays
|
||||
// the ciphertext bytes the stream already holds, preserving E2E.
|
||||
//
|
||||
// Authorization mirrors the sibling room reads (/key, /members): the request must
|
||||
// be a member of the room (requireMember; allowed under AuthOff/dev where no signer
|
||||
// is verified). A missing room is 404; a non-member is 403; an unsigned request
|
||||
// under enforce is rejected with 401 by the auth middleware before this runs.
|
||||
//
|
||||
// For a persisted room the stream is ensured first (lazy backfill): a room created
|
||||
// before the server managed streams begins capturing from now on. Messages sent
|
||||
// before the stream existed were never captured and are unrecoverable — only
|
||||
// messages from stream creation onward appear here.
|
||||
func (s *Server) handleRoomHistory(w http.ResponseWriter, r *http.Request) {
|
||||
roomID := r.PathValue("id")
|
||||
// Existence first so a missing room is a clean 404 (the documented contract),
|
||||
// distinct from a 403 for an existing room the caller is not a member of.
|
||||
info, err := s.store.GetRoom(roomID)
|
||||
if err != nil {
|
||||
writeErr(w, http.StatusNotFound, "room not found")
|
||||
return
|
||||
}
|
||||
if _, ok := s.requireMember(w, r, roomID); !ok {
|
||||
return
|
||||
}
|
||||
limit := parseHistoryLimit(r.URL.Query().Get("limit"))
|
||||
|
||||
// No JetStream wired (e.g. an external-NATS deployment without a cluster/KV
|
||||
// feature): there is no durable stream to read, so report an empty history
|
||||
// rather than 500 — a client degrades to "no backlog" gracefully.
|
||||
if s.js == nil {
|
||||
writeJSON(w, http.StatusOK, historyResp{Messages: []string{}})
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout)
|
||||
defer cancel()
|
||||
if info.Persist {
|
||||
if err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas); err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
msgs, err := readRoomHistory(ctx, s.js, roomID, limit)
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, historyResp{Messages: msgs})
|
||||
}
|
||||
@@ -0,0 +1,400 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// historyHarness is an enforce-mode control plane wired to a real embedded NATS
|
||||
// JetStream, so the history path exercises the production code: the server ensures
|
||||
// and reads actual durable streams. alice is a seeded admin (and any room's owner),
|
||||
// bob is a registered user added as a room member, and carol is a registered user
|
||||
// that is NOT a member of the test room (to exercise the 403 path).
|
||||
type historyHarness struct {
|
||||
ts *httptest.Server
|
||||
store Store
|
||||
js jetstream.JetStream
|
||||
nc *nats.Conn
|
||||
alice cs.Identity // admin + room owner
|
||||
bob cs.Identity // room member
|
||||
carol cs.Identity // registered, non-member
|
||||
}
|
||||
|
||||
func newHistoryHarness(t *testing.T) *historyHarness {
|
||||
t.Helper()
|
||||
dir := t.TempDir()
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: filepath.Join(dir, "jetstream"),
|
||||
Host: "127.0.0.1",
|
||||
Port: kvFreePort(t),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("embedded nats: %v", err)
|
||||
}
|
||||
nc, err := nats.Connect(ns.ClientURL())
|
||||
if err != nil {
|
||||
ns.Shutdown()
|
||||
t.Fatalf("nats connect: %v", err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("jetstream: %v", err)
|
||||
}
|
||||
store, err := Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
if err != nil {
|
||||
store.Close()
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("open blobs: %v", err)
|
||||
}
|
||||
mustID := func(name string) cs.Identity {
|
||||
id, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("identity %s: %v", name, err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
alice, bob, carol := mustID("alice"), mustID("bob"), mustID("carol")
|
||||
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", RoleAdmin); err != nil {
|
||||
t.Fatalf("seed admin: %v", err)
|
||||
}
|
||||
for _, u := range []struct {
|
||||
id cs.Identity
|
||||
handle string
|
||||
}{{bob, "bob"}, {carol, "carol"}} {
|
||||
if err := store.AddUser(hex.EncodeToString(u.id.SignPub), u.handle, RoleMember); err != nil {
|
||||
t.Fatalf("register %s: %v", u.handle, err)
|
||||
}
|
||||
}
|
||||
|
||||
srv := NewServer(store, blobs, AuthEnforce)
|
||||
srv.SetJetStream(js, 1)
|
||||
ts := httptest.NewServer(srv)
|
||||
t.Cleanup(func() {
|
||||
ts.Close()
|
||||
store.Close()
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
ns.WaitForShutdown()
|
||||
})
|
||||
return &historyHarness{ts: ts, store: store, js: js, nc: nc, alice: alice, bob: bob, carol: carol}
|
||||
}
|
||||
|
||||
// seedPersistRoom creates a persisted (Matrix-policy) room directly in the store
|
||||
// with alice as owner and bob as a member, returning its id and subject. It does
|
||||
// NOT create the stream — that is left to the code under test (handleCreateRoom or
|
||||
// the lazy ensure in the history endpoint), which is exactly what we want to verify.
|
||||
func (h *historyHarness) seedPersistRoom(t *testing.T) (roomID, subject string) {
|
||||
t.Helper()
|
||||
roomID = newULID()
|
||||
subject = "unibus.room." + roomID
|
||||
aliceEp := frame.EndpointID(h.alice.SignPub)
|
||||
info := RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: aliceEp, Encrypt: true, Persist: true}
|
||||
if err := h.store.CreateRoom(info, h.alice.SignPub, h.alice.KexPub, []byte("alice-sealed")); err != nil {
|
||||
t.Fatalf("seed room: %v", err)
|
||||
}
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
bobM := Member{Endpoint: bobEp, Role: RoleMember, SignPub: h.bob.SignPub, KexPub: h.bob.KexPub}
|
||||
if err := h.store.AddMember(roomID, bobM, 0, []byte("bob-sealed")); err != nil {
|
||||
t.Fatalf("add member bob: %v", err)
|
||||
}
|
||||
return roomID, subject
|
||||
}
|
||||
|
||||
// makeFrame builds a marshaled PUB frame whose payload identifies it, so a test can
|
||||
// assert exact bytes and ordering after a round trip through the stream + endpoint.
|
||||
func makeFrame(t *testing.T, subject, sender string, i int) []byte {
|
||||
t.Helper()
|
||||
f := frame.Frame{
|
||||
Type: frame.PUB,
|
||||
Subject: subject,
|
||||
Sender: sender,
|
||||
MsgID: fmt.Sprintf("msg-%02d", i),
|
||||
Payload: []byte(fmt.Sprintf("ciphertext-%02d", i)),
|
||||
}
|
||||
b, err := f.Marshal()
|
||||
if err != nil {
|
||||
t.Fatalf("marshal frame %d: %v", i, err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// getHistory signs a GET /rooms/{id}/history request as id and returns the status,
|
||||
// the raw body, and the decoded envelope. query is the raw query string (e.g.
|
||||
// "limit=2") or "". The signed path includes the query because the server verifies
|
||||
// the signature over r.URL.RequestURI(), which carries it.
|
||||
func (h *historyHarness) getHistory(t *testing.T, id cs.Identity, roomID, query string, n int) (int, string, historyResp) {
|
||||
t.Helper()
|
||||
path := "/rooms/" + roomID + "/history"
|
||||
if query != "" {
|
||||
path += "?" + query
|
||||
}
|
||||
req := signedReq(t, h.ts.URL, "GET", path, nil, id, time.Now().Unix(), nonceN(n))
|
||||
code, body := do(t, req)
|
||||
var out historyResp
|
||||
if code == 200 {
|
||||
if err := json.Unmarshal([]byte(body), &out); err != nil {
|
||||
t.Fatalf("decode history: %v (%s)", err, body)
|
||||
}
|
||||
}
|
||||
return code, body, out
|
||||
}
|
||||
|
||||
// TestCreateRoomEnsuresStream verifies handleCreateRoom creates the durable stream
|
||||
// for a persisted room before responding, so capture starts at room creation.
|
||||
func TestCreateRoomEnsuresStream(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
aliceEp := frame.EndpointID(h.alice.SignPub)
|
||||
reqBody := createRoomReq{
|
||||
Subject: "unibus.room.created",
|
||||
Policy: policyJSON{Encrypt: true, Persist: true},
|
||||
Owner: endpointJSON{Endpoint: aliceEp, SignPub: h.alice.SignPub, KexPub: h.alice.KexPub},
|
||||
SealedKeySelf: []byte("alice-sealed"),
|
||||
}
|
||||
body, _ := json.Marshal(reqBody)
|
||||
req := signedReq(t, h.ts.URL, "POST", "/rooms", body, h.alice, time.Now().Unix(), nonceN(1))
|
||||
code, respBody := do(t, req)
|
||||
if code != 201 {
|
||||
t.Fatalf("create room: want 201, got %d (%s)", code, respBody)
|
||||
}
|
||||
var cr createRoomResp
|
||||
if err := json.Unmarshal([]byte(respBody), &cr); err != nil {
|
||||
t.Fatalf("decode create resp: %v (%s)", err, respBody)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
if _, err := h.js.Stream(ctx, roomStreamName(cr.RoomID)); err != nil {
|
||||
t.Fatalf("stream for created persist room should exist: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryGolden is the golden path: three frames published to a persisted
|
||||
// room's stream come back from the endpoint base64-encoded, in chronological order,
|
||||
// and decode to the exact frames that were published.
|
||||
func TestRoomHistoryGolden(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, subject := h.seedPersistRoom(t)
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
|
||||
t.Fatalf("ensure stream: %v", err)
|
||||
}
|
||||
want := make([][]byte, 3)
|
||||
for i := 0; i < 3; i++ {
|
||||
want[i] = makeFrame(t, subject, bobEp, i)
|
||||
// js.Publish waits for the stream ack, so the message is durably stored before
|
||||
// the next iteration — no sleeps, deterministic ordering.
|
||||
if _, err := h.js.Publish(ctx, subject, want[i]); err != nil {
|
||||
t.Fatalf("publish %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 10)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if len(hr.Messages) != 3 {
|
||||
t.Fatalf("want 3 messages, got %d (%s)", len(hr.Messages), raw)
|
||||
}
|
||||
for i, m := range hr.Messages {
|
||||
decoded, err := base64.StdEncoding.DecodeString(m)
|
||||
if err != nil {
|
||||
t.Fatalf("message %d not valid base64: %v", i, err)
|
||||
}
|
||||
if string(decoded) != string(want[i]) {
|
||||
t.Fatalf("message %d bytes mismatch (order or content)", i)
|
||||
}
|
||||
f, err := frame.Unmarshal(decoded)
|
||||
if err != nil {
|
||||
t.Fatalf("message %d does not decode to a frame: %v", i, err)
|
||||
}
|
||||
if f.MsgID != fmt.Sprintf("msg-%02d", i) {
|
||||
t.Fatalf("message %d: want MsgID msg-%02d, got %q", i, i, f.MsgID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryCapturesCoreNATSPublish proves the central fix: a message
|
||||
// published over PLAIN core NATS (as the JetStream-less browser client uniweb does)
|
||||
// is captured by the server-owned stream and served by the endpoint. Without the
|
||||
// server ensuring the stream, this message would be captured nowhere.
|
||||
func TestRoomHistoryCapturesCoreNATSPublish(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, subject := h.seedPersistRoom(t)
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
|
||||
t.Fatalf("ensure stream: %v", err)
|
||||
}
|
||||
sent := makeFrame(t, subject, bobEp, 7)
|
||||
if err := h.nc.Publish(subject, sent); err != nil {
|
||||
t.Fatalf("core publish: %v", err)
|
||||
}
|
||||
if err := h.nc.Flush(); err != nil {
|
||||
t.Fatalf("flush: %v", err)
|
||||
}
|
||||
// Core NATS publish has no stream ack; poll the stream until the message lands.
|
||||
h.waitMsgs(t, roomID, 1)
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 11)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if len(hr.Messages) != 1 {
|
||||
t.Fatalf("want 1 captured message, got %d (%s)", len(hr.Messages), raw)
|
||||
}
|
||||
decoded, err := base64.StdEncoding.DecodeString(hr.Messages[0])
|
||||
if err != nil || string(decoded) != string(sent) {
|
||||
t.Fatalf("captured core-NATS message round-trip mismatch (err=%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryLimit verifies ?limit caps the response to the most recent N
|
||||
// messages, oldest→newest within the window.
|
||||
func TestRoomHistoryLimit(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, subject := h.seedPersistRoom(t)
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
|
||||
t.Fatalf("ensure stream: %v", err)
|
||||
}
|
||||
for i := 0; i < 5; i++ {
|
||||
if _, err := h.js.Publish(ctx, subject, makeFrame(t, subject, bobEp, i)); err != nil {
|
||||
t.Fatalf("publish %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "limit=2", 12)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if len(hr.Messages) != 2 {
|
||||
t.Fatalf("limit=2 over 5 messages: want 2, got %d", len(hr.Messages))
|
||||
}
|
||||
// The window is the last two messages (indices 3 and 4), in order.
|
||||
for off, m := range hr.Messages {
|
||||
decoded, _ := base64.StdEncoding.DecodeString(m)
|
||||
f, err := frame.Unmarshal(decoded)
|
||||
if err != nil {
|
||||
t.Fatalf("limited message %d does not decode: %v", off, err)
|
||||
}
|
||||
want := fmt.Sprintf("msg-%02d", off+3)
|
||||
if f.MsgID != want {
|
||||
t.Fatalf("limited message %d: want MsgID %s, got %q", off, want, f.MsgID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryEmptyRoom verifies a persisted room with no messages returns an
|
||||
// empty (non-null) array, lazily ensuring the stream on the way.
|
||||
func TestRoomHistoryEmptyRoom(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, _ := h.seedPersistRoom(t)
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 13)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if hr.Messages == nil {
|
||||
t.Fatalf("empty room must return [] not null (%s)", raw)
|
||||
}
|
||||
if len(hr.Messages) != 0 {
|
||||
t.Fatalf("empty room: want 0 messages, got %d", len(hr.Messages))
|
||||
}
|
||||
// The lazy ensure should have created the stream even though no message exists.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
if _, err := h.js.Stream(ctx, roomStreamName(roomID)); err != nil {
|
||||
t.Fatalf("lazy ensure should have created the stream: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryUnauthenticated verifies an unsigned request is rejected with 401
|
||||
// under enforce, before the handler runs.
|
||||
func TestRoomHistoryUnauthenticated(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, _ := h.seedPersistRoom(t)
|
||||
// No signing headers: plain GET against the enforce-mode control plane.
|
||||
req, err := http.NewRequest("GET", h.ts.URL+"/rooms/"+roomID+"/history", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("new request: %v", err)
|
||||
}
|
||||
code, body := do(t, req)
|
||||
if code != 401 {
|
||||
t.Fatalf("unauthenticated history: want 401, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryNonMember verifies a registered user who is NOT a member of the
|
||||
// room is rejected with 403.
|
||||
func TestRoomHistoryNonMember(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, _ := h.seedPersistRoom(t)
|
||||
code, body, _ := h.getHistory(t, h.carol, roomID, "", 14)
|
||||
if code != 403 {
|
||||
t.Fatalf("non-member history: want 403, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryRoomNotFound verifies a request for a non-existent room is a 404,
|
||||
// distinct from the 403 a non-member of an existing room gets.
|
||||
func TestRoomHistoryRoomNotFound(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
code, body, _ := h.getHistory(t, h.alice, newULID(), "", 15)
|
||||
if code != 404 {
|
||||
t.Fatalf("missing room history: want 404, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// waitMsgs polls the room's stream until it holds at least want messages or a short
|
||||
// deadline elapses, so a core-NATS publish (which carries no stream ack) is observed
|
||||
// deterministically without a fixed sleep.
|
||||
func (h *historyHarness) waitMsgs(t *testing.T, roomID string, want uint64) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(3 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
st, err := h.js.Stream(ctx, roomStreamName(roomID))
|
||||
if err == nil {
|
||||
si, ierr := st.Info(ctx)
|
||||
if ierr == nil && si.State.Msgs >= want {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("stream for room %s never reached %d message(s)", roomID, want)
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package membership
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -108,6 +109,21 @@ type Server struct {
|
||||
// the RemoteAddr-only behavior that predates the flag. Set by the command via
|
||||
// SetTrustedProxies. See clientIP.
|
||||
trustedProxies trustedProxyMatcher
|
||||
|
||||
// js is the privileged JetStream context the server uses to own the durable
|
||||
// per-room streams of persisted rooms: it ensures a room's stream on creation
|
||||
// so the room's subject is captured from the first message — even from a
|
||||
// JetStream-less browser client (uniweb) that speaks only core NATS — and reads
|
||||
// it back for GET /rooms/{id}/history. It is wired by the command via
|
||||
// SetJetStream whenever a JetStream-capable data plane is available (always for
|
||||
// the embedded server). nil leaves history empty and stream-ensure a no-op,
|
||||
// preserving the pre-feature behavior for a deployment without JetStream.
|
||||
js jetstream.JetStream
|
||||
// streamReplicas is the replication factor for the room streams the server
|
||||
// creates, matched to the cluster's control-plane (KV) replication — 1 for a
|
||||
// standalone node, up to 3 in an HA cluster — so a persisted room's history is
|
||||
// as available as its metadata. Used only when js != nil. See SetJetStream.
|
||||
streamReplicas int
|
||||
}
|
||||
|
||||
// Posture describes the security posture a membershipd node runs with. It is
|
||||
@@ -142,6 +158,19 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
// SetJetStream wires the privileged JetStream context (and the room-stream
|
||||
// replication factor) the server uses to ensure and read the durable streams of
|
||||
// persisted rooms. replicas below 1 is clamped to 1. It must be called once at
|
||||
// startup, before the server begins serving; leaving it unset keeps history empty
|
||||
// and stream-ensure a no-op, the behavior for a deployment without JetStream.
|
||||
func (s *Server) SetJetStream(js jetstream.JetStream, replicas int) {
|
||||
if replicas < 1 {
|
||||
replicas = 1
|
||||
}
|
||||
s.js = js
|
||||
s.streamReplicas = replicas
|
||||
}
|
||||
|
||||
// UseReplicatedNonces switches the server's anti-replay store from the
|
||||
// per-process in-memory cache to a JetStream KV bucket shared across the cluster
|
||||
// (issue 0003e). It MUST be called on every node of a multi-node deployment:
|
||||
@@ -402,6 +431,13 @@ func (s *Server) routes() {
|
||||
s.mux.HandleFunc("POST /rooms/{id}/invite", s.handleInvite)
|
||||
s.mux.HandleFunc("GET /rooms/{id}/key", s.handleGetKey)
|
||||
s.mux.HandleFunc("GET /rooms/{id}/members", s.handleListMembers)
|
||||
// Durable message history for a persisted room, read server-side from the room's
|
||||
// JetStream stream so a client without JetStream (the browser client uniweb) can
|
||||
// load the backlog over plain HTTP. Member-only, like /key and /members.
|
||||
// Registered without the /api prefix like every other control-plane route: Caddy
|
||||
// strips /api via handle_path /api/* before forwarding, so the SPA's
|
||||
// GET /api/rooms/{id}/history arrives here as GET /rooms/{id}/history.
|
||||
s.mux.HandleFunc("GET /rooms/{id}/history", s.handleRoomHistory)
|
||||
s.mux.HandleFunc("GET /members/{endpoint}/rooms", s.handleListMemberRooms)
|
||||
s.mux.HandleFunc("POST /rooms/{id}/rekey", s.handleRekey)
|
||||
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
|
||||
@@ -414,6 +450,15 @@ func (s *Server) routes() {
|
||||
s.mux.HandleFunc("GET /users", s.handleListUsers)
|
||||
s.mux.HandleFunc("POST /users", s.handleAddUser)
|
||||
s.mux.HandleFunc("POST /users/{signpub}/revoke", s.handleRevokeUser)
|
||||
// Member directory — any authenticated bus user (member or admin) may map an
|
||||
// endpoint id back to its human handle, so clients can render readable sender
|
||||
// names instead of raw endpoint hashes. Unlike /users it is NOT admin-only and
|
||||
// returns only active users; under enforce the auth middleware already rejects
|
||||
// an unauthenticated caller with 401 before this handler runs (uniweb/0002).
|
||||
// Registered without the /api prefix like every other control-plane route:
|
||||
// Caddy strips /api via handle_path /api/* before forwarding to membershipd,
|
||||
// so the SPA's GET /api/directory arrives here as GET /directory.
|
||||
s.mux.HandleFunc("GET /directory", s.handleDirectory)
|
||||
}
|
||||
|
||||
// ---- wire types -----------------------------------------------------------
|
||||
@@ -512,6 +557,24 @@ type addUserReq struct {
|
||||
Role string `json:"role"`
|
||||
}
|
||||
|
||||
// directoryMember is one entry of the GET /directory response: enough for a
|
||||
// client to map a message's endpoint id (which the bus stamps on every frame)
|
||||
// back to a readable handle. endpoint is derived server-side from sign_pub with
|
||||
// the SAME construction the bus uses (frame.EndpointID = base64url(sha256(signPub)),
|
||||
// unpadded), so it matches the sender id a client already has byte-for-byte.
|
||||
type directoryMember struct {
|
||||
SignPub string `json:"sign_pub"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Handle string `json:"handle"`
|
||||
Role string `json:"role"`
|
||||
}
|
||||
|
||||
// directoryResp is the GET /directory response envelope. The members key is a
|
||||
// stable contract consumed by the browser client; do not rename it.
|
||||
type directoryResp struct {
|
||||
Members []directoryMember `json:"members"`
|
||||
}
|
||||
|
||||
// ---- helpers --------------------------------------------------------------
|
||||
|
||||
func writeJSON(w http.ResponseWriter, code int, v any) {
|
||||
@@ -604,6 +667,21 @@ func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
|
||||
SignMsgs: req.Policy.SignMsgs,
|
||||
OwnerEndpoint: req.Owner.Endpoint,
|
||||
}
|
||||
// Own the durable stream for a persisted room (issue room-history): ensure it
|
||||
// BEFORE the room row is written so the subject is captured from the very first
|
||||
// message whoever publishes it — a Go client OR a JetStream-less browser client.
|
||||
// Done first so a stream failure aborts cleanly with no orphan room row (the
|
||||
// rare orphan empty stream it can leave is harmless and idempotently reused).
|
||||
// Skipped when no JetStream is wired: the room still works, just without history.
|
||||
if info.Persist && s.js != nil {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout)
|
||||
err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas)
|
||||
cancel()
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
@@ -857,6 +935,41 @@ func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, out)
|
||||
}
|
||||
|
||||
// handleDirectory returns the active bus user directory so a client can resolve a
|
||||
// sender's endpoint id to a readable handle. Unlike handleListUsers it is NOT
|
||||
// admin-only: every authenticated bus user may read it (the auth middleware has
|
||||
// already verified the caller is an active user under enforce, and rejected an
|
||||
// unauthenticated one with 401). Only active users are listed, and each endpoint
|
||||
// is computed server-side from the user's sign_pub with frame.EndpointID — the
|
||||
// exact derivation the bus stamps on every frame, so the returned endpoint matches
|
||||
// the sender id a client already holds. A user with a malformed sign_pub (which
|
||||
// the add path rejects, so this is defensive) is skipped rather than failing the
|
||||
// whole listing.
|
||||
func (s *Server) handleDirectory(w http.ResponseWriter, r *http.Request) {
|
||||
users, err := s.store.ListUsers()
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
out := make([]directoryMember, 0, len(users))
|
||||
for _, u := range users {
|
||||
if u.Status != StatusActive {
|
||||
continue
|
||||
}
|
||||
signPub, err := hex.DecodeString(u.SignPub)
|
||||
if err != nil || len(signPub) != 32 {
|
||||
continue
|
||||
}
|
||||
out = append(out, directoryMember{
|
||||
SignPub: u.SignPub,
|
||||
Endpoint: frame.EndpointID(signPub),
|
||||
Handle: u.Handle,
|
||||
Role: u.Role,
|
||||
})
|
||||
}
|
||||
writeJSON(w, http.StatusOK, directoryResp{Members: out})
|
||||
}
|
||||
|
||||
// handleAddUser registers a new bus user from an admin-supplied Ed25519 signing
|
||||
// key. It mirrors the `membershipd user add` CLI: the key must be 64-hex, the
|
||||
// role must be admin or member (empty defaults to member), and re-adding an
|
||||
|
||||
Reference in New Issue
Block a user