837563c3ba
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
239 lines
7.5 KiB
Python
239 lines
7.5 KiB
Python
"""Finance domain — pure functions for financial indicators and calculations."""
|
|
|
|
import math
|
|
|
|
|
|
def sma(data: list, period: int) -> list:
    """Return the simple moving average (SMA) of a price series.

    Args:
        data: Sequence of numeric prices.
        period: Window length.

    Returns:
        One average per full window, i.e. ``len(data) - period + 1`` values.
        An empty list is returned when ``period`` is not positive or is
        larger than the series.
    """
    size = len(data)
    if not 0 < period <= size:
        return []
    # Each window starts at `start` and spans exactly `period` items.
    return [
        sum(data[start : start + period]) / period
        for start in range(size - period + 1)
    ]
|
|
|
|
|
|
def ema(data: list, period: int) -> list:
    """Return the exponential moving average (EMA) of a price series.

    The first output value is the plain average of the first ``period``
    prices; each later value moves the previous one toward the new price by
    the smoothing factor ``2 / (period + 1)``.

    Args:
        data: Sequence of numeric prices.
        period: Smoothing window length.

    Returns:
        ``len(data) - period + 1`` values, or an empty list when ``period``
        is not positive or is larger than the series.
    """
    if period <= 0 or period > len(data):
        return []
    smoothing = 2.0 / (period + 1)
    # Seed the recursion with the SMA of the first window.
    current = sum(data[:period]) / period
    smoothed = [current]
    for price in data[period:]:
        current = (price - current) * smoothing + current
        smoothed.append(current)
    return smoothed
|
|
|
|
|
|
def rsi(data: list, period: int) -> list:
    """Return the Relative Strength Index (RSI) of a price series.

    The first value uses the plain average of the first ``period`` gains and
    losses; later values update both averages recursively as
    ``(previous * (period - 1) + current) / period``.

    Args:
        data: Sequence of numeric prices.
        period: Look-back length.

    Returns:
        ``len(data) - period`` values in the range 0..100, or an empty list
        when ``period`` is not positive or fewer than ``period + 1`` prices
        are supplied. A zero average loss yields 100.0.
    """
    if period <= 0 or len(data) < period + 1:
        return []

    def _strength(gain_avg, loss_avg):
        # With no losses in the window the index is pinned at its maximum.
        if loss_avg == 0:
            return 100.0
        ratio = gain_avg / loss_avg
        return 100.0 - 100.0 / (1.0 + ratio)

    changes = [curr - prev for prev, curr in zip(data, data[1:])]
    ups = [c if c > 0 else 0.0 for c in changes]
    downs = [-c if c < 0 else 0.0 for c in changes]

    mean_up = sum(ups[:period]) / period
    mean_down = sum(downs[:period]) / period

    values = [_strength(mean_up, mean_down)]
    for up, down in zip(ups[period:], downs[period:]):
        mean_up = (mean_up * (period - 1) + up) / period
        mean_down = (mean_down * (period - 1) + down) / period
        values.append(_strength(mean_up, mean_down))
    return values
|
|
|
|
|
|
def bollinger_bands(data: list, period: int, num_std: float) -> tuple:
    """Return the Bollinger Bands as ``(upper, middle, lower)`` lists.

    The middle band is the simple moving average from ``sma``. The upper and
    lower bands sit ``num_std`` population standard deviations (divisor
    ``period``) above and below it, computed over the same window.

    Args:
        data: Sequence of numeric prices.
        period: Window length.
        num_std: Band width as a multiple of the window's standard deviation.

    Returns:
        A tuple of three equally long lists, or three empty lists when
        ``period`` is not positive or is larger than the series.
    """
    if period <= 0 or period > len(data):
        return ([], [], [])
    middle = sma(data, period)
    upper, lower = [], []
    for start, center in enumerate(middle):
        # middle[start] is the mean of data[start : start + period].
        window = data[start : start + period]
        deviation = math.sqrt(sum((x - center) ** 2 for x in window) / period)
        band = num_std * deviation
        upper.append(center + band)
        lower.append(center - band)
    return (upper, middle, lower)
