#!/usr/bin/env python3
"""
gx-cli — CLI usado por el agente Claude del panel Chat de graph_explorer.

Expone CRUD/promote/enricher/query sobre la operations.db activa, e incrementa
un contador en graph_explorer.db tras cada mutacion para que el viewport se
refresque automaticamente.

Env vars (las setea graph_explorer al spawnear claude -p):
  GX_OPS_DB    path absoluto a operations.db (mutable)
  GX_APP_DB    path absoluto a graph_explorer.db (jobs + agent counters)
  GX_APP_DIR   path al directorio de la app (para enrichers/cache)

Salida:
  - Comandos de lectura: JSON en stdout.
  - Mutaciones: JSON con `{"ok": true, "id": "...", ...}`.
  - Errores: JSON con `{"ok": false, "error": "..."}` y exit code 1.
"""
from __future__ import annotations

import argparse
import json
import os
import sqlite3
import sys
import time
from contextlib import redirect_stdout
from datetime import datetime, timezone
from io import StringIO
from pathlib import Path
from types import SimpleNamespace


# ----------------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------------

def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second resolution)."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")


def _now_ms() -> int:
    """Return the current Unix time in milliseconds, truncated to an int."""
    seconds = time.time()
    return int(seconds * 1000)


def _ops_db() -> str:
    """Return the operations.db path taken from the ``GX_OPS_DB`` env var.

    Exits through ``_die`` when the variable is unset/empty or when the
    path it names does not exist.
    """
    db_path = os.environ.get("GX_OPS_DB", "")
    if db_path == "":
        _die("GX_OPS_DB env var is empty")
    db_exists = Path(db_path).exists()
    if not db_exists:
        _die(f"GX_OPS_DB does not exist: {db_path}")
    return db_path


def _app_db() -> str:
    """Return the ``GX_APP_DB`` env var; exit through ``_die`` when it is unset/empty.

    Unlike ``_ops_db`` the path is not checked for existence.
    """
    db_path = os.environ.get("GX_APP_DB", "")
    if db_path == "":
        _die("GX_APP_DB env var is empty")
    return db_path


def _log(tag: str, msg: str) -> None:
    """Write a tagged line to stderr and, best effort, to ``gx-cli.log``.

    The log file lives in the directory of ``GX_APP_DB``; nothing is
    written to disk when that variable is unset. Each file entry is
    prefixed with a UTC timestamp. Failures while writing the file are
    deliberately ignored so that logging never breaks a command.
    """
    entry = f"[gx-cli {tag}] {msg}\n"
    sys.stderr.write(entry)
    sys.stderr.flush()

    db_location = os.environ.get("GX_APP_DB", "")
    if not db_location:
        return
    try:
        target = Path(db_location).parent / "gx-cli.log"
        with open(target, "a", encoding="utf-8") as sink:
            sink.write(f"{_now_iso()} {entry}")
    except OSError:
        # Best-effort audit trail: stderr already has the message.
        pass


def _emit(payload: dict) -> None:
    """Print *payload* to stdout as one line of JSON.

    Non-ASCII characters are kept as-is; values JSON cannot encode are
    converted with ``str``.
    """
    encoded = json.dumps(payload, default=str, ensure_ascii=False)
    print(encoded)


def _ok(**kwargs) -> None:
    """Emit a success payload: ``{"ok": true}`` merged with *kwargs*.

    Keys in *kwargs* override ``ok`` if they collide.
    """
    _emit({"ok": True, **kwargs})


def _die(msg: str, code: int = 1) -> None:
    """Emit ``{"ok": false, "error": msg}`` and terminate with exit status *code*.

    Never returns: it raises ``SystemExit``.
    """
    failure = {"ok": False, "error": msg}
    _emit(failure)
    raise SystemExit(code)


def _connect(path: str, *, readonly: bool = False) -> sqlite3.Connection:
    """Open a SQLite connection whose rows are ``sqlite3.Row`` objects.

    Args:
        path: Filesystem path of the database file.
        readonly: When true, open through a ``file:`` URI with ``mode=ro``
            so SQLite rejects writes.

    Returns:
        The open connection; the caller is responsible for closing it.
    """
    if readonly:
        # In a URI filename '?' starts the query string, '#' starts the
        # fragment and '%HH' is decoded as an escape. Paths containing any
        # of those characters must be percent-encoded or SQLite opens the
        # wrong file. '%' is handled first so the escapes added afterwards
        # are not encoded a second time.
        escaped = (
            path.replace("%", "%25").replace("?", "%3f").replace("#", "%23")
        )
        cn = sqlite3.connect(f"file:{escaped}?mode=ro", uri=True)
    else:
        cn = sqlite3.connect(path)
    cn.row_factory = sqlite3.Row
    return cn


def _bump_counter(op: str, detail: str = "") -> None:
    """Rewrite the ``.mutations.marker`` file next to the app database.

    The marker holds three lines: a millisecond timestamp, *op* and
    *detail*. Per the original author's note, graph_explorer polls this
    file to refresh the viewport, and a plain file is used instead of a
    SQLite table because WAL locking is unreliable when the database is
    shared between Windows and WSL (NTFS <-> 9p).

    A failed write only produces a warning on stderr.
    """
    marker_path = Path(_app_db()).parent / ".mutations.marker"
    try:
        stamp = _now_ms()
        body = f"{stamp}\n{op}\n{detail}\n"
        marker_path.write_text(body, encoding="utf-8")
    except OSError as exc:
        sys.stderr.write(f"[gx-cli] warn: marker write failed: {exc}\n")


# ----------------------------------------------------------------------------
# Detect type heuristic (mirror de entity_ops::detect_type)
# ----------------------------------------------------------------------------

def _detect_type(text: str) -> str:
    """Guess an entity type from its name.

    Checks run in a fixed order and the first hit wins: ``email``,
    ``ip_address``, ``url``, ``domain``, ``phone``. Blank input and
    anything unmatched yield ``text``. The IP check is purely syntactic
    (octet ranges are not validated).
    """
    import re

    candidate = (text or "").strip()
    if candidate == "":
        return "text"
    lowered = candidate.lower()

    # Ordered (label, predicate) pairs; evaluation is lazy so later
    # patterns are only tried when earlier ones fail.
    checks = (
        ("email",
         lambda: re.fullmatch(r"[^\s@]+@[^\s@]+\.[^\s@]+", candidate)),
        ("ip_address",
         lambda: re.fullmatch(r"(\d{1,3}\.){3}\d{1,3}", candidate)),
        ("url",
         lambda: lowered.startswith(("http://", "https://"))),
        ("domain",
         lambda: re.fullmatch(r"[a-z0-9-]+(\.[a-z0-9-]+)+", lowered)),
        ("phone",
         lambda: re.fullmatch(r"\+?\d[\d\s-]{6,}\d", candidate)),
    )
    return next((label for label, hit in checks if hit()), "text")


# ----------------------------------------------------------------------------
# node ops
# ----------------------------------------------------------------------------

