Files
graph_explorer/jobs.cpp
T
egutierrez a7c227354b fix(jobs): stub Windows para que la build cross-compile (issue 0026)
El sistema de jobs usa fork+exec+pipes POSIX que no existen en MinGW.
Anade un stub _WIN32 que devuelve false en jobs_init y no-op en el resto,
de forma que la app compila para Windows pero los enrichers quedan
desactivados ahi. La build Linux/WSL conserva la implementacion completa.

TODO futuro: implementacion Windows con CreateProcess + anonymous pipes
+ TerminateProcess. No urgente — el desarrollo principal es WSL.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 18:42:46 +02:00

910 lines
31 KiB
C++

#include "jobs.h"
#ifdef _WIN32
// ----------------------------------------------------------------------------
// Windows stub (issue 0026): la implementacion real usa fork+exec+pipes POSIX.
// La version Windows debe escribirse con CreateProcess + anonymous pipes +
// ReadFile/WriteFile + TerminateProcess. Por ahora, en Windows el panel Jobs
// queda inactivo — el resto de la app funciona normal. TODO: implementar
// con la API Win32.
// ----------------------------------------------------------------------------
#include <cstdio>
namespace ge {

/// Windows stub: prints a notice to stderr and reports that the job system
/// could not be initialised. All arguments are ignored.
bool jobs_init(const char* /*app_db_path*/,
               const char* /*ops_db_path*/,
               const char* /*enrichers_dir*/,
               const char* /*app_dir*/,
               const char* /*registry_root*/,
               int /*n_workers*/)
{
    static const char kNotice[] =
        "[jobs] Windows stub: enrichers no disponibles en esta build "
        "(usa la build Linux/WSL para correr enrichers).\n";
    std::fputs(kNotice, stderr);
    return false;
}

/// Windows stub: no-op.
void jobs_set_ops_db(const char* /*ops_db_path*/)
{
}

/// Windows stub: never queues anything. Leaves an empty string in `out_id`
/// (when the buffer is usable) and returns false.
bool jobs_submit(const char* /*enricher_id*/,
                 const char* /*node_id*/,
                 const char* /*node_name*/,
                 const char* /*params_json*/,
                 char* out_id, size_t out_id_n)
{
    const bool buffer_usable = (out_id != nullptr) && (out_id_n > 0);
    if (buffer_usable) {
        *out_id = '\0';
    }
    return false;
}

/// Windows stub: nothing to cancel.
bool jobs_cancel(const char* /*job_id*/)
{
    return false;
}

/// Windows stub: nothing to delete.
bool jobs_delete(const char* /*job_id*/)
{
    return false;
}

/// Windows stub: empties `out` (if given) and reports success.
bool jobs_list(std::vector<JobRow>* out, int /*limit*/)
{
    if (out != nullptr) {
        out->clear();
    }
    return true;
}

/// Windows stub: value-initialised counters.
JobCounters jobs_counters()
{
    return JobCounters{};
}

/// Windows stub: the counter never moves.
int jobs_dirty_counter()
{
    return 0;
}

/// Windows stub: no-op.
void jobs_shutdown()
{
}

} // namespace ge
#else
// ----------------------------------------------------------------------------
// POSIX (Linux/WSL/macOS): implementacion real con fork+exec+pipes.
// ----------------------------------------------------------------------------
#include "../../../../cpp/vendor/sqlite3/sqlite3.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdarg>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <errno.h>
#include <fcntl.h>
#include <initializer_list>
#include <memory>
#include <mutex>
#include <queue>
#include <signal.h>
#include <sstream>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <thread>
#include <unistd.h>
#include <unordered_map>
#include <vector>
namespace ge {
// ----------------------------------------------------------------------------
// Internal state
// ----------------------------------------------------------------------------
namespace {
// Per-job control block shared (via shared_ptr) between the worker that runs
// the subprocess and the threads that may cancel it.
struct JobControl {
    // Pid of the child process; -1 while there is none. Atomic because the
    // worker thread writes it while jobs_cancel() / jobs_shutdown() read it
    // from other threads; a plain pid_t here is a data race.
    std::atomic<pid_t> pid{-1};
    // Set by a canceller; the worker polls it to decide whether to kill the
    // child and which final status to record.
    std::atomic<bool> cancel_requested{false};
};
// Process-wide state of the job system. One instance is allocated by
// jobs_init and destroyed by jobs_shutdown.
struct State {
// SQLite file that holds the `jobs` table.
std::string app_db_path;
std::string ops_db_path; // mutable: changed by jobs_set_ops_db (accessed under q_mu)
// Directory under which each enricher lives as <enricher_id>/run.py.
std::string enrichers_dir;
// Passed to the subprocess; "<app_dir>/cache" is handed over as cache_dir.
std::string app_dir;
// Root used to locate the interpreter at <registry_root>/python/.venv/bin/python3.
std::string registry_root;
// Guards `pending`, `running` and `ops_db_path`.
std::mutex q_mu;
// Signalled when a job is queued and when shutdown is requested.
std::condition_variable q_cv;
std::queue<std::string> pending; // job ids
// Control blocks of jobs currently being executed, keyed by job id.
std::unordered_map<std::string, std::shared_ptr<JobControl>> running;
// Threads executing worker_loop.
std::vector<std::thread> workers;
// Set by jobs_shutdown to make the workers exit.
std::atomic<bool> stop_flag{false};
// Incremented each time a job finishes with status "done".
std::atomic<int> dirty{0};
};
// Singleton instance; nullptr while the job system is not initialised.
State* g_state = nullptr;
// ---- helpers --------------------------------------------------------------
// Current system_clock time, expressed as milliseconds since the clock's epoch.
long long now_ms() {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    const auto as_millis =
        std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch);
    return as_millis.count();
}
// Builds a job id of the form "j_<13-digit ms timestamp>_<8 hex digits>".
// The hex suffix is not random: it comes from a process-wide counter that is
// seeded from the timestamp of the first call and incremented on every call,
// so ids generated within the same millisecond still differ.
std::string ulid() {
    const long long stamp_ms = now_ms();
    static std::atomic<uint32_t> sequence{
        static_cast<uint32_t>(stamp_ms & 0xFFFFFFFF)};
    const uint32_t suffix = sequence.fetch_add(1, std::memory_order_relaxed);
    char text[64];
    std::snprintf(text, sizeof(text), "j_%013lld_%08x", stamp_ms, suffix);
    return std::string(text);
}
// Executes parameterless SQL through sqlite3_exec, discarding any rows.
// On failure the SQLite message and the statement are logged to stderr.
// Returns true only when SQLite reports SQLITE_OK.
bool sql_exec_simple(sqlite3* db, const char* sql) {
    char* message = nullptr;
    const int status = sqlite3_exec(db, sql, nullptr, nullptr, &message);
    if (status == SQLITE_OK) {
        return true;
    }
    std::fprintf(stderr, "[jobs] sql error: %s\n sql: %s\n",
                 message != nullptr ? message : "?", sql);
    if (message != nullptr) {
        sqlite3_free(message);
    }
    return false;
}
// Prepares `sql`, binds each entry of `params` as text to the placeholders
// (first entry -> placeholder 1, and so on), steps the statement once and
// finalizes it. Returns true when the step finished with SQLITE_DONE; a
// prepare failure is logged to stderr and yields false.
bool sql_run(sqlite3* db, const char* sql,
             const std::vector<std::string>& params)
{
    sqlite3_stmt* stmt = nullptr;
    const int prepared = sqlite3_prepare_v2(db, sql, -1, &stmt, nullptr);
    if (prepared != SQLITE_OK) {
        std::fprintf(stderr, "[jobs] prepare failed: %s :: %s\n",
                     sqlite3_errmsg(db), sql);
        return false;
    }

    int slot = 1;
    for (const std::string& value : params) {
        sqlite3_bind_text(stmt, slot, value.c_str(), -1, SQLITE_TRANSIENT);
        ++slot;
    }

    const int stepped = sqlite3_step(stmt);
    sqlite3_finalize(stmt);
    return stepped == SQLITE_DONE;
}
bool ensure_table(const char* db_path) {
sqlite3* db = nullptr;
if (sqlite3_open_v2(db_path, &db,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sql_exec_simple(db, "PRAGMA journal_mode=WAL;");
bool ok = sql_exec_simple(db,
"CREATE TABLE IF NOT EXISTS jobs ("
" id TEXT PRIMARY KEY,"
" enricher_id TEXT NOT NULL,"
" node_id TEXT,"
" node_name TEXT NOT NULL DEFAULT '',"
" params_json TEXT NOT NULL DEFAULT '{}',"
" status TEXT NOT NULL,"
" progress REAL NOT NULL DEFAULT 0,"
" stage TEXT NOT NULL DEFAULT '',"
" result_json TEXT,"
" error TEXT,"
" pid INTEGER,"
" created_at INTEGER NOT NULL,"
" started_at INTEGER,"
" finished_at INTEGER"
");"
);
sql_exec_simple(db,
"CREATE INDEX IF NOT EXISTS idx_jobs_status "
"ON jobs(status, created_at);");
// Reaper: jobs `running` huerfanos de una sesion anterior.
char ts[32];
std::snprintf(ts, sizeof(ts), "%lld", now_ms());
sql_run(db,
"UPDATE jobs SET status='error', error='process died (app restart)', "
"finished_at=? WHERE status='running'",
{ts});
sqlite3_close(db);
return ok;
}
// Escapes `s` for embedding inside a JSON string literal (without the
// surrounding quotes). Handles the double quote, the backslash, the short
// escapes \b \f \n \r \t, and emits \u00XX for any other byte below 0x20.
// All remaining bytes are copied through unchanged.
std::string json_escape(const std::string& s) {
    std::string escaped;
    escaped.reserve(s.size() + 8);
    for (const char ch : s) {
        const char* shorthand = nullptr;
        if (ch == '"') {
            shorthand = "\\\"";
        } else if (ch == '\\') {
            shorthand = "\\\\";
        } else if (ch == '\b') {
            shorthand = "\\b";
        } else if (ch == '\f') {
            shorthand = "\\f";
        } else if (ch == '\n') {
            shorthand = "\\n";
        } else if (ch == '\r') {
            shorthand = "\\r";
        } else if (ch == '\t') {
            shorthand = "\\t";
        }

        if (shorthand != nullptr) {
            escaped += shorthand;
            continue;
        }

        const unsigned char byte = static_cast<unsigned char>(ch);
        if (byte < 0x20) {
            char hex[8];
            std::snprintf(hex, sizeof(hex), "\\u%04x", byte);
            escaped += hex;
        } else {
            escaped += ch;
        }
    }
    return escaped;
}
// Lee un campo de la entidad como string. Devuelve "" si no existe.
std::string read_entity_field(const char* db_path, const char* id,
const char* col)
{
sqlite3* db = nullptr;
if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, nullptr)
!= SQLITE_OK) {
if (db) sqlite3_close(db);
return "";
}
std::string sql = std::string("SELECT ") + col +
" FROM entities WHERE id = ? LIMIT 1";
sqlite3_stmt* st = nullptr;
std::string out;
if (sqlite3_prepare_v2(db, sql.c_str(), -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text(st, 1, id, -1, SQLITE_TRANSIENT);
if (sqlite3_step(st) == SQLITE_ROW) {
const unsigned char* t = sqlite3_column_text(st, 0);
if (t) out = (const char*)t;
}
}
sqlite3_finalize(st);
sqlite3_close(db);
return out;
}
// Builds the JSON object handed to the subprocess on stdin. When `node_id`
// is non-empty, the node's type_ref, name and metadata are looked up in the
// `entities` table of `ops_db`. `metadata` and `params` are inserted as raw
// JSON text (falling back to "{}" when empty); every other value is emitted
// as an escaped JSON string. cache_dir is "<app_dir>/cache".
std::string build_stdin_json(const std::string& job_id,
                             const std::string& enricher_id,
                             const std::string& node_id,
                             const std::string& params_json,
                             const std::string& ops_db,
                             const std::string& app_dir,
                             const std::string& registry_root)
{
    std::string node_type;
    std::string node_name;
    std::string node_metadata("{}");
    if (!node_id.empty()) {
        const char* const db = ops_db.c_str();
        const char* const id = node_id.c_str();
        node_type = read_entity_field(db, id, "type_ref");
        node_name = read_entity_field(db, id, "name");
        std::string stored = read_entity_field(db, id, "metadata");
        if (!stored.empty()) {
            node_metadata = std::move(stored);
        }
    }
    const std::string cache_dir = app_dir + "/cache";

    std::string json;
    json += '{';
    json += "\"job_id\":\"";
    json += json_escape(job_id);
    json += "\",";
    json += "\"enricher_id\":\"";
    json += json_escape(enricher_id);
    json += "\",";
    json += "\"node_id\":\"";
    json += json_escape(node_id);
    json += "\",";
    json += "\"node_type\":\"";
    json += json_escape(node_type);
    json += "\",";
    json += "\"node_name\":\"";
    json += json_escape(node_name);
    json += "\",";
    json += "\"metadata\":";
    json += node_metadata.empty() ? std::string("{}") : node_metadata;
    json += ",";
    json += "\"params\":";
    json += params_json.empty() ? std::string("{}") : params_json;
    json += ",";
    json += "\"ops_db_path\":\"";
    json += json_escape(ops_db);
    json += "\",";
    json += "\"app_dir\":\"";
    json += json_escape(app_dir);
    json += "\",";
    json += "\"cache_dir\":\"";
    json += json_escape(cache_dir);
    json += "\",";
    json += "\"registry_root\":\"";
    json += json_escape(registry_root);
    json += "\"";
    json += '}';
    return json;
}
// ---- subprocess (POSIX) ---------------------------------------------------
// Outcome of one subprocess run, as produced by run_subprocess.
struct ProcResult {
    int exit_code{-1};        // exit status on a normal exit; -1 otherwise
    bool signaled{false};     // true when the child was terminated by a signal
    int signal{0};            // terminating signal number (valid when `signaled`)
    std::string stdout_buf;   // everything captured from the child's stdout
    std::string stderr_tail;  // last stderr lines, used to build error messages
};
void update_progress(const std::string& job_id, double prog,
const std::string& stage)
{
if (!g_state) return;
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return;
}
sqlite3_stmt* st = nullptr;
const char* sql = "UPDATE jobs SET progress=?, stage=? WHERE id=?";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_double(st, 1, prog);
sqlite3_bind_text (st, 2, stage.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_step(st);
}
sqlite3_finalize(st);
sqlite3_close(db);
}
// Runs `<registry_root>/python/.venv/bin/python3 <run_path>` as a child
// process connected through three pipes:
//   stdin  - the parent writes `stdin_payload` and then closes the pipe;
//   stdout - captured whole into ProcResult::stdout_buf (reading stops once
//            more than 1 MB has been collected);
//   stderr - read on a helper thread, line by line. Lines of the form
//            "PROGRESS:<float> <stage>" are forwarded to update_progress;
//            every other line is kept in a ~4 KB tail for error messages.
//
// Blocks until the child has been reaped. `ctrl->pid` holds the child's pid
// while it is alive (so cancellers can signal it) and is reset to -1 once it
// has been reaped. When `ctrl->cancel_requested` is observed, the child gets
// SIGTERM, ~500 ms of grace, then SIGKILL.
//
// Returns exit_code == -1 with a short reason in stderr_tail when the process
// could not be spawned or its exit status could not be collected.
//
// NOTE(review): the whole payload is written before any output is read, and
// the SIGKILL escalation only starts after stdout reaches EOF. A child that
// fills its output pipes before draining stdin, or that ignores SIGTERM while
// keeping stdout open, can still stall this function.
ProcResult run_subprocess(const std::string& job_id,
                          const std::string& run_path,
                          const std::string& stdin_payload,
                          std::shared_ptr<JobControl> ctrl)
{
    ProcResult out;

    // Everything the child needs is built BEFORE fork(): in a multithreaded
    // process only async-signal-safe calls are allowed between fork() and
    // exec, which rules out allocating a std::string or using stdio there.
    const std::string py = g_state->registry_root + "/python/.venv/bin/python3";
    const char* const child_argv[] = { py.c_str(), run_path.c_str(), nullptr };
    const std::string exec_msg = "execv failed: " + py + "\n";

    int p_in[2]  = {-1, -1};  // parent writes p_in[1], child reads p_in[0]
    int p_out[2] = {-1, -1};
    int p_err[2] = {-1, -1};
    const auto close_fd = [](int& fd) {
        if (fd >= 0) {
            close(fd);
            fd = -1;
        }
    };
    const auto close_all = [&]() {
        close_fd(p_in[0]);
        close_fd(p_in[1]);
        close_fd(p_out[0]);
        close_fd(p_out[1]);
        close_fd(p_err[0]);
        close_fd(p_err[1]);
    };

    pid_t pid = -1;
    {
        // Pipe creation and fork are serialised across workers, and every
        // pipe fd is marked close-on-exec. Otherwise a child forked by another
        // worker would inherit this job's pipe ends and keep them open after
        // exec, so stdin/stdout of this job would not reach EOF until that
        // unrelated child exits.
        static std::mutex spawn_mu;
        std::lock_guard<std::mutex> spawn_lock(spawn_mu);

        if (pipe(p_in) != 0 || pipe(p_out) != 0 || pipe(p_err) != 0) {
            out.stderr_tail = "pipe() failed";
            close_all();  // do not leak the pipes that were created
            return out;
        }
        for (int fd : {p_in[0], p_in[1], p_out[0], p_out[1], p_err[0], p_err[1]}) {
            fcntl(fd, F_SETFD, FD_CLOEXEC);
        }

        pid = fork();
        if (pid < 0) {
            out.stderr_tail = "fork() failed";
            close_all();
            return out;
        }
        if (pid == 0) {
            // child: async-signal-safe calls only.
            // dup2() copies do not carry FD_CLOEXEC, so 0/1/2 survive exec.
            dup2(p_in[0], 0);
            dup2(p_out[1], 1);
            dup2(p_err[1], 2);
            close(p_in[0]);  close(p_in[1]);
            close(p_out[0]); close(p_out[1]);
            close(p_err[0]); close(p_err[1]);
            execv(py.c_str(), const_cast<char* const*>(child_argv));
            const ssize_t ignored = write(2, exec_msg.data(), exec_msg.size());
            (void)ignored;
            _exit(127);
        }

        // parent: drop the child's ends of the pipes.
        close_fd(p_in[0]);
        close_fd(p_out[1]);
        close_fd(p_err[1]);
    }

    ctrl->pid = pid;
    // A cancel that raced with the spawn saw pid == -1 and could not signal
    // the child; deliver it now that the pid is known.
    if (ctrl->cancel_requested.load()) {
        kill(pid, SIGTERM);
    }

    // Persist the pid so the UI can show it.
    {
        sqlite3* db = nullptr;
        if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
                            SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) {
            sqlite3_stmt* st = nullptr;
            if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? WHERE id=?", -1,
                                   &st, nullptr) == SQLITE_OK) {
                sqlite3_bind_int (st, 1, (int)pid);
                sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT);
                sqlite3_step(st);
            }
            sqlite3_finalize(st);
        }
        // The handle must be released even when the open failed.
        if (db) sqlite3_close(db);
    }

    // Write the whole payload to the child's stdin. SIGPIPE is blocked on
    // this thread meanwhile: if the child is already gone (for instance execv
    // failed) write() must fail with EPIPE instead of the default SIGPIPE
    // action terminating the whole application.
    {
        sigset_t pipe_set;
        sigset_t prev_mask;
        sigset_t pending;
        sigemptyset(&pipe_set);
        sigaddset(&pipe_set, SIGPIPE);
        const bool was_pending =
            sigpending(&pending) == 0 && sigismember(&pending, SIGPIPE) == 1;
        const bool masked =
            pthread_sigmask(SIG_BLOCK, &pipe_set, &prev_mask) == 0;

        bool broken_pipe = false;
        if (masked) {
            const char* cursor = stdin_payload.data();
            size_t left = stdin_payload.size();
            while (left > 0) {
                const ssize_t n = write(p_in[1], cursor, left);
                if (n < 0) {
                    if (errno == EINTR) continue;
                    broken_pipe = (errno == EPIPE);
                    break;
                }
                cursor += n;
                left -= (size_t)n;
            }
        }
        close_fd(p_in[1]);

        if (masked) {
            // Consume the SIGPIPE this write generated (if any) so it is not
            // delivered when the previous mask is restored.
            if (broken_pipe && !was_pending &&
                sigpending(&pending) == 0 &&
                sigismember(&pending, SIGPIPE) == 1) {
                int sig = 0;
                sigwait(&pipe_set, &sig);
            }
            pthread_sigmask(SIG_SETMASK, &prev_mask, nullptr);
        }
    }

    // Helper thread for stderr: parses PROGRESS lines and keeps a tail.
    std::string stderr_tail_local;
    std::mutex tail_mu;
    std::thread err_t([&]() {
        const auto handle_line = [&](const std::string& line) {
            if (line.rfind("PROGRESS:", 0) == 0) {
                // PROGRESS:<float> <stage...>
                const char* p = line.c_str() + 9;
                char* endp = nullptr;
                const double prog = std::strtod(p, &endp);
                std::string stage;
                if (endp && *endp) {
                    while (*endp == ' ') ++endp;
                    stage = endp;
                }
                update_progress(job_id, prog, stage);
            } else {
                std::lock_guard<std::mutex> g(tail_mu);
                stderr_tail_local += line;
                stderr_tail_local += '\n';
                // Keep only the last ~4 KB.
                if (stderr_tail_local.size() > 4096) {
                    stderr_tail_local.erase(0, stderr_tail_local.size() - 4096);
                }
            }
        };
        std::string line;
        char ch;
        while (true) {
            const ssize_t n = read(p_err[0], &ch, 1);
            if (n < 0 && errno == EINTR) continue;
            if (n <= 0) break;
            if (ch == '\n') {
                handle_line(line);
                line.clear();
            } else {
                line.push_back(ch);
                if (line.size() > 4096) line.clear();  // guard against huge lines
            }
        }
        // A final line without a trailing newline still counts.
        if (!line.empty()) handle_line(line);
    });

    // Read stdout to EOF (synchronously), with a 1 MB cap.
    {
        char buf[4096];
        while (true) {
            const ssize_t n = read(p_out[0], buf, sizeof(buf));
            if (n < 0 && errno == EINTR) continue;
            if (n <= 0) break;
            out.stdout_buf.append(buf, (size_t)n);
            if (out.stdout_buf.size() > 1024 * 1024) {
                break;
            }
        }
    }
    close_fd(p_out[0]);

    // Wait for the child. On cancel: SIGTERM, short grace period, SIGKILL.
    int status = 0;
    bool reaped = false;
    while (!reaped) {
        if (ctrl->cancel_requested.load()) {
            kill(pid, SIGTERM);
            for (int i = 0; i < 5; ++i) {
                if (waitpid(pid, &status, WNOHANG) == pid) {
                    reaped = true;
                    break;
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
            if (reaped) break;
            kill(pid, SIGKILL);
        }
        const pid_t r = waitpid(pid, &status, 0);
        if (r == pid) {
            reaped = true;
            break;
        }
        if (r < 0 && errno == EINTR) continue;
        break;  // waitpid failed for another reason; status is not valid
    }

    // From here on the pid may be recycled by the OS; make sure a late
    // jobs_cancel()/jobs_shutdown() cannot signal an unrelated process.
    ctrl->pid = -1;

    err_t.join();
    close_fd(p_err[0]);

    // Decode the status only if waitpid actually filled it in; otherwise the
    // initial 0 would be misread as a clean exit.
    if (reaped) {
        if (WIFEXITED(status)) {
            out.exit_code = WEXITSTATUS(status);
        } else if (WIFSIGNALED(status)) {
            out.signaled = true;
            out.signal = WTERMSIG(status);
            out.exit_code = -1;
        }
    }
    {
        std::lock_guard<std::mutex> g(tail_mu);
        out.stderr_tail = std::move(stderr_tail_local);
    }
    if (!reaped) {
        out.stderr_tail.insert(0, "waitpid() failed\n");
    }
    return out;
}
// ---- worker ---------------------------------------------------------------
void persist_status(const std::string& job_id, const std::string& status,
const std::string& result_json,
const std::string& error,
bool set_finished)
{
if (!g_state) return;
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return;
}
if (set_finished) {
sqlite3_stmt* st = nullptr;
const char* sql =
"UPDATE jobs SET status=?, result_json=?, error=?, finished_at=? "
"WHERE id=?";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 2, result_json.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 3, error.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_int64 (st, 4, now_ms());
sqlite3_bind_text (st, 5, job_id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_step(st);
}
sqlite3_finalize(st);
} else {
sqlite3_stmt* st = nullptr;
const char* sql = "UPDATE jobs SET status=?, started_at=? WHERE id=?";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_int64(st, 2, now_ms());
sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_step(st);
}
sqlite3_finalize(st);
}
sqlite3_close(db);
}
// Snapshot of the jobs-table columns a worker reads to rebuild a job's
// context before spawning it.
struct JobContext {
    std::string id;
    std::string enricher_id;
    std::string node_id;
    std::string node_name;
    std::string params_json;
    std::string status;
};
bool load_job(const std::string& id, JobContext* out) {
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"SELECT id, enricher_id, COALESCE(node_id,''), node_name, params_json, status "
"FROM jobs WHERE id=?";
bool ok = false;
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text(st, 1, id.c_str(), -1, SQLITE_TRANSIENT);
if (sqlite3_step(st) == SQLITE_ROW) {
auto col = [&](int i) {
const unsigned char* t = sqlite3_column_text(st, i);
return std::string(t ? (const char*)t : "");
};
out->id = col(0);
out->enricher_id = col(1);
out->node_id = col(2);
out->node_name = col(3);
out->params_json = col(4);
out->status = col(5);
ok = true;
}
}
sqlite3_finalize(st);
sqlite3_close(db);
return ok;
}
void worker_loop() {
while (!g_state->stop_flag.load()) {
std::string job_id;
{
std::unique_lock<std::mutex> lk(g_state->q_mu);
g_state->q_cv.wait(lk, [] {
return g_state->stop_flag.load() || !g_state->pending.empty();
});
if (g_state->stop_flag.load()) return;
job_id = std::move(g_state->pending.front());
g_state->pending.pop();
}
JobContext ctx;
if (!load_job(job_id, &ctx)) continue;
if (ctx.status == "cancelled") continue;
// Resolver run.py por convencion: <enrichers_dir>/<enricher_id>/run.py.
std::string run_path = g_state->enrichers_dir + "/" + ctx.enricher_id +
"/run.py";
// Marcar running.
persist_status(job_id, "running", "", "", false);
auto ctrl = std::make_shared<JobControl>();
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
g_state->running[job_id] = ctrl;
}
// Construir stdin y ejecutar.
std::string ops_db;
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
ops_db = g_state->ops_db_path;
}
std::string stdin_payload = build_stdin_json(
ctx.id, ctx.enricher_id, ctx.node_id, ctx.params_json,
ops_db, g_state->app_dir, g_state->registry_root);
ProcResult res = run_subprocess(job_id, run_path, stdin_payload, ctrl);
// Estado final.
std::string final_status, error;
std::string result_json = res.stdout_buf;
// Trim del result_json (saca trailing whitespace).
while (!result_json.empty() &&
(result_json.back() == '\n' || result_json.back() == '\r' ||
result_json.back() == ' ' || result_json.back() == '\t')) {
result_json.pop_back();
}
if (ctrl->cancel_requested.load()) {
final_status = "cancelled";
error = "user cancelled";
} else if (res.exit_code == 0) {
final_status = "done";
if (result_json.empty()) result_json = "{}";
} else {
final_status = "error";
char buf[64];
if (res.signaled) {
std::snprintf(buf, sizeof(buf), "signal %d", res.signal);
} else {
std::snprintf(buf, sizeof(buf), "exit %d", res.exit_code);
}
error = std::string(buf);
if (!res.stderr_tail.empty()) {
error += "\n";
error += res.stderr_tail;
}
}
persist_status(job_id, final_status, result_json, error, true);
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
g_state->running.erase(job_id);
}
if (final_status == "done") {
g_state->dirty.fetch_add(1, std::memory_order_relaxed);
}
}
}
} // namespace
// ----------------------------------------------------------------------------
// Public API
// ----------------------------------------------------------------------------
// Initialises the job system: ensures the jobs table exists in `app_db_path`,
// creates the global state, re-queues jobs left in 'queued' by a previous
// session (oldest first) and starts the worker threads.
//
// app_db_path   - SQLite file holding the jobs table; must be non-empty.
// ops_db_path, enrichers_dir, app_dir, registry_root - may be null, in which
//                 case they are stored as "".
// n_workers     - number of worker threads, clamped to [1, 8].
//
// Returns true on success or when already initialised (arguments are then
// ignored); false when `app_db_path` is empty or the table cannot be ensured.
bool jobs_init(const char* app_db_path,
               const char* ops_db_path,
               const char* enrichers_dir,
               const char* app_dir,
               const char* registry_root,
               int n_workers)
{
    if (g_state) return true;
    if (!app_db_path || !*app_db_path) return false;
    if (n_workers < 1) n_workers = 1;
    if (n_workers > 8) n_workers = 8;
    if (!ensure_table(app_db_path)) return false;

    g_state = new State();
    g_state->app_db_path   = app_db_path;
    g_state->ops_db_path   = ops_db_path ? ops_db_path : "";
    g_state->enrichers_dir = enrichers_dir ? enrichers_dir : "";
    g_state->app_dir       = app_dir ? app_dir : "";
    g_state->registry_root = registry_root ? registry_root : "";

    // Rehydration: jobs still 'queued' from earlier sessions are re-queued.
    {
        sqlite3* db = nullptr;
        if (sqlite3_open_v2(app_db_path, &db, SQLITE_OPEN_READONLY,
                            nullptr) == SQLITE_OK) {
            sqlite3_stmt* st = nullptr;
            const char* sql =
                "SELECT id FROM jobs WHERE status='queued' ORDER BY created_at";
            if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
                while (sqlite3_step(st) == SQLITE_ROW) {
                    const unsigned char* t = sqlite3_column_text(st, 0);
                    if (t) g_state->pending.push((const char*)t);
                }
            }
            sqlite3_finalize(st);
        }
        // sqlite3_open_v2 may hand back a handle even when it fails; it has
        // to be closed in both cases or it leaks.
        if (db) sqlite3_close(db);
    }

    // Workers start only after the state is fully populated.
    for (int i = 0; i < n_workers; ++i) {
        g_state->workers.emplace_back(worker_loop);
    }
    return true;
}
// Replaces the operations-database path used for jobs started from now on.
// A null pointer is stored as "". No-op when the job system is not initialised.
void jobs_set_ops_db(const char* ops_db_path) {
    if (g_state == nullptr) {
        return;
    }
    const char* const next_path = (ops_db_path != nullptr) ? ops_db_path : "";
    std::lock_guard<std::mutex> guard(g_state->q_mu);
    g_state->ops_db_path = next_path;
}
bool jobs_submit(const char* enricher_id,
const char* node_id,
const char* node_name,
const char* params_json,
char* out_id, size_t out_id_n)
{
if (!g_state || !enricher_id || !*enricher_id) return false;
if (!out_id || out_id_n < 32) return false;
std::string id = ulid();
std::snprintf(out_id, out_id_n, "%s", id.c_str());
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"INSERT INTO jobs (id, enricher_id, node_id, node_name, params_json, "
"status, progress, stage, created_at) "
"VALUES (?, ?, ?, ?, ?, 'queued', 0, '', ?)";
bool ok = false;
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text (st, 1, id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 2, enricher_id, -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 3, node_id ? node_id : "", -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 4, node_name ? node_name : "", -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 5, params_json ? params_json : "{}", -1, SQLITE_TRANSIENT);
sqlite3_bind_int64(st, 6, now_ms());
ok = sqlite3_step(st) == SQLITE_DONE;
}
sqlite3_finalize(st);
sqlite3_close(db);
if (!ok) return false;
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
g_state->pending.push(id);
}
g_state->q_cv.notify_one();
return true;
}
bool jobs_cancel(const char* job_id) {
if (!g_state || !job_id) return false;
std::shared_ptr<JobControl> ctrl;
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
auto it = g_state->running.find(job_id);
if (it != g_state->running.end()) ctrl = it->second;
}
if (ctrl) {
ctrl->cancel_requested.store(true);
if (ctrl->pid > 0) kill(ctrl->pid, SIGTERM);
return true;
}
// No corriendo: marcar cancelled si esta queued.
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"UPDATE jobs SET status='cancelled', finished_at=?, "
"error='cancelled before start' WHERE id=? AND status='queued'";
bool ok = false;
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_int64(st, 1, now_ms());
sqlite3_bind_text (st, 2, job_id, -1, SQLITE_TRANSIENT);
ok = sqlite3_step(st) == SQLITE_DONE;
}
sqlite3_finalize(st);
sqlite3_close(db);
return ok;
}
bool jobs_delete(const char* job_id) {
if (!g_state || !job_id) return false;
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"DELETE FROM jobs WHERE id=? AND status IN ('done','error','cancelled')";
bool ok = false;
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text(st, 1, job_id, -1, SQLITE_TRANSIENT);
ok = sqlite3_step(st) == SQLITE_DONE;
}
sqlite3_finalize(st);
sqlite3_close(db);
return ok;
}
bool jobs_list(std::vector<JobRow>* out, int limit) {
if (!g_state || !out) return false;
out->clear();
if (limit < 1) limit = 1;
if (limit > 1000) limit = 1000;
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"SELECT id, enricher_id, COALESCE(node_id,''), node_name, status, "
"progress, stage, COALESCE(error,''), COALESCE(result_json,''), "
"created_at, COALESCE(started_at,0), COALESCE(finished_at,0) "
"FROM jobs ORDER BY created_at DESC LIMIT ?";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_int(st, 1, limit);
while (sqlite3_step(st) == SQLITE_ROW) {
JobRow r;
auto col = [&](int i) {
const unsigned char* t = sqlite3_column_text(st, i);
return std::string(t ? (const char*)t : "");
};
r.id = col(0);
r.enricher_id = col(1);
r.node_id = col(2);
r.node_name = col(3);
r.status = col(4);
r.progress = sqlite3_column_double(st, 5);
r.stage = col(6);
r.error = col(7);
r.result_json = col(8);
r.created_at = sqlite3_column_int64(st, 9);
r.started_at = sqlite3_column_int64(st, 10);
r.finished_at = sqlite3_column_int64(st, 11);
out->push_back(std::move(r));
}
}
sqlite3_finalize(st);
sqlite3_close(db);
return true;
}
// Counts jobs per status (queued, running, done, error, cancelled). Returns
// value-initialised counters when the job system is not initialised or the
// database cannot be read; rows with any other status are ignored.
JobCounters jobs_counters() {
    JobCounters totals{};
    if (g_state == nullptr) {
        return totals;
    }

    sqlite3* conn = nullptr;
    const int opened = sqlite3_open_v2(g_state->app_db_path.c_str(), &conn,
                                       SQLITE_OPEN_READONLY, nullptr);
    if (opened != SQLITE_OK) {
        if (conn != nullptr) {
            sqlite3_close(conn);
        }
        return totals;
    }

    static const char kQuery[] =
        "SELECT status, COUNT(*) FROM jobs GROUP BY status";
    sqlite3_stmt* stmt = nullptr;
    if (sqlite3_prepare_v2(conn, kQuery, -1, &stmt, nullptr) == SQLITE_OK) {
        while (sqlite3_step(stmt) == SQLITE_ROW) {
            const unsigned char* raw = sqlite3_column_text(stmt, 0);
            const int count = sqlite3_column_int(stmt, 1);
            if (raw == nullptr) {
                continue;
            }
            const std::string status(reinterpret_cast<const char*>(raw));
            if (status == "queued") {
                totals.queued = count;
            } else if (status == "running") {
                totals.running = count;
            } else if (status == "done") {
                totals.done = count;
            } else if (status == "error") {
                totals.error = count;
            } else if (status == "cancelled") {
                totals.cancelled = count;
            }
        }
    }
    sqlite3_finalize(stmt);
    sqlite3_close(conn);
    return totals;
}
int jobs_dirty_counter() {
if (!g_state) return 0;
return g_state->dirty.load(std::memory_order_relaxed);
}
void jobs_shutdown() {
if (!g_state) return;
g_state->stop_flag.store(true);
// Cancelar todos los running.
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
for (auto& kv : g_state->running) {
kv.second->cancel_requested.store(true);
if (kv.second->pid > 0) kill(kv.second->pid, SIGTERM);
}
}
g_state->q_cv.notify_all();
for (auto& t : g_state->workers) {
if (t.joinable()) t.join();
}
delete g_state;
g_state = nullptr;
}
} // namespace ge
#endif // _WIN32