"""groupby_stats_duckdb — agregaciones GROUP BY con push-down SQL en DuckDB. Funcion impura: lee de disco a traves de DuckDB (via la primitiva read-only `duckdb_query_readonly` del grupo `duckdb`). Pertenece al grupo de capacidad `eda`. Ejecuta un `GROUP BY ` en el motor de DuckDB (split-apply-combine con push-down) calculando, para cada columna numerica de `measures`, las agregaciones pedidas (mean/median/std/min/max). Solo trae al cliente una fila por grupo, nunca las filas crudas: apto para tablas grandes. Es el nucleo de un capitulo de agregacion/OLAP de un EDA. Estilo dict-no-throw del grupo duckdb: nunca lanza; captura cualquier error y devuelve {status:'error', error:str}. """ from infra import duckdb_query_readonly # Mapeo agg -> funcion agregada SQL de DuckDB. `count` se trata aparte: es # COUNT(*) (tamanio del grupo), independiente de las measures. _AGG_SQL = { "mean": "avg", "median": "median", "std": "stddev_samp", "min": "min", "max": "max", } # Aggs por defecto cuando aggs=None. count primero (tamanio del grupo) + las # cinco estadisticas por measure. _DEFAULT_AGGS = ["count", "mean", "median", "std", "min", "max"] def _quote_ident(ident: str) -> str: """Cita un identificador SQL con dobles comillas, escapando las internas. Soporta nombres con espacios o caracteres especiales y evita inyeccion: dentro de un identificador entrecomillado el unico caracter peligroso es la propia comilla doble, que se duplica ("") segun el estandar SQL. DuckDB no admite parametros posicionales para nombres de tabla/columna, asi que esta es la via segura de interpolarlos. """ return '"' + str(ident).replace('"', '""') + '"' def groupby_stats_duckdb( db_path: str, table: str, group_by: str, measures: list, aggs: list = None, top_n: int = 15, ) -> dict: """GROUP BY con agregaciones por measure, todo push-down en DuckDB. Args: db_path: ruta al archivo DuckDB. Debe existir; el modo read_only NO crea la base. Un path inexistente devuelve {status:'error', ...} sin lanzar. table: nombre de la tabla. Se interpola citado con dobles comillas (soporta nombres con espacios). group_by: columna por la que agrupar. Se interpola citada. measures: lista de columnas numericas a agregar. Lista vacia es valida: cada grupo trae solo su tamanio `n` y `stats` vacio. aggs: lista de agregaciones a calcular. None (default) = ["count", "mean", "median", "std", "min", "max"]. Valores validos: count (tamanio del grupo, va a `n`), mean, median, std, min, max (estas cinco se calculan por cada measure). Un agg desconocido devuelve error. top_n: numero maximo de grupos a devolver, ordenados por tamanio de grupo descendente (default 15). Se pide top_n+1 internamente para detectar si habia mas grupos y marcar `truncated`. Returns: dict. En exito: {status:'ok', group_by:str, measures:[...], aggs:[...], # las efectivas (incluye count si se pidio) n_groups:int, # nº de grupos devueltos (<= top_n) truncated:bool, # True si habia mas de top_n grupos groups:[{key:, n:int, stats:{:{mean,median,std,min,max}}}, ...], note:str} Las estadisticas son float o None (p.ej. stddev_samp de un grupo de una sola fila -> NULL -> None). En error (sin lanzar): {status:'error', error:str}. """ try: # 1. Validar entradas. if not isinstance(table, str) or table == "": return {"status": "error", "error": "table must be a non-empty string"} if not isinstance(group_by, str) or group_by == "": return {"status": "error", "error": "group_by must be a non-empty string"} if measures is None: measures = [] if not isinstance(measures, list): return {"status": "error", "error": "measures must be a list"} for m in measures: if not isinstance(m, str) or m == "": return { "status": "error", "error": f"invalid measure identifier: {m!r}", } if aggs is None: aggs = list(_DEFAULT_AGGS) if not isinstance(aggs, list) or len(aggs) == 0: return { "status": "error", "error": "aggs must be a non-empty list or None", } for a in aggs: if a != "count" and a not in _AGG_SQL: return { "status": "error", "error": f"unknown agg {a!r}; valid: count, " + ", ".join(_AGG_SQL), } if not isinstance(top_n, int) or isinstance(top_n, bool) or top_n < 1: return {"status": "error", "error": "top_n must be a positive int"} # 2. Aggs por measure = todas menos count (count es el tamanio del grupo, # se mapea siempre a la columna `n`). measure_aggs = [a for a in aggs if a != "count"] # 3. Construir el SELECT. grp y n primero; luego un termino por measure x agg # con alias posicional (m{idx}_{agg}) para no chocar con nombres de columna # que lleven espacios o caracteres raros. select_terms = [f"{_quote_ident(group_by)} AS grp", "COUNT(*) AS n"] agg_index = [] # (measure_name, agg_name, alias) for mi, m in enumerate(measures): for a in measure_aggs: alias = f"m{mi}_{a}" fn = _AGG_SQL[a] select_terms.append(f"{fn}({_quote_ident(m)}) AS {alias}") agg_index.append((m, a, alias)) # Pedimos top_n+1 grupos para detectar truncado (habia mas que top_n). sql = ( f"SELECT {', '.join(select_terms)} " f"FROM {_quote_ident(table)} " f"GROUP BY {_quote_ident(group_by)} " f"ORDER BY n DESC " f"LIMIT {top_n + 1}" ) # 4. Ejecutar push-down. sandbox=True (default) basta: la tabla ya existe en # el .db, no necesitamos read_csv/read_blob ni acceso al filesystem. result = duckdb_query_readonly(db_path, sql, max_rows=top_n + 1) if result.get("status") != "ok": return { "status": "error", "error": "groupby query failed: " + str(result.get("error", "unknown")), } rows = result.get("rows", []) truncated = len(rows) > top_n if truncated: rows = rows[:top_n] # 5. Reconstruir la estructura por grupo. groups = [] for row in rows: stats = {m: {} for m in measures} for (m, a, alias) in agg_index: stats[m][a] = row.get(alias) groups.append( {"key": row.get("grp"), "n": row.get("n"), "stats": stats} ) return { "status": "ok", "group_by": group_by, "measures": list(measures), "aggs": list(aggs), "n_groups": len(groups), "truncated": truncated, "groups": groups, "note": f"GROUP BY {group_by}: top {len(groups)} grupos por tamanio sobre " f"{len(measures)} measure(s)", } except Exception as e: # noqa: BLE001 return {"status": "error", "error": str(e)}