"""Shared fixtures and helpers for the graph_explorer enricher tests.

Each test can use:

- `ops_db`: path to an operations.db with a minimal schema, in a tmp dir.
- `app_dir`: tmp dir acting as the app dir (cache dir = app_dir/cache).
- `registry_root`: registry path handed to run.py through the ctx.
- `run_enricher(enricher_id, ctx, env=..., timeout=...)`: helper that
  invokes run.py in a subprocess, feeding the ctx as JSON on stdin (the
  same wire protocol jobs.cpp uses).

The schema is replicated from
`fn_operations/project_template/operations.db` -- only the columns the
enrichers use. If fn_operations changes the schema, this file must be
updated accordingly.
"""

from __future__ import annotations

import json
import os
import sqlite3
import subprocess
import sys
from pathlib import Path

import pytest

APP_DIR_SRC = Path(__file__).resolve().parents[1]  # graph_explorer/
TESTS_DIR = Path(__file__).resolve().parent
STUBS_DIR = TESTS_DIR / "_stubs"


# The enrichers live in `APP_DIR/enrichers/` in the dev repo and in
# `APP_DIR/assets/enrichers/` in the Windows portable folder (the `assets/`
# convention from the Feb-2026 ADR). Detect which one exists and use it.
def _resolve_enrichers_dir() -> Path:
    """Return the first existing enrichers directory candidate.

    Falls back to the dev-layout candidate when none exists, so that error
    messages stay consistent with the dev layout.
    """
    cands = [
        APP_DIR_SRC / "enrichers",
        APP_DIR_SRC / "assets" / "enrichers",
    ]
    for c in cands:
        if c.is_dir():
            return c
    return cands[0]


ENRICHERS_DIR = _resolve_enrichers_dir()


def _resolve_registry_root() -> Path:
    """Walk up from the app dir looking for a registry marker.

    In the repo (APP_DIR/projects/osint_graph/apps/graph_explorer/tests)
    fn_registry/ sits 5 levels above. In the Windows folder
    (Desktop/apps/graph_explorer/tests) there is NO registry, so the app
    dir itself is used as a fallback. The tests do not read registry.db;
    registry_root is only passed through the ctx for compatibility with
    run.py.
    """
    # Reliable markers: the file `cmd/fn/main.go` or `registry.db`.
    p = APP_DIR_SRC
    for _ in range(8):
        if (p / "cmd" / "fn" / "main.go").exists() or \
                (p / "registry.db").exists():
            return p
        if p.parent == p:
            # Reached the filesystem root.
            break
        p = p.parent
    # No registry found: use the app dir as a pseudo-root. Tests keep
    # working as long as none of them imports packages from the registry.
    return APP_DIR_SRC


REGISTRY_ROOT = _resolve_registry_root()


def _resolve_python_bin() -> Path:
    """Pick the Python interpreter used to execute the enrichers.

    Priority (covers Linux/WSL dev and the installed Windows portable):

    1. `$FN_TEST_PYTHON` env override (only if that path exists)
    2. APP_DIR/assets/runtime/python/python.exe  (Windows only)
    3. APP_DIR/runtime/python/python.exe         (legacy, Windows only)
    4. REGISTRY_ROOT/python/.venv/bin/python3    (WSL dev venv)
    5. `sys.executable`                          (whatever runs pytest)

    The `python.exe` candidates are only considered on native Windows. On
    WSL/Linux they may exist vendored in the repo (the distributables) but
    are not executable on that OS.
    """
    env = os.environ.get("FN_TEST_PYTHON")
    if env and Path(env).exists():
        return Path(env)

    is_windows = sys.platform.startswith("win")
    cands: list[Path] = []
    if is_windows:
        cands += [
            APP_DIR_SRC / "assets" / "runtime" / "python" / "python.exe",
            APP_DIR_SRC / "runtime" / "python" / "python.exe",
        ]
    cands += [REGISTRY_ROOT / "python" / ".venv" / "bin" / "python3"]
    for c in cands:
        if c.exists():
            return c
    return Path(sys.executable)


PYTHON_BIN = _resolve_python_bin()


