feat(jobs): sistema de jobs asincronos + panel UI (issue 0026)
Infra para correr enrichers en background mientras la app sigue interactiva.
C++:
- jobs.{h,cpp}: tabla jobs en graph_explorer.db, JobRunner con N=2 std::thread
workers, fork+exec POSIX con pipes, parser de PROGRESS:<float> <stage> en
stderr, captura de stdout JSON, persistencia + dirty_counter.
- enrichers.{h,cpp}: scanner de enrichers/<id>/manifest.yaml, parser YAML
minimo (id/name/description/applies_to), filtro por tipo de nodo.
- views_jobs.cpp: panel "Jobs" dockeable con tabla (status/enricher/target/
progress/time), filtro all/active/done/errors, cancelar/borrar inline.
Wiring:
- main.cpp: resolve_registry_root() (FN_REGISTRY_ROOT env o subir desde cwd
buscando registry.db), jobs_init/enrichers_load antes de fn::run_app,
jobs_shutdown al cerrar, dirty_counter -> want_reload, jobs_set_ops_db al
cambiar de proyecto.
- main.cpp:render_context_menu: menu "Run enricher" sustituye placeholder
con submenu filtrado por type_ref via enrichers_for_type. Submit abre
panel Jobs auto.
- views.h: AppState::panel_jobs flag + decl views_jobs().
- CMakeLists.txt: anade jobs.cpp + enrichers.cpp + views_jobs.cpp y enlaza
Threads::Threads.
Wire protocol enricher (subprocess Python):
- stdin: JSON con node_id, metadata, ops_db_path, app_dir, cache_dir,
registry_root, params.
- stderr: PROGRESS:<float> <stage> + LOG lineas libres.
- stdout: JSON resumen al final.
- exit 0 = ok, !=0 = error con stderr capturado en panel Jobs.
El run.py escribe directamente al operations.db (sqlite3 stdlib) — C++ solo
orquesta, no parsea entities/relations.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -18,11 +18,14 @@ add_imgui_app(graph_explorer
|
|||||||
main.cpp
|
main.cpp
|
||||||
data.cpp
|
data.cpp
|
||||||
views.cpp
|
views.cpp
|
||||||
|
views_jobs.cpp
|
||||||
types_registry.cpp
|
types_registry.cpp
|
||||||
layout_store.cpp
|
layout_store.cpp
|
||||||
entity_ops.cpp
|
entity_ops.cpp
|
||||||
project_manager.cpp
|
project_manager.cpp
|
||||||
tableview.cpp
|
tableview.cpp
|
||||||
|
jobs.cpp
|
||||||
|
enrichers.cpp
|
||||||
# --- viz ---
|
# --- viz ---
|
||||||
${FN_CPP_ROOT_DIR}/functions/viz/graph_renderer.cpp
|
${FN_CPP_ROOT_DIR}/functions/viz/graph_renderer.cpp
|
||||||
${FN_CPP_ROOT_DIR}/functions/viz/graph_force_layout.cpp
|
${FN_CPP_ROOT_DIR}/functions/viz/graph_force_layout.cpp
|
||||||
@@ -58,6 +61,10 @@ target_include_directories(graph_explorer PRIVATE
|
|||||||
target_link_libraries(graph_explorer PRIVATE SQLite::SQLite3 DuckDB::DuckDB)
|
target_link_libraries(graph_explorer PRIVATE SQLite::SQLite3 DuckDB::DuckDB)
|
||||||
duckdb_copy_runtime(graph_explorer)
|
duckdb_copy_runtime(graph_explorer)
|
||||||
|
|
||||||
|
# Threads — issue 0026 (jobs system) usa std::thread + std::mutex + condvar.
|
||||||
|
find_package(Threads REQUIRED)
|
||||||
|
target_link_libraries(graph_explorer PRIVATE Threads::Threads)
|
||||||
|
|
||||||
# OpenGL: graph_renderer + graph_force_layout_gpu llaman gl* directamente.
|
# OpenGL: graph_renderer + graph_force_layout_gpu llaman gl* directamente.
|
||||||
# fn::run_app inicializa el loader cuando AppConfig::init_gl_loader = true.
|
# fn::run_app inicializa el loader cuando AppConfig::init_gl_loader = true.
|
||||||
if(NOT WIN32)
|
if(NOT WIN32)
|
||||||
|
|||||||
+172
@@ -0,0 +1,172 @@
|
|||||||
|
#include "enrichers.h"
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <cctype>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstring>
|
||||||
|
#include <dirent.h>
|
||||||
|
#include <fstream>
|
||||||
|
#include <sstream>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
|
||||||
|
namespace ge {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
std::vector<EnricherSpec> g_enrichers;
|
||||||
|
|
||||||
|
std::string strip(const std::string& s) {
|
||||||
|
size_t a = 0, b = s.size();
|
||||||
|
while (a < b && std::isspace((unsigned char)s[a])) ++a;
|
||||||
|
while (b > a && std::isspace((unsigned char)s[b - 1])) --b;
|
||||||
|
return s.substr(a, b - a);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string strip_quotes(const std::string& s) {
|
||||||
|
if (s.size() >= 2) {
|
||||||
|
if ((s.front() == '"' && s.back() == '"') ||
|
||||||
|
(s.front() == '\'' && s.back() == '\'')) {
|
||||||
|
return s.substr(1, s.size() - 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string lower(std::string s) {
|
||||||
|
for (auto& c : s) c = (char)std::tolower((unsigned char)c);
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parsea una lista inline `[a, b, c]` o "[Webpage, Url]". Tolerante a
|
||||||
|
// espacios y a comillas simples/dobles dentro. NO soporta listas
|
||||||
|
// multi-linea — el manifest las usa siempre inline.
|
||||||
|
std::vector<std::string> parse_inline_list(const std::string& v) {
|
||||||
|
std::vector<std::string> out;
|
||||||
|
std::string s = strip(v);
|
||||||
|
if (s.size() < 2 || s.front() != '[' || s.back() != ']') return out;
|
||||||
|
s = s.substr(1, s.size() - 2);
|
||||||
|
std::string token;
|
||||||
|
auto flush = [&]() {
|
||||||
|
std::string t = strip_quotes(strip(token));
|
||||||
|
if (!t.empty()) out.push_back(std::move(t));
|
||||||
|
token.clear();
|
||||||
|
};
|
||||||
|
for (char c : s) {
|
||||||
|
if (c == ',') flush();
|
||||||
|
else token.push_back(c);
|
||||||
|
}
|
||||||
|
flush();
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manifest YAML soportado (subset):
|
||||||
|
// id: fetch_webpage
|
||||||
|
// name: "Fetch web page"
|
||||||
|
// description: "..."
|
||||||
|
// applies_to: [Webpage, Url]
|
||||||
|
// params: <- v1 ignora bloque
|
||||||
|
// - { name: timeout_s, ... }
|
||||||
|
//
|
||||||
|
// Las claves anidadas bajo `params:` se ignoran (saltamos lineas indentadas).
|
||||||
|
bool parse_manifest(const std::string& path, EnricherSpec* out) {
|
||||||
|
std::ifstream f(path);
|
||||||
|
if (!f) return false;
|
||||||
|
std::string line;
|
||||||
|
bool in_skip_block = false;
|
||||||
|
while (std::getline(f, line)) {
|
||||||
|
// Strip CR de Windows.
|
||||||
|
if (!line.empty() && line.back() == '\r') line.pop_back();
|
||||||
|
|
||||||
|
// Linea blanca o comentario.
|
||||||
|
std::string trim = strip(line);
|
||||||
|
if (trim.empty() || trim.front() == '#') continue;
|
||||||
|
|
||||||
|
// Si la linea NO empieza con whitespace, salimos del bloque skip.
|
||||||
|
bool indented = !line.empty() && std::isspace((unsigned char)line.front());
|
||||||
|
if (!indented) in_skip_block = false;
|
||||||
|
if (in_skip_block) continue;
|
||||||
|
|
||||||
|
size_t colon = trim.find(':');
|
||||||
|
if (colon == std::string::npos) continue;
|
||||||
|
|
||||||
|
std::string key = strip(trim.substr(0, colon));
|
||||||
|
std::string val = strip(trim.substr(colon + 1));
|
||||||
|
|
||||||
|
if (key == "id") out->id = strip_quotes(val);
|
||||||
|
else if (key == "name") out->name = strip_quotes(val);
|
||||||
|
else if (key == "description") out->description = strip_quotes(val);
|
||||||
|
else if (key == "applies_to") out->applies_to = parse_inline_list(val);
|
||||||
|
else if (key == "params" && val.empty()) in_skip_block = true;
|
||||||
|
// emits/relations los ignoramos en v1 (solo informativos).
|
||||||
|
}
|
||||||
|
return !out->id.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
int enrichers_load(const char* enrichers_dir) {
|
||||||
|
g_enrichers.clear();
|
||||||
|
if (!enrichers_dir || !*enrichers_dir) return -1;
|
||||||
|
|
||||||
|
DIR* d = opendir(enrichers_dir);
|
||||||
|
if (!d) return -1;
|
||||||
|
|
||||||
|
struct dirent* ent;
|
||||||
|
while ((ent = readdir(d)) != nullptr) {
|
||||||
|
if (ent->d_name[0] == '.') continue;
|
||||||
|
|
||||||
|
std::string sub = std::string(enrichers_dir) + "/" + ent->d_name;
|
||||||
|
struct stat st{};
|
||||||
|
if (stat(sub.c_str(), &st) != 0 || !S_ISDIR(st.st_mode)) continue;
|
||||||
|
|
||||||
|
std::string manifest = sub + "/manifest.yaml";
|
||||||
|
std::string runpy = sub + "/run.py";
|
||||||
|
if (stat(manifest.c_str(), &st) != 0) continue;
|
||||||
|
if (stat(runpy.c_str(), &st) != 0) continue;
|
||||||
|
|
||||||
|
EnricherSpec spec;
|
||||||
|
if (!parse_manifest(manifest, &spec)) {
|
||||||
|
std::fprintf(stderr, "[enrichers] parse failed: %s\n", manifest.c_str());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
spec.run_path = runpy;
|
||||||
|
g_enrichers.push_back(std::move(spec));
|
||||||
|
}
|
||||||
|
closedir(d);
|
||||||
|
|
||||||
|
std::sort(g_enrichers.begin(), g_enrichers.end(),
|
||||||
|
[](const EnricherSpec& a, const EnricherSpec& b) {
|
||||||
|
return a.name < b.name;
|
||||||
|
});
|
||||||
|
return (int)g_enrichers.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::vector<EnricherSpec>& enrichers_all() {
|
||||||
|
return g_enrichers;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<EnricherSpec> enrichers_for_type(const char* type_ref) {
|
||||||
|
std::vector<EnricherSpec> out;
|
||||||
|
if (!type_ref || !*type_ref) return out;
|
||||||
|
std::string want = lower(type_ref);
|
||||||
|
for (const auto& e : g_enrichers) {
|
||||||
|
if (e.applies_to.empty()) {
|
||||||
|
out.push_back(e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (const auto& t : e.applies_to) {
|
||||||
|
if (lower(t) == want) { out.push_back(e); break; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
const EnricherSpec* enricher_by_id(const char* id) {
|
||||||
|
if (!id || !*id) return nullptr;
|
||||||
|
for (const auto& e : g_enrichers) {
|
||||||
|
if (e.id == id) return &e;
|
||||||
|
}
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ge
|
||||||
+40
@@ -0,0 +1,40 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
// Registro estatico de enrichers (issue 0026).
|
||||||
|
//
|
||||||
|
// Al arrancar la app se escanea `<app_dir>/enrichers/*/manifest.yaml` y se
|
||||||
|
// rellena el registro. El context menu del viewport consulta
|
||||||
|
// `enrichers_for_type(type_ref)` para mostrar el submenu filtrado por tipo
|
||||||
|
// del nodo right-clickado.
|
||||||
|
//
|
||||||
|
// Para v1 no parseamos `params` con detalle — solo lo necesario para
|
||||||
|
// presentar el item de menu y submitear el job con `{}`.
|
||||||
|
|
||||||
|
namespace ge {
|
||||||
|
|
||||||
|
struct EnricherSpec {
|
||||||
|
std::string id; // ej: "fetch_webpage"
|
||||||
|
std::string name; // ej: "Fetch web page"
|
||||||
|
std::string description;
|
||||||
|
std::vector<std::string> applies_to; // tipos validos (case-insensitive)
|
||||||
|
std::string run_path; // path absoluto a run.py
|
||||||
|
};
|
||||||
|
|
||||||
|
// Escanea el directorio. Reentrante (limpia el registro anterior). Devuelve
|
||||||
|
// el numero de enrichers cargados, -1 si el dir no existe.
|
||||||
|
int enrichers_load(const char* enrichers_dir);
|
||||||
|
|
||||||
|
// Lista todos los enrichers cargados.
|
||||||
|
const std::vector<EnricherSpec>& enrichers_all();
|
||||||
|
|
||||||
|
// Filtra por tipo. Comparacion case-insensitive. Si applies_to es vacio en el
|
||||||
|
// manifest, el enricher se considera global (aplica a cualquier tipo).
|
||||||
|
std::vector<EnricherSpec> enrichers_for_type(const char* type_ref);
|
||||||
|
|
||||||
|
// Resuelve un enricher por id. Devuelve nullptr si no existe.
|
||||||
|
const EnricherSpec* enricher_by_id(const char* id);
|
||||||
|
|
||||||
|
} // namespace ge
|
||||||
@@ -0,0 +1,862 @@
|
|||||||
|
#include "jobs.h"
|
||||||
|
|
||||||
|
#include "../../../../cpp/vendor/sqlite3/sqlite3.h"
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <chrono>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <cstdarg>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <cstring>
|
||||||
|
#include <ctime>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <mutex>
|
||||||
|
#include <queue>
|
||||||
|
#include <signal.h>
|
||||||
|
#include <sstream>
|
||||||
|
#include <string>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/wait.h>
|
||||||
|
#include <thread>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
namespace ge {
|
||||||
|
|
||||||
|
// ----------------------------------------------------------------------------
|
||||||
|
// Internal state
|
||||||
|
// ----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
struct JobControl {
|
||||||
|
pid_t pid = -1;
|
||||||
|
std::atomic<bool> cancel_requested{false};
|
||||||
|
};
|
||||||
|
|
||||||
|
struct State {
|
||||||
|
std::string app_db_path;
|
||||||
|
std::string ops_db_path; // mutable: cambia con jobs_set_ops_db
|
||||||
|
std::string enrichers_dir;
|
||||||
|
std::string app_dir;
|
||||||
|
std::string registry_root;
|
||||||
|
|
||||||
|
std::mutex q_mu;
|
||||||
|
std::condition_variable q_cv;
|
||||||
|
std::queue<std::string> pending; // job ids
|
||||||
|
std::unordered_map<std::string, std::shared_ptr<JobControl>> running;
|
||||||
|
|
||||||
|
std::vector<std::thread> workers;
|
||||||
|
std::atomic<bool> stop_flag{false};
|
||||||
|
std::atomic<int> dirty{0};
|
||||||
|
};
|
||||||
|
|
||||||
|
State* g_state = nullptr;
|
||||||
|
|
||||||
|
// ---- helpers --------------------------------------------------------------
|
||||||
|
|
||||||
|
long long now_ms() {
|
||||||
|
using namespace std::chrono;
|
||||||
|
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string ulid() {
|
||||||
|
// ULID-ish: timestamp ms + 10 random hex chars. Suficiente para un PK.
|
||||||
|
long long ts = now_ms();
|
||||||
|
static std::atomic<uint32_t> ctr{(uint32_t)(ts & 0xFFFFFFFF)};
|
||||||
|
uint32_t rnd = ctr.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
char buf[64];
|
||||||
|
std::snprintf(buf, sizeof(buf), "j_%013lld_%08x", ts, rnd);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool sql_exec_simple(sqlite3* db, const char* sql) {
|
||||||
|
char* err = nullptr;
|
||||||
|
int rc = sqlite3_exec(db, sql, nullptr, nullptr, &err);
|
||||||
|
if (rc != SQLITE_OK) {
|
||||||
|
std::fprintf(stderr, "[jobs] sql error: %s\n sql: %s\n",
|
||||||
|
err ? err : "?", sql);
|
||||||
|
if (err) sqlite3_free(err);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool sql_run(sqlite3* db, const char* sql,
|
||||||
|
const std::vector<std::string>& params)
|
||||||
|
{
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) {
|
||||||
|
std::fprintf(stderr, "[jobs] prepare failed: %s :: %s\n",
|
||||||
|
sqlite3_errmsg(db), sql);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < params.size(); ++i) {
|
||||||
|
sqlite3_bind_text(st, (int)(i + 1), params[i].c_str(), -1,
|
||||||
|
SQLITE_TRANSIENT);
|
||||||
|
}
|
||||||
|
int rc = sqlite3_step(st);
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
return rc == SQLITE_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ensure_table(const char* db_path) {
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(db_path, &db,
|
||||||
|
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
|
||||||
|
nullptr) != SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
sql_exec_simple(db, "PRAGMA journal_mode=WAL;");
|
||||||
|
bool ok = sql_exec_simple(db,
|
||||||
|
"CREATE TABLE IF NOT EXISTS jobs ("
|
||||||
|
" id TEXT PRIMARY KEY,"
|
||||||
|
" enricher_id TEXT NOT NULL,"
|
||||||
|
" node_id TEXT,"
|
||||||
|
" node_name TEXT NOT NULL DEFAULT '',"
|
||||||
|
" params_json TEXT NOT NULL DEFAULT '{}',"
|
||||||
|
" status TEXT NOT NULL,"
|
||||||
|
" progress REAL NOT NULL DEFAULT 0,"
|
||||||
|
" stage TEXT NOT NULL DEFAULT '',"
|
||||||
|
" result_json TEXT,"
|
||||||
|
" error TEXT,"
|
||||||
|
" pid INTEGER,"
|
||||||
|
" created_at INTEGER NOT NULL,"
|
||||||
|
" started_at INTEGER,"
|
||||||
|
" finished_at INTEGER"
|
||||||
|
");"
|
||||||
|
);
|
||||||
|
sql_exec_simple(db,
|
||||||
|
"CREATE INDEX IF NOT EXISTS idx_jobs_status "
|
||||||
|
"ON jobs(status, created_at);");
|
||||||
|
|
||||||
|
// Reaper: jobs `running` huerfanos de una sesion anterior.
|
||||||
|
char ts[32];
|
||||||
|
std::snprintf(ts, sizeof(ts), "%lld", now_ms());
|
||||||
|
sql_run(db,
|
||||||
|
"UPDATE jobs SET status='error', error='process died (app restart)', "
|
||||||
|
"finished_at=? WHERE status='running'",
|
||||||
|
{ts});
|
||||||
|
|
||||||
|
sqlite3_close(db);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Escapa string para JSON. Simplificado: maneja comillas, backslash y
|
||||||
|
// caracteres de control basicos.
|
||||||
|
std::string json_escape(const std::string& s) {
|
||||||
|
std::string out;
|
||||||
|
out.reserve(s.size() + 8);
|
||||||
|
for (char c : s) {
|
||||||
|
switch (c) {
|
||||||
|
case '"': out += "\\\""; break;
|
||||||
|
case '\\': out += "\\\\"; break;
|
||||||
|
case '\b': out += "\\b"; break;
|
||||||
|
case '\f': out += "\\f"; break;
|
||||||
|
case '\n': out += "\\n"; break;
|
||||||
|
case '\r': out += "\\r"; break;
|
||||||
|
case '\t': out += "\\t"; break;
|
||||||
|
default:
|
||||||
|
if ((unsigned char)c < 0x20) {
|
||||||
|
char buf[8];
|
||||||
|
std::snprintf(buf, sizeof(buf), "\\u%04x", (unsigned char)c);
|
||||||
|
out += buf;
|
||||||
|
} else {
|
||||||
|
out += c;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lee un campo de la entidad como string. Devuelve "" si no existe.
|
||||||
|
std::string read_entity_field(const char* db_path, const char* id,
|
||||||
|
const char* col)
|
||||||
|
{
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, nullptr)
|
||||||
|
!= SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
std::string sql = std::string("SELECT ") + col +
|
||||||
|
" FROM entities WHERE id = ? LIMIT 1";
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
std::string out;
|
||||||
|
if (sqlite3_prepare_v2(db, sql.c_str(), -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_text(st, 1, id, -1, SQLITE_TRANSIENT);
|
||||||
|
if (sqlite3_step(st) == SQLITE_ROW) {
|
||||||
|
const unsigned char* t = sqlite3_column_text(st, 0);
|
||||||
|
if (t) out = (const char*)t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construye el JSON que se entrega al subprocess via stdin. Lee node de la
|
||||||
|
// operations.db actual.
|
||||||
|
std::string build_stdin_json(const std::string& job_id,
|
||||||
|
const std::string& enricher_id,
|
||||||
|
const std::string& node_id,
|
||||||
|
const std::string& params_json,
|
||||||
|
const std::string& ops_db,
|
||||||
|
const std::string& app_dir,
|
||||||
|
const std::string& registry_root)
|
||||||
|
{
|
||||||
|
std::string node_type, node_name, node_metadata = "{}";
|
||||||
|
if (!node_id.empty()) {
|
||||||
|
node_type = read_entity_field(ops_db.c_str(), node_id.c_str(), "type_ref");
|
||||||
|
node_name = read_entity_field(ops_db.c_str(), node_id.c_str(), "name");
|
||||||
|
std::string m = read_entity_field(ops_db.c_str(), node_id.c_str(), "metadata");
|
||||||
|
if (!m.empty()) node_metadata = m;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string cache_dir = app_dir + "/cache";
|
||||||
|
|
||||||
|
std::ostringstream o;
|
||||||
|
o << '{'
|
||||||
|
<< "\"job_id\":\"" << json_escape(job_id) << "\","
|
||||||
|
<< "\"enricher_id\":\""<< json_escape(enricher_id) << "\","
|
||||||
|
<< "\"node_id\":\"" << json_escape(node_id) << "\","
|
||||||
|
<< "\"node_type\":\"" << json_escape(node_type) << "\","
|
||||||
|
<< "\"node_name\":\"" << json_escape(node_name) << "\","
|
||||||
|
<< "\"metadata\":" << (node_metadata.empty() ? "{}" : node_metadata) << ","
|
||||||
|
<< "\"params\":" << (params_json.empty() ? "{}" : params_json) << ","
|
||||||
|
<< "\"ops_db_path\":\""<< json_escape(ops_db) << "\","
|
||||||
|
<< "\"app_dir\":\"" << json_escape(app_dir) << "\","
|
||||||
|
<< "\"cache_dir\":\"" << json_escape(cache_dir) << "\","
|
||||||
|
<< "\"registry_root\":\"" << json_escape(registry_root) << "\""
|
||||||
|
<< '}';
|
||||||
|
return o.str();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- subprocess (POSIX) ---------------------------------------------------
|
||||||
|
|
||||||
|
struct ProcResult {
|
||||||
|
int exit_code = -1;
|
||||||
|
bool signaled = false;
|
||||||
|
int signal = 0;
|
||||||
|
std::string stdout_buf;
|
||||||
|
std::string stderr_tail; // ultimas lineas, para mensajes de error
|
||||||
|
};
|
||||||
|
|
||||||
|
void update_progress(const std::string& job_id, double prog,
|
||||||
|
const std::string& stage)
|
||||||
|
{
|
||||||
|
if (!g_state) return;
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
||||||
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql = "UPDATE jobs SET progress=?, stage=? WHERE id=?";
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_double(st, 1, prog);
|
||||||
|
sqlite3_bind_text (st, 2, stage.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_step(st);
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawnea python3 run.py. Pipes para stdin (write), stdout (read),
|
||||||
|
// stderr (read). Lee stdout entero al final; lee stderr line-by-line en un
|
||||||
|
// thread auxiliar parseando "PROGRESS:<float> <stage>".
|
||||||
|
ProcResult run_subprocess(const std::string& job_id,
|
||||||
|
const std::string& run_path,
|
||||||
|
const std::string& stdin_payload,
|
||||||
|
std::shared_ptr<JobControl> ctrl)
|
||||||
|
{
|
||||||
|
ProcResult out;
|
||||||
|
|
||||||
|
int p_in[2] = {-1, -1}; // padre escribe en p_in[1], hijo lee p_in[0]
|
||||||
|
int p_out[2] = {-1, -1};
|
||||||
|
int p_err[2] = {-1, -1};
|
||||||
|
if (pipe(p_in) != 0 || pipe(p_out) != 0 || pipe(p_err) != 0) {
|
||||||
|
out.stderr_tail = "pipe() failed";
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
pid_t pid = fork();
|
||||||
|
if (pid < 0) {
|
||||||
|
out.stderr_tail = "fork() failed";
|
||||||
|
for (int fd : {p_in[0], p_in[1], p_out[0], p_out[1], p_err[0], p_err[1]}) {
|
||||||
|
if (fd >= 0) close(fd);
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pid == 0) {
|
||||||
|
// child
|
||||||
|
dup2(p_in[0], 0);
|
||||||
|
dup2(p_out[1], 1);
|
||||||
|
dup2(p_err[1], 2);
|
||||||
|
close(p_in[0]); close(p_in[1]);
|
||||||
|
close(p_out[0]); close(p_out[1]);
|
||||||
|
close(p_err[0]); close(p_err[1]);
|
||||||
|
|
||||||
|
// Resolver intérprete: <registry_root>/python/.venv/bin/python3
|
||||||
|
std::string py = g_state->registry_root + "/python/.venv/bin/python3";
|
||||||
|
const char* argv[] = { py.c_str(), run_path.c_str(), nullptr };
|
||||||
|
execv(py.c_str(), (char* const*)argv);
|
||||||
|
std::fprintf(stderr, "execv failed: %s\n", py.c_str());
|
||||||
|
_exit(127);
|
||||||
|
}
|
||||||
|
|
||||||
|
// parent
|
||||||
|
ctrl->pid = pid;
|
||||||
|
close(p_in[0]);
|
||||||
|
close(p_out[1]);
|
||||||
|
close(p_err[1]);
|
||||||
|
|
||||||
|
// Persistir pid en BD para mostrarlo en UI.
|
||||||
|
{
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
||||||
|
SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? WHERE id=?", -1,
|
||||||
|
&st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_int (st, 1, (int)pid);
|
||||||
|
sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_step(st);
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Escribir stdin entero.
|
||||||
|
if (!stdin_payload.empty()) {
|
||||||
|
ssize_t written = 0;
|
||||||
|
const char* p = stdin_payload.c_str();
|
||||||
|
size_t left = stdin_payload.size();
|
||||||
|
while (left > 0) {
|
||||||
|
ssize_t n = write(p_in[1], p + written, left);
|
||||||
|
if (n < 0) { if (errno == EINTR) continue; break; }
|
||||||
|
written += n; left -= (size_t)n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(p_in[1]);
|
||||||
|
|
||||||
|
// Thread aux para stderr: parsea PROGRESS y guarda tail.
|
||||||
|
std::string stderr_tail_local;
|
||||||
|
std::mutex tail_mu;
|
||||||
|
std::thread err_t([&]() {
|
||||||
|
std::string line;
|
||||||
|
char ch;
|
||||||
|
while (true) {
|
||||||
|
ssize_t n = read(p_err[0], &ch, 1);
|
||||||
|
if (n <= 0) break;
|
||||||
|
if (ch == '\n') {
|
||||||
|
// Parse line.
|
||||||
|
if (line.rfind("PROGRESS:", 0) == 0) {
|
||||||
|
// PROGRESS:<float> <stage...>
|
||||||
|
const char* p = line.c_str() + 9;
|
||||||
|
char* endp = nullptr;
|
||||||
|
double prog = std::strtod(p, &endp);
|
||||||
|
std::string stage;
|
||||||
|
if (endp && *endp) {
|
||||||
|
while (*endp == ' ') ++endp;
|
||||||
|
stage = endp;
|
||||||
|
}
|
||||||
|
update_progress(job_id, prog, stage);
|
||||||
|
} else {
|
||||||
|
std::lock_guard<std::mutex> g(tail_mu);
|
||||||
|
stderr_tail_local += line;
|
||||||
|
stderr_tail_local += '\n';
|
||||||
|
// Cap a ~4 KB.
|
||||||
|
if (stderr_tail_local.size() > 4096) {
|
||||||
|
stderr_tail_local.erase(0, stderr_tail_local.size() - 4096);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
line.clear();
|
||||||
|
} else {
|
||||||
|
line.push_back(ch);
|
||||||
|
if (line.size() > 4096) line.clear(); // proteccion
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Leer stdout entero (sincrono).
|
||||||
|
{
|
||||||
|
char buf[4096];
|
||||||
|
while (true) {
|
||||||
|
ssize_t n = read(p_out[0], buf, sizeof(buf));
|
||||||
|
if (n <= 0) break;
|
||||||
|
out.stdout_buf.append(buf, (size_t)n);
|
||||||
|
if (out.stdout_buf.size() > 1024 * 1024) {
|
||||||
|
// 1 MB cap.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(p_out[0]);
|
||||||
|
|
||||||
|
// Esperar al hijo. Si se pidio cancelar, mandamos SIGTERM y SIGKILL.
|
||||||
|
int status = 0;
|
||||||
|
while (true) {
|
||||||
|
if (ctrl->cancel_requested.load() && pid > 0) {
|
||||||
|
kill(pid, SIGTERM);
|
||||||
|
// pequena gracia, luego SIGKILL si hace falta
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
pid_t r = waitpid(pid, &status, WNOHANG);
|
||||||
|
if (r == pid) goto reaped;
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||||
|
}
|
||||||
|
kill(pid, SIGKILL);
|
||||||
|
}
|
||||||
|
pid_t r = waitpid(pid, &status, 0);
|
||||||
|
if (r == pid) break;
|
||||||
|
if (r < 0 && errno == EINTR) continue;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
reaped:
|
||||||
|
err_t.join();
|
||||||
|
close(p_err[0]);
|
||||||
|
|
||||||
|
if (WIFEXITED(status)) {
|
||||||
|
out.exit_code = WEXITSTATUS(status);
|
||||||
|
} else if (WIFSIGNALED(status)) {
|
||||||
|
out.signaled = true;
|
||||||
|
out.signal = WTERMSIG(status);
|
||||||
|
out.exit_code = -1;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> g(tail_mu);
|
||||||
|
out.stderr_tail = std::move(stderr_tail_local);
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- worker ---------------------------------------------------------------
|
||||||
|
|
||||||
|
void persist_status(const std::string& job_id, const std::string& status,
|
||||||
|
const std::string& result_json,
|
||||||
|
const std::string& error,
|
||||||
|
bool set_finished)
|
||||||
|
{
|
||||||
|
if (!g_state) return;
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
||||||
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (set_finished) {
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql =
|
||||||
|
"UPDATE jobs SET status=?, result_json=?, error=?, finished_at=? "
|
||||||
|
"WHERE id=?";
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_text (st, 2, result_json.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_text (st, 3, error.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_int64 (st, 4, now_ms());
|
||||||
|
sqlite3_bind_text (st, 5, job_id.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_step(st);
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
} else {
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql = "UPDATE jobs SET status=?, started_at=? WHERE id=?";
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_int64(st, 2, now_ms());
|
||||||
|
sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_step(st);
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
}
|
||||||
|
sqlite3_close(db);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lee la fila para reconstruir el contexto del job antes de spawn.
|
||||||
|
struct JobContext {
|
||||||
|
std::string id, enricher_id, node_id, node_name, params_json, status;
|
||||||
|
};
|
||||||
|
bool load_job(const std::string& id, JobContext* out) {
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
||||||
|
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql =
|
||||||
|
"SELECT id, enricher_id, COALESCE(node_id,''), node_name, params_json, status "
|
||||||
|
"FROM jobs WHERE id=?";
|
||||||
|
bool ok = false;
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_text(st, 1, id.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
if (sqlite3_step(st) == SQLITE_ROW) {
|
||||||
|
auto col = [&](int i) {
|
||||||
|
const unsigned char* t = sqlite3_column_text(st, i);
|
||||||
|
return std::string(t ? (const char*)t : "");
|
||||||
|
};
|
||||||
|
out->id = col(0);
|
||||||
|
out->enricher_id = col(1);
|
||||||
|
out->node_id = col(2);
|
||||||
|
out->node_name = col(3);
|
||||||
|
out->params_json = col(4);
|
||||||
|
out->status = col(5);
|
||||||
|
ok = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
void worker_loop() {
|
||||||
|
while (!g_state->stop_flag.load()) {
|
||||||
|
std::string job_id;
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(g_state->q_mu);
|
||||||
|
g_state->q_cv.wait(lk, [] {
|
||||||
|
return g_state->stop_flag.load() || !g_state->pending.empty();
|
||||||
|
});
|
||||||
|
if (g_state->stop_flag.load()) return;
|
||||||
|
job_id = std::move(g_state->pending.front());
|
||||||
|
g_state->pending.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
JobContext ctx;
|
||||||
|
if (!load_job(job_id, &ctx)) continue;
|
||||||
|
if (ctx.status == "cancelled") continue;
|
||||||
|
|
||||||
|
// Resolver run.py por convencion: <enrichers_dir>/<enricher_id>/run.py.
|
||||||
|
std::string run_path = g_state->enrichers_dir + "/" + ctx.enricher_id +
|
||||||
|
"/run.py";
|
||||||
|
|
||||||
|
// Marcar running.
|
||||||
|
persist_status(job_id, "running", "", "", false);
|
||||||
|
|
||||||
|
auto ctrl = std::make_shared<JobControl>();
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
||||||
|
g_state->running[job_id] = ctrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construir stdin y ejecutar.
|
||||||
|
std::string ops_db;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
||||||
|
ops_db = g_state->ops_db_path;
|
||||||
|
}
|
||||||
|
std::string stdin_payload = build_stdin_json(
|
||||||
|
ctx.id, ctx.enricher_id, ctx.node_id, ctx.params_json,
|
||||||
|
ops_db, g_state->app_dir, g_state->registry_root);
|
||||||
|
|
||||||
|
ProcResult res = run_subprocess(job_id, run_path, stdin_payload, ctrl);
|
||||||
|
|
||||||
|
// Estado final.
|
||||||
|
std::string final_status, error;
|
||||||
|
std::string result_json = res.stdout_buf;
|
||||||
|
// Trim del result_json (saca trailing whitespace).
|
||||||
|
while (!result_json.empty() &&
|
||||||
|
(result_json.back() == '\n' || result_json.back() == '\r' ||
|
||||||
|
result_json.back() == ' ' || result_json.back() == '\t')) {
|
||||||
|
result_json.pop_back();
|
||||||
|
}
|
||||||
|
if (ctrl->cancel_requested.load()) {
|
||||||
|
final_status = "cancelled";
|
||||||
|
error = "user cancelled";
|
||||||
|
} else if (res.exit_code == 0) {
|
||||||
|
final_status = "done";
|
||||||
|
if (result_json.empty()) result_json = "{}";
|
||||||
|
} else {
|
||||||
|
final_status = "error";
|
||||||
|
char buf[64];
|
||||||
|
if (res.signaled) {
|
||||||
|
std::snprintf(buf, sizeof(buf), "signal %d", res.signal);
|
||||||
|
} else {
|
||||||
|
std::snprintf(buf, sizeof(buf), "exit %d", res.exit_code);
|
||||||
|
}
|
||||||
|
error = std::string(buf);
|
||||||
|
if (!res.stderr_tail.empty()) {
|
||||||
|
error += "\n";
|
||||||
|
error += res.stderr_tail;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
persist_status(job_id, final_status, result_json, error, true);
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
||||||
|
g_state->running.erase(job_id);
|
||||||
|
}
|
||||||
|
if (final_status == "done") {
|
||||||
|
g_state->dirty.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
// ----------------------------------------------------------------------------
|
||||||
|
// Public API
|
||||||
|
// ----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
bool jobs_init(const char* app_db_path,
|
||||||
|
const char* ops_db_path,
|
||||||
|
const char* enrichers_dir,
|
||||||
|
const char* app_dir,
|
||||||
|
const char* registry_root,
|
||||||
|
int n_workers)
|
||||||
|
{
|
||||||
|
if (g_state) return true;
|
||||||
|
if (!app_db_path || !*app_db_path) return false;
|
||||||
|
if (n_workers < 1) n_workers = 1;
|
||||||
|
if (n_workers > 8) n_workers = 8;
|
||||||
|
|
||||||
|
if (!ensure_table(app_db_path)) return false;
|
||||||
|
|
||||||
|
g_state = new State();
|
||||||
|
g_state->app_db_path = app_db_path;
|
||||||
|
g_state->ops_db_path = ops_db_path ? ops_db_path : "";
|
||||||
|
g_state->enrichers_dir = enrichers_dir ? enrichers_dir : "";
|
||||||
|
g_state->app_dir = app_dir ? app_dir : "";
|
||||||
|
g_state->registry_root = registry_root ? registry_root : "";
|
||||||
|
|
||||||
|
// Rehidratacion: jobs queued de sesiones anteriores se reencolan.
|
||||||
|
{
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(app_db_path, &db, SQLITE_OPEN_READONLY,
|
||||||
|
nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql =
|
||||||
|
"SELECT id FROM jobs WHERE status='queued' ORDER BY created_at";
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||||
|
const unsigned char* t = sqlite3_column_text(st, 0);
|
||||||
|
if (t) g_state->pending.push((const char*)t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < n_workers; ++i) {
|
||||||
|
g_state->workers.emplace_back(worker_loop);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void jobs_set_ops_db(const char* ops_db_path) {
|
||||||
|
if (!g_state) return;
|
||||||
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
||||||
|
g_state->ops_db_path = ops_db_path ? ops_db_path : "";
|
||||||
|
}
|
||||||
|
|
||||||
|
bool jobs_submit(const char* enricher_id,
|
||||||
|
const char* node_id,
|
||||||
|
const char* node_name,
|
||||||
|
const char* params_json,
|
||||||
|
char* out_id, size_t out_id_n)
|
||||||
|
{
|
||||||
|
if (!g_state || !enricher_id || !*enricher_id) return false;
|
||||||
|
if (!out_id || out_id_n < 32) return false;
|
||||||
|
|
||||||
|
std::string id = ulid();
|
||||||
|
std::snprintf(out_id, out_id_n, "%s", id.c_str());
|
||||||
|
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
||||||
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql =
|
||||||
|
"INSERT INTO jobs (id, enricher_id, node_id, node_name, params_json, "
|
||||||
|
"status, progress, stage, created_at) "
|
||||||
|
"VALUES (?, ?, ?, ?, ?, 'queued', 0, '', ?)";
|
||||||
|
bool ok = false;
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_text (st, 1, id.c_str(), -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_text (st, 2, enricher_id, -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_text (st, 3, node_id ? node_id : "", -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_text (st, 4, node_name ? node_name : "", -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_text (st, 5, params_json ? params_json : "{}", -1, SQLITE_TRANSIENT);
|
||||||
|
sqlite3_bind_int64(st, 6, now_ms());
|
||||||
|
ok = sqlite3_step(st) == SQLITE_DONE;
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
if (!ok) return false;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
||||||
|
g_state->pending.push(id);
|
||||||
|
}
|
||||||
|
g_state->q_cv.notify_one();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool jobs_cancel(const char* job_id) {
|
||||||
|
if (!g_state || !job_id) return false;
|
||||||
|
std::shared_ptr<JobControl> ctrl;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
||||||
|
auto it = g_state->running.find(job_id);
|
||||||
|
if (it != g_state->running.end()) ctrl = it->second;
|
||||||
|
}
|
||||||
|
if (ctrl) {
|
||||||
|
ctrl->cancel_requested.store(true);
|
||||||
|
if (ctrl->pid > 0) kill(ctrl->pid, SIGTERM);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// No corriendo: marcar cancelled si esta queued.
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
||||||
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql =
|
||||||
|
"UPDATE jobs SET status='cancelled', finished_at=?, "
|
||||||
|
"error='cancelled before start' WHERE id=? AND status='queued'";
|
||||||
|
bool ok = false;
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_int64(st, 1, now_ms());
|
||||||
|
sqlite3_bind_text (st, 2, job_id, -1, SQLITE_TRANSIENT);
|
||||||
|
ok = sqlite3_step(st) == SQLITE_DONE;
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool jobs_delete(const char* job_id) {
|
||||||
|
if (!g_state || !job_id) return false;
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
||||||
|
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql =
|
||||||
|
"DELETE FROM jobs WHERE id=? AND status IN ('done','error','cancelled')";
|
||||||
|
bool ok = false;
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_text(st, 1, job_id, -1, SQLITE_TRANSIENT);
|
||||||
|
ok = sqlite3_step(st) == SQLITE_DONE;
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool jobs_list(std::vector<JobRow>* out, int limit) {
|
||||||
|
if (!g_state || !out) return false;
|
||||||
|
out->clear();
|
||||||
|
if (limit < 1) limit = 1;
|
||||||
|
if (limit > 1000) limit = 1000;
|
||||||
|
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
||||||
|
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql =
|
||||||
|
"SELECT id, enricher_id, COALESCE(node_id,''), node_name, status, "
|
||||||
|
"progress, stage, COALESCE(error,''), COALESCE(result_json,''), "
|
||||||
|
"created_at, COALESCE(started_at,0), COALESCE(finished_at,0) "
|
||||||
|
"FROM jobs ORDER BY created_at DESC LIMIT ?";
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
sqlite3_bind_int(st, 1, limit);
|
||||||
|
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||||
|
JobRow r;
|
||||||
|
auto col = [&](int i) {
|
||||||
|
const unsigned char* t = sqlite3_column_text(st, i);
|
||||||
|
return std::string(t ? (const char*)t : "");
|
||||||
|
};
|
||||||
|
r.id = col(0);
|
||||||
|
r.enricher_id = col(1);
|
||||||
|
r.node_id = col(2);
|
||||||
|
r.node_name = col(3);
|
||||||
|
r.status = col(4);
|
||||||
|
r.progress = sqlite3_column_double(st, 5);
|
||||||
|
r.stage = col(6);
|
||||||
|
r.error = col(7);
|
||||||
|
r.result_json = col(8);
|
||||||
|
r.created_at = sqlite3_column_int64(st, 9);
|
||||||
|
r.started_at = sqlite3_column_int64(st, 10);
|
||||||
|
r.finished_at = sqlite3_column_int64(st, 11);
|
||||||
|
out->push_back(std::move(r));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
JobCounters jobs_counters() {
|
||||||
|
JobCounters c{};
|
||||||
|
if (!g_state) return c;
|
||||||
|
sqlite3* db = nullptr;
|
||||||
|
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
|
||||||
|
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
|
||||||
|
if (db) sqlite3_close(db);
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
sqlite3_stmt* st = nullptr;
|
||||||
|
const char* sql = "SELECT status, COUNT(*) FROM jobs GROUP BY status";
|
||||||
|
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
|
||||||
|
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||||
|
const unsigned char* s = sqlite3_column_text(st, 0);
|
||||||
|
int n = sqlite3_column_int(st, 1);
|
||||||
|
if (!s) continue;
|
||||||
|
std::string status((const char*)s);
|
||||||
|
if (status == "queued") c.queued = n;
|
||||||
|
else if (status == "running") c.running = n;
|
||||||
|
else if (status == "done") c.done = n;
|
||||||
|
else if (status == "error") c.error = n;
|
||||||
|
else if (status == "cancelled") c.cancelled = n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sqlite3_finalize(st);
|
||||||
|
sqlite3_close(db);
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
int jobs_dirty_counter() {
|
||||||
|
if (!g_state) return 0;
|
||||||
|
return g_state->dirty.load(std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
void jobs_shutdown() {
|
||||||
|
if (!g_state) return;
|
||||||
|
g_state->stop_flag.store(true);
|
||||||
|
// Cancelar todos los running.
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(g_state->q_mu);
|
||||||
|
for (auto& kv : g_state->running) {
|
||||||
|
kv.second->cancel_requested.store(true);
|
||||||
|
if (kv.second->pid > 0) kill(kv.second->pid, SIGTERM);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
g_state->q_cv.notify_all();
|
||||||
|
for (auto& t : g_state->workers) {
|
||||||
|
if (t.joinable()) t.join();
|
||||||
|
}
|
||||||
|
delete g_state;
|
||||||
|
g_state = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ge
|
||||||
@@ -0,0 +1,97 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
// Sistema de jobs asincronos para enrichers (issue 0026).
|
||||||
|
//
|
||||||
|
// Diseno:
|
||||||
|
// - Tabla `jobs` en graph_explorer.db (NO en operations.db).
|
||||||
|
// - Pool de N std::thread workers (default 2).
|
||||||
|
// - Cada job spawnea un subprocess Python `enrichers/<id>/run.py` que recibe
|
||||||
|
// contexto por stdin (JSON), emite progreso por stderr y resultado final
|
||||||
|
// por stdout (JSON).
|
||||||
|
// - El worker aplica entities/relations/node_updates al operations.db indicado.
|
||||||
|
// - dirty_counter se incrementa al completar un job — el render() lee el
|
||||||
|
// contador y triggerea un reload del grafo cuando cambia.
|
||||||
|
//
|
||||||
|
// El estado vivo (running/queued en RAM) se persiste tambien en SQLite para
|
||||||
|
// que cancelaciones, reinicios y queries de UI vean lo mismo.
|
||||||
|
|
||||||
|
namespace ge {
|
||||||
|
|
||||||
|
struct JobRow {
|
||||||
|
std::string id;
|
||||||
|
std::string enricher_id;
|
||||||
|
std::string node_id;
|
||||||
|
std::string node_name;
|
||||||
|
std::string status; // queued|running|done|error|cancelled
|
||||||
|
double progress = 0.0;
|
||||||
|
std::string stage;
|
||||||
|
std::string error;
|
||||||
|
std::string result_json;
|
||||||
|
long long created_at = 0;
|
||||||
|
long long started_at = 0;
|
||||||
|
long long finished_at = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Inicializa el sistema. Crea la tabla `jobs` si no existe, marca como `error`
|
||||||
|
// los jobs que quedaron `running` de una sesion anterior, lanza los workers.
|
||||||
|
// `app_db_path` es <app_dir>/graph_explorer.db (jobs).
|
||||||
|
// `ops_db_path` es la operations.db actualmente cargada (target de mutaciones).
|
||||||
|
// `enrichers_dir` es <app_dir>/enrichers/.
|
||||||
|
// `app_dir` se usa como root para resolver `cache/` (issue 0027).
|
||||||
|
// `registry_root` se pasa al subprocess como FN_REGISTRY_ROOT para que el
|
||||||
|
// enricher pueda importar funciones del registry.
|
||||||
|
// Retorna false si alguno de los paths falla.
|
||||||
|
bool jobs_init(const char* app_db_path,
|
||||||
|
const char* ops_db_path,
|
||||||
|
const char* enrichers_dir,
|
||||||
|
const char* app_dir,
|
||||||
|
const char* registry_root,
|
||||||
|
int n_workers);
|
||||||
|
|
||||||
|
// Cambia la operations.db objetivo (cuando el usuario abre otra). Drena cola
|
||||||
|
// pendiente — los jobs ya queued NO se reapuntan.
|
||||||
|
void jobs_set_ops_db(const char* ops_db_path);
|
||||||
|
|
||||||
|
// Encola un job. params_json puede ser "{}" o JSON valido. node_id puede ser
|
||||||
|
// vacio (job global). Devuelve false si la BD falla. out_id (>= 64) recibe el
|
||||||
|
// ULID generado.
|
||||||
|
bool jobs_submit(const char* enricher_id,
|
||||||
|
const char* node_id,
|
||||||
|
const char* node_name,
|
||||||
|
const char* params_json,
|
||||||
|
char* out_id, size_t out_id_n);
|
||||||
|
|
||||||
|
// Senala SIGTERM al subprocess y marca el job como `cancelled`. Si esta
|
||||||
|
// `queued` lo marca directamente.
|
||||||
|
bool jobs_cancel(const char* job_id);
|
||||||
|
|
||||||
|
// Borra un job en estado terminal (done/error/cancelled).
|
||||||
|
bool jobs_delete(const char* job_id);
|
||||||
|
|
||||||
|
// Snapshot ordenado por created_at DESC.
|
||||||
|
bool jobs_list(std::vector<JobRow>* out, int limit = 200);
|
||||||
|
|
||||||
|
// Counters para el badge en la toolbar.
|
||||||
|
struct JobCounters {
|
||||||
|
int queued = 0;
|
||||||
|
int running = 0;
|
||||||
|
int done = 0;
|
||||||
|
int error = 0;
|
||||||
|
int cancelled = 0;
|
||||||
|
};
|
||||||
|
JobCounters jobs_counters();
|
||||||
|
|
||||||
|
// Counter monotono que se incrementa cada vez que un job completa con
|
||||||
|
// cambios en operations.db. El main thread compara contra su ultimo valor
|
||||||
|
// conocido y, si cambio, llama reload_graph().
|
||||||
|
int jobs_dirty_counter();
|
||||||
|
|
||||||
|
// Cierra el pool y espera a los workers (cancelando jobs en vuelo).
|
||||||
|
void jobs_shutdown();
|
||||||
|
|
||||||
|
} // namespace ge
|
||||||
@@ -25,6 +25,8 @@
|
|||||||
#include "layout_store.h"
|
#include "layout_store.h"
|
||||||
#include "entity_ops.h"
|
#include "entity_ops.h"
|
||||||
#include "project_manager.h"
|
#include "project_manager.h"
|
||||||
|
#include "jobs.h"
|
||||||
|
#include "enrichers.h"
|
||||||
|
|
||||||
#include "../../../../cpp/vendor/sqlite3/sqlite3.h"
|
#include "../../../../cpp/vendor/sqlite3/sqlite3.h"
|
||||||
|
|
||||||
@@ -39,6 +41,13 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#ifndef _WIN32
|
||||||
|
#include <unistd.h>
|
||||||
|
#else
|
||||||
|
#include <direct.h>
|
||||||
|
#define getcwd _getcwd
|
||||||
|
#endif
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
// Estado global de la app
|
// Estado global de la app
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
@@ -116,6 +125,32 @@ static void apply_static_layout(int mode) {
|
|||||||
// Forward decl — definido mas abajo, lo necesita switch_to_project.
|
// Forward decl — definido mas abajo, lo necesita switch_to_project.
|
||||||
static bool load_input();
|
static bool load_input();
|
||||||
|
|
||||||
|
// ----------------------------------------------------------------------------
|
||||||
|
// Registry path resolution (issue 0026)
|
||||||
|
// ----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Devuelve el path absoluto al root de fn_registry. Estrategia:
|
||||||
|
// 1) FN_REGISTRY_ROOT env var
|
||||||
|
// 2) Sube desde getcwd() buscando un dir con `registry.db`
|
||||||
|
// 3) "" si no se encuentra
|
||||||
|
static std::string resolve_registry_root() {
|
||||||
|
if (const char* env = std::getenv("FN_REGISTRY_ROOT")) {
|
||||||
|
if (env && *env) return env;
|
||||||
|
}
|
||||||
|
char cwd[4096];
|
||||||
|
if (getcwd(cwd, sizeof(cwd)) == nullptr) return "";
|
||||||
|
std::string p = cwd;
|
||||||
|
for (int i = 0; i < 8; ++i) {
|
||||||
|
std::string probe = p + "/registry.db";
|
||||||
|
FILE* f = std::fopen(probe.c_str(), "rb");
|
||||||
|
if (f) { std::fclose(f); return p; }
|
||||||
|
size_t s = p.find_last_of('/');
|
||||||
|
if (s == std::string::npos || s == 0) break;
|
||||||
|
p = p.substr(0, s);
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
// Project lifecycle
|
// Project lifecycle
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
@@ -240,6 +275,9 @@ static bool load_input() {
|
|||||||
ge::entity_index_build(g_input.uri, &g_idx);
|
ge::entity_index_build(g_input.uri, &g_idx);
|
||||||
g_app.input_db_path = g_input.uri ? g_input.uri : "";
|
g_app.input_db_path = g_input.uri ? g_input.uri : "";
|
||||||
|
|
||||||
|
// issue 0026 — apunta el JobRunner a la nueva operations.db.
|
||||||
|
if (g_input.uri) ge::jobs_set_ops_db(g_input.uri);
|
||||||
|
|
||||||
// Cargar posiciones guardadas para este graph_hash
|
// Cargar posiciones guardadas para este graph_hash
|
||||||
g_graph_hash = ge::compute_graph_hash(g_input.uri);
|
g_graph_hash = ge::compute_graph_hash(g_input.uri);
|
||||||
int restored = ge::layout_store_load(g_graph_hash, g_graph);
|
int restored = ge::layout_store_load(g_graph_hash, g_graph);
|
||||||
@@ -483,7 +521,27 @@ static void render_context_menu() {
|
|||||||
|
|
||||||
ImGui::Separator();
|
ImGui::Separator();
|
||||||
if (ImGui::BeginMenu("Run enricher")) {
|
if (ImGui::BeginMenu("Run enricher")) {
|
||||||
ImGui::TextDisabled("(coming soon — issues 0001/0002/0003)");
|
// issue 0026 — listamos enrichers cuyo applies_to incluye este type.
|
||||||
|
const char* type_name = (n.type_id < (uint16_t)g_graph.type_count)
|
||||||
|
? g_graph.types[n.type_id].name : "";
|
||||||
|
auto specs = ge::enrichers_for_type(type_name);
|
||||||
|
if (!sql_id) {
|
||||||
|
ImGui::TextDisabled("(node has no entity id)");
|
||||||
|
} else if (specs.empty()) {
|
||||||
|
ImGui::TextDisabled("(no enrichers para tipo '%s')", type_name);
|
||||||
|
} else {
|
||||||
|
for (const auto& s : specs) {
|
||||||
|
if (ImGui::MenuItem(s.name.c_str())) {
|
||||||
|
char job_id[64];
|
||||||
|
bool ok = ge::jobs_submit(s.id.c_str(), sql_id, lbl,
|
||||||
|
"{}", job_id, sizeof(job_id));
|
||||||
|
if (ok) g_app.panel_jobs = true; // abrir panel auto
|
||||||
|
}
|
||||||
|
if (!s.description.empty() && ImGui::IsItemHovered()) {
|
||||||
|
ImGui::SetTooltip("%s", s.description.c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
ImGui::EndMenu();
|
ImGui::EndMenu();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -512,6 +570,7 @@ static fn_ui::PanelToggle g_panels[] = {
|
|||||||
{"Note", nullptr, &g_app.panel_note},
|
{"Note", nullptr, &g_app.panel_note},
|
||||||
{"Types", nullptr, &g_app.panel_type_editor},
|
{"Types", nullptr, &g_app.panel_type_editor},
|
||||||
{"Table", nullptr, &g_app.panel_table},
|
{"Table", nullptr, &g_app.panel_table},
|
||||||
|
{"Jobs", nullptr, &g_app.panel_jobs},
|
||||||
};
|
};
|
||||||
|
|
||||||
static void render() {
|
static void render() {
|
||||||
@@ -596,6 +655,16 @@ static void render() {
|
|||||||
g_app.apply_layout_tick = 0;
|
g_app.apply_layout_tick = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// issue 0026 — si un job termino con cambios, dispara reload del grafo.
|
||||||
|
{
|
||||||
|
static int s_last_dirty = 0;
|
||||||
|
int d = ge::jobs_dirty_counter();
|
||||||
|
if (d != s_last_dirty) {
|
||||||
|
s_last_dirty = d;
|
||||||
|
g_app.want_reload = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Triggers desde la toolbar
|
// Triggers desde la toolbar
|
||||||
if (g_app.want_fit) {
|
if (g_app.want_fit) {
|
||||||
graph_viewport_fit(g_graph, g_viewport);
|
graph_viewport_fit(g_graph, g_viewport);
|
||||||
@@ -1194,6 +1263,12 @@ static void render() {
|
|||||||
ge::views_table_window(g_app);
|
ge::views_table_window(g_app);
|
||||||
ge::views_import_dataset_modal(g_app);
|
ge::views_import_dataset_modal(g_app);
|
||||||
|
|
||||||
|
// Jobs panel (issue 0026) — flotante, dockeable.
|
||||||
|
ImGui::SetNextWindowPos (ImVec2(vp->WorkPos.x + W * 0.20f, top + 40.0f),
|
||||||
|
ImGuiCond_FirstUseEver);
|
||||||
|
ImGui::SetNextWindowSize(ImVec2(900.0f, 360.0f), ImGuiCond_FirstUseEver);
|
||||||
|
ge::views_jobs(g_app);
|
||||||
|
|
||||||
g_first_render = false;
|
g_first_render = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1418,6 +1493,34 @@ int main(int argc, char** argv) {
|
|||||||
"cualquier app del registry y permite explorar entidades/relaciones con "
|
"cualquier app del registry y permite explorar entidades/relaciones con "
|
||||||
"shapes/iconos/layouts/filtros.");
|
"shapes/iconos/layouts/filtros.");
|
||||||
|
|
||||||
|
// issue 0026 — sistema de jobs + enrichers.
|
||||||
|
{
|
||||||
|
std::string registry_root = resolve_registry_root();
|
||||||
|
std::string app_dir = registry_root.empty()
|
||||||
|
? "."
|
||||||
|
: registry_root + "/projects/osint_graph/apps/graph_explorer";
|
||||||
|
std::string enrichers_dir = app_dir + "/enrichers";
|
||||||
|
|
||||||
|
// graph_explorer.db es el mismo SQLite usado por layout_store.
|
||||||
|
const char* app_db = g_layout_db_path.empty()
|
||||||
|
? "graph_explorer.db" : g_layout_db_path.c_str();
|
||||||
|
|
||||||
|
ge::enrichers_load(enrichers_dir.c_str());
|
||||||
|
if (!ge::jobs_init(app_db,
|
||||||
|
g_input.uri ? g_input.uri : "",
|
||||||
|
enrichers_dir.c_str(),
|
||||||
|
app_dir.c_str(),
|
||||||
|
registry_root.c_str(),
|
||||||
|
/*n_workers=*/2)) {
|
||||||
|
std::fprintf(stderr, "[graph_explorer] jobs_init failed (panel disabled)\n");
|
||||||
|
} else {
|
||||||
|
std::fprintf(stdout,
|
||||||
|
"[graph_explorer] jobs_init OK — enrichers_dir=%s, registry_root=%s, %d enrichers\n",
|
||||||
|
enrichers_dir.c_str(), registry_root.c_str(),
|
||||||
|
(int)ge::enrichers_all().size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int rc = fn::run_app(
|
int rc = fn::run_app(
|
||||||
{.title = "graph_explorer",
|
{.title = "graph_explorer",
|
||||||
.width = 1600,
|
.width = 1600,
|
||||||
@@ -1429,6 +1532,7 @@ int main(int argc, char** argv) {
|
|||||||
render);
|
render);
|
||||||
|
|
||||||
// Cleanup
|
// Cleanup
|
||||||
|
ge::jobs_shutdown();
|
||||||
if (g_gpu_ctx) graph_force_layout_gpu_destroy(g_gpu_ctx);
|
if (g_gpu_ctx) graph_force_layout_gpu_destroy(g_gpu_ctx);
|
||||||
if (g_atlas) graph_icons_destroy(g_atlas);
|
if (g_atlas) graph_icons_destroy(g_atlas);
|
||||||
graph_viewport_destroy(g_viewport);
|
graph_viewport_destroy(g_viewport);
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ struct AppState {
|
|||||||
bool panel_stats = true;
|
bool panel_stats = true;
|
||||||
bool panel_viewport = true;
|
bool panel_viewport = true;
|
||||||
bool panel_note = false;
|
bool panel_note = false;
|
||||||
|
bool panel_jobs = false; // issue 0026
|
||||||
bool show_filters_modal = false;
|
bool show_filters_modal = false;
|
||||||
bool show_open_modal = false;
|
bool show_open_modal = false;
|
||||||
|
|
||||||
@@ -353,6 +354,14 @@ void views_type_editor(AppState& app);
|
|||||||
// te_delete_use_count via consulta a operations.db antes de mostrarlo.
|
// te_delete_use_count via consulta a operations.db antes de mostrarlo.
|
||||||
bool views_type_editor_delete_modal(AppState& app);
|
bool views_type_editor_delete_modal(AppState& app);
|
||||||
|
|
||||||
|
// ---- Jobs panel (issue 0026) ---------------------------------------------
|
||||||
|
|
||||||
|
// Renderiza el panel "Jobs" — tabla con todos los jobs (queued/running/done/
|
||||||
|
// error/cancelled). Botones por fila para cancelar / reintentar / borrar.
|
||||||
|
// Click en target_node centra el viewport sobre ese nodo (futuro). Polling
|
||||||
|
// cada N frames para no spammear la BD.
|
||||||
|
void views_jobs(AppState& app);
|
||||||
|
|
||||||
// ---- Filter helpers (issue 0009) -----------------------------------------
|
// ---- Filter helpers (issue 0009) -----------------------------------------
|
||||||
|
|
||||||
// True si el filtro tiene query no vacia o al menos un tag activo.
|
// True si el filtro tiene query no vacia o al menos un tag activo.
|
||||||
|
|||||||
+199
@@ -0,0 +1,199 @@
|
|||||||
|
#include "views.h"
|
||||||
|
#include "jobs.h"
|
||||||
|
|
||||||
|
#include "core/icons_tabler.h"
|
||||||
|
#include "core/tokens.h"
|
||||||
|
|
||||||
|
#include "imgui.h"
|
||||||
|
|
||||||
|
#include <cfloat>
|
||||||
|
#include <chrono>
|
||||||
|
#include <cstdio>
|
||||||
|
|
||||||
|
namespace ge {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
// Cache de la lista de jobs. Se refresca cada N frames para no abrir SQLite
|
||||||
|
// en cada frame. ~10 Hz es suficiente para una progress bar fluida.
|
||||||
|
struct JobsCache {
|
||||||
|
std::vector<JobRow> rows;
|
||||||
|
int last_frame_refresh = -100;
|
||||||
|
int filter_idx = 0; // 0=all 1=active 2=done 3=error
|
||||||
|
char buf[8] = {};
|
||||||
|
};
|
||||||
|
JobsCache g_jobs_cache;
|
||||||
|
|
||||||
|
const char* status_icon(const std::string& s) {
|
||||||
|
if (s == "queued") return TI_HOURGLASS;
|
||||||
|
if (s == "running") return TI_PLAYER_PLAY;
|
||||||
|
if (s == "done") return TI_CHECK;
|
||||||
|
if (s == "error") return TI_ALERT_CIRCLE;
|
||||||
|
if (s == "cancelled") return TI_X;
|
||||||
|
return TI_QUESTION_MARK;
|
||||||
|
}
|
||||||
|
|
||||||
|
ImVec4 status_color(const std::string& s) {
|
||||||
|
if (s == "running") return ImVec4(0.36f, 0.78f, 1.0f, 1.0f);
|
||||||
|
if (s == "done") return ImVec4(0.40f, 0.85f, 0.55f, 1.0f);
|
||||||
|
if (s == "error") return ImVec4(0.95f, 0.45f, 0.45f, 1.0f);
|
||||||
|
if (s == "cancelled") return ImVec4(0.65f, 0.65f, 0.65f, 1.0f);
|
||||||
|
if (s == "queued") return ImVec4(0.85f, 0.78f, 0.45f, 1.0f);
|
||||||
|
return ImVec4(0.7f, 0.7f, 0.7f, 1.0f);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string format_duration(long long started, long long finished) {
|
||||||
|
if (started <= 0) return "—";
|
||||||
|
long long end = finished > 0 ? finished
|
||||||
|
: std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||||
|
std::chrono::system_clock::now().time_since_epoch())
|
||||||
|
.count();
|
||||||
|
long long ms = end - started;
|
||||||
|
if (ms < 0) ms = 0;
|
||||||
|
char b[32];
|
||||||
|
if (ms < 1000) std::snprintf(b, sizeof(b), "%lld ms", ms);
|
||||||
|
else if (ms < 60'000) std::snprintf(b, sizeof(b), "%.1f s", ms / 1000.0);
|
||||||
|
else std::snprintf(b, sizeof(b), "%.1f m", ms / 60'000.0);
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool filter_match(const std::string& status, int idx) {
|
||||||
|
switch (idx) {
|
||||||
|
case 0: return true; // all
|
||||||
|
case 1: return status == "queued" || status == "running"; // active
|
||||||
|
case 2: return status == "done";
|
||||||
|
case 3: return status == "error" || status == "cancelled";
|
||||||
|
default: return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
void views_jobs(AppState& app) {
|
||||||
|
if (!app.panel_jobs) return;
|
||||||
|
|
||||||
|
if (!ImGui::Begin("Jobs", &app.panel_jobs)) {
|
||||||
|
ImGui::End();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Refresh cache cada ~10 frames (~6 Hz a 60fps).
|
||||||
|
int frame = ImGui::GetFrameCount();
|
||||||
|
if (frame - g_jobs_cache.last_frame_refresh > 10) {
|
||||||
|
jobs_list(&g_jobs_cache.rows, 200);
|
||||||
|
g_jobs_cache.last_frame_refresh = frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Header: counters + filtro.
|
||||||
|
JobCounters c = jobs_counters();
|
||||||
|
ImGui::TextColored(status_color("running"), "%s", TI_PLAYER_PLAY);
|
||||||
|
ImGui::SameLine(); ImGui::Text("%d", c.running);
|
||||||
|
ImGui::SameLine(0, 16);
|
||||||
|
ImGui::TextColored(status_color("queued"), "%s", TI_HOURGLASS);
|
||||||
|
ImGui::SameLine(); ImGui::Text("%d", c.queued);
|
||||||
|
ImGui::SameLine(0, 16);
|
||||||
|
ImGui::TextColored(status_color("done"), "%s", TI_CHECK);
|
||||||
|
ImGui::SameLine(); ImGui::Text("%d", c.done);
|
||||||
|
ImGui::SameLine(0, 16);
|
||||||
|
ImGui::TextColored(status_color("error"), "%s", TI_ALERT_CIRCLE);
|
||||||
|
ImGui::SameLine(); ImGui::Text("%d", c.error + c.cancelled);
|
||||||
|
|
||||||
|
ImGui::SameLine(0, 24);
|
||||||
|
const char* filter_labels[] = { "All", "Active", "Done", "Errors" };
|
||||||
|
ImGui::SetNextItemWidth(100);
|
||||||
|
ImGui::Combo("##jobs_filter", &g_jobs_cache.filter_idx,
|
||||||
|
filter_labels, IM_ARRAYSIZE(filter_labels));
|
||||||
|
|
||||||
|
ImGui::Separator();
|
||||||
|
|
||||||
|
// Tabla.
|
||||||
|
ImGuiTableFlags tflags = ImGuiTableFlags_Borders |
|
||||||
|
ImGuiTableFlags_RowBg |
|
||||||
|
ImGuiTableFlags_SizingStretchProp |
|
||||||
|
ImGuiTableFlags_ScrollY;
|
||||||
|
if (ImGui::BeginTable("jobs_table", 6, tflags,
|
||||||
|
ImVec2(0, ImGui::GetContentRegionAvail().y - 4))) {
|
||||||
|
ImGui::TableSetupScrollFreeze(0, 1);
|
||||||
|
ImGui::TableSetupColumn("Status", ImGuiTableColumnFlags_WidthFixed, 90);
|
||||||
|
ImGui::TableSetupColumn("Enricher", ImGuiTableColumnFlags_WidthStretch, 1.5f);
|
||||||
|
ImGui::TableSetupColumn("Target", ImGuiTableColumnFlags_WidthStretch, 2.0f);
|
||||||
|
ImGui::TableSetupColumn("Progress", ImGuiTableColumnFlags_WidthStretch, 2.0f);
|
||||||
|
ImGui::TableSetupColumn("Time", ImGuiTableColumnFlags_WidthFixed, 70);
|
||||||
|
ImGui::TableSetupColumn("##actions",ImGuiTableColumnFlags_WidthFixed, 80);
|
||||||
|
ImGui::TableHeadersRow();
|
||||||
|
|
||||||
|
for (const auto& r : g_jobs_cache.rows) {
|
||||||
|
if (!filter_match(r.status, g_jobs_cache.filter_idx)) continue;
|
||||||
|
ImGui::PushID(r.id.c_str());
|
||||||
|
ImGui::TableNextRow();
|
||||||
|
|
||||||
|
// Status.
|
||||||
|
ImGui::TableSetColumnIndex(0);
|
||||||
|
ImGui::TextColored(status_color(r.status), "%s %s",
|
||||||
|
status_icon(r.status), r.status.c_str());
|
||||||
|
|
||||||
|
// Enricher.
|
||||||
|
ImGui::TableSetColumnIndex(1);
|
||||||
|
ImGui::TextUnformatted(r.enricher_id.c_str());
|
||||||
|
|
||||||
|
// Target.
|
||||||
|
ImGui::TableSetColumnIndex(2);
|
||||||
|
if (!r.node_name.empty()) {
|
||||||
|
ImGui::TextUnformatted(r.node_name.c_str());
|
||||||
|
} else if (!r.node_id.empty()) {
|
||||||
|
ImGui::TextDisabled("%s", r.node_id.c_str());
|
||||||
|
} else {
|
||||||
|
ImGui::TextDisabled("(global)");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Progress.
|
||||||
|
ImGui::TableSetColumnIndex(3);
|
||||||
|
if (r.status == "running" || r.status == "queued") {
|
||||||
|
ImGui::ProgressBar((float)r.progress, ImVec2(-FLT_MIN, 0),
|
||||||
|
r.stage.empty() ? nullptr : r.stage.c_str());
|
||||||
|
} else if (r.status == "error" && !r.error.empty()) {
|
||||||
|
ImGui::TextColored(status_color("error"), "%s",
|
||||||
|
r.error.size() > 64
|
||||||
|
? (r.error.substr(0, 64) + "…").c_str()
|
||||||
|
: r.error.c_str());
|
||||||
|
if (ImGui::IsItemHovered()) {
|
||||||
|
ImGui::SetTooltip("%s", r.error.c_str());
|
||||||
|
}
|
||||||
|
} else if (r.status == "done" && !r.result_json.empty()) {
|
||||||
|
ImGui::TextDisabled("%s",
|
||||||
|
r.result_json.size() > 80
|
||||||
|
? (r.result_json.substr(0, 80) + "…").c_str()
|
||||||
|
: r.result_json.c_str());
|
||||||
|
if (ImGui::IsItemHovered() && r.result_json.size() > 80) {
|
||||||
|
ImGui::SetTooltip("%s", r.result_json.c_str());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ImGui::TextDisabled("—");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Time.
|
||||||
|
ImGui::TableSetColumnIndex(4);
|
||||||
|
ImGui::TextDisabled("%s",
|
||||||
|
format_duration(r.started_at, r.finished_at).c_str());
|
||||||
|
|
||||||
|
// Actions.
|
||||||
|
ImGui::TableSetColumnIndex(5);
|
||||||
|
if (r.status == "queued" || r.status == "running") {
|
||||||
|
if (ImGui::SmallButton("Cancel")) {
|
||||||
|
jobs_cancel(r.id.c_str());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (ImGui::SmallButton("Delete")) {
|
||||||
|
jobs_delete(r.id.c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ImGui::PopID();
|
||||||
|
}
|
||||||
|
ImGui::EndTable();
|
||||||
|
}
|
||||||
|
|
||||||
|
ImGui::End();
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ge
|
||||||
Reference in New Issue
Block a user