829bd64aaa
30 tests cubriendo positivos y negativos por tipo: - IPv4 valida/invalida + rangos limite - IPv6 forma completa/comprimida - Emails (caracteres validos en local part) - Dominios con TLD valido vs desconocido - Hashes MD5/SHA1/SHA256/SHA512 por longitud - Wallets BTC legacy/bech32 y ETH - CVEs 4 y 7 digitos - MAC con `:` y `-` (separadores mezclados rechazados) - Telefonos E.164 y ES local 9 digitos - Pipeline filtrado por types y deduplicacion de spans contenidos Refs #0037 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
290 lines
8.9 KiB
Python
290 lines
8.9 KiB
Python
"""Tests para los extractores de IoC y el pipeline `extract_iocs`."""
|
|
|
|
import os
|
|
import sys
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
|
|
from extract_ip_addresses import extract_ip_addresses
|
|
from extract_emails import extract_emails
|
|
from extract_domains import extract_domains
|
|
from extract_file_hashes import extract_file_hashes
|
|
from extract_crypto_wallets import extract_crypto_wallets
|
|
from extract_cve_ids import extract_cve_ids
|
|
from extract_mac_addresses import extract_mac_addresses
|
|
from extract_phone_numbers import extract_phone_numbers
|
|
from extract_iocs import extract_iocs
|
|
|
|
|
|
# ---------- IP addresses ----------
|
|
|
|
|
|
def test_ipv4_valida_y_rangos_limite():
    """Valid IPv4 addresses, including both boundary values, come back in text order."""
    expected = ["0.0.0.0", "255.255.255.255", "10.0.0.1"]
    found = extract_ip_addresses("valid 0.0.0.0 and 255.255.255.255 plus 10.0.0.1")
    assert [hit["value"] for hit in found] == expected
    # Every record must be tagged with the IP address type.
    for hit in found:
        assert hit["type"] == "ip_address"
|
|
|
|
|
|
def test_ipv4_invalida_descartada():
    """Dotted strings with an octet above 255, or with only three octets, yield nothing."""
    sample = "fake: 999.999.999.999 and 256.0.0.1 and 1.2.3"
    assert extract_ip_addresses(sample) == []
|
|
|
|
|
|
def test_ipv6_forma_completa_y_comprimida():
    """Both a partially compressed IPv6 address and the `::1` short form are extracted."""
    found = extract_ip_addresses("v6: 2001:db8:85a3::8a2e:370:7334 and ::1")
    extracted = [hit["value"] for hit in found]
    for wanted in ("2001:db8:85a3::8a2e:370:7334", "::1"):
        assert wanted in extracted
|
|
|
|
|
|
def test_ipv6_invalida_descartada():
    """A nine-group IPv6-looking string is never reported verbatim.

    Only the complete nine-group candidate is checked. The extractor may
    still report a shorter substring of it, which this test deliberately
    tolerates.
    """
    # Nine groups is one more than IPv6 allows. Per the original author's
    # note, the extractor is expected to reject it via `ipaddress` even when
    # its regex produces the candidate (not verifiable from this file).
    text = "v6 fake: 1:2:3:4:5:6:7:8:9"
    values = [r["value"] for r in extract_ip_addresses(text)]
    # A previous extra assertion inspected the text after the last ":" for the
    # substring ":9"; that text can never contain a colon, so the check was
    # always true and has been dropped in favour of the single real condition.
    assert "1:2:3:4:5:6:7:8:9" not in values
|
|
|
|
|
|
def test_texto_sin_ips():
    """Plain text without any address yields an empty result."""
    result = extract_ip_addresses("nothing to see here")
    assert result == []
|
|
|
|
|
|
# ---------- Emails ----------
|
|
|
|
|
|
def test_email_simple():
    """A single email is extracted and its start/end offsets slice it out of the text."""
    sample = "Contact: alice@example.com"
    hits = extract_emails(sample)
    assert len(hits) == 1
    only = hits[0]
    assert only["value"] == "alice@example.com"
    # The reported span must point at the exact match inside the input.
    begin, finish = only["start"], only["end"]
    assert sample[begin:finish] == "alice@example.com"
|
|
|
|
|
|
def test_multiples_emails_con_caracteres_validos_en_local_part():
    """Emails whose local part uses `+`, `.` and `_` are all extracted."""
    found = extract_emails("alice+work@sub.test.org or first.last_99@a-b.io")
    extracted = [hit["value"] for hit in found]
    for wanted in ("alice+work@sub.test.org", "first.last_99@a-b.io"):
        assert wanted in extracted
