#include "jobs.h"
#include "enrichers.h"
#include "../../../../cpp/vendor/sqlite3/sqlite3.h"

// Headers shared by Win32 and POSIX.
#include <atomic>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <memory>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <system_error>
#include <thread>
#include <unordered_map>
#include <vector>

#include <sys/stat.h>

#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>
#else
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#endif

namespace ge {

// ----------------------------------------------------------------------------
// Internal state
// ----------------------------------------------------------------------------

namespace {

// Per-job control block shared between the worker running the job and
// whoever cancels it. The worker stores the child's identity after spawning
// it; kill_proc() reads it from another thread, so the fields are atomic to
// keep those cross-thread accesses free of data races.
struct JobControl {
#ifdef _WIN32
    std::atomic<HANDLE> process{nullptr};  // process handle (for TerminateProcess)
    std::atomic<DWORD>  pid{0};
#else
    std::atomic<pid_t>  pid{-1};
#endif
    std::atomic<bool>   cancel_requested{false};
};

// Process-wide state of the job runner (allocated by jobs_init()).
struct State {
    std::string app_db_path;
    std::string ops_db_path;     // read/written under q_mu (jobs_set_ops_db may change it)
    std::string enrichers_dir;
    std::string app_dir;
    std::string registry_root;

    std::mutex              q_mu;
    std::condition_variable q_cv;
    std::queue<std::string> pending;  // ids of queued jobs waiting for a worker
    std::unordered_map<std::string, std::shared_ptr<JobControl>> running;  // job id -> control
    std::vector<std::thread> workers;
    std::atomic<bool>        stop_flag{false};
    // Incremented after every job that finishes with status "done".
    // NOTE(review): element type reconstructed; confirm it matches the
    // readers of `dirty` outside this chunk.
    std::atomic<int>         dirty{0};
};

State* g_state = nullptr;

// ============================================================================
// Python runtime resolver (issue 0033 phase B)
// ============================================================================

// Result of resolving the Python runtime: absolute path to the interpreter,
// where it came from, and whether it lives inside WSL (only Windows uses
// needs_wsl, to decide whether to launch through wsl.exe).
struct PyRuntime {
    std::string path;        // path to the Python executable
    std::string kind;        // "embedded" | "env" | "registry_venv" | "system" | ""
    bool needs_wsl = false;
};

// Returns the directory of the running executable (runtime/python/ is looked
// up next to it). POSIX reads /proc/self/exe; Windows uses GetModuleFileNameW.
std::string get_exe_dir() { #ifdef _WIN32 wchar_t buf[MAX_PATH * 2]; DWORD n = GetModuleFileNameW(nullptr, buf, (DWORD)(sizeof(buf)/sizeof(buf[0]))); if (n == 0 || n >= sizeof(buf)/sizeof(buf[0])) return ""; int u8n = WideCharToMultiByte(CP_UTF8, 0, buf, (int)n, nullptr, 0, nullptr, nullptr); std::string out(u8n, 0); WideCharToMultiByte(CP_UTF8, 0, buf, (int)n, out.data(), u8n, nullptr, nullptr); size_t slash = out.find_last_of("/\\"); return (slash == std::string::npos) ? "" : out.substr(0, slash); #else char buf[4096]; ssize_t n = readlink("/proc/self/exe", buf, sizeof(buf) - 1); if (n <= 0) return ""; buf[n] = 0; std::string out(buf); size_t slash = out.find_last_of('/'); return (slash == std::string::npos) ? "" : out.substr(0, slash); #endif } bool file_exists(const std::string& p) { if (p.empty()) return false; struct stat st{}; return stat(p.c_str(), &st) == 0 && !S_ISDIR(st.st_mode); } // Cadena de fallback (logged una sola vez al primer uso): // 1. /assets/runtime/python/{python.exe|bin/python3} -> kind=embedded // 2. /runtime/python/... (legacy, pre-assets/) -> kind=embedded // 3. $FN_PYTHON -> kind=env // 4. /python/.venv/bin/python3 -> kind=registry_venv // 5. python3 del PATH -> kind=system PyRuntime resolve_python_runtime() { PyRuntime r; std::string exe = get_exe_dir(); #ifdef _WIN32 const char* embed_rel[] = { "\\assets\\runtime\\python\\python.exe", "\\runtime\\python\\python.exe", // legacy }; #else const char* embed_rel[] = { "/assets/runtime/python/bin/python3", "/runtime/python/bin/python3", // legacy }; #endif if (!exe.empty()) { for (const char* rel : embed_rel) { std::string p = exe + rel; if (file_exists(p)) { r.path = p; r.kind = "embedded"; return r; } } } if (const char* env = std::getenv("FN_PYTHON"); env && *env) { if (file_exists(env)) { r.path = env; r.kind = "env"; return r; } } // Legacy: el venv del registry. En Windows requiere wsl.exe // porque ese .venv vive en el sistema de archivos Linux. 
if (!g_state->registry_root.empty()) { std::string p = g_state->registry_root + "/python/.venv/bin/python3"; #ifdef _WIN32 // En Windows el path es WSL-form; no podemos statearlo desde // Windows directamente, asumimos que existe si registry_root // se resolvio. needs_wsl=true marca que jobs.cpp debe seguir // el camino legacy con wsl.exe. r.path = p; r.kind = "registry_venv"; r.needs_wsl = true; return r; #else if (file_exists(p)) { r.path = p; r.kind = "registry_venv"; return r; } #endif } #ifdef _WIN32 r.path = "python.exe"; #else r.path = "python3"; #endif r.kind = "system"; return r; } // Cache estatico — log una vez la procedencia para que el usuario // vea en stdout que runtime se eligio. const PyRuntime& cached_python_runtime() { static bool inited = false; static PyRuntime r; if (!inited) { r = resolve_python_runtime(); std::fprintf(stdout, "[jobs] python runtime: kind=%s path=%s wsl=%d\n", r.kind.c_str(), r.path.c_str(), r.needs_wsl ? 1 : 0); inited = true; } return r; } long long now_ms() { using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()).count(); } std::string ulid() { long long ts = now_ms(); static std::atomic ctr{(uint32_t)(ts & 0xFFFFFFFF)}; uint32_t rnd = ctr.fetch_add(1, std::memory_order_relaxed); char buf[64]; std::snprintf(buf, sizeof(buf), "j_%013lld_%08x", ts, rnd); return buf; } bool sql_exec_simple(sqlite3* db, const char* sql) { char* err = nullptr; int rc = sqlite3_exec(db, sql, nullptr, nullptr, &err); if (rc != SQLITE_OK) { std::fprintf(stderr, "[jobs] sql error: %s\n sql: %s\n", err ? 
err : "?", sql); if (err) sqlite3_free(err); return false; } return true; } bool sql_run(sqlite3* db, const char* sql, const std::vector& params) { sqlite3_stmt* st = nullptr; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) return false; for (size_t i = 0; i < params.size(); ++i) { sqlite3_bind_text(st, (int)(i + 1), params[i].c_str(), -1, SQLITE_TRANSIENT); } int rc = sqlite3_step(st); sqlite3_finalize(st); return rc == SQLITE_DONE; } bool ensure_table(const char* db_path) { sqlite3* db = nullptr; if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return false; } sql_exec_simple(db, "PRAGMA journal_mode=WAL;"); bool ok = sql_exec_simple(db, "CREATE TABLE IF NOT EXISTS jobs (" " id TEXT PRIMARY KEY," " enricher_id TEXT NOT NULL," " node_id TEXT," " node_name TEXT NOT NULL DEFAULT ''," " params_json TEXT NOT NULL DEFAULT '{}'," " status TEXT NOT NULL," " progress REAL NOT NULL DEFAULT 0," " stage TEXT NOT NULL DEFAULT ''," " result_json TEXT," " error TEXT," " pid INTEGER," " created_at INTEGER NOT NULL," " started_at INTEGER," " finished_at INTEGER" ");" ); sql_exec_simple(db, "CREATE INDEX IF NOT EXISTS idx_jobs_status " "ON jobs(status, created_at);"); char ts[32]; std::snprintf(ts, sizeof(ts), "%lld", now_ms()); sql_run(db, "UPDATE jobs SET status='error', error='process died (app restart)', " "finished_at=? 
WHERE status='running'", {ts}); sqlite3_close(db); return ok; } std::string json_escape(const std::string& s) { std::string out; out.reserve(s.size() + 8); for (char c : s) { switch (c) { case '"': out += "\\\""; break; case '\\': out += "\\\\"; break; case '\b': out += "\\b"; break; case '\f': out += "\\f"; break; case '\n': out += "\\n"; break; case '\r': out += "\\r"; break; case '\t': out += "\\t"; break; default: if ((unsigned char)c < 0x20) { char buf[8]; std::snprintf(buf, sizeof(buf), "\\u%04x", (unsigned char)c); out += buf; } else { out += c; } } } return out; } // ---------------------------------------------------------------------------- // Path normalization (Windows ↔ WSL) // // El binario Windows ejecuta los enrichers via wsl.exe → Python corre dentro // del WSL. Los paths que viajan al subprocess deben estar en formato WSL: // - "/home/..." se respeta tal cual. // - "C:\\..." -> "/mnt/c/...". // - "\\\\wsl.localhost\\\\path" -> "/path". // - "\\\\wsl$\\\\path" -> "/path". // En POSIX la funcion es no-op (devuelve la string igual). // ---------------------------------------------------------------------------- std::string to_wsl_path(const std::string& p) { #ifndef _WIN32 return p; #else if (p.empty()) return p; if (p[0] == '/') return p; auto is_sep = [](char c) { return c == '\\' || c == '/'; }; // UNC: \\\\... o ////... if (p.size() >= 2 && is_sep(p[0]) && is_sep(p[1])) { size_t i = 2; while (i < p.size() && !is_sep(p[i])) ++i; std::string server = p.substr(2, i - 2); for (auto& c : server) c = (char)std::tolower((unsigned char)c); if (server == "wsl.localhost" || server == "wsl$") { // skip separator + distro if (i < p.size()) ++i; // skip sep while (i < p.size() && !is_sep(p[i])) ++i; // skip distro name std::string rest = p.substr(i); for (char& c : rest) if (c == '\\') c = '/'; return rest.empty() ? std::string("/") : rest; } // UNC desconocido: convertir backslash a slash y devolverlo. 
std::string out = p; for (char& c : out) if (c == '\\') c = '/'; return out; } // Drive letter: "X:\\..." o "X:/..." if (p.size() >= 3 && std::isalpha((unsigned char)p[0]) && p[1] == ':' && is_sep(p[2])) { std::string out = "/mnt/"; out.push_back((char)std::tolower((unsigned char)p[0])); for (size_t i = 2; i < p.size(); ++i) { out.push_back(p[i] == '\\' ? '/' : p[i]); } return out; } return p; #endif } // Lee un campo de la entidad como string. Devuelve "" si no existe. // Importante: usa el path de operations.db tal y como lo recibimos (sin // to_wsl) porque SQLite corre en el proceso C++, no dentro de WSL. std::string read_entity_field(const char* db_path, const char* id, const char* col) { sqlite3* db = nullptr; if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return ""; } std::string sql = std::string("SELECT ") + col + " FROM entities WHERE id = ? LIMIT 1"; sqlite3_stmt* st = nullptr; std::string out; if (sqlite3_prepare_v2(db, sql.c_str(), -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_text(st, 1, id, -1, SQLITE_TRANSIENT); if (sqlite3_step(st) == SQLITE_ROW) { const unsigned char* t = sqlite3_column_text(st, 0); if (t) out = (const char*)t; } } sqlite3_finalize(st); sqlite3_close(db); return out; } // JSON entregado al subprocess. En Windows, los paths se normalizan a // forma WSL solo cuando el subprocess corre dentro de WSL (lang=bash, o // python con runtime registry_venv). Para subprocesses nativos Windows // (lang=go, o python embedded/FN_PYTHON/system) se mantienen los paths // Windows-nativos — pasarlos como /mnt/c/... haria que fallen al abrir. // En POSIX la conversion es no-op y siempre se respetan los paths. 
std::string build_stdin_json(const std::string& job_id, const std::string& enricher_id, const std::string& node_id, const std::string& params_json, const std::string& ops_db, const std::string& app_dir, const std::string& registry_root, const std::string& lang, int auto_group_threshold = 0) { std::string node_type, node_name, node_metadata = "{}"; if (!node_id.empty()) { node_type = read_entity_field(ops_db.c_str(), node_id.c_str(), "type_ref"); node_name = read_entity_field(ops_db.c_str(), node_id.c_str(), "name"); std::string m = read_entity_field(ops_db.c_str(), node_id.c_str(), "metadata"); if (!m.empty()) node_metadata = m; } // Resolver paths a absoluto antes del to_wsl_path. Si el ops_db // viene relativo (caso tipico: "projects//operations.db"), // el subprocess Python lo abriria contra su propio cwd y crearia // un fichero vacio si no coincide. Forzar absoluto evita ese bug. auto absify = [](const std::string& p) -> std::string { if (p.empty()) return p; // Normalizar backslashes a forward slashes ANTES de absolute(). // El path puede venir mezclado (Windows fs::path::string() en // build cross, copia desde Windows...). Sin esto, std::filesystem // en Linux trata `\` como caracter literal del nombre. std::string norm = p; for (char& c : norm) if (c == '\\') c = '/'; std::error_code ec; std::filesystem::path fp(norm); if (fp.is_absolute()) return norm; auto abs = std::filesystem::absolute(fp, ec); std::string out = ec ? norm : abs.lexically_normal().string(); for (char& c : out) if (c == '\\') c = '/'; return out; }; std::string ops_db_abs = absify(ops_db); std::string app_dir_abs = absify(app_dir); std::string root_abs = absify(registry_root); // Decidir si convertir paths a forma WSL. Solo se hace cuando el // subprocess vive dentro de WSL — si no, los paths /mnt/c/... no // existen para el proceso Windows-nativo. 
bool use_wsl_paths = false; #ifdef _WIN32 if (lang == "bash") { use_wsl_paths = true; } else if (lang == "python") { use_wsl_paths = cached_python_runtime().needs_wsl; } // lang == "go": siempre nativo Windows. #else (void)lang; #endif std::string ops_db_out = use_wsl_paths ? to_wsl_path(ops_db_abs) : ops_db_abs; std::string app_dir_out = use_wsl_paths ? to_wsl_path(app_dir_abs) : app_dir_abs; std::string root_out = use_wsl_paths ? to_wsl_path(root_abs) : root_abs; std::string cache_dir = app_dir_out + "/cache"; std::ostringstream o; o << '{' << "\"job_id\":\"" << json_escape(job_id) << "\"," << "\"enricher_id\":\"" << json_escape(enricher_id) << "\"," << "\"node_id\":\"" << json_escape(node_id) << "\"," << "\"node_type\":\"" << json_escape(node_type) << "\"," << "\"node_name\":\"" << json_escape(node_name) << "\"," << "\"metadata\":" << (node_metadata.empty() ? "{}" : node_metadata) << "," << "\"params\":" << (params_json.empty() ? "{}" : params_json) << "," << "\"ops_db_path\":\"" << json_escape(ops_db_out) << "\"," << "\"app_dir\":\"" << json_escape(app_dir_out) << "\"," << "\"cache_dir\":\"" << json_escape(cache_dir) << "\"," << "\"registry_root\":\"" << json_escape(root_out) << "\""; // Issue 0035e: solo emitimos el campo si el manifest declara override. // Asi las pruebas que NO setean el campo siguen viendo defaults estables // y los enrichers Python solo lo leen cuando viene declarado. if (auto_group_threshold > 0) { o << ",\"auto_group_threshold\":" << auto_group_threshold; } o << '}'; return o.str(); } void update_progress(const std::string& job_id, double prog, const std::string& stage) { if (!g_state) return; sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return; } sqlite3_stmt* st = nullptr; const char* sql = "UPDATE jobs SET progress=?, stage=? 
WHERE id=?"; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_double(st, 1, prog); sqlite3_bind_text (st, 2, stage.c_str(), -1, SQLITE_TRANSIENT); sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT); sqlite3_step(st); } sqlite3_finalize(st); sqlite3_close(db); } // ---------------------------------------------------------------------------- // Subprocess (POSIX y Win32) // ---------------------------------------------------------------------------- struct ProcResult { int exit_code = -1; bool signaled = false; int signal = 0; std::string stdout_buf; std::string stderr_tail; }; // Parsea PROGRESS en una linea de stderr y, si aplica, actualiza la BD. // stderr_tail crece con todo lo que NO sea PROGRESS, capado a 4 KB. void process_stderr_line(const std::string& line, const std::string& job_id, std::string& stderr_tail, std::mutex& tail_mu) { if (line.rfind("PROGRESS:", 0) == 0) { const char* p = line.c_str() + 9; char* endp = nullptr; double prog = std::strtod(p, &endp); std::string stage; if (endp && *endp) { while (*endp == ' ') ++endp; stage = endp; } update_progress(job_id, prog, stage); } else { std::lock_guard g(tail_mu); stderr_tail += line; stderr_tail += '\n'; if (stderr_tail.size() > 4096) { stderr_tail.erase(0, stderr_tail.size() - 4096); } } } #ifdef _WIN32 // Construye command line para wsl.exe que ejecuta el enricher dentro de WSL. // Usa --cd para asegurar el cwd. Los paths que se le pasan ya estan en // formato WSL (run_path_wsl). Cita argumentos con espacios. 
std::wstring utf8_to_wide(const std::string& s) { if (s.empty()) return {}; int n = MultiByteToWideChar(CP_UTF8, 0, s.c_str(), (int)s.size(), nullptr, 0); std::wstring out(n, 0); MultiByteToWideChar(CP_UTF8, 0, s.c_str(), (int)s.size(), out.data(), n); return out; } ProcResult run_subprocess(const std::string& job_id, const std::string& run_path, const std::string& lang, const std::string& stdin_payload, std::shared_ptr ctrl) { ProcResult out; SECURITY_ATTRIBUTES sa{}; sa.nLength = sizeof(sa); sa.bInheritHandle = TRUE; sa.lpSecurityDescriptor = nullptr; HANDLE in_r = nullptr, in_w = nullptr; HANDLE out_r = nullptr, out_w = nullptr; HANDLE err_r = nullptr, err_w = nullptr; auto cleanup = [&]() { for (HANDLE* h : {&in_r, &in_w, &out_r, &out_w, &err_r, &err_w}) { if (*h) { CloseHandle(*h); *h = nullptr; } } }; if (!CreatePipe(&in_r, &in_w, &sa, 0) || !CreatePipe(&out_r, &out_w, &sa, 0) || !CreatePipe(&err_r, &err_w, &sa, 0)) { out.stderr_tail = "CreatePipe failed"; cleanup(); return out; } SetHandleInformation(in_w, HANDLE_FLAG_INHERIT, 0); SetHandleInformation(out_r, HANDLE_FLAG_INHERIT, 0); SetHandleInformation(err_r, HANDLE_FLAG_INHERIT, 0); // Construir cmdline segun lang (issue 0033). // - "go": ejecutar el .exe nativo directamente, sin wsl.exe. // - "python": embedded (Windows nativo) si existe runtime/, si // no fallback a wsl.exe + venv del registry. // - "bash": wsl.exe --cd -- bash (siempre) std::wstring cmdline; if (lang == "go") { // run_path es el .exe Windows nativo. CreateProcessW lo lanza // tal cual. No traducimos a WSL — corre fuera de WSL. cmdline = L"\""; cmdline += utf8_to_wide(run_path); cmdline += L"\""; } else if (lang == "bash") { std::string run_wsl = to_wsl_path(run_path); std::string root_wsl = to_wsl_path(g_state->registry_root); cmdline = L"wsl.exe --cd "; cmdline += utf8_to_wide(root_wsl); cmdline += L" -- /bin/bash "; cmdline += utf8_to_wide(run_wsl); } else { // python — fase B: usar embedded si esta disponible. 
const PyRuntime& rt = cached_python_runtime(); if (rt.needs_wsl) { // Legacy: registry venv vive en WSL. std::string run_wsl = to_wsl_path(run_path); std::string root_wsl = to_wsl_path(g_state->registry_root); cmdline = L"wsl.exe --cd "; cmdline += utf8_to_wide(root_wsl); cmdline += L" -- "; cmdline += utf8_to_wide(rt.path); cmdline += L" "; cmdline += utf8_to_wide(run_wsl); } else { // Embedded / FN_PYTHON / system — Python nativo Windows. // run_path es Windows nativo, no necesita conversion. cmdline = L"\""; cmdline += utf8_to_wide(rt.path); cmdline += L"\" \""; cmdline += utf8_to_wide(run_path); cmdline += L"\""; } } std::vector cmdbuf(cmdline.begin(), cmdline.end()); cmdbuf.push_back(0); STARTUPINFOW si{}; si.cb = sizeof(si); si.dwFlags = STARTF_USESTDHANDLES; si.hStdInput = in_r; si.hStdOutput = out_w; si.hStdError = err_w; PROCESS_INFORMATION pi{}; BOOL ok = CreateProcessW( nullptr, cmdbuf.data(), nullptr, nullptr, TRUE, // bInheritHandles CREATE_NO_WINDOW, // sin ventana de consola nullptr, nullptr, &si, &pi); if (!ok) { DWORD err = GetLastError(); char buf[64]; std::snprintf(buf, sizeof(buf), "CreateProcessW failed (err=%lu, wsl.exe missing?)", (unsigned long)err); out.stderr_tail = buf; cleanup(); return out; } // Cerrar handles que pertenecen al child en este lado. CloseHandle(in_r); in_r = nullptr; CloseHandle(out_w); out_w = nullptr; CloseHandle(err_w); err_w = nullptr; ctrl->process = pi.hProcess; ctrl->pid = pi.dwProcessId; // Persistir pid en BD. { sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) { sqlite3_stmt* st = nullptr; if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? WHERE id=?", -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_int (st, 1, (int)pi.dwProcessId); sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT); sqlite3_step(st); } sqlite3_finalize(st); sqlite3_close(db); } } // Escribir stdin entero. 
if (!stdin_payload.empty()) { DWORD written = 0; const char* p = stdin_payload.c_str(); DWORD left = (DWORD)stdin_payload.size(); while (left > 0) { DWORD w = 0; if (!WriteFile(in_w, p + written, left, &w, nullptr) || w == 0) break; written += w; left -= w; } } CloseHandle(in_w); in_w = nullptr; // Thread aux para stderr. std::string stderr_tail_local; std::mutex tail_mu; std::thread err_t([&]() { std::string line; char ch; while (true) { DWORD n = 0; if (!ReadFile(err_r, &ch, 1, &n, nullptr) || n == 0) break; if (ch == '\n') { process_stderr_line(line, job_id, stderr_tail_local, tail_mu); line.clear(); } else if (ch != '\r') { line.push_back(ch); if (line.size() > 4096) line.clear(); } } if (!line.empty()) { process_stderr_line(line, job_id, stderr_tail_local, tail_mu); } }); // Leer stdout entero. { char buf[4096]; while (true) { DWORD n = 0; if (!ReadFile(out_r, buf, sizeof(buf), &n, nullptr) || n == 0) break; out.stdout_buf.append(buf, (size_t)n); if (out.stdout_buf.size() > 1024 * 1024) break; } } CloseHandle(out_r); out_r = nullptr; // Esperar al child con polling para soportar cancelacion. 
while (true) { DWORD wr = WaitForSingleObject(pi.hProcess, 100); if (wr == WAIT_OBJECT_0) break; if (ctrl->cancel_requested.load()) { TerminateProcess(pi.hProcess, 1); WaitForSingleObject(pi.hProcess, 5000); break; } } DWORD exit_code = 0; GetExitCodeProcess(pi.hProcess, &exit_code); out.exit_code = (int)exit_code; err_t.join(); CloseHandle(err_r); CloseHandle(pi.hProcess); CloseHandle(pi.hThread); { std::lock_guard g(tail_mu); out.stderr_tail = std::move(stderr_tail_local); } return out; } void kill_proc(JobControl& c) { if (c.process) TerminateProcess(c.process, 1); } #else // =========================== POSIX ================================= ProcResult run_subprocess(const std::string& job_id, const std::string& run_path, const std::string& lang, const std::string& stdin_payload, std::shared_ptr ctrl) { ProcResult out; int p_in[2] = {-1, -1}; int p_out[2] = {-1, -1}; int p_err[2] = {-1, -1}; if (pipe(p_in) != 0 || pipe(p_out) != 0 || pipe(p_err) != 0) { out.stderr_tail = "pipe() failed"; return out; } pid_t pid = fork(); if (pid < 0) { out.stderr_tail = "fork() failed"; for (int fd : {p_in[0], p_in[1], p_out[0], p_out[1], p_err[0], p_err[1]}) { if (fd >= 0) close(fd); } return out; } if (pid == 0) { dup2(p_in[0], 0); dup2(p_out[1], 1); dup2(p_err[1], 2); close(p_in[0]); close(p_in[1]); close(p_out[0]); close(p_out[1]); close(p_err[0]); close(p_err[1]); // Bifurcacion por lang (issue 0033). // - "go": execv directo del binario. // - "bash": /bin/bash . // - "python": /python/.venv/bin/python3 . 
if (lang == "go") { const char* argv[] = { run_path.c_str(), nullptr }; execv(run_path.c_str(), (char* const*)argv); std::fprintf(stderr, "execv failed: %s\n", run_path.c_str()); _exit(127); } if (lang == "bash") { const char* sh = "/bin/bash"; const char* argv[] = { sh, run_path.c_str(), nullptr }; execv(sh, (char* const*)argv); std::fprintf(stderr, "execv bash failed\n"); _exit(127); } // Default: python — usa la cadena de fallback de fase B // (embedded > FN_PYTHON > registry venv > system PATH). const PyRuntime& rt = cached_python_runtime(); if (rt.kind == "system") { // Lookup en PATH via execvp. const char* argv[] = { rt.path.c_str(), run_path.c_str(), nullptr }; execvp(rt.path.c_str(), (char* const*)argv); } else { const char* argv[] = { rt.path.c_str(), run_path.c_str(), nullptr }; execv(rt.path.c_str(), (char* const*)argv); } std::fprintf(stderr, "execv failed: %s\n", rt.path.c_str()); _exit(127); } ctrl->pid = pid; close(p_in[0]); close(p_out[1]); close(p_err[1]); { sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) { sqlite3_stmt* st = nullptr; if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? 
WHERE id=?", -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_int (st, 1, (int)pid); sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT); sqlite3_step(st); } sqlite3_finalize(st); sqlite3_close(db); } } if (!stdin_payload.empty()) { ssize_t written = 0; const char* p = stdin_payload.c_str(); size_t left = stdin_payload.size(); while (left > 0) { ssize_t n = write(p_in[1], p + written, left); if (n < 0) { if (errno == EINTR) continue; break; } written += n; left -= (size_t)n; } } close(p_in[1]); std::string stderr_tail_local; std::mutex tail_mu; std::thread err_t([&]() { std::string line; char ch; while (true) { ssize_t n = read(p_err[0], &ch, 1); if (n <= 0) break; if (ch == '\n') { process_stderr_line(line, job_id, stderr_tail_local, tail_mu); line.clear(); } else { line.push_back(ch); if (line.size() > 4096) line.clear(); } } if (!line.empty()) { process_stderr_line(line, job_id, stderr_tail_local, tail_mu); } }); { char buf[4096]; while (true) { ssize_t n = read(p_out[0], buf, sizeof(buf)); if (n <= 0) break; out.stdout_buf.append(buf, (size_t)n); if (out.stdout_buf.size() > 1024 * 1024) break; } } close(p_out[0]); int status = 0; while (true) { if (ctrl->cancel_requested.load() && pid > 0) { kill(pid, SIGTERM); for (int i = 0; i < 5; ++i) { pid_t r = waitpid(pid, &status, WNOHANG); if (r == pid) goto reaped; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } kill(pid, SIGKILL); } pid_t r = waitpid(pid, &status, 0); if (r == pid) break; if (r < 0 && errno == EINTR) continue; break; } reaped: err_t.join(); close(p_err[0]); if (WIFEXITED(status)) { out.exit_code = WEXITSTATUS(status); } else if (WIFSIGNALED(status)) { out.signaled = true; out.signal = WTERMSIG(status); out.exit_code = -1; } { std::lock_guard g(tail_mu); out.stderr_tail = std::move(stderr_tail_local); } return out; } void kill_proc(JobControl& c) { if (c.pid > 0) kill(c.pid, SIGTERM); } #endif // _WIN32 // 
---------------------------------------------------------------------------- // Worker común // ---------------------------------------------------------------------------- void persist_status(const std::string& job_id, const std::string& status, const std::string& result_json, const std::string& error, bool set_finished) { if (!g_state) return; sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return; } if (set_finished) { sqlite3_stmt* st = nullptr; const char* sql = "UPDATE jobs SET status=?, result_json=?, error=?, finished_at=? " "WHERE id=?"; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT); sqlite3_bind_text (st, 2, result_json.c_str(), -1, SQLITE_TRANSIENT); sqlite3_bind_text (st, 3, error.c_str(), -1, SQLITE_TRANSIENT); sqlite3_bind_int64 (st, 4, now_ms()); sqlite3_bind_text (st, 5, job_id.c_str(), -1, SQLITE_TRANSIENT); sqlite3_step(st); } sqlite3_finalize(st); } else { sqlite3_stmt* st = nullptr; const char* sql = "UPDATE jobs SET status=?, started_at=? 
WHERE id=?"; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT); sqlite3_bind_int64(st, 2, now_ms()); sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT); sqlite3_step(st); } sqlite3_finalize(st); } sqlite3_close(db); } struct JobContext { std::string id, enricher_id, node_id, node_name, params_json, status; }; bool load_job(const std::string& id, JobContext* out) { sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return false; } sqlite3_stmt* st = nullptr; const char* sql = "SELECT id, enricher_id, COALESCE(node_id,''), node_name, params_json, status " "FROM jobs WHERE id=?"; bool ok = false; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_text(st, 1, id.c_str(), -1, SQLITE_TRANSIENT); if (sqlite3_step(st) == SQLITE_ROW) { auto col = [&](int i) { const unsigned char* t = sqlite3_column_text(st, i); return std::string(t ? (const char*)t : ""); }; out->id = col(0); out->enricher_id = col(1); out->node_id = col(2); out->node_name = col(3); out->params_json = col(4); out->status = col(5); ok = true; } } sqlite3_finalize(st); sqlite3_close(db); return ok; } void worker_loop() { while (!g_state->stop_flag.load()) { std::string job_id; { std::unique_lock lk(g_state->q_mu); g_state->q_cv.wait(lk, [] { return g_state->stop_flag.load() || !g_state->pending.empty(); }); if (g_state->stop_flag.load()) return; job_id = std::move(g_state->pending.front()); g_state->pending.pop(); } JobContext ctx; if (!load_job(job_id, &ctx)) continue; if (ctx.status == "cancelled") continue; // Resolver run_path y lang desde el registro de enrichers // (issue 0033 — antes hardcodeaba run.py). 
const ge::EnricherSpec* spec = ge::enricher_by_id(ctx.enricher_id.c_str()); if (!spec) { persist_status(job_id, "failure", "", "enricher no encontrado en el registro", false); continue; } if (spec->disabled) { std::string err = "enricher deshabilitado: " + spec->disabled_reason; persist_status(job_id, "failure", "", err, false); continue; } std::string run_path = spec->run_path; std::string lang = spec->lang; persist_status(job_id, "running", "", "", false); auto ctrl = std::make_shared(); { std::lock_guard lk(g_state->q_mu); g_state->running[job_id] = ctrl; } std::string ops_db; { std::lock_guard lk(g_state->q_mu); ops_db = g_state->ops_db_path; } std::string stdin_payload = build_stdin_json( ctx.id, ctx.enricher_id, ctx.node_id, ctx.params_json, ops_db, g_state->app_dir, g_state->registry_root, lang, spec->auto_group_threshold); ProcResult res = run_subprocess(job_id, run_path, lang, stdin_payload, ctrl); std::string final_status, error; std::string result_json = res.stdout_buf; while (!result_json.empty() && (result_json.back() == '\n' || result_json.back() == '\r' || result_json.back() == ' ' || result_json.back() == '\t')) { result_json.pop_back(); } if (ctrl->cancel_requested.load()) { final_status = "cancelled"; error = "user cancelled"; } else if (res.exit_code == 0) { final_status = "done"; if (result_json.empty()) result_json = "{}"; } else { final_status = "error"; char buf[64]; if (res.signaled) { std::snprintf(buf, sizeof(buf), "signal %d", res.signal); } else { std::snprintf(buf, sizeof(buf), "exit %d", res.exit_code); } error = std::string(buf); if (!res.stderr_tail.empty()) { error += "\n"; error += res.stderr_tail; } } persist_status(job_id, final_status, result_json, error, true); { std::lock_guard lk(g_state->q_mu); g_state->running.erase(job_id); } if (final_status == "done") { g_state->dirty.fetch_add(1, std::memory_order_relaxed); } } } } // namespace // ---------------------------------------------------------------------------- // Public API 
// ---------------------------------------------------------------------------- bool jobs_init(const char* app_db_path, const char* ops_db_path, const char* enrichers_dir, const char* app_dir, const char* registry_root, int n_workers) { if (g_state) return true; if (!app_db_path || !*app_db_path) return false; if (n_workers < 1) n_workers = 1; if (n_workers > 8) n_workers = 8; if (!ensure_table(app_db_path)) return false; g_state = new State(); g_state->app_db_path = app_db_path; g_state->ops_db_path = ops_db_path ? ops_db_path : ""; g_state->enrichers_dir = enrichers_dir ? enrichers_dir : ""; g_state->app_dir = app_dir ? app_dir : ""; g_state->registry_root = registry_root ? registry_root : ""; { sqlite3* db = nullptr; if (sqlite3_open_v2(app_db_path, &db, SQLITE_OPEN_READONLY, nullptr) == SQLITE_OK) { sqlite3_stmt* st = nullptr; const char* sql = "SELECT id FROM jobs WHERE status='queued' ORDER BY created_at"; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { while (sqlite3_step(st) == SQLITE_ROW) { const unsigned char* t = sqlite3_column_text(st, 0); if (t) g_state->pending.push((const char*)t); } } sqlite3_finalize(st); sqlite3_close(db); } } // Forzar resolucion del Python runtime al iniciar — asi el log // sale en stdout una sola vez con la procedencia (embedded / // env / registry_venv / system) y el usuario ve que se elegira. (void)cached_python_runtime(); for (int i = 0; i < n_workers; ++i) { g_state->workers.emplace_back(worker_loop); } return true; } void jobs_set_ops_db(const char* ops_db_path) { if (!g_state) return; std::lock_guard lk(g_state->q_mu); g_state->ops_db_path = ops_db_path ? 
ops_db_path : ""; } bool jobs_submit(const char* enricher_id, const char* node_id, const char* node_name, const char* params_json, char* out_id, size_t out_id_n) { if (!g_state || !enricher_id || !*enricher_id) return false; if (!out_id || out_id_n < 32) return false; std::string id = ulid(); std::snprintf(out_id, out_id_n, "%s", id.c_str()); sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return false; } sqlite3_stmt* st = nullptr; const char* sql = "INSERT INTO jobs (id, enricher_id, node_id, node_name, params_json, " "status, progress, stage, created_at) " "VALUES (?, ?, ?, ?, ?, 'queued', 0, '', ?)"; bool ok = false; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_text (st, 1, id.c_str(), -1, SQLITE_TRANSIENT); sqlite3_bind_text (st, 2, enricher_id, -1, SQLITE_TRANSIENT); sqlite3_bind_text (st, 3, node_id ? node_id : "", -1, SQLITE_TRANSIENT); sqlite3_bind_text (st, 4, node_name ? node_name : "", -1, SQLITE_TRANSIENT); sqlite3_bind_text (st, 5, params_json ? 
params_json : "{}", -1, SQLITE_TRANSIENT); sqlite3_bind_int64(st, 6, now_ms()); ok = sqlite3_step(st) == SQLITE_DONE; } sqlite3_finalize(st); sqlite3_close(db); if (!ok) return false; { std::lock_guard lk(g_state->q_mu); g_state->pending.push(id); } g_state->q_cv.notify_one(); return true; } bool jobs_cancel(const char* job_id) { if (!g_state || !job_id) return false; std::shared_ptr ctrl; { std::lock_guard lk(g_state->q_mu); auto it = g_state->running.find(job_id); if (it != g_state->running.end()) ctrl = it->second; } if (ctrl) { ctrl->cancel_requested.store(true); kill_proc(*ctrl); return true; } sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return false; } sqlite3_stmt* st = nullptr; const char* sql = "UPDATE jobs SET status='cancelled', finished_at=?, " "error='cancelled before start' WHERE id=? AND status='queued'"; bool ok = false; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_int64(st, 1, now_ms()); sqlite3_bind_text (st, 2, job_id, -1, SQLITE_TRANSIENT); ok = sqlite3_step(st) == SQLITE_DONE; } sqlite3_finalize(st); sqlite3_close(db); return ok; } bool jobs_delete(const char* job_id) { if (!g_state || !job_id) return false; sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return false; } sqlite3_stmt* st = nullptr; const char* sql = "DELETE FROM jobs WHERE id=? 
AND status IN ('done','error','cancelled')"; bool ok = false; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_text(st, 1, job_id, -1, SQLITE_TRANSIENT); ok = sqlite3_step(st) == SQLITE_DONE; } sqlite3_finalize(st); sqlite3_close(db); return ok; } bool jobs_list(std::vector* out, int limit) { if (!g_state || !out) return false; out->clear(); if (limit < 1) limit = 1; if (limit > 1000) limit = 1000; sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return false; } sqlite3_stmt* st = nullptr; const char* sql = "SELECT id, enricher_id, COALESCE(node_id,''), node_name, status, " "progress, stage, COALESCE(error,''), COALESCE(result_json,''), " "created_at, COALESCE(started_at,0), COALESCE(finished_at,0) " "FROM jobs ORDER BY created_at DESC LIMIT ?"; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { sqlite3_bind_int(st, 1, limit); while (sqlite3_step(st) == SQLITE_ROW) { JobRow r; auto col = [&](int i) { const unsigned char* t = sqlite3_column_text(st, i); return std::string(t ? 
(const char*)t : ""); }; r.id = col(0); r.enricher_id = col(1); r.node_id = col(2); r.node_name = col(3); r.status = col(4); r.progress = sqlite3_column_double(st, 5); r.stage = col(6); r.error = col(7); r.result_json = col(8); r.created_at = sqlite3_column_int64(st, 9); r.started_at = sqlite3_column_int64(st, 10); r.finished_at = sqlite3_column_int64(st, 11); out->push_back(std::move(r)); } } sqlite3_finalize(st); sqlite3_close(db); return true; } JobCounters jobs_counters() { JobCounters c{}; if (!g_state) return c; sqlite3* db = nullptr; if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { if (db) sqlite3_close(db); return c; } sqlite3_stmt* st = nullptr; const char* sql = "SELECT status, COUNT(*) FROM jobs GROUP BY status"; if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { while (sqlite3_step(st) == SQLITE_ROW) { const unsigned char* s = sqlite3_column_text(st, 0); int n = sqlite3_column_int(st, 1); if (!s) continue; std::string status((const char*)s); if (status == "queued") c.queued = n; else if (status == "running") c.running = n; else if (status == "done") c.done = n; else if (status == "error") c.error = n; else if (status == "cancelled") c.cancelled = n; } } sqlite3_finalize(st); sqlite3_close(db); return c; } int jobs_dirty_counter() { if (!g_state) return 0; return g_state->dirty.load(std::memory_order_relaxed); } void jobs_shutdown() { if (!g_state) return; g_state->stop_flag.store(true); { std::lock_guard lk(g_state->q_mu); for (auto& kv : g_state->running) { kv.second->cancel_requested.store(true); kill_proc(*kv.second); } } g_state->q_cv.notify_all(); for (auto& t : g_state->workers) { if (t.joinable()) t.join(); } delete g_state; g_state = nullptr; } } // namespace ge