From 6df04652d88c8daa006c4eb6e5c06d737dc21b49 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 1 May 2026 18:24:37 +0200 Subject: [PATCH] feat(jobs): sistema de jobs asincronos + panel UI (issue 0026) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Infra para correr enrichers en background mientras la app sigue interactiva. C++: - jobs.{h,cpp}: tabla jobs en graph_explorer.db, JobRunner con N=2 std::thread workers, fork+exec POSIX con pipes, parser de PROGRESS: en stderr, captura de stdout JSON, persistencia + dirty_counter. - enrichers.{h,cpp}: scanner de enrichers//manifest.yaml, parser YAML minimo (id/name/description/applies_to), filtro por tipo de nodo. - views_jobs.cpp: panel "Jobs" dockeable con tabla (status/enricher/target/ progress/time), filtro all/active/done/errors, cancelar/borrar inline. Wiring: - main.cpp: resolve_registry_root() (FN_REGISTRY_ROOT env o subir desde cwd buscando registry.db), jobs_init/enrichers_load antes de fn::run_app, jobs_shutdown al cerrar, dirty_counter -> want_reload, jobs_set_ops_db al cambiar de proyecto. - main.cpp:render_context_menu: menu "Run enricher" sustituye placeholder con submenu filtrado por type_ref via enrichers_for_type. Submit abre panel Jobs auto. - views.h: AppState::panel_jobs flag + decl views_jobs(). - CMakeLists.txt: anade jobs.cpp + enrichers.cpp + views_jobs.cpp y enlaza Threads::Threads. Wire protocol enricher (subprocess Python): - stdin: JSON con node_id, metadata, ops_db_path, app_dir, cache_dir, registry_root, params. - stderr: PROGRESS: + LOG lineas libres. - stdout: JSON resumen al final. - exit 0 = ok, !=0 = error con stderr capturado en panel Jobs. El run.py escribe directamente al operations.db (sqlite3 stdlib) — C++ solo orquesta, no parsea entities/relations. Co-Authored-By: Claude Opus 4.7 (1M context) --- CMakeLists.txt | 7 + enrichers.cpp | 172 ++++++++++ enrichers.h | 40 +++ jobs.cpp | 862 +++++++++++++++++++++++++++++++++++++++++++++++++ jobs.h | 97 ++++++ main.cpp | 106 +++++- views.h | 9 + views_jobs.cpp | 199 ++++++++++++ 8 files changed, 1491 insertions(+), 1 deletion(-) create mode 100644 enrichers.cpp create mode 100644 enrichers.h create mode 100644 jobs.cpp create mode 100644 jobs.h create mode 100644 views_jobs.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9be7863..a3f1dfd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,11 +18,14 @@ add_imgui_app(graph_explorer main.cpp data.cpp views.cpp + views_jobs.cpp types_registry.cpp layout_store.cpp entity_ops.cpp project_manager.cpp tableview.cpp + jobs.cpp + enrichers.cpp # --- viz --- ${FN_CPP_ROOT_DIR}/functions/viz/graph_renderer.cpp ${FN_CPP_ROOT_DIR}/functions/viz/graph_force_layout.cpp @@ -58,6 +61,10 @@ target_include_directories(graph_explorer PRIVATE target_link_libraries(graph_explorer PRIVATE SQLite::SQLite3 DuckDB::DuckDB) duckdb_copy_runtime(graph_explorer) +# Threads — issue 0026 (jobs system) usa std::thread + std::mutex + condvar. +find_package(Threads REQUIRED) +target_link_libraries(graph_explorer PRIVATE Threads::Threads) + # OpenGL: graph_renderer + graph_force_layout_gpu llaman gl* directamente. # fn::run_app inicializa el loader cuando AppConfig::init_gl_loader = true. if(NOT WIN32) diff --git a/enrichers.cpp b/enrichers.cpp new file mode 100644 index 0000000..e55d9a8 --- /dev/null +++ b/enrichers.cpp @@ -0,0 +1,172 @@ +#include "enrichers.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ge { + +namespace { + +std::vector g_enrichers; + +std::string strip(const std::string& s) { + size_t a = 0, b = s.size(); + while (a < b && std::isspace((unsigned char)s[a])) ++a; + while (b > a && std::isspace((unsigned char)s[b - 1])) --b; + return s.substr(a, b - a); +} + +std::string strip_quotes(const std::string& s) { + if (s.size() >= 2) { + if ((s.front() == '"' && s.back() == '"') || + (s.front() == '\'' && s.back() == '\'')) { + return s.substr(1, s.size() - 2); + } + } + return s; +} + +std::string lower(std::string s) { + for (auto& c : s) c = (char)std::tolower((unsigned char)c); + return s; +} + +// Parsea una lista inline `[a, b, c]` o "[Webpage, Url]". Tolerante a +// espacios y a comillas simples/dobles dentro. NO soporta listas +// multi-linea — el manifest las usa siempre inline. +std::vector parse_inline_list(const std::string& v) { + std::vector out; + std::string s = strip(v); + if (s.size() < 2 || s.front() != '[' || s.back() != ']') return out; + s = s.substr(1, s.size() - 2); + std::string token; + auto flush = [&]() { + std::string t = strip_quotes(strip(token)); + if (!t.empty()) out.push_back(std::move(t)); + token.clear(); + }; + for (char c : s) { + if (c == ',') flush(); + else token.push_back(c); + } + flush(); + return out; +} + +// Manifest YAML soportado (subset): +// id: fetch_webpage +// name: "Fetch web page" +// description: "..." +// applies_to: [Webpage, Url] +// params: <- v1 ignora bloque +// - { name: timeout_s, ... } +// +// Las claves anidadas bajo `params:` se ignoran (saltamos lineas indentadas). +bool parse_manifest(const std::string& path, EnricherSpec* out) { + std::ifstream f(path); + if (!f) return false; + std::string line; + bool in_skip_block = false; + while (std::getline(f, line)) { + // Strip CR de Windows. + if (!line.empty() && line.back() == '\r') line.pop_back(); + + // Linea blanca o comentario. + std::string trim = strip(line); + if (trim.empty() || trim.front() == '#') continue; + + // Si la linea NO empieza con whitespace, salimos del bloque skip. + bool indented = !line.empty() && std::isspace((unsigned char)line.front()); + if (!indented) in_skip_block = false; + if (in_skip_block) continue; + + size_t colon = trim.find(':'); + if (colon == std::string::npos) continue; + + std::string key = strip(trim.substr(0, colon)); + std::string val = strip(trim.substr(colon + 1)); + + if (key == "id") out->id = strip_quotes(val); + else if (key == "name") out->name = strip_quotes(val); + else if (key == "description") out->description = strip_quotes(val); + else if (key == "applies_to") out->applies_to = parse_inline_list(val); + else if (key == "params" && val.empty()) in_skip_block = true; + // emits/relations los ignoramos en v1 (solo informativos). + } + return !out->id.empty(); +} + +} // namespace + +int enrichers_load(const char* enrichers_dir) { + g_enrichers.clear(); + if (!enrichers_dir || !*enrichers_dir) return -1; + + DIR* d = opendir(enrichers_dir); + if (!d) return -1; + + struct dirent* ent; + while ((ent = readdir(d)) != nullptr) { + if (ent->d_name[0] == '.') continue; + + std::string sub = std::string(enrichers_dir) + "/" + ent->d_name; + struct stat st{}; + if (stat(sub.c_str(), &st) != 0 || !S_ISDIR(st.st_mode)) continue; + + std::string manifest = sub + "/manifest.yaml"; + std::string runpy = sub + "/run.py"; + if (stat(manifest.c_str(), &st) != 0) continue; + if (stat(runpy.c_str(), &st) != 0) continue; + + EnricherSpec spec; + if (!parse_manifest(manifest, &spec)) { + std::fprintf(stderr, "[enrichers] parse failed: %s\n", manifest.c_str()); + continue; + } + spec.run_path = runpy; + g_enrichers.push_back(std::move(spec)); + } + closedir(d); + + std::sort(g_enrichers.begin(), g_enrichers.end(), + [](const EnricherSpec& a, const EnricherSpec& b) { + return a.name < b.name; + }); + return (int)g_enrichers.size(); +} + +const std::vector& enrichers_all() { + return g_enrichers; +} + +std::vector enrichers_for_type(const char* type_ref) { + std::vector out; + if (!type_ref || !*type_ref) return out; + std::string want = lower(type_ref); + for (const auto& e : g_enrichers) { + if (e.applies_to.empty()) { + out.push_back(e); + continue; + } + for (const auto& t : e.applies_to) { + if (lower(t) == want) { out.push_back(e); break; } + } + } + return out; +} + +const EnricherSpec* enricher_by_id(const char* id) { + if (!id || !*id) return nullptr; + for (const auto& e : g_enrichers) { + if (e.id == id) return &e; + } + return nullptr; +} + +} // namespace ge diff --git a/enrichers.h b/enrichers.h new file mode 100644 index 0000000..f2fcfeb --- /dev/null +++ b/enrichers.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include + +// Registro estatico de enrichers (issue 0026). +// +// Al arrancar la app se escanea `/enrichers/*/manifest.yaml` y se +// rellena el registro. El context menu del viewport consulta +// `enrichers_for_type(type_ref)` para mostrar el submenu filtrado por tipo +// del nodo right-clickado. +// +// Para v1 no parseamos `params` con detalle — solo lo necesario para +// presentar el item de menu y submitear el job con `{}`. + +namespace ge { + +struct EnricherSpec { + std::string id; // ej: "fetch_webpage" + std::string name; // ej: "Fetch web page" + std::string description; + std::vector applies_to; // tipos validos (case-insensitive) + std::string run_path; // path absoluto a run.py +}; + +// Escanea el directorio. Reentrante (limpia el registro anterior). Devuelve +// el numero de enrichers cargados, -1 si el dir no existe. +int enrichers_load(const char* enrichers_dir); + +// Lista todos los enrichers cargados. +const std::vector& enrichers_all(); + +// Filtra por tipo. Comparacion case-insensitive. Si applies_to es vacio en el +// manifest, el enricher se considera global (aplica a cualquier tipo). +std::vector enrichers_for_type(const char* type_ref); + +// Resuelve un enricher por id. Devuelve nullptr si no existe. +const EnricherSpec* enricher_by_id(const char* id); + +} // namespace ge diff --git a/jobs.cpp b/jobs.cpp new file mode 100644 index 0000000..e7e89c0 --- /dev/null +++ b/jobs.cpp @@ -0,0 +1,862 @@ +#include "jobs.h" + +#include "../../../../cpp/vendor/sqlite3/sqlite3.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ge { + +// ---------------------------------------------------------------------------- +// Internal state +// ---------------------------------------------------------------------------- + +namespace { + +struct JobControl { + pid_t pid = -1; + std::atomic cancel_requested{false}; +}; + +struct State { + std::string app_db_path; + std::string ops_db_path; // mutable: cambia con jobs_set_ops_db + std::string enrichers_dir; + std::string app_dir; + std::string registry_root; + + std::mutex q_mu; + std::condition_variable q_cv; + std::queue pending; // job ids + std::unordered_map> running; + + std::vector workers; + std::atomic stop_flag{false}; + std::atomic dirty{0}; +}; + +State* g_state = nullptr; + +// ---- helpers -------------------------------------------------------------- + +long long now_ms() { + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()).count(); +} + +std::string ulid() { + // ULID-ish: timestamp ms + 10 random hex chars. Suficiente para un PK. + long long ts = now_ms(); + static std::atomic ctr{(uint32_t)(ts & 0xFFFFFFFF)}; + uint32_t rnd = ctr.fetch_add(1, std::memory_order_relaxed); + char buf[64]; + std::snprintf(buf, sizeof(buf), "j_%013lld_%08x", ts, rnd); + return buf; +} + +bool sql_exec_simple(sqlite3* db, const char* sql) { + char* err = nullptr; + int rc = sqlite3_exec(db, sql, nullptr, nullptr, &err); + if (rc != SQLITE_OK) { + std::fprintf(stderr, "[jobs] sql error: %s\n sql: %s\n", + err ? err : "?", sql); + if (err) sqlite3_free(err); + return false; + } + return true; +} + +bool sql_run(sqlite3* db, const char* sql, + const std::vector& params) +{ + sqlite3_stmt* st = nullptr; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) { + std::fprintf(stderr, "[jobs] prepare failed: %s :: %s\n", + sqlite3_errmsg(db), sql); + return false; + } + for (size_t i = 0; i < params.size(); ++i) { + sqlite3_bind_text(st, (int)(i + 1), params[i].c_str(), -1, + SQLITE_TRANSIENT); + } + int rc = sqlite3_step(st); + sqlite3_finalize(st); + return rc == SQLITE_DONE; +} + +bool ensure_table(const char* db_path) { + sqlite3* db = nullptr; + if (sqlite3_open_v2(db_path, &db, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, + nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sql_exec_simple(db, "PRAGMA journal_mode=WAL;"); + bool ok = sql_exec_simple(db, + "CREATE TABLE IF NOT EXISTS jobs (" + " id TEXT PRIMARY KEY," + " enricher_id TEXT NOT NULL," + " node_id TEXT," + " node_name TEXT NOT NULL DEFAULT ''," + " params_json TEXT NOT NULL DEFAULT '{}'," + " status TEXT NOT NULL," + " progress REAL NOT NULL DEFAULT 0," + " stage TEXT NOT NULL DEFAULT ''," + " result_json TEXT," + " error TEXT," + " pid INTEGER," + " created_at INTEGER NOT NULL," + " started_at INTEGER," + " finished_at INTEGER" + ");" + ); + sql_exec_simple(db, + "CREATE INDEX IF NOT EXISTS idx_jobs_status " + "ON jobs(status, created_at);"); + + // Reaper: jobs `running` huerfanos de una sesion anterior. + char ts[32]; + std::snprintf(ts, sizeof(ts), "%lld", now_ms()); + sql_run(db, + "UPDATE jobs SET status='error', error='process died (app restart)', " + "finished_at=? WHERE status='running'", + {ts}); + + sqlite3_close(db); + return ok; +} + +// Escapa string para JSON. Simplificado: maneja comillas, backslash y +// caracteres de control basicos. +std::string json_escape(const std::string& s) { + std::string out; + out.reserve(s.size() + 8); + for (char c : s) { + switch (c) { + case '"': out += "\\\""; break; + case '\\': out += "\\\\"; break; + case '\b': out += "\\b"; break; + case '\f': out += "\\f"; break; + case '\n': out += "\\n"; break; + case '\r': out += "\\r"; break; + case '\t': out += "\\t"; break; + default: + if ((unsigned char)c < 0x20) { + char buf[8]; + std::snprintf(buf, sizeof(buf), "\\u%04x", (unsigned char)c); + out += buf; + } else { + out += c; + } + } + } + return out; +} + +// Lee un campo de la entidad como string. Devuelve "" si no existe. +std::string read_entity_field(const char* db_path, const char* id, + const char* col) +{ + sqlite3* db = nullptr; + if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, nullptr) + != SQLITE_OK) { + if (db) sqlite3_close(db); + return ""; + } + std::string sql = std::string("SELECT ") + col + + " FROM entities WHERE id = ? LIMIT 1"; + sqlite3_stmt* st = nullptr; + std::string out; + if (sqlite3_prepare_v2(db, sql.c_str(), -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text(st, 1, id, -1, SQLITE_TRANSIENT); + if (sqlite3_step(st) == SQLITE_ROW) { + const unsigned char* t = sqlite3_column_text(st, 0); + if (t) out = (const char*)t; + } + } + sqlite3_finalize(st); + sqlite3_close(db); + return out; +} + +// Construye el JSON que se entrega al subprocess via stdin. Lee node de la +// operations.db actual. +std::string build_stdin_json(const std::string& job_id, + const std::string& enricher_id, + const std::string& node_id, + const std::string& params_json, + const std::string& ops_db, + const std::string& app_dir, + const std::string& registry_root) +{ + std::string node_type, node_name, node_metadata = "{}"; + if (!node_id.empty()) { + node_type = read_entity_field(ops_db.c_str(), node_id.c_str(), "type_ref"); + node_name = read_entity_field(ops_db.c_str(), node_id.c_str(), "name"); + std::string m = read_entity_field(ops_db.c_str(), node_id.c_str(), "metadata"); + if (!m.empty()) node_metadata = m; + } + + std::string cache_dir = app_dir + "/cache"; + + std::ostringstream o; + o << '{' + << "\"job_id\":\"" << json_escape(job_id) << "\"," + << "\"enricher_id\":\""<< json_escape(enricher_id) << "\"," + << "\"node_id\":\"" << json_escape(node_id) << "\"," + << "\"node_type\":\"" << json_escape(node_type) << "\"," + << "\"node_name\":\"" << json_escape(node_name) << "\"," + << "\"metadata\":" << (node_metadata.empty() ? "{}" : node_metadata) << "," + << "\"params\":" << (params_json.empty() ? "{}" : params_json) << "," + << "\"ops_db_path\":\""<< json_escape(ops_db) << "\"," + << "\"app_dir\":\"" << json_escape(app_dir) << "\"," + << "\"cache_dir\":\"" << json_escape(cache_dir) << "\"," + << "\"registry_root\":\"" << json_escape(registry_root) << "\"" + << '}'; + return o.str(); +} + +// ---- subprocess (POSIX) --------------------------------------------------- + +struct ProcResult { + int exit_code = -1; + bool signaled = false; + int signal = 0; + std::string stdout_buf; + std::string stderr_tail; // ultimas lineas, para mensajes de error +}; + +void update_progress(const std::string& job_id, double prog, + const std::string& stage) +{ + if (!g_state) return; + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return; + } + sqlite3_stmt* st = nullptr; + const char* sql = "UPDATE jobs SET progress=?, stage=? WHERE id=?"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_double(st, 1, prog); + sqlite3_bind_text (st, 2, stage.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_step(st); + } + sqlite3_finalize(st); + sqlite3_close(db); +} + +// Spawnea python3 run.py. Pipes para stdin (write), stdout (read), +// stderr (read). Lee stdout entero al final; lee stderr line-by-line en un +// thread auxiliar parseando "PROGRESS: ". +ProcResult run_subprocess(const std::string& job_id, + const std::string& run_path, + const std::string& stdin_payload, + std::shared_ptr ctrl) +{ + ProcResult out; + + int p_in[2] = {-1, -1}; // padre escribe en p_in[1], hijo lee p_in[0] + int p_out[2] = {-1, -1}; + int p_err[2] = {-1, -1}; + if (pipe(p_in) != 0 || pipe(p_out) != 0 || pipe(p_err) != 0) { + out.stderr_tail = "pipe() failed"; + return out; + } + + pid_t pid = fork(); + if (pid < 0) { + out.stderr_tail = "fork() failed"; + for (int fd : {p_in[0], p_in[1], p_out[0], p_out[1], p_err[0], p_err[1]}) { + if (fd >= 0) close(fd); + } + return out; + } + + if (pid == 0) { + // child + dup2(p_in[0], 0); + dup2(p_out[1], 1); + dup2(p_err[1], 2); + close(p_in[0]); close(p_in[1]); + close(p_out[0]); close(p_out[1]); + close(p_err[0]); close(p_err[1]); + + // Resolver intérprete: /python/.venv/bin/python3 + std::string py = g_state->registry_root + "/python/.venv/bin/python3"; + const char* argv[] = { py.c_str(), run_path.c_str(), nullptr }; + execv(py.c_str(), (char* const*)argv); + std::fprintf(stderr, "execv failed: %s\n", py.c_str()); + _exit(127); + } + + // parent + ctrl->pid = pid; + close(p_in[0]); + close(p_out[1]); + close(p_err[1]); + + // Persistir pid en BD para mostrarlo en UI. + { + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) { + sqlite3_stmt* st = nullptr; + if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? WHERE id=?", -1, + &st, nullptr) == SQLITE_OK) { + sqlite3_bind_int (st, 1, (int)pid); + sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_step(st); + } + sqlite3_finalize(st); + sqlite3_close(db); + } + } + + // Escribir stdin entero. + if (!stdin_payload.empty()) { + ssize_t written = 0; + const char* p = stdin_payload.c_str(); + size_t left = stdin_payload.size(); + while (left > 0) { + ssize_t n = write(p_in[1], p + written, left); + if (n < 0) { if (errno == EINTR) continue; break; } + written += n; left -= (size_t)n; + } + } + close(p_in[1]); + + // Thread aux para stderr: parsea PROGRESS y guarda tail. + std::string stderr_tail_local; + std::mutex tail_mu; + std::thread err_t([&]() { + std::string line; + char ch; + while (true) { + ssize_t n = read(p_err[0], &ch, 1); + if (n <= 0) break; + if (ch == '\n') { + // Parse line. + if (line.rfind("PROGRESS:", 0) == 0) { + // PROGRESS: + const char* p = line.c_str() + 9; + char* endp = nullptr; + double prog = std::strtod(p, &endp); + std::string stage; + if (endp && *endp) { + while (*endp == ' ') ++endp; + stage = endp; + } + update_progress(job_id, prog, stage); + } else { + std::lock_guard g(tail_mu); + stderr_tail_local += line; + stderr_tail_local += '\n'; + // Cap a ~4 KB. + if (stderr_tail_local.size() > 4096) { + stderr_tail_local.erase(0, stderr_tail_local.size() - 4096); + } + } + line.clear(); + } else { + line.push_back(ch); + if (line.size() > 4096) line.clear(); // proteccion + } + } + }); + + // Leer stdout entero (sincrono). + { + char buf[4096]; + while (true) { + ssize_t n = read(p_out[0], buf, sizeof(buf)); + if (n <= 0) break; + out.stdout_buf.append(buf, (size_t)n); + if (out.stdout_buf.size() > 1024 * 1024) { + // 1 MB cap. + break; + } + } + } + close(p_out[0]); + + // Esperar al hijo. Si se pidio cancelar, mandamos SIGTERM y SIGKILL. + int status = 0; + while (true) { + if (ctrl->cancel_requested.load() && pid > 0) { + kill(pid, SIGTERM); + // pequena gracia, luego SIGKILL si hace falta + for (int i = 0; i < 5; ++i) { + pid_t r = waitpid(pid, &status, WNOHANG); + if (r == pid) goto reaped; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + kill(pid, SIGKILL); + } + pid_t r = waitpid(pid, &status, 0); + if (r == pid) break; + if (r < 0 && errno == EINTR) continue; + break; + } +reaped: + err_t.join(); + close(p_err[0]); + + if (WIFEXITED(status)) { + out.exit_code = WEXITSTATUS(status); + } else if (WIFSIGNALED(status)) { + out.signaled = true; + out.signal = WTERMSIG(status); + out.exit_code = -1; + } + { + std::lock_guard g(tail_mu); + out.stderr_tail = std::move(stderr_tail_local); + } + return out; +} + +// ---- worker --------------------------------------------------------------- + +void persist_status(const std::string& job_id, const std::string& status, + const std::string& result_json, + const std::string& error, + bool set_finished) +{ + if (!g_state) return; + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return; + } + if (set_finished) { + sqlite3_stmt* st = nullptr; + const char* sql = + "UPDATE jobs SET status=?, result_json=?, error=?, finished_at=? " + "WHERE id=?"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 2, result_json.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 3, error.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_int64 (st, 4, now_ms()); + sqlite3_bind_text (st, 5, job_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_step(st); + } + sqlite3_finalize(st); + } else { + sqlite3_stmt* st = nullptr; + const char* sql = "UPDATE jobs SET status=?, started_at=? WHERE id=?"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_int64(st, 2, now_ms()); + sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_step(st); + } + sqlite3_finalize(st); + } + sqlite3_close(db); +} + +// Lee la fila para reconstruir el contexto del job antes de spawn. +struct JobContext { + std::string id, enricher_id, node_id, node_name, params_json, status; +}; +bool load_job(const std::string& id, JobContext* out) { + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "SELECT id, enricher_id, COALESCE(node_id,''), node_name, params_json, status " + "FROM jobs WHERE id=?"; + bool ok = false; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text(st, 1, id.c_str(), -1, SQLITE_TRANSIENT); + if (sqlite3_step(st) == SQLITE_ROW) { + auto col = [&](int i) { + const unsigned char* t = sqlite3_column_text(st, i); + return std::string(t ? (const char*)t : ""); + }; + out->id = col(0); + out->enricher_id = col(1); + out->node_id = col(2); + out->node_name = col(3); + out->params_json = col(4); + out->status = col(5); + ok = true; + } + } + sqlite3_finalize(st); + sqlite3_close(db); + return ok; +} + +void worker_loop() { + while (!g_state->stop_flag.load()) { + std::string job_id; + { + std::unique_lock lk(g_state->q_mu); + g_state->q_cv.wait(lk, [] { + return g_state->stop_flag.load() || !g_state->pending.empty(); + }); + if (g_state->stop_flag.load()) return; + job_id = std::move(g_state->pending.front()); + g_state->pending.pop(); + } + + JobContext ctx; + if (!load_job(job_id, &ctx)) continue; + if (ctx.status == "cancelled") continue; + + // Resolver run.py por convencion: //run.py. + std::string run_path = g_state->enrichers_dir + "/" + ctx.enricher_id + + "/run.py"; + + // Marcar running. + persist_status(job_id, "running", "", "", false); + + auto ctrl = std::make_shared(); + { + std::lock_guard lk(g_state->q_mu); + g_state->running[job_id] = ctrl; + } + + // Construir stdin y ejecutar. + std::string ops_db; + { + std::lock_guard lk(g_state->q_mu); + ops_db = g_state->ops_db_path; + } + std::string stdin_payload = build_stdin_json( + ctx.id, ctx.enricher_id, ctx.node_id, ctx.params_json, + ops_db, g_state->app_dir, g_state->registry_root); + + ProcResult res = run_subprocess(job_id, run_path, stdin_payload, ctrl); + + // Estado final. + std::string final_status, error; + std::string result_json = res.stdout_buf; + // Trim del result_json (saca trailing whitespace). + while (!result_json.empty() && + (result_json.back() == '\n' || result_json.back() == '\r' || + result_json.back() == ' ' || result_json.back() == '\t')) { + result_json.pop_back(); + } + if (ctrl->cancel_requested.load()) { + final_status = "cancelled"; + error = "user cancelled"; + } else if (res.exit_code == 0) { + final_status = "done"; + if (result_json.empty()) result_json = "{}"; + } else { + final_status = "error"; + char buf[64]; + if (res.signaled) { + std::snprintf(buf, sizeof(buf), "signal %d", res.signal); + } else { + std::snprintf(buf, sizeof(buf), "exit %d", res.exit_code); + } + error = std::string(buf); + if (!res.stderr_tail.empty()) { + error += "\n"; + error += res.stderr_tail; + } + } + + persist_status(job_id, final_status, result_json, error, true); + + { + std::lock_guard lk(g_state->q_mu); + g_state->running.erase(job_id); + } + if (final_status == "done") { + g_state->dirty.fetch_add(1, std::memory_order_relaxed); + } + } +} + +} // namespace + +// ---------------------------------------------------------------------------- +// Public API +// ---------------------------------------------------------------------------- + +bool jobs_init(const char* app_db_path, + const char* ops_db_path, + const char* enrichers_dir, + const char* app_dir, + const char* registry_root, + int n_workers) +{ + if (g_state) return true; + if (!app_db_path || !*app_db_path) return false; + if (n_workers < 1) n_workers = 1; + if (n_workers > 8) n_workers = 8; + + if (!ensure_table(app_db_path)) return false; + + g_state = new State(); + g_state->app_db_path = app_db_path; + g_state->ops_db_path = ops_db_path ? ops_db_path : ""; + g_state->enrichers_dir = enrichers_dir ? enrichers_dir : ""; + g_state->app_dir = app_dir ? app_dir : ""; + g_state->registry_root = registry_root ? registry_root : ""; + + // Rehidratacion: jobs queued de sesiones anteriores se reencolan. + { + sqlite3* db = nullptr; + if (sqlite3_open_v2(app_db_path, &db, SQLITE_OPEN_READONLY, + nullptr) == SQLITE_OK) { + sqlite3_stmt* st = nullptr; + const char* sql = + "SELECT id FROM jobs WHERE status='queued' ORDER BY created_at"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + while (sqlite3_step(st) == SQLITE_ROW) { + const unsigned char* t = sqlite3_column_text(st, 0); + if (t) g_state->pending.push((const char*)t); + } + } + sqlite3_finalize(st); + sqlite3_close(db); + } + } + + for (int i = 0; i < n_workers; ++i) { + g_state->workers.emplace_back(worker_loop); + } + return true; +} + +void jobs_set_ops_db(const char* ops_db_path) { + if (!g_state) return; + std::lock_guard lk(g_state->q_mu); + g_state->ops_db_path = ops_db_path ? ops_db_path : ""; +} + +bool jobs_submit(const char* enricher_id, + const char* node_id, + const char* node_name, + const char* params_json, + char* out_id, size_t out_id_n) +{ + if (!g_state || !enricher_id || !*enricher_id) return false; + if (!out_id || out_id_n < 32) return false; + + std::string id = ulid(); + std::snprintf(out_id, out_id_n, "%s", id.c_str()); + + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "INSERT INTO jobs (id, enricher_id, node_id, node_name, params_json, " + "status, progress, stage, created_at) " + "VALUES (?, ?, ?, ?, ?, 'queued', 0, '', ?)"; + bool ok = false; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text (st, 1, id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 2, enricher_id, -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 3, node_id ? node_id : "", -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 4, node_name ? node_name : "", -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 5, params_json ? params_json : "{}", -1, SQLITE_TRANSIENT); + sqlite3_bind_int64(st, 6, now_ms()); + ok = sqlite3_step(st) == SQLITE_DONE; + } + sqlite3_finalize(st); + sqlite3_close(db); + if (!ok) return false; + + { + std::lock_guard lk(g_state->q_mu); + g_state->pending.push(id); + } + g_state->q_cv.notify_one(); + return true; +} + +bool jobs_cancel(const char* job_id) { + if (!g_state || !job_id) return false; + std::shared_ptr ctrl; + { + std::lock_guard lk(g_state->q_mu); + auto it = g_state->running.find(job_id); + if (it != g_state->running.end()) ctrl = it->second; + } + if (ctrl) { + ctrl->cancel_requested.store(true); + if (ctrl->pid > 0) kill(ctrl->pid, SIGTERM); + return true; + } + // No corriendo: marcar cancelled si esta queued. + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "UPDATE jobs SET status='cancelled', finished_at=?, " + "error='cancelled before start' WHERE id=? AND status='queued'"; + bool ok = false; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_int64(st, 1, now_ms()); + sqlite3_bind_text (st, 2, job_id, -1, SQLITE_TRANSIENT); + ok = sqlite3_step(st) == SQLITE_DONE; + } + sqlite3_finalize(st); + sqlite3_close(db); + return ok; +} + +bool jobs_delete(const char* job_id) { + if (!g_state || !job_id) return false; + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "DELETE FROM jobs WHERE id=? AND status IN ('done','error','cancelled')"; + bool ok = false; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text(st, 1, job_id, -1, SQLITE_TRANSIENT); + ok = sqlite3_step(st) == SQLITE_DONE; + } + sqlite3_finalize(st); + sqlite3_close(db); + return ok; +} + +bool jobs_list(std::vector* out, int limit) { + if (!g_state || !out) return false; + out->clear(); + if (limit < 1) limit = 1; + if (limit > 1000) limit = 1000; + + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "SELECT id, enricher_id, COALESCE(node_id,''), node_name, status, " + "progress, stage, COALESCE(error,''), COALESCE(result_json,''), " + "created_at, COALESCE(started_at,0), COALESCE(finished_at,0) " + "FROM jobs ORDER BY created_at DESC LIMIT ?"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_int(st, 1, limit); + while (sqlite3_step(st) == SQLITE_ROW) { + JobRow r; + auto col = [&](int i) { + const unsigned char* t = sqlite3_column_text(st, i); + return std::string(t ? (const char*)t : ""); + }; + r.id = col(0); + r.enricher_id = col(1); + r.node_id = col(2); + r.node_name = col(3); + r.status = col(4); + r.progress = sqlite3_column_double(st, 5); + r.stage = col(6); + r.error = col(7); + r.result_json = col(8); + r.created_at = sqlite3_column_int64(st, 9); + r.started_at = sqlite3_column_int64(st, 10); + r.finished_at = sqlite3_column_int64(st, 11); + out->push_back(std::move(r)); + } + } + sqlite3_finalize(st); + sqlite3_close(db); + return true; +} + +JobCounters jobs_counters() { + JobCounters c{}; + if (!g_state) return c; + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return c; + } + sqlite3_stmt* st = nullptr; + const char* sql = "SELECT status, COUNT(*) FROM jobs GROUP BY status"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + while (sqlite3_step(st) == SQLITE_ROW) { + const unsigned char* s = sqlite3_column_text(st, 0); + int n = sqlite3_column_int(st, 1); + if (!s) continue; + std::string status((const char*)s); + if (status == "queued") c.queued = n; + else if (status == "running") c.running = n; + else if (status == "done") c.done = n; + else if (status == "error") c.error = n; + else if (status == "cancelled") c.cancelled = n; + } + } + sqlite3_finalize(st); + sqlite3_close(db); + return c; +} + +int jobs_dirty_counter() { + if (!g_state) return 0; + return g_state->dirty.load(std::memory_order_relaxed); +} + +void jobs_shutdown() { + if (!g_state) return; + g_state->stop_flag.store(true); + // Cancelar todos los running. + { + std::lock_guard lk(g_state->q_mu); + for (auto& kv : g_state->running) { + kv.second->cancel_requested.store(true); + if (kv.second->pid > 0) kill(kv.second->pid, SIGTERM); + } + } + g_state->q_cv.notify_all(); + for (auto& t : g_state->workers) { + if (t.joinable()) t.join(); + } + delete g_state; + g_state = nullptr; +} + +} // namespace ge diff --git a/jobs.h b/jobs.h new file mode 100644 index 0000000..b51b793 --- /dev/null +++ b/jobs.h @@ -0,0 +1,97 @@ +#pragma once + +#include +#include +#include +#include + +// Sistema de jobs asincronos para enrichers (issue 0026). +// +// Diseno: +// - Tabla `jobs` en graph_explorer.db (NO en operations.db). +// - Pool de N std::thread workers (default 2). +// - Cada job spawnea un subprocess Python `enrichers//run.py` que recibe +// contexto por stdin (JSON), emite progreso por stderr y resultado final +// por stdout (JSON). +// - El worker aplica entities/relations/node_updates al operations.db indicado. +// - dirty_counter se incrementa al completar un job — el render() lee el +// contador y triggerea un reload del grafo cuando cambia. +// +// El estado vivo (running/queued en RAM) se persiste tambien en SQLite para +// que cancelaciones, reinicios y queries de UI vean lo mismo. + +namespace ge { + +struct JobRow { + std::string id; + std::string enricher_id; + std::string node_id; + std::string node_name; + std::string status; // queued|running|done|error|cancelled + double progress = 0.0; + std::string stage; + std::string error; + std::string result_json; + long long created_at = 0; + long long started_at = 0; + long long finished_at = 0; +}; + +// Inicializa el sistema. Crea la tabla `jobs` si no existe, marca como `error` +// los jobs que quedaron `running` de una sesion anterior, lanza los workers. +// `app_db_path` es /graph_explorer.db (jobs). +// `ops_db_path` es la operations.db actualmente cargada (target de mutaciones). +// `enrichers_dir` es /enrichers/. +// `app_dir` se usa como root para resolver `cache/` (issue 0027). +// `registry_root` se pasa al subprocess como FN_REGISTRY_ROOT para que el +// enricher pueda importar funciones del registry. +// Retorna false si alguno de los paths falla. +bool jobs_init(const char* app_db_path, + const char* ops_db_path, + const char* enrichers_dir, + const char* app_dir, + const char* registry_root, + int n_workers); + +// Cambia la operations.db objetivo (cuando el usuario abre otra). Drena cola +// pendiente — los jobs ya queued NO se reapuntan. +void jobs_set_ops_db(const char* ops_db_path); + +// Encola un job. params_json puede ser "{}" o JSON valido. node_id puede ser +// vacio (job global). Devuelve false si la BD falla. out_id (>= 64) recibe el +// ULID generado. +bool jobs_submit(const char* enricher_id, + const char* node_id, + const char* node_name, + const char* params_json, + char* out_id, size_t out_id_n); + +// Senala SIGTERM al subprocess y marca el job como `cancelled`. Si esta +// `queued` lo marca directamente. +bool jobs_cancel(const char* job_id); + +// Borra un job en estado terminal (done/error/cancelled). +bool jobs_delete(const char* job_id); + +// Snapshot ordenado por created_at DESC. +bool jobs_list(std::vector* out, int limit = 200); + +// Counters para el badge en la toolbar. +struct JobCounters { + int queued = 0; + int running = 0; + int done = 0; + int error = 0; + int cancelled = 0; +}; +JobCounters jobs_counters(); + +// Counter monotono que se incrementa cada vez que un job completa con +// cambios en operations.db. El main thread compara contra su ultimo valor +// conocido y, si cambio, llama reload_graph(). +int jobs_dirty_counter(); + +// Cierra el pool y espera a los workers (cancelando jobs en vuelo). +void jobs_shutdown(); + +} // namespace ge diff --git a/main.cpp b/main.cpp index 76faab4..14d759b 100644 --- a/main.cpp +++ b/main.cpp @@ -25,6 +25,8 @@ #include "layout_store.h" #include "entity_ops.h" #include "project_manager.h" +#include "jobs.h" +#include "enrichers.h" #include "../../../../cpp/vendor/sqlite3/sqlite3.h" @@ -39,6 +41,13 @@ #include #include +#ifndef _WIN32 +#include +#else +#include +#define getcwd _getcwd +#endif + // ---------------------------------------------------------------------------- // Estado global de la app // ---------------------------------------------------------------------------- @@ -116,6 +125,32 @@ static void apply_static_layout(int mode) { // Forward decl — definido mas abajo, lo necesita switch_to_project. static bool load_input(); +// ---------------------------------------------------------------------------- +// Registry path resolution (issue 0026) +// ---------------------------------------------------------------------------- + +// Devuelve el path absoluto al root de fn_registry. Estrategia: +// 1) FN_REGISTRY_ROOT env var +// 2) Sube desde getcwd() buscando un dir con `registry.db` +// 3) "" si no se encuentra +static std::string resolve_registry_root() { + if (const char* env = std::getenv("FN_REGISTRY_ROOT")) { + if (env && *env) return env; + } + char cwd[4096]; + if (getcwd(cwd, sizeof(cwd)) == nullptr) return ""; + std::string p = cwd; + for (int i = 0; i < 8; ++i) { + std::string probe = p + "/registry.db"; + FILE* f = std::fopen(probe.c_str(), "rb"); + if (f) { std::fclose(f); return p; } + size_t s = p.find_last_of('/'); + if (s == std::string::npos || s == 0) break; + p = p.substr(0, s); + } + return ""; +} + // ---------------------------------------------------------------------------- // Project lifecycle // ---------------------------------------------------------------------------- @@ -240,6 +275,9 @@ static bool load_input() { ge::entity_index_build(g_input.uri, &g_idx); g_app.input_db_path = g_input.uri ? g_input.uri : ""; + // issue 0026 — apunta el JobRunner a la nueva operations.db. + if (g_input.uri) ge::jobs_set_ops_db(g_input.uri); + // Cargar posiciones guardadas para este graph_hash g_graph_hash = ge::compute_graph_hash(g_input.uri); int restored = ge::layout_store_load(g_graph_hash, g_graph); @@ -483,7 +521,27 @@ static void render_context_menu() { ImGui::Separator(); if (ImGui::BeginMenu("Run enricher")) { - ImGui::TextDisabled("(coming soon — issues 0001/0002/0003)"); + // issue 0026 — listamos enrichers cuyo applies_to incluye este type. + const char* type_name = (n.type_id < (uint16_t)g_graph.type_count) + ? g_graph.types[n.type_id].name : ""; + auto specs = ge::enrichers_for_type(type_name); + if (!sql_id) { + ImGui::TextDisabled("(node has no entity id)"); + } else if (specs.empty()) { + ImGui::TextDisabled("(no enrichers para tipo '%s')", type_name); + } else { + for (const auto& s : specs) { + if (ImGui::MenuItem(s.name.c_str())) { + char job_id[64]; + bool ok = ge::jobs_submit(s.id.c_str(), sql_id, lbl, + "{}", job_id, sizeof(job_id)); + if (ok) g_app.panel_jobs = true; // abrir panel auto + } + if (!s.description.empty() && ImGui::IsItemHovered()) { + ImGui::SetTooltip("%s", s.description.c_str()); + } + } + } ImGui::EndMenu(); } @@ -512,6 +570,7 @@ static fn_ui::PanelToggle g_panels[] = { {"Note", nullptr, &g_app.panel_note}, {"Types", nullptr, &g_app.panel_type_editor}, {"Table", nullptr, &g_app.panel_table}, + {"Jobs", nullptr, &g_app.panel_jobs}, }; static void render() { @@ -596,6 +655,16 @@ static void render() { g_app.apply_layout_tick = 0; } + // issue 0026 — si un job termino con cambios, dispara reload del grafo. + { + static int s_last_dirty = 0; + int d = ge::jobs_dirty_counter(); + if (d != s_last_dirty) { + s_last_dirty = d; + g_app.want_reload = true; + } + } + // Triggers desde la toolbar if (g_app.want_fit) { graph_viewport_fit(g_graph, g_viewport); @@ -1194,6 +1263,12 @@ static void render() { ge::views_table_window(g_app); ge::views_import_dataset_modal(g_app); + // Jobs panel (issue 0026) — flotante, dockeable. + ImGui::SetNextWindowPos (ImVec2(vp->WorkPos.x + W * 0.20f, top + 40.0f), + ImGuiCond_FirstUseEver); + ImGui::SetNextWindowSize(ImVec2(900.0f, 360.0f), ImGuiCond_FirstUseEver); + ge::views_jobs(g_app); + g_first_render = false; } @@ -1418,6 +1493,34 @@ int main(int argc, char** argv) { "cualquier app del registry y permite explorar entidades/relaciones con " "shapes/iconos/layouts/filtros."); + // issue 0026 — sistema de jobs + enrichers. + { + std::string registry_root = resolve_registry_root(); + std::string app_dir = registry_root.empty() + ? "." + : registry_root + "/projects/osint_graph/apps/graph_explorer"; + std::string enrichers_dir = app_dir + "/enrichers"; + + // graph_explorer.db es el mismo SQLite usado por layout_store. + const char* app_db = g_layout_db_path.empty() + ? "graph_explorer.db" : g_layout_db_path.c_str(); + + ge::enrichers_load(enrichers_dir.c_str()); + if (!ge::jobs_init(app_db, + g_input.uri ? g_input.uri : "", + enrichers_dir.c_str(), + app_dir.c_str(), + registry_root.c_str(), + /*n_workers=*/2)) { + std::fprintf(stderr, "[graph_explorer] jobs_init failed (panel disabled)\n"); + } else { + std::fprintf(stdout, + "[graph_explorer] jobs_init OK — enrichers_dir=%s, registry_root=%s, %d enrichers\n", + enrichers_dir.c_str(), registry_root.c_str(), + (int)ge::enrichers_all().size()); + } + } + int rc = fn::run_app( {.title = "graph_explorer", .width = 1600, @@ -1429,6 +1532,7 @@ int main(int argc, char** argv) { render); // Cleanup + ge::jobs_shutdown(); if (g_gpu_ctx) graph_force_layout_gpu_destroy(g_gpu_ctx); if (g_atlas) graph_icons_destroy(g_atlas); graph_viewport_destroy(g_viewport); diff --git a/views.h b/views.h index af42af3..a12ac92 100644 --- a/views.h +++ b/views.h @@ -55,6 +55,7 @@ struct AppState { bool panel_stats = true; bool panel_viewport = true; bool panel_note = false; + bool panel_jobs = false; // issue 0026 bool show_filters_modal = false; bool show_open_modal = false; @@ -353,6 +354,14 @@ void views_type_editor(AppState& app); // te_delete_use_count via consulta a operations.db antes de mostrarlo. bool views_type_editor_delete_modal(AppState& app); +// ---- Jobs panel (issue 0026) --------------------------------------------- + +// Renderiza el panel "Jobs" — tabla con todos los jobs (queued/running/done/ +// error/cancelled). Botones por fila para cancelar / reintentar / borrar. +// Click en target_node centra el viewport sobre ese nodo (futuro). Polling +// cada N frames para no spammear la BD. +void views_jobs(AppState& app); + // ---- Filter helpers (issue 0009) ----------------------------------------- // True si el filtro tiene query no vacia o al menos un tag activo. diff --git a/views_jobs.cpp b/views_jobs.cpp new file mode 100644 index 0000000..36bc679 --- /dev/null +++ b/views_jobs.cpp @@ -0,0 +1,199 @@ +#include "views.h" +#include "jobs.h" + +#include "core/icons_tabler.h" +#include "core/tokens.h" + +#include "imgui.h" + +#include +#include +#include + +namespace ge { + +namespace { + +// Cache de la lista de jobs. Se refresca cada N frames para no abrir SQLite +// en cada frame. ~10 Hz es suficiente para una progress bar fluida. +struct JobsCache { + std::vector rows; + int last_frame_refresh = -100; + int filter_idx = 0; // 0=all 1=active 2=done 3=error + char buf[8] = {}; +}; +JobsCache g_jobs_cache; + +const char* status_icon(const std::string& s) { + if (s == "queued") return TI_HOURGLASS; + if (s == "running") return TI_PLAYER_PLAY; + if (s == "done") return TI_CHECK; + if (s == "error") return TI_ALERT_CIRCLE; + if (s == "cancelled") return TI_X; + return TI_QUESTION_MARK; +} + +ImVec4 status_color(const std::string& s) { + if (s == "running") return ImVec4(0.36f, 0.78f, 1.0f, 1.0f); + if (s == "done") return ImVec4(0.40f, 0.85f, 0.55f, 1.0f); + if (s == "error") return ImVec4(0.95f, 0.45f, 0.45f, 1.0f); + if (s == "cancelled") return ImVec4(0.65f, 0.65f, 0.65f, 1.0f); + if (s == "queued") return ImVec4(0.85f, 0.78f, 0.45f, 1.0f); + return ImVec4(0.7f, 0.7f, 0.7f, 1.0f); +} + +std::string format_duration(long long started, long long finished) { + if (started <= 0) return "—"; + long long end = finished > 0 ? finished + : std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + long long ms = end - started; + if (ms < 0) ms = 0; + char b[32]; + if (ms < 1000) std::snprintf(b, sizeof(b), "%lld ms", ms); + else if (ms < 60'000) std::snprintf(b, sizeof(b), "%.1f s", ms / 1000.0); + else std::snprintf(b, sizeof(b), "%.1f m", ms / 60'000.0); + return b; +} + +bool filter_match(const std::string& status, int idx) { + switch (idx) { + case 0: return true; // all + case 1: return status == "queued" || status == "running"; // active + case 2: return status == "done"; + case 3: return status == "error" || status == "cancelled"; + default: return true; + } +} + +} // namespace + +void views_jobs(AppState& app) { + if (!app.panel_jobs) return; + + if (!ImGui::Begin("Jobs", &app.panel_jobs)) { + ImGui::End(); + return; + } + + // Refresh cache cada ~10 frames (~6 Hz a 60fps). + int frame = ImGui::GetFrameCount(); + if (frame - g_jobs_cache.last_frame_refresh > 10) { + jobs_list(&g_jobs_cache.rows, 200); + g_jobs_cache.last_frame_refresh = frame; + } + + // Header: counters + filtro. + JobCounters c = jobs_counters(); + ImGui::TextColored(status_color("running"), "%s", TI_PLAYER_PLAY); + ImGui::SameLine(); ImGui::Text("%d", c.running); + ImGui::SameLine(0, 16); + ImGui::TextColored(status_color("queued"), "%s", TI_HOURGLASS); + ImGui::SameLine(); ImGui::Text("%d", c.queued); + ImGui::SameLine(0, 16); + ImGui::TextColored(status_color("done"), "%s", TI_CHECK); + ImGui::SameLine(); ImGui::Text("%d", c.done); + ImGui::SameLine(0, 16); + ImGui::TextColored(status_color("error"), "%s", TI_ALERT_CIRCLE); + ImGui::SameLine(); ImGui::Text("%d", c.error + c.cancelled); + + ImGui::SameLine(0, 24); + const char* filter_labels[] = { "All", "Active", "Done", "Errors" }; + ImGui::SetNextItemWidth(100); + ImGui::Combo("##jobs_filter", &g_jobs_cache.filter_idx, + filter_labels, IM_ARRAYSIZE(filter_labels)); + + ImGui::Separator(); + + // Tabla. + ImGuiTableFlags tflags = ImGuiTableFlags_Borders | + ImGuiTableFlags_RowBg | + ImGuiTableFlags_SizingStretchProp | + ImGuiTableFlags_ScrollY; + if (ImGui::BeginTable("jobs_table", 6, tflags, + ImVec2(0, ImGui::GetContentRegionAvail().y - 4))) { + ImGui::TableSetupScrollFreeze(0, 1); + ImGui::TableSetupColumn("Status", ImGuiTableColumnFlags_WidthFixed, 90); + ImGui::TableSetupColumn("Enricher", ImGuiTableColumnFlags_WidthStretch, 1.5f); + ImGui::TableSetupColumn("Target", ImGuiTableColumnFlags_WidthStretch, 2.0f); + ImGui::TableSetupColumn("Progress", ImGuiTableColumnFlags_WidthStretch, 2.0f); + ImGui::TableSetupColumn("Time", ImGuiTableColumnFlags_WidthFixed, 70); + ImGui::TableSetupColumn("##actions",ImGuiTableColumnFlags_WidthFixed, 80); + ImGui::TableHeadersRow(); + + for (const auto& r : g_jobs_cache.rows) { + if (!filter_match(r.status, g_jobs_cache.filter_idx)) continue; + ImGui::PushID(r.id.c_str()); + ImGui::TableNextRow(); + + // Status. + ImGui::TableSetColumnIndex(0); + ImGui::TextColored(status_color(r.status), "%s %s", + status_icon(r.status), r.status.c_str()); + + // Enricher. + ImGui::TableSetColumnIndex(1); + ImGui::TextUnformatted(r.enricher_id.c_str()); + + // Target. + ImGui::TableSetColumnIndex(2); + if (!r.node_name.empty()) { + ImGui::TextUnformatted(r.node_name.c_str()); + } else if (!r.node_id.empty()) { + ImGui::TextDisabled("%s", r.node_id.c_str()); + } else { + ImGui::TextDisabled("(global)"); + } + + // Progress. + ImGui::TableSetColumnIndex(3); + if (r.status == "running" || r.status == "queued") { + ImGui::ProgressBar((float)r.progress, ImVec2(-FLT_MIN, 0), + r.stage.empty() ? nullptr : r.stage.c_str()); + } else if (r.status == "error" && !r.error.empty()) { + ImGui::TextColored(status_color("error"), "%s", + r.error.size() > 64 + ? (r.error.substr(0, 64) + "…").c_str() + : r.error.c_str()); + if (ImGui::IsItemHovered()) { + ImGui::SetTooltip("%s", r.error.c_str()); + } + } else if (r.status == "done" && !r.result_json.empty()) { + ImGui::TextDisabled("%s", + r.result_json.size() > 80 + ? (r.result_json.substr(0, 80) + "…").c_str() + : r.result_json.c_str()); + if (ImGui::IsItemHovered() && r.result_json.size() > 80) { + ImGui::SetTooltip("%s", r.result_json.c_str()); + } + } else { + ImGui::TextDisabled("—"); + } + + // Time. + ImGui::TableSetColumnIndex(4); + ImGui::TextDisabled("%s", + format_duration(r.started_at, r.finished_at).c_str()); + + // Actions. + ImGui::TableSetColumnIndex(5); + if (r.status == "queued" || r.status == "running") { + if (ImGui::SmallButton("Cancel")) { + jobs_cancel(r.id.c_str()); + } + } else { + if (ImGui::SmallButton("Delete")) { + jobs_delete(r.id.c_str()); + } + } + + ImGui::PopID(); + } + ImGui::EndTable(); + } + + ImGui::End(); +} + +} // namespace ge