diff --git a/internal/api/handlers.go b/internal/api/handlers.go index a945266..3d5001c 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -103,6 +103,22 @@ func queryMessages24h(agentID, dataDir string) int { return count } +// --- Recent status events --- + +// handleStatusRecent returns the last N status-diff events from the bus ring +// buffer (default 100, cap 100). Lets a new client populate its Status Feed +// panel with history before subscribing to /sse/status for live updates. +func (s *Server) handleStatusRecent(w http.ResponseWriter, r *http.Request) { + n := 100 + if qn := r.URL.Query().Get("n"); qn != "" { + if parsed, err := strconv.Atoi(qn); err == nil && parsed > 0 { + n = parsed + } + } + events := s.bus.Recent("status", n) + writeJSON(w, http.StatusOK, events) +} + // --- Health --- func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { diff --git a/internal/api/pubsub.go b/internal/api/pubsub.go index 73bb52b..bcfb1d2 100644 --- a/internal/api/pubsub.go +++ b/internal/api/pubsub.go @@ -14,14 +14,36 @@ type Event = any // Bus is a simple in-memory pub/sub hub. // Topics are arbitrary strings (e.g. "status", "logs/agent-id"). +// Per-topic ring buffer of recent events (default 100) lets new subscribers +// or GET endpoints fetch the recent history. type Bus struct { - mu sync.RWMutex - subs map[string][]chan Event + mu sync.RWMutex + subs map[string][]chan Event + recent map[string][]Event + histCap int } -// NewBus creates an initialised Bus. +// NewBus creates an initialised Bus with a 100-event history per topic. func NewBus() *Bus { - return &Bus{subs: make(map[string][]chan Event)} + return &Bus{ + subs: make(map[string][]chan Event), + recent: make(map[string][]Event), + histCap: 100, + } +} + +// Recent returns up to n most recent events for topic (oldest first). +// n <= 0 returns the whole buffer (up to histCap). +func (b *Bus) Recent(topic string, n int) []Event { + b.mu.RLock() + defer b.mu.RUnlock() + buf := b.recent[topic] + if n <= 0 || n > len(buf) { + n = len(buf) + } + out := make([]Event, n) + copy(out, buf[len(buf)-n:]) + return out } // Subscribe returns a channel that receives events published to topic. @@ -48,12 +70,19 @@ func (b *Bus) Unsubscribe(topic string, ch <-chan Event) { } } -// Publish sends ev to all subscribers of topic. -// Non-blocking: if a subscriber channel is full, the event is dropped for that subscriber. +// Publish sends ev to all subscribers of topic and appends to ring history. +// Non-blocking: if a subscriber channel is full, the event is dropped for that +// subscriber. History is always retained (capped at histCap). func (b *Bus) Publish(topic string, ev Event) { - b.mu.RLock() - list := b.subs[topic] - b.mu.RUnlock() + b.mu.Lock() + buf := b.recent[topic] + buf = append(buf, ev) + if len(buf) > b.histCap { + buf = buf[len(buf)-b.histCap:] + } + b.recent[topic] = buf + list := append([]chan Event(nil), b.subs[topic]...) + b.mu.Unlock() for _, ch := range list { select { case ch <- ev: diff --git a/internal/api/server.go b/internal/api/server.go index 543220e..9d04935 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -95,6 +95,9 @@ func (s *Server) Run(ctx context.Context) error { mux.Handle("GET /sse/status", s.auth(http.HandlerFunc(s.handleSSEStatus))) mux.Handle("GET /sse/agents/{id}/logs", s.auth(http.HandlerFunc(s.handleSSEAgentLogs))) + // History endpoint: recent status-diff events from the in-memory ring buffer. + mux.Handle("GET /status/recent", s.auth(http.HandlerFunc(s.handleStatusRecent))) + addr := ":" + strconv.Itoa(s.port) ln, err := net.Listen("tcp", addr) if err != nil {