fix(gx-cli mcp): expone notes/append_notes en MCP y bloquea regresion

Bug encontrado por el agente Echo: el MCP server gx-cli (subcomando
`mcp-server`) llamaba a cmd_node_create / cmd_node_update con un
SimpleNamespace que NO incluia `notes`, asi que `args.notes` lanzaba
AttributeError. Causa raiz: MCP_DISPATCH no defaulteaba `notes` ni
`append_notes`, y el inputSchema de las tools tampoco los anunciaba.

Cambios:

* MCP_TOOLS["node_create"].inputSchema.properties anyade `notes`.
* MCP_TOOLS["node_update"].inputSchema.properties anyade `notes`
  + `append_notes` (boolean, default false).
* MCP_DISPATCH["node_create"] defaultea `notes: None`.
* MCP_DISPATCH["node_update"] defaultea `notes: None`,
  `append_notes: False`.

Tests nuevos en tests/test_gx_cli.py (30 tests):

* CLI: node create/update/delete con notes (replace + append),
  list/show/search, rel create/list/delete con cascada, query
  read-only que rechaza writes, autodetect de tipos.
* MCP dispatcher: cada cmd_* tolera args opcionales omitidos,
  notes y append_notes funcionan via dispatch, MCP_TOOLS y
  MCP_DISPATCH coinciden 1:1 (sanity contractual).
* Regresion 0035d: tests dedicados que congelan el contrato
  notes/append_notes en defaults e inputSchema — si alguien
  vuelve a quitarlos el test se queja inmediatamente.

