Files
fn_registry/python/functions/datascience/datascience.py
T
egutierrez 837563c3ba feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

193 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Pure datascience utilities — statistics and numerical functions.
Uses only math stdlib. No external dependencies.
"""
import math
def pearson(xs: list, ys: list) -> float:
"""Pearson correlation coefficient between two lists of floats."""
n = len(xs)
if n != len(ys) or n == 0:
return 0.0
mean_x = sum(xs) / n
mean_y = sum(ys) / n
num = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
den_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
den_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
if den_x == 0.0 or den_y == 0.0:
return 0.0
return num / (den_x * den_y)
def standardize(data: list) -> list:
"""Z-score standardization (mean=0, std=1)."""
n = len(data)
if n == 0:
return []
mean = sum(data) / n
std = math.sqrt(sum((x - mean) ** 2 for x in data) / n)
if std == 0.0:
return [0.0] * n
return [(x - mean) / std for x in data]
def min_max_scale(data: list) -> list:
"""Scale values to [0, 1] range."""
if not data:
return []
lo = min(data)
hi = max(data)
if hi == lo:
return [0.0] * len(data)
return [(x - lo) / (hi - lo) for x in data]
def clip(data: list, lo: float, hi: float) -> list:
"""Clip values to [lo, hi]."""
return [max(lo, min(hi, x)) for x in data]
def detect_outliers(data: list, threshold: float) -> list:
"""Returns list of bools, True where |z-score| > threshold."""
n = len(data)
if n == 0:
return []
mean = sum(data) / n
std = math.sqrt(sum((x - mean) ** 2 for x in data) / n)
if std == 0.0:
return [False] * n
return [abs((x - mean) / std) > threshold for x in data]
def impute(data: list) -> list:
"""Replace None/NaN with mean of non-null values."""
valid = [x for x in data if x is not None and not (isinstance(x, float) and math.isnan(x))]
if not valid:
return [0.0] * len(data)
mean = sum(valid) / len(valid)
return [
mean if (x is None or (isinstance(x, float) and math.isnan(x))) else x
for x in data
]
def histogram(data: list, buckets: int) -> list:
"""Returns list of counts per bucket."""
if not data or buckets <= 0:
return []
lo = min(data)
hi = max(data)
if hi == lo:
counts = [0] * buckets
counts[0] = len(data)
return counts
width = (hi - lo) / buckets
counts = [0] * buckets
for x in data:
idx = int((x - lo) / width)
if idx >= buckets:
idx = buckets - 1
counts[idx] += 1
return counts
def rolling_window(xs: list, size: int) -> list:
"""Returns list of sublists (sliding windows of given size)."""
if size <= 0 or size > len(xs):
return []
return [xs[i : i + size] for i in range(len(xs) - size + 1)]
def autocorrelation(data: list, lag: int) -> float:
"""Autocorrelation at given lag."""
n = len(data)
if lag < 0 or lag >= n or n == 0:
return 0.0
mean = sum(data) / n
var = sum((x - mean) ** 2 for x in data) / n
if var == 0.0:
return 0.0
cov = sum((data[i] - mean) * (data[i + lag] - mean) for i in range(n - lag)) / n
return cov / var
def linspace(start: float, stop: float, num: int) -> list:
"""Generate evenly spaced values from start to stop (inclusive)."""
if num <= 0:
return []
if num == 1:
return [start]
step = (stop - start) / (num - 1)
return [start + i * step for i in range(num)]
def estimate_hawkes(arrivals: list[int], max_lag: int = 30) -> dict:
"""Estima parámetros de un proceso Hawkes desde autocorrelación de arrivals.
Ajusta exponencial a*exp(-b*lag) sobre la ACF.
Retorna dict con alpha, beta, branching_ratio, acf.
"""
import numpy as np
from scipy.optimize import curve_fit
arr = np.array(arrivals, dtype=float)
mean_a = np.mean(arr)
var_a = np.var(arr)
if var_a == 0:
return {'alpha': 0.0, 'beta': 1.0, 'branching_ratio': 0.0, 'acf': [1.0]}
acf = [1.0] + [
float(np.mean((arr[lag:] - mean_a) * (arr[:-lag] - mean_a)) / var_a)
for lag in range(1, max_lag)
]
lags = np.arange(1, max_lag)
acf_vals = np.array(acf[1:])
if acf_vals[0] <= 0.01:
return {'alpha': 0.0, 'beta': 1.0, 'branching_ratio': 0.0, 'acf': acf}
exp_decay = lambda x, a, b: a * np.exp(-b * x)
try:
popt, _ = curve_fit(exp_decay, lags, acf_vals, p0=[0.5, 0.5], maxfev=5000)
alpha_est, beta_est = abs(popt[0]), abs(popt[1])
except RuntimeError:
alpha_est, beta_est = 0.0, 1.0
branching = alpha_est / beta_est if beta_est > 0 else 0.0
return {
'alpha': round(alpha_est, 4),
'beta': round(beta_est, 4),
'branching_ratio': round(branching, 4),
'acf': acf,
}
def estimate_pareto_alpha(values: list[float], x_min_percentile: float = 90.0) -> dict:
"""Estima el exponente alpha de una distribución Pareto via MLE.
α = n / Σ ln(xi / x_min) donde x_min es el percentil indicado.
Alpha bajo = cola más pesada = más valores extremos.
"""
import numpy as np
arr = np.array([v for v in values if v > 0], dtype=float)
if len(arr) < 10:
return {'alpha': 0.0, 'x_min': 0.0, 'n_tail': 0}
x_min = float(np.percentile(arr, x_min_percentile))
tail = arr[arr >= x_min]
if len(tail) < 2 or x_min <= 0:
return {'alpha': 0.0, 'x_min': x_min, 'n_tail': len(tail)}
alpha = float(len(tail) / np.sum(np.log(tail / x_min)))
return {
'alpha': round(alpha, 4),
'x_min': round(x_min, 6),
'n_tail': len(tail),
}