diff --git a/app.md b/app.md index 9f4daf4e..b48d677f 100644 --- a/app.md +++ b/app.md @@ -2,7 +2,7 @@ name: unibus lang: go domain: infra -version: 0.15.1 +version: 0.16.0 description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo." tags: [service, messaging, nats, e2e] uses_functions: @@ -225,6 +225,20 @@ agent..{in,out} inbox/outbox de agente LLM (agent.scout.in) ## Capability growth log +- v0.16.0 (2026-06-14) — feat: el server asegura el stream JetStream de las rooms + persist + `GET /rooms/{id}/history` para que clientes sin JetStream (uniweb) lean + el histórico. (1) `handleCreateRoom` crea (idempotente, `CreateOrUpdateStream`) el + stream durable `UNIBUS_` de una room persist ANTES de responder, así su + subject se captura desde el minuto cero venga el mensaje de un cliente Go o de un + cliente browser que solo habla core NATS (antes el stream lo creaba solo el cliente + Go, así que los mensajes de uniweb se perdían). (2) Nuevo endpoint member-only + `GET /rooms/{id}/history?limit=N` (default 200, cap 1000): lee el stream + server-side y devuelve `{messages:[]}` en orden + oldest→newest; el server jamás descifra (relay del ciphertext E2E). Backfill de + rooms persist existentes: lazy-ensure del stream en el endpoint (empiezan a + capturar desde ahora; los mensajes previos al stream no son recuperables). El + control plane abre ahora su propio contexto JetStream también en single-node + embebido. Todo aditivo; build/vet/test verdes. - v0.15.1 (2026-06-14) — fix: la ruta del directorio se registraba con prefijo /api y Caddy lo stripeaba (404 en prod); corregida a /directory. - v0.15.0 (2026-06-14) — nombres legibles + provisioning de bots de un comando. (1) Nuevo `GET /api/directory` en el control-plane: cualquier usuario activo del diff --git a/cmd/membershipd/main.go b/cmd/membershipd/main.go index 5670677f..31bbf5be 100644 --- a/cmd/membershipd/main.go +++ b/cmd/membershipd/main.go @@ -150,6 +150,16 @@ func main() { decentralized := *storeBackend == "kv" needJS := clustered || decentralized enforce := authMode == membership.AuthEnforce + embedded := *natsURL == "" + // The control plane also needs a privileged JetStream client to OWN the durable + // per-room streams of persisted rooms (ensure the stream on room creation so the + // subject is captured from the first message — even from a JetStream-less browser + // client — and read it back for GET /rooms/{id}/history). The embedded NATS + // always ships JetStream, so open the client whenever we run embedded, even for a + // standalone SQLite node. For an EXTERNAL NATS we only reach for JetStream when a + // cluster/KV feature explicitly requires it (unchanged), so an operator-managed + // external deployment without those features behaves exactly as before. + openJS := needJS || embedded // Internal service identity (issue 0006a): when the embedded data plane enforces // auth, membershipd must still connect to its OWN server to manage JetStream. @@ -159,7 +169,7 @@ func main() { // the server is embedded), so a standalone or non-enforce node is unchanged. var internalID cs.Identity var internalPubHex string - if needJS && enforce && *natsURL == "" { + if openJS && enforce && embedded { if *internalIDFile != "" { // Persisted identity: load it, generating + writing it (0600) on first // start. A stable internal key is what `user add --store kv` presents to @@ -316,9 +326,9 @@ func main() { // only client that can connect in this window (the holder still denies everyone // else; the internal identity bypasses the store). var js jetstream.JetStream - if needJS { + if openJS { var internalNC *nats.Conn - if *natsURL == "" { + if embedded { internalNC, js, err = connectInternalJS(ns, internalID, enforce) } else { internalNC, js, err = connectExternalJS(natsClientURL, *caFile) @@ -340,6 +350,14 @@ func main() { } srv := membership.NewServer(store, blobs, authMode) + // Wire the privileged JetStream context so the control plane owns persisted + // rooms' durable streams (ensure on create + serve GET /rooms/{id}/history). The + // stream replication factor matches the control-plane KV replication so a room's + // history is as available as its metadata. js is nil only for an external NATS + // without a cluster/KV feature, where history degrades to empty (see openJS). + if js != nil { + srv.SetJetStream(js, *kvReplicas) + } // On a public (non-loopback) bind, disable cleartext rooms: the embedded NATS // has no per-subject ACL, so cleartext content would be readable by any // registered peer. Forcing E2E keeps message content confidential regardless diff --git a/pkg/membership/history.go b/pkg/membership/history.go new file mode 100644 index 00000000..093c6051 --- /dev/null +++ b/pkg/membership/history.go @@ -0,0 +1,198 @@ +package membership + +// Server-side durable history for persisted rooms (room.ModeMatrix / Persist). +// +// A persisted room's messages ride a file-backed JetStream stream named +// "UNIBUS_" (roomStreamName, identical to pkg/client.streamName). Until +// now that stream was created only by the Go client's first publish/subscribe; a +// client that speaks only core NATS (the browser client uniweb, which has no +// JetStream) therefore never created it, so its messages were captured nowhere and +// vanished on reload. This file moves stream ownership to the server: the control +// plane ensures the stream when a persisted room is created (so capture starts at +// minute zero whoever publishes) and exposes GET /rooms/{id}/history so a +// JetStream-less client can read the backlog over plain HTTP. +// +// The server never decrypts: each stored message is the E2E frame exactly as it +// was published (ciphertext for an encrypted room). The history endpoint returns +// those bytes verbatim (base64-encoded for JSON safety), so end-to-end encryption +// is preserved — the server only relays the bytes it already holds. + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +const ( + // defaultHistoryLimit is the number of most-recent messages returned when the + // caller does not specify ?limit. + defaultHistoryLimit = 200 + // maxHistoryLimit is the hard ceiling on a single history response, so a caller + // cannot ask the server to buffer an unbounded backlog into one JSON payload. + maxHistoryLimit = 1000 + // historyOpTimeout bounds each JetStream operation the history path performs + // (stream lookup/ensure, info, per-message get) so a stalled data plane cannot + // hang a control-plane request indefinitely. + historyOpTimeout = 5 * time.Second +) + +// historyResp is the GET /rooms/{id}/history response envelope. messages is the +// ordered (oldest→newest) list of the room's most recent frames, each the base64 +// (standard encoding) of the marshaled, still-encrypted frame as it was published. +// The key is a stable contract consumed by the browser client; do not rename it. +type historyResp struct { + Messages []string `json:"messages"` +} + +// streamConfigForRoom builds the JetStream stream config for a persisted room. +// +// It MUST stay byte-for-byte compatible with pkg/client/persist.go's ensureStream +// (the original owner of this format): same name derivation (roomStreamName == +// pkg/client.streamName), same single subject, LimitsPolicy retention, file +// storage. pkg/client is the source of truth for the format; we copy it here +// rather than import it because pkg/client imports pkg/membership and importing it +// back would be a cycle. The only addition is Replicas, matched to the cluster's +// control-plane replication so a persisted room's history is as available as its +// metadata (1 standalone, up to 3 in an HA cluster). CreateOrUpdateStream treats a +// matching config as a no-op, so the client's later ensureStream is harmless. +func streamConfigForRoom(roomID, subject string, replicas int) jetstream.StreamConfig { + if replicas < 1 { + replicas = 1 + } + return jetstream.StreamConfig{ + Name: roomStreamName(roomID), + Subjects: []string{subject}, + Retention: jetstream.LimitsPolicy, + Storage: jetstream.FileStorage, + Replicas: replicas, + } +} + +// ensureRoomStream idempotently creates (or no-ops on) the durable stream that +// captures a persisted room's subject. CreateOrUpdateStream returns the existing +// stream unchanged when the config matches, so this is safe to call on every room +// creation and on every history read (lazy backfill of pre-existing rooms). +func ensureRoomStream(ctx context.Context, js jetstream.JetStream, roomID, subject string, replicas int) error { + if _, err := js.CreateOrUpdateStream(ctx, streamConfigForRoom(roomID, subject, replicas)); err != nil { + return fmt.Errorf("membership: ensure stream for room %s: %w", roomID, err) + } + return nil +} + +// readRoomHistory returns the last `limit` messages of a room's durable stream in +// chronological order (oldest→newest), each base64-encoded (standard encoding). A +// stream that does not exist yet, or that holds no messages, yields an empty slice +// (not an error): a freshly created or never-used room simply has no history. It +// reads by sequence via the stream MSG.GET API rather than binding a consumer, so +// it has no side effects on any peer's durable ack position. A gap in the sequence +// range (a purged/deleted message) is skipped rather than failing the whole read, +// so the result length is bounded by `limit` but may be smaller. +func readRoomHistory(ctx context.Context, js jetstream.JetStream, roomID string, limit int) ([]string, error) { + out := []string{} + stream, err := js.Stream(ctx, roomStreamName(roomID)) + if err != nil { + if errors.Is(err, jetstream.ErrStreamNotFound) { + return out, nil + } + return nil, fmt.Errorf("membership: lookup stream for room %s: %w", roomID, err) + } + si, err := stream.Info(ctx) + if err != nil { + return nil, fmt.Errorf("membership: stream info for room %s: %w", roomID, err) + } + first, last := si.State.FirstSeq, si.State.LastSeq + if si.State.Msgs == 0 || last == 0 { + return out, nil + } + // Window of the last `limit` sequence numbers, clamped to the first stored seq. + // last >= limit guards the unsigned subtraction against underflow. + start := first + if last >= uint64(limit) { + if cand := last - uint64(limit) + 1; cand > start { + start = cand + } + } + for seq := start; seq <= last; seq++ { + raw, err := stream.GetMsg(ctx, seq) + if err != nil { + // A purged/deleted sequence leaves a gap; skip it rather than abort. + continue + } + out = append(out, base64.StdEncoding.EncodeToString(raw.Data)) + } + return out, nil +} + +// parseHistoryLimit reads the ?limit query value, applying the default when it is +// absent and clamping out-of-range / malformed values to [1, maxHistoryLimit]. +func parseHistoryLimit(q string) int { + if q == "" { + return defaultHistoryLimit + } + n, err := strconv.Atoi(q) + if err != nil || n <= 0 { + return defaultHistoryLimit + } + if n > maxHistoryLimit { + return maxHistoryLimit + } + return n +} + +// handleRoomHistory serves GET /rooms/{id}/history: the last ?limit (default 200, +// hard cap 1000) messages of a persisted room, oldest→newest, each the base64 of +// the still-encrypted frame as published. The server never decrypts — it relays +// the ciphertext bytes the stream already holds, preserving E2E. +// +// Authorization mirrors the sibling room reads (/key, /members): the request must +// be a member of the room (requireMember; allowed under AuthOff/dev where no signer +// is verified). A missing room is 404; a non-member is 403; an unsigned request +// under enforce is rejected with 401 by the auth middleware before this runs. +// +// For a persisted room the stream is ensured first (lazy backfill): a room created +// before the server managed streams begins capturing from now on. Messages sent +// before the stream existed were never captured and are unrecoverable — only +// messages from stream creation onward appear here. +func (s *Server) handleRoomHistory(w http.ResponseWriter, r *http.Request) { + roomID := r.PathValue("id") + // Existence first so a missing room is a clean 404 (the documented contract), + // distinct from a 403 for an existing room the caller is not a member of. + info, err := s.store.GetRoom(roomID) + if err != nil { + writeErr(w, http.StatusNotFound, "room not found") + return + } + if _, ok := s.requireMember(w, r, roomID); !ok { + return + } + limit := parseHistoryLimit(r.URL.Query().Get("limit")) + + // No JetStream wired (e.g. an external-NATS deployment without a cluster/KV + // feature): there is no durable stream to read, so report an empty history + // rather than 500 — a client degrades to "no backlog" gracefully. + if s.js == nil { + writeJSON(w, http.StatusOK, historyResp{Messages: []string{}}) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout) + defer cancel() + if info.Persist { + if err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas); err != nil { + writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) + return + } + } + msgs, err := readRoomHistory(ctx, s.js, roomID, limit) + if err != nil { + writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) + return + } + writeJSON(w, http.StatusOK, historyResp{Messages: msgs}) +} diff --git a/pkg/membership/history_test.go b/pkg/membership/history_test.go new file mode 100644 index 00000000..feea1c5e --- /dev/null +++ b/pkg/membership/history_test.go @@ -0,0 +1,400 @@ +package membership + +import ( + "context" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// historyHarness is an enforce-mode control plane wired to a real embedded NATS +// JetStream, so the history path exercises the production code: the server ensures +// and reads actual durable streams. alice is a seeded admin (and any room's owner), +// bob is a registered user added as a room member, and carol is a registered user +// that is NOT a member of the test room (to exercise the 403 path). +type historyHarness struct { + ts *httptest.Server + store Store + js jetstream.JetStream + nc *nats.Conn + alice cs.Identity // admin + room owner + bob cs.Identity // room member + carol cs.Identity // registered, non-member +} + +func newHistoryHarness(t *testing.T) *historyHarness { + t.Helper() + dir := t.TempDir() + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: filepath.Join(dir, "jetstream"), + Host: "127.0.0.1", + Port: kvFreePort(t), + }) + if err != nil { + t.Fatalf("embedded nats: %v", err) + } + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + ns.Shutdown() + t.Fatalf("nats connect: %v", err) + } + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + ns.Shutdown() + t.Fatalf("jetstream: %v", err) + } + store, err := Open(filepath.Join(dir, "unibus.db")) + if err != nil { + nc.Close() + ns.Shutdown() + t.Fatalf("open store: %v", err) + } + blobs, err := blobstore.New(filepath.Join(dir, "blobs")) + if err != nil { + store.Close() + nc.Close() + ns.Shutdown() + t.Fatalf("open blobs: %v", err) + } + mustID := func(name string) cs.Identity { + id, err := cs.GenerateIdentity() + if err != nil { + t.Fatalf("identity %s: %v", name, err) + } + return id + } + alice, bob, carol := mustID("alice"), mustID("bob"), mustID("carol") + if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", RoleAdmin); err != nil { + t.Fatalf("seed admin: %v", err) + } + for _, u := range []struct { + id cs.Identity + handle string + }{{bob, "bob"}, {carol, "carol"}} { + if err := store.AddUser(hex.EncodeToString(u.id.SignPub), u.handle, RoleMember); err != nil { + t.Fatalf("register %s: %v", u.handle, err) + } + } + + srv := NewServer(store, blobs, AuthEnforce) + srv.SetJetStream(js, 1) + ts := httptest.NewServer(srv) + t.Cleanup(func() { + ts.Close() + store.Close() + nc.Close() + ns.Shutdown() + ns.WaitForShutdown() + }) + return &historyHarness{ts: ts, store: store, js: js, nc: nc, alice: alice, bob: bob, carol: carol} +} + +// seedPersistRoom creates a persisted (Matrix-policy) room directly in the store +// with alice as owner and bob as a member, returning its id and subject. It does +// NOT create the stream — that is left to the code under test (handleCreateRoom or +// the lazy ensure in the history endpoint), which is exactly what we want to verify. +func (h *historyHarness) seedPersistRoom(t *testing.T) (roomID, subject string) { + t.Helper() + roomID = newULID() + subject = "unibus.room." + roomID + aliceEp := frame.EndpointID(h.alice.SignPub) + info := RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: aliceEp, Encrypt: true, Persist: true} + if err := h.store.CreateRoom(info, h.alice.SignPub, h.alice.KexPub, []byte("alice-sealed")); err != nil { + t.Fatalf("seed room: %v", err) + } + bobEp := frame.EndpointID(h.bob.SignPub) + bobM := Member{Endpoint: bobEp, Role: RoleMember, SignPub: h.bob.SignPub, KexPub: h.bob.KexPub} + if err := h.store.AddMember(roomID, bobM, 0, []byte("bob-sealed")); err != nil { + t.Fatalf("add member bob: %v", err) + } + return roomID, subject +} + +// makeFrame builds a marshaled PUB frame whose payload identifies it, so a test can +// assert exact bytes and ordering after a round trip through the stream + endpoint. +func makeFrame(t *testing.T, subject, sender string, i int) []byte { + t.Helper() + f := frame.Frame{ + Type: frame.PUB, + Subject: subject, + Sender: sender, + MsgID: fmt.Sprintf("msg-%02d", i), + Payload: []byte(fmt.Sprintf("ciphertext-%02d", i)), + } + b, err := f.Marshal() + if err != nil { + t.Fatalf("marshal frame %d: %v", i, err) + } + return b +} + +// getHistory signs a GET /rooms/{id}/history request as id and returns the status, +// the raw body, and the decoded envelope. query is the raw query string (e.g. +// "limit=2") or "". The signed path includes the query because the server verifies +// the signature over r.URL.RequestURI(), which carries it. +func (h *historyHarness) getHistory(t *testing.T, id cs.Identity, roomID, query string, n int) (int, string, historyResp) { + t.Helper() + path := "/rooms/" + roomID + "/history" + if query != "" { + path += "?" + query + } + req := signedReq(t, h.ts.URL, "GET", path, nil, id, time.Now().Unix(), nonceN(n)) + code, body := do(t, req) + var out historyResp + if code == 200 { + if err := json.Unmarshal([]byte(body), &out); err != nil { + t.Fatalf("decode history: %v (%s)", err, body) + } + } + return code, body, out +} + +// TestCreateRoomEnsuresStream verifies handleCreateRoom creates the durable stream +// for a persisted room before responding, so capture starts at room creation. +func TestCreateRoomEnsuresStream(t *testing.T) { + h := newHistoryHarness(t) + aliceEp := frame.EndpointID(h.alice.SignPub) + reqBody := createRoomReq{ + Subject: "unibus.room.created", + Policy: policyJSON{Encrypt: true, Persist: true}, + Owner: endpointJSON{Endpoint: aliceEp, SignPub: h.alice.SignPub, KexPub: h.alice.KexPub}, + SealedKeySelf: []byte("alice-sealed"), + } + body, _ := json.Marshal(reqBody) + req := signedReq(t, h.ts.URL, "POST", "/rooms", body, h.alice, time.Now().Unix(), nonceN(1)) + code, respBody := do(t, req) + if code != 201 { + t.Fatalf("create room: want 201, got %d (%s)", code, respBody) + } + var cr createRoomResp + if err := json.Unmarshal([]byte(respBody), &cr); err != nil { + t.Fatalf("decode create resp: %v (%s)", err, respBody) + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if _, err := h.js.Stream(ctx, roomStreamName(cr.RoomID)); err != nil { + t.Fatalf("stream for created persist room should exist: %v", err) + } +} + +// TestRoomHistoryGolden is the golden path: three frames published to a persisted +// room's stream come back from the endpoint base64-encoded, in chronological order, +// and decode to the exact frames that were published. +func TestRoomHistoryGolden(t *testing.T) { + h := newHistoryHarness(t) + roomID, subject := h.seedPersistRoom(t) + bobEp := frame.EndpointID(h.bob.SignPub) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil { + t.Fatalf("ensure stream: %v", err) + } + want := make([][]byte, 3) + for i := 0; i < 3; i++ { + want[i] = makeFrame(t, subject, bobEp, i) + // js.Publish waits for the stream ack, so the message is durably stored before + // the next iteration — no sleeps, deterministic ordering. + if _, err := h.js.Publish(ctx, subject, want[i]); err != nil { + t.Fatalf("publish %d: %v", i, err) + } + } + + code, raw, hr := h.getHistory(t, h.bob, roomID, "", 10) + if code != 200 { + t.Fatalf("history: want 200, got %d (%s)", code, raw) + } + if len(hr.Messages) != 3 { + t.Fatalf("want 3 messages, got %d (%s)", len(hr.Messages), raw) + } + for i, m := range hr.Messages { + decoded, err := base64.StdEncoding.DecodeString(m) + if err != nil { + t.Fatalf("message %d not valid base64: %v", i, err) + } + if string(decoded) != string(want[i]) { + t.Fatalf("message %d bytes mismatch (order or content)", i) + } + f, err := frame.Unmarshal(decoded) + if err != nil { + t.Fatalf("message %d does not decode to a frame: %v", i, err) + } + if f.MsgID != fmt.Sprintf("msg-%02d", i) { + t.Fatalf("message %d: want MsgID msg-%02d, got %q", i, i, f.MsgID) + } + } +} + +// TestRoomHistoryCapturesCoreNATSPublish proves the central fix: a message +// published over PLAIN core NATS (as the JetStream-less browser client uniweb does) +// is captured by the server-owned stream and served by the endpoint. Without the +// server ensuring the stream, this message would be captured nowhere. +func TestRoomHistoryCapturesCoreNATSPublish(t *testing.T) { + h := newHistoryHarness(t) + roomID, subject := h.seedPersistRoom(t) + bobEp := frame.EndpointID(h.bob.SignPub) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil { + t.Fatalf("ensure stream: %v", err) + } + sent := makeFrame(t, subject, bobEp, 7) + if err := h.nc.Publish(subject, sent); err != nil { + t.Fatalf("core publish: %v", err) + } + if err := h.nc.Flush(); err != nil { + t.Fatalf("flush: %v", err) + } + // Core NATS publish has no stream ack; poll the stream until the message lands. + h.waitMsgs(t, roomID, 1) + + code, raw, hr := h.getHistory(t, h.bob, roomID, "", 11) + if code != 200 { + t.Fatalf("history: want 200, got %d (%s)", code, raw) + } + if len(hr.Messages) != 1 { + t.Fatalf("want 1 captured message, got %d (%s)", len(hr.Messages), raw) + } + decoded, err := base64.StdEncoding.DecodeString(hr.Messages[0]) + if err != nil || string(decoded) != string(sent) { + t.Fatalf("captured core-NATS message round-trip mismatch (err=%v)", err) + } +} + +// TestRoomHistoryLimit verifies ?limit caps the response to the most recent N +// messages, oldest→newest within the window. +func TestRoomHistoryLimit(t *testing.T) { + h := newHistoryHarness(t) + roomID, subject := h.seedPersistRoom(t) + bobEp := frame.EndpointID(h.bob.SignPub) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil { + t.Fatalf("ensure stream: %v", err) + } + for i := 0; i < 5; i++ { + if _, err := h.js.Publish(ctx, subject, makeFrame(t, subject, bobEp, i)); err != nil { + t.Fatalf("publish %d: %v", i, err) + } + } + + code, raw, hr := h.getHistory(t, h.bob, roomID, "limit=2", 12) + if code != 200 { + t.Fatalf("history: want 200, got %d (%s)", code, raw) + } + if len(hr.Messages) != 2 { + t.Fatalf("limit=2 over 5 messages: want 2, got %d", len(hr.Messages)) + } + // The window is the last two messages (indices 3 and 4), in order. + for off, m := range hr.Messages { + decoded, _ := base64.StdEncoding.DecodeString(m) + f, err := frame.Unmarshal(decoded) + if err != nil { + t.Fatalf("limited message %d does not decode: %v", off, err) + } + want := fmt.Sprintf("msg-%02d", off+3) + if f.MsgID != want { + t.Fatalf("limited message %d: want MsgID %s, got %q", off, want, f.MsgID) + } + } +} + +// TestRoomHistoryEmptyRoom verifies a persisted room with no messages returns an +// empty (non-null) array, lazily ensuring the stream on the way. +func TestRoomHistoryEmptyRoom(t *testing.T) { + h := newHistoryHarness(t) + roomID, _ := h.seedPersistRoom(t) + + code, raw, hr := h.getHistory(t, h.bob, roomID, "", 13) + if code != 200 { + t.Fatalf("history: want 200, got %d (%s)", code, raw) + } + if hr.Messages == nil { + t.Fatalf("empty room must return [] not null (%s)", raw) + } + if len(hr.Messages) != 0 { + t.Fatalf("empty room: want 0 messages, got %d", len(hr.Messages)) + } + // The lazy ensure should have created the stream even though no message exists. + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if _, err := h.js.Stream(ctx, roomStreamName(roomID)); err != nil { + t.Fatalf("lazy ensure should have created the stream: %v", err) + } +} + +// TestRoomHistoryUnauthenticated verifies an unsigned request is rejected with 401 +// under enforce, before the handler runs. +func TestRoomHistoryUnauthenticated(t *testing.T) { + h := newHistoryHarness(t) + roomID, _ := h.seedPersistRoom(t) + // No signing headers: plain GET against the enforce-mode control plane. + req, err := http.NewRequest("GET", h.ts.URL+"/rooms/"+roomID+"/history", nil) + if err != nil { + t.Fatalf("new request: %v", err) + } + code, body := do(t, req) + if code != 401 { + t.Fatalf("unauthenticated history: want 401, got %d (%s)", code, body) + } +} + +// TestRoomHistoryNonMember verifies a registered user who is NOT a member of the +// room is rejected with 403. +func TestRoomHistoryNonMember(t *testing.T) { + h := newHistoryHarness(t) + roomID, _ := h.seedPersistRoom(t) + code, body, _ := h.getHistory(t, h.carol, roomID, "", 14) + if code != 403 { + t.Fatalf("non-member history: want 403, got %d (%s)", code, body) + } +} + +// TestRoomHistoryRoomNotFound verifies a request for a non-existent room is a 404, +// distinct from the 403 a non-member of an existing room gets. +func TestRoomHistoryRoomNotFound(t *testing.T) { + h := newHistoryHarness(t) + code, body, _ := h.getHistory(t, h.alice, newULID(), "", 15) + if code != 404 { + t.Fatalf("missing room history: want 404, got %d (%s)", code, body) + } +} + +// waitMsgs polls the room's stream until it holds at least want messages or a short +// deadline elapses, so a core-NATS publish (which carries no stream ack) is observed +// deterministically without a fixed sleep. +func (h *historyHarness) waitMsgs(t *testing.T, roomID string, want uint64) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + st, err := h.js.Stream(ctx, roomStreamName(roomID)) + if err == nil { + si, ierr := st.Info(ctx) + if ierr == nil && si.State.Msgs >= want { + cancel() + return + } + } + cancel() + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("stream for room %s never reached %d message(s)", roomID, want) +} diff --git a/pkg/membership/server.go b/pkg/membership/server.go index 01a182f8..0f95d5ae 100644 --- a/pkg/membership/server.go +++ b/pkg/membership/server.go @@ -109,6 +109,21 @@ type Server struct { // the RemoteAddr-only behavior that predates the flag. Set by the command via // SetTrustedProxies. See clientIP. trustedProxies trustedProxyMatcher + + // js is the privileged JetStream context the server uses to own the durable + // per-room streams of persisted rooms: it ensures a room's stream on creation + // so the room's subject is captured from the first message — even from a + // JetStream-less browser client (uniweb) that speaks only core NATS — and reads + // it back for GET /rooms/{id}/history. It is wired by the command via + // SetJetStream whenever a JetStream-capable data plane is available (always for + // the embedded server). nil leaves history empty and stream-ensure a no-op, + // preserving the pre-feature behavior for a deployment without JetStream. + js jetstream.JetStream + // streamReplicas is the replication factor for the room streams the server + // creates, matched to the cluster's control-plane (KV) replication — 1 for a + // standalone node, up to 3 in an HA cluster — so a persisted room's history is + // as available as its metadata. Used only when js != nil. See SetJetStream. + streamReplicas int } // Posture describes the security posture a membershipd node runs with. It is @@ -143,6 +158,19 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server { return s } +// SetJetStream wires the privileged JetStream context (and the room-stream +// replication factor) the server uses to ensure and read the durable streams of +// persisted rooms. replicas below 1 is clamped to 1. It must be called once at +// startup, before the server begins serving; leaving it unset keeps history empty +// and stream-ensure a no-op, the behavior for a deployment without JetStream. +func (s *Server) SetJetStream(js jetstream.JetStream, replicas int) { + if replicas < 1 { + replicas = 1 + } + s.js = js + s.streamReplicas = replicas +} + // UseReplicatedNonces switches the server's anti-replay store from the // per-process in-memory cache to a JetStream KV bucket shared across the cluster // (issue 0003e). It MUST be called on every node of a multi-node deployment: @@ -403,6 +431,13 @@ func (s *Server) routes() { s.mux.HandleFunc("POST /rooms/{id}/invite", s.handleInvite) s.mux.HandleFunc("GET /rooms/{id}/key", s.handleGetKey) s.mux.HandleFunc("GET /rooms/{id}/members", s.handleListMembers) + // Durable message history for a persisted room, read server-side from the room's + // JetStream stream so a client without JetStream (the browser client uniweb) can + // load the backlog over plain HTTP. Member-only, like /key and /members. + // Registered without the /api prefix like every other control-plane route: Caddy + // strips /api via handle_path /api/* before forwarding, so the SPA's + // GET /api/rooms/{id}/history arrives here as GET /rooms/{id}/history. + s.mux.HandleFunc("GET /rooms/{id}/history", s.handleRoomHistory) s.mux.HandleFunc("GET /members/{endpoint}/rooms", s.handleListMemberRooms) s.mux.HandleFunc("POST /rooms/{id}/rekey", s.handleRekey) s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom) @@ -632,6 +667,21 @@ func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) { SignMsgs: req.Policy.SignMsgs, OwnerEndpoint: req.Owner.Endpoint, } + // Own the durable stream for a persisted room (issue room-history): ensure it + // BEFORE the room row is written so the subject is captured from the very first + // message whoever publishes it — a Go client OR a JetStream-less browser client. + // Done first so a stream failure aborts cleanly with no orphan room row (the + // rare orphan empty stream it can leave is harmless and idempotently reused). + // Skipped when no JetStream is wired: the room still works, just without history. + if info.Persist && s.js != nil { + ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout) + err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas) + cancel() + if err != nil { + writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) + return + } + } if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return