#!/usr/bin/env python3
"""
gx-cli — CLI used by the Claude agent of graph_explorer's Chat panel.

Exposes CRUD/promote/enricher/query over the active operations.db and, after
each mutation, rewrites a marker file next to graph_explorer.db so the
viewport refreshes automatically.

Env vars (set by graph_explorer when it spawns `claude -p`):
  GX_OPS_DB   absolute path to operations.db (mutable)
  GX_APP_DB   absolute path to graph_explorer.db (jobs + agent counters)
  GX_APP_DIR  path to the app directory (for enrichers/cache)

Output:
  - Read commands: JSON on stdout.
  - Mutations: JSON shaped like `{"ok": true, "id": "...", ...}`.
  - Errors: JSON shaped like `{"ok": false, "error": "..."}` and a non-zero
    exit code (1 by default, 2 for "not found").
"""
from __future__ import annotations

import argparse
import json
import os
import re
import sqlite3
import sys
import time
from contextlib import closing, redirect_stdout
from datetime import datetime, timezone
from io import StringIO
from pathlib import Path
from types import SimpleNamespace
from typing import NoReturn


# ----------------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------------

def _now_iso() -> str:
    """Return the current UTC time as `YYYY-MM-DDTHH:MM:SSZ`."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _now_ms() -> int:
    """Return the current Unix time in whole milliseconds."""
    return int(time.time() * 1000)


def _ops_db() -> str:
    """Return the path in GX_OPS_DB; exit with a JSON error if unset/missing."""
    db_path = os.environ.get("GX_OPS_DB", "")
    if not db_path:
        _die("GX_OPS_DB env var is empty")
    if not Path(db_path).exists():
        _die(f"GX_OPS_DB does not exist: {db_path}")
    return db_path


def _app_db() -> str:
    """Return the path in GX_APP_DB; exit with a JSON error if unset."""
    db_path = os.environ.get("GX_APP_DB", "")
    if not db_path:
        _die("GX_APP_DB env var is empty")
    return db_path


def _emit(payload: dict) -> None:
    """Print *payload* as a single-line JSON document on stdout."""
    print(json.dumps(payload, ensure_ascii=False, default=str))


def _ok(**kwargs) -> None:
    """Emit a success payload: `{"ok": true}` merged with *kwargs*."""
    _emit({"ok": True, **kwargs})


def _die(msg: str, code: int = 1) -> NoReturn:
    """Emit `{"ok": false, "error": msg}` and exit the process with *code*."""
    _emit({"ok": False, "error": msg})
    sys.exit(code)


def _connect(path: str, *, readonly: bool = False) -> sqlite3.Connection:
    """Open *path* with sqlite3 and a `sqlite3.Row` row factory.

    With ``readonly=True`` the database is opened through a `mode=ro` URI.
    """
    if readonly:
        # as_uri() percent-encodes characters ('?', '#', '%', spaces) that
        # would otherwise be parsed as URI syntax and break the open.
        uri = Path(path).resolve().as_uri() + "?mode=ro"
        cn = sqlite3.connect(uri, uri=True)
    else:
        cn = sqlite3.connect(path)
    cn.row_factory = sqlite3.Row
    return cn


def _bump_counter(op: str, detail: str = "") -> None:
    """Write a marker file that graph_explorer polls to refresh the viewport.

    A file is used instead of a SQLite table because graph_explorer.db is
    open in WAL mode from the Windows side while gx-cli writes from WSL.
    WAL locking fails silently across the filesystem boundary (NTFS <-> 9p);
    a marker file checked with stat() works everywhere.
    """
    # The marker lives next to graph_explorer.db (same dir = same fast write
    # path under /mnt/c).
    marker = Path(_app_db()).parent / ".mutations.marker"
    try:
        marker.write_text(f"{_now_ms()}\n{op}\n{detail}\n", encoding="utf-8")
    except OSError as e:
        # Best effort: a failed refresh hint must not fail the mutation.
        sys.stderr.write(f"[gx-cli] warn: marker write failed: {e}\n")


# ----------------------------------------------------------------------------
# Detect type heuristic (mirror of entity_ops::detect_type)
# ----------------------------------------------------------------------------

_EMAIL_RE = re.compile(r"[^\s@]+@[^\s@]+\.[^\s@]+")
_IPV4_RE = re.compile(r"(\d{1,3}\.){3}\d{1,3}")
_DOMAIN_RE = re.compile(r"[a-z0-9-]+(\.[a-z0-9-]+)+")
_PHONE_RE = re.compile(r"\+?\d[\d\s-]{6,}\d")


def _detect_type(text: str) -> str:
    """Guess an entity type from *text*.

    Returns one of "email", "ip_address", "url", "domain", "phone" or
    "text". Checks run in that order, so the first match wins.
    """
    t = (text or "").strip()
    if not t:
        return "text"
    if _EMAIL_RE.fullmatch(t):
        return "email"
    if _IPV4_RE.fullmatch(t):
        return "ip_address"
    lowered = t.lower()
    if lowered.startswith(("http://", "https://")):
        return "url"
    if _DOMAIN_RE.fullmatch(lowered):
        return "domain"
    if _PHONE_RE.fullmatch(t):
        return "phone"
    return "text"


# ----------------------------------------------------------------------------
# node ops
# ----------------------------------------------------------------------------

def cmd_node_create(args) -> None:
    """Insert a new entity; the type is auto-detected when `args.type` is unset."""
    name = args.name
    type_ref = args.type or _detect_type(name)
    new_id = f"{type_ref}_{_now_ms()}"
    ts = _now_iso()
    with closing(_connect(_ops_db())) as cn:
        try:
            cn.execute(
                "INSERT INTO entities (id, name, type_ref, description, source, "
                "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
                (new_id, name, type_ref, args.description or "",
                 "agent:gx-cli", ts, ts),
            )
            cn.commit()
        except sqlite3.IntegrityError as e:
            _die(f"insert failed: {e}")
    _bump_counter("node.create", new_id)
    _ok(id=new_id, name=name, type_ref=type_ref)


def _delete_entity(entity_id: str, op: str) -> None:
    """Delete an entity plus every relation touching it, then report.

    Relations are removed (and committed) even when the entity row itself
    does not exist; in that case the command exits with code 2.
    """
    with closing(_connect(_ops_db())) as cn:
        cur = cn.execute("DELETE FROM entities WHERE id = ?", (entity_id,))
        cn.execute(
            "DELETE FROM relations WHERE from_entity = ? OR to_entity = ?",
            (entity_id, entity_id),
        )
        cn.commit()
        if cur.rowcount == 0:
            _die(f"entity not found: {entity_id}", code=2)
    _bump_counter(op, entity_id)
    _ok(id=entity_id)


def cmd_node_delete(args) -> None:
    """Delete the entity `args.id` and all of its relations."""
    _delete_entity(args.id, "node.delete")


def _parse_tags(raw) -> list:
    """Turn a tags argument into a list.

    Accepts a JSON array literal, a CSV string ("a,b,c"), or an actual
    list (MCP clients may send a real array instead of a string).
    """
    if isinstance(raw, (list, tuple)):
        return list(raw)
    text = str(raw)
    if not text.lstrip().startswith("["):
        return [t.strip() for t in text.split(",") if t.strip()]
    try:
        tags = json.loads(text)
    except json.JSONDecodeError as e:
        _die(f"bad tags: {e}")
    if not isinstance(tags, list):
        _die("bad tags: tags must be array")
    return tags


def cmd_node_update(args) -> None:
    """Update whichever of name/type/status/description/tags were supplied."""
    sets, params = [], []
    if args.name is not None:
        sets.append("name = ?")
        params.append(args.name)
    if args.type is not None:
        sets.append("type_ref = ?")
        params.append(args.type)
    if args.status is not None:
        if args.status not in ("active", "stale", "corrupted", "archived"):
            _die(f"invalid status: {args.status}")
        sets.append("status = ?")
        params.append(args.status)
    if args.description is not None:
        sets.append("description = ?")
        params.append(args.description)
    if args.tags is not None:
        sets.append("tags = ?")
        params.append(json.dumps(_parse_tags(args.tags)))
    if not sets:
        _die("no fields to update")
    user_fields = len(sets)
    sets.append("updated_at = ?")
    params.append(_now_iso())
    params.append(args.id)

    with closing(_connect(_ops_db())) as cn:
        # Only fixed column fragments are interpolated; values are bound.
        cur = cn.execute(
            f"UPDATE entities SET {', '.join(sets)} WHERE id = ?", params
        )
        cn.commit()
        if cur.rowcount == 0:
            _die(f"entity not found: {args.id}", code=2)
    _bump_counter("node.update", args.id)
    _ok(id=args.id, fields_set=user_fields)


def cmd_node_list(args) -> None:
    """List entities, optionally filtered by type and status (limit 1..1000)."""
    sql = "SELECT id, name, type_ref, status, updated_at FROM entities"
    where, params = [], []
    if args.type:
        where.append("type_ref = ?")
        params.append(args.type)
    if args.status:
        where.append("status = ?")
        params.append(args.status)
    if where:
        sql += " WHERE " + " AND ".join(where)
    sql += " ORDER BY type_ref, name LIMIT ?"
    params.append(max(1, min(args.limit, 1000)))
    with closing(_connect(_ops_db(), readonly=True)) as cn:
        rows = [dict(r) for r in cn.execute(sql, params).fetchall()]
    _emit({"ok": True, "count": len(rows), "rows": rows})


def cmd_node_show(args) -> None:
    """Emit the full entity row plus its 1-hop neighbours with direction."""
    with closing(_connect(_ops_db(), readonly=True)) as cn:
        row = cn.execute(
            "SELECT * FROM entities WHERE id = ?", (args.id,)
        ).fetchone()
        if row is None:
            _die(f"entity not found: {args.id}", code=2)
        rec = dict(row)
        rels = cn.execute(
            "SELECT id, name, from_entity, to_entity FROM relations "
            "WHERE from_entity = ? OR to_entity = ?",
            (args.id, args.id),
        ).fetchall()

    # metadata + tags arrive as JSON strings — parse them for readability;
    # values that are not valid JSON are left untouched.
    for key in ("metadata", "tags"):
        if isinstance(rec.get(key), str):
            try:
                rec[key] = json.loads(rec[key])
            except json.JSONDecodeError:
                pass

    neighbors = []
    for r in rels:
        outgoing = r["from_entity"] == args.id
        neighbors.append({
            "rel_id": r["id"],
            "rel_name": r["name"],
            "other_id": r["to_entity"] if outgoing else r["from_entity"],
            "direction": "out" if outgoing else "in",
        })
    _emit({"ok": True, "entity": rec, "neighbors": neighbors})


def cmd_node_search(args) -> None:
    """FTS5 prefix search: one OR-ed prefix term per whitespace token."""
    q = (args.query or "").strip()
    if not q:
        _die("empty query")
    tokens = [t for t in q.split()
              if t.replace("-", "").replace("_", "").isalnum()]
    if not tokens:
        _die("no usable tokens after sanitization")
    # Quote every token: FTS5 barewords may not contain '-' and would clash
    # with the AND/OR/NOT/NEAR keywords. Tokens cannot contain '"' here.
    fts = " OR ".join(f'"{t}"*' for t in tokens)
    with closing(_connect(_ops_db(), readonly=True)) as cn:
        try:
            rows = cn.execute(
                "SELECT e.id, e.name, e.type_ref, e.status, "
                " bm25(entities_fts) AS rank "
                "FROM entities_fts f JOIN entities e ON e.id = f.id "
                "WHERE entities_fts MATCH ? ORDER BY rank LIMIT ?",
                (fts, max(1, min(args.limit, 200))),
            ).fetchall()
        except sqlite3.OperationalError as e:
            _die(f"FTS not available: {e}")
    _emit({"ok": True, "count": len(rows), "rows": [dict(r) for r in rows]})


# ----------------------------------------------------------------------------
# relation ops
# ----------------------------------------------------------------------------

def cmd_rel_create(args) -> None:
    """Create a directed relation from_id -> to_id; both endpoints must exist."""
    new_id = f"rel_{_now_ms()}"
    ts = _now_iso()
    name = args.name or "RELATED_TO"
    with closing(_connect(_ops_db())) as cn:
        # Verify that both endpoints exist before inserting.
        for entity_id in (args.from_id, args.to_id):
            found = cn.execute(
                "SELECT 1 FROM entities WHERE id = ?", (entity_id,)
            ).fetchone()
            if found is None:
                _die(f"entity not found: {entity_id}", code=2)
        try:
            cn.execute(
                "INSERT INTO relations (id, name, from_entity, to_entity, "
                "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
                (new_id, name, args.from_id, args.to_id, ts, ts),
            )
            cn.commit()
        except sqlite3.IntegrityError as e:
            # Same JSON error contract as cmd_node_create.
            _die(f"insert failed: {e}")
    _bump_counter("rel.create", new_id)
    _ok(id=new_id, name=name, from_id=args.from_id, to_id=args.to_id)


def cmd_rel_delete(args) -> None:
    """Delete the relation `args.id`; exits with code 2 if it does not exist."""
    with closing(_connect(_ops_db())) as cn:
        cur = cn.execute("DELETE FROM relations WHERE id = ?", (args.id,))
        cn.commit()
        if cur.rowcount == 0:
            _die(f"relation not found: {args.id}", code=2)
    _bump_counter("rel.delete", args.id)
    _ok(id=args.id)


def cmd_rel_list(args) -> None:
    """List relations, optionally filtered by endpoint (limit 1..2000)."""
    sql = "SELECT id, name, from_entity, to_entity, status FROM relations"
    where, params = [], []
    if args.from_id:
        where.append("from_entity = ?")
        params.append(args.from_id)
    if args.to_id:
        where.append("to_entity = ?")
        params.append(args.to_id)
    if where:
        sql += " WHERE " + " AND ".join(where)
    sql += " ORDER BY name LIMIT ?"
    params.append(max(1, min(args.limit, 2000)))
    with closing(_connect(_ops_db(), readonly=True)) as cn:
        rows = [dict(r) for r in cn.execute(sql, params).fetchall()]
    _emit({"ok": True, "count": len(rows), "rows": rows})


# ----------------------------------------------------------------------------
# table ops (Table-typed entities backed by DuckDB)
# ----------------------------------------------------------------------------

def _load_table_meta(cn: sqlite3.Connection, table_id: str) -> dict:
    """Return the parsed metadata dict of the Table entity *table_id*.

    Exits with code 2 if no such Table entity exists, and with code 1 if
    its metadata is not a JSON object.
    """
    row = cn.execute(
        "SELECT metadata FROM entities WHERE id = ? AND type_ref = 'Table'",
        (table_id,),
    ).fetchone()
    if row is None:
        _die(f"Table not found: {table_id}", code=2)
    try:
        meta = json.loads(row["metadata"] or "{}")
    except json.JSONDecodeError as e:
        _die(f"Table {table_id} has invalid metadata: {e}")
    if not isinstance(meta, dict):
        _die(f"Table {table_id} has invalid metadata: not a JSON object")
    return meta


def _resolve_duckdb_path(duckdb_path: str) -> str:
    """Resolve a relative DuckDB path against the operations.db directory."""
    if Path(duckdb_path).is_absolute():
        return duckdb_path
    return str(Path(_ops_db()).parent / duckdb_path)


def _quote_ident(name) -> str:
    """Return *name* as a double-quoted SQL identifier ('"' is doubled)."""
    return '"' + str(name).replace('"', '""') + '"'


def _import_duckdb():
    """Import and return the optional `duckdb` module, or exit with an error."""
    try:
        import duckdb  # type: ignore
    except ImportError:
        _die("duckdb python module not installed")
    return duckdb


def cmd_table_list(args) -> None:
    """List Table entities together with their DuckDB backing info."""
    with closing(_connect(_ops_db(), readonly=True)) as cn:
        rows = cn.execute(
            "SELECT id, name, metadata FROM entities WHERE type_ref = 'Table' "
            "ORDER BY name"
        ).fetchall()
    out = []
    for r in rows:
        try:
            m = json.loads(r["metadata"] or "{}")
        except json.JSONDecodeError:
            m = {}  # unreadable metadata is reported as empty fields
        out.append({
            "id": r["id"],
            "name": r["name"],
            "duckdb_path": m.get("duckdb_path", ""),
            "table_name": m.get("table_name", ""),
            "row_type": m.get("row_type", ""),
            "columns": m.get("columns", []),
        })
    _emit({"ok": True, "count": len(out), "tables": out})


def cmd_table_promote(args) -> None:
    """Promote a DuckDB row to a graph entity. Mirrors tableview_promote_row.

    Idempotent: if the promoted entity already exists it is returned with
    `promoted=False` and nothing is written.
    """
    with closing(_connect(_ops_db())) as cn:
        meta = _load_table_meta(cn, args.table_id)
        duckdb_path = meta.get("duckdb_path", "")
        table_name = meta.get("table_name", "")
        row_type = meta.get("row_type") or "row"
        label_column = meta.get("label_column") or "name"
        id_column = meta.get("id_column") or "id"
        if not duckdb_path or not table_name:
            _die(f"Table {args.table_id} has no duckdb_path/table_name")
        duckdb_path = _resolve_duckdb_path(duckdb_path)

        # Idempotency: the promoted id is derived from row type + row id.
        prom_id = f"prom_{row_type}_{args.row_id}".replace(" ", "_")
        existing = cn.execute(
            "SELECT id FROM entities WHERE id = ?", (prom_id,)
        ).fetchone()
        if existing:
            _ok(id=existing["id"], promoted=False, message="already promoted")
            return

        # Read the row from DuckDB; always release the handle.
        duckdb = _import_duckdb()
        ddb = duckdb.connect(duckdb_path, read_only=True)
        try:
            cur = ddb.execute(
                f"SELECT * FROM {_quote_ident(table_name)} "
                f"WHERE CAST({_quote_ident(id_column)} AS VARCHAR) = ?",
                [str(args.row_id)],
            )
            cols = [d[0] for d in cur.description]
            vals = cur.fetchone()
        finally:
            ddb.close()
        if vals is None:
            _die(f"row not found: {args.row_id}")

        row_dict = dict(zip(cols, vals))
        label = str(row_dict.get(label_column, args.row_id))
        ts = _now_iso()
        # Row columns are copied into metadata; non-JSON scalars are
        # stringified. A row column named "source" overrides the provenance
        # entry, as it did before.
        meta_out = {
            "source": {"duckdb": meta.get("duckdb_path"),
                       "table": table_name,
                       "row_id": args.row_id},
            **{k: (v if isinstance(v, (str, int, float, bool)) or v is None
                   else str(v))
               for k, v in row_dict.items()},
        }
        cn.execute(
            "INSERT INTO entities (id, name, type_ref, source, metadata, "
            "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (prom_id, label, row_type, "agent:gx-cli",
             json.dumps(meta_out, default=str), ts, ts),
        )
        # CONTAINS_ROW relation (idempotent thanks to INSERT OR IGNORE).
        rel_id = f"rel_contains_{prom_id}"
        cn.execute(
            "INSERT OR IGNORE INTO relations (id, name, from_entity, to_entity, "
            "created_at, updated_at) VALUES (?, 'CONTAINS_ROW', ?, ?, ?, ?)",
            (rel_id, args.table_id, prom_id, ts, ts),
        )
        cn.commit()
    _bump_counter("table.promote", prom_id)
    _ok(id=prom_id, promoted=True)


def cmd_table_demote(args) -> None:
    """Delete a promoted entity and its relations; the DuckDB row is untouched."""
    _delete_entity(args.id, "table.demote")


def cmd_table_page(args) -> None:
    """Emit one page of rows (limit 1..200) from a Table's DuckDB table."""
    with closing(_connect(_ops_db(), readonly=True)) as cn:
        meta = _load_table_meta(cn, args.table_id)
    duckdb_path = meta.get("duckdb_path", "")
    table_name = meta.get("table_name", "")
    if not duckdb_path or not table_name:
        _die("metadata sin duckdb_path/table_name")
    duckdb_path = _resolve_duckdb_path(duckdb_path)
    cols = meta.get("columns") or []
    id_col = _quote_ident(meta.get("id_column") or "id")
    table = _quote_ident(table_name)
    select = ", ".join([id_col, *(_quote_ident(c) for c in cols)])

    duckdb = _import_duckdb()
    ddb = duckdb.connect(duckdb_path, read_only=True)
    try:
        cur = ddb.execute(
            f"SELECT {select} FROM {table} ORDER BY {id_col} LIMIT ? OFFSET ?",
            [max(1, min(args.limit, 200)), max(0, args.offset)],
        )
        names = [d[0] for d in cur.description]
        # Values are stringified so any DuckDB type serialises to JSON.
        rows = [dict(zip(names, [str(v) if v is not None else None for v in r]))
                for r in cur.fetchall()]
        total = ddb.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    finally:
        ddb.close()
    _emit({"ok": True, "table": args.table_id, "total": total,
           "offset": args.offset, "limit": args.limit, "rows": rows})


