feat(membership): server owns persisted rooms' stream + GET /rooms/{id}/history
The durable JetStream stream of a persisted (ModeMatrix) room was created only
by the Go client's first publish/subscribe. A client that speaks only core NATS
(the browser client uniweb, which has no JetStream) therefore never created it,
so its messages were captured nowhere and lost on reload. Move stream ownership
to the control plane and expose the backlog over plain HTTP.
- handleCreateRoom ensures the room's stream (idempotent CreateOrUpdateStream)
BEFORE writing the room row, so the subject is captured from the first message
whoever publishes it. Done before the store write so a stream failure leaves no
orphan room. Skipped when no JetStream is wired (room still works, no history).
- New member-only GET /rooms/{id}/history?limit=N (default 200, hard cap 1000):
reads the stream server-side via the modern jetstream API (Stream.Info +
GetMsg by sequence, no consumer) and returns the last N frames oldest->newest
as {"messages":[<base64-std of the marshaled frame>]}. The server never
decrypts — it relays the E2E ciphertext bytes the stream already holds.
Existence is checked first (404), then membership (403); enforce rejects an
unsigned request with 401 before the handler runs.
- Lazy backfill: the history endpoint ensures the stream of a pre-existing
persisted room, so it starts capturing from now on. Messages sent before the
stream existed were never captured and are unrecoverable.
- The stream config (streamConfigForRoom) mirrors pkg/client/persist.go
byte-for-byte plus Replicas (matched to the control-plane KV replication). It
is copied rather than imported because pkg/client imports pkg/membership and
the reverse would be an import cycle; the source of truth is documented in a
comment.
- Server gains SetJetStream(js, replicas) to wire the privileged JetStream
context and the room-stream replication factor.
Tests (history_test.go): golden (3 frames round-trip in order, decodable),
core-NATS capture (the central fix), handleCreateRoom creates the stream, limit,
empty room ([] not null), 401 unsigned, 403 non-member, 404 missing room.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,198 @@
|
||||
package membership
|
||||
|
||||
// Server-side durable history for persisted rooms (room.ModeMatrix / Persist).
|
||||
//
|
||||
// A persisted room's messages ride a file-backed JetStream stream named
|
||||
// "UNIBUS_<roomID>" (roomStreamName, identical to pkg/client.streamName). Until
|
||||
// now that stream was created only by the Go client's first publish/subscribe; a
|
||||
// client that speaks only core NATS (the browser client uniweb, which has no
|
||||
// JetStream) therefore never created it, so its messages were captured nowhere and
|
||||
// vanished on reload. This file moves stream ownership to the server: the control
|
||||
// plane ensures the stream when a persisted room is created (so capture starts at
|
||||
// minute zero whoever publishes) and exposes GET /rooms/{id}/history so a
|
||||
// JetStream-less client can read the backlog over plain HTTP.
|
||||
//
|
||||
// The server never decrypts: each stored message is the E2E frame exactly as it
|
||||
// was published (ciphertext for an encrypted room). The history endpoint returns
|
||||
// those bytes verbatim (base64-encoded for JSON safety), so end-to-end encryption
|
||||
// is preserved — the server only relays the bytes it already holds.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
const (
|
||||
// defaultHistoryLimit is the number of most-recent messages returned when the
|
||||
// caller does not specify ?limit.
|
||||
defaultHistoryLimit = 200
|
||||
// maxHistoryLimit is the hard ceiling on a single history response, so a caller
|
||||
// cannot ask the server to buffer an unbounded backlog into one JSON payload.
|
||||
maxHistoryLimit = 1000
|
||||
// historyOpTimeout bounds each JetStream operation the history path performs
|
||||
// (stream lookup/ensure, info, per-message get) so a stalled data plane cannot
|
||||
// hang a control-plane request indefinitely.
|
||||
historyOpTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// historyResp is the GET /rooms/{id}/history response envelope. messages is the
|
||||
// ordered (oldest→newest) list of the room's most recent frames, each the base64
|
||||
// (standard encoding) of the marshaled, still-encrypted frame as it was published.
|
||||
// The key is a stable contract consumed by the browser client; do not rename it.
|
||||
type historyResp struct {
|
||||
Messages []string `json:"messages"`
|
||||
}
|
||||
|
||||
// streamConfigForRoom builds the JetStream stream config for a persisted room.
|
||||
//
|
||||
// It MUST stay byte-for-byte compatible with pkg/client/persist.go's ensureStream
|
||||
// (the original owner of this format): same name derivation (roomStreamName ==
|
||||
// pkg/client.streamName), same single subject, LimitsPolicy retention, file
|
||||
// storage. pkg/client is the source of truth for the format; we copy it here
|
||||
// rather than import it because pkg/client imports pkg/membership and importing it
|
||||
// back would be a cycle. The only addition is Replicas, matched to the cluster's
|
||||
// control-plane replication so a persisted room's history is as available as its
|
||||
// metadata (1 standalone, up to 3 in an HA cluster). CreateOrUpdateStream treats a
|
||||
// matching config as a no-op, so the client's later ensureStream is harmless.
|
||||
func streamConfigForRoom(roomID, subject string, replicas int) jetstream.StreamConfig {
|
||||
if replicas < 1 {
|
||||
replicas = 1
|
||||
}
|
||||
return jetstream.StreamConfig{
|
||||
Name: roomStreamName(roomID),
|
||||
Subjects: []string{subject},
|
||||
Retention: jetstream.LimitsPolicy,
|
||||
Storage: jetstream.FileStorage,
|
||||
Replicas: replicas,
|
||||
}
|
||||
}
|
||||
|
||||
// ensureRoomStream idempotently creates (or no-ops on) the durable stream that
|
||||
// captures a persisted room's subject. CreateOrUpdateStream returns the existing
|
||||
// stream unchanged when the config matches, so this is safe to call on every room
|
||||
// creation and on every history read (lazy backfill of pre-existing rooms).
|
||||
func ensureRoomStream(ctx context.Context, js jetstream.JetStream, roomID, subject string, replicas int) error {
|
||||
if _, err := js.CreateOrUpdateStream(ctx, streamConfigForRoom(roomID, subject, replicas)); err != nil {
|
||||
return fmt.Errorf("membership: ensure stream for room %s: %w", roomID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// readRoomHistory returns the last `limit` messages of a room's durable stream in
|
||||
// chronological order (oldest→newest), each base64-encoded (standard encoding). A
|
||||
// stream that does not exist yet, or that holds no messages, yields an empty slice
|
||||
// (not an error): a freshly created or never-used room simply has no history. It
|
||||
// reads by sequence via the stream MSG.GET API rather than binding a consumer, so
|
||||
// it has no side effects on any peer's durable ack position. A gap in the sequence
|
||||
// range (a purged/deleted message) is skipped rather than failing the whole read,
|
||||
// so the result length is bounded by `limit` but may be smaller.
|
||||
func readRoomHistory(ctx context.Context, js jetstream.JetStream, roomID string, limit int) ([]string, error) {
|
||||
out := []string{}
|
||||
stream, err := js.Stream(ctx, roomStreamName(roomID))
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrStreamNotFound) {
|
||||
return out, nil
|
||||
}
|
||||
return nil, fmt.Errorf("membership: lookup stream for room %s: %w", roomID, err)
|
||||
}
|
||||
si, err := stream.Info(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("membership: stream info for room %s: %w", roomID, err)
|
||||
}
|
||||
first, last := si.State.FirstSeq, si.State.LastSeq
|
||||
if si.State.Msgs == 0 || last == 0 {
|
||||
return out, nil
|
||||
}
|
||||
// Window of the last `limit` sequence numbers, clamped to the first stored seq.
|
||||
// last >= limit guards the unsigned subtraction against underflow.
|
||||
start := first
|
||||
if last >= uint64(limit) {
|
||||
if cand := last - uint64(limit) + 1; cand > start {
|
||||
start = cand
|
||||
}
|
||||
}
|
||||
for seq := start; seq <= last; seq++ {
|
||||
raw, err := stream.GetMsg(ctx, seq)
|
||||
if err != nil {
|
||||
// A purged/deleted sequence leaves a gap; skip it rather than abort.
|
||||
continue
|
||||
}
|
||||
out = append(out, base64.StdEncoding.EncodeToString(raw.Data))
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// parseHistoryLimit reads the ?limit query value, applying the default when it is
|
||||
// absent and clamping out-of-range / malformed values to [1, maxHistoryLimit].
|
||||
func parseHistoryLimit(q string) int {
|
||||
if q == "" {
|
||||
return defaultHistoryLimit
|
||||
}
|
||||
n, err := strconv.Atoi(q)
|
||||
if err != nil || n <= 0 {
|
||||
return defaultHistoryLimit
|
||||
}
|
||||
if n > maxHistoryLimit {
|
||||
return maxHistoryLimit
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// handleRoomHistory serves GET /rooms/{id}/history: the last ?limit (default 200,
|
||||
// hard cap 1000) messages of a persisted room, oldest→newest, each the base64 of
|
||||
// the still-encrypted frame as published. The server never decrypts — it relays
|
||||
// the ciphertext bytes the stream already holds, preserving E2E.
|
||||
//
|
||||
// Authorization mirrors the sibling room reads (/key, /members): the request must
|
||||
// be a member of the room (requireMember; allowed under AuthOff/dev where no signer
|
||||
// is verified). A missing room is 404; a non-member is 403; an unsigned request
|
||||
// under enforce is rejected with 401 by the auth middleware before this runs.
|
||||
//
|
||||
// For a persisted room the stream is ensured first (lazy backfill): a room created
|
||||
// before the server managed streams begins capturing from now on. Messages sent
|
||||
// before the stream existed were never captured and are unrecoverable — only
|
||||
// messages from stream creation onward appear here.
|
||||
func (s *Server) handleRoomHistory(w http.ResponseWriter, r *http.Request) {
|
||||
roomID := r.PathValue("id")
|
||||
// Existence first so a missing room is a clean 404 (the documented contract),
|
||||
// distinct from a 403 for an existing room the caller is not a member of.
|
||||
info, err := s.store.GetRoom(roomID)
|
||||
if err != nil {
|
||||
writeErr(w, http.StatusNotFound, "room not found")
|
||||
return
|
||||
}
|
||||
if _, ok := s.requireMember(w, r, roomID); !ok {
|
||||
return
|
||||
}
|
||||
limit := parseHistoryLimit(r.URL.Query().Get("limit"))
|
||||
|
||||
// No JetStream wired (e.g. an external-NATS deployment without a cluster/KV
|
||||
// feature): there is no durable stream to read, so report an empty history
|
||||
// rather than 500 — a client degrades to "no backlog" gracefully.
|
||||
if s.js == nil {
|
||||
writeJSON(w, http.StatusOK, historyResp{Messages: []string{}})
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout)
|
||||
defer cancel()
|
||||
if info.Persist {
|
||||
if err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas); err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
msgs, err := readRoomHistory(ctx, s.js, roomID, limit)
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, historyResp{Messages: msgs})
|
||||
}
|
||||
@@ -0,0 +1,400 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cs "fn-registry/functions/cybersecurity"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// historyHarness is an enforce-mode control plane wired to a real embedded NATS
|
||||
// JetStream, so the history path exercises the production code: the server ensures
|
||||
// and reads actual durable streams. alice is a seeded admin (and any room's owner),
|
||||
// bob is a registered user added as a room member, and carol is a registered user
|
||||
// that is NOT a member of the test room (to exercise the 403 path).
|
||||
type historyHarness struct {
|
||||
ts *httptest.Server
|
||||
store Store
|
||||
js jetstream.JetStream
|
||||
nc *nats.Conn
|
||||
alice cs.Identity // admin + room owner
|
||||
bob cs.Identity // room member
|
||||
carol cs.Identity // registered, non-member
|
||||
}
|
||||
|
||||
func newHistoryHarness(t *testing.T) *historyHarness {
|
||||
t.Helper()
|
||||
dir := t.TempDir()
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: filepath.Join(dir, "jetstream"),
|
||||
Host: "127.0.0.1",
|
||||
Port: kvFreePort(t),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("embedded nats: %v", err)
|
||||
}
|
||||
nc, err := nats.Connect(ns.ClientURL())
|
||||
if err != nil {
|
||||
ns.Shutdown()
|
||||
t.Fatalf("nats connect: %v", err)
|
||||
}
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("jetstream: %v", err)
|
||||
}
|
||||
store, err := Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
if err != nil {
|
||||
store.Close()
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
t.Fatalf("open blobs: %v", err)
|
||||
}
|
||||
mustID := func(name string) cs.Identity {
|
||||
id, err := cs.GenerateIdentity()
|
||||
if err != nil {
|
||||
t.Fatalf("identity %s: %v", name, err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
alice, bob, carol := mustID("alice"), mustID("bob"), mustID("carol")
|
||||
if err := store.AddUser(hex.EncodeToString(alice.SignPub), "alice", RoleAdmin); err != nil {
|
||||
t.Fatalf("seed admin: %v", err)
|
||||
}
|
||||
for _, u := range []struct {
|
||||
id cs.Identity
|
||||
handle string
|
||||
}{{bob, "bob"}, {carol, "carol"}} {
|
||||
if err := store.AddUser(hex.EncodeToString(u.id.SignPub), u.handle, RoleMember); err != nil {
|
||||
t.Fatalf("register %s: %v", u.handle, err)
|
||||
}
|
||||
}
|
||||
|
||||
srv := NewServer(store, blobs, AuthEnforce)
|
||||
srv.SetJetStream(js, 1)
|
||||
ts := httptest.NewServer(srv)
|
||||
t.Cleanup(func() {
|
||||
ts.Close()
|
||||
store.Close()
|
||||
nc.Close()
|
||||
ns.Shutdown()
|
||||
ns.WaitForShutdown()
|
||||
})
|
||||
return &historyHarness{ts: ts, store: store, js: js, nc: nc, alice: alice, bob: bob, carol: carol}
|
||||
}
|
||||
|
||||
// seedPersistRoom creates a persisted (Matrix-policy) room directly in the store
|
||||
// with alice as owner and bob as a member, returning its id and subject. It does
|
||||
// NOT create the stream — that is left to the code under test (handleCreateRoom or
|
||||
// the lazy ensure in the history endpoint), which is exactly what we want to verify.
|
||||
func (h *historyHarness) seedPersistRoom(t *testing.T) (roomID, subject string) {
|
||||
t.Helper()
|
||||
roomID = newULID()
|
||||
subject = "unibus.room." + roomID
|
||||
aliceEp := frame.EndpointID(h.alice.SignPub)
|
||||
info := RoomInfo{RoomID: roomID, Subject: subject, OwnerEndpoint: aliceEp, Encrypt: true, Persist: true}
|
||||
if err := h.store.CreateRoom(info, h.alice.SignPub, h.alice.KexPub, []byte("alice-sealed")); err != nil {
|
||||
t.Fatalf("seed room: %v", err)
|
||||
}
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
bobM := Member{Endpoint: bobEp, Role: RoleMember, SignPub: h.bob.SignPub, KexPub: h.bob.KexPub}
|
||||
if err := h.store.AddMember(roomID, bobM, 0, []byte("bob-sealed")); err != nil {
|
||||
t.Fatalf("add member bob: %v", err)
|
||||
}
|
||||
return roomID, subject
|
||||
}
|
||||
|
||||
// makeFrame builds a marshaled PUB frame whose payload identifies it, so a test can
|
||||
// assert exact bytes and ordering after a round trip through the stream + endpoint.
|
||||
func makeFrame(t *testing.T, subject, sender string, i int) []byte {
|
||||
t.Helper()
|
||||
f := frame.Frame{
|
||||
Type: frame.PUB,
|
||||
Subject: subject,
|
||||
Sender: sender,
|
||||
MsgID: fmt.Sprintf("msg-%02d", i),
|
||||
Payload: []byte(fmt.Sprintf("ciphertext-%02d", i)),
|
||||
}
|
||||
b, err := f.Marshal()
|
||||
if err != nil {
|
||||
t.Fatalf("marshal frame %d: %v", i, err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// getHistory signs a GET /rooms/{id}/history request as id and returns the status,
|
||||
// the raw body, and the decoded envelope. query is the raw query string (e.g.
|
||||
// "limit=2") or "". The signed path includes the query because the server verifies
|
||||
// the signature over r.URL.RequestURI(), which carries it.
|
||||
func (h *historyHarness) getHistory(t *testing.T, id cs.Identity, roomID, query string, n int) (int, string, historyResp) {
|
||||
t.Helper()
|
||||
path := "/rooms/" + roomID + "/history"
|
||||
if query != "" {
|
||||
path += "?" + query
|
||||
}
|
||||
req := signedReq(t, h.ts.URL, "GET", path, nil, id, time.Now().Unix(), nonceN(n))
|
||||
code, body := do(t, req)
|
||||
var out historyResp
|
||||
if code == 200 {
|
||||
if err := json.Unmarshal([]byte(body), &out); err != nil {
|
||||
t.Fatalf("decode history: %v (%s)", err, body)
|
||||
}
|
||||
}
|
||||
return code, body, out
|
||||
}
|
||||
|
||||
// TestCreateRoomEnsuresStream verifies handleCreateRoom creates the durable stream
|
||||
// for a persisted room before responding, so capture starts at room creation.
|
||||
func TestCreateRoomEnsuresStream(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
aliceEp := frame.EndpointID(h.alice.SignPub)
|
||||
reqBody := createRoomReq{
|
||||
Subject: "unibus.room.created",
|
||||
Policy: policyJSON{Encrypt: true, Persist: true},
|
||||
Owner: endpointJSON{Endpoint: aliceEp, SignPub: h.alice.SignPub, KexPub: h.alice.KexPub},
|
||||
SealedKeySelf: []byte("alice-sealed"),
|
||||
}
|
||||
body, _ := json.Marshal(reqBody)
|
||||
req := signedReq(t, h.ts.URL, "POST", "/rooms", body, h.alice, time.Now().Unix(), nonceN(1))
|
||||
code, respBody := do(t, req)
|
||||
if code != 201 {
|
||||
t.Fatalf("create room: want 201, got %d (%s)", code, respBody)
|
||||
}
|
||||
var cr createRoomResp
|
||||
if err := json.Unmarshal([]byte(respBody), &cr); err != nil {
|
||||
t.Fatalf("decode create resp: %v (%s)", err, respBody)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
if _, err := h.js.Stream(ctx, roomStreamName(cr.RoomID)); err != nil {
|
||||
t.Fatalf("stream for created persist room should exist: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryGolden is the golden path: three frames published to a persisted
|
||||
// room's stream come back from the endpoint base64-encoded, in chronological order,
|
||||
// and decode to the exact frames that were published.
|
||||
func TestRoomHistoryGolden(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, subject := h.seedPersistRoom(t)
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
|
||||
t.Fatalf("ensure stream: %v", err)
|
||||
}
|
||||
want := make([][]byte, 3)
|
||||
for i := 0; i < 3; i++ {
|
||||
want[i] = makeFrame(t, subject, bobEp, i)
|
||||
// js.Publish waits for the stream ack, so the message is durably stored before
|
||||
// the next iteration — no sleeps, deterministic ordering.
|
||||
if _, err := h.js.Publish(ctx, subject, want[i]); err != nil {
|
||||
t.Fatalf("publish %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 10)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if len(hr.Messages) != 3 {
|
||||
t.Fatalf("want 3 messages, got %d (%s)", len(hr.Messages), raw)
|
||||
}
|
||||
for i, m := range hr.Messages {
|
||||
decoded, err := base64.StdEncoding.DecodeString(m)
|
||||
if err != nil {
|
||||
t.Fatalf("message %d not valid base64: %v", i, err)
|
||||
}
|
||||
if string(decoded) != string(want[i]) {
|
||||
t.Fatalf("message %d bytes mismatch (order or content)", i)
|
||||
}
|
||||
f, err := frame.Unmarshal(decoded)
|
||||
if err != nil {
|
||||
t.Fatalf("message %d does not decode to a frame: %v", i, err)
|
||||
}
|
||||
if f.MsgID != fmt.Sprintf("msg-%02d", i) {
|
||||
t.Fatalf("message %d: want MsgID msg-%02d, got %q", i, i, f.MsgID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryCapturesCoreNATSPublish proves the central fix: a message
|
||||
// published over PLAIN core NATS (as the JetStream-less browser client uniweb does)
|
||||
// is captured by the server-owned stream and served by the endpoint. Without the
|
||||
// server ensuring the stream, this message would be captured nowhere.
|
||||
func TestRoomHistoryCapturesCoreNATSPublish(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, subject := h.seedPersistRoom(t)
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
|
||||
t.Fatalf("ensure stream: %v", err)
|
||||
}
|
||||
sent := makeFrame(t, subject, bobEp, 7)
|
||||
if err := h.nc.Publish(subject, sent); err != nil {
|
||||
t.Fatalf("core publish: %v", err)
|
||||
}
|
||||
if err := h.nc.Flush(); err != nil {
|
||||
t.Fatalf("flush: %v", err)
|
||||
}
|
||||
// Core NATS publish has no stream ack; poll the stream until the message lands.
|
||||
h.waitMsgs(t, roomID, 1)
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 11)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if len(hr.Messages) != 1 {
|
||||
t.Fatalf("want 1 captured message, got %d (%s)", len(hr.Messages), raw)
|
||||
}
|
||||
decoded, err := base64.StdEncoding.DecodeString(hr.Messages[0])
|
||||
if err != nil || string(decoded) != string(sent) {
|
||||
t.Fatalf("captured core-NATS message round-trip mismatch (err=%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryLimit verifies ?limit caps the response to the most recent N
|
||||
// messages, oldest→newest within the window.
|
||||
func TestRoomHistoryLimit(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, subject := h.seedPersistRoom(t)
|
||||
bobEp := frame.EndpointID(h.bob.SignPub)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ensureRoomStream(ctx, h.js, roomID, subject, 1); err != nil {
|
||||
t.Fatalf("ensure stream: %v", err)
|
||||
}
|
||||
for i := 0; i < 5; i++ {
|
||||
if _, err := h.js.Publish(ctx, subject, makeFrame(t, subject, bobEp, i)); err != nil {
|
||||
t.Fatalf("publish %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "limit=2", 12)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if len(hr.Messages) != 2 {
|
||||
t.Fatalf("limit=2 over 5 messages: want 2, got %d", len(hr.Messages))
|
||||
}
|
||||
// The window is the last two messages (indices 3 and 4), in order.
|
||||
for off, m := range hr.Messages {
|
||||
decoded, _ := base64.StdEncoding.DecodeString(m)
|
||||
f, err := frame.Unmarshal(decoded)
|
||||
if err != nil {
|
||||
t.Fatalf("limited message %d does not decode: %v", off, err)
|
||||
}
|
||||
want := fmt.Sprintf("msg-%02d", off+3)
|
||||
if f.MsgID != want {
|
||||
t.Fatalf("limited message %d: want MsgID %s, got %q", off, want, f.MsgID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryEmptyRoom verifies a persisted room with no messages returns an
|
||||
// empty (non-null) array, lazily ensuring the stream on the way.
|
||||
func TestRoomHistoryEmptyRoom(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, _ := h.seedPersistRoom(t)
|
||||
|
||||
code, raw, hr := h.getHistory(t, h.bob, roomID, "", 13)
|
||||
if code != 200 {
|
||||
t.Fatalf("history: want 200, got %d (%s)", code, raw)
|
||||
}
|
||||
if hr.Messages == nil {
|
||||
t.Fatalf("empty room must return [] not null (%s)", raw)
|
||||
}
|
||||
if len(hr.Messages) != 0 {
|
||||
t.Fatalf("empty room: want 0 messages, got %d", len(hr.Messages))
|
||||
}
|
||||
// The lazy ensure should have created the stream even though no message exists.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
if _, err := h.js.Stream(ctx, roomStreamName(roomID)); err != nil {
|
||||
t.Fatalf("lazy ensure should have created the stream: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryUnauthenticated verifies an unsigned request is rejected with 401
|
||||
// under enforce, before the handler runs.
|
||||
func TestRoomHistoryUnauthenticated(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, _ := h.seedPersistRoom(t)
|
||||
// No signing headers: plain GET against the enforce-mode control plane.
|
||||
req, err := http.NewRequest("GET", h.ts.URL+"/rooms/"+roomID+"/history", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("new request: %v", err)
|
||||
}
|
||||
code, body := do(t, req)
|
||||
if code != 401 {
|
||||
t.Fatalf("unauthenticated history: want 401, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryNonMember verifies a registered user who is NOT a member of the
|
||||
// room is rejected with 403.
|
||||
func TestRoomHistoryNonMember(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
roomID, _ := h.seedPersistRoom(t)
|
||||
code, body, _ := h.getHistory(t, h.carol, roomID, "", 14)
|
||||
if code != 403 {
|
||||
t.Fatalf("non-member history: want 403, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRoomHistoryRoomNotFound verifies a request for a non-existent room is a 404,
|
||||
// distinct from the 403 a non-member of an existing room gets.
|
||||
func TestRoomHistoryRoomNotFound(t *testing.T) {
|
||||
h := newHistoryHarness(t)
|
||||
code, body, _ := h.getHistory(t, h.alice, newULID(), "", 15)
|
||||
if code != 404 {
|
||||
t.Fatalf("missing room history: want 404, got %d (%s)", code, body)
|
||||
}
|
||||
}
|
||||
|
||||
// waitMsgs polls the room's stream until it holds at least want messages or a short
|
||||
// deadline elapses, so a core-NATS publish (which carries no stream ack) is observed
|
||||
// deterministically without a fixed sleep.
|
||||
func (h *historyHarness) waitMsgs(t *testing.T, roomID string, want uint64) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(3 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
st, err := h.js.Stream(ctx, roomStreamName(roomID))
|
||||
if err == nil {
|
||||
si, ierr := st.Info(ctx)
|
||||
if ierr == nil && si.State.Msgs >= want {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("stream for room %s never reached %d message(s)", roomID, want)
|
||||
}
|
||||
@@ -109,6 +109,21 @@ type Server struct {
|
||||
// the RemoteAddr-only behavior that predates the flag. Set by the command via
|
||||
// SetTrustedProxies. See clientIP.
|
||||
trustedProxies trustedProxyMatcher
|
||||
|
||||
// js is the privileged JetStream context the server uses to own the durable
|
||||
// per-room streams of persisted rooms: it ensures a room's stream on creation
|
||||
// so the room's subject is captured from the first message — even from a
|
||||
// JetStream-less browser client (uniweb) that speaks only core NATS — and reads
|
||||
// it back for GET /rooms/{id}/history. It is wired by the command via
|
||||
// SetJetStream whenever a JetStream-capable data plane is available (always for
|
||||
// the embedded server). nil leaves history empty and stream-ensure a no-op,
|
||||
// preserving the pre-feature behavior for a deployment without JetStream.
|
||||
js jetstream.JetStream
|
||||
// streamReplicas is the replication factor for the room streams the server
|
||||
// creates, matched to the cluster's control-plane (KV) replication — 1 for a
|
||||
// standalone node, up to 3 in an HA cluster — so a persisted room's history is
|
||||
// as available as its metadata. Used only when js != nil. See SetJetStream.
|
||||
streamReplicas int
|
||||
}
|
||||
|
||||
// Posture describes the security posture a membershipd node runs with. It is
|
||||
@@ -143,6 +158,19 @@ func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
// SetJetStream wires the privileged JetStream context (and the room-stream
|
||||
// replication factor) the server uses to ensure and read the durable streams of
|
||||
// persisted rooms. replicas below 1 is clamped to 1. It must be called once at
|
||||
// startup, before the server begins serving; leaving it unset keeps history empty
|
||||
// and stream-ensure a no-op, the behavior for a deployment without JetStream.
|
||||
func (s *Server) SetJetStream(js jetstream.JetStream, replicas int) {
|
||||
if replicas < 1 {
|
||||
replicas = 1
|
||||
}
|
||||
s.js = js
|
||||
s.streamReplicas = replicas
|
||||
}
|
||||
|
||||
// UseReplicatedNonces switches the server's anti-replay store from the
|
||||
// per-process in-memory cache to a JetStream KV bucket shared across the cluster
|
||||
// (issue 0003e). It MUST be called on every node of a multi-node deployment:
|
||||
@@ -403,6 +431,13 @@ func (s *Server) routes() {
|
||||
s.mux.HandleFunc("POST /rooms/{id}/invite", s.handleInvite)
|
||||
s.mux.HandleFunc("GET /rooms/{id}/key", s.handleGetKey)
|
||||
s.mux.HandleFunc("GET /rooms/{id}/members", s.handleListMembers)
|
||||
// Durable message history for a persisted room, read server-side from the room's
|
||||
// JetStream stream so a client without JetStream (the browser client uniweb) can
|
||||
// load the backlog over plain HTTP. Member-only, like /key and /members.
|
||||
// Registered without the /api prefix like every other control-plane route: Caddy
|
||||
// strips /api via handle_path /api/* before forwarding, so the SPA's
|
||||
// GET /api/rooms/{id}/history arrives here as GET /rooms/{id}/history.
|
||||
s.mux.HandleFunc("GET /rooms/{id}/history", s.handleRoomHistory)
|
||||
s.mux.HandleFunc("GET /members/{endpoint}/rooms", s.handleListMemberRooms)
|
||||
s.mux.HandleFunc("POST /rooms/{id}/rekey", s.handleRekey)
|
||||
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
|
||||
@@ -632,6 +667,21 @@ func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
|
||||
SignMsgs: req.Policy.SignMsgs,
|
||||
OwnerEndpoint: req.Owner.Endpoint,
|
||||
}
|
||||
// Own the durable stream for a persisted room (issue room-history): ensure it
|
||||
// BEFORE the room row is written so the subject is captured from the very first
|
||||
// message whoever publishes it — a Go client OR a JetStream-less browser client.
|
||||
// Done first so a stream failure aborts cleanly with no orphan room row (the
|
||||
// rare orphan empty stream it can leave is harmless and idempotently reused).
|
||||
// Skipped when no JetStream is wired: the room still works, just without history.
|
||||
if info.Persist && s.js != nil {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout)
|
||||
err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas)
|
||||
cancel()
|
||||
if err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil {
|
||||
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user