From 6b162deeb0b1749978deb41e6143d82279884695 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Wed, 3 Jun 2026 22:33:26 +0200 Subject: [PATCH] feat(playground): benchmark de rendimiento con flags JetStream/E2E/payload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Añade GET /api/bench (SSE) y una seccion de simulador en index.html: un publisher inunda una room con miles de mensajes a N subscribers y una grafica en vivo anima el throughput. Las dos politicas de room se exponen como flags independientes (persist=JetStream, encrypt=E2E AEAD+Ed25519) mas tamano de payload, midiendo el coste de cada capa con la libreria cliente real. El benchmark usa peers efimeros propios, sin tocar los peers nombrados del sandbox manual. Verificado: las 4 combinaciones enc x persist con fan-out exacto. Bump app v0.2.0. --- app.md | 12 +- playground/README.md | 34 ++++++ playground/index.html | 137 ++++++++++++++++++++++ playground/server.go | 256 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 438 insertions(+), 1 deletion(-) diff --git a/app.md b/app.md index 90ba159..783f9b0 100644 --- a/app.md +++ b/app.md @@ -2,7 +2,7 @@ name: unibus lang: go domain: infra -version: 0.1.0 +version: 0.2.0 description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo." tags: [service, messaging, nats, e2e] uses_functions: @@ -151,3 +151,13 @@ rpc. request/reply (rpc.indexer) room. chat humano/grupo (room.general) agent..{in,out} inbox/outbox de agente LLM (agent.scout.in) ``` + +## Capability growth log + +- v0.2.0 (2026-06-03) — el playground gana un benchmark de rendimiento + (`GET /api/bench`, SSE): un publisher inunda una room con miles de mensajes a + N subscribers y una gráfica en vivo anima el throughput. Expone las dos + políticas como flags independientes (JetStream/`Persist` y encriptación + E2E/`Encrypt`) más tamaño de payload, de modo que se mide el coste de cada + capa (core NATS vs JetStream vs E2E vs E2E+JetStream) usando la librería + cliente real, sin reimplementar nada. diff --git a/playground/README.md b/playground/README.md index e8f6284..75488b0 100644 --- a/playground/README.md +++ b/playground/README.md @@ -72,6 +72,40 @@ Cleartext rooms (leave the checkbox unticked) behave like plain NATS fan-out: fast, ephemeral, unsigned. Encrypted rooms are the Matrix-like mode: E2E encrypted, persisted, and per-message signed. +## Benchmark: throughput simulator + +The bottom panel of the UI is a performance simulator. Press **▶ Ejecutar +benchmark** and one publisher floods a fresh room with thousands of messages +that N subscribers receive (fan-out); a live canvas chart animates the sent vs +received totals while it runs. + +The two policy axes are exposed as **independent flags**, so the benchmark +measures the cost of each layer in isolation: + +| JetStream | Encryption | Room policy | What it costs | +|---|---|---|---| +| off | off | `{Encrypt:false, Persist:false}` | plain core NATS fan-out | +| **on** | off | `{Encrypt:false, Persist:true}` | durable JetStream (publish ack per message) | +| off | **on** | `{Encrypt:true, Persist:false}` | AEAD + Ed25519 signature per message, core transport | +| **on** | **on** | `{Encrypt:true, Persist:true}` | full E2E + durable history | + +A **payload size** slider (16 B – 8 KiB) sets the message size. Encrypted or +persistent runs are capped to 30 000 messages (each message pays per-message +crypto and/or a JetStream ack, so they run much slower than plain NATS). + +The benchmark uses its own ephemeral peers (fresh identities, never persisted), +so it never touches the named peers of the manual sandbox. + +It is driven by an SSE endpoint that streams progress samples: + +```bash +curl -N "http://localhost:7700/api/bench?n_msgs=20000&n_subs=3&payload=128&encrypt=0&persist=0" +# emits: data: {"type":"start",...} data: {"type":"sample",...} data: {"type":"done",...} +``` + +Query params: `n_msgs`, `n_subs` (1–16), `payload` (bytes), `encrypt` (0/1), +`persist` (0/1). + ## State / cleanup All writable state lives under `playground/local_files/`: diff --git a/playground/index.html b/playground/index.html index 945c236..be97845 100644 --- a/playground/index.html +++ b/playground/index.html @@ -221,6 +221,51 @@ + +
+
+

Benchmark de rendimiento · 1 publisher → N subscribers

