"""Static validation of an MBQL dataset_query before sending it to Metabase."""

from __future__ import annotations

import collections
import re
from typing import Any

# Matches a ``sum`` / ``sum_N`` slot name (case-insensitive), capturing N.
_SUM_SLOT_RE = re.compile(r"sum(?:_(\d+))?$", re.IGNORECASE)

# Trailing ``_N`` suffix of a slot name.
_SLOT_SUFFIX_RE = re.compile(r"_\d+$")

# Error reported when the top-level structure cannot be inspected at all.
_NO_STAGES_ERROR = "dataset_query.stages ausente o no es lista"


def metabase_mbql_validate(dataset_query: dict) -> list[str]:
    """Validate the structure of an MBQL dataset_query without doing any I/O.

    Detects common structural mistakes so they can be fixed before the
    round-trip to the Metabase API.

    Checks performed:

    1. Duplicate UUIDs: any ``lib/uuid`` value that appears more than once
       anywhere in the query tree.
    2. Stage mixing: a stage that has both ``aggregation`` and
       ``expressions`` where an expression references an aggregation slot
       name (``sum``, ``sum_N``, ...). Such expressions belong in the
       following stage.
    3. Broken slot refs: an expression referencing ``sum`` / ``sum_X`` must
       have X (0 for plain ``sum``) lower than the number of ``sum``
       aggregations in the previous stage or in the same stage.
    4. Case structure: ``["case", meta, cases, ...]`` nodes must have
       ``cases`` as a list of ``[cond, result]`` pairs. Elements after the
       cases list (e.g. a default branch) are searched for nested cases.
    5. Name collision: two expressions with the same
       ``lib/expression-name`` in the same stage.

    Args:
        dataset_query: The full dataset_query structure. Must contain a
            ``stages`` key holding a list of stage dicts; otherwise a single
            structure error is returned.

    Returns:
        A list of human-readable error strings (in Spanish, as emitted by
        this module). Empty when every check passes.

    Example:
        >>> metabase_mbql_validate({"stages": [{}]})
        []
        >>> metabase_mbql_validate({"query": {}})
        ['dataset_query.stages ausente o no es lista']
    """
    # A validator should report malformed input instead of raising on it.
    if not isinstance(dataset_query, dict):
        return [_NO_STAGES_ERROR]
    stages = dataset_query.get("stages")
    if not isinstance(stages, list):
        return [_NO_STAGES_ERROR]

    errors: list[str] = []

    # ---- Check 1: duplicate UUIDs ----------------------------------------
    uuid_locations: list[tuple[Any, str]] = []
    _collect_uuids(dataset_query, root="", out=uuid_locations)
    paths_by_uuid: dict[str, list[str]] = collections.defaultdict(list)
    for uid, path in uuid_locations:
        # Malformed input may carry an unhashable lib/uuid (list/dict);
        # key those by their repr so grouping cannot raise TypeError.
        key = uid if isinstance(uid, str) else repr(uid)
        paths_by_uuid[key].append(path)
    for uid, paths in paths_by_uuid.items():
        if len(paths) > 1:
            errors.append(
                f"Duplicate lib/uuid '{uid}' aparece {len(paths)} veces: "
                + ", ".join(paths[:3])
                + ("..." if len(paths) > 3 else "")
            )

    # ---- Per-stage checks ------------------------------------------------
    prev_sum_count = 0
    for si, stage in enumerate(stages):
        if not isinstance(stage, dict):
            # Nothing to inspect; it also contributes no slots downstream.
            prev_sum_count = 0
            continue
        prev_sum_count = _check_stage(
            stage, f"stage[{si}]", prev_sum_count, errors
        )

    return errors


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------