|
|
|
|
|
|
def sharpe_ratio(returns: list, risk_free_rate: float, periods_per_year: float) -> float:
    """Return the annualized Sharpe ratio of a series of periodic returns.

    The per-period excess return is ``mean(returns) - risk_free_rate /
    periods_per_year``. It is divided by the population standard deviation
    of the returns (divisor ``n``) and scaled by ``sqrt(periods_per_year)``.

    Args:
        returns: Sequence of per-period returns.
        risk_free_rate: Annual risk-free rate; divided by
            ``periods_per_year`` before being subtracted.
        periods_per_year: Number of return periods in a year.

    Returns:
        The annualized ratio, or 0.0 when ``returns`` is empty,
        ``periods_per_year`` is not positive, or the returns show no
        dispersion.
    """
    if len(returns) == 0 or periods_per_year <= 0:
        return 0.0
    # A constant series has zero volatility by definition. Detect it exactly:
    # the computed mean of e.g. [0.1, 0.1, 0.1] is one ulp off 0.1, so the
    # std comes out as ~1e-17 instead of 0 and the ratio would explode.
    if min(returns) == max(returns):
        return 0.0
    n = len(returns)
    mean_return = sum(returns) / n
    excess = mean_return - risk_free_rate / periods_per_year
    variance = sum((r - mean_return) ** 2 for r in returns) / n
    std = math.sqrt(variance)
    # Still needed: squared deviations of tiny values can underflow to zero.
    if std == 0:
        return 0.0
    return (excess / std) * math.sqrt(periods_per_year)
|
|
|
|
|
|
def max_drawdown(values: list) -> tuple:
    """Return the maximum drawdown and the indices where it starts and ends.

    The drawdown at each point is ``(peak - value) / peak`` relative to the
    running peak seen so far; a peak of exactly zero contributes 0.0.

    Args:
        values: Sequence of numeric values (e.g. an equity curve).

    Returns:
        ``(max_dd, start_idx, end_idx)`` where ``start_idx`` is the index of
        the peak and ``end_idx`` the index of the trough. Returns
        ``(0.0, 0, 0)`` for fewer than two values or when no drawdown occurs.
        On ties the earliest drawdown is kept.
    """
    if len(values) < 2:
        return (0.0, 0, 0)

    high, high_at = values[0], 0
    worst, start, end = 0.0, 0, 0
    for idx in range(1, len(values)):
        current = values[idx]
        if current > high:
            high, high_at = current, idx
        if high != 0:
            drop = (high - current) / high
        else:
            drop = 0.0
        # Strict comparison: an equal later drawdown does not replace the first.
        if drop > worst:
            worst, start, end = drop, high_at, idx
    return (worst, start, end)
|
|
|
|
|
|
def vwap(prices: list, volumes: list) -> float:
    """Return the Volume-Weighted Average Price (VWAP).

    Args:
        prices: Sequence of prices.
        volumes: Sequence of volumes, one per price.

    Returns:
        ``sum(price * volume) / sum(volume)``, or 0.0 when the inputs are
        empty, differ in length, or the total volume is zero.
    """
    count = len(prices)
    if count == 0 or count != len(volumes):
        return 0.0
    traded = sum(volumes)
    if traded == 0:
        return 0.0
    notional = 0
    for price, volume in zip(prices, volumes):
        notional += price * volume
    return notional / traded
|
|
|
|
|
|
def log_return(price_start: float, price_end: float) -> float:
    """Return the logarithmic return between two prices.

    Args:
        price_start: Initial price.
        price_end: Final price.

    Returns:
        ``ln(price_end / price_start)``, or 0.0 when either price is zero or
        negative (the logarithm would be undefined).
    """
    undefined = price_start <= 0 or price_end <= 0
    return 0.0 if undefined else math.log(price_end / price_start)
|
|
|
|
|
|
def annualized_volatility(returns: list, periods_per_year: float) -> float:
    """Return the annualized volatility of a series of returns.

    Uses the sample standard deviation (divisor ``n - 1``) scaled by
    ``sqrt(periods_per_year)``.

    Args:
        returns: Sequence of per-period returns.
        periods_per_year: Number of return periods in a year.

    Returns:
        The annualized volatility, or 0.0 when fewer than two returns are
        given or ``periods_per_year`` is not positive.
    """
    count = len(returns)
    if count < 2 or periods_per_year <= 0:
        return 0.0
    center = sum(returns) / count
    squared_devs = [(r - center) ** 2 for r in returns]
    sample_variance = sum(squared_devs) / (count - 1)
    return math.sqrt(sample_variance) * math.sqrt(periods_per_year)
