Files
agents_and_robots/internal/api/pubsub.go
T
egutierrez 71b3b2bca9 feat(api): status ring buffer (last 100) + GET /status/recent endpoint
Bus.Publish now also appends each event to a per-topic ring buffer of
size 100. Bus.Recent(topic, n) returns the tail. New endpoint:

  GET /status/recent?n=N    → JSON array of last N status-diff events

This lets a fresh client (agents_dashboard launching cold) populate its
Status Feed panel with historical activity before subscribing to
/sse/status for live updates. Until now, new SSE subscribers only saw
events emitted AFTER they connected — making the panel useless for
recent history review.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 23:38:55 +02:00

94 lines
2.4 KiB
Go

// Package api — in-memory pub/sub bus for SSE broadcast.
//
// TODO(v0.2): if a second consumer (e.g. from another VPS) is added,
// replace this in-memory bus with NATS or Redis pub/sub. For now
// (1 local client) the overhead of an external broker is unwarranted.
package api
import (
"sync"
)
// Event is a generic event payload (JSON-serialisable).
type Event = any
// Bus is a simple in-memory pub/sub hub.
// Topics are arbitrary strings (e.g. "status", "logs/agent-id").
// Per-topic ring buffer of recent events (default 100) lets new subscribers
// or GET endpoints fetch the recent history.
type Bus struct {
mu sync.RWMutex
subs map[string][]chan Event
recent map[string][]Event
histCap int
}
// NewBus creates an initialised Bus with a 100-event history per topic.
func NewBus() *Bus {
return &Bus{
subs: make(map[string][]chan Event),
recent: make(map[string][]Event),
histCap: 100,
}
}
// Recent returns up to n most recent events for topic (oldest first).
// n <= 0 returns the whole buffer (up to histCap).
func (b *Bus) Recent(topic string, n int) []Event {
b.mu.RLock()
defer b.mu.RUnlock()
buf := b.recent[topic]
if n <= 0 || n > len(buf) {
n = len(buf)
}
out := make([]Event, n)
copy(out, buf[len(buf)-n:])
return out
}
// Subscribe returns a channel that receives events published to topic.
// The channel is buffered (32) to avoid blocking the publisher.
func (b *Bus) Subscribe(topic string) <-chan Event {
ch := make(chan Event, 32)
b.mu.Lock()
b.subs[topic] = append(b.subs[topic], ch)
b.mu.Unlock()
return ch
}
// Unsubscribe removes ch from topic and closes it.
func (b *Bus) Unsubscribe(topic string, ch <-chan Event) {
b.mu.Lock()
defer b.mu.Unlock()
list := b.subs[topic]
for i, c := range list {
if c == ch {
close(c)
b.subs[topic] = append(list[:i], list[i+1:]...)
return
}
}
}
// Publish sends ev to all subscribers of topic and appends to ring history.
// Non-blocking: if a subscriber channel is full, the event is dropped for that
// subscriber. History is always retained (capped at histCap).
func (b *Bus) Publish(topic string, ev Event) {
b.mu.Lock()
buf := b.recent[topic]
buf = append(buf, ev)
if len(buf) > b.histCap {
buf = buf[len(buf)-b.histCap:]
}
b.recent[topic] = buf
list := append([]chan Event(nil), b.subs[topic]...)
b.mu.Unlock()
for _, ch := range list {
select {
case ch <- ev:
default:
// drop for this slow subscriber
}
}
}