9a28d08e38
Añade un conjunto amplio de funciones al paquete python/functions/metabase: - Nuevos modulos: collections.py, documents.py, maintenance.py, permissions.py, validation.py (+ test). - Ampliacion de cards.py, dashboards.py, client.py e __init__.py para exponer las nuevas operaciones. - Funciones de documentos (create/get/update/delete/archive/copy/move + comentarios), grupos y memberships, permission/collection graphs, copy/move de cards y dashboards, validacion de MBQL/SQL y payloads, actualizacion segura de dashboards y fix_null_ratio. - .md por funcion con frontmatter para que fn index los registre. - Actualiza pyproject.toml y uv.lock con las dependencias resultantes. Impacto: ampliamente mas cobertura de la API de Metabase desde el registry, reutilizable por apps y analisis. No toca Go ni frontend.
420 lines
15 KiB
Python
420 lines
15 KiB
Python
"""Mantenimiento y reparacion de cards MBQL de Metabase."""
|
|
|
|
import copy
|
|
import time
|
|
import uuid
|
|
|
|
from .client import MetabaseClient
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers internos compartidos
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _new_uuid() -> str:
|
|
return str(uuid.uuid4())
|
|
|
|
|
|
def _field_name_of(node) -> tuple[str | None, str | None]:
|
|
"""Extrae (name, kind) de un node ['field'|'expression', meta, 'name']."""
|
|
if isinstance(node, list) and len(node) >= 3 and node[0] in ("field", "expression"):
|
|
nm = node[-1]
|
|
if isinstance(nm, str):
|
|
return nm, node[0]
|
|
return None, None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# metabase_fix_null_ratio
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _analyze_stage_null_ratio(stage: dict) -> tuple[dict, dict]:
    """Detect slots exposed to the SUM(a-b)/SUM(b) pattern in one MBQL stage.

    Returns:
        A ``(vulnerable, subtractions)`` pair:

        vulnerable: ``{slot_diff: (slot_a, slot_b, expr_name)}`` — for every
            ``sum`` over a named subtraction expression whose two operands are
            also summed individually in the same stage.
        subtractions: ``{expr_name: (op_a_name, op_b_name)}`` — every named
            four-element ``"-"`` expression whose operands are both
            string-named ``field`` references.
    """
    # Pass 1: collect named "field minus field" expressions.
    diff_operands = {}
    for expr in stage.get("expressions", []) or []:
        is_binary_minus = isinstance(expr, list) and len(expr) == 4 and expr[0] == "-"
        if not is_binary_minus:
            continue
        options = expr[1] if isinstance(expr[1], dict) else {}
        label = options.get("lib/expression-name")
        if not label:
            continue
        left_name, left_kind = _field_name_of(expr[2])
        right_name, right_kind = _field_name_of(expr[3])
        if left_name and right_name and left_kind == "field" and right_kind == "field":
            diff_operands[label] = (left_name, right_name)

    # Pass 2: derive a slot name per aggregation. The first use of a function
    # is the bare function name; the n-th use (n >= 2) is "<func>_<n>".
    occurrences: dict[str, int] = {}
    sums_over_fields: dict[str, str] = {}
    sums_over_expressions: dict[str, str] = {}
    for clause in stage.get("aggregation", []) or []:
        if not (isinstance(clause, list) and clause):
            continue
        func = clause[0]
        nth = occurrences.get(func, 0) + 1
        occurrences[func] = nth
        slot = func if nth == 1 else f"{func}_{nth}"
        if func != "sum" or len(clause) < 3:
            continue
        operand_name, operand_kind = _field_name_of(clause[2])
        if not operand_name:
            continue
        if operand_kind == "field":
            sums_over_fields[operand_name] = slot
        elif operand_kind == "expression":
            sums_over_expressions[operand_name] = slot

    # Pass 3: a sum over a subtraction is reported only when both of its
    # operands also have their own sum slot in this stage.
    at_risk = {}
    for label, diff_slot in sums_over_expressions.items():
        operands = diff_operands.get(label)
        if operands is None:
            continue
        first_slot = sums_over_fields.get(operands[0])
        second_slot = sums_over_fields.get(operands[1])
        if first_slot and second_slot:
            at_risk[diff_slot] = (first_slot, second_slot, label)

    return at_risk, diff_operands
