feat: funciones Python para core, cybersecurity, datascience y finance
Agrega funciones Python reutilizables organizadas por dominio: - core: composicion funcional (pipe, compose, map, filter, reduce, etc.) - cybersecurity: analisis de amenazas y puertos - datascience: estadisticas y deteccion de outliers - finance: indicadores tecnicos y analisis financiero
This commit is contained in:
@@ -0,0 +1,25 @@
|
||||
from .cybersecurity import (
|
||||
hash_sha256,
|
||||
hash_md5,
|
||||
entropy_shannon,
|
||||
detect_sql_injection,
|
||||
extract_urls,
|
||||
is_base64,
|
||||
is_hex,
|
||||
levenshtein_distance,
|
||||
jaccard_similarity,
|
||||
normalize_url,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"hash_sha256",
|
||||
"hash_md5",
|
||||
"entropy_shannon",
|
||||
"detect_sql_injection",
|
||||
"extract_urls",
|
||||
"is_base64",
|
||||
"is_hex",
|
||||
"levenshtein_distance",
|
||||
"jaccard_similarity",
|
||||
"normalize_url",
|
||||
]
|
||||
@@ -0,0 +1,167 @@
|
||||
"""Cybersecurity pure functions: hashing, parsing, and security utilities."""
|
||||
|
||||
import hashlib
|
||||
import math
|
||||
import re
|
||||
import base64
|
||||
from collections import Counter
|
||||
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
|
||||
|
||||
|
||||
def hash_sha256(data: bytes) -> str:
|
||||
"""Calcula el hash SHA-256 de datos binarios. Retorna hex digest."""
|
||||
return hashlib.sha256(data).hexdigest()
|
||||
|
||||
|
||||
def hash_md5(data: bytes) -> str:
|
||||
"""Calcula el hash MD5 de datos binarios. Retorna hex digest."""
|
||||
return hashlib.md5(data).hexdigest()
|
||||
|
||||
|
||||
def entropy_shannon(data: bytes) -> float:
|
||||
"""Calcula la entropia de Shannon de datos binarios (0-8 bits por byte).
|
||||
|
||||
Entropia alta (>7.5) sugiere datos cifrados o comprimidos.
|
||||
Entropia baja (<3) sugiere datos estructurados o repetitivos.
|
||||
"""
|
||||
if not data:
|
||||
return 0.0
|
||||
length = len(data)
|
||||
counts = Counter(data)
|
||||
entropy = 0.0
|
||||
for count in counts.values():
|
||||
p = count / length
|
||||
if p > 0:
|
||||
entropy -= p * math.log2(p)
|
||||
return entropy
|
||||
|
||||
|
||||
_SQL_INJECTION_PATTERNS = [
|
||||
(r"('\s*OR\s+'[^']*'\s*=\s*'[^']*'?)", "string_tautology"),
|
||||
(r"('\s*(OR|AND)\s+'?\d+\s*=\s*\d+)", "tautology"),
|
||||
(r"(;\s*(DROP|DELETE|UPDATE|INSERT)\b)", "stacked_query"),
|
||||
(r"(UNION\s+(ALL\s+)?SELECT)", "union_select"),
|
||||
(r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|EXEC)\b\s)", "sql_keyword"),
|
||||
(r"(--\s*$|/\*|\*/)", "comment_injection"),
|
||||
(r"(BENCHMARK\s*\(|SLEEP\s*\(|WAITFOR\s+DELAY)", "time_based"),
|
||||
(r"(CHAR\s*\(\s*\d+)", "char_function"),
|
||||
(r"(CONCAT\s*\()", "concat_function"),
|
||||
(r"(0x[0-9a-fA-F]{4,})", "hex_literal"),
|
||||
]
|
||||
|
||||
|
||||
def detect_sql_injection(input_str: str) -> tuple:
|
||||
"""Detecta patrones de SQL injection en un string.
|
||||
|
||||
Retorna (is_threat, pattern) donde pattern es el nombre del patron
|
||||
detectado o cadena vacia si no hay amenaza.
|
||||
"""
|
||||
for pattern, name in _SQL_INJECTION_PATTERNS:
|
||||
if re.search(pattern, input_str, re.IGNORECASE):
|
||||
return (True, name)
|
||||
return (False, "")
|
||||
|
||||
|
||||
_URL_REGEX = re.compile(
|
||||
r"https?://[^\s<>\"'\)\]]+",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def extract_urls(text: str) -> list:
|
||||
"""Extrae todas las URLs (http/https) de un texto."""
|
||||
return _URL_REGEX.findall(text)
|
||||
|
||||
|
||||
def is_base64(s: str) -> bool:
|
||||
"""Verifica si un string es base64 valido.
|
||||
|
||||
Acepta base64 estandar y URL-safe. Requiere al menos 4 caracteres.
|
||||
"""
|
||||
if len(s) < 4:
|
||||
return False
|
||||
b64_pattern = re.compile(r"^[A-Za-z0-9+/\-_]*={0,2}$")
|
||||
if not b64_pattern.match(s):
|
||||
return False
|
||||
try:
|
||||
decoded = base64.b64decode(s, validate=True)
|
||||
return len(decoded) > 0
|
||||
except Exception:
|
||||
try:
|
||||
decoded = base64.urlsafe_b64decode(s)
|
||||
return len(decoded) > 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def is_hex(s: str) -> bool:
|
||||
"""Verifica si un string es hexadecimal valido.
|
||||
|
||||
Acepta con o sin prefijo 0x. Requiere al menos 2 caracteres (sin prefijo).
|
||||
"""
|
||||
clean = s.strip()
|
||||
if clean.startswith(("0x", "0X")):
|
||||
clean = clean[2:]
|
||||
if len(clean) < 2:
|
||||
return False
|
||||
return bool(re.fullmatch(r"[0-9a-fA-F]+", clean))
|
||||
|
||||
|
||||
def levenshtein_distance(a: str, b: str) -> int:
|
||||
"""Calcula la distancia de Levenshtein (edit distance) entre dos strings.
|
||||
|
||||
Util para deteccion de typosquatting en dominios y fuzzy matching.
|
||||
"""
|
||||
if len(a) < len(b):
|
||||
return levenshtein_distance(b, a)
|
||||
if len(b) == 0:
|
||||
return len(a)
|
||||
|
||||
prev_row = list(range(len(b) + 1))
|
||||
for i, ca in enumerate(a):
|
||||
curr_row = [i + 1]
|
||||
for j, cb in enumerate(b):
|
||||
cost = 0 if ca == cb else 1
|
||||
curr_row.append(
|
||||
min(
|
||||
curr_row[j] + 1, # insert
|
||||
prev_row[j + 1] + 1, # delete
|
||||
prev_row[j] + cost, # substitute
|
||||
)
|
||||
)
|
||||
prev_row = curr_row
|
||||
return prev_row[-1]
|
||||
|
||||
|
||||
def jaccard_similarity(a: list, b: list) -> float:
|
||||
"""Calcula el coeficiente de similitud de Jaccard entre dos listas.
|
||||
|
||||
J(A,B) = |A interseccion B| / |A union B|. Retorna 0.0 si ambas vacias.
|
||||
Util para comparar conjuntos de tokens, features, o IoCs.
|
||||
"""
|
||||
set_a = set(a)
|
||||
set_b = set(b)
|
||||
if not set_a and not set_b:
|
||||
return 0.0
|
||||
intersection = set_a & set_b
|
||||
union = set_a | set_b
|
||||
return len(intersection) / len(union)
|
||||
|
||||
|
||||
def normalize_url(raw_url: str) -> str:
|
||||
"""Normaliza una URL: lowercase del host, elimina fragmentos, ordena parametros.
|
||||
|
||||
Util para deduplicacion de URLs y comparacion de IoCs.
|
||||
"""
|
||||
parsed = urlparse(raw_url)
|
||||
scheme = parsed.scheme.lower() or "http"
|
||||
netloc = parsed.netloc.lower()
|
||||
path = parsed.path or "/"
|
||||
# Remove trailing slash except for root
|
||||
if path != "/" and path.endswith("/"):
|
||||
path = path.rstrip("/")
|
||||
# Sort query parameters
|
||||
params = parse_qs(parsed.query, keep_blank_values=True)
|
||||
sorted_query = urlencode(sorted(params.items()), doseq=True)
|
||||
# Drop fragment
|
||||
return urlunparse((scheme, netloc, path, parsed.params, sorted_query, ""))
|
||||
@@ -0,0 +1,38 @@
|
||||
---
|
||||
name: detect_sql_injection
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def detect_sql_injection(input_str: str) -> tuple"
|
||||
description: "Detecta patrones de SQL injection en un string. Retorna (is_threat, pattern) con el nombre del patron detectado."
|
||||
tags: [sqli, injection, detection, security, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
detect_sql_injection("' OR '1'='1")
|
||||
# (True, "string_tautology")
|
||||
|
||||
detect_sql_injection("; DROP TABLE users")
|
||||
# (True, "stacked_query")
|
||||
|
||||
detect_sql_injection("hello world")
|
||||
# (False, "")
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Detecta 10 patrones: sql_keyword, tautology, stacked_query, comment_injection, string_tautology, union_select, hex_literal, char_function, concat_function, time_based. No reemplaza un WAF pero es util para logging y alertas tempranas.
|
||||
@@ -0,0 +1,41 @@
|
||||
---
|
||||
name: entropy_shannon
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def entropy_shannon(data: bytes) -> float"
|
||||
description: "Calcula la entropia de Shannon de datos binarios (0-8 bits por byte). Util para detectar datos cifrados o comprimidos."
|
||||
tags: [entropy, shannon, analysis, crypto, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [math, collections]
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
# Datos aleatorios (alta entropia)
|
||||
entropy_shannon(bytes(range(256)))
|
||||
# ~8.0
|
||||
|
||||
# Datos repetitivos (baja entropia)
|
||||
entropy_shannon(b"aaaaaaaaaa")
|
||||
# 0.0
|
||||
|
||||
# Texto normal
|
||||
entropy_shannon(b"hello world")
|
||||
# ~2.84
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Entropia alta (>7.5) sugiere datos cifrados o comprimidos. Entropia baja (<3) sugiere datos estructurados o repetitivos. Retorna 0.0 para datos vacios.
|
||||
@@ -0,0 +1,35 @@
|
||||
---
|
||||
name: extract_urls
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def extract_urls(text: str) -> list"
|
||||
description: "Extrae todas las URLs (http/https) de un texto. Util para analisis de IoCs y threat intelligence."
|
||||
tags: [url, extract, parsing, ioc, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
extract_urls("Visit https://example.com and http://test.org/path?q=1")
|
||||
# ["https://example.com", "http://test.org/path?q=1"]
|
||||
|
||||
extract_urls("no urls here")
|
||||
# []
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Usa regex para extraer URLs con esquema http/https. No valida que las URLs sean alcanzables. Util para extraer indicadores de compromiso (IoCs) de logs, emails o reportes de threat intelligence.
|
||||
@@ -0,0 +1,32 @@
|
||||
---
|
||||
name: hash_md5
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def hash_md5(data: bytes) -> str"
|
||||
description: "Calcula el hash MD5 de datos binarios. Retorna hex digest."
|
||||
tags: [hash, md5, crypto, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [hashlib]
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
h = hash_md5(b"hello")
|
||||
# "5d41402abc4b2a76b9719d911017c592"
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Usa hashlib de stdlib. MD5 no es seguro para propositos criptograficos pero es util para checksums, fingerprinting de archivos e identificacion rapida de IoCs.
|
||||
@@ -0,0 +1,32 @@
|
||||
---
|
||||
name: hash_sha256
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def hash_sha256(data: bytes) -> str"
|
||||
description: "Calcula el hash SHA-256 de datos binarios. Retorna hex digest."
|
||||
tags: [hash, sha256, crypto, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [hashlib]
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
h = hash_sha256(b"hello")
|
||||
# "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Usa hashlib de stdlib. Funcion pura sin side effects. SHA-256 produce un digest de 64 caracteres hexadecimales (256 bits).
|
||||
@@ -0,0 +1,38 @@
|
||||
---
|
||||
name: is_base64
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def is_base64(s: str) -> bool"
|
||||
description: "Verifica si un string es base64 valido. Acepta base64 estandar y URL-safe. Requiere minimo 4 caracteres."
|
||||
tags: [base64, validation, encoding, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re, base64]
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
is_base64("aGVsbG8=")
|
||||
# True
|
||||
|
||||
is_base64("not!valid")
|
||||
# False
|
||||
|
||||
is_base64("ab")
|
||||
# False (menos de 4 caracteres)
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Verifica tanto el formato (regex) como que el decode sea exitoso. Util para detectar datos codificados en payloads sospechosos, headers HTTP o parametros de URL.
|
||||
@@ -0,0 +1,41 @@
|
||||
---
|
||||
name: is_hex
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def is_hex(s: str) -> bool"
|
||||
description: "Verifica si un string es hexadecimal valido. Acepta con o sin prefijo 0x. Requiere minimo 2 caracteres."
|
||||
tags: [hex, validation, encoding, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [re]
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
is_hex("4a6f686e")
|
||||
# True
|
||||
|
||||
is_hex("0x4a6f686e")
|
||||
# True
|
||||
|
||||
is_hex("xyz")
|
||||
# False
|
||||
|
||||
is_hex("a")
|
||||
# False (menos de 2 caracteres)
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Util para validar hashes, direcciones de memoria, shellcode y otros datos hexadecimales en contexto de seguridad.
|
||||
@@ -0,0 +1,38 @@
|
||||
---
|
||||
name: jaccard_similarity
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def jaccard_similarity(a: list, b: list) -> float"
|
||||
description: "Calcula el coeficiente de similitud de Jaccard entre dos listas. J(A,B) = |A interseccion B| / |A union B|."
|
||||
tags: [jaccard, similarity, comparison, sets, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: []
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
jaccard_similarity(["a", "b", "c"], ["b", "c", "d"])
|
||||
# 0.5
|
||||
|
||||
jaccard_similarity(["a", "b"], ["a", "b"])
|
||||
# 1.0
|
||||
|
||||
jaccard_similarity([], [])
|
||||
# 0.0
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Convierte las listas a sets internamente. Retorna 0.0 si ambas listas son vacias. Util para comparar conjuntos de tokens, features de malware, IoCs compartidos entre muestras, o tags de vulnerabilidades.
|
||||
@@ -0,0 +1,38 @@
|
||||
---
|
||||
name: levenshtein_distance
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def levenshtein_distance(a: str, b: str) -> int"
|
||||
description: "Calcula la distancia de Levenshtein (edit distance) entre dos strings. Util para deteccion de typosquatting en dominios."
|
||||
tags: [levenshtein, distance, fuzzy, typosquatting, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: []
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
levenshtein_distance("google.com", "gooogle.com")
|
||||
# 1
|
||||
|
||||
levenshtein_distance("paypal.com", "paypa1.com")
|
||||
# 1
|
||||
|
||||
levenshtein_distance("abc", "abc")
|
||||
# 0
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Implementacion O(n*m) con optimizacion de espacio (dos filas). Sin dependencias externas. Util para detectar dominios de typosquatting comparando contra dominios legitimos conocidos.
|
||||
@@ -0,0 +1,35 @@
|
||||
---
|
||||
name: normalize_url
|
||||
kind: function
|
||||
lang: py
|
||||
domain: cybersecurity
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "def normalize_url(raw_url: str) -> str"
|
||||
description: "Normaliza una URL: lowercase del host, elimina fragmentos, ordena parametros. Util para deduplicacion de IoCs."
|
||||
tags: [url, normalize, ioc, dedup, python]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [urllib.parse]
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "python/functions/cybersecurity/cybersecurity.py"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```python
|
||||
normalize_url("HTTPS://Example.COM/path?b=2&a=1#frag")
|
||||
# "https://example.com/path?a=1&b=2"
|
||||
|
||||
normalize_url("http://test.org/path/")
|
||||
# "http://test.org/path"
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Operaciones de normalizacion: lowercase de scheme y host, eliminacion de trailing slash (excepto root), ordenamiento alfabetico de query parameters, eliminacion de fragmentos. Usa urllib.parse de stdlib.
|
||||
Reference in New Issue
Block a user