feat: add web playground for interactive bus testing (SSE + vanilla UI, all-in-one server)

This commit is contained in:
agent
2026-06-03 21:18:27 +02:00
parent 888ff75236
commit 22d6106e32
3 changed files with 1139 additions and 0 deletions
+605
View File
@@ -0,0 +1,605 @@
// Command playground is an all-in-one, web-based sandbox for the unibus message
// bus. A single `go run ./playground` launches the entire stack embedded:
//
// - an embedded NATS server with JetStream (the data plane),
// - the membership control plane (rooms, members, sealed keys, rekey) over an
// internal HTTP server,
// - the media blob store, and
// - a browser-facing web UI on :7700.
//
// The browser never speaks NATS. The Go server is the actual bus peer: it holds
// one unibus client per named peer, subscribes to rooms on the peer's behalf,
// and streams received messages to the browser over Server-Sent Events. The
// browser drives everything with plain fetch() + EventSource() — no build step,
// no JS framework, no external libraries.
//
// This is a playground (see .claude/rules/playgrounds.md): it lives inside the
// unibus app, reuses the parent module (no new go.mod), is not indexed, and
// stores ephemeral state under playground/local_files/.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
_ "embed"
"github.com/nats-io/nats.go"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/enmanuel/unibus/pkg/room"
)
// Fixed ports (verified free before assignment — do not change without reason).
const (
webAddr = "127.0.0.1:7700" // browser-facing web UI
ctrlAddr = "127.0.0.1:8480" // internal membership control plane
ctrlURL = "http://" + ctrlAddr
natsPort = 4260 // internal embedded NATS
natsURL = "nats://127.0.0.1:4260"
localFiles = "playground/local_files"
)
//go:embed index.html
var indexHTML []byte
// ---------------------------------------------------------------------------
// Event: a message received by a peer on one of its subscribed rooms. Fanned
// out to every SSE listener attached to that peer.
// ---------------------------------------------------------------------------
type Event struct {
RoomID string `json:"room_id"`
Subject string `json:"subject"`
Sender string `json:"sender"`
Text string `json:"text"`
Encrypted bool `json:"encrypted"`
TS int64 `json:"ts"` // unix millis
}
// roomInfo caches the per-room metadata a peer needs to label incoming frames.
type roomInfo struct {
subject string
encrypt bool
}
// peerState holds everything about one named peer: its bus client, its public
// endpoint, its live subscriptions, the rooms it knows, and the set of SSE
// listener channels currently attached to it.
type peerState struct {
name string
client *client.Client
endpoint client.Endpoint
mu sync.Mutex
subs map[string]*nats.Subscription // roomID -> subscription
rooms map[string]roomInfo // roomID -> subject/encrypt
listeners map[chan Event]struct{} // attached SSE channels
}
// emit fans an event out to all attached listeners without blocking on a slow
// or disconnected consumer.
func (p *peerState) emit(ev Event) {
p.mu.Lock()
defer p.mu.Unlock()
for ch := range p.listeners {
select {
case ch <- ev:
default: // listener buffer full: drop rather than block the NATS callback
}
}
}
func (p *peerState) addListener(ch chan Event) {
p.mu.Lock()
p.listeners[ch] = struct{}{}
p.mu.Unlock()
}
func (p *peerState) removeListener(ch chan Event) {
p.mu.Lock()
delete(p.listeners, ch)
p.mu.Unlock()
}
func (p *peerState) setRoom(roomID string, info roomInfo) {
p.mu.Lock()
p.rooms[roomID] = info
p.mu.Unlock()
}
func (p *peerState) roomFor(roomID string) (roomInfo, bool) {
p.mu.Lock()
defer p.mu.Unlock()
info, ok := p.rooms[roomID]
return info, ok
}
// ---------------------------------------------------------------------------
// Hub: the registry of peers, protected by a single mutex.
// ---------------------------------------------------------------------------
type Hub struct {
mu sync.Mutex
peers map[string]*peerState
}
func newHub() *Hub { return &Hub{peers: map[string]*peerState{}} }
// getOrCreate returns the peer for name, creating its identity + bus client on
// first use. Identities persist to playground/local_files/<name>.id so a peer
// keeps the same endpoint across restarts.
func (h *Hub) getOrCreate(name string) (*peerState, error) {
h.mu.Lock()
defer h.mu.Unlock()
if p, ok := h.peers[name]; ok {
return p, nil
}
idPath := filepath.Join(localFiles, name+".id")
id, err := client.LoadOrCreateIdentity(idPath)
if err != nil {
return nil, fmt.Errorf("identity for %q: %w", name, err)
}
c, err := client.New(natsURL, ctrlURL, id)
if err != nil {
return nil, fmt.Errorf("client for %q: %w", name, err)
}
p := &peerState{
name: name,
client: c,
endpoint: c.Endpoint(),
subs: map[string]*nats.Subscription{},
rooms: map[string]roomInfo{},
listeners: map[chan Event]struct{}{},
}
h.peers[name] = p
return p, nil
}
// lookup returns an already-created peer or false.
func (h *Hub) lookup(name string) (*peerState, bool) {
h.mu.Lock()
defer h.mu.Unlock()
p, ok := h.peers[name]
return p, ok
}
// list returns a snapshot of all peers (name + endpoint id).
func (h *Hub) list() []map[string]string {
h.mu.Lock()
defer h.mu.Unlock()
out := make([]map[string]string, 0, len(h.peers))
for name, p := range h.peers {
out = append(out, map[string]string{"name": name, "endpoint_id": p.endpoint.ID})
}
return out
}
func (h *Hub) closeAll() {
h.mu.Lock()
defer h.mu.Unlock()
for _, p := range h.peers {
p.mu.Lock()
for _, sub := range p.subs {
_ = sub.Unsubscribe()
}
p.mu.Unlock()
_ = p.client.Close()
}
}
// subscribeRoom subscribes the peer to a room (idempotent) and wires the frame
// handler to fan incoming messages out as Events. info labels each event with
// the room's subject and encryption flag.
func (p *peerState) subscribeRoom(roomID string, info roomInfo) error {
p.mu.Lock()
if _, already := p.subs[roomID]; already {
p.mu.Unlock()
return nil
}
p.mu.Unlock()
sub, err := p.client.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
p.emit(Event{
RoomID: roomID,
Subject: info.subject,
Sender: f.Sender,
Text: string(plaintext),
Encrypted: info.encrypt,
TS: time.Now().UnixMilli(),
})
})
if err != nil {
return fmt.Errorf("subscribe room %s: %w", roomID, err)
}
p.mu.Lock()
p.subs[roomID] = sub
p.mu.Unlock()
p.setRoom(roomID, info)
return nil
}
// ---------------------------------------------------------------------------
// Control-plane helper: fetch a room's subject + policy from membershipd. The
// client package keeps fetchRoom private, so the playground talks to the
// control plane directly (read endpoints are unauthenticated by design).
// ---------------------------------------------------------------------------
type ctrlRoomResp struct {
Subject string `json:"subject"`
Epoch int `json:"epoch"`
Policy struct {
Encrypt bool `json:"encrypt"`
Persist bool `json:"persist"`
SignMsgs bool `json:"sign_msgs"`
} `json:"policy"`
}
func fetchRoomInfo(roomID string) (roomInfo, error) {
resp, err := http.Get(ctrlURL + "/rooms/" + roomID)
if err != nil {
return roomInfo{}, fmt.Errorf("fetch room %s: %w", roomID, err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
return roomInfo{}, fmt.Errorf("room %s not found (status %d)", roomID, resp.StatusCode)
}
var r ctrlRoomResp
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return roomInfo{}, fmt.Errorf("decode room %s: %w", roomID, err)
}
return roomInfo{subject: r.Subject, encrypt: r.Policy.Encrypt}, nil
}
// ---------------------------------------------------------------------------
// HTTP handlers (web UI on :7700).
// ---------------------------------------------------------------------------
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
func writeErr(w http.ResponseWriter, code int, msg string) {
writeJSON(w, code, map[string]string{"error": msg})
}
func decodeBody(r *http.Request, out any) error {
defer r.Body.Close()
return json.NewDecoder(r.Body).Decode(out)
}
func (h *Hub) handleIndex(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_, _ = w.Write(indexHTML)
}
func (h *Hub) handlePeer(w http.ResponseWriter, r *http.Request) {
var req struct {
Name string `json:"name"`
}
if err := decodeBody(r, &req); err != nil || req.Name == "" {
writeErr(w, http.StatusBadRequest, "name required")
return
}
p, err := h.getOrCreate(req.Name)
if err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"name": p.name, "endpoint_id": p.endpoint.ID})
}
func (h *Hub) handlePeers(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, h.list())
}
func (h *Hub) handleRoom(w http.ResponseWriter, r *http.Request) {
var req struct {
Peer string `json:"peer"`
Subject string `json:"subject"`
Encrypt bool `json:"encrypt"`
}
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.Subject == "" {
writeErr(w, http.StatusBadRequest, "peer and subject required")
return
}
p, ok := h.lookup(req.Peer)
if !ok {
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
return
}
policy := room.ModeNATS
if req.Encrypt {
policy = room.ModeMatrix
}
roomID, err := p.client.CreateRoom(req.Subject, policy)
if err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
return
}
info := roomInfo{subject: req.Subject, encrypt: req.Encrypt}
if err := p.subscribeRoom(roomID, info); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"room_id": roomID, "subject": req.Subject, "encrypt": req.Encrypt,
})
}
func (h *Hub) handleJoin(w http.ResponseWriter, r *http.Request) {
var req struct {
Peer string `json:"peer"`
RoomID string `json:"room_id"`
}
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" {
writeErr(w, http.StatusBadRequest, "peer and room_id required")
return
}
p, ok := h.lookup(req.Peer)
if !ok {
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
return
}
if err := p.client.Join(req.RoomID); err != nil {
writeErr(w, http.StatusBadRequest, "join failed: "+err.Error())
return
}
info, err := fetchRoomInfo(req.RoomID)
if err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
return
}
if err := p.subscribeRoom(req.RoomID, info); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"subject": info.subject, "encrypt": info.encrypt})
}
func (h *Hub) handleInvite(w http.ResponseWriter, r *http.Request) {
var req struct {
Peer string `json:"peer"`
RoomID string `json:"room_id"`
Target string `json:"target"`
}
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" || req.Target == "" {
writeErr(w, http.StatusBadRequest, "peer, room_id and target required")
return
}
p, ok := h.lookup(req.Peer)
if !ok {
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
return
}
target, ok := h.lookup(req.Target)
if !ok {
writeErr(w, http.StatusBadRequest, "target peer "+req.Target+" does not exist; connect it first")
return
}
if err := p.client.Invite(req.RoomID, target.endpoint); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "invited", "target": req.Target})
}
func (h *Hub) handlePublish(w http.ResponseWriter, r *http.Request) {
var req struct {
Peer string `json:"peer"`
RoomID string `json:"room_id"`
Text string `json:"text"`
}
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" {
writeErr(w, http.StatusBadRequest, "peer and room_id required")
return
}
p, ok := h.lookup(req.Peer)
if !ok {
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
return
}
if err := p.client.Publish(req.RoomID, []byte(req.Text)); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "published"})
}
func (h *Hub) handleKick(w http.ResponseWriter, r *http.Request) {
var req struct {
Peer string `json:"peer"`
RoomID string `json:"room_id"`
Target string `json:"target"`
}
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" || req.Target == "" {
writeErr(w, http.StatusBadRequest, "peer, room_id and target required")
return
}
p, ok := h.lookup(req.Peer)
if !ok {
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
return
}
target, ok := h.lookup(req.Target)
if !ok {
writeErr(w, http.StatusBadRequest, "target peer "+req.Target+" does not exist")
return
}
if err := p.client.Kick(req.RoomID, target.endpoint.ID); err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "kicked", "target": req.Target})
}
// handleStream is the SSE endpoint. The browser opens one EventSource per peer;
// each received Event is emitted as a `data: <json>\n\n` block. The listener is
// cleaned up when the HTTP request context is cancelled (tab closed / reload).
func (h *Hub) handleStream(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("peer")
if name == "" {
writeErr(w, http.StatusBadRequest, "peer query param required")
return
}
p, ok := h.lookup(name)
if !ok {
writeErr(w, http.StatusBadRequest, "unknown peer "+name)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
writeErr(w, http.StatusInternalServerError, "streaming unsupported")
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
ch := make(chan Event, 64)
p.addListener(ch)
defer p.removeListener(ch)
// Initial comment so the browser marks the stream open immediately.
fmt.Fprintf(w, ": connected to %s\n\n", name)
flusher.Flush()
ctx := r.Context()
ping := time.NewTicker(20 * time.Second)
defer ping.Stop()
for {
select {
case <-ctx.Done():
return
case <-ping.C:
fmt.Fprintf(w, ": ping\n\n")
flusher.Flush()
case ev := <-ch:
b, err := json.Marshal(ev)
if err != nil {
continue
}
fmt.Fprintf(w, "data: %s\n\n", b)
flusher.Flush()
}
}
}
// ---------------------------------------------------------------------------
// main: bring up NATS, control plane, and the web server; tear them all down
// cleanly on signal.
// ---------------------------------------------------------------------------
func main() {
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[playground] ")
if err := os.MkdirAll(localFiles, 0o755); err != nil {
log.Fatalf("mkdir %s: %v", localFiles, err)
}
// 1. Data plane: embedded NATS + JetStream on the fixed internal port.
ns, err := embeddednats.Start(filepath.Join(localFiles, "js"), natsPort)
if err != nil {
log.Fatalf("start embedded nats: %v", err)
}
log.Printf("embedded NATS (JetStream) ready: %s", embeddednats.ClientURL(ns))
// 2. Control plane: membership store + blob store + internal HTTP server.
store, err := membership.Open(filepath.Join(localFiles, "play.db"))
if err != nil {
ns.Shutdown()
log.Fatalf("open membership store: %v", err)
}
blobs, err := blobstore.New(filepath.Join(localFiles, "blobs"))
if err != nil {
store.Close()
ns.Shutdown()
log.Fatalf("open blob store: %v", err)
}
ctrlSrv := &http.Server{Addr: ctrlAddr, Handler: membership.NewServer(store, blobs)}
go func() {
if err := ctrlSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("control plane: %v", err)
}
}()
if err := waitHealthy(ctrlURL+"/healthz", 5*time.Second); err != nil {
log.Fatalf("control plane not healthy: %v", err)
}
log.Printf("control plane ready: %s", ctrlURL)
// 3. Web UI on :7700.
hub := newHub()
mux := http.NewServeMux()
mux.HandleFunc("/", hub.handleIndex)
mux.HandleFunc("POST /api/peer", hub.handlePeer)
mux.HandleFunc("GET /api/peers", hub.handlePeers)
mux.HandleFunc("POST /api/room", hub.handleRoom)
mux.HandleFunc("POST /api/join", hub.handleJoin)
mux.HandleFunc("POST /api/invite", hub.handleInvite)
mux.HandleFunc("POST /api/publish", hub.handlePublish)
mux.HandleFunc("POST /api/kick", hub.handleKick)
mux.HandleFunc("GET /api/stream", hub.handleStream)
webSrv := &http.Server{Addr: webAddr, Handler: mux}
go func() {
if err := webSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("web server: %v", err)
}
}()
log.Printf("web UI ready: http://%s", webAddr)
log.Printf("open http://localhost:7700 in two browser tabs to try the bus")
// 4. Graceful shutdown.
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop
log.Printf("shutting down...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = webSrv.Shutdown(ctx)
hub.closeAll()
_ = ctrlSrv.Shutdown(ctx)
store.Close()
ns.Shutdown()
ns.WaitForShutdown()
log.Printf("bye")
}
// waitHealthy polls url until it returns a 2xx/3xx or the deadline elapses.
func waitHealthy(url string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
c := &http.Client{Timeout: 500 * time.Millisecond}
for time.Now().Before(deadline) {
resp, err := c.Get(url)
if err == nil {
resp.Body.Close()
if resp.StatusCode < 400 {
return nil
}
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("timeout waiting for %s", url)
}