"""Tests for the paste_extract enricher (issue 0013).

paste_extract is a pure PREVIEW mode: it does not write to operations.db.
It receives text via params.text and returns a JSON document with the
proposed entities and relations. The application (C++ panel) processes
that JSON and persists it with dedupe through the C++ code (which could be
tested in separate translation units if desired).

Decision: we do NOT test the hybrid cascade (GLiNER+GLiREL) in pytest --
the models weigh hundreds of MB and take seconds to load. The script
contract with `use_hybrid=false` is what the panel relies on in the first
iteration. If hybrid is available it simply adds extra entities: the merge
and dedupe logic is exercised with regex+regex (the same text passed
twice) and with stubs in other tests.
"""
from __future__ import annotations

import json
import os
import sqlite3
from pathlib import Path

import pytest

from conftest import (
    base_ctx, list_entities, list_relations, run_enricher, SCHEMA_SQL,
)


def _is_registry_root(candidate: Path) -> bool:
    """Return True when *candidate* contains BOTH registry markers.

    The markers are `registry.db` and `cmd/fn/main.go`; requiring both
    avoids accepting a directory that merely holds a stray registry.db.
    """
    return ((candidate / "registry.db").exists()
            and (candidate / "cmd" / "fn" / "main.go").exists())


def _resolve_real_registry_root() -> Path | None:
    """Locate the real fn_registry root by looking for registry.db + cmd/fn.

    The conftest has a fallback that returns `/home/lucas` when it finds a
    stray registry.db in HOME -- that breaks the tests that depend on
    importing `python.functions.cybersecurity.extract_iocs`. Here we look
    explicitly for BOTH markers (`registry.db` AND `cmd/fn/main.go`).

    Inside worktrees the repo is not an ancestor of this file, so an
    override via the `FN_REGISTRY_ROOT` environment variable is accepted.
    A couple of well-known locations are probed as a last resort.

    Returns:
        The first directory that satisfies both markers, or None when no
        candidate does. An `FN_REGISTRY_ROOT` value that does not satisfy
        the markers is ignored and the search continues.
    """
    env = os.environ.get("FN_REGISTRY_ROOT")
    if env:
        override = Path(env)
        if _is_registry_root(override):
            return override

    for ancestor in Path(__file__).resolve().parents:
        if _is_registry_root(ancestor):
            return ancestor

    # Hard-coded fallback -- looks for the registry closest to the worktree.
    fallbacks = (Path.home() / "fn_registry", Path("/home/lucas/fn_registry"))
    return next((cand for cand in fallbacks if _is_registry_root(cand)), None)


REAL_REGISTRY_ROOT = _resolve_real_registry_root()


@pytest.fixture
def real_registry_root():
    """Use this instead of `registry_root` when the enricher needs to
    import Python packages from the registry.

    Skips the requesting test when no registry root could be resolved.
    """
    if REAL_REGISTRY_ROOT is None:
        pytest.skip("fn_registry root not found from this worktree")
    return REAL_REGISTRY_ROOT


SAMPLE_BANKING = (
    "Acme Corp anuncio que su CEO bad@evil.com firmo un acuerdo. "
    "Servidores afectados: 192.0.2.55 y 10.0.0.12. "
    "Vulnerabilidad: CVE-2024-12345. Hash IOC: 44d88612fea8a8f36de82e1278abb02f."
)


def _make_ctx(*, ops_db, app_dir, registry_root, text, **params):
    """Build an enricher context for paste_extract.

    paste_extract needs neither node_id nor node metadata, so those are
    passed as empty strings. `text` and any extra keyword arguments end up
    in `ctx["params"]`.
    """
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="", node_name="", node_type="")
    ctx["params"] = {"text": text, **params}
    return ctx


def test_paste_extract_returns_entities_no_db_write(ops_db, app_dir, real_registry_root):
    """Preview mode: entities are parsed but NOT written to operations.db."""
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root, text=SAMPLE_BANKING)
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err
    assert out is not None
    assert "entities" in out
    assert "relations" in out
    assert "stats" in out
    assert out["stats"]["layers"] == ["regex"]

    # Expected types (at least Email, IPAddress, CVE).
    types = {e["type_ref"] for e in out["entities"]}
    assert "Email" in types, types
    assert "IPAddress" in types, types
    assert "CVE" in types, types

    # Every entity carries the fields required by the contract.
    for e in out["entities"]:
        assert isinstance(e["id"], str) and e["id"].startswith("tmp_"), e
        assert e["type_ref"] and e["name"]
        assert e["source"] in ("regex", "hybrid")
        assert "metadata" in e
        # start/end are ints (>= 0 for regex matches).
        assert isinstance(e["start"], int)
        assert isinstance(e["end"], int)

    # Crucial: nothing has been written to the DB (preview mode).
    assert list_entities(ops_db) == []
    assert list_relations(ops_db) == []


