Files
egutierrez 837563c3ba feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

72 lines
2.3 KiB
Python

"""GROUP BY + agregaciones sobre datos tabulares list[dict]."""
from collections import defaultdict
def aggregate_by_group(
    rows: list[dict],
    group_by: list[str],
    aggs: dict[str, str],
) -> list[dict]:
    """Group rows by one or more columns and apply aggregations.

    Equivalent to SQL ``GROUP BY`` with aggregate functions, applied to
    tabular data represented as a list of dicts.

    Args:
        rows: Records to aggregate. A column that is missing from a row is
            read as ``None``.
        group_by: Columns whose combined values identify a group.
        aggs: Mapping of ``{column: function}``. Supported functions:

            * ``sum``, ``mean``, ``min``, ``max``: ``None`` values are
              ignored; the result is ``None`` when the group has no
              non-``None`` value for that column.
            * ``count``: number of rows in the group (``None`` values are
              counted too).
            * ``first`` / ``last``: value from the first / last row of the
              group, which may be ``None``.
            * ``collect``: list with every value in row order, including
              ``None``.

    Returns:
        One dict per group holding the ``group_by`` columns plus one entry
        per aggregated column. Groups appear in order of first appearance.

    Raises:
        ValueError: If ``aggs`` names an unsupported function. The check runs
            before any row is read, so a misspelled function is reported even
            when ``rows`` is empty or a group only contains ``None`` values.
    """
    supported = ("collect", "count", "first", "last", "sum", "mean", "min", "max")
    # Validate up front: otherwise an unknown function would silently yield
    # None for groups whose values are all None (or for empty input).
    for func in aggs.values():
        if func not in supported:
            raise ValueError(f"Funcion de agregacion no soportada: {func}")

    # Reducers that operate only on the non-None values of a group.
    reducers = {
        "sum": sum,
        "mean": lambda xs: sum(xs) / len(xs),
        "min": min,
        "max": max,
    }

    # dicts preserve insertion order, so this single mapping both stores the
    # per-group values and remembers the order in which groups first appeared.
    buckets: dict[tuple, dict[str, list]] = {}
    for row in rows:
        gk = tuple(row.get(col) for col in group_by)
        columns = buckets.get(gk)
        if columns is None:
            columns = {col: [] for col in aggs}
            buckets[gk] = columns
        for col in aggs:
            columns[col].append(row.get(col))

    def _aggregate(vals: list, func: str):
        """Reduce the collected values of one column with ``func``."""
        if func == "collect":
            return vals
        if func == "count":
            return len(vals)
        if func == "first":
            return vals[0] if vals else None
        if func == "last":
            return vals[-1] if vals else None
        # sum / mean / min / max skip None values.
        present = [v for v in vals if v is not None]
        if not present:
            return None
        return reducers[func](present)

    result = []
    for gk, columns in buckets.items():
        record: dict = dict(zip(group_by, gk))
        for col, func in aggs.items():
            record[col] = _aggregate(columns[col], func)
        result.append(record)
    return result