Files
egutierrez 0bab5c97c7 chore: auto-commit (5 archivos)
- CMakeLists.txt
- agent.cpp
- agent.h
- gx-cli
- main.cpp

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 13:30:27 +02:00

1225 lines
47 KiB
Python
Executable File

#!/usr/bin/env python3
"""
gx-cli — CLI usado por el agente Claude del panel Chat de graph_explorer.
Expone CRUD/promote/enricher/query sobre la operations.db activa y, tras cada
mutacion, reescribe el marker `.mutations.marker` junto a graph_explorer.db
para que el viewport se refresque automaticamente.
Env vars (las setea graph_explorer al spawnear claude -p):
GX_OPS_DB path absoluto a operations.db (mutable)
GX_APP_DB path absoluto a graph_explorer.db; su directorio aloja
.mutations.marker, agent_jobs_queue/ y gx-cli.log
GX_APP_DIR path al directorio de la app (para enrichers/cache)
Salida:
- Comandos de lectura: JSON en stdout.
- Mutaciones: JSON con `{"ok": true, "id": "...", ...}`.
- Errores: JSON con `{"ok": false, "error": "..."}` y exit code 1.
"""
from __future__ import annotations
import argparse
import json
import os
import sqlite3
import sys
import time
from contextlib import redirect_stdout
from datetime import datetime, timezone
from io import StringIO
from pathlib import Path
from types import SimpleNamespace
# ----------------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------------
def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
def _now_ms() -> int:
    """Return the current Unix time truncated to whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
def _ops_db() -> str:
    """Return the operations.db path taken from ``GX_OPS_DB``.

    Exits through ``_die`` when the variable is unset/empty or when the
    path it names does not exist.
    """
    path = os.environ.get("GX_OPS_DB", "")
    if path == "":
        _die("GX_OPS_DB env var is empty")
    elif not Path(path).exists():
        _die(f"GX_OPS_DB does not exist: {path}")
    return path
def _app_db() -> str:
    """Return the graph_explorer.db path taken from ``GX_APP_DB``.

    Only emptiness is checked; the file itself need not exist. An empty or
    unset variable exits through ``_die``.
    """
    path = os.environ.get("GX_APP_DB") or ""
    if not path:
        _die("GX_APP_DB env var is empty")
    return path
def _log(tag: str, msg: str) -> None:
    """Write ``[gx-cli <tag>] <msg>`` to stderr and append it to gx-cli.log.

    gx-cli.log sits next to the app DB (the same directory as chat.log and
    .mutations.marker) and lets someone audit what the agent did when
    something goes wrong: output from ``_emit`` only reaches the tool's
    stdout and gets lost in MCP pipelines. The file copy is prefixed with a
    UTC timestamp and is best-effort: it is skipped when GX_APP_DB is empty
    and any OSError while writing is ignored.
    """
    entry = f"[gx-cli {tag}] {msg}\n"
    sys.stderr.write(entry)
    sys.stderr.flush()
    app_db = os.environ.get("GX_APP_DB", "")
    if not app_db:
        return
    try:
        log_file = Path(app_db).parent / "gx-cli.log"
        with open(log_file, "a", encoding="utf-8") as fh:
            fh.write(f"{_now_iso()} {entry}")
    except OSError:
        pass
def _emit(payload: dict) -> None:
    """Print *payload* to stdout as a single line of JSON.

    Non-ASCII characters are written as-is, and values json cannot encode
    natively are converted with ``str``.
    """
    encoded = json.dumps(payload, ensure_ascii=False, default=str)
    print(encoded)
def _ok(**kwargs) -> None:
    """Emit a success payload: ``"ok": true`` first, then the given fields."""
    _emit({"ok": True, **kwargs})
def _die(msg: str, code: int = 1) -> None:
    """Emit ``{"ok": false, "error": msg}`` and exit the process with *code*.

    Never returns: it raises SystemExit, so ``finally`` blocks in the caller
    still run (e.g. closing connections).
    """
    error_payload = {"ok": False, "error": msg}
    _emit(error_payload)
    sys.exit(code)
def _connect(path: str, *, readonly: bool = False) -> sqlite3.Connection:
    """Open *path* with SQLite and return a connection yielding ``sqlite3.Row``.

    With ``readonly=True`` the database is opened through a ``mode=ro`` URI,
    so SQLite refuses any write; otherwise a normal read-write connection is
    returned.
    """
    conn = (sqlite3.connect(f"file:{path}?mode=ro", uri=True)
            if readonly else sqlite3.connect(path))
    conn.row_factory = sqlite3.Row
    return conn
def _bump_counter(op: str, detail: str = "") -> None:
    """Rewrite the marker file graph_explorer polls to refresh its viewport.

    The marker is ``.mutations.marker`` next to graph_explorer.db. It holds
    three lines: epoch milliseconds, *op* and *detail*. A file is used
    instead of a SQLite table because graph_explorer.db is open in WAL mode
    on the Windows side while gx-cli writes from WSL. WAL locking fails
    silently across that filesystem boundary (NTFS <-> 9p), whereas a marker
    file checked with stat() works everywhere.

    Best-effort by design: this runs *after* the mutation has been
    committed. A missing GX_APP_DB or a failed write therefore only produces
    a warning on stderr. It never turns an already-successful mutation into
    a reported failure.
    """
    app_db = os.environ.get("GX_APP_DB", "")
    if not app_db:
        # _app_db() would _die() here: it prints an error JSON and exits 1
        # for a mutation that is already committed, and an agent retrying it
        # could create a duplicate.
        sys.stderr.write(
            "[gx-cli] warn: GX_APP_DB env var is empty, marker not written\n"
        )
        return
    # Same directory as graph_explorer.db (fast write on /mnt/c).
    marker = Path(app_db).parent / ".mutations.marker"
    try:
        marker.write_text(f"{_now_ms()}\n{op}\n{detail}\n", encoding="utf-8")
    except OSError as e:
        sys.stderr.write(f"[gx-cli] warn: marker write failed: {e}\n")
# ----------------------------------------------------------------------------
# Detect type heuristic (mirror de entity_ops::detect_type)
# ----------------------------------------------------------------------------
def _detect_type(text: str) -> str:
    """Guess an entity type from free text (mirror of entity_ops::detect_type).

    The checks run in a fixed order and the first hit wins: email,
    dotted-quad IPv4 shape, http(s) URL, domain, phone. Anything else,
    including empty or ``None`` input, is ``"text"``. The IP check only
    validates the shape, not the octet ranges.
    """
    import re

    value = (text or "").strip()
    if value == "":
        return "text"
    lowered = value.lower()
    checks = (
        ("email", lambda: re.fullmatch(r"[^\s@]+@[^\s@]+\.[^\s@]+", value)),
        ("ip_address", lambda: re.fullmatch(r"(\d{1,3}\.){3}\d{1,3}", value)),
        ("url", lambda: lowered.startswith(("http://", "https://"))),
        ("domain",
         lambda: re.fullmatch(r"[a-z0-9-]+(\.[a-z0-9-]+)+", lowered)),
        ("phone", lambda: re.fullmatch(r"\+?\d[\d\s-]{6,}\d", value)),
    )
    for kind, matches in checks:
        if matches():
            return kind
    return "text"
# ----------------------------------------------------------------------------
# node ops
# ----------------------------------------------------------------------------
def cmd_node_create(args) -> None:
    """Create an entity and emit its id, name and type_ref.

    ``args.name`` is required and must not be blank. ``args.type`` is
    optional; when falsy, the type is guessed from the name with
    ``_detect_type``. ``args.description`` and ``args.notes`` are stored as
    "" when falsy. The id is ``<type_ref>_<epoch ms>`` and the row's source
    is ``agent:gx-cli``.

    Any SQLite failure is logged and reported as a JSON error through
    ``_die``, as the module's output contract promises, instead of escaping
    as a traceback. This covers a duplicate id as well as e.g. a locked
    database or a missing table.
    """
    name = args.name
    if not name or not name.strip():
        # A blank name yields a node that is hard to find or reference.
        _die("name must not be empty")
    type_ref = args.type or _detect_type(name)
    new_id = f"{type_ref}_{_now_ms()}"
    ts = _now_iso()
    src = "agent:gx-cli"
    description = args.description or ""
    notes = args.notes or ""
    _log("node_create",
         f"name={name!r} type={type_ref} notes_len={len(notes)} id={new_id}")
    cn = _connect(_ops_db())
    try:
        cn.execute(
            "INSERT INTO entities (id, name, type_ref, description, notes, "
            "source, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (new_id, name, type_ref, description, notes, src, ts, ts),
        )
        cn.commit()
    except sqlite3.Error as e:
        # IntegrityError (e.g. two creates in the same millisecond share an
        # id) as before, plus OperationalError such as "database is locked".
        _log("node_create", f"FAILED insert: {e}")
        _die(f"insert failed: {e}")
    finally:
        cn.close()
    _bump_counter("node.create", new_id)
    _ok(id=new_id, name=name, type_ref=type_ref)
def cmd_node_delete(args) -> None:
    """Delete entity ``args.id`` together with every relation touching it.

    On success, emits ``{"ok": true, "id": ...}`` and touches the mutation
    marker. When no entity row matched, it exits with code 2 ("entity not
    found"). By then any relations naming that id have already been deleted
    and committed.
    """
    entity_id = args.id
    _log("node_delete", f"id={entity_id}")
    conn = _connect(_ops_db())
    try:
        deleted = conn.execute(
            "DELETE FROM entities WHERE id = ?", (entity_id,)
        ).rowcount
        conn.execute(
            "DELETE FROM relations WHERE from_entity = ? OR to_entity = ?",
            (entity_id, entity_id),
        )
        conn.commit()
        if deleted == 0:
            _log("node_delete", f"FAILED not found: {entity_id}")
            _die(f"entity not found: {entity_id}", code=2)
    finally:
        conn.close()
    _bump_counter("node.delete", entity_id)
    _ok(id=entity_id)
def cmd_node_update(args) -> None:
    """Update selected fields of entity ``args.id``; emit id and fields_set.

    Only attributes that are not ``None`` are written:
      * ``name``, ``type`` (stored as type_ref) and ``description``;
      * ``status``, which must be active / stale / corrupted / archived;
      * ``notes``: replaces the note text. With ``append_notes`` true it is
        appended after a blank line, with no separator when the note was
        empty; appending "" changes nothing;
      * ``tags``: a JSON array literal or a comma-separated list, stored as
        a JSON array;
      * ``clear_group_id`` true removes the entity from its group
        (group_id = NULL, issue 0036d). It can be combined with the others.
    ``updated_at`` is always refreshed. The function exits with "no fields
    to update" when nothing would change, with code 2 when the entity does
    not exist, and with "update failed" on any other SQLite error.
    """
    sets, params = [], []
    if args.name is not None:
        sets.append("name = ?")
        params.append(args.name)
    if args.type is not None:
        sets.append("type_ref = ?")
        params.append(args.type)
    if args.status is not None:
        if args.status not in ("active", "stale", "corrupted", "archived"):
            _die(f"invalid status: {args.status}")
        sets.append("status = ?")
        params.append(args.status)
    if args.description is not None:
        sets.append("description = ?")
        params.append(args.description)
    if args.notes is not None:
        if getattr(args, "append_notes", False):
            if args.notes:
                # NULLIF maps NULL/'' to NULL, which swallows the "\n\n"
                # separator, so it is only added after existing text.
                sets.append("notes = COALESCE(NULLIF(notes, '') || char(10) "
                            "|| char(10), '') || ?")
                params.append(args.notes)
            # else: appending "" is a no-op -- it must never wipe the note.
        else:
            sets.append("notes = ?")
            params.append(args.notes)
    if args.tags is not None:
        raw_tags = args.tags.strip()
        try:
            if raw_tags.startswith("["):
                tags = json.loads(raw_tags)
            else:
                tags = [t.strip() for t in raw_tags.split(",") if t.strip()]
            if not isinstance(tags, list):
                raise ValueError("tags must be array")
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            _die(f"bad tags: {e}")
        sets.append("tags = ?")
        params.append(json.dumps(tags))
    if getattr(args, "clear_group_id", False):
        sets.append("group_id = NULL")
        _log("node_update", f"clear_group_id=true id={args.id}")
    if not sets:
        _die("no fields to update")
    sets.append("updated_at = ?")
    params.append(_now_iso())
    params.append(args.id)
    _log("node_update",
         f"id={args.id} fields={[s.split(' = ')[0] for s in sets[:-1]]}")
    # Every fragment in `sets` is a fixed string from this function; user
    # values only travel as bound parameters.
    sql = f"UPDATE entities SET {', '.join(sets)} WHERE id = ?"
    cn = _connect(_ops_db())
    try:
        cur = cn.execute(sql, params)
        cn.commit()
        if cur.rowcount == 0:
            _log("node_update", f"FAILED not found: {args.id}")
            _die(f"entity not found: {args.id}", code=2)
    except sqlite3.Error as e:
        _log("node_update", f"FAILED update: {e}")
        _die(f"update failed: {e}")
    finally:
        cn.close()
    _bump_counter("node.update", args.id)
    _ok(id=args.id, fields_set=len(sets) - 1)
def cmd_node_list(args) -> None:
    """Emit entities (id, name, type_ref, status, updated_at) as JSON rows.

    The optional filters ``args.type`` (exact type_ref) and ``args.status``
    are AND-ed together. Rows are ordered by type_ref and then name, and
    ``args.limit`` is clamped to [1, 1000].
    """
    filters = [
        (column, value)
        for column, value in (("type_ref", args.type), ("status", args.status))
        if value
    ]
    sql = "SELECT id, name, type_ref, status, updated_at FROM entities"
    if filters:
        sql += " WHERE " + " AND ".join(f"{col} = ?" for col, _ in filters)
    sql += " ORDER BY type_ref, name LIMIT ?"
    params = [val for _, val in filters] + [max(1, min(args.limit, 1000))]
    conn = _connect(_ops_db(), readonly=True)
    rows = [dict(row) for row in conn.execute(sql, params).fetchall()]
    conn.close()
    _emit({"ok": True, "count": len(rows), "rows": rows})
def cmd_node_show(args) -> None:
    """Emit one entity together with its 1-hop neighbours.

    ``metadata`` and ``tags`` are decoded from JSON when they hold valid
    JSON text; otherwise they are left as stored. Each neighbour lists the
    relation id and name, the other endpoint, and the direction relative to
    ``args.id``: "out" when the entity is ``from_entity``, else "in". Exits
    with code 2 when the entity does not exist.
    """
    entity_id = args.id
    conn = _connect(_ops_db(), readonly=True)
    found = conn.execute(
        "SELECT * FROM entities WHERE id = ?", (entity_id,)
    ).fetchone()
    if found is None:
        conn.close()
        _die(f"entity not found: {entity_id}", code=2)
    entity = dict(found)
    for key in ("metadata", "tags"):
        raw = entity.get(key)
        if not isinstance(raw, str):
            continue
        try:
            entity[key] = json.loads(raw)
        except json.JSONDecodeError:
            pass
    neighbors = []
    for rel in conn.execute(
        "SELECT id, name, from_entity, to_entity FROM relations "
        "WHERE from_entity = ? OR to_entity = ?",
        (entity_id, entity_id),
    ).fetchall():
        outgoing = rel["from_entity"] == entity_id
        neighbors.append({
            "rel_id": rel["id"],
            "rel_name": rel["name"],
            "other_id": rel["to_entity"] if outgoing else rel["from_entity"],
            "direction": "out" if outgoing else "in",
        })
    conn.close()
    _emit({"ok": True, "entity": entity, "neighbors": neighbors})
def cmd_node_search(args) -> None:
    """Full-text search over entities through the ``entities_fts`` FTS5 table.

    The query is split on whitespace. Each token containing at least one
    letter or digit becomes a quoted FTS5 prefix term; the terms are OR-ed
    and ranked with ``bm25``. ``args.limit`` is clamped to [1, 200]. Exits
    with an error for an empty query, when no token is usable, or when the
    FTS query fails (e.g. the FTS table is missing).
    """
    q = (args.query or "").strip()
    if not q:
        _die("empty query")
    tokens = [t for t in q.split() if any(c.isalnum() for c in t)]
    if not tokens:
        _die("no usable tokens after sanitization")
    # Bare tokens spliced straight into MATCH could be read as FTS5 syntax
    # ("foo-bar", a lone "AND"/"NOT") and fail. Tokens containing "." or "@"
    # (domains, emails) used to be dropped entirely. Quoted as FTS5 strings
    # ('""' escapes a quote), the tokenizer splits the text itself, and the
    # trailing * turns the last token of each string into a prefix.
    fts = " OR ".join('"' + t.replace('"', '""') + '"*' for t in tokens)
    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = cn.execute(
            "SELECT e.id, e.name, e.type_ref, e.status, "
            " bm25(entities_fts) AS rank "
            "FROM entities_fts f JOIN entities e ON e.id = f.id "
            "WHERE entities_fts MATCH ? ORDER BY rank LIMIT ?",
            (fts, max(1, min(args.limit, 200))),
        ).fetchall()
    except sqlite3.OperationalError as e:
        _die(f"FTS not available: {e}")
    finally:
        cn.close()
    _emit({"ok": True, "count": len(rows), "rows": [dict(r) for r in rows]})
# ----------------------------------------------------------------------------
# relation ops
# ----------------------------------------------------------------------------
def cmd_rel_create(args) -> None:
    """Create the directed relation ``args.from_id -> args.to_id``.

    ``args.name`` defaults to ``RELATED_TO`` and the id is
    ``rel_<epoch ms>``. Both endpoints must already exist; otherwise the
    function exits with code 2. Any SQLite failure while checking or
    inserting is logged and reported as an "insert failed" JSON error via
    ``_die`` instead of escaping as a traceback. Examples are an id clash
    between two calls in the same millisecond or a locked database. On
    success, emits id, name, from_id and to_id.
    """
    new_id = f"rel_{_now_ms()}"
    ts = _now_iso()
    name = args.name or "RELATED_TO"
    _log("rel_create",
         f"from={args.from_id} to={args.to_id} name={name} id={new_id}")
    cn = _connect(_ops_db())
    try:
        # Both endpoints must already exist.
        for entity_id in (args.from_id, args.to_id):
            found = cn.execute(
                "SELECT 1 FROM entities WHERE id = ?", (entity_id,)
            ).fetchone()
            if found is None:
                _log("rel_create", f"FAILED entity not found: {entity_id}")
                _die(f"entity not found: {entity_id}", code=2)
        cn.execute(
            "INSERT INTO relations (id, name, from_entity, to_entity, "
            "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
            (new_id, name, args.from_id, args.to_id, ts, ts),
        )
        cn.commit()
    except sqlite3.Error as e:
        _log("rel_create", f"FAILED insert: {e}")
        _die(f"insert failed: {e}")
    finally:
        cn.close()
    _bump_counter("rel.create", new_id)
    _ok(id=new_id, name=name, from_id=args.from_id, to_id=args.to_id)
def cmd_rel_delete(args) -> None:
    """Delete relation ``args.id`` and emit ``{"ok": true, "id": ...}``.

    Exits with code 2 when no relation matched. Like the other mutating
    commands, it records the attempt and any not-found failure with
    ``_log``, so destructive agent actions show up in gx-cli.log.
    """
    _log("rel_delete", f"id={args.id}")
    cn = _connect(_ops_db())
    try:
        cur = cn.execute("DELETE FROM relations WHERE id = ?", (args.id,))
        cn.commit()
        if cur.rowcount == 0:
            _log("rel_delete", f"FAILED not found: {args.id}")
            _die(f"relation not found: {args.id}", code=2)
    finally:
        cn.close()
    _bump_counter("rel.delete", args.id)
    _ok(id=args.id)
def cmd_rel_list(args) -> None:
    """Emit relations (id, name, from_entity, to_entity, status) as JSON rows.

    The optional endpoint filters ``args.from_id`` and ``args.to_id`` are
    AND-ed together. Rows are ordered by name, and ``args.limit`` is clamped
    to [1, 2000].
    """
    clauses = []
    bind = []
    if args.from_id:
        clauses.append("from_entity = ?")
        bind.append(args.from_id)
    if args.to_id:
        clauses.append("to_entity = ?")
        bind.append(args.to_id)
    where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
    bind.append(max(1, min(args.limit, 2000)))
    query = ("SELECT id, name, from_entity, to_entity, status FROM relations"
             f"{where} ORDER BY name LIMIT ?")
    conn = _connect(_ops_db(), readonly=True)
    result = [dict(r) for r in conn.execute(query, bind).fetchall()]
    conn.close()
    _emit({"ok": True, "count": len(result), "rows": result})
# ----------------------------------------------------------------------------
# table ops (Table-typed entities backed by DuckDB)
# ----------------------------------------------------------------------------
def cmd_table_list(args) -> None:
    """Emit every ``Table`` entity with the DuckDB fields of its metadata.

    Each entry carries id, name, duckdb_path, table_name, row_type and
    columns, defaulting to "" and [] respectively. Entries are ordered by
    name. Metadata that is not valid JSON, or is valid JSON but not an
    object, is treated as an empty object instead of crashing the listing.
    """
    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = cn.execute(
            "SELECT id, name, metadata FROM entities WHERE type_ref = 'Table' "
            "ORDER BY name"
        ).fetchall()
    finally:
        cn.close()
    out = []
    for r in rows:
        try:
            m = json.loads(r["metadata"] or "{}")
        except json.JSONDecodeError:
            m = {}
        if not isinstance(m, dict):
            # e.g. metadata stored as a JSON list or string: .get() would fail.
            m = {}
        out.append({
            "id": r["id"], "name": r["name"],
            "duckdb_path": m.get("duckdb_path", ""),
            "table_name": m.get("table_name", ""),
            "row_type": m.get("row_type", ""),
            "columns": m.get("columns", []),
        })
    _emit({"ok": True, "count": len(out), "tables": out})
def _duckdb_fetch_row(duckdb_path: str, table_name: str, id_column: str,
                      row_id: str) -> dict | None:
    """Return the row of *table_name* whose *id_column* equals *row_id*.

    The id column is compared as VARCHAR. The first match is returned as a
    ``{column: value}`` dict, or ``None`` when nothing matches. Exits via
    ``_die`` when duckdb is not installed or when opening/querying fails.
    The DuckDB connection is always closed, even on errors, so the
    long-lived MCP server process never keeps the file open after a failure.
    """
    try:
        import duckdb  # type: ignore
    except ImportError:
        _die("duckdb python module not installed")

    def ident(name) -> str:
        # Double-quote an SQL identifier, doubling any embedded quote.
        return '"' + str(name).replace('"', '""') + '"'

    try:
        ddb = duckdb.connect(duckdb_path, read_only=True)
    except duckdb.Error as e:
        _die(f"duckdb open failed: {e}")
    try:
        cur = ddb.execute(
            f"SELECT * FROM {ident(table_name)} "
            f"WHERE CAST({ident(id_column)} AS VARCHAR) = ?",
            [row_id],
        )
        cols = [d[0] for d in cur.description]
        vals = cur.fetchone()
    except duckdb.Error as e:
        _die(f"duckdb query failed: {e}")
    finally:
        ddb.close()
    return None if vals is None else dict(zip(cols, vals))


def cmd_table_promote(args) -> None:
    """Promote one DuckDB row of a ``Table`` entity to a graph entity.

    Mirrors node_groups_promote_row:
      1. Read the Table's metadata: duckdb_path and table_name, plus the
         optional row_type / label_column / id_column, which default to
         row / name / id. A relative duckdb_path is resolved against the
         directory of operations.db.
      2. If ``prom_<row_type>_<row_id>`` (spaces replaced by "_") already
         exists, emit ``promoted: false`` and stop, which makes the call
         idempotent.
      3. Otherwise insert the row as that entity, plus a CONTAINS_ROW
         relation from the Table. The entity name comes from label_column
         and falls back to the row id when the cell is missing or NULL.
    Exits with code 2 when the Table does not exist, and with an error for
    unusable metadata, DuckDB problems, a missing row or SQLite failures.
    """
    row_id = str(args.row_id)
    cn = _connect(_ops_db())
    try:
        row = cn.execute(
            "SELECT metadata FROM entities WHERE id = ? AND type_ref = 'Table'",
            (args.table_id,),
        ).fetchone()
        if row is None:
            _die(f"Table not found: {args.table_id}", code=2)
        try:
            meta = json.loads(row["metadata"] or "{}")
        except json.JSONDecodeError as e:
            _die(f"Table {args.table_id} has invalid metadata: {e}")
        if not isinstance(meta, dict):
            meta = {}
        duckdb_path = meta.get("duckdb_path", "")
        table_name = meta.get("table_name", "")
        row_type = meta.get("row_type") or "row"
        label_column = meta.get("label_column") or "name"
        id_column = meta.get("id_column") or "id"
        if not duckdb_path or not table_name:
            _die(f"Table {args.table_id} has no duckdb_path/table_name")
        # A relative path is relative to the directory of operations.db.
        if not Path(duckdb_path).is_absolute():
            duckdb_path = str(Path(_ops_db()).parent / duckdb_path)
        # Idempotency is keyed on this deterministic entity id.
        prom_id = f"prom_{row_type}_{row_id}".replace(" ", "_")
        existing = cn.execute(
            "SELECT id FROM entities WHERE id = ?", (prom_id,)
        ).fetchone()
        if existing:
            _ok(id=existing["id"], promoted=False, message="already promoted")
            return
        _log("table_promote",
             f"table={args.table_id} row={row_id} id={prom_id}")
        row_dict = _duckdb_fetch_row(duckdb_path, table_name, id_column,
                                     row_id)
        if row_dict is None:
            _die(f"row not found: {row_id}")
        label_value = row_dict.get(label_column)
        # str(None) would otherwise name the entity "None" for a NULL cell.
        label = row_id if label_value is None else str(label_value)
        ts = _now_iso()
        # NOTE(review): row columns are spread after "source", so a column
        # literally named "source" replaces the provenance dict -- confirm
        # this matches node_groups_promote_row before changing the order.
        meta_out = {
            "source": {"duckdb": meta.get("duckdb_path"),
                       "table": table_name,
                       "row_id": row_id},
            **{k: (v if isinstance(v, (str, int, float, bool)) or v is None
                   else str(v)) for k, v in row_dict.items()},
        }
        cn.execute(
            "INSERT INTO entities (id, name, type_ref, source, metadata, "
            "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (prom_id, label, row_type, "agent:gx-cli",
             json.dumps(meta_out, default=str), ts, ts),
        )
        # CONTAINS_ROW relation; INSERT OR IGNORE keeps it idempotent.
        cn.execute(
            "INSERT OR IGNORE INTO relations (id, name, from_entity, to_entity, "
            "created_at, updated_at) VALUES (?, 'CONTAINS_ROW', ?, ?, ?, ?)",
            (f"rel_contains_{prom_id}", args.table_id, prom_id, ts, ts),
        )
        cn.commit()
    except sqlite3.Error as e:
        _log("table_promote", f"FAILED: {e}")
        _die(f"promote failed: {e}")
    finally:
        cn.close()
    _bump_counter("table.promote", prom_id)
    _ok(id=prom_id, promoted=True)
def cmd_table_demote(args) -> None:
    """Delete a promoted entity and every relation touching it.

    The source DuckDB row is not touched. Exits with code 2 when no entity
    matched; relations naming the id have still been deleted and committed
    by then. The attempt and any failure are written to the audit log via
    ``_log``, as the other destructive commands do.
    NOTE(review): any entity id is accepted, not only promoted (``prom_*``)
    ones -- confirm callers rely on that before restricting it.
    """
    _log("table_demote", f"id={args.id}")
    cn = _connect(_ops_db())
    try:
        cur = cn.execute("DELETE FROM entities WHERE id = ?", (args.id,))
        cn.execute(
            "DELETE FROM relations WHERE from_entity = ? OR to_entity = ?",
            (args.id, args.id),
        )
        cn.commit()
        if cur.rowcount == 0:
            _log("table_demote", f"FAILED not found: {args.id}")
            _die(f"entity not found: {args.id}", code=2)
    finally:
        cn.close()
    _bump_counter("table.demote", args.id)
    _ok(id=args.id)
def cmd_table_page(args) -> None:
    """Emit one page of rows from the DuckDB table behind a ``Table`` entity.

    Reads duckdb_path and table_name, plus the optional ``columns`` and
    ``id_column`` (default "id"), from the Table's metadata. A relative
    duckdb_path is resolved against the directory of operations.db. The
    query selects the id column followed by ``columns``, ordered by the id
    column. Every value is stringified and NULL stays null.

    ``limit`` is clamped to [1, 200] and ``offset`` to >= 0. The JSON
    reports the clamped values (as cmd_group_page does) together with the
    table's total row count. The DuckDB connection is closed even when a
    query fails, and DuckDB errors are reported as JSON errors.
    """
    cn = _connect(_ops_db(), readonly=True)
    try:
        row = cn.execute(
            "SELECT metadata FROM entities WHERE id = ? AND type_ref = 'Table'",
            (args.table_id,),
        ).fetchone()
    finally:
        cn.close()
    if row is None:
        _die(f"Table not found: {args.table_id}", code=2)
    try:
        meta = json.loads(row["metadata"] or "{}")
    except json.JSONDecodeError as e:
        _die(f"Table {args.table_id} has invalid metadata: {e}")
    if not isinstance(meta, dict):
        meta = {}
    duckdb_path = meta.get("duckdb_path", "")
    table_name = meta.get("table_name", "")
    if not duckdb_path or not table_name:
        _die("metadata sin duckdb_path/table_name")
    if not Path(duckdb_path).is_absolute():
        duckdb_path = str(Path(_ops_db()).parent / duckdb_path)
    cols = meta.get("columns") or []
    id_col = meta.get("id_column") or "id"
    limit = max(1, min(int(args.limit), 200))
    offset = max(0, int(args.offset))
    try:
        import duckdb  # type: ignore
    except ImportError:
        _die("duckdb python module not installed")

    def ident(name) -> str:
        # Double-quote an SQL identifier, doubling any embedded quote.
        return '"' + str(name).replace('"', '""') + '"'

    select = ", ".join(ident(c) for c in [id_col, *cols])
    try:
        ddb = duckdb.connect(duckdb_path, read_only=True)
    except duckdb.Error as e:
        _die(f"duckdb open failed: {e}")
    try:
        cur = ddb.execute(
            f"SELECT {select} FROM {ident(table_name)} "
            f"ORDER BY {ident(id_col)} LIMIT ? OFFSET ?",
            [limit, offset],
        )
        names = [d[0] for d in cur.description]
        rows = [dict(zip(names, [str(v) if v is not None else None for v in r]))
                for r in cur.fetchall()]
        total = ddb.execute(
            f"SELECT COUNT(*) FROM {ident(table_name)}"
        ).fetchone()[0]
    except duckdb.Error as e:
        _die(f"duckdb query failed: {e}")
    finally:
        ddb.close()
    _emit({"ok": True, "table": args.table_id, "total": total,
           "offset": offset, "limit": limit, "rows": rows})
# ----------------------------------------------------------------------------
# group ops (issue 0036b) — espejo Python del loader C++ kind=Group
# ----------------------------------------------------------------------------
def cmd_group_visual(args) -> None:
    """Resolve the iconography a Group inherits from its children (issue 0035e).

    Collects the distinct, non-empty ``type_ref`` values of the entities
    whose ``group_id`` is ``args.container_id``, excluding nested Groups.
    If exactly one distinct type remains, ``inherited`` is that type. If
    there are none or several, ``inherited`` is ``"Group"``, the generic
    visual. The shape is always ``"square"``. This mirrors the SQL of
    ``apply_group_inherited_visuals`` in data.cpp, so pytest can check the
    contract without the binary.
    """
    conn = _connect(_ops_db(), readonly=True)
    fetched = conn.execute(
        "SELECT DISTINCT type_ref FROM entities "
        "WHERE group_id = ? AND type_ref != 'Group'",
        (args.container_id,),
    ).fetchall()
    child_types = sorted({rec["type_ref"] for rec in fetched if rec["type_ref"]})
    conn.close()
    is_homogeneous = len(child_types) == 1
    _emit({
        "ok": True,
        "container": args.container_id,
        "child_types": child_types,
        "homogeneous": is_homogeneous,
        "inherited": child_types[0] if is_homogeneous else "Group",
        "shape": "square",
    })
def cmd_group_page(args) -> None:
    """List the child entities of a Group (rows with ``group_id = ?``).

    This mirrors the query of the C++ loader ``node_groups_page_for_group``
    (same row order, same columns), so pytest can verify the SQL contract
    without the binary. It is also exposed as an MCP tool so the agent can
    inspect a Group without the app. Rows are ordered by ``updated_at DESC``.
    ``limit`` is clamped to [1, 5000] and ``offset`` to >= 0, and the JSON
    reports the clamped values together with the total child count.
    """
    container = args.container_id
    conn = _connect(_ops_db(), readonly=True)
    total = conn.execute(
        "SELECT count(*) FROM entities WHERE group_id = ?", (container,)
    ).fetchone()[0]
    limit = max(1, min(int(args.limit), 5000))
    offset = max(0, int(args.offset))
    page = [
        dict(rec) for rec in conn.execute(
            "SELECT id, name, type_ref, status, updated_at "
            "FROM entities WHERE group_id = ? "
            "ORDER BY updated_at DESC LIMIT ? OFFSET ?",
            (container, limit, offset),
        ).fetchall()
    ]
    conn.close()
    _emit({"ok": True, "container": container, "total": total,
           "offset": offset, "limit": limit, "rows": page})
# ----------------------------------------------------------------------------
# enricher ops
# ----------------------------------------------------------------------------
def _app_dir() -> str:
    """Return the app directory used for enrichers and caches.

    This is ``GX_APP_DIR`` when it names an existing path. Otherwise it
    falls back to the directory containing this script, with symlinks
    resolved.
    """
    configured = os.environ.get("GX_APP_DIR", "")
    if not configured or not Path(configured).exists():
        return str(Path(__file__).resolve().parent)
    return configured
def _enrichers_dir() -> Path:
    """Return ``<app dir>/enrichers``; the directory is not required to exist."""
    return Path(_app_dir()).joinpath("enrichers")
def cmd_enricher_list(args) -> None:
    """Emit the enrichers found in ``<app dir>/enrichers/<id>/manifest.yaml``.

    Each entry carries id and name (both defaulting to the sub-directory
    name), description and applies_to. When ``args.type`` is set, an
    enricher is skipped if its applies_to is non-empty and does not contain
    that type; an empty applies_to means "applies to every type". A scalar
    ``applies_to: email`` is normalised to ``["email"]``, so the filter
    compares whole type names instead of doing a substring test on a
    string. Manifests that cannot be read, or are not valid UTF-8, are
    skipped and logged rather than aborting the whole listing.
    """
    edir = _enrichers_dir()
    if not edir.is_dir():
        _emit({"ok": True, "count": 0, "enrichers": []})
        return
    out = []
    for sub in sorted(edir.iterdir()):
        manifest = sub / "manifest.yaml"
        if not manifest.is_file():
            continue
        try:
            text = manifest.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            _log("enricher_list", f"skipping unreadable {manifest}: {e}")
            continue
        m = _parse_yaml_minimal(text)
        applies_to = m.get("applies_to") or []
        if isinstance(applies_to, str):
            applies_to = [applies_to]
        if args.type and applies_to and args.type not in applies_to:
            continue
        out.append({
            "id": m.get("id", sub.name),
            "name": m.get("name", sub.name),
            "description": m.get("description", ""),
            "applies_to": applies_to,
        })
    _emit({"ok": True, "count": len(out), "enrichers": out})
def _parse_yaml_minimal(text: str) -> dict:
    """Parse the small YAML subset used by enricher manifests.

    This is not a real YAML parser. Only top-level keys are understood,
    i.e. keys that start with a letter at column 0:
      * ``key: value`` scalars, optionally wrapped in '...' or "...";
      * inline lists ``key: [a, "b", 'c']``;
      * block lists: a ``key:`` line with no value followed by ``- item``
        lines (indented or not).
    A ``#`` starts a comment only at the start of a line or after
    whitespace, as in YAML, so values such as ``C#`` survive. Quoted values
    get no special handling, so ``"Issue #12"`` is still cut at the ``#``.
    Nested mappings and multi-line scalars are ignored. A ``key:`` with no
    value and no following list items maps to ``""``.
    """
    import re

    def _item(raw_item: str) -> str:
        # Same unquoting for inline and block list items.
        return raw_item.strip().strip('"').strip("'")

    out: dict = {}
    list_key = None  # last top-level key whose value was empty
    for raw in text.splitlines():
        line = re.split(r"(?:^|\s)#", raw, maxsplit=1)[0].rstrip()
        if not line:
            continue
        if line[0].isalpha():
            list_key = None
            if ":" not in line:
                continue
            k, _, v = line.partition(":")
            k = k.strip()
            v = v.strip()
            if v.startswith("[") and v.endswith("]"):
                inner = v[1:-1].strip()
                out[k] = [_item(x) for x in inner.split(",") if x.strip()]
            elif v.startswith('"') and v.endswith('"'):
                out[k] = v[1:-1]
            elif v.startswith("'") and v.endswith("'"):
                out[k] = v[1:-1]
            else:
                out[k] = v
                if not v:
                    # May be the header of a block list ("key:" + "- item").
                    list_key = k
            continue
        entry = line.lstrip()
        if list_key is not None and (entry == "-" or entry.startswith("- ")):
            value = _item(entry[1:])
            if value:
                if not isinstance(out.get(list_key), list):
                    out[list_key] = []
                out[list_key].append(value)
    return out
def cmd_enricher_run(args) -> None:
    """Queue an enricher job as ``<app_db dir>/agent_jobs_queue/<req_id>.json``.

    main.cpp scans that directory every frame, reads each JSON file, submits
    it through jobs_submit and deletes the file. A directory of files is
    used instead of a SQLite table for the same reason as
    ``.mutations.marker``: graph_explorer.db is open in WAL mode on the
    Windows side, and gx-cli writing to it through /mnt/c (9p) makes the
    mmap of the .shm file fail silently, which surfaces as "disk I/O error".

    Everything is validated before anything is written:
      * ``args.enricher`` must be a single directory name under the
        enrichers dir that contains a manifest.yaml;
      * ``args.node``, when given, must be an existing entity (exit code 2);
      * ``args.params`` (default "{}") must be a JSON object. It may arrive
        as JSON text or, as some MCP clients send it, as an already-decoded
        object.
    On success, emits request_id, enricher, node and queue_file.
    """
    enricher = args.enricher or ""
    # The id is joined onto a filesystem path. Accept a single path
    # component only, so "../x" or "a/b" cannot reach outside enrichers/.
    if (enricher in ("", ".", "..") or "/" in enricher or "\\" in enricher
            or not (_enrichers_dir() / enricher / "manifest.yaml").is_file()):
        _die(f"enricher not found: {args.enricher}")
    raw_params = args.params
    if isinstance(raw_params, dict):
        raw_params = json.dumps(raw_params, ensure_ascii=False)
    params_json = raw_params or "{}"
    # Reject bad params now, while the agent can still fix the call; main.cpp
    # would otherwise only fail later, inside the job.
    try:
        parsed_params = json.loads(params_json)
    except (TypeError, ValueError) as e:
        _die(f"bad params (expected a JSON object): {e}")
    if not isinstance(parsed_params, dict):
        _die("bad params (expected a JSON object)")
    if args.node:
        cn = _connect(_ops_db(), readonly=True)
        r = cn.execute(
            "SELECT id, name FROM entities WHERE id = ?", (args.node,)
        ).fetchone()
        cn.close()
        if r is None:
            _die(f"node not found: {args.node}", code=2)
        node_name = r["name"]
    else:
        node_name = ""
    req_id = f"areq_{_now_ms()}"
    payload = {
        "id": req_id,
        "enricher_id": args.enricher,
        "node_id": args.node or "",
        "node_name": node_name,
        "params_json": params_json,
        "created_at": _now_ms(),
    }
    # IMPORTANT: queue_dir must match the directory main.cpp scans, which is
    # parent(g_layout_db_path) / "agent_jobs_queue". So it is derived from
    # GX_APP_DB too -- NOT from GX_APP_DIR, which points at the source repo.
    # If the two disagree, gx-cli writes in one place while main.cpp scans
    # another, and the jobs are orphaned.
    queue_dir = Path(_app_db()).parent / "agent_jobs_queue"
    _log("enricher_run", f"queue_dir={queue_dir}")
    # Write to a tmp file, then rename, so main.cpp never reads a
    # half-written JSON. This relies on the rename being atomic on NTFS and
    # over 9p.
    tmp = queue_dir / f"{req_id}.json.tmp"
    final = queue_dir / f"{req_id}.json"
    try:
        queue_dir.mkdir(parents=True, exist_ok=True)
        tmp.write_text(json.dumps(payload, ensure_ascii=False),
                       encoding="utf-8")
        os.replace(tmp, final)
    except OSError as e:
        _log("enricher_run", f"FAILED to write queue file: {e}")
        try:
            tmp.unlink()
        except OSError:
            pass  # best-effort cleanup of a partial tmp file
        _die(f"could not enqueue: {e}")
    _log("enricher_run",
         f"wrote {final} (enricher={args.enricher} node={args.node or ''} "
         f"req={req_id})")
    _ok(request_id=req_id, enricher=args.enricher, node=args.node or "",
        queue_file=str(final),
        message="job encolado, lo recoge el panel Jobs")
# ----------------------------------------------------------------------------
# query (read-only SELECT)
# ----------------------------------------------------------------------------
def cmd_query(args) -> None:
    """Run one read-only ``SELECT``/``WITH`` statement on operations.db.

    The guards apply in order:
      * ignoring leading parentheses and whitespace, the statement must
        start with SELECT or WITH;
      * it must not contain the whole words ATTACH, DETACH or VACUUM, nor
        ``PRAGMA writable...``;
      * it runs on a ``mode=ro`` connection, so SQLite itself rejects
        writes.
    At most ``args.limit`` rows are returned (clamped to [1, 500]). Every
    value is stringified and NULL is kept as null. SQL errors, including
    multi-statement input, exit via ``_die`` as JSON errors.
    """
    import re

    sql = (args.sql or "").strip()
    low = re.sub(r"^[(\s]+", "", sql.lower())
    if not (low.startswith("select") or low.startswith("with")):
        _die("only SELECT/WITH queries allowed")
    # Whole-word match: a plain substring test would also reject harmless
    # queries such as "SELECT attachment_path ..." or "LIKE '%vacuumed%'".
    if re.search(r"\b(?:attach|detach|vacuum)\b|\bpragma\s+writable", low):
        _die("forbidden token in query")
    cn = _connect(_ops_db(), readonly=True)
    try:
        cur = cn.execute(sql)
        names = [d[0] for d in cur.description] if cur.description else []
        rows = [dict(zip(names, [str(v) if v is not None else None for v in r]))
                for r in cur.fetchmany(max(1, min(args.limit, 500)))]
    except (sqlite3.Error, sqlite3.Warning) as e:
        # Older Python versions report "You can only execute one statement
        # at a time" as sqlite3.Warning, which is not a sqlite3.Error.
        _die(f"sql error: {e}")
    finally:
        cn.close()
    _emit({"ok": True, "count": len(rows), "columns": names, "rows": rows})
# ----------------------------------------------------------------------------
# info
# ----------------------------------------------------------------------------
def cmd_info(args) -> None:
    """Emit a summary of the active databases.

    The output always includes the raw GX_OPS_DB and GX_APP_DB values and
    the resolved app dir. It adds the entity and relation counts and the
    sorted distinct type_refs from operations.db. A SQLite error is reported
    in an ``error`` field instead of failing the command. A missing or empty
    GX_OPS_DB still exits through ``_ops_db``.
    """
    summary = {
        "ok": True,
        "ops_db": os.environ.get("GX_OPS_DB", ""),
        "app_db": os.environ.get("GX_APP_DB", ""),
        "app_dir": _app_dir(),
    }
    try:
        conn = _connect(_ops_db(), readonly=True)
        for key, table in (("entity_count", "entities"),
                           ("relation_count", "relations")):
            summary[key] = conn.execute(
                f"SELECT COUNT(*) FROM {table}").fetchone()[0]
        summary["types"] = [
            rec[0] for rec in conn.execute(
                "SELECT DISTINCT type_ref FROM entities ORDER BY type_ref"
            )
        ]
        conn.close()
    except sqlite3.Error as err:
        summary["error"] = str(err)
    _emit(summary)
# ----------------------------------------------------------------------------
# MCP server (JSON-RPC 2.0 stdio)
#
# Cuando se invoca como `gx-cli mcp-server`, el script entra en bucle
# leyendo lineas JSON-RPC en stdin y emitiendo respuestas en stdout. Los
# logs van a stderr (claude los recoge en su propio log).
#
# Reutilizamos los `cmd_*` capturando stdout: cada cmd_* emite UN unico JSON
# con _emit, asi que captura + json.loads = resultado de la tool. Mas simple
# que reescribir las 18 funciones cmd_* expuestas en MCP_DISPATCH.
# ----------------------------------------------------------------------------
# Tool catalogue advertised to MCP clients; presumably served by a
# tools/list handler outside this view (TODO confirm). Every "name" here has
# a matching key in MCP_DISPATCH, which maps it to its cmd_* function and
# default arguments. "inputSchema" is a JSON Schema describing the arguments
# a client may pass; _mcp_dispatch merges them over those defaults. The
# descriptions are the agent-facing prompt text (in Spanish).
MCP_TOOLS = [
    {"name": "info",
     "description": "Resumen del operations.db: contadores de entidades y relaciones, tipos distintos. Llamar primero al empezar para tener contexto.",
     "inputSchema": {"type": "object", "properties": {}}},
    {"name": "node_list",
     "description": "Lista entidades del grafo. Filtros opcionales por type_ref y status.",
     "inputSchema": {"type": "object", "properties": {
         "type": {"type": "string", "description": "Filtra por type_ref exacto"},
         "status": {"type": "string", "enum": ["active", "stale", "corrupted", "archived"]},
         "limit": {"type": "integer", "default": 100, "minimum": 1, "maximum": 1000}}}},
    {"name": "node_show",
     "description": "Devuelve la entidad completa (metadata, tags, descripcion, status) y todos sus vecinos 1-hop con direccion (in/out).",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string", "description": "ID de la entidad"}},
         "required": ["id"]}},
    {"name": "node_search",
     "description": "Busqueda FTS5 sobre name, description, tags. Tokeniza por whitespace y aplica busqueda por prefijo. Devuelve hits ordenados por relevancia.",
     "inputSchema": {"type": "object", "properties": {
         "query": {"type": "string"},
         "limit": {"type": "integer", "default": 50, "minimum": 1, "maximum": 200}},
         "required": ["query"]}},
    {"name": "node_create",
     "description": "Crea una entidad nueva. Si type se omite, se infiere heuristicamente del name (email/url/domain/ip/phone/text). Pasa `notes` para inicializar el panel Note del nodo (es lo que leen los enrichers split_sentences y extract_iocs_text como fuente de texto).",
     "inputSchema": {"type": "object", "properties": {
         "name": {"type": "string"},
         "type": {"type": "string", "description": "Opcional. Auto-detectado si se omite."},
         "description": {"type": "string"},
         "notes": {"type": "string", "description": "Texto largo libre del nodo (panel Note)."}},
         "required": ["name"]}},
    {"name": "node_update",
     "description": "Modifica campos de una entidad existente. Al menos un campo aparte de id debe pasarse. `notes` reemplaza el contenido completo del panel Note; combinalo con `append_notes=true` para anyadir al final preservando lo existente.",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string"},
         "name": {"type": "string"},
         "type": {"type": "string"},
         "status": {"type": "string", "enum": ["active", "stale", "corrupted", "archived"]},
         "description": {"type": "string"},
         "notes": {"type": "string", "description": "Texto del panel Note. Reemplaza el contenido salvo que append_notes=true."},
         "append_notes": {"type": "boolean", "default": False,
                          "description": "Si true, anyade `notes` al final con doble newline en vez de reemplazar."},
         "tags": {"type": "string", "description": "JSON array literal o CSV 'a,b,c'"},
         "clear_group_id": {"type": "boolean", "default": False,
                            "description": "saca la entidad del grupo (group_id = NULL)"}},
         "required": ["id"]}},
    # Destructive: the description asks the agent to confirm with the user.
    {"name": "node_delete",
     "description": "Borra la entidad y todas sus relaciones. Irreversible. Confirmar con el usuario antes.",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string"}}, "required": ["id"]}},
    {"name": "rel_create",
     "description": "Crea una relacion dirigida from_id -> to_id. Ambas entidades deben existir.",
     "inputSchema": {"type": "object", "properties": {
         "from_id": {"type": "string"},
         "to_id": {"type": "string"},
         "name": {"type": "string", "default": "RELATED_TO",
                  "description": "Nombre semantico ej: KNOWS, OWNS, BELONGS_TO"}},
         "required": ["from_id", "to_id"]}},
    {"name": "rel_delete",
     "description": "Borra una relacion por id.",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string"}}, "required": ["id"]}},
    {"name": "rel_list",
     "description": "Lista relaciones, opcionalmente filtradas por endpoint.",
     "inputSchema": {"type": "object", "properties": {
         "from_id": {"type": "string"},
         "to_id": {"type": "string"},
         "limit": {"type": "integer", "default": 200, "minimum": 1, "maximum": 2000}}}},
    # Table tools: entities with type_ref 'Table' backed by a DuckDB table.
    {"name": "table_list",
     "description": "Lista entidades de tipo Table (datasets DuckDB respaldando filas tabulares).",
     "inputSchema": {"type": "object", "properties": {}}},
    {"name": "table_page",
     "description": "Lee una pagina de filas de la tabla DuckDB asociada a un nodo Table.",
     "inputSchema": {"type": "object", "properties": {
         "table_id": {"type": "string"},
         "offset": {"type": "integer", "default": 0, "minimum": 0},
         "limit": {"type": "integer", "default": 50, "minimum": 1, "maximum": 200}},
         "required": ["table_id"]}},
    {"name": "table_promote",
     "description": "Promociona una fila DuckDB a entidad del grafo (idempotente: si ya existe, devuelve la misma).",
     "inputSchema": {"type": "object", "properties": {
         "table_id": {"type": "string"},
         "row_id": {"type": "string"}},
         "required": ["table_id", "row_id"]}},
    {"name": "table_demote",
     "description": "Borra la entidad promovida. La fila DuckDB queda intacta.",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string"}}, "required": ["id"]}},
    {"name": "group_page",
     "description": "Lista entidades hijas de un Group (entities.group_id = container_id). Espejea el loader C++ de NodeGroups kind=Group.",
     "inputSchema": {"type": "object", "properties": {
         "container_id": {"type": "string"},
         "offset": {"type": "integer", "default": 0, "minimum": 0},
         "limit": {"type": "integer", "default": 200, "minimum": 1, "maximum": 5000}},
         "required": ["container_id"]}},
    {"name": "enricher_list",
     "description": "Lista enrichers cargados. Si se pasa type, filtra por applies_to.",
     "inputSchema": {"type": "object", "properties": {
         "type": {"type": "string"}}}},
    # enricher_run only enqueues a job file; graph_explorer runs it later.
    {"name": "enricher_run",
     "description": "Encola un enricher para correr async sobre un nodo. graph_explorer tomara el job en su pool de workers; el viewport se refresca al terminar.",
     "inputSchema": {"type": "object", "properties": {
         "enricher": {"type": "string", "description": "ID del enricher (ej: fetch_webpage)"},
         "node": {"type": "string", "description": "ID del nodo objetivo. Opcional para enrichers globales."},
         "params": {"type": "string", "description": "JSON object stringified con params. Default '{}'."}},
         "required": ["enricher"]}},
    {"name": "query",
     "description": "Ejecuta SQL read-only (SELECT/WITH) sobre operations.db. Util para queries no cubiertas por las otras tools (agregaciones, joins).",
     "inputSchema": {"type": "object", "properties": {
         "sql": {"type": "string"},
         "limit": {"type": "integer", "default": 100, "minimum": 1, "maximum": 500}},
         "required": ["sql"]}},
]
def _mcp_call_cmd(fn, args_dict: dict) -> dict:
    """Invoke a cmd_* handler in-process and return its printed JSON as data.

    ``args_dict`` is turned into an attribute namespace (the shape argparse
    would have produced) and passed to ``fn``. Everything the handler prints
    to stdout is captured instead of reaching the real stdout. A SystemExit
    raised by the handler (e.g. via _die) is swallowed so that the error JSON
    it printed beforehand is still returned.

    Returns the parsed JSON output. If the handler printed nothing, or its
    output is not valid JSON, returns ``{"ok": False, "error": ...}``; in the
    invalid-JSON case the first 500 characters of the output are included
    under ``"raw"``.
    """
    namespace = SimpleNamespace(**args_dict)
    captured = StringIO()
    with redirect_stdout(captured):
        try:
            fn(namespace)
        except SystemExit:
            # The handler already printed its error JSON before exiting.
            pass
    text = captured.getvalue().strip()
    if not text:
        return {"ok": False, "error": "command produced no output"}
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        return {"ok": False, "error": f"invalid JSON output: {exc}",
                "raw": text[:500]}
# Map: MCP tool name -> (cmd_* handler, default arguments).
# The defaults mirror the argparse defaults of the equivalent CLI subcommand
# in main(), so a tool call that omits an optional argument behaves like the
# CLI invoked without that flag. _mcp_dispatch copies the defaults dict before
# overlaying the caller's arguments, so these dicts are never mutated.
MCP_DISPATCH = dict(
    info=(cmd_info, {}),
    # entities
    node_list=(cmd_node_list, dict(type=None, status=None, limit=100)),
    node_show=(cmd_node_show, {}),
    node_search=(cmd_node_search, dict(limit=50)),
    node_create=(cmd_node_create,
                 dict(type=None, description=None, notes=None)),
    node_update=(cmd_node_update,
                 dict(name=None, type=None, status=None, description=None,
                      notes=None, append_notes=False, tags=None,
                      clear_group_id=False)),
    node_delete=(cmd_node_delete, {}),
    # relations
    rel_create=(cmd_rel_create, dict(name=None)),
    rel_delete=(cmd_rel_delete, {}),
    rel_list=(cmd_rel_list, dict(from_id=None, to_id=None, limit=200)),
    # tables
    table_list=(cmd_table_list, {}),
    table_page=(cmd_table_page, dict(offset=0, limit=50)),
    table_promote=(cmd_table_promote, {}),
    table_demote=(cmd_table_demote, {}),
    # groups
    group_page=(cmd_group_page, dict(offset=0, limit=200)),
    # enrichers
    enricher_list=(cmd_enricher_list, dict(type=None)),
    enricher_run=(cmd_enricher_run, dict(node=None, params=None)),
    # raw read-only SQL
    query=(cmd_query, dict(limit=100)),
)
def _mcp_dispatch(tool_name: str, args: dict) -> dict:
    """Run MCP tool ``tool_name`` with the JSON ``arguments`` object ``args``.

    The tool's defaults from MCP_DISPATCH are overlaid with ``args`` and the
    merged mapping is handed to the matching cmd_* function through
    _mcp_call_cmd, whose parsed JSON output is returned.

    Input problems are returned as ``{"ok": False, "error": ...}`` tool
    results rather than raised, so the agent sees them as tool errors:

    - unknown tool name;
    - ``args`` that is not a JSON object (``None``/empty is treated as ``{}``);
    - arguments listed under ``required`` in the tool's MCP_TOOLS
      inputSchema that are absent or null. Without this check the cmd_*
      function would fail with an AttributeError on the namespace.
    """
    entry = MCP_DISPATCH.get(tool_name) if isinstance(tool_name, str) else None
    if entry is None:
        return {"ok": False, "error": f"unknown tool: {tool_name}"}
    args = args or {}
    if not isinstance(args, dict):
        return {"ok": False,
                "error": f"arguments for {tool_name} must be a JSON object"}
    # Look up the advertised schema; tools without one skip validation.
    schema = next((tool.get("inputSchema") or {} for tool in MCP_TOOLS
                   if tool.get("name") == tool_name), {})
    missing = [key for key in schema.get("required", ())
               if args.get(key) is None]
    if missing:
        return {"ok": False,
                "error": f"missing required argument(s) for {tool_name}: "
                         + ", ".join(missing)}
    fn, defaults = entry
    merged = dict(defaults)  # copy: never mutate the shared defaults
    merged.update(args)
    return _mcp_call_cmd(fn, merged)
def _mcp_log(msg: str) -> None:
    """Write an MCP-server diagnostic line to stderr and, best effort, to
    ``gx-cli.log`` in the directory that contains GX_APP_DB (for auditing).

    Nothing is written to stdout, which carries the JSON-RPC responses.
    If GX_APP_DB is unset, only stderr is written. OSErrors raised while
    opening or appending to the log file are ignored.
    """
    entry = f"[gx-cli mcp] {msg}\n"
    sys.stderr.write(entry)
    sys.stderr.flush()
    app_db = os.environ.get("GX_APP_DB", "")
    if not app_db:
        return
    log_path = Path(app_db).parent / "gx-cli.log"
    try:
        with open(log_path, "a", encoding="utf-8") as log_file:
            log_file.write(f"{_now_iso()} {entry}")
    except OSError:
        pass
def cmd_mcp_server(_args) -> None:
    """Serve the MCP tools as a JSON-RPC 2.0 server over stdio.

    Reads line-delimited JSON messages from stdin and writes one JSON
    response per line to stdout; diagnostics go through _mcp_log instead.
    Supported methods:

    - ``initialize``: protocol version, capabilities and server info.
    - ``tools/list``: returns MCP_TOOLS.
    - ``tools/call``: runs the tool via _mcp_dispatch and returns its JSON
      result as a single text content item. ``isError`` is set when the
      result is an object whose ``ok`` is falsy. Read commands may return
      other JSON, such as a list, which is never flagged as an error.
      Unexpected exceptions become a -32603 JSON-RPC error.
    - ``ping``: empty result.
    - ``notifications/*``: ignored, no response.
    - anything else: -32601 error, only if the message carries an ``id``.

    Lines that are not valid JSON, or whose JSON is not an object, are
    logged and skipped, so one malformed message cannot stop the server.
    Returns when stdin is closed.
    """
    import traceback

    _mcp_log("server starting (pid=" + str(os.getpid()) + ")")
    _mcp_log(f"GX_OPS_DB={os.environ.get('GX_OPS_DB', '')}")
    _mcp_log(f"GX_APP_DB={os.environ.get('GX_APP_DB', '')}")
    _mcp_log(f"GX_APP_DIR={os.environ.get('GX_APP_DIR', '')}")

    def emit(obj: dict) -> None:
        # One compact JSON object per line, flushed immediately so the
        # client is not left waiting on a buffered response.
        sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n")
        sys.stdout.flush()

    for raw in sys.stdin:
        line = raw.strip()
        if not line:
            continue
        try:
            req = json.loads(line)
        except json.JSONDecodeError as e:
            _mcp_log(f"bad json: {e}")
            continue
        if not isinstance(req, dict):
            # e.g. a JSON-RPC batch (array) or a bare scalar: req.get()
            # would raise AttributeError and kill the server loop.
            _mcp_log(f"unsupported message (not an object): {line[:200]}")
            continue
        method = req.get("method", "")
        if not isinstance(method, str):
            method = ""  # null/non-string method: treat as unknown
        rpc_id = req.get("id")
        params = req.get("params")
        if not isinstance(params, dict):
            params = {}  # keeps params.get() below safe
        _mcp_log(f"<- method={method} id={rpc_id}")
        if method == "initialize":
            emit({"jsonrpc": "2.0", "id": rpc_id, "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {"listChanged": False}},
                "serverInfo": {"name": "graph_explorer", "version": "0.1.0"},
            }})
        elif method == "notifications/initialized":
            # Notification: no response.
            pass
        elif method == "tools/list":
            emit({"jsonrpc": "2.0", "id": rpc_id,
                  "result": {"tools": MCP_TOOLS}})
        elif method == "tools/call":
            tool_name = params.get("name", "")
            tool_args = params.get("arguments", {}) or {}
            _mcp_log(f"  call {tool_name}({tool_args})")
            try:
                result = _mcp_dispatch(tool_name, tool_args)
                payload = json.dumps(result, ensure_ascii=False)
                # Only an object carrying a falsy "ok" is a tool error;
                # list output from read commands is a normal result.
                is_err = isinstance(result, dict) and not result.get("ok", True)
                emit({"jsonrpc": "2.0", "id": rpc_id, "result": {
                    "content": [{"type": "text", "text": payload}],
                    "isError": is_err,
                }})
            except Exception as e:
                _mcp_log(f"  exception: {e}")
                _mcp_log(traceback.format_exc().rstrip())
                emit({"jsonrpc": "2.0", "id": rpc_id, "error": {
                    "code": -32603, "message": str(e),
                }})
        elif method == "ping":
            emit({"jsonrpc": "2.0", "id": rpc_id, "result": {}})
        elif method.startswith("notifications/"):
            # Other notifications: ignore.
            pass
        else:
            if rpc_id is not None:
                emit({"jsonrpc": "2.0", "id": rpc_id, "error": {
                    "code": -32601, "message": f"method not found: {method}",
                }})
    _mcp_log("stdin closed, exiting")
# ----------------------------------------------------------------------------
# argparse wiring
# ----------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: build the argparse command tree and run one command.

    Top-level commands are selected through ``args.cmd``. The grouped ones
    (``node``, ``rel``, ``table``, ``group``, ``enricher``) have a second
    level selected through ``args.op``. Every leaf parser stores its handler
    in ``args.fn``, which is then called with the parsed namespace.
    """
    def arg(*flags, **options):
        # Bundle add_argument() positional/keyword arguments for leaf().
        return flags, options

    def leaf(parent, name, handler, *arguments, **parser_options):
        # Register subcommand `name` under `parent`, add its arguments in
        # the given order and bind `handler` as args.fn.
        cmd_parser = parent.add_parser(name, **parser_options)
        for flags, options in arguments:
            cmd_parser.add_argument(*flags, **options)
        cmd_parser.set_defaults(fn=handler)

    parser = argparse.ArgumentParser(prog="gx-cli")
    commands = parser.add_subparsers(dest="cmd", required=True)

    def grouped(name):
        # Top-level command whose own subcommands are stored in args.op.
        return commands.add_parser(name).add_subparsers(dest="op", required=True)

    # info
    leaf(commands, "info", cmd_info, help="Show ops_db summary")

    # node
    node = grouped("node")
    leaf(node, "create", cmd_node_create,
         arg("--name", required=True),
         arg("--type"),
         arg("--description"),
         arg("--notes",
             help="texto libre del nodo (panel Note del Inspector). "
                  "Es lo que leen split_sentences y extract_iocs_text."))
    leaf(node, "delete", cmd_node_delete, arg("id"))
    leaf(node, "update", cmd_node_update,
         arg("id"),
         arg("--name"),
         arg("--type"),
         arg("--status"),
         arg("--description"),
         arg("--notes",
             help="reemplaza el contenido de notes (panel Note). "
                  "Combinable con --append-notes para acumular."),
         arg("--append-notes", action="store_true",
             help="anade --notes al final de las notas existentes "
                  "en vez de reemplazarlas (separador: doble newline)."),
         arg("--tags", help='JSON array o "tag1,tag2" CSV'),
         arg("--clear-group-id", dest="clear_group_id", action="store_true",
             help="saca la entidad del grupo (UPDATE group_id = NULL). "
                  "Idempotente; combinable con otros campos. "
                  "Issue 0036d."))
    leaf(node, "list", cmd_node_list,
         arg("--type"),
         arg("--status"),
         arg("--limit", type=int, default=100))
    leaf(node, "show", cmd_node_show, arg("id"))
    leaf(node, "search", cmd_node_search,
         arg("query"),
         arg("--limit", type=int, default=50))

    # relation
    rel = grouped("rel")
    leaf(rel, "create", cmd_rel_create,
         arg("--from", dest="from_id", required=True),
         arg("--to", dest="to_id", required=True),
         arg("--name"))
    leaf(rel, "delete", cmd_rel_delete, arg("id"))
    leaf(rel, "list", cmd_rel_list,
         arg("--from", dest="from_id"),
         arg("--to", dest="to_id"),
         arg("--limit", type=int, default=200))

    # table
    table = grouped("table")
    leaf(table, "list", cmd_table_list)
    leaf(table, "promote", cmd_table_promote, arg("table_id"), arg("row_id"))
    leaf(table, "demote", cmd_table_demote, arg("id"))
    leaf(table, "page", cmd_table_page,
         arg("table_id"),
         arg("--offset", type=int, default=0),
         arg("--limit", type=int, default=50))

    # group (issue 0036b)
    grp = grouped("group")
    leaf(grp, "page", cmd_group_page,
         arg("container_id"),
         arg("--offset", type=int, default=0),
         arg("--limit", type=int, default=200),
         help="Lista entidades hijas de un Group (group_id=?)")
    # 0035e: visual inherited from the majority type
    leaf(grp, "visual", cmd_group_visual,
         arg("container_id"),
         help="Resuelve iconografia heredada del Group (homogeneo vs heterogeneo)")

    # enricher
    enricher = grouped("enricher")
    leaf(enricher, "list", cmd_enricher_list, arg("--type"))
    leaf(enricher, "run", cmd_enricher_run,
         arg("enricher"),
         arg("--node"),
         arg("--params", help='JSON, default "{}"'))

    # query
    leaf(commands, "query", cmd_query,
         arg("sql"),
         arg("--limit", type=int, default=100))

    # mcp-server (JSON-RPC 2.0 stdio)
    leaf(commands, "mcp-server", cmd_mcp_server,
         help="Run as MCP server reading JSON-RPC from stdin")

    args = parser.parse_args()
    args.fn(args)
# Script entry point: run the CLI only when executed directly, not when the
# file is imported as a module.
if __name__ == "__main__":
    main()