Files
egutierrez f0d8a5ad04 feat(0036d): promote kind-aware (Group → clear group_id)
NodeGroups window kind=Group ahora expone un boton SmallButton(TI_ARROW_UP)
por fila que saca la entidad del grupo (group_id = NULL) y dispara
reload del grafo. kind=Table mantiene el comportamiento de issue 0011.

- entity_ops: nueva op `entity_clear_group_id(db, id)` idempotente. Si
  la columna group_id no existe (BD pre-0035a) retorna true como no-op.
  Falla solo si la entidad no existe o SQLite revienta.
- views.cpp: extra columna "promote" en kind=Group, tooltip header
  diferenciado por kind, boton conectado a app.want_clear_group_id_entity.
- main.cpp: handler que ejecuta entity_clear_group_id, marca windows
  como dirty, llama reload_after_mutation y loguea
  `[node_groups] promoted X out of group`.
- gx-cli: flag `node update --clear-group-id` (booleano) y exposicion
  MCP en inputSchema + MCP_DISPATCH defaults para que el agente Echo
  pueda promover via tool calls.
- tests: 3 nuevos CLI (clear, idempotente, combinable con --name) y
  4 MCP (defaults, schema, dispatch end-to-end, idempotente).

WSL: 102 passed (95 base + 7).
Windows: 91 passed, 11 skipped (84 base + 7).

Refs: issues/0036d-promote-kind-aware.md
2026-05-04 01:03:11 +02:00

700 lines
29 KiB
Python

"""Tests del CLI `gx-cli` y del dispatcher MCP empaquetado dentro.
Cubre las herramientas que el agente Echo usa para manejar el grafo.
Cada test prepara una operations.db + graph_explorer.db efimeras y
ejecuta gx-cli como subproceso pasando GX_OPS_DB / GX_APP_DB / GX_APP_DIR
en el entorno (mismo wire que la app real).
Tests del MCP dispatcher: importan gx-cli como modulo y llaman
`_mcp_dispatch(tool_name, args)` directamente — verifica que defaults
del MCP_DISPATCH cubren todos los kwargs que las funciones cmd_*
esperan (este es el bug original: `args.notes` faltaba).
"""
from __future__ import annotations
import importlib.util
import json
import os
import sqlite3
import subprocess
import sys
from importlib.machinery import SourceFileLoader
from pathlib import Path
import pytest
# Path to the gx-cli script under test: a file named "gx-cli" located one
# directory above the directory that contains this test module.
GX = Path(__file__).resolve().parents[1] / "gx-cli"
# Fail at import time with an explicit message if the script is missing.
assert GX.exists(), f"gx-cli no existe en {GX}"
# ---------------------------------------------------------------------------
# Minimal schema replicated for the tests. It mirrors the real schema that
# gx-cli targets; if operations.db changes, update it here as well.
# ---------------------------------------------------------------------------
OPS_SCHEMA = """
CREATE TABLE entities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type_ref TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
description TEXT NOT NULL DEFAULT '',
domain TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
source TEXT NOT NULL,
metadata TEXT NOT NULL DEFAULT '{}',
notes TEXT NOT NULL DEFAULT '',
group_id TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE relations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
from_entity TEXT NOT NULL DEFAULT '',
to_entity TEXT NOT NULL,
via TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
purity TEXT NOT NULL DEFAULT '',
direction TEXT NOT NULL DEFAULT 'unidirectional',
weight REAL,
status TEXT NOT NULL DEFAULT 'designed',
started_at TEXT,
ended_at TEXT,
"order" INTEGER,
tags TEXT NOT NULL DEFAULT '[]',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE VIRTUAL TABLE entities_fts USING fts5(
id UNINDEXED, name, description, tags,
content=entities, content_rowid=rowid
);
CREATE TRIGGER entities_ai AFTER INSERT ON entities BEGIN
INSERT INTO entities_fts(rowid, id, name, description, tags)
VALUES (new.rowid, new.id, new.name, new.description, new.tags);
END;
CREATE TRIGGER entities_ad AFTER DELETE ON entities BEGIN
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags)
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags);
END;
CREATE TRIGGER entities_au AFTER UPDATE ON entities BEGIN
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags)
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags);
INSERT INTO entities_fts(rowid, id, name, description, tags)
VALUES (new.rowid, new.id, new.name, new.description, new.tags);
END;
"""
# Schema applied to the ephemeral graph_explorer.db created by the env_dirs
# fixture: a single `jobs` table.
APP_SCHEMA = """
CREATE TABLE jobs (
id TEXT PRIMARY KEY,
enricher_id TEXT NOT NULL,
node_id TEXT,
node_name TEXT NOT NULL DEFAULT '',
params_json TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL,
progress REAL NOT NULL DEFAULT 0,
stage TEXT NOT NULL DEFAULT '',
result_json TEXT,
error TEXT,
pid INTEGER,
created_at INTEGER NOT NULL,
started_at INTEGER,
finished_at INTEGER
);
"""
@pytest.fixture
def env_dirs(tmp_path):
    """Create operations.db + graph_explorer.db with their schemas.

    Returns a dict with the two database paths (``ops``, ``app``), the
    temporary directory (``dir``) and a copy of the process environment
    (``env``) extended with GX_OPS_DB / GX_APP_DB / GX_APP_DIR.
    """
    ops_path = tmp_path / "operations.db"
    app_path = tmp_path / "graph_explorer.db"
    # Materialize each database from its schema, operations.db first.
    for db_path, schema in ((ops_path, OPS_SCHEMA), (app_path, APP_SCHEMA)):
        conn = sqlite3.connect(db_path)
        conn.executescript(schema)
        conn.commit()
        conn.close()
    # Empty enrichers directory; individual tests drop manifests into it.
    (tmp_path / "enrichers").mkdir()
    child_env = {
        **os.environ,
        "GX_OPS_DB": str(ops_path),
        "GX_APP_DB": str(app_path),
        "GX_APP_DIR": str(tmp_path),
    }
    return {"ops": ops_path, "app": app_path, "dir": tmp_path,
            "env": child_env}
