4de071f2f9
project_clusters_2d (pura): PCA(2)+KMeans sobre el MISMO subset estandarizado, devolviendo proyeccion 2D y labels alineados por fila + centroides en espacio PCA + perfiles de cluster desestandarizados. Es la pieza que garantiza la alineacion points<->labels que pca_explained y kmeans_segments no cubren (estandarizan por separado y kmeans descarta los labels). Habilita el scatter PCA coloreado por cluster (MUST-8.1). describe_clusters_llm (impura): micro-analisis LLM de los clusters en una sola llamada a ask_llm (grupo claude-direct), devuelve titulo + descripcion por cluster con degradacion dict-no-throw a titulos genericos si el LLM no responde (MUST-8.2). Ambas re-exportadas en datascience/__init__.py. Tests: 6/6 y 9/9 (sin red). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
128 lines
4.7 KiB
Python
128 lines
4.7 KiB
Python
"""Tests para project_clusters_2d."""
|
|
|
|
import numpy as np
|
|
|
|
from project_clusters_2d import project_clusters_2d
|
|
|
|
|
|
def _three_blobs(seed: int = 0, per_blob: int = 50, n_features: int = 4):
    """Generate three well-separated Gaussian blobs as row-aligned columns.

    The blobs are centred at the all-zeros point, the all-12 point, and a
    point following the pattern ``0, 12, 0, 12`` (truncated to
    ``n_features``, or zero-padded beyond four dimensions). Every blob is
    drawn with a standard deviation of 0.4.

    Args:
        seed: Seed for ``np.random.default_rng`` so the data is reproducible.
        per_blob: Number of rows drawn for each of the three blobs. Zero is
            allowed and yields empty columns.
        n_features: Dimensionality of every generated point.

    Returns:
        A dict ``{"f0": [...], "f1": [...], ...}`` holding ``n_features``
        columns of ``3 * per_blob`` Python floats each, with rows ordered
        blob by blob.
    """
    rng = np.random.default_rng(seed)

    mixed_pattern = [0.0, 12.0, 0.0, 12.0][:n_features]
    mixed_pattern += [0.0] * max(0, n_features - 4)
    base_centers = [
        np.full(n_features, 0.0),
        np.full(n_features, 12.0),
        np.array(mixed_pattern),
    ]

    # One draw per centre, in order, so the random stream is consumed exactly
    # as a sequential loop over the centres would consume it.
    blobs = [
        rng.normal(loc=center, scale=0.4, size=(per_blob, n_features))
        for center in base_centers
    ]

    # Stack the per-blob matrices instead of building an array from a flat
    # list of rows: the result stays 2-D even when per_blob == 0, so the
    # column slicing below never raises IndexError.
    mat = np.vstack(blobs)
    return {f"f{j}": mat[:, j].tolist() for j in range(n_features)}
|
|
|
|
|
|
def test_golden_three_blobs_aligned_projection_and_clusters():
    """Happy path: three clean blobs give an aligned 2-D projection and clusters."""
    data = _three_blobs(seed=0, per_blob=50, n_features=4)
    out = project_clusters_2d(data, k_min=2, k_max=8)

    total_rows = out["n_used"]
    assert total_rows == 150
    assert out["note"] == ""

    k = out["best_k"]
    assert 2 <= k <= 4

    # points and labels are row-aligned, and nothing is subsampled
    # (150 < 2000).
    points = out["points"]
    labels = out["labels"]
    assert len(points) == len(labels) == total_rows

    # Every point is an (x, y) pair.
    for pt in points:
        assert len(pt) == 2

    # Every label lies in [0, best_k).
    for lbl in labels:
        assert 0 <= lbl < k

    # One 2-D centroid per cluster.
    centers = out["centers_2d"]
    assert len(centers) == k
    for center in centers:
        assert len(center) == 2

    # Explained variance is reported for the two components.
    assert len(out["explained_2d"]) == 2

    # cluster_sizes accounts for every row that was used.
    sizes = out["cluster_sizes"]
    assert sum(sizes) == total_rows
    assert len(sizes) == k

    # cluster_profiles: one entry per cluster, keyed by every feature.
    profiles = out["cluster_profiles"]
    assert len(profiles) == k
    expected_features = set(out["feature_names"])
    for profile in profiles:
        assert set(profile["centroid_original"].keys()) == expected_features
        assert set(profile["centroid_z"].keys()) == expected_features
        assert 1 <= len(profile["distinctive"]) <= 3
        assert profile["size"] >= 0
        assert 0.0 <= profile["pct"] <= 1.0
|
|
|
|
|
|
def test_edge_subsampling_keeps_points_labels_aligned():
    """A small max_points subsamples points and labels together."""
    # A small max_points forces joint subsampling of points + labels.
    data = _three_blobs(seed=1, per_blob=50, n_features=3)
    out = project_clusters_2d(data, k_min=2, k_max=6, max_points=40)

    total_rows = out["n_used"]
    assert total_rows == 150  # the fit uses every row

    # points and labels are subsampled but always share the same length.
    shown_points = out["points"]
    assert len(shown_points) == len(out["labels"])
    assert len(shown_points) <= 40

    # centers/sizes/profiles are computed over ALL the points.
    assert sum(out["cluster_sizes"]) == total_rows
    assert len(out["centers_2d"]) == out["best_k"]
    assert out["note"] != ""  # flags the subsampling
|
|
|
|
|
|
def test_edge_single_numeric_column_insufficient():
    """A single numeric column is reported as insufficient data."""
    data = {"x": list(map(float, range(50)))}
    out = project_clusters_2d(data, k_min=2, k_max=8)

    assert out["best_k"] == 0
    assert out["note"] == "datos insuficientes"

    # Every collection in the degraded result must be empty.
    for key in ("points", "labels", "centers_2d", "cluster_profiles"):
        assert out[key] == []
|
|
|
|
|
|
def test_edge_too_few_rows_insufficient():
    """Too few valid rows are reported as insufficient data."""
    # Only 2 valid rows, min_rows = max(3, k_min*2) = 4 -> insufficient.
    data = dict(x=[1.0, 5.0], y=[2.0, 9.0])
    out = project_clusters_2d(data, k_min=2, k_max=8)

    assert out["best_k"] == 0
    assert out["note"] == "datos insuficientes"
|
|
|
|
|
|
def test_edge_non_numeric_column_dropped_without_error():
    """A string column is ignored while the numeric ones are still clustered."""
    # The string column is dropped; 3 numeric ones remain -> it works.
    data = _three_blobs(seed=2, per_blob=50, n_features=3)
    row_count = len(data["f0"])
    data["label"] = ["a"] * row_count

    out = project_clusters_2d(data, k_min=2, k_max=6)

    assert out["best_k"] >= 2
    used_features = out["feature_names"]
    assert "label" not in used_features
    assert set(used_features) == {"f0", "f1", "f2"}
    assert len(out["points"]) == len(out["labels"])
|
|
|
|
|
|
def test_edge_constant_column_dropped():
    """A zero-variance column is excluded from the features used."""
    # A constant column (0 variance) is dropped for having <2 distinct values.
    data = _three_blobs(seed=3, per_blob=50, n_features=3)
    row_count = len(data["f0"])
    data["const"] = [7.0] * row_count

    out = project_clusters_2d(data, k_min=2, k_max=6)

    assert "const" not in out["feature_names"]
    assert out["best_k"] >= 2
|