ee0d26ce2d
Cada enricher con `lang: python` y `uses_functions` no vacio ahora
puede empaquetar las funciones del registry que necesita en
`<enricher>/_vendored/`. El run.py importa de ahi en lugar de
`<registry_root>/python/functions/`, lo que hace al binario
distribuible sin dependencia de un fn_registry montado.
Cambios:
1. tools/vendor_enricher_python.sh
- Lee `uses_functions` del manifest (filtrando IDs `*_py_*`).
- Resuelve `file_path` desde registry.db.
- Copia recursivamente con expansion transitiva: si un fichero
vendorizado importa siblings del mismo dominio, los siblings
tambien se copian (resuelve el caso `extract_iocs.py` que
importa 7 modulos hermanos).
- Genera `.vendor.lock` con `<id> <sha256> <src_path>` por
funcion declarada para auditoria.
- Idempotente — si todos los hashes coinciden, no rehace nada.
2. Manifests actualizados con `uses_functions`:
- fetch_webpage: normalize_url + html_to_markdown
- extract_links: extract_urls
- extract_text_entities: extract_iocs
3. run.py de los 3 enrichers afectados: importan de `_vendored/`
si existe, fallback a `<registry_root>/python/functions/` en
modo dev (mantiene los tests pytest funcionando).
4. app.md: anade `cryptography` a python_runtime_deps porque el
blob `cybersecurity.cybersecurity` lo importa al top.
5. Tests:
- test_vendor_script.py — 6 tests del script: no-op sin uses_functions,
layout correcto, transitive siblings, lock con SHA256, idempotencia,
modulos importables en aislamiento.
- 16 tests de enrichers existentes pasan via vendoring (no usan
registry_root porque _vendored/ tiene prioridad).
6. Issue 0033b movido a issues/completed/.
Tests: 32/32 verde (16 enrichers + 6 dispatcher + 4 runtime + 6
vendor).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
135 lines
4.7 KiB
Python
135 lines
4.7 KiB
Python
"""Tests for the tools/vendor_enricher_python.sh script (issue 0033b).

Verifies:
  - a manifest without Python uses_functions -> no _vendored/ is created.
  - a manifest with one uses_functions entry -> the .py + __init__ are copied.
  - transitive dependency (extract_iocs imports siblings) -> siblings are copied.
  - .vendor.lock carries the SHA256 + source path.
  - idempotency: a second call with the same state redoes nothing.
  - the resulting _vendored/ is importable without registry_root.

Not covered by a test in this module: a manifest change invalidating the lock.
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from conftest import APP_DIR_SRC, REGISTRY_ROOT
|
|
|
|
|
|
SCRIPT = APP_DIR_SRC / "tools" / "vendor_enricher_python.sh"
|
|
|
|
|
|
def _make_enricher_dir(tmp_path: Path, manifest: str) -> Path:
    """Create a minimal enricher directory under ``tmp_path``.

    The directory is named ``test_enricher`` and receives a
    ``manifest.yaml`` holding ``manifest`` verbatim plus a stub ``run.py``.

    Args:
        tmp_path: Parent directory; ``test_enricher`` must not exist in it yet.
        manifest: Text written to ``manifest.yaml`` (UTF-8).

    Returns:
        Path of the newly created enricher directory.

    Raises:
        FileExistsError: If ``tmp_path / "test_enricher"`` already exists.
    """
    enricher_dir = tmp_path.joinpath("test_enricher")
    enricher_dir.mkdir()
    # Insertion order is preserved, so the manifest is written before run.py.
    files = {"manifest.yaml": manifest, "run.py": "# stub\n"}
    for filename, content in files.items():
        enricher_dir.joinpath(filename).write_text(content, encoding="utf-8")
    return enricher_dir
|
|
|
|
|
|
def _run_vendor(enr_dir: Path) -> subprocess.CompletedProcess:
    """Run the vendoring script against ``enr_dir``.

    Invokes ``bash SCRIPT <enr_dir> <REGISTRY_ROOT>`` with stdout/stderr
    captured as text and a 20-second timeout.

    Args:
        enr_dir: Enricher directory passed as the script's first argument.

    Returns:
        The completed process; the return code is NOT checked here, callers
        assert on it themselves.

    Raises:
        subprocess.TimeoutExpired: If the script runs longer than 20 seconds.
    """
    command = ["bash", *map(str, (SCRIPT, enr_dir, REGISTRY_ROOT))]
    return subprocess.run(command, capture_output=True, text=True, timeout=20)
|
|
|
|
|
|
def test_no_uses_functions_does_not_create_vendored(tmp_path):
    """A manifest without ``uses_functions`` must leave no vendoring artifacts.

    The script is expected to exit 0 and create neither ``_vendored/`` nor
    ``.vendor.lock`` inside the enricher directory.
    """
    manifest = "id: x\nname: x\napplies_to: [text]\n"
    enr = _make_enricher_dir(tmp_path, manifest)
    result = _run_vendor(enr)
    assert result.returncode == 0, result.stderr
    for artifact in ("_vendored", ".vendor.lock"):
        assert not (enr / artifact).exists()
|
|
|
|
|
|
def test_single_dep_creates_vendored_layout(tmp_path):
    """One declared function produces the package layout and a lock entry.

    Checks that ``_vendored/`` and ``_vendored/cybersecurity/`` both get an
    ``__init__.py``, that ``cybersecurity.py`` is copied, and that
    ``.vendor.lock`` exists and mentions the declared function id.
    """
    manifest = (
        "id: x\nname: x\napplies_to: [Url]\n"
        "uses_functions:\n"
        " - normalize_url_py_cybersecurity\n"
    )
    enr = _make_enricher_dir(tmp_path, manifest)
    result = _run_vendor(enr)
    assert result.returncode == 0, result.stderr

    vendored = enr / "_vendored"
    expected_paths = (
        vendored / "__init__.py",
        vendored / "cybersecurity" / "__init__.py",
        vendored / "cybersecurity" / "cybersecurity.py",
        enr / ".vendor.lock",
    )
    for path in expected_paths:
        assert path.exists()

    lock_text = (enr / ".vendor.lock").read_text()
    assert "normalize_url_py_cybersecurity" in lock_text
|
|
|
|
|
|
def test_transitive_siblings_are_copied(tmp_path):
    """Sibling modules of ``extract_iocs.py`` must be vendored along with it.

    Only ``extract_iocs_py_cybersecurity`` is declared in the manifest; every
    file named in ``expected_siblings`` has to show up in
    ``_vendored/cybersecurity/`` as well.
    """
    manifest = (
        "id: x\nname: x\napplies_to: [Webpage]\n"
        "uses_functions:\n"
        " - extract_iocs_py_cybersecurity\n"
    )
    enr = _make_enricher_dir(tmp_path, manifest)
    result = _run_vendor(enr)
    assert result.returncode == 0, result.stderr

    cyb = enr / "_vendored" / "cybersecurity"
    assert (cyb / "extract_iocs.py").exists()

    expected_siblings = {
        "extract_ip_addresses.py",
        "extract_emails.py",
        "extract_domains.py",
        "extract_file_hashes.py",
        "extract_crypto_wallets.py",
        "extract_cve_ids.py",
        "extract_mac_addresses.py",
        "extract_phone_numbers.py",
    }
    # Set difference against the .py files actually present in the package.
    missing = expected_siblings.difference(p.name for p in cyb.glob("*.py"))
    assert not missing, f"siblings no copiados: {missing}"
|
|
|
|
|
|
def test_lock_contains_correct_sha256(tmp_path):
    """The lock must contain the SHA-256 of the registry source file.

    The digest is computed here from
    ``REGISTRY_ROOT/python/functions/cybersecurity/cybersecurity.py`` and is
    expected to appear somewhere in ``.vendor.lock``.
    """
    manifest = (
        "id: x\nname: x\napplies_to: [Url]\n"
        "uses_functions:\n - normalize_url_py_cybersecurity\n"
    )
    enr = _make_enricher_dir(tmp_path, manifest)
    result = _run_vendor(enr)
    assert result.returncode == 0, result.stderr

    source_file = REGISTRY_ROOT.joinpath(
        "python", "functions", "cybersecurity", "cybersecurity.py"
    )
    digest = hashlib.sha256()
    digest.update(source_file.read_bytes())

    lock = enr.joinpath(".vendor.lock").read_text()
    assert digest.hexdigest() in lock, lock
|
|
|
|
|
|
def test_idempotency_skips_when_unchanged(tmp_path):
    """A second run over an unchanged enricher must be a no-op.

    Both runs have to exit 0. The second run must report ``sin cambios`` on
    stdout and must leave ``.vendor.lock`` byte-for-byte identical, so the
    check does not rely on the log message alone.
    """
    enr = _make_enricher_dir(
        tmp_path,
        "id: x\nname: x\napplies_to: [Url]\n"
        "uses_functions:\n - normalize_url_py_cybersecurity\n",
    )

    first = _run_vendor(enr)
    # Surface the script's stderr on failure, like the other tests do.
    assert first.returncode == 0, first.stderr
    lock_path = enr / ".vendor.lock"
    lock_before = lock_path.read_bytes()

    second = _run_vendor(enr)
    assert second.returncode == 0, second.stderr
    assert "sin cambios" in second.stdout, second.stdout
    # The skip message alone does not prove nothing was rewritten.
    assert lock_path.read_bytes() == lock_before
|
|
|
|
|
|
def test_vendored_module_can_be_imported_in_isolation(tmp_path):
    """Smoke test: the generated ``_vendored/`` tree is importable on its own.

    After vendoring ``extract_urls_py_cybersecurity``, a fresh interpreter is
    started with ``_vendored/`` as the only extra ``sys.path`` entry and must
    import ``cybersecurity.cybersecurity.extract_urls`` and find exactly one
    URL in the sample text.
    """
    import sys  # local import: not provided by this module's import block

    enr = _make_enricher_dir(
        tmp_path,
        "id: x\nname: x\napplies_to: [Webpage]\n"
        "uses_functions:\n - extract_urls_py_cybersecurity\n",
    )
    proc = _run_vendor(enr)
    assert proc.returncode == 0, proc.stderr

    # The vendored directory is passed via argv instead of being baked into
    # the code string, so no symlink or path quoting is needed.
    code = (
        "import sys; sys.path.insert(0, sys.argv[1]);"
        "from cybersecurity.cybersecurity import extract_urls;"
        "print(len(extract_urls('foo http://x.com bar')))"
    )
    # Drop PYTHONPATH so an inherited registry path cannot satisfy the import.
    child_env = {k: v for k, v in os.environ.items() if k != "PYTHONPATH"}
    proc2 = subprocess.run(
        # Use the interpreter running pytest: a bare "python3" on PATH may be
        # a different installation without the required runtime deps.
        [sys.executable, "-c", code, str(enr / "_vendored")],
        cwd=str(tmp_path),
        env=child_env,
        capture_output=True,
        text=True,
        timeout=10,
    )
    assert proc2.returncode == 0, proc2.stderr
    assert proc2.stdout.strip() == "1"
|