"""Cybersecurity pure functions: hashing, parsing, and security utilities.""" import hashlib import math import re import base64 import secrets import struct from collections import Counter from urllib.parse import urlparse, urlunparse, parse_qs, urlencode from cryptography.hazmat.primitives.ciphers.aead import AESGCM def hash_sha256(data: bytes) -> str: """Calcula el hash SHA-256 de datos binarios. Retorna hex digest.""" return hashlib.sha256(data).hexdigest() def hash_md5(data: bytes) -> str: """Calcula el hash MD5 de datos binarios. Retorna hex digest.""" return hashlib.md5(data).hexdigest() def entropy_shannon(data: bytes) -> float: """Calcula la entropia de Shannon de datos binarios (0-8 bits por byte). Entropia alta (>7.5) sugiere datos cifrados o comprimidos. Entropia baja (<3) sugiere datos estructurados o repetitivos. """ if not data: return 0.0 length = len(data) counts = Counter(data) entropy = 0.0 for count in counts.values(): p = count / length if p > 0: entropy -= p * math.log2(p) return entropy _SQL_INJECTION_PATTERNS = [ (r"('\s*OR\s+'[^']*'\s*=\s*'[^']*'?)", "string_tautology"), (r"('\s*(OR|AND)\s+'?\d+\s*=\s*\d+)", "tautology"), (r"(;\s*(DROP|DELETE|UPDATE|INSERT)\b)", "stacked_query"), (r"(UNION\s+(ALL\s+)?SELECT)", "union_select"), (r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|EXEC)\b\s)", "sql_keyword"), (r"(--\s*$|/\*|\*/)", "comment_injection"), (r"(BENCHMARK\s*\(|SLEEP\s*\(|WAITFOR\s+DELAY)", "time_based"), (r"(CHAR\s*\(\s*\d+)", "char_function"), (r"(CONCAT\s*\()", "concat_function"), (r"(0x[0-9a-fA-F]{4,})", "hex_literal"), ] def detect_sql_injection(input_str: str) -> tuple: """Detecta patrones de SQL injection en un string. Retorna (is_threat, pattern) donde pattern es el nombre del patron detectado o cadena vacia si no hay amenaza. """ for pattern, name in _SQL_INJECTION_PATTERNS: if re.search(pattern, input_str, re.IGNORECASE): return (True, name) return (False, "") _URL_REGEX = re.compile( r"https?://[^\s<>\"'\)\]]+", re.IGNORECASE, ) def extract_urls(text: str) -> list: """Extrae todas las URLs (http/https) de un texto.""" return _URL_REGEX.findall(text) def is_base64(s: str) -> bool: """Verifica si un string es base64 valido. Acepta base64 estandar y URL-safe. Requiere al menos 4 caracteres. """ if len(s) < 4: return False b64_pattern = re.compile(r"^[A-Za-z0-9+/\-_]*={0,2}$") if not b64_pattern.match(s): return False try: decoded = base64.b64decode(s, validate=True) return len(decoded) > 0 except Exception: try: decoded = base64.urlsafe_b64decode(s) return len(decoded) > 0 except Exception: return False def is_hex(s: str) -> bool: """Verifica si un string es hexadecimal valido. Acepta con o sin prefijo 0x. Requiere al menos 2 caracteres (sin prefijo). """ clean = s.strip() if clean.startswith(("0x", "0X")): clean = clean[2:] if len(clean) < 2: return False return bool(re.fullmatch(r"[0-9a-fA-F]+", clean)) def levenshtein_distance(a: str, b: str) -> int: """Calcula la distancia de Levenshtein (edit distance) entre dos strings. Util para deteccion de typosquatting en dominios y fuzzy matching. """ if len(a) < len(b): return levenshtein_distance(b, a) if len(b) == 0: return len(a) prev_row = list(range(len(b) + 1)) for i, ca in enumerate(a): curr_row = [i + 1] for j, cb in enumerate(b): cost = 0 if ca == cb else 1 curr_row.append( min( curr_row[j] + 1, # insert prev_row[j + 1] + 1, # delete prev_row[j] + cost, # substitute ) ) prev_row = curr_row return prev_row[-1] def jaccard_similarity(a: list, b: list) -> float: """Calcula el coeficiente de similitud de Jaccard entre dos listas. J(A,B) = |A interseccion B| / |A union B|. Retorna 0.0 si ambas vacias. Util para comparar conjuntos de tokens, features, o IoCs. """ set_a = set(a) set_b = set(b) if not set_a and not set_b: return 0.0 intersection = set_a & set_b union = set_a | set_b return len(intersection) / len(union) def normalize_url(raw_url: str) -> str: """Normaliza una URL: lowercase del host, elimina fragmentos, ordena parametros. Util para deduplicacion de URLs y comparacion de IoCs. """ parsed = urlparse(raw_url) scheme = parsed.scheme.lower() or "http" netloc = parsed.netloc.lower() path = parsed.path or "/" # Remove trailing slash except for root if path != "/" and path.endswith("/"): path = path.rstrip("/") # Sort query parameters params = parse_qs(parsed.query, keep_blank_values=True) sorted_query = urlencode(sorted(params.items()), doseq=True) # Drop fragment return urlunparse((scheme, netloc, path, parsed.params, sorted_query, "")) # --- Envelope Encryption (AES-256-GCM) --- _ENVELOPE_MAGIC = b"OVE1" _ENVELOPE_VERSION = 0x01 _HEADER_SIZE = 12 # magic(4) + version(1) + reserved(1) + efk_len(2) + kiv_len(2) + div_len(2) def _build_envelope( encrypted_file_key: bytes, key_iv: bytes, data_iv: bytes, encrypted_content: bytes, ) -> bytes: """Construye el formato binario del envelope (helper puro interno). Header (12 bytes): Magic (4B): b"OVE1" Version (1B): 0x01 Reserved (1B): 0x00 EFK_len (2B): longitud de encrypted_file_key (big-endian) KIV_len (2B): longitud de key_iv (big-endian) DIV_len (2B): longitud de data_iv (big-endian) Seguido de: encrypted_file_key + key_iv + data_iv + encrypted_content """ header = ( _ENVELOPE_MAGIC + struct.pack(">BBHHH", _ENVELOPE_VERSION, 0x00, len(encrypted_file_key), len(key_iv), len(data_iv)) ) return header + encrypted_file_key + key_iv + data_iv + encrypted_content def _parse_envelope(ciphertext: bytes) -> tuple: """Parsea el envelope binario y retorna sus componentes (helper puro interno). Returns: (encrypted_file_key, key_iv, data_iv, encrypted_content) Raises: ValueError: si el envelope esta truncado o la version no es soportada. """ if len(ciphertext) < _HEADER_SIZE: raise ValueError( f"Envelope truncado: se esperaban al menos {_HEADER_SIZE} bytes, " f"se recibieron {len(ciphertext)}" ) magic = ciphertext[:4] if magic != _ENVELOPE_MAGIC: raise ValueError(f"Magic invalido: se esperaba {_ENVELOPE_MAGIC!r}, se obtuvo {magic!r}") version, _reserved, efk_len, kiv_len, div_len = struct.unpack(">BBHHH", ciphertext[4:12]) if version != _ENVELOPE_VERSION: raise ValueError(f"Version de envelope no soportada: {version}") offset = _HEADER_SIZE encrypted_file_key = ciphertext[offset : offset + efk_len] offset += efk_len key_iv = ciphertext[offset : offset + kiv_len] offset += kiv_len data_iv = ciphertext[offset : offset + div_len] offset += div_len encrypted_content = ciphertext[offset:] if ( len(encrypted_file_key) != efk_len or len(key_iv) != kiv_len or len(data_iv) != div_len ): raise ValueError("Envelope truncado: longitudes declaradas exceden los datos disponibles") return encrypted_file_key, key_iv, data_iv, encrypted_content def envelope_encrypt(plaintext: bytes, master_key: bytes) -> bytes: """Cifra datos usando patron Envelope Encryption con AES-256-GCM. Genera una file key aleatoria de 32 bytes, cifra los datos con ella, luego cifra la file key con la master_key. El resultado es un envelope binario que contiene todo lo necesario para descifrar con la master_key. Args: plaintext: Datos a cifrar (puede ser vacio). master_key: Clave maestra de 32 bytes (AES-256). Returns: Envelope binario cifrado. Raises: Exception: Si ocurre un error en el cifrado (clave de longitud incorrecta, etc.). """ # 1. Generar file_key aleatoria (DEK: Data Encryption Key) file_key = secrets.token_bytes(32) # 2. Cifrar contenido con la file_key data_iv = secrets.token_bytes(12) aesgcm_data = AESGCM(file_key) encrypted_content = aesgcm_data.encrypt(data_iv, plaintext, None) # 3. Cifrar file_key con la master_key (KEK: Key Encryption Key) key_iv = secrets.token_bytes(12) aesgcm_key = AESGCM(master_key) encrypted_file_key = aesgcm_key.encrypt(key_iv, file_key, None) # 4. Construir envelope return _build_envelope(encrypted_file_key, key_iv, data_iv, encrypted_content) def envelope_decrypt(ciphertext: bytes, master_key: bytes) -> bytes: """Descifra datos cifrados con envelope_encrypt. Si los datos no empiezan con el magic b"OVE1", se asume que no estan cifrados y se retornan tal cual (comportamiento passthrough). Esto permite usar la funcion en archivos que pueden o no estar cifrados. Args: ciphertext: Envelope cifrado (o datos en plano si no tienen magic). master_key: Clave maestra de 32 bytes (AES-256). Returns: Datos descifrados, o ciphertext sin modificar si no tiene magic. Raises: ValueError: Si el envelope esta corrupto o truncado. cryptography.exceptions.InvalidTag: Si la master_key es incorrecta o los datos fueron manipulados (falla de autenticacion GCM). """ # Passthrough: si no comienza con magic, asumir que no esta cifrado if not ciphertext.startswith(_ENVELOPE_MAGIC): return ciphertext # Parsear envelope encrypted_file_key, key_iv, data_iv, encrypted_content = _parse_envelope(ciphertext) # Descifrar file_key con master_key aesgcm_key = AESGCM(master_key) file_key = aesgcm_key.decrypt(key_iv, encrypted_file_key, None) # Descifrar contenido con file_key aesgcm_data = AESGCM(file_key) return aesgcm_data.decrypt(data_iv, encrypted_content, None)