def _as_list(value: Any) -> list[Any]:
    """Return value if it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _check_stage(
    stage: dict,
    tag: str,
    prev_sum_count: int,
    errors: list[str],
) -> int:
    """Run checks 5, 2, 3 and 4 (in that order) on a single stage.

    Args:
        stage: The stage dict to inspect.
        tag: Label used as prefix in error messages, e.g. ``stage[1]``.
        prev_sum_count: Number of ``sum`` aggregations in the previous stage.
        errors: Output list; error strings are appended in place.

    Returns:
        The number of ``sum`` aggregations in this stage, to be passed as
        ``prev_sum_count`` when checking the next stage.
    """
    # Non-list values are treated as "absent" so malformed input does not
    # raise while iterating.
    expressions = _as_list(stage.get("expressions"))
    aggregations = _as_list(stage.get("aggregation"))
    sum_count = sum(1 for agg in aggregations if _agg_is_sum(agg))
    # Computed once; both check 2 and check 3 need it.
    slot_refs_by_expr = [_find_slot_refs(expr) for expr in expressions]

    # Check 5: expression name collision.
    # A list (not a set) so a malformed, unhashable name cannot raise.
    seen_names: list[Any] = []
    for expr in expressions:
        name = _expr_name(expr)
        if not name:
            continue
        if name in seen_names:
            errors.append(
                f"{tag} tiene dos expressions con mismo "
                f"lib/expression-name '{name}'"
            )
        else:
            seen_names.append(name)

    # Check 2: stage mixing.
    if aggregations:
        for expr, slot_refs in zip(expressions, slot_refs_by_expr):
            if slot_refs:
                ename = _expr_name(expr) or "?"
                errors.append(
                    f"{tag} mezcla aggregations con expressions "
                    f"post-agg que referencian slot names "
                    f"({', '.join(repr(s) for s in slot_refs)}) "
                    f"en expression '{ename}'. "
                    f"Mover esas expressions a la stage siguiente."
                )

    # Check 3: broken slot refs.
    # A sum slot may come from the previous stage (the correct layout for
    # post-aggregation expressions) or from this same stage, so the bound
    # is the larger of the two counts.
    # NOTE(review): this treats ``sum_N`` as a 0-based index as the module
    # documents; confirm against Metabase's actual column-name dedup scheme.
    available_sums = max(sum_count, prev_sum_count)
    for expr, slot_refs in zip(expressions, slot_refs_by_expr):
        for slot in slot_refs:
            m = _SUM_SLOT_RE.match(slot)
            if not m:
                continue
            idx = int(m.group(1)) if m.group(1) else 0
            if idx >= available_sums:
                ename = _expr_name(expr) or "?"
                errors.append(
                    f"{tag} expression '{ename}' referencia "
                    f"'{slot}' que no existe "
                    f"(solo hay {available_sums} sum(s) en aggregation)"
                )

    # Check 4: case structure.
    for expr in expressions:
        _check_case_structure(expr, tag, errors)

    return sum_count


def _collect_uuids(
    obj: Any,
    root: str,
    out: list[tuple[str, str]],
) -> None:
    """Walk obj recursively and append ``(uuid, path)`` tuples to out.

    ``path`` is a dotted/indexed location such as
    ``stages[0].aggregation[0][1]`` identifying the dict that holds the
    ``lib/uuid`` key.
    """
    if isinstance(obj, dict):
        if "lib/uuid" in obj:
            out.append((obj["lib/uuid"], root))
        for k, v in obj.items():
            _collect_uuids(v, f"{root}.{k}" if root else k, out)
    elif isinstance(obj, list):
        for i, item in enumerate(obj):
            _collect_uuids(item, f"{root}[{i}]", out)


def _expr_name(expr: Any) -> str | None:
    """Return ``lib/expression-name`` from the second element of an MBQL node.

    Returns None when expr is not a list whose second element is a dict, or
    when that dict has no such key.
    """
    if isinstance(expr, list) and len(expr) >= 2 and isinstance(expr[1], dict):
        return expr[1].get("lib/expression-name")
    return None


# Slot-name pattern: word characters, optionally ending in _N.
# Not referenced by the code in this module; kept for external users.
_SLOT_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*(?:_\d+)?$")

# Slot base names that correspond to known aggregation functions.
_AGG_SLOTS = {
    "sum", "avg", "count", "min", "max", "distinct",
    "cum-sum", "cum-count", "share", "stddev",
}


def _find_slot_refs(obj: Any) -> list[str]:
    """Return the slot names found in ``["field", meta, slot]`` refs in obj."""
    slots: list[str] = []
    _collect_slot_refs(obj, slots)
    return slots


def _collect_slot_refs(obj: Any, out: list[str]) -> None:
    """Recursively append slot names referenced inside obj to out.

    A slot ref is a 3-element ``["field", meta, name]`` list where ``meta``
    is a dict without a truthy ``base-type`` and ``name`` is a string that
    looks like an aggregation slot name.
    """
    if isinstance(obj, list):
        if (
            len(obj) == 3
            and obj[0] == "field"
            and isinstance(obj[1], dict)
            and isinstance(obj[2], str)
            and not obj[1].get("base-type")  # field without base-type = slot ref
            and _is_slot_name(obj[2])
        ):
            out.append(obj[2])
        else:
            for item in obj:
                _collect_slot_refs(item, out)
    elif isinstance(obj, dict):
        for v in obj.values():
            _collect_slot_refs(v, out)


def _is_slot_name(s: str) -> bool:
    """Return True if s looks like an aggregation slot name.

    That is: after stripping an optional trailing ``_N`` suffix, the
    remainder is one of the known aggregation function names.
    """
    return _SLOT_SUFFIX_RE.sub("", s) in _AGG_SLOTS


def _agg_is_sum(agg: Any) -> bool:
    """Return True if the aggregation node's head is ``sum`` (any case)."""
    if isinstance(agg, list) and len(agg) >= 1:
        return str(agg[0]).lower() == "sum"
    return False


def _check_case_structure(expr: Any, tag: str, errors: list[str]) -> None:
    """Recursively validate ``case`` nodes inside an expression.

    Expected shape: ``["case", meta, [[cond, result], ...], ...]``. Errors
    are appended to ``errors``; the name used in each message is the
    ``lib/expression-name`` of the case node itself, or ``?`` if absent.
    """
    if not isinstance(expr, list):
        return

    if not expr or expr[0] != "case":
        # Not a case node: keep searching its children.
        for item in expr:
            _check_case_structure(item, tag, errors)
        return

    ename = _expr_name(expr) or "?"
    if len(expr) < 3:
        errors.append(
            f"{tag} expression '{ename}': case con menos de 3 elementos"
        )
        return

    cases = expr[2]
    if not isinstance(cases, list):
        errors.append(
            f"{tag} expression '{ename}': tercer elemento de case "
            f"debe ser lista de pares, got {type(cases).__name__}"
        )
        return

    for i, pair in enumerate(cases):
        if not (isinstance(pair, list) and len(pair) == 2):
            errors.append(
                f"{tag} expression '{ename}': case[{i}] no es par "
                f"[cond, result], got {pair!r}"
            )

    # Recurse into every branch of every pair.
    for pair in cases:
        if isinstance(pair, list):
            for node in pair:
                _check_case_structure(node, tag, errors)

    # Anything after the cases list (e.g. a default branch) may itself
    # contain nested case nodes that need validating.
    for extra in expr[3:]:
        _check_case_structure(extra, tag, errors)