def run_gx(env_dirs, *args, expect_ok: bool = True) -> dict:
    """Run gx-cli as a subprocess and return its decoded JSON payload.

    The last line of (stripped) stdout is parsed as JSON. When stdout is
    empty, is not valid JSON, or decodes to something other than a JSON
    object, a diagnostic dict with ``_raw``, ``_stderr`` and
    ``_returncode`` is returned instead, so callers can always use
    ``payload.get(...)``.

    Args:
        env_dirs: mapping whose ``"env"`` entry is the environment passed
            to the subprocess.
        *args: command-line arguments forwarded to gx-cli.
        expect_ok: when true, assert a zero exit code and a truthy
            ``ok`` field in the payload.

    Returns:
        The decoded JSON object, or the diagnostic fallback dict.
    """
    proc = subprocess.run(
        [sys.executable, str(GX), *args],
        capture_output=True, text=True, timeout=15, env=env_dirs["env"],
    )
    stdout = proc.stdout.strip()
    payload = None
    if stdout:
        try:
            payload = json.loads(stdout.splitlines()[-1])
        except json.JSONDecodeError:
            payload = None
    # Non-object JSON (list, null, number...) has no `.get`; treat it the
    # same as undecodable output so the diagnostics are not masked by an
    # AttributeError.
    if not isinstance(payload, dict):
        payload = {"_raw": proc.stdout if stdout else "",
                   "_stderr": proc.stderr,
                   "_returncode": proc.returncode}
    if expect_ok:
        assert proc.returncode == 0, \
            f"exit {proc.returncode}\nstdout: {proc.stdout}\nstderr: {proc.stderr}"
        assert payload.get("ok"), payload
    return payload
def fetchone(ops_db, sql, *params):
    """Run *sql* with *params* against the SQLite file *ops_db*.

    Returns the first result row as a tuple, or None when the query
    yields no rows. The connection is always closed before returning.
    """
    conn = sqlite3.connect(ops_db)
    try:
        first_row = conn.execute(sql, params).fetchone()
    finally:
        conn.close()
    return first_row
