Files
egutierrez 63a9cb5273 feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

87 lines
2.5 KiB
Python

"""detect_drift — detecta drift estadistico por z-score comparando metricas contra historial."""
import math
def detect_drift(
    history: list[dict],
    current: dict,
    fields: list[str],
    threshold: float = 2.0,
) -> list[dict]:
    """Detect statistical drift by z-scoring current metrics against history.

    For every requested field the historical values are gathered, their mean
    and population standard deviation (divisor ``n``) are computed, and the
    current value is scored as ``(current - mean) / std``. A field counts as
    drifted when ``abs(z_score) > threshold`` (strictly greater).

    Args:
        history: Historical metric dicts. Each dict may hold any subset of
            ``fields``; entries where the field is absent or ``None`` are
            skipped for that field.
        current: Metrics of the current run. A field that is absent or
            ``None`` is treated as ``0.0``.
        fields: Names of the numeric fields to analyse.
        threshold: Absolute z-score above which a field is flagged.
            Defaults to 2.0.

    Returns:
        One dict per entry of ``fields``, in the same order, with the keys
        ``field``, ``current``, ``mean``, ``std``, ``z_score`` and
        ``drifted``. When fewer than two historical values are available, or
        when every historical value is the same, ``std`` and ``z_score`` are
        ``0.0`` and ``drifted`` is ``False`` because no meaningful z-score
        exists. With no historical values at all, ``mean`` is ``0.0``.

    Raises:
        TypeError, ValueError: If a non-``None`` value of an analysed field
            cannot be converted with ``float()``.
    """
    results = []
    for field in fields:
        values = [
            float(h[field])
            for h in history
            if field in h and h[field] is not None
        ]

        # Mirror the history filter: a None metric is handled like a missing
        # one instead of crashing inside float(None).
        raw_current = current.get(field)
        current_val = 0.0 if raw_current is None else float(raw_current)

        n = len(values)
        mean = sum(values) / n if n else 0.0
        std = 0.0
        z_score = 0.0
        drifted = False

        if n >= 2:
            first = values[0]
            if all(v == first for v in values):
                # Constant history. Decide this by comparing the values
                # directly: sum()/n can round (e.g. three 0.1 values average
                # to 0.10000000000000002), which would otherwise produce a
                # tiny non-zero std and an enormous, spurious z-score.
                mean = first
            else:
                variance = sum((v - mean) ** 2 for v in values) / n
                std = math.sqrt(variance)
                # std can still collapse to 0.0 if the squared deviations
                # underflow; in that case the z-score stays undefined (0.0).
                if std != 0.0:
                    z_score = (current_val - mean) / std
                    drifted = abs(z_score) > threshold

        results.append({
            "field": field,
            "current": current_val,
            "mean": mean,
            "std": std,
            "z_score": z_score,
            "drifted": drifted,
        })
    return results