96da9e3015
Cuatro funciones nuevas del grupo eda que nutren el capítulo AGREGACION: - select_groupby_keys (pure): elige categóricas agrupables + numéricas medida desde el TableProfile. - groupby_stats_duckdb (impure): GROUP BY push-down en DuckDB (count/mean/median/std/min/max por grupo). - pivot_table_duckdb (impure): pivot A×B push-down, limitado a top filas/cols para no cortar. - suggest_aggregations_llm (impure): el LLM elige las agregaciones interesantes con fallback determinista. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
185 lines
7.4 KiB
Python
185 lines
7.4 KiB
Python
"""groupby_stats_duckdb — agregaciones GROUP BY con push-down SQL en DuckDB.
|
|
|
|
Funcion impura: lee de disco a traves de DuckDB (via la primitiva read-only
|
|
`duckdb_query_readonly` del grupo `duckdb`). Pertenece al grupo de capacidad `eda`.
|
|
|
|
Ejecuta un `GROUP BY <group_by>` en el motor de DuckDB (split-apply-combine con
|
|
push-down) calculando, para cada columna numerica de `measures`, las agregaciones
|
|
pedidas (mean/median/std/min/max). Solo trae al cliente una fila por grupo, nunca
|
|
las filas crudas: apto para tablas grandes. Es el nucleo de un capitulo de
|
|
agregacion/OLAP de un EDA.
|
|
|
|
Estilo dict-no-throw del grupo duckdb: nunca lanza; captura cualquier error y
|
|
devuelve {status:'error', error:str}.
|
|
"""
|
|
|
|
from infra import duckdb_query_readonly
|
|
|
|
# Mapeo agg -> funcion agregada SQL de DuckDB. `count` se trata aparte: es
|
|
# COUNT(*) (tamanio del grupo), independiente de las measures.
|
|
_AGG_SQL = {
|
|
"mean": "avg",
|
|
"median": "median",
|
|
"std": "stddev_samp",
|
|
"min": "min",
|
|
"max": "max",
|
|
}
|
|
|
|
# Aggs por defecto cuando aggs=None. count primero (tamanio del grupo) + las
|
|
# cinco estadisticas por measure.
|
|
_DEFAULT_AGGS = ["count", "mean", "median", "std", "min", "max"]
|
|
|
|
|
|
def _quote_ident(ident: str) -> str:
|
|
"""Cita un identificador SQL con dobles comillas, escapando las internas.
|
|
|
|
Soporta nombres con espacios o caracteres especiales y evita inyeccion: dentro
|
|
de un identificador entrecomillado el unico caracter peligroso es la propia
|
|
comilla doble, que se duplica ("") segun el estandar SQL. DuckDB no admite
|
|
parametros posicionales para nombres de tabla/columna, asi que esta es la via
|
|
segura de interpolarlos.
|
|
"""
|
|
return '"' + str(ident).replace('"', '""') + '"'
|
|
|
|
|
|
def groupby_stats_duckdb(
|
|
db_path: str,
|
|
table: str,
|
|
group_by: str,
|
|
measures: list,
|
|
aggs: list = None,
|
|
top_n: int = 15,
|
|
) -> dict:
|
|
"""GROUP BY con agregaciones por measure, todo push-down en DuckDB.
|
|
|
|
Args:
|
|
db_path: ruta al archivo DuckDB. Debe existir; el modo read_only NO crea la
|
|
base. Un path inexistente devuelve {status:'error', ...} sin lanzar.
|
|
table: nombre de la tabla. Se interpola citado con dobles comillas (soporta
|
|
nombres con espacios).
|
|
group_by: columna por la que agrupar. Se interpola citada.
|
|
measures: lista de columnas numericas a agregar. Lista vacia es valida:
|
|
cada grupo trae solo su tamanio `n` y `stats` vacio.
|
|
aggs: lista de agregaciones a calcular. None (default) =
|
|
["count", "mean", "median", "std", "min", "max"]. Valores validos:
|
|
count (tamanio del grupo, va a `n`), mean, median, std, min, max
|
|
(estas cinco se calculan por cada measure). Un agg desconocido devuelve
|
|
error.
|
|
top_n: numero maximo de grupos a devolver, ordenados por tamanio de grupo
|
|
descendente (default 15). Se pide top_n+1 internamente para detectar si
|
|
habia mas grupos y marcar `truncated`.
|
|
|
|
Returns:
|
|
dict. En exito:
|
|
{status:'ok',
|
|
group_by:str,
|
|
measures:[...],
|
|
aggs:[...], # las efectivas (incluye count si se pidio)
|
|
n_groups:int, # nº de grupos devueltos (<= top_n)
|
|
truncated:bool, # True si habia mas de top_n grupos
|
|
groups:[{key:<valor grupo>, n:int,
|
|
stats:{<measure>:{mean,median,std,min,max}}}, ...],
|
|
note:str}
|
|
Las estadisticas son float o None (p.ej. stddev_samp de un grupo de una
|
|
sola fila -> NULL -> None). En error (sin lanzar): {status:'error', error:str}.
|
|
"""
|
|
try:
|
|
# 1. Validar entradas.
|
|
if not isinstance(table, str) or table == "":
|
|
return {"status": "error", "error": "table must be a non-empty string"}
|
|
if not isinstance(group_by, str) or group_by == "":
|
|
return {"status": "error", "error": "group_by must be a non-empty string"}
|
|
|
|
if measures is None:
|
|
measures = []
|
|
if not isinstance(measures, list):
|
|
return {"status": "error", "error": "measures must be a list"}
|
|
for m in measures:
|
|
if not isinstance(m, str) or m == "":
|
|
return {
|
|
"status": "error",
|
|
"error": f"invalid measure identifier: {m!r}",
|
|
}
|
|
|
|
if aggs is None:
|
|
aggs = list(_DEFAULT_AGGS)
|
|
if not isinstance(aggs, list) or len(aggs) == 0:
|
|
return {
|
|
"status": "error",
|
|
"error": "aggs must be a non-empty list or None",
|
|
}
|
|
for a in aggs:
|
|
if a != "count" and a not in _AGG_SQL:
|
|
return {
|
|
"status": "error",
|
|
"error": f"unknown agg {a!r}; valid: count, "
|
|
+ ", ".join(_AGG_SQL),
|
|
}
|
|
|
|
if not isinstance(top_n, int) or isinstance(top_n, bool) or top_n < 1:
|
|
return {"status": "error", "error": "top_n must be a positive int"}
|
|
|
|
# 2. Aggs por measure = todas menos count (count es el tamanio del grupo,
|
|
# se mapea siempre a la columna `n`).
|
|
measure_aggs = [a for a in aggs if a != "count"]
|
|
|
|
# 3. Construir el SELECT. grp y n primero; luego un termino por measure x agg
|
|
# con alias posicional (m{idx}_{agg}) para no chocar con nombres de columna
|
|
# que lleven espacios o caracteres raros.
|
|
select_terms = [f"{_quote_ident(group_by)} AS grp", "COUNT(*) AS n"]
|
|
agg_index = [] # (measure_name, agg_name, alias)
|
|
for mi, m in enumerate(measures):
|
|
for a in measure_aggs:
|
|
alias = f"m{mi}_{a}"
|
|
fn = _AGG_SQL[a]
|
|
select_terms.append(f"{fn}({_quote_ident(m)}) AS {alias}")
|
|
agg_index.append((m, a, alias))
|
|
|
|
# Pedimos top_n+1 grupos para detectar truncado (habia mas que top_n).
|
|
sql = (
|
|
f"SELECT {', '.join(select_terms)} "
|
|
f"FROM {_quote_ident(table)} "
|
|
f"GROUP BY {_quote_ident(group_by)} "
|
|
f"ORDER BY n DESC "
|
|
f"LIMIT {top_n + 1}"
|
|
)
|
|
|
|
# 4. Ejecutar push-down. sandbox=True (default) basta: la tabla ya existe en
|
|
# el .db, no necesitamos read_csv/read_blob ni acceso al filesystem.
|
|
result = duckdb_query_readonly(db_path, sql, max_rows=top_n + 1)
|
|
if result.get("status") != "ok":
|
|
return {
|
|
"status": "error",
|
|
"error": "groupby query failed: "
|
|
+ str(result.get("error", "unknown")),
|
|
}
|
|
|
|
rows = result.get("rows", [])
|
|
truncated = len(rows) > top_n
|
|
if truncated:
|
|
rows = rows[:top_n]
|
|
|
|
# 5. Reconstruir la estructura por grupo.
|
|
groups = []
|
|
for row in rows:
|
|
stats = {m: {} for m in measures}
|
|
for (m, a, alias) in agg_index:
|
|
stats[m][a] = row.get(alias)
|
|
groups.append(
|
|
{"key": row.get("grp"), "n": row.get("n"), "stats": stats}
|
|
)
|
|
|
|
return {
|
|
"status": "ok",
|
|
"group_by": group_by,
|
|
"measures": list(measures),
|
|
"aggs": list(aggs),
|
|
"n_groups": len(groups),
|
|
"truncated": truncated,
|
|
"groups": groups,
|
|
"note": f"GROUP BY {group_by}: top {len(groups)} grupos por tamanio sobre "
|
|
f"{len(measures)} measure(s)",
|
|
}
|
|
except Exception as e: # noqa: BLE001
|
|
return {"status": "error", "error": str(e)}
|