# ---------------------------------------------------------------------------
# CLI: node create / update with notes
# ---------------------------------------------------------------------------
class TestCliNodeCreate:
    """CLI `node create`: explicit type, optional notes, type detection."""

    def test_create_basic(self, env_dirs):
        """The response echoes name/type and the id carries the type prefix."""
        created = run_gx(
            env_dirs, "node", "create", "--name", "doc1", "--type", "text"
        )
        assert created["name"] == "doc1"
        assert created["type_ref"] == "text"
        assert created["id"].startswith("text_")

    def test_create_with_notes(self, env_dirs):
        """--notes is persisted verbatim in the entities.notes column."""
        created = run_gx(
            env_dirs, "node", "create", "--name", "doc2",
            "--type", "text", "--notes", "Texto inicial largo.",
        )
        stored = fetchone(
            env_dirs["ops"],
            "SELECT notes FROM entities WHERE id=?", created["id"],
        )
        assert stored[0] == "Texto inicial largo."

    def test_create_without_notes_defaults_empty(self, env_dirs):
        """Omitting --notes stores an empty string."""
        created = run_gx(
            env_dirs, "node", "create", "--name", "doc3", "--type", "text"
        )
        stored = fetchone(
            env_dirs["ops"],
            "SELECT notes FROM entities WHERE id=?", created["id"],
        )
        assert stored[0] == ""

    def test_create_autodetect_type_from_email(self, env_dirs):
        """Without --type, a name containing '@' is expected to be typed as email."""
        created = run_gx(env_dirs, "node", "create", "--name", "foo@bar.com")
        assert created["type_ref"] == "email"
class TestCliNodeUpdate:
    """CLI `node update`: notes replace/append, field isolation, error
    paths and the --clear-group-id flag (issue 0036d)."""

    def setup_method(self):
        self.id_ = None

    def _make(self, env_dirs, **kwargs):
        """Create a node via the CLI and return its id.

        Recognized keyword arguments: ``name`` (default "x") and
        ``type`` (default "text").
        """
        node_name = kwargs.get("name", "x")
        node_type = kwargs.get("type", "text")
        created = run_gx(env_dirs, "node", "create",
                         "--name", node_name, "--type", node_type)
        return created["id"]

    def _update(self, env_dirs, nid, *flags):
        """Run `node update <nid> <flags...>` expecting success."""
        return run_gx(env_dirs, "node", "update", nid, *flags)

    def _notes(self, env_dirs, nid):
        """Read the stored notes of node *nid* straight from the database."""
        row = fetchone(env_dirs["ops"],
                       "SELECT notes FROM entities WHERE id=?", nid)
        return row[0]

    def _group_id(self, env_dirs, nid):
        """Read the stored group_id of node *nid* straight from the database."""
        row = fetchone(env_dirs["ops"],
                       "SELECT group_id FROM entities WHERE id=?", nid)
        return row[0]

    def _put_in_group(self, env_dirs, nid, group):
        """Assign group_id through raw SQL to simulate a grouped entity."""
        conn = sqlite3.connect(env_dirs["ops"])
        conn.execute("UPDATE entities SET group_id = ? WHERE id = ?",
                     (group, nid))
        conn.commit()
        conn.close()

    def test_update_notes_replace(self, env_dirs):
        """A second --notes overwrites the first one."""
        nid = self._make(env_dirs)
        for text in ("primero", "reemplazo"):
            self._update(env_dirs, nid, "--notes", text)
        assert self._notes(env_dirs, nid) == "reemplazo"

    def test_update_notes_append(self, env_dirs):
        """--append-notes joins old and new text with a blank line."""
        nid = self._make(env_dirs)
        self._update(env_dirs, nid, "--notes", "primero")
        self._update(env_dirs, nid, "--notes", "segundo", "--append-notes")
        assert self._notes(env_dirs, nid) == "primero\n\nsegundo"

    def test_update_append_on_empty_creates_with_separator(self, env_dirs):
        """Appending to empty notes still adds the separator.

        This is the stable behavior; the agent can clean up the leading
        double newline afterwards if it is unwanted.
        """
        nid = self._make(env_dirs)
        self._update(env_dirs, nid, "--notes", "primero", "--append-notes")
        # Notes start as "", so "" + "\n\nprimero" == "\n\nprimero".
        assert self._notes(env_dirs, nid) == "\n\nprimero"

    def test_update_other_fields_dont_clobber_notes(self, env_dirs):
        """Updating description leaves previously stored notes intact."""
        nid = self._make(env_dirs)
        self._update(env_dirs, nid, "--notes", "preservame")
        self._update(env_dirs, nid, "--description", "desc nueva")
        notes, description = fetchone(
            env_dirs["ops"],
            "SELECT notes, description FROM entities WHERE id=?",
            nid,
        )
        assert notes == "preservame"
        assert description == "desc nueva"

    def test_update_no_fields_errors(self, env_dirs):
        """An update without any field flag is rejected with an error."""
        nid = self._make(env_dirs)
        result = run_gx(env_dirs, "node", "update", nid, expect_ok=False)
        assert result.get("ok") is False
        assert "no fields to update" in result.get("error", "").lower()

    def test_update_unknown_id_errors(self, env_dirs):
        """Updating a non-existent id reports ok=False."""
        result = run_gx(env_dirs, "node", "update", "id_que_no_existe",
                        "--name", "x", expect_ok=False)
        assert result.get("ok") is False

    def test_node_update_clear_group_id(self, env_dirs):
        """Issue 0036d: --clear-group-id removes the entity from its group."""
        nid = self._make(env_dirs, name="grouped")
        self._put_in_group(env_dirs, nid, "group_xyz")
        # Sanity check of the precondition.
        assert self._group_id(env_dirs, nid) == "group_xyz"
        self._update(env_dirs, nid, "--clear-group-id")
        assert self._group_id(env_dirs, nid) is None

    def test_node_update_clear_group_id_idempotent(self, env_dirs):
        """Issue 0036d: clearing an entity that has no group does not fail."""
        nid = self._make(env_dirs, name="solo")
        # Precondition: group_id is NULL from the start.
        assert self._group_id(env_dirs, nid) is None
        # Clearing is a no-op.
        self._update(env_dirs, nid, "--clear-group-id")
        assert self._group_id(env_dirs, nid) is None
        # And the entity still exists.
        survivor = fetchone(env_dirs["ops"],
                            "SELECT id FROM entities WHERE id=?", nid)
        assert survivor is not None and survivor[0] == nid

    def test_node_update_clear_group_id_combinable(self, env_dirs):
        """--clear-group-id can be combined with other setters in one update."""
        nid = self._make(env_dirs, name="combo")
        self._put_in_group(env_dirs, nid, "group_abc")
        self._update(env_dirs, nid, "--clear-group-id", "--name", "renamed")
        group_id, name = fetchone(
            env_dirs["ops"],
            "SELECT group_id, name FROM entities WHERE id=?", nid,
        )
        assert group_id is None
        assert name == "renamed"
