Compare commits

..

1 Commits

Author SHA1 Message Date
egutierrez 4f1530797e feat(datascience): rigor experimental para papers — effect size, IC, Holm + preregistro inmutable
Subsistema de papers reproducibles (grupo de capacidad `papers`). Añade las
funciones estadísticas que un paper honesto necesita y la función que congela la
hipótesis antes de mirar los datos (anti-HARKing).

Nuevas funciones (puras salvo la última):
- effect_size_cohens_d: Cohen's d + Hedges' g (corrección de sesgo para N
  pequeño) + interpretación cualitativa (negligible/small/medium/large por los
  umbrales de Cohen). Dict-no-throw ante varianza cero / N insuficiente.
- confidence_interval_mean: intervalo de confianza de una media (t de Student) o
  de la diferencia de medias con Welch (df de Welch–Satterthwaite, sin asumir
  varianzas iguales). Dict-no-throw; el IC colapsa al punto cuando la varianza es
  cero.
- preregister_hypothesis (impura): congela hipótesis + plan de análisis en
  papers/<slug>/preregistration.md con frozen_at (UTC) y content_hash (sha256 del
  cuerpo normalizado, no del frontmatter). Inmutabilidad: una vez frozen, un
  contenido distinto se RECHAZA sin sobrescribir (mata el HARKing); idempotente si
  el contenido es idéntico. Siempre dict-no-throw.

Extensión:
- fdr_correction 1.0.0 -> 1.1.0: añade method="holm" (Holm-Bonferroni step-down,
  controla FWER, más potente que Bonferroni simple). Reúsa la maquinaria de
  alineación 1:1 con None/inválidos; no rompe los métodos bh/bonferroni.

Reutiliza del registry: fdr_correction (BH + Bonferroni ya existían) como base
para Holm. pearson y spearman_corr ya cubrían correlación.

Tests: 36 pytest verdes (cohen/hedges 8, confidence/welch 8, fdr/holm/bonferroni
12, preregister 4 + extras), golden contra valores conocidos y validados con
scipy. Golden manual del preregistro: congela, idempotente, rechaza edición
(bytes en disco idénticos al congelado).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 20:42:12 +02:00
21 changed files with 1266 additions and 1246 deletions
+6
View File
@@ -59,6 +59,9 @@ from .acf_pacf import acf_pacf
from .stl_decompose import stl_decompose
from .to_returns import to_returns
from .fdr_correction import fdr_correction
from .effect_size_cohens_d import effect_size_cohens_d
from .confidence_interval_mean import confidence_interval_mean
from .preregister_hypothesis import preregister_hypothesis
from .suggest_reexpression import suggest_reexpression
from .exploratory_caveats import exploratory_caveats
from .render_eda_pdf import render_eda_pdf, render_eda_pdf_relational
@@ -90,6 +93,9 @@ __all__ = [
"stl_decompose",
"to_returns",
"fdr_correction",
"effect_size_cohens_d",
"confidence_interval_mean",
"preregister_hypothesis",
"suggest_reexpression",
"exploratory_caveats",
"render_eda_pdf",
@@ -31,7 +31,7 @@ import math
from .. import model
CHAPTER_VERSION = "1.1.0"
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "correlacion"
CHAPTER_TITLE = "Correlación"
@@ -47,13 +47,6 @@ _MAX_MATRIX_LABELS = 16
# How many pairs to show in each of the top-positive / top-negative tables.
_TOP_N = 10
# How many of the strongest numeric-numeric pairs to draw as scatter plots on
# each sign (positive / negative). A scatter per pair carries a fitted line/curve
# and a relationship-type label; keeping the count small keeps the chapter
# readable on a phone / a slide. Only signed (Pearson/Spearman) pairs qualify —
# Cramér's V / correlation ratio pairs are not numeric-numeric, so no scatter.
_SCATTER_TOP_N = 3
# Glossary terms this chapter explains. Each is registered in the shared
# collector (ctx['glossary']) and marked clickable on its first appearance in the
# body — the canonical two-step pattern (see ``cat_distr`` for the reference
@@ -321,139 +314,6 @@ def _fdr_text(corr: dict, mark_term: bool = False) -> str | None:
return " ".join(parts)
def _is_seq(values) -> bool:
"""True for a non-empty list/tuple of values (a raw numeric column)."""
return isinstance(values, (list, tuple)) and len(values) > 0
def _select_scatter_pairs(pairs: list, top_n: int = _SCATTER_TOP_N):
"""Pick the strongest numeric-numeric pairs to draw as scatters.
Only signed (Pearson/Spearman) pairs are numeric-numeric and thus eligible
for a scatter with a fitted curve. Returns up to ``top_n`` of the strongest
positive pairs followed by up to ``top_n`` of the strongest negative ones,
each ranked by magnitude. Mixed-type metrics (Cramér's V, correlation ratio,
mutual information) are excluded — they have no x/y scatter interpretation.
"""
positive = []
negative = []
for pair in pairs:
if not isinstance(pair, dict) or not _is_signed(pair):
continue
value = pair.get("value")
if not _is_num(value):
continue
if value > 0:
positive.append(pair)
elif value < 0:
negative.append(pair)
positive.sort(key=lambda p: abs(float(p.get("value", 0.0))), reverse=True)
negative.sort(key=lambda p: abs(float(p.get("value", 0.0))), reverse=True)
return positive[:top_n] + negative[:top_n]
def _classification_note(a: str, b: str, cls: dict) -> str:
"""Human-readable sentence describing the relationship of a pair.
Plain text (not baked into the figure image) so the type label is selectable
in the PDF / extractable by pdftotext, and sits right next to its scatter
inside the keep-together Group.
"""
tipo = model._safe_str(cls.get("tipo")) or "sin forma clara"
bits = []
pearson = cls.get("pearson")
spearman = cls.get("spearman")
r2_lin = cls.get("r2_linear")
r2_poly = None
for key in ("r2_poly2", "r2_poly3"):
v = cls.get(key)
if _is_num(v) and (r2_poly is None or float(v) > r2_poly):
r2_poly = float(v)
if _is_num(pearson):
bits.append(f"Pearson r={float(pearson):+.2f}")
if _is_num(spearman):
bits.append(f"Spearman ρ={float(spearman):+.2f}")
if _is_num(r2_lin):
bits.append(f"R² lineal={float(r2_lin):.2f}")
if r2_poly is not None:
bits.append(f"R² polinómico={r2_poly:.2f}")
metrics = "; ".join(bits)
text = (f"Relación **{tipo}** entre «{a}» y «{b}»."
+ (f" {metrics}." if metrics else ""))
return text
def _scatter_blocks(pairs: list, raw_numeric):
"""Build keep-together scatter Groups for the strongest num-num pairs.
Returns a list of blocks (a Heading plus one Group per pair), or an empty
list when there is no raw numeric data (e.g. the lite profile drops
``ctx['raw_numeric']`` to skip live recomputation) or the relationship
helpers are unavailable. Never raises: any failure degrades to no scatters,
leaving the matrix + tables intact.
"""
if not isinstance(raw_numeric, dict) or not raw_numeric:
return []
selected = _select_scatter_pairs(pairs)
if not selected:
return []
# The relationship helpers live in the datascience package. Import lazily so
# the chapter still builds (matrix + tables) when they are absent.
try:
from datascience.classify_relationship_type import (
classify_relationship_type,
)
from datascience.relationship_scatter_figure import (
relationship_scatter_figure,
)
except Exception: # noqa: BLE001 — degrade, never break the chapter.
return []
groups = []
for pair in selected:
a = pair.get("a")
b = pair.get("b")
xs = raw_numeric.get(a)
ys = raw_numeric.get(b)
# Edge: a selected pair has no raw column (aggregated profile, renamed
# column, …) — skip just that pair, keep the rest.
if not _is_seq(xs) or not _is_seq(ys):
continue
try:
cls = classify_relationship_type(list(xs), list(ys)) or {}
except Exception: # noqa: BLE001
continue
a_lbl = model._safe_str(a)
b_lbl = model._safe_str(b)
def _make(xs=xs, ys=ys, a_lbl=a_lbl, b_lbl=b_lbl, cls=cls):
return relationship_scatter_figure(
list(xs), list(ys), x_label=a_lbl, y_label=b_lbl,
classification=cls)
groups.append(model.Group(blocks=[
model.Heading(text=f"{a_lbl}{b_lbl}", level=2),
model.Figure(
make=_make,
caption=(f"Dispersión de «{a_lbl}» frente a «{b_lbl}» con la "
"curva de ajuste del mejor modelo.")),
model.Markdown(text=_classification_note(a_lbl, b_lbl, cls)),
]))
if not groups:
return []
intro = model.Markdown(text=(
"Para los pares numéricos más fuertes (positivos y negativos) se dibuja "
"la nube de puntos con su ajuste y se clasifica el **tipo de relación**: "
"**lineal** (una recta basta), **polinómica** (curva de grado 2/3 que "
"mejora claramente el ajuste lineal), **monótona no-lineal** (crece o "
"decrece siempre pero no en línea recta; Spearman ≫ Pearson) o "
"**débil/sin forma**."))
return [model.Heading(text="Relaciones más fuertes (scatter)", level=2),
intro] + groups
def build_correlacion(profile: dict, ctx: dict):
"""Build the Correlation Chapter, or None if there are no pairs to show.
@@ -532,18 +392,6 @@ def build_correlacion(profile: dict, ctx: dict):
"No se han hallado correlaciones negativas significativas entre "
"columnas numéricas.")))
# 2.5) Scatter plots of the strongest numeric-numeric pairs, each with its
# fitted curve and a relationship-type label (lineal / polinómica / monótona
# / débil). Needs the raw numeric sample (ctx['raw_numeric'], row-aligned);
# when it is absent (aggregated/lite profile) the scatters are simply omitted
# and the matrix + tables above stand on their own.
raw_numeric = None
if isinstance(ctx, dict):
raw_numeric = ctx.get("raw_numeric") or profile.get("raw_numeric")
else:
raw_numeric = profile.get("raw_numeric")
blocks.extend(_scatter_blocks(pairs, raw_numeric))
# 3) Spuriousness caveat for level-based correlations (GrangerNewbold).
caveat = corr.get("levels_caveat")
if isinstance(caveat, str) and caveat.strip():
@@ -175,105 +175,6 @@ def test_anticorte_matriz_ancha_y_etiquetas_largas_no_se_cortan():
assert "azufre" in _pdf_text(pdf)
def _raw_numeric_for_profile(n: int = 80) -> dict:
"""Row-aligned raw numeric sample matching the signed pairs of _profile().
Builds columns with a clear, deterministic shape so the relationship-type
classifier has something unambiguous to label:
- density vs alcohol: strong negative linear (the top-negative pair).
- alcohol vs quality: positive linear.
- ph, fixed_acidity, sulphates: filler columns for the remaining pairs.
"""
import math as _m
alcohol = [8.0 + 0.05 * i for i in range(n)]
density = [1.0 - 0.002 * a for a in alcohol] # neg linear vs alcohol
quality = [3.0 + 0.4 * a + (0.1 if i % 2 else -0.1) # pos linear vs alcohol
for i, a in enumerate(alcohol)]
ph = [3.0 + 0.3 * _m.sin(i / 5.0) for i in range(n)]
fixed_acidity = [7.0 - 0.5 * p for p in ph] # neg linear vs ph
sulphates = [0.5 + 0.01 * (i % 7) for i in range(n)]
return {
"alcohol": alcohol, "density": density, "quality": quality,
"ph": ph, "fixed_acidity": fixed_acidity, "sulphates": sulphates,
}
def test_golden_scatters_de_pares_num_num_con_tipo_de_relacion():
"""Con ctx['raw_numeric'], el capítulo añade scatters (Figure dentro de Group)
de los pares num-num más fuertes, cada uno con su etiqueta de tipo en texto."""
from datascience.automatic_eda.model import Group
ctx = {"raw_numeric": _raw_numeric_for_profile()}
ch = build_correlacion(_profile(), ctx)
assert ch is not None
groups = [b for b in ch.blocks if isinstance(b, Group)]
assert groups, "debe emitir al menos un Group con scatter"
# Cada Group lleva su figura (lazy) y una nota de texto con el tipo.
for g in groups:
gkinds = [b.kind for b in g.blocks]
assert "figure" in gkinds and "markdown" in gkinds
# La sección y la etiqueta de tipo aparecen como texto plano (extraíble).
headings = " ".join(b.text for b in ch.blocks if b.kind == "heading")
assert "Relaciones más fuertes" in headings
body = " ".join(b.text for g in groups for b in g.blocks
if b.kind == "markdown")
assert any(t in body for t in
("lineal", "polinómica", "monótona", "sin forma"))
# El par num-num más fuerte (density ↔ alcohol) tiene scatter; el par cat-cat
# (region ↔ type) NO — no es numérico.
assert "density" in body or "alcohol" in body
assert "region" not in body and "type" not in body
def test_golden_pdf_muestra_scatters_con_etiqueta_de_tipo():
"""En el PDF, el capítulo Correlación incluye los scatters y su etiqueta de
tipo en texto seleccionable (pdftotext la encuentra)."""
prof = _profile()
ctx = {"raw_numeric": _raw_numeric_for_profile()}
with tempfile.TemporaryDirectory() as d:
pdf = os.path.join(d, "corr_scatter.pdf")
rp = render_automatic_eda_pdf(prof, pdf, {"title": "EDA — wine",
"ctx": ctx})
assert rp["path"] == pdf and rp["n_pages"] >= 1
txt = _pdf_text(pdf)
assert "Relaciones" in txt and "scatter" in txt.lower()
# Alguna etiqueta de tipo de relación, en texto.
assert any(t in txt for t in
("lineal", "polin", "monóton", "monoton", "sin forma"))
def test_edge_sin_raw_numeric_omite_scatters_sin_lanzar():
"""profile lite / ctx None: sin raw_numeric el capítulo omite los scatters
pero sigue emitiendo matriz + tablas (no lanza)."""
from datascience.automatic_eda.model import Group
for ctx in (None, {}, {"raw_numeric": None}, {"raw_numeric": {}}):
ch = build_correlacion(_profile(), ctx)
assert ch is not None
assert not [b for b in ch.blocks if isinstance(b, Group)]
# La matriz y al menos una tabla top siguen presentes.
assert any(b.kind == "figure" for b in ch.blocks)
assert any(b.kind == "data_table" for b in ch.blocks)
def test_edge_par_sin_columna_cruda_se_omite_sin_lanzar():
"""Si un par seleccionado no tiene su columna en raw_numeric, se omite ese
par (no lanza); los demás scatters se construyen igual."""
from datascience.automatic_eda.model import Group
raw = _raw_numeric_for_profile()
raw.pop("density", None) # rompe el par density ↔ alcohol
ch = build_correlacion(_profile(), {"raw_numeric": raw})
assert ch is not None
groups = [b for b in ch.blocks if isinstance(b, Group)]
body = " ".join(b.text for g in groups for b in g.blocks
if b.kind == "markdown")
# density desaparece de los scatters; otros pares (p.ej. ph↔fixed_acidity,
# alcohol↔quality) pueden seguir presentes sin error.
assert "density" not in body
def test_glosario_engancha_metodos_y_fdr():
"""Mejora 4b: los métodos de correlación (Pearson, Spearman, Cramér's V,
razón de correlación) y la corrección por comparaciones múltiples (FDR) se
@@ -1,68 +0,0 @@
---
name: classify_relationship_type
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def classify_relationship_type(xs: list, ys: list) -> dict"
description: "Clasifica el TIPO de relacion entre dos variables numericas pareadas por indice para el EDA automatico del grupo eda. Limpia los pares de forma defensiva (descarta None/bool/NaN/inf), reusa pearson y spearman_corr del registry y ajusta polinomios de grado 2 y 3 con numpy.polyfit (R^2 manual), y a partir de esas senales etiqueta la forma: 'lineal', 'polinomica (grado 2/3)', 'monotona no-lineal' o 'debil/sin forma'. Orden de decision: debil -> monotona -> polinomica -> lineal (la primera que matchea gana), con umbrales calibrados para datos reales discretos/ruidosos. Devuelve ademas los coeficientes del mejor modelo en orden de numpy.polyval para pintar la curva de ajuste sobre el scatter. Funcion pura no-throw: ante datos insuficientes (menos de 5 pares validos o varianza ~0) o cualquier fallo devuelve el dict canonico con tipo='debil/sin forma' y el resto a None."
tags: [eda, correlation, relationship, classification, polyfit, datascience, pure]
params:
- name: xs
desc: "Lista (o tupla) de valores numericos de la primera variable, pareada por indice con ys. Cada par xs[i],ys[i] se descarta si cualquiera de los dos es None, bool, NaN o inf. Lectura defensiva."
- name: ys
desc: "Lista (o tupla) de valores numericos de la segunda variable, pareada por indice con xs. Mismas reglas de limpieza que xs."
output: "Dict con SIEMPRE las mismas 8 claves: tipo (str: 'lineal' | 'polinómica (grado 2)' | 'polinómica (grado 3)' | 'monótona no-lineal' | 'débil/sin forma'); pearson (float|None: coeficiente de Pearson r); r2_linear (float|None: r**2 del ajuste lineal); spearman (float|None: rho de Spearman); r2_poly2 (float|None: R^2 del ajuste polinomico de grado 2); r2_poly3 (float|None: R^2 del ajuste de grado 3); best_degree (int|None: grado del modelo elegido — 1 lineal, 2/3 polinomico, None si monotona/debil); coeffs (list|None: coeficientes del mejor modelo en orden de numpy.polyval para pintar la curva, o None). Ante datos insuficientes o error: tipo='débil/sin forma' y el resto de claves a None."
uses_functions: [pearson_py_datascience, spearman_corr_py_datascience]
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [numpy]
tested: true
tests: ["test_lineal", "test_polinomica_cuadratica", "test_monotona_no_lineal", "test_monotona_exponencial", "test_debil_sin_forma", "test_lista_vacia_no_lanza", "test_longitudes_distintas_no_lanza", "test_todos_none_no_lanza", "test_entradas_none_no_lanza", "test_constante_no_lanza", "test_filtra_nan_inf_bool"]
test_file_path: "python/functions/datascience/classify_relationship_type_test.py"
file_path: "python/functions/datascience/classify_relationship_type.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.classify_relationship_type import classify_relationship_type
import numpy as np
# Relacion claramente cuadratica (forma de parabola) sobre dominio simetrico.
x = list(np.linspace(-10, 10, 60))
y = [v * v for v in x]
res = classify_relationship_type(x, y)
print(res["tipo"]) # 'polinómica (grado 2)'
print(res["best_degree"]) # 2
print(res["r2_linear"]) # 0.0 -> el Pearson lineal no ve la parabola
print(res["r2_poly2"]) # 1.0
print(res["coeffs"]) # [1.0, -0.0, -0.0] -> numpy.polyval(coeffs, x) ~ x**2
# El capitulo pinta la curva de ajuste cuando coeffs no es None:
# if res["coeffs"] is not None:
# xs_fit = np.linspace(min(x), max(x), 200)
# ys_fit = np.polyval(res["coeffs"], xs_fit)
# ax.plot(xs_fit, ys_fit) # curva sobre el ax.scatter(x, y)
```
## Cuando usarla
- Usala en el capitulo de relaciones/correlaciones del EDA automatico, despues de detectar dos columnas numericas con alguna asociacion, para decidir QUE curva de ajuste pintar sobre el scatter (recta, parabola, cubica o ninguna) y poner una etiqueta legible al tipo de relacion.
- Cuando un Pearson bajo no signifique "sin relacion": esta funcion cruza Pearson con Spearman y con ajustes polinomicos para distinguir una relacion lineal debil de una monotona no-lineal (que el rango si capta) o de una curva polinomica.
- Cuando necesites un punto de entrada determinista y no-throw que, con los mismos datos, devuelva siempre el mismo `tipo` y los mismos `coeffs` listos para `numpy.polyval` sin tener que ajustar modelos a mano en el capitulo.
## Gotchas
- Funcion pura, deterministica y no-throw: ante menos de 5 pares validos, varianza ~0 (xs o ys constante) o cualquier excepcion interna devuelve el dict canonico `tipo="débil/sin forma"` con el resto de claves a `None`. El dict SIEMPRE trae las 8 claves: nunca compruebes existencia, comprueba `None`.
- El orden de decision importa: `débil -> monótona -> polinómica -> lineal` (la primera que matchee gana). La monotonia se evalua ANTES que el ajuste polinomico, asi que una curva monotona suave (exp, log, potencias) sale `monótona no-lineal` aunque un cubico tambien la ajuste — la dominancia del rango (Spearman >> Pearson) es la senal mas interpretable. Solo cae en `polinómica` una forma curva NO monotona (p.ej. una parabola, Spearman ~0 pero R^2 polinomico alto).
- Umbrales fijos (calibrados para EDA con datos discretos/ruidosos, no para inferencia formal): `débil/sin forma` si las tres senales son bajas a la vez (`abs(pearson) < 0.3` y `abs(spearman) < 0.3` y `mejor_poly < 0.3`); `monótona no-lineal` si `abs(spearman) - abs(pearson) >= 0.1` y `abs(spearman) >= 0.4`; `polinómica (grado N)` si el mejor polinomico mejora `>= 0.1` sobre el lineal y su R^2 `>= 0.3`; en cualquier otro caso con senal (no debil) `lineal`. El suelo de 0.3 evita llamar "debil" a relaciones reales pero discretas (conteos, escalas ordinales) con R^2 bajo pero direccion clara.
- `coeffs` va en orden de `numpy.polyval` (grado descendente). Para `lineal` es `[pendiente, intercepto]` (grado 1); para `polinómica` los del grado elegido; para `monótona no-lineal` y `débil/sin forma` es `None` (el scatter pintara una curva suavizada o nada — lo decide el capitulo, no esta funcion).
- `best_degree` prefiere el grado 2 sobre el 3 cuando empatan dentro de 0.02 de R^2 (parsimonia): no esperes grado 3 salvo que mejore claramente.
- Los pares con `None`, `bool`, `NaN` o `inf` se descartan por indice en silencio; `bool` cuenta como no-numerico (un `True` no es `1`). El dominio de los datos afecta al resultado: una parabola sobre un dominio simetrico da Pearson ~0 (sale `polinómica`), pero sobre un dominio asimetrico el Pearson sube y puede salir `lineal`.
@@ -1,187 +0,0 @@
"""Clasifica el TIPO de relacion entre dos variables numericas pareadas.
Funcion pura del grupo eda. Dadas dos listas numericas pareadas por indice,
limpia los pares de forma defensiva, calcula correlaciones lineal (Pearson) y de
rangos (Spearman) y ajustes polinomicos de grado 2 y 3, y a partir de esas
senales etiqueta la forma de la relacion para el EDA automatico:
"lineal" | "polinómica (grado 2)" | "polinómica (grado 3)" |
"monótona no-lineal" | "débil/sin forma"
Ademas devuelve los coeficientes del mejor modelo (en orden de numpy.polyval)
para que el capitulo pinte la curva de ajuste sobre el scatter. Reusa las
funciones del registry `pearson` y `spearman_corr` en vez de reimplementarlas.
NUNCA lanza: ante cualquier fallo o dato insuficiente devuelve el dict canonico
con tipo="débil/sin forma" y el resto de claves a None.
"""
import math
import warnings
import numpy as np
from datascience.datascience import pearson
from datascience.spearman_corr import spearman_corr
# Forma canonica de la respuesta cuando no se puede clasificar (datos
# insuficientes, varianza nula o error interno). Siempre las mismas claves.
_WEAK = {
"tipo": "débil/sin forma",
"pearson": None,
"r2_linear": None,
"spearman": None,
"r2_poly2": None,
"r2_poly3": None,
"best_degree": None,
"coeffs": None,
}
def _is_num(v) -> bool:
"""True si v es un numero real finito (int/float, no bool, no NaN, no inf)."""
return (
isinstance(v, (int, float))
and not isinstance(v, bool)
and not (isinstance(v, float) and (math.isnan(v) or math.isinf(v)))
)
def _poly_r2(coeffs, x_arr, y_arr, ss_tot: float) -> float:
"""R^2 de un ajuste polinomico: 1 - SS_res/SS_tot. 0 si SS_tot==0."""
if ss_tot == 0.0:
return 0.0
pred = np.polyval(coeffs, x_arr)
ss_res = float(np.sum((y_arr - pred) ** 2))
return 1.0 - ss_res / ss_tot
def classify_relationship_type(xs: list, ys: list) -> dict:
"""Clasifica el tipo de relacion entre dos variables numericas pareadas.
Empareja xs[i],ys[i] por indice y descarta el par si cualquiera de los dos
es None, bool, NaN o inf. Sobre los pares limpios calcula Pearson r
(r2_linear = r**2), Spearman rho y los R^2 de ajustes polinomicos de grado 2
y 3 (con numpy.polyfit + R^2 manual). Con esas senales decide la etiqueta.
Orden de evaluacion de la etiqueta (la primera que matchee gana). Los
umbrales estan calibrados para datos reales, a menudo discretos y ruidosos
(conteos, escalas ordinales): una relacion con |r| >= 0.3, |rho| >= 0.3 o un
polinomio con R^2 >= 0.3 ya tiene FORMA y no debe etiquetarse como "debil".
1. "débil/sin forma" — todas las senales bajas a la vez:
abs(pearson) < 0.3 y abs(spearman) < 0.3 y mejor_poly < 0.3.
2. "monótona no-lineal" — el rango (Spearman) capta una monotonia que el
Pearson lineal no: abs(spearman) - abs(pearson) >= 0.1 y
abs(spearman) >= 0.4. No se fuerza un polinomio (coeffs/best_degree =
None); el capitulo dibuja la tendencia ordenada sobre el scatter.
3. "polinómica (grado N)" — el mejor polinomico mejora claramente sobre
el lineal (mejor_poly - r2_linear >= 0.1) y mejor_poly >= 0.3. N es el
grado (2 o 3) con mejor R^2, prefiriendo el 2 si empatan dentro de 0.02
(parsimonia).
4. "lineal" — el resto: hay senal (no es debil) y la forma que existe es
esencialmente lineal. best_degree=1, coeffs del ajuste de grado 1.
Si hay menos de 5 pares validos, o la varianza de xs o de ys es ~0
(constante), devuelve directamente "débil/sin forma".
Args:
xs: lista (o tupla) de valores numericos de la primera variable,
pareada por indice con ys. Pares con None/bool/NaN/inf se descartan.
ys: lista (o tupla) de valores numericos de la segunda variable,
pareada por indice con xs.
Returns:
dict con SIEMPRE las mismas claves:
tipo (str), pearson (float|None), r2_linear (float|None),
spearman (float|None), r2_poly2 (float|None), r2_poly3 (float|None),
best_degree (int|None: 1, 2, 3 o None),
coeffs (list|None: coeficientes en orden de numpy.polyval, o None).
Nunca lanza: ante fallo o datos insuficientes devuelve el dict debil.
"""
try:
if xs is None or ys is None:
return dict(_WEAK)
pairs = [
(float(x), float(y))
for x, y in zip(xs, ys)
if _is_num(x) and _is_num(y)
]
# Datos insuficientes para hablar de forma de la relacion.
if len(pairs) < 5:
return dict(_WEAK)
clean_x = [p[0] for p in pairs]
clean_y = [p[1] for p in pairs]
# Varianza ~0 en cualquiera de las series => relacion indefinida.
if len(set(clean_x)) < 2 or len(set(clean_y)) < 2:
return dict(_WEAK)
x_arr = np.asarray(clean_x, dtype=float)
y_arr = np.asarray(clean_y, dtype=float)
if float(np.var(x_arr)) < 1e-15 or float(np.var(y_arr)) < 1e-15:
return dict(_WEAK)
# Correlaciones reutilizando las funciones del registry.
r = pearson(clean_x, clean_y)
spearman = spearman_corr(clean_x, clean_y)
r2_linear = r ** 2
# Ajustes polinomicos grado 2 y 3 con R^2 manual.
ss_tot = float(np.sum((y_arr - float(np.mean(y_arr))) ** 2))
with warnings.catch_warnings():
warnings.simplefilter("ignore")
c1 = np.polyfit(x_arr, y_arr, 1)
c2 = np.polyfit(x_arr, y_arr, 2)
c3 = np.polyfit(x_arr, y_arr, 3)
r2_poly2 = _poly_r2(c2, x_arr, y_arr, ss_tot)
r2_poly3 = _poly_r2(c3, x_arr, y_arr, ss_tot)
mejor_poly = max(r2_poly2, r2_poly3)
# Grado del mejor polinomico, con preferencia por la parsimonia: solo se
# elige el grado 3 si supera al grado 2 por mas de 0.02.
best_poly_degree = 3 if (r2_poly3 - r2_poly2) > 0.02 else 2
abs_s = abs(spearman)
abs_p = abs(r)
# Decision en orden: debil-temprano -> monotona -> polinomica -> lineal.
if abs_p < 0.3 and abs_s < 0.3 and mejor_poly < 0.3:
# Ninguna senal supera el suelo de forma: relacion debil/sin forma.
tipo = "débil/sin forma"
best_degree = None
coeffs = None
elif (abs_s - abs_p) >= 0.1 and abs_s >= 0.4:
# Spearman (rango) capta una monotonia que el Pearson lineal no:
# relacion monotona no-lineal. No se fuerza un polinomio que tal vez
# no ajusta bien; el capitulo dibuja la tendencia ordenada.
tipo = "monótona no-lineal"
best_degree = None
coeffs = None
elif (mejor_poly - r2_linear) >= 0.1 and mejor_poly >= 0.3:
tipo = "polinómica (grado {})".format(best_poly_degree)
best_degree = best_poly_degree
best_coeffs = c2 if best_poly_degree == 2 else c3
coeffs = [float(c) for c in best_coeffs]
else:
# Hay senal (no es debil) y no es ni monotona-pura ni polinomica:
# la correlacion que existe es esencialmente lineal.
tipo = "lineal"
best_degree = 1
coeffs = [float(c) for c in c1]
return {
"tipo": tipo,
"pearson": round(float(r), 6),
"r2_linear": round(float(r2_linear), 6),
"spearman": round(float(spearman), 6),
"r2_poly2": round(float(r2_poly2), 6),
"r2_poly3": round(float(r2_poly3), 6),
"best_degree": best_degree,
"coeffs": (
[round(c, 8) for c in coeffs] if coeffs is not None else None
),
}
except Exception:
return dict(_WEAK)
@@ -1,174 +0,0 @@
"""Tests para classify_relationship_type."""
import os
import sys
import numpy as np
sys.path.insert(0, os.path.dirname(__file__))
from classify_relationship_type import classify_relationship_type
# Claves que el dict de salida debe contener SIEMPRE.
_EXPECTED_KEYS = {
"tipo", "pearson", "r2_linear", "spearman",
"r2_poly2", "r2_poly3", "best_degree", "coeffs",
}
def _assert_shape(r):
"""Toda salida tiene exactamente las 8 claves canonicas."""
assert isinstance(r, dict)
assert set(r.keys()) == _EXPECTED_KEYS
def test_lineal():
"""Golden: y = 2x + 1 con ruido pequeno -> 'lineal', best_degree=1."""
rng = np.random.default_rng(42)
x = np.linspace(0.0, 10.0, 50)
y = 2.0 * x + 1.0 + rng.normal(0.0, 0.3, 50)
r = classify_relationship_type(list(x), list(y))
_assert_shape(r)
assert r["tipo"] == "lineal"
assert r["best_degree"] == 1
assert r["r2_linear"] >= 0.5
# coeffs ~ [pendiente, intercepto] del ajuste de grado 1.
assert r["coeffs"] is not None and len(r["coeffs"]) == 2
assert abs(r["coeffs"][0] - 2.0) < 0.1 # pendiente ~2
assert abs(r["coeffs"][1] - 1.0) < 0.3 # intercepto ~1
def test_polinomica_cuadratica():
"""Golden: y = x**2 sobre [-10, 10] -> 'polinómica', best_degree in (2, 3)."""
x = np.linspace(-10.0, 10.0, 60)
y = x ** 2
r = classify_relationship_type(list(x), list(y))
_assert_shape(r)
assert r["tipo"].startswith("polinómica")
assert r["best_degree"] in (2, 3)
# Una parabola perfecta queda capturada por el grado 2 (parsimonia).
assert r["best_degree"] == 2
assert r["r2_poly2"] > 0.99
assert r["coeffs"] is not None and len(r["coeffs"]) == r["best_degree"] + 1
def test_monotona_no_lineal():
"""Golden: monotona convexa de cola pesada -> 'monótona no-lineal'.
y = 1/(N+1-i)**2 es estrictamente creciente (Spearman ~ 1) pero su cola
explosiva hace que ni la recta ni un polinomio de grado 2/3 la ajusten
(R^2 polinomico < 0.5), de modo que el Pearson lineal NO capta la relacion
que el rango (Spearman) si ve. Construccion deterministica (sin azar).
"""
n = 200
i = np.arange(n, dtype=float)
y = 1.0 / (n + 1 - i) ** 2
r = classify_relationship_type(list(i), list(y))
_assert_shape(r)
assert r["tipo"] == "monótona no-lineal"
assert r["best_degree"] is None
assert r["coeffs"] is None
# Spearman fuerte y claramente por encima del Pearson.
assert abs(r["spearman"]) >= 0.5
assert abs(r["spearman"]) - abs(r["pearson"]) >= 0.15
def test_monotona_exponencial():
"""DoD literal: y = exp(x) (monotona no-lineal) -> 'monótona no-lineal'.
exp es estrictamente creciente (Spearman = 1) pero el Pearson lineal queda
claramente por debajo (~0.86), así que la dominancia del rango la marca como
monótona no-lineal en vez de lineal o polinómica.
"""
x = np.linspace(0.0, 5.0, 80)
y = np.exp(x)
r = classify_relationship_type(list(x), list(y))
_assert_shape(r)
assert r["tipo"] == "monótona no-lineal"
assert r["best_degree"] is None and r["coeffs"] is None
assert abs(r["spearman"]) >= 0.9
assert abs(r["spearman"]) - abs(r["pearson"]) >= 0.1
def test_debil_sin_forma():
"""Golden: x e y independientes (semilla fija) -> 'débil/sin forma'."""
rng = np.random.default_rng(0)
x = rng.normal(0.0, 1.0, 200)
y = rng.normal(0.0, 1.0, 200)
r = classify_relationship_type(list(x), list(y))
_assert_shape(r)
assert r["tipo"] == "débil/sin forma"
assert r["best_degree"] is None
assert r["coeffs"] is None
# Todas las senales son bajas.
assert abs(r["pearson"]) < 0.3
assert r["r2_linear"] < 0.1
def test_lista_vacia_no_lanza():
"""Edge: listas vacias -> dict debil canonico, sin lanzar."""
r = classify_relationship_type([], [])
_assert_shape(r)
assert r["tipo"] == "débil/sin forma"
assert r["pearson"] is None
assert r["r2_linear"] is None
assert r["spearman"] is None
assert r["r2_poly2"] is None
assert r["r2_poly3"] is None
assert r["best_degree"] is None
assert r["coeffs"] is None
def test_longitudes_distintas_no_lanza():
"""Edge: listas de distinta longitud -> empareja por indice, no lanza."""
# zip trunca a la longitud minima: solo 3 pares (< 5) -> debil.
r = classify_relationship_type([1, 2, 3, 4, 5, 6, 7, 8], [1.0, 2.0, 3.0])
_assert_shape(r)
assert r["tipo"] == "débil/sin forma"
assert r["best_degree"] is None
def test_todos_none_no_lanza():
"""Edge: todos los valores None -> ningun par valido -> debil, no lanza."""
r = classify_relationship_type([None, None, None, None, None, None],
[None, None, None, None, None, None])
_assert_shape(r)
assert r["tipo"] == "débil/sin forma"
assert r["coeffs"] is None
def test_entradas_none_no_lanza():
"""Edge: xs/ys None directamente -> debil, no lanza."""
assert classify_relationship_type(None, None)["tipo"] == "débil/sin forma"
assert classify_relationship_type([1.0, 2.0], None)["tipo"] == "débil/sin forma"
def test_constante_no_lanza():
"""Edge: ys constante (varianza ~0) -> debil, no lanza."""
r = classify_relationship_type([1, 2, 3, 4, 5, 6, 7], [5, 5, 5, 5, 5, 5, 5])
_assert_shape(r)
assert r["tipo"] == "débil/sin forma"
def test_filtra_nan_inf_bool():
"""Edge: pares con NaN/inf/bool/None se descartan por indice."""
nan = float("nan")
inf = float("inf")
# Solo i=0,1,2,3,4 quedan validos (5 pares) y forman una recta perfecta.
xs = [0.0, 1.0, 2.0, 3.0, 4.0, nan, inf, True, None]
ys = [1.0, 3.0, 5.0, 7.0, 9.0, 1.0, 2.0, 3.0, 4.0]
r = classify_relationship_type(xs, ys)
_assert_shape(r)
# Los 5 pares validos son y = 2x + 1 exacto -> lineal.
assert r["tipo"] == "lineal"
assert r["best_degree"] == 1
@@ -0,0 +1,87 @@
---
name: confidence_interval_mean
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def confidence_interval_mean(data: list, other: list = None, confidence: float = 0.95) -> dict"
description: "Intervalo de confianza (IC) de la media de una muestra con la t de Student, o de la DIFERENCIA de medias de dos muestras independientes con el metodo de Welch (sin asumir varianzas iguales). Una muestra: df=n-1, se=sd_muestral/sqrt(n) (sd con ddof=1), tcrit=t.ppf((1+confidence)/2, df), ci=mean+/-tcrit*se. Dos muestras: IC de mean(data)-mean(other) con se=sqrt(se1^2+se2^2) y grados de libertad de Welch-Satterthwaite. Pura y robusta: nunca lanza; ante casos degenerados (muestra vacia, n<2) devuelve nan + clave note, y con varianza cero el IC colapsa al punto (no es error). Usa scipy.stats y numpy."
tags: [papers, statistics, confidence-interval, welch, t-test, python]
params:
- name: data
desc: "muestra de observaciones numericas (lista de numeros). Si other es None, el IC es el de la media de data."
- name: other
desc: "segunda muestra independiente (lista de numeros) o None (default). Si se da, el IC es el de la diferencia de medias mean(data)-mean(other) calculada con Welch (no asume varianzas iguales)."
- name: confidence
desc: "nivel de confianza en (0, 1); 0.95 = IC del 95% (default). El cuantil critico es t.ppf((1+confidence)/2, df)."
output: "dict {mean, ci_low, ci_high, se, df, confidence, n}. mean = media de data (una muestra) o la diferencia mean(data)-mean(other) (dos muestras). En el caso de dos muestras se anaden ademas n1 y n2 (y n = n1+n2). df son los grados de libertad de la t (Welch-Satterthwaite si dos muestras). Casos degenerados (muestra vacia, n<2) anaden la clave note y dejan ci_low/ci_high/se (y a veces df) en nan; con varianza cero y n>=2 el IC colapsa a [mean, mean] con se=0 (con note, sin nan). Nunca None ni excepcion."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [scipy, numpy]
tested: true
tests: ["test_one_sample_golden_contra_scipy", "test_one_sample_distinto_nivel_confianza", "test_welch_diferencia_golden_contra_scipy", "test_edge_un_solo_elemento_no_lanza_nan_note", "test_edge_lista_vacia_no_lanza_note", "test_edge_varianza_cero_colapsa_al_punto", "test_edge_welch_muestra_vacia_no_lanza_note", "test_edge_welch_n1_uno_no_lanza_note"]
test_file_path: "python/functions/datascience/confidence_interval_mean_test.py"
file_path: "python/functions/datascience/confidence_interval_mean.py"
---
## Ejemplo
```python
from datascience import confidence_interval_mean
# IC del 95% de la media de una muestra (t de Student).
data = [2, 4, 4, 4, 5, 5, 7, 9]
ci = confidence_interval_mean(data, confidence=0.95)
print(ci["mean"]) # -> 5.0
print(ci["df"]) # -> 7.0 (n - 1)
print(round(ci["ci_low"], 5), round(ci["ci_high"], 5))
# -> 3.21251 6.78749 (se con sd muestral ddof=1 ~ 2.13809)
# IC del 95% de la DIFERENCIA de medias (Welch, no asume varianzas iguales).
control = [23.0, 21.0, 25.0, 22.0, 24.0, 26.0]
tratado = [18.0, 20.0, 17.0, 19.0, 21.0]
diff = confidence_interval_mean(control, tratado, confidence=0.95)
print(diff["mean"]) # -> 4.5 (mean(control) - mean(tratado))
print(round(diff["ci_low"], 4), round(diff["ci_high"], 4))
# Si el intervalo no incluye 0, la diferencia es significativa al 5%.
# Degenerados: nunca lanza.
print(confidence_interval_mean([5])["note"]) # n < 2: ... indefinidos
print(confidence_interval_mean([3, 3, 3])["se"]) # -> 0.0 (IC colapsa a [3, 3])
```
## Cuando usarla
Cuando quieras cuantificar la **incertidumbre de una media estimada** a partir de
una muestra: reporta `[ci_low, ci_high]` en vez de un punto suelto para mostrar
el rango plausible del valor real al nivel de confianza pedido. Usala tambien
para **comparar dos grupos** (A/B test, control vs tratamiento, antes vs
despues con grupos independientes): pasa las dos muestras y, si el IC de la
diferencia **no incluye el 0**, la diferencia es significativa al nivel
`1 - confidence`. Es el complemento del p-valor: ademas de "hay efecto", te dice
"de que tamano y con que margen". Para dos muestras usa Welch por defecto, asi
que no necesitas comprobar antes si las varianzas son iguales.
## Gotchas
- Pura y determinista (no hace I/O, no muta las entradas), pero **no** es
stdlib-only: depende de `scipy.stats` y `numpy` (ambos en el venv del proyecto).
- Con `other` usa **Welch** (df de Welch-Satterthwaite): NO asume varianzas
iguales ni tamanos de muestra iguales. Si necesitas el t-test clasico de
varianzas agrupadas (pooled), esta funcion no lo hace.
- `sd` se calcula con **ddof=1** (sd muestral), que es lo correcto para el IC de
una media con la t. Atajos como `sd_poblacional/sqrt(n)` (ddof=0) dan un
intervalo demasiado estrecho.
- En el caso de dos muestras, `mean` es la **diferencia** `mean(data) - mean(other)`
(no la media de data). El orden importa: el signo del IC depende de cual va
primero.
- Nunca lanza. Casos degenerados devuelven `nan` en `ci_low`/`ci_high`/`se`
(y a veces `df`) mas una clave `note`: muestra vacia o `n < 2` en cualquiera de
las muestras. **Excepcion**: con varianza cero y `n >= 2` el IC colapsa al
punto `[mean, mean]` con `se = 0` (no es un error, no hay `nan`).
- Comprueba `"note" in out` antes de usar `ci_low`/`ci_high` si la muestra puede
ser degenerada.
@@ -0,0 +1,176 @@
"""Intervalo de confianza de la media (una muestra) o de la diferencia de medias (Welch).
Funcion pura del grupo papers. Calcula el intervalo de confianza (IC) de la media
de una muestra usando la t de Student, o el IC de la diferencia de medias de dos
muestras independientes con el metodo de Welch (sin asumir varianzas iguales).
- Una muestra: ``df = n - 1``, ``se = sd / sqrt(n)`` (sd con ddof=1),
``tcrit = t.ppf((1 + confidence) / 2, df)``, ``ci = mean +/- tcrit * se``.
- Dos muestras (Welch): IC de ``mean(data) - mean(other)``, con
``se = sqrt(se1^2 + se2^2)`` y grados de libertad de Welch-Satterthwaite.
No lanza excepciones: ante casos degenerados (muestras vacias, ``n < 2``,
varianza cero) devuelve un dict coherente con ``ci_low``/``ci_high``/``se`` en
``nan`` (salvo el sub-caso de varianza cero, donde el IC colapsa al punto) y una
clave ``note`` explicando el caso. Usa ``scipy.stats`` y ``numpy``.
"""
from __future__ import annotations
import math
import numpy as np
from scipy import stats
def confidence_interval_mean(
data: list, other: list = None, confidence: float = 0.95
) -> dict:
"""Intervalo de confianza de la media o de la diferencia de medias (Welch).
Si ``other`` es ``None``, calcula el IC de la media de ``data`` con la t de
Student. Si se proporciona ``other``, calcula el IC de la diferencia
``mean(data) - mean(other)`` con el metodo de Welch (no asume varianzas
iguales) y grados de libertad de Welch-Satterthwaite.
Es una funcion pura y determinista: no hace I/O ni muta las entradas. No
lanza excepcion ante datos degenerados; en su lugar devuelve un dict con la
clave ``note`` y los campos numericos indefinidos a ``nan``.
Args:
data: muestra de observaciones numericas (lista de numeros).
other: segunda muestra independiente. Si se da, el IC es el de la
diferencia de medias ``mean(data) - mean(other)`` con Welch. Si es
``None`` (default), el IC es el de la media de ``data``.
confidence: nivel de confianza en (0, 1), p.ej. 0.95 para el 95%.
Returns:
dict con las claves:
mean: media de ``data`` (una muestra) o la diferencia
``mean(data) - mean(other)`` (dos muestras).
ci_low: extremo inferior del intervalo de confianza.
ci_high: extremo superior del intervalo de confianza.
se: error estandar de la media (o de la diferencia).
df: grados de libertad de la t (Welch-Satterthwaite si dos muestras).
confidence: nivel de confianza aplicado (float).
n: tamano de la muestra (una muestra) o tamano total ``n1 + n2``
(dos muestras; ademas se incluyen ``n1`` y ``n2``).
En el caso de dos muestras se incluyen ademas ``n1`` y ``n2``. Casos
degenerados (muestra vacia, ``n < 2``, etc.) anaden la clave ``note`` y
dejan ``ci_low``/``ci_high``/``se`` (y a veces ``df``) en ``nan``.
"""
conf = float(confidence)
if other is None:
return _ci_one_sample(data, conf)
return _ci_welch(data, other, conf)
def _ci_one_sample(data: list, conf: float) -> dict:
"""IC de la media de una sola muestra con la t de Student."""
arr = np.asarray(list(data), dtype=float)
n = int(arr.size)
base = {
"mean": float("nan"),
"ci_low": float("nan"),
"ci_high": float("nan"),
"se": float("nan"),
"df": float("nan"),
"confidence": conf,
"n": n,
}
if n == 0:
base["note"] = "muestra vacia: media e intervalo indefinidos"
return base
mean = float(arr.mean())
base["mean"] = mean
if n < 2:
base["note"] = "n < 2: error estandar y grados de libertad indefinidos"
return base
df = n - 1
base["df"] = float(df)
sd = float(arr.std(ddof=1))
se = sd / math.sqrt(n)
base["se"] = se
# Varianza cero: el IC colapsa al punto (no es un error).
if se == 0.0:
base["ci_low"] = mean
base["ci_high"] = mean
base["note"] = "varianza cero: el intervalo colapsa a la media"
return base
tcrit = float(stats.t.ppf((1.0 + conf) / 2.0, df))
margin = tcrit * se
base["ci_low"] = mean - margin
base["ci_high"] = mean + margin
return base
def _ci_welch(data: list, other: list, conf: float) -> dict:
"""IC de la diferencia de medias de dos muestras con el metodo de Welch."""
a = np.asarray(list(data), dtype=float)
b = np.asarray(list(other), dtype=float)
n1 = int(a.size)
n2 = int(b.size)
base = {
"mean": float("nan"),
"ci_low": float("nan"),
"ci_high": float("nan"),
"se": float("nan"),
"df": float("nan"),
"confidence": conf,
"n": n1 + n2,
"n1": n1,
"n2": n2,
}
if n1 == 0 or n2 == 0:
base["note"] = "alguna muestra esta vacia: diferencia e intervalo indefinidos"
return base
mean1 = float(a.mean())
mean2 = float(b.mean())
diff = mean1 - mean2
base["mean"] = diff
if n1 < 2 or n2 < 2:
base["note"] = (
"n < 2 en alguna muestra: error estandar y grados de libertad indefinidos"
)
return base
sd1 = float(a.std(ddof=1))
sd2 = float(b.std(ddof=1))
se1 = sd1 / math.sqrt(n1)
se2 = sd2 / math.sqrt(n2)
se = math.sqrt(se1 * se1 + se2 * se2)
base["se"] = se
# Ambas varianzas cero: el IC de la diferencia colapsa al punto.
if se == 0.0:
base["ci_low"] = diff
base["ci_high"] = diff
base["df"] = float("nan")
base["note"] = "varianza cero en ambas muestras: el intervalo colapsa a la diferencia"
return base
# Grados de libertad de Welch-Satterthwaite.
df = (se1 * se1 + se2 * se2) ** 2 / (
(se1**4) / (n1 - 1) + (se2**4) / (n2 - 1)
)
base["df"] = float(df)
tcrit = float(stats.t.ppf((1.0 + conf) / 2.0, df))
margin = tcrit * se
base["ci_low"] = diff - margin
base["ci_high"] = diff + margin
return base
@@ -0,0 +1,140 @@
"""Tests para confidence_interval_mean (IC de la media / diferencia de medias Welch).
Importa el modulo hoja directamente (`confidence_interval_mean`) para no depender
de que el paquete reexporte la funcion en su __init__ (lo integra el orquestador
al cerrar el grupo).
Los golden se calculan con scipy dentro del propio test para que sean robustos:
la funcion bajo prueba debe coincidir con la referencia de scipy a ~1e-9.
"""
import math
import numpy as np
from scipy import stats
from confidence_interval_mean import confidence_interval_mean
def test_one_sample_golden_contra_scipy():
# mean=5.0, n=8. Este dataset tiene sd POBLACIONAL (ddof=0) exactamente 2.0,
# pero la sd MUESTRAL (ddof=1, la que exige la spec y la que es correcta para
# el IC de una media con la t) es sqrt(32/7) ~ 2.13809. El golden robusto se
# calcula con scipy usando se con ddof=1, no con el atajo 2.0/sqrt(8).
data = [2, 4, 4, 4, 5, 5, 7, 9]
out = confidence_interval_mean(data, confidence=0.95)
n = len(data)
mean = float(np.mean(data))
sd = float(np.std(data, ddof=1)) # sample sd ~ 2.13809
se = sd / math.sqrt(n)
lo, hi = stats.t.interval(0.95, df=n - 1, loc=mean, scale=se)
assert abs(out["mean"] - 5.0) < 1e-9
assert abs(out["se"] - se) < 1e-12
assert out["df"] == 7.0
assert out["n"] == 8
assert out["confidence"] == 0.95
assert abs(out["ci_low"] - lo) < 1e-9
assert abs(out["ci_high"] - hi) < 1e-9
# Valores tabulados correctos para ddof=1 (no los 3.32793/6.67207 del
# enunciado, que asumian erroneamente sd=2.0 / ddof=0).
assert abs(out["ci_low"] - 3.21251) < 1e-3
assert abs(out["ci_high"] - 6.78749) < 1e-3
assert "note" not in out
def test_one_sample_distinto_nivel_confianza():
data = [10.0, 12.0, 11.0, 13.0, 9.0, 14.0]
out = confidence_interval_mean(data, confidence=0.99)
n = len(data)
mean = float(np.mean(data))
se = float(np.std(data, ddof=1)) / math.sqrt(n)
lo, hi = stats.t.interval(0.99, df=n - 1, loc=mean, scale=se)
assert abs(out["mean"] - mean) < 1e-12
assert abs(out["ci_low"] - lo) < 1e-9
assert abs(out["ci_high"] - hi) < 1e-9
assert out["df"] == float(n - 1)
def test_welch_diferencia_golden_contra_scipy():
data = [23.0, 21.0, 25.0, 22.0, 24.0, 26.0]
other = [18.0, 20.0, 17.0, 19.0, 21.0]
conf = 0.95
out = confidence_interval_mean(data, other, confidence=conf)
a = np.asarray(data, dtype=float)
b = np.asarray(other, dtype=float)
n1, n2 = a.size, b.size
mean1, mean2 = float(a.mean()), float(b.mean())
diff = mean1 - mean2
se1 = float(a.std(ddof=1)) / math.sqrt(n1)
se2 = float(b.std(ddof=1)) / math.sqrt(n2)
se = math.sqrt(se1**2 + se2**2)
df = (se1**2 + se2**2) ** 2 / (se1**4 / (n1 - 1) + se2**4 / (n2 - 1))
lo, hi = stats.t.interval(conf, df=df, loc=diff, scale=se)
assert abs(out["mean"] - diff) < 1e-9
assert abs(out["mean"] - (mean1 - mean2)) < 1e-9
assert abs(out["se"] - se) < 1e-12
assert abs(out["df"] - df) < 1e-9
assert abs(out["ci_low"] - lo) < 1e-9
assert abs(out["ci_high"] - hi) < 1e-9
assert out["n1"] == n1
assert out["n2"] == n2
assert out["n"] == n1 + n2
assert "note" not in out
def test_edge_un_solo_elemento_no_lanza_nan_note():
out = confidence_interval_mean([5], confidence=0.95)
assert out["mean"] == 5.0 # la media si esta definida con n=1
assert math.isnan(out["se"])
assert math.isnan(out["ci_low"])
assert math.isnan(out["ci_high"])
assert math.isnan(out["df"])
assert out["n"] == 1
assert "note" in out
def test_edge_lista_vacia_no_lanza_note():
out = confidence_interval_mean([], confidence=0.95)
assert math.isnan(out["mean"])
assert math.isnan(out["ci_low"])
assert math.isnan(out["ci_high"])
assert math.isnan(out["se"])
assert out["n"] == 0
assert "note" in out
def test_edge_varianza_cero_colapsa_al_punto():
out = confidence_interval_mean([3, 3, 3], confidence=0.95)
assert out["mean"] == 3.0
assert out["se"] == 0.0
assert out["ci_low"] == 3.0
assert out["ci_high"] == 3.0
assert not math.isnan(out["ci_low"])
assert out["n"] == 3
assert "note" in out
def test_edge_welch_muestra_vacia_no_lanza_note():
out = confidence_interval_mean([1.0, 2.0, 3.0], [], confidence=0.95)
assert math.isnan(out["mean"])
assert math.isnan(out["ci_low"])
assert math.isnan(out["se"])
assert out["n1"] == 3
assert out["n2"] == 0
assert "note" in out
def test_edge_welch_n1_uno_no_lanza_note():
out = confidence_interval_mean([5.0], [1.0, 2.0, 3.0], confidence=0.95)
# La diferencia de medias si esta definida.
assert abs(out["mean"] - (5.0 - 2.0)) < 1e-9
assert math.isnan(out["se"])
assert math.isnan(out["ci_low"])
assert math.isnan(out["df"])
assert "note" in out
@@ -0,0 +1,80 @@
---
name: effect_size_cohens_d
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def effect_size_cohens_d(group_a: list, group_b: list) -> dict"
description: "Tamano del efecto (effect size) entre dos grupos numericos: Cohen's d (diferencia de medias estandarizada por la desviacion tipica combinada, varianzas muestrales ddof=1), Hedges' g (d corregido por el sesgo al alza con muestras pequenas via el factor J) e interpretacion cualitativa de la magnitud segun los umbrales clasicos de Cohen (negligible/small/medium/large). El p-valor dice si hay diferencia; el effect size dice como de grande, de forma adimensional e independiente del N. Pura, sin dependencias externas; nunca lanza: los casos degenerados (varianza cero, N<2, listas vacias) devuelven NaN + una clave note."
tags: [papers, statistics, effect-size, cohens-d, hedges-g, python]
params:
- name: group_a
desc: "primera muestra (lista de numeros). Necesita >=2 observaciones para que exista la varianza muestral (ddof=1)."
- name: group_b
desc: "segunda muestra (lista de numeros). Necesita >=2 observaciones. El signo de cohens_d es positivo cuando mean_a > mean_b."
output: "dict {cohens_d: float (diferencia de medias estandarizada, puede ser NaN), hedges_g: float (cohens_d * factor de correccion J, puede ser NaN), interpretation: str ('negligible'|'small'|'medium'|'large', o 'undefined' en casos degenerados), n_a: int, n_b: int, mean_a: float, mean_b: float, pooled_sd: float (desviacion tipica combinada)}. Casos degenerados (varianza cero en ambos grupos, N<2 en algun grupo, o listas vacias) anaden clave note. Nunca None ni excepcion."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: true
tests: ["test_golden_large_effect", "test_hedges_g_menor_en_magnitud_que_cohens_d", "test_interpretation_thresholds", "test_signo_positivo_cuando_a_mayor_que_b", "test_varianza_cero_no_lanza", "test_n_insuficiente_no_lanza", "test_listas_vacias_no_lanza", "test_un_grupo_vacio_no_lanza"]
test_file_path: "python/functions/datascience/effect_size_cohens_d_test.py"
file_path: "python/functions/datascience/effect_size_cohens_d.py"
---
## Ejemplo
```python
from datascience import effect_size_cohens_d
# Dos grupos desplazados 2 unidades, misma dispersion.
a = [1, 2, 3, 4, 5] # media 3, varianza muestral 2.5
b = [3, 4, 5, 6, 7] # media 5, varianza muestral 2.5
out = effect_size_cohens_d(a, b)
print(out["cohens_d"]) # -> -1.264911... (a esta 1.26 SD por debajo de b)
print(out["hedges_g"]) # -> -1.142500... (|g| < |d|: correccion N pequeno)
print(out["interpretation"]) # -> "large" (|d| >= 0.8)
print(out["pooled_sd"]) # -> 1.581138...
# Caso degenerado: varianza cero -> no lanza, NaN + note.
deg = effect_size_cohens_d([5, 5, 5], [5, 5, 5])
print(deg["interpretation"]) # -> "undefined"
print(deg["note"]) # -> "varianza cero, effect size indefinido"
```
## Cuando usarla
Cuando ya sepas que dos grupos difieren (o quieras cuantificar su diferencia)
y necesites una medida **de magnitud, no de significancia**: comparar el antes
y el despues de una intervencion, el grupo control frente al tratamiento, o dos
cohortes. Reportala junto al p-valor para responder "¿como de grande es la
diferencia?" — un p-valor minusculo con N enorme puede esconder un efecto
trivial. Es adimensional (en unidades de desviaciones tipicas), asi que hace
comparables resultados entre estudios y alimenta meta-analisis. Usa **Hedges' g**
en lugar de Cohen's d cuando los grupos sean pequenos (decenas o menos): d
sobreestima el efecto y g lo corrige.
## Gotchas
- Pura y sin dependencias externas (solo `math` de la stdlib).
- Usa **varianza muestral** (ddof=1), no poblacional. Por eso cada grupo
necesita al menos 2 observaciones; con N=1 la varianza muestral no existe y la
funcion devuelve NaN + `note`.
- **Nunca lanza excepcion**. Los casos degenerados devuelven `cohens_d` y
`hedges_g` a `float('nan')`, `interpretation="undefined"` y una clave `note`:
varianza cero en ambos grupos (`pooled_sd == 0`), N<2 en algun grupo, o listas
vacias. Comprueba con `math.isnan(out["cohens_d"])` o la presencia de `note`
antes de usar el resultado.
- El **signo** de `cohens_d` depende del orden de los argumentos: positivo si
`mean_a > mean_b`, negativo en caso contrario. La `interpretation` usa `|d|`,
asi que no depende del orden.
- `pooled_sd` asume varianzas comparables entre grupos (homogeneidad). Si las
dispersiones son muy distintas, Cohen's d clasico pierde precision; considera
variantes (Glass's delta) fuera del alcance de esta funcion.
- Los umbrales de Cohen (0.2 / 0.5 / 0.8) son convencion, no ley: interpretalos
segun el dominio.
@@ -0,0 +1,156 @@
"""Effect size de dos grupos: Cohen's d, Hedges' g e interpretacion cualitativa.
Funcion pura del grupo papers. El p-valor responde a "¿hay diferencia?" pero no
a "¿como de grande es?". El tamano del efecto (effect size) cuantifica la
magnitud de la diferencia entre dos grupos de forma adimensional, independiente
del N, y es lo que hace comparables resultados entre estudios (meta-analisis).
- Cohen's d: diferencia de medias estandarizada por la desviacion tipica
combinada (pooled SD), con varianzas muestrales (ddof=1).
- Hedges' g: Cohen's d corregido por el sesgo al alza que sufre d con muestras
pequenas, multiplicando por el factor de correccion J.
- interpretation: etiqueta cualitativa de |d| segun los umbrales clasicos de
Cohen (negligible / small / medium / large).
No usa dependencias externas: aritmetica de la libreria estandar (``math``).
"""
from __future__ import annotations
import math
def _mean(xs: list) -> float:
"""Media aritmetica de una lista no vacia de numeros."""
return sum(float(x) for x in xs) / len(xs)
def _sample_variance(xs: list, mean: float) -> float:
"""Varianza muestral (ddof=1) de una lista con al menos 2 elementos."""
n = len(xs)
return sum((float(x) - mean) ** 2 for x in xs) / (n - 1)
def _interpret(abs_d: float) -> str:
"""Etiqueta cualitativa del tamano del efecto segun |d| (umbrales de Cohen)."""
if abs_d < 0.2:
return "negligible"
if abs_d < 0.5:
return "small"
if abs_d < 0.8:
return "medium"
return "large"
def effect_size_cohens_d(group_a: list, group_b: list) -> dict:
"""Calcula el tamano del efecto entre dos grupos numericos.
Devuelve Cohen's d (diferencia de medias estandarizada por la pooled SD),
Hedges' g (d corregido por sesgo de muestra pequena) y una etiqueta
cualitativa de la magnitud segun los umbrales de Cohen.
Es una funcion pura y determinista: no hace I/O, no muta la entrada. No lanza
excepcion ante datos degenerados; en su lugar devuelve un dict con
``cohens_d`` / ``hedges_g`` a ``float('nan')``, ``interpretation`` a
``"undefined"`` y una clave ``note`` explicando el caso.
Definiciones:
s_pooled = sqrt(((n1-1)*s1^2 + (n2-1)*s2^2) / (n1+n2-2)), con s1^2, s2^2
varianzas muestrales (ddof=1).
cohens_d = (mean_a - mean_b) / s_pooled.
J = 1 - 3 / (4*(n1+n2) - 9) (factor de correccion de Hedges).
hedges_g = cohens_d * J.
Args:
group_a: primera muestra (lista de numeros). Necesita >=2 elementos para
que exista la varianza muestral.
group_b: segunda muestra (lista de numeros). Necesita >=2 elementos.
Returns:
dict con las claves:
cohens_d: float, diferencia de medias estandarizada (puede ser NaN).
hedges_g: float, Cohen's d corregido por sesgo (puede ser NaN).
interpretation: str, "negligible" | "small" | "medium" | "large", o
"undefined" en casos degenerados.
n_a: int, tamano de group_a.
n_b: int, tamano de group_b.
mean_a: float, media de group_a (NaN si vacio).
mean_b: float, media de group_b (NaN si vacio).
pooled_sd: float, desviacion tipica combinada (NaN si indefinida).
Casos degenerados (lista vacia, N<2 en algun grupo, o varianza cero en
ambos grupos -> pooled_sd == 0) anaden ademas una clave ``note``.
"""
nan = float("nan")
n_a = len(group_a)
n_b = len(group_b)
# Listas vacias: ni media ni varianza definidas.
if n_a == 0 or n_b == 0:
return {
"cohens_d": nan,
"hedges_g": nan,
"interpretation": "undefined",
"n_a": n_a,
"n_b": n_b,
"mean_a": _mean(group_a) if n_a else nan,
"mean_b": _mean(group_b) if n_b else nan,
"pooled_sd": nan,
"note": "grupo vacio: media y varianza indefinidas, effect size indefinido",
}
mean_a = _mean(group_a)
mean_b = _mean(group_b)
# N insuficiente: la varianza muestral (ddof=1) no existe con un solo dato,
# y la correccion de Hedges no es fiable.
if n_a < 2 or n_b < 2:
return {
"cohens_d": nan,
"hedges_g": nan,
"interpretation": "undefined",
"n_a": n_a,
"n_b": n_b,
"mean_a": mean_a,
"mean_b": mean_b,
"pooled_sd": nan,
"note": (
"N insuficiente: cada grupo necesita >=2 observaciones para la "
"varianza muestral; effect size indefinido"
),
}
var_a = _sample_variance(group_a, mean_a)
var_b = _sample_variance(group_b, mean_b)
pooled_sd = math.sqrt(
((n_a - 1) * var_a + (n_b - 1) * var_b) / (n_a + n_b - 2)
)
# Varianza cero en ambos grupos: no se puede estandarizar (division por 0).
if pooled_sd == 0.0:
return {
"cohens_d": nan,
"hedges_g": nan,
"interpretation": "undefined",
"n_a": n_a,
"n_b": n_b,
"mean_a": mean_a,
"mean_b": mean_b,
"pooled_sd": 0.0,
"note": "varianza cero, effect size indefinido",
}
cohens_d = (mean_a - mean_b) / pooled_sd
j = 1.0 - 3.0 / (4.0 * (n_a + n_b) - 9.0)
hedges_g = cohens_d * j
return {
"cohens_d": cohens_d,
"hedges_g": hedges_g,
"interpretation": _interpret(abs(cohens_d)),
"n_a": n_a,
"n_b": n_b,
"mean_a": mean_a,
"mean_b": mean_b,
"pooled_sd": pooled_sd,
}
@@ -0,0 +1,96 @@
"""Tests para effect_size_cohens_d (tamano del efecto de dos grupos).
Importa el modulo hoja directamente (`effect_size_cohens_d`) para no depender de
que el paquete reexporte la funcion en su __init__ (lo integra el orquestador al
cerrar el grupo papers). El pytest del repo tiene pythonpath=["functions", ...],
asi que el modulo hoja se resuelve por su nombre directo.
"""
import math
from effect_size_cohens_d import effect_size_cohens_d
def test_golden_large_effect():
# group_a: mean 3, var muestral 2.5; group_b: mean 5, var 2.5.
# pooled_sd = sqrt(2.5) ~= 1.5811388.
# cohens_d = (3-5)/1.5811388 ~= -1.264911.
# J = 1 - 3/(4*10-9) = 1 - 3/31 = 0.9032258.
# hedges_g = d * J = -1.2649111 * 0.9032258 ~= -1.142500.
out = effect_size_cohens_d([1, 2, 3, 4, 5], [3, 4, 5, 6, 7])
assert abs(out["cohens_d"] - (-1.26491)) < 1e-4
assert abs(out["hedges_g"] - (-1.14250)) < 1e-4
assert out["interpretation"] == "large"
assert out["n_a"] == 5
assert out["n_b"] == 5
assert abs(out["mean_a"] - 3.0) < 1e-12
assert abs(out["mean_b"] - 5.0) < 1e-12
assert abs(out["pooled_sd"] - math.sqrt(2.5)) < 1e-9
assert "note" not in out
def test_hedges_g_menor_en_magnitud_que_cohens_d():
# La correccion J esta en (0, 1), asi que |g| < |d| siempre.
out = effect_size_cohens_d([1, 2, 3, 4, 5], [3, 4, 5, 6, 7])
assert abs(out["hedges_g"]) < abs(out["cohens_d"])
def test_interpretation_thresholds():
# negligible: |d| < 0.2. Medias casi iguales con varianza grande.
neg = effect_size_cohens_d([0, 10, 20, 30], [1, 11, 21, 31])
assert neg["interpretation"] == "negligible"
assert abs(neg["cohens_d"]) < 0.2
# small: 0.2 <= |d| < 0.5.
small = effect_size_cohens_d([0, 10, 20, 30], [4, 14, 24, 34])
assert small["interpretation"] == "small"
assert 0.2 <= abs(small["cohens_d"]) < 0.5
# medium: 0.5 <= |d| < 0.8.
medium = effect_size_cohens_d([0, 10, 20, 30], [9, 19, 29, 39])
assert medium["interpretation"] == "medium"
assert 0.5 <= abs(medium["cohens_d"]) < 0.8
def test_signo_positivo_cuando_a_mayor_que_b():
out = effect_size_cohens_d([10, 12, 14, 16], [1, 2, 3, 4])
assert out["cohens_d"] > 0
assert out["interpretation"] == "large"
def test_varianza_cero_no_lanza():
out = effect_size_cohens_d([5, 5, 5], [5, 5, 5])
assert math.isnan(out["cohens_d"])
assert math.isnan(out["hedges_g"])
assert out["interpretation"] == "undefined"
assert out["pooled_sd"] == 0.0
assert "note" in out
assert "varianza cero" in out["note"]
def test_n_insuficiente_no_lanza():
out = effect_size_cohens_d([3], [1, 2, 3])
assert math.isnan(out["cohens_d"])
assert math.isnan(out["hedges_g"])
assert out["interpretation"] == "undefined"
assert out["n_a"] == 1
assert out["n_b"] == 3
assert "note" in out
def test_listas_vacias_no_lanza():
out = effect_size_cohens_d([], [])
assert math.isnan(out["cohens_d"])
assert math.isnan(out["hedges_g"])
assert out["interpretation"] == "undefined"
assert out["n_a"] == 0
assert out["n_b"] == 0
assert "note" in out
def test_un_grupo_vacio_no_lanza():
out = effect_size_cohens_d([1, 2, 3], [])
assert math.isnan(out["cohens_d"])
assert out["interpretation"] == "undefined"
assert out["n_b"] == 0
assert "note" in out
+29 -11
View File
@@ -3,19 +3,19 @@ name: fdr_correction
kind: function
lang: py
domain: datascience
version: "1.0.0"
version: "1.1.0"
purity: pure
signature: "def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = \"bh\") -> dict"
description: "Correccion de comparaciones multiples (multiple-testing) sobre una lista de p-valores: Benjamini-Hochberg (FDR, 'bh') o Bonferroni (FWER, 'bonferroni'). Antidoto al sesgo de mineria de datos (data-mining bias): al evaluar muchas hipotesis a la vez (todos los pares de una matriz), el azar produce falsos positivos; esta funcion ajusta los p-valores y marca cuales siguen siendo significativos tras corregir. Pura, sin dependencias externas, alineada 1:1 con la entrada (admite None en posiciones sin test)."
tags: [eda, statistics, multiple-testing, fdr, benjamini-hochberg, bonferroni, p-value, data-mining-bias, python]
description: "Correccion de comparaciones multiples (multiple-testing) sobre una lista de p-valores: Benjamini-Hochberg (FDR, 'bh'), Bonferroni (FWER, 'bonferroni') o Holm-Bonferroni (FWER step-down, 'holm', mas potente que Bonferroni simple). Antidoto al sesgo de mineria de datos (data-mining bias): al evaluar muchas hipotesis a la vez (todos los pares de una matriz), el azar produce falsos positivos; esta funcion ajusta los p-valores y marca cuales siguen siendo significativos tras corregir. Pura, sin dependencias externas, alineada 1:1 con la entrada (admite None en posiciones sin test)."
tags: [eda, statistics, multiple-testing, fdr, benjamini-hochberg, bonferroni, holm, holm-bonferroni, fwer, p-value, data-mining-bias, python]
params:
- name: pvalues
desc: "lista de p-valores (floats en [0, 1]). Se admiten None u otros valores no validos en posiciones sin test disponible; se propagan como None en la salida y no cuentan como prueba (m)."
- name: alpha
desc: "nivel de significancia objetivo tras la correccion (default 0.05). Para BH es el umbral del FDR; para Bonferroni, del FWER (tasa de error por familia)."
- name: method
desc: "'bh' = Benjamini-Hochberg (controla FDR, menos conservador, mas potencia); 'bonferroni' = controla FWER (mas conservador). Cualquier otro valor devuelve un dict con note."
output: "dict {p_values_adjusted: lista alineada con pvalues (float ajustado o None), reject: lista de bool (True = significativo tras corregir), n_tests: nº de p-valores validos (m), n_rejected: nº de hipotesis rechazadas, alpha: float aplicado, method: str}. Casos degenerados (vacio, sin p validos, metodo desconocido) anaden clave note. Nunca None ni excepcion."
desc: "'bh' = Benjamini-Hochberg (controla FDR, menos conservador, mas potencia); 'bonferroni' = controla FWER (mas conservador); 'holm' = Holm-Bonferroni (controla FWER, step-down, uniformemente mas potente que Bonferroni simple). Cualquier otro valor devuelve un dict con note."
output: "dict {p_values_adjusted: lista alineada con pvalues (float ajustado o None), reject: lista de bool (True = significativo tras corregir), n_tests: nº de p-valores validos (m), n_rejected: nº de hipotesis rechazadas, alpha: float aplicado, method: str ('bh' | 'bonferroni' | 'holm')}. Casos degenerados (vacio, sin p validos, metodo desconocido) anaden clave note. Nunca None ni excepcion."
uses_functions: []
uses_types: []
returns: []
@@ -23,7 +23,7 @@ returns_optional: false
error_type: ""
imports: [math]
tested: true
tests: ["test_bh_golden_rechaza_dos_de_tres", "test_bonferroni_mas_conservador_que_bh", "test_p_values_adjusted_alineados_y_en_rango", "test_none_se_propaga_alineado", "test_lista_vacia_devuelve_note", "test_solo_none_devuelve_note", "test_metodo_desconocido_devuelve_note", "test_todos_significativos"]
tests: ["test_bh_golden_rechaza_dos_de_tres", "test_bonferroni_mas_conservador_que_bh", "test_p_values_adjusted_alineados_y_en_rango", "test_none_se_propaga_alineado", "test_lista_vacia_devuelve_note", "test_solo_none_devuelve_note", "test_metodo_desconocido_devuelve_note", "test_todos_significativos", "test_holm_golden_rechaza_dos_de_cuatro", "test_holm_entre_bonferroni_y_bh", "test_none_se_propaga_alineado_holm", "test_lista_vacia_holm_devuelve_note"]
test_file_path: "python/functions/datascience/fdr_correction_test.py"
file_path: "python/functions/datascience/fdr_correction.py"
---
@@ -45,6 +45,13 @@ bon = fdr_correction(pvalues, alpha=0.05, method="bonferroni")
print(bon["reject"]) # -> [True, False, False]
print(bon["p_values_adjusted"]) # -> [0.03, 0.06, 1.0]
# Holm-Bonferroni (step-down): controla el FWER como Bonferroni pero es mas
# potente; rechaza al menos tanto como Bonferroni simple, nunca menos.
holm = fdr_correction([0.01, 0.04, 0.03, 0.005], alpha=0.05, method="holm")
print(holm["reject"]) # -> [True, False, False, True]
print(holm["p_values_adjusted"]) # -> [0.03, 0.06, 0.06, 0.02]
print(holm["n_rejected"]) # -> 2
# Posiciones sin test (None) se propagan alineadas: el llamador puede pasar la
# lista completa de pares y recuperar el mapeo 1:1.
mix = fdr_correction([0.001, None, 0.9])
@@ -61,8 +68,11 @@ combinaciones y se quede con las que "pasan". Sin corregir, con N pruebas y
alpha=0.05 esperas ~5% de falsos positivos *por azar*: cuantas mas pruebas, mas
correlaciones espurias. Llama a `fdr_correction` con todos los p-valores de la
familia y usa `reject` (no el umbral crudo) para decidir que es real. Usa `"bh"`
por defecto (mejor potencia); `"bonferroni"` cuando un falso positivo sea muy
costoso y prefieras maxima cautela.
por defecto (mejor potencia); `"holm"` (Holm-Bonferroni, FWER step-down) cuando
quieras controlar el FWER pero sin la perdida de potencia de Bonferroni simple
(rechaza al menos tanto como `"bonferroni"`, nunca menos); `"bonferroni"` cuando
un falso positivo sea muy costoso y prefieras la maxima cautela del metodo mas
simple.
## Gotchas
@@ -76,8 +86,16 @@ costoso y prefieras maxima cautela.
eso puedes pasar la lista completa de pares aunque algunos no tengan test.
- `n_tests` es el numero de p-valores **validos** (m), que puede ser menor que
`len(pvalues)` si hay `None`.
- BH y Bonferroni controlan cosas distintas: BH la tasa de falsos
descubrimientos (FDR), Bonferroni la probabilidad de *cualquier* falso
- BH controla cosa distinta que Bonferroni/Holm: BH la tasa de falsos
descubrimientos (FDR); Bonferroni y Holm la probabilidad de *cualquier* falso
positivo (FWER). No son intercambiables; elige segun el coste de equivocarte.
- `"holm"` y `"bonferroni"` controlan ambos el FWER, pero Holm es step-down y
uniformemente mas potente: rechaza al menos tantas hipotesis como Bonferroni
simple sobre el mismo set, nunca menos. Si controlas FWER, `"holm"` domina a
`"bonferroni"` salvo que necesites el ajuste mas simple por interpretabilidad.
- Metodo desconocido o lista vacia/sin p validos no lanzan: devuelven un dict
con `note`.
con `note`. Los metodos validos son `"bh"`, `"bonferroni"` y `"holm"`.
## Capability growth log
- v1.1.0 (2026-06-30) — añade method="holm" (Holm-Bonferroni step-down, FWER, más potente que Bonferroni simple).
+29 -9
View File
@@ -5,12 +5,15 @@ todos los pares de una matriz de asociacion), la probabilidad de obtener al meno
un falso positivo por azar crece con el numero de pruebas: es el sesgo de mineria
de datos (data-mining bias) descrito por Aronson en *Evidence-Based Technical
Analysis* (cap. 6). Esta funcion ajusta los p-valores para controlar ese sesgo
mediante dos metodos clasicos:
mediante tres metodos clasicos:
- Benjamini-Hochberg (``"bh"``): controla la tasa de falsos descubrimientos
(False Discovery Rate, FDR). Menos conservador, mas potencia estadistica.
- Bonferroni (``"bonferroni"``): controla la tasa de error por familia
(Family-Wise Error Rate, FWER). Mas conservador.
- Holm-Bonferroni (``"holm"``): controla el FWER como Bonferroni pero es un
procedimiento step-down uniformemente mas potente; rechaza al menos tantas
hipotesis como Bonferroni simple, nunca menos.
No usa dependencias externas: aritmetica de la libreria estandar.
"""
@@ -35,8 +38,9 @@ def _is_valid_p(v) -> bool:
def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> dict:
"""Corrige una lista de p-valores por comparaciones multiples.
Aplica Benjamini-Hochberg (FDR) o Bonferroni (FWER) sobre ``pvalues`` y
devuelve, alineado posicion a posicion con la entrada, el p-valor ajustado y
Aplica Benjamini-Hochberg (FDR), Bonferroni (FWER) o Holm-Bonferroni
(FWER, step-down) sobre ``pvalues`` y devuelve, alineado posicion a
posicion con la entrada, el p-valor ajustado y
si cada hipotesis se rechaza al nivel ``alpha`` tras la correccion. Las
posiciones cuyo valor no sea un p-valor valido (``None``, ``NaN``, fuera de
``[0, 1]`` o no numerico) se conservan en la salida como ``None`` /
@@ -53,8 +57,10 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
otros valores no validos en posiciones sin test disponible; se
propagan como ``None`` en la salida y no cuentan como prueba.
alpha: nivel de significancia objetivo tras la correccion (default 0.05).
Para BH es el umbral del FDR; para Bonferroni, del FWER.
method: ``"bh"`` (Benjamini-Hochberg, FDR) o ``"bonferroni"`` (FWER).
Para BH es el umbral del FDR; para Bonferroni y Holm, del FWER.
method: ``"bh"`` (Benjamini-Hochberg, FDR), ``"bonferroni"`` (FWER) o
``"holm"`` (Holm-Bonferroni, FWER step-down, mas potente que
Bonferroni simple).
Returns:
dict con las claves:
@@ -68,7 +74,7 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
n_tests: numero de p-valores validos usados en la correccion (m).
n_rejected: numero de hipotesis rechazadas (significativas).
alpha: nivel de significancia aplicado (float).
method: metodo aplicado (``"bh"`` o ``"bonferroni"``).
method: metodo aplicado (``"bh"``, ``"bonferroni"`` o ``"holm"``).
Casos degenerados (lista vacia, sin p-valores validos o metodo
desconocido) anaden ademas una clave ``note`` y devuelven listas
@@ -76,7 +82,7 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
en las posiciones invalidas).
"""
method_norm = (method or "").strip().lower()
if method_norm not in {"bh", "bonferroni"}:
if method_norm not in {"bh", "bonferroni", "holm"}:
n = len(pvalues)
return {
"p_values_adjusted": [None] * n,
@@ -86,8 +92,8 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
"alpha": float(alpha),
"method": method,
"note": (
f"metodo desconocido '{method}'; usa 'bh' (Benjamini-Hochberg) "
"o 'bonferroni'"
f"metodo desconocido '{method}'; usa 'bh' (Benjamini-Hochberg), "
"'bonferroni' o 'holm' (Holm-Bonferroni)"
),
}
@@ -129,6 +135,20 @@ def fdr_correction(pvalues: list, alpha: float = 0.05, method: str = "bh") -> di
padj = min(1.0, p * m)
adjusted[orig_idx] = padj
reject[orig_idx] = padj <= a
elif method_norm == "holm":
# Holm-Bonferroni (step-down). Ordena p ascendente; para el rank k
# (1-indexed) el p ajustado crudo es (m - k + 1) * p_(k). Impon
# monotonicidad acumulada (no decreciente) recorriendo de menor a mayor:
# padj_(k) = max(padj_(k-1), min(1, (m-k+1)*p_(k))), con padj_(0)=0.
order = sorted(valid, key=lambda t: t[1]) # [(orig_idx, p), ...] por p asc
prev = 0.0
for k in range(1, m + 1):
orig_idx, p = order[k - 1]
raw = min(1.0, (m - k + 1) * p)
padj = max(prev, raw)
prev = padj
adjusted[orig_idx] = padj
reject[orig_idx] = padj <= a
else:
# Benjamini-Hochberg (step-up). Ordena p ascendente y calcula q-valores
# con la monotonicidad acumulada de derecha a izquierda.
@@ -82,7 +82,8 @@ def test_solo_none_devuelve_note():
def test_metodo_desconocido_devuelve_note():
out = fdr_correction([0.01, 0.02], method="holm")
# 'holm' ya es un metodo valido (v1.1.0); usamos uno realmente desconocido.
out = fdr_correction([0.01, 0.02], method="sidak")
assert "note" in out
assert out["n_rejected"] == 0
assert out["reject"] == [False, False]
@@ -97,3 +98,66 @@ def test_todos_significativos():
assert bon["n_rejected"] == 3
assert all(bh["reject"])
assert all(bon["reject"])
def test_holm_golden_rechaza_dos_de_cuatro():
# Holm-Bonferroni (step-down) sobre [0.01, 0.04, 0.03, 0.005], m=4, alpha=0.05.
# Ordenado ascendente: 0.005, 0.01, 0.03, 0.04.
# padj_(1) = 4*0.005 = 0.02
# padj_(2) = max(0.02, 3*0.01=0.03) = 0.03
# padj_(3) = max(0.03, 2*0.03=0.06) = 0.06
# padj_(4) = max(0.06, 1*0.04=0.04) = 0.06
# Mapeado al orden de entrada [0.01, 0.04, 0.03, 0.005]:
# 0.01 -> 0.03, 0.04 -> 0.06, 0.03 -> 0.06, 0.005 -> 0.02
out = fdr_correction([0.01, 0.04, 0.03, 0.005], alpha=0.05, method="holm")
assert out["method"] == "holm"
assert out["n_tests"] == 4
adj = out["p_values_adjusted"]
assert abs(adj[0] - 0.03) < 1e-9
assert abs(adj[1] - 0.06) < 1e-9
assert abs(adj[2] - 0.06) < 1e-9
assert abs(adj[3] - 0.02) < 1e-9
assert out["reject"] == [True, False, False, True]
assert out["n_rejected"] == 2
def test_holm_entre_bonferroni_y_bh():
# Holm controla FWER como Bonferroni pero es step-down: rechaza AL MENOS
# tanto como Bonferroni simple, y a lo sumo tanto como BH (FDR, menos
# conservador). Cadena de potencia: bonferroni <= holm <= bh.
pvalues = [0.01, 0.02, 0.04, 0.005]
bon = fdr_correction(pvalues, alpha=0.05, method="bonferroni")
holm = fdr_correction(pvalues, alpha=0.05, method="holm")
bh = fdr_correction(pvalues, alpha=0.05, method="bh")
assert holm["n_rejected"] >= bon["n_rejected"]
assert holm["n_rejected"] <= bh["n_rejected"]
# En este set Holm gana potencia frente a Bonferroni simple (estricto).
assert holm["n_rejected"] > bon["n_rejected"]
# Un set donde Holm es estrictamente mas conservador que BH.
pvals2 = [0.01, 0.02, 0.03, 0.04]
bon2 = fdr_correction(pvals2, alpha=0.05, method="bonferroni")
holm2 = fdr_correction(pvals2, alpha=0.05, method="holm")
bh2 = fdr_correction(pvals2, alpha=0.05, method="bh")
assert holm2["n_rejected"] >= bon2["n_rejected"]
assert holm2["n_rejected"] < bh2["n_rejected"]
def test_none_se_propaga_alineado_holm():
# None se propaga alineado tambien con holm: la posicion central no cuenta
# como prueba (m=2) y se devuelve como None / False.
out = fdr_correction([0.001, None, 0.9], method="holm")
assert out["n_tests"] == 2
assert out["p_values_adjusted"][1] is None
assert out["reject"][1] is False
assert out["reject"][0] is True
assert len(out["reject"]) == 3
def test_lista_vacia_holm_devuelve_note():
out = fdr_correction([], method="holm")
assert out["p_values_adjusted"] == []
assert out["reject"] == []
assert out["n_tests"] == 0
assert out["n_rejected"] == 0
assert "note" in out
@@ -0,0 +1,100 @@
---
name: preregister_hypothesis
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def preregister_hypothesis(paper_dir: str, hypotheses: dict, analysis_plan: dict) -> dict"
description: "Pre-registra (congela) la hipotesis y el plan de analisis de un paper ANTES de mirar los datos: antidoto al HARKing (Hypothesizing After the Results are Known). Escribe/actualiza <paper_dir>/preregistration.md con un frontmatter (paper_slug, frozen_at, content_hash, status) y un cuerpo markdown DETERMINISTA derivado de (hypotheses, analysis_plan) (mismo input -> mismo cuerpo byte a byte, claves ordenadas alfabeticamente). El content_hash es sha256 del cuerpo NORMALIZADO (strip por linea + colapso de blancos), nunca del frontmatter. Una vez status=frozen es INMUTABLE: re-congelar con el mismo contenido es idempotente (no reescribe, devuelve unchanged) y re-congelar con contenido distinto se RECHAZA (no sobrescribe, devuelve error) para que no se pueda ajustar la hipotesis a los resultados. Estilo dict-no-throw: nunca lanza."
tags: [papers, preregistration, reproducibility, anti-harking, python]
params:
- name: paper_dir
desc: "ruta del directorio del paper, p.ej. 'papers/0001-mi-paper'. Debe existir (no se crea aqui). El paper_slug del frontmatter es el basename del dir. Si no existe o no es str -> {status:error, path, note} sin crash ni creacion."
- name: hypotheses
desc: "dict de hipotesis, p.ej. {'h0': 'no hay diferencia ...', 'h1': 'el grupo A > grupo B ...'}. Se renderiza en la seccion '## Hypotheses' con una linea por clave, ordenadas alfabeticamente para determinismo."
- name: analysis_plan
desc: "dict con el plan de analisis, p.ej. {'test': 'welch_t_test', 'effect_size_metric': 'cohens_d', 'decision_rule': 'rechazar H0 si p<0.05 tras Holm y |d|>=0.5', 'planned_n': 100, 'multiple_correction': 'holm'}. Se renderiza en '## Analysis plan' con una linea por clave (ordenadas alfabeticamente). Acepta valores no-str (int, etc.)."
output: "dict dict-no-throw (NUNCA lanza). status='frozen' cuando escribe el archivo por primera vez o congela un draft previo ({status, path, content_hash, frozen_at}). status='unchanged' cuando ya estaba frozen con el mismo content_hash: no reescribe y preserva el archivo byte-identico incl. el frozen_at original ({status, path, content_hash, frozen_at}). status='error' cuando paper_dir no existe, ya esta frozen con un hash distinto (rechazo anti-HARKing, no sobrescribe), inputs invalidos o error de I/O ({status, path, note, [content_hash]})."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [hashlib]
tested: true
tests: ["test_golden_congela_y_escribe_archivo", "test_idempotente_mismo_input_no_reescribe", "test_inmutabilidad_anti_harking_rechaza_contenido_distinto", "test_error_paper_dir_inexistente_no_crash_no_crea"]
test_file_path: "python/functions/datascience/preregister_hypothesis_test.py"
file_path: "python/functions/datascience/preregister_hypothesis.py"
---
## Ejemplo
```python
import os, tempfile
from datascience import preregister_hypothesis
# Un directorio de paper que ya existe.
paper_dir = tempfile.mkdtemp(prefix="0001-")
hypotheses = {
"h0": "no hay diferencia entre el grupo A y el grupo B",
"h1": "el grupo A tiene mayor conversion que el grupo B",
}
analysis_plan = {
"test": "welch_t_test",
"effect_size_metric": "cohens_d",
"decision_rule": "rechazar H0 si p<0.05 tras Holm y |d|>=0.5",
"planned_n": 100,
"multiple_correction": "holm",
}
# 1) Primera vez: congela y escribe <paper_dir>/preregistration.md
r1 = preregister_hypothesis(paper_dir, hypotheses, analysis_plan)
print(r1["status"]) # -> "frozen"
print(r1["content_hash"]) # sha256 del cuerpo
# 2) Mismo input: idempotente, no reescribe.
r2 = preregister_hypothesis(paper_dir, hypotheses, analysis_plan)
print(r2["status"]) # -> "unchanged"
# 3) Cambiar la hipotesis tras congelar (HARKing): rechazado, archivo intacto.
r3 = preregister_hypothesis(paper_dir, {"h0": "...", "h1": "otra cosa"}, analysis_plan)
print(r3["status"]) # -> "error"
```
## Cuando usarla
Llamala al ARRANCAR el analisis de un paper, antes de tocar los datos, para
dejar por escrito (y firmado por hash) que vas a probar y como vas a decidir.
Es el primer paso de un flujo reproducible: pre-registras la hipotesis y el plan
(`test`, `effect_size_metric`, `decision_rule`, `planned_n`,
`multiple_correction`), y solo despues corres el analisis y comparas con lo
pre-registrado. Si mas tarde el analisis "descubre" otra hipotesis que encaja
mejor con los datos, el pre-registro congelado deja en evidencia el cambio: no se
puede reescribir. Combinala con `effect_size_cohens_d` y `fdr_correction` para
cerrar el plan declarado (effect size + correccion de multiples comparaciones).
## Gotchas
- **Inmutabilidad (el corazon)**: una vez `status: frozen`, el pre-registro NO se
puede editar. Re-congelar con el MISMO contenido es idempotente (`unchanged`,
no reescribe, preserva incluso el `frozen_at` original). Re-congelar con
contenido DISTINTO devuelve `error` y deja el archivo intacto: asi se mata el
HARKing. Para cambiar de verdad la hipotesis hay que borrar el archivo a mano y
asumir explicitamente que ya no es un pre-registro valido.
- **dict-no-throw**: la funcion NUNCA lanza. Cualquier error previsible
(directorio inexistente, inputs no-dict, fallo de I/O, excepcion inesperada) se
captura y se devuelve como `{"status": "error", "note": ...}`. Siempre incluye
`path` (la ruta esperada del `preregistration.md`).
- **El hash es SOLO del cuerpo, nunca del frontmatter**: el frontmatter contiene
el propio `content_hash` y el `frozen_at` (timestamp), asi que incluirlos en el
hash seria circular y romperia la idempotencia. El cuerpo se normaliza antes de
hashear (strip por linea + colapso de lineas en blanco + strip final): cambios
irrelevantes de whitespace no alteran el hash, pero cambios de contenido SI.
- **Determinismo**: el cuerpo se genera con las claves de `hypotheses` y
`analysis_plan` ordenadas alfabeticamente, de modo que el orden de insercion del
dict no afecta al hash. Mismo `(hypotheses, analysis_plan)` -> mismo cuerpo y
mismo hash, byte a byte.
- **No crea el directorio del paper**: si `paper_dir` no existe, devuelve `error`
sin crear nada (ni el dir ni el archivo).
@@ -0,0 +1,202 @@
"""Congela (pre-registra) la hipotesis y el plan de analisis de un paper.
Anti-HARKing (Hypothesizing After the Results are Known): el pre-registro fija
la hipotesis y el plan de analisis ANTES de mirar los datos. Una vez congelado
(``status: frozen``) es INMUTABLE: cualquier intento posterior de re-congelar con
un contenido distinto se RECHAZA en vez de sobrescribir, de modo que no se puede
"ajustar" la hipotesis a los resultados despues de verlos.
Escribe/actualiza ``<paper_dir>/preregistration.md`` con un frontmatter
(``paper_slug``, ``frozen_at``, ``content_hash``, ``status``) y un cuerpo
markdown DETERMINISTA derivado de ``(hypotheses, analysis_plan)``.
Estilo dict-no-throw: NUNCA lanza; cualquier error previsible se captura y se
devuelve como ``{"status": "error", "note": ...}``.
"""
import hashlib
import os
from datetime import datetime, timezone
def _build_body(hypotheses: dict, analysis_plan: dict) -> str:
"""Construye el cuerpo markdown del pre-registro de forma DETERMINISTA.
Mismo ``(hypotheses, analysis_plan)`` -> mismo cuerpo byte a byte. Las claves
se ordenan alfabeticamente para no depender del orden de insercion del dict.
"""
lines = ["## Hypotheses", ""]
for k in sorted(hypotheses.keys()):
lines.append(f"- **{k}**: {hypotheses[k]}")
lines.append("")
lines.append("## Analysis plan")
lines.append("")
for k in sorted(analysis_plan.keys()):
lines.append(f"- **{k}**: {analysis_plan[k]}")
return "\n".join(lines)
def _normalize(body: str) -> str:
"""Normaliza el cuerpo para el hash: strip por linea + colapsa blancos.
Cambios irrelevantes de whitespace (espacios al final, dobles lineas en
blanco) no alteran el hash; cambios de contenido SI. Esto hace el hash
robusto sin perder la capacidad de detectar ediciones reales.
"""
out = []
prev_blank = False
for raw in body.splitlines():
line = raw.strip()
if line == "":
if prev_blank:
continue
prev_blank = True
else:
prev_blank = False
out.append(line)
return "\n".join(out).strip()
def _content_hash(body: str) -> str:
"""sha256 hex del cuerpo NORMALIZADO (nunca del frontmatter)."""
return hashlib.sha256(_normalize(body).encode("utf-8")).hexdigest()
def _parse_frontmatter(text: str) -> dict:
"""Parsea el frontmatter ``--- ... ---`` simple (key: value) de un .md."""
if not text.startswith("---"):
return {}
parts = text.split("---", 2)
if len(parts) < 3:
return {}
fm = {}
for line in parts[1].splitlines():
line = line.strip()
if not line or ":" not in line:
continue
key, _, value = line.partition(":")
fm[key.strip()] = value.strip()
return fm
def _render_file(slug: str, frozen_at: str, content_hash: str, body: str) -> str:
"""Compone el archivo completo: frontmatter frozen + cuerpo."""
return (
"---\n"
f"paper_slug: {slug}\n"
f"frozen_at: {frozen_at}\n"
f"content_hash: {content_hash}\n"
"status: frozen\n"
"---\n"
"\n"
f"{body}\n"
)
def preregister_hypothesis(paper_dir: str, hypotheses: dict, analysis_plan: dict) -> dict:
"""Congela la hipotesis y el plan de analisis de un paper (anti-HARKing).
Escribe ``<paper_dir>/preregistration.md`` con frontmatter ``status: frozen``
y un cuerpo markdown determinista. Una vez congelado es inmutable.
Args:
paper_dir: ruta del directorio del paper (p.ej. ``"papers/0001-mi-paper"``).
El ``paper_slug`` es el basename del directorio. Debe existir.
hypotheses: dict de hipotesis, p.ej.
``{"h0": "no hay diferencia ...", "h1": "grupo A > grupo B ..."}``.
analysis_plan: dict con el plan, p.ej.
``{"test": "welch_t_test", "effect_size_metric": "cohens_d",
"decision_rule": "...", "planned_n": 100, "multiple_correction": "holm"}``.
Returns:
dict dict-no-throw (NUNCA lanza). Claves segun el caso:
- frozen: {"status": "frozen", "path", "content_hash", "frozen_at"}
- unchanged: {"status": "unchanged", "path", "content_hash", "frozen_at"}
- error: {"status": "error", "path", "note", ...}
"""
expected_path = os.path.join(paper_dir, "preregistration.md")
try:
# 1) El directorio del paper debe existir; no se crea aqui.
if not isinstance(paper_dir, str) or not os.path.isdir(paper_dir):
return {
"status": "error",
"path": expected_path,
"note": f"paper_dir no existe: {paper_dir}",
}
if not isinstance(hypotheses, dict) or not isinstance(analysis_plan, dict):
return {
"status": "error",
"path": expected_path,
"note": "hypotheses y analysis_plan deben ser dict",
}
slug = os.path.basename(os.path.normpath(paper_dir))
# 2) + 3) Cuerpo determinista y su hash (solo del cuerpo, no del frontmatter).
body = _build_body(hypotheses, analysis_plan)
new_hash = _content_hash(body)
# 5) Logica de escritura.
if os.path.exists(expected_path):
existing = ""
try:
with open(expected_path, "r", encoding="utf-8") as fh:
existing = fh.read()
except OSError as exc:
return {
"status": "error",
"path": expected_path,
"note": f"no se pudo leer el pre-registro existente: {exc}",
}
fm = _parse_frontmatter(existing)
old_status = fm.get("status", "")
old_hash = fm.get("content_hash", "")
old_frozen_at = fm.get("frozen_at", "")
if old_status == "frozen":
if old_hash == new_hash:
# Idempotente: mismo contenido ya congelado. No se reescribe.
return {
"status": "unchanged",
"path": expected_path,
"content_hash": new_hash,
"frozen_at": old_frozen_at,
}
# Inmutabilidad: ya congelado con OTRO hash -> se rechaza (anti-HARKing).
return {
"status": "error",
"path": expected_path,
"content_hash": new_hash,
"note": (
"pre-registro inmutable: ya esta congelado (frozen) con un "
"hash distinto; un pre-registro no se puede editar tras "
"congelarse"
),
}
# status != "frozen" (p.ej. draft) -> se congela ahora.
# Archivo nuevo o draft existente: congelar con timestamp actual.
frozen_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
file_text = _render_file(slug, frozen_at, new_hash, body)
try:
with open(expected_path, "w", encoding="utf-8") as fh:
fh.write(file_text)
except OSError as exc:
return {
"status": "error",
"path": expected_path,
"note": f"no se pudo escribir el pre-registro: {exc}",
}
return {
"status": "frozen",
"path": expected_path,
"content_hash": new_hash,
"frozen_at": frozen_at,
}
except Exception as exc: # noqa: BLE001 - dict-no-throw: nunca propagar.
return {
"status": "error",
"path": expected_path,
"note": f"error inesperado: {exc}",
}
@@ -0,0 +1,99 @@
"""Tests para preregister_hypothesis (pre-registro inmutable, anti-HARKing).
Importa el modulo hoja directamente (`preregister_hypothesis`) para no depender
de que el paquete reexporte la funcion en su __init__ (lo integra el orquestador
al cerrar el grupo papers). El pytest del repo resuelve el modulo hoja por su
nombre directo.
Todos los tests son hermeticos y deterministas: usan el fixture `tmp_path` de
pytest; NUNCA escriben en `papers/`.
"""
from preregister_hypothesis import preregister_hypothesis
def _parse_frontmatter(text: str) -> dict:
parts = text.split("---", 2)
fm = {}
for line in parts[1].splitlines():
line = line.strip()
if not line or ":" not in line:
continue
key, _, value = line.partition(":")
fm[key.strip()] = value.strip()
return fm
HYP = {"h0": "no hay diferencia entre A y B", "h1": "el grupo A > grupo B"}
PLAN = {
"test": "welch_t_test",
"effect_size_metric": "cohens_d",
"decision_rule": "rechazar H0 si p<0.05 tras Holm y |d|>=0.5",
"planned_n": 100,
"multiple_correction": "holm",
}
def test_golden_congela_y_escribe_archivo(tmp_path):
paper = tmp_path / "0001-x"
paper.mkdir()
res = preregister_hypothesis(str(paper), HYP, PLAN)
assert res["status"] == "frozen"
pre = paper / "preregistration.md"
assert pre.exists()
text = pre.read_text(encoding="utf-8")
fm = _parse_frontmatter(text)
assert fm["status"] == "frozen"
assert fm["paper_slug"] == "0001-x"
assert fm["content_hash"] # no vacio
assert fm["frozen_at"] # no vacio
assert res["content_hash"] == fm["content_hash"]
assert res["frozen_at"] == fm["frozen_at"]
def test_idempotente_mismo_input_no_reescribe(tmp_path):
paper = tmp_path / "0001-x"
paper.mkdir()
pre = paper / "preregistration.md"
first = preregister_hypothesis(str(paper), HYP, PLAN)
assert first["status"] == "frozen"
bytes_before = pre.read_bytes()
second = preregister_hypothesis(str(paper), HYP, PLAN)
assert second["status"] == "unchanged"
# Mismo hash y frozen_at original preservado.
assert second["content_hash"] == first["content_hash"]
assert second["frozen_at"] == first["frozen_at"]
# El archivo NO cambio byte a byte (incl. frozen_at).
assert pre.read_bytes() == bytes_before
def test_inmutabilidad_anti_harking_rechaza_contenido_distinto(tmp_path):
paper = tmp_path / "0001-x"
paper.mkdir()
pre = paper / "preregistration.md"
preregister_hypothesis(str(paper), HYP, PLAN)
bytes_frozen = pre.read_bytes()
# Intento de re-congelar con una hipotesis DISTINTA (HARKing) -> rechazado.
hyp_tramposo = {"h0": "no hay diferencia", "h1": "el grupo B > grupo A (cambiado tras ver datos)"}
res = preregister_hypothesis(str(paper), hyp_tramposo, PLAN)
assert res["status"] == "error"
# Asercion mas importante: el archivo en disco SIGUE siendo el original.
assert pre.read_bytes() == bytes_frozen
def test_error_paper_dir_inexistente_no_crash_no_crea(tmp_path):
missing = tmp_path / "no-existe"
res = preregister_hypothesis(str(missing), HYP, PLAN)
assert res["status"] == "error"
# No se creo el directorio ni el archivo.
assert not missing.exists()
assert not (missing / "preregistration.md").exists()
@@ -1,122 +0,0 @@
---
id: relationship_scatter_figure_py_datascience
name: relationship_scatter_figure
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def relationship_scatter_figure(xs: list, ys: list, x_label: str = \"\", y_label: str = \"\", classification: dict = None, max_points: int = 2000) -> \"matplotlib.figure.Figure\""
description: "Construye una figura matplotlib scatter de un par de variables numéricas con su curva/recta de ajuste y una anotación del tipo de relación (lineal, polinómica grado 2/3, monótona no-lineal, etc.) más sus métricas (r, ρ, R²lin, R²poly). Consume el dict de classify_relationship_type; si es None lo calcula internamente reusando esa función. Devuelve un matplotlib.figure.Figure listo para rasterizar por el renderer del informe EDA (PDF/PPTX). Backend Agg sin pyplot global; downsample determinista de los puntos dibujados; defensivo ante vacío/None."
tags: [eda, correlation, scatter, relationship, matplotlib, figure, visualization, datascience, impure]
uses_functions: [classify_relationship_type_py_datascience]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [matplotlib, numpy]
example: |
from relationship_scatter_figure import relationship_scatter_figure
xs = [float(i) for i in range(100)]
ys = [0.5 * x * x - x + 3 for x in xs]
classification = {
"tipo": "polinómica (grado 2)", "pearson": 0.97, "spearman": 0.99,
"r2_linear": 0.92, "r2_poly2": 0.999, "r2_poly3": 0.999,
"best_degree": 2, "coeffs": [0.5, -1.0, 3.0],
}
fig = relationship_scatter_figure(xs, ys, x_label="dosis", y_label="efecto", classification=classification)
tested: true
tests:
- "test_returns_figure"
- "test_downsample_determinista"
- "test_empty_no_lanza"
- "test_classification_none"
test_file_path: "python/functions/datascience/relationship_scatter_figure_test.py"
file_path: "python/functions/datascience/relationship_scatter_figure.py"
params:
- name: xs
desc: "Lista (o tupla) de valores x. Se emparejan por índice con ys. Valores None, bool, NaN o inf descartan ese par (lectura defensiva)."
- name: ys
desc: "Lista (o tupla) de valores y, paralela a xs. Mismas reglas defensivas que xs."
- name: x_label
desc: "Etiqueta del eje/título para la variable x. Default \"\" (en el título cae a \"x\")."
- name: y_label
desc: "Etiqueta del eje/título para la variable y. Default \"\" (en el título cae a \"y\")."
- name: classification
desc: "Opcional. Dict de classify_relationship_type con claves tipo, pearson, r2_linear, spearman, r2_poly2, r2_poly3, best_degree, coeffs. Si es None se calcula internamente importando y llamando a classify_relationship_type sobre los pares limpios (self-contained). Si el módulo hermano no está disponible, se dibuja el scatter sin curva de ajuste ni anotación. Default None."
- name: max_points
desc: "Tope del nº de puntos DIBUJADOS. Si los pares limpios superan el tope, la nube se submuestrea por paso fijo ceil(n/max_points) tomando pairs[::step] — DETERMINISTA, no aleatorio, reproducible. La clasificación/ajuste usa SIEMPRE todos los pares limpios; el downsample solo adelgaza el dibujo. Valor no-positivo o no-int desactiva el downsample. Default 2000."
output: "Un matplotlib.figure.Figure (figsize 6.4x4.0, dpi 150) con un Axes scatter (puntos semitransparentes alpha 0.5, color #4C72B0), la curva/recta de ajuste (numpy.polyval sobre coeffs, color #C44E52) cuando hay un ajuste polinómico disponible, título \"{x_label} ↔ {y_label}\", labels de ejes y una caja de anotación en la esquina superior izquierda con el tipo de relación y las métricas disponibles (r, ρ, R²lin, R²poly; se omiten las None). Si tras la limpieza hay menos de 2 pares válidos, devuelve igualmente una Figure con un texto centrado \"Sin datos suficientes para el scatter\" (nunca lanza). El caller rasteriza/cierra la figura; la función no la muestra ni la guarda."
---
## Ejemplo
```python
from relationship_scatter_figure import relationship_scatter_figure
# Par numérico con relación cuadrática y su clasificación (de
# classify_relationship_type). Pasándola explícita evitas recomputarla.
xs = [float(i) for i in range(100)]
ys = [0.5 * x * x - x + 3 for x in xs]
classification = {
"tipo": "polinómica (grado 2)",
"pearson": 0.97,
"spearman": 0.99,
"r2_linear": 0.92,
"r2_poly2": 0.999,
"r2_poly3": 0.999,
"best_degree": 2,
"coeffs": [0.5, -1.0, 3.0],
}
fig = relationship_scatter_figure(
xs, ys, x_label="dosis", y_label="efecto", classification=classification
)
# El renderer del informe lo rasteriza; aquí solo persistimos para inspección.
fig.savefig("/tmp/scatter_dosis_efecto.png")
# Con classification=None la función la calcula internamente (self-contained):
fig2 = relationship_scatter_figure(xs, ys, x_label="dosis", y_label="efecto")
```
## Cuando usarla
Úsala dentro del informe EDA automático cuando quieras visualizar de un vistazo
la relación entre dos variables numéricas: la nube de puntos, la curva que mejor
la ajusta y una etiqueta legible del tipo de relación con sus métricas. Es la
pareja "vista humana" de `classify_relationship_type`: esa función decide el
tipo y los coeficientes; esta los pinta en una `Figure` que el renderer del
informe rasteriza a PDF/PPTX. Pásale el dict de clasificación si ya lo tienes
calculado (evitas recomputar el ajuste); si no, déjalo en `None` y la función lo
resuelve sola sobre los pares limpios. Pensada para móvil: anotación pequeña
(fontsize 8) y nube adelgazada por `max_points` para que el PDF no pese.
## Gotchas
- **Impura por matplotlib.** Toca la maquinaria de render. Usa el backend `Agg`
y la API orientada a objetos `Figure`/`add_subplot` — NUNCA `pyplot.*` aquí,
para no tocar el estado global ni filtrar figuras entre llamadas. `pyplot` NO
es thread-safe; esta función lo evita construyendo el `Figure` directamente,
así que es segura de llamar en bucle desde el renderer.
- **El caller cierra la figura.** Devuelve el `Figure` pero no lo muestra ni lo
guarda. Quien la consume debe rasterizarla y luego liberarla
(`matplotlib.pyplot.close(fig)`) para no acumular memoria en lotes grandes de
pares de columnas.
- **Downsample determinista, solo del dibujo.** Cuando los pares limpios superan
`max_points`, la nube DIBUJADA se adelgaza por paso fijo `pairs[::step]`
(reproducible, no aleatorio). La clasificación y el ajuste usan SIEMPRE todos
los pares limpios; el downsample no altera las métricas ni la curva.
- **`classification=None` ⇒ se calcula sola.** Importa y llama a
`classify_relationship_type` sobre los pares limpios. Si ese módulo hermano no
está disponible (entorno incompleto), NO lanza: dibuja el scatter sin curva de
ajuste ni anotación. Pasar la clasificación explícita es más barato (no
recomputa el ajuste).
- **Sin curva para `monótona no-lineal`.** Cuando `coeffs` es `None` o
`best_degree` es `None` (p.ej. tipo "monótona no-lineal"), no se pinta recta
polinómica — solo la nube y la anotación. Tampoco se dibuja la curva si el
rango de x es nulo (todos los x iguales). Nunca falla por esto.
- **Defensiva, nunca lanza.** `xs=[]`, `ys=[]`, menos de 2 pares válidos, ends
`None`/`bool`/`NaN`/`inf` o `coeffs` malformado se manejan sin error: en el
peor caso devuelve una `Figure` con "Sin datos suficientes para el scatter".
No envuelvas la llamada en try/except por miedo a un raise — no lo hay.
@@ -1,322 +0,0 @@
"""Impure EDA helper: scatter figure of a numeric pair with its fit (`eda` group).
Builds a matplotlib scatter of two numeric variables, overlays the fitted
curve/line implied by the relationship classification (linear, polynomial of
degree 2/3, etc.) and annotates the relationship type with its available
metrics. Returns a ready-to-rasterize ``matplotlib.figure.Figure``; it never
shows nor saves it.
Impure because it touches matplotlib's rendering machinery. It uses the headless
Agg backend and the object-oriented ``Figure`` API (no ``pyplot``) so it leaks no
global state and is safe to call repeatedly from a report renderer.
To keep the rendered PDF/PPTX light on phones, when the number of valid pairs
exceeds ``max_points`` the *plotted* points are down-sampled DETERMINISTICALLY by
a fixed step (``pairs[::step]``), never randomly, so the output is reproducible.
The classification/fit always uses every clean pair; the down-sample only thins
the drawn cloud.
"""
import math
import matplotlib
matplotlib.use("Agg")
import numpy as np # noqa: E402
from matplotlib.figure import Figure # noqa: E402
# Sober blue for the scatter cloud and red for the fitted curve (Tufte: the
# data points are the primary ink, the fit is the secondary highlight).
_POINT_COLOR = "#4C72B0"
_FIT_COLOR = "#C44E52"
# Muted gray for the no-data fallback message.
_MUTED_TEXT = "#5f6b7a"
def _finite(value):
"""Coerce ``value`` to a finite float, or return None when not usable.
bool is a subclass of int, but a real numeric measurement is never a bool,
so True/False are treated as missing instead of coercing to 1.0/0.0. NaN and
+/-infinity are never valid either.
"""
if value is None or isinstance(value, bool):
return None
try:
f = float(value)
except (TypeError, ValueError):
return None
if math.isnan(f) or math.isinf(f):
return None
return f
def _clean_pairs(xs, ys):
"""Pair ``xs[i], ys[i]`` by index, dropping any pair with a non-finite end."""
pairs = []
if isinstance(xs, (list, tuple)) and isinstance(ys, (list, tuple)):
n = min(len(xs), len(ys))
for i in range(n):
x = _finite(xs[i])
y = _finite(ys[i])
if x is None or y is None:
continue
pairs.append((x, y))
return pairs
def _ordered_trend(xs_clean, ys_clean, n_bins: int = 12):
"""Return (x_trend, y_trend): the ordered trend of y over x for a monotonic
relationship that has no polynomial fit.
When x has few distinct values (an ordinal/discrete scale) the trend is the
mean of y per distinct x value. Otherwise x is split into ``n_bins`` ordered
quantile bins and each point is (mean x, mean y) of the bin. Returns
``(None, None)`` when there is nothing meaningful to draw.
"""
x_arr = np.asarray(xs_clean, dtype=float)
y_arr = np.asarray(ys_clean, dtype=float)
if x_arr.size < 2:
return None, None
uniq = np.unique(x_arr)
if uniq.size <= max(2, n_bins):
# Discrete x: one trend point per distinct value (mean y).
xt = uniq
yt = np.array([float(np.mean(y_arr[x_arr == ux])) for ux in uniq])
return xt, yt
# Continuous x: ordered quantile bins, (mean x, mean y) per bin.
order = np.argsort(x_arr, kind="stable")
x_sorted = x_arr[order]
y_sorted = y_arr[order]
chunks_x = np.array_split(x_sorted, n_bins)
chunks_y = np.array_split(y_sorted, n_bins)
xt = np.array([float(np.mean(cx)) for cx in chunks_x if cx.size])
yt = np.array([float(np.mean(cy)) for cy in chunks_y if cy.size])
return xt, yt
def _no_data_figure(message: str) -> "matplotlib.figure.Figure":
"""A bare Figure carrying a centered muted message (defensive fallback)."""
fig = Figure(figsize=(6.4, 4.0), dpi=150)
ax = fig.add_subplot(111)
ax.axis("off")
ax.text(
0.5,
0.5,
message,
ha="center",
va="center",
fontsize=12,
color=_MUTED_TEXT,
transform=ax.transAxes,
)
fig.tight_layout()
return fig
def _metrics_caption(classification: dict) -> str:
"""Format the available metrics of a classification dict into one line.
Omits the metrics that are None. Keys consumed (any may be absent/None):
``pearson`` (r), ``spearman`` (rho), ``r2_linear`` (R²lin) and the best
polynomial R² (``r2_poly3`` if a cubic was the best fit, else ``r2_poly2``).
"""
parts = []
r = _finite(classification.get("pearson"))
if r is not None:
parts.append(f"r={r:.2f}")
rho = _finite(classification.get("spearman"))
if rho is not None:
parts.append(f"ρ={rho:.2f}")
r2_lin = _finite(classification.get("r2_linear"))
if r2_lin is not None:
parts.append(f"R²lin={r2_lin:.2f}")
# Prefer the R² of the best polynomial degree when it is a poly fit.
best_degree = classification.get("best_degree")
r2_poly = None
if best_degree == 3:
r2_poly = _finite(classification.get("r2_poly3"))
elif best_degree == 2:
r2_poly = _finite(classification.get("r2_poly2"))
if r2_poly is None:
# Fall back to whichever poly R² is present (cubic first).
r2_poly = _finite(classification.get("r2_poly3"))
if r2_poly is None:
r2_poly = _finite(classification.get("r2_poly2"))
if r2_poly is not None:
parts.append(f"R²poly={r2_poly:.2f}")
return " ".join(parts)
def relationship_scatter_figure(
xs: list,
ys: list,
x_label: str = "",
y_label: str = "",
classification: dict = None,
max_points: int = 2000,
) -> "matplotlib.figure.Figure":
"""Build a scatter figure of a numeric pair with its fit and a type label.
Cleans the pairs defensively (drops any pair with a None/bool/NaN/inf end),
plots a semi-transparent scatter cloud (down-sampled deterministically when
it exceeds ``max_points``), overlays the polynomial fit implied by
``classification`` and annotates the relationship type plus its available
metrics in a corner box.
The fit and classification always use every clean pair; only the drawn cloud
is thinned by the down-sample. When ``classification`` is None it is computed
internally by reusing ``classify_relationship_type`` over the clean pairs, so
the function is self-contained.
The function is fully defensive: empty input, fewer than 2 clean pairs, a
missing/None ``coeffs`` or a missing sibling classifier never raise. When
there is nothing valid to draw it still returns a ``Figure`` carrying a
centered "Sin datos suficientes para el scatter" message.
Args:
xs: List (or tuple) of x values. Paired by index with ``ys``. Values that
are None, bool, NaN or infinite discard that pair. Read defensively.
ys: List (or tuple) of y values, parallel to ``xs``. Same defensive rules.
x_label: Axis/title label for the x variable. Default "" (falls back to
"x" in the title).
y_label: Axis/title label for the y variable. Default "" (falls back to
"y" in the title).
classification: Optional dict from ``classify_relationship_type`` with
keys ``tipo, pearson, r2_linear, spearman, r2_poly2, r2_poly3,
best_degree, coeffs``. When None, it is computed internally by
importing and calling ``classify_relationship_type`` over the clean
pairs. When that sibling module is unavailable, the scatter is still
drawn (no fit curve, no annotation).
max_points: Cap on the number of *plotted* points. When the number of
clean pairs exceeds this cap, the drawn cloud is down-sampled by a
fixed step ``ceil(n/max_points)`` taking ``pairs[::step]`` —
DETERMINISTIC, not random, so the figure is reproducible. A
non-positive or non-int value disables down-sampling. Default 2000.
Returns:
A ``matplotlib.figure.Figure`` (figsize 6.4x4.0, dpi 150) with a single
scatter Axes, the fitted curve (when a polynomial fit is available) and a
corner annotation with the relationship type and metrics. When there are
fewer than 2 clean pairs it returns a Figure with a centered "Sin datos
suficientes para el scatter" message. The caller rasterizes/closes it.
"""
pairs = _clean_pairs(xs, ys)
if len(pairs) < 2:
return _no_data_figure("Sin datos suficientes para el scatter")
# Full clean coordinates feed the classification/fit; the plotted cloud is
# what gets thinned.
xs_clean = [p[0] for p in pairs]
ys_clean = [p[1] for p in pairs]
# Resolve the classification. If not provided, reuse the sibling classifier
# over ALL clean pairs (self-contained). Missing module => no fit/annotation.
cls = classification
if cls is None:
try:
from classify_relationship_type import classify_relationship_type
cls = classify_relationship_type(xs_clean, ys_clean)
except Exception:
cls = None
if not isinstance(cls, dict):
cls = {}
# --- Deterministic down-sampling of the DRAWN points only.
n_total = len(pairs)
if (
isinstance(max_points, int)
and not isinstance(max_points, bool)
and max_points > 0
and n_total > max_points
):
step = math.ceil(n_total / max_points)
sampled = pairs[::step]
else:
sampled = pairs
x_plot = [p[0] for p in sampled]
y_plot = [p[1] for p in sampled]
fig = Figure(figsize=(6.4, 4.0), dpi=150)
ax = fig.add_subplot(111)
ax.scatter(
x_plot,
y_plot,
s=12,
alpha=0.5,
color=_POINT_COLOR,
edgecolors="none",
rasterized=True,
)
# --- Fitted curve/line over the full clean x range.
coeffs = cls.get("coeffs")
best_degree = cls.get("best_degree")
tipo = cls.get("tipo")
x_min, x_max = min(xs_clean), max(xs_clean)
drew_fit = False
if coeffs is not None and best_degree is not None and x_max > x_min:
try:
coeff_arr = np.asarray(coeffs, dtype=float)
if coeff_arr.ndim == 1 and coeff_arr.size > 0 and np.all(np.isfinite(coeff_arr)):
x_line = np.linspace(x_min, x_max, 200)
y_line = np.polyval(coeff_arr, x_line)
if np.all(np.isfinite(y_line)):
ax.plot(x_line, y_line, color=_FIT_COLOR, linewidth=2)
drew_fit = True
except Exception:
# Never fail the figure because of a malformed coeffs array.
pass
# A monotonic non-linear relationship has no fitted polynomial (coeffs is
# None by design — a low-degree polynomial would mislead). Draw instead the
# ordered trend of y over x so the reader still sees the shape: y averaged
# within ordered x-bins (or per distinct x value when x is discrete with few
# levels, e.g. an ordinal scale). Defensive: any failure leaves the cloud.
if (not drew_fit and isinstance(tipo, str) and "monóton" in tipo.lower()
and x_max > x_min):
try:
xt, yt = _ordered_trend(xs_clean, ys_clean)
if xt is not None and len(xt) >= 2:
ax.plot(xt, yt, color=_FIT_COLOR, linewidth=2, marker="o",
markersize=3)
except Exception:
pass
# --- Labels and title.
tx = x_label if x_label else "x"
ty = y_label if y_label else "y"
ax.set_title(f"{tx}{ty}", fontsize=12, loc="left", pad=8)
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
# --- Corner annotation: relationship type + available metrics.
caption_lines = []
if tipo:
caption_lines.append(str(tipo))
metrics_line = _metrics_caption(cls)
if metrics_line:
caption_lines.append(metrics_line)
if caption_lines:
ax.text(
0.03,
0.97,
"\n".join(caption_lines),
transform=ax.transAxes,
ha="left",
va="top",
fontsize=8,
bbox=dict(
boxstyle="round,pad=0.35",
facecolor="white",
edgecolor="#cccccc",
alpha=0.85,
),
)
fig.tight_layout()
return fig
@@ -1,100 +0,0 @@
"""Tests para relationship_scatter_figure (scatter de un par numérico, grupo eda).
Usa el backend Agg sin pyplot global; no muestra ni guarda figuras. Cada test
cierra explícitamente la Figure construida (matplotlib.pyplot.close) para no
acumular estado entre tests.
"""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt # noqa: E402
from matplotlib.collections import PathCollection # noqa: E402
from matplotlib.figure import Figure # noqa: E402
from relationship_scatter_figure import relationship_scatter_figure
def _scatter_offsets(fig):
"""Return the plotted points of the first PathCollection (scatter) found."""
for ax in fig.axes:
for coll in ax.collections:
if isinstance(coll, PathCollection):
return coll.get_offsets()
return None
def test_returns_figure():
xs = [float(i) for i in range(20)]
ys = [2.0 * x + 1.0 for x in xs] # y = 2x + 1
classification = {
"tipo": "lineal",
"pearson": 1.0,
"r2_linear": 1.0,
"spearman": 1.0,
"r2_poly2": 1.0,
"r2_poly3": 1.0,
"best_degree": 1,
"coeffs": [2.0, 1.0],
}
fig = relationship_scatter_figure(
xs, ys, x_label="a", y_label="b", classification=classification
)
assert hasattr(fig, "savefig")
assert len(fig.axes) >= 1
plt.close(fig)
def test_downsample_determinista():
n = 5000
xs = [float(i) for i in range(n)]
ys = [0.5 * x for x in xs]
classification = {
"tipo": "lineal",
"pearson": 1.0,
"r2_linear": 1.0,
"spearman": 1.0,
"r2_poly2": 1.0,
"r2_poly3": 1.0,
"best_degree": 1,
"coeffs": [0.5, 0.0],
}
fig = relationship_scatter_figure(
xs, ys, x_label="x", y_label="y", classification=classification, max_points=1000
)
assert isinstance(fig, Figure)
offsets = _scatter_offsets(fig)
assert offsets is not None
# El nº de puntos dibujados no debe exceder el cap.
assert len(offsets) <= 1000
plt.close(fig)
def test_empty_no_lanza():
fig = relationship_scatter_figure([], [], x_label="x", y_label="y")
assert isinstance(fig, Figure)
plt.close(fig)
def test_classification_none():
# Solo se ejecuta si el módulo hermano classify_relationship_type existe.
try:
import classify_relationship_type # noqa: F401
except Exception:
import pytest
pytest.skip("classify_relationship_type aún no disponible")
xs = [float(i) for i in range(30)]
ys = [3.0 * x - 2.0 for x in xs]
fig = relationship_scatter_figure(
xs, ys, x_label="a", y_label="b", classification=None
)
assert isinstance(fig, Figure)
assert len(fig.axes) >= 1
plt.close(fig)