From 4f5e9f6fbec09b118ffc4a4aeb3f970fc1995b4a Mon Sep 17 00:00:00 2001 From: agent Date: Mon, 18 May 2026 20:05:14 +0200 Subject: [PATCH] feat: wire sse_client_cpp_core for live updates from /api/boards/issues/stream --- CMakeLists.txt | 1 + app.md | 1 + backend/go.mod | 1 + backend/go.sum | 2 + backend/handlers_boards.go | 92 +++++++++++++++++ backend/main.go | 4 + backend/sse_hub.go | 78 +++++++++++++++ backend/sse_hub_test.go | 73 ++++++++++++++ backend/sse_watcher.go | 191 ++++++++++++++++++++++++++++++++++++ backend/sse_watcher_test.go | 126 ++++++++++++++++++++++++ data.cpp | 43 ++++---- main.cpp | 42 +++++++- panel_board.cpp | 88 +++++++++++++---- panel_calendar.cpp | 9 +- panel_dashboard.cpp | 11 ++- panels.h | 8 ++ 16 files changed, 726 insertions(+), 44 deletions(-) create mode 100644 backend/sse_hub.go create mode 100644 backend/sse_hub_test.go create mode 100644 backend/sse_watcher.go create mode 100644 backend/sse_watcher_test.go diff --git a/CMakeLists.txt b/CMakeLists.txt index d406812..f83a41e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,6 +9,7 @@ add_imgui_app(kanban_cpp panel_dod.cpp # Registry functions consumed (see app.md::uses_functions) ${CMAKE_SOURCE_DIR}/functions/core/http_request.cpp + ${CMAKE_SOURCE_DIR}/functions/core/sse_client.cpp ${CMAKE_SOURCE_DIR}/functions/viz/kpi_card.cpp ${CMAKE_SOURCE_DIR}/functions/viz/sparkline.cpp ${CMAKE_SOURCE_DIR}/functions/viz/agent_runs_timeline.cpp diff --git a/app.md b/app.md index ef80e76..cbf891a 100644 --- a/app.md +++ b/app.md @@ -10,6 +10,7 @@ icon: accent: "#a855f7" uses_functions: - http_request_cpp_core + - sse_client_cpp_core - dod_evidence_panel_cpp_viz - agent_runs_timeline_cpp_viz - kpi_card_cpp_viz diff --git a/backend/go.mod b/backend/go.mod index a4ff32c..d0708ef 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -4,6 +4,7 @@ go 1.25.0 require ( fn-registry v0.0.0-00010101000000-000000000000 + github.com/fsnotify/fsnotify v1.10.1 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/backend/go.sum b/backend/go.sum index 6cc7880..b6f8eaa 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -13,6 +13,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.10.1 h1:b0/UzAf9yR5rhf3RPm9gf3ehBPpf0oZKIjtpKrx59Ho= +github.com/fsnotify/fsnotify v1.10.1/go.mod h1:TLheqan6HD6GBK6PrDWyDPBaEV8LspOxvPSjC+bVfgo= github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= diff --git a/backend/handlers_boards.go b/backend/handlers_boards.go index 9939e9f..6d0beb4 100644 --- a/backend/handlers_boards.go +++ b/backend/handlers_boards.go @@ -163,6 +163,14 @@ func handlePatchBoardCard() http.HandlerFunc { } _ = PatchFrontmatterField(file, "updated", time.Now().UTC().Format("2006-01-02")) cache.invalidate() + if globalHub != nil { + globalHub.Broadcast(ServerEvent{ + Board: board, + CardID: id, + Action: "updated", + EventType: "card_changed", + }) + } infra.HTTPJSONResponse(w, http.StatusOK, map[string]any{ "ok": true, "id": id, @@ -243,11 +251,95 @@ func handleLaunchBoardCard() http.HandlerFunc { } } +// GET /api/boards/{board}/stream (text/event-stream) +// +// Long-lived SSE connection that emits one event per card change on the +// given board. Events: +// - card_added {"board","card_id","action":"created"} +// - card_changed {"board","card_id","action":"updated"} +// - card_removed {"board","card_id","action":"deleted"} +// - keepalive ts= +// +// Events for OTHER boards are filtered out (one subscription per board). +// A keepalive is emitted every 15s to prevent proxy timeouts. +func handleBoardStream() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + board := r.PathValue("board") + dir, _, _ := dirAndCacheForBoard(board) + if dir == "" { + notFound(w, "unknown board: "+board) + return + } + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + if globalHub == nil { + http.Error(w, "hub not initialised", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + ch := globalHub.Subscribe() + defer globalHub.Unsubscribe(ch) + + // Honor Last-Event-ID is not supported yet (TODO: replay buffer). + _ = r.Header.Get("Last-Event-ID") + + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if _, err := fmt.Fprintf(w, "event: keepalive\ndata: ts=%d\n\n", time.Now().Unix()); err != nil { + return + } + flusher.Flush() + case ev, ok := <-ch: + if !ok { + return + } + if ev.Board != board { + continue + } + evType := ev.EventType + if evType == "" { + evType = "card_changed" + } + payload, err := json.Marshal(map[string]string{ + "board": ev.Board, + "card_id": ev.CardID, + "action": ev.Action, + }) + if err != nil { + continue + } + if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", evType, payload); err != nil { + return + } + flusher.Flush() + } + } + } +} + // boardRoutes returns the additional routes for issues/flows boards. Called // from apiRoutes() in handlers.go. func boardRoutes() []infra.Route { return []infra.Route{ {Method: "GET", Path: "/api/boards/{board}/cards", Handler: handleListBoardCards()}, + {Method: "GET", Path: "/api/boards/{board}/stream", Handler: handleBoardStream()}, {Method: "PATCH", Path: "/api/boards/{board}/cards/{id}", Handler: handlePatchBoardCard()}, {Method: "POST", Path: "/api/boards/{board}/cards/{id}/launch", Handler: handleLaunchBoardCard()}, } diff --git a/backend/main.go b/backend/main.go index 73b7167..fec236b 100644 --- a/backend/main.go +++ b/backend/main.go @@ -37,6 +37,10 @@ func main() { } defer db.Close() + // SSE: hub + fsnotify watcher for dev/issues + dev/flows. + globalHub = NewHub() + startBoardsWatcher(globalHub) + mux := infra.HTTPRouter(apiRoutes(db, &featureFlags)) mux.HandleFunc("/health", handleHealth(*port)) diff --git a/backend/sse_hub.go b/backend/sse_hub.go new file mode 100644 index 0000000..f23799f --- /dev/null +++ b/backend/sse_hub.go @@ -0,0 +1,78 @@ +package main + +import ( + "sync" +) + +// ServerEvent is a board-scoped event broadcast to all SSE subscribers +// of a given board. It is emitted both by the fsnotify watcher (file +// changes on disk under dev/issues or dev/flows) and by handlers that +// mutate cards (PATCH /api/boards/{board}/cards/{id}) so the C++ client +// updates in real time without polling. +type ServerEvent struct { + Board string `json:"board"` // "issues" | "flows" + CardID string `json:"card_id"` // canonical id, e.g. "0119" + Action string `json:"action"` // "created" | "updated" | "deleted" + EventType string `json:"-"` // SSE "event:" field. Empty → "card_changed" +} + +// Hub fans out ServerEvent messages to N concurrent subscribers. Each +// subscriber gets a buffered channel; if the channel is full, the event +// is dropped for THAT subscriber (slow consumer must reconnect to get a +// fresh snapshot). Hub itself is safe for concurrent use. +type Hub struct { + mu sync.RWMutex + clients map[chan ServerEvent]struct{} +} + +// NewHub returns an empty Hub ready to use. +func NewHub() *Hub { + return &Hub{ + clients: make(map[chan ServerEvent]struct{}), + } +} + +// Subscribe registers a new subscriber. The returned channel is buffered +// (16) so a brief stall on the consumer side doesn't block the producer. +func (h *Hub) Subscribe() chan ServerEvent { + ch := make(chan ServerEvent, 16) + h.mu.Lock() + h.clients[ch] = struct{}{} + h.mu.Unlock() + return ch +} + +// Unsubscribe removes a subscriber and closes its channel. Idempotent. +func (h *Hub) Unsubscribe(ch chan ServerEvent) { + h.mu.Lock() + defer h.mu.Unlock() + if _, ok := h.clients[ch]; !ok { + return + } + delete(h.clients, ch) + close(ch) +} + +// Broadcast sends ev to every current subscriber. Non-blocking: if a +// subscriber's channel is full the event is dropped for that subscriber. +func (h *Hub) Broadcast(ev ServerEvent) { + h.mu.RLock() + defer h.mu.RUnlock() + for ch := range h.clients { + select { + case ch <- ev: + default: + // slow consumer: drop + } + } +} + +// Count returns the current number of subscribers (test/diagnostic). +func (h *Hub) Count() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) +} + +// globalHub is initialised in main() and consumed by handlers + watcher. +var globalHub *Hub diff --git a/backend/sse_hub_test.go b/backend/sse_hub_test.go new file mode 100644 index 0000000..ab809f0 --- /dev/null +++ b/backend/sse_hub_test.go @@ -0,0 +1,73 @@ +package main + +import ( + "testing" + "time" +) + +func TestHub_BroadcastReachesSubscriber(t *testing.T) { + h := NewHub() + ch := h.Subscribe() + defer h.Unsubscribe(ch) + + want := ServerEvent{Board: "issues", CardID: "0119", Action: "updated"} + h.Broadcast(want) + + select { + case got := <-ch: + if got != want { + t.Fatalf("got %+v, want %+v", got, want) + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for broadcast") + } +} + +func TestHub_UnsubscribeStopsDelivery(t *testing.T) { + h := NewHub() + ch := h.Subscribe() + if got := h.Count(); got != 1 { + t.Fatalf("Count() = %d, want 1", got) + } + h.Unsubscribe(ch) + if got := h.Count(); got != 0 { + t.Fatalf("Count() after Unsubscribe = %d, want 0", got) + } + // channel should be closed + if _, ok := <-ch; ok { + t.Fatalf("expected closed channel after Unsubscribe") + } + // double-unsubscribe is a no-op + h.Unsubscribe(ch) + + // broadcast should not panic and should reach nobody + h.Broadcast(ServerEvent{Board: "issues", CardID: "x", Action: "updated"}) +} + +func TestHub_MultipleSubscribersAllReceive(t *testing.T) { + h := NewHub() + const n = 5 + chans := make([]chan ServerEvent, n) + for i := range chans { + chans[i] = h.Subscribe() + } + defer func() { + for _, ch := range chans { + h.Unsubscribe(ch) + } + }() + + want := ServerEvent{Board: "flows", CardID: "abc", Action: "created", EventType: "card_added"} + h.Broadcast(want) + + for i, ch := range chans { + select { + case got := <-ch: + if got != want { + t.Fatalf("sub %d: got %+v, want %+v", i, got, want) + } + case <-time.After(time.Second): + t.Fatalf("sub %d: timeout", i) + } + } +} diff --git a/backend/sse_watcher.go b/backend/sse_watcher.go new file mode 100644 index 0000000..01df539 --- /dev/null +++ b/backend/sse_watcher.go @@ -0,0 +1,191 @@ +package main + +import ( + "log" + "os" + "path/filepath" + "strings" + "time" + + "github.com/fsnotify/fsnotify" +) + +// startBoardsWatcher launches a goroutine that watches dev/issues and +// dev/flows (recursively, one level deep — completed/ subdir) for .md +// changes and broadcasts ServerEvent messages via the hub. It also +// invalidates the relevant cardsCache so the next /cards GET reflects +// disk state. +// +// On fsnotify errors the watcher logs + retries every 30s. +func startBoardsWatcher(hub *Hub) { + go func() { + for { + if err := runBoardsWatcher(hub); err != nil { + log.Printf("sse_watcher: %v — retrying in 30s", err) + time.Sleep(30 * time.Second) + } + } + }() +} + +func runBoardsWatcher(hub *Hub) error { + w, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer w.Close() + + roots := map[string]string{ + "issues": issuesDir(), + "flows": flowsDir(), + } + for board, dir := range roots { + if err := addRecursive(w, dir); err != nil { + log.Printf("sse_watcher: watch %s (%s): %v", board, dir, err) + } else { + log.Printf("sse_watcher: watching %s -> %s", board, dir) + } + } + + for { + select { + case ev, ok := <-w.Events: + if !ok { + return nil + } + handleFsEvent(hub, ev) + case err, ok := <-w.Errors: + if !ok { + return nil + } + log.Printf("sse_watcher: fsnotify error: %v", err) + } + } +} + +// addRecursive adds dir and its immediate subdirectories (e.g. +// dev/issues/completed/) to the watcher. We do NOT follow symlinks. +func addRecursive(w *fsnotify.Watcher, root string) error { + if err := w.Add(root); err != nil { + return err + } + entries, err := readDirNoSymlink(root) + if err != nil { + return nil // root added, subdirs best-effort + } + for _, name := range entries { + full := filepath.Join(root, name) + // best-effort, ignore errors on subdirs + _ = w.Add(full) + } + return nil +} + +// handleFsEvent translates an fsnotify event into a ServerEvent (if +// applicable) and broadcasts it. Non-md files and skipped names +// (README/INDEX/...) are ignored. +func handleFsEvent(hub *Hub, ev fsnotify.Event) { + board, cardID, action := classifyEvent(ev) + if board == "" { + return + } + // Invalidate the right cache so the next GET re-scans disk. + switch board { + case "issues": + if issuesCache != nil { + issuesCache.invalidate() + } + case "flows": + if flowsCache != nil { + flowsCache.invalidate() + } + } + if hub == nil { + return + } + hub.Broadcast(ServerEvent{ + Board: board, + CardID: cardID, + Action: action, + EventType: sseEventForAction(action), + }) +} + +// classifyEvent inspects a raw fsnotify event and returns +// (board, cardID, action) — board is "" if the event is irrelevant. +func classifyEvent(ev fsnotify.Event) (board, cardID, action string) { + name := filepath.Base(ev.Name) + if !strings.HasSuffix(strings.ToLower(name), ".md") { + return "", "", "" + } + if isSkippedMarkdown(name) { + return "", "", "" + } + // Determine board from the path. + lower := strings.ToLower(filepath.ToSlash(ev.Name)) + switch { + case strings.Contains(lower, "/dev/issues/"): + board = "issues" + case strings.Contains(lower, "/dev/flows/"): + board = "flows" + default: + return "", "", "" + } + cardID = deriveIDFromFilename(name) + switch { + case ev.Op&fsnotify.Create != 0: + action = "created" + case ev.Op&fsnotify.Remove != 0: + action = "deleted" + case ev.Op&fsnotify.Rename != 0: + // Treat rename as updated — the file likely moved between + // canonical dir and completed/. + action = "updated" + case ev.Op&fsnotify.Write != 0: + action = "updated" + default: + return "", "", "" + } + return board, cardID, action +} + +func sseEventForAction(action string) string { + switch action { + case "created": + return "card_added" + case "deleted": + return "card_removed" + default: + return "card_changed" + } +} + +// readDirNoSymlink lists subdirectory names under root, skipping symlinks +// and hidden entries. +func readDirNoSymlink(root string) ([]string, error) { + entries, err := os.ReadDir(root) + if err != nil { + return nil, err + } + out := []string{} + for _, e := range entries { + if !e.IsDir() { + continue + } + name := e.Name() + if strings.HasPrefix(name, ".") { + continue + } + // Detect symlinks: lstat the full path. + full := filepath.Join(root, name) + info, err := os.Lstat(full) + if err != nil { + continue + } + if info.Mode()&os.ModeSymlink != 0 { + continue + } + out = append(out, name) + } + return out, nil +} diff --git a/backend/sse_watcher_test.go b/backend/sse_watcher_test.go new file mode 100644 index 0000000..3250a72 --- /dev/null +++ b/backend/sse_watcher_test.go @@ -0,0 +1,126 @@ +package main + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/fsnotify/fsnotify" +) + +func TestWatcher_PathToEvent_IssuesCreate(t *testing.T) { + ev := fsnotify.Event{ + Name: "/home/x/fn_registry/dev/issues/0119-frontmatter-migration.md", + Op: fsnotify.Create, + } + board, id, action := classifyEvent(ev) + if board != "issues" || id != "0119" || action != "created" { + t.Fatalf("got (%q,%q,%q), want (issues,0119,created)", board, id, action) + } + if sseEventForAction(action) != "card_added" { + t.Fatalf("sseEventForAction(created) != card_added") + } +} + +func TestWatcher_PathToEvent_FlowsRename(t *testing.T) { + ev := fsnotify.Event{ + Name: "/x/dev/flows/0042-deploy-vps.md", + Op: fsnotify.Rename, + } + board, id, action := classifyEvent(ev) + if board != "flows" || id != "0042" || action != "updated" { + t.Fatalf("got (%q,%q,%q), want (flows,0042,updated)", board, id, action) + } +} + +func TestWatcher_PathToEvent_Remove(t *testing.T) { + ev := fsnotify.Event{ + Name: "/x/dev/issues/0050-foo.md", + Op: fsnotify.Remove, + } + board, id, action := classifyEvent(ev) + if board != "issues" || id != "0050" || action != "deleted" { + t.Fatalf("got (%q,%q,%q), want (issues,0050,deleted)", board, id, action) + } + if sseEventForAction(action) != "card_removed" { + t.Fatalf("sseEventForAction(deleted) != card_removed") + } +} + +func TestWatcher_PathToEvent_SkippedNames(t *testing.T) { + cases := []string{ + "/x/dev/issues/README.md", + "/x/dev/issues/INDEX.md", + "/x/dev/issues/AGENT_GUIDE.md", + "/x/dev/issues/notes.txt", // not .md + "/x/somewhere-else/0001-foo.md", // not under dev/issues|flows + } + for _, p := range cases { + ev := fsnotify.Event{Name: p, Op: fsnotify.Create} + if board, _, _ := classifyEvent(ev); board != "" { + t.Fatalf("expected ignored, got board=%q for %s", board, p) + } + } +} + +func TestWatcher_DetectsWrite(t *testing.T) { + // Build a temp tree that *looks like* dev/issues/ so classifyEvent + // will accept it (it matches by path substring "/dev/issues/"). + root := t.TempDir() + issuesDirPath := filepath.Join(root, "dev", "issues") + if err := os.MkdirAll(issuesDirPath, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + + w, err := fsnotify.NewWatcher() + if err != nil { + t.Fatalf("new watcher: %v", err) + } + defer w.Close() + if err := w.Add(issuesDirPath); err != nil { + t.Fatalf("watch add: %v", err) + } + + hub := NewHub() + ch := hub.Subscribe() + defer hub.Unsubscribe(ch) + + // Drive events in a goroutine via handleFsEvent so we exercise the + // full pipeline (classify -> broadcast). + done := make(chan struct{}) + go func() { + defer close(done) + for { + select { + case ev, ok := <-w.Events: + if !ok { + return + } + handleFsEvent(hub, ev) + case <-w.Errors: + case <-time.After(2 * time.Second): + return + } + } + }() + + cardPath := filepath.Join(issuesDirPath, "0999-test.md") + if err := os.WriteFile(cardPath, []byte("---\nstatus: pendiente\n---\n"), 0o644); err != nil { + t.Fatalf("write: %v", err) + } + + select { + case ev := <-ch: + if ev.Board != "issues" || ev.CardID != "0999" { + t.Fatalf("unexpected event: %+v", ev) + } + if ev.Action != "created" && ev.Action != "updated" { + t.Fatalf("expected created/updated, got action=%q", ev.Action) + } + case <-time.After(3 * time.Second): + t.Fatal("timeout waiting for write event") + } + + <-done +} diff --git a/data.cpp b/data.cpp index 32edd1a..26d6f58 100644 --- a/data.cpp +++ b/data.cpp @@ -94,12 +94,25 @@ fn_http::Response do_post_json(const std::string& url, const std::string& body, } // namespace bool health(const ClientConfig& cfg) { - auto r = do_get(cfg.base_url + "/health", cfg.timeout_ms); + // /health legacy tiene auth middleware → 500. Usar endpoint sync layer + // (issue 0119) sin auth como ping. + auto r = do_get(cfg.base_url + "/api/boards/issues/cards", cfg.timeout_ms); return r.status >= 200 && r.status < 300; } +static std::string status_to_column(const std::string& s) { + if (s == "pendiente" || s == "pending") return "backlog"; + if (s == "en-curso" || s == "in-progress") return "doing"; + if (s == "en-revision" || s == "review") return "review"; + if (s == "done" || s == "completado") return "done"; + if (s == "deferred") return "deferred"; + return "backlog"; +} + std::vector list_cards(const ClientConfig& cfg, std::string& err) { - auto r = do_get(cfg.base_url + "/api/cards", cfg.timeout_ms); + // Issue 0119 sync layer: cards = issues + flows. Aqui solo issues; flows + // viven en su propio tab/panel cuando se anada. + auto r = do_get(cfg.base_url + "/api/boards/issues/cards", cfg.timeout_ms); if (r.status == 0) { err = "transport: " + r.error; return {}; } if (r.status >= 400) { err = "http " + std::to_string(r.status); return {}; } std::vector out; @@ -108,9 +121,9 @@ std::vector list_cards(const ClientConfig& cfg, std::string& err) { c.id = find_str_field(obj, "id"); c.title = find_str_field(obj, "title"); c.description = find_str_field(obj, "description"); - c.column_id = find_str_field(obj, "column_id"); - c.priority = find_str_field(obj, "priority"); c.status = find_str_field(obj, "status"); + c.column_id = status_to_column(c.status); + c.priority = find_str_field(obj, "priority"); c.position = find_int_field(obj, "position"); c.due_date = find_int_field(obj, "due_date"); c.assignee = find_str_field(obj, "assignee"); @@ -119,19 +132,15 @@ std::vector list_cards(const ClientConfig& cfg, std::string& err) { return out; } -std::vector list_columns(const ClientConfig& cfg, std::string& err) { - auto r = do_get(cfg.base_url + "/api/columns", cfg.timeout_ms); - if (r.status == 0) { err = "transport: " + r.error; return {}; } - if (r.status >= 400) { err = "http " + std::to_string(r.status); return {}; } - std::vector out; - for (const auto& obj : split_objects(r.body)) { - Column c; - c.id = find_str_field(obj, "id"); - c.name = find_str_field(obj, "name"); - c.order = static_cast(find_int_field(obj, "order")); - if (!c.id.empty()) out.push_back(c); - } - return out; +std::vector list_columns(const ClientConfig& /*cfg*/, std::string& /*err*/) { + // Columnas fijas derivadas de taxonomia (issue 0103). + return { + {"backlog", "Backlog", 0}, + {"doing", "Doing", 1}, + {"review", "Review", 2}, + {"done", "Done", 3}, + {"deferred", "Deferred", 4}, + }; } bool move_card(const ClientConfig& cfg, const std::string& card_id, diff --git a/main.cpp b/main.cpp index 40888b2..4ae0086 100644 --- a/main.cpp +++ b/main.cpp @@ -6,12 +6,16 @@ #include "core/panel_menu.h" #include "core/icons_tabler.h" #include "core/logger.h" +#include "core/sse_client.h" #include "panels.h" #include +#include #include #include +#include #include +#include static bool g_show_board = true; static bool g_show_calendar = true; @@ -22,6 +26,11 @@ static bool g_show_dod = true; static kanban_cpp::AppState g_state; +// SSE client: receives push notifications from the backend stream so the +// board updates without polling. Lifetime tied to main() — stop() before +// returning so the worker thread joins cleanly. +static fn_sse::Client g_sse_client; + static void render() { if (g_show_board) kanban_cpp::draw_board (g_state, &g_show_board); if (g_show_calendar) kanban_cpp::draw_calendar (g_state, &g_show_calendar); @@ -66,8 +75,35 @@ int main(int argc, char** argv) { cfg.panels = panels; cfg.panel_count = sizeof(panels) / sizeof(panels[0]); - // First refresh on startup (best-effort; failure surfaces in the Board). - kanban_cpp::refresh_data(g_state); + // First refresh on startup en thread separado: no bloquea primer frame + // si el backend :8403 esta caido (timeout HTTP ~9s). + std::thread([](){ kanban_cpp::refresh_data(g_state); }).detach(); - return fn::run_app(cfg, render); + // SSE live updates: arranca tras 500ms para no competir con el primer + // refresh inicial. Auto-reconecta con backoff si el endpoint no existe + // aun o si el backend cae — NUNCA crashea el frame. + std::thread([](){ + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + fn_sse::Config sse_cfg; + sse_cfg.url = g_state.cfg.base_url + "/api/boards/issues/stream"; + sse_cfg.auto_reconnect = true; + + g_sse_client.start(sse_cfg, + // on_event: cualquier evento dispara un refresh asincrono. + [](const fn_sse::Event& /*ev*/) { + std::thread([](){ kanban_cpp::refresh_data(g_state); }).detach(); + }, + // on_status: actualiza el badge UI bajo mutex. + [](const std::string& status) { + std::lock_guard lock(g_state.mu); + g_state.sse_status = status; + }); + }).detach(); + + int rc = fn::run_app(cfg, render); + + // Kill SSE worker antes de salir — orden importa para evitar dangling + // thread storage cuando se destruyen los globales. + g_sse_client.stop(); + return rc; } diff --git a/panel_board.cpp b/panel_board.cpp index 74bf395..5ff4262 100644 --- a/panel_board.cpp +++ b/panel_board.cpp @@ -4,17 +4,27 @@ #include #include +#include +#include namespace kanban_cpp { void refresh_data(AppState& s) { std::string err; - s.cards = list_cards(s.cfg, err); - if (!err.empty()) s.last_refresh_error = "cards: " + err; - s.columns = list_columns(s.cfg, err); - if (!err.empty()) s.last_refresh_error += " columns: " + err; - s.backend_ok = health(s.cfg); - s.last_refresh_ts = std::time(nullptr); + auto cards = list_cards(s.cfg, err); + std::string err_cards = err; err.clear(); + auto columns = list_columns(s.cfg, err); + std::string err_cols = err; + bool ok = health(s.cfg); + int64_t ts = std::time(nullptr); + std::lock_guard lock(s.mu); + s.cards = std::move(cards); + s.columns = std::move(columns); + s.last_refresh_error.clear(); + if (!err_cards.empty()) s.last_refresh_error = "cards: " + err_cards; + if (!err_cols.empty()) s.last_refresh_error += " columns: " + err_cols; + s.backend_ok = ok; + s.last_refresh_ts = ts; } void draw_board(AppState& s, bool* p_open) { @@ -23,22 +33,53 @@ void draw_board(AppState& s, bool* p_open) { return; } - // Toolbar - if (ImGui::Button(TI_REFRESH " Refresh")) refresh_data(s); + // Snapshot bajo lock — refresh corre en thread separado. + std::vector cards_snap; + std::vector cols_snap; + bool backend_ok_snap; + std::string err_snap; + std::string sse_snap; + { + std::lock_guard lock(s.mu); + cards_snap = s.cards; + cols_snap = s.columns; + backend_ok_snap = s.backend_ok; + err_snap = s.last_refresh_error; + sse_snap = s.sse_status; + } + + // Toolbar — refresh corre en thread separado (no bloquea frame). + if (ImGui::Button(TI_REFRESH " Refresh")) { + std::thread([&s](){ refresh_data(s); }).detach(); + } ImGui::SameLine(); - if (s.backend_ok) { + if (backend_ok_snap) { ImGui::TextColored(ImVec4(0.4f, 0.85f, 0.4f, 1.0f), TI_CHECK " backend :8403"); } else { ImGui::TextColored(ImVec4(0.85f, 0.4f, 0.4f, 1.0f), TI_ALERT_TRIANGLE " backend offline (:8403)"); } - if (!s.last_refresh_error.empty()) { + + // SSE live badge — refleja el estado del stream push del backend. + ImGui::SameLine(); + if (sse_snap == "connected") { + ImGui::TextColored(ImVec4(0.4f, 0.85f, 0.4f, 1.0f), TI_BROADCAST " live"); + } else if (sse_snap == "connecting") { + ImGui::TextColored(ImVec4(0.85f, 0.85f, 0.4f, 1.0f), TI_LOADER " connecting"); + } else if (sse_snap == "disconnected") { + ImGui::TextColored(ImVec4(0.85f, 0.4f, 0.4f, 1.0f), TI_PLUG_CONNECTED_X " disconnected"); + } else { + // "error: " o cualquier otro string + ImGui::TextColored(ImVec4(0.85f, 0.4f, 0.4f, 1.0f), TI_PLUG_CONNECTED_X " %s", sse_snap.c_str()); + } + + if (!err_snap.empty()) { ImGui::SameLine(); - ImGui::TextColored(ImVec4(0.85f, 0.6f, 0.2f, 1.0f), "%s", s.last_refresh_error.c_str()); + ImGui::TextColored(ImVec4(0.85f, 0.6f, 0.2f, 1.0f), "%s", err_snap.c_str()); } ImGui::Separator(); // Empty state - if (s.columns.empty()) { + if (cols_snap.empty()) { ImGui::TextDisabled("No columns yet. Pulsa Refresh o lanza el backend en :8403."); ImGui::End(); return; @@ -48,19 +89,19 @@ void draw_board(AppState& s, bool* p_open) { const float col_w = 280.0f; if (ImGui::BeginChild("##board_scroll", ImVec2(0, 0), false, ImGuiWindowFlags_HorizontalScrollbar)) { - for (size_t ci = 0; ci < s.columns.size(); ++ci) { - const auto& col = s.columns[ci]; + for (size_t ci = 0; ci < cols_snap.size(); ++ci) { + const auto& col = cols_snap[ci]; ImGui::SameLine(); ImGui::BeginChild((std::string("##col_") + col.id).c_str(), ImVec2(col_w, 0), true); ImGui::TextUnformatted(col.name.c_str()); ImGui::SameLine(); int count = 0; - for (const auto& c : s.cards) if (c.column_id == col.id) ++count; + for (const auto& c : cards_snap) if (c.column_id == col.id) ++count; ImGui::TextDisabled("(%d)", count); ImGui::Separator(); - for (const auto& card : s.cards) { + for (const auto& card : cards_snap) { if (card.column_id != col.id) continue; ImGui::PushID(card.id.c_str()); ImGui::BeginChild("##card", ImVec2(0, 70), true, @@ -83,13 +124,18 @@ void draw_board(AppState& s, bool* p_open) { } if (ImGui::BeginPopup("##card_ctx")) { ImGui::TextDisabled("Move to:"); - for (const auto& tgt : s.columns) { + for (const auto& tgt : cols_snap) { if (tgt.id == card.column_id) continue; if (ImGui::MenuItem(tgt.name.c_str())) { - std::string err; - if (!move_card(s.cfg, card.id, tgt.id, err)) - s.last_refresh_error = "move: " + err; - else refresh_data(s); + std::thread([&s, card_id=card.id, tgt_id=tgt.id](){ + std::string err; + if (!move_card(s.cfg, card_id, tgt_id, err)) { + std::lock_guard lock(s.mu); + s.last_refresh_error = "move: " + err; + return; + } + refresh_data(s); + }).detach(); } } ImGui::Separator(); diff --git a/panel_calendar.cpp b/panel_calendar.cpp index c0e3851..56b0551 100644 --- a/panel_calendar.cpp +++ b/panel_calendar.cpp @@ -8,6 +8,7 @@ #include #include +#include namespace kanban_cpp { @@ -32,6 +33,12 @@ void draw_calendar(AppState& s, bool* p_open) { ImGui::TextDisabled("(MVP estatico — TODO: navegacion + filtros)"); ImGui::Separator(); + std::vector cards_snap; + { + std::lock_guard lock(s.mu); + cards_snap = s.cards; + } + // First day of current month + days in month std::tm tm_first = tm_now; tm_first.tm_mday = 1; @@ -68,7 +75,7 @@ void draw_calendar(AppState& s, bool* p_open) { ImGui::Text("%d", day); // Count cards whose due_date falls in this day. int hits = 0; - for (const auto& card : s.cards) { + for (const auto& card : cards_snap) { if (card.due_date == 0) continue; std::time_t cd = (std::time_t)card.due_date; std::tm tmc; diff --git a/panel_dashboard.cpp b/panel_dashboard.cpp index d1a10f9..70ed154 100644 --- a/panel_dashboard.cpp +++ b/panel_dashboard.cpp @@ -6,6 +6,7 @@ #include #include +#include #include namespace kanban_cpp { @@ -30,11 +31,17 @@ void draw_dashboard(AppState& s, bool* p_open) { ImGui::TextDisabled("KPIs sinteticos (TODO: backend /api/stats endpoint)"); ImGui::Separator(); + std::vector cards_snap; + { + std::lock_guard lock(s.mu); + cards_snap = s.cards; + } + // Snapshot counts - int total = static_cast(s.cards.size()); + int total = static_cast(cards_snap.size()); std::map by_priority; std::map by_status; - for (const auto& c : s.cards) { + for (const auto& c : cards_snap) { if (!c.priority.empty()) by_priority[c.priority]++; if (!c.status.empty()) by_status[c.status]++; } diff --git a/panels.h b/panels.h index b435139..4f1c9d1 100644 --- a/panels.h +++ b/panels.h @@ -5,10 +5,15 @@ #pragma once #include "data.h" +#include +#include namespace kanban_cpp { // Shared app state passed to every panel. Owned by main.cpp. +// `mu` guards cards/columns/backend_ok/last_refresh_*/sse_status — refresh_data +// corre en thread aparte y el SSE callback tambien lo hace, ambos escriben a +// traves del mismo mutex. struct AppState { ClientConfig cfg; std::vector cards; @@ -16,6 +21,9 @@ struct AppState { std::string last_refresh_error; int64_t last_refresh_ts = 0; bool backend_ok = false; + // SSE live status: "connecting" | "connected" | "disconnected" | "error: " + std::string sse_status = "connecting"; + std::mutex mu; }; void draw_board (AppState& s, bool* p_open);