7a94160fd2
Bloque de cambios revisados y validados con el usuario en sesiones previas que no habian aterrizado en commits propios. Lista por tema: * enrichers: web_search ahora usa lite.duckduckgo.com como endpoint primario (mas tolerante con bot detection desde IP residencial), con fallback al endpoint html. Detecta pagina captcha y emite error claro si ambos fallan. Anyade _DDGLiteParser para el formato lite + auto-pick de parser por contenido. * enrichers: tipo Webpage unificado en Url (campos de cuerpo cacheado viven en metadata del Url). Manifests actualizados (applies_to: [Url]). fetch_webpage ya no convierte Url->Webpage. * enrichers/manifest: campo `params` parseado a EnricherSpec.params (name, type, default_value, description). UI puede renderizar dialog de configuracion. * jobs: fix de path conversion para Python embebido nativo Windows (no convertir a /mnt/c/... cuando el subproceso es Windows-native; solo cuando es bash o python via WSL). * main.cpp: ventana ImGui (no modal) "Run enricher" con layout 2-col (label izq, input der). Inserta job con JSON tipado. Layout clustering apretado: hijos del mismo anchor en un solo anillo alrededor del padre, sin desperdigar por anillos crecientes. * views: inspector con layout 2-col via BeginTable (Identity, Schema fields, Extras). Description full-width debajo de su label. * tests: portable conftest (auto-detecta REGISTRY_ROOT, PYTHON_BIN, ENRICHERS_DIR para WSL y Windows portable). _runner.py trampoline inyecta stub via sys.path porque embedded Python ignora PYTHONPATH. Tests bash-only (vendor_script, freeze, dispatcher bash, resolver Linux-binary) skipean en Windows. Tests existentes adaptados a Webpage->Url. Resultado actual: 32 passed WSL, 21 passed + 11 skipped Windows.
1359 lines
46 KiB
C++
1359 lines
46 KiB
C++
#include "jobs.h"
|
|
#include "enrichers.h"
|
|
|
|
#include "../../../../cpp/vendor/sqlite3/sqlite3.h"
|
|
|
|
// Headers comunes a Win32 y POSIX.
|
|
#include <atomic>
|
|
#include <cctype>
|
|
#include <chrono>
|
|
#include <condition_variable>
|
|
#include <cstdarg>
|
|
#include <cstdio>
|
|
#include <cstdlib>
|
|
#include <cstring>
|
|
#include <ctime>
|
|
#include <filesystem>
|
|
#include <memory>
|
|
#include <sys/stat.h>
|
|
#include <mutex>
|
|
#include <queue>
|
|
#include <sstream>
|
|
#include <string>
|
|
#include <thread>
|
|
#include <unordered_map>
|
|
#include <vector>
|
|
|
|
#ifdef _WIN32
|
|
#ifndef WIN32_LEAN_AND_MEAN
|
|
#define WIN32_LEAN_AND_MEAN
|
|
#endif
|
|
#include <windows.h>
|
|
#else
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#include <signal.h>
|
|
#include <sys/types.h>
|
|
#include <sys/wait.h>
|
|
#include <unistd.h>
|
|
#endif
|
|
|
|
namespace ge {
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Internal state
|
|
// ----------------------------------------------------------------------------
|
|
|
|
namespace {
|
|
|
|
// Per-job handle to the spawned subprocess, plus a flag another thread can
// set to ask the worker to stop it.
struct JobControl {
#ifdef _WIN32
    HANDLE process{nullptr};  // process handle (used by TerminateProcess)
    DWORD pid{0};
#else
    pid_t pid{-1};
#endif
    std::atomic<bool> cancel_requested{false};
};

// Module-wide state: database / directory paths, the pending-job queue with
// its mutex + condition variable, the table of running jobs keyed by job id,
// the worker threads, and the stop / dirty flags.
// NOTE(review): presumably q_mu guards `pending` (and `running`) — confirm at
// the call sites.
struct State {
    // Paths.
    std::string app_db_path;
    std::string ops_db_path;
    std::string enrichers_dir;
    std::string app_dir;
    std::string registry_root;

    // Queue / bookkeeping.
    std::mutex q_mu;
    std::condition_variable q_cv;
    std::queue<std::string> pending;
    std::unordered_map<std::string, std::shared_ptr<JobControl>> running;

