// Command playground is an all-in-one, web-based sandbox for the unibus message
// bus. A single `go run ./playground` launches the entire stack embedded:
//
//   - an embedded NATS server with JetStream (the data plane),
//   - the membership control plane (rooms, members, sealed keys, rekey) over an
//     internal HTTP server,
//   - the media blob store, and
//   - a browser-facing web UI on :7700.
//
// The browser never speaks NATS. The Go server is the actual bus peer: it holds
// one unibus client per named peer, subscribes to rooms on the peer's behalf,
// and streams received messages to the browser over Server-Sent Events. The
// browser drives everything with plain fetch() + EventSource() — no build step,
// no JS framework, no external libraries.
//
// This is a playground (see .claude/rules/playgrounds.md): it lives inside the
// unibus app, reuses the parent module (no new go.mod), is not indexed, and
// stores ephemeral state under playground/local_files/.
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"strconv"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	_ "embed"

	cs "fn-registry/functions/cybersecurity"
	"github.com/enmanuel/unibus/pkg/blobstore"
	"github.com/enmanuel/unibus/pkg/client"
	"github.com/enmanuel/unibus/pkg/embeddednats"
	"github.com/enmanuel/unibus/pkg/frame"
	"github.com/enmanuel/unibus/pkg/membership"
	"github.com/enmanuel/unibus/pkg/room"
)

// Fixed ports (verified free before assignment — do not change without reason).
const (
	webAddr  = "127.0.0.1:7700" // browser-facing web UI
	ctrlAddr = "127.0.0.1:8480" // internal membership control plane
	ctrlURL  = "http://" + ctrlAddr
	natsPort = 4260 // internal embedded NATS
	// natsURL repeats the port literally; keep it in sync with natsPort.
	natsURL = "nats://127.0.0.1:4260"
	// localFiles is a relative directory; main creates it on startup and every
	// on-disk artefact in this file is placed beneath it.
	localFiles = "playground/local_files"
)

// indexHTML is the single-page web UI, embedded at build time and served
// verbatim by handleIndex.
//
//go:embed index.html
var indexHTML []byte

