From c3ce9956f7c481c2f75fbe7e61b235872e24e540 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 1 May 2026 18:49:36 +0200 Subject: [PATCH] =?UTF-8?q?feat(jobs):=20implementacion=20Win32=20?= =?UTF-8?q?=E2=80=94=20wsl.exe=20+=20path=20translation=20(issue=200026)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sustituye el stub Windows por la implementacion real: C++: - Bloque #ifdef _WIN32 con CreateProcessW + 3 anonymous pipes (CreatePipe + SetHandleInformation), STARTF_USESTDHANDLES, CREATE_NO_WINDOW, ReadFile/WriteFile, WaitForSingleObject con polling para soportar cancelacion via TerminateProcess. - Helper to_wsl_path: convierte paths Windows a WSL antes de mandarlos al subprocess. Soporta: * "C:\\..." -> "/mnt/c/..." * "\\\\wsl.localhost\\\\..." -> "/..." * "\\\\wsl$\\\\..." -> "/..." * "/..." -> tal cual En POSIX la funcion es no-op. - build_stdin_json siempre normaliza ops_db_path/app_dir/cache_dir/ registry_root a paths WSL — el run.py corre dentro de WSL y solo entiende paths /home, /mnt, etc. - Subprocess invocacion: `wsl.exe --cd -- `. Asume que el usuario tiene WSL instalado y la distro Ubuntu (o ajusta FN_REGISTRY_ROOT al UNC adecuado). - kill_proc unificado: TerminateProcess en Win32, kill(SIGTERM) en POSIX. - JobControl con HANDLE+pid en Win32, pid_t en POSIX. main.cpp: - resolve_registry_root con fallback Windows: si FN_REGISTRY_ROOT env no esta y getcwd no encuentra registry.db (caso del .exe en Desktop), usa "\\\\wsl.localhost\\Ubuntu\\home\\lucas\\fn_registry". El usuario cambia el UNC via env var si su distro tiene otro nombre. Build: - cpp/build/windows/apps/graph_explorer/graph_explorer.exe linkea limpio contra MinGW; solo dependencias windows.h estandar (kernel32, etc.). - Linux smoke test sigue detectando los 4 enrichers tras la refactorizacion compartida. Notas operativas para el usuario Windows: - Ejecutar el .exe desde C:\\Users\\lucas\\Desktop\\apps\\graph_explorer\\ (doble clic). El primer arranque tarda ~1 s mas por cold-start de wsl.exe. - Si la distro no es Ubuntu, setear FN_REGISTRY_ROOT con el UNC correcto (ej. "\\\\wsl.localhost\\Debian\\home\\lucas\\fn_registry"). - Cancelar un job en Windows usa TerminateProcess (mas brutal que SIGTERM pero los run.py no tienen estado critico — sqlite3 rollback automatico por la transaccion implicita). Co-Authored-By: Claude Opus 4.7 (1M context) --- jobs.cpp | 501 +++++++++++++++++++++++++++++++++++++++---------------- main.cpp | 39 +++-- 2 files changed, 383 insertions(+), 157 deletions(-) diff --git a/jobs.cpp b/jobs.cpp index 4aecf59..8d97bb5 100644 --- a/jobs.cpp +++ b/jobs.cpp @@ -1,53 +1,10 @@ #include "jobs.h" -#ifdef _WIN32 -// ---------------------------------------------------------------------------- -// Windows stub (issue 0026): la implementacion real usa fork+exec+pipes POSIX. -// La version Windows debe escribirse con CreateProcess + anonymous pipes + -// ReadFile/WriteFile + TerminateProcess. Por ahora, en Windows el panel Jobs -// queda inactivo — el resto de la app funciona normal. TODO: implementar -// con la API Win32. -// ---------------------------------------------------------------------------- -#include - -namespace ge { - -bool jobs_init(const char*, const char*, const char*, const char*, const char*, int) { - std::fprintf(stderr, - "[jobs] Windows stub: enrichers no disponibles en esta build " - "(usa la build Linux/WSL para correr enrichers).\n"); - return false; -} - -void jobs_set_ops_db(const char*) {} - -bool jobs_submit(const char*, const char*, const char*, const char*, - char* out_id, size_t out_id_n) -{ - if (out_id && out_id_n > 0) out_id[0] = '\0'; - return false; -} - -bool jobs_cancel(const char*) { return false; } -bool jobs_delete(const char*) { return false; } -bool jobs_list (std::vector* out, int) { - if (out) out->clear(); - return true; -} -JobCounters jobs_counters() { return JobCounters{}; } -int jobs_dirty_counter() { return 0; } -void jobs_shutdown() {} - -} // namespace ge - -#else -// ---------------------------------------------------------------------------- -// POSIX (Linux/WSL/macOS): implementacion real con fork+exec+pipes. -// ---------------------------------------------------------------------------- - #include "../../../../cpp/vendor/sqlite3/sqlite3.h" +// Headers comunes a Win32 y POSIX. #include +#include #include #include #include @@ -55,20 +12,29 @@ void jobs_shutdown() {} #include #include #include -#include -#include +#include #include #include -#include #include #include -#include -#include #include -#include #include #include +#ifdef _WIN32 + #ifndef WIN32_LEAN_AND_MEAN + #define WIN32_LEAN_AND_MEAN + #endif + #include +#else + #include + #include + #include + #include + #include + #include +#endif + namespace ge { // ---------------------------------------------------------------------------- @@ -78,20 +44,25 @@ namespace ge { namespace { struct JobControl { - pid_t pid = -1; +#ifdef _WIN32 + HANDLE process = nullptr; // process handle (para TerminateProcess) + DWORD pid = 0; +#else + pid_t pid = -1; +#endif std::atomic cancel_requested{false}; }; struct State { std::string app_db_path; - std::string ops_db_path; // mutable: cambia con jobs_set_ops_db + std::string ops_db_path; std::string enrichers_dir; std::string app_dir; std::string registry_root; std::mutex q_mu; std::condition_variable q_cv; - std::queue pending; // job ids + std::queue pending; std::unordered_map> running; std::vector workers; @@ -101,15 +72,12 @@ struct State { State* g_state = nullptr; -// ---- helpers -------------------------------------------------------------- - long long now_ms() { using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()).count(); } std::string ulid() { - // ULID-ish: timestamp ms + 10 random hex chars. Suficiente para un PK. long long ts = now_ms(); static std::atomic ctr{(uint32_t)(ts & 0xFFFFFFFF)}; uint32_t rnd = ctr.fetch_add(1, std::memory_order_relaxed); @@ -134,11 +102,7 @@ bool sql_run(sqlite3* db, const char* sql, const std::vector& params) { sqlite3_stmt* st = nullptr; - if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) { - std::fprintf(stderr, "[jobs] prepare failed: %s :: %s\n", - sqlite3_errmsg(db), sql); - return false; - } + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) return false; for (size_t i = 0; i < params.size(); ++i) { sqlite3_bind_text(st, (int)(i + 1), params[i].c_str(), -1, SQLITE_TRANSIENT); @@ -178,21 +142,16 @@ bool ensure_table(const char* db_path) { sql_exec_simple(db, "CREATE INDEX IF NOT EXISTS idx_jobs_status " "ON jobs(status, created_at);"); - - // Reaper: jobs `running` huerfanos de una sesion anterior. char ts[32]; std::snprintf(ts, sizeof(ts), "%lld", now_ms()); sql_run(db, "UPDATE jobs SET status='error', error='process died (app restart)', " "finished_at=? WHERE status='running'", {ts}); - sqlite3_close(db); return ok; } -// Escapa string para JSON. Simplificado: maneja comillas, backslash y -// caracteres de control basicos. std::string json_escape(const std::string& s) { std::string out; out.reserve(s.size() + 8); @@ -218,7 +177,62 @@ std::string json_escape(const std::string& s) { return out; } +// ---------------------------------------------------------------------------- +// Path normalization (Windows ↔ WSL) +// +// El binario Windows ejecuta los enrichers via wsl.exe → Python corre dentro +// del WSL. Los paths que viajan al subprocess deben estar en formato WSL: +// - "/home/..." se respeta tal cual. +// - "C:\\..." -> "/mnt/c/...". +// - "\\\\wsl.localhost\\\\path" -> "/path". +// - "\\\\wsl$\\\\path" -> "/path". +// En POSIX la funcion es no-op (devuelve la string igual). +// ---------------------------------------------------------------------------- +std::string to_wsl_path(const std::string& p) { +#ifndef _WIN32 + return p; +#else + if (p.empty()) return p; + if (p[0] == '/') return p; + auto is_sep = [](char c) { return c == '\\' || c == '/'; }; + + // UNC: \\\\... o ////... + if (p.size() >= 2 && is_sep(p[0]) && is_sep(p[1])) { + size_t i = 2; + while (i < p.size() && !is_sep(p[i])) ++i; + std::string server = p.substr(2, i - 2); + for (auto& c : server) c = (char)std::tolower((unsigned char)c); + if (server == "wsl.localhost" || server == "wsl$") { + // skip separator + distro + if (i < p.size()) ++i; // skip sep + while (i < p.size() && !is_sep(p[i])) ++i; // skip distro name + std::string rest = p.substr(i); + for (char& c : rest) if (c == '\\') c = '/'; + return rest.empty() ? std::string("/") : rest; + } + // UNC desconocido: convertir backslash a slash y devolverlo. + std::string out = p; + for (char& c : out) if (c == '\\') c = '/'; + return out; + } + + // Drive letter: "X:\\..." o "X:/..." + if (p.size() >= 3 && std::isalpha((unsigned char)p[0]) && p[1] == ':' && + is_sep(p[2])) { + std::string out = "/mnt/"; + out.push_back((char)std::tolower((unsigned char)p[0])); + for (size_t i = 2; i < p.size(); ++i) { + out.push_back(p[i] == '\\' ? '/' : p[i]); + } + return out; + } + return p; +#endif +} + // Lee un campo de la entidad como string. Devuelve "" si no existe. +// Importante: usa el path de operations.db tal y como lo recibimos (sin +// to_wsl) porque SQLite corre en el proceso C++, no dentro de WSL. std::string read_entity_field(const char* db_path, const char* id, const char* col) { @@ -244,8 +258,8 @@ std::string read_entity_field(const char* db_path, const char* id, return out; } -// Construye el JSON que se entrega al subprocess via stdin. Lee node de la -// operations.db actual. +// JSON entregado al subprocess. Todos los paths se normalizan a WSL en +// Windows; en POSIX los respeta tal cual. std::string build_stdin_json(const std::string& job_id, const std::string& enricher_id, const std::string& node_id, @@ -262,35 +276,28 @@ std::string build_stdin_json(const std::string& job_id, if (!m.empty()) node_metadata = m; } - std::string cache_dir = app_dir + "/cache"; + std::string ops_db_wsl = to_wsl_path(ops_db); + std::string app_dir_wsl = to_wsl_path(app_dir); + std::string root_wsl = to_wsl_path(registry_root); + std::string cache_dir = app_dir_wsl + "/cache"; std::ostringstream o; o << '{' - << "\"job_id\":\"" << json_escape(job_id) << "\"," - << "\"enricher_id\":\""<< json_escape(enricher_id) << "\"," - << "\"node_id\":\"" << json_escape(node_id) << "\"," - << "\"node_type\":\"" << json_escape(node_type) << "\"," - << "\"node_name\":\"" << json_escape(node_name) << "\"," - << "\"metadata\":" << (node_metadata.empty() ? "{}" : node_metadata) << "," - << "\"params\":" << (params_json.empty() ? "{}" : params_json) << "," - << "\"ops_db_path\":\""<< json_escape(ops_db) << "\"," - << "\"app_dir\":\"" << json_escape(app_dir) << "\"," - << "\"cache_dir\":\"" << json_escape(cache_dir) << "\"," - << "\"registry_root\":\"" << json_escape(registry_root) << "\"" + << "\"job_id\":\"" << json_escape(job_id) << "\"," + << "\"enricher_id\":\"" << json_escape(enricher_id) << "\"," + << "\"node_id\":\"" << json_escape(node_id) << "\"," + << "\"node_type\":\"" << json_escape(node_type) << "\"," + << "\"node_name\":\"" << json_escape(node_name) << "\"," + << "\"metadata\":" << (node_metadata.empty() ? "{}" : node_metadata) << "," + << "\"params\":" << (params_json.empty() ? "{}" : params_json) << "," + << "\"ops_db_path\":\"" << json_escape(ops_db_wsl) << "\"," + << "\"app_dir\":\"" << json_escape(app_dir_wsl) << "\"," + << "\"cache_dir\":\"" << json_escape(cache_dir) << "\"," + << "\"registry_root\":\"" << json_escape(root_wsl) << "\"" << '}'; return o.str(); } -// ---- subprocess (POSIX) --------------------------------------------------- - -struct ProcResult { - int exit_code = -1; - bool signaled = false; - int signal = 0; - std::string stdout_buf; - std::string stderr_tail; // ultimas lineas, para mensajes de error -}; - void update_progress(const std::string& job_id, double prog, const std::string& stage) { @@ -313,9 +320,59 @@ void update_progress(const std::string& job_id, double prog, sqlite3_close(db); } -// Spawnea python3 run.py. Pipes para stdin (write), stdout (read), -// stderr (read). Lee stdout entero al final; lee stderr line-by-line en un -// thread auxiliar parseando "PROGRESS: ". +// ---------------------------------------------------------------------------- +// Subprocess (POSIX y Win32) +// ---------------------------------------------------------------------------- + +struct ProcResult { + int exit_code = -1; + bool signaled = false; + int signal = 0; + std::string stdout_buf; + std::string stderr_tail; +}; + +// Parsea PROGRESS en una linea de stderr y, si aplica, actualiza la BD. +// stderr_tail crece con todo lo que NO sea PROGRESS, capado a 4 KB. +void process_stderr_line(const std::string& line, + const std::string& job_id, + std::string& stderr_tail, + std::mutex& tail_mu) +{ + if (line.rfind("PROGRESS:", 0) == 0) { + const char* p = line.c_str() + 9; + char* endp = nullptr; + double prog = std::strtod(p, &endp); + std::string stage; + if (endp && *endp) { + while (*endp == ' ') ++endp; + stage = endp; + } + update_progress(job_id, prog, stage); + } else { + std::lock_guard g(tail_mu); + stderr_tail += line; + stderr_tail += '\n'; + if (stderr_tail.size() > 4096) { + stderr_tail.erase(0, stderr_tail.size() - 4096); + } + } +} + +#ifdef _WIN32 + +// Construye command line para wsl.exe que ejecuta el enricher dentro de WSL. +// Usa --cd para asegurar el cwd. Los paths que se le pasan ya estan en +// formato WSL (run_path_wsl). Cita argumentos con espacios. +std::wstring utf8_to_wide(const std::string& s) { + if (s.empty()) return {}; + int n = MultiByteToWideChar(CP_UTF8, 0, s.c_str(), (int)s.size(), + nullptr, 0); + std::wstring out(n, 0); + MultiByteToWideChar(CP_UTF8, 0, s.c_str(), (int)s.size(), out.data(), n); + return out; +} + ProcResult run_subprocess(const std::string& job_id, const std::string& run_path, const std::string& stdin_payload, @@ -323,7 +380,192 @@ ProcResult run_subprocess(const std::string& job_id, { ProcResult out; - int p_in[2] = {-1, -1}; // padre escribe en p_in[1], hijo lee p_in[0] + SECURITY_ATTRIBUTES sa{}; + sa.nLength = sizeof(sa); + sa.bInheritHandle = TRUE; + sa.lpSecurityDescriptor = nullptr; + + HANDLE in_r = nullptr, in_w = nullptr; + HANDLE out_r = nullptr, out_w = nullptr; + HANDLE err_r = nullptr, err_w = nullptr; + + auto cleanup = [&]() { + for (HANDLE* h : {&in_r, &in_w, &out_r, &out_w, &err_r, &err_w}) { + if (*h) { CloseHandle(*h); *h = nullptr; } + } + }; + + if (!CreatePipe(&in_r, &in_w, &sa, 0) || + !CreatePipe(&out_r, &out_w, &sa, 0) || + !CreatePipe(&err_r, &err_w, &sa, 0)) + { + out.stderr_tail = "CreatePipe failed"; + cleanup(); + return out; + } + SetHandleInformation(in_w, HANDLE_FLAG_INHERIT, 0); + SetHandleInformation(out_r, HANDLE_FLAG_INHERIT, 0); + SetHandleInformation(err_r, HANDLE_FLAG_INHERIT, 0); + + // Convertir paths a WSL. + std::string run_wsl = to_wsl_path(run_path); + std::string root_wsl = to_wsl_path(g_state->registry_root); + std::string py_wsl = root_wsl + "/python/.venv/bin/python3"; + + // wsl.exe --cd -- + // Los argumentos van separados; wsl.exe interpreta bien rutas con espacios + // si se quotean. En nuestro caso no esperamos espacios. + std::wstring cmdline = L"wsl.exe --cd "; + cmdline += utf8_to_wide(root_wsl); + cmdline += L" -- "; + cmdline += utf8_to_wide(py_wsl); + cmdline += L" "; + cmdline += utf8_to_wide(run_wsl); + + std::vector cmdbuf(cmdline.begin(), cmdline.end()); + cmdbuf.push_back(0); + + STARTUPINFOW si{}; + si.cb = sizeof(si); + si.dwFlags = STARTF_USESTDHANDLES; + si.hStdInput = in_r; + si.hStdOutput = out_w; + si.hStdError = err_w; + + PROCESS_INFORMATION pi{}; + BOOL ok = CreateProcessW( + nullptr, + cmdbuf.data(), + nullptr, nullptr, + TRUE, // bInheritHandles + CREATE_NO_WINDOW, // sin ventana de consola + nullptr, nullptr, + &si, &pi); + + if (!ok) { + DWORD err = GetLastError(); + char buf[64]; + std::snprintf(buf, sizeof(buf), + "CreateProcessW failed (err=%lu, wsl.exe missing?)", + (unsigned long)err); + out.stderr_tail = buf; + cleanup(); + return out; + } + + // Cerrar handles que pertenecen al child en este lado. + CloseHandle(in_r); in_r = nullptr; + CloseHandle(out_w); out_w = nullptr; + CloseHandle(err_w); err_w = nullptr; + + ctrl->process = pi.hProcess; + ctrl->pid = pi.dwProcessId; + + // Persistir pid en BD. + { + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) { + sqlite3_stmt* st = nullptr; + if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? WHERE id=?", -1, + &st, nullptr) == SQLITE_OK) { + sqlite3_bind_int (st, 1, (int)pi.dwProcessId); + sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_step(st); + } + sqlite3_finalize(st); + sqlite3_close(db); + } + } + + // Escribir stdin entero. + if (!stdin_payload.empty()) { + DWORD written = 0; + const char* p = stdin_payload.c_str(); + DWORD left = (DWORD)stdin_payload.size(); + while (left > 0) { + DWORD w = 0; + if (!WriteFile(in_w, p + written, left, &w, nullptr) || w == 0) break; + written += w; + left -= w; + } + } + CloseHandle(in_w); in_w = nullptr; + + // Thread aux para stderr. + std::string stderr_tail_local; + std::mutex tail_mu; + std::thread err_t([&]() { + std::string line; + char ch; + while (true) { + DWORD n = 0; + if (!ReadFile(err_r, &ch, 1, &n, nullptr) || n == 0) break; + if (ch == '\n') { + process_stderr_line(line, job_id, stderr_tail_local, tail_mu); + line.clear(); + } else if (ch != '\r') { + line.push_back(ch); + if (line.size() > 4096) line.clear(); + } + } + if (!line.empty()) { + process_stderr_line(line, job_id, stderr_tail_local, tail_mu); + } + }); + + // Leer stdout entero. + { + char buf[4096]; + while (true) { + DWORD n = 0; + if (!ReadFile(out_r, buf, sizeof(buf), &n, nullptr) || n == 0) break; + out.stdout_buf.append(buf, (size_t)n); + if (out.stdout_buf.size() > 1024 * 1024) break; + } + } + CloseHandle(out_r); out_r = nullptr; + + // Esperar al child con polling para soportar cancelacion. + while (true) { + DWORD wr = WaitForSingleObject(pi.hProcess, 100); + if (wr == WAIT_OBJECT_0) break; + if (ctrl->cancel_requested.load()) { + TerminateProcess(pi.hProcess, 1); + WaitForSingleObject(pi.hProcess, 5000); + break; + } + } + + DWORD exit_code = 0; + GetExitCodeProcess(pi.hProcess, &exit_code); + out.exit_code = (int)exit_code; + err_t.join(); + + CloseHandle(err_r); + CloseHandle(pi.hProcess); + CloseHandle(pi.hThread); + { + std::lock_guard g(tail_mu); + out.stderr_tail = std::move(stderr_tail_local); + } + return out; +} + +void kill_proc(JobControl& c) { + if (c.process) TerminateProcess(c.process, 1); +} + +#else // =========================== POSIX ================================= + +ProcResult run_subprocess(const std::string& job_id, + const std::string& run_path, + const std::string& stdin_payload, + std::shared_ptr ctrl) +{ + ProcResult out; + + int p_in[2] = {-1, -1}; int p_out[2] = {-1, -1}; int p_err[2] = {-1, -1}; if (pipe(p_in) != 0 || pipe(p_out) != 0 || pipe(p_err) != 0) { @@ -341,15 +583,13 @@ ProcResult run_subprocess(const std::string& job_id, } if (pid == 0) { - // child dup2(p_in[0], 0); dup2(p_out[1], 1); dup2(p_err[1], 2); - close(p_in[0]); close(p_in[1]); + close(p_in[0]); close(p_in[1]); close(p_out[0]); close(p_out[1]); close(p_err[0]); close(p_err[1]); - // Resolver intérprete: /python/.venv/bin/python3 std::string py = g_state->registry_root + "/python/.venv/bin/python3"; const char* argv[] = { py.c_str(), run_path.c_str(), nullptr }; execv(py.c_str(), (char* const*)argv); @@ -357,13 +597,11 @@ ProcResult run_subprocess(const std::string& job_id, _exit(127); } - // parent ctrl->pid = pid; close(p_in[0]); close(p_out[1]); close(p_err[1]); - // Persistir pid en BD para mostrarlo en UI. { sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, @@ -380,7 +618,6 @@ ProcResult run_subprocess(const std::string& job_id, } } - // Escribir stdin entero. if (!stdin_payload.empty()) { ssize_t written = 0; const char* p = stdin_payload.c_str(); @@ -393,7 +630,6 @@ ProcResult run_subprocess(const std::string& job_id, } close(p_in[1]); - // Thread aux para stderr: parsea PROGRESS y guarda tail. std::string stderr_tail_local; std::mutex tail_mu; std::thread err_t([&]() { @@ -403,56 +639,33 @@ ProcResult run_subprocess(const std::string& job_id, ssize_t n = read(p_err[0], &ch, 1); if (n <= 0) break; if (ch == '\n') { - // Parse line. - if (line.rfind("PROGRESS:", 0) == 0) { - // PROGRESS: - const char* p = line.c_str() + 9; - char* endp = nullptr; - double prog = std::strtod(p, &endp); - std::string stage; - if (endp && *endp) { - while (*endp == ' ') ++endp; - stage = endp; - } - update_progress(job_id, prog, stage); - } else { - std::lock_guard g(tail_mu); - stderr_tail_local += line; - stderr_tail_local += '\n'; - // Cap a ~4 KB. - if (stderr_tail_local.size() > 4096) { - stderr_tail_local.erase(0, stderr_tail_local.size() - 4096); - } - } + process_stderr_line(line, job_id, stderr_tail_local, tail_mu); line.clear(); } else { line.push_back(ch); - if (line.size() > 4096) line.clear(); // proteccion + if (line.size() > 4096) line.clear(); } } + if (!line.empty()) { + process_stderr_line(line, job_id, stderr_tail_local, tail_mu); + } }); - // Leer stdout entero (sincrono). { char buf[4096]; while (true) { ssize_t n = read(p_out[0], buf, sizeof(buf)); if (n <= 0) break; out.stdout_buf.append(buf, (size_t)n); - if (out.stdout_buf.size() > 1024 * 1024) { - // 1 MB cap. - break; - } + if (out.stdout_buf.size() > 1024 * 1024) break; } } close(p_out[0]); - // Esperar al hijo. Si se pidio cancelar, mandamos SIGTERM y SIGKILL. int status = 0; while (true) { if (ctrl->cancel_requested.load() && pid > 0) { kill(pid, SIGTERM); - // pequena gracia, luego SIGKILL si hace falta for (int i = 0; i < 5; ++i) { pid_t r = waitpid(pid, &status, WNOHANG); if (r == pid) goto reaped; @@ -483,7 +696,15 @@ reaped: return out; } -// ---- worker --------------------------------------------------------------- +void kill_proc(JobControl& c) { + if (c.pid > 0) kill(c.pid, SIGTERM); +} + +#endif // _WIN32 + +// ---------------------------------------------------------------------------- +// Worker común +// ---------------------------------------------------------------------------- void persist_status(const std::string& job_id, const std::string& status, const std::string& result_json, @@ -525,10 +746,10 @@ void persist_status(const std::string& job_id, const std::string& status, sqlite3_close(db); } -// Lee la fila para reconstruir el contexto del job antes de spawn. struct JobContext { std::string id, enricher_id, node_id, node_name, params_json, status; }; + bool load_job(const std::string& id, JobContext* out) { sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, @@ -579,11 +800,9 @@ void worker_loop() { if (!load_job(job_id, &ctx)) continue; if (ctx.status == "cancelled") continue; - // Resolver run.py por convencion: //run.py. std::string run_path = g_state->enrichers_dir + "/" + ctx.enricher_id + "/run.py"; - // Marcar running. persist_status(job_id, "running", "", "", false); auto ctrl = std::make_shared(); @@ -592,7 +811,6 @@ void worker_loop() { g_state->running[job_id] = ctrl; } - // Construir stdin y ejecutar. std::string ops_db; { std::lock_guard lk(g_state->q_mu); @@ -604,10 +822,8 @@ void worker_loop() { ProcResult res = run_subprocess(job_id, run_path, stdin_payload, ctrl); - // Estado final. std::string final_status, error; std::string result_json = res.stdout_buf; - // Trim del result_json (saca trailing whitespace). while (!result_json.empty() && (result_json.back() == '\n' || result_json.back() == '\r' || result_json.back() == ' ' || result_json.back() == '\t')) { @@ -673,7 +889,6 @@ bool jobs_init(const char* app_db_path, g_state->app_dir = app_dir ? app_dir : ""; g_state->registry_root = registry_root ? registry_root : ""; - // Rehidratacion: jobs queued de sesiones anteriores se reencolan. { sqlite3* db = nullptr; if (sqlite3_open_v2(app_db_path, &db, SQLITE_OPEN_READONLY, @@ -759,10 +974,9 @@ bool jobs_cancel(const char* job_id) { } if (ctrl) { ctrl->cancel_requested.store(true); - if (ctrl->pid > 0) kill(ctrl->pid, SIGTERM); + kill_proc(*ctrl); return true; } - // No corriendo: marcar cancelled si esta queued. sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { @@ -888,12 +1102,11 @@ int jobs_dirty_counter() { void jobs_shutdown() { if (!g_state) return; g_state->stop_flag.store(true); - // Cancelar todos los running. { std::lock_guard lk(g_state->q_mu); for (auto& kv : g_state->running) { kv.second->cancel_requested.store(true); - if (kv.second->pid > 0) kill(kv.second->pid, SIGTERM); + kill_proc(*kv.second); } } g_state->q_cv.notify_all(); @@ -905,5 +1118,3 @@ void jobs_shutdown() { } } // namespace ge - -#endif // _WIN32 diff --git a/main.cpp b/main.cpp index 652a2be..c5f47f4 100644 --- a/main.cpp +++ b/main.cpp @@ -253,25 +253,40 @@ static bool load_input(bool first_load = true); // ---------------------------------------------------------------------------- // Devuelve el path absoluto al root de fn_registry. Estrategia: -// 1) FN_REGISTRY_ROOT env var -// 2) Sube desde getcwd() buscando un dir con `registry.db` -// 3) "" si no se encuentra +// 1) FN_REGISTRY_ROOT env var (acepta path Linux o UNC Windows +// `\\\\wsl.localhost\\Ubuntu\\home\\...`). +// 2) Sube desde getcwd() buscando un dir con `registry.db`. +// 3) En Windows, fallback al UNC default `\\\\wsl.localhost\\Ubuntu\\home\\ +// lucas\\fn_registry` (la build se distribuye al desktop fuera del +// arbol del registry, asi que getcwd nunca lo encuentra). +// 4) "" si no se encuentra. static std::string resolve_registry_root() { if (const char* env = std::getenv("FN_REGISTRY_ROOT")) { if (env && *env) return env; } char cwd[4096]; - if (getcwd(cwd, sizeof(cwd)) == nullptr) return ""; - std::string p = cwd; - for (int i = 0; i < 8; ++i) { - std::string probe = p + "/registry.db"; - FILE* f = std::fopen(probe.c_str(), "rb"); - if (f) { std::fclose(f); return p; } - size_t s = p.find_last_of('/'); - if (s == std::string::npos || s == 0) break; - p = p.substr(0, s); + if (getcwd(cwd, sizeof(cwd)) != nullptr) { + std::string p = cwd; +#ifdef _WIN32 + // Normalizar separadores para comparar. + for (char& c : p) if (c == '\\') c = '/'; +#endif + for (int i = 0; i < 8; ++i) { + std::string probe = p + "/registry.db"; + FILE* f = std::fopen(probe.c_str(), "rb"); + if (f) { std::fclose(f); return p; } + size_t s = p.find_last_of('/'); + if (s == std::string::npos || s == 0) break; + p = p.substr(0, s); + } } +#ifdef _WIN32 + // Fallback Windows: el UNC apunta al WSL del usuario. Ajustar el nombre + // de la distro si no es "Ubuntu". La build Linux/WSL nunca llega aqui. + return "\\\\wsl.localhost\\Ubuntu\\home\\lucas\\fn_registry"; +#else return ""; +#endif } // ----------------------------------------------------------------------------