Files
graph_explorer/gx-cli
T
egutierrez 652ff6f02c fix(agent_jobs): queue dir desde GX_APP_DB, no GX_APP_DIR + logs verbosos
Bug derivado del fix anterior: gx-cli escribia ficheros JSON en
`$GX_APP_DIR/agent_jobs_queue/` (apuntando al repo fuente) mientras
main.cpp escaneaba `parent(g_layout_db_path)/agent_jobs_queue/`
(install Windows). Dos directorios distintos -> jobs huerfanos.

Echo reportaba "encolado" pero el worker nunca veia los ficheros.
La causa: chat.cpp setea GX_APP_DIR=<registry>/projects/osint_graph/
apps/graph_explorer y GX_APP_DB=<install>/local_files/projects/<slug>/
graph_explorer.db. Dos sitios. Solo APP_DB coincide con donde
graph_explorer.exe escanea (parent del .db).

Fix:

* gx-cli cmd_enricher_run: queue_dir = parent(GX_APP_DB) /
  agent_jobs_queue. Alineado con main.cpp.
* gx-cli: nuevo helper `_log(tag, msg)` que escribe a stderr Y a
  `<parent(app_db)>/gx-cli.log` para auditoria persistente. Cubre
  node_create, node_update, node_delete, rel_create, enricher_run.
* gx-cli mcp _mcp_log tambien persiste a gx-cli.log.
* main.cpp: log el queue scan dir una vez por sesion para detectar
  mismatches a futuro.
* .gitignore: agent_jobs_queue/ y gx-cli.log son runtime, no se
  commitean.

Tests:

* test_enricher_run_queue_dir_derives_from_app_db (regresion)
  configura GX_APP_DB en un dir distinto de GX_APP_DIR y verifica
  que el JSON aterriza junto a APP_DB.
* test_enricher_run_writes_log_to_gx_cli_log valida la auditoria.

WSL 81 / Windows 70 + 11 skipped.
2026-05-03 16:32:22 +02:00

1124 lines
42 KiB
Python
Executable File

#!/usr/bin/env python3
"""
gx-cli — CLI used by the Claude agent behind graph_explorer's Chat panel.

Exposes CRUD/promote/enricher/query commands over the active operations.db.
After every mutation it rewrites the `.mutations.marker` file that lives next
to graph_explorer.db so the viewport refreshes automatically.

Env vars (set by graph_explorer when it spawns `claude -p`):
  GX_OPS_DB   absolute path to operations.db (mutable)
  GX_APP_DB   absolute path to graph_explorer.db; its parent directory holds
              the mutation marker, agent_jobs_queue/ and gx-cli.log
  GX_APP_DIR  path to the app directory (for enrichers/cache)

Output:
  - Read commands: JSON on stdout.
  - Mutations: JSON shaped like `{"ok": true, "id": "...", ...}`.
  - Errors: JSON shaped like `{"ok": false, "error": "..."}` and a non-zero
    exit code (1 by default, 2 for "not found").
"""
from __future__ import annotations
import argparse
import json
import os
import sqlite3
import sys
import time
from contextlib import redirect_stdout
from datetime import datetime, timezone
from io import StringIO
from pathlib import Path
from types import SimpleNamespace
# ----------------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------------
def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    The value has one-second resolution and always carries the ``Z`` suffix.
    """
    utc_now = datetime.now(tz=timezone.utc).replace(microsecond=0)
    # isoformat() renders the UTC offset as "+00:00"; the file's convention
    # is the shorter "Z" designator.
    return utc_now.isoformat().replace("+00:00", "Z")
def _now_ms() -> int:
    """Return the current Unix time truncated to whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
def _ops_db() -> str:
    """Return the operations.db path configured in ``GX_OPS_DB``.

    Reports an error through ``_die`` when the variable is unset or empty,
    or when the path it names does not exist.
    """
    configured = os.environ.get("GX_OPS_DB", "")
    if configured == "":
        _die("GX_OPS_DB env var is empty")
    db_file = Path(configured)
    if not db_file.exists():
        _die(f"GX_OPS_DB does not exist: {configured}")
    return configured
def _app_db() -> str:
    """Return the graph_explorer.db path configured in ``GX_APP_DB``.

    Only emptiness is checked; the file itself is not required to exist.
    Reports an error through ``_die`` when the variable is unset or empty.
    """
    configured = os.environ.get("GX_APP_DB", "")
    if configured:
        return configured
    _die("GX_APP_DB env var is empty")
    return configured
def _log(tag: str, msg: str) -> None:
    """Write an audit line to stderr and append it to ``gx-cli.log``.

    The log file lives in the directory that contains ``GX_APP_DB`` (the
    same directory ``_bump_counter`` uses for ``.mutations.marker``). File
    persistence is best effort: it is skipped when ``GX_APP_DB`` is unset
    and any ``OSError`` while appending is ignored, so logging never breaks
    the command being audited.
    """
    entry = f"[gx-cli {tag}] {msg}\n"
    stream = sys.stderr
    stream.write(entry)
    stream.flush()

    db_path = os.environ.get("GX_APP_DB", "")
    if not db_path:
        return
    try:
        target = Path(db_path).parent / "gx-cli.log"
        with target.open("a", encoding="utf-8") as handle:
            # Persisted lines are prefixed with a UTC timestamp.
            handle.write(f"{_now_iso()} {entry}")
    except OSError:
        pass
def _emit(payload: dict) -> None:
    """Print *payload* to stdout as a single JSON line.

    Non-ASCII text is emitted as-is, and values the JSON encoder cannot
    handle natively are converted with ``str``.
    """
    encoded = json.dumps(payload, ensure_ascii=False, default=str)
    print(encoded)
def _ok(**kwargs) -> None:
    """Emit a success payload: ``{"ok": true}`` merged with *kwargs*.

    ``ok`` is always the first key; a caller-supplied ``ok`` keyword
    overrides its value.
    """
    _emit({"ok": True, **kwargs})
def _die(msg: str, code: int = 1) -> None:
    """Emit ``{"ok": false, "error": msg}`` and terminate with *code*.

    Termination is done by raising ``SystemExit``, so ``finally`` blocks in
    callers still run and ``_mcp_call_cmd`` can intercept it.
    """
    failure = {"ok": False, "error": msg}
    _emit(failure)
    raise SystemExit(code)
def _connect(path: str, *, readonly: bool = False) -> sqlite3.Connection:
    """Open a SQLite connection whose rows are ``sqlite3.Row`` objects.

    Args:
        path: Filesystem path of the database.
        readonly: When true, open through a ``file:`` URI with ``mode=ro``
            so that writes are rejected by SQLite itself.

    Returns:
        The open connection; the caller is responsible for closing it.
    """
    if readonly:
        # Function-scope import: urllib is not among the module-level imports.
        from urllib.parse import quote

        # Percent-encode characters that carry meaning inside a URI. An
        # unescaped "#" starts a fragment (silently dropping "?mode=ro" and
        # opening a different file read-write) and "?" starts the query
        # string. "/", ":" and "\\" are kept verbatim so ordinary POSIX and
        # Windows drive paths produce exactly the URI they did before.
        uri = "file:" + quote(path, safe="/:\\") + "?mode=ro"
        cn = sqlite3.connect(uri, uri=True)
    else:
        cn = sqlite3.connect(path)
    cn.row_factory = sqlite3.Row
    return cn
