Files
graph_explorer/enrichers/web_search/run.py
T
egutierrez 7a94160fd2 feat: catch-up de decisiones previas (Webpage→Url, anti-bot, UI 2-col, tests cross-platform)
Bloque de cambios revisados y validados con el usuario en sesiones
previas que no habian aterrizado en commits propios. Lista por tema:

* enrichers: web_search ahora usa lite.duckduckgo.com como endpoint
  primario (mas tolerante con bot detection desde IP residencial),
  con fallback al endpoint html. Detecta pagina captcha y emite
  error claro si ambos fallan. Anyade _DDGLiteParser para el formato
  lite + auto-pick de parser por contenido.

* enrichers: tipo Webpage unificado en Url (campos de cuerpo
  cacheado viven en metadata del Url). Manifests actualizados
  (applies_to: [Url]). fetch_webpage ya no convierte Url->Webpage.

* enrichers/manifest: campo `params` parseado a EnricherSpec.params
  (name, type, default_value, description). UI puede renderizar
  dialog de configuracion.

* jobs: fix de path conversion para Python embebido nativo Windows
  (no convertir a /mnt/c/... cuando el subproceso es Windows-native;
  solo cuando es bash o python via WSL).

* main.cpp: ventana ImGui (no modal) "Run enricher" con layout
  2-col (label izq, input der). Inserta job con JSON tipado. Layout
  clustering apretado: hijos del mismo anchor en un solo anillo
  alrededor del padre, sin desperdigar por anillos crecientes.

* views: inspector con layout 2-col via BeginTable (Identity,
  Schema fields, Extras). Description full-width debajo de su label.

* tests: portable conftest (auto-detecta REGISTRY_ROOT, PYTHON_BIN,
  ENRICHERS_DIR para WSL y Windows portable). _runner.py trampoline
  inyecta stub via sys.path porque embedded Python ignora PYTHONPATH.
  Tests bash-only (vendor_script, freeze, dispatcher bash, resolver
  Linux-binary) skipean en Windows. Tests existentes adaptados a
  Webpage->Url.

Resultado actual: 32 passed WSL, 21 passed + 11 skipped Windows.
2026-05-03 14:41:28 +02:00

572 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
"""Enricher web_search — busca en DuckDuckGo HTML y crea nodos Url.
Wire protocol estandar (issue 0026):
- stdin: JSON con node_id, node_name, metadata, ops_db_path, app_dir,
cache_dir, registry_root, params.
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen.
- exit code 0 = ok, !=0 = error.
DDG endpoints usados:
1. https://lite.duckduckgo.com/lite/ (POST) — endpoint primario.
HTML minimo (ano 2009-style), tabla con `<a class='result-link'>` y
`<td class='result-snippet'>`. Es el menos agresivo con bot
detection; suele responder 200 cuando el endpoint `html.` ya
devuelve un challenge "anomaly" desde IPs residenciales/Windows.
2. https://html.duckduckgo.com/html/ (POST) — fallback. Su parser
usa `result__a` / `result__snippet`. DDG envuelve los enlaces en
`//duckduckgo.com/l/?uddg=<encoded>` que hay que decodificar.
Si ambos endpoints devuelven la pagina anti-bot ("anomaly", challenge
captcha), el enricher emite un error claro indicando que se necesita
`web_search_cdp` (issue 0029) — el fallback simple zero-infra no puede
resolver el challenge.
"""
from __future__ import annotations
import html
import json
import os
import re
import sqlite3
import sys
import time
from datetime import datetime, timezone
from html.parser import HTMLParser
from urllib.parse import parse_qs, unquote, urlparse
def progress(p: float, stage: str = "") -> None:
    """Report progress to the UI as one `PROGRESS:<float> <stage>` stderr line.

    The fraction is formatted with two decimals and the stream is flushed
    right away so the line is not held back in a buffer.
    """
    line = f"PROGRESS:{p:.2f} {stage}\n"
    stream = sys.stderr
    stream.write(line)
    stream.flush()
def log(msg: str) -> None:
    """Write *msg* followed by a newline to stderr and flush immediately."""
    stream = sys.stderr
    stream.write(f"{msg}\n")
    stream.flush()
