Files
egutierrez 7a94160fd2 feat: catch-up de decisiones previas (Webpage→Url, anti-bot, UI 2-col, tests cross-platform)
Bloque de cambios revisados y validados con el usuario en sesiones
previas que no habian aterrizado en commits propios. Lista por tema:

* enrichers: web_search ahora usa lite.duckduckgo.com como endpoint
  primario (mas tolerante con bot detection desde IP residencial),
  con fallback al endpoint html. Detecta pagina captcha y emite
  error claro si ambos fallan. Anyade _DDGLiteParser para el formato
  lite + auto-pick de parser por contenido.

* enrichers: tipo Webpage unificado en Url (campos de cuerpo
  cacheado viven en metadata del Url). Manifests actualizados
  (applies_to: [Url]). fetch_webpage ya no convierte Url->Webpage.

* enrichers/manifest: campo `params` parseado a EnricherSpec.params
  (name, type, default_value, description). UI puede renderizar
  dialog de configuracion.

* jobs: fix de path conversion para Python embebido nativo Windows
  (no convertir a /mnt/c/... cuando el subproceso es Windows-native;
  solo cuando es bash o python via WSL).

* main.cpp: ventana ImGui (no modal) "Run enricher" con layout
  2-col (label izq, input der). Inserta job con JSON tipado. Layout
  clustering apretado: hijos del mismo anchor en un solo anillo
  alrededor del padre, sin desperdigar por anillos crecientes.

* views: inspector con layout 2-col via BeginTable (Identity,
  Schema fields, Extras). Description full-width debajo de su label.

* tests: portable conftest (auto-detecta REGISTRY_ROOT, PYTHON_BIN,
  ENRICHERS_DIR para WSL y Windows portable). _runner.py trampoline
  inyecta stub via sys.path porque embedded Python ignora PYTHONPATH.
  Tests bash-only (vendor_script, freeze, dispatcher bash, resolver
  Linux-binary) skipean en Windows. Tests existentes adaptados a
  Webpage->Url.

Resultado actual: 32 passed WSL, 21 passed + 11 skipped Windows.
2026-05-03 14:41:28 +02:00