def test_paste_extract_dedupes_within_run(ops_db, app_dir, real_registry_root):
    """Text with duplicates -> each (type_ref, name) appears only once."""
    text = ("Email a foo@bar.com y otra vez foo@bar.com. "
            "IP 192.0.2.10. Repite IP 192.0.2.10.")
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root, text=text)
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err
    assert out is not None, err

    keys = [(e["type_ref"], e["name"]) for e in out["entities"]]
    assert len(keys) == len(set(keys)), keys
    assert ("Email", "foo@bar.com") in keys
    assert ("IPAddress", "192.0.2.10") in keys


def test_paste_extract_empty_text_fails_clean(ops_db, app_dir, real_registry_root):
    """Empty params.text -> exit code 2 + JSON with error and no entities."""
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root, text="")
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 2, err
    assert out is not None
    assert out["entities"] == []
    assert "error" in out


def test_paste_extract_max_entities_truncates(ops_db, app_dir, real_registry_root):
    """max_entities=N truncates the list to the first N entities found."""
    text = " ".join(f"contact{i:03d}@example.org" for i in range(50))
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root,
                    text=text, max_entities=10)
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err
    assert out is not None, err
    assert len(out["entities"]) == 10


def test_paste_extract_types_filter(ops_db, app_dir, real_registry_root):
    """params.types filters which IoC types get extracted."""
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root,
                    text=SAMPLE_BANKING, types="email")
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err
    assert out is not None, err
    types = {e["type_ref"] for e in out["entities"]}
    # Only Email -- the filter is forwarded to extract_iocs, which then
    # emits emails only.
    assert types == {"Email"}, types


def test_paste_extract_use_hybrid_false_skips_layer(ops_db, app_dir, real_registry_root):
    """use_hybrid=False => stats.layers == ['regex'] (GLiNER is not touched)."""
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root,
                    text=SAMPLE_BANKING, use_hybrid=False)
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err
    assert out is not None, err
    assert out["stats"]["layers"] == ["regex"]


def test_paste_extract_idempotent_runs_no_duplicate_proposal(
        ops_db, app_dir, real_registry_root):
    """Calling paste_extract twice with the same text yields the same
    proposal -- deduplication at *commit* time is the C++ panel's
    responsibility, but the preview script already returns no duplicates."""
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root, text=SAMPLE_BANKING)
    rc1, out1, err1 = run_enricher("paste_extract", ctx)
    rc2, out2, err2 = run_enricher("paste_extract", ctx)
    assert rc1 == 0 and rc2 == 0
    assert out1 is not None, err1
    assert out2 is not None, err2
    keys1 = sorted((e["type_ref"], e["name"]) for e in out1["entities"])
    keys2 = sorted((e["type_ref"], e["name"]) for e in out2["entities"])
    assert keys1 == keys2


# -----------------------------------------------------------------------------
# Apply-side tests -- they replicate the extract_panel_apply logic in Python to
# verify the dedupe contract that the C++ panel implements. They exercise that
# (1) new entities are inserted, (2) duplicated (type_ref, name) pairs reuse
# the existing id, (3) relations whose endpoints are in the DB are persisted,
# and (4) relations whose endpoints are not, are discarded.
#
# The C++ panel is tested by compiling (green build) and at runtime via the
# CLI; here we validate the *contract* of the JSON output the panel consumes.
# -----------------------------------------------------------------------------


def _apply_proposal_python(ops_db_path, proposal: dict) -> dict:
    """Reference implementation of extract_panel_apply in Python.

    Intended to mirror the C++ implementation -- it exists to validate the
    contract. If this function and the C++ one produce the same result for
    the same inputs, the wire protocol is correct.

    Args:
        ops_db_path: Path of the SQLite operations database to write to.
        proposal: Dict with optional "entities" and "relations" lists, in
            the shape produced by paste_extract. Items whose "selected"
            key is falsy are ignored; a missing key counts as selected.

    Returns:
        Dict with the counters "added_entities", "dedup_entities",
        "added_relations" and "skipped_relations".
    """
    conn = sqlite3.connect(ops_db_path)
    try:
        ts = "2026-01-01T00:00:00Z"
        # Maps the temporary proposal id of each applied entity to the id
        # it has in the database (pre-existing or freshly inserted).
        map_id = {}
        added_e = 0
        dedup_e = 0
        for i, e in enumerate(proposal.get("entities", [])):
            if not e.get("selected", True):
                continue
            tref = e["type_ref"]
            name = e["name"]
            existing = conn.execute(
                "SELECT id FROM entities WHERE type_ref=? AND name=? LIMIT 1",
                (tref, name)).fetchone()
            if existing:
                # Same (type_ref, name) already stored: reuse its id.
                map_id[e["id"]] = existing[0]
                dedup_e += 1
            else:
                new_id = f"{tref}_{i}_{name}"
                conn.execute(
                    "INSERT INTO entities (id, name, type_ref, source, "
                    " metadata, created_at, updated_at) "
                    "VALUES (?, ?, ?, 'panel:paste_extract', ?, ?, ?)",
                    (new_id, name, tref,
                     json.dumps(e.get("metadata", {})), ts, ts))
                map_id[e["id"]] = new_id
                added_e += 1

        added_r = 0
        skipped_r = 0
        for j, r in enumerate(proposal.get("relations", [])):
            if not r.get("selected", True):
                continue
            f = map_id.get(r["from_id"])
            t = map_id.get(r["to_id"])
            if not f or not t:
                # At least one endpoint was not applied (e.g. unselected).
                skipped_r += 1
                continue
            name = r.get("name") or "RELATED_TO"
            existing = conn.execute(
                "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
                "AND name=? LIMIT 1", (f, t, name)).fetchone()
            if existing:
                skipped_r += 1
                continue
            # NOTE(review): the relation id only encodes the index inside
            # this proposal and the relation name -- confirm against the
            # schema / C++ code that ids cannot collide across proposals.
            conn.execute(
                "INSERT INTO relations (id, name, from_entity, to_entity, "
                " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
                (f"rel_{j}_{name}", name, f, t, ts, ts))
            added_r += 1
        conn.commit()
        return {"added_entities": added_e, "dedup_entities": dedup_e,
                "added_relations": added_r, "skipped_relations": skipped_r}
    finally:
        conn.close()


