Files
Visualizaciones/prueba_de_embeddings .py
egutierrez 46573ccc8e Add drawing and visualization applications with Marimo framework
- Implement dibujar.py for drawing functionality with base64 and PIL image rendering.
- Create dibujar_retropaint.py for retro painting features using the Paint widget.
- Develop draw_data.py to visualize data with Scatter and Bar widgets, including lazy installation of dependencies.
- Add layout configuration for graphical representations in layouts/Graficos_plotly.grid.json.
- Enhance shell interaction with mejora_shell_mowidget.py, allowing local library imports and script execution.
- Introduce primera_prueba_shell_mowidget.py for testing shell commands and user input handling.
- Create prueba_de_embeddings.py for embedding visualizations using Sentence Transformers and dimensionality reduction techniques.
- Implement pygwalker_visualizaciones.py for interactive data exploration and visualization using Pygwalker.
- Add a sample bash script for user input and ping functionality in scripts/mi_script.sh.
2025-09-02 23:53:01 +02:00

480 lines
16 KiB
Python

# Marimo notebook: embeds the same labeled texts with two sentence-embedding
# models and compares the resulting spaces (2D projections and a few metrics).
import marimo

# Version string recorded in the notebook file (marimo's generated-file marker).
__generated_with = "0.15.2"
# Application object; each `@app.cell` function below is registered on it.
app = marimo.App(width="medium")
@app.cell
def _():
    # Shared imports for the notebook. The names returned at the bottom are
    # the ones other cells receive as parameters.

    # Standard library (`json` is imported here but is not exported below).
    import io
    import json
    from functools import lru_cache

    # Notebook framework
    import marimo as mo

    # Numerics and tabular data
    import numpy as np
    import pandas as pd

    # Charting
    import altair as alt

    # Sentence embedding models
    from sentence_transformers import SentenceTransformer

    # 2D projection, distance matrices and nearest-neighbour search
    from sklearn.decomposition import PCA
    from sklearn.manifold import TSNE
    from sklearn.metrics import pairwise_distances
    from sklearn.neighbors import NearestNeighbors

    # Conventions used throughout this notebook:
    # - Comments are written in English.
    # - A variable is declared in a single cell only (marimo DAG rule).
    # - The last expression of a cell is what gets displayed.
    return (
        NearestNeighbors,
        PCA,
        SentenceTransformer,
        TSNE,
        alt,
        io,
        lru_cache,
        mo,
        np,
        pairwise_distances,
        pd,
    )
@app.cell
def _(mo):
    # Input controls. Only the widgets are created here; the cells that
    # depend on them read their `.value`.

    # Models offered by both dropdowns.
    _model_choices = (
        "sentence-transformers/all-MiniLM-L6-v2",
        "thenlper/gte-small",
        "BAAI/bge-small-en-v1.5",
        "sentence-transformers/paraphrase-MiniLM-L6-v2",
    )

    # Initial content of the CSV text area: two rows per label.
    _demo_csv = (
        "Label,Text\n"
        "tech,Transformers accelerate NLP research.\n"
        "tech,Embeddings capture semantic meaning.\n"
        "finance,Markets react to macroeconomic signals.\n"
        "finance,Portfolio optimization reduces risk.\n"
        "sports,The team improved defense and strategy.\n"
        "sports,Training intensity boosts performance.\n"
    )

    def _model_dropdown(label: str, default: str):
        # Each dropdown gets its own copy of the option list.
        return mo.ui.dropdown(
            options=list(_model_choices),
            value=default,
            label=label,
        )

    # NOTE: this checkbox is displayed but not returned from the cell, so no
    # other cell reads its value.
    sample_toggle = mo.ui.checkbox(
        label="Use sample dataset (tiny demo)",
        value=True,
    )
    text_area = mo.ui.text_area(
        value=_demo_csv,
        label="CSV data (columns: Label,Text)",
        full_width=True,
    )
    model_a_dropdown = _model_dropdown(
        "Model A", "sentence-transformers/all-MiniLM-L6-v2"
    )
    model_b_dropdown = _model_dropdown("Model B", "BAAI/bge-small-en-v1.5")
    reducer_dropdown = mo.ui.radio(
        options=["PCA", "TSNE"],
        value="PCA",
        label="Dimensionality reduction",
    )
    k_slider = mo.ui.slider(
        3, 20, value=10, label="k for neighborhood agreement"
    )

    # Layout: title, toggle, CSV editor, then two rows of paired controls.
    _model_row = mo.hstack([model_a_dropdown, model_b_dropdown])
    _settings_row = mo.hstack([reducer_dropdown, k_slider])
    mo.vstack([
        mo.md("### Data & Models"),
        sample_toggle,
        text_area,
        _model_row,
        _settings_row,
    ])
    return (
        k_slider,
        model_a_dropdown,
        model_b_dropdown,
        reducer_dropdown,
        text_area,
    )