def cmd_node_create(args) -> None:
    """Insert a new entity and report its id.

    Reads ``args.name``, ``args.type``, ``args.description`` and
    ``args.notes``. When no type is given it is inferred from the name.
    The id is ``<type>_<epoch-ms>``. An integrity error on insert is
    reported through ``_die``; on success the mutation marker is bumped.
    """
    entity_name = args.name
    kind = args.type if args.type else _detect_type(entity_name)
    entity_id = f"{kind}_{_now_ms()}"
    stamp = _now_iso()
    summary = args.description or ""
    note_text = args.notes or ""
    record = (entity_id, entity_name, kind, summary, note_text,
              "agent:gx-cli", stamp, stamp)

    _log("node_create",
         f"name={entity_name!r} type={kind} notes_len={len(note_text)} "
         f"id={entity_id}")
    db = _connect(_ops_db())
    try:
        db.execute(
            "INSERT INTO entities (id, name, type_ref, description, notes, "
            "source, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            record,
        )
        db.commit()
    except sqlite3.IntegrityError as exc:
        _log("node_create", f"FAILED insert: {exc}")
        _die(f"insert failed: {exc}")
    finally:
        db.close()

    _bump_counter("node.create", entity_id)
    _ok(id=entity_id, name=entity_name, type_ref=kind)


def cmd_node_delete(args) -> None:
    """Delete entity ``args.id`` together with every relation touching it.

    Both deletes are committed before the row count is inspected; if no
    entity row was removed the command exits with code 2.
    """
    target = args.id
    _log("node_delete", f"id={target}")
    db = _connect(_ops_db())
    try:
        entity_cursor = db.execute(
            "DELETE FROM entities WHERE id = ?", (target,)
        )
        db.execute(
            "DELETE FROM relations WHERE from_entity = ? OR to_entity = ?",
            (target, target),
        )
        db.commit()
        nothing_removed = entity_cursor.rowcount == 0
        if nothing_removed:
            _log("node_delete", f"FAILED not found: {target}")
            _die(f"entity not found: {target}", code=2)
    finally:
        db.close()
    _bump_counter("node.delete", target)
    _ok(id=target)


def cmd_node_update(args) -> None:
    """Update selected columns of an existing entity.

    Only attributes of *args* that are not ``None`` are written:
    ``name``, ``type``, ``status``, ``description``, ``notes`` and
    ``tags``. ``updated_at`` is always refreshed.

    * ``status`` must be one of active/stale/corrupted/archived.
    * ``notes`` replaces the stored note unless ``append_notes`` is true
      and the new text is non-empty, in which case it is appended.
    * ``tags`` is either a JSON array literal or a comma-separated list;
      it is stored as a JSON array.

    Exits through ``_die`` on invalid input, when no field was supplied,
    or (code 2) when the entity does not exist.
    """
    sets, params = [], []
    if args.name is not None:
        sets.append("name = ?")
        params.append(args.name)
    if args.type is not None:
        sets.append("type_ref = ?")
        params.append(args.type)
    if args.status is not None:
        if args.status not in ("active", "stale", "corrupted", "archived"):
            _die(f"invalid status: {args.status}")
        sets.append("status = ?")
        params.append(args.status)
    if args.description is not None:
        sets.append("description = ?")
        params.append(args.description)
    if args.notes is not None:
        if args.append_notes and args.notes:
            # Insert the blank-line separator only when there is already
            # content; appending to an empty/NULL note must not leave two
            # leading newlines.
            sets.append(
                "notes = CASE WHEN COALESCE(notes, '') = '' "
                "THEN ? ELSE notes || ? END"
            )
            params.extend([args.notes, "\n\n" + args.notes])
        else:
            sets.append("notes = ?")
            params.append(args.notes)
    if args.tags is not None:
        try:
            tags = json.loads(args.tags) if args.tags.startswith("[") \
                else [t.strip() for t in args.tags.split(",") if t.strip()]
            if not isinstance(tags, list):
                raise ValueError("tags must be array")
        except (json.JSONDecodeError, ValueError) as e:
            _die(f"bad tags: {e}")
        sets.append("tags = ?")
        params.append(json.dumps(tags))
    if not sets:
        _die("no fields to update")

    sets.append("updated_at = ?")
    params.append(_now_iso())
    params.append(args.id)

    # The column name is whatever precedes the first " = " in each clause;
    # the trailing updated_at clause is left out of the log and the count.
    _log("node_update",
         f"id={args.id} fields={[s.split(' = ')[0] for s in sets[:-1]]}")
    cn = _connect(_ops_db())
    try:
        cur = cn.execute(
            f"UPDATE entities SET {', '.join(sets)} WHERE id = ?", params
        )
        cn.commit()
        if cur.rowcount == 0:
            _log("node_update", f"FAILED not found: {args.id}")
            _die(f"entity not found: {args.id}", code=2)
    finally:
        cn.close()
    _bump_counter("node.update", args.id)
    _ok(id=args.id, fields_set=len(sets) - 1)


def cmd_node_list(args) -> None:
    """List entities, optionally filtered by exact ``type_ref`` and ``status``.

    ``args.limit`` is clamped to 1..1000. Emits
    ``{"ok": true, "count": N, "rows": [...]}`` ordered by type then name.
    """
    sql = "SELECT id, name, type_ref, status, updated_at FROM entities"
    where, params = [], []
    if args.type:
        where.append("type_ref = ?")
        params.append(args.type)
    if args.status:
        where.append("status = ?")
        params.append(args.status)
    if where:
        sql += " WHERE " + " AND ".join(where)
    sql += " ORDER BY type_ref, name LIMIT ?"
    params.append(max(1, min(args.limit, 1000)))

    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = [dict(r) for r in cn.execute(sql, params).fetchall()]
    finally:
        # Close even when the query raises, so a long-lived caller does
        # not accumulate open handles.
        cn.close()
    _emit({"ok": True, "count": len(rows), "rows": rows})


def cmd_node_show(args) -> None:
    """Emit one entity with its 1-hop neighbours.

    ``metadata`` and ``tags`` are decoded from JSON when they parse;
    otherwise the raw string is returned unchanged. Each neighbour entry
    carries the relation id/name, the id of the other endpoint and the
    direction (``out`` when this entity is ``from_entity``, else ``in``).
    Exits with code 2 when the entity does not exist.
    """
    cn = _connect(_ops_db(), readonly=True)
    try:
        row = cn.execute(
            "SELECT * FROM entities WHERE id = ?", (args.id,)
        ).fetchone()
        if row is None:
            _die(f"entity not found: {args.id}", code=2)

        rec = dict(row)
        # metadata and tags are stored as JSON strings; decode them for
        # readability and keep the raw text when decoding fails.
        for k in ("metadata", "tags"):
            if k in rec and isinstance(rec[k], str):
                try:
                    rec[k] = json.loads(rec[k])
                except json.JSONDecodeError:
                    pass

        rels = cn.execute(
            "SELECT id, name, from_entity, to_entity FROM relations "
            "WHERE from_entity = ? OR to_entity = ?",
            (args.id, args.id),
        ).fetchall()
    finally:
        # Covers both the not-found exit and any query failure.
        cn.close()

    neighbors = []
    for r in rels:
        outgoing = r["from_entity"] == args.id
        neighbors.append({
            "rel_id": r["id"], "rel_name": r["name"],
            "other_id": r["to_entity"] if outgoing else r["from_entity"],
            "direction": "out" if outgoing else "in",
        })
    _emit({"ok": True, "entity": rec, "neighbors": neighbors})