def _bump_counter(op: str, detail: str = "") -> None:
    """Rewrite the ``.mutations.marker`` file next to graph_explorer.db.

    The marker holds three lines: the current time in milliseconds, *op*
    and *detail*. A write failure is reported on stderr as a warning and
    otherwise ignored.

    Per the original author's notes, a plain file is used instead of a
    SQLite table because graph_explorer.db is open in WAL mode on the
    Windows side while gx-cli writes from WSL, and WAL locking fails
    silently across that filesystem boundary; graph_explorer polls the
    marker to refresh its viewport.
    """
    marker_path = Path(_app_db()).parent / ".mutations.marker"
    try:
        fields = (_now_ms(), op, detail)
        body = "".join(f"{field}\n" for field in fields)
        marker_path.write_text(body, encoding="utf-8")
    except OSError as err:
        sys.stderr.write(f"[gx-cli] warn: marker write failed: {err}\n")
# ----------------------------------------------------------------------------
# Detect type heuristic (mirror de entity_ops::detect_type)
# ----------------------------------------------------------------------------
def _detect_type(text: str) -> str:
    """Guess an entity type from free text.

    Checks run in a fixed order and the first hit wins: ``email``,
    ``ip_address``, ``url``, ``domain``, ``phone``. Empty or unmatched
    input yields ``text``. The patterns are purely syntactic (for example
    IP octets are not range-checked).
    """
    import re

    candidate = (text or "").strip()
    if candidate == "":
        return "text"
    lowered = candidate.lower()

    # Ordered (label, predicate) table; evaluation stops at the first match.
    checks = (
        ("email",
         lambda: re.fullmatch(r"[^\s@]+@[^\s@]+\.[^\s@]+", candidate)),
        ("ip_address",
         lambda: re.fullmatch(r"(\d{1,3}\.){3}\d{1,3}", candidate)),
        ("url",
         lambda: lowered.startswith(("http://", "https://"))),
        ("domain",
         lambda: re.fullmatch(r"[a-z0-9-]+(\.[a-z0-9-]+)+", lowered)),
        ("phone",
         lambda: re.fullmatch(r"\+?\d[\d\s-]{6,}\d", candidate)),
    )
    for label, matches in checks:
        if matches():
            return label
    return "text"
# ----------------------------------------------------------------------------
# node ops
# ----------------------------------------------------------------------------
def cmd_node_create(args) -> None:
    """Insert a new entity into operations.db.

    Reads ``args.name``, ``args.type``, ``args.description`` and
    ``args.notes``. When no type is given it is inferred from the name with
    ``_detect_type``. The id is ``<type>_<epoch-ms>``. An integrity error
    on insert is reported through ``_die``; on success the mutation marker
    is bumped and the new id is emitted.
    """
    entity_name = args.name
    kind = args.type or _detect_type(entity_name)
    entity_id = f"{kind}_{_now_ms()}"
    stamp = _now_iso()
    notes_text = args.notes or ""
    record = (
        entity_id,
        entity_name,
        kind,
        args.description or "",
        notes_text,
        "agent:gx-cli",
        stamp,
        stamp,
    )
    _log(
        "node_create",
        f"name={entity_name!r} type={kind} notes_len={len(notes_text)} "
        f"id={entity_id}",
    )

    db = _connect(_ops_db())
    try:
        db.execute(
            "INSERT INTO entities (id, name, type_ref, description, notes, "
            "source, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            record,
        )
        db.commit()
    except sqlite3.IntegrityError as err:
        _log("node_create", f"FAILED insert: {err}")
        _die(f"insert failed: {err}")
    finally:
        db.close()

    _bump_counter("node.create", entity_id)
    _ok(id=entity_id, name=entity_name, type_ref=kind)
def cmd_node_delete(args) -> None:
    """Delete the entity ``args.id`` and every relation touching it.

    Both deletes are committed before the "not found" check, so relations
    that reference a missing entity id are still removed. A missing entity
    is reported through ``_die`` with exit code 2.
    """
    target = args.id
    _log("node_delete", f"id={target}")

    db = _connect(_ops_db())
    try:
        entity_cursor = db.execute(
            "DELETE FROM entities WHERE id = ?", (target,)
        )
        db.execute(
            "DELETE FROM relations WHERE from_entity = ? OR to_entity = ?",
            (target, target),
        )
        db.commit()
        if entity_cursor.rowcount == 0:
            _log("node_delete", f"FAILED not found: {target}")
            _die(f"entity not found: {target}", code=2)
    finally:
        db.close()

    _bump_counter("node.delete", target)
    _ok(id=target)
def cmd_node_update(args) -> None:
    """Update selected columns of the entity ``args.id``.

    Only attributes that are not ``None`` are written, in this order:
    name, type, status, description, notes, tags. ``updated_at`` is always
    refreshed. Validation failures (unknown status, malformed tags, nothing
    to update) and a missing entity are reported through ``_die``.

    With ``append_notes`` and a non-empty ``notes``, the text is appended
    to the stored notes after a blank-line separator; otherwise ``notes``
    replaces the stored value. ``tags`` is a JSON array literal or a
    comma-separated list.
    """
    assignments = []
    values = []

    def put(clause, value):
        # Keep the SET clause and its bound value in lockstep.
        assignments.append(clause)
        values.append(value)

    if args.name is not None:
        put("name = ?", args.name)
    if args.type is not None:
        put("type_ref = ?", args.type)
    if args.status is not None:
        if args.status not in ("active", "stale", "corrupted", "archived"):
            _die(f"invalid status: {args.status}")
        put("status = ?", args.status)
    if args.description is not None:
        put("description = ?", args.description)
    if args.notes is not None:
        if args.append_notes and args.notes:
            # Blank-line separator keeps appended text readable.
            put("notes = COALESCE(notes, '') || ?", "\n\n" + args.notes)
        else:
            put("notes = ?", args.notes)
    if args.tags is not None:
        try:
            if args.tags.startswith("["):
                parsed_tags = json.loads(args.tags)
            else:
                parsed_tags = [
                    piece.strip()
                    for piece in args.tags.split(",")
                    if piece.strip()
                ]
            if not isinstance(parsed_tags, list):
                raise ValueError("tags must be array")
        except (json.JSONDecodeError, ValueError) as err:
            _die(f"bad tags: {err}")
        put("tags = ?", json.dumps(parsed_tags))

    if not assignments:
        _die("no fields to update")

    # Column names the caller changed (excludes the implicit updated_at).
    changed = [clause.split(" = ")[0] for clause in assignments]
    put("updated_at = ?", _now_iso())
    values.append(args.id)
    _log("node_update", f"id={args.id} fields={changed}")

    db = _connect(_ops_db())
    try:
        outcome = db.execute(
            f"UPDATE entities SET {', '.join(assignments)} WHERE id = ?",
            values,
        )
        db.commit()
        if outcome.rowcount == 0:
            _log("node_update", f"FAILED not found: {args.id}")
            _die(f"entity not found: {args.id}", code=2)
    finally:
        db.close()

    _bump_counter("node.update", args.id)
    _ok(id=args.id, fields_set=len(changed))
def cmd_node_list(args) -> None:
    """List entities, optionally filtered by ``args.type`` / ``args.status``.

    Rows are ordered by type then name; ``args.limit`` is clamped to the
    range 1..1000. Emits ``{"ok": true, "count": N, "rows": [...]}``.
    """
    sql = "SELECT id, name, type_ref, status, updated_at FROM entities"
    where, params = [], []
    if args.type:
        where.append("type_ref = ?")
        params.append(args.type)
    if args.status:
        where.append("status = ?")
        params.append(args.status)
    if where:
        sql += " WHERE " + " AND ".join(where)
    sql += " ORDER BY type_ref, name LIMIT ?"
    params.append(max(1, min(args.limit, 1000)))

    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = [dict(r) for r in cn.execute(sql, params).fetchall()]
    finally:
        # Close even when the query raises: the MCP server is long-lived
        # and would otherwise accumulate open handles.
        cn.close()
    _emit({"ok": True, "count": len(rows), "rows": rows})