def now_iso() -> str:
    """Return the current UTC time formatted as `YYYY-MM-DDTHH:MM:SSZ`."""
    utc_now = datetime.now(timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
    """Return the current Unix time in milliseconds, truncated to an int."""
    seconds = time.time()
    return int(seconds * 1000)
def _ddg_post(url: str, params: dict, headers: dict, timeout: int) -> str:
    """POST *params* as form data to *url* and return the response body as text.

    Uses `requests` when it can be imported; otherwise falls back to
    `urllib.request`, decoding the body as UTF-8 with undecodable bytes
    replaced.
    """
    try:
        import requests  # type: ignore

        response = requests.post(
            url, data=params, headers=headers, timeout=timeout
        )
        return response.text
    except ImportError:
        # `requests` is optional: fall back to the standard library.
        from urllib.parse import urlencode
        from urllib.request import Request, urlopen

        form_bytes = urlencode(params).encode()
        request = Request(url, data=form_bytes, headers=headers)
        with urlopen(request, timeout=timeout) as resp:  # type: ignore
            payload = resp.read()
            return payload.decode("utf-8", errors="replace")
def is_anomaly_page(htmltxt: str) -> bool:
    """Detect DDG's anti-bot page (captcha challenge).

    The check is a case-insensitive search: the page counts as a challenge
    only when it contains both "anomaly" and "challenge".
    """
    lowered = htmltxt.lower()
    return all(marker in lowered for marker in ("anomaly", "challenge"))
def fetch_ddg(query: str, timeout: int, region: str, safe: str) -> tuple[str, str]:
    """Download the DuckDuckGo results page for *query*.

    The `lite.duckduckgo.com/lite/` endpoint is tried first. If its response
    is the anti-bot page, the request is repeated against
    `html.duckduckgo.com/html/` and that response is returned as-is (it is
    not re-checked here).

    Args:
        query: search text, sent as `q`.
        timeout: per-request timeout passed to the HTTP helper.
        region: sent as `kl` when non-empty.
        safe: one of "strict" / "moderate" / "off", mapped to `kp`; any other
            value sends no `kp`.

    Returns:
        `(html, source)` where source is "lite" or "html".
    """
    kp_by_level = {"strict": "1", "moderate": "-1", "off": "-2"}

    form = {"q": query}
    if region:
        form["kl"] = region
    if safe in kp_by_level:
        form["kp"] = kp_by_level[safe]

    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120 Safari/537.36"
    )
    request_headers = {
        "User-Agent": user_agent,
        "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.7",
    }

    lite_page = _ddg_post(
        "https://lite.duckduckgo.com/lite/", form, request_headers, timeout
    )
    if is_anomaly_page(lite_page):
        log("lite endpoint devolvio challenge — fallback a html endpoint")
        html_page = _ddg_post(
            "https://html.duckduckgo.com/html/", form, request_headers, timeout
        )
        return html_page, "html"
    return lite_page, "lite"
def decode_ddg_href(href: str) -> str:
    """Return the real target URL behind a DuckDuckGo result href.

    DDG wraps result links in a redirect that carries the destination in
    the `uddg` query parameter. Accepted shapes:

        //duckduckgo.com/l/?uddg=https%3A...&rut=...
        /l/?uddg=https%3A...
        https://example.com/...   (already direct; returned unchanged)

    Args:
        href: the raw `href` attribute value.

    Returns:
        The decoded destination when *href* is a DDG redirect with a
        non-empty `uddg`; otherwise *href* itself (made absolute when it
        was scheme-relative or started with `/l/`). An empty *href*
        returns "".
    """
    if not href:
        return ""
    if href.startswith("//"):
        href = "https:" + href
    elif href.startswith("/l/"):
        href = "https://duckduckgo.com" + href
    try:
        parts = urlparse(href)
        host = (parts.hostname or "").lower()
        # Match the domain itself or a real subdomain only; a plain suffix
        # test would also accept look-alike hosts such as
        # "notduckduckgo.com".
        is_ddg_host = host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
        if is_ddg_host and parts.path == "/l/":
            # parse_qs already percent-decodes the value once. Decoding it
            # again would corrupt targets that legitimately contain escapes
            # (e.g. "%2520" must end up as "%20", not as a space).
            target = parse_qs(parts.query).get("uddg", [""])[0]
            if target:
                return target
    except ValueError:
        # Malformed URL (e.g. a broken IPv6 literal): keep the href as-is.
        pass
    return href