# ----------------------------------------------------------------------------
# enricher ops
# ----------------------------------------------------------------------------

def _app_dir() -> str:
    """Return GX_APP_DIR if it exists, else the directory of this script."""
    p = os.environ.get("GX_APP_DIR", "")
    if p and Path(p).exists():
        return p
    return str(Path(__file__).resolve().parent)


def _enrichers_dir() -> Path:
    """Return the `enrichers` directory inside the app directory."""
    return Path(_app_dir()) / "enrichers"


def cmd_enricher_list(args) -> None:
    """List enrichers that have a manifest.yaml, optionally filtered by type."""
    out = []
    edir = _enrichers_dir()
    if not edir.is_dir():
        _emit({"ok": True, "count": 0, "enrichers": []})
        return
    for sub in sorted(edir.iterdir()):
        manifest = sub / "manifest.yaml"
        if not manifest.is_file():
            continue
        m = _parse_yaml_minimal(manifest.read_text(encoding="utf-8"))
        applies_to = m.get("applies_to", []) or []
        if isinstance(applies_to, str):
            # A scalar `applies_to: email` must not be matched by substring.
            applies_to = [applies_to]
        # An enricher with no applies_to is listed for every type.
        if args.type and applies_to and args.type not in applies_to:
            continue
        out.append({
            "id": m.get("id", sub.name),
            "name": m.get("name", sub.name),
            "description": m.get("description", ""),
            "applies_to": applies_to,
        })
    _emit({"ok": True, "count": len(out), "enrichers": out})


