feat: streaming incremental (--stream) parseando snapshots de la TUI

Fase 2. Anade modo --stream que emite la respuesta de claude como NDJSON
(eventos text_delta + result final), re-parseando snapshots del render.

Compone dos funciones nuevas del registry:
- pty_capture_stream_go_infra: captura snapshots acumulativos del PTY por canal.
- text_prefix_delta_go_core: delta por prefijo comun entre snapshots sucesivos.

Por cada snapshot: vt_render -> parse_claude_tui -> delta del Answer. Solo emite
text_delta cuando el answer extiende limpiamente al anterior (HasPrefix); los
frames no monotonos se reconcilian en el result final. Heuristico y documentado.

e2e_check smoke_fake_stream verifica el flujo con el fake TUI (sin gastar claude).
This commit is contained in:
agent
2026-06-03 23:27:12 +02:00
parent 8d6078e99e
commit 4574f08a22
4 changed files with 98 additions and 3 deletions
+33 -3
View File
@@ -7,8 +7,10 @@ description: "Alternativa a 'claude -p' que obtiene la respuesta de claude como
tags: [cli, claude, terminal, pty, tui, parser] tags: [cli, claude, terminal, pty, tui, parser]
uses_functions: uses_functions:
- pty_capture_idle_go_infra - pty_capture_idle_go_infra
- pty_capture_stream_go_infra
- vt_render_go_tui - vt_render_go_tui
- parse_claude_tui_go_tui - parse_claude_tui_go_tui
- text_prefix_delta_go_core
uses_types: uses_types:
- claude_tui_parse_go_tui - claude_tui_parse_go_tui
- claude_turn_go_tui - claude_turn_go_tui
@@ -30,6 +32,10 @@ e2e_checks:
cmd: "./claude_pipe --bin ./tests/fake_claude.sh --warmup 300ms --step-delay 100ms --idle 700ms --max 5s --format json test" cmd: "./claude_pipe --bin ./tests/fake_claude.sh --warmup 300ms --step-delay 100ms --idle 700ms --max 5s --format json test"
expect_stdout_contains: "\"result\":\"RESPUESTA_FAKE_OK\"" expect_stdout_contains: "\"result\":\"RESPUESTA_FAKE_OK\""
timeout_s: 15 timeout_s: 15
- id: smoke_fake_stream
cmd: "./claude_pipe --stream --bin ./tests/fake_claude.sh --warmup 300ms --step-delay 100ms --snapshot-interval 120ms --idle 700ms --max 5s test"
expect_stdout_contains: "\"type\":\"text_delta\""
timeout_s: 15
--- ---
# claude_pipe # claude_pipe
@@ -97,6 +103,27 @@ echo "explica este error" | ./claude_pipe --cwd /home/enmanuel/fn_registry
| `--step-delay` | `600ms` | Espera entre teclear el prompt y pulsar Enter. | | `--step-delay` | `600ms` | Espera entre teclear el prompt y pulsar Enter. |
| `--idle` | `4s` | Corta la captura tras este silencio (respuesta terminada de renderizar). | | `--idle` | `4s` | Corta la captura tras este silencio (respuesta terminada de renderizar). |
| `--max` | `120s` | Timeout duro de toda la captura. | | `--max` | `120s` | Timeout duro de toda la captura. |
| `--stream` | false | Emite la respuesta incrementalmente como NDJSON (`text_delta` por snapshot) y un `result` final. |
| `--snapshot-interval` | `150ms` | En `--stream`, cada cuánto se captura y re-parsea la TUI. |
## Streaming (`--stream`)
Con `--stream`, la app usa `pty_capture_stream_go_infra` para tomar snapshots del render cada
`--snapshot-interval`, re-parsea cada snapshot con `parse_claude_tui_go_tui`, y emite el delta del
`Answer` (via `text_prefix_delta_go_core`) como NDJSON, terminando con un `result`:
```bash
./claude_pipe --stream --cwd /home/enmanuel/fn_registry "explica Go en 3 frases"
# {"type":"text_delta","text":"Go es un lenguaje..."}
# {"type":"text_delta","text":" compilado y concurrente..."}
# {"type":"result","subtype":"success","result":"Go es un lenguaje... compilado y concurrente..."}
```
Un programa externo Go (u otro) lo lanza como subprocess y lee líneas. **Es heurístico** (ver
Gotchas): la TUI re-renderiza el frame entero, así que solo se emite `text_delta` cuando el nuevo
`Answer` extiende limpiamente al anterior; los frames no monótonos (reflow) se reconcilian en el
`result` final, cuyo `result` lleva siempre la respuesta completa. Para streaming limpio y
monótono nativo sin tocar la TUI, `claude_stream_go_core` (stream-json) es superior.
## Cuando usarla ## Cuando usarla
@@ -122,6 +149,9 @@ Si no necesitas pasar por la TUI, usa `claude_stream_go_core` (stream-json) —
- **Latencia**: anade `warmup` + `idle` (por defecto ~8s de overhead) sobre el tiempo de respuesta - **Latencia**: anade `warmup` + `idle` (por defecto ~8s de overhead) sobre el tiempo de respuesta
de claude. `claude -p` no tiene ese overhead. Es el precio de ir por la TUI. de claude. `claude -p` no tiene ese overhead. Es el precio de ir por la TUI.
- **Linux/Unix only**: hereda el PTY POSIX de `pty_capture_idle_go_infra`. - **Linux/Unix only**: hereda el PTY POSIX de `pty_capture_idle_go_infra`.
- **Streaming**: esta version es one-shot (espera la respuesta completa y luego parsea). El - **Streaming heurístico**: `--stream` emite deltas re-parseando snapshots del render. Como la TUI
streaming incremental de la TUI esta planificado como fase 2 (requiere capturar snapshots re-renderiza el frame entero, el `Answer` parseado puede no ser monótono (reflow al crecer la
durante el render). respuesta, spinner). Solo se emite `text_delta` cuando el nuevo answer extiende limpiamente al
anterior; el `result` final siempre lleva la respuesta completa. Posibles artefactos: deltas que
reaparecen, o fragmentos perdidos en un frame intermedio. Si necesitas streaming exacto, usa
`claude_stream_go_core` (stream-json).
+1
View File
@@ -34,6 +34,7 @@ require (
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e // indirect github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/crypto v0.51.0 // indirect golang.org/x/crypto v0.51.0 // indirect
golang.org/x/net v0.54.0 // indirect
golang.org/x/sync v0.20.0 // indirect golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.37.0 // indirect golang.org/x/text v0.37.0 // indirect
+2
View File
@@ -58,6 +58,8 @@ golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/exp v0.0.0-20260508232706-74f9aab9d74a h1:+3jdDGGB8NGb1Zktc737jlt3/A5f6UlwSzmvqUuufxw= golang.org/x/exp v0.0.0-20260508232706-74f9aab9d74a h1:+3jdDGGB8NGb1Zktc737jlt3/A5f6UlwSzmvqUuufxw=
golang.org/x/exp v0.0.0-20260508232706-74f9aab9d74a/go.mod h1:d2fgXJLVs4dYDHUk5lwMIfzRzSrWCfGZb0ZqeLa/Vcw= golang.org/x/exp v0.0.0-20260508232706-74f9aab9d74a/go.mod h1:d2fgXJLVs4dYDHUk5lwMIfzRzSrWCfGZb0ZqeLa/Vcw=
golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w=
golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+62
View File
@@ -34,6 +34,7 @@ import (
"context" "context"
"fn-registry/functions/core"
"fn-registry/functions/infra" "fn-registry/functions/infra"
"fn-registry/functions/tui" "fn-registry/functions/tui"
) )
@@ -53,6 +54,17 @@ type claudePResult struct {
Result string `json:"result"` Result string `json:"result"`
} }
// streamEvent is one NDJSON line emitted in --stream mode. Loosely mirrors the
// claude -p --output-format stream-json events: text_delta during generation,
// then a final result.
type streamEvent struct {
Type string `json:"type"` // "text_delta" | "result"
Text string `json:"text,omitempty"` // for text_delta
Subtype string `json:"subtype,omitempty"` // for result
IsError bool `json:"is_error,omitempty"`
Result string `json:"result,omitempty"` // for result: full answer
}
func main() { func main() {
var ( var (
prompt = flag.String("prompt", "", "prompt to send. If empty, taken from the positional arg, or from piped stdin") prompt = flag.String("prompt", "", "prompt to send. If empty, taken from the positional arg, or from piped stdin")
@@ -63,6 +75,8 @@ func main() {
stepDelay = flag.Duration("step-delay", 600*time.Millisecond, "delay between typing the prompt and pressing Enter") stepDelay = flag.Duration("step-delay", 600*time.Millisecond, "delay between typing the prompt and pressing Enter")
idle = flag.Duration("idle", 4*time.Second, "stop capturing after this much silence (response finished rendering)") idle = flag.Duration("idle", 4*time.Second, "stop capturing after this much silence (response finished rendering)")
maxDur = flag.Duration("max", 120*time.Second, "hard timeout for the whole capture") maxDur = flag.Duration("max", 120*time.Second, "hard timeout for the whole capture")
stream = flag.Bool("stream", false, "stream the answer incrementally as NDJSON (text_delta events) by parsing TUI snapshots as they render, then a final result event")
snapInt = flag.Duration("snapshot-interval", 150*time.Millisecond, "how often to snapshot and re-parse the TUI in --stream mode")
) )
flag.Usage = func() { flag.Usage = func() {
fmt.Fprintf(os.Stderr, `claude_pipe — get a claude answer as data by parsing its TUI (alternative to claude -p). fmt.Fprintf(os.Stderr, `claude_pipe — get a claude answer as data by parsing its TUI (alternative to claude -p).
@@ -110,6 +124,11 @@ Flags:
ctx, cancel := context.WithTimeout(context.Background(), *maxDur+10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), *maxDur+10*time.Second)
defer cancel() defer cancel()
if *stream {
streamAnswer(ctx, *bin, inputs, *warmup, *stepDelay, *snapInt, *idle, *maxDur)
return
}
raw, err := infra.PTYCaptureIdle(ctx, *bin, nil, *warmup, inputs, *stepDelay, *idle, *maxDur) raw, err := infra.PTYCaptureIdle(ctx, *bin, nil, *warmup, inputs, *stepDelay, *idle, *maxDur)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "claude_pipe: capture failed: %v\n", err) fmt.Fprintf(os.Stderr, "claude_pipe: capture failed: %v\n", err)
@@ -153,6 +172,49 @@ Flags:
} }
} }
// streamAnswer drives claude through a PTY and emits the assistant's answer
// incrementally as NDJSON, by re-parsing the TUI on every snapshot and emitting
// the prefix-delta of the parsed answer. Ends with a final result event.
//
// This is heuristic: the TUI re-renders the whole frame, so the parsed answer is
// not guaranteed monotonic (reflow can rewrite earlier text). We only emit a
// text_delta when the new answer cleanly extends the previous one (HasPrefix);
// non-monotonic frames are absorbed and reconciled by the final result, whose
// Result field carries the full answer regardless.
func streamAnswer(ctx context.Context, bin string, inputs []string, warmup, stepDelay, snapInt, idle, maxDur time.Duration) {
ch, err := infra.PTYCaptureStream(ctx, bin, nil, warmup, inputs, stepDelay, snapInt, idle, maxDur)
if err != nil {
fmt.Fprintf(os.Stderr, "claude_pipe: stream capture failed: %v\n", err)
os.Exit(1)
}
enc := json.NewEncoder(os.Stdout) // os.Stdout is unbuffered: each Encode writes through immediately
prev := ""
final := ""
for snap := range ch {
screen := tui.VTRender(snap, ptyRows, ptyCols)
ans := tui.ParseClaudeTUI(screen).Answer
if strings.HasPrefix(ans, prev) {
if delta := core.PrefixDelta(prev, ans); delta != "" {
_ = enc.Encode(streamEvent{Type: "text_delta", Text: delta})
}
prev = ans
}
// Keep the longest answer seen as the final, even if a later frame shrank
// (transient reflow / parse noise).
if len(ans) >= len(final) {
final = ans
}
}
_ = enc.Encode(streamEvent{
Type: "result",
Subtype: "success",
IsError: final == "",
Result: final,
})
}
// stdinIsPiped reports whether stdin is connected to a pipe/file rather than a terminal. // stdinIsPiped reports whether stdin is connected to a pipe/file rather than a terminal.
func stdinIsPiped() bool { func stdinIsPiped() bool {
info, err := os.Stdin.Stat() info, err := os.Stdin.Stat()