package membership // Server-side durable history for persisted rooms (room.ModeMatrix / Persist). // // A persisted room's messages ride a file-backed JetStream stream named // "UNIBUS_" (roomStreamName, identical to pkg/client.streamName). Until // now that stream was created only by the Go client's first publish/subscribe; a // client that speaks only core NATS (the browser client uniweb, which has no // JetStream) therefore never created it, so its messages were captured nowhere and // vanished on reload. This file moves stream ownership to the server: the control // plane ensures the stream when a persisted room is created (so capture starts at // minute zero whoever publishes) and exposes GET /rooms/{id}/history so a // JetStream-less client can read the backlog over plain HTTP. // // The server never decrypts: each stored message is the E2E frame exactly as it // was published (ciphertext for an encrypted room). The history endpoint returns // those bytes verbatim (base64-encoded for JSON safety), so end-to-end encryption // is preserved — the server only relays the bytes it already holds. import ( "context" "encoding/base64" "errors" "fmt" "net/http" "strconv" "time" "github.com/nats-io/nats.go/jetstream" ) const ( // defaultHistoryLimit is the number of most-recent messages returned when the // caller does not specify ?limit. defaultHistoryLimit = 200 // maxHistoryLimit is the hard ceiling on a single history response, so a caller // cannot ask the server to buffer an unbounded backlog into one JSON payload. maxHistoryLimit = 1000 // historyOpTimeout bounds each JetStream operation the history path performs // (stream lookup/ensure, info, per-message get) so a stalled data plane cannot // hang a control-plane request indefinitely. historyOpTimeout = 5 * time.Second ) // historyResp is the GET /rooms/{id}/history response envelope. messages is the // ordered (oldest→newest) list of the room's most recent frames, each the base64 // (standard encoding) of the marshaled, still-encrypted frame as it was published. // The key is a stable contract consumed by the browser client; do not rename it. type historyResp struct { Messages []string `json:"messages"` } // streamConfigForRoom builds the JetStream stream config for a persisted room. // // It MUST stay byte-for-byte compatible with pkg/client/persist.go's ensureStream // (the original owner of this format): same name derivation (roomStreamName == // pkg/client.streamName), same single subject, LimitsPolicy retention, file // storage. pkg/client is the source of truth for the format; we copy it here // rather than import it because pkg/client imports pkg/membership and importing it // back would be a cycle. The only addition is Replicas, matched to the cluster's // control-plane replication so a persisted room's history is as available as its // metadata (1 standalone, up to 3 in an HA cluster). CreateOrUpdateStream treats a // matching config as a no-op, so the client's later ensureStream is harmless. func streamConfigForRoom(roomID, subject string, replicas int) jetstream.StreamConfig { if replicas < 1 { replicas = 1 } return jetstream.StreamConfig{ Name: roomStreamName(roomID), Subjects: []string{subject}, Retention: jetstream.LimitsPolicy, Storage: jetstream.FileStorage, Replicas: replicas, } } // ensureRoomStream idempotently creates (or no-ops on) the durable stream that // captures a persisted room's subject. CreateOrUpdateStream returns the existing // stream unchanged when the config matches, so this is safe to call on every room // creation and on every history read (lazy backfill of pre-existing rooms). func ensureRoomStream(ctx context.Context, js jetstream.JetStream, roomID, subject string, replicas int) error { if _, err := js.CreateOrUpdateStream(ctx, streamConfigForRoom(roomID, subject, replicas)); err != nil { return fmt.Errorf("membership: ensure stream for room %s: %w", roomID, err) } return nil } // readRoomHistory returns the last `limit` messages of a room's durable stream in // chronological order (oldest→newest), each base64-encoded (standard encoding). A // stream that does not exist yet, or that holds no messages, yields an empty slice // (not an error): a freshly created or never-used room simply has no history. It // reads by sequence via the stream MSG.GET API rather than binding a consumer, so // it has no side effects on any peer's durable ack position. A gap in the sequence // range (a purged/deleted message) is skipped rather than failing the whole read, // so the result length is bounded by `limit` but may be smaller. func readRoomHistory(ctx context.Context, js jetstream.JetStream, roomID string, limit int) ([]string, error) { out := []string{} stream, err := js.Stream(ctx, roomStreamName(roomID)) if err != nil { if errors.Is(err, jetstream.ErrStreamNotFound) { return out, nil } return nil, fmt.Errorf("membership: lookup stream for room %s: %w", roomID, err) } si, err := stream.Info(ctx) if err != nil { return nil, fmt.Errorf("membership: stream info for room %s: %w", roomID, err) } first, last := si.State.FirstSeq, si.State.LastSeq if si.State.Msgs == 0 || last == 0 { return out, nil } // Window of the last `limit` sequence numbers, clamped to the first stored seq. // last >= limit guards the unsigned subtraction against underflow. start := first if last >= uint64(limit) { if cand := last - uint64(limit) + 1; cand > start { start = cand } } for seq := start; seq <= last; seq++ { raw, err := stream.GetMsg(ctx, seq) if err != nil { // A purged/deleted sequence leaves a gap; skip it rather than abort. continue } out = append(out, base64.StdEncoding.EncodeToString(raw.Data)) } return out, nil } // parseHistoryLimit reads the ?limit query value, applying the default when it is // absent and clamping out-of-range / malformed values to [1, maxHistoryLimit]. func parseHistoryLimit(q string) int { if q == "" { return defaultHistoryLimit } n, err := strconv.Atoi(q) if err != nil || n <= 0 { return defaultHistoryLimit } if n > maxHistoryLimit { return maxHistoryLimit } return n } // handleRoomHistory serves GET /rooms/{id}/history: the last ?limit (default 200, // hard cap 1000) messages of a persisted room, oldest→newest, each the base64 of // the still-encrypted frame as published. The server never decrypts — it relays // the ciphertext bytes the stream already holds, preserving E2E. // // Authorization mirrors the sibling room reads (/key, /members): the request must // be a member of the room (requireMember; allowed under AuthOff/dev where no signer // is verified). A missing room is 404; a non-member is 403; an unsigned request // under enforce is rejected with 401 by the auth middleware before this runs. // // For a persisted room the stream is ensured first (lazy backfill): a room created // before the server managed streams begins capturing from now on. Messages sent // before the stream existed were never captured and are unrecoverable — only // messages from stream creation onward appear here. func (s *Server) handleRoomHistory(w http.ResponseWriter, r *http.Request) { roomID := r.PathValue("id") // Existence first so a missing room is a clean 404 (the documented contract), // distinct from a 403 for an existing room the caller is not a member of. info, err := s.store.GetRoom(roomID) if err != nil { writeErr(w, http.StatusNotFound, "room not found") return } if _, ok := s.requireMember(w, r, roomID); !ok { return } limit := parseHistoryLimit(r.URL.Query().Get("limit")) // No JetStream wired (e.g. an external-NATS deployment without a cluster/KV // feature): there is no durable stream to read, so report an empty history // rather than 500 — a client degrades to "no backlog" gracefully. if s.js == nil { writeJSON(w, http.StatusOK, historyResp{Messages: []string{}}) return } ctx, cancel := context.WithTimeout(r.Context(), historyOpTimeout) defer cancel() if info.Persist { if err := ensureRoomStream(ctx, s.js, roomID, info.Subject, s.streamReplicas); err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } } msgs, err := readRoomHistory(ctx, s.js, roomID, limit) if err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } writeJSON(w, http.StatusOK, historyResp{Messages: msgs}) }