|
|
|
|
|
|
def _rewrite_field_refs(node, slot_map: dict):
    """Recursively replace field references to slots listed in ``slot_map``.

    A list node ``['field', meta, slot]`` whose ``slot`` is a key of
    ``slot_map`` becomes
    ``['-', new_meta, ['field', ..., slot_a], ['field', ..., slot_b]]``.
    Other lists are rebuilt with their items rewritten; non-list values are
    returned unchanged.

    Args:
        node: Any MBQL fragment.
        slot_map: ``{slot: (slot_a, slot_b, expr_name)}``.
    """
    if not isinstance(node, list):
        return node

    ref_name = node[-1] if len(node) >= 3 and node[0] == "field" else None
    if isinstance(ref_name, str) and ref_name in slot_map:
        minuend_slot, subtrahend_slot, _expr_name = slot_map[ref_name]

        # Reuse the base type of the reference being replaced when it has one.
        options = node[1]
        base_type = "type/Decimal"
        if isinstance(options, dict):
            base_type = options.get("base-type", "type/Decimal")

        outer_options = {"lib/uuid": _new_uuid()}
        minuend = ["field", {"base-type": base_type, "lib/uuid": _new_uuid()}, minuend_slot]
        subtrahend = ["field", {"base-type": base_type, "lib/uuid": _new_uuid()}, subtrahend_slot]
        return ["-", outer_options, minuend, subtrahend]

    rewritten = []
    for child in node:
        rewritten.append(_rewrite_field_refs(child, slot_map))
    return rewritten
|
|
|
|
|
|
def _fix_card_query(dq: dict) -> list[str]:
    """Apply the null-ratio fix in place to ``dq`` and return a change log.

    For every stage with vulnerable slots, references to those slots are
    rewritten in that stage's expressions (the subtraction expressions found
    by the analysis are kept untouched) and in the clause lists of every
    later stage.

    Args:
        dq: Mapping that holds the ``"stages"`` list; it is mutated.

    Returns:
        One human-readable string per vulnerable slot; empty if nothing changed.
    """
    rewritable_keys = ("expressions", "aggregation", "breakout", "filters", "order-by", "fields")
    stages = dq.get("stages", [])
    log = []

    for index, stage in enumerate(stages):
        vulnerable, subtractions = _analyze_stage_null_ratio(stage)
        if not vulnerable:
            continue

        def _is_source_subtraction(expr) -> bool:
            # True for the pre-aggregation subtractions found by the analysis.
            return (
                isinstance(expr, list)
                and len(expr) == 4
                and expr[0] == "-"
                and isinstance(expr[1], dict)
                and expr[1].get("lib/expression-name") in subtractions
            )

        stage["expressions"] = [
            expr if _is_source_subtraction(expr) else _rewrite_field_refs(expr, vulnerable)
            for expr in stage.get("expressions", []) or []
        ]

        # Later stages may reference the slot by name; rewrite them too.
        for later in stages[index + 1:]:
            for key in rewritable_keys:
                if key not in later:
                    continue
                later[key] = [_rewrite_field_refs(item, vulnerable) for item in later[key]]

        log.extend(
            f"stage[{index}] {slot}=sum({ename!r}) -> ({sa} - {sb})"
            for slot, (sa, sb, ename) in vulnerable.items()
        )

    return log
