Files
graph_explorer/issues/completed/0026-jobs-system.md
T
egutierrez ee0d26ce2d feat(enrichers): vendoring de funciones Python por enricher (issue 0033b)
Cada enricher con `lang: python` y `uses_functions` no vacio ahora
puede empaquetar las funciones del registry que necesita en
`<enricher>/_vendored/`. El run.py importa de ahi en lugar de
`<registry_root>/python/functions/`, lo que hace al binario
distribuible sin dependencia de un fn_registry montado.

Cambios:

1. tools/vendor_enricher_python.sh
   - Lee `uses_functions` del manifest (filtrando IDs `*_py_*`).
   - Resuelve `file_path` desde registry.db.
   - Copia recursivamente con expansion transitiva: si un fichero
     vendorizado importa siblings del mismo dominio, los siblings
     tambien se copian (resuelve el caso `extract_iocs.py` que
     importa 7 modulos hermanos).
   - Genera `.vendor.lock` con `<id>  <sha256>  <src_path>` por
     funcion declarada para auditoria.
   - Idempotente — si todos los hashes coinciden, no rehace nada.

2. Manifests actualizados con `uses_functions`:
   - fetch_webpage:        normalize_url + html_to_markdown
   - extract_links:        extract_urls
   - extract_text_entities: extract_iocs

3. run.py de los 3 enrichers afectados: importan de `_vendored/`
   si existe, fallback a `<registry_root>/python/functions/` en
   modo dev (mantiene los tests pytest funcionando).

4. app.md: anade `cryptography` a python_runtime_deps porque el
   blob `cybersecurity.cybersecurity` lo importa al top.

5. Tests:
   - test_vendor_script.py — 6 tests del script: layout correcto,
     transitive siblings, lock con SHA256, idempotencia, modulos
     importables en aislamiento.
   - 16 tests de enrichers existentes pasan via vendoring (no usan
     registry_root porque _vendored/ tiene prioridad).

6. Issue 0033b movido a issues/completed/.

Tests: 32/32 verde (16 enrichers + 6 dispatcher + 4 runtime + 6
vendor).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 00:20:41 +02:00

210 lines
7.4 KiB
Markdown

