From 4574f08a22cf4851393ac62ab11f68d64ff4fbbe Mon Sep 17 00:00:00 2001 From: agent Date: Wed, 3 Jun 2026 23:27:12 +0200 Subject: [PATCH] feat: streaming incremental (--stream) parseando snapshots de la TUI Fase 2. Anade modo --stream que emite la respuesta de claude como NDJSON (eventos text_delta + result final), re-parseando snapshots del render. Compone dos funciones nuevas del registry: - pty_capture_stream_go_infra: captura snapshots acumulativos del PTY por canal. - text_prefix_delta_go_core: delta por prefijo comun entre snapshots sucesivos. Por cada snapshot: vt_render -> parse_claude_tui -> delta del Answer. Solo emite text_delta cuando el answer extiende limpiamente al anterior (HasPrefix); los frames no monotonos se reconcilian en el result final. Heuristico y documentado. e2e_check smoke_fake_stream verifica el flujo con el fake TUI (sin gastar claude). --- app.md | 36 ++++++++++++++++++++++++++++++--- go.mod | 1 + go.sum | 2 ++ main.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 3 deletions(-) diff --git a/app.md b/app.md index 0a896bd..30deb7b 100644 --- a/app.md +++ b/app.md @@ -7,8 +7,10 @@ description: "Alternativa a 'claude -p' que obtiene la respuesta de claude como tags: [cli, claude, terminal, pty, tui, parser] uses_functions: - pty_capture_idle_go_infra + - pty_capture_stream_go_infra - vt_render_go_tui - parse_claude_tui_go_tui + - text_prefix_delta_go_core uses_types: - claude_tui_parse_go_tui - claude_turn_go_tui @@ -30,6 +32,10 @@ e2e_checks: cmd: "./claude_pipe --bin ./tests/fake_claude.sh --warmup 300ms --step-delay 100ms --idle 700ms --max 5s --format json test" expect_stdout_contains: "\"result\":\"RESPUESTA_FAKE_OK\"" timeout_s: 15 + - id: smoke_fake_stream + cmd: "./claude_pipe --stream --bin ./tests/fake_claude.sh --warmup 300ms --step-delay 100ms --snapshot-interval 120ms --idle 700ms --max 5s test" + expect_stdout_contains: "\"type\":\"text_delta\"" + timeout_s: 15 --- # claude_pipe @@ -97,6 +103,27 @@ echo "explica este error" | ./claude_pipe --cwd /home/enmanuel/fn_registry | `--step-delay` | `600ms` | Espera entre teclear el prompt y pulsar Enter. | | `--idle` | `4s` | Corta la captura tras este silencio (respuesta terminada de renderizar). | | `--max` | `120s` | Timeout duro de toda la captura. | +| `--stream` | false | Emite la respuesta incrementalmente como NDJSON (`text_delta` por snapshot) y un `result` final. | +| `--snapshot-interval` | `150ms` | En `--stream`, cada cuánto se captura y re-parsea la TUI. | + +## Streaming (`--stream`) + +Con `--stream`, la app usa `pty_capture_stream_go_infra` para tomar snapshots del render cada +`--snapshot-interval`, re-parsea cada snapshot con `parse_claude_tui_go_tui`, y emite el delta del +`Answer` (via `text_prefix_delta_go_core`) como NDJSON, terminando con un `result`: + +```bash +./claude_pipe --stream --cwd /home/enmanuel/fn_registry "explica Go en 3 frases" +# {"type":"text_delta","text":"Go es un lenguaje..."} +# {"type":"text_delta","text":" compilado y concurrente..."} +# {"type":"result","subtype":"success","result":"Go es un lenguaje... compilado y concurrente..."} +``` + +Un programa externo Go (u otro) lo lanza como subprocess y lee líneas. **Es heurístico** (ver +Gotchas): la TUI re-renderiza el frame entero, así que solo se emite `text_delta` cuando el nuevo +`Answer` extiende limpiamente al anterior; los frames no monótonos (reflow) se reconcilian en el +`result` final, cuyo `result` lleva siempre la respuesta completa. Para streaming limpio y +monótono nativo sin tocar la TUI, `claude_stream_go_core` (stream-json) es superior. ## Cuando usarla @@ -122,6 +149,9 @@ Si no necesitas pasar por la TUI, usa `claude_stream_go_core` (stream-json) — - **Latencia**: anade `warmup` + `idle` (por defecto ~8s de overhead) sobre el tiempo de respuesta de claude. `claude -p` no tiene ese overhead. Es el precio de ir por la TUI. - **Linux/Unix only**: hereda el PTY POSIX de `pty_capture_idle_go_infra`. -- **Streaming**: esta version es one-shot (espera la respuesta completa y luego parsea). El - streaming incremental de la TUI esta planificado como fase 2 (requiere capturar snapshots - durante el render). +- **Streaming heurístico**: `--stream` emite deltas re-parseando snapshots del render. Como la TUI + re-renderiza el frame entero, el `Answer` parseado puede no ser monótono (reflow al crecer la + respuesta, spinner). Solo se emite `text_delta` cuando el nuevo answer extiende limpiamente al + anterior; el `result` final siempre lleva la respuesta completa. Posibles artefactos: deltas que + reaparecen, o fragmentos perdidos en un frame intermedio. Si necesitas streaming exacto, usa + `claude_stream_go_core` (stream-json). diff --git a/go.mod b/go.mod index dad0f0b..c9304a7 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect golang.org/x/crypto v0.51.0 // indirect + golang.org/x/net v0.54.0 // indirect golang.org/x/sync v0.20.0 // indirect golang.org/x/sys v0.44.0 // indirect golang.org/x/text v0.37.0 // indirect diff --git a/go.sum b/go.sum index 959e463..f58bb36 100644 --- a/go.sum +++ b/go.sum @@ -58,6 +58,8 @@ golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= golang.org/x/exp v0.0.0-20260508232706-74f9aab9d74a h1:+3jdDGGB8NGb1Zktc737jlt3/A5f6UlwSzmvqUuufxw= golang.org/x/exp v0.0.0-20260508232706-74f9aab9d74a/go.mod h1:d2fgXJLVs4dYDHUk5lwMIfzRzSrWCfGZb0ZqeLa/Vcw= +golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w= +golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/main.go b/main.go index 5214203..46016a6 100644 --- a/main.go +++ b/main.go @@ -34,6 +34,7 @@ import ( "context" + "fn-registry/functions/core" "fn-registry/functions/infra" "fn-registry/functions/tui" ) @@ -53,6 +54,17 @@ type claudePResult struct { Result string `json:"result"` } +// streamEvent is one NDJSON line emitted in --stream mode. Loosely mirrors the +// claude -p --output-format stream-json events: text_delta during generation, +// then a final result. +type streamEvent struct { + Type string `json:"type"` // "text_delta" | "result" + Text string `json:"text,omitempty"` // for text_delta + Subtype string `json:"subtype,omitempty"` // for result + IsError bool `json:"is_error,omitempty"` + Result string `json:"result,omitempty"` // for result: full answer +} + func main() { var ( prompt = flag.String("prompt", "", "prompt to send. If empty, taken from the positional arg, or from piped stdin") @@ -63,6 +75,8 @@ func main() { stepDelay = flag.Duration("step-delay", 600*time.Millisecond, "delay between typing the prompt and pressing Enter") idle = flag.Duration("idle", 4*time.Second, "stop capturing after this much silence (response finished rendering)") maxDur = flag.Duration("max", 120*time.Second, "hard timeout for the whole capture") + stream = flag.Bool("stream", false, "stream the answer incrementally as NDJSON (text_delta events) by parsing TUI snapshots as they render, then a final result event") + snapInt = flag.Duration("snapshot-interval", 150*time.Millisecond, "how often to snapshot and re-parse the TUI in --stream mode") ) flag.Usage = func() { fmt.Fprintf(os.Stderr, `claude_pipe — get a claude answer as data by parsing its TUI (alternative to claude -p). @@ -110,6 +124,11 @@ Flags: ctx, cancel := context.WithTimeout(context.Background(), *maxDur+10*time.Second) defer cancel() + if *stream { + streamAnswer(ctx, *bin, inputs, *warmup, *stepDelay, *snapInt, *idle, *maxDur) + return + } + raw, err := infra.PTYCaptureIdle(ctx, *bin, nil, *warmup, inputs, *stepDelay, *idle, *maxDur) if err != nil { fmt.Fprintf(os.Stderr, "claude_pipe: capture failed: %v\n", err) @@ -153,6 +172,49 @@ Flags: } } +// streamAnswer drives claude through a PTY and emits the assistant's answer +// incrementally as NDJSON, by re-parsing the TUI on every snapshot and emitting +// the prefix-delta of the parsed answer. Ends with a final result event. +// +// This is heuristic: the TUI re-renders the whole frame, so the parsed answer is +// not guaranteed monotonic (reflow can rewrite earlier text). We only emit a +// text_delta when the new answer cleanly extends the previous one (HasPrefix); +// non-monotonic frames are absorbed and reconciled by the final result, whose +// Result field carries the full answer regardless. +func streamAnswer(ctx context.Context, bin string, inputs []string, warmup, stepDelay, snapInt, idle, maxDur time.Duration) { + ch, err := infra.PTYCaptureStream(ctx, bin, nil, warmup, inputs, stepDelay, snapInt, idle, maxDur) + if err != nil { + fmt.Fprintf(os.Stderr, "claude_pipe: stream capture failed: %v\n", err) + os.Exit(1) + } + + enc := json.NewEncoder(os.Stdout) // os.Stdout is unbuffered: each Encode writes through immediately + prev := "" + final := "" + for snap := range ch { + screen := tui.VTRender(snap, ptyRows, ptyCols) + ans := tui.ParseClaudeTUI(screen).Answer + if strings.HasPrefix(ans, prev) { + if delta := core.PrefixDelta(prev, ans); delta != "" { + _ = enc.Encode(streamEvent{Type: "text_delta", Text: delta}) + } + prev = ans + } + // Keep the longest answer seen as the final, even if a later frame shrank + // (transient reflow / parse noise). + if len(ans) >= len(final) { + final = ans + } + } + + _ = enc.Encode(streamEvent{ + Type: "result", + Subtype: "success", + IsError: final == "", + Result: final, + }) +} + // stdinIsPiped reports whether stdin is connected to a pipe/file rather than a terminal. func stdinIsPiped() bool { info, err := os.Stdin.Stat()