class _DDGParser(HTMLParser):
    """Collect results (href, title, snippet) from the DDG `html.` markup.

    Only `<a>` tags are inspected: `<a class="result__a">` opens a new
    result and supplies its href and title, and a following
    `<a class="result__snippet">` supplies the snippet text. Results
    without an href are dropped. If the class names are not present the
    parser simply yields no results.
    """

    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self.results: list[dict] = []
        self._cur: dict | None = None  # result currently being built
        self._in_title = False
        self._in_snippet = False
        self._title_buf: list[str] = []
        self._snippet_buf: list[str] = []

    @staticmethod
    def _squash(chunks: list[str]) -> str:
        """Join text chunks and collapse every whitespace run to one space."""
        return " ".join("".join(chunks).split())

    def _classes(self, attrs: list[tuple[str, str | None]]) -> set[str]:
        """Return the class tokens of the first non-empty `class` attribute."""
        return next(
            (set(value.split()) for key, value in attrs if key == "class" and value),
            set(),
        )

    def _href(self, attrs: list[tuple[str, str | None]]) -> str:
        """Return the first non-empty `href` attribute value, or ""."""
        return next(
            (value for key, value in attrs if key == "href" and value),
            "",
        )

    def handle_starttag(self, tag: str, attrs):
        if tag != "a":
            return
        css = self._classes(attrs)
        if "result__a" in css:
            # A new title anchor closes whatever result was in progress.
            if self._cur:
                self._flush()
            self._cur = {"href": self._href(attrs), "title": "", "snippet": ""}
            self._in_title = True
            self._title_buf = []
            return
        if "result__snippet" in css and self._cur is not None:
            self._in_snippet = True
            self._snippet_buf = []

    def handle_endtag(self, tag: str):
        if tag != "a":
            return
        if self._in_title:
            if self._cur:
                self._cur["title"] = self._squash(self._title_buf)
            self._in_title = False
        elif self._in_snippet:
            if self._cur:
                self._cur["snippet"] = self._squash(self._snippet_buf)
            self._in_snippet = False

    def handle_data(self, data: str):
        if self._in_title:
            self._title_buf.append(data)
        elif self._in_snippet:
            self._snippet_buf.append(data)

    def _flush(self):
        """Store the in-progress result if it has an href, then reset it."""
        pending = self._cur
        if pending and pending.get("href"):
            self.results.append(pending)
        self._cur = None

    def close(self) -> None:
        # The last result has no following title anchor to flush it.
        if self._cur:
            self._flush()
        super().close()
def parse_ddg_html(htmltxt: str) -> list[dict]:
    """Parse the HTML served by the `html.duckduckgo.com` endpoint.

    Returns a list of `{"url", "title", "snippet", "rank"}` dicts in page
    order. Hrefs are unwrapped with `decode_ddg_href`; entries that are not
    http(s) URLs and repeated URLs are skipped; `rank` is the 1-based
    position among the kept entries. A parser exception is logged and
    whatever was collected before it is still returned.
    """
    parser = _DDGParser()
    try:
        parser.feed(htmltxt)
        parser.close()
    except Exception as e:
        log(f"DDG parser failed: {e}")

    # Insertion-ordered dict doubles as the "seen" set and the output list.
    by_url: dict[str, dict] = {}
    for item in parser.results:
        url = decode_ddg_href(item.get("href") or "")
        if not url.startswith(("http://", "https://")):
            continue
        if url in by_url:
            continue
        by_url[url] = {
            "url": url,
            "title": item.get("title") or "",
            "snippet": item.get("snippet") or "",
            "rank": len(by_url) + 1,
        }
    return list(by_url.values())
class _DDGLiteParser(HTMLParser):
    """Parser for the `lite.duckduckgo.com/lite/` markup.

    Typical structure:

        <a rel="nofollow" href="<URL>" class='result-link'>title</a>
        ...
        <td class='result-snippet'>snippet text</td>

    The snippet comes AFTER the link (it is not a child of the same
    element), so results are paired by order: each `result-link` takes the
    next `result-snippet` that follows it. A snippet with no pending link
    is ignored.
    """

    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self.results: list[dict] = []
        self._in_link = False
        self._in_snippet = False
        self._cur_href = ""
        self._title_buf: list[str] = []
        self._snippet_buf: list[str] = []
        # Index in `results` still waiting for its snippet, if any.
        self._pending_snippet_for: int | None = None

    def _attrs_dict(self, attrs):
        """Turn the attribute pairs into a dict, mapping None values to ""."""
        return dict((key, value or "") for key, value in attrs)

    def handle_starttag(self, tag: str, attrs):
        attr_map = self._attrs_dict(attrs)
        css = attr_map.get("class", "")
        if tag == "a" and "result-link" in css:
            self._cur_href = attr_map.get("href", "")
            self._title_buf = []
            self._in_link = True
            return
        if tag == "td" and "result-snippet" in css:
            self._snippet_buf = []
            self._in_snippet = True

    def handle_endtag(self, tag: str):
        if self._in_link and tag == "a":
            entry = {
                "href": self._cur_href,
                "title": " ".join("".join(self._title_buf).split()),
                "snippet": "",
            }
            self.results.append(entry)
            self._pending_snippet_for = len(self.results) - 1
            self._in_link = False
            return
        if self._in_snippet and tag == "td":
            text = " ".join("".join(self._snippet_buf).split())
            index = self._pending_snippet_for
            if index is not None:
                self.results[index]["snippet"] = text
                self._pending_snippet_for = None
            self._in_snippet = False

    def handle_data(self, data: str):
        if self._in_link:
            self._title_buf.append(data)
        elif self._in_snippet:
            self._snippet_buf.append(data)
