From 55dcdd1164240671e74794bfc2fcf1971dc1f23a Mon Sep 17 00:00:00 2001 From: egutierrez Date: Thu, 30 Apr 2026 16:24:11 +0200 Subject: [PATCH] feat(cybersecurity): 8 IoC regex extractors + extract_iocs pipeline puro Extractores nuevos en python/functions/cybersecurity/: - extract_ip_addresses (IPv4 + IPv6 con validacion ipaddress) - extract_emails (RFC 5322 simplificado) - extract_domains (FQDNs con TLD valido, lista estatica) - extract_file_hashes (MD5/SHA1/SHA256/SHA512, algoritmo por longitud) - extract_crypto_wallets (BTC legacy + bech32, ETH 0x+40hex) - extract_cve_ids (CVE-YYYY-NNNN+) - extract_mac_addresses (xx:xx:xx + xx-xx-xx, separador uniforme) - extract_phone_numbers (E.164 + ES local 9 digitos) Pipeline: - extract_iocs corre todos, deduplica spans contenidos. Mantiene purity:pure (kind:function con uses_functions no vacio) porque la regla del registry exige que los pipelines sean impuros. Todas devuelven list[dict] con value/start/end/type para que el caller (issues 0038-0040) pueda reconciliar offsets con spans NER sin reparsing. 
Refs #0037 Co-Authored-By: Claude Opus 4.7 (1M context) --- python/functions/cybersecurity/__init__.py | 18 +++++ .../cybersecurity/extract_crypto_wallets.md | 40 ++++++++++ .../cybersecurity/extract_crypto_wallets.py | 44 +++++++++++ .../cybersecurity/extract_cve_ids.md | 40 ++++++++++ .../cybersecurity/extract_cve_ids.py | 27 +++++++ .../cybersecurity/extract_domains.md | 40 ++++++++++ .../cybersecurity/extract_domains.py | 58 +++++++++++++++ .../functions/cybersecurity/extract_emails.md | 40 ++++++++++ .../functions/cybersecurity/extract_emails.py | 30 ++++++++ .../cybersecurity/extract_file_hashes.md | 42 +++++++++++ .../cybersecurity/extract_file_hashes.py | 40 ++++++++++ .../functions/cybersecurity/extract_iocs.md | 59 +++++++++++++++ .../functions/cybersecurity/extract_iocs.py | 73 +++++++++++++++++++ .../cybersecurity/extract_ip_addresses.md | 45 ++++++++++++ .../cybersecurity/extract_ip_addresses.py | 53 ++++++++++++++ .../cybersecurity/extract_mac_addresses.md | 40 ++++++++++ .../cybersecurity/extract_mac_addresses.py | 31 ++++++++ .../cybersecurity/extract_phone_numbers.md | 40 ++++++++++ .../cybersecurity/extract_phone_numbers.py | 63 ++++++++++++++++ 19 files changed, 823 insertions(+) create mode 100644 python/functions/cybersecurity/extract_crypto_wallets.md create mode 100644 python/functions/cybersecurity/extract_crypto_wallets.py create mode 100644 python/functions/cybersecurity/extract_cve_ids.md create mode 100644 python/functions/cybersecurity/extract_cve_ids.py create mode 100644 python/functions/cybersecurity/extract_domains.md create mode 100644 python/functions/cybersecurity/extract_domains.py create mode 100644 python/functions/cybersecurity/extract_emails.md create mode 100644 python/functions/cybersecurity/extract_emails.py create mode 100644 python/functions/cybersecurity/extract_file_hashes.md create mode 100644 python/functions/cybersecurity/extract_file_hashes.py create mode 100644 python/functions/cybersecurity/extract_iocs.md 
create mode 100644 python/functions/cybersecurity/extract_iocs.py create mode 100644 python/functions/cybersecurity/extract_ip_addresses.md create mode 100644 python/functions/cybersecurity/extract_ip_addresses.py create mode 100644 python/functions/cybersecurity/extract_mac_addresses.md create mode 100644 python/functions/cybersecurity/extract_mac_addresses.py create mode 100644 python/functions/cybersecurity/extract_phone_numbers.md create mode 100644 python/functions/cybersecurity/extract_phone_numbers.py diff --git a/python/functions/cybersecurity/__init__.py b/python/functions/cybersecurity/__init__.py index caddb4be..5b3eb4a1 100644 --- a/python/functions/cybersecurity/__init__.py +++ b/python/functions/cybersecurity/__init__.py @@ -12,6 +12,15 @@ from .cybersecurity import ( envelope_encrypt, envelope_decrypt, ) +from .extract_ip_addresses import extract_ip_addresses +from .extract_emails import extract_emails +from .extract_domains import extract_domains +from .extract_file_hashes import extract_file_hashes +from .extract_crypto_wallets import extract_crypto_wallets +from .extract_cve_ids import extract_cve_ids +from .extract_mac_addresses import extract_mac_addresses +from .extract_phone_numbers import extract_phone_numbers +from .extract_iocs import extract_iocs __all__ = [ "hash_sha256", @@ -26,4 +35,13 @@ __all__ = [ "normalize_url", "envelope_encrypt", "envelope_decrypt", + "extract_ip_addresses", + "extract_emails", + "extract_domains", + "extract_file_hashes", + "extract_crypto_wallets", + "extract_cve_ids", + "extract_mac_addresses", + "extract_phone_numbers", + "extract_iocs", ] diff --git a/python/functions/cybersecurity/extract_crypto_wallets.md b/python/functions/cybersecurity/extract_crypto_wallets.md new file mode 100644 index 00000000..4b08a424 --- /dev/null +++ b/python/functions/cybersecurity/extract_crypto_wallets.md @@ -0,0 +1,40 @@ +--- +name: extract_crypto_wallets +kind: function +lang: py +domain: cybersecurity +version: "1.0.0" 
+purity: pure +signature: "def extract_crypto_wallets(text: str) -> list[dict]" +description: "Extrae direcciones BTC (legacy P2PKH/P2SH + bech32) y ETH (0x + 40 hex) de un texto, con offsets y `asset` indicando la moneda. Validacion estructural por regex — no checksum." +tags: [ioc, crypto, btc, eth, wallet, bitcoin, ethereum, regex, extract, cybersecurity, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [re] +params: + - name: text + desc: "string de texto del que extraer wallets" +output: "lista de dicts con {value, start, end, type='crypto_wallet', asset} por cada direccion encontrada" +tested: true +tests: + - "BTC legacy (P2PKH y P2SH)" + - "BTC bech32 (segwit)" + - "ETH 0x + 40 hex" +test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py" +file_path: "python/functions/cybersecurity/extract_crypto_wallets.py" +--- + +## Ejemplo + +```python +extract_crypto_wallets("Send to 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa or 0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1") +# [{"value": "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", ..., "asset": "btc"}, +# {"value": "0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1", ..., "asset": "eth"}] +``` + +## Notas + +BTC legacy: empieza por `1` o `3`, base58 (sin 0/O/I/l), 26-35 chars. BTC bech32: prefijo `bc1`, alfabeto bech32. ETH: `0x` + 40 hex case-insensitive. No se valida checksum — un agente que requiera validacion completa debe correr base58check / EIP-55 sobre los `value` retornados. diff --git a/python/functions/cybersecurity/extract_crypto_wallets.py b/python/functions/cybersecurity/extract_crypto_wallets.py new file mode 100644 index 00000000..b5ce1cd2 --- /dev/null +++ b/python/functions/cybersecurity/extract_crypto_wallets.py @@ -0,0 +1,44 @@ +"""Extrae wallets BTC y ETH de un texto, con offsets.""" + +import re + +_BTC_LEGACY = re.compile( + r"(? list[dict]: + """Extrae direcciones BTC (legacy + bech32) y ETH con offsets. 
+ + BTC legacy (P2PKH/P2SH) empieza por `1` o `3`. BTC bech32 (segwit) + empieza por `bc1`. ETH es `0x` seguido de 40 caracteres hex. No se + valida checksum — la regex es estructural. + """ + results = [] + for regex, asset in ( + (_BTC_LEGACY, "btc"), + (_BTC_BECH32, "btc"), + (_ETH_REGEX, "eth"), + ): + for m in regex.finditer(text): + results.append({ + "value": m.group(0), + "start": m.start(), + "end": m.end(), + "type": "crypto_wallet", + "asset": asset, + }) + results.sort(key=lambda r: r["start"]) + return results diff --git a/python/functions/cybersecurity/extract_cve_ids.md b/python/functions/cybersecurity/extract_cve_ids.md new file mode 100644 index 00000000..2d9463df --- /dev/null +++ b/python/functions/cybersecurity/extract_cve_ids.md @@ -0,0 +1,40 @@ +--- +name: extract_cve_ids +kind: function +lang: py +domain: cybersecurity +version: "1.0.0" +purity: pure +signature: "def extract_cve_ids(text: str) -> list[dict]" +description: "Extrae IDs CVE en formato `CVE-YYYY-NNNN+` de un texto, con offsets. No valida que el CVE exista en NVD." 
+tags: [ioc, cve, vulnerability, regex, extract, cybersecurity, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [re] +params: + - name: text + desc: "string de texto del que extraer CVEs" +output: "lista de dicts con {value, start, end, type='cve_id'} por cada CVE encontrado" +tested: true +tests: + - "CVE basico (4 digitos)" + - "CVE con 5+ digitos (post-2014)" + - "Multiples CVEs en mismo texto" +test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py" +file_path: "python/functions/cybersecurity/extract_cve_ids.py" +--- + +## Ejemplo + +```python +extract_cve_ids("Patches CVE-2021-44228 and CVE-2024-1234567") +# [{"value": "CVE-2021-44228", "start": 8, "end": 22, "type": "cve_id"}, +# {"value": "CVE-2024-1234567", "start": 27, "end": 43, "type": "cve_id"}] +``` + +## Notas + +Acepta el rango oficial NVD: año de 4 digitos seguido de 4 a 7 digitos. No valida que exista en NVD — solo estructura. La parte numerica creciente permite CVEs grandes (post-2014, donde NVD elimino el limite de 4 digitos). diff --git a/python/functions/cybersecurity/extract_cve_ids.py b/python/functions/cybersecurity/extract_cve_ids.py new file mode 100644 index 00000000..09768b54 --- /dev/null +++ b/python/functions/cybersecurity/extract_cve_ids.py @@ -0,0 +1,27 @@ +"""Extrae identificadores CVE de un texto, con offsets.""" + +import re + +_CVE_REGEX = re.compile( + r"(? list[dict]: + """Extrae IDs CVE con formato `CVE-YYYY-NNNN+`. + + Acepta el rango oficial (NVD): año de 4 digitos seguido de 4 a 7 + digitos. No valida que el CVE exista en NVD. Insensible a posicion + (puede aparecer al inicio, en medio o al final del texto). 
+ """ + return [ + { + "value": m.group(0), + "start": m.start(), + "end": m.end(), + "type": "cve_id", + } + for m in _CVE_REGEX.finditer(text) + ] diff --git a/python/functions/cybersecurity/extract_domains.md b/python/functions/cybersecurity/extract_domains.md new file mode 100644 index 00000000..92cf9e38 --- /dev/null +++ b/python/functions/cybersecurity/extract_domains.md @@ -0,0 +1,40 @@ +--- +name: extract_domains +kind: function +lang: py +domain: cybersecurity +version: "1.0.0" +purity: pure +signature: "def extract_domains(text: str) -> list[dict]" +description: "Extrae FQDNs (dominios con TLD valido) de un texto, con offsets start/end. Usa lista estatica de TLDs comunes (gTLD + ccTLD frecuentes). No valida DNS." +tags: [ioc, domain, fqdn, regex, extract, cybersecurity, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [re] +params: + - name: text + desc: "string de texto del que extraer dominios" +output: "lista de dicts con {value, start, end, type='domain'} por cada FQDN reconocido" +tested: true +tests: + - "Dominios con TLD valido se extraen" + - "TLD desconocido se descarta" + - "Subdominios profundos" +test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py" +file_path: "python/functions/cybersecurity/extract_domains.py" +--- + +## Ejemplo + +```python +extract_domains("visit example.com or sub.test.io for info") +# [{"value": "example.com", "start": 6, "end": 17, "type": "domain"}, +# {"value": "sub.test.io", "start": 21, "end": 32, "type": "domain"}] +``` + +## Notas + +Lista de TLDs estatica (no IANA completa). Cubre los gTLD originales, los nuevos populares (app, dev, io, ai, cloud, xyz, ...) y ccTLDs frecuentes. Si necesitas un TLD nuevo, ampliar `_VALID_TLDS` en el .py. No usa publicsuffix (dependencia externa). Si el dominio aparece dentro de un email, se extrae igual — el pipeline `extract_iocs` deduplica por offsets. 
diff --git a/python/functions/cybersecurity/extract_domains.py b/python/functions/cybersecurity/extract_domains.py new file mode 100644 index 00000000..4f137cbe --- /dev/null +++ b/python/functions/cybersecurity/extract_domains.py @@ -0,0 +1,58 @@ +"""Extrae FQDNs validos de un texto, con offsets.""" + +import re + +# Lista estatica de TLDs comunes (no exhaustiva — IANA tiene >1500). +# Incluye los gTLD originales, los nuevos mas usados, y ccTLD frecuentes. +_VALID_TLDS = frozenset({ + # gTLD originales + "com", "org", "net", "edu", "gov", "mil", "int", + # gTLD comunes + "info", "biz", "name", "pro", "mobi", "asia", "jobs", "tel", "travel", + "xxx", "post", + # nuevos gTLD populares + "app", "dev", "io", "ai", "tech", "cloud", "online", "site", "store", + "xyz", "top", "shop", "club", "fun", "live", "blog", "page", "news", + "media", "design", "studio", "agency", "io", "co", "me", "tv", + # ccTLD frecuentes + "us", "uk", "de", "fr", "es", "it", "nl", "be", "se", "no", "fi", "dk", + "ru", "ua", "pl", "cz", "ch", "at", "pt", "gr", "ie", "tr", + "ca", "mx", "br", "ar", "cl", "co", "pe", "ve", "uy", + "cn", "jp", "kr", "in", "id", "th", "vn", "my", "sg", "ph", "tw", "hk", + "au", "nz", + "za", "eg", "ma", "ng", "ke", + "il", "ae", "sa", "qa", + "eu", +}) + +# Componentes: letras/digitos con guiones internos, sin empezar/terminar en guion. +_LABEL = r"[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?" +_DOMAIN_REGEX = re.compile( + rf"(? list[dict]: + """Extrae FQDNs cuyo TLD esta en la lista estatica. + + Solo captura nombres con al menos un punto y un TLD reconocido. No + incluye URLs completas (ver `extract_urls`). Si el dominio aparece + dentro de un email, igual se extrae — el caller puede deduplicar + por offsets si lo necesita. 
+ """ + results = [] + for m in _DOMAIN_REGEX.finditer(text): + candidate = m.group(0) + tld = candidate.rsplit(".", 1)[-1].lower() + if tld not in _VALID_TLDS: + continue + results.append({ + "value": candidate, + "start": m.start(), + "end": m.end(), + "type": "domain", + }) + return results diff --git a/python/functions/cybersecurity/extract_emails.md b/python/functions/cybersecurity/extract_emails.md new file mode 100644 index 00000000..264b4272 --- /dev/null +++ b/python/functions/cybersecurity/extract_emails.md @@ -0,0 +1,40 @@ +--- +name: extract_emails +kind: function +lang: py +domain: cybersecurity +version: "1.0.0" +purity: pure +signature: "def extract_emails(text: str) -> list[dict]" +description: "Extrae direcciones de email (RFC 5322 simplificado) de un texto, con offsets start/end. No valida MX ni que el TLD exista — solo estructura sintactica." +tags: [ioc, email, regex, extract, cybersecurity, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [re] +params: + - name: text + desc: "string de texto del que extraer emails" +output: "lista de dicts con {value, start, end, type='email'} por cada email encontrado" +tested: true +tests: + - "Email simple" + - "Multiples emails con caracteres validos en local part" + - "No matchea texto sin @" +test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py" +file_path: "python/functions/cybersecurity/extract_emails.py" +--- + +## Ejemplo + +```python +extract_emails("Contact: alice@example.com or bob+work@sub.test.org") +# [{"value": "alice@example.com", "start": 9, "end": 26, "type": "email"}, +# {"value": "bob+work@sub.test.org", "start": 30, "end": 51, "type": "email"}] +``` + +## Notas + +Acepta `._%+-` en parte local. El dominio exige al menos un punto y termina en componente alfanumerico de 1+ chars. 
No valida MX ni que el TLD aparezca en lista de TLDs validos — para extraer dominios independientemente, ver `extract_domains_py_cybersecurity`. diff --git a/python/functions/cybersecurity/extract_emails.py b/python/functions/cybersecurity/extract_emails.py new file mode 100644 index 00000000..e6119a4f --- /dev/null +++ b/python/functions/cybersecurity/extract_emails.py @@ -0,0 +1,30 @@ +"""Extrae direcciones de email de un texto, con offsets.""" + +import re + +_EMAIL_REGEX = re.compile( + r"(? list[dict]: + """Extrae emails (RFC 5322 simplificado) con offsets. + + No valida MX ni que el TLD exista — solo estructura sintactica. La + parte local acepta letras, digitos y `._%+-`. El dominio exige al + menos un punto y termina en componente alfanumerico. + """ + return [ + { + "value": m.group(0), + "start": m.start(), + "end": m.end(), + "type": "email", + } + for m in _EMAIL_REGEX.finditer(text) + ] diff --git a/python/functions/cybersecurity/extract_file_hashes.md b/python/functions/cybersecurity/extract_file_hashes.md new file mode 100644 index 00000000..2a2b751c --- /dev/null +++ b/python/functions/cybersecurity/extract_file_hashes.md @@ -0,0 +1,42 @@ +--- +name: extract_file_hashes +kind: function +lang: py +domain: cybersecurity +version: "1.0.0" +purity: pure +signature: "def extract_file_hashes(text: str) -> list[dict]" +description: "Extrae hashes MD5/SHA1/SHA256/SHA512 de un texto, con offsets y algoritmo deducido por longitud (32, 40, 64 o 128 hex). Util para extraer IoCs de reportes de threat intelligence." 
+tags: [ioc, hash, md5, sha1, sha256, sha512, regex, extract, cybersecurity, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [re] +params: + - name: text + desc: "string de texto del que extraer hashes hex" +output: "lista de dicts con {value, start, end, type='file_hash', algorithm} por cada hash encontrado" +tested: true +tests: + - "MD5 (32 hex), SHA1 (40), SHA256 (64), SHA512 (128)" + - "Longitudes intermedias se ignoran" + - "Insensible a mayusculas en hex" +test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py" +file_path: "python/functions/cybersecurity/extract_file_hashes.py" +--- + +## Ejemplo + +```python +extract_file_hashes("MD5: 5d41402abc4b2a76b9719d911017c592 SHA1: aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d") +# [{"value": "5d41402abc4b2a76b9719d911017c592", "start": 5, "end": 37, +# "type": "file_hash", "algorithm": "md5"}, +# {"value": "aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d", "start": 44, "end": 84, +# "type": "file_hash", "algorithm": "sha1"}] +``` + +## Notas + +Detecta solo longitudes canonicas (32/40/64/128 hex). Una secuencia hex de 50 caracteres se ignora. Word-boundary `\b` evita matchear sub-strings de hex mas largo. ETH wallets (`0x` + 40 hex = 42 chars totales) NO matchean este extractor por el `\b` y la ausencia del prefijo `0x` en este patron — el pipeline `extract_iocs` deduplica overlaps si los hubiera. diff --git a/python/functions/cybersecurity/extract_file_hashes.py b/python/functions/cybersecurity/extract_file_hashes.py new file mode 100644 index 00000000..10d811a4 --- /dev/null +++ b/python/functions/cybersecurity/extract_file_hashes.py @@ -0,0 +1,40 @@ +"""Extrae hashes MD5/SHA1/SHA256/SHA512 de un texto, con offsets y algoritmo.""" + +import re + +# Mas largo primero para evitar que un SHA256 quede como SHA1+resto. 
# Canonical hex-digest lengths and the algorithm each one denotes.
_HASH_LENGTHS = (
    (128, "sha512"),
    (64, "sha256"),
    (40, "sha1"),
    (32, "md5"),
)

# Any standalone hex run between the shortest (MD5) and longest (SHA-512)
# supported digest; the exact length is checked after matching.
_HASH_CANDIDATE = re.compile(r"\b[A-Fa-f0-9]{32,128}\b")


def extract_file_hashes(text: str) -> list[dict]:
    """Return the MD5/SHA-1/SHA-256/SHA-512 hex digests found in *text*.

    The algorithm is inferred purely from the length of the hex run
    (32, 40, 64 or 128 characters). Hex runs of any other length are
    skipped.

    Args:
        text: Text to scan.

    Returns:
        One dict per digest, in order of appearance, with the keys
        ``value``, ``start``, ``end``, ``type`` (always ``"file_hash"``)
        and ``algorithm``.
    """
    algorithm_by_length = dict(_HASH_LENGTHS)
    hits = []
    for found in _HASH_CANDIDATE.finditer(text):
        begin, stop = found.span()
        name = algorithm_by_length.get(stop - begin)
        if name is not None:
            hits.append({
                "value": found.group(),
                "start": begin,
                "end": stop,
                "type": "file_hash",
                "algorithm": name,
            })
    return hits
+tags: [ioc, pipeline, regex, extract, cybersecurity, python] +uses_functions: + - extract_ip_addresses_py_cybersecurity + - extract_emails_py_cybersecurity + - extract_domains_py_cybersecurity + - extract_file_hashes_py_cybersecurity + - extract_crypto_wallets_py_cybersecurity + - extract_cve_ids_py_cybersecurity + - extract_mac_addresses_py_cybersecurity + - extract_phone_numbers_py_cybersecurity +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [] +params: + - name: text + desc: "string de texto del que extraer IoCs" + - name: types + desc: "lista opcional de tipos a extraer (email, ip_address, domain, file_hash, crypto_wallet, cve_id, mac_address, phone_number). None = todos." +output: "lista de dicts {value, start, end, type, ...} ordenada por offset, sin spans contenidos" +tested: true +tests: + - "Pipeline corre todos los extractores" + - "Filtro por types subset" + - "Deduplica spans contenidos (dominio dentro de email)" + - "Tipos desconocidos se ignoran" +test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py" +file_path: "python/functions/cybersecurity/extract_iocs.py" +--- + +## Ejemplo + +```python +extract_iocs("Reach alice@example.com from 10.0.0.5; CVE-2023-1234") +# [{"value": "alice@example.com", "start": 6, "end": 23, "type": "email"}, +# {"value": "10.0.0.5", "start": 29, "end": 37, "type": "ip_address"}, +# {"value": "CVE-2023-1234", "start": 39, "end": 52, "type": "cve_id"}] + +extract_iocs("Only IPs: 8.8.8.8 here", types=["ip_address"]) +# [{"value": "8.8.8.8", ..., "type": "ip_address"}] +``` + +## Notas + +Es **funcion** y no `kind: pipeline` porque la regla del registry exige que pipelines sean impuros — esta no lo es: solo compone funciones puras y deduplica. Mantiene `purity: pure` con `uses_functions` no vacio. + +Deduplicacion: un match completamente contenido en otro (ej. `example.com` dentro de `alice@example.com`) se descarta. 
# Registry of extractors keyed by the IoC type name callers pass in `types`.
# Insertion order is the default execution order, and therefore the default
# priority when two extractors report exactly the same span.
_EXTRACTORS = {
    "email": extract_emails,
    "ip_address": extract_ip_addresses,
    "crypto_wallet": extract_crypto_wallets,
    "cve_id": extract_cve_ids,
    "mac_address": extract_mac_addresses,
    "file_hash": extract_file_hashes,
    "phone_number": extract_phone_numbers,
    "domain": extract_domains,
}


def extract_iocs(text: str, types: list[str] | None = None) -> list[dict]:
    """Run the selected IoC extractors over *text* and merge their matches.

    Args:
        text: Text to scan.
        types: Keys of ``_EXTRACTORS`` to run, in the order given. ``None``
            runs every registered extractor in registry order. Names that
            are not in the registry are skipped silently.

    Returns:
        The match dicts produced by the extractors (each must carry
        ``start`` and ``end``), sorted by ``start``. A match whose span lies
        entirely inside a kept match's span is dropped (e.g. a domain
        inside an email). When two matches share the exact same span, only
        the one whose extractor ran first is kept: that is the order of
        ``types`` when it is given, otherwise the order of ``_EXTRACTORS``.
        Partially overlapping matches are all kept.
    """
    if types is None:
        types = list(_EXTRACTORS)

    raw: list[dict] = []
    for ioc_type in types:
        extractor = _EXTRACTORS.get(ioc_type)
        if extractor is not None:
            raw.extend(extractor(text))

    # Ascending start, widest span first. list.sort is stable, so matches
    # with identical spans keep the order in which their extractors ran.
    raw.sort(key=lambda r: (r["start"], -(r["end"] - r["start"])))

    # Single pass instead of rescanning every kept match: after the sort,
    # every kept match starts at or before the current one, so the current
    # match is contained in (or equal to) a kept span exactly when it does
    # not extend past the furthest end kept so far.
    deduped: list[dict] = []
    furthest_end = None
    for match in raw:
        if furthest_end is not None and match["end"] <= furthest_end:
            continue
        deduped.append(match)
        furthest_end = match["end"]

    return deduped
+tags: [ioc, ip, ipv4, ipv6, regex, extract, cybersecurity, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [re, ipaddress] +params: + - name: text + desc: "string de texto del que extraer IPs" +output: "lista de dicts con {value, start, end, type='ip_address'} por cada IP encontrada" +tested: true +tests: + - "IPv4 valida y rangos limite" + - "IPv4 invalida (>255 octeto) descartada" + - "IPv6 forma completa y comprimida" + - "IPv6 invalida descartada" + - "Texto sin IPs" +test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py" +file_path: "python/functions/cybersecurity/extract_ip_addresses.py" +--- + +## Ejemplo + +```python +extract_ip_addresses("Server 192.168.1.1 talks to 8.8.8.8") +# [{"value": "192.168.1.1", "start": 7, "end": 18, "type": "ip_address"}, +# {"value": "8.8.8.8", "start": 28, "end": 35, "type": "ip_address"}] + +extract_ip_addresses("not an IP: 999.999.999.999") +# [] +``` + +## Notas + +Usa `ipaddress.IPv4Address` / `IPv6Address` para validacion estructural — descarta `999.999.999.999` y otras combinaciones sintacticamente plausibles pero invalidas. IPs privadas (10/8, 172.16/12, 192.168/16) se extraen igual; el filtrado de relevancia es responsabilidad del caller. Pure — solo regex compilado y `ipaddress`, sin red ni disco. diff --git a/python/functions/cybersecurity/extract_ip_addresses.py b/python/functions/cybersecurity/extract_ip_addresses.py new file mode 100644 index 00000000..51cdfc26 --- /dev/null +++ b/python/functions/cybersecurity/extract_ip_addresses.py @@ -0,0 +1,53 @@ +"""Extrae IPv4 + IPv6 validas de un texto, con offsets.""" + +import ipaddress +import re + +_IPV4_CANDIDATE = re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b") +_IPV6_CANDIDATE = re.compile( + r"(? list[dict]: + """Extrae IPv4 e IPv6 validas con offsets. + + Filtra candidatos que no parsean como IP valida con `ipaddress`. 
No + distingue IP privadas (10.x, 192.168.x) de publicas — el filtrado de + relevancia es responsabilidad del caller. + """ + results: list[dict] = [] + + for m in _IPV4_CANDIDATE.finditer(text): + candidate = m.group(0) + try: + ipaddress.IPv4Address(candidate) + except ValueError: + continue + results.append({ + "value": candidate, + "start": m.start(), + "end": m.end(), + "type": "ip_address", + }) + + for m in _IPV6_CANDIDATE.finditer(text): + candidate = m.group(0).split("%", 1)[0] + if candidate.count(":") < 2: + continue + try: + ipaddress.IPv6Address(candidate) + except ValueError: + continue + results.append({ + "value": m.group(0), + "start": m.start(), + "end": m.end(), + "type": "ip_address", + }) + + results.sort(key=lambda r: r["start"]) + return results diff --git a/python/functions/cybersecurity/extract_mac_addresses.md b/python/functions/cybersecurity/extract_mac_addresses.md new file mode 100644 index 00000000..9dc86393 --- /dev/null +++ b/python/functions/cybersecurity/extract_mac_addresses.md @@ -0,0 +1,40 @@ +--- +name: extract_mac_addresses +kind: function +lang: py +domain: cybersecurity +version: "1.0.0" +purity: pure +signature: "def extract_mac_addresses(text: str) -> list[dict]" +description: "Extrae direcciones MAC en formato `xx:xx:xx:xx:xx:xx` o con guiones (`-`) de un texto, con offsets. Acepta hex en cualquier caso. Rechaza separadores mezclados." 
+tags: [ioc, mac, network, regex, extract, cybersecurity, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [re] +params: + - name: text + desc: "string de texto del que extraer MAC addresses" +output: "lista de dicts con {value, start, end, type='mac_address'} por cada MAC encontrada" +tested: true +tests: + - "MAC con dos puntos" + - "MAC con guiones" + - "Separadores mezclados se rechazan" +test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py" +file_path: "python/functions/cybersecurity/extract_mac_addresses.py" +--- + +## Ejemplo + +```python +extract_mac_addresses("router 00:1A:2B:3C:4D:5E and AA-BB-CC-DD-EE-FF") +# [{"value": "00:1A:2B:3C:4D:5E", ..., "type": "mac_address"}, +# {"value": "AA-BB-CC-DD-EE-FF", ..., "type": "mac_address"}] +``` + +## Notas + +Cada direccion debe usar un solo separador (todos `:` o todos `-`). No se valida OUI ni se distingue unicast/multicast. Para extraer la parte de fabricante OUI: tomar los primeros 6 hex chars del `value` y consultar registro IEEE. diff --git a/python/functions/cybersecurity/extract_mac_addresses.py b/python/functions/cybersecurity/extract_mac_addresses.py new file mode 100644 index 00000000..5d041a49 --- /dev/null +++ b/python/functions/cybersecurity/extract_mac_addresses.py @@ -0,0 +1,31 @@ +"""Extrae direcciones MAC de un texto, con offsets.""" + +import re + +_MAC_REGEX = re.compile( + r"(? list[dict]: + """Extrae MAC addresses en formato `xx:xx:xx:xx:xx:xx` o con guiones. + + Ambos separadores deben ser uniformes (no mezcla `:` y `-` en una + misma direccion — se aceptan independientemente). Insensible a + mayusculas. + """ + results = [] + for m in _MAC_REGEX.finditer(text): + candidate = m.group(0) + # Asegurar separador uniforme. 
+ if ":" in candidate and "-" in candidate: + continue + results.append({ + "value": candidate, + "start": m.start(), + "end": m.end(), + "type": "mac_address", + }) + return results diff --git a/python/functions/cybersecurity/extract_phone_numbers.md b/python/functions/cybersecurity/extract_phone_numbers.md new file mode 100644 index 00000000..c8e23371 --- /dev/null +++ b/python/functions/cybersecurity/extract_phone_numbers.md @@ -0,0 +1,40 @@ +--- +name: extract_phone_numbers +kind: function +lang: py +domain: cybersecurity +version: "1.0.0" +purity: pure +signature: "def extract_phone_numbers(text: str) -> list[dict]" +description: "Extrae numeros de telefono en formato E.164 (`+CC...`) y formato local ES (9 digitos empezando por 6/7/8/9), con offsets. Permite separadores `space` y `-` entre grupos." +tags: [ioc, phone, e164, spain, regex, extract, cybersecurity, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: [re] +params: + - name: text + desc: "string de texto del que extraer telefonos" +output: "lista de dicts con {value, start, end, type='phone_number'}" +tested: true +tests: + - "Numero E.164 con espacios" + - "Numero local ES de 9 digitos" + - "Numero demasiado corto se descarta" +test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py" +file_path: "python/functions/cybersecurity/extract_phone_numbers.py" +--- + +## Ejemplo + +```python +extract_phone_numbers("Llamar al +34 612 345 678 o al 912345678") +# [{"value": "+34 612 345 678", "start": 10, "end": 25, "type": "phone_number"}, +# {"value": "912345678", "start": 31, "end": 40, "type": "phone_number"}] +``` + +## Notas + +E.164 (ITU-T): entre 8 y 15 digitos tras el `+`. ES local: 9 digitos exactos, primero ∈ {6,7,8,9}. No se discrimina entre movil y fijo. No se normaliza el formato — el caller decide. 
Para parseo robusto multi-pais usar `phonenumbers` (port en Python de libphonenumber de Google), pero esa dependencia no es necesaria para extraer candidatos como IoC. diff --git a/python/functions/cybersecurity/extract_phone_numbers.py b/python/functions/cybersecurity/extract_phone_numbers.py new file mode 100644 index 00000000..ec1265c3 --- /dev/null +++ b/python/functions/cybersecurity/extract_phone_numbers.py @@ -0,0 +1,63 @@ +"""Extrae numeros de telefono (E.164 + formatos comunes ES/EU) con offsets.""" + +import re + +# E.164: + seguido de 8 a 15 digitos, opcionalmente con espacios/guiones internos. +_E164_REGEX = re.compile( + r"(? list[dict]: + """Extrae numeros de telefono E.164 y formato local ES de 9 digitos. + + Acepta separadores `space`, `-` entre grupos. E.164 requiere `+` y + entre 8 y 15 digitos (ITU-T). Formato local ES son 9 digitos que + empiezan por 6/7/8/9. Tras quitar separadores se valida la longitud + minima. + """ + seen_spans = set() + results = [] + + for m in _E164_REGEX.finditer(text): + candidate = m.group(0) + digits = re.sub(r"[^0-9]", "", candidate) + if not (8 <= len(digits) <= 15): + continue + span = (m.start(), m.end()) + if span in seen_spans: + continue + seen_spans.add(span) + results.append({ + "value": candidate, + "start": m.start(), + "end": m.end(), + "type": "phone_number", + }) + + for m in _ES_LOCAL_REGEX.finditer(text): + candidate = m.group(0) + digits = re.sub(r"[^0-9]", "", candidate) + if len(digits) != 9: + continue + span = (m.start(), m.end()) + if span in seen_spans: + continue + seen_spans.add(span) + results.append({ + "value": candidate, + "start": m.start(), + "end": m.end(), + "type": "phone_number", + }) + + results.sort(key=lambda r: r["start"]) + return results