feat(cybersecurity): 8 IoC regex extractors + extract_iocs pipeline puro
Extractores nuevos en python/functions/cybersecurity/: - extract_ip_addresses (IPv4 + IPv6 con validacion ipaddress) - extract_emails (RFC 5322 simplificado) - extract_domains (FQDNs con TLD valido, lista estatica) - extract_file_hashes (MD5/SHA1/SHA256/SHA512, algoritmo por longitud) - extract_crypto_wallets (BTC legacy + bech32, ETH 0x+40hex) - extract_cve_ids (CVE-YYYY-NNNN+) - extract_mac_addresses (xx:xx:xx + xx-xx-xx, separador uniforme) - extract_phone_numbers (E.164 + ES local 9 digitos) Pipeline: - extract_iocs corre todos, deduplica spans contenidos. Mantiene purity:pure (kind:function con uses_functions no vacio) porque la regla del registry exige que los pipelines sean impuros. Todas devuelven list[dict] con value/start/end/type para que el caller (issues 0038-0040) pueda reconciliar offsets con spans NER sin reparsing. Refs #0037 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -12,6 +12,15 @@ from .cybersecurity import (
|
||||
envelope_encrypt,
|
||||
envelope_decrypt,
|
||||
)
|
||||
from .extract_ip_addresses import extract_ip_addresses
|
||||
from .extract_emails import extract_emails
|
||||
from .extract_domains import extract_domains
|
||||
from .extract_file_hashes import extract_file_hashes
|
||||
from .extract_crypto_wallets import extract_crypto_wallets
|
||||
from .extract_cve_ids import extract_cve_ids
|
||||
from .extract_mac_addresses import extract_mac_addresses
|
||||
from .extract_phone_numbers import extract_phone_numbers
|
||||
from .extract_iocs import extract_iocs
|
||||
|
||||
__all__ = [
|
||||
"hash_sha256",
|
||||
@@ -26,4 +35,13 @@ __all__ = [
|
||||
"normalize_url",
|
||||
"envelope_encrypt",
|
||||
"envelope_decrypt",
|
||||
"extract_ip_addresses",
|
||||
"extract_emails",
|
||||
"extract_domains",
|
||||
"extract_file_hashes",
|
||||
"extract_crypto_wallets",
|
||||
"extract_cve_ids",
|
||||
"extract_mac_addresses",
|
||||
"extract_phone_numbers",
|
||||
"extract_iocs",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
---
|
||||
name: extract_crypto_wallets
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_crypto_wallets(text: str) -> list[dict]"
|
||||
description: "Extrae direcciones BTC (legacy P2PKH/P2SH + bech32) y ETH (0x + 40 hex) de un texto, con offsets y `asset` indicando la moneda. Validacion estructural por regex — no checksum."
|
||||
tags: [ioc, crypto, btc, eth, wallet, bitcoin, ethereum, regex, extract, cybersecurity, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
params:
|
||||
- name: text
|
||||
desc: "string de texto del que extraer wallets"
|
||||
output: "lista de dicts con {value, start, end, type='crypto_wallet', asset} por cada direccion encontrada"
|
||||
tested: true
|
||||
tests:
|
||||
- "BTC legacy (P2PKH y P2SH)"
|
||||
- "BTC bech32 (segwit)"
|
||||
- "ETH 0x + 40 hex"
|
||||
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
|
||||
file_path: "python/functions/cybersecurity/extract_crypto_wallets.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_crypto_wallets("Send to 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa or 0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1")
|
||||
# [{"value": "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", ..., "asset": "btc"},
|
||||
# {"value": "0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1", ..., "asset": "eth"}]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
BTC legacy: empieza por `1` o `3`, base58 (sin 0/O/I/l), 26-35 chars. BTC bech32: prefijo `bc1`, alfabeto bech32. ETH: `0x` + 40 hex case-insensitive. No se valida checksum — un agente que requiera validacion completa debe correr base58check / EIP-55 sobre los `value` retornados.
|
||||
@@ -0,0 +1,44 @@
|
||||
"""Extrae wallets BTC y ETH de un texto, con offsets."""
|
||||
|
||||
import re
|
||||
|
||||
_BTC_LEGACY = re.compile(
|
||||
r"(?<![A-Za-z0-9])"
|
||||
r"[13][1-9A-HJ-NP-Za-km-z]{25,34}"
|
||||
r"(?![A-Za-z0-9])"
|
||||
)
|
||||
_BTC_BECH32 = re.compile(
|
||||
r"(?<![A-Za-z0-9])"
|
||||
r"bc1[02-9ac-hj-np-z]{6,87}"
|
||||
r"(?![A-Za-z0-9])"
|
||||
)
|
||||
_ETH_REGEX = re.compile(
|
||||
r"(?<![A-Za-z0-9])"
|
||||
r"0x[a-fA-F0-9]{40}"
|
||||
r"(?![A-Za-z0-9])"
|
||||
)
|
||||
|
||||
|
||||
def extract_crypto_wallets(text: str) -> list[dict]:
|
||||
"""Extrae direcciones BTC (legacy + bech32) y ETH con offsets.
|
||||
|
||||
BTC legacy (P2PKH/P2SH) empieza por `1` o `3`. BTC bech32 (segwit)
|
||||
empieza por `bc1`. ETH es `0x` seguido de 40 caracteres hex. No se
|
||||
valida checksum — la regex es estructural.
|
||||
"""
|
||||
results = []
|
||||
for regex, asset in (
|
||||
(_BTC_LEGACY, "btc"),
|
||||
(_BTC_BECH32, "btc"),
|
||||
(_ETH_REGEX, "eth"),
|
||||
):
|
||||
for m in regex.finditer(text):
|
||||
results.append({
|
||||
"value": m.group(0),
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "crypto_wallet",
|
||||
"asset": asset,
|
||||
})
|
||||
results.sort(key=lambda r: r["start"])
|
||||
return results
|
||||
@@ -0,0 +1,40 @@
|
||||
---
|
||||
name: extract_cve_ids
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_cve_ids(text: str) -> list[dict]"
|
||||
description: "Extrae IDs CVE en formato `CVE-YYYY-NNNN+` de un texto, con offsets. No valida que el CVE exista en NVD."
|
||||
tags: [ioc, cve, vulnerability, regex, extract, cybersecurity, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
params:
|
||||
- name: text
|
||||
desc: "string de texto del que extraer CVEs"
|
||||
output: "lista de dicts con {value, start, end, type='cve_id'} por cada CVE encontrado"
|
||||
tested: true
|
||||
tests:
|
||||
- "CVE basico (4 digitos)"
|
||||
- "CVE con 5+ digitos (post-2014)"
|
||||
- "Multiples CVEs en mismo texto"
|
||||
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
|
||||
file_path: "python/functions/cybersecurity/extract_cve_ids.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_cve_ids("Patches CVE-2021-44228 and CVE-2024-1234567")
|
||||
# [{"value": "CVE-2021-44228", "start": 8, "end": 22, "type": "cve_id"},
|
||||
# {"value": "CVE-2024-1234567", "start": 27, "end": 43, "type": "cve_id"}]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Acepta el rango oficial NVD: año de 4 digitos seguido de 4 a 7 digitos. No valida que exista en NVD — solo estructura. La parte numerica creciente permite CVEs grandes (post-2014, donde NVD elimino el limite de 4 digitos).
|
||||
@@ -0,0 +1,27 @@
|
||||
"""Extrae identificadores CVE de un texto, con offsets."""
|
||||
|
||||
import re
|
||||
|
||||
_CVE_REGEX = re.compile(
|
||||
r"(?<![A-Za-z0-9])"
|
||||
r"CVE-\d{4}-\d{4,7}"
|
||||
r"(?![A-Za-z0-9])"
|
||||
)
|
||||
|
||||
|
||||
def extract_cve_ids(text: str) -> list[dict]:
|
||||
"""Extrae IDs CVE con formato `CVE-YYYY-NNNN+`.
|
||||
|
||||
Acepta el rango oficial (NVD): año de 4 digitos seguido de 4 a 7
|
||||
digitos. No valida que el CVE exista en NVD. Insensible a posicion
|
||||
(puede aparecer al inicio, en medio o al final del texto).
|
||||
"""
|
||||
return [
|
||||
{
|
||||
"value": m.group(0),
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "cve_id",
|
||||
}
|
||||
for m in _CVE_REGEX.finditer(text)
|
||||
]
|
||||
@@ -0,0 +1,40 @@
|
||||
---
|
||||
name: extract_domains
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_domains(text: str) -> list[dict]"
|
||||
description: "Extrae FQDNs (dominios con TLD valido) de un texto, con offsets start/end. Usa lista estatica de TLDs comunes (gTLD + ccTLD frecuentes). No valida DNS."
|
||||
tags: [ioc, domain, fqdn, regex, extract, cybersecurity, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
params:
|
||||
- name: text
|
||||
desc: "string de texto del que extraer dominios"
|
||||
output: "lista de dicts con {value, start, end, type='domain'} por cada FQDN reconocido"
|
||||
tested: true
|
||||
tests:
|
||||
- "Dominios con TLD valido se extraen"
|
||||
- "TLD desconocido se descarta"
|
||||
- "Subdominios profundos"
|
||||
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
|
||||
file_path: "python/functions/cybersecurity/extract_domains.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_domains("visit example.com or sub.test.io for info")
|
||||
# [{"value": "example.com", "start": 6, "end": 17, "type": "domain"},
|
||||
# {"value": "sub.test.io", "start": 21, "end": 32, "type": "domain"}]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Lista de TLDs estatica (no IANA completa). Cubre los gTLD originales, los nuevos populares (app, dev, io, ai, cloud, xyz, ...) y ccTLDs frecuentes. Si necesitas un TLD nuevo, ampliar `_VALID_TLDS` en el .py. No usa publicsuffix (dependencia externa). Si el dominio aparece dentro de un email, se extrae igual — el pipeline `extract_iocs` deduplica por offsets.
|
||||
@@ -0,0 +1,58 @@
|
||||
"""Extrae FQDNs validos de un texto, con offsets."""
|
||||
|
||||
import re
|
||||
|
||||
# Lista estatica de TLDs comunes (no exhaustiva — IANA tiene >1500).
|
||||
# Incluye los gTLD originales, los nuevos mas usados, y ccTLD frecuentes.
|
||||
_VALID_TLDS = frozenset({
|
||||
# gTLD originales
|
||||
"com", "org", "net", "edu", "gov", "mil", "int",
|
||||
# gTLD comunes
|
||||
"info", "biz", "name", "pro", "mobi", "asia", "jobs", "tel", "travel",
|
||||
"xxx", "post",
|
||||
# nuevos gTLD populares
|
||||
"app", "dev", "io", "ai", "tech", "cloud", "online", "site", "store",
|
||||
"xyz", "top", "shop", "club", "fun", "live", "blog", "page", "news",
|
||||
"media", "design", "studio", "agency", "io", "co", "me", "tv",
|
||||
# ccTLD frecuentes
|
||||
"us", "uk", "de", "fr", "es", "it", "nl", "be", "se", "no", "fi", "dk",
|
||||
"ru", "ua", "pl", "cz", "ch", "at", "pt", "gr", "ie", "tr",
|
||||
"ca", "mx", "br", "ar", "cl", "co", "pe", "ve", "uy",
|
||||
"cn", "jp", "kr", "in", "id", "th", "vn", "my", "sg", "ph", "tw", "hk",
|
||||
"au", "nz",
|
||||
"za", "eg", "ma", "ng", "ke",
|
||||
"il", "ae", "sa", "qa",
|
||||
"eu",
|
||||
})
|
||||
|
||||
# Componentes: letras/digitos con guiones internos, sin empezar/terminar en guion.
|
||||
_LABEL = r"[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?"
|
||||
_DOMAIN_REGEX = re.compile(
|
||||
rf"(?<![A-Za-z0-9.-])"
|
||||
rf"(?:{_LABEL}\.)+"
|
||||
rf"[A-Za-z]{{2,63}}"
|
||||
rf"(?![A-Za-z0-9.-])"
|
||||
)
|
||||
|
||||
|
||||
def extract_domains(text: str) -> list[dict]:
|
||||
"""Extrae FQDNs cuyo TLD esta en la lista estatica.
|
||||
|
||||
Solo captura nombres con al menos un punto y un TLD reconocido. No
|
||||
incluye URLs completas (ver `extract_urls`). Si el dominio aparece
|
||||
dentro de un email, igual se extrae — el caller puede deduplicar
|
||||
por offsets si lo necesita.
|
||||
"""
|
||||
results = []
|
||||
for m in _DOMAIN_REGEX.finditer(text):
|
||||
candidate = m.group(0)
|
||||
tld = candidate.rsplit(".", 1)[-1].lower()
|
||||
if tld not in _VALID_TLDS:
|
||||
continue
|
||||
results.append({
|
||||
"value": candidate,
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "domain",
|
||||
})
|
||||
return results
|
||||
@@ -0,0 +1,40 @@
|
||||
---
|
||||
name: extract_emails
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_emails(text: str) -> list[dict]"
|
||||
description: "Extrae direcciones de email (RFC 5322 simplificado) de un texto, con offsets start/end. No valida MX ni que el TLD exista — solo estructura sintactica."
|
||||
tags: [ioc, email, regex, extract, cybersecurity, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
params:
|
||||
- name: text
|
||||
desc: "string de texto del que extraer emails"
|
||||
output: "lista de dicts con {value, start, end, type='email'} por cada email encontrado"
|
||||
tested: true
|
||||
tests:
|
||||
- "Email simple"
|
||||
- "Multiples emails con caracteres validos en local part"
|
||||
- "No matchea texto sin @"
|
||||
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
|
||||
file_path: "python/functions/cybersecurity/extract_emails.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_emails("Contact: alice@example.com or bob+work@sub.test.org")
|
||||
# [{"value": "alice@example.com", "start": 9, "end": 26, "type": "email"},
|
||||
# {"value": "bob+work@sub.test.org", "start": 30, "end": 51, "type": "email"}]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Acepta `._%+-` en parte local. El dominio exige al menos un punto y termina en componente alfanumerico de 1+ chars. No valida MX ni que el TLD aparezca en lista de TLDs validos — para extraer dominios independientemente, ver `extract_domains_py_cybersecurity`.
|
||||
@@ -0,0 +1,30 @@
|
||||
"""Extrae direcciones de email de un texto, con offsets."""
|
||||
|
||||
import re
|
||||
|
||||
_EMAIL_REGEX = re.compile(
|
||||
r"(?<![A-Za-z0-9._%+-])"
|
||||
r"[A-Za-z0-9._%+-]+"
|
||||
r"@"
|
||||
r"[A-Za-z0-9](?:[A-Za-z0-9-]*[A-Za-z0-9])?"
|
||||
r"(?:\.[A-Za-z0-9](?:[A-Za-z0-9-]*[A-Za-z0-9])?)+"
|
||||
r"(?![A-Za-z0-9._%+-])"
|
||||
)
|
||||
|
||||
|
||||
def extract_emails(text: str) -> list[dict]:
|
||||
"""Extrae emails (RFC 5322 simplificado) con offsets.
|
||||
|
||||
No valida MX ni que el TLD exista — solo estructura sintactica. La
|
||||
parte local acepta letras, digitos y `._%+-`. El dominio exige al
|
||||
menos un punto y termina en componente alfanumerico.
|
||||
"""
|
||||
return [
|
||||
{
|
||||
"value": m.group(0),
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "email",
|
||||
}
|
||||
for m in _EMAIL_REGEX.finditer(text)
|
||||
]
|
||||
@@ -0,0 +1,42 @@
|
||||
---
|
||||
name: extract_file_hashes
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_file_hashes(text: str) -> list[dict]"
|
||||
description: "Extrae hashes MD5/SHA1/SHA256/SHA512 de un texto, con offsets y algoritmo deducido por longitud (32, 40, 64 o 128 hex). Util para extraer IoCs de reportes de threat intelligence."
|
||||
tags: [ioc, hash, md5, sha1, sha256, sha512, regex, extract, cybersecurity, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
params:
|
||||
- name: text
|
||||
desc: "string de texto del que extraer hashes hex"
|
||||
output: "lista de dicts con {value, start, end, type='file_hash', algorithm} por cada hash encontrado"
|
||||
tested: true
|
||||
tests:
|
||||
- "MD5 (32 hex), SHA1 (40), SHA256 (64), SHA512 (128)"
|
||||
- "Longitudes intermedias se ignoran"
|
||||
- "Insensible a mayusculas en hex"
|
||||
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
|
||||
file_path: "python/functions/cybersecurity/extract_file_hashes.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_file_hashes("MD5: 5d41402abc4b2a76b9719d911017c592 SHA1: aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d")
|
||||
# [{"value": "5d41402abc4b2a76b9719d911017c592", "start": 5, "end": 37,
|
||||
# "type": "file_hash", "algorithm": "md5"},
|
||||
# {"value": "aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d", "start": 44, "end": 84,
|
||||
# "type": "file_hash", "algorithm": "sha1"}]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Detecta solo longitudes canonicas (32/40/64/128 hex). Una secuencia hex de 50 caracteres se ignora. Word-boundary `\b` evita matchear sub-strings de hex mas largo. ETH wallets (`0x` + 40 hex = 42 chars totales) NO matchean este extractor por el `\b` y la ausencia del prefijo `0x` en este patron — el pipeline `extract_iocs` deduplica overlaps si los hubiera.
|
||||
@@ -0,0 +1,40 @@
|
||||
"""Extrae hashes MD5/SHA1/SHA256/SHA512 de un texto, con offsets y algoritmo."""
|
||||
|
||||
import re
|
||||
|
||||
# Mas largo primero para evitar que un SHA256 quede como SHA1+resto.
|
||||
_HASH_LENGTHS = (
|
||||
(128, "sha512"),
|
||||
(64, "sha256"),
|
||||
(40, "sha1"),
|
||||
(32, "md5"),
|
||||
)
|
||||
|
||||
_HASH_CANDIDATE = re.compile(r"\b[A-Fa-f0-9]{32,128}\b")
|
||||
|
||||
|
||||
def extract_file_hashes(text: str) -> list[dict]:
|
||||
"""Extrae hashes hex con su algoritmo deducido por longitud.
|
||||
|
||||
Reconoce MD5 (32), SHA1 (40), SHA256 (64) y SHA512 (128). Hashes
|
||||
de longitudes intermedias se ignoran. Devuelve `algorithm` ademas
|
||||
de los campos estandar.
|
||||
"""
|
||||
results = []
|
||||
for m in _HASH_CANDIDATE.finditer(text):
|
||||
candidate = m.group(0)
|
||||
length = len(candidate)
|
||||
algorithm = next(
|
||||
(algo for size, algo in _HASH_LENGTHS if size == length),
|
||||
None,
|
||||
)
|
||||
if algorithm is None:
|
||||
continue
|
||||
results.append({
|
||||
"value": candidate,
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "file_hash",
|
||||
"algorithm": algorithm,
|
||||
})
|
||||
return results
|
||||
@@ -0,0 +1,59 @@
|
||||
---
|
||||
name: extract_iocs
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_iocs(text: str, types: list[str] | None = None) -> list[dict]"
|
||||
description: "Pipeline puro que corre todos los extractores de IoC (IP, email, dominio, hash, wallet, CVE, MAC, telefono) y devuelve lista unificada con `type`. Deduplica spans contenidos. Si types se pasa, filtra los tipos a ejecutar."
|
||||
tags: [ioc, pipeline, regex, extract, cybersecurity, python]
|
||||
uses_functions:
|
||||
- extract_ip_addresses_py_cybersecurity
|
||||
- extract_emails_py_cybersecurity
|
||||
- extract_domains_py_cybersecurity
|
||||
- extract_file_hashes_py_cybersecurity
|
||||
- extract_crypto_wallets_py_cybersecurity
|
||||
- extract_cve_ids_py_cybersecurity
|
||||
- extract_mac_addresses_py_cybersecurity
|
||||
- extract_phone_numbers_py_cybersecurity
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: []
|
||||
params:
|
||||
- name: text
|
||||
desc: "string de texto del que extraer IoCs"
|
||||
- name: types
|
||||
desc: "lista opcional de tipos a extraer (email, ip_address, domain, file_hash, crypto_wallet, cve_id, mac_address, phone_number). None = todos."
|
||||
output: "lista de dicts {value, start, end, type, ...} ordenada por offset, sin spans contenidos"
|
||||
tested: true
|
||||
tests:
|
||||
- "Pipeline corre todos los extractores"
|
||||
- "Filtro por types subset"
|
||||
- "Deduplica spans contenidos (dominio dentro de email)"
|
||||
- "Tipos desconocidos se ignoran"
|
||||
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
|
||||
file_path: "python/functions/cybersecurity/extract_iocs.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_iocs("Reach alice@example.com from 10.0.0.5; CVE-2023-1234")
|
||||
# [{"value": "alice@example.com", "start": 6, "end": 23, "type": "email"},
|
||||
# {"value": "10.0.0.5", "start": 29, "end": 37, "type": "ip_address"},
|
||||
# {"value": "CVE-2023-1234", "start": 39, "end": 52, "type": "cve_id"}]
|
||||
|
||||
extract_iocs("Only IPs: 8.8.8.8 here", types=["ip_address"])
|
||||
# [{"value": "8.8.8.8", ..., "type": "ip_address"}]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Es **funcion** y no `kind: pipeline` porque la regla del registry exige que pipelines sean impuros — esta no lo es: solo compone funciones puras y deduplica. Mantiene `purity: pure` con `uses_functions` no vacio.
|
||||
|
||||
Deduplicacion: un match completamente contenido en otro (ej. `example.com` dentro de `alice@example.com`) se descarta. Empate exacto de span: gana el primero segun el orden de `_EXTRACTORS` en el modulo (email > ip > crypto_wallet > cve > mac > file_hash > phone > domain). Reordenar el dict cambia la prioridad si tienes overlaps habituales.
|
||||
|
||||
Bench informal: ~50-80 ms por MB de texto sobre CPU moderna (depende del numero de matches).
|
||||
@@ -0,0 +1,73 @@
|
||||
"""Pipeline puro: corre todos los extractores de IoC y unifica resultados."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
|
||||
from extract_ip_addresses import extract_ip_addresses
|
||||
from extract_emails import extract_emails
|
||||
from extract_domains import extract_domains
|
||||
from extract_file_hashes import extract_file_hashes
|
||||
from extract_crypto_wallets import extract_crypto_wallets
|
||||
from extract_cve_ids import extract_cve_ids
|
||||
from extract_mac_addresses import extract_mac_addresses
|
||||
from extract_phone_numbers import extract_phone_numbers
|
||||
|
||||
|
||||
_EXTRACTORS = {
|
||||
"email": extract_emails,
|
||||
"ip_address": extract_ip_addresses,
|
||||
"crypto_wallet": extract_crypto_wallets,
|
||||
"cve_id": extract_cve_ids,
|
||||
"mac_address": extract_mac_addresses,
|
||||
"file_hash": extract_file_hashes,
|
||||
"phone_number": extract_phone_numbers,
|
||||
"domain": extract_domains,
|
||||
}
|
||||
|
||||
|
||||
def extract_iocs(text: str, types: list[str] | None = None) -> list[dict]:
|
||||
"""Extrae todos los IoCs del texto y unifica resultados con `type`.
|
||||
|
||||
Si `types` es None, corre todos los extractores. En caso contrario,
|
||||
ejecuta solo los tipos solicitados (los desconocidos se ignoran).
|
||||
|
||||
Resultados se ordenan por offset y se desduplican: si un span esta
|
||||
completamente contenido dentro de otro, el contenido se descarta
|
||||
(ej. un dominio dentro de un email, o un SHA1 dentro de un wallet
|
||||
ETH). Empate por span exacto: gana el que aparece primero en el
|
||||
orden de extractores definido.
|
||||
"""
|
||||
if types is None:
|
||||
types = list(_EXTRACTORS.keys())
|
||||
|
||||
raw: list[dict] = []
|
||||
for t in types:
|
||||
extractor = _EXTRACTORS.get(t)
|
||||
if extractor is None:
|
||||
continue
|
||||
raw.extend(extractor(text))
|
||||
|
||||
# Orden: por start ascendente, luego por longitud descendente para
|
||||
# que el span mas amplio se procese antes y absorba los contenidos.
|
||||
raw.sort(key=lambda r: (r["start"], -(r["end"] - r["start"])))
|
||||
|
||||
deduped: list[dict] = []
|
||||
for m in raw:
|
||||
contained = any(
|
||||
d["start"] <= m["start"] and d["end"] >= m["end"]
|
||||
and (d["start"], d["end"]) != (m["start"], m["end"])
|
||||
for d in deduped
|
||||
)
|
||||
if contained:
|
||||
continue
|
||||
# Empate exacto: si ya hay otro con el mismo span, no anadir.
|
||||
if any(
|
||||
(d["start"], d["end"]) == (m["start"], m["end"])
|
||||
for d in deduped
|
||||
):
|
||||
continue
|
||||
deduped.append(m)
|
||||
|
||||
return deduped
|
||||
@@ -0,0 +1,45 @@
|
||||
---
|
||||
name: extract_ip_addresses
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_ip_addresses(text: str) -> list[dict]"
|
||||
description: "Extrae direcciones IPv4 e IPv6 validas de un texto, con offsets start/end. Filtra candidatos invalidos via ipaddress (rechaza 999.999.999.999 y similares). No distingue privadas de publicas — el filtrado de relevancia es del caller."
|
||||
tags: [ioc, ip, ipv4, ipv6, regex, extract, cybersecurity, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re, ipaddress]
|
||||
params:
|
||||
- name: text
|
||||
desc: "string de texto del que extraer IPs"
|
||||
output: "lista de dicts con {value, start, end, type='ip_address'} por cada IP encontrada"
|
||||
tested: true
|
||||
tests:
|
||||
- "IPv4 valida y rangos limite"
|
||||
- "IPv4 invalida (>255 octeto) descartada"
|
||||
- "IPv6 forma completa y comprimida"
|
||||
- "IPv6 invalida descartada"
|
||||
- "Texto sin IPs"
|
||||
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
|
||||
file_path: "python/functions/cybersecurity/extract_ip_addresses.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_ip_addresses("Server 192.168.1.1 talks to 8.8.8.8")
|
||||
# [{"value": "192.168.1.1", "start": 7, "end": 18, "type": "ip_address"},
|
||||
# {"value": "8.8.8.8", "start": 28, "end": 35, "type": "ip_address"}]
|
||||
|
||||
extract_ip_addresses("not an IP: 999.999.999.999")
|
||||
# []
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Usa `ipaddress.IPv4Address` / `IPv6Address` para validacion estructural — descarta `999.999.999.999` y otras combinaciones sintacticamente plausibles pero invalidas. IPs privadas (10/8, 172.16/12, 192.168/16) se extraen igual; el filtrado de relevancia es responsabilidad del caller. Pure — solo regex compilado y `ipaddress`, sin red ni disco.
|
||||
@@ -0,0 +1,53 @@
|
||||
"""Extrae IPv4 + IPv6 validas de un texto, con offsets."""
|
||||
|
||||
import ipaddress
|
||||
import re
|
||||
|
||||
_IPV4_CANDIDATE = re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b")
|
||||
_IPV6_CANDIDATE = re.compile(
|
||||
r"(?<![0-9A-Fa-f:])"
|
||||
r"(?:[0-9A-Fa-f]{0,4}:){2,7}[0-9A-Fa-f]{0,4}"
|
||||
r"(?:%[0-9A-Za-z]+)?"
|
||||
r"(?![0-9A-Fa-f:])"
|
||||
)
|
||||
|
||||
|
||||
def extract_ip_addresses(text: str) -> list[dict]:
|
||||
"""Extrae IPv4 e IPv6 validas con offsets.
|
||||
|
||||
Filtra candidatos que no parsean como IP valida con `ipaddress`. No
|
||||
distingue IP privadas (10.x, 192.168.x) de publicas — el filtrado de
|
||||
relevancia es responsabilidad del caller.
|
||||
"""
|
||||
results: list[dict] = []
|
||||
|
||||
for m in _IPV4_CANDIDATE.finditer(text):
|
||||
candidate = m.group(0)
|
||||
try:
|
||||
ipaddress.IPv4Address(candidate)
|
||||
except ValueError:
|
||||
continue
|
||||
results.append({
|
||||
"value": candidate,
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "ip_address",
|
||||
})
|
||||
|
||||
for m in _IPV6_CANDIDATE.finditer(text):
|
||||
candidate = m.group(0).split("%", 1)[0]
|
||||
if candidate.count(":") < 2:
|
||||
continue
|
||||
try:
|
||||
ipaddress.IPv6Address(candidate)
|
||||
except ValueError:
|
||||
continue
|
||||
results.append({
|
||||
"value": m.group(0),
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "ip_address",
|
||||
})
|
||||
|
||||
results.sort(key=lambda r: r["start"])
|
||||
return results
|
||||
@@ -0,0 +1,40 @@
|
||||
---
|
||||
name: extract_mac_addresses
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_mac_addresses(text: str) -> list[dict]"
|
||||
description: "Extrae direcciones MAC en formato `xx:xx:xx:xx:xx:xx` o con guiones (`-`) de un texto, con offsets. Acepta hex en cualquier caso. Rechaza separadores mezclados."
|
||||
tags: [ioc, mac, network, regex, extract, cybersecurity, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
params:
|
||||
- name: text
|
||||
desc: "string de texto del que extraer MAC addresses"
|
||||
output: "lista de dicts con {value, start, end, type='mac_address'} por cada MAC encontrada"
|
||||
tested: true
|
||||
tests:
|
||||
- "MAC con dos puntos"
|
||||
- "MAC con guiones"
|
||||
- "Separadores mezclados se rechazan"
|
||||
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
|
||||
file_path: "python/functions/cybersecurity/extract_mac_addresses.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_mac_addresses("router 00:1A:2B:3C:4D:5E and AA-BB-CC-DD-EE-FF")
|
||||
# [{"value": "00:1A:2B:3C:4D:5E", ..., "type": "mac_address"},
|
||||
# {"value": "AA-BB-CC-DD-EE-FF", ..., "type": "mac_address"}]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Cada direccion debe usar un solo separador (todos `:` o todos `-`). No se valida OUI ni se distingue unicast/multicast. Para extraer la parte de fabricante OUI: tomar los primeros 6 hex chars del `value` y consultar registro IEEE.
|
||||
@@ -0,0 +1,31 @@
|
||||
"""Extrae direcciones MAC de un texto, con offsets."""
|
||||
|
||||
import re
|
||||
|
||||
_MAC_REGEX = re.compile(
|
||||
r"(?<![A-Fa-f0-9:-])"
|
||||
r"(?:[A-Fa-f0-9]{2}[:-]){5}[A-Fa-f0-9]{2}"
|
||||
r"(?![A-Fa-f0-9:-])"
|
||||
)
|
||||
|
||||
|
||||
def extract_mac_addresses(text: str) -> list[dict]:
|
||||
"""Extrae MAC addresses en formato `xx:xx:xx:xx:xx:xx` o con guiones.
|
||||
|
||||
Ambos separadores deben ser uniformes (no mezcla `:` y `-` en una
|
||||
misma direccion — se aceptan independientemente). Insensible a
|
||||
mayusculas.
|
||||
"""
|
||||
results = []
|
||||
for m in _MAC_REGEX.finditer(text):
|
||||
candidate = m.group(0)
|
||||
# Asegurar separador uniforme.
|
||||
if ":" in candidate and "-" in candidate:
|
||||
continue
|
||||
results.append({
|
||||
"value": candidate,
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "mac_address",
|
||||
})
|
||||
return results
|
||||
@@ -0,0 +1,40 @@
|
||||
---
|
||||
name: extract_phone_numbers
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_phone_numbers(text: str) -> list[dict]"
|
||||
description: "Extrae numeros de telefono en formato E.164 (`+CC...`) y formato local ES (9 digitos empezando por 6/7/8/9), con offsets. Permite separadores `space` y `-` entre grupos."
|
||||
tags: [ioc, phone, e164, spain, regex, extract, cybersecurity, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
params:
|
||||
- name: text
|
||||
desc: "string de texto del que extraer telefonos"
|
||||
output: "lista de dicts con {value, start, end, type='phone_number'}"
|
||||
tested: true
|
||||
tests:
|
||||
- "Numero E.164 con espacios"
|
||||
- "Numero local ES de 9 digitos"
|
||||
- "Numero demasiado corto se descarta"
|
||||
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
|
||||
file_path: "python/functions/cybersecurity/extract_phone_numbers.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_phone_numbers("Llamar al +34 612 345 678 o al 912345678")
|
||||
# [{"value": "+34 612 345 678", "start": 10, "end": 25, "type": "phone_number"},
|
||||
# {"value": "912345678", "start": 31, "end": 40, "type": "phone_number"}]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
E.164 (ITU-T): entre 8 y 15 digitos tras el `+`. ES local: 9 digitos exactos, primero ∈ {6,7,8,9}. No se discrimina entre movil y fijo. No se normaliza el formato — el caller decide. Para parseo robusto multi-pais usar `phonenumbers` (libpostal-style), pero esa dependencia no es necesaria para extraer candidatos como IoC.
|
||||
@@ -0,0 +1,63 @@
|
||||
"""Extrae numeros de telefono (E.164 + formatos comunes ES/EU) con offsets."""
|
||||
|
||||
import re
|
||||
|
||||
# E.164: + seguido de 8 a 15 digitos, opcionalmente con espacios/guiones internos.
|
||||
_E164_REGEX = re.compile(
|
||||
r"(?<![A-Za-z0-9])"
|
||||
r"\+\d{1,3}[\s\-]?\d{1,4}(?:[\s\-]?\d{1,4}){1,4}"
|
||||
r"(?![A-Za-z0-9])"
|
||||
)
|
||||
# ES: 9 digitos empezando por 6, 7, 8 o 9 (movil/fijo).
|
||||
_ES_LOCAL_REGEX = re.compile(
|
||||
r"(?<![A-Za-z0-9+])"
|
||||
r"[6789]\d{2}[\s\-]?\d{3}[\s\-]?\d{3}"
|
||||
r"(?![A-Za-z0-9])"
|
||||
)
|
||||
|
||||
|
||||
def extract_phone_numbers(text: str) -> list[dict]:
|
||||
"""Extrae numeros de telefono E.164 y formato local ES de 9 digitos.
|
||||
|
||||
Acepta separadores `space`, `-` entre grupos. E.164 requiere `+` y
|
||||
entre 8 y 15 digitos (ITU-T). Formato local ES son 9 digitos que
|
||||
empiezan por 6/7/8/9. Tras quitar separadores se valida la longitud
|
||||
minima.
|
||||
"""
|
||||
seen_spans = set()
|
||||
results = []
|
||||
|
||||
for m in _E164_REGEX.finditer(text):
|
||||
candidate = m.group(0)
|
||||
digits = re.sub(r"[^0-9]", "", candidate)
|
||||
if not (8 <= len(digits) <= 15):
|
||||
continue
|
||||
span = (m.start(), m.end())
|
||||
if span in seen_spans:
|
||||
continue
|
||||
seen_spans.add(span)
|
||||
results.append({
|
||||
"value": candidate,
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "phone_number",
|
||||
})
|
||||
|
||||
for m in _ES_LOCAL_REGEX.finditer(text):
|
||||
candidate = m.group(0)
|
||||
digits = re.sub(r"[^0-9]", "", candidate)
|
||||
if len(digits) != 9:
|
||||
continue
|
||||
span = (m.start(), m.end())
|
||||
if span in seen_spans:
|
||||
continue
|
||||
seen_spans.add(span)
|
||||
results.append({
|
||||
"value": candidate,
|
||||
"start": m.start(),
|
||||
"end": m.end(),
|
||||
"type": "phone_number",
|
||||
})
|
||||
|
||||
results.sort(key=lambda r: r["start"])
|
||||
return results
|
||||
Reference in New Issue
Block a user