@app.cell
def _(io, pd, text_area):
    # Build the input dataframe from the CSV text currently in `text_area`.
    # The text area is the only data source: the demo dataset is simply its
    # initial value, and whatever the user types replaces it.

    def _parse_csv_to_df(csv_text: str) -> pd.DataFrame:
        """Parse CSV text into a frame with string `Label` and `Text` columns.

        Rows where either required column is missing are dropped and the
        index is reset, so rows are numbered 0..n-1 in file order.

        Raises:
            ValueError: if the text is blank or a required column is absent.
        """
        required = ["Label", "Text"]
        if not csv_text or not csv_text.strip():
            # Give the same schema message instead of pandas' parser error.
            raise ValueError("CSV must contain columns: Label, Text")

        df = pd.read_csv(io.StringIO(csv_text))
        if any(col not in df.columns for col in required):
            raise ValueError("CSV must contain columns: Label, Text")

        # Drop incomplete rows *before* casting: astype(str) would otherwise
        # turn a missing cell into the literal text "nan", which would then
        # be embedded and plotted as if it were real data.
        df = df.dropna(subset=required).reset_index(drop=True)
        df["Label"] = df["Label"].astype(str)
        df["Text"] = df["Text"].astype(str)
        return df

    dataframe_input = _parse_csv_to_df(text_area.value)
    dataframe_input
    return (dataframe_input,)
@app.cell
def _(SentenceTransformer, lru_cache, mo, model_a_dropdown, model_b_dropdown):
    # Cached model loader plus a small banner showing the current selection.
    #
    # This cell reads both dropdowns, so it re-runs every time either one
    # changes, and each re-run redefines the loader. A `functools.lru_cache`
    # wrapper would be recreated empty on every re-run, so no model would
    # ever be reused. marimo's own `mo.lru_cache` is used instead; per the
    # marimo caching docs it keeps its entries across re-runs of the defining
    # cell as long as the function's code is unchanged.
    # `lru_cache` stays in the signature for compatibility but is unused.
    @mo.lru_cache(maxsize=4)
    def load_model_cached(model_name: str) -> SentenceTransformer:
        """Return the SentenceTransformer for `model_name`, loading it once."""
        return SentenceTransformer(model_name)

    selected_model_a = model_a_dropdown.value
    selected_model_b = model_b_dropdown.value

    mo.hstack([
        mo.md(f"**Model A:** `{selected_model_a}`"),
        mo.md(f"**Model B:** `{selected_model_b}`"),
    ])
    return load_model_cached, selected_model_a, selected_model_b
@app.cell
def _(
    dataframe_input,
    load_model_cached,
    np,
    selected_model_a,
    selected_model_b,
):
    # Encode the same texts, in the same row order, with each selected model.
    texts_for_embedding = dataframe_input["Text"].to_list()

    def _embed_texts(model_name: str, texts: list[str]) -> np.ndarray:
        """Encode `texts` with the named model (normalize_embeddings=True)."""
        encoder = load_model_cached(model_name)
        return encoder.encode(
            texts,
            normalize_embeddings=True,
            show_progress_bar=False,
        )

    # Model A is encoded first, then model B.
    embeddings_a, embeddings_b = (
        _embed_texts(_name, texts_for_embedding)
        for _name in (selected_model_a, selected_model_b)
    )

    # Displayed as a quick sanity check of both matrix shapes.
    {"A_shape": embeddings_a.shape, "B_shape": embeddings_b.shape}
    return embeddings_a, embeddings_b
@app.cell
def _(
    PCA,
    TSNE,
    dataframe_input,
    embeddings_a,
    embeddings_b,
    np,
    pd,
    reducer_dropdown,
):
    # Project both embedding matrices to 2D with the selected method and pack
    # the coordinates into tidy frames for plotting.
    reducer_choice = reducer_dropdown.value

    def _reduce_2d(X: np.ndarray, method: str) -> np.ndarray:
        """Return an (n_samples, 2) projection of `X`.

        Args:
            X: embedding matrix, one row per text.
            method: "PCA" or "TSNE".

        Raises:
            ValueError: if `method` is not a known reducer.
        """
        if method == "PCA":
            return PCA(n_components=2, random_state=42).fit_transform(X)
        if method == "TSNE":
            # scikit-learn rejects perplexity >= n_samples, so the default of
            # 30 fails on small inputs such as the 6-row demo. Clip it to the
            # largest valid value while keeping 30 for larger datasets.
            n_samples = int(X.shape[0])
            perplexity = float(min(30, max(1, n_samples - 1)))
            reducer = TSNE(
                n_components=2,
                perplexity=perplexity,
                random_state=42,  # reproducible layout
                init="pca",
                learning_rate="auto",
            )
            return reducer.fit_transform(X)
        raise ValueError("Unknown reducer")

    def _plot_frame(coords: np.ndarray, model_tag: str) -> pd.DataFrame:
        """Build one tidy plotting frame: x, y, Label, Text, Model."""
        # Plain arrays are used so rows pair up by position, independent of
        # whatever index `dataframe_input` carries.
        return pd.DataFrame({
            "x": coords[:, 0],
            "y": coords[:, 1],
            "Label": dataframe_input["Label"].to_numpy(),
            "Text": dataframe_input["Text"].to_numpy(),
            "Model": model_tag,
        })

    coords_a_2d = _reduce_2d(embeddings_a, reducer_choice)
    coords_b_2d = _reduce_2d(embeddings_b, reducer_choice)

    plot_df_a = _plot_frame(coords_a_2d, "A")
    plot_df_b = _plot_frame(coords_b_2d, "B")
    plot_df_combined = pd.concat([plot_df_a, plot_df_b], ignore_index=True)
    plot_df_combined
    return (
        coords_a_2d,
        coords_b_2d,
        plot_df_a,
        plot_df_b,
        plot_df_combined,
        reducer_choice,
    )
@app.cell
def _(alt, plot_df_combined, reducer_choice):
    # Side-by-side scatter plots, one per model, sharing one colour scale so
    # a given label has the same colour in both panels.

    # Sorted label list used as the explicit colour domain.
    label_domain = sorted(set(plot_df_combined["Label"]))

    _label_color = alt.Color(
        "Label:N",
        legend=alt.Legend(title="Label"),
        scale=alt.Scale(scheme="category10", domain=label_domain),
    )
    _points = alt.Chart(plot_df_combined).mark_circle(size=80)
    base = _points.encode(
        x=alt.X("x:Q", title=f"{reducer_choice} 1"),
        y=alt.Y("y:Q", title=f"{reducer_choice} 2"),
        color=_label_color,
        tooltip=["Model:N", "Label:N", "Text:N"],
    ).properties(width=350, height=300)

    def _panel(model_tag: str):
        # Keep only the rows of one model and title the panel after it.
        filtered = base.transform_filter(alt.datum.Model == model_tag)
        return filtered.properties(title=f"Model {model_tag}")

    chart_a = _panel("A")
    chart_b = _panel("B")
    (chart_a | chart_b).resolve_scale(color="shared")
    return (label_domain,)
