package core import ( "bufio" "context" "fmt" "io" "os" "os/exec" "sync" "syscall" "time" ) // StreamEvent es una linea capturada de stdout o stderr del subproceso. type StreamEvent struct { Stream string // "stdout" | "stderr" Line string // sin trailing newline Time time.Time // timestamp de recepcion } // StreamResult es el resultado final del subproceso, enviado por el canal de // resultados cuando ambos pipes han llegado a EOF y el proceso ha terminado. type StreamResult struct { ExitCode int Err error DurationMs int64 } // SubprocessStream lanza name con args como subproceso y retorna dos canales: // - events: recibe StreamEvent (linea de stdout/stderr) hasta EOF de ambos pipes. // - result: recibe exactamente un StreamResult cuando el proceso termina. // // env se concatena con os.Environ(). stdin puede ser nil. // // Cancelar ctx envia SIGTERM al proceso; si no termina en 2 segundos, SIGKILL. // El caller DEBE consumir events hasta que se cierre o cancelar ctx para evitar // bloquear las goroutines internas. func SubprocessStream( ctx context.Context, name string, args []string, env []string, stdin io.Reader, ) (<-chan StreamEvent, <-chan StreamResult) { events := make(chan StreamEvent, 64) results := make(chan StreamResult, 1) go func() { defer close(events) defer close(results) start := time.Now() cmd := exec.CommandContext(ctx, name, args...) // Entorno: base + extra if len(env) > 0 { cmd.Env = append(os.Environ(), env...) } if stdin != nil { cmd.Stdin = stdin } // Process group propio para matar hijos al recibir SIGTERM/SIGKILL cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} stdoutPipe, err := cmd.StdoutPipe() if err != nil { results <- StreamResult{ExitCode: -1, Err: fmt.Errorf("stdout pipe: %w", err), DurationMs: 0} return } stderrPipe, err := cmd.StderrPipe() if err != nil { results <- StreamResult{ExitCode: -1, Err: fmt.Errorf("stderr pipe: %w", err), DurationMs: 0} return } if err := cmd.Start(); err != nil { results <- StreamResult{ExitCode: -1, Err: fmt.Errorf("start: %w", err), DurationMs: 0} return } // Goroutine de supervision de ctx: SIGTERM → grace 2s → SIGKILL ctxDone := make(chan struct{}) go func() { select { case <-ctx.Done(): if cmd.Process != nil { _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM) timer := time.NewTimer(2 * time.Second) defer timer.Stop() select { case <-timer.C: _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) case <-ctxDone: } } case <-ctxDone: } }() send := func(stream, line string) { ev := StreamEvent{Stream: stream, Line: line, Time: time.Now()} select { case events <- ev: case <-ctx.Done(): } } // Leer stdout y stderr concurrentemente const bufSize = 1024 * 1024 // 1 MB para lineas largas (sd-cli progress, etc.) var wg sync.WaitGroup scanPipe := func(r io.Reader, stream string) { defer wg.Done() sc := bufio.NewScanner(r) sc.Buffer(make([]byte, bufSize), bufSize) for sc.Scan() { send(stream, sc.Text()) } } wg.Add(2) go scanPipe(stdoutPipe, "stdout") go scanPipe(stderrPipe, "stderr") wg.Wait() close(ctxDone) // señal al supervisor de ctx para que pare exitCode := 0 var waitErr error if err := cmd.Wait(); err != nil { waitErr = err if exitErr, ok := err.(*exec.ExitError); ok { exitCode = exitErr.ExitCode() waitErr = nil // exit code no-cero no es un error de spawn } } // Si el contexto fue cancelado, reportar como error de cancelacion if ctx.Err() != nil && waitErr == nil { waitErr = ctx.Err() } results <- StreamResult{ ExitCode: exitCode, Err: waitErr, DurationMs: time.Since(start).Milliseconds(), } }() return events, results }