diff --git a/.mcp.json b/.mcp.json index 2e68d263..d24f676e 100644 --- a/.mcp.json +++ b/.mcp.json @@ -6,7 +6,7 @@ }, "jupyter": { "command": "bash", - "args": ["/home/enmanuel/fn_registry/bash/functions/infra/jupyter_mcp_serve.sh"] + "args": ["-c", "exec bash \"$(git rev-parse --show-toplevel)/bash/functions/infra/jupyter_mcp_serve.sh\""] } } } diff --git a/cpp/apps/chart_demo b/cpp/apps/chart_demo new file mode 160000 index 00000000..026f514b --- /dev/null +++ b/cpp/apps/chart_demo @@ -0,0 +1 @@ +Subproject commit 026f514bb72d5fb00c57493ea09101a3d2cf4894 diff --git a/cpp/apps/shaders_lab b/cpp/apps/shaders_lab new file mode 160000 index 00000000..dc9a970a --- /dev/null +++ b/cpp/apps/shaders_lab @@ -0,0 +1 @@ +Subproject commit dc9a970aff759837bfe12c5e640a66dc64edfbd2 diff --git a/python/functions/infra/sign_metabase_embed_jwt.md b/python/functions/infra/sign_metabase_embed_jwt.md new file mode 100644 index 00000000..01ff63c4 --- /dev/null +++ b/python/functions/infra/sign_metabase_embed_jwt.md @@ -0,0 +1,72 @@ +--- +name: sign_metabase_embed_jwt +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def sign_metabase_embed_jwt(secret: str, resource_type: str, resource_id: int, base_url: str, params: dict | None = None, exp_seconds: int = 3600, theme: str | None = None, bordered: bool = True, titled: bool = True) -> dict" +description: "Firma un JWT de static-embedding de Metabase (HS256 con PyJWT) y construye la URL del iframe (/embed//#opciones). Soporta question y dashboard, params locked/enabled, TTL configurable y tema opcional." +tags: [metabase, embed, jwt] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: ["jwt", "time"] +tested: true +tests: ["test_firma_y_decodifica_round_trip", "test_resource_type_invalido_lanza_valueerror", "test_embed_url_dashboard"] +test_file_path: "python/functions/infra/sign_metabase_embed_jwt_test.py" +file_path: "python/functions/infra/sign_metabase_embed_jwt.py" +params: + - name: secret + desc: "Secret de embedding de Metabase (Settings > Embedding). NUNCA va al cliente." + - name: resource_type + desc: '"question" o "dashboard" — tipo del recurso a embeber.' + - name: resource_id + desc: "Id numerico de la card o dashboard en Metabase." + - name: base_url + desc: 'URL base de la instancia, ej. "https://reports.autingo.es".' + - name: params + desc: "Parametros de embedding locked/enabled (dict). Default {}." + - name: exp_seconds + desc: "TTL del token en segundos desde ahora. Default 3600 (1h)." + - name: theme + desc: 'Tema opcional: "night" | "transparent" | None.' + - name: bordered + desc: "Si el iframe muestra borde. Default True." + - name: titled + desc: "Si el iframe muestra titulo. Default True." +output: 'dict con {"token": jwt str, "embed_url": url completa /embed//#opciones, "exp": unix int}.' +--- + +## Ejemplo + +```python +import sys, os +sys.path.insert(0, os.path.join("python", "functions")) +from infra import sign_metabase_embed_jwt + +# El secret lo pasa el caller (p.ej. desde pass: metabase/aurgi-embed-secret) +result = sign_metabase_embed_jwt( + secret=os.environ["MB_EMBED_SECRET"], + resource_type="question", + resource_id=8048, + base_url="https://reports.autingo.es", +) +print(result["embed_url"]) +# https://reports.autingo.es/embed/question/#bordered=true&titled=true +print(result["exp"]) # unix timestamp de expiracion +``` + +## Cuando usarla + +Cuando necesites embeder una card/dashboard de Metabase en un iframe sin exponerla publicamente: firma server-side en cada carga, nunca hardcodees el token. + +## Gotchas + +- El `secret` NUNCA va al cliente — la firma se hace server-side y solo viaja el token al iframe. +- El token expira (`exp_seconds`, default 1h). Refirma en cada carga; no caches el token. +- La card/dashboard necesita `enable_embedding=true` en Metabase, o el endpoint de embed devuelve error. +- Los `params` deben estar registrados en `embedding_params` como `locked` o `enabled` en Metabase, o se ignoran silenciosamente. +- Verificado contra reports.autingo.es: el endpoint `/api/embed/card//query` devuelve 202 + filas con token valido, 400 con token manipulado. diff --git a/python/functions/infra/sign_metabase_embed_jwt.py b/python/functions/infra/sign_metabase_embed_jwt.py new file mode 100644 index 00000000..3e1fca48 --- /dev/null +++ b/python/functions/infra/sign_metabase_embed_jwt.py @@ -0,0 +1,69 @@ +"""Firma un JWT de static-embedding de Metabase y construye la URL del iframe.""" + +import time + +import jwt + + +def sign_metabase_embed_jwt( + secret: str, + resource_type: str, + resource_id: int, + base_url: str, + params: dict | None = None, + exp_seconds: int = 3600, + theme: str | None = None, + bordered: bool = True, + titled: bool = True, +) -> dict: + """Firma un JWT de static-embedding de Metabase (HS256) y arma la URL del iframe. + + Args: + secret: secret de embedding de Metabase (Settings > Embedding). NUNCA va al cliente. + resource_type: "question" o "dashboard". + resource_id: id numerico de la card/dashboard en Metabase. + base_url: URL base de la instancia, ej. "https://reports.autingo.es". + params: parametros de embedding locked/enabled. Default {}. + exp_seconds: TTL del token en segundos desde ahora. Default 3600 (1h). + theme: tema opcional ("night" | "transparent" | None). + bordered: si el iframe muestra borde. Default True. + titled: si el iframe muestra titulo. Default True. + + Returns: + dict con {"token": , "embed_url": , "exp": }. + + Raises: + ValueError: si resource_type no es "question" ni "dashboard". + """ + if resource_type not in ("question", "dashboard"): + raise ValueError( + f'resource_type debe ser "question" o "dashboard", recibido: {resource_type!r}' + ) + + exp = int(time.time()) + exp_seconds + payload = { + "resource": {resource_type: resource_id}, + "params": params or {}, + "exp": exp, + } + token = jwt.encode(payload, secret, algorithm="HS256") + + options = [f"bordered={str(bordered).lower()}", f"titled={str(titled).lower()}"] + if theme is not None: + options.append(f"theme={theme}") + fragment = "&".join(options) + + embed_url = f"{base_url.rstrip('/')}/embed/{resource_type}/{token}#{fragment}" + + return {"token": token, "embed_url": embed_url, "exp": exp} + + +if __name__ == "__main__": + result = sign_metabase_embed_jwt( + secret="0" * 64, + resource_type="question", + resource_id=8048, + base_url="https://reports.autingo.es", + ) + print(result["embed_url"]) + print(f"exp={result['exp']}") diff --git a/python/functions/infra/sign_metabase_embed_jwt_test.py b/python/functions/infra/sign_metabase_embed_jwt_test.py new file mode 100644 index 00000000..622cc25d --- /dev/null +++ b/python/functions/infra/sign_metabase_embed_jwt_test.py @@ -0,0 +1,42 @@ +"""Tests para sign_metabase_embed_jwt.""" + +import jwt +import pytest + +from sign_metabase_embed_jwt import sign_metabase_embed_jwt + + +def test_firma_y_decodifica_round_trip(): + secret = "test-secret-1234567890" + result = sign_metabase_embed_jwt( + secret=secret, + resource_type="question", + resource_id=8048, + base_url="https://reports.autingo.es", + params={"category": "shoes"}, + ) + decoded = jwt.decode(result["token"], secret, algorithms=["HS256"]) + assert decoded["resource"] == {"question": 8048} + assert decoded["params"] == {"category": "shoes"} + assert decoded["exp"] == result["exp"] + + +def test_resource_type_invalido_lanza_valueerror(): + with pytest.raises(ValueError): + sign_metabase_embed_jwt( + secret="s", + resource_type="bogus", + resource_id=1, + base_url="https://reports.autingo.es", + ) + + +def test_embed_url_dashboard(): + result = sign_metabase_embed_jwt( + secret="s", + resource_type="dashboard", + resource_id=42, + base_url="https://reports.autingo.es", + ) + assert "/embed/dashboard/" in result["embed_url"] + assert result["embed_url"].endswith("#bordered=true&titled=true") diff --git a/python/functions/infra/sync_xlsx_to_dq_checks.md b/python/functions/infra/sync_xlsx_to_dq_checks.md new file mode 100644 index 00000000..22fd0b7c --- /dev/null +++ b/python/functions/infra/sync_xlsx_to_dq_checks.md @@ -0,0 +1,76 @@ +--- +name: sync_xlsx_to_dq_checks +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def sync_xlsx_to_dq_checks(xlsx_path: str, project: str = 'autingo-159109', dataset: str = 'data_quality', table: str = 'dq_checks', sheet_name: str = 'catalogo', validate_sql: bool = True) -> dict" +description: "Sincroniza un catalogo de data quality checks desde un archivo Excel (.xlsx) hacia una tabla de BigQuery. Lee la hoja con openpyxl localizando columnas por cabecera, valida/normaliza cada fila (tipo fiabilidad|negocio, severity info|warning|critical, threshold int>=0, enabled booleano flexible), auto-deriva check_id unico cuando viene vacio, hace dry-run de cada assert_sql contra BigQuery para cazar SQL invalido, y carga las filas validas con WRITE_TRUNCATE (reemplazo total: el Excel es la fuente de verdad). Devuelve resumen con loaded/skipped/sql_errors/rows." +tags: [bigquery, data-quality, excel, sync] +uses_functions: [bq_auth_py_infra, bq_query_py_infra, bq_load_from_file_py_infra] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: ["openpyxl", "csv", "tempfile", "re", "os", "datetime"] +tested: true +tests: ["test_fila_valida_minima_se_carga", "test_check_id_se_autoderiva_cuando_vacio", "test_check_id_explicito_se_respeta", "test_check_id_colision_se_sufija", "test_tipo_invalido_se_skipea", "test_tipo_case_insensitive_se_normaliza", "test_pregunta_vacia_se_skipea", "test_assert_sql_vacia_se_skipea", "test_severity_invalida_se_skipea", "test_threshold_negativo_se_skipea", "test_threshold_no_numerico_se_skipea", "test_fila_totalmente_vacia_se_ignora_sin_skip", "test_parse_bool_variantes", "test_parse_threshold_defaults_y_floats_enteros", "test_slug_colapsa_y_recorta", "test_assert_sql_con_comas_y_comillas_se_conserva"] +test_file_path: "python/functions/infra/sync_xlsx_to_dq_checks_test.py" +file_path: "python/functions/infra/sync_xlsx_to_dq_checks.py" +params: + - name: xlsx_path + desc: "Ruta absoluta al archivo .xlsx del catalogo de asserts (fila 1 = cabeceras)." + - name: project + desc: "Proyecto GCP destino. Default 'autingo-159109'." + - name: dataset + desc: "Dataset BigQuery destino. Default 'data_quality'." + - name: table + desc: "Tabla BigQuery destino. Default 'dq_checks'." + - name: sheet_name + desc: "Nombre de la hoja del libro a leer. Default 'catalogo'." + - name: validate_sql + desc: "Si True, hace un dry-run de cada assert_sql contra BigQuery antes de cargarlo; las filas con SQL invalido NO se cargan y van a sql_errors. Default True." +output: 'dict con {"loaded": int filas cargadas, "skipped": int filas descartadas por validacion, "sql_errors": [{"check_id": str, "error": str}] filas con assert_sql invalido, "rows": [dict] filas cargadas con sus 11 columnas normalizadas}.' +--- + +## Ejemplo + +```python +import sys, os +sys.path.insert(0, os.path.join("python", "functions")) +from infra.sync_xlsx_to_dq_checks import sync_xlsx_to_dq_checks + +# El xlsx es la fuente de verdad del catalogo de checks. +result = sync_xlsx_to_dq_checks( + xlsx_path="/mnt/c/Users/egutierrez/Documents/dq_catalogo_asserts.xlsx", + project="autingo-159109", + dataset="data_quality", + table="dq_checks", + sheet_name="catalogo", + validate_sql=True, +) +print(f"cargadas={result['loaded']} descartadas={result['skipped']}") +for err in result["sql_errors"]: + print(f"SQL invalido en {err['check_id']}: {err['error']}") +``` + +O directamente con `fn run` (lee el path por defecto si no se pasa argumento): + +```bash +./fn run sync_xlsx_to_dq_checks /mnt/c/Users/egutierrez/Documents/dq_catalogo_asserts.xlsx +``` + +## Cuando usarla + +Cuando edites el Excel del catalogo de asserts (anadir/quitar/modificar checks) y quieras propagar esos cambios a la tabla `dq_checks` de BigQuery, antes de la corrida horaria que ejecuta los asserts. Es el paso de sincronizacion que convierte el Excel humano en la tabla que consume el motor de data quality. + +## Gotchas + +- Impura: requiere ADC de gcloud configurado (`gcloud auth application-default login`) con acceso de escritura a BigQuery sobre el proyecto/dataset destino. +- `WRITE_TRUNCATE` borra y reemplaza la tabla `dq_checks` ENTERA en cada corrida. El Excel es la unica fuente de verdad: lo que no este en el Excel desaparece de BigQuery. No es un upsert. +- El dry-run (`validate_sql=True`) consume cuota minima pero hace un round-trip a BigQuery por cada fila — con catalogos grandes la corrida tarda proporcionalmente. Pasa `validate_sql=False` para saltarlo si ya validaste el SQL por otra via. +- `assert_sql` con comas, comillas o saltos de linea se escribe al CSV temporal con `csv.QUOTE_ALL`, de modo que el CSV que carga BigQuery siempre es valido. El archivo temporal se borra al terminar (tambien en caso de error). +- Las columnas se localizan por nombre de cabecera (fila 1), no por posicion: puedes reordenar columnas en el Excel sin romper el sync, pero las cabeceras deben llamarse exactamente `check_id, tipo, subcategoria, pregunta, assert_sql, threshold, severity, enabled, owner`. +- `created_at`/`updated_at` se generan en el momento del sync (mismo timestamp UTC para todas las filas de la corrida) — no se leen del Excel. +- Las filas que no pasan validacion (tipo/pregunta/assert_sql/severity/threshold) NO se cargan y se cuentan en `skipped`; las filas totalmente vacias se ignoran sin contarse. diff --git a/python/functions/infra/sync_xlsx_to_dq_checks.py b/python/functions/infra/sync_xlsx_to_dq_checks.py new file mode 100644 index 00000000..b5fbe103 --- /dev/null +++ b/python/functions/infra/sync_xlsx_to_dq_checks.py @@ -0,0 +1,331 @@ +"""Sincroniza un catalogo de data quality checks desde un .xlsx a BigQuery. + +El Excel es la fuente de verdad: cada corrida reemplaza la tabla destino entera +(WRITE_TRUNCATE). Antes de cargar, valida cada fila y (opcionalmente) hace un +dry-run del assert_sql contra BigQuery para cazar SQL invalido en el momento. +""" + +import csv +import os +import re +import tempfile +from datetime import datetime, timezone + +# Columnas del schema destino, en orden. El CSV temporal y el load respetan +# exactamente este orden. +SCHEMA_COLUMNS = [ + "check_id", + "tipo", + "subcategoria", + "pregunta", + "assert_sql", + "threshold", + "severity", + "enabled", + "owner", + "created_at", + "updated_at", +] + +# Cabeceras que se leen del Excel (sin created_at/updated_at, que se generan). +INPUT_HEADERS = [ + "check_id", + "tipo", + "subcategoria", + "pregunta", + "assert_sql", + "threshold", + "severity", + "enabled", + "owner", +] + +VALID_TIPOS = {"fiabilidad", "negocio"} +VALID_SEVERITIES = {"info", "warning", "critical"} +TRUE_TOKENS = {"true", "1", "si", "sí", "x", "yes", "verdadero"} +FALSE_TOKENS = {"false", "0", "no", "", "falso"} + + +def _slug(text: str) -> str: + """Pasa un texto a slug: minusculas, no-alfanumericos -> _, colapsa y recorta.""" + s = (text or "").strip().lower() + s = re.sub(r"[^a-z0-9]+", "_", s) + s = re.sub(r"_+", "_", s) + return s.strip("_") + + +def _norm(value) -> str: + """Normaliza una celda a string strip-eado. None -> ''.""" + if value is None: + return "" + return str(value).strip() + + +def _parse_bool(value) -> bool: + """Interpreta TRUE/1/si/sí/x como True; FALSE/0/vacio como False. Default True.""" + token = _norm(value).lower() + if token in TRUE_TOKENS: + return True + if token in FALSE_TOKENS: + return False + # Valor desconocido no vacio: tratar como True (la celda trae algo afirmativo). + return True + + +def _parse_threshold(value): + """Devuelve (int>=0, None) si valido, o (None, motivo) si no. + + Vacio/None -> 0. Acepta enteros y floats que sean enteros. + """ + token = _norm(value) + if token == "": + return 0, None + try: + num = float(token.replace(",", ".")) + except (ValueError, TypeError): + return None, f"threshold no es numerico: {token!r}" + if num != int(num): + return None, f"threshold debe ser entero, recibido: {token!r}" + num = int(num) + if num < 0: + return None, f"threshold debe ser >= 0, recibido: {num}" + return num, None + + +def _parse_catalog_rows(rows: list[dict]) -> tuple[list[dict], list[dict]]: + """Parsea, valida y normaliza filas crudas del catalogo. Funcion pura. + + Args: + rows: lista de dicts cabecera->valor crudo, una por fila del Excel. + + Returns: + (valid, skipped) donde: + - valid: lista de dicts con las 11 columnas del schema destino, + check_id unico garantizado, created_at/updated_at en ISO 8601 UTC. + - skipped: lista de dicts {"row": dict_original, "reason": str}. + """ + now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + valid: list[dict] = [] + skipped: list[dict] = [] + seen_ids: set[str] = set() + + for raw in rows: + # Ignora filas totalmente vacias. + if all(_norm(raw.get(h)) == "" for h in INPUT_HEADERS): + continue + + tipo = _norm(raw.get("tipo")).lower() + subcategoria = _norm(raw.get("subcategoria")) + pregunta = _norm(raw.get("pregunta")) + assert_sql = _norm(raw.get("assert_sql")) + severity = _norm(raw.get("severity")).lower() + owner = _norm(raw.get("owner")) + check_id = _norm(raw.get("check_id")) + + # Validaciones. + if tipo not in VALID_TIPOS: + skipped.append({"row": raw, "reason": f"tipo invalido: {tipo!r} (esperado fiabilidad|negocio)"}) + continue + if pregunta == "": + skipped.append({"row": raw, "reason": "pregunta vacia"}) + continue + if assert_sql == "": + skipped.append({"row": raw, "reason": "assert_sql vacia"}) + continue + + if severity == "": + severity = "warning" + elif severity not in VALID_SEVERITIES: + skipped.append({"row": raw, "reason": f"severity invalida: {severity!r} (esperado info|warning|critical)"}) + continue + + threshold, threshold_err = _parse_threshold(raw.get("threshold")) + if threshold_err is not None: + skipped.append({"row": raw, "reason": threshold_err}) + continue + + enabled = _parse_bool(raw.get("enabled")) + + # Auto-deriva check_id si viene vacio. + if check_id == "": + check_id = f"{tipo}__{_slug(subcategoria)}_{_slug(pregunta)[:40]}" + + # Garantiza unicidad sufijando _2, _3, ... + base_id = check_id + suffix = 2 + while check_id in seen_ids: + check_id = f"{base_id}_{suffix}" + suffix += 1 + seen_ids.add(check_id) + + valid.append({ + "check_id": check_id, + "tipo": tipo, + "subcategoria": subcategoria, + "pregunta": pregunta, + "assert_sql": assert_sql, + "threshold": threshold, + "severity": severity, + "enabled": enabled, + "owner": owner, + "created_at": now_iso, + "updated_at": now_iso, + }) + + return valid, skipped + + +def _read_xlsx_rows(xlsx_path: str, sheet_name: str) -> list[dict]: + """Lee la hoja indicada del .xlsx y devuelve filas como dicts cabecera->valor. + + La fila 1 son las cabeceras. Localiza las columnas por nombre de cabecera, + no por posicion. Devuelve solo las cabeceras conocidas (INPUT_HEADERS). + """ + from openpyxl import load_workbook + + wb = load_workbook(filename=xlsx_path, read_only=True, data_only=True) + if sheet_name not in wb.sheetnames: + wb.close() + raise ValueError( + f"hoja {sheet_name!r} no encontrada en {xlsx_path}. Hojas: {wb.sheetnames}" + ) + ws = wb[sheet_name] + + rows_iter = ws.iter_rows(values_only=True) + try: + header_row = next(rows_iter) + except StopIteration: + wb.close() + return [] + + # Mapa cabecera-normalizada -> indice de columna. + header_index: dict[str, int] = {} + for idx, cell in enumerate(header_row): + name = _norm(cell).lower() + if name in INPUT_HEADERS: + header_index[name] = idx + + rows: list[dict] = [] + for excel_row in rows_iter: + row_dict = {} + for header in INPUT_HEADERS: + idx = header_index.get(header) + if idx is not None and idx < len(excel_row): + row_dict[header] = excel_row[idx] + else: + row_dict[header] = None + rows.append(row_dict) + + wb.close() + return rows + + +def sync_xlsx_to_dq_checks( + xlsx_path: str, + project: str = "autingo-159109", + dataset: str = "data_quality", + table: str = "dq_checks", + sheet_name: str = "catalogo", + validate_sql: bool = True, +) -> dict: + """Sincroniza un catalogo de data quality checks desde un .xlsx a BigQuery. + + Lee la hoja del Excel, valida/normaliza cada fila, (opcionalmente) hace un + dry-run de cada assert_sql contra BigQuery, y carga las filas validas a + project.dataset.table con WRITE_TRUNCATE (reemplazo total: el xlsx manda). + + Args: + xlsx_path: ruta al archivo .xlsx del catalogo. + project: proyecto GCP destino. Default "autingo-159109". + dataset: dataset BigQuery destino. Default "data_quality". + table: tabla BigQuery destino. Default "dq_checks". + sheet_name: nombre de la hoja a leer. Default "catalogo". + validate_sql: si True, dry-run de cada assert_sql contra BQ antes de cargar. + + Returns: + dict con {"loaded": int, "skipped": int, "sql_errors": [...], "rows": [...]} + donde rows es la lista de filas cargadas (con sus campos normalizados). + + Raises: + FileNotFoundError: si xlsx_path no existe. + ValueError: si la hoja no existe en el libro. + google.api_core.exceptions.GoogleAPICallError: si la carga a BQ falla. + """ + from bigquery import bq_auth, bq_query, bq_load_from_file + + if not os.path.exists(xlsx_path): + raise FileNotFoundError(f"no existe el archivo xlsx: {xlsx_path}") + + raw_rows = _read_xlsx_rows(xlsx_path, sheet_name) + valid, skipped = _parse_catalog_rows(raw_rows) + + client = bq_auth(project_id=project) + + sql_errors: list[dict] = [] + loadable: list[dict] = [] + + if validate_sql: + for row in valid: + try: + bq_query(client, row["assert_sql"], dry_run=True) + loadable.append(row) + except Exception as exc: # noqa: BLE001 — capturamos cualquier fallo de BQ + sql_errors.append({"check_id": row["check_id"], "error": str(exc)}) + else: + loadable = valid + + # Escribe las filas cargables a un CSV temporal con QUOTE_ALL: el assert_sql + # puede contener comas, comillas y saltos de linea. + tmp_fd, tmp_path = tempfile.mkstemp(suffix=".csv", prefix="dq_checks_") + os.close(tmp_fd) + try: + with open(tmp_path, "w", newline="", encoding="utf-8") as f: + writer = csv.writer(f, quoting=csv.QUOTE_ALL) + writer.writerow(SCHEMA_COLUMNS) + for row in loadable: + writer.writerow([ + row["check_id"], + row["tipo"], + row["subcategoria"], + row["pregunta"], + row["assert_sql"], + row["threshold"], + row["severity"], + # BigQuery CSV BOOL acepta true/false en minusculas. + "true" if row["enabled"] else "false", + row["owner"], + row["created_at"], + row["updated_at"], + ]) + + bq_load_from_file( + client, + tmp_path, + dataset, + table, + source_format="CSV", + write_disposition="WRITE_TRUNCATE", + skip_leading_rows=1, + autodetect=False, + ) + finally: + if os.path.exists(tmp_path): + os.remove(tmp_path) + + return { + "loaded": len(loadable), + "skipped": len(skipped), + "sql_errors": sql_errors, + "rows": loadable, + } + + +if __name__ == "__main__": + import json + import sys + + path = sys.argv[1] if len(sys.argv) > 1 else "/mnt/c/Users/egutierrez/Documents/dq_catalogo_asserts.xlsx" + result = sync_xlsx_to_dq_checks(path) + print(json.dumps({k: v for k, v in result.items() if k != "rows"}, ensure_ascii=False, indent=2)) + print(f"loaded={result['loaded']} skipped={result['skipped']} sql_errors={len(result['sql_errors'])}") diff --git a/python/functions/infra/sync_xlsx_to_dq_checks_test.py b/python/functions/infra/sync_xlsx_to_dq_checks_test.py new file mode 100644 index 00000000..98cf0113 --- /dev/null +++ b/python/functions/infra/sync_xlsx_to_dq_checks_test.py @@ -0,0 +1,171 @@ +"""Tests para sync_xlsx_to_dq_checks (logica pura: parseo/validacion/derivacion). + +No tocan BigQuery: la autenticacion, el dry-run y la carga quedan fuera del +test unitario. Solo se ejercita _parse_catalog_rows y sus helpers. +""" + +from sync_xlsx_to_dq_checks import _parse_catalog_rows, _slug, _parse_bool, _parse_threshold + + +def _row(**kwargs) -> dict: + base = { + "check_id": "", + "tipo": "", + "subcategoria": "", + "pregunta": "", + "assert_sql": "", + "threshold": "", + "severity": "", + "enabled": "", + "owner": "", + } + base.update(kwargs) + return base + + +def test_fila_valida_minima_se_carga(): + valid, skipped = _parse_catalog_rows([ + _row(tipo="fiabilidad", pregunta=" hay nulos? ", assert_sql="SELECT 1", enabled="TRUE"), + ]) + assert len(valid) == 1 + assert len(skipped) == 0 + r = valid[0] + assert r["tipo"] == "fiabilidad" + assert r["pregunta"] == "hay nulos?" + assert r["severity"] == "warning" # default + assert r["threshold"] == 0 # default + assert r["enabled"] is True + assert r["owner"] == "" + assert r["created_at"] == r["updated_at"] + assert r["created_at"].endswith("Z") + + +def test_check_id_se_autoderiva_cuando_vacio(): + valid, _ = _parse_catalog_rows([ + _row(tipo="negocio", subcategoria="Ventas Diarias", + pregunta="¿La venta es positiva?", assert_sql="SELECT 1"), + ]) + cid = valid[0]["check_id"] + assert cid.startswith("negocio__ventas_diarias_") + assert "la_venta_es_positiva" in cid + + +def test_check_id_explicito_se_respeta(): + valid, _ = _parse_catalog_rows([ + _row(check_id="custom_id_1", tipo="fiabilidad", + pregunta="x", assert_sql="SELECT 1"), + ]) + assert valid[0]["check_id"] == "custom_id_1" + + +def test_check_id_colision_se_sufija(): + valid, _ = _parse_catalog_rows([ + _row(check_id="dup", tipo="fiabilidad", pregunta="a", assert_sql="SELECT 1"), + _row(check_id="dup", tipo="negocio", pregunta="b", assert_sql="SELECT 2"), + _row(check_id="dup", tipo="negocio", pregunta="c", assert_sql="SELECT 3"), + ]) + ids = [r["check_id"] for r in valid] + assert ids == ["dup", "dup_2", "dup_3"] + + +def test_tipo_invalido_se_skipea(): + valid, skipped = _parse_catalog_rows([ + _row(tipo="otracosa", pregunta="x", assert_sql="SELECT 1"), + ]) + assert valid == [] + assert len(skipped) == 1 + assert "tipo invalido" in skipped[0]["reason"] + + +def test_tipo_case_insensitive_se_normaliza(): + valid, skipped = _parse_catalog_rows([ + _row(tipo=" Fiabilidad ", pregunta="x", assert_sql="SELECT 1"), + ]) + assert len(valid) == 1 + assert skipped == [] + assert valid[0]["tipo"] == "fiabilidad" + + +def test_pregunta_vacia_se_skipea(): + valid, skipped = _parse_catalog_rows([ + _row(tipo="negocio", pregunta=" ", assert_sql="SELECT 1"), + ]) + assert valid == [] + assert skipped[0]["reason"] == "pregunta vacia" + + +def test_assert_sql_vacia_se_skipea(): + valid, skipped = _parse_catalog_rows([ + _row(tipo="negocio", pregunta="x", assert_sql=""), + ]) + assert valid == [] + assert skipped[0]["reason"] == "assert_sql vacia" + + +def test_severity_invalida_se_skipea(): + valid, skipped = _parse_catalog_rows([ + _row(tipo="negocio", pregunta="x", assert_sql="SELECT 1", severity="urgente"), + ]) + assert valid == [] + assert "severity invalida" in skipped[0]["reason"] + + +def test_threshold_negativo_se_skipea(): + valid, skipped = _parse_catalog_rows([ + _row(tipo="negocio", pregunta="x", assert_sql="SELECT 1", threshold="-5"), + ]) + assert valid == [] + assert "threshold debe ser >= 0" in skipped[0]["reason"] + + +def test_threshold_no_numerico_se_skipea(): + valid, skipped = _parse_catalog_rows([ + _row(tipo="negocio", pregunta="x", assert_sql="SELECT 1", threshold="abc"), + ]) + assert valid == [] + assert "threshold no es numerico" in skipped[0]["reason"] + + +def test_fila_totalmente_vacia_se_ignora_sin_skip(): + valid, skipped = _parse_catalog_rows([ + _row(), # todo vacio + _row(tipo="negocio", pregunta="x", assert_sql="SELECT 1"), + ]) + assert len(valid) == 1 + assert skipped == [] # la vacia se ignora, no se cuenta como skipped + + +def test_parse_bool_variantes(): + assert _parse_bool("TRUE") is True + assert _parse_bool("True") is True + assert _parse_bool(1) is True + assert _parse_bool("si") is True + assert _parse_bool("sí") is True + assert _parse_bool("x") is True + assert _parse_bool("FALSE") is False + assert _parse_bool("0") is False + assert _parse_bool("") is False + assert _parse_bool(None) is False + + +def test_parse_threshold_defaults_y_floats_enteros(): + assert _parse_threshold("") == (0, None) + assert _parse_threshold(None) == (0, None) + assert _parse_threshold("3") == (3, None) + assert _parse_threshold("3.0") == (3, None) + val, err = _parse_threshold("3.5") + assert val is None and "entero" in err + + +def test_slug_colapsa_y_recorta(): + assert _slug(" Hola, Mundo!! ") == "hola_mundo" + assert _slug("A---B___C") == "a_b_c" + assert _slug("") == "" + + +def test_assert_sql_con_comas_y_comillas_se_conserva(): + sql = 'SELECT COUNT(*) FROM t WHERE col IN ("a", "b"), foo = 1' + valid, _ = _parse_catalog_rows([ + _row(tipo="fiabilidad", pregunta="x", assert_sql=sql), + ]) + assert valid[0]["assert_sql"] == sql