763e06c127
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
102 lines
3.8 KiB
Python
102 lines
3.8 KiB
Python
"""Detección de segmentos naturales con KMeans + selección automática de k por silhouette."""
|
|
|
|
import numpy as np
|
|
from sklearn.cluster import KMeans
|
|
from sklearn.metrics import silhouette_score
|
|
from sklearn.preprocessing import StandardScaler
|
|
|
|
|
|
def kmeans_segments(columns: dict, k_min: int = 2, k_max: int = 8) -> dict:
|
|
"""Detecta segmentos naturales en columnas numéricas con KMeans.
|
|
|
|
Estandariza las features, descarta las filas con algún valor None, y prueba
|
|
cada k en el rango [k_min, min(k_max, n_rows-1)] eligiendo el de mayor
|
|
silhouette. Determinista: KMeans usa random_state=0 y n_init fijo.
|
|
|
|
Args:
|
|
columns: dict {col_name: [valores numéricos]} con todas las listas
|
|
alineadas por fila (misma longitud).
|
|
k_min: número mínimo de clusters a probar (>= 2).
|
|
k_max: número máximo de clusters a probar (se acota a n_rows-1).
|
|
|
|
Returns:
|
|
dict con:
|
|
- best_k: k con mejor silhouette.
|
|
- silhouette: silhouette del mejor k.
|
|
- scores_by_k: lista de {k, silhouette, inertia} por cada k probado.
|
|
- cluster_sizes: tamaño de cada cluster del mejor modelo.
|
|
- centers: centroides del mejor modelo en el espacio estandarizado.
|
|
- n_rows_used: filas válidas usadas tras descartar None.
|
|
- n_features: número de columnas numéricas usadas.
|
|
Si hay menos de 2 columnas numéricas o menos de k_min*2 filas válidas,
|
|
devuelve {"best_k": 0, "note": "datos insuficientes"} sin lanzar error.
|
|
"""
|
|
# Quedarse solo con columnas cuyos valores sean numéricos (o None).
|
|
numeric_cols: list[str] = []
|
|
for name, values in columns.items():
|
|
ok = True
|
|
for v in values:
|
|
if v is None:
|
|
continue
|
|
if isinstance(v, bool) or not isinstance(v, (int, float)):
|
|
ok = False
|
|
break
|
|
if ok:
|
|
numeric_cols.append(name)
|
|
|
|
if len(numeric_cols) < 2:
|
|
return {"best_k": 0, "note": "datos insuficientes"}
|
|
|
|
# Construir matriz alineada por fila y descartar filas con algún None.
|
|
col_lists = [columns[name] for name in numeric_cols]
|
|
n_rows_total = min(len(c) for c in col_lists)
|
|
rows: list[list[float]] = []
|
|
for i in range(n_rows_total):
|
|
row = [col_lists[j][i] for j in range(len(numeric_cols))]
|
|
if any(v is None for v in row):
|
|
continue
|
|
rows.append([float(v) for v in row])
|
|
|
|
n_rows_used = len(rows)
|
|
n_features = len(numeric_cols)
|
|
|
|
if n_rows_used < k_min * 2:
|
|
return {"best_k": 0, "note": "datos insuficientes"}
|
|
|
|
X = np.asarray(rows, dtype=float)
|
|
X_scaled = StandardScaler().fit_transform(X)
|
|
|
|
upper_k = min(k_max, n_rows_used - 1)
|
|
if upper_k < k_min:
|
|
return {"best_k": 0, "note": "datos insuficientes"}
|
|
|
|
scores_by_k: list[dict] = []
|
|
best = None # (silhouette, k, model, labels)
|
|
for k in range(k_min, upper_k + 1):
|
|
model = KMeans(n_clusters=k, n_init=10, random_state=0)
|
|
labels = model.fit_predict(X_scaled)
|
|
# silhouette necesita al menos 2 clusters efectivos.
|
|
if len(set(labels)) < 2:
|
|
sil = -1.0
|
|
else:
|
|
sil = float(silhouette_score(X_scaled, labels))
|
|
scores_by_k.append(
|
|
{"k": k, "silhouette": sil, "inertia": float(model.inertia_)}
|
|
)
|
|
if best is None or sil > best[0]:
|
|
best = (sil, k, model, labels)
|
|
|
|
best_sil, best_k, best_model, best_labels = best
|
|
cluster_sizes = [int(np.sum(best_labels == c)) for c in range(best_k)]
|
|
centers = [[float(x) for x in center] for center in best_model.cluster_centers_]
|
|
|
|
return {
|
|
"best_k": best_k,
|
|
"silhouette": best_sil,
|
|
"scores_by_k": scores_by_k,
|
|
"cluster_sizes": cluster_sizes,
|
|
"centers": centers,
|
|
"n_rows_used": n_rows_used,
|
|
"n_features": n_features,
|
|
}
|