feat(webgw): web gateway peer (REST + SSE) for the chat SPA
Add cmd/webgw: a single Go binary that holds the operator's bus identity,
connects to the bus as a real authenticated peer (pkg/client), and exposes a
small REST + SSE API the browser consumes. The browser never signs, never
speaks NATS, and never sees a private key.
Endpoints (all under /api, gated by a session cookie except login):
POST /api/login unlock a session with the operator passphrase
POST /api/logout
GET /api/me operator identity the gateway acts as
GET /api/rooms ListMyRooms
POST /api/rooms CreateRoom (default policy: encrypted+persisted+signed)
POST /api/rooms/{id}/join Join (fetch room key)
POST /api/rooms/{id}/send Publish (sealed + signed by the peer)
GET /api/rooms/{id}/stream SSE of decrypted frames (history then live)
Design notes:
- One fan-out hub per room: a single bus subscription is multiplexed to N SSE
clients, avoiding the per-(room,endpoint) durable-consumer contention that
multiple Subscribe calls would cause.
- Posture seam mirrors unibus_admin/clientcheck: empty --ca = plaintext dev,
non-empty = TLS+nkey on both planes; RefreshSession after a membership change
only under the secured (ACL) posture.
- Identity loaded from `pass` or a 0600 file, held only in memory.
- Session auth: passphrase compared in constant time; opaque HttpOnly cookie
so EventSource (which cannot set headers) can authenticate the stream.
TRUST MODEL: room content stays end-to-end encrypted on the bus. The gateway
reads plaintext only because it acts AS the operator's client — a legitimate
member of each room holding the room key. The per-browser wallet (WebCrypto)
that moves decryption into the browser is phase 2.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,140 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/oklog/ulid/v2"
|
||||
)
|
||||
|
||||
// roomHub multiplexes ONE unibus room subscription to MANY SSE clients. The
|
||||
// unibus client derives a per-(room, endpoint) durable consumer name, so a
|
||||
// second Subscribe for the same room from the same operator would contend for
|
||||
// the same durable (load-balanced delivery) rather than each browser receiving
|
||||
// every message. The hub holds a single subscription per room and fans each
|
||||
// decrypted frame out to every connected browser, which also means the gateway
|
||||
// opens at most one bus subscription per room regardless of how many tabs watch
|
||||
// it.
|
||||
type roomHub struct {
|
||||
roomID string
|
||||
myEndpoint string
|
||||
sub *client.Sub
|
||||
|
||||
mu sync.Mutex
|
||||
clients map[chan msgWire]struct{}
|
||||
}
|
||||
|
||||
// frameTS decodes the millisecond timestamp embedded in a frame's ULID id. A
|
||||
// malformed id (should not happen for bus-produced frames) yields 0, which the
|
||||
// browser renders without crashing.
|
||||
func frameTS(msgID string) int64 {
|
||||
id, err := ulid.Parse(msgID)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return int64(id.Time())
|
||||
}
|
||||
|
||||
// newRoomHub opens the single bus subscription for roomID and starts fanning
|
||||
// decrypted frames out to registered clients. The room must already be joined
|
||||
// (so the gateway holds the room key) before this is called.
|
||||
func newRoomHub(cli *client.Client, roomID, myEndpoint string) (*roomHub, error) {
|
||||
h := &roomHub{
|
||||
roomID: roomID,
|
||||
myEndpoint: myEndpoint,
|
||||
clients: map[chan msgWire]struct{}{},
|
||||
}
|
||||
sub, err := cli.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
|
||||
m := msgWire{
|
||||
ID: f.MsgID,
|
||||
Sender: f.Sender,
|
||||
Body: string(plaintext),
|
||||
TS: frameTS(f.MsgID),
|
||||
Mine: f.Sender == myEndpoint,
|
||||
}
|
||||
h.broadcast(m)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h.sub = sub
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// broadcast delivers a message to every registered client without blocking the
|
||||
// NATS delivery goroutine: a client whose buffer is full (a stalled browser)
|
||||
// drops this frame rather than stalling the whole room.
|
||||
func (h *roomHub) broadcast(m msgWire) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
for ch := range h.clients {
|
||||
select {
|
||||
case ch <- m:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add registers a new SSE client channel.
|
||||
func (h *roomHub) add(ch chan msgWire) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.clients[ch] = struct{}{}
|
||||
}
|
||||
|
||||
// stop unsubscribes from the bus. Local delivery ends; for a persisted room the
|
||||
// durable consumer's ack position stays on the server, so a later subscription
|
||||
// with the same operator resumes from where it left off.
|
||||
func (h *roomHub) stop() {
|
||||
if h.sub != nil {
|
||||
_ = h.sub.Unsubscribe()
|
||||
}
|
||||
}
|
||||
|
||||
// openStream joins the room (idempotent; fetches the room key for encrypted
|
||||
// rooms), attaches an SSE client to the room's hub (creating it on first watcher),
|
||||
// and returns the client's message channel plus a cleanup func. The cleanup
|
||||
// detaches the client and, when it was the last watcher, tears down the room's
|
||||
// single bus subscription.
|
||||
func (g *gateway) openStream(roomID string) (chan msgWire, func(), error) {
|
||||
if err := g.join(roomID); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
g.mu.Lock()
|
||||
h := g.hubs[roomID]
|
||||
if h == nil {
|
||||
var err error
|
||||
h, err = newRoomHub(g.cli, roomID, g.endpoint)
|
||||
if err != nil {
|
||||
g.mu.Unlock()
|
||||
return nil, nil, err
|
||||
}
|
||||
g.hubs[roomID] = h
|
||||
}
|
||||
g.mu.Unlock()
|
||||
|
||||
// Buffer so a brief render hitch in the browser does not drop live frames; a
|
||||
// sustained stall still drops (broadcast is non-blocking) rather than wedging
|
||||
// the room.
|
||||
ch := make(chan msgWire, 64)
|
||||
h.add(ch)
|
||||
|
||||
// cleanup takes g.mu before h.mu (the single, consistent lock order) so a
|
||||
// concurrent openStream that re-creates the hub cannot race the teardown.
|
||||
cleanup := func() {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
h.mu.Lock()
|
||||
delete(h.clients, ch)
|
||||
empty := len(h.clients) == 0
|
||||
h.mu.Unlock()
|
||||
if empty {
|
||||
if cur := g.hubs[roomID]; cur == h {
|
||||
delete(g.hubs, roomID)
|
||||
h.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
return ch, cleanup, nil
|
||||
}
|
||||
Reference in New Issue
Block a user