Files
fn_registry/functions/infra/pty_capture_stream.go
2026-06-04 23:44:39 +02:00

177 lines
4.0 KiB
Go

package infra
import (
"bytes"
"context"
"fmt"
"os/exec"
"sync"
"syscall"
"time"
"github.com/creack/pty"
)
// PTYCaptureStream launches a command inside a pseudo-terminal (PTY) and
// streams periodic snapshots of the accumulated output through a channel.
// Unlike PTYCaptureIdle, which returns the full output at the end,
// PTYCaptureStream emits the ENTIRE buffer accumulated so far on every
// snapshotInterval tick — allowing callers to observe the terminal render
// while the process is still running.
//
// Parameters:
//   - ctx: cancels the capture; the command is started with
//     exec.CommandContext, so cancellation also stops the child.
//   - name, args: the command to run inside the PTY.
//   - warmup: time to wait after start before any input is written.
//   - inputs: strings written verbatim to the PTY, in order.
//   - stepDelay: pause after each input is written.
//   - snapshotInterval: period between snapshots; must be positive.
//   - idle: capture ends once no output has arrived for this long.
//   - maxDur: capture ends once this much time has passed since start.
//
// The returned channel is closed when capture ends: idle timeout, maxDur,
// ctx cancellation, or the PTY reader finishing (typically because the child
// exited, so no further output can arrive). Idle, maxDur and reader
// completion are only evaluated once warmup and inputs have been processed.
//
// The last value sent before closing is a final snapshot of the complete
// buffer, regardless of tick alignment. The only case in which it is dropped
// is when ctx is already cancelled AND the channel buffer is full, because
// blocking there could hang forever on a consumer that has gone away.
//
// Callers MUST drain the channel or cancel ctx to avoid blocking the
// internal goroutine. An error is returned if snapshotInterval is not
// positive or if pty.Start fails; in both cases no goroutine is left running.
func PTYCaptureStream(
	ctx context.Context,
	name string,
	args []string,
	warmup time.Duration,
	inputs []string,
	stepDelay time.Duration,
	snapshotInterval time.Duration,
	idle time.Duration,
	maxDur time.Duration,
) (<-chan string, error) {
	// time.NewTicker panics on a non-positive interval, and that panic would
	// fire inside the conducting goroutine where no caller could recover it.
	// Reject the value before spawning anything.
	if snapshotInterval <= 0 {
		return nil, fmt.Errorf("pty_capture_stream: snapshotInterval must be positive, got %v", snapshotInterval)
	}

	cmd := exec.CommandContext(ctx, name, args...)
	ptmx, err := pty.Start(cmd)
	if err != nil {
		return nil, fmt.Errorf("pty_capture_stream: pty.Start: %w", err)
	}

	// Set a reasonable terminal size so TUIs render without truncating.
	// Best effort: a failed resize is not fatal, capture proceeds regardless.
	_ = pty.Setsize(ptmx, &pty.Winsize{Rows: 40, Cols: 120})

	var (
		mu       sync.Mutex // guards buf and lastByte
		buf      bytes.Buffer
		lastByte = time.Now()
	)

	// Reader goroutine: copy PTY output into buf and track last-byte time.
	// readDone is closed once the PTY can no longer be read.
	readDone := make(chan struct{})
	go func() {
		defer close(readDone)
		tmp := make([]byte, 4096)
		for {
			n, rerr := ptmx.Read(tmp)
			if n > 0 {
				mu.Lock()
				buf.Write(tmp[:n])
				lastByte = time.Now()
				mu.Unlock()
			}
			if rerr != nil {
				// EIO/EOF is normal on Linux when the PTY master is closed
				// or the child exits. Not a real error.
				return
			}
		}
	}()

	ch := make(chan string, 16)

	// snapshot returns a copy of the current buffer contents.
	snapshot := func() string {
		mu.Lock()
		defer mu.Unlock()
		return buf.String()
	}

	// send emits a snapshot to ch, giving up if ctx is cancelled first.
	send := func(s string) bool {
		select {
		case ch <- s:
			return true
		case <-ctx.Done():
			return false
		}
	}

	// Conducting goroutine: handles warmup, inputs, periodic snapshots,
	// termination detection, and shutdown.
	go func() {
		defer func() {
			// Shutdown: close PTY master, SIGTERM → SIGKILL, wait reader.
			_ = ptmx.Close()
			if cmd.Process != nil {
				// The signal may fail if the process is already gone; Wait
				// below settles the outcome either way.
				_ = cmd.Process.Signal(syscall.SIGTERM)
				killTimer := time.NewTimer(2 * time.Second)
				waitCh := make(chan error, 1)
				go func() { waitCh <- cmd.Wait() }()
				select {
				case <-waitCh:
					// Process exited within the grace period.
				case <-killTimer.C:
					_ = cmd.Process.Kill()
					<-waitCh
				}
				killTimer.Stop()
			}
			<-readDone

			// Final snapshot. Try a non-blocking send first: when ctx is
			// already cancelled, a select between the send and ctx.Done()
			// picks a ready case at random and would drop the snapshot
			// about half the time even though the buffer has room.
			final := snapshot()
			select {
			case ch <- final:
			default:
				// Buffer full: wait for the consumer unless ctx is done.
				send(final)
			}
			close(ch)
		}()

		start := time.Now()

		// Wait for warmup so the TUI/CLI has time to initialize.
		select {
		case <-time.After(warmup):
		case <-ctx.Done():
			return
		}

		// Send inputs one by one with stepDelay between them.
		for _, in := range inputs {
			if _, werr := ptmx.Write([]byte(in)); werr != nil {
				// PTY may have closed already; stop sending.
				break
			}
			select {
			case <-time.After(stepDelay):
			case <-ctx.Done():
				return
			}
		}

		// Main loop: emit snapshots on ticker; stop on idle, maxDur,
		// reader completion, or ctx cancellation.
		ticker := time.NewTicker(snapshotInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Emit current accumulated snapshot.
				if !send(snapshot()) {
					return
				}
				// Check termination conditions.
				mu.Lock()
				sinceLastByte := time.Since(lastByte)
				mu.Unlock()
				if sinceLastByte >= idle || time.Since(start) >= maxDur {
					return
				}
			case <-readDone:
				// The reader stopped, so no further output can arrive;
				// waiting for idle/maxDur would only delay the final snapshot.
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	return ch, nil
}