357 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""Enricher fetch_webpage — issue 0028.
Lee JSON de stdin, descarga la URL del nodo, convierte HTML a markdown,
guarda blobs en `<cache_dir>/<sha256[0:2]>/<sha256>.{html,md}`, actualiza el
nodo (deja type_ref=Url) con metadata enriquecida y crea/conecta el Domain.
Nota: historicamente fetch_webpage convertia Url -> Webpage, pero esos
dos tipos se han unificado en Url. Los campos de cuerpo cacheado
(html_path, markdown_path, status_code, fetched_at, text_length, ...)
viven en metadata.
Wire protocol (issue 0026):
- stdin: JSON con node_id, metadata, ops_db_path, app_dir, cache_dir,
registry_root, params.
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen `{entities_added, ...}`.
- exit code 0 = ok, !=0 = error (stderr capturado se muestra en panel).
"""
from __future__ import annotations
import hashlib
import json
import os
import re
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
def progress(p: float, stage: str = "") -> None:
"""Emite linea PROGRESS al stderr para que C++ actualice la UI."""
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def load_registry_funcs(registry_root: str):
"""Importa funciones del registry. Prefiere `_vendored/` (issue 0033b);
si no existe, fallback a `<registry_root>/python/functions/` (modo dev)."""
vendored = os.path.join(os.path.dirname(__file__), "_vendored")
if os.path.isdir(vendored):
if vendored not in sys.path:
sys.path.insert(0, vendored)
elif registry_root:
py_funcs = os.path.join(registry_root, "python", "functions")
if py_funcs not in sys.path:
sys.path.insert(0, py_funcs)
from cybersecurity.cybersecurity import normalize_url # type: ignore
from core.html_to_markdown import html_to_markdown # type: ignore
return normalize_url, html_to_markdown
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def fetch_with_requests(url: str, timeout: int):
"""Descarga la URL y retorna (status_code, content_type, html, encoding).
Usa `requests` si esta disponible, fallback a urllib.
"""
try:
import requests # type: ignore
headers = {
"User-Agent": (
"Mozilla/5.0 (graph_explorer/0.1; "
"https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/graph_explorer) "
"Chrome/120 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
r = requests.get(url, timeout=timeout, headers=headers, allow_redirects=True)
ct = r.headers.get("Content-Type", "text/html")
# `requests` decodifica por encoding HTTP/charset; si falla cae a apparent.
return r.status_code, ct, r.text, r.encoding or "utf-8"
except ImportError:
from urllib.request import Request, urlopen
req = Request(url, headers={"User-Agent": "graph_explorer/0.1"})
with urlopen(req, timeout=timeout) as resp: # type: ignore
data = resp.read()
ct = resp.headers.get("Content-Type", "text/html")
enc = "utf-8"
m = re.search(r"charset=([\w-]+)", ct, re.I)
if m:
enc = m.group(1).lower()
return resp.status, ct, data.decode(enc, errors="replace"), enc
_TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
def extract_title(html: str) -> str:
m = _TITLE_RE.search(html)
if not m:
return ""
title = re.sub(r"\s+", " ", m.group(1)).strip()
if len(title) > 300:
title = title[:300] + ""
return title
def domain_of(url: str) -> str:
try:
host = urlparse(url).hostname or ""
return host.lower()
except Exception:
return ""
def cache_paths(cache_dir: str, key: str) -> tuple[Path, Path]:
"""Devuelve (html_path, md_path) y crea el dir intermedio."""
sub = key[:2]
base = Path(cache_dir) / sub
base.mkdir(parents=True, exist_ok=True)
return base / f"{key}.html", base / f"{key}.md"
def merge_metadata_json(existing: str, patch: dict) -> str:
"""Fusiona patch sobre el JSON existente (string) y devuelve nuevo string."""
try:
cur = json.loads(existing) if existing else {}
if not isinstance(cur, dict):
cur = {}
except Exception:
cur = {}
cur.update(patch)
return json.dumps(cur, ensure_ascii=False)
def upsert_domain(conn: sqlite3.Connection, name: str) -> str:
"""Crea o reusa entidad Domain por nombre. Retorna su id."""
cur = conn.execute(
"SELECT id FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1",
(name,),
)
row = cur.fetchone()
if row:
return row[0]
new_id = f"Domain_{now_ms()}"
ts = now_iso()
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, created_at, updated_at) "
"VALUES (?, ?, 'Domain', 'enricher:fetch_webpage', ?, ?)",
(new_id, name, ts, ts),
)
return new_id
def relation_exists(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool:
cur = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name=? LIMIT 1",
(from_id, to_id, name),
)
return cur.fetchone() is not None
def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool:
if relation_exists(conn, from_id, to_id, name):
return False
ts = now_iso()
rel_id = f"rel_{now_ms()}_{name.lower()}"
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(rel_id, name, from_id, to_id, ts, ts),
)
return True
def main() -> int:
raw = sys.stdin.read()
try:
ctx = json.loads(raw)
except Exception as e:
log(f"stdin not valid JSON: {e}")
return 2
node_id = ctx.get("node_id") or ""
node_type = ctx.get("node_type") or ""
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except Exception:
metadata = {}
ops_db_path = ctx.get("ops_db_path") or ""
cache_dir = ctx.get("cache_dir") or ""
registry_root = ctx.get("registry_root") or ""
params = ctx.get("params") or {}
timeout_s = int(params.get("timeout_s", 15))
if not node_id or not ops_db_path:
log("missing node_id / ops_db_path")
return 2
# URL puede estar en `url` (Url/Webpage) o `address` (Url legacy).
raw_url = (metadata.get("url") or metadata.get("address") or "").strip()
if not raw_url:
# Fallback: si el nodo no tiene url en metadata, mira el name.
raw_url = (ctx.get("node_name") or "").strip()
if not raw_url:
log("nodo sin url en metadata ni name")
return 2
progress(0.05, "normalize")
try:
normalize_url, html_to_markdown = load_registry_funcs(registry_root)
except Exception as e:
log(f"registry imports failed: {e}")
return 3
try:
url = normalize_url(raw_url)
except Exception:
url = raw_url
if not url.startswith(("http://", "https://")):
url = "https://" + url
progress(0.20, "fetching")
try:
status, content_type, html, _enc = fetch_with_requests(url, timeout=timeout_s)
except Exception as e:
log(f"fetch failed: {e}")
# Marcamos node con status=-1 para evidencia.
conn = sqlite3.connect(ops_db_path)
try:
cur = conn.execute("SELECT metadata FROM entities WHERE id=?", (node_id,))
row = cur.fetchone()
existing_meta = row[0] if row and row[0] else "{}"
patch = {"url": url, "fetched_at": now_iso(), "status_code": -1}
new_meta = merge_metadata_json(existing_meta, patch)
conn.execute(
"UPDATE entities SET metadata=?, updated_at=? WHERE id=?",
(new_meta, now_iso(), node_id),
)
conn.commit()
finally:
conn.close()
print(json.dumps({"error": str(e), "url": url, "entities_added": 0,
"relations_added": 0}))
return 4
progress(0.55, "parsing")
try:
markdown = html_to_markdown(html)
except Exception as e:
log(f"html_to_markdown failed (will save raw): {e}")
markdown = ""
title = extract_title(html)
text_length = len(markdown) if markdown else len(html)
progress(0.80, "writing")
key = hashlib.sha256(url.encode("utf-8")).hexdigest()
html_path, md_path = cache_paths(cache_dir, key)
try:
html_path.write_text(html, encoding="utf-8", errors="replace")
if markdown:
md_path.write_text(markdown, encoding="utf-8")
except Exception as e:
log(f"cache write failed: {e}")
return 5
# Paths en metadata se guardan relativos al app_dir para portabilidad.
rel_html = os.path.relpath(html_path, ctx.get("app_dir") or cache_dir)
rel_md = os.path.relpath(md_path, ctx.get("app_dir") or cache_dir)
progress(0.92, "applying")
conn = sqlite3.connect(ops_db_path)
conn.execute("PRAGMA foreign_keys=OFF")
entities_added = 0
relations_added = 0
node_updated = False
try:
# 1. Update del nodo: convertir Url -> Webpage si aplica + parche meta.
cur = conn.execute(
"SELECT type_ref, metadata FROM entities WHERE id=?", (node_id,)
)
row = cur.fetchone()
if not row:
log(f"node {node_id} disappeared")
return 6
cur_type, cur_meta = row[0], row[1] or "{}"
# Webpage fue un tipo separado historicamente. Hoy se unifica en
# Url (mismo tipo, los campos de cuerpo cacheado viven en
# metadata): si el nodo entrante es Url o el legacy Webpage, lo
# dejamos como Url; si el nodo no tiene tipo, default Url.
if not cur_type or cur_type.lower() in ("url", "webpage"):
new_type = "Url"
else:
new_type = cur_type
patch = {
"url": url,
"title": title,
"status_code": status,
"content_type": content_type,
"fetched_at": now_iso(),
"html_path": rel_html,
"markdown_path": rel_md if markdown else "",
"text_length": text_length,
}
new_meta = merge_metadata_json(cur_meta, patch)
conn.execute(
"UPDATE entities SET type_ref=?, metadata=?, updated_at=? WHERE id=?",
(new_type, new_meta, now_iso(), node_id),
)
node_updated = True
# 2. Crear/conectar Domain.
dname = domain_of(url)
if dname:
existed_before = conn.execute(
"SELECT 1 FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1",
(dname,),
).fetchone() is not None
domain_id = upsert_domain(conn, dname)
if not existed_before:
entities_added += 1
if insert_relation(conn, node_id, domain_id, "BELONGS_TO"):
relations_added += 1
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"url": url,
"status_code": status,
"title": title,
"text_length": text_length,
"html_path": rel_html,
"markdown_path": rel_md if markdown else "",
"entities_added": entities_added,
"relations_added": relations_added,
"node_updated": node_updated,
}, ensure_ascii=False))
return 0
if __name__ == "__main__":
sys.exit(main())