+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+ +
+
+ JetStream y Encriptación son ejes independientes: NATS core (ambos off) · JetStream durable · E2E (AEAD + firma Ed25519 por mensaje) · E2E + JetStream. Los modos con cripto o persistencia se limitan a 30 000 mensajes (cada mensaje paga cifrado/firma/ack). +
+
+
Enviados
0
+
Recibidos (Σ subs)
0
+
Throughput recv
0
+
Tiempo
0.00 s
+
+ +
+ enviados (publisher) + recibidos (suma de subscribers) +
+
+
+
+ diff --git a/playground/server.go b/playground/server.go index 9ba550c..5690913 100644 --- a/playground/server.go +++ b/playground/server.go @@ -19,6 +19,7 @@ package main import ( + "bytes" "context" "encoding/json" "errors" @@ -28,12 +29,15 @@ import ( "os" "os/signal" "path/filepath" + "strconv" "sync" + "sync/atomic" "syscall" "time" _ "embed" + cs "fn-registry/functions/cybersecurity" "github.com/enmanuel/unibus/pkg/blobstore" "github.com/enmanuel/unibus/pkg/client" "github.com/enmanuel/unibus/pkg/embeddednats" @@ -498,6 +502,257 @@ func (h *Hub) handleStream(w http.ResponseWriter, r *http.Request) { } } +// --------------------------------------------------------------------------- +// Benchmark: one publisher floods a room with thousands of messages that N +// subscribers receive. The two policy axes are exposed as independent flags: +// encrypt (AEAD payload + Ed25519 per-message signature) and persist (durable +// JetStream history vs ephemeral core NATS). Payload size is configurable. The +// benchmark uses its own ephemeral peers (not the hub's named peers) so it never +// interferes with the manual sandbox, and streams progress samples over SSE so +// the browser can animate a live throughput chart. +// --------------------------------------------------------------------------- + +// benchSample is one Server-Sent Event of a running benchmark. +type benchSample struct { + Type string `json:"type"` // "start" | "sample" | "done" | "error" + T float64 `json:"t"` + Sent int64 `json:"sent"` + Recv int64 `json:"recv"` + NMsgs int `json:"n_msgs,omitempty"` + NSubs int `json:"n_subs,omitempty"` + Payload int `json:"payload,omitempty"` + Encrypt bool `json:"encrypt,omitempty"` + Persist bool `json:"persist,omitempty"` + Capped bool `json:"capped,omitempty"` + PubTps int64 `json:"pub_tps,omitempty"` + RecvTps int64 `json:"recv_tps,omitempty"` + PerSub []int64 `json:"per_sub,omitempty"` + Msg string `json:"msg,omitempty"` +} + +// runBench wires up one publisher + nSubs subscribers, publishes nMsgs payloads, +// and calls emit periodically with the running totals. emit is only ever called +// from the calling goroutine (the SSE handler), so it needs no locking. +func runBench(ctx context.Context, emit func(benchSample), nMsgs, nSubs, payloadBytes int, encrypt, persist bool) { + policy := room.Policy{Encrypt: encrypt, Persist: persist, SignMsgs: encrypt} + subject := fmt.Sprintf("bench.%d", time.Now().UnixNano()) + + newPeer := func() (*client.Client, error) { + id, err := cs.GenerateIdentity() + if err != nil { + return nil, err + } + return client.New(natsURL, ctrlURL, id) + } + + pub, err := newPeer() + if err != nil { + emit(benchSample{Type: "error", Msg: "publisher: " + err.Error()}) + return + } + defer pub.Close() + + roomID, err := pub.CreateRoom(subject, policy) + if err != nil { + emit(benchSample{Type: "error", Msg: "create room: " + err.Error()}) + return + } + + counters := make([]int64, nSubs) + subClients := make([]*client.Client, 0, nSubs) + defer func() { + for _, c := range subClients { + _ = c.Close() + } + }() + + // One room, N subscribers. For encrypted rooms each subscriber must be invited + // (sealed key) and join before subscribing; for cleartext rooms Subscribe on + // the shared roomID is enough. + for i := 0; i < nSubs; i++ { + c, err := newPeer() + if err != nil { + emit(benchSample{Type: "error", Msg: fmt.Sprintf("subscriber %d: %v", i, err)}) + return + } + subClients = append(subClients, c) + if encrypt { + if err := pub.Invite(roomID, c.Endpoint()); err != nil { + emit(benchSample{Type: "error", Msg: fmt.Sprintf("invite %d: %v", i, err)}) + return + } + if err := c.Join(roomID); err != nil { + emit(benchSample{Type: "error", Msg: fmt.Sprintf("join %d: %v", i, err)}) + return + } + } + idx := i + if _, err := c.Subscribe(roomID, func(_ frame.Frame, _ []byte) { + atomic.AddInt64(&counters[idx], 1) + }); err != nil { + emit(benchSample{Type: "error", Msg: fmt.Sprintf("subscribe %d: %v", i, err)}) + return + } + } + + sumRecv := func() int64 { + var s int64 + for i := range counters { + s += atomic.LoadInt64(&counters[i]) + } + return s + } + + payload := bytes.Repeat([]byte{'x'}, payloadBytes) + var sent int64 + + emit(benchSample{Type: "start", NMsgs: nMsgs, NSubs: nSubs, Payload: payloadBytes, Encrypt: encrypt, Persist: persist}) + + t0 := time.Now() + done := make(chan struct{}) + var pubErr atomic.Value + go func() { + defer close(done) + for k := 0; k < nMsgs; k++ { + if err := pub.Publish(roomID, payload); err != nil { + pubErr.Store(err) + return + } + atomic.AddInt64(&sent, 1) + if k%256 == 0 { + select { + case <-ctx.Done(): + return + default: + } + } + } + }() + + ticker := time.NewTicker(60 * time.Millisecond) + defer ticker.Stop() + deadline := time.After(120 * time.Second) + target := int64(nMsgs) * int64(nSubs) + +sampleLoop: + for { + select { + case <-ctx.Done(): + return + case <-deadline: + break sampleLoop + case <-done: + break sampleLoop + case <-ticker.C: + emit(benchSample{Type: "sample", T: time.Since(t0).Seconds(), Sent: atomic.LoadInt64(&sent), Recv: sumRecv()}) + } + } + if v := pubErr.Load(); v != nil { + emit(benchSample{Type: "error", Msg: "publish: " + v.(error).Error()}) + return + } + + // Final drain: keep sampling until every subscriber has caught up (or we give up). + for i := 0; i < 240; i++ { + if sumRecv() >= target { + break + } + select { + case <-ctx.Done(): + return + case <-time.After(25 * time.Millisecond): + } + emit(benchSample{Type: "sample", T: time.Since(t0).Seconds(), Sent: atomic.LoadInt64(&sent), Recv: sumRecv()}) + } + + dur := time.Since(t0).Seconds() + finalSent := atomic.LoadInt64(&sent) + finalRecv := sumRecv() + per := make([]int64, nSubs) + for i := range counters { + per[i] = atomic.LoadInt64(&counters[i]) + } + var pubTps, recvTps int64 + if dur > 0 { + pubTps = int64(float64(finalSent) / dur) + recvTps = int64(float64(finalRecv) / dur) + } + emit(benchSample{Type: "done", T: dur, Sent: finalSent, Recv: finalRecv, PerSub: per, PubTps: pubTps, RecvTps: recvTps, NSubs: nSubs}) +} + +// handleBench is the SSE endpoint that drives a benchmark from query params: +// +// GET /api/bench?n_msgs=20000&n_subs=3&payload=128&encrypt=0&persist=0 +// +// Encrypted/persistent runs are capped to a lower message count (the per-message +// crypto + JetStream ack make them far slower); the cap is reported in the start +// sample so the UI can show it. +func (h *Hub) handleBench(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + atoiDef := func(k string, def int) int { + if v, err := strconv.Atoi(q.Get(k)); err == nil { + return v + } + return def + } + truthy := func(k string) bool { v := q.Get(k); return v == "1" || v == "true" } + + nMsgs := atoiDef("n_msgs", 20000) + nSubs := atoiDef("n_subs", 3) + payload := atoiDef("payload", 128) + encrypt := truthy("encrypt") + persist := truthy("persist") + + if nSubs < 1 { + nSubs = 1 + } else if nSubs > 16 { + nSubs = 16 + } + if payload < 1 { + payload = 1 + } else if payload > 8192 { + payload = 8192 + } + if nMsgs < 100 { + nMsgs = 100 + } + maxMsgs := 200000 + if encrypt || persist { + maxMsgs = 30000 // crypto + JetStream ack are much slower; keep the run bounded + } + capped := false + if nMsgs > maxMsgs { + nMsgs, capped = maxMsgs, true + } + + flusher, ok := w.(http.Flusher) + if !ok { + writeErr(w, http.StatusInternalServerError, "streaming unsupported") + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + fmt.Fprintf(w, ": bench start\n\n") + flusher.Flush() + + emit := func(s benchSample) { + if s.Type == "start" { + s.Capped = capped + } + b, err := json.Marshal(s) + if err != nil { + return + } + fmt.Fprintf(w, "data: %s\n\n", b) + flusher.Flush() + } + + runBench(r.Context(), emit, nMsgs, nSubs, payload, encrypt, persist) + fmt.Fprintf(w, "event: end\ndata: {}\n\n") + flusher.Flush() +} + // --------------------------------------------------------------------------- // main: bring up NATS, control plane, and the web server; tear them all down // cleanly on signal. @@ -553,6 +808,7 @@ func main() { mux.HandleFunc("POST /api/publish", hub.handlePublish) mux.HandleFunc("POST /api/kick", hub.handleKick) mux.HandleFunc("GET /api/stream", hub.handleStream) + mux.HandleFunc("GET /api/bench", hub.handleBench) webSrv := &http.Server{Addr: webAddr, Handler: mux} go func() { if err := webSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {