Files
fn_registry/python/functions/finance/finance.py
T
egutierrez 837563c3ba feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

239 lines
7.5 KiB
Python

"""Finance domain — pure functions for financial indicators and calculations."""
import math
def sma(data: list, period: int) -> list:
    """Compute the simple moving average (SMA) of a price series.

    Args:
        data: Price series, in chronological order.
        period: Number of consecutive samples averaged per output value.

    Returns:
        One average per full window (``len(data) - period + 1`` values).
        An empty list when ``period`` is not positive or is larger than
        ``len(data)``.
    """
    n_points = len(data)
    if not 0 < period <= n_points:
        return []
    # Each output value is the mean of the window that starts at `start`.
    return [
        sum(data[start:start + period]) / period
        for start in range(n_points - period + 1)
    ]
def ema(data: list, period: int) -> list:
    """Compute the exponential moving average (EMA) of a price series.

    The smoothing factor is ``2 / (period + 1)``.

    Args:
        data: Price series, in chronological order.
        period: Span used for the smoothing factor and for the seed value.

    Returns:
        ``len(data) - period + 1`` values; the first one is the plain average
        of the first ``period`` samples. An empty list when ``period`` is not
        positive or is larger than ``len(data)``.
    """
    if period <= 0 or period > len(data):
        return []
    alpha = 2.0 / (period + 1)
    # Seed the recursion with the SMA of the first `period` samples.
    previous = sum(data[:period]) / period
    smoothed = [previous]
    for price in data[period:]:
        previous = (price - previous) * alpha + previous
        smoothed.append(previous)
    return smoothed
def rsi(data: list, period: int) -> list:
    """Compute the Relative Strength Index (RSI) of a price series.

    The first value uses the plain averages of the first ``period`` gains and
    losses. Each later value updates both averages recursively as
    ``(previous_avg * (period - 1) + current) / period``.

    Args:
        data: Price series, in chronological order.
        period: Number of price changes in the initial averaging window.

    Returns:
        ``len(data) - period`` RSI values in the range 0..100. An empty list
        when ``period`` is not positive or ``data`` has fewer than
        ``period + 1`` samples.
    """
    if period <= 0 or len(data) < period + 1:
        return []

    def to_index(gain_avg, loss_avg):
        # With no losses in the window the ratio is unbounded; report 100.
        if loss_avg == 0:
            return 100.0
        strength = gain_avg / loss_avg
        return 100.0 - 100.0 / (1.0 + strength)

    # Split each consecutive price change into its gain and loss component.
    gains = []
    losses = []
    for earlier, later in zip(data, data[1:]):
        change = later - earlier
        gains.append(change if change > 0 else 0.0)
        losses.append(-change if change < 0 else 0.0)

    mean_gain = sum(gains[:period]) / period
    mean_loss = sum(losses[:period]) / period
    values = [to_index(mean_gain, mean_loss)]
    for gain, loss in zip(gains[period:], losses[period:]):
        mean_gain = (mean_gain * (period - 1) + gain) / period
        mean_loss = (mean_loss * (period - 1) + loss) / period
        values.append(to_index(mean_gain, mean_loss))
    return values
def bollinger_bands(data: list, period: int, num_std: float) -> tuple:
    """Compute Bollinger Bands as an ``(upper, middle, lower)`` tuple.

    The middle band is ``sma(data, period)``. The upper and lower bands sit
    ``num_std`` population standard deviations (variance divided by
    ``period``) above and below it, computed over the same window.

    Args:
        data: Price series, in chronological order.
        period: Window length for both the average and the deviation.
        num_std: Band width as a multiple of the window standard deviation.

    Returns:
        Three lists of equal length. Three empty lists when ``period`` is not
        positive or is larger than ``len(data)``.
    """
    if period <= 0 or period > len(data):
        return ([], [], [])
    middle = sma(data, period)
    upper = []
    lower = []
    for start, center in enumerate(middle):
        window = data[start:start + period]
        variance = sum((x - center) ** 2 for x in window) / period
        band_width = num_std * math.sqrt(variance)
        upper.append(center + band_width)
        lower.append(center - band_width)
    return (upper, middle, lower)