|
|
|
|
|
|
def generate_gbm_prices(
    initial_price: float,
    n_ticks: int,
    sigma: float,
    mu: float = 0.0,
    jump_intensity: float = 0.0,
    jump_size_std: float = 0.05,
    seed: int = 42,
) -> list:
    """Generate a price series with Geometric Brownian Motion plus jumps.

    S(t+1) = S(t) * exp((mu - sigma^2/2)*dt + sigma*sqrt(dt)*Z + J*N)
    where Z ~ N(0,1), N ~ Bernoulli(jump_intensity), J ~ N(0, jump_size_std)
    and dt is fixed at 1.0 (one step per tick).

    Args:
        initial_price: First price of the series (returned as element 0).
        n_ticks: Number of prices to produce.
        sigma: Per-tick volatility of the diffusion term.
        mu: Per-tick drift.
        jump_intensity: Probability of a jump on each tick; no jump draws are
            made when it is not positive.
        jump_size_std: Standard deviation of the log-jump size.
        seed: Seed for ``numpy.random.default_rng``; equal seeds give equal
            series.

    Returns:
        A list of ``n_ticks`` prices, or an empty list when ``n_ticks`` is
        zero or negative.
    """
    import numpy as np

    # Previously `prices[0] = ...` raised IndexError on an empty series.
    if n_ticks <= 0:
        return []

    rng = np.random.default_rng(seed)
    dt = 1.0
    # Loop-invariant parts of the log-return, hoisted out of the tick loop.
    drift = (mu - 0.5 * sigma**2) * dt
    diffusion_scale = sigma * np.sqrt(dt)

    prices = [initial_price]
    for _ in range(1, n_ticks):
        # Draw order (normal, then uniform, then jump) is kept stable so a
        # given seed keeps producing the same series.
        z = rng.standard_normal()
        gbm = drift + diffusion_scale * z
        jump = 0.0
        if jump_intensity > 0 and rng.random() < jump_intensity:
            jump = rng.normal(0, jump_size_std)
        prices.append(prices[-1] * np.exp(gbm + jump))
    return prices
|
|
|
|
|
|
def avellaneda_stoikov_quotes(
    mid_price: float,
    inventory: float,
    gamma: float,
    sigma: float,
    spread_base: float,
    n_levels: int = 3,
    qty_base: float = 10.0,
) -> list:
    """Build market-maker quotes with the Avellaneda-Stoikov model.

    Reservation price: r = mid - inventory * gamma * sigma^2
    Half spread: delta = spread_base/2 + gamma * sigma^2/2

    Level ``k`` is quoted ``delta * (1 + 0.5 * k)`` away from the reservation
    price with quantity ``qty_base * (1 + 0.5 * k)``. Prices are rounded to
    two decimals and quotes whose rounded price is not positive are dropped.

    Args:
        mid_price: Current mid price.
        inventory: Signed inventory; it shifts the reservation price.
        gamma: Risk-aversion coefficient.
        sigma: Volatility.
        spread_base: Base full spread.
        n_levels: Number of price levels per side.
        qty_base: Quantity quoted at the first level.

    Returns:
        A list of dicts with keys ``side``, ``price`` and ``qty``, ordered
        level by level with the buy quote before the sell quote.
    """
    reservation = mid_price - inventory * gamma * sigma**2
    half_spread = spread_base / 2 + gamma * sigma**2 / 2

    orders = []
    for level in range(n_levels):
        widening = level * half_spread * 0.5
        size = qty_base * (1 + level * 0.5)
        quotes = (
            ('buy', round(reservation - half_spread - widening, 2)),
            ('sell', round(reservation + half_spread + widening, 2)),
        )
        orders.extend(
            {'side': side, 'price': price, 'qty': size}
            for side, price in quotes
            if price > 0
        )
    return orders
|
|
|
|
|
|
def generate_taker_order(
|
|
alpha: float = 2.0,
|
|
size_min: float = 1.0,
|
|
size_max: float = 100.0,
|
|
buy_prob: float = 0.5,
|
|
seed: int | None = None,
|
|
) -> dict:
|
|
"""Genera una market order de taker con tamano power-law (Pareto).
|
|
|
|
P(size > x) ~ x^(-alpha). Alpha bajo = mas ballenas.
|
|
Retorna dict con keys: side, qty
|
|
"""
|
|
import numpy as np
|
|
rng = np.random.default_rng(seed)
|
|
side = 'buy' if rng.random() < buy_prob else 'sell'
|
|
raw_size = (rng.pareto(alpha) + 1) * size_min
|
|
size = min(round(raw_size, 1), size_max)
|
|
return {'side': side, 'qty': size}
|
|
|
|
|
|
def hawkes_intensity(
    base_rate: float,
    hawkes_alpha: float,
    hawkes_beta: float,
    event_times: list,
    current_time: float,
) -> float:
    """Return the intensity lambda(t) of a Hawkes process at ``current_time``.

    lambda(t) = base_rate + sum(alpha * exp(-beta * (t - ti)))
    where ti are the event times strictly before t.

    Args:
        base_rate: Baseline intensity.
        hawkes_alpha: Excitation added by each past event.
        hawkes_beta: Exponential decay rate of that excitation.
        event_times: Event timestamps; those at or after ``current_time``
            are ignored.
        current_time: Time at which the intensity is evaluated.

    Returns:
        The intensity, floored at 0.0.
    """
    import numpy as np

    excitation = 0
    for event_time in event_times:
        if event_time < current_time:
            elapsed = current_time - event_time
            excitation += hawkes_alpha * np.exp(-hawkes_beta * elapsed)

    intensity = base_rate + excitation
    # Floor at zero so negative parameters cannot yield a negative rate.
    return intensity if intensity > 0.0 else 0.0
|