def cmd_node_search(args) -> None:
    """Run an FTS5 prefix search over ``entities_fts``.

    The query is split on whitespace; tokens that are not alphanumeric
    after removing ``-`` and ``_`` are dropped. Remaining tokens are
    OR-ed together as prefix terms and results are ordered by ``bm25``.
    ``args.limit`` is clamped to 1..200.
    """
    q = (args.query or "").strip()
    if not q:
        _die("empty query")
    tokens = [t for t in q.split()
              if t.replace("-", "").replace("_", "").isalnum()]
    if not tokens:
        _die("no usable tokens after sanitization")
    # Each token is emitted as a double-quoted FTS5 string rather than a
    # bareword: '-' is not a bareword character and AND/OR/NOT are
    # reserved, so unquoted "foo-bar*" or "OR*" is a syntax error. Tokens
    # cannot contain '"' (filtered above), so no escaping is needed.
    fts = " OR ".join(f'"{t}"*' for t in tokens)

    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = cn.execute(
            "SELECT e.id, e.name, e.type_ref, e.status, "
            "       bm25(entities_fts) AS rank "
            "FROM entities_fts f JOIN entities e ON e.id = f.id "
            "WHERE entities_fts MATCH ? ORDER BY rank LIMIT ?",
            (fts, max(1, min(args.limit, 200))),
        ).fetchall()
    except sqlite3.OperationalError as e:
        _die(f"FTS not available: {e}")
    finally:
        cn.close()
    _emit({"ok": True, "count": len(rows), "rows": [dict(r) for r in rows]})


# ----------------------------------------------------------------------------
# relation ops
# ----------------------------------------------------------------------------

def cmd_rel_create(args) -> None:
    """Create a directed relation ``args.from_id`` -> ``args.to_id``.

    The relation name defaults to ``RELATED_TO`` and the id is
    ``rel_<epoch-ms>``. Both endpoints must exist; otherwise the command
    exits with code 2 before anything is inserted.
    """
    relation_id = f"rel_{_now_ms()}"
    stamp = _now_iso()
    label = args.name or "RELATED_TO"
    source_id, target_id = args.from_id, args.to_id
    _log("rel_create",
         f"from={source_id} to={target_id} name={label} id={relation_id}")
    db = _connect(_ops_db())
    try:
        # Both endpoints must already be present in `entities`.
        for endpoint in (source_id, target_id):
            found = db.execute(
                "SELECT 1 FROM entities WHERE id = ?", (endpoint,)
            ).fetchone()
            if found is not None:
                continue
            _log("rel_create", f"FAILED entity not found: {endpoint}")
            _die(f"entity not found: {endpoint}", code=2)
        db.execute(
            "INSERT INTO relations (id, name, from_entity, to_entity, "
            "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
            (relation_id, label, source_id, target_id, stamp, stamp),
        )
        db.commit()
    finally:
        db.close()
    _bump_counter("rel.create", relation_id)
    _ok(id=relation_id, name=label, from_id=source_id, to_id=target_id)


def cmd_rel_delete(args) -> None:
    """Delete relation ``args.id``; exit with code 2 when it does not exist."""
    relation_id = args.id
    db = _connect(_ops_db())
    try:
        cursor = db.execute(
            "DELETE FROM relations WHERE id = ?", (relation_id,)
        )
        db.commit()
        nothing_removed = cursor.rowcount == 0
        if nothing_removed:
            _die(f"relation not found: {relation_id}", code=2)
    finally:
        db.close()
    _bump_counter("rel.delete", relation_id)
    _ok(id=relation_id)


def cmd_rel_list(args) -> None:
    """List relations, optionally filtered by ``from_id`` and/or ``to_id``.

    ``args.limit`` is clamped to 1..2000. Emits
    ``{"ok": true, "count": N, "rows": [...]}`` ordered by relation name.
    """
    sql = "SELECT id, name, from_entity, to_entity, status FROM relations"
    where, params = [], []
    if args.from_id:
        where.append("from_entity = ?")
        params.append(args.from_id)
    if args.to_id:
        where.append("to_entity = ?")
        params.append(args.to_id)
    if where:
        sql += " WHERE " + " AND ".join(where)
    sql += " ORDER BY name LIMIT ?"
    params.append(max(1, min(args.limit, 2000)))

    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = [dict(r) for r in cn.execute(sql, params).fetchall()]
    finally:
        # Close even when the query raises.
        cn.close()
    _emit({"ok": True, "count": len(rows), "rows": rows})


# ----------------------------------------------------------------------------
# table ops (Table-typed entities backed by DuckDB)
# ----------------------------------------------------------------------------

def cmd_table_list(args) -> None:
    """List entities whose ``type_ref`` is ``Table``.

    For each one the DuckDB location and column info are pulled out of
    its JSON ``metadata``. Metadata that is missing, malformed or not a
    JSON object is treated as empty, so the entry is still listed with
    blank fields.
    """
    cn = _connect(_ops_db(), readonly=True)
    try:
        rows = cn.execute(
            "SELECT id, name, metadata FROM entities WHERE type_ref = 'Table' "
            "ORDER BY name"
        ).fetchall()
    finally:
        # Close even when the query raises.
        cn.close()

    out = []
    for r in rows:
        try:
            m = json.loads(r["metadata"] or "{}")
        except json.JSONDecodeError:
            m = {}
        if not isinstance(m, dict):
            # Valid JSON that is not an object (e.g. a list) has no .get().
            m = {}
        out.append({
            "id": r["id"], "name": r["name"],
            "duckdb_path": m.get("duckdb_path", ""),
            "table_name": m.get("table_name", ""),
            "row_type": m.get("row_type", ""),
            "columns": m.get("columns", []),
        })
    _emit({"ok": True, "count": len(out), "tables": out})


