Compare commits

..

1 Commits

Author SHA1 Message Date
egutierrez 4f1530797e feat(datascience): rigor experimental para papers — effect size, IC, Holm + preregistro inmutable
Subsistema de papers reproducibles (grupo de capacidad `papers`). Añade las
funciones estadísticas que un paper honesto necesita y la función que congela la
hipótesis antes de mirar los datos (anti-HARKing).

Nuevas funciones (puras salvo la última):
- effect_size_cohens_d: Cohen's d + Hedges' g (corrección de sesgo para N
  pequeño) + interpretación cualitativa (negligible/small/medium/large por los
  umbrales de Cohen). Dict-no-throw ante varianza cero / N insuficiente.
- confidence_interval_mean: intervalo de confianza de una media (t de Student) o
  de la diferencia de medias con Welch (df de Welch–Satterthwaite, sin asumir
  varianzas iguales). Dict-no-throw; el IC colapsa al punto cuando la varianza es
  cero.
- preregister_hypothesis (impura): congela hipótesis + plan de análisis en
  papers/<slug>/preregistration.md con frozen_at (UTC) y content_hash (sha256 del
  cuerpo normalizado, no del frontmatter). Inmutabilidad: una vez frozen, un
  contenido distinto se RECHAZA sin sobrescribir (mata el HARKing); idempotente si
  el contenido es idéntico. Siempre dict-no-throw.

Extensión:
- fdr_correction 1.0.0 -> 1.1.0: añade method="holm" (Holm-Bonferroni step-down,
  controla FWER, más potente que Bonferroni simple). Reúsa la maquinaria de
  alineación 1:1 con None/inválidos; no rompe los métodos bh/bonferroni.

Reutiliza del registry: fdr_correction (BH + Bonferroni ya existían) como base
para Holm. pearson y spearman_corr ya cubrían correlación.

