From 4881eeb7deff4c5350c0af57b5fda0bdbe60bacc Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sat, 9 May 2026 14:54:56 +0200 Subject: [PATCH] feat(registry): claude_stream + mcp_server_stdio para chat con tool-use - claude_stream_go_core: lanza claude -p --output-format stream-json --verbose, decodifica NDJSON y emite eventos sinteticos (text_delta, tool_use, tool_result, result, error) por canal Go. 10 tests con fake claude bash. - mcp_server_stdio_go_infra: scaffold de MCP server JSON-RPC 2.0 sobre stdio (initialize, tools/list, tools/call, ping). Usuario registra tool defs y handler unico. 9 tests. Usadas por apps/kanban backend para reemplazar el chat HTTP one-shot con XML actions por WebSocket streaming + tool-use nativa. Co-Authored-By: Claude Opus 4.7 (1M context) --- functions/core/claude_stream.go | 334 +++++++++++++++++++++ functions/core/claude_stream.md | 103 +++++++ functions/core/claude_stream_test.go | 354 +++++++++++++++++++++++ functions/infra/mcp_server_stdio.go | 284 ++++++++++++++++++ functions/infra/mcp_server_stdio.md | 133 +++++++++ functions/infra/mcp_server_stdio_test.go | 307 ++++++++++++++++++++ 6 files changed, 1515 insertions(+) create mode 100644 functions/core/claude_stream.go create mode 100644 functions/core/claude_stream.md create mode 100644 functions/core/claude_stream_test.go create mode 100644 functions/infra/mcp_server_stdio.go create mode 100644 functions/infra/mcp_server_stdio.md create mode 100644 functions/infra/mcp_server_stdio_test.go diff --git a/functions/core/claude_stream.go b/functions/core/claude_stream.go new file mode 100644 index 00000000..28bbed85 --- /dev/null +++ b/functions/core/claude_stream.go @@ -0,0 +1,334 @@ +package core + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "strings" +) + +// ClaudeEventType es el tipo discriminador de eventos del stream-json de claude -p. +type ClaudeEventType string + +const ( + ClaudeEventSystem ClaudeEventType = "system" + ClaudeEventAssistant ClaudeEventType = "assistant" + ClaudeEventUser ClaudeEventType = "user" // tool_result + ClaudeEventResult ClaudeEventType = "result" // final + ClaudeEventToolUse ClaudeEventType = "tool_use" // sintetico + ClaudeEventToolResult ClaudeEventType = "tool_result" // sintetico + ClaudeEventTextDelta ClaudeEventType = "text_delta" // sintetico (porcion legible) + ClaudeEventError ClaudeEventType = "error" // sintetico +) + +// ClaudeEvent es un evento decodificado del stream. Raw siempre contiene la +// linea NDJSON original para casos no contemplados. Para los tipos comunes, +// los campos especificos vienen rellenos. +type ClaudeEvent struct { + Type ClaudeEventType `json:"type"` + Raw json.RawMessage `json:"raw,omitempty"` + + // Para system/init + Subtype string `json:"subtype,omitempty"` + SessionID string `json:"session_id,omitempty"` + Model string `json:"model,omitempty"` + + // Para text_delta (sintetico): porcion textual del mensaje del asistente + Text string `json:"text,omitempty"` + + // Para tool_use (sintetico) + ToolUseID string `json:"tool_use_id,omitempty"` + ToolName string `json:"tool_name,omitempty"` + ToolInput json.RawMessage `json:"tool_input,omitempty"` + + // Para tool_result (sintetico) + ToolResultID string `json:"tool_result_id,omitempty"` + ToolResultContent string `json:"tool_result_content,omitempty"` + ToolResultIsError bool `json:"tool_result_is_error,omitempty"` + + // Para result (final) + StopReason string `json:"stop_reason,omitempty"` + IsError bool `json:"is_error,omitempty"` + Result string `json:"result,omitempty"` + + // Para error + Error string `json:"error,omitempty"` +} + +// ClaudeStreamOpts configura el lanzamiento. +type ClaudeStreamOpts struct { + Bin string // default "claude" si vacio + Args []string // args extra (NO incluyas -p ni --output-format ni --verbose; se añaden automaticamente) + Stdin io.Reader // prompt user (puede ser nil si Args lleva el prompt en posicional) + Workdir string // CWD del subprocess + Env map[string]string // env extra (se mergea con os.Environ()) + Stderr io.Writer // si != nil, recibe stderr del subprocess en vivo +} + +// streamRawLine es la estructura minima para detectar el tipo de una linea NDJSON. +type streamRawLine struct { + Type ClaudeEventType `json:"type"` + Subtype string `json:"subtype,omitempty"` + + // system + SessionID string `json:"session_id,omitempty"` + Model string `json:"model,omitempty"` + + // result + StopReason string `json:"stop_reason,omitempty"` + IsError bool `json:"is_error,omitempty"` + Result string `json:"result,omitempty"` + + // assistant / user + Message *streamMessage `json:"message,omitempty"` +} + +type streamMessage struct { + Role string `json:"role"` + Content json.RawMessage `json:"content"` +} + +type contentBlock struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Input json.RawMessage `json:"input,omitempty"` + ToolUseID string `json:"tool_use_id,omitempty"` + Content json.RawMessage `json:"content,omitempty"` + IsError bool `json:"is_error,omitempty"` +} + +// extractToolResultContent extrae el texto de un tool_result content que puede +// ser string o array de bloques [{type:text,text:"..."}]. +func extractToolResultContent(raw json.RawMessage) string { + if len(raw) == 0 { + return "" + } + // Intentar como string + var s string + if err := json.Unmarshal(raw, &s); err == nil { + return s + } + // Intentar como array de bloques + var blocks []contentBlock + if err := json.Unmarshal(raw, &blocks); err != nil { + return string(raw) + } + var sb strings.Builder + for _, b := range blocks { + if b.Type == "text" { + sb.WriteString(b.Text) + } + } + return sb.String() +} + +// StreamClaude lanza `claude -p --output-format stream-json --verbose ` +// y retorna un canal de eventos. El canal se cierra cuando termina el subprocess +// (EOF en stdout). El cancel del ctx mata al subprocess (SIGTERM, luego SIGKILL). +// +// La goroutine interna se encarga de: +// - Leer stdout linea a linea (NDJSON, buffer 4MB). +// - Decodificar cada linea a un evento del protocolo claude. +// - Para mensajes "assistant" expandir el array message.content emitiendo +// ClaudeEventTextDelta por cada bloque text y ClaudeEventToolUse por cada +// bloque tool_use. +// - Para mensajes "user" detectar tool_result y emitir ClaudeEventToolResult. +// - Si stdout emite linea no-JSON, emite ClaudeEventError con el contenido. +// - Capturar el exit code del subprocess; si != 0 emite ClaudeEventError final. +// +// Retorna error si el spawn falla. Si retorna chan != nil, el caller DEBE leerlo +// hasta que se cierre o cancelar el ctx. +func StreamClaude(ctx context.Context, opts ClaudeStreamOpts) (<-chan ClaudeEvent, error) { + bin := opts.Bin + if bin == "" { + var err error + bin, err = exec.LookPath("claude") + if err != nil { + return nil, fmt.Errorf("claude binary not found: %w", err) + } + } + + args := append([]string{"-p", "--output-format", "stream-json", "--verbose"}, opts.Args...) + cmd := exec.CommandContext(ctx, bin, args...) + + if opts.Stdin != nil { + cmd.Stdin = opts.Stdin + } + + if opts.Workdir != "" { + cmd.Dir = opts.Workdir + } + + // Merge env + if len(opts.Env) > 0 { + env := os.Environ() + for k, v := range opts.Env { + env = append(env, k+"="+v) + } + cmd.Env = env + } + + // Stderr + stderrWriter := io.Discard + if opts.Stderr != nil { + stderrWriter = opts.Stderr + } + + // Capturar stderr para reportar en error final + var stderrBuf strings.Builder + cmd.Stderr = io.MultiWriter(stderrWriter, &stderrBuf) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("stdout pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("start claude: %w", err) + } + + ch := make(chan ClaudeEvent, 64) + + go func() { + defer close(ch) + + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 4*1024*1024), 4*1024*1024) + + send := func(ev ClaudeEvent) { + select { + case ch <- ev: + case <-ctx.Done(): + } + } + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + + var raw streamRawLine + if err := json.Unmarshal(line, &raw); err != nil { + send(ClaudeEvent{ + Type: ClaudeEventError, + Error: fmt.Sprintf("non-json line: %s", string(line)), + Raw: json.RawMessage(line), + }) + continue + } + + switch raw.Type { + case ClaudeEventSystem: + send(ClaudeEvent{ + Type: ClaudeEventSystem, + Subtype: raw.Subtype, + SessionID: raw.SessionID, + Model: raw.Model, + Raw: json.RawMessage(line), + }) + + case ClaudeEventAssistant: + if raw.Message != nil && len(raw.Message.Content) > 0 { + var blocks []contentBlock + if err := json.Unmarshal(raw.Message.Content, &blocks); err == nil { + for _, b := range blocks { + switch b.Type { + case "text": + send(ClaudeEvent{ + Type: ClaudeEventTextDelta, + Text: b.Text, + Raw: json.RawMessage(line), + }) + case "tool_use": + send(ClaudeEvent{ + Type: ClaudeEventToolUse, + ToolUseID: b.ID, + ToolName: b.Name, + ToolInput: b.Input, + Raw: json.RawMessage(line), + }) + } + } + } + } + // Emitir tambien el evento assistant crudo + send(ClaudeEvent{ + Type: ClaudeEventAssistant, + Raw: json.RawMessage(line), + }) + + case ClaudeEventUser: + if raw.Message != nil && len(raw.Message.Content) > 0 { + var blocks []contentBlock + if err := json.Unmarshal(raw.Message.Content, &blocks); err == nil { + for _, b := range blocks { + if b.Type == "tool_result" { + content := extractToolResultContent(b.Content) + send(ClaudeEvent{ + Type: ClaudeEventToolResult, + ToolResultID: b.ToolUseID, + ToolResultContent: content, + ToolResultIsError: b.IsError, + Raw: json.RawMessage(line), + }) + } + } + } + } + send(ClaudeEvent{ + Type: ClaudeEventUser, + Raw: json.RawMessage(line), + }) + + case ClaudeEventResult: + send(ClaudeEvent{ + Type: ClaudeEventResult, + Subtype: raw.Subtype, + SessionID: raw.SessionID, + StopReason: raw.StopReason, + IsError: raw.IsError, + Result: raw.Result, + Raw: json.RawMessage(line), + }) + + default: + send(ClaudeEvent{ + Type: raw.Type, + Raw: json.RawMessage(line), + }) + } + } + + // Esperar a que termine el subprocess + if err := cmd.Wait(); err != nil { + if ctx.Err() != nil { + // Cancelado por contexto — salida limpia + return + } + stderr := strings.TrimSpace(stderrBuf.String()) + errMsg := fmt.Sprintf("claude exit error: %v", err) + if stderr != "" { + // Solo las ultimas lineas para no saturar + lines := strings.Split(stderr, "\n") + tail := lines + if len(lines) > 5 { + tail = lines[len(lines)-5:] + } + errMsg = fmt.Sprintf("claude exit error: %v: %s", err, strings.Join(tail, "; ")) + } + send(ClaudeEvent{ + Type: ClaudeEventError, + Error: errMsg, + }) + } + }() + + return ch, nil +} diff --git a/functions/core/claude_stream.md b/functions/core/claude_stream.md new file mode 100644 index 00000000..1f514563 --- /dev/null +++ b/functions/core/claude_stream.md @@ -0,0 +1,103 @@ +--- +name: claude_stream +kind: function +lang: go +domain: core +version: "1.0.0" +purity: impure +signature: "func StreamClaude(ctx context.Context, opts ClaudeStreamOpts) (<-chan ClaudeEvent, error)" +description: "Lanza `claude -p --output-format stream-json --verbose` como subprocess y retorna un canal de eventos decodificados (text_delta, tool_use, tool_result, result, error). Expande automaticamente los bloques de contenido de los mensajes assistant/user en eventos sinteticos de grano fino." +tags: [claude, streaming, subprocess, agent, ndjson, tool-use] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: + - bufio + - context + - encoding/json + - fmt + - io + - os + - os/exec + - strings +tested: true +tests: + - "system event" + - "text delta" + - "multiple text blocks" + - "tool use" + - "tool result string content" + - "tool result array content" + - "result event" + - "non-zero exit" + - "non-json line" + - "ctx cancel" +test_file_path: "functions/core/claude_stream_test.go" +file_path: "functions/core/claude_stream.go" +params: + - name: ctx + desc: "Contexto de cancelacion. Al cancelar, el subprocess recibe SIGTERM/SIGKILL y el canal se cierra." + - name: opts + desc: "Opciones de lanzamiento: Bin (path al binario claude, default 'claude'), Args (args extra sin -p ni --output-format ni --verbose), Stdin (prompt como io.Reader), Workdir (CWD del subprocess), Env (env extra mergeado con os.Environ()), Stderr (destino del stderr del subprocess)." +output: "Canal de ClaudeEvent cerrado cuando el subprocess termina. Cada evento tiene Type discriminador y campos especificos segun el tipo. Raw contiene siempre la linea NDJSON original. Retorna error solo si el spawn falla." +--- + +## Tipos exportados + +**ClaudeEventType** — constantes de tipo de evento: +- `system` — evento de inicializacion con session_id y model +- `assistant` — mensaje raw del asistente (tambien genera text_delta y/o tool_use sinteticos) +- `user` — mensaje raw de usuario/tool_result (tambien genera tool_result sintetico) +- `result` — evento final con stop_reason, is_error, result +- `text_delta` — (sintetico) porcion de texto del asistente +- `tool_use` — (sintetico) llamada a herramienta con tool_use_id, tool_name, tool_input +- `tool_result` — (sintetico) resultado de herramienta con tool_result_id, content, is_error +- `error` — (sintetico) linea no-JSON o exit code != 0 + +**ClaudeStreamOpts** — configura el subprocess: +- `Bin string` — path al binario. Si vacio, usa `exec.LookPath("claude")`. +- `Args []string` — args extra. Se anteponen automaticamente `-p --output-format stream-json --verbose`. +- `Stdin io.Reader` — prompt user. Puede ser `strings.NewReader("prompt")` o nil. +- `Workdir string` — CWD del subprocess. +- `Env map[string]string` — variables extra mergeadas con `os.Environ()`. +- `Stderr io.Writer` — destino del stderr en vivo (ej. `os.Stderr` para debug). Si nil, se descarta. + +## Ejemplo + +```go +ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) +defer cancel() + +ch, err := core.StreamClaude(ctx, core.ClaudeStreamOpts{ + Args: []string{"responde en una frase: que es Go"}, + Stderr: os.Stderr, +}) +if err != nil { + log.Fatal(err) +} +for ev := range ch { + switch ev.Type { + case core.ClaudeEventTextDelta: + fmt.Print(ev.Text) + case core.ClaudeEventToolUse: + fmt.Printf("\n[tool] %s(%s)\n", ev.ToolName, ev.ToolInput) + case core.ClaudeEventToolResult: + fmt.Printf("[result] %s\n", ev.ToolResultContent) + case core.ClaudeEventResult: + fmt.Printf("\n[done] stop_reason=%s\n", ev.StopReason) + case core.ClaudeEventError: + fmt.Fprintf(os.Stderr, "[error] %s\n", ev.Error) + } +} +``` + +## Notas + +- El caller DEBE consumir el canal hasta que se cierre, o cancelar el ctx. No consumir bloquea la goroutine interna. +- El canal tiene buffer de 64 para absorber ráfagas sin bloquear la lectura de stdout. +- Los eventos `assistant` y `user` raw se emiten ademas de los sinteticos, para casos no contemplados. +- `tool_result.content` puede ser string o array `[{type:text,text:"..."}]` — la funcion concatena los bloques text en ambos casos. +- Los tests usan un fake claude bash; se skipean si bash no esta disponible en el PATH. +- Equivalente Go de `projects/osint_graph/apps/graph_explorer/chat.cpp` (C++). diff --git a/functions/core/claude_stream_test.go b/functions/core/claude_stream_test.go new file mode 100644 index 00000000..43be2820 --- /dev/null +++ b/functions/core/claude_stream_test.go @@ -0,0 +1,354 @@ +package core + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "testing" + "time" +) + +// makeFakeClaude crea un script bash temporal que escribe las lineas dadas a +// stdout y termina con el exit code indicado. +func makeFakeClaude(t *testing.T, lines []string, exitCode int) string { + t.Helper() + + // Skip si bash no esta disponible + if _, err := exec.LookPath("bash"); err != nil { + t.Skip("bash not available, skipping claude_stream tests") + } + + dir := t.TempDir() + script := filepath.Join(dir, "claude") + + var sb string + sb = "#!/usr/bin/env bash\n" + for _, l := range lines { + // Escapar comillas simples para echo + escaped := "" + for _, ch := range l { + if ch == '\'' { + escaped += "'\\''" + } else { + escaped += string(ch) + } + } + sb += fmt.Sprintf("printf '%%s\\n' '%s'\n", escaped) + } + if exitCode != 0 { + sb += fmt.Sprintf("exit %d\n", exitCode) + } + + if err := os.WriteFile(script, []byte(sb), 0o755); err != nil { + t.Fatalf("write fake claude: %v", err) + } + return script +} + +// collectEvents drena el canal con timeout. +func collectEvents(t *testing.T, ch <-chan ClaudeEvent, timeout time.Duration) []ClaudeEvent { + t.Helper() + var events []ClaudeEvent + deadline := time.After(timeout) + for { + select { + case ev, ok := <-ch: + if !ok { + return events + } + events = append(events, ev) + case <-deadline: + t.Fatal("timeout waiting for events channel to close") + } + } +} + +func TestStreamClaude_SystemEvent(t *testing.T) { + line := `{"type":"system","subtype":"init","session_id":"abc123","model":"claude-sonnet-4-5"}` + bin := makeFakeClaude(t, []string{line}, 0) + + ch, err := StreamClaude(context.Background(), ClaudeStreamOpts{Bin: bin}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + events := collectEvents(t, ch, 5*time.Second) + + var found *ClaudeEvent + for i := range events { + if events[i].Type == ClaudeEventSystem { + found = &events[i] + break + } + } + if found == nil { + t.Fatalf("no system event found; got %v", events) + } + if found.SessionID != "abc123" { + t.Errorf("session_id: got %q, want %q", found.SessionID, "abc123") + } + if found.Model != "claude-sonnet-4-5" { + t.Errorf("model: got %q, want %q", found.Model, "claude-sonnet-4-5") + } + if found.Subtype != "init" { + t.Errorf("subtype: got %q, want %q", found.Subtype, "init") + } +} + +func TestStreamClaude_TextDelta(t *testing.T) { + line := `{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"hola mundo"}]}}` + bin := makeFakeClaude(t, []string{line}, 0) + + ch, err := StreamClaude(context.Background(), ClaudeStreamOpts{Bin: bin}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + events := collectEvents(t, ch, 5*time.Second) + + var found *ClaudeEvent + for i := range events { + if events[i].Type == ClaudeEventTextDelta { + found = &events[i] + break + } + } + if found == nil { + t.Fatalf("no text_delta event found; got %v", events) + } + if found.Text != "hola mundo" { + t.Errorf("text: got %q, want %q", found.Text, "hola mundo") + } +} + +func TestStreamClaude_MultipleTextBlocks(t *testing.T) { + // Un solo mensaje assistant con dos bloques text + line := `{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"primero"},{"type":"text","text":"segundo"}]}}` + bin := makeFakeClaude(t, []string{line}, 0) + + ch, err := StreamClaude(context.Background(), ClaudeStreamOpts{Bin: bin}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + events := collectEvents(t, ch, 5*time.Second) + + var deltas []ClaudeEvent + for _, ev := range events { + if ev.Type == ClaudeEventTextDelta { + deltas = append(deltas, ev) + } + } + if len(deltas) != 2 { + t.Fatalf("expected 2 text_delta events, got %d: %v", len(deltas), deltas) + } + if deltas[0].Text != "primero" { + t.Errorf("delta[0].text: got %q, want %q", deltas[0].Text, "primero") + } + if deltas[1].Text != "segundo" { + t.Errorf("delta[1].text: got %q, want %q", deltas[1].Text, "segundo") + } +} + +func TestStreamClaude_ToolUse(t *testing.T) { + inputJSON := `{"column_id":"col1","title":"nueva card"}` + line := fmt.Sprintf(`{"type":"assistant","message":{"role":"assistant","content":[{"type":"tool_use","id":"toolu_abc","name":"create_card","input":%s}]}}`, inputJSON) + bin := makeFakeClaude(t, []string{line}, 0) + + ch, err := StreamClaude(context.Background(), ClaudeStreamOpts{Bin: bin}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + events := collectEvents(t, ch, 5*time.Second) + + var found *ClaudeEvent + for i := range events { + if events[i].Type == ClaudeEventToolUse { + found = &events[i] + break + } + } + if found == nil { + t.Fatalf("no tool_use event found; got %v", events) + } + if found.ToolUseID != "toolu_abc" { + t.Errorf("tool_use_id: got %q, want %q", found.ToolUseID, "toolu_abc") + } + if found.ToolName != "create_card" { + t.Errorf("tool_name: got %q, want %q", found.ToolName, "create_card") + } + var input map[string]string + if err := json.Unmarshal(found.ToolInput, &input); err != nil { + t.Fatalf("unmarshal tool_input: %v", err) + } + if input["title"] != "nueva card" { + t.Errorf("tool_input.title: got %q, want %q", input["title"], "nueva card") + } +} + +func TestStreamClaude_ToolResultStringContent(t *testing.T) { + line := `{"type":"user","message":{"role":"user","content":[{"type":"tool_result","tool_use_id":"toolu_abc","content":"card creada exitosamente","is_error":false}]}}` + bin := makeFakeClaude(t, []string{line}, 0) + + ch, err := StreamClaude(context.Background(), ClaudeStreamOpts{Bin: bin}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + events := collectEvents(t, ch, 5*time.Second) + + var found *ClaudeEvent + for i := range events { + if events[i].Type == ClaudeEventToolResult { + found = &events[i] + break + } + } + if found == nil { + t.Fatalf("no tool_result event found; got %v", events) + } + if found.ToolResultID != "toolu_abc" { + t.Errorf("tool_result_id: got %q, want %q", found.ToolResultID, "toolu_abc") + } + if found.ToolResultContent != "card creada exitosamente" { + t.Errorf("content: got %q, want %q", found.ToolResultContent, "card creada exitosamente") + } + if found.ToolResultIsError { + t.Error("is_error should be false") + } +} + +func TestStreamClaude_ToolResultArrayContent(t *testing.T) { + line := `{"type":"user","message":{"role":"user","content":[{"type":"tool_result","tool_use_id":"toolu_xyz","content":[{"type":"text","text":"parte a"},{"type":"text","text":"parte b"}],"is_error":false}]}}` + bin := makeFakeClaude(t, []string{line}, 0) + + ch, err := StreamClaude(context.Background(), ClaudeStreamOpts{Bin: bin}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + events := collectEvents(t, ch, 5*time.Second) + + var found *ClaudeEvent + for i := range events { + if events[i].Type == ClaudeEventToolResult { + found = &events[i] + break + } + } + if found == nil { + t.Fatalf("no tool_result event found; got %v", events) + } + if found.ToolResultContent != "parte aparte b" { + t.Errorf("content: got %q, want %q", found.ToolResultContent, "parte aparte b") + } +} + +func TestStreamClaude_ResultEvent(t *testing.T) { + line := `{"type":"result","subtype":"success","is_error":false,"result":"respuesta final","stop_reason":"end_turn","session_id":"sess1"}` + bin := makeFakeClaude(t, []string{line}, 0) + + ch, err := StreamClaude(context.Background(), ClaudeStreamOpts{Bin: bin}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + events := collectEvents(t, ch, 5*time.Second) + + var found *ClaudeEvent + for i := range events { + if events[i].Type == ClaudeEventResult { + found = &events[i] + break + } + } + if found == nil { + t.Fatalf("no result event found; got %v", events) + } + if found.StopReason != "end_turn" { + t.Errorf("stop_reason: got %q, want %q", found.StopReason, "end_turn") + } + if found.IsError { + t.Error("is_error should be false") + } + if found.Result != "respuesta final" { + t.Errorf("result: got %q, want %q", found.Result, "respuesta final") + } +} + +func TestStreamClaude_NonZeroExit(t *testing.T) { + bin := makeFakeClaude(t, nil, 7) + + ch, err := StreamClaude(context.Background(), ClaudeStreamOpts{Bin: bin}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + events := collectEvents(t, ch, 5*time.Second) + + var found *ClaudeEvent + for i := range events { + if events[i].Type == ClaudeEventError { + found = &events[i] + break + } + } + if found == nil { + t.Fatalf("no error event for non-zero exit; got %v", events) + } + if found.Error == "" { + t.Error("error message should not be empty") + } +} + +func TestStreamClaude_NonJsonLine(t *testing.T) { + bin := makeFakeClaude(t, []string{"esto no es json"}, 0) + + ch, err := StreamClaude(context.Background(), ClaudeStreamOpts{Bin: bin}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + events := collectEvents(t, ch, 5*time.Second) + + var found *ClaudeEvent + for i := range events { + if events[i].Type == ClaudeEventError { + found = &events[i] + break + } + } + if found == nil { + t.Fatalf("no error event for non-json line; got %v", events) + } +} + +func TestStreamClaude_CtxCancel(t *testing.T) { + // Fake claude que duerme indefinidamente + if _, err := exec.LookPath("bash"); err != nil { + t.Skip("bash not available") + } + dir := t.TempDir() + script := filepath.Join(dir, "claude") + if err := os.WriteFile(script, []byte("#!/usr/bin/env bash\nsleep 60\n"), 0o755); err != nil { + t.Fatalf("write sleep script: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + ch, err := StreamClaude(ctx, ClaudeStreamOpts{Bin: script}) + if err != nil { + t.Fatalf("StreamClaude error: %v", err) + } + + // Cancelar enseguida + cancel() + + // El canal debe cerrarse en menos de 1 segundo + deadline := time.After(1 * time.Second) + for { + select { + case _, ok := <-ch: + if !ok { + return // canal cerrado: test OK + } + case <-deadline: + t.Fatal("channel did not close within 1s after ctx cancel") + } + } +} diff --git a/functions/infra/mcp_server_stdio.go b/functions/infra/mcp_server_stdio.go new file mode 100644 index 00000000..d50cb7d7 --- /dev/null +++ b/functions/infra/mcp_server_stdio.go @@ -0,0 +1,284 @@ +package infra + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "os" + "sync" +) + +// MCPToolDef describes a tool exported by the MCP server. +// InputSchema must be a valid JSON Schema object with "type":"object" and +// "properties" describing the tool arguments. +type MCPToolDef struct { + Name string `json:"name"` + Description string `json:"description"` + InputSchema json.RawMessage `json:"inputSchema"` +} + +// MCPToolHandler executes a tool. input is the raw JSON of the arguments +// sent by the MCP client (the value of params.arguments). +// Returns result (any JSON-serializable value), isError (true when the tool +// itself reports a logical error, not a protocol error), and err (internal +// failure that results in a JSON-RPC error response with code -32603). +type MCPToolHandler func(ctx context.Context, name string, input json.RawMessage) (result any, isError bool, err error) + +// MCPServerOpts configures the MCP stdio server. +type MCPServerOpts struct { + Name string // server name reported to the client in initialize + Version string // server version reported to the client in initialize + Tools []MCPToolDef + Handler MCPToolHandler // single dispatcher for all tools + In io.Reader // defaults to os.Stdin when nil + Out io.Writer // defaults to os.Stdout when nil + Logger io.Writer // optional log sink (e.g. os.Stderr); discards when nil +} + +// jsonrpcRequest is the wire format for an incoming JSON-RPC 2.0 message. +type jsonrpcRequest struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id"` // number, string, or null; absent for notifications + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +// jsonrpcResponse is the wire format for an outgoing JSON-RPC 2.0 response. +type jsonrpcResponse struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id,omitempty"` + Result any `json:"result,omitempty"` + Error *jsonrpcError `json:"error,omitempty"` +} + +type jsonrpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// mcpCallParams is params.arguments unwrapped from a tools/call request. +type mcpCallParams struct { + Name string `json:"name"` + Arguments json.RawMessage `json:"arguments"` +} + +// ServeMCP runs the JSON-RPC 2.0 loop over stdio implementing the minimum MCP +// protocol surface: initialize, initialized (notification), tools/list, +// tools/call, and ping. It reads newline-delimited JSON from opts.In and writes +// newline-delimited JSON to opts.Out. +// +// ServeMCP returns nil when the client closes stdin (EOF) or when ctx is +// cancelled. It returns an error only on unrecoverable write failures. +func ServeMCP(ctx context.Context, opts MCPServerOpts) error { + in := opts.In + if in == nil { + in = os.Stdin + } + out := opts.Out + if out == nil { + out = os.Stdout + } + + logf := func(format string, args ...any) { + if opts.Logger != nil { + fmt.Fprintf(opts.Logger, "[mcp] "+format+"\n", args...) + } + } + + var mu sync.Mutex + writeLine := func(v any) error { + b, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("mcp marshal: %w", err) + } + mu.Lock() + defer mu.Unlock() + if _, err := out.Write(append(b, '\n')); err != nil { + return fmt.Errorf("mcp write: %w", err) + } + return nil + } + + sendResult := func(id any, result any) error { + return writeLine(jsonrpcResponse{ + JSONRPC: "2.0", + ID: id, + Result: result, + }) + } + + sendError := func(id any, code int, message string) error { + return writeLine(jsonrpcResponse{ + JSONRPC: "2.0", + ID: id, + Error: &jsonrpcError{Code: code, Message: message}, + }) + } + + scanner := bufio.NewScanner(in) + scanner.Buffer(make([]byte, 4*1024*1024), 4*1024*1024) + + scanCh := make(chan string) + scanErr := make(chan error, 1) + + go func() { + for scanner.Scan() { + line := scanner.Text() + if len(line) == 0 { + continue + } + select { + case scanCh <- line: + case <-ctx.Done(): + return + } + } + if err := scanner.Err(); err != nil { + scanErr <- err + } + close(scanCh) + }() + + for { + select { + case <-ctx.Done(): + logf("context cancelled, stopping") + return nil + + case err := <-scanErr: + return fmt.Errorf("mcp scanner: %w", err) + + case line, ok := <-scanCh: + if !ok { + logf("stdin closed, stopping") + return nil + } + + logf("recv: %s", line) + + var req jsonrpcRequest + if err := json.Unmarshal([]byte(line), &req); err != nil { + logf("json parse error: %v", err) + // id is unknown; respond with null id + if err2 := sendError(nil, -32700, "parse error: "+err.Error()); err2 != nil { + return err2 + } + continue + } + + // Notifications have no id field. After unmarshal, ID is nil only + // when the key was absent (not when explicitly null). We distinguish + // by checking whether "id" key appears in the raw message. + isNotification := !jsonHasKey([]byte(line), "id") + + switch req.Method { + case "initialize": + if isNotification { + continue + } + result := map[string]any{ + "protocolVersion": "2024-11-05", + "capabilities": map[string]any{ + "tools": map[string]any{}, + }, + "serverInfo": map[string]any{ + "name": opts.Name, + "version": opts.Version, + }, + } + if err := sendResult(req.ID, result); err != nil { + return err + } + + case "initialized": + // notification — ignore, no response + + case "tools/list": + if isNotification { + continue + } + tools := opts.Tools + if tools == nil { + tools = []MCPToolDef{} + } + result := map[string]any{ + "tools": tools, + } + if err := sendResult(req.ID, result); err != nil { + return err + } + + case "tools/call": + if isNotification { + continue + } + var p mcpCallParams + if err := json.Unmarshal(req.Params, &p); err != nil { + if err2 := sendError(req.ID, -32602, "invalid params: "+err.Error()); err2 != nil { + return err2 + } + continue + } + + args := p.Arguments + if args == nil { + args = json.RawMessage(`{}`) + } + + toolResult, isErr, handlerErr := opts.Handler(ctx, p.Name, args) + if handlerErr != nil { + logf("handler error for %q: %v", p.Name, handlerErr) + if err2 := sendError(req.ID, -32603, handlerErr.Error()); err2 != nil { + return err2 + } + continue + } + + // Serialize the result value to JSON text for the content block. + resultText, _ := json.Marshal(toolResult) + callResult := map[string]any{ + "content": []map[string]any{ + { + "type": "text", + "text": string(resultText), + }, + }, + "isError": isErr, + } + if err := sendResult(req.ID, callResult); err != nil { + return err + } + + case "ping": + if isNotification { + continue + } + if err := sendResult(req.ID, map[string]any{}); err != nil { + return err + } + + default: + if isNotification { + logf("unknown notification %q, ignoring", req.Method) + continue + } + logf("unknown method %q", req.Method) + if err2 := sendError(req.ID, -32601, "method not found: "+req.Method); err2 != nil { + return err2 + } + } + } + } +} + +// jsonHasKey reports whether the JSON object b contains the given top-level key. +func jsonHasKey(b []byte, key string) bool { + var m map[string]json.RawMessage + if err := json.Unmarshal(b, &m); err != nil { + return false + } + _, ok := m[key] + return ok +} diff --git a/functions/infra/mcp_server_stdio.md b/functions/infra/mcp_server_stdio.md new file mode 100644 index 00000000..0f0398ea --- /dev/null +++ b/functions/infra/mcp_server_stdio.md @@ -0,0 +1,133 @@ +--- +name: mcp_server_stdio +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func ServeMCP(ctx context.Context, opts MCPServerOpts) error" +description: "Ejecuta un servidor MCP (Model Context Protocol) sobre stdio implementando JSON-RPC 2.0. Lee de opts.In linea a linea, despacha initialize/tools/list/tools/call/ping al handler del usuario, y escribe respuestas a opts.Out. Retorna nil al cerrar stdin o al cancelar ctx." +tags: [mcp, stdio, json-rpc, claude, tools, server, protocol] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: + - bufio + - context + - encoding/json + - fmt + - io + - os + - sync +tested: true +tests: + - "initialize retorna serverInfo con Name y Version correctos" + - "tools/list retorna las tools registradas con su schema" + - "tools/call con tool valida invoca handler y retorna content con isError false" + - "tools/call cuando handler retorna error genera respuesta error -32603" + - "tools/call cuando handler retorna isError=true usa result.isError=true no error JSON-RPC" + - "metodo desconocido retorna error -32601" + - "notification sin id no produce respuesta en el buffer" + - "json invalido retorna error -32700 con id null" + - "ctx cancel detiene ServeMCP y retorna nil sin error" +test_file_path: "functions/infra/mcp_server_stdio_test.go" +file_path: "functions/infra/mcp_server_stdio.go" +params: + - name: ctx + desc: "Contexto de cancelacion. Cuando se cancela, el bucle de lectura termina limpiamente y la funcion retorna nil." + - name: opts + desc: "Configuracion del servidor: nombre y version del servidor, lista de tools (MCPToolDef con nombre, descripcion y JSON Schema del input), handler unico que despacha todas las tools (recibe name + arguments JSON crudo, retorna result + isError + err), reader de entrada (default os.Stdin), writer de salida (default os.Stdout), y writer opcional de log (default descartado)." +output: "nil cuando stdin se cierra o ctx se cancela. Error si ocurre un fallo irrecuperable de escritura en Out." +--- + +## Ejemplo + +```go +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + + "fn-registry/functions/infra" +) + +func main() { + tools := []infra.MCPToolDef{ + { + Name: "echo", + Description: "Echoes the input message back", + InputSchema: json.RawMessage(`{ + "type": "object", + "properties": { + "msg": {"type": "string", "description": "message to echo"} + }, + "required": ["msg"] + }`), + }, + } + + handler := func(ctx context.Context, name string, input json.RawMessage) (any, bool, error) { + switch name { + case "echo": + var args struct{ Msg string `json:"msg"` } + if err := json.Unmarshal(input, &args); err != nil { + return nil, false, err + } + return map[string]string{"result": args.Msg}, false, nil + default: + return nil, true, fmt.Errorf("unknown tool: %s", name) + } + } + + err := infra.ServeMCP(context.Background(), infra.MCPServerOpts{ + Name: "my-app-mcp", + Version: "1.0.0", + Tools: tools, + Handler: handler, + Logger: os.Stderr, + }) + if err != nil { + fmt.Fprintln(os.Stderr, "mcp error:", err) + os.Exit(1) + } +} +``` + +Para usar como MCP server en Claude Desktop / `claude -p`, registrar el binario en `.mcp.json`: + +```json +{ + "mcpServers": { + "my-app": { + "command": "/path/to/my-app-binary", + "args": ["--mcp"] + } + } +} +``` + +El binario detecta `--mcp` y llama `ServeMCP` en lugar del modo interactivo normal. + +## Notas + +**Protocolo soportado:** MCP 2024-11-05, subset minimo suficiente para exponer tools a `claude -p` y Claude Desktop. + +**Metodos implementados:** +- `initialize` — handshake inicial; responde con protocolVersion, capabilities y serverInfo. +- `initialized` — notification enviada por el cliente tras el handshake; se ignora sin respuesta. +- `tools/list` — devuelve la lista de tools registradas. +- `tools/call` — invoca el Handler. Si handler.err != nil → JSON-RPC error -32603. Si isError=true → result.isError=true (error logico de la tool, no error de protocolo). +- `ping` — responde con `{}`. +- Cualquier otro metodo → JSON-RPC error -32601 (method not found). +- Notifications (mensajes sin campo `id`) → nunca se responden, ni siquiera con error. + +**Buffer del scanner:** 4 MB para admitir schemas JSON grandes o resultados voluminosos. + +**Concurrencia:** el bucle es secuencial hoy; los writes estan protegidos por mutex para que sea seguro si en el futuro se paraleliza el dispatch del handler. + +**Distincion notification vs request:** la presencia del campo `id` en el JSON crudo (incluso si es null) indica request. La ausencia indica notification. Esto sigue la spec JSON-RPC 2.0. diff --git a/functions/infra/mcp_server_stdio_test.go b/functions/infra/mcp_server_stdio_test.go new file mode 100644 index 00000000..00d8d97e --- /dev/null +++ b/functions/infra/mcp_server_stdio_test.go @@ -0,0 +1,307 @@ +package infra + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strings" + "testing" + "time" +) + +// helper: build an MCPServerOpts wired to in/out buffers with a single echo tool. +func newTestServer(in *strings.Reader, out *bytes.Buffer) MCPServerOpts { + echoSchema := json.RawMessage(`{"type":"object","properties":{"msg":{"type":"string"}}}`) + return MCPServerOpts{ + Name: "test-server", + Version: "0.1.0", + Tools: []MCPToolDef{ + {Name: "echo", Description: "echoes input", InputSchema: echoSchema}, + }, + Handler: func(ctx context.Context, name string, input json.RawMessage) (any, bool, error) { + if name == "echo" { + var args map[string]string + _ = json.Unmarshal(input, &args) + return map[string]string{"echoed": args["msg"]}, false, nil + } + return nil, true, fmt.Errorf("unknown tool: %s", name) + }, + In: strings.NewReader(""), + Out: out, + } +} + +// runServer launches ServeMCP with the given lines as stdin, returns the output lines. +func runServer(t *testing.T, opts MCPServerOpts, lines []string) []map[string]any { + t.Helper() + payload := strings.Join(lines, "\n") + "\n" + opts.In = strings.NewReader(payload) + var buf bytes.Buffer + opts.Out = &buf + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + err := ServeMCP(ctx, opts) + if err != nil { + t.Fatalf("ServeMCP returned error: %v", err) + } + + var results []map[string]any + for _, line := range strings.Split(strings.TrimSpace(buf.String()), "\n") { + if line == "" { + continue + } + var m map[string]any + if err2 := json.Unmarshal([]byte(line), &m); err2 != nil { + t.Fatalf("output not valid JSON: %q — %v", line, err2) + } + results = append(results, m) + } + return results +} + +func TestServeMCP_initialize(t *testing.T) { + t.Run("initialize retorna serverInfo con Name y Version correctos", func(t *testing.T) { + req := `{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}` + opts := MCPServerOpts{ + Name: "my-server", Version: "1.2.3", + Tools: []MCPToolDef{}, + Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil }, + } + results := runServer(t, opts, []string{req}) + if len(results) != 1 { + t.Fatalf("expected 1 response, got %d", len(results)) + } + r := results[0] + if r["id"].(float64) != 1 { + t.Errorf("wrong id: %v", r["id"]) + } + result := r["result"].(map[string]any) + info := result["serverInfo"].(map[string]any) + if info["name"] != "my-server" { + t.Errorf("wrong name: %v", info["name"]) + } + if info["version"] != "1.2.3" { + t.Errorf("wrong version: %v", info["version"]) + } + if result["protocolVersion"] != "2024-11-05" { + t.Errorf("wrong protocolVersion: %v", result["protocolVersion"]) + } + }) +} + +func TestServeMCP_toolsList(t *testing.T) { + t.Run("tools/list retorna las tools registradas con su schema", func(t *testing.T) { + schema := json.RawMessage(`{"type":"object","properties":{"x":{"type":"number"}}}`) + opts := MCPServerOpts{ + Name: "srv", Version: "0.1", + Tools: []MCPToolDef{ + {Name: "add", Description: "adds numbers", InputSchema: schema}, + }, + Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil }, + } + req := `{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}` + results := runServer(t, opts, []string{req}) + if len(results) != 1 { + t.Fatalf("expected 1 response, got %d", len(results)) + } + result := results[0]["result"].(map[string]any) + tools := result["tools"].([]any) + if len(tools) != 1 { + t.Fatalf("expected 1 tool, got %d", len(tools)) + } + tool := tools[0].(map[string]any) + if tool["name"] != "add" { + t.Errorf("wrong tool name: %v", tool["name"]) + } + }) +} + +func TestServeMCP_toolsCall_success(t *testing.T) { + t.Run("tools/call con tool valida invoca handler y retorna content con isError false", func(t *testing.T) { + opts := MCPServerOpts{ + Name: "srv", Version: "0.1", + Tools: []MCPToolDef{{Name: "echo", Description: "echo", InputSchema: json.RawMessage(`{}`)}}, + Handler: func(_ context.Context, name string, input json.RawMessage) (any, bool, error) { + var args map[string]string + _ = json.Unmarshal(input, &args) + return map[string]string{"echoed": args["msg"]}, false, nil + }, + } + req := `{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"echo","arguments":{"msg":"hello"}}}` + results := runServer(t, opts, []string{req}) + if len(results) != 1 { + t.Fatalf("expected 1 response, got %d", len(results)) + } + result := results[0]["result"].(map[string]any) + if result["isError"].(bool) { + t.Error("expected isError=false") + } + content := result["content"].([]any) + if len(content) == 0 { + t.Fatal("expected at least 1 content block") + } + block := content[0].(map[string]any) + if block["type"] != "text" { + t.Errorf("expected type=text, got %v", block["type"]) + } + text := block["text"].(string) + if !strings.Contains(text, "hello") { + t.Errorf("expected echoed hello in text, got: %s", text) + } + }) +} + +func TestServeMCP_toolsCall_handlerError(t *testing.T) { + t.Run("tools/call cuando handler retorna error genera respuesta error -32603", func(t *testing.T) { + opts := MCPServerOpts{ + Name: "srv", Version: "0.1", + Tools: []MCPToolDef{{Name: "fail", Description: "always fails", InputSchema: json.RawMessage(`{}`)}}, + Handler: func(_ context.Context, name string, _ json.RawMessage) (any, bool, error) { + return nil, false, fmt.Errorf("internal failure") + }, + } + req := `{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"fail","arguments":{}}}` + results := runServer(t, opts, []string{req}) + if len(results) != 1 { + t.Fatalf("expected 1 response, got %d", len(results)) + } + errField, ok := results[0]["error"].(map[string]any) + if !ok { + t.Fatalf("expected error field, got result: %v", results[0]) + } + if errField["code"].(float64) != -32603 { + t.Errorf("expected code -32603, got %v", errField["code"]) + } + }) +} + +func TestServeMCP_toolsCall_isError(t *testing.T) { + t.Run("tools/call cuando handler retorna isError=true usa result.isError=true no error JSON-RPC", func(t *testing.T) { + opts := MCPServerOpts{ + Name: "srv", Version: "0.1", + Tools: []MCPToolDef{{Name: "badtool", Description: "logical error", InputSchema: json.RawMessage(`{}`)}}, + Handler: func(_ context.Context, name string, _ json.RawMessage) (any, bool, error) { + return map[string]string{"reason": "not found"}, true, nil + }, + } + req := `{"jsonrpc":"2.0","id":5,"method":"tools/call","params":{"name":"badtool","arguments":{}}}` + results := runServer(t, opts, []string{req}) + if len(results) != 1 { + t.Fatalf("expected 1 response, got %d", len(results)) + } + if _, hasErr := results[0]["error"]; hasErr { + t.Fatal("expected no JSON-RPC error field when isError=true") + } + result := results[0]["result"].(map[string]any) + if !result["isError"].(bool) { + t.Error("expected result.isError=true") + } + }) +} + +func TestServeMCP_unknownMethod(t *testing.T) { + t.Run("metodo desconocido retorna error -32601", func(t *testing.T) { + opts := MCPServerOpts{ + Name: "srv", Version: "0.1", + Tools: []MCPToolDef{}, + Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil }, + } + req := `{"jsonrpc":"2.0","id":6,"method":"nope/nope","params":{}}` + results := runServer(t, opts, []string{req}) + if len(results) != 1 { + t.Fatalf("expected 1 response, got %d", len(results)) + } + errField, ok := results[0]["error"].(map[string]any) + if !ok { + t.Fatal("expected error field") + } + if errField["code"].(float64) != -32601 { + t.Errorf("expected code -32601, got %v", errField["code"]) + } + }) +} + +func TestServeMCP_notification(t *testing.T) { + t.Run("notification sin id no produce respuesta en el buffer", func(t *testing.T) { + opts := MCPServerOpts{ + Name: "srv", Version: "0.1", + Tools: []MCPToolDef{}, + Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil }, + } + // "initialized" is a notification (no id field) + notif := `{"jsonrpc":"2.0","method":"initialized"}` + // followed by a regular request so we know the server processed both + req := `{"jsonrpc":"2.0","id":7,"method":"ping","params":{}}` + results := runServer(t, opts, []string{notif, req}) + // only ping should produce a response + if len(results) != 1 { + t.Fatalf("expected 1 response (only for ping), got %d", len(results)) + } + if results[0]["id"].(float64) != 7 { + t.Errorf("expected id=7 from ping, got %v", results[0]["id"]) + } + }) +} + +func TestServeMCP_invalidJSON(t *testing.T) { + t.Run("json invalido retorna error -32700 con id null", func(t *testing.T) { + opts := MCPServerOpts{ + Name: "srv", Version: "0.1", + Tools: []MCPToolDef{}, + Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil }, + } + req := `not json at all` + results := runServer(t, opts, []string{req}) + if len(results) != 1 { + t.Fatalf("expected 1 response, got %d", len(results)) + } + errField, ok := results[0]["error"].(map[string]any) + if !ok { + t.Fatal("expected error field") + } + if errField["code"].(float64) != -32700 { + t.Errorf("expected code -32700, got %v", errField["code"]) + } + // id must be null (absent or nil) + if id, hasID := results[0]["id"]; hasID && id != nil { + t.Errorf("expected null id, got %v", id) + } + }) +} + +func TestServeMCP_ctxCancel(t *testing.T) { + t.Run("ctx cancel detiene ServeMCP y retorna nil sin error", func(t *testing.T) { + // Use a pipe so stdin stays open forever + pr, pw := strings.NewReader(""), new(bytes.Buffer) + _ = pw + + ctx, cancel := context.WithCancel(context.Background()) + opts := MCPServerOpts{ + Name: "srv", Version: "0.1", + Tools: []MCPToolDef{}, + Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil }, + In: pr, + Out: new(bytes.Buffer), + } + + done := make(chan error, 1) + go func() { + done <- ServeMCP(ctx, opts) + }() + + // Cancel immediately — stdin is already at EOF so it will also stop cleanly. + cancel() + + select { + case err := <-done: + if err != nil { + t.Errorf("expected nil error on ctx cancel, got: %v", err) + } + case <-time.After(2 * time.Second): + t.Error("ServeMCP did not stop after ctx cancel") + } + }) +}