feat(registry): claude_stream + mcp_server_stdio para chat con tool-use

- claude_stream_go_core: lanza claude -p --output-format stream-json
  --verbose, decodifica NDJSON y emite eventos sinteticos (text_delta,
  tool_use, tool_result, result, error) por canal Go. 10 tests con fake
  claude bash.
- mcp_server_stdio_go_infra: scaffold de MCP server JSON-RPC 2.0 sobre
  stdio (initialize, tools/list, tools/call, ping). Usuario registra
  tool defs y handler unico. 9 tests.

Usadas por apps/kanban backend para reemplazar el chat HTTP one-shot
con XML actions por WebSocket streaming + tool-use nativa.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-09 14:54:56 +02:00
parent 98c4982707
commit 4881eeb7de
6 changed files with 1515 additions and 0 deletions
+284
View File
@@ -0,0 +1,284 @@
package infra
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"sync"
)
// MCPToolDef describes a tool exported by the MCP server.
// InputSchema must be a valid JSON Schema object with "type":"object" and
// "properties" describing the tool arguments.
type MCPToolDef struct {
Name string `json:"name"`
Description string `json:"description"`
InputSchema json.RawMessage `json:"inputSchema"`
}
// MCPToolHandler executes a tool. input is the raw JSON of the arguments
// sent by the MCP client (the value of params.arguments).
// Returns result (any JSON-serializable value), isError (true when the tool
// itself reports a logical error, not a protocol error), and err (internal
// failure that results in a JSON-RPC error response with code -32603).
type MCPToolHandler func(ctx context.Context, name string, input json.RawMessage) (result any, isError bool, err error)
// MCPServerOpts configures the MCP stdio server.
type MCPServerOpts struct {
Name string // server name reported to the client in initialize
Version string // server version reported to the client in initialize
Tools []MCPToolDef
Handler MCPToolHandler // single dispatcher for all tools
In io.Reader // defaults to os.Stdin when nil
Out io.Writer // defaults to os.Stdout when nil
Logger io.Writer // optional log sink (e.g. os.Stderr); discards when nil
}
// jsonrpcRequest is the wire format for an incoming JSON-RPC 2.0 message.
type jsonrpcRequest struct {
JSONRPC string `json:"jsonrpc"`
ID any `json:"id"` // number, string, or null; absent for notifications
Method string `json:"method"`
Params json.RawMessage `json:"params"`
}
// jsonrpcResponse is the wire format for an outgoing JSON-RPC 2.0 response.
type jsonrpcResponse struct {
JSONRPC string `json:"jsonrpc"`
ID any `json:"id,omitempty"`
Result any `json:"result,omitempty"`
Error *jsonrpcError `json:"error,omitempty"`
}
type jsonrpcError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// mcpCallParams is params.arguments unwrapped from a tools/call request.
type mcpCallParams struct {
Name string `json:"name"`
Arguments json.RawMessage `json:"arguments"`
}
// ServeMCP runs the JSON-RPC 2.0 loop over stdio implementing the minimum MCP
// protocol surface: initialize, initialized (notification), tools/list,
// tools/call, and ping. It reads newline-delimited JSON from opts.In and writes
// newline-delimited JSON to opts.Out.
//
// ServeMCP returns nil when the client closes stdin (EOF) or when ctx is
// cancelled. It returns an error only on unrecoverable write failures.
func ServeMCP(ctx context.Context, opts MCPServerOpts) error {
in := opts.In
if in == nil {
in = os.Stdin
}
out := opts.Out
if out == nil {
out = os.Stdout
}
logf := func(format string, args ...any) {
if opts.Logger != nil {
fmt.Fprintf(opts.Logger, "[mcp] "+format+"\n", args...)
}
}
var mu sync.Mutex
writeLine := func(v any) error {
b, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("mcp marshal: %w", err)
}
mu.Lock()
defer mu.Unlock()
if _, err := out.Write(append(b, '\n')); err != nil {
return fmt.Errorf("mcp write: %w", err)
}
return nil
}
sendResult := func(id any, result any) error {
return writeLine(jsonrpcResponse{
JSONRPC: "2.0",
ID: id,
Result: result,
})
}
sendError := func(id any, code int, message string) error {
return writeLine(jsonrpcResponse{
JSONRPC: "2.0",
ID: id,
Error: &jsonrpcError{Code: code, Message: message},
})
}
scanner := bufio.NewScanner(in)
scanner.Buffer(make([]byte, 4*1024*1024), 4*1024*1024)
scanCh := make(chan string)
scanErr := make(chan error, 1)
go func() {
for scanner.Scan() {
line := scanner.Text()
if len(line) == 0 {
continue
}
select {
case scanCh <- line:
case <-ctx.Done():
return
}
}
if err := scanner.Err(); err != nil {
scanErr <- err
}
close(scanCh)
}()
for {
select {
case <-ctx.Done():
logf("context cancelled, stopping")
return nil
case err := <-scanErr:
return fmt.Errorf("mcp scanner: %w", err)
case line, ok := <-scanCh:
if !ok {
logf("stdin closed, stopping")
return nil
}
logf("recv: %s", line)
var req jsonrpcRequest
if err := json.Unmarshal([]byte(line), &req); err != nil {
logf("json parse error: %v", err)
// id is unknown; respond with null id
if err2 := sendError(nil, -32700, "parse error: "+err.Error()); err2 != nil {
return err2
}
continue
}
// Notifications have no id field. After unmarshal, ID is nil only
// when the key was absent (not when explicitly null). We distinguish
// by checking whether "id" key appears in the raw message.
isNotification := !jsonHasKey([]byte(line), "id")
switch req.Method {
case "initialize":
if isNotification {
continue
}
result := map[string]any{
"protocolVersion": "2024-11-05",
"capabilities": map[string]any{
"tools": map[string]any{},
},
"serverInfo": map[string]any{
"name": opts.Name,
"version": opts.Version,
},
}
if err := sendResult(req.ID, result); err != nil {
return err
}
case "initialized":
// notification — ignore, no response
case "tools/list":
if isNotification {
continue
}
tools := opts.Tools
if tools == nil {
tools = []MCPToolDef{}
}
result := map[string]any{
"tools": tools,
}
if err := sendResult(req.ID, result); err != nil {
return err
}
case "tools/call":
if isNotification {
continue
}
var p mcpCallParams
if err := json.Unmarshal(req.Params, &p); err != nil {
if err2 := sendError(req.ID, -32602, "invalid params: "+err.Error()); err2 != nil {
return err2
}
continue
}
args := p.Arguments
if args == nil {
args = json.RawMessage(`{}`)
}
toolResult, isErr, handlerErr := opts.Handler(ctx, p.Name, args)
if handlerErr != nil {
logf("handler error for %q: %v", p.Name, handlerErr)
if err2 := sendError(req.ID, -32603, handlerErr.Error()); err2 != nil {
return err2
}
continue
}
// Serialize the result value to JSON text for the content block.
resultText, _ := json.Marshal(toolResult)
callResult := map[string]any{
"content": []map[string]any{
{
"type": "text",
"text": string(resultText),
},
},
"isError": isErr,
}
if err := sendResult(req.ID, callResult); err != nil {
return err
}
case "ping":
if isNotification {
continue
}
if err := sendResult(req.ID, map[string]any{}); err != nil {
return err
}
default:
if isNotification {
logf("unknown notification %q, ignoring", req.Method)
continue
}
logf("unknown method %q", req.Method)
if err2 := sendError(req.ID, -32601, "method not found: "+req.Method); err2 != nil {
return err2
}
}
}
}
}
// jsonHasKey reports whether the JSON object b contains the given top-level key.
func jsonHasKey(b []byte, key string) bool {
var m map[string]json.RawMessage
if err := json.Unmarshal(b, &m); err != nil {
return false
}
_, ok := m[key]
return ok
}
+133
View File
@@ -0,0 +1,133 @@
---
name: mcp_server_stdio
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func ServeMCP(ctx context.Context, opts MCPServerOpts) error"
description: "Ejecuta un servidor MCP (Model Context Protocol) sobre stdio implementando JSON-RPC 2.0. Lee de opts.In linea a linea, despacha initialize/tools/list/tools/call/ping al handler del usuario, y escribe respuestas a opts.Out. Retorna nil al cerrar stdin o al cancelar ctx."
tags: [mcp, stdio, json-rpc, claude, tools, server, protocol]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports:
- bufio
- context
- encoding/json
- fmt
- io
- os
- sync
tested: true
tests:
- "initialize retorna serverInfo con Name y Version correctos"
- "tools/list retorna las tools registradas con su schema"
- "tools/call con tool valida invoca handler y retorna content con isError false"
- "tools/call cuando handler retorna error genera respuesta error -32603"
- "tools/call cuando handler retorna isError=true usa result.isError=true no error JSON-RPC"
- "metodo desconocido retorna error -32601"
- "notification sin id no produce respuesta en el buffer"
- "json invalido retorna error -32700 con id null"
- "ctx cancel detiene ServeMCP y retorna nil sin error"
test_file_path: "functions/infra/mcp_server_stdio_test.go"
file_path: "functions/infra/mcp_server_stdio.go"
params:
- name: ctx
desc: "Contexto de cancelacion. Cuando se cancela, el bucle de lectura termina limpiamente y la funcion retorna nil."
- name: opts
desc: "Configuracion del servidor: nombre y version del servidor, lista de tools (MCPToolDef con nombre, descripcion y JSON Schema del input), handler unico que despacha todas las tools (recibe name + arguments JSON crudo, retorna result + isError + err), reader de entrada (default os.Stdin), writer de salida (default os.Stdout), y writer opcional de log (default descartado)."
output: "nil cuando stdin se cierra o ctx se cancela. Error si ocurre un fallo irrecuperable de escritura en Out."
---
## Ejemplo
```go
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"fn-registry/functions/infra"
)
func main() {
tools := []infra.MCPToolDef{
{
Name: "echo",
Description: "Echoes the input message back",
InputSchema: json.RawMessage(`{
"type": "object",
"properties": {
"msg": {"type": "string", "description": "message to echo"}
},
"required": ["msg"]
}`),
},
}
handler := func(ctx context.Context, name string, input json.RawMessage) (any, bool, error) {
switch name {
case "echo":
var args struct{ Msg string `json:"msg"` }
if err := json.Unmarshal(input, &args); err != nil {
return nil, false, err
}
return map[string]string{"result": args.Msg}, false, nil
default:
return nil, true, fmt.Errorf("unknown tool: %s", name)
}
}
err := infra.ServeMCP(context.Background(), infra.MCPServerOpts{
Name: "my-app-mcp",
Version: "1.0.0",
Tools: tools,
Handler: handler,
Logger: os.Stderr,
})
if err != nil {
fmt.Fprintln(os.Stderr, "mcp error:", err)
os.Exit(1)
}
}
```
Para usar como MCP server en Claude Desktop / `claude -p`, registrar el binario en `.mcp.json`:
```json
{
"mcpServers": {
"my-app": {
"command": "/path/to/my-app-binary",
"args": ["--mcp"]
}
}
}
```
El binario detecta `--mcp` y llama `ServeMCP` en lugar del modo interactivo normal.
## Notas
**Protocolo soportado:** MCP 2024-11-05, subset minimo suficiente para exponer tools a `claude -p` y Claude Desktop.
**Metodos implementados:**
- `initialize` — handshake inicial; responde con protocolVersion, capabilities y serverInfo.
- `initialized` — notification enviada por el cliente tras el handshake; se ignora sin respuesta.
- `tools/list` — devuelve la lista de tools registradas.
- `tools/call` — invoca el Handler. Si handler.err != nil → JSON-RPC error -32603. Si isError=true → result.isError=true (error logico de la tool, no error de protocolo).
- `ping` — responde con `{}`.
- Cualquier otro metodo → JSON-RPC error -32601 (method not found).
- Notifications (mensajes sin campo `id`) → nunca se responden, ni siquiera con error.
**Buffer del scanner:** 4 MB para admitir schemas JSON grandes o resultados voluminosos.
**Concurrencia:** el bucle es secuencial hoy; los writes estan protegidos por mutex para que sea seguro si en el futuro se paraleliza el dispatch del handler.
**Distincion notification vs request:** la presencia del campo `id` en el JSON crudo (incluso si es null) indica request. La ausencia indica notification. Esto sigue la spec JSON-RPC 2.0.
+307
View File
@@ -0,0 +1,307 @@
package infra
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
)
// helper: build an MCPServerOpts wired to in/out buffers with a single echo tool.
func newTestServer(in *strings.Reader, out *bytes.Buffer) MCPServerOpts {
echoSchema := json.RawMessage(`{"type":"object","properties":{"msg":{"type":"string"}}}`)
return MCPServerOpts{
Name: "test-server",
Version: "0.1.0",
Tools: []MCPToolDef{
{Name: "echo", Description: "echoes input", InputSchema: echoSchema},
},
Handler: func(ctx context.Context, name string, input json.RawMessage) (any, bool, error) {
if name == "echo" {
var args map[string]string
_ = json.Unmarshal(input, &args)
return map[string]string{"echoed": args["msg"]}, false, nil
}
return nil, true, fmt.Errorf("unknown tool: %s", name)
},
In: strings.NewReader(""),
Out: out,
}
}
// runServer launches ServeMCP with the given lines as stdin, returns the output lines.
func runServer(t *testing.T, opts MCPServerOpts, lines []string) []map[string]any {
t.Helper()
payload := strings.Join(lines, "\n") + "\n"
opts.In = strings.NewReader(payload)
var buf bytes.Buffer
opts.Out = &buf
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
err := ServeMCP(ctx, opts)
if err != nil {
t.Fatalf("ServeMCP returned error: %v", err)
}
var results []map[string]any
for _, line := range strings.Split(strings.TrimSpace(buf.String()), "\n") {
if line == "" {
continue
}
var m map[string]any
if err2 := json.Unmarshal([]byte(line), &m); err2 != nil {
t.Fatalf("output not valid JSON: %q — %v", line, err2)
}
results = append(results, m)
}
return results
}
func TestServeMCP_initialize(t *testing.T) {
t.Run("initialize retorna serverInfo con Name y Version correctos", func(t *testing.T) {
req := `{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}`
opts := MCPServerOpts{
Name: "my-server", Version: "1.2.3",
Tools: []MCPToolDef{},
Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil },
}
results := runServer(t, opts, []string{req})
if len(results) != 1 {
t.Fatalf("expected 1 response, got %d", len(results))
}
r := results[0]
if r["id"].(float64) != 1 {
t.Errorf("wrong id: %v", r["id"])
}
result := r["result"].(map[string]any)
info := result["serverInfo"].(map[string]any)
if info["name"] != "my-server" {
t.Errorf("wrong name: %v", info["name"])
}
if info["version"] != "1.2.3" {
t.Errorf("wrong version: %v", info["version"])
}
if result["protocolVersion"] != "2024-11-05" {
t.Errorf("wrong protocolVersion: %v", result["protocolVersion"])
}
})
}
func TestServeMCP_toolsList(t *testing.T) {
t.Run("tools/list retorna las tools registradas con su schema", func(t *testing.T) {
schema := json.RawMessage(`{"type":"object","properties":{"x":{"type":"number"}}}`)
opts := MCPServerOpts{
Name: "srv", Version: "0.1",
Tools: []MCPToolDef{
{Name: "add", Description: "adds numbers", InputSchema: schema},
},
Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil },
}
req := `{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}`
results := runServer(t, opts, []string{req})
if len(results) != 1 {
t.Fatalf("expected 1 response, got %d", len(results))
}
result := results[0]["result"].(map[string]any)
tools := result["tools"].([]any)
if len(tools) != 1 {
t.Fatalf("expected 1 tool, got %d", len(tools))
}
tool := tools[0].(map[string]any)
if tool["name"] != "add" {
t.Errorf("wrong tool name: %v", tool["name"])
}
})
}
func TestServeMCP_toolsCall_success(t *testing.T) {
t.Run("tools/call con tool valida invoca handler y retorna content con isError false", func(t *testing.T) {
opts := MCPServerOpts{
Name: "srv", Version: "0.1",
Tools: []MCPToolDef{{Name: "echo", Description: "echo", InputSchema: json.RawMessage(`{}`)}},
Handler: func(_ context.Context, name string, input json.RawMessage) (any, bool, error) {
var args map[string]string
_ = json.Unmarshal(input, &args)
return map[string]string{"echoed": args["msg"]}, false, nil
},
}
req := `{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"echo","arguments":{"msg":"hello"}}}`
results := runServer(t, opts, []string{req})
if len(results) != 1 {
t.Fatalf("expected 1 response, got %d", len(results))
}
result := results[0]["result"].(map[string]any)
if result["isError"].(bool) {
t.Error("expected isError=false")
}
content := result["content"].([]any)
if len(content) == 0 {
t.Fatal("expected at least 1 content block")
}
block := content[0].(map[string]any)
if block["type"] != "text" {
t.Errorf("expected type=text, got %v", block["type"])
}
text := block["text"].(string)
if !strings.Contains(text, "hello") {
t.Errorf("expected echoed hello in text, got: %s", text)
}
})
}
func TestServeMCP_toolsCall_handlerError(t *testing.T) {
t.Run("tools/call cuando handler retorna error genera respuesta error -32603", func(t *testing.T) {
opts := MCPServerOpts{
Name: "srv", Version: "0.1",
Tools: []MCPToolDef{{Name: "fail", Description: "always fails", InputSchema: json.RawMessage(`{}`)}},
Handler: func(_ context.Context, name string, _ json.RawMessage) (any, bool, error) {
return nil, false, fmt.Errorf("internal failure")
},
}
req := `{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"fail","arguments":{}}}`
results := runServer(t, opts, []string{req})
if len(results) != 1 {
t.Fatalf("expected 1 response, got %d", len(results))
}
errField, ok := results[0]["error"].(map[string]any)
if !ok {
t.Fatalf("expected error field, got result: %v", results[0])
}
if errField["code"].(float64) != -32603 {
t.Errorf("expected code -32603, got %v", errField["code"])
}
})
}
func TestServeMCP_toolsCall_isError(t *testing.T) {
t.Run("tools/call cuando handler retorna isError=true usa result.isError=true no error JSON-RPC", func(t *testing.T) {
opts := MCPServerOpts{
Name: "srv", Version: "0.1",
Tools: []MCPToolDef{{Name: "badtool", Description: "logical error", InputSchema: json.RawMessage(`{}`)}},
Handler: func(_ context.Context, name string, _ json.RawMessage) (any, bool, error) {
return map[string]string{"reason": "not found"}, true, nil
},
}
req := `{"jsonrpc":"2.0","id":5,"method":"tools/call","params":{"name":"badtool","arguments":{}}}`
results := runServer(t, opts, []string{req})
if len(results) != 1 {
t.Fatalf("expected 1 response, got %d", len(results))
}
if _, hasErr := results[0]["error"]; hasErr {
t.Fatal("expected no JSON-RPC error field when isError=true")
}
result := results[0]["result"].(map[string]any)
if !result["isError"].(bool) {
t.Error("expected result.isError=true")
}
})
}
func TestServeMCP_unknownMethod(t *testing.T) {
t.Run("metodo desconocido retorna error -32601", func(t *testing.T) {
opts := MCPServerOpts{
Name: "srv", Version: "0.1",
Tools: []MCPToolDef{},
Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil },
}
req := `{"jsonrpc":"2.0","id":6,"method":"nope/nope","params":{}}`
results := runServer(t, opts, []string{req})
if len(results) != 1 {
t.Fatalf("expected 1 response, got %d", len(results))
}
errField, ok := results[0]["error"].(map[string]any)
if !ok {
t.Fatal("expected error field")
}
if errField["code"].(float64) != -32601 {
t.Errorf("expected code -32601, got %v", errField["code"])
}
})
}
func TestServeMCP_notification(t *testing.T) {
t.Run("notification sin id no produce respuesta en el buffer", func(t *testing.T) {
opts := MCPServerOpts{
Name: "srv", Version: "0.1",
Tools: []MCPToolDef{},
Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil },
}
// "initialized" is a notification (no id field)
notif := `{"jsonrpc":"2.0","method":"initialized"}`
// followed by a regular request so we know the server processed both
req := `{"jsonrpc":"2.0","id":7,"method":"ping","params":{}}`
results := runServer(t, opts, []string{notif, req})
// only ping should produce a response
if len(results) != 1 {
t.Fatalf("expected 1 response (only for ping), got %d", len(results))
}
if results[0]["id"].(float64) != 7 {
t.Errorf("expected id=7 from ping, got %v", results[0]["id"])
}
})
}
func TestServeMCP_invalidJSON(t *testing.T) {
t.Run("json invalido retorna error -32700 con id null", func(t *testing.T) {
opts := MCPServerOpts{
Name: "srv", Version: "0.1",
Tools: []MCPToolDef{},
Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil },
}
req := `not json at all`
results := runServer(t, opts, []string{req})
if len(results) != 1 {
t.Fatalf("expected 1 response, got %d", len(results))
}
errField, ok := results[0]["error"].(map[string]any)
if !ok {
t.Fatal("expected error field")
}
if errField["code"].(float64) != -32700 {
t.Errorf("expected code -32700, got %v", errField["code"])
}
// id must be null (absent or nil)
if id, hasID := results[0]["id"]; hasID && id != nil {
t.Errorf("expected null id, got %v", id)
}
})
}
func TestServeMCP_ctxCancel(t *testing.T) {
t.Run("ctx cancel detiene ServeMCP y retorna nil sin error", func(t *testing.T) {
// Use a pipe so stdin stays open forever
pr, pw := strings.NewReader(""), new(bytes.Buffer)
_ = pw
ctx, cancel := context.WithCancel(context.Background())
opts := MCPServerOpts{
Name: "srv", Version: "0.1",
Tools: []MCPToolDef{},
Handler: func(_ context.Context, _ string, _ json.RawMessage) (any, bool, error) { return nil, false, nil },
In: pr,
Out: new(bytes.Buffer),
}
done := make(chan error, 1)
go func() {
done <- ServeMCP(ctx, opts)
}()
// Cancel immediately — stdin is already at EOF so it will also stop cleanly.
cancel()
select {
case err := <-done:
if err != nil {
t.Errorf("expected nil error on ctx cancel, got: %v", err)
}
case <-time.After(2 * time.Second):
t.Error("ServeMCP did not stop after ctx cancel")
}
})
}