837563c3ba
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
315 lines
10 KiB
Python
315 lines
10 KiB
Python
"""Cybersecurity pure functions: hashing, parsing, and security utilities."""
|
|
|
|
import hashlib
|
|
import math
|
|
import re
|
|
import base64
|
|
import secrets
|
|
import struct
|
|
from collections import Counter
|
|
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
|
|
|
|
def hash_sha256(data: bytes) -> str:
    """Return the SHA-256 digest of ``data`` as a lowercase hex string.

    Args:
        data: Binary payload to hash.

    Returns:
        The 64-character hexadecimal digest.
    """
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
|
|
|
|
|
|
def hash_md5(data: bytes) -> str:
    """Return the MD5 digest of ``data`` as a lowercase hex string.

    NOTE: MD5 is not collision-resistant; treat the result as a fingerprint
    (e.g. for matching known indicators), not as a security guarantee.

    Args:
        data: Binary payload to hash.

    Returns:
        The 32-character hexadecimal digest.
    """
    hasher = hashlib.md5()
    hasher.update(data)
    return hasher.hexdigest()
|
|
|
|
|
|
def entropy_shannon(data: bytes) -> float:
    """Compute the Shannon entropy of ``data`` in bits per byte (0.0 to 8.0).

    High entropy (> 7.5) suggests encrypted or compressed data; low entropy
    (< 3) suggests structured or repetitive data.

    Args:
        data: Binary payload to measure.

    Returns:
        The entropy value, or 0.0 for empty input.
    """
    if not data:
        return 0.0

    total = len(data)
    bits = 0.0
    # Every counted byte value occurs at least once, so each probability is
    # strictly positive and log2 is always defined.
    for occurrences in Counter(data).values():
        probability = occurrences / total
        bits -= probability * math.log2(probability)
    return bits
|
|
|
|
|
|
# Ordered (regex, label) pairs. Order matters: detect_sql_injection reports
# the label of the FIRST pattern that matches, so the more specific patterns
# come before the generic keyword check.
_SQL_INJECTION_PATTERNS = [
    # Quoted tautology such as  ' OR 'a'='a
    (r"('\s*OR\s+'[^']*'\s*=\s*'[^']*'?)", "string_tautology"),
    # Numeric tautology such as  ' OR 1=1
    (r"('\s*(OR|AND)\s+'?\d+\s*=\s*\d+)", "tautology"),
    # A second statement chained after a semicolon
    (r"(;\s*(DROP|DELETE|UPDATE|INSERT)\b)", "stacked_query"),
    # UNION [ALL] SELECT
    (r"(UNION\s+(ALL\s+)?SELECT)", "union_select"),
    # Any bare SQL keyword followed by whitespace
    (r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|EXEC)\b\s)", "sql_keyword"),
    # Trailing "--" or block-comment delimiters
    (r"(--\s*$|/\*|\*/)", "comment_injection"),
    # Delay primitives
    (r"(BENCHMARK\s*\(|SLEEP\s*\(|WAITFOR\s+DELAY)", "time_based"),
    # CHAR(<digits> ...
    (r"(CHAR\s*\(\s*\d+)", "char_function"),
    # CONCAT(
    (r"(CONCAT\s*\()", "concat_function"),
    # Hex literal with at least four hex digits
    (r"(0x[0-9a-fA-F]{4,})", "hex_literal"),
]


def detect_sql_injection(input_str: str) -> tuple:
    """Scan a string for SQL-injection patterns.

    Patterns are tried in the order they appear in
    ``_SQL_INJECTION_PATTERNS`` and matched case-insensitively.

    Args:
        input_str: Text to inspect.

    Returns:
        ``(True, name)`` with the name of the first matching pattern, or
        ``(False, "")`` when nothing matches.
    """
    first_hit = next(
        (
            label
            for regex, label in _SQL_INJECTION_PATTERNS
            if re.search(regex, input_str, re.IGNORECASE)
        ),
        None,
    )
    if first_hit is None:
        return (False, "")
    return (True, first_hit)
