fix(agent_jobs): queue dir desde GX_APP_DB, no GX_APP_DIR + logs verbosos

Bug derivado del fix anterior: gx-cli escribia ficheros JSON en
`$GX_APP_DIR/agent_jobs_queue/` (apuntando al repo fuente) mientras
main.cpp escaneaba `parent(g_layout_db_path)/agent_jobs_queue/`
(install Windows). Dos directorios distintos -> jobs huerfanos.

Echo reportaba "encolado" pero el worker nunca veia los ficheros.
La causa: chat.cpp setea GX_APP_DIR=<registry>/projects/osint_graph/
apps/graph_explorer y GX_APP_DB=<install>/local_files/projects/<slug>/
graph_explorer.db. Dos sitios. Solo APP_DB coincide con donde
graph_explorer.exe escanea (parent del .db).

Fix:

* gx-cli cmd_enricher_run: queue_dir = parent(GX_APP_DB) /
  agent_jobs_queue. Alineado con main.cpp.
* gx-cli: nuevo helper `_log(tag, msg)` que escribe a stderr Y a
  `<parent(app_db)>/gx-cli.log` para auditoria persistente. Cubre
  node_create, node_update, node_delete, rel_create, enricher_run.
* gx-cli mcp _mcp_log tambien persiste a gx-cli.log.
* main.cpp: log el queue scan dir una vez por sesion para detectar
  mismatches a futuro.
* .gitignore: agent_jobs_queue/ y gx-cli.log son runtime, no se
  commitean.

Tests:

* test_enricher_run_queue_dir_derives_from_app_db (regresion)
  configura GX_APP_DB en un dir distinto de GX_APP_DIR y verifica
  que el JSON aterriza junto a APP_DB.
* test_enricher_run_writes_log_to_gx_cli_log valida la auditoria.

