Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3db4443b65 | |||
| 4822208306 | |||
| cd0ba85a22 | |||
| bdd0c6266d |
@@ -232,16 +232,30 @@ func (s *Server) handleSSEStatus(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Accel-Buffering", "no")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
// Initial ping: SSE clients consider the stream "connected" only after
|
||||
// receiving the first byte of body. Without this, agents_dashboard sits
|
||||
// on "connecting" until the first status diff (which can be minutes away).
|
||||
fmt.Fprint(w, ": ping\n\n")
|
||||
flusher.Flush()
|
||||
|
||||
sub := s.bus.Subscribe("status")
|
||||
defer s.bus.Unsubscribe("status", sub)
|
||||
|
||||
ticker := time.NewTicker(15 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
ctx := r.Context()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Periodic heartbeat: keeps proxies (Traefik, CDN) from closing
|
||||
// the idle connection and lets the client detect dead servers.
|
||||
if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
|
||||
return
|
||||
}
|
||||
flusher.Flush()
|
||||
case ev, ok := <-sub:
|
||||
if !ok {
|
||||
return
|
||||
@@ -275,6 +289,9 @@ func (s *Server) handleSSEAgentLogs(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Accel-Buffering", "no")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
// Initial ping unblocks client fgets so the UI flips from "connecting"
|
||||
// to "connected" immediately (logfile may be silent for a while).
|
||||
fmt.Fprint(w, ": ping\n\n")
|
||||
flusher.Flush()
|
||||
|
||||
ctx := r.Context()
|
||||
|
||||
@@ -147,6 +147,16 @@ func (sw *statusWriter) WriteHeader(code int) {
|
||||
sw.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
// Flush forwards to the underlying ResponseWriter when it implements Flusher.
|
||||
// Without this method, the type assertion `w.(http.Flusher)` in the SSE handlers
|
||||
// fails (the wrapper hides the inner Flusher), and the handler aborts with
|
||||
// "streaming unsupported".
|
||||
func (sw *statusWriter) Flush() {
|
||||
if f, ok := sw.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// --- Helpers ---
|
||||
|
||||
func writeJSON(w http.ResponseWriter, status int, v any) {
|
||||
|
||||
@@ -47,15 +47,23 @@ func tailLogFile(ctx context.Context, path string, w http.ResponseWriter, flushe
|
||||
}
|
||||
}
|
||||
|
||||
// Tail the file: poll for new bytes every 200ms
|
||||
// Tail the file: poll for new bytes every 200ms.
|
||||
// Separate heartbeat ticker keeps proxies / clients alive on idle logs.
|
||||
ticker := time.NewTicker(200 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
heartbeat := time.NewTicker(15 * time.Second)
|
||||
defer heartbeat.Stop()
|
||||
|
||||
reader := bufio.NewReader(f)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-heartbeat.C:
|
||||
if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
|
||||
return
|
||||
}
|
||||
flusher.Flush()
|
||||
case <-ticker.C:
|
||||
for {
|
||||
line, err := reader.ReadString('\n')
|
||||
|
||||
Reference in New Issue
Block a user