def _parse_yaml_minimal(text: str) -> dict:
    """Mini parser: top-level scalars + inline lists.

    Enough for the existing enricher manifests; it is NOT full YAML.
    Everything after a '#' is dropped (even inside quotes) and only lines
    whose first character is a letter are considered.
    """
    out: dict = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].rstrip()
        if not line or not line[0].isalpha() or ":" not in line:
            continue
        key, _, value = line.partition(":")
        key, value = key.strip(), value.strip()
        if value.startswith("[") and value.endswith("]"):
            items = value[1:-1].strip().split(",")
            out[key] = [x.strip().strip('"').strip("'")
                        for x in items if x.strip()]
        elif (value.startswith('"') and value.endswith('"')) or \
                (value.startswith("'") and value.endswith("'")):
            out[key] = value[1:-1]
        else:
            out[key] = value
    return out


def _normalize_params(raw) -> str:
    """Return enricher params as a JSON-object string ("{}" when empty).

    Accepts a JSON string or a dict (MCP clients may send a real object).
    """
    if not raw:
        return "{}"
    if isinstance(raw, dict):
        return json.dumps(raw, ensure_ascii=False)
    try:
        parsed = json.loads(raw)
    except (TypeError, json.JSONDecodeError) as e:
        _die(f"bad params: {e}")
    if not isinstance(parsed, dict):
        _die("bad params: must be a JSON object")
    return raw


