feat(metabase): auto-commit con 17 cambios

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-13 18:40:22 +02:00
parent aec5d82011
commit d110aa40f9
17 changed files with 1946 additions and 2 deletions
+12
View File
@@ -12,8 +12,14 @@ from .permissions import metabase_list_groups, metabase_get_group, metabase_crea
from .setup import metabase_setup
from .maintenance import metabase_fix_null_ratio, metabase_pair_n_n1_columns
from .metabase_mbql_validate import metabase_mbql_validate
from .metabase_mbql_from_source_card import metabase_mbql_from_source_card
from .metabase_update_dashboard_safe import metabase_update_dashboard_safe
from .metabase_copy_dashcard_mappings import metabase_copy_dashcard_mappings
from .metabase_dashboard_next_row import metabase_dashboard_next_row
from .smartscalar import metabase_smartscalar_kpi_sql, metabase_smartscalar_dimension_tag, metabase_smartscalar_kpi_payload
from .metabase_viz_column_format import metabase_viz_column_format
from .metabase_smartscalar_anothercolumn_viz import metabase_smartscalar_anothercolumn_viz
from .metabase_dashboard_append_row import metabase_dashboard_append_row
__all__ = [
"MetabaseClient",
@@ -38,6 +44,12 @@ __all__ = [
"metabase_fix_null_ratio",
"metabase_pair_n_n1_columns",
"metabase_mbql_validate",
"metabase_mbql_from_source_card",
"metabase_update_dashboard_safe",
"metabase_copy_dashcard_mappings",
"metabase_dashboard_next_row",
"metabase_smartscalar_kpi_sql", "metabase_smartscalar_dimension_tag", "metabase_smartscalar_kpi_payload",
"metabase_viz_column_format",
"metabase_smartscalar_anothercolumn_viz",
"metabase_dashboard_append_row",
]
@@ -0,0 +1,70 @@
---
name: metabase_copy_dashcard_mappings
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def metabase_copy_dashcard_mappings(client: MetabaseClient, *, dashboard_id: int, source_card_id: int, dest_card_id: int) -> list[dict]"
description: "Copia los parameter_mappings del primer dashcard con source_card_id al card destino (dest_card_id), devolviendo una lista nueva de mappings sin mutar el original. Util para replicar filtros de dashboard a cards nuevas sin copiar manualmente cada mapping."
tags: [metabase, dashboard, parameter-mappings, dashcard, copy, api, python]
uses_functions:
- metabase_get_dashboard_py_infra
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
params:
- name: client
desc: "MetabaseClient autenticado con sesion activa"
- name: dashboard_id
desc: "ID del dashboard que contiene los dashcards fuente y destino"
- name: source_card_id
desc: "card_id del dashcard del que se copian los parameter_mappings"
- name: dest_card_id
desc: "card_id que se asigna como card_id en cada mapping copiado"
output: "list[dict]: parameter_mappings adaptados al card destino, cada uno con {parameter_id, card_id: dest_card_id, target}. Lista vacia si el dashcard fuente no tiene mappings."
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/metabase/metabase_copy_dashcard_mappings.py"
---
## Ejemplo
```python
from metabase import MetabaseClient, metabase_copy_dashcard_mappings
client = MetabaseClient("https://metabase.example.com", "token...")
# Copiar los 18 filtros de la card 42 a la card nueva 99
mappings = metabase_copy_dashcard_mappings(
client,
dashboard_id=10,
source_card_id=42,
dest_card_id=99,
)
# Usar los mappings al añadir el nuevo dashcard
new_dashcard = {
"id": -1,
"card_id": 99,
"col": 0, "row": 20, "size_x": 6, "size_y": 4,
"parameter_mappings": mappings,
"visualization_settings": {},
}
```
## Notas
- Localiza el **primer** dashcard con `card_id == source_card_id`. Si la misma card
aparece varias veces en el dashboard (e.g. en distintas tabs), solo se usan los
mappings del primero que encuentre.
- Lanza `ValueError` si `source_card_id` no aparece en el dashboard — falla rápido
en vez de devolver lista vacía silenciosamente.
- No muta ni el dashboard ni los mappings originales: cada dict del resultado es
construido desde cero con las tres claves que acepta la API
(`parameter_id`, `card_id`, `target`).
- Requiere 1 request HTTP (GET dashboard). Para aplicar los mappings al dashboard
usar `metabase_update_dashboard_safe` con `dashcards_add`.
@@ -0,0 +1,75 @@
"""Copia parameter_mappings de un dashcard a otro dentro del mismo dashboard."""
from __future__ import annotations
from .client import MetabaseClient
from .dashboards import metabase_get_dashboard
def metabase_copy_dashcard_mappings(
client: MetabaseClient,
*,
dashboard_id: int,
source_card_id: int,
dest_card_id: int,
) -> list[dict]:
"""Copia los parameter_mappings del primer dashcard fuente al card destino.
Obtiene el dashboard, localiza el primer dashcard cuyo card_id coincide con
source_card_id y devuelve una lista nueva de parameter_mappings con card_id
sustituido por dest_card_id. No muta el dashboard ni los mappings originales.
Args:
client: Cliente autenticado con sesion activa.
dashboard_id: ID del dashboard que contiene ambas cards.
source_card_id: card_id del dashcard del que se copian los mappings.
dest_card_id: card_id que se asigna en los mappings copiados.
Returns:
Lista de dicts con los parameter_mappings adaptados al card destino.
Cada elemento tiene: parameter_id, card_id (dest_card_id), target.
Retorna lista vacia si el dashcard fuente no tiene mappings.
Raises:
ValueError: Si source_card_id no aparece en ninguna dashcard del dashboard.
Example:
>>> mappings = metabase_copy_dashcard_mappings(
... client,
... dashboard_id=10,
... source_card_id=42,
... dest_card_id=99,
... )
>>> # Usar los mappings al añadir el nuevo dashcard al dashboard
>>> new_dashcard = {
... "id": -1,
... "card_id": 99,
... "col": 0, "row": 20, "size_x": 6, "size_y": 4,
... "parameter_mappings": mappings,
... "visualization_settings": {},
... }
"""
dashboard = metabase_get_dashboard(client, dashboard_id)
dashcards: list[dict] = dashboard.get("dashcards") or []
source_dc: dict | None = None
for dc in dashcards:
if dc.get("card_id") == source_card_id:
source_dc = dc
break
if source_dc is None:
raise ValueError(
f"card {source_card_id} not in dashboard {dashboard_id}"
)
source_mappings: list[dict] = source_dc.get("parameter_mappings") or []
return [
{
"parameter_id": m["parameter_id"],
"card_id": dest_card_id,
"target": m["target"],
}
for m in source_mappings
]
@@ -0,0 +1,103 @@
---
name: metabase_dashboard_append_row
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def metabase_dashboard_append_row(client: MetabaseClient, *, dashboard_id: int, tab_id: int, card_ids: list[int], height: int = 4, donor_card_id: int = 0, grid_width: int = 24) -> dict"
description: "Añade N cards como fila horizontal al final de una tab de dashboard. Calcula la primera fila libre con metabase_dashboard_next_row, distribuye las cards con ancho uniforme (grid_width // N), copia parameter_mappings de un dashcard donante si se indica y llama a metabase_update_dashboard_safe. Util para añadir filas de KPIs con los filtros del dashboard ya conectados."
tags: [metabase, dashboard, dashcard, layout, row, append, parameter-mappings, api, python]
uses_functions:
- metabase_dashboard_next_row_py_infra
- metabase_copy_dashcard_mappings_py_infra
- metabase_update_dashboard_safe_py_infra
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
params:
- name: client
desc: "MetabaseClient autenticado con sesion activa"
- name: dashboard_id
desc: "ID del dashboard donde se añaden las cards"
- name: tab_id
desc: "ID de la tab donde se insertan las cards. Usar 0 para dashboards sin tabs o tab raiz"
- name: card_ids
desc: "Lista de IDs de cards a colocar como fila, de izquierda a derecha. No puede estar vacia"
- name: height
desc: "Altura en filas del grid para cada card. Default 4"
- name: donor_card_id
desc: "Si distinto de 0, copia los parameter_mappings de esa card a cada nueva card. Util para replicar los filtros de dashboard ya configurados. Default 0 (sin mappings)"
- name: grid_width
desc: "Anchura total del grid de Metabase. Default 24 (estandar). Cada card recibe grid_width // len(card_ids) columnas"
output: "Dict con el resumen de metabase_update_dashboard_safe: {'added': [negative_ids], 'updated': int, 'removed': int, 'response': dict_respuesta_PUT}"
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/metabase/metabase_dashboard_append_row.py"
---
## Por que existe
Añadir una fila de KPIs a un dashboard de Metabase que ya tiene 18 filtros
requiere: (1) calcular la fila libre, (2) construir cada dashcard con su
``parameter_mappings`` copiado, y (3) enviar el PUT sin activar los gotchas
conocidos (413, 500 FK). Esta funcion compone las tres operaciones atomicas
del registry en un solo paso.
## Ejemplo
```python
from metabase import MetabaseClient, metabase_dashboard_append_row
client = MetabaseClient("https://metabase.example.com", "token...")
# Añadir 4 KPIs al final de la tab 7, copiando filtros de card 42
result = metabase_dashboard_append_row(
client,
dashboard_id=10,
tab_id=7,
card_ids=[101, 102, 103, 104],
height=4,
donor_card_id=42,
)
print(result["added"]) # [-1, -2, -3, -4] IDs temporales asignados
# Añadir 2 cards sin filtros en dashboard sin tabs
result = metabase_dashboard_append_row(
client,
dashboard_id=15,
tab_id=0,
card_ids=[200, 201],
height=6,
)
# Añadir 1 card de ancho completo (24 columnas)
result = metabase_dashboard_append_row(
client,
dashboard_id=10,
tab_id=7,
card_ids=[300],
height=8,
donor_card_id=42,
)
```
## Notas
- Realiza entre 2 y ``1 + 2*len(card_ids)`` requests HTTP: 1 GET para
``next_row`` + (si ``donor_card_id != 0``) 1 GET por card para los
mappings (que comparten el GET del siguiente ``update_safe``) + 1 GET + 1
PUT para ``update_dashboard_safe``. En practica ``metabase_get_dashboard``
se llama 2-3 veces segun si hay donor.
- Si ``grid_width // len(card_ids)`` produce un cociente con resto, las
columnas sobrantes (al lado derecho) quedan vacias en el grid. Para
ajuste preciso, construir los dashcards manualmente y usar
``metabase_update_dashboard_safe`` directamente.
- ``donor_card_id`` debe ser el ``card_id`` de una card que ya existe en el
dashboard (no el ID del dashcard). Lanza ``ValueError`` si no se
encuentra.
- Si ``tab_id != 0``, ``metabase_update_dashboard_safe`` conserva las tabs
del dashboard (evita 500 FK violation).
@@ -0,0 +1,124 @@
"""Añade N cards como fila horizontal al final de una tab de dashboard."""
from __future__ import annotations
from .client import MetabaseClient
from .metabase_dashboard_next_row import metabase_dashboard_next_row
from .metabase_copy_dashcard_mappings import metabase_copy_dashcard_mappings
from .metabase_update_dashboard_safe import metabase_update_dashboard_safe
def metabase_dashboard_append_row(
client: MetabaseClient,
*,
dashboard_id: int,
tab_id: int,
card_ids: list[int],
height: int = 4,
donor_card_id: int = 0,
grid_width: int = 24,
) -> dict:
"""Añade N cards como fila horizontal al final de una tab de dashboard.
Calcula la primera fila libre de la tab indicada, distribuye las cards
horizontalmente con ancho uniforme (``grid_width // len(card_ids)``) y
opcionalmente copia los ``parameter_mappings`` de una card donante a cada
nueva card. Internamente delega en ``metabase_dashboard_next_row``,
``metabase_copy_dashcard_mappings`` y ``metabase_update_dashboard_safe``.
Args:
client: MetabaseClient autenticado con sesion activa.
dashboard_id: ID del dashboard donde se añaden las cards.
tab_id: ID de la tab donde se insertan las cards. Usar ``0`` para
dashboards sin tabs o para la tab raiz.
card_ids: Lista de IDs de cards a colocar como fila. El orden de
la lista determina el orden de izquierda a derecha.
height: Altura en filas del grid para cada card. Default ``4``.
donor_card_id: Si distinto de ``0``, copia los ``parameter_mappings``
de esa card (que debe existir ya en el dashboard) a cada nueva
card. Util para replicar los 18 filtros de dashboard sin configurar
manualmente cada mapping. Default ``0`` (sin mappings).
grid_width: Anchura total del grid de Metabase. Default ``24``
(estandar de Metabase). Cada card recibe
``grid_width // len(card_ids)`` columnas.
Returns:
Dict con el resumen de la operacion devuelto por
``metabase_update_dashboard_safe``::
{
"added": [lista de IDs negativos asignados],
"updated": int, # dashcards existentes conservados
"removed": int, # dashcards eliminados (0 en este caso)
"response": dict, # respuesta raw de PUT /api/dashboard/:id
}
Raises:
ValueError: Si ``card_ids`` esta vacio, o si ``donor_card_id != 0``
y esa card no existe en el dashboard.
httpx.HTTPStatusError: Si la API de Metabase devuelve 4xx/5xx.
Example:
>>> # Añadir 4 KPIs al final de la tab 7, copiando filtros de card 42
>>> result = metabase_dashboard_append_row(
... client,
... dashboard_id=10,
... tab_id=7,
... card_ids=[101, 102, 103, 104],
... height=4,
... donor_card_id=42,
... )
>>> print(result["added"]) # [-1, -2, -3, -4]
>>> # Sin filtros, 2 cards en dashboard sin tabs
>>> result = metabase_dashboard_append_row(
... client,
... dashboard_id=10,
... tab_id=0,
... card_ids=[200, 201],
... height=6,
... )
"""
if not card_ids:
raise ValueError("card_ids no puede estar vacio")
# 1. Calcular siguiente fila libre
next_row = metabase_dashboard_next_row(
client, dashboard_id=dashboard_id, tab_id=tab_id
)
# 2. Ancho de cada card
size_x = grid_width // len(card_ids)
# 3. Construir dashcards
new_dashcards: list[dict] = []
for i, cid in enumerate(card_ids):
# Copiar mappings del donor si se especifico
if donor_card_id != 0:
mappings = metabase_copy_dashcard_mappings(
client,
dashboard_id=dashboard_id,
source_card_id=donor_card_id,
dest_card_id=cid,
)
else:
mappings = []
dashcard: dict = {
"card_id": cid,
"dashboard_tab_id": tab_id,
"row": next_row,
"col": i * size_x,
"size_x": size_x,
"size_y": height,
"parameter_mappings": mappings,
"visualization_settings": {},
}
new_dashcards.append(dashcard)
# 4. Actualizar el dashboard
return metabase_update_dashboard_safe(
client,
dashboard_id,
dashcards_add=new_dashcards,
)
@@ -0,0 +1,62 @@
---
name: metabase_dashboard_next_row
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def metabase_dashboard_next_row(client: MetabaseClient, *, dashboard_id: int, tab_id: int = 0) -> int"
description: "Calcula la primera fila libre al final de una tab de dashboard: max(dc.row + dc.size_y) entre las dashcards de esa tab. Retorna 0 si la tab esta vacia. Evita el boilerplate manual de max() al colocar nuevas cards al final del dashboard."
tags: [metabase, dashboard, layout, dashcard, row, tab, api, python]
uses_functions:
- metabase_get_dashboard_py_infra
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
params:
- name: client
desc: "MetabaseClient autenticado con sesion activa"
- name: dashboard_id
desc: "ID del dashboard cuyas filas se calculan"
- name: tab_id
desc: "ID de la tab cuya fila final se calcula. 0 (default) = dashcards sin dashboard_tab_id (dashboard sin pestanas o tab raiz)"
output: "int: indice de fila (0-indexed) donde colocar la siguiente card. 0 si la tab no tiene dashcards."
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/metabase/metabase_dashboard_next_row.py"
---
## Ejemplo
```python
from metabase import MetabaseClient, metabase_dashboard_next_row
client = MetabaseClient("https://metabase.example.com", "token...")
# Dashboard sin tabs: siguiente fila libre en la raiz
next_row = metabase_dashboard_next_row(client, dashboard_id=10)
new_dashcard = {
"id": -1,
"card_id": 99,
"col": 0, "row": next_row, "size_x": 6, "size_y": 4,
"parameter_mappings": [],
"visualization_settings": {},
}
# Dashboard con tabs: siguiente fila libre en tab 3
next_row = metabase_dashboard_next_row(client, dashboard_id=10, tab_id=3)
```
## Notas
- Cuando `tab_id == 0` se filtran dashcards donde `dashboard_tab_id` es falsy
(None, 0 o ausente). Esto cubre dashboards sin tabs y la "tab raiz".
- Requiere 1 request HTTP (GET dashboard). Si ya tienes el objeto dashboard
en memoria, es mas eficiente calcular directamente:
`max((dc["row"] + dc["size_y"] for dc in dashcards), default=0)`.
- La fila retornada es la inmediatamente disponible — no reserva espacio
ni verifica solapamiento de columnas.
@@ -0,0 +1,60 @@
"""Calcula la siguiente fila libre al final de una tab de dashboard."""
from __future__ import annotations
from .client import MetabaseClient
from .dashboards import metabase_get_dashboard
def metabase_dashboard_next_row(
client: MetabaseClient,
*,
dashboard_id: int,
tab_id: int = 0,
) -> int:
"""Retorna la primera fila libre al final de una tab de dashboard.
Obtiene el dashboard y filtra las dashcards que pertenecen a la tab
indicada. Devuelve max(dc["row"] + dc["size_y"]) entre esas cards, o 0
si no hay ninguna dashcard en la tab.
Cuando tab_id == 0 se consideran las dashcards sin dashboard_tab_id
(dashboards sin pestanas o pestaña raiz).
Args:
client: Cliente autenticado con sesion activa.
dashboard_id: ID del dashboard a consultar.
tab_id: ID de la tab cuya fila final se calcula. 0 = dashcards sin tab
(dashboard sin pestanas o tab raiz).
Returns:
Entero con la fila donde colocar la siguiente card (0-indexed).
0 si la tab no tiene dashcards.
Example:
>>> next_row = metabase_dashboard_next_row(client, dashboard_id=10)
>>> new_dashcard = {
... "id": -1,
... "card_id": 99,
... "col": 0, "row": next_row, "size_x": 6, "size_y": 4,
... "parameter_mappings": [],
... "visualization_settings": {},
... }
>>> # Dashboard con tabs
>>> next_row = metabase_dashboard_next_row(
... client, dashboard_id=10, tab_id=3
... )
"""
dashboard = metabase_get_dashboard(client, dashboard_id)
dashcards: list[dict] = dashboard.get("dashcards") or []
if tab_id == 0:
filtered = [dc for dc in dashcards if not dc.get("dashboard_tab_id")]
else:
filtered = [dc for dc in dashcards if dc.get("dashboard_tab_id") == tab_id]
if not filtered:
return 0
return max(dc["row"] + dc["size_y"] for dc in filtered)
@@ -0,0 +1,142 @@
---
name: metabase_mbql_from_source_card
kind: function
lang: py
domain: metabase
version: "1.0.0"
purity: impure
signature: "def metabase_mbql_from_source_card(*, database_id: int, source_card_id: int, aggregations: list[dict], joins: list[dict] | None = None, filters: list[list] | None = None, breakouts: list[dict] | None = None, expressions: list[dict] | None = None) -> dict"
description: "Construye un dataset_query MBQL que envuelve una saved card (source-card) con agregaciones, joins opcionales, filtros, breakouts y una segunda etapa de expressions calculadas. Genera automaticamente todos los lib/uuid con uuid4. Elimina la necesidad de construir el JSON MBQL manualmente para el patron comun de agregar sobre una card guardada y anadir expresiones derivadas."
tags: [metabase, mbql, dataset_query, builder, query, aggregation, join, expression]
uses_functions: []
uses_types: []
params:
- name: database_id
desc: "ID numerico de la database de Metabase contra la que se ejecuta la query."
- name: source_card_id
desc: "ID de la card guardada que actua como fuente de datos (equivalente a 'pick a saved question' en el query builder de Metabase)."
- name: aggregations
desc: "Lista de specs de agregacion. Cada spec es un dict con 'op' (sum|count|avg|min|max|count-distinct), 'field' (nombre de columna en la source card) y 'base_type' opcional (default type/Decimal). Para count sin campo omitir 'field'."
- name: joins
desc: "Lista de specs de join. Cada spec incluye: alias (str), source_card_id (int), strategy (str, default 'left-join'), fields (str, opcional), local_field (str), local_base_type (str), foreign_field_id (int), foreign_base_type (str)."
- name: filters
desc: "Lista de clausulas MBQL de filtro en formato raw (arrays anidados). La funcion inyecta lib/uuid a cualquier dict que no lo tenga."
- name: breakouts
desc: "Lista de specs de breakout. Soporta campo por nombre (field, base_type, temporal_unit) o por ID numerico (field_id, base_type, join_alias)."
- name: expressions
desc: "Lista de specs de expression para la segunda etapa MBQL. Cada spec tiene 'name' y 'expr' (mini-DSL recursivo). Si se proporcionan, se crea una segunda stage MBQL con esas expressions."
output: "Dict con estructura completa del dataset_query MBQL: {lib/type, database, stages}. La primera stage tiene source-card, aggregation, joins, filters y breakout. La segunda stage (solo si hay expressions) contiene las expressions calculadas. Todos los dicts internos llevan lib/uuid unicos."
returns: []
returns_optional: false
error_type: "error_py_core"
imports: []
tested: true
tests:
- "source-card y database estan presentes en el output"
- "todos los dicts anidados tienen lib/uuid"
- "agregaciones producen forma correcta op+meta+field"
- "segunda stage existe solo cuando se pasan expressions"
test_file_path: "python/functions/metabase/test_metabase_mbql_from_source_card.py"
file_path: "python/functions/metabase/metabase_mbql_from_source_card.py"
---
## Mini-DSL para expressions
El campo `expr` dentro de cada spec de expression acepta un grafo recursivo de nodos:
| Nodo | Formato | Resultado MBQL |
|---|---|---|
| Field por nombre | `{"field": "PrecioVenta", "base_type": "type/Decimal"}` | `["field", {"base-type": ..., "lib/uuid": ...}, "PrecioVenta"]` |
| Operacion | `{"op": "+", "args": [nodo1, nodo2]}` | `["+", {"lib/uuid": ...}, nodo1_mbql, nodo2_mbql]` |
| Ref a expression | `{"ref": "Margen"}` | `["expression", {"base-type": "type/Float", ...}, "Margen"]` |
| Literal | `0`, `1.5`, `"texto"` | valor directo |
Operaciones soportadas: `+`, `-`, `*`, `/`, `coalesce`, `case`, y cualquier funcion MBQL valida.
## Ejemplo (card 9918 en produccion)
```python
import sys
sys.path.insert(0, '/home/egutierrez/fn_registry/python/functions')
from metabase.metabase_mbql_from_source_card import metabase_mbql_from_source_card
q = metabase_mbql_from_source_card(
database_id=6,
source_card_id=5305,
aggregations=[
{"op": "sum", "field": "PrecioVenta", "base_type": "type/Decimal"},
{"op": "sum", "field": "PrecioCompra", "base_type": "type/Decimal"},
],
joins=[
{
"alias": "Centros - idCentro",
"source_card_id": 4076,
"fields": "none",
"local_field": "idCentro",
"local_base_type": "type/Text",
"foreign_field_id": 17316,
"foreign_base_type": "type/Text",
},
],
filters=[
["not-empty", {}, ["field", {"base-type": "type/Text"}, "Centros - idCentro__Companies__name"]],
],
expressions=[
{
"name": "Margen",
"expr": {
"op": "coalesce",
"args": [
{
"op": "/",
"args": [
{"op": "-", "args": [{"field": "sum"}, {"field": "sum_2"}]},
{"field": "sum"},
],
},
0,
],
},
},
],
)
# Verificar shape
assert q["stages"][0]["source-card"] == 5305
assert q["database"] == 6
assert len(q["stages"]) == 2 # stage0 con agg + stage1 con expressions
```
## Uso tipico con MetabaseClient
```python
from metabase import MetabaseClient, metabase_mbql_from_source_card
client = MetabaseClient('https://metabase.example.com', 'token...')
q = metabase_mbql_from_source_card(
database_id=6,
source_card_id=5305,
aggregations=[
{"op": "sum", "field": "Ventas", "base_type": "type/Decimal"},
],
)
payload = {
"name": "Ventas totales por periodo",
"type": "question",
"display": "table",
"dataset_query": q,
"visualization_settings": {},
}
card = client.request("POST", "/api/card", json=payload)
print(card["id"])
```
## Notas
- Los `lib/uuid` se generan con `uuid.uuid4()` en cada llamada, por lo que dos invocaciones producen UUIDs distintos. Esto es correcto: Metabase requiere UUIDs unicos globalmente por query, no deterministicos.
- Los filtros se pasan en formato raw para maxima flexibilidad. La funcion solo inyecta `lib/uuid` a los dicts que no lo tengan.
- La segunda stage solo se crea cuando `expressions` es no-None y no vacio. Sin expressions, el output tiene una sola stage.
- `field` en un nodo de expression refiere siempre a nombre de columna (string). Para referenciar por ID numerico usar `{"field_id": N}` (solo en breakouts por ahora; en expressions usar `{"field": "nombre"}` con el nombre del slot de aggregation como `"sum"`, `"sum_2"`, etc.).
@@ -0,0 +1,375 @@
"""Constructor de dataset_query MBQL que envuelve una saved card con agregaciones,
joins, filtros y expressions en una segunda etapa."""
from __future__ import annotations
import uuid
from typing import Any
# ---------------------------------------------------------------------------
# Helpers internos
# ---------------------------------------------------------------------------
def _uuid() -> str:
"""Genera un UUID v4 fresco como string."""
return str(uuid.uuid4())
def _inject_uuids(obj: Any) -> Any:
"""Recorre obj recursivamente e inyecta lib/uuid a dicts que no lo tengan."""
if isinstance(obj, dict):
if "lib/uuid" not in obj:
obj["lib/uuid"] = _uuid()
for k, v in obj.items():
if k != "lib/uuid":
obj[k] = _inject_uuids(v)
return obj
elif isinstance(obj, list):
return [_inject_uuids(item) for item in obj]
return obj
def _build_field_ref(
field_name: str,
base_type: str = "type/Decimal",
) -> list:
"""Construye un nodo MBQL ["field", meta, field_name] para campo de source-card."""
return [
"field",
{"base-type": base_type, "lib/uuid": _uuid()},
field_name,
]
def _build_field_ref_by_id(
field_id: int,
base_type: str = "type/Text",
join_alias: str | None = None,
) -> list:
"""Construye un nodo MBQL ["field", meta, field_id] para campo por ID numerico."""
meta: dict = {"base-type": base_type, "lib/uuid": _uuid()}
if join_alias:
meta["join-alias"] = join_alias
return ["field", meta, field_id]
# ---------------------------------------------------------------------------
# Builders de aggregation
# ---------------------------------------------------------------------------
_OPS_WITHOUT_FIELD = {"count"}
def _build_aggregation(agg_spec: dict) -> list:
"""Convierte un spec de agregacion al formato MBQL.
Formato input:
{"op": "sum", "field": "PrecioVenta", "base_type": "type/Decimal"}
{"op": "count"}
Formato output (con campo):
["sum", {"lib/uuid": ...}, ["field", {"base-type": "...", "lib/uuid": ...}, "PrecioVenta"]]
Formato output (sin campo, ej. count):
["count", {"lib/uuid": ...}]
"""
op = agg_spec["op"]
meta = {"lib/uuid": _uuid()}
if op in _OPS_WITHOUT_FIELD or "field" not in agg_spec:
return [op, meta]
field_node = _build_field_ref(
agg_spec["field"],
base_type=agg_spec.get("base_type", "type/Decimal"),
)
return [op, meta, field_node]
# ---------------------------------------------------------------------------
# Builders de join
# ---------------------------------------------------------------------------
def _build_join(join_spec: dict) -> dict:
"""Construye el dict de join MBQL a partir de un join_spec.
join_spec esperado:
{
"alias": "Centros - idCentro",
"source_card_id": 4076,
"strategy": "left-join", # default
"fields": "none", # opcional
"local_field": "idCentro",
"local_base_type": "type/Text",
"foreign_field_id": 17316,
"foreign_base_type": "type/Text",
}
"""
alias = join_spec["alias"]
strategy = join_spec.get("strategy", "left-join")
# Condicion de join: ["=", meta, local_field_ref, foreign_field_ref]
local_ref = _build_field_ref(
join_spec["local_field"],
base_type=join_spec.get("local_base_type", "type/Text"),
)
foreign_ref = _build_field_ref_by_id(
join_spec["foreign_field_id"],
base_type=join_spec.get("foreign_base_type", "type/Text"),
join_alias=alias,
)
condition_meta = {"lib/uuid": _uuid()}
condition = ["=", condition_meta, local_ref, foreign_ref]
join: dict = {
"lib/type": "mbql/join",
"lib/uuid": _uuid(),
"alias": alias,
"strategy": strategy,
"stages": [
{
"lib/type": "mbql.stage/mbql",
"source-card": join_spec["source_card_id"],
"lib/options": {"lib/uuid": _uuid()},
}
],
"conditions": [condition],
}
if "fields" in join_spec:
join["fields"] = join_spec["fields"]
return join
# ---------------------------------------------------------------------------
# Builders de breakout
# ---------------------------------------------------------------------------
def _build_breakout(breakout_spec: dict) -> list:
"""Construye un nodo MBQL de breakout.
Soporta:
{"field": "fecha", "base_type": "type/Date", "temporal_unit": "month"}
{"field_id": 156756, "base_type": "type/Text", "join_alias": "Centros - idCentro"}
"""
if "field_id" in breakout_spec:
meta: dict = {
"base-type": breakout_spec.get("base_type", "type/Text"),
"lib/uuid": _uuid(),
}
if "join_alias" in breakout_spec:
meta["join-alias"] = breakout_spec["join_alias"]
return ["field", meta, breakout_spec["field_id"]]
else:
meta = {
"base-type": breakout_spec.get("base_type", "type/Date"),
"lib/uuid": _uuid(),
}
if "temporal_unit" in breakout_spec:
meta["temporal-unit"] = breakout_spec["temporal_unit"]
return ["field", meta, breakout_spec["field"]]
# ---------------------------------------------------------------------------
# Builder de expressions (mini-DSL recursivo)
# ---------------------------------------------------------------------------
def _build_expr_node(node: Any, expr_name: str | None = None) -> Any:
"""Traduce recursivamente un nodo del mini-DSL a formato MBQL.
Primitivos soportados:
{"field": "nombre", "base_type": "type/Decimal"} → field ref por nombre
{"op": "sum_field", ...} → no aplica aqui
{"op": "+", "args": [...]} → operacion MBQL
{"ref": "NombreExpresion"} → expression ref
int | float | bool | str → literal
Nodo raiz recibe expr_name para incluir lib/expression-name en el meta.
"""
if isinstance(node, (int, float, bool, str)) and not isinstance(node, bool):
return node
if isinstance(node, bool):
return node
if not isinstance(node, dict):
return node
# Referencia a otra expression nombrada en la misma stage
if "ref" in node:
ref_name = node["ref"]
base_type = node.get("base_type", "type/Float")
return [
"expression",
{
"base-type": base_type,
"effective-type": base_type,
"lib/uuid": _uuid(),
},
ref_name,
]
# Field ref por nombre
if "field" in node and "op" not in node:
return _build_field_ref(
node["field"],
base_type=node.get("base_type", "type/Decimal"),
)
# Operacion con args
if "op" in node:
op = node["op"]
args = node.get("args", [])
meta: dict = {"lib/uuid": _uuid()}
if expr_name is not None:
meta["lib/expression-name"] = expr_name
translated_args = [_build_expr_node(a) for a in args]
return [op, meta] + translated_args
# Fallback: retornar el nodo tal cual
return node
def _build_expression_entry(expr_spec: dict) -> list:
"""Construye el nodo MBQL completo para una expression con nombre.
expr_spec:
{"name": "Margen", "expr": {...mini-DSL...}}
Retorna:
[op, {"lib/uuid": ..., "lib/expression-name": "Margen"}, ...args]
"""
name = expr_spec["name"]
expr_node = expr_spec["expr"]
# Si el nodo raiz es una operacion, trasladar el expr_name al meta
if isinstance(expr_node, dict) and "op" in expr_node:
op = expr_node["op"]
args = expr_node.get("args", [])
meta: dict = {
"lib/uuid": _uuid(),
"lib/expression-name": name,
}
translated_args = [_build_expr_node(a) for a in args]
return [op, meta] + translated_args
# Si es un field ref u otro primitivo, envolverlo
built = _build_expr_node(expr_node, expr_name=name)
if isinstance(built, list) and len(built) >= 2 and isinstance(built[1], dict):
built[1]["lib/expression-name"] = name
return built
# ---------------------------------------------------------------------------
# Funcion principal
# ---------------------------------------------------------------------------
def metabase_mbql_from_source_card(
*,
database_id: int,
source_card_id: int,
aggregations: list[dict],
joins: list[dict] | None = None,
filters: list[list] | None = None,
breakouts: list[dict] | None = None,
expressions: list[dict] | None = None,
) -> dict:
"""Construye un dataset_query MBQL que envuelve una saved card (source-card).
Genera automaticamente todos los lib/uuid necesarios. Soporta agregaciones,
joins, filtros, breakouts y una segunda etapa con expressions calculadas.
Elimina la necesidad de construir el JSON MBQL manualmente para el patron
comun "agregar sobre una card guardada y anadir expresiones derivadas".
Args:
database_id: ID de la database de Metabase contra la que se ejecuta.
source_card_id: ID de la card guardada que actua como fuente de datos
(equivalente a "pick a saved question" en el query builder).
aggregations: Lista de specs de agregacion. Cada spec es un dict con
"op" (sum|count|avg|min|max|count-distinct), "field" (nombre de
columna en la source card) y "base_type" opcional (default
"type/Decimal"). Para count sin campo, omitir "field".
joins: Lista de specs de join. Cada spec incluye "alias", "source_card_id"
(card a joinear), "strategy" (default "left-join"), "fields" opcional,
"local_field" (campo en la etapa actual), "local_base_type",
"foreign_field_id" (ID numerico del campo en la card joineada) y
"foreign_base_type".
filters: Lista de clausulas MBQL de filtro en formato raw. La funcion
inyecta lib/uuid a cualquier dict que no lo tenga. Ejemplo:
[["not-empty", {}, ["field", {"base-type": "type/Text"}, "nombre"]]].
breakouts: Lista de specs de breakout. Soporta campo por nombre (con
"field" y "base_type" opcionales y "temporal_unit" para fechas) o
por ID numerico (con "field_id", "base_type" y "join_alias" opcionales).
expressions: Lista de specs de expression para la segunda etapa MBQL.
Cada spec tiene "name" (nombre de la expresion) y "expr" (nodo del
mini-DSL recursivo). Nodos soportados: {"field": ..., "base_type": ...}
para referencias a campos, {"op": ..., "args": [...]} para operaciones
(+, -, *, /, coalesce, case, etc.), {"ref": ...} para referenciar
otra expression nombrada en la misma etapa, y literales numericos.
Si se proporcionan expressions, se crea una segunda stage MBQL.
Returns:
Dict con la estructura completa del dataset_query MBQL:
{
"lib/type": "mbql/query",
"database": database_id,
"stages": [stage0, stage1?] # stage1 solo si hay expressions
}
Todos los dicts internos tienen lib/uuid unicos generados con uuid4.
Example:
>>> q = metabase_mbql_from_source_card(
... database_id=6,
... source_card_id=5305,
... aggregations=[{"op": "count"}],
... )
>>> q["stages"][0]["source-card"]
5305
>>> q["database"]
6
"""
# ---- Stage 0 -----------------------------------------------------------
stage0: dict = {
"lib/type": "mbql.stage/mbql",
"source-card": source_card_id,
}
# Agregaciones
agg_nodes = [_build_aggregation(a) for a in aggregations]
if agg_nodes:
stage0["aggregation"] = agg_nodes
# Joins
if joins:
stage0["joins"] = [_build_join(j) for j in joins]
# Filtros: inyectar uuids a dicts que no los tengan
if filters:
stage0["filters"] = [_inject_uuids(f) for f in filters]
# Breakouts
if breakouts:
stage0["breakout"] = [_build_breakout(b) for b in breakouts]
stages = [stage0]
# ---- Stage 1 (solo si hay expressions) ---------------------------------
if expressions:
stage1: dict = {
"lib/type": "mbql.stage/mbql",
"expressions": [_build_expression_entry(e) for e in expressions],
}
stages.append(stage1)
return {
"lib/type": "mbql/query",
"database": database_id,
"stages": stages,
}
@@ -0,0 +1,98 @@
---
name: metabase_smartscalar_anothercolumn_viz
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: pure
signature: "def metabase_smartscalar_anothercolumn_viz(*, main_column: str, compare_column: str, label: str = 'vs N-1', number_style: str = 'percent', decimals: int = 2, currency: str = 'EUR', comparison_id: str = 'cmp_n1') -> dict"
description: "Construye el dict completo de visualization_settings para una card display=smartscalar con comparacion tipo anotherColumn (misma fila, columna diferente). Incluye scalar.field, scalar.comparisons y column_settings para ambas columnas con el mismo formato. Internamente reutiliza metabase_viz_column_format."
tags: [metabase, visualization, smartscalar, anothercolumn, scalar, comparison, pure, builder]
uses_functions:
- metabase_viz_column_format_py_infra
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
params:
- name: main_column
desc: "Nombre de la columna mostrada como valor principal en el smartscalar. Debe coincidir exactamente con el nombre de columna en el resultado del query"
- name: compare_column
desc: "Nombre de la columna usada como valor de comparacion en la misma fila del query"
- name: label
desc: "Etiqueta mostrada bajo el delta de comparacion. Default 'vs N-1'"
- name: number_style
desc: "Estilo de formato para ambas columnas: 'percent', 'currency', 'decimal', '' (default Metabase). Default 'percent'"
- name: decimals
desc: "Numero de decimales para ambas columnas. Default 2"
- name: currency
desc: "Codigo ISO de moneda ('EUR', 'USD', ...). Solo relevante cuando number_style='currency'. Default 'EUR'"
- name: comparison_id
desc: "Identificador interno de la comparacion dentro del array scalar.comparisons. Default 'cmp_n1'"
output: "Dict con visualization_settings completo: {'scalar.field': str, 'scalar.comparisons': [{'id', 'type': 'anotherColumn', 'column', 'label'}], 'column_settings': {key_main: fmt, key_compare: fmt}}. Listo para usar como visualization_settings en el payload de la card."
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/metabase/metabase_smartscalar_anothercolumn_viz.py"
---
## Por que existe
El patron ``anotherColumn`` de Metabase smartscalar muestra un delta entre
dos columnas de la misma fila (en lugar del patron ``previousValue`` que
usa dos filas temporales). Util cuando el query ya calcula el valor actual
y el valor comparativo directamente (tipico en queries con CTEs que calculan
n-1 en una sola pasada).
La combinacion ``scalar.field`` + ``scalar.comparisons[anotherColumn]`` +
``column_settings`` para ambas columnas con formato identico requiere varios
campos que Metabase silenciosamente ignora si faltan o estan mal formateados.
## Ejemplo
```python
from metabase import metabase_smartscalar_anothercolumn_viz, metabase_create_card_raw
# Smartscalar de margen (porcentaje) vs columna n-1
viz = metabase_smartscalar_anothercolumn_viz(
main_column="Margen",
compare_column="MargenN1",
label="vs N-1",
number_style="percent",
decimals=2,
)
# Smartscalar de venta (moneda) vs columna n-1
viz = metabase_smartscalar_anothercolumn_viz(
main_column="Venta",
compare_column="VentaN1",
label="vs ano anterior",
number_style="currency",
decimals=0,
currency="EUR",
)
# Integrar en un payload completo de card
payload = {
"name": "Margen actual",
"type": "question",
"display": "smartscalar",
"dataset_query": { ... },
"visualization_settings": viz,
}
card = metabase_create_card_raw(client, payload)
```
## Notas
- Aplica el mismo formato (``number_style``, ``decimals``, ``currency``) a
ambas columnas (``main_column`` y ``compare_column``). Si necesitas formatos
distintos, construye ``column_settings`` manualmente con
``metabase_viz_column_format`` y monta el dict a mano.
- Cuando ``number_style="currency"`` se agrega automaticamente
``currency_in_header=False`` en ambas columnas.
- El ``comparison_id`` solo necesita ser unico dentro del array
``scalar.comparisons`` de esa card — no es un ID global.
- Para el patron de 2 filas temporales (``previousValue``) usar
``metabase_smartscalar_kpi_payload`` en su lugar.
@@ -0,0 +1,111 @@
"""Construye visualization_settings para smartscalar con comparacion anotherColumn."""
from __future__ import annotations
from .metabase_viz_column_format import metabase_viz_column_format
def metabase_smartscalar_anothercolumn_viz(
*,
main_column: str,
compare_column: str,
label: str = "vs N-1",
number_style: str = "percent",
decimals: int = 2,
currency: str = "EUR",
comparison_id: str = "cmp_n1",
) -> dict:
"""Construye visualization_settings para smartscalar con comparacion anotherColumn.
Genera el dict completo de ``visualization_settings`` para una card con
``display=smartscalar`` que muestra ``main_column`` con una comparacion
de tipo ``anotherColumn`` apuntando a ``compare_column`` (ambas columnas
en la misma fila del query).
Este patron es util cuando el query devuelve directamente el valor actual
y el valor de comparacion como columnas separadas (en lugar del patron de
2 filas temporales de ``previousValue``).
Args:
main_column: Nombre de la columna que se muestra como valor principal
en el smartscalar. Debe coincidir exactamente con el nombre de
columna en el resultado del query.
compare_column: Nombre de la columna que se usa como valor de
comparacion. Debe estar en el mismo resultado del query que
``main_column``.
label: Etiqueta mostrada bajo el delta de comparacion. Default
``"vs N-1"``.
number_style: Estilo de formato para ambas columnas. Valores:
``"percent"``, ``"currency"``, ``"decimal"``, ``""`` (default
Metabase). Default ``"percent"``.
decimals: Numero de decimales para ambas columnas. Default ``2``.
currency: Codigo ISO de moneda (``"EUR"``, ``"USD"``, ...). Solo
relevante cuando ``number_style="currency"``. Default ``"EUR"``.
comparison_id: Identificador interno de la comparacion dentro del
array ``scalar.comparisons``. Debe ser unico por card. Default
``"cmp_n1"``.
Returns:
Dict con la estructura completa de ``visualization_settings``::
{
"scalar.field": "<main_column>",
"scalar.comparisons": [
{"id": "<comparison_id>", "type": "anotherColumn",
"column": "<compare_column>", "label": "<label>"},
],
"column_settings": {
'["name","<main_column>"]': {...formato...},
'["name","<compare_column>"]': {...mismo formato...},
},
}
Example:
>>> viz = metabase_smartscalar_anothercolumn_viz(
... main_column="Margen",
... compare_column="MargenN1",
... label="vs N-1",
... number_style="percent",
... decimals=2,
... )
>>> assert viz["scalar.field"] == "Margen"
>>> assert viz["scalar.comparisons"][0]["type"] == "anotherColumn"
>>> assert viz["scalar.comparisons"][0]["column"] == "MargenN1"
>>> assert '["name","Margen"]' in viz["column_settings"]
>>> assert '["name","MargenN1"]' in viz["column_settings"]
>>> # Con formato moneda
>>> viz = metabase_smartscalar_anothercolumn_viz(
... main_column="Venta",
... compare_column="VentaN1",
... label="vs ano anterior",
... number_style="currency",
... decimals=0,
... currency="EUR",
... )
"""
# Argumentos de formato comunes a ambas columnas
fmt_kwargs: dict = {
"number_style": number_style,
"decimals": decimals,
}
if number_style == "currency" and currency:
fmt_kwargs["currency"] = currency
fmt_kwargs["currency_in_header"] = False
column_settings: dict = {}
column_settings.update(metabase_viz_column_format(main_column, **fmt_kwargs))
column_settings.update(metabase_viz_column_format(compare_column, **fmt_kwargs))
return {
"scalar.field": main_column,
"scalar.comparisons": [
{
"id": comparison_id,
"type": "anotherColumn",
"column": compare_column,
"label": label,
}
],
"column_settings": column_settings,
}
@@ -0,0 +1,82 @@
---
name: metabase_viz_column_format
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: pure
signature: "def metabase_viz_column_format(column: str, *, number_style: str = '', decimals: int = -1, currency: str = '', currency_in_header: bool | None = None, suffix: str = '', prefix: str = '') -> dict"
description: "Construye la entrada column_settings para una columna en visualization_settings de Metabase. Genera la clave con el formato JSON exacto que Metabase requiere ('['name','<columna>']') y el dict de formato con solo los campos explicitamente especificados. El resultado se fusiona en visualization_settings['column_settings']."
tags: [metabase, visualization, column-settings, format, number-style, currency, percent, pure, builder]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
params:
- name: column
desc: "Nombre exacto de la columna tal como aparece en el resultado del query (sensible a mayusculas)"
- name: number_style
desc: "Estilo de formato numerico: '' (default), 'currency', 'percent', 'decimal', 'scientific'"
- name: decimals
desc: "Numero de decimales a mostrar. -1 = omitir (usa default de Metabase)"
- name: currency
desc: "Codigo ISO de moneda ('EUR', 'USD', ...). Solo relevante con number_style='currency'. Vacio = omitir"
- name: currency_in_header
desc: "Si True muestra el simbolo de moneda en la cabecera en lugar de junto al valor. None = omitir"
- name: suffix
desc: "Sufijo textual mostrado tras el valor. Vacio = omitir"
- name: prefix
desc: "Prefijo textual mostrado antes del valor. Vacio = omitir"
output: "Dict con una sola clave (string JSON '[\"name\",\"<column>\"]') y como valor un dict con los campos de formato explicitamente especificados. Listo para fusionar en visualization_settings['column_settings']."
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/metabase/metabase_viz_column_format.py"
---
## Por que existe
Metabase espera la clave de ``column_settings`` como un string JSON serializado sin
espacios: ``'["name","Margen"]'``. Construir este string manualmente es propenso
a errores (espacios extra, comillas mal escapadas). Esta funcion estandariza la
construccion y solo incluye los campos que se especificaron — Metabase ignora
campos ausentes con su comportamiento default.
## Ejemplo
```python
from metabase import metabase_viz_column_format
# Formato porcentaje con 2 decimales
entry = metabase_viz_column_format("Margen", number_style="percent", decimals=2)
# {'["name","Margen"]': {"number_style": "percent", "decimals": 2}}
# Formato moneda sin decimales, simbolo en cabecera
entry = metabase_viz_column_format(
"MasadeMargen",
number_style="currency",
currency="EUR",
decimals=0,
currency_in_header=False,
)
# {'["name","MasadeMargen"]': {"number_style": "currency", "currency": "EUR", "decimals": 0, "currency_in_header": False}}
# Usar en visualization_settings
viz = {"column_settings": {}}
viz["column_settings"].update(metabase_viz_column_format("Venta", number_style="currency", currency="EUR", decimals=0))
viz["column_settings"].update(metabase_viz_column_format("Margen", number_style="percent", decimals=2))
```
## Notas
- La clave se genera con ``json.dumps(["name", column], separators=(",", ":"))``
que produce ``'["name","Margen"]'`` sin espacios — el formato exacto que Metabase
espera. Cualquier variacion (espacios, comillas simples) causa que Metabase
ignore silenciosamente el setting.
- Solo se incluyen en el dict de valor los campos explicitamente especificados.
Metabase aplica sus defaults para los ausentes.
- El centinela ``decimals=-1`` (valor por defecto) indica "omitir"; cualquier valor
>= 0 se incluye en el resultado incluyendo ``decimals=0``.
- Compatible con cualquier tipo de card (smartscalar, table, bar, line).
@@ -0,0 +1,84 @@
"""Construye entradas de column_settings para visualization_settings de Metabase."""
from __future__ import annotations
import json
def metabase_viz_column_format(
column: str,
*,
number_style: str = "",
decimals: int = -1,
currency: str = "",
currency_in_header: bool | None = None,
suffix: str = "",
prefix: str = "",
) -> dict:
"""Construye la entrada column_settings para una columna en visualization_settings.
Genera la clave con el formato JSON exacto que espera Metabase
(``'["name","<columna>"]'``) y el dict de formato con solo los campos
que se han especificado explicitamente (omite vacios y valores centinela).
El resultado esta pensado para fusionarse (``dict.update`` o ``|``) en
``visualization_settings["column_settings"]``.
Args:
column: Nombre exacto de la columna tal como aparece en el resultado
del query (sensible a mayusculas).
number_style: Estilo de formato numerico. Valores validos:
``""`` (sin estilo, usa el default de Metabase),
``"currency"``, ``"percent"``, ``"decimal"``, ``"scientific"``.
decimals: Numero de decimales a mostrar. ``-1`` = omitir (Metabase
usa su default automatico).
currency: Codigo ISO de moneda (``"EUR"``, ``"USD"``, ...). Solo
relevante cuando ``number_style="currency"``. Vacio = omitir.
currency_in_header: Si ``True`` muestra el simbolo de moneda en la
cabecera de columna en lugar de junto al valor. ``None`` = omitir.
suffix: Sufijo textual mostrado tras el valor (ej. ``"%"``). Vacio = omitir.
prefix: Prefijo textual mostrado antes del valor. Vacio = omitir.
Returns:
Dict con una sola clave — el string JSON ``'["name","<column>"]'`` —
y como valor un dict con los campos de formato que se han especificado.
El dict de formato puede estar vacio si no se paso ningun parametro
de formato.
Example:
>>> metabase_viz_column_format("Margen", number_style="percent", decimals=2)
{'["name","Margen"]': {"number_style": "percent", "decimals": 2}}
>>> metabase_viz_column_format(
... "MasadeMargen",
... number_style="currency",
... currency="EUR",
... decimals=0,
... currency_in_header=False,
... )
{'["name","MasadeMargen"]': {"number_style": "currency", "currency": "EUR", "decimals": 0, "currency_in_header": False}}
"""
# Clave: array JSON serializado sin espacios adicionales
key = json.dumps(["name", column], separators=(",", ":"))
fmt: dict = {}
if number_style:
fmt["number_style"] = number_style
if decimals >= 0:
fmt["decimals"] = decimals
if currency:
fmt["currency"] = currency
if currency_in_header is not None:
fmt["currency_in_header"] = currency_in_header
if suffix:
fmt["suffix"] = suffix
if prefix:
fmt["prefix"] = prefix
return {key: fmt}
@@ -0,0 +1,234 @@
"""Tests para metabase_mbql_from_source_card."""
from __future__ import annotations
import sys
import os
import re
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from metabase.metabase_mbql_from_source_card import metabase_mbql_from_source_card
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
UUID_RE = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
re.IGNORECASE,
)
def _collect_dicts(obj: object) -> list[dict]:
"""Recorre recursivamente obj y devuelve todos los dicts encontrados."""
result: list[dict] = []
if isinstance(obj, dict):
result.append(obj)
for v in obj.values():
result.extend(_collect_dicts(v))
elif isinstance(obj, list):
for item in obj:
result.extend(_collect_dicts(item))
return result
def _all_meta_dicts_have_uuid(obj: object) -> tuple[bool, list[dict]]:
"""Verifica que todos los dicts de metadatos MBQL tienen lib/uuid valido.
Los dicts de metadatos son aquellos que tienen exactamente el rol de
opciones/meta en MBQL: dicts que NO son stages (no tienen lib/type con
valor mbql.stage/*) y tampoco son el objeto raiz. En la practica son los
dicts de posicion 2 en nodos [op, meta, ...] y similares.
Devuelve (ok, lista_de_dicts_sin_uuid).
"""
# Dicts que se consideran estructurales y no necesitan lib/uuid
_STRUCTURAL_LIB_TYPES = {
"mbql/query",
"mbql.stage/mbql",
"mbql/join",
}
missing: list[dict] = []
for d in _collect_dicts(obj):
lib_type = d.get("lib/type")
# Omitir dicts estructurales (stages, joins, query raiz)
if lib_type in _STRUCTURAL_LIB_TYPES:
continue
# Omitir dicts vacios (pueden aparecer en filtros raw antes de inyectar)
if not d:
continue
# Cualquier otro dict MBQL (meta de field, de operacion, de condition...)
# debe tener lib/uuid
uid = d.get("lib/uuid")
if uid is None or not UUID_RE.match(str(uid)):
missing.append(d)
return (len(missing) == 0, missing)
# ---------------------------------------------------------------------------
# Test 1: source-card y database presentes en el output
# ---------------------------------------------------------------------------
def test_source_card_y_database_presentes():
"""source-card y database estan presentes en el output."""
q = metabase_mbql_from_source_card(
database_id=6,
source_card_id=5305,
aggregations=[{"op": "count"}],
)
assert q["database"] == 6, f"database esperado 6, got {q['database']}"
assert q["lib/type"] == "mbql/query"
stage0 = q["stages"][0]
assert stage0["source-card"] == 5305, (
f"source-card esperado 5305, got {stage0.get('source-card')}"
)
# ---------------------------------------------------------------------------
# Test 2: todos los dicts anidados tienen lib/uuid
# ---------------------------------------------------------------------------
def test_todos_los_dicts_tienen_lib_uuid():
"""todos los dicts anidados tienen lib/uuid."""
q = metabase_mbql_from_source_card(
database_id=6,
source_card_id=5305,
aggregations=[
{"op": "sum", "field": "PrecioVenta", "base_type": "type/Decimal"},
{"op": "sum", "field": "PrecioCompra", "base_type": "type/Decimal"},
],
joins=[
{
"alias": "Centros - idCentro",
"source_card_id": 4076,
"fields": "none",
"local_field": "idCentro",
"local_base_type": "type/Text",
"foreign_field_id": 17316,
"foreign_base_type": "type/Text",
},
],
filters=[
[
"not-empty",
{},
[
"field",
{"base-type": "type/Text"},
"Centros - idCentro__Companies__name",
],
]
],
expressions=[
{
"name": "Margen",
"expr": {
"op": "coalesce",
"args": [
{
"op": "/",
"args": [
{
"op": "-",
"args": [
{"field": "sum"},
{"field": "sum_2"},
],
},
{"field": "sum"},
],
},
0,
],
},
}
],
)
# Verificar que todos los dicts de metadatos MBQL tienen lib/uuid
ok, missing = _all_meta_dicts_have_uuid(q["stages"])
assert ok, (
f"Hay {len(missing)} dicts de meta sin lib/uuid valido. Primeros 3: {missing[:3]}"
)
# ---------------------------------------------------------------------------
# Test 3: agregaciones producen forma correcta op+meta+field
# ---------------------------------------------------------------------------
def test_agregaciones_producen_forma_correcta():
"""agregaciones producen forma correcta op+meta+field."""
q = metabase_mbql_from_source_card(
database_id=6,
source_card_id=100,
aggregations=[
{"op": "sum", "field": "Ventas", "base_type": "type/Decimal"},
{"op": "count"},
{"op": "avg", "field": "Precio"},
],
)
aggs = q["stages"][0]["aggregation"]
assert len(aggs) == 3
# sum: ["sum", {lib/uuid}, ["field", {base-type, lib/uuid}, "Ventas"]]
sum_agg = aggs[0]
assert sum_agg[0] == "sum", f"Esperaba 'sum', got {sum_agg[0]}"
assert isinstance(sum_agg[1], dict) and "lib/uuid" in sum_agg[1]
assert isinstance(sum_agg[2], list) and sum_agg[2][0] == "field"
assert sum_agg[2][2] == "Ventas"
assert sum_agg[2][1].get("base-type") == "type/Decimal"
# count sin campo: ["count", {lib/uuid}]
count_agg = aggs[1]
assert count_agg[0] == "count"
assert len(count_agg) == 2, f"count no deberia tener campo, got {count_agg}"
# avg con campo
avg_agg = aggs[2]
assert avg_agg[0] == "avg"
assert avg_agg[2][2] == "Precio"
# ---------------------------------------------------------------------------
# Test 4: segunda stage existe solo cuando se pasan expressions
# ---------------------------------------------------------------------------
def test_segunda_stage_solo_con_expressions():
"""segunda stage existe solo cuando se pasan expressions."""
# Sin expressions: una sola stage
q_sin = metabase_mbql_from_source_card(
database_id=6,
source_card_id=100,
aggregations=[{"op": "count"}],
)
assert len(q_sin["stages"]) == 1, (
f"Sin expressions esperaba 1 stage, got {len(q_sin['stages'])}"
)
# Con expressions: dos stages
q_con = metabase_mbql_from_source_card(
database_id=6,
source_card_id=100,
aggregations=[{"op": "count"}],
expressions=[
{
"name": "DobleConteo",
"expr": {"op": "*", "args": [{"field": "count"}, 2]},
}
],
)
assert len(q_con["stages"]) == 2, (
f"Con expressions esperaba 2 stages, got {len(q_con['stages'])}"
)
stage1 = q_con["stages"][1]
assert "expressions" in stage1, "stage1 debe tener key 'expressions'"
expr = stage1["expressions"][0]
assert expr[0] == "*", f"Esperaba op '*', got {expr[0]}"
assert expr[1].get("lib/expression-name") == "DobleConteo"