package llm import ( "bufio" "bytes" "context" "encoding/json" "fmt" "log/slog" "os" "os/exec" "strings" "syscall" "time" "github.com/enmanuel/agents/internal/config" coretypes "github.com/enmanuel/agents/pkg/llm" ) const ( defaultClaudeBinary = "claude" defaultClaudeTimeout = 5 * time.Minute ) // claudeJSONOutput represents the JSON output from `claude -p --output-format json`. type claudeJSONOutput struct { Type string `json:"type"` Subtype string `json:"subtype"` CostUSD float64 `json:"cost_usd"` IsError bool `json:"is_error"` Duration float64 `json:"duration_api_ms"` NumTurns int `json:"num_turns"` Result string `json:"result"` SessionID string `json:"session_id"` TotalCost float64 `json:"total_cost_usd"` Usage claudeUsage `json:"usage"` ContentBlock []claudeContent `json:"content"` } type claudeUsage struct { InputTokens int `json:"input_tokens"` OutputTokens int `json:"output_tokens"` } type claudeContent struct { Type string `json:"type"` Text string `json:"text"` } // NewClaudeCodeComplete creates a CompleteFunc that executes `claude -p` as a subprocess. func NewClaudeCodeComplete(cfg config.ClaudeCodeCfg, log *slog.Logger) coretypes.CompleteFunc { binary := cfg.Binary if binary == "" { binary = defaultClaudeBinary } timeout := cfg.Timeout if timeout <= 0 { timeout = defaultClaudeTimeout } // Resolve working directory once at init time. workDir := resolveWorkDir(cfg.WorkingDir, log) return func(ctx context.Context, req coretypes.CompletionRequest) (coretypes.CompletionResponse, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() args := buildClaudeArgs(cfg, req) prompt := flattenMessages(req.Messages) log.Debug("claude_code_exec", "binary", binary, "args", strings.Join(args, " "), "prompt_len", len(prompt), "working_dir", workDir, "streaming", cfg.Streaming, ) cmd := exec.CommandContext(ctx, binary, args...) 
if workDir != "" { cmd.Dir = workDir } // Build clean env: inherit parent but remove ANTHROPIC_API_KEY // so claude uses its own OAuth auth instead of a potentially invalid key. cmd.Env = filterEnv(os.Environ(), "ANTHROPIC_API_KEY") cmd.Stdin = strings.NewReader(prompt) // Create a new process group so we can kill claude + all its children. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} // Override the default cancel behavior: kill the entire process group // instead of just the main process, preventing orphaned child processes. cmd.Cancel = func() error { if cmd.Process != nil { pgid := cmd.Process.Pid log.Info("killing claude-code process group", "pgid", pgid) // Negative PID = kill entire process group return syscall.Kill(-pgid, syscall.SIGKILL) } return nil } // Choose between streaming and buffered mode if cfg.Streaming && req.StreamFunc != nil { return executeStreaming(ctx, cmd, req.StreamFunc, log) } return executeBuffered(ctx, cmd, log) } } // executeBuffered runs the claude subprocess and collects all output at once. // This is the original (non-streaming) code path. func executeBuffered(ctx context.Context, cmd *exec.Cmd, log *slog.Logger) (coretypes.CompletionResponse, error) { var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr start := time.Now() err := cmd.Run() elapsed := time.Since(start) // Ensure the process group is fully dead after Run returns. if cmd.Process != nil { _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) } log.Debug("claude_code_done", "elapsed_ms", elapsed.Milliseconds(), "stdout_len", stdout.Len(), "stderr_len", stderr.Len(), "exit_err", err, ) return parseClaudeOutput(stdout.Bytes(), stderr.Bytes(), err, elapsed, log) } // executeStreaming runs the claude subprocess with --output-format stream-json, // reads stdout line by line, emits StreamEvents via the callback, and accumulates // the final result. 
func executeStreaming(ctx context.Context, cmd *exec.Cmd, streamFn coretypes.StreamFunc, log *slog.Logger) (coretypes.CompletionResponse, error) { stdout, err := cmd.StdoutPipe() if err != nil { return coretypes.CompletionResponse{}, fmt.Errorf("claude-code: stdout pipe: %w", err) } var stderr bytes.Buffer cmd.Stderr = &stderr start := time.Now() if err := cmd.Start(); err != nil { return coretypes.CompletionResponse{}, fmt.Errorf("claude-code: start: %w", err) } // Scan stdout line by line, parsing each JSON event var lastResult *claudeJSONOutput scanner := bufio.NewScanner(stdout) scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024) // allow up to 1MB lines for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } evt, parsed, parseErr := parseStreamLine(line) if parseErr != nil { log.Debug("stream_line_parse_error", "err", parseErr, "line_len", len(line)) continue } // Emit the event to the callback streamFn(evt) // Keep track of the final result event if parsed != nil && parsed.Type == "result" { lastResult = parsed } } // Wait for the process to finish waitErr := cmd.Wait() elapsed := time.Since(start) // Ensure the process group is fully dead after Run returns. 
if cmd.Process != nil { _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) } if scanErr := scanner.Err(); scanErr != nil { log.Warn("stream_scanner_error", "err", scanErr) } log.Debug("claude_code_stream_done", "elapsed_ms", elapsed.Milliseconds(), "stderr_len", stderr.Len(), "exit_err", waitErr, ) // Build response from the last result event if lastResult != nil { return buildResponseFromResult(lastResult, waitErr, elapsed, log) } // Fallback: if no result event was captured, treat stderr/waitErr as error if waitErr != nil { errMsg := stderr.String() if errMsg == "" { errMsg = waitErr.Error() } return coretypes.CompletionResponse{}, fmt.Errorf("claude-code stream process failed: %s", errMsg) } return coretypes.CompletionResponse{ Content: "", FinishReason: "stop", }, nil } // buildResponseFromResult converts a parsed result event into a CompletionResponse. func buildResponseFromResult(output *claudeJSONOutput, execErr error, elapsed time.Duration, log *slog.Logger) (coretypes.CompletionResponse, error) { if output.IsError { return coretypes.CompletionResponse{}, fmt.Errorf("claude-code error: %s", output.Result) } content := output.Result if content == "" && len(output.ContentBlock) > 0 { var parts []string for _, block := range output.ContentBlock { if block.Type == "text" && block.Text != "" { parts = append(parts, block.Text) } } content = strings.Join(parts, "\n") } finishReason := "stop" if execErr != nil { finishReason = "error" } log.Info("claude_code_response", "content_len", len(content), "input_tokens", output.Usage.InputTokens, "output_tokens", output.Usage.OutputTokens, "num_turns", output.NumTurns, "cost_usd", output.TotalCost, "elapsed_ms", elapsed.Milliseconds(), ) return coretypes.CompletionResponse{ Content: content, Usage: coretypes.TokenUsage{ InputTokens: output.Usage.InputTokens, OutputTokens: output.Usage.OutputTokens, TotalTokens: output.Usage.InputTokens + output.Usage.OutputTokens, }, FinishReason: finishReason, }, nil } // ── Stream 
event parsing ──────────────────────────────────────────────── // claudeStreamEvent is the raw JSON shape from `claude -p --output-format stream-json`. // Each line of stdout is one JSON object with at least a "type" field. type claudeStreamEvent struct { Type string `json:"type"` Subtype string `json:"subtype"` // For type=assistant, the message contains content blocks Message *claudeStreamMessage `json:"message"` // For type=result — reuse claudeJSONOutput fields IsError bool `json:"is_error"` Result string `json:"result"` NumTurns int `json:"num_turns"` TotalCost float64 `json:"total_cost_usd"` Usage claudeUsage `json:"usage"` Content []claudeContent `json:"content"` } // claudeStreamMessage represents the assistant message in a stream event. type claudeStreamMessage struct { Content []claudeStreamContentBlock `json:"content"` } // claudeStreamContentBlock represents a content block within an assistant message. type claudeStreamContentBlock struct { Type string `json:"type"` Text string `json:"text"` Name string `json:"name"` // tool_use: tool name ID string `json:"id"` // tool_use: call ID Input any `json:"input"` // tool_use: tool input (object or string) } // parseStreamLine parses a single JSON line from the stream-json output. // Returns the StreamEvent, optionally the raw parsed result (if type=result), // and any parse error. 
func parseStreamLine(line []byte) (coretypes.StreamEvent, *claudeJSONOutput, error) { var raw claudeStreamEvent if err := json.Unmarshal(line, &raw); err != nil { return coretypes.StreamEvent{}, nil, fmt.Errorf("parse stream line: %w", err) } switch raw.Type { case "system": // Init event — emit as init return coretypes.StreamEvent{ Kind: coretypes.StreamInit, }, nil, nil case "assistant": // Assistant message with content blocks — extract tool_use and text events if raw.Message != nil && len(raw.Message.Content) > 0 { // Look for the most interesting content block for _, block := range raw.Message.Content { switch block.Type { case "tool_use": inputStr := truncateToolInput(block.Input) return coretypes.StreamEvent{ Kind: coretypes.StreamToolUse, ToolName: block.Name, ToolInput: inputStr, }, nil, nil case "tool_result": return coretypes.StreamEvent{ Kind: coretypes.StreamToolResult, }, nil, nil case "text": return coretypes.StreamEvent{ Kind: coretypes.StreamText, Content: block.Text, }, nil, nil } } } // Assistant message without interesting content blocks return coretypes.StreamEvent{ Kind: coretypes.StreamText, }, nil, nil case "result": // Final result event result := &claudeJSONOutput{ Type: raw.Type, Subtype: raw.Subtype, IsError: raw.IsError, Result: raw.Result, NumTurns: raw.NumTurns, TotalCost: raw.TotalCost, Usage: raw.Usage, } evt := coretypes.StreamEvent{ Kind: coretypes.StreamResult, Content: raw.Result, IsError: raw.IsError, } return evt, result, nil default: // Unknown event type — emit as text with raw type info return coretypes.StreamEvent{ Kind: coretypes.StreamText, Content: raw.Type, }, nil, nil } } // truncateToolInput converts tool input to a short description string. 
func truncateToolInput(input any) string {
	const maxToolInputLen = 100

	switch v := input.(type) {
	case nil:
		return ""
	case string:
		return truncateStr(v, maxToolInputLen)
	case map[string]any:
		// For tool inputs like {"command": "ls -la"}, show only the most
		// useful field. Keys are checked in priority order.
		for _, key := range []string{"command", "file_path"} {
			if val, ok := v[key]; ok {
				return truncateStr(fmt.Sprintf("%v", val), maxToolInputLen)
			}
		}
	}

	// Fallback: serialize the whole value as JSON.
	b, err := json.Marshal(input)
	if err != nil {
		return ""
	}
	return truncateStr(string(b), maxToolInputLen)
}

// truncateStr shortens s to at most maxLen bytes, appending "..." when it cuts.
// The cut always falls on a UTF-8 rune boundary, so a multi-byte character is
// never split into invalid UTF-8. When maxLen leaves no room for the ellipsis
// (maxLen <= 3), s is hard-cut without one. A maxLen <= 0 yields "".
func truncateStr(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	const ellipsis = "..."
	if maxLen <= len(ellipsis) {
		return prefixAtRuneBoundary(s, maxLen)
	}
	return prefixAtRuneBoundary(s, maxLen-len(ellipsis)) + ellipsis
}

// prefixAtRuneBoundary returns the longest prefix of s that is at most n bytes
// long and does not end in the middle of a UTF-8 encoded rune.
func prefixAtRuneBoundary(s string, n int) string {
	if n <= 0 {
		return ""
	}
	if len(s) <= n {
		return s
	}
	end := 0
	// Ranging over a string yields the byte offset of each rune start.
	for i := range s {
		if i > n {
			break
		}
		end = i
	}
	return s[:end]
}

// ── Shared helpers ──────────────────────────────────────────────────────

// resolveWorkDir determines the working directory for the claude subprocess.
//
// An empty configured value creates a fresh temporary directory (pattern
// "claude-agent-*"), so the subprocess does not inherit the launcher's CWD.
// If that fails, it logs and returns "", and the caller then leaves cmd.Dir
// unset. A non-empty value is created with MkdirAll (0o755) if missing. On
// failure it only logs, and the configured path is still returned.
func resolveWorkDir(configured string, log *slog.Logger) string {
	if configured == "" {
		tmp, err := os.MkdirTemp("", "claude-agent-*")
		if err != nil {
			log.Error("claude-code: failed to create temp working dir", "err", err)
			return "" // Fall through — cmd.Dir will remain empty (inherits CWD).
		}
		log.Warn("claude-code working_dir is empty, using temporary directory",
			"dir", tmp,
		)
		return tmp
	}

	// Ensure configured directory exists.
	if err := os.MkdirAll(configured, 0o755); err != nil {
		log.Error("claude-code: failed to create working dir", "dir", configured, "err", err)
	}
	return configured
}

// buildClaudeArgs constructs the CLI arguments for claude -p. The prompt is
// not part of the arguments; it is supplied separately on stdin.
func buildClaudeArgs(cfg config.ClaudeCodeCfg, req coretypes.CompletionRequest) []string { outputFormat := "json" if cfg.Streaming && req.StreamFunc != nil { outputFormat = "stream-json" } args := []string{"--print", "--output-format", outputFormat} // stream-json requires --verbose if outputFormat == "stream-json" { args = append(args, "--verbose") } if req.SystemPrompt != "" { args = append(args, "--system-prompt", req.SystemPrompt) } // Issue 0145: --mcp-config tells claude where to find external MCP // servers (per-agent devicemesh bridge). Must come BEFORE --allowedTools // because the allowed list usually references `mcp____` // names that only exist once the MCP config is loaded. if cfg.MCPConfigPath != "" { args = append(args, "--mcp-config", cfg.MCPConfigPath) } // Defensive: DisableTools=true plus a non-empty AllowedTools is a // contradiction. The launcher's ApplyMCPBridge already forces // DisableTools=false in that case, but this guard keeps direct callers // safe too. effectiveDisableTools := cfg.DisableTools && len(cfg.AllowedTools) == 0 if effectiveDisableTools { args = append(args, "--tools", "") } else { if len(cfg.AllowedTools) > 0 { args = append(args, "--allowedTools") args = append(args, cfg.AllowedTools...) } if len(cfg.DisallowedTools) > 0 { args = append(args, "--disallowedTools") args = append(args, cfg.DisallowedTools...) } } if cfg.PermissionMode != "" { args = append(args, "--permission-mode", cfg.PermissionMode) } if cfg.Model != "" { args = append(args, "--model", cfg.Model) } if cfg.FallbackModel != "" { args = append(args, "--fallback-model", cfg.FallbackModel) } if cfg.SessionID != "" { args = append(args, "--session-id", cfg.SessionID) } for _, dir := range cfg.AddDirs { args = append(args, "--add-dir", dir) } return args } // flattenMessages converts a conversation history into a single text prompt for stdin. 
func flattenMessages(msgs []coretypes.Message) string { var b strings.Builder for _, m := range msgs { switch m.Role { case coretypes.RoleUser: fmt.Fprintf(&b, "User: %s\n\n", m.Content) case coretypes.RoleAssistant: fmt.Fprintf(&b, "Assistant: %s\n\n", m.Content) case coretypes.RoleTool: fmt.Fprintf(&b, "Tool result: %s\n\n", m.Content) } } return b.String() } // parseClaudeOutput parses the JSON output from `claude -p --output-format json`. func parseClaudeOutput( stdout, stderr []byte, execErr error, elapsed time.Duration, log *slog.Logger, ) (coretypes.CompletionResponse, error) { // If the process failed and there's no stdout, report the error if execErr != nil && len(stdout) == 0 { errMsg := string(stderr) if errMsg == "" { errMsg = execErr.Error() } return coretypes.CompletionResponse{}, fmt.Errorf("claude-code process failed: %s", errMsg) } // Parse JSON output var output claudeJSONOutput if err := json.Unmarshal(stdout, &output); err != nil { // Fall back to treating stdout as plain text log.Warn("claude_code_json_parse_failed", "err", err, "stdout_len", len(stdout)) return coretypes.CompletionResponse{ Content: strings.TrimSpace(string(stdout)), FinishReason: "stop", }, nil } if output.IsError { return coretypes.CompletionResponse{}, fmt.Errorf("claude-code error: %s", output.Result) } // Extract text from result field or content blocks content := output.Result if content == "" && len(output.ContentBlock) > 0 { var parts []string for _, block := range output.ContentBlock { if block.Type == "text" && block.Text != "" { parts = append(parts, block.Text) } } content = strings.Join(parts, "\n") } finishReason := "stop" if execErr != nil { finishReason = "error" } log.Info("claude_code_response", "content_len", len(content), "input_tokens", output.Usage.InputTokens, "output_tokens", output.Usage.OutputTokens, "num_turns", output.NumTurns, "cost_usd", output.TotalCost, "elapsed_ms", elapsed.Milliseconds(), ) return coretypes.CompletionResponse{ Content: 
content, Usage: coretypes.TokenUsage{ InputTokens: output.Usage.InputTokens, OutputTokens: output.Usage.OutputTokens, TotalTokens: output.Usage.InputTokens + output.Usage.OutputTokens, }, FinishReason: finishReason, }, nil } // filterEnv returns a copy of environ with the named keys removed. func filterEnv(environ []string, keys ...string) []string { out := make([]string, 0, len(environ)) for _, e := range environ { skip := false for _, k := range keys { if strings.HasPrefix(e, k+"=") { skip = true break } } if !skip { out = append(out, e) } } return out }