6919ebfe9c
Añade enricher web_search aplicable a nodos text/Concept/Topic. Hace POST a html.duckduckgo.com con la query del nodo, parsea resultados con HTMLParser stdlib, decodifica el redirect uddg= y crea N nodos Url con relación SEARCH_RESULT_OF apuntando al nodo origen. Encadenable: tras web_search, fetch_webpage sobre cada Url completa el pipeline search -> fetch -> extract. Defensa contra ops_db_path mal resuelto: normaliza backslashes, resuelve relativo contra app_dir, valida que la tabla entities exista antes de tocar nada (exit codes 7/8/9 con JSON resumen). Tests pytest (16/16 verde): conftest con operations.db temp + schema mínimo, stub de requests vía PYTHONPATH para mockear red. Cubre los 5 enrichers (extract_domain, fetch_webpage, extract_links, extract_text_entities, web_search) + sanity check de manifests. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
60 lines
2.1 KiB
Python
60 lines
2.1 KiB
Python
"""Tests del enricher extract_text_entities — regex IoCs sobre markdown."""
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
from conftest import (
|
|
base_ctx, list_entities, list_relations, make_node, run_enricher,
|
|
)
|
|
|
|
|
|
# Markdown fixture with several IoCs detectable by extract_iocs (pure regex):
# two e-mail addresses, an IPv4 address, a CVE identifier and a 32-hex-digit
# hash, one kind per bullet.
SAMPLE_MD = """# Reporte

Indicators:
- Email: bad@evil.example y otra@victim.example
- IP: 192.0.2.55
- CVE: CVE-2024-12345
- Hash: 44d88612fea8a8f36de82e1278abb02f
"""
|
|
|
|
|
|
def test_extract_iocs_creates_typed_entities(ops_db, app_dir, registry_root):
    """Run extract_text_entities on a Webpage node backed by SAMPLE_MD.

    Writes SAMPLE_MD to ``cache/cd/ddd.md`` under ``app_dir``, registers a
    ``Webpage`` node ``w1`` whose ``markdown_path`` metadata is that file's
    path relative to ``app_dir``, runs the enricher and checks that:

    * it exits with code 0 and reports at least 3 added entities;
    * ``Email`` and ``CVE`` are among the non-Webpage entity types;
    * at least 3 ``EXTRACTED_FROM`` relations exist, all pointing to ``w1``.

    ``ops_db``, ``app_dir`` and ``registry_root`` are pytest fixtures
    (presumably supplied by conftest — confirm there).
    """
    md_dir = Path(app_dir) / "cache" / "cd"
    md_dir.mkdir(parents=True, exist_ok=True)
    md_path = md_dir / "ddd.md"
    md_path.write_text(SAMPLE_MD, encoding="utf-8")
    # The node stores the markdown location relative to app_dir.
    rel = md_path.relative_to(app_dir)

    make_node(ops_db, node_id="w1", name="report",
              type_ref="Webpage", metadata={"markdown_path": str(rel)})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="w1", node_name="report", node_type="Webpage",
                   metadata={"markdown_path": str(rel)})

    rc, out, err = run_enricher("extract_text_entities", ctx)
    assert rc == 0, err
    assert out is not None, err
    assert out["entities_added"] >= 3, out

    types = {e["type_ref"] for e in list_entities(ops_db)
             if e["type_ref"] != "Webpage"}
    # We do not require every type — that depends on extract_iocs covering
    # each pattern — but at least Email and CVE should be present.
    assert "Email" in types, types
    assert "CVE" in types, types

    rels = list_relations(ops_db, name="EXTRACTED_FROM")
    assert len(rels) >= 3, rels
    assert all(r["to_entity"] == "w1" for r in rels), rels
|
|
|
|
|
|
def test_extract_iocs_without_markdown_errors(ops_db, app_dir, registry_root):
    """Check that the enricher fails when the node has no ``markdown_path``.

    Registers a ``Webpage`` node ``w1`` with empty metadata, runs
    extract_text_entities with a context that passes no metadata, and expects
    a non-zero exit code plus a non-empty output whose ``error`` field
    contains ``"missing markdown_path"``.

    ``ops_db``, ``app_dir`` and ``registry_root`` are pytest fixtures
    (presumably supplied by conftest — confirm there).
    """
    make_node(ops_db, node_id="w1", name="empty",
              type_ref="Webpage", metadata={})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="w1", node_name="empty", node_type="Webpage")
    rc, out, err = run_enricher("extract_text_entities", ctx)
    assert rc != 0, out
    # Split into two asserts so a failure shows which condition broke.
    assert out, err
    assert "missing markdown_path" in (out.get("error") or ""), out
|