def cmd_node_show(args) -> None:
    """Emit the full entity ``args.id`` together with its 1-hop neighbours.

    ``metadata`` and ``tags`` are decoded from JSON when they hold valid
    JSON text and left untouched otherwise. Each neighbour records the
    relation id and name, the id at the other end and the direction
    (``out`` when this entity is ``from_entity``, else ``in``). A missing
    entity is reported through ``_die`` with exit code 2.
    """
    cn = _connect(_ops_db(), readonly=True)
    try:
        row = cn.execute(
            "SELECT * FROM entities WHERE id = ?", (args.id,)
        ).fetchone()
        if row is None:
            _die(f"entity not found: {args.id}", code=2)
        rels = cn.execute(
            "SELECT id, name, from_entity, to_entity FROM relations "
            "WHERE from_entity = ? OR to_entity = ?",
            (args.id, args.id),
        ).fetchall()
    finally:
        # Single exit point for the connection, including error paths.
        cn.close()

    rec = dict(row)
    # metadata and tags are stored as JSON strings; decode for readability.
    for k in ("metadata", "tags"):
        if k in rec and isinstance(rec[k], str):
            try:
                rec[k] = json.loads(rec[k])
            except json.JSONDecodeError:
                pass

    neighbors = []
    for r in rels:
        outgoing = r["from_entity"] == args.id
        neighbors.append({
            "rel_id": r["id"],
            "rel_name": r["name"],
            "other_id": r["to_entity"] if outgoing else r["from_entity"],
            "direction": "out" if outgoing else "in",
        })
    _emit({"ok": True, "entity": rec, "neighbors": neighbors})
def cmd_node_search(args) -> None:
    """Run an FTS5 prefix search over ``entities_fts``.

    ``args.query`` is split on whitespace; tokens made only of
    alphanumerics, ``-`` and ``_`` are kept and OR-ed together as prefix
    terms. Results are ordered by ``bm25`` rank and ``args.limit`` is
    clamped to 1..200. An empty query, a query with no usable tokens and
    an FTS failure are all reported through ``_die``.
    """
    q = (args.query or "").strip()
    if not q:
        _die("empty query")
    tokens = [
        t for t in q.split()
        if t.replace("-", "").replace("_", "").isalnum()
    ]
    if not tokens:
        _die("no usable tokens after sanitization")
    # Quote every token so FTS5 treats it as a string, not a bareword.
    # Unquoted, a hyphen ("foo-bar") or an operator word (AND, OR, NOT,
    # NEAR) is parsed as query syntax and the search fails or changes
    # meaning. Accepted tokens never contain a double quote, so no escaping
    # is needed; `"tok"*` is the quoted form of a prefix query.
    fts = " OR ".join(f'"{t}"*' for t in tokens)
    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = cn.execute(
            "SELECT e.id, e.name, e.type_ref, e.status, "
            " bm25(entities_fts) AS rank "
            "FROM entities_fts f JOIN entities e ON e.id = f.id "
            "WHERE entities_fts MATCH ? ORDER BY rank LIMIT ?",
            (fts, max(1, min(args.limit, 200))),
        ).fetchall()
    except sqlite3.OperationalError as e:
        _die(f"FTS not available: {e}")
    finally:
        cn.close()
    _emit({"ok": True, "count": len(rows), "rows": [dict(r) for r in rows]})
# ----------------------------------------------------------------------------
# relation ops
# ----------------------------------------------------------------------------
def cmd_rel_create(args) -> None:
    """Create a directed relation ``args.from_id`` -> ``args.to_id``.

    The relation name defaults to ``RELATED_TO`` and the id is
    ``rel_<epoch-ms>``. Both endpoints must already exist in ``entities``;
    the first missing one is reported through ``_die`` with exit code 2
    and nothing is inserted.
    """
    rel_id = f"rel_{_now_ms()}"
    stamp = _now_iso()
    rel_name = args.name if args.name else "RELATED_TO"
    source_id, target_id = args.from_id, args.to_id
    _log(
        "rel_create",
        f"from={source_id} to={target_id} name={rel_name} id={rel_id}",
    )

    db = _connect(_ops_db())
    try:
        # Verify both endpoints before inserting.
        for endpoint in (source_id, target_id):
            found = db.execute(
                "SELECT 1 FROM entities WHERE id = ?", (endpoint,)
            ).fetchone()
            if found is None:
                _log("rel_create", f"FAILED entity not found: {endpoint}")
                _die(f"entity not found: {endpoint}", code=2)
        db.execute(
            "INSERT INTO relations (id, name, from_entity, to_entity, "
            "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
            (rel_id, rel_name, source_id, target_id, stamp, stamp),
        )
        db.commit()
    finally:
        db.close()

    _bump_counter("rel.create", rel_id)
    _ok(id=rel_id, name=rel_name, from_id=source_id, to_id=target_id)
def cmd_rel_delete(args) -> None:
    """Delete the relation ``args.id``.

    A missing relation is reported through ``_die`` with exit code 2; on
    success the mutation marker is bumped and the id is emitted.
    """
    target = args.id
    db = _connect(_ops_db())
    try:
        outcome = db.execute("DELETE FROM relations WHERE id = ?", (target,))
        db.commit()
        if outcome.rowcount == 0:
            _die(f"relation not found: {target}", code=2)
    finally:
        db.close()

    _bump_counter("rel.delete", target)
    _ok(id=target)
def cmd_rel_list(args) -> None:
    """List relations, optionally filtered by either endpoint.

    ``args.from_id`` / ``args.to_id`` narrow the result when set; rows are
    ordered by name and ``args.limit`` is clamped to 1..2000. Emits
    ``{"ok": true, "count": N, "rows": [...]}``.
    """
    sql = "SELECT id, name, from_entity, to_entity, status FROM relations"
    where, params = [], []
    if args.from_id:
        where.append("from_entity = ?")
        params.append(args.from_id)
    if args.to_id:
        where.append("to_entity = ?")
        params.append(args.to_id)
    if where:
        sql += " WHERE " + " AND ".join(where)
    sql += " ORDER BY name LIMIT ?"
    params.append(max(1, min(args.limit, 2000)))

    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = [dict(r) for r in cn.execute(sql, params).fetchall()]
    finally:
        # Close even when the query raises, so a failing call inside the
        # long-lived MCP server does not leak the handle.
        cn.close()
    _emit({"ok": True, "count": len(rows), "rows": rows})
# ----------------------------------------------------------------------------
# table ops (Table-typed entities backed by DuckDB)
# ----------------------------------------------------------------------------
def cmd_table_list(args) -> None:
    """List entities of type ``Table`` with their DuckDB metadata.

    For each table the ``duckdb_path``, ``table_name``, ``row_type`` and
    ``columns`` keys are copied out of the JSON ``metadata`` column.
    Metadata that is missing, unparsable or not a JSON object yields the
    empty defaults instead of failing the whole listing.
    """
    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = cn.execute(
            "SELECT id, name, metadata FROM entities WHERE type_ref = 'Table' "
            "ORDER BY name"
        ).fetchall()
    finally:
        cn.close()

    out = []
    for r in rows:
        try:
            m = json.loads(r["metadata"] or "{}")
        except (json.JSONDecodeError, TypeError):
            m = {}
        if not isinstance(m, dict):
            # Valid JSON that is not an object (null, a list, a number)
            # has no .get(); treat it like absent metadata.
            m = {}
        out.append({
            "id": r["id"],
            "name": r["name"],
            "duckdb_path": m.get("duckdb_path", ""),
            "table_name": m.get("table_name", ""),
            "row_type": m.get("row_type", ""),
            "columns": m.get("columns", []),
        })
    _emit({"ok": True, "count": len(out), "tables": out})
