Files
web_scraping/demo_e2e/mcp_client.py
T
egutierrez 23f9aa90e8 chore: auto-commit (3 archivos)
- .mcp.json
- CAPABILITIES_TODO.md
- demo_e2e/

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-06 12:49:54 +02:00

109 lines
3.5 KiB
Python

"""Cliente JSON-RPC stdio mínimo para hablar con un servidor MCP (mark3labs/mcp-go).
Secuencial por diseño: cada call() manda un request y bloquea hasta recibir la
respuesta con el mismo id, ignorando notificaciones intermedias. Esto replica
cómo un cliente MCP real (Claude) usa el servidor — a diferencia de mandar todos
los mensajes de golpe, que provoca una race porque el servidor procesa los
requests en goroutines concurrentes.
Un hilo lector vuelca stdout a una cola; _read_until consume de la cola con
timeout para no colgarse si una tool no responde.
"""
import json
import os
import queue
import subprocess
import threading
import time
class MCPClient:
    """Minimal, strictly sequential JSON-RPC client for an MCP server over stdio.

    The server is started as a child process. A daemon thread copies every
    non-blank stdout line into a queue; each request then blocks until the
    response carrying the same id is taken from that queue, discarding any
    other message (notifications, unparseable lines) read in between.

    The client can be used as a context manager, which calls close() on exit.
    """

    # Marker the reader thread puts on the queue once the server's stdout ends.
    _EOF = object()

    def __init__(self, exe, env=None, cwd=None, stderr_path=None):
        """Start the server process and the stdout reader thread.

        Args:
            exe: Path of the server executable; it is run without arguments.
            env: Optional mapping of variables layered over os.environ.
            cwd: Optional working directory for the child process.
            stderr_path: Optional file that receives the server's stderr; when
                omitted, stderr is discarded.

        Raises:
            OSError: If the log file cannot be opened or the process cannot
                be started.
        """
        full_env = dict(os.environ)
        if env:
            full_env.update(env)
        self._stderr = open(stderr_path, "w") if stderr_path else subprocess.DEVNULL
        try:
            self.p = subprocess.Popen(
                [exe],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=self._stderr,
                text=True,
                # MCP stdio messages are UTF-8; the locale default (e.g. cp1252
                # on Windows) would corrupt non-ASCII tool output.
                encoding="utf-8",
                # A stray invalid byte must not kill the reader thread.
                errors="replace",
                bufsize=1,
                env=full_env,
                cwd=cwd,
            )
        except BaseException:
            # Do not leak the log file when the process cannot be spawned.
            self._close_stderr()
            raise
        self._id = 0
        self._q = queue.Queue()
        self._reader = threading.Thread(target=self._read_loop, daemon=True)
        self._reader.start()

    def __enter__(self):
        """Return the client itself so it can be used in a with-statement."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Shut the server down when the with-block exits."""
        self.close()

    def _close_stderr(self):
        """Close the stderr log file, if one was opened."""
        # subprocess.DEVNULL is a plain int and has nothing to close.
        if hasattr(self._stderr, "close"):
            self._stderr.close()

    def _next_id(self):
        """Return a fresh request id (1, 2, 3, ...)."""
        self._id += 1
        return self._id

    def _read_loop(self):
        """Reader-thread body: queue each non-blank stdout line, then EOF."""
        try:
            for line in self.p.stdout:
                line = line.strip()
                if line:
                    self._q.put(line)
        finally:
            # Lets waiting callers fail immediately instead of sitting out
            # their whole timeout once the server is gone.
            self._q.put(self._EOF)

    def _send(self, obj):
        """Serialize obj as one JSON line and write it to the server's stdin."""
        self.p.stdin.write(json.dumps(obj) + "\n")
        self.p.stdin.flush()

    def _read_until(self, want_id, timeout):
        """Return the response whose id equals want_id.

        Lines that are not valid JSON, are not JSON objects, carry a different
        id, or are requests/notifications (they have a "method") are dropped.

        Raises:
            TimeoutError: If no matching response arrives within timeout
                seconds, or the server closed its stdout before answering.
        """
        # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                line = self._q.get(timeout=max(0.1, deadline - time.monotonic()))
            except queue.Empty:
                break
            if line is self._EOF:
                # Put the marker back so later calls fail fast as well.
                self._q.put(self._EOF)
                raise TimeoutError(
                    f"el servidor cerró stdout sin responder al id {want_id}"
                )
            try:
                m = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(m, dict):
                # Valid JSON that is not an object (e.g. a batch array).
                continue
            if "method" in m:
                # A request or notification from the server, never a response,
                # even if its id happens to equal ours.
                continue
            if m.get("id") == want_id:
                return m
        raise TimeoutError(f"sin respuesta para id {want_id} en {timeout}s")

    def initialize(self):
        """Perform the MCP handshake and return the raw initialize response.

        Sends the initialize request, waits up to 15 seconds for its response,
        then sends the notifications/initialized notification.

        Raises:
            TimeoutError: If the server does not answer the initialize request.
        """
        iid = self._next_id()
        self._send({
            "jsonrpc": "2.0", "id": iid, "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "demo_e2e", "version": "1"},
            },
        })
        r = self._read_until(iid, 15)
        self._send({"jsonrpc": "2.0", "method": "notifications/initialized"})
        return r

    def call(self, name, arguments, timeout=60):
        """Call a tool and return (text, is_error).

        Args:
            name: Tool name.
            arguments: JSON-serializable arguments for the tool.
            timeout: Seconds to wait for the response.

        Returns:
            A tuple (text, is_error). For a JSON-RPC error response, text is
            the JSON-encoded error object and is_error is True. Otherwise text
            is the concatenation of the "text" fields of the result's content
            items and is_error reflects the result's "isError" flag.

        Raises:
            TimeoutError: If no response arrives in time.
        """
        cid = self._next_id()
        self._send({
            "jsonrpc": "2.0", "id": cid, "method": "tools/call",
            "params": {"name": name, "arguments": arguments},
        })
        r = self._read_until(cid, timeout)
        if "error" in r:
            return json.dumps(r["error"]), True
        # Tolerate "result": null / "content": null instead of crashing.
        res = r.get("result")
        if not isinstance(res, dict):
            res = {}
        content = res.get("content")
        if not isinstance(content, list):
            content = []
        text = "".join(
            c["text"]
            for c in content
            if isinstance(c, dict) and isinstance(c.get("text"), str)
        )
        return text, bool(res.get("isError", False))

    def close(self):
        """Stop the server: close its stdin, wait up to 5 s, kill if needed.

        The stderr log file, if any, is closed in every case.
        """
        try:
            self.p.stdin.close()
        except (OSError, ValueError):
            # The pipe is already broken or closed; nothing left to flush.
            pass
        try:
            self.p.wait(timeout=5)
        except subprocess.TimeoutExpired:
            self.p.kill()
            # Reap the killed child so it does not linger as a zombie.
            self.p.wait()
        finally:
            self._close_stderr()