class TestCliNodeListShow:
    """CLI `node show`, `node list` and `node delete`."""

    def test_node_show_returns_notes(self, env_dirs):
        """`node show` exposes the stored notes under entity.notes."""
        created = run_gx(
            env_dirs, "node", "create", "--name", "shown",
            "--type", "text", "--notes", "contenido visible",
        )
        detail = run_gx(env_dirs, "node", "show", created["id"])
        assert detail["entity"]["notes"] == "contenido visible"

    def test_node_list_filters_by_type(self, env_dirs):
        """`node list --type person` counts only the person nodes."""
        for name, type_ref in (("p1", "person"), ("p2", "person"),
                               ("t1", "text")):
            run_gx(env_dirs, "node", "create",
                   "--name", name, "--type", type_ref)
        people = run_gx(env_dirs, "node", "list", "--type", "person")
        assert people["count"] == 2

    def test_node_delete(self, env_dirs):
        """`node delete` removes the row from the entities table."""
        created = run_gx(env_dirs, "node", "create", "--name", "tmp",
                         "--type", "text")
        target = created["id"]
        run_gx(env_dirs, "node", "delete", target)
        remaining = fetchone(env_dirs["ops"],
                             "SELECT id FROM entities WHERE id=?", target)
        assert remaining is None
class TestCliRelations:
    """CLI `rel create` / `rel list` / `rel delete` and the relation
    cleanup performed by `node delete`."""

    def test_rel_create_and_list(self, env_dirs):
        """A created relation shows up in `rel list --from` with its name
        and target."""
        a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
        b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
        # The create payload is not needed here; only the listing is checked.
        run_gx(env_dirs, "rel", "create", "--from", a["id"],
               "--to", b["id"], "--name", "KNOWS")
        listed = run_gx(env_dirs, "rel", "list", "--from", a["id"])
        assert listed["count"] == 1
        assert listed["rows"][0]["name"] == "KNOWS"
        assert listed["rows"][0]["to_entity"] == b["id"]

    def test_rel_delete(self, env_dirs):
        """A deleted relation no longer appears in `rel list`."""
        a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
        b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
        rel = run_gx(env_dirs, "rel", "create", "--from", a["id"],
                     "--to", b["id"])
        run_gx(env_dirs, "rel", "delete", rel["id"])
        listed = run_gx(env_dirs, "rel", "list", "--from", a["id"])
        assert listed["count"] == 0

    def test_node_delete_cascades_relations(self, env_dirs):
        """Deleting a node removes every relation that references it."""
        a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
        b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
        run_gx(env_dirs, "rel", "create", "--from", a["id"], "--to", b["id"])
        run_gx(env_dirs, "node", "delete", a["id"])
        # With `a` gone, no relation may reference it on either endpoint.
        cn = sqlite3.connect(env_dirs["ops"])
        try:
            rows = cn.execute(
                "SELECT id FROM relations WHERE from_entity=? OR to_entity=?",
                (a["id"], a["id"])
            ).fetchall()
        finally:
            # Close even if the query raises, like the other raw-SQL checks.
            cn.close()
        assert rows == []
