4 Commits

Author SHA1 Message Date
egutierrez 3db4443b65 fix(sse): initial ping + periodic heartbeat unblocks "connecting" state
SSE clients (agents_dashboard) consider the stream connected only after
receiving the first byte of body. The previous implementation flushed
headers and then blocked waiting for status diffs (sse_status) or log
lines (sse_agents_logs) — which could be silent for minutes. UI sat
on "connecting" indefinitely.

Fix:
- After WriteHeader + Flush, emit ":ping\n\n" comment (SSE spec, valid
  no-op) and flush. Unblocks client fgets immediately → state flips
  to "connected" in < 1s.
- Add 15s ticker emitting ":ping\n\n" so idle streams stay alive
  through Traefik / CDN proxies and clients detect dead servers.
- Same treatment for /sse/status and /sse/agents/{id}/logs (tail.go).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:42:29 +02:00
egutierrez 4822208306 fix(api): statusWriter implements http.Flusher for SSE handlers
The logMiddleware wrapper (statusWriter) didn't forward Flush, so
`w.(http.Flusher)` in SSE handlers failed and returned the plain text
"streaming unsupported" with 500. SSE clients (agents_dashboard C++ app)
saw a closed connection with no events.

Add Flush() that delegates to the embedded ResponseWriter when it
implements Flusher. Required for /sse/status and /sse/agents/{id}/logs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:32:06 +02:00
egutierrez cd0ba85a22 chore: auto-commit (1 archivos)
- launcher

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 21:52:38 +02:00
egutierrez bdd0c6266d merge: 0128 http api + sse + apikey + systemd + unified status fix 2026-05-22 21:32:40 +02:00
4 changed files with 36 additions and 1 deletions
+17
View File
@@ -232,16 +232,30 @@ func (s *Server) handleSSEStatus(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
// Initial ping: SSE clients consider the stream "connected" only after
// receiving the first byte of body. Without this, agents_dashboard sits
// on "connecting" until the first status diff (which can be minutes away).
fmt.Fprint(w, ": ping\n\n")
flusher.Flush()
sub := s.bus.Subscribe("status")
defer s.bus.Unsubscribe("status", sub)
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Periodic heartbeat: keeps proxies (Traefik, CDN) from closing
// the idle connection and lets the client detect dead servers.
if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
return
}
flusher.Flush()
case ev, ok := <-sub:
if !ok {
return
@@ -275,6 +289,9 @@ func (s *Server) handleSSEAgentLogs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
// Initial ping unblocks client fgets so the UI flips from "connecting"
// to "connected" immediately (logfile may be silent for a while).
fmt.Fprint(w, ": ping\n\n")
flusher.Flush()
ctx := r.Context()
+10
View File
@@ -147,6 +147,16 @@ func (sw *statusWriter) WriteHeader(code int) {
sw.ResponseWriter.WriteHeader(code)
}
// Flush forwards to the underlying ResponseWriter when it implements Flusher.
// Without this method, the type assertion `w.(http.Flusher)` in the SSE handlers
// fails (the wrapper hides the inner Flusher), and the handler aborts with
// "streaming unsupported".
func (sw *statusWriter) Flush() {
if f, ok := sw.ResponseWriter.(http.Flusher); ok {
f.Flush()
}
}
// --- Helpers ---
func writeJSON(w http.ResponseWriter, status int, v any) {
+9 -1
View File
@@ -47,15 +47,23 @@ func tailLogFile(ctx context.Context, path string, w http.ResponseWriter, flushe
}
}
// Tail the file: poll for new bytes every 200ms
// Tail the file: poll for new bytes every 200ms.
// Separate heartbeat ticker keeps proxies / clients alive on idle logs.
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
heartbeat := time.NewTicker(15 * time.Second)
defer heartbeat.Stop()
reader := bufio.NewReader(f)
for {
select {
case <-ctx.Done():
return
case <-heartbeat.C:
if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
return
}
flusher.Flush()
case <-ticker.C:
for {
line, err := reader.ReadString('\n')
Executable
BIN
View File
Binary file not shown.