---
id: 0026
title: Sistema de jobs — enrichers asincronos en background
status: completed
priority: high
created: 2026-05-01
blocks: [0027, 0028, 0029, 0030]
supersedes: [0001, 0002, 0003]
---
## Objetivo
Convertir el menu "Run enricher" (hoy placeholder en main.cpp:485) en un
sistema real de jobs asincronos: el usuario lanza un enricher sobre un
nodo, vuelve a la app y sigue trabajando mientras el enricher procesa
en background. Al terminar, el grafo se recarga automaticamente con las
entidades/relaciones nuevas.
Este issue solo cubre la **infra**. Los enrichers concretos se escriben
en 0028, 0029, 0030.
## Decisiones tomadas
1. **Workers concurrentes**: 2 por defecto, configurable via Settings.
2. **Cache de documentos**: `<app_dir>/cache/<sha256[0:2]>/<sha256>.{html,md,png}`. Carpeta gitignored en el sub-repo.
3. **Webpage vs Url**: tipos separados (issue 0027). Url = link suelto, Webpage = documento descargado con cuerpo.
4. **Subprocess Python por job** (no daemon residente): cold start ~200 ms aceptable. Si molesta, issue futura.
5. **Estado en `graph_explorer.db`** (NO en `operations.db`): jobs son especificos de la app, no del grafo.
## Tabla `jobs` (graph_explorer.db)
```sql
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY, -- ULID
enricher_id TEXT NOT NULL, -- ej: "fetch_webpage"
node_id TEXT, -- nodo objetivo (NULL si batch)
node_name TEXT DEFAULT '', -- snapshot para mostrar en UI
params_json TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL, -- queued|running|done|error|cancelled
progress REAL NOT NULL DEFAULT 0, -- 0..1
stage TEXT NOT NULL DEFAULT '', -- mensaje corto: "fetching", "extracting"
result_json TEXT, -- {entities_added: N, relations_added: M, ...}
error TEXT,
pid INTEGER, -- subprocess pid para cancelar
created_at INTEGER NOT NULL,
started_at INTEGER,
finished_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, created_at);
```
## Runtime C++
### `jobs.{h,cpp}` (nuevo)
- `JobRunner`: pool de N std::thread workers (default 2).
- Cola en memoria `std::queue<JobId>` + persistencia en BD.
- API publico:
- `bool jobs_init(const char* db_path, int n_workers);`
- `bool jobs_submit(const char* enricher_id, const char* node_id, const char* params_json, char* out_id);`
- `bool jobs_cancel(const char* job_id);`
- `void jobs_shutdown();`
- `int jobs_dirty_counter();` // incrementa al completar un job; render lo lee
- `bool jobs_list(std::vector<JobRow>* out);`
- Worker hace:
1. Pop de cola → mark running, started_at = now, pid = subprocess pid.
2. Spawn `python/.venv/bin/python3 enrichers/<id>/run.py` con stdin = JSON.
3. Lee stderr line-by-line buscando `PROGRESS:<float> <stage>` para actualizar fila.
4. Lee stdout completo al cerrar — JSON final con entities/relations/node_updates.
5. Aplica al `operations.db` desde el worker (entity_insert/relation_insert/entity_update).
6. Marca done o error con result_json/error, finished_at, increment dirty_counter.
- Al arrancar: marca jobs `running` huerfanos como `error: "process died"`.
### `enrichers.{h,cpp}` (nuevo)
- Escanea `enrichers/*/manifest.yaml` al arrancar.
- Estructura `EnricherSpec`:
```
std::string id, name, description;
std::vector<std::string> applies_to; // tipos validos
std::vector<EnricherParam> params;
std::string run_path; // enrichers/<id>/run.py absoluto
```
- API:
- `void enrichers_load(const char* enrichers_dir);`
- `std::vector<EnricherSpec> enrichers_for_type(const char* type_ref);`
## Contrato enricher (wire protocol)
Cada enricher vive en `apps/graph_explorer/enrichers/<id>/`:
```
enrichers/
fetch_webpage/
manifest.yaml
run.py
```
### `manifest.yaml`
```yaml
id: fetch_webpage
name: "Fetch web page"
description: "Descarga HTML, extrae markdown y guarda en cache."
applies_to: [Webpage, Url]
params:
- { name: timeout_s, type: int, default: 15 }
- { name: use_browser, type: bool, default: false }
```
### `run.py` — stdin/stdout/stderr
**stdin** (una linea JSON):
```json
{"node_id":"webpage_123","node_type":"Webpage","node_name":"...",
"metadata":{"url":"https://..."},"params":{"timeout_s":15},
"db_path":"/path/operations.db","cache_dir":"/path/cache",
"registry_root":"/home/lucas/fn_registry"}
```
**stderr** (lineas de progreso, opcional):
```
PROGRESS:0.10 connecting
PROGRESS:0.50 parsing
PROGRESS:0.90 writing
```
**stdout** (una linea JSON al final):
```json
{"node_updates":[
{"id":"webpage_123","metadata_patch":{"title":"...","status_code":200}}
],
"entities":[
{"type":"Domain","name":"example.com","metadata":{}}
],
"relations":[
{"from_id":"webpage_123","to_name":"example.com","to_type":"Domain","kind":"BELONGS_TO"}
],
"notes":""}
```
Resolucion de relaciones:
- Si `to_id` esta presente, se usa directamente.
- Si no, se busca por `(to_name, to_type)` en operations.db; si no existe, se crea primero.
## UI
### Panel "Jobs"
- Nuevo panel dockeable (entry en `g_panels[]`).
- Tabla con columnas: enricher, target node (clicable → centra viewport), status (badge), progress bar, stage, duracion, error tooltip.
- Botones inline por fila: cancelar (running), reintentar (error/cancelled), borrar (terminal).
- Filtro: all | active | done | error.
### Toolbar
- Badge en la toolbar superior con contador de `running + queued`. Click abre el panel Jobs.
### Context menu (main.cpp:485)
Reemplazar el `TextDisabled("coming soon")` por:
```cpp
auto specs = ge::enrichers_for_type(node->type_ref);
if (specs.empty()) {
ImGui::TextDisabled("(no hay enrichers para este tipo)");
} else {
for (const auto& s : specs) {
if (ImGui::MenuItem(s.name.c_str())) {
char job_id[64];
ge::jobs_submit(s.id.c_str(), node->id.c_str(), "{}", job_id);
}
}
}
```
## Fases del bucle reactivo en BD
- **CONSTRUIR**: enricher escrito en `enrichers/<id>/`.
- **EJECUTAR**: subprocess corre, escribe progress en `jobs`.
- **RECOPILAR**: stdout JSON parseado, entities/relations aplicadas a operations.db.
- **ANALIZAR**: jobs.result_json contiene metricas (entities_added, duration_ms).
- **MEJORAR**: si un enricher falla repetidamente, futura issue de health checks.
## Cancelacion
- Boton "Cancel" en panel Jobs:
- Lee `pid` de la fila.
- `kill(pid, SIGTERM)` (Linux/WSL) o `TerminateProcess` (Windows).
- El worker captura el exit code y marca `cancelled`.
- Si el job aun no salio de la cola (status = queued), el worker simplemente no lo coge — al pop chequea status y skipea cancelled.
## Definicion de hecho
- Tabla `jobs` se crea al arrancar la app.
- `JobRunner` con 2 workers acepta `jobs_submit` y procesa.
- Panel Jobs muestra estado en tiempo real (progress bar avanza visiblemente).
- Cancelar mata subprocess y marca `cancelled`.
- Al completar, el grafo se recarga (dirty_counter detectado en render).
- Subir el contador de jobs en la toolbar.
- Test manual: enricher dummy `noop` (incluido en este issue) que duerme 3 s emitiendo PROGRESS y termina sin entidades. Lanzarlo y comprobar UI.
## Trabajo posterior
- 0027: tipo Webpage + cache.
- 0028: primer enricher real (`fetch_webpage`) end-to-end.
- 0028b: enrichers extract_domain, extract_links, extract_text_entities.
- 0029: enrichers via CDP (browser headless).
- 0030: macro "Deep enrich" + expand_domain.