"""Tests for the IoC extractors and the `extract_iocs` pipeline."""

# Source path: python/functions/cybersecurity/tests/test_extract_iocs.py
# (the accompanying tests/__init__.py is added empty).

import os
import sys

# Make the extractor modules that live one directory up importable.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from extract_ip_addresses import extract_ip_addresses
from extract_emails import extract_emails
from extract_domains import extract_domains
from extract_file_hashes import extract_file_hashes
from extract_crypto_wallets import extract_crypto_wallets
from extract_cve_ids import extract_cve_ids
from extract_mac_addresses import extract_mac_addresses
from extract_phone_numbers import extract_phone_numbers
from extract_iocs import extract_iocs


# ---------- IP addresses ----------


def test_ipv4_valida_y_rangos_limite():
    """Valid IPv4 addresses, including the boundary values, are extracted in order."""
    text = "valid 0.0.0.0 and 255.255.255.255 plus 10.0.0.1"
    ips = extract_ip_addresses(text)
    assert [r["value"] for r in ips] == ["0.0.0.0", "255.255.255.255", "10.0.0.1"]
    assert all(r["type"] == "ip_address" for r in ips)


def test_ipv4_invalida_descartada():
    """Invalid IPv4 (octet > 255, or too few octets) is discarded."""
    text = "fake: 999.999.999.999 and 256.0.0.1 and 1.2.3"
    ips = extract_ip_addresses(text)
    assert ips == []


def test_ipv6_forma_completa_y_comprimida():
    """IPv6 addresses in compressed form are extracted."""
    text = "v6: 2001:db8:85a3::8a2e:370:7334 and ::1"
    ips = extract_ip_addresses(text)
    values = [r["value"] for r in ips]
    assert "2001:db8:85a3::8a2e:370:7334" in values
    assert "::1" in values


def test_ipv6_invalida_descartada():
    """An IPv6-looking string with nine groups is never reported as-is."""
    # Nine groups is one too many for IPv6; per the original note, `ipaddress`
    # rejects it even if the regex tries to match it.
    text = "v6 fake: 1:2:3:4:5:6:7:8:9"
    ips = extract_ip_addresses(text)
    # Only the full 9-group string is checked: a shorter sub-match may still
    # be reported by the extractor, so `ips == []` is not asserted here.
    assert "1:2:3:4:5:6:7:8:9" not in {r["value"] for r in ips}


def test_texto_sin_ips():
    """Text without IP addresses yields no results."""
    assert extract_ip_addresses("nothing to see here") == []


# ---------- Emails ----------


def test_email_simple():
    """A single email is extracted with a span that slices back to the value."""
    text = "Contact: alice@example.com"
    emails = extract_emails(text)
    assert len(emails) == 1
    assert emails[0]["value"] == "alice@example.com"
    assert text[emails[0]["start"] : emails[0]["end"]] == "alice@example.com"


def test_multiples_emails_con_caracteres_validos_en_local_part():
    """Multiple emails with valid special characters in the local part."""
    text = "alice+work@sub.test.org or first.last_99@a-b.io"
    emails = extract_emails(text)
    values = [r["value"] for r in emails]
    assert "alice+work@sub.test.org" in values
    assert "first.last_99@a-b.io" in values


def test_no_matchea_texto_sin_arroba():
    """Text without an @ sign does not match."""
    assert extract_emails("just text, no email here") == []


# ---------- Domains ----------


def test_dominios_con_tld_valido_se_extraen():
    """Domains with a valid TLD are extracted."""
    text = "visit example.com or test.io"
    domains = extract_domains(text)
    values = [r["value"] for r in domains]
    assert "example.com" in values
    assert "test.io" in values


def test_tld_desconocido_se_descarta():
    """A domain with an unknown TLD is discarded."""
    text = "visit example.fakextld for info"
    assert extract_domains(text) == []


def test_subdominios_profundos():
    """Deeply nested subdomains are extracted as one value."""
    text = "api.v2.service.example.com is up"
    domains = extract_domains(text)
    assert any(r["value"] == "api.v2.service.example.com" for r in domains)


# ---------- File hashes ----------


def test_md5_sha1_sha256_sha512():
    """MD5 (32 hex), SHA1 (40), SHA256 (64) and SHA512 (128) are classified."""
    md5 = "5d41402abc4b2a76b9719d911017c592"
    sha1 = "aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d"
    sha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
    sha512 = "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"
    text = f"{md5} {sha1} {sha256} {sha512}"
    hashes = extract_file_hashes(text)
    by_algo = {r["algorithm"]: r["value"] for r in hashes}
    assert by_algo["md5"] == md5
    assert by_algo["sha1"] == sha1
    assert by_algo["sha256"] == sha256
    assert by_algo["sha512"] == sha512


def test_longitudes_intermedias_se_ignoran():
    """Hex runs of an in-between length are ignored."""
    text = "abcdef" * 10  # 60 hex chars
    assert extract_file_hashes(text) == []


def test_insensible_a_mayusculas_en_hex():
    """Upper-case hex digits are accepted."""
    md5 = "5D41402ABC4B2A76B9719D911017C592"
    hashes = extract_file_hashes(md5)
    assert len(hashes) == 1
    assert hashes[0]["algorithm"] == "md5"


