fix(eda): hallazgos de comportamiento del benchmark (H2,H3,H6,H7,H8,H10,H11)

Ronda 4 (verificada con re-corrida sobre los datasets afectados):
- H2: stl_decompose deriva periodo de la frecuencia del indice (seattle period=365
  seasonal_strength=0.84; fin del period=2 espurio)
- H3+H10: infer_fk por senal de nombre (<X>Id->X.<X>Id) + excluir no-clave -> chinook
  111->9 FK, todas reales, cero absurdas, 16-27x mas rapido; base intacta (flag off->111)
- H6: association no computa eta2 si cardinalidad~=n (Ticket-Fare espurio fuera)
- H7: id secuencial monotono excluido de correlacion y PCA/KMeans (PassengerId fuera)
- H8: correlacion de series no estacionarias marcada espuria / sobre retornos
- H11: distribution_type usa modos/cardinalidad/normalidad (quality->discrete)
- 66 tests verdes

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Egutierrez
2026-06-29 06:37:47 +02:00
parent c4cff5ed5b
commit e142ef026d
12 changed files with 1028 additions and 36 deletions
@@ -223,15 +223,30 @@ def association_matrix(
)
def _skip(name: str) -> bool:
"""True si la columna no aporta asociacion util (pocos validos o text ruidoso)."""
"""True si la columna no aporta asociacion util (pocos validos, datetime o cat casi-unica)."""
col = columns[name]
vals = col.get("values", [])
ctype = col.get("type", "categorical")
numeric = _is_numeric_type(ctype)
if _valid_count(vals, numeric) < 3:
nvalid = _valid_count(vals, numeric)
if nvalid < 3:
return True
# Texto de cardinalidad ~ n: identificadores/free-text, sin asociacion util.
if ctype == "text" and n_rows > 0 and _cardinality(vals) >= 0.9 * n_rows:
if numeric:
return False
# Datetime: indice temporal unico-ish por fila. Como categorica da
# correlation_ratio (eta) ~= 1 trivial frente a cualquier numerica (cada
# fecha es su propio grupo de un solo valor) y Cramer's V / MI inflados.
# La estacionalidad/tendencia se analizan en el bloque de series, no aqui.
if ctype == "datetime":
return True
# Grupos casi singleton: si el tamano medio de grupo (valores presentes /
# cardinalidad) es < 1.5, la varianza intra-grupo ~= 0 y correlation_ratio
# sale ~= 1 por artefacto determinista (no por azar: el FDR no protege).
# Cubre ids/free-text (Ticket: 681 distintos sobre 891) y categoricas
# dispersas con muchos nulos (Cabin: 147 distintos sobre 204 presentes).
# Se mide sobre valores PRESENTES, no sobre n_rows, para captar las dispersas.
card = _cardinality(vals)
if card >= 2 and (nvalid / card) < 1.5:
return True
return False
@@ -156,3 +156,65 @@ def test_bonferroni_method_is_accepted():
assert result["multiple_testing"]["method"] == "bonferroni"
pair = _find_pair(result["pairs"], "size", "price")
assert pair["p_value_adjusted"] is not None
# --- H6: correlation_ratio espurio por cardinalidad casi-unica ---------------
def test_h6_categorica_casi_unica_excluida():
# Una categorica con cardinalidad ~ n (id/free-text como Ticket) hace que cada
# grupo tenga un solo valor -> varianza intra-grupo ~= 0 -> correlation_ratio
# = 1 trivial. No debe aparecer ni evaluado ni como par fuerte.
n = 60
columns = {
"ticket": {"values": [f"T{i}" for i in range(n)], "type": "categorical"},
"fare": {"values": [float(i) * 1.3 for i in range(n)], "type": "numeric"},
}
result = association_matrix(columns)
assert _find_pair(result["pairs"], "ticket", "fare") is None
assert _find_pair(result["strong"], "ticket", "fare") is None
def test_h6_categorica_dispersa_con_nulos_excluida():
# Categorica dispersa con muchos None (como Cabin: 147 distintos sobre 204
# presentes): los pocos presentes son casi todos distintos -> grupos singleton.
# Se mide sobre valores PRESENTES, no sobre n filas, para captarla.
vals = [f"C{i}" if i % 4 == 0 else None for i in range(80)] # ~20 presentes, distintos
columns = {
"cabin": {"values": vals, "type": "categorical"},
"fare": {"values": [float(i) for i in range(80)], "type": "numeric"},
}
result = association_matrix(columns)
assert _find_pair(result["pairs"], "cabin", "fare") is None
def test_h6_datetime_excluido_de_pares():
# Datetime es indice unico-ish por fila -> correlation_ratio = 1 espurio contra
# cualquier numerica. Se excluye de los pares de asociacion (las series se
# analizan aparte, no aqui).
columns = {
"date": {
"values": [f"2020-01-{i + 1:02d}" for i in range(10)],
"type": "datetime",
},
"value": {"values": [float(i) for i in range(10)], "type": "numeric"},
}
result = association_matrix(columns)
assert _find_pair(result["pairs"], "date", "value") is None
def test_h6_categorica_legitima_se_conserva():
# Edge anti-sobrefiltrado: una categorica de baja cardinalidad (grupos grandes,
# tamano medio >= 1.5) SIGUE evaluandose y su asociacion fuerte se conserva.
columns = {
"region": {
"values": ["N", "N", "S", "S", "E", "E", "W", "W"],
"type": "categorical",
},
"score": {
"values": [10.0, 11.0, 50.0, 49.0, 90.0, 91.0, 30.0, 31.0],
"type": "numeric",
},
}
result = association_matrix(columns)
assert _find_pair(result["pairs"], "region", "score") is not None
assert _find_pair(result["strong"], "region", "score") is not None
@@ -4,11 +4,11 @@ name: detect_distribution_type
kind: function
lang: py
domain: datascience
version: "1.0.0"
version: "1.1.0"
purity: pure
signature: "def detect_distribution_type(values: list[float]) -> dict"
description: "Classifies the shape of a numeric distribution using skewness, excess kurtosis, tail ratio and log-skewness. Returns a type label and raw stats."
tags: [statistics, distribution, classification, skewness, kurtosis, pendiente-usar]
description: "Classifies the shape of a numeric distribution using cardinality (distinct values), number of prominent modes, skewness, excess kurtosis, tail ratio and log-skewness. Returns a type label and raw stats. Discrete/ordinal and multimodal columns are detected before the symmetric normal-ish test so they are never mislabeled normal."
tags: [statistics, distribution, classification, skewness, kurtosis, multimodal, cardinality, eda]
uses_functions: []
uses_types: []
returns: []
@@ -27,15 +27,21 @@ tests:
- "test_detect_right_skewed"
- "test_detect_stats_keys"
- "test_detect_exactly_30"
- "test_detect_discrete_low_cardinality"
- "test_detect_multimodal"
- "test_detect_normal_still_normal_after_fix"
- "test_detect_stats_has_new_keys"
- "test_detect_unimodal_skewed_not_multimodal"
test_file_path: "python/functions/datascience/tests/test_detect_distribution_type.py"
file_path: "python/functions/datascience/detect_distribution_type.py"
params:
- name: values
desc: "List of numeric values to classify. Minimum 30 for meaningful classification."
output: >
Dict with "type" (str) and "stats" (dict). Type is one of: normal-ish,
lognormal-ish, heavy-tail, right-skewed, left-skewed, other, too_few_samples.
Stats contains: n, skew, kurtosis, tail_ratio, log_skew.
Dict with "type" (str) and "stats" (dict). Type is one of: discrete,
multimodal, heavy-tail, normal-ish, lognormal-ish, right-skewed, left-skewed,
other, too_few_samples. Stats contains: n, skew, kurtosis, tail_ratio,
log_skew, n_unique, n_modes, jb_stat, jb_pvalue.
source_repo: "internal:footprint_aurgi"
source_license: "internal-aurgi"
source_file: "aurgi_mapas/generar_pdf_reporte.py:133"
@@ -56,8 +62,14 @@ detect_distribution_type([1]*5)
## Logica de clasificacion
El orden importa: cardinalidad y modalidad se evaluan **antes** del test simetrico
`normal-ish`, para que una columna discreta/ordinal o multimodal nunca se etiquete
"normal" solo porque su skewness sea pequena.
- n < 30 → too_few_samples
- n_unique <= 15 → discrete (ordinal / counts de pocos niveles)
- excess kurtosis > 3 → heavy-tail
- n >= 100 AND n_modes >= 2 → multimodal
- |skew| <= 0.5 AND |kurt| <= 1 → normal-ish
- skew > 0.5 AND log_skew cerca de 0 AND tail_ratio > 2 → lognormal-ish
- skew > 0.5 → right-skewed
@@ -65,3 +77,35 @@ detect_distribution_type([1]*5)
- default → other
tail_ratio = p99/p50; log_skew calculado solo si hay >= 30 positivos.
`n_modes` cuenta picos prominentes de un histograma suavizado (~sqrt(n) bins,
suavizado triangular) separados por un valle profundo (cae por debajo del 60% del
pico menor). Esto evita modos espurios por ruido en continuas unimodales sesgadas.
## Cuando usarla
Cuando perfiles una columna numerica y quieras saber su forma para elegir el
estadistico/visualizacion adecuados (media+desv vs mediana+IQR, histograma vs
boxplot). Distingue discretas/ordinales y multimodales que un criterio por-skew
confunde con normales.
## Gotchas
- **Jarque-Bera NO es gate de `normal-ish`.** `jb_stat`/`jb_pvalue` se reportan en
`stats` como senal diagnostica, pero con n grande Jarque-Bera rechaza normalidad
para columnas perfectamente acampanadas (p.ej. pH o density del vino, n~1600,
jb_p≈0 pese a ser normal-ish). Usarlo como umbral duro produce falsos negativos
masivos. La robustez ante el tamano muestral la dan cardinalidad y modalidad.
- El umbral `n_unique <= 15` etiqueta como `discrete` cualquier continua con muy
pocos valores distintos: eso es correcto (es discreta/ordinal de facto), no un
falso positivo.
- `multimodal` solo se evalua con `n >= 100`; por debajo el histograma es demasiado
ruidoso para afirmar multimodalidad y se cae a la logica de skew/kurt.
## Capability growth log
- v1.1.0 (2026-06-29) — H11: anade deteccion de cardinalidad (`discrete`) y
modalidad (`multimodal`) antes del test `normal-ish`, mas `n_unique`, `n_modes`,
`jb_stat`, `jb_pvalue` en stats. Corrige falsos "normal-ish" en discretas/ordinales
(wine `quality`) y multimodales (precios BTC). Retrocompatible: continuas normales,
sesgadas y heavy-tail no cambian.
@@ -7,9 +7,26 @@ import numpy as np
def detect_distribution_type(values: list[float]) -> dict:
"""Classify the distribution shape of a numeric sample.
Uses skewness, excess kurtosis, tail ratio (p99/p50), and log-skewness
to assign one of: normal-ish, lognormal-ish, heavy-tail, right-skewed,
left-skewed, other, or too_few_samples (n < 30).
Uses cardinality (number of distinct values), number of prominent modes,
skewness, excess kurtosis, tail ratio (p99/p50) and log-skewness to assign
one of: discrete, multimodal, heavy-tail, normal-ish, lognormal-ish,
right-skewed, left-skewed, other, or too_few_samples (n < 30).
A skew-only criterion mislabels discrete/ordinal and multimodal columns as
"normal-ish" (e.g. a 6-level rating, or multimodal asset prices whose
skewness happens to be small). To avoid that, cardinality and modality are
checked *before* the symmetric normal-ish test:
* ``n_unique <= 15`` -> "discrete" (ordinal / low-cardinality counts).
* ``n_modes >= 2`` (with ``n >= 100``) -> "multimodal".
The Jarque-Bera statistic and its p-value are computed from the already
available skewness and excess kurtosis and reported in ``stats`` as a
diagnostic signal. It is deliberately NOT used as a hard gate for the
"normal-ish" label: with large samples Jarque-Bera rejects normality for
trivially non-normal but perfectly bell-shaped columns, which would produce
massive false negatives. Cardinality and modality, by contrast, are robust
to sample size.
Args:
values: List of numeric values.
@@ -17,7 +34,8 @@ def detect_distribution_type(values: list[float]) -> dict:
Returns:
Dict with keys:
"type" (str): distribution label.
"stats" (dict): {"n", "skew", "kurtosis", "tail_ratio", "log_skew"}.
"stats" (dict): {"n", "skew", "kurtosis", "tail_ratio", "log_skew",
"n_unique", "n_modes", "jb_stat", "jb_pvalue"}.
"""
n = len(values)
if n < 30:
@@ -58,17 +76,37 @@ def detect_distribution_type(values: list[float]) -> dict:
else:
log_skew = math.nan
# Cardinality and modality (robust to sample size).
n_unique = int(np.unique(arr).size)
n_modes = _count_modes(arr)
# Jarque-Bera statistic from the moments already computed. Under the null
# of normality it follows a chi-squared distribution with 2 degrees of
# freedom, whose survival function is exp(-x / 2).
jb_stat = n / 6.0 * (skew ** 2 + (kurt ** 2) / 4.0)
jb_pvalue = math.exp(-jb_stat / 2.0)
stats = {
"n": n,
"skew": skew,
"kurtosis": kurt,
"tail_ratio": tail_ratio,
"log_skew": log_skew,
"n_unique": n_unique,
"n_modes": n_modes,
"jb_stat": jb_stat,
"jb_pvalue": jb_pvalue,
}
# Classification logic
if kurt > 3.0:
# Classification logic. Cardinality and modality come first so a discrete or
# multimodal column is never mislabeled "normal-ish" on the basis of a small
# skewness alone.
if n_unique <= 15:
dist_type = "discrete"
elif kurt > 3.0:
dist_type = "heavy-tail"
elif n >= 100 and n_modes >= 2:
dist_type = "multimodal"
elif abs(skew) <= 0.5 and abs(kurt) <= 1.0:
dist_type = "normal-ish"
elif (
@@ -87,3 +125,58 @@ def detect_distribution_type(values: list[float]) -> dict:
dist_type = "other"
return {"type": dist_type, "stats": stats}
def _count_modes(values, prom_frac: float = 0.15, valley_frac: float = 0.6) -> int:
"""Count prominent modes separated by deep valleys in a histogram.
A naive local-maximum count over a raw histogram is dominated by sampling
noise, so this:
1. Bins the data into ~sqrt(n) bins and applies a light triangular smooth.
2. Keeps local maxima taller than ``prom_frac`` of the global peak.
3. Merges two adjacent peaks unless the lowest point between them falls
below ``valley_frac`` of the smaller peak (a genuine separating valley).
Args:
values: Numeric numpy array.
prom_frac: Minimum peak height as a fraction of the tallest peak.
valley_frac: Two peaks count as distinct modes only if the valley
between them dips below this fraction of the smaller peak.
Returns:
Number of distinct modes (0 for an empty/degenerate sample).
"""
arr = np.asarray(values, dtype=float)
n = arr.size
if n == 0:
return 0
n_bins = max(10, min(50, int(round(math.sqrt(n)))))
counts, _ = np.histogram(arr, bins=n_bins)
kernel = np.array([1.0, 2.0, 1.0])
kernel /= kernel.sum()
smooth = np.convolve(counts.astype(float), kernel, mode="same")
peak_global = float(smooth.max())
if peak_global <= 0:
return 0
threshold = peak_global * prom_frac
peaks = []
for i in range(len(smooth)):
left = smooth[i - 1] if i > 0 else -1.0
right = smooth[i + 1] if i < len(smooth) - 1 else -1.0
if smooth[i] >= threshold and smooth[i] > left and smooth[i] >= right:
peaks.append(i)
if len(peaks) <= 1:
return len(peaks)
kept = [peaks[0]]
for p in peaks[1:]:
prev = kept[-1]
valley = float(smooth[prev:p + 1].min())
if valley <= valley_frac * min(smooth[prev], smooth[p]):
kept.append(p) # separated by a deep valley -> distinct mode
elif smooth[p] > smooth[prev]:
kept[-1] = p # same mode, keep the taller peak
return len(kept)
@@ -3,10 +3,10 @@ name: infer_fk_containment_duckdb
kind: function
lang: py
domain: datascience
version: "1.0.0"
version: "1.1.0"
purity: impure
signature: "def infer_fk_containment_duckdb(db_path: str, tables: list = None, min_inclusion: float = 0.9, max_card: int = 200000) -> dict"
description: "Infiere FOREIGN KEYs candidatas entre tablas DuckDB por containment de valores: para un par (col A de T1, col B de T2), inclusion(A subseteq B) = |distinct(A) interseccion distinct(B)| / |distinct(A)|; si inclusion >= min_inclusion y B parece clave (distinct/count >= 0.95) entonces A -> B es FK candidata. Poda por tipo base y push-down SQL (COUNT DISTINCT / INTERSECT) sin traer filas a RAM. Parte del grupo eda (relaciones inter-tabla)."
signature: "def infer_fk_containment_duckdb(db_path: str, tables: list = None, min_inclusion: float = 0.9, max_card: int = 200000, require_name_signal: bool = True) -> dict"
description: "Infiere FOREIGN KEYs candidatas entre tablas DuckDB combinando SEÑAL DE NOMBRE y containment de valores: exige primero que el origen nombre/contenga la tabla destino (patron <X>Id -> X.<X>Id / <x>_id -> x) y que el destino sea su PK nombrada, excluyendo PKs propias y columnas de medida como origen; luego confirma con inclusion(A subseteq B) = |distinct(A) interseccion distinct(B)| / |distinct(A)| >= min_inclusion y B key-ish (distinct/count >= 0.95). El filtro de nombre va ANTES del INTERSECT: mata el 10-20x de falsos positivos de la contencion pura y acelera (menos pares). Degrada a contencion pura si el esquema no usa convencion de nombres. Poda por tipo base y push-down SQL sin traer filas a RAM. Parte del grupo eda (relaciones inter-tabla)."
tags: [eda, relations, duckdb, foreign-key, schema-inference, datascience, exploratory-data-analysis]
params:
- name: db_path
@@ -17,7 +17,9 @@ params:
desc: "Umbral minimo de inclusion (0-1) para emitir una FK candidata. inclusion(A subseteq B) = |distinct(A) interseccion distinct(B)| / |distinct(A)|. Default 0.9."
- name: max_card
desc: "Tope de filas en la tabla destino (lado B, el caro del INTERSECT). Si count(T2) > max_card, los pares hacia T2 se saltan para no disparar un INTERSECT gigante; se acumula una nota en skipped[]. Default 200000."
output: "dict dict-no-throw. En exito {status:'ok', fk_candidates:[{from_table, from_col, to_table, to_col, inclusion, cardinality, to_is_key}, ...], tables:[str], skipped:[str]} con fk_candidates ordenado por inclusion descendente; cardinality es '1:1' (A casi unica en T1) o 'N:1' (A se repite, apunta a la key de T2). En error {status:'error', error:str}."
- name: require_name_signal
desc: "Si True (default) exige señal de nombre ademas de contencion: el origen debe nombrar/contener la tabla destino y NO ser la PK de su propia tabla; el destino debe ser su PK nombrada (o `id`). Filtra los pares ANTES del INTERSECT (precision + velocidad, issues H3+H10). Degrada automaticamente a contencion pura si el esquema no usa convencion de nombres de clave (ninguna columna `...id`). Con False nunca se exige señal (comportamiento historico de contencion pura)."
output: "dict dict-no-throw. En exito {status:'ok', fk_candidates:[{from_table, from_col, to_table, to_col, inclusion, cardinality, to_is_key, name_match}, ...], tables:[str], skipped:[str], name_signal_enforced:bool} con fk_candidates ordenado por (name_match, inclusion) descendente; name_match indica si la candidata tiene señal de nombre; name_signal_enforced indica si el filtro de nombre se aplico (False si se degrado a contencion pura); cardinality es '1:1' (A casi unica en T1) o 'N:1' (A se repite, apunta a la key de T2). En error {status:'error', error:str}."
uses_functions: [duckdb_list_tables_py_infra, duckdb_table_schema_py_infra, duckdb_query_readonly_py_infra]
uses_types: []
returns: []
@@ -25,7 +27,7 @@ returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_detecta_fk_orders_customer_id", "test_shape_resultado", "test_no_inventa_fk_columnas_no_relacionadas", "test_no_fk_entre_tipos_incompatibles", "test_min_inclusion_alto_filtra", "test_subset_explicito_de_tablas", "test_db_inexistente_devuelve_error", "test_tabla_invalida_devuelve_error"]
tests: ["test_detecta_fk_orders_customer_id", "test_shape_resultado", "test_no_inventa_fk_columnas_no_relacionadas", "test_no_fk_entre_tipos_incompatibles", "test_min_inclusion_alto_filtra", "test_subset_explicito_de_tablas", "test_db_inexistente_devuelve_error", "test_tabla_invalida_devuelve_error", "test_name_signal_helpers", "test_conserva_fk_reales_con_nombre", "test_excluye_medida_y_pk_como_origen", "test_degrada_a_contencion_sin_pistas_de_nombre", "test_require_name_signal_false_es_historico", "test_flujo_materializado_create_table_as_no_vacia"]
test_file_path: "python/functions/datascience/infer_fk_containment_duckdb_test.py"
file_path: "python/functions/datascience/infer_fk_containment_duckdb.py"
---
@@ -71,6 +73,8 @@ else:
- **Impura**: lee de disco via las primitivas read-only del grupo `duckdb` (no crea ni modifica la base). El `db_path` debe existir.
- **Coste O(pares podados)**: el numero de comparaciones es O(tablas^2 x columnas^2) ANTES de la poda. La poda por tipo base (solo se comparan columnas de la misma clase: ambos enteros, ambos varchar, ...) recorta drasticamente ese espacio, pero en esquemas con muchas tablas y columnas del mismo tipo puede seguir siendo costoso. Cada par evaluado dispara un `INTERSECT` en el motor.
- **`INTERSECT` puede ser caro en tablas enormes**: por eso `max_card` (default 200000) limita el lado destino. Si `count(T2) > max_card`, los pares hacia T2 se saltan y se anota en `skipped[]`. Sube `max_card` con cuidado: el INTERSECT materializa los distintos de ambos lados.
- **Señal de nombre obligatoria por defecto (`require_name_signal=True`)**: para emitir una candidata, el origen debe NOMBRAR o CONTENER la tabla destino (`AlbumId -> Album`, `customer_id -> customers`, `manager_staff_id -> staff`) y el destino debe ser su PK nombrada (o `id`). Esto mata el grueso de falsos de la contencion pura (sin el filtro, chinook daba 111 candidatas vs 9 reales; sakila 565 vs ~21). Limite: una FK con **nombre divergente** que no contiene la tabla destino (p.ej. `Customer.SupportRepId -> Employee.EmployeeId`) NO es alcanzable por nombre y se pierde; y las **self-FK** (`Employee.ReportsTo -> Employee`) nunca se infieren (la funcion exige T1 != T2). Si tu esquema usa convencion de nombres pero tiene FK con columnas que no terminan en `id`, esas tambien se pierden en modo enforce.
- **Degrada a contencion pura sin convencion de nombres**: si NINGUNA columna del esquema termina en `id`, no se exige señal y se vuelve al comportamiento historico de solo-contencion (`name_signal_enforced=False` en el retorno). Tambien puedes forzarlo con `require_name_signal=False`.
- **Containment != FK declarada**: que A este contenido en B (con B key-ish) es una FK *probable*, no una garantia. Una columna puede estar contenida por coincidencia (rangos pequenos de enteros, banderas, fechas solapadas) sin ser una relacion real. Revisa siempre las candidatas; trata `inclusion` y `cardinality` como senales, no como verdad.
- **Entero y float NO se mezclan**: la poda por tipo pone INTEGER/BIGINT/... en la clase `integer` y FLOAT/DOUBLE/DECIMAL en `float`, y solo empareja columnas de la misma clase. Una FK entera contra una columna float casi nunca es real, asi que se descarta de entrada.
- **Solo esquema `main`** cuando `tables=None`: hereda el alcance de `duckdb_list_tables` (esquema `main`).
@@ -101,6 +105,30 @@ filas a RAM. Los `count(*)` por tabla y los `distinct` por columna se cachean pa
no recomputarlos entre pares.
```text
fk_candidate = {
from_table, from_col, to_table, to_col, inclusion, cardinality, to_is_key
from_table, from_col, to_table, to_col, inclusion, cardinality, to_is_key,
name_match
}
```
Antes del criterio de containment (pasos 1-5), cuando `require_name_signal=True` y
el esquema usa convencion de nombres, se aplica un filtro de SEÑAL DE NOMBRE que
recorta los pares evaluados (por eso baja tambien el coste, issue H10):
0a. El origen no puede ser la PK de su propia tabla, detectada SOLO por NOMBRE: el
stem de la columna casa con el nombre de la tabla (`Genre.GenreId`, `film.film_id`)
o es el generico `id`. NO se usa la PRIMARY KEY declarada — asi funciona sobre
tablas materializadas con `CREATE TABLE AS` (sin PK), donde `Track.AlbumId`
(stem 'album' != tabla 'track') NO es PK propia y se conserva como FK.
0b. El destino debe ser la PK nombrada de su tabla: `to_col` nombra `to_table`
(`store_id` en store) o es el generico `id`.
0c. El origen debe nombrar la tabla destino: su stem casa con `to_table`
(`<X>Id -> X`) o la contiene como subcadena (`manager_staff_id -> staff`).
## Capability growth log
- v1.1.0 (2026-06-29) — añade `require_name_signal` (default True): filtro de señal
de nombre ANTES del containment. Corrige falsos positivos masivos de la
inferencia por sola contencion (chinook 111->9, sakila 565->21) y acelera (chinook
6.8s->0.4s, sakila 23.4s->0.9s). Issues H3 + H10 del benchmark EDA. Retrocompatible:
degrada a contencion pura si el esquema no usa convencion de nombres de clave.
Nuevos campos en el retorno: `name_match` por candidata y `name_signal_enforced`.
@@ -94,6 +94,119 @@ def _valid_idents(*names) -> bool:
return all(isinstance(n, str) and _IDENT_RE.match(n) for n in names)
# --- Señal de nombre (precisión de FK, issues H3 + H10) -----------------------
# La contención de valores por si sola produce 10-20x falsos positivos: cualquier
# clave entera pequeña (1..N) esta contenida en cualquier clave mas grande, y las
# columnas de medida (cantidades, importes) caen dentro del rango de los ids. La
# señal mas fuerte de una FK real es el NOMBRE de la columna: el patron canonico
# `<X>Id -> <X>.<X>Id` (PascalCase de chinook) o `<x>_id -> <x>.<x>_id` (snake_case
# de sakila). Filtrar candidatos por nombre ANTES del INTERSECT corrige precision
# y rendimiento a la vez (menos pares que evaluar).
def _norm(s) -> str:
"""Normaliza un identificador: minusculas, solo [a-z0-9] (quita `_`, espacios)."""
return re.sub(r"[^a-z0-9]", "", str(s).lower())
def _singular(s: str) -> str:
"""Singular ingles aproximado (KISS): customers->customer, cities->city.
Heuristica suficiente para casar columna `<x>_id` con tabla `<x>s`/`<x>`.
"""
if len(s) > 4 and s.endswith("ies"):
return s[:-3] + "y"
if len(s) > 3 and s.endswith("s") and not s.endswith("ss"):
return s[:-1]
return s
def _ends_id(norm: str) -> bool:
"""True si el nombre normalizado termina en `id` (incluye el propio `id`)."""
return norm.endswith("id") and norm != ""
def _id_stem(norm: str) -> str:
"""Quita el sufijo `id` de un nombre ya normalizado: albumid->album, id->''."""
return norm[:-2] if norm.endswith("id") else norm
def _name_eq(a, b) -> bool:
"""Igualdad de nombres tolerante a singular/plural (normaliza ambos lados)."""
na, nb = _norm(a), _norm(b)
if not na or not nb:
return False
return _singular(na) == _singular(nb)
def _col_is_own_pk(col: str, table: str) -> bool:
"""True si `col` parece la PRIMARY KEY de su propia `table` por nombre.
Una PK no es origen de FK en un esquema normal: `Genre.GenreId` referencia a
su propia tabla, no a otra. Casos: `GenreId` en Genre, `film_id` en film, o el
generico `id`. Esto impide que las PKs pequeñas (1..N), contenidas por
construccion en cualquier clave mayor, se emitan como FK absurdas (origen).
"""
nc = _norm(col)
if nc == "id":
return True
if _ends_id(nc) and _name_eq(_id_stem(nc), table):
return True
return False
def _name_signal(from_col: str, to_table: str, to_col: str) -> bool:
"""True si (from_col -> to_table.to_col) tiene señal de nombre de FK real.
Dos condiciones, AMBAS necesarias:
1. El DESTINO es la PK nombrada de su tabla: `to_col` nombra `to_table`
(`store_id` en store, `AlbumId` en Album) o es el generico `id`. Esto ancla
el destino a una clave real, no a una columna cualquiera de la tabla.
2. El ORIGEN apunta a esa tabla por su nombre: el stem de `from_col` casa con
`to_table` (`<X>Id -> X`, `<x>_id -> x`) o lo CONTIENE como subcadena
(`manager_staff_id -> staff`). El origen debe terminar en `id`.
Mata el grueso de falsos de la contencion pura: `ArtistId -> Invoice.InvoiceId`
falla porque "artist" no nombra ni contiene "invoice"; `Quantity -> AlbumId`
falla porque "quantity" no termina en id. Conserva las FK reales con nombre que
casa (`Track.AlbumId -> Album.AlbumId`). Limite conocido: FK con nombre
divergente que no contiene la tabla destino (`Customer.SupportRepId ->
Employee.EmployeeId`) no son alcanzables por nombre.
"""
nfc = _norm(from_col)
if not _ends_id(nfc):
return False
ntc = _norm(to_col)
# (1) destino = PK nombrada del to_table, o `id` generico.
to_col_ok = (_ends_id(ntc) and _name_eq(_id_stem(ntc), to_table)) or ntc == "id"
if not to_col_ok:
return False
# (2) origen nombra (o contiene) la tabla destino.
fstem = _id_stem(nfc)
if _name_eq(fstem, to_table):
return True
sing_t = _singular(_norm(to_table))
if sing_t and (sing_t in fstem or _norm(to_table) in fstem):
return True
return False
def _schema_has_name_hints(cols_by_table: dict) -> bool:
"""True si el esquema usa convencion de nombres de clave (alguna columna `...id`).
Permite degradar a contencion pura (retrocompatible) en bases con columnas
cripticas (`c1`, `c2`) que no siguen ninguna convencion: ahi la señal de
nombre no aplica y se vuelve al comportamiento historico.
"""
for cols in cols_by_table.values():
for c in cols:
if _ends_id(_norm(c["name"])):
return True
return False
def _scalar(res: dict):
"""Extrae el unico valor escalar de un resultado duckdb_query_readonly.
@@ -111,6 +224,7 @@ def infer_fk_containment_duckdb(
tables: list = None,
min_inclusion: float = 0.9,
max_card: int = 200000,
require_name_signal: bool = True,
) -> dict:
"""Infiere FOREIGN KEYs candidatas entre tablas DuckDB por containment de valores.
@@ -125,13 +239,23 @@ def infer_fk_containment_duckdb(
max_card: tope de filas en la tabla destino (lado B, el caro del INTERSECT).
Si count(T2) > max_card, el par se salta para no disparar un INTERSECT
gigante; se acumula una nota en skipped[]. Default 200000.
require_name_signal: si True (default) exige SEÑAL DE NOMBRE ademas de
contencion: la columna origen debe nombrar la tabla destino (patron
`<X>Id -> X.<X>Id` / `<x>_id -> x.<x>_id`) o referenciar su PK
nombrada, y NO puede ser la PK de su propia tabla. Esto elimina el
10-20x de falsos positivos de la contencion pura (issues H3+H10) y, al
filtrar pares ANTES del INTERSECT, acelera. Degrada automaticamente a
contencion pura si el esquema no usa convencion de nombres de clave
(ninguna columna `...id`), por retrocompatibilidad. Con False nunca se
exige señal (comportamiento historico).
Returns:
dict dict-no-throw. En exito:
{status:'ok',
fk_candidates:[{from_table, from_col, to_table, to_col, inclusion,
cardinality, to_is_key}, ...], # ordenado por inclusion desc
tables:[str], skipped:[str]}
cardinality, to_is_key, name_match}, ...],
# ordenado por inclusion desc
tables:[str], skipped:[str], name_signal_enforced:bool}
En error (sin lanzar): {status:'error', error:str}.
"""
try:
@@ -214,6 +338,11 @@ def infer_fk_containment_duckdb(
key_cache[cache_key] = (ratio >= 0.95, ratio)
return key_cache[cache_key]
# 4b) ¿Exigir señal de nombre? Se exige si el caller lo pide Y el esquema
# usa convencion de nombres de clave. Si no hay convencion (columnas
# cripticas), se degrada a contencion pura (retrocompatible).
enforce_name = require_name_signal and _schema_has_name_hints(cols_by_table)
candidates = []
# 5) Pares (A en T1, B en T2) con T1 != T2 y misma clase de tipo (PODA).
@@ -235,11 +364,24 @@ def infer_fk_containment_duckdb(
for a in cols_by_table[t1]:
if a["type_class"] == "other":
continue
# PODA por nombre: una PK nunca es ORIGEN de FK. Excluir aqui
# (antes del bucle interno) mata pares absurdos como
# `Genre.GenreId -> Track.TrackId` de raiz.
if enforce_name and _col_is_own_pk(a["name"], t1):
continue
for b in cols_by_table[t2]:
# PODA: solo pares con la misma clase de tipo base.
if a["type_class"] != b["type_class"]:
continue
# PODA POR NOMBRE (issues H3+H10): exigir señal de nombre
# ANTES del INTERSECT. Recorta el grueso de pares falsos y
# evita el coste del containment sobre ellos.
if enforce_name and not _name_signal(
a["name"], t2, b["name"]
):
continue
# distinct(A); si es 0, no hay containment que medir.
d_a = distinct_count(t1, a["name"])
if d_a == 0:
@@ -281,16 +423,25 @@ def infer_fk_containment_duckdb(
"inclusion": inclusion,
"cardinality": cardinality,
"to_is_key": True,
"name_match": _name_signal(
a["name"], t2, b["name"]
),
}
)
candidates.sort(key=lambda c: c["inclusion"], reverse=True)
# Orden: primero las que tienen señal de nombre (FK mas fiables), luego por
# inclusion descendente. En modo degradado (sin señal) todas son False y el
# orden cae a inclusion, como antes.
candidates.sort(
key=lambda c: (c["name_match"], c["inclusion"]), reverse=True
)
return {
"status": "ok",
"fk_candidates": candidates,
"tables": tables,
"skipped": skipped,
"name_signal_enforced": enforce_name,
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
@@ -145,3 +145,169 @@ def test_tabla_invalida_devuelve_error(db):
"""Un nombre de tabla no interpolable devuelve error sin tocar la base."""
res = infer_fk_containment_duckdb(db, tables=["orders; DROP TABLE orders"])
assert res["status"] == "error"
# --- Señal de nombre: precisión de FK (issues H3 + H10) -----------------------
def test_name_signal_helpers():
"""Unit tests del nucleo de señal de nombre (puro, sin DB).
Cubre el patron canonico `<X>Id -> X.<X>Id`, el snake_case `<x>_id -> x`, el
nombre compuesto que contiene la tabla (`manager_staff_id -> staff`), y los
falsos que la contencion pura dejaba pasar.
"""
from .infer_fk_containment_duckdb import _col_is_own_pk, _name_signal
# Golden — FK reales con nombre que casa.
assert _name_signal("AlbumId", "Album", "AlbumId") is True
assert _name_signal("customer_id", "customers", "id") is True
assert _name_signal("ArtistId", "Artist", "ArtistId") is True
# Nombre compuesto que CONTIENE la tabla destino.
assert _name_signal("manager_staff_id", "staff", "staff_id") is True
# Falsos que mata el fix.
assert _name_signal("Quantity", "Album", "AlbumId") is False # no es id-ref
assert _name_signal("ArtistId", "Invoice", "InvoiceId") is False # no nombra Invoice
assert _name_signal("GenreId", "Track", "TrackId") is False # no nombra Track
# PK de su propia tabla: nunca es ORIGEN de FK.
assert _col_is_own_pk("GenreId", "Genre") is True
assert _col_is_own_pk("film_id", "film") is True
assert _col_is_own_pk("id", "customers") is True
assert _col_is_own_pk("AlbumId", "Track") is False # FK, no PK propia
@pytest.fixture
def db_relational(tmp_path):
"""Esquema mini estilo chinook: FK con nombre + medida + PK pequeña.
artist(ArtistId PK 1..3), album(AlbumId PK 1..5, ArtistId FK->artist),
track(TrackId PK 1..10, AlbumId FK->album, Quantity 1..3 medida).
La contencion PURA inventaria:
- artist.ArtistId (1..3) ⊆ album.AlbumId (1..5) → falso (ArtistId es PK).
- track.Quantity (1..3) ⊆ artist.ArtistId (1..3) → falso (Quantity es medida).
El fix por nombre debe eliminar ambos y conservar solo las 2 FK reales.
"""
path = str(tmp_path / "rel_test.duckdb")
con = duckdb.connect(path)
con.execute("CREATE TABLE artist (ArtistId INTEGER, Name VARCHAR)")
con.execute("INSERT INTO artist VALUES (1,'a'),(2,'b'),(3,'c')")
con.execute("CREATE TABLE album (AlbumId INTEGER, ArtistId INTEGER, Title VARCHAR)")
con.execute(
"INSERT INTO album VALUES "
"(1,1,'x'),(2,1,'y'),(3,2,'z'),(4,3,'w'),(5,3,'v')"
)
con.execute("CREATE TABLE track (TrackId INTEGER, AlbumId INTEGER, Quantity INTEGER)")
con.execute(
"INSERT INTO track VALUES "
"(1,1,1),(2,1,2),(3,2,1),(4,2,3),(5,3,2),"
"(6,3,1),(7,4,2),(8,4,3),(9,5,1),(10,5,2)"
)
con.close()
return path
def test_conserva_fk_reales_con_nombre(db_relational):
"""Golden: las 2 FK con nombre que casa se conservan."""
res = infer_fk_containment_duckdb(db_relational)
assert res["status"] == "ok"
assert res["name_signal_enforced"] is True
c = res["fk_candidates"]
assert _find(c, "album", "ArtistId", "artist", "ArtistId") is not None
assert _find(c, "track", "AlbumId", "album", "AlbumId") is not None
# Cada candidata trae el flag name_match True (enforce activo).
assert all(fk["name_match"] is True for fk in c)
def test_excluye_medida_y_pk_como_origen(db_relational):
"""Golden anti-falsos: Quantity (medida) y PKs propias no son origen de FK."""
res = infer_fk_containment_duckdb(db_relational)
c = res["fk_candidates"]
# Quantity es una cantidad, jamas una FK.
assert not any(fk["from_col"] == "Quantity" for fk in c)
# artist.ArtistId es PK de artist: nunca origen (no album falso).
assert not any(
fk["from_table"] == "artist" and fk["from_col"] == "ArtistId" for fk in c
)
# album.AlbumId es PK de album: nunca origen.
assert not any(
fk["from_table"] == "album" and fk["from_col"] == "AlbumId" for fk in c
)
# Resultado total acotado: solo las 2 FK reales (cero ruido).
assert len(c) == 2
def test_degrada_a_contencion_sin_pistas_de_nombre(tmp_path):
"""Edge retrocompatible: esquema sin convencion de nombres degrada a contencion.
Columnas cripticas (a, b, c, d) sin sufijo id → no se exige señal de nombre y
se emite por contencion pura, como el comportamiento historico.
"""
path = str(tmp_path / "cryptic.duckdb")
con = duckdb.connect(path)
con.execute("CREATE TABLE t1 (a INTEGER, b VARCHAR)")
con.execute("INSERT INTO t1 VALUES (1,'p'),(1,'q'),(2,'r'),(3,'s')") # a no unica
con.execute("CREATE TABLE t2 (c INTEGER, d VARCHAR)")
con.execute("INSERT INTO t2 VALUES (1,'p'),(2,'q'),(3,'r')") # c key 1..3
con.close()
res = infer_fk_containment_duckdb(path)
assert res["status"] == "ok"
# Sin pistas de nombre → no se exige señal (degrada).
assert res["name_signal_enforced"] is False
# t1.a (1..3) ⊆ t2.c (1..3): se emite por contencion pura.
fk = _find(res["fk_candidates"], "t1", "a", "t2", "c")
assert fk is not None
assert fk["name_match"] is False # no hay señal, pero se emite por contencion
def test_flujo_materializado_create_table_as_no_vacia(tmp_path):
"""Regresion del flujo real: tablas materializadas con CREATE TABLE AS (sin
PRIMARY KEY declarada, como hace profile_database al ATTACH sqlite) NO deben
vaciar el resultado. La señal de nombre se basa en el PATRON de nombres +
unicidad/cardinalidad como proxy de clave, nunca en la PK fisica declarada.
Confirma el caso critico `Track.AlbumId -> Album.AlbumId`: AlbumId es unica-ish
dentro de Track pero NO es la PK de Track (stem 'album' != tabla 'track'), asi
que NO se excluye como origen.
"""
path = str(tmp_path / "materialized.duckdb")
con = duckdb.connect(path)
# CREATE TABLE AS SELECT: sin PK, exactamente como la materializacion sqlite.
con.execute(
"CREATE TABLE Artist AS SELECT * FROM "
"(VALUES (1,'a'),(2,'b'),(3,'c')) t(ArtistId, Name)"
)
con.execute(
"CREATE TABLE Album AS SELECT * FROM "
"(VALUES (1,1,'x'),(2,1,'y'),(3,2,'z'),(4,3,'w'),(5,3,'v')) "
"t(AlbumId, ArtistId, Title)"
)
con.execute(
"CREATE TABLE Track AS SELECT * FROM "
"(VALUES (1,1),(2,1),(3,2),(4,2),(5,3),(6,4),(7,5)) t(TrackId, AlbumId)"
)
con.close()
res = infer_fk_containment_duckdb(path)
assert res["status"] == "ok"
c = res["fk_candidates"]
# NO debe vaciar: sin PK declarada el patron de nombres sigue detectando FK.
assert len(c) > 0, "flujo materializado sin PK no debe dar 0 candidatas"
# FK reales conservadas pese a no haber PRIMARY KEY fisica.
assert _find(c, "Track", "AlbumId", "Album", "AlbumId") is not None
assert _find(c, "Album", "ArtistId", "Artist", "ArtistId") is not None
def test_require_name_signal_false_es_historico(db_relational):
"""Apagar require_name_signal vuelve al comportamiento de contencion pura.
Sin el filtro de nombre reaparecen los falsos (mas candidatas que las 2 reales).
"""
res = infer_fk_containment_duckdb(db_relational, require_name_signal=False)
assert res["status"] == "ok"
assert res["name_signal_enforced"] is False
# La contencion pura genera mas que las 2 FK reales (incluye PKs/medidas).
assert len(res["fk_candidates"]) > 2
+17 -4
View File
@@ -32,16 +32,29 @@ def _clean(values: list) -> list[float]:
def _infer_period(arr: np.ndarray, max_period: int) -> int | None:
"""Infiere el periodo estacional dominante via autocorrelacion.
"""Infiere el periodo estacional dominante via autocorrelacion del residuo detrended.
Busca el retardo (entre 2 y ``max_period``) que maximiza la autocorrelacion
de la serie. Devuelve None si no encuentra un pico claro (autocorrelacion
maxima por debajo de un umbral pequeno).
de la serie tras restarle su tendencia lineal. El detrend es clave: sobre una
serie con tendencia la autocorrelacion cruda decae monotonamente y el retardo
minimo (2) gana siempre, produciendo un ``period=2`` espurio que enmascara la
estacionalidad real (falso negativo). Quitando primero la recta de mejor ajuste
por minimos cuadrados, el lag ganador refleja el ciclo estacional y no la deriva.
Devuelve None si no encuentra un pico claro (autocorrelacion maxima por debajo
de un umbral pequeno).
"""
n = len(arr)
if n < 6:
return None
x = arr - arr.mean()
# Detrend lineal: resta la recta de mejor ajuste para que la tendencia no
# domine la autocorrelacion (si no, lag=2 gana siempre en series con deriva).
t = np.arange(n, dtype=float)
try:
slope, intercept = np.polyfit(t, arr, 1)
detrended = arr - (slope * t + intercept)
except (np.linalg.LinAlgError, ValueError):
detrended = arr - arr.mean()
x = detrended - detrended.mean()
denom = float(np.dot(x, x))
if denom == 0.0:
return None
@@ -70,3 +70,37 @@ def test_serie_larga_resume_sin_values():
assert res["trend"]["values"] is None
assert "mean" in res["trend"]
assert "note" in res["trend"]
# --- H2: deteccion de periodo robusta a tendencia (detrend) ------------------
def test_h2_infer_period_detrend_con_tendencia_fuerte():
# Golden: serie con tendencia FUERTE + estacionalidad de periodo 12. Sin detrend
# la autocorrelacion cruda decae monotonamente y el lag minimo (2) gana siempre
# (period=2 espurio). Con el detrend lineal el lag ganador es el periodo real.
from stl_decompose import _infer_period
serie = _serie_estacional(n=120, period=12, trend=0.8, amp=10.0, seed=0)
arr = np.asarray(serie, dtype=float)
assert _infer_period(arr, max_period=60) == 12
def test_h2_auto_period_no_degenera_a_2():
# End-to-end: stl_decompose(period=None) sobre serie con tendencia fuerte detecta
# estacionalidad real en vez de reportar period=2 y seasonal_strength ~ 0
# (el falso negativo de estacionalidad que motivo el fix H2).
serie = _serie_estacional(n=120, period=12, trend=0.8, amp=10.0, seed=0)
res = stl_decompose(serie)
assert res["period"] != 2
assert res["seasonal_strength"] > 0.5
def test_h2_serie_sin_estacionalidad_no_inventa_periodo():
# Edge: serie con SOLO tendencia (sin componente estacional) no debe inventar un
# periodo; tras el detrend el residuo es ruido sin pico de autocorrelacion claro.
rng = np.random.default_rng(7)
serie = [0.5 * i + rng.normal(0, 1) for i in range(120)]
res = stl_decompose(serie)
# Sin periodo fiable: nota explicita, nunca seasonal_strength=0 como conclusion.
assert res["trend_strength"] is None
assert "note" in res
@@ -43,3 +43,57 @@ def test_detect_exactly_30():
values = rng.normal(0, 1, 30).tolist()
result = detect_distribution_type(values)
assert result["type"] != "too_few_samples"
# --- H11: discrete / multimodal no deben etiquetarse "normal-ish" ---
def test_detect_discrete_low_cardinality():
# Rating ordinal de 6 niveles (como wine `quality`): skewness pequena,
# antes caia en "normal-ish"; ahora debe ser "discrete".
rng = np.random.default_rng(3)
values = rng.integers(3, 9, size=1500).astype(float).tolist() # 6 valores distintos
result = detect_distribution_type(values)
assert result["type"] == "discrete", f"Got {result['type']}"
assert result["stats"]["n_unique"] <= 15
def test_detect_multimodal():
# Mezcla bimodal claramente separada con skewness ~0: antes "normal-ish",
# ahora "multimodal".
rng = np.random.default_rng(4)
values = np.concatenate(
[rng.normal(-4, 0.6, 1000), rng.normal(4, 0.6, 1000)]
).tolist()
result = detect_distribution_type(values)
assert result["type"] == "multimodal", f"Got {result['type']}"
assert result["stats"]["n_modes"] >= 2
def test_detect_normal_still_normal_after_fix():
# Retrocompatibilidad: una normal continua genuina sigue "normal-ish"
# pese a los nuevos checks de cardinalidad / modos.
rng = np.random.default_rng(5)
values = rng.normal(10, 2, 2000).tolist()
result = detect_distribution_type(values)
assert result["type"] == "normal-ish", f"Got {result['type']}"
assert result["stats"]["n_modes"] == 1
assert result["stats"]["n_unique"] > 15
def test_detect_stats_has_new_keys():
rng = np.random.default_rng(6)
values = rng.normal(0, 1, 200).tolist()
stats = detect_distribution_type(values)["stats"]
for key in ("n_unique", "n_modes", "jb_stat", "jb_pvalue"):
assert key in stats, f"missing {key}"
def test_detect_unimodal_skewed_not_multimodal():
# Continua unimodal sesgada (exponencial): el detector de modos no debe
# inventar modos espurios y la etiqueta no debe ser "multimodal".
rng = np.random.default_rng(8)
values = rng.exponential(1.0, 2000).tolist()
result = detect_distribution_type(values)
assert result["type"] != "multimodal", f"Got {result['type']}"
assert result["stats"]["n_modes"] == 1
+201 -7
View File
@@ -26,7 +26,7 @@ Estilo dict-no-throw del grupo: nunca lanza; captura cualquier error y devuelve
import json
import os
from datetime import datetime, timezone
from datetime import date, datetime, timezone
from datascience import (
acf_pacf,
@@ -109,6 +109,104 @@ def _looks_financial(col: dict) -> bool:
return (col.get("semantic_type") or "").lower() == "currency"
def _to_ordinal_days(value) -> float | None:
"""Convierte un valor fecha/datetime/ISO-string a dias ordinales (float), o None.
Soporta los tipos que devuelve DuckDB para columnas DATE/TIMESTAMP
(``datetime.date`` / ``datetime.datetime``) y strings ISO. Devuelve None para
cualquier cosa que no parsee a una fecha (numeros sueltos, basura).
"""
if value is None or isinstance(value, bool):
return None
if isinstance(value, datetime):
return value.toordinal() + value.hour / 24.0 + value.minute / 1440.0
if isinstance(value, date):
return float(value.toordinal())
if isinstance(value, (int, float)):
return None # entero/float suelto: no es una fecha fiable
s = str(value).strip()
if not s:
return None
try:
return float(datetime.fromisoformat(s).toordinal())
except (TypeError, ValueError):
pass
try:
return float(date.fromisoformat(s[:10]).toordinal())
except (TypeError, ValueError):
return None
def _infer_period_from_dates(dates: list, n_series: int) -> int | None:
"""Deriva el periodo estacional de la FRECUENCIA del indice datetime.
En vez de adivinar el periodo por autocorrelacion cruda (que en series con
tendencia degenera a 2 y produce un falso negativo de estacionalidad), se mide
el delta mediano en dias entre observaciones consecutivas ordenadas y se mapea
a su periodo estacional dominante:
- diario (delta ~= 1 dia) -> 365 si hay >= 2 anios de datos; si no, 7.
- semanal (delta ~= 7 dias) -> 52.
- mensual (delta ~= 30 dias)-> 12.
- trimestral (delta ~= 91) -> 4.
Devuelve None si no hay fechas suficientes, si la frecuencia no encaja en un
patron conocido, o si la serie es demasiado corta para dos ciclos del periodo.
"""
ords = [d for d in (_to_ordinal_days(v) for v in dates) if d is not None]
if len(ords) < 3:
return None
ords.sort()
deltas = sorted(b - a for a, b in zip(ords[:-1], ords[1:]) if b - a > 0)
if not deltas:
return None
med = deltas[len(deltas) // 2] # delta mediano en dias
if med <= 2.0: # diario
if n_series >= 730: # >= 2 anios: estacionalidad anual
return 365
return 7 if n_series >= 14 else None
if 5.0 <= med <= 10.0: # semanal
return 52 if n_series >= 104 else None
if 25.0 <= med <= 35.0: # mensual
return 12 if n_series >= 24 else None
if 85.0 <= med <= 100.0: # trimestral
return 4 if n_series >= 8 else None
return None
def _is_sequential_id(col: dict) -> bool:
"""True si la columna numerica es un id ENTERO secuencial (indice de fila).
Distingue ``PassengerId`` (1..n, enteros densos, monotono) de un float continuo
de alta cardinalidad (precios): el id no debe entrar en correlacion ni en
PCA/KMeans (es ruido que infla pares espurios y distorsiona componentes); el
precio si. Criterio: flag ``possible_id`` + min/max enteros + rango denso (casi
todos los enteros del intervalo presentes). Un precio tiene parte decimal en
min/max, asi que NUNCA lo marca.
"""
if col.get("inferred_type") != "numeric":
return False
if "possible_id" not in (col.get("flags") or []):
return False
nb = col.get("numeric") or {}
mn, mx = nb.get("min"), nb.get("max")
if not (
isinstance(mn, (int, float))
and not isinstance(mn, bool)
and isinstance(mx, (int, float))
and not isinstance(mx, bool)
):
return False
if not (float(mn).is_integer() and float(mx).is_integer()):
return False # float continuo (precios): mantener
dc = col.get("distinct_count")
if isinstance(dc, int) and not isinstance(dc, bool) and dc > 1:
span = float(mx) - float(mn) + 1.0
if span > 0 and (dc / span) >= 0.95:
return True
return False
def _to_float(value):
"""Parsea un valor a float limpiando simbolos de moneda y separadores.
@@ -215,15 +313,30 @@ def _build_series_block(query_fn, table: str, col: dict, order_col, sample: int)
if len(series_vals) < 8:
return None
# Periodo estacional derivado de la FRECUENCIA del indice datetime (mensual->12,
# diario->365/7), que es fiable, en vez de dejar que stl_decompose lo adivine por
# autocorrelacion (que en series con tendencia degenera a period=2 -> falso
# negativo de estacionalidad). Si no hay order_col datetime o la frecuencia no
# encaja, period queda None y stl_decompose lo infiere (ya con detrend) o avisa.
period = None
period_source = "autocorr"
if order_col:
raw_dates = _sample_series(query_fn, table, order_col, order_col, sample)
period = _infer_period_from_dates(raw_dates, len(series_vals))
if period is not None:
period_source = "datetime_freq"
block: dict = {
"order_col": order_col,
"ordered": bool(order_col),
"n": len(series_vals),
"stationarity": adf_kpss_stationarity(series_vals),
"acf_pacf": acf_pacf(series_vals),
# stl_decompose auto-infiere el periodo; si no hay estacionalidad detectable
# devuelve una nota y strengths None (se incluye igual, es informativo).
"stl": stl_decompose(series_vals),
# Periodo de la frecuencia del indice si se pudo derivar; si no, stl_decompose
# lo infiere por autocorrelacion del residuo detrended o devuelve una nota
# ("periodo no determinado") sin reportar seasonal_strength=0 como conclusion.
"period_source": period_source,
"stl": stl_decompose(series_vals, period=period),
}
# Sugerencia de transformacion solo si la columna parece de niveles:
@@ -423,9 +536,16 @@ def profile_table(
def _skip_for_assoc(c):
it = c.get("inferred_type")
flags = c.get("flags") or []
return it in ("categorical", "text") and (
# Categoricas/text id-like por cardinalidad ~ n.
if it in ("categorical", "text") and (
"possible_id" in flags or "high_cardinality" in flags
)
):
return True
# Id ENTERO secuencial numerico (PassengerId 1..n): indice de fila,
# genera pares espurios (correlation_ratio ~0.9 sig=si) y entra como
# feature ruidosa en PCA/KMeans. Los floats continuos (precios) NO
# se saltan aunque lleven possible_id.
return _is_sequential_id(c)
assoc_cols = [c for c in cols if not _skip_for_assoc(c)]
rows = _sample_rows(
@@ -455,7 +575,30 @@ def profile_table(
# reales: un try/except compartido ponia ambos campos a None).
if run_models:
try:
prof["models"] = run_eda_models(assoc_input)
# Subconjunto de features para PCA/KMeans: solo numericas CONTINUAS.
# Se excluyen binarias/ordinales/target de baja cardinalidad (Survived
# 0/1, Pclass 1/2/3): como dimensiones del PCA/clustering anaden ruido
# y distorsionan componentes y centroides. Los id secuenciales ya
# quedaron fuera de assoc_input via _skip_for_assoc.
def _is_model_feature(cname):
c = next(
(x for x in assoc_cols if x.get("name") == cname), None
)
if c is None or c.get("inferred_type") != "numeric":
return False
dc = c.get("distinct_count")
if (
isinstance(dc, int)
and not isinstance(dc, bool)
and dc <= _REEXPR_MIN_DISTINCT
):
return False
return True
models_input = {
n: v for n, v in assoc_input.items() if _is_model_feature(n)
}
prof["models"] = run_eda_models(models_input)
except Exception: # noqa: BLE001
prof["models"] = None
@@ -501,6 +644,57 @@ def profile_table(
except Exception: # noqa: BLE001
prof["series"] = None
# 8.75) Marcar las correlaciones num-num calculadas sobre NIVELES de series
# no estacionarias (autocorreladas) como posible espuria (Granger-Newbold):
# Close-Open=0.998 es un artefacto de la raiz unitaria, no una relacion util.
# Para uso financiero lo correcto es correlacionar retornos (col["series"]
# ["to_returns"]). Si corrio run_series se usa el veredicto ADF/KPSS por
# columna; si no, una heuristica financiera por nombre + dominio positivo.
try:
corr = prof.get("correlations")
if isinstance(corr, dict):
levels_cols: set = set()
series_map = prof.get("series") or {}
if series_map:
for cname, sb in series_map.items():
if not isinstance(sb, dict):
continue
verdict = (sb.get("stationarity") or {}).get("verdict")
if sb.get("levels_suggested") or verdict in (
"non_stationary",
"inconclusive",
):
levels_cols.add(cname)
else:
for c in cols:
if c.get("inferred_type") == "numeric" and _looks_financial(c):
mn = (c.get("numeric") or {}).get("min")
if (
isinstance(mn, (int, float))
and not isinstance(mn, bool)
and mn > 0
):
levels_cols.add(c.get("name"))
n_marked = 0
for pair in corr.get("pairs", []):
if (
pair.get("method") == "pearson/spearman"
and pair.get("a") in levels_cols
and pair.get("b") in levels_cols
):
pair["levels_possible_spurious"] = True
n_marked += 1
if n_marked:
corr["levels_caveat"] = (
f"{n_marked} par(es) de correlacion se calculan sobre NIVELES "
"de series no estacionarias (autocorreladas): la correlacion "
"puede ser espuria (Granger-Newbold). Para uso financiero, "
"correlacionar sobre retornos/diferencias (ver el bloque "
"series de cada columna)."
)
except Exception: # noqa: BLE001
pass
# 8.8) Avisos exploratorios: recuerdan que el EDA genera hipotesis, no
# conclusiones. Se calculan sobre el perfil ya completo (correlaciones,
# modelos, outliers, faltantes determinan que advertencias aplican).
@@ -14,7 +14,9 @@ import tempfile
import duckdb
from pipelines.profile_table import (
_infer_period_from_dates,
_is_continuous_for_reexpr,
_is_sequential_id,
_looks_financial,
profile_table,
)
@@ -121,6 +123,142 @@ def test_series_no_financiera_sugiere_diferencias():
assert "to_returns" not in s
# --- H2: periodo estacional derivado de la frecuencia del indice datetime ---
def test_infer_period_from_dates_mensual_y_diario():
from datetime import date as _date, timedelta
# Mensual (delta ~30 dias) con 72 puntos -> periodo 12.
mensual = [_date(2000 + i // 12, i % 12 + 1, 1) for i in range(72)]
assert _infer_period_from_dates(mensual, n_series=72) == 12
# Diario con >= 2 anios de datos -> estacionalidad anual (365).
diario = [_date(2010, 1, 1) + timedelta(days=i) for i in range(800)]
assert _infer_period_from_dates(diario, n_series=800) == 365
# Diario corto (< 2 anios) -> cae a semanal (7).
diario_corto = [_date(2010, 1, 1) + timedelta(days=i) for i in range(100)]
assert _infer_period_from_dates(diario_corto, n_series=100) == 7
# Sin fechas validas -> None (stl_decompose infiere o avisa).
assert _infer_period_from_dates(["x", None, 3], n_series=50) is None
def test_h2_periodo_de_frecuencia_datetime_end_to_end():
import math
from datetime import date as _date
tmp_dir = tempfile.mkdtemp(prefix="h2_period_test_")
db_path = os.path.join(tmp_dir, "m.duckdb")
con = duckdb.connect(db_path)
con.execute("CREATE TABLE m (d DATE, v DOUBLE)")
rows = []
for i in range(72): # 6 anios mensual con estacionalidad de periodo 12
dt = _date(2000 + i // 12, i % 12 + 1, 1)
v = 10.0 + 0.1 * i + 5.0 * math.sin(2 * math.pi * (i % 12) / 12)
rows.append((dt, v))
con.executemany("INSERT INTO m VALUES (?, ?)", rows)
con.close()
r = profile_table(db_path, "m", run_series=True, write_report=False)
assert r["status"] == "ok", r
s = _col(r["profile"], "v").get("series") or {}
assert s.get("period_source") == "datetime_freq"
stl = s.get("stl") or {}
assert stl.get("period") == 12
# Estacionalidad sinusoidal clara -> fuerza estacional alta (antes salia ~0).
assert (stl.get("seasonal_strength") or 0) > 0.3
# --- H7: id entero secuencial fuera de correlacion y de PCA/KMeans -----------
def test_is_sequential_id_distingue_id_de_precio():
# Id entero secuencial denso (1..n): True.
idcol = {
"inferred_type": "numeric",
"flags": ["possible_id"],
"distinct_count": 300,
"numeric": {"min": 1.0, "max": 300.0},
}
assert _is_sequential_id(idcol) is True
# Float continuo de alta cardinalidad (precios): min/max con decimales -> False.
precio = {
"inferred_type": "numeric",
"flags": ["possible_id"],
"distinct_count": 300,
"numeric": {"min": 24.35, "max": 189.7},
}
assert _is_sequential_id(precio) is False
# Entero disperso (anios): no es indice denso -> False.
disperso = {
"inferred_type": "numeric",
"flags": ["possible_id"],
"distinct_count": 3,
"numeric": {"min": 1990.0, "max": 2010.0},
}
assert _is_sequential_id(disperso) is False
# Sin flag possible_id -> nunca id secuencial.
sin_flag = {
"inferred_type": "numeric",
"flags": [],
"distinct_count": 300,
"numeric": {"min": 1.0, "max": 300.0},
}
assert _is_sequential_id(sin_flag) is False
def test_h7_id_secuencial_fuera_de_correlacion_y_modelos():
tmp_dir = tempfile.mkdtemp(prefix="h7_id_test_")
db_path = os.path.join(tmp_dir, "t.duckdb")
con = duckdb.connect(db_path)
con.execute("CREATE TABLE t (rid INTEGER, age DOUBLE, fare DOUBLE)")
# rid 0..299: indice de fila (id secuencial). age/fare: floats continuos.
con.execute(
"INSERT INTO t SELECT i, ((i*0.13)%80)+1.5, ((i*1.7)%50)+0.3 "
"FROM range(300) tbl(i)"
)
con.close()
r = profile_table(db_path, "t", run_models=True, write_report=False)
assert r["status"] == "ok", r
prof = r["profile"]
# rid (id secuencial) no entra en correlaciones fuertes.
strong = (prof.get("correlations") or {}).get("strong", [])
assert not any("rid" in (p["a"], p["b"]) for p in strong)
# rid no entra como feature de los modelos (normality solo sobre continuas).
norm = (prof.get("models") or {}).get("normality") or {}
assert "rid" not in norm
# age/fare (continuas) SI se mantienen como features.
assert "age" in norm and "fare" in norm
# --- H8: correlacion sobre niveles no estacionarios marcada espuria ----------
def test_h8_correlacion_niveles_marcada_posible_espuria():
tmp_dir = tempfile.mkdtemp(prefix="h8_levels_test_")
db_path = os.path.join(tmp_dir, "s.duckdb")
con = duckdb.connect(db_path)
con.execute('CREATE TABLE s (ts INTEGER, "close" DOUBLE, "open" DOUBLE)')
rows = []
level = 100.0
for t in range(90): # niveles crecientes (no estacionarios), close~open
level += 1.0 + (t % 5) * 0.4
rows.append((t, level, level - 0.5))
con.executemany("INSERT INTO s VALUES (?, ?, ?)", rows)
con.close()
r = profile_table(db_path, "s", run_series=True, write_report=False)
assert r["status"] == "ok", r
corr = r["profile"].get("correlations") or {}
co = [p for p in corr.get("pairs", []) if {p["a"], p["b"]} == {"close", "open"}]
assert co, "par close-open no encontrado"
# Ambas son series financieras de niveles no estacionarias -> par marcado.
assert co[0].get("levels_possible_spurious") is True
assert "levels_caveat" in corr
def _make_db() -> str:
"""Crea una DuckDB temporal con la tabla de prueba y devuelve su path."""
tmp_dir = tempfile.mkdtemp(prefix="profile_table_test_")