@app.cell
def _(NearestNeighbors, coords_a_2d, coords_b_2d, k_slider, mo, np, pd):
    # Neighborhood agreement: for every point, the fraction of its k nearest
    # neighbours in projection A that are also among its k nearest neighbours
    # in projection B (both computed on the 2D coordinates).
    n_points = int(coords_a_2d.shape[0])

    # With fewer than 3 rows there is no non-trivial neighbourhood to compare.
    mo.stop(
        n_points < 3,
        mo.md("Need at least **3** rows to compute neighborhood overlap."),
    )

    # Each point has n_points - 1 other points. Capping k one below that
    # keeps the neighbour sets from being "everyone else" in both spaces,
    # which would make every overlap trivially 1.0.
    requested_k = int(k_slider.value)
    max_valid_k = max(1, n_points - 2)
    effective_k = min(requested_k, max_valid_k)

    # Tell the user when the slider value had to be clipped.
    if effective_k != requested_k:
        k_info = mo.md(
            f"Using **k = {effective_k}** (requested {requested_k}, max valid {max_valid_k} for n={n_points})."
        )
    else:
        k_info = mo.md(f"Using **k = {effective_k}** for n={n_points}.")

    def _knn_indices(X2d: np.ndarray, k: int) -> np.ndarray:
        """Return an (n, k) array of each point's k nearest other points."""
        nbrs = NearestNeighbors(n_neighbors=k, metric="euclidean").fit(X2d)
        # Called without a query matrix, kneighbors() returns neighbours of
        # the fitted points and never lists a point as its own neighbour.
        # So exactly k are requested and none is dropped; slicing off the
        # first column here would discard the true nearest neighbour.
        return nbrs.kneighbors(return_distance=False)

    def _rowwise_overlap(idx_a: np.ndarray, idx_b: np.ndarray) -> np.ndarray:
        """Per-point fraction of neighbour indices shared by the two sets."""
        return np.array(
            [
                len(set(a.tolist()) & set(b.tolist())) / len(a)
                for a, b in zip(idx_a, idx_b)
            ],
            dtype=float,
        )

    knn_a = _knn_indices(coords_a_2d, k=effective_k)
    knn_b = _knn_indices(coords_b_2d, k=effective_k)
    overlap_scores = _rowwise_overlap(knn_a, knn_b)
    neighborhood_agreement_mean = float(np.mean(overlap_scores))

    # Show the k actually used, the per-point table and the mean.
    mo.vstack([
        k_info,
        mo.ui.dataframe(
            pd.DataFrame({
                "Point": np.arange(len(overlap_scores)),
                f"Overlap@{effective_k}": overlap_scores,
            })
        ),
        mo.md(f"**Mean Overlap@{effective_k}:** {neighborhood_agreement_mean:.3f}"),
    ])
    return neighborhood_agreement_mean, overlap_scores
@app.cell
def _(coords_a_2d, coords_b_2d, dataframe_input, np, pd):
    # Per-label centroid shift: the Euclidean distance between a label's 2D
    # centroid in projection A and its centroid in projection B.
    labels_series = dataframe_input["Label"]
    unique_labels = sorted(labels_series.unique().tolist())

    def _centroids(coords: np.ndarray, labels: pd.Series) -> dict[str, np.ndarray]:
        """Map each label to the mean 2D position of its points."""
        label_values = labels.values
        return {
            name: np.mean(coords[label_values == name], axis=0)
            for name in unique_labels
        }

    centroids_a = _centroids(coords_a_2d, labels_series)
    centroids_b = _centroids(coords_b_2d, labels_series)

    centroid_rows = [
        {
            "Label": name,
            "CentroidShift(A_vs_B)": float(
                np.linalg.norm(centroids_a[name] - centroids_b[name])
            ),
        }
        for name in unique_labels
    ]

    # Largest shift first.
    centroid_shift_df = pd.DataFrame(centroid_rows).sort_values(
        "CentroidShift(A_vs_B)", ascending=False
    )
    centroid_shift_df
    return centroid_shift_df, labels_series
@app.cell
def _(embeddings_a, embeddings_b, np, pairwise_distances, pd):
    # Global-structure check in the original embedding spaces: Spearman rank
    # correlation between the pairwise cosine distances of model A and the
    # pairwise cosine distances of model B.
    from scipy.stats import spearmanr

    dist_a = pairwise_distances(embeddings_a, metric="cosine")
    dist_b = pairwise_distances(embeddings_b, metric="cosine")

    # Strict upper triangle: each unordered pair once, diagonal excluded.
    # The same index set (sized from A) is applied to both matrices.
    triu_idx = np.triu_indices(dist_a.shape[0], k=1)
    vec_a, vec_b = dist_a[triu_idx], dist_b[triu_idx]

    rho, p_val = spearmanr(vec_a, vec_b)

    # One-row table for display.
    pd.DataFrame({
        "Spearman_rho": [float(rho)],
        "p_value": [float(p_val)],
    })
    return p_val, rho
