Files
graph_explorer/tests/conftest.py
T
egutierrez 6919ebfe9c feat(enrichers): web_search DuckDuckGo + tests pytest de los 5 enrichers
Anade enricher web_search aplicable a nodos text/Concept/Topic. Hace
POST a html.duckduckgo.com con la query del nodo, parsea resultados
con HTMLParser stdlib, decodifica el redirect uddg= y crea N nodos
Url con relacion SEARCH_RESULT_OF apuntando al nodo origen.

Encadenable: tras web_search, fetch_webpage sobre cada Url completa
el pipeline search -> fetch -> extract.

Defensa contra ops_db_path mal resuelto: normaliza backslashes,
resuelve relativo contra app_dir, valida que la tabla entities
exista antes de tocar nada (exit codes 7/8/9 con JSON resumen).

Tests pytest (16/16 verde): conftest con operations.db temp +
schema minimo, stub de requests via PYTHONPATH para mockear red.
Cubre los 5 enrichers (extract_domain, fetch_webpage, extract_links,
extract_text_entities, web_search) + sanity check de manifests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 16:10:13 +02:00

238 lines
7.4 KiB
Python

"""Fixtures comunes para tests de enrichers de graph_explorer.
Cada test recibe:
- `ops_db`: path a una operations.db con schema minimo en tmp dir
- `app_dir`: tmp dir que actua como app_dir (cache_dir = <app_dir>/cache)
- `registry_root`: ruta absoluta del registry (para imports en run.py)
- `run_enricher(enricher, ctx_overrides)`: helper que invoca run.py via
subprocess con el mismo wire protocol que jobs.cpp.
El schema se replica de `fn_operations/project_template/operations.db` —
solo las columnas que usan los enrichers. Si fn_operations cambia el
schema, este conftest se actualiza.
"""
from __future__ import annotations
import json
import os
import sqlite3
import subprocess
import sys
from pathlib import Path
import pytest
# Registry root: the sixth ancestor directory of this file.
REGISTRY_ROOT = Path(__file__).resolve().parents[5]
APP_DIR_SRC = Path(__file__).resolve().parents[1] # graph_explorer/
# Directory holding one sub-directory per enricher, each with its run.py.
ENRICHERS_DIR = APP_DIR_SRC / "enrichers"
# Directory that contains this conftest.
TESTS_DIR = Path(__file__).resolve().parent
# Stub modules exposed to the enricher subprocess through PYTHONPATH.
STUBS_DIR = TESTS_DIR / "_stubs"
# Interpreter used to launch every enricher script.
PYTHON_BIN = REGISTRY_ROOT / "python" / ".venv" / "bin" / "python3"
def stub_requests(tmp_path: Path, plan: dict) -> dict:
    """Write the response plan to disk and return the env that enables the stub.

    The stub lives in tests/_stubs/requests.py and is activated via
    PYTHONPATH. ``plan`` accepts ``default`` and/or ``match`` (a list of
    ``{contains, status, text}`` entries).

    Args:
        tmp_path: Directory where the plan file ``_stub_plan.json`` is written.
        plan: JSON-serialisable description of the canned responses.

    Returns:
        Environment overrides with ``PYTHONPATH`` (stub directory first) and
        ``_STUB_REQUESTS_PLAN`` (path of the plan file).
    """
    plan_file = tmp_path / "_stub_plan.json"
    plan_file.write_text(json.dumps(plan), encoding="utf-8")

    # Append the inherited PYTHONPATH only when it is non-empty: a trailing
    # separator would add an empty search-path entry, which the interpreter
    # may treat as the current working directory.
    search_path = [str(STUBS_DIR)]
    inherited = os.environ.get("PYTHONPATH", "")
    if inherited:
        search_path.append(inherited)

    return {
        "PYTHONPATH": os.pathsep.join(search_path),
        "_STUB_REQUESTS_PLAN": str(plan_file),
    }
# Minimal schema for the test database: the `entities` and `relations`
# tables. The ops_db fixture runs this script on a fresh SQLite file.
SCHEMA_SQL = """
CREATE TABLE entities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type_ref TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
description TEXT NOT NULL DEFAULT '',
domain TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
source TEXT NOT NULL,
metadata TEXT NOT NULL DEFAULT '{}',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE relations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
from_entity TEXT NOT NULL DEFAULT '',
to_entity TEXT NOT NULL,
via TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
purity TEXT NOT NULL DEFAULT '',
direction TEXT NOT NULL DEFAULT 'unidirectional',
weight REAL,
status TEXT NOT NULL DEFAULT 'designed',
started_at TEXT,
ended_at TEXT,
"order" INTEGER,
tags TEXT NOT NULL DEFAULT '[]',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
"""
@pytest.fixture
def ops_db(tmp_path):
    """Create an empty operations.db with the minimal schema.

    Returns:
        Path of the database file inside ``tmp_path``, ready for nodes to be
        inserted.
    """
    db = tmp_path / "operations.db"
    conn = sqlite3.connect(db)
    try:
        conn.executescript(SCHEMA_SQL)
        conn.commit()
    finally:
        # Release the handle even if the schema script fails, as the query
        # helpers in this file do.
        conn.close()
    return db