def parse_ddg_lite(htmltxt: str) -> list[dict]:
    """Parse the HTML served by the `lite.duckduckgo.com/lite/` endpoint.

    Returns a list of `{"url", "title", "snippet", "rank"}` dicts in page
    order. Entries that are not http(s) URLs, DDG help-page links and
    repeated URLs are skipped; `rank` is the 1-based position among the
    kept entries. A parser exception is logged and whatever was collected
    before it is still returned.
    """
    parser = _DDGLiteParser()
    try:
        parser.feed(htmltxt)
        parser.close()
    except Exception as e:
        log(f"DDG lite parser failed: {e}")

    # Insertion-ordered dict doubles as the "seen" set and the output list.
    by_url: dict[str, dict] = {}
    for item in parser.results:
        # lite normally sends direct absolute URLs; the href still goes
        # through decode_ddg_href in case DDG wraps one.
        url = decode_ddg_href(item.get("href") or "")
        if not url.startswith(("http://", "https://")):
            continue
        # Skip DDG's own self-promotion (help pages).
        if "duckduckgo.com/duckduckgo-help-pages/" in url:
            continue
        if url in by_url:
            continue
        by_url[url] = {
            "url": url,
            "title": item.get("title") or "",
            "snippet": item.get("snippet") or "",
            "rank": len(by_url) + 1,
        }
    return list(by_url.values())
def find_url_entity(conn: sqlite3.Connection, url: str) -> str | None:
    """Find an existing Url node whose metadata `url` field equals *url*.

    Every `entities` row with `type_ref='Url'` is read and its metadata
    JSON is decoded in Python. Rows whose metadata does not parse, or is
    not a JSON object, are skipped.

    Returns:
        The id of the first matching row, or None when nothing matches.
    """
    rows = conn.execute(
        "SELECT id, metadata FROM entities WHERE type_ref='Url'"
    )
    for entity_id, raw_meta in rows:
        try:
            parsed = json.loads(raw_meta or "{}")
        except Exception:
            continue
        if not isinstance(parsed, dict):
            continue
        if parsed.get("url") == url:
            return entity_id
    return None
def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
                      snippet: str, rank: int, query: str) -> str:
    """Create a Url node for *url*, or refresh the existing one; return its id.

    When a Url node with the same `url` already exists, the search fields
    below are merged INTO its current metadata instead of replacing it, so
    keys stored there by other writers (for example a cached page body)
    survive a repeated search. The caller is responsible for committing.

    Args:
        conn: open connection to the operations database.
        url: result URL; also the key used to detect an existing node.
        title: result title; used as the node name (first 200 chars),
            falling back to the URL when empty.
        snippet: result snippet text.
        rank: 1-based position of the result.
        query: search text that produced the result.

    Returns:
        The id of the existing or newly inserted entity.
    """
    existing = find_url_entity(conn, url)
    ts = now_iso()
    search_meta = {
        "url": url,
        "title": title,
        "snippet": snippet,
        "rank": rank,
        "query": query,
        "engine": "duckduckgo",
        "found_at": ts,
    }
    if existing:
        merged: dict = {}
        row = conn.execute(
            "SELECT metadata FROM entities WHERE id=?", (existing,)
        ).fetchone()
        if row and row[0]:
            try:
                previous = json.loads(row[0])
            except (TypeError, ValueError):
                previous = None
            if isinstance(previous, dict):
                merged.update(previous)
        # Fresh search fields win over stale ones; unrelated keys are kept.
        merged.update(search_meta)
        conn.execute(
            "UPDATE entities SET metadata=?, updated_at=? WHERE id=?",
            (json.dumps(merged, ensure_ascii=False), ts, existing),
        )
        return existing

    # str hashes are salted per process, so the last component only adds
    # entropy to the id; it is not a stable fingerprint of the URL.
    new_id = f"Url_{now_ms()}_{rank}_{abs(hash(url)) % 100000}"
    name = title[:200] if title else url[:200]
    conn.execute(
        "INSERT INTO entities (id, name, type_ref, source, metadata, "
        " created_at, updated_at) "
        "VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?)",
        (new_id, name, json.dumps(search_meta, ensure_ascii=False), ts, ts),
    )
    return new_id
