"""pivot_table_duckdb — pivot table (index x columns -> agg(value)) con push-down SQL. Funcion impura: lee de disco a traves de DuckDB reusando la primitiva read-only del grupo `duckdb` (`duckdb_query_readonly`). Pertenece al grupo de capacidad `eda` (exploratory data analysis). A diferencia de la funcion pura `pivot` (que pivota un `list[dict]` ya cargado en memoria), esta version empuja la agregacion al motor de DuckDB (push-down): el GROUP BY lo resuelve DuckDB y solo se traen los valores agregados, nunca las filas crudas. Esto la hace apta para tablas grandes. Ademas reduce el resultado a las `top_rows` filas y `top_cols` columnas con mas observaciones, de modo que la pivot quepa entera en un PDF movil / slide PPTX sin cortarse. Marca `truncated_rows`/`truncated_cols` cuando hubo recorte. Estilo dict-no-throw del grupo duckdb: nunca lanza; captura cualquier error y devuelve {status:'error', error:str}. """ from collections import defaultdict from infra import duckdb_query_readonly # Funciones de agregacion permitidas y su nombre en SQL DuckDB. # mean -> avg; el resto mapea directo. count se trata aparte (COUNT(*), sin value). _AGG_SQL = { "mean": "avg", "sum": "sum", "count": "count", "min": "min", "max": "max", "median": "median", } def _quote_ident(ident: str) -> str: """Cita un identificador SQL con dobles comillas, escapando `"` -> `""`. DuckDB no admite parametros posicionales para nombres de tabla/columna, asi que hay que interpolarlos. El quoting con `"` y el doblado de comillas internas evita que un nombre rompa la sentencia (mismo patron que correlation_matrix_duckdb). """ return '"' + str(ident).replace('"', '""') + '"' def pivot_table_duckdb( db_path: str, table: str, index: str, columns: str, value: str, agg: str = "mean", top_rows: int = 10, top_cols: int = 8, ) -> dict: """Pivot table push-down en DuckDB, recortada a top_rows x top_cols. Construye una pivot (filas = valores de `index`, columnas = valores de `columns`, celda = `agg(value)`) agregando en el motor de DuckDB, y la reduce a las filas y columnas con mas observaciones para que quepa en un PDF / slide. Args: db_path: ruta al archivo DuckDB. Debe existir (read_only NO crea la base). table: nombre de la tabla a pivotar. index: columna cuyos valores forman las filas de la pivot. columns: columna cuyos valores forman las columnas de la pivot. value: columna numerica a agregar. Ignorada cuando agg="count". agg: funcion de agregacion. Una de: "mean", "sum", "count", "min", "max", "median". mean se traduce a avg(); count a COUNT(*). top_rows: numero maximo de filas a conservar, elegidas por mayor numero de observaciones (suma de COUNT(*) por valor de index). Default 10. top_cols: numero maximo de columnas a conservar, elegidas por mayor numero de observaciones (suma de COUNT(*) por valor de columns). Default 8. Returns: dict. En exito: {status:'ok', index, columns, value, agg, row_labels:[...], # valores de index, en orden de freq desc col_labels:[...], # valores de columns, en orden de freq desc matrix:[[...], ...], # len == len(row_labels); cada fila # len == len(col_labels); celda = agg o None truncated_rows:bool, truncated_cols:bool, note:str} En error (sin lanzar): {status:'error', error:str}. """ try: if not isinstance(agg, str) or agg not in _AGG_SQL: return { "status": "error", "error": "invalid agg " + repr(agg) + "; allowed: " + ", ".join(sorted(_AGG_SQL)), } # Paso 1 (push-down): agregar (index, columns) -> agg(value) + COUNT(*). if agg == "count": agg_expr = "COUNT(*)" else: agg_expr = f"{_AGG_SQL[agg]}({_quote_ident(value)})" sql = ( f"SELECT {_quote_ident(index)} AS r, " f"{_quote_ident(columns)} AS c, " f"{agg_expr} AS v, " f"COUNT(*) AS n " f"FROM {_quote_ident(table)} " f"GROUP BY {_quote_ident(index)}, {_quote_ident(columns)}" ) # max_rows alto: queremos todos los grupos (index x columns) para elegir el # top con criterio global. sandbox=False igual que correlation_matrix_duckdb, # porque db_path es una ruta interna de confianza. result = duckdb_query_readonly( db_path, sql, max_rows=1_000_000, sandbox=False ) if result.get("status") != "ok": return { "status": "error", "error": "pivot query failed: " + str(result.get("error", "unknown")), } # Paso 2 (en python): contar observaciones por fila y por columna, y guardar # el valor agregado de cada celda (r, c). row_obs: dict = defaultdict(int) col_obs: dict = defaultdict(int) cell: dict = {} for row in result.get("rows", []): r = row.get("r") c = row.get("c") n = row.get("n") or 0 row_obs[r] += n col_obs[c] += n cell[(r, c)] = row.get("v") def _top(obs: dict, limit: int): # Orden: mas observaciones primero; desempate estable por etiqueta. ranked = sorted(obs.items(), key=lambda kv: (-kv[1], str(kv[0]))) selected = [label for label, _ in ranked[:limit]] return selected, len(ranked) > limit row_labels, truncated_rows = _top(row_obs, top_rows) col_labels, truncated_cols = _top(col_obs, top_cols) # Paso 3: materializar la matriz; None donde la combinacion no existe. matrix = [ [cell.get((r, c)) for c in col_labels] for r in row_labels ] note = ( f"pivot {agg}({value}) reducida a {len(row_labels)}x{len(col_labels)} " "(top por observaciones) para caber en PDF/slide" ) if agg == "count": note = ( f"pivot count(*) reducida a {len(row_labels)}x{len(col_labels)} " "(top por observaciones) para caber en PDF/slide" ) return { "status": "ok", "index": index, "columns": columns, "value": value, "agg": agg, "row_labels": row_labels, "col_labels": col_labels, "matrix": matrix, "truncated_rows": truncated_rows, "truncated_cols": truncated_cols, "note": note, } except Exception as e: # noqa: BLE001 return {"status": "error", "error": str(e)}