def cmd_enricher_run(args) -> None:
    """Insert a job into the agent_jobs queue.

    main.cpp picks it up each frame and submits it through jobs_submit
    (which starts the subprocess), so the existing worker pool is reused
    without duplicating logic.
    """
    edir = _enrichers_dir()
    # The id must be a plain directory name, never a path.
    is_plain_name = Path(str(args.enricher)).name == args.enricher
    if not is_plain_name or \
            not (edir / args.enricher / "manifest.yaml").is_file():
        _die(f"enricher not found: {args.enricher}")
    params_json = _normalize_params(args.params)

    node_name = ""
    if args.node:
        with closing(_connect(_ops_db(), readonly=True)) as cn:
            r = cn.execute(
                "SELECT id, name FROM entities WHERE id = ?", (args.node,)
            ).fetchone()
        if r is None:
            _die(f"node not found: {args.node}", code=2)
        node_name = r["name"]

    req_id = f"areq_{_now_ms()}"
    try:
        with closing(sqlite3.connect(_app_db())) as cn:
            cn.execute(
                "CREATE TABLE IF NOT EXISTS agent_jobs ("
                " id TEXT PRIMARY KEY,"
                " enricher_id TEXT NOT NULL,"
                " node_id TEXT NOT NULL DEFAULT '',"
                " node_name TEXT NOT NULL DEFAULT '',"
                " params_json TEXT NOT NULL DEFAULT '{}',"
                " created_at INTEGER NOT NULL)"
            )
            cn.execute(
                "INSERT INTO agent_jobs (id, enricher_id, node_id, node_name, "
                "params_json, created_at) VALUES (?, ?, ?, ?, ?, ?)",
                (req_id, args.enricher, args.node or "", node_name,
                 params_json, _now_ms()),
            )
            cn.commit()
    except sqlite3.Error as e:
        _die(f"could not enqueue: {e}")
    _ok(request_id=req_id, enricher=args.enricher, node=args.node or "",
        message="job encolado, lo recoge el panel Jobs")


