#!/usr/bin/env python3
"""web_search enricher — searches DuckDuckGo HTML and creates Url nodes.

Standard wire protocol (issue 0026):
  - stdin:  JSON with node_id, node_name, metadata, ops_db_path, app_dir,
            cache_dir, registry_root, params.
  - stderr: `PROGRESS:<fraction> <stage>` lines for UI feedback.
  - stdout: a single JSON line at the end with a summary.
  - exit code 0 = ok, !=0 = error.

DDG endpoints used:
  1. https://lite.duckduckgo.com/lite/ (POST) — primary endpoint.
     Minimal HTML (2009-style), a table with `<a class="result-link">`
     and `<td class="result-snippet">`. It is the least aggressive with
     bot detection; it usually answers 200 when the `html.` endpoint
     already returns an "anomaly" challenge from residential/Windows IPs.
  2. https://html.duckduckgo.com/html/ (POST) — fallback.
     Its parser uses `result__a` / `result__snippet`. DDG wraps the links
     in `//duckduckgo.com/l/?uddg=<encoded url>`, which must be decoded.

If both endpoints return the anti-bot page ("anomaly", captcha challenge),
the enricher emits a clear error stating that `web_search_cdp` (issue 0029)
is required — the simple zero-infra fallback cannot solve the challenge.
"""
from __future__ import annotations

import html
import json
import os
import re
import sqlite3
import sys
import time
from datetime import datetime, timezone
from html.parser import HTMLParser
from urllib.parse import parse_qs, unquote, urlparse


def progress(p: float, stage: str = "") -> None:
    """Write a `PROGRESS:<p> <stage>` line to stderr and flush it."""
    sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
    sys.stderr.flush()


def log(msg: str) -> None:
    """Write a free-form diagnostic line to stderr and flush it."""
    sys.stderr.write(f"{msg}\n")
    sys.stderr.flush()


