feat: wire sse_client_cpp_core for live updates from /api/boards/issues/stream

This commit is contained in:
agent
2026-05-18 20:05:14 +02:00
parent 264c5939f3
commit 4f5e9f6fbe
16 changed files with 726 additions and 44 deletions
+1
View File
@@ -4,6 +4,7 @@ go 1.25.0
require (
fn-registry v0.0.0-00010101000000-000000000000
github.com/fsnotify/fsnotify v1.10.1
gopkg.in/yaml.v3 v3.0.1
)
+2
View File
@@ -13,6 +13,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.10.1 h1:b0/UzAf9yR5rhf3RPm9gf3ehBPpf0oZKIjtpKrx59Ho=
github.com/fsnotify/fsnotify v1.10.1/go.mod h1:TLheqan6HD6GBK6PrDWyDPBaEV8LspOxvPSjC+bVfgo=
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
+92
View File
@@ -163,6 +163,14 @@ func handlePatchBoardCard() http.HandlerFunc {
}
_ = PatchFrontmatterField(file, "updated", time.Now().UTC().Format("2006-01-02"))
cache.invalidate()
if globalHub != nil {
globalHub.Broadcast(ServerEvent{
Board: board,
CardID: id,
Action: "updated",
EventType: "card_changed",
})
}
infra.HTTPJSONResponse(w, http.StatusOK, map[string]any{
"ok": true,
"id": id,
@@ -243,11 +251,95 @@ func handleLaunchBoardCard() http.HandlerFunc {
}
}
// GET /api/boards/{board}/stream (text/event-stream)
//
// Long-lived SSE connection that emits one event per card change on the
// given board. Events:
// - card_added {"board","card_id","action":"created"}
// - card_changed {"board","card_id","action":"updated"}
// - card_removed {"board","card_id","action":"deleted"}
// - keepalive ts=<unix>
//
// Events for OTHER boards are filtered out (one subscription per board).
// A keepalive is emitted every 15s to prevent proxy timeouts.
func handleBoardStream() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
board := r.PathValue("board")
dir, _, _ := dirAndCacheForBoard(board)
if dir == "" {
notFound(w, "unknown board: "+board)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
if globalHub == nil {
http.Error(w, "hub not initialised", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
flusher.Flush()
ch := globalHub.Subscribe()
defer globalHub.Unsubscribe(ch)
// Honor Last-Event-ID is not supported yet (TODO: replay buffer).
_ = r.Header.Get("Last-Event-ID")
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if _, err := fmt.Fprintf(w, "event: keepalive\ndata: ts=%d\n\n", time.Now().Unix()); err != nil {
return
}
flusher.Flush()
case ev, ok := <-ch:
if !ok {
return
}
if ev.Board != board {
continue
}
evType := ev.EventType
if evType == "" {
evType = "card_changed"
}
payload, err := json.Marshal(map[string]string{
"board": ev.Board,
"card_id": ev.CardID,
"action": ev.Action,
})
if err != nil {
continue
}
if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", evType, payload); err != nil {
return
}
flusher.Flush()
}
}
}
}
// boardRoutes returns the additional routes for issues/flows boards. Called
// from apiRoutes() in handlers.go.
func boardRoutes() []infra.Route {
return []infra.Route{
{Method: "GET", Path: "/api/boards/{board}/cards", Handler: handleListBoardCards()},
{Method: "GET", Path: "/api/boards/{board}/stream", Handler: handleBoardStream()},
{Method: "PATCH", Path: "/api/boards/{board}/cards/{id}", Handler: handlePatchBoardCard()},
{Method: "POST", Path: "/api/boards/{board}/cards/{id}/launch", Handler: handleLaunchBoardCard()},
}
+4
View File
@@ -37,6 +37,10 @@ func main() {
}
defer db.Close()
// SSE: hub + fsnotify watcher for dev/issues + dev/flows.
globalHub = NewHub()
startBoardsWatcher(globalHub)
mux := infra.HTTPRouter(apiRoutes(db, &featureFlags))
mux.HandleFunc("/health", handleHealth(*port))
+78
View File
@@ -0,0 +1,78 @@
package main
import (
"sync"
)
// ServerEvent is a board-scoped event broadcast to all SSE subscribers
// of a given board. It is emitted both by the fsnotify watcher (file
// changes on disk under dev/issues or dev/flows) and by handlers that
// mutate cards (PATCH /api/boards/{board}/cards/{id}) so the C++ client
// updates in real time without polling.
type ServerEvent struct {
Board string `json:"board"` // "issues" | "flows"
CardID string `json:"card_id"` // canonical id, e.g. "0119"
Action string `json:"action"` // "created" | "updated" | "deleted"
EventType string `json:"-"` // SSE "event:" field. Empty → "card_changed"
}
// Hub fans out ServerEvent messages to N concurrent subscribers. Each
// subscriber gets a buffered channel; if the channel is full, the event
// is dropped for THAT subscriber (slow consumer must reconnect to get a
// fresh snapshot). Hub itself is safe for concurrent use.
type Hub struct {
mu sync.RWMutex
clients map[chan ServerEvent]struct{}
}
// NewHub returns an empty Hub ready to use.
func NewHub() *Hub {
return &Hub{
clients: make(map[chan ServerEvent]struct{}),
}
}
// Subscribe registers a new subscriber. The returned channel is buffered
// (16) so a brief stall on the consumer side doesn't block the producer.
func (h *Hub) Subscribe() chan ServerEvent {
ch := make(chan ServerEvent, 16)
h.mu.Lock()
h.clients[ch] = struct{}{}
h.mu.Unlock()
return ch
}
// Unsubscribe removes a subscriber and closes its channel. Idempotent.
func (h *Hub) Unsubscribe(ch chan ServerEvent) {
h.mu.Lock()
defer h.mu.Unlock()
if _, ok := h.clients[ch]; !ok {
return
}
delete(h.clients, ch)
close(ch)
}
// Broadcast sends ev to every current subscriber. Non-blocking: if a
// subscriber's channel is full the event is dropped for that subscriber.
func (h *Hub) Broadcast(ev ServerEvent) {
h.mu.RLock()
defer h.mu.RUnlock()
for ch := range h.clients {
select {
case ch <- ev:
default:
// slow consumer: drop
}
}
}
// Count returns the current number of subscribers (test/diagnostic).
func (h *Hub) Count() int {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.clients)
}
// globalHub is initialised in main() and consumed by handlers + watcher.
var globalHub *Hub
+73
View File
@@ -0,0 +1,73 @@
package main
import (
"testing"
"time"
)
func TestHub_BroadcastReachesSubscriber(t *testing.T) {
h := NewHub()
ch := h.Subscribe()
defer h.Unsubscribe(ch)
want := ServerEvent{Board: "issues", CardID: "0119", Action: "updated"}
h.Broadcast(want)
select {
case got := <-ch:
if got != want {
t.Fatalf("got %+v, want %+v", got, want)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}
func TestHub_UnsubscribeStopsDelivery(t *testing.T) {
h := NewHub()
ch := h.Subscribe()
if got := h.Count(); got != 1 {
t.Fatalf("Count() = %d, want 1", got)
}
h.Unsubscribe(ch)
if got := h.Count(); got != 0 {
t.Fatalf("Count() after Unsubscribe = %d, want 0", got)
}
// channel should be closed
if _, ok := <-ch; ok {
t.Fatalf("expected closed channel after Unsubscribe")
}
// double-unsubscribe is a no-op
h.Unsubscribe(ch)
// broadcast should not panic and should reach nobody
h.Broadcast(ServerEvent{Board: "issues", CardID: "x", Action: "updated"})
}
func TestHub_MultipleSubscribersAllReceive(t *testing.T) {
h := NewHub()
const n = 5
chans := make([]chan ServerEvent, n)
for i := range chans {
chans[i] = h.Subscribe()
}
defer func() {
for _, ch := range chans {
h.Unsubscribe(ch)
}
}()
want := ServerEvent{Board: "flows", CardID: "abc", Action: "created", EventType: "card_added"}
h.Broadcast(want)
for i, ch := range chans {
select {
case got := <-ch:
if got != want {
t.Fatalf("sub %d: got %+v, want %+v", i, got, want)
}
case <-time.After(time.Second):
t.Fatalf("sub %d: timeout", i)
}
}
}
+191
View File
@@ -0,0 +1,191 @@
package main
import (
"log"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
)
// startBoardsWatcher launches a goroutine that watches dev/issues and
// dev/flows (recursively, one level deep — completed/ subdir) for .md
// changes and broadcasts ServerEvent messages via the hub. It also
// invalidates the relevant cardsCache so the next /cards GET reflects
// disk state.
//
// On fsnotify errors the watcher logs + retries every 30s.
func startBoardsWatcher(hub *Hub) {
go func() {
for {
if err := runBoardsWatcher(hub); err != nil {
log.Printf("sse_watcher: %v — retrying in 30s", err)
time.Sleep(30 * time.Second)
}
}
}()
}
func runBoardsWatcher(hub *Hub) error {
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer w.Close()
roots := map[string]string{
"issues": issuesDir(),
"flows": flowsDir(),
}
for board, dir := range roots {
if err := addRecursive(w, dir); err != nil {
log.Printf("sse_watcher: watch %s (%s): %v", board, dir, err)
} else {
log.Printf("sse_watcher: watching %s -> %s", board, dir)
}
}
for {
select {
case ev, ok := <-w.Events:
if !ok {
return nil
}
handleFsEvent(hub, ev)
case err, ok := <-w.Errors:
if !ok {
return nil
}
log.Printf("sse_watcher: fsnotify error: %v", err)
}
}
}
// addRecursive adds dir and its immediate subdirectories (e.g.
// dev/issues/completed/) to the watcher. We do NOT follow symlinks.
func addRecursive(w *fsnotify.Watcher, root string) error {
if err := w.Add(root); err != nil {
return err
}
entries, err := readDirNoSymlink(root)
if err != nil {
return nil // root added, subdirs best-effort
}
for _, name := range entries {
full := filepath.Join(root, name)
// best-effort, ignore errors on subdirs
_ = w.Add(full)
}
return nil
}
// handleFsEvent translates an fsnotify event into a ServerEvent (if
// applicable) and broadcasts it. Non-md files and skipped names
// (README/INDEX/...) are ignored.
func handleFsEvent(hub *Hub, ev fsnotify.Event) {
board, cardID, action := classifyEvent(ev)
if board == "" {
return
}
// Invalidate the right cache so the next GET re-scans disk.
switch board {
case "issues":
if issuesCache != nil {
issuesCache.invalidate()
}
case "flows":
if flowsCache != nil {
flowsCache.invalidate()
}
}
if hub == nil {
return
}
hub.Broadcast(ServerEvent{
Board: board,
CardID: cardID,
Action: action,
EventType: sseEventForAction(action),
})
}
// classifyEvent inspects a raw fsnotify event and returns
// (board, cardID, action) — board is "" if the event is irrelevant.
func classifyEvent(ev fsnotify.Event) (board, cardID, action string) {
name := filepath.Base(ev.Name)
if !strings.HasSuffix(strings.ToLower(name), ".md") {
return "", "", ""
}
if isSkippedMarkdown(name) {
return "", "", ""
}
// Determine board from the path.
lower := strings.ToLower(filepath.ToSlash(ev.Name))
switch {
case strings.Contains(lower, "/dev/issues/"):
board = "issues"
case strings.Contains(lower, "/dev/flows/"):
board = "flows"
default:
return "", "", ""
}
cardID = deriveIDFromFilename(name)
switch {
case ev.Op&fsnotify.Create != 0:
action = "created"
case ev.Op&fsnotify.Remove != 0:
action = "deleted"
case ev.Op&fsnotify.Rename != 0:
// Treat rename as updated — the file likely moved between
// canonical dir and completed/.
action = "updated"
case ev.Op&fsnotify.Write != 0:
action = "updated"
default:
return "", "", ""
}
return board, cardID, action
}
func sseEventForAction(action string) string {
switch action {
case "created":
return "card_added"
case "deleted":
return "card_removed"
default:
return "card_changed"
}
}
// readDirNoSymlink lists subdirectory names under root, skipping symlinks
// and hidden entries.
func readDirNoSymlink(root string) ([]string, error) {
entries, err := os.ReadDir(root)
if err != nil {
return nil, err
}
out := []string{}
for _, e := range entries {
if !e.IsDir() {
continue
}
name := e.Name()
if strings.HasPrefix(name, ".") {
continue
}
// Detect symlinks: lstat the full path.
full := filepath.Join(root, name)
info, err := os.Lstat(full)
if err != nil {
continue
}
if info.Mode()&os.ModeSymlink != 0 {
continue
}
out = append(out, name)
}
return out, nil
}
+126
View File
@@ -0,0 +1,126 @@
package main
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/fsnotify/fsnotify"
)
func TestWatcher_PathToEvent_IssuesCreate(t *testing.T) {
ev := fsnotify.Event{
Name: "/home/x/fn_registry/dev/issues/0119-frontmatter-migration.md",
Op: fsnotify.Create,
}
board, id, action := classifyEvent(ev)
if board != "issues" || id != "0119" || action != "created" {
t.Fatalf("got (%q,%q,%q), want (issues,0119,created)", board, id, action)
}
if sseEventForAction(action) != "card_added" {
t.Fatalf("sseEventForAction(created) != card_added")
}
}
func TestWatcher_PathToEvent_FlowsRename(t *testing.T) {
ev := fsnotify.Event{
Name: "/x/dev/flows/0042-deploy-vps.md",
Op: fsnotify.Rename,
}
board, id, action := classifyEvent(ev)
if board != "flows" || id != "0042" || action != "updated" {
t.Fatalf("got (%q,%q,%q), want (flows,0042,updated)", board, id, action)
}
}
func TestWatcher_PathToEvent_Remove(t *testing.T) {
ev := fsnotify.Event{
Name: "/x/dev/issues/0050-foo.md",
Op: fsnotify.Remove,
}
board, id, action := classifyEvent(ev)
if board != "issues" || id != "0050" || action != "deleted" {
t.Fatalf("got (%q,%q,%q), want (issues,0050,deleted)", board, id, action)
}
if sseEventForAction(action) != "card_removed" {
t.Fatalf("sseEventForAction(deleted) != card_removed")
}
}
func TestWatcher_PathToEvent_SkippedNames(t *testing.T) {
cases := []string{
"/x/dev/issues/README.md",
"/x/dev/issues/INDEX.md",
"/x/dev/issues/AGENT_GUIDE.md",
"/x/dev/issues/notes.txt", // not .md
"/x/somewhere-else/0001-foo.md", // not under dev/issues|flows
}
for _, p := range cases {
ev := fsnotify.Event{Name: p, Op: fsnotify.Create}
if board, _, _ := classifyEvent(ev); board != "" {
t.Fatalf("expected ignored, got board=%q for %s", board, p)
}
}
}
func TestWatcher_DetectsWrite(t *testing.T) {
// Build a temp tree that *looks like* dev/issues/ so classifyEvent
// will accept it (it matches by path substring "/dev/issues/").
root := t.TempDir()
issuesDirPath := filepath.Join(root, "dev", "issues")
if err := os.MkdirAll(issuesDirPath, 0o755); err != nil {
t.Fatalf("mkdir: %v", err)
}
w, err := fsnotify.NewWatcher()
if err != nil {
t.Fatalf("new watcher: %v", err)
}
defer w.Close()
if err := w.Add(issuesDirPath); err != nil {
t.Fatalf("watch add: %v", err)
}
hub := NewHub()
ch := hub.Subscribe()
defer hub.Unsubscribe(ch)
// Drive events in a goroutine via handleFsEvent so we exercise the
// full pipeline (classify -> broadcast).
done := make(chan struct{})
go func() {
defer close(done)
for {
select {
case ev, ok := <-w.Events:
if !ok {
return
}
handleFsEvent(hub, ev)
case <-w.Errors:
case <-time.After(2 * time.Second):
return
}
}
}()
cardPath := filepath.Join(issuesDirPath, "0999-test.md")
if err := os.WriteFile(cardPath, []byte("---\nstatus: pendiente\n---\n"), 0o644); err != nil {
t.Fatalf("write: %v", err)
}
select {
case ev := <-ch:
if ev.Board != "issues" || ev.CardID != "0999" {
t.Fatalf("unexpected event: %+v", ev)
}
if ev.Action != "created" && ev.Action != "updated" {
t.Fatalf("expected created/updated, got action=%q", ev.Action)
}
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for write event")
}
<-done
}