class TestCliEnricherRun:
    """`enricher run` enqueues a job as a JSON file, not as a SQL row.

    The job must be written into an `agent_jobs_queue/` directory (NOT
    inserted into SQLite). This guards against a regression of the
    "disk I/O error" bug that appeared when gx-cli (in WSL) wrote
    agent_jobs into a graph_explorer.db opened in WAL mode from Windows.
    """

    def _make_enricher(self, env_dirs, eid: str = "split_sentences"):
        """Write a dummy manifest.yaml for enricher *eid* so that gx-cli's
        manifest-existence validation passes."""
        manifest = env_dirs["dir"] / "enrichers" / eid / "manifest.yaml"
        manifest.parent.mkdir(parents=True, exist_ok=True)
        manifest.write_text(
            f"id: {eid}\nname: x\napplies_to: [text]\n", encoding="utf-8"
        )

    def _enqueue(self, env_dirs, node_name="doc"):
        """Register the enricher, create a text node and enqueue one job.

        Returns the (node payload, enricher-run payload) pair.
        """
        self._make_enricher(env_dirs, "split_sentences")
        node = run_gx(env_dirs, "node", "create", "--name", node_name,
                      "--type", "text")
        queued = run_gx(env_dirs, "enricher", "run", "split_sentences",
                        "--node", node["id"])
        return node, queued

    def test_enricher_run_writes_json_file(self, env_dirs):
        """Exactly one well-formed JSON job file is dropped in the queue."""
        node, out = self._enqueue(env_dirs)
        queue_dir = env_dirs["dir"] / "agent_jobs_queue"
        job_files = list(queue_dir.glob("*.json"))
        assert len(job_files) == 1
        job_file = job_files[0]
        payload = json.loads(job_file.read_text(encoding="utf-8"))
        assert payload["enricher_id"] == "split_sentences"
        assert payload["node_id"] == node["id"]
        assert payload["id"] == out["request_id"]
        assert "params_json" in payload
        # Naming: the file is called {request_id}.json, not .tmp.
        assert job_file.name == f"{out['request_id']}.json"

    def test_enricher_run_does_not_use_sqlite(self, env_dirs):
        """Regression for the cross-9p WAL bug: the flow must NOT create
        the agent_jobs table in graph_explorer.db."""
        self._enqueue(env_dirs)
        conn = sqlite3.connect(env_dirs["app"])
        try:
            row = conn.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type='table' AND name='agent_jobs'"
            ).fetchone()
        finally:
            conn.close()
        assert row is None, \
            "agent_jobs table re-creada en graph_explorer.db; el queue " \
            "debe vivir en ficheros, NO en SQLite (cross-9p WAL falla)"

    def test_enricher_run_unknown_enricher_errors(self, env_dirs):
        """An enricher without a manifest is reported as not found."""
        result = run_gx(env_dirs, "enricher", "run", "no_existe",
                        expect_ok=False)
        assert result.get("ok") is False
        assert "not found" in (result.get("error") or "").lower()

    def test_enricher_run_unknown_node_errors(self, env_dirs):
        """A valid enricher with a non-existent node is rejected."""
        self._make_enricher(env_dirs, "split_sentences")
        result = run_gx(env_dirs, "enricher", "run", "split_sentences",
                        "--node", "node_inexistente", expect_ok=False)
        assert result.get("ok") is False
        assert "node not found" in (result.get("error") or "").lower()

    def test_enricher_run_atomic_write(self, env_dirs):
        """No .tmp files may be left behind after a successful enqueue."""
        self._enqueue(env_dirs)
        leftovers = list(
            (env_dirs["dir"] / "agent_jobs_queue").glob("*.tmp"))
        assert leftovers == []

    def test_enricher_run_queue_dir_derives_from_app_db(self, env_dirs,
                                                        tmp_path):
        """REGRESSION: the queue dir must live next to GX_APP_DB (where
        main.cpp scans for it), NOT next to GX_APP_DIR.

        In the real deployment chat.cpp sets GX_APP_DIR to the source repo
        and GX_APP_DB to the Windows install, which are different
        locations. gx-cli MUST align with GX_APP_DB.
        """
        self._make_enricher(env_dirs, "split_sentences")
        # Relocate GX_APP_DB to another directory while GX_APP_DIR keeps
        # pointing at the original one (which holds the enricher manifests).
        install_dir = tmp_path / "install_dir"
        install_dir.mkdir()
        relocated_db = install_dir / "graph_explorer.db"
        # Create the empty schema at the new location.
        conn = sqlite3.connect(relocated_db)
        conn.executescript(APP_SCHEMA)
        conn.commit()
        conn.close()
        patched_env = dict(env_dirs["env"])
        patched_env["GX_APP_DB"] = str(relocated_db)
        patched_dirs = {**env_dirs, "env": patched_env}
        node = run_gx(patched_dirs, "node", "create",
                      "--name", "x", "--type", "text")
        run_gx(patched_dirs, "enricher", "run",
               "split_sentences", "--node", node["id"])
        # The JSON file MUST sit next to the relocated GX_APP_DB.
        files_in_new_db_dir = list(
            (install_dir / "agent_jobs_queue").glob("*.json"))
        files_in_app_dir = list(
            (env_dirs["dir"] / "agent_jobs_queue").glob("*.json"))
        assert len(files_in_new_db_dir) == 1, \
            "queue file no aparecio junto al GX_APP_DB"
        assert files_in_app_dir == [], \
            "queue file aparecio junto al GX_APP_DIR (regresion del bug)"

    def test_enricher_run_writes_log_to_gx_cli_log(self, env_dirs):
        """Persistent logs must end up in gx-cli.log next to
        graph_explorer.db, for auditing of the Echo agent."""
        self._enqueue(env_dirs, node_name="logged")
        log_path = env_dirs["dir"] / "gx-cli.log"
        assert log_path.exists()
        log_text = log_path.read_text(encoding="utf-8")
        # Only the node_create entry is asserted. Per the original note,
        # cmd_enricher_run currently writes straight to stderr rather than
        # through _log; if that changes, an assertion can be added here.
        assert "node_create" in log_text