    // Workers and flags.
    std::vector<std::thread> workers;
    std::atomic<bool> stop_flag{false};
    std::atomic<int> dirty{0};
};

State* g_state{nullptr};
|
|
|
|
// ============================================================================
|
|
// Python runtime resolver (issue 0033 fase B)
|
|
// ============================================================================
|
|
|
|
// Outcome of choosing the Python interpreter for "python" enrichers.
//   path      - interpreter to launch: a filesystem path, or a bare program
//               name looked up in PATH when kind == "system".
//   kind      - where it came from: "embedded" | "env" | "registry_venv" |
//               "system" | "" (not resolved).
//   needs_wsl - only set on Windows: the interpreter lives inside WSL and
//               must be launched through wsl.exe.
struct PyRuntime {
    std::string path{};
    std::string kind{};
    bool needs_wsl{false};
};
|
|
|
|
// Returns the directory that contains the running executable (the bundled
// runtime/python/ is looked up next to it), without a trailing separator,
// or "" when it cannot be determined.
// POSIX reads the /proc/self/exe symlink (Linux-specific); Windows calls
// GetModuleFileNameW and converts the result to UTF-8.
std::string get_exe_dir() {
    std::string exe_path;
#ifdef _WIN32
    wchar_t wide[MAX_PATH * 2];
    const DWORD capacity = static_cast<DWORD>(sizeof(wide) / sizeof(wide[0]));
    const DWORD len = GetModuleFileNameW(nullptr, wide, capacity);
    // len == capacity means the path was truncated.
    if (len == 0 || len >= capacity) return "";
    const int bytes = WideCharToMultiByte(CP_UTF8, 0, wide, static_cast<int>(len),
                                          nullptr, 0, nullptr, nullptr);
    exe_path.assign(static_cast<size_t>(bytes), '\0');
    WideCharToMultiByte(CP_UTF8, 0, wide, static_cast<int>(len),
                        exe_path.data(), bytes, nullptr, nullptr);
    const size_t cut = exe_path.find_last_of("/\\");
#else
    char link[4096];
    const ssize_t len = readlink("/proc/self/exe", link, sizeof(link) - 1);
    if (len <= 0) return "";
    exe_path.assign(link, static_cast<size_t>(len));
    const size_t cut = exe_path.find_last_of('/');
#endif
    if (cut == std::string::npos) return "";
    return exe_path.substr(0, cut);
}
|
|
|
|
// Returns true when `p` names an existing filesystem entry that is not a
// directory; false for "" or anything that cannot be queried.
//
// `p` is treated as UTF-8, the encoding the rest of this file uses for paths
// (get_exe_dir() produces UTF-8 and the Windows spawn code converts paths
// with CP_UTF8). A positive answer here therefore means the spawn code will
// find the same file.
bool file_exists(const std::string& p) {
    if (p.empty()) return false;
#ifdef _WIN32
    // Narrow stat() on Windows interprets the string in the ANSI code page,
    // so a UTF-8 path with non-ASCII characters (e.g. C:\Users\José\...)
    // would never be found. Query the wide API instead. This also avoids
    // S_ISDIR, which MSVC's <sys/stat.h> does not define.
    const int wide_len = MultiByteToWideChar(CP_UTF8, 0, p.c_str(),
                                             (int)p.size(), nullptr, 0);
    if (wide_len <= 0) return false;  // not valid UTF-8
    std::wstring wide(static_cast<size_t>(wide_len), L'\0');
    MultiByteToWideChar(CP_UTF8, 0, p.c_str(), (int)p.size(),
                        wide.data(), wide_len);
    const DWORD attrs = GetFileAttributesW(wide.c_str());
    return attrs != INVALID_FILE_ATTRIBUTES &&
           (attrs & FILE_ATTRIBUTE_DIRECTORY) == 0;
#else
    struct stat st{};
    // stat() follows symlinks, so a link to a regular file counts.
    return stat(p.c_str(), &st) == 0 && !S_ISDIR(st.st_mode);
#endif
}
|
|
|
|
// Cadena de fallback (logged una sola vez al primer uso):
|
|
// 1. <exe_dir>/assets/runtime/python/{python.exe|bin/python3} -> kind=embedded
|
|
// 2. <exe_dir>/runtime/python/... (legacy, pre-assets/) -> kind=embedded
|
|
// 3. $FN_PYTHON -> kind=env
|
|
// 4. <registry_root>/python/.venv/bin/python3 -> kind=registry_venv
|
|
// 5. python3 del PATH -> kind=system
|
|
PyRuntime resolve_python_runtime() {
|
|
PyRuntime r;
|
|
std::string exe = get_exe_dir();
|
|
|
|
#ifdef _WIN32
|
|
const char* embed_rel[] = {
|
|
"\\assets\\runtime\\python\\python.exe",
|
|
"\\runtime\\python\\python.exe", // legacy
|
|
};
|
|
#else
|
|
const char* embed_rel[] = {
|
|
"/assets/runtime/python/bin/python3",
|
|
"/runtime/python/bin/python3", // legacy
|
|
};
|
|
#endif
|
|
if (!exe.empty()) {
|
|
for (const char* rel : embed_rel) {
|
|
std::string p = exe + rel;
|
|
if (file_exists(p)) { r.path = p; r.kind = "embedded"; return r; }
|
|
}
|
|
}
|
|
|
|
if (const char* env = std::getenv("FN_PYTHON"); env && *env) {
|
|
if (file_exists(env)) { r.path = env; r.kind = "env"; return r; }
|
|
}
|
|
|
|
// Legacy: el venv del registry. En Windows requiere wsl.exe
|
|
// porque ese .venv vive en el sistema de archivos Linux.
|
|
if (!g_state->registry_root.empty()) {
|
|
std::string p = g_state->registry_root + "/python/.venv/bin/python3";
|
|
#ifdef _WIN32
|
|
// En Windows el path es WSL-form; no podemos statearlo desde
|
|
// Windows directamente, asumimos que existe si registry_root
|
|
// se resolvio. needs_wsl=true marca que jobs.cpp debe seguir
|
|
// el camino legacy con wsl.exe.
|
|
r.path = p;
|
|
r.kind = "registry_venv";
|
|
r.needs_wsl = true;
|
|
return r;
|
|
#else
|
|
if (file_exists(p)) { r.path = p; r.kind = "registry_venv"; return r; }
|
|
#endif
|
|
}
|
|
|
|
#ifdef _WIN32
|
|
r.path = "python.exe";
|
|
#else
|
|
r.path = "python3";
|
|
#endif
|
|
r.kind = "system";
|
|
return r;
|
|
}
|
|
|
|
// Cache estatico — log una vez la procedencia para que el usuario
|
|
// vea en stdout que runtime se eligio.
|
|
const PyRuntime& cached_python_runtime() {
|
|
static bool inited = false;
|
|
static PyRuntime r;
|
|
if (!inited) {
|
|
r = resolve_python_runtime();
|
|
std::fprintf(stdout,
|
|
"[jobs] python runtime: kind=%s path=%s wsl=%d\n",
|
|
r.kind.c_str(), r.path.c_str(), r.needs_wsl ? 1 : 0);
|
|
inited = true;
|
|
}
|
|
return r;
|
|
}
|
|
|
|
// Wall-clock time (std::chrono::system_clock) in milliseconds since the Unix
// epoch.
long long now_ms() {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
}

// Returns a job id shaped "j_<13-digit ms timestamp>_<8 lowercase hex>".
// Despite the name this is not a spec ULID. The suffix is a process-wide
// atomic counter, seeded from the low 32 bits of the first call's timestamp
// and incremented on every call, so successive calls in one process yield
// distinct ids.
std::string ulid() {
    const long long ts = now_ms();
    static std::atomic<uint32_t> counter{static_cast<uint32_t>(ts & 0xFFFFFFFF)};
    const uint32_t seq = counter.fetch_add(1, std::memory_order_relaxed);
    char id[64];
    std::snprintf(id, sizeof(id), "j_%013lld_%08x", ts, seq);
    return std::string(id);
}
|
|
|
|
// Executes `sql` (one or more parameterless statements) on `db`.
// Returns true on success. On failure it prints SQLite's error message and
// the offending SQL to stderr, frees the message, and returns false.
bool sql_exec_simple(sqlite3* db, const char* sql) {
    char* errmsg = nullptr;
    if (sqlite3_exec(db, sql, nullptr, nullptr, &errmsg) == SQLITE_OK) {
        return true;
    }
    std::fprintf(stderr, "[jobs] sql error: %s\n sql: %s\n",
                 errmsg != nullptr ? errmsg : "?", sql);
    sqlite3_free(errmsg);  // no-op on nullptr
    return false;
}
|
|
|
|
// Prepares `sql`, binds `params` as TEXT to positional parameters 1..N, and
// steps once. Returns true only if the statement ran to completion
// (SQLITE_DONE). A prepare failure, an error, or a statement that yields a
// row returns false. Nothing is logged.
bool sql_run(sqlite3* db, const char* sql,
             const std::vector<std::string>& params)
{
    sqlite3_stmt* stmt = nullptr;
    if (sqlite3_prepare_v2(db, sql, -1, &stmt, nullptr) != SQLITE_OK) {
        return false;
    }
    int index = 1;
    for (const std::string& value : params) {
        sqlite3_bind_text(stmt, index++, value.c_str(), -1, SQLITE_TRANSIENT);
    }
    const bool done = sqlite3_step(stmt) == SQLITE_DONE;
    sqlite3_finalize(stmt);
    return done;
}
|
|
|
|
bool ensure_table(const char* db_path) {
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(db_path, &db,
|
|
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
|
|
nullptr) != SQLITE_OK) {
|
|
if (db) sqlite3_close(db);
|
|
return false;
|
|
}
|
|
sql_exec_simple(db, "PRAGMA journal_mode=WAL;");
|
|
bool ok = sql_exec_simple(db,
|
|
"CREATE TABLE IF NOT EXISTS jobs ("
|
|
" id TEXT PRIMARY KEY,"
|
|
" enricher_id TEXT NOT NULL,"
|
|
" node_id TEXT,"
|
|
" node_name TEXT NOT NULL DEFAULT '',"
|
|
" params_json TEXT NOT NULL DEFAULT '{}',"
|
|
" status TEXT NOT NULL,"
|
|
" progress REAL NOT NULL DEFAULT 0,"
|
|
" stage TEXT NOT NULL DEFAULT '',"
|
|
" result_json TEXT,"
|
|
" error TEXT,"
|
|
" pid INTEGER,"
|
|
" created_at INTEGER NOT NULL,"
|
|
" started_at INTEGER,"
|
|
" finished_at INTEGER"
|
|
");"
|
|
);
|
|
sql_exec_simple(db,
|
|
"CREATE INDEX IF NOT EXISTS idx_jobs_status "
|
|
"ON jobs(status, created_at);");
|
|
char ts[32];
|
|
std::snprintf(ts, sizeof(ts), "%lld", now_ms());
|
|
sql_run(db,
|
|
"UPDATE jobs SET status='error', error='process died (app restart)', "
|
|
"finished_at=? WHERE status='running'",
|
|
{ts});
|
|
sqlite3_close(db);
|
|
return ok;
|
|
}
|
|
|
|
// Escapes `s` for embedding inside a JSON string literal.
// '"' and '\' are backslash-escaped. \b \f \n \r \t use their short forms.
// Any other byte below 0x20 becomes \u00XX. Everything else, including
// UTF-8 multi-byte sequences, is copied through unchanged.
std::string json_escape(const std::string& s) {
    std::string escaped;
    escaped.reserve(s.size() + 8);
    for (const char ch : s) {
        const char* replacement = nullptr;
        switch (ch) {
            case '"':  replacement = "\\\""; break;
            case '\\': replacement = "\\\\"; break;
            case '\b': replacement = "\\b";  break;
            case '\f': replacement = "\\f";  break;
            case '\n': replacement = "\\n";  break;
            case '\r': replacement = "\\r";  break;
            case '\t': replacement = "\\t";  break;
            default: break;
        }
        if (replacement != nullptr) {
            escaped += replacement;
            continue;
        }
        const auto byte = static_cast<unsigned char>(ch);
        if (byte >= 0x20) {
            escaped.push_back(ch);
            continue;
        }
        char hex[8];
        std::snprintf(hex, sizeof(hex), "\\u%04x", byte);
        escaped += hex;
    }
    return escaped;
}
|
|
|
|
// ----------------------------------------------------------------------------
// Path normalization (Windows -> WSL)
//
// On Windows, subprocesses launched through wsl.exe run inside WSL, so any
// path handed to them must be in WSL form:
//   - "/home/..."                         -> unchanged
//   - "C:\\..." or "C:/..."               -> "/mnt/c/..."
//   - "\\\\wsl.localhost\\<distro>\\path" -> "/path"
//   - "\\\\wsl$\\<distro>\\path"          -> "/path"
//   - any other "\\\\server\\share\\..."  -> same text with '\\' -> '/'
// Anything else is returned unchanged. On POSIX this is the identity.
// ----------------------------------------------------------------------------
std::string to_wsl_path(const std::string& p) {
#ifndef _WIN32
    return p;
#else
    if (p.empty() || p[0] == '/') return p;

    const auto is_sep = [](char c) { return c == '\\' || c == '/'; };
    const auto slashify = [](std::string s) {
        for (char& c : s) {
            if (c == '\\') c = '/';
        }
        return s;
    };

    // UNC: \\<server>\<share>\...
    if (p.size() >= 2 && is_sep(p[0]) && is_sep(p[1])) {
        size_t pos = 2;
        while (pos < p.size() && !is_sep(p[pos])) ++pos;
        std::string server = p.substr(2, pos - 2);
        for (char& c : server) c = (char)std::tolower((unsigned char)c);

        const bool wsl_share = (server == "wsl.localhost" || server == "wsl$");
        if (!wsl_share) return slashify(p);

        if (pos < p.size()) ++pos;                          // separator
        while (pos < p.size() && !is_sep(p[pos])) ++pos;    // distro name
        const std::string rest = slashify(p.substr(pos));
        return rest.empty() ? std::string("/") : rest;
    }

    // Drive letter: "X:\..." or "X:/..."
    const bool has_drive = p.size() >= 3 &&
                           std::isalpha((unsigned char)p[0]) &&
                           p[1] == ':' && is_sep(p[2]);
    if (!has_drive) return p;

    std::string wsl = "/mnt/";
    wsl.push_back((char)std::tolower((unsigned char)p[0]));
    wsl += slashify(p.substr(2));
    return wsl;
#endif
}
|
|
|
|
// Lee un campo de la entidad como string. Devuelve "" si no existe.
|
|
// Importante: usa el path de operations.db tal y como lo recibimos (sin
|
|
// to_wsl) porque SQLite corre en el proceso C++, no dentro de WSL.
|
|
std::string read_entity_field(const char* db_path, const char* id,
|
|
const char* col)
|
|
{
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, nullptr)
|
|
!= SQLITE_OK) {
|
|
if (db) sqlite3_close(db);
|
|
return "";
|
|
}
|
|
std::string sql = std::string("SELECT ") + col +
|
|
" FROM entities WHERE id = ? LIMIT 1";
|
|
sqlite3_stmt* st = nullptr;
|
|
std::string out;
|
|
if (sqlite3_prepare_v2(db, sql.c_str(), -1, &st, nullptr) == SQLITE_OK) {
|
|
sqlite3_bind_text(st, 1, id, -1, SQLITE_TRANSIENT);
|
|
if (sqlite3_step(st) == SQLITE_ROW) {
|
|
const unsigned char* t = sqlite3_column_text(st, 0);
|
|
if (t) out = (const char*)t;
|
|
}
|
|
}
|
|
sqlite3_finalize(st);
|
|
sqlite3_close(db);
|
|
return out;
|
|
}
|
|
|
|
// JSON entregado al subprocess. En Windows, los paths se normalizan a
|
|
// forma WSL solo cuando el subprocess corre dentro de WSL (lang=bash, o
|
|
// python con runtime registry_venv). Para subprocesses nativos Windows
|
|
// (lang=go, o python embedded/FN_PYTHON/system) se mantienen los paths
|
|
// Windows-nativos — pasarlos como /mnt/c/... haria que fallen al abrir.
|
|
// En POSIX la conversion es no-op y siempre se respetan los paths.
|
|
std::string build_stdin_json(const std::string& job_id,
|
|
const std::string& enricher_id,
|
|
const std::string& node_id,
|
|
const std::string& params_json,
|
|
const std::string& ops_db,
|
|
const std::string& app_dir,
|
|
const std::string& registry_root,
|
|
const std::string& lang)
|
|
{
|
|
std::string node_type, node_name, node_metadata = "{}";
|
|
if (!node_id.empty()) {
|
|
node_type = read_entity_field(ops_db.c_str(), node_id.c_str(), "type_ref");
|
|
node_name = read_entity_field(ops_db.c_str(), node_id.c_str(), "name");
|
|
std::string m = read_entity_field(ops_db.c_str(), node_id.c_str(), "metadata");
|
|
if (!m.empty()) node_metadata = m;
|
|
}
|
|
|
|
// Resolver paths a absoluto antes del to_wsl_path. Si el ops_db
|
|
// viene relativo (caso tipico: "projects/<slug>/operations.db"),
|
|
// el subprocess Python lo abriria contra su propio cwd y crearia
|
|
// un fichero vacio si no coincide. Forzar absoluto evita ese bug.
|
|
auto absify = [](const std::string& p) -> std::string {
|
|
if (p.empty()) return p;
|
|
// Normalizar backslashes a forward slashes ANTES de absolute().
|
|
// El path puede venir mezclado (Windows fs::path::string() en
|
|
// build cross, copia desde Windows...). Sin esto, std::filesystem
|
|
// en Linux trata `\` como caracter literal del nombre.
|
|
std::string norm = p;
|
|
for (char& c : norm) if (c == '\\') c = '/';
|
|
std::error_code ec;
|
|
std::filesystem::path fp(norm);
|
|
if (fp.is_absolute()) return norm;
|
|
auto abs = std::filesystem::absolute(fp, ec);
|
|
std::string out = ec ? norm : abs.lexically_normal().string();
|
|
for (char& c : out) if (c == '\\') c = '/';
|
|
return out;
|
|
};
|
|
std::string ops_db_abs = absify(ops_db);
|
|
std::string app_dir_abs = absify(app_dir);
|
|
std::string root_abs = absify(registry_root);
|
|
|
|
// Decidir si convertir paths a forma WSL. Solo se hace cuando el
|
|
// subprocess vive dentro de WSL — si no, los paths /mnt/c/... no
|
|
// existen para el proceso Windows-nativo.
|
|
bool use_wsl_paths = false;
|
|
#ifdef _WIN32
|
|
if (lang == "bash") {
|
|
use_wsl_paths = true;
|
|
} else if (lang == "python") {
|
|
use_wsl_paths = cached_python_runtime().needs_wsl;
|
|
}
|
|
// lang == "go": siempre nativo Windows.
|
|
#else
|
|
(void)lang;
|
|
#endif
|
|
|
|
std::string ops_db_out = use_wsl_paths ? to_wsl_path(ops_db_abs) : ops_db_abs;
|
|
std::string app_dir_out = use_wsl_paths ? to_wsl_path(app_dir_abs) : app_dir_abs;
|
|
std::string root_out = use_wsl_paths ? to_wsl_path(root_abs) : root_abs;
|
|
std::string cache_dir = app_dir_out + "/cache";
|
|
|
|
std::ostringstream o;
|
|
o << '{'
|
|
<< "\"job_id\":\"" << json_escape(job_id) << "\","
|
|
<< "\"enricher_id\":\"" << json_escape(enricher_id) << "\","
|
|
<< "\"node_id\":\"" << json_escape(node_id) << "\","
|
|
<< "\"node_type\":\"" << json_escape(node_type) << "\","
|
|
<< "\"node_name\":\"" << json_escape(node_name) << "\","
|
|
<< "\"metadata\":" << (node_metadata.empty() ? "{}" : node_metadata) << ","
|
|
<< "\"params\":" << (params_json.empty() ? "{}" : params_json) << ","
|
|
<< "\"ops_db_path\":\"" << json_escape(ops_db_out) << "\","
|
|
<< "\"app_dir\":\"" << json_escape(app_dir_out) << "\","
|
|
<< "\"cache_dir\":\"" << json_escape(cache_dir) << "\","
|
|
<< "\"registry_root\":\"" << json_escape(root_out) << "\""
|
|
<< '}';
|
|
return o.str();
|
|
}
|
|
|
|
void update_progress(const std::string& job_id, double prog,
|
|
const std::string& stage)
|
|
{
|
|
if (!g_state) return;
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
|
if (db) sqlite3_close(db);
|
|
return;
|
|
}
|
|
sqlite3_stmt* st = nullptr;
|
|
const char* sql = "UPDATE jobs SET progress=?, stage=? WHERE id=?";
|
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
|
sqlite3_bind_double(st, 1, prog);
|
|
sqlite3_bind_text (st, 2, stage.c_str(), -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT);
|
|
sqlite3_step(st);
|
|
}
|
|
sqlite3_finalize(st);
|
|
sqlite3_close(db);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Subprocess (POSIX y Win32)
|
|
// ----------------------------------------------------------------------------
|
|
|
|
// Outcome of running one enricher subprocess.
struct ProcResult {
    int exit_code{-1};        // exit status; stays -1 on launch failure or signal death
    bool signaled{false};     // POSIX only: terminated by a signal
    int signal{0};            // POSIX only: number of the terminating signal
    std::string stdout_buf;   // captured stdout
    std::string stderr_tail;  // tail of non-PROGRESS stderr, or a launch error
};
|
|
|
|
// Parsea PROGRESS en una linea de stderr y, si aplica, actualiza la BD.
|
|
// stderr_tail crece con todo lo que NO sea PROGRESS, capado a 4 KB.
|
|
void process_stderr_line(const std::string& line,
|
|
const std::string& job_id,
|
|
std::string& stderr_tail,
|
|
std::mutex& tail_mu)
|
|
{
|
|
if (line.rfind("PROGRESS:", 0) == 0) {
|
|
const char* p = line.c_str() + 9;
|
|
char* endp = nullptr;
|
|
double prog = std::strtod(p, &endp);
|
|
std::string stage;
|
|
if (endp && *endp) {
|
|
while (*endp == ' ') ++endp;
|
|
stage = endp;
|
|
}
|
|
update_progress(job_id, prog, stage);
|
|
} else {
|
|
std::lock_guard<std::mutex> g(tail_mu);
|
|
stderr_tail += line;
|
|
stderr_tail += '\n';
|
|
if (stderr_tail.size() > 4096) {
|
|
stderr_tail.erase(0, stderr_tail.size() - 4096);
|
|
}
|
|
}
|
|
}
|
|
|
|
#ifdef _WIN32
|
|
|
|
// Construye command line para wsl.exe que ejecuta el enricher dentro de WSL.
|
|
// Usa --cd para asegurar el cwd. Los paths que se le pasan ya estan en
|
|
// formato WSL (run_path_wsl). Cita argumentos con espacios.
|
|
std::wstring utf8_to_wide(const std::string& s) {
|
|
if (s.empty()) return {};
|
|
int n = MultiByteToWideChar(CP_UTF8, 0, s.c_str(), (int)s.size(),
|
|
nullptr, 0);
|
|
std::wstring out(n, 0);
|
|
MultiByteToWideChar(CP_UTF8, 0, s.c_str(), (int)s.size(), out.data(), n);
|
|
return out;
|
|
}
|
|
|
|
// Runs one enricher as a child process and collects its output (Win32).
//
//   job_id        - jobs.id; used to persist the pid and for PROGRESS updates.
//   run_path      - enricher entry point (Windows path).
//   lang          - "go" | "bash"; anything else is treated as "python".
//   stdin_payload - written to the child's stdin, which is then closed.
//   ctrl          - receives the process handle and pid; cancel_requested
//                   is polled while waiting for the child to exit.
//
// Command line per lang (issue 0033):
//   - "go":     "<run_path>", launched directly without wsl.exe.
//   - "bash":   wsl.exe --cd "<root>" -- /bin/bash "<run.sh>"  (always WSL)
//   - "python": "<python>" "<run_path>" natively for embedded / FN_PYTHON /
//               system runtimes; via wsl.exe + registry venv when needs_wsl.
// Every path is double-quoted so directories containing spaces survive.
//
// Returns the exit code, captured stdout (reading stops after ~1 MiB) and
// the stderr tail. On a launch failure exit_code stays -1 and stderr_tail
// describes the error.
ProcResult run_subprocess(const std::string& job_id,
                          const std::string& run_path,
                          const std::string& lang,
                          const std::string& stdin_payload,
                          std::shared_ptr<JobControl> ctrl)
{
    ProcResult out;

    auto quoted = [](const std::string& s) {
        std::wstring w = L"\"";
        w += utf8_to_wide(s);
        w += L"\"";
        return w;
    };

    // ---- Command line -----------------------------------------------------
    std::wstring cmdline;
    bool via_wsl = false;
    if (lang == "go") {
        // Native Windows .exe: run as-is, outside WSL.
        cmdline = quoted(run_path);
    } else if (lang == "bash") {
        via_wsl = true;
        cmdline = L"wsl.exe --cd ";
        cmdline += quoted(to_wsl_path(g_state->registry_root));
        cmdline += L" -- /bin/bash ";
        cmdline += quoted(to_wsl_path(run_path));
    } else {
        const PyRuntime& rt = cached_python_runtime();
        if (rt.needs_wsl) {
            // Legacy: the registry venv lives inside WSL. rt.path is built
            // from registry_root, so it needs the same WSL conversion as the
            // other paths (to_wsl_path leaves "/..." paths untouched).
            via_wsl = true;
            cmdline = L"wsl.exe --cd ";
            cmdline += quoted(to_wsl_path(g_state->registry_root));
            cmdline += L" -- ";
            cmdline += quoted(to_wsl_path(rt.path));
            cmdline += L" ";
            cmdline += quoted(to_wsl_path(run_path));
        } else {
            // Embedded / FN_PYTHON / system: native Windows Python, so the
            // Windows run_path needs no conversion.
            cmdline = quoted(rt.path);
            cmdline += L" ";
            cmdline += quoted(run_path);
        }
    }
    std::vector<wchar_t> cmdbuf(cmdline.begin(), cmdline.end());
    cmdbuf.push_back(0);  // CreateProcessW needs a mutable, NUL-terminated buffer

    // ---- Pipes + spawn ----------------------------------------------------
    HANDLE in_r = nullptr, in_w = nullptr;
    HANDLE out_r = nullptr, out_w = nullptr;
    HANDLE err_r = nullptr, err_w = nullptr;
    auto cleanup = [&]() {
        for (HANDLE* h : {&in_r, &in_w, &out_r, &out_w, &err_r, &err_w}) {
            if (*h) { CloseHandle(*h); *h = nullptr; }
        }
    };

    PROCESS_INFORMATION pi{};
    {
        // Any child spawned with bInheritHandles=TRUE inherits every
        // inheritable handle that exists at that moment, including another
        // job's child-side pipe ends. That keeps those pipes open and delays
        // EOF for the other job. Serialising "create pipes -> spawn -> close
        // child ends" confines the inheritable handles of this function to
        // the critical section. Processes spawned elsewhere in the app are
        // not covered by this lock.
        static std::mutex spawn_mu;
        std::lock_guard<std::mutex> spawn_lock(spawn_mu);

        SECURITY_ATTRIBUTES sa{};
        sa.nLength = sizeof(sa);
        sa.bInheritHandle = TRUE;
        sa.lpSecurityDescriptor = nullptr;

        if (!CreatePipe(&in_r, &in_w, &sa, 0) ||
            !CreatePipe(&out_r, &out_w, &sa, 0) ||
            !CreatePipe(&err_r, &err_w, &sa, 0))
        {
            out.stderr_tail = "CreatePipe failed";
            cleanup();
            return out;
        }
        // The parent-side ends must not be inherited by the child.
        SetHandleInformation(in_w, HANDLE_FLAG_INHERIT, 0);
        SetHandleInformation(out_r, HANDLE_FLAG_INHERIT, 0);
        SetHandleInformation(err_r, HANDLE_FLAG_INHERIT, 0);

        STARTUPINFOW si{};
        si.cb = sizeof(si);
        si.dwFlags = STARTF_USESTDHANDLES;
        si.hStdInput = in_r;
        si.hStdOutput = out_w;
        si.hStdError = err_w;

        BOOL ok = CreateProcessW(
            nullptr,
            cmdbuf.data(),
            nullptr, nullptr,
            TRUE,               // bInheritHandles
            CREATE_NO_WINDOW,   // no console window
            nullptr, nullptr,
            &si, &pi);

        if (!ok) {
            DWORD err = GetLastError();
            const char* hint = via_wsl ? "wsl.exe missing?" : "executable missing?";
            char buf[96];
            std::snprintf(buf, sizeof(buf),
                          "CreateProcessW failed (err=%lu, %s)",
                          (unsigned long)err, hint);
            out.stderr_tail = buf;
            cleanup();
            return out;
        }

        // The child now owns its ends. Close our copies so that EOF
        // propagates when the child exits.
        CloseHandle(in_r); in_r = nullptr;
        CloseHandle(out_w); out_w = nullptr;
        CloseHandle(err_w); err_w = nullptr;
    }

    ctrl->process = pi.hProcess;
    ctrl->pid = pi.dwProcessId;

    // ---- Persist pid (best-effort) ----------------------------------------
    {
        sqlite3* db = nullptr;
        if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
                            SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) {
            sqlite3_busy_timeout(db, 5000);  // other workers write concurrently
            sqlite3_stmt* st = nullptr;
            if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? WHERE id=?", -1,
                                   &st, nullptr) == SQLITE_OK) {
                sqlite3_bind_int (st, 1, (int)pi.dwProcessId);
                sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT);
                sqlite3_step(st);
            }
            sqlite3_finalize(st);
        }
        sqlite3_close(db);  // also releases the handle when open failed
    }

    // ---- I/O --------------------------------------------------------------
    // Start the stderr reader before writing stdin. Otherwise a child that
    // writes more than a pipe buffer to stderr before draining stdin would
    // deadlock against us.
    std::string stderr_tail_local;
    std::mutex tail_mu;
    std::thread err_t([&]() {
        std::string line;
        char ch;
        while (true) {
            DWORD n = 0;
            if (!ReadFile(err_r, &ch, 1, &n, nullptr) || n == 0) break;
            if (ch == '\n') {
                process_stderr_line(line, job_id, stderr_tail_local, tail_mu);
                line.clear();
            } else if (ch != '\r') {
                line.push_back(ch);
                if (line.size() > 4096) line.clear();
            }
        }
        if (!line.empty()) {
            process_stderr_line(line, job_id, stderr_tail_local, tail_mu);
        }
    });

    // Write stdin on its own thread so a large payload cannot deadlock
    // against a child that fills stdout before it finishes reading stdin.
    // The thread owns the handle and closes it, which signals EOF.
    HANDLE stdin_w = in_w;
    in_w = nullptr;
    std::thread in_t([&stdin_payload, stdin_w]() {
        const char* p = stdin_payload.data();
        size_t left = stdin_payload.size();
        while (left > 0) {
            const DWORD chunk = left > 0x100000 ? 0x100000 : (DWORD)left;
            DWORD w = 0;
            // Fails with a broken pipe once the child is gone.
            if (!WriteFile(stdin_w, p, chunk, &w, nullptr) || w == 0) break;
            p += w;
            left -= w;
        }
        CloseHandle(stdin_w);
    });

    // Read stdout until EOF (or ~1 MiB).
    {
        char buf[4096];
        while (true) {
            DWORD n = 0;
            if (!ReadFile(out_r, buf, sizeof(buf), &n, nullptr) || n == 0) break;
            out.stdout_buf.append(buf, (size_t)n);
            if (out.stdout_buf.size() > 1024 * 1024) break;
        }
    }
    CloseHandle(out_r); out_r = nullptr;

    // Wait for the child, polling so that cancellation is honoured.
    while (true) {
        DWORD wr = WaitForSingleObject(pi.hProcess, 100);
        if (wr == WAIT_OBJECT_0) break;
        if (ctrl->cancel_requested.load()) {
            TerminateProcess(pi.hProcess, 1);
            WaitForSingleObject(pi.hProcess, 5000);
            break;
        }
    }

    DWORD exit_code = 0;
    GetExitCodeProcess(pi.hProcess, &exit_code);
    out.exit_code = (int)exit_code;

    in_t.join();
    err_t.join();

    CloseHandle(err_r); err_r = nullptr;
    CloseHandle(pi.hProcess);
    CloseHandle(pi.hThread);
    {
        std::lock_guard<std::mutex> g(tail_mu);
        out.stderr_tail = std::move(stderr_tail_local);
    }
    return out;
}
|
|
|
|
// Forcefully terminates the job's process (exit code 1) when a process
// handle has been recorded. Does not wait for the process to exit.
void kill_proc(JobControl& c) {
    if (c.process == nullptr) return;
    TerminateProcess(c.process, 1);
}
|
|
|
|
#else // =========================== POSIX =================================
|
|
|
|
// Runs one enricher as a child process and collects its output (POSIX).
//
//   job_id        - jobs.id; used to persist the pid and for PROGRESS updates.
//   run_path      - enricher entry point.
//   lang          - "go":   execv(run_path)
//                   "bash": /bin/bash run_path
//                   other:  <python> run_path, with the interpreter taken
//                           from cached_python_runtime() (embedded >
//                           FN_PYTHON > registry venv > python3 on PATH).
//   stdin_payload - written to the child's stdin, which is then closed.
//   ctrl          - receives the pid. If cancel_requested is set when we
//                   start waiting, the child gets SIGTERM, then SIGKILL
//                   after ~0.5 s.
//
// Returns the exit code or terminating signal, captured stdout (reading
// stops after ~1 MiB) and the last 4 KiB of non-PROGRESS stderr. On a launch
// failure, or if the child cannot be reaped, exit_code stays -1.
ProcResult run_subprocess(const std::string& job_id,
                          const std::string& run_path,
                          const std::string& lang,
                          const std::string& stdin_payload,
                          std::shared_ptr<JobControl> ctrl)
{
    ProcResult out;

    // ---- Resolve program + argv BEFORE fork() -----------------------------
    // The child of a multi-threaded process may only safely call
    // async-signal-safe functions. cached_python_runtime() can allocate and
    // prints a log line to stdout, and in the child stdout is already the
    // result pipe, so that line would corrupt the enricher's JSON output.
    std::string exe;
    std::string exec_err;      // written to stderr if exec fails
    bool search_path = false;  // execvp (PATH lookup) instead of execv
    bool pass_script = true;   // append run_path as argv[1]
    if (lang == "go") {
        exe = run_path;
        exec_err = "execv failed: " + run_path + "\n";
        pass_script = false;
    } else if (lang == "bash") {
        exe = "/bin/bash";
        exec_err = "execv bash failed\n";
    } else {
        const PyRuntime& rt = cached_python_runtime();
        exe = rt.path;
        search_path = (rt.kind == "system");
        exec_err = "execv failed: " + rt.path + "\n";
    }
    std::vector<const char*> argv{exe.c_str()};
    if (pass_script) argv.push_back(run_path.c_str());
    argv.push_back(nullptr);

    // ---- Pipes --------------------------------------------------------------
    int p_in[2] = {-1, -1};
    int p_out[2] = {-1, -1};
    int p_err[2] = {-1, -1};
    auto close_fd = [](int& fd) {
        if (fd >= 0) { close(fd); fd = -1; }
    };
    auto close_all = [&]() {
        for (int* fd : {&p_in[0], &p_in[1], &p_out[0], &p_out[1],
                        &p_err[0], &p_err[1]}) {
            close_fd(*fd);
        }
    };
    if (pipe(p_in) != 0 || pipe(p_out) != 0 || pipe(p_err) != 0) {
        out.stderr_tail = "pipe() failed";
        close_all();  // release the pipes that were created
        return out;
    }
    // Close-on-exec, so that children spawned concurrently by other workers
    // do not inherit these ends. An inherited stdin write end would delay
    // EOF for this job. dup2() in our own child clears the flag on fds 0/1/2.
    // (A tiny window between pipe() and fcntl() remains.)
    for (int fd : {p_in[0], p_in[1], p_out[0], p_out[1], p_err[0], p_err[1]}) {
        fcntl(fd, F_SETFD, FD_CLOEXEC);
    }

    // ---- Fork / exec ----------------------------------------------------------
    pid_t pid = fork();
    if (pid < 0) {
        out.stderr_tail = "fork() failed";
        close_all();
        return out;
    }

    if (pid == 0) {
        // Child: async-signal-safe calls only from here on.
        dup2(p_in[0], 0);
        dup2(p_out[1], 1);
        dup2(p_err[1], 2);
        close(p_in[0]); close(p_in[1]);
        close(p_out[0]); close(p_out[1]);
        close(p_err[0]); close(p_err[1]);

        char* const* args = const_cast<char* const*>(argv.data());
        if (search_path) {
            execvp(exe.c_str(), args);
        } else {
            execv(exe.c_str(), args);
        }
        if (write(2, exec_err.data(), exec_err.size()) < 0) {
            // Nothing else can be reported from here.
        }
        _exit(127);
    }

    ctrl->pid = pid;
    close_fd(p_in[0]);
    close_fd(p_out[1]);
    close_fd(p_err[1]);

    // ---- Persist pid (best-effort) ----------------------------------------
    {
        sqlite3* db = nullptr;
        if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
                            SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) {
            sqlite3_busy_timeout(db, 5000);  // other workers write concurrently
            sqlite3_stmt* st = nullptr;
            if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? WHERE id=?", -1,
                                   &st, nullptr) == SQLITE_OK) {
                sqlite3_bind_int (st, 1, (int)pid);
                sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT);
                sqlite3_step(st);
            }
            sqlite3_finalize(st);
        }
        sqlite3_close(db);  // also releases the handle when open failed
    }

    // ---- I/O --------------------------------------------------------------
    // Start the stderr reader before writing stdin. Otherwise a child that
    // writes more than a pipe buffer to stderr before draining stdin would
    // deadlock against us.
    std::string stderr_tail_local;
    std::mutex tail_mu;
    std::thread err_t([&]() {
        std::string line;
        char ch;
        while (true) {
            ssize_t n = read(p_err[0], &ch, 1);
            if (n < 0 && errno == EINTR) continue;
            if (n <= 0) break;
            if (ch == '\n') {
                process_stderr_line(line, job_id, stderr_tail_local, tail_mu);
                line.clear();
            } else {
                line.push_back(ch);
                if (line.size() > 4096) line.clear();
            }
        }
        if (!line.empty()) {
            process_stderr_line(line, job_id, stderr_tail_local, tail_mu);
        }
    });

    // Write stdin on a dedicated thread so that a large payload cannot
    // deadlock against a child that fills stdout first.
    const int stdin_fd = p_in[1];
    p_in[1] = -1;  // ownership moves to the writer thread
    std::thread in_t([&stdin_payload, stdin_fd]() {
        // If the child exits without draining stdin, write() raises SIGPIPE,
        // and its default action would kill the whole application. SIGPIPE
        // from write() is directed at the writing thread, so block it in this
        // short-lived thread only. write() then fails with EPIPE and the
        // signal stays pending on this thread instead of terminating the
        // process.
        // NOTE(review): relies on thread-directed SIGPIPE (Linux); verify on
        // other POSIX targets.
        sigset_t block;
        sigemptyset(&block);
        sigaddset(&block, SIGPIPE);
        pthread_sigmask(SIG_BLOCK, &block, nullptr);

        const char* p = stdin_payload.data();
        size_t left = stdin_payload.size();
        while (left > 0) {
            ssize_t n = write(stdin_fd, p, left);
            if (n < 0) {
                if (errno == EINTR) continue;
                break;
            }
            p += n;
            left -= (size_t)n;
        }
        close(stdin_fd);
    });

    // Read stdout until EOF (or ~1 MiB).
    {
        char buf[4096];
        while (true) {
            ssize_t n = read(p_out[0], buf, sizeof(buf));
            if (n < 0 && errno == EINTR) continue;
            if (n <= 0) break;
            out.stdout_buf.append(buf, (size_t)n);
            if (out.stdout_buf.size() > 1024 * 1024) break;
        }
    }
    close_fd(p_out[0]);

    // ---- Wait / cancel ----------------------------------------------------
    int status = 0;
    bool reaped = false;
    while (!reaped) {
        if (ctrl->cancel_requested.load()) {
            kill(pid, SIGTERM);
            for (int i = 0; i < 5 && !reaped; ++i) {
                if (waitpid(pid, &status, WNOHANG) == pid) {
                    reaped = true;
                } else {
                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
                }
            }
            if (reaped) break;
            kill(pid, SIGKILL);
        }
        const pid_t r = waitpid(pid, &status, 0);
        if (r == pid) {
            reaped = true;
        } else if (!(r < 0 && errno == EINTR)) {
            // waitpid failed for good (e.g. ECHILD). `status` was never
            // filled in, so it must not be read as "exited with 0".
            break;
        }
    }

    in_t.join();
    err_t.join();
    close_fd(p_err[0]);

    if (reaped) {
        if (WIFEXITED(status)) {
            out.exit_code = WEXITSTATUS(status);
        } else if (WIFSIGNALED(status)) {
            out.signaled = true;
            out.signal = WTERMSIG(status);
            out.exit_code = -1;
        }
    }
    {
        std::lock_guard<std::mutex> g(tail_mu);
        out.stderr_tail = std::move(stderr_tail_local);
    }
    return out;
}
|
|
|
|
// Asks the job's child process to terminate by sending SIGTERM, when a pid
// has been recorded. Does not wait for the process to exit.
void kill_proc(JobControl& c) {
    if (c.pid <= 0) return;
    kill(c.pid, SIGTERM);
}
|
|
|
|
#endif // _WIN32
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Worker común
|
|
// ----------------------------------------------------------------------------
|
|
|
|
void persist_status(const std::string& job_id, const std::string& status,
|
|
const std::string& result_json,
|
|
const std::string& error,
|
|
bool set_finished)
|
|
{
|
|
if (!g_state) return;
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
|
if (db) sqlite3_close(db);
|
|
return;
|
|
}
|
|
if (set_finished) {
|
|
sqlite3_stmt* st = nullptr;
|
|
const char* sql =
|
|
"UPDATE jobs SET status=?, result_json=?, error=?, finished_at=? "
|
|
"WHERE id=?";
|
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
|
sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_text (st, 2, result_json.c_str(), -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_text (st, 3, error.c_str(), -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_int64 (st, 4, now_ms());
|
|
sqlite3_bind_text (st, 5, job_id.c_str(), -1, SQLITE_TRANSIENT);
|
|
sqlite3_step(st);
|
|
}
|
|
sqlite3_finalize(st);
|
|
} else {
|
|
sqlite3_stmt* st = nullptr;
|
|
const char* sql = "UPDATE jobs SET status=?, started_at=? WHERE id=?";
|
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
|
sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_int64(st, 2, now_ms());
|
|
sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT);
|
|
sqlite3_step(st);
|
|
}
|
|
sqlite3_finalize(st);
|
|
}
|
|
sqlite3_close(db);
|
|
}
|
|
|
|
// One row of the `jobs` table as read by load_job(); every column as text.
struct JobContext {
    std::string id;           // jobs.id
    std::string enricher_id;  // enricher to run, resolved via ge::enricher_by_id
    std::string node_id;      // "" when the column is NULL
    std::string node_name;
    std::string params_json;  // raw JSON params passed into build_stdin_json
    std::string status;       // 'queued', 'running', 'cancelled', ...
};
|
|
|
|
bool load_job(const std::string& id, JobContext* out) {
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
|
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
|
|
if (db) sqlite3_close(db);
|
|
return false;
|
|
}
|
|
sqlite3_stmt* st = nullptr;
|
|
const char* sql =
|
|
"SELECT id, enricher_id, COALESCE(node_id,''), node_name, params_json, status "
|
|
"FROM jobs WHERE id=?";
|
|
bool ok = false;
|
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
|
sqlite3_bind_text(st, 1, id.c_str(), -1, SQLITE_TRANSIENT);
|
|
if (sqlite3_step(st) == SQLITE_ROW) {
|
|
auto col = [&](int i) {
|
|
const unsigned char* t = sqlite3_column_text(st, i);
|
|
return std::string(t ? (const char*)t : "");
|
|
};
|
|
out->id = col(0);
|
|
out->enricher_id = col(1);
|
|
out->node_id = col(2);
|
|
out->node_name = col(3);
|
|
out->params_json = col(4);
|
|
out->status = col(5);
|
|
ok = true;
|
|
}
|
|
}
|
|
sqlite3_finalize(st);
|
|
sqlite3_close(db);
|
|
return ok;
|
|
}
|
|
|
|
void worker_loop() {
|
|
while (!g_state->stop_flag.load()) {
|
|
std::string job_id;
|
|
{
|
|
std::unique_lock<std::mutex> lk(g_state->q_mu);
|
|
g_state->q_cv.wait(lk, [] {
|
|
return g_state->stop_flag.load() || !g_state->pending.empty();
|
|
});
|
|
if (g_state->stop_flag.load()) return;
|
|
job_id = std::move(g_state->pending.front());
|
|
g_state->pending.pop();
|
|
}
|
|
|
|
JobContext ctx;
|
|
if (!load_job(job_id, &ctx)) continue;
|
|
if (ctx.status == "cancelled") continue;
|
|
|
|
// Resolver run_path y lang desde el registro de enrichers
|
|
// (issue 0033 — antes hardcodeaba run.py).
|
|
const ge::EnricherSpec* spec = ge::enricher_by_id(ctx.enricher_id.c_str());
|
|
if (!spec) {
|
|
persist_status(job_id, "failure", "",
|
|
"enricher no encontrado en el registro", false);
|
|
continue;
|
|
}
|
|
if (spec->disabled) {
|
|
std::string err = "enricher deshabilitado: " + spec->disabled_reason;
|
|
persist_status(job_id, "failure", "", err, false);
|
|
continue;
|
|
}
|
|
std::string run_path = spec->run_path;
|
|
std::string lang = spec->lang;
|
|
|
|
persist_status(job_id, "running", "", "", false);
|
|
|
|
auto ctrl = std::make_shared<JobControl>();
|
|
{
|
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
|
g_state->running[job_id] = ctrl;
|
|
}
|
|
|
|
std::string ops_db;
|
|
{
|
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
|
ops_db = g_state->ops_db_path;
|
|
}
|
|
std::string stdin_payload = build_stdin_json(
|
|
ctx.id, ctx.enricher_id, ctx.node_id, ctx.params_json,
|
|
ops_db, g_state->app_dir, g_state->registry_root, lang);
|
|
|
|
ProcResult res = run_subprocess(job_id, run_path, lang,
|
|
stdin_payload, ctrl);
|
|
|
|
std::string final_status, error;
|
|
std::string result_json = res.stdout_buf;
|
|
while (!result_json.empty() &&
|
|
(result_json.back() == '\n' || result_json.back() == '\r' ||
|
|
result_json.back() == ' ' || result_json.back() == '\t')) {
|
|
result_json.pop_back();
|
|
}
|
|
if (ctrl->cancel_requested.load()) {
|
|
final_status = "cancelled";
|
|
error = "user cancelled";
|
|
} else if (res.exit_code == 0) {
|
|
final_status = "done";
|
|
if (result_json.empty()) result_json = "{}";
|
|
} else {
|
|
final_status = "error";
|
|
char buf[64];
|
|
if (res.signaled) {
|
|
std::snprintf(buf, sizeof(buf), "signal %d", res.signal);
|
|
} else {
|
|
std::snprintf(buf, sizeof(buf), "exit %d", res.exit_code);
|
|
}
|
|
error = std::string(buf);
|
|
if (!res.stderr_tail.empty()) {
|
|
error += "\n";
|
|
error += res.stderr_tail;
|
|
}
|
|
}
|
|
|
|
persist_status(job_id, final_status, result_json, error, true);
|
|
|
|
{
|
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
|
g_state->running.erase(job_id);
|
|
}
|
|
if (final_status == "done") {
|
|
g_state->dirty.fetch_add(1, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
} // namespace
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Public API
|
|
// ----------------------------------------------------------------------------
|
|
|
|
bool jobs_init(const char* app_db_path,
|
|
const char* ops_db_path,
|
|
const char* enrichers_dir,
|
|
const char* app_dir,
|
|
const char* registry_root,
|
|
int n_workers)
|
|
{
|
|
if (g_state) return true;
|
|
if (!app_db_path || !*app_db_path) return false;
|
|
if (n_workers < 1) n_workers = 1;
|
|
if (n_workers > 8) n_workers = 8;
|
|
|
|
if (!ensure_table(app_db_path)) return false;
|
|
|
|
g_state = new State();
|
|
g_state->app_db_path = app_db_path;
|
|
g_state->ops_db_path = ops_db_path ? ops_db_path : "";
|
|
g_state->enrichers_dir = enrichers_dir ? enrichers_dir : "";
|
|
g_state->app_dir = app_dir ? app_dir : "";
|
|
g_state->registry_root = registry_root ? registry_root : "";
|
|
|
|
{
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(app_db_path, &db, SQLITE_OPEN_READONLY,
|
|
nullptr) == SQLITE_OK) {
|
|
sqlite3_stmt* st = nullptr;
|
|
const char* sql =
|
|
"SELECT id FROM jobs WHERE status='queued' ORDER BY created_at";
|
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
|
while (sqlite3_step(st) == SQLITE_ROW) {
|
|
const unsigned char* t = sqlite3_column_text(st, 0);
|
|
if (t) g_state->pending.push((const char*)t);
|
|
}
|
|
}
|
|
sqlite3_finalize(st);
|
|
sqlite3_close(db);
|
|
}
|
|
}
|
|
|
|
// Forzar resolucion del Python runtime al iniciar — asi el log
|
|
// sale en stdout una sola vez con la procedencia (embedded /
|
|
// env / registry_venv / system) y el usuario ve que se elegira.
|
|
(void)cached_python_runtime();
|
|
|
|
for (int i = 0; i < n_workers; ++i) {
|
|
g_state->workers.emplace_back(worker_loop);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// Replaces the ops database path handed to jobs started from now on.
// A null path is stored as "". The value is read under q_mu by the workers,
// so it is written under the same mutex. No-op before jobs_init().
void jobs_set_ops_db(const char* ops_db_path) {
    if (g_state == nullptr) {
        return;
    }
    std::string new_path(ops_db_path != nullptr ? ops_db_path : "");
    std::lock_guard<std::mutex> guard(g_state->q_mu);
    g_state->ops_db_path = std::move(new_path);
}
|
|
|
|
bool jobs_submit(const char* enricher_id,
|
|
const char* node_id,
|
|
const char* node_name,
|
|
const char* params_json,
|
|
char* out_id, size_t out_id_n)
|
|
{
|
|
if (!g_state || !enricher_id || !*enricher_id) return false;
|
|
if (!out_id || out_id_n < 32) return false;
|
|
|
|
std::string id = ulid();
|
|
std::snprintf(out_id, out_id_n, "%s", id.c_str());
|
|
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
|
if (db) sqlite3_close(db);
|
|
return false;
|
|
}
|
|
sqlite3_stmt* st = nullptr;
|
|
const char* sql =
|
|
"INSERT INTO jobs (id, enricher_id, node_id, node_name, params_json, "
|
|
"status, progress, stage, created_at) "
|
|
"VALUES (?, ?, ?, ?, ?, 'queued', 0, '', ?)";
|
|
bool ok = false;
|
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
|
sqlite3_bind_text (st, 1, id.c_str(), -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_text (st, 2, enricher_id, -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_text (st, 3, node_id ? node_id : "", -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_text (st, 4, node_name ? node_name : "", -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_text (st, 5, params_json ? params_json : "{}", -1, SQLITE_TRANSIENT);
|
|
sqlite3_bind_int64(st, 6, now_ms());
|
|
ok = sqlite3_step(st) == SQLITE_DONE;
|
|
}
|
|
sqlite3_finalize(st);
|
|
sqlite3_close(db);
|
|
if (!ok) return false;
|
|
|
|
{
|
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
|
g_state->pending.push(id);
|
|
}
|
|
g_state->q_cv.notify_one();
|
|
return true;
|
|
}
|
|
|
|
bool jobs_cancel(const char* job_id) {
|
|
if (!g_state || !job_id) return false;
|
|
std::shared_ptr<JobControl> ctrl;
|
|
{
|
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
|
auto it = g_state->running.find(job_id);
|
|
if (it != g_state->running.end()) ctrl = it->second;
|
|
}
|
|
if (ctrl) {
|
|
ctrl->cancel_requested.store(true);
|
|
kill_proc(*ctrl);
|
|
return true;
|
|
}
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
|
if (db) sqlite3_close(db);
|
|
return false;
|
|
}
|
|
sqlite3_stmt* st = nullptr;
|
|
const char* sql =
|
|
"UPDATE jobs SET status='cancelled', finished_at=?, "
|
|
"error='cancelled before start' WHERE id=? AND status='queued'";
|
|
bool ok = false;
|
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
|
sqlite3_bind_int64(st, 1, now_ms());
|
|
sqlite3_bind_text (st, 2, job_id, -1, SQLITE_TRANSIENT);
|
|
ok = sqlite3_step(st) == SQLITE_DONE;
|
|
}
|
|
sqlite3_finalize(st);
|
|
sqlite3_close(db);
|
|
return ok;
|
|
}
|
|
|
|
bool jobs_delete(const char* job_id) {
|
|
if (!g_state || !job_id) return false;
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
|
if (db) sqlite3_close(db);
|
|
return false;
|
|
}
|
|
sqlite3_stmt* st = nullptr;
|
|
const char* sql =
|
|
"DELETE FROM jobs WHERE id=? AND status IN ('done','error','cancelled')";
|
|
bool ok = false;
|
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
|
sqlite3_bind_text(st, 1, job_id, -1, SQLITE_TRANSIENT);
|
|
ok = sqlite3_step(st) == SQLITE_DONE;
|
|
}
|
|
sqlite3_finalize(st);
|
|
sqlite3_close(db);
|
|
return ok;
|
|
}
|
|
|
|
bool jobs_list(std::vector<JobRow>* out, int limit) {
|
|
if (!g_state || !out) return false;
|
|
out->clear();
|
|
if (limit < 1) limit = 1;
|
|
if (limit > 1000) limit = 1000;
|
|
|
|
sqlite3* db = nullptr;
|
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
|
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
|
|
if (db) sqlite3_close(db);
|
|
return false;
|
|
}
|
|
sqlite3_stmt* st = nullptr;
|
|
const char* sql =
|
|
"SELECT id, enricher_id, COALESCE(node_id,''), node_name, status, "
|
|
"progress, stage, COALESCE(error,''), COALESCE(result_json,''), "
|
|
"created_at, COALESCE(started_at,0), COALESCE(finished_at,0) "
|
|
"FROM jobs ORDER BY created_at DESC LIMIT ?";
|
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
|
sqlite3_bind_int(st, 1, limit);
|
|
while (sqlite3_step(st) == SQLITE_ROW) {
|
|
JobRow r;
|
|
auto col = [&](int i) {
|
|
const unsigned char* t = sqlite3_column_text(st, i);
|
|
return std::string(t ? (const char*)t : "");
|
|
};
|
|
r.id = col(0);
|
|
r.enricher_id = col(1);
|
|
r.node_id = col(2);
|
|
r.node_name = col(3);
|
|
r.status = col(4);
|
|
r.progress = sqlite3_column_double(st, 5);
|
|
r.stage = col(6);
|
|
r.error = col(7);
|
|
r.result_json = col(8);
|
|
r.created_at = sqlite3_column_int64(st, 9);
|
|
r.started_at = sqlite3_column_int64(st, 10);
|
|
r.finished_at = sqlite3_column_int64(st, 11);
|
|
out->push_back(std::move(r));
|
|
}
|
|
}
|
|
sqlite3_finalize(st);
|
|
sqlite3_close(db);
|
|
return true;
|
|
}
|
|
|
|
// Returns the number of jobs per status ('queued', 'running', 'done',
// 'error', 'cancelled'). Any other status is ignored. All counters are zero
// before jobs_init() or when the database cannot be opened.
JobCounters jobs_counters() {
    JobCounters counters{};
    if (g_state == nullptr) {
        return counters;
    }

    sqlite3* db = nullptr;
    const int open_rc = sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
                                        SQLITE_OPEN_READONLY, nullptr);
    if (open_rc != SQLITE_OK) {
        if (db != nullptr) sqlite3_close(db);
        return counters;
    }

    static const char kCountSql[] =
        "SELECT status, COUNT(*) FROM jobs GROUP BY status";
    sqlite3_stmt* stmt = nullptr;
    if (sqlite3_prepare_v2(db, kCountSql, -1, &stmt, nullptr) == SQLITE_OK) {
        while (sqlite3_step(stmt) == SQLITE_ROW) {
            const unsigned char* raw_status = sqlite3_column_text(stmt, 0);
            const int count = sqlite3_column_int(stmt, 1);
            if (raw_status == nullptr) {
                continue;
            }
            const std::string status(reinterpret_cast<const char*>(raw_status));
            if (status == "queued") {
                counters.queued = count;
            } else if (status == "running") {
                counters.running = count;
            } else if (status == "done") {
                counters.done = count;
            } else if (status == "error") {
                counters.error = count;
            } else if (status == "cancelled") {
                counters.cancelled = count;
            }
        }
    }
    sqlite3_finalize(stmt);
    sqlite3_close(db);
    return counters;
}
|
|
|
|
int jobs_dirty_counter() {
|
|
if (!g_state) return 0;
|
|
return g_state->dirty.load(std::memory_order_relaxed);
|
|
}
|
|
|
|
void jobs_shutdown() {
|
|
if (!g_state) return;
|
|
g_state->stop_flag.store(true);
|
|
{
|
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
|
for (auto& kv : g_state->running) {
|
|
kv.second->cancel_requested.store(true);
|
|
kill_proc(*kv.second);
|
|
}
|
|
}
|
|
g_state->q_cv.notify_all();
|
|
for (auto& t : g_state->workers) {
|
|
if (t.joinable()) t.join();
|
|
}
|
|
delete g_state;
|
|
g_state = nullptr;
|
|
}
|
|
|
|
} // namespace ge
|