feat(eda): project_clusters_2d + describe_clusters_llm para el capitulo MODELOS

project_clusters_2d (pura): PCA(2)+KMeans sobre el MISMO subset estandarizado,
devolviendo proyeccion 2D y labels alineados por fila + centroides en espacio PCA
+ perfiles de cluster desestandarizados. Es la pieza que garantiza la alineacion
points<->labels que pca_explained y kmeans_segments no cubren (estandarizan por
separado y kmeans descarta los labels). Habilita el scatter PCA coloreado por
cluster (MUST-8.1).

describe_clusters_llm (impura): micro-analisis LLM de los clusters en una sola
llamada a ask_llm (grupo claude-direct), devuelve titulo + descripcion por cluster
con degradacion dict-no-throw a titulos genericos si el LLM no responde (MUST-8.2).

Ambas re-exportadas en datascience/__init__.py. Tests: 6/6 y 9/9 (sin red).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-30 14:57:27 +02:00
parent cb7a7fc1fd
commit 4de071f2f9
7 changed files with 931 additions and 0 deletions
+4
View File
@@ -42,6 +42,8 @@ from .isolation_forest_outliers import isolation_forest_outliers
from .normality_tests import normality_tests
from .trend_slope import trend_slope
from .run_eda_models import run_eda_models
from .project_clusters_2d import project_clusters_2d
from .describe_clusters_llm import describe_clusters_llm
from .eda_llm_insights import eda_llm_insights
from .build_eda_notebook import build_eda_notebook
from .decode_qr_image import decode_qr_image
@@ -86,6 +88,8 @@ __all__ = [
"normality_tests",
"trend_slope",
"run_eda_models",
"project_clusters_2d",
"describe_clusters_llm",
"eda_llm_insights",
"build_eda_notebook",
"describe_numeric",
@@ -0,0 +1,97 @@
---
name: describe_clusters_llm
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def describe_clusters_llm(cluster_profiles: list, feature_names: list, model: str = \"claude-haiku-4-5-20251001\") -> dict"
description: "Micro-analisis LLM de clusters de KMeans (grupo eda). Toma los perfiles AGREGADOS de cada cluster (los que produce project_clusters_2d: tamano, centroide en escala original, features distintivas y centroide en z-score) y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una descripcion de 1-2 frases en espanol. Clave de coste/privacidad: NO envia filas crudas, solo el resumen agregado de cada grupo (tamano, % del total y la media de las features distintivas con su signo respecto a la media global). Reusa ask_llm del grupo claude-direct (API directa con token OAuth de Claude). Impura, dict-no-throw: nunca lanza, degrada a titulos genericos 'Cluster N' si el LLM no responde o el parseo falla."
tags: [eda, clustering, llm, claude-direct, datascience, kmeans]
params:
- name: cluster_profiles
desc: "Lista de perfiles de cluster con la forma que produce project_clusters_2d: cada uno {cluster:int, size:int, pct:float, centroid_original:{feature: media en escala original}, distinctive:[features distintivas], centroid_z:{feature: z-score}}. Solo se le envia al LLM un resumen agregado; nunca filas crudas. Lista vacia o no-lista -> clusters=[] sin llamar al LLM."
- name: feature_names
desc: "Nombres de las features del dataset. Se incluyen como contexto en el prompt para que el LLM pueda nombrar los clusters; no es obligatorio que coincida con las features distintivas de cada perfil."
- name: model
desc: "id del modelo Anthropic a usar. Default 'claude-haiku-4-5-20251001' (haiku, coste bajo, ~2-3s). Para titulos/descripciones mas finas, pasar p.ej. 'claude-opus-4-8'."
output: "dict dict-no-throw: {clusters:[{cluster:int, title:str, description:str}], model:str, note:str}. note=='' si todo fue bien. Si el LLM no respondio (note='LLM no disponible') o el parseo fallo (note='parse fallido'), clusters trae titulos genericos 'Cluster N' con description vacia. Si cluster_profiles esta vacio o no es lista: {clusters:[], model, note:'sin clusters'}. NUNCA lanza."
uses_functions: [ask_llm_py_core]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_parse_clusters_json_valid_array", "test_parse_clusters_json_wrapped_in_junk_text", "test_parse_clusters_json_non_json_returns_none", "test_parse_clusters_json_fills_missing_cluster_by_index", "test_describe_clusters_llm_ok_with_monkeypatched_llm", "test_describe_clusters_llm_degrades_on_empty_response", "test_describe_clusters_llm_degrades_on_unparseable_response", "test_describe_clusters_llm_empty_list_skips_llm", "test_describe_clusters_llm_non_list_input_skips_llm"]
test_file_path: "python/functions/datascience/describe_clusters_llm_test.py"
file_path: "python/functions/datascience/describe_clusters_llm.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.describe_clusters_llm import describe_clusters_llm
# Perfiles agregados producidos por project_clusters_2d (no hay filas crudas).
cluster_profiles = [
{
"cluster": 0, "size": 60, "pct": 60.0,
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
"distinctive": ["acidez", "alcohol"],
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
},
{
"cluster": 1, "size": 40, "pct": 40.0,
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
"distinctive": ["alcohol"],
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
},
]
feature_names = ["acidez", "alcohol", "azucar"]
out = describe_clusters_llm(cluster_profiles, feature_names) # haiku por defecto
# out = describe_clusters_llm(cluster_profiles, feature_names, model="claude-opus-4-8")
if not out["note"]:
for c in out["clusters"]:
print(f"Cluster {c['cluster']}: {c['title']}")
print(" ", c["description"])
else:
# Degradacion: titulos genericos "Cluster N".
print("LLM no usado:", out["note"])
for c in out["clusters"]:
print(c["cluster"], c["title"])
```
## Cuando usarla
Cuando ya has clusterizado un dataset (KMeans + `project_clusters_2d`) y quieres
poner NOMBRE y descripcion legible a cada grupo en vez de dejar "Cluster 0/1/2".
Es el paso interpretativo que sigue al perfilado de clusters: `project_clusters_2d`
calcula tamano, centroides y features distintivas, y `describe_clusters_llm` los
traduce a un titulo corto + 1-2 frases por cluster. Usala al cerrar un EDA con
segmentacion para el resumen final o el report. Una sola llamada al LLM describe
todos los clusters a la vez (barato).
## Gotchas
- **Impura: hace 1 llamada de red al LLM.** No es determinista ni gratis. Latencia
tipica ~2-3s con haiku.
- **Requiere token OAuth de Claude** en `~/.claude/.credentials.json` (via `ask_llm`
/ grupo `claude-direct`). Sin token / sin red, NO lanza: degrada a titulos
genericos `Cluster N` con `note="LLM no disponible"`.
- **NO envia filas crudas al LLM**, solo el resumen AGREGADO de cada cluster
(tamano, % del total y la media de las features distintivas con su signo respecto
a la media global). Privacidad y coste minimos por diseno — pero requiere que los
perfiles vengan ya calculados por `project_clusters_2d`.
- **Modelo `haiku` por defecto** para coste bajo; sube a `claude-opus-4-8` si
necesitas titulos/descripciones mas finas (mas caro y lento).
- **dict-no-throw**: si el modelo no devuelve un JSON array parseable, retorna
titulos genericos con `note="parse fallido"`. Comprueba siempre `out["note"]`
antes de fiarte de los titulos.
- El LLM puede sobre-interpretar: el system prompt le pide ser sobrio y no inventar
causas, pero revisa los titulos antes de publicarlos en un report.
@@ -0,0 +1,240 @@
"""describe_clusters_llm — micro-analisis LLM de clusters de KMeans (grupo `eda`).
Toma los PERFILES AGREGADOS de cada cluster (los que produce `project_clusters_2d`:
tamano, centroide en escala original, features distintivas y centroide en z-score)
y, con UNA sola llamada al LLM, pide por cada cluster un TITULO corto + una
descripcion de 1-2 frases, en espanol.
Clave de coste y privacidad: NO se envian filas crudas al LLM. Solo viaja el
perfil AGREGADO de cada grupo (tamano, % del total y la media de las features
distintivas con su signo respecto a la media global). El coste es minimo y ningun
dato fila-a-fila sale del proceso.
Reusa `ask_llm` del registry (grupo claude-direct, API directa con el token OAuth
de Claude en ~/.claude/.credentials.json, arranque 0). Impura: una llamada de red.
Estilo dict-no-throw: NUNCA lanza; ante cualquier fallo (red, LLM caido, parseo)
degrada a titulos genericos "Cluster N" + una nota explicando el motivo.
"""
import json
from core.ask_llm import ask_llm
_SYSTEM = (
"Eres un analista de datos. Recibes los PERFILES AGREGADOS de los clusters de "
"un KMeans (por cada grupo: su tamano y la media de sus features distintivas, "
"con el signo respecto a la media global; nunca filas crudas) y los describes "
"de forma sobria y util. Para cada cluster generas un titulo corto y "
"descriptivo (por ejemplo 'Vinos de alta acidez y baja graduacion') y una "
"descripcion de 1-2 frases. NO inventes causas ni sobre-interpretes: limitate a "
"lo que dicen los numeros. Responde en espanol. Responde SIEMPRE y SOLO con un "
"unico JSON array valido, sin texto alrededor y sin fences de markdown, con "
'EXACTAMENTE la forma [{"cluster": <int>, "title": "<titulo corto>", '
'"description": "<1-2 frases>"}], un objeto por cluster.'
)
def _fmt_num(value) -> str:
"""Formatea un numero de forma compacta para el prompt (None -> '?')."""
if value is None:
return "?"
if isinstance(value, bool):
return str(value)
if isinstance(value, float):
if value == int(value):
return str(int(value))
return f"{value:.4g}"
return str(value)
def _cluster_id(profile: dict, index: int) -> int:
"""Devuelve el id del cluster del perfil, o el indice si no es un int valido."""
raw = (profile or {}).get("cluster")
if isinstance(raw, bool):
return index
if isinstance(raw, int):
return raw
try:
return int(raw)
except (TypeError, ValueError):
return index
def _build_prompt(cluster_profiles: list, feature_names: list) -> str:
"""Construye un resumen textual compacto de los perfiles para el LLM.
Funcion interna PURA: no toca red ni disco, es testeable sin credenciales.
Por cada cluster incluye su numero, tamano (size + pct%) y, para cada feature
distintiva, el valor del centroide en escala original mas si esta por encima o
por debajo de la media (signo del z-score en centroid_z). Pasa AGREGADOS, nunca
dato crudo de filas.
Args:
cluster_profiles: lista de perfiles de cluster (forma de project_clusters_2d).
feature_names: nombres de las features del dataset (solo contexto).
Returns:
El texto del prompt.
"""
cluster_profiles = cluster_profiles or []
feature_names = feature_names if isinstance(feature_names, list) else []
lines = [
"Perfiles AGREGADOS de clusters de KMeans. No hay filas crudas, solo medias por grupo.",
f"Numero de clusters: {len(cluster_profiles)}",
]
if feature_names:
lines.append("Features del dataset: " + ", ".join(str(f) for f in feature_names))
lines.append("")
for i, prof in enumerate(cluster_profiles):
prof = prof or {}
cid = _cluster_id(prof, i)
size = prof.get("size")
pct = prof.get("pct")
pct_str = f"{pct:.1f}%" if isinstance(pct, (int, float)) and not isinstance(pct, bool) else "?"
lines.append(f"Cluster {cid}: tamano={_fmt_num(size)} ({pct_str} del total)")
distinctive = prof.get("distinctive") or []
centroid_o = prof.get("centroid_original") or {}
centroid_z = prof.get("centroid_z") or {}
if distinctive:
lines.append(" Features distintivas (media del grupo):")
for feat in distinctive:
val = centroid_o.get(feat)
z = centroid_z.get(feat)
direction = ""
if isinstance(z, (int, float)) and not isinstance(z, bool):
if z > 0:
direction = "por encima de la media"
elif z < 0:
direction = "por debajo de la media"
else:
direction = "en la media"
if direction:
lines.append(f" - {feat}: {_fmt_num(val)} ({direction})")
else:
lines.append(f" - {feat}: {_fmt_num(val)}")
else:
lines.append(" (sin features distintivas marcadas)")
lines.append("")
lines.append(
"Devuelve SOLO el JSON array descrito en las instrucciones del sistema, "
"sin texto antes ni despues."
)
return "\n".join(lines)
def _parse_clusters_json(text: str, n: int):
"""Extrae y normaliza el array JSON de la respuesta del LLM.
Funcion interna testeable sin red. Localiza el primer '[' y el ultimo ']' del
texto (tolerando texto basura alrededor o fences de markdown), hace json.loads
y normaliza cada entrada a {cluster:int, title:str, description:str}, rellenando
el cluster por indice si falta. NUNCA lanza: ante cualquier fallo devuelve None
(senal de degradacion para el caller).
Args:
text: respuesta cruda del LLM.
n: numero de perfiles esperados (referencia; la longitud real la marca el array).
Returns:
Lista normalizada de dicts, o None si no se pudo parsear un array valido.
"""
if not text or not isinstance(text, str):
return None
start = text.find("[")
end = text.rfind("]")
if start == -1 or end == -1 or end <= start:
return None
try:
data = json.loads(text[start : end + 1])
except (ValueError, TypeError):
return None
if not isinstance(data, list):
return None
out = []
for i, item in enumerate(data):
if not isinstance(item, dict):
out.append({"cluster": i, "title": f"Cluster {i}", "description": ""})
continue
raw_cluster = item.get("cluster")
if isinstance(raw_cluster, bool):
cluster = i
elif isinstance(raw_cluster, int):
cluster = raw_cluster
else:
try:
cluster = int(raw_cluster)
except (TypeError, ValueError):
cluster = i
title = item.get("title")
title = str(title) if title is not None else f"Cluster {cluster}"
desc = item.get("description")
desc = str(desc) if desc is not None else ""
out.append({"cluster": cluster, "title": title, "description": desc})
return out
def _generic_clusters(cluster_profiles: list) -> list:
"""Titulos genericos por cluster para la degradacion (sin LLM)."""
out = []
for i, prof in enumerate(cluster_profiles):
cid = _cluster_id(prof or {}, i)
out.append({"cluster": cid, "title": f"Cluster {cid}", "description": ""})
return out
def describe_clusters_llm(
cluster_profiles: list,
feature_names: list,
model: str = "claude-haiku-4-5-20251001",
) -> dict:
"""Describe los clusters de un KMeans con UNA sola llamada al LLM.
Args:
cluster_profiles: lista de perfiles de cluster (la forma que produce
project_clusters_2d): cada uno {"cluster": int, "size": int,
"pct": float, "centroid_original": {feature: media},
"distinctive": [features], "centroid_z": {feature: z}}. Solo se le
envia al LLM el resumen agregado, nunca filas crudas.
feature_names: nombres de las features del dataset (contexto para el LLM).
model: id del modelo Anthropic. Default claude-haiku-4-5-20251001
(haiku, coste bajo).
Returns:
dict dict-no-throw: {"clusters": [{cluster:int, title:str, description:str}],
"model": str, "note": str}. note == "" si todo fue bien; si el LLM no
respondio o el parseo fallo, clusters trae titulos genericos "Cluster N" y
note explica el motivo ("LLM no disponible" / "parse fallido"). Si
cluster_profiles esta vacio o no es lista, devuelve clusters=[] sin llamar
al LLM (note "sin clusters"). NUNCA lanza.
"""
if not isinstance(cluster_profiles, list) or not cluster_profiles:
return {"clusters": [], "model": model, "note": "sin clusters"}
n = len(cluster_profiles)
prompt = _build_prompt(cluster_profiles, feature_names)
try:
text = ask_llm(prompt, model=model, system=_SYSTEM, echo=False)
except Exception: # noqa: BLE001 — degradacion: cualquier fallo de red/LLM.
text = ""
parsed = _parse_clusters_json(text, n)
if parsed:
return {"clusters": parsed, "model": model, "note": ""}
note = "LLM no disponible" if not text else "parse fallido"
return {"clusters": _generic_clusters(cluster_profiles), "model": model, "note": note}
@@ -0,0 +1,160 @@
"""Tests para describe_clusters_llm.
NO acceden a red ni a credenciales: _parse_clusters_json es testeable aislada y la
unica via que llamaria al LLM (describe_clusters_llm) se prueba monkeypatcheando
ask_llm con respuestas simuladas. Cubre golden (LLM ok), edge (cluster faltante,
array envuelto en basura, lista vacia / input no-lista) y error (LLM caido, texto
no parseable) — todos sin tocar la red.
"""
import importlib
import json
from datascience.describe_clusters_llm import (
_parse_clusters_json,
describe_clusters_llm,
)
# Perfiles de ejemplo con la forma que produce project_clusters_2d.
_PROFILES = [
{
"cluster": 0,
"size": 60,
"pct": 60.0,
"centroid_original": {"acidez": 8.5, "alcohol": 9.2},
"distinctive": ["acidez", "alcohol"],
"centroid_z": {"acidez": 1.4, "alcohol": -0.9},
},
{
"cluster": 1,
"size": 40,
"pct": 40.0,
"centroid_original": {"acidez": 5.1, "alcohol": 13.0},
"distinctive": ["alcohol"],
"centroid_z": {"acidez": -0.7, "alcohol": 1.6},
},
]
_FEATURES = ["acidez", "alcohol", "azucar"]
def _patch_ask_llm(monkeypatch, returner):
"""Monkeypatchea ask_llm en el modulo bajo prueba con un callable simulado."""
mod = importlib.import_module("datascience.describe_clusters_llm")
monkeypatch.setattr(
mod, "ask_llm", lambda prompt, model="x", system="", echo=True: returner
)
# --- _parse_clusters_json (parser puro, sin red) ---
def test_parse_clusters_json_valid_array():
text = json.dumps(
[
{"cluster": 0, "title": "A", "description": "desc a"},
{"cluster": 1, "title": "B", "description": "desc b"},
]
)
parsed = _parse_clusters_json(text, 2)
assert parsed == [
{"cluster": 0, "title": "A", "description": "desc a"},
{"cluster": 1, "title": "B", "description": "desc b"},
]
def test_parse_clusters_json_wrapped_in_junk_text():
payload = [{"cluster": 0, "title": "Solo uno", "description": "d"}]
text = "Claro, aqui tienes el resultado:\n" + json.dumps(payload) + "\nEspero que sirva."
parsed = _parse_clusters_json(text, 1)
assert parsed[0]["title"] == "Solo uno"
assert parsed[0]["cluster"] == 0
def test_parse_clusters_json_non_json_returns_none():
# Texto sin array JSON -> degradacion (None) sin lanzar.
assert _parse_clusters_json("no hay json aqui", 2) is None
assert _parse_clusters_json("", 2) is None
assert _parse_clusters_json("{solo un objeto}", 2) is None
def test_parse_clusters_json_fills_missing_cluster_by_index():
text = json.dumps(
[
{"title": "A", "description": "d"},
{"title": "B", "description": "e"},
]
)
parsed = _parse_clusters_json(text, 2)
assert parsed[0]["cluster"] == 0
assert parsed[1]["cluster"] == 1
assert parsed[0]["title"] == "A"
# --- describe_clusters_llm (con ask_llm monkeypatcheado, sin red) ---
def test_describe_clusters_llm_ok_with_monkeypatched_llm(monkeypatch):
fake = json.dumps(
[
{
"cluster": 0,
"title": "Vinos de alta acidez",
"description": "Acidez por encima de la media y graduacion baja.",
},
{
"cluster": 1,
"title": "Vinos de alta graduacion",
"description": "Alcohol claramente por encima de la media.",
},
]
)
_patch_ask_llm(monkeypatch, fake)
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["note"] == ""
assert out["model"] == "claude-haiku-4-5-20251001"
assert len(out["clusters"]) == 2
assert out["clusters"][0]["title"] == "Vinos de alta acidez"
assert set(out["clusters"][0].keys()) == {"cluster", "title", "description"}
def test_describe_clusters_llm_degrades_on_empty_response(monkeypatch):
# ask_llm devuelve "" (error/red caida) -> titulos genericos + note.
_patch_ask_llm(monkeypatch, "")
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["clusters"][0]["title"] == "Cluster 0"
assert out["clusters"][1]["title"] == "Cluster 1"
assert out["clusters"][0]["description"] == ""
assert out["note"] == "LLM no disponible"
assert out["model"] == "claude-haiku-4-5-20251001"
def test_describe_clusters_llm_degrades_on_unparseable_response(monkeypatch):
_patch_ask_llm(monkeypatch, "lo siento, no puedo ayudarte con eso")
out = describe_clusters_llm(_PROFILES, _FEATURES)
assert out["clusters"][0]["title"] == "Cluster 0"
assert out["clusters"][1]["title"] == "Cluster 1"
assert out["note"] == "parse fallido"
def test_describe_clusters_llm_empty_list_skips_llm(monkeypatch):
# Con lista vacia NO debe llamarse al LLM en absoluto.
def boom(*args, **kwargs):
raise AssertionError("ask_llm no debe llamarse con lista vacia")
mod = importlib.import_module("datascience.describe_clusters_llm")
monkeypatch.setattr(mod, "ask_llm", boom)
out = describe_clusters_llm([], _FEATURES)
assert out["clusters"] == []
assert out["note"] == "sin clusters"
def test_describe_clusters_llm_non_list_input_skips_llm():
# Input no-lista (None) -> clusters vacio sin tocar la red.
out = describe_clusters_llm(None, _FEATURES)
assert out["clusters"] == []
assert out["note"] == "sin clusters"
assert out["model"] == "claude-haiku-4-5-20251001"
@@ -0,0 +1,95 @@
---
name: project_clusters_2d
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def project_clusters_2d(columns: dict, k_min: int = 2, k_max: int = 8, max_points: int = 2000) -> dict"
description: "PCA a 2D + KMeans sobre el MISMO subset numerico estandarizado, devolviendo proyeccion 2D y labels de cluster ALINEADOS por fila para pintar un scatter PCA coloreado por cluster. Estandariza una sola vez, elige k por silhouette y proyecta centroides al espacio PCA. Determinista."
tags: [eda, models, clustering, pca, kmeans, scatter, dimensionality-reduction, datascience, sklearn]
params:
- name: columns
desc: "Mapa {nombre_columna: [valores numericos]}. Listas alineadas por fila (misma longitud). Columnas no numericas o con <2 valores distintos se descartan; None/NaN descartan la fila completa (listwise)."
- name: k_min
desc: "Numero minimo de clusters a probar por silhouette (default 2). El minimo de filas validas requerido es max(3, k_min*2)."
- name: k_max
desc: "Numero maximo de clusters a probar (default 8). Se acota a min(k_max, n_filas_validas-1)."
- name: max_points
desc: "Tope de puntos devueltos en points/labels (default 2000). Si n_used lo supera, points y labels se submuestrean CONJUNTAMENTE con paso determinista para seguir alineados; el fit usa siempre todas las filas."
output: "dict con points (proyeccion 2D, posiblemente submuestreada a max_points), labels (cluster de cada point, alineado con points), centers_2d (centroides en espacio PCA, len==best_k), best_k, silhouette, explained_2d ([var PC1, var PC2]), cluster_sizes (sobre n_used total), cluster_profiles (lista de {cluster, size, pct, centroid_original, distinctive top-3 por |z|, centroid_z}), feature_names, n_used (filas del fit antes de muestreo) y note (\"\" si ok). Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve best_k=0, listas vacias y note 'datos insuficientes' sin lanzar excepcion."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [numpy, scikit-learn]
tested: true
tests: ["test_golden_three_blobs_aligned_projection_and_clusters", "test_edge_subsampling_keeps_points_labels_aligned", "test_edge_single_numeric_column_insufficient", "test_edge_too_few_rows_insufficient", "test_edge_non_numeric_column_dropped_without_error", "test_edge_constant_column_dropped"]
test_file_path: "python/functions/datascience/project_clusters_2d_test.py"
file_path: "python/functions/datascience/project_clusters_2d.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.project_clusters_2d import project_clusters_2d
# Tres grupos gaussianos bien separados sobre 4 features.
import numpy as np
rng = np.random.default_rng(0)
rows = []
for center in (np.full(4, 0.0), np.full(4, 12.0), np.array([0.0, 12.0, 0.0, 12.0])):
rows.extend(rng.normal(loc=center, scale=0.4, size=(50, 4)))
mat = np.array(rows)
columns = {f"f{j}": [float(v) for v in mat[:, j]] for j in range(4)}
res = project_clusters_2d(columns, k_min=2, k_max=8)
print(res["best_k"]) # 3
print(len(res["points"]), len(res["labels"])) # 150 150 (alineados)
print(len(res["centers_2d"])) # == best_k
print([round(v, 2) for v in res["explained_2d"]]) # varianza de PC1, PC2
# Pintar: scatter(points[:,0], points[:,1], c=labels) + marcar centers_2d.
```
## Cuando usarla
Cuando, durante un EDA, quieres un scatter 2D de un dataset tabular numerico
coloreado por segmento descubierto automaticamente, y necesitas que cada punto
de la proyeccion lleve su etiqueta de cluster correcta. Usala en vez de
combinar `pca_explained` + `kmeans_segments` a mano: esas estandarizan por
separado y descartan los labels, asi que sus salidas no se pueden cruzar fila a
fila. Esta funcion garantiza esa alineacion (mismo X estandarizado para PCA y
KMeans) y ademas proyecta los centroides KMeans al espacio PCA para dibujarlos.
## Gotchas
- Funcion pura y determinista (StandardScaler + PCA random_state=0 + KMeans
random_state=0, n_init=10), pero requiere `numpy` y `scikit-learn` instalados.
- `points`/`labels` pueden venir submuestreados si `n_used > max_points` (paso
determinista `[::ceil(n_used/max_points)]`); `n_used`, `centers_2d`,
`cluster_sizes` y `cluster_profiles` se calculan SIEMPRE sobre todas las filas.
Cuando hay submuestreo, `note` lo indica.
- `centroid_z` y `distinctive` estan en z-score (espacio escalado);
`centroid_original` esta en las unidades originales (via
`scaler.inverse_transform`). No mezcles ambos al interpretar.
- `centers_2d` esta en el espacio PCA (coordenadas del scatter), no en unidades
originales: pintalo sobre el mismo eje que `points`.
- Silhouette baja con best_k alto sugiere que no hay estructura de cluster real;
el scatter puede no mostrar grupos separados.
## Notas
Pieza de composicion que `pca_explained` + `kmeans_segments` no cubren: ambas
estandarizan internamente por separado (cada una su propio `StandardScaler`) y
`kmeans_segments` no expone los labels por fila, por lo que no se pueden cruzar
con la `projection` de `pca_explained`. Esta funcion usa `sklearn` directo
(StandardScaler una sola vez compartido por PCA y KMeans) para garantizar la
alineacion `points[i] <-> labels[i]` y proyectar los centroides KMeans al
espacio PCA. Coercion y listwise deletion siguen el estilo de `pca_explained`
(None/NaN -> fila descartada, columnas no parseables o constantes descartadas).
Degrada con gracia: con <2 columnas numericas o <max(3, k_min*2) filas validas
devuelve `note: "datos insuficientes"` sin lanzar excepcion (try/except
defensivo en todo el cuerpo).
@@ -0,0 +1,208 @@
"""Proyeccion PCA-2D + KMeans sobre el mismo subset, con puntos y labels alineados.
Estandariza una sola vez las columnas numericas (z-score), proyecta a 2D con PCA
y clusteriza con KMeans sobre EXACTAMENTE la misma matriz escalada, de modo que
la proyeccion 2D (`points`) y la etiqueta de cluster (`labels`) quedan alineadas
fila a fila. Es la pieza que `pca_explained` + `kmeans_segments` no cubren: esas
dos estandarizan por separado y descartan los labels, asi que sus salidas no se
pueden cruzar para pintar un scatter PCA coloreado por cluster. Determinista.
"""
import math
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
def project_clusters_2d(
columns: dict,
k_min: int = 2,
k_max: int = 8,
max_points: int = 2000,
) -> dict:
"""Proyecta a 2D (PCA) y clusteriza (KMeans) el mismo subset estandarizado.
PCA a 2D y KMeans se ajustan sobre la MISMA matriz estandarizada, por lo que
`points` (proyeccion 2D) y `labels` (cluster por fila) quedan alineados por
indice. El k se elige automaticamente por silhouette en el rango
[k_min, min(k_max, n_rows-1)], igual criterio que `kmeans_segments`.
Determinista: StandardScaler + PCA(random_state=0) + KMeans(random_state=0,
n_init=10).
Args:
columns: mapa {nombre_columna: [valores numericos]}. Listas alineadas por
fila (misma longitud). Columnas no numericas o con menos de 2 valores
distintos se descartan. None/NaN marcan filas a descartar listwise
(una fila se elimina si cualquier feature falta).
k_min: numero minimo de clusters a probar (default 2).
k_max: numero maximo de clusters a probar (default 8). Se acota a
min(k_max, n_rows_validas-1).
max_points: tope de puntos devueltos en `points`/`labels`. Si las filas
usadas superan este tope, se submuestrea points y labels CONJUNTAMENTE
con paso determinista para mantenerlos alineados. El fit (best_k,
silhouette, centroides, perfiles) usa SIEMPRE todas las filas.
Returns:
dict con points (proyeccion 2D, posiblemente submuestreada a max_points),
labels (cluster de cada point, alineado con points), centers_2d
(centroides en espacio PCA, len == best_k), best_k, silhouette,
explained_2d (varianza de PC1 y PC2), cluster_sizes (sobre n_used total),
cluster_profiles (ver abajo), feature_names, n_used (filas del fit antes
de muestreo) y note ("" si ok). Cada entrada de cluster_profiles:
{cluster, size, pct, centroid_original (medias en escala original),
centroid_z (z del centroide), distinctive (top 3 features por |z|)}.
Con <2 columnas numericas o <max(3, k_min*2) filas validas devuelve
best_k=0 y note "datos insuficientes" sin lanzar excepcion.
"""
feature_names: list[str] = []
def insufficient(names: list[str], n_used: int) -> dict:
return {
"best_k": 0,
"points": [],
"labels": [],
"centers_2d": [],
"cluster_profiles": [],
"feature_names": names,
"n_used": int(n_used),
"note": "datos insuficientes",
}
try:
if not isinstance(columns, dict) or not columns:
return insufficient([], 0)
# 1. Coerce a numerico, descartando columnas no parseables o constantes.
numeric_cols: dict[str, list] = {}
for name, values in columns.items():
if not isinstance(values, (list, tuple)):
continue
coerced: list[float] = []
usable = True
for v in values:
if v is None:
coerced.append(math.nan)
continue
try:
coerced.append(float(v))
except (TypeError, ValueError):
usable = False
break
if not usable:
continue
# Menos de 2 valores distintos no aporta varianza -> descartar.
distinct = {x for x in coerced if not math.isnan(x)}
if len(distinct) < 2:
continue
numeric_cols[name] = coerced
feature_names = list(numeric_cols.keys())
if len(feature_names) < 2:
return insufficient(feature_names, 0)
# 2. Matriz alineada por fila + listwise deletion (cualquier NaN -> fuera).
matrix = np.array(
[numeric_cols[n] for n in feature_names], dtype=float
).T
valid_mask = ~np.isnan(matrix).any(axis=1)
data = matrix[valid_mask]
n_used = int(data.shape[0])
min_rows = max(3, k_min * 2)
if n_used < min_rows:
return insufficient(feature_names, n_used)
# 3. Estandarizar UNA sola vez (guardamos el scaler para desestandarizar).
scaler = StandardScaler()
X_scaled = scaler.fit_transform(data)
# 4. PCA a 2D sobre la matriz escalada.
pca = PCA(n_components=2, random_state=0)
pca.fit(X_scaled)
proj = pca.transform(X_scaled)
# 5. KMeans con seleccion automatica de k por silhouette (mismo X_scaled).
upper_k = min(k_max, n_used - 1)
if upper_k < k_min:
return insufficient(feature_names, n_used)
best = None # (silhouette, k, model, labels)
for k in range(k_min, upper_k + 1):
model = KMeans(n_clusters=k, n_init=10, random_state=0)
labels_k = model.fit_predict(X_scaled)
if len(set(labels_k)) < 2:
sil = -1.0
else:
sil = float(silhouette_score(X_scaled, labels_k))
if best is None or sil > best[0]:
best = (sil, k, model, labels_k)
best_sil, best_k, best_model, labels = best
# 6. Centroides KMeans (espacio escalado) proyectados al espacio PCA.
centers_2d = pca.transform(best_model.cluster_centers_)
# 7. Perfiles por cluster sobre TODAS las filas usadas.
centroids_original = scaler.inverse_transform(best_model.cluster_centers_)
cluster_sizes: list[int] = []
cluster_profiles: list[dict] = []
for c in range(best_k):
size = int(np.sum(labels == c))
cluster_sizes.append(size)
z_vec = best_model.cluster_centers_[c]
orig_vec = centroids_original[c]
centroid_z = {
feature_names[j]: float(z_vec[j]) for j in range(len(feature_names))
}
centroid_original = {
feature_names[j]: float(orig_vec[j])
for j in range(len(feature_names))
}
order = np.argsort(np.abs(z_vec))[::-1]
distinctive = [feature_names[int(j)] for j in order[:3]]
cluster_profiles.append(
{
"cluster": int(c),
"size": size,
"pct": float(size / n_used) if n_used else 0.0,
"centroid_original": centroid_original,
"distinctive": distinctive,
"centroid_z": centroid_z,
}
)
# 8. Muestreo determinista CONJUNTO de points + labels (mantiene alineacion).
note = ""
if n_used > max_points and max_points > 0:
step = math.ceil(n_used / max_points)
proj_out = proj[::step]
labels_out = labels[::step]
note = f"submuestreado a {len(proj_out)} de {n_used} puntos para visualizacion"
else:
proj_out = proj
labels_out = labels
points = [[float(row[0]), float(row[1])] for row in proj_out]
labels_list = [int(v) for v in labels_out]
centers_list = [[float(row[0]), float(row[1])] for row in centers_2d]
explained_2d = [float(x) for x in pca.explained_variance_ratio_]
return {
"points": points,
"labels": labels_list,
"centers_2d": centers_list,
"best_k": int(best_k),
"silhouette": float(best_sil),
"explained_2d": explained_2d,
"cluster_sizes": cluster_sizes,
"cluster_profiles": cluster_profiles,
"feature_names": feature_names,
"n_used": n_used,
"note": note,
}
except Exception:
# Lectura defensiva: nunca propagar excepciones al caller del EDA.
return insufficient(feature_names, 0)
@@ -0,0 +1,127 @@
"""Tests para project_clusters_2d."""
import numpy as np
from project_clusters_2d import project_clusters_2d
def _three_blobs(seed: int = 0, per_blob: int = 50, n_features: int = 4):
"""Genera 3 gaussianas bien separadas en n_features dims, alineadas por fila.
Devuelve un dict {col: [valores]} con las columnas alineadas por fila.
"""
rng = np.random.default_rng(seed)
base_centers = [
np.full(n_features, 0.0),
np.full(n_features, 12.0),
np.array([0.0, 12.0, 0.0, 12.0][:n_features] + [0.0] * max(0, n_features - 4)),
]
rows: list[np.ndarray] = []
for center in base_centers:
pts = rng.normal(loc=center, scale=0.4, size=(per_blob, n_features))
rows.extend(pts)
mat = np.array(rows)
return {f"f{j}": [float(v) for v in mat[:, j]] for j in range(n_features)}
def test_golden_three_blobs_aligned_projection_and_clusters():
columns = _three_blobs(seed=0, per_blob=50, n_features=4)
result = project_clusters_2d(columns, k_min=2, k_max=8)
n_used = result["n_used"]
assert n_used == 150
assert result["note"] == ""
best_k = result["best_k"]
assert 2 <= best_k <= 4
# points y labels alineados por fila.
assert len(result["points"]) == len(result["labels"])
assert len(result["points"]) == n_used # sin submuestreo (150 < 2000)
# Cada punto es un par (x, y).
assert all(len(p) == 2 for p in result["points"])
# Labels dentro del rango [0, best_k).
assert all(0 <= lbl < best_k for lbl in result["labels"])
# Centroides 2D: uno por cluster.
assert len(result["centers_2d"]) == best_k
assert all(len(c) == 2 for c in result["centers_2d"])
# Varianza explicada de los 2 componentes.
assert len(result["explained_2d"]) == 2
# cluster_sizes cubre todas las filas usadas.
assert sum(result["cluster_sizes"]) == n_used
assert len(result["cluster_sizes"]) == best_k
# cluster_profiles: una entrada por cluster, con centroid_original poblado.
assert len(result["cluster_profiles"]) == best_k
for prof in result["cluster_profiles"]:
assert set(prof["centroid_original"].keys()) == set(result["feature_names"])
assert set(prof["centroid_z"].keys()) == set(result["feature_names"])
assert 1 <= len(prof["distinctive"]) <= 3
assert prof["size"] >= 0
assert 0.0 <= prof["pct"] <= 1.0
def test_edge_subsampling_keeps_points_labels_aligned():
# max_points pequeño fuerza submuestreo conjunto de points + labels.
columns = _three_blobs(seed=1, per_blob=50, n_features=3)
result = project_clusters_2d(columns, k_min=2, k_max=6, max_points=40)
n_used = result["n_used"]
assert n_used == 150 # el fit usa todas las filas
# points y labels submuestreados pero siempre con la misma longitud.
assert len(result["points"]) == len(result["labels"])
assert len(result["points"]) <= 40
# centers/sizes/profiles se calculan sobre TODOS los puntos.
assert sum(result["cluster_sizes"]) == n_used
assert len(result["centers_2d"]) == result["best_k"]
assert result["note"] != "" # senala el submuestreo
def test_edge_single_numeric_column_insufficient():
columns = {"x": [float(i) for i in range(50)]}
result = project_clusters_2d(columns, k_min=2, k_max=8)
assert result["best_k"] == 0
assert result["note"] == "datos insuficientes"
assert result["points"] == []
assert result["labels"] == []
assert result["centers_2d"] == []
assert result["cluster_profiles"] == []
def test_edge_too_few_rows_insufficient():
# Solo 2 filas validas, min_rows = max(3, k_min*2) = 4 -> insuficiente.
columns = {"x": [1.0, 5.0], "y": [2.0, 9.0]}
result = project_clusters_2d(columns, k_min=2, k_max=8)
assert result["best_k"] == 0
assert result["note"] == "datos insuficientes"
def test_edge_non_numeric_column_dropped_without_error():
# La columna de strings se descarta; quedan 3 numericas -> funciona.
columns = _three_blobs(seed=2, per_blob=50, n_features=3)
columns["label"] = ["a"] * len(columns["f0"])
result = project_clusters_2d(columns, k_min=2, k_max=6)
assert result["best_k"] >= 2
assert "label" not in result["feature_names"]
assert set(result["feature_names"]) == {"f0", "f1", "f2"}
assert len(result["points"]) == len(result["labels"])
def test_edge_constant_column_dropped():
# Una columna constante (0 varianza) se descarta por <2 valores distintos.
columns = _three_blobs(seed=3, per_blob=50, n_features=3)
columns["const"] = [7.0] * len(columns["f0"])
result = project_clusters_2d(columns, k_min=2, k_max=6)
assert "const" not in result["feature_names"]
assert result["best_k"] >= 2