feat: initial scaffold of agent_runner_api service Go :8486
This commit is contained in:
@@ -0,0 +1,107 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// sseHub broadcasts events keyed by run_id to N subscribers.
|
||||
type sseHub struct {
|
||||
mu sync.Mutex
|
||||
subscribers map[string]map[chan sseEvent]struct{}
|
||||
}
|
||||
|
||||
type sseEvent struct {
|
||||
Event string
|
||||
Data string
|
||||
}
|
||||
|
||||
func newSSEHub() *sseHub {
|
||||
return &sseHub{subscribers: make(map[string]map[chan sseEvent]struct{})}
|
||||
}
|
||||
|
||||
func (h *sseHub) subscribe(runID string) chan sseEvent {
|
||||
ch := make(chan sseEvent, 16)
|
||||
h.mu.Lock()
|
||||
if _, ok := h.subscribers[runID]; !ok {
|
||||
h.subscribers[runID] = make(map[chan sseEvent]struct{})
|
||||
}
|
||||
h.subscribers[runID][ch] = struct{}{}
|
||||
h.mu.Unlock()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (h *sseHub) unsubscribe(runID string, ch chan sseEvent) {
|
||||
h.mu.Lock()
|
||||
if subs, ok := h.subscribers[runID]; ok {
|
||||
delete(subs, ch)
|
||||
if len(subs) == 0 {
|
||||
delete(h.subscribers, runID)
|
||||
}
|
||||
}
|
||||
h.mu.Unlock()
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func (h *sseHub) Publish(runID string, ev sseEvent) {
|
||||
h.mu.Lock()
|
||||
subs := h.subscribers[runID]
|
||||
chans := make([]chan sseEvent, 0, len(subs))
|
||||
for c := range subs {
|
||||
chans = append(chans, c)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
for _, c := range chans {
|
||||
select {
|
||||
case c <- ev:
|
||||
default:
|
||||
// drop on slow subscriber
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) handleRunSSE(w http.ResponseWriter, r *http.Request, runID string) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Subscribe
|
||||
ch := a.sse.subscribe(runID)
|
||||
defer a.sse.unsubscribe(runID, ch)
|
||||
|
||||
// Send initial connected event
|
||||
fmt.Fprintf(w, "event: connected\ndata: {\"run_id\":\"%s\"}\n\n", runID)
|
||||
flusher.Flush()
|
||||
|
||||
// Heartbeat + events
|
||||
heartbeat := time.NewTicker(15 * time.Second)
|
||||
defer heartbeat.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case <-heartbeat.C:
|
||||
fmt.Fprintf(w, ": heartbeat\n\n")
|
||||
flusher.Flush()
|
||||
case ev, ok := <-ch:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if ev.Event != "" {
|
||||
fmt.Fprintf(w, "event: %s\n", ev.Event)
|
||||
}
|
||||
fmt.Fprintf(w, "data: %s\n\n", ev.Data)
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user