def cmd_table_promote(args) -> None:
    """Promote one DuckDB row to a graph entity.

    The Table entity ``args.table_id`` supplies, through its JSON metadata,
    the DuckDB file, the table name and optional ``row_type`` (default
    ``row``), ``label_column`` (default ``name``) and ``id_column``
    (default ``id``). The row whose id column, cast to text, equals
    ``args.row_id`` is copied into a new entity ``prom_<row_type>_<row_id>``
    (spaces replaced by underscores) and linked from the table with a
    ``CONTAINS_ROW`` relation.

    The call is idempotent: when the promoted id already exists it is
    returned with ``promoted=False`` and nothing is written. A missing
    table, unusable metadata, a missing ``duckdb`` module and a missing row
    are all reported through ``_die``.
    """
    def quote_ident(name) -> str:
        # Double embedded quotes so a metadata-supplied identifier cannot
        # terminate the quoted name early.
        return '"' + str(name).replace('"', '""') + '"'

    cn = _connect(_ops_db())
    try:
        row = cn.execute(
            "SELECT metadata FROM entities WHERE id = ? AND type_ref = 'Table'",
            (args.table_id,),
        ).fetchone()
        if row is None:
            _die(f"Table not found: {args.table_id}", code=2)
        try:
            meta = json.loads(row["metadata"] or "{}")
        except json.JSONDecodeError as e:
            _die(f"Table {args.table_id} has invalid metadata JSON: {e}")
        if not isinstance(meta, dict):
            _die(f"Table {args.table_id} metadata is not a JSON object")

        duckdb_path = meta.get("duckdb_path", "")
        table_name = meta.get("table_name", "")
        row_type = meta.get("row_type") or "row"
        label_column = meta.get("label_column") or "name"
        id_column = meta.get("id_column") or "id"
        if not duckdb_path or not table_name:
            _die(f"Table {args.table_id} has no duckdb_path/table_name")
        # Relative DuckDB paths are resolved against operations.db's dir.
        if not Path(duckdb_path).is_absolute():
            duckdb_path = str(Path(_ops_db()).parent / duckdb_path)

        # Idempotence: the promoted id is deterministic, so an existing
        # entity with that id means the row was already promoted.
        prom_id = f"prom_{row_type}_{args.row_id}".replace(" ", "_")
        existing = cn.execute(
            "SELECT id FROM entities WHERE id = ?", (prom_id,)
        ).fetchone()
        if existing:
            _ok(id=existing["id"], promoted=False, message="already promoted")
            return

        try:
            import duckdb  # type: ignore
        except ImportError:
            _die("duckdb python module not installed")
        ddb = duckdb.connect(duckdb_path, read_only=True)
        try:
            cur = ddb.execute(
                f"SELECT * FROM {quote_ident(table_name)} "
                f"WHERE CAST({quote_ident(id_column)} AS VARCHAR) = ?",
                [args.row_id],
            )
            cols = [d[0] for d in cur.description]
            vals = cur.fetchone()
        finally:
            # Release the DuckDB handle even when the query fails.
            ddb.close()
        if vals is None:
            _die(f"row not found: {args.row_id}")

        row_dict = dict(zip(cols, vals))
        label = str(row_dict.get(label_column, args.row_id))
        ts = _now_iso()
        meta_out = {
            "source": {"duckdb": meta.get("duckdb_path"),
                       "table": table_name,
                       "row_id": args.row_id},
            # JSON-native values are kept; anything else is stringified.
            **{k: (v if isinstance(v, (str, int, float, bool)) or v is None
                   else str(v)) for k, v in row_dict.items()},
        }
        cn.execute(
            "INSERT INTO entities (id, name, type_ref, source, metadata, "
            "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (prom_id, label, row_type, "agent:gx-cli",
             json.dumps(meta_out, default=str), ts, ts),
        )
        # CONTAINS_ROW relation; INSERT OR IGNORE keeps it idempotent.
        rel_id = f"rel_contains_{prom_id}"
        cn.execute(
            "INSERT OR IGNORE INTO relations (id, name, from_entity, to_entity, "
            "created_at, updated_at) VALUES (?, 'CONTAINS_ROW', ?, ?, ?, ?)",
            (rel_id, args.table_id, prom_id, ts, ts),
        )
        cn.commit()
    finally:
        cn.close()
    _bump_counter("table.promote", prom_id)
    _ok(id=prom_id, promoted=True)
def cmd_table_demote(args) -> None:
    """Delete the promoted entity ``args.id`` and its relations.

    Only operations.db is touched. Both deletes are committed before the
    "not found" check; a missing entity is reported through ``_die`` with
    exit code 2.
    """
    target = args.id
    db = _connect(_ops_db())
    try:
        entity_cursor = db.execute(
            "DELETE FROM entities WHERE id = ?", (target,)
        )
        db.execute(
            "DELETE FROM relations WHERE from_entity = ? OR to_entity = ?",
            (target, target),
        )
        db.commit()
        if entity_cursor.rowcount == 0:
            _die(f"entity not found: {target}", code=2)
    finally:
        db.close()

    _bump_counter("table.demote", target)
    _ok(id=target)
def cmd_table_page(args) -> None:
    """Emit one page of rows from the DuckDB table behind a Table entity.

    The Table entity ``args.table_id`` supplies the DuckDB file, the table
    name, the optional column list and the id column (default ``id``)
    through its JSON metadata. Rows are ordered by the id column; the page
    size is ``args.limit`` clamped to 1..200 and the start is
    ``args.offset`` (negative values become 0). Every non-NULL cell is
    returned as a string. The payload also carries the table's total row
    count and echoes the requested offset and limit.
    """
    def quote_ident(name) -> str:
        # Double embedded quotes so a metadata-supplied identifier cannot
        # terminate the quoted name early.
        return '"' + str(name).replace('"', '""') + '"'

    cn = _connect(_ops_db(), readonly=True)
    try:
        row = cn.execute(
            "SELECT metadata FROM entities WHERE id = ? AND type_ref = 'Table'",
            (args.table_id,),
        ).fetchone()
    finally:
        cn.close()
    if row is None:
        _die(f"Table not found: {args.table_id}", code=2)
    try:
        meta = json.loads(row["metadata"] or "{}")
    except json.JSONDecodeError as e:
        _die(f"Table {args.table_id} has invalid metadata JSON: {e}")
    if not isinstance(meta, dict):
        _die(f"Table {args.table_id} metadata is not a JSON object")

    duckdb_path = meta.get("duckdb_path", "")
    table_name = meta.get("table_name", "")
    if not duckdb_path or not table_name:
        _die("metadata sin duckdb_path/table_name")
    # Relative DuckDB paths are resolved against operations.db's dir.
    if not Path(duckdb_path).is_absolute():
        duckdb_path = str(Path(_ops_db()).parent / duckdb_path)
    cols = meta.get("columns") or []
    id_col = meta.get("id_column") or "id"

    try:
        import duckdb  # type: ignore
    except ImportError:
        _die("duckdb python module not installed")
    ddb = duckdb.connect(duckdb_path, read_only=True)
    try:
        select = ", ".join(quote_ident(c) for c in [id_col, *cols])
        table_sql = quote_ident(table_name)
        sql = (f"SELECT {select} FROM {table_sql} "
               f"ORDER BY {quote_ident(id_col)} LIMIT ? OFFSET ?")
        cur = ddb.execute(
            sql, [max(1, min(args.limit, 200)), max(0, args.offset)]
        )
        names = [d[0] for d in cur.description]
        rows = [
            dict(zip(names, [str(v) if v is not None else None for v in r]))
            for r in cur.fetchall()
        ]
        total = ddb.execute(f"SELECT COUNT(*) FROM {table_sql}").fetchone()[0]
    finally:
        # Release the DuckDB handle even when a query fails.
        ddb.close()
    _emit({"ok": True, "table": args.table_id, "total": total,
           "offset": args.offset, "limit": args.limit, "rows": rows})