@pytest.fixture
def app_dir(tmp_path):
    """Root directory of an 'app' for the enrichers (the cache lives inside).

    Creates ``<tmp_path>/app`` and ``<tmp_path>/app/cache`` and returns the
    former.
    """
    root = tmp_path.joinpath("app")
    cache = root.joinpath("cache")
    # Parent first, then the cache sub-directory.
    for directory in (root, cache):
        directory.mkdir()
    return root
@pytest.fixture
def registry_root():
    """Expose the module-level REGISTRY_ROOT path to tests."""
    return REGISTRY_ROOT
def make_node(ops_db: Path, *, node_id: str, name: str, type_ref: str,
metadata: dict | None = None, source: str = "test") -> None:
"""Inserta un nodo de tipo arbitrario en operations.db."""
conn = sqlite3.connect(ops_db)
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) VALUES (?, ?, ?, ?, ?, "
" '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z')",
(node_id, name, type_ref, source,
json.dumps(metadata or {}, ensure_ascii=False)),
)
conn.commit()
conn.close()
def get_entity(ops_db: Path, entity_id: str) -> dict | None:
conn = sqlite3.connect(ops_db)
try:
cur = conn.execute(
"SELECT id, name, type_ref, source, metadata "
"FROM entities WHERE id=?", (entity_id,))
row = cur.fetchone()
finally:
conn.close()
if not row:
return None
md = {}
try:
md = json.loads(row[4]) if row[4] else {}
except Exception:
pass
return {"id": row[0], "name": row[1], "type_ref": row[2],
"source": row[3], "metadata": md}
def list_entities(ops_db: Path, type_ref: str | None = None) -> list[dict]:
conn = sqlite3.connect(ops_db)
try:
if type_ref:
cur = conn.execute(
"SELECT id, name, type_ref, source, metadata "
"FROM entities WHERE type_ref=? ORDER BY id", (type_ref,))
else:
cur = conn.execute(
"SELECT id, name, type_ref, source, metadata "
"FROM entities ORDER BY id")
rows = cur.fetchall()
finally:
conn.close()
out = []
for r in rows:
try:
md = json.loads(r[4]) if r[4] else {}
except Exception:
md = {}
out.append({"id": r[0], "name": r[1], "type_ref": r[2],
"source": r[3], "metadata": md})
return out
def list_relations(ops_db: Path, name: str | None = None) -> list[dict]:
conn = sqlite3.connect(ops_db)
try:
if name:
cur = conn.execute(
"SELECT id, name, from_entity, to_entity FROM relations "
"WHERE name=? ORDER BY id", (name,))
else:
cur = conn.execute(
"SELECT id, name, from_entity, to_entity FROM relations "
"ORDER BY id")
rows = cur.fetchall()
finally:
conn.close()
return [{"id": r[0], "name": r[1], "from_entity": r[2], "to_entity": r[3]}
for r in rows]
def run_enricher(enricher_id: str, ctx: dict, *, env: dict | None = None,
                 timeout: int = 30) -> tuple[int, dict | None, str]:
    """Launch enrichers/<id>/run.py using the standard wire protocol.

    ``ctx`` is sent as JSON on the child's stdin. ``env`` entries, if any,
    are layered on top of a copy of the current environment.

    Returns:
        (exit_code, stdout_json_or_None, stderr_text), where the JSON is
        decoded from the last non-empty stdout line and is ``None`` when
        stdout is blank or that line is not valid JSON.
    """
    run_py = ENRICHERS_DIR / enricher_id / "run.py"
    assert run_py.exists(), f"no existe {run_py}"

    child_env = {**os.environ, **(env or {})}
    completed = subprocess.run(
        [str(PYTHON_BIN), str(run_py)],
        input=json.dumps(ctx),
        capture_output=True,
        text=True,
        timeout=timeout,
        env=child_env,
    )

    # The JSON summary is the last non-empty line of stdout.
    stripped_lines = (
        raw.strip() for raw in reversed(completed.stdout.strip().splitlines())
    )
    last_line = next((text for text in stripped_lines if text), None)

    summary: dict | None = None
    if last_line is not None:
        try:
            summary = json.loads(last_line)
        except Exception:
            summary = None
    return completed.returncode, summary, completed.stderr
def base_ctx(*, ops_db, app_dir, registry_root, node_id, node_name,
             node_type, metadata=None, params=None) -> dict:
    """Build the typical ctx that jobs.cpp passes on stdin.

    Path-like arguments are converted to strings, ``cache_dir`` is derived
    as ``<app_dir>/cache``, and falsy ``metadata`` / ``params`` become ``{}``.
    """
    cache_dir = Path(app_dir) / "cache"
    ctx = {
        "node_id": node_id,
        "node_name": node_name,
        "node_type": node_type,
    }
    ctx["metadata"] = metadata if metadata else {}
    ctx["ops_db_path"] = str(ops_db)
    ctx["app_dir"] = str(app_dir)
    ctx["cache_dir"] = str(cache_dir)
    ctx["registry_root"] = str(registry_root)
    ctx["params"] = params if params else {}
    return ctx