fix(fn-run): propagar stdout/stderr de bash functions library-style #1

Open
dataforge wants to merge 537 commits from auto/0077-fn-run-bash-mudo into master
19 changed files with 823 additions and 0 deletions
Showing only changes of commit dff0c0d2b7 - Show all commits
@@ -12,6 +12,15 @@ from .cybersecurity import (
envelope_encrypt,
envelope_decrypt,
)
from .extract_ip_addresses import extract_ip_addresses
from .extract_emails import extract_emails
from .extract_domains import extract_domains
from .extract_file_hashes import extract_file_hashes
from .extract_crypto_wallets import extract_crypto_wallets
from .extract_cve_ids import extract_cve_ids
from .extract_mac_addresses import extract_mac_addresses
from .extract_phone_numbers import extract_phone_numbers
from .extract_iocs import extract_iocs
__all__ = [
"hash_sha256",
@@ -26,4 +35,13 @@ __all__ = [
"normalize_url",
"envelope_encrypt",
"envelope_decrypt",
"extract_ip_addresses",
"extract_emails",
"extract_domains",
"extract_file_hashes",
"extract_crypto_wallets",
"extract_cve_ids",
"extract_mac_addresses",
"extract_phone_numbers",
"extract_iocs",
]
@@ -0,0 +1,40 @@
---
name: extract_crypto_wallets
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_crypto_wallets(text: str) -> list[dict]"
description: "Extrae direcciones BTC (legacy P2PKH/P2SH + bech32) y ETH (0x + 40 hex) de un texto, con offsets y `asset` indicando la moneda. Validacion estructural por regex — no checksum."
tags: [ioc, crypto, btc, eth, wallet, bitcoin, ethereum, regex, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
params:
- name: text
desc: "string de texto del que extraer wallets"
output: "lista de dicts con {value, start, end, type='crypto_wallet', asset} por cada direccion encontrada"
tested: true
tests:
- "BTC legacy (P2PKH y P2SH)"
- "BTC bech32 (segwit)"
- "ETH 0x + 40 hex"
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
file_path: "python/functions/cybersecurity/extract_crypto_wallets.py"
---
## Ejemplo
```python
extract_crypto_wallets("Send to 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa or 0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1")
# [{"value": "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", ..., "asset": "btc"},
# {"value": "0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb1", ..., "asset": "eth"}]
```
## Notas
BTC legacy: empieza por `1` o `3`, base58 (sin 0/O/I/l), 26-35 chars. BTC bech32: prefijo `bc1`, alfabeto bech32. ETH: `0x` + 40 hex case-insensitive. No se valida checksum — un agente que requiera validacion completa debe correr base58check / EIP-55 sobre los `value` retornados.
@@ -0,0 +1,44 @@
"""Extrae wallets BTC y ETH de un texto, con offsets."""
import re
# Legacy Bitcoin address (P2PKH / P2SH): a leading `1` or `3` followed by
# 25-34 base58 characters, not embedded in a longer alphanumeric run.
_BTC_LEGACY = re.compile(
    r"(?<![A-Za-z0-9])"
    r"[13][1-9A-HJ-NP-Za-km-z]{25,34}"
    r"(?![A-Za-z0-9])"
)
# Bech32 (segwit) Bitcoin address: lowercase `bc1` prefix plus 6-87
# characters of the bech32 alphabet.
_BTC_BECH32 = re.compile(
    r"(?<![A-Za-z0-9])"
    r"bc1[02-9ac-hj-np-z]{6,87}"
    r"(?![A-Za-z0-9])"
)
# Ethereum address: `0x` followed by exactly 40 hex digits (any case).
_ETH_REGEX = re.compile(
    r"(?<![A-Za-z0-9])"
    r"0x[a-fA-F0-9]{40}"
    r"(?![A-Za-z0-9])"
)


def _wallet_record(found, coin: str) -> dict:
    """Build the result dict for one regex match of the given asset."""
    begin, finish = found.span()
    return {
        "value": found.group(0),
        "start": begin,
        "end": finish,
        "type": "crypto_wallet",
        "asset": coin,
    }


def extract_crypto_wallets(text: str) -> list[dict]:
    """Extract BTC (legacy and bech32) and ETH addresses with offsets.

    Matching is purely structural: no base58check, bech32 or EIP-55
    checksum is verified.

    Args:
        text: Text to scan.

    Returns:
        One dict per match with the keys ``value``, ``start``, ``end``,
        ``type`` (always ``"crypto_wallet"``) and ``asset`` (``"btc"`` or
        ``"eth"``), ordered by ``start``.
    """
    scan_plan = (
        (_BTC_LEGACY, "btc"),
        (_BTC_BECH32, "btc"),
        (_ETH_REGEX, "eth"),
    )
    hits = [
        _wallet_record(found, coin)
        for pattern, coin in scan_plan
        for found in pattern.finditer(text)
    ]
    # sorted() is stable, so equal starts keep the scan-plan order.
    return sorted(hits, key=lambda hit: hit["start"])
@@ -0,0 +1,40 @@
---
name: extract_cve_ids
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_cve_ids(text: str) -> list[dict]"
description: "Extrae IDs CVE en formato `CVE-YYYY-NNNN+` de un texto, con offsets. No valida que el CVE exista en NVD."
tags: [ioc, cve, vulnerability, regex, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
params:
- name: text
desc: "string de texto del que extraer CVEs"
output: "lista de dicts con {value, start, end, type='cve_id'} por cada CVE encontrado"
tested: true
tests:
- "CVE basico (4 digitos)"
- "CVE con 5+ digitos (post-2014)"
- "Multiples CVEs en mismo texto"
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
file_path: "python/functions/cybersecurity/extract_cve_ids.py"
---
## Ejemplo
```python
extract_cve_ids("Patches CVE-2021-44228 and CVE-2024-1234567")
# [{"value": "CVE-2021-44228", "start": 8, "end": 22, "type": "cve_id"},
# {"value": "CVE-2024-1234567", "start": 27, "end": 43, "type": "cve_id"}]
```
## Notas
Acepta año de 4 digitos seguido de 4 a 7 digitos. Desde 2014 la sintaxis oficial de CVE (definida por MITRE) permite 4 o mas digitos en la parte numerica, sin limite superior; este extractor acota a 7 como tope practico, por lo que un ID con 8+ digitos no se extrae. No valida que exista en NVD — solo estructura.
@@ -0,0 +1,27 @@
"""Extrae identificadores CVE de un texto, con offsets."""
import re
# `CVE-<4-digit year>-<4 to 7 digit sequence>`, not embedded in a longer
# alphanumeric token. The prefix is matched case-sensitively.
_CVE_REGEX = re.compile(
    r"(?<![A-Za-z0-9])"
    r"CVE-\d{4}-\d{4,7}"
    r"(?![A-Za-z0-9])"
)


def extract_cve_ids(text: str) -> list[dict]:
    """Extract CVE identifiers of the form ``CVE-YYYY-NNNN`` with offsets.

    Only the structure is checked (4-digit year, 4-7 digit sequence
    number); nothing verifies that the CVE actually exists.

    Args:
        text: Text to scan.

    Returns:
        One dict per match with the keys ``value``, ``start``, ``end`` and
        ``type`` (always ``"cve_id"``), in order of appearance.
    """
    found: list[dict] = []
    for hit in _CVE_REGEX.finditer(text):
        begin, finish = hit.span()
        found.append(
            {
                "value": hit.group(0),
                "start": begin,
                "end": finish,
                "type": "cve_id",
            }
        )
    return found
@@ -0,0 +1,40 @@
---
name: extract_domains
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_domains(text: str) -> list[dict]"
description: "Extrae FQDNs (dominios con TLD valido) de un texto, con offsets start/end. Usa lista estatica de TLDs comunes (gTLD + ccTLD frecuentes). No valida DNS."
tags: [ioc, domain, fqdn, regex, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
params:
- name: text
desc: "string de texto del que extraer dominios"
output: "lista de dicts con {value, start, end, type='domain'} por cada FQDN reconocido"
tested: true
tests:
- "Dominios con TLD valido se extraen"
- "TLD desconocido se descarta"
- "Subdominios profundos"
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
file_path: "python/functions/cybersecurity/extract_domains.py"
---
## Ejemplo
```python
extract_domains("visit example.com or sub.test.io for info")
# [{"value": "example.com", "start": 6, "end": 17, "type": "domain"},
# {"value": "sub.test.io", "start": 21, "end": 32, "type": "domain"}]
```
## Notas
Lista de TLDs estatica (no IANA completa). Cubre los gTLD originales, los nuevos populares (app, dev, io, ai, cloud, xyz, ...) y ccTLDs frecuentes. Si necesitas un TLD nuevo, ampliar `_VALID_TLDS` en el .py. No usa publicsuffix (dependencia externa). Si el dominio aparece dentro de un email, se extrae igual — el pipeline `extract_iocs` deduplica por offsets.
@@ -0,0 +1,58 @@
"""Extrae FQDNs validos de un texto, con offsets."""
import re
# Static allow-list of common TLDs (deliberately partial; IANA lists >1500).
# Covers the original gTLDs, the most used new gTLDs and frequent ccTLDs.
_VALID_TLDS = frozenset({
    # original gTLDs
    "com", "org", "net", "edu", "gov", "mil", "int",
    # common gTLDs
    "info", "biz", "name", "pro", "mobi", "asia", "jobs", "tel", "travel",
    "xxx", "post",
    # popular new gTLDs (plus ccTLDs widely used as generic ones)
    "app", "dev", "io", "ai", "tech", "cloud", "online", "site", "store",
    "xyz", "top", "shop", "club", "fun", "live", "blog", "page", "news",
    "media", "design", "studio", "agency", "co", "me", "tv",
    # frequent ccTLDs
    "us", "uk", "de", "fr", "es", "it", "nl", "be", "se", "no", "fi", "dk",
    "ru", "ua", "pl", "cz", "ch", "at", "pt", "gr", "ie", "tr",
    "ca", "mx", "br", "ar", "cl", "pe", "ve", "uy",
    "cn", "jp", "kr", "in", "id", "th", "vn", "my", "sg", "ph", "tw", "hk",
    "au", "nz",
    "za", "eg", "ma", "ng", "ke",
    "il", "ae", "sa", "qa",
    "eu",
})
# One DNS label: letters/digits with inner hyphens, never starting or
# ending with a hyphen, at most 63 characters.
_LABEL = r"[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?"
_DOMAIN_REGEX = re.compile(
    rf"(?<![A-Za-z0-9.-])"
    rf"(?:{_LABEL}\.)+"
    rf"[A-Za-z]{{2,63}}"
    # Reject only when the name visibly continues (another label character,
    # or a dot followed by one). A bare trailing dot is sentence punctuation
    # and must not suppress the match ("visit example.com.").
    rf"(?!\.?[A-Za-z0-9-])"
)


def extract_domains(text: str) -> list[dict]:
    """Extract FQDNs whose TLD is in the static allow-list, with offsets.

    A candidate needs at least one dot and a recognised TLD (compared
    case-insensitively). A sentence-ending period right after the domain is
    tolerated and is not part of the returned value. A domain that is part
    of an email address is still returned; callers can deduplicate by
    offsets if needed.

    Args:
        text: Text to scan.

    Returns:
        One dict per accepted domain with the keys ``value``, ``start``,
        ``end`` and ``type`` (always ``"domain"``), in order of appearance.
    """
    results = []
    for m in _DOMAIN_REGEX.finditer(text):
        candidate = m.group(0)
        # The regex guarantees at least one dot, so the TLD is the last piece.
        tld = candidate.rsplit(".", 1)[-1].lower()
        if tld not in _VALID_TLDS:
            continue
        results.append({
            "value": candidate,
            "start": m.start(),
            "end": m.end(),
            "type": "domain",
        })
    return results
@@ -0,0 +1,40 @@
---
name: extract_emails
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_emails(text: str) -> list[dict]"
description: "Extrae direcciones de email (RFC 5322 simplificado) de un texto, con offsets start/end. No valida MX ni que el TLD exista — solo estructura sintactica."
tags: [ioc, email, regex, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
params:
- name: text
desc: "string de texto del que extraer emails"
output: "lista de dicts con {value, start, end, type='email'} por cada email encontrado"
tested: true
tests:
- "Email simple"
- "Multiples emails con caracteres validos en local part"
- "No matchea texto sin @"
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
file_path: "python/functions/cybersecurity/extract_emails.py"
---
## Ejemplo
```python
extract_emails("Contact: alice@example.com or bob+work@sub.test.org")
# [{"value": "alice@example.com", "start": 9, "end": 26, "type": "email"},
# {"value": "bob+work@sub.test.org", "start": 30, "end": 51, "type": "email"}]
```
## Notas
Acepta `._%+-` en parte local. El dominio exige al menos un punto y termina en componente alfanumerico de 1+ chars. No valida MX ni que el TLD aparezca en lista de TLDs validos — para extraer dominios independientemente, ver `extract_domains_py_cybersecurity`.
@@ -0,0 +1,30 @@
"""Extrae direcciones de email de un texto, con offsets."""
import re
# Simplified RFC 5322 address: local part of letters, digits and `._%+-`,
# then `@`, then at least two dot-separated domain labels.
_EMAIL_REGEX = re.compile(
    r"(?<![A-Za-z0-9._%+-])"
    r"[A-Za-z0-9._%+-]+"
    r"@"
    r"[A-Za-z0-9](?:[A-Za-z0-9-]*[A-Za-z0-9])?"
    r"(?:\.[A-Za-z0-9](?:[A-Za-z0-9-]*[A-Za-z0-9])?)+"
    # Reject only when the address visibly continues. A bare trailing dot is
    # sentence punctuation and must not suppress the match
    # ("write to alice@example.com.").
    r"(?!\.?[A-Za-z0-9_%+-])"
)


def extract_emails(text: str) -> list[dict]:
    """Extract email addresses (simplified RFC 5322) with offsets.

    Only the syntax is checked: no MX lookup and no TLD allow-list. The
    domain needs at least one dot and must end in an alphanumeric label. A
    sentence-ending period right after the address is tolerated and is not
    part of the returned value.

    Args:
        text: Text to scan.

    Returns:
        One dict per match with the keys ``value``, ``start``, ``end`` and
        ``type`` (always ``"email"``), in order of appearance.
    """
    return [
        {
            "value": m.group(0),
            "start": m.start(),
            "end": m.end(),
            "type": "email",
        }
        for m in _EMAIL_REGEX.finditer(text)
    ]
@@ -0,0 +1,42 @@
---
name: extract_file_hashes
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_file_hashes(text: str) -> list[dict]"
description: "Extrae hashes MD5/SHA1/SHA256/SHA512 de un texto, con offsets y algoritmo deducido por longitud (32, 40, 64 o 128 hex). Util para extraer IoCs de reportes de threat intelligence."
tags: [ioc, hash, md5, sha1, sha256, sha512, regex, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
params:
- name: text
desc: "string de texto del que extraer hashes hex"
output: "lista de dicts con {value, start, end, type='file_hash', algorithm} por cada hash encontrado"
tested: true
tests:
- "MD5 (32 hex), SHA1 (40), SHA256 (64), SHA512 (128)"
- "Longitudes intermedias se ignoran"
- "Insensible a mayusculas en hex"
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
file_path: "python/functions/cybersecurity/extract_file_hashes.py"
---
## Ejemplo
```python
extract_file_hashes("MD5: 5d41402abc4b2a76b9719d911017c592 SHA1: aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d")
# [{"value": "5d41402abc4b2a76b9719d911017c592", "start": 5, "end": 37,
# "type": "file_hash", "algorithm": "md5"},
# {"value": "aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d", "start": 44, "end": 84,
# "type": "file_hash", "algorithm": "sha1"}]
```
## Notas
Detecta solo longitudes canonicas (32/40/64/128 hex). Una secuencia hex de 50 caracteres se ignora. Word-boundary `\b` evita matchear sub-strings de hex mas largo. ETH wallets (`0x` + 40 hex = 42 chars totales) NO matchean este extractor por el `\b` y la ausencia del prefijo `0x` en este patron — el pipeline `extract_iocs` deduplica overlaps si los hubiera.
@@ -0,0 +1,40 @@
"""Extrae hashes MD5/SHA1/SHA256/SHA512 de un texto, con offsets y algoritmo."""
import re
# (hex length, algorithm) pairs, listed longest first. The lookup below is
# by exact length, so each candidate maps to at most one algorithm.
_HASH_LENGTHS = (
    (128, "sha512"),
    (64, "sha256"),
    (40, "sha1"),
    (32, "md5"),
)
# Any word-bounded hex run of 32-128 characters; filtered by length later.
_HASH_CANDIDATE = re.compile(r"\b[A-Fa-f0-9]{32,128}\b")
# Exact-length lookup table derived from _HASH_LENGTHS.
_ALGORITHM_BY_LENGTH = dict(_HASH_LENGTHS)


def extract_file_hashes(text: str) -> list[dict]:
    """Extract hex digests and label each with the algorithm its length implies.

    Recognised lengths are 32 (MD5), 40 (SHA1), 64 (SHA256) and 128
    (SHA512); hex runs of any other length are ignored.

    Args:
        text: Text to scan.

    Returns:
        One dict per match with the keys ``value``, ``start``, ``end``,
        ``type`` (always ``"file_hash"``) and ``algorithm``, in order of
        appearance.
    """
    hits = (
        (found, _ALGORITHM_BY_LENGTH.get(len(found.group(0))))
        for found in _HASH_CANDIDATE.finditer(text)
    )
    return [
        {
            "value": found.group(0),
            "start": found.start(),
            "end": found.end(),
            "type": "file_hash",
            "algorithm": algo,
        }
        for found, algo in hits
        if algo is not None
    ]
@@ -0,0 +1,59 @@
---
name: extract_iocs
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_iocs(text: str, types: list[str] | None = None) -> list[dict]"
description: "Pipeline puro que corre todos los extractores de IoC (IP, email, dominio, hash, wallet, CVE, MAC, telefono) y devuelve lista unificada con `type`. Deduplica spans contenidos. Si types se pasa, filtra los tipos a ejecutar."
tags: [ioc, pipeline, regex, extract, cybersecurity, python]
uses_functions:
- extract_ip_addresses_py_cybersecurity
- extract_emails_py_cybersecurity
- extract_domains_py_cybersecurity
- extract_file_hashes_py_cybersecurity
- extract_crypto_wallets_py_cybersecurity
- extract_cve_ids_py_cybersecurity
- extract_mac_addresses_py_cybersecurity
- extract_phone_numbers_py_cybersecurity
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
params:
- name: text
desc: "string de texto del que extraer IoCs"
- name: types
desc: "lista opcional de tipos a extraer (email, ip_address, domain, file_hash, crypto_wallet, cve_id, mac_address, phone_number). None = todos."
output: "lista de dicts {value, start, end, type, ...} ordenada por offset, sin spans contenidos"
tested: true
tests:
- "Pipeline corre todos los extractores"
- "Filtro por types subset"
- "Deduplica spans contenidos (dominio dentro de email)"
- "Tipos desconocidos se ignoran"
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
file_path: "python/functions/cybersecurity/extract_iocs.py"
---
## Ejemplo
```python
extract_iocs("Reach alice@example.com from 10.0.0.5; CVE-2023-1234")
# [{"value": "alice@example.com", "start": 6, "end": 23, "type": "email"},
# {"value": "10.0.0.5", "start": 29, "end": 37, "type": "ip_address"},
# {"value": "CVE-2023-1234", "start": 39, "end": 52, "type": "cve_id"}]
extract_iocs("Only IPs: 8.8.8.8 here", types=["ip_address"])
# [{"value": "8.8.8.8", ..., "type": "ip_address"}]
```
## Notas
Es **funcion** y no `kind: pipeline` porque la regla del registry exige que pipelines sean impuros — esta no lo es: solo compone funciones puras y deduplica. Mantiene `purity: pure` con `uses_functions` no vacio.
Deduplicacion: un match completamente contenido en otro (ej. `example.com` dentro de `alice@example.com`) se descarta. Empate exacto de span: gana el primero segun el orden de `_EXTRACTORS` en el modulo (email > ip > crypto_wallet > cve > mac > file_hash > phone > domain). Reordenar el dict cambia la prioridad si tienes overlaps habituales.
Bench informal: ~50-80 ms por MB de texto sobre CPU moderna (depende del numero de matches).
@@ -0,0 +1,73 @@
"""Pipeline puro: corre todos los extractores de IoC y unifica resultados."""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from extract_ip_addresses import extract_ip_addresses
from extract_emails import extract_emails
from extract_domains import extract_domains
from extract_file_hashes import extract_file_hashes
from extract_crypto_wallets import extract_crypto_wallets
from extract_cve_ids import extract_cve_ids
from extract_mac_addresses import extract_mac_addresses
from extract_phone_numbers import extract_phone_numbers
# Registry mapping each IoC type name accepted in `types` to the extractor
# that produces it.
# Key order is significant: with types=None, extract_iocs runs the extractors
# in this order, and since its later sort is stable, the earlier entry wins
# when two extractors report exactly the same span.
_EXTRACTORS = {
"email": extract_emails,
"ip_address": extract_ip_addresses,
"crypto_wallet": extract_crypto_wallets,
"cve_id": extract_cve_ids,
"mac_address": extract_mac_addresses,
"file_hash": extract_file_hashes,
"phone_number": extract_phone_numbers,
"domain": extract_domains,
}
def extract_iocs(text: str, types: list[str] | None = None) -> list[dict]:
    """Run the IoC extractors over ``text`` and merge their results.

    Results are ordered by offset and deduplicated: a span fully contained
    in another one (e.g. a domain inside an email) is dropped. When two
    extractors report exactly the same span, the one listed first in
    ``_EXTRACTORS`` wins, regardless of the order of ``types``.

    Args:
        text: Text to scan.
        types: Names of the IoC types to extract. ``None`` runs every
            registered extractor; unknown names are ignored.

    Returns:
        The surviving result dicts (each with at least ``value``,
        ``start``, ``end`` and ``type``), ordered by ``start``.
    """
    requested = None if types is None else set(types)

    raw: list[dict] = []
    # Walk the registry, not `types`, so the tie-break priority always
    # follows the registry order instead of the caller's ordering.
    for name, extractor in _EXTRACTORS.items():
        if requested is None or name in requested:
            raw.extend(extractor(text))

    # Ascending start, then widest span first, so a containing span is
    # always seen before anything it contains. The sort is stable, which
    # preserves registry order among identical spans.
    raw.sort(key=lambda r: (r["start"], r["start"] - r["end"]))

    deduped: list[dict] = []
    # Every kept span starts at or before the current one, so the current
    # span is contained in (or equal to) a kept span exactly when it ends at
    # or before the furthest end seen so far.
    covered_until = -1
    for match in raw:
        if match["end"] <= covered_until:
            continue
        deduped.append(match)
        covered_until = match["end"]
    return deduped
@@ -0,0 +1,45 @@
---
name: extract_ip_addresses
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_ip_addresses(text: str) -> list[dict]"
description: "Extrae direcciones IPv4 e IPv6 validas de un texto, con offsets start/end. Filtra candidatos invalidos via ipaddress (rechaza 999.999.999.999 y similares). No distingue privadas de publicas — el filtrado de relevancia es del caller."
tags: [ioc, ip, ipv4, ipv6, regex, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re, ipaddress]
params:
- name: text
desc: "string de texto del que extraer IPs"
output: "lista de dicts con {value, start, end, type='ip_address'} por cada IP encontrada"
tested: true
tests:
- "IPv4 valida y rangos limite"
- "IPv4 invalida (>255 octeto) descartada"
- "IPv6 forma completa y comprimida"
- "IPv6 invalida descartada"
- "Texto sin IPs"
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
file_path: "python/functions/cybersecurity/extract_ip_addresses.py"
---
## Ejemplo
```python
extract_ip_addresses("Server 192.168.1.1 talks to 8.8.8.8")
# [{"value": "192.168.1.1", "start": 7, "end": 18, "type": "ip_address"},
# {"value": "8.8.8.8", "start": 28, "end": 35, "type": "ip_address"}]
extract_ip_addresses("not an IP: 999.999.999.999")
# []
```
## Notas
Usa `ipaddress.IPv4Address` / `IPv6Address` para validacion estructural — descarta `999.999.999.999` y otras combinaciones sintacticamente plausibles pero invalidas. IPs privadas (10/8, 172.16/12, 192.168/16) se extraen igual; el filtrado de relevancia es responsabilidad del caller. Pure — solo regex compilado y `ipaddress`, sin red ni disco.
@@ -0,0 +1,53 @@
"""Extrae IPv4 + IPv6 validas de un texto, con offsets."""
import ipaddress
import re
# Dotted quad of 1-3 digit groups; octet ranges are validated afterwards.
_IPV4_CANDIDATE = re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b")
# Loose IPv6 shape (2-7 colon-terminated groups plus a final group and an
# optional %zone); real validation is delegated to `ipaddress`.
_IPV6_CANDIDATE = re.compile(
    # Boundaries exclude every alphanumeric, not only hex digits; otherwise
    # scope operators such as "Foo::Bar" yield bogus addresses like "::Ba".
    r"(?<![0-9A-Za-z:])"
    r"(?:[0-9A-Fa-f]{0,4}:){2,7}[0-9A-Fa-f]{0,4}"
    r"(?:%[0-9A-Za-z]+)?"
    r"(?![0-9A-Za-z:])"
    # Do not cut an IPv4-mapped form in the middle: "::ffff:192.168.1.1"
    # would otherwise be reported as the truncated address "::ffff:192".
    r"(?!\.\d)"
)


def extract_ip_addresses(text: str) -> list[dict]:
    """Extract valid IPv4 and IPv6 addresses with offsets.

    Regex candidates are kept only if ``ipaddress`` parses them, which
    discards things like ``999.999.999.999``. Private and public addresses
    are treated alike. For an IPv6 match with a ``%zone`` suffix, the zone
    is ignored during validation but kept in the returned value.
    IPv4-mapped IPv6 text (``::ffff:1.2.3.4``) is not reported as IPv6; only
    its embedded IPv4 address is returned.

    Args:
        text: Text to scan.

    Returns:
        One dict per address with the keys ``value``, ``start``, ``end``
        and ``type`` (always ``"ip_address"``), ordered by ``start``.
    """
    results: list[dict] = []

    for m in _IPV4_CANDIDATE.finditer(text):
        candidate = m.group(0)
        try:
            ipaddress.IPv4Address(candidate)
        except ValueError:
            continue
        results.append({
            "value": candidate,
            "start": m.start(),
            "end": m.end(),
            "type": "ip_address",
        })

    for m in _IPV6_CANDIDATE.finditer(text):
        # Validate without the zone id; the regex already guarantees at
        # least two colons, so no extra colon count is needed.
        address = m.group(0).split("%", 1)[0]
        try:
            ipaddress.IPv6Address(address)
        except ValueError:
            continue
        results.append({
            "value": m.group(0),
            "start": m.start(),
            "end": m.end(),
            "type": "ip_address",
        })

    results.sort(key=lambda r: r["start"])
    return results
@@ -0,0 +1,40 @@
---
name: extract_mac_addresses
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_mac_addresses(text: str) -> list[dict]"
description: "Extrae direcciones MAC en formato `xx:xx:xx:xx:xx:xx` o con guiones (`-`) de un texto, con offsets. Acepta hex en cualquier caso. Rechaza separadores mezclados."
tags: [ioc, mac, network, regex, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
params:
- name: text
desc: "string de texto del que extraer MAC addresses"
output: "lista de dicts con {value, start, end, type='mac_address'} por cada MAC encontrada"
tested: true
tests:
- "MAC con dos puntos"
- "MAC con guiones"
- "Separadores mezclados se rechazan"
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
file_path: "python/functions/cybersecurity/extract_mac_addresses.py"
---
## Ejemplo
```python
extract_mac_addresses("router 00:1A:2B:3C:4D:5E and AA-BB-CC-DD-EE-FF")
# [{"value": "00:1A:2B:3C:4D:5E", ..., "type": "mac_address"},
# {"value": "AA-BB-CC-DD-EE-FF", ..., "type": "mac_address"}]
```
## Notas
Cada direccion debe usar un solo separador (todos `:` o todos `-`). No se valida OUI ni se distingue unicast/multicast. Para extraer la parte de fabricante OUI: tomar los primeros 6 hex chars del `value` y consultar registro IEEE.
@@ -0,0 +1,31 @@
"""Extrae direcciones MAC de un texto, con offsets."""
import re
# Six hex octets joined by `:` or `-`, not adjacent to further hex digits
# or separators. The pattern alone allows mixed separators; the function
# filters those out.
_MAC_REGEX = re.compile(
    r"(?<![A-Fa-f0-9:-])"
    r"(?:[A-Fa-f0-9]{2}[:-]){5}[A-Fa-f0-9]{2}"
    r"(?![A-Fa-f0-9:-])"
)


def _has_mixed_separators(address: str) -> bool:
    """Return True when the address uses both `:` and `-`."""
    return ":" in address and "-" in address


def extract_mac_addresses(text: str) -> list[dict]:
    """Extract MAC addresses written as ``xx:xx:xx:xx:xx:xx`` or with hyphens.

    Hex digits may be in any case. An address must use a single separator
    throughout; candidates mixing ``:`` and ``-`` are discarded.

    Args:
        text: Text to scan.

    Returns:
        One dict per address with the keys ``value``, ``start``, ``end``
        and ``type`` (always ``"mac_address"``), in order of appearance.
    """
    return [
        {
            "value": found.group(0),
            "start": found.start(),
            "end": found.end(),
            "type": "mac_address",
        }
        for found in _MAC_REGEX.finditer(text)
        if not _has_mixed_separators(found.group(0))
    ]
@@ -0,0 +1,40 @@
---
name: extract_phone_numbers
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_phone_numbers(text: str) -> list[dict]"
description: "Extrae numeros de telefono en formato E.164 (`+CC...`) y formato local ES (9 digitos empezando por 6/7/8/9), con offsets. Permite separadores `space` y `-` entre grupos."
tags: [ioc, phone, e164, spain, regex, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
params:
- name: text
desc: "string de texto del que extraer telefonos"
output: "lista de dicts con {value, start, end, type='phone_number'}"
tested: true
tests:
- "Numero E.164 con espacios"
- "Numero local ES de 9 digitos"
- "Numero demasiado corto se descarta"
test_file_path: "python/functions/cybersecurity/tests/test_extract_iocs.py"
file_path: "python/functions/cybersecurity/extract_phone_numbers.py"
---
## Ejemplo
```python
extract_phone_numbers("Llamar al +34 612 345 678 o al 912345678")
# [{"value": "+34 612 345 678", "start": 10, "end": 25, "type": "phone_number"},
# {"value": "912345678", "start": 31, "end": 40, "type": "phone_number"}]
```
## Notas
E.164 (ITU-T): entre 8 y 15 digitos tras el `+`. ES local: 9 digitos exactos, primero ∈ {6,7,8,9}. No se discrimina entre movil y fijo. No se normaliza el formato — el caller decide. Para parseo robusto multi-pais usar `phonenumbers` (port en Python de libphonenumber de Google), pero esa dependencia no es necesaria para extraer candidatos como IoC.
@@ -0,0 +1,63 @@
"""Extrae numeros de telefono (E.164 + formatos comunes ES/EU) con offsets."""
import re
# E.164 candidate: `+` and groups of digits optionally separated by a single
# space or hyphen. The total digit count (8-15) is enforced in the function.
_E164_REGEX = re.compile(
    r"(?<![A-Za-z0-9])"
    r"\+\d{1,3}[\s\-]?\d{1,4}(?:[\s\-]?\d{1,4}){1,4}"
    r"(?![A-Za-z0-9])"
)
# Spanish national number: 9 digits starting with 6, 7, 8 or 9, optionally
# grouped 3-3-3 with a space or hyphen.
_ES_LOCAL_REGEX = re.compile(
    r"(?<![A-Za-z0-9+])"
    r"[6789]\d{2}[\s\-]?\d{3}[\s\-]?\d{3}"
    r"(?![A-Za-z0-9])"
)


def _count_ascii_digits(candidate: str) -> int:
    """Count the ASCII digits in a candidate, ignoring separators."""
    return len(re.sub(r"[^0-9]", "", candidate))


def extract_phone_numbers(text: str) -> list[dict]:
    """Extract E.164 numbers and 9-digit Spanish national numbers.

    Groups may be separated by a space or a hyphen. E.164 candidates need a
    leading ``+`` and 8 to 15 digits; national candidates need exactly 9
    digits starting with 6/7/8/9. A national-format match that lies inside
    an accepted E.164 number (the ``612 345 678`` in ``+34 612 345 678``) is
    part of that number and is not reported separately.

    Args:
        text: Text to scan.

    Returns:
        One dict per number with the keys ``value``, ``start``, ``end`` and
        ``type`` (always ``"phone_number"``), ordered by ``start``.
    """
    results = []
    e164_spans = []

    for m in _E164_REGEX.finditer(text):
        candidate = m.group(0)
        if not 8 <= _count_ascii_digits(candidate) <= 15:
            continue
        e164_spans.append(m.span())
        results.append({
            "value": candidate,
            "start": m.start(),
            "end": m.end(),
            "type": "phone_number",
        })

    for m in _ES_LOCAL_REGEX.finditer(text):
        candidate = m.group(0)
        if _count_ascii_digits(candidate) != 9:
            continue
        # The national pattern also fires on the tail of a spaced E.164
        # number because its lookbehind only sees the separator; skip those.
        if any(begin <= m.start() and m.end() <= finish
               for begin, finish in e164_spans):
            continue
        results.append({
            "value": candidate,
            "start": m.start(),
            "end": m.end(),
            "type": "phone_number",
        })

    results.sort(key=lambda r: r["start"])
    return results