Merge issue/mcp-notes-bugfix
This commit is contained in:
@@ -726,20 +726,24 @@ MCP_TOOLS = [
|
||||
"limit": {"type": "integer", "default": 50, "minimum": 1, "maximum": 200}},
|
||||
"required": ["query"]}},
|
||||
{"name": "node_create",
|
||||
"description": "Crea una entidad nueva. Si type se omite, se infiere heuristicamente del name (email/url/domain/ip/phone/text).",
|
||||
"description": "Crea una entidad nueva. Si type se omite, se infiere heuristicamente del name (email/url/domain/ip/phone/text). Pasa `notes` para inicializar el panel Note del nodo (es lo que leen los enrichers split_sentences y extract_iocs_text como fuente de texto).",
|
||||
"inputSchema": {"type": "object", "properties": {
|
||||
"name": {"type": "string"},
|
||||
"type": {"type": "string", "description": "Opcional. Auto-detectado si se omite."},
|
||||
"description": {"type": "string"}},
|
||||
"description": {"type": "string"},
|
||||
"notes": {"type": "string", "description": "Texto largo libre del nodo (panel Note)."}},
|
||||
"required": ["name"]}},
|
||||
{"name": "node_update",
|
||||
"description": "Modifica campos de una entidad existente. Al menos un campo aparte de id debe pasarse.",
|
||||
"description": "Modifica campos de una entidad existente. Al menos un campo aparte de id debe pasarse. `notes` reemplaza el contenido completo del panel Note; combinalo con `append_notes=true` para anyadir al final preservando lo existente.",
|
||||
"inputSchema": {"type": "object", "properties": {
|
||||
"id": {"type": "string"},
|
||||
"name": {"type": "string"},
|
||||
"type": {"type": "string"},
|
||||
"status": {"type": "string", "enum": ["active", "stale", "corrupted", "archived"]},
|
||||
"description": {"type": "string"},
|
||||
"notes": {"type": "string", "description": "Texto del panel Note. Reemplaza el contenido salvo que append_notes=true."},
|
||||
"append_notes": {"type": "boolean", "default": False,
|
||||
"description": "Si true, anyade `notes` al final con doble newline en vez de reemplazar."},
|
||||
"tags": {"type": "string", "description": "JSON array literal o CSV 'a,b,c'"}},
|
||||
"required": ["id"]}},
|
||||
{"name": "node_delete",
|
||||
@@ -830,9 +834,11 @@ MCP_DISPATCH = {
|
||||
"node_list": (cmd_node_list, {"type": None, "status": None, "limit": 100}),
|
||||
"node_show": (cmd_node_show, {}),
|
||||
"node_search": (cmd_node_search, {"limit": 50}),
|
||||
"node_create": (cmd_node_create, {"type": None, "description": None}),
|
||||
"node_create": (cmd_node_create, {"type": None, "description": None,
|
||||
"notes": None}),
|
||||
"node_update": (cmd_node_update, {"name": None, "type": None,
|
||||
"status": None, "description": None,
|
||||
"notes": None, "append_notes": False,
|
||||
"tags": None}),
|
||||
"node_delete": (cmd_node_delete, {}),
|
||||
"rel_create": (cmd_rel_create, {"name": None}),
|
||||
|
||||
@@ -0,0 +1,465 @@
|
||||
"""Tests del CLI `gx-cli` y del dispatcher MCP empaquetado dentro.
|
||||
|
||||
Cubre las herramientas que el agente Echo usa para manejar el grafo.
|
||||
Cada test prepara una operations.db + graph_explorer.db efimeras y
|
||||
ejecuta gx-cli como subproceso pasando GX_OPS_DB / GX_APP_DB / GX_APP_DIR
|
||||
en el entorno (mismo wire que la app real).
|
||||
|
||||
Tests del MCP dispatcher: importan gx-cli como modulo y llaman
|
||||
`_mcp_dispatch(tool_name, args)` directamente — verifica que defaults
|
||||
del MCP_DISPATCH cubren todos los kwargs que las funciones cmd_*
|
||||
esperan (este es el bug original: `args.notes` faltaba).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import subprocess
|
||||
import sys
|
||||
from importlib.machinery import SourceFileLoader
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
GX = Path(__file__).resolve().parents[1] / "gx-cli"
|
||||
assert GX.exists(), f"gx-cli no existe en {GX}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Schema minimo replicado para los tests. Espejo del schema real al que
|
||||
# apunta gx-cli; si cambia en operations.db, actualizar aqui tambien.
|
||||
# ---------------------------------------------------------------------------
|
||||
OPS_SCHEMA = """
|
||||
CREATE TABLE entities (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
type_ref TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'active',
|
||||
description TEXT NOT NULL DEFAULT '',
|
||||
domain TEXT NOT NULL DEFAULT '',
|
||||
tags TEXT NOT NULL DEFAULT '[]',
|
||||
source TEXT NOT NULL,
|
||||
metadata TEXT NOT NULL DEFAULT '{}',
|
||||
notes TEXT NOT NULL DEFAULT '',
|
||||
group_id TEXT,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
CREATE TABLE relations (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
from_entity TEXT NOT NULL DEFAULT '',
|
||||
to_entity TEXT NOT NULL,
|
||||
via TEXT NOT NULL DEFAULT '',
|
||||
description TEXT NOT NULL DEFAULT '',
|
||||
purity TEXT NOT NULL DEFAULT '',
|
||||
direction TEXT NOT NULL DEFAULT 'unidirectional',
|
||||
weight REAL,
|
||||
status TEXT NOT NULL DEFAULT 'designed',
|
||||
started_at TEXT,
|
||||
ended_at TEXT,
|
||||
"order" INTEGER,
|
||||
tags TEXT NOT NULL DEFAULT '[]',
|
||||
notes TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
CREATE VIRTUAL TABLE entities_fts USING fts5(
|
||||
id UNINDEXED, name, description, tags,
|
||||
content=entities, content_rowid=rowid
|
||||
);
|
||||
CREATE TRIGGER entities_ai AFTER INSERT ON entities BEGIN
|
||||
INSERT INTO entities_fts(rowid, id, name, description, tags)
|
||||
VALUES (new.rowid, new.id, new.name, new.description, new.tags);
|
||||
END;
|
||||
CREATE TRIGGER entities_ad AFTER DELETE ON entities BEGIN
|
||||
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags)
|
||||
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags);
|
||||
END;
|
||||
CREATE TRIGGER entities_au AFTER UPDATE ON entities BEGIN
|
||||
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags)
|
||||
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags);
|
||||
INSERT INTO entities_fts(rowid, id, name, description, tags)
|
||||
VALUES (new.rowid, new.id, new.name, new.description, new.tags);
|
||||
END;
|
||||
"""
|
||||
|
||||
APP_SCHEMA = """
|
||||
CREATE TABLE jobs (
|
||||
id TEXT PRIMARY KEY,
|
||||
enricher_id TEXT NOT NULL,
|
||||
node_id TEXT,
|
||||
node_name TEXT NOT NULL DEFAULT '',
|
||||
params_json TEXT NOT NULL DEFAULT '{}',
|
||||
status TEXT NOT NULL,
|
||||
progress REAL NOT NULL DEFAULT 0,
|
||||
stage TEXT NOT NULL DEFAULT '',
|
||||
result_json TEXT,
|
||||
error TEXT,
|
||||
pid INTEGER,
|
||||
created_at INTEGER NOT NULL,
|
||||
started_at INTEGER,
|
||||
finished_at INTEGER
|
||||
);
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def env_dirs(tmp_path):
|
||||
"""Crea operations.db + graph_explorer.db con schemas y devuelve env."""
|
||||
ops = tmp_path / "operations.db"
|
||||
cn = sqlite3.connect(ops)
|
||||
cn.executescript(OPS_SCHEMA)
|
||||
cn.commit()
|
||||
cn.close()
|
||||
|
||||
appdb = tmp_path / "graph_explorer.db"
|
||||
cn = sqlite3.connect(appdb)
|
||||
cn.executescript(APP_SCHEMA)
|
||||
cn.commit()
|
||||
cn.close()
|
||||
|
||||
enrichers_dir = tmp_path / "enrichers"
|
||||
enrichers_dir.mkdir()
|
||||
|
||||
env = os.environ.copy()
|
||||
env["GX_OPS_DB"] = str(ops)
|
||||
env["GX_APP_DB"] = str(appdb)
|
||||
env["GX_APP_DIR"] = str(tmp_path)
|
||||
return {"ops": ops, "app": appdb, "dir": tmp_path, "env": env}
|
||||
|
||||
|
||||
def run_gx(env_dirs, *args, expect_ok: bool = True) -> dict:
|
||||
proc = subprocess.run(
|
||||
[sys.executable, str(GX), *args],
|
||||
capture_output=True, text=True, timeout=15, env=env_dirs["env"],
|
||||
)
|
||||
if proc.stdout.strip():
|
||||
try:
|
||||
payload = json.loads(proc.stdout.strip().splitlines()[-1])
|
||||
except json.JSONDecodeError:
|
||||
payload = {"_raw": proc.stdout, "_stderr": proc.stderr}
|
||||
else:
|
||||
payload = {"_raw": "", "_stderr": proc.stderr,
|
||||
"_returncode": proc.returncode}
|
||||
if expect_ok:
|
||||
assert proc.returncode == 0, \
|
||||
f"exit {proc.returncode}\nstdout: {proc.stdout}\nstderr: {proc.stderr}"
|
||||
assert payload.get("ok"), payload
|
||||
return payload
|
||||
|
||||
|
||||
def fetchone(ops_db, sql, *params):
|
||||
cn = sqlite3.connect(ops_db)
|
||||
try:
|
||||
cur = cn.execute(sql, params)
|
||||
return cur.fetchone()
|
||||
finally:
|
||||
cn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI: node create / update con notes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCliNodeCreate:
|
||||
def test_create_basic(self, env_dirs):
|
||||
out = run_gx(env_dirs,
|
||||
"node", "create", "--name", "doc1", "--type", "text")
|
||||
assert out["name"] == "doc1"
|
||||
assert out["type_ref"] == "text"
|
||||
assert out["id"].startswith("text_")
|
||||
|
||||
def test_create_with_notes(self, env_dirs):
|
||||
out = run_gx(env_dirs, "node", "create", "--name", "doc2",
|
||||
"--type", "text", "--notes", "Texto inicial largo.")
|
||||
row = fetchone(env_dirs["ops"],
|
||||
"SELECT notes FROM entities WHERE id=?", out["id"])
|
||||
assert row[0] == "Texto inicial largo."
|
||||
|
||||
def test_create_without_notes_defaults_empty(self, env_dirs):
|
||||
out = run_gx(env_dirs, "node", "create", "--name", "doc3",
|
||||
"--type", "text")
|
||||
row = fetchone(env_dirs["ops"],
|
||||
"SELECT notes FROM entities WHERE id=?", out["id"])
|
||||
assert row[0] == ""
|
||||
|
||||
def test_create_autodetect_type_from_email(self, env_dirs):
|
||||
out = run_gx(env_dirs, "node", "create", "--name", "foo@bar.com")
|
||||
# detector marca emails con @ -> type=email
|
||||
assert out["type_ref"] == "email"
|
||||
|
||||
|
||||
class TestCliNodeUpdate:
|
||||
def setup_method(self):
|
||||
self.id_ = None
|
||||
|
||||
def _make(self, env_dirs, **kwargs):
|
||||
out = run_gx(env_dirs, "node", "create",
|
||||
"--name", kwargs.get("name", "x"),
|
||||
"--type", kwargs.get("type", "text"))
|
||||
return out["id"]
|
||||
|
||||
def test_update_notes_replace(self, env_dirs):
|
||||
nid = self._make(env_dirs)
|
||||
run_gx(env_dirs, "node", "update", nid, "--notes", "primero")
|
||||
run_gx(env_dirs, "node", "update", nid, "--notes", "reemplazo")
|
||||
row = fetchone(env_dirs["ops"],
|
||||
"SELECT notes FROM entities WHERE id=?", nid)
|
||||
assert row[0] == "reemplazo"
|
||||
|
||||
def test_update_notes_append(self, env_dirs):
|
||||
nid = self._make(env_dirs)
|
||||
run_gx(env_dirs, "node", "update", nid, "--notes", "primero")
|
||||
run_gx(env_dirs, "node", "update", nid, "--notes", "segundo",
|
||||
"--append-notes")
|
||||
row = fetchone(env_dirs["ops"],
|
||||
"SELECT notes FROM entities WHERE id=?", nid)
|
||||
# double newline separator
|
||||
assert row[0] == "primero\n\nsegundo"
|
||||
|
||||
def test_update_append_on_empty_creates_with_separator(self, env_dirs):
|
||||
"""Append sobre notes vacio: separator se anyade igualmente — comportamiento
|
||||
estable; el agente puede limpiarlo despues si le molesta el doble newline
|
||||
inicial."""
|
||||
nid = self._make(env_dirs)
|
||||
run_gx(env_dirs, "node", "update", nid, "--notes", "primero",
|
||||
"--append-notes")
|
||||
row = fetchone(env_dirs["ops"],
|
||||
"SELECT notes FROM entities WHERE id=?", nid)
|
||||
# Como notes empieza vacio "" + "\n\nprimero" = "\n\nprimero".
|
||||
assert row[0] == "\n\nprimero"
|
||||
|
||||
def test_update_other_fields_dont_clobber_notes(self, env_dirs):
|
||||
nid = self._make(env_dirs)
|
||||
run_gx(env_dirs, "node", "update", nid, "--notes", "preservame")
|
||||
run_gx(env_dirs, "node", "update", nid, "--description", "desc nueva")
|
||||
row = fetchone(env_dirs["ops"],
|
||||
"SELECT notes, description FROM entities WHERE id=?",
|
||||
nid)
|
||||
assert row[0] == "preservame"
|
||||
assert row[1] == "desc nueva"
|
||||
|
||||
def test_update_no_fields_errors(self, env_dirs):
|
||||
nid = self._make(env_dirs)
|
||||
out = run_gx(env_dirs, "node", "update", nid, expect_ok=False)
|
||||
assert out.get("ok") is False
|
||||
assert "no fields to update" in out.get("error", "").lower()
|
||||
|
||||
def test_update_unknown_id_errors(self, env_dirs):
|
||||
out = run_gx(env_dirs, "node", "update", "id_que_no_existe",
|
||||
"--name", "x", expect_ok=False)
|
||||
assert out.get("ok") is False
|
||||
|
||||
|
||||
class TestCliNodeListShow:
|
||||
def test_node_show_returns_notes(self, env_dirs):
|
||||
out = run_gx(env_dirs, "node", "create", "--name", "shown",
|
||||
"--type", "text", "--notes", "contenido visible")
|
||||
nid = out["id"]
|
||||
shown = run_gx(env_dirs, "node", "show", nid)
|
||||
assert shown["entity"]["notes"] == "contenido visible"
|
||||
|
||||
def test_node_list_filters_by_type(self, env_dirs):
|
||||
run_gx(env_dirs, "node", "create", "--name", "p1", "--type", "person")
|
||||
run_gx(env_dirs, "node", "create", "--name", "p2", "--type", "person")
|
||||
run_gx(env_dirs, "node", "create", "--name", "t1", "--type", "text")
|
||||
listed = run_gx(env_dirs, "node", "list", "--type", "person")
|
||||
assert listed["count"] == 2
|
||||
|
||||
def test_node_delete(self, env_dirs):
|
||||
out = run_gx(env_dirs, "node", "create", "--name", "tmp",
|
||||
"--type", "text")
|
||||
nid = out["id"]
|
||||
run_gx(env_dirs, "node", "delete", nid)
|
||||
row = fetchone(env_dirs["ops"],
|
||||
"SELECT id FROM entities WHERE id=?", nid)
|
||||
assert row is None
|
||||
|
||||
|
||||
class TestCliRelations:
|
||||
def test_rel_create_and_list(self, env_dirs):
|
||||
a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
|
||||
b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
|
||||
rel = run_gx(env_dirs, "rel", "create", "--from", a["id"],
|
||||
"--to", b["id"], "--name", "KNOWS")
|
||||
listed = run_gx(env_dirs, "rel", "list", "--from", a["id"])
|
||||
assert listed["count"] == 1
|
||||
assert listed["rows"][0]["name"] == "KNOWS"
|
||||
assert listed["rows"][0]["to_entity"] == b["id"]
|
||||
|
||||
def test_rel_delete(self, env_dirs):
|
||||
a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
|
||||
b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
|
||||
rel = run_gx(env_dirs, "rel", "create", "--from", a["id"],
|
||||
"--to", b["id"])
|
||||
run_gx(env_dirs, "rel", "delete", rel["id"])
|
||||
listed = run_gx(env_dirs, "rel", "list", "--from", a["id"])
|
||||
assert listed["count"] == 0
|
||||
|
||||
def test_node_delete_cascades_relations(self, env_dirs):
|
||||
a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
|
||||
b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
|
||||
run_gx(env_dirs, "rel", "create", "--from", a["id"], "--to", b["id"])
|
||||
run_gx(env_dirs, "node", "delete", a["id"])
|
||||
# Sin a, no debe quedar relacion que lo apunte como from.
|
||||
cn = sqlite3.connect(env_dirs["ops"])
|
||||
rows = cn.execute(
|
||||
"SELECT id FROM relations WHERE from_entity=? OR to_entity=?",
|
||||
(a["id"], a["id"])
|
||||
).fetchall()
|
||||
cn.close()
|
||||
assert rows == []
|
||||
|
||||
|
||||
class TestCliQuery:
|
||||
def test_query_select(self, env_dirs):
|
||||
run_gx(env_dirs, "node", "create", "--name", "q1", "--type", "text")
|
||||
run_gx(env_dirs, "node", "create", "--name", "q2", "--type", "text")
|
||||
out = run_gx(env_dirs, "query", "SELECT count(*) AS n FROM entities")
|
||||
# gx-cli stringifica los valores en query (cmd_query usa str(v));
|
||||
# esperar la version string es coherente con el contrato actual.
|
||||
assert out["rows"][0]["n"] == "2"
|
||||
|
||||
def test_query_rejects_writes(self, env_dirs):
|
||||
out = run_gx(env_dirs, "query",
|
||||
"DELETE FROM entities WHERE 1=1", expect_ok=False)
|
||||
assert out.get("ok") is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCP dispatcher: importa gx-cli como modulo, llama _mcp_dispatch
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def gx_module():
|
||||
"""Carga gx-cli como modulo Python para acceder a MCP_DISPATCH.
|
||||
|
||||
El script no tiene extension .py, asi que `spec_from_file_location`
|
||||
no detecta loader — hay que pasar `SourceFileLoader` explicito.
|
||||
"""
|
||||
loader = SourceFileLoader("gxcli", str(GX))
|
||||
spec = importlib.util.spec_from_loader("gxcli", loader)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mcp_env(env_dirs, monkeypatch):
|
||||
"""Activa el env de gx-cli en el proceso (gx-cli lee os.environ)."""
|
||||
monkeypatch.setenv("GX_OPS_DB", str(env_dirs["ops"]))
|
||||
monkeypatch.setenv("GX_APP_DB", str(env_dirs["app"]))
|
||||
monkeypatch.setenv("GX_APP_DIR", str(env_dirs["dir"]))
|
||||
return env_dirs
|
||||
|
||||
|
||||
class TestMcpDispatcher:
|
||||
"""Verifica que MCP_DISPATCH entrega a cada cmd_* todos los kwargs
|
||||
que necesita aunque el LLM omita campos opcionales — bug original:
|
||||
`args.notes` no existia porque MCP_DISPATCH no lo defaulteaba."""
|
||||
|
||||
def test_node_create_sin_notes_no_revienta(self, gx_module, mcp_env):
|
||||
out = gx_module._mcp_dispatch("node_create",
|
||||
{"name": "x", "type": "text"})
|
||||
assert out.get("ok") is True, out
|
||||
|
||||
def test_node_create_con_notes(self, gx_module, mcp_env):
|
||||
out = gx_module._mcp_dispatch("node_create", {
|
||||
"name": "y", "type": "text", "notes": "hola mundo"
|
||||
})
|
||||
assert out["ok"] is True
|
||||
row = fetchone(mcp_env["ops"],
|
||||
"SELECT notes FROM entities WHERE id=?", out["id"])
|
||||
assert row[0] == "hola mundo"
|
||||
|
||||
def test_node_update_sin_notes_no_revienta(self, gx_module, mcp_env):
|
||||
created = gx_module._mcp_dispatch("node_create",
|
||||
{"name": "z", "type": "text"})
|
||||
out = gx_module._mcp_dispatch("node_update",
|
||||
{"id": created["id"], "name": "zz"})
|
||||
assert out["ok"] is True
|
||||
|
||||
def test_node_update_replace_notes(self, gx_module, mcp_env):
|
||||
created = gx_module._mcp_dispatch("node_create",
|
||||
{"name": "u1", "type": "text"})
|
||||
gx_module._mcp_dispatch("node_update", {
|
||||
"id": created["id"], "notes": "primero",
|
||||
})
|
||||
row = fetchone(mcp_env["ops"],
|
||||
"SELECT notes FROM entities WHERE id=?", created["id"])
|
||||
assert row[0] == "primero"
|
||||
|
||||
def test_node_update_append_notes(self, gx_module, mcp_env):
|
||||
created = gx_module._mcp_dispatch("node_create",
|
||||
{"name": "u2", "type": "text"})
|
||||
gx_module._mcp_dispatch("node_update", {
|
||||
"id": created["id"], "notes": "primero",
|
||||
})
|
||||
gx_module._mcp_dispatch("node_update", {
|
||||
"id": created["id"], "notes": "segundo", "append_notes": True,
|
||||
})
|
||||
row = fetchone(mcp_env["ops"],
|
||||
"SELECT notes FROM entities WHERE id=?", created["id"])
|
||||
assert row[0] == "primero\n\nsegundo"
|
||||
|
||||
def test_dispatch_returns_error_for_unknown_tool(self, gx_module, mcp_env):
|
||||
out = gx_module._mcp_dispatch("not_a_tool", {})
|
||||
assert out["ok"] is False
|
||||
assert "unknown tool" in out["error"]
|
||||
|
||||
def test_all_dispatch_entries_have_callable(self, gx_module):
|
||||
"""Sanity: MCP_DISPATCH no tiene tools sin funcion. Si en el futuro
|
||||
alguien anyade una entrada y olvida cablear el cmd_*, este test
|
||||
falla."""
|
||||
for name, (fn, defaults) in gx_module.MCP_DISPATCH.items():
|
||||
assert callable(fn), f"{name} no tiene cmd_* callable"
|
||||
assert isinstance(defaults, dict), f"{name} defaults no es dict"
|
||||
|
||||
def test_mcp_tools_match_dispatch_keys(self, gx_module):
|
||||
"""El listado MCP_TOOLS (lo que el LLM ve) debe alinearse 1:1 con
|
||||
las claves de MCP_DISPATCH (lo que el server ejecuta). Si un tool
|
||||
anuncia capacidades pero el dispatch no las tiene, es un bug."""
|
||||
tool_names = {t["name"] for t in gx_module.MCP_TOOLS}
|
||||
dispatch_names = set(gx_module.MCP_DISPATCH.keys())
|
||||
missing_in_dispatch = tool_names - dispatch_names
|
||||
missing_in_tools = dispatch_names - tool_names
|
||||
assert not missing_in_dispatch, \
|
||||
f"tools sin dispatch: {missing_in_dispatch}"
|
||||
assert not missing_in_tools, \
|
||||
f"dispatch sin tool: {missing_in_tools}"
|
||||
|
||||
|
||||
class TestMcpRegression0035d:
|
||||
"""El bug que el agente Echo encontro: `args.notes` no existia en
|
||||
SimpleNamespace porque MCP_DISPATCH no lo defaulteaba. Estos tests
|
||||
fijan el contrato para que no vuelva a romperse silenciosamente."""
|
||||
|
||||
def test_node_create_dispatch_includes_notes_default(self, gx_module):
|
||||
_, defaults = gx_module.MCP_DISPATCH["node_create"]
|
||||
assert "notes" in defaults
|
||||
assert defaults["notes"] is None
|
||||
|
||||
def test_node_update_dispatch_includes_notes_and_append(self, gx_module):
|
||||
_, defaults = gx_module.MCP_DISPATCH["node_update"]
|
||||
assert "notes" in defaults
|
||||
assert defaults["notes"] is None
|
||||
assert "append_notes" in defaults
|
||||
assert defaults["append_notes"] is False
|
||||
|
||||
def test_node_create_inputschema_advertises_notes(self, gx_module):
|
||||
tool = next(t for t in gx_module.MCP_TOOLS
|
||||
if t["name"] == "node_create")
|
||||
assert "notes" in tool["inputSchema"]["properties"]
|
||||
|
||||
def test_node_update_inputschema_advertises_notes_and_append(self, gx_module):
|
||||
tool = next(t for t in gx_module.MCP_TOOLS
|
||||
if t["name"] == "node_update")
|
||||
props = tool["inputSchema"]["properties"]
|
||||
assert "notes" in props
|
||||
assert "append_notes" in props
|
||||
Reference in New Issue
Block a user