# ----------------------------------------------------------------------------
# enricher ops
# ----------------------------------------------------------------------------
def _app_dir() -> str:
    """Return the application directory.

    ``GX_APP_DIR`` is used when it is set and names an existing path;
    otherwise the directory containing this script is returned.
    """
    configured = os.environ.get("GX_APP_DIR", "")
    if not configured or not Path(configured).exists():
        # Fallback: the directory this script lives in.
        return str(Path(__file__).resolve().parent)
    return configured
def _enrichers_dir() -> Path:
    """Return the ``enrichers`` directory inside the application directory."""
    return Path(_app_dir(), "enrichers")
def cmd_enricher_list(args) -> None:
    """List the enrichers found under the enrichers directory.

    Every sub-entry containing a ``manifest.yaml`` counts as an enricher.
    When ``args.type`` is set, an enricher whose manifest declares a
    non-empty ``applies_to`` that does not include that type is skipped;
    enrichers without ``applies_to`` are always listed. A missing
    enrichers directory yields an empty list.
    """
    root = _enrichers_dir()
    found = []
    if root.is_dir():
        for entry in sorted(root.iterdir()):
            manifest_path = entry / "manifest.yaml"
            if not manifest_path.is_file():
                continue
            manifest = _parse_yaml_minimal(
                manifest_path.read_text(encoding="utf-8")
            )
            targets = manifest.get("applies_to", []) or []
            if args.type and targets and args.type not in targets:
                continue
            found.append({
                "id": manifest.get("id", entry.name),
                "name": manifest.get("name", entry.name),
                "description": manifest.get("description", ""),
                "applies_to": targets,
            })
    _emit({"ok": True, "count": len(found), "enrichers": found})
def _strip_yaml_comment(line: str) -> str:
    """Return *line* without its trailing ``#`` comment, if it has one.

    A ``#`` starts a comment only at the beginning of the line or after
    whitespace, and never inside a quoted span. A quote opens a span only
    at the start of a token, so an apostrophe inside a word does not.
    """
    quote = ""
    prev = " "
    for i, ch in enumerate(line):
        if quote:
            if ch == quote:
                quote = ""
        elif ch in "\"'" and (prev.isspace() or prev in "[,:"):
            quote = ch
        elif ch == "#" and (i == 0 or prev.isspace()):
            return line[:i]
        prev = ch
    return line


def _parse_yaml_minimal(text: str) -> dict:
    """Parse the small YAML subset used by enricher manifests.

    Supported: top-level ``key: value`` scalars (optionally wrapped in
    single or double quotes) and inline lists such as ``[a, "b", 'c']``.
    Lines that do not start with a letter (indented, blank, comment-only)
    and lines without a colon are ignored. This is not a full YAML parser.

    Args:
        text: Manifest contents.

    Returns:
        Mapping of key to ``str`` or ``list[str]``.
    """
    out: dict = {}
    for raw in text.splitlines():
        line = _strip_yaml_comment(raw).rstrip()
        if not line or not line[0].isalpha():
            continue
        key, sep, value = line.partition(":")
        if not sep:
            continue
        key = key.strip()
        value = value.strip()
        if value.startswith("[") and value.endswith("]"):
            inner = value[1:-1].strip()
            out[key] = [
                item.strip().strip('"').strip("'")
                for item in inner.split(",")
                if item.strip()
            ]
        elif value[:1] in ('"', "'") and value.endswith(value[0]):
            # Matching surrounding quotes are dropped.
            out[key] = value[1:-1]
        else:
            out[key] = value
    return out
def cmd_enricher_run(args) -> None:
    """Queue an enricher job as a JSON file for graph_explorer to pick up.

    The job is written to ``<parent of GX_APP_DB>/agent_jobs_queue/
    <req_id>.json``. The directory is derived from ``GX_APP_DB`` and NOT
    from ``GX_APP_DIR``: per the original notes, main.cpp scans
    ``parent(g_layout_db_path) / "agent_jobs_queue"``, and jobs written
    anywhere else are never picked up. A directory of files is used rather
    than a SQLite table for the same reason as ``.mutations.marker``
    (graph_explorer.db is open in WAL mode on the Windows side while
    gx-cli writes from WSL).

    Reads ``args.enricher`` (required), ``args.node`` and ``args.params``
    (JSON text, default ``{}``). An unknown enricher, an unknown node, an
    empty ``GX_APP_DB`` and a failed write are reported through ``_die``.
    Progress is recorded with ``_log`` so it reaches both stderr and
    gx-cli.log.
    """
    edir = _enrichers_dir()
    if not (edir / args.enricher / "manifest.yaml").is_file():
        _die(f"enricher not found: {args.enricher}")

    node_name = ""
    if args.node:
        cn = _connect(_ops_db(), readonly=True)
        try:
            r = cn.execute(
                "SELECT id, name FROM entities WHERE id = ?", (args.node,)
            ).fetchone()
        finally:
            cn.close()
        if r is None:
            _die(f"node not found: {args.node}", code=2)
        node_name = r["name"]

    req_id = f"areq_{_now_ms()}"
    payload = {
        "id": req_id,
        "enricher_id": args.enricher,
        "node_id": args.node or "",
        "node_name": node_name,
        "params_json": args.params or "{}",
        "created_at": _now_ms(),
    }

    queue_dir = Path(_app_db()).parent / "agent_jobs_queue"
    _log("enricher_run", f"queue_dir={queue_dir}")
    tmp = queue_dir / f"{req_id}.json.tmp"
    final = queue_dir / f"{req_id}.json"
    try:
        queue_dir.mkdir(parents=True, exist_ok=True)
        # Write to a temp name, then rename, so the consumer never sees a
        # half-written JSON file.
        tmp.write_text(json.dumps(payload, ensure_ascii=False),
                       encoding="utf-8")
        os.replace(tmp, final)
    except OSError as e:
        # Best effort: do not leave a stray temp file in the queue dir.
        try:
            tmp.unlink()
        except OSError:
            pass
        _log("enricher_run", f"FAILED to write queue file: {e}")
        _die(f"could not enqueue: {e}")

    _log(
        "enricher_run",
        f"wrote {final} "
        f"(enricher={args.enricher} node={args.node or ''} "
        f"req={req_id})",
    )
    _ok(request_id=req_id, enricher=args.enricher, node=args.node or "",
        queue_file=str(final),
        message="job encolado, lo recoge el panel Jobs")
