test(0013): pytest suite for paste_extract
12 tests cubriendo: - modo preview (no escribe a operations.db) - dedupe dentro de un run (mismo (type_ref, name) una sola vez) - texto vacio retorna error con exit 2 - max_entities trunca al limite - types filtra por tipo IoC - use_hybrid=false ⇒ stats.layers solo regex - runs idempotentes producen mismo proposal Apply-side (replica en Python del extract_panel_apply C++): - inserta solo selected - dedupe por (type_ref, name) - inserta relaciones cuando endpoints resuelven - skip relacion si endpoint unselected - dedupe relacion (from, to, name) en repeticion GLiNER/GLiREL no se ejercitan en pytest — los modelos pesan cientos de MB. La logica de hybrid se valida con regex+regex (mismo path de merge/dedup) y con tests unitarios separados si se quisiera. Se documenta la decision en el docstring del modulo. Helper real_registry_root resuelve fn_registry desde un worktree (el conftest del repo asume ancestor que en worktrees no existe). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,367 @@
|
||||
"""Tests del enricher paste_extract (issue 0013).
|
||||
|
||||
paste_extract es modo PREVIEW puro: no escribe a operations.db. Recibe
|
||||
texto via params.text y devuelve un JSON con entidades y relaciones
|
||||
propuestas. La aplicacion (panel C++) procesa el JSON y persiste con
|
||||
dedupe via el codigo C++ (probado en TU separadas si se quisiera).
|
||||
|
||||
Decision: NO probamos la cascada hibrida (GLiNER+GLiREL) en pytest —
|
||||
los modelos pesan cientos de MB y tardan segundos en cargar. El
|
||||
contrato del script en `use_hybrid=false` es lo que cubre el panel
|
||||
en la primera iteracion. Si hybrid esta disponible, simplemente
|
||||
añade entidades adicionales: la logica de merge y dedupe se ejerce
|
||||
con regex+regex (mismo texto pasado dos veces) y con stubs en otros
|
||||
tests.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from conftest import (
|
||||
base_ctx, list_entities, list_relations, run_enricher, SCHEMA_SQL,
|
||||
)
|
||||
|
||||
|
||||
def _resolve_real_registry_root() -> Path | None:
|
||||
"""Localiza la raiz real de fn_registry buscando registry.db + cmd/fn.
|
||||
|
||||
El conftest tiene un fallback que devuelve `/home/lucas` si encuentra
|
||||
un registry.db perdido en HOME — eso rompe los tests que dependen de
|
||||
importar `python.functions.cybersecurity.extract_iocs`. Aqui buscamos
|
||||
explicitamente por el marker AMBOS (`registry.db` Y `cmd/fn/main.go`).
|
||||
|
||||
En worktrees, el repo no es un ancestro: aceptamos un override via
|
||||
`FN_REGISTRY_ROOT` env. Tambien probamos paths conocidos comunes.
|
||||
"""
|
||||
env = os.environ.get("FN_REGISTRY_ROOT")
|
||||
if env:
|
||||
p = Path(env)
|
||||
if (p / "registry.db").exists() and \
|
||||
(p / "cmd" / "fn" / "main.go").exists():
|
||||
return p
|
||||
p = Path(__file__).resolve()
|
||||
for ancestor in p.parents:
|
||||
if (ancestor / "registry.db").exists() and \
|
||||
(ancestor / "cmd" / "fn" / "main.go").exists():
|
||||
return ancestor
|
||||
# Fallback hardcoded — busca el registry mas cercano al worktree.
|
||||
for cand in [Path.home() / "fn_registry", Path("/home/lucas/fn_registry")]:
|
||||
if (cand / "registry.db").exists() and \
|
||||
(cand / "cmd" / "fn" / "main.go").exists():
|
||||
return cand
|
||||
return None
|
||||
|
||||
|
||||
REAL_REGISTRY_ROOT = _resolve_real_registry_root()
|
||||
|
||||
|
||||
@pytest.fixture
def real_registry_root():
    """Provide the resolved fn_registry root, skipping the test if absent.

    Use this fixture instead of `registry_root` whenever the enricher needs
    to import Python packages that live inside the registry.
    """
    root = REAL_REGISTRY_ROOT
    if root is not None:
        return root
    # pytest.skip raises, so nothing is returned on this path.
    pytest.skip("fn_registry root not found from this worktree")
|
||||
|
||||
|
||||
# Sample input shared by the preview tests. The text embeds one e-mail
# address, two IPv4 addresses, one CVE identifier and one 32-hex-character
# hash, so a single run can surface several entity types.
SAMPLE_BANKING = (
    "Acme Corp anuncio que su CEO bad@evil.com firmo un acuerdo. "
    "Servidores afectados: 192.0.2.55 y 10.0.0.12. "
    "Vulnerabilidad: CVE-2024-12345. Hash IOC: 44d88612fea8a8f36de82e1278abb02f."
)
|
||||
|
||||
|
||||
def _make_ctx(*, ops_db, app_dir, registry_root, text, **params):
    """Build the enricher context used by the paste_extract tests.

    The node fields are passed to `base_ctx` as empty strings (the original
    note states paste_extract needs neither node_id nor ops_db_path). The
    resulting `params` entry holds `text` followed by any extra keyword
    arguments given by the caller.
    """
    context = base_ctx(
        ops_db=ops_db,
        app_dir=app_dir,
        registry_root=registry_root,
        node_id="",
        node_name="",
        node_type="",
    )
    payload = {"text": text}
    payload.update(params)
    context["params"] = payload
    return context
|
||||
|
||||
|
||||
def test_paste_extract_returns_entities_no_db_write(ops_db, app_dir, real_registry_root):
    """Preview mode: entities are parsed but nothing is written to operations.db."""
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root, text=SAMPLE_BANKING)
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err
    assert out is not None
    assert "entities" in out
    assert "relations" in out
    assert "stats" in out
    assert out["stats"]["layers"] == ["regex"]

    # Expected types: only Email and CVE are asserted here.
    types = {e["type_ref"] for e in out["entities"]}
    assert "Email" in types, types
    assert "CVE" in types, types

    # Every entity carries the fields required by the output contract.
    for e in out["entities"]:
        assert isinstance(e["id"], str) and e["id"].startswith("tmp_"), e
        assert e["type_ref"] and e["name"]
        assert e["source"] in ("regex", "hybrid")
        assert "metadata" in e
        # start/end must be ints (the original note expects >= 0 for regex
        # matches; only the type is checked here).
        assert isinstance(e["start"], int)
        assert isinstance(e["end"], int)

    # Crucial: preview mode must not have written anything to the database.
    assert list_entities(ops_db) == []
    assert list_relations(ops_db) == []
|
||||
|
||||
|
||||
def test_paste_extract_dedupes_within_run(ops_db, app_dir, real_registry_root):
    """Text with repeated values: each (type_ref, name) pair appears only once."""
    text = ("Email a foo@bar.com y otra vez foo@bar.com. "
            "IP 192.0.2.10. Repite IP 192.0.2.10.")
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root, text=text)
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err

    keys = [(e["type_ref"], e["name"]) for e in out["entities"]]
    # No pair may be listed twice, and both repeated values must be present.
    assert len(keys) == len(set(keys)), keys
    assert ("Email", "foo@bar.com") in keys
    assert ("IPAddress", "192.0.2.10") in keys
|
||||
|
||||
|
||||
def test_paste_extract_empty_text_fails_clean(ops_db, app_dir, real_registry_root):
    """Empty params.text: exit code 2 plus JSON with an error and no entities."""
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root, text="")
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 2, err
    # Even on failure the script must still emit a parseable JSON payload.
    assert out is not None
    assert out["entities"] == []
    assert "error" in out
|
||||
|
||||
|
||||
def test_paste_extract_max_entities_truncates(ops_db, app_dir, real_registry_root):
    """max_entities=N caps the returned entity list at N items."""
    # 50 distinct e-mail addresses, well above the requested cap of 10.
    text = " ".join(f"contact{i:03d}@example.org" for i in range(50))
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root,
                    text=text, max_entities=10)
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err
    assert len(out["entities"]) == 10
|
||||
|
||||
|
||||
def test_paste_extract_types_filter(ops_db, app_dir, real_registry_root):
    """params.types restricts which IoC types are extracted."""
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root,
                    text=SAMPLE_BANKING, types="email")
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err
    types = {e["type_ref"] for e in out["entities"]}
    # Only Email: per the original note the filter is forwarded to
    # extract_iocs, which then emits e-mail addresses only.
    assert types == {"Email"}, types
|
||||
|
||||
|
||||
def test_paste_extract_use_hybrid_false_skips_layer(ops_db, app_dir, real_registry_root):
    """use_hybrid=False: stats.layers must be exactly ['regex'] (no hybrid layer)."""
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root,
                    text=SAMPLE_BANKING, use_hybrid=False)
    rc, out, err = run_enricher("paste_extract", ctx)
    assert rc == 0, err
    assert out["stats"]["layers"] == ["regex"]
|
||||
|
||||
|
||||
def test_paste_extract_idempotent_runs_no_duplicate_proposal(
        ops_db, app_dir, real_registry_root):
    """Running paste_extract twice on the same text yields the same proposal.

    Deduplication at commit time is the C++ panel's responsibility; this test
    only checks that two preview runs return the same (type_ref, name) pairs.
    """
    ctx = _make_ctx(ops_db=ops_db, app_dir=app_dir,
                    registry_root=real_registry_root, text=SAMPLE_BANKING)
    rc1, out1, _ = run_enricher("paste_extract", ctx)
    rc2, out2, _ = run_enricher("paste_extract", ctx)
    assert rc1 == 0 and rc2 == 0
    # Compare as sorted key lists so ordering differences do not matter.
    keys1 = sorted((e["type_ref"], e["name"]) for e in out1["entities"])
    keys2 = sorted((e["type_ref"], e["name"]) for e in out2["entities"])
    assert keys1 == keys2
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Apply-side tests — replican la logica de extract_panel_apply en Python para
|
||||
# verificar el contrato de dedupe que el panel C++ implementa. Ejercitan que
|
||||
# (1) entidades nuevas se insertan, (2) duplicadas (type_ref, name) reusan id,
|
||||
# (3) las relaciones cuyos endpoints estan en la BD se persisten, (4) las que
|
||||
# no, se descartan.
|
||||
#
|
||||
# El panel C++ se prueba al compilar (build verde) y en runtime via la CLI;
|
||||
# aqui validamos el *contrato* del JSON output que el panel consume.
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _apply_proposal_python(ops_db_path, proposal: dict) -> dict:
    """Reference Python implementation of the C++ `extract_panel_apply`.

    Persists the selected entities and relations of *proposal* into the
    SQLite database at *ops_db_path*, deduplicating against existing rows.

    Args:
        ops_db_path: Path of the SQLite database; it must already contain
            the `entities` and `relations` tables.
        proposal: Dict with optional `entities` and `relations` lists. Items
            whose `selected` key is falsy are ignored (missing means True).

    Returns:
        Dict with the counters `added_entities`, `dedup_entities`,
        `added_relations` and `skipped_relations`.

    NOTE(review): the original docstring states this mirrors the C++ panel;
    that equivalence cannot be checked from this file.
    """
    timestamp = "2026-01-01T00:00:00Z"  # fixed so runs are reproducible
    conn = sqlite3.connect(ops_db_path)
    try:
        # Maps the proposal's temporary ids to the ids stored in the database.
        persisted_ids = {}
        added_entities = 0
        dedup_entities = 0
        for index, entity in enumerate(proposal.get("entities", [])):
            if not entity.get("selected", True):
                continue
            type_ref = entity["type_ref"]
            name = entity["name"]
            existing = conn.execute(
                "SELECT id FROM entities WHERE type_ref=? AND name=? LIMIT 1",
                (type_ref, name)).fetchone()
            if existing:
                # Reuse the stored row so relations can still resolve to it.
                persisted_ids[entity["id"]] = existing[0]
                dedup_entities += 1
                continue
            new_id = f"{type_ref}_{index}_{name}"
            conn.execute(
                "INSERT INTO entities (id, name, type_ref, source, "
                "  metadata, created_at, updated_at) "
                "VALUES (?, ?, ?, 'panel:paste_extract', ?, ?, ?)",
                (new_id, name, type_ref,
                 json.dumps(entity.get("metadata", {})), timestamp, timestamp))
            persisted_ids[entity["id"]] = new_id
            added_entities += 1

        added_relations = 0
        skipped_relations = 0
        for relation in proposal.get("relations", []):
            if not relation.get("selected", True):
                continue
            source_id = persisted_ids.get(relation["from_id"])
            target_id = persisted_ids.get(relation["to_id"])
            if not source_id or not target_id:
                # An endpoint was unselected or unknown: drop the relation.
                skipped_relations += 1
                continue
            name = relation.get("name") or "RELATED_TO"
            existing = conn.execute(
                "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
                "AND name=? LIMIT 1", (source_id, target_id, name)).fetchone()
            if existing:
                skipped_relations += 1
                continue
            # The id is built from the deduped (from, name, to) triple. An id
            # based on list position would repeat across separate proposals
            # that hold a same-named relation at the same index.
            relation_id = f"rel_{source_id}_{name}_{target_id}"
            conn.execute(
                "INSERT INTO relations (id, name, from_entity, to_entity, "
                "  created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
                (relation_id, name, source_id, target_id,
                 timestamp, timestamp))
            added_relations += 1

        conn.commit()
        return {"added_entities": added_entities,
                "dedup_entities": dedup_entities,
                "added_relations": added_relations,
                "skipped_relations": skipped_relations}
    finally:
        conn.close()
|
||||
|
||||
|
||||
def test_apply_inserts_only_selected(ops_db):
    """Only entities flagged selected=True end up in the database."""
    # (temporary id, type, name, selected) - the IPAddress row is unselected.
    rows = (
        ("tmp_0", "Email", "a@b.com", True),
        ("tmp_1", "IPAddress", "1.2.3.4", False),
        ("tmp_2", "CVE", "CVE-2024-1", True),
    )
    entities = [
        {"id": tmp_id, "type_ref": type_ref, "name": name,
         "metadata": {}, "selected": selected}
        for tmp_id, type_ref, name, selected in rows
    ]
    result = _apply_proposal_python(
        ops_db, {"entities": entities, "relations": []})
    assert result["added_entities"] == 2
    stored_types = {row["type_ref"] for row in list_entities(ops_db)}
    assert stored_types == {"Email", "CVE"}
|
||||
|
||||
|
||||
def test_apply_dedupes_by_type_and_name(ops_db):
    """Applying the same proposal twice does not duplicate entities."""
    only_entity = {"id": "tmp_0", "type_ref": "Email", "name": "x@y.z",
                   "metadata": {}, "selected": True}
    proposal = {"entities": [only_entity], "relations": []}

    first_run = _apply_proposal_python(ops_db, proposal)
    second_run = _apply_proposal_python(ops_db, proposal)

    assert first_run["added_entities"] == 1
    assert second_run["added_entities"] == 0
    assert second_run["dedup_entities"] == 1
    # The database must hold exactly one row after both runs.
    stored = list_entities(ops_db)
    assert len(stored) == 1
|
||||
|
||||
|
||||
def test_apply_inserts_relations_when_endpoints_resolve(ops_db):
    """A relation whose two endpoints are selected gets persisted."""
    alice = {"id": "tmp_0", "type_ref": "Person", "name": "Alice",
             "metadata": {}, "selected": True}
    acme = {"id": "tmp_1", "type_ref": "Organization", "name": "Acme",
            "metadata": {}, "selected": True}
    works_at = {"from_id": "tmp_0", "to_id": "tmp_1",
                "name": "works_at", "selected": True}

    result = _apply_proposal_python(
        ops_db, {"entities": [alice, acme], "relations": [works_at]})

    assert result["added_entities"] == 2
    assert result["added_relations"] == 1
    stored_relations = list_relations(ops_db, name="works_at")
    assert len(stored_relations) == 1
|
||||
|
||||
|
||||
def test_apply_skips_relation_if_endpoint_unselected(ops_db):
    """A relation is dropped when one of its endpoints was not selected."""
    alice = {"id": "tmp_0", "type_ref": "Person", "name": "Alice",
             "metadata": {}, "selected": True}
    # Unselected, so it is never inserted and the relation cannot resolve.
    acme = {"id": "tmp_1", "type_ref": "Organization", "name": "Acme",
            "metadata": {}, "selected": False}
    works_at = {"from_id": "tmp_0", "to_id": "tmp_1",
                "name": "works_at", "selected": True}

    result = _apply_proposal_python(
        ops_db, {"entities": [alice, acme], "relations": [works_at]})

    assert result["added_entities"] == 1
    assert result["added_relations"] == 0
    assert result["skipped_relations"] == 1
|
||||
|
||||
|
||||
def test_apply_dedupes_relation_on_repeat(ops_db):
    """A repeated (from, to, name) relation is not inserted a second time."""
    alice = {"id": "tmp_0", "type_ref": "Person", "name": "Alice",
             "metadata": {}, "selected": True}
    acme = {"id": "tmp_1", "type_ref": "Organization", "name": "Acme",
            "metadata": {}, "selected": True}
    works_at = {"from_id": "tmp_0", "to_id": "tmp_1",
                "name": "works_at", "selected": True}
    proposal = {"entities": [alice, acme], "relations": [works_at]}

    first_run = _apply_proposal_python(ops_db, proposal)
    second_run = _apply_proposal_python(ops_db, proposal)

    assert first_run["added_relations"] == 1
    # The second application finds the stored relation and skips it.
    assert second_run["added_relations"] == 0
    assert second_run["skipped_relations"] == 1
|
||||
Reference in New Issue
Block a user