diff --git a/playground/README.md b/playground/README.md new file mode 100644 index 0000000..e8f6284 --- /dev/null +++ b/playground/README.md @@ -0,0 +1,85 @@ +# unibus playground + +An all-in-one, web-based sandbox for the **unibus** message bus. One command +brings up the entire stack embedded — no NATS to install, no services to wire — +and a browser UI lets you exercise the bus visually: create peers, create and +join rooms (cleartext or end-to-end encrypted), invite, publish, watch messages +arrive live, and kick members (forward secrecy). + +This is a **playground** (see `.claude/rules/playgrounds.md`): it lives inside +the `unibus` app, reuses the parent Go module (no separate `go.mod`), is not +indexed, and keeps all runtime state under `playground/local_files/` (ephemeral, +safe to delete). + +## Run + +From the `unibus` app directory: + +```bash +cd /home/enmanuel/fn_registry/projects/message_bus/apps/unibus +go run ./playground +``` + +Then open **http://localhost:7700** in your browser. + +Stop with `Ctrl-C` — the server tears down the web UI, every bus client, the +control plane, and the embedded NATS cleanly (no orphaned processes). + +## Architecture + +The browser never speaks NATS. The Go server is the actual bus peer: + +``` +browser ──fetch/SSE──▶ playground server (:7700) + │ holds one unibus client per named peer + ├──HTTP──▶ membership control plane (127.0.0.1:8480) + └──NATS──▶ embedded NATS + JetStream (:4260) +``` + +- **:7700** — web UI (the only browser-facing port). +- **127.0.0.1:8480** — membership control plane (rooms, members, sealed keys, + rekey, blobs). Internal only. +- **:4260** — embedded NATS + JetStream (the data plane). Internal only. + +Each named peer gets its own long-term identity, persisted to +`playground/local_files/.id`, so a peer keeps the same endpoint across +restarts. When a peer creates or joins a room, the server subscribes on its +behalf and streams every received frame to that peer's open browser tabs over +Server-Sent Events. + +The playground only orchestrates the public unibus client API +(`CreateRoom`, `Join`, `Subscribe`, `Publish`, `Invite`, `Kick`); it never +reimplements bus or crypto logic. + +## Try it: 2 peers + encryption + kick + +1. Open **two browser tabs** on http://localhost:7700. +2. Tab A: type `alice`, click **Connect**. +3. Tab B: type `bob`, click **Connect**. +4. Tab A (alice): type a subject like `room.general`, tick **🔒 encrypted + (E2E)**, click **Create room**. Copy the resulting `room_id`. +5. Tab A (alice): in the Action panel, pick `bob` as the target peer (use the + ↻ button to refresh the peer list if needed) and click **Invite to this + room**. +6. Tab B (bob): paste the `room_id` into the join field and click **Join**. +7. Type messages in **both** tabs and hit Send — each message appears live in + both tabs, tagged with subject, sender, time, and 🔒 (encrypted) or `clear`. +8. Tab A (alice): click **Kick from this room** with `bob` selected. The room + key rotates to a new epoch. New messages alice sends are no longer visible to + bob — **forward secrecy**: bob no longer holds the current key. + +Cleartext rooms (leave the checkbox unticked) behave like plain NATS fan-out: +fast, ephemeral, unsigned. Encrypted rooms are the Matrix-like mode: E2E +encrypted, persisted, and per-message signed. + +## State / cleanup + +All writable state lives under `playground/local_files/`: + +- `.id` — per-peer identity (private keys; treat like an SSH key). +- `play.db` — membership store (rooms, members, sealed keys). +- `blobs/` — media blob store. +- `js/` — embedded JetStream store. + +Delete the whole `playground/local_files/` directory to reset to a clean slate. +It is gitignored and never distributed. diff --git a/playground/index.html b/playground/index.html new file mode 100644 index 0000000..9660a93 --- /dev/null +++ b/playground/index.html @@ -0,0 +1,449 @@ + + + + + +unibus playground + + + +
+

unibus playground

+ embedded NATS + JetStream · E2E rooms · forward secrecy · SSE +
+ +
+ +
+
+

1 · Identity

+ +
+ + +
+
+
+
+ +
+

2 · Rooms

+ + +
+ + +
+ +
+ + + +
+
+ +
+

3 · Action

+ + + +
+ + +
+
+ +
+ + +
+ + +
+
+
+ + +
+
+