def stub_requests(tmp_path: Path, plan: dict) -> dict:
    """Write the response plan and return the env that activates the stub.

    The returned env offers two ways to inject the stub directory:

    - `PYTHONPATH`: the standard route; the stub dir is placed first and
      any existing PYTHONPATH is preserved after it. Works on Linux and on
      a full (non-embedded) Python install.
    - `_STUB_PATHS`: read by `_runner.py`, which inserts it at the front of
      `sys.path`. Needed for the embedded Windows Python, which ignores
      PYTHONPATH (controlled by `python312._pth`).

    `plan` accepts `default` and/or `match` (a list of
    {contains, status, text}); it is serialized to `_stub_plan.json`
    inside `tmp_path`.
    """
    plan_file = tmp_path / "_stub_plan.json"
    plan_file.write_text(json.dumps(plan), encoding="utf-8")

    # Join only non-empty parts: a trailing separator would add an empty
    # PYTHONPATH entry, which Python resolves to the current working
    # directory and would silently put it on the child's import path.
    path_parts = [str(STUBS_DIR)]
    inherited = os.environ.get("PYTHONPATH", "")
    if inherited:
        path_parts.append(inherited)

    return {
        "PYTHONPATH": os.pathsep.join(path_parts),
        "_STUB_PATHS": str(STUBS_DIR),
        "_STUB_REQUESTS_PLAN": str(plan_file),
    }


SCHEMA_SQL = """
CREATE TABLE entities (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    type_ref TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'active',
    description TEXT NOT NULL DEFAULT '',
    domain TEXT NOT NULL DEFAULT '',
    tags TEXT NOT NULL DEFAULT '[]',
    source TEXT NOT NULL,
    metadata TEXT NOT NULL DEFAULT '{}',
    notes TEXT NOT NULL DEFAULT '',
    group_id TEXT,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
CREATE TABLE relations (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    from_entity TEXT NOT NULL DEFAULT '',
    to_entity TEXT NOT NULL,
    via TEXT NOT NULL DEFAULT '',
    description TEXT NOT NULL DEFAULT '',
    purity TEXT NOT NULL DEFAULT '',
    direction TEXT NOT NULL DEFAULT 'unidirectional',
    weight REAL,
    status TEXT NOT NULL DEFAULT 'designed',
    started_at TEXT,
    ended_at TEXT,
    "order" INTEGER,
    tags TEXT NOT NULL DEFAULT '[]',
    notes TEXT NOT NULL DEFAULT '',
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
"""


@pytest.fixture
def ops_db(tmp_path):
    """Empty operations.db with the minimal schema, ready for inserts."""
    db = tmp_path / "operations.db"
    conn = sqlite3.connect(db)
    try:
        conn.executescript(SCHEMA_SQL)
        conn.commit()
    finally:
        # Close even if the schema script fails, so the file is not left
        # locked for the rest of the test session.
        conn.close()
    return db


@pytest.fixture
def app_dir(tmp_path):
    """Root directory of an 'app' for the enrichers (cache goes inside)."""
    d = tmp_path / "app"
    d.mkdir()
    (d / "cache").mkdir()
    return d


@pytest.fixture
def registry_root():
    """Registry root resolved once at import time (see REGISTRY_ROOT)."""
    return REGISTRY_ROOT


def _decode_metadata(raw) -> dict:
    """Decode a stored metadata JSON string; fall back to {} when invalid.

    Best-effort on purpose: a corrupt or empty metadata column must not
    make the read helpers fail. The decoded value is returned unchecked,
    so a stored non-object JSON document comes back as-is.
    """
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError
        return {}


def make_node(ops_db: Path, *, node_id: str, name: str, type_ref: str,
              metadata: dict | None = None, source: str = "test") -> None:
    """Insert a node of an arbitrary type into operations.db.

    `metadata` is stored as JSON (non-ASCII characters kept verbatim);
    `created_at`/`updated_at` are fixed timestamps so rows are
    deterministic across runs.
    """
    conn = sqlite3.connect(ops_db)
    try:
        conn.execute(
            "INSERT INTO entities (id, name, type_ref, source, metadata, "
            " created_at, updated_at) VALUES (?, ?, ?, ?, ?, "
            " '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z')",
            (node_id, name, type_ref, source,
             json.dumps(metadata or {}, ensure_ascii=False)),
        )
        conn.commit()
    finally:
        # Release the connection even when the insert fails (e.g. a
        # duplicate id), matching the read helpers below.
        conn.close()


def get_entity(ops_db: Path, entity_id: str) -> dict | None:
    """Return the entity with `entity_id` as a dict, or None if absent.

    The dict has keys id, name, type_ref, source and metadata (decoded
    from JSON; {} when the stored value is empty or invalid).
    """
    conn = sqlite3.connect(ops_db)
    try:
        cur = conn.execute(
            "SELECT id, name, type_ref, source, metadata "
            "FROM entities WHERE id=?", (entity_id,))
        row = cur.fetchone()
    finally:
        conn.close()
    if not row:
        return None
    return {"id": row[0], "name": row[1], "type_ref": row[2],
            "source": row[3], "metadata": _decode_metadata(row[4])}


