Files
fn_registry/python/functions/pipelines/profile_table.py
T
egutierrez 32c7336bf6 feat(infra): auto-commit con 56 cambios
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-21 14:22:55 +02:00

323 lines
13 KiB
Python

"""profile_table — orquestador one-shot del grupo de capacidad `eda`.
Pipeline impuro: perfila UNA tabla DuckDB end-to-end componiendo las funciones
puras e impuras del grupo `eda` y, opcionalmente, escribe un report markdown +
JSON sidecar a disco. Es la composicion canonica para "hazme un EDA de esta
tabla": una sola llamada en vez de orquestar 7 funciones a mano.
Funciones del registry compuestas (NO se reimplementa su logica):
- summarize_table_duckdb : perfil base por columna (push-down SQL, sin RAM).
- duckdb_query_readonly : muestra read-only de valores no nulos por columna.
- infer_semantic_type : clasifica VARCHAR (email, integer, currency, ...).
- describe_numeric : estadistica fina sobre la muestra numerica.
- summarize_categorical : top-k, moda, entropia sobre la muestra categorica.
- column_quality_score : score 0-100 de calidad por columna.
- render_eda_markdown : report legible del TableProfile.
Aporta una capa propia de PROMOCION DE TIPO: muchas tablas guardan numeros y
fechas como VARCHAR. Tras el perfil base, se muestrea cada columna textual, se
infiere su semantic_type y, si encaja, se promociona inferred_type a "numeric"
o "datetime" antes de enriquecer. Asi una columna '10','20' (VARCHAR) recibe su
bloque numeric en vez de quedarse como categorica.
Estilo dict-no-throw del grupo: nunca lanza; captura cualquier error y devuelve
{status:'error', error:str}.
"""
import json
import os
from datetime import datetime, timezone
from datascience import (
association_matrix,
column_quality_score,
describe_numeric,
eda_llm_insights,
infer_semantic_type,
render_eda_markdown,
run_eda_models,
summarize_categorical,
summarize_table_duckdb,
summarize_table_pg,
)
from infra import duckdb_query_readonly, pg_query
# semantic_types que justifican promocionar inferred_type -> "numeric".
_NUMERIC_SEMANTIC = ("integer", "decimal", "currency")
# semantic_types que justifican promocionar inferred_type -> "datetime".
_DATETIME_SEMANTIC = ("datetime_iso", "date_eu")
# Fraccion minima de la muestra que debe parsear a float para confirmar la
# promocion a numeric (evita promocionar columnas mayormente no parseables).
_PROMOTE_MIN_PARSE = 0.8
def _to_float(value):
"""Parsea un valor a float limpiando simbolos de moneda y separadores.
Quita simbolos de divisa (EUR/USD/GBP/€/$/£), espacios y separadores de
miles, y normaliza la coma decimal. Devuelve None si no parsea.
"""
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return float(value)
s = str(value).strip()
if not s:
return None
# Limpia simbolos de moneda y unidades textuales.
for tok in ("", "$", "£", "EUR", "USD", "GBP", "eur", "usd", "gbp"):
s = s.replace(tok, "")
s = s.strip()
# Normaliza separadores: si hay coma y punto, asume punto=miles, coma=decimal.
if "," in s and "." in s:
s = s.replace(".", "").replace(",", ".")
elif "," in s:
# Solo coma: tratar como separador decimal.
s = s.replace(",", ".")
s = s.replace(" ", "")
try:
return float(s)
except (TypeError, ValueError):
return None
def _sample_values(query_fn, table: str, name: str, sample: int) -> list:
"""Trae hasta `sample` valores no nulos de una columna (read-only).
query_fn(sql) -> dict es el lector read-only del backend activo
(duckdb_query_readonly o pg_query), inyectado por profile_table.
"""
q = query_fn(
f'SELECT "{name}" AS v FROM "{table}" WHERE "{name}" IS NOT NULL '
f"LIMIT {int(sample)}",
)
if q.get("status") != "ok":
return []
return [row.get("v") for row in q.get("rows", [])]
def _sample_rows(query_fn, table: str, names: list, sample: int) -> list:
"""Trae hasta `sample` filas completas con las columnas alineadas por fila.
A diferencia de _sample_values (una columna, solo no nulos), esto preserva la
alineacion por fila entre columnas, requisito de la matriz de asociacion
(los pares (a_i, b_i) deben venir de la misma fila). query_fn es el lector
read-only del backend activo, inyectado por profile_table.
"""
if not names:
return []
cols_sql = ", ".join(f'"{n}"' for n in names)
q = query_fn(f'SELECT {cols_sql} FROM "{table}" LIMIT {int(sample)}')
if q.get("status") != "ok":
return []
return q.get("rows", [])
def profile_table(
db_path: str,
table: str,
backend: str = "duckdb",
sample: int = 5000,
run_models: bool = False,
run_llm: bool = False,
report_dir: str = "reports",
write_report: bool = True,
) -> dict:
"""Perfila una tabla (DuckDB o PostgreSQL) end-to-end y emite el TableProfile.
Args:
db_path: ruta al archivo DuckDB, o DSN PostgreSQL si backend="postgres".
table: nombre de la tabla a perfilar.
backend: "duckdb" (default) o "postgres". Selecciona el motor de
perfilado base (summarize) y de muestreo read-only.
sample: maximo de valores no nulos muestreados por columna para el
enriquecimiento (describe_numeric / summarize_categorical /
infer_semantic_type). Default 5000.
report_dir: directorio donde escribir los reports si write_report.
Default "reports". Se crea si no existe.
write_report: si True (default), escribe un report markdown + un JSON
sidecar timestamped en report_dir. Si False, no toca disco y los
paths del retorno son None.
Returns:
dict. En exito: {status:'ok', profile: <TableProfile>,
report_md_path: str|None, report_json_path: str|None}. En error (sin
lanzar): {status:'error', error:str}.
"""
try:
# 1) Perfil base por columna (push-down SQL) + lector read-only del
# backend activo, inyectado en el muestreo (_sample_values/_sample_rows).
if backend == "postgres":
r = summarize_table_pg(db_path, table)
def _q(sql):
return pg_query(db_path, sql)
elif backend == "duckdb":
r = summarize_table_duckdb(db_path, table)
def _q(sql):
return duckdb_query_readonly(db_path, sql)
else:
return {"status": "error", "error": f"backend desconocido: {backend}"}
if r.get("status") != "ok":
return {"status": "error", "error": r.get("error", "summarize failed")}
prof = r["profile"]
cols = prof.get("columns", [])
for col in cols:
name = col.get("name")
inferred = col.get("inferred_type")
# 2) Muestra de valores no nulos.
vals = _sample_values(_q, table, name, sample)
# 3) Promocion de tipo sobre columnas textuales.
if inferred in ("categorical", "text"):
sem = infer_semantic_type(vals)
semantic = sem.get("semantic_type", "")
col["semantic_type"] = semantic
if semantic in _NUMERIC_SEMANTIC:
parsed = [_to_float(v) for v in vals]
ok = [f for f in parsed if f is not None]
if vals and (len(ok) / len(vals)) >= _PROMOTE_MIN_PARSE:
col["inferred_type"] = "numeric"
inferred = "numeric"
elif semantic in _DATETIME_SEMANTIC:
col["inferred_type"] = "datetime"
inferred = "datetime"
# 4) Enriquecer segun el inferred_type final.
if inferred == "numeric":
vals_float = [f for f in (_to_float(v) for v in vals) if f is not None]
col["numeric"] = describe_numeric(vals_float)
elif inferred in ("categorical", "text"):
col["categorical"] = summarize_categorical(vals)
# Para columnas no promovidas que ya eran categorical/text y no
# habian pasado por infer arriba, asegurar semantic_type seteado.
if not col.get("semantic_type"):
col["semantic_type"] = infer_semantic_type(vals).get(
"semantic_type", ""
)
elif inferred == "datetime":
# profile_datetime llega en otra fase; conserva semantic_type.
col["datetime"] = None
# 5) Score de calidad por columna.
col["quality_score"] = column_quality_score(col).get("score")
# 6) Score agregado de la tabla (media de columnas).
scores = [
c["quality_score"] for c in cols if c.get("quality_score") is not None
]
prof["quality_score"] = round(sum(scores) / len(scores), 1) if scores else None
# 7) Candidatos a clave.
key_candidates = []
for c in cols:
flags = c.get("flags") or []
unique_pct = c.get("unique_pct") or 0.0
null_pct = c.get("null_pct") or 0.0
if "possible_id" in flags or (unique_pct >= 0.99 and null_pct == 0):
key_candidates.append(c["name"])
prof["key_candidates"] = key_candidates
# 8) Recalcular type_breakdown tras la promocion.
type_breakdown = {
"numeric": 0,
"categorical": 0,
"datetime": 0,
"text": 0,
"boolean": 0,
}
for c in cols:
it = c.get("inferred_type")
if it in type_breakdown:
type_breakdown[it] += 1
prof["type_breakdown"] = type_breakdown
# 8.5) Matriz de correlacion/asociacion sobre una muestra de filas
# alineadas. Elige la metrica por par de tipos (Pearson/Spearman,
# Cramer's V/Theil's U, correlation ratio, MI) via association_matrix.
# Se salta el text de alta cardinalidad (ids/urls): solo mete ruido.
try:
corr_sample = min(int(sample), 5000)
# Excluye columnas id-like (possible_id / high_cardinality) de tipo
# categorical/text: su cardinalidad ~ n filas infla Cramer's V y MI
# con asociaciones espurias (cada valor unico empareja perfecto).
# Las numericas de alta cardinalidad SI se conservan (p.ej. precios).
def _skip_for_assoc(c):
it = c.get("inferred_type")
flags = c.get("flags") or []
return it in ("categorical", "text") and (
"possible_id" in flags or "high_cardinality" in flags
)
assoc_cols = [c for c in cols if not _skip_for_assoc(c)]
rows = _sample_rows(
_q, table, [c["name"] for c in assoc_cols], corr_sample
)
assoc_input = {}
for c in assoc_cols:
name = c["name"]
it = c.get("inferred_type") or "categorical"
raw = [row.get(name) for row in rows]
if it == "numeric":
assoc_input[name] = {
"values": [_to_float(v) for v in raw],
"type": "numeric",
}
else:
assoc_input[name] = {"values": raw, "type": it}
prof["correlations"] = (
association_matrix(assoc_input) if len(assoc_input) >= 2 else None
)
except Exception: # noqa: BLE001
prof["correlations"] = None
assoc_input = {}
# Modelos baratos opt-in en su PROPIO try: un fallo de los modelos NUNCA
# debe tumbar las correlaciones ya calculadas (bug detectado en EDAs PG
# reales: un try/except compartido ponia ambos campos a None).
if run_models:
try:
prof["models"] = run_eda_models(assoc_input)
except Exception: # noqa: BLE001
prof["models"] = None
# 8.6) Capa LLM opcional: interpreta el perfil ya calculado en UNA
# llamada (data dictionary, resumen, granularidad de fila, PII, limpieza,
# analisis sugeridos). Solo envia el perfil agregado, nunca filas crudas.
if run_llm:
try:
res = eda_llm_insights(prof)
prof["llm"] = res.get("llm") if res.get("status") == "ok" else None
except Exception: # noqa: BLE001
prof["llm"] = None
# 9) Reports opcionales.
report_md_path = None
report_json_path = None
if write_report:
os.makedirs(report_dir, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
report_json_path = os.path.join(report_dir, f"eda_{table}_{ts}.json")
report_md_path = os.path.join(report_dir, f"eda_{table}_{ts}.md")
with open(report_json_path, "w", encoding="utf-8") as fh:
fh.write(json.dumps(prof, ensure_ascii=False, indent=1, default=str))
with open(report_md_path, "w", encoding="utf-8") as fh:
fh.write(render_eda_markdown(prof))
return {
"status": "ok",
"profile": prof,
"report_md_path": report_md_path,
"report_json_path": report_json_path,
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}