WSL 74 / Windows 63 + 11 skipped.
This commit is contained in:
2026-05-03 16:09:47 +02:00
parent a0921d8a2c
commit e35c30cdf7
2 changed files with 475 additions and 4 deletions
+465
View File
@@ -0,0 +1,465 @@
"""Tests del CLI `gx-cli` y del dispatcher MCP empaquetado dentro.
Cubre las herramientas que el agente Echo usa para manejar el grafo.
Cada test prepara una operations.db + graph_explorer.db efimeras y
ejecuta gx-cli como subproceso pasando GX_OPS_DB / GX_APP_DB / GX_APP_DIR
en el entorno (mismo wire que la app real).
Tests del MCP dispatcher: importan gx-cli como modulo y llaman
`_mcp_dispatch(tool_name, args)` directamente — verifica que defaults
del MCP_DISPATCH cubren todos los kwargs que las funciones cmd_*
esperan (este es el bug original: `args.notes` faltaba).
"""
from __future__ import annotations
import importlib.util
import json
import os
import sqlite3
import subprocess
import sys
from importlib.machinery import SourceFileLoader
from pathlib import Path
import pytest
GX = Path(__file__).resolve().parents[1] / "gx-cli"
assert GX.exists(), f"gx-cli no existe en {GX}"
# ---------------------------------------------------------------------------
# Schema minimo replicado para los tests. Espejo del schema real al que
# apunta gx-cli; si cambia en operations.db, actualizar aqui tambien.
# ---------------------------------------------------------------------------
OPS_SCHEMA = """
CREATE TABLE entities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type_ref TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
description TEXT NOT NULL DEFAULT '',
domain TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
source TEXT NOT NULL,
metadata TEXT NOT NULL DEFAULT '{}',
notes TEXT NOT NULL DEFAULT '',
group_id TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE relations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
from_entity TEXT NOT NULL DEFAULT '',
to_entity TEXT NOT NULL,
via TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
purity TEXT NOT NULL DEFAULT '',
direction TEXT NOT NULL DEFAULT 'unidirectional',
weight REAL,
status TEXT NOT NULL DEFAULT 'designed',
started_at TEXT,
ended_at TEXT,
"order" INTEGER,
tags TEXT NOT NULL DEFAULT '[]',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE VIRTUAL TABLE entities_fts USING fts5(
id UNINDEXED, name, description, tags,
content=entities, content_rowid=rowid
);
CREATE TRIGGER entities_ai AFTER INSERT ON entities BEGIN
INSERT INTO entities_fts(rowid, id, name, description, tags)
VALUES (new.rowid, new.id, new.name, new.description, new.tags);
END;
CREATE TRIGGER entities_ad AFTER DELETE ON entities BEGIN
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags)
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags);
END;
CREATE TRIGGER entities_au AFTER UPDATE ON entities BEGIN
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags)
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags);
INSERT INTO entities_fts(rowid, id, name, description, tags)
VALUES (new.rowid, new.id, new.name, new.description, new.tags);
END;
"""
APP_SCHEMA = """
CREATE TABLE jobs (
id TEXT PRIMARY KEY,
enricher_id TEXT NOT NULL,
node_id TEXT,
node_name TEXT NOT NULL DEFAULT '',
params_json TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL,
progress REAL NOT NULL DEFAULT 0,
stage TEXT NOT NULL DEFAULT '',
result_json TEXT,
error TEXT,
pid INTEGER,
created_at INTEGER NOT NULL,
started_at INTEGER,
finished_at INTEGER
);
"""
@pytest.fixture
def env_dirs(tmp_path):
"""Crea operations.db + graph_explorer.db con schemas y devuelve env."""
ops = tmp_path / "operations.db"
cn = sqlite3.connect(ops)
cn.executescript(OPS_SCHEMA)
cn.commit()
cn.close()
appdb = tmp_path / "graph_explorer.db"
cn = sqlite3.connect(appdb)
cn.executescript(APP_SCHEMA)
cn.commit()
cn.close()
enrichers_dir = tmp_path / "enrichers"
enrichers_dir.mkdir()
env = os.environ.copy()
env["GX_OPS_DB"] = str(ops)
env["GX_APP_DB"] = str(appdb)
env["GX_APP_DIR"] = str(tmp_path)
return {"ops": ops, "app": appdb, "dir": tmp_path, "env": env}
def run_gx(env_dirs, *args, expect_ok: bool = True) -> dict:
proc = subprocess.run(
[sys.executable, str(GX), *args],
capture_output=True, text=True, timeout=15, env=env_dirs["env"],
)
if proc.stdout.strip():
try:
payload = json.loads(proc.stdout.strip().splitlines()[-1])
except json.JSONDecodeError:
payload = {"_raw": proc.stdout, "_stderr": proc.stderr}
else:
payload = {"_raw": "", "_stderr": proc.stderr,
"_returncode": proc.returncode}
if expect_ok:
assert proc.returncode == 0, \
f"exit {proc.returncode}\nstdout: {proc.stdout}\nstderr: {proc.stderr}"
assert payload.get("ok"), payload
return payload
def fetchone(ops_db, sql, *params):
cn = sqlite3.connect(ops_db)
try:
cur = cn.execute(sql, params)
return cur.fetchone()
finally:
cn.close()
# ---------------------------------------------------------------------------
# CLI: node create / update con notes
# ---------------------------------------------------------------------------
class TestCliNodeCreate:
def test_create_basic(self, env_dirs):
out = run_gx(env_dirs,
"node", "create", "--name", "doc1", "--type", "text")
assert out["name"] == "doc1"
assert out["type_ref"] == "text"
assert out["id"].startswith("text_")
def test_create_with_notes(self, env_dirs):
out = run_gx(env_dirs, "node", "create", "--name", "doc2",
"--type", "text", "--notes", "Texto inicial largo.")
row = fetchone(env_dirs["ops"],
"SELECT notes FROM entities WHERE id=?", out["id"])
assert row[0] == "Texto inicial largo."
def test_create_without_notes_defaults_empty(self, env_dirs):
out = run_gx(env_dirs, "node", "create", "--name", "doc3",
"--type", "text")
row = fetchone(env_dirs["ops"],
"SELECT notes FROM entities WHERE id=?", out["id"])
assert row[0] == ""
def test_create_autodetect_type_from_email(self, env_dirs):
out = run_gx(env_dirs, "node", "create", "--name", "foo@bar.com")
# detector marca emails con @ -> type=email
assert out["type_ref"] == "email"
class TestCliNodeUpdate:
def setup_method(self):
self.id_ = None
def _make(self, env_dirs, **kwargs):
out = run_gx(env_dirs, "node", "create",
"--name", kwargs.get("name", "x"),
"--type", kwargs.get("type", "text"))
return out["id"]
def test_update_notes_replace(self, env_dirs):
nid = self._make(env_dirs)
run_gx(env_dirs, "node", "update", nid, "--notes", "primero")
run_gx(env_dirs, "node", "update", nid, "--notes", "reemplazo")
row = fetchone(env_dirs["ops"],
"SELECT notes FROM entities WHERE id=?", nid)
assert row[0] == "reemplazo"
def test_update_notes_append(self, env_dirs):
nid = self._make(env_dirs)
run_gx(env_dirs, "node", "update", nid, "--notes", "primero")
run_gx(env_dirs, "node", "update", nid, "--notes", "segundo",
"--append-notes")
row = fetchone(env_dirs["ops"],
"SELECT notes FROM entities WHERE id=?", nid)
# double newline separator
assert row[0] == "primero\n\nsegundo"
def test_update_append_on_empty_creates_with_separator(self, env_dirs):
"""Append sobre notes vacio: separator se anyade igualmente — comportamiento
estable; el agente puede limpiarlo despues si le molesta el doble newline
inicial."""
nid = self._make(env_dirs)
run_gx(env_dirs, "node", "update", nid, "--notes", "primero",
"--append-notes")
row = fetchone(env_dirs["ops"],
"SELECT notes FROM entities WHERE id=?", nid)
# Como notes empieza vacio "" + "\n\nprimero" = "\n\nprimero".
assert row[0] == "\n\nprimero"
def test_update_other_fields_dont_clobber_notes(self, env_dirs):
nid = self._make(env_dirs)
run_gx(env_dirs, "node", "update", nid, "--notes", "preservame")
run_gx(env_dirs, "node", "update", nid, "--description", "desc nueva")
row = fetchone(env_dirs["ops"],
"SELECT notes, description FROM entities WHERE id=?",
nid)
assert row[0] == "preservame"
assert row[1] == "desc nueva"
def test_update_no_fields_errors(self, env_dirs):
nid = self._make(env_dirs)
out = run_gx(env_dirs, "node", "update", nid, expect_ok=False)
assert out.get("ok") is False
assert "no fields to update" in out.get("error", "").lower()
def test_update_unknown_id_errors(self, env_dirs):
out = run_gx(env_dirs, "node", "update", "id_que_no_existe",
"--name", "x", expect_ok=False)
assert out.get("ok") is False
class TestCliNodeListShow:
def test_node_show_returns_notes(self, env_dirs):
out = run_gx(env_dirs, "node", "create", "--name", "shown",
"--type", "text", "--notes", "contenido visible")
nid = out["id"]
shown = run_gx(env_dirs, "node", "show", nid)
assert shown["entity"]["notes"] == "contenido visible"
def test_node_list_filters_by_type(self, env_dirs):
run_gx(env_dirs, "node", "create", "--name", "p1", "--type", "person")
run_gx(env_dirs, "node", "create", "--name", "p2", "--type", "person")
run_gx(env_dirs, "node", "create", "--name", "t1", "--type", "text")
listed = run_gx(env_dirs, "node", "list", "--type", "person")
assert listed["count"] == 2
def test_node_delete(self, env_dirs):
out = run_gx(env_dirs, "node", "create", "--name", "tmp",
"--type", "text")
nid = out["id"]
run_gx(env_dirs, "node", "delete", nid)
row = fetchone(env_dirs["ops"],
"SELECT id FROM entities WHERE id=?", nid)
assert row is None
class TestCliRelations:
def test_rel_create_and_list(self, env_dirs):
a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
rel = run_gx(env_dirs, "rel", "create", "--from", a["id"],
"--to", b["id"], "--name", "KNOWS")
listed = run_gx(env_dirs, "rel", "list", "--from", a["id"])
assert listed["count"] == 1
assert listed["rows"][0]["name"] == "KNOWS"
assert listed["rows"][0]["to_entity"] == b["id"]
def test_rel_delete(self, env_dirs):
a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
rel = run_gx(env_dirs, "rel", "create", "--from", a["id"],
"--to", b["id"])
run_gx(env_dirs, "rel", "delete", rel["id"])
listed = run_gx(env_dirs, "rel", "list", "--from", a["id"])
assert listed["count"] == 0
def test_node_delete_cascades_relations(self, env_dirs):
a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
run_gx(env_dirs, "rel", "create", "--from", a["id"], "--to", b["id"])
run_gx(env_dirs, "node", "delete", a["id"])
# Sin a, no debe quedar relacion que lo apunte como from.
cn = sqlite3.connect(env_dirs["ops"])
rows = cn.execute(
"SELECT id FROM relations WHERE from_entity=? OR to_entity=?",
(a["id"], a["id"])
).fetchall()
cn.close()
assert rows == []
class TestCliQuery:
def test_query_select(self, env_dirs):
run_gx(env_dirs, "node", "create", "--name", "q1", "--type", "text")
run_gx(env_dirs, "node", "create", "--name", "q2", "--type", "text")
out = run_gx(env_dirs, "query", "SELECT count(*) AS n FROM entities")
# gx-cli stringifica los valores en query (cmd_query usa str(v));
# esperar la version string es coherente con el contrato actual.
assert out["rows"][0]["n"] == "2"
def test_query_rejects_writes(self, env_dirs):
out = run_gx(env_dirs, "query",
"DELETE FROM entities WHERE 1=1", expect_ok=False)
assert out.get("ok") is False
# ---------------------------------------------------------------------------
# MCP dispatcher: importa gx-cli como modulo, llama _mcp_dispatch
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def gx_module():
"""Carga gx-cli como modulo Python para acceder a MCP_DISPATCH.
El script no tiene extension .py, asi que `spec_from_file_location`
no detecta loader — hay que pasar `SourceFileLoader` explicito.
"""
loader = SourceFileLoader("gxcli", str(GX))
spec = importlib.util.spec_from_loader("gxcli", loader)
mod = importlib.util.module_from_spec(spec)
loader.exec_module(mod)
return mod
@pytest.fixture
def mcp_env(env_dirs, monkeypatch):
"""Activa el env de gx-cli en el proceso (gx-cli lee os.environ)."""
monkeypatch.setenv("GX_OPS_DB", str(env_dirs["ops"]))
monkeypatch.setenv("GX_APP_DB", str(env_dirs["app"]))
monkeypatch.setenv("GX_APP_DIR", str(env_dirs["dir"]))
return env_dirs
class TestMcpDispatcher:
"""Verifica que MCP_DISPATCH entrega a cada cmd_* todos los kwargs
que necesita aunque el LLM omita campos opcionales — bug original:
`args.notes` no existia porque MCP_DISPATCH no lo defaulteaba."""
def test_node_create_sin_notes_no_revienta(self, gx_module, mcp_env):
out = gx_module._mcp_dispatch("node_create",
{"name": "x", "type": "text"})
assert out.get("ok") is True, out
def test_node_create_con_notes(self, gx_module, mcp_env):
out = gx_module._mcp_dispatch("node_create", {
"name": "y", "type": "text", "notes": "hola mundo"
})
assert out["ok"] is True
row = fetchone(mcp_env["ops"],
"SELECT notes FROM entities WHERE id=?", out["id"])
assert row[0] == "hola mundo"
def test_node_update_sin_notes_no_revienta(self, gx_module, mcp_env):
created = gx_module._mcp_dispatch("node_create",
{"name": "z", "type": "text"})
out = gx_module._mcp_dispatch("node_update",
{"id": created["id"], "name": "zz"})
assert out["ok"] is True
def test_node_update_replace_notes(self, gx_module, mcp_env):
created = gx_module._mcp_dispatch("node_create",
{"name": "u1", "type": "text"})
gx_module._mcp_dispatch("node_update", {
"id": created["id"], "notes": "primero",
})
row = fetchone(mcp_env["ops"],
"SELECT notes FROM entities WHERE id=?", created["id"])
assert row[0] == "primero"
def test_node_update_append_notes(self, gx_module, mcp_env):
created = gx_module._mcp_dispatch("node_create",
{"name": "u2", "type": "text"})
gx_module._mcp_dispatch("node_update", {
"id": created["id"], "notes": "primero",
})
gx_module._mcp_dispatch("node_update", {
"id": created["id"], "notes": "segundo", "append_notes": True,
})
row = fetchone(mcp_env["ops"],
"SELECT notes FROM entities WHERE id=?", created["id"])
assert row[0] == "primero\n\nsegundo"
def test_dispatch_returns_error_for_unknown_tool(self, gx_module, mcp_env):
out = gx_module._mcp_dispatch("not_a_tool", {})
assert out["ok"] is False
assert "unknown tool" in out["error"]
def test_all_dispatch_entries_have_callable(self, gx_module):
"""Sanity: MCP_DISPATCH no tiene tools sin funcion. Si en el futuro
alguien anyade una entrada y olvida cablear el cmd_*, este test
falla."""
for name, (fn, defaults) in gx_module.MCP_DISPATCH.items():
assert callable(fn), f"{name} no tiene cmd_* callable"
assert isinstance(defaults, dict), f"{name} defaults no es dict"
def test_mcp_tools_match_dispatch_keys(self, gx_module):
"""El listado MCP_TOOLS (lo que el LLM ve) debe alinearse 1:1 con
las claves de MCP_DISPATCH (lo que el server ejecuta). Si un tool
anuncia capacidades pero el dispatch no las tiene, es un bug."""
tool_names = {t["name"] for t in gx_module.MCP_TOOLS}
dispatch_names = set(gx_module.MCP_DISPATCH.keys())
missing_in_dispatch = tool_names - dispatch_names
missing_in_tools = dispatch_names - tool_names
assert not missing_in_dispatch, \
f"tools sin dispatch: {missing_in_dispatch}"
assert not missing_in_tools, \
f"dispatch sin tool: {missing_in_tools}"
class TestMcpRegression0035d:
"""El bug que el agente Echo encontro: `args.notes` no existia en
SimpleNamespace porque MCP_DISPATCH no lo defaulteaba. Estos tests
fijan el contrato para que no vuelva a romperse silenciosamente."""
def test_node_create_dispatch_includes_notes_default(self, gx_module):
_, defaults = gx_module.MCP_DISPATCH["node_create"]
assert "notes" in defaults
assert defaults["notes"] is None
def test_node_update_dispatch_includes_notes_and_append(self, gx_module):
_, defaults = gx_module.MCP_DISPATCH["node_update"]
assert "notes" in defaults
assert defaults["notes"] is None
assert "append_notes" in defaults
assert defaults["append_notes"] is False
def test_node_create_inputschema_advertises_notes(self, gx_module):
tool = next(t for t in gx_module.MCP_TOOLS
if t["name"] == "node_create")
assert "notes" in tool["inputSchema"]["properties"]
def test_node_update_inputschema_advertises_notes_and_append(self, gx_module):
tool = next(t for t in gx_module.MCP_TOOLS
if t["name"] == "node_update")
props = tool["inputSchema"]["properties"]
assert "notes" in props
assert "append_notes" in props