Files
egutierrez 3db4443b65 fix(sse): initial ping + periodic heartbeat unblocks "connecting" state
SSE clients (agents_dashboard) consider the stream connected only after
receiving the first byte of body. The previous implementation flushed
headers and then blocked waiting for status diffs (sse_status) or log
lines (sse_agents_logs) — which could be silent for minutes. UI sat
on "connecting" indefinitely.

Fix:
- After WriteHeader + Flush, emit ":ping\n\n" comment (SSE spec, valid
  no-op) and flush. Unblocks client fgets immediately → state flips
  to "connected" in < 1s.
- Add 15s ticker emitting ":ping\n\n" so idle streams stay alive
  through Traefik / CDN proxies and clients detect dead servers.
- Same treatment for /sse/status and /sse/agents/{id}/logs (tail.go).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:42:29 +02:00

99 lines
2.2 KiB
Go

package api
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"os"
"time"
)
// tailLogFile streams the log file at path to w as Server-Sent Events, one
// "log" event per complete line. It blocks until ctx is done or a write to w
// fails.
//
// If the file cannot be opened, it waits (via waitForFile) for it to appear.
// For files larger than ~50 KB only the trailing ~50 KB is replayed, starting
// at the first line boundary inside that window. Existing content is sent
// immediately; afterwards the file is polled every 200ms for appended data,
// and an SSE comment (": ping") is written every 15s as a heartbeat.
//
// A line is emitted only once its terminating '\n' has been read. A partial
// line seen at EOF is held back and completed on a later poll, so a line the
// writer is still appending is never split across two events (a final line
// that never receives its newline is therefore not emitted). The terminator
// ("\n" or "\r\n") is stripped from the event data so each frame ends with
// exactly one blank line.
func tailLogFile(ctx context.Context, path string, w http.ResponseWriter, flusher http.Flusher) {
	f, err := os.Open(path)
	if err != nil {
		// File may not exist yet (agent hasn't written any logs).
		// Wait for it to appear.
		f = waitForFile(ctx, path)
		if f == nil {
			return // ctx cancelled before file appeared
		}
	}
	defer f.Close()

	// Replay only "recent context" for big files instead of dumping the whole
	// file into the SSE stream.
	const tailBytes = 50 * 1024
	skipPartial := false
	if info, err := f.Stat(); err == nil && info.Size() > tailBytes {
		// Seek one byte before the window: if that byte is '\n', the discarded
		// "line" is empty and the first line of the window is kept intact.
		// Only discard when the seek succeeded; otherwise we are still at the
		// start of the file and the first line is a real, complete line.
		if _, err := f.Seek(-(tailBytes + 1), io.SeekEnd); err == nil {
			skipPartial = true
		}
	}

	// A single reader is used for both the initial replay and the polling
	// phase so that no buffered bytes are lost between the two.
	reader := bufio.NewReader(f)
	var pending string // incomplete trailing line carried across polls

	// drain emits every complete line that is currently readable. It reports
	// false when writing to the client failed and streaming should stop.
	drain := func() bool {
		for {
			chunk, err := reader.ReadString('\n')
			if !skipPartial {
				pending += chunk
			}
			if err != nil {
				// io.EOF (or a read error): no complete line is available
				// right now. Keep what we have and retry on the next poll.
				return true
			}
			if skipPartial {
				// Reached the end of the cut-off first line; real lines follow.
				skipPartial = false
				continue
			}
			line := pending[:len(pending)-1] // err == nil, so pending ends in '\n'
			if n := len(line); n > 0 && line[n-1] == '\r' {
				line = line[:n-1]
			}
			pending = ""
			if _, err := fmt.Fprintf(w, "event: log\ndata: %s\n\n", line); err != nil {
				return false // client is gone
			}
			flusher.Flush()
		}
	}

	// Send existing content right away rather than after the first tick.
	if !drain() {
		return
	}

	// Tail the file: poll for new bytes every 200ms.
	// Separate heartbeat ticker keeps proxies / clients alive on idle logs.
	poll := time.NewTicker(200 * time.Millisecond)
	defer poll.Stop()
	heartbeat := time.NewTicker(15 * time.Second)
	defer heartbeat.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-heartbeat.C:
			if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
				return
			}
			flusher.Flush()
		case <-poll.C:
			if !drain() {
				return
			}
		}
	}
}
// waitForFile retries os.Open(path) every 500ms and returns the opened file
// as soon as an attempt succeeds. It returns nil if ctx is done first. The
// first attempt happens after the first 500ms tick, not immediately.
func waitForFile(ctx context.Context, path string) *os.File {
	retry := time.NewTicker(500 * time.Millisecond)
	defer retry.Stop()

	for {
		// Block until either the caller gives up or it is time to try again.
		select {
		case <-ctx.Done():
			return nil
		case <-retry.C:
		}

		opened, openErr := os.Open(path)
		if openErr != nil {
			continue // still not there; wait for the next tick
		}
		return opened
	}
}