|
|
|
|
|
|
# An http(s) URL runs until whitespace or one of the delimiters < > " ' ) ]
_URL_REGEX = re.compile(r"https?://[^\s<>\"'\)\]]+", re.IGNORECASE)


def extract_urls(text: str) -> list:
    """Extract every http/https URL found in ``text``.

    Args:
        text: Free-form text to scan.

    Returns:
        The matched URLs in order of appearance (duplicates are kept).
    """
    return [found.group(0) for found in _URL_REGEX.finditer(text)]
|
|
|
|
|
|
def is_base64(s: str) -> bool:
    """Check whether a string is valid, padded base64.

    Accepts either the standard alphabet (``+`` and ``/``) or the URL-safe
    alphabet (``-`` and ``_``), but not a mixture of both. At least four
    characters are required, and the length must be a multiple of four
    (i.e. padding must be present).

    Args:
        s: Candidate string.

    Returns:
        True if ``s`` decodes cleanly to at least one byte, False otherwise.
    """
    if len(s) < 4:
        return False

    # fullmatch (rather than match + "$") so a trailing newline is rejected.
    # Each alphabet is checked on its own so "+/" and "-_" cannot be mixed.
    if re.fullmatch(r"[A-Za-z0-9+/]*={0,2}", s):
        standard_form = s
    elif re.fullmatch(r"[A-Za-z0-9\-_]*={0,2}", s):
        # Map the URL-safe alphabet onto the standard one for strict decoding.
        standard_form = s.replace("-", "+").replace("_", "/")
    else:
        return False

    # Explicit length check keeps the verdict independent of how lenient the
    # running Python version's decoder is about padding.
    if len(standard_form) % 4 != 0:
        return False

    try:
        decoded = base64.b64decode(standard_form, validate=True)
    except ValueError:  # binascii.Error is a ValueError subclass
        return False
    return len(decoded) > 0
|
|
|
|
|
|
def is_hex(s: str) -> bool:
    """Check whether a string is valid hexadecimal.

    Surrounding whitespace is ignored and an optional ``0x``/``0X`` prefix is
    accepted. At least two hex digits must remain after removing the prefix.

    Args:
        s: Candidate string.

    Returns:
        True if the string consists only of hex digits, False otherwise.
    """
    digits = s.strip()
    if digits[:2] in ("0x", "0X"):
        digits = digits[2:]
    return len(digits) >= 2 and re.fullmatch(r"[0-9a-fA-F]+", digits) is not None
|
|
|
|
|
|
def levenshtein_distance(a: str, b: str) -> int:
    """Compute the Levenshtein (edit) distance between two strings.

    Useful for typosquatting detection on domains and for fuzzy matching.

    Args:
        a: First string.
        b: Second string.

    Returns:
        The minimum number of single-character insertions, deletions and
        substitutions needed to turn one string into the other.
    """
    # Keep the shorter string on the column axis so each row stays small.
    longer, shorter = (a, b) if len(a) >= len(b) else (b, a)
    if not shorter:
        return len(longer)

    previous = list(range(len(shorter) + 1))
    for row_number, ch_long in enumerate(longer, start=1):
        current = [row_number]
        for col, ch_short in enumerate(shorter):
            insertion = current[col] + 1
            deletion = previous[col + 1] + 1
            substitution = previous[col] + (0 if ch_long == ch_short else 1)
            current.append(min(insertion, deletion, substitution))
        previous = current
    return previous[-1]
|
|
|
|
|
|
def jaccard_similarity(a: list, b: list) -> float:
    """Compute the Jaccard similarity coefficient of two lists.

    J(A, B) = |A intersection B| / |A union B|, computed on the distinct
    elements of each list. Useful for comparing sets of tokens, features or
    IoCs.

    Args:
        a: First collection of hashable items.
        b: Second collection of hashable items.

    Returns:
        A value in [0.0, 1.0]; 0.0 when both inputs are empty.
    """
    left, right = set(a), set(b)
    combined = left | right
    if not combined:
        return 0.0
    return len(left & right) / len(combined)
