Files
graph_explorer/enrichers/web_search/run.py
T
egutierrez 52495af779 feat(0035e): manifest auto_group_threshold override + propagacion a Python
Manifest YAML puede declarar 'auto_group_threshold: <int>' a nivel
top-level. enrichers.cpp lo parsea y lo guarda en EnricherSpec.
jobs.cpp lo inyecta como campo opcional 'auto_group_threshold' en el
JSON stdin del subprocess. Los enrichers Python que crean Groups
(web_search, split_words, split_sentences, extract_iocs_text) leen el
campo y, si viene > 0, lo usan en lugar de su DEFAULT_GROUP_THRESHOLD.
Helper _coerce_threshold tolera int / str / None / 0 cayendo al default.
2026-05-04 14:20:52 +02:00

716 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""Enricher web_search — busca en DuckDuckGo HTML y crea nodos Url.
Wire protocol estandar (issue 0026):
- stdin: JSON con node_id, node_name, metadata, ops_db_path, app_dir,
cache_dir, registry_root, params.
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen.
- exit code 0 = ok, !=0 = error.
DDG endpoints usados:
1. https://lite.duckduckgo.com/lite/ (POST) — endpoint primario.
HTML minimo (ano 2009-style), tabla con `<a class='result-link'>` y
`<td class='result-snippet'>`. Es el menos agresivo con bot
detection; suele responder 200 cuando el endpoint `html.` ya
devuelve un challenge "anomaly" desde IPs residenciales/Windows.
2. https://html.duckduckgo.com/html/ (POST) — fallback. Su parser
usa `result__a` / `result__snippet`. DDG envuelve los enlaces en
`//duckduckgo.com/l/?uddg=<encoded>` que hay que decodificar.
Si ambos endpoints devuelven la pagina anti-bot ("anomaly", challenge
captcha), el enricher emite un error claro indicando que se necesita
`web_search_cdp` (issue 0029) — el fallback simple zero-infra no puede
resolver el challenge.
"""
from __future__ import annotations
import html
import json
import os
import re
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timezone
from html.parser import HTMLParser
from urllib.parse import parse_qs, unquote, urlparse
# Issue 0035c — automatic grouping of results.
#
# When an enricher produces >= GROUP_THRESHOLD results, the first
# GROUP_PREVIEW_K stay loose, hanging off the source node (Twitter/Reddit
# timeline style), and the remaining N-K go into a square Group node.
# The manifest may declare `auto_group_threshold` to override the default;
# a settings UI is planned to allow a global override later. For now the
# values below are hardcoded.
DEFAULT_GROUP_THRESHOLD = 50
GROUP_PREVIEW_K = 10
def _coerce_threshold(raw, default: int) -> int:
"""Acepta int / str numerico / None, devuelve >0 o el default.
Issue 0035e: el manifest puede declarar `auto_group_threshold: <int>`
y jobs.cpp lo propaga al subprocess. Cualquier otro valor (None,
"", 0, no parseable) cae al default global.
"""
if raw is None or raw == "":
return default
try:
v = int(raw)
except (TypeError, ValueError):
return default
return v if v > 0 else default
def progress(p: float, stage: str = "") -> None:
    """Emit one `PROGRESS:<float> <stage>` line on stderr and flush it.

    The fraction *p* is rendered with two decimals.
    """
    print(f"PROGRESS:{p:.2f} {stage}", file=sys.stderr, flush=True)
def log(msg: str) -> None:
    """Write *msg* followed by a newline to stderr and flush immediately."""
    print(f"{msg}", file=sys.stderr, flush=True)
def now_iso() -> str:
    """Return the current UTC time formatted as `YYYY-MM-DDTHH:MM:SSZ`."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
    """Return the current Unix time truncated to whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
def _ddg_post(url: str, params: dict, headers: dict, timeout: int) -> str:
    """POST *params* as form data to *url* and return the response body.

    Uses `requests` when it can be imported; on ImportError it falls back
    to `urllib`, decoding the body as UTF-8 with replacement characters.
    NOTE(review): neither path inspects the HTTP status here; the two
    libraries may differ on error statuses — confirm if that matters.
    """
    try:
        import requests  # type: ignore
        response = requests.post(
            url, data=params, headers=headers, timeout=timeout
        )
        return response.text
    except ImportError:
        from urllib.parse import urlencode
        from urllib.request import Request, urlopen
        payload = urlencode(params).encode()
        request = Request(url, data=payload, headers=headers)
        with urlopen(request, timeout=timeout) as reply:  # type: ignore
            raw_body = reply.read()
        return raw_body.decode("utf-8", errors="replace")
def is_anomaly_page(htmltxt: str) -> bool:
    """Tell whether *htmltxt* looks like DDG's anti-bot challenge page.

    The check is case-insensitive and requires both the words "anomaly"
    and "challenge" to appear somewhere in the text.
    """
    lowered = htmltxt.lower()
    return all(marker in lowered for marker in ("anomaly", "challenge"))
def fetch_ddg(query: str, timeout: int, region: str, safe: str) -> tuple[str, str]:
    """Download a DuckDuckGo results page for *query*.

    The `lite.duckduckgo.com/lite/` endpoint is tried first. If its
    response is the anti-bot page, the same form is posted to
    `html.duckduckgo.com/html/` and that response is returned as-is
    (it is not re-checked here).

    Args:
        query: Search text, sent as `q`.
        timeout: Per-request timeout forwarded to the HTTP helper.
        region: Optional region code, sent as `kl` when non-empty.
        safe: "strict", "moderate" or "off"; other values omit `kp`.

    Returns:
        `(html, source)` where source is "lite" or "html".
    """
    form = {"q": query}
    if region:
        form["kl"] = region
    kp_value = {"strict": "1", "moderate": "-1", "off": "-2"}.get(safe)
    if kp_value is not None:
        form["kp"] = kp_value

    browser_headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120 Safari/537.36"
        ),
        "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.7",
    }

    lite_page = _ddg_post(
        "https://lite.duckduckgo.com/lite/", form, browser_headers, timeout
    )
    if is_anomaly_page(lite_page):
        log("lite endpoint devolvio challenge — fallback a html endpoint")
        html_page = _ddg_post(
            "https://html.duckduckgo.com/html/", form, browser_headers, timeout
        )
        return html_page, "html"
    return lite_page, "lite"
def decode_ddg_href(href: str) -> str:
    """Unwrap a DuckDuckGo redirect link and return the real target URL.

    Accepted shapes:
        //duckduckgo.com/l/?uddg=https%3A...&rut=...
        /l/?uddg=https%3A...
        https://example.com/...   (direct link, returned unchanged)

    Args:
        href: Raw `href` attribute taken from the results page.

    Returns:
        The decoded `uddg` target when *href* is a DDG redirect; otherwise
        *href* itself (made absolute when it was scheme-relative or started
        with `/l/`). An empty/None *href* yields "".
    """
    if not href:
        return ""
    if href.startswith("//"):
        href = "https:" + href
    elif href.startswith("/l/"):
        href = "https://duckduckgo.com" + href
    try:
        parts = urlparse(href)
        host = parts.hostname or ""
        # Exact host or a real subdomain only: a bare suffix test would also
        # accept look-alike hosts such as `notduckduckgo.com`.
        is_ddg_host = host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
        if is_ddg_host and parts.path == "/l/":
            # parse_qs already percent-decodes the value exactly once.
            # Decoding again would corrupt targets that legitimately contain
            # escapes (e.g. `%2520` must become `%20`, not a space).
            target = parse_qs(parts.query).get("uddg", [""])[0]
            if target:
                return target
    except ValueError:
        # urlparse rejects some malformed hrefs (e.g. broken IPv6 brackets);
        # best effort: hand back the href as it is.
        pass
    return href
class _DDGParser(HTMLParser):
"""Extrae resultados (anchor + snippet + rank) del HTML de DDG.
No intenta ser completo — solo busca `<a class="result__a">` para el
titulo/url y `<a class="result__snippet">` (o el div equivalente)
para el texto. Es robusto a cambios menores: si DDG renombra clases,
el enricher devolvera 0 resultados pero no peta.
"""
def __init__(self) -> None:
super().__init__(convert_charrefs=True)
self.results: list[dict] = []
self._cur: dict | None = None
self._in_title = False
self._in_snippet = False
self._title_buf: list[str] = []
self._snippet_buf: list[str] = []
def _classes(self, attrs: list[tuple[str, str | None]]) -> set[str]:
for k, v in attrs:
if k == "class" and v:
return set(v.split())
return set()
def _href(self, attrs: list[tuple[str, str | None]]) -> str:
for k, v in attrs:
if k == "href" and v:
return v
return ""
def handle_starttag(self, tag: str, attrs):
if tag != "a":
return
cls = self._classes(attrs)
if "result__a" in cls:
if self._cur:
self._flush()
self._cur = {"href": self._href(attrs), "title": "", "snippet": ""}
self._in_title = True
self._title_buf = []
elif "result__snippet" in cls and self._cur is not None:
self._in_snippet = True
self._snippet_buf = []
def handle_endtag(self, tag: str):
if tag != "a":
return
if self._in_title:
self._cur and self._cur.update(
title=" ".join("".join(self._title_buf).split())
)
self._in_title = False
elif self._in_snippet:
self._cur and self._cur.update(
snippet=" ".join("".join(self._snippet_buf).split())
)
self._in_snippet = False
def handle_data(self, data: str):
if self._in_title:
self._title_buf.append(data)
elif self._in_snippet:
self._snippet_buf.append(data)
def _flush(self):
if self._cur and self._cur.get("href"):
self.results.append(self._cur)
self._cur = None
def close(self) -> None:
if self._cur:
self._flush()
super().close()
def parse_ddg_html(htmltxt: str) -> list[dict]:
    """Parse the markup served by the `html.duckduckgo.com` endpoint.

    Returns a list of dicts with `url`, `title`, `snippet` and a 1-based
    `rank`, keeping only http(s) URLs and dropping duplicates. A parser
    exception is logged and whatever was collected so far is still used.
    """
    parser = _DDGParser()
    try:
        parser.feed(htmltxt)
        parser.close()
    except Exception as e:
        log(f"DDG parser failed: {e}")

    # Insertion-ordered dict doubles as the "seen" set and the output list.
    by_url: dict[str, dict] = {}
    for hit in parser.results:
        url = decode_ddg_href(hit.get("href") or "")
        if not url.startswith(("http://", "https://")) or url in by_url:
            continue
        by_url[url] = {
            "url": url,
            "title": hit.get("title") or "",
            "snippet": hit.get("snippet") or "",
            "rank": len(by_url) + 1,
        }
    return list(by_url.values())
class _DDGLiteParser(HTMLParser):
"""Parser para `lite.duckduckgo.com/lite/`.
Estructura tipica:
<a rel="nofollow" href="<URL>" class='result-link'>title</a>
...
<td class='result-snippet'>snippet text</td>
Los snippets vienen DESPUES del enlace (no hijo del mismo elemento),
asi que parea por orden: cada `result-link` consume el siguiente
`result-snippet`.
"""
def __init__(self) -> None:
super().__init__(convert_charrefs=True)
self.results: list[dict] = []
self._in_link = False
self._in_snippet = False
self._cur_href = ""
self._title_buf: list[str] = []
self._snippet_buf: list[str] = []
self._pending_snippet_for: int | None = None
def _attrs_dict(self, attrs):
return {k: (v or "") for k, v in attrs}
def handle_starttag(self, tag: str, attrs):
a = self._attrs_dict(attrs)
cls = a.get("class", "")
if tag == "a" and "result-link" in cls:
href = a.get("href", "")
self._in_link = True
self._cur_href = href
self._title_buf = []
elif tag == "td" and "result-snippet" in cls:
self._in_snippet = True
self._snippet_buf = []
def handle_endtag(self, tag: str):
if self._in_link and tag == "a":
title = " ".join("".join(self._title_buf).split())
self.results.append({
"href": self._cur_href,
"title": title,
"snippet": "",
})
self._pending_snippet_for = len(self.results) - 1
self._in_link = False
elif self._in_snippet and tag == "td":
snippet = " ".join("".join(self._snippet_buf).split())
if self._pending_snippet_for is not None:
self.results[self._pending_snippet_for]["snippet"] = snippet
self._pending_snippet_for = None
self._in_snippet = False
def handle_data(self, data: str):
if self._in_link:
self._title_buf.append(data)
elif self._in_snippet:
self._snippet_buf.append(data)
def parse_ddg_lite(htmltxt: str) -> list[dict]:
    """Parse the markup served by `lite.duckduckgo.com/lite/`.

    Returns a list of dicts with `url`, `title`, `snippet` and a 1-based
    `rank`. Non-http(s) links, DDG help pages and duplicate URLs are
    skipped. A parser exception is logged and whatever was collected so
    far is still used.
    """
    parser = _DDGLiteParser()
    try:
        parser.feed(htmltxt)
        parser.close()
    except Exception as e:
        log(f"DDG lite parser failed: {e}")

    by_url: dict[str, dict] = {}
    for hit in parser.results:
        # lite normally sends direct absolute URLs; they still go through
        # decode_ddg_href in case DDG wraps one of them.
        url = decode_ddg_href(hit.get("href") or "")
        if not url.startswith(("http://", "https://")):
            continue
        # Leave out DDG self-promotion (help pages).
        if "duckduckgo.com/duckduckgo-help-pages/" in url:
            continue
        if url in by_url:
            continue
        by_url[url] = {
            "url": url,
            "title": hit.get("title") or "",
            "snippet": hit.get("snippet") or "",
            "rank": len(by_url) + 1,
        }
    return list(by_url.values())
def find_url_entity(conn: sqlite3.Connection, url: str) -> str | None:
    """Return the id of an existing Url entity whose metadata.url is *url*.

    Every row with type_ref='Url' is scanned and its metadata parsed as
    JSON; rows whose metadata cannot be parsed or is not an object are
    ignored. Returns None when nothing matches.
    """
    rows = conn.execute(
        "SELECT id, metadata FROM entities WHERE type_ref='Url'"
    )
    for entity_id, meta_raw in rows:
        try:
            meta = json.loads(meta_raw or "{}")
        except Exception:
            continue
        if isinstance(meta, dict) and meta.get("url") == url:
            return entity_id
    return None
def has_group_id_column(conn: sqlite3.Connection) -> bool:
    """Report whether the `entities` table has a `group_id` column.

    The column is added by a migration (issue 0035a), but the enricher may
    run against an older database. When it is missing, results are
    inserted without it (loose nodes that still carry `batch_id` in their
    metadata). Any sqlite error while inspecting the schema counts as
    "no column".
    """
    try:
        columns = conn.execute("PRAGMA table_info(entities)")
        # Index 1 of each table_info row is the column name.
        return any(column[1] == "group_id" for column in columns)
    except sqlite3.Error:
        return False
def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
                      snippet: str, rank: int, query: str,
                      batch_id: str = "",
                      group_id: str | None = None,
                      has_group_col: bool = False) -> str:
    """Create a Url entity (or refresh an existing one) and return its id.

    `batch_id` is added to the metadata only when non-empty. `group_id` is
    written to the column of the same name only when `has_group_col` is
    true; otherwise the row is inserted without that column.
    """
    existing_id = find_url_entity(conn, url)
    stamp = now_iso()
    meta_json = json.dumps(
        {
            "url": url,
            "title": title,
            "snippet": snippet,
            "rank": rank,
            "query": query,
            "engine": "duckduckgo",
            "found_at": stamp,
            **({"batch_id": batch_id} if batch_id else {}),
        },
        ensure_ascii=False,
    )

    if existing_id:
        # An already-known Url keeps its current group_id: the same Url can
        # show up in several searches and the first Group that captured it
        # wins. Only metadata and updated_at are refreshed.
        conn.execute(
            "UPDATE entities SET metadata=?, updated_at=? WHERE id=?",
            (meta_json, stamp, existing_id),
        )
        return existing_id

    new_id = f"Url_{now_ms()}_{rank}_{abs(hash(url)) % 100000}"
    display_name = (title or url)[:200]
    if not has_group_col:
        conn.execute(
            "INSERT INTO entities (id, name, type_ref, source, metadata, "
            " created_at, updated_at) "
            "VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?)",
            (new_id, display_name, meta_json, stamp, stamp),
        )
        return new_id
    conn.execute(
        "INSERT INTO entities (id, name, type_ref, source, metadata, "
        " group_id, created_at, updated_at) "
        "VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?, ?)",
        (new_id, display_name, meta_json, group_id, stamp, stamp),
    )
    return new_id
def insert_group_entity(conn: sqlite3.Connection, *, query: str,
                        count: int, batch_id: str) -> str:
    """Insert a Group entity for a search and return its new id.

    The row is named `web_search: <query> (<count>)` and its metadata
    records the enricher name, query, count and batch id.
    """
    stamp = now_iso()
    millis = now_ms()
    suffix = abs(hash(query + batch_id)) % 100000
    group_entity_id = f"Group_{millis}_{suffix}"
    label = f"web_search: {query} ({count})"
    meta_json = json.dumps(
        {
            "enricher": "web_search",
            "query": query,
            "count": count,
            "batch_id": batch_id,
        },
        ensure_ascii=False,
    )
    conn.execute(
        "INSERT INTO entities (id, name, type_ref, source, metadata, "
        " created_at, updated_at) "
        "VALUES (?, ?, 'Group', 'enricher:web_search', ?, ?, ?)",
        (group_entity_id, label, meta_json, stamp, stamp),
    )
    return group_entity_id
def relation_exists(conn: sqlite3.Connection, from_id: str, to_id: str,
                    name: str) -> bool:
    """Return True when a relation *name* from *from_id* to *to_id* exists."""
    match = conn.execute(
        "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
        "AND name=? LIMIT 1",
        (from_id, to_id, name),
    ).fetchone()
    return match is not None
# Per-process sequence number; keeps relation ids distinct when several are
# created within the same millisecond.
_REL_COUNTER = 0


def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str,
                    name: str) -> bool:
    """Insert the relation unless an identical one is already stored.

    Returns True when a row was inserted, False when the relation
    (same endpoints and name) already existed.
    """
    global _REL_COUNTER
    already_there = relation_exists(conn, from_id, to_id, name)
    if already_there:
        return False
    stamp = now_iso()
    _REL_COUNTER += 1
    relation_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}"
    row = (relation_id, name, from_id, to_id, stamp, stamp)
    conn.execute(
        "INSERT INTO relations (id, name, from_entity, to_entity, "
        " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
        row,
    )
    return True
def _int_param(params: dict, key: str, default: int) -> int:
    """Read an integer enricher parameter, using *default* when unusable."""
    value = params.get(key, default)
    if value is None or isinstance(value, bool):
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def main() -> int:
    """Run the web_search enricher once, driven by the JSON read from stdin.

    Steps: validate the context and locate operations.db, fetch and parse
    the DuckDuckGo results, then insert Url entities (plus an optional
    Group) and their SEARCH_RESULT_OF relations in a single transaction.
    A one-line JSON summary is printed to stdout on success and on most
    error paths.

    Returns:
        Process exit code: 0 ok; 2 bad input (invalid stdin JSON, missing
        node_id / ops_db_path, empty query); 4 fetch failure or bot
        challenge; 7 ops_db not found; 8 no `entities` table; 9 sqlite
        open failure.
    """
    raw = sys.stdin.read()
    try:
        ctx = json.loads(raw)
    except Exception as e:
        log(f"stdin not valid JSON: {e}")
        return 2
    if not isinstance(ctx, dict):
        # Valid JSON but not an object (e.g. a list): nothing to read from.
        log("stdin not valid JSON: expected an object")
        return 2

    node_id = ctx.get("node_id") or ""
    node_name = str(ctx.get("node_name") or "").strip()
    metadata = ctx.get("metadata") or {}
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except Exception:
            metadata = {}
    if not isinstance(metadata, dict):
        metadata = {}
    ops_db_path = ctx.get("ops_db_path") or ""
    params = ctx.get("params") or {}
    if not isinstance(params, dict):
        params = {}
    # Malformed numeric params fall back to their defaults instead of
    # aborting the enricher with a traceback.
    limit = _int_param(params, "limit", 10)
    region = str(params.get("region") or "").strip()
    safe = str(params.get("safe") or "moderate").strip()
    timeout_s = _int_param(params, "timeout_s", 15)
    if not node_id or not ops_db_path:
        log("missing node_id / ops_db_path")
        return 2

    # Normalize backslashes to forward slashes — the path may arrive with
    # mixed separators from the C++ side if fs::path was built in another
    # context (cross-platform build, copy between Windows and WSL, etc.).
    ops_db_path = ops_db_path.replace("\\", "/")
    app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/")
    # Resolve to absolute when relative, anchoring on app_dir and falling
    # back to cwd. Without this sqlite3 creates an empty file when the
    # subprocess cwd differs from the parent's.
    if not os.path.isabs(ops_db_path):
        if app_dir_raw and os.path.isdir(app_dir_raw):
            cand = os.path.normpath(os.path.join(app_dir_raw, ops_db_path))
            if os.path.exists(cand):
                ops_db_path = cand
    if not os.path.isabs(ops_db_path):
        ops_db_path = os.path.abspath(ops_db_path)
    if not os.path.exists(ops_db_path):
        log(f"ops_db_path no existe: {ops_db_path} (cwd={os.getcwd()})")
        print(json.dumps({"error": "ops_db not found",
                          "ops_db_path": ops_db_path,
                          "cwd": os.getcwd(),
                          "entities_added": 0, "relations_added": 0}))
        return 7

    # Schema check — without an `entities` table the path is wrong or
    # operations.db has not been bootstrapped.
    try:
        _c = sqlite3.connect(ops_db_path)
        try:
            row = _c.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type='table' AND name='entities'"
            ).fetchone()
        finally:
            _c.close()
        if not row:
            log(f"sin tabla 'entities' en {ops_db_path}")
            print(json.dumps({
                # The two halves used to be glued together with no
                # separator ("...'entities'verifica que...").
                "error": "operations.db sin tabla 'entities'; "
                         "verifica que graph_explorer haya cargado un "
                         "proyecto valido antes de lanzar el enricher",
                "ops_db_path": ops_db_path,
                "entities_added": 0, "relations_added": 0}))
            return 8
    except sqlite3.Error as e:
        log(f"sqlite open failed: {e}")
        return 9

    # Query priority: metadata.query > metadata.text > node_name.
    query = str(
        metadata.get("query") or metadata.get("text") or node_name
    ).strip()
    if not query:
        log("nodo sin query (metadata.query / metadata.text / name)")
        return 2

    progress(0.10, "fetching")
    try:
        htmltxt, source = fetch_ddg(query, timeout=timeout_s,
                                    region=region, safe=safe)
    except Exception as e:
        log(f"DDG fetch failed: {e}")
        print(json.dumps({"error": str(e), "query": query,
                          "entities_added": 0, "relations_added": 0}))
        return 4
    if is_anomaly_page(htmltxt):
        log("DDG devolvio challenge captcha en ambos endpoints — "
            "usar web_search_cdp (issue 0029) para resolver")
        print(json.dumps({
            "error": "DDG bot challenge — captcha required",
            "query": query,
            "engine": "duckduckgo",
            "source": source,
            "results": 0,
            "entities_added": 0,
            "relations_added": 0,
        }, ensure_ascii=False))
        return 4

    progress(0.55, "parsing")
    # The parser is chosen by content — if endpoint and markup do not match
    # (tests with a stub serving any URL, or a future DDG change) results
    # are still extracted. Both are tried and the larger result set wins.
    results_lite = parse_ddg_lite(htmltxt) if "result-link" in htmltxt else []
    results_html = parse_ddg_html(htmltxt) if "result__a" in htmltxt else []
    results = results_lite if len(results_lite) >= len(results_html) else results_html
    if limit > 0:
        results = results[:limit]
    log(f"DDG ({source}) returned {len(results)} results "
        f"(lite_parsed={len(results_lite)} html_parsed={len(results_html)})")

    progress(0.80, "applying")
    entities_added = 0
    relations_added = 0
    group_id: str | None = None
    batch_id = uuid.uuid4().hex
    conn = sqlite3.connect(ops_db_path)
    try:
        # Inside the try so the connection is closed even if this fails.
        conn.execute("PRAGMA foreign_keys=OFF")
        has_group_col = has_group_id_column(conn)
        n_total = len(results)
        # Threshold: the manifest may declare `auto_group_threshold` and it
        # arrives via stdin (issue 0035e). When absent, the enricher's
        # internal default is used.
        threshold = _coerce_threshold(ctx.get("auto_group_threshold"),
                                      DEFAULT_GROUP_THRESHOLD)
        # Twitter/Reddit mode: K loose results + a Group holding the other
        # N-K. Requiring N > K avoids creating a Group with no children
        # when the threshold is overridden to a value <= GROUP_PREVIEW_K.
        if (n_total >= threshold and n_total > GROUP_PREVIEW_K
                and has_group_col):
            group_id = insert_group_entity(
                conn, query=query, count=n_total, batch_id=batch_id,
            )
            entities_added += 1
            if insert_relation(conn, group_id, node_id, "SEARCH_RESULT_OF"):
                relations_added += 1

        for position, r in enumerate(results):
            # Without a Group everything stays loose; with one, only the
            # results past the first GROUP_PREVIEW_K go inside it.
            in_group = group_id is not None and position >= GROUP_PREVIEW_K
            existed = find_url_entity(conn, r["url"]) is not None
            url_id = insert_url_entity(
                conn,
                url=r["url"],
                title=r["title"],
                snippet=r["snippet"],
                rank=r["rank"],
                query=query,
                batch_id=batch_id,
                group_id=group_id if in_group else None,
                has_group_col=has_group_col,
            )
            if not existed:
                entities_added += 1
            # Provenance is the relation to the original source node, not
            # to the group — the group is only a visual container.
            if insert_relation(conn, url_id, node_id, "SEARCH_RESULT_OF"):
                relations_added += 1
        conn.commit()
    finally:
        conn.close()

    progress(1.0, "done")
    print(json.dumps({
        "query": query,
        "engine": "duckduckgo",
        "results": len(results),
        "entities_added": entities_added,
        "relations_added": relations_added,
        "batch_id": batch_id,
        "group_id": group_id or "",
        "grouped": bool(group_id),
    }, ensure_ascii=False))
    return 0
# Script entry point: run the enricher and use main()'s return value as the
# process exit code.
if __name__ == "__main__":
    sys.exit(main())