def test_apply_inserts_only_selected(ops_db):
    """Only the entities with selected=true are inserted."""
    proposal = {
        "entities": [
            {"id": "tmp_0", "type_ref": "Email", "name": "a@b.com",
             "metadata": {}, "selected": True},
            {"id": "tmp_1", "type_ref": "IPAddress", "name": "1.2.3.4",
             "metadata": {}, "selected": False},  # NOT selected
            {"id": "tmp_2", "type_ref": "CVE", "name": "CVE-2024-1",
             "metadata": {}, "selected": True},
        ],
        "relations": [],
    }
    stats = _apply_proposal_python(ops_db, proposal)
    assert stats["added_entities"] == 2
    types = {e["type_ref"] for e in list_entities(ops_db)}
    assert types == {"Email", "CVE"}


def test_apply_dedupes_by_type_and_name(ops_db):
    """Re-applying the same proposal does NOT duplicate entities."""
    proposal = {
        "entities": [
            {"id": "tmp_0", "type_ref": "Email", "name": "x@y.z",
             "metadata": {}, "selected": True},
        ],
        "relations": [],
    }
    s1 = _apply_proposal_python(ops_db, proposal)
    s2 = _apply_proposal_python(ops_db, proposal)
    assert s1["added_entities"] == 1
    assert s2["added_entities"] == 0
    assert s2["dedup_entities"] == 1
    # Only one row in the DB.
    rows = list_entities(ops_db)
    assert len(rows) == 1


def test_apply_inserts_relations_when_endpoints_resolve(ops_db):
    """Relations with valid (selected) endpoints are persisted."""
    proposal = {
        "entities": [
            {"id": "tmp_0", "type_ref": "Person", "name": "Alice",
             "metadata": {}, "selected": True},
            {"id": "tmp_1", "type_ref": "Organization", "name": "Acme",
             "metadata": {}, "selected": True},
        ],
        "relations": [
            {"from_id": "tmp_0", "to_id": "tmp_1",
             "name": "works_at", "selected": True},
        ],
    }
    stats = _apply_proposal_python(ops_db, proposal)
    assert stats["added_entities"] == 2
    assert stats["added_relations"] == 1
    rels = list_relations(ops_db, name="works_at")
    assert len(rels) == 1


def test_apply_skips_relation_if_endpoint_unselected(ops_db):
    """If an endpoint is not selected, its relation is discarded."""
    proposal = {
        "entities": [
            {"id": "tmp_0", "type_ref": "Person", "name": "Alice",
             "metadata": {}, "selected": True},
            {"id": "tmp_1", "type_ref": "Organization", "name": "Acme",
             "metadata": {}, "selected": False},  # NOT inserted
        ],
        "relations": [
            {"from_id": "tmp_0", "to_id": "tmp_1",
             "name": "works_at", "selected": True},
        ],
    }
    stats = _apply_proposal_python(ops_db, proposal)
    assert stats["added_entities"] == 1
    assert stats["added_relations"] == 0
    assert stats["skipped_relations"] == 1


def test_apply_dedupes_relation_on_repeat(ops_db):
    """A repeated (from, to, name) relation is not duplicated."""
    proposal = {
        "entities": [
            {"id": "tmp_0", "type_ref": "Person", "name": "Alice",
             "metadata": {}, "selected": True},
            {"id": "tmp_1", "type_ref": "Organization", "name": "Acme",
             "metadata": {}, "selected": True},
        ],
        "relations": [
            {"from_id": "tmp_0", "to_id": "tmp_1",
             "name": "works_at", "selected": True},
        ],
    }
    s1 = _apply_proposal_python(ops_db, proposal)
    s2 = _apply_proposal_python(ops_db, proposal)
    assert s1["added_relations"] == 1
    assert s2["added_relations"] == 0
    assert s2["skipped_relations"] == 1