"""Tests del enricher paste_extract (issue 0013). paste_extract es modo PREVIEW puro: no escribe a operations.db. Recibe texto via params.text y devuelve un JSON con entidades y relaciones propuestas. La aplicacion (panel C++) procesa el JSON y persiste con dedupe via el codigo C++ (probado en TU separadas si se quisiera). Decision: NO probamos la cascada hibrida (GLiNER+GLiREL) en pytest — los modelos pesan cientos de MB y tardan segundos en cargar. El contrato del script en `use_hybrid=false` es lo que cubre el panel en la primera iteracion. Si hybrid esta disponible, simplemente añade entidades adicionales: la logica de merge y dedupe se ejerce con regex+regex (mismo texto pasado dos veces) y con stubs en otros tests. """ from __future__ import annotations import json import os import sqlite3 from pathlib import Path import pytest from conftest import ( base_ctx, list_entities, list_relations, run_enricher, SCHEMA_SQL, ) def _resolve_real_registry_root() -> Path | None: """Localiza la raiz real de fn_registry buscando registry.db + cmd/fn. El conftest tiene un fallback que devuelve `/home/lucas` si encuentra un registry.db perdido en HOME — eso rompe los tests que dependen de importar `python.functions.cybersecurity.extract_iocs`. Aqui buscamos explicitamente por el marker AMBOS (`registry.db` Y `cmd/fn/main.go`). En worktrees, el repo no es un ancestro: aceptamos un override via `FN_REGISTRY_ROOT` env. Tambien probamos paths conocidos comunes. """ env = os.environ.get("FN_REGISTRY_ROOT") if env: p = Path(env) if (p / "registry.db").exists() and \ (p / "cmd" / "fn" / "main.go").exists(): return p p = Path(__file__).resolve() for ancestor in p.parents: if (ancestor / "registry.db").exists() and \ (ancestor / "cmd" / "fn" / "main.go").exists(): return ancestor # Fallback hardcoded — busca el registry mas cercano al worktree. for cand in [Path.home() / "fn_registry", Path("/home/lucas/fn_registry")]: if (cand / "registry.db").exists() and \ (cand / "cmd" / "fn" / "main.go").exists(): return cand return None REAL_REGISTRY_ROOT = _resolve_real_registry_root() @pytest.fixture def real_registry_root(): """Usar este en lugar de `registry_root` cuando el enricher necesite importar paquetes Python del registry.""" if REAL_REGISTRY_ROOT is None: pytest.skip("fn_registry root not found from this worktree") return REAL_REGISTRY_ROOT SAMPLE_BANKING = ( "Acme Corp anuncio que su CEO bad@evil.com firmo un acuerdo. " "Servidores afectados: 192.0.2.55 y 10.0.0.12. " "Vulnerabilidad: CVE-2024-12345. Hash IOC: 44d88612fea8a8f36de82e1278abb02f." ) def _make_ctx(*, ops_db, app_dir, registry_root, text, **params): """Helper — paste_extract no necesita node_id ni ops_db_path.""" ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, node_id="", node_name="", node_type="") ctx["params"] = {"text": text, **params} return ctx def test_paste_extract_returns_entities_no_db_write(ops_db, app_dir, real_registry_root): registry_root = real_registry_root """Modo preview: parsea entidades pero NO escribe a operations.db.""" ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, text=SAMPLE_BANKING) rc, out, err = run_enricher("paste_extract", ctx) assert rc == 0, err assert out is not None assert "entities" in out assert "relations" in out assert "stats" in out assert out["stats"]["layers"] == ["regex"] # Tipos esperados (al menos Email, IPAddress, CVE). types = {e["type_ref"] for e in out["entities"]} assert "Email" in types, types assert "CVE" in types, types # Cada entidad tiene los campos del contrato. for e in out["entities"]: assert isinstance(e["id"], str) and e["id"].startswith("tmp_"), e assert e["type_ref"] and e["name"] assert e["source"] in ("regex", "hybrid") assert "metadata" in e # start/end son ints (>=0 en regex matches). assert isinstance(e["start"], int) assert isinstance(e["end"], int) # Crucial: NO se ha escrito a la BD (modo preview). assert list_entities(ops_db) == [] assert list_relations(ops_db) == [] def test_paste_extract_dedupes_within_run(ops_db, app_dir, real_registry_root): registry_root = real_registry_root """Texto con duplicados → cada (type_ref, name) aparece una sola vez.""" text = ("Email a foo@bar.com y otra vez foo@bar.com. " "IP 192.0.2.10. Repite IP 192.0.2.10.") ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, text=text) rc, out, err = run_enricher("paste_extract", ctx) assert rc == 0, err keys = [(e["type_ref"], e["name"]) for e in out["entities"]] assert len(keys) == len(set(keys)), keys assert ("Email", "foo@bar.com") in keys assert ("IPAddress", "192.0.2.10") in keys def test_paste_extract_empty_text_fails_clean(ops_db, app_dir, real_registry_root): registry_root = real_registry_root """Sin params.text → exit 2 + JSON con error y entities vacias.""" ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, text="") rc, out, err = run_enricher("paste_extract", ctx) assert rc == 2, err assert out is not None assert out["entities"] == [] assert "error" in out def test_paste_extract_max_entities_truncates(ops_db, app_dir, real_registry_root): registry_root = real_registry_root """max_entities=N corta la lista a las N primeras encontradas.""" text = " ".join(f"contact{i:03d}@example.org" for i in range(50)) ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, text=text, max_entities=10) rc, out, err = run_enricher("paste_extract", ctx) assert rc == 0, err assert len(out["entities"]) == 10 def test_paste_extract_types_filter(ops_db, app_dir, real_registry_root): registry_root = real_registry_root """params.types filtra qué tipos IoC se extraen.""" ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, text=SAMPLE_BANKING, types="email") rc, out, err = run_enricher("paste_extract", ctx) assert rc == 0, err types = {e["type_ref"] for e in out["entities"]} # Solo Email — el filtro paso a extract_iocs y este solo emite emails. assert types == {"Email"}, types def test_paste_extract_use_hybrid_false_skips_layer(ops_db, app_dir, real_registry_root): registry_root = real_registry_root """use_hybrid=False ⇒ stats.layers = ['regex'] (no toca GLiNER).""" ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, text=SAMPLE_BANKING, use_hybrid=False) rc, out, err = run_enricher("paste_extract", ctx) assert rc == 0, err assert out["stats"]["layers"] == ["regex"] def test_paste_extract_idempotent_runs_no_duplicate_proposal( ops_db, app_dir, real_registry_root): registry_root = real_registry_root """Llamar paste_extract dos veces con el mismo texto produce la misma propuesta — la dedupe del *commit* es responsabilidad del panel C++, pero el script preview ya devuelve sin duplicados.""" ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, text=SAMPLE_BANKING) rc1, out1, _ = run_enricher("paste_extract", ctx) rc2, out2, _ = run_enricher("paste_extract", ctx) assert rc1 == 0 and rc2 == 0 keys1 = sorted((e["type_ref"], e["name"]) for e in out1["entities"]) keys2 = sorted((e["type_ref"], e["name"]) for e in out2["entities"]) assert keys1 == keys2 # ----------------------------------------------------------------------------- # Apply-side tests — replican la logica de extract_panel_apply en Python para # verificar el contrato de dedupe que el panel C++ implementa. Ejercitan que # (1) entidades nuevas se insertan, (2) duplicadas (type_ref, name) reusan id, # (3) las relaciones cuyos endpoints estan en la BD se persisten, (4) las que # no, se descartan. # # El panel C++ se prueba al compilar (build verde) y en runtime via la CLI; # aqui validamos el *contrato* del JSON output que el panel consume. # ----------------------------------------------------------------------------- def _apply_proposal_python(ops_db_path, proposal: dict) -> dict: """Implementacion de referencia de extract_panel_apply en Python. Coincide con la del C++ — sirve para validar el contrato. Si esta funcion y la del C++ producen el mismo resultado en los mismos inputs, el wire-protocol es correcto. """ conn = sqlite3.connect(ops_db_path) try: ts = "2026-01-01T00:00:00Z" map_id = {} added_e = 0 dedup_e = 0 for i, e in enumerate(proposal.get("entities", [])): if not e.get("selected", True): continue tref = e["type_ref"]; name = e["name"] existing = conn.execute( "SELECT id FROM entities WHERE type_ref=? AND name=? LIMIT 1", (tref, name)).fetchone() if existing: map_id[e["id"]] = existing[0] dedup_e += 1 else: new_id = f"{tref}_{i}_{name}" conn.execute( "INSERT INTO entities (id, name, type_ref, source, " " metadata, created_at, updated_at) " "VALUES (?, ?, ?, 'panel:paste_extract', ?, ?, ?)", (new_id, name, tref, json.dumps(e.get("metadata", {})), ts, ts)) map_id[e["id"]] = new_id added_e += 1 added_r = 0 skipped_r = 0 for j, r in enumerate(proposal.get("relations", [])): if not r.get("selected", True): continue f = map_id.get(r["from_id"]); t = map_id.get(r["to_id"]) if not f or not t: skipped_r += 1 continue name = r.get("name") or "RELATED_TO" existing = conn.execute( "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? " "AND name=? LIMIT 1", (f, t, name)).fetchone() if existing: skipped_r += 1 continue conn.execute( "INSERT INTO relations (id, name, from_entity, to_entity, " " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)", (f"rel_{j}_{name}", name, f, t, ts, ts)) added_r += 1 conn.commit() return {"added_entities": added_e, "dedup_entities": dedup_e, "added_relations": added_r, "skipped_relations": skipped_r} finally: conn.close() def test_apply_inserts_only_selected(ops_db): """Solo las entidades con selected=true se insertan.""" proposal = { "entities": [ {"id": "tmp_0", "type_ref": "Email", "name": "a@b.com", "metadata": {}, "selected": True}, {"id": "tmp_1", "type_ref": "IPAddress", "name": "1.2.3.4", "metadata": {}, "selected": False}, # NO seleccionada {"id": "tmp_2", "type_ref": "CVE", "name": "CVE-2024-1", "metadata": {}, "selected": True}, ], "relations": [], } stats = _apply_proposal_python(ops_db, proposal) assert stats["added_entities"] == 2 types = {e["type_ref"] for e in list_entities(ops_db)} assert types == {"Email", "CVE"} def test_apply_dedupes_by_type_and_name(ops_db): """Reaplicar el mismo proposal NO duplica entidades.""" proposal = { "entities": [ {"id": "tmp_0", "type_ref": "Email", "name": "x@y.z", "metadata": {}, "selected": True}, ], "relations": [], } s1 = _apply_proposal_python(ops_db, proposal) s2 = _apply_proposal_python(ops_db, proposal) assert s1["added_entities"] == 1 assert s2["added_entities"] == 0 assert s2["dedup_entities"] == 1 # Solo una fila en la BD. rows = list_entities(ops_db) assert len(rows) == 1 def test_apply_inserts_relations_when_endpoints_resolve(ops_db): """Relaciones con endpoints validos (selected) se persisten.""" proposal = { "entities": [ {"id": "tmp_0", "type_ref": "Person", "name": "Alice", "metadata": {}, "selected": True}, {"id": "tmp_1", "type_ref": "Organization", "name": "Acme", "metadata": {}, "selected": True}, ], "relations": [ {"from_id": "tmp_0", "to_id": "tmp_1", "name": "works_at", "selected": True}, ], } stats = _apply_proposal_python(ops_db, proposal) assert stats["added_entities"] == 2 assert stats["added_relations"] == 1 rels = list_relations(ops_db, name="works_at") assert len(rels) == 1 def test_apply_skips_relation_if_endpoint_unselected(ops_db): """Si un endpoint no se selecciona, su relacion se descarta.""" proposal = { "entities": [ {"id": "tmp_0", "type_ref": "Person", "name": "Alice", "metadata": {}, "selected": True}, {"id": "tmp_1", "type_ref": "Organization", "name": "Acme", "metadata": {}, "selected": False}, # NO se inserta ], "relations": [ {"from_id": "tmp_0", "to_id": "tmp_1", "name": "works_at", "selected": True}, ], } stats = _apply_proposal_python(ops_db, proposal) assert stats["added_entities"] == 1 assert stats["added_relations"] == 0 assert stats["skipped_relations"] == 1 def test_apply_dedupes_relation_on_repeat(ops_db): """Relacion (from, to, name) repetida no se duplica.""" proposal = { "entities": [ {"id": "tmp_0", "type_ref": "Person", "name": "Alice", "metadata": {}, "selected": True}, {"id": "tmp_1", "type_ref": "Organization", "name": "Acme", "metadata": {}, "selected": True}, ], "relations": [ {"from_id": "tmp_0", "to_id": "tmp_1", "name": "works_at", "selected": True}, ], } s1 = _apply_proposal_python(ops_db, proposal) s2 = _apply_proposal_python(ops_db, proposal) assert s1["added_relations"] == 1 assert s2["added_relations"] == 0 assert s2["skipped_relations"] == 1