// This file adds optional per-room durable persistence on top of the core // (ephemeral) NATS data plane. Persistence is gated entirely by the per-room // RoomPolicy.Persist flag: // // - Persist == false (room.ModeNATS): nothing here runs. Publish/Subscribe // use plain core NATS exactly as before — fast, fan-out, no history. // - Persist == true (room.ModeMatrix): the room's subject is captured by a // durable JetStream stream. Late joiners receive the full backlog // (scrollback) and a peer that was offline resumes from its last ack // (replay). The Frame, the encryption and the signature are identical to // the ephemeral path; only the transport changes. // // We use the modern github.com/nats-io/nats.go/jetstream API throughout (not // the legacy nc.JetStream() context), and we are consistent about it across // the whole client. package client import ( "context" "errors" "fmt" "strings" "time" "github.com/nats-io/nats.go/jetstream" ) // streamName derives a JetStream stream name from a room id. JetStream stream // (and durable consumer) names cannot contain '.', '*', '>', path separators // or whitespace, so we sanitize any char outside [A-Za-z0-9_] to '_' and prefix // with "UNIBUS_" to namespace the bus's streams. func streamName(roomID string) string { return "UNIBUS_" + sanitizeJSName(roomID) } // sanitizeJSName replaces every rune outside [A-Za-z0-9_] with '_' so the // result is a legal JetStream stream/consumer name token. func sanitizeJSName(s string) string { var b strings.Builder b.Grow(len(s)) for _, r := range s { switch { case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_': b.WriteRune(r) default: b.WriteRune('_') } } return b.String() } // durableName derives a per-peer durable consumer name for a room. The same // (roomID, endpointID) pair always maps to the same durable, so a peer that // reconnects continues from its last acked message → replay of what it missed. func durableName(roomID, endpointID string) string { return sanitizeJSName(roomID) + "_" + sanitizeJSName(endpointID) } // ensureStream idempotently creates (or no-ops on) a file-backed JetStream // stream that captures the given subject for a persisted room. It is safe to // call repeatedly: CreateOrUpdateStream returns the existing stream unchanged // when the config matches. Called from CreateRoom(Persist), and defensively // from the first Publish/Subscribe to a persisted room. func (c *Client) ensureStream(ctx context.Context, roomID, subject string) error { if c.js == nil { return errors.New("client: JetStream not initialized") } cfg := jetstream.StreamConfig{ Name: streamName(roomID), Subjects: []string{subject}, Retention: jetstream.LimitsPolicy, Storage: jetstream.FileStorage, } if _, err := c.js.CreateOrUpdateStream(ctx, cfg); err != nil { return fmt.Errorf("client: ensure stream for room %s: %w", roomID, err) } return nil } // publishPersistent publishes a frame to a persisted room via JetStream and // waits for the server ack, guaranteeing the message is durably stored before // returning. The stream is ensured first so the very first publish to a freshly // created room (or after a restart) cannot be lost for lack of a stream. func (c *Client) publishPersistent(roomID, subject string, frameBytes []byte) error { if c.js == nil { return errors.New("client: JetStream not initialized") } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := c.ensureStream(ctx, roomID, subject); err != nil { return err } if _, err := c.js.Publish(ctx, subject, frameBytes); err != nil { return fmt.Errorf("client: jetstream publish to room %s: %w", roomID, err) } return nil } // subscribePersistent binds a per-peer durable consumer to the room's stream // with DeliverAll + explicit ack. DeliverAll delivers the full backlog // (scrollback) on first bind and then live messages; the durable name makes a // reconnecting peer resume from its last ack (offline replay). Each delivered // message is acked only after the supplied onMsg callback has processed it. func (c *Client) subscribePersistent(roomID, subject string, onMsg func(data []byte)) (jetstream.ConsumeContext, error) { if c.js == nil { return nil, errors.New("client: JetStream not initialized") } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := c.ensureStream(ctx, roomID, subject); err != nil { return nil, err } stream, err := c.js.Stream(ctx, streamName(roomID)) if err != nil { return nil, fmt.Errorf("client: lookup stream for room %s: %w", roomID, err) } cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: durableName(roomID, c.endpoint), DeliverPolicy: jetstream.DeliverAllPolicy, AckPolicy: jetstream.AckExplicitPolicy, }) if err != nil { return nil, fmt.Errorf("client: ensure durable consumer for room %s: %w", roomID, err) } cc, err := cons.Consume(func(msg jetstream.Msg) { onMsg(msg.Data()) _ = msg.Ack() }) if err != nil { return nil, fmt.Errorf("client: consume room %s: %w", roomID, err) } return cc, nil }