|
|
|
|
|
|
def test_no_matchea_texto_sin_arroba():
    """Text without an `@` sign produces no email matches."""
    result = extract_emails("just text, no email here")
    assert result == []
|
|
|
|
|
|
# ---------- Domains ----------
|
|
|
|
|
|
def test_dominios_con_tld_valido_se_extraen():
    """Domains ending in `.com` and `.io` are extracted."""
    found = extract_domains("visit example.com or test.io")
    extracted = [hit["value"] for hit in found]
    for wanted in ("example.com", "test.io"):
        assert wanted in extracted
|
|
|
|
|
|
def test_tld_desconocido_se_descarta():
    """A domain-shaped token with an unknown TLD is discarded."""
    result = extract_domains("visit example.fakextld for info")
    assert result == []
|
|
|
|
|
|
def test_subdominios_profundos():
    """A deeply nested subdomain is reported as one full value."""
    target = "api.v2.service.example.com"
    matched = False
    for hit in extract_domains("api.v2.service.example.com is up"):
        if hit["value"] == target:
            matched = True
            break
    assert matched
|
|
|
|
|
|
# ---------- File hashes ----------
|
|
|
|
|
|
def test_md5_sha1_sha256_sha512():
    """Hex digests of 32, 40, 64 and 128 chars are labelled md5, sha1, sha256 and sha512."""
    samples = {
        "md5": "5d41402abc4b2a76b9719d911017c592",
        "sha1": "aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d",
        "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
        "sha512": "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e",
    }
    # Space-separated input, in md5 / sha1 / sha256 / sha512 order.
    found = extract_file_hashes(" ".join(samples.values()))
    value_for = {}
    for hit in found:
        value_for[hit["algorithm"]] = hit["value"]
    for algorithm, digest in samples.items():
        assert value_for[algorithm] == digest
|
|
|
|
|
|
def test_longitudes_intermedias_se_ignoran():
    """A 60-character hex run, matching no supported digest length, is ignored."""
    sixty_hex_chars = "abcdef" * 10
    result = extract_file_hashes(sixty_hex_chars)
    assert result == []
|
|
|
|
|
|
def test_insensible_a_mayusculas_en_hex():
    """An upper-case 32-character hex digest is still recognised as md5."""
    found = extract_file_hashes("5D41402ABC4B2A76B9719D911017C592")
    assert len(found) == 1
    only = found[0]
    assert only["algorithm"] == "md5"
|
|
|
|
|
|
# ---------- Crypto wallets ----------
|
|
|
|
|
|
def test_btc_legacy():
    """Legacy BTC addresses starting with `1` and `3` are extracted and tagged as btc."""
    addresses = (
        "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa",  # P2PKH
        "3J98t1WpEZ73CNmQviecrnyiWrnqRhWNLy",  # P2SH
    )
    found = extract_crypto_wallets(f"send to {addresses[0]} or {addresses[1]}")
    extracted = [hit["value"] for hit in found]
    for address in addresses:
        assert address in extracted
    for hit in found:
        assert hit["asset"] == "btc"
|
|
|
|
|
|
def test_btc_bech32_segwit():
    """A bech32 (`bc1...`) address is the single match and is tagged as btc."""
    address = "bc1qar0srrr7xfkvy5l643lydnw9re59gtzzwf5mdq"
    found = extract_crypto_wallets("address: " + address)
    assert len(found) == 1
    only = found[0]
    assert only["value"] == address
    assert only["asset"] == "btc"
|
|
|
|
|
|
def test_eth_0x_y_40_hex():
    """An `0x` + 40 hex-character address is the single match and is tagged as eth."""
    address = "0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1"
    found = extract_crypto_wallets("send " + address + " to me")
    assert len(found) == 1
    only = found[0]
    assert only["value"] == address
    assert only["asset"] == "eth"
|
|
|
|
|
|
# ---------- CVEs ----------
|
|
|
|
|
|
def test_cve_basico():
    """A CVE id with a four-digit sequence number is extracted."""
    found = extract_cve_ids("Patch CVE-2014-0160 immediately")
    extracted = [hit["value"] for hit in found]
    assert extracted == ["CVE-2014-0160"]