# ----------------------------------------------------------------------------
# query (read-only SELECT)
# ----------------------------------------------------------------------------
def cmd_query(args) -> None:
    """Run a read-only SELECT/WITH statement against operations.db.

    The statement must start with ``select`` or ``with`` (leading opening
    parentheses are allowed) and must not contain the keywords ``attach``,
    ``detach``, ``vacuum`` or ``pragma writable...``. It runs on a
    read-only connection, so the keyword screen is defence in depth. At
    most ``args.limit`` rows (clamped to 1..500) are returned and every
    non-NULL value is rendered as a string. Rejected statements and SQLite
    failures are reported through ``_die``.
    """
    import re

    sql = (args.sql or "").strip()
    low = sql.lower().lstrip("(").lstrip()
    if not low.startswith(("select", "with")):
        _die("only SELECT/WITH queries allowed")
    # Match whole keywords, not substrings, so harmless text such as a
    # table named "attachments" or a LIKE '%vacuum%' filter is accepted.
    if re.search(r"\b(?:attach|detach|vacuum)\b|\bpragma\s+writable", low):
        _die("forbidden token in query")

    cn = _connect(_ops_db(), readonly=True)
    try:
        cur = cn.execute(sql)
        names = [d[0] for d in cur.description] if cur.description else []
        rows = [
            dict(zip(names, [str(v) if v is not None else None for v in r]))
            for r in cur.fetchmany(max(1, min(args.limit, 500)))
        ]
    except (sqlite3.Error, sqlite3.Warning) as e:
        # sqlite3.Warning is not a subclass of sqlite3.Error; before
        # Python 3.12 it is raised for multi-statement input.
        _die(f"sql error: {e}")
    finally:
        cn.close()
    _emit({"ok": True, "count": len(rows), "columns": names, "rows": rows})
# ----------------------------------------------------------------------------
# info
# ----------------------------------------------------------------------------
def cmd_info(args) -> None:
    """Emit a summary of the environment and of operations.db.

    The payload always carries the ``GX_OPS_DB`` / ``GX_APP_DB`` values and
    the resolved app directory. When the database can be read it also
    carries the entity count, the relation count and the distinct entity
    types; a SQLite failure is reported in an ``error`` field while ``ok``
    stays true.
    """
    info = {
        "ok": True,
        "ops_db": os.environ.get("GX_OPS_DB", ""),
        "app_db": os.environ.get("GX_APP_DB", ""),
        "app_dir": _app_dir(),
    }
    try:
        cn = _connect(_ops_db(), readonly=True)
        try:
            info["entity_count"] = cn.execute(
                "SELECT COUNT(*) FROM entities").fetchone()[0]
            info["relation_count"] = cn.execute(
                "SELECT COUNT(*) FROM relations").fetchone()[0]
            info["types"] = [r[0] for r in cn.execute(
                "SELECT DISTINCT type_ref FROM entities ORDER BY type_ref"
            ).fetchall()]
        finally:
            # Close the handle even when one of the queries fails.
            cn.close()
    except sqlite3.Error as e:
        info["error"] = str(e)
    _emit(info)
# ----------------------------------------------------------------------------
# MCP server (JSON-RPC 2.0 stdio)
#
# Cuando se invoca como `gx-cli mcp-server`, el script entra en bucle
# leyendo lineas JSON-RPC en stdin y emitiendo respuestas en stdout. Los
# logs van a stderr (claude los recoge en su propio log).
#
# Reutilizamos los `cmd_*` capturando stdout: cada cmd_* emite UN unico JSON
# con _emit, asi que captura + json.loads = resultado de la tool. Mas simple
# que reescribir 17 funciones.
# ----------------------------------------------------------------------------
# Tool catalogue returned verbatim as the result of the MCP `tools/list`
# method. Each entry has a `name` (which must match a key of MCP_DISPATCH),
# a `description` and a JSON Schema `inputSchema` for the tool arguments.
# The descriptions are runtime strings sent to the client.
MCP_TOOLS = [
{"name": "info",
"description": "Resumen del operations.db: contadores de entidades y relaciones, tipos distintos. Llamar primero al empezar para tener contexto.",
"inputSchema": {"type": "object", "properties": {}}},
{"name": "node_list",
"description": "Lista entidades del grafo. Filtros opcionales por type_ref y status.",
"inputSchema": {"type": "object", "properties": {
"type": {"type": "string", "description": "Filtra por type_ref exacto"},
"status": {"type": "string", "enum": ["active", "stale", "corrupted", "archived"]},
"limit": {"type": "integer", "default": 100, "minimum": 1, "maximum": 1000}}}},
{"name": "node_show",
"description": "Devuelve la entidad completa (metadata, tags, descripcion, status) y todos sus vecinos 1-hop con direccion (in/out).",
"inputSchema": {"type": "object", "properties": {
"id": {"type": "string", "description": "ID de la entidad"}},
"required": ["id"]}},
{"name": "node_search",
"description": "Busqueda FTS5 sobre name, description, tags. Tokeniza por whitespace y aplica busqueda por prefijo. Devuelve hits ordenados por relevancia.",
"inputSchema": {"type": "object", "properties": {
"query": {"type": "string"},
"limit": {"type": "integer", "default": 50, "minimum": 1, "maximum": 200}},
"required": ["query"]}},
{"name": "node_create",
"description": "Crea una entidad nueva. Si type se omite, se infiere heuristicamente del name (email/url/domain/ip/phone/text). Pasa `notes` para inicializar el panel Note del nodo (es lo que leen los enrichers split_sentences y extract_iocs_text como fuente de texto).",
"inputSchema": {"type": "object", "properties": {
"name": {"type": "string"},
"type": {"type": "string", "description": "Opcional. Auto-detectado si se omite."},
"description": {"type": "string"},
"notes": {"type": "string", "description": "Texto largo libre del nodo (panel Note)."}},
"required": ["name"]}},
{"name": "node_update",
"description": "Modifica campos de una entidad existente. Al menos un campo aparte de id debe pasarse. `notes` reemplaza el contenido completo del panel Note; combinalo con `append_notes=true` para anyadir al final preservando lo existente.",
"inputSchema": {"type": "object", "properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"type": {"type": "string"},
"status": {"type": "string", "enum": ["active", "stale", "corrupted", "archived"]},
"description": {"type": "string"},
"notes": {"type": "string", "description": "Texto del panel Note. Reemplaza el contenido salvo que append_notes=true."},
"append_notes": {"type": "boolean", "default": False,
"description": "Si true, anyade `notes` al final con doble newline en vez de reemplazar."},
"tags": {"type": "string", "description": "JSON array literal o CSV 'a,b,c'"}},
"required": ["id"]}},
{"name": "node_delete",
"description": "Borra la entidad y todas sus relaciones. Irreversible. Confirmar con el usuario antes.",
"inputSchema": {"type": "object", "properties": {
"id": {"type": "string"}}, "required": ["id"]}},
{"name": "rel_create",
"description": "Crea una relacion dirigida from_id -> to_id. Ambas entidades deben existir.",
"inputSchema": {"type": "object", "properties": {
"from_id": {"type": "string"},
"to_id": {"type": "string"},
"name": {"type": "string", "default": "RELATED_TO",
"description": "Nombre semantico ej: KNOWS, OWNS, BELONGS_TO"}},
"required": ["from_id", "to_id"]}},
{"name": "rel_delete",
"description": "Borra una relacion por id.",
"inputSchema": {"type": "object", "properties": {
"id": {"type": "string"}}, "required": ["id"]}},
{"name": "rel_list",
"description": "Lista relaciones, opcionalmente filtradas por endpoint.",
"inputSchema": {"type": "object", "properties": {
"from_id": {"type": "string"},
"to_id": {"type": "string"},
"limit": {"type": "integer", "default": 200, "minimum": 1, "maximum": 2000}}}},
{"name": "table_list",
"description": "Lista entidades de tipo Table (datasets DuckDB respaldando filas tabulares).",
"inputSchema": {"type": "object", "properties": {}}},
{"name": "table_page",
"description": "Lee una pagina de filas de la tabla DuckDB asociada a un nodo Table.",
"inputSchema": {"type": "object", "properties": {
"table_id": {"type": "string"},
"offset": {"type": "integer", "default": 0, "minimum": 0},
"limit": {"type": "integer", "default": 50, "minimum": 1, "maximum": 200}},
"required": ["table_id"]}},
{"name": "table_promote",
"description": "Promociona una fila DuckDB a entidad del grafo (idempotente: si ya existe, devuelve la misma).",
"inputSchema": {"type": "object", "properties": {
"table_id": {"type": "string"},
"row_id": {"type": "string"}},
"required": ["table_id", "row_id"]}},
{"name": "table_demote",
"description": "Borra la entidad promovida. La fila DuckDB queda intacta.",
"inputSchema": {"type": "object", "properties": {
"id": {"type": "string"}}, "required": ["id"]}},
{"name": "enricher_list",
"description": "Lista enrichers cargados. Si se pasa type, filtra por applies_to.",
"inputSchema": {"type": "object", "properties": {
"type": {"type": "string"}}}},
{"name": "enricher_run",
"description": "Encola un enricher para correr async sobre un nodo. graph_explorer tomara el job en su pool de workers; el viewport se refresca al terminar.",
"inputSchema": {"type": "object", "properties": {
"enricher": {"type": "string", "description": "ID del enricher (ej: fetch_webpage)"},
"node": {"type": "string", "description": "ID del nodo objetivo. Opcional para enrichers globales."},
"params": {"type": "string", "description": "JSON object stringified con params. Default '{}'."}},
"required": ["enricher"]}},
{"name": "query",
"description": "Ejecuta SQL read-only (SELECT/WITH) sobre operations.db. Util para queries no cubiertas por las otras tools (agregaciones, joins).",
"inputSchema": {"type": "object", "properties": {
"sql": {"type": "string"},
"limit": {"type": "integer", "default": 100, "minimum": 1, "maximum": 500}},
"required": ["sql"]}},
]
def _mcp_call_cmd(fn, args_dict: dict) -> dict:
    """Run a ``cmd_*`` function and return the JSON it printed.

    *args_dict* is exposed to the command as attribute-style ``args``.
    Stdout is captured for the duration of the call. A ``SystemExit``
    (raised by ``_die``) is swallowed so that the error JSON printed before
    it is still returned. Empty or non-JSON output is converted into an
    ``{"ok": false, ...}`` result; any other exception propagates.
    """
    namespace = SimpleNamespace(**args_dict)
    captured = StringIO()
    with redirect_stdout(captured):
        try:
            fn(namespace)
        except SystemExit:
            pass

    text = captured.getvalue().strip()
    if text == "":
        return {"ok": False, "error": "command produced no output"}
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as err:
        return {"ok": False, "error": f"invalid JSON output: {err}",
                "raw": text[:500]}
    return parsed
