Files
agent_runner_api/sse.go
T

108 lines
2.2 KiB
Go

package main
import (
"fmt"
"net/http"
"sync"
"time"
)
// sseHub broadcasts events keyed by run_id to N subscribers.
// Every subscriber is represented by its own channel, and the channels
// registered for one run are kept together as a set.
type sseHub struct {
// mu is held by subscribe, unsubscribe and Publish while they touch subscribers.
mu sync.Mutex
// subscribers maps a run ID to the set of channels registered for that run.
subscribers map[string]map[chan sseEvent]struct{}
}
// sseEvent is one message handed to Publish and delivered to subscribers.
type sseEvent struct {
// Event is the SSE event name; handleRunSSE writes no "event:" line when it is empty.
Event string
// Data is the payload handleRunSSE writes in the event's "data:" field.
Data string
}
// newSSEHub returns an empty hub whose subscriber table is already
// allocated, so subscribe can insert into it right away.
func newSSEHub() *sseHub {
	hub := new(sseHub)
	hub.subscribers = map[string]map[chan sseEvent]struct{}{}
	return hub
}
// subscribe registers a new subscriber for runID and returns its channel.
// The channel is buffered with room for 16 events; the per-run set is
// created on first use.
func (h *sseHub) subscribe(runID string) chan sseEvent {
	events := make(chan sseEvent, 16)

	h.mu.Lock()
	group, found := h.subscribers[runID]
	if !found {
		group = map[chan sseEvent]struct{}{}
		h.subscribers[runID] = group
	}
	group[events] = struct{}{}
	h.mu.Unlock()

	return events
}
// unsubscribe removes ch from the subscribers of runID and closes it, so a
// receiver observes the closed channel. The per-run set is dropped once its
// last subscriber is gone.
//
// The channel is closed only when it was actually registered under runID.
// That makes the call idempotent (a second call would otherwise panic with
// "close of closed channel") and guarantees that a channel still present in
// the hub is never closed, which would make a later Publish panic on send.
func (h *sseHub) unsubscribe(runID string, ch chan sseEvent) {
	h.mu.Lock()
	defer h.mu.Unlock()

	subs, ok := h.subscribers[runID]
	if !ok {
		return
	}
	if _, registered := subs[ch]; !registered {
		return
	}

	delete(subs, ch)
	if len(subs) == 0 {
		delete(h.subscribers, runID)
	}
	// Closed while still holding the lock: removal and close are one step
	// from the point of view of anything else that takes h.mu.
	close(ch)
}
// Publish delivers ev to every channel currently subscribed to runID.
//
// Delivery is best effort: a subscriber whose buffer is full does not receive
// the event. Publishing to a run with no subscribers is a no-op.
//
// The sends happen while h.mu is held. unsubscribe removes a channel from the
// set under the same lock before closing it, so every channel seen here is
// still open; sending from a snapshot after releasing the lock could hit a
// channel that was closed in between, and a send on a closed channel panics
// even inside a select with a default case. Because each send is
// non-blocking, holding the lock cannot stall on a slow subscriber.
func (h *sseHub) Publish(runID string, ev sseEvent) {
	h.mu.Lock()
	defer h.mu.Unlock()

	for c := range h.subscribers[runID] {
		select {
		case c <- ev:
		default:
			// Drop the event for a subscriber that is not keeping up.
		}
	}
}
// handleRunSSE streams server-sent events for runID to the client.
//
// It subscribes to a.sse for runID, writes an initial "connected" event, and
// then forwards every event received on the subscription, interleaved with a
// comment-line heartbeat every 15 seconds. The handler returns, and the
// subscription is released, when the request context is done or the
// subscription channel is closed.
//
// If w does not implement http.Flusher the handler replies with a 500 and
// does not subscribe.
func (a *App) handleRunSSE(w http.ResponseWriter, r *http.Request, runID string) {
	// Check for streaming support before setting any SSE headers so the
	// error response does not carry them.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	hdr := w.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("Access-Control-Allow-Origin", "*")

	ch := a.sse.subscribe(runID)
	defer a.sse.unsubscribe(runID, ch)

	// %q quotes runID and escapes quotes, backslashes and line breaks, so a
	// hostile ID can neither break out of the JSON string nor terminate the
	// event early. For plain IDs the output is the same as before.
	fmt.Fprintf(w, "event: connected\ndata: {\"run_id\":%q}\n\n", runID)
	flusher.Flush()

	heartbeat := time.NewTicker(15 * time.Second)
	defer heartbeat.Stop()

	done := r.Context().Done()
	for {
		select {
		case <-done:
			return
		case <-heartbeat.C:
			// A line starting with ':' is an SSE comment.
			fmt.Fprintf(w, ": heartbeat\n\n")
			flusher.Flush()
		case ev, open := <-ch:
			if !open {
				return
			}
			if ev.Event != "" {
				fmt.Fprintf(w, "event: %s\n", ev.Event)
			}
			writeSSEDataField(w, ev.Data)
			flusher.Flush()
		}
	}
}

// writeSSEDataField writes data as the data field of one event and ends the
// event with a blank line. Every line of data gets its own "data: " prefix;
// CRLF, LF and CR are all treated as line breaks, because each of them ends a
// line in the event-stream format and an unprefixed continuation would not be
// part of the payload. Data without line breaks is written as a single
// "data: ..." line.
func writeSSEDataField(w http.ResponseWriter, data string) {
	start := 0
	for i := 0; i < len(data); i++ {
		switch data[i] {
		case '\r':
			fmt.Fprintf(w, "data: %s\n", data[start:i])
			// Consume the LF of a CRLF pair so it is not emitted as an
			// extra empty line.
			if i+1 < len(data) && data[i+1] == '\n' {
				i++
			}
			start = i + 1
		case '\n':
			fmt.Fprintf(w, "data: %s\n", data[start:i])
			start = i + 1
		}
	}
	fmt.Fprintf(w, "data: %s\n\n", data[start:])
}