WSL 81 / Windows 70 + 11 skipped.
This commit is contained in:
2026-05-03 16:32:22 +02:00
parent b67c44e8f9
commit 652ff6f02c
4 changed files with 128 additions and 5 deletions
+59 -5
View File
@@ -59,6 +59,24 @@ def _app_db() -> str:
return p
def _log(tag: str, msg: str) -> None:
"""Log a stderr y al fichero gx-cli.log junto a app_db (mismo dir
que chat.log y .mutations.marker). El fichero permite auditar lo
que el agente Echo hace cuando algo va mal — `_emit` solo va al
stdout de la herramienta y se pierde en pipelines MCP."""
line = f"[gx-cli {tag}] {msg}\n"
sys.stderr.write(line)
sys.stderr.flush()
try:
app_db = os.environ.get("GX_APP_DB", "")
if app_db:
log_path = Path(app_db).parent / "gx-cli.log"
with open(log_path, "a", encoding="utf-8") as f:
f.write(f"{_now_iso()} {line}")
except OSError:
pass
def _emit(payload: dict) -> None:
print(json.dumps(payload, ensure_ascii=False, default=str))
@@ -138,6 +156,8 @@ def cmd_node_create(args) -> None:
description = args.description or ""
notes = args.notes or ""
_log("node_create",
f"name={name!r} type={type_ref} notes_len={len(notes)} id={new_id}")
cn = _connect(_ops_db())
try:
cn.execute(
@@ -148,6 +168,7 @@ def cmd_node_create(args) -> None:
)
cn.commit()
except sqlite3.IntegrityError as e:
_log("node_create", f"FAILED insert: {e}")
_die(f"insert failed: {e}")
finally:
cn.close()
@@ -157,6 +178,7 @@ def cmd_node_create(args) -> None:
def cmd_node_delete(args) -> None:
_log("node_delete", f"id={args.id}")
cn = _connect(_ops_db())
try:
cur = cn.execute("DELETE FROM entities WHERE id = ?", (args.id,))
@@ -166,6 +188,7 @@ def cmd_node_delete(args) -> None:
)
cn.commit()
if cur.rowcount == 0:
_log("node_delete", f"FAILED not found: {args.id}")
_die(f"entity not found: {args.id}", code=2)
finally:
cn.close()
@@ -214,6 +237,8 @@ def cmd_node_update(args) -> None:
params.append(_now_iso())
params.append(args.id)
_log("node_update",
f"id={args.id} fields={[s.split(' = ')[0] for s in sets[:-1]]}")
cn = _connect(_ops_db())
try:
cur = cn.execute(
@@ -221,6 +246,7 @@ def cmd_node_update(args) -> None:
)
cn.commit()
if cur.rowcount == 0:
_log("node_update", f"FAILED not found: {args.id}")
_die(f"entity not found: {args.id}", code=2)
finally:
cn.close()
@@ -318,6 +344,8 @@ def cmd_rel_create(args) -> None:
new_id = f"rel_{_now_ms()}"
ts = _now_iso()
name = args.name or "RELATED_TO"
_log("rel_create",
f"from={args.from_id} to={args.to_id} name={name} id={new_id}")
cn = _connect(_ops_db())
try:
# verifica que existen los endpoints
@@ -326,6 +354,7 @@ def cmd_rel_create(args) -> None:
"SELECT 1 FROM entities WHERE id = ?", (entity_id,)
).fetchone()
if r is None:
_log("rel_create", f"FAILED entity not found: {entity_id}")
_die(f"entity not found: {entity_id}", code=2)
cn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, "
@@ -631,10 +660,16 @@ def cmd_enricher_run(args) -> None:
"created_at": _now_ms(),
}
app_dir = os.environ.get("GX_APP_DIR", "")
if not app_dir:
_die("GX_APP_DIR env var is empty")
queue_dir = Path(app_dir) / "agent_jobs_queue"
# IMPORTANTE: el queue_dir debe coincidir con el que escanea main.cpp.
# main.cpp usa `parent(g_layout_db_path) / "agent_jobs_queue"`, asi
# que aqui derivamos del path de GX_APP_DB tambien — NO de GX_APP_DIR
# (que apunta al repo fuente). Si los dos no coinciden, gx-cli
# escribe en un sitio y main.cpp escanea otro -> jobs huerfanos.
app_db_path = os.environ.get("GX_APP_DB", "")
if not app_db_path:
_die("GX_APP_DB env var is empty")
queue_dir = Path(app_db_path).parent / "agent_jobs_queue"
sys.stderr.write(f"[gx-cli enricher_run] queue_dir={queue_dir}\n")
try:
queue_dir.mkdir(parents=True, exist_ok=True)
# Atomic write: tmp + rename. main.cpp nunca lee un JSON a medias
@@ -644,9 +679,18 @@ def cmd_enricher_run(args) -> None:
tmp.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
os.replace(tmp, final)
sys.stderr.write(
f"[gx-cli enricher_run] wrote {final} "
f"(enricher={args.enricher} node={args.node or ''} "
f"req={req_id})\n"
)
except OSError as e:
sys.stderr.write(
f"[gx-cli enricher_run] FAILED to write queue file: {e}\n"
)
_die(f"could not enqueue: {e}")
_ok(request_id=req_id, enricher=args.enricher, node=args.node or "",
queue_file=str(final),
message="job encolado, lo recoge el panel Jobs")
@@ -874,8 +918,18 @@ def _mcp_dispatch(tool_name: str, args: dict) -> dict:
def _mcp_log(msg: str) -> None:
sys.stderr.write(f"[gx-cli mcp] {msg}\n")
line = f"[gx-cli mcp] {msg}\n"
sys.stderr.write(line)
sys.stderr.flush()
# Persistir tambien a gx-cli.log junto a app_db para auditoria.
try:
app_db = os.environ.get("GX_APP_DB", "")
if app_db:
with open(Path(app_db).parent / "gx-cli.log", "a",
encoding="utf-8") as f:
f.write(f"{_now_iso()} {line}")
except OSError:
pass
def cmd_mcp_server(_args) -> None: