652ff6f02c
Bug derivado del fix anterior: gx-cli escribia ficheros JSON en `$GX_APP_DIR/agent_jobs_queue/` (apuntando al repo fuente) mientras main.cpp escaneaba `parent(g_layout_db_path)/agent_jobs_queue/` (install Windows). Dos directorios distintos -> jobs huerfanos. Echo reportaba "encolado" pero el worker nunca veia los ficheros. La causa: chat.cpp setea GX_APP_DIR=<registry>/projects/osint_graph/ apps/graph_explorer y GX_APP_DB=<install>/local_files/projects/<slug>/ graph_explorer.db. Dos sitios. Solo APP_DB coincide con donde graph_explorer.exe escanea (parent del .db). Fix: * gx-cli cmd_enricher_run: queue_dir = parent(GX_APP_DB) / agent_jobs_queue. Alineado con main.cpp. * gx-cli: nuevo helper `_log(tag, msg)` que escribe a stderr Y a `<parent(app_db)>/gx-cli.log` para auditoria persistente. Cubre node_create, node_update, node_delete, rel_create, enricher_run. * gx-cli mcp _mcp_log tambien persiste a gx-cli.log. * main.cpp: log el queue scan dir una vez por sesion para detectar mismatches a futuro. * .gitignore: agent_jobs_queue/ y gx-cli.log son runtime, no se commitean. Tests: * test_enricher_run_queue_dir_derives_from_app_db (regresion) configura GX_APP_DB en un dir distinto de GX_APP_DIR y verifica que el JSON aterriza junto a APP_DB. * test_enricher_run_writes_log_to_gx_cli_log valida la auditoria. WSL 81 / Windows 70 + 11 skipped.
603 lines
25 KiB
Python
603 lines
25 KiB
Python
"""Tests del CLI `gx-cli` y del dispatcher MCP empaquetado dentro.
|
|
|
|
Cubre las herramientas que el agente Echo usa para manejar el grafo.
|
|
Cada test prepara una operations.db + graph_explorer.db efimeras y
|
|
ejecuta gx-cli como subproceso pasando GX_OPS_DB / GX_APP_DB / GX_APP_DIR
|
|
en el entorno (mismo wire que la app real).
|
|
|
|
Tests del MCP dispatcher: importan gx-cli como modulo y llaman
|
|
`_mcp_dispatch(tool_name, args)` directamente — verifica que defaults
|
|
del MCP_DISPATCH cubren todos los kwargs que las funciones cmd_*
|
|
esperan (este es el bug original: `args.notes` faltaba).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import importlib.util
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import subprocess
|
|
import sys
|
|
from importlib.machinery import SourceFileLoader
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
|
|
GX = Path(__file__).resolve().parents[1] / "gx-cli"
|
|
assert GX.exists(), f"gx-cli no existe en {GX}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema minimo replicado para los tests. Espejo del schema real al que
|
|
# apunta gx-cli; si cambia en operations.db, actualizar aqui tambien.
|
|
# ---------------------------------------------------------------------------
|
|
OPS_SCHEMA = """
|
|
CREATE TABLE entities (
|
|
id TEXT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
type_ref TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'active',
|
|
description TEXT NOT NULL DEFAULT '',
|
|
domain TEXT NOT NULL DEFAULT '',
|
|
tags TEXT NOT NULL DEFAULT '[]',
|
|
source TEXT NOT NULL,
|
|
metadata TEXT NOT NULL DEFAULT '{}',
|
|
notes TEXT NOT NULL DEFAULT '',
|
|
group_id TEXT,
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
);
|
|
CREATE TABLE relations (
|
|
id TEXT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
from_entity TEXT NOT NULL DEFAULT '',
|
|
to_entity TEXT NOT NULL,
|
|
via TEXT NOT NULL DEFAULT '',
|
|
description TEXT NOT NULL DEFAULT '',
|
|
purity TEXT NOT NULL DEFAULT '',
|
|
direction TEXT NOT NULL DEFAULT 'unidirectional',
|
|
weight REAL,
|
|
status TEXT NOT NULL DEFAULT 'designed',
|
|
started_at TEXT,
|
|
ended_at TEXT,
|
|
"order" INTEGER,
|
|
tags TEXT NOT NULL DEFAULT '[]',
|
|
notes TEXT NOT NULL DEFAULT '',
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
);
|
|
CREATE VIRTUAL TABLE entities_fts USING fts5(
|
|
id UNINDEXED, name, description, tags,
|
|
content=entities, content_rowid=rowid
|
|
);
|
|
CREATE TRIGGER entities_ai AFTER INSERT ON entities BEGIN
|
|
INSERT INTO entities_fts(rowid, id, name, description, tags)
|
|
VALUES (new.rowid, new.id, new.name, new.description, new.tags);
|
|
END;
|
|
CREATE TRIGGER entities_ad AFTER DELETE ON entities BEGIN
|
|
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags)
|
|
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags);
|
|
END;
|
|
CREATE TRIGGER entities_au AFTER UPDATE ON entities BEGIN
|
|
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags)
|
|
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags);
|
|
INSERT INTO entities_fts(rowid, id, name, description, tags)
|
|
VALUES (new.rowid, new.id, new.name, new.description, new.tags);
|
|
END;
|
|
"""
|
|
|
|
APP_SCHEMA = """
|
|
CREATE TABLE jobs (
|
|
id TEXT PRIMARY KEY,
|
|
enricher_id TEXT NOT NULL,
|
|
node_id TEXT,
|
|
node_name TEXT NOT NULL DEFAULT '',
|
|
params_json TEXT NOT NULL DEFAULT '{}',
|
|
status TEXT NOT NULL,
|
|
progress REAL NOT NULL DEFAULT 0,
|
|
stage TEXT NOT NULL DEFAULT '',
|
|
result_json TEXT,
|
|
error TEXT,
|
|
pid INTEGER,
|
|
created_at INTEGER NOT NULL,
|
|
started_at INTEGER,
|
|
finished_at INTEGER
|
|
);
|
|
"""
|
|
|
|
|
|
@pytest.fixture
|
|
def env_dirs(tmp_path):
|
|
"""Crea operations.db + graph_explorer.db con schemas y devuelve env."""
|
|
ops = tmp_path / "operations.db"
|
|
cn = sqlite3.connect(ops)
|
|
cn.executescript(OPS_SCHEMA)
|
|
cn.commit()
|
|
cn.close()
|
|
|
|
appdb = tmp_path / "graph_explorer.db"
|
|
cn = sqlite3.connect(appdb)
|
|
cn.executescript(APP_SCHEMA)
|
|
cn.commit()
|
|
cn.close()
|
|
|
|
enrichers_dir = tmp_path / "enrichers"
|
|
enrichers_dir.mkdir()
|
|
|
|
env = os.environ.copy()
|
|
env["GX_OPS_DB"] = str(ops)
|
|
env["GX_APP_DB"] = str(appdb)
|
|
env["GX_APP_DIR"] = str(tmp_path)
|
|
return {"ops": ops, "app": appdb, "dir": tmp_path, "env": env}
|
|
|
|
|
|
def run_gx(env_dirs, *args, expect_ok: bool = True) -> dict:
|
|
proc = subprocess.run(
|
|
[sys.executable, str(GX), *args],
|
|
capture_output=True, text=True, timeout=15, env=env_dirs["env"],
|
|
)
|
|
if proc.stdout.strip():
|
|
try:
|
|
payload = json.loads(proc.stdout.strip().splitlines()[-1])
|
|
except json.JSONDecodeError:
|
|
payload = {"_raw": proc.stdout, "_stderr": proc.stderr}
|
|
else:
|
|
payload = {"_raw": "", "_stderr": proc.stderr,
|
|
"_returncode": proc.returncode}
|
|
if expect_ok:
|
|
assert proc.returncode == 0, \
|
|
f"exit {proc.returncode}\nstdout: {proc.stdout}\nstderr: {proc.stderr}"
|
|
assert payload.get("ok"), payload
|
|
return payload
|
|
|
|
|
|
def fetchone(ops_db, sql, *params):
|
|
cn = sqlite3.connect(ops_db)
|
|
try:
|
|
cur = cn.execute(sql, params)
|
|
return cur.fetchone()
|
|
finally:
|
|
cn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI: node create / update con notes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCliNodeCreate:
|
|
def test_create_basic(self, env_dirs):
|
|
out = run_gx(env_dirs,
|
|
"node", "create", "--name", "doc1", "--type", "text")
|
|
assert out["name"] == "doc1"
|
|
assert out["type_ref"] == "text"
|
|
assert out["id"].startswith("text_")
|
|
|
|
def test_create_with_notes(self, env_dirs):
|
|
out = run_gx(env_dirs, "node", "create", "--name", "doc2",
|
|
"--type", "text", "--notes", "Texto inicial largo.")
|
|
row = fetchone(env_dirs["ops"],
|
|
"SELECT notes FROM entities WHERE id=?", out["id"])
|
|
assert row[0] == "Texto inicial largo."
|
|
|
|
def test_create_without_notes_defaults_empty(self, env_dirs):
|
|
out = run_gx(env_dirs, "node", "create", "--name", "doc3",
|
|
"--type", "text")
|
|
row = fetchone(env_dirs["ops"],
|
|
"SELECT notes FROM entities WHERE id=?", out["id"])
|
|
assert row[0] == ""
|
|
|
|
def test_create_autodetect_type_from_email(self, env_dirs):
|
|
out = run_gx(env_dirs, "node", "create", "--name", "foo@bar.com")
|
|
# detector marca emails con @ -> type=email
|
|
assert out["type_ref"] == "email"
|
|
|
|
|
|
class TestCliNodeUpdate:
|
|
def setup_method(self):
|
|
self.id_ = None
|
|
|
|
def _make(self, env_dirs, **kwargs):
|
|
out = run_gx(env_dirs, "node", "create",
|
|
"--name", kwargs.get("name", "x"),
|
|
"--type", kwargs.get("type", "text"))
|
|
return out["id"]
|
|
|
|
def test_update_notes_replace(self, env_dirs):
|
|
nid = self._make(env_dirs)
|
|
run_gx(env_dirs, "node", "update", nid, "--notes", "primero")
|
|
run_gx(env_dirs, "node", "update", nid, "--notes", "reemplazo")
|
|
row = fetchone(env_dirs["ops"],
|
|
"SELECT notes FROM entities WHERE id=?", nid)
|
|
assert row[0] == "reemplazo"
|
|
|
|
def test_update_notes_append(self, env_dirs):
|
|
nid = self._make(env_dirs)
|
|
run_gx(env_dirs, "node", "update", nid, "--notes", "primero")
|
|
run_gx(env_dirs, "node", "update", nid, "--notes", "segundo",
|
|
"--append-notes")
|
|
row = fetchone(env_dirs["ops"],
|
|
"SELECT notes FROM entities WHERE id=?", nid)
|
|
# double newline separator
|
|
assert row[0] == "primero\n\nsegundo"
|
|
|
|
def test_update_append_on_empty_creates_with_separator(self, env_dirs):
|
|
"""Append sobre notes vacio: separator se anyade igualmente — comportamiento
|
|
estable; el agente puede limpiarlo despues si le molesta el doble newline
|
|
inicial."""
|
|
nid = self._make(env_dirs)
|
|
run_gx(env_dirs, "node", "update", nid, "--notes", "primero",
|
|
"--append-notes")
|
|
row = fetchone(env_dirs["ops"],
|
|
"SELECT notes FROM entities WHERE id=?", nid)
|
|
# Como notes empieza vacio "" + "\n\nprimero" = "\n\nprimero".
|
|
assert row[0] == "\n\nprimero"
|
|
|
|
def test_update_other_fields_dont_clobber_notes(self, env_dirs):
|
|
nid = self._make(env_dirs)
|
|
run_gx(env_dirs, "node", "update", nid, "--notes", "preservame")
|
|
run_gx(env_dirs, "node", "update", nid, "--description", "desc nueva")
|
|
row = fetchone(env_dirs["ops"],
|
|
"SELECT notes, description FROM entities WHERE id=?",
|
|
nid)
|
|
assert row[0] == "preservame"
|
|
assert row[1] == "desc nueva"
|
|
|
|
def test_update_no_fields_errors(self, env_dirs):
|
|
nid = self._make(env_dirs)
|
|
out = run_gx(env_dirs, "node", "update", nid, expect_ok=False)
|
|
assert out.get("ok") is False
|
|
assert "no fields to update" in out.get("error", "").lower()
|
|
|
|
def test_update_unknown_id_errors(self, env_dirs):
|
|
out = run_gx(env_dirs, "node", "update", "id_que_no_existe",
|
|
"--name", "x", expect_ok=False)
|
|
assert out.get("ok") is False
|
|
|
|
|
|
class TestCliNodeListShow:
|
|
def test_node_show_returns_notes(self, env_dirs):
|
|
out = run_gx(env_dirs, "node", "create", "--name", "shown",
|
|
"--type", "text", "--notes", "contenido visible")
|
|
nid = out["id"]
|
|
shown = run_gx(env_dirs, "node", "show", nid)
|
|
assert shown["entity"]["notes"] == "contenido visible"
|
|
|
|
def test_node_list_filters_by_type(self, env_dirs):
|
|
run_gx(env_dirs, "node", "create", "--name", "p1", "--type", "person")
|
|
run_gx(env_dirs, "node", "create", "--name", "p2", "--type", "person")
|
|
run_gx(env_dirs, "node", "create", "--name", "t1", "--type", "text")
|
|
listed = run_gx(env_dirs, "node", "list", "--type", "person")
|
|
assert listed["count"] == 2
|
|
|
|
def test_node_delete(self, env_dirs):
|
|
out = run_gx(env_dirs, "node", "create", "--name", "tmp",
|
|
"--type", "text")
|
|
nid = out["id"]
|
|
run_gx(env_dirs, "node", "delete", nid)
|
|
row = fetchone(env_dirs["ops"],
|
|
"SELECT id FROM entities WHERE id=?", nid)
|
|
assert row is None
|
|
|
|
|
|
class TestCliRelations:
|
|
def test_rel_create_and_list(self, env_dirs):
|
|
a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
|
|
b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
|
|
rel = run_gx(env_dirs, "rel", "create", "--from", a["id"],
|
|
"--to", b["id"], "--name", "KNOWS")
|
|
listed = run_gx(env_dirs, "rel", "list", "--from", a["id"])
|
|
assert listed["count"] == 1
|
|
assert listed["rows"][0]["name"] == "KNOWS"
|
|
assert listed["rows"][0]["to_entity"] == b["id"]
|
|
|
|
def test_rel_delete(self, env_dirs):
|
|
a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
|
|
b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
|
|
rel = run_gx(env_dirs, "rel", "create", "--from", a["id"],
|
|
"--to", b["id"])
|
|
run_gx(env_dirs, "rel", "delete", rel["id"])
|
|
listed = run_gx(env_dirs, "rel", "list", "--from", a["id"])
|
|
assert listed["count"] == 0
|
|
|
|
def test_node_delete_cascades_relations(self, env_dirs):
|
|
a = run_gx(env_dirs, "node", "create", "--name", "a", "--type", "text")
|
|
b = run_gx(env_dirs, "node", "create", "--name", "b", "--type", "text")
|
|
run_gx(env_dirs, "rel", "create", "--from", a["id"], "--to", b["id"])
|
|
run_gx(env_dirs, "node", "delete", a["id"])
|
|
# Sin a, no debe quedar relacion que lo apunte como from.
|
|
cn = sqlite3.connect(env_dirs["ops"])
|
|
rows = cn.execute(
|
|
"SELECT id FROM relations WHERE from_entity=? OR to_entity=?",
|
|
(a["id"], a["id"])
|
|
).fetchall()
|
|
cn.close()
|
|
assert rows == []
|
|
|
|
|
|
class TestCliEnricherRun:
|
|
"""`enricher run` encola un job. Debe escribir un fichero JSON en
|
|
`<app_dir>/agent_jobs_queue/` (NO insertar en SQL). Esto blinda
|
|
contra la regresion del bug "disk I/O error" que aparecia cuando
|
|
gx-cli (en WSL) escribia agent_jobs en graph_explorer.db abierta
|
|
en WAL desde Windows."""
|
|
|
|
def _make_enricher(self, env_dirs, eid: str = "split_sentences"):
|
|
"""Crea un manifest dummy en el dir de enrichers para que la
|
|
validacion de gx-cli (`if manifest.yaml exists`) pase."""
|
|
edir = env_dirs["dir"] / "enrichers" / eid
|
|
edir.mkdir(parents=True, exist_ok=True)
|
|
(edir / "manifest.yaml").write_text(
|
|
f"id: {eid}\nname: x\napplies_to: [text]\n", encoding="utf-8"
|
|
)
|
|
|
|
def test_enricher_run_writes_json_file(self, env_dirs):
|
|
self._make_enricher(env_dirs, "split_sentences")
|
|
node = run_gx(env_dirs, "node", "create", "--name", "doc",
|
|
"--type", "text")
|
|
out = run_gx(env_dirs, "enricher", "run", "split_sentences",
|
|
"--node", node["id"])
|
|
# 1 fichero JSON dropped en agent_jobs_queue/
|
|
queue = env_dirs["dir"] / "agent_jobs_queue"
|
|
files = list(queue.glob("*.json"))
|
|
assert len(files) == 1
|
|
payload = json.loads(files[0].read_text(encoding="utf-8"))
|
|
assert payload["enricher_id"] == "split_sentences"
|
|
assert payload["node_id"] == node["id"]
|
|
assert payload["id"] == out["request_id"]
|
|
assert "params_json" in payload
|
|
# Naming: el fichero se llama <req_id>.json, no .tmp
|
|
assert files[0].name == f"{out['request_id']}.json"
|
|
|
|
def test_enricher_run_does_not_use_sqlite(self, env_dirs):
|
|
"""Regresion del bug WAL cross-9p: el flujo NO debe abrir ni
|
|
crear la tabla agent_jobs en graph_explorer.db."""
|
|
self._make_enricher(env_dirs, "split_sentences")
|
|
node = run_gx(env_dirs, "node", "create", "--name", "doc",
|
|
"--type", "text")
|
|
run_gx(env_dirs, "enricher", "run", "split_sentences",
|
|
"--node", node["id"])
|
|
# graph_explorer.db NO debe haber recibido tabla agent_jobs.
|
|
cn = sqlite3.connect(env_dirs["app"])
|
|
try:
|
|
row = cn.execute(
|
|
"SELECT name FROM sqlite_master "
|
|
"WHERE type='table' AND name='agent_jobs'"
|
|
).fetchone()
|
|
finally:
|
|
cn.close()
|
|
assert row is None, \
|
|
"agent_jobs table re-creada en graph_explorer.db; el queue " \
|
|
"debe vivir en ficheros, NO en SQLite (cross-9p WAL falla)"
|
|
|
|
def test_enricher_run_unknown_enricher_errors(self, env_dirs):
|
|
out = run_gx(env_dirs, "enricher", "run", "no_existe",
|
|
expect_ok=False)
|
|
assert out.get("ok") is False
|
|
assert "not found" in (out.get("error") or "").lower()
|
|
|
|
def test_enricher_run_unknown_node_errors(self, env_dirs):
|
|
self._make_enricher(env_dirs, "split_sentences")
|
|
out = run_gx(env_dirs, "enricher", "run", "split_sentences",
|
|
"--node", "node_inexistente", expect_ok=False)
|
|
assert out.get("ok") is False
|
|
assert "node not found" in (out.get("error") or "").lower()
|
|
|
|
def test_enricher_run_atomic_write(self, env_dirs):
|
|
"""No deben quedar ficheros .tmp tras un encolado exitoso."""
|
|
self._make_enricher(env_dirs, "split_sentences")
|
|
node = run_gx(env_dirs, "node", "create", "--name", "doc",
|
|
"--type", "text")
|
|
run_gx(env_dirs, "enricher", "run", "split_sentences",
|
|
"--node", node["id"])
|
|
queue = env_dirs["dir"] / "agent_jobs_queue"
|
|
tmp_files = list(queue.glob("*.tmp"))
|
|
assert tmp_files == []
|
|
|
|
def test_enricher_run_queue_dir_derives_from_app_db(self, env_dirs,
|
|
tmp_path):
|
|
"""REGRESION: el queue_dir debe vivir junto a GX_APP_DB (que es
|
|
donde main.cpp lo escanea), NO junto a GX_APP_DIR. En el
|
|
deploy real chat.cpp setea GX_APP_DIR al repo fuente y
|
|
GX_APP_DB al install Windows — direcciones distintas. gx-cli
|
|
DEBE alinearse con APP_DB."""
|
|
self._make_enricher(env_dirs, "split_sentences")
|
|
|
|
# Mover el GX_APP_DB a un dir diferente, manteniendo GX_APP_DIR
|
|
# apuntando al original (que tiene los manifests de enrichers).
|
|
db_only_dir = tmp_path / "install_dir"
|
|
db_only_dir.mkdir()
|
|
new_app_db = db_only_dir / "graph_explorer.db"
|
|
# Crear el schema vacio en la nueva ubicacion.
|
|
cn = sqlite3.connect(new_app_db)
|
|
cn.executescript(APP_SCHEMA)
|
|
cn.commit()
|
|
cn.close()
|
|
|
|
env = dict(env_dirs["env"])
|
|
env["GX_APP_DB"] = str(new_app_db)
|
|
# GX_APP_DIR queda en env_dirs["dir"] donde estan los enrichers.
|
|
|
|
node = run_gx({**env_dirs, "env": env}, "node", "create",
|
|
"--name", "x", "--type", "text")
|
|
run_gx({**env_dirs, "env": env}, "enricher", "run",
|
|
"split_sentences", "--node", node["id"])
|
|
|
|
# El JSON DEBE estar junto al nuevo APP_DB.
|
|
files_in_new_db_dir = list(
|
|
(db_only_dir / "agent_jobs_queue").glob("*.json"))
|
|
files_in_app_dir = list(
|
|
(env_dirs["dir"] / "agent_jobs_queue").glob("*.json"))
|
|
assert len(files_in_new_db_dir) == 1, \
|
|
"queue file no aparecio junto al GX_APP_DB"
|
|
assert files_in_app_dir == [], \
|
|
"queue file aparecio junto al GX_APP_DIR (regresion del bug)"
|
|
|
|
def test_enricher_run_writes_log_to_gx_cli_log(self, env_dirs):
|
|
"""Los logs persistentes deben acabar en gx-cli.log junto a
|
|
graph_explorer.db para auditoria del agente Echo."""
|
|
self._make_enricher(env_dirs, "split_sentences")
|
|
node = run_gx(env_dirs, "node", "create", "--name", "logged",
|
|
"--type", "text")
|
|
run_gx(env_dirs, "enricher", "run", "split_sentences",
|
|
"--node", node["id"])
|
|
log_file = env_dirs["dir"] / "gx-cli.log"
|
|
assert log_file.exists()
|
|
content = log_file.read_text(encoding="utf-8")
|
|
# Algun log de node_create + alguno relacionado al enricher.
|
|
assert "node_create" in content
|
|
# El _log de enricher_run no se llama desde cmd_enricher_run
|
|
# actualmente (escribe directo a stderr). Si en el futuro se
|
|
# anyade, este assert lo cubrira automaticamente — por ahora
|
|
# basta con que node_create haya escrito.
|
|
|
|
|
|
class TestCliQuery:
|
|
def test_query_select(self, env_dirs):
|
|
run_gx(env_dirs, "node", "create", "--name", "q1", "--type", "text")
|
|
run_gx(env_dirs, "node", "create", "--name", "q2", "--type", "text")
|
|
out = run_gx(env_dirs, "query", "SELECT count(*) AS n FROM entities")
|
|
# gx-cli stringifica los valores en query (cmd_query usa str(v));
|
|
# esperar la version string es coherente con el contrato actual.
|
|
assert out["rows"][0]["n"] == "2"
|
|
|
|
def test_query_rejects_writes(self, env_dirs):
|
|
out = run_gx(env_dirs, "query",
|
|
"DELETE FROM entities WHERE 1=1", expect_ok=False)
|
|
assert out.get("ok") is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MCP dispatcher: importa gx-cli como modulo, llama _mcp_dispatch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def gx_module():
|
|
"""Carga gx-cli como modulo Python para acceder a MCP_DISPATCH.
|
|
|
|
El script no tiene extension .py, asi que `spec_from_file_location`
|
|
no detecta loader — hay que pasar `SourceFileLoader` explicito.
|
|
"""
|
|
loader = SourceFileLoader("gxcli", str(GX))
|
|
spec = importlib.util.spec_from_loader("gxcli", loader)
|
|
mod = importlib.util.module_from_spec(spec)
|
|
loader.exec_module(mod)
|
|
return mod
|
|
|
|
|
|
@pytest.fixture
|
|
def mcp_env(env_dirs, monkeypatch):
|
|
"""Activa el env de gx-cli en el proceso (gx-cli lee os.environ)."""
|
|
monkeypatch.setenv("GX_OPS_DB", str(env_dirs["ops"]))
|
|
monkeypatch.setenv("GX_APP_DB", str(env_dirs["app"]))
|
|
monkeypatch.setenv("GX_APP_DIR", str(env_dirs["dir"]))
|
|
return env_dirs
|
|
|
|
|
|
class TestMcpDispatcher:
|
|
"""Verifica que MCP_DISPATCH entrega a cada cmd_* todos los kwargs
|
|
que necesita aunque el LLM omita campos opcionales — bug original:
|
|
`args.notes` no existia porque MCP_DISPATCH no lo defaulteaba."""
|
|
|
|
def test_node_create_sin_notes_no_revienta(self, gx_module, mcp_env):
|
|
out = gx_module._mcp_dispatch("node_create",
|
|
{"name": "x", "type": "text"})
|
|
assert out.get("ok") is True, out
|
|
|
|
def test_node_create_con_notes(self, gx_module, mcp_env):
|
|
out = gx_module._mcp_dispatch("node_create", {
|
|
"name": "y", "type": "text", "notes": "hola mundo"
|
|
})
|
|
assert out["ok"] is True
|
|
row = fetchone(mcp_env["ops"],
|
|
"SELECT notes FROM entities WHERE id=?", out["id"])
|
|
assert row[0] == "hola mundo"
|
|
|
|
def test_node_update_sin_notes_no_revienta(self, gx_module, mcp_env):
|
|
created = gx_module._mcp_dispatch("node_create",
|
|
{"name": "z", "type": "text"})
|
|
out = gx_module._mcp_dispatch("node_update",
|
|
{"id": created["id"], "name": "zz"})
|
|
assert out["ok"] is True
|
|
|
|
def test_node_update_replace_notes(self, gx_module, mcp_env):
|
|
created = gx_module._mcp_dispatch("node_create",
|
|
{"name": "u1", "type": "text"})
|
|
gx_module._mcp_dispatch("node_update", {
|
|
"id": created["id"], "notes": "primero",
|
|
})
|
|
row = fetchone(mcp_env["ops"],
|
|
"SELECT notes FROM entities WHERE id=?", created["id"])
|
|
assert row[0] == "primero"
|
|
|
|
def test_node_update_append_notes(self, gx_module, mcp_env):
|
|
created = gx_module._mcp_dispatch("node_create",
|
|
{"name": "u2", "type": "text"})
|
|
gx_module._mcp_dispatch("node_update", {
|
|
"id": created["id"], "notes": "primero",
|
|
})
|
|
gx_module._mcp_dispatch("node_update", {
|
|
"id": created["id"], "notes": "segundo", "append_notes": True,
|
|
})
|
|
row = fetchone(mcp_env["ops"],
|
|
"SELECT notes FROM entities WHERE id=?", created["id"])
|
|
assert row[0] == "primero\n\nsegundo"
|
|
|
|
def test_dispatch_returns_error_for_unknown_tool(self, gx_module, mcp_env):
|
|
out = gx_module._mcp_dispatch("not_a_tool", {})
|
|
assert out["ok"] is False
|
|
assert "unknown tool" in out["error"]
|
|
|
|
def test_all_dispatch_entries_have_callable(self, gx_module):
|
|
"""Sanity: MCP_DISPATCH no tiene tools sin funcion. Si en el futuro
|
|
alguien anyade una entrada y olvida cablear el cmd_*, este test
|
|
falla."""
|
|
for name, (fn, defaults) in gx_module.MCP_DISPATCH.items():
|
|
assert callable(fn), f"{name} no tiene cmd_* callable"
|
|
assert isinstance(defaults, dict), f"{name} defaults no es dict"
|
|
|
|
def test_mcp_tools_match_dispatch_keys(self, gx_module):
|
|
"""El listado MCP_TOOLS (lo que el LLM ve) debe alinearse 1:1 con
|
|
las claves de MCP_DISPATCH (lo que el server ejecuta). Si un tool
|
|
anuncia capacidades pero el dispatch no las tiene, es un bug."""
|
|
tool_names = {t["name"] for t in gx_module.MCP_TOOLS}
|
|
dispatch_names = set(gx_module.MCP_DISPATCH.keys())
|
|
missing_in_dispatch = tool_names - dispatch_names
|
|
missing_in_tools = dispatch_names - tool_names
|
|
assert not missing_in_dispatch, \
|
|
f"tools sin dispatch: {missing_in_dispatch}"
|
|
assert not missing_in_tools, \
|
|
f"dispatch sin tool: {missing_in_tools}"
|
|
|
|
|
|
class TestMcpRegression0035d:
|
|
"""El bug que el agente Echo encontro: `args.notes` no existia en
|
|
SimpleNamespace porque MCP_DISPATCH no lo defaulteaba. Estos tests
|
|
fijan el contrato para que no vuelva a romperse silenciosamente."""
|
|
|
|
def test_node_create_dispatch_includes_notes_default(self, gx_module):
|
|
_, defaults = gx_module.MCP_DISPATCH["node_create"]
|
|
assert "notes" in defaults
|
|
assert defaults["notes"] is None
|
|
|
|
def test_node_update_dispatch_includes_notes_and_append(self, gx_module):
|
|
_, defaults = gx_module.MCP_DISPATCH["node_update"]
|
|
assert "notes" in defaults
|
|
assert defaults["notes"] is None
|
|
assert "append_notes" in defaults
|
|
assert defaults["append_notes"] is False
|
|
|
|
def test_node_create_inputschema_advertises_notes(self, gx_module):
|
|
tool = next(t for t in gx_module.MCP_TOOLS
|
|
if t["name"] == "node_create")
|
|
assert "notes" in tool["inputSchema"]["properties"]
|
|
|
|
def test_node_update_inputschema_advertises_notes_and_append(self, gx_module):
|
|
tool = next(t for t in gx_module.MCP_TOOLS
|
|
if t["name"] == "node_update")
|
|
props = tool["inputSchema"]["properties"]
|
|
assert "notes" in props
|
|
assert "append_notes" in props
|