Tests: 36 pytest verdes (cohen/hedges 8, confidence/welch 8, fdr/holm/bonferroni
12, preregister 4 + extras), golden contra valores conocidos y validados con
scipy. Golden manual del preregistro: congela, idempotente, rechaza edición
(bytes en disco idénticos al congelado).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 20:42:12 +02:00
23 changed files with 1265 additions and 1443 deletions
+6 -2
View File
@@ -59,6 +59,9 @@ from .acf_pacf import acf_pacf
from .stl_decompose import stl_decompose
from .to_returns import to_returns
from .fdr_correction import fdr_correction
from .effect_size_cohens_d import effect_size_cohens_d
from .confidence_interval_mean import confidence_interval_mean
from .preregister_hypothesis import preregister_hypothesis
from .suggest_reexpression import suggest_reexpression
from .exploratory_caveats import exploratory_caveats
from .render_eda_pdf import render_eda_pdf, render_eda_pdf_relational
@@ -72,10 +75,8 @@ from .profile_datetime import profile_datetime
from .resample_timeseries import resample_timeseries
from .add_pdf_internal_links import add_pdf_internal_links
from .suggest_intratable_fk_candidates import suggest_intratable_fk_candidates
from .draw_join_graph_figure import draw_join_graph_figure
__all__ = [
"draw_join_graph_figure",
"suggest_intratable_fk_candidates",
"detect_time_column",
"extract_timeseries_raw",
@@ -92,6 +93,9 @@ __all__ = [
"stl_decompose",
"to_returns",
"fdr_correction",
"effect_size_cohens_d",
"confidence_interval_mean",
"preregister_hypothesis",
"suggest_reexpression",
"exploratory_caveats",
"render_eda_pdf",
@@ -0,0 +1,87 @@
---
name: confidence_interval_mean
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def confidence_interval_mean(data: list, other: list = None, confidence: float = 0.95) -> dict"
description: "Intervalo de confianza (IC) de la media de una muestra con la t de Student, o de la DIFERENCIA de medias de dos muestras independientes con el metodo de Welch (sin asumir varianzas iguales). Una muestra: df=n-1, se=sd_muestral/sqrt(n) (sd con ddof=1), tcrit=t.ppf((1+confidence)/2, df), ci=mean+/-tcrit*se. Dos muestras: IC de mean(data)-mean(other) con se=sqrt(se1^2+se2^2) y grados de libertad de Welch-Satterthwaite. Pura y robusta: nunca lanza; ante casos degenerados (muestra vacia, n<2) devuelve nan + clave note, y con varianza cero el IC colapsa al punto (no es error). Usa scipy.stats y numpy."
tags: [papers, statistics, confidence-interval, welch, t-test, python]
params:
- name: data
desc: "muestra de observaciones numericas (lista de numeros). Si other es None, el IC es el de la media de data."
- name: other
desc: "segunda muestra independiente (lista de numeros) o None (default). Si se da, el IC es el de la diferencia de medias mean(data)-mean(other) calculada con Welch (no asume varianzas iguales)."
- name: confidence
desc: "nivel de confianza en (0, 1); 0.95 = IC del 95% (default). El cuantil critico es t.ppf((1+confidence)/2, df)."
output: "dict {mean, ci_low, ci_high, se, df, confidence, n}. mean = media de data (una muestra) o la diferencia mean(data)-mean(other) (dos muestras). En el caso de dos muestras se anaden ademas n1 y n2 (y n = n1+n2). df son los grados de libertad de la t (Welch-Satterthwaite si dos muestras). Casos degenerados (muestra vacia, n<2) anaden la clave note y dejan ci_low/ci_high/se (y a veces df) en nan; con varianza cero y n>=2 el IC colapsa a [mean, mean] con se=0 (con note, sin nan). Nunca None ni excepcion."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [scipy, numpy]
tested: true
tests: ["test_one_sample_golden_contra_scipy", "test_one_sample_distinto_nivel_confianza", "test_welch_diferencia_golden_contra_scipy", "test_edge_un_solo_elemento_no_lanza_nan_note", "test_edge_lista_vacia_no_lanza_note", "test_edge_varianza_cero_colapsa_al_punto", "test_edge_welch_muestra_vacia_no_lanza_note", "test_edge_welch_n1_uno_no_lanza_note"]
test_file_path: "python/functions/datascience/confidence_interval_mean_test.py"
file_path: "python/functions/datascience/confidence_interval_mean.py"
---
## Ejemplo
```python
from datascience import confidence_interval_mean
# IC del 95% de la media de una muestra (t de Student).
data = [2, 4, 4, 4, 5, 5, 7, 9]
ci = confidence_interval_mean(data, confidence=0.95)
print(ci["mean"]) # -> 5.0
print(ci["df"]) # -> 7.0 (n - 1)
print(round(ci["ci_low"], 5), round(ci["ci_high"], 5))
# -> 3.21251 6.78749 (se con sd muestral ddof=1 ~ 2.13809)
# IC del 95% de la DIFERENCIA de medias (Welch, no asume varianzas iguales).
control = [23.0, 21.0, 25.0, 22.0, 24.0, 26.0]
tratado = [18.0, 20.0, 17.0, 19.0, 21.0]
diff = confidence_interval_mean(control, tratado, confidence=0.95)
print(diff["mean"]) # -> 4.5 (mean(control) - mean(tratado))
print(round(diff["ci_low"], 4), round(diff["ci_high"], 4))
# Si el intervalo no incluye 0, la diferencia es significativa al 5%.
# Degenerados: nunca lanza.
print(confidence_interval_mean([5])["note"]) # n < 2: ... indefinidos
print(confidence_interval_mean([3, 3, 3])["se"]) # -> 0.0 (IC colapsa a [3, 3])
```
## Cuando usarla
Cuando quieras cuantificar la **incertidumbre de una media estimada** a partir de
una muestra: reporta `[ci_low, ci_high]` en vez de un punto suelto para mostrar
el rango plausible del valor real al nivel de confianza pedido. Usala tambien
para **comparar dos grupos** (A/B test, control vs tratamiento, antes vs
despues con grupos independientes): pasa las dos muestras y, si el IC de la
diferencia **no incluye el 0**, la diferencia es significativa al nivel
`1 - confidence`. Es el complemento del p-valor: ademas de "hay efecto", te dice
"de que tamano y con que margen". Para dos muestras usa Welch por defecto, asi
que no necesitas comprobar antes si las varianzas son iguales.
## Gotchas
- Pura y determinista (no hace I/O, no muta las entradas), pero **no** es
stdlib-only: depende de `scipy.stats` y `numpy` (ambos en el venv del proyecto).
- Con `other` usa **Welch** (df de Welch-Satterthwaite): NO asume varianzas
iguales ni tamanos de muestra iguales. Si necesitas el t-test clasico de
varianzas agrupadas (pooled), esta funcion no lo hace.
- `sd` se calcula con **ddof=1** (sd muestral), que es lo correcto para el IC de
una media con la t. Atajos como `sd_poblacional/sqrt(n)` (ddof=0) dan un
intervalo demasiado estrecho.
- En el caso de dos muestras, `mean` es la **diferencia** `mean(data) - mean(other)`
(no la media de data). El orden importa: el signo del IC depende de cual va
primero.
- Nunca lanza. Casos degenerados devuelven `nan` en `ci_low`/`ci_high`/`se`
(y a veces `df`) mas una clave `note`: muestra vacia o `n < 2` en cualquiera de
las muestras. **Excepcion**: con varianza cero y `n >= 2` el IC colapsa al
punto `[mean, mean]` con `se = 0` (no es un error, no hay `nan`).
- Comprueba `"note" in out` antes de usar `ci_low`/`ci_high` si la muestra puede
ser degenerada.
@@ -0,0 +1,176 @@
"""Intervalo de confianza de la media (una muestra) o de la diferencia de medias (Welch).
Funcion pura del grupo papers. Calcula el intervalo de confianza (IC) de la media
de una muestra usando la t de Student, o el IC de la diferencia de medias de dos
muestras independientes con el metodo de Welch (sin asumir varianzas iguales).
- Una muestra: ``df = n - 1``, ``se = sd / sqrt(n)`` (sd con ddof=1),
``tcrit = t.ppf((1 + confidence) / 2, df)``, ``ci = mean +/- tcrit * se``.
- Dos muestras (Welch): IC de ``mean(data) - mean(other)``, con
``se = sqrt(se1^2 + se2^2)`` y grados de libertad de Welch-Satterthwaite.
No lanza excepciones: ante casos degenerados (muestras vacias, ``n < 2``,
varianza cero) devuelve un dict coherente con ``ci_low``/``ci_high``/``se`` en
``nan`` (salvo el sub-caso de varianza cero, donde el IC colapsa al punto) y una
clave ``note`` explicando el caso. Usa ``scipy.stats`` y ``numpy``.
"""
from __future__ import annotations
import math
import numpy as np
from scipy import stats
def confidence_interval_mean(
data: list, other: list = None, confidence: float = 0.95
) -> dict:
"""Intervalo de confianza de la media o de la diferencia de medias (Welch).
Si ``other`` es ``None``, calcula el IC de la media de ``data`` con la t de
Student. Si se proporciona ``other``, calcula el IC de la diferencia
``mean(data) - mean(other)`` con el metodo de Welch (no asume varianzas
iguales) y grados de libertad de Welch-Satterthwaite.
Es una funcion pura y determinista: no hace I/O ni muta las entradas. No
lanza excepcion ante datos degenerados; en su lugar devuelve un dict con la
clave ``note`` y los campos numericos indefinidos a ``nan``.
Args:
data: muestra de observaciones numericas (lista de numeros).
other: segunda muestra independiente. Si se da, el IC es el de la
diferencia de medias ``mean(data) - mean(other)`` con Welch. Si es
``None`` (default), el IC es el de la media de ``data``.
confidence: nivel de confianza en (0, 1), p.ej. 0.95 para el 95%.
Returns:
dict con las claves:
mean: media de ``data`` (una muestra) o la diferencia
``mean(data) - mean(other)`` (dos muestras).
ci_low: extremo inferior del intervalo de confianza.
ci_high: extremo superior del intervalo de confianza.
se: error estandar de la media (o de la diferencia).
df: grados de libertad de la t (Welch-Satterthwaite si dos muestras).
confidence: nivel de confianza aplicado (float).
n: tamano de la muestra (una muestra) o tamano total ``n1 + n2``
(dos muestras; ademas se incluyen ``n1`` y ``n2``).
En el caso de dos muestras se incluyen ademas ``n1`` y ``n2``. Casos
degenerados (muestra vacia, ``n < 2``, etc.) anaden la clave ``note`` y
dejan ``ci_low``/``ci_high``/``se`` (y a veces ``df``) en ``nan``.
"""
conf = float(confidence)
if other is None:
return _ci_one_sample(data, conf)
return _ci_welch(data, other, conf)
def _ci_one_sample(data: list, conf: float) -> dict:
"""IC de la media de una sola muestra con la t de Student."""
arr = np.asarray(list(data), dtype=float)
n = int(arr.size)
base = {
"mean": float("nan"),
"ci_low": float("nan"),
"ci_high": float("nan"),
"se": float("nan"),
"df": float("nan"),
"confidence": conf,
"n": n,
}
if n == 0:
base["note"] = "muestra vacia: media e intervalo indefinidos"
return base
mean = float(arr.mean())
base["mean"] = mean
if n < 2:
base["note"] = "n < 2: error estandar y grados de libertad indefinidos"
return base
df = n - 1
base["df"] = float(df)
sd = float(arr.std(ddof=1))
se = sd / math.sqrt(n)
base["se"] = se
# Varianza cero: el IC colapsa al punto (no es un error).
if se == 0.0:
base["ci_low"] = mean
base["ci_high"] = mean
base["note"] = "varianza cero: el intervalo colapsa a la media"
return base
tcrit = float(stats.t.ppf((1.0 + conf) / 2.0, df))
margin = tcrit * se
base["ci_low"] = mean - margin
base["ci_high"] = mean + margin
return base
def _ci_welch(data: list, other: list, conf: float) -> dict:
"""IC de la diferencia de medias de dos muestras con el metodo de Welch."""
a = np.asarray(list(data), dtype=float)
b = np.asarray(list(other), dtype=float)
n1 = int(a.size)
n2 = int(b.size)
base = {
"mean": float("nan"),
"ci_low": float("nan"),
"ci_high": float("nan"),
"se": float("nan"),
"df": float("nan"),
"confidence": conf,
"n": n1 + n2,
"n1": n1,
"n2": n2,
}
if n1 == 0 or n2 == 0:
base["note"] = "alguna muestra esta vacia: diferencia e intervalo indefinidos"
return base
mean1 = float(a.mean())
mean2 = float(b.mean())
diff = mean1 - mean2
base["mean"] = diff
if n1 < 2 or n2 < 2:
base["note"] = (
"n < 2 en alguna muestra: error estandar y grados de libertad indefinidos"
)
return base
sd1 = float(a.std(ddof=1))
sd2 = float(b.std(ddof=1))
se1 = sd1 / math.sqrt(n1)
se2 = sd2 / math.sqrt(n2)
se = math.sqrt(se1 * se1 + se2 * se2)
base["se"] = se
# Ambas varianzas cero: el IC de la diferencia colapsa al punto.
if se == 0.0:
base["ci_low"] = diff
base["ci_high"] = diff
base["df"] = float("nan")
base["note"] = "varianza cero en ambas muestras: el intervalo colapsa a la diferencia"
return base
# Grados de libertad de Welch-Satterthwaite.
df = (se1 * se1 + se2 * se2) ** 2 / (
(se1**4) / (n1 - 1) + (se2**4) / (n2 - 1)
)
base["df"] = float(df)
tcrit = float(stats.t.ppf((1.0 + conf) / 2.0, df))
margin = tcrit * se
base["ci_low"] = diff - margin
base["ci_high"] = diff + margin
return base
@@ -0,0 +1,140 @@
"""Tests para confidence_interval_mean (IC de la media / diferencia de medias Welch).
Importa el modulo hoja directamente (`confidence_interval_mean`) para no depender
de que el paquete reexporte la funcion en su __init__ (lo integra el orquestador
al cerrar el grupo).
Los golden se calculan con scipy dentro del propio test para que sean robustos:
la funcion bajo prueba debe coincidir con la referencia de scipy a ~1e-9.
"""
import math
import numpy as np
from scipy import stats
from confidence_interval_mean import confidence_interval_mean
def test_one_sample_golden_contra_scipy():
# mean=5.0, n=8. Este dataset tiene sd POBLACIONAL (ddof=0) exactamente 2.0,
# pero la sd MUESTRAL (ddof=1, la que exige la spec y la que es correcta para
# el IC de una media con la t) es sqrt(32/7) ~ 2.13809. El golden robusto se
# calcula con scipy usando se con ddof=1, no con el atajo 2.0/sqrt(8).
data = [2, 4, 4, 4, 5, 5, 7, 9]
out = confidence_interval_mean(data, confidence=0.95)
n = len(data)
mean = float(np.mean(data))
sd = float(np.std(data, ddof=1)) # sample sd ~ 2.13809
se = sd / math.sqrt(n)
lo, hi = stats.t.interval(0.95, df=n - 1, loc=mean, scale=se)
assert abs(out["mean"] - 5.0) < 1e-9
assert abs(out["se"] - se) < 1e-12
assert out["df"] == 7.0
assert out["n"] == 8
assert out["confidence"] == 0.95
assert abs(out["ci_low"] - lo) < 1e-9
assert abs(out["ci_high"] - hi) < 1e-9
# Valores tabulados correctos para ddof=1 (no los 3.32793/6.67207 del
# enunciado, que asumian erroneamente sd=2.0 / ddof=0).
assert abs(out["ci_low"] - 3.21251) < 1e-3
assert abs(out["ci_high"] - 6.78749) < 1e-3
assert "note" not in out
def test_one_sample_distinto_nivel_confianza():
data = [10.0, 12.0, 11.0, 13.0, 9.0, 14.0]
out = confidence_interval_mean(data, confidence=0.99)
n = len(data)
mean = float(np.mean(data))
se = float(np.std(data, ddof=1)) / math.sqrt(n)
lo, hi = stats.t.interval(0.99, df=n - 1, loc=mean, scale=se)
assert abs(out["mean"] - mean) < 1e-12
assert abs(out["ci_low"] - lo) < 1e-9
assert abs(out["ci_high"] - hi) < 1e-9
assert out["df"] == float(n - 1)
def test_welch_diferencia_golden_contra_scipy():
data = [23.0, 21.0, 25.0, 22.0, 24.0, 26.0]
other = [18.0, 20.0, 17.0, 19.0, 21.0]
conf = 0.95
out = confidence_interval_mean(data, other, confidence=conf)
a = np.asarray(data, dtype=float)
b = np.asarray(other, dtype=float)
n1, n2 = a.size, b.size
mean1, mean2 = float(a.mean()), float(b.mean())
diff = mean1 - mean2
se1 = float(a.std(ddof=1)) / math.sqrt(n1)
se2 = float(b.std(ddof=1)) / math.sqrt(n2)
se = math.sqrt(se1**2 + se2**2)
df = (se1**2 + se2**2) ** 2 / (se1**4 / (n1 - 1) + se2**4 / (n2 - 1))
lo, hi = stats.t.interval(conf, df=df, loc=diff, scale=se)
assert abs(out["mean"] - diff) < 1e-9
assert abs(out["mean"] - (mean1 - mean2)) < 1e-9
assert abs(out["se"] - se) < 1e-12
assert abs(out["df"] - df) < 1e-9
assert abs(out["ci_low"] - lo) < 1e-9
assert abs(out["ci_high"] - hi) < 1e-9
assert out["n1"] == n1
assert out["n2"] == n2
assert out["n"] == n1 + n2
assert "note" not in out
def test_edge_un_solo_elemento_no_lanza_nan_note():
out = confidence_interval_mean([5], confidence=0.95)
assert out["mean"] == 5.0 # la media si esta definida con n=1
assert math.isnan(out["se"])
assert math.isnan(out["ci_low"])
assert math.isnan(out["ci_high"])
assert math.isnan(out["df"])
assert out["n"] == 1
assert "note" in out
def test_edge_lista_vacia_no_lanza_note():
out = confidence_interval_mean([], confidence=0.95)
assert math.isnan(out["mean"])
assert math.isnan(out["ci_low"])
assert math.isnan(out["ci_high"])
assert math.isnan(out["se"])
assert out["n"] == 0
assert "note" in out
def test_edge_varianza_cero_colapsa_al_punto():
out = confidence_interval_mean([3, 3, 3], confidence=0.95)
assert out["mean"] == 3.0
assert out["se"] == 0.0
assert out["ci_low"] == 3.0
assert out["ci_high"] == 3.0
assert not math.isnan(out["ci_low"])
assert out["n"] == 3
assert "note" in out
def test_edge_welch_muestra_vacia_no_lanza_note():
out = confidence_interval_mean([1.0, 2.0, 3.0], [], confidence=0.95)
assert math.isnan(out["mean"])
assert math.isnan(out["ci_low"])
assert math.isnan(out["se"])
assert out["n1"] == 3
assert out["n2"] == 0
assert "note" in out
def test_edge_welch_n1_uno_no_lanza_note():
out = confidence_interval_mean([5.0], [1.0, 2.0, 3.0], confidence=0.95)
# La diferencia de medias si esta definida.
assert abs(out["mean"] - (5.0 - 2.0)) < 1e-9
assert math.isnan(out["se"])
assert math.isnan(out["ci_low"])
assert math.isnan(out["df"])
assert "note" in out
@@ -1,103 +0,0 @@
---
id: draw_join_graph_figure_py_datascience
name: draw_join_graph_figure
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def draw_join_graph_figure(join_graph: dict, title: str = None) -> \"matplotlib.figure.Figure\""
description: "Rasteriza el join graph de una base (relaciones FK inter-tabla, salida de build_join_graph) a un matplotlib.figure.Figure: nodos circulares con el nombre de cada tabla (hubs en color de acento cálido, el resto neutro) y aristas dirigidas etiquetadas from_col→to_col (más la cardinalidad si viene). Es la contrapartida dibujada del string Mermaid para que el capítulo de relaciones del informe AutomaticEDA muestre un diagrama real. Layout networkx spring_layout determinista (seed=42), backend Agg sin abrir ventanas; defensivo: nunca lanza y nunca hace I/O."
tags: [eda, plot, relations, graph, matplotlib, figure, networkx, datascience, impure]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [matplotlib, networkx]
example: |
from draw_join_graph_figure import draw_join_graph_figure
join_graph = {
"nodes": [
{"table": "customers", "out_degree": 0, "in_degree": 1, "role": "dimension"},
{"table": "orders", "out_degree": 1, "in_degree": 0, "role": "fact"},
],
"edges": [
{"from_table": "orders", "from_col": "customer_id",
"to_table": "customers", "to_col": "id", "cardinality": "N:1"},
],
"hubs": ["orders"],
}
fig = draw_join_graph_figure(join_graph, title="Relaciones FK")
fig.savefig("/tmp/join_graph.png")
tested: true
tests:
- "test_returns_figure_with_axis"
- "test_savefig_produces_nonempty_png"
- "test_empty_dict_does_not_raise_and_savefig_png"
- "test_none_does_not_raise_and_savefig_png"
test_file_path: "python/functions/datascience/draw_join_graph_figure_test.py"
file_path: "python/functions/datascience/draw_join_graph_figure.py"
params:
- name: join_graph
desc: "Dict producido por build_join_graph. Claves: `nodes` (list[dict] con table, out_degree, in_degree, role), `edges` (list[dict] con from_table, from_col, to_table, to_col y opcional cardinality/inclusion) y `hubs` (list[str] de tablas hub a destacar en color cálido). Claves ausentes, items no-dict, None o {} se toleran (devuelve Figure con texto, sin lanzar). Los nombres de nodo se derivan también de las aristas, así que un grafo con edges pero sin nodes explícitos igual se dibuja."
- name: title
desc: "Título dibujado sobre el diagrama. Si se omite (None) se usa \"Join graph\". Default None."
output: "Un matplotlib.figure.Figure (figsize 7x5) con un único Axes que contiene el diagrama node-link dirigido: tablas como nodos circulares etiquetados (hubs en acento cálido #DD8452, resto en azul neutro #4C72B0) y FKs como flechas dirigidas con etiqueta from_col→to_col (+ cardinalidad). Si join_graph no tiene nodos ni aristas (o es None/{}), devuelve igualmente una Figure con el texto centrado \"Sin relaciones FK detectadas.\"; ante cualquier fallo interno devuelve una Figure con un mensaje genérico (nunca lanza). El caller rasteriza/cierra la figura; la función no la muestra ni la guarda."
---
## Ejemplo
```python
from draw_join_graph_figure import draw_join_graph_figure
# `join_graph` es la salida de build_join_graph (nodes + edges + hubs).
join_graph = {
"nodes": [
{"table": "customers", "out_degree": 0, "in_degree": 1, "role": "dimension"},
{"table": "orders", "out_degree": 2, "in_degree": 0, "role": "fact"},
{"table": "products", "out_degree": 0, "in_degree": 1, "role": "dimension"},
],
"edges": [
{"from_table": "orders", "from_col": "customer_id",
"to_table": "customers", "to_col": "id", "cardinality": "N:1"},
{"from_table": "orders", "from_col": "product_id",
"to_table": "products", "to_col": "id", "cardinality": "N:1"},
],
"hubs": ["orders"], # `orders` se pinta en color de acento (tabla de hechos)
}
fig = draw_join_graph_figure(join_graph, title="Relaciones FK")
# El renderer del informe lo rasteriza; aquí solo persistimos para inspección.
fig.savefig("/tmp/join_graph.png")
```
## Cuando usarla
Úsala en el capítulo de relaciones de un informe AutomaticEDA cuando quieras un
diagrama **dibujado** del esquema relacional, no solo el bloque Mermaid pegable.
Pásale directamente la salida de `build_join_graph` (`nodes` + `edges` + `hubs`)
y obtienes una `matplotlib.figure.Figure` lista para que el renderer perezoso la
rasterice. Es la pareja visual del string Mermaid: Mermaid sirve para pegar en
Markdown/docs que lo soporten; esta función produce la imagen real (PNG/PDF) que
va embebida en informes que no renderizan Mermaid.
## Gotchas
- **Impura por matplotlib.** Fija el backend `Agg` al importar — no abre
ventanas ni depende de un display. Segura de llamar en lotes desde el
renderer.
- **Layout determinista (`seed=42`).** Usa `nx.spring_layout(G, seed=42)`, así
que la misma entrada produce el mismo diagrama (test reproducible). Para
grafos de 0/1 nodos usa una posición fija centrada en vez del spring layout.
- **No hace I/O.** No llama `plt.show()` ni guarda a disco — solo devuelve la
`Figure`. Quien la consume la rasteriza y la libera (`plt.close(fig)`) para no
acumular memoria en informes con muchas tablas.
- **Devuelve una Figure, NO un dict.** A diferencia de `build_join_graph` (que
devuelve el dict del grafo), esta función devuelve el objeto de figura ya
dibujado.
- **Defensiva, nunca lanza.** `None`, `{}`, claves ausentes o items malformados
se manejan sin error: en el peor caso devuelve una `Figure` con
"Sin relaciones FK detectadas." (vacío) o un mensaje genérico (fallo interno).
No la envuelvas en try/except por miedo a un raise — no lo hay.
@@ -1,214 +0,0 @@
"""Impure EDA helper: rasterize a join graph to a matplotlib Figure (`eda` group).
Takes the join graph produced by ``build_join_graph`` (inter-table FK relations)
and draws it as a directed node-link diagram on a ready-to-rasterize
``matplotlib.figure.Figure``. Hub tables (the ones with the highest out-degree,
candidate fact tables of a star schema) are highlighted in a warm accent colour;
the rest use a neutral colour. Directed edges carry a ``from_col→to_col`` label
(plus the cardinality when present).
This is the *drawn* counterpart of the Mermaid string that ``build_join_graph``
also emits: the relations chapter of an AutomaticEDA report can show a real
picture instead of only the pasteable Mermaid block.
Impure because it touches matplotlib's rendering machinery. It pins the headless
Agg backend and a deterministic ``spring_layout`` seed so the output is
reproducible. It never raises: on any internal failure (or empty input) it
returns a ``Figure`` carrying a centered message, so the lazy render of the
document is never broken.
"""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt # noqa: E402
import networkx as nx # noqa: E402
# Warm accent reserved for hub tables (candidate fact tables / star-schema cores).
_HUB_COLOR = "#DD8452"
# Neutral blue for every other table.
_NODE_COLOR = "#4C72B0"
# Muted gray for the empty/error message text.
_MUTED_TEXT = "#5f6b7a"
# Edge colour and label colour.
_EDGE_COLOR = "#7a7a7a"
_EDGE_LABEL_COLOR = "#34495e"
# Constant node size; shared with the edge drawing so arrowheads stop at the
# node boundary instead of being hidden under the marker.
_NODE_SIZE = 2200
def _text_figure(message: str) -> "matplotlib.figure.Figure":
"""Return a blank Figure carrying a single centered message.
Used both for the "no relations" case and as the never-raise fallback.
"""
fig, ax = plt.subplots(figsize=(7, 5))
ax.axis("off")
ax.text(
0.5,
0.5,
message,
ha="center",
va="center",
fontsize=12,
color=_MUTED_TEXT,
transform=ax.transAxes,
)
fig.tight_layout()
return fig
def _edge_label(edge: dict) -> str:
"""Build the ``from_col→to_col`` label of an edge, appending cardinality."""
fc = edge.get("from_col")
tc = edge.get("to_col")
if fc is not None and tc is not None:
label = f"{fc}{tc}"
elif fc is not None:
label = str(fc)
elif tc is not None:
label = str(tc)
else:
label = ""
card = edge.get("cardinality")
if card:
label = f"{label} ({card})" if label else str(card)
return label
def draw_join_graph_figure(join_graph: dict, title: str = None):
"""Rasterize a join graph to a matplotlib Figure.
Builds a ``networkx.DiGraph`` from the graph's nodes and edges, lays it out
with a deterministic ``spring_layout`` (``seed=42``) and draws it on a
``matplotlib.figure.Figure``: tables as labelled circular nodes (hubs in a
warm accent, the rest neutral) and FK relations as directed arrows labelled
``from_col→to_col`` (plus cardinality when available).
The function never raises. On empty/``None`` input it returns a Figure with
a centered "Sin relaciones FK detectadas." message; on any internal failure
it returns a Figure with a generic centered message. It never shows the
figure nor writes it to disk — the document renderer rasterizes it.
Args:
join_graph: Dict produced by ``build_join_graph`` with keys ``nodes``
(list of ``{table, out_degree, in_degree, role}``), ``edges`` (list
of ``{from_table, from_col, to_table, to_col, cardinality?,
inclusion?}``) and ``hubs`` (list of hub table names to highlight).
Missing keys, non-dict items, ``None`` or ``{}`` are all tolerated.
title: Optional title drawn above the diagram. When omitted, the title
defaults to "Join graph".
Returns:
A ``matplotlib.figure.Figure`` (figsize 7x5) with a single Axes holding
the node-link diagram. The caller rasterizes/closes it.
"""
try:
jg = join_graph if isinstance(join_graph, dict) else {}
nodes = jg.get("nodes") or []
edges = jg.get("edges") or []
hubs = {h for h in (jg.get("hubs") or []) if h is not None}
# Collect node names from the declared nodes and, defensively, from the
# edges (so a graph with edges but no explicit nodes still draws).
node_names: list = []
seen: set = set()
def _register(name) -> None:
if name is not None and name not in seen:
seen.add(name)
node_names.append(name)
for n in nodes:
if isinstance(n, dict):
_register(n.get("table"))
for e in edges:
if isinstance(e, dict):
_register(e.get("from_table"))
_register(e.get("to_table"))
if not node_names:
return _text_figure("Sin relaciones FK detectadas.")
graph = nx.DiGraph()
for name in node_names:
graph.add_node(name)
edge_labels: dict = {}
for e in edges:
if not isinstance(e, dict):
continue
ft = e.get("from_table")
tt = e.get("to_table")
if ft is None or tt is None:
continue
graph.add_edge(ft, tt)
edge_labels[(ft, tt)] = _edge_label(e)
fig, ax = plt.subplots(figsize=(7, 5))
# Deterministic layout. Fixed positions for trivial graphs so a single
# node sits centered instead of at an arbitrary spring-layout point.
if graph.number_of_nodes() <= 1:
pos = {name: (0.5, 0.5) for name in graph.nodes()}
else:
pos = nx.spring_layout(graph, seed=42)
node_colors = [
_HUB_COLOR if name in hubs else _NODE_COLOR for name in graph.nodes()
]
nx.draw_networkx_nodes(
graph,
pos,
ax=ax,
node_color=node_colors,
node_size=_NODE_SIZE,
node_shape="o",
edgecolors="white",
linewidths=1.5,
)
nx.draw_networkx_labels(
graph,
pos,
ax=ax,
font_size=9,
font_color="white",
font_weight="bold",
)
nx.draw_networkx_edges(
graph,
pos,
ax=ax,
arrows=True,
arrowstyle="-|>",
arrowsize=18,
edge_color=_EDGE_COLOR,
width=1.4,
connectionstyle="arc3,rad=0.06",
node_size=_NODE_SIZE,
)
if any(lbl for lbl in edge_labels.values()):
nx.draw_networkx_edge_labels(
graph,
pos,
edge_labels=edge_labels,
ax=ax,
font_size=7,
font_color=_EDGE_LABEL_COLOR,
bbox={
"boxstyle": "round,pad=0.2",
"fc": "white",
"ec": "none",
"alpha": 0.7,
},
)
ax.set_title(title if title else "Join graph", fontsize=13)
ax.axis("off")
fig.tight_layout()
return fig
except Exception:
# Never raise — the document render is lazy and must not be broken.
return _text_figure("No se pudo dibujar el join graph.")
@@ -1,84 +0,0 @@
"""Tests para draw_join_graph_figure (rasteriza el join graph, grupo eda).
Usa el backend Agg sin abrir ventanas; cada test cierra la Figure construida
(matplotlib.pyplot.close) para no acumular estado entre tests. Las aserciones de
guardado escriben a tmp_path (fixture de pytest) y comprueban que el PNG no está
vacío.
"""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt # noqa: E402
from matplotlib.figure import Figure # noqa: E402
from draw_join_graph_figure import draw_join_graph_figure
def _make_join_graph():
"""Join graph mínimo: 3 nodos (customers/orders/products) y 2 aristas.
orders -> customers y orders -> products. `orders` es el hub (out_degree 2).
"""
return {
"nodes": [
{"table": "customers", "out_degree": 0, "in_degree": 1, "role": "dimension"},
{"table": "orders", "out_degree": 2, "in_degree": 0, "role": "fact"},
{"table": "products", "out_degree": 0, "in_degree": 1, "role": "dimension"},
],
"edges": [
{
"from_table": "orders",
"from_col": "customer_id",
"to_table": "customers",
"to_col": "id",
"cardinality": "N:1",
"inclusion": 1.0,
},
{
"from_table": "orders",
"from_col": "product_id",
"to_table": "products",
"to_col": "id",
"cardinality": "N:1",
"inclusion": 0.98,
},
],
"hubs": ["orders"],
}
def test_returns_figure_with_axis():
fig = draw_join_graph_figure(_make_join_graph(), title="Relaciones FK")
assert isinstance(fig, Figure)
# Al menos un eje con el diagrama.
assert len(fig.axes) >= 1
plt.close(fig)
def test_savefig_produces_nonempty_png(tmp_path):
fig = draw_join_graph_figure(_make_join_graph())
out = tmp_path / "g.png"
fig.savefig(out)
assert out.exists()
assert out.stat().st_size > 0
plt.close(fig)
def test_empty_dict_does_not_raise_and_savefig_png(tmp_path):
fig = draw_join_graph_figure({})
assert isinstance(fig, Figure)
out = tmp_path / "empty.png"
fig.savefig(out)
assert out.stat().st_size > 0
plt.close(fig)
def test_none_does_not_raise_and_savefig_png(tmp_path):
fig = draw_join_graph_figure(None)
assert isinstance(fig, Figure)
out = tmp_path / "none.png"
fig.savefig(out)
assert out.stat().st_size > 0
plt.close(fig)
@@ -0,0 +1,80 @@
---
name: effect_size_cohens_d
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def effect_size_cohens_d(group_a: list, group_b: list) -> dict"
description: "Tamano del efecto (effect size) entre dos grupos numericos: Cohen's d (diferencia de medias estandarizada por la desviacion tipica combinada, varianzas muestrales ddof=1), Hedges' g (d corregido por el sesgo al alza con muestras pequenas via el factor J) e interpretacion cualitativa de la magnitud segun los umbrales clasicos de Cohen (negligible/small/medium/large). El p-valor dice si hay diferencia; el effect size dice como de grande, de forma adimensional e independiente del N. Pura, sin dependencias externas; nunca lanza: los casos degenerados (varianza cero, N<2, listas vacias) devuelven NaN + una clave note."
tags: [papers, statistics, effect-size, cohens-d, hedges-g, python]
params:
- name: group_a
desc: "primera muestra (lista de numeros). Necesita >=2 observaciones para que exista la varianza muestral (ddof=1)."
- name: group_b
desc: "segunda muestra (lista de numeros). Necesita >=2 observaciones. El signo de cohens_d es positivo cuando mean_a > mean_b."
output: "dict {cohens_d: float (diferencia de medias estandarizada, puede ser NaN), hedges_g: float (cohens_d * factor de correccion J, puede ser NaN), interpretation: str ('negligible'|'small'|'medium'|'large', o 'undefined' en casos degenerados), n_a: int, n_b: int, mean_a: float, mean_b: float, pooled_sd: float (desviacion tipica combinada)}. Casos degenerados (varianza cero en ambos grupos, N<2 en algun grupo, o listas vacias) anaden clave note. Nunca None ni excepcion."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: true
tests: ["test_golden_large_effect", "test_hedges_g_menor_en_magnitud_que_cohens_d", "test_interpretation_thresholds", "test_signo_positivo_cuando_a_mayor_que_b", "test_varianza_cero_no_lanza", "test_n_insuficiente_no_lanza", "test_listas_vacias_no_lanza", "test_un_grupo_vacio_no_lanza"]
test_file_path: "python/functions/datascience/effect_size_cohens_d_test.py"
file_path: "python/functions/datascience/effect_size_cohens_d.py"
---
## Ejemplo
```python
from datascience import effect_size_cohens_d
# Dos grupos desplazados 2 unidades, misma dispersion.
a = [1, 2, 3, 4, 5] # media 3, varianza muestral 2.5
b = [3, 4, 5, 6, 7] # media 5, varianza muestral 2.5
out = effect_size_cohens_d(a, b)
print(out["cohens_d"]) # -> -1.264911... (a esta 1.26 SD por debajo de b)
print(out["hedges_g"]) # -> -1.142500... (|g| < |d|: correccion N pequeno)
print(out["interpretation"]) # -> "large" (|d| >= 0.8)
print(out["pooled_sd"]) # -> 1.581138...
# Caso degenerado: varianza cero -> no lanza, NaN + note.
deg = effect_size_cohens_d([5, 5, 5], [5, 5, 5])
print(deg["interpretation"]) # -> "undefined"
print(deg["note"]) # -> "varianza cero, effect size indefinido"
```
## Cuando usarla
Cuando ya sepas que dos grupos difieren (o quieras cuantificar su diferencia)
y necesites una medida **de magnitud, no de significancia**: comparar el antes
y el despues de una intervencion, el grupo control frente al tratamiento, o dos
cohortes. Reportala junto al p-valor para responder "¿como de grande es la
diferencia?" — un p-valor minusculo con N enorme puede esconder un efecto
trivial. Es adimensional (en unidades de desviaciones tipicas), asi que hace
comparables resultados entre estudios y alimenta meta-analisis. Usa **Hedges' g**
en lugar de Cohen's d cuando los grupos sean pequenos (decenas o menos): d
sobreestima el efecto y g lo corrige.
## Gotchas
- Pura y sin dependencias externas (solo `math` de la stdlib).
- Usa **varianza muestral** (ddof=1), no poblacional. Por eso cada grupo
necesita al menos 2 observaciones; con N=1 la varianza muestral no existe y la
funcion devuelve NaN + `note`.
- **Nunca lanza excepcion**. Los casos degenerados devuelven `cohens_d` y
`hedges_g` a `float('nan')`, `interpretation="undefined"` y una clave `note`:
varianza cero en ambos grupos (`pooled_sd == 0`), N<2 en algun grupo, o listas
vacias. Comprueba con `math.isnan(out["cohens_d"])` o la presencia de `note`
antes de usar el resultado.
- El **signo** de `cohens_d` depende del orden de los argumentos: positivo si
`mean_a > mean_b`, negativo en caso contrario. La `interpretation` usa `|d|`,
asi que no depende del orden.
- `pooled_sd` asume varianzas comparables entre grupos (homogeneidad). Si las
dispersiones son muy distintas, Cohen's d clasico pierde precision; considera
variantes (Glass's delta) fuera del alcance de esta funcion.
- Los umbrales de Cohen (0.2 / 0.5 / 0.8) son convencion, no ley: interpretalos
segun el dominio.
@@ -0,0 +1,156 @@
"""Effect size de dos grupos: Cohen's d, Hedges' g e interpretacion cualitativa.
Funcion pura del grupo papers. El p-valor responde a "¿hay diferencia?" pero no
a "¿como de grande es?". El tamano del efecto (effect size) cuantifica la
magnitud de la diferencia entre dos grupos de forma adimensional, independiente
del N, y es lo que hace comparables resultados entre estudios (meta-analisis).
- Cohen's d: diferencia de medias estandarizada por la desviacion tipica
combinada (pooled SD), con varianzas muestrales (ddof=1).
- Hedges' g: Cohen's d corregido por el sesgo al alza que sufre d con muestras
pequenas, multiplicando por el factor de correccion J.
- interpretation: etiqueta cualitativa de |d| segun los umbrales clasicos de
Cohen (negligible / small / medium / large).
No usa dependencias externas: aritmetica de la libreria estandar (``math``).
"""
from __future__ import annotations
import math
def _mean(xs: list) -> float:
"""Media aritmetica de una lista no vacia de numeros."""
return sum(float(x) for x in xs) / len(xs)
def _sample_variance(xs: list, mean: float) -> float:
"""Varianza muestral (ddof=1) de una lista con al menos 2 elementos."""
n = len(xs)
return sum((float(x) - mean) ** 2 for x in xs) / (n - 1)
def _interpret(abs_d: float) -> str:
"""Etiqueta cualitativa del tamano del efecto segun |d| (umbrales de Cohen)."""
if abs_d < 0.2:
return "negligible"
if abs_d < 0.5:
return "small"
if abs_d < 0.8:
return "medium"
return "large"
def effect_size_cohens_d(group_a: list, group_b: list) -> dict:
"""Calcula el tamano del efecto entre dos grupos numericos.
Devuelve Cohen's d (diferencia de medias estandarizada por la pooled SD),
Hedges' g (d corregido por sesgo de muestra pequena) y una etiqueta
cualitativa de la magnitud segun los umbrales de Cohen.
Es una funcion pura y determinista: no hace I/O, no muta la entrada. No lanza
excepcion ante datos degenerados; en su lugar devuelve un dict con
``cohens_d`` / ``hedges_g`` a ``float('nan')``, ``interpretation`` a
``"undefined"`` y una clave ``note`` explicando el caso.
Definiciones:
s_pooled = sqrt(((n1-1)*s1^2 + (n2-1)*s2^2) / (n1+n2-2)), con s1^2, s2^2
varianzas muestrales (ddof=1).
cohens_d = (mean_a - mean_b) / s_pooled.
J = 1 - 3 / (4*(n1+n2) - 9) (factor de correccion de Hedges).
hedges_g = cohens_d * J.
Args:
group_a: primera muestra (lista de numeros). Necesita >=2 elementos para
que exista la varianza muestral.
group_b: segunda muestra (lista de numeros). Necesita >=2 elementos.
Returns:
dict con las claves:
cohens_d: float, diferencia de medias estandarizada (puede ser NaN).
hedges_g: float, Cohen's d corregido por sesgo (puede ser NaN).
interpretation: str, "negligible" | "small" | "medium" | "large", o
"undefined" en casos degenerados.
n_a: int, tamano de group_a.
n_b: int, tamano de group_b.
mean_a: float, media de group_a (NaN si vacio).
mean_b: float, media de group_b (NaN si vacio).
pooled_sd: float, desviacion tipica combinada (NaN si indefinida).
Casos degenerados (lista vacia, N<2 en algun grupo, o varianza cero en
ambos grupos -> pooled_sd == 0) anaden ademas una clave ``note``.
"""
nan = float("nan")
n_a = len(group_a)
n_b = len(group_b)
# Listas vacias: ni media ni varianza definidas.
if n_a == 0 or n_b == 0:
return {
"cohens_d": nan,
"hedges_g": nan,
"interpretation": "undefined",
"n_a": n_a,
"n_b": n_b,
"mean_a": _mean(group_a) if n_a else nan,
"mean_b": _mean(group_b) if n_b else nan,
"pooled_sd": nan,
"note": "grupo vacio: media y varianza indefinidas, effect size indefinido",
}
mean_a = _mean(group_a)
mean_b = _mean(group_b)
# N insuficiente: la varianza muestral (ddof=1) no existe con un solo dato,
# y la correccion de Hedges no es fiable.
if n_a < 2 or n_b < 2:
return {
"cohens_d": nan,
"hedges_g": nan,
"interpretation": "undefined",
"n_a": n_a,
"n_b": n_b,
"mean_a": mean_a,
"mean_b": mean_b,
"pooled_sd": nan,
"note": (
"N insuficiente: cada grupo necesita >=2 observaciones para la "
"varianza muestral; effect size indefinido"
),
}
var_a = _sample_variance(group_a, mean_a)
var_b = _sample_variance(group_b, mean_b)
pooled_sd = math.sqrt(
((n_a - 1) * var_a + (n_b - 1) * var_b) / (n_a + n_b - 2)
)
# Varianza cero en ambos grupos: no se puede estandarizar (division por 0).
if pooled_sd == 0.0:
return {
"cohens_d": nan,
"hedges_g": nan,
"interpretation": "undefined",
"n_a": n_a,
"n_b": n_b,
"mean_a": mean_a,
"mean_b": mean_b,
"pooled_sd": 0.0,
"note": "varianza cero, effect size indefinido",
}
cohens_d = (mean_a - mean_b) / pooled_sd
j = 1.0 - 3.0 / (4.0 * (n_a + n_b) - 9.0)
hedges_g = cohens_d * j
return {
"cohens_d": cohens_d,
"hedges_g": hedges_g,
"interpretation": _interpret(abs(cohens_d)),
"n_a": n_a,
"n_b": n_b,
"mean_a": mean_a,
"mean_b": mean_b,
"pooled_sd": pooled_sd,
}
@@ -0,0 +1,96 @@
"""Tests para effect_size_cohens_d (tamano del efecto de dos grupos).
Importa el modulo hoja directamente (`effect_size_cohens_d`) para no depender de
que el paquete reexporte la funcion en su __init__ (lo integra el orquestador al
cerrar el grupo papers). El pytest del repo tiene pythonpath=["functions", ...],
asi que el modulo hoja se resuelve por su nombre directo.
"""
import math
from effect_size_cohens_d import effect_size_cohens_d
def test_golden_large_effect():
# group_a: mean 3, var muestral 2.5; group_b: mean 5, var 2.5.
# pooled_sd = sqrt(2.5) ~= 1.5811388.
# cohens_d = (3-5)/1.5811388 ~= -1.264911.
# J = 1 - 3/(4*10-9) = 1 - 3/31 = 0.9032258.
# hedges_g = d * J = -1.2649111 * 0.9032258 ~= -1.142500.
out = effect_size_cohens_d([1, 2, 3, 4, 5], [3, 4, 5, 6, 7])
assert abs(out["cohens_d"] - (-1.26491)) < 1e-4
assert abs(out["hedges_g"] - (-1.14250)) < 1e-4
assert out["interpretation"] == "large"
assert out["n_a"] == 5
assert out["n_b"] == 5
assert abs(out["mean_a"] - 3.0) < 1e-12
assert abs(out["mean_b"] - 5.0) < 1e-12
assert abs(out["pooled_sd"] - math.sqrt(2.5)) < 1e-9
assert "note" not in out
def test_hedges_g_menor_en_magnitud_que_cohens_d():
# La correccion J esta en (0, 1), asi que |g| < |d| siempre.
out = effect_size_cohens_d([1, 2, 3, 4, 5], [3, 4, 5, 6, 7])
assert abs(out["hedges_g"]) < abs(out["cohens_d"])
def test_interpretation_thresholds():
# negligible: |d| < 0.2. Medias casi iguales con varianza grande.
neg = effect_size_cohens_d([0, 10, 20, 30], [1, 11, 21, 31])
assert neg["interpretation"] == "negligible"
assert abs(neg["cohens_d"]) < 0.2
# small: 0.2 <= |d| < 0.5.
small = effect_size_cohens_d([0, 10, 20, 30], [4, 14, 24, 34])
assert small["interpretation"] == "small"
assert 0.2 <= abs(small["cohens_d"]) < 0.5
# medium: 0.5 <= |d| < 0.8.
medium = effect_size_cohens_d([0, 10, 20, 30], [9, 19, 29, 39])
assert medium["interpretation"] == "medium"
assert 0.5 <= abs(medium["cohens_d"]) < 0.8
def test_signo_positivo_cuando_a_mayor_que_b():
out = effect_size_cohens_d([10, 12, 14, 16], [1, 2, 3, 4])
assert out["cohens_d"] > 0
assert out["interpretation"] == "large"
def test_varianza_cero_no_lanza():
out = effect_size_cohens_d([5, 5, 5], [5, 5, 5])
assert math.isnan(out["cohens_d"])
assert math.isnan(out["hedges_g"])
assert out["interpretation"] == "undefined"
assert out["pooled_sd"] == 0.0
assert "note" in out
assert "varianza cero" in out["note"]
def test_n_insuficiente_no_lanza():
out = effect_size_cohens_d([3], [1, 2, 3])
assert math.isnan(out["cohens_d"])
assert math.isnan(out["hedges_g"])
assert out["interpretation"] == "undefined"
assert out["n_a"] == 1
assert out["n_b"] == 3
assert "note" in out
def test_listas_vacias_no_lanza():
out = effect_size_cohens_d([], [])
assert math.isnan(out["cohens_d"])
assert math.isnan(out["hedges_g"])
assert out["interpretation"] == "undefined"
assert out["n_a"] == 0
assert out["n_b"] == 0
assert "note" in out
def test_un_grupo_vacio_no_lanza():
out = effect_size_cohens_d([1, 2, 3], [])
assert math.isnan(out["cohens_d"])
assert out["interpretation"] == "undefined"
assert out["n_b"] == 0
assert "note" in out
+29 -11
View File
@@ -3,19 +3,19 @@ name: fdr_correction
kind: function
lang: py
domain: datascience
version: "1.0.0"
version: "1.1.0"
purity: pure
signature: "def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = \"bh\") -> dict"
description: "Correccion de comparaciones multiples (multiple-testing) sobre una lista de p-valores: Benjamini-Hochberg (FDR, 'bh') o Bonferroni (FWER, 'bonferroni'). Antidoto al sesgo de mineria de datos (data-mining bias): al evaluar muchas hipotesis a la vez (todos los pares de una matriz), el azar produce falsos positivos; esta funcion ajusta los p-valores y marca cuales siguen siendo significativos tras corregir. Pura, sin dependencias externas, alineada 1:1 con la entrada (admite None en posiciones sin test)."
tags: [eda, statistics, multiple-testing, fdr, benjamini-hochberg, bonferroni, p-value, data-mining-bias, python]
description: "Correccion de comparaciones multiples (multiple-testing) sobre una lista de p-valores: Benjamini-Hochberg (FDR, 'bh'), Bonferroni (FWER, 'bonferroni') o Holm-Bonferroni (FWER step-down, 'holm', mas potente que Bonferroni simple). Antidoto al sesgo de mineria de datos (data-mining bias): al evaluar muchas hipotesis a la vez (todos los pares de una matriz), el azar produce falsos positivos; esta funcion ajusta los p-valores y marca cuales siguen siendo significativos tras corregir. Pura, sin dependencias externas, alineada 1:1 con la entrada (admite None en posiciones sin test)."
tags: [eda, statistics, multiple-testing, fdr, benjamini-hochberg, bonferroni, holm, holm-bonferroni, fwer, p-value, data-mining-bias, python]
params:
- name: pvalues
desc: "lista de p-valores (floats en [0, 1]). Se admiten None u otros valores no validos en posiciones sin test disponible; se propagan como None en la salida y no cuentan como prueba (m)."
- name: alpha
desc: "nivel de significancia objetivo tras la correccion (default 0.05). Para BH es el umbral del FDR; para Bonferroni, del FWER (tasa de error por familia)."
- name: method
desc: "'bh' = Benjamini-Hochberg (controla FDR, menos conservador, mas potencia); 'bonferroni' = controla FWER (mas conservador). Cualquier otro valor devuelve un dict con note."
output: "dict {p_values_adjusted: lista alineada con pvalues (float ajustado o None), reject: lista de bool (True = significativo tras corregir), n_tests: nº de p-valores validos (m), n_rejected: nº de hipotesis rechazadas, alpha: float aplicado, method: str}. Casos degenerados (vacio, sin p validos, metodo desconocido) anaden clave note. Nunca None ni excepcion."
desc: "'bh' = Benjamini-Hochberg (controla FDR, menos conservador, mas potencia); 'bonferroni' = controla FWER (mas conservador); 'holm' = Holm-Bonferroni (controla FWER, step-down, uniformemente mas potente que Bonferroni simple). Cualquier otro valor devuelve un dict con note."
output: "dict {p_values_adjusted: lista alineada con pvalues (float ajustado o None), reject: lista de bool (True = significativo tras corregir), n_tests: nº de p-valores validos (m), n_rejected: nº de hipotesis rechazadas, alpha: float aplicado, method: str ('bh' | 'bonferroni' | 'holm')}. Casos degenerados (vacio, sin p validos, metodo desconocido) anaden clave note. Nunca None ni excepcion."
uses_functions: []
uses_types: []
returns: []
@@ -23,7 +23,7 @@ returns_optional: false
error_type: ""
imports: [math]
tested: true
tests: ["test_bh_golden_rechaza_dos_de_tres", "test_bonferroni_mas_conservador_que_bh", "test_p_values_adjusted_alineados_y_en_rango", "test_none_se_propaga_alineado", "test_lista_vacia_devuelve_note", "test_solo_none_devuelve_note", "test_metodo_desconocido_devuelve_note", "test_todos_significativos"]
tests: ["test_bh_golden_rechaza_dos_de_tres", "test_bonferroni_mas_conservador_que_bh", "test_p_values_adjusted_alineados_y_en_rango", "test_none_se_propaga_alineado", "test_lista_vacia_devuelve_note", "test_solo_none_devuelve_note", "test_metodo_desconocido_devuelve_note", "test_todos_significativos", "test_holm_golden_rechaza_dos_de_cuatro", "test_holm_entre_bonferroni_y_bh", "test_none_se_propaga_alineado_holm", "test_lista_vacia_holm_devuelve_note"]
test_file_path: "python/functions/datascience/fdr_correction_test.py"
file_path: "python/functions/datascience/fdr_correction.py"
---
@@ -45,6 +45,13 @@ bon = fdr_correction(pvalues, alpha=0.05, method="bonferroni")
print(bon["reject"]) # -> [True, False, False]
print(bon["p_values_adjusted"]) # -> [0.03, 0.06, 1.0]
# Holm-Bonferroni (step-down): controla el FWER como Bonferroni pero es mas
# potente; rechaza al menos tanto como Bonferroni simple, nunca menos.
holm = fdr_correction([0.01, 0.04, 0.03, 0.005], alpha=0.05, method="holm")
print(holm["reject"]) # -> [True, False, False, True]
print(holm["p_values_adjusted"]) # -> [0.03, 0.06, 0.06, 0.02]
print(holm["n_rejected"]) # -> 2
# Posiciones sin test (None) se propagan alineadas: el llamador puede pasar la
# lista completa de pares y recuperar el mapeo 1:1.
mix = fdr_correction([0.001, None, 0.9])
@@ -61,8 +68,11 @@ combinaciones y se quede con las que "pasan". Sin corregir, con N pruebas y
alpha=0.05 esperas ~5% de falsos positivos *por azar*: cuantas mas pruebas, mas
correlaciones espurias. Llama a `fdr_correction` con todos los p-valores de la
familia y usa `reject` (no el umbral crudo) para decidir que es real. Usa `"bh"`
por defecto (mejor potencia); `"bonferroni"` cuando un falso positivo sea muy
costoso y prefieras maxima cautela.
por defecto (mejor potencia); `"holm"` (Holm-Bonferroni, FWER step-down) cuando
quieras controlar el FWER pero sin la perdida de potencia de Bonferroni simple
(rechaza al menos tanto como `"bonferroni"`, nunca menos); `"bonferroni"` cuando
un falso positivo sea muy costoso y prefieras la maxima cautela del metodo mas
simple.
## Gotchas
@@ -76,8 +86,16 @@ costoso y prefieras maxima cautela.
eso puedes pasar la lista completa de pares aunque algunos no tengan test.
- `n_tests` es el numero de p-valores **validos** (m), que puede ser menor que
`len(pvalues)` si hay `None`.
- BH y Bonferroni controlan cosas distintas: BH la tasa de falsos
descubrimientos (FDR), Bonferroni la probabilidad de *cualquier* falso
- BH controla cosa distinta que Bonferroni/Holm: BH la tasa de falsos
descubrimientos (FDR); Bonferroni y Holm la probabilidad de *cualquier* falso
positivo (FWER). No son intercambiables; elige segun el coste de equivocarte.
- `"holm"` y `"bonferroni"` controlan ambos el FWER, pero Holm es step-down y
uniformemente mas potente: rechaza al menos tantas hipotesis como Bonferroni
simple sobre el mismo set, nunca menos. Si controlas FWER, `"holm"` domina a
`"bonferroni"` salvo que necesites el ajuste mas simple por interpretabilidad.
- Metodo desconocido o lista vacia/sin p validos no lanzan: devuelven un dict
con `note`.
con `note`. Los metodos validos son `"bh"`, `"bonferroni"` y `"holm"`.
## Capability growth log
- v1.1.0 (2026-06-30) — añade method="holm" (Holm-Bonferroni step-down, FWER, más potente que Bonferroni simple).
+29 -9
View File
@@ -5,12 +5,15 @@ todos los pares de una matriz de asociacion), la probabilidad de obtener al meno
un falso positivo por azar crece con el numero de pruebas: es el sesgo de mineria
de datos (data-mining bias) descrito por Aronson en *Evidence-Based Technical
Analysis* (cap. 6). Esta funcion ajusta los p-valores para controlar ese sesgo
mediante dos metodos clasicos:
mediante tres metodos clasicos:
- Benjamini-Hochberg (``"bh"``): controla la tasa de falsos descubrimientos
(False Discovery Rate, FDR). Menos conservador, mas potencia estadistica.
- Bonferroni (``"bonferroni"``): controla la tasa de error por familia
(Family-Wise Error Rate, FWER). Mas conservador.
- Holm-Bonferroni (``"holm"``): controla el FWER como Bonferroni pero es un
procedimiento step-down uniformemente mas potente; rechaza al menos tantas
hipotesis como Bonferroni simple, nunca menos.
No usa dependencias externas: aritmetica de la libreria estandar.
"""
@@ -35,8 +38,9 @@ def _is_valid_p(v) -> bool:
def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> dict:
"""Corrige una lista de p-valores por comparaciones multiples.
Aplica Benjamini-Hochberg (FDR) o Bonferroni (FWER) sobre ``pvalues`` y
devuelve, alineado posicion a posicion con la entrada, el p-valor ajustado y
Aplica Benjamini-Hochberg (FDR), Bonferroni (FWER) o Holm-Bonferroni
(FWER, step-down) sobre ``pvalues`` y devuelve, alineado posicion a
posicion con la entrada, el p-valor ajustado y
si cada hipotesis se rechaza al nivel ``alpha`` tras la correccion. Las
posiciones cuyo valor no sea un p-valor valido (``None``, ``NaN``, fuera de
``[0, 1]`` o no numerico) se conservan en la salida como ``None`` /
@@ -53,8 +57,10 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
otros valores no validos en posiciones sin test disponible; se
propagan como ``None`` en la salida y no cuentan como prueba.
alpha: nivel de significancia objetivo tras la correccion (default 0.05).
Para BH es el umbral del FDR; para Bonferroni, del FWER.
method: ``"bh"`` (Benjamini-Hochberg, FDR) o ``"bonferroni"`` (FWER).
Para BH es el umbral del FDR; para Bonferroni y Holm, del FWER.
method: ``"bh"`` (Benjamini-Hochberg, FDR), ``"bonferroni"`` (FWER) o
``"holm"`` (Holm-Bonferroni, FWER step-down, mas potente que
Bonferroni simple).
Returns:
dict con las claves:
@@ -68,7 +74,7 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
n_tests: numero de p-valores validos usados en la correccion (m).
n_rejected: numero de hipotesis rechazadas (significativas).
alpha: nivel de significancia aplicado (float).
method: metodo aplicado (``"bh"`` o ``"bonferroni"``).
method: metodo aplicado (``"bh"``, ``"bonferroni"`` o ``"holm"``).
Casos degenerados (lista vacia, sin p-valores validos o metodo
desconocido) anaden ademas una clave ``note`` y devuelven listas
@@ -76,7 +82,7 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
en las posiciones invalidas).
"""
method_norm = (method or "").strip().lower()
if method_norm not in {"bh", "bonferroni"}:
if method_norm not in {"bh", "bonferroni", "holm"}:
n = len(pvalues)
return {
"p_values_adjusted": [None] * n,
@@ -86,8 +92,8 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
"alpha": float(alpha),
"method": method,
"note": (
f"metodo desconocido '{method}'; usa 'bh' (Benjamini-Hochberg) "
"o 'bonferroni'"
f"metodo desconocido '{method}'; usa 'bh' (Benjamini-Hochberg), "
"'bonferroni' o 'holm' (Holm-Bonferroni)"
),
}
@@ -129,6 +135,20 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
padj = min(1.0, p * m)
adjusted[orig_idx] = padj
reject[orig_idx] = padj <= a
elif method_norm == "holm":
# Holm-Bonferroni (step-down). Ordena p ascendente; para el rank k
# (1-indexed) el p ajustado crudo es (m - k + 1) * p_(k). Impon
# monotonicidad acumulada (no decreciente) recorriendo de menor a mayor:
# padj_(k) = max(padj_(k-1), min(1, (m-k+1)*p_(k))), con padj_(0)=0.
order = sorted(valid, key=lambda t: t[1]) # [(orig_idx, p), ...] por p asc
prev = 0.0
for k in range(1, m + 1):
orig_idx, p = order[k - 1]
raw = min(1.0, (m - k + 1) * p)
padj = max(prev, raw)
prev = padj
adjusted[orig_idx] = padj
reject[orig_idx] = padj <= a
else:
# Benjamini-Hochberg (step-up). Ordena p ascendente y calcula q-valores
# con la monotonicidad acumulada de derecha a izquierda.
@@ -82,7 +82,8 @@ def test_solo_none_devuelve_note():
def test_metodo_desconocido_devuelve_note():
out = fdr_correction([0.01, 0.02], method="holm")
# 'holm' ya es un metodo valido (v1.1.0); usamos uno realmente desconocido.
out = fdr_correction([0.01, 0.02], method="sidak")
assert "note" in out
assert out["n_rejected"] == 0
assert out["reject"] == [False, False]
@@ -97,3 +98,66 @@ def test_todos_significativos():
assert bon["n_rejected"] == 3
assert all(bh["reject"])
assert all(bon["reject"])
def test_holm_golden_rechaza_dos_de_cuatro():
# Holm-Bonferroni (step-down) sobre [0.01, 0.04, 0.03, 0.005], m=4, alpha=0.05.
# Ordenado ascendente: 0.005, 0.01, 0.03, 0.04.
# padj_(1) = 4*0.005 = 0.02
# padj_(2) = max(0.02, 3*0.01=0.03) = 0.03
# padj_(3) = max(0.03, 2*0.03=0.06) = 0.06
# padj_(4) = max(0.06, 1*0.04=0.04) = 0.06
# Mapeado al orden de entrada [0.01, 0.04, 0.03, 0.005]:
# 0.01 -> 0.03, 0.04 -> 0.06, 0.03 -> 0.06, 0.005 -> 0.02
out = fdr_correction([0.01, 0.04, 0.03, 0.005], alpha=0.05, method="holm")
assert out["method"] == "holm"
assert out["n_tests"] == 4
adj = out["p_values_adjusted"]
assert abs(adj[0] - 0.03) < 1e-9
assert abs(adj[1] - 0.06) < 1e-9
assert abs(adj[2] - 0.06) < 1e-9
assert abs(adj[3] - 0.02) < 1e-9
assert out["reject"] == [True, False, False, True]
assert out["n_rejected"] == 2
def test_holm_entre_bonferroni_y_bh():
# Holm controla FWER como Bonferroni pero es step-down: rechaza AL MENOS
# tanto como Bonferroni simple, y a lo sumo tanto como BH (FDR, menos
# conservador). Cadena de potencia: bonferroni <= holm <= bh.
pvalues = [0.01, 0.02, 0.04, 0.005]
bon = fdr_correction(pvalues, alpha=0.05, method="bonferroni")
holm = fdr_correction(pvalues, alpha=0.05, method="holm")
bh = fdr_correction(pvalues, alpha=0.05, method="bh")
assert holm["n_rejected"] >= bon["n_rejected"]
assert holm["n_rejected"] <= bh["n_rejected"]
# En este set Holm gana potencia frente a Bonferroni simple (estricto).
assert holm["n_rejected"] > bon["n_rejected"]
# Un set donde Holm es estrictamente mas conservador que BH.
pvals2 = [0.01, 0.02, 0.03, 0.04]
bon2 = fdr_correction(pvals2, alpha=0.05, method="bonferroni")
holm2 = fdr_correction(pvals2, alpha=0.05, method="holm")
bh2 = fdr_correction(pvals2, alpha=0.05, method="bh")
assert holm2["n_rejected"] >= bon2["n_rejected"]
assert holm2["n_rejected"] < bh2["n_rejected"]
def test_none_se_propaga_alineado_holm():
# None se propaga alineado tambien con holm: la posicion central no cuenta
# como prueba (m=2) y se devuelve como None / False.
out = fdr_correction([0.001, None, 0.9], method="holm")
assert out["n_tests"] == 2
assert out["p_values_adjusted"][1] is None
assert out["reject"][1] is False
assert out["reject"][0] is True
assert len(out["reject"]) == 3
def test_lista_vacia_holm_devuelve_note():
out = fdr_correction([], method="holm")
assert out["p_values_adjusted"] == []
assert out["reject"] == []
assert out["n_tests"] == 0
assert out["n_rejected"] == 0
assert "note" in out
@@ -0,0 +1,100 @@
---
name: preregister_hypothesis
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def preregister_hypothesis(paper_dir: str, hypotheses: dict, analysis_plan: dict) -> dict"
description: "Pre-registra (congela) la hipotesis y el plan de analisis de un paper ANTES de mirar los datos: antidoto al HARKing (Hypothesizing After the Results are Known). Escribe/actualiza <paper_dir>/preregistration.md con un frontmatter (paper_slug, frozen_at, content_hash, status) y un cuerpo markdown DETERMINISTA derivado de (hypotheses, analysis_plan) (mismo input -> mismo cuerpo byte a byte, claves ordenadas alfabeticamente). El content_hash es sha256 del cuerpo NORMALIZADO (strip por linea + colapso de blancos), nunca del frontmatter. Una vez status=frozen es INMUTABLE: re-congelar con el mismo contenido es idempotente (no reescribe, devuelve unchanged) y re-congelar con contenido distinto se RECHAZA (no sobrescribe, devuelve error) para que no se pueda ajustar la hipotesis a los resultados. Estilo dict-no-throw: nunca lanza."
tags: [papers, preregistration, reproducibility, anti-harking, python]
params:
- name: paper_dir
desc: "ruta del directorio del paper, p.ej. 'papers/0001-mi-paper'. Debe existir (no se crea aqui). El paper_slug del frontmatter es el basename del dir. Si no existe o no es str -> {status:error, path, note} sin crash ni creacion."
- name: hypotheses
desc: "dict de hipotesis, p.ej. {'h0': 'no hay diferencia ...', 'h1': 'el grupo A > grupo B ...'}. Se renderiza en la seccion '## Hypotheses' con una linea por clave, ordenadas alfabeticamente para determinismo."
- name: analysis_plan
desc: "dict con el plan de analisis, p.ej. {'test': 'welch_t_test', 'effect_size_metric': 'cohens_d', 'decision_rule': 'rechazar H0 si p<0.05 tras Holm y |d|>=0.5', 'planned_n': 100, 'multiple_correction': 'holm'}. Se renderiza en '## Analysis plan' con una linea por clave (ordenadas alfabeticamente). Acepta valores no-str (int, etc.)."
output: "dict dict-no-throw (NUNCA lanza). status='frozen' cuando escribe el archivo por primera vez o congela un draft previo ({status, path, content_hash, frozen_at}). status='unchanged' cuando ya estaba frozen con el mismo content_hash: no reescribe y preserva el archivo byte-identico incl. el frozen_at original ({status, path, content_hash, frozen_at}). status='error' cuando paper_dir no existe, ya esta frozen con un hash distinto (rechazo anti-HARKing, no sobrescribe), inputs invalidos o error de I/O ({status, path, note, [content_hash]})."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [hashlib]
tested: true
tests: ["test_golden_congela_y_escribe_archivo", "test_idempotente_mismo_input_no_reescribe", "test_inmutabilidad_anti_harking_rechaza_contenido_distinto", "test_error_paper_dir_inexistente_no_crash_no_crea"]
test_file_path: "python/functions/datascience/preregister_hypothesis_test.py"
file_path: "python/functions/datascience/preregister_hypothesis.py"
---
## Ejemplo
```python
import os, tempfile
from datascience import preregister_hypothesis
# Un directorio de paper que ya existe.
paper_dir = tempfile.mkdtemp(prefix="0001-")
hypotheses = {
"h0": "no hay diferencia entre el grupo A y el grupo B",
"h1": "el grupo A tiene mayor conversion que el grupo B",
}
analysis_plan = {
"test": "welch_t_test",
"effect_size_metric": "cohens_d",
"decision_rule": "rechazar H0 si p<0.05 tras Holm y |d|>=0.5",
"planned_n": 100,
"multiple_correction": "holm",
}
# 1) Primera vez: congela y escribe <paper_dir>/preregistration.md
r1 = preregister_hypothesis(paper_dir, hypotheses, analysis_plan)
print(r1["status"]) # -> "frozen"
print(r1["content_hash"]) # sha256 del cuerpo
# 2) Mismo input: idempotente, no reescribe.
r2 = preregister_hypothesis(paper_dir, hypotheses, analysis_plan)
print(r2["status"]) # -> "unchanged"
# 3) Cambiar la hipotesis tras congelar (HARKing): rechazado, archivo intacto.
r3 = preregister_hypothesis(paper_dir, {"h0": "...", "h1": "otra cosa"}, analysis_plan)
print(r3["status"]) # -> "error"
```
## Cuando usarla
Llamala al ARRANCAR el analisis de un paper, antes de tocar los datos, para
dejar por escrito (y firmado por hash) que vas a probar y como vas a decidir.
Es el primer paso de un flujo reproducible: pre-registras la hipotesis y el plan
(`test`, `effect_size_metric`, `decision_rule`, `planned_n`,
`multiple_correction`), y solo despues corres el analisis y comparas con lo
pre-registrado. Si mas tarde el analisis "descubre" otra hipotesis que encaja
mejor con los datos, el pre-registro congelado deja en evidencia el cambio: no se
puede reescribir. Combinala con `effect_size_cohens_d` y `fdr_correction` para
cerrar el plan declarado (effect size + correccion de multiples comparaciones).
## Gotchas
- **Inmutabilidad (el corazon)**: una vez `status: frozen`, el pre-registro NO se
puede editar. Re-congelar con el MISMO contenido es idempotente (`unchanged`,
no reescribe, preserva incluso el `frozen_at` original). Re-congelar con
contenido DISTINTO devuelve `error` y deja el archivo intacto: asi se mata el
HARKing. Para cambiar de verdad la hipotesis hay que borrar el archivo a mano y
asumir explicitamente que ya no es un pre-registro valido.
- **dict-no-throw**: la funcion NUNCA lanza. Cualquier error previsible
(directorio inexistente, inputs no-dict, fallo de I/O, excepcion inesperada) se
captura y se devuelve como `{"status": "error", "note": ...}`. Siempre incluye
`path` (la ruta esperada del `preregistration.md`).
- **El hash es SOLO del cuerpo, nunca del frontmatter**: el frontmatter contiene
el propio `content_hash` y el `frozen_at` (timestamp), asi que incluirlos en el
hash seria circular y romperia la idempotencia. El cuerpo se normaliza antes de
hashear (strip por linea + colapso de lineas en blanco + strip final): cambios
irrelevantes de whitespace no alteran el hash, pero cambios de contenido SI.
- **Determinismo**: el cuerpo se genera con las claves de `hypotheses` y
`analysis_plan` ordenadas alfabeticamente, de modo que el orden de insercion del
dict no afecta al hash. Mismo `(hypotheses, analysis_plan)` -> mismo cuerpo y
mismo hash, byte a byte.
- **No crea el directorio del paper**: si `paper_dir` no existe, devuelve `error`
sin crear nada (ni el dir ni el archivo).
@@ -0,0 +1,202 @@
"""Congela (pre-registra) la hipotesis y el plan de analisis de un paper.
Anti-HARKing (Hypothesizing After the Results are Known): el pre-registro fija
la hipotesis y el plan de analisis ANTES de mirar los datos. Una vez congelado
(``status: frozen``) es INMUTABLE: cualquier intento posterior de re-congelar con
un contenido distinto se RECHAZA en vez de sobrescribir, de modo que no se puede
"ajustar" la hipotesis a los resultados despues de verlos.
Escribe/actualiza ``<paper_dir>/preregistration.md`` con un frontmatter
(``paper_slug``, ``frozen_at``, ``content_hash``, ``status``) y un cuerpo
markdown DETERMINISTA derivado de ``(hypotheses, analysis_plan)``.
Estilo dict-no-throw: NUNCA lanza; cualquier error previsible se captura y se
devuelve como ``{"status": "error", "note": ...}``.
"""
import hashlib
import os
from datetime import datetime, timezone
def _build_body(hypotheses: dict, analysis_plan: dict) -> str:
"""Construye el cuerpo markdown del pre-registro de forma DETERMINISTA.
Mismo ``(hypotheses, analysis_plan)`` -> mismo cuerpo byte a byte. Las claves
se ordenan alfabeticamente para no depender del orden de insercion del dict.
"""
lines = ["## Hypotheses", ""]
for k in sorted(hypotheses.keys()):
lines.append(f"- **{k}**: {hypotheses[k]}")
lines.append("")
lines.append("## Analysis plan")
lines.append("")
for k in sorted(analysis_plan.keys()):
lines.append(f"- **{k}**: {analysis_plan[k]}")
return "\n".join(lines)
def _normalize(body: str) -> str:
"""Normaliza el cuerpo para el hash: strip por linea + colapsa blancos.
Cambios irrelevantes de whitespace (espacios al final, dobles lineas en
blanco) no alteran el hash; cambios de contenido SI. Esto hace el hash
robusto sin perder la capacidad de detectar ediciones reales.
"""
out = []
prev_blank = False
for raw in body.splitlines():
line = raw.strip()
if line == "":
if prev_blank:
continue
prev_blank = True
else:
prev_blank = False
out.append(line)
return "\n".join(out).strip()
def _content_hash(body: str) -> str:
"""sha256 hex del cuerpo NORMALIZADO (nunca del frontmatter)."""
return hashlib.sha256(_normalize(body).encode("utf-8")).hexdigest()
def _parse_frontmatter(text: str) -> dict:
"""Parsea el frontmatter ``--- ... ---`` simple (key: value) de un .md."""
if not text.startswith("---"):
return {}
parts = text.split("---", 2)
if len(parts) < 3:
return {}
fm = {}
for line in parts[1].splitlines():
line = line.strip()
if not line or ":" not in line:
continue
key, _, value = line.partition(":")
fm[key.strip()] = value.strip()
return fm
def _render_file(slug: str, frozen_at: str, content_hash: str, body: str) -> str:
"""Compone el archivo completo: frontmatter frozen + cuerpo."""
return (
"---\n"
f"paper_slug: {slug}\n"
f"frozen_at: {frozen_at}\n"
f"content_hash: {content_hash}\n"
"status: frozen\n"
"---\n"
"\n"
f"{body}\n"
)
def preregister_hypothesis(paper_dir: str, hypotheses: dict, analysis_plan: dict) -> dict:
"""Congela la hipotesis y el plan de analisis de un paper (anti-HARKing).
Escribe ``<paper_dir>/preregistration.md`` con frontmatter ``status: frozen``
y un cuerpo markdown determinista. Una vez congelado es inmutable.
Args:
paper_dir: ruta del directorio del paper (p.ej. ``"papers/0001-mi-paper"``).
El ``paper_slug`` es el basename del directorio. Debe existir.
hypotheses: dict de hipotesis, p.ej.
``{"h0": "no hay diferencia ...", "h1": "grupo A > grupo B ..."}``.
analysis_plan: dict con el plan, p.ej.
``{"test": "welch_t_test", "effect_size_metric": "cohens_d",
"decision_rule": "...", "planned_n": 100, "multiple_correction": "holm"}``.
Returns:
dict dict-no-throw (NUNCA lanza). Claves segun el caso:
- frozen: {"status": "frozen", "path", "content_hash", "frozen_at"}
- unchanged: {"status": "unchanged", "path", "content_hash", "frozen_at"}
- error: {"status": "error", "path", "note", ...}
"""
expected_path = os.path.join(paper_dir, "preregistration.md")
try:
# 1) El directorio del paper debe existir; no se crea aqui.
if not isinstance(paper_dir, str) or not os.path.isdir(paper_dir):
return {
"status": "error",
"path": expected_path,
"note": f"paper_dir no existe: {paper_dir}",
}
if not isinstance(hypotheses, dict) or not isinstance(analysis_plan, dict):
return {
"status": "error",
"path": expected_path,
"note": "hypotheses y analysis_plan deben ser dict",
}
slug = os.path.basename(os.path.normpath(paper_dir))
# 2) + 3) Cuerpo determinista y su hash (solo del cuerpo, no del frontmatter).
body = _build_body(hypotheses, analysis_plan)
new_hash = _content_hash(body)
# 5) Logica de escritura.
if os.path.exists(expected_path):
existing = ""
try:
with open(expected_path, "r", encoding="utf-8") as fh:
existing = fh.read()
except OSError as exc:
return {
"status": "error",
"path": expected_path,
"note": f"no se pudo leer el pre-registro existente: {exc}",
}
fm = _parse_frontmatter(existing)
old_status = fm.get("status", "")
old_hash = fm.get("content_hash", "")
old_frozen_at = fm.get("frozen_at", "")
if old_status == "frozen":
if old_hash == new_hash:
# Idempotente: mismo contenido ya congelado. No se reescribe.
return {
"status": "unchanged",
"path": expected_path,
"content_hash": new_hash,
"frozen_at": old_frozen_at,
}
# Inmutabilidad: ya congelado con OTRO hash -> se rechaza (anti-HARKing).
return {
"status": "error",
"path": expected_path,
"content_hash": new_hash,
"note": (
"pre-registro inmutable: ya esta congelado (frozen) con un "
"hash distinto; un pre-registro no se puede editar tras "
"congelarse"
),
}
# status != "frozen" (p.ej. draft) -> se congela ahora.
# Archivo nuevo o draft existente: congelar con timestamp actual.
frozen_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
file_text = _render_file(slug, frozen_at, new_hash, body)
try:
with open(expected_path, "w", encoding="utf-8") as fh:
fh.write(file_text)
except OSError as exc:
return {
"status": "error",
"path": expected_path,
"note": f"no se pudo escribir el pre-registro: {exc}",
}
return {
"status": "frozen",
"path": expected_path,
"content_hash": new_hash,
"frozen_at": frozen_at,
}
except Exception as exc: # noqa: BLE001 - dict-no-throw: nunca propagar.
return {
"status": "error",
"path": expected_path,
"note": f"error inesperado: {exc}",
}
@@ -0,0 +1,99 @@
"""Tests para preregister_hypothesis (pre-registro inmutable, anti-HARKing).
Importa el modulo hoja directamente (`preregister_hypothesis`) para no depender
de que el paquete reexporte la funcion en su __init__ (lo integra el orquestador
al cerrar el grupo papers). El pytest del repo resuelve el modulo hoja por su
nombre directo.
Todos los tests son hermeticos y deterministas: usan el fixture `tmp_path` de
pytest; NUNCA escriben en `papers/`.
"""
from preregister_hypothesis import preregister_hypothesis
def _parse_frontmatter(text: str) -> dict:
parts = text.split("---", 2)
fm = {}
for line in parts[1].splitlines():
line = line.strip()
if not line or ":" not in line:
continue
key, _, value = line.partition(":")
fm[key.strip()] = value.strip()
return fm
HYP = {"h0": "no hay diferencia entre A y B", "h1": "el grupo A > grupo B"}
PLAN = {
"test": "welch_t_test",
"effect_size_metric": "cohens_d",
"decision_rule": "rechazar H0 si p<0.05 tras Holm y |d|>=0.5",
"planned_n": 100,
"multiple_correction": "holm",
}
def test_golden_congela_y_escribe_archivo(tmp_path):
paper = tmp_path / "0001-x"
paper.mkdir()
res = preregister_hypothesis(str(paper), HYP, PLAN)
assert res["status"] == "frozen"
pre = paper / "preregistration.md"
assert pre.exists()
text = pre.read_text(encoding="utf-8")
fm = _parse_frontmatter(text)
assert fm["status"] == "frozen"
assert fm["paper_slug"] == "0001-x"
assert fm["content_hash"] # no vacio
assert fm["frozen_at"] # no vacio
assert res["content_hash"] == fm["content_hash"]
assert res["frozen_at"] == fm["frozen_at"]
def test_idempotente_mismo_input_no_reescribe(tmp_path):
paper = tmp_path / "0001-x"
paper.mkdir()
pre = paper / "preregistration.md"
first = preregister_hypothesis(str(paper), HYP, PLAN)
assert first["status"] == "frozen"
bytes_before = pre.read_bytes()
second = preregister_hypothesis(str(paper), HYP, PLAN)
assert second["status"] == "unchanged"
# Mismo hash y frozen_at original preservado.
assert second["content_hash"] == first["content_hash"]
assert second["frozen_at"] == first["frozen_at"]
# El archivo NO cambio byte a byte (incl. frozen_at).
assert pre.read_bytes() == bytes_before
def test_inmutabilidad_anti_harking_rechaza_contenido_distinto(tmp_path):
paper = tmp_path / "0001-x"
paper.mkdir()
pre = paper / "preregistration.md"
preregister_hypothesis(str(paper), HYP, PLAN)
bytes_frozen = pre.read_bytes()
# Intento de re-congelar con una hipotesis DISTINTA (HARKing) -> rechazado.
hyp_tramposo = {"h0": "no hay diferencia", "h1": "el grupo B > grupo A (cambiado tras ver datos)"}
res = preregister_hypothesis(str(paper), hyp_tramposo, PLAN)
assert res["status"] == "error"
# Asercion mas importante: el archivo en disco SIGUE siendo el original.
assert pre.read_bytes() == bytes_frozen
def test_error_paper_dir_inexistente_no_crash_no_crea(tmp_path):
missing = tmp_path / "no-existe"
res = preregister_hypothesis(str(missing), HYP, PLAN)
assert res["status"] == "error"
# No se creo el directorio ni el archivo.
assert not missing.exists()
assert not (missing / "preregistration.md").exists()
-2
View File
@@ -34,7 +34,6 @@ from .upsert_xlsx_sheet import upsert_xlsx_sheet
from .duckdb_query_readonly import duckdb_query_readonly
from .duckdb_execute import duckdb_execute
from .duckdb_upsert import duckdb_upsert
from .load_folder_to_duckdb import load_folder_to_duckdb
from .imap_connect import imap_connect
from .imap_list_mailboxes import imap_list_mailboxes
from .imap_search import imap_search
@@ -51,7 +50,6 @@ __all__ = [
"upsert_xlsx_sheet",
"duckdb_query_readonly",
"duckdb_execute",
"load_folder_to_duckdb",
"duckdb_upsert",
"pg_insert_rows",
"pg_apply_sql",
@@ -1,100 +0,0 @@
---
name: load_folder_to_duckdb
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def load_folder_to_duckdb(folder: str, db_path: str = None, pattern: str = '*.csv,*.parquet,*.json') -> dict"
description: "Escanea el primer nivel de una CARPETA buscando archivos tabulares (CSV/TSV/TXT, Parquet, JSON/NDJSON) y los carga como tablas en una base DuckDB usando los lectores nativos read_csv_auto/read_parquet/read_json_auto. Es la pieza de entrada del EDA a nivel de carpeta (grupo eda). Por cada archivo crea una tabla cuyo nombre se deriva del basename saneado a [0-9a-zA-Z_] en minusculas (prefijo t_ si empieza por digito, sufijos _2/_3 ante colisiones, tabla_<i> si queda vacio). El path se escapa (comilla simple '->'') antes de interpolarlo porque los lectores DuckDB no aceptan el path como parametro posicional. Glob NO recursivo: un glob.glob(os.path.join(folder, g)) por cada patron del CSV, dedup y ordenado. db_path=None genera una DuckDB temporal (mkstemp, se borra el placeholder vacio porque DuckDB rechaza un archivo de 0 bytes) y devuelve su ruta. Un fallo al cargar un archivo concreto no aborta el resto: se registra en errors y se continua. Devuelve siempre un dict sin lanzar (estilo del grupo duckdb): {status:'ok', db_path, tables, errors} en exito (carpeta sin archivos tabulares incluida, tables=[]) y {status:'error', error} cuando la carpeta no existe o falla algo global. Depende del paquete duckdb (1.5.2)."
tags: [eda, duckdb, ingest, etl, folder]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_py_core"
imports: [glob, os, re, tempfile, duckdb]
params:
- name: folder
desc: "ruta a un directorio. Se escanea solo su primer nivel (NO recursivo). Si no existe o no es un directorio devuelve {status:'error'} sin lanzar."
- name: db_path
desc: "ruta del archivo DuckDB destino, abierto en modo read-write (lo crea si no existe). None (default) genera una DuckDB temporal unica con tempfile.mkstemp y devuelve su ruta en el campo db_path del retorno. DuckDB es single-writer: si otro proceso lo tiene abierto en escritura, connect falla con error de lock devuelto en el dict."
- name: pattern
desc: "CSV de globs separados por coma (default '*.csv,*.parquet,*.json'). Cada glob se aplica con glob.glob(os.path.join(folder, g)) sobre el primer nivel de folder; los resultados de todos los globs se deduplican y ordenan. Los globs con ** NO descienden recursivamente (glob.glob sin recursive=True)."
output: "dict. En exito: {status:'ok', db_path:str (ruta DuckDB usada), tables:[{name:str, source_file:str, n_rows:int}], errors:[{name?:str, source_file:str, error:str}]}. La carpeta sin archivos tabulares es un exito con tables=[] y errors=[]. En error (sin lanzar): {status:'error', error:str}."
tested: true
tests:
- "test_carga_dos_csv_como_tablas"
- "test_db_path_none_crea_temporal"
- "test_carpeta_vacia_es_ok_sin_tablas"
- "test_carpeta_inexistente_devuelve_status_error"
test_file_path: "python/functions/infra/load_folder_to_duckdb_test.py"
file_path: "python/functions/infra/load_folder_to_duckdb.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions")
from infra.load_folder_to_duckdb import load_folder_to_duckdb
# Preparar una carpeta de demo con dos CSV.
import os
os.makedirs("/tmp/eda_folder_demo", exist_ok=True)
with open("/tmp/eda_folder_demo/ventas.csv", "w") as f:
f.write("id,total\n1,10.5\n2,20.0\n3,5.25\n")
with open("/tmp/eda_folder_demo/clientes.csv", "w") as f:
f.write("id,nombre\n1,ana\n2,luis\n")
# Cargar todos los tabulares de la carpeta a una DuckDB temporal.
res = load_folder_to_duckdb("/tmp/eda_folder_demo")
print(res["status"]) # ok
print(res["db_path"]) # /tmp/tmpXXXXXXXX.duckdb (temporal)
for t in res["tables"]:
print(t["name"], t["n_rows"]) # ventas 3 / clientes 2
# Persistir en una DuckDB concreta y limitar a CSV.
res2 = load_folder_to_duckdb(
"/tmp/eda_folder_demo",
db_path="/tmp/eda_folder_demo/folder.duckdb",
pattern="*.csv",
)
print(res2["tables"]) # [{'name': 'clientes', ...}, {'name': 'ventas', ...}]
```
## Cuando usarla
Cuando tienes una carpeta de datos sueltos (un dump, un export, varios CSV/Parquet
descargados) y quieres analizarlos juntos con SQL sin montar la ingesta a mano,
archivo por archivo. Es el primer eslabon del EDA a nivel de carpeta (grupo `eda`):
deja una DuckDB con una tabla por archivo, lista para perfilar con
`duckdb_table_schema_py_infra`, consultar con `duckdb_query_readonly_py_infra`, o
correlacionar aguas abajo. Usala antes de cualquier paso de perfilado cuando la
unidad de trabajo es "todos los archivos de este directorio".
## Gotchas
- **Glob NO recursivo**: solo se escanea el primer nivel de `folder`. Archivos en
subdirectorios se ignoran (ni siquiera con `**` en el patron, porque
`glob.glob` se llama sin `recursive=True`). Si necesitas recursion, aplana la
carpeta antes o amplia la funcion.
- **Saneo de nombres de tabla**: el basename se reduce a `[0-9a-zA-Z_]` en
minusculas. `Ventas 2024.csv` -> tabla `ventas_2024`. Dos archivos distintos
pueden sanear al mismo nombre (`a-b.csv` y `a_b.csv`); el segundo se desambigua
con sufijo `_2`, `_3`, ... El mapeo real archivo->tabla esta en `tables[].name`
/ `tables[].source_file`, no lo asumas.
- **`read_json_auto` requiere JSON tabular** (array de objetos u objetos NDJSON
homogeneos). Un JSON anidado o irregular puede fallar la carga de ESA tabla; el
error se registra en `errors` y el resto de archivos siguen cargandose.
- **Extension desconocida = se salta**, no falla: queda anotada en `errors` con
`unsupported extension`. Mapeo de lectores: `.csv/.tsv/.txt`->`read_csv_auto`,
`.parquet/.pq`->`read_parquet`, `.json/.ndjson`->`read_json_auto`.
- **Escritura real en disco (impura)**. DuckDB es single-writer: si otro proceso
tiene `db_path` abierto en escritura, `connect` falla con error de lock devuelto
en el dict. Un `db_path` con un directorio padre inexistente tambien falla.
- **`db_path=None` crea un archivo temporal que NO se borra solo**: la ruta se
devuelve en `db_path` para que el llamador la consuma y la limpie cuando termine.
- **Tipos inferidos por los lectores `_auto`**: los tipos de columna los infiere
DuckDB. Revisa el schema con `duckdb_table_schema_py_infra` si el tipado importa
aguas abajo.
@@ -1,175 +0,0 @@
"""Carga una carpeta de archivos tabulares (CSV/Parquet/JSON) como tablas DuckDB.
Funcion impura: escanea el primer nivel de un directorio buscando archivos que
casen con uno o varios globs, y por cada archivo crea una tabla en una base
DuckDB usando los lectores nativos (`read_csv_auto`, `read_parquet`,
`read_json_auto`). Es la pieza de entrada del EDA a nivel de carpeta (grupo
`eda`): deja una DuckDB con una tabla por archivo, lista para perfilar y
correlacionar aguas abajo.
Devuelve siempre un dict sin lanzar excepciones, siguiendo el estilo del grupo
duckdb del registry: {status:'ok', db_path, tables, errors} en exito (incluida
la carpeta sin archivos tabulares, que es un exito con tables=[]) y
{status:'error', error:str} cuando la carpeta no existe o falla algo global.
El nombre de cada tabla se deriva del basename del archivo, saneado a
`[0-9a-zA-Z_]` en minusculas, prefijado con `t_` si empieza por digito, y
desambiguado con sufijos `_2`, `_3`, ... ante colisiones. El path del archivo se
escapa (comilla simple, `'`->`''`) antes de interpolarlo en el SQL del lector,
ya que los lectores DuckDB no admiten el path como parametro posicional. Un fallo
al cargar un archivo concreto NO aborta el resto: se registra en `errors` y se
continua con los siguientes.
"""
import glob
import os
import re
import tempfile
def _sanitize_table_name(basename_no_ext: str, index: int) -> str:
"""Deriva un identificador de tabla valido desde el basename de un archivo.
Reemplaza todo lo que no sea ``[0-9a-zA-Z_]`` por ``_`` y baja a minusculas.
Si tras el saneo queda vacio, usa ``tabla_<index>``. Si empieza por digito,
prefija ``t_`` para que sea un identificador SQL valido.
"""
name = re.sub(r"[^0-9a-zA-Z_]", "_", basename_no_ext).lower()
if not name:
name = f"tabla_{index}"
if name[0].isdigit():
name = "t_" + name
return name
def _reader_for_extension(ext: str, quoted_path: str):
"""Devuelve la expresion de lector DuckDB para una extension, o None.
El ``quoted_path`` ya viene escapado y entre comillas simples. Extensiones
desconocidas devuelven None para que el llamador salte el archivo.
"""
ext = ext.lower()
if ext in (".csv", ".tsv", ".txt"):
return f"read_csv_auto('{quoted_path}')"
if ext in (".parquet", ".pq"):
return f"read_parquet('{quoted_path}')"
if ext in (".json", ".ndjson"):
return f"read_json_auto('{quoted_path}')"
return None
def load_folder_to_duckdb(
folder: str,
db_path: str = None,
pattern: str = "*.csv,*.parquet,*.json",
) -> dict:
"""Carga los archivos tabulares de una carpeta como tablas en una DuckDB.
Args:
folder: ruta a un directorio. Si no existe o no es un directorio,
devuelve {status:'error', ...} sin lanzar.
db_path: ruta de la DuckDB destino (read-write, se crea si no existe). Si
es None, se genera una base temporal con NamedTemporaryFile y su ruta
se devuelve en el retorno (`db_path`).
pattern: CSV de globs separados por coma (default
"*.csv,*.parquet,*.json"). Cada glob se aplica con
glob.glob(os.path.join(folder, g)) en el primer nivel (NO recursivo);
los resultados se deduplican y ordenan.
Returns:
dict. En exito: {status:'ok', db_path:str, tables:[{name, source_file,
n_rows}], errors:[{name?, source_file, error}]}. La carpeta sin archivos
tabulares es un exito con tables=[] y errors=[]. En error (sin lanzar):
{status:'error', error:str}.
"""
if not isinstance(folder, str) or not os.path.isdir(folder):
return {
"status": "error",
"error": f"folder does not exist or is not a directory: {folder!r}",
}
conn = None
try:
# Resolver la ruta de la DuckDB destino. Si no se da, reservar un nombre
# temporal unico y borrar el archivo vacio que crea mkstemp: DuckDB 1.5.2
# rechaza abrir un archivo de 0 bytes ("not a valid DuckDB database
# file"), por lo que debe crear el archivo el mismo desde cero.
if db_path is None:
fd, tmp_name = tempfile.mkstemp(suffix=".duckdb")
os.close(fd)
os.remove(tmp_name)
db_path = tmp_name
# Resolver los archivos: un glob por cada patron, dedup + orden estable.
globs = [g.strip() for g in pattern.split(",") if g.strip()]
found = set()
for g in globs:
for path in glob.glob(os.path.join(folder, g)):
if os.path.isfile(path):
found.add(path)
files = sorted(found)
conn = __import__("duckdb").connect(db_path)
tables = []
errors = []
used_names = set()
for i, path in enumerate(files):
base = os.path.basename(path)
stem, ext = os.path.splitext(base)
quoted_path = path.replace("'", "''")
reader = _reader_for_extension(ext, quoted_path)
if reader is None:
errors.append(
{
"source_file": path,
"error": f"unsupported extension: {ext!r}",
}
)
continue
name = _sanitize_table_name(stem, i)
# Desambiguar colisiones con sufijos _2, _3, ...
if name in used_names:
suffix = 2
while f"{name}_{suffix}" in used_names:
suffix += 1
name = f"{name}_{suffix}"
quoted_ident = '"' + name.replace('"', '""') + '"'
try:
conn.execute(
f"CREATE TABLE {quoted_ident} AS SELECT * FROM {reader}"
)
n_rows = conn.execute(
f"SELECT count(*) FROM {quoted_ident}"
).fetchone()[0]
used_names.add(name)
tables.append(
{
"name": name,
"source_file": path,
"n_rows": int(n_rows),
}
)
except Exception as e: # noqa: BLE001
errors.append(
{
"name": name,
"source_file": path,
"error": str(e),
}
)
return {
"status": "ok",
"db_path": db_path,
"tables": tables,
"errors": errors,
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
finally:
if conn is not None:
conn.close()
@@ -1,73 +0,0 @@
"""Tests para load_folder_to_duckdb."""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
import duckdb # noqa: E402
from load_folder_to_duckdb import load_folder_to_duckdb # noqa: E402
def _write_csv(path: str, header: str, rows: list[str]) -> None:
with open(path, "w", encoding="utf-8") as f:
f.write(header + "\n")
for r in rows:
f.write(r + "\n")
def test_carga_dos_csv_como_tablas(tmp_path):
_write_csv(
str(tmp_path / "ventas.csv"),
"id,total",
["1,10.5", "2,20.0", "3,5.25"],
)
_write_csv(
str(tmp_path / "clientes.csv"),
"id,nombre",
["1,ana", "2,luis"],
)
db = tmp_path / "out.duckdb"
res = load_folder_to_duckdb(str(tmp_path), str(db))
assert res["status"] == "ok", res
assert res["errors"] == []
assert len(res["tables"]) == 2
assert res["db_path"] == str(db)
assert os.path.exists(str(db))
by_name = {t["name"]: t for t in res["tables"]}
assert by_name["ventas"]["n_rows"] == 3
assert by_name["clientes"]["n_rows"] == 2
# Verificar que las tablas existen realmente en la base.
con = duckdb.connect(str(db), read_only=True)
assert con.execute("SELECT count(*) FROM ventas").fetchone()[0] == 3
assert con.execute("SELECT count(*) FROM clientes").fetchone()[0] == 2
con.close()
def test_db_path_none_crea_temporal(tmp_path):
_write_csv(str(tmp_path / "datos.csv"), "x", ["1", "2"])
res = load_folder_to_duckdb(str(tmp_path))
assert res["status"] == "ok", res
assert res["db_path"]
assert os.path.exists(res["db_path"])
assert len(res["tables"]) == 1
assert res["tables"][0]["n_rows"] == 2
os.remove(res["db_path"])
def test_carpeta_vacia_es_ok_sin_tablas(tmp_path):
db = tmp_path / "out.duckdb"
res = load_folder_to_duckdb(str(tmp_path), str(db))
assert res["status"] == "ok", res
assert res["tables"] == []
assert res["errors"] == []
def test_carpeta_inexistente_devuelve_status_error(tmp_path):
res = load_folder_to_duckdb(str(tmp_path / "no_existe"))
assert res["status"] == "error"
assert "folder" in res["error"]
@@ -1,115 +0,0 @@
---
name: render_automatic_eda_folder
kind: pipeline
lang: py
domain: pipelines
purity: impure
version: "1.0.0"
signature: "def render_automatic_eda_folder(path: str, out_dir: str = \"reports\", basename: str = None, profile_level: str = \"standard\", emit_pdf: bool = True, emit_pptx: bool = True, emit_md: bool = True, per_table_eda: bool = False, min_inclusion: float = 0.9, ctx_extra: dict = None) -> dict"
description: "Informe AutomaticEDA a nivel de BASE one-shot de una CARPETA de archivos tabulares (CSV/Parquet/JSON) o de una DuckDB existente. Carga la carpeta a una DuckDB temporal con load_folder_to_duckdb (o usa la DuckDB dada directa), perfila TODA la base con profile_database (resumen de cada tabla + FK candidatas por containment + join graph con diagrama Mermaid), ENSAMBLA un documento-base por capitulos (portada-base con nombre/n tablas/totales/fecha/fuente, resumen de tablas con una fila por tabla, y relaciones inter-tabla con la tabla de FK candidatas + una Figure matplotlib REAL del join graph dibujada con draw_join_graph_figure mas el texto Mermaid) y lo renderiza con el motor AutomaticEDA a PDF (A5 movil), PPTX (16:9) y Markdown autocontenido a la vez. Con per_table_eda=True anexa los capitulos de mini-EDA de cada tabla (build_document por tabla). Es el hermano a nivel de base de render_automatic_eda (que perfila UNA tabla): aqui el informe es de la base y de sus relaciones. Devuelve las rutas de PDF/PPTX/MD, el manifiesto y el DatabaseProfile."
tags: [eda, duckdb, database, profiling, relations, pipeline, dataops, report, pdf, pptx, launcher]
uses_functions:
- load_folder_to_duckdb_py_infra
- profile_database_py_pipelines
- render_automatic_eda_pdf_py_datascience
- render_automatic_eda_pptx_py_datascience
- render_automatic_eda_markdown_py_datascience
- draw_join_graph_figure_py_datascience
uses_types: []
returns: []
returns_optional: false
error_type: error_go_core
imports: []
tested: true
tests:
- "golden: carpeta con 3 CSV relacionados (customers/orders/products) emite PDF+PPTX+MD del documento-base con 3 tablas y la FK orders.customer_id->customers.id"
- "edge: carpeta vacia -> status ok con documento minimo, sin lanzar"
- "edge: 1 sola tabla -> funciona sin relaciones (capitulo relaciones dice 'sin FK')"
test_file_path: "python/functions/pipelines/render_automatic_eda_folder_test.py"
file_path: "python/functions/pipelines/render_automatic_eda_folder.py"
params:
- name: path
desc: "DIRECTORIO con archivos tabulares (CSV/Parquet/JSON) que se cargan a una DuckDB temporal, o una DuckDB ya existente (.duckdb/.ddb/.db) que se perfila directa."
- name: out_dir
desc: "Directorio de salida de los informes (se crea si no existe). Default 'reports'."
- name: basename
desc: "Nombre base de los archivos sin extension. Default 'aeda_base_<nombre>_<timestamp>'."
- name: profile_level
desc: "Preset de coste del perfil por tabla ('lite'/'standard'/'full'); ajusta el sample que profile_database pasa a cada tabla (lite=2000, standard/full=5000)."
- name: emit_pdf
desc: "Emite el PDF A5 movil del documento-base. Default True."
- name: emit_pptx
desc: "Emite el PPTX 16:9 del documento-base. Default True."
- name: emit_md
desc: "Emite el Markdown autocontenido del documento-base. Default True."
- name: per_table_eda
desc: "Si True, anexa al documento-base los capitulos de mini-EDA de cada tabla (Heading 'Tabla: <n>' + build_document por tabla). Default False (solo documento-base: portada + resumen + relaciones)."
- name: min_inclusion
desc: "Umbral de inclusion (0-1) para emitir una FK candidata (se pasa a profile_database). Default 0.9."
- name: ctx_extra
desc: "Dict opcional de claves de presentacion (p.ej. dataset_name, description) que se mezclan en el contexto de la portada-base."
output: "Dict dict-no-throw. En exito: {status:'ok', pdf_path, pptx_path, md_path, manifest_path, n_tables, n_pages, n_slides, md_chars, db_path, db_profile}. En error: {status:'error', error:str}."
---
# render_automatic_eda_folder
EDA de una **carpeta / base multi-tabla** → informe AutomaticEDA por capítulos
en PDF (móvil A5) + PPTX (16:9) + Markdown, en una sola llamada. Es el hermano a
nivel de **base** de `render_automatic_eda` (que perfila una sola tabla): aquí el
documento resume **todas** las tablas y, sobre todo, sus **relaciones**
inter-tabla (FK candidatas por containment + join graph con diagrama Mermaid).
Compone, sin reimplementar su lógica: `load_folder_to_duckdb` (carga la carpeta),
`profile_database` (perfila la base + infiere FK + join graph) y los tres
renderers del motor AutomaticEDA (`render_automatic_eda_pdf`/`_pptx`/`_markdown`),
que aceptan directamente la lista de capítulos del documento-base que este
pipeline ensambla. El pipeline de tabla única (`render_automatic_eda`) queda
intacto: esto es aditivo.
## Ejemplo
```bash
# Carpeta con varios CSV/Parquet/JSON relacionados:
./fn run render_automatic_eda_folder /tmp/eda_folder_demo
# Una DuckDB ya existente (rama directa):
./fn run render_automatic_eda_folder temp/bigdata/taxi.duckdb
```
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from pipelines.render_automatic_eda_folder import render_automatic_eda_folder
r = render_automatic_eda_folder("/tmp/eda_folder_demo", out_dir="reports")
# r["status"] == "ok"; r["pdf_path"], r["pptx_path"], r["md_path"]
# r["n_tables"] == 3; r["db_profile"]["fk_candidates"] incluye
# orders.customer_id -> customers.id
```
## Cuando usarla
Cuando quieras un EDA de una **base entera** (una carpeta de exports o una
DuckDB con varias tablas), no de una sola tabla: para ver de un vistazo qué
tablas hay, su tamaño y calidad, y cómo se relacionan (FK candidatas + diagrama),
en el mismo formato rico por capítulos (PDF móvil + PPTX + MD) que el EDA de
tabla. Usa `per_table_eda=True` cuando además quieras el mini-EDA de cada tabla
anexado.
## Gotchas
- Impuro: lee archivos del disco y escribe PDF/PPTX/MD en `out_dir`. En la rama
"carpeta" crea una **DuckDB temporal** (su ruta sale en `db_path`); no se borra
automáticamente (queda para reinspección).
- `path` se interpreta así: directorio → se carga la carpeta; archivo con
extensión `.duckdb`/`.ddb`/`.db` → se usa directo; cualquier otro archivo o un
path inexistente → `{status:'error'}` (no lanza).
- El escaneo de la carpeta es **no recursivo** (solo el primer nivel) y por
defecto cubre `*.csv,*.parquet,*.json` (ver `load_folder_to_duckdb`).
- El join graph se rasteriza a una **Figure matplotlib real** (vía
`draw_join_graph_figure`) que aparece dibujada en PDF/PPTX (nodos = tablas,
flechas = FK). Además, el **texto Mermaid** del grafo se incluye como bloque de
código (en el Markdown queda como diagrama renderizable y es útil para pegar a
un LLM).
- Carpeta vacía o con 1 sola tabla: funciona igual; el capítulo de relaciones
dice "sin FK". dict-no-throw en todos los caminos.
@@ -1,366 +0,0 @@
"""render_automatic_eda_folder — EDA de una CARPETA / base multi-tabla one-shot.
Pipeline impuro del grupo de capacidad `eda`, a nivel de BASE. Dada una CARPETA
de archivos tabulares (CSV/Parquet/JSON) o una DuckDB ya existente, produce el
informe AutomaticEDA de la BASE en sus tres formatos a la vez (PDF móvil A5 +
PPTX 16:9 + Markdown autocontenido), con los capítulos POBLADOS, en una sola
llamada. Es el hermano a nivel de base de ``render_automatic_eda`` (que perfila
UNA tabla): aquí el documento por capítulos resume TODAS las tablas y, sobre
todo, sus RELACIONES inter-tabla (FK candidatas + join graph).
Compone funciones del registry SIN reimplementar su lógica:
- load_folder_to_duckdb : carga una carpeta de archivos a una DuckDB temporal
(rama "carpeta"). En la rama "ya es duckdb" se omite.
- profile_database : perfila TODA la base (resumen de cada tabla,
TableProfiles completos, FK candidatas por
containment y join graph con diagrama Mermaid).
- render_automatic_eda_pdf : renderiza el documento-base por capítulos a PDF.
- render_automatic_eda_pptx : renderiza el mismo documento-base a PPTX.
- render_automatic_eda_markdown : serializa el mismo documento-base a Markdown
autocontenido (texto + tablas markdown).
- build_document : (solo con per_table_eda=True) ensambla los capítulos
canónicos de CADA tabla para anexarlos al documento.
La capa propia de este pipeline es ENSAMBLAR EL DOCUMENTO-BASE de capítulos a
partir del ``DatabaseProfile`` que devuelve ``profile_database`` y cablear los
tres renderers del motor AutomaticEDA. El documento-base mínimo tiene tres
capítulos: portada-base (nombre/nº tablas/totales/fecha/fuente), resumen de
tablas (una fila por tabla) y relaciones inter-tabla (FK candidatas + diagrama
Mermaid). Con ``per_table_eda=True`` anexa, por cada tabla, sus capítulos de
mini-EDA.
Estilo dict-no-throw del grupo `eda`: nunca lanza; captura cualquier error y
degrada a ``{"status": "error", "error": str}``.
"""
import os
from datetime import datetime, timezone
from datascience import (
draw_join_graph_figure,
render_automatic_eda_markdown,
render_automatic_eda_pdf,
render_automatic_eda_pptx,
)
from datascience.automatic_eda import build_document
from infra import load_folder_to_duckdb
from pipelines.profile_database import profile_database
# Mapa profile_level -> tamaño de muestra por columna del perfil de cada tabla.
# A nivel de base el coste lo domina el nº de tablas; el preset solo ajusta el
# sample que profile_database pasa a profile_table.
_SAMPLE_BY_LEVEL = {"lite": 2000, "standard": 5000, "full": 5000}
# Extensiones que se consideran "una DuckDB ya hecha" en la rama directa.
_DUCKDB_EXTS = (".duckdb", ".ddb", ".db")
def _fmt_num(v) -> str:
"""Formatea un entero con separador de millar; '' si no es número."""
if isinstance(v, bool) or not isinstance(v, (int, float)):
return ""
try:
return f"{int(v):,}".replace(",", ".")
except Exception: # noqa: BLE001
return str(v)
def _portada_chapter(db_profile: dict, source_path: str, db_path: str,
meta_ctx: dict) -> dict:
"""Capítulo de portada a nivel de base (NO reusa chapters/portada.py, que es
de tabla única): nombre de la base, nº de tablas, totales y procedencia."""
tables = db_profile.get("tables", []) or []
total_rows = sum(
(t.get("n_rows") or 0) for t in tables if isinstance(t.get("n_rows"), (int, float))
)
total_cols = sum(
(t.get("n_cols") or 0) for t in tables if isinstance(t.get("n_cols"), (int, float))
)
base_name = (meta_ctx or {}).get("dataset_name") or os.path.basename(
os.path.normpath(source_path)
) or source_path
rows = [
("Base", base_name),
("Tablas", _fmt_num(db_profile.get("n_tables"))),
("Filas totales", _fmt_num(total_rows)),
("Columnas totales", _fmt_num(total_cols)),
("Relaciones FK", _fmt_num(len(db_profile.get("fk_candidates", []) or []))),
("Fuente", source_path),
("DuckDB", db_path),
("Generado", datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")),
]
blocks = [
{"kind": "heading", "text": f"EDA de la base — {base_name}", "level": 1},
{"kind": "kv_table", "rows": rows, "title": "Resumen de la base"},
]
errs = db_profile.get("errors", []) or []
if errs:
blocks.append({
"kind": "note",
"text": f"{len(errs)} aviso(s) durante el perfilado (ver detalle).",
})
return {"id": "portada_base", "title": "Portada", "version": "1.0.0",
"blocks": blocks}
def _resumen_chapter(db_profile: dict) -> dict:
"""Capítulo con una fila por tabla: filas, columnas, calidad, key_candidates."""
header = ["Tabla", "Filas", "Columnas", "Calidad", "key_candidates"]
rows = []
for t in db_profile.get("tables", []) or []:
keys = ", ".join(t.get("key_candidates") or []) or ""
rows.append([
t.get("table"),
_fmt_num(t.get("n_rows")),
_fmt_num(t.get("n_cols")),
t.get("quality_score"),
keys,
])
if rows:
blocks = [{
"kind": "data_table", "header": header, "rows": rows,
"title": "Tablas de la base",
"note": "Una fila por tabla. Calidad = score agregado del TableProfile.",
}]
else:
blocks = [{"kind": "note",
"text": "La base no contiene tablas perfilables."}]
return {"id": "resumen_tablas", "title": "Resumen de tablas",
"version": "1.0.0", "blocks": blocks}
def _relaciones_chapter(db_profile: dict) -> dict:
"""Capítulo de relaciones inter-tabla: tabla de FK candidatas + diagrama
Mermaid del join graph (vuelca el Mermaid como bloque de código)."""
fks = db_profile.get("fk_candidates", []) or []
blocks = [{
"kind": "heading", "text": "Relaciones inter-tabla", "level": 2,
}]
if fks:
header = ["From", "To", "Inclusión", "Cardinalidad"]
rows = []
for fk in fks:
frm = f"{fk.get('from_table')}.{fk.get('from_col')}"
to = f"{fk.get('to_table')}.{fk.get('to_col')}"
inc = fk.get("inclusion")
inc_s = f"{inc:.3f}" if isinstance(inc, (int, float)) else str(inc)
rows.append([frm, to, inc_s, fk.get("cardinality")])
blocks.append({
"kind": "data_table", "header": header, "rows": rows,
"title": "FK candidatas (por containment de valores)",
"note": "Inclusión = fracción de valores de From contenidos en To.",
})
else:
blocks.append({
"kind": "note",
"text": "Sin relaciones FK candidatas detectadas entre las tablas.",
})
join_graph = db_profile.get("join_graph") or {}
has_edges = bool(join_graph.get("edges"))
if has_edges:
blocks.append({"kind": "heading", "text": "Diagrama (join graph)",
"level": 3})
# Figure matplotlib REAL del grafo de relaciones (nodos = tablas,
# aristas = FK). Lazy via `make`: el renderer la construye solo al
# paginar, y se rasteriza en PDF/PPTX. draw_join_graph_figure nunca
# lanza (devuelve una Figure de error si algo falla).
blocks.append({
"kind": "figure",
"make": (lambda jg=join_graph: draw_join_graph_figure(
jg, title="Join graph (relaciones inter-tabla)")),
"caption": "Grafo de relaciones: nodos = tablas, flechas = FK "
"candidatas (etiqueta from_col→to_col).",
"height_in": 4.5,
})
# Además, el Mermaid en texto: en el Markdown queda como diagrama
# renderizable y es útil para pegar a un LLM.
mermaid = (join_graph.get("mermaid", "") or "").strip()
if mermaid:
blocks.append({"kind": "markdown",
"text": "```mermaid\n" + mermaid + "\n```"})
return {"id": "relaciones", "title": "Relaciones inter-tabla",
"version": "1.0.0", "blocks": blocks}
def _build_db_document(db_profile: dict, source_path: str, db_path: str,
meta_ctx: dict, per_table_eda: bool) -> list:
"""Ensambla el documento-base por capítulos a partir del DatabaseProfile.
Mínimo: portada-base + resumen de tablas + relaciones. Con per_table_eda
True anexa, por cada tabla, un capítulo separador + los capítulos canónicos
de su mini-EDA (reusando build_document sobre cada TableProfile)."""
chapters = [
_portada_chapter(db_profile, source_path, db_path, meta_ctx),
_resumen_chapter(db_profile),
_relaciones_chapter(db_profile),
]
if per_table_eda:
for prof in db_profile.get("table_profiles", []) or []:
tname = prof.get("table") or "tabla"
chapters.append({
"id": f"tabla_{tname}", "title": f"Tabla: {tname}",
"version": "1.0.0",
"blocks": [{"kind": "heading", "text": f"Tabla: {tname}",
"level": 1}],
})
try:
# build_document devuelve los capítulos canónicos de la tabla.
# ctx None -> los capítulos que necesitan datos crudos degradan,
# pero salen completos los de portada/overview/distrib/calidad.
chapters.extend(build_document(prof, None) or [])
except Exception: # noqa: BLE001 — una tabla mala no rompe el doc.
chapters.append({
"id": f"tabla_{tname}_err", "title": f"Tabla: {tname}",
"version": "1.0.0",
"blocks": [{"kind": "note",
"text": "No se pudo ensamblar el mini-EDA de "
"esta tabla."}],
})
return chapters
def _resolve_db_path(path: str) -> dict:
"""Resuelve el DuckDB a perfilar desde ``path``.
- Directorio -> carga la carpeta con load_folder_to_duckdb (DuckDB temp).
- Archivo .duckdb/.ddb/.db -> se usa directo (rama "ya es duckdb").
- Otro archivo / inexistente -> error.
Devuelve {status, db_path, loaded, n_tables, load_errors}.
"""
if os.path.isdir(path):
lr = load_folder_to_duckdb(path)
if lr.get("status") != "ok":
return {"status": "error",
"error": f"load_folder_to_duckdb falló: {lr.get('error')}"}
return {
"status": "ok",
"db_path": lr.get("db_path"),
"loaded": True,
"n_tables": len(lr.get("tables", []) or []),
"load_errors": lr.get("errors", []) or [],
}
if os.path.isfile(path):
if path.lower().endswith(_DUCKDB_EXTS):
return {"status": "ok", "db_path": path, "loaded": False,
"n_tables": None, "load_errors": []}
return {"status": "error",
"error": f"'{path}' no es un directorio ni una DuckDB "
f"(extensiones {_DUCKDB_EXTS})."}
return {"status": "error", "error": f"path no existe: {path}"}
def render_automatic_eda_folder(
path: str,
out_dir: str = "reports",
basename: str = None,
profile_level: str = "standard",
emit_pdf: bool = True,
emit_pptx: bool = True,
emit_md: bool = True,
per_table_eda: bool = False,
min_inclusion: float = 0.9,
ctx_extra: dict = None,
) -> dict:
"""Perfila una CARPETA (o una DuckDB) y emite el informe AutomaticEDA de la base.
Args:
path: o bien un DIRECTORIO con archivos tabulares (CSV/Parquet/JSON) que
se cargan a una DuckDB temporal, o bien una DuckDB ya existente
(``.duckdb``/``.ddb``/``.db``) que se perfila directa.
out_dir: directorio de salida (se crea si no existe). Default "reports".
basename: nombre base de los archivos sin extensión. Default
"aeda_base_<nombre>_<timestamp>".
profile_level: preset de coste del perfil por tabla ("lite"/"standard"/
"full"); ajusta el ``sample`` que profile_database pasa a cada tabla.
emit_pdf / emit_pptx / emit_md: qué formatos emitir. Default los tres.
per_table_eda: si True, anexa al documento-base los capítulos de mini-EDA
de cada tabla (un Heading "Tabla: <n>" + build_document por tabla).
Default False (solo el documento-base: portada + resumen + relaciones).
min_inclusion: umbral de inclusión para emitir una FK candidata (0-1).
ctx_extra: dict opcional de claves de presentación (p.ej. dataset_name,
description) que se mezclan en el contexto de la portada.
Returns:
dict (nunca lanza). En éxito::
{"status": "ok", "pdf_path": str|None, "pptx_path": str|None,
"md_path": str|None, "manifest_path": str|None,
"n_tables": int, "n_pages": int|None, "n_slides": int|None,
"md_chars": int|None, "db_path": str, "db_profile": <DatabaseProfile>}
En error: {"status": "error", "error": str}.
"""
try:
# 1) Resolver la DuckDB a perfilar (cargar carpeta o usar la dada).
rdb = _resolve_db_path(path)
if rdb.get("status") != "ok":
return {"status": "error", "error": rdb.get("error")}
db_path = rdb.get("db_path")
# 2) Perfilar la base entera (resumen + FK + join graph). Sin report
# propio (write_report/emit_pdf False): este pipeline emite el suyo.
sample = _SAMPLE_BY_LEVEL.get(profile_level, 5000)
pres = profile_database(
db_path, sample=sample, write_report=False,
min_inclusion=min_inclusion, emit_pdf=False,
)
if pres.get("status") != "ok":
return {"status": "error",
"error": f"profile_database falló: {pres.get('error')}"}
db_profile = pres.get("db_profile") or {}
# 3) Ensamblar el documento-base por capítulos.
meta_ctx = dict(ctx_extra or {})
chapters = _build_db_document(
db_profile, path, db_path, meta_ctx, per_table_eda
)
# 4) Render a los tres formatos desde el MISMO documento por capítulos.
os.makedirs(out_dir, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
nm = (meta_ctx.get("dataset_name")
or os.path.basename(os.path.normpath(path)) or "base")
nm = "".join(c if c.isalnum() else "_" for c in str(nm)).strip("_") or "base"
base = basename or f"aeda_base_{nm}_{ts}"
title = f"EDA base — {meta_ctx.get('dataset_name') or nm}"
meta = {"title": title}
pdf_path = pptx_path = md_path = manifest_path = None
n_pages = n_slides = md_chars = None
if emit_pdf:
target = os.path.join(out_dir, base + ".pdf")
rpdf = render_automatic_eda_pdf(chapters, target, meta) or {}
pdf_path = rpdf.get("path")
n_pages = rpdf.get("n_pages")
manifest_path = rpdf.get("manifest_path")
if emit_pptx:
target = os.path.join(out_dir, base + ".pptx")
rpptx = render_automatic_eda_pptx(chapters, target, meta) or {}
pptx_path = rpptx.get("path")
n_slides = rpptx.get("n_slides")
if emit_md:
target = os.path.join(out_dir, base + ".md")
rmd = render_automatic_eda_markdown(chapters, target, meta) or {}
md_path = rmd.get("path")
md_chars = rmd.get("n_chars")
return {
"status": "ok",
"pdf_path": pdf_path,
"pptx_path": pptx_path,
"md_path": md_path,
"manifest_path": manifest_path,
"n_tables": db_profile.get("n_tables"),
"n_pages": n_pages,
"n_slides": n_slides,
"md_chars": md_chars,
"db_path": db_path,
"db_profile": db_profile,
}
except Exception as e: # noqa: BLE001 — dict-no-throw: degradar, nunca lanzar.
return {"status": "error", "error": str(e)}
@@ -1,188 +0,0 @@
"""Tests para render_automatic_eda_folder — EDA de una carpeta / base multi-tabla.
Golden: una carpeta con 3 CSV relacionados (customers/orders/products) produce el
documento-base en PDF + PPTX + MD, con las 3 tablas en el resumen y la FK
orders.customer_id -> customers.id en el capítulo de relaciones. Edges: carpeta
vacía (documento mínimo, sin lanzar), 1 sola tabla (sin relaciones) y la rama
"ya es una DuckDB" sobre un archivo .duckdb existente.
"""
import os
import sys
import duckdb
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from pipelines.render_automatic_eda_folder import (
_relaciones_chapter,
render_automatic_eda_folder,
)
def _write_demo_folder(folder: str) -> None:
"""3 CSV relacionados: orders.customer_id -> customers.id (FK detectable)."""
with open(os.path.join(folder, "customers.csv"), "w", encoding="utf-8") as fh:
fh.write("id,name,city\n")
fh.write("1,Alice,Madrid\n2,Bob,Barcelona\n3,Carol,Valencia\n"
"4,Dave,Sevilla\n5,Eve,Madrid\n")
with open(os.path.join(folder, "orders.csv"), "w", encoding="utf-8") as fh:
fh.write("order_id,customer_id,product_id,total\n")
fh.write("100,1,10,49.90\n101,1,11,12.50\n102,2,10,49.90\n"
"103,3,12,8.00\n104,3,11,12.50\n105,5,10,49.90\n"
"106,2,12,8.00\n")
with open(os.path.join(folder, "products.csv"), "w", encoding="utf-8") as fh:
fh.write("product_id,product_name,price\n")
fh.write("10,Widget,49.90\n11,Gadget,12.50\n12,Gizmo,8.00\n")
def _has_fk(db_profile: dict, from_t: str, from_c: str, to_t: str) -> bool:
for fk in db_profile.get("fk_candidates", []) or []:
if (fk.get("from_table") == from_t and fk.get("from_col") == from_c
and fk.get("to_table") == to_t):
return True
return False
def test_golden_folder_three_csv(tmp_path):
"""Carpeta con 3 CSV relacionados -> PDF+PPTX+MD, 3 tablas, FK detectada."""
folder = tmp_path / "demo"
folder.mkdir()
_write_demo_folder(str(folder))
out = tmp_path / "out"
r = render_automatic_eda_folder(str(folder), out_dir=str(out))
assert r["status"] == "ok", r
assert r["n_tables"] == 3
# Los tres formatos se emitieron y existen en disco.
assert r["pdf_path"] and os.path.exists(r["pdf_path"])
assert r["pptx_path"] and os.path.exists(r["pptx_path"])
assert r["md_path"] and os.path.exists(r["md_path"])
assert (r["n_pages"] or 0) >= 1
assert (r["n_slides"] or 0) >= 1
# La FK orders.customer_id -> customers.id se detecta por containment.
assert _has_fk(r["db_profile"], "orders", "customer_id", "customers"), \
r["db_profile"].get("fk_candidates")
# El Markdown menciona las 3 tablas y la relación.
md = open(r["md_path"], encoding="utf-8").read()
for t in ("customers", "orders", "products"):
assert t in md
assert "customer_id" in md
def test_edge_empty_folder(tmp_path):
"""Carpeta vacía -> status ok con documento mínimo, sin lanzar."""
folder = tmp_path / "empty"
folder.mkdir()
out = tmp_path / "out"
r = render_automatic_eda_folder(str(folder), out_dir=str(out))
assert r["status"] == "ok", r
assert r["n_tables"] == 0
# Aun sin tablas, emite el documento-base mínimo (portada + resumen vacío +
# relaciones "sin FK").
assert r["pdf_path"] and os.path.exists(r["pdf_path"])
assert r["md_path"] and os.path.exists(r["md_path"])
def test_edge_single_table_no_relations(tmp_path):
"""Carpeta con 1 sola tabla -> funciona sin relaciones (capítulo 'sin FK')."""
folder = tmp_path / "single"
folder.mkdir()
with open(folder / "lonely.csv", "w", encoding="utf-8") as fh:
fh.write("a,b\n1,x\n2,y\n3,z\n")
out = tmp_path / "out"
r = render_automatic_eda_folder(str(folder), out_dir=str(out))
assert r["status"] == "ok", r
assert r["n_tables"] == 1
assert not (r["db_profile"].get("fk_candidates") or [])
md = open(r["md_path"], encoding="utf-8").read()
assert "Sin relaciones FK" in md or "sin FK" in md.lower()
def test_accepts_existing_duckdb(tmp_path):
"""Rama 'ya es una DuckDB': un archivo .duckdb existente se perfila directo."""
db = tmp_path / "base.duckdb"
conn = duckdb.connect(str(db))
try:
conn.execute("CREATE TABLE customers (id INTEGER, name VARCHAR)")
conn.execute("INSERT INTO customers VALUES (1,'Ana'),(2,'Luis'),(3,'Eva')")
conn.execute("CREATE TABLE orders (oid INTEGER, customer_id INTEGER)")
conn.execute("INSERT INTO orders VALUES (10,1),(11,2),(12,1),(13,3)")
finally:
conn.close()
out = tmp_path / "out"
r = render_automatic_eda_folder(str(db), out_dir=str(out))
assert r["status"] == "ok", r
assert r["n_tables"] == 2
assert r["db_path"] == str(db)
assert r["pdf_path"] and os.path.exists(r["pdf_path"])
def test_emit_flags_select_formats(tmp_path):
"""emit_pdf/pptx/md controlan qué formatos se emiten."""
folder = tmp_path / "demo"
folder.mkdir()
_write_demo_folder(str(folder))
out = tmp_path / "out"
r = render_automatic_eda_folder(
str(folder), out_dir=str(out),
emit_pdf=True, emit_pptx=False, emit_md=False,
)
assert r["status"] == "ok", r
assert r["pdf_path"] and os.path.exists(r["pdf_path"])
assert r["pptx_path"] is None
assert r["md_path"] is None
def test_path_does_not_exist(tmp_path):
"""Path inexistente -> status error, sin lanzar."""
r = render_automatic_eda_folder(str(tmp_path / "nope"))
assert r["status"] == "error"
assert "no existe" in r["error"].lower()
def test_relaciones_chapter_has_real_figure_when_edges():
"""Con edges, el capítulo de relaciones incluye un bloque Figure matplotlib
REAL (no solo el texto Mermaid): su make() devuelve una Figure."""
db_profile = {
"join_graph": {
"nodes": [
{"table": "orders", "out_degree": 1, "in_degree": 0, "role": "fact"},
{"table": "customers", "out_degree": 0, "in_degree": 1, "role": "dim"},
],
"edges": [{"from_table": "orders", "from_col": "customer_id",
"to_table": "customers", "to_col": "id",
"cardinality": "N:1"}],
"mermaid": "graph LR orders --> customers",
"hubs": ["orders"],
},
"fk_candidates": [{"from_table": "orders", "from_col": "customer_id",
"to_table": "customers", "to_col": "id",
"inclusion": 1.0, "cardinality": "N:1"}],
}
ch = _relaciones_chapter(db_profile)
figs = [b for b in ch["blocks"] if b.get("kind") == "figure"]
assert len(figs) == 1, ch["blocks"]
# El make() perezoso produce una matplotlib Figure real.
import matplotlib
matplotlib.use("Agg")
fig = figs[0]["make"]()
from matplotlib.figure import Figure
assert isinstance(fig, Figure)
assert fig.get_axes(), "la Figure del join graph debe tener al menos un eje"
def test_relaciones_chapter_no_figure_when_no_edges():
"""Sin edges, no se añade bloque Figure (capítulo dice 'sin FK')."""
db_profile = {"join_graph": {"nodes": [], "edges": [], "mermaid": "",
"hubs": []}, "fk_candidates": []}
ch = _relaciones_chapter(db_profile)
assert not [b for b in ch["blocks"] if b.get("kind") == "figure"]