# ----------------------------------------------------------------------------
# query (read-only SELECT)
# ----------------------------------------------------------------------------

_SQL_LITERAL_RE = re.compile(r"'(?:[^']|'')*'")
_FORBIDDEN_SQL_RE = re.compile(
    r"\b(?:attach|detach|vacuum)\b|\bpragma\s+writable", re.IGNORECASE
)


def cmd_query(args) -> None:
    """Run one read-only SELECT/WITH statement and emit up to 500 rows.

    The database is opened read-only; the keyword screen is an extra guard.
    """
    sql = (args.sql or "").strip()
    head = sql.lower().lstrip("( \t\r\n")
    if not head.startswith(("select", "with")):
        _die("only SELECT/WITH queries allowed")
    # Blank out string literals and match whole words only, so data such as
    # '%attachment%' or a column named "attachments" is not rejected.
    if _FORBIDDEN_SQL_RE.search(_SQL_LITERAL_RE.sub("''", sql)):
        _die("forbidden token in query")
    with closing(_connect(_ops_db(), readonly=True)) as cn:
        try:
            cur = cn.execute(sql)
            names = [d[0] for d in cur.description] if cur.description else []
            rows = [
                dict(zip(names, [str(v) if v is not None else None for v in r]))
                for r in cur.fetchmany(max(1, min(args.limit, 500)))
            ]
        except sqlite3.Error as e:
            _die(f"sql error: {e}")
    _emit({"ok": True, "count": len(rows), "columns": names, "rows": rows})


# ----------------------------------------------------------------------------
# info
# ----------------------------------------------------------------------------

