Files
fn_registry/python/functions/datascience/pivot.py
egutierrez 63a9cb5273 feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

90 lines
2.9 KiB
Python

"""Pivot table sin pandas para datos tabulares list[dict]."""
from collections import defaultdict
def pivot(
rows: list[dict],
index: str,
columns: str,
values: str,
agg: str = "sum",
) -> list[dict]:
"""Transforma datos del formato largo al formato ancho (pivot table).
Agrupa por `index`, expande los valores unicos de `columns` como nuevas
columnas y agrega la columna `values` con la funcion indicada.
Args:
rows: Lista de dicts con los datos en formato largo.
index: Nombre de la columna que actua como indice de filas.
columns: Nombre de la columna cuyos valores unicos se convierten en columnas.
values: Nombre de la columna cuyos valores se agregan.
agg: Funcion de agregacion: sum, count, mean, min, max, first, last.
Returns:
Lista de dicts con una fila por valor unico de index y una columna
por cada valor unico de columns. Valores numericos faltantes rellenados
con 0, valores no numericos con None.
"""
# Recopilar valores unicos de columns (orden de aparicion)
col_values: list = []
seen_cols: set = set()
index_order: list = []
seen_index: set = set()
for row in rows:
idx = row.get(index)
col = row.get(columns)
if idx not in seen_index:
seen_index.add(idx)
index_order.append(idx)
if col not in seen_cols:
seen_cols.add(col)
col_values.append(col)
# Acumular: groups[index_val][col_val] = lista de values
groups: dict[any, dict[any, list]] = defaultdict(lambda: defaultdict(list))
for row in rows:
idx = row.get(index)
col = row.get(columns)
val = row.get(values)
if val is not None:
groups[idx][col].append(val)
# Determinar si los valores son numericos (para relleno de 0)
sample_vals = [v for g in groups.values() for vs in g.values() for v in vs]
is_numeric = all(isinstance(v, (int, float)) for v in sample_vals) if sample_vals else True
def _aggregate(vals: list, func: str):
if not vals:
return 0 if is_numeric else None
if func == "sum":
return sum(vals)
if func == "count":
return len(vals)
if func == "mean":
return sum(vals) / len(vals)
if func == "min":
return min(vals)
if func == "max":
return max(vals)
if func == "first":
return vals[0]
if func == "last":
return vals[-1]
raise ValueError(f"Funcion de agregacion no soportada: {func}")
result = []
for idx in index_order:
record: dict = {index: idx}
for col in col_values:
vals = groups[idx][col]
if vals:
record[col] = _aggregate(vals, agg)
else:
record[col] = 0 if is_numeric else None
result.append(record)
return result