2a5127fcaf
El campo `notes` es lo que el usuario escribe en el panel Note del
Inspector (doble clic sobre el nodo) — sitio canónico para texto
largo. Antes los enrichers leían metadata.text/description/query como
prioridad, dejando notes ignorado y forzando al usuario a inyectar
texto vía la UI metadata-extra (poco descubrible).
Cambios:
- Ambos run.py abren la BD y leen `entities.notes` por SQL antes de
recurrir a node_name como fallback. metadata.text/description/query
ya no se consultan (KISS — solo notes y name).
- conftest.make_node admite kwarg `notes` para inyectar contenido
en la columna notes desde tests.
- Tests actualizados: SAMPLE_TEXT y los IoC dumps van por `notes=`
en lugar de `metadata={"text": ...}`.
- Renombrado el test que verificaba prioridad: ahora se llama
`*_uses_notes_priority` y verifica notes > name.
Tests verdes WSL (44) y Windows (33 + 11 skipped).
114 lines
4.7 KiB
Python
114 lines
4.7 KiB
Python
"""Tests del enricher extract_iocs_text — variante offline de extract_text_entities."""
|
|
from __future__ import annotations
|
|
|
|
from conftest import (
|
|
base_ctx, list_entities, list_relations, make_node, run_enricher,
|
|
)
|
|
|
|
|
|
# Incident-report fixture: two e-mail addresses, two IPv4 addresses, one CVE
# identifier and one 32-hex-digit hash. Tests pass it to make_node via `notes=`.
SAMPLE_TEXT = (
    "Reporte de incidente. Contactar a bad@evil.example o a otra@victim.example. "
    "IPs vistas: 192.0.2.55 y 10.0.0.12. CVE referenciado: CVE-2024-12345. "
    "Hash: 44d88612fea8a8f36de82e1278abb02f."
)
|
|
|
|
|
|
def _ioc_paragraph(n: int) -> str:
|
|
"""Genera texto con muchos IoCs (mezcla de emails, IPs, CVEs)."""
|
|
parts = []
|
|
# n/3 emails, n/3 IPs, n/3 CVEs aprox.
|
|
for i in range(n // 3 + 1):
|
|
parts.append(f"contact{i:03d}@example{i % 7}.org")
|
|
for i in range(n // 3 + 1):
|
|
# IPs validas en rango 10.x.x.x
|
|
a = (i // 256) % 256
|
|
b = i % 256
|
|
parts.append(f"10.{a}.{b}.5")
|
|
for i in range(n // 3 + 1):
|
|
parts.append(f"CVE-2024-{10000 + i}")
|
|
return ", ".join(parts) + "."
|
|
|
|
|
|
def test_extract_iocs_text_finds_email_and_ip(ops_db, app_dir, registry_root):
    """Notes with emails, IPs, a CVE and a hash → entities created with the right types."""
    make_node(ops_db, node_id="t1", name="incident",
              type_ref="text", notes=SAMPLE_TEXT)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="incident", node_type="text")

    rc, out, err = run_enricher("extract_iocs_text", ctx)
    assert rc == 0, err
    assert out is not None
    assert out["entities_added"] >= 3, out

    # Ignore the source node itself and any Group container; only the
    # extracted indicator types matter here.
    types = {e["type_ref"] for e in list_entities(ops_db)
             if e["type_ref"] not in ("text", "Group")}
    assert "Email" in types, types
    # CVE is almost certainly present; IP/hash/domains may or may not be,
    # depending on extract_iocs.
    assert "CVE" in types, types

    # Every EXTRACTED_FROM relation must point back at the source node.
    rels = list_relations(ops_db, name="EXTRACTED_FROM")
    assert len(rels) >= 3, rels
    assert all(r["to_entity"] == "t1" for r in rels)
|
|
|
|
|
|
def test_extract_iocs_text_uses_notes_priority(ops_db, app_dir, registry_root):
    """`entities.notes` takes priority over node_name as the text source."""
    make_node(ops_db, node_id="t1", name="placeholder",
              type_ref="text", notes=SAMPLE_TEXT)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="placeholder", node_type="text")

    rc, out, err = run_enricher("extract_iocs_text", ctx)
    assert rc == 0, err
    # Same guard as the sibling tests: fail clearly instead of raising a
    # TypeError when no output payload came back.
    assert out is not None
    # The name "placeholder" contains no IoCs; had it been used as input
    # there would be no entities, so reaching this count shows notes was read.
    assert out["entities_added"] >= 2, out
|
|
|
|
|
|
def test_extract_iocs_text_no_text_fails(ops_db, app_dir, registry_root):
    """No text (empty name, no notes) → rc 2 with a clear error message."""
    make_node(ops_db, node_id="t1", name="", type_ref="text", metadata={})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="", node_type="text")

    rc, out, err = run_enricher("extract_iocs_text", ctx)
    assert rc == 2, err
    assert out is not None
    # `or ""` keeps the substring check safe when the "error" key is
    # missing or None.
    assert "sin texto" in (out.get("error") or "")
|
|
|
|
|
|
def test_extract_iocs_text_above_threshold_creates_group(ops_db, app_dir,
                                                         registry_root):
    """>=50 IoCs → a heterogeneous Group is created for them (phase 1)."""
    text = _ioc_paragraph(180)  # ~60 emails + ~60 IPs + ~60 CVEs
    make_node(ops_db, node_id="t1", name="dump",
              type_ref="text", notes=text)
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="dump", node_type="text")

    rc, out, err = run_enricher("extract_iocs_text", ctx)
    assert rc == 0, err
    assert out is not None
    assert out["iocs_found"] >= 50, out

    # NOTE(review): when out["grouped"] is falsy every check below is skipped
    # and the test passes without verifying any grouping — confirm whether
    # this should be a hard `assert out["grouped"]`.
    if out["grouped"]:
        groups = list_entities(ops_db, type_ref="Group")
        assert len(groups) == 1
        g = groups[0]
        assert g["metadata"]["enricher"] == "extract_iocs_text"
        assert g["metadata"]["count"] == out["iocs_found"]
        assert g["metadata"]["source_node_id"] == "t1"

        # First K left loose, the rest grouped (heterogeneous).
        non_group_iocs = [e for e in list_entities(ops_db)
                          if e["type_ref"] not in ("text", "Group")]
        loose = [e for e in non_group_iocs if e["group_id"] is None]
        grouped = [e for e in non_group_iocs if e["group_id"] == g["id"]]
        # Exactly K=10 loose entities.
        assert len(loose) == 10
        assert len(grouped) == out["iocs_found"] - 10

        # EXTRACTED_FROM relation from the Group to the source node.
        rels = list_relations(ops_db, name="EXTRACTED_FROM")
        to_t1_from_group = [r for r in rels
                            if r["to_entity"] == "t1"
                            and r["from_entity"] == g["id"]]
        assert len(to_t1_from_group) == 1
|