63a9cb5273
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
72 lines
2.3 KiB
Python
72 lines
2.3 KiB
Python
"""GROUP BY + agregaciones sobre datos tabulares list[dict]."""
|
|
|
|
from collections import defaultdict
|
|
|
|
|
|
def aggregate_by_group(
    rows: list[dict],
    group_by: list[str],
    aggs: dict[str, str],
) -> list[dict]:
    """Group rows by one or more columns and apply aggregations.

    Equivalent to a SQL GROUP BY with aggregate functions, with these
    specifics:

    * ``collect`` gathers every value of the column, None included, in a list.
    * ``count`` counts every row of the group, including rows whose value
      is None or whose column is missing.
    * ``first`` / ``last`` return the value taken from the first / last row
      of the group, even when that value is None.
    * ``sum``, ``mean``, ``min`` and ``max`` ignore None values and return
      None when the group has no non-None value.
    * A column missing from a row is read as None, both for the group key
      and for the aggregated value.

    Args:
        rows: List of dicts holding the data.
        group_by: Columns to group by.
        aggs: Mapping of {column: function}. Supported functions: sum,
            mean, count, min, max, first, last, collect.

    Returns:
        A list of dicts containing the ``group_by`` columns plus the
        aggregated fields. Rows follow the order in which each group first
        appears in ``rows``. If a column is in both ``group_by`` and
        ``aggs``, the aggregated value overwrites the group value.

    Raises:
        ValueError: If ``aggs`` names an unsupported function. This is
            checked before any row is processed, so it is raised even when
            ``rows`` is empty or a group only contains None values.
    """
    supported = (
        "sum", "mean", "count", "min", "max", "first", "last", "collect",
    )
    # Fail fast: whether a bad function name is reported must not depend on
    # the data (empty input, or groups whose values are all None).
    for func in aggs.values():
        if func not in supported:
            raise ValueError(f"Funcion de agregacion no soportada: {func}")

    # dicts preserve insertion order, so iterating `buckets` later yields
    # the groups in order of first appearance.
    buckets: dict[tuple, dict[str, list]] = defaultdict(
        lambda: defaultdict(list)
    )

    for row in rows:
        group_key = tuple(row.get(col) for col in group_by)
        # Indexing registers the group even when `aggs` is empty.
        columns = buckets[group_key]
        for col in aggs:
            columns[col].append(row.get(col))

    def _aggregate(vals: list, func: str):
        """Reduce the values of one column within one group."""
        if func == "collect":
            return vals
        if func == "count":
            return len(vals)
        if func == "first":
            return vals[0] if vals else None
        if func == "last":
            return vals[-1] if vals else None

        # sum, mean, min, max: None values are ignored.
        present = [v for v in vals if v is not None]
        if not present:
            return None
        if func == "sum":
            return sum(present)
        if func == "mean":
            return sum(present) / len(present)
        if func == "min":
            return min(present)
        if func == "max":
            return max(present)
        # Unreachable after the up-front validation; kept as a safeguard.
        raise ValueError(f"Funcion de agregacion no soportada: {func}")

    result = []
    for group_key, columns in buckets.items():
        record: dict = dict(zip(group_by, group_key))
        for col, func in aggs.items():
            record[col] = _aggregate(columns[col], func)
        result.append(record)

    return result
|