|
|
|
|
|
|
def normalize_url(raw_url: str) -> str:
    """Normalize a URL for deduplication and IoC comparison.

    Normalization steps:
      * scheme is lowercased (defaults to ``http`` when missing);
      * the host[:port] part is lowercased, while any ``user:password@``
        userinfo is left untouched because credentials are case-sensitive;
      * trailing slashes are removed from the path, except that an empty or
        all-slash path collapses to the root ``/``;
      * query parameters are sorted by name (blank values are kept);
      * the fragment is dropped.

    Args:
        raw_url: URL to normalize.

    Returns:
        The normalized URL string.
    """
    parsed = urlparse(raw_url)
    scheme = parsed.scheme.lower() or "http"

    # Lowercase only what follows the last "@"; userinfo keeps its case.
    userinfo, at_sign, host_port = parsed.netloc.rpartition("@")
    netloc = userinfo + at_sign + host_port.lower()

    # rstrip on "" / "/" / "//" yields "", which must map back to the root.
    path = parsed.path.rstrip("/") or "/"

    # Sort query parameters by name; repeated keys keep their value order.
    params = parse_qs(parsed.query, keep_blank_values=True)
    sorted_query = urlencode(sorted(params.items()), doseq=True)

    # Empty last component drops the fragment.
    return urlunparse((scheme, netloc, path, parsed.params, sorted_query, ""))
|
|
|
|
|
|
# --- Envelope Encryption (AES-256-GCM) ---
|
|
|
|
# Four-byte tag that identifies an envelope produced by this module.
_ENVELOPE_MAGIC = b"OVE1"
# Format version written into (and required from) the header.
_ENVELOPE_VERSION = 0x01
_HEADER_SIZE = 12  # magic(4) + version(1) + reserved(1) + efk_len(2) + kiv_len(2) + div_len(2)


def _build_envelope(
    encrypted_file_key: bytes,
    key_iv: bytes,
    data_iv: bytes,
    encrypted_content: bytes,
) -> bytes:
    """Serialize the envelope components into the binary format (internal helper).

    Header (12 bytes):
        Magic    (4B): b"OVE1"
        Version  (1B): 0x01
        Reserved (1B): 0x00
        EFK_len  (2B): length of encrypted_file_key (big-endian)
        KIV_len  (2B): length of key_iv (big-endian)
        DIV_len  (2B): length of data_iv (big-endian)
    Followed by: encrypted_file_key + key_iv + data_iv + encrypted_content

    Args:
        encrypted_file_key: The file key encrypted under the master key.
        key_iv: IV used when encrypting the file key.
        data_iv: IV used when encrypting the content.
        encrypted_content: The encrypted payload.

    Returns:
        The complete envelope as bytes.
    """
    # Version, reserved byte, then the three 16-bit big-endian section lengths.
    fixed_fields = struct.pack(
        ">BBHHH",
        _ENVELOPE_VERSION,
        0x00,
        len(encrypted_file_key),
        len(key_iv),
        len(data_iv),
    )
    sections = (
        _ENVELOPE_MAGIC,
        fixed_fields,
        encrypted_file_key,
        key_iv,
        data_iv,
        encrypted_content,
    )
    return b"".join(sections)