def list_entities(ops_db: Path, type_ref: str | None = None) -> list[dict]:
    """List entities ordered by id, optionally filtered by `type_ref`.

    A falsy `type_ref` (None or "") means no filter. Each dict has keys
    id, name, type_ref, source, metadata (decoded, {} when invalid) and
    group_id.
    """
    conn = sqlite3.connect(ops_db)
    try:
        if type_ref:
            cur = conn.execute(
                "SELECT id, name, type_ref, source, metadata, group_id "
                "FROM entities WHERE type_ref=? ORDER BY id", (type_ref,))
        else:
            cur = conn.execute(
                "SELECT id, name, type_ref, source, metadata, group_id "
                "FROM entities ORDER BY id")
        rows = cur.fetchall()
    finally:
        conn.close()
    return [{"id": r[0], "name": r[1], "type_ref": r[2], "source": r[3],
             "metadata": _decode_metadata(r[4]), "group_id": r[5]}
            for r in rows]


def list_relations(ops_db: Path, name: str | None = None) -> list[dict]:
    """List relations ordered by id, optionally filtered by `name`.

    A falsy `name` (None or "") means no filter. Each dict has keys id,
    name, from_entity and to_entity.
    """
    conn = sqlite3.connect(ops_db)
    try:
        if name:
            cur = conn.execute(
                "SELECT id, name, from_entity, to_entity FROM relations "
                "WHERE name=? ORDER BY id", (name,))
        else:
            cur = conn.execute(
                "SELECT id, name, from_entity, to_entity FROM relations "
                "ORDER BY id")
        rows = cur.fetchall()
    finally:
        conn.close()
    return [{"id": r[0], "name": r[1], "from_entity": r[2],
             "to_entity": r[3]} for r in rows]


def _parse_summary(stdout: str) -> dict | None:
    """Parse the last non-empty stdout line as the JSON summary.

    Returns None when stdout has no non-empty line or when that line is
    not valid JSON. Only the last non-empty line is ever considered. The
    decoded value is returned unchecked.
    """
    for line in reversed(stdout.splitlines()):
        line = line.strip()
        if not line:
            continue
        try:
            return json.loads(line)
        except ValueError:  # JSONDecodeError is a ValueError
            return None
    return None


def run_enricher(enricher_id: str, ctx: dict, *, env: dict | None = None,
                 timeout: int = 30) -> tuple[int, dict | None, str]:
    """Launch ENRICHERS_DIR/enricher_id/run.py with the standard protocol.

    Always goes through the `_runner.py` trampoline so that the requests
    stub is injected both via PYTHONPATH (regular Python) and via
    `_STUB_PATHS` (embedded Windows Python, which ignores PYTHONPATH).

    Args:
        enricher_id: name of the enricher directory under ENRICHERS_DIR.
        ctx: context dict, sent to the child as JSON on stdin.
        env: extra environment variables layered over a copy of
            `os.environ`.
        timeout: seconds before `subprocess.run` raises `TimeoutExpired`.

    Returns:
        (exit_code, stdout_json_or_None, stderr_text)

    Raises:
        AssertionError: if run.py or `_runner.py` does not exist.
    """
    run_py = ENRICHERS_DIR / enricher_id / "run.py"
    assert run_py.exists(), f"no existe {run_py}"
    runner = TESTS_DIR / "_runner.py"
    assert runner.exists(), f"no existe {runner}"

    full_env = os.environ.copy()
    if env:
        full_env.update(env)

    proc = subprocess.run(
        [str(PYTHON_BIN), str(runner), str(run_py)],
        input=json.dumps(ctx),
        capture_output=True,
        text=True,
        timeout=timeout,
        env=full_env,
    )
    return proc.returncode, _parse_summary(proc.stdout), proc.stderr


def base_ctx(*, ops_db, app_dir, registry_root, node_id, node_name,
             node_type, metadata=None, params=None) -> dict:
    """Build the typical ctx that jobs.cpp passes on stdin.

    Paths are stringified; `cache_dir` is derived as app_dir/cache, and
    `metadata`/`params` default to empty dicts.
    """
    return {
        "node_id": node_id,
        "node_name": node_name,
        "node_type": node_type,
        "metadata": metadata or {},
        "ops_db_path": str(ops_db),
        "app_dir": str(app_dir),
        "cache_dir": str(Path(app_dir) / "cache"),
        "registry_root": str(registry_root),
        "params": params or {},
    }