Files
fn_registry/python/functions/cybersecurity/cybersecurity.py
T
egutierrez 95959f713c feat: funciones Python para core, cybersecurity, datascience y finance
Agrega funciones Python reutilizables organizadas por dominio:
- core: composicion funcional (pipe, compose, map, filter, reduce, etc.)
- cybersecurity: analisis de amenazas y puertos
- datascience: estadisticas y deteccion de outliers
- finance: indicadores tecnicos y analisis financiero
2026-03-29 00:13:50 +01:00

168 lines
5.0 KiB
Python

"""Cybersecurity pure functions: hashing, parsing, and security utilities."""
import hashlib
import math
import re
import base64
from collections import Counter
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
def hash_sha256(data: bytes) -> str:
    """Return the SHA-256 digest of *data* as a lowercase hex string."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def hash_md5(data: bytes) -> str:
    """Return the MD5 digest of *data* as a lowercase hex string.

    MD5 is not collision-resistant; use it for fingerprinting and lookups,
    not for integrity guarantees against an adversary.
    """
    hasher = hashlib.md5()
    hasher.update(data)
    return hasher.hexdigest()
def entropy_shannon(data: bytes) -> float:
    """Return the Shannon entropy of *data* in bits per byte (0.0 to 8.0).

    As a rule of thumb, high entropy (> 7.5) suggests encrypted or
    compressed data, while low entropy (< 3) suggests structured or
    repetitive data. Empty input yields 0.0.
    """
    if not data:
        return 0.0

    total = len(data)
    bits = 0.0
    for occurrences in Counter(data).values():
        # Counter only holds byte values that were actually seen, so the
        # probability is always strictly positive and log2 is defined.
        probability = occurrences / total
        bits -= probability * math.log2(probability)
    return bits
# Ordered (regex, label) pairs; earlier entries take precedence when several
# patterns would match the same input.
_SQL_INJECTION_PATTERNS = [
    (r"('\s*OR\s+'[^']*'\s*=\s*'[^']*'?)", "string_tautology"),
    (r"('\s*(OR|AND)\s+'?\d+\s*=\s*\d+)", "tautology"),
    (r"(;\s*(DROP|DELETE|UPDATE|INSERT)\b)", "stacked_query"),
    (r"(UNION\s+(ALL\s+)?SELECT)", "union_select"),
    (r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|EXEC)\b\s)", "sql_keyword"),
    (r"(--\s*$|/\*|\*/)", "comment_injection"),
    (r"(BENCHMARK\s*\(|SLEEP\s*\(|WAITFOR\s+DELAY)", "time_based"),
    (r"(CHAR\s*\(\s*\d+)", "char_function"),
    (r"(CONCAT\s*\()", "concat_function"),
    (r"(0x[0-9a-fA-F]{4,})", "hex_literal"),
]


def detect_sql_injection(input_str: str) -> tuple:
    """Scan *input_str* for known SQL-injection patterns.

    Patterns are tried in table order, case-insensitively, and the first
    one that matches wins.

    Returns:
        ``(True, label)`` with the label of the first matching pattern, or
        ``(False, "")`` when nothing matches.
    """
    first_hit = next(
        (
            label
            for regex, label in _SQL_INJECTION_PATTERNS
            if re.search(regex, input_str, re.IGNORECASE)
        ),
        None,
    )
    if first_hit is None:
        return (False, "")
    return (True, first_hit)
_URL_REGEX = re.compile(
    r"https?://[^\s<>\"'\)\]]+",
    re.IGNORECASE,
)

# Sentence punctuation that commonly follows a URL in running text and is
# almost never meant to be part of it.
_URL_TRAILING_PUNCTUATION = ".,;:!?"


def extract_urls(text: str) -> list:
    """Extract every http/https URL found in *text*, in order of appearance.

    A URL runs from its scheme up to the first whitespace, quote, angle
    bracket, ``)`` or ``]``. Trailing sentence punctuation (``.,;:!?``) is
    stripped so that "see http://example.com." yields "http://example.com"
    rather than a URL ending in a dot. Matches that have nothing left after
    the scheme once stripped (e.g. "http://...") are discarded.

    Returns:
        A list of URL strings; empty when *text* contains none.
    """
    urls = []
    for match in _URL_REGEX.finditer(text):
        url = match.group(0).rstrip(_URL_TRAILING_PUNCTUATION)
        # Keep only candidates that still have content after "scheme://".
        if url.partition("://")[2]:
            urls.append(url)
    return urls
# Each alphabet is matched separately so a string mixing "+/" with "-_"
# is rejected; fullmatch (unlike "^...$") also rejects a trailing newline.
_B64_STANDARD_RE = re.compile(r"[A-Za-z0-9+/]*={0,2}")
_B64_URLSAFE_RE = re.compile(r"[A-Za-z0-9\-_]*={0,2}")


def is_base64(s: str) -> bool:
    """Return True if *s* is valid, padded base64.

    Both the standard (``+``/``/``) and the URL-safe (``-``/``_``) alphabets
    are accepted, but not a mixture of the two in one string. The input must
    be at least 4 characters long, have a length that is a multiple of 4,
    carry at most two trailing ``=`` and decode to at least one byte.
    """
    if len(s) < 4 or len(s) % 4 != 0:
        return False

    if _B64_STANDARD_RE.fullmatch(s):
        candidate = s
    elif _B64_URLSAFE_RE.fullmatch(s):
        # Map the URL-safe alphabet onto the standard one so that a single
        # strict decode can validate both variants.
        candidate = s.translate(str.maketrans("-_", "+/"))
    else:
        return False

    try:
        decoded = base64.b64decode(candidate, validate=True)
    except ValueError:  # binascii.Error is a ValueError subclass
        return False
    return len(decoded) > 0
def is_hex(s: str) -> bool:
    """Return True if *s* is a valid hexadecimal string.

    Surrounding whitespace is ignored and an optional ``0x``/``0X`` prefix
    is allowed. At least two hex digits must remain once the prefix has
    been removed.
    """
    digits = s.strip()
    if digits[:2] in ("0x", "0X"):
        digits = digits[2:]
    return len(digits) >= 2 and re.fullmatch(r"[0-9a-fA-F]+", digits) is not None
def levenshtein_distance(a: str, b: str) -> int:
    """Return the Levenshtein (edit) distance between *a* and *b*.

    The distance is the minimum number of single-character insertions,
    deletions and substitutions needed to turn one string into the other.
    Useful for typosquatting detection on domains and for fuzzy matching.
    """
    # Iterate rows over the longer string so each row is as short as possible.
    longer, shorter = (a, b) if len(a) >= len(b) else (b, a)
    if not shorter:
        return len(longer)

    previous = list(range(len(shorter) + 1))
    for row, ch_long in enumerate(longer, start=1):
        current = [row]
        for col, ch_short in enumerate(shorter, start=1):
            insertion = current[col - 1] + 1
            deletion = previous[col] + 1
            substitution = previous[col - 1] + (0 if ch_long == ch_short else 1)
            current.append(min(insertion, deletion, substitution))
        previous = current
    return previous[-1]
def jaccard_similarity(a: list, b: list) -> float:
    """Return the Jaccard similarity coefficient of two lists.

    Both lists are treated as sets, so duplicates are ignored:
    J(A, B) = |A ∩ B| / |A ∪ B|. Returns 0.0 when both inputs are empty.
    Useful for comparing sets of tokens, features or IoCs.
    """
    left, right = set(a), set(b)
    combined = left.union(right)
    if not combined:
        # Both inputs are empty; avoid dividing by zero.
        return 0.0
    shared = left.intersection(right)
    return len(shared) / len(combined)
def normalize_url(raw_url: str) -> str:
    """Normalize a URL for deduplication and IoC comparison.

    Normalization steps:
      * the scheme is lowercased (``http`` is assumed when none is present);
      * the host (and port) is lowercased, while any ``user:password@``
        credentials keep their case because they are case-sensitive;
      * an empty path becomes ``/`` and trailing slashes are removed from
        any other path (a path made only of slashes collapses to ``/``);
      * query parameters are sorted by name, keeping blank values and the
        relative order of repeated names;
      * the fragment is dropped.

    Returns:
        The normalized URL string.
    """
    parsed = urlparse(raw_url)
    scheme = parsed.scheme.lower() or "http"

    # Lowercase only the host[:port] part; credentials must stay untouched.
    userinfo, at_sign, host_port = parsed.netloc.rpartition("@")
    netloc = f"{userinfo}{at_sign}{host_port.lower()}"

    # Strip trailing slashes but never leave the path empty, so "//" and ""
    # both normalize to the root path.
    path = parsed.path.rstrip("/") or "/"

    # Sort query parameters by name for a stable representation.
    params = parse_qs(parsed.query, keep_blank_values=True)
    sorted_query = urlencode(sorted(params.items()), doseq=True)

    # An empty last component drops the fragment.
    return urlunparse((scheme, netloc, path, parsed.params, sorted_query, ""))