package main import ( "fmt" "net/http" "sync" "time" ) // sseHub broadcasts events keyed by run_id to N subscribers. type sseHub struct { mu sync.Mutex subscribers map[string]map[chan sseEvent]struct{} } type sseEvent struct { Event string Data string } func newSSEHub() *sseHub { return &sseHub{subscribers: make(map[string]map[chan sseEvent]struct{})} } func (h *sseHub) subscribe(runID string) chan sseEvent { ch := make(chan sseEvent, 16) h.mu.Lock() if _, ok := h.subscribers[runID]; !ok { h.subscribers[runID] = make(map[chan sseEvent]struct{}) } h.subscribers[runID][ch] = struct{}{} h.mu.Unlock() return ch } func (h *sseHub) unsubscribe(runID string, ch chan sseEvent) { h.mu.Lock() if subs, ok := h.subscribers[runID]; ok { delete(subs, ch) if len(subs) == 0 { delete(h.subscribers, runID) } } h.mu.Unlock() close(ch) } func (h *sseHub) Publish(runID string, ev sseEvent) { h.mu.Lock() subs := h.subscribers[runID] chans := make([]chan sseEvent, 0, len(subs)) for c := range subs { chans = append(chans, c) } h.mu.Unlock() for _, c := range chans { select { case c <- ev: default: // drop on slow subscriber } } } func (a *App) handleRunSSE(w http.ResponseWriter, r *http.Request, runID string) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return } // Subscribe ch := a.sse.subscribe(runID) defer a.sse.unsubscribe(runID, ch) // Send initial connected event fmt.Fprintf(w, "event: connected\ndata: {\"run_id\":\"%s\"}\n\n", runID) flusher.Flush() // Heartbeat + events heartbeat := time.NewTicker(15 * time.Second) defer heartbeat.Stop() for { select { case <-r.Context().Done(): return case <-heartbeat.C: fmt.Fprintf(w, ": heartbeat\n\n") flusher.Flush() case ev, ok := <-ch: if !ok { return } if ev.Event != "" { fmt.Fprintf(w, "event: %s\n", ev.Event) } fmt.Fprintf(w, "data: %s\n\n", ev.Data) flusher.Flush() } } }