def cmd_table_promote(args) -> None:
    """Promote one DuckDB row to a graph entity.

    Looks up the Table entity ``args.table_id``, reads the row whose id
    column (cast to VARCHAR) equals ``args.row_id`` from the DuckDB file
    named in the table metadata, inserts an entity with the deterministic
    id ``prom_<row_type>_<row_id>`` and links it to the table with a
    ``CONTAINS_ROW`` relation. If that entity id already exists nothing
    is written and ``promoted: false`` is reported.

    Per the original docstring this replicates ``node_groups_promote_row``
    (not visible in this file).
    """
    def _quote_ident(name) -> str:
        # SQL identifiers are double-quoted; an embedded '"' is doubled.
        return '"' + str(name).replace('"', '""') + '"'

    cn = _connect(_ops_db())
    try:
        row = cn.execute(
            "SELECT metadata FROM entities WHERE id = ? AND type_ref = 'Table'",
            (args.table_id,),
        ).fetchone()
        if row is None:
            _die(f"Table not found: {args.table_id}", code=2)
        try:
            meta = json.loads(row["metadata"] or "{}")
        except json.JSONDecodeError as e:
            _die(f"Table {args.table_id} has invalid metadata: {e}")
        if not isinstance(meta, dict):
            _die(f"Table {args.table_id} has invalid metadata: "
                 "expected a JSON object")
        duckdb_path = meta.get("duckdb_path", "")
        table_name = meta.get("table_name", "")
        row_type = meta.get("row_type") or "row"
        label_column = meta.get("label_column") or "name"
        id_column = meta.get("id_column") or "id"
        if not duckdb_path or not table_name:
            _die(f"Table {args.table_id} has no duckdb_path/table_name")

        # Relative paths are resolved against the directory of operations.db.
        if not Path(duckdb_path).is_absolute():
            duckdb_path = str(Path(_ops_db()).parent / duckdb_path)

        # Idempotency: the promoted id is deterministic, so an existing
        # entity with that id means the row was already promoted.
        prom_id = f"prom_{row_type}_{args.row_id}".replace(" ", "_")
        existing = cn.execute(
            "SELECT id FROM entities WHERE id = ?", (prom_id,)
        ).fetchone()
        if existing:
            _ok(id=existing["id"], promoted=False, message="already promoted")
            return

        # Read the row from DuckDB.
        try:
            import duckdb  # type: ignore
        except ImportError:
            _die("duckdb python module not installed")
        ddb = duckdb.connect(duckdb_path, read_only=True)
        try:
            cur = ddb.execute(
                f"SELECT * FROM {_quote_ident(table_name)} "
                f"WHERE CAST({_quote_ident(id_column)} AS VARCHAR) = ?",
                [args.row_id],
            )
            cols = [d[0] for d in cur.description]
            vals = cur.fetchone()
        finally:
            # Release the DuckDB handle even when the query fails.
            ddb.close()
        if vals is None:
            _die(f"row not found: {args.row_id}")
        row_dict = dict(zip(cols, vals))

        label = str(row_dict.get(label_column, args.row_id))
        ts = _now_iso()
        # NOTE(review): row columns are merged after "source", so a column
        # literally named "source" replaces the provenance record — confirm
        # this matches the behaviour being replicated.
        meta_out = {
            "source": {"duckdb": meta.get("duckdb_path"),
                       "table": table_name,
                       "row_id": args.row_id},
            **{k: (v if isinstance(v, (str, int, float, bool)) or v is None
                   else str(v)) for k, v in row_dict.items()},
        }
        cn.execute(
            "INSERT INTO entities (id, name, type_ref, source, metadata, "
            "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (prom_id, label, row_type, "agent:gx-cli",
             json.dumps(meta_out, default=str), ts, ts),
        )
        # CONTAINS_ROW relation (INSERT OR IGNORE keeps it idempotent).
        rel_id = f"rel_contains_{prom_id}"
        cn.execute(
            "INSERT OR IGNORE INTO relations (id, name, from_entity, to_entity, "
            "created_at, updated_at) VALUES (?, 'CONTAINS_ROW', ?, ?, ?, ?)",
            (rel_id, args.table_id, prom_id, ts, ts),
        )
        cn.commit()
    finally:
        cn.close()
    _bump_counter("table.promote", prom_id)
    _ok(id=prom_id, promoted=True)


def cmd_table_demote(args) -> None:
    """Delete entity ``args.id`` and every relation that references it.

    Both deletes are committed before the row count is checked; exits
    with code 2 when no entity row was removed.
    """
    target = args.id
    db = _connect(_ops_db())
    try:
        entity_cursor = db.execute(
            "DELETE FROM entities WHERE id = ?", (target,)
        )
        db.execute(
            "DELETE FROM relations WHERE from_entity = ? OR to_entity = ?",
            (target, target),
        )
        db.commit()
        nothing_removed = entity_cursor.rowcount == 0
        if nothing_removed:
            _die(f"entity not found: {target}", code=2)
    finally:
        db.close()
    _bump_counter("table.demote", target)
    _ok(id=target)


def cmd_table_page(args) -> None:
    """Emit one page of rows from the DuckDB table behind a Table entity.

    The id column is always selected, followed by the columns listed in
    the table metadata (if any). Rows are ordered by the id column;
    ``args.limit`` is clamped to 1..200 and ``args.offset`` to >= 0 for
    the query. Every non-NULL cell is returned as a string. ``total`` is
    the full row count of the table. The ``offset``/``limit`` echoed in
    the output are the values as passed, not the clamped ones.
    """
    def _quote_ident(name) -> str:
        # SQL identifiers are double-quoted; an embedded '"' is doubled.
        return '"' + str(name).replace('"', '""') + '"'

    cn = _connect(_ops_db(), readonly=True)
    try:
        row = cn.execute(
            "SELECT metadata FROM entities WHERE id = ? AND type_ref = 'Table'",
            (args.table_id,),
        ).fetchone()
    finally:
        cn.close()
    if row is None:
        _die(f"Table not found: {args.table_id}", code=2)
    try:
        meta = json.loads(row["metadata"] or "{}")
    except json.JSONDecodeError as e:
        _die(f"Table {args.table_id} has invalid metadata: {e}")
    if not isinstance(meta, dict):
        _die(f"Table {args.table_id} has invalid metadata: "
             "expected a JSON object")
    duckdb_path = meta.get("duckdb_path", "")
    table_name = meta.get("table_name", "")
    if not duckdb_path or not table_name:
        _die("metadata sin duckdb_path/table_name")
    # Relative paths are resolved against the directory of operations.db.
    if not Path(duckdb_path).is_absolute():
        duckdb_path = str(Path(_ops_db()).parent / duckdb_path)
    cols = meta.get("columns") or []
    id_col = meta.get("id_column") or "id"
    try:
        import duckdb  # type: ignore
    except ImportError:
        _die("duckdb python module not installed")

    quoted_table = _quote_ident(table_name)
    quoted_id = _quote_ident(id_col)
    select = ", ".join([quoted_id] + [_quote_ident(c) for c in cols])
    sql = (f"SELECT {select} FROM {quoted_table} "
           f"ORDER BY {quoted_id} LIMIT ? OFFSET ?")

    ddb = duckdb.connect(duckdb_path, read_only=True)
    try:
        cur = ddb.execute(
            sql, [max(1, min(args.limit, 200)), max(0, args.offset)]
        )
        names = [d[0] for d in cur.description]
        rows = [dict(zip(names, [str(v) if v is not None else None for v in r]))
                for r in cur.fetchall()]
        total = ddb.execute(
            f"SELECT COUNT(*) FROM {quoted_table}"
        ).fetchone()[0]
    finally:
        # Release the DuckDB handle even when a query fails.
        ddb.close()
    _emit({"ok": True, "table": args.table_id, "total": total,
           "offset": args.offset, "limit": args.limit, "rows": rows})


# ----------------------------------------------------------------------------
# enricher ops
# ----------------------------------------------------------------------------

def _app_dir() -> str:
    """Return the application directory.

    Uses ``GX_APP_DIR`` when it is set and exists; otherwise falls back
    to the directory that contains this script.
    """
    configured = os.environ.get("GX_APP_DIR", "")
    if not configured or not Path(configured).exists():
        script_dir = Path(__file__).resolve().parent
        return str(script_dir)
    return configured


def _enrichers_dir() -> Path:
    """Return the ``enrichers`` directory inside the application directory."""
    base = Path(_app_dir())
    return base / "enrichers"