def cmd_info(args) -> None:
    """Emit the configured paths plus entity/relation counts and types.

    A SQLite failure is reported in an "error" field; "ok" stays true.
    """
    info = {
        "ok": True,
        "ops_db": os.environ.get("GX_OPS_DB", ""),
        "app_db": os.environ.get("GX_APP_DB", ""),
        "app_dir": _app_dir(),
    }
    try:
        with closing(_connect(_ops_db(), readonly=True)) as cn:
            info["entity_count"] = cn.execute(
                "SELECT COUNT(*) FROM entities").fetchone()[0]
            info["relation_count"] = cn.execute(
                "SELECT COUNT(*) FROM relations").fetchone()[0]
            info["types"] = [r[0] for r in cn.execute(
                "SELECT DISTINCT type_ref FROM entities ORDER BY type_ref"
            ).fetchall()]
    except sqlite3.Error as e:
        info["error"] = str(e)
    _emit(info)


# ----------------------------------------------------------------------------
# MCP server (JSON-RPC 2.0 stdio)
#
# When invoked as `gx-cli mcp-server`, the script loops reading JSON-RPC
# lines from stdin and writing responses to stdout. Logs go to stderr
# (claude collects them in its own log).
#
# The `cmd_*` functions are reused by capturing stdout: each cmd_* emits ONE
# single JSON document through _emit, so capture + json.loads = the tool
# result. Simpler than rewriting 17 functions.
# ----------------------------------------------------------------------------

MCP_TOOLS = [
    {"name": "info",
     "description": "Resumen del operations.db: contadores de entidades y relaciones, tipos distintos. Llamar primero al empezar para tener contexto.",
     "inputSchema": {"type": "object", "properties": {}}},
    {"name": "node_list",
     "description": "Lista entidades del grafo. Filtros opcionales por type_ref y status.",
     "inputSchema": {"type": "object", "properties": {
         "type": {"type": "string", "description": "Filtra por type_ref exacto"},
         "status": {"type": "string",
                    "enum": ["active", "stale", "corrupted", "archived"]},
         "limit": {"type": "integer", "default": 100,
                   "minimum": 1, "maximum": 1000}}}},
    {"name": "node_show",
     "description": "Devuelve la entidad completa (metadata, tags, descripcion, status) y todos sus vecinos 1-hop con direccion (in/out).",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string", "description": "ID de la entidad"}},
         "required": ["id"]}},
    {"name": "node_search",
     "description": "Busqueda FTS5 sobre name, description, tags. Tokeniza por whitespace y aplica busqueda por prefijo. Devuelve hits ordenados por relevancia.",
     "inputSchema": {"type": "object", "properties": {
         "query": {"type": "string"},
         "limit": {"type": "integer", "default": 50,
                   "minimum": 1, "maximum": 200}},
         "required": ["query"]}},
    {"name": "node_create",
     "description": "Crea una entidad nueva. Si type se omite, se infiere heuristicamente del name (email/url/domain/ip/phone/text).",
     "inputSchema": {"type": "object", "properties": {
         "name": {"type": "string"},
         "type": {"type": "string",
                  "description": "Opcional. Auto-detectado si se omite."},
         "description": {"type": "string"}},
         "required": ["name"]}},
    {"name": "node_update",
     "description": "Modifica campos de una entidad existente. Al menos un campo aparte de id debe pasarse.",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string"},
         "name": {"type": "string"},
         "type": {"type": "string"},
         "status": {"type": "string",
                    "enum": ["active", "stale", "corrupted", "archived"]},
         "description": {"type": "string"},
         "tags": {"type": "string",
                  "description": "JSON array literal o CSV 'a,b,c'"}},
         "required": ["id"]}},
    {"name": "node_delete",
     "description": "Borra la entidad y todas sus relaciones. Irreversible. Confirmar con el usuario antes.",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string"}},
         "required": ["id"]}},
    {"name": "rel_create",
     "description": "Crea una relacion dirigida from_id -> to_id. Ambas entidades deben existir.",
     "inputSchema": {"type": "object", "properties": {
         "from_id": {"type": "string"},
         "to_id": {"type": "string"},
         "name": {"type": "string", "default": "RELATED_TO",
                  "description": "Nombre semantico ej: KNOWS, OWNS, BELONGS_TO"}},
         "required": ["from_id", "to_id"]}},
    {"name": "rel_delete",
     "description": "Borra una relacion por id.",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string"}},
         "required": ["id"]}},
    {"name": "rel_list",
     "description": "Lista relaciones, opcionalmente filtradas por endpoint.",
     "inputSchema": {"type": "object", "properties": {
         "from_id": {"type": "string"},
         "to_id": {"type": "string"},
         "limit": {"type": "integer", "default": 200,
                   "minimum": 1, "maximum": 2000}}}},
    {"name": "table_list",
     "description": "Lista entidades de tipo Table (datasets DuckDB respaldando filas tabulares).",
     "inputSchema": {"type": "object", "properties": {}}},
    {"name": "table_page",
     "description": "Lee una pagina de filas de la tabla DuckDB asociada a un nodo Table.",
     "inputSchema": {"type": "object", "properties": {
         "table_id": {"type": "string"},
         "offset": {"type": "integer", "default": 0, "minimum": 0},
         "limit": {"type": "integer", "default": 50,
                   "minimum": 1, "maximum": 200}},
         "required": ["table_id"]}},
    {"name": "table_promote",
     "description": "Promociona una fila DuckDB a entidad del grafo (idempotente: si ya existe, devuelve la misma).",
     "inputSchema": {"type": "object", "properties": {
         "table_id": {"type": "string"},
         "row_id": {"type": "string"}},
         "required": ["table_id", "row_id"]}},
    {"name": "table_demote",
     "description": "Borra la entidad promovida. La fila DuckDB queda intacta.",
     "inputSchema": {"type": "object", "properties": {
         "id": {"type": "string"}},
         "required": ["id"]}},
    {"name": "enricher_list",
     "description": "Lista enrichers cargados. Si se pasa type, filtra por applies_to.",
     "inputSchema": {"type": "object", "properties": {
         "type": {"type": "string"}}}},
    {"name": "enricher_run",
     "description": "Encola un enricher para correr async sobre un nodo. graph_explorer tomara el job en su pool de workers; el viewport se refresca al terminar.",
     "inputSchema": {"type": "object", "properties": {
         "enricher": {"type": "string",
                      "description": "ID del enricher (ej: fetch_webpage)"},
         "node": {"type": "string",
                  "description": "ID del nodo objetivo. Opcional para enrichers globales."},
         "params": {"type": "string",
                    "description": "JSON object stringified con params. Default '{}'."}},
         "required": ["enricher"]}},
    {"name": "query",
     "description": "Ejecuta SQL read-only (SELECT/WITH) sobre operations.db. Util para queries no cubiertas por las otras tools (agregaciones, joins).",
     "inputSchema": {"type": "object", "properties": {
         "sql": {"type": "string"},
         "limit": {"type": "integer", "default": 100,
                   "minimum": 1, "maximum": 500}},
         "required": ["sql"]}},
]