# Map: MCP tool name -> (cmd function, default kwargs).
# _mcp_dispatch copies the defaults and overlays the caller's arguments, so
# the optional attributes listed here exist on `args` even when the client
# omits them.
MCP_DISPATCH = {
"info": (cmd_info, {}),
"node_list": (cmd_node_list, {"type": None, "status": None, "limit": 100}),
"node_show": (cmd_node_show, {}),
"node_search": (cmd_node_search, {"limit": 50}),
"node_create": (cmd_node_create, {"type": None, "description": None,
"notes": None}),
"node_update": (cmd_node_update, {"name": None, "type": None,
"status": None, "description": None,
"notes": None, "append_notes": False,
"tags": None}),
"node_delete": (cmd_node_delete, {}),
"rel_create": (cmd_rel_create, {"name": None}),
"rel_delete": (cmd_rel_delete, {}),
"rel_list": (cmd_rel_list, {"from_id": None, "to_id": None,
"limit": 200}),
"table_list": (cmd_table_list, {}),
"table_page": (cmd_table_page, {"offset": 0, "limit": 50}),
"table_promote": (cmd_table_promote, {}),
"table_demote": (cmd_table_demote, {}),
"enricher_list": (cmd_enricher_list, {"type": None}),
"enricher_run": (cmd_enricher_run, {"node": None, "params": None}),
"query": (cmd_query, {"limit": 100}),
}
def _mcp_dispatch(tool_name: str, args: dict) -> dict:
    """Route an MCP tool call to its ``cmd_*`` function.

    The tool's default kwargs from ``MCP_DISPATCH`` are overlaid with the
    caller-supplied *args* (which may be ``None`` or empty). An unknown
    tool name yields an ``{"ok": false, ...}`` result instead of raising.
    """
    if tool_name not in MCP_DISPATCH:
        return {"ok": False, "error": f"unknown tool: {tool_name}"}
    handler, defaults = MCP_DISPATCH[tool_name]
    call_args = {**defaults, **(args or {})}
    return _mcp_call_cmd(handler, call_args)
def _mcp_log(msg: str) -> None:
    """Write an MCP diagnostic line to stderr and, best-effort, to gx-cli.log.

    The line is always written to stderr and flushed. When the ``GX_APP_DB``
    environment variable is set, the same line, prefixed with the value of
    ``_now_iso()``, is appended to ``gx-cli.log`` in the directory that
    contains that database, as a persistent audit trail. Failures to write
    the file (``OSError``) are deliberately ignored.

    Args:
        msg: Message text, without the ``[gx-cli mcp]`` prefix or newline.
    """
    entry = f"[gx-cli mcp] {msg}\n"
    sys.stderr.write(entry)
    sys.stderr.flush()

    db_path = os.environ.get("GX_APP_DB", "")
    if not db_path:
        return

    # Also persist next to app_db for auditing; never let logging fail a call.
    try:
        log_file = Path(db_path).parent / "gx-cli.log"
        with open(log_file, "a", encoding="utf-8") as fh:
            fh.write(f"{_now_iso()} {entry}")
    except OSError:
        pass