// ---------------------------------------------------------------------------
// Event: a message received by a peer on one of its subscribed rooms.
Fanned // out to every SSE listener attached to that peer. // --------------------------------------------------------------------------- type Event struct { RoomID string `json:"room_id"` Subject string `json:"subject"` Sender string `json:"sender"` Text string `json:"text"` Encrypted bool `json:"encrypted"` TS int64 `json:"ts"` // unix millis } // roomInfo caches the per-room metadata a peer needs to label incoming frames. type roomInfo struct { subject string encrypt bool } // peerState holds everything about one named peer: its bus client, its public // endpoint, its live subscriptions, the rooms it knows, and the set of SSE // listener channels currently attached to it. type peerState struct { name string client *client.Client endpoint client.Endpoint mu sync.Mutex subs map[string]*client.Sub // roomID -> subscription rooms map[string]roomInfo // roomID -> subject/encrypt listeners map[chan Event]struct{} // attached SSE channels } // emit fans an event out to all attached listeners without blocking on a slow // or disconnected consumer. func (p *peerState) emit(ev Event) { p.mu.Lock() defer p.mu.Unlock() for ch := range p.listeners { select { case ch <- ev: default: // listener buffer full: drop rather than block the NATS callback } } } func (p *peerState) addListener(ch chan Event) { p.mu.Lock() p.listeners[ch] = struct{}{} p.mu.Unlock() } func (p *peerState) removeListener(ch chan Event) { p.mu.Lock() delete(p.listeners, ch) p.mu.Unlock() } func (p *peerState) setRoom(roomID string, info roomInfo) { p.mu.Lock() p.rooms[roomID] = info p.mu.Unlock() } // --------------------------------------------------------------------------- // Hub: the registry of peers, protected by a single mutex. 
// --------------------------------------------------------------------------- type Hub struct { mu sync.Mutex peers map[string]*peerState } func newHub() *Hub { return &Hub{peers: map[string]*peerState{}} } // getOrCreate returns the peer for name, creating its identity + bus client on // first use. Identities persist to playground/local_files/.id so a peer // keeps the same endpoint across restarts. func (h *Hub) getOrCreate(name string) (*peerState, error) { h.mu.Lock() defer h.mu.Unlock() if p, ok := h.peers[name]; ok { return p, nil } idPath := filepath.Join(localFiles, name+".id") id, err := client.LoadOrCreateIdentity(idPath) if err != nil { return nil, fmt.Errorf("identity for %q: %w", name, err) } c, err := client.New(natsURL, ctrlURL, id) if err != nil { return nil, fmt.Errorf("client for %q: %w", name, err) } p := &peerState{ name: name, client: c, endpoint: c.Endpoint(), subs: map[string]*client.Sub{}, rooms: map[string]roomInfo{}, listeners: map[chan Event]struct{}{}, } h.peers[name] = p return p, nil } // lookup returns an already-created peer or false. func (h *Hub) lookup(name string) (*peerState, bool) { h.mu.Lock() defer h.mu.Unlock() p, ok := h.peers[name] return p, ok } // list returns a snapshot of all peers (name + endpoint id). func (h *Hub) list() []map[string]string { h.mu.Lock() defer h.mu.Unlock() out := make([]map[string]string, 0, len(h.peers)) for name, p := range h.peers { out = append(out, map[string]string{"name": name, "endpoint_id": p.endpoint.ID}) } return out } func (h *Hub) closeAll() { h.mu.Lock() defer h.mu.Unlock() for _, p := range h.peers { p.mu.Lock() for _, sub := range p.subs { _ = sub.Unsubscribe() } p.mu.Unlock() _ = p.client.Close() } } // subscribeRoom subscribes the peer to a room (idempotent) and wires the frame // handler to fan incoming messages out as Events. info labels each event with // the room's subject and encryption flag. 
func (p *peerState) subscribeRoom(roomID string, info roomInfo) error { p.mu.Lock() if _, already := p.subs[roomID]; already { p.mu.Unlock() return nil } p.mu.Unlock() sub, err := p.client.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { p.emit(Event{ RoomID: roomID, Subject: info.subject, Sender: f.Sender, Text: string(plaintext), Encrypted: info.encrypt, TS: time.Now().UnixMilli(), }) }) if err != nil { return fmt.Errorf("subscribe room %s: %w", roomID, err) } p.mu.Lock() p.subs[roomID] = sub p.mu.Unlock() p.setRoom(roomID, info) return nil } // --------------------------------------------------------------------------- // Control-plane helper: fetch a room's subject + policy from membershipd. The // client package keeps fetchRoom private, so the playground talks to the // control plane directly (read endpoints are unauthenticated by design). // --------------------------------------------------------------------------- type ctrlRoomResp struct { Subject string `json:"subject"` Epoch int `json:"epoch"` Policy struct { Encrypt bool `json:"encrypt"` Persist bool `json:"persist"` SignMsgs bool `json:"sign_msgs"` } `json:"policy"` } func fetchRoomInfo(roomID string) (roomInfo, error) { resp, err := http.Get(ctrlURL + "/rooms/" + roomID) if err != nil { return roomInfo{}, fmt.Errorf("fetch room %s: %w", roomID, err) } defer resp.Body.Close() if resp.StatusCode >= 300 { return roomInfo{}, fmt.Errorf("room %s not found (status %d)", roomID, resp.StatusCode) } var r ctrlRoomResp if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { return roomInfo{}, fmt.Errorf("decode room %s: %w", roomID, err) } return roomInfo{subject: r.Subject, encrypt: r.Policy.Encrypt}, nil } // --------------------------------------------------------------------------- // HTTP handlers (web UI on :7700). 
// --------------------------------------------------------------------------- func writeJSON(w http.ResponseWriter, code int, v any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) _ = json.NewEncoder(w).Encode(v) } func writeErr(w http.ResponseWriter, code int, msg string) { writeJSON(w, code, map[string]string{"error": msg}) } func decodeBody(r *http.Request, out any) error { defer r.Body.Close() return json.NewDecoder(r.Body).Decode(out) } func (h *Hub) handleIndex(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/" { http.NotFound(w, r) return } w.Header().Set("Content-Type", "text/html; charset=utf-8") _, _ = w.Write(indexHTML) } func (h *Hub) handlePeer(w http.ResponseWriter, r *http.Request) { var req struct { Name string `json:"name"` } if err := decodeBody(r, &req); err != nil || req.Name == "" { writeErr(w, http.StatusBadRequest, "name required") return } p, err := h.getOrCreate(req.Name) if err != nil { writeErr(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, http.StatusOK, map[string]string{"name": p.name, "endpoint_id": p.endpoint.ID}) } func (h *Hub) handlePeers(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, h.list()) } func (h *Hub) handleRoom(w http.ResponseWriter, r *http.Request) { var req struct { Peer string `json:"peer"` Subject string `json:"subject"` Encrypt bool `json:"encrypt"` Persist bool `json:"persist"` } if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.Subject == "" { writeErr(w, http.StatusBadRequest, "peer and subject required") return } p, ok := h.lookup(req.Peer) if !ok { writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) return } // The two checkboxes map to an explicit per-room policy. encrypt drives both // encryption and per-message signing; persist (default false) independently // toggles durable JetStream history. persist=false keeps plain ephemeral NATS. 
policy := room.Policy{Encrypt: req.Encrypt, Persist: req.Persist, SignMsgs: req.Encrypt} roomID, err := p.client.CreateRoom(req.Subject, policy) if err != nil { writeErr(w, http.StatusInternalServerError, err.Error()) return } info := roomInfo{subject: req.Subject, encrypt: req.Encrypt} if err := p.subscribeRoom(roomID, info); err != nil { writeErr(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, http.StatusOK, map[string]any{ "room_id": roomID, "subject": req.Subject, "encrypt": req.Encrypt, "persist": req.Persist, }) } func (h *Hub) handleJoin(w http.ResponseWriter, r *http.Request) { var req struct { Peer string `json:"peer"` RoomID string `json:"room_id"` } if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" { writeErr(w, http.StatusBadRequest, "peer and room_id required") return } p, ok := h.lookup(req.Peer) if !ok { writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) return } if err := p.client.Join(req.RoomID); err != nil { writeErr(w, http.StatusBadRequest, "join failed: "+err.Error()) return } info, err := fetchRoomInfo(req.RoomID) if err != nil { writeErr(w, http.StatusInternalServerError, err.Error()) return } if err := p.subscribeRoom(req.RoomID, info); err != nil { writeErr(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, http.StatusOK, map[string]any{"subject": info.subject, "encrypt": info.encrypt}) } func (h *Hub) handleInvite(w http.ResponseWriter, r *http.Request) { var req struct { Peer string `json:"peer"` RoomID string `json:"room_id"` Target string `json:"target"` } if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" || req.Target == "" { writeErr(w, http.StatusBadRequest, "peer, room_id and target required") return } p, ok := h.lookup(req.Peer) if !ok { writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) return } target, ok := h.lookup(req.Target) if !ok { writeErr(w, http.StatusBadRequest, "target peer "+req.Target+" does not 
exist; connect it first") return } if err := p.client.Invite(req.RoomID, target.endpoint); err != nil { writeErr(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, http.StatusOK, map[string]string{"status": "invited", "target": req.Target}) } func (h *Hub) handlePublish(w http.ResponseWriter, r *http.Request) { var req struct { Peer string `json:"peer"` RoomID string `json:"room_id"` Text string `json:"text"` } if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" { writeErr(w, http.StatusBadRequest, "peer and room_id required") return } p, ok := h.lookup(req.Peer) if !ok { writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) return } if err := p.client.Publish(req.RoomID, []byte(req.Text)); err != nil { writeErr(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, http.StatusOK, map[string]string{"status": "published"}) } func (h *Hub) handleKick(w http.ResponseWriter, r *http.Request) { var req struct { Peer string `json:"peer"` RoomID string `json:"room_id"` Target string `json:"target"` } if err := decodeBody(r, &req); err != nil || req.Peer == "" || req.RoomID == "" || req.Target == "" { writeErr(w, http.StatusBadRequest, "peer, room_id and target required") return } p, ok := h.lookup(req.Peer) if !ok { writeErr(w, http.StatusBadRequest, "unknown peer "+req.Peer) return } target, ok := h.lookup(req.Target) if !ok { writeErr(w, http.StatusBadRequest, "target peer "+req.Target+" does not exist") return } if err := p.client.Kick(req.RoomID, target.endpoint.ID); err != nil { writeErr(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, http.StatusOK, map[string]string{"status": "kicked", "target": req.Target}) } // handleStream is the SSE endpoint. The browser opens one EventSource per peer; // each received Event is emitted as a `data: \n\n` block. The listener is // cleaned up when the HTTP request context is cancelled (tab closed / reload). 
func (h *Hub) handleStream(w http.ResponseWriter, r *http.Request) { name := r.URL.Query().Get("peer") if name == "" { writeErr(w, http.StatusBadRequest, "peer query param required") return } p, ok := h.lookup(name) if !ok { writeErr(w, http.StatusBadRequest, "unknown peer "+name) return } flusher, ok := w.(http.Flusher) if !ok { writeErr(w, http.StatusInternalServerError, "streaming unsupported") return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") ch := make(chan Event, 64) p.addListener(ch) defer p.removeListener(ch) // Initial comment so the browser marks the stream open immediately. fmt.Fprintf(w, ": connected to %s\n\n", name) flusher.Flush() ctx := r.Context() ping := time.NewTicker(20 * time.Second) defer ping.Stop() for { select { case <-ctx.Done(): return case <-ping.C: fmt.Fprintf(w, ": ping\n\n") flusher.Flush() case ev := <-ch: b, err := json.Marshal(ev) if err != nil { continue } fmt.Fprintf(w, "data: %s\n\n", b) flusher.Flush() } } } // --------------------------------------------------------------------------- // Benchmark: one publisher floods a room with thousands of messages that N // subscribers receive. The two policy axes are exposed as independent flags: // encrypt (AEAD payload + Ed25519 per-message signature) and persist (durable // JetStream history vs ephemeral core NATS). Payload size is configurable. The // benchmark uses its own ephemeral peers (not the hub's named peers) so it never // interferes with the manual sandbox, and streams progress samples over SSE so // the browser can animate a live throughput chart. // --------------------------------------------------------------------------- // benchSample is one Server-Sent Event of a running benchmark. 
type benchSample struct { Type string `json:"type"` // "start" | "sample" | "done" | "error" T float64 `json:"t"` Sent int64 `json:"sent"` Recv int64 `json:"recv"` NMsgs int `json:"n_msgs,omitempty"` NSubs int `json:"n_subs,omitempty"` Payload int `json:"payload,omitempty"` Encrypt bool `json:"encrypt,omitempty"` Persist bool `json:"persist,omitempty"` Capped bool `json:"capped,omitempty"` PubTps int64 `json:"pub_tps,omitempty"` RecvTps int64 `json:"recv_tps,omitempty"` PerSub []int64 `json:"per_sub,omitempty"` Msg string `json:"msg,omitempty"` } // runBench wires up one publisher + nSubs subscribers, publishes nMsgs payloads, // and calls emit periodically with the running totals. emit is only ever called // from the calling goroutine (the SSE handler), so it needs no locking. func runBench(ctx context.Context, emit func(benchSample), nMsgs, nSubs, payloadBytes int, encrypt, persist bool) { policy := room.Policy{Encrypt: encrypt, Persist: persist, SignMsgs: encrypt} subject := fmt.Sprintf("bench.%d", time.Now().UnixNano()) newPeer := func() (*client.Client, error) { id, err := cs.GenerateIdentity() if err != nil { return nil, err } return client.New(natsURL, ctrlURL, id) } pub, err := newPeer() if err != nil { emit(benchSample{Type: "error", Msg: "publisher: " + err.Error()}) return } defer pub.Close() roomID, err := pub.CreateRoom(subject, policy) if err != nil { emit(benchSample{Type: "error", Msg: "create room: " + err.Error()}) return } counters := make([]int64, nSubs) subClients := make([]*client.Client, 0, nSubs) defer func() { for _, c := range subClients { _ = c.Close() } }() // One room, N subscribers. For encrypted rooms each subscriber must be invited // (sealed key) and join before subscribing; for cleartext rooms Subscribe on // the shared roomID is enough. 
for i := 0; i < nSubs; i++ { c, err := newPeer() if err != nil { emit(benchSample{Type: "error", Msg: fmt.Sprintf("subscriber %d: %v", i, err)}) return } subClients = append(subClients, c) if encrypt { if err := pub.Invite(roomID, c.Endpoint()); err != nil { emit(benchSample{Type: "error", Msg: fmt.Sprintf("invite %d: %v", i, err)}) return } if err := c.Join(roomID); err != nil { emit(benchSample{Type: "error", Msg: fmt.Sprintf("join %d: %v", i, err)}) return } } idx := i if _, err := c.Subscribe(roomID, func(_ frame.Frame, _ []byte) { atomic.AddInt64(&counters[idx], 1) }); err != nil { emit(benchSample{Type: "error", Msg: fmt.Sprintf("subscribe %d: %v", i, err)}) return } } sumRecv := func() int64 { var s int64 for i := range counters { s += atomic.LoadInt64(&counters[i]) } return s } payload := bytes.Repeat([]byte{'x'}, payloadBytes) var sent int64 emit(benchSample{Type: "start", NMsgs: nMsgs, NSubs: nSubs, Payload: payloadBytes, Encrypt: encrypt, Persist: persist}) t0 := time.Now() done := make(chan struct{}) var pubErr atomic.Value go func() { defer close(done) for k := 0; k < nMsgs; k++ { if err := pub.Publish(roomID, payload); err != nil { pubErr.Store(err) return } atomic.AddInt64(&sent, 1) if k%256 == 0 { select { case <-ctx.Done(): return default: } } } }() ticker := time.NewTicker(60 * time.Millisecond) defer ticker.Stop() deadline := time.After(120 * time.Second) target := int64(nMsgs) * int64(nSubs) sampleLoop: for { select { case <-ctx.Done(): return case <-deadline: break sampleLoop case <-done: break sampleLoop case <-ticker.C: emit(benchSample{Type: "sample", T: time.Since(t0).Seconds(), Sent: atomic.LoadInt64(&sent), Recv: sumRecv()}) } } if v := pubErr.Load(); v != nil { emit(benchSample{Type: "error", Msg: "publish: " + v.(error).Error()}) return } // Final drain: keep sampling until every subscriber has caught up (or we give up). 
for i := 0; i < 240; i++ { if sumRecv() >= target { break } select { case <-ctx.Done(): return case <-time.After(25 * time.Millisecond): } emit(benchSample{Type: "sample", T: time.Since(t0).Seconds(), Sent: atomic.LoadInt64(&sent), Recv: sumRecv()}) } dur := time.Since(t0).Seconds() finalSent := atomic.LoadInt64(&sent) finalRecv := sumRecv() per := make([]int64, nSubs) for i := range counters { per[i] = atomic.LoadInt64(&counters[i]) } var pubTps, recvTps int64 if dur > 0 { pubTps = int64(float64(finalSent) / dur) recvTps = int64(float64(finalRecv) / dur) } emit(benchSample{Type: "done", T: dur, Sent: finalSent, Recv: finalRecv, PerSub: per, PubTps: pubTps, RecvTps: recvTps, NSubs: nSubs}) } // handleBench is the SSE endpoint that drives a benchmark from query params: // // GET /api/bench?n_msgs=20000&n_subs=3&payload=128&encrypt=0&persist=0 // // Encrypted/persistent runs are capped to a lower message count (the per-message // crypto + JetStream ack make them far slower); the cap is reported in the start // sample so the UI can show it. 
func (h *Hub) handleBench(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() atoiDef := func(k string, def int) int { if v, err := strconv.Atoi(q.Get(k)); err == nil { return v } return def } truthy := func(k string) bool { v := q.Get(k); return v == "1" || v == "true" } nMsgs := atoiDef("n_msgs", 20000) nSubs := atoiDef("n_subs", 3) payload := atoiDef("payload", 128) encrypt := truthy("encrypt") persist := truthy("persist") if nSubs < 1 { nSubs = 1 } else if nSubs > 16 { nSubs = 16 } if payload < 1 { payload = 1 } else if payload > 8192 { payload = 8192 } if nMsgs < 100 { nMsgs = 100 } maxMsgs := 200000 if encrypt || persist { maxMsgs = 30000 // crypto + JetStream ack are much slower; keep the run bounded } capped := false if nMsgs > maxMsgs { nMsgs, capped = maxMsgs, true } flusher, ok := w.(http.Flusher) if !ok { writeErr(w, http.StatusInternalServerError, "streaming unsupported") return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") fmt.Fprintf(w, ": bench start\n\n") flusher.Flush() emit := func(s benchSample) { if s.Type == "start" { s.Capped = capped } b, err := json.Marshal(s) if err != nil { return } fmt.Fprintf(w, "data: %s\n\n", b) flusher.Flush() } runBench(r.Context(), emit, nMsgs, nSubs, payload, encrypt, persist) fmt.Fprintf(w, "event: end\ndata: {}\n\n") flusher.Flush() } // --------------------------------------------------------------------------- // main: bring up NATS, control plane, and the web server; tear them all down // cleanly on signal. // --------------------------------------------------------------------------- func main() { log.SetFlags(log.LstdFlags | log.Lmsgprefix) log.SetPrefix("[playground] ") if err := os.MkdirAll(localFiles, 0o755); err != nil { log.Fatalf("mkdir %s: %v", localFiles, err) } // 1. Data plane: embedded NATS + JetStream on the fixed internal port. 
ns, err := embeddednats.Start(filepath.Join(localFiles, "js"), natsPort) if err != nil { log.Fatalf("start embedded nats: %v", err) } log.Printf("embedded NATS (JetStream) ready: %s", embeddednats.ClientURL(ns)) // 2. Control plane: membership store + blob store + internal HTTP server. store, err := membership.Open(filepath.Join(localFiles, "play.db")) if err != nil { ns.Shutdown() log.Fatalf("open membership store: %v", err) } blobs, err := blobstore.New(filepath.Join(localFiles, "blobs")) if err != nil { store.Close() ns.Shutdown() log.Fatalf("open blob store: %v", err) } ctrlSrv := &http.Server{Addr: ctrlAddr, Handler: membership.NewServer(store, blobs)} go func() { if err := ctrlSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatalf("control plane: %v", err) } }() if err := waitHealthy(ctrlURL+"/healthz", 5*time.Second); err != nil { log.Fatalf("control plane not healthy: %v", err) } log.Printf("control plane ready: %s", ctrlURL) // 3. Web UI on :7700. hub := newHub() mux := http.NewServeMux() mux.HandleFunc("/", hub.handleIndex) mux.HandleFunc("POST /api/peer", hub.handlePeer) mux.HandleFunc("GET /api/peers", hub.handlePeers) mux.HandleFunc("POST /api/room", hub.handleRoom) mux.HandleFunc("POST /api/join", hub.handleJoin) mux.HandleFunc("POST /api/invite", hub.handleInvite) mux.HandleFunc("POST /api/publish", hub.handlePublish) mux.HandleFunc("POST /api/kick", hub.handleKick) mux.HandleFunc("GET /api/stream", hub.handleStream) mux.HandleFunc("GET /api/bench", hub.handleBench) webSrv := &http.Server{Addr: webAddr, Handler: mux} go func() { if err := webSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatalf("web server: %v", err) } }() log.Printf("web UI ready: http://%s", webAddr) log.Printf("open http://localhost:7700 in two browser tabs to try the bus") // 4. Graceful shutdown. 
stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) <-stop log.Printf("shutting down...") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = webSrv.Shutdown(ctx) hub.closeAll() _ = ctrlSrv.Shutdown(ctx) store.Close() ns.Shutdown() ns.WaitForShutdown() log.Printf("bye") } // waitHealthy polls url until it returns a 2xx/3xx or the deadline elapses. func waitHealthy(url string, timeout time.Duration) error { deadline := time.Now().Add(timeout) c := &http.Client{Timeout: 500 * time.Millisecond} for time.Now().Before(deadline) { resp, err := c.Get(url) if err == nil { resp.Body.Close() if resp.StatusCode < 400 { return nil } } time.Sleep(100 * time.Millisecond) } return fmt.Errorf("timeout waiting for %s", url) }