def _mcp_call_cmd(fn, args_dict: dict) -> dict:
    """Call a cmd_* function, capture its stdout and return the parsed JSON.

    If the command exits through sys.exit (as _die does), the error JSON it
    printed before exiting is still collected and returned.
    """
    args = SimpleNamespace(**args_dict)
    buf = StringIO()
    try:
        with redirect_stdout(buf):
            fn(args)
    except SystemExit:
        pass  # the command already printed its JSON error payload
    out = buf.getvalue().strip()
    if not out:
        return {"ok": False, "error": "command produced no output"}
    try:
        return json.loads(out)
    except json.JSONDecodeError as e:
        return {"ok": False, "error": f"invalid JSON output: {e}",
                "raw": out[:500]}


# Map: MCP tool name -> (cmd function, default kwargs)
MCP_DISPATCH = {
    "info": (cmd_info, {}),
    "node_list": (cmd_node_list, {"type": None, "status": None, "limit": 100}),
    "node_show": (cmd_node_show, {}),
    "node_search": (cmd_node_search, {"limit": 50}),
    "node_create": (cmd_node_create, {"type": None, "description": None}),
    "node_update": (cmd_node_update, {"name": None, "type": None,
                                      "status": None, "description": None,
                                      "tags": None}),
    "node_delete": (cmd_node_delete, {}),
    "rel_create": (cmd_rel_create, {"name": None}),
    "rel_delete": (cmd_rel_delete, {}),
    "rel_list": (cmd_rel_list, {"from_id": None, "to_id": None, "limit": 200}),
    "table_list": (cmd_table_list, {}),
    "table_page": (cmd_table_page, {"offset": 0, "limit": 50}),
    "table_promote": (cmd_table_promote, {}),
    "table_demote": (cmd_table_demote, {}),
    "enricher_list": (cmd_enricher_list, {"type": None}),
    "enricher_run": (cmd_enricher_run, {"node": None, "params": None}),
    "query": (cmd_query, {"limit": 100}),
}


def _mcp_dispatch(tool_name: str, args: dict) -> dict:
    """Run the tool *tool_name* with its defaults overlaid by *args*."""
    if tool_name not in MCP_DISPATCH:
        return {"ok": False, "error": f"unknown tool: {tool_name}"}
    fn, defaults = MCP_DISPATCH[tool_name]
    merged = dict(defaults)
    merged.update(args or {})
    return _mcp_call_cmd(fn, merged)


def _mcp_log(msg: str) -> None:
    """Write a prefixed log line to stderr (stdout carries the protocol)."""
    sys.stderr.write(f"[gx-cli mcp] {msg}\n")
    sys.stderr.flush()


def cmd_mcp_server(_args) -> None:
    """JSON-RPC 2.0 stdio loop: read line-delimited JSON, write responses.

    Lines that are not valid JSON or not a JSON object are logged and
    skipped. Messages without an id are treated as notifications and never
    answered. The loop ends when stdin is closed.
    """
    _mcp_log("server starting (pid=" + str(os.getpid()) + ")")
    _mcp_log(f"GX_OPS_DB={os.environ.get('GX_OPS_DB', '')}")
    _mcp_log(f"GX_APP_DB={os.environ.get('GX_APP_DB', '')}")
    _mcp_log(f"GX_APP_DIR={os.environ.get('GX_APP_DIR', '')}")

    def emit(obj: dict) -> None:
        sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n")
        sys.stdout.flush()

    def reply(rpc_id, *, result=None, error=None) -> None:
        # JSON-RPC: a message without an id is a notification — no response.
        if rpc_id is None:
            return
        msg = {"jsonrpc": "2.0", "id": rpc_id}
        if error is not None:
            msg["error"] = error
        else:
            msg["result"] = result
        emit(msg)

    for raw in sys.stdin:
        line = raw.strip()
        if not line:
            continue
        try:
            req = json.loads(line)
        except json.JSONDecodeError as e:
            _mcp_log(f"bad json: {e}")
            continue
        if not isinstance(req, dict):
            # Batches/scalars are not supported; skip instead of crashing.
            _mcp_log("bad request: not a JSON object")
            continue

        method = req.get("method", "")
        if not isinstance(method, str):
            method = ""
        rpc_id = req.get("id")
        params = req.get("params") or {}
        _mcp_log(f"<- method={method} id={rpc_id}")

        if method == "initialize":
            reply(rpc_id, result={
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {"listChanged": False}},
                "serverInfo": {"name": "graph_explorer", "version": "0.1.0"},
            })
        elif method == "tools/list":
            reply(rpc_id, result={"tools": MCP_TOOLS})
        elif method == "tools/call":
            tool_args = params.get("arguments") if isinstance(params, dict) else None
            tool_args = {} if tool_args is None else tool_args
            if not isinstance(params, dict) or not isinstance(tool_args, dict):
                reply(rpc_id, error={
                    "code": -32602,
                    "message": "invalid params: expected an object",
                })
                continue
            tool_name = params.get("name", "")
            _mcp_log(f" call {tool_name}({tool_args})")
            try:
                result = _mcp_dispatch(tool_name, tool_args)
                payload = json.dumps(result, ensure_ascii=False)
                is_err = not result.get("ok", True)
            except Exception as e:
                # Boundary handler: one failing tool must not kill the server.
                _mcp_log(f" exception: {e}")
                reply(rpc_id, error={"code": -32603, "message": str(e)})
            else:
                reply(rpc_id, result={
                    "content": [{"type": "text", "text": payload}],
                    "isError": is_err,
                })
        elif method == "ping":
            reply(rpc_id, result={})
        elif method.startswith("notifications/"):
            # Notifications (including notifications/initialized) — ignore.
            pass
        else:
            reply(rpc_id, error={
                "code": -32601,
                "message": f"method not found: {method}",
            })
    _mcp_log("stdin closed, exiting")