def cmd_enricher_list(args) -> None:
    """List enrichers found under the enrichers directory.

    A sub-entry counts as an enricher when it contains ``manifest.yaml``.
    When ``args.type`` is given, enrichers whose non-empty ``applies_to``
    does not contain it are skipped; enrichers with an empty or missing
    ``applies_to`` are always listed. Missing ``id``/``name`` default to
    the directory name.
    """
    root = _enrichers_dir()
    if not root.is_dir():
        _emit({"ok": True, "count": 0, "enrichers": []})
        return

    wanted = args.type
    found = []
    for entry in sorted(root.iterdir()):
        manifest_file = entry / "manifest.yaml"
        if not manifest_file.is_file():
            continue
        spec = _parse_yaml_minimal(manifest_file.read_text(encoding="utf-8"))
        targets = spec.get("applies_to", []) or []
        filtered_out = wanted and targets and wanted not in targets
        if filtered_out:
            continue
        found.append({
            "id": spec.get("id", entry.name),
            "name": spec.get("name", entry.name),
            "description": spec.get("description", ""),
            "applies_to": targets,
        })
    _emit({"ok": True, "count": len(found), "enrichers": found})


def _strip_yaml_comment(line: str) -> str:
    """Return *line* without its trailing ``#`` comment, respecting quotes."""
    open_quote = ""
    for i, ch in enumerate(line):
        if open_quote:
            if ch == open_quote:
                open_quote = ""
            continue
        prev = line[i - 1] if i else ""
        # A quote only opens a quoted scalar at the start of a value, so
        # an apostrophe inside a plain word ("don't") is left alone.
        if ch in "\"'" and (not prev or prev.isspace() or prev in ":[,"):
            open_quote = ch
        # '#' starts a comment only at line start or after whitespace.
        elif ch == "#" and (not prev or prev.isspace()):
            return line[:i]
    return line


def _parse_yaml_minimal(text: str) -> dict:
    """Parse the small YAML subset used by enricher manifests.

    Only top-level ``key: value`` lines whose first character is a letter
    are read; indented lines, list items and lines without ``:`` are
    ignored. Values may be inline lists (``[a, "b"]``), single- or
    double-quoted strings, or bare scalars (kept as strings). Comments
    are removed, but a ``#`` inside quotes or glued to preceding text
    (as in a URL fragment) is preserved. This is not a full YAML parser.

    Returns:
        Mapping of key to ``str`` or ``list[str]``.
    """
    out: dict = {}
    for raw in text.splitlines():
        line = _strip_yaml_comment(raw).rstrip()
        if not line or not line[0].isalpha():
            continue
        if ":" not in line:
            continue
        k, _, v = line.partition(":")
        k = k.strip()
        v = v.strip()
        if v.startswith("[") and v.endswith("]"):
            inner = v[1:-1].strip()
            out[k] = [x.strip().strip('"').strip("'")
                      for x in inner.split(",") if x.strip()]
        elif v.startswith('"') and v.endswith('"'):
            out[k] = v[1:-1]
        elif v.startswith("'") and v.endswith("'"):
            out[k] = v[1:-1]
        else:
            out[k] = v
    return out


def cmd_enricher_run(args) -> None:
    """Queue an enricher job as ``<queue_dir>/<req_id>.json``.

    The queue directory is ``agent_jobs_queue`` next to ``GX_APP_DB``.
    Per the original author's notes, main.cpp scans that directory,
    submits each JSON file as a job and deletes it; files are used
    instead of a SQLite table for the same cross-filesystem WAL reason
    as the ``.mutations.marker`` file.

    Exits through ``_die`` when the enricher has no ``manifest.yaml``,
    when ``args.node`` is given but does not exist (code 2), when
    ``GX_APP_DB`` is empty, or when the queue file cannot be written.
    """
    edir = _enrichers_dir()
    if not (edir / args.enricher / "manifest.yaml").is_file():
        _die(f"enricher not found: {args.enricher}")

    node_name = ""
    if args.node:
        cn = _connect(_ops_db(), readonly=True)
        try:
            r = cn.execute(
                "SELECT id, name FROM entities WHERE id = ?", (args.node,)
            ).fetchone()
        finally:
            cn.close()
        if r is None:
            _die(f"node not found: {args.node}", code=2)
        node_name = r["name"]

    req_id = f"areq_{_now_ms()}"
    payload = {
        "id":          req_id,
        "enricher_id": args.enricher,
        "node_id":     args.node or "",
        "node_name":   node_name,
        "params_json": args.params or "{}",
        "created_at":  _now_ms(),
    }

    # IMPORTANT (original note): the queue directory must be the one
    # main.cpp scans, which is derived from the app database path — NOT
    # from GX_APP_DIR (which points at the source repo). A mismatch
    # leaves orphaned jobs.
    queue_dir = Path(_app_db()).parent / "agent_jobs_queue"
    tmp = queue_dir / f"{req_id}.json.tmp"
    final = queue_dir / f"{req_id}.json"
    # _log writes the same stderr line as before and also records it in
    # gx-cli.log so enqueue attempts can be audited.
    _log("enricher_run", f"queue_dir={queue_dir}")
    try:
        queue_dir.mkdir(parents=True, exist_ok=True)
        # Write to a temp name and rename so the reader never sees a
        # half-written JSON file.
        tmp.write_text(json.dumps(payload, ensure_ascii=False),
                       encoding="utf-8")
        os.replace(tmp, final)
    except OSError as e:
        # Do not leave a stray .tmp file behind in the queue directory.
        try:
            tmp.unlink()
        except OSError:
            pass
        _log("enricher_run", f"FAILED to write queue file: {e}")
        _die(f"could not enqueue: {e}")
    _log("enricher_run",
         f"wrote {final} "
         f"(enricher={args.enricher} node={args.node or ''} "
         f"req={req_id})")
    _ok(request_id=req_id, enricher=args.enricher, node=args.node or "",
        queue_file=str(final),
        message="job encolado, lo recoge el panel Jobs")


# ----------------------------------------------------------------------------
# query (read-only SELECT)
# ----------------------------------------------------------------------------

def cmd_query(args) -> None:
    """Run a caller-supplied read-only query against operations.db.

    The statement must start with SELECT or WITH (leading parentheses
    are ignored) and must not contain any of the substrings ``attach``,
    ``detach``, ``pragma writable`` or ``vacuum``. The database is opened
    read-only. At most ``args.limit`` rows (clamped to 1..500) are
    fetched and every non-NULL value is returned as a string.
    """
    statement = (args.sql or "").strip()
    normalized = statement.lower().lstrip("(").lstrip()
    if not normalized.startswith(("select", "with")):
        _die("only SELECT/WITH queries allowed")
    for banned in ("attach", "detach", "pragma writable", "vacuum"):
        if banned in normalized:
            _die("forbidden token in query")

    db = _connect(_ops_db(), readonly=True)
    try:
        cursor = db.execute(statement)
        columns = []
        if cursor.description:
            columns = [col[0] for col in cursor.description]
        row_cap = max(1, min(args.limit, 500))
        records = [
            {name: (None if cell is None else str(cell))
             for name, cell in zip(columns, raw)}
            for raw in cursor.fetchmany(row_cap)
        ]
    except sqlite3.Error as exc:
        db.close()
        _die(f"sql error: {exc}")
    db.close()
    _emit({"ok": True, "count": len(records), "columns": columns,
           "rows": records})


# ----------------------------------------------------------------------------
# info
# ----------------------------------------------------------------------------