def now_iso() -> str:
    """Return the current UTC time as `YYYY-MM-DDTHH:MM:SSZ`."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def now_ms() -> int:
    """Return the current Unix time in whole milliseconds."""
    return int(time.time() * 1000)


def _ddg_post(url: str, params: dict, headers: dict, timeout: int) -> str:
    """POST form-encoded `params` to `url` and return the response body as text.

    Uses `requests` when it is installed and falls back to `urllib` otherwise.
    Network errors propagate to the caller.
    """
    # Only the import is guarded: an ImportError raised while performing the
    # request must not silently trigger a second request through urllib.
    try:
        import requests  # type: ignore
    except ImportError:
        requests = None

    if requests is not None:
        return requests.post(
            url, data=params, headers=headers, timeout=timeout
        ).text

    from urllib.parse import urlencode
    from urllib.request import Request, urlopen

    body = urlencode(params).encode()
    req = Request(url, data=body, headers=headers)
    with urlopen(req, timeout=timeout) as resp:  # type: ignore
        return resp.read().decode("utf-8", errors="replace")


def is_anomaly_page(htmltxt: str) -> bool:
    """Detect DDG's anti-bot page (captcha challenge).

    The page is recognised when both "anomaly" and "challenge" appear in the
    text, case-insensitively.
    """
    s = htmltxt.lower()
    return "anomaly" in s and "challenge" in s


def fetch_ddg(query: str, timeout: int, region: str, safe: str) -> tuple[str, str]:
    """Download the DuckDuckGo results page for `query`.

    Tries `lite.duckduckgo.com/lite/` first (minimal, 2009-style HTML, much
    less aggressive with bot detection than `html.`). If that endpoint
    returns the anti-bot page, falls back to the `html.` endpoint.

    `region` is sent as the `kl` parameter when non-empty; `safe` is mapped
    to `kp` when it is one of "strict", "moderate" or "off".

    Returns `(html, source)` where source is "lite" or "html". The returned
    HTML may still be the anti-bot page when the fallback is challenged too.
    """
    params = {"q": query}
    if region:
        params["kl"] = region
    safe_map = {"strict": "1", "moderate": "-1", "off": "-2"}
    if safe in safe_map:
        params["kp"] = safe_map[safe]
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120 Safari/537.36"
        ),
        "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.7",
    }
    htmltxt = _ddg_post("https://lite.duckduckgo.com/lite/", params, headers, timeout)
    if not is_anomaly_page(htmltxt):
        return htmltxt, "lite"
    log("lite endpoint devolvio challenge — fallback a html endpoint")
    htmltxt = _ddg_post("https://html.duckduckgo.com/html/", params, headers, timeout)
    return htmltxt, "html"


def decode_ddg_href(href: str) -> str:
    """Decode a DDG href, which wraps the real URL in `uddg=`.

    Possible formats:
        //duckduckgo.com/l/?uddg=https%3A...&rut=...
        /l/?uddg=https%3A...
        https://example.com/...  (rare, but happens with ads or when DDG
                                  does not wrap the link)

    Returns the unwrapped target URL, the (scheme-completed) href when it is
    not a DDG redirect, or "" for an empty href.
    """
    if not href:
        return ""
    if href.startswith("//"):
        href = "https:" + href
    elif href.startswith("/l/"):
        href = "https://duckduckgo.com" + href
    try:
        u = urlparse(href)
        host = u.hostname or ""
        # Exact host or a real subdomain only; a plain suffix test would also
        # accept unrelated hosts such as "notduckduckgo.com".
        is_ddg = host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
        if is_ddg and u.path == "/l/":
            target = parse_qs(u.query).get("uddg", [""])[0]
            if target:
                # parse_qs has already percent-decoded the value; decoding it
                # again would corrupt literal escapes in the target URL
                # (e.g. "%25" would become "%").
                return target
    except ValueError:
        # Malformed URL (e.g. a broken IPv6 literal): return the href as-is.
        pass
    return href


class _DDGParser(HTMLParser):
    """Extract results (anchor + snippet) from the `html.` endpoint markup.

    Not meant to be complete — it only looks at `<a>` elements:
    `<a class="result__a">` provides the title/url and
    `<a class="result__snippet">` provides the text of the current result.
    It tolerates minor changes: if DDG renames the classes the enricher
    returns 0 results but does not crash.
    """

    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self.results: list[dict] = []
        # Result currently being assembled, or None between results.
        self._cur: dict | None = None
        self._in_title = False
        self._in_snippet = False
        self._title_buf: list[str] = []
        self._snippet_buf: list[str] = []

    def _classes(self, attrs: list[tuple[str, str | None]]) -> set[str]:
        """Return the set of CSS classes in `attrs` (empty if none)."""
        for k, v in attrs:
            if k == "class" and v:
                return set(v.split())
        return set()

    def _href(self, attrs: list[tuple[str, str | None]]) -> str:
        """Return the first non-empty `href` in `attrs`, or ""."""
        for k, v in attrs:
            if k == "href" and v:
                return v
        return ""

    def handle_starttag(self, tag: str, attrs):
        if tag != "a":
            return
        cls = self._classes(attrs)
        if "result__a" in cls:
            # A new title anchor closes the previous result, if any.
            self._flush()
            self._cur = {"href": self._href(attrs), "title": "", "snippet": ""}
            self._in_title = True
            self._title_buf = []
        elif "result__snippet" in cls and self._cur is not None:
            self._in_snippet = True
            self._snippet_buf = []

    def handle_endtag(self, tag: str):
        if tag != "a":
            return
        if self._in_title:
            if self._cur is not None:
                # Collapse runs of whitespace into single spaces.
                self._cur["title"] = " ".join("".join(self._title_buf).split())
            self._in_title = False
        elif self._in_snippet:
            if self._cur is not None:
                self._cur["snippet"] = " ".join(
                    "".join(self._snippet_buf).split()
                )
            self._in_snippet = False

    def handle_data(self, data: str):
        if self._in_title:
            self._title_buf.append(data)
        elif self._in_snippet:
            self._snippet_buf.append(data)

    def _flush(self):
        """Store the current result if it has an href, then reset it."""
        if self._cur and self._cur.get("href"):
            self.results.append(self._cur)
        self._cur = None

    def close(self) -> None:
        # The last result has no following title anchor to flush it.
        self._flush()
        super().close()


def parse_ddg_html(htmltxt: str) -> list[dict]:
    """Parse the HTML of the `html.duckduckgo.com` endpoint.

    Returns a list of dicts with keys `url`, `title`, `snippet` and `rank`
    (1-based, assigned after filtering). Non-http(s) and duplicate URLs are
    dropped. Parser errors are logged and whatever was parsed so far is kept.
    """
    p = _DDGParser()
    try:
        p.feed(htmltxt)
        p.close()
    except Exception as e:
        log(f"DDG parser failed: {e}")
    out: list[dict] = []
    seen: set[str] = set()
    for r in p.results:
        url = decode_ddg_href(r.get("href") or "")
        if not url or not url.startswith(("http://", "https://")):
            continue
        if url in seen:
            continue
        seen.add(url)
        out.append({
            "url": url,
            "title": r.get("title") or "",
            "snippet": r.get("snippet") or "",
            "rank": len(out) + 1,
        })
    return out


class _DDGLiteParser(HTMLParser):
    """Parser for `lite.duckduckgo.com/lite/`.

    Typical structure:
        <a class="result-link" href="...">title</a>
        ...
        <td class="result-snippet">snippet text</td>

    Snippets come AFTER the link (not as a child of the same element), so
    they are paired by order: each `result-link` consumes the next
    `result-snippet`.
    """

    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self.results: list[dict] = []
        self._in_link = False
        self._in_snippet = False
        self._cur_href = ""
        self._title_buf: list[str] = []
        self._snippet_buf: list[str] = []
        # Index in `results` of the link still waiting for its snippet.
        self._pending_snippet_for: int | None = None

    def _attrs_dict(self, attrs):
        """Return `attrs` as a dict, mapping missing values to ""."""
        return {k: (v or "") for k, v in attrs}

    def handle_starttag(self, tag: str, attrs):
        a = self._attrs_dict(attrs)
        cls = a.get("class", "")
        if tag == "a" and "result-link" in cls:
            self._in_link = True
            self._cur_href = a.get("href", "")
            self._title_buf = []
        elif tag == "td" and "result-snippet" in cls:
            self._in_snippet = True
            self._snippet_buf = []

    def handle_endtag(self, tag: str):
        if self._in_link and tag == "a":
            title = " ".join("".join(self._title_buf).split())
            self.results.append({
                "href": self._cur_href,
                "title": title,
                "snippet": "",
            })
            self._pending_snippet_for = len(self.results) - 1
            self._in_link = False
        elif self._in_snippet and tag == "td":
            snippet = " ".join("".join(self._snippet_buf).split())
            if self._pending_snippet_for is not None:
                self.results[self._pending_snippet_for]["snippet"] = snippet
                self._pending_snippet_for = None
            self._in_snippet = False

    def handle_data(self, data: str):
        if self._in_link:
            self._title_buf.append(data)
        elif self._in_snippet:
            self._snippet_buf.append(data)


def parse_ddg_lite(htmltxt: str) -> list[dict]:
    """Parse the HTML of the `lite.duckduckgo.com/lite/` endpoint.

    Returns a list of dicts with keys `url`, `title`, `snippet` and `rank`
    (1-based, assigned after filtering). Non-http(s) URLs, DDG help pages
    and duplicates are dropped. Parser errors are logged and whatever was
    parsed so far is kept.
    """
    p = _DDGLiteParser()
    try:
        p.feed(htmltxt)
        p.close()
    except Exception as e:
        log(f"DDG lite parser failed: {e}")
    out: list[dict] = []
    seen: set[str] = set()
    for r in p.results:
        href = r.get("href") or ""
        # lite sends direct absolute URLs; we still go through
        # decode_ddg_href in case DDG wraps the link.
        url = decode_ddg_href(href)
        if not url or not url.startswith(("http://", "https://")):
            continue
        # Exclude DDG self-promotion (help pages).
        if "duckduckgo.com/duckduckgo-help-pages/" in url:
            continue
        if url in seen:
            continue
        seen.add(url)
        out.append({
            "url": url,
            "title": r.get("title") or "",
            "snippet": r.get("snippet") or "",
            "rank": len(out) + 1,
        })
    return out


def find_url_entity(conn: sqlite3.Connection, url: str) -> str | None:
    """Find an existing Url node whose metadata holds the same `url`.

    Scans every `entities` row with `type_ref='Url'` and compares the `url`
    key of its JSON metadata. Rows whose metadata is not valid JSON are
    skipped. Returns the entity id, or None when there is no match.
    """
    cur = conn.execute(
        "SELECT id, metadata FROM entities WHERE type_ref='Url'"
    )
    for row in cur:
        meta_raw = row[1] or "{}"
        try:
            meta = json.loads(meta_raw)
        except (ValueError, TypeError):
            continue
        if isinstance(meta, dict) and meta.get("url") == url:
            return row[0]
    return None


def _upsert_url_entity(conn: sqlite3.Connection, url: str, title: str,
                       snippet: str, rank: int,
                       query: str) -> tuple[str, bool]:
    """Create or refresh the Url node for `url`.

    Returns `(entity_id, created)`; `created` is False when an existing node
    was reused and only had its metadata and `updated_at` refreshed.
    """
    existing = find_url_entity(conn, url)
    ts = now_iso()
    meta = {
        "url": url,
        "title": title,
        "snippet": snippet,
        "rank": rank,
        "query": query,
        "engine": "duckduckgo",
        "found_at": ts,
    }
    meta_json = json.dumps(meta, ensure_ascii=False)
    if existing:
        conn.execute(
            "UPDATE entities SET metadata=?, updated_at=? WHERE id=?",
            (meta_json, ts, existing),
        )
        return existing, False
    new_id = f"Url_{now_ms()}_{rank}_{abs(hash(url)) % 100000}"
    name = title[:200] if title else url[:200]
    conn.execute(
        "INSERT INTO entities (id, name, type_ref, source, metadata, "
        " created_at, updated_at) "
        "VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?)",
        (new_id, name, meta_json, ts, ts),
    )
    return new_id, True


def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
                      snippet: str, rank: int, query: str) -> str:
    """Create a Url node and return its id. If it already exists, reuse and refresh it."""
    return _upsert_url_entity(conn, url, title, snippet, rank, query)[0]


def relation_exists(conn: sqlite3.Connection, from_id: str, to_id: str,
                    name: str) -> bool:
    """Return True when a relation `name` from `from_id` to `to_id` exists."""
    cur = conn.execute(
        "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
        "AND name=? LIMIT 1",
        (from_id, to_id, name),
    )
    return cur.fetchone() is not None


# Per-process counter that keeps relation ids distinct within one millisecond.
_REL_COUNTER = 0


def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str,
                    name: str) -> bool:
    """Insert the relation unless it already exists.

    Returns True when a row was inserted, False when it was already present.
    """
    global _REL_COUNTER
    if relation_exists(conn, from_id, to_id, name):
        return False
    ts = now_iso()
    _REL_COUNTER += 1
    rel_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}"
    conn.execute(
        "INSERT INTO relations (id, name, from_entity, to_entity, "
        " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
        (rel_id, name, from_id, to_id, ts, ts),
    )
    return True


def _int_param(params: dict, key: str, default: int) -> int:
    """Read an integer parameter, falling back to `default` when invalid."""
    value = params.get(key, default)
    try:
        return int(value)
    except (TypeError, ValueError):
        log(f"invalid {key!r} param {value!r} — using default {default}")
        return default


def _resolve_ops_db_path(ops_db_path: str, app_dir: str) -> str:
    """Turn `ops_db_path` into an absolute path.

    A relative path is anchored at `app_dir` when that candidate exists, and
    at the current working directory otherwise. Without this, sqlite3 creates
    an empty file if the subprocess cwd differs from the parent's.
    """
    if os.path.isabs(ops_db_path):
        return ops_db_path
    if app_dir and os.path.isdir(app_dir):
        cand = os.path.normpath(os.path.join(app_dir, ops_db_path))
        if os.path.exists(cand):
            ops_db_path = cand
    if not os.path.isabs(ops_db_path):
        ops_db_path = os.path.abspath(ops_db_path)
    return ops_db_path


def _has_entities_table(ops_db_path: str) -> bool:
    """Return True when the database has an `entities` table.

    Raises sqlite3.Error when the database cannot be opened or queried.
    """
    conn = sqlite3.connect(ops_db_path)
    try:
        row = conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name='entities'"
        ).fetchone()
    finally:
        conn.close()
    return bool(row)


def main() -> int:
    """Run the enricher: read the context from stdin, search, update the DB.

    Exit codes: 0 ok; 2 invalid input (bad JSON, missing node_id /
    ops_db_path, empty query); 4 fetch failure or bot challenge; 7 database
    file not found; 8 database without an `entities` table; 9 SQLite error.
    """
    raw = sys.stdin.read()
    try:
        ctx = json.loads(raw)
    except ValueError as e:
        log(f"stdin not valid JSON: {e}")
        return 2
    if not isinstance(ctx, dict):
        log("stdin not valid JSON: expected an object")
        return 2

    node_id = ctx.get("node_id") or ""
    node_name = str(ctx.get("node_name") or "").strip()
    metadata = ctx.get("metadata") or {}
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except ValueError:
            metadata = {}
    if not isinstance(metadata, dict):
        metadata = {}
    ops_db_path = str(ctx.get("ops_db_path") or "")
    params = ctx.get("params") or {}
    if not isinstance(params, dict):
        params = {}
    limit = _int_param(params, "limit", 10)
    region = str(params.get("region") or "").strip()
    safe = str(params.get("safe") or "moderate").strip()
    timeout_s = _int_param(params, "timeout_s", 15)

    if not node_id or not ops_db_path:
        log("missing node_id / ops_db_path")
        return 2

    # Normalise backslashes to forward slashes — the path may arrive with
    # mixed separators from the C++ side if fs::path was built in another
    # context (cross-platform build, copy between Windows and WSL, etc.).
    ops_db_path = ops_db_path.replace("\\", "/")
    app_dir_raw = str(ctx.get("app_dir") or "").replace("\\", "/")
    ops_db_path = _resolve_ops_db_path(ops_db_path, app_dir_raw)

    if not os.path.exists(ops_db_path):
        log(f"ops_db_path no existe: {ops_db_path} (cwd={os.getcwd()})")
        print(json.dumps({"error": "ops_db not found",
                          "ops_db_path": ops_db_path,
                          "cwd": os.getcwd(),
                          "entities_added": 0,
                          "relations_added": 0}))
        return 7

    # Schema check — without an `entities` table the path is wrong or
    # operations.db has not been bootstrapped.
    try:
        has_entities = _has_entities_table(ops_db_path)
    except sqlite3.Error as e:
        log(f"sqlite open failed: {e}")
        return 9
    if not has_entities:
        log(f"sin tabla 'entities' en {ops_db_path}")
        print(json.dumps({
            "error": "operations.db sin tabla 'entities' — "
                     "verifica que graph_explorer haya cargado un "
                     "proyecto valido antes de lanzar el enricher",
            "ops_db_path": ops_db_path,
            "entities_added": 0,
            "relations_added": 0}))
        return 8

    # Query: priority metadata.query > metadata.text > node_name.
    query = str(
        metadata.get("query") or metadata.get("text") or node_name
    ).strip()
    if not query:
        log("nodo sin query (metadata.query / metadata.text / name)")
        return 2

    progress(0.10, "fetching")
    try:
        htmltxt, source = fetch_ddg(query, timeout=timeout_s,
                                    region=region, safe=safe)
    except Exception as e:
        log(f"DDG fetch failed: {e}")
        print(json.dumps({"error": str(e), "query": query,
                          "entities_added": 0, "relations_added": 0}))
        return 4

    if is_anomaly_page(htmltxt):
        log("DDG devolvio challenge captcha en ambos endpoints — "
            "usar web_search_cdp (issue 0029) para resolver")
        print(json.dumps({
            "error": "DDG bot challenge — captcha required",
            "query": query,
            "engine": "duckduckgo",
            "source": source,
            "results": 0,
            "entities_added": 0,
            "relations_added": 0,
        }, ensure_ascii=False))
        return 4

    progress(0.55, "parsing")
    # The parser is chosen by content — if the endpoint and the markup do
    # not match (tests with a stub serving any URL, or a future DDG change)
    # we still extract results. Try both and keep whichever returns more.
    results_lite = parse_ddg_lite(htmltxt) if "result-link" in htmltxt else []
    results_html = parse_ddg_html(htmltxt) if "result__a" in htmltxt else []
    if len(results_lite) >= len(results_html):
        results = results_lite
    else:
        results = results_html
    if limit > 0:
        results = results[:limit]
    log(f"DDG ({source}) returned {len(results)} results "
        f"(lite_parsed={len(results_lite)} html_parsed={len(results_html)})")

    progress(0.80, "applying")
    entities_added = 0
    relations_added = 0
    conn = None
    try:
        conn = sqlite3.connect(ops_db_path)
        conn.execute("PRAGMA foreign_keys=OFF")
        for r in results:
            url_id, created = _upsert_url_entity(
                conn,
                url=r["url"],
                title=r["title"],
                snippet=r["snippet"],
                rank=r["rank"],
                query=query,
            )
            if created:
                entities_added += 1
            if insert_relation(conn, url_id, node_id, "SEARCH_RESULT_OF"):
                relations_added += 1
        conn.commit()
    except sqlite3.Error as e:
        # Nothing was committed, so the reported counts are zero.
        log(f"sqlite write failed: {e}")
        print(json.dumps({
            "error": f"sqlite write failed: {e}",
            "query": query,
            "engine": "duckduckgo",
            "results": len(results),
            "entities_added": 0,
            "relations_added": 0,
        }, ensure_ascii=False))
        return 9
    finally:
        if conn is not None:
            conn.close()

    progress(1.0, "done")
    print(json.dumps({
        "query": query,
        "engine": "duckduckgo",
        "results": len(results),
        "entities_added": entities_added,
        "relations_added": relations_added,
    }, ensure_ascii=False))
    return 0


if __name__ == "__main__":
    sys.exit(main())