def sharpe_ratio(returns: list, risk_free_rate: float, periods_per_year: float) -> float:
    """Compute the annualized Sharpe ratio of a series of per-period returns.

    Args:
        returns: Per-period returns.
        risk_free_rate: Annual risk-free rate; it is divided by
            ``periods_per_year`` before being subtracted from the mean return.
        periods_per_year: Number of return periods in one year.

    Returns:
        ``(mean excess return / population std) * sqrt(periods_per_year)``.
        ``0.0`` when ``returns`` is empty, ``periods_per_year`` is not
        positive, or the standard deviation is zero.
    """
    count = len(returns)
    if count == 0 or periods_per_year <= 0:
        return 0.0
    average = sum(returns) / count
    # Population standard deviation: the divisor is the sample count.
    dispersion = math.sqrt(sum((r - average) ** 2 for r in returns) / count)
    if dispersion == 0:
        return 0.0
    per_period_risk_free = risk_free_rate / periods_per_year
    return ((average - per_period_risk_free) / dispersion) * math.sqrt(periods_per_year)
def max_drawdown(values: list) -> tuple:
    """Compute the maximum drawdown and the indices where it starts and ends.

    The drawdown at each point is ``(running peak - value) / running peak``;
    it is taken as 0 while the running peak equals zero.

    Args:
        values: Series of values (for example an equity curve), in
            chronological order.

    Returns:
        ``(max_dd, start_idx, end_idx)``, where ``start_idx`` is the index of
        the running peak and ``end_idx`` the index of the value that produced
        the largest drawdown. ``(0.0, 0, 0)`` when ``values`` has fewer than
        two elements or no positive drawdown occurs. On ties the earliest
        occurrence is kept.
    """
    if len(values) < 2:
        return (0.0, 0, 0)
    running_peak = values[0]
    running_peak_at = 0
    worst = (0.0, 0, 0)
    for idx, value in enumerate(values[1:], start=1):
        if value > running_peak:
            running_peak, running_peak_at = value, idx
        if running_peak != 0:
            decline = (running_peak - value) / running_peak
        else:
            # Avoid dividing by a zero peak.
            decline = 0.0
        if decline > worst[0]:
            worst = (decline, running_peak_at, idx)
    return worst
def vwap(prices: list, volumes: list) -> float:
    """Compute the volume-weighted average price (VWAP).

    Args:
        prices: Trade prices.
        volumes: Traded volume for each price, in the same order.

    Returns:
        ``sum(price * volume) / sum(volume)``. ``0.0`` when ``prices`` is
        empty, the two lists differ in length, or the total volume is zero.
    """
    if len(prices) == 0 or len(prices) != len(volumes):
        return 0.0
    traded = sum(volumes)
    if traded == 0:
        return 0.0
    notional = sum(price * volume for price, volume in zip(prices, volumes))
    return notional / traded
def log_return(price_start: float, price_end: float) -> float:
    """Compute the logarithmic return between two prices.

    Args:
        price_start: Initial price.
        price_end: Final price.

    Returns:
        ``ln(price_end / price_start)``, or ``0.0`` when either price is zero
        or negative.
    """
    has_non_positive = price_start <= 0 or price_end <= 0
    return 0.0 if has_non_positive else math.log(price_end / price_start)
def annualized_volatility(returns: list, periods_per_year: float) -> float:
    """Compute the annualized volatility of a series of per-period returns.

    Args:
        returns: Per-period returns.
        periods_per_year: Number of return periods in one year.

    Returns:
        The sample standard deviation (divisor ``n - 1``) multiplied by
        ``sqrt(periods_per_year)``. ``0.0`` when fewer than two returns are
        given or ``periods_per_year`` is not positive.
    """
    count = len(returns)
    if count < 2 or periods_per_year <= 0:
        return 0.0
    center = sum(returns) / count
    squared_deviation = sum((r - center) ** 2 for r in returns)
    sample_std = math.sqrt(squared_deviation / (count - 1))
    return sample_std * math.sqrt(periods_per_year)
