4300f1242d
Añade un conjunto amplio de funciones al paquete python/functions/metabase: - Nuevos modulos: collections.py, documents.py, maintenance.py, permissions.py, validation.py (+ test). - Ampliacion de cards.py, dashboards.py, client.py e __init__.py para exponer las nuevas operaciones. - Funciones de documentos (create/get/update/delete/archive/copy/move + comentarios), grupos y memberships, permission/collection graphs, copy/move de cards y dashboards, validacion de MBQL/SQL y payloads, actualizacion segura de dashboards y fix_null_ratio. - .md por funcion con frontmatter para que fn index los registre. - Actualiza pyproject.toml y uv.lock con las dependencias resultantes. Impacto: ampliamente mas cobertura de la API de Metabase desde el registry, reutilizable por apps y analisis. No toca Go ni frontend.
237 lines
8.6 KiB
Python
237 lines
8.6 KiB
Python
"""Validacion estatica de dataset_query MBQL antes de enviarlo a Metabase."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import collections
|
|
import re
|
|
from typing import Any
|
|
|
|
|
|
def metabase_mbql_validate(dataset_query: dict) -> list[str]:
    """Validate the structure of an MBQL dataset_query without doing any I/O.

    Catches the most common mistakes that make the Metabase API answer with
    400/500, so they can be fixed before the round-trip.

    Checks performed:
      1. Duplicate UUIDs: any ``lib/uuid`` value that appears more than once
         anywhere in the MBQL tree.
      2. Stage mixing: a stage that has both ``aggregation`` and
         ``expressions`` where an expression references an aggregation slot
         name (``sum``, ``sum_N``, ...). Such expressions belong in the
         following stage.
      3. Broken slot refs: an expression referencing ``sum`` / ``sum_N``
         (bare ``sum`` counts as index 0) must use an index smaller than the
         number of ``sum`` aggregations in the same stage or in the
         immediately preceding stage.
      4. Case structure: ``["case", meta, cases]`` nodes must carry ``cases``
         as a list of ``[cond, result]`` pairs.
      5. Name collision: two expressions with the same
         ``lib/expression-name`` in the same stage.

    Args:
        dataset_query: Dict holding the full MBQL dataset_query. It must have
            a ``stages`` key containing a list of stage dicts; otherwise a
            single structure error is returned.

    Returns:
        List of human-readable error strings. Empty when every check passes.

    Example:
        >>> errors = metabase_mbql_validate(card["dataset_query"])
        >>> if errors:
        ...     for e in errors:
        ...         print(e)
        ... else:
        ...     print("Query valida")
    """
    errors: list[str] = []

    # A non-dict payload cannot carry stages; report it as a structure error
    # instead of failing with AttributeError on ``.get``.
    stages = dataset_query.get("stages") if isinstance(dataset_query, dict) else None
    if not isinstance(stages, list):
        errors.append("dataset_query.stages ausente o no es lista")
        return errors

    # ---- Check 1: duplicate UUIDs -------------------------------------------
    uuid_locations: list[tuple[str, str]] = []
    _collect_uuids(dataset_query, root="", out=uuid_locations)
    paths_by_uuid: dict[str, list[str]] = collections.defaultdict(list)
    for uid, path in uuid_locations:
        paths_by_uuid[uid].append(path)
    for uid, paths in paths_by_uuid.items():
        if len(paths) > 1:
            errors.append(
                f"Duplicate lib/uuid '{uid}' aparece {len(paths)} veces: "
                + ", ".join(paths[:3])
                + ("..." if len(paths) > 3 else "")
            )

    # ---- Per-stage checks ---------------------------------------------------
    # Compiled once; matches ``sum`` or ``sum_<N>`` and captures N when present.
    sum_slot_re = re.compile(r"sum(?:_(\d+))?$", re.IGNORECASE)
    # Number of sum aggregations in the stage handled just before this one.
    prev_sum_count = 0

    for si, stage in enumerate(stages):
        if not isinstance(stage, dict):
            # A malformed stage exposes no aggregation slots to its successor.
            prev_sum_count = 0
            continue
        tag = f"stage[{si}]"

        expressions: list[Any] = stage.get("expressions") or []
        aggregations: list[Any] = stage.get("aggregation") or []

        # Slot refs are needed by checks 2 and 3; compute them once per expression.
        refs_per_expr = [_find_slot_refs(expr) for expr in expressions]

        # Check 5: expression name collision.
        seen_names: list[str] = []
        for expr in expressions:
            name = _expr_name(expr)
            if not name:
                continue
            if name in seen_names:
                errors.append(
                    f"{tag} tiene dos expressions con mismo "
                    f"lib/expression-name '{name}'"
                )
            else:
                seen_names.append(name)

        # Check 2: stage mixing.
        if aggregations and expressions:
            for expr, slot_refs in zip(expressions, refs_per_expr):
                if slot_refs:
                    ename = _expr_name(expr) or "?"
                    errors.append(
                        f"{tag} mezcla aggregations con expressions "
                        f"post-agg que referencian slot names "
                        f"({', '.join(repr(s) for s in slot_refs)}) "
                        f"en expression '{ename}'. "
                        f"Mover esas expressions a la stage siguiente."
                    )

        # Check 3: broken slot refs.
        # A slot may be produced by this stage or by the previous one, which is
        # the layout check 2 recommends. Counting only this stage would flag
        # every correctly placed post-aggregation expression.
        # NOTE(review): indices are treated as zero-based (sum, sum_1, ...);
        # confirm this matches the column names Metabase actually generates.
        sum_count = sum(1 for agg in aggregations if _agg_is_sum(agg))
        available_sums = max(sum_count, prev_sum_count)
        for expr, slot_refs in zip(expressions, refs_per_expr):
            for slot in slot_refs:
                m = sum_slot_re.match(slot)
                if not m:
                    continue
                idx = int(m.group(1)) if m.group(1) else 0
                if idx >= available_sums:
                    ename = _expr_name(expr) or "?"
                    errors.append(
                        f"{tag} expression '{ename}' referencia "
                        f"'{slot}' que no existe "
                        f"(solo hay {available_sums} sum(s) en aggregation)"
                    )

        # Check 4: case structure.
        for expr in expressions:
            _check_case_structure(expr, tag, errors)

        prev_sum_count = sum_count

    return errors
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers privados
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _collect_uuids(
    obj: Any,
    root: str,
    out: list[tuple[str, str]],
) -> None:
    """Walk ``obj`` depth-first and append ``(uuid, path)`` tuples to ``out``.

    Args:
        obj: Any nested structure; only dicts and lists are descended into.
        root: Path of ``obj`` inside the whole tree (``""`` for the top level).
            Dict children get ``root.key`` (or just ``key`` when ``root`` is
            empty) and list children get ``root[index]``.
        out: Accumulator mutated in place; one entry is added for every dict
            that contains a ``lib/uuid`` key, using the dict's own path.
    """
    if isinstance(obj, list):
        for index, element in enumerate(obj):
            _collect_uuids(element, f"{root}[{index}]", out)
        return

    if not isinstance(obj, dict):
        return

    if "lib/uuid" in obj:
        out.append((obj["lib/uuid"], root))
    for key, value in obj.items():
        child_path = f"{root}.{key}" if root else key
        _collect_uuids(value, child_path, out)
|
|
|
|
|
|
def _expr_name(expr: Any) -> str | None:
    """Return the ``lib/expression-name`` stored in an MBQL node's metadata.

    The metadata is expected as the second element of a list-shaped node.
    ``None`` is returned when ``expr`` is not a list, has fewer than two
    elements, its second element is not a dict, or the key is absent.
    """
    if not isinstance(expr, list) or len(expr) < 2:
        return None
    meta = expr[1]
    if not isinstance(meta, dict):
        return None
    return meta.get("lib/expression-name")
|
|
|
|
|
|
# Slot-name pattern: word characters, optionally ending in _N.
_SLOT_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*(?:_\d+)?$")

# Base slot names that correspond to known aggregation functions.
_AGG_SLOTS = {
    "sum",
    "avg",
    "count",
    "min",
    "max",
    "distinct",
    "cum-sum",
    "cum-count",
    "share",
    "stddev",
}
|
|
|
|
|
|
def _find_slot_refs(obj: Any) -> list[str]:
    """Return the slot names found in ``["field", meta, slot]`` style refs.

    Allocates a fresh list and lets ``_collect_slot_refs`` fill it while it
    walks ``obj``; the names are returned in the order they were appended.
    """
    found: list[str] = []
    _collect_slot_refs(obj, found)
    return found
|
|
|
|
|
|
def _collect_slot_refs(obj: Any, out: list[str]) -> None:
    """Append to ``out`` every aggregation slot name referenced inside ``obj``.

    A three-element list ``["field", <dict>, <str>]`` whose dict has no truthy
    ``base-type`` and whose string passes ``_is_slot_name`` is recorded and
    not descended into. Any other list is traversed element by element, and
    dicts are traversed through their values. Other types are ignored.
    """
    if isinstance(obj, dict):
        for value in obj.values():
            _collect_slot_refs(value, out)
        return

    if not isinstance(obj, list):
        return

    looks_like_field_ref = (
        len(obj) == 3
        and obj[0] == "field"
        and isinstance(obj[1], dict)
        and isinstance(obj[2], str)
    )
    # A field ref without base-type is treated as a slot ref.
    if looks_like_field_ref and not obj[1].get("base-type") and _is_slot_name(obj[2]):
        out.append(obj[2])
        return

    for child in obj:
        _collect_slot_refs(child, out)
|
|
|
|
|
|
def _is_slot_name(s: str) -> bool:
    """Return True when ``s`` looks like an aggregation slot name.

    A trailing ``_<digits>`` suffix is removed first; the remaining base name
    must be a member of ``_AGG_SLOTS``.
    """
    return re.sub(r"_\d+$", "", s) in _AGG_SLOTS
|
|
|
|
|
|
def _agg_is_sum(agg: Any) -> bool:
    """Return True when the aggregation node's operator is ``sum``.

    The operator is the first element of a list-shaped node; it is converted
    with ``str`` and compared case-insensitively. Non-list or empty input
    yields False.
    """
    if not isinstance(agg, list) or len(agg) == 0:
        return False
    operator = str(agg[0])
    return operator.lower() == "sum"
|
|
|
|
|
|
def _check_case_structure(expr: Any, tag: str, errors: list[str]) -> None:
    """Recursively validate every ``case`` node found inside an expression.

    A ``case`` node is expected to look like
    ``["case", meta, [[cond, result], ...]]``, optionally followed by further
    elements such as a default branch. Problems are appended to ``errors``;
    nothing is returned.

    Args:
        expr: Expression (sub)tree. Anything that is not a list is ignored.
        tag: Prefix identifying the stage, used at the start of each message.
        errors: Accumulator mutated in place with one string per problem.
    """
    if not isinstance(expr, list):
        return

    if not (expr and expr[0] == "case"):
        # Not a case node: keep searching in its children.
        for item in expr:
            _check_case_structure(item, tag, errors)
        return

    ename = _expr_name(expr) or "?"
    # Expected: ["case", meta, [[cond, result], ...]]
    if len(expr) < 3:
        errors.append(
            f"{tag} expression '{ename}': case con menos de 3 elementos"
        )
        return

    cases = expr[2]
    if not isinstance(cases, list):
        errors.append(
            f"{tag} expression '{ename}': tercer elemento de case "
            f"debe ser lista de pares, got {type(cases).__name__}"
        )
        return

    for i, pair in enumerate(cases):
        if not (isinstance(pair, list) and len(pair) == 2):
            errors.append(
                f"{tag} expression '{ename}': case[{i}] no es par "
                f"[cond, result], got {pair!r}"
            )

    # Recurse into every branch of every pair.
    for pair in cases:
        if isinstance(pair, list):
            for node in pair:
                _check_case_structure(node, tag, errors)

    # Anything after the pairs (e.g. a default/else branch) can itself hold
    # nested case nodes, so it must be validated as well.
    for trailing in expr[3:]:
        _check_case_structure(trailing, tag, errors)
|