|
|
|
|
|
|
def test_cve_con_5_o_mas_digitos_post_2014():
    """A CVE id with a longer (here seven-digit) sequence number is extracted whole."""
    found = extract_cve_ids("see CVE-2024-1234567 advisory")
    extracted = [hit["value"] for hit in found]
    assert extracted == ["CVE-2024-1234567"]
|
|
|
|
|
|
def test_multiples_cves_en_mismo_texto():
    """Several CVE ids in one text are all returned, in order of appearance."""
    expected = ["CVE-2021-44228", "CVE-2021-45046", "CVE-2021-45105"]
    found = extract_cve_ids("Affected: CVE-2021-44228, CVE-2021-45046, CVE-2021-45105")
    assert [hit["value"] for hit in found] == expected
|
|
|
|
|
|
# ---------- MAC addresses ----------
|
|
|
|
|
|
def test_mac_con_dos_puntos():
    """A colon-separated MAC address is extracted from surrounding text."""
    found = extract_mac_addresses("iface 00:1A:2B:3C:4D:5E up")
    extracted = [hit["value"] for hit in found]
    assert extracted == ["00:1A:2B:3C:4D:5E"]
|
|
|
|
|
|
def test_mac_con_guiones():
    """A hyphen-separated MAC address is extracted."""
    found = extract_mac_addresses("AA-BB-CC-DD-EE-FF")
    extracted = [hit["value"] for hit in found]
    assert extracted == ["AA-BB-CC-DD-EE-FF"]
|
|
|
|
|
|
def test_separadores_mezclados_se_rechazan():
    """A MAC-like string that mixes `:` and `-` separators is rejected."""
    result = extract_mac_addresses("00:1A-2B:3C-4D:5E")
    assert result == []
|
|
|
|
|
|
# ---------- Phone numbers ----------
|
|
|
|
|
|
def test_numero_e164_con_espacios():
    """A `+34`-prefixed number written with spaces yields a value starting with `+34`."""
    matched = False
    for hit in extract_phone_numbers("call +34 612 345 678 now"):
        if hit["value"].startswith("+34"):
            matched = True
            break
    assert matched
|
|
|
|
|
|
def test_numero_local_es_9_digitos():
    """A bare nine-digit Spanish local number is extracted as-is."""
    matched = False
    for hit in extract_phone_numbers("directo 612345678 fijo"):
        if hit["value"] == "612345678":
            matched = True
            break
    assert matched
|
|
|
|
|
|
def test_numero_demasiado_corto_se_descarta():
    """A four-digit extension is too short to count as a phone number."""
    result = extract_phone_numbers("ext 1234")
    assert result == []
|
|
|
|
|
|
# ---------- Pipeline extract_iocs ----------
|
|
|
|
|
|
def test_pipeline_corre_todos_los_extractores():
    """The pipeline reports email, IP, CVE, MAC and wallet types from one mixed text."""
    fragments = [
        "Reach alice@example.com from 10.0.0.5; ",
        "CVE-2023-1234 vendor 00:1A:2B:3C:4D:5E ",
        "wallet 0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1",
    ]
    found = extract_iocs("".join(fragments))
    seen_types = {hit["type"] for hit in found}
    for wanted in ("email", "ip_address", "cve_id", "mac_address", "crypto_wallet"):
        assert wanted in seen_types
|
|
|
|
|
|
def test_filtro_por_types_subset():
    """Restricting `types` to `ip_address` leaves only IP results."""
    found = extract_iocs("alice@example.com 10.0.0.5", types=["ip_address"])
    seen_types = set()
    for hit in found:
        seen_types.add(hit["type"])
    assert seen_types == {"ip_address"}
|
|
|
|
|
|
def test_deduplica_spans_contenidos():
    """A domain contained inside an email match is not reported separately."""
    found = extract_iocs("Email: alice@example.com nothing else")
    # The email is kept; the domain nested inside it must be dropped.
    seen_types = [hit["type"] for hit in found]
    assert "email" in seen_types
    assert "domain" not in seen_types
|
|
|
|
|
|
def test_tipos_desconocidos_se_ignoran():
    """An unknown type name in `types` is ignored while known ones still run."""
    requested = ["nonexistent", "email"]
    found = extract_iocs("alice@example.com", types=requested)
    assert len(found) == 1
    only = found[0]
    assert only["type"] == "email"
|