class TestCliQuery:
    """CLI `query`: read-only SQL passthrough."""

    def test_query_select(self, env_dirs):
        """A SELECT returns rows keyed by column alias."""
        run_gx(env_dirs, "node", "create", "--name", "q1", "--type", "text")
        run_gx(env_dirs, "node", "create", "--name", "q2", "--type", "text")
        out = run_gx(env_dirs, "query", "SELECT count(*) AS n FROM entities")
        # gx-cli stringifies query values (cmd_query uses str(v)), so the
        # string form is what the current contract promises.
        assert out["rows"][0]["n"] == "2"

    def test_query_rejects_writes(self, env_dirs):
        """A write statement is refused AND leaves the data untouched."""
        # Seed a row so that an executed DELETE would be observable; on an
        # empty table the test could pass even if the statement had run.
        kept = run_gx(env_dirs, "node", "create", "--name", "keep",
                      "--type", "text")
        out = run_gx(env_dirs, "query",
                     "DELETE FROM entities WHERE 1=1", expect_ok=False)
        assert out.get("ok") is False
        row = fetchone(env_dirs["ops"],
                       "SELECT count(*) FROM entities WHERE id=?", kept["id"])
        assert row[0] == 1, "rejected write query still modified entities"
# ---------------------------------------------------------------------------
# MCP dispatcher: imports gx-cli as a module and calls _mcp_dispatch
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def gx_module():
    """Load gx-cli as a Python module to reach MCP_DISPATCH and friends.

    The script has no .py extension, so `spec_from_file_location` cannot
    pick a loader on its own; an explicit `SourceFileLoader` is required.
    """
    source_loader = SourceFileLoader("gxcli", str(GX))
    module = importlib.util.module_from_spec(
        importlib.util.spec_from_loader("gxcli", source_loader)
    )
    source_loader.exec_module(module)
    return module