Live messages disconnected

+
+
+
+ ⓘ How to try it
+ Open 2 tabs. Connect as alice in one and bob in the other. + In alice: create a 🔒 encrypted room, copy the room_id, + then pick bob as target and Invite to this room. + In bob: paste that room_id and Join. + Type in both → messages appear live on each side. + In alice: Kick bob → bob stops seeing new messages (forward secrecy: the room + key rotates and bob no longer holds it). +
+
+
+ + + + diff --git a/playground/server.go b/playground/server.go new file mode 100644 index 0000000..1cfd61f --- /dev/null +++ b/playground/server.go @@ -0,0 +1,605 @@ +// Command playground is an all-in-one, web-based sandbox for the unibus message +// bus. A single `go run ./playground` launches the entire stack embedded: +// +// - an embedded NATS server with JetStream (the data plane), +// - the membership control plane (rooms, members, sealed keys, rekey) over an +// internal HTTP server, +// - the media blob store, and +// - a browser-facing web UI on :7700. +// +// The browser never speaks NATS. The Go server is the actual bus peer: it holds +// one unibus client per named peer, subscribes to rooms on the peer's behalf, +// and streams received messages to the browser over Server-Sent Events. The +// browser drives everything with plain fetch() + EventSource() — no build step, +// no JS framework, no external libraries. +// +// This is a playground (see .claude/rules/playgrounds.md): it lives inside the +// unibus app, reuses the parent module (no new go.mod), is not indexed, and +// stores ephemeral state under playground/local_files/. +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "path/filepath" + "sync" + "syscall" + "time" + + _ "embed" + + "github.com/nats-io/nats.go" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/enmanuel/unibus/pkg/room" +) + +// Fixed ports (verified free before assignment — do not change without reason). +const ( + webAddr = "127.0.0.1:7700" // browser-facing web UI + ctrlAddr = "127.0.0.1:8480" // internal membership control plane + ctrlURL = "http://" + ctrlAddr + natsPort = 4260 // internal embedded NATS + natsURL = "nats://127.0.0.1:4260" + localFiles = "playground/local_files" +) + +//go:embed index.html +var indexHTML []byte + +// --------------------------------------------------------------------------- +// Event: a message received by a peer on one of its subscribed rooms. Fanned +// out to every SSE listener attached to that peer. +// --------------------------------------------------------------------------- + +type Event struct { + RoomID string `json:"room_id"` + Subject string `json:"subject"` + Sender string `json:"sender"` + Text string `json:"text"` + Encrypted bool `json:"encrypted"` + TS int64 `json:"ts"` // unix millis +} + +// roomInfo caches the per-room metadata a peer needs to label incoming frames. +type roomInfo struct { + subject string + encrypt bool +} + +// peerState holds everything about one named peer: its bus client, its public +// endpoint, its live subscriptions, the rooms it knows, and the set of SSE +// listener channels currently attached to it. +type peerState struct { + name string + client *client.Client + endpoint client.Endpoint + + mu sync.Mutex + subs map[string]*nats.Subscription // roomID -> subscription + rooms map[string]roomInfo // roomID -> subject/encrypt + listeners map[chan Event]struct{} // attached SSE channels +} + +// emit fans an event out to all attached listeners without blocking on a slow +// or disconnected consumer. +func (p *peerState) emit(ev Event) { + p.mu.Lock() + defer p.mu.Unlock() + for ch := range p.listeners { + select { + case ch <- ev: + default: // listener buffer full: drop rather than block the NATS callback + } + } +} + +func (p *peerState) addListener(ch chan Event) { + p.mu.Lock() + p.listeners[ch] = struct{}{} + p.mu.Unlock() +} + +func (p *peerState) removeListener(ch chan Event) { + p.mu.Lock() + delete(p.listeners, ch) + p.mu.Unlock() +} + +func (p *peerState) setRoom(roomID string, info roomInfo) { + p.mu.Lock() + p.rooms[roomID] = info + p.mu.Unlock() +} + +func (p *peerState) roomFor(roomID string) (roomInfo, bool) { + p.mu.Lock() + defer p.mu.Unlock() + info, ok := p.rooms[roomID] + return info, ok +} + +// --------------------------------------------------------------------------- +// Hub: the registry of peers, protected by a single mutex. +// --------------------------------------------------------------------------- + +type Hub struct { + mu sync.Mutex + peers map[string]*peerState +} + +func newHub() *Hub { return &Hub{peers: map[string]*peerState{}} } + +// getOrCreate returns the peer for name, creating its identity + bus client on +// first use. Identities persist to playground/local_files/.id so a peer +// keeps the same endpoint across restarts. +func (h *Hub) getOrCreate(name string) (*peerState, error) { + h.mu.Lock() + defer h.mu.Unlock() + if p, ok := h.peers[name]; ok { + return p, nil + } + idPath := filepath.Join(localFiles, name+".id") + id, err := client.LoadOrCreateIdentity(idPath) + if err != nil { + return nil, fmt.Errorf("identity for %q: %w", name, err) + } + c, err := client.New(natsURL, ctrlURL, id) + if err != nil { + return nil, fmt.Errorf("client for %q: %w", name, err) + } + p := &peerState{ + name: name, + client: c, + endpoint: c.Endpoint(), + subs: map[string]*nats.Subscription{}, + rooms: map[string]roomInfo{}, + listeners: map[chan Event]struct{}{}, + } + h.peers[name] = p + return p, nil +} + +// lookup returns an already-created peer or false. +func (h *Hub) lookup(name string) (*peerState, bool) { + h.mu.Lock() + defer h.mu.Unlock() + p, ok := h.peers[name] + return p, ok +} + +// list returns a snapshot of all peers (name + endpoint id). +func (h *Hub) list() []map[string]string { + h.mu.Lock() + defer h.mu.Unlock() + out := make([]map[string]string, 0, len(h.peers)) + for name, p := range h.peers { + out = append(out, map[string]string{"name": name, "endpoint_id": p.endpoint.ID}) + } + return out +} + +func (h *Hub) closeAll() { + h.mu.Lock() + defer h.mu.Unlock() + for _, p := range h.peers { + p.mu.Lock() + for _, sub := range p.subs { + _ = sub.Unsubscribe() + } + p.mu.Unlock() + _ = p.client.Close() + } +} + +// subscribeRoom subscribes the peer to a room (idempotent) and wires the frame +// handler to fan incoming messages out as Events. info labels each event with +// the room's subject and encryption flag. +func (p *peerState) subscribeRoom(roomID string, info roomInfo) error { + p.mu.Lock() + if _, already := p.subs[roomID]; already { + p.mu.Unlock() + return nil + } + p.mu.Unlock() + + sub, err := p.client.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { + p.emit(Event{ + RoomID: roomID, + Subject: info.subject, + Sender: f.Sender, + Text: string(plaintext), + Encrypted: info.encrypt, + TS: time.Now().UnixMilli(), + }) + }) + if err != nil { + return fmt.Errorf("subscribe room %s: %w", roomID, err) + } + p.mu.Lock() + p.subs[roomID] = sub + p.mu.Unlock() + p.setRoom(roomID, info) + return nil +} + +// --------------------------------------------------------------------------- +// Control-plane helper: fetch a room's subject + policy from membershipd. The +// client package keeps fetchRoom private, so the playground talks to the +// control plane directly (read endpoints are unauthenticated by design). +// --------------------------------------------------------------------------- + +type ctrlRoomResp struct { + Subject string `json:"subject"` + Epoch int `json:"epoch"` + Policy struct { + Encrypt bool `json:"encrypt"` + Persist bool `json:"persist"` + SignMsgs bool `json:"sign_msgs"` + } `json:"policy"` +} + +func fetchRoomInfo(roomID string) (roomInfo, error) { + resp, err := http.Get(ctrlURL + "/rooms/" + roomID) + if err != nil { + return roomInfo{}, fmt.Errorf("fetch room %s: %w", roomID, err) + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return roomInfo{}, fmt.Errorf("room %s not found (status %d)", roomID, resp.StatusCode) + } + var r ctrlRoomResp + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return roomInfo{}, fmt.Errorf("decode room %s: %w", roomID, err) + } + return roomInfo{subject: r.Subject, encrypt: r.Policy.Encrypt}, nil +} + +// --------------------------------------------------------------------------- +// HTTP handlers (web UI on :7700). +// --------------------------------------------------------------------------- + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) +} + +func writeErr(w http.ResponseWriter, code int, msg string) { + writeJSON(w, code, map[string]string{"error": msg}) +} + +func decodeBody(r *http.Request, out any) error { + defer r.Body.Close() + return json.NewDecoder(r.Body).Decode(out) +} + +func (h *Hub) handleIndex(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + _, _ = w.Write(indexHTML) +} + +func (h *Hub) handlePeer(w http.ResponseWriter, r *http.Request) { + var req struct { + Name string `json:"name"` + } + if err := decodeBody(r, &req); err != nil || req.Name == "" { + writeErr(w, http.StatusBadRequest, "name required") + return + } + p, err := h.getOrCreate(req.Name) + if err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"name": p.name, "endpoint_id": p.endpoint.ID}) +} + +func (h *Hub) handlePeers(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, h.list()) +} + +func (h *Hub) handleRoom(w http.ResponseWriter, r *http.Request) { + var req struct { + Peer string `json:"peer"` + Subject string `json:"subject"` + Encrypt bool `json:"encrypt"` + } + if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.Subject == "" { + writeErr(w, http.StatusBadRequest, "peer and subject required") + return + } + p, ok := h.lookup(req.Peer) + if !ok { + writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) + return + } + policy := room.ModeNATS + if req.Encrypt { + policy = room.ModeMatrix + } + roomID, err := p.client.CreateRoom(req.Subject, policy) + if err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + info := roomInfo{subject: req.Subject, encrypt: req.Encrypt} + if err := p.subscribeRoom(roomID, info); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "room_id": roomID, "subject": req.Subject, "encrypt": req.Encrypt, + }) +} + +func (h *Hub) handleJoin(w http.ResponseWriter, r *http.Request) { + var req struct { + Peer string `json:"peer"` + RoomID string `json:"room_id"` + } + if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" { + writeErr(w, http.StatusBadRequest, "peer and room_id required") + return + } + p, ok := h.lookup(req.Peer) + if !ok { + writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) + return + } + if err := p.client.Join(req.RoomID); err != nil { + writeErr(w, http.StatusBadRequest, "join failed: "+err.Error()) + return + } + info, err := fetchRoomInfo(req.RoomID) + if err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + if err := p.subscribeRoom(req.RoomID, info); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{"subject": info.subject, "encrypt": info.encrypt}) +} + +func (h *Hub) handleInvite(w http.ResponseWriter, r *http.Request) { + var req struct { + Peer string `json:"peer"` + RoomID string `json:"room_id"` + Target string `json:"target"` + } + if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" || req.Target == "" { + writeErr(w, http.StatusBadRequest, "peer, room_id and target required") + return + } + p, ok := h.lookup(req.Peer) + if !ok { + writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) + return + } + target, ok := h.lookup(req.Target) + if !ok { + writeErr(w, http.StatusBadRequest, "target peer "+req.Target+" does not exist; connect it first") + return + } + if err := p.client.Invite(req.RoomID, target.endpoint); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "invited", "target": req.Target}) +} + +func (h *Hub) handlePublish(w http.ResponseWriter, r *http.Request) { + var req struct { + Peer string `json:"peer"` + RoomID string `json:"room_id"` + Text string `json:"text"` + } + if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" { + writeErr(w, http.StatusBadRequest, "peer and room_id required") + return + } + p, ok := h.lookup(req.Peer) + if !ok { + writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) + return + } + if err := p.client.Publish(req.RoomID, []byte(req.Text)); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "published"}) +} + +func (h *Hub) handleKick(w http.ResponseWriter, r *http.Request) { + var req struct { + Peer string `json:"peer"` + RoomID string `json:"room_id"` + Target string `json:"target"` + } + if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" || req.Target == "" { + writeErr(w, http.StatusBadRequest, "peer, room_id and target required") + return + } + p, ok := h.lookup(req.Peer) + if !ok { + writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) + return + } + target, ok := h.lookup(req.Target) + if !ok { + writeErr(w, http.StatusBadRequest, "target peer "+req.Target+" does not exist") + return + } + if err := p.client.Kick(req.RoomID, target.endpoint.ID); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "kicked", "target": req.Target}) +} + +// handleStream is the SSE endpoint. The browser opens one EventSource per peer; +// each received Event is emitted as a `data: \n\n` block. The listener is +// cleaned up when the HTTP request context is cancelled (tab closed / reload). +func (h *Hub) handleStream(w http.ResponseWriter, r *http.Request) { + name := r.URL.Query().Get("peer") + if name == "" { + writeErr(w, http.StatusBadRequest, "peer query param required") + return + } + p, ok := h.lookup(name) + if !ok { + writeErr(w, http.StatusBadRequest, "unknown peer "+name) + return + } + flusher, ok := w.(http.Flusher) + if !ok { + writeErr(w, http.StatusInternalServerError, "streaming unsupported") + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + ch := make(chan Event, 64) + p.addListener(ch) + defer p.removeListener(ch) + + // Initial comment so the browser marks the stream open immediately. + fmt.Fprintf(w, ": connected to %s\n\n", name) + flusher.Flush() + + ctx := r.Context() + ping := time.NewTicker(20 * time.Second) + defer ping.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ping.C: + fmt.Fprintf(w, ": ping\n\n") + flusher.Flush() + case ev := <-ch: + b, err := json.Marshal(ev) + if err != nil { + continue + } + fmt.Fprintf(w, "data: %s\n\n", b) + flusher.Flush() + } + } +} + +// --------------------------------------------------------------------------- +// main: bring up NATS, control plane, and the web server; tear them all down +// cleanly on signal. +// --------------------------------------------------------------------------- + +func main() { + log.SetFlags(log.LstdFlags | log.Lmsgprefix) + log.SetPrefix("[playground] ") + + if err := os.MkdirAll(localFiles, 0o755); err != nil { + log.Fatalf("mkdir %s: %v", localFiles, err) + } + + // 1. Data plane: embedded NATS + JetStream on the fixed internal port. + ns, err := embeddednats.Start(filepath.Join(localFiles, "js"), natsPort) + if err != nil { + log.Fatalf("start embedded nats: %v", err) + } + log.Printf("embedded NATS (JetStream) ready: %s", embeddednats.ClientURL(ns)) + + // 2. Control plane: membership store + blob store + internal HTTP server. + store, err := membership.Open(filepath.Join(localFiles, "play.db")) + if err != nil { + ns.Shutdown() + log.Fatalf("open membership store: %v", err) + } + blobs, err := blobstore.New(filepath.Join(localFiles, "blobs")) + if err != nil { + store.Close() + ns.Shutdown() + log.Fatalf("open blob store: %v", err) + } + ctrlSrv := &http.Server{Addr: ctrlAddr, Handler: membership.NewServer(store, blobs)} + go func() { + if err := ctrlSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("control plane: %v", err) + } + }() + if err := waitHealthy(ctrlURL+"/healthz", 5*time.Second); err != nil { + log.Fatalf("control plane not healthy: %v", err) + } + log.Printf("control plane ready: %s", ctrlURL) + + // 3. Web UI on :7700. + hub := newHub() + mux := http.NewServeMux() + mux.HandleFunc("/", hub.handleIndex) + mux.HandleFunc("POST /api/peer", hub.handlePeer) + mux.HandleFunc("GET /api/peers", hub.handlePeers) + mux.HandleFunc("POST /api/room", hub.handleRoom) + mux.HandleFunc("POST /api/join", hub.handleJoin) + mux.HandleFunc("POST /api/invite", hub.handleInvite) + mux.HandleFunc("POST /api/publish", hub.handlePublish) + mux.HandleFunc("POST /api/kick", hub.handleKick) + mux.HandleFunc("GET /api/stream", hub.handleStream) + webSrv := &http.Server{Addr: webAddr, Handler: mux} + go func() { + if err := webSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("web server: %v", err) + } + }() + log.Printf("web UI ready: http://%s", webAddr) + log.Printf("open http://localhost:7700 in two browser tabs to try the bus") + + // 4. Graceful shutdown. + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + <-stop + log.Printf("shutting down...") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = webSrv.Shutdown(ctx) + hub.closeAll() + _ = ctrlSrv.Shutdown(ctx) + store.Close() + ns.Shutdown() + ns.WaitForShutdown() + log.Printf("bye") +} + +// waitHealthy polls url until it returns a 2xx/3xx or the deadline elapses. +func waitHealthy(url string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + c := &http.Client{Timeout: 500 * time.Millisecond} + for time.Now().Before(deadline) { + resp, err := c.Get(url) + if err == nil { + resp.Body.Close() + if resp.StatusCode < 400 { + return nil + } + } + time.Sleep(100 * time.Millisecond) + } + return fmt.Errorf("timeout waiting for %s", url) +}