fix(agent_jobs): mover cola de SQLite a ficheros JSON (cross-9p safe)
Bug: Echo (gx-cli en WSL) recibia "disk I/O error" al INSERT en la tabla `agent_jobs` de graph_explorer.db. Causa: graph_explorer.exe mantiene esa BD abierta con journal_mode=WAL desde Windows, y SQLite WAL exige mmap del .shm compartido entre procesos. Cuando un escritor accede via /mnt/c (9p) y el otro nativo NTFS, ese mmap falla. El proyecto ya habia resuelto este patron antes: el contador de mutaciones (.mutations.marker) usa fichero plano en vez de SQL por exactamente la misma razon. agent_jobs era la unica cola que se quedo en SQLite — momento de aplicar el mismo fix. Cambios: * gx-cli cmd_enricher_run: en lugar de INSERT, escribe `<app_dir>/agent_jobs_queue/<req_id>.json` con el payload del job. Atomic write (tmp + rename, atomico tanto en NTFS como en 9p). * main.cpp polling: en lugar de SELECT/DELETE sobre agent_jobs, escanea ese directorio cada frame, lee cada JSON via json_extract (sqlite3 in-memory, sin tocar archivos en disco), llama jobs_submit, y borra el fichero. Throttle a 8 jobs por frame igual que antes. * main.cpp: anyade <filesystem> y <fstream>. * tests/test_gx_cli.py: 5 tests nuevos en TestCliEnricherRun: - escribe fichero JSON con req_id como nombre - NO crea tabla agent_jobs en graph_explorer.db (regresion) - errores claros si enricher o nodo no existen - no quedan .tmp tras encolado exitoso WSL 79 / Windows 68 + 11 skipped.
This commit is contained in:
@@ -597,9 +597,15 @@ def _parse_yaml_minimal(text: str) -> dict:
|
||||
|
||||
|
||||
def cmd_enricher_run(args) -> None:
|
||||
"""Inserta un job en la cola agent_jobs. main.cpp lo recoge cada frame y
|
||||
lo somete via jobs_submit (que arranca el subprocess). Asi reusamos el
|
||||
pool de workers existente sin duplicar logica."""
|
||||
"""Encola un job en `<app_dir>/agent_jobs_queue/<req_id>.json`.
|
||||
|
||||
main.cpp escanea ese directorio cada frame, lee cada JSON, somete
|
||||
via jobs_submit y borra el fichero. Usamos directorio de ficheros
|
||||
en lugar de tabla SQLite por la misma razon que el marker
|
||||
`.mutations.marker`: graph_explorer.db esta abierta en WAL desde
|
||||
el lado Windows, y gx-cli escribiendo via /mnt/c (9p) hace que el
|
||||
mmap del .shm falle silenciosamente -> disk I/O error.
|
||||
"""
|
||||
edir = _enrichers_dir()
|
||||
if not (edir / args.enricher / "manifest.yaml").is_file():
|
||||
_die(f"enricher not found: {args.enricher}")
|
||||
@@ -616,26 +622,29 @@ def cmd_enricher_run(args) -> None:
|
||||
node_name = ""
|
||||
|
||||
req_id = f"areq_{_now_ms()}"
|
||||
payload = {
|
||||
"id": req_id,
|
||||
"enricher_id": args.enricher,
|
||||
"node_id": args.node or "",
|
||||
"node_name": node_name,
|
||||
"params_json": args.params or "{}",
|
||||
"created_at": _now_ms(),
|
||||
}
|
||||
|
||||
app_dir = os.environ.get("GX_APP_DIR", "")
|
||||
if not app_dir:
|
||||
_die("GX_APP_DIR env var is empty")
|
||||
queue_dir = Path(app_dir) / "agent_jobs_queue"
|
||||
try:
|
||||
cn = sqlite3.connect(_app_db())
|
||||
cn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS agent_jobs ("
|
||||
" id TEXT PRIMARY KEY,"
|
||||
" enricher_id TEXT NOT NULL,"
|
||||
" node_id TEXT NOT NULL DEFAULT '',"
|
||||
" node_name TEXT NOT NULL DEFAULT '',"
|
||||
" params_json TEXT NOT NULL DEFAULT '{}',"
|
||||
" created_at INTEGER NOT NULL)"
|
||||
)
|
||||
cn.execute(
|
||||
"INSERT INTO agent_jobs (id, enricher_id, node_id, node_name, "
|
||||
"params_json, created_at) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(req_id, args.enricher, args.node or "", node_name,
|
||||
args.params or "{}", _now_ms()),
|
||||
)
|
||||
cn.commit()
|
||||
cn.close()
|
||||
except sqlite3.Error as e:
|
||||
queue_dir.mkdir(parents=True, exist_ok=True)
|
||||
# Atomic write: tmp + rename. main.cpp nunca lee un JSON a medias
|
||||
# porque el rename es atomico en NTFS y en 9p.
|
||||
tmp = queue_dir / f"{req_id}.json.tmp"
|
||||
final = queue_dir / f"{req_id}.json"
|
||||
tmp.write_text(json.dumps(payload, ensure_ascii=False),
|
||||
encoding="utf-8")
|
||||
os.replace(tmp, final)
|
||||
except OSError as e:
|
||||
_die(f"could not enqueue: {e}")
|
||||
_ok(request_id=req_id, enricher=args.enricher, node=args.node or "",
|
||||
message="job encolado, lo recoge el panel Jobs")
|
||||
|
||||
@@ -44,6 +44,8 @@
|
||||
#include <string>
|
||||
#include <sys/stat.h>
|
||||
#include <algorithm>
|
||||
#include <filesystem>
|
||||
#include <fstream>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
@@ -1297,55 +1299,93 @@ static void render() {
|
||||
}
|
||||
}
|
||||
|
||||
// Chat agent — drena cola agent_jobs (gx-cli enricher run) e invoca
|
||||
// jobs_submit() para que el worker pool corriendo en C++ haga el trabajo.
|
||||
// Chat agent — drena cola de jobs encolados por gx-cli (Echo agent).
|
||||
//
|
||||
// Antes esta cola era una tabla `agent_jobs` en graph_explorer.db,
|
||||
// pero gx-cli corre dentro de WSL y graph_explorer.exe la tiene
|
||||
// abierta con WAL desde Windows. SQLite WAL falla cross-9p (mmap del
|
||||
// .shm) -> "disk I/O error" al hacer INSERT desde gx-cli. Igual que
|
||||
// el contador de mutaciones, lo movimos a ficheros JSON sueltos en
|
||||
// <project_dir>/agent_jobs_queue/. Cada fichero = 1 job. Aqui
|
||||
// escaneamos el dir, cargamos cada JSON, llamamos jobs_submit, y
|
||||
// borramos el fichero (atomico via rename desde gx-cli).
|
||||
if (!g_layout_db_path.empty()) {
|
||||
sqlite3* adb = nullptr;
|
||||
if (sqlite3_open_v2(g_layout_db_path.c_str(), &adb,
|
||||
SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) {
|
||||
sqlite3_stmt* st = nullptr;
|
||||
if (sqlite3_prepare_v2(adb,
|
||||
"SELECT id, enricher_id, node_id, node_name, params_json "
|
||||
"FROM agent_jobs ORDER BY created_at LIMIT 8",
|
||||
-1, &st, nullptr) == SQLITE_OK) {
|
||||
std::vector<std::string> ids_to_drop;
|
||||
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||
const char* req_id = (const char*)sqlite3_column_text(st, 0);
|
||||
const char* enr_id = (const char*)sqlite3_column_text(st, 1);
|
||||
const char* node = (const char*)sqlite3_column_text(st, 2);
|
||||
const char* nname = (const char*)sqlite3_column_text(st, 3);
|
||||
const char* params = (const char*)sqlite3_column_text(st, 4);
|
||||
char job_id[64];
|
||||
if (ge::jobs_submit(enr_id ? enr_id : "",
|
||||
node ? node : "",
|
||||
nname ? nname : "",
|
||||
params ? params : "{}",
|
||||
job_id, sizeof(job_id))) {
|
||||
std::fprintf(stdout,
|
||||
"[chat] queued enricher=%s node=%s as %s (req=%s)\n",
|
||||
enr_id ? enr_id : "", node ? node : "", job_id,
|
||||
req_id ? req_id : "");
|
||||
if (req_id) ids_to_drop.push_back(req_id);
|
||||
g_app.panel_jobs = true;
|
||||
} else {
|
||||
std::fprintf(stderr,
|
||||
"[chat] jobs_submit failed (req=%s enricher=%s)\n",
|
||||
req_id ? req_id : "", enr_id ? enr_id : "");
|
||||
}
|
||||
std::filesystem::path queue_dir =
|
||||
std::filesystem::path(g_layout_db_path).parent_path() /
|
||||
"agent_jobs_queue";
|
||||
std::error_code ec;
|
||||
if (std::filesystem::is_directory(queue_dir, ec)) {
|
||||
// Reusamos el sqlite ya en memoria solo para parsear JSON via
|
||||
// json_extract (json1 esta enabled en el build). Sin WAL.
|
||||
sqlite3* json_db = nullptr;
|
||||
sqlite3_open(":memory:", &json_db);
|
||||
sqlite3_stmt* parse = nullptr;
|
||||
sqlite3_prepare_v2(json_db,
|
||||
"SELECT json_extract(?,'$.id'), "
|
||||
" json_extract(?,'$.enricher_id'), "
|
||||
" json_extract(?,'$.node_id'), "
|
||||
" json_extract(?,'$.node_name'), "
|
||||
" json_extract(?,'$.params_json')",
|
||||
-1, &parse, nullptr);
|
||||
|
||||
int n_processed = 0;
|
||||
for (auto& ent : std::filesystem::directory_iterator(queue_dir, ec)) {
|
||||
if (n_processed >= 8) break; // throttle por frame
|
||||
if (!ent.is_regular_file()) continue;
|
||||
auto path = ent.path();
|
||||
if (path.extension() != ".json") continue;
|
||||
|
||||
// Leer contenido.
|
||||
std::ifstream f(path, std::ios::binary);
|
||||
if (!f) continue;
|
||||
std::string body((std::istreambuf_iterator<char>(f)),
|
||||
std::istreambuf_iterator<char>());
|
||||
f.close();
|
||||
|
||||
// Parsear via json_extract (5 binds del mismo body).
|
||||
sqlite3_reset(parse);
|
||||
for (int i = 1; i <= 5; ++i) {
|
||||
sqlite3_bind_text(parse, i, body.c_str(), -1,
|
||||
SQLITE_TRANSIENT);
|
||||
}
|
||||
sqlite3_finalize(st);
|
||||
for (auto& id : ids_to_drop) {
|
||||
sqlite3_stmt* d = nullptr;
|
||||
if (sqlite3_prepare_v2(adb,
|
||||
"DELETE FROM agent_jobs WHERE id = ?",
|
||||
-1, &d, nullptr) == SQLITE_OK) {
|
||||
sqlite3_bind_text(d, 1, id.c_str(), -1, SQLITE_TRANSIENT);
|
||||
sqlite3_step(d);
|
||||
sqlite3_finalize(d);
|
||||
}
|
||||
if (sqlite3_step(parse) != SQLITE_ROW) {
|
||||
std::fprintf(stderr,
|
||||
"[chat] queue file %s: json_extract failed\n",
|
||||
path.string().c_str());
|
||||
std::filesystem::remove(path, ec);
|
||||
continue;
|
||||
}
|
||||
auto col_str = [&](int i) -> std::string {
|
||||
const unsigned char* t = sqlite3_column_text(parse, i);
|
||||
return t ? (const char*)t : "";
|
||||
};
|
||||
std::string req_id = col_str(0);
|
||||
std::string enr_id = col_str(1);
|
||||
std::string node = col_str(2);
|
||||
std::string nname = col_str(3);
|
||||
std::string params = col_str(4);
|
||||
if (params.empty()) params = "{}";
|
||||
|
||||
char job_id[64];
|
||||
if (ge::jobs_submit(enr_id.c_str(), node.c_str(),
|
||||
nname.c_str(), params.c_str(),
|
||||
job_id, sizeof(job_id))) {
|
||||
std::fprintf(stdout,
|
||||
"[chat] queued enricher=%s node=%s as %s (req=%s)\n",
|
||||
enr_id.c_str(), node.c_str(), job_id, req_id.c_str());
|
||||
g_app.panel_jobs = true;
|
||||
} else {
|
||||
std::fprintf(stderr,
|
||||
"[chat] jobs_submit failed (req=%s enricher=%s)\n",
|
||||
req_id.c_str(), enr_id.c_str());
|
||||
}
|
||||
// Borrar el fichero independientemente de exito de submit:
|
||||
// si jobs_submit fallo, reintenrar produciria duplicados.
|
||||
std::filesystem::remove(path, ec);
|
||||
++n_processed;
|
||||
}
|
||||
sqlite3_close(adb);
|
||||
if (parse) sqlite3_finalize(parse);
|
||||
if (json_db) sqlite3_close(json_db);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -316,6 +316,86 @@ class TestCliRelations:
|
||||
assert rows == []
|
||||
|
||||
|
||||
class TestCliEnricherRun:
|
||||
"""`enricher run` encola un job. Debe escribir un fichero JSON en
|
||||
`<app_dir>/agent_jobs_queue/` (NO insertar en SQL). Esto blinda
|
||||
contra la regresion del bug "disk I/O error" que aparecia cuando
|
||||
gx-cli (en WSL) escribia agent_jobs en graph_explorer.db abierta
|
||||
en WAL desde Windows."""
|
||||
|
||||
def _make_enricher(self, env_dirs, eid: str = "split_sentences"):
|
||||
"""Crea un manifest dummy en el dir de enrichers para que la
|
||||
validacion de gx-cli (`if manifest.yaml exists`) pase."""
|
||||
edir = env_dirs["dir"] / "enrichers" / eid
|
||||
edir.mkdir(parents=True, exist_ok=True)
|
||||
(edir / "manifest.yaml").write_text(
|
||||
f"id: {eid}\nname: x\napplies_to: [text]\n", encoding="utf-8"
|
||||
)
|
||||
|
||||
def test_enricher_run_writes_json_file(self, env_dirs):
|
||||
self._make_enricher(env_dirs, "split_sentences")
|
||||
node = run_gx(env_dirs, "node", "create", "--name", "doc",
|
||||
"--type", "text")
|
||||
out = run_gx(env_dirs, "enricher", "run", "split_sentences",
|
||||
"--node", node["id"])
|
||||
# 1 fichero JSON dropped en agent_jobs_queue/
|
||||
queue = env_dirs["dir"] / "agent_jobs_queue"
|
||||
files = list(queue.glob("*.json"))
|
||||
assert len(files) == 1
|
||||
payload = json.loads(files[0].read_text(encoding="utf-8"))
|
||||
assert payload["enricher_id"] == "split_sentences"
|
||||
assert payload["node_id"] == node["id"]
|
||||
assert payload["id"] == out["request_id"]
|
||||
assert "params_json" in payload
|
||||
# Naming: el fichero se llama <req_id>.json, no .tmp
|
||||
assert files[0].name == f"{out['request_id']}.json"
|
||||
|
||||
def test_enricher_run_does_not_use_sqlite(self, env_dirs):
|
||||
"""Regresion del bug WAL cross-9p: el flujo NO debe abrir ni
|
||||
crear la tabla agent_jobs en graph_explorer.db."""
|
||||
self._make_enricher(env_dirs, "split_sentences")
|
||||
node = run_gx(env_dirs, "node", "create", "--name", "doc",
|
||||
"--type", "text")
|
||||
run_gx(env_dirs, "enricher", "run", "split_sentences",
|
||||
"--node", node["id"])
|
||||
# graph_explorer.db NO debe haber recibido tabla agent_jobs.
|
||||
cn = sqlite3.connect(env_dirs["app"])
|
||||
try:
|
||||
row = cn.execute(
|
||||
"SELECT name FROM sqlite_master "
|
||||
"WHERE type='table' AND name='agent_jobs'"
|
||||
).fetchone()
|
||||
finally:
|
||||
cn.close()
|
||||
assert row is None, \
|
||||
"agent_jobs table re-creada en graph_explorer.db; el queue " \
|
||||
"debe vivir en ficheros, NO en SQLite (cross-9p WAL falla)"
|
||||
|
||||
def test_enricher_run_unknown_enricher_errors(self, env_dirs):
|
||||
out = run_gx(env_dirs, "enricher", "run", "no_existe",
|
||||
expect_ok=False)
|
||||
assert out.get("ok") is False
|
||||
assert "not found" in (out.get("error") or "").lower()
|
||||
|
||||
def test_enricher_run_unknown_node_errors(self, env_dirs):
|
||||
self._make_enricher(env_dirs, "split_sentences")
|
||||
out = run_gx(env_dirs, "enricher", "run", "split_sentences",
|
||||
"--node", "node_inexistente", expect_ok=False)
|
||||
assert out.get("ok") is False
|
||||
assert "node not found" in (out.get("error") or "").lower()
|
||||
|
||||
def test_enricher_run_atomic_write(self, env_dirs):
|
||||
"""No deben quedar ficheros .tmp tras un encolado exitoso."""
|
||||
self._make_enricher(env_dirs, "split_sentences")
|
||||
node = run_gx(env_dirs, "node", "create", "--name", "doc",
|
||||
"--type", "text")
|
||||
run_gx(env_dirs, "enricher", "run", "split_sentences",
|
||||
"--node", node["id"])
|
||||
queue = env_dirs["dir"] / "agent_jobs_queue"
|
||||
tmp_files = list(queue.glob("*.tmp"))
|
||||
assert tmp_files == []
|
||||
|
||||
|
||||
class TestCliQuery:
|
||||
def test_query_select(self, env_dirs):
|
||||
run_gx(env_dirs, "node", "create", "--name", "q1", "--type", "text")
|
||||
|
||||
Reference in New Issue
Block a user