"""Finance domain — pure functions for financial indicators and calculations.""" import math def sma(data: list, period: int) -> list: """Calcula la media movil simple (SMA) de una serie de precios.""" if period <= 0 or period > len(data): return [] result = [] for i in range(period - 1, len(data)): window = data[i - period + 1 : i + 1] result.append(sum(window) / period) return result def ema(data: list, period: int) -> list: """Calcula la media movil exponencial (EMA) de una serie de precios.""" if period <= 0 or period > len(data): return [] multiplier = 2.0 / (period + 1) # Primer valor es SMA del primer periodo first_sma = sum(data[:period]) / period result = [first_sma] for i in range(period, len(data)): val = (data[i] - result[-1]) * multiplier + result[-1] result.append(val) return result def rsi(data: list, period: int) -> list: """Calcula el Relative Strength Index (RSI) de una serie de precios.""" if period <= 0 or len(data) < period + 1: return [] deltas = [data[i] - data[i - 1] for i in range(1, len(data))] gains = [d if d > 0 else 0.0 for d in deltas] losses = [-d if d < 0 else 0.0 for d in deltas] avg_gain = sum(gains[:period]) / period avg_loss = sum(losses[:period]) / period result = [] if avg_loss == 0: result.append(100.0) else: rs = avg_gain / avg_loss result.append(100.0 - 100.0 / (1.0 + rs)) for i in range(period, len(deltas)): avg_gain = (avg_gain * (period - 1) + gains[i]) / period avg_loss = (avg_loss * (period - 1) + losses[i]) / period if avg_loss == 0: result.append(100.0) else: rs = avg_gain / avg_loss result.append(100.0 - 100.0 / (1.0 + rs)) return result def bollinger_bands(data: list, period: int, num_std: float) -> tuple: """Calcula las Bandas de Bollinger (upper, middle, lower).""" if period <= 0 or period > len(data): return ([], [], []) middle = sma(data, period) upper = [] lower = [] for i in range(len(middle)): window = data[i : i + period] mean = middle[i] variance = sum((x - mean) ** 2 for x in window) / period std = math.sqrt(variance) upper.append(mean + num_std * std) lower.append(mean - num_std * std) return (upper, middle, lower) def sharpe_ratio(returns: list, risk_free_rate: float, periods_per_year: float) -> float: """Calcula el Sharpe Ratio anualizado.""" if len(returns) == 0 or periods_per_year <= 0: return 0.0 n = len(returns) mean_return = sum(returns) / n excess = mean_return - risk_free_rate / periods_per_year variance = sum((r - mean_return) ** 2 for r in returns) / n std = math.sqrt(variance) if std == 0: return 0.0 return (excess / std) * math.sqrt(periods_per_year) def max_drawdown(values: list) -> tuple: """Calcula el max drawdown y los indices de inicio y fin.""" if len(values) < 2: return (0.0, 0, 0) peak = values[0] peak_idx = 0 max_dd = 0.0 dd_start = 0 dd_end = 0 for i in range(1, len(values)): if values[i] > peak: peak = values[i] peak_idx = i dd = (peak - values[i]) / peak if peak != 0 else 0.0 if dd > max_dd: max_dd = dd dd_start = peak_idx dd_end = i return (max_dd, dd_start, dd_end) def vwap(prices: list, volumes: list) -> float: """Calcula el Volume-Weighted Average Price (VWAP).""" if len(prices) == 0 or len(prices) != len(volumes): return 0.0 total_volume = sum(volumes) if total_volume == 0: return 0.0 return sum(p * v for p, v in zip(prices, volumes)) / total_volume def log_return(price_start: float, price_end: float) -> float: """Calcula el retorno logaritmico entre dos precios.""" if price_start <= 0 or price_end <= 0: return 0.0 return math.log(price_end / price_start) def annualized_volatility(returns: list, periods_per_year: float) -> float: """Calcula la volatilidad anualizada de una serie de retornos.""" if len(returns) < 2 or periods_per_year <= 0: return 0.0 n = len(returns) mean = sum(returns) / n variance = sum((r - mean) ** 2 for r in returns) / (n - 1) return math.sqrt(variance) * math.sqrt(periods_per_year) def generate_gbm_prices( initial_price: float, n_ticks: int, sigma: float, mu: float = 0.0, jump_intensity: float = 0.0, jump_size_std: float = 0.05, seed: int = 42, ) -> list: """Genera serie de precios fundamentales con Geometric Brownian Motion + jump-diffusion. S(t+1) = S(t) * exp((mu - sigma^2/2)*dt + sigma*sqrt(dt)*Z + J*N) donde Z ~ N(0,1), N ~ Bernoulli(jump_intensity), J ~ N(0, jump_size_std) """ import numpy as np rng = np.random.default_rng(seed) prices = [0.0] * n_ticks prices[0] = initial_price dt = 1.0 for t in range(1, n_ticks): z = rng.standard_normal() gbm = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * z jump = 0.0 if jump_intensity > 0 and rng.random() < jump_intensity: jump = rng.normal(0, jump_size_std) prices[t] = prices[t - 1] * np.exp(gbm + jump) return prices def avellaneda_stoikov_quotes( mid_price: float, inventory: float, gamma: float, sigma: float, spread_base: float, n_levels: int = 3, qty_base: float = 10.0, ) -> list: """Genera ordenes de market maker usando el modelo Avellaneda-Stoikov. Precio de reserva: r = mid - inventory * gamma * sigma^2 Half spread: delta = spread_base/2 + gamma * sigma^2/2 Retorna lista de dicts con keys: side, price, qty """ reservation = mid_price - inventory * gamma * sigma**2 half_spread = spread_base / 2 + gamma * sigma**2 / 2 orders = [] for level in range(n_levels): offset = level * half_spread * 0.5 qty = qty_base * (1 + level * 0.5) bid_price = round(reservation - half_spread - offset, 2) ask_price = round(reservation + half_spread + offset, 2) if bid_price > 0: orders.append({'side': 'buy', 'price': bid_price, 'qty': qty}) if ask_price > 0: orders.append({'side': 'sell', 'price': ask_price, 'qty': qty}) return orders def generate_taker_order( alpha: float = 2.0, size_min: float = 1.0, size_max: float = 100.0, buy_prob: float = 0.5, seed: int | None = None, ) -> dict: """Genera una market order de taker con tamano power-law (Pareto). P(size > x) ~ x^(-alpha). Alpha bajo = mas ballenas. Retorna dict con keys: side, qty """ import numpy as np rng = np.random.default_rng(seed) side = 'buy' if rng.random() < buy_prob else 'sell' raw_size = (rng.pareto(alpha) + 1) * size_min size = min(round(raw_size, 1), size_max) return {'side': side, 'qty': size} def hawkes_intensity( base_rate: float, hawkes_alpha: float, hawkes_beta: float, event_times: list, current_time: float, ) -> float: """Calcula la intensidad lambda(t) de un proceso de Hawkes en el tiempo actual. lambda(t) = base_rate + sum(alpha * exp(-beta * (t - ti))) donde ti son los tiempos de eventos pasados. """ import numpy as np excitation = sum( hawkes_alpha * np.exp(-hawkes_beta * (current_time - ti)) for ti in event_times if ti < current_time ) return max(0.0, base_rate + excitation)