"""Pure datascience utilities — statistics and numerical functions. Uses only math stdlib. No external dependencies. """ import math def pearson(xs: list, ys: list) -> float: """Pearson correlation coefficient between two lists of floats.""" n = len(xs) if n != len(ys) or n == 0: return 0.0 mean_x = sum(xs) / n mean_y = sum(ys) / n num = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) den_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs)) den_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys)) if den_x == 0.0 or den_y == 0.0: return 0.0 return num / (den_x * den_y) def standardize(data: list) -> list: """Z-score standardization (mean=0, std=1).""" n = len(data) if n == 0: return [] mean = sum(data) / n std = math.sqrt(sum((x - mean) ** 2 for x in data) / n) if std == 0.0: return [0.0] * n return [(x - mean) / std for x in data] def min_max_scale(data: list) -> list: """Scale values to [0, 1] range.""" if not data: return [] lo = min(data) hi = max(data) if hi == lo: return [0.0] * len(data) return [(x - lo) / (hi - lo) for x in data] def clip(data: list, lo: float, hi: float) -> list: """Clip values to [lo, hi].""" return [max(lo, min(hi, x)) for x in data] def detect_outliers(data: list, threshold: float) -> list: """Returns list of bools, True where |z-score| > threshold.""" n = len(data) if n == 0: return [] mean = sum(data) / n std = math.sqrt(sum((x - mean) ** 2 for x in data) / n) if std == 0.0: return [False] * n return [abs((x - mean) / std) > threshold for x in data] def impute(data: list) -> list: """Replace None/NaN with mean of non-null values.""" valid = [x for x in data if x is not None and not (isinstance(x, float) and math.isnan(x))] if not valid: return [0.0] * len(data) mean = sum(valid) / len(valid) return [ mean if (x is None or (isinstance(x, float) and math.isnan(x))) else x for x in data ] def histogram(data: list, buckets: int) -> list: """Returns list of counts per bucket.""" if not data or buckets <= 0: return [] lo = min(data) hi = max(data) if hi == lo: counts = [0] * buckets counts[0] = len(data) return counts width = (hi - lo) / buckets counts = [0] * buckets for x in data: idx = int((x - lo) / width) if idx >= buckets: idx = buckets - 1 counts[idx] += 1 return counts def rolling_window(xs: list, size: int) -> list: """Returns list of sublists (sliding windows of given size).""" if size <= 0 or size > len(xs): return [] return [xs[i : i + size] for i in range(len(xs) - size + 1)] def autocorrelation(data: list, lag: int) -> float: """Autocorrelation at given lag.""" n = len(data) if lag < 0 or lag >= n or n == 0: return 0.0 mean = sum(data) / n var = sum((x - mean) ** 2 for x in data) / n if var == 0.0: return 0.0 cov = sum((data[i] - mean) * (data[i + lag] - mean) for i in range(n - lag)) / n return cov / var def linspace(start: float, stop: float, num: int) -> list: """Generate evenly spaced values from start to stop (inclusive).""" if num <= 0: return [] if num == 1: return [start] step = (stop - start) / (num - 1) return [start + i * step for i in range(num)] def estimate_hawkes(arrivals: list[int], max_lag: int = 30) -> dict: """Estima parámetros de un proceso Hawkes desde autocorrelación de arrivals. Ajusta exponencial a*exp(-b*lag) sobre la ACF. Retorna dict con alpha, beta, branching_ratio, acf. """ import numpy as np from scipy.optimize import curve_fit arr = np.array(arrivals, dtype=float) mean_a = np.mean(arr) var_a = np.var(arr) if var_a == 0: return {'alpha': 0.0, 'beta': 1.0, 'branching_ratio': 0.0, 'acf': [1.0]} acf = [1.0] + [ float(np.mean((arr[lag:] - mean_a) * (arr[:-lag] - mean_a)) / var_a) for lag in range(1, max_lag) ] lags = np.arange(1, max_lag) acf_vals = np.array(acf[1:]) if acf_vals[0] <= 0.01: return {'alpha': 0.0, 'beta': 1.0, 'branching_ratio': 0.0, 'acf': acf} exp_decay = lambda x, a, b: a * np.exp(-b * x) try: popt, _ = curve_fit(exp_decay, lags, acf_vals, p0=[0.5, 0.5], maxfev=5000) alpha_est, beta_est = abs(popt[0]), abs(popt[1]) except RuntimeError: alpha_est, beta_est = 0.0, 1.0 branching = alpha_est / beta_est if beta_est > 0 else 0.0 return { 'alpha': round(alpha_est, 4), 'beta': round(beta_est, 4), 'branching_ratio': round(branching, 4), 'acf': acf, } def estimate_pareto_alpha(values: list[float], x_min_percentile: float = 90.0) -> dict: """Estima el exponente alpha de una distribución Pareto via MLE. α = n / Σ ln(xi / x_min) donde x_min es el percentil indicado. Alpha bajo = cola más pesada = más valores extremos. """ import numpy as np arr = np.array([v for v in values if v > 0], dtype=float) if len(arr) < 10: return {'alpha': 0.0, 'x_min': 0.0, 'n_tail': 0} x_min = float(np.percentile(arr, x_min_percentile)) tail = arr[arr >= x_min] if len(tail) < 2 or x_min <= 0: return {'alpha': 0.0, 'x_min': x_min, 'n_tail': len(tail)} alpha = float(len(tail) / np.sum(np.log(tail / x_min))) return { 'alpha': round(alpha, 4), 'x_min': round(x_min, 6), 'n_tail': len(tail), }