6b162deeb0
Añade GET /api/bench (SSE) y una seccion de simulador en index.html: un publisher inunda una room con miles de mensajes a N subscribers y una grafica en vivo anima el throughput. Las dos politicas de room se exponen como flags independientes (persist=JetStream, encrypt=E2E AEAD+Ed25519) mas tamano de payload, midiendo el coste de cada capa con la libreria cliente real. El benchmark usa peers efimeros propios, sin tocar los peers nombrados del sandbox manual. Verificado: las 4 combinaciones enc x persist con fan-out exacto. Bump app v0.2.0.
854 lines
25 KiB
Go
854 lines
25 KiB
Go
// Command playground is an all-in-one, web-based sandbox for the unibus message
|
|
// bus. A single `go run ./playground` launches the entire stack embedded:
|
|
//
|
|
// - an embedded NATS server with JetStream (the data plane),
|
|
// - the membership control plane (rooms, members, sealed keys, rekey) over an
|
|
// internal HTTP server,
|
|
// - the media blob store, and
|
|
// - a browser-facing web UI on :7700.
|
|
//
|
|
// The browser never speaks NATS. The Go server is the actual bus peer: it holds
|
|
// one unibus client per named peer, subscribes to rooms on the peer's behalf,
|
|
// and streams received messages to the browser over Server-Sent Events. The
|
|
// browser drives everything with plain fetch() + EventSource() — no build step,
|
|
// no JS framework, no external libraries.
|
|
//
|
|
// This is a playground (see .claude/rules/playgrounds.md): it lives inside the
|
|
// unibus app, reuses the parent module (no new go.mod), is not indexed, and
|
|
// stores ephemeral state under playground/local_files/.
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
_ "embed"
|
|
|
|
cs "fn-registry/functions/cybersecurity"
|
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
|
"github.com/enmanuel/unibus/pkg/client"
|
|
"github.com/enmanuel/unibus/pkg/embeddednats"
|
|
"github.com/enmanuel/unibus/pkg/frame"
|
|
"github.com/enmanuel/unibus/pkg/membership"
|
|
"github.com/enmanuel/unibus/pkg/room"
|
|
)
|
|
|
|
// Fixed ports (verified free before assignment — do not change without reason).
|
|
const (
|
|
webAddr = "127.0.0.1:7700" // browser-facing web UI
|
|
ctrlAddr = "127.0.0.1:8480" // internal membership control plane
|
|
ctrlURL = "http://" + ctrlAddr
|
|
natsPort = 4260 // internal embedded NATS
|
|
natsURL = "nats://127.0.0.1:4260"
|
|
localFiles = "playground/local_files"
|
|
)
|
|
|
|
//go:embed index.html
|
|
var indexHTML []byte
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Event: a message received by a peer on one of its subscribed rooms. Fanned
|
|
// out to every SSE listener attached to that peer.
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type Event struct {
|
|
RoomID string `json:"room_id"`
|
|
Subject string `json:"subject"`
|
|
Sender string `json:"sender"`
|
|
Text string `json:"text"`
|
|
Encrypted bool `json:"encrypted"`
|
|
TS int64 `json:"ts"` // unix millis
|
|
}
|
|
|
|
// roomInfo caches the per-room metadata a peer needs to label incoming frames.
|
|
type roomInfo struct {
|
|
subject string
|
|
encrypt bool
|
|
}
|
|
|
|
// peerState holds everything about one named peer: its bus client, its public
|
|
// endpoint, its live subscriptions, the rooms it knows, and the set of SSE
|
|
// listener channels currently attached to it.
|
|
type peerState struct {
|
|
name string
|
|
client *client.Client
|
|
endpoint client.Endpoint
|
|
|
|
mu sync.Mutex
|
|
subs map[string]*client.Sub // roomID -> subscription
|
|
rooms map[string]roomInfo // roomID -> subject/encrypt
|
|
listeners map[chan Event]struct{} // attached SSE channels
|
|
}
|
|
|
|
// emit fans an event out to all attached listeners without blocking on a slow
|
|
// or disconnected consumer.
|
|
func (p *peerState) emit(ev Event) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
for ch := range p.listeners {
|
|
select {
|
|
case ch <- ev:
|
|
default: // listener buffer full: drop rather than block the NATS callback
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *peerState) addListener(ch chan Event) {
|
|
p.mu.Lock()
|
|
p.listeners[ch] = struct{}{}
|
|
p.mu.Unlock()
|
|
}
|
|
|
|
func (p *peerState) removeListener(ch chan Event) {
|
|
p.mu.Lock()
|
|
delete(p.listeners, ch)
|
|
p.mu.Unlock()
|
|
}
|
|
|
|
func (p *peerState) setRoom(roomID string, info roomInfo) {
|
|
p.mu.Lock()
|
|
p.rooms[roomID] = info
|
|
p.mu.Unlock()
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Hub: the registry of peers, protected by a single mutex.
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type Hub struct {
|
|
mu sync.Mutex
|
|
peers map[string]*peerState
|
|
}
|
|
|
|
func newHub() *Hub { return &Hub{peers: map[string]*peerState{}} }
|
|
|
|
// getOrCreate returns the peer for name, creating its identity + bus client on
|
|
// first use. Identities persist to playground/local_files/<name>.id so a peer
|
|
// keeps the same endpoint across restarts.
|
|
func (h *Hub) getOrCreate(name string) (*peerState, error) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if p, ok := h.peers[name]; ok {
|
|
return p, nil
|
|
}
|
|
idPath := filepath.Join(localFiles, name+".id")
|
|
id, err := client.LoadOrCreateIdentity(idPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("identity for %q: %w", name, err)
|
|
}
|
|
c, err := client.New(natsURL, ctrlURL, id)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("client for %q: %w", name, err)
|
|
}
|
|
p := &peerState{
|
|
name: name,
|
|
client: c,
|
|
endpoint: c.Endpoint(),
|
|
subs: map[string]*client.Sub{},
|
|
rooms: map[string]roomInfo{},
|
|
listeners: map[chan Event]struct{}{},
|
|
}
|
|
h.peers[name] = p
|
|
return p, nil
|
|
}
|
|
|
|
// lookup returns an already-created peer or false.
|
|
func (h *Hub) lookup(name string) (*peerState, bool) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
p, ok := h.peers[name]
|
|
return p, ok
|
|
}
|
|
|
|
// list returns a snapshot of all peers (name + endpoint id).
|
|
func (h *Hub) list() []map[string]string {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
out := make([]map[string]string, 0, len(h.peers))
|
|
for name, p := range h.peers {
|
|
out = append(out, map[string]string{"name": name, "endpoint_id": p.endpoint.ID})
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (h *Hub) closeAll() {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
for _, p := range h.peers {
|
|
p.mu.Lock()
|
|
for _, sub := range p.subs {
|
|
_ = sub.Unsubscribe()
|
|
}
|
|
p.mu.Unlock()
|
|
_ = p.client.Close()
|
|
}
|
|
}
|
|
|
|
// subscribeRoom subscribes the peer to a room (idempotent) and wires the frame
|
|
// handler to fan incoming messages out as Events. info labels each event with
|
|
// the room's subject and encryption flag.
|
|
func (p *peerState) subscribeRoom(roomID string, info roomInfo) error {
|
|
p.mu.Lock()
|
|
if _, already := p.subs[roomID]; already {
|
|
p.mu.Unlock()
|
|
return nil
|
|
}
|
|
p.mu.Unlock()
|
|
|
|
sub, err := p.client.Subscribe(roomID, func(f frame.Frame, plaintext []byte) {
|
|
p.emit(Event{
|
|
RoomID: roomID,
|
|
Subject: info.subject,
|
|
Sender: f.Sender,
|
|
Text: string(plaintext),
|
|
Encrypted: info.encrypt,
|
|
TS: time.Now().UnixMilli(),
|
|
})
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("subscribe room %s: %w", roomID, err)
|
|
}
|
|
p.mu.Lock()
|
|
p.subs[roomID] = sub
|
|
p.mu.Unlock()
|
|
p.setRoom(roomID, info)
|
|
return nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Control-plane helper: fetch a room's subject + policy from membershipd. The
|
|
// client package keeps fetchRoom private, so the playground talks to the
|
|
// control plane directly (read endpoints are unauthenticated by design).
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type ctrlRoomResp struct {
|
|
Subject string `json:"subject"`
|
|
Epoch int `json:"epoch"`
|
|
Policy struct {
|
|
Encrypt bool `json:"encrypt"`
|
|
Persist bool `json:"persist"`
|
|
SignMsgs bool `json:"sign_msgs"`
|
|
} `json:"policy"`
|
|
}
|
|
|
|
func fetchRoomInfo(roomID string) (roomInfo, error) {
|
|
resp, err := http.Get(ctrlURL + "/rooms/" + roomID)
|
|
if err != nil {
|
|
return roomInfo{}, fmt.Errorf("fetch room %s: %w", roomID, err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return roomInfo{}, fmt.Errorf("room %s not found (status %d)", roomID, resp.StatusCode)
|
|
}
|
|
var r ctrlRoomResp
|
|
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
|
|
return roomInfo{}, fmt.Errorf("decode room %s: %w", roomID, err)
|
|
}
|
|
return roomInfo{subject: r.Subject, encrypt: r.Policy.Encrypt}, nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// HTTP handlers (web UI on :7700).
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func writeJSON(w http.ResponseWriter, code int, v any) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(code)
|
|
_ = json.NewEncoder(w).Encode(v)
|
|
}
|
|
|
|
func writeErr(w http.ResponseWriter, code int, msg string) {
|
|
writeJSON(w, code, map[string]string{"error": msg})
|
|
}
|
|
|
|
func decodeBody(r *http.Request, out any) error {
|
|
defer r.Body.Close()
|
|
return json.NewDecoder(r.Body).Decode(out)
|
|
}
|
|
|
|
func (h *Hub) handleIndex(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path != "/" {
|
|
http.NotFound(w, r)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
_, _ = w.Write(indexHTML)
|
|
}
|
|
|
|
func (h *Hub) handlePeer(w http.ResponseWriter, r *http.Request) {
|
|
var req struct {
|
|
Name string `json:"name"`
|
|
}
|
|
if err := decodeBody(r, &req); err != nil || req.Name == "" {
|
|
writeErr(w, http.StatusBadRequest, "name required")
|
|
return
|
|
}
|
|
p, err := h.getOrCreate(req.Name)
|
|
if err != nil {
|
|
writeErr(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"name": p.name, "endpoint_id": p.endpoint.ID})
|
|
}
|
|
|
|
func (h *Hub) handlePeers(w http.ResponseWriter, r *http.Request) {
|
|
writeJSON(w, http.StatusOK, h.list())
|
|
}
|
|
|
|
func (h *Hub) handleRoom(w http.ResponseWriter, r *http.Request) {
|
|
var req struct {
|
|
Peer string `json:"peer"`
|
|
Subject string `json:"subject"`
|
|
Encrypt bool `json:"encrypt"`
|
|
Persist bool `json:"persist"`
|
|
}
|
|
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.Subject == "" {
|
|
writeErr(w, http.StatusBadRequest, "peer and subject required")
|
|
return
|
|
}
|
|
p, ok := h.lookup(req.Peer)
|
|
if !ok {
|
|
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
|
|
return
|
|
}
|
|
// The two checkboxes map to an explicit per-room policy. encrypt drives both
|
|
// encryption and per-message signing; persist (default false) independently
|
|
// toggles durable JetStream history. persist=false keeps plain ephemeral NATS.
|
|
policy := room.Policy{Encrypt: req.Encrypt, Persist: req.Persist, SignMsgs: req.Encrypt}
|
|
roomID, err := p.client.CreateRoom(req.Subject, policy)
|
|
if err != nil {
|
|
writeErr(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
info := roomInfo{subject: req.Subject, encrypt: req.Encrypt}
|
|
if err := p.subscribeRoom(roomID, info); err != nil {
|
|
writeErr(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"room_id": roomID, "subject": req.Subject, "encrypt": req.Encrypt, "persist": req.Persist,
|
|
})
|
|
}
|
|
|
|
func (h *Hub) handleJoin(w http.ResponseWriter, r *http.Request) {
|
|
var req struct {
|
|
Peer string `json:"peer"`
|
|
RoomID string `json:"room_id"`
|
|
}
|
|
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" {
|
|
writeErr(w, http.StatusBadRequest, "peer and room_id required")
|
|
return
|
|
}
|
|
p, ok := h.lookup(req.Peer)
|
|
if !ok {
|
|
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
|
|
return
|
|
}
|
|
if err := p.client.Join(req.RoomID); err != nil {
|
|
writeErr(w, http.StatusBadRequest, "join failed: "+err.Error())
|
|
return
|
|
}
|
|
info, err := fetchRoomInfo(req.RoomID)
|
|
if err != nil {
|
|
writeErr(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
if err := p.subscribeRoom(req.RoomID, info); err != nil {
|
|
writeErr(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"subject": info.subject, "encrypt": info.encrypt})
|
|
}
|
|
|
|
func (h *Hub) handleInvite(w http.ResponseWriter, r *http.Request) {
|
|
var req struct {
|
|
Peer string `json:"peer"`
|
|
RoomID string `json:"room_id"`
|
|
Target string `json:"target"`
|
|
}
|
|
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" || req.Target == "" {
|
|
writeErr(w, http.StatusBadRequest, "peer, room_id and target required")
|
|
return
|
|
}
|
|
p, ok := h.lookup(req.Peer)
|
|
if !ok {
|
|
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
|
|
return
|
|
}
|
|
target, ok := h.lookup(req.Target)
|
|
if !ok {
|
|
writeErr(w, http.StatusBadRequest, "target peer "+req.Target+" does not exist; connect it first")
|
|
return
|
|
}
|
|
if err := p.client.Invite(req.RoomID, target.endpoint); err != nil {
|
|
writeErr(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "invited", "target": req.Target})
|
|
}
|
|
|
|
func (h *Hub) handlePublish(w http.ResponseWriter, r *http.Request) {
|
|
var req struct {
|
|
Peer string `json:"peer"`
|
|
RoomID string `json:"room_id"`
|
|
Text string `json:"text"`
|
|
}
|
|
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" {
|
|
writeErr(w, http.StatusBadRequest, "peer and room_id required")
|
|
return
|
|
}
|
|
p, ok := h.lookup(req.Peer)
|
|
if !ok {
|
|
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
|
|
return
|
|
}
|
|
if err := p.client.Publish(req.RoomID, []byte(req.Text)); err != nil {
|
|
writeErr(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "published"})
|
|
}
|
|
|
|
func (h *Hub) handleKick(w http.ResponseWriter, r *http.Request) {
|
|
var req struct {
|
|
Peer string `json:"peer"`
|
|
RoomID string `json:"room_id"`
|
|
Target string `json:"target"`
|
|
}
|
|
if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" || req.Target == "" {
|
|
writeErr(w, http.StatusBadRequest, "peer, room_id and target required")
|
|
return
|
|
}
|
|
p, ok := h.lookup(req.Peer)
|
|
if !ok {
|
|
writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer)
|
|
return
|
|
}
|
|
target, ok := h.lookup(req.Target)
|
|
if !ok {
|
|
writeErr(w, http.StatusBadRequest, "target peer "+req.Target+" does not exist")
|
|
return
|
|
}
|
|
if err := p.client.Kick(req.RoomID, target.endpoint.ID); err != nil {
|
|
writeErr(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "kicked", "target": req.Target})
|
|
}
|
|
|
|
// handleStream is the SSE endpoint. The browser opens one EventSource per peer;
|
|
// each received Event is emitted as a `data: <json>\n\n` block. The listener is
|
|
// cleaned up when the HTTP request context is cancelled (tab closed / reload).
|
|
func (h *Hub) handleStream(w http.ResponseWriter, r *http.Request) {
|
|
name := r.URL.Query().Get("peer")
|
|
if name == "" {
|
|
writeErr(w, http.StatusBadRequest, "peer query param required")
|
|
return
|
|
}
|
|
p, ok := h.lookup(name)
|
|
if !ok {
|
|
writeErr(w, http.StatusBadRequest, "unknown peer "+name)
|
|
return
|
|
}
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
writeErr(w, http.StatusInternalServerError, "streaming unsupported")
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
|
|
ch := make(chan Event, 64)
|
|
p.addListener(ch)
|
|
defer p.removeListener(ch)
|
|
|
|
// Initial comment so the browser marks the stream open immediately.
|
|
fmt.Fprintf(w, ": connected to %s\n\n", name)
|
|
flusher.Flush()
|
|
|
|
ctx := r.Context()
|
|
ping := time.NewTicker(20 * time.Second)
|
|
defer ping.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ping.C:
|
|
fmt.Fprintf(w, ": ping\n\n")
|
|
flusher.Flush()
|
|
case ev := <-ch:
|
|
b, err := json.Marshal(ev)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
fmt.Fprintf(w, "data: %s\n\n", b)
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Benchmark: one publisher floods a room with thousands of messages that N
|
|
// subscribers receive. The two policy axes are exposed as independent flags:
|
|
// encrypt (AEAD payload + Ed25519 per-message signature) and persist (durable
|
|
// JetStream history vs ephemeral core NATS). Payload size is configurable. The
|
|
// benchmark uses its own ephemeral peers (not the hub's named peers) so it never
|
|
// interferes with the manual sandbox, and streams progress samples over SSE so
|
|
// the browser can animate a live throughput chart.
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// benchSample is one Server-Sent Event of a running benchmark.
|
|
type benchSample struct {
|
|
Type string `json:"type"` // "start" | "sample" | "done" | "error"
|
|
T float64 `json:"t"`
|
|
Sent int64 `json:"sent"`
|
|
Recv int64 `json:"recv"`
|
|
NMsgs int `json:"n_msgs,omitempty"`
|
|
NSubs int `json:"n_subs,omitempty"`
|
|
Payload int `json:"payload,omitempty"`
|
|
Encrypt bool `json:"encrypt,omitempty"`
|
|
Persist bool `json:"persist,omitempty"`
|
|
Capped bool `json:"capped,omitempty"`
|
|
PubTps int64 `json:"pub_tps,omitempty"`
|
|
RecvTps int64 `json:"recv_tps,omitempty"`
|
|
PerSub []int64 `json:"per_sub,omitempty"`
|
|
Msg string `json:"msg,omitempty"`
|
|
}
|
|
|
|
// runBench wires up one publisher + nSubs subscribers, publishes nMsgs payloads,
|
|
// and calls emit periodically with the running totals. emit is only ever called
|
|
// from the calling goroutine (the SSE handler), so it needs no locking.
|
|
func runBench(ctx context.Context, emit func(benchSample), nMsgs, nSubs, payloadBytes int, encrypt, persist bool) {
|
|
policy := room.Policy{Encrypt: encrypt, Persist: persist, SignMsgs: encrypt}
|
|
subject := fmt.Sprintf("bench.%d", time.Now().UnixNano())
|
|
|
|
newPeer := func() (*client.Client, error) {
|
|
id, err := cs.GenerateIdentity()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return client.New(natsURL, ctrlURL, id)
|
|
}
|
|
|
|
pub, err := newPeer()
|
|
if err != nil {
|
|
emit(benchSample{Type: "error", Msg: "publisher: " + err.Error()})
|
|
return
|
|
}
|
|
defer pub.Close()
|
|
|
|
roomID, err := pub.CreateRoom(subject, policy)
|
|
if err != nil {
|
|
emit(benchSample{Type: "error", Msg: "create room: " + err.Error()})
|
|
return
|
|
}
|
|
|
|
counters := make([]int64, nSubs)
|
|
subClients := make([]*client.Client, 0, nSubs)
|
|
defer func() {
|
|
for _, c := range subClients {
|
|
_ = c.Close()
|
|
}
|
|
}()
|
|
|
|
// One room, N subscribers. For encrypted rooms each subscriber must be invited
|
|
// (sealed key) and join before subscribing; for cleartext rooms Subscribe on
|
|
// the shared roomID is enough.
|
|
for i := 0; i < nSubs; i++ {
|
|
c, err := newPeer()
|
|
if err != nil {
|
|
emit(benchSample{Type: "error", Msg: fmt.Sprintf("subscriber %d: %v", i, err)})
|
|
return
|
|
}
|
|
subClients = append(subClients, c)
|
|
if encrypt {
|
|
if err := pub.Invite(roomID, c.Endpoint()); err != nil {
|
|
emit(benchSample{Type: "error", Msg: fmt.Sprintf("invite %d: %v", i, err)})
|
|
return
|
|
}
|
|
if err := c.Join(roomID); err != nil {
|
|
emit(benchSample{Type: "error", Msg: fmt.Sprintf("join %d: %v", i, err)})
|
|
return
|
|
}
|
|
}
|
|
idx := i
|
|
if _, err := c.Subscribe(roomID, func(_ frame.Frame, _ []byte) {
|
|
atomic.AddInt64(&counters[idx], 1)
|
|
}); err != nil {
|
|
emit(benchSample{Type: "error", Msg: fmt.Sprintf("subscribe %d: %v", i, err)})
|
|
return
|
|
}
|
|
}
|
|
|
|
sumRecv := func() int64 {
|
|
var s int64
|
|
for i := range counters {
|
|
s += atomic.LoadInt64(&counters[i])
|
|
}
|
|
return s
|
|
}
|
|
|
|
payload := bytes.Repeat([]byte{'x'}, payloadBytes)
|
|
var sent int64
|
|
|
|
emit(benchSample{Type: "start", NMsgs: nMsgs, NSubs: nSubs, Payload: payloadBytes, Encrypt: encrypt, Persist: persist})
|
|
|
|
t0 := time.Now()
|
|
done := make(chan struct{})
|
|
var pubErr atomic.Value
|
|
go func() {
|
|
defer close(done)
|
|
for k := 0; k < nMsgs; k++ {
|
|
if err := pub.Publish(roomID, payload); err != nil {
|
|
pubErr.Store(err)
|
|
return
|
|
}
|
|
atomic.AddInt64(&sent, 1)
|
|
if k%256 == 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
ticker := time.NewTicker(60 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
deadline := time.After(120 * time.Second)
|
|
target := int64(nMsgs) * int64(nSubs)
|
|
|
|
sampleLoop:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-deadline:
|
|
break sampleLoop
|
|
case <-done:
|
|
break sampleLoop
|
|
case <-ticker.C:
|
|
emit(benchSample{Type: "sample", T: time.Since(t0).Seconds(), Sent: atomic.LoadInt64(&sent), Recv: sumRecv()})
|
|
}
|
|
}
|
|
if v := pubErr.Load(); v != nil {
|
|
emit(benchSample{Type: "error", Msg: "publish: " + v.(error).Error()})
|
|
return
|
|
}
|
|
|
|
// Final drain: keep sampling until every subscriber has caught up (or we give up).
|
|
for i := 0; i < 240; i++ {
|
|
if sumRecv() >= target {
|
|
break
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(25 * time.Millisecond):
|
|
}
|
|
emit(benchSample{Type: "sample", T: time.Since(t0).Seconds(), Sent: atomic.LoadInt64(&sent), Recv: sumRecv()})
|
|
}
|
|
|
|
dur := time.Since(t0).Seconds()
|
|
finalSent := atomic.LoadInt64(&sent)
|
|
finalRecv := sumRecv()
|
|
per := make([]int64, nSubs)
|
|
for i := range counters {
|
|
per[i] = atomic.LoadInt64(&counters[i])
|
|
}
|
|
var pubTps, recvTps int64
|
|
if dur > 0 {
|
|
pubTps = int64(float64(finalSent) / dur)
|
|
recvTps = int64(float64(finalRecv) / dur)
|
|
}
|
|
emit(benchSample{Type: "done", T: dur, Sent: finalSent, Recv: finalRecv, PerSub: per, PubTps: pubTps, RecvTps: recvTps, NSubs: nSubs})
|
|
}
|
|
|
|
// handleBench is the SSE endpoint that drives a benchmark from query params:
|
|
//
|
|
// GET /api/bench?n_msgs=20000&n_subs=3&payload=128&encrypt=0&persist=0
|
|
//
|
|
// Encrypted/persistent runs are capped to a lower message count (the per-message
|
|
// crypto + JetStream ack make them far slower); the cap is reported in the start
|
|
// sample so the UI can show it.
|
|
func (h *Hub) handleBench(w http.ResponseWriter, r *http.Request) {
|
|
q := r.URL.Query()
|
|
atoiDef := func(k string, def int) int {
|
|
if v, err := strconv.Atoi(q.Get(k)); err == nil {
|
|
return v
|
|
}
|
|
return def
|
|
}
|
|
truthy := func(k string) bool { v := q.Get(k); return v == "1" || v == "true" }
|
|
|
|
nMsgs := atoiDef("n_msgs", 20000)
|
|
nSubs := atoiDef("n_subs", 3)
|
|
payload := atoiDef("payload", 128)
|
|
encrypt := truthy("encrypt")
|
|
persist := truthy("persist")
|
|
|
|
if nSubs < 1 {
|
|
nSubs = 1
|
|
} else if nSubs > 16 {
|
|
nSubs = 16
|
|
}
|
|
if payload < 1 {
|
|
payload = 1
|
|
} else if payload > 8192 {
|
|
payload = 8192
|
|
}
|
|
if nMsgs < 100 {
|
|
nMsgs = 100
|
|
}
|
|
maxMsgs := 200000
|
|
if encrypt || persist {
|
|
maxMsgs = 30000 // crypto + JetStream ack are much slower; keep the run bounded
|
|
}
|
|
capped := false
|
|
if nMsgs > maxMsgs {
|
|
nMsgs, capped = maxMsgs, true
|
|
}
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
writeErr(w, http.StatusInternalServerError, "streaming unsupported")
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
fmt.Fprintf(w, ": bench start\n\n")
|
|
flusher.Flush()
|
|
|
|
emit := func(s benchSample) {
|
|
if s.Type == "start" {
|
|
s.Capped = capped
|
|
}
|
|
b, err := json.Marshal(s)
|
|
if err != nil {
|
|
return
|
|
}
|
|
fmt.Fprintf(w, "data: %s\n\n", b)
|
|
flusher.Flush()
|
|
}
|
|
|
|
runBench(r.Context(), emit, nMsgs, nSubs, payload, encrypt, persist)
|
|
fmt.Fprintf(w, "event: end\ndata: {}\n\n")
|
|
flusher.Flush()
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// main: bring up NATS, control plane, and the web server; tear them all down
|
|
// cleanly on signal.
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func main() {
|
|
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
|
log.SetPrefix("[playground] ")
|
|
|
|
if err := os.MkdirAll(localFiles, 0o755); err != nil {
|
|
log.Fatalf("mkdir %s: %v", localFiles, err)
|
|
}
|
|
|
|
// 1. Data plane: embedded NATS + JetStream on the fixed internal port.
|
|
ns, err := embeddednats.Start(filepath.Join(localFiles, "js"), natsPort)
|
|
if err != nil {
|
|
log.Fatalf("start embedded nats: %v", err)
|
|
}
|
|
log.Printf("embedded NATS (JetStream) ready: %s", embeddednats.ClientURL(ns))
|
|
|
|
// 2. Control plane: membership store + blob store + internal HTTP server.
|
|
store, err := membership.Open(filepath.Join(localFiles, "play.db"))
|
|
if err != nil {
|
|
ns.Shutdown()
|
|
log.Fatalf("open membership store: %v", err)
|
|
}
|
|
blobs, err := blobstore.New(filepath.Join(localFiles, "blobs"))
|
|
if err != nil {
|
|
store.Close()
|
|
ns.Shutdown()
|
|
log.Fatalf("open blob store: %v", err)
|
|
}
|
|
ctrlSrv := &http.Server{Addr: ctrlAddr, Handler: membership.NewServer(store, blobs)}
|
|
go func() {
|
|
if err := ctrlSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
log.Fatalf("control plane: %v", err)
|
|
}
|
|
}()
|
|
if err := waitHealthy(ctrlURL+"/healthz", 5*time.Second); err != nil {
|
|
log.Fatalf("control plane not healthy: %v", err)
|
|
}
|
|
log.Printf("control plane ready: %s", ctrlURL)
|
|
|
|
// 3. Web UI on :7700.
|
|
hub := newHub()
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/", hub.handleIndex)
|
|
mux.HandleFunc("POST /api/peer", hub.handlePeer)
|
|
mux.HandleFunc("GET /api/peers", hub.handlePeers)
|
|
mux.HandleFunc("POST /api/room", hub.handleRoom)
|
|
mux.HandleFunc("POST /api/join", hub.handleJoin)
|
|
mux.HandleFunc("POST /api/invite", hub.handleInvite)
|
|
mux.HandleFunc("POST /api/publish", hub.handlePublish)
|
|
mux.HandleFunc("POST /api/kick", hub.handleKick)
|
|
mux.HandleFunc("GET /api/stream", hub.handleStream)
|
|
mux.HandleFunc("GET /api/bench", hub.handleBench)
|
|
webSrv := &http.Server{Addr: webAddr, Handler: mux}
|
|
go func() {
|
|
if err := webSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
log.Fatalf("web server: %v", err)
|
|
}
|
|
}()
|
|
log.Printf("web UI ready: http://%s", webAddr)
|
|
log.Printf("open http://localhost:7700 in two browser tabs to try the bus")
|
|
|
|
// 4. Graceful shutdown.
|
|
stop := make(chan os.Signal, 1)
|
|
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
|
|
<-stop
|
|
log.Printf("shutting down...")
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_ = webSrv.Shutdown(ctx)
|
|
hub.closeAll()
|
|
_ = ctrlSrv.Shutdown(ctx)
|
|
store.Close()
|
|
ns.Shutdown()
|
|
ns.WaitForShutdown()
|
|
log.Printf("bye")
|
|
}
|
|
|
|
// waitHealthy polls url until it returns a 2xx/3xx or the deadline elapses.
|
|
func waitHealthy(url string, timeout time.Duration) error {
|
|
deadline := time.Now().Add(timeout)
|
|
c := &http.Client{Timeout: 500 * time.Millisecond}
|
|
for time.Now().Before(deadline) {
|
|
resp, err := c.Get(url)
|
|
if err == nil {
|
|
resp.Body.Close()
|
|
if resp.StatusCode < 400 {
|
|
return nil
|
|
}
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
return fmt.Errorf("timeout waiting for %s", url)
|
|
}
|