# ----------------------------------------------------------------------------
# argparse wiring
# ----------------------------------------------------------------------------

def main() -> None:
    """Parse the command line and run the selected sub-command."""
    p = argparse.ArgumentParser(prog="gx-cli")
    sub = p.add_subparsers(dest="cmd", required=True)

    # info
    sp = sub.add_parser("info", help="Show ops_db summary")
    sp.set_defaults(fn=cmd_info)

    # node
    n = sub.add_parser("node").add_subparsers(dest="op", required=True)
    sp = n.add_parser("create")
    sp.add_argument("--name", required=True)
    sp.add_argument("--type")
    sp.add_argument("--description")
    sp.set_defaults(fn=cmd_node_create)
    sp = n.add_parser("delete")
    sp.add_argument("id")
    sp.set_defaults(fn=cmd_node_delete)
    sp = n.add_parser("update")
    sp.add_argument("id")
    sp.add_argument("--name")
    sp.add_argument("--type")
    sp.add_argument("--status")
    sp.add_argument("--description")
    sp.add_argument("--tags", help='JSON array o "tag1,tag2" CSV')
    sp.set_defaults(fn=cmd_node_update)
    sp = n.add_parser("list")
    sp.add_argument("--type")
    sp.add_argument("--status")
    sp.add_argument("--limit", type=int, default=100)
    sp.set_defaults(fn=cmd_node_list)
    sp = n.add_parser("show")
    sp.add_argument("id")
    sp.set_defaults(fn=cmd_node_show)
    sp = n.add_parser("search")
    sp.add_argument("query")
    sp.add_argument("--limit", type=int, default=50)
    sp.set_defaults(fn=cmd_node_search)

    # relation
    r = sub.add_parser("rel").add_subparsers(dest="op", required=True)
    sp = r.add_parser("create")
    sp.add_argument("--from", dest="from_id", required=True)
    sp.add_argument("--to", dest="to_id", required=True)
    sp.add_argument("--name")
    sp.set_defaults(fn=cmd_rel_create)
    sp = r.add_parser("delete")
    sp.add_argument("id")
    sp.set_defaults(fn=cmd_rel_delete)
    sp = r.add_parser("list")
    sp.add_argument("--from", dest="from_id")
    sp.add_argument("--to", dest="to_id")
    sp.add_argument("--limit", type=int, default=200)
    sp.set_defaults(fn=cmd_rel_list)

    # table
    t = sub.add_parser("table").add_subparsers(dest="op", required=True)
    sp = t.add_parser("list")
    sp.set_defaults(fn=cmd_table_list)
    sp = t.add_parser("promote")
    sp.add_argument("table_id")
    sp.add_argument("row_id")
    sp.set_defaults(fn=cmd_table_promote)
    sp = t.add_parser("demote")
    sp.add_argument("id")
    sp.set_defaults(fn=cmd_table_demote)
    sp = t.add_parser("page")
    sp.add_argument("table_id")
    sp.add_argument("--offset", type=int, default=0)
    sp.add_argument("--limit", type=int, default=50)
    sp.set_defaults(fn=cmd_table_page)

    # enricher
    e = sub.add_parser("enricher").add_subparsers(dest="op", required=True)
    sp = e.add_parser("list")
    sp.add_argument("--type")
    sp.set_defaults(fn=cmd_enricher_list)
    sp = e.add_parser("run")
    sp.add_argument("enricher")
    sp.add_argument("--node")
    sp.add_argument("--params", help='JSON, default "{}"')
    sp.set_defaults(fn=cmd_enricher_run)

    # query
    sp = sub.add_parser("query")
    sp.add_argument("sql")
    sp.add_argument("--limit", type=int, default=100)
    sp.set_defaults(fn=cmd_query)

    # mcp-server (JSON-RPC 2.0 stdio)
    sp = sub.add_parser("mcp-server",
                        help="Run as MCP server reading JSON-RPC from stdin")
    sp.set_defaults(fn=cmd_mcp_server)

    args = p.parse_args()
    try:
        args.fn(args)
    except sqlite3.Error as exc:
        # Keep the documented contract: errors are JSON + non-zero exit,
        # never a Python traceback (e.g. missing table, locked database).
        _die(f"database error: {exc}")


if __name__ == "__main__":
    main()