From 9042110ea272ac0672a89de2d4778e79b7bc2530 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 1 May 2026 18:24:13 +0200 Subject: [PATCH 1/4] docs(issues): plan enrichers asincronos + recoleccion web (0026-0030) Cinco issues que componen el plan: - 0026: sistema de jobs (infra, contrato wire) - 0027: tipo Webpage + cache de documentos - 0028: enricher fetch_webpage (MVP end-to-end) - 0028b: enrichers extract_domain / extract_links / extract_text_entities - 0029: variantes CDP (Chrome headless, screenshot) - 0030: macro "Deep enrich" + expand_domain Tambien anade los issues previos 0012-0025 que estaban untracked. Co-Authored-By: Claude Opus 4.7 (1M context) --- issues/0012-http-ingest-endpoint.md | 54 ++++++ issues/0013-paste-extract-panel.md | 45 +++++ issues/0014-browser-extension.md | 49 +++++ issues/0015-drag-drop-ingest.md | 47 +++++ issues/0016-clipboard-watcher.md | 44 +++++ issues/0017-gx-cli.md | 38 ++++ issues/0018-headless-browser-transforms.md | 54 ++++++ issues/0019-ocr-ingest.md | 39 ++++ issues/0020-email-bot-ingest.md | 46 +++++ issues/0021-command-palette.md | 42 +++++ issues/0022-nl-graph-query.md | 41 ++++ issues/0023-saved-views.md | 39 ++++ issues/0024-subgraph-export.md | 45 +++++ issues/0025-vault-sync.md | 44 +++++ issues/0026-jobs-system.md | 209 +++++++++++++++++++++ issues/0027-webpage-type-cache.md | 82 ++++++++ issues/0028-enricher-fetch-webpage.md | 78 ++++++++ issues/0028b-enrichers-extract-trio.md | 72 +++++++ issues/0029-enrichers-cdp.md | 63 +++++++ issues/0030-deep-enrich-macro.md | 62 ++++++ 20 files changed, 1193 insertions(+) create mode 100644 issues/0012-http-ingest-endpoint.md create mode 100644 issues/0013-paste-extract-panel.md create mode 100644 issues/0014-browser-extension.md create mode 100644 issues/0015-drag-drop-ingest.md create mode 100644 issues/0016-clipboard-watcher.md create mode 100644 issues/0017-gx-cli.md create mode 100644 issues/0018-headless-browser-transforms.md create mode 100644 issues/0019-ocr-ingest.md create mode 100644 issues/0020-email-bot-ingest.md create mode 100644 issues/0021-command-palette.md create mode 100644 issues/0022-nl-graph-query.md create mode 100644 issues/0023-saved-views.md create mode 100644 issues/0024-subgraph-export.md create mode 100644 issues/0025-vault-sync.md create mode 100644 issues/0026-jobs-system.md create mode 100644 issues/0027-webpage-type-cache.md create mode 100644 issues/0028-enricher-fetch-webpage.md create mode 100644 issues/0028b-enrichers-extract-trio.md create mode 100644 issues/0029-enrichers-cdp.md create mode 100644 issues/0030-deep-enrich-macro.md diff --git a/issues/0012-http-ingest-endpoint.md b/issues/0012-http-ingest-endpoint.md new file mode 100644 index 0000000..4ca0ddf --- /dev/null +++ b/issues/0012-http-ingest-endpoint.md @@ -0,0 +1,54 @@ +--- +id: 0012 +title: Endpoint HTTP local de ingesta y consulta +status: pending +priority: medium +created: 2026-05-01 +--- + +## Objetivo + +Exponer un servidor HTTP local en `graph_explorer` (o como `ingest_server` +hermano) que sea el punto de entrada unico para todo flujo externo: +extension de navegador, CLI `gx`, watcher de portapapeles, bots, OCR, etc. + +Sin este endpoint cada cliente externo tendria que abrir `operations.db` +directamente — colisiona con el lock del proceso vivo y duplica logica de +extraccion. + +## Endpoints minimos + +- `POST /entity` — crea entidad. Body: `{type, name, metadata}`. +- `POST /relation` — crea relacion. Body: `{from_id, to_id, kind, metadata}`. +- `POST /ingest/text` — texto libre -> `extract_graph_hybrid` -> preview o auto-commit. +- `POST /ingest/url` — URL -> fetch + extract -> preview o auto-commit. +- `POST /ingest/file` — multipart upload (PDF/CSV/JSON/.eml/imagen) -> router por mime -> extract. +- `GET /search?q=` — fuzzy / FTS sobre entidades. +- `GET /entity/:id`, `GET /entity/:id/neighbors`. + +## Decisiones + +- Bind a `127.0.0.1` por defecto, puerto fijo (ej. 7878) o aleatorio + escrito a `~/.fn_graph_port`. +- Auth: token compartido en `~/.fn_graph_token` (header `X-Token`). + Generado al primer arranque. +- Modo "preview": las rutas de ingesta aceptan `?commit=false` y + devuelven entities/relations propuestas para que el cliente las muestre + antes de persistir. Cuando es `true`, escribe directo. +- Implementacion: httplib o Mongoose embebido en C++ (sin nuevas deps + pesadas). Alternativa: lanzar el servidor en Go/Python aparte si la + integracion C++ se complica. + +## Bloquea + +Issues 0014, 0017, 0018, 0019, 0020 dependen de este. + +## Definicion de hecho + +- `curl -H "X-Token: ..." -d '{"type":"person","name":"X"}' localhost:7878/entity` + crea entidad. +- `POST /ingest/text` con texto en castellano devuelve entities/relations + detectadas por el pipeline hibrido. +- El endpoint corre en background mientras la UI sigue interactiva. +- Si `graph_explorer` no esta abierto, un binario `ingest_server` + standalone ofrece el mismo API contra la misma `operations.db`. diff --git a/issues/0013-paste-extract-panel.md b/issues/0013-paste-extract-panel.md new file mode 100644 index 0000000..44bf622 --- /dev/null +++ b/issues/0013-paste-extract-panel.md @@ -0,0 +1,45 @@ +--- +id: 0013 +title: Panel "Paste & Extract" — texto libre a entidades con extract_graph_hybrid +status: pending +priority: high +created: 2026-05-01 +--- + +## Objetivo + +Panel dockeable dentro de `graph_explorer` con un textarea grande. Pegas +texto (articulo, mensaje, transcripcion, documento), pulsas Extract, corre +el pipeline `extract_graph_hybrid` (regex + GLiNER + GLiREL + LLM fallback) +y muestra preview de entidades y relaciones detectadas. El usuario marca +cuales aceptar antes de commit a `operations.db`. + +Es el quick-win de mas alto valor: aprovecha el pipeline ya mergeado +(commit 1a353878) y elimina la friccion de tipear datos a mano. + +## Alcance + +- Panel "Extract" con textarea, combo de proyecto/tipos esperados, boton + "Extract". +- Lanza el pipeline en hilo aparte (es Python — invocar via subprocess + o el endpoint HTTP de 0012 con `commit=false`). +- Tabla de entidades propuestas: checkbox, type, name, source span. Tabla + de relaciones propuestas: from, kind, to, checkbox. +- Edicion inline de tipo/nombre antes de commit. +- "Apply selected" -> escribe a operations.db, refresca grafo, posiciona + los nuevos nodos cerca del centro o vinculados al ultimo seleccionado. +- Dedupe: si una entidad propuesta ya existe (mismo type+name) reusar el + id en lugar de duplicar. + +## Decisiones + +- Invocacion del pipeline: via 0012 si esta disponible, o subprocess + directo como fallback (para que el panel funcione sin levantar HTTP). +- Resaltado de spans en el textarea (v2 — primera version solo lista). + +## Definicion de hecho + +- Pego un parrafo en castellano sobre una empresa y un directivo, pulso + Extract, veo entidades correctas tipadas y la relacion entre ambas. +- Apply crea los nodos en el grafo en menos de 1 s tras click. +- Re-extraer el mismo texto no duplica entidades (dedupe funciona). diff --git a/issues/0014-browser-extension.md b/issues/0014-browser-extension.md new file mode 100644 index 0000000..d085473 --- /dev/null +++ b/issues/0014-browser-extension.md @@ -0,0 +1,49 @@ +--- +id: 0014 +title: Extension de navegador "Add to graph" +status: pending +priority: high +created: 2026-05-01 +depends_on: [0012] +--- + +## Objetivo + +Extension Firefox/Chrome que añade items al grafo desde el navegador con +un click. Cubre el flujo Maltego "estoy leyendo algo en web -> nodo en +mi grafo" sin abandonar el navegador. + +## Casos + +- Click derecho sobre seleccion de texto -> "Add to graph" (manda texto + via `/ingest/text`). +- Click derecho sobre link -> "Add link" (crea entidad URL + metadata + del href, opcionalmente trigger fetch). +- Boton de toolbar -> "Add this page" (URL + titulo + meta description + + texto principal extraido con Readability). +- Modo "select & relate": dos selecciones consecutivas -> crea relacion + entre las entidades resultantes. + +## Alcance + +- WebExtension API (compatible Firefox/Chrome, Manifest v3). +- Settings: URL del endpoint (default `http://localhost:7878`), token, + proyecto destino. +- Preview popup tras extraccion: muestra entities propuestas, el usuario + acepta o edita antes de commit (reusa `?commit=false` de 0012). +- Atajo configurable (ej. `Ctrl+Shift+G`) para "add page". + +## Decisiones + +- Sin auth OAuth — token local compartido es suficiente para localhost. +- Empaquetar en `apps/graph_explorer/extension/` o como sub-repo propio + bajo `dataforge/graph_explorer_extension`. +- Si `graph_explorer` no esta corriendo: la extension muestra error + claro y guarda la accion en cola para reintentar. + +## Definicion de hecho + +- Selecciono un parrafo en una pagina, click derecho -> Add, en menos de + 2 s veo los nodos en `graph_explorer`. +- Funciona en Firefox y Chrome con la misma build. +- Reintento automatico de la cola cuando vuelve a haber endpoint vivo. diff --git a/issues/0015-drag-drop-ingest.md b/issues/0015-drag-drop-ingest.md new file mode 100644 index 0000000..ca21be3 --- /dev/null +++ b/issues/0015-drag-drop-ingest.md @@ -0,0 +1,47 @@ +--- +id: 0015 +title: Drag & drop de archivos sobre el viewport para ingesta +status: pending +priority: medium +created: 2026-05-01 +--- + +## Objetivo + +Soltar archivos sobre la ventana de `graph_explorer` lanza el extractor +adecuado segun extension y mete las entidades en el grafo, sin abrir +modales ni navegar menus. + +## Tipos soportados + +- `.pdf` -> texto + `extract_graph_hybrid`. +- `.eml` / `.msg` -> headers (from/to/cc) como entidades persona/email + + cuerpo via extract. +- `.csv` / `.parquet` -> ingesta como tabla DuckDB (encadena con 0011). +- `.json` / `.jsonl` -> si tiene shape entity/relation, importar; si no, + extract sobre stringify. +- `.png` / `.jpg` -> OCR (issue 0019) y luego extract. +- `.txt` / `.md` -> extract directo. + +## Alcance + +- Hook de drop de ImGui -> dispatcher por mime/extension -> pipeline + correspondiente -> preview con seleccion antes de commit (igual UX que + 0013). +- Indicador visual de zona drop activa cuando hay drag sobre la ventana. +- Multiples archivos en un drop: procesar en cola, mostrar progreso. + +## Decisiones + +- Dispatcher reutiliza `/ingest/file` del endpoint 0012 si esta vivo, o + resuelve localmente como fallback. +- Limite de tamaño por archivo configurable (default 50 MB) para evitar + bloqueos en PDFs gigantes. + +## Definicion de hecho + +- Suelto un PDF en castellano sobre el canvas, en menos de 30 s veo + preview con entidades correctas. +- Suelto un .eml y aparecen `from`/`to` como nodos persona conectados + por una relacion `mailed`. +- Cancelar durante el preview no toca operations.db. diff --git a/issues/0016-clipboard-watcher.md b/issues/0016-clipboard-watcher.md new file mode 100644 index 0000000..3db6c5b --- /dev/null +++ b/issues/0016-clipboard-watcher.md @@ -0,0 +1,44 @@ +--- +id: 0016 +title: Watcher de portapapeles con deteccion de patrones +status: pending +priority: low +created: 2026-05-01 +--- + +## Objetivo + +Servicio (toggle desde la toolbar) que escucha el portapapeles y, cuando +detecta patrones de interes, ofrece añadir como entidad sin abandonar el +flujo en otra app. Pensado para sesiones de OSINT manual donde el coste +de "abrir la app y tipear" rompe el ritmo. + +## Patrones detectados + +- URL -> entidad URL (con fetch + extract opcional). +- Email, telefono, IBAN, DNI/NIE/CIF, BIC -> entidad tipada con regex. +- Coordenadas (lat,lon), hash (sha1/sha256), wallet crypto (BTC/ETH). + +## Alcance + +- Polling del clipboard (ImGui `GetClipboardText` + diff) o API nativa + (X11 selection / Win32 clipboard listener). +- Toast / notificacion no intrusiva con boton "Add". El usuario decide + por defecto. +- Modo "auto-add" para tipos seguros (IBAN/DNI raras veces son ruido). +- Lista de patrones configurable en `graph_explorer.db`. + +## Decisiones + +- Por defecto OFF — opt-in desde settings, para evitar leer todo lo que + el usuario copia. +- Anonimizar logs: nunca persistir el contenido del clipboard si el + usuario no lo añade. +- Deduplicar: copiar la misma cadena dos veces seguidas no notifica. + +## Definicion de hecho + +- Activo el watcher, copio un IBAN, recibo notificacion, click en Add y + el nodo aparece en el grafo. +- Apagar el watcher detiene la escucha en menos de 1 s. +- Patrones configurados como lista de regex editable desde settings. diff --git a/issues/0017-gx-cli.md b/issues/0017-gx-cli.md new file mode 100644 index 0000000..2dffa46 --- /dev/null +++ b/issues/0017-gx-cli.md @@ -0,0 +1,38 @@ +--- +id: 0017 +title: CLI `gx` para hablar con el endpoint local +status: pending +priority: medium +created: 2026-05-01 +depends_on: [0012] +--- + +## Objetivo + +Cliente CLI fino, instalable en `~/.local/bin/gx`, que habla con el +endpoint HTTP local de `graph_explorer` (issue 0012). Permite ingesta y +consulta desde terminal o scripts sin abrir la app. + +## Comandos + +- `gx add [--metadata k=v ...]` — crea entidad. +- `gx rel ` — crea relacion. +- `gx ingest ` — manda archivo al endpoint, abre preview en TUI. +- `gx from-url ` — fetch + extract. +- `gx search "query"` — devuelve hits del grafo activo (json o tabla). +- `gx neighbors [--depth N]`. +- `gx open ` — abre el grafo y enfoca el nodo en `graph_explorer`. + +## Decisiones + +- Implementar como sub-comando del `fn` CLI existente (`fn gx ...`) o + binario aparte? Probablemente sub-comando para reusar config y auth. +- Output JSON por defecto si stdout no es TTY (componible con jq). +- Tabla legible si stdout es TTY. + +## Definicion de hecho + +- `gx add person "Juan Perez"` añade el nodo en el grafo en vivo. +- `gx ingest articulo.pdf` lanza preview interactivo en terminal y commit. +- `gx neighbors --depth 2 --format json | jq` funciona en pipeline. +- Errores de conexion al endpoint se reportan claros (no stack traces). diff --git a/issues/0018-headless-browser-transforms.md b/issues/0018-headless-browser-transforms.md new file mode 100644 index 0000000..4fbff7a --- /dev/null +++ b/issues/0018-headless-browser-transforms.md @@ -0,0 +1,54 @@ +--- +id: 0018 +title: Transforms automatizadas tipo Maltego con browser headless +status: pending +priority: low +created: 2026-05-01 +depends_on: [0012] +--- + +## Objetivo + +Dada una entidad seleccionada, ejecutar un script (Playwright/Puppeteer) +que enriquece el grafo con datos derivados — equivalente a las +"transforms" de Maltego. Es la pieza que diferencia frente a alternativas +mas estaticas. + +## Ejemplos para banking/OSINT espanol + +- Persona/empresa -> consulta BORME, registro mercantil, axesor. +- Dominio -> whois, DNS records, certificados (crt.sh). +- Email -> haveibeenpwned, hunter.io. +- Telefono -> truecaller-like. +- Empresa -> LinkedIn search publico, opencorporates. + +## Alcance + +- Cada transform es un pipeline del registry con tag `transform` y un + contrato fijo: input = `{id, type, metadata}`, output = `{entities, + relations}`. +- Registro de transforms aplicables por entity_type. +- UI: context menu sobre nodo -> "Run transform..." -> lista filtrada + por type aplicable -> ejecuta async -> notifica al terminar -> preview + antes de commit. +- Sandbox: cada transform en proceso aparte, timeout configurable. + +## Riesgos y mitigaciones + +- Los scrapers se rompen cuando los sitios cambian -> mantener una suite + de "transform health checks" automaticos (cron ligero) que avisa de + fallos antes de que el usuario los descubra en vivo. +- Cumplimiento legal y robots.txt -> documentar en cada transform su + fuente, politica y ToS. +- Rate limiting -> cooldown por host configurable. + +## Definicion de hecho + +- Selecciono un dominio en el grafo, lanzo "whois", aparecen registrant, + registrar y nameservers como nodos vinculados con relaciones tipadas. +- Un transform que falla loguea el error y no afecta a otros que + esten corriendo en paralelo. +- La lista de transforms aplicables a una entidad se computa segun su + type (no se ofrecen los inaplicables). +- Health check cron escribe a `proposals` cuando un transform empieza a + fallar repetidamente. diff --git a/issues/0019-ocr-ingest.md b/issues/0019-ocr-ingest.md new file mode 100644 index 0000000..b3ad76e --- /dev/null +++ b/issues/0019-ocr-ingest.md @@ -0,0 +1,39 @@ +--- +id: 0019 +title: OCR de region de pantalla y archivos imagen +status: pending +priority: low +created: 2026-05-01 +depends_on: [0012] +--- + +## Objetivo + +Capturar una region de pantalla (atajo global) o soltar imagen sobre la +app (issue 0015) -> Tesseract / PaddleOCR -> texto -> `extract_graph_hybrid`. + +Util cuando la fuente solo esta como captura, PDF escaneado, o pantalla +de un sistema sin copy/paste. + +## Alcance + +- Captura: usar herramienta del SO (gnome-screenshot, flameshot, snipping + tool) con flag de region. Linux primero, Windows con Snip & Sketch. +- OCR: Tesseract con datos de espanol (`spa.traineddata`). PaddleOCR + como alternativa para texto manuscrito o calidades bajas. +- Pipeline: imagen -> OCR -> texto -> panel preview de 0013. + +## Decisiones + +- Atajo global configurable (default `Ctrl+Alt+G`). +- Idiomas OCR como lista en settings (default `[spa, eng]`). +- Persistir la imagen original como `metadata.source_image_path` en la + entidad creada para trazabilidad. + +## Definicion de hecho + +- Atajo abre selector de region, capturo un parrafo en pantalla, en + menos de 5 s veo entidades extraidas. +- Suelto un PNG con texto sobre el canvas, mismo flujo (encadena con 0015). +- Calidad de OCR para espanol > 90% en capturas estandar 1080p de texto + impreso. diff --git a/issues/0020-email-bot-ingest.md b/issues/0020-email-bot-ingest.md new file mode 100644 index 0000000..ffb6e73 --- /dev/null +++ b/issues/0020-email-bot-ingest.md @@ -0,0 +1,46 @@ +--- +id: 0020 +title: Ingesta via email forwarding y bot Telegram/Signal +status: pending +priority: low +created: 2026-05-01 +depends_on: [0012] +--- + +## Objetivo + +Ingerir entidades sin estar delante del PC. Util para capturar cosas +sobre la marcha (movil, lectura en otra pantalla, conversaciones). + +## Canales + +- **Email**: direccion dedicada (mailbox o alias) que se chequea via + IMAP cada N minutos. Adjuntos -> ingesta como en 0015. Cuerpo -> + extract. +- **Bot Telegram/Signal**: forwardear un mensaje, una imagen, o escribir + comandos (`/add empresa Acme`, `/relate Acme owns Bravo`). El bot + habla con el endpoint 0012. + +## Alcance + +- Cliente IMAP minimo o uso de un MTA local (postfix+dovecot) que + redirija a un script. +- Bot Telegram: BotFather + python-telegram-bot o equivalente Go (vive + en `apps//` con tag `service`). +- Auth: solo procesar mensajes de chat IDs / direcciones whitelisted en + config. +- Confirmacion: el bot responde con preview ("¿añado estas 3 entidades? + responde 'si' o 'no'") antes de commit. + +## Decisiones + +- Bot self-hosted (no SaaS) — corre como service en VPS o en el PC. +- Multiples grafos: el bot puede targetear distintos `operations.db` + segun el chat de origen (mapping en config). + +## Definicion de hecho + +- Reenvio un PDF a la direccion dedicada y, en menos de 2 minutos, veo + las entidades en el grafo con notificacion del bot. +- El bot rechaza mensajes de chat IDs no autorizados sin responder. +- Comando `/search Acme` desde el bot devuelve hits del grafo. diff --git a/issues/0021-command-palette.md b/issues/0021-command-palette.md new file mode 100644 index 0000000..534d5b2 --- /dev/null +++ b/issues/0021-command-palette.md @@ -0,0 +1,42 @@ +--- +id: 0021 +title: Command palette Ctrl+K — busqueda y acciones globales +status: pending +priority: high +created: 2026-05-01 +--- + +## Objetivo + +Atajo `Ctrl+K` (configurable) abre overlay flotante con input de busqueda +fuzzy global. Lo que mas acelera el dia a dia: cero navegacion por menus +para encontrar un nodo o disparar una accion. + +## Alcance + +Indexa y matchea sobre: + +- Entidades del grafo (por name, type, metadata). +- Acciones de la app ("Toggle inspector", "Save layout", "Run transform", + "Export subgraph", "Switch project", "Open settings"). +- Comandos recientes (MRU al tope sin escribir). + +Selecciono con flechas + Enter -> ejecuta accion o enfoca nodo en +el viewport. + +## Implementacion + +- Overlay modal centrado, input de texto + lista virtualizada + (`ImGuiListClipper`). +- Indexador en memoria sobre entidades; refresh al cambiar grafo. +- Fuzzy matcher (fzf-like, p.ej. `fts_fuzzy_match` de Forrest the woods, + o algo equivalente). +- Acciones registrables desde cualquier panel — registro central tipo + `cmd_palette_register("name", lambda)`. + +## Definicion de hecho + +- Ctrl+K, escribo 3 letras del nombre de un nodo, lo enfoca en el grafo. +- Ctrl+K, "exp", veo accion "Export subgraph as Markdown" disponible. +- Latencia de matching imperceptible con 50k entidades. +- MRU pone arriba lo usado recientemente. diff --git a/issues/0022-nl-graph-query.md b/issues/0022-nl-graph-query.md new file mode 100644 index 0000000..f1d0a86 --- /dev/null +++ b/issues/0022-nl-graph-query.md @@ -0,0 +1,41 @@ +--- +id: 0022 +title: Consulta del grafo en lenguaje natural via LLM +status: pending +priority: medium +created: 2026-05-01 +depends_on: [0001] +--- + +## Objetivo + +Input de texto ("personas relacionadas con BancoX que aparecen en mas de +3 documentos") -> LLM traduce a SQL sobre `operations.db` o a un set de +filtros sobre el grafo en memoria -> resalta el subgrafo resultante. + +Complementa el chat de 0001 con un modo "consulta puntual" sin +conversacion: input -> resultado destacado, sin chat history. + +## Alcance + +- Tool-use ya disponible en 0001 (`query_entities`, `query_relations`). +- Modo "highlight": en lugar de devolver texto, el LLM emite un set de + ids -> la UI dibuja el subgrafo con resaltado y oscurece el resto. +- Boton "save as filter" -> persiste como vista nombrada (issue 0023). +- Historial de queries recientes en un dropdown. +- Indicador de query en curso (puede tardar varios segundos). + +## Decisiones + +- ¿Mismo cliente HTTP/Anthropic que 0001 o duplicado? Reusar. +- Modelo por defecto el mismo que 0001 (`claude-sonnet-4-6`). +- Query schema (que campos ve el LLM) dado por `types_registry` para + que aprenda los nombres de campos del proyecto. + +## Definicion de hecho + +- "personas con mas de 5 conexiones" devuelve subgrafo correcto. +- "documentos publicados en 2025" funciona si la metadata tiene fechas. +- Fallo silencioso (LLM mal interpreta) -> mensaje claro y opcion de + reintentar refinando. +- Query guardada como filtro reutilizable. diff --git a/issues/0023-saved-views.md b/issues/0023-saved-views.md new file mode 100644 index 0000000..80e29d7 --- /dev/null +++ b/issues/0023-saved-views.md @@ -0,0 +1,39 @@ +--- +id: 0023 +title: Vistas guardadas y filtros nombrados +status: pending +priority: medium +created: 2026-05-01 +--- + +## Objetivo + +Guardar combinaciones de filtros (tipo, tag, FTS, layout, zoom, nodos +fijados) bajo un nombre y reaplicarlas con un click o atajo. + +Util para volver siempre al "mapa de la red de empresa X" o "vista de +emails sospechosos" sin reconfigurar todo cada vez. + +## Alcance + +- Tabla `saved_views(graph_hash, name, payload_json, hotkey, created_at)` + en `graph_explorer.db`. +- Panel/menu "Views" con lista, atajos asignables (Ctrl+1..9). +- Payload incluye: filtros activos, expanded nodes, viewport rect, layout + mode, theme overrides, nodos pinned. +- Boton "Save current as view..." en toolbar. +- Boton "Update view" cuando una view esta activa y el usuario cambia algo. + +## Decisiones + +- Las views son por `graph_hash` (no globales) — cada `operations.db` + tiene su set propio. +- Compartir view entre PCs: export/import JSON manual (v2 podria sync via + `fn sync`). + +## Definicion de hecho + +- Configuro filtros, "Save view as 'Banca'", la veo en el menu. +- Reload de la app -> "Banca" aplica todo lo guardado. +- Un atajo (Ctrl+1..9) salta a la vista correspondiente al instante. +- "Update view" persiste cambios sin crear duplicados. diff --git a/issues/0024-subgraph-export.md b/issues/0024-subgraph-export.md new file mode 100644 index 0000000..5dda857 --- /dev/null +++ b/issues/0024-subgraph-export.md @@ -0,0 +1,45 @@ +--- +id: 0024 +title: Exportar subgrafo seleccionado a Markdown / Mermaid / CSV / PNG +status: pending +priority: medium +created: 2026-05-01 +--- + +## Objetivo + +Seleccion de nodos (rect drag o filtro activo) -> menu "Export as..." con +varios formatos de salida segun el destino. + +## Formatos + +- **Markdown**: una pagina por entidad con sus campos y links a vecinos. + Encaja con 0025 (sync con vault). +- **Mermaid `graph TD`**: para pegar en notas o issues. +- **CSV**: dos archivos `nodes.csv` + `edges.csv` para Gephi/Cytoscape. +- **PNG / SVG**: render del subgrafo con layout actual. +- **JSON**: shape `{nodes:[], edges:[]}` para reimportar o procesar. + +## Alcance + +- Menu "Export selected" en context menu del canvas y en menu superior. +- Cada exportador es una funcion del registry reutilizable + (`export_subgraph_md_cpp_viz`, `export_subgraph_mermaid_cpp_viz`, etc). +- Para PNG/SVG: reusar el render actual a un framebuffer offscreen, con + factor de escalado configurable (1x / 2x / 4x). +- Diccionario de plantillas configurable para Markdown (por entity_type). + +## Decisiones + +- Mermaid copiado al portapapeles automaticamente; otros formatos + abren dialogo de guardado. +- Limite suave a 500 nodos para Mermaid (ilegible mas alla). + +## Definicion de hecho + +- Selecciono 20 nodos, exporto Markdown -> directorio con 20 .md y + enlaces cruzados validos. +- Exporto Mermaid -> string copiado al portapapeles, valido en + mermaid.live. +- Exporto PNG con layout fijo, calidad 2x, fidelidad pixel a la vista. +- CSV importable directo en Gephi sin transformaciones. diff --git a/issues/0025-vault-sync.md b/issues/0025-vault-sync.md new file mode 100644 index 0000000..de36450 --- /dev/null +++ b/issues/0025-vault-sync.md @@ -0,0 +1,44 @@ +--- +id: 0025 +title: Sync bidireccional con vault Obsidian / markdown +status: pending +priority: low +created: 2026-05-01 +depends_on: [0024] +--- + +## Objetivo + +Espejar el grafo activo a un vault de markdown (estilo Obsidian) en +`projects/osint_graph/vaults//`. Cada entidad = una nota; cada +relacion = un wikilink. El usuario puede navegar el grafo desde Obsidian +y editar campos alli; los cambios vuelven al grafo. + +Encaja con `vaults/` ya conceptualizados en el registry. + +## Alcance + +- Watcher de filesystem + serializer/parser de notas con frontmatter + YAML para los campos del entity_type. +- Plantilla por entity_type configurable (apoyandose en el exporter + Markdown de 0024). +- Resolucion de conflictos: timestamp + merge campo a campo; preferencia + configurable (vault wins / db wins / prompt). +- Modo unidireccional inicial (graph -> vault) si la ida y vuelta es + mucho trabajo. v2 anade sync de vuelta. + +## Decisiones + +- Sync continuo o on-demand (boton "Sync now")? Empezar on-demand. El + watcher se anade en una segunda fase. +- Detectar cambios externos via `mtime` + checksum. +- Wikilinks usan ids del registry, no nombres (estables ante renames). + +## Definicion de hecho + +- Boton "Sync to vault" genera N notas con frontmatter correcto y + wikilinks navegables en Obsidian. +- Editar un campo en la nota y "Sync from vault" actualiza la entidad + en operations.db. +- No se pierden datos cuando hay edicion concurrente en ambos lados + (resolucion de conflicto explicita). diff --git a/issues/0026-jobs-system.md b/issues/0026-jobs-system.md new file mode 100644 index 0000000..3e45aa9 --- /dev/null +++ b/issues/0026-jobs-system.md @@ -0,0 +1,209 @@ +--- +id: 0026 +title: Sistema de jobs — enrichers asincronos en background +status: in_progress +priority: high +created: 2026-05-01 +blocks: [0027, 0028, 0029, 0030] +supersedes: [0001, 0002, 0003] +--- + +## Objetivo + +Convertir el menu "Run enricher" (hoy placeholder en main.cpp:485) en un +sistema real de jobs asincronos: el usuario lanza un enricher sobre un +nodo, vuelve a la app y sigue trabajando mientras el enricher procesa +en background. Al terminar, el grafo se recarga automaticamente con las +entidades/relaciones nuevas. + +Este issue solo cubre la **infra**. Los enrichers concretos se escriben +en 0028, 0029, 0030. + +## Decisiones tomadas + +1. **Workers concurrentes**: 2 por defecto, configurable via Settings. +2. **Cache de documentos**: `/cache//.{html,md,png}`. Carpeta gitignored en el sub-repo. +3. **Webpage vs Url**: tipos separados (issue 0027). Url = link suelto, Webpage = documento descargado con cuerpo. +4. **Subprocess Python por job** (no daemon residente): cold start ~200 ms aceptable. Si molesta, issue futura. +5. **Estado en `graph_explorer.db`** (NO en `operations.db`): jobs son especificos de la app, no del grafo. + +## Tabla `jobs` (graph_explorer.db) + +```sql +CREATE TABLE IF NOT EXISTS jobs ( + id TEXT PRIMARY KEY, -- ULID + enricher_id TEXT NOT NULL, -- ej: "fetch_webpage" + node_id TEXT, -- nodo objetivo (NULL si batch) + node_name TEXT DEFAULT '', -- snapshot para mostrar en UI + params_json TEXT NOT NULL DEFAULT '{}', + status TEXT NOT NULL, -- queued|running|done|error|cancelled + progress REAL NOT NULL DEFAULT 0, -- 0..1 + stage TEXT NOT NULL DEFAULT '', -- mensaje corto: "fetching", "extracting" + result_json TEXT, -- {entities_added: N, relations_added: M, ...} + error TEXT, + pid INTEGER, -- subprocess pid para cancelar + created_at INTEGER NOT NULL, + started_at INTEGER, + finished_at INTEGER +); +CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, created_at); +``` + +## Runtime C++ + +### `jobs.{h,cpp}` (nuevo) + +- `JobRunner`: pool de N std::thread workers (default 2). +- Cola en memoria `std::queue` + persistencia en BD. +- API publico: + - `bool jobs_init(const char* db_path, int n_workers);` + - `bool jobs_submit(const char* enricher_id, const char* node_id, const char* params_json, char* out_id);` + - `bool jobs_cancel(const char* job_id);` + - `void jobs_shutdown();` + - `int jobs_dirty_counter();` // incrementa al completar un job; render lo lee + - `bool jobs_list(std::vector* out);` +- Worker hace: + 1. Pop de cola → mark running, started_at = now, pid = subprocess pid. + 2. Spawn `python/.venv/bin/python3 enrichers//run.py` con stdin = JSON. + 3. Lee stderr line-by-line buscando `PROGRESS: ` para actualizar fila. + 4. Lee stdout completo al cerrar — JSON final con entities/relations/node_updates. + 5. Aplica al `operations.db` desde el worker (entity_insert/relation_insert/entity_update). + 6. Marca done o error con result_json/error, finished_at, increment dirty_counter. +- Al arrancar: marca jobs `running` huerfanos como `error: "process died"`. + +### `enrichers.{h,cpp}` (nuevo) + +- Escanea `enrichers/*/manifest.yaml` al arrancar. +- Estructura `EnricherSpec`: + ``` + std::string id, name, description; + std::vector applies_to; // tipos validos + std::vector params; + std::string run_path; // enrichers//run.py absoluto + ``` +- API: + - `void enrichers_load(const char* enrichers_dir);` + - `std::vector enrichers_for_type(const char* type_ref);` + +## Contrato enricher (wire protocol) + +Cada enricher vive en `apps/graph_explorer/enrichers//`: + +``` +enrichers/ + fetch_webpage/ + manifest.yaml + run.py +``` + +### `manifest.yaml` + +```yaml +id: fetch_webpage +name: "Fetch web page" +description: "Descarga HTML, extrae markdown y guarda en cache." +applies_to: [Webpage, Url] +params: + - { name: timeout_s, type: int, default: 15 } + - { name: use_browser, type: bool, default: false } +``` + +### `run.py` — stdin/stdout/stderr + +**stdin** (una linea JSON): +```json +{"node_id":"webpage_123","node_type":"Webpage","node_name":"...", + "metadata":{"url":"https://..."},"params":{"timeout_s":15}, + "db_path":"/path/operations.db","cache_dir":"/path/cache", + "registry_root":"/home/lucas/fn_registry"} +``` + +**stderr** (lineas de progreso, opcional): +``` +PROGRESS:0.10 connecting +PROGRESS:0.50 parsing +PROGRESS:0.90 writing +``` + +**stdout** (una linea JSON al final): +```json +{"node_updates":[ + {"id":"webpage_123","metadata_patch":{"title":"...","status_code":200}} + ], + "entities":[ + {"type":"Domain","name":"example.com","metadata":{}} + ], + "relations":[ + {"from_id":"webpage_123","to_name":"example.com","to_type":"Domain","kind":"BELONGS_TO"} + ], + "notes":""} +``` + +Resolucion de relaciones: +- Si `to_id` esta presente, se usa directamente. +- Si no, se busca por `(to_name, to_type)` en operations.db; si no existe, se crea primero. + +## UI + +### Panel "Jobs" + +- Nuevo panel dockeable (entry en `g_panels[]`). +- Tabla con columnas: enricher, target node (clicable → centra viewport), status (badge), progress bar, stage, duracion, error tooltip. +- Botones inline por fila: cancelar (running), reintentar (error/cancelled), borrar (terminal). +- Filtro: all | active | done | error. + +### Toolbar + +- Badge en la toolbar superior con contador de `running + queued`. Click abre el panel Jobs. + +### Context menu (main.cpp:485) + +Reemplazar el `TextDisabled("coming soon")` por: + +```cpp +auto specs = ge::enrichers_for_type(node->type_ref); +if (specs.empty()) { + ImGui::TextDisabled("(no hay enrichers para este tipo)"); +} else { + for (const auto& s : specs) { + if (ImGui::MenuItem(s.name.c_str())) { + char job_id[64]; + ge::jobs_submit(s.id.c_str(), node->id.c_str(), "{}", job_id); + } + } +} +``` + +## Fases del bucle reactivo en BD + +- **CONSTRUIR**: enricher escrito en `enrichers//`. +- **EJECUTAR**: subprocess corre, escribe progress en `jobs`. +- **RECOPILAR**: stdout JSON parseado, entities/relations aplicadas a operations.db. +- **ANALIZAR**: jobs.result_json contiene metricas (entities_added, duration_ms). +- **MEJORAR**: si un enricher falla repetidamente, futura issue de health checks. + +## Cancelacion + +- Boton "Cancel" en panel Jobs: + - Lee `pid` de la fila. + - `kill(pid, SIGTERM)` (Linux/WSL) o `TerminateProcess` (Windows). + - El worker captura el exit code y marca `cancelled`. + - Si el job aun no salio de la cola (status = queued), el worker simplemente no lo coge — al pop chequea status y skipea cancelled. + +## Definicion de hecho + +- Tabla `jobs` se crea al arrancar la app. +- `JobRunner` con 2 workers acepta `jobs_submit` y procesa. +- Panel Jobs muestra estado en tiempo real (progress bar avanza visiblemente). +- Cancelar mata subprocess y marca `cancelled`. +- Al completar, el grafo se recarga (dirty_counter detectado en render). +- Subir el contador de jobs en la toolbar. +- Test manual: enricher dummy `noop` (incluido en este issue) que duerme 3 s emitiendo PROGRESS y termina sin entidades. Lanzarlo y comprobar UI. + +## Trabajo posterior + +- 0027: tipo Webpage + cache. +- 0028: primer enricher real (`fetch_webpage`) end-to-end. +- 0028b: enrichers extract_domain, extract_links, extract_text_entities. +- 0029: enrichers via CDP (browser headless). +- 0030: macro "Deep enrich" + expand_domain. diff --git a/issues/0027-webpage-type-cache.md b/issues/0027-webpage-type-cache.md new file mode 100644 index 0000000..1591252 --- /dev/null +++ b/issues/0027-webpage-type-cache.md @@ -0,0 +1,82 @@ +--- +id: 0027 +title: Tipo Webpage + cache de documentos descargados +status: pending +priority: high +created: 2026-05-01 +depends_on: [0026] +blocks: [0028, 0029, 0030] +--- + +## Objetivo + +Anadir un tipo `Webpage` al `examples/types.yaml` y un layout +estandarizado de cache donde los enrichers guardan HTML, markdown y +screenshots descargados. El tipo `Url` existente queda como link suelto; +`Webpage` es un documento descargado con cuerpo. + +## Cambios en `examples/types.yaml` + +Anadir tras el bloque `Url`: + +```yaml +- name: Webpage + color: "#89E0FC" + icon: ti-file-text + principal_field: url + fields: + - { name: url, type: url, required: true } + - { name: title, type: string } + - { name: status_code, type: int } + - { name: content_type, type: string } + - { name: fetched_at, type: date } + - { name: html_path, type: string } # cache//.html + - { name: markdown_path, type: string } # cache//.md + - { name: screenshot_path,type: string } # cache//.png + - { name: text_length, type: int } + - { name: lang, type: string } +``` + +## Layout del cache + +``` +/cache/ + ab/ + abcd1234...ef.html + abcd1234...ef.md + abcd1234...ef.png + cd/ + cdef5678...01.html + ... +``` + +- `sha256[0:2]` para evitar miles de archivos en un solo dir. +- Path absoluto desde `` para que sea portable entre PCs (paths relativos en metadata). +- `cache/` se anade al `.gitignore` del sub-repo. + +## Helper C++ + +Funcion en `data.{h,cpp}` (o nuevo `cache_paths.{h,cpp}`): + +```cpp +namespace ge { +// Resuelve el path absoluto donde un enricher debe escribir el blob. +// Crea el dir si no existe. ext sin punto: "html", "md", "png". +// hash_input: tipicamente la URL canonica (normalizada). +std::string cache_path(const char* app_dir, + const char* hash_input, + const char* ext); +} +``` + +- SHA256 calculado en C++ (usar implementacion existente en cpp/functions/core/ si la hay; si no, vendor/sqlite3 trae uno o se anade simple). +- O: el enricher Python calcula el sha256 (mas simple) y lo devuelve como parte de `node_updates`. Decidido: Python calcula el sha256, C++ solo expone `app_dir/cache/` como path absoluto al enricher. + +## Definicion de hecho + +- `Webpage` aparece en types.yaml con icono `ti-file-text`. +- El icono se renderiza correctamente (existe en tabler_codepoint_by_name). +- `cache/` esta en `.gitignore` del sub-repo del app. +- C++ pasa `cache_dir` al enricher en el JSON de stdin. +- Test manual: crear nodo `Webpage` desde el inspector, comprobar que + aparece con el color/icono correctos. diff --git a/issues/0028-enricher-fetch-webpage.md b/issues/0028-enricher-fetch-webpage.md new file mode 100644 index 0000000..53ec317 --- /dev/null +++ b/issues/0028-enricher-fetch-webpage.md @@ -0,0 +1,78 @@ +--- +id: 0028 +title: Enricher fetch_webpage (MVP end-to-end) +status: pending +priority: high +created: 2026-05-01 +depends_on: [0026, 0027] +--- + +## Objetivo + +Primer enricher real sobre el sistema de jobs (0026). Right-click sobre +un nodo `Url` o `Webpage` → "Fetch web page". Descarga el HTML, lo +convierte a markdown, guarda los blobs en cache, actualiza el nodo +(o lo convierte a Webpage si era Url) y crea el nodo `Domain` con +relacion `BELONGS_TO`. + +Este enricher valida el contrato entero. Los siguientes (0028b) reusan +exactamente el mismo wire protocol. + +## Archivos + +``` +apps/graph_explorer/enrichers/fetch_webpage/ + manifest.yaml + run.py +``` + +## `manifest.yaml` + +```yaml +id: fetch_webpage +name: "Fetch web page" +description: "Descarga HTML, extrae markdown limpio y guarda en cache." +applies_to: [Webpage, Url] +emits: [Domain] +relations: [BELONGS_TO] +params: + - { name: timeout_s, type: int, default: 15 } +``` + +## `run.py` + +Logica: +1. Lee JSON de stdin. +2. Saca `url` de `metadata.url` (o `metadata.address` si es Url legacy). +3. `PROGRESS:0.05 normalize` — `normalize_url_py_cybersecurity`. +4. `PROGRESS:0.20 fetching` — descarga via `requests.get(url, timeout=N)`. +5. `PROGRESS:0.60 parsing` — `html_to_markdown_py_core` con readabilipy. +6. `PROGRESS:0.85 writing` — calcula sha256(url), escribe `cache//.html` y `.md`. +7. Emite stdout JSON: + - `node_updates`: cambia type a Webpage si era Url, anade title/status_code/content_type/fetched_at/html_path/markdown_path/text_length. + - `entities`: `{type: Domain, name: , metadata: {}}`. + - `relations`: `from_id: , to_name: , to_type: Domain, kind: BELONGS_TO`. + +## Funciones del registry usadas + +- `normalize_url_py_cybersecurity` — limpia tracking params. +- `html_to_markdown_py_core` — readabilipy + markdownify. +- `extract_domain` se hace inline en el enricher (regex trivial sobre la URL parseada). + +## Manejo de errores + +- HTTP error (4xx/5xx) → escribe status_code en metadata pero NO marca el job como error (el nodo guarda evidencia del fallo). +- Timeout / DNS error / etc → exit con error JSON en stdout: `{"error": "...", "node_updates": [], "entities": [], "relations": []}`. +- Si el enricher levanta excepcion, sale con codigo != 0 y stderr capturado va a `jobs.error`. + +## Definicion de hecho + +- Crear nodo Url con `https://example.com` → click derecho → "Fetch web page". +- En segundos aparece en panel Jobs como `running` con progress. +- Al terminar: + - El nodo cambia a tipo `Webpage` con icono `ti-file-text`. + - El inspector muestra title, status_code, html_path, markdown_path. + - Aparece nodo `Domain` "example.com" conectado por `BELONGS_TO`. + - El archivo `cache/.md` existe en disco. +- El job aparece en panel Jobs como `done` con `entities_added=1, relations_added=1`. +- Tirar la red (sin internet) → el job acaba en `error` con mensaje claro. diff --git a/issues/0028b-enrichers-extract-trio.md b/issues/0028b-enrichers-extract-trio.md new file mode 100644 index 0000000..8fe18df --- /dev/null +++ b/issues/0028b-enrichers-extract-trio.md @@ -0,0 +1,72 @@ +--- +id: 0028b +title: Enrichers extract_domain, extract_links, extract_text_entities +status: pending +priority: high +created: 2026-05-01 +depends_on: [0028] +--- + +## Objetivo + +Tres enrichers Python adicionales que reusan el contrato validado por +`fetch_webpage`. Cada uno cubre un eje de extraccion distinto. + +## 1. `extract_domain` + +``` +applies_to: [Url, Webpage, Email] +emits: [Domain] +relations: [BELONGS_TO] +``` + +- Saca el dominio de `metadata.url` o `metadata.address`. +- Crea nodo `Domain` si no existe + relacion `BELONGS_TO`. +- Util cuando el usuario tiene un Url/Email que aun no ha sido fetched + pero quiere ver el dominio en el grafo. + +## 2. `extract_links` + +``` +applies_to: [Webpage] +emits: [Url] +relations: [LINKS_TO] +``` + +- Lee `metadata.markdown_path`. Si vacio → exit con error "run fetch_webpage first". +- `extract_urls_py_cybersecurity` sobre el contenido. +- Para cada URL distinta encontrada: + - Crea nodo `Url` con `metadata.url` (si no existe). + - Relacion `LINKS_TO` desde la Webpage origen. +- Param: `max_links` (default 50) para no saturar el grafo. + +## 3. `extract_text_entities` + +``` +applies_to: [Webpage] +emits: [Person, Org, Email, Phone, Domain, Location, IPAddress, CVE, ...] +relations: [EXTRACTED_FROM, ...relaciones que GLiREL detecte] +``` + +- Lee `metadata.markdown_path`. +- Llama `extract_graph_hybrid_py_pipelines` (regex IoCs + GLiNER + GLiREL + LLM fallback). +- Para cada entidad detectada: + - Resuelve por `(name, type)` en operations.db. Si no existe la crea. + - Relacion `EXTRACTED_FROM` desde la entidad nueva al nodo Webpage. +- Para cada relacion detectada por GLiREL: + - Relacion entre las dos entidades con el `kind` predicho. +- Params: + - `chunk_size` (default 2000) + - `use_llm_fallback` (default false — evitar coste; el usuario lo activa en jobs concretos) + +## Definicion de hecho + +- Los tres enrichers aparecen en el menu "Run enricher" segun el tipo + del nodo right-clickado. +- En un nodo Webpage el menu muestra los 3 + fetch_webpage. +- Test integracion: + - Crear Url → fetch_webpage → run extract_links sobre el resultado + → run extract_text_entities → grafo se llena con persons/orgs/etc. + - Cada paso es un job independiente visible en panel Jobs. +- `extract_text_entities` con LLM off termina sin coste y produce + entidades de IoC + entidades GLiNER (gratis). diff --git a/issues/0029-enrichers-cdp.md b/issues/0029-enrichers-cdp.md new file mode 100644 index 0000000..37b9e34 --- /dev/null +++ b/issues/0029-enrichers-cdp.md @@ -0,0 +1,63 @@ +--- +id: 0029 +title: Enrichers via Chrome headless (CDP) — fetch_webpage_browser, fetch_screenshot +status: pending +priority: medium +created: 2026-05-01 +depends_on: [0028] +--- + +## Objetivo + +Variantes de los enrichers basicos que usan Chrome headless via CDP, +para sitios con contenido renderizado por JavaScript (SPA, paginas con +auth visual, etc.) o cuando se quiere capturar evidencia visual. + +## 1. `fetch_webpage_browser` + +``` +applies_to: [Url, Webpage] +emits: [Domain] +relations: [BELONGS_TO] +params: + - { name: chrome_port, type: int, default: 9222 } + - { name: wait_after_load_ms, type: int, default: 1500 } +``` + +- Usa funciones del registry: + - `chrome_launch_go_browser` — lanza Chrome en port (reusa si ya esta). + - `cdp_connect_go_browser` + - `cdp_navigate_go_browser` + - `cdp_wait_load_go_browser` + - `cdp_get_html_go_browser` — DOM post-JS. +- El run.py shell-out a un binario Go pequeno o llama estas funciones via + un wrapper Python que invoca el Go function como subprocess. +- Decision pendiente: empaquetar las funciones Go en un binario CLI + `cdp-fetcher` que el run.py invoque, o reescribir la logica en Python + con `pychrome` / `playwright`. Preferencia: binario Go para reusar las + funciones del registry. + +## 2. `fetch_screenshot` + +``` +applies_to: [Webpage, Url] +params: + - { name: full_page, type: bool, default: true } +``` + +- `cdp_screenshot_go_browser` → guarda `cache/.png`. +- `node_updates`: anade `screenshot_path` a metadata del Webpage. +- No emite entidades nuevas. + +## Definicion de hecho + +- `fetch_webpage_browser` extrae correctamente DOM de una SPA (test: + twitter.com, linkedin.com publico). +- `fetch_screenshot` produce PNG legible en el cache. +- Inspector del nodo Webpage muestra una preview del screenshot + cuando `screenshot_path` existe (mejora UI opcional). + +## Out of scope + +- Login flows / auth via CDP — fuera de v1. +- Adblock / fingerprint evasion — el user-agent default es suficiente. diff --git a/issues/0030-deep-enrich-macro.md b/issues/0030-deep-enrich-macro.md new file mode 100644 index 0000000..1a4119c --- /dev/null +++ b/issues/0030-deep-enrich-macro.md @@ -0,0 +1,62 @@ +--- +id: 0030 +title: Macro "Deep enrich" + enricher expand_domain +status: pending +priority: medium +created: 2026-05-01 +depends_on: [0028, 0028b] +--- + +## Objetivo + +Encadenar varios enrichers con un solo click. Cubre dos flujos: + +1. **Deep enrich Webpage**: sobre un nodo Webpage, ejecuta en orden + `fetch_webpage` (si no fetched aun) → `extract_domain` → `extract_links` + → `extract_text_entities`. Cuatro jobs separados, en cadena. +2. **Expand domain**: sobre un nodo Domain, fetch homepage + 1 nivel de + links + extraccion de entidades sobre cada pagina. Util para "dame + todo lo que sepas de este dominio en un click". + +## Implementacion + +### Macro Deep enrich (no es un enricher Python — es UI + orquestacion en C++) + +- Boton/menu item "Deep enrich" en el context menu del nodo Webpage. +- Encolar 4 jobs con dependencias: cada job tiene `depends_on_job_id`. +- Worker pool respeta dependencias: si el job tiene depends_on y el + predecesor no esta `done`, lo deja en cola. +- Anadir columna a tabla `jobs`: `depends_on_job_id TEXT`. + +### Enricher `expand_domain` + +``` +applies_to: [Domain] +params: + - { name: max_pages, type: int, default: 5 } + - { name: deep, type: bool, default: false } # si true, deep enrich cada pagina +``` + +- run.py: + 1. Fetch `https:///` y `http:///` (probando ambos esquemas). + 2. Crea Webpage homepage + relacion `HOMEPAGE_OF` desde Domain. + 3. Si `deep`, encola un job `extract_text_entities` por pagina via + un endpoint local de control (out of scope v1) o emite un campo + especial `chained_jobs: [...]` que el worker C++ encola. + 4. Decision: v1 solo crea las paginas. La cadena con extract_* + se puede hacer manualmente desde la UI o esperar a un sistema + de chained jobs decente. + +## Definicion de hecho + +- Click derecho en Webpage → "Deep enrich" → 4 jobs en cadena visibles + en panel Jobs. Al terminar el ultimo, el grafo tiene domain + links + + persons/orgs/etc. +- Click derecho en Domain → "Expand domain" → Webpage homepage aparece + conectada al Domain. +- Cancelar el job intermedio cancela en cascada los que dependen. + +## Out of scope v1 + +- Cron / repeat schedule de enrichers. +- Progress agregado de la cadena (cada job mantiene su progress propio). From 020f5dabbe53cd96eabb64afc1a82ec0853a29a1 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 1 May 2026 18:24:19 +0200 Subject: [PATCH 2/4] feat(types): tipo Webpage + .gitignore del subrepo (issue 0027) - examples/types.yaml: nuevo tipo Webpage (icono ti-file-text, fields url/title/status_code/content_type/fetched_at/html_path/markdown_path/ screenshot_path/text_length/lang). Url queda como link suelto. - types_registry.cpp: anade ti-file-text al mapa de codepoints Tabler. - .gitignore: cache/, graph_explorer.db (jobs+layouts), build artifacts. Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 13 +++++++++++++ examples/types.yaml | 19 +++++++++++++++++++ types_registry.cpp | 1 + 3 files changed, 33 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a9995d --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +# SQLite app DB (jobs + layouts) — local de cada PC +graph_explorer.db +graph_explorer.db-shm +graph_explorer.db-wal + +# Cache de documentos descargados por enrichers (issue 0027) +cache/ + +# Build artifacts +build/ +*.exe +*.o +*.obj diff --git a/examples/types.yaml b/examples/types.yaml index d41fdef..a6a17b9 100644 --- a/examples/types.yaml +++ b/examples/types.yaml @@ -110,6 +110,25 @@ entities: - { name: title, type: string } - { name: domain, type: string } + # Documento web descargado. Issue 0027: tipo separado de Url para nodos + # con cuerpo cacheado (HTML+markdown+screenshot). Los enrichers + # fetch_webpage / extract_links / extract_text_entities lo pueblan. + - name: Webpage + color: "#89E0FC" + icon: ti-file-text + principal_field: url + fields: + - { name: url, type: url, required: true } + - { name: title, type: string } + - { name: status_code, type: int } + - { name: content_type, type: string } + - { name: fetched_at, type: date } + - { name: html_path, type: string } + - { name: markdown_path, type: string } + - { name: screenshot_path, type: string } + - { name: text_length, type: int } + - { name: lang, type: string } + # Nodo tabla — cuadrado (regla de forma). Issue 0010: contenedor con # filas que son nodos del grafo. - name: Table diff --git a/types_registry.cpp b/types_registry.cpp index 22e7bfe..35d7418 100644 --- a/types_registry.cpp +++ b/types_registry.cpp @@ -523,6 +523,7 @@ uint16_t tabler_codepoint_by_name(const char* name) { {"ti-at", 0xEA2B}, // TI_AT {"ti-home", 0xEAC1}, // TI_HOME {"ti-database", 0xEA88}, // TI_DATABASE + {"ti-file-text", 0xEAA2}, // TI_FILE_TEXT }; auto it = map.find(name); if (it == map.end()) return 0; From 6df04652d88c8daa006c4eb6e5c06d737dc21b49 Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 1 May 2026 18:24:37 +0200 Subject: [PATCH 3/4] feat(jobs): sistema de jobs asincronos + panel UI (issue 0026) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Infra para correr enrichers en background mientras la app sigue interactiva. C++: - jobs.{h,cpp}: tabla jobs en graph_explorer.db, JobRunner con N=2 std::thread workers, fork+exec POSIX con pipes, parser de PROGRESS: en stderr, captura de stdout JSON, persistencia + dirty_counter. - enrichers.{h,cpp}: scanner de enrichers//manifest.yaml, parser YAML minimo (id/name/description/applies_to), filtro por tipo de nodo. - views_jobs.cpp: panel "Jobs" dockeable con tabla (status/enricher/target/ progress/time), filtro all/active/done/errors, cancelar/borrar inline. Wiring: - main.cpp: resolve_registry_root() (FN_REGISTRY_ROOT env o subir desde cwd buscando registry.db), jobs_init/enrichers_load antes de fn::run_app, jobs_shutdown al cerrar, dirty_counter -> want_reload, jobs_set_ops_db al cambiar de proyecto. - main.cpp:render_context_menu: menu "Run enricher" sustituye placeholder con submenu filtrado por type_ref via enrichers_for_type. Submit abre panel Jobs auto. - views.h: AppState::panel_jobs flag + decl views_jobs(). - CMakeLists.txt: anade jobs.cpp + enrichers.cpp + views_jobs.cpp y enlaza Threads::Threads. Wire protocol enricher (subprocess Python): - stdin: JSON con node_id, metadata, ops_db_path, app_dir, cache_dir, registry_root, params. - stderr: PROGRESS: + LOG lineas libres. - stdout: JSON resumen al final. - exit 0 = ok, !=0 = error con stderr capturado en panel Jobs. El run.py escribe directamente al operations.db (sqlite3 stdlib) — C++ solo orquesta, no parsea entities/relations. Co-Authored-By: Claude Opus 4.7 (1M context) --- CMakeLists.txt | 7 + enrichers.cpp | 172 ++++++++++ enrichers.h | 40 +++ jobs.cpp | 862 +++++++++++++++++++++++++++++++++++++++++++++++++ jobs.h | 97 ++++++ main.cpp | 106 +++++- views.h | 9 + views_jobs.cpp | 199 ++++++++++++ 8 files changed, 1491 insertions(+), 1 deletion(-) create mode 100644 enrichers.cpp create mode 100644 enrichers.h create mode 100644 jobs.cpp create mode 100644 jobs.h create mode 100644 views_jobs.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9be7863..a3f1dfd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,11 +18,14 @@ add_imgui_app(graph_explorer main.cpp data.cpp views.cpp + views_jobs.cpp types_registry.cpp layout_store.cpp entity_ops.cpp project_manager.cpp tableview.cpp + jobs.cpp + enrichers.cpp # --- viz --- ${FN_CPP_ROOT_DIR}/functions/viz/graph_renderer.cpp ${FN_CPP_ROOT_DIR}/functions/viz/graph_force_layout.cpp @@ -58,6 +61,10 @@ target_include_directories(graph_explorer PRIVATE target_link_libraries(graph_explorer PRIVATE SQLite::SQLite3 DuckDB::DuckDB) duckdb_copy_runtime(graph_explorer) +# Threads — issue 0026 (jobs system) usa std::thread + std::mutex + condvar. +find_package(Threads REQUIRED) +target_link_libraries(graph_explorer PRIVATE Threads::Threads) + # OpenGL: graph_renderer + graph_force_layout_gpu llaman gl* directamente. # fn::run_app inicializa el loader cuando AppConfig::init_gl_loader = true. if(NOT WIN32) diff --git a/enrichers.cpp b/enrichers.cpp new file mode 100644 index 0000000..e55d9a8 --- /dev/null +++ b/enrichers.cpp @@ -0,0 +1,172 @@ +#include "enrichers.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ge { + +namespace { + +std::vector g_enrichers; + +std::string strip(const std::string& s) { + size_t a = 0, b = s.size(); + while (a < b && std::isspace((unsigned char)s[a])) ++a; + while (b > a && std::isspace((unsigned char)s[b - 1])) --b; + return s.substr(a, b - a); +} + +std::string strip_quotes(const std::string& s) { + if (s.size() >= 2) { + if ((s.front() == '"' && s.back() == '"') || + (s.front() == '\'' && s.back() == '\'')) { + return s.substr(1, s.size() - 2); + } + } + return s; +} + +std::string lower(std::string s) { + for (auto& c : s) c = (char)std::tolower((unsigned char)c); + return s; +} + +// Parsea una lista inline `[a, b, c]` o "[Webpage, Url]". Tolerante a +// espacios y a comillas simples/dobles dentro. NO soporta listas +// multi-linea — el manifest las usa siempre inline. +std::vector parse_inline_list(const std::string& v) { + std::vector out; + std::string s = strip(v); + if (s.size() < 2 || s.front() != '[' || s.back() != ']') return out; + s = s.substr(1, s.size() - 2); + std::string token; + auto flush = [&]() { + std::string t = strip_quotes(strip(token)); + if (!t.empty()) out.push_back(std::move(t)); + token.clear(); + }; + for (char c : s) { + if (c == ',') flush(); + else token.push_back(c); + } + flush(); + return out; +} + +// Manifest YAML soportado (subset): +// id: fetch_webpage +// name: "Fetch web page" +// description: "..." +// applies_to: [Webpage, Url] +// params: <- v1 ignora bloque +// - { name: timeout_s, ... } +// +// Las claves anidadas bajo `params:` se ignoran (saltamos lineas indentadas). +bool parse_manifest(const std::string& path, EnricherSpec* out) { + std::ifstream f(path); + if (!f) return false; + std::string line; + bool in_skip_block = false; + while (std::getline(f, line)) { + // Strip CR de Windows. + if (!line.empty() && line.back() == '\r') line.pop_back(); + + // Linea blanca o comentario. + std::string trim = strip(line); + if (trim.empty() || trim.front() == '#') continue; + + // Si la linea NO empieza con whitespace, salimos del bloque skip. + bool indented = !line.empty() && std::isspace((unsigned char)line.front()); + if (!indented) in_skip_block = false; + if (in_skip_block) continue; + + size_t colon = trim.find(':'); + if (colon == std::string::npos) continue; + + std::string key = strip(trim.substr(0, colon)); + std::string val = strip(trim.substr(colon + 1)); + + if (key == "id") out->id = strip_quotes(val); + else if (key == "name") out->name = strip_quotes(val); + else if (key == "description") out->description = strip_quotes(val); + else if (key == "applies_to") out->applies_to = parse_inline_list(val); + else if (key == "params" && val.empty()) in_skip_block = true; + // emits/relations los ignoramos en v1 (solo informativos). + } + return !out->id.empty(); +} + +} // namespace + +int enrichers_load(const char* enrichers_dir) { + g_enrichers.clear(); + if (!enrichers_dir || !*enrichers_dir) return -1; + + DIR* d = opendir(enrichers_dir); + if (!d) return -1; + + struct dirent* ent; + while ((ent = readdir(d)) != nullptr) { + if (ent->d_name[0] == '.') continue; + + std::string sub = std::string(enrichers_dir) + "/" + ent->d_name; + struct stat st{}; + if (stat(sub.c_str(), &st) != 0 || !S_ISDIR(st.st_mode)) continue; + + std::string manifest = sub + "/manifest.yaml"; + std::string runpy = sub + "/run.py"; + if (stat(manifest.c_str(), &st) != 0) continue; + if (stat(runpy.c_str(), &st) != 0) continue; + + EnricherSpec spec; + if (!parse_manifest(manifest, &spec)) { + std::fprintf(stderr, "[enrichers] parse failed: %s\n", manifest.c_str()); + continue; + } + spec.run_path = runpy; + g_enrichers.push_back(std::move(spec)); + } + closedir(d); + + std::sort(g_enrichers.begin(), g_enrichers.end(), + [](const EnricherSpec& a, const EnricherSpec& b) { + return a.name < b.name; + }); + return (int)g_enrichers.size(); +} + +const std::vector& enrichers_all() { + return g_enrichers; +} + +std::vector enrichers_for_type(const char* type_ref) { + std::vector out; + if (!type_ref || !*type_ref) return out; + std::string want = lower(type_ref); + for (const auto& e : g_enrichers) { + if (e.applies_to.empty()) { + out.push_back(e); + continue; + } + for (const auto& t : e.applies_to) { + if (lower(t) == want) { out.push_back(e); break; } + } + } + return out; +} + +const EnricherSpec* enricher_by_id(const char* id) { + if (!id || !*id) return nullptr; + for (const auto& e : g_enrichers) { + if (e.id == id) return &e; + } + return nullptr; +} + +} // namespace ge diff --git a/enrichers.h b/enrichers.h new file mode 100644 index 0000000..f2fcfeb --- /dev/null +++ b/enrichers.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include + +// Registro estatico de enrichers (issue 0026). +// +// Al arrancar la app se escanea `/enrichers/*/manifest.yaml` y se +// rellena el registro. El context menu del viewport consulta +// `enrichers_for_type(type_ref)` para mostrar el submenu filtrado por tipo +// del nodo right-clickado. +// +// Para v1 no parseamos `params` con detalle — solo lo necesario para +// presentar el item de menu y submitear el job con `{}`. + +namespace ge { + +struct EnricherSpec { + std::string id; // ej: "fetch_webpage" + std::string name; // ej: "Fetch web page" + std::string description; + std::vector applies_to; // tipos validos (case-insensitive) + std::string run_path; // path absoluto a run.py +}; + +// Escanea el directorio. Reentrante (limpia el registro anterior). Devuelve +// el numero de enrichers cargados, -1 si el dir no existe. +int enrichers_load(const char* enrichers_dir); + +// Lista todos los enrichers cargados. +const std::vector& enrichers_all(); + +// Filtra por tipo. Comparacion case-insensitive. Si applies_to es vacio en el +// manifest, el enricher se considera global (aplica a cualquier tipo). +std::vector enrichers_for_type(const char* type_ref); + +// Resuelve un enricher por id. Devuelve nullptr si no existe. +const EnricherSpec* enricher_by_id(const char* id); + +} // namespace ge diff --git a/jobs.cpp b/jobs.cpp new file mode 100644 index 0000000..e7e89c0 --- /dev/null +++ b/jobs.cpp @@ -0,0 +1,862 @@ +#include "jobs.h" + +#include "../../../../cpp/vendor/sqlite3/sqlite3.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ge { + +// ---------------------------------------------------------------------------- +// Internal state +// ---------------------------------------------------------------------------- + +namespace { + +struct JobControl { + pid_t pid = -1; + std::atomic cancel_requested{false}; +}; + +struct State { + std::string app_db_path; + std::string ops_db_path; // mutable: cambia con jobs_set_ops_db + std::string enrichers_dir; + std::string app_dir; + std::string registry_root; + + std::mutex q_mu; + std::condition_variable q_cv; + std::queue pending; // job ids + std::unordered_map> running; + + std::vector workers; + std::atomic stop_flag{false}; + std::atomic dirty{0}; +}; + +State* g_state = nullptr; + +// ---- helpers -------------------------------------------------------------- + +long long now_ms() { + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()).count(); +} + +std::string ulid() { + // ULID-ish: timestamp ms + 10 random hex chars. Suficiente para un PK. + long long ts = now_ms(); + static std::atomic ctr{(uint32_t)(ts & 0xFFFFFFFF)}; + uint32_t rnd = ctr.fetch_add(1, std::memory_order_relaxed); + char buf[64]; + std::snprintf(buf, sizeof(buf), "j_%013lld_%08x", ts, rnd); + return buf; +} + +bool sql_exec_simple(sqlite3* db, const char* sql) { + char* err = nullptr; + int rc = sqlite3_exec(db, sql, nullptr, nullptr, &err); + if (rc != SQLITE_OK) { + std::fprintf(stderr, "[jobs] sql error: %s\n sql: %s\n", + err ? err : "?", sql); + if (err) sqlite3_free(err); + return false; + } + return true; +} + +bool sql_run(sqlite3* db, const char* sql, + const std::vector& params) +{ + sqlite3_stmt* st = nullptr; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) { + std::fprintf(stderr, "[jobs] prepare failed: %s :: %s\n", + sqlite3_errmsg(db), sql); + return false; + } + for (size_t i = 0; i < params.size(); ++i) { + sqlite3_bind_text(st, (int)(i + 1), params[i].c_str(), -1, + SQLITE_TRANSIENT); + } + int rc = sqlite3_step(st); + sqlite3_finalize(st); + return rc == SQLITE_DONE; +} + +bool ensure_table(const char* db_path) { + sqlite3* db = nullptr; + if (sqlite3_open_v2(db_path, &db, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, + nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sql_exec_simple(db, "PRAGMA journal_mode=WAL;"); + bool ok = sql_exec_simple(db, + "CREATE TABLE IF NOT EXISTS jobs (" + " id TEXT PRIMARY KEY," + " enricher_id TEXT NOT NULL," + " node_id TEXT," + " node_name TEXT NOT NULL DEFAULT ''," + " params_json TEXT NOT NULL DEFAULT '{}'," + " status TEXT NOT NULL," + " progress REAL NOT NULL DEFAULT 0," + " stage TEXT NOT NULL DEFAULT ''," + " result_json TEXT," + " error TEXT," + " pid INTEGER," + " created_at INTEGER NOT NULL," + " started_at INTEGER," + " finished_at INTEGER" + ");" + ); + sql_exec_simple(db, + "CREATE INDEX IF NOT EXISTS idx_jobs_status " + "ON jobs(status, created_at);"); + + // Reaper: jobs `running` huerfanos de una sesion anterior. + char ts[32]; + std::snprintf(ts, sizeof(ts), "%lld", now_ms()); + sql_run(db, + "UPDATE jobs SET status='error', error='process died (app restart)', " + "finished_at=? WHERE status='running'", + {ts}); + + sqlite3_close(db); + return ok; +} + +// Escapa string para JSON. Simplificado: maneja comillas, backslash y +// caracteres de control basicos. +std::string json_escape(const std::string& s) { + std::string out; + out.reserve(s.size() + 8); + for (char c : s) { + switch (c) { + case '"': out += "\\\""; break; + case '\\': out += "\\\\"; break; + case '\b': out += "\\b"; break; + case '\f': out += "\\f"; break; + case '\n': out += "\\n"; break; + case '\r': out += "\\r"; break; + case '\t': out += "\\t"; break; + default: + if ((unsigned char)c < 0x20) { + char buf[8]; + std::snprintf(buf, sizeof(buf), "\\u%04x", (unsigned char)c); + out += buf; + } else { + out += c; + } + } + } + return out; +} + +// Lee un campo de la entidad como string. Devuelve "" si no existe. +std::string read_entity_field(const char* db_path, const char* id, + const char* col) +{ + sqlite3* db = nullptr; + if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, nullptr) + != SQLITE_OK) { + if (db) sqlite3_close(db); + return ""; + } + std::string sql = std::string("SELECT ") + col + + " FROM entities WHERE id = ? LIMIT 1"; + sqlite3_stmt* st = nullptr; + std::string out; + if (sqlite3_prepare_v2(db, sql.c_str(), -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text(st, 1, id, -1, SQLITE_TRANSIENT); + if (sqlite3_step(st) == SQLITE_ROW) { + const unsigned char* t = sqlite3_column_text(st, 0); + if (t) out = (const char*)t; + } + } + sqlite3_finalize(st); + sqlite3_close(db); + return out; +} + +// Construye el JSON que se entrega al subprocess via stdin. Lee node de la +// operations.db actual. +std::string build_stdin_json(const std::string& job_id, + const std::string& enricher_id, + const std::string& node_id, + const std::string& params_json, + const std::string& ops_db, + const std::string& app_dir, + const std::string& registry_root) +{ + std::string node_type, node_name, node_metadata = "{}"; + if (!node_id.empty()) { + node_type = read_entity_field(ops_db.c_str(), node_id.c_str(), "type_ref"); + node_name = read_entity_field(ops_db.c_str(), node_id.c_str(), "name"); + std::string m = read_entity_field(ops_db.c_str(), node_id.c_str(), "metadata"); + if (!m.empty()) node_metadata = m; + } + + std::string cache_dir = app_dir + "/cache"; + + std::ostringstream o; + o << '{' + << "\"job_id\":\"" << json_escape(job_id) << "\"," + << "\"enricher_id\":\""<< json_escape(enricher_id) << "\"," + << "\"node_id\":\"" << json_escape(node_id) << "\"," + << "\"node_type\":\"" << json_escape(node_type) << "\"," + << "\"node_name\":\"" << json_escape(node_name) << "\"," + << "\"metadata\":" << (node_metadata.empty() ? "{}" : node_metadata) << "," + << "\"params\":" << (params_json.empty() ? "{}" : params_json) << "," + << "\"ops_db_path\":\""<< json_escape(ops_db) << "\"," + << "\"app_dir\":\"" << json_escape(app_dir) << "\"," + << "\"cache_dir\":\"" << json_escape(cache_dir) << "\"," + << "\"registry_root\":\"" << json_escape(registry_root) << "\"" + << '}'; + return o.str(); +} + +// ---- subprocess (POSIX) --------------------------------------------------- + +struct ProcResult { + int exit_code = -1; + bool signaled = false; + int signal = 0; + std::string stdout_buf; + std::string stderr_tail; // ultimas lineas, para mensajes de error +}; + +void update_progress(const std::string& job_id, double prog, + const std::string& stage) +{ + if (!g_state) return; + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return; + } + sqlite3_stmt* st = nullptr; + const char* sql = "UPDATE jobs SET progress=?, stage=? WHERE id=?"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_double(st, 1, prog); + sqlite3_bind_text (st, 2, stage.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_step(st); + } + sqlite3_finalize(st); + sqlite3_close(db); +} + +// Spawnea python3 run.py. Pipes para stdin (write), stdout (read), +// stderr (read). Lee stdout entero al final; lee stderr line-by-line en un +// thread auxiliar parseando "PROGRESS: ". +ProcResult run_subprocess(const std::string& job_id, + const std::string& run_path, + const std::string& stdin_payload, + std::shared_ptr ctrl) +{ + ProcResult out; + + int p_in[2] = {-1, -1}; // padre escribe en p_in[1], hijo lee p_in[0] + int p_out[2] = {-1, -1}; + int p_err[2] = {-1, -1}; + if (pipe(p_in) != 0 || pipe(p_out) != 0 || pipe(p_err) != 0) { + out.stderr_tail = "pipe() failed"; + return out; + } + + pid_t pid = fork(); + if (pid < 0) { + out.stderr_tail = "fork() failed"; + for (int fd : {p_in[0], p_in[1], p_out[0], p_out[1], p_err[0], p_err[1]}) { + if (fd >= 0) close(fd); + } + return out; + } + + if (pid == 0) { + // child + dup2(p_in[0], 0); + dup2(p_out[1], 1); + dup2(p_err[1], 2); + close(p_in[0]); close(p_in[1]); + close(p_out[0]); close(p_out[1]); + close(p_err[0]); close(p_err[1]); + + // Resolver intérprete: /python/.venv/bin/python3 + std::string py = g_state->registry_root + "/python/.venv/bin/python3"; + const char* argv[] = { py.c_str(), run_path.c_str(), nullptr }; + execv(py.c_str(), (char* const*)argv); + std::fprintf(stderr, "execv failed: %s\n", py.c_str()); + _exit(127); + } + + // parent + ctrl->pid = pid; + close(p_in[0]); + close(p_out[1]); + close(p_err[1]); + + // Persistir pid en BD para mostrarlo en UI. + { + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) { + sqlite3_stmt* st = nullptr; + if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? WHERE id=?", -1, + &st, nullptr) == SQLITE_OK) { + sqlite3_bind_int (st, 1, (int)pid); + sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_step(st); + } + sqlite3_finalize(st); + sqlite3_close(db); + } + } + + // Escribir stdin entero. + if (!stdin_payload.empty()) { + ssize_t written = 0; + const char* p = stdin_payload.c_str(); + size_t left = stdin_payload.size(); + while (left > 0) { + ssize_t n = write(p_in[1], p + written, left); + if (n < 0) { if (errno == EINTR) continue; break; } + written += n; left -= (size_t)n; + } + } + close(p_in[1]); + + // Thread aux para stderr: parsea PROGRESS y guarda tail. + std::string stderr_tail_local; + std::mutex tail_mu; + std::thread err_t([&]() { + std::string line; + char ch; + while (true) { + ssize_t n = read(p_err[0], &ch, 1); + if (n <= 0) break; + if (ch == '\n') { + // Parse line. + if (line.rfind("PROGRESS:", 0) == 0) { + // PROGRESS: + const char* p = line.c_str() + 9; + char* endp = nullptr; + double prog = std::strtod(p, &endp); + std::string stage; + if (endp && *endp) { + while (*endp == ' ') ++endp; + stage = endp; + } + update_progress(job_id, prog, stage); + } else { + std::lock_guard g(tail_mu); + stderr_tail_local += line; + stderr_tail_local += '\n'; + // Cap a ~4 KB. + if (stderr_tail_local.size() > 4096) { + stderr_tail_local.erase(0, stderr_tail_local.size() - 4096); + } + } + line.clear(); + } else { + line.push_back(ch); + if (line.size() > 4096) line.clear(); // proteccion + } + } + }); + + // Leer stdout entero (sincrono). + { + char buf[4096]; + while (true) { + ssize_t n = read(p_out[0], buf, sizeof(buf)); + if (n <= 0) break; + out.stdout_buf.append(buf, (size_t)n); + if (out.stdout_buf.size() > 1024 * 1024) { + // 1 MB cap. + break; + } + } + } + close(p_out[0]); + + // Esperar al hijo. Si se pidio cancelar, mandamos SIGTERM y SIGKILL. + int status = 0; + while (true) { + if (ctrl->cancel_requested.load() && pid > 0) { + kill(pid, SIGTERM); + // pequena gracia, luego SIGKILL si hace falta + for (int i = 0; i < 5; ++i) { + pid_t r = waitpid(pid, &status, WNOHANG); + if (r == pid) goto reaped; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + kill(pid, SIGKILL); + } + pid_t r = waitpid(pid, &status, 0); + if (r == pid) break; + if (r < 0 && errno == EINTR) continue; + break; + } +reaped: + err_t.join(); + close(p_err[0]); + + if (WIFEXITED(status)) { + out.exit_code = WEXITSTATUS(status); + } else if (WIFSIGNALED(status)) { + out.signaled = true; + out.signal = WTERMSIG(status); + out.exit_code = -1; + } + { + std::lock_guard g(tail_mu); + out.stderr_tail = std::move(stderr_tail_local); + } + return out; +} + +// ---- worker --------------------------------------------------------------- + +void persist_status(const std::string& job_id, const std::string& status, + const std::string& result_json, + const std::string& error, + bool set_finished) +{ + if (!g_state) return; + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return; + } + if (set_finished) { + sqlite3_stmt* st = nullptr; + const char* sql = + "UPDATE jobs SET status=?, result_json=?, error=?, finished_at=? " + "WHERE id=?"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 2, result_json.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 3, error.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_int64 (st, 4, now_ms()); + sqlite3_bind_text (st, 5, job_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_step(st); + } + sqlite3_finalize(st); + } else { + sqlite3_stmt* st = nullptr; + const char* sql = "UPDATE jobs SET status=?, started_at=? WHERE id=?"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_int64(st, 2, now_ms()); + sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_step(st); + } + sqlite3_finalize(st); + } + sqlite3_close(db); +} + +// Lee la fila para reconstruir el contexto del job antes de spawn. +struct JobContext { + std::string id, enricher_id, node_id, node_name, params_json, status; +}; +bool load_job(const std::string& id, JobContext* out) { + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "SELECT id, enricher_id, COALESCE(node_id,''), node_name, params_json, status " + "FROM jobs WHERE id=?"; + bool ok = false; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text(st, 1, id.c_str(), -1, SQLITE_TRANSIENT); + if (sqlite3_step(st) == SQLITE_ROW) { + auto col = [&](int i) { + const unsigned char* t = sqlite3_column_text(st, i); + return std::string(t ? (const char*)t : ""); + }; + out->id = col(0); + out->enricher_id = col(1); + out->node_id = col(2); + out->node_name = col(3); + out->params_json = col(4); + out->status = col(5); + ok = true; + } + } + sqlite3_finalize(st); + sqlite3_close(db); + return ok; +} + +void worker_loop() { + while (!g_state->stop_flag.load()) { + std::string job_id; + { + std::unique_lock lk(g_state->q_mu); + g_state->q_cv.wait(lk, [] { + return g_state->stop_flag.load() || !g_state->pending.empty(); + }); + if (g_state->stop_flag.load()) return; + job_id = std::move(g_state->pending.front()); + g_state->pending.pop(); + } + + JobContext ctx; + if (!load_job(job_id, &ctx)) continue; + if (ctx.status == "cancelled") continue; + + // Resolver run.py por convencion: //run.py. + std::string run_path = g_state->enrichers_dir + "/" + ctx.enricher_id + + "/run.py"; + + // Marcar running. + persist_status(job_id, "running", "", "", false); + + auto ctrl = std::make_shared(); + { + std::lock_guard lk(g_state->q_mu); + g_state->running[job_id] = ctrl; + } + + // Construir stdin y ejecutar. + std::string ops_db; + { + std::lock_guard lk(g_state->q_mu); + ops_db = g_state->ops_db_path; + } + std::string stdin_payload = build_stdin_json( + ctx.id, ctx.enricher_id, ctx.node_id, ctx.params_json, + ops_db, g_state->app_dir, g_state->registry_root); + + ProcResult res = run_subprocess(job_id, run_path, stdin_payload, ctrl); + + // Estado final. + std::string final_status, error; + std::string result_json = res.stdout_buf; + // Trim del result_json (saca trailing whitespace). + while (!result_json.empty() && + (result_json.back() == '\n' || result_json.back() == '\r' || + result_json.back() == ' ' || result_json.back() == '\t')) { + result_json.pop_back(); + } + if (ctrl->cancel_requested.load()) { + final_status = "cancelled"; + error = "user cancelled"; + } else if (res.exit_code == 0) { + final_status = "done"; + if (result_json.empty()) result_json = "{}"; + } else { + final_status = "error"; + char buf[64]; + if (res.signaled) { + std::snprintf(buf, sizeof(buf), "signal %d", res.signal); + } else { + std::snprintf(buf, sizeof(buf), "exit %d", res.exit_code); + } + error = std::string(buf); + if (!res.stderr_tail.empty()) { + error += "\n"; + error += res.stderr_tail; + } + } + + persist_status(job_id, final_status, result_json, error, true); + + { + std::lock_guard lk(g_state->q_mu); + g_state->running.erase(job_id); + } + if (final_status == "done") { + g_state->dirty.fetch_add(1, std::memory_order_relaxed); + } + } +} + +} // namespace + +// ---------------------------------------------------------------------------- +// Public API +// ---------------------------------------------------------------------------- + +bool jobs_init(const char* app_db_path, + const char* ops_db_path, + const char* enrichers_dir, + const char* app_dir, + const char* registry_root, + int n_workers) +{ + if (g_state) return true; + if (!app_db_path || !*app_db_path) return false; + if (n_workers < 1) n_workers = 1; + if (n_workers > 8) n_workers = 8; + + if (!ensure_table(app_db_path)) return false; + + g_state = new State(); + g_state->app_db_path = app_db_path; + g_state->ops_db_path = ops_db_path ? ops_db_path : ""; + g_state->enrichers_dir = enrichers_dir ? enrichers_dir : ""; + g_state->app_dir = app_dir ? app_dir : ""; + g_state->registry_root = registry_root ? registry_root : ""; + + // Rehidratacion: jobs queued de sesiones anteriores se reencolan. + { + sqlite3* db = nullptr; + if (sqlite3_open_v2(app_db_path, &db, SQLITE_OPEN_READONLY, + nullptr) == SQLITE_OK) { + sqlite3_stmt* st = nullptr; + const char* sql = + "SELECT id FROM jobs WHERE status='queued' ORDER BY created_at"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + while (sqlite3_step(st) == SQLITE_ROW) { + const unsigned char* t = sqlite3_column_text(st, 0); + if (t) g_state->pending.push((const char*)t); + } + } + sqlite3_finalize(st); + sqlite3_close(db); + } + } + + for (int i = 0; i < n_workers; ++i) { + g_state->workers.emplace_back(worker_loop); + } + return true; +} + +void jobs_set_ops_db(const char* ops_db_path) { + if (!g_state) return; + std::lock_guard lk(g_state->q_mu); + g_state->ops_db_path = ops_db_path ? ops_db_path : ""; +} + +bool jobs_submit(const char* enricher_id, + const char* node_id, + const char* node_name, + const char* params_json, + char* out_id, size_t out_id_n) +{ + if (!g_state || !enricher_id || !*enricher_id) return false; + if (!out_id || out_id_n < 32) return false; + + std::string id = ulid(); + std::snprintf(out_id, out_id_n, "%s", id.c_str()); + + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "INSERT INTO jobs (id, enricher_id, node_id, node_name, params_json, " + "status, progress, stage, created_at) " + "VALUES (?, ?, ?, ?, ?, 'queued', 0, '', ?)"; + bool ok = false; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text (st, 1, id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 2, enricher_id, -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 3, node_id ? node_id : "", -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 4, node_name ? node_name : "", -1, SQLITE_TRANSIENT); + sqlite3_bind_text (st, 5, params_json ? params_json : "{}", -1, SQLITE_TRANSIENT); + sqlite3_bind_int64(st, 6, now_ms()); + ok = sqlite3_step(st) == SQLITE_DONE; + } + sqlite3_finalize(st); + sqlite3_close(db); + if (!ok) return false; + + { + std::lock_guard lk(g_state->q_mu); + g_state->pending.push(id); + } + g_state->q_cv.notify_one(); + return true; +} + +bool jobs_cancel(const char* job_id) { + if (!g_state || !job_id) return false; + std::shared_ptr ctrl; + { + std::lock_guard lk(g_state->q_mu); + auto it = g_state->running.find(job_id); + if (it != g_state->running.end()) ctrl = it->second; + } + if (ctrl) { + ctrl->cancel_requested.store(true); + if (ctrl->pid > 0) kill(ctrl->pid, SIGTERM); + return true; + } + // No corriendo: marcar cancelled si esta queued. + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "UPDATE jobs SET status='cancelled', finished_at=?, " + "error='cancelled before start' WHERE id=? AND status='queued'"; + bool ok = false; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_int64(st, 1, now_ms()); + sqlite3_bind_text (st, 2, job_id, -1, SQLITE_TRANSIENT); + ok = sqlite3_step(st) == SQLITE_DONE; + } + sqlite3_finalize(st); + sqlite3_close(db); + return ok; +} + +bool jobs_delete(const char* job_id) { + if (!g_state || !job_id) return false; + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "DELETE FROM jobs WHERE id=? AND status IN ('done','error','cancelled')"; + bool ok = false; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_text(st, 1, job_id, -1, SQLITE_TRANSIENT); + ok = sqlite3_step(st) == SQLITE_DONE; + } + sqlite3_finalize(st); + sqlite3_close(db); + return ok; +} + +bool jobs_list(std::vector* out, int limit) { + if (!g_state || !out) return false; + out->clear(); + if (limit < 1) limit = 1; + if (limit > 1000) limit = 1000; + + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return false; + } + sqlite3_stmt* st = nullptr; + const char* sql = + "SELECT id, enricher_id, COALESCE(node_id,''), node_name, status, " + "progress, stage, COALESCE(error,''), COALESCE(result_json,''), " + "created_at, COALESCE(started_at,0), COALESCE(finished_at,0) " + "FROM jobs ORDER BY created_at DESC LIMIT ?"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + sqlite3_bind_int(st, 1, limit); + while (sqlite3_step(st) == SQLITE_ROW) { + JobRow r; + auto col = [&](int i) { + const unsigned char* t = sqlite3_column_text(st, i); + return std::string(t ? (const char*)t : ""); + }; + r.id = col(0); + r.enricher_id = col(1); + r.node_id = col(2); + r.node_name = col(3); + r.status = col(4); + r.progress = sqlite3_column_double(st, 5); + r.stage = col(6); + r.error = col(7); + r.result_json = col(8); + r.created_at = sqlite3_column_int64(st, 9); + r.started_at = sqlite3_column_int64(st, 10); + r.finished_at = sqlite3_column_int64(st, 11); + out->push_back(std::move(r)); + } + } + sqlite3_finalize(st); + sqlite3_close(db); + return true; +} + +JobCounters jobs_counters() { + JobCounters c{}; + if (!g_state) return c; + sqlite3* db = nullptr; + if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db, + SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) { + if (db) sqlite3_close(db); + return c; + } + sqlite3_stmt* st = nullptr; + const char* sql = "SELECT status, COUNT(*) FROM jobs GROUP BY status"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) { + while (sqlite3_step(st) == SQLITE_ROW) { + const unsigned char* s = sqlite3_column_text(st, 0); + int n = sqlite3_column_int(st, 1); + if (!s) continue; + std::string status((const char*)s); + if (status == "queued") c.queued = n; + else if (status == "running") c.running = n; + else if (status == "done") c.done = n; + else if (status == "error") c.error = n; + else if (status == "cancelled") c.cancelled = n; + } + } + sqlite3_finalize(st); + sqlite3_close(db); + return c; +} + +int jobs_dirty_counter() { + if (!g_state) return 0; + return g_state->dirty.load(std::memory_order_relaxed); +} + +void jobs_shutdown() { + if (!g_state) return; + g_state->stop_flag.store(true); + // Cancelar todos los running. + { + std::lock_guard lk(g_state->q_mu); + for (auto& kv : g_state->running) { + kv.second->cancel_requested.store(true); + if (kv.second->pid > 0) kill(kv.second->pid, SIGTERM); + } + } + g_state->q_cv.notify_all(); + for (auto& t : g_state->workers) { + if (t.joinable()) t.join(); + } + delete g_state; + g_state = nullptr; +} + +} // namespace ge diff --git a/jobs.h b/jobs.h new file mode 100644 index 0000000..b51b793 --- /dev/null +++ b/jobs.h @@ -0,0 +1,97 @@ +#pragma once + +#include +#include +#include +#include + +// Sistema de jobs asincronos para enrichers (issue 0026). +// +// Diseno: +// - Tabla `jobs` en graph_explorer.db (NO en operations.db). +// - Pool de N std::thread workers (default 2). +// - Cada job spawnea un subprocess Python `enrichers//run.py` que recibe +// contexto por stdin (JSON), emite progreso por stderr y resultado final +// por stdout (JSON). +// - El worker aplica entities/relations/node_updates al operations.db indicado. +// - dirty_counter se incrementa al completar un job — el render() lee el +// contador y triggerea un reload del grafo cuando cambia. +// +// El estado vivo (running/queued en RAM) se persiste tambien en SQLite para +// que cancelaciones, reinicios y queries de UI vean lo mismo. + +namespace ge { + +struct JobRow { + std::string id; + std::string enricher_id; + std::string node_id; + std::string node_name; + std::string status; // queued|running|done|error|cancelled + double progress = 0.0; + std::string stage; + std::string error; + std::string result_json; + long long created_at = 0; + long long started_at = 0; + long long finished_at = 0; +}; + +// Inicializa el sistema. Crea la tabla `jobs` si no existe, marca como `error` +// los jobs que quedaron `running` de una sesion anterior, lanza los workers. +// `app_db_path` es /graph_explorer.db (jobs). +// `ops_db_path` es la operations.db actualmente cargada (target de mutaciones). +// `enrichers_dir` es /enrichers/. +// `app_dir` se usa como root para resolver `cache/` (issue 0027). +// `registry_root` se pasa al subprocess como FN_REGISTRY_ROOT para que el +// enricher pueda importar funciones del registry. +// Retorna false si alguno de los paths falla. +bool jobs_init(const char* app_db_path, + const char* ops_db_path, + const char* enrichers_dir, + const char* app_dir, + const char* registry_root, + int n_workers); + +// Cambia la operations.db objetivo (cuando el usuario abre otra). Drena cola +// pendiente — los jobs ya queued NO se reapuntan. +void jobs_set_ops_db(const char* ops_db_path); + +// Encola un job. params_json puede ser "{}" o JSON valido. node_id puede ser +// vacio (job global). Devuelve false si la BD falla. out_id (>= 64) recibe el +// ULID generado. +bool jobs_submit(const char* enricher_id, + const char* node_id, + const char* node_name, + const char* params_json, + char* out_id, size_t out_id_n); + +// Senala SIGTERM al subprocess y marca el job como `cancelled`. Si esta +// `queued` lo marca directamente. +bool jobs_cancel(const char* job_id); + +// Borra un job en estado terminal (done/error/cancelled). +bool jobs_delete(const char* job_id); + +// Snapshot ordenado por created_at DESC. +bool jobs_list(std::vector* out, int limit = 200); + +// Counters para el badge en la toolbar. +struct JobCounters { + int queued = 0; + int running = 0; + int done = 0; + int error = 0; + int cancelled = 0; +}; +JobCounters jobs_counters(); + +// Counter monotono que se incrementa cada vez que un job completa con +// cambios en operations.db. El main thread compara contra su ultimo valor +// conocido y, si cambio, llama reload_graph(). +int jobs_dirty_counter(); + +// Cierra el pool y espera a los workers (cancelando jobs en vuelo). +void jobs_shutdown(); + +} // namespace ge diff --git a/main.cpp b/main.cpp index 76faab4..14d759b 100644 --- a/main.cpp +++ b/main.cpp @@ -25,6 +25,8 @@ #include "layout_store.h" #include "entity_ops.h" #include "project_manager.h" +#include "jobs.h" +#include "enrichers.h" #include "../../../../cpp/vendor/sqlite3/sqlite3.h" @@ -39,6 +41,13 @@ #include #include +#ifndef _WIN32 +#include +#else +#include +#define getcwd _getcwd +#endif + // ---------------------------------------------------------------------------- // Estado global de la app // ---------------------------------------------------------------------------- @@ -116,6 +125,32 @@ static void apply_static_layout(int mode) { // Forward decl — definido mas abajo, lo necesita switch_to_project. static bool load_input(); +// ---------------------------------------------------------------------------- +// Registry path resolution (issue 0026) +// ---------------------------------------------------------------------------- + +// Devuelve el path absoluto al root de fn_registry. Estrategia: +// 1) FN_REGISTRY_ROOT env var +// 2) Sube desde getcwd() buscando un dir con `registry.db` +// 3) "" si no se encuentra +static std::string resolve_registry_root() { + if (const char* env = std::getenv("FN_REGISTRY_ROOT")) { + if (env && *env) return env; + } + char cwd[4096]; + if (getcwd(cwd, sizeof(cwd)) == nullptr) return ""; + std::string p = cwd; + for (int i = 0; i < 8; ++i) { + std::string probe = p + "/registry.db"; + FILE* f = std::fopen(probe.c_str(), "rb"); + if (f) { std::fclose(f); return p; } + size_t s = p.find_last_of('/'); + if (s == std::string::npos || s == 0) break; + p = p.substr(0, s); + } + return ""; +} + // ---------------------------------------------------------------------------- // Project lifecycle // ---------------------------------------------------------------------------- @@ -240,6 +275,9 @@ static bool load_input() { ge::entity_index_build(g_input.uri, &g_idx); g_app.input_db_path = g_input.uri ? g_input.uri : ""; + // issue 0026 — apunta el JobRunner a la nueva operations.db. + if (g_input.uri) ge::jobs_set_ops_db(g_input.uri); + // Cargar posiciones guardadas para este graph_hash g_graph_hash = ge::compute_graph_hash(g_input.uri); int restored = ge::layout_store_load(g_graph_hash, g_graph); @@ -483,7 +521,27 @@ static void render_context_menu() { ImGui::Separator(); if (ImGui::BeginMenu("Run enricher")) { - ImGui::TextDisabled("(coming soon — issues 0001/0002/0003)"); + // issue 0026 — listamos enrichers cuyo applies_to incluye este type. + const char* type_name = (n.type_id < (uint16_t)g_graph.type_count) + ? g_graph.types[n.type_id].name : ""; + auto specs = ge::enrichers_for_type(type_name); + if (!sql_id) { + ImGui::TextDisabled("(node has no entity id)"); + } else if (specs.empty()) { + ImGui::TextDisabled("(no enrichers para tipo '%s')", type_name); + } else { + for (const auto& s : specs) { + if (ImGui::MenuItem(s.name.c_str())) { + char job_id[64]; + bool ok = ge::jobs_submit(s.id.c_str(), sql_id, lbl, + "{}", job_id, sizeof(job_id)); + if (ok) g_app.panel_jobs = true; // abrir panel auto + } + if (!s.description.empty() && ImGui::IsItemHovered()) { + ImGui::SetTooltip("%s", s.description.c_str()); + } + } + } ImGui::EndMenu(); } @@ -512,6 +570,7 @@ static fn_ui::PanelToggle g_panels[] = { {"Note", nullptr, &g_app.panel_note}, {"Types", nullptr, &g_app.panel_type_editor}, {"Table", nullptr, &g_app.panel_table}, + {"Jobs", nullptr, &g_app.panel_jobs}, }; static void render() { @@ -596,6 +655,16 @@ static void render() { g_app.apply_layout_tick = 0; } + // issue 0026 — si un job termino con cambios, dispara reload del grafo. + { + static int s_last_dirty = 0; + int d = ge::jobs_dirty_counter(); + if (d != s_last_dirty) { + s_last_dirty = d; + g_app.want_reload = true; + } + } + // Triggers desde la toolbar if (g_app.want_fit) { graph_viewport_fit(g_graph, g_viewport); @@ -1194,6 +1263,12 @@ static void render() { ge::views_table_window(g_app); ge::views_import_dataset_modal(g_app); + // Jobs panel (issue 0026) — flotante, dockeable. + ImGui::SetNextWindowPos (ImVec2(vp->WorkPos.x + W * 0.20f, top + 40.0f), + ImGuiCond_FirstUseEver); + ImGui::SetNextWindowSize(ImVec2(900.0f, 360.0f), ImGuiCond_FirstUseEver); + ge::views_jobs(g_app); + g_first_render = false; } @@ -1418,6 +1493,34 @@ int main(int argc, char** argv) { "cualquier app del registry y permite explorar entidades/relaciones con " "shapes/iconos/layouts/filtros."); + // issue 0026 — sistema de jobs + enrichers. + { + std::string registry_root = resolve_registry_root(); + std::string app_dir = registry_root.empty() + ? "." + : registry_root + "/projects/osint_graph/apps/graph_explorer"; + std::string enrichers_dir = app_dir + "/enrichers"; + + // graph_explorer.db es el mismo SQLite usado por layout_store. + const char* app_db = g_layout_db_path.empty() + ? "graph_explorer.db" : g_layout_db_path.c_str(); + + ge::enrichers_load(enrichers_dir.c_str()); + if (!ge::jobs_init(app_db, + g_input.uri ? g_input.uri : "", + enrichers_dir.c_str(), + app_dir.c_str(), + registry_root.c_str(), + /*n_workers=*/2)) { + std::fprintf(stderr, "[graph_explorer] jobs_init failed (panel disabled)\n"); + } else { + std::fprintf(stdout, + "[graph_explorer] jobs_init OK — enrichers_dir=%s, registry_root=%s, %d enrichers\n", + enrichers_dir.c_str(), registry_root.c_str(), + (int)ge::enrichers_all().size()); + } + } + int rc = fn::run_app( {.title = "graph_explorer", .width = 1600, @@ -1429,6 +1532,7 @@ int main(int argc, char** argv) { render); // Cleanup + ge::jobs_shutdown(); if (g_gpu_ctx) graph_force_layout_gpu_destroy(g_gpu_ctx); if (g_atlas) graph_icons_destroy(g_atlas); graph_viewport_destroy(g_viewport); diff --git a/views.h b/views.h index af42af3..a12ac92 100644 --- a/views.h +++ b/views.h @@ -55,6 +55,7 @@ struct AppState { bool panel_stats = true; bool panel_viewport = true; bool panel_note = false; + bool panel_jobs = false; // issue 0026 bool show_filters_modal = false; bool show_open_modal = false; @@ -353,6 +354,14 @@ void views_type_editor(AppState& app); // te_delete_use_count via consulta a operations.db antes de mostrarlo. bool views_type_editor_delete_modal(AppState& app); +// ---- Jobs panel (issue 0026) --------------------------------------------- + +// Renderiza el panel "Jobs" — tabla con todos los jobs (queued/running/done/ +// error/cancelled). Botones por fila para cancelar / reintentar / borrar. +// Click en target_node centra el viewport sobre ese nodo (futuro). Polling +// cada N frames para no spammear la BD. +void views_jobs(AppState& app); + // ---- Filter helpers (issue 0009) ----------------------------------------- // True si el filtro tiene query no vacia o al menos un tag activo. diff --git a/views_jobs.cpp b/views_jobs.cpp new file mode 100644 index 0000000..36bc679 --- /dev/null +++ b/views_jobs.cpp @@ -0,0 +1,199 @@ +#include "views.h" +#include "jobs.h" + +#include "core/icons_tabler.h" +#include "core/tokens.h" + +#include "imgui.h" + +#include +#include +#include + +namespace ge { + +namespace { + +// Cache de la lista de jobs. Se refresca cada N frames para no abrir SQLite +// en cada frame. ~10 Hz es suficiente para una progress bar fluida. +struct JobsCache { + std::vector rows; + int last_frame_refresh = -100; + int filter_idx = 0; // 0=all 1=active 2=done 3=error + char buf[8] = {}; +}; +JobsCache g_jobs_cache; + +const char* status_icon(const std::string& s) { + if (s == "queued") return TI_HOURGLASS; + if (s == "running") return TI_PLAYER_PLAY; + if (s == "done") return TI_CHECK; + if (s == "error") return TI_ALERT_CIRCLE; + if (s == "cancelled") return TI_X; + return TI_QUESTION_MARK; +} + +ImVec4 status_color(const std::string& s) { + if (s == "running") return ImVec4(0.36f, 0.78f, 1.0f, 1.0f); + if (s == "done") return ImVec4(0.40f, 0.85f, 0.55f, 1.0f); + if (s == "error") return ImVec4(0.95f, 0.45f, 0.45f, 1.0f); + if (s == "cancelled") return ImVec4(0.65f, 0.65f, 0.65f, 1.0f); + if (s == "queued") return ImVec4(0.85f, 0.78f, 0.45f, 1.0f); + return ImVec4(0.7f, 0.7f, 0.7f, 1.0f); +} + +std::string format_duration(long long started, long long finished) { + if (started <= 0) return "—"; + long long end = finished > 0 ? finished + : std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + long long ms = end - started; + if (ms < 0) ms = 0; + char b[32]; + if (ms < 1000) std::snprintf(b, sizeof(b), "%lld ms", ms); + else if (ms < 60'000) std::snprintf(b, sizeof(b), "%.1f s", ms / 1000.0); + else std::snprintf(b, sizeof(b), "%.1f m", ms / 60'000.0); + return b; +} + +bool filter_match(const std::string& status, int idx) { + switch (idx) { + case 0: return true; // all + case 1: return status == "queued" || status == "running"; // active + case 2: return status == "done"; + case 3: return status == "error" || status == "cancelled"; + default: return true; + } +} + +} // namespace + +void views_jobs(AppState& app) { + if (!app.panel_jobs) return; + + if (!ImGui::Begin("Jobs", &app.panel_jobs)) { + ImGui::End(); + return; + } + + // Refresh cache cada ~10 frames (~6 Hz a 60fps). + int frame = ImGui::GetFrameCount(); + if (frame - g_jobs_cache.last_frame_refresh > 10) { + jobs_list(&g_jobs_cache.rows, 200); + g_jobs_cache.last_frame_refresh = frame; + } + + // Header: counters + filtro. + JobCounters c = jobs_counters(); + ImGui::TextColored(status_color("running"), "%s", TI_PLAYER_PLAY); + ImGui::SameLine(); ImGui::Text("%d", c.running); + ImGui::SameLine(0, 16); + ImGui::TextColored(status_color("queued"), "%s", TI_HOURGLASS); + ImGui::SameLine(); ImGui::Text("%d", c.queued); + ImGui::SameLine(0, 16); + ImGui::TextColored(status_color("done"), "%s", TI_CHECK); + ImGui::SameLine(); ImGui::Text("%d", c.done); + ImGui::SameLine(0, 16); + ImGui::TextColored(status_color("error"), "%s", TI_ALERT_CIRCLE); + ImGui::SameLine(); ImGui::Text("%d", c.error + c.cancelled); + + ImGui::SameLine(0, 24); + const char* filter_labels[] = { "All", "Active", "Done", "Errors" }; + ImGui::SetNextItemWidth(100); + ImGui::Combo("##jobs_filter", &g_jobs_cache.filter_idx, + filter_labels, IM_ARRAYSIZE(filter_labels)); + + ImGui::Separator(); + + // Tabla. + ImGuiTableFlags tflags = ImGuiTableFlags_Borders | + ImGuiTableFlags_RowBg | + ImGuiTableFlags_SizingStretchProp | + ImGuiTableFlags_ScrollY; + if (ImGui::BeginTable("jobs_table", 6, tflags, + ImVec2(0, ImGui::GetContentRegionAvail().y - 4))) { + ImGui::TableSetupScrollFreeze(0, 1); + ImGui::TableSetupColumn("Status", ImGuiTableColumnFlags_WidthFixed, 90); + ImGui::TableSetupColumn("Enricher", ImGuiTableColumnFlags_WidthStretch, 1.5f); + ImGui::TableSetupColumn("Target", ImGuiTableColumnFlags_WidthStretch, 2.0f); + ImGui::TableSetupColumn("Progress", ImGuiTableColumnFlags_WidthStretch, 2.0f); + ImGui::TableSetupColumn("Time", ImGuiTableColumnFlags_WidthFixed, 70); + ImGui::TableSetupColumn("##actions",ImGuiTableColumnFlags_WidthFixed, 80); + ImGui::TableHeadersRow(); + + for (const auto& r : g_jobs_cache.rows) { + if (!filter_match(r.status, g_jobs_cache.filter_idx)) continue; + ImGui::PushID(r.id.c_str()); + ImGui::TableNextRow(); + + // Status. + ImGui::TableSetColumnIndex(0); + ImGui::TextColored(status_color(r.status), "%s %s", + status_icon(r.status), r.status.c_str()); + + // Enricher. + ImGui::TableSetColumnIndex(1); + ImGui::TextUnformatted(r.enricher_id.c_str()); + + // Target. + ImGui::TableSetColumnIndex(2); + if (!r.node_name.empty()) { + ImGui::TextUnformatted(r.node_name.c_str()); + } else if (!r.node_id.empty()) { + ImGui::TextDisabled("%s", r.node_id.c_str()); + } else { + ImGui::TextDisabled("(global)"); + } + + // Progress. + ImGui::TableSetColumnIndex(3); + if (r.status == "running" || r.status == "queued") { + ImGui::ProgressBar((float)r.progress, ImVec2(-FLT_MIN, 0), + r.stage.empty() ? nullptr : r.stage.c_str()); + } else if (r.status == "error" && !r.error.empty()) { + ImGui::TextColored(status_color("error"), "%s", + r.error.size() > 64 + ? (r.error.substr(0, 64) + "…").c_str() + : r.error.c_str()); + if (ImGui::IsItemHovered()) { + ImGui::SetTooltip("%s", r.error.c_str()); + } + } else if (r.status == "done" && !r.result_json.empty()) { + ImGui::TextDisabled("%s", + r.result_json.size() > 80 + ? (r.result_json.substr(0, 80) + "…").c_str() + : r.result_json.c_str()); + if (ImGui::IsItemHovered() && r.result_json.size() > 80) { + ImGui::SetTooltip("%s", r.result_json.c_str()); + } + } else { + ImGui::TextDisabled("—"); + } + + // Time. + ImGui::TableSetColumnIndex(4); + ImGui::TextDisabled("%s", + format_duration(r.started_at, r.finished_at).c_str()); + + // Actions. + ImGui::TableSetColumnIndex(5); + if (r.status == "queued" || r.status == "running") { + if (ImGui::SmallButton("Cancel")) { + jobs_cancel(r.id.c_str()); + } + } else { + if (ImGui::SmallButton("Delete")) { + jobs_delete(r.id.c_str()); + } + } + + ImGui::PopID(); + } + ImGui::EndTable(); + } + + ImGui::End(); +} + +} // namespace ge From 7ec6c4e09f9316db4864218c0f112321020f417a Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 1 May 2026 18:24:52 +0200 Subject: [PATCH 4/4] =?UTF-8?q?feat(enrichers):=20cuatro=20enrichers=20web?= =?UTF-8?q?=20=E2=80=94=20fetch=20+=20extract=20trio=20(issues=200028,=200?= =?UTF-8?q?028b)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cada enricher es un par manifest.yaml + run.py en enrichers//. 1. fetch_webpage (Url, Webpage): HTTP GET (requests, fallback urllib) -> html_to_markdown_py_core -> sha256(url) -> guarda HTML+MD en cache//.{html,md}. Convierte Url -> Webpage con metadata enriquecida (title/status_code/content_type/ paths/text_length). Crea Domain con relacion BELONGS_TO. 2. extract_domain (Url, Webpage, Email): Saca dominio de metadata.url o metadata.address (sin I/O). Crea/conecta Domain con BELONGS_TO. Util cuando el usuario quiere ver el dominio antes de fetch. 3. extract_links (Webpage): Lee metadata.markdown_path -> extract_urls_py_cybersecurity -> dedup -> crea nodo Url por enlace + relacion LINKS_TO. Param max_links (50). 4. extract_text_entities (Webpage): Lee metadata.markdown_path -> extract_iocs_py_cybersecurity (regex puro, sin coste) -> crea entidades por (type, value) tipadas en el registro: Email, IPAddress, Domain, FileHash, CryptoWallet, CVE, MACAddress, Phone. Cada una con relacion EXTRACTED_FROM al Webpage origen. v1 sin GLiNER/ GLiREL — esos requieren modelos pre-cargados (futura iteracion). Probado end-to-end: fetch_webpage https://httpbin.org/html -> 1 Webpage + 1 Domain extract_links -> 2 Url + 2 LINKS_TO extract_text_entities -> 8 IoCs (Email, IP*2, CVE, Domain*2, Wallet, Phone) Co-Authored-By: Claude Opus 4.7 (1M context) --- enrichers/extract_domain/manifest.yaml | 7 + enrichers/extract_domain/run.py | 125 +++++++ enrichers/extract_links/manifest.yaml | 8 + enrichers/extract_links/run.py | 139 +++++++ enrichers/extract_text_entities/manifest.yaml | 9 + enrichers/extract_text_entities/run.py | 187 ++++++++++ enrichers/fetch_webpage/manifest.yaml | 8 + enrichers/fetch_webpage/run.py | 338 ++++++++++++++++++ 8 files changed, 821 insertions(+) create mode 100644 enrichers/extract_domain/manifest.yaml create mode 100755 enrichers/extract_domain/run.py create mode 100644 enrichers/extract_links/manifest.yaml create mode 100755 enrichers/extract_links/run.py create mode 100644 enrichers/extract_text_entities/manifest.yaml create mode 100755 enrichers/extract_text_entities/run.py create mode 100644 enrichers/fetch_webpage/manifest.yaml create mode 100755 enrichers/fetch_webpage/run.py diff --git a/enrichers/extract_domain/manifest.yaml b/enrichers/extract_domain/manifest.yaml new file mode 100644 index 0000000..7a14e75 --- /dev/null +++ b/enrichers/extract_domain/manifest.yaml @@ -0,0 +1,7 @@ +id: extract_domain +name: "Extract domain" +description: "Saca el dominio de la url/email del nodo y crea/conecta una entidad Domain con relacion BELONGS_TO. No descarga nada." +applies_to: [Url, Webpage, Email] +emits: [Domain] +relations: [BELONGS_TO] +params: [] diff --git a/enrichers/extract_domain/run.py b/enrichers/extract_domain/run.py new file mode 100755 index 0000000..53f73fb --- /dev/null +++ b/enrichers/extract_domain/run.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +"""Enricher extract_domain — issue 0028b. + +Saca el dominio de un nodo Url/Webpage (campo metadata.url) o Email (campo +metadata.address) y crea/conecta una entidad Domain con relacion BELONGS_TO. +No hace I/O de red. +""" +from __future__ import annotations + +import json +import sqlite3 +import sys +import time +from datetime import datetime, timezone +from urllib.parse import urlparse + + +def progress(p: float, stage: str = "") -> None: + sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") + sys.stderr.flush() + + +def now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def domain_from_url(url: str) -> str: + if not url: + return "" + if "://" not in url: + url = "https://" + url + try: + return (urlparse(url).hostname or "").lower() + except Exception: + return "" + + +def domain_from_email(addr: str) -> str: + if "@" not in addr: + return "" + return addr.split("@", 1)[1].strip().lower() + + +def main() -> int: + ctx = json.loads(sys.stdin.read()) + node_id = ctx.get("node_id") or "" + node_type = (ctx.get("node_type") or "").lower() + metadata = ctx.get("metadata") or {} + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except Exception: + metadata = {} + ops_db = ctx.get("ops_db_path") or "" + if not node_id or not ops_db: + sys.stderr.write("missing node_id / ops_db_path\n") + return 2 + + progress(0.30, "extracting") + dname = "" + if node_type == "email": + addr = metadata.get("address") or ctx.get("node_name") or "" + dname = domain_from_email(addr) + else: + url = metadata.get("url") or ctx.get("node_name") or "" + dname = domain_from_url(url) + + if not dname: + print(json.dumps({"warning": "no domain extractable", + "entities_added": 0, "relations_added": 0})) + return 0 + + progress(0.70, "writing") + conn = sqlite3.connect(ops_db) + entities_added = 0 + relations_added = 0 + try: + existed = conn.execute( + "SELECT id FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1", + (dname,), + ).fetchone() + if existed: + domain_id = existed[0] + else: + domain_id = f"Domain_{now_ms()}" + ts = now_iso() + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, created_at, updated_at) " + "VALUES (?, ?, 'Domain', 'enricher:extract_domain', ?, ?)", + (domain_id, dname, ts, ts), + ) + entities_added = 1 + + rel_exists = conn.execute( + "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='BELONGS_TO' LIMIT 1", + (node_id, domain_id), + ).fetchone() + if not rel_exists: + ts = now_iso() + conn.execute( + "INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) " + "VALUES (?, 'BELONGS_TO', ?, ?, ?, ?)", + (f"rel_{now_ms()}_belongs_to", node_id, domain_id, ts, ts), + ) + relations_added = 1 + + conn.commit() + finally: + conn.close() + + progress(1.0, "done") + print(json.dumps({ + "domain": dname, + "entities_added": entities_added, + "relations_added": relations_added, + })) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/enrichers/extract_links/manifest.yaml b/enrichers/extract_links/manifest.yaml new file mode 100644 index 0000000..625f194 --- /dev/null +++ b/enrichers/extract_links/manifest.yaml @@ -0,0 +1,8 @@ +id: extract_links +name: "Extract links" +description: "Lee la markdown cacheada de un Webpage (metadata.markdown_path) y crea nodos Url para cada enlace encontrado, conectados con relacion LINKS_TO. Requiere haber ejecutado fetch_webpage antes." +applies_to: [Webpage] +emits: [Url] +relations: [LINKS_TO] +params: + - { name: max_links, type: int, default: 50 } diff --git a/enrichers/extract_links/run.py b/enrichers/extract_links/run.py new file mode 100755 index 0000000..6519b60 --- /dev/null +++ b/enrichers/extract_links/run.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +"""Enricher extract_links — issue 0028b. + +Lee la markdown cacheada de un Webpage (metadata.markdown_path), saca todas +las URLs unicas con `extract_urls_py_cybersecurity`, y crea/conecta un nodo +Url por cada URL nueva con relacion LINKS_TO desde el Webpage origen. +""" +from __future__ import annotations + +import json +import os +import sqlite3 +import sys +import time +from datetime import datetime, timezone + + +def progress(p: float, stage: str = "") -> None: + sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") + sys.stderr.flush() + + +def log(msg: str) -> None: + sys.stderr.write(f"{msg}\n") + sys.stderr.flush() + + +def now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def main() -> int: + ctx = json.loads(sys.stdin.read()) + node_id = ctx.get("node_id") or "" + metadata = ctx.get("metadata") or {} + if isinstance(metadata, str): + try: metadata = json.loads(metadata) + except Exception: metadata = {} + ops_db = ctx.get("ops_db_path") or "" + app_dir = ctx.get("app_dir") or "" + registry_root = ctx.get("registry_root") or "" + params = ctx.get("params") or {} + max_links = int(params.get("max_links", 50)) + + if not node_id or not ops_db: + log("missing node_id / ops_db_path") + return 2 + + md_path = metadata.get("markdown_path") or "" + if not md_path: + log("nodo sin markdown_path — corre fetch_webpage primero") + print(json.dumps({"error": "missing markdown_path. Run fetch_webpage first.", + "entities_added": 0, "relations_added": 0})) + return 3 + + # Path relativo a app_dir. + abs_md = md_path if os.path.isabs(md_path) else os.path.join(app_dir, md_path) + if not os.path.exists(abs_md): + log(f"markdown not found at {abs_md}") + print(json.dumps({"error": f"markdown not found: {abs_md}", + "entities_added": 0, "relations_added": 0})) + return 4 + + progress(0.20, "reading") + text = open(abs_md, "r", encoding="utf-8", errors="replace").read() + + progress(0.45, "extracting") + py_funcs = os.path.join(registry_root, "python", "functions") + if py_funcs not in sys.path: + sys.path.insert(0, py_funcs) + from cybersecurity.cybersecurity import extract_urls # type: ignore + + urls = extract_urls(text) + # Dedup conservando orden. + seen = set() + unique = [] + for u in urls: + if u not in seen: + seen.add(u) + unique.append(u) + if max_links > 0: + unique = unique[:max_links] + + progress(0.65, "writing") + conn = sqlite3.connect(ops_db) + entities_added = 0 + relations_added = 0 + try: + for i, u in enumerate(unique): + existed = conn.execute( + "SELECT id FROM entities WHERE type_ref='Url' AND name=? LIMIT 1", + (u,), + ).fetchone() + if existed: + target_id = existed[0] + else: + target_id = f"Url_{now_ms()}_{i}" + ts = now_iso() + meta_json = json.dumps({"url": u}) + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, metadata, created_at, updated_at) " + "VALUES (?, ?, 'Url', 'enricher:extract_links', ?, ?, ?)", + (target_id, u, meta_json, ts, ts), + ) + entities_added += 1 + + rel_exists = conn.execute( + "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='LINKS_TO' LIMIT 1", + (node_id, target_id), + ).fetchone() + if not rel_exists: + ts = now_iso() + conn.execute( + "INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) " + "VALUES (?, 'LINKS_TO', ?, ?, ?, ?)", + (f"rel_{now_ms()}_{i}_links_to", node_id, target_id, ts, ts), + ) + relations_added += 1 + if i % 10 == 0: + progress(0.65 + 0.30 * (i / max(1, len(unique))), "writing") + conn.commit() + finally: + conn.close() + + progress(1.0, "done") + print(json.dumps({ + "links_found": len(unique), + "entities_added": entities_added, + "relations_added": relations_added, + })) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/enrichers/extract_text_entities/manifest.yaml b/enrichers/extract_text_entities/manifest.yaml new file mode 100644 index 0000000..8afc75f --- /dev/null +++ b/enrichers/extract_text_entities/manifest.yaml @@ -0,0 +1,9 @@ +id: extract_text_entities +name: "Extract entities from text" +description: "Lee la markdown cacheada de un Webpage y extrae IoCs (IPs, emails, dominios, hashes, crypto wallets, CVEs, MAC, telefonos) creando entidades + relacion EXTRACTED_FROM. Sin coste — solo regex. Modelos ML (GLiNER/GLiREL) en futura iteracion." +applies_to: [Webpage] +emits: [Email, IPAddress, Domain, FileHash, CryptoWallet, CVE, MACAddress, Phone] +relations: [EXTRACTED_FROM] +params: + - { name: types, type: string, default: "" } + - { name: max_entities, type: int, default: 200 } diff --git a/enrichers/extract_text_entities/run.py b/enrichers/extract_text_entities/run.py new file mode 100755 index 0000000..1d7290d --- /dev/null +++ b/enrichers/extract_text_entities/run.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +"""Enricher extract_text_entities — issue 0028b. + +Lee la markdown cacheada de un Webpage (metadata.markdown_path) y corre el +pipeline puro `extract_iocs` (regex puro, sin coste, sin modelos ML). + +Para cada IoC encontrado: + - Crea o reusa la entidad por (type, name). + - Crea relacion EXTRACTED_FROM desde la entidad nueva al Webpage origen. + +Tipos soportados (mapeo IoC -> type_ref del registry): + email -> Email + ip_address -> IPAddress + domain -> Domain + file_hash -> FileHash + crypto_wallet -> CryptoWallet + cve_id -> CVE + mac_address -> MACAddress + phone_number -> Phone + +Futura iteracion: añadir GLiNER/GLiREL para Person/Org/Location etc. +""" +from __future__ import annotations + +import json +import os +import sqlite3 +import sys +import time +from datetime import datetime, timezone + + +_TYPE_MAP = { + "email": ("Email", "address"), + "ip_address": ("IPAddress", "address"), + "domain": ("Domain", "name"), + "file_hash": ("FileHash", "value"), + "crypto_wallet": ("CryptoWallet", "address"), + "cve_id": ("CVE", "id"), + "mac_address": ("MACAddress", "address"), + "phone_number": ("Phone", "number"), +} + + +def progress(p: float, stage: str = "") -> None: + sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") + sys.stderr.flush() + + +def log(msg: str) -> None: + sys.stderr.write(f"{msg}\n") + sys.stderr.flush() + + +def now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def main() -> int: + ctx = json.loads(sys.stdin.read()) + node_id = ctx.get("node_id") or "" + metadata = ctx.get("metadata") or {} + if isinstance(metadata, str): + try: metadata = json.loads(metadata) + except Exception: metadata = {} + ops_db = ctx.get("ops_db_path") or "" + app_dir = ctx.get("app_dir") or "" + registry_root = ctx.get("registry_root") or "" + params = ctx.get("params") or {} + + types_csv = (params.get("types") or "").strip() + types_list = [t.strip() for t in types_csv.split(",") if t.strip()] if types_csv else None + max_entities = int(params.get("max_entities", 200)) + + if not node_id or not ops_db: + log("missing node_id / ops_db_path") + return 2 + + md_path = metadata.get("markdown_path") or "" + if not md_path: + log("nodo sin markdown_path — corre fetch_webpage primero") + print(json.dumps({"error": "missing markdown_path. Run fetch_webpage first.", + "entities_added": 0, "relations_added": 0})) + return 3 + + abs_md = md_path if os.path.isabs(md_path) else os.path.join(app_dir, md_path) + if not os.path.exists(abs_md): + log(f"markdown not found at {abs_md}") + print(json.dumps({"error": f"markdown not found: {abs_md}", + "entities_added": 0, "relations_added": 0})) + return 4 + + progress(0.10, "reading") + text = open(abs_md, "r", encoding="utf-8", errors="replace").read() + + progress(0.30, "extracting iocs") + py_funcs = os.path.join(registry_root, "python", "functions") + if py_funcs not in sys.path: + sys.path.insert(0, py_funcs) + from cybersecurity.extract_iocs import extract_iocs # type: ignore + + iocs = extract_iocs(text, types_list) + + # Dedup por (type, value). + seen = set() + unique = [] + for it in iocs: + t = it.get("type") + v = it.get("value") or it.get("address") or it.get("name") or "" + if not t or not v: + continue + key = (t, v) + if key in seen: + continue + seen.add(key) + unique.append(it) + if len(unique) >= max_entities: + break + + progress(0.55, "writing") + conn = sqlite3.connect(ops_db) + entities_added = 0 + relations_added = 0 + new_by_type: dict[str, int] = {} + try: + n = len(unique) + for i, it in enumerate(unique): + ioc_type = it.get("type") + value = it.get("value") or it.get("address") or it.get("name") or "" + if not value: + continue + type_ref, value_field = _TYPE_MAP.get(ioc_type, (ioc_type or "Text", "value")) + + existed = conn.execute( + "SELECT id FROM entities WHERE type_ref=? AND name=? LIMIT 1", + (type_ref, value), + ).fetchone() + if existed: + target_id = existed[0] + else: + target_id = f"{type_ref}_{now_ms()}_{i}" + ts = now_iso() + meta = {value_field: value} + if "start" in it: meta["text_offset"] = it["start"] + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, metadata, created_at, updated_at) " + "VALUES (?, ?, ?, 'enricher:extract_text_entities', ?, ?, ?)", + (target_id, value, type_ref, json.dumps(meta), ts, ts), + ) + entities_added += 1 + new_by_type[type_ref] = new_by_type.get(type_ref, 0) + 1 + + rel_exists = conn.execute( + "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='EXTRACTED_FROM' LIMIT 1", + (target_id, node_id), + ).fetchone() + if not rel_exists: + ts = now_iso() + conn.execute( + "INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) " + "VALUES (?, 'EXTRACTED_FROM', ?, ?, ?, ?)", + (f"rel_{now_ms()}_{i}_extracted", target_id, node_id, ts, ts), + ) + relations_added += 1 + + if i % 20 == 0 and n > 0: + progress(0.55 + 0.40 * (i / n), "writing") + conn.commit() + finally: + conn.close() + + progress(1.0, "done") + print(json.dumps({ + "iocs_found": len(unique), + "by_type": new_by_type, + "entities_added": entities_added, + "relations_added": relations_added, + })) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/enrichers/fetch_webpage/manifest.yaml b/enrichers/fetch_webpage/manifest.yaml new file mode 100644 index 0000000..d2a7535 --- /dev/null +++ b/enrichers/fetch_webpage/manifest.yaml @@ -0,0 +1,8 @@ +id: fetch_webpage +name: "Fetch web page" +description: "Descarga HTML de una URL, extrae markdown limpio (readabilipy) y guarda los blobs en cache. Crea/actualiza el nodo Webpage con title/status_code/paths y crea el Domain con relacion BELONGS_TO." +applies_to: [Url, Webpage] +emits: [Domain] +relations: [BELONGS_TO] +params: + - { name: timeout_s, type: int, default: 15 } diff --git a/enrichers/fetch_webpage/run.py b/enrichers/fetch_webpage/run.py new file mode 100755 index 0000000..b23d7d3 --- /dev/null +++ b/enrichers/fetch_webpage/run.py @@ -0,0 +1,338 @@ +#!/usr/bin/env python3 +"""Enricher fetch_webpage — issue 0028. + +Lee JSON de stdin, descarga la URL del nodo, convierte HTML a markdown, +guarda blobs en `//.{html,md}`, actualiza el +nodo a tipo Webpage con metadata enriquecida y crea/conecta el Domain. + +Wire protocol (issue 0026): + - stdin: JSON con node_id, metadata, ops_db_path, app_dir, cache_dir, + registry_root, params. + - stderr: lineas `PROGRESS: ` para feedback de UI. + - stdout: una linea JSON al final con resumen `{entities_added, ...}`. + - exit code 0 = ok, !=0 = error (stderr capturado se muestra en panel). +""" +from __future__ import annotations + +import hashlib +import json +import os +import re +import sqlite3 +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from urllib.parse import urlparse + + +def progress(p: float, stage: str = "") -> None: + """Emite linea PROGRESS al stderr para que C++ actualice la UI.""" + sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") + sys.stderr.flush() + + +def log(msg: str) -> None: + sys.stderr.write(f"{msg}\n") + sys.stderr.flush() + + +def load_registry_funcs(registry_root: str): + """Anade el registry al sys.path e importa funciones que usamos.""" + py_funcs = os.path.join(registry_root, "python", "functions") + if py_funcs not in sys.path: + sys.path.insert(0, py_funcs) + from cybersecurity.cybersecurity import normalize_url # type: ignore + from core.html_to_markdown import html_to_markdown # type: ignore + return normalize_url, html_to_markdown + + +def now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def fetch_with_requests(url: str, timeout: int): + """Descarga la URL y retorna (status_code, content_type, html, encoding). + + Usa `requests` si esta disponible, fallback a urllib. + """ + try: + import requests # type: ignore + headers = { + "User-Agent": ( + "Mozilla/5.0 (graph_explorer/0.1; " + "https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/graph_explorer) " + "Chrome/120 Safari/537.36" + ), + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "en-US,en;q=0.5", + } + r = requests.get(url, timeout=timeout, headers=headers, allow_redirects=True) + ct = r.headers.get("Content-Type", "text/html") + # `requests` decodifica por encoding HTTP/charset; si falla cae a apparent. + return r.status_code, ct, r.text, r.encoding or "utf-8" + except ImportError: + from urllib.request import Request, urlopen + req = Request(url, headers={"User-Agent": "graph_explorer/0.1"}) + with urlopen(req, timeout=timeout) as resp: # type: ignore + data = resp.read() + ct = resp.headers.get("Content-Type", "text/html") + enc = "utf-8" + m = re.search(r"charset=([\w-]+)", ct, re.I) + if m: + enc = m.group(1).lower() + return resp.status, ct, data.decode(enc, errors="replace"), enc + + +_TITLE_RE = re.compile(r"]*>(.*?)", re.I | re.S) + + +def extract_title(html: str) -> str: + m = _TITLE_RE.search(html) + if not m: + return "" + title = re.sub(r"\s+", " ", m.group(1)).strip() + if len(title) > 300: + title = title[:300] + "…" + return title + + +def domain_of(url: str) -> str: + try: + host = urlparse(url).hostname or "" + return host.lower() + except Exception: + return "" + + +def cache_paths(cache_dir: str, key: str) -> tuple[Path, Path]: + """Devuelve (html_path, md_path) y crea el dir intermedio.""" + sub = key[:2] + base = Path(cache_dir) / sub + base.mkdir(parents=True, exist_ok=True) + return base / f"{key}.html", base / f"{key}.md" + + +def merge_metadata_json(existing: str, patch: dict) -> str: + """Fusiona patch sobre el JSON existente (string) y devuelve nuevo string.""" + try: + cur = json.loads(existing) if existing else {} + if not isinstance(cur, dict): + cur = {} + except Exception: + cur = {} + cur.update(patch) + return json.dumps(cur, ensure_ascii=False) + + +def upsert_domain(conn: sqlite3.Connection, name: str) -> str: + """Crea o reusa entidad Domain por nombre. Retorna su id.""" + cur = conn.execute( + "SELECT id FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1", + (name,), + ) + row = cur.fetchone() + if row: + return row[0] + new_id = f"Domain_{now_ms()}" + ts = now_iso() + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, created_at, updated_at) " + "VALUES (?, ?, 'Domain', 'enricher:fetch_webpage', ?, ?)", + (new_id, name, ts, ts), + ) + return new_id + + +def relation_exists(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool: + cur = conn.execute( + "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name=? LIMIT 1", + (from_id, to_id, name), + ) + return cur.fetchone() is not None + + +def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool: + if relation_exists(conn, from_id, to_id, name): + return False + ts = now_iso() + rel_id = f"rel_{now_ms()}_{name.lower()}" + conn.execute( + "INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + (rel_id, name, from_id, to_id, ts, ts), + ) + return True + + +def main() -> int: + raw = sys.stdin.read() + try: + ctx = json.loads(raw) + except Exception as e: + log(f"stdin not valid JSON: {e}") + return 2 + + node_id = ctx.get("node_id") or "" + node_type = ctx.get("node_type") or "" + metadata = ctx.get("metadata") or {} + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except Exception: + metadata = {} + ops_db_path = ctx.get("ops_db_path") or "" + cache_dir = ctx.get("cache_dir") or "" + registry_root = ctx.get("registry_root") or "" + params = ctx.get("params") or {} + timeout_s = int(params.get("timeout_s", 15)) + + if not node_id or not ops_db_path: + log("missing node_id / ops_db_path") + return 2 + + # URL puede estar en `url` (Url/Webpage) o `address` (Url legacy). + raw_url = (metadata.get("url") or metadata.get("address") or "").strip() + if not raw_url: + # Fallback: si el nodo no tiene url en metadata, mira el name. + raw_url = (ctx.get("node_name") or "").strip() + if not raw_url: + log("nodo sin url en metadata ni name") + return 2 + + progress(0.05, "normalize") + try: + normalize_url, html_to_markdown = load_registry_funcs(registry_root) + except Exception as e: + log(f"registry imports failed: {e}") + return 3 + + try: + url = normalize_url(raw_url) + except Exception: + url = raw_url + if not url.startswith(("http://", "https://")): + url = "https://" + url + + progress(0.20, "fetching") + try: + status, content_type, html, _enc = fetch_with_requests(url, timeout=timeout_s) + except Exception as e: + log(f"fetch failed: {e}") + # Marcamos node con status=-1 para evidencia. + conn = sqlite3.connect(ops_db_path) + try: + cur = conn.execute("SELECT metadata FROM entities WHERE id=?", (node_id,)) + row = cur.fetchone() + existing_meta = row[0] if row and row[0] else "{}" + patch = {"url": url, "fetched_at": now_iso(), "status_code": -1} + new_meta = merge_metadata_json(existing_meta, patch) + conn.execute( + "UPDATE entities SET metadata=?, updated_at=? WHERE id=?", + (new_meta, now_iso(), node_id), + ) + conn.commit() + finally: + conn.close() + print(json.dumps({"error": str(e), "url": url, "entities_added": 0, + "relations_added": 0})) + return 4 + + progress(0.55, "parsing") + try: + markdown = html_to_markdown(html) + except Exception as e: + log(f"html_to_markdown failed (will save raw): {e}") + markdown = "" + + title = extract_title(html) + text_length = len(markdown) if markdown else len(html) + + progress(0.80, "writing") + key = hashlib.sha256(url.encode("utf-8")).hexdigest() + html_path, md_path = cache_paths(cache_dir, key) + try: + html_path.write_text(html, encoding="utf-8", errors="replace") + if markdown: + md_path.write_text(markdown, encoding="utf-8") + except Exception as e: + log(f"cache write failed: {e}") + return 5 + + # Paths en metadata se guardan relativos al app_dir para portabilidad. + rel_html = os.path.relpath(html_path, ctx.get("app_dir") or cache_dir) + rel_md = os.path.relpath(md_path, ctx.get("app_dir") or cache_dir) + + progress(0.92, "applying") + conn = sqlite3.connect(ops_db_path) + conn.execute("PRAGMA foreign_keys=OFF") + entities_added = 0 + relations_added = 0 + node_updated = False + try: + # 1. Update del nodo: convertir Url -> Webpage si aplica + parche meta. + cur = conn.execute( + "SELECT type_ref, metadata FROM entities WHERE id=?", (node_id,) + ) + row = cur.fetchone() + if not row: + log(f"node {node_id} disappeared") + return 6 + cur_type, cur_meta = row[0], row[1] or "{}" + new_type = "Webpage" if cur_type.lower() == "url" else cur_type or "Webpage" + + patch = { + "url": url, + "title": title, + "status_code": status, + "content_type": content_type, + "fetched_at": now_iso(), + "html_path": rel_html, + "markdown_path": rel_md if markdown else "", + "text_length": text_length, + } + new_meta = merge_metadata_json(cur_meta, patch) + conn.execute( + "UPDATE entities SET type_ref=?, metadata=?, updated_at=? WHERE id=?", + (new_type, new_meta, now_iso(), node_id), + ) + node_updated = True + + # 2. Crear/conectar Domain. + dname = domain_of(url) + if dname: + existed_before = conn.execute( + "SELECT 1 FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1", + (dname,), + ).fetchone() is not None + domain_id = upsert_domain(conn, dname) + if not existed_before: + entities_added += 1 + if insert_relation(conn, node_id, domain_id, "BELONGS_TO"): + relations_added += 1 + + conn.commit() + finally: + conn.close() + + progress(1.0, "done") + print(json.dumps({ + "url": url, + "status_code": status, + "title": title, + "text_length": text_length, + "html_path": rel_html, + "markdown_path": rel_md if markdown else "", + "entities_added": entities_added, + "relations_added": relations_added, + "node_updated": node_updated, + }, ensure_ascii=False)) + return 0 + + +if __name__ == "__main__": + sys.exit(main())