feat(infra): auto-commit con 8 cambios

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-17 10:30:26 +02:00
parent 588d092858
commit 909290ddbf
9 changed files with 764 additions and 1 deletions
+1 -1
View File
@@ -6,7 +6,7 @@
},
"jupyter": {
"command": "bash",
"args": ["/home/enmanuel/fn_registry/bash/functions/infra/jupyter_mcp_serve.sh"]
"args": ["-c", "exec bash \"$(git rev-parse --show-toplevel)/bash/functions/infra/jupyter_mcp_serve.sh\""]
}
}
}
Submodule cpp/apps/chart_demo added at 026f514bb7
Submodule cpp/apps/shaders_lab added at dc9a970aff
@@ -0,0 +1,72 @@
---
name: sign_metabase_embed_jwt
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def sign_metabase_embed_jwt(secret: str, resource_type: str, resource_id: int, base_url: str, params: dict | None = None, exp_seconds: int = 3600, theme: str | None = None, bordered: bool = True, titled: bool = True) -> dict"
description: "Firma un JWT de static-embedding de Metabase (HS256 con PyJWT) y construye la URL del iframe (/embed/<type>/<token>#opciones). Soporta question y dashboard, params locked/enabled, TTL configurable y tema opcional."
tags: [metabase, embed, jwt]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["jwt", "time"]
tested: true
tests: ["test_firma_y_decodifica_round_trip", "test_resource_type_invalido_lanza_valueerror", "test_embed_url_dashboard"]
test_file_path: "python/functions/infra/sign_metabase_embed_jwt_test.py"
file_path: "python/functions/infra/sign_metabase_embed_jwt.py"
params:
- name: secret
desc: "Secret de embedding de Metabase (Settings > Embedding). NUNCA va al cliente."
- name: resource_type
desc: '"question" o "dashboard" — tipo del recurso a embeber.'
- name: resource_id
desc: "Id numerico de la card o dashboard en Metabase."
- name: base_url
desc: 'URL base de la instancia, ej. "https://reports.autingo.es".'
- name: params
desc: "Parametros de embedding locked/enabled (dict). Default {}."
- name: exp_seconds
desc: "TTL del token en segundos desde ahora. Default 3600 (1h)."
- name: theme
desc: 'Tema opcional: "night" | "transparent" | None.'
- name: bordered
desc: "Si el iframe muestra borde. Default True."
- name: titled
desc: "Si el iframe muestra titulo. Default True."
output: 'dict con {"token": jwt str, "embed_url": url completa /embed/<type>/<token>#opciones, "exp": unix int}.'
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from infra import sign_metabase_embed_jwt
# El secret lo pasa el caller (p.ej. desde pass: metabase/aurgi-embed-secret)
result = sign_metabase_embed_jwt(
secret=os.environ["MB_EMBED_SECRET"],
resource_type="question",
resource_id=8048,
base_url="https://reports.autingo.es",
)
print(result["embed_url"])
# https://reports.autingo.es/embed/question/<jwt>#bordered=true&titled=true
print(result["exp"]) # unix timestamp de expiracion
```
## Cuando usarla
Cuando necesites embeder una card/dashboard de Metabase en un iframe sin exponerla publicamente: firma server-side en cada carga, nunca hardcodees el token.
## Gotchas
- El `secret` NUNCA va al cliente — la firma se hace server-side y solo viaja el token al iframe.
- El token expira (`exp_seconds`, default 1h). Refirma en cada carga; no caches el token.
- La card/dashboard necesita `enable_embedding=true` en Metabase, o el endpoint de embed devuelve error.
- Los `params` deben estar registrados en `embedding_params` como `locked` o `enabled` en Metabase, o se ignoran silenciosamente.
- Verificado contra reports.autingo.es: el endpoint `/api/embed/card/<token>/query` devuelve 202 + filas con token valido, 400 con token manipulado.
@@ -0,0 +1,69 @@
"""Firma un JWT de static-embedding de Metabase y construye la URL del iframe."""
import time
import jwt
def sign_metabase_embed_jwt(
secret: str,
resource_type: str,
resource_id: int,
base_url: str,
params: dict | None = None,
exp_seconds: int = 3600,
theme: str | None = None,
bordered: bool = True,
titled: bool = True,
) -> dict:
"""Firma un JWT de static-embedding de Metabase (HS256) y arma la URL del iframe.
Args:
secret: secret de embedding de Metabase (Settings > Embedding). NUNCA va al cliente.
resource_type: "question" o "dashboard".
resource_id: id numerico de la card/dashboard en Metabase.
base_url: URL base de la instancia, ej. "https://reports.autingo.es".
params: parametros de embedding locked/enabled. Default {}.
exp_seconds: TTL del token en segundos desde ahora. Default 3600 (1h).
theme: tema opcional ("night" | "transparent" | None).
bordered: si el iframe muestra borde. Default True.
titled: si el iframe muestra titulo. Default True.
Returns:
dict con {"token": <jwt str>, "embed_url": <url completa>, "exp": <unix int>}.
Raises:
ValueError: si resource_type no es "question" ni "dashboard".
"""
if resource_type not in ("question", "dashboard"):
raise ValueError(
f'resource_type debe ser "question" o "dashboard", recibido: {resource_type!r}'
)
exp = int(time.time()) + exp_seconds
payload = {
"resource": {resource_type: resource_id},
"params": params or {},
"exp": exp,
}
token = jwt.encode(payload, secret, algorithm="HS256")
options = [f"bordered={str(bordered).lower()}", f"titled={str(titled).lower()}"]
if theme is not None:
options.append(f"theme={theme}")
fragment = "&".join(options)
embed_url = f"{base_url.rstrip('/')}/embed/{resource_type}/{token}#{fragment}"
return {"token": token, "embed_url": embed_url, "exp": exp}
if __name__ == "__main__":
result = sign_metabase_embed_jwt(
secret="0" * 64,
resource_type="question",
resource_id=8048,
base_url="https://reports.autingo.es",
)
print(result["embed_url"])
print(f"exp={result['exp']}")
@@ -0,0 +1,42 @@
"""Tests para sign_metabase_embed_jwt."""
import jwt
import pytest
from sign_metabase_embed_jwt import sign_metabase_embed_jwt
def test_firma_y_decodifica_round_trip():
secret = "test-secret-1234567890"
result = sign_metabase_embed_jwt(
secret=secret,
resource_type="question",
resource_id=8048,
base_url="https://reports.autingo.es",
params={"category": "shoes"},
)
decoded = jwt.decode(result["token"], secret, algorithms=["HS256"])
assert decoded["resource"] == {"question": 8048}
assert decoded["params"] == {"category": "shoes"}
assert decoded["exp"] == result["exp"]
def test_resource_type_invalido_lanza_valueerror():
with pytest.raises(ValueError):
sign_metabase_embed_jwt(
secret="s",
resource_type="bogus",
resource_id=1,
base_url="https://reports.autingo.es",
)
def test_embed_url_dashboard():
result = sign_metabase_embed_jwt(
secret="s",
resource_type="dashboard",
resource_id=42,
base_url="https://reports.autingo.es",
)
assert "/embed/dashboard/" in result["embed_url"]
assert result["embed_url"].endswith("#bordered=true&titled=true")
@@ -0,0 +1,76 @@
---
name: sync_xlsx_to_dq_checks
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def sync_xlsx_to_dq_checks(xlsx_path: str, project: str = 'autingo-159109', dataset: str = 'data_quality', table: str = 'dq_checks', sheet_name: str = 'catalogo', validate_sql: bool = True) -> dict"
description: "Sincroniza un catalogo de data quality checks desde un archivo Excel (.xlsx) hacia una tabla de BigQuery. Lee la hoja con openpyxl localizando columnas por cabecera, valida/normaliza cada fila (tipo fiabilidad|negocio, severity info|warning|critical, threshold int>=0, enabled booleano flexible), auto-deriva check_id unico cuando viene vacio, hace dry-run de cada assert_sql contra BigQuery para cazar SQL invalido, y carga las filas validas con WRITE_TRUNCATE (reemplazo total: el Excel es la fuente de verdad). Devuelve resumen con loaded/skipped/sql_errors/rows."
tags: [bigquery, data-quality, excel, sync]
uses_functions: [bq_auth_py_infra, bq_query_py_infra, bq_load_from_file_py_infra]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["openpyxl", "csv", "tempfile", "re", "os", "datetime"]
tested: true
tests: ["test_fila_valida_minima_se_carga", "test_check_id_se_autoderiva_cuando_vacio", "test_check_id_explicito_se_respeta", "test_check_id_colision_se_sufija", "test_tipo_invalido_se_skipea", "test_tipo_case_insensitive_se_normaliza", "test_pregunta_vacia_se_skipea", "test_assert_sql_vacia_se_skipea", "test_severity_invalida_se_skipea", "test_threshold_negativo_se_skipea", "test_threshold_no_numerico_se_skipea", "test_fila_totalmente_vacia_se_ignora_sin_skip", "test_parse_bool_variantes", "test_parse_threshold_defaults_y_floats_enteros", "test_slug_colapsa_y_recorta", "test_assert_sql_con_comas_y_comillas_se_conserva"]
test_file_path: "python/functions/infra/sync_xlsx_to_dq_checks_test.py"
file_path: "python/functions/infra/sync_xlsx_to_dq_checks.py"
params:
- name: xlsx_path
desc: "Ruta absoluta al archivo .xlsx del catalogo de asserts (fila 1 = cabeceras)."
- name: project
desc: "Proyecto GCP destino. Default 'autingo-159109'."
- name: dataset
desc: "Dataset BigQuery destino. Default 'data_quality'."
- name: table
desc: "Tabla BigQuery destino. Default 'dq_checks'."
- name: sheet_name
desc: "Nombre de la hoja del libro a leer. Default 'catalogo'."
- name: validate_sql
desc: "Si True, hace un dry-run de cada assert_sql contra BigQuery antes de cargarlo; las filas con SQL invalido NO se cargan y van a sql_errors. Default True."
output: 'dict con {"loaded": int filas cargadas, "skipped": int filas descartadas por validacion, "sql_errors": [{"check_id": str, "error": str}] filas con assert_sql invalido, "rows": [dict] filas cargadas con sus 11 columnas normalizadas}.'
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from infra.sync_xlsx_to_dq_checks import sync_xlsx_to_dq_checks
# El xlsx es la fuente de verdad del catalogo de checks.
result = sync_xlsx_to_dq_checks(
xlsx_path="/mnt/c/Users/egutierrez/Documents/dq_catalogo_asserts.xlsx",
project="autingo-159109",
dataset="data_quality",
table="dq_checks",
sheet_name="catalogo",
validate_sql=True,
)
print(f"cargadas={result['loaded']} descartadas={result['skipped']}")
for err in result["sql_errors"]:
print(f"SQL invalido en {err['check_id']}: {err['error']}")
```
O directamente con `fn run` (lee el path por defecto si no se pasa argumento):
```bash
./fn run sync_xlsx_to_dq_checks /mnt/c/Users/egutierrez/Documents/dq_catalogo_asserts.xlsx
```
## Cuando usarla
Cuando edites el Excel del catalogo de asserts (anadir/quitar/modificar checks) y quieras propagar esos cambios a la tabla `dq_checks` de BigQuery, antes de la corrida horaria que ejecuta los asserts. Es el paso de sincronizacion que convierte el Excel humano en la tabla que consume el motor de data quality.
## Gotchas
- Impura: requiere ADC de gcloud configurado (`gcloud auth application-default login`) con acceso de escritura a BigQuery sobre el proyecto/dataset destino.
- `WRITE_TRUNCATE` borra y reemplaza la tabla `dq_checks` ENTERA en cada corrida. El Excel es la unica fuente de verdad: lo que no este en el Excel desaparece de BigQuery. No es un upsert.
- El dry-run (`validate_sql=True`) consume cuota minima pero hace un round-trip a BigQuery por cada fila — con catalogos grandes la corrida tarda proporcionalmente. Pasa `validate_sql=False` para saltarlo si ya validaste el SQL por otra via.
- `assert_sql` con comas, comillas o saltos de linea se escribe al CSV temporal con `csv.QUOTE_ALL`, de modo que el CSV que carga BigQuery siempre es valido. El archivo temporal se borra al terminar (tambien en caso de error).
- Las columnas se localizan por nombre de cabecera (fila 1), no por posicion: puedes reordenar columnas en el Excel sin romper el sync, pero las cabeceras deben llamarse exactamente `check_id, tipo, subcategoria, pregunta, assert_sql, threshold, severity, enabled, owner`.
- `created_at`/`updated_at` se generan en el momento del sync (mismo timestamp UTC para todas las filas de la corrida) — no se leen del Excel.
- Las filas que no pasan validacion (tipo/pregunta/assert_sql/severity/threshold) NO se cargan y se cuentan en `skipped`; las filas totalmente vacias se ignoran sin contarse.
@@ -0,0 +1,331 @@
"""Sincroniza un catalogo de data quality checks desde un .xlsx a BigQuery.
El Excel es la fuente de verdad: cada corrida reemplaza la tabla destino entera
(WRITE_TRUNCATE). Antes de cargar, valida cada fila y (opcionalmente) hace un
dry-run del assert_sql contra BigQuery para cazar SQL invalido en el momento.
"""
import csv
import os
import re
import tempfile
from datetime import datetime, timezone
# Columnas del schema destino, en orden. El CSV temporal y el load respetan
# exactamente este orden.
SCHEMA_COLUMNS = [
"check_id",
"tipo",
"subcategoria",
"pregunta",
"assert_sql",
"threshold",
"severity",
"enabled",
"owner",
"created_at",
"updated_at",
]
# Cabeceras que se leen del Excel (sin created_at/updated_at, que se generan).
INPUT_HEADERS = [
"check_id",
"tipo",
"subcategoria",
"pregunta",
"assert_sql",
"threshold",
"severity",
"enabled",
"owner",
]
VALID_TIPOS = {"fiabilidad", "negocio"}
VALID_SEVERITIES = {"info", "warning", "critical"}
TRUE_TOKENS = {"true", "1", "si", "", "x", "yes", "verdadero"}
FALSE_TOKENS = {"false", "0", "no", "", "falso"}
def _slug(text: str) -> str:
"""Pasa un texto a slug: minusculas, no-alfanumericos -> _, colapsa y recorta."""
s = (text or "").strip().lower()
s = re.sub(r"[^a-z0-9]+", "_", s)
s = re.sub(r"_+", "_", s)
return s.strip("_")
def _norm(value) -> str:
"""Normaliza una celda a string strip-eado. None -> ''."""
if value is None:
return ""
return str(value).strip()
def _parse_bool(value) -> bool:
"""Interpreta TRUE/1/si/sí/x como True; FALSE/0/vacio como False. Default True."""
token = _norm(value).lower()
if token in TRUE_TOKENS:
return True
if token in FALSE_TOKENS:
return False
# Valor desconocido no vacio: tratar como True (la celda trae algo afirmativo).
return True
def _parse_threshold(value):
"""Devuelve (int>=0, None) si valido, o (None, motivo) si no.
Vacio/None -> 0. Acepta enteros y floats que sean enteros.
"""
token = _norm(value)
if token == "":
return 0, None
try:
num = float(token.replace(",", "."))
except (ValueError, TypeError):
return None, f"threshold no es numerico: {token!r}"
if num != int(num):
return None, f"threshold debe ser entero, recibido: {token!r}"
num = int(num)
if num < 0:
return None, f"threshold debe ser >= 0, recibido: {num}"
return num, None
def _parse_catalog_rows(rows: list[dict]) -> tuple[list[dict], list[dict]]:
"""Parsea, valida y normaliza filas crudas del catalogo. Funcion pura.
Args:
rows: lista de dicts cabecera->valor crudo, una por fila del Excel.
Returns:
(valid, skipped) donde:
- valid: lista de dicts con las 11 columnas del schema destino,
check_id unico garantizado, created_at/updated_at en ISO 8601 UTC.
- skipped: lista de dicts {"row": dict_original, "reason": str}.
"""
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
valid: list[dict] = []
skipped: list[dict] = []
seen_ids: set[str] = set()
for raw in rows:
# Ignora filas totalmente vacias.
if all(_norm(raw.get(h)) == "" for h in INPUT_HEADERS):
continue
tipo = _norm(raw.get("tipo")).lower()
subcategoria = _norm(raw.get("subcategoria"))
pregunta = _norm(raw.get("pregunta"))
assert_sql = _norm(raw.get("assert_sql"))
severity = _norm(raw.get("severity")).lower()
owner = _norm(raw.get("owner"))
check_id = _norm(raw.get("check_id"))
# Validaciones.
if tipo not in VALID_TIPOS:
skipped.append({"row": raw, "reason": f"tipo invalido: {tipo!r} (esperado fiabilidad|negocio)"})
continue
if pregunta == "":
skipped.append({"row": raw, "reason": "pregunta vacia"})
continue
if assert_sql == "":
skipped.append({"row": raw, "reason": "assert_sql vacia"})
continue
if severity == "":
severity = "warning"
elif severity not in VALID_SEVERITIES:
skipped.append({"row": raw, "reason": f"severity invalida: {severity!r} (esperado info|warning|critical)"})
continue
threshold, threshold_err = _parse_threshold(raw.get("threshold"))
if threshold_err is not None:
skipped.append({"row": raw, "reason": threshold_err})
continue
enabled = _parse_bool(raw.get("enabled"))
# Auto-deriva check_id si viene vacio.
if check_id == "":
check_id = f"{tipo}__{_slug(subcategoria)}_{_slug(pregunta)[:40]}"
# Garantiza unicidad sufijando _2, _3, ...
base_id = check_id
suffix = 2
while check_id in seen_ids:
check_id = f"{base_id}_{suffix}"
suffix += 1
seen_ids.add(check_id)
valid.append({
"check_id": check_id,
"tipo": tipo,
"subcategoria": subcategoria,
"pregunta": pregunta,
"assert_sql": assert_sql,
"threshold": threshold,
"severity": severity,
"enabled": enabled,
"owner": owner,
"created_at": now_iso,
"updated_at": now_iso,
})
return valid, skipped
def _read_xlsx_rows(xlsx_path: str, sheet_name: str) -> list[dict]:
"""Lee la hoja indicada del .xlsx y devuelve filas como dicts cabecera->valor.
La fila 1 son las cabeceras. Localiza las columnas por nombre de cabecera,
no por posicion. Devuelve solo las cabeceras conocidas (INPUT_HEADERS).
"""
from openpyxl import load_workbook
wb = load_workbook(filename=xlsx_path, read_only=True, data_only=True)
if sheet_name not in wb.sheetnames:
wb.close()
raise ValueError(
f"hoja {sheet_name!r} no encontrada en {xlsx_path}. Hojas: {wb.sheetnames}"
)
ws = wb[sheet_name]
rows_iter = ws.iter_rows(values_only=True)
try:
header_row = next(rows_iter)
except StopIteration:
wb.close()
return []
# Mapa cabecera-normalizada -> indice de columna.
header_index: dict[str, int] = {}
for idx, cell in enumerate(header_row):
name = _norm(cell).lower()
if name in INPUT_HEADERS:
header_index[name] = idx
rows: list[dict] = []
for excel_row in rows_iter:
row_dict = {}
for header in INPUT_HEADERS:
idx = header_index.get(header)
if idx is not None and idx < len(excel_row):
row_dict[header] = excel_row[idx]
else:
row_dict[header] = None
rows.append(row_dict)
wb.close()
return rows
def sync_xlsx_to_dq_checks(
xlsx_path: str,
project: str = "autingo-159109",
dataset: str = "data_quality",
table: str = "dq_checks",
sheet_name: str = "catalogo",
validate_sql: bool = True,
) -> dict:
"""Sincroniza un catalogo de data quality checks desde un .xlsx a BigQuery.
Lee la hoja del Excel, valida/normaliza cada fila, (opcionalmente) hace un
dry-run de cada assert_sql contra BigQuery, y carga las filas validas a
project.dataset.table con WRITE_TRUNCATE (reemplazo total: el xlsx manda).
Args:
xlsx_path: ruta al archivo .xlsx del catalogo.
project: proyecto GCP destino. Default "autingo-159109".
dataset: dataset BigQuery destino. Default "data_quality".
table: tabla BigQuery destino. Default "dq_checks".
sheet_name: nombre de la hoja a leer. Default "catalogo".
validate_sql: si True, dry-run de cada assert_sql contra BQ antes de cargar.
Returns:
dict con {"loaded": int, "skipped": int, "sql_errors": [...], "rows": [...]}
donde rows es la lista de filas cargadas (con sus campos normalizados).
Raises:
FileNotFoundError: si xlsx_path no existe.
ValueError: si la hoja no existe en el libro.
google.api_core.exceptions.GoogleAPICallError: si la carga a BQ falla.
"""
from bigquery import bq_auth, bq_query, bq_load_from_file
if not os.path.exists(xlsx_path):
raise FileNotFoundError(f"no existe el archivo xlsx: {xlsx_path}")
raw_rows = _read_xlsx_rows(xlsx_path, sheet_name)
valid, skipped = _parse_catalog_rows(raw_rows)
client = bq_auth(project_id=project)
sql_errors: list[dict] = []
loadable: list[dict] = []
if validate_sql:
for row in valid:
try:
bq_query(client, row["assert_sql"], dry_run=True)
loadable.append(row)
except Exception as exc: # noqa: BLE001 — capturamos cualquier fallo de BQ
sql_errors.append({"check_id": row["check_id"], "error": str(exc)})
else:
loadable = valid
# Escribe las filas cargables a un CSV temporal con QUOTE_ALL: el assert_sql
# puede contener comas, comillas y saltos de linea.
tmp_fd, tmp_path = tempfile.mkstemp(suffix=".csv", prefix="dq_checks_")
os.close(tmp_fd)
try:
with open(tmp_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f, quoting=csv.QUOTE_ALL)
writer.writerow(SCHEMA_COLUMNS)
for row in loadable:
writer.writerow([
row["check_id"],
row["tipo"],
row["subcategoria"],
row["pregunta"],
row["assert_sql"],
row["threshold"],
row["severity"],
# BigQuery CSV BOOL acepta true/false en minusculas.
"true" if row["enabled"] else "false",
row["owner"],
row["created_at"],
row["updated_at"],
])
bq_load_from_file(
client,
tmp_path,
dataset,
table,
source_format="CSV",
write_disposition="WRITE_TRUNCATE",
skip_leading_rows=1,
autodetect=False,
)
finally:
if os.path.exists(tmp_path):
os.remove(tmp_path)
return {
"loaded": len(loadable),
"skipped": len(skipped),
"sql_errors": sql_errors,
"rows": loadable,
}
if __name__ == "__main__":
import json
import sys
path = sys.argv[1] if len(sys.argv) > 1 else "/mnt/c/Users/egutierrez/Documents/dq_catalogo_asserts.xlsx"
result = sync_xlsx_to_dq_checks(path)
print(json.dumps({k: v for k, v in result.items() if k != "rows"}, ensure_ascii=False, indent=2))
print(f"loaded={result['loaded']} skipped={result['skipped']} sql_errors={len(result['sql_errors'])}")
@@ -0,0 +1,171 @@
"""Tests para sync_xlsx_to_dq_checks (logica pura: parseo/validacion/derivacion).
No tocan BigQuery: la autenticacion, el dry-run y la carga quedan fuera del
test unitario. Solo se ejercita _parse_catalog_rows y sus helpers.
"""
from sync_xlsx_to_dq_checks import _parse_catalog_rows, _slug, _parse_bool, _parse_threshold
def _row(**kwargs) -> dict:
base = {
"check_id": "",
"tipo": "",
"subcategoria": "",
"pregunta": "",
"assert_sql": "",
"threshold": "",
"severity": "",
"enabled": "",
"owner": "",
}
base.update(kwargs)
return base
def test_fila_valida_minima_se_carga():
valid, skipped = _parse_catalog_rows([
_row(tipo="fiabilidad", pregunta=" hay nulos? ", assert_sql="SELECT 1", enabled="TRUE"),
])
assert len(valid) == 1
assert len(skipped) == 0
r = valid[0]
assert r["tipo"] == "fiabilidad"
assert r["pregunta"] == "hay nulos?"
assert r["severity"] == "warning" # default
assert r["threshold"] == 0 # default
assert r["enabled"] is True
assert r["owner"] == ""
assert r["created_at"] == r["updated_at"]
assert r["created_at"].endswith("Z")
def test_check_id_se_autoderiva_cuando_vacio():
valid, _ = _parse_catalog_rows([
_row(tipo="negocio", subcategoria="Ventas Diarias",
pregunta="¿La venta es positiva?", assert_sql="SELECT 1"),
])
cid = valid[0]["check_id"]
assert cid.startswith("negocio__ventas_diarias_")
assert "la_venta_es_positiva" in cid
def test_check_id_explicito_se_respeta():
valid, _ = _parse_catalog_rows([
_row(check_id="custom_id_1", tipo="fiabilidad",
pregunta="x", assert_sql="SELECT 1"),
])
assert valid[0]["check_id"] == "custom_id_1"
def test_check_id_colision_se_sufija():
valid, _ = _parse_catalog_rows([
_row(check_id="dup", tipo="fiabilidad", pregunta="a", assert_sql="SELECT 1"),
_row(check_id="dup", tipo="negocio", pregunta="b", assert_sql="SELECT 2"),
_row(check_id="dup", tipo="negocio", pregunta="c", assert_sql="SELECT 3"),
])
ids = [r["check_id"] for r in valid]
assert ids == ["dup", "dup_2", "dup_3"]
def test_tipo_invalido_se_skipea():
valid, skipped = _parse_catalog_rows([
_row(tipo="otracosa", pregunta="x", assert_sql="SELECT 1"),
])
assert valid == []
assert len(skipped) == 1
assert "tipo invalido" in skipped[0]["reason"]
def test_tipo_case_insensitive_se_normaliza():
valid, skipped = _parse_catalog_rows([
_row(tipo=" Fiabilidad ", pregunta="x", assert_sql="SELECT 1"),
])
assert len(valid) == 1
assert skipped == []
assert valid[0]["tipo"] == "fiabilidad"
def test_pregunta_vacia_se_skipea():
valid, skipped = _parse_catalog_rows([
_row(tipo="negocio", pregunta=" ", assert_sql="SELECT 1"),
])
assert valid == []
assert skipped[0]["reason"] == "pregunta vacia"
def test_assert_sql_vacia_se_skipea():
valid, skipped = _parse_catalog_rows([
_row(tipo="negocio", pregunta="x", assert_sql=""),
])
assert valid == []
assert skipped[0]["reason"] == "assert_sql vacia"
def test_severity_invalida_se_skipea():
valid, skipped = _parse_catalog_rows([
_row(tipo="negocio", pregunta="x", assert_sql="SELECT 1", severity="urgente"),
])
assert valid == []
assert "severity invalida" in skipped[0]["reason"]
def test_threshold_negativo_se_skipea():
valid, skipped = _parse_catalog_rows([
_row(tipo="negocio", pregunta="x", assert_sql="SELECT 1", threshold="-5"),
])
assert valid == []
assert "threshold debe ser >= 0" in skipped[0]["reason"]
def test_threshold_no_numerico_se_skipea():
valid, skipped = _parse_catalog_rows([
_row(tipo="negocio", pregunta="x", assert_sql="SELECT 1", threshold="abc"),
])
assert valid == []
assert "threshold no es numerico" in skipped[0]["reason"]
def test_fila_totalmente_vacia_se_ignora_sin_skip():
valid, skipped = _parse_catalog_rows([
_row(), # todo vacio
_row(tipo="negocio", pregunta="x", assert_sql="SELECT 1"),
])
assert len(valid) == 1
assert skipped == [] # la vacia se ignora, no se cuenta como skipped
def test_parse_bool_variantes():
assert _parse_bool("TRUE") is True
assert _parse_bool("True") is True
assert _parse_bool(1) is True
assert _parse_bool("si") is True
assert _parse_bool("") is True
assert _parse_bool("x") is True
assert _parse_bool("FALSE") is False
assert _parse_bool("0") is False
assert _parse_bool("") is False
assert _parse_bool(None) is False
def test_parse_threshold_defaults_y_floats_enteros():
assert _parse_threshold("") == (0, None)
assert _parse_threshold(None) == (0, None)
assert _parse_threshold("3") == (3, None)
assert _parse_threshold("3.0") == (3, None)
val, err = _parse_threshold("3.5")
assert val is None and "entero" in err
def test_slug_colapsa_y_recorta():
assert _slug(" Hola, Mundo!! ") == "hola_mundo"
assert _slug("A---B___C") == "a_b_c"
assert _slug("") == ""
def test_assert_sql_con_comas_y_comillas_se_conserva():
sql = 'SELECT COUNT(*) FROM t WHERE col IN ("a", "b"), foo = 1'
valid, _ = _parse_catalog_rows([
_row(tipo="fiabilidad", pregunta="x", assert_sql=sql),
])
assert valid[0]["assert_sql"] == sql