837563c3ba
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
90 lines
2.9 KiB
Python
90 lines
2.9 KiB
Python
"""Pivot table sin pandas para datos tabulares list[dict]."""
|
|
|
|
from collections import defaultdict
|
|
|
|
|
|
def pivot(
    rows: list[dict],
    index: str,
    columns: str,
    values: str,
    agg: str = "sum",
) -> list[dict]:
    """Reshape long-format rows into a wide pivot table.

    Rows are grouped by the value found under ``index``; every distinct value
    found under ``columns`` becomes an output column, and the entries found
    under ``values`` for each (index, column) pair are reduced with ``agg``.

    Args:
        rows: Long-format records. Keys are read with ``dict.get``, so a row
            that lacks ``index`` or ``columns`` is grouped under ``None``.
        index: Key whose distinct values become the output rows.
        columns: Key whose distinct values become the output columns.
        values: Key holding the values to aggregate. Rows where it is missing
            or ``None`` contribute no value, but their index and column are
            still registered in the output.
        agg: Aggregation name: ``sum``, ``count``, ``mean``, ``min``, ``max``,
            ``first`` or ``last``.

    Returns:
        One dict per distinct index value, in order of first appearance. Each
        dict holds ``index`` plus one key per distinct column value, also in
        order of first appearance. Cells with no values are filled with ``0``
        when every collected value is an ``int``/``float`` (or when nothing
        was collected at all) and with ``None`` otherwise. The fill is chosen
        from the input values, not from ``agg``, so ``count`` over non-numeric
        values still fills empty cells with ``None``.

    Raises:
        ValueError: If ``agg`` is not a supported aggregation name. This is
            checked before looking at ``rows``, so it is raised even for
            empty input.
    """
    aggregators = {
        "sum": sum,
        "count": len,
        "mean": lambda cell: sum(cell) / len(cell),
        "min": min,
        "max": max,
        "first": lambda cell: cell[0],
        "last": lambda cell: cell[-1],
    }
    # Fail fast: an unknown name must not depend on whether any cell has data.
    # The isinstance guard keeps unhashable arguments on the ValueError path.
    aggregate = aggregators.get(agg) if isinstance(agg, str) else None
    if aggregate is None:
        raise ValueError(f"Funcion de agregacion no soportada: {agg}")

    # Plain dicts double as insertion-ordered sets of the labels seen so far.
    index_order: dict = {}
    column_order: dict = {}
    # cells[(index_value, column_value)] -> values collected for that cell.
    cells: defaultdict = defaultdict(list)
    all_numeric = True

    for row in rows:
        idx = row.get(index)
        col = row.get(columns)
        index_order.setdefault(idx, None)
        column_order.setdefault(col, None)

        val = row.get(values)
        if val is None:
            # Labels are registered above; a None value adds nothing to the cell.
            continue
        cells[idx, col].append(val)
        if all_numeric and not isinstance(val, (int, float)):
            all_numeric = False

    fill = 0 if all_numeric else None

    result = []
    for idx in index_order:
        record: dict = {index: idx}
        for col in column_order:
            # .get avoids inserting empty lists for cells that never had data.
            cell = cells.get((idx, col))
            record[col] = aggregate(cell) if cell else fill
        result.append(record)

    return result
|