def cmd_info(args) -> None:
    """Emit a summary of the environment and of operations.db.

    Always reports the configured paths. When the database can be
    queried it adds ``entity_count``, ``relation_count`` and the sorted
    distinct ``types``; a SQLite failure is reported in an ``error`` key
    instead, alongside whatever was collected before the failure.
    """
    info = {
        "ok": True,
        "ops_db": os.environ.get("GX_OPS_DB", ""),
        "app_db": os.environ.get("GX_APP_DB", ""),
        "app_dir": _app_dir(),
    }
    try:
        cn = _connect(_ops_db(), readonly=True)
        try:
            info["entity_count"] = cn.execute(
                "SELECT COUNT(*) FROM entities").fetchone()[0]
            info["relation_count"] = cn.execute(
                "SELECT COUNT(*) FROM relations").fetchone()[0]
            info["types"] = [r[0] for r in cn.execute(
                "SELECT DISTINCT type_ref FROM entities ORDER BY type_ref"
            ).fetchall()]
        finally:
            # Close the handle even when one of the queries fails.
            cn.close()
    except sqlite3.Error as e:
        info["error"] = str(e)
    _emit(info)


# ----------------------------------------------------------------------------
# MCP server (JSON-RPC 2.0 stdio)
#
# Cuando se invoca como `gx-cli mcp-server`, el script entra en bucle
# leyendo lineas JSON-RPC en stdin y emitiendo respuestas en stdout. Los
# logs van a stderr (claude los recoge en su propio log).
#
# Reutilizamos los `cmd_*` capturando stdout: cada cmd_* emite UN unico JSON
# con _emit, asi que captura + json.loads = resultado de la tool. Mas simple
# que reescribir 17 funciones.
# ----------------------------------------------------------------------------

# Catalogue of the tools exposed over MCP. Each entry has:
#   name         tool identifier; the names listed here correspond to keys
#                of the MCP_DISPATCH table defined further down
#   description  human-readable text shown to the agent (kept in Spanish,
#                it is runtime data, not a comment)
#   inputSchema  JSON-Schema-style description of the accepted arguments
# NOTE(review): presumably returned verbatim for a tools/list request —
# the request handler is not visible in this part of the file; confirm.
# The `default` values in the schemas are informational here; the values
# actually applied when an argument is omitted come from MCP_DISPATCH.
MCP_TOOLS = [
    {"name": "info",
     "description": "Resumen del operations.db: contadores de entidades y relaciones, tipos distintos. Llamar primero al empezar para tener contexto.",
     "inputSchema": {"type": "object", "properties": {}}},
    {"name": "node_list",
     "description": "Lista entidades del grafo. Filtros opcionales por type_ref y status.",
     "inputSchema": {"type": "object", "properties": {
        "type": {"type": "string", "description": "Filtra por type_ref exacto"},
        "status": {"type": "string", "enum": ["active", "stale", "corrupted", "archived"]},
        "limit": {"type": "integer", "default": 100, "minimum": 1, "maximum": 1000}}}},
    {"name": "node_show",
     "description": "Devuelve la entidad completa (metadata, tags, descripcion, status) y todos sus vecinos 1-hop con direccion (in/out).",
     "inputSchema": {"type": "object", "properties": {
        "id": {"type": "string", "description": "ID de la entidad"}},
        "required": ["id"]}},
    {"name": "node_search",
     "description": "Busqueda FTS5 sobre name, description, tags. Tokeniza por whitespace y aplica busqueda por prefijo. Devuelve hits ordenados por relevancia.",
     "inputSchema": {"type": "object", "properties": {
        "query": {"type": "string"},
        "limit": {"type": "integer", "default": 50, "minimum": 1, "maximum": 200}},
        "required": ["query"]}},
    {"name": "node_create",
     "description": "Crea una entidad nueva. Si type se omite, se infiere heuristicamente del name (email/url/domain/ip/phone/text). Pasa `notes` para inicializar el panel Note del nodo (es lo que leen los enrichers split_sentences y extract_iocs_text como fuente de texto).",
     "inputSchema": {"type": "object", "properties": {
        "name": {"type": "string"},
        "type": {"type": "string", "description": "Opcional. Auto-detectado si se omite."},
        "description": {"type": "string"},
        "notes": {"type": "string", "description": "Texto largo libre del nodo (panel Note)."}},
        "required": ["name"]}},
    {"name": "node_update",
     "description": "Modifica campos de una entidad existente. Al menos un campo aparte de id debe pasarse. `notes` reemplaza el contenido completo del panel Note; combinalo con `append_notes=true` para anyadir al final preservando lo existente.",
     "inputSchema": {"type": "object", "properties": {
        "id": {"type": "string"},
        "name": {"type": "string"},
        "type": {"type": "string"},
        "status": {"type": "string", "enum": ["active", "stale", "corrupted", "archived"]},
        "description": {"type": "string"},
        "notes": {"type": "string", "description": "Texto del panel Note. Reemplaza el contenido salvo que append_notes=true."},
        "append_notes": {"type": "boolean", "default": False,
                          "description": "Si true, anyade `notes` al final con doble newline en vez de reemplazar."},
        "tags": {"type": "string", "description": "JSON array literal o CSV 'a,b,c'"}},
        "required": ["id"]}},
    {"name": "node_delete",
     "description": "Borra la entidad y todas sus relaciones. Irreversible. Confirmar con el usuario antes.",
     "inputSchema": {"type": "object", "properties": {
        "id": {"type": "string"}}, "required": ["id"]}},
    {"name": "rel_create",
     "description": "Crea una relacion dirigida from_id -> to_id. Ambas entidades deben existir.",
     "inputSchema": {"type": "object", "properties": {
        "from_id": {"type": "string"},
        "to_id": {"type": "string"},
        "name": {"type": "string", "default": "RELATED_TO",
                  "description": "Nombre semantico ej: KNOWS, OWNS, BELONGS_TO"}},
        "required": ["from_id", "to_id"]}},
    {"name": "rel_delete",
     "description": "Borra una relacion por id.",
     "inputSchema": {"type": "object", "properties": {
        "id": {"type": "string"}}, "required": ["id"]}},
    {"name": "rel_list",
     "description": "Lista relaciones, opcionalmente filtradas por endpoint.",
     "inputSchema": {"type": "object", "properties": {
        "from_id": {"type": "string"},
        "to_id": {"type": "string"},
        "limit": {"type": "integer", "default": 200, "minimum": 1, "maximum": 2000}}}},
    {"name": "table_list",
     "description": "Lista entidades de tipo Table (datasets DuckDB respaldando filas tabulares).",
     "inputSchema": {"type": "object", "properties": {}}},
    {"name": "table_page",
     "description": "Lee una pagina de filas de la tabla DuckDB asociada a un nodo Table.",
     "inputSchema": {"type": "object", "properties": {
        "table_id": {"type": "string"},
        "offset": {"type": "integer", "default": 0, "minimum": 0},
        "limit": {"type": "integer", "default": 50, "minimum": 1, "maximum": 200}},
        "required": ["table_id"]}},
    {"name": "table_promote",
     "description": "Promociona una fila DuckDB a entidad del grafo (idempotente: si ya existe, devuelve la misma).",
     "inputSchema": {"type": "object", "properties": {
        "table_id": {"type": "string"},
        "row_id": {"type": "string"}},
        "required": ["table_id", "row_id"]}},
    {"name": "table_demote",
     "description": "Borra la entidad promovida. La fila DuckDB queda intacta.",
     "inputSchema": {"type": "object", "properties": {
        "id": {"type": "string"}}, "required": ["id"]}},
    {"name": "enricher_list",
     "description": "Lista enrichers cargados. Si se pasa type, filtra por applies_to.",
     "inputSchema": {"type": "object", "properties": {
        "type": {"type": "string"}}}},
    {"name": "enricher_run",
     "description": "Encola un enricher para correr async sobre un nodo. graph_explorer tomara el job en su pool de workers; el viewport se refresca al terminar.",
     "inputSchema": {"type": "object", "properties": {
        "enricher": {"type": "string", "description": "ID del enricher (ej: fetch_webpage)"},
        "node": {"type": "string", "description": "ID del nodo objetivo. Opcional para enrichers globales."},
        "params": {"type": "string", "description": "JSON object stringified con params. Default '{}'."}},
        "required": ["enricher"]}},
    {"name": "query",
     "description": "Ejecuta SQL read-only (SELECT/WITH) sobre operations.db. Util para queries no cubiertas por las otras tools (agregaciones, joins).",
     "inputSchema": {"type": "object", "properties": {
        "sql": {"type": "string"},
        "limit": {"type": "integer", "default": 100, "minimum": 1, "maximum": 500}},
        "required": ["sql"]}},
]


