package infra

import (
	"bytes"
	"context"
	"fmt"
	"os/exec"
	"sync"
	"syscall"
	"time"

	"github.com/creack/pty"
)

// PTYCaptureStream launches a command inside a pseudo-terminal (PTY) and
// streams periodic snapshots of the accumulated output through a channel.
// Unlike PTYCaptureIdle, which returns the full output at the end,
// PTYCaptureStream emits the ENTIRE buffer accumulated so far on every
// snapshotInterval tick, allowing callers to observe the terminal render
// while the process is still running.
//
// Parameters:
//   - ctx cancels the capture; it is also handed to exec.CommandContext.
//   - name and args describe the command to run.
//   - warmup is how long to wait after start before any input is written.
//   - inputs are written to the PTY in order, each followed by a stepDelay
//     pause. Writing stops at the first write error.
//   - snapshotInterval is the period between snapshots; it must be positive.
//   - idle ends the capture at the first tick on which no output has been
//     read for at least that long.
//   - maxDur ends the capture at the first tick on which at least that much
//     time has elapsed since the capture started (warmup and input delays
//     included). Both limits are evaluated only on ticks, so the capture may
//     overrun them by up to one snapshotInterval.
//
// The returned channel is closed when capture ends (idle/maxDur/ctx cancel).
// Before closing, a final snapshot of the complete buffer is sent. It is
// delivered whenever the channel has buffer space or the consumer is
// receiving; it is dropped only if ctx is cancelled while the channel buffer
// is full.
//
// Callers MUST drain the channel or cancel ctx to avoid blocking the
// internal goroutine. An error is returned if snapshotInterval is not
// positive or if pty.Start fails; in both cases the channel is nil.
func PTYCaptureStream(
	ctx context.Context,
	name string,
	args []string,
	warmup time.Duration,
	inputs []string,
	stepDelay time.Duration,
	snapshotInterval time.Duration,
	idle time.Duration,
	maxDur time.Duration,
) (<-chan string, error) {
	// time.NewTicker panics on a non-positive interval. Reject it here,
	// before the child is spawned, instead of crashing later inside the
	// conducting goroutine.
	if snapshotInterval <= 0 {
		return nil, fmt.Errorf("pty_capture_stream: snapshotInterval must be positive, got %v", snapshotInterval)
	}

	cmd := exec.CommandContext(ctx, name, args...)
	ptmx, err := pty.Start(cmd)
	if err != nil {
		return nil, fmt.Errorf("pty_capture_stream: pty.Start: %w", err)
	}

	// Set a reasonable terminal size so TUIs render without truncating.
	// Non-fatal: the capture continues with the default size if this fails.
	_ = pty.Setsize(ptmx, &pty.Winsize{Rows: 40, Cols: 120})

	var (
		mu       sync.Mutex // guards buf and lastByte
		buf      bytes.Buffer
		lastByte = time.Now()
	)

	// Reader goroutine: copy PTY output into buf and track last-byte time.
	readDone := make(chan struct{})
	go func() {
		defer close(readDone)
		tmp := make([]byte, 4096)
		for {
			n, rerr := ptmx.Read(tmp)
			if n > 0 {
				mu.Lock()
				buf.Write(tmp[:n])
				lastByte = time.Now()
				mu.Unlock()
			}
			if rerr != nil {
				// EIO/EOF is normal on Linux when the PTY master is closed
				// after the child exits. Not a real error.
				return
			}
		}
	}()

	ch := make(chan string, 16)

	// snapshot returns a copy of the current buffer contents.
	snapshot := func() string {
		mu.Lock()
		defer mu.Unlock()
		return buf.String()
	}

	// send emits a snapshot to ch, respecting ctx cancellation. It reports
	// whether the value was handed to the channel.
	send := func(s string) bool {
		select {
		case ch <- s:
			return true
		case <-ctx.Done():
			return false
		}
	}

	// Conducting goroutine: handles warmup, inputs, periodic snapshots,
	// idle/maxDur detection, and shutdown.
	go func() {
		defer func() {
			// Shutdown: close PTY master, SIGTERM then SIGKILL, wait reader.
			_ = ptmx.Close()
			if cmd.Process != nil {
				_ = cmd.Process.Signal(syscall.SIGTERM)
				killTimer := time.NewTimer(2 * time.Second)
				waitCh := make(chan error, 1)
				go func() { waitCh <- cmd.Wait() }()
				select {
				case <-waitCh:
					// Process exited within the grace period.
				case <-killTimer.C:
					_ = cmd.Process.Kill()
					<-waitCh
				}
				killTimer.Stop()
			}
			<-readDone

			// Final snapshot. Try a non-blocking send first: when ctx is
			// already cancelled, a select over both the send and ctx.Done()
			// picks a ready case at random and could drop the snapshot even
			// though the buffer has room.
			final := snapshot()
			select {
			case ch <- final:
			default:
				// Buffer full: wait for the consumer, but give up on cancel
				// so this goroutine cannot leak.
				send(final)
			}
			close(ch)
		}()

		start := time.Now()

		// Wait for warmup so the TUI/CLI has time to initialize.
		select {
		case <-time.After(warmup):
		case <-ctx.Done():
			return
		}

		// Send inputs one by one with stepDelay between them.
		for _, in := range inputs {
			if _, werr := ptmx.Write([]byte(in)); werr != nil {
				// PTY may have closed already; stop sending.
				break
			}
			select {
			case <-time.After(stepDelay):
			case <-ctx.Done():
				return
			}
		}

		// Main loop: emit snapshots on ticker, cut on idle or maxDur.
		ticker := time.NewTicker(snapshotInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// Emit current accumulated snapshot.
				if !send(snapshot()) {
					return
				}
				// Check termination conditions.
				mu.Lock()
				sinceLastByte := time.Since(lastByte)
				mu.Unlock()
				if sinceLastByte >= idle || time.Since(start) >= maxDur {
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	return ch, nil
}