Files
fn_registry/python/functions/pipelines/profile_table.py
T
Egutierrez caf8c25d99 fix(eda): bugs de bajo riesgo del benchmark (H1,H5,H12,H13,H14) + tests faltantes
- H1: render_eda_markdown ya no aplica doble x100 a outlier_pct (336% -> real)
- H5: profile_database filtra base_tables_only (excluye VIEWs; sakila 21->16)
- H12: suggest_reexpression salta columnas no-continuas
- H13: to_returns/profile_table elige retornos (financiera) vs diferencias (fisica)
- H14: test de regresion ATTACH sqlite via information_schema
- +8 tests de las funciones eda nuevas (acf_pacf, adf_kpss, ...). 77 tests verdes
- L/M (H2,H3,H4,H6,H7,H8,H9,H10,H11) quedan en issues 0174-0177 para revision

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 03:51:11 +02:00

545 lines
23 KiB
Python

"""profile_table — orquestador one-shot del grupo de capacidad `eda`.
Pipeline impuro: perfila UNA tabla DuckDB end-to-end componiendo las funciones
puras e impuras del grupo `eda` y, opcionalmente, escribe un report markdown +
JSON sidecar a disco. Es la composicion canonica para "hazme un EDA de esta
tabla": una sola llamada en vez de orquestar 7 funciones a mano.
Funciones del registry compuestas (NO se reimplementa su logica):
- summarize_table_duckdb : perfil base por columna (push-down SQL, sin RAM).
- duckdb_query_readonly : muestra read-only de valores no nulos por columna.
- infer_semantic_type : clasifica VARCHAR (email, integer, currency, ...).
- describe_numeric : estadistica fina sobre la muestra numerica.
- summarize_categorical : top-k, moda, entropia sobre la muestra categorica.
- column_quality_score : score 0-100 de calidad por columna.
- render_eda_markdown : report legible del TableProfile.
Aporta una capa propia de PROMOCION DE TIPO: muchas tablas guardan numeros y
fechas como VARCHAR. Tras el perfil base, se muestrea cada columna textual, se
infiere su semantic_type y, si encaja, se promociona inferred_type a "numeric"
o "datetime" antes de enriquecer. Asi una columna '10','20' (VARCHAR) recibe su
bloque numeric en vez de quedarse como categorica.
Estilo dict-no-throw del grupo: nunca lanza; captura cualquier error y devuelve
{status:'error', error:str}.
"""
import json
import os
from datetime import datetime, timezone
from datascience import (
acf_pacf,
adf_kpss_stationarity,
association_matrix,
column_quality_score,
describe_numeric,
eda_llm_insights,
exploratory_caveats,
infer_semantic_type,
render_eda_markdown,
render_eda_pdf,
run_eda_models,
stl_decompose,
suggest_reexpression,
summarize_categorical,
summarize_table_duckdb,
summarize_table_pg,
to_returns,
)
from infra import duckdb_query_readonly, pg_query
# semantic_types que justifican promocionar inferred_type -> "numeric".
_NUMERIC_SEMANTIC = ("integer", "decimal", "currency")
# semantic_types que justifican promocionar inferred_type -> "datetime".
_DATETIME_SEMANTIC = ("datetime_iso", "date_eu")
# Fraccion minima de la muestra que debe parsear a float para confirmar la
# promocion a numeric (evita promocionar columnas mayormente no parseables).
_PROMOTE_MIN_PARSE = 0.8
# Cardinalidad maxima (distinct_count) por debajo de la cual una columna numerica
# se trata como NO continua (binaria / ordinal de pocos niveles) y, por tanto, no
# es candidata a re-expresion de Tukey (la escalera de potencias no aplica a una
# variable con pocos niveles discretos).
_REEXPR_MIN_DISTINCT = 12
# Tokens en el nombre (o semantic_type currency) que sugieren que una serie de
# niveles es FINANCIERA (precios/volumen): en ese caso la transformacion adecuada
# son los retornos. Para magnitudes fisicas (temperatura, caudal) la transformacion
# correcta son las diferencias, no los retornos.
_FINANCIAL_TOKENS = (
"price", "close", "open", "high", "low", "volume", "adj", "vwap",
"bid", "ask", "return", "precio", "cierre", "apertura", "cotiz", "retorno",
)
def _is_continuous_for_reexpr(col: dict, vals_float: list) -> bool:
"""True si la columna numerica es continua y justifica sugerir re-expresion.
Se saltan (devuelve False):
- binarias / ordinales de baja cardinalidad (``distinct_count`` <= umbral):
la escalera de potencias de Tukey no tiene sentido sobre pocos niveles
discretos (p.ej. ``Survived`` 0/1, ``Pclass`` 1/2/3).
- identificadores enteros (flag ``possible_id`` y todos los valores enteros):
re-expresar un id (p.ej. ``PassengerId`` 1..n) no aporta nada.
Los floats continuos de alta cardinalidad (precios, medidas) NO se saltan
aunque lleven ``possible_id``, porque tienen parte decimal (no son enteros).
"""
dc = col.get("distinct_count")
if isinstance(dc, int) and not isinstance(dc, bool) and dc <= _REEXPR_MIN_DISTINCT:
return False
flags = col.get("flags") or []
if "possible_id" in flags and vals_float and all(
float(f).is_integer() for f in vals_float
):
return False
return True
def _looks_financial(col: dict) -> bool:
"""True si la columna parece una serie financiera (precio/volumen/divisa).
Heuristica por nombre (tokens OHLCV típicos) o ``semantic_type == currency``.
Decide si una serie de niveles se debe transformar a retornos (financiera) o a
diferencias (no financiera, p.ej. temperatura).
"""
name = (col.get("name") or "").lower()
if any(tok in name for tok in _FINANCIAL_TOKENS):
return True
return (col.get("semantic_type") or "").lower() == "currency"
def _to_float(value):
"""Parsea un valor a float limpiando simbolos de moneda y separadores.
Quita simbolos de divisa (EUR/USD/GBP/€/$/£), espacios y separadores de
miles, y normaliza la coma decimal. Devuelve None si no parsea.
"""
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return float(value)
s = str(value).strip()
if not s:
return None
# Limpia simbolos de moneda y unidades textuales.
for tok in ("", "$", "£", "EUR", "USD", "GBP", "eur", "usd", "gbp"):
s = s.replace(tok, "")
s = s.strip()
# Normaliza separadores: si hay coma y punto, asume punto=miles, coma=decimal.
if "," in s and "." in s:
s = s.replace(".", "").replace(",", ".")
elif "," in s:
# Solo coma: tratar como separador decimal.
s = s.replace(",", ".")
s = s.replace(" ", "")
try:
return float(s)
except (TypeError, ValueError):
return None
def _sample_values(query_fn, table: str, name: str, sample: int) -> list:
"""Trae hasta `sample` valores no nulos de una columna (read-only).
query_fn(sql) -> dict es el lector read-only del backend activo
(duckdb_query_readonly o pg_query), inyectado por profile_table.
"""
q = query_fn(
f'SELECT "{name}" AS v FROM "{table}" WHERE "{name}" IS NOT NULL '
f"LIMIT {int(sample)}",
)
if q.get("status") != "ok":
return []
return [row.get("v") for row in q.get("rows", [])]
def _sample_rows(query_fn, table: str, names: list, sample: int) -> list:
"""Trae hasta `sample` filas completas con las columnas alineadas por fila.
A diferencia de _sample_values (una columna, solo no nulos), esto preserva la
alineacion por fila entre columnas, requisito de la matriz de asociacion
(los pares (a_i, b_i) deben venir de la misma fila). query_fn es el lector
read-only del backend activo, inyectado por profile_table.
"""
if not names:
return []
cols_sql = ", ".join(f'"{n}"' for n in names)
q = query_fn(f'SELECT {cols_sql} FROM "{table}" LIMIT {int(sample)}')
if q.get("status") != "ok":
return []
return q.get("rows", [])
def _sample_series(query_fn, table: str, value_col: str, order_col, sample: int) -> list:
"""Trae hasta `sample` valores no nulos de una columna en orden de serie temporal.
A diferencia de _sample_values, cuando hay una columna de orden temporal
(`order_col`, normalmente la primera columna datetime de la tabla) se ordena
ascendentemente por ella para que la secuencia recuperada respete el orden
cronologico, requisito de los contrastes de serie temporal (ADF/KPSS, ACF/PACF,
STL). Si `order_col` es None se cae al orden fisico de inserciones (columna
numerica secuencial). query_fn es el lector read-only del backend activo.
"""
base = (
f'SELECT "{value_col}" AS v FROM "{table}" '
f'WHERE "{value_col}" IS NOT NULL'
)
if order_col:
base += f' ORDER BY "{order_col}"'
base += f" LIMIT {int(sample)}"
q = query_fn(base)
if q.get("status") != "ok":
return []
return [row.get("v") for row in q.get("rows", [])]
def _build_series_block(query_fn, table: str, col: dict, order_col, sample: int) -> dict:
"""Construye el bloque `series` de una columna numerica (estilo dict-no-throw).
Compone los contrastes de serie temporal del grupo `eda` sobre la secuencia
ordenada de la columna: estacionariedad (ADF+KPSS), autocorrelacion (ACF/PACF +
Ljung-Box) y descomposicion STL (tendencia/estacional/resto). Cuando la columna
parece de NIVELES (precios: estrictamente positiva y no claramente estacionaria)
anade ademas la conversion a retornos (`to_returns`) como sugerencia, ya que
correlacionar/modelar niveles no estacionarios produce relaciones espurias
(Granger-Newbold).
Devuelve None si no hay suficientes puntos validos (<8) para ningun contraste.
"""
name = col.get("name")
raw = _sample_series(query_fn, table, name, order_col, sample)
series_vals = [f for f in (_to_float(v) for v in raw) if f is not None]
if len(series_vals) < 8:
return None
block: dict = {
"order_col": order_col,
"ordered": bool(order_col),
"n": len(series_vals),
"stationarity": adf_kpss_stationarity(series_vals),
"acf_pacf": acf_pacf(series_vals),
# stl_decompose auto-infiere el periodo; si no hay estacionalidad detectable
# devuelve una nota y strengths None (se incluye igual, es informativo).
"stl": stl_decompose(series_vals),
}
# Sugerencia de transformacion solo si la columna parece de niveles:
# estrictamente positiva y con veredicto de estacionariedad NO confirmado.
# La transformacion adecuada depende de la SEMANTICA: retornos para series
# financieras (precios/volumen), diferencias para magnitudes fisicas
# (temperatura, caudal). Aplicar "retornos" a una temperatura no tiene sentido
# fisico; la primera diferencia si la estaciona.
nb = col.get("numeric") or {}
minimum = nb.get("min")
verdict = (block["stationarity"] or {}).get("verdict")
if (
isinstance(minimum, (int, float))
and not isinstance(minimum, bool)
and minimum > 0
and verdict in ("non_stationary", "inconclusive")
):
block["levels_suggested"] = True
if _looks_financial(col):
block["levels_kind"] = "returns"
block["to_returns"] = to_returns(series_vals, method="log")
block["levels_reason"] = (
"columna financiera estrictamente positiva y no claramente "
"estacionaria (serie de niveles/precios): trabajar sobre retornos "
"evita correlacion espuria (Granger-Newbold)."
)
else:
block["levels_kind"] = "differences"
block["levels_reason"] = (
"serie de niveles no financiera y no claramente estacionaria: la "
"primera diferencia la estaciona; los retornos no tienen sentido en "
"magnitudes fisicas (p.ej. temperatura)."
)
else:
block["levels_suggested"] = False
return block
def profile_table(
db_path: str,
table: str,
backend: str = "duckdb",
sample: int = 5000,
run_models: bool = False,
run_llm: bool = False,
run_series: bool = False,
emit_pdf: bool = False,
report_dir: str = "reports",
write_report: bool = True,
) -> dict:
"""Perfila una tabla (DuckDB o PostgreSQL) end-to-end y emite el TableProfile.
Args:
db_path: ruta al archivo DuckDB, o DSN PostgreSQL si backend="postgres".
table: nombre de la tabla a perfilar.
backend: "duckdb" (default) o "postgres". Selecciona el motor de
perfilado base (summarize) y de muestreo read-only.
sample: maximo de valores no nulos muestreados por columna para el
enriquecimiento (describe_numeric / summarize_categorical /
infer_semantic_type). Default 5000.
run_models: si True (default False) corre los modelos baratos
(PCA/KMeans/IsolationForest/normalidad) sobre las numericas y guarda
el bloque en prof["models"].
run_llm: si True (default False) hace 1 llamada LLM sobre el perfil
agregado y guarda el resultado en prof["llm"].
run_series: si True (default False) calcula, para cada columna numerica,
un bloque de serie temporal (estacionariedad ADF+KPSS, ACF/PACF,
descomposicion STL y, si parece de niveles, conversion a retornos).
Si hay una columna datetime se usa como orden cronologico; si no, se
usa el orden fisico de filas (columna numerica secuencial). Los bloques
se guardan por columna en col["series"] y agregados en prof["series"].
emit_pdf: si True (default False) renderiza un PDF multipagina vertical
(legible en movil) del perfil junto al report markdown y devuelve su
ruta en pdf_path.
report_dir: directorio donde escribir los reports si write_report.
Default "reports". Se crea si no existe.
write_report: si True (default), escribe un report markdown + un JSON
sidecar timestamped en report_dir. Si False, no toca disco y los
paths del retorno son None.
Returns:
dict. En exito: {status:'ok', profile: <TableProfile>,
report_md_path: str|None, report_json_path: str|None, pdf_path: str|None}.
En error (sin lanzar): {status:'error', error:str}.
"""
try:
# 1) Perfil base por columna (push-down SQL) + lector read-only del
# backend activo, inyectado en el muestreo (_sample_values/_sample_rows).
if backend == "postgres":
r = summarize_table_pg(db_path, table)
def _q(sql):
return pg_query(db_path, sql)
elif backend == "duckdb":
r = summarize_table_duckdb(db_path, table)
def _q(sql):
return duckdb_query_readonly(db_path, sql)
else:
return {"status": "error", "error": f"backend desconocido: {backend}"}
if r.get("status") != "ok":
return {"status": "error", "error": r.get("error", "summarize failed")}
prof = r["profile"]
cols = prof.get("columns", [])
for col in cols:
name = col.get("name")
inferred = col.get("inferred_type")
# 2) Muestra de valores no nulos.
vals = _sample_values(_q, table, name, sample)
# 3) Promocion de tipo sobre columnas textuales.
if inferred in ("categorical", "text"):
sem = infer_semantic_type(vals)
semantic = sem.get("semantic_type", "")
col["semantic_type"] = semantic
if semantic in _NUMERIC_SEMANTIC:
parsed = [_to_float(v) for v in vals]
ok = [f for f in parsed if f is not None]
if vals and (len(ok) / len(vals)) >= _PROMOTE_MIN_PARSE:
col["inferred_type"] = "numeric"
inferred = "numeric"
elif semantic in _DATETIME_SEMANTIC:
col["inferred_type"] = "datetime"
inferred = "datetime"
# 4) Enriquecer segun el inferred_type final.
if inferred == "numeric":
vals_float = [f for f in (_to_float(v) for v in vals) if f is not None]
col["numeric"] = describe_numeric(vals_float)
# Re-expresion sugerida (escalera de Tukey): que transformacion
# simetriza mejor la columna a partir de su skew/dominio. Solo para
# columnas CONTINUAS: no aplica a binarias/ordinales de baja
# cardinalidad ni a identificadores enteros (la fila seria ruido).
if _is_continuous_for_reexpr(col, vals_float):
col["reexpression"] = suggest_reexpression(col["numeric"])
elif inferred in ("categorical", "text"):
col["categorical"] = summarize_categorical(vals)
# Para columnas no promovidas que ya eran categorical/text y no
# habian pasado por infer arriba, asegurar semantic_type seteado.
if not col.get("semantic_type"):
col["semantic_type"] = infer_semantic_type(vals).get(
"semantic_type", ""
)
elif inferred == "datetime":
# profile_datetime llega en otra fase; conserva semantic_type.
col["datetime"] = None
# 5) Score de calidad por columna.
col["quality_score"] = column_quality_score(col).get("score")
# 6) Score agregado de la tabla (media de columnas).
scores = [
c["quality_score"] for c in cols if c.get("quality_score") is not None
]
prof["quality_score"] = round(sum(scores) / len(scores), 1) if scores else None
# 7) Candidatos a clave.
key_candidates = []
for c in cols:
flags = c.get("flags") or []
unique_pct = c.get("unique_pct") or 0.0
null_pct = c.get("null_pct") or 0.0
if "possible_id" in flags or (unique_pct >= 0.99 and null_pct == 0):
key_candidates.append(c["name"])
prof["key_candidates"] = key_candidates
# 8) Recalcular type_breakdown tras la promocion.
type_breakdown = {
"numeric": 0,
"categorical": 0,
"datetime": 0,
"text": 0,
"boolean": 0,
}
for c in cols:
it = c.get("inferred_type")
if it in type_breakdown:
type_breakdown[it] += 1
prof["type_breakdown"] = type_breakdown
# 8.5) Matriz de correlacion/asociacion sobre una muestra de filas
# alineadas. Elige la metrica por par de tipos (Pearson/Spearman,
# Cramer's V/Theil's U, correlation ratio, MI) via association_matrix.
# Se salta el text de alta cardinalidad (ids/urls): solo mete ruido.
try:
corr_sample = min(int(sample), 5000)
# Excluye columnas id-like (possible_id / high_cardinality) de tipo
# categorical/text: su cardinalidad ~ n filas infla Cramer's V y MI
# con asociaciones espurias (cada valor unico empareja perfecto).
# Las numericas de alta cardinalidad SI se conservan (p.ej. precios).
def _skip_for_assoc(c):
it = c.get("inferred_type")
flags = c.get("flags") or []
return it in ("categorical", "text") and (
"possible_id" in flags or "high_cardinality" in flags
)
assoc_cols = [c for c in cols if not _skip_for_assoc(c)]
rows = _sample_rows(
_q, table, [c["name"] for c in assoc_cols], corr_sample
)
assoc_input = {}
for c in assoc_cols:
name = c["name"]
it = c.get("inferred_type") or "categorical"
raw = [row.get(name) for row in rows]
if it == "numeric":
assoc_input[name] = {
"values": [_to_float(v) for v in raw],
"type": "numeric",
}
else:
assoc_input[name] = {"values": raw, "type": it}
prof["correlations"] = (
association_matrix(assoc_input) if len(assoc_input) >= 2 else None
)
except Exception: # noqa: BLE001
prof["correlations"] = None
assoc_input = {}
# Modelos baratos opt-in en su PROPIO try: un fallo de los modelos NUNCA
# debe tumbar las correlaciones ya calculadas (bug detectado en EDAs PG
# reales: un try/except compartido ponia ambos campos a None).
if run_models:
try:
prof["models"] = run_eda_models(assoc_input)
except Exception: # noqa: BLE001
prof["models"] = None
# 8.6) Capa LLM opcional: interpreta el perfil ya calculado en UNA
# llamada (data dictionary, resumen, granularidad de fila, PII, limpieza,
# analisis sugeridos). Solo envia el perfil agregado, nunca filas crudas.
if run_llm:
try:
res = eda_llm_insights(prof)
prof["llm"] = res.get("llm") if res.get("status") == "ok" else None
except Exception: # noqa: BLE001
prof["llm"] = None
# 8.7) Analisis de serie temporal opt-in. Para cada columna numerica se
# calcula estacionariedad (ADF+KPSS), autocorrelacion (ACF/PACF) y
# descomposicion STL sobre la secuencia ordenada; si parece de niveles se
# anade la conversion a retornos. Si hay una columna datetime se usa como
# orden cronologico; si no, el orden fisico (columna numerica secuencial).
if run_series:
try:
order_col = next(
(
c.get("name")
for c in cols
if c.get("inferred_type") == "datetime"
),
None,
)
series_map: dict = {}
for col in cols:
if col.get("inferred_type") != "numeric":
continue
try:
sblock = _build_series_block(
_q, table, col, order_col, sample
)
except Exception: # noqa: BLE001
sblock = None
if sblock is not None:
col["series"] = sblock
series_map[col["name"]] = sblock
prof["series"] = series_map or None
except Exception: # noqa: BLE001
prof["series"] = None
# 8.8) Avisos exploratorios: recuerdan que el EDA genera hipotesis, no
# conclusiones. Se calculan sobre el perfil ya completo (correlaciones,
# modelos, outliers, faltantes determinan que advertencias aplican).
try:
prof["caveats"] = exploratory_caveats(prof)
except Exception: # noqa: BLE001
prof["caveats"] = None
# 9) Reports opcionales (markdown + JSON sidecar + PDF movil).
report_md_path = None
report_json_path = None
pdf_path = None
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
if write_report:
os.makedirs(report_dir, exist_ok=True)
report_json_path = os.path.join(report_dir, f"eda_{table}_{ts}.json")
report_md_path = os.path.join(report_dir, f"eda_{table}_{ts}.md")
with open(report_json_path, "w", encoding="utf-8") as fh:
fh.write(json.dumps(prof, ensure_ascii=False, indent=1, default=str))
with open(report_md_path, "w", encoding="utf-8") as fh:
fh.write(render_eda_markdown(prof))
# PDF multipagina vertical (legible en movil), junto al report markdown.
if emit_pdf:
try:
os.makedirs(report_dir, exist_ok=True)
pdf_target = os.path.join(report_dir, f"eda_{table}_{ts}.pdf")
pres = render_eda_pdf(prof, pdf_target)
pdf_path = pres.get("pdf_path")
except Exception: # noqa: BLE001
pdf_path = None
return {
"status": "ok",
"profile": prof,
"report_md_path": report_md_path,
"report_json_path": report_json_path,
"pdf_path": pdf_path,
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}