23f9aa90e8
- .mcp.json - CAPABILITIES_TODO.md - demo_e2e/ Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
109 lines
3.5 KiB
Python
109 lines
3.5 KiB
Python
"""Cliente JSON-RPC stdio mínimo para hablar con un servidor MCP (mark3labs/mcp-go).
|
|
|
|
Secuencial por diseño: cada call() manda un request y bloquea hasta recibir la
|
|
respuesta con el mismo id, ignorando notificaciones intermedias. Esto replica
|
|
cómo un cliente MCP real (Claude) usa el servidor — a diferencia de mandar todos
|
|
los mensajes de golpe, que provoca una race porque el servidor procesa los
|
|
requests en goroutines concurrentes.
|
|
|
|
Un hilo lector vuelca stdout a una cola; _read_until consume de la cola con
|
|
timeout para no colgarse si una tool no responde.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import queue
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
|
|
|
|
class MCPClient:
|
|
def __init__(self, exe, env=None, cwd=None, stderr_path=None):
|
|
full_env = dict(os.environ)
|
|
if env:
|
|
full_env.update(env)
|
|
self._stderr = open(stderr_path, "w") if stderr_path else subprocess.DEVNULL
|
|
self.p = subprocess.Popen(
|
|
[exe],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=self._stderr,
|
|
text=True,
|
|
bufsize=1,
|
|
env=full_env,
|
|
cwd=cwd,
|
|
)
|
|
self._id = 0
|
|
self._q = queue.Queue()
|
|
self._reader = threading.Thread(target=self._read_loop, daemon=True)
|
|
self._reader.start()
|
|
|
|
def _read_loop(self):
|
|
for line in self.p.stdout:
|
|
line = line.strip()
|
|
if line:
|
|
self._q.put(line)
|
|
|
|
def _send(self, obj):
|
|
self.p.stdin.write(json.dumps(obj) + "\n")
|
|
self.p.stdin.flush()
|
|
|
|
def _read_until(self, want_id, timeout):
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
try:
|
|
line = self._q.get(timeout=max(0.1, deadline - time.time()))
|
|
except queue.Empty:
|
|
break
|
|
try:
|
|
m = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
if m.get("id") == want_id:
|
|
return m
|
|
raise TimeoutError(f"sin respuesta para id {want_id} en {timeout}s")
|
|
|
|
def initialize(self):
|
|
self._id += 1
|
|
iid = self._id
|
|
self._send({
|
|
"jsonrpc": "2.0", "id": iid, "method": "initialize",
|
|
"params": {
|
|
"protocolVersion": "2024-11-05",
|
|
"capabilities": {},
|
|
"clientInfo": {"name": "demo_e2e", "version": "1"},
|
|
},
|
|
})
|
|
r = self._read_until(iid, 15)
|
|
self._send({"jsonrpc": "2.0", "method": "notifications/initialized"})
|
|
return r
|
|
|
|
def call(self, name, arguments, timeout=60):
|
|
"""Llama una tool. Devuelve (texto, is_error)."""
|
|
self._id += 1
|
|
cid = self._id
|
|
self._send({
|
|
"jsonrpc": "2.0", "id": cid, "method": "tools/call",
|
|
"params": {"name": name, "arguments": arguments},
|
|
})
|
|
r = self._read_until(cid, timeout)
|
|
if "error" in r:
|
|
return json.dumps(r["error"]), True
|
|
res = r.get("result", {})
|
|
content = res.get("content", [])
|
|
text = "".join(c.get("text", "") for c in content if isinstance(c, dict))
|
|
return text, bool(res.get("isError", False))
|
|
|
|
def close(self):
|
|
try:
|
|
self.p.stdin.close()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.p.wait(timeout=5)
|
|
except Exception:
|
|
self.p.kill()
|
|
if self._stderr not in (subprocess.DEVNULL, None):
|
|
self._stderr.close()
|