Files
egutierrez 6df04652d8 feat(jobs): sistema de jobs asincronos + panel UI (issue 0026)
Infra para correr enrichers en background mientras la app sigue interactiva.

C++:
- jobs.{h,cpp}: tabla jobs en graph_explorer.db, JobRunner con N=2 std::thread
  workers, fork+exec POSIX con pipes, parser de PROGRESS:<float> <stage> en
  stderr, captura de stdout JSON, persistencia + dirty_counter.
- enrichers.{h,cpp}: scanner de enrichers/<id>/manifest.yaml, parser YAML
  minimo (id/name/description/applies_to), filtro por tipo de nodo.
- views_jobs.cpp: panel "Jobs" dockeable con tabla (status/enricher/target/
  progress/time), filtro all/active/done/errors, cancelar/borrar inline.

Wiring:
- main.cpp: resolve_registry_root() (FN_REGISTRY_ROOT env o subir desde cwd
  buscando registry.db), jobs_init/enrichers_load antes de fn::run_app,
  jobs_shutdown al cerrar, dirty_counter -> want_reload, jobs_set_ops_db al
  cambiar de proyecto.
- main.cpp:render_context_menu: menu "Run enricher" sustituye placeholder
  con submenu filtrado por type_ref via enrichers_for_type. Submit abre
  panel Jobs auto.
- views.h: AppState::panel_jobs flag + decl views_jobs().
- CMakeLists.txt: anade jobs.cpp + enrichers.cpp + views_jobs.cpp y enlaza
  Threads::Threads.

Wire protocol enricher (subprocess Python):
- stdin:  JSON con node_id, metadata, ops_db_path, app_dir, cache_dir,
          registry_root, params.
- stderr: PROGRESS:<float> <stage> + LOG lineas libres.
- stdout: JSON resumen al final.
- exit 0 = ok, !=0 = error con stderr capturado en panel Jobs.

El run.py escribe directamente al operations.db (sqlite3 stdlib) — C++ solo
orquesta, no parsea entities/relations.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 18:24:37 +02:00

98 lines
3.4 KiB
C++

#pragma once
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>
// Sistema de jobs asincronos para enrichers (issue 0026).
//
// Diseno:
// - Tabla `jobs` en graph_explorer.db (NO en operations.db).
// - Pool de N std::thread workers (default 2).
// - Cada job spawnea un subprocess Python `enrichers/<id>/run.py` que recibe
// contexto por stdin (JSON), emite progreso por stderr y resultado final
// por stdout (JSON).
// - El worker aplica entities/relations/node_updates al operations.db indicado.
// - dirty_counter se incrementa al completar un job — el render() lee el
// contador y triggerea un reload del grafo cuando cambia.
//
// El estado vivo (running/queued en RAM) se persiste tambien en SQLite para
// que cancelaciones, reinicios y queries de UI vean lo mismo.
namespace ge {
struct JobRow {
std::string id;
std::string enricher_id;
std::string node_id;
std::string node_name;
std::string status; // queued|running|done|error|cancelled
double progress = 0.0;
std::string stage;
std::string error;
std::string result_json;
long long created_at = 0;
long long started_at = 0;
long long finished_at = 0;
};
// Inicializa el sistema. Crea la tabla `jobs` si no existe, marca como `error`
// los jobs que quedaron `running` de una sesion anterior, lanza los workers.
// `app_db_path` es <app_dir>/graph_explorer.db (jobs).
// `ops_db_path` es la operations.db actualmente cargada (target de mutaciones).
// `enrichers_dir` es <app_dir>/enrichers/.
// `app_dir` se usa como root para resolver `cache/` (issue 0027).
// `registry_root` se pasa al subprocess como FN_REGISTRY_ROOT para que el
// enricher pueda importar funciones del registry.
// Retorna false si alguno de los paths falla.
bool jobs_init(const char* app_db_path,
const char* ops_db_path,
const char* enrichers_dir,
const char* app_dir,
const char* registry_root,
int n_workers);
// Cambia la operations.db objetivo (cuando el usuario abre otra). Drena cola
// pendiente — los jobs ya queued NO se reapuntan.
void jobs_set_ops_db(const char* ops_db_path);
// Encola un job. params_json puede ser "{}" o JSON valido. node_id puede ser
// vacio (job global). Devuelve false si la BD falla. out_id (>= 64) recibe el
// ULID generado.
bool jobs_submit(const char* enricher_id,
const char* node_id,
const char* node_name,
const char* params_json,
char* out_id, size_t out_id_n);
// Senala SIGTERM al subprocess y marca el job como `cancelled`. Si esta
// `queued` lo marca directamente.
bool jobs_cancel(const char* job_id);
// Borra un job en estado terminal (done/error/cancelled).
bool jobs_delete(const char* job_id);
// Snapshot ordenado por created_at DESC.
bool jobs_list(std::vector<JobRow>* out, int limit = 200);
// Counters para el badge en la toolbar.
struct JobCounters {
int queued = 0;
int running = 0;
int done = 0;
int error = 0;
int cancelled = 0;
};
JobCounters jobs_counters();
// Counter monotono que se incrementa cada vez que un job completa con
// cambios en operations.db. El main thread compara contra su ultimo valor
// conocido y, si cambio, llama reload_graph().
int jobs_dirty_counter();
// Cierra el pool y espera a los workers (cancelando jobs en vuelo).
void jobs_shutdown();
} // namespace ge