|
|
|
|
|
|
def metabase_fix_null_ratio(
    client: MetabaseClient,
    *,
    dry_run: bool = True,
    card_ids: list[int] | None = None,
) -> dict:
    """Detect and repair the SUM(a-b)/SUM(b) pattern in Metabase MBQL cards.

    The vulnerable pattern occurs when an aggregation computes
    SUM(subtraction_expr), where subtraction_expr is a pre-aggregation
    subtraction of two fields. If any row holds a NULL,
    SUM(A-B) != SUM(A) - SUM(B). The fix rewrites post-aggregation references
    to the difference slot so they use (SUM(A) - SUM(B)) instead.

    Only non-archived cards whose ``dataset_query`` exposes a ``stages`` list
    (either ``dataset_query.stages`` or ``dataset_query.query.stages``) are
    processed. Every other card is skipped silently.

    Args:
        client: Authenticated Metabase client.
        dry_run: If True (default), scan and report without modifying
            anything. If False, apply the fix via PUT /api/card/:id.
        card_ids: IDs to process. None means every card returned by
            GET /api/card.

    Returns:
        dict with the fields:
            scanned (int): cards evaluated.
            affected (int): cards where the vulnerable pattern was found.
            fixed (int): cards actually updated (0 when dry_run=True).
            errors (list[dict]): ``{card_id, error}`` entries for failed PUTs.

    Example:
        >>> from metabase import MetabaseClient, metabase_fix_null_ratio
        >>> c = MetabaseClient("https://metabase.example.com", "mb_apikey")
        >>> report = metabase_fix_null_ratio(c, dry_run=True)
        >>> print(report)
        {'scanned': 312, 'affected': 4, 'fixed': 0, 'errors': []}
        >>> # To apply:
        >>> report = metabase_fix_null_ratio(c, dry_run=False)
    """
    all_cards = client.request("GET", "/api/card")

    if card_ids is not None:
        # Build the lookup set once; the loop below relies on this filter.
        wanted_ids = set(card_ids)
        all_cards = [c for c in all_cards if c.get("id") in wanted_ids]

    def _get_stages(dq: dict) -> list | None:
        # Stages may live at dataset_query.stages or dataset_query.query.stages.
        if isinstance(dq.get("stages"), list):
            return dq["stages"]
        inner = dq.get("query")
        # "query" can be present but null or non-dict; treat that as no stages.
        if isinstance(inner, dict) and isinstance(inner.get("stages"), list):
            return inner["stages"]
        return None

    affected_cards = []
    scanned = 0
    for card in all_cards:
        if card.get("archived", False):
            continue
        dq = card.get("dataset_query")
        if not isinstance(dq, dict):
            continue
        if _get_stages(dq) is None:
            continue
        scanned += 1

        # Work on a copy so the fetched card is left untouched in dry runs.
        dq_copy = copy.deepcopy(dq)
        # Hand the fixer whichever object actually owns the "stages" list.
        target = dq_copy if isinstance(dq_copy.get("stages"), list) else dq_copy["query"]
        changes = _fix_card_query(target)
        if changes:
            affected_cards.append((card, dq_copy, changes))

    fixed = 0
    errors: list[dict] = []

    if not dry_run:
        for card, dq_fixed, _changes in affected_cards:
            try:
                client.request("PUT", f"/api/card/{card['id']}", json={"dataset_query": dq_fixed})
                fixed += 1
            except Exception as exc:
                # Best effort: record the failure and keep going with the rest.
                errors.append({"card_id": card["id"], "error": str(exc)[:200]})
            # Short pause after each PUT (presumably to throttle requests).
            time.sleep(0.05)

    return {
        "scanned": scanned,
        "affected": len(affected_cards),
        "fixed": fixed,
        "errors": errors,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# metabase_pair_n_n1_columns
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _slot_for_sum_field(stage: dict, target_field_name: str) -> str | None:
    """Return the slot name (e.g. ``'sum'``, ``'sum_4'``) of sum(target_field).

    Slot names are derived by counting aggregations per function in order:
    the first use is the bare function name, the n-th use is ``"<func>_<n>"``.

    Returns:
        The slot of the first ``sum`` whose operand is a ``field`` reference
        named ``target_field_name``, or None when the stage has none.
    """
    seen: dict[str, int] = {}
    for clause in stage.get("aggregation", []) or []:
        if not (isinstance(clause, list) and clause):
            continue
        func = clause[0]
        nth = seen.get(func, 0) + 1
        seen[func] = nth
        slot = func if nth == 1 else f"{func}_{nth}"
        if func != "sum" or len(clause) < 3:
            continue
        operand_name, operand_kind = _field_name_of(clause[2])
        if operand_kind == "field" and operand_name == target_field_name:
            return slot
    return None
|
|
|
|
|
|
def _find_paired_slots(dq: dict, base_name: str) -> tuple[str | None, str | None]:
    """Find ``(slot_base, slot_n1)`` for sum(base_name) and sum(base_name_1).

    Stages are read from ``dq["stages"]``; when that is not a list, the
    lookup falls back to ``dq["query"]["stages"]``. Stages that are not
    dicts or have no aggregation are skipped.

    Args:
        dq: The card's ``dataset_query`` mapping.
        base_name: Field name without the ``_1`` suffix.

    Returns:
        The slot pair from the first stage that contains both sums, or
        ``(None, None)`` when no stage does.
    """
    stages = dq.get("stages")
    if not isinstance(stages, list):
        nested = dq.get("query")
        if isinstance(nested, dict) and isinstance(nested.get("stages"), list):
            stages = nested["stages"]

    for stage in stages or []:
        # Guard against malformed entries instead of failing on .get().
        if not isinstance(stage, dict) or not stage.get("aggregation"):
            continue
        slot_n = _slot_for_sum_field(stage, base_name)
        slot_n1 = _slot_for_sum_field(stage, f"{base_name}_1")
        if slot_n and slot_n1:
            return slot_n, slot_n1
    return None, None
|
|
|
|
|
|
def _reorder_table_columns(
|
|
cols: list[dict],
|
|
slot_n: str,
|
|
slot_n1: str,
|
|
) -> tuple[list[dict], bool, str]:
|
|
"""Habilita slot_n1 y lo reubica inmediatamente despues de slot_n.
|
|
|
|
Returns:
|
|
(new_cols, changed, reason)
|
|
"""
|
|
cols = [dict(c) for c in cols]
|
|
idx_n = next((i for i, c in enumerate(cols) if c.get("name") == slot_n), -1)
|
|
if idx_n < 0:
|
|
return cols, False, "slot_n no presente en table.columns"
|
|
|
|
idx_n1 = next((i for i, c in enumerate(cols) if c.get("name") == slot_n1), -1)
|
|
|
|
# Ya en posicion correcta y habilitada: no hay cambio
|
|
if idx_n1 == idx_n + 1 and cols[idx_n1].get("enabled") is True:
|
|
return cols, False, "ya en la posicion correcta y habilitado"
|
|
|
|
if idx_n1 < 0:
|
|
entry: dict = {"name": slot_n1, "enabled": True}
|
|
else:
|
|
entry = cols.pop(idx_n1)
|
|
entry["enabled"] = True
|
|
if idx_n1 < idx_n:
|
|
idx_n -= 1
|
|
|
|
insert_at = idx_n + 1
|
|
cols.insert(insert_at, entry)
|
|
return cols, True, "reubicado y habilitado"
|
|
|
|
|
|
def metabase_pair_n_n1_columns(
    client: MetabaseClient,
    *,
    dry_run: bool = True,
    card_ids: list[int] | None = None,
    base_field: str = "Valor_vendido",
) -> dict:
    """Enable the ``_1`` column and place it next to its pair in table/pivot cards.

    For cards with display 'table' or 'pivot' whose query aggregates both
    SUM(base_field) and SUM(base_field_1), the column for the ``_1`` sum is
    looked up in ``visualization_settings["table.columns"]``, enabled
    (``enabled=True``) and moved to sit immediately after the base column so
    the two can be compared visually.

    Only non-archived cards with display 'table' or 'pivot' that contain both
    slots are considered, and only those with a ``table.columns`` list can be
    changed.

    Args:
        client: Authenticated Metabase client.
        dry_run: If True (default), scan and report without modifying
            anything. If False, apply the change via PUT /api/card/:id.
        card_ids: IDs to process. None means every card returned by
            GET /api/card.
        base_field: MBQL base field name (without the ``_1`` suffix).
            Defaults to 'Valor_vendido'. The function looks for
            sum(base_field) and sum(base_field_1) among the aggregations.

    Returns:
        dict with the fields:
            scanned (int): table/pivot cards where both sums were found.
            affected (int): scanned cards whose columns need to change.
            fixed (int): cards actually updated (0 when dry_run=True).
            skipped (int): scanned cards already correct or lacking a
                ``table.columns`` list.
            errors (list[dict]): ``{card_id, error}`` entries for failed PUTs.

    Example:
        >>> from metabase import MetabaseClient, metabase_pair_n_n1_columns
        >>> c = MetabaseClient("https://metabase.example.com", "mb_apikey")
        >>> report = metabase_pair_n_n1_columns(c, dry_run=True)
        >>> print(report)
        {'scanned': 45, 'affected': 3, 'fixed': 0, 'skipped': 42, 'errors': []}
        >>> # With a custom field:
        >>> report = metabase_pair_n_n1_columns(c, dry_run=False, base_field="Importe")
    """
    all_cards = client.request("GET", "/api/card")

    tabular_displays = {"table", "pivot"}
    # Build the id lookup once instead of once per card.
    wanted_ids = set(card_ids) if card_ids is not None else None

    scanned = 0
    skipped = 0
    to_update: list[tuple[dict, list[dict]]] = []

    for card in all_cards:
        if wanted_ids is not None and card.get("id") not in wanted_ids:
            continue
        if card.get("archived", False):
            continue
        if card.get("display") not in tabular_displays:
            continue
        dq = card.get("dataset_query")
        if not isinstance(dq, dict):
            continue

        slot_n, slot_n1 = _find_paired_slots(dq, base_field)
        if not (slot_n and slot_n1):
            continue

        scanned += 1
        vs = card.get("visualization_settings") or {}
        cols = vs.get("table.columns")
        if not isinstance(cols, list):
            skipped += 1
            continue

        new_cols, changed, _reason = _reorder_table_columns(cols, slot_n, slot_n1)
        if not changed:
            skipped += 1
            continue

        to_update.append((card, new_cols))

    fixed = 0
    errors: list[dict] = []

    if not dry_run:
        for card, new_cols in to_update:
            # Copy the settings so the fetched card object is not mutated.
            new_vs = copy.deepcopy(card.get("visualization_settings") or {})
            new_vs["table.columns"] = new_cols
            try:
                client.request(
                    "PUT",
                    f"/api/card/{card['id']}",
                    json={"visualization_settings": new_vs},
                )
                fixed += 1
            except Exception as exc:
                # Best effort: record the failure and keep going with the rest.
                errors.append({"card_id": card["id"], "error": str(exc)[:200]})
            # Short pause after each PUT (presumably to throttle requests).
            time.sleep(0.05)

    return {
        "scanned": scanned,
        "affected": len(to_update),
        "fixed": fixed,
        "skipped": skipped,
        "errors": errors,
    }
|