def cmd_mcp_server(_args) -> None:
    """Serve MCP over stdio: line-delimited JSON-RPC 2.0 in, responses out.

    Every non-empty line read from stdin is parsed as one JSON-RPC message.
    Handled methods:

    * ``initialize``: replies with protocol version, capabilities and
      server info.
    * ``tools/list``: replies with the ``MCP_TOOLS`` catalogue.
    * ``tools/call``: runs the tool through ``_mcp_dispatch`` and returns
      its JSON result as a single text content item; ``isError`` is set when
      the result's ``ok`` field is falsy. An exception raised while running
      the tool is reported as a -32603 error.
    * ``ping``: replies with an empty result.
    * ``notifications/*``: accepted silently, no response.

    Any other method is answered with -32601 when the message carries an id.

    Malformed input never terminates the loop: lines that are not valid
    JSON, or that decode to something other than a JSON object, are logged
    and skipped; a non-string ``method`` is treated as an unknown method; and
    ``tools/call`` with non-object ``params`` is answered with -32602.

    The function returns when stdin is closed.

    Args:
        _args: Parsed CLI namespace; unused.
    """
    _mcp_log(f"server starting (pid={os.getpid()})")
    for var in ("GX_OPS_DB", "GX_APP_DB", "GX_APP_DIR"):
        _mcp_log(f"{var}={os.environ.get(var, '')}")

    def emit(obj: dict) -> None:
        # One JSON document per line, flushed so the client sees it at once.
        sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n")
        sys.stdout.flush()

    def emit_error(rpc_id, code: int, message: str) -> None:
        emit({"jsonrpc": "2.0", "id": rpc_id, "error": {
            "code": code, "message": message,
        }})

    for raw in sys.stdin:
        line = raw.strip()
        if not line:
            continue
        try:
            req = json.loads(line)
        except json.JSONDecodeError as e:
            _mcp_log(f"bad json: {e}")
            continue
        if not isinstance(req, dict):
            # Valid JSON but not a request object (e.g. an array or a bare
            # number): there is nothing to .get() from, so skip it just like
            # an unparseable line instead of crashing the server.
            _mcp_log(f"ignoring non-object message ({type(req).__name__})")
            continue

        method = req.get("method", "")
        rpc_id = req.get("id")
        params = req.get("params") or {}
        _mcp_log(f"<- method={method} id={rpc_id}")

        if method == "initialize":
            emit({"jsonrpc": "2.0", "id": rpc_id, "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {"listChanged": False}},
                "serverInfo": {"name": "graph_explorer", "version": "0.1.0"},
            }})
        elif method == "notifications/initialized":
            # Notification: no response.
            pass
        elif method == "tools/list":
            emit({"jsonrpc": "2.0", "id": rpc_id,
                  "result": {"tools": MCP_TOOLS}})
        elif method == "tools/call":
            if not isinstance(params, dict):
                # params must be an object to carry name/arguments.
                _mcp_log(" invalid params: expected an object")
                if rpc_id is not None:
                    emit_error(rpc_id, -32602,
                               "invalid params: expected an object")
                continue
            tool_name = params.get("name", "")
            tool_args = params.get("arguments", {}) or {}
            _mcp_log(f" call {tool_name}({tool_args})")
            try:
                result = _mcp_dispatch(tool_name, tool_args)
                payload = json.dumps(result, ensure_ascii=False)
                is_err = not result.get("ok", True)
                emit({"jsonrpc": "2.0", "id": rpc_id, "result": {
                    "content": [{"type": "text", "text": payload}],
                    "isError": is_err,
                }})
            except Exception as e:
                # Boundary of the server loop: report the failure to the
                # client and keep serving subsequent requests.
                _mcp_log(f" exception: {e}")
                emit_error(rpc_id, -32603, str(e))
        elif method == "ping":
            emit({"jsonrpc": "2.0", "id": rpc_id, "result": {}})
        elif isinstance(method, str) and method.startswith("notifications/"):
            # Other notifications: ignore.
            pass
        else:
            # Unknown (or non-string) method; only requests with an id get
            # a reply.
            if rpc_id is not None:
                emit_error(rpc_id, -32601, f"method not found: {method}")

    _mcp_log("stdin closed, exiting")
# ----------------------------------------------------------------------------
# argparse wiring
# ----------------------------------------------------------------------------
def main() -> None:
    """Build the ``gx-cli`` argument parser, parse argv and run the command.

    Top-level commands are ``info``, ``node``, ``rel``, ``table``,
    ``enricher``, ``query`` and ``mcp-server``. The grouped commands
    (``node``, ``rel``, ``table``, ``enricher``) require a sub-operation.
    Each leaf parser stores its handler under ``fn``; after parsing, that
    handler is called with the parsed namespace.
    """
    parser = argparse.ArgumentParser(prog="gx-cli")
    commands = parser.add_subparsers(dest="cmd", required=True)

    # info
    info_cmd = commands.add_parser("info", help="Show ops_db summary")
    info_cmd.set_defaults(fn=cmd_info)

    # node
    node_ops = commands.add_parser("node").add_subparsers(
        dest="op", required=True)

    node_create = node_ops.add_parser("create")
    node_create.add_argument("--name", required=True)
    node_create.add_argument("--type")
    node_create.add_argument("--description")
    node_create.add_argument(
        "--notes",
        help="texto libre del nodo (panel Note del Inspector). "
             "Es lo que leen split_sentences y extract_iocs_text.")
    node_create.set_defaults(fn=cmd_node_create)

    node_delete = node_ops.add_parser("delete")
    node_delete.add_argument("id")
    node_delete.set_defaults(fn=cmd_node_delete)

    node_update = node_ops.add_parser("update")
    node_update.add_argument("id")
    node_update.add_argument("--name")
    node_update.add_argument("--type")
    node_update.add_argument("--status")
    node_update.add_argument("--description")
    node_update.add_argument(
        "--notes",
        help="reemplaza el contenido de notes (panel Note). "
             "Combinable con --append-notes para acumular.")
    node_update.add_argument(
        "--append-notes", action="store_true",
        help="anade --notes al final de las notas existentes "
             "en vez de reemplazarlas (separador: doble newline).")
    node_update.add_argument(
        "--tags",
        help='JSON array o "tag1,tag2" CSV')
    node_update.set_defaults(fn=cmd_node_update)

    node_list = node_ops.add_parser("list")
    node_list.add_argument("--type")
    node_list.add_argument("--status")
    node_list.add_argument("--limit", type=int, default=100)
    node_list.set_defaults(fn=cmd_node_list)

    node_show = node_ops.add_parser("show")
    node_show.add_argument("id")
    node_show.set_defaults(fn=cmd_node_show)

    node_search = node_ops.add_parser("search")
    node_search.add_argument("query")
    node_search.add_argument("--limit", type=int, default=50)
    node_search.set_defaults(fn=cmd_node_search)

    # relation
    rel_ops = commands.add_parser("rel").add_subparsers(
        dest="op", required=True)

    rel_create = rel_ops.add_parser("create")
    rel_create.add_argument("--from", dest="from_id", required=True)
    rel_create.add_argument("--to", dest="to_id", required=True)
    rel_create.add_argument("--name")
    rel_create.set_defaults(fn=cmd_rel_create)

    rel_delete = rel_ops.add_parser("delete")
    rel_delete.add_argument("id")
    rel_delete.set_defaults(fn=cmd_rel_delete)

    rel_list = rel_ops.add_parser("list")
    rel_list.add_argument("--from", dest="from_id")
    rel_list.add_argument("--to", dest="to_id")
    rel_list.add_argument("--limit", type=int, default=200)
    rel_list.set_defaults(fn=cmd_rel_list)

    # table
    table_ops = commands.add_parser("table").add_subparsers(
        dest="op", required=True)

    table_list = table_ops.add_parser("list")
    table_list.set_defaults(fn=cmd_table_list)

    table_promote = table_ops.add_parser("promote")
    table_promote.add_argument("table_id")
    table_promote.add_argument("row_id")
    table_promote.set_defaults(fn=cmd_table_promote)

    table_demote = table_ops.add_parser("demote")
    table_demote.add_argument("id")
    table_demote.set_defaults(fn=cmd_table_demote)

    table_page = table_ops.add_parser("page")
    table_page.add_argument("table_id")
    table_page.add_argument("--offset", type=int, default=0)
    table_page.add_argument("--limit", type=int, default=50)
    table_page.set_defaults(fn=cmd_table_page)

    # enricher
    enricher_ops = commands.add_parser("enricher").add_subparsers(
        dest="op", required=True)

    enricher_list = enricher_ops.add_parser("list")
    enricher_list.add_argument("--type")
    enricher_list.set_defaults(fn=cmd_enricher_list)

    enricher_run = enricher_ops.add_parser("run")
    enricher_run.add_argument("enricher")
    enricher_run.add_argument("--node")
    enricher_run.add_argument("--params", help='JSON, default "{}"')
    enricher_run.set_defaults(fn=cmd_enricher_run)

    # query
    query_cmd = commands.add_parser("query")
    query_cmd.add_argument("sql")
    query_cmd.add_argument("--limit", type=int, default=100)
    query_cmd.set_defaults(fn=cmd_query)

    # mcp-server (JSON-RPC 2.0 stdio)
    mcp_cmd = commands.add_parser(
        "mcp-server",
        help="Run as MCP server reading JSON-RPC from stdin")
    mcp_cmd.set_defaults(fn=cmd_mcp_server)

    parsed = parser.parse_args()
    parsed.fn(parsed)


if __name__ == "__main__":
    main()