def relation_exists(conn: sqlite3.Connection, from_id: str, to_id: str,
                    name: str) -> bool:
    """Tell whether a relation *name* from *from_id* to *to_id* is stored.

    Direction matters: the lookup matches `from_entity` and `to_entity`
    exactly as given.
    """
    sql = (
        "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
        "AND name=? LIMIT 1"
    )
    match = conn.execute(sql, (from_id, to_id, name)).fetchone()
    return match is not None
# Per-process sequence number mixed into relation ids so that two relations
# created within the same millisecond still get distinct ids.
_REL_COUNTER = 0


def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str,
                    name: str) -> bool:
    """Insert the relation *name* from *from_id* to *to_id* unless it exists.

    The caller is responsible for committing.

    Returns:
        True when a row was inserted, False when the relation was already
        present (in which case nothing is written and the counter is not
        advanced).
    """
    global _REL_COUNTER
    already_there = relation_exists(conn, from_id, to_id, name)
    if already_there:
        return False

    ts = now_iso()
    _REL_COUNTER += 1
    rel_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}"
    row = (rel_id, name, from_id, to_id, ts, ts)
    conn.execute(
        "INSERT INTO relations (id, name, from_entity, to_entity, "
        " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
        row,
    )
    return True
def _int_param(params: dict, key: str, default: int) -> int:
    """Read an integer parameter, using *default* when it is missing or unusable."""
    value = params.get(key)
    if value is None or value == "":
        return default
    try:
        return int(value)
    except (TypeError, ValueError):
        log(f"param {key}={value!r} no es entero; usando {default}")
        return default