def _mcp_call_cmd(fn, args_dict: dict) -> dict:
    """Run a cmd_* function in-process and return the JSON it printed.

    ``args_dict`` is exposed to the command as attributes of a namespace
    object, the same way argparse would hand over parsed arguments. Whatever
    the command writes to stdout is captured and parsed as JSON.

    A ``SystemExit`` raised by the command (e.g. via ``_die``) is swallowed so
    that the error JSON printed before exiting is still returned. Any other
    exception propagates to the caller.

    Returns:
        The parsed JSON output, or an ``{"ok": False, "error": ...}`` dict
        when the command printed nothing or printed invalid JSON (in which
        case the first 500 characters of the output are included as ``raw``).
    """
    namespace = SimpleNamespace(**args_dict)
    captured = StringIO()
    with redirect_stdout(captured):
        try:
            fn(namespace)
        except SystemExit:
            # The command bailed out; its JSON (if any) is already captured.
            pass

    text = captured.getvalue().strip()
    if text:
        try:
            return json.loads(text)
        except json.JSONDecodeError as exc:
            return {"ok": False, "error": f"invalid JSON output: {exc}",
                    "raw": text[:500]}
    return {"ok": False, "error": "command produced no output"}


# Dispatch table: MCP tool name -> (cmd_* handler, default kwargs).
# The defaults stand in for optional arguments the MCP client omitted, so the
# handler always finds those attributes on its args namespace. They mirror
# the argparse defaults of the equivalent CLI subcommand.
MCP_DISPATCH = {
    # summary
    "info": (cmd_info, {}),
    # nodes
    "node_list": (
        cmd_node_list,
        {"type": None, "status": None, "limit": 100},
    ),
    "node_show": (cmd_node_show, {}),
    "node_search": (cmd_node_search, {"limit": 50}),
    "node_create": (
        cmd_node_create,
        {"type": None, "description": None, "notes": None},
    ),
    "node_update": (
        cmd_node_update,
        {
            "name": None,
            "type": None,
            "status": None,
            "description": None,
            "notes": None,
            "append_notes": False,
            "tags": None,
        },
    ),
    "node_delete": (cmd_node_delete, {}),
    # relations
    "rel_create": (cmd_rel_create, {"name": None}),
    "rel_delete": (cmd_rel_delete, {}),
    "rel_list": (
        cmd_rel_list,
        {"from_id": None, "to_id": None, "limit": 200},
    ),
    # tables
    "table_list": (cmd_table_list, {}),
    "table_page": (cmd_table_page, {"offset": 0, "limit": 50}),
    "table_promote": (cmd_table_promote, {}),
    "table_demote": (cmd_table_demote, {}),
    # enrichers
    "enricher_list": (cmd_enricher_list, {"type": None}),
    "enricher_run": (cmd_enricher_run, {"node": None, "params": None}),
    # raw SQL
    "query": (cmd_query, {"limit": 100}),
}


def _mcp_dispatch(tool_name: str, args: dict) -> dict:
    """Resolve an MCP tool name and run its command with defaults filled in.

    Looks ``tool_name`` up in ``MCP_DISPATCH``, overlays the caller-supplied
    ``args`` on top of that tool's default kwargs and executes the handler
    through ``_mcp_call_cmd``.

    Returns:
        The handler's parsed JSON output, or ``{"ok": False, "error": ...}``
        when the tool name is not registered.
    """
    entry = MCP_DISPATCH.get(tool_name)
    if entry is None:
        return {"ok": False, "error": f"unknown tool: {tool_name}"}

    handler, base_kwargs = entry
    # Copy first so the shared defaults in the table are never mutated.
    call_kwargs = dict(base_kwargs)
    if args:
        call_kwargs.update(args)
    return _mcp_call_cmd(handler, call_kwargs)


def _mcp_log(msg: str) -> None:
    """Write an MCP server log line to stderr and, best-effort, to gx-cli.log.

    The line always goes to stderr. When the ``GX_APP_DB`` env var is set, the
    same line, prefixed with an ISO timestamp, is also appended to
    ``gx-cli.log`` in the directory containing that database, for auditing.
    Failures writing the file (``OSError``) are ignored on purpose so that
    logging can never break the server.
    """
    entry = f"[gx-cli mcp] {msg}\n"
    err = sys.stderr
    err.write(entry)
    err.flush()

    db_path = os.environ.get("GX_APP_DB", "")
    if not db_path:
        return
    try:
        log_file = Path(db_path).parent / "gx-cli.log"
        with open(log_file, "a", encoding="utf-8") as fh:
            fh.write(f"{_now_iso()} {entry}")
    except OSError:
        # Audit log is optional; stderr already has the message.
        pass


