feat(enrichers): web_search DuckDuckGo + tests pytest de los 5 enrichers

Anade enricher web_search aplicable a nodos text/Concept/Topic. Hace
POST a html.duckduckgo.com con la query del nodo, parsea resultados
con HTMLParser stdlib, decodifica el redirect uddg= y crea N nodos
Url con relacion SEARCH_RESULT_OF apuntando al nodo origen.

Encadenable: tras web_search, fetch_webpage sobre cada Url completa
el pipeline search -> fetch -> extract.

Defensa contra ops_db_path mal resuelto: normaliza backslashes,
resuelve relativo contra app_dir, valida que la tabla entities
exista antes de tocar nada (exit codes 7/8/9 con JSON resumen).

Tests pytest (16/16 verde): conftest con operations.db temp +
schema minimo, stub de requests via PYTHONPATH para mockear red.
Cubre los 5 enrichers (extract_domain, fetch_webpage, extract_links,
extract_text_entities, web_search) + sanity check de manifests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-02 16:10:13 +02:00
parent 0d2450bac5
commit 6919ebfe9c
20 changed files with 1223 additions and 0 deletions
Binary file not shown.
+11
View File
@@ -0,0 +1,11 @@
id: web_search
name: "Web search (DuckDuckGo)"
description: "Busca el nombre del nodo en DuckDuckGo (HTML) y crea N nodos Url con los resultados, conectados al origen con relacion SEARCH_RESULT_OF. Pensado para nodos text/Concept/Topic — el siguiente paso es correr fetch_webpage sobre cada Url resultante."
applies_to: [text, Text, Concept, Topic, Query]
emits: [Url]
relations: [SEARCH_RESULT_OF]
params:
- { name: limit, type: int, default: 10 }
- { name: region, type: string, default: "" }
- { name: safe, type: string, default: "moderate" }
- { name: timeout_s, type: int, default: 15 }
+436
View File
@@ -0,0 +1,436 @@
#!/usr/bin/env python3
"""Enricher web_search — busca en DuckDuckGo HTML y crea nodos Url.
Wire protocol estandar (issue 0026):
- stdin: JSON con node_id, node_name, metadata, ops_db_path, app_dir,
cache_dir, registry_root, params.
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen.
- exit code 0 = ok, !=0 = error.
DDG endpoint usado: https://html.duckduckgo.com/html/?q=<query>
Devuelve HTML estatico, sin JavaScript. Los enlaces vienen envueltos en
redireccion `//duckduckgo.com/l/?uddg=<encoded>` que hay que decodificar.
Para automatizar busquedas masivas en el futuro (sesion persistente,
cookies, JS, captchas) la fase 2 introducira un enricher `web_search_cdp`
que controle un Chromium remoto via DevTools Protocol. Este es el
fallback simple zero-infra.
"""
from __future__ import annotations
import html
import json
import os
import re
import sqlite3
import sys
import time
from datetime import datetime, timezone
from html.parser import HTMLParser
from urllib.parse import parse_qs, unquote, urlparse
def progress(p: float, stage: str = "") -> None:
    """Report progress to the UI as a `PROGRESS:<fraction> <stage>` stderr line.

    The fraction is formatted with two decimals and the stream is flushed
    immediately so the line is not held back by buffering.
    """
    print(f"PROGRESS:{p:.2f} {stage}", file=sys.stderr, flush=True)
def log(msg: str) -> None:
    """Write one diagnostic line to stderr and flush it right away."""
    stream = sys.stderr
    stream.write(msg + "\n")
    stream.flush()
def now_iso() -> str:
    """Return the current UTC time as `YYYY-MM-DDTHH:MM:SSZ` (second precision)."""
    stamp = datetime.now(tz=timezone.utc)
    return f"{stamp:%Y-%m-%dT%H:%M:%SZ}"
def now_ms() -> int:
    """Return the current Unix time in whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
def fetch_ddg(query: str, timeout: int, region: str, safe: str) -> str:
    """Download the DuckDuckGo HTML results page for *query*.

    The form is POSTed to `html.duckduckgo.com`, which serves static HTML
    without JavaScript. Optional parameters:

    - `kl`: region code, sent only when *region* is non-empty.
    - `kp`: safe search, sent only when *safe* is one of the known labels:
      "strict" -> 1, "moderate" -> -1, "off" -> -2.

    No cookies are sent. `requests` is used when it can be imported;
    otherwise the call falls back to `urllib` from the standard library.

    Args:
        query: search terms.
        timeout: network timeout in seconds.
        region: DuckDuckGo region code, or "" for the default.
        safe: "strict", "moderate" or "off"; any other value omits `kp`.

    Returns:
        The response body as text. The HTTP status is not checked.

    Raises:
        Whatever the underlying HTTP client raises on network failure.
    """
    params = {"q": query}
    if region:
        params["kl"] = region
    safe_map = {"strict": "1", "moderate": "-1", "off": "-2"}
    if safe in safe_map:
        params["kp"] = safe_map[safe]
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120 Safari/537.36"
        ),
        "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.7",
    }
    endpoint = "https://html.duckduckgo.com/html/"

    # Only the import is guarded: an ImportError raised while performing the
    # request itself must not silently trigger a second request via urllib.
    try:
        import requests  # type: ignore
    except ImportError:
        requests = None

    if requests is not None:
        response = requests.post(
            endpoint,
            data=params,
            headers=headers,
            timeout=timeout,
        )
        return response.text

    from urllib.parse import urlencode
    from urllib.request import Request, urlopen

    body = urlencode(params).encode()
    req = Request(endpoint, data=body, headers=headers)
    with urlopen(req, timeout=timeout) as resp:  # type: ignore
        return resp.read().decode("utf-8", errors="replace")
def decode_ddg_href(href: str) -> str:
    """Return the real target URL hidden inside a DuckDuckGo result href.

    Accepted shapes:
        //duckduckgo.com/l/?uddg=https%3A...&rut=...
        /l/?uddg=https%3A...
        https://example.com/...   (not wrapped; returned unchanged)

    Protocol-relative and `/l/` hrefs are first made absolute. If the result
    is a DuckDuckGo `/l/` redirect with a non-empty `uddg` parameter, that
    parameter's decoded value is returned; otherwise the (absolute) href is
    returned as is. An empty href yields "".
    """
    if not href:
        return ""
    if href.startswith("//"):
        href = "https:" + href
    elif href.startswith("/l/"):
        href = "https://duckduckgo.com" + href
    try:
        parts = urlparse(href)
        host = (parts.hostname or "").lower()
        # Exact host or a real subdomain only, so that e.g.
        # "evilduckduckgo.com" is not treated as a DuckDuckGo redirect.
        is_ddg = host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
        if is_ddg and parts.path == "/l/":
            # parse_qs already percent-decodes the value exactly once.
            # Decoding again would corrupt targets that legitimately contain
            # escapes (e.g. "%2520" must become "%20", not a space).
            target = parse_qs(parts.query).get("uddg", [""])[0]
            if target:
                return target
    except ValueError:
        # urlparse rejects some malformed hrefs; keep the href untouched.
        pass
    return href
class _DDGParser(HTMLParser):
"""Extrae resultados (anchor + snippet + rank) del HTML de DDG.
No intenta ser completo — solo busca `<a class="result__a">` para el
titulo/url y `<a class="result__snippet">` (o el div equivalente)
para el texto. Es robusto a cambios menores: si DDG renombra clases,
el enricher devolvera 0 resultados pero no peta.
"""
def __init__(self) -> None:
super().__init__(convert_charrefs=True)
self.results: list[dict] = []
self._cur: dict | None = None
self._in_title = False
self._in_snippet = False
self._title_buf: list[str] = []
self._snippet_buf: list[str] = []
def _classes(self, attrs: list[tuple[str, str | None]]) -> set[str]:
for k, v in attrs:
if k == "class" and v:
return set(v.split())
return set()
def _href(self, attrs: list[tuple[str, str | None]]) -> str:
for k, v in attrs:
if k == "href" and v:
return v
return ""
def handle_starttag(self, tag: str, attrs):
if tag != "a":
return
cls = self._classes(attrs)
if "result__a" in cls:
if self._cur:
self._flush()
self._cur = {"href": self._href(attrs), "title": "", "snippet": ""}
self._in_title = True
self._title_buf = []
elif "result__snippet" in cls and self._cur is not None:
self._in_snippet = True
self._snippet_buf = []
def handle_endtag(self, tag: str):
if tag != "a":
return
if self._in_title:
self._cur and self._cur.update(
title=" ".join("".join(self._title_buf).split())
)
self._in_title = False
elif self._in_snippet:
self._cur and self._cur.update(
snippet=" ".join("".join(self._snippet_buf).split())
)
self._in_snippet = False
def handle_data(self, data: str):
if self._in_title:
self._title_buf.append(data)
elif self._in_snippet:
self._snippet_buf.append(data)
def _flush(self):
if self._cur and self._cur.get("href"):
self.results.append(self._cur)
self._cur = None
def close(self) -> None:
if self._cur:
self._flush()
super().close()
def parse_ddg_html(htmltxt: str) -> list[dict]:
    """Turn DuckDuckGo result HTML into `[{url, title, snippet, rank}]`.

    Hrefs are unwrapped with `decode_ddg_href`; entries that are not
    http(s) URLs and repeated URLs are dropped. `rank` is the 1-based
    position among the entries that were kept. A parser failure is logged
    and whatever was collected up to that point is still returned.
    """
    parser = _DDGParser()
    try:
        parser.feed(htmltxt)
        parser.close()
    except Exception as e:
        log(f"DDG parser failed: {e}")

    # Insertion-ordered dict: gives de-duplication and stable ranking at once.
    kept: dict[str, dict] = {}
    for raw in parser.results:
        url = decode_ddg_href(raw.get("href") or "")
        if not url.startswith(("http://", "https://")) or url in kept:
            continue
        kept[url] = {
            "url": url,
            "title": raw.get("title") or "",
            "snippet": raw.get("snippet") or "",
            "rank": len(kept) + 1,
        }
    return list(kept.values())
def find_url_entity(conn: sqlite3.Connection, url: str) -> str | None:
    """Return the id of the first `Url` entity whose metadata `url` equals *url*.

    Every row with `type_ref='Url'` is scanned and its metadata decoded as
    JSON; rows whose metadata is not valid JSON or not a JSON object are
    skipped. Returns None when nothing matches.
    """
    rows = conn.execute(
        "SELECT id, metadata FROM entities WHERE type_ref='Url'"
    )
    for entity_id, raw_meta in rows:
        try:
            decoded = json.loads(raw_meta or "{}")
        except Exception:
            continue
        if isinstance(decoded, dict) and decoded.get("url") == url:
            return entity_id
    return None
def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
                      snippet: str, rank: int, query: str) -> str:
    """Create a `Url` entity for a search hit, or refresh the existing one.

    If an entity with the same metadata `url` already exists, the search
    fields (title, snippet, rank, query, engine, found_at) are written on
    top of its current metadata. Keys stored by other producers are kept
    instead of being overwritten. Otherwise a new row is inserted with
    source `enricher:web_search`.

    Args:
        conn: open connection to operations.db; the caller commits.
        url: decoded result URL.
        title: result title; used as the entity name (max 200 chars),
            falling back to the URL when empty.
        snippet: result snippet text.
        rank: 1-based position of the result.
        query: the search query that produced the hit.

    Returns:
        The id of the existing or newly created entity.
    """
    import zlib  # local import: only needed to build the id suffix

    existing = find_url_entity(conn, url)
    ts = now_iso()
    meta = {
        "url": url,
        "title": title,
        "snippet": snippet,
        "rank": rank,
        "query": query,
        "engine": "duckduckgo",
        "found_at": ts,
    }
    if existing:
        row = conn.execute(
            "SELECT metadata FROM entities WHERE id=?", (existing,)
        ).fetchone()
        try:
            previous = json.loads(row[0]) if row and row[0] else {}
        except (TypeError, ValueError):
            previous = {}
        merged = dict(previous) if isinstance(previous, dict) else {}
        merged.update(meta)
        conn.execute(
            "UPDATE entities SET metadata=?, updated_at=? WHERE id=?",
            (json.dumps(merged, ensure_ascii=False), ts, existing),
        )
        return existing

    # crc32 instead of hash(): str hashes are salted per process, so the
    # suffix would differ between runs for the same URL.
    suffix = zlib.crc32(url.encode("utf-8")) % 100000
    new_id = f"Url_{now_ms()}_{rank}_{suffix}"
    name = title[:200] if title else url[:200]
    conn.execute(
        "INSERT INTO entities (id, name, type_ref, source, metadata, "
        " created_at, updated_at) "
        "VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?)",
        (new_id, name, json.dumps(meta, ensure_ascii=False), ts, ts),
    )
    return new_id
def relation_exists(conn: sqlite3.Connection, from_id: str, to_id: str,
                    name: str) -> bool:
    """Tell whether a relation *name* from *from_id* to *to_id* is already stored."""
    sql = (
        "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
        "AND name=? LIMIT 1"
    )
    match = conn.execute(sql, (from_id, to_id, name)).fetchone()
    return match is not None
# Per-process sequence number mixed into relation ids so that two relations
# created within the same millisecond still get distinct ids.
_REL_COUNTER = 0


def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str,
                    name: str) -> bool:
    """Insert the relation unless an identical one is already stored.

    Returns:
        True when a row was inserted, False when the relation already
        existed. The caller is responsible for committing.
    """
    global _REL_COUNTER
    if relation_exists(conn, from_id, to_id, name):
        return False
    stamp = now_iso()
    _REL_COUNTER += 1
    rel_id = "rel_{}_{}_{}".format(now_ms(), _REL_COUNTER, name.lower())
    row = (rel_id, name, from_id, to_id, stamp, stamp)
    conn.execute(
        "INSERT INTO relations (id, name, from_entity, to_entity, "
        " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
        row,
    )
    return True
def _int_param(params: dict, key: str, default: int) -> int:
    """Return `params[key]` as an int, or *default* when missing or None.

    Raises:
        ValueError: when the supplied value cannot be converted to int.
    """
    value = params.get(key)
    if value is None:
        return default
    try:
        return int(value)
    except (TypeError, ValueError):
        raise ValueError(
            f"param {key!r} must be an integer, got {value!r}"
        ) from None


def _resolve_ops_db_path(ops_db_path: str, app_dir: str) -> str:
    """Normalise separators in *ops_db_path* and make it absolute.

    Backslashes are turned into forward slashes because the path may arrive
    with mixed separators from the C++ side. A relative path is resolved
    against *app_dir* when that directory exists and contains the file, and
    against the current working directory otherwise.
    """
    ops_db_path = ops_db_path.replace("\\", "/")
    app_dir = app_dir.replace("\\", "/")
    if not os.path.isabs(ops_db_path):
        if app_dir and os.path.isdir(app_dir):
            cand = os.path.normpath(os.path.join(app_dir, ops_db_path))
            if os.path.exists(cand):
                ops_db_path = cand
        if not os.path.isabs(ops_db_path):
            ops_db_path = os.path.abspath(ops_db_path)
    return ops_db_path


def _has_entities_table(ops_db_path: str) -> bool:
    """Tell whether the database at *ops_db_path* has an `entities` table."""
    conn = sqlite3.connect(ops_db_path)
    try:
        row = conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name='entities'"
        ).fetchone()
    finally:
        conn.close()
    return bool(row)


def main() -> int:
    """Run the enricher: read the JSON context from stdin, search, store.

    Exit codes:
        0  success (JSON summary on stdout)
        2  invalid input: bad JSON, missing node_id / ops_db_path,
           non-integer limit / timeout_s, or no query text
        4  the DuckDuckGo request failed (JSON summary on stdout)
        7  ops_db_path does not exist (JSON summary on stdout)
        8  the database has no `entities` table (JSON summary on stdout)
        9  sqlite error while opening or writing (JSON summary on stdout)
    """
    raw = sys.stdin.read()
    try:
        ctx = json.loads(raw)
    except Exception as e:
        log(f"stdin not valid JSON: {e}")
        return 2
    if not isinstance(ctx, dict):
        log("stdin not valid JSON: expected an object")
        return 2

    node_id = ctx.get("node_id") or ""
    node_name = str(ctx.get("node_name") or "").strip()
    metadata = ctx.get("metadata") or {}
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except Exception:
            metadata = {}
    if not isinstance(metadata, dict):
        metadata = {}
    ops_db_path = ctx.get("ops_db_path") or ""
    params = ctx.get("params") or {}
    if not isinstance(params, dict):
        params = {}
    try:
        limit = _int_param(params, "limit", 10)
        timeout_s = _int_param(params, "timeout_s", 15)
    except ValueError as e:
        log(str(e))
        return 2
    region = str(params.get("region") or "").strip()
    safe = str(params.get("safe") or "moderate").strip()

    if not node_id or not ops_db_path:
        log("missing node_id / ops_db_path")
        return 2

    # Resolve before touching sqlite: connecting to a wrong relative path
    # would create an empty database file instead of failing.
    ops_db_path = _resolve_ops_db_path(str(ops_db_path),
                                       str(ctx.get("app_dir") or ""))

    if not os.path.exists(ops_db_path):
        log(f"ops_db_path no existe: {ops_db_path} (cwd={os.getcwd()})")
        print(json.dumps({"error": "ops_db not found",
                          "ops_db_path": ops_db_path,
                          "cwd": os.getcwd(),
                          "entities_added": 0, "relations_added": 0}))
        return 7

    # Schema check: without an `entities` table the path is wrong or the
    # operations.db has not been bootstrapped.
    try:
        has_entities = _has_entities_table(ops_db_path)
    except sqlite3.Error as e:
        log(f"sqlite open failed: {e}")
        print(json.dumps({"error": f"sqlite open failed: {e}",
                          "ops_db_path": ops_db_path,
                          "entities_added": 0, "relations_added": 0}))
        return 9
    if not has_entities:
        log(f"sin tabla 'entities' en {ops_db_path}")
        print(json.dumps({
            "error": "operations.db sin tabla 'entities'; "
                     "verifica que graph_explorer haya cargado un "
                     "proyecto valido antes de lanzar el enricher",
            "ops_db_path": ops_db_path,
            "entities_added": 0, "relations_added": 0}))
        return 8

    # Query priority: metadata.query > metadata.text > node_name.
    query = str(
        metadata.get("query") or metadata.get("text") or node_name
    ).strip()
    if not query:
        log("nodo sin query (metadata.query / metadata.text / name)")
        return 2

    progress(0.10, "fetching")
    try:
        htmltxt = fetch_ddg(query, timeout=timeout_s, region=region, safe=safe)
    except Exception as e:
        log(f"DDG fetch failed: {e}")
        print(json.dumps({"error": str(e), "query": query,
                          "entities_added": 0, "relations_added": 0}))
        return 4

    progress(0.55, "parsing")
    results = parse_ddg_html(htmltxt)
    if limit > 0:
        results = results[:limit]
    log(f"DDG returned {len(results)} results")

    progress(0.80, "applying")
    entities_added = 0
    relations_added = 0
    conn = None
    try:
        conn = sqlite3.connect(ops_db_path)
        conn.execute("PRAGMA foreign_keys=OFF")
        for r in results:
            existed = find_url_entity(conn, r["url"]) is not None
            url_id = insert_url_entity(
                conn,
                url=r["url"],
                title=r["title"],
                snippet=r["snippet"],
                rank=r["rank"],
                query=query,
            )
            if not existed:
                entities_added += 1
            if insert_relation(conn, url_id, node_id, "SEARCH_RESULT_OF"):
                relations_added += 1
        conn.commit()
    except sqlite3.Error as e:
        # Nothing was committed, so the summary reports zero additions.
        log(f"sqlite write failed: {e}")
        print(json.dumps({"error": f"sqlite write failed: {e}",
                          "query": query,
                          "entities_added": 0, "relations_added": 0},
                         ensure_ascii=False))
        return 9
    finally:
        if conn is not None:
            conn.close()

    progress(1.0, "done")
    print(json.dumps({
        "query": query,
        "engine": "duckduckgo",
        "results": len(results),
        "entities_added": entities_added,
        "relations_added": relations_added,
    }, ensure_ascii=False))
    return 0


if __name__ == "__main__":
    sys.exit(main())
Binary file not shown.
+89
View File
@@ -0,0 +1,89 @@
"""Stub minimo de `requests` para tests de enrichers.
Lee el plan de respuesta de `_STUB_REQUESTS_PLAN` (env var con path a un
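JSON). Soporta multiples respuestas filtradas por metodo (opcional) y por
subcadena de URL (`contains`, obligatorio en cada entrada de `match`) — la
primera coincidencia gana; si ninguna coincide se usa `default`.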
Formato del plan:
{
"default": {"text": "<html>...</html>", "status": 200,
"headers": {"Content-Type": "text/html; charset=utf-8"}},
"match": [
{"contains": "duckduckgo.com", "text": "...", "status": 200},
{"method": "GET", "contains": "example.com", "text": "..."}
]
}
"""
from __future__ import annotations
import json
import os
class Response:
    """Minimal stand-in for `requests.Response`.

    Exposes `text`, `status_code`, `headers`, `url`, `encoding` and
    `content` (the text encoded with *encoding*), plus `json()` and
    `raise_for_status()`. When no headers are given, an HTML content type
    is used.
    """

    def __init__(self, text: str = "", status_code: int = 200,
                 headers: dict | None = None, url: str = "",
                 encoding: str = "utf-8") -> None:
        default_headers = {"Content-Type": "text/html; charset=utf-8"}
        self.text = text
        self.status_code = status_code
        self.headers = headers if headers else default_headers
        self.url = url
        self.encoding = encoding
        self.content = text.encode(encoding, errors="replace")

    def json(self):
        """Decode the body text as JSON."""
        return json.loads(self.text)

    def raise_for_status(self):
        """Raise RuntimeError for any status code of 400 or above."""
        if self.status_code < 400:
            return
        raise RuntimeError(f"HTTP {self.status_code}")
def _load_plan() -> dict:
p = os.environ.get("_STUB_REQUESTS_PLAN")
if not p or not os.path.exists(p):
return {}
with open(p, "r", encoding="utf-8") as f:
return json.load(f)
def _resolve(method: str, url: str) -> Response:
    """Pick the planned response for a *method* request to *url*.

    The first `match` entry wins whose `method` (when present) equals
    *method* case-insensitively and whose non-empty `contains` string occurs
    in *url*. Entries without `contains` never match. When no entry matches,
    the `default` entry (or an empty 200 response) is used.
    """
    plan = _load_plan()
    wanted = method.upper()
    chosen = plan.get("default") or {}
    for entry in plan.get("match", []):
        if "method" in entry and entry["method"].upper() != wanted:
            continue
        needle = entry.get("contains") or ""
        if needle and needle in url:
            chosen = entry
            break
    return Response(
        text=chosen.get("text", ""),
        status_code=int(chosen.get("status", 200)),
        headers=chosen.get("headers"),
        url=url,
    )
def get(url, *args, **kwargs):
    """Stand-in for `requests.get`: return the planned response for *url*.

    Extra positional and keyword arguments are accepted and ignored.
    """
    return _resolve("GET", url)
def post(url, *args, **kwargs):
    """Stand-in for `requests.post`: return the planned response for *url*.

    Extra positional and keyword arguments (form data, headers, timeout)
    are accepted and ignored.
    """
    return _resolve("POST", url)
# Compatibility with `requests.exceptions.RequestException` in case an
# enricher imports it in the future.
class RequestException(Exception):
    """Base error type of the stub; nothing in this module raises it."""
    pass
class exceptions:  # noqa: N801
    """Namespace mimicking `requests.exceptions`.

    All three names are aliases of the single stub `RequestException`.
    """
    RequestException = RequestException
    Timeout = RequestException
    ConnectionError = RequestException
+237
View File
@@ -0,0 +1,237 @@
"""Fixtures comunes para tests de enrichers de graph_explorer.
Cada test recibe:
- `ops_db`: path a una operations.db con schema minimo en tmp dir
- `app_dir`: tmp dir que actua como app_dir (cache_dir = <app_dir>/cache)
- `registry_root`: ruta absoluta del registry (para imports en run.py)
- `run_enricher(enricher_id, ctx, env=None, timeout=30)`: helper (funcion
normal, no fixture) que invoca run.py via subprocess con el mismo wire
protocol que jobs.cpp.
El schema se replica de `fn_operations/project_template/operations.db` —
solo las columnas que usan los enrichers. Si fn_operations cambia el
schema, este conftest se actualiza.
"""
from __future__ import annotations
import json
import os
import sqlite3
import subprocess
import sys
from pathlib import Path
import pytest
# Root handed to enrichers as `registry_root`: the sixth ancestor of this file.
REGISTRY_ROOT = Path(__file__).resolve().parents[5]
APP_DIR_SRC = Path(__file__).resolve().parents[1]  # graph_explorer/
ENRICHERS_DIR = APP_DIR_SRC / "enrichers"
TESTS_DIR = Path(__file__).resolve().parent
STUBS_DIR = TESTS_DIR / "_stubs"
# Interpreter used to launch each run.py. Prefer the registry virtualenv;
# when it is absent (other OS layout, fresh checkout) fall back to the
# interpreter running pytest so the suite can still start the enrichers.
_VENV_PYTHON = REGISTRY_ROOT / "python" / ".venv" / "bin" / "python3"
PYTHON_BIN = _VENV_PYTHON if _VENV_PYTHON.exists() else Path(sys.executable)
def stub_requests(tmp_path: Path, plan: dict) -> dict:
    """Write the response plan and return the env vars that enable the stub.

    The stub module lives in `tests/_stubs/requests.py` and shadows the real
    `requests` because its directory is placed first on PYTHONPATH.

    Args:
        tmp_path: directory in which `_stub_plan.json` is written.
        plan: dict with `default` and/or `match` (list of entries with
            `contains`, `status`, `text`, ...).

    Returns:
        A dict with `PYTHONPATH` and `_STUB_REQUESTS_PLAN`, meant to be
        passed as `env=` to `run_enricher`.
    """
    plan_file = tmp_path / "_stub_plan.json"
    plan_file.write_text(json.dumps(plan), encoding="utf-8")
    # Append the inherited PYTHONPATH only when there is one: a trailing
    # separator would leave an empty entry in the child's search path.
    search_path = [str(STUBS_DIR)]
    inherited = os.environ.get("PYTHONPATH", "")
    if inherited:
        search_path.append(inherited)
    return {
        "PYTHONPATH": os.pathsep.join(search_path),
        "_STUB_REQUESTS_PLAN": str(plan_file),
    }
# DDL executed by the `ops_db` fixture: creates the `entities` and
# `relations` tables that the test helpers and enrichers read and write.
SCHEMA_SQL = """
CREATE TABLE entities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type_ref TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
description TEXT NOT NULL DEFAULT '',
domain TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
source TEXT NOT NULL,
metadata TEXT NOT NULL DEFAULT '{}',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE relations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
from_entity TEXT NOT NULL DEFAULT '',
to_entity TEXT NOT NULL,
via TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
purity TEXT NOT NULL DEFAULT '',
direction TEXT NOT NULL DEFAULT 'unidirectional',
weight REAL,
status TEXT NOT NULL DEFAULT 'designed',
started_at TEXT,
ended_at TEXT,
"order" INTEGER,
tags TEXT NOT NULL DEFAULT '[]',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
"""
@pytest.fixture
def ops_db(tmp_path):
    """Create an empty operations.db with the minimal schema; return its path."""
    db_path = tmp_path / "operations.db"
    connection = sqlite3.connect(db_path)
    connection.executescript(SCHEMA_SQL)
    connection.commit()
    connection.close()
    return db_path
@pytest.fixture
def app_dir(tmp_path):
    """Create `<tmp>/app` with a `cache` subdirectory and return the app dir."""
    root = tmp_path / "app"
    root.mkdir()
    cache = root / "cache"
    cache.mkdir()
    return root
@pytest.fixture
def registry_root():
    """Expose the module-level REGISTRY_ROOT path as a fixture."""
    return REGISTRY_ROOT
def make_node(ops_db: Path, *, node_id: str, name: str, type_ref: str,
              metadata: dict | None = None, source: str = "test") -> None:
    """Insert one entity of an arbitrary type into operations.db.

    Args:
        ops_db: path of the sqlite database.
        node_id: primary key of the new entity.
        name: entity name.
        type_ref: entity type (e.g. "Url", "Webpage").
        metadata: stored as JSON text; None is stored as `{}`.
        source: value of the `source` column.

    Raises:
        sqlite3.Error: if the insert fails (e.g. duplicate id). The
            connection is closed in every case.
    """
    conn = sqlite3.connect(ops_db)
    try:
        conn.execute(
            "INSERT INTO entities (id, name, type_ref, source, metadata, "
            " created_at, updated_at) VALUES (?, ?, ?, ?, ?, "
            " '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z')",
            (node_id, name, type_ref, source,
             json.dumps(metadata or {}, ensure_ascii=False)),
        )
        conn.commit()
    finally:
        # Same pattern as the read helpers: never leave the handle open.
        conn.close()
def get_entity(ops_db: Path, entity_id: str) -> dict | None:
    """Fetch one entity by id as a dict, or None when it does not exist.

    The `metadata` column is decoded from JSON; empty or undecodable
    metadata becomes `{}`.
    """
    conn = sqlite3.connect(ops_db)
    try:
        row = conn.execute(
            "SELECT id, name, type_ref, source, metadata "
            "FROM entities WHERE id=?", (entity_id,)).fetchone()
    finally:
        conn.close()
    if row is None:
        return None
    ident, name, type_ref, source, raw_meta = row
    try:
        decoded = json.loads(raw_meta) if raw_meta else {}
    except Exception:
        decoded = {}
    return {"id": ident, "name": name, "type_ref": type_ref,
            "source": source, "metadata": decoded}
def list_entities(ops_db: Path, type_ref: str | None = None) -> list[dict]:
    """List entities ordered by id, optionally restricted to one *type_ref*.

    Each entity is returned as a dict whose `metadata` is decoded from JSON;
    empty or undecodable metadata becomes `{}`.
    """
    sql = "SELECT id, name, type_ref, source, metadata FROM entities"
    args: tuple = ()
    if type_ref:
        sql += " WHERE type_ref=?"
        args = (type_ref,)
    sql += " ORDER BY id"

    conn = sqlite3.connect(ops_db)
    try:
        rows = conn.execute(sql, args).fetchall()
    finally:
        conn.close()

    def _decode(raw_meta):
        try:
            return json.loads(raw_meta) if raw_meta else {}
        except Exception:
            return {}

    return [
        {"id": ident, "name": name, "type_ref": kind,
         "source": source, "metadata": _decode(raw_meta)}
        for ident, name, kind, source, raw_meta in rows
    ]
def list_relations(ops_db: Path, name: str | None = None) -> list[dict]:
    """List relations ordered by id, optionally restricted to one *name*.

    Each relation is returned as a dict with `id`, `name`, `from_entity`
    and `to_entity`.
    """
    sql = "SELECT id, name, from_entity, to_entity FROM relations "
    args: tuple = ()
    if name:
        sql += "WHERE name=? "
        args = (name,)
    sql += "ORDER BY id"

    conn = sqlite3.connect(ops_db)
    try:
        rows = conn.execute(sql, args).fetchall()
    finally:
        conn.close()

    columns = ("id", "name", "from_entity", "to_entity")
    return [dict(zip(columns, row)) for row in rows]
def run_enricher(enricher_id: str, ctx: dict, *, env: dict | None = None,
                 timeout: int = 30) -> tuple[int, dict | None, str]:
    """Run `enrichers/<id>/run.py` as a subprocess with *ctx* as stdin JSON.

    Args:
        enricher_id: directory name of the enricher under ENRICHERS_DIR.
        ctx: context serialised to JSON and written to the child's stdin.
        env: extra environment variables layered over the current env.
        timeout: seconds before the subprocess is aborted.

    Returns:
        `(exit_code, summary, stderr_text)`, where `summary` is the last
        non-empty stdout line decoded as JSON, or None when stdout is empty
        or that line is not valid JSON.
    """
    script = ENRICHERS_DIR / enricher_id / "run.py"
    assert script.exists(), f"no existe {script}"

    child_env = {**os.environ, **(env or {})}
    proc = subprocess.run(
        [str(PYTHON_BIN), str(script)],
        input=json.dumps(ctx),
        capture_output=True,
        text=True,
        timeout=timeout,
        env=child_env,
    )

    # Only the last non-empty stdout line is the JSON summary.
    stdout_lines = proc.stdout.strip().splitlines()
    last_line = next(
        (ln.strip() for ln in reversed(stdout_lines) if ln.strip()), ""
    )
    summary: dict | None = None
    if last_line:
        try:
            summary = json.loads(last_line)
        except Exception:
            summary = None
    return proc.returncode, summary, proc.stderr
def base_ctx(*, ops_db, app_dir, registry_root, node_id, node_name,
             node_type, metadata=None, params=None) -> dict:
    """Build the stdin context dict in the shape jobs.cpp sends it.

    Paths are converted to strings, `cache_dir` is `<app_dir>/cache`, and
    missing `metadata` / `params` default to empty dicts.
    """
    ctx = dict(node_id=node_id, node_name=node_name, node_type=node_type)
    ctx["metadata"] = metadata or {}
    ctx["ops_db_path"] = str(ops_db)
    ctx["app_dir"] = str(app_dir)
    ctx["cache_dir"] = str(Path(app_dir) / "cache")
    ctx["registry_root"] = str(registry_root)
    ctx["params"] = params or {}
    return ctx
+22
View File
@@ -0,0 +1,22 @@
<!DOCTYPE html>
<html><head><title>tomate at DuckDuckGo</title></head>
<body>
<div class="serp__results">
<div class="result">
<a class="result__a" href="//duckduckgo.com/l/?uddg=https%3A%2F%2Fes.wikipedia.org%2Fwiki%2FTomate&amp;rut=abc">Tomate - Wikipedia, la enciclopedia libre</a>
<a class="result__snippet" href="//duckduckgo.com/l/?uddg=https%3A%2F%2Fes.wikipedia.org%2Fwiki%2FTomate">El tomate es el fruto comestible de la planta Solanum lycopersicum, una especie de la familia de las solanaceas.</a>
</div>
<div class="result">
<a class="result__a" href="//duckduckgo.com/l/?uddg=https%3A%2F%2Fwww.botanical-online.com%2Falimentos%2Ftomate-propiedades&amp;rut=def">Tomate: propiedades y beneficios</a>
<a class="result__snippet" href="//duckduckgo.com/l/?uddg=https%3A%2F%2Fwww.botanical-online.com%2Falimentos%2Ftomate-propiedades">Propiedades del tomate, beneficios para la salud y composicion nutricional.</a>
</div>
<div class="result">
<a class="result__a" href="//duckduckgo.com/l/?uddg=https%3A%2F%2Fwww.recetasgratis.net%2Fbusqueda%2Ftomate&amp;rut=ghi">Recetas con tomate - RecetasGratis</a>
<a class="result__snippet" href="//duckduckgo.com/l/?uddg=https%3A%2F%2Fwww.recetasgratis.net%2Fbusqueda%2Ftomate">Encuentra las mejores recetas con tomate paso a paso.</a>
</div>
<div class="result result--ad">
<!-- anuncio sin titulo, no debe contar -->
<a href="https://ad.doubleclick.net/x">ad</a>
</div>
</div>
</body></html>
+60
View File
@@ -0,0 +1,60 @@
"""Tests del enricher extract_domain.
Pure regex/parsing — sin red. Verifica:
- Url con metadata.url crea Domain + BELONGS_TO
- Email crea Domain (desde la parte derecha del @)
- Si el Domain ya existe se reusa, no se duplica
"""
from __future__ import annotations
from conftest import (
base_ctx, get_entity, list_entities, list_relations,
make_node, run_enricher,
)
def test_url_creates_domain_and_relation(ops_db, app_dir, registry_root):
    """A Url node with metadata.url yields a Domain plus one BELONGS_TO edge."""
    url_meta = {"url": "https://www.example.com/path"}
    make_node(ops_db, node_id="u1", name="ex", type_ref="Url",
              metadata=url_meta)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir,
                   registry_root=registry_root,
                   node_id="u1", node_name="ex", node_type="Url",
                   metadata=url_meta)

    rc, out, err = run_enricher("extract_domain", ctx)

    assert rc == 0, err
    assert out and out.get("entities_added", 0) >= 1, out
    domains = list_entities(ops_db, type_ref="Domain")
    assert "www.example.com" in [d["name"] for d in domains], domains
    belongs_to = list_relations(ops_db, name="BELONGS_TO")
    assert len(belongs_to) == 1
    assert belongs_to[0]["from_entity"] == "u1"
def test_email_creates_domain(ops_db, app_dir, registry_root):
    """An Email node yields a Domain named after the part right of the `@`."""
    address = "user@aurgi.com"
    make_node(ops_db, node_id="e1", name=address,
              type_ref="Email", metadata={"address": address})
    # The context deliberately carries no metadata, only the node name.
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir,
                   registry_root=registry_root,
                   node_id="e1", node_name=address, node_type="Email")

    rc, out, err = run_enricher("extract_domain", ctx)

    assert rc == 0, err
    domains = list_entities(ops_db, type_ref="Domain")
    assert "aurgi.com" in [d["name"] for d in domains], domains
def test_existing_domain_is_reused(ops_db, app_dir, registry_root):
    """A Domain that already exists is reused instead of duplicated."""
    # Pre-create a Domain carrying the same name the enricher would emit.
    make_node(ops_db, node_id="d1", name="example.com", type_ref="Domain",
              metadata={})
    url_meta = {"url": "https://example.com/x"}
    make_node(ops_db, node_id="u1", name="ex", type_ref="Url",
              metadata=url_meta)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir,
                   registry_root=registry_root,
                   node_id="u1", node_name="ex", node_type="Url",
                   metadata=url_meta)

    rc, out, err = run_enricher("extract_domain", ctx)

    assert rc == 0, err
    domains = list_entities(ops_db, type_ref="Domain")
    same_name = sum(1 for d in domains if d["name"] == "example.com")
    assert same_name == 1, domains
+63
View File
@@ -0,0 +1,63 @@
"""Tests del enricher extract_links — sin red, lee markdown del cache."""
from __future__ import annotations
from pathlib import Path
from conftest import (
base_ctx, list_entities, list_relations, make_node, run_enricher,
)
# Markdown fixture: one link that appears twice, two bare URLs and one email
# address (which must not be turned into a Url node).
SAMPLE_MD = """# Pagina demo
Aqui hay [un enlace](https://example.com/articulo) interesante y
otro [duplicado](https://example.com/articulo) que no debe contar
dos veces.
Tambien una URL pelada: https://otra.example/path?q=1
y https://tercera.example/
Y un email que NO debe extraer como Url: contact@no.example
"""
def test_extract_links_creates_url_nodes(ops_db, app_dir, registry_root):
    """Links in the cached markdown become Url nodes linked from the page."""
    # 1) Put the markdown into the app cache.
    cache_dir = Path(app_dir) / "cache" / "ab"
    cache_dir.mkdir(parents=True, exist_ok=True)
    md_file = cache_dir / "abc.md"
    md_file.write_text(SAMPLE_MD, encoding="utf-8")
    page_meta = {"markdown_path": str(md_file.relative_to(app_dir))}

    # 2) Create the Webpage whose metadata.markdown_path points at the cache.
    make_node(ops_db, node_id="w1", name="demo",
              type_ref="Webpage", metadata=page_meta)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir,
                   registry_root=registry_root,
                   node_id="w1", node_name="demo", node_type="Webpage",
                   metadata=page_meta)

    rc, out, err = run_enricher("extract_links", ctx)

    assert rc == 0, err
    assert out is not None, err
    assert out["entities_added"] >= 3, out
    url_names = [e["name"] for e in list_entities(ops_db, type_ref="Url")]
    for expected in ("https://example.com/articulo",
                     "https://otra.example/path?q=1"):
        assert expected in url_names
    links = list_relations(ops_db, name="LINKS_TO")
    assert len(links) >= 3
    assert not any(r["from_entity"] != "w1" for r in links)
def test_extract_links_without_markdown_path_errors(ops_db, app_dir,
                                                    registry_root):
    """Without metadata.markdown_path the enricher fails and reports why."""
    make_node(ops_db, node_id="w1", name="demo",
              type_ref="Webpage", metadata={})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir,
                   registry_root=registry_root,
                   node_id="w1", node_name="demo", node_type="Webpage")

    rc, out, err = run_enricher("extract_links", ctx)

    assert rc != 0, "deberia fallar sin markdown_path"
    assert out is not None
    reported = out.get("error") or ""
    assert "missing markdown_path" in reported
+59
View File
@@ -0,0 +1,59 @@
"""Tests del enricher extract_text_entities — regex IoCs sobre markdown."""
from __future__ import annotations
from pathlib import Path
from conftest import (
base_ctx, list_entities, list_relations, make_node, run_enricher,
)
# Text with several IoCs detectable by extract_iocs (pure regex): two email
# addresses, an IP address, a CVE id and a 32-hex-digit hash.
SAMPLE_MD = """# Reporte
Indicators:
- Email: bad@evil.example y otra@victim.example
- IP: 192.0.2.55
- CVE: CVE-2024-12345
- Hash: 44d88612fea8a8f36de82e1278abb02f
"""
def test_extract_iocs_creates_typed_entities(ops_db, app_dir, registry_root):
    """IoCs in the cached markdown become typed entities tied to the page."""
    cache_dir = Path(app_dir) / "cache" / "cd"
    cache_dir.mkdir(parents=True, exist_ok=True)
    md_file = cache_dir / "ddd.md"
    md_file.write_text(SAMPLE_MD, encoding="utf-8")
    page_meta = {"markdown_path": str(md_file.relative_to(app_dir))}

    make_node(ops_db, node_id="w1", name="report",
              type_ref="Webpage", metadata=page_meta)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir,
                   registry_root=registry_root,
                   node_id="w1", node_name="report", node_type="Webpage",
                   metadata=page_meta)

    rc, out, err = run_enricher("extract_text_entities", ctx)

    assert rc == 0, err
    assert out is not None
    assert out["entities_added"] >= 3, out

    types = {e["type_ref"] for e in list_entities(ops_db)} - {"Webpage"}
    # Not every type is required (it depends on which patterns extract_iocs
    # covers), but Email and CVE are expected at the very least.
    assert "Email" in types, types
    assert "CVE" in types, types

    extracted = list_relations(ops_db, name="EXTRACTED_FROM")
    assert len(extracted) >= 3
    assert not any(r["to_entity"] != "w1" for r in extracted)
def test_extract_iocs_without_markdown_errors(ops_db, app_dir, registry_root):
    """Without metadata.markdown_path the enricher fails and reports why."""
    make_node(ops_db, node_id="w1", name="empty",
              type_ref="Webpage", metadata={})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir,
                   registry_root=registry_root,
                   node_id="w1", node_name="empty", node_type="Webpage")

    rc, out, err = run_enricher("extract_text_entities", ctx)

    assert rc != 0
    assert out and "missing markdown_path" in (out.get("error") or "")
+77
View File
@@ -0,0 +1,77 @@
"""Tests del enricher fetch_webpage con red mockeada via stub de requests."""
from __future__ import annotations
import os
from pathlib import Path
from conftest import (
base_ctx, get_entity, list_entities, list_relations,
make_node, run_enricher, stub_requests,
)
# Page body served by the requests stub: it has a <title>, a relative link
# and an email address.
SAMPLE_HTML = """<!DOCTYPE html>
<html><head><title>Acme Demo</title></head>
<body>
<h1>Hola</h1>
<p>Esta es la pagina de prueba con un <a href="/x">enlace</a>.</p>
<p>Email de contacto: ops@acme.example</p>
</body></html>
"""
def test_fetch_webpage_creates_domain_and_caches(ops_db, app_dir, registry_root,
                                                 tmp_path):
    """Fetching a Url caches the HTML, promotes the node and adds its Domain."""
    url_meta = {"url": "https://www.acme.example/"}
    make_node(ops_db, node_id="u1", name="acme",
              type_ref="Url", metadata=url_meta)
    stub_env = stub_requests(tmp_path, {
        "default": {"text": SAMPLE_HTML, "status": 200,
                    "headers": {"Content-Type": "text/html; charset=utf-8"}},
    })
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir,
                   registry_root=registry_root,
                   node_id="u1", node_name="acme", node_type="Url",
                   metadata=url_meta)

    rc, out, err = run_enricher("fetch_webpage", ctx, env=stub_env)

    assert rc == 0, f"stderr={err}"
    assert out is not None, err
    assert out["status_code"] == 200
    assert out["title"] == "Acme Demo"
    assert out["entities_added"] == 1  # Domain
    assert out["relations_added"] == 1  # BELONGS_TO

    # The Url node is promoted to Webpage.
    page = get_entity(ops_db, "u1")
    assert page["type_ref"] == "Webpage", page
    page_meta = page["metadata"]
    assert page_meta["title"] == "Acme Demo"
    assert page_meta["status_code"] == 200

    # The cached HTML exists and holds the served body.
    cached_html = Path(app_dir) / page_meta["html_path"]
    assert cached_html.exists()
    assert "Acme Demo" in cached_html.read_text(encoding="utf-8")

    # The Domain was created together with its relation.
    domain_names = [d["name"] for d in list_entities(ops_db, type_ref="Domain")]
    assert "www.acme.example" in domain_names
    assert len(list_relations(ops_db, name="BELONGS_TO")) == 1
def test_fetch_webpage_handles_http_error(ops_db, app_dir, registry_root,
                                          tmp_path):
    """An HTTP 404 is a valid answer: exit 0 with the status in the summary."""
    url_meta = {"url": "https://no.example/"}
    make_node(ops_db, node_id="u1", name="bad",
              type_ref="Url", metadata=url_meta)
    stub_env = stub_requests(
        tmp_path, {"default": {"text": "<html></html>", "status": 404}})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir,
                   registry_root=registry_root,
                   node_id="u1", node_name="bad", node_type="Url",
                   metadata=url_meta)

    rc, out, err = run_enricher("fetch_webpage", ctx, env=stub_env)

    assert rc == 0, err
    assert out["status_code"] == 404
+72
View File
@@ -0,0 +1,72 @@
"""Sanity check de los manifests YAML de todos los enrichers.
Confirma que el set actual cubre los tipos esperados y que cada manifest
tiene los campos que `enrichers.cpp` necesita parsear (id, applies_to).
"""
from __future__ import annotations
from pathlib import Path
from conftest import ENRICHERS_DIR
# Enricher ids that must exist as directories under ENRICHERS_DIR.
EXPECTED_IDS = {
    "web_search",
    "fetch_webpage",
    "extract_domain",
    "extract_links",
    "extract_text_entities",
}
def _parse_simple_yaml(text: str) -> dict:
"""Parser ad-hoc que replica lo que hace enrichers.cpp."""
out: dict = {}
in_skip = False
for raw in text.splitlines():
line = raw.rstrip("\r")
s = line.strip()
if not s or s.startswith("#"):
continue
indented = line and line[0].isspace()
if not indented:
in_skip = False
if in_skip:
continue
if ":" not in s:
continue
key, _, val = s.partition(":")
key = key.strip()
val = val.strip()
if val and val[0] in ('"', "'") and val[-1] == val[0]:
val = val[1:-1]
if key == "params" and not val:
in_skip = True
out[key] = val
return out
def test_all_expected_enrichers_present():
    """Every id in EXPECTED_IDS has a directory under ENRICHERS_DIR."""
    present = set()
    for entry in ENRICHERS_DIR.iterdir():
        if entry.is_dir():
            present.add(entry.name)
    missing = EXPECTED_IDS.difference(present)
    assert not missing, f"faltan enrichers: {missing}"
def test_each_manifest_has_required_fields():
    """Each enricher directory has manifest.yaml and run.py with the basics.

    The manifest must declare an ``id`` equal to the directory name plus
    non-empty ``applies_to`` and ``description`` fields.
    """
    enricher_dirs = (p for p in ENRICHERS_DIR.iterdir() if p.is_dir())
    for enricher_dir in enricher_dirs:
        name = enricher_dir.name
        manifest_path = enricher_dir / "manifest.yaml"
        script_path = enricher_dir / "run.py"
        assert manifest_path.exists(), f"falta manifest: {name}"
        assert script_path.exists(), f"falta run.py: {name}"

        fields = _parse_simple_yaml(manifest_path.read_text(encoding="utf-8"))
        assert fields.get("id") == name, f"id no coincide con dir: {name}"
        assert fields.get("applies_to"), f"sin applies_to: {name}"
        assert fields.get("description"), f"sin description: {name}"
def test_web_search_applies_to_text():
    """The web_search manifest lists ``text`` among its applies_to types."""
    manifest_path = ENRICHERS_DIR / "web_search" / "manifest.yaml"
    # Read explicitly as UTF-8: the manifest contains non-ASCII characters,
    # and the default encoding depends on the platform locale.
    m = _parse_simple_yaml(manifest_path.read_text(encoding="utf-8"))
    assert "text" in m["applies_to"].lower()
+97
View File
@@ -0,0 +1,97 @@
"""Tests del enricher web_search (DuckDuckGo HTML)."""
from __future__ import annotations
from pathlib import Path
from conftest import (
base_ctx, list_entities, list_relations, make_node, run_enricher,
stub_requests, TESTS_DIR,
)
# HTML fixture served by the stubbed `requests` for any URL containing
# "duckduckgo.com"; the tests below expect three results in it.
DDG_FIXTURE = TESTS_DIR / "fixtures" / "ddg_results.html"
def test_web_search_creates_url_results_for_text_node(
        ops_db, app_dir, registry_root, tmp_path):
    """web_search on a text node adds one Url node per stubbed result.

    Each Url node must be linked to the origin node through a
    SEARCH_RESULT_OF relation and carry query, rank and title metadata.
    """
    wiki_url = "https://es.wikipedia.org/wiki/Tomate"
    botanical_url = "https://www.botanical-online.com/alimentos/tomate-propiedades"

    make_node(ops_db, node_id="t1", name="tomate",
              type_ref="text", metadata={})
    ddg_response = {
        "contains": "duckduckgo.com",
        "text": DDG_FIXTURE.read_text(encoding="utf-8"),
        "status": 200,
    }
    plan = {
        "match": [ddg_response],
        "default": {"text": "", "status": 404},
    }
    stub_env = stub_requests(tmp_path, plan)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="tomate", node_type="text",
                   params={"limit": 5})

    rc, summary, stderr = run_enricher("web_search", ctx, env=stub_env)

    assert rc == 0, f"stderr={stderr}"
    assert summary is not None, stderr
    assert summary["engine"] == "duckduckgo"
    assert summary["results"] == 3, summary
    assert summary["entities_added"] == 3
    assert summary["relations_added"] == 3

    url_nodes = list_entities(ops_db, type_ref="Url")
    found_urls = {node["metadata"].get("url") for node in url_nodes}
    assert wiki_url in found_urls
    assert botanical_url in found_urls

    search_rels = list_relations(ops_db, name="SEARCH_RESULT_OF")
    assert len(search_rels) == 3
    assert all(rel["to_entity"] == "t1" for rel in search_rels)

    # Enriched metadata on the Wikipedia result.
    wiki_node = next(node for node in url_nodes
                     if node["metadata"].get("url") == wiki_url)
    wiki_meta = wiki_node["metadata"]
    assert wiki_meta["query"] == "tomate"
    assert wiki_meta["rank"] == 1
    assert "Wikipedia" in wiki_meta["title"]
def test_web_search_uses_metadata_query_over_name(ops_db, app_dir,
                                                  registry_root, tmp_path):
    """metadata.query must take priority over node_name."""
    make_node(ops_db, node_id="t1", name="placeholder",
              type_ref="text", metadata={"query": "tomate"})
    plan = {"match": [{"contains": "duckduckgo.com",
                       "text": DDG_FIXTURE.read_text(encoding="utf-8")}]}
    env = stub_requests(tmp_path, plan)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="placeholder", node_type="text",
                   metadata={"query": "tomate"})
    rc, out, err = run_enricher("web_search", ctx, env=env)
    assert rc == 0, err
    # Guard before subscripting so a missing summary fails with stderr
    # attached instead of a bare TypeError.
    assert out is not None, err
    assert out["query"] == "tomate"
def test_web_search_limit_truncates_results(ops_db, app_dir, registry_root,
                                            tmp_path):
    """With ``limit=1`` only one result is reported and one entity added."""
    make_node(ops_db, node_id="t1", name="tomate", type_ref="text")
    plan = {"match": [{"contains": "duckduckgo.com",
                       "text": DDG_FIXTURE.read_text(encoding="utf-8")}]}
    env = stub_requests(tmp_path, plan)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="tomate", node_type="text",
                   params={"limit": 1})
    rc, out, err = run_enricher("web_search", ctx, env=env)
    assert rc == 0, err
    # Guard before subscripting so a missing summary fails with stderr
    # attached instead of a bare TypeError.
    assert out is not None, err
    assert out["results"] == 1
    assert out["entities_added"] == 1
def test_web_search_no_query_fails_clean(ops_db, app_dir, registry_root,
                                         tmp_path):
    """With an empty node name and no metadata the enricher exits with code 2."""
    make_node(ops_db, node_id="t1", name="", type_ref="text", metadata={})
    empty_ok = {"text": "", "status": 200}
    stub_env = stub_requests(tmp_path, {"default": empty_ok})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="", node_type="text")
    rc, _summary, stderr = run_enricher("web_search", ctx, env=stub_env)
    assert rc == 2
    assert "sin query" in stderr