def main() -> int:
    """Run the enricher: read the job from stdin, search DDG, store Url nodes.

    Input is one JSON object on stdin (node_id, node_name, metadata,
    ops_db_path, app_dir, params). Progress lines go to stderr and, on most
    paths, a one-line JSON summary goes to stdout.

    Returns:
        Process exit code:
        0  success.
        2  stdin is not a JSON object, node_id / ops_db_path is missing, or
           the node provides no query text.
        4  the DDG request failed or both endpoints returned the bot
           challenge.
        7  the operations database file does not exist.
        8  the database has no `entities` table.
        9  SQLite failed while opening / inspecting the database.
    """
    raw = sys.stdin.read()
    try:
        ctx = json.loads(raw)
    except Exception as e:
        log(f"stdin not valid JSON: {e}")
        return 2
    if not isinstance(ctx, dict):
        # Valid JSON but not an object: nothing below could read its fields.
        log("stdin not valid JSON: expected an object")
        return 2

    node_id = ctx.get("node_id") or ""
    node_name = str(ctx.get("node_name") or "").strip()
    metadata = ctx.get("metadata") or {}
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except Exception:
            metadata = {}
    if not isinstance(metadata, dict):
        metadata = {}
    ops_db_path = str(ctx.get("ops_db_path") or "")
    params = ctx.get("params") or {}
    if not isinstance(params, dict):
        params = {}
    # Explicit null / empty / non-numeric values fall back to the defaults
    # instead of aborting with an unhandled exception.
    limit = _int_param(params, "limit", 10)
    region = str(params.get("region") or "").strip()
    safe = str(params.get("safe") or "moderate").strip()
    timeout_s = _int_param(params, "timeout_s", 15)

    if not node_id or not ops_db_path:
        log("missing node_id / ops_db_path")
        return 2

    # Normalize backslashes to forward slashes: the path may arrive with
    # mixed separators from the C++ side (cross-platform build, copies
    # between Windows and WSL, etc.).
    ops_db_path = ops_db_path.replace("\\", "/")
    app_dir_raw = str(ctx.get("app_dir") or "").replace("\\", "/")

    # Resolve a relative path to absolute, anchoring on app_dir first and
    # on the cwd as fallback. Otherwise sqlite3 would create an empty file
    # when the subprocess cwd differs from the parent's.
    if not os.path.isabs(ops_db_path):
        if app_dir_raw and os.path.isdir(app_dir_raw):
            cand = os.path.normpath(os.path.join(app_dir_raw, ops_db_path))
            if os.path.exists(cand):
                ops_db_path = cand
        if not os.path.isabs(ops_db_path):
            ops_db_path = os.path.abspath(ops_db_path)

    if not os.path.exists(ops_db_path):
        log(f"ops_db_path no existe: {ops_db_path} (cwd={os.getcwd()})")
        print(json.dumps({"error": "ops_db not found",
                          "ops_db_path": ops_db_path,
                          "cwd": os.getcwd(),
                          "entities_added": 0, "relations_added": 0}))
        return 7

    # Schema check: without an `entities` table the path is wrong or the
    # operations.db has not been bootstrapped.
    try:
        _c = sqlite3.connect(ops_db_path)
        try:
            row = _c.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type='table' AND name='entities'"
            ).fetchone()
        finally:
            _c.close()
        if not row:
            log(f"sin tabla 'entities' en {ops_db_path}")
            print(json.dumps({
                # The two sentences were previously glued together by
                # implicit string concatenation ("'entities'verifica").
                "error": "operations.db sin tabla 'entities'; "
                         "verifica que graph_explorer haya cargado un "
                         "proyecto valido antes de lanzar el enricher",
                "ops_db_path": ops_db_path,
                "entities_added": 0, "relations_added": 0}))
            return 8
    except sqlite3.Error as e:
        log(f"sqlite open failed: {e}")
        return 9

    # Query priority: metadata.query > metadata.text > node_name.
    query = str(
        metadata.get("query") or metadata.get("text") or node_name
    ).strip()
    if not query:
        log("nodo sin query (metadata.query / metadata.text / name)")
        return 2

    progress(0.10, "fetching")
    try:
        htmltxt, source = fetch_ddg(query, timeout=timeout_s,
                                    region=region, safe=safe)
    except Exception as e:
        log(f"DDG fetch failed: {e}")
        print(json.dumps({"error": str(e), "query": query,
                          "entities_added": 0, "relations_added": 0}))
        return 4

    if is_anomaly_page(htmltxt):
        log("DDG devolvio challenge captcha en ambos endpoints — "
            "usar web_search_cdp (issue 0029) para resolver")
        print(json.dumps({
            "error": "DDG bot challenge — captcha required",
            "query": query,
            "engine": "duckduckgo",
            "source": source,
            "results": 0,
            "entities_added": 0,
            "relations_added": 0,
        }, ensure_ascii=False))
        return 4

    progress(0.55, "parsing")
    # The parser is picked by content, not by endpoint: if endpoint and
    # markup disagree (test stubs, a future DDG change) results are still
    # extracted. Both are tried and the one yielding more results wins.
    results_lite = parse_ddg_lite(htmltxt) if "result-link" in htmltxt else []
    results_html = parse_ddg_html(htmltxt) if "result__a" in htmltxt else []
    results = results_lite if len(results_lite) >= len(results_html) else results_html
    if limit > 0:
        results = results[:limit]
    log(f"DDG ({source}) returned {len(results)} results "
        f"(lite_parsed={len(results_lite)} html_parsed={len(results_html)})")

    progress(0.80, "applying")
    conn = sqlite3.connect(ops_db_path)
    entities_added = 0
    relations_added = 0
    try:
        conn.execute("PRAGMA foreign_keys=OFF")
        for r in results:
            # Checked before the insert so only genuinely new nodes count.
            existed = find_url_entity(conn, r["url"]) is not None
            url_id = insert_url_entity(
                conn,
                url=r["url"],
                title=r["title"],
                snippet=r["snippet"],
                rank=r["rank"],
                query=query,
            )
            if not existed:
                entities_added += 1
            if insert_relation(conn, url_id, node_id, "SEARCH_RESULT_OF"):
                relations_added += 1
        conn.commit()
    finally:
        conn.close()

    progress(1.0, "done")
    print(json.dumps({
        "query": query,
        "engine": "duckduckgo",
        "results": len(results),
        "entities_added": entities_added,
        "relations_added": relations_added,
    }, ensure_ascii=False))
    return 0
# Script entry point: the value returned by main() becomes the exit code.
if __name__ == "__main__":
    raise SystemExit(main())