fix(eda): bugs de bajo riesgo del benchmark (H1,H5,H12,H13,H14) + tests faltantes

- H1: render_eda_markdown ya no aplica doble x100 a outlier_pct (336% -> real)
- H5: profile_database filtra base_tables_only (excluye VIEWs; sakila 21->16)
- H12: suggest_reexpression salta columnas no-continuas
- H13: to_returns/profile_table elige retornos (financiera) vs diferencias (fisica)
- H14: test de regresion ATTACH sqlite via information_schema
- +8 tests de las funciones eda nuevas (acf_pacf, adf_kpss, ...). 77 tests verdes
- L/M (H2,H3,H4,H6,H7,H8,H9,H10,H11) quedan en issues 0174-0177 para revision

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Egutierrez
2026-06-29 03:51:11 +02:00
parent 7ac69ab4fb
commit caf8c25d99
17 changed files with 1145 additions and 31 deletions
@@ -0,0 +1,71 @@
"""Tests para acf_pacf."""
import numpy as np
from acf_pacf import acf_pacf
def _ar1(phi: float, n: int, seed: int) -> list:
rng = np.random.default_rng(seed)
series = [0.0]
for _ in range(n):
series.append(phi * series[-1] + rng.normal(0, 1))
return series
def test_ruido_blanco_no_autocorrelado():
rng = np.random.default_rng(0)
ruido = rng.normal(0, 1, 500).tolist()
res = acf_pacf(ruido)
assert res["is_autocorrelated"] is False
def test_ar1_es_autocorrelado():
ar = _ar1(0.8, 500, seed=1)
res = acf_pacf(ar)
assert res["is_autocorrelated"] is True
def test_lag1_significativo_en_ar1():
# En un AR(1) la PACF corta tras el lag 1: lag 1 debe ser significativo.
ar = _ar1(0.8, 500, seed=2)
res = acf_pacf(ar)
assert 1 in res["significant_pacf_lags"]
assert 1 in res["significant_acf_lags"]
def test_muestra_insuficiente_devuelve_nota():
res = acf_pacf([1, 2, 3, 4, 5])
assert res["n"] == 5
assert res["note"] == "datos insuficientes"
assert res["is_autocorrelated"] is None
def test_descarta_none_y_nan():
rng = np.random.default_rng(3)
base = rng.normal(0, 1, 200).tolist()
sucio = []
for i, v in enumerate(base):
sucio.append(v)
if i % 25 == 0:
sucio.append(None)
sucio.append(float("nan"))
res = acf_pacf(sucio)
assert res["n"] == 200
def test_recorta_nlags_a_limites():
# Serie de 20 puntos con nlags=40: debe recortar a < n/2.
rng = np.random.default_rng(4)
serie = rng.normal(0, 1, 20).tolist()
res = acf_pacf(serie, nlags=40)
assert res["nlags"] < 20 // 2
assert len(res["acf"]) == res["nlags"] + 1
def test_acf_lag0_es_uno():
rng = np.random.default_rng(5)
serie = rng.normal(0, 1, 100).tolist()
res = acf_pacf(serie)
assert abs(res["acf"][0] - 1.0) < 1e-9
assert abs(res["pacf"][0] - 1.0) < 1e-9
@@ -0,0 +1,76 @@
"""Tests para adf_kpss_stationarity."""
import numpy as np
from adf_kpss_stationarity import adf_kpss_stationarity
def test_random_walk_es_no_estacionario():
# Random walk = suma acumulada de ruido: tiene raiz unitaria.
rng = np.random.default_rng(123)
paseo = np.cumsum(rng.normal(0.0, 1.0, 400)).tolist()
res = adf_kpss_stationarity(paseo)
assert res["verdict"] == "non_stationary"
assert res["adf"]["stationary"] is False
assert res["kpss"]["stationary"] is False
def test_ruido_blanco_es_estacionario():
# Ruido blanco gaussiano: estacionario por construccion.
rng = np.random.default_rng(42)
ruido = rng.normal(0.0, 1.0, 400).tolist()
res = adf_kpss_stationarity(ruido)
assert res["verdict"] == "stationary"
assert res["adf"]["stationary"] is True
assert res["kpss"]["stationary"] is True
assert res["warning"] is None
def test_serie_con_tendencia_no_es_estacionaria():
# Tendencia lineal determinista + ruido pequeno: KPSS la marca no estacionaria.
rng = np.random.default_rng(7)
serie = [0.1 * i + rng.normal(0, 0.5) for i in range(300)]
res = adf_kpss_stationarity(serie)
assert res["verdict"] != "stationary"
assert res["warning"] is not None
def test_muestra_insuficiente_devuelve_nota():
res = adf_kpss_stationarity([1, 2, 3, 4, 5])
assert res["n"] == 5
assert res["note"] == "datos insuficientes"
assert res["verdict"] is None
def test_descarta_none_y_nan():
rng = np.random.default_rng(1)
base = rng.normal(0, 1, 200).tolist()
sucio = []
for i, v in enumerate(base):
sucio.append(v)
if i % 20 == 0:
sucio.append(None)
sucio.append(float("nan"))
res = adf_kpss_stationarity(sucio)
assert res["n"] == 200 # las None/NaN no cuentan
def test_warning_presente_si_no_estacionaria():
# Tendencia lineal fuerte: garantiza no estacionariedad (verdict != stationary).
rng = np.random.default_rng(99)
serie = [0.5 * i + rng.normal(0, 0.3) for i in range(300)]
res = adf_kpss_stationarity(serie)
assert res["verdict"] != "stationary"
assert res["warning"] is not None
assert "espuria" in res["warning"].lower()
def test_estructura_basica_del_dict():
rng = np.random.default_rng(5)
ruido = rng.normal(0, 1, 100).tolist()
res = adf_kpss_stationarity(ruido)
for key in ("n", "alpha", "adf", "kpss", "verdict"):
assert key in res
for sub in ("stat", "p_value", "lags", "stationary", "conclusion"):
assert sub in res["adf"]
assert sub in res["kpss"]
@@ -0,0 +1,112 @@
"""Tests para exploratory_caveats."""
from exploratory_caveats import exploratory_caveats
def _ids(out):
return {c["id"] for c in out["caveats"]}
def test_perfil_vacio_solo_caveat_general():
out = exploratory_caveats({})
assert out["n"] == 1
assert _ids(out) == {"exploratory_nature"}
assert out["note"]
def test_none_no_lanza_y_da_general():
out = exploratory_caveats(None)
assert _ids(out) == {"exploratory_nature"}
def test_caveat_general_siempre_primero():
out = exploratory_caveats({"n_rows": 1000, "columns": []})
assert out["caveats"][0]["id"] == "exploratory_nature"
def test_correlaciones_disparan_causalidad_y_overfitting():
profile = {
"n_rows": 5000,
"correlations": {"pairs": [{"a": "x", "b": "y", "value": 0.8}]},
}
ids = _ids(exploratory_caveats(profile))
assert "correlation_not_causation" in ids
assert "in_sample_overfitting" in ids
# un solo par -> NO dispara comparaciones múltiples
assert "multiple_comparisons" not in ids
def test_dos_o_mas_pares_disparan_comparaciones_multiples():
profile = {
"correlations": [
{"a": "x", "b": "y", "value": 0.8},
{"a": "x", "b": "z", "value": -0.6},
],
}
assert "multiple_comparisons" in _ids(exploratory_caveats(profile))
def test_modelos_disparan_overfitting_y_pvalues():
profile = {
"models": {
"pca": {"explained": [0.6, 0.3]},
"normality": {"col_a": {"is_normal": False}},
},
}
ids = _ids(exploratory_caveats(profile))
assert "in_sample_overfitting" in ids
assert "p_values_not_confirmation" in ids
def test_outliers_por_columna_disparan_caveat():
profile = {
"columns": [
{"name": "precio", "numeric": {"n_outliers": 3, "outlier_pct": 1.5}},
],
}
assert "outliers_not_errors" in _ids(exploratory_caveats(profile))
def test_outliers_multivariantes_disparan_caveat():
profile = {"models": {"outliers": {"flags": [True, False, True]}}}
assert "outliers_not_errors" in _ids(exploratory_caveats(profile))
def test_trend_pvalue_dispara_caveat_pvalues():
profile = {
"columns": [
{"name": "ventas", "trend": {"direction": "up", "p_value": 0.01}},
],
}
assert "p_values_not_confirmation" in _ids(exploratory_caveats(profile))
def test_muestra_pequena_dispara_caveat():
out = exploratory_caveats({"n_rows": 12})
assert "small_sample" in _ids(out)
msg = next(c["message"] for c in out["caveats"] if c["id"] == "small_sample")
assert "12" in msg
def test_muestra_grande_no_dispara_small_sample():
assert "small_sample" not in _ids(exploratory_caveats({"n_rows": 5000}))
def test_muchos_faltantes_disparan_missing_data():
assert "missing_data_bias" in _ids(exploratory_caveats({"null_cell_pct": 0.35}))
def test_columnas_all_null_disparan_missing_data():
assert "missing_data_bias" in _ids(exploratory_caveats({"all_null_cols": ["x"]}))
def test_pocos_faltantes_no_disparan_missing_data():
assert "missing_data_bias" not in _ids(exploratory_caveats({"null_cell_pct": 0.05}))
def test_estructura_de_cada_caveat():
out = exploratory_caveats({"correlations": [{"a": "x", "b": "y", "value": 0.9}]})
for c in out["caveats"]:
assert set(c.keys()) == {"id", "topic", "message", "reference"}
assert all(isinstance(c[k], str) and c[k] for k in c)
assert out["n"] == len(out["caveats"])
@@ -0,0 +1,99 @@
"""Tests para fdr_correction (correccion de comparaciones multiples).
Importa el modulo hoja directamente (`datascience.fdr_correction`) para no
depender de que el paquete reexporte la funcion en su __init__ (lo integra el
orquestador al cerrar el grupo eda).
"""
from datascience.fdr_correction import fdr_correction
def test_bh_golden_rechaza_dos_de_tres():
# Dos p-valores fuertes y uno claramente no significativo.
# BH (step-up) sobre [0.01, 0.02, 0.5], m=3, alpha=0.05:
# q3 = 0.5*3/3 = 0.50
# q2 = min(0.50, 0.02*3/2=0.03) = 0.03
# q1 = min(0.03, 0.01*3/1=0.03) = 0.03
# reject = [q<=0.05] -> [True, True, False]
out = fdr_correction([0.01, 0.02, 0.5], alpha=0.05, method="bh")
assert out["reject"] == [True, True, False]
assert out["n_rejected"] == 2
assert out["n_tests"] == 3
assert out["method"] == "bh"
# q-valores esperados.
adj = out["p_values_adjusted"]
assert abs(adj[0] - 0.03) < 1e-9
assert abs(adj[1] - 0.03) < 1e-9
assert abs(adj[2] - 0.50) < 1e-9
def test_bonferroni_mas_conservador_que_bh():
pvalues = [0.01, 0.02, 0.5]
bh = fdr_correction(pvalues, alpha=0.05, method="bh")
bon = fdr_correction(pvalues, alpha=0.05, method="bonferroni")
# Bonferroni nunca rechaza mas que BH.
assert bon["n_rejected"] <= bh["n_rejected"]
# p ajustado = min(1, p*m): [0.03, 0.06, 1.0] -> solo el primero pasa.
assert bon["reject"] == [True, False, False]
assert abs(bon["p_values_adjusted"][0] - 0.03) < 1e-9
assert abs(bon["p_values_adjusted"][1] - 0.06) < 1e-9
assert bon["p_values_adjusted"][2] == 1.0
def test_p_values_adjusted_alineados_y_en_rango():
pvalues = [0.001, 0.2, 0.04, 0.6, 0.9]
out = fdr_correction(pvalues, method="bh")
assert len(out["p_values_adjusted"]) == len(pvalues)
assert len(out["reject"]) == len(pvalues)
for q in out["p_values_adjusted"]:
assert q is not None and 0.0 <= q <= 1.0
# El p-valor ajustado nunca es menor que el crudo (la correccion solo sube).
for p, q in zip(pvalues, out["p_values_adjusted"]):
assert q >= p - 1e-12
def test_none_se_propaga_alineado():
# Posicion central sin test disponible: se propaga como None / False y no
# cuenta como prueba (m=2, no 3).
out = fdr_correction([0.001, None, 0.9], alpha=0.05, method="bh")
assert out["n_tests"] == 2
assert out["p_values_adjusted"][1] is None
assert out["reject"][1] is False
assert out["reject"][0] is True
assert len(out["reject"]) == 3
def test_lista_vacia_devuelve_note():
out = fdr_correction([])
assert out["p_values_adjusted"] == []
assert out["reject"] == []
assert out["n_tests"] == 0
assert out["n_rejected"] == 0
assert "note" in out
def test_solo_none_devuelve_note():
out = fdr_correction([None, None, float("nan")])
assert out["n_tests"] == 0
assert out["n_rejected"] == 0
assert out["reject"] == [False, False, False]
assert out["p_values_adjusted"] == [None, None, None]
assert "note" in out
def test_metodo_desconocido_devuelve_note():
out = fdr_correction([0.01, 0.02], method="holm")
assert "note" in out
assert out["n_rejected"] == 0
assert out["reject"] == [False, False]
def test_todos_significativos():
# Todos los p-valores diminutos -> todos rechazados con ambos metodos.
pvalues = [1e-6, 1e-5, 1e-4]
bh = fdr_correction(pvalues, alpha=0.05, method="bh")
bon = fdr_correction(pvalues, alpha=0.05, method="bonferroni")
assert bh["n_rejected"] == 3
assert bon["n_rejected"] == 3
assert all(bh["reject"])
assert all(bon["reject"])
@@ -201,7 +201,10 @@ def render_eda_markdown(profile: dict) -> str:
if val is None: if val is None:
continue continue
if key == "outlier_pct": if key == "outlier_pct":
stat_rows.append([label, _fmt_pct(val)]) # outlier_pct ya viene en escala 0-100 desde describe_numeric
# (100 * n_outliers / n). NO usar _fmt_pct (multiplica x100 otra
# vez y produce porcentajes imposibles, p.ej. 7% -> 700%).
stat_rows.append([label, _fmt_num(val, 2) + "%"])
elif key == "distribution_type": elif key == "distribution_type":
stat_rows.append([label, str(val)]) stat_rows.append([label, str(val)])
else: else:
@@ -373,12 +376,26 @@ def render_eda_markdown(profile: dict) -> str:
elif stl.get("note"): elif stl.get("note"):
rows.append(["STL", stl.get("note")]) rows.append(["STL", stl.get("note")])
if s.get("levels_suggested"): if s.get("levels_suggested"):
rows.append(["sugerencia", "convertir a retornos (serie de niveles)"]) # La transformación recomendada depende de la semántica: retornos para
tr = s.get("to_returns") or {} # series financieras (precio/volumen), diferencias para magnitudes
if tr.get("mean") is not None: # físicas (temperatura, caudal). Aplicar "retornos" a temperatura no
rows.append(["retorno medio (log)", _fmt_num(tr.get("mean"))]) # tiene sentido físico; las diferencias sí.
if tr.get("std") is not None: kind = s.get("levels_kind")
rows.append(["volatilidad retornos (σ)", _fmt_num(tr.get("std"))]) if kind == "returns":
label = "convertir a retornos (serie de niveles financiera)"
elif kind == "differences":
label = "trabajar sobre diferencias (serie de niveles no financiera)"
else:
label = "convertir a retornos o diferencias (serie de niveles)"
rows.append(["sugerencia", label])
# Las métricas de retorno (media/volatilidad) solo se muestran cuando la
# transformación recomendada son retornos; para diferencias no aplican.
if kind != "differences":
tr = s.get("to_returns") or {}
if tr.get("mean") is not None:
rows.append(["retorno medio (log)", _fmt_num(tr.get("mean"))])
if tr.get("std") is not None:
rows.append(["volatilidad retornos (σ)", _fmt_num(tr.get("std"))])
if rows: if rows:
block.append(_md_table(["aspecto", "valor"], rows)) block.append(_md_table(["aspecto", "valor"], rows))
if stat.get("warning"): if stat.get("warning"):
@@ -53,7 +53,9 @@ def _sample_profile(correlations=None, llm=None):
"p99": 95.0, "p99": 95.0,
"skew": 0.4, "skew": 0.4,
"kurtosis": 2.1, "kurtosis": 2.1,
"outlier_pct": 0.012, # outlier_pct ya viene en escala 0-100 desde describe_numeric
# (100 * n_outliers / n), NO en fracción 0-1.
"outlier_pct": 3.5,
"distribution_type": "right-skewed", "distribution_type": "right-skewed",
"histogram": [ "histogram": [
{"lo": 0, "hi": 25, "count": 100}, {"lo": 0, "hi": 25, "count": 100},
@@ -126,8 +128,15 @@ def test_pct_fields_scaled_by_100():
assert "0.86%" not in md assert "0.86%" not in md
# categorical top pct=0.5 -> "50.0%". # categorical top pct=0.5 -> "50.0%".
assert "50.0" in md assert "50.0" in md
# outlier_pct=0.012 -> "1.20%".
assert "1.20%" in md
def test_outlier_pct_not_double_scaled():
# outlier_pct ya viene en escala 0-100 (describe_numeric): el render lo muestra
# tal cual + '%', SIN multiplicar otra vez por 100. outlier_pct=3.5 -> "3.5%",
# nunca "350%" (el bug del doble ×100).
md = render_eda_markdown(_sample_profile())
assert "3.5%" in md
assert "350" not in md
def test_pct_handles_none_as_blank(): def test_pct_handles_none_as_blank():
@@ -0,0 +1,172 @@
"""Tests para render_eda_pdf.
Importa el módulo directo (sys.path), igual que el resto de tests del grupo eda,
para no depender del registro en __init__.py (lo añade el orquestador al integrar).
"""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from render_eda_pdf import render_eda_pdf
def _synthetic_profile() -> dict:
"""TableProfile sintético mínimo: 2 numéricas + 1 categórica + overview."""
return {
"table": "ventas",
"source": "data/ventas.csv",
"profiled_at": "2026-06-28 10:00 UTC",
"n_rows": 1000,
"n_cols": 3,
"null_cell_pct": 0.02,
"duplicate_rows": 5,
"duplicate_pct": 0.005,
"quality_score": 92.5,
"type_breakdown": {"numeric": 2, "categorical": 1},
"key_candidates": ["id"],
"columns": [
{
"name": "precio",
"inferred_type": "numeric",
"semantic_type": "currency",
"null_pct": 0.0,
"distinct_count": 850,
"unique_pct": 0.85,
"quality_score": 95.0,
"flags": [],
"numeric": {
"min": 1.0, "max": 100.0, "median": 40.0, "mean": 42.5,
"std": 12.3, "p25": 30.0, "p75": 55.0, "outlier_pct": 1.2,
"distribution_type": "right-skewed",
"histogram": [
{"lo": 0.0, "hi": 25.0, "count": 100},
{"lo": 25.0, "hi": 50.0, "count": 500},
{"lo": 50.0, "hi": 75.0, "count": 300},
{"lo": 75.0, "hi": 100.0, "count": 50},
],
},
},
{
"name": "unidades",
"inferred_type": "numeric",
"semantic_type": "integer",
"null_pct": 0.01,
"distinct_count": 40,
"unique_pct": 0.04,
"quality_score": 88.0,
"flags": ["has_nulls"],
"numeric": {
"min": 1.0, "max": 12.0, "median": 4.0, "mean": 4.8,
"std": 2.1, "outlier_pct": 0.0,
"distribution_type": "normal",
"histogram": [
{"lo": 1.0, "hi": 4.0, "count": 400},
{"lo": 4.0, "hi": 8.0, "count": 450},
{"lo": 8.0, "hi": 12.0, "count": 150},
],
},
},
{
"name": "categoria",
"inferred_type": "categorical",
"semantic_type": "",
"null_pct": 0.0,
"distinct_count": 3,
"unique_pct": 0.003,
"quality_score": 99.0,
"flags": [],
"categorical": {
"entropy": 1.05,
"top": [
{"value": "neumaticos", "count": 500, "pct": 0.5},
{"value": "aceite", "count": 300, "pct": 0.3},
{"value": "filtros", "count": 200, "pct": 0.2},
],
},
},
],
"correlations": {
"pairs": [
{"a": "precio", "b": "unidades", "value": -0.42, "method": "pearson"},
],
},
}
def test_golden_genera_pdf_multipagina(tmp_path):
"""Caso real: profile completo -> PDF existe, pesa >0 y tiene varias páginas."""
out = str(tmp_path / "eda_ventas.pdf")
res = render_eda_pdf(_synthetic_profile(), out, title="EDA — ventas")
assert isinstance(res, dict)
assert set(res.keys()) == {"pdf_path", "n_pages", "note"}
assert res["pdf_path"] == out
assert os.path.exists(out)
assert os.path.getsize(out) > 0
# Cover + overview + numéricas + categóricas + calidad + correlaciones >= 5.
assert res["n_pages"] >= 5
# Cabecera de archivo PDF.
with open(out, "rb") as fh:
assert fh.read(4) == b"%PDF"
def test_edge_profile_vacio_no_revienta(tmp_path):
"""Edge: dict vacío -> 1 página garantizada, sin excepción."""
out = str(tmp_path / "vacio.pdf")
res = render_eda_pdf({}, out)
assert os.path.exists(out)
assert os.path.getsize(out) > 0
assert res["n_pages"] >= 1
assert res["pdf_path"] == out
def test_edge_profile_none_no_revienta(tmp_path):
"""Edge: None -> tratado como vacío, 1 página, sin excepción."""
out = str(tmp_path / "none.pdf")
res = render_eda_pdf(None, out)
assert os.path.exists(out)
assert res["n_pages"] >= 1
def test_edge_solo_numericas(tmp_path):
"""Edge: profile sólo con columnas numéricas (sin categóricas ni corr)."""
prof = {
"table": "t",
"n_rows": 10,
"n_cols": 1,
"columns": [
{
"name": "x",
"inferred_type": "numeric",
"quality_score": 80.0,
"numeric": {
"median": 2.0, "mean": 2.0,
"histogram": [{"lo": 0.0, "hi": 4.0, "count": 10}],
},
},
],
}
out = str(tmp_path / "num.pdf")
res = render_eda_pdf(prof, out)
assert os.path.exists(out)
assert res["n_pages"] >= 2 # cover + numéricas al menos.
def test_forward_compat_seccion_desconocida(tmp_path):
"""Error/forward-compat: un bloque nuevo del profile se vuelca, no rompe."""
prof = {
"table": "t",
"n_rows": 5,
"columns": [],
# Bloques que este renderer no conoce (otros agentes los añaden):
"models": {"kmeans": {"k": 3, "silhouette": 0.55}},
"caveats": ["muestra pequeña", "fechas como texto"],
}
out = str(tmp_path / "fwd.pdf")
res = render_eda_pdf(prof, out)
assert os.path.exists(out)
assert res["n_pages"] >= 1
# No se perdió ninguna sección por error.
assert "omitida" not in res["note"]
@@ -0,0 +1,72 @@
"""Tests para stl_decompose."""
import numpy as np
from stl_decompose import stl_decompose
def _serie_estacional(n: int, period: int, trend: float, amp: float, seed: int) -> list:
rng = np.random.default_rng(seed)
return [
trend * i + amp * np.sin(2 * np.pi * i / period) + rng.normal(0, 1)
for i in range(n)
]
def test_serie_con_tendencia_y_estacionalidad():
serie = _serie_estacional(n=120, period=12, trend=0.3, amp=10.0, seed=0)
res = stl_decompose(serie, period=12)
assert res["period"] == 12
assert res["trend_strength"] > 0.5
assert res["seasonal_strength"] > 0.5
assert len(res["trend"]["values"]) == 120
def test_fuerza_estacional_alta_con_estacionalidad_fuerte():
# Amplitud estacional grande, ruido pequeno => seasonal_strength cercano a 1.
serie = _serie_estacional(n=120, period=12, trend=0.05, amp=20.0, seed=1)
res = stl_decompose(serie, period=12)
assert res["seasonal_strength"] > 0.9
def test_infiere_periodo_si_none():
serie = _serie_estacional(n=120, period=12, trend=0.1, amp=10.0, seed=2)
res = stl_decompose(serie) # period=None
assert res.get("period_inferred") is True
assert res["period"] is not None
def test_serie_corta_devuelve_nota():
# period=12 pero solo 20 puntos (< 2*period=24): nota, no descompone.
serie = _serie_estacional(n=20, period=12, trend=0.1, amp=5.0, seed=3)
res = stl_decompose(serie, period=12)
assert "note" in res
assert res["trend_strength"] is None
def test_muestra_insuficiente_devuelve_nota():
res = stl_decompose([1, 2, 3, 4, 5])
assert res["n"] == 5
assert res["note"] == "datos insuficientes"
assert res["seasonal_strength"] is None
def test_descarta_none_y_nan():
serie = _serie_estacional(n=120, period=12, trend=0.2, amp=8.0, seed=4)
sucio = []
for i, v in enumerate(serie):
sucio.append(v)
if i % 30 == 0:
sucio.append(None)
sucio.append(float("nan"))
res = stl_decompose(sucio, period=12)
assert res["n"] == 120
def test_serie_larga_resume_sin_values():
# >200 puntos: las componentes vienen resumidas sin 'values'.
serie = _serie_estacional(n=300, period=12, trend=0.1, amp=10.0, seed=5)
res = stl_decompose(serie, period=12)
assert res["trend"]["values"] is None
assert "mean" in res["trend"]
assert "note" in res["trend"]
@@ -0,0 +1,97 @@
"""Tests para suggest_reexpression."""
from suggest_reexpression import suggest_reexpression
def test_aproximadamente_simetrica_recomienda_none():
# |skew| < 0.5 -> no hace falta re-expresar.
out = suggest_reexpression({"skew": 0.1, "min": 5.0, "zero_pct": 0.0, "negative_pct": 0.0})
assert out["recommended"] == "none"
assert out["ladder_power"] == 1.0
assert out["alternatives"] == []
assert out["note"] == ""
def test_positiva_fuerte_todo_positivo_recomienda_log():
# Cola derecha larga sobre datos estrictamente positivos -> log.
out = suggest_reexpression({"skew": 2.3, "min": 1.0, "zero_pct": 0.0, "negative_pct": 0.0})
assert out["recommended"] == "log"
assert out["ladder_power"] == 0.0
transforms = [a["transform"] for a in out["alternatives"]]
assert "box-cox" in transforms
def test_positiva_moderada_todo_positivo_recomienda_sqrt():
out = suggest_reexpression({"skew": 0.7, "min": 2.0, "zero_pct": 0.0, "negative_pct": 0.0})
assert out["recommended"] == "sqrt"
assert out["ladder_power"] == 0.5
def test_positiva_con_ceros_fuerte_recomienda_log1p():
# log(0) indefinido -> log1p en presencia de ceros.
out = suggest_reexpression({"skew": 1.5, "min": 0.0, "zero_pct": 12.0, "negative_pct": 0.0})
assert out["recommended"] == "log1p"
assert out["ladder_power"] == 0.0
def test_positiva_con_negativos_recomienda_yeo_johnson():
# log/Box-Cox no admiten negativos -> Yeo-Johnson.
out = suggest_reexpression({"skew": 1.8, "min": -4.0, "zero_pct": 0.0, "negative_pct": 20.0})
assert out["recommended"] == "yeo-johnson"
assert out["ladder_power"] is None # data-driven
def test_negativa_fuerte_todo_positivo_recomienda_cube():
# Cola izquierda -> subir por la escalera de Tukey.
out = suggest_reexpression({"skew": -1.6, "min": 3.0, "zero_pct": 0.0, "negative_pct": 0.0})
assert out["recommended"] == "cube"
assert out["ladder_power"] == 3.0
def test_negativa_moderada_todo_positivo_recomienda_square():
out = suggest_reexpression({"skew": -0.8, "min": 3.0, "zero_pct": 0.0, "negative_pct": 0.0})
assert out["recommended"] == "square"
assert out["ladder_power"] == 2.0
def test_dominio_desconocido_recomienda_yeo_johnson_con_nota():
# Solo skew, sin min/zero_pct/negative_pct -> opción segura + nota.
out = suggest_reexpression({"skew": 1.4})
assert out["recommended"] == "yeo-johnson"
assert "dominio desconocido" in out["note"]
def test_acepta_columnprofile_completo_con_numeric_anidado():
# Si llega un ColumnProfile entero, baja a su sub-bloque numeric.
profile = {
"name": "precio",
"inferred_type": "numeric",
"numeric": {"skew": 2.0, "min": 1.0, "zero_pct": 0.0, "negative_pct": 0.0},
}
out = suggest_reexpression(profile)
assert out["recommended"] == "log"
def test_skew_ausente_devuelve_nota():
out = suggest_reexpression({"min": 1.0, "max": 9.0})
assert out["recommended"] is None
assert "skew ausente" in out["note"]
def test_stats_vacio_devuelve_nota():
out = suggest_reexpression({})
assert out["recommended"] is None
assert out["alternatives"] == []
assert out["note"]
def test_no_dict_no_lanza():
out = suggest_reexpression(None)
assert out["recommended"] is None
assert out["note"]
def test_skew_no_numerico_devuelve_nota():
out = suggest_reexpression({"skew": "mucho"})
assert out["recommended"] is None
assert out["skew"] is None
@@ -0,0 +1,72 @@
"""Tests para to_returns."""
import math
from to_returns import to_returns
def test_log_returns_valores_conocidos():
precios = [100.0, 105.0, 103.0, 108.0]
res = to_returns(precios, method="log")
esperado = [
math.log(105 / 100),
math.log(103 / 105),
math.log(108 / 103),
]
assert res["n_returns"] == 3
assert res["n_skipped"] == 0
for got, exp in zip(res["returns"], esperado):
assert math.isclose(got, exp, rel_tol=1e-12)
def test_simple_returns_valores_conocidos():
precios = [100.0, 105.0, 103.0]
res = to_returns(precios, method="simple")
esperado = [105 / 100 - 1, 103 / 105 - 1]
for got, exp in zip(res["returns"], esperado):
assert math.isclose(got, exp, rel_tol=1e-12)
def test_log_marca_no_positivo_como_invalido():
# Un 0 invalida los dos pasos que lo tocan (prev=0 y cur=0).
res = to_returns([100.0, 0.0, 50.0], method="log")
assert res["n_skipped"] == 2
assert res["returns"] == [None, None]
assert res["mean"] is None
def test_simple_admite_negativos():
# Retornos negativos validos en simple; -10 no invalida (solo prev==0 lo hace).
res = to_returns([100.0, 90.0, 81.0], method="simple")
assert res["n_skipped"] == 0
assert all(r < 0 for r in res["returns"])
def test_method_invalido_devuelve_nota():
res = to_returns([1.0, 2.0, 3.0], method="cuadratico")
assert res["returns"] == []
assert "method" in res["note"]
def test_un_solo_punto_devuelve_nota():
res = to_returns([100.0])
assert res["n"] == 1
assert res["note"] == "datos insuficientes"
assert res["returns"] == []
def test_descarta_none_y_nan():
precios = [100.0, None, 105.0, float("nan"), 110.0]
res = to_returns(precios, method="log")
# Quedan 3 niveles validos (100, 105, 110) => 2 retornos.
assert res["n_levels"] == 3
assert res["n_returns"] == 2
def test_stats_de_retornos():
precios = [100.0, 110.0, 121.0] # +10% cada paso en simple
res = to_returns(precios, method="simple")
assert math.isclose(res["mean"], 0.10, rel_tol=1e-9)
assert math.isclose(res["std"], 0.0, abs_tol=1e-12)
assert math.isclose(res["min"], 0.10, rel_tol=1e-9)
assert math.isclose(res["max"], 0.10, rel_tol=1e-9)
+9 -4
View File
@@ -5,8 +5,8 @@ lang: py
domain: infra domain: infra
version: "1.0.0" version: "1.0.0"
purity: impure purity: impure
signature: "def duckdb_list_tables(db_path: str) -> dict" signature: "def duckdb_list_tables(db_path: str, base_tables_only: bool = False) -> dict"
description: "Lista las tablas de una base DuckDB abierta en modo solo lectura (duckdb.connect(db_path, read_only=True)), de modo que nunca crea ni modifica la base. La conexion se cierra siempre en try/finally. Consulta information_schema.tables del esquema main y devuelve los nombres ordenados alfabeticamente. Devuelve un dict sin lanzar (estilo del grupo duckdb): {status:'ok', tables} en exito y {status:'error', error} en fallo. Es la introspeccion 'que tablas hay' del grupo duckdb; complementa a duckdb_query_readonly_py_infra (lectura de filas) y a duckdb_table_schema_py_infra (schema de una tabla). Depende del paquete duckdb (1.5.2 en python/.venv)." description: "Lista las tablas de una base DuckDB abierta en modo solo lectura (duckdb.connect(db_path, read_only=True)), de modo que nunca crea ni modifica la base. La conexion se cierra siempre en try/finally. Consulta information_schema.tables del esquema main y devuelve los nombres ordenados alfabeticamente. Con base_tables_only=True filtra table_type='BASE TABLE', excluyendo las VIEWs (util para perfilar/relacionar solo tablas reales). Devuelve un dict sin lanzar (estilo del grupo duckdb): {status:'ok', tables} en exito y {status:'error', error} en fallo. Es la introspeccion 'que tablas hay' del grupo duckdb; complementa a duckdb_query_readonly_py_infra (lectura de filas) y a duckdb_table_schema_py_infra (schema de una tabla). Depende del paquete duckdb (1.5.2 en python/.venv)."
tags: [duckdb, sql, introspection, readonly, tables] tags: [duckdb, sql, introspection, readonly, tables]
uses_functions: [] uses_functions: []
uses_types: [] uses_types: []
@@ -17,12 +17,16 @@ imports: [duckdb]
params: params:
- name: db_path - name: db_path
desc: "ruta al archivo DuckDB. Debe existir: el modo read_only NO crea la base. Un path inexistente devuelve {status:'error'}." desc: "ruta al archivo DuckDB. Debe existir: el modo read_only NO crea la base. Un path inexistente devuelve {status:'error'}."
- name: base_tables_only
desc: "si True (default False) filtra table_type='BASE TABLE', excluyendo las VIEWs del esquema main. Util para perfilar/relacionar solo tablas reales (perfilar una VIEW infla el conteo y multiplica relaciones FK falsas)."
output: "dict. En exito: {status:'ok', tables:[str,...]} con los nombres de tabla del esquema main ordenados alfabeticamente. En error (sin lanzar): {status:'error', error:str}." output: "dict. En exito: {status:'ok', tables:[str,...]} con los nombres de tabla del esquema main ordenados alfabeticamente. En error (sin lanzar): {status:'error', error:str}."
tested: true tested: true
tests: tests:
- "test_lista_tablas_ordenadas" - "test_lista_tablas_ordenadas"
- "test_base_vacia_devuelve_lista_vacia" - "test_base_vacia_devuelve_lista_vacia"
- "test_db_inexistente_devuelve_status_error" - "test_db_inexistente_devuelve_status_error"
- "test_base_tables_only_excluye_views"
- "test_attach_sqlite_materializado_lista_por_information_schema"
test_file_path: "python/functions/infra/duckdb_list_tables_test.py" test_file_path: "python/functions/infra/duckdb_list_tables_test.py"
file_path: "python/functions/infra/duckdb_list_tables.py" file_path: "python/functions/infra/duckdb_list_tables.py"
--- ---
@@ -64,7 +68,8 @@ selector de tablas en una UI. Es el primer paso natural antes de
- DuckDB es single-writer: si otro proceso tiene la base abierta en escritura con - DuckDB es single-writer: si otro proceso tiene la base abierta en escritura con
una version distinta del motor, la apertura read-only puede fallar con error de una version distinta del motor, la apertura read-only puede fallar con error de
lock. El error se devuelve como `{status:'error', ...}`, no se lanza. lock. El error se devuelve como `{status:'error', ...}`, no se lanza.
- Solo lista tablas del esquema `main` (el por defecto). Vistas y tablas de otros - Solo lista objetos del esquema `main` (el por defecto); tablas de otros esquemas
esquemas no aparecen. no aparecen. Por defecto incluye **vistas** (table_type VIEW) además de las tablas
base; pasa `base_tables_only=True` para quedarte solo con las `BASE TABLE`.
- Una base recien creada sin tablas devuelve `{status:'ok', tables:[]}` (no es un - Una base recien creada sin tablas devuelve `{status:'ok', tables:[]}` (no es un
error): lista vacia. error): lista vacia.
+15 -4
View File
@@ -13,12 +13,19 @@ introspeccion de alto nivel "que tablas hay" del grupo duckdb.
""" """
def duckdb_list_tables(db_path: str) -> dict: def duckdb_list_tables(db_path: str, base_tables_only: bool = False) -> dict:
"""Lista las tablas de una base DuckDB en modo solo lectura. """Lista las tablas de una base DuckDB en modo solo lectura.
Args: Args:
db_path: ruta al archivo DuckDB. Debe existir: el modo read_only NO crea db_path: ruta al archivo DuckDB. Debe existir: el modo read_only NO crea
la base. Un path inexistente devuelve {status:'error', ...}. la base. Un path inexistente devuelve {status:'error', ...}.
base_tables_only: si True (default False) filtra por
`table_type = 'BASE TABLE'`, excluyendo las VIEWs (y demas objetos no
tabla-base) del esquema `main`. Util para perfilar/relacionar solo las
tablas reales: perfilar una VIEW infla el numero de tablas y multiplica
las relaciones FK falsas. El default mantiene el comportamiento previo
(lista todo lo que aparece en information_schema.tables del esquema
main) para no romper consumidores existentes.
Returns: Returns:
dict. En exito: {status:'ok', tables:[str,...]} con los nombres de tabla dict. En exito: {status:'ok', tables:[str,...]} con los nombres de tabla
@@ -28,10 +35,14 @@ def duckdb_list_tables(db_path: str) -> dict:
conn = None conn = None
try: try:
conn = __import__("duckdb").connect(db_path, read_only=True) conn = __import__("duckdb").connect(db_path, read_only=True)
rows = conn.execute( sql = (
"SELECT table_name FROM information_schema.tables " "SELECT table_name FROM information_schema.tables "
"WHERE table_schema = 'main' ORDER BY table_name" "WHERE table_schema = 'main'"
).fetchall() )
if base_tables_only:
sql += " AND table_type = 'BASE TABLE'"
sql += " ORDER BY table_name"
rows = conn.execute(sql).fetchall()
tables = [row[0] for row in rows] tables = [row[0] for row in rows]
return {"status": "ok", "tables": tables} return {"status": "ok", "tables": tables}
except Exception as e: # noqa: BLE001 except Exception as e: # noqa: BLE001
@@ -38,3 +38,59 @@ def test_db_inexistente_devuelve_status_error(tmp_path):
res = duckdb_list_tables(str(tmp_path / "noexiste.duckdb")) res = duckdb_list_tables(str(tmp_path / "noexiste.duckdb"))
assert res["status"] == "error" assert res["status"] == "error"
assert "error" in res assert "error" in res
def test_base_tables_only_excluye_views(tmp_path):
# Una BASE TABLE + una VIEW: por defecto se listan ambas; con
# base_tables_only=True la VIEW se excluye.
db = tmp_path / "withviews.duckdb"
con = duckdb.connect(str(db))
con.execute("CREATE TABLE ventas (id INTEGER, total DOUBLE)")
con.execute("CREATE VIEW ventas_resumen AS SELECT id FROM ventas")
con.close()
# Default: incluye la view.
res_all = duckdb_list_tables(str(db))
assert res_all["status"] == "ok"
assert res_all["tables"] == ["ventas", "ventas_resumen"]
# base_tables_only: solo la tabla base.
res_base = duckdb_list_tables(str(db), base_tables_only=True)
assert res_base["status"] == "ok"
assert res_base["tables"] == ["ventas"]
def test_attach_sqlite_materializado_lista_por_information_schema(tmp_path):
# Regresión H14: tras ATTACH de una base SQLite en DuckDB se materializan sus
# tablas y se listan vía information_schema (NO sqlite_master, que no existe en
# DuckDB). duckdb_list_tables debe verlas como tablas del esquema main.
import sqlite3
sqlite_path = str(tmp_path / "src.sqlite")
sconn = sqlite3.connect(sqlite_path)
sconn.execute("CREATE TABLE clientes (id INTEGER PRIMARY KEY, nombre TEXT)")
sconn.execute("INSERT INTO clientes VALUES (1,'Ana'),(2,'Luis')")
sconn.execute("CREATE VIEW clientes_v AS SELECT id FROM clientes")
sconn.commit()
sconn.close()
ddb_path = str(tmp_path / "materialized.duckdb")
con = duckdb.connect(ddb_path)
con.execute("INSTALL sqlite")
con.execute("LOAD sqlite")
con.execute(f"ATTACH '{sqlite_path}' AS src (TYPE sqlite)")
# Listar tablas base del catálogo attachado por information_schema (no
# sqlite_master) y materializarlas como tablas nativas DuckDB.
rows = con.execute(
"SELECT table_name FROM information_schema.tables "
"WHERE table_catalog='src' AND table_type='BASE TABLE' "
"AND table_name NOT LIKE 'sqlite_%'"
).fetchall()
for (name,) in rows:
con.execute(f'CREATE TABLE "{name}" AS SELECT * FROM src."{name}"')
con.execute("DETACH src")
con.close()
res = duckdb_list_tables(ddb_path)
assert res["status"] == "ok"
assert "clientes" in res["tables"]
@@ -151,9 +151,11 @@ def profile_database(
} }
""" """
try: try:
# 1) Resolver lista de tablas. # 1) Resolver lista de tablas. Solo BASE TABLE: las VIEWs no son tablas
# reales — perfilarlas infla n_tables y multiplica las FK falsas (sus
# columnas son copias de las de las tablas base, con contención perfecta).
if tables is None: if tables is None:
lst = duckdb_list_tables(db_path) lst = duckdb_list_tables(db_path, base_tables_only=True)
if lst.get("status") != "ok": if lst.get("status") != "ok":
return {"status": "error", "error": lst.get("error", "list failed")} return {"status": "error", "error": lst.get("error", "list failed")}
tables = lst.get("tables", []) tables = lst.get("tables", [])
@@ -78,6 +78,77 @@ def test_profile_database_two_related_tables():
assert res["report_json_path"] is None assert res["report_json_path"] is None
def test_profile_database_excluye_views(tmp_path):
# Regresión H5: una VIEW no es una tabla real. profile_database debe perfilar
# solo las BASE TABLE y no contar las VIEWs (inflan n_tables y multiplican FK
# falsas, al ser copias de columnas de las tablas base).
db_path = os.path.join(str(tmp_path), "withviews.duckdb")
_build_related_db(db_path)
con = duckdb.connect(db_path)
con.execute("CREATE VIEW customers_v AS SELECT id, name FROM customers")
con.execute("CREATE VIEW orders_v AS SELECT order_id, total FROM orders")
con.close()
res = profile_database(db_path, write_report=False)
assert res["status"] == "ok", res
prof = res["db_profile"]
# Solo las 2 tablas base; las 2 views quedan fuera.
assert prof["n_tables"] == 2
profiled = {tp["table"] for tp in prof["table_profiles"]}
assert profiled == {"customers", "orders"}
assert "customers_v" not in profiled
assert "orders_v" not in profiled
def test_profile_database_attach_sqlite_no_usa_sqlite_master(tmp_path):
# Regresión H14: materializar una base SQLite vía ATTACH (information_schema,
# no sqlite_master) y perfilarla con profile_database sin que falle. Blinda el
# bug original 'sqlite_master does not exist'.
import sqlite3
sqlite_path = os.path.join(str(tmp_path), "shop.sqlite")
sconn = sqlite3.connect(sqlite_path)
sconn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT)")
sconn.execute("INSERT INTO customers VALUES (1,'Ana'),(2,'Luis'),(3,'Marta')")
sconn.execute(
"CREATE TABLE orders (order_id INTEGER, customer_id INTEGER, total REAL)"
)
sconn.execute(
"INSERT INTO orders VALUES (10,1,99.5),(11,2,12.0),(12,3,7.25),(13,1,5.0)"
)
sconn.execute("CREATE VIEW big_orders AS SELECT * FROM orders WHERE total > 10")
sconn.commit()
sconn.close()
ddb_path = os.path.join(str(tmp_path), "shop_mat.duckdb")
con = duckdb.connect(ddb_path)
con.execute("INSTALL sqlite")
con.execute("LOAD sqlite")
con.execute(f"ATTACH '{sqlite_path}' AS src (TYPE sqlite)")
rows = con.execute(
"SELECT table_name FROM information_schema.tables "
"WHERE table_catalog='src' AND table_type='BASE TABLE' "
"AND table_name NOT LIKE 'sqlite_%'"
).fetchall()
for (name,) in rows:
con.execute(f'CREATE TABLE "{name}" AS SELECT * FROM src."{name}"')
con.execute("DETACH src")
con.close()
res = profile_database(ddb_path, write_report=False)
assert res["status"] == "ok", res
prof = res["db_profile"]
# Solo las 2 tablas base materializadas (la VIEW no se materializó).
profiled = {tp["table"] for tp in prof["table_profiles"]}
assert profiled == {"customers", "orders"}
# FK orders.customer_id -> customers.id detectable.
assert any(
fk.get("from_table") == "orders" and fk.get("to_table") == "customers"
for fk in prof["fk_candidates"]
), prof["fk_candidates"]
def test_profile_database_writes_report(tmp_path): def test_profile_database_writes_report(tmp_path):
db_path = os.path.join(str(tmp_path), "shop2.duckdb") db_path = os.path.join(str(tmp_path), "shop2.duckdb")
_build_related_db(db_path) _build_related_db(db_path)
+77 -10
View File
@@ -57,6 +57,57 @@ _DATETIME_SEMANTIC = ("datetime_iso", "date_eu")
# promocion a numeric (evita promocionar columnas mayormente no parseables). # promocion a numeric (evita promocionar columnas mayormente no parseables).
_PROMOTE_MIN_PARSE = 0.8 _PROMOTE_MIN_PARSE = 0.8
# Cardinalidad maxima (distinct_count) por debajo de la cual una columna numerica
# se trata como NO continua (binaria / ordinal de pocos niveles) y, por tanto, no
# es candidata a re-expresion de Tukey (la escalera de potencias no aplica a una
# variable con pocos niveles discretos).
_REEXPR_MIN_DISTINCT = 12
# Tokens en el nombre (o semantic_type currency) que sugieren que una serie de
# niveles es FINANCIERA (precios/volumen): en ese caso la transformacion adecuada
# son los retornos. Para magnitudes fisicas (temperatura, caudal) la transformacion
# correcta son las diferencias, no los retornos.
_FINANCIAL_TOKENS = (
"price", "close", "open", "high", "low", "volume", "adj", "vwap",
"bid", "ask", "return", "precio", "cierre", "apertura", "cotiz", "retorno",
)
def _is_continuous_for_reexpr(col: dict, vals_float: list) -> bool:
"""True si la columna numerica es continua y justifica sugerir re-expresion.
Se saltan (devuelve False):
- binarias / ordinales de baja cardinalidad (``distinct_count`` <= umbral):
la escalera de potencias de Tukey no tiene sentido sobre pocos niveles
discretos (p.ej. ``Survived`` 0/1, ``Pclass`` 1/2/3).
- identificadores enteros (flag ``possible_id`` y todos los valores enteros):
re-expresar un id (p.ej. ``PassengerId`` 1..n) no aporta nada.
Los floats continuos de alta cardinalidad (precios, medidas) NO se saltan
aunque lleven ``possible_id``, porque tienen parte decimal (no son enteros).
"""
dc = col.get("distinct_count")
if isinstance(dc, int) and not isinstance(dc, bool) and dc <= _REEXPR_MIN_DISTINCT:
return False
flags = col.get("flags") or []
if "possible_id" in flags and vals_float and all(
float(f).is_integer() for f in vals_float
):
return False
return True
def _looks_financial(col: dict) -> bool:
"""True si la columna parece una serie financiera (precio/volumen/divisa).
Heuristica por nombre (tokens OHLCV típicos) o ``semantic_type == currency``.
Decide si una serie de niveles se debe transformar a retornos (financiera) o a
diferencias (no financiera, p.ej. temperatura).
"""
name = (col.get("name") or "").lower()
if any(tok in name for tok in _FINANCIAL_TOKENS):
return True
return (col.get("semantic_type") or "").lower() == "currency"
def _to_float(value): def _to_float(value):
"""Parsea un valor a float limpiando simbolos de moneda y separadores. """Parsea un valor a float limpiando simbolos de moneda y separadores.
@@ -175,8 +226,12 @@ def _build_series_block(query_fn, table: str, col: dict, order_col, sample: int)
"stl": stl_decompose(series_vals), "stl": stl_decompose(series_vals),
} }
# Sugerencia de retornos solo si la columna parece de niveles: estrictamente # Sugerencia de transformacion solo si la columna parece de niveles:
# positiva y con veredicto de estacionariedad NO confirmado. # estrictamente positiva y con veredicto de estacionariedad NO confirmado.
# La transformacion adecuada depende de la SEMANTICA: retornos para series
# financieras (precios/volumen), diferencias para magnitudes fisicas
# (temperatura, caudal). Aplicar "retornos" a una temperatura no tiene sentido
# fisico; la primera diferencia si la estaciona.
nb = col.get("numeric") or {} nb = col.get("numeric") or {}
minimum = nb.get("min") minimum = nb.get("min")
verdict = (block["stationarity"] or {}).get("verdict") verdict = (block["stationarity"] or {}).get("verdict")
@@ -186,13 +241,22 @@ def _build_series_block(query_fn, table: str, col: dict, order_col, sample: int)
and minimum > 0 and minimum > 0
and verdict in ("non_stationary", "inconclusive") and verdict in ("non_stationary", "inconclusive")
): ):
block["to_returns"] = to_returns(series_vals, method="log")
block["levels_suggested"] = True block["levels_suggested"] = True
block["levels_reason"] = ( if _looks_financial(col):
"columna estrictamente positiva y no claramente estacionaria: parece una " block["levels_kind"] = "returns"
"serie de niveles (precios); trabajar sobre retornos evita correlacion " block["to_returns"] = to_returns(series_vals, method="log")
"espuria (Granger-Newbold)." block["levels_reason"] = (
) "columna financiera estrictamente positiva y no claramente "
"estacionaria (serie de niveles/precios): trabajar sobre retornos "
"evita correlacion espuria (Granger-Newbold)."
)
else:
block["levels_kind"] = "differences"
block["levels_reason"] = (
"serie de niveles no financiera y no claramente estacionaria: la "
"primera diferencia la estaciona; los retornos no tienen sentido en "
"magnitudes fisicas (p.ej. temperatura)."
)
else: else:
block["levels_suggested"] = False block["levels_suggested"] = False
@@ -296,8 +360,11 @@ def profile_table(
vals_float = [f for f in (_to_float(v) for v in vals) if f is not None] vals_float = [f for f in (_to_float(v) for v in vals) if f is not None]
col["numeric"] = describe_numeric(vals_float) col["numeric"] = describe_numeric(vals_float)
# Re-expresion sugerida (escalera de Tukey): que transformacion # Re-expresion sugerida (escalera de Tukey): que transformacion
# simetriza mejor la columna a partir de su skew/dominio. # simetriza mejor la columna a partir de su skew/dominio. Solo para
col["reexpression"] = suggest_reexpression(col["numeric"]) # columnas CONTINUAS: no aplica a binarias/ordinales de baja
# cardinalidad ni a identificadores enteros (la fila seria ruido).
if _is_continuous_for_reexpr(col, vals_float):
col["reexpression"] = suggest_reexpression(col["numeric"])
elif inferred in ("categorical", "text"): elif inferred in ("categorical", "text"):
col["categorical"] = summarize_categorical(vals) col["categorical"] = summarize_categorical(vals)
# Para columnas no promovidas que ya eran categorical/text y no # Para columnas no promovidas que ya eran categorical/text y no
@@ -13,7 +13,112 @@ import tempfile
import duckdb import duckdb
from pipelines.profile_table import profile_table from pipelines.profile_table import (
_is_continuous_for_reexpr,
_looks_financial,
profile_table,
)
# --- H12: re-expresión solo para columnas continuas -------------------------
def test_is_continuous_for_reexpr_baja_cardinalidad():
# Binaria (2 niveles) y ordinal de baja cardinalidad (3 niveles): NO continuas.
binaria = {"distinct_count": 2, "flags": []}
ordinal = {"distinct_count": 3, "flags": []}
assert _is_continuous_for_reexpr(binaria, [0.0, 1.0, 0.0, 1.0]) is False
assert _is_continuous_for_reexpr(ordinal, [1.0, 2.0, 3.0, 2.0]) is False
def test_is_continuous_for_reexpr_id_entero():
# Identificador entero (possible_id + todos enteros): NO continua.
idcol = {"distinct_count": 200, "flags": ["possible_id"]}
vals = [float(i) for i in range(1, 201)]
assert _is_continuous_for_reexpr(idcol, vals) is False
def test_is_continuous_for_reexpr_float_continuo():
# Float continuo de alta cardinalidad, aunque lleve possible_id, SÍ es continuo
# (tiene parte decimal, no es un id entero).
precio = {"distinct_count": 200, "flags": ["possible_id"]}
vals = [i * 1.7 for i in range(200)]
assert _is_continuous_for_reexpr(precio, vals) is True
def test_reexpression_solo_para_columnas_continuas():
# En una tabla con binaria/ordinal/id/continua, solo la continua trae el bloque
# reexpression en su ColumnProfile.
tmp_dir = tempfile.mkdtemp(prefix="reexpr_test_")
db_path = os.path.join(tmp_dir, "t.duckdb")
con = duckdb.connect(db_path)
con.execute(
"CREATE TABLE t (pid INTEGER, surv INTEGER, pclass INTEGER, fare DOUBLE)"
)
con.execute(
"INSERT INTO t SELECT i, i%2, (i%3)+1, ((i*1.7)%50)+0.3 "
"FROM range(300) tbl(i)"
)
con.close()
r = profile_table(db_path, "t", write_report=False)
assert r["status"] == "ok", r
prof = r["profile"]
assert _col(prof, "pid").get("reexpression") is None # id entero
assert _col(prof, "surv").get("reexpression") is None # binaria
assert _col(prof, "pclass").get("reexpression") is None # ordinal baja card
assert _col(prof, "fare").get("reexpression") is not None # continua
# --- H13: retornos (financiera) vs diferencias (física) ---------------------
def test_looks_financial_por_nombre_y_semantic():
assert _looks_financial({"name": "Close"}) is True
assert _looks_financial({"name": "Adj Close"}) is True
assert _looks_financial({"name": "Volume"}) is True
assert _looks_financial({"name": "precio_cierre"}) is True
assert _looks_financial({"name": "temp_max"}) is False
assert _looks_financial({"name": "precipitation"}) is False
assert _looks_financial({"name": "caudal", "semantic_type": "currency"}) is True
def _make_series_db(value_col: str) -> str:
"""DuckDB con una serie de niveles no estacionaria (random walk creciente)."""
tmp_dir = tempfile.mkdtemp(prefix="series_test_")
db_path = os.path.join(tmp_dir, "s.duckdb")
con = duckdb.connect(db_path)
con.execute(f'CREATE TABLE s (ts INTEGER, "{value_col}" DOUBLE)')
# Niveles estrictamente positivos con tendencia creciente (no estacionaria).
level = 100.0
rows = []
for t in range(80):
level += 1.0 + (t % 7) * 0.3 # incrementos positivos deterministas
rows.append((t, level))
con.executemany(f'INSERT INTO s VALUES (?, ?)', rows)
con.close()
return db_path
def test_series_financiera_sugiere_retornos():
db_path = _make_series_db("close")
r = profile_table(db_path, "s", run_series=True, write_report=False)
assert r["status"] == "ok", r
s = _col(r["profile"], "close").get("series")
assert s is not None
if s.get("levels_suggested"):
assert s.get("levels_kind") == "returns"
def test_series_no_financiera_sugiere_diferencias():
db_path = _make_series_db("temp_max")
r = profile_table(db_path, "s", run_series=True, write_report=False)
assert r["status"] == "ok", r
s = _col(r["profile"], "temp_max").get("series")
assert s is not None
if s.get("levels_suggested"):
assert s.get("levels_kind") == "differences"
# Para diferencias no se computa el bloque de retornos.
assert "to_returns" not in s
def _make_db() -> str: def _make_db() -> str: