e8e37d77fe
Extracted from unibus v0.13.0: the chat SPA (web/, React+Mantine, per-user BIP39 wallet) and the web gateway (cmd/webgw, REST+SSE) that acts as a bus peer for the browser. Consumes unibus as a Go module via replace => ../unibus, keeping its own replace fn-registry for the cybersecurity primitives. go build/vet/test and pnpm build green in the new location.
141 lines
4.0 KiB
Go
141 lines
4.0 KiB
Go
package main
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/enmanuel/unibus/pkg/client"
|
|
"github.com/enmanuel/unibus/pkg/frame"
|
|
"github.com/oklog/ulid/v2"
|
|
)
|
|
|
|
// roomHub multiplexes ONE unibus room subscription to MANY SSE clients. The
|
|
// unibus client derives a per-(room, endpoint) durable consumer name, so a
|
|
// second Subscribe for the same room from the same operator would contend for
|
|
// the same durable (load-balanced delivery) rather than each browser receiving
|
|
// every message. The hub holds a single subscription per room and fans each
|
|
// decrypted frame out to every connected browser, which also means the gateway
|
|
// opens at most one bus subscription per room regardless of how many tabs watch
|
|
// it.
|
|
type roomHub struct {
|
|
roomID string
|
|
myEndpoint string
|
|
sub *client.Sub
|
|
|
|
mu sync.Mutex
|
|
clients map[chan msgWire]struct{}
|
|
}
|
|
|
|
// frameTS decodes the millisecond timestamp embedded in a frame's ULID id. A
|
|
// malformed id (should not happen for bus-produced frames) yields 0, which the
|
|
// browser renders without crashing.
|
|
func frameTS(msgID string) int64 {
|
|
id, err := ulid.Parse(msgID)
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
return int64(id.Time())
|
|
}
|
|
|
|
// newRoomHub opens the single bus subscription for roomID and starts fanning
|
|
// decrypted frames out to registered clients. The room must already be joined
|
|
// (so the gateway holds the room key) before this is called.
|
|
func newRoomHub(cli *client.Client, roomID, myEndpoint string) (*roomHub, error) {
|
|
h := &roomHub{
|
|
roomID: roomID,
|
|
myEndpoint: myEndpoint,
|
|
clients: map[chan msgWire]struct{}{},
|
|
}
|
|
sub, err := cli.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
|
|
m := msgWire{
|
|
ID: f.MsgID,
|
|
Sender: f.Sender,
|
|
Body: string(plaintext),
|
|
TS: frameTS(f.MsgID),
|
|
Mine: f.Sender == myEndpoint,
|
|
}
|
|
h.broadcast(m)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
h.sub = sub
|
|
return h, nil
|
|
}
|
|
|
|
// broadcast delivers a message to every registered client without blocking the
|
|
// NATS delivery goroutine: a client whose buffer is full (a stalled browser)
|
|
// drops this frame rather than stalling the whole room.
|
|
func (h *roomHub) broadcast(m msgWire) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
for ch := range h.clients {
|
|
select {
|
|
case ch <- m:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// add registers a new SSE client channel.
|
|
func (h *roomHub) add(ch chan msgWire) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
h.clients[ch] = struct{}{}
|
|
}
|
|
|
|
// stop unsubscribes from the bus. Local delivery ends; for a persisted room the
|
|
// durable consumer's ack position stays on the server, so a later subscription
|
|
// with the same operator resumes from where it left off.
|
|
func (h *roomHub) stop() {
|
|
if h.sub != nil {
|
|
_ = h.sub.Unsubscribe()
|
|
}
|
|
}
|
|
|
|
// openStream joins the room (idempotent; fetches the room key for encrypted
|
|
// rooms), attaches an SSE client to the room's hub (creating it on first watcher),
|
|
// and returns the client's message channel plus a cleanup func. The cleanup
|
|
// detaches the client and, when it was the last watcher, tears down the room's
|
|
// single bus subscription.
|
|
func (g *gateway) openStream(roomID string) (chan msgWire, func(), error) {
|
|
if err := g.join(roomID); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
g.mu.Lock()
|
|
h := g.hubs[roomID]
|
|
if h == nil {
|
|
var err error
|
|
h, err = newRoomHub(g.cli, roomID, g.endpoint)
|
|
if err != nil {
|
|
g.mu.Unlock()
|
|
return nil, nil, err
|
|
}
|
|
g.hubs[roomID] = h
|
|
}
|
|
g.mu.Unlock()
|
|
|
|
// Buffer so a brief render hitch in the browser does not drop live frames; a
|
|
// sustained stall still drops (broadcast is non-blocking) rather than wedging
|
|
// the room.
|
|
ch := make(chan msgWire, 64)
|
|
h.add(ch)
|
|
|
|
// cleanup takes g.mu before h.mu (the single, consistent lock order) so a
|
|
// concurrent openStream that re-creates the hub cannot race the teardown.
|
|
cleanup := func() {
|
|
g.mu.Lock()
|
|
defer g.mu.Unlock()
|
|
h.mu.Lock()
|
|
delete(h.clients, ch)
|
|
empty := len(h.clients) == 0
|
|
h.mu.Unlock()
|
|
if empty {
|
|
if cur := g.hubs[roomID]; cur == h {
|
|
delete(g.hubs, roomID)
|
|
h.stop()
|
|
}
|
|
}
|
|
}
|
|
return ch, cleanup, nil
|
|
}
|