feat: streaming del subproceso claude-code con --output-format stream-json
Implementa la Fase 1 del issue 0036: soporte de streaming en tiempo real para el provider claude-code. - Tipos puros de streaming en pkg/llm/types.go: StreamEventKind, StreamEvent, StreamFunc (pure core, sin side effects) - Refactor de shell/llm/claudecode.go: nuevo code path executeStreaming que usa cmd.StdoutPipe + bufio.Scanner para leer linea a linea - Parser parseStreamLine que mapea eventos JSON del CLI (system, assistant, result) a StreamEvent del dominio - buildClaudeArgs ahora selecciona --output-format stream-json cuando streaming esta habilitado y StreamFunc presente - Campos Streaming y ShowToolProgress en ClaudeCodeCfg (config schema) - Backward compatible: streaming=false (default) no cambia comportamiento - 40 tests (20 existentes + 20 nuevos) pasan sin errores Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
+316
-22
@@ -1,6 +1,7 @@
|
||||
package llm
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
@@ -74,6 +75,7 @@ func NewClaudeCodeComplete(cfg config.ClaudeCodeCfg, log *slog.Logger) coretypes
|
||||
"args", strings.Join(args, " "),
|
||||
"prompt_len", len(prompt),
|
||||
"working_dir", workDir,
|
||||
"streaming", cfg.Streaming,
|
||||
)
|
||||
|
||||
cmd := exec.CommandContext(ctx, binary, args...)
|
||||
@@ -99,31 +101,313 @@ func NewClaudeCodeComplete(cfg config.ClaudeCodeCfg, log *slog.Logger) coretypes
|
||||
return nil
|
||||
}
|
||||
|
||||
var stdout, stderr bytes.Buffer
|
||||
cmd.Stdout = &stdout
|
||||
cmd.Stderr = &stderr
|
||||
|
||||
start := time.Now()
|
||||
err := cmd.Run()
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Ensure the process group is fully dead after Run returns,
|
||||
// even if cmd.Run() returned without triggering Cancel (normal exit).
|
||||
if cmd.Process != nil {
|
||||
_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
|
||||
// Choose between streaming and buffered mode
|
||||
if cfg.Streaming && req.StreamFunc != nil {
|
||||
return executeStreaming(ctx, cmd, req.StreamFunc, log)
|
||||
}
|
||||
|
||||
log.Debug("claude_code_done",
|
||||
"elapsed_ms", elapsed.Milliseconds(),
|
||||
"stdout_len", stdout.Len(),
|
||||
"stderr_len", stderr.Len(),
|
||||
"exit_err", err,
|
||||
)
|
||||
|
||||
return parseClaudeOutput(stdout.Bytes(), stderr.Bytes(), err, elapsed, log)
|
||||
return executeBuffered(ctx, cmd, log)
|
||||
}
|
||||
}
|
||||
|
||||
// executeBuffered runs the claude subprocess and collects all output at once.
|
||||
// This is the original (non-streaming) code path.
|
||||
func executeBuffered(ctx context.Context, cmd *exec.Cmd, log *slog.Logger) (coretypes.CompletionResponse, error) {
|
||||
var stdout, stderr bytes.Buffer
|
||||
cmd.Stdout = &stdout
|
||||
cmd.Stderr = &stderr
|
||||
|
||||
start := time.Now()
|
||||
err := cmd.Run()
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Ensure the process group is fully dead after Run returns.
|
||||
if cmd.Process != nil {
|
||||
_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
|
||||
}
|
||||
|
||||
log.Debug("claude_code_done",
|
||||
"elapsed_ms", elapsed.Milliseconds(),
|
||||
"stdout_len", stdout.Len(),
|
||||
"stderr_len", stderr.Len(),
|
||||
"exit_err", err,
|
||||
)
|
||||
|
||||
return parseClaudeOutput(stdout.Bytes(), stderr.Bytes(), err, elapsed, log)
|
||||
}
|
||||
|
||||
// executeStreaming runs the claude subprocess with --output-format stream-json,
|
||||
// reads stdout line by line, emits StreamEvents via the callback, and accumulates
|
||||
// the final result.
|
||||
func executeStreaming(ctx context.Context, cmd *exec.Cmd, streamFn coretypes.StreamFunc, log *slog.Logger) (coretypes.CompletionResponse, error) {
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return coretypes.CompletionResponse{}, fmt.Errorf("claude-code: stdout pipe: %w", err)
|
||||
}
|
||||
|
||||
var stderr bytes.Buffer
|
||||
cmd.Stderr = &stderr
|
||||
|
||||
start := time.Now()
|
||||
if err := cmd.Start(); err != nil {
|
||||
return coretypes.CompletionResponse{}, fmt.Errorf("claude-code: start: %w", err)
|
||||
}
|
||||
|
||||
// Scan stdout line by line, parsing each JSON event
|
||||
var lastResult *claudeJSONOutput
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024) // allow up to 1MB lines
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
evt, parsed, parseErr := parseStreamLine(line)
|
||||
if parseErr != nil {
|
||||
log.Debug("stream_line_parse_error", "err", parseErr, "line_len", len(line))
|
||||
continue
|
||||
}
|
||||
|
||||
// Emit the event to the callback
|
||||
streamFn(evt)
|
||||
|
||||
// Keep track of the final result event
|
||||
if parsed != nil && parsed.Type == "result" {
|
||||
lastResult = parsed
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the process to finish
|
||||
waitErr := cmd.Wait()
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Ensure the process group is fully dead after Run returns.
|
||||
if cmd.Process != nil {
|
||||
_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
|
||||
}
|
||||
|
||||
if scanErr := scanner.Err(); scanErr != nil {
|
||||
log.Warn("stream_scanner_error", "err", scanErr)
|
||||
}
|
||||
|
||||
log.Debug("claude_code_stream_done",
|
||||
"elapsed_ms", elapsed.Milliseconds(),
|
||||
"stderr_len", stderr.Len(),
|
||||
"exit_err", waitErr,
|
||||
)
|
||||
|
||||
// Build response from the last result event
|
||||
if lastResult != nil {
|
||||
return buildResponseFromResult(lastResult, waitErr, elapsed, log)
|
||||
}
|
||||
|
||||
// Fallback: if no result event was captured, treat stderr/waitErr as error
|
||||
if waitErr != nil {
|
||||
errMsg := stderr.String()
|
||||
if errMsg == "" {
|
||||
errMsg = waitErr.Error()
|
||||
}
|
||||
return coretypes.CompletionResponse{}, fmt.Errorf("claude-code stream process failed: %s", errMsg)
|
||||
}
|
||||
|
||||
return coretypes.CompletionResponse{
|
||||
Content: "",
|
||||
FinishReason: "stop",
|
||||
}, nil
|
||||
}
|
||||
|
||||
// buildResponseFromResult converts a parsed result event into a CompletionResponse.
|
||||
func buildResponseFromResult(output *claudeJSONOutput, execErr error, elapsed time.Duration, log *slog.Logger) (coretypes.CompletionResponse, error) {
|
||||
if output.IsError {
|
||||
return coretypes.CompletionResponse{}, fmt.Errorf("claude-code error: %s", output.Result)
|
||||
}
|
||||
|
||||
content := output.Result
|
||||
if content == "" && len(output.ContentBlock) > 0 {
|
||||
var parts []string
|
||||
for _, block := range output.ContentBlock {
|
||||
if block.Type == "text" && block.Text != "" {
|
||||
parts = append(parts, block.Text)
|
||||
}
|
||||
}
|
||||
content = strings.Join(parts, "\n")
|
||||
}
|
||||
|
||||
finishReason := "stop"
|
||||
if execErr != nil {
|
||||
finishReason = "error"
|
||||
}
|
||||
|
||||
log.Info("claude_code_response",
|
||||
"content_len", len(content),
|
||||
"input_tokens", output.Usage.InputTokens,
|
||||
"output_tokens", output.Usage.OutputTokens,
|
||||
"num_turns", output.NumTurns,
|
||||
"cost_usd", output.TotalCost,
|
||||
"elapsed_ms", elapsed.Milliseconds(),
|
||||
)
|
||||
|
||||
return coretypes.CompletionResponse{
|
||||
Content: content,
|
||||
Usage: coretypes.TokenUsage{
|
||||
InputTokens: output.Usage.InputTokens,
|
||||
OutputTokens: output.Usage.OutputTokens,
|
||||
TotalTokens: output.Usage.InputTokens + output.Usage.OutputTokens,
|
||||
},
|
||||
FinishReason: finishReason,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ── Stream event parsing ────────────────────────────────────────────────
|
||||
|
||||
// claudeStreamEvent is the raw JSON shape from `claude -p --output-format stream-json`.
|
||||
// Each line of stdout is one JSON object with at least a "type" field.
|
||||
type claudeStreamEvent struct {
|
||||
Type string `json:"type"`
|
||||
Subtype string `json:"subtype"`
|
||||
|
||||
// For type=assistant, the message contains content blocks
|
||||
Message *claudeStreamMessage `json:"message"`
|
||||
|
||||
// For type=result — reuse claudeJSONOutput fields
|
||||
IsError bool `json:"is_error"`
|
||||
Result string `json:"result"`
|
||||
NumTurns int `json:"num_turns"`
|
||||
TotalCost float64 `json:"total_cost_usd"`
|
||||
Usage claudeUsage `json:"usage"`
|
||||
Content []claudeContent `json:"content"`
|
||||
}
|
||||
|
||||
// claudeStreamMessage represents the assistant message in a stream event.
|
||||
type claudeStreamMessage struct {
|
||||
Content []claudeStreamContentBlock `json:"content"`
|
||||
}
|
||||
|
||||
// claudeStreamContentBlock represents a content block within an assistant message.
|
||||
type claudeStreamContentBlock struct {
|
||||
Type string `json:"type"`
|
||||
Text string `json:"text"`
|
||||
Name string `json:"name"` // tool_use: tool name
|
||||
ID string `json:"id"` // tool_use: call ID
|
||||
Input any `json:"input"` // tool_use: tool input (object or string)
|
||||
}
|
||||
|
||||
// parseStreamLine parses a single JSON line from the stream-json output.
|
||||
// Returns the StreamEvent, optionally the raw parsed result (if type=result),
|
||||
// and any parse error.
|
||||
func parseStreamLine(line []byte) (coretypes.StreamEvent, *claudeJSONOutput, error) {
|
||||
var raw claudeStreamEvent
|
||||
if err := json.Unmarshal(line, &raw); err != nil {
|
||||
return coretypes.StreamEvent{}, nil, fmt.Errorf("parse stream line: %w", err)
|
||||
}
|
||||
|
||||
switch raw.Type {
|
||||
case "system":
|
||||
// Init event — emit as init
|
||||
return coretypes.StreamEvent{
|
||||
Kind: coretypes.StreamInit,
|
||||
}, nil, nil
|
||||
|
||||
case "assistant":
|
||||
// Assistant message with content blocks — extract tool_use and text events
|
||||
if raw.Message != nil && len(raw.Message.Content) > 0 {
|
||||
// Look for the most interesting content block
|
||||
for _, block := range raw.Message.Content {
|
||||
switch block.Type {
|
||||
case "tool_use":
|
||||
inputStr := truncateToolInput(block.Input)
|
||||
return coretypes.StreamEvent{
|
||||
Kind: coretypes.StreamToolUse,
|
||||
ToolName: block.Name,
|
||||
ToolInput: inputStr,
|
||||
}, nil, nil
|
||||
case "tool_result":
|
||||
return coretypes.StreamEvent{
|
||||
Kind: coretypes.StreamToolResult,
|
||||
}, nil, nil
|
||||
case "text":
|
||||
return coretypes.StreamEvent{
|
||||
Kind: coretypes.StreamText,
|
||||
Content: block.Text,
|
||||
}, nil, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
// Assistant message without interesting content blocks
|
||||
return coretypes.StreamEvent{
|
||||
Kind: coretypes.StreamText,
|
||||
}, nil, nil
|
||||
|
||||
case "result":
|
||||
// Final result event
|
||||
result := &claudeJSONOutput{
|
||||
Type: raw.Type,
|
||||
Subtype: raw.Subtype,
|
||||
IsError: raw.IsError,
|
||||
Result: raw.Result,
|
||||
NumTurns: raw.NumTurns,
|
||||
TotalCost: raw.TotalCost,
|
||||
Usage: raw.Usage,
|
||||
}
|
||||
evt := coretypes.StreamEvent{
|
||||
Kind: coretypes.StreamResult,
|
||||
Content: raw.Result,
|
||||
IsError: raw.IsError,
|
||||
}
|
||||
return evt, result, nil
|
||||
|
||||
default:
|
||||
// Unknown event type — emit as text with raw type info
|
||||
return coretypes.StreamEvent{
|
||||
Kind: coretypes.StreamText,
|
||||
Content: raw.Type,
|
||||
}, nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// truncateToolInput converts tool input to a short description string.
|
||||
func truncateToolInput(input any) string {
|
||||
if input == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
switch v := input.(type) {
|
||||
case string:
|
||||
return truncateStr(v, 100)
|
||||
case map[string]any:
|
||||
// For tool inputs like {"command": "ls -la"}, extract the most useful field
|
||||
if cmd, ok := v["command"]; ok {
|
||||
return truncateStr(fmt.Sprintf("%v", cmd), 100)
|
||||
}
|
||||
if file, ok := v["file_path"]; ok {
|
||||
return truncateStr(fmt.Sprintf("%v", file), 100)
|
||||
}
|
||||
// Fallback: serialize the whole thing
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return truncateStr(string(b), 100)
|
||||
default:
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return truncateStr(string(b), 100)
|
||||
}
|
||||
}
|
||||
|
||||
// truncateStr shortens a string to maxLen, appending "..." if truncated.
|
||||
func truncateStr(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
}
|
||||
return s[:maxLen-3] + "..."
|
||||
}
|
||||
|
||||
// ── Shared helpers ──────────────────────────────────────────────────────
|
||||
|
||||
// resolveWorkDir determines the working directory for the claude subprocess.
|
||||
// If configured is empty, it creates a temporary directory to avoid inheriting the launcher's CWD.
|
||||
// If configured is non-empty, it ensures the directory exists.
|
||||
@@ -149,7 +433,17 @@ func resolveWorkDir(configured string, log *slog.Logger) string {
|
||||
|
||||
// buildClaudeArgs constructs the CLI arguments for claude -p.
|
||||
func buildClaudeArgs(cfg config.ClaudeCodeCfg, req coretypes.CompletionRequest) []string {
|
||||
args := []string{"--print", "--output-format", "json"}
|
||||
outputFormat := "json"
|
||||
if cfg.Streaming && req.StreamFunc != nil {
|
||||
outputFormat = "stream-json"
|
||||
}
|
||||
|
||||
args := []string{"--print", "--output-format", outputFormat}
|
||||
|
||||
// stream-json requires --verbose
|
||||
if outputFormat == "stream-json" {
|
||||
args = append(args, "--verbose")
|
||||
}
|
||||
|
||||
if req.SystemPrompt != "" {
|
||||
args = append(args, "--system-prompt", req.SystemPrompt)
|
||||
|
||||
Reference in New Issue
Block a user