Files
fn_registry/python/functions/cybersecurity/extract_iocs.py
T
egutierrez cce7764510 feat(cybersecurity): 8 IoC regex extractors + extract_iocs pipeline puro
Extractores nuevos en python/functions/cybersecurity/:
- extract_ip_addresses (IPv4 + IPv6 con validacion ipaddress)
- extract_emails (RFC 5322 simplificado)
- extract_domains (FQDNs con TLD valido, lista estatica)
- extract_file_hashes (MD5/SHA1/SHA256/SHA512, algoritmo por longitud)
- extract_crypto_wallets (BTC legacy + bech32, ETH 0x+40hex)
- extract_cve_ids (CVE-YYYY-NNNN+)
- extract_mac_addresses (xx:xx:xx + xx-xx-xx, separador uniforme)
- extract_phone_numbers (E.164 + ES local 9 digitos)

Pipeline:
- extract_iocs corre todos, deduplica spans contenidos. Mantiene
  purity:pure (kind:function con uses_functions no vacio) porque la
  regla del registry exige que los pipelines sean impuros.

Todas devuelven list[dict] con value/start/end/type para que el
caller (issues 0038-0040) pueda reconciliar offsets con spans NER
sin reparsing.

Refs #0037

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 16:41:30 +02:00

74 lines
2.4 KiB
Python

"""Pipeline puro: corre todos los extractores de IoC y unifica resultados."""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from extract_ip_addresses import extract_ip_addresses
from extract_emails import extract_emails
from extract_domains import extract_domains
from extract_file_hashes import extract_file_hashes
from extract_crypto_wallets import extract_crypto_wallets
from extract_cve_ids import extract_cve_ids
from extract_mac_addresses import extract_mac_addresses
from extract_phone_numbers import extract_phone_numbers
_EXTRACTORS = {
"email": extract_emails,
"ip_address": extract_ip_addresses,
"crypto_wallet": extract_crypto_wallets,
"cve_id": extract_cve_ids,
"mac_address": extract_mac_addresses,
"file_hash": extract_file_hashes,
"phone_number": extract_phone_numbers,
"domain": extract_domains,
}
def extract_iocs(text: str, types: list[str] | None = None) -> list[dict]:
"""Extrae todos los IoCs del texto y unifica resultados con `type`.
Si `types` es None, corre todos los extractores. En caso contrario,
ejecuta solo los tipos solicitados (los desconocidos se ignoran).
Resultados se ordenan por offset y se desduplican: si un span esta
completamente contenido dentro de otro, el contenido se descarta
(ej. un dominio dentro de un email, o un SHA1 dentro de un wallet
ETH). Empate por span exacto: gana el que aparece primero en el
orden de extractores definido.
"""
if types is None:
types = list(_EXTRACTORS.keys())
raw: list[dict] = []
for t in types:
extractor = _EXTRACTORS.get(t)
if extractor is None:
continue
raw.extend(extractor(text))
# Orden: por start ascendente, luego por longitud descendente para
# que el span mas amplio se procese antes y absorba los contenidos.
raw.sort(key=lambda r: (r["start"], -(r["end"] - r["start"])))
deduped: list[dict] = []
for m in raw:
contained = any(
d["start"] <= m["start"] and d["end"] >= m["end"]
and (d["start"], d["end"]) != (m["start"], m["end"])
for d in deduped
)
if contained:
continue
# Empate exacto: si ya hay otro con el mismo span, no anadir.
if any(
(d["start"], d["end"]) == (m["start"], m["end"])
for d in deduped
):
continue
deduped.append(m)
return deduped