def generate_gbm_prices(
    initial_price: float,
    n_ticks: int,
    sigma: float,
    mu: float = 0.0,
    jump_intensity: float = 0.0,
    jump_size_std: float = 0.05,
    seed: int = 42,
) -> list:
    """Generate a price path using geometric Brownian motion with optional jumps.

    Each step applies
    ``S(t+1) = S(t) * exp((mu - sigma^2 / 2) * dt + sigma * sqrt(dt) * Z + J)``
    with ``dt = 1`` and ``Z ~ N(0, 1)``. On each tick a jump
    ``J ~ N(0, jump_size_std)`` is added with probability ``jump_intensity``;
    otherwise ``J = 0``.

    Args:
        initial_price: First price of the path; returned unchanged as
            element 0.
        n_ticks: Total number of prices to return, including the initial one.
        sigma: Per-tick volatility.
        mu: Per-tick drift.
        jump_intensity: Per-tick probability of a jump; no jump draws are
            made when it is not positive.
        jump_size_std: Standard deviation of the log-jump size.
        seed: Seed for the NumPy random generator; the same seed reproduces
            the same path.

    Returns:
        A list of ``n_ticks`` prices. Generated prices are plain ``float``
        values. An empty list when ``n_ticks`` is zero or negative.
    """
    import numpy as np

    # Without this guard, writing the initial price into an empty list raises
    # IndexError.
    if n_ticks <= 0:
        return []

    rng = np.random.default_rng(seed)
    dt = 1.0
    # Loop-invariant pieces of the log-return.
    drift = (mu - 0.5 * sigma**2) * dt
    diffusion = sigma * np.sqrt(dt)

    prices = [initial_price]
    for _ in range(1, n_ticks):
        z = rng.standard_normal()
        log_step = drift + diffusion * z
        jump = 0.0
        # The uniform draw happens only when jumps are enabled, so the random
        # stream of a jump-free path is not affected.
        if jump_intensity > 0 and rng.random() < jump_intensity:
            jump = rng.normal(0, jump_size_std)
        # Store plain floats rather than NumPy scalars so the list is
        # homogeneous.
        prices.append(float(prices[-1] * np.exp(log_step + jump)))
    return prices
def avellaneda_stoikov_quotes(
    mid_price: float,
    inventory: float,
    gamma: float,
    sigma: float,
    spread_base: float,
    n_levels: int = 3,
    qty_base: float = 10.0,
) -> list:
    """Generate market-maker quotes using the Avellaneda-Stoikov model.

    Reservation price: ``r = mid_price - inventory * gamma * sigma^2``.
    Half spread: ``delta = spread_base / 2 + gamma * sigma^2 / 2``.

    Level ``k`` (starting at 0) is quoted ``delta + k * delta * 0.5`` away
    from the reservation price on each side, with quantity
    ``qty_base * (1 + k * 0.5)``. Prices are rounded to 2 decimals and a
    quote is dropped when its rounded price is not positive.

    Args:
        mid_price: Current mid price.
        inventory: Current inventory of the market maker.
        gamma: Risk-aversion coefficient.
        sigma: Volatility.
        spread_base: Base full spread.
        n_levels: Number of price levels per side.
        qty_base: Quantity quoted at the first level.

    Returns:
        A list of dicts with keys ``side`` (``'buy'`` or ``'sell'``),
        ``price`` and ``qty``, ordered level by level with the buy quote
        before the sell quote.
    """
    reservation = mid_price - inventory * gamma * sigma**2
    half_spread = spread_base / 2 + gamma * sigma**2 / 2

    quotes = []
    for depth in range(n_levels):
        widening = depth * half_spread * 0.5
        size = qty_base * (1 + depth * 0.5)
        candidates = (
            ('buy', round(reservation - half_spread - widening, 2)),
            ('sell', round(reservation + half_spread + widening, 2)),
        )
        for side, price in candidates:
            if price > 0:
                quotes.append({'side': side, 'price': price, 'qty': size})
    return quotes
def generate_taker_order(
alpha: float = 2.0,
size_min: float = 1.0,
size_max: float = 100.0,
buy_prob: float = 0.5,
seed: int | None = None,
) -> dict:
"""Genera una market order de taker con tamano power-law (Pareto).
P(size > x) ~ x^(-alpha). Alpha bajo = mas ballenas.
Retorna dict con keys: side, qty
"""
import numpy as np
rng = np.random.default_rng(seed)
side = 'buy' if rng.random() < buy_prob else 'sell'
raw_size = (rng.pareto(alpha) + 1) * size_min
size = min(round(raw_size, 1), size_max)
return {'side': side, 'qty': size}
def hawkes_intensity(
    base_rate: float,
    hawkes_alpha: float,
    hawkes_beta: float,
    event_times: list,
    current_time: float,
) -> float:
    """Compute the intensity lambda(t) of a Hawkes process at the current time.

    ``lambda(t) = base_rate + sum(alpha * exp(-beta * (t - ti)))`` over the
    event times ``ti`` strictly before ``t``.

    Args:
        base_rate: Baseline intensity.
        hawkes_alpha: Excitation added by each past event.
        hawkes_beta: Exponential decay rate of the excitation.
        event_times: Times of events; only those strictly earlier than
            ``current_time`` contribute.
        current_time: Time at which the intensity is evaluated.

    Returns:
        The intensity, floored at ``0.0``.
    """
    import numpy as np

    boost = 0
    for past in event_times:
        # Events at or after the evaluation time contribute nothing.
        if past < current_time:
            boost = boost + hawkes_alpha * np.exp(-hawkes_beta * (current_time - past))
    return max(0.0, base_rate + boost)