fb8a03cf0c
Add cmd/webgw: a single Go binary that holds the operator's bus identity,
connects to the bus as a real authenticated peer (pkg/client), and exposes a
small REST + SSE API the browser consumes. The browser never signs, never
speaks NATS, and never sees a private key.
Endpoints (all under /api, gated by a session cookie except login):
POST /api/login — unlock a session with the operator passphrase
POST /api/logout
GET /api/me — operator identity the gateway acts as
GET /api/rooms — ListMyRooms
POST /api/rooms — CreateRoom (default policy: encrypted+persisted+signed)
POST /api/rooms/{id}/join — Join (fetch room key)
POST /api/rooms/{id}/send — Publish (sealed + signed by the peer)
GET /api/rooms/{id}/stream — SSE of decrypted frames (history then live)
Design notes:
- One fan-out hub per room: a single bus subscription is multiplexed to N SSE
clients, avoiding the per-(room,endpoint) durable-consumer contention that
multiple Subscribe calls would cause.
- Posture seam mirrors unibus_admin/clientcheck: empty --ca = plaintext dev,
non-empty = TLS+nkey on both planes; RefreshSession after a membership change
only under the secured (ACL) posture.
- Identity loaded from `pass` or a 0600 file, held only in memory.
- Session auth: passphrase compared in constant time; opaque HttpOnly cookie
so EventSource (which cannot set headers) can authenticate the stream.
TRUST MODEL: room content stays end-to-end encrypted on the bus. The gateway
reads plaintext only because it acts AS the operator's client — a legitimate
member of each room holding the room key. The per-browser wallet (WebCrypto)
that moves decryption into the browser is phase 2.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
141 lines
4.0 KiB
Go
141 lines
4.0 KiB
Go
package main
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/enmanuel/unibus/pkg/client"
|
|
"github.com/enmanuel/unibus/pkg/frame"
|
|
"github.com/oklog/ulid/v2"
|
|
)
|
|
|
|
// roomHub multiplexes ONE unibus room subscription to MANY SSE clients. The
|
|
// unibus client derives a per-(room, endpoint) durable consumer name, so a
|
|
// second Subscribe for the same room from the same operator would contend for
|
|
// the same durable (load-balanced delivery) rather than each browser receiving
|
|
// every message. The hub holds a single subscription per room and fans each
|
|
// decrypted frame out to every connected browser, which also means the gateway
|
|
// opens at most one bus subscription per room regardless of how many tabs watch
|
|
// it.
|
|
type roomHub struct {
|
|
roomID string
|
|
myEndpoint string
|
|
sub *client.Sub
|
|
|
|
mu sync.Mutex
|
|
clients map[chan msgWire]struct{}
|
|
}
|
|
|
|
// frameTS decodes the millisecond timestamp embedded in a frame's ULID id. A
|
|
// malformed id (should not happen for bus-produced frames) yields 0, which the
|
|
// browser renders without crashing.
|
|
func frameTS(msgID string) int64 {
|
|
id, err := ulid.Parse(msgID)
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
return int64(id.Time())
|
|
}
|
|
|
|
// newRoomHub opens the single bus subscription for roomID and starts fanning
|
|
// decrypted frames out to registered clients. The room must already be joined
|
|
// (so the gateway holds the room key) before this is called.
|
|
func newRoomHub(cli *client.Client, roomID, myEndpoint string) (*roomHub, error) {
|
|
h := &roomHub{
|
|
roomID: roomID,
|
|
myEndpoint: myEndpoint,
|
|
clients: map[chan msgWire]struct{}{},
|
|
}
|
|
sub, err := cli.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
|
|
m := msgWire{
|
|
ID: f.MsgID,
|
|
Sender: f.Sender,
|
|
Body: string(plaintext),
|
|
TS: frameTS(f.MsgID),
|
|
Mine: f.Sender == myEndpoint,
|
|
}
|
|
h.broadcast(m)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
h.sub = sub
|
|
return h, nil
|
|
}
|
|
|
|
// broadcast delivers a message to every registered client without blocking the
|
|
// NATS delivery goroutine: a client whose buffer is full (a stalled browser)
|
|
// drops this frame rather than stalling the whole room.
|
|
func (h *roomHub) broadcast(m msgWire) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
for ch := range h.clients {
|
|
select {
|
|
case ch <- m:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// add registers a new SSE client channel.
|
|
func (h *roomHub) add(ch chan msgWire) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
h.clients[ch] = struct{}{}
|
|
}
|
|
|
|
// stop unsubscribes from the bus. Local delivery ends; for a persisted room the
|
|
// durable consumer's ack position stays on the server, so a later subscription
|
|
// with the same operator resumes from where it left off.
|
|
func (h *roomHub) stop() {
|
|
if h.sub != nil {
|
|
_ = h.sub.Unsubscribe()
|
|
}
|
|
}
|
|
|
|
// openStream joins the room (idempotent; fetches the room key for encrypted
|
|
// rooms), attaches an SSE client to the room's hub (creating it on first watcher),
|
|
// and returns the client's message channel plus a cleanup func. The cleanup
|
|
// detaches the client and, when it was the last watcher, tears down the room's
|
|
// single bus subscription.
|
|
func (g *gateway) openStream(roomID string) (chan msgWire, func(), error) {
|
|
if err := g.join(roomID); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
g.mu.Lock()
|
|
h := g.hubs[roomID]
|
|
if h == nil {
|
|
var err error
|
|
h, err = newRoomHub(g.cli, roomID, g.endpoint)
|
|
if err != nil {
|
|
g.mu.Unlock()
|
|
return nil, nil, err
|
|
}
|
|
g.hubs[roomID] = h
|
|
}
|
|
g.mu.Unlock()
|
|
|
|
// Buffer so a brief render hitch in the browser does not drop live frames; a
|
|
// sustained stall still drops (broadcast is non-blocking) rather than wedging
|
|
// the room.
|
|
ch := make(chan msgWire, 64)
|
|
h.add(ch)
|
|
|
|
// cleanup takes g.mu before h.mu (the single, consistent lock order) so a
|
|
// concurrent openStream that re-creates the hub cannot race the teardown.
|
|
cleanup := func() {
|
|
g.mu.Lock()
|
|
defer g.mu.Unlock()
|
|
h.mu.Lock()
|
|
delete(h.clients, ch)
|
|
empty := len(h.clients) == 0
|
|
h.mu.Unlock()
|
|
if empty {
|
|
if cur := g.hubs[roomID]; cur == h {
|
|
delete(g.hubs, roomID)
|
|
h.stop()
|
|
}
|
|
}
|
|
}
|
|
return ch, cleanup, nil
|
|
}
|