"""Tests del enricher extract_iocs_text — variante offline de extract_text_entities.""" from __future__ import annotations from conftest import ( base_ctx, list_entities, list_relations, make_node, run_enricher, ) SAMPLE_TEXT = ( "Reporte de incidente. Contactar a bad@evil.example o a otra@victim.example. " "IPs vistas: 192.0.2.55 y 10.0.0.12. CVE referenciado: CVE-2024-12345. " "Hash: 44d88612fea8a8f36de82e1278abb02f." ) def _ioc_paragraph(n: int) -> str: """Genera texto con muchos IoCs (mezcla de emails, IPs, CVEs).""" parts = [] # n/3 emails, n/3 IPs, n/3 CVEs aprox. for i in range(n // 3 + 1): parts.append(f"contact{i:03d}@example{i % 7}.org") for i in range(n // 3 + 1): # IPs validas en rango 10.x.x.x a = (i // 256) % 256 b = i % 256 parts.append(f"10.{a}.{b}.5") for i in range(n // 3 + 1): parts.append(f"CVE-2024-{10000 + i}") return ", ".join(parts) + "." def test_extract_iocs_text_finds_email_and_ip(ops_db, app_dir, registry_root): """Texto con emails, IPs, CVE, hash → entidades creadas con tipos correctos.""" make_node(ops_db, node_id="t1", name="incident", type_ref="text", notes=SAMPLE_TEXT) ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, node_id="t1", node_name="incident", node_type="text") rc, out, err = run_enricher("extract_iocs_text", ctx) assert rc == 0, err assert out is not None assert out["entities_added"] >= 3, out types = {e["type_ref"] for e in list_entities(ops_db) if e["type_ref"] not in ("text", "Group")} assert "Email" in types, types # CVE casi seguro presente; IP/hash/dominios pueden o no segun extract_iocs. assert "CVE" in types, types rels = list_relations(ops_db, name="EXTRACTED_FROM") assert len(rels) >= 3 assert all(r["to_entity"] == "t1" for r in rels) def test_extract_iocs_text_uses_notes_priority(ops_db, app_dir, registry_root): """`entities.notes` se prioriza sobre node_name.""" make_node(ops_db, node_id="t1", name="placeholder", type_ref="text", notes=SAMPLE_TEXT) ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, node_id="t1", node_name="placeholder", node_type="text") rc, out, err = run_enricher("extract_iocs_text", ctx) assert rc == 0, err # El name "placeholder" no contiene IoCs; si se hubiese usado, no # habria entidades. Ergo entities_added > 0 demuestra que leyo notes. assert out["entities_added"] >= 2, out def test_extract_iocs_text_no_text_fails(ops_db, app_dir, registry_root): """Sin texto → exit 2 con error claro.""" make_node(ops_db, node_id="t1", name="", type_ref="text", metadata={}) ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, node_id="t1", node_name="", node_type="text") rc, out, err = run_enricher("extract_iocs_text", ctx) assert rc == 2 assert out is not None assert "sin texto" in (out.get("error") or "") def test_extract_iocs_text_above_threshold_creates_group(ops_db, app_dir, registry_root): """>=50 IoCs → Group heterogeneo con todos dentro (fase 1).""" text = _ioc_paragraph(180) # ~60 emails + ~60 IPs + ~60 CVEs make_node(ops_db, node_id="t1", name="dump", type_ref="text", notes=text) ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, node_id="t1", node_name="dump", node_type="text") rc, out, err = run_enricher("extract_iocs_text", ctx) assert rc == 0, err assert out["iocs_found"] >= 50, out if out["grouped"]: groups = list_entities(ops_db, type_ref="Group") assert len(groups) == 1 g = groups[0] assert g["metadata"]["enricher"] == "extract_iocs_text" assert g["metadata"]["count"] == out["iocs_found"] assert g["metadata"]["source_node_id"] == "t1" # K primeros sueltos, resto agrupados (heterogeneo). non_group_iocs = [e for e in list_entities(ops_db) if e["type_ref"] not in ("text", "Group")] sueltos = [e for e in non_group_iocs if e["group_id"] is None] agrupados = [e for e in non_group_iocs if e["group_id"] == g["id"]] # K=10 sueltos exactos. assert len(sueltos) == 10 assert len(agrupados) == out["iocs_found"] - 10 # EXTRACTED_FROM del Group al source. rels = list_relations(ops_db, name="EXTRACTED_FROM") to_t1_from_group = [r for r in rels if r["to_entity"] == "t1" and r["from_entity"] == g["id"]] assert len(to_t1_from_group) == 1