|
|
|
|
|
|
def _parse_envelope(ciphertext: bytes) -> tuple:
    """Split a binary envelope into its components (internal helper).

    Args:
        ciphertext: The full envelope, starting with the header.

    Returns:
        (encrypted_file_key, key_iv, data_iv, encrypted_content)

    Raises:
        ValueError: If the envelope is shorter than the header, the magic
            does not match, the version is unsupported, or the declared
            section lengths exceed the available data.
    """
    if len(ciphertext) < _HEADER_SIZE:
        raise ValueError(
            f"Envelope truncado: se esperaban al menos {_HEADER_SIZE} bytes, "
            f"se recibieron {len(ciphertext)}"
        )

    magic = ciphertext[:4]
    if magic != _ENVELOPE_MAGIC:
        raise ValueError(f"Magic invalido: se esperaba {_ENVELOPE_MAGIC!r}, se obtuvo {magic!r}")

    # Fixed fields after the magic: version, reserved, then three lengths.
    header_fields = struct.unpack(">BBHHH", ciphertext[4:12])
    version = header_fields[0]
    if version != _ENVELOPE_VERSION:
        raise ValueError(f"Version de envelope no soportada: {version}")

    # Walk the three variable-length sections in their on-wire order.
    sections = []
    cursor = _HEADER_SIZE
    for declared_length in header_fields[2:]:
        sections.append(ciphertext[cursor : cursor + declared_length])
        cursor += declared_length

    # If the declared lengths run past the end, at least one slice above
    # came back shorter than declared.
    if cursor > len(ciphertext):
        raise ValueError("Envelope truncado: longitudes declaradas exceden los datos disponibles")

    encrypted_file_key, key_iv, data_iv = sections
    # Whatever remains after the three sections is the encrypted payload.
    return encrypted_file_key, key_iv, data_iv, ciphertext[cursor:]
|
|
|
|
|
|
def envelope_encrypt(plaintext: bytes, master_key: bytes) -> bytes:
    """Encrypt data with the envelope-encryption pattern using AES-GCM.

    A random 32-byte file key is generated and used to encrypt the data; the
    file key is then itself encrypted with ``master_key``. The result is a
    binary envelope holding everything needed to decrypt given the master
    key.

    Args:
        plaintext: Data to encrypt (may be empty).
        master_key: Master key, expected to be 32 bytes (AES-256). The
            length is not checked here; it is handed to AESGCM as-is.

    Returns:
        The encrypted binary envelope.

    Raises:
        Exception: Whatever AESGCM raises (e.g. for an unsupported key
            length) propagates to the caller.
    """
    # 1. Fresh data-encryption key (DEK) for this payload.
    dek = secrets.token_bytes(32)

    # 2. Encrypt the payload under the DEK with its own random 12-byte IV.
    payload_iv = secrets.token_bytes(12)
    sealed_payload = AESGCM(dek).encrypt(payload_iv, plaintext, None)

    # 3. Wrap the DEK under the master key (KEK) with a separate random IV.
    wrap_iv = secrets.token_bytes(12)
    wrapped_dek = AESGCM(master_key).encrypt(wrap_iv, dek, None)

    # 4. Serialize everything into the envelope format.
    return _build_envelope(wrapped_dek, wrap_iv, payload_iv, sealed_payload)
|
|
|
|
|
|
def envelope_decrypt(ciphertext: bytes, master_key: bytes) -> bytes:
    """Decrypt data produced by ``envelope_encrypt``.

    If the input does not start with the magic b"OVE1" it is assumed not to
    be encrypted and is returned unchanged (passthrough). This lets the
    function be used on files that may or may not be encrypted.

    NOTE(review): because of the passthrough, input without the magic is
    returned without any authentication check.

    Args:
        ciphertext: Encrypted envelope (or plain data lacking the magic).
        master_key: Master key, expected to be 32 bytes (AES-256).

    Returns:
        The decrypted data, or ``ciphertext`` itself when it has no magic.

    Raises:
        ValueError: If the envelope is corrupt or truncated.
        cryptography.exceptions.InvalidTag: If the master key is wrong or
            the data was tampered with (GCM authentication failure).
    """
    has_magic = ciphertext.startswith(_ENVELOPE_MAGIC)
    if not has_magic:
        # Passthrough: not an envelope, hand the data back untouched.
        return ciphertext

    wrapped_dek, wrap_iv, payload_iv, sealed_payload = _parse_envelope(ciphertext)

    # Unwrap the data-encryption key with the master key...
    dek = AESGCM(master_key).decrypt(wrap_iv, wrapped_dek, None)

    # ...then use it to decrypt the payload.
    return AESGCM(dek).decrypt(payload_iv, sealed_payload, None)
|