def cmd_mcp_server(_args) -> None:
    """Serve MCP over stdio as a line-delimited JSON-RPC 2.0 loop.

    Reads one JSON request per line from stdin and writes one JSON response
    per line to stdout until stdin is closed. Supported methods are
    ``initialize``, ``tools/list``, ``tools/call`` and ``ping``;
    ``notifications/*`` messages are accepted and ignored; any other method
    that carries an id is answered with a -32601 "method not found" error.

    Malformed input never terminates the loop: a line that is not valid JSON,
    or that is valid JSON but not an object, is logged and skipped. A
    ``tools/call`` whose ``params`` is not an object is answered with a
    -32602 "invalid params" error. Exceptions raised while running a tool are
    reported as a -32603 error for that request only.

    Args:
        _args: Parsed argparse namespace; unused.
    """
    _mcp_log("server starting (pid=" + str(os.getpid()) + ")")
    _mcp_log(f"GX_OPS_DB={os.environ.get('GX_OPS_DB', '')}")
    _mcp_log(f"GX_APP_DB={os.environ.get('GX_APP_DB', '')}")
    _mcp_log(f"GX_APP_DIR={os.environ.get('GX_APP_DIR', '')}")

    def emit(obj: dict) -> None:
        # One JSON document per line, flushed immediately so the client
        # never waits on a buffered response.
        sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n")
        sys.stdout.flush()

    for raw in sys.stdin:
        line = raw.strip()
        if not line:
            continue
        try:
            req = json.loads(line)
        except json.JSONDecodeError as e:
            _mcp_log(f"bad json: {e}")
            continue
        if not isinstance(req, dict):
            # Valid JSON but not a request object (array, scalar, ...).
            # Without this guard the .get() calls below would raise and
            # take the whole server down.
            _mcp_log(f"bad request: expected object, got {type(req).__name__}")
            continue

        method = req.get("method", "")
        if not isinstance(method, str):
            # Keep the string operations below safe; a non-string method
            # simply falls through to "method not found".
            method = str(method)
        rpc_id = req.get("id")
        params = req.get("params") or {}
        _mcp_log(f"<- method={method} id={rpc_id}")

        if method == "initialize":
            emit({"jsonrpc": "2.0", "id": rpc_id, "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {"listChanged": False}},
                "serverInfo": {"name": "graph_explorer", "version": "0.1.0"},
            }})

        elif method == "notifications/initialized":
            # Notification: no response.
            pass

        elif method == "tools/list":
            emit({"jsonrpc": "2.0", "id": rpc_id,
                   "result": {"tools": MCP_TOOLS}})

        elif method == "tools/call":
            if not isinstance(params, dict):
                # Positional (array) or scalar params cannot name a tool.
                _mcp_log("   invalid params: expected object")
                emit({"jsonrpc": "2.0", "id": rpc_id, "error": {
                    "code": -32602, "message": "params must be an object",
                }})
                continue
            tool_name = params.get("name", "")
            tool_args = params.get("arguments", {}) or {}
            _mcp_log(f"   call {tool_name}({tool_args})")
            try:
                result = _mcp_dispatch(tool_name, tool_args)
                payload = json.dumps(result, ensure_ascii=False)
                # A command may print a JSON value that is not an object
                # (e.g. a list); only a dict can carry the "ok" flag, so
                # anything else counts as success instead of raising here.
                is_err = (isinstance(result, dict)
                          and not result.get("ok", True))
                emit({"jsonrpc": "2.0", "id": rpc_id, "result": {
                    "content": [{"type": "text", "text": payload}],
                    "isError": is_err,
                }})
            except Exception as e:
                # Boundary handler: one failing tool call must not stop
                # the server loop.
                _mcp_log(f"   exception: {e}")
                emit({"jsonrpc": "2.0", "id": rpc_id, "error": {
                    "code": -32603, "message": str(e),
                }})

        elif method == "ping":
            emit({"jsonrpc": "2.0", "id": rpc_id, "result": {}})

        elif method.startswith("notifications/"):
            # Other notifications: ignore.
            pass

        else:
            # Only requests (those with an id) get an error reply;
            # unknown notifications are dropped silently.
            if rpc_id is not None:
                emit({"jsonrpc": "2.0", "id": rpc_id, "error": {
                    "code": -32601, "message": f"method not found: {method}",
                }})

    _mcp_log("stdin closed, exiting")


# ----------------------------------------------------------------------------
# argparse wiring
# ----------------------------------------------------------------------------

def main() -> None:
    """Wire up the gx-cli argparse tree, parse argv and run the chosen command.

    Top-level commands are ``info``, ``node``, ``rel``, ``table``,
    ``enricher``, ``query`` and ``mcp-server``; the grouped ones require a
    nested operation. Every leaf parser stores its handler under ``fn``, which
    is finally called with the parsed namespace.
    """
    parser = argparse.ArgumentParser(prog="gx-cli")
    commands = parser.add_subparsers(dest="cmd", required=True)

    def group(name):
        """Create command `name` and return its required sub-operation registry."""
        return commands.add_parser(name).add_subparsers(dest="op", required=True)

    def leaf(registry, name, handler, **parser_kwargs):
        """Add parser `name` to `registry`, bound to `handler` via the `fn` default."""
        leaf_parser = registry.add_parser(name, **parser_kwargs)
        leaf_parser.set_defaults(fn=handler)
        return leaf_parser

    # info
    leaf(commands, "info", cmd_info, help="Show ops_db summary")

    # node
    node_ops = group("node")
    node_create = leaf(node_ops, "create", cmd_node_create)
    node_create.add_argument("--name", required=True)
    node_create.add_argument("--type")
    node_create.add_argument("--description")
    node_create.add_argument(
        "--notes",
        help="texto libre del nodo (panel Note del Inspector). "
             "Es lo que leen split_sentences y extract_iocs_text.")
    leaf(node_ops, "delete", cmd_node_delete).add_argument("id")
    node_update = leaf(node_ops, "update", cmd_node_update)
    node_update.add_argument("id")
    for flag in ("--name", "--type", "--status", "--description"):
        node_update.add_argument(flag)
    node_update.add_argument(
        "--notes",
        help="reemplaza el contenido de notes (panel Note). "
             "Combinable con --append-notes para acumular.")
    node_update.add_argument(
        "--append-notes", action="store_true",
        help="anade --notes al final de las notas existentes "
             "en vez de reemplazarlas (separador: doble newline).")
    node_update.add_argument(
        "--tags",
        help='JSON array o "tag1,tag2" CSV')
    node_list = leaf(node_ops, "list", cmd_node_list)
    node_list.add_argument("--type")
    node_list.add_argument("--status")
    node_list.add_argument("--limit", type=int, default=100)
    leaf(node_ops, "show", cmd_node_show).add_argument("id")
    node_search = leaf(node_ops, "search", cmd_node_search)
    node_search.add_argument("query")
    node_search.add_argument("--limit", type=int, default=50)

    # relation
    rel_ops = group("rel")
    rel_create = leaf(rel_ops, "create", cmd_rel_create)
    rel_create.add_argument("--from", dest="from_id", required=True)
    rel_create.add_argument("--to", dest="to_id", required=True)
    rel_create.add_argument("--name")
    leaf(rel_ops, "delete", cmd_rel_delete).add_argument("id")
    rel_list = leaf(rel_ops, "list", cmd_rel_list)
    rel_list.add_argument("--from", dest="from_id")
    rel_list.add_argument("--to", dest="to_id")
    rel_list.add_argument("--limit", type=int, default=200)

    # table
    table_ops = group("table")
    leaf(table_ops, "list", cmd_table_list)
    table_promote = leaf(table_ops, "promote", cmd_table_promote)
    for positional in ("table_id", "row_id"):
        table_promote.add_argument(positional)
    leaf(table_ops, "demote", cmd_table_demote).add_argument("id")
    table_page = leaf(table_ops, "page", cmd_table_page)
    table_page.add_argument("table_id")
    table_page.add_argument("--offset", type=int, default=0)
    table_page.add_argument("--limit", type=int, default=50)

    # enricher
    enricher_ops = group("enricher")
    leaf(enricher_ops, "list", cmd_enricher_list).add_argument("--type")
    enricher_run = leaf(enricher_ops, "run", cmd_enricher_run)
    enricher_run.add_argument("enricher")
    enricher_run.add_argument("--node")
    enricher_run.add_argument("--params", help='JSON, default "{}"')

    # query
    query_cmd = leaf(commands, "query", cmd_query)
    query_cmd.add_argument("sql")
    query_cmd.add_argument("--limit", type=int, default=100)

    # mcp-server (JSON-RPC 2.0 stdio)
    leaf(commands, "mcp-server", cmd_mcp_server,
         help="Run as MCP server reading JSON-RPC from stdin")

    parsed = parser.parse_args()
    parsed.fn(parsed)


# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()