# ---------- Crypto wallets ----------


def test_btc_legacy():
    """Legacy BTC addresses (P2PKH and P2SH) are extracted."""
    p2pkh = "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa"
    p2sh = "3J98t1WpEZ73CNmQviecrnyiWrnqRhWNLy"
    text = f"send to {p2pkh} or {p2sh}"
    wallets = extract_crypto_wallets(text)
    values = [r["value"] for r in wallets]
    assert p2pkh in values
    assert p2sh in values
    assert all(r["asset"] == "btc" for r in wallets)


def test_btc_bech32_segwit():
    """A BTC bech32 (segwit) address is extracted."""
    bech32 = "bc1qar0srrr7xfkvy5l643lydnw9re59gtzzwf5mdq"
    wallets = extract_crypto_wallets(f"address: {bech32}")
    assert len(wallets) == 1
    assert wallets[0]["value"] == bech32
    assert wallets[0]["asset"] == "btc"


def test_eth_0x_y_40_hex():
    """An ETH address (0x followed by 40 hex characters) is extracted."""
    eth = "0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1"
    wallets = extract_crypto_wallets(f"send {eth} to me")
    assert len(wallets) == 1
    assert wallets[0]["value"] == eth
    assert wallets[0]["asset"] == "eth"


# ---------- CVEs ----------


def test_cve_basico():
    """A basic CVE id with a 4-digit sequence number is extracted."""
    text = "Patch CVE-2014-0160 immediately"
    cves = extract_cve_ids(text)
    assert [r["value"] for r in cves] == ["CVE-2014-0160"]


def test_cve_con_5_o_mas_digitos_post_2014():
    """A CVE id with 5 or more sequence digits (post-2014 format) is extracted."""
    cves = extract_cve_ids("see CVE-2024-1234567 advisory")
    assert [r["value"] for r in cves] == ["CVE-2024-1234567"]


def test_multiples_cves_en_mismo_texto():
    """Multiple CVE ids in the same text are extracted in order."""
    text = "Affected: CVE-2021-44228, CVE-2021-45046, CVE-2021-45105"
    cves = extract_cve_ids(text)
    values = [r["value"] for r in cves]
    assert values == ["CVE-2021-44228", "CVE-2021-45046", "CVE-2021-45105"]


# ---------- MAC addresses ----------


def test_mac_con_dos_puntos():
    """A colon-separated MAC address is extracted."""
    text = "iface 00:1A:2B:3C:4D:5E up"
    macs = extract_mac_addresses(text)
    assert [r["value"] for r in macs] == ["00:1A:2B:3C:4D:5E"]


def test_mac_con_guiones():
    """A hyphen-separated MAC address is extracted."""
    text = "AA-BB-CC-DD-EE-FF"
    macs = extract_mac_addresses(text)
    assert [r["value"] for r in macs] == ["AA-BB-CC-DD-EE-FF"]


def test_separadores_mezclados_se_rechazan():
    """A MAC-like string with mixed separators is rejected."""
    text = "00:1A-2B:3C-4D:5E"
    assert extract_mac_addresses(text) == []


# ---------- Phone numbers ----------


def test_numero_e164_con_espacios():
    """An E.164 number written with spaces is extracted."""
    text = "call +34 612 345 678 now"
    phones = extract_phone_numbers(text)
    assert any(r["value"].startswith("+34") for r in phones)


def test_numero_local_es_9_digitos():
    """A 9-digit local Spanish number is extracted."""
    text = "directo 612345678 fijo"
    phones = extract_phone_numbers(text)
    assert any(r["value"] == "612345678" for r in phones)


def test_numero_demasiado_corto_se_descarta():
    """A number that is too short is discarded."""
    text = "ext 1234"
    assert extract_phone_numbers(text) == []


# ---------- Pipeline extract_iocs ----------


def test_pipeline_corre_todos_los_extractores():
    """The pipeline reports every IoC type present in the text."""
    text = (
        "Reach alice@example.com from 10.0.0.5; "
        "CVE-2023-1234 vendor 00:1A:2B:3C:4D:5E "
        "wallet 0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1"
    )
    iocs = extract_iocs(text)
    types = {r["type"] for r in iocs}
    assert "email" in types
    assert "ip_address" in types
    assert "cve_id" in types
    assert "mac_address" in types
    assert "crypto_wallet" in types


def test_filtro_por_types_subset():
    """Passing `types` restricts the output to the requested subset."""
    text = "alice@example.com 10.0.0.5"
    iocs = extract_iocs(text, types=["ip_address"])
    types = {r["type"] for r in iocs}
    assert types == {"ip_address"}


def test_deduplica_spans_contenidos():
    """Contained spans are deduplicated (a domain inside an email)."""
    text = "Email: alice@example.com nothing else"
    iocs = extract_iocs(text)
    # The email is reported; the domain nested inside it is dropped because
    # its span is contained in the email's span.
    types = [r["type"] for r in iocs]
    assert "email" in types
    assert "domain" not in types


def test_tipos_desconocidos_se_ignoran():
    """Unknown type names in `types` are ignored."""
    text = "alice@example.com"
    iocs = extract_iocs(text, types=["nonexistent", "email"])
    assert len(iocs) == 1
    assert iocs[0]["type"] == "email"