@pytest.fixture
def mcp_env(env_dirs, monkeypatch):
    """Export the gx-cli environment variables into the current process
    (gx-cli reads os.environ) and hand back the env_dirs mapping."""
    exported = (
        ("GX_OPS_DB", "ops"),
        ("GX_APP_DB", "app"),
        ("GX_APP_DIR", "dir"),
    )
    for variable, key in exported:
        monkeypatch.setenv(variable, str(env_dirs[key]))
    return env_dirs
class TestMcpDispatcher:
    """Checks that MCP_DISPATCH hands every cmd_* all the kwargs it needs
    even when the LLM omits optional fields. Original bug: `args.notes`
    did not exist because MCP_DISPATCH had no default for it."""

    def test_node_create_sin_notes_no_revienta(self, gx_module, mcp_env):
        """node_create without notes must not blow up."""
        request = {"name": "x", "type": "text"}
        result = gx_module._mcp_dispatch("node_create", request)
        assert result.get("ok") is True, result

    def test_node_create_con_notes(self, gx_module, mcp_env):
        """node_create with notes persists them."""
        request = {"name": "y", "type": "text", "notes": "hola mundo"}
        result = gx_module._mcp_dispatch("node_create", request)
        assert result["ok"] is True
        stored = fetchone(
            mcp_env["ops"],
            "SELECT notes FROM entities WHERE id=?", result["id"],
        )
        assert stored[0] == "hola mundo"

    def test_node_update_sin_notes_no_revienta(self, gx_module, mcp_env):
        """node_update without notes must not blow up."""
        dispatch = gx_module._mcp_dispatch
        created = dispatch("node_create", {"name": "z", "type": "text"})
        result = dispatch("node_update", {"id": created["id"], "name": "zz"})
        assert result["ok"] is True

    def test_node_update_replace_notes(self, gx_module, mcp_env):
        """node_update with notes replaces the stored value."""
        dispatch = gx_module._mcp_dispatch
        created = dispatch("node_create", {"name": "u1", "type": "text"})
        dispatch("node_update", {"id": created["id"], "notes": "primero"})
        stored = fetchone(
            mcp_env["ops"],
            "SELECT notes FROM entities WHERE id=?", created["id"],
        )
        assert stored[0] == "primero"

    def test_node_update_append_notes(self, gx_module, mcp_env):
        """append_notes=True joins old and new notes with a blank line."""
        dispatch = gx_module._mcp_dispatch
        created = dispatch("node_create", {"name": "u2", "type": "text"})
        node_id = created["id"]
        dispatch("node_update", {"id": node_id, "notes": "primero"})
        dispatch("node_update", {
            "id": node_id, "notes": "segundo", "append_notes": True,
        })
        stored = fetchone(
            mcp_env["ops"],
            "SELECT notes FROM entities WHERE id=?", node_id,
        )
        assert stored[0] == "primero\n\nsegundo"

    def test_dispatch_returns_error_for_unknown_tool(self, gx_module, mcp_env):
        """An unknown tool name yields ok=False plus an explanatory error."""
        result = gx_module._mcp_dispatch("not_a_tool", {})
        assert result["ok"] is False
        assert "unknown tool" in result["error"]

    def test_all_dispatch_entries_have_callable(self, gx_module):
        """Sanity check: every MCP_DISPATCH entry is (callable, dict).

        If someone adds an entry and forgets to wire the cmd_* function,
        this test fails.
        """
        for name, entry in gx_module.MCP_DISPATCH.items():
            fn, defaults = entry
            assert callable(fn), f"{name} no tiene cmd_* callable"
            assert isinstance(defaults, dict), f"{name} defaults no es dict"

    def test_mcp_tools_match_dispatch_keys(self, gx_module):
        """MCP_TOOLS (what the LLM sees) must line up 1:1 with the keys of
        MCP_DISPATCH (what the server executes). A tool advertised
        without a dispatch entry, or vice versa, is a bug."""
        dispatch_names = set(gx_module.MCP_DISPATCH.keys())
        tool_names = {tool["name"] for tool in gx_module.MCP_TOOLS}
        missing_in_dispatch = tool_names - dispatch_names
        assert not missing_in_dispatch, \
            f"tools sin dispatch: {missing_in_dispatch}"
        missing_in_tools = dispatch_names - tool_names
        assert not missing_in_tools, \
            f"dispatch sin tool: {missing_in_tools}"
