package main import ( "sync" "github.com/enmanuel/unibus/pkg/client" "github.com/enmanuel/unibus/pkg/frame" "github.com/oklog/ulid/v2" ) // roomHub multiplexes ONE unibus room subscription to MANY SSE clients. The // unibus client derives a per-(room, endpoint) durable consumer name, so a // second Subscribe for the same room from the same operator would contend for // the same durable (load-balanced delivery) rather than each browser receiving // every message. The hub holds a single subscription per room and fans each // decrypted frame out to every connected browser, which also means the gateway // opens at most one bus subscription per room regardless of how many tabs watch // it. type roomHub struct { roomID string myEndpoint string sub *client.Sub mu sync.Mutex clients map[chan msgWire]struct{} } // frameTS decodes the millisecond timestamp embedded in a frame's ULID id. A // malformed id (should not happen for bus-produced frames) yields 0, which the // browser renders without crashing. func frameTS(msgID string) int64 { id, err := ulid.Parse(msgID) if err != nil { return 0 } return int64(id.Time()) } // newRoomHub opens the single bus subscription for roomID and starts fanning // decrypted frames out to registered clients. The room must already be joined // (so the gateway holds the room key) before this is called. func newRoomHub(cli *client.Client, roomID, myEndpoint string) (*roomHub, error) { h := &roomHub{ roomID: roomID, myEndpoint: myEndpoint, clients: map[chan msgWire]struct{}{}, } sub, err := cli.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { m := msgWire{ ID: f.MsgID, Sender: f.Sender, Body: string(plaintext), TS: frameTS(f.MsgID), Mine: f.Sender == myEndpoint, } h.broadcast(m) }) if err != nil { return nil, err } h.sub = sub return h, nil } // broadcast delivers a message to every registered client without blocking the // NATS delivery goroutine: a client whose buffer is full (a stalled browser) // drops this frame rather than stalling the whole room. func (h *roomHub) broadcast(m msgWire) { h.mu.Lock() defer h.mu.Unlock() for ch := range h.clients { select { case ch <- m: default: } } } // add registers a new SSE client channel. func (h *roomHub) add(ch chan msgWire) { h.mu.Lock() defer h.mu.Unlock() h.clients[ch] = struct{}{} } // stop unsubscribes from the bus. Local delivery ends; for a persisted room the // durable consumer's ack position stays on the server, so a later subscription // with the same operator resumes from where it left off. func (h *roomHub) stop() { if h.sub != nil { _ = h.sub.Unsubscribe() } } // openStream joins the room (idempotent; fetches the room key for encrypted // rooms), attaches an SSE client to the room's hub (creating it on first watcher), // and returns the client's message channel plus a cleanup func. The cleanup // detaches the client and, when it was the last watcher, tears down the room's // single bus subscription. func (g *gateway) openStream(roomID string) (chan msgWire, func(), error) { if err := g.join(roomID); err != nil { return nil, nil, err } g.mu.Lock() h := g.hubs[roomID] if h == nil { var err error h, err = newRoomHub(g.cli, roomID, g.endpoint) if err != nil { g.mu.Unlock() return nil, nil, err } g.hubs[roomID] = h } g.mu.Unlock() // Buffer so a brief render hitch in the browser does not drop live frames; a // sustained stall still drops (broadcast is non-blocking) rather than wedging // the room. ch := make(chan msgWire, 64) h.add(ch) // cleanup takes g.mu before h.mu (the single, consistent lock order) so a // concurrent openStream that re-creates the hub cannot race the teardown. cleanup := func() { g.mu.Lock() defer g.mu.Unlock() h.mu.Lock() delete(h.clients, ch) empty := len(h.clients) == 0 h.mu.Unlock() if empty { if cur := g.hubs[roomID]; cur == h { delete(g.hubs, roomID) h.stop() } } } return ch, cleanup, nil }