@app.cell
def _(
    NearestNeighbors,
    coords_a_2d,
    coords_b_2d,
    dataframe_input,
    labels_series,
    np,
    pd,
):
    # Per-label summary of how tightly points sit in each 2D projection.

    def _label_density(coords: np.ndarray, labels: pd.Series, k: int = 5) -> pd.DataFrame:
        """Summarise each label's size and mean k-nearest-neighbour distance.

        "LocalDensity" of a point is the mean Euclidean distance to its k
        nearest other points, so smaller values mean a denser neighbourhood.

        Args:
            coords: (n, 2) array of projected points.
            labels: label of each point, in the same row order as `coords`.
            k: requested number of neighbours; clipped to n - 1.

        Returns:
            One row per label with columns Label, Size, MeanLocalDensity.
            MeanLocalDensity is NaN when there are fewer than 2 points.
        """
        n_points = int(coords.shape[0])
        if n_points < 2:
            # No other point exists to measure a distance to.
            local_density = np.nan
        else:
            # Called without a query matrix, kneighbors() never returns a
            # point as its own neighbour. So exactly the wanted number of
            # neighbours is requested and all returned distances are
            # averaged; skipping the first column would drop the nearest.
            n_neighbors = max(1, min(k, n_points - 1))
            nbrs = NearestNeighbors(
                n_neighbors=n_neighbors, metric="euclidean"
            ).fit(coords)
            dists, _idx = nbrs.kneighbors(return_distance=True)
            local_density = dists.mean(axis=1)

        per_point = pd.DataFrame(
            {"Label": labels.values, "LocalDensity": local_density}
        )
        return per_point.groupby("Label", as_index=False).agg(
            Size=("Label", "count"),
            MeanLocalDensity=("LocalDensity", "mean"),
        )

    # At most 5 neighbours, and never more than the n - 1 other points.
    k_density = min(5, max(1, len(dataframe_input) - 1))
    density_a = _label_density(coords_a_2d, labels_series, k=k_density)
    density_b = _label_density(coords_b_2d, labels_series, k=k_density)

    # Side-by-side view: columns suffixed _A / _B per projection.
    summary_labels = density_a.merge(density_b, on="Label", suffixes=("_A", "_B"))
    summary_labels
    return (summary_labels,)
@app.cell
def _(
    alt,
    centroid_shift_df,
    k_slider,
    label_domain,
    mo,
    neighborhood_agreement_mean,
    np,
    overlap_scores,
    p_val,
    pd,
    plot_df_a,
    plot_df_b,
    plot_df_combined,
    rho,
    summary_labels,
):
    # Summary panel: headline metrics, tabbed detail views and a CSV download
    # of the 2D coordinates.

    # The overlap scores were computed with the slider value clipped to
    # n - 2 (at least 1). Label them with that k, not the raw slider value.
    _k_used = min(int(k_slider.value), max(1, len(overlap_scores) - 2))

    _shift = centroid_shift_df["CentroidShift(A_vs_B)"]
    summary_md = mo.md("\n".join([
        "### Summary",
        f"- Neighborhood agreement (mean Overlap@{_k_used}): **{neighborhood_agreement_mean:.3f}**",
        f"- Centroid shift (A vs B): min={_shift.min():.3f}, max={_shift.max():.3f}",
        f"- Global structure similarity (Spearman on pairwise distances): **rho={float(rho):.3f}** (p={float(p_val):.2e})",
    ]))

    # A plain mo.ui.button has no effect when clicked, so the export is a
    # download element that serves the CSV. The payload is passed as bytes
    # because mo.download treats a plain string as a URL.
    export_payload = plot_df_combined.to_csv(index=False)
    export_button = mo.download(
        data=export_payload.encode("utf-8"),
        filename="embedding_coords_2d.csv",
        mimetype="text/csv",
        label="Export 2D coordinates (CSV)",
    )

    def _scatter(frame: pd.DataFrame, title: str):
        """One scatter panel coloured by label with the shared label domain."""
        return alt.Chart(frame).mark_circle(size=80).encode(
            x="x:Q",
            y="y:Q",
            color=alt.Color("Label:N", scale=alt.Scale(domain=label_domain)),
            tooltip=["Label:N", "Text:N"],
        ).properties(title=title, width=360, height=320)

    _overlap_table = pd.DataFrame({
        "Point": np.arange(len(overlap_scores)),
        f"Overlap@{_k_used}": overlap_scores,
    })

    mo.vstack([
        summary_md,
        mo.ui.tabs({
            "Scatter A/B": mo.ui.altair_chart(
                alt.hconcat(_scatter(plot_df_a, "A"), _scatter(plot_df_b, "B"))
            ),
            "Overlap@k (head)": mo.ui.table(_overlap_table),
            "Centroid shift": mo.ui.table(centroid_shift_df),
            "Label summary": mo.ui.table(summary_labels),
        }),
        export_button,
    ])
    return
@app.cell
def _():
    # Empty placeholder cell: it defines nothing and displays nothing.
    return
# Run the notebook's cells when this file is executed directly as a script.
if __name__ == "__main__":
    app.run()