merge: issue/0026-jobs-system — sistema de jobs + 4 enrichers web

Cubre issues 0026 (jobs), 0027 (Webpage type), 0028 (fetch_webpage MVP),
0028b (extract_domain/extract_links/extract_text_entities). Issues 0029
(CDP variants) y 0030 (Deep enrich macro) quedan documentadas para
proxima iteracion.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-01 18:24:59 +02:00
39 changed files with 3538 additions and 1 deletions
+13
View File
@@ -0,0 +1,13 @@
# SQLite app DB (jobs + layouts) — local de cada PC
graph_explorer.db
graph_explorer.db-shm
graph_explorer.db-wal
# Cache de documentos descargados por enrichers (issue 0027)
cache/
# Build artifacts
build/
*.exe
*.o
*.obj
+7
View File
@@ -18,11 +18,14 @@ add_imgui_app(graph_explorer
main.cpp
data.cpp
views.cpp
views_jobs.cpp
types_registry.cpp
layout_store.cpp
entity_ops.cpp
project_manager.cpp
tableview.cpp
jobs.cpp
enrichers.cpp
# --- viz ---
${FN_CPP_ROOT_DIR}/functions/viz/graph_renderer.cpp
${FN_CPP_ROOT_DIR}/functions/viz/graph_force_layout.cpp
@@ -58,6 +61,10 @@ target_include_directories(graph_explorer PRIVATE
target_link_libraries(graph_explorer PRIVATE SQLite::SQLite3 DuckDB::DuckDB)
duckdb_copy_runtime(graph_explorer)
# Threads — issue 0026 (jobs system) usa std::thread + std::mutex + condvar.
find_package(Threads REQUIRED)
target_link_libraries(graph_explorer PRIVATE Threads::Threads)
# OpenGL: graph_renderer + graph_force_layout_gpu llaman gl* directamente.
# fn::run_app inicializa el loader cuando AppConfig::init_gl_loader = true.
if(NOT WIN32)
+172
View File
@@ -0,0 +1,172 @@
#include "enrichers.h"
#include <algorithm>
#include <cctype>
#include <cstdio>
#include <cstring>
#include <dirent.h>
#include <fstream>
#include <sstream>
#include <sys/stat.h>
namespace ge {
namespace {
std::vector<EnricherSpec> g_enrichers;
std::string strip(const std::string& s) {
size_t a = 0, b = s.size();
while (a < b && std::isspace((unsigned char)s[a])) ++a;
while (b > a && std::isspace((unsigned char)s[b - 1])) --b;
return s.substr(a, b - a);
}
std::string strip_quotes(const std::string& s) {
if (s.size() >= 2) {
if ((s.front() == '"' && s.back() == '"') ||
(s.front() == '\'' && s.back() == '\'')) {
return s.substr(1, s.size() - 2);
}
}
return s;
}
std::string lower(std::string s) {
for (auto& c : s) c = (char)std::tolower((unsigned char)c);
return s;
}
// Parsea una lista inline `[a, b, c]` o "[Webpage, Url]". Tolerante a
// espacios y a comillas simples/dobles dentro. NO soporta listas
// multi-linea — el manifest las usa siempre inline.
std::vector<std::string> parse_inline_list(const std::string& v) {
std::vector<std::string> out;
std::string s = strip(v);
if (s.size() < 2 || s.front() != '[' || s.back() != ']') return out;
s = s.substr(1, s.size() - 2);
std::string token;
auto flush = [&]() {
std::string t = strip_quotes(strip(token));
if (!t.empty()) out.push_back(std::move(t));
token.clear();
};
for (char c : s) {
if (c == ',') flush();
else token.push_back(c);
}
flush();
return out;
}
// Manifest YAML soportado (subset):
// id: fetch_webpage
// name: "Fetch web page"
// description: "..."
// applies_to: [Webpage, Url]
// params: <- v1 ignora bloque
// - { name: timeout_s, ... }
//
// Las claves anidadas bajo `params:` se ignoran (saltamos lineas indentadas).
bool parse_manifest(const std::string& path, EnricherSpec* out) {
std::ifstream f(path);
if (!f) return false;
std::string line;
bool in_skip_block = false;
while (std::getline(f, line)) {
// Strip CR de Windows.
if (!line.empty() && line.back() == '\r') line.pop_back();
// Linea blanca o comentario.
std::string trim = strip(line);
if (trim.empty() || trim.front() == '#') continue;
// Si la linea NO empieza con whitespace, salimos del bloque skip.
bool indented = !line.empty() && std::isspace((unsigned char)line.front());
if (!indented) in_skip_block = false;
if (in_skip_block) continue;
size_t colon = trim.find(':');
if (colon == std::string::npos) continue;
std::string key = strip(trim.substr(0, colon));
std::string val = strip(trim.substr(colon + 1));
if (key == "id") out->id = strip_quotes(val);
else if (key == "name") out->name = strip_quotes(val);
else if (key == "description") out->description = strip_quotes(val);
else if (key == "applies_to") out->applies_to = parse_inline_list(val);
else if (key == "params" && val.empty()) in_skip_block = true;
// emits/relations los ignoramos en v1 (solo informativos).
}
return !out->id.empty();
}
} // namespace
int enrichers_load(const char* enrichers_dir) {
g_enrichers.clear();
if (!enrichers_dir || !*enrichers_dir) return -1;
DIR* d = opendir(enrichers_dir);
if (!d) return -1;
struct dirent* ent;
while ((ent = readdir(d)) != nullptr) {
if (ent->d_name[0] == '.') continue;
std::string sub = std::string(enrichers_dir) + "/" + ent->d_name;
struct stat st{};
if (stat(sub.c_str(), &st) != 0 || !S_ISDIR(st.st_mode)) continue;
std::string manifest = sub + "/manifest.yaml";
std::string runpy = sub + "/run.py";
if (stat(manifest.c_str(), &st) != 0) continue;
if (stat(runpy.c_str(), &st) != 0) continue;
EnricherSpec spec;
if (!parse_manifest(manifest, &spec)) {
std::fprintf(stderr, "[enrichers] parse failed: %s\n", manifest.c_str());
continue;
}
spec.run_path = runpy;
g_enrichers.push_back(std::move(spec));
}
closedir(d);
std::sort(g_enrichers.begin(), g_enrichers.end(),
[](const EnricherSpec& a, const EnricherSpec& b) {
return a.name < b.name;
});
return (int)g_enrichers.size();
}
const std::vector<EnricherSpec>& enrichers_all() {
return g_enrichers;
}
std::vector<EnricherSpec> enrichers_for_type(const char* type_ref) {
std::vector<EnricherSpec> out;
if (!type_ref || !*type_ref) return out;
std::string want = lower(type_ref);
for (const auto& e : g_enrichers) {
if (e.applies_to.empty()) {
out.push_back(e);
continue;
}
for (const auto& t : e.applies_to) {
if (lower(t) == want) { out.push_back(e); break; }
}
}
return out;
}
const EnricherSpec* enricher_by_id(const char* id) {
if (!id || !*id) return nullptr;
for (const auto& e : g_enrichers) {
if (e.id == id) return &e;
}
return nullptr;
}
} // namespace ge
+40
View File
@@ -0,0 +1,40 @@
#pragma once
#include <string>
#include <vector>
// Registro estatico de enrichers (issue 0026).
//
// Al arrancar la app se escanea `<app_dir>/enrichers/*/manifest.yaml` y se
// rellena el registro. El context menu del viewport consulta
// `enrichers_for_type(type_ref)` para mostrar el submenu filtrado por tipo
// del nodo right-clickado.
//
// Para v1 no parseamos `params` con detalle — solo lo necesario para
// presentar el item de menu y submitear el job con `{}`.
namespace ge {
struct EnricherSpec {
std::string id; // ej: "fetch_webpage"
std::string name; // ej: "Fetch web page"
std::string description;
std::vector<std::string> applies_to; // tipos validos (case-insensitive)
std::string run_path; // path absoluto a run.py
};
// Escanea el directorio. Reentrante (limpia el registro anterior). Devuelve
// el numero de enrichers cargados, -1 si el dir no existe.
int enrichers_load(const char* enrichers_dir);
// Lista todos los enrichers cargados.
const std::vector<EnricherSpec>& enrichers_all();
// Filtra por tipo. Comparacion case-insensitive. Si applies_to es vacio en el
// manifest, el enricher se considera global (aplica a cualquier tipo).
std::vector<EnricherSpec> enrichers_for_type(const char* type_ref);
// Resuelve un enricher por id. Devuelve nullptr si no existe.
const EnricherSpec* enricher_by_id(const char* id);
} // namespace ge
+7
View File
@@ -0,0 +1,7 @@
id: extract_domain
name: "Extract domain"
description: "Saca el dominio de la url/email del nodo y crea/conecta una entidad Domain con relacion BELONGS_TO. No descarga nada."
applies_to: [Url, Webpage, Email]
emits: [Domain]
relations: [BELONGS_TO]
params: []
+125
View File
@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""Enricher extract_domain — issue 0028b.
Saca el dominio de un nodo Url/Webpage (campo metadata.url) o Email (campo
metadata.address) y crea/conecta una entidad Domain con relacion BELONGS_TO.
No hace I/O de red.
"""
from __future__ import annotations
import json
import sqlite3
import sys
import time
from datetime import datetime, timezone
from urllib.parse import urlparse
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def domain_from_url(url: str) -> str:
if not url:
return ""
if "://" not in url:
url = "https://" + url
try:
return (urlparse(url).hostname or "").lower()
except Exception:
return ""
def domain_from_email(addr: str) -> str:
if "@" not in addr:
return ""
return addr.split("@", 1)[1].strip().lower()
def main() -> int:
ctx = json.loads(sys.stdin.read())
node_id = ctx.get("node_id") or ""
node_type = (ctx.get("node_type") or "").lower()
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except Exception:
metadata = {}
ops_db = ctx.get("ops_db_path") or ""
if not node_id or not ops_db:
sys.stderr.write("missing node_id / ops_db_path\n")
return 2
progress(0.30, "extracting")
dname = ""
if node_type == "email":
addr = metadata.get("address") or ctx.get("node_name") or ""
dname = domain_from_email(addr)
else:
url = metadata.get("url") or ctx.get("node_name") or ""
dname = domain_from_url(url)
if not dname:
print(json.dumps({"warning": "no domain extractable",
"entities_added": 0, "relations_added": 0}))
return 0
progress(0.70, "writing")
conn = sqlite3.connect(ops_db)
entities_added = 0
relations_added = 0
try:
existed = conn.execute(
"SELECT id FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1",
(dname,),
).fetchone()
if existed:
domain_id = existed[0]
else:
domain_id = f"Domain_{now_ms()}"
ts = now_iso()
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, created_at, updated_at) "
"VALUES (?, ?, 'Domain', 'enricher:extract_domain', ?, ?)",
(domain_id, dname, ts, ts),
)
entities_added = 1
rel_exists = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='BELONGS_TO' LIMIT 1",
(node_id, domain_id),
).fetchone()
if not rel_exists:
ts = now_iso()
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) "
"VALUES (?, 'BELONGS_TO', ?, ?, ?, ?)",
(f"rel_{now_ms()}_belongs_to", node_id, domain_id, ts, ts),
)
relations_added = 1
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"domain": dname,
"entities_added": entities_added,
"relations_added": relations_added,
}))
return 0
if __name__ == "__main__":
sys.exit(main())
+8
View File
@@ -0,0 +1,8 @@
id: extract_links
name: "Extract links"
description: "Lee la markdown cacheada de un Webpage (metadata.markdown_path) y crea nodos Url para cada enlace encontrado, conectados con relacion LINKS_TO. Requiere haber ejecutado fetch_webpage antes."
applies_to: [Webpage]
emits: [Url]
relations: [LINKS_TO]
params:
- { name: max_links, type: int, default: 50 }
+139
View File
@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""Enricher extract_links — issue 0028b.
Lee la markdown cacheada de un Webpage (metadata.markdown_path), saca todas
las URLs unicas con `extract_urls_py_cybersecurity`, y crea/conecta un nodo
Url por cada URL nueva con relacion LINKS_TO desde el Webpage origen.
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def main() -> int:
ctx = json.loads(sys.stdin.read())
node_id = ctx.get("node_id") or ""
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try: metadata = json.loads(metadata)
except Exception: metadata = {}
ops_db = ctx.get("ops_db_path") or ""
app_dir = ctx.get("app_dir") or ""
registry_root = ctx.get("registry_root") or ""
params = ctx.get("params") or {}
max_links = int(params.get("max_links", 50))
if not node_id or not ops_db:
log("missing node_id / ops_db_path")
return 2
md_path = metadata.get("markdown_path") or ""
if not md_path:
log("nodo sin markdown_path — corre fetch_webpage primero")
print(json.dumps({"error": "missing markdown_path. Run fetch_webpage first.",
"entities_added": 0, "relations_added": 0}))
return 3
# Path relativo a app_dir.
abs_md = md_path if os.path.isabs(md_path) else os.path.join(app_dir, md_path)
if not os.path.exists(abs_md):
log(f"markdown not found at {abs_md}")
print(json.dumps({"error": f"markdown not found: {abs_md}",
"entities_added": 0, "relations_added": 0}))
return 4
progress(0.20, "reading")
text = open(abs_md, "r", encoding="utf-8", errors="replace").read()
progress(0.45, "extracting")
py_funcs = os.path.join(registry_root, "python", "functions")
if py_funcs not in sys.path:
sys.path.insert(0, py_funcs)
from cybersecurity.cybersecurity import extract_urls # type: ignore
urls = extract_urls(text)
# Dedup conservando orden.
seen = set()
unique = []
for u in urls:
if u not in seen:
seen.add(u)
unique.append(u)
if max_links > 0:
unique = unique[:max_links]
progress(0.65, "writing")
conn = sqlite3.connect(ops_db)
entities_added = 0
relations_added = 0
try:
for i, u in enumerate(unique):
existed = conn.execute(
"SELECT id FROM entities WHERE type_ref='Url' AND name=? LIMIT 1",
(u,),
).fetchone()
if existed:
target_id = existed[0]
else:
target_id = f"Url_{now_ms()}_{i}"
ts = now_iso()
meta_json = json.dumps({"url": u})
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, created_at, updated_at) "
"VALUES (?, ?, 'Url', 'enricher:extract_links', ?, ?, ?)",
(target_id, u, meta_json, ts, ts),
)
entities_added += 1
rel_exists = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='LINKS_TO' LIMIT 1",
(node_id, target_id),
).fetchone()
if not rel_exists:
ts = now_iso()
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) "
"VALUES (?, 'LINKS_TO', ?, ?, ?, ?)",
(f"rel_{now_ms()}_{i}_links_to", node_id, target_id, ts, ts),
)
relations_added += 1
if i % 10 == 0:
progress(0.65 + 0.30 * (i / max(1, len(unique))), "writing")
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"links_found": len(unique),
"entities_added": entities_added,
"relations_added": relations_added,
}))
return 0
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,9 @@
id: extract_text_entities
name: "Extract entities from text"
description: "Lee la markdown cacheada de un Webpage y extrae IoCs (IPs, emails, dominios, hashes, crypto wallets, CVEs, MAC, telefonos) creando entidades + relacion EXTRACTED_FROM. Sin coste — solo regex. Modelos ML (GLiNER/GLiREL) en futura iteracion."
applies_to: [Webpage]
emits: [Email, IPAddress, Domain, FileHash, CryptoWallet, CVE, MACAddress, Phone]
relations: [EXTRACTED_FROM]
params:
- { name: types, type: string, default: "" }
- { name: max_entities, type: int, default: 200 }
+187
View File
@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""Enricher extract_text_entities — issue 0028b.
Lee la markdown cacheada de un Webpage (metadata.markdown_path) y corre el
pipeline puro `extract_iocs` (regex puro, sin coste, sin modelos ML).
Para cada IoC encontrado:
- Crea o reusa la entidad por (type, name).
- Crea relacion EXTRACTED_FROM desde la entidad nueva al Webpage origen.
Tipos soportados (mapeo IoC -> type_ref del registry):
email -> Email
ip_address -> IPAddress
domain -> Domain
file_hash -> FileHash
crypto_wallet -> CryptoWallet
cve_id -> CVE
mac_address -> MACAddress
phone_number -> Phone
Futura iteracion: añadir GLiNER/GLiREL para Person/Org/Location etc.
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
_TYPE_MAP = {
"email": ("Email", "address"),
"ip_address": ("IPAddress", "address"),
"domain": ("Domain", "name"),
"file_hash": ("FileHash", "value"),
"crypto_wallet": ("CryptoWallet", "address"),
"cve_id": ("CVE", "id"),
"mac_address": ("MACAddress", "address"),
"phone_number": ("Phone", "number"),
}
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def main() -> int:
ctx = json.loads(sys.stdin.read())
node_id = ctx.get("node_id") or ""
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try: metadata = json.loads(metadata)
except Exception: metadata = {}
ops_db = ctx.get("ops_db_path") or ""
app_dir = ctx.get("app_dir") or ""
registry_root = ctx.get("registry_root") or ""
params = ctx.get("params") or {}
types_csv = (params.get("types") or "").strip()
types_list = [t.strip() for t in types_csv.split(",") if t.strip()] if types_csv else None
max_entities = int(params.get("max_entities", 200))
if not node_id or not ops_db:
log("missing node_id / ops_db_path")
return 2
md_path = metadata.get("markdown_path") or ""
if not md_path:
log("nodo sin markdown_path — corre fetch_webpage primero")
print(json.dumps({"error": "missing markdown_path. Run fetch_webpage first.",
"entities_added": 0, "relations_added": 0}))
return 3
abs_md = md_path if os.path.isabs(md_path) else os.path.join(app_dir, md_path)
if not os.path.exists(abs_md):
log(f"markdown not found at {abs_md}")
print(json.dumps({"error": f"markdown not found: {abs_md}",
"entities_added": 0, "relations_added": 0}))
return 4
progress(0.10, "reading")
text = open(abs_md, "r", encoding="utf-8", errors="replace").read()
progress(0.30, "extracting iocs")
py_funcs = os.path.join(registry_root, "python", "functions")
if py_funcs not in sys.path:
sys.path.insert(0, py_funcs)
from cybersecurity.extract_iocs import extract_iocs # type: ignore
iocs = extract_iocs(text, types_list)
# Dedup por (type, value).
seen = set()
unique = []
for it in iocs:
t = it.get("type")
v = it.get("value") or it.get("address") or it.get("name") or ""
if not t or not v:
continue
key = (t, v)
if key in seen:
continue
seen.add(key)
unique.append(it)
if len(unique) >= max_entities:
break
progress(0.55, "writing")
conn = sqlite3.connect(ops_db)
entities_added = 0
relations_added = 0
new_by_type: dict[str, int] = {}
try:
n = len(unique)
for i, it in enumerate(unique):
ioc_type = it.get("type")
value = it.get("value") or it.get("address") or it.get("name") or ""
if not value:
continue
type_ref, value_field = _TYPE_MAP.get(ioc_type, (ioc_type or "Text", "value"))
existed = conn.execute(
"SELECT id FROM entities WHERE type_ref=? AND name=? LIMIT 1",
(type_ref, value),
).fetchone()
if existed:
target_id = existed[0]
else:
target_id = f"{type_ref}_{now_ms()}_{i}"
ts = now_iso()
meta = {value_field: value}
if "start" in it: meta["text_offset"] = it["start"]
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, created_at, updated_at) "
"VALUES (?, ?, ?, 'enricher:extract_text_entities', ?, ?, ?)",
(target_id, value, type_ref, json.dumps(meta), ts, ts),
)
entities_added += 1
new_by_type[type_ref] = new_by_type.get(type_ref, 0) + 1
rel_exists = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='EXTRACTED_FROM' LIMIT 1",
(target_id, node_id),
).fetchone()
if not rel_exists:
ts = now_iso()
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) "
"VALUES (?, 'EXTRACTED_FROM', ?, ?, ?, ?)",
(f"rel_{now_ms()}_{i}_extracted", target_id, node_id, ts, ts),
)
relations_added += 1
if i % 20 == 0 and n > 0:
progress(0.55 + 0.40 * (i / n), "writing")
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"iocs_found": len(unique),
"by_type": new_by_type,
"entities_added": entities_added,
"relations_added": relations_added,
}))
return 0
if __name__ == "__main__":
sys.exit(main())
+8
View File
@@ -0,0 +1,8 @@
id: fetch_webpage
name: "Fetch web page"
description: "Descarga HTML de una URL, extrae markdown limpio (readabilipy) y guarda los blobs en cache. Crea/actualiza el nodo Webpage con title/status_code/paths y crea el Domain con relacion BELONGS_TO."
applies_to: [Url, Webpage]
emits: [Domain]
relations: [BELONGS_TO]
params:
- { name: timeout_s, type: int, default: 15 }
+338
View File
@@ -0,0 +1,338 @@
#!/usr/bin/env python3
"""Enricher fetch_webpage — issue 0028.
Lee JSON de stdin, descarga la URL del nodo, convierte HTML a markdown,
guarda blobs en `<cache_dir>/<sha256[0:2]>/<sha256>.{html,md}`, actualiza el
nodo a tipo Webpage con metadata enriquecida y crea/conecta el Domain.
Wire protocol (issue 0026):
- stdin: JSON con node_id, metadata, ops_db_path, app_dir, cache_dir,
registry_root, params.
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen `{entities_added, ...}`.
- exit code 0 = ok, !=0 = error (stderr capturado se muestra en panel).
"""
from __future__ import annotations
import hashlib
import json
import os
import re
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
def progress(p: float, stage: str = "") -> None:
"""Emite linea PROGRESS al stderr para que C++ actualice la UI."""
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def load_registry_funcs(registry_root: str):
"""Anade el registry al sys.path e importa funciones que usamos."""
py_funcs = os.path.join(registry_root, "python", "functions")
if py_funcs not in sys.path:
sys.path.insert(0, py_funcs)
from cybersecurity.cybersecurity import normalize_url # type: ignore
from core.html_to_markdown import html_to_markdown # type: ignore
return normalize_url, html_to_markdown
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def fetch_with_requests(url: str, timeout: int):
"""Descarga la URL y retorna (status_code, content_type, html, encoding).
Usa `requests` si esta disponible, fallback a urllib.
"""
try:
import requests # type: ignore
headers = {
"User-Agent": (
"Mozilla/5.0 (graph_explorer/0.1; "
"https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/graph_explorer) "
"Chrome/120 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
r = requests.get(url, timeout=timeout, headers=headers, allow_redirects=True)
ct = r.headers.get("Content-Type", "text/html")
# `requests` decodifica por encoding HTTP/charset; si falla cae a apparent.
return r.status_code, ct, r.text, r.encoding or "utf-8"
except ImportError:
from urllib.request import Request, urlopen
req = Request(url, headers={"User-Agent": "graph_explorer/0.1"})
with urlopen(req, timeout=timeout) as resp: # type: ignore
data = resp.read()
ct = resp.headers.get("Content-Type", "text/html")
enc = "utf-8"
m = re.search(r"charset=([\w-]+)", ct, re.I)
if m:
enc = m.group(1).lower()
return resp.status, ct, data.decode(enc, errors="replace"), enc
_TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
def extract_title(html: str) -> str:
m = _TITLE_RE.search(html)
if not m:
return ""
title = re.sub(r"\s+", " ", m.group(1)).strip()
if len(title) > 300:
title = title[:300] + ""
return title
def domain_of(url: str) -> str:
try:
host = urlparse(url).hostname or ""
return host.lower()
except Exception:
return ""
def cache_paths(cache_dir: str, key: str) -> tuple[Path, Path]:
"""Devuelve (html_path, md_path) y crea el dir intermedio."""
sub = key[:2]
base = Path(cache_dir) / sub
base.mkdir(parents=True, exist_ok=True)
return base / f"{key}.html", base / f"{key}.md"
def merge_metadata_json(existing: str, patch: dict) -> str:
"""Fusiona patch sobre el JSON existente (string) y devuelve nuevo string."""
try:
cur = json.loads(existing) if existing else {}
if not isinstance(cur, dict):
cur = {}
except Exception:
cur = {}
cur.update(patch)
return json.dumps(cur, ensure_ascii=False)
def upsert_domain(conn: sqlite3.Connection, name: str) -> str:
"""Crea o reusa entidad Domain por nombre. Retorna su id."""
cur = conn.execute(
"SELECT id FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1",
(name,),
)
row = cur.fetchone()
if row:
return row[0]
new_id = f"Domain_{now_ms()}"
ts = now_iso()
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, created_at, updated_at) "
"VALUES (?, ?, 'Domain', 'enricher:fetch_webpage', ?, ?)",
(new_id, name, ts, ts),
)
return new_id
def relation_exists(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool:
cur = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name=? LIMIT 1",
(from_id, to_id, name),
)
return cur.fetchone() is not None
def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool:
if relation_exists(conn, from_id, to_id, name):
return False
ts = now_iso()
rel_id = f"rel_{now_ms()}_{name.lower()}"
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(rel_id, name, from_id, to_id, ts, ts),
)
return True
def main() -> int:
raw = sys.stdin.read()
try:
ctx = json.loads(raw)
except Exception as e:
log(f"stdin not valid JSON: {e}")
return 2
node_id = ctx.get("node_id") or ""
node_type = ctx.get("node_type") or ""
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except Exception:
metadata = {}
ops_db_path = ctx.get("ops_db_path") or ""
cache_dir = ctx.get("cache_dir") or ""
registry_root = ctx.get("registry_root") or ""
params = ctx.get("params") or {}
timeout_s = int(params.get("timeout_s", 15))
if not node_id or not ops_db_path:
log("missing node_id / ops_db_path")
return 2
# URL puede estar en `url` (Url/Webpage) o `address` (Url legacy).
raw_url = (metadata.get("url") or metadata.get("address") or "").strip()
if not raw_url:
# Fallback: si el nodo no tiene url en metadata, mira el name.
raw_url = (ctx.get("node_name") or "").strip()
if not raw_url:
log("nodo sin url en metadata ni name")
return 2
progress(0.05, "normalize")
try:
normalize_url, html_to_markdown = load_registry_funcs(registry_root)
except Exception as e:
log(f"registry imports failed: {e}")
return 3
try:
url = normalize_url(raw_url)
except Exception:
url = raw_url
if not url.startswith(("http://", "https://")):
url = "https://" + url
progress(0.20, "fetching")
try:
status, content_type, html, _enc = fetch_with_requests(url, timeout=timeout_s)
except Exception as e:
log(f"fetch failed: {e}")
# Marcamos node con status=-1 para evidencia.
conn = sqlite3.connect(ops_db_path)
try:
cur = conn.execute("SELECT metadata FROM entities WHERE id=?", (node_id,))
row = cur.fetchone()
existing_meta = row[0] if row and row[0] else "{}"
patch = {"url": url, "fetched_at": now_iso(), "status_code": -1}
new_meta = merge_metadata_json(existing_meta, patch)
conn.execute(
"UPDATE entities SET metadata=?, updated_at=? WHERE id=?",
(new_meta, now_iso(), node_id),
)
conn.commit()
finally:
conn.close()
print(json.dumps({"error": str(e), "url": url, "entities_added": 0,
"relations_added": 0}))
return 4
progress(0.55, "parsing")
try:
markdown = html_to_markdown(html)
except Exception as e:
log(f"html_to_markdown failed (will save raw): {e}")
markdown = ""
title = extract_title(html)
text_length = len(markdown) if markdown else len(html)
progress(0.80, "writing")
key = hashlib.sha256(url.encode("utf-8")).hexdigest()
html_path, md_path = cache_paths(cache_dir, key)
try:
html_path.write_text(html, encoding="utf-8", errors="replace")
if markdown:
md_path.write_text(markdown, encoding="utf-8")
except Exception as e:
log(f"cache write failed: {e}")
return 5
# Paths en metadata se guardan relativos al app_dir para portabilidad.
rel_html = os.path.relpath(html_path, ctx.get("app_dir") or cache_dir)
rel_md = os.path.relpath(md_path, ctx.get("app_dir") or cache_dir)
progress(0.92, "applying")
conn = sqlite3.connect(ops_db_path)
conn.execute("PRAGMA foreign_keys=OFF")
entities_added = 0
relations_added = 0
node_updated = False
try:
# 1. Update del nodo: convertir Url -> Webpage si aplica + parche meta.
cur = conn.execute(
"SELECT type_ref, metadata FROM entities WHERE id=?", (node_id,)
)
row = cur.fetchone()
if not row:
log(f"node {node_id} disappeared")
return 6
cur_type, cur_meta = row[0], row[1] or "{}"
new_type = "Webpage" if cur_type.lower() == "url" else cur_type or "Webpage"
patch = {
"url": url,
"title": title,
"status_code": status,
"content_type": content_type,
"fetched_at": now_iso(),
"html_path": rel_html,
"markdown_path": rel_md if markdown else "",
"text_length": text_length,
}
new_meta = merge_metadata_json(cur_meta, patch)
conn.execute(
"UPDATE entities SET type_ref=?, metadata=?, updated_at=? WHERE id=?",
(new_type, new_meta, now_iso(), node_id),
)
node_updated = True
# 2. Crear/conectar Domain.
dname = domain_of(url)
if dname:
existed_before = conn.execute(
"SELECT 1 FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1",
(dname,),
).fetchone() is not None
domain_id = upsert_domain(conn, dname)
if not existed_before:
entities_added += 1
if insert_relation(conn, node_id, domain_id, "BELONGS_TO"):
relations_added += 1
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"url": url,
"status_code": status,
"title": title,
"text_length": text_length,
"html_path": rel_html,
"markdown_path": rel_md if markdown else "",
"entities_added": entities_added,
"relations_added": relations_added,
"node_updated": node_updated,
}, ensure_ascii=False))
return 0
if __name__ == "__main__":
sys.exit(main())
+19
View File
@@ -110,6 +110,25 @@ entities:
- { name: title, type: string }
- { name: domain, type: string }
# Documento web descargado. Issue 0027: tipo separado de Url para nodos
# con cuerpo cacheado (HTML+markdown+screenshot). Los enrichers
# fetch_webpage / extract_links / extract_text_entities lo pueblan.
- name: Webpage
color: "#89E0FC"
icon: ti-file-text
principal_field: url
fields:
- { name: url, type: url, required: true }
- { name: title, type: string }
- { name: status_code, type: int }
- { name: content_type, type: string }
- { name: fetched_at, type: date }
- { name: html_path, type: string }
- { name: markdown_path, type: string }
- { name: screenshot_path, type: string }
- { name: text_length, type: int }
- { name: lang, type: string }
# Nodo tabla — cuadrado (regla de forma). Issue 0010: contenedor con
# filas que son nodos del grafo.
- name: Table
+54
View File
@@ -0,0 +1,54 @@
---
id: 0012
title: Endpoint HTTP local de ingesta y consulta
status: pending
priority: medium
created: 2026-05-01
---
## Objetivo
Exponer un servidor HTTP local en `graph_explorer` (o como `ingest_server`
hermano) que sea el punto de entrada unico para todo flujo externo:
extension de navegador, CLI `gx`, watcher de portapapeles, bots, OCR, etc.
Sin este endpoint cada cliente externo tendria que abrir `operations.db`
directamente — colisiona con el lock del proceso vivo y duplica logica de
extraccion.
## Endpoints minimos
- `POST /entity` — crea entidad. Body: `{type, name, metadata}`.
- `POST /relation` — crea relacion. Body: `{from_id, to_id, kind, metadata}`.
- `POST /ingest/text` — texto libre -> `extract_graph_hybrid` -> preview o auto-commit.
- `POST /ingest/url` — URL -> fetch + extract -> preview o auto-commit.
- `POST /ingest/file` — multipart upload (PDF/CSV/JSON/.eml/imagen) -> router por mime -> extract.
- `GET /search?q=` — fuzzy / FTS sobre entidades.
- `GET /entity/:id`, `GET /entity/:id/neighbors`.
## Decisiones
- Bind a `127.0.0.1` por defecto, puerto fijo (ej. 7878) o aleatorio
escrito a `~/.fn_graph_port`.
- Auth: token compartido en `~/.fn_graph_token` (header `X-Token`).
Generado al primer arranque.
- Modo "preview": las rutas de ingesta aceptan `?commit=false` y
devuelven entities/relations propuestas para que el cliente las muestre
antes de persistir. Cuando es `true`, escribe directo.
- Implementacion: httplib o Mongoose embebido en C++ (sin nuevas deps
pesadas). Alternativa: lanzar el servidor en Go/Python aparte si la
integracion C++ se complica.
## Bloquea
Issues 0014, 0017, 0018, 0019, 0020 dependen de este.
## Definicion de hecho
- `curl -H "X-Token: ..." -d '{"type":"person","name":"X"}' localhost:7878/entity`
crea entidad.
- `POST /ingest/text` con texto en castellano devuelve entities/relations
detectadas por el pipeline hibrido.
- El endpoint corre en background mientras la UI sigue interactiva.
- Si `graph_explorer` no esta abierto, un binario `ingest_server`
standalone ofrece el mismo API contra la misma `operations.db`.
+45
View File
@@ -0,0 +1,45 @@
---
id: 0013
title: Panel "Paste & Extract" — texto libre a entidades con extract_graph_hybrid
status: pending
priority: high
created: 2026-05-01
---
## Objetivo
Panel dockeable dentro de `graph_explorer` con un textarea grande. Pegas
texto (articulo, mensaje, transcripcion, documento), pulsas Extract, corre
el pipeline `extract_graph_hybrid` (regex + GLiNER + GLiREL + LLM fallback)
y muestra preview de entidades y relaciones detectadas. El usuario marca
cuales aceptar antes de commit a `operations.db`.
Es el quick-win de mas alto valor: aprovecha el pipeline ya mergeado
(commit 1a353878) y elimina la friccion de tipear datos a mano.
## Alcance
- Panel "Extract" con textarea, combo de proyecto/tipos esperados, boton
"Extract".
- Lanza el pipeline en hilo aparte (es Python — invocar via subprocess
o el endpoint HTTP de 0012 con `commit=false`).
- Tabla de entidades propuestas: checkbox, type, name, source span. Tabla
de relaciones propuestas: from, kind, to, checkbox.
- Edicion inline de tipo/nombre antes de commit.
- "Apply selected" -> escribe a operations.db, refresca grafo, posiciona
los nuevos nodos cerca del centro o vinculados al ultimo seleccionado.
- Dedupe: si una entidad propuesta ya existe (mismo type+name) reusar el
id en lugar de duplicar.
## Decisiones
- Invocacion del pipeline: via 0012 si esta disponible, o subprocess
directo como fallback (para que el panel funcione sin levantar HTTP).
- Resaltado de spans en el textarea (v2 — primera version solo lista).
## Definicion de hecho
- Pego un parrafo en castellano sobre una empresa y un directivo, pulso
Extract, veo entidades correctas tipadas y la relacion entre ambas.
- Apply crea los nodos en el grafo en menos de 1 s tras click.
- Re-extraer el mismo texto no duplica entidades (dedupe funciona).
+49
View File
@@ -0,0 +1,49 @@
---
id: 0014
title: Extension de navegador "Add to graph"
status: pending
priority: high
created: 2026-05-01
depends_on: [0012]
---
## Objetivo
Extension Firefox/Chrome que añade items al grafo desde el navegador con
un click. Cubre el flujo Maltego "estoy leyendo algo en web -> nodo en
mi grafo" sin abandonar el navegador.
## Casos
- Click derecho sobre seleccion de texto -> "Add to graph" (manda texto
via `/ingest/text`).
- Click derecho sobre link -> "Add link" (crea entidad URL + metadata
del href, opcionalmente trigger fetch).
- Boton de toolbar -> "Add this page" (URL + titulo + meta description +
texto principal extraido con Readability).
- Modo "select & relate": dos selecciones consecutivas -> crea relacion
entre las entidades resultantes.
## Alcance
- WebExtension API (compatible Firefox/Chrome, Manifest v3).
- Settings: URL del endpoint (default `http://localhost:7878`), token,
proyecto destino.
- Preview popup tras extraccion: muestra entities propuestas, el usuario
acepta o edita antes de commit (reusa `?commit=false` de 0012).
- Atajo configurable (ej. `Ctrl+Shift+G`) para "add page".
## Decisiones
- Sin auth OAuth — token local compartido es suficiente para localhost.
- Empaquetar en `apps/graph_explorer/extension/` o como sub-repo propio
bajo `dataforge/graph_explorer_extension`.
- Si `graph_explorer` no esta corriendo: la extension muestra error
claro y guarda la accion en cola para reintentar.
## Definicion de hecho
- Selecciono un parrafo en una pagina, click derecho -> Add, en menos de
2 s veo los nodos en `graph_explorer`.
- Funciona en Firefox y Chrome con la misma build.
- Reintento automatico de la cola cuando vuelve a haber endpoint vivo.
+47
View File
@@ -0,0 +1,47 @@
---
id: 0015
title: Drag & drop de archivos sobre el viewport para ingesta
status: pending
priority: medium
created: 2026-05-01
---
## Objetivo
Soltar archivos sobre la ventana de `graph_explorer` lanza el extractor
adecuado segun extension y mete las entidades en el grafo, sin abrir
modales ni navegar menus.
## Tipos soportados
- `.pdf` -> texto + `extract_graph_hybrid`.
- `.eml` / `.msg` -> headers (from/to/cc) como entidades persona/email +
cuerpo via extract.
- `.csv` / `.parquet` -> ingesta como tabla DuckDB (encadena con 0011).
- `.json` / `.jsonl` -> si tiene shape entity/relation, importar; si no,
extract sobre stringify.
- `.png` / `.jpg` -> OCR (issue 0019) y luego extract.
- `.txt` / `.md` -> extract directo.
## Alcance
- Hook de drop de ImGui -> dispatcher por mime/extension -> pipeline
correspondiente -> preview con seleccion antes de commit (igual UX que
0013).
- Indicador visual de zona drop activa cuando hay drag sobre la ventana.
- Multiples archivos en un drop: procesar en cola, mostrar progreso.
## Decisiones
- Dispatcher reutiliza `/ingest/file` del endpoint 0012 si esta vivo, o
resuelve localmente como fallback.
- Limite de tamaño por archivo configurable (default 50 MB) para evitar
bloqueos en PDFs gigantes.
## Definicion de hecho
- Suelto un PDF en castellano sobre el canvas, en menos de 30 s veo
preview con entidades correctas.
- Suelto un .eml y aparecen `from`/`to` como nodos persona conectados
por una relacion `mailed`.
- Cancelar durante el preview no toca operations.db.
+44
View File
@@ -0,0 +1,44 @@
---
id: 0016
title: Watcher de portapapeles con deteccion de patrones
status: pending
priority: low
created: 2026-05-01
---
## Objetivo
Servicio (toggle desde la toolbar) que escucha el portapapeles y, cuando
detecta patrones de interes, ofrece añadir como entidad sin abandonar el
flujo en otra app. Pensado para sesiones de OSINT manual donde el coste
de "abrir la app y tipear" rompe el ritmo.
## Patrones detectados
- URL -> entidad URL (con fetch + extract opcional).
- Email, telefono, IBAN, DNI/NIE/CIF, BIC -> entidad tipada con regex.
- Coordenadas (lat,lon), hash (sha1/sha256), wallet crypto (BTC/ETH).
## Alcance
- Polling del clipboard (ImGui `GetClipboardText` + diff) o API nativa
(X11 selection / Win32 clipboard listener).
- Toast / notificacion no intrusiva con boton "Add". El usuario decide
por defecto.
- Modo "auto-add" para tipos seguros (IBAN/DNI raras veces son ruido).
- Lista de patrones configurable en `graph_explorer.db`.
## Decisiones
- Por defecto OFF — opt-in desde settings, para evitar leer todo lo que
el usuario copia.
- Anonimizar logs: nunca persistir el contenido del clipboard si el
usuario no lo añade.
- Deduplicar: copiar la misma cadena dos veces seguidas no notifica.
## Definicion de hecho
- Activo el watcher, copio un IBAN, recibo notificacion, click en Add y
el nodo aparece en el grafo.
- Apagar el watcher detiene la escucha en menos de 1 s.
- Patrones configurados como lista de regex editable desde settings.
+38
View File
@@ -0,0 +1,38 @@
---
id: 0017
title: CLI `gx` para hablar con el endpoint local
status: pending
priority: medium
created: 2026-05-01
depends_on: [0012]
---
## Objetivo
Cliente CLI fino, instalable en `~/.local/bin/gx`, que habla con el
endpoint HTTP local de `graph_explorer` (issue 0012). Permite ingesta y
consulta desde terminal o scripts sin abrir la app.
## Comandos
- `gx add <type> <name> [--metadata k=v ...]` — crea entidad.
- `gx rel <from_id> <kind> <to_id>` — crea relacion.
- `gx ingest <file>` — manda archivo al endpoint, abre preview en TUI.
- `gx from-url <url>` — fetch + extract.
- `gx search "query"` — devuelve hits del grafo activo (json o tabla).
- `gx neighbors <id> [--depth N]`.
- `gx open <id>` — abre el grafo y enfoca el nodo en `graph_explorer`.
## Decisiones
- Implementar como sub-comando del `fn` CLI existente (`fn gx ...`) o
binario aparte? Probablemente sub-comando para reusar config y auth.
- Output JSON por defecto si stdout no es TTY (componible con jq).
- Tabla legible si stdout es TTY.
## Definicion de hecho
- `gx add person "Juan Perez"` añade el nodo en el grafo en vivo.
- `gx ingest articulo.pdf` lanza preview interactivo en terminal y commit.
- `gx neighbors <id> --depth 2 --format json | jq` funciona en pipeline.
- Errores de conexion al endpoint se reportan claros (no stack traces).
@@ -0,0 +1,54 @@
---
id: 0018
title: Transforms automatizadas tipo Maltego con browser headless
status: pending
priority: low
created: 2026-05-01
depends_on: [0012]
---
## Objetivo
Dada una entidad seleccionada, ejecutar un script (Playwright/Puppeteer)
que enriquece el grafo con datos derivados — equivalente a las
"transforms" de Maltego. Es la pieza que diferencia frente a alternativas
mas estaticas.
## Ejemplos para banking/OSINT espanol
- Persona/empresa -> consulta BORME, registro mercantil, axesor.
- Dominio -> whois, DNS records, certificados (crt.sh).
- Email -> haveibeenpwned, hunter.io.
- Telefono -> truecaller-like.
- Empresa -> LinkedIn search publico, opencorporates.
## Alcance
- Cada transform es un pipeline del registry con tag `transform` y un
contrato fijo: input = `{id, type, metadata}`, output = `{entities,
relations}`.
- Registro de transforms aplicables por entity_type.
- UI: context menu sobre nodo -> "Run transform..." -> lista filtrada
por type aplicable -> ejecuta async -> notifica al terminar -> preview
antes de commit.
- Sandbox: cada transform en proceso aparte, timeout configurable.
## Riesgos y mitigaciones
- Los scrapers se rompen cuando los sitios cambian -> mantener una suite
de "transform health checks" automaticos (cron ligero) que avisa de
fallos antes de que el usuario los descubra en vivo.
- Cumplimiento legal y robots.txt -> documentar en cada transform su
fuente, politica y ToS.
- Rate limiting -> cooldown por host configurable.
## Definicion de hecho
- Selecciono un dominio en el grafo, lanzo "whois", aparecen registrant,
registrar y nameservers como nodos vinculados con relaciones tipadas.
- Un transform que falla loguea el error y no afecta a otros que
esten corriendo en paralelo.
- La lista de transforms aplicables a una entidad se computa segun su
type (no se ofrecen los inaplicables).
- Health check cron escribe a `proposals` cuando un transform empieza a
fallar repetidamente.
+39
View File
@@ -0,0 +1,39 @@
---
id: 0019
title: OCR de region de pantalla y archivos imagen
status: pending
priority: low
created: 2026-05-01
depends_on: [0012]
---
## Objetivo
Capturar una region de pantalla (atajo global) o soltar imagen sobre la
app (issue 0015) -> Tesseract / PaddleOCR -> texto -> `extract_graph_hybrid`.
Util cuando la fuente solo esta como captura, PDF escaneado, o pantalla
de un sistema sin copy/paste.
## Alcance
- Captura: usar herramienta del SO (gnome-screenshot, flameshot, snipping
tool) con flag de region. Linux primero, Windows con Snip & Sketch.
- OCR: Tesseract con datos de espanol (`spa.traineddata`). PaddleOCR
como alternativa para texto manuscrito o calidades bajas.
- Pipeline: imagen -> OCR -> texto -> panel preview de 0013.
## Decisiones
- Atajo global configurable (default `Ctrl+Alt+G`).
- Idiomas OCR como lista en settings (default `[spa, eng]`).
- Persistir la imagen original como `metadata.source_image_path` en la
entidad creada para trazabilidad.
## Definicion de hecho
- Atajo abre selector de region, capturo un parrafo en pantalla, en
menos de 5 s veo entidades extraidas.
- Suelto un PNG con texto sobre el canvas, mismo flujo (encadena con 0015).
- Calidad de OCR para espanol > 90% en capturas estandar 1080p de texto
impreso.
+46
View File
@@ -0,0 +1,46 @@
---
id: 0020
title: Ingesta via email forwarding y bot Telegram/Signal
status: pending
priority: low
created: 2026-05-01
depends_on: [0012]
---
## Objetivo
Ingerir entidades sin estar delante del PC. Util para capturar cosas
sobre la marcha (movil, lectura en otra pantalla, conversaciones).
## Canales
- **Email**: direccion dedicada (mailbox o alias) que se chequea via
IMAP cada N minutos. Adjuntos -> ingesta como en 0015. Cuerpo ->
extract.
- **Bot Telegram/Signal**: forwardear un mensaje, una imagen, o escribir
comandos (`/add empresa Acme`, `/relate Acme owns Bravo`). El bot
habla con el endpoint 0012.
## Alcance
- Cliente IMAP minimo o uso de un MTA local (postfix+dovecot) que
redirija a un script.
- Bot Telegram: BotFather + python-telegram-bot o equivalente Go (vive
en `apps/<bot_name>/` con tag `service`).
- Auth: solo procesar mensajes de chat IDs / direcciones whitelisted en
config.
- Confirmacion: el bot responde con preview ("¿añado estas 3 entidades?
responde 'si' o 'no'") antes de commit.
## Decisiones
- Bot self-hosted (no SaaS) — corre como service en VPS o en el PC.
- Multiples grafos: el bot puede targetear distintos `operations.db`
segun el chat de origen (mapping en config).
## Definicion de hecho
- Reenvio un PDF a la direccion dedicada y, en menos de 2 minutos, veo
las entidades en el grafo con notificacion del bot.
- El bot rechaza mensajes de chat IDs no autorizados sin responder.
- Comando `/search Acme` desde el bot devuelve hits del grafo.
+42
View File
@@ -0,0 +1,42 @@
---
id: 0021
title: Command palette Ctrl+K — busqueda y acciones globales
status: pending
priority: high
created: 2026-05-01
---
## Objetivo
Atajo `Ctrl+K` (configurable) abre overlay flotante con input de busqueda
fuzzy global. Lo que mas acelera el dia a dia: cero navegacion por menus
para encontrar un nodo o disparar una accion.
## Alcance
Indexa y matchea sobre:
- Entidades del grafo (por name, type, metadata).
- Acciones de la app ("Toggle inspector", "Save layout", "Run transform",
"Export subgraph", "Switch project", "Open settings").
- Comandos recientes (MRU al tope sin escribir).
Selecciono con flechas + Enter -> ejecuta accion o enfoca nodo en
el viewport.
## Implementacion
- Overlay modal centrado, input de texto + lista virtualizada
(`ImGuiListClipper`).
- Indexador en memoria sobre entidades; refresh al cambiar grafo.
- Fuzzy matcher (fzf-like, p.ej. `fts_fuzzy_match` de Forrest the woods,
o algo equivalente).
- Acciones registrables desde cualquier panel — registro central tipo
`cmd_palette_register("name", lambda)`.
## Definicion de hecho
- Ctrl+K, escribo 3 letras del nombre de un nodo, lo enfoca en el grafo.
- Ctrl+K, "exp", veo accion "Export subgraph as Markdown" disponible.
- Latencia de matching imperceptible con 50k entidades.
- MRU pone arriba lo usado recientemente.
+41
View File
@@ -0,0 +1,41 @@
---
id: 0022
title: Consulta del grafo en lenguaje natural via LLM
status: pending
priority: medium
created: 2026-05-01
depends_on: [0001]
---
## Objetivo
Input de texto ("personas relacionadas con BancoX que aparecen en mas de
3 documentos") -> LLM traduce a SQL sobre `operations.db` o a un set de
filtros sobre el grafo en memoria -> resalta el subgrafo resultante.
Complementa el chat de 0001 con un modo "consulta puntual" sin
conversacion: input -> resultado destacado, sin chat history.
## Alcance
- Tool-use ya disponible en 0001 (`query_entities`, `query_relations`).
- Modo "highlight": en lugar de devolver texto, el LLM emite un set de
ids -> la UI dibuja el subgrafo con resaltado y oscurece el resto.
- Boton "save as filter" -> persiste como vista nombrada (issue 0023).
- Historial de queries recientes en un dropdown.
- Indicador de query en curso (puede tardar varios segundos).
## Decisiones
- ¿Mismo cliente HTTP/Anthropic que 0001 o duplicado? Reusar.
- Modelo por defecto el mismo que 0001 (`claude-sonnet-4-6`).
- Query schema (que campos ve el LLM) dado por `types_registry` para
que aprenda los nombres de campos del proyecto.
## Definicion de hecho
- "personas con mas de 5 conexiones" devuelve subgrafo correcto.
- "documentos publicados en 2025" funciona si la metadata tiene fechas.
- Fallo silencioso (LLM mal interpreta) -> mensaje claro y opcion de
reintentar refinando.
- Query guardada como filtro reutilizable.
+39
View File
@@ -0,0 +1,39 @@
---
id: 0023
title: Vistas guardadas y filtros nombrados
status: pending
priority: medium
created: 2026-05-01
---
## Objetivo
Guardar combinaciones de filtros (tipo, tag, FTS, layout, zoom, nodos
fijados) bajo un nombre y reaplicarlas con un click o atajo.
Util para volver siempre al "mapa de la red de empresa X" o "vista de
emails sospechosos" sin reconfigurar todo cada vez.
## Alcance
- Tabla `saved_views(graph_hash, name, payload_json, hotkey, created_at)`
en `graph_explorer.db`.
- Panel/menu "Views" con lista, atajos asignables (Ctrl+1..9).
- Payload incluye: filtros activos, expanded nodes, viewport rect, layout
mode, theme overrides, nodos pinned.
- Boton "Save current as view..." en toolbar.
- Boton "Update view" cuando una view esta activa y el usuario cambia algo.
## Decisiones
- Las views son por `graph_hash` (no globales) — cada `operations.db`
tiene su set propio.
- Compartir view entre PCs: export/import JSON manual (v2 podria sync via
`fn sync`).
## Definicion de hecho
- Configuro filtros, "Save view as 'Banca'", la veo en el menu.
- Reload de la app -> "Banca" aplica todo lo guardado.
- Un atajo (Ctrl+1..9) salta a la vista correspondiente al instante.
- "Update view" persiste cambios sin crear duplicados.
+45
View File
@@ -0,0 +1,45 @@
---
id: 0024
title: Exportar subgrafo seleccionado a Markdown / Mermaid / CSV / PNG
status: pending
priority: medium
created: 2026-05-01
---
## Objetivo
Seleccion de nodos (rect drag o filtro activo) -> menu "Export as..." con
varios formatos de salida segun el destino.
## Formatos
- **Markdown**: una pagina por entidad con sus campos y links a vecinos.
Encaja con 0025 (sync con vault).
- **Mermaid `graph TD`**: para pegar en notas o issues.
- **CSV**: dos archivos `nodes.csv` + `edges.csv` para Gephi/Cytoscape.
- **PNG / SVG**: render del subgrafo con layout actual.
- **JSON**: shape `{nodes:[], edges:[]}` para reimportar o procesar.
## Alcance
- Menu "Export selected" en context menu del canvas y en menu superior.
- Cada exportador es una funcion del registry reutilizable
(`export_subgraph_md_cpp_viz`, `export_subgraph_mermaid_cpp_viz`, etc).
- Para PNG/SVG: reusar el render actual a un framebuffer offscreen, con
factor de escalado configurable (1x / 2x / 4x).
- Diccionario de plantillas configurable para Markdown (por entity_type).
## Decisiones
- Mermaid copiado al portapapeles automaticamente; otros formatos
abren dialogo de guardado.
- Limite suave a 500 nodos para Mermaid (ilegible mas alla).
## Definicion de hecho
- Selecciono 20 nodos, exporto Markdown -> directorio con 20 .md y
enlaces cruzados validos.
- Exporto Mermaid -> string copiado al portapapeles, valido en
mermaid.live.
- Exporto PNG con layout fijo, calidad 2x, fidelidad pixel a la vista.
- CSV importable directo en Gephi sin transformaciones.
+44
View File
@@ -0,0 +1,44 @@
---
id: 0025
title: Sync bidireccional con vault Obsidian / markdown
status: pending
priority: low
created: 2026-05-01
depends_on: [0024]
---
## Objetivo
Espejar el grafo activo a un vault de markdown (estilo Obsidian) en
`projects/osint_graph/vaults/<name>/`. Cada entidad = una nota; cada
relacion = un wikilink. El usuario puede navegar el grafo desde Obsidian
y editar campos alli; los cambios vuelven al grafo.
Encaja con `vaults/` ya conceptualizados en el registry.
## Alcance
- Watcher de filesystem + serializer/parser de notas con frontmatter
YAML para los campos del entity_type.
- Plantilla por entity_type configurable (apoyandose en el exporter
Markdown de 0024).
- Resolucion de conflictos: timestamp + merge campo a campo; preferencia
configurable (vault wins / db wins / prompt).
- Modo unidireccional inicial (graph -> vault) si la ida y vuelta es
mucho trabajo. v2 anade sync de vuelta.
## Decisiones
- Sync continuo o on-demand (boton "Sync now")? Empezar on-demand. El
watcher se anade en una segunda fase.
- Detectar cambios externos via `mtime` + checksum.
- Wikilinks usan ids del registry, no nombres (estables ante renames).
## Definicion de hecho
- Boton "Sync to vault" genera N notas con frontmatter correcto y
wikilinks navegables en Obsidian.
- Editar un campo en la nota y "Sync from vault" actualiza la entidad
en operations.db.
- No se pierden datos cuando hay edicion concurrente en ambos lados
(resolucion de conflicto explicita).
+209
View File
@@ -0,0 +1,209 @@
---
id: 0026
title: Sistema de jobs — enrichers asincronos en background
status: in_progress
priority: high
created: 2026-05-01
blocks: [0027, 0028, 0029, 0030]
supersedes: [0001, 0002, 0003]
---
## Objetivo
Convertir el menu "Run enricher" (hoy placeholder en main.cpp:485) en un
sistema real de jobs asincronos: el usuario lanza un enricher sobre un
nodo, vuelve a la app y sigue trabajando mientras el enricher procesa
en background. Al terminar, el grafo se recarga automaticamente con las
entidades/relaciones nuevas.
Este issue solo cubre la **infra**. Los enrichers concretos se escriben
en 0028, 0029, 0030.
## Decisiones tomadas
1. **Workers concurrentes**: 2 por defecto, configurable via Settings.
2. **Cache de documentos**: `<app_dir>/cache/<sha256[0:2]>/<sha256>.{html,md,png}`. Carpeta gitignored en el sub-repo.
3. **Webpage vs Url**: tipos separados (issue 0027). Url = link suelto, Webpage = documento descargado con cuerpo.
4. **Subprocess Python por job** (no daemon residente): cold start ~200 ms aceptable. Si molesta, issue futura.
5. **Estado en `graph_explorer.db`** (NO en `operations.db`): jobs son especificos de la app, no del grafo.
## Tabla `jobs` (graph_explorer.db)
```sql
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY, -- ULID
enricher_id TEXT NOT NULL, -- ej: "fetch_webpage"
node_id TEXT, -- nodo objetivo (NULL si batch)
node_name TEXT DEFAULT '', -- snapshot para mostrar en UI
params_json TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL, -- queued|running|done|error|cancelled
progress REAL NOT NULL DEFAULT 0, -- 0..1
stage TEXT NOT NULL DEFAULT '', -- mensaje corto: "fetching", "extracting"
result_json TEXT, -- {entities_added: N, relations_added: M, ...}
error TEXT,
pid INTEGER, -- subprocess pid para cancelar
created_at INTEGER NOT NULL,
started_at INTEGER,
finished_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, created_at);
```
## Runtime C++
### `jobs.{h,cpp}` (nuevo)
- `JobRunner`: pool de N std::thread workers (default 2).
- Cola en memoria `std::queue<JobId>` + persistencia en BD.
- API publico:
- `bool jobs_init(const char* db_path, int n_workers);`
- `bool jobs_submit(const char* enricher_id, const char* node_id, const char* params_json, char* out_id);`
- `bool jobs_cancel(const char* job_id);`
- `void jobs_shutdown();`
- `int jobs_dirty_counter();` // incrementa al completar un job; render lo lee
- `bool jobs_list(std::vector<JobRow>* out);`
- Worker hace:
1. Pop de cola → mark running, started_at = now, pid = subprocess pid.
2. Spawn `python/.venv/bin/python3 enrichers/<id>/run.py` con stdin = JSON.
3. Lee stderr line-by-line buscando `PROGRESS:<float> <stage>` para actualizar fila.
4. Lee stdout completo al cerrar — JSON final con entities/relations/node_updates.
5. Aplica al `operations.db` desde el worker (entity_insert/relation_insert/entity_update).
6. Marca done o error con result_json/error, finished_at, increment dirty_counter.
- Al arrancar: marca jobs `running` huerfanos como `error: "process died"`.
### `enrichers.{h,cpp}` (nuevo)
- Escanea `enrichers/*/manifest.yaml` al arrancar.
- Estructura `EnricherSpec`:
```
std::string id, name, description;
std::vector<std::string> applies_to; // tipos validos
std::vector<EnricherParam> params;
std::string run_path; // enrichers/<id>/run.py absoluto
```
- API:
- `void enrichers_load(const char* enrichers_dir);`
- `std::vector<EnricherSpec> enrichers_for_type(const char* type_ref);`
## Contrato enricher (wire protocol)
Cada enricher vive en `apps/graph_explorer/enrichers/<id>/`:
```
enrichers/
fetch_webpage/
manifest.yaml
run.py
```
### `manifest.yaml`
```yaml
id: fetch_webpage
name: "Fetch web page"
description: "Descarga HTML, extrae markdown y guarda en cache."
applies_to: [Webpage, Url]
params:
- { name: timeout_s, type: int, default: 15 }
- { name: use_browser, type: bool, default: false }
```
### `run.py` — stdin/stdout/stderr
**stdin** (una linea JSON):
```json
{"node_id":"webpage_123","node_type":"Webpage","node_name":"...",
"metadata":{"url":"https://..."},"params":{"timeout_s":15},
"db_path":"/path/operations.db","cache_dir":"/path/cache",
"registry_root":"/home/lucas/fn_registry"}
```
**stderr** (lineas de progreso, opcional):
```
PROGRESS:0.10 connecting
PROGRESS:0.50 parsing
PROGRESS:0.90 writing
```
**stdout** (una linea JSON al final):
```json
{"node_updates":[
{"id":"webpage_123","metadata_patch":{"title":"...","status_code":200}}
],
"entities":[
{"type":"Domain","name":"example.com","metadata":{}}
],
"relations":[
{"from_id":"webpage_123","to_name":"example.com","to_type":"Domain","kind":"BELONGS_TO"}
],
"notes":""}
```
Resolucion de relaciones:
- Si `to_id` esta presente, se usa directamente.
- Si no, se busca por `(to_name, to_type)` en operations.db; si no existe, se crea primero.
## UI
### Panel "Jobs"
- Nuevo panel dockeable (entry en `g_panels[]`).
- Tabla con columnas: enricher, target node (clicable → centra viewport), status (badge), progress bar, stage, duracion, error tooltip.
- Botones inline por fila: cancelar (running), reintentar (error/cancelled), borrar (terminal).
- Filtro: all | active | done | error.
### Toolbar
- Badge en la toolbar superior con contador de `running + queued`. Click abre el panel Jobs.
### Context menu (main.cpp:485)
Reemplazar el `TextDisabled("coming soon")` por:
```cpp
auto specs = ge::enrichers_for_type(node->type_ref);
if (specs.empty()) {
ImGui::TextDisabled("(no hay enrichers para este tipo)");
} else {
for (const auto& s : specs) {
if (ImGui::MenuItem(s.name.c_str())) {
char job_id[64];
ge::jobs_submit(s.id.c_str(), node->id.c_str(), "{}", job_id);
}
}
}
```
## Fases del bucle reactivo en BD
- **CONSTRUIR**: enricher escrito en `enrichers/<id>/`.
- **EJECUTAR**: subprocess corre, escribe progress en `jobs`.
- **RECOPILAR**: stdout JSON parseado, entities/relations aplicadas a operations.db.
- **ANALIZAR**: jobs.result_json contiene metricas (entities_added, duration_ms).
- **MEJORAR**: si un enricher falla repetidamente, futura issue de health checks.
## Cancelacion
- Boton "Cancel" en panel Jobs:
- Lee `pid` de la fila.
- `kill(pid, SIGTERM)` (Linux/WSL) o `TerminateProcess` (Windows).
- El worker captura el exit code y marca `cancelled`.
- Si el job aun no salio de la cola (status = queued), el worker simplemente no lo coge — al pop chequea status y skipea cancelled.
## Definicion de hecho
- Tabla `jobs` se crea al arrancar la app.
- `JobRunner` con 2 workers acepta `jobs_submit` y procesa.
- Panel Jobs muestra estado en tiempo real (progress bar avanza visiblemente).
- Cancelar mata subprocess y marca `cancelled`.
- Al completar, el grafo se recarga (dirty_counter detectado en render).
- Subir el contador de jobs en la toolbar.
- Test manual: enricher dummy `noop` (incluido en este issue) que duerme 3 s emitiendo PROGRESS y termina sin entidades. Lanzarlo y comprobar UI.
## Trabajo posterior
- 0027: tipo Webpage + cache.
- 0028: primer enricher real (`fetch_webpage`) end-to-end.
- 0028b: enrichers extract_domain, extract_links, extract_text_entities.
- 0029: enrichers via CDP (browser headless).
- 0030: macro "Deep enrich" + expand_domain.
+82
View File
@@ -0,0 +1,82 @@
---
id: 0027
title: Tipo Webpage + cache de documentos descargados
status: pending
priority: high
created: 2026-05-01
depends_on: [0026]
blocks: [0028, 0029, 0030]
---
## Objetivo
Anadir un tipo `Webpage` al `examples/types.yaml` y un layout
estandarizado de cache donde los enrichers guardan HTML, markdown y
screenshots descargados. El tipo `Url` existente queda como link suelto;
`Webpage` es un documento descargado con cuerpo.
## Cambios en `examples/types.yaml`
Anadir tras el bloque `Url`:
```yaml
- name: Webpage
color: "#89E0FC"
icon: ti-file-text
principal_field: url
fields:
- { name: url, type: url, required: true }
- { name: title, type: string }
- { name: status_code, type: int }
- { name: content_type, type: string }
- { name: fetched_at, type: date }
- { name: html_path, type: string } # cache/<sha256[0:2]>/<sha256>.html
- { name: markdown_path, type: string } # cache/<sha256[0:2]>/<sha256>.md
- { name: screenshot_path,type: string } # cache/<sha256[0:2]>/<sha256>.png
- { name: text_length, type: int }
- { name: lang, type: string }
```
## Layout del cache
```
<app_dir>/cache/
ab/
abcd1234...ef.html
abcd1234...ef.md
abcd1234...ef.png
cd/
cdef5678...01.html
...
```
- `sha256[0:2]` para evitar miles de archivos en un solo dir.
- Path absoluto desde `<app_dir>` para que sea portable entre PCs (paths relativos en metadata).
- `cache/` se anade al `.gitignore` del sub-repo.
## Helper C++
Funcion en `data.{h,cpp}` (o nuevo `cache_paths.{h,cpp}`):
```cpp
namespace ge {
// Resuelve el path absoluto donde un enricher debe escribir el blob.
// Crea el dir si no existe. ext sin punto: "html", "md", "png".
// hash_input: tipicamente la URL canonica (normalizada).
std::string cache_path(const char* app_dir,
const char* hash_input,
const char* ext);
}
```
- SHA256 calculado en C++ (usar implementacion existente en cpp/functions/core/ si la hay; si no, vendor/sqlite3 trae uno o se anade simple).
- O: el enricher Python calcula el sha256 (mas simple) y lo devuelve como parte de `node_updates`. Decidido: Python calcula el sha256, C++ solo expone `app_dir/cache/` como path absoluto al enricher.
## Definicion de hecho
- `Webpage` aparece en types.yaml con icono `ti-file-text`.
- El icono se renderiza correctamente (existe en tabler_codepoint_by_name).
- `cache/` esta en `.gitignore` del sub-repo del app.
- C++ pasa `cache_dir` al enricher en el JSON de stdin.
- Test manual: crear nodo `Webpage` desde el inspector, comprobar que
aparece con el color/icono correctos.
+78
View File
@@ -0,0 +1,78 @@
---
id: 0028
title: Enricher fetch_webpage (MVP end-to-end)
status: pending
priority: high
created: 2026-05-01
depends_on: [0026, 0027]
---
## Objetivo
Primer enricher real sobre el sistema de jobs (0026). Right-click sobre
un nodo `Url` o `Webpage` → "Fetch web page". Descarga el HTML, lo
convierte a markdown, guarda los blobs en cache, actualiza el nodo
(o lo convierte a Webpage si era Url) y crea el nodo `Domain` con
relacion `BELONGS_TO`.
Este enricher valida el contrato entero. Los siguientes (0028b) reusan
exactamente el mismo wire protocol.
## Archivos
```
apps/graph_explorer/enrichers/fetch_webpage/
manifest.yaml
run.py
```
## `manifest.yaml`
```yaml
id: fetch_webpage
name: "Fetch web page"
description: "Descarga HTML, extrae markdown limpio y guarda en cache."
applies_to: [Webpage, Url]
emits: [Domain]
relations: [BELONGS_TO]
params:
- { name: timeout_s, type: int, default: 15 }
```
## `run.py`
Logica:
1. Lee JSON de stdin.
2. Saca `url` de `metadata.url` (o `metadata.address` si es Url legacy).
3. `PROGRESS:0.05 normalize``normalize_url_py_cybersecurity`.
4. `PROGRESS:0.20 fetching` — descarga via `requests.get(url, timeout=N)`.
5. `PROGRESS:0.60 parsing``html_to_markdown_py_core` con readabilipy.
6. `PROGRESS:0.85 writing` — calcula sha256(url), escribe `cache/<sha[0:2]>/<sha>.html` y `.md`.
7. Emite stdout JSON:
- `node_updates`: cambia type a Webpage si era Url, anade title/status_code/content_type/fetched_at/html_path/markdown_path/text_length.
- `entities`: `{type: Domain, name: <dominio>, metadata: {}}`.
- `relations`: `from_id: <node_id>, to_name: <dominio>, to_type: Domain, kind: BELONGS_TO`.
## Funciones del registry usadas
- `normalize_url_py_cybersecurity` — limpia tracking params.
- `html_to_markdown_py_core` — readabilipy + markdownify.
- `extract_domain` se hace inline en el enricher (regex trivial sobre la URL parseada).
## Manejo de errores
- HTTP error (4xx/5xx) → escribe status_code en metadata pero NO marca el job como error (el nodo guarda evidencia del fallo).
- Timeout / DNS error / etc → exit con error JSON en stdout: `{"error": "...", "node_updates": [], "entities": [], "relations": []}`.
- Si el enricher levanta excepcion, sale con codigo != 0 y stderr capturado va a `jobs.error`.
## Definicion de hecho
- Crear nodo Url con `https://example.com` → click derecho → "Fetch web page".
- En segundos aparece en panel Jobs como `running` con progress.
- Al terminar:
- El nodo cambia a tipo `Webpage` con icono `ti-file-text`.
- El inspector muestra title, status_code, html_path, markdown_path.
- Aparece nodo `Domain` "example.com" conectado por `BELONGS_TO`.
- El archivo `cache/<sha>.md` existe en disco.
- El job aparece en panel Jobs como `done` con `entities_added=1, relations_added=1`.
- Tirar la red (sin internet) → el job acaba en `error` con mensaje claro.
+72
View File
@@ -0,0 +1,72 @@
---
id: 0028b
title: Enrichers extract_domain, extract_links, extract_text_entities
status: pending
priority: high
created: 2026-05-01
depends_on: [0028]
---
## Objetivo
Tres enrichers Python adicionales que reusan el contrato validado por
`fetch_webpage`. Cada uno cubre un eje de extraccion distinto.
## 1. `extract_domain`
```
applies_to: [Url, Webpage, Email]
emits: [Domain]
relations: [BELONGS_TO]
```
- Saca el dominio de `metadata.url` o `metadata.address`.
- Crea nodo `Domain` si no existe + relacion `BELONGS_TO`.
- Util cuando el usuario tiene un Url/Email que aun no ha sido fetched
pero quiere ver el dominio en el grafo.
## 2. `extract_links`
```
applies_to: [Webpage]
emits: [Url]
relations: [LINKS_TO]
```
- Lee `metadata.markdown_path`. Si vacio → exit con error "run fetch_webpage first".
- `extract_urls_py_cybersecurity` sobre el contenido.
- Para cada URL distinta encontrada:
- Crea nodo `Url` con `metadata.url` (si no existe).
- Relacion `LINKS_TO` desde la Webpage origen.
- Param: `max_links` (default 50) para no saturar el grafo.
## 3. `extract_text_entities`
```
applies_to: [Webpage]
emits: [Person, Org, Email, Phone, Domain, Location, IPAddress, CVE, ...]
relations: [EXTRACTED_FROM, ...relaciones que GLiREL detecte]
```
- Lee `metadata.markdown_path`.
- Llama `extract_graph_hybrid_py_pipelines` (regex IoCs + GLiNER + GLiREL + LLM fallback).
- Para cada entidad detectada:
- Resuelve por `(name, type)` en operations.db. Si no existe la crea.
- Relacion `EXTRACTED_FROM` desde la entidad nueva al nodo Webpage.
- Para cada relacion detectada por GLiREL:
- Relacion entre las dos entidades con el `kind` predicho.
- Params:
- `chunk_size` (default 2000)
- `use_llm_fallback` (default false — evitar coste; el usuario lo activa en jobs concretos)
## Definicion de hecho
- Los tres enrichers aparecen en el menu "Run enricher" segun el tipo
del nodo right-clickado.
- En un nodo Webpage el menu muestra los 3 + fetch_webpage.
- Test integracion:
- Crear Url → fetch_webpage → run extract_links sobre el resultado
→ run extract_text_entities → grafo se llena con persons/orgs/etc.
- Cada paso es un job independiente visible en panel Jobs.
- `extract_text_entities` con LLM off termina sin coste y produce
entidades de IoC + entidades GLiNER (gratis).
+63
View File
@@ -0,0 +1,63 @@
---
id: 0029
title: Enrichers via Chrome headless (CDP) — fetch_webpage_browser, fetch_screenshot
status: pending
priority: medium
created: 2026-05-01
depends_on: [0028]
---
## Objetivo
Variantes de los enrichers basicos que usan Chrome headless via CDP,
para sitios con contenido renderizado por JavaScript (SPA, paginas con
auth visual, etc.) o cuando se quiere capturar evidencia visual.
## 1. `fetch_webpage_browser`
```
applies_to: [Url, Webpage]
emits: [Domain]
relations: [BELONGS_TO]
params:
- { name: chrome_port, type: int, default: 9222 }
- { name: wait_after_load_ms, type: int, default: 1500 }
```
- Usa funciones del registry:
- `chrome_launch_go_browser` — lanza Chrome en port (reusa si ya esta).
- `cdp_connect_go_browser`
- `cdp_navigate_go_browser`
- `cdp_wait_load_go_browser`
- `cdp_get_html_go_browser` — DOM post-JS.
- El run.py shell-out a un binario Go pequeno o llama estas funciones via
un wrapper Python que invoca el Go function como subprocess.
- Decision pendiente: empaquetar las funciones Go en un binario CLI
`cdp-fetcher` que el run.py invoque, o reescribir la logica en Python
con `pychrome` / `playwright`. Preferencia: binario Go para reusar las
funciones del registry.
## 2. `fetch_screenshot`
```
applies_to: [Webpage, Url]
params:
- { name: full_page, type: bool, default: true }
```
- `cdp_screenshot_go_browser` → guarda `cache/<sha>.png`.
- `node_updates`: anade `screenshot_path` a metadata del Webpage.
- No emite entidades nuevas.
## Definicion de hecho
- `fetch_webpage_browser` extrae correctamente DOM de una SPA (test:
twitter.com, linkedin.com publico).
- `fetch_screenshot` produce PNG legible en el cache.
- Inspector del nodo Webpage muestra una preview del screenshot
cuando `screenshot_path` existe (mejora UI opcional).
## Out of scope
- Login flows / auth via CDP — fuera de v1.
- Adblock / fingerprint evasion — el user-agent default es suficiente.
+62
View File
@@ -0,0 +1,62 @@
---
id: 0030
title: Macro "Deep enrich" + enricher expand_domain
status: pending
priority: medium
created: 2026-05-01
depends_on: [0028, 0028b]
---
## Objetivo
Encadenar varios enrichers con un solo click. Cubre dos flujos:
1. **Deep enrich Webpage**: sobre un nodo Webpage, ejecuta en orden
`fetch_webpage` (si no fetched aun) → `extract_domain``extract_links`
`extract_text_entities`. Cuatro jobs separados, en cadena.
2. **Expand domain**: sobre un nodo Domain, fetch homepage + 1 nivel de
links + extraccion de entidades sobre cada pagina. Util para "dame
todo lo que sepas de este dominio en un click".
## Implementacion
### Macro Deep enrich (no es un enricher Python — es UI + orquestacion en C++)
- Boton/menu item "Deep enrich" en el context menu del nodo Webpage.
- Encolar 4 jobs con dependencias: cada job tiene `depends_on_job_id`.
- Worker pool respeta dependencias: si el job tiene depends_on y el
predecesor no esta `done`, lo deja en cola.
- Anadir columna a tabla `jobs`: `depends_on_job_id TEXT`.
### Enricher `expand_domain`
```
applies_to: [Domain]
params:
- { name: max_pages, type: int, default: 5 }
- { name: deep, type: bool, default: false } # si true, deep enrich cada pagina
```
- run.py:
1. Fetch `https://<domain>/` y `http://<domain>/` (probando ambos esquemas).
2. Crea Webpage homepage + relacion `HOMEPAGE_OF` desde Domain.
3. Si `deep`, encola un job `extract_text_entities` por pagina via
un endpoint local de control (out of scope v1) o emite un campo
especial `chained_jobs: [...]` que el worker C++ encola.
4. Decision: v1 solo crea las paginas. La cadena con extract_*
se puede hacer manualmente desde la UI o esperar a un sistema
de chained jobs decente.
## Definicion de hecho
- Click derecho en Webpage → "Deep enrich" → 4 jobs en cadena visibles
en panel Jobs. Al terminar el ultimo, el grafo tiene domain + links +
persons/orgs/etc.
- Click derecho en Domain → "Expand domain" → Webpage homepage aparece
conectada al Domain.
- Cancelar el job intermedio cancela en cascada los que dependen.
## Out of scope v1
- Cron / repeat schedule de enrichers.
- Progress agregado de la cadena (cada job mantiene su progress propio).
+862
View File
@@ -0,0 +1,862 @@
#include "jobs.h"
#include "../../../../cpp/vendor/sqlite3/sqlite3.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdarg>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <errno.h>
#include <fcntl.h>
#include <mutex>
#include <queue>
#include <signal.h>
#include <sstream>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <thread>
#include <unistd.h>
#include <unordered_map>
#include <vector>
namespace ge {
// ----------------------------------------------------------------------------
// Internal state
// ----------------------------------------------------------------------------
namespace {
struct JobControl {
pid_t pid = -1;
std::atomic<bool> cancel_requested{false};
};
struct State {
std::string app_db_path;
std::string ops_db_path; // mutable: cambia con jobs_set_ops_db
std::string enrichers_dir;
std::string app_dir;
std::string registry_root;
std::mutex q_mu;
std::condition_variable q_cv;
std::queue<std::string> pending; // job ids
std::unordered_map<std::string, std::shared_ptr<JobControl>> running;
std::vector<std::thread> workers;
std::atomic<bool> stop_flag{false};
std::atomic<int> dirty{0};
};
State* g_state = nullptr;
// ---- helpers --------------------------------------------------------------
long long now_ms() {
using namespace std::chrono;
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}
std::string ulid() {
// ULID-ish: timestamp ms + 10 random hex chars. Suficiente para un PK.
long long ts = now_ms();
static std::atomic<uint32_t> ctr{(uint32_t)(ts & 0xFFFFFFFF)};
uint32_t rnd = ctr.fetch_add(1, std::memory_order_relaxed);
char buf[64];
std::snprintf(buf, sizeof(buf), "j_%013lld_%08x", ts, rnd);
return buf;
}
bool sql_exec_simple(sqlite3* db, const char* sql) {
char* err = nullptr;
int rc = sqlite3_exec(db, sql, nullptr, nullptr, &err);
if (rc != SQLITE_OK) {
std::fprintf(stderr, "[jobs] sql error: %s\n sql: %s\n",
err ? err : "?", sql);
if (err) sqlite3_free(err);
return false;
}
return true;
}
bool sql_run(sqlite3* db, const char* sql,
const std::vector<std::string>& params)
{
sqlite3_stmt* st = nullptr;
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) {
std::fprintf(stderr, "[jobs] prepare failed: %s :: %s\n",
sqlite3_errmsg(db), sql);
return false;
}
for (size_t i = 0; i < params.size(); ++i) {
sqlite3_bind_text(st, (int)(i + 1), params[i].c_str(), -1,
SQLITE_TRANSIENT);
}
int rc = sqlite3_step(st);
sqlite3_finalize(st);
return rc == SQLITE_DONE;
}
bool ensure_table(const char* db_path) {
sqlite3* db = nullptr;
if (sqlite3_open_v2(db_path, &db,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sql_exec_simple(db, "PRAGMA journal_mode=WAL;");
bool ok = sql_exec_simple(db,
"CREATE TABLE IF NOT EXISTS jobs ("
" id TEXT PRIMARY KEY,"
" enricher_id TEXT NOT NULL,"
" node_id TEXT,"
" node_name TEXT NOT NULL DEFAULT '',"
" params_json TEXT NOT NULL DEFAULT '{}',"
" status TEXT NOT NULL,"
" progress REAL NOT NULL DEFAULT 0,"
" stage TEXT NOT NULL DEFAULT '',"
" result_json TEXT,"
" error TEXT,"
" pid INTEGER,"
" created_at INTEGER NOT NULL,"
" started_at INTEGER,"
" finished_at INTEGER"
");"
);
sql_exec_simple(db,
"CREATE INDEX IF NOT EXISTS idx_jobs_status "
"ON jobs(status, created_at);");
// Reaper: jobs `running` huerfanos de una sesion anterior.
char ts[32];
std::snprintf(ts, sizeof(ts), "%lld", now_ms());
sql_run(db,
"UPDATE jobs SET status='error', error='process died (app restart)', "
"finished_at=? WHERE status='running'",
{ts});
sqlite3_close(db);
return ok;
}
// Escapa string para JSON. Simplificado: maneja comillas, backslash y
// caracteres de control basicos.
std::string json_escape(const std::string& s) {
std::string out;
out.reserve(s.size() + 8);
for (char c : s) {
switch (c) {
case '"': out += "\\\""; break;
case '\\': out += "\\\\"; break;
case '\b': out += "\\b"; break;
case '\f': out += "\\f"; break;
case '\n': out += "\\n"; break;
case '\r': out += "\\r"; break;
case '\t': out += "\\t"; break;
default:
if ((unsigned char)c < 0x20) {
char buf[8];
std::snprintf(buf, sizeof(buf), "\\u%04x", (unsigned char)c);
out += buf;
} else {
out += c;
}
}
}
return out;
}
// Lee un campo de la entidad como string. Devuelve "" si no existe.
std::string read_entity_field(const char* db_path, const char* id,
const char* col)
{
sqlite3* db = nullptr;
if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, nullptr)
!= SQLITE_OK) {
if (db) sqlite3_close(db);
return "";
}
std::string sql = std::string("SELECT ") + col +
" FROM entities WHERE id = ? LIMIT 1";
sqlite3_stmt* st = nullptr;
std::string out;
if (sqlite3_prepare_v2(db, sql.c_str(), -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text(st, 1, id, -1, SQLITE_TRANSIENT);
if (sqlite3_step(st) == SQLITE_ROW) {
const unsigned char* t = sqlite3_column_text(st, 0);
if (t) out = (const char*)t;
}
}
sqlite3_finalize(st);
sqlite3_close(db);
return out;
}
// Construye el JSON que se entrega al subprocess via stdin. Lee node de la
// operations.db actual.
std::string build_stdin_json(const std::string& job_id,
const std::string& enricher_id,
const std::string& node_id,
const std::string& params_json,
const std::string& ops_db,
const std::string& app_dir,
const std::string& registry_root)
{
std::string node_type, node_name, node_metadata = "{}";
if (!node_id.empty()) {
node_type = read_entity_field(ops_db.c_str(), node_id.c_str(), "type_ref");
node_name = read_entity_field(ops_db.c_str(), node_id.c_str(), "name");
std::string m = read_entity_field(ops_db.c_str(), node_id.c_str(), "metadata");
if (!m.empty()) node_metadata = m;
}
std::string cache_dir = app_dir + "/cache";
std::ostringstream o;
o << '{'
<< "\"job_id\":\"" << json_escape(job_id) << "\","
<< "\"enricher_id\":\""<< json_escape(enricher_id) << "\","
<< "\"node_id\":\"" << json_escape(node_id) << "\","
<< "\"node_type\":\"" << json_escape(node_type) << "\","
<< "\"node_name\":\"" << json_escape(node_name) << "\","
<< "\"metadata\":" << (node_metadata.empty() ? "{}" : node_metadata) << ","
<< "\"params\":" << (params_json.empty() ? "{}" : params_json) << ","
<< "\"ops_db_path\":\""<< json_escape(ops_db) << "\","
<< "\"app_dir\":\"" << json_escape(app_dir) << "\","
<< "\"cache_dir\":\"" << json_escape(cache_dir) << "\","
<< "\"registry_root\":\"" << json_escape(registry_root) << "\""
<< '}';
return o.str();
}
// ---- subprocess (POSIX) ---------------------------------------------------
struct ProcResult {
int exit_code = -1;
bool signaled = false;
int signal = 0;
std::string stdout_buf;
std::string stderr_tail; // ultimas lineas, para mensajes de error
};
void update_progress(const std::string& job_id, double prog,
const std::string& stage)
{
if (!g_state) return;
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return;
}
sqlite3_stmt* st = nullptr;
const char* sql = "UPDATE jobs SET progress=?, stage=? WHERE id=?";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_double(st, 1, prog);
sqlite3_bind_text (st, 2, stage.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_step(st);
}
sqlite3_finalize(st);
sqlite3_close(db);
}
// Spawnea python3 run.py. Pipes para stdin (write), stdout (read),
// stderr (read). Lee stdout entero al final; lee stderr line-by-line en un
// thread auxiliar parseando "PROGRESS:<float> <stage>".
ProcResult run_subprocess(const std::string& job_id,
const std::string& run_path,
const std::string& stdin_payload,
std::shared_ptr<JobControl> ctrl)
{
ProcResult out;
int p_in[2] = {-1, -1}; // padre escribe en p_in[1], hijo lee p_in[0]
int p_out[2] = {-1, -1};
int p_err[2] = {-1, -1};
if (pipe(p_in) != 0 || pipe(p_out) != 0 || pipe(p_err) != 0) {
out.stderr_tail = "pipe() failed";
return out;
}
pid_t pid = fork();
if (pid < 0) {
out.stderr_tail = "fork() failed";
for (int fd : {p_in[0], p_in[1], p_out[0], p_out[1], p_err[0], p_err[1]}) {
if (fd >= 0) close(fd);
}
return out;
}
if (pid == 0) {
// child
dup2(p_in[0], 0);
dup2(p_out[1], 1);
dup2(p_err[1], 2);
close(p_in[0]); close(p_in[1]);
close(p_out[0]); close(p_out[1]);
close(p_err[0]); close(p_err[1]);
// Resolver intérprete: <registry_root>/python/.venv/bin/python3
std::string py = g_state->registry_root + "/python/.venv/bin/python3";
const char* argv[] = { py.c_str(), run_path.c_str(), nullptr };
execv(py.c_str(), (char* const*)argv);
std::fprintf(stderr, "execv failed: %s\n", py.c_str());
_exit(127);
}
// parent
ctrl->pid = pid;
close(p_in[0]);
close(p_out[1]);
close(p_err[1]);
// Persistir pid en BD para mostrarlo en UI.
{
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) == SQLITE_OK) {
sqlite3_stmt* st = nullptr;
if (sqlite3_prepare_v2(db, "UPDATE jobs SET pid=? WHERE id=?", -1,
&st, nullptr) == SQLITE_OK) {
sqlite3_bind_int (st, 1, (int)pid);
sqlite3_bind_text(st, 2, job_id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_step(st);
}
sqlite3_finalize(st);
sqlite3_close(db);
}
}
// Escribir stdin entero.
if (!stdin_payload.empty()) {
ssize_t written = 0;
const char* p = stdin_payload.c_str();
size_t left = stdin_payload.size();
while (left > 0) {
ssize_t n = write(p_in[1], p + written, left);
if (n < 0) { if (errno == EINTR) continue; break; }
written += n; left -= (size_t)n;
}
}
close(p_in[1]);
// Thread aux para stderr: parsea PROGRESS y guarda tail.
std::string stderr_tail_local;
std::mutex tail_mu;
std::thread err_t([&]() {
std::string line;
char ch;
while (true) {
ssize_t n = read(p_err[0], &ch, 1);
if (n <= 0) break;
if (ch == '\n') {
// Parse line.
if (line.rfind("PROGRESS:", 0) == 0) {
// PROGRESS:<float> <stage...>
const char* p = line.c_str() + 9;
char* endp = nullptr;
double prog = std::strtod(p, &endp);
std::string stage;
if (endp && *endp) {
while (*endp == ' ') ++endp;
stage = endp;
}
update_progress(job_id, prog, stage);
} else {
std::lock_guard<std::mutex> g(tail_mu);
stderr_tail_local += line;
stderr_tail_local += '\n';
// Cap a ~4 KB.
if (stderr_tail_local.size() > 4096) {
stderr_tail_local.erase(0, stderr_tail_local.size() - 4096);
}
}
line.clear();
} else {
line.push_back(ch);
if (line.size() > 4096) line.clear(); // proteccion
}
}
});
// Leer stdout entero (sincrono).
{
char buf[4096];
while (true) {
ssize_t n = read(p_out[0], buf, sizeof(buf));
if (n <= 0) break;
out.stdout_buf.append(buf, (size_t)n);
if (out.stdout_buf.size() > 1024 * 1024) {
// 1 MB cap.
break;
}
}
}
close(p_out[0]);
// Esperar al hijo. Si se pidio cancelar, mandamos SIGTERM y SIGKILL.
int status = 0;
while (true) {
if (ctrl->cancel_requested.load() && pid > 0) {
kill(pid, SIGTERM);
// pequena gracia, luego SIGKILL si hace falta
for (int i = 0; i < 5; ++i) {
pid_t r = waitpid(pid, &status, WNOHANG);
if (r == pid) goto reaped;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
kill(pid, SIGKILL);
}
pid_t r = waitpid(pid, &status, 0);
if (r == pid) break;
if (r < 0 && errno == EINTR) continue;
break;
}
reaped:
err_t.join();
close(p_err[0]);
if (WIFEXITED(status)) {
out.exit_code = WEXITSTATUS(status);
} else if (WIFSIGNALED(status)) {
out.signaled = true;
out.signal = WTERMSIG(status);
out.exit_code = -1;
}
{
std::lock_guard<std::mutex> g(tail_mu);
out.stderr_tail = std::move(stderr_tail_local);
}
return out;
}
// ---- worker ---------------------------------------------------------------
void persist_status(const std::string& job_id, const std::string& status,
const std::string& result_json,
const std::string& error,
bool set_finished)
{
if (!g_state) return;
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return;
}
if (set_finished) {
sqlite3_stmt* st = nullptr;
const char* sql =
"UPDATE jobs SET status=?, result_json=?, error=?, finished_at=? "
"WHERE id=?";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 2, result_json.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 3, error.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_int64 (st, 4, now_ms());
sqlite3_bind_text (st, 5, job_id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_step(st);
}
sqlite3_finalize(st);
} else {
sqlite3_stmt* st = nullptr;
const char* sql = "UPDATE jobs SET status=?, started_at=? WHERE id=?";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text (st, 1, status.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_int64(st, 2, now_ms());
sqlite3_bind_text (st, 3, job_id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_step(st);
}
sqlite3_finalize(st);
}
sqlite3_close(db);
}
// Lee la fila para reconstruir el contexto del job antes de spawn.
struct JobContext {
std::string id, enricher_id, node_id, node_name, params_json, status;
};
bool load_job(const std::string& id, JobContext* out) {
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"SELECT id, enricher_id, COALESCE(node_id,''), node_name, params_json, status "
"FROM jobs WHERE id=?";
bool ok = false;
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text(st, 1, id.c_str(), -1, SQLITE_TRANSIENT);
if (sqlite3_step(st) == SQLITE_ROW) {
auto col = [&](int i) {
const unsigned char* t = sqlite3_column_text(st, i);
return std::string(t ? (const char*)t : "");
};
out->id = col(0);
out->enricher_id = col(1);
out->node_id = col(2);
out->node_name = col(3);
out->params_json = col(4);
out->status = col(5);
ok = true;
}
}
sqlite3_finalize(st);
sqlite3_close(db);
return ok;
}
void worker_loop() {
while (!g_state->stop_flag.load()) {
std::string job_id;
{
std::unique_lock<std::mutex> lk(g_state->q_mu);
g_state->q_cv.wait(lk, [] {
return g_state->stop_flag.load() || !g_state->pending.empty();
});
if (g_state->stop_flag.load()) return;
job_id = std::move(g_state->pending.front());
g_state->pending.pop();
}
JobContext ctx;
if (!load_job(job_id, &ctx)) continue;
if (ctx.status == "cancelled") continue;
// Resolver run.py por convencion: <enrichers_dir>/<enricher_id>/run.py.
std::string run_path = g_state->enrichers_dir + "/" + ctx.enricher_id +
"/run.py";
// Marcar running.
persist_status(job_id, "running", "", "", false);
auto ctrl = std::make_shared<JobControl>();
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
g_state->running[job_id] = ctrl;
}
// Construir stdin y ejecutar.
std::string ops_db;
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
ops_db = g_state->ops_db_path;
}
std::string stdin_payload = build_stdin_json(
ctx.id, ctx.enricher_id, ctx.node_id, ctx.params_json,
ops_db, g_state->app_dir, g_state->registry_root);
ProcResult res = run_subprocess(job_id, run_path, stdin_payload, ctrl);
// Estado final.
std::string final_status, error;
std::string result_json = res.stdout_buf;
// Trim del result_json (saca trailing whitespace).
while (!result_json.empty() &&
(result_json.back() == '\n' || result_json.back() == '\r' ||
result_json.back() == ' ' || result_json.back() == '\t')) {
result_json.pop_back();
}
if (ctrl->cancel_requested.load()) {
final_status = "cancelled";
error = "user cancelled";
} else if (res.exit_code == 0) {
final_status = "done";
if (result_json.empty()) result_json = "{}";
} else {
final_status = "error";
char buf[64];
if (res.signaled) {
std::snprintf(buf, sizeof(buf), "signal %d", res.signal);
} else {
std::snprintf(buf, sizeof(buf), "exit %d", res.exit_code);
}
error = std::string(buf);
if (!res.stderr_tail.empty()) {
error += "\n";
error += res.stderr_tail;
}
}
persist_status(job_id, final_status, result_json, error, true);
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
g_state->running.erase(job_id);
}
if (final_status == "done") {
g_state->dirty.fetch_add(1, std::memory_order_relaxed);
}
}
}
} // namespace
// ----------------------------------------------------------------------------
// Public API
// ----------------------------------------------------------------------------
bool jobs_init(const char* app_db_path,
const char* ops_db_path,
const char* enrichers_dir,
const char* app_dir,
const char* registry_root,
int n_workers)
{
if (g_state) return true;
if (!app_db_path || !*app_db_path) return false;
if (n_workers < 1) n_workers = 1;
if (n_workers > 8) n_workers = 8;
if (!ensure_table(app_db_path)) return false;
g_state = new State();
g_state->app_db_path = app_db_path;
g_state->ops_db_path = ops_db_path ? ops_db_path : "";
g_state->enrichers_dir = enrichers_dir ? enrichers_dir : "";
g_state->app_dir = app_dir ? app_dir : "";
g_state->registry_root = registry_root ? registry_root : "";
// Rehidratacion: jobs queued de sesiones anteriores se reencolan.
{
sqlite3* db = nullptr;
if (sqlite3_open_v2(app_db_path, &db, SQLITE_OPEN_READONLY,
nullptr) == SQLITE_OK) {
sqlite3_stmt* st = nullptr;
const char* sql =
"SELECT id FROM jobs WHERE status='queued' ORDER BY created_at";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
while (sqlite3_step(st) == SQLITE_ROW) {
const unsigned char* t = sqlite3_column_text(st, 0);
if (t) g_state->pending.push((const char*)t);
}
}
sqlite3_finalize(st);
sqlite3_close(db);
}
}
for (int i = 0; i < n_workers; ++i) {
g_state->workers.emplace_back(worker_loop);
}
return true;
}
void jobs_set_ops_db(const char* ops_db_path) {
if (!g_state) return;
std::lock_guard<std::mutex> lk(g_state->q_mu);
g_state->ops_db_path = ops_db_path ? ops_db_path : "";
}
bool jobs_submit(const char* enricher_id,
const char* node_id,
const char* node_name,
const char* params_json,
char* out_id, size_t out_id_n)
{
if (!g_state || !enricher_id || !*enricher_id) return false;
if (!out_id || out_id_n < 32) return false;
std::string id = ulid();
std::snprintf(out_id, out_id_n, "%s", id.c_str());
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"INSERT INTO jobs (id, enricher_id, node_id, node_name, params_json, "
"status, progress, stage, created_at) "
"VALUES (?, ?, ?, ?, ?, 'queued', 0, '', ?)";
bool ok = false;
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text (st, 1, id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 2, enricher_id, -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 3, node_id ? node_id : "", -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 4, node_name ? node_name : "", -1, SQLITE_TRANSIENT);
sqlite3_bind_text (st, 5, params_json ? params_json : "{}", -1, SQLITE_TRANSIENT);
sqlite3_bind_int64(st, 6, now_ms());
ok = sqlite3_step(st) == SQLITE_DONE;
}
sqlite3_finalize(st);
sqlite3_close(db);
if (!ok) return false;
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
g_state->pending.push(id);
}
g_state->q_cv.notify_one();
return true;
}
bool jobs_cancel(const char* job_id) {
if (!g_state || !job_id) return false;
std::shared_ptr<JobControl> ctrl;
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
auto it = g_state->running.find(job_id);
if (it != g_state->running.end()) ctrl = it->second;
}
if (ctrl) {
ctrl->cancel_requested.store(true);
if (ctrl->pid > 0) kill(ctrl->pid, SIGTERM);
return true;
}
// No corriendo: marcar cancelled si esta queued.
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"UPDATE jobs SET status='cancelled', finished_at=?, "
"error='cancelled before start' WHERE id=? AND status='queued'";
bool ok = false;
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_int64(st, 1, now_ms());
sqlite3_bind_text (st, 2, job_id, -1, SQLITE_TRANSIENT);
ok = sqlite3_step(st) == SQLITE_DONE;
}
sqlite3_finalize(st);
sqlite3_close(db);
return ok;
}
bool jobs_delete(const char* job_id) {
if (!g_state || !job_id) return false;
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"DELETE FROM jobs WHERE id=? AND status IN ('done','error','cancelled')";
bool ok = false;
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_text(st, 1, job_id, -1, SQLITE_TRANSIENT);
ok = sqlite3_step(st) == SQLITE_DONE;
}
sqlite3_finalize(st);
sqlite3_close(db);
return ok;
}
bool jobs_list(std::vector<JobRow>* out, int limit) {
if (!g_state || !out) return false;
out->clear();
if (limit < 1) limit = 1;
if (limit > 1000) limit = 1000;
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return false;
}
sqlite3_stmt* st = nullptr;
const char* sql =
"SELECT id, enricher_id, COALESCE(node_id,''), node_name, status, "
"progress, stage, COALESCE(error,''), COALESCE(result_json,''), "
"created_at, COALESCE(started_at,0), COALESCE(finished_at,0) "
"FROM jobs ORDER BY created_at DESC LIMIT ?";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
sqlite3_bind_int(st, 1, limit);
while (sqlite3_step(st) == SQLITE_ROW) {
JobRow r;
auto col = [&](int i) {
const unsigned char* t = sqlite3_column_text(st, i);
return std::string(t ? (const char*)t : "");
};
r.id = col(0);
r.enricher_id = col(1);
r.node_id = col(2);
r.node_name = col(3);
r.status = col(4);
r.progress = sqlite3_column_double(st, 5);
r.stage = col(6);
r.error = col(7);
r.result_json = col(8);
r.created_at = sqlite3_column_int64(st, 9);
r.started_at = sqlite3_column_int64(st, 10);
r.finished_at = sqlite3_column_int64(st, 11);
out->push_back(std::move(r));
}
}
sqlite3_finalize(st);
sqlite3_close(db);
return true;
}
JobCounters jobs_counters() {
JobCounters c{};
if (!g_state) return c;
sqlite3* db = nullptr;
if (sqlite3_open_v2(g_state->app_db_path.c_str(), &db,
SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
if (db) sqlite3_close(db);
return c;
}
sqlite3_stmt* st = nullptr;
const char* sql = "SELECT status, COUNT(*) FROM jobs GROUP BY status";
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) == SQLITE_OK) {
while (sqlite3_step(st) == SQLITE_ROW) {
const unsigned char* s = sqlite3_column_text(st, 0);
int n = sqlite3_column_int(st, 1);
if (!s) continue;
std::string status((const char*)s);
if (status == "queued") c.queued = n;
else if (status == "running") c.running = n;
else if (status == "done") c.done = n;
else if (status == "error") c.error = n;
else if (status == "cancelled") c.cancelled = n;
}
}
sqlite3_finalize(st);
sqlite3_close(db);
return c;
}
int jobs_dirty_counter() {
if (!g_state) return 0;
return g_state->dirty.load(std::memory_order_relaxed);
}
void jobs_shutdown() {
if (!g_state) return;
g_state->stop_flag.store(true);
// Cancelar todos los running.
{
std::lock_guard<std::mutex> lk(g_state->q_mu);
for (auto& kv : g_state->running) {
kv.second->cancel_requested.store(true);
if (kv.second->pid > 0) kill(kv.second->pid, SIGTERM);
}
}
g_state->q_cv.notify_all();
for (auto& t : g_state->workers) {
if (t.joinable()) t.join();
}
delete g_state;
g_state = nullptr;
}
} // namespace ge
+97
View File
@@ -0,0 +1,97 @@
#pragma once
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>
// Sistema de jobs asincronos para enrichers (issue 0026).
//
// Diseno:
// - Tabla `jobs` en graph_explorer.db (NO en operations.db).
// - Pool de N std::thread workers (default 2).
// - Cada job spawnea un subprocess Python `enrichers/<id>/run.py` que recibe
// contexto por stdin (JSON), emite progreso por stderr y resultado final
// por stdout (JSON).
// - El worker aplica entities/relations/node_updates al operations.db indicado.
// - dirty_counter se incrementa al completar un job — el render() lee el
// contador y triggerea un reload del grafo cuando cambia.
//
// El estado vivo (running/queued en RAM) se persiste tambien en SQLite para
// que cancelaciones, reinicios y queries de UI vean lo mismo.
namespace ge {
struct JobRow {
std::string id;
std::string enricher_id;
std::string node_id;
std::string node_name;
std::string status; // queued|running|done|error|cancelled
double progress = 0.0;
std::string stage;
std::string error;
std::string result_json;
long long created_at = 0;
long long started_at = 0;
long long finished_at = 0;
};
// Inicializa el sistema. Crea la tabla `jobs` si no existe, marca como `error`
// los jobs que quedaron `running` de una sesion anterior, lanza los workers.
// `app_db_path` es <app_dir>/graph_explorer.db (jobs).
// `ops_db_path` es la operations.db actualmente cargada (target de mutaciones).
// `enrichers_dir` es <app_dir>/enrichers/.
// `app_dir` se usa como root para resolver `cache/` (issue 0027).
// `registry_root` se pasa al subprocess como FN_REGISTRY_ROOT para que el
// enricher pueda importar funciones del registry.
// Retorna false si alguno de los paths falla.
bool jobs_init(const char* app_db_path,
const char* ops_db_path,
const char* enrichers_dir,
const char* app_dir,
const char* registry_root,
int n_workers);
// Cambia la operations.db objetivo (cuando el usuario abre otra). Drena cola
// pendiente — los jobs ya queued NO se reapuntan.
void jobs_set_ops_db(const char* ops_db_path);
// Encola un job. params_json puede ser "{}" o JSON valido. node_id puede ser
// vacio (job global). Devuelve false si la BD falla. out_id (>= 64) recibe el
// ULID generado.
bool jobs_submit(const char* enricher_id,
const char* node_id,
const char* node_name,
const char* params_json,
char* out_id, size_t out_id_n);
// Senala SIGTERM al subprocess y marca el job como `cancelled`. Si esta
// `queued` lo marca directamente.
bool jobs_cancel(const char* job_id);
// Borra un job en estado terminal (done/error/cancelled).
bool jobs_delete(const char* job_id);
// Snapshot ordenado por created_at DESC.
bool jobs_list(std::vector<JobRow>* out, int limit = 200);
// Counters para el badge en la toolbar.
struct JobCounters {
int queued = 0;
int running = 0;
int done = 0;
int error = 0;
int cancelled = 0;
};
JobCounters jobs_counters();
// Counter monotono que se incrementa cada vez que un job completa con
// cambios en operations.db. El main thread compara contra su ultimo valor
// conocido y, si cambio, llama reload_graph().
int jobs_dirty_counter();
// Cierra el pool y espera a los workers (cancelando jobs en vuelo).
void jobs_shutdown();
} // namespace ge
+105 -1
View File
@@ -25,6 +25,8 @@
#include "layout_store.h"
#include "entity_ops.h"
#include "project_manager.h"
#include "jobs.h"
#include "enrichers.h"
#include "../../../../cpp/vendor/sqlite3/sqlite3.h"
@@ -39,6 +41,13 @@
#include <string>
#include <vector>
#ifndef _WIN32
#include <unistd.h>
#else
#include <direct.h>
#define getcwd _getcwd
#endif
// ----------------------------------------------------------------------------
// Estado global de la app
// ----------------------------------------------------------------------------
@@ -116,6 +125,32 @@ static void apply_static_layout(int mode) {
// Forward decl — definido mas abajo, lo necesita switch_to_project.
static bool load_input();
// ----------------------------------------------------------------------------
// Registry path resolution (issue 0026)
// ----------------------------------------------------------------------------
// Devuelve el path absoluto al root de fn_registry. Estrategia:
// 1) FN_REGISTRY_ROOT env var
// 2) Sube desde getcwd() buscando un dir con `registry.db`
// 3) "" si no se encuentra
static std::string resolve_registry_root() {
if (const char* env = std::getenv("FN_REGISTRY_ROOT")) {
if (env && *env) return env;
}
char cwd[4096];
if (getcwd(cwd, sizeof(cwd)) == nullptr) return "";
std::string p = cwd;
for (int i = 0; i < 8; ++i) {
std::string probe = p + "/registry.db";
FILE* f = std::fopen(probe.c_str(), "rb");
if (f) { std::fclose(f); return p; }
size_t s = p.find_last_of('/');
if (s == std::string::npos || s == 0) break;
p = p.substr(0, s);
}
return "";
}
// ----------------------------------------------------------------------------
// Project lifecycle
// ----------------------------------------------------------------------------
@@ -240,6 +275,9 @@ static bool load_input() {
ge::entity_index_build(g_input.uri, &g_idx);
g_app.input_db_path = g_input.uri ? g_input.uri : "";
// issue 0026 — apunta el JobRunner a la nueva operations.db.
if (g_input.uri) ge::jobs_set_ops_db(g_input.uri);
// Cargar posiciones guardadas para este graph_hash
g_graph_hash = ge::compute_graph_hash(g_input.uri);
int restored = ge::layout_store_load(g_graph_hash, g_graph);
@@ -483,7 +521,27 @@ static void render_context_menu() {
ImGui::Separator();
if (ImGui::BeginMenu("Run enricher")) {
ImGui::TextDisabled("(coming soon — issues 0001/0002/0003)");
// issue 0026 — listamos enrichers cuyo applies_to incluye este type.
const char* type_name = (n.type_id < (uint16_t)g_graph.type_count)
? g_graph.types[n.type_id].name : "";
auto specs = ge::enrichers_for_type(type_name);
if (!sql_id) {
ImGui::TextDisabled("(node has no entity id)");
} else if (specs.empty()) {
ImGui::TextDisabled("(no enrichers para tipo '%s')", type_name);
} else {
for (const auto& s : specs) {
if (ImGui::MenuItem(s.name.c_str())) {
char job_id[64];
bool ok = ge::jobs_submit(s.id.c_str(), sql_id, lbl,
"{}", job_id, sizeof(job_id));
if (ok) g_app.panel_jobs = true; // abrir panel auto
}
if (!s.description.empty() && ImGui::IsItemHovered()) {
ImGui::SetTooltip("%s", s.description.c_str());
}
}
}
ImGui::EndMenu();
}
@@ -512,6 +570,7 @@ static fn_ui::PanelToggle g_panels[] = {
{"Note", nullptr, &g_app.panel_note},
{"Types", nullptr, &g_app.panel_type_editor},
{"Table", nullptr, &g_app.panel_table},
{"Jobs", nullptr, &g_app.panel_jobs},
};
static void render() {
@@ -596,6 +655,16 @@ static void render() {
g_app.apply_layout_tick = 0;
}
// issue 0026 — si un job termino con cambios, dispara reload del grafo.
{
static int s_last_dirty = 0;
int d = ge::jobs_dirty_counter();
if (d != s_last_dirty) {
s_last_dirty = d;
g_app.want_reload = true;
}
}
// Triggers desde la toolbar
if (g_app.want_fit) {
graph_viewport_fit(g_graph, g_viewport);
@@ -1194,6 +1263,12 @@ static void render() {
ge::views_table_window(g_app);
ge::views_import_dataset_modal(g_app);
// Jobs panel (issue 0026) — flotante, dockeable.
ImGui::SetNextWindowPos (ImVec2(vp->WorkPos.x + W * 0.20f, top + 40.0f),
ImGuiCond_FirstUseEver);
ImGui::SetNextWindowSize(ImVec2(900.0f, 360.0f), ImGuiCond_FirstUseEver);
ge::views_jobs(g_app);
g_first_render = false;
}
@@ -1418,6 +1493,34 @@ int main(int argc, char** argv) {
"cualquier app del registry y permite explorar entidades/relaciones con "
"shapes/iconos/layouts/filtros.");
// issue 0026 — sistema de jobs + enrichers.
{
std::string registry_root = resolve_registry_root();
std::string app_dir = registry_root.empty()
? "."
: registry_root + "/projects/osint_graph/apps/graph_explorer";
std::string enrichers_dir = app_dir + "/enrichers";
// graph_explorer.db es el mismo SQLite usado por layout_store.
const char* app_db = g_layout_db_path.empty()
? "graph_explorer.db" : g_layout_db_path.c_str();
ge::enrichers_load(enrichers_dir.c_str());
if (!ge::jobs_init(app_db,
g_input.uri ? g_input.uri : "",
enrichers_dir.c_str(),
app_dir.c_str(),
registry_root.c_str(),
/*n_workers=*/2)) {
std::fprintf(stderr, "[graph_explorer] jobs_init failed (panel disabled)\n");
} else {
std::fprintf(stdout,
"[graph_explorer] jobs_init OK — enrichers_dir=%s, registry_root=%s, %d enrichers\n",
enrichers_dir.c_str(), registry_root.c_str(),
(int)ge::enrichers_all().size());
}
}
int rc = fn::run_app(
{.title = "graph_explorer",
.width = 1600,
@@ -1429,6 +1532,7 @@ int main(int argc, char** argv) {
render);
// Cleanup
ge::jobs_shutdown();
if (g_gpu_ctx) graph_force_layout_gpu_destroy(g_gpu_ctx);
if (g_atlas) graph_icons_destroy(g_atlas);
graph_viewport_destroy(g_viewport);
+1
View File
@@ -523,6 +523,7 @@ uint16_t tabler_codepoint_by_name(const char* name) {
{"ti-at", 0xEA2B}, // TI_AT
{"ti-home", 0xEAC1}, // TI_HOME
{"ti-database", 0xEA88}, // TI_DATABASE
{"ti-file-text", 0xEAA2}, // TI_FILE_TEXT
};
auto it = map.find(name);
if (it == map.end()) return 0;
+9
View File
@@ -55,6 +55,7 @@ struct AppState {
bool panel_stats = true;
bool panel_viewport = true;
bool panel_note = false;
bool panel_jobs = false; // issue 0026
bool show_filters_modal = false;
bool show_open_modal = false;
@@ -353,6 +354,14 @@ void views_type_editor(AppState& app);
// te_delete_use_count via consulta a operations.db antes de mostrarlo.
bool views_type_editor_delete_modal(AppState& app);
// ---- Jobs panel (issue 0026) ---------------------------------------------
// Renderiza el panel "Jobs" — tabla con todos los jobs (queued/running/done/
// error/cancelled). Botones por fila para cancelar / reintentar / borrar.
// Click en target_node centra el viewport sobre ese nodo (futuro). Polling
// cada N frames para no spammear la BD.
void views_jobs(AppState& app);
// ---- Filter helpers (issue 0009) -----------------------------------------
// True si el filtro tiene query no vacia o al menos un tag activo.
+199
View File
@@ -0,0 +1,199 @@
#include "views.h"
#include "jobs.h"
#include "core/icons_tabler.h"
#include "core/tokens.h"
#include "imgui.h"
#include <cfloat>
#include <chrono>
#include <cstdio>
namespace ge {
namespace {
// Cache de la lista de jobs. Se refresca cada N frames para no abrir SQLite
// en cada frame. ~10 Hz es suficiente para una progress bar fluida.
struct JobsCache {
std::vector<JobRow> rows;
int last_frame_refresh = -100;
int filter_idx = 0; // 0=all 1=active 2=done 3=error
char buf[8] = {};
};
JobsCache g_jobs_cache;
const char* status_icon(const std::string& s) {
if (s == "queued") return TI_HOURGLASS;
if (s == "running") return TI_PLAYER_PLAY;
if (s == "done") return TI_CHECK;
if (s == "error") return TI_ALERT_CIRCLE;
if (s == "cancelled") return TI_X;
return TI_QUESTION_MARK;
}
ImVec4 status_color(const std::string& s) {
if (s == "running") return ImVec4(0.36f, 0.78f, 1.0f, 1.0f);
if (s == "done") return ImVec4(0.40f, 0.85f, 0.55f, 1.0f);
if (s == "error") return ImVec4(0.95f, 0.45f, 0.45f, 1.0f);
if (s == "cancelled") return ImVec4(0.65f, 0.65f, 0.65f, 1.0f);
if (s == "queued") return ImVec4(0.85f, 0.78f, 0.45f, 1.0f);
return ImVec4(0.7f, 0.7f, 0.7f, 1.0f);
}
std::string format_duration(long long started, long long finished) {
if (started <= 0) return "";
long long end = finished > 0 ? finished
: std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
long long ms = end - started;
if (ms < 0) ms = 0;
char b[32];
if (ms < 1000) std::snprintf(b, sizeof(b), "%lld ms", ms);
else if (ms < 60'000) std::snprintf(b, sizeof(b), "%.1f s", ms / 1000.0);
else std::snprintf(b, sizeof(b), "%.1f m", ms / 60'000.0);
return b;
}
bool filter_match(const std::string& status, int idx) {
switch (idx) {
case 0: return true; // all
case 1: return status == "queued" || status == "running"; // active
case 2: return status == "done";
case 3: return status == "error" || status == "cancelled";
default: return true;
}
}
} // namespace
void views_jobs(AppState& app) {
if (!app.panel_jobs) return;
if (!ImGui::Begin("Jobs", &app.panel_jobs)) {
ImGui::End();
return;
}
// Refresh cache cada ~10 frames (~6 Hz a 60fps).
int frame = ImGui::GetFrameCount();
if (frame - g_jobs_cache.last_frame_refresh > 10) {
jobs_list(&g_jobs_cache.rows, 200);
g_jobs_cache.last_frame_refresh = frame;
}
// Header: counters + filtro.
JobCounters c = jobs_counters();
ImGui::TextColored(status_color("running"), "%s", TI_PLAYER_PLAY);
ImGui::SameLine(); ImGui::Text("%d", c.running);
ImGui::SameLine(0, 16);
ImGui::TextColored(status_color("queued"), "%s", TI_HOURGLASS);
ImGui::SameLine(); ImGui::Text("%d", c.queued);
ImGui::SameLine(0, 16);
ImGui::TextColored(status_color("done"), "%s", TI_CHECK);
ImGui::SameLine(); ImGui::Text("%d", c.done);
ImGui::SameLine(0, 16);
ImGui::TextColored(status_color("error"), "%s", TI_ALERT_CIRCLE);
ImGui::SameLine(); ImGui::Text("%d", c.error + c.cancelled);
ImGui::SameLine(0, 24);
const char* filter_labels[] = { "All", "Active", "Done", "Errors" };
ImGui::SetNextItemWidth(100);
ImGui::Combo("##jobs_filter", &g_jobs_cache.filter_idx,
filter_labels, IM_ARRAYSIZE(filter_labels));
ImGui::Separator();
// Tabla.
ImGuiTableFlags tflags = ImGuiTableFlags_Borders |
ImGuiTableFlags_RowBg |
ImGuiTableFlags_SizingStretchProp |
ImGuiTableFlags_ScrollY;
if (ImGui::BeginTable("jobs_table", 6, tflags,
ImVec2(0, ImGui::GetContentRegionAvail().y - 4))) {
ImGui::TableSetupScrollFreeze(0, 1);
ImGui::TableSetupColumn("Status", ImGuiTableColumnFlags_WidthFixed, 90);
ImGui::TableSetupColumn("Enricher", ImGuiTableColumnFlags_WidthStretch, 1.5f);
ImGui::TableSetupColumn("Target", ImGuiTableColumnFlags_WidthStretch, 2.0f);
ImGui::TableSetupColumn("Progress", ImGuiTableColumnFlags_WidthStretch, 2.0f);
ImGui::TableSetupColumn("Time", ImGuiTableColumnFlags_WidthFixed, 70);
ImGui::TableSetupColumn("##actions",ImGuiTableColumnFlags_WidthFixed, 80);
ImGui::TableHeadersRow();
for (const auto& r : g_jobs_cache.rows) {
if (!filter_match(r.status, g_jobs_cache.filter_idx)) continue;
ImGui::PushID(r.id.c_str());
ImGui::TableNextRow();
// Status.
ImGui::TableSetColumnIndex(0);
ImGui::TextColored(status_color(r.status), "%s %s",
status_icon(r.status), r.status.c_str());
// Enricher.
ImGui::TableSetColumnIndex(1);
ImGui::TextUnformatted(r.enricher_id.c_str());
// Target.
ImGui::TableSetColumnIndex(2);
if (!r.node_name.empty()) {
ImGui::TextUnformatted(r.node_name.c_str());
} else if (!r.node_id.empty()) {
ImGui::TextDisabled("%s", r.node_id.c_str());
} else {
ImGui::TextDisabled("(global)");
}
// Progress.
ImGui::TableSetColumnIndex(3);
if (r.status == "running" || r.status == "queued") {
ImGui::ProgressBar((float)r.progress, ImVec2(-FLT_MIN, 0),
r.stage.empty() ? nullptr : r.stage.c_str());
} else if (r.status == "error" && !r.error.empty()) {
ImGui::TextColored(status_color("error"), "%s",
r.error.size() > 64
? (r.error.substr(0, 64) + "").c_str()
: r.error.c_str());
if (ImGui::IsItemHovered()) {
ImGui::SetTooltip("%s", r.error.c_str());
}
} else if (r.status == "done" && !r.result_json.empty()) {
ImGui::TextDisabled("%s",
r.result_json.size() > 80
? (r.result_json.substr(0, 80) + "").c_str()
: r.result_json.c_str());
if (ImGui::IsItemHovered() && r.result_json.size() > 80) {
ImGui::SetTooltip("%s", r.result_json.c_str());
}
} else {
ImGui::TextDisabled("");
}
// Time.
ImGui::TableSetColumnIndex(4);
ImGui::TextDisabled("%s",
format_duration(r.started_at, r.finished_at).c_str());
// Actions.
ImGui::TableSetColumnIndex(5);
if (r.status == "queued" || r.status == "running") {
if (ImGui::SmallButton("Cancel")) {
jobs_cancel(r.id.c_str());
}
} else {
if (ImGui::SmallButton("Delete")) {
jobs_delete(r.id.c_str());
}
}
ImGui::PopID();
}
ImGui::EndTable();
}
ImGui::End();
}
} // namespace ge