diff --git a/gx-cli b/gx-cli index b4fe863..8bf7718 100755 --- a/gx-cli +++ b/gx-cli @@ -597,9 +597,15 @@ def _parse_yaml_minimal(text: str) -> dict: def cmd_enricher_run(args) -> None: - """Inserta un job en la cola agent_jobs. main.cpp lo recoge cada frame y - lo somete via jobs_submit (que arranca el subprocess). Asi reusamos el - pool de workers existente sin duplicar logica.""" + """Encola un job en `/agent_jobs_queue/.json`. + + main.cpp escanea ese directorio cada frame, lee cada JSON, somete + via jobs_submit y borra el fichero. Usamos directorio de ficheros + en lugar de tabla SQLite por la misma razon que el marker + `.mutations.marker`: graph_explorer.db esta abierta en WAL desde + el lado Windows, y gx-cli escribiendo via /mnt/c (9p) hace que el + mmap del .shm falle silenciosamente -> disk I/O error. + """ edir = _enrichers_dir() if not (edir / args.enricher / "manifest.yaml").is_file(): _die(f"enricher not found: {args.enricher}") @@ -616,26 +622,29 @@ def cmd_enricher_run(args) -> None: node_name = "" req_id = f"areq_{_now_ms()}" + payload = { + "id": req_id, + "enricher_id": args.enricher, + "node_id": args.node or "", + "node_name": node_name, + "params_json": args.params or "{}", + "created_at": _now_ms(), + } + + app_dir = os.environ.get("GX_APP_DIR", "") + if not app_dir: + _die("GX_APP_DIR env var is empty") + queue_dir = Path(app_dir) / "agent_jobs_queue" try: - cn = sqlite3.connect(_app_db()) - cn.execute( - "CREATE TABLE IF NOT EXISTS agent_jobs (" - " id TEXT PRIMARY KEY," - " enricher_id TEXT NOT NULL," - " node_id TEXT NOT NULL DEFAULT ''," - " node_name TEXT NOT NULL DEFAULT ''," - " params_json TEXT NOT NULL DEFAULT '{}'," - " created_at INTEGER NOT NULL)" - ) - cn.execute( - "INSERT INTO agent_jobs (id, enricher_id, node_id, node_name, " - "params_json, created_at) VALUES (?, ?, ?, ?, ?, ?)", - (req_id, args.enricher, args.node or "", node_name, - args.params or "{}", _now_ms()), - ) - cn.commit() - cn.close() - except sqlite3.Error as e: + queue_dir.mkdir(parents=True, exist_ok=True) + # Atomic write: tmp + rename. main.cpp nunca lee un JSON a medias + # porque el rename es atomico en NTFS y en 9p. + tmp = queue_dir / f"{req_id}.json.tmp" + final = queue_dir / f"{req_id}.json" + tmp.write_text(json.dumps(payload, ensure_ascii=False), + encoding="utf-8") + os.replace(tmp, final) + except OSError as e: _die(f"could not enqueue: {e}") _ok(request_id=req_id, enricher=args.enricher, node=args.node or "", message="job encolado, lo recoge el panel Jobs") diff --git a/main.cpp b/main.cpp index e907b5f..1dcbdcf 100644 --- a/main.cpp +++ b/main.cpp @@ -44,6 +44,8 @@ #include #include #include +#include +#include #include #include @@ -1297,55 +1299,93 @@ static void render() { } } - // Chat agent — drena cola agent_jobs (gx-cli enricher run) e invoca - // jobs_submit() para que el worker pool corriendo en C++ haga el trabajo. + // Chat agent — drena cola de jobs encolados por gx-cli (Echo agent). + // + // Antes esta cola era una tabla `agent_jobs` en graph_explorer.db, + // pero gx-cli corre dentro de WSL y graph_explorer.exe la tiene + // abierta con WAL desde Windows. SQLite WAL falla cross-9p (mmap del + // .shm) -> "disk I/O error" al hacer INSERT desde gx-cli. Igual que + // el contador de mutaciones, lo movimos a ficheros JSON sueltos en + // /agent_jobs_queue/. Cada fichero = 1 job. Aqui + // escaneamos el dir, cargamos cada JSON, llamamos jobs_submit, y + // borramos el fichero (atomico via rename desde gx-cli). if (!g_layout_db_path.empty()) { - sqlite3* adb = nullptr; - if (sqlite3_open_v2(g_layout_db_path.c_str(), &adb, - SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) { - sqlite3_stmt* st = nullptr; - if (sqlite3_prepare_v2(adb, - "SELECT id, enricher_id, node_id, node_name, params_json " - "FROM agent_jobs ORDER BY created_at LIMIT 8", - -1, &st, nullptr) == SQLITE_OK) { - std::vector ids_to_drop; - while (sqlite3_step(st) == SQLITE_ROW) { - const char* req_id = (const char*)sqlite3_column_text(st, 0); - const char* enr_id = (const char*)sqlite3_column_text(st, 1); - const char* node = (const char*)sqlite3_column_text(st, 2); - const char* nname = (const char*)sqlite3_column_text(st, 3); - const char* params = (const char*)sqlite3_column_text(st, 4); - char job_id[64]; - if (ge::jobs_submit(enr_id ? enr_id : "", - node ? node : "", - nname ? nname : "", - params ? params : "{}", - job_id, sizeof(job_id))) { - std::fprintf(stdout, - "[chat] queued enricher=%s node=%s as %s (req=%s)\n", - enr_id ? enr_id : "", node ? node : "", job_id, - req_id ? req_id : ""); - if (req_id) ids_to_drop.push_back(req_id); - g_app.panel_jobs = true; - } else { - std::fprintf(stderr, - "[chat] jobs_submit failed (req=%s enricher=%s)\n", - req_id ? req_id : "", enr_id ? enr_id : ""); - } + std::filesystem::path queue_dir = + std::filesystem::path(g_layout_db_path).parent_path() / + "agent_jobs_queue"; + std::error_code ec; + if (std::filesystem::is_directory(queue_dir, ec)) { + // Reusamos el sqlite ya en memoria solo para parsear JSON via + // json_extract (json1 esta enabled en el build). Sin WAL. + sqlite3* json_db = nullptr; + sqlite3_open(":memory:", &json_db); + sqlite3_stmt* parse = nullptr; + sqlite3_prepare_v2(json_db, + "SELECT json_extract(?,'$.id'), " + " json_extract(?,'$.enricher_id'), " + " json_extract(?,'$.node_id'), " + " json_extract(?,'$.node_name'), " + " json_extract(?,'$.params_json')", + -1, &parse, nullptr); + + int n_processed = 0; + for (auto& ent : std::filesystem::directory_iterator(queue_dir, ec)) { + if (n_processed >= 8) break; // throttle por frame + if (!ent.is_regular_file()) continue; + auto path = ent.path(); + if (path.extension() != ".json") continue; + + // Leer contenido. + std::ifstream f(path, std::ios::binary); + if (!f) continue; + std::string body((std::istreambuf_iterator(f)), + std::istreambuf_iterator()); + f.close(); + + // Parsear via json_extract (5 binds del mismo body). + sqlite3_reset(parse); + for (int i = 1; i <= 5; ++i) { + sqlite3_bind_text(parse, i, body.c_str(), -1, + SQLITE_TRANSIENT); } - sqlite3_finalize(st); - for (auto& id : ids_to_drop) { - sqlite3_stmt* d = nullptr; - if (sqlite3_prepare_v2(adb, - "DELETE FROM agent_jobs WHERE id = ?", - -1, &d, nullptr) == SQLITE_OK) { - sqlite3_bind_text(d, 1, id.c_str(), -1, SQLITE_TRANSIENT); - sqlite3_step(d); - sqlite3_finalize(d); - } + if (sqlite3_step(parse) != SQLITE_ROW) { + std::fprintf(stderr, + "[chat] queue file %s: json_extract failed\n", + path.string().c_str()); + std::filesystem::remove(path, ec); + continue; } + auto col_str = [&](int i) -> std::string { + const unsigned char* t = sqlite3_column_text(parse, i); + return t ? (const char*)t : ""; + }; + std::string req_id = col_str(0); + std::string enr_id = col_str(1); + std::string node = col_str(2); + std::string nname = col_str(3); + std::string params = col_str(4); + if (params.empty()) params = "{}"; + + char job_id[64]; + if (ge::jobs_submit(enr_id.c_str(), node.c_str(), + nname.c_str(), params.c_str(), + job_id, sizeof(job_id))) { + std::fprintf(stdout, + "[chat] queued enricher=%s node=%s as %s (req=%s)\n", + enr_id.c_str(), node.c_str(), job_id, req_id.c_str()); + g_app.panel_jobs = true; + } else { + std::fprintf(stderr, + "[chat] jobs_submit failed (req=%s enricher=%s)\n", + req_id.c_str(), enr_id.c_str()); + } + // Borrar el fichero independientemente de exito de submit: + // si jobs_submit fallo, reintenrar produciria duplicados. + std::filesystem::remove(path, ec); + ++n_processed; } - sqlite3_close(adb); + if (parse) sqlite3_finalize(parse); + if (json_db) sqlite3_close(json_db); } } diff --git a/tests/test_gx_cli.py b/tests/test_gx_cli.py index faf52ea..dd5c210 100644 --- a/tests/test_gx_cli.py +++ b/tests/test_gx_cli.py @@ -316,6 +316,86 @@ class TestCliRelations: assert rows == [] +class TestCliEnricherRun: + """`enricher run` encola un job. Debe escribir un fichero JSON en + `/agent_jobs_queue/` (NO insertar en SQL). Esto blinda + contra la regresion del bug "disk I/O error" que aparecia cuando + gx-cli (en WSL) escribia agent_jobs en graph_explorer.db abierta + en WAL desde Windows.""" + + def _make_enricher(self, env_dirs, eid: str = "split_sentences"): + """Crea un manifest dummy en el dir de enrichers para que la + validacion de gx-cli (`if manifest.yaml exists`) pase.""" + edir = env_dirs["dir"] / "enrichers" / eid + edir.mkdir(parents=True, exist_ok=True) + (edir / "manifest.yaml").write_text( + f"id: {eid}\nname: x\napplies_to: [text]\n", encoding="utf-8" + ) + + def test_enricher_run_writes_json_file(self, env_dirs): + self._make_enricher(env_dirs, "split_sentences") + node = run_gx(env_dirs, "node", "create", "--name", "doc", + "--type", "text") + out = run_gx(env_dirs, "enricher", "run", "split_sentences", + "--node", node["id"]) + # 1 fichero JSON dropped en agent_jobs_queue/ + queue = env_dirs["dir"] / "agent_jobs_queue" + files = list(queue.glob("*.json")) + assert len(files) == 1 + payload = json.loads(files[0].read_text(encoding="utf-8")) + assert payload["enricher_id"] == "split_sentences" + assert payload["node_id"] == node["id"] + assert payload["id"] == out["request_id"] + assert "params_json" in payload + # Naming: el fichero se llama .json, no .tmp + assert files[0].name == f"{out['request_id']}.json" + + def test_enricher_run_does_not_use_sqlite(self, env_dirs): + """Regresion del bug WAL cross-9p: el flujo NO debe abrir ni + crear la tabla agent_jobs en graph_explorer.db.""" + self._make_enricher(env_dirs, "split_sentences") + node = run_gx(env_dirs, "node", "create", "--name", "doc", + "--type", "text") + run_gx(env_dirs, "enricher", "run", "split_sentences", + "--node", node["id"]) + # graph_explorer.db NO debe haber recibido tabla agent_jobs. + cn = sqlite3.connect(env_dirs["app"]) + try: + row = cn.execute( + "SELECT name FROM sqlite_master " + "WHERE type='table' AND name='agent_jobs'" + ).fetchone() + finally: + cn.close() + assert row is None, \ + "agent_jobs table re-creada en graph_explorer.db; el queue " \ + "debe vivir en ficheros, NO en SQLite (cross-9p WAL falla)" + + def test_enricher_run_unknown_enricher_errors(self, env_dirs): + out = run_gx(env_dirs, "enricher", "run", "no_existe", + expect_ok=False) + assert out.get("ok") is False + assert "not found" in (out.get("error") or "").lower() + + def test_enricher_run_unknown_node_errors(self, env_dirs): + self._make_enricher(env_dirs, "split_sentences") + out = run_gx(env_dirs, "enricher", "run", "split_sentences", + "--node", "node_inexistente", expect_ok=False) + assert out.get("ok") is False + assert "node not found" in (out.get("error") or "").lower() + + def test_enricher_run_atomic_write(self, env_dirs): + """No deben quedar ficheros .tmp tras un encolado exitoso.""" + self._make_enricher(env_dirs, "split_sentences") + node = run_gx(env_dirs, "node", "create", "--name", "doc", + "--type", "text") + run_gx(env_dirs, "enricher", "run", "split_sentences", + "--node", node["id"]) + queue = env_dirs["dir"] / "agent_jobs_queue" + tmp_files = list(queue.glob("*.tmp")) + assert tmp_files == [] + + class TestCliQuery: def test_query_select(self, env_dirs): run_gx(env_dirs, "node", "create", "--name", "q1", "--type", "text")