class TestMcpRegression0035d:
    """The bug found by the Echo agent: `args.notes` did not exist on the
    SimpleNamespace because MCP_DISPATCH had no default for it. These
    tests pin the contract so it cannot silently break again."""

    @staticmethod
    def _properties(gx_module, tool_name):
        """Return the inputSchema properties of the tool named *tool_name*."""
        tool = next(t for t in gx_module.MCP_TOOLS
                    if t["name"] == tool_name)
        return tool["inputSchema"]["properties"]

    def test_node_create_dispatch_includes_notes_default(self, gx_module):
        """node_create defaults carry notes=None."""
        defaults = gx_module.MCP_DISPATCH["node_create"][1]
        assert "notes" in defaults
        assert defaults["notes"] is None

    def test_node_update_dispatch_includes_notes_and_append(self, gx_module):
        """node_update defaults carry notes=None and append_notes=False."""
        defaults = gx_module.MCP_DISPATCH["node_update"][1]
        assert "notes" in defaults
        assert defaults["notes"] is None
        assert "append_notes" in defaults
        assert defaults["append_notes"] is False

    def test_node_create_inputschema_advertises_notes(self, gx_module):
        """The node_create schema advertises the notes field."""
        assert "notes" in self._properties(gx_module, "node_create")

    def test_node_update_inputschema_advertises_notes_and_append(self, gx_module):
        """The node_update schema advertises notes and append_notes."""
        props = self._properties(gx_module, "node_update")
        assert "notes" in props
        assert "append_notes" in props
class TestMcpRegression0036d:
    """Issue 0036d: clear_group_id as an MCP flag of node_update.

    Blocks any future regression where the dispatcher loses the default
    or the inputSchema stops advertising the field.
    """

    def test_node_update_dispatch_includes_clear_group_id_default(self, gx_module):
        """node_update defaults carry clear_group_id=False."""
        _, defaults = gx_module.MCP_DISPATCH["node_update"]
        assert "clear_group_id" in defaults
        assert defaults["clear_group_id"] is False

    def test_node_update_inputschema_advertises_clear_group_id(self, gx_module):
        """The schema advertises clear_group_id as a boolean defaulting to False."""
        tool = next(t for t in gx_module.MCP_TOOLS
                    if t["name"] == "node_update")
        props = tool["inputSchema"]["properties"]
        assert "clear_group_id" in props
        assert props["clear_group_id"]["type"] == "boolean"
        assert props["clear_group_id"]["default"] is False

    def test_mcp_dispatch_clear_group_id_clears_membership(self, gx_module,
                                                           mcp_env):
        """End-to-end through the dispatcher: create a node, assign a group
        via SQL, dispatch node_update with clear_group_id=True, and verify
        that group_id is NULL."""
        out = gx_module._mcp_dispatch("node_create",
                                      {"name": "grp_member", "type": "text"})
        nid = out["id"]
        cn = sqlite3.connect(mcp_env["ops"])
        try:
            cn.execute("UPDATE entities SET group_id = ? WHERE id = ?",
                       ("group_mcp", nid))
            cn.commit()
        finally:
            # Release the handle even if the setup statement fails.
            cn.close()
        out = gx_module._mcp_dispatch("node_update",
                                      {"id": nid, "clear_group_id": True})
        assert out["ok"] is True
        row = fetchone(mcp_env["ops"],
                       "SELECT group_id FROM entities WHERE id=?", nid)
        assert row[0] is None

    def test_mcp_dispatch_clear_group_id_idempotent(self, gx_module, mcp_env):
        """Clearing a node that never had a group succeeds and keeps
        group_id NULL, mirroring the CLI idempotency test."""
        out = gx_module._mcp_dispatch("node_create",
                                      {"name": "grp_solo", "type": "text"})
        nid = out["id"]
        out = gx_module._mcp_dispatch("node_update",
                                      {"id": nid, "clear_group_id": True})
        assert out["ok"] is True
        # The entity must still exist and remain ungrouped after the no-op.
        row = fetchone(mcp_env["ops"],
                       "SELECT group_id FROM entities WHERE id=?", nid)
        assert row is not None and row[0] is None