#pragma once #include #include #include #include // Sistema de jobs asincronos para enrichers (issue 0026). // // Diseno: // - Tabla `jobs` en graph_explorer.db (NO en operations.db). // - Pool de N std::thread workers (default 2). // - Cada job spawnea un subprocess Python `enrichers//run.py` que recibe // contexto por stdin (JSON), emite progreso por stderr y resultado final // por stdout (JSON). // - El worker aplica entities/relations/node_updates al operations.db indicado. // - dirty_counter se incrementa al completar un job — el render() lee el // contador y triggerea un reload del grafo cuando cambia. // // El estado vivo (running/queued en RAM) se persiste tambien en SQLite para // que cancelaciones, reinicios y queries de UI vean lo mismo. namespace ge { struct JobRow { std::string id; std::string enricher_id; std::string node_id; std::string node_name; std::string status; // queued|running|done|error|cancelled double progress = 0.0; std::string stage; std::string error; std::string result_json; long long created_at = 0; long long started_at = 0; long long finished_at = 0; }; // Inicializa el sistema. Crea la tabla `jobs` si no existe, marca como `error` // los jobs que quedaron `running` de una sesion anterior, lanza los workers. // `app_db_path` es /graph_explorer.db (jobs). // `ops_db_path` es la operations.db actualmente cargada (target de mutaciones). // `enrichers_dir` es /enrichers/. // `app_dir` se usa como root para resolver `cache/` (issue 0027). // `registry_root` se pasa al subprocess como FN_REGISTRY_ROOT para que el // enricher pueda importar funciones del registry. // Retorna false si alguno de los paths falla. bool jobs_init(const char* app_db_path, const char* ops_db_path, const char* enrichers_dir, const char* app_dir, const char* registry_root, int n_workers); // Cambia la operations.db objetivo (cuando el usuario abre otra). Drena cola // pendiente — los jobs ya queued NO se reapuntan. void jobs_set_ops_db(const char* ops_db_path); // Encola un job. params_json puede ser "{}" o JSON valido. node_id puede ser // vacio (job global). Devuelve false si la BD falla. out_id (>= 64) recibe el // ULID generado. bool jobs_submit(const char* enricher_id, const char* node_id, const char* node_name, const char* params_json, char* out_id, size_t out_id_n); // Senala SIGTERM al subprocess y marca el job como `cancelled`. Si esta // `queued` lo marca directamente. bool jobs_cancel(const char* job_id); // Borra un job en estado terminal (done/error/cancelled). bool jobs_delete(const char* job_id); // Snapshot ordenado por created_at DESC. bool jobs_list(std::vector* out, int limit = 200); // Counters para el badge en la toolbar. struct JobCounters { int queued = 0; int running = 0; int done = 0; int error = 0; int cancelled = 0; }; JobCounters jobs_counters(); // Counter monotono que se incrementa cada vez que un job completa con // cambios en operations.db. El main thread compara contra su ultimo valor // conocido y, si cambio, llama reload_graph(). int jobs_dirty_counter(); // Cierra el pool y espera a los workers (cancelando jobs en vuelo). void jobs_shutdown(); } // namespace ge