import marimo

__generated_with = "0.15.2"
app = marimo.App(width="medium")


@app.cell
def _():
    # marimo
    import marimo as mo

    # Data & math
    import pandas as pd
    import numpy as np

    # Embeddings
    from sentence_transformers import SentenceTransformer

    # Dimensionality reduction & metrics
    from sklearn.decomposition import PCA
    from sklearn.manifold import TSNE
    from sklearn.metrics import pairwise_distances
    from sklearn.neighbors import NearestNeighbors

    # Viz
    import altair as alt

    # Misc
    from functools import lru_cache
    import io
    import json

    # Notes:
    # - Every global name is defined in exactly one cell (marimo DAG rule);
    #   cell-private helpers are prefixed with an underscore.
    # - The last expression in each cell is displayed automatically.
    return (
        NearestNeighbors,
        PCA,
        SentenceTransformer,
        TSNE,
        alt,
        io,
        lru_cache,
        mo,
        np,
        pairwise_distances,
        pd,
    )


@app.cell
def _(mo):
    # UI elements (defined here; their values are read in later cells).
    # NOTE: sample_toggle is displayed but not consumed by any other cell; the
    # text area (pre-filled with the sample CSV) is the single data source.
    sample_toggle = mo.ui.checkbox(label="Use sample dataset (tiny demo)", value=True)
    text_area = mo.ui.text_area(
        value="Label,Text\ntech,Transformers accelerate NLP research.\ntech,Embeddings capture semantic meaning.\n"
        "finance,Markets react to macroeconomic signals.\nfinance,Portfolio optimization reduces risk.\n"
        "sports,The team improved defense and strategy.\nsports,Training intensity boosts performance.\n",
        label="CSV data (columns: Label,Text)",
        full_width=True,
    )

    # Both model pickers offer the same choices.
    _model_options = [
        "sentence-transformers/all-MiniLM-L6-v2",
        "thenlper/gte-small",
        "BAAI/bge-small-en-v1.5",
        "sentence-transformers/paraphrase-MiniLM-L6-v2",
    ]
    model_a_dropdown = mo.ui.dropdown(
        options=_model_options,
        value="sentence-transformers/all-MiniLM-L6-v2",
        label="Model A",
    )
    model_b_dropdown = mo.ui.dropdown(
        options=_model_options,
        value="BAAI/bge-small-en-v1.5",
        label="Model B",
    )
    reducer_dropdown = mo.ui.radio(
        options=["PCA", "TSNE"],
        value="PCA",
        label="Dimensionality reduction",
    )
    k_slider = mo.ui.slider(3, 20, value=10, label="k for neighborhood agreement")

    mo.vstack([
        mo.md("### Data & Models"),
        sample_toggle,
        text_area,
        mo.hstack([model_a_dropdown, model_b_dropdown]),
        mo.hstack([reducer_dropdown, k_slider]),
    ])
    return (
        k_slider,
        model_a_dropdown,
        model_b_dropdown,
        reducer_dropdown,
        text_area,
    )


@app.cell
def _(io, pd, text_area):
    # Build the input dataframe from the CSV text currently in the text area.
    def _parse_csv_to_df(csv_text: str) -> pd.DataFrame:
        """Parse CSV text into a dataframe with string ``Label``/``Text`` columns.

        Raises:
            ValueError: if either required column is missing.
        """
        df = pd.read_csv(io.StringIO(csv_text))

        # Basic schema validation
        expected_cols = {"Label", "Text"}
        if not expected_cols.issubset(df.columns):
            raise ValueError("CSV must contain columns: Label, Text")

        # Drop incomplete rows *before* casting: astype(str) would otherwise turn
        # missing cells into the literal string "nan", which would get embedded.
        df = df.dropna(subset=["Label", "Text"]).reset_index(drop=True)

        # Ensure string types
        df["Label"] = df["Label"].astype(str)
        df["Text"] = df["Text"].astype(str)
        return df

    dataframe_input = _parse_csv_to_df(text_area.value)
    dataframe_input
    return (dataframe_input,)


@app.cell
def _(SentenceTransformer, lru_cache):
    # Cached model loader. It lives in its own cell, independent of the dropdowns,
    # so that changing a model selection does not re-run this cell and thereby
    # replace the function (and its cache) with a fresh, empty one.
    @lru_cache(maxsize=4)
    def load_model_cached(model_name: str) -> SentenceTransformer:
        """Return a SentenceTransformer for ``model_name``, memoized by name."""
        return SentenceTransformer(model_name)

    return (load_model_cached,)


@app.cell
def _(mo, model_a_dropdown, model_b_dropdown):
    # Read the current model selections and echo them.
    selected_model_a = model_a_dropdown.value
    selected_model_b = model_b_dropdown.value

    mo.hstack([
        mo.md(f"**Model A:** `{selected_model_a}`"),
        mo.md(f"**Model B:** `{selected_model_b}`"),
    ])
    return selected_model_a, selected_model_b


@app.cell
def _(
    dataframe_input,
    load_model_cached,
    np,
    selected_model_a,
    selected_model_b,
):
    # Generate embeddings for both models on the same text order.
    texts_for_embedding = dataframe_input["Text"].tolist()

    def _embed_texts(model_name: str, texts: list[str]) -> np.ndarray:
        """Encode ``texts`` with the named model, requesting normalized embeddings."""
        model = load_model_cached(model_name)
        return model.encode(texts, show_progress_bar=False, normalize_embeddings=True)

    embeddings_a = _embed_texts(selected_model_a, texts_for_embedding)
    embeddings_b = _embed_texts(selected_model_b, texts_for_embedding)

    # Show shapes as a quick check
    {"A_shape": embeddings_a.shape, "B_shape": embeddings_b.shape}
    return embeddings_a, embeddings_b


@app.cell
def _(
    PCA,
    TSNE,
    dataframe_input,
    embeddings_a,
    embeddings_b,
    np,
    pd,
    reducer_dropdown,
):
    # Reduce to 2D using PCA or TSNE.
    reducer_choice = reducer_dropdown.value

    def _reduce_2d(X: np.ndarray, method: str) -> np.ndarray:
        """Project ``X`` to two dimensions with the chosen method.

        Raises:
            ValueError: if ``X`` has fewer than 2 rows or ``method`` is unknown.
        """
        n_samples = int(X.shape[0])
        if n_samples < 2:
            raise ValueError("Need at least 2 rows for a 2D projection")

        if method == "PCA":
            reducer = PCA(n_components=2, random_state=42)
        elif method == "TSNE":
            # scikit-learn rejects perplexity >= n_samples, so the default of 30
            # would crash on tiny datasets such as the bundled sample.
            perplexity = float(min(30, n_samples - 1))
            reducer = TSNE(
                n_components=2,
                perplexity=perplexity,
                random_state=42,  # reproducible layout
                init="pca",
                learning_rate="auto",
            )
        else:
            raise ValueError("Unknown reducer")
        return reducer.fit_transform(X)

    coords_a_2d = _reduce_2d(embeddings_a, reducer_choice)
    coords_b_2d = _reduce_2d(embeddings_b, reducer_choice)

    def _plot_frame(coords: np.ndarray, model_tag: str) -> pd.DataFrame:
        """Pack 2D coordinates plus labels/texts into a tidy frame for plotting."""
        return pd.DataFrame({
            "x": coords[:, 0],
            "y": coords[:, 1],
            # Plain arrays keep the assignment positional, whatever the input index.
            "Label": dataframe_input["Label"].to_numpy(),
            "Text": dataframe_input["Text"].to_numpy(),
            "Model": model_tag,
        })

    plot_df_a = _plot_frame(coords_a_2d, "A")
    plot_df_b = _plot_frame(coords_b_2d, "B")
    plot_df_combined = pd.concat([plot_df_a, plot_df_b], ignore_index=True)
    plot_df_combined
    return (
        coords_a_2d,
        coords_b_2d,
        plot_df_a,
        plot_df_b,
        plot_df_combined,
        reducer_choice,
    )


@app.cell
def _(alt, plot_df_combined, reducer_choice):
    # Build side-by-side charts: one for Model A and one for Model B.
    # A fixed, sorted label domain gives both charts the same color mapping.
    label_domain = sorted(plot_df_combined["Label"].unique().tolist())

    base = alt.Chart(plot_df_combined).mark_circle(size=80).encode(
        x=alt.X("x:Q", title=f"{reducer_choice} 1"),
        y=alt.Y("y:Q", title=f"{reducer_choice} 2"),
        color=alt.Color(
            "Label:N",
            legend=alt.Legend(title="Label"),
            scale=alt.Scale(scheme="category10", domain=label_domain),
        ),
        tooltip=["Model:N", "Label:N", "Text:N"],
    ).properties(width=350, height=300)

    chart_a = base.transform_filter(alt.datum.Model == "A").properties(title="Model A")
    chart_b = base.transform_filter(alt.datum.Model == "B").properties(title="Model B")

    alt.hconcat(chart_a, chart_b).resolve_scale(color="shared")
    return (label_domain,)


@app.cell
def _(NearestNeighbors, coords_a_2d, coords_b_2d, k_slider, mo, np, pd):
    # Neighborhood agreement: for each point, the fraction of its k nearest
    # neighbors (in the 2D projection) that models A and B have in common.

    # Dataset size
    n_points = int(coords_a_2d.shape[0])

    # Guard: with fewer than 3 rows there is no non-trivial neighborhood to compare.
    mo.stop(n_points < 3, mo.md("Need at least **3** rows to compute neighborhood overlap."))

    # Clip k to n - 2: at k = n - 1 every neighbor set would contain all other
    # points and the overlap would trivially be 1.0.
    requested_k = int(k_slider.value)
    max_valid_k = max(1, n_points - 2)
    effective_k = min(requested_k, max_valid_k)

    # Informative message if clipping happened
    if effective_k != requested_k:
        k_info = mo.md(
            f"Using **k = {effective_k}** (requested {requested_k}, max valid {max_valid_k} for n={n_points})."
        )
    else:
        k_info = mo.md(f"Using **k = {effective_k}** for n={n_points}.")

    def _knn_indices(X2d: np.ndarray, k: int) -> np.ndarray:
        """Return an (n, k) array with each point's k nearest neighbors.

        ``kneighbors()`` called without a query matrix already excludes each
        point from its own neighbor list, so nothing must be sliced off here
        (dropping column 0 would discard the true nearest neighbor).
        """
        nbrs = NearestNeighbors(n_neighbors=k, metric="euclidean").fit(X2d)
        return nbrs.kneighbors(return_distance=False)

    knn_a = _knn_indices(coords_a_2d, k=effective_k)
    knn_b = _knn_indices(coords_b_2d, k=effective_k)

    def _rowwise_overlap(idx_a: np.ndarray, idx_b: np.ndarray) -> np.ndarray:
        """Per-point fraction of neighbor indices shared between two k-NN tables."""
        return np.array(
            [
                len(set(row_a.tolist()) & set(row_b.tolist())) / len(row_a)
                for row_a, row_b in zip(idx_a, idx_b)
            ],
            dtype=float,
        )

    overlap_scores = _rowwise_overlap(knn_a, knn_b)
    neighborhood_agreement_mean = float(np.mean(overlap_scores))

    # Display per-point table + info about the effective k
    mo.vstack([
        k_info,
        mo.ui.dataframe(
            pd.DataFrame({
                "Point": np.arange(len(overlap_scores)),
                f"Overlap@{effective_k}": overlap_scores,
            })
        ),
        mo.md(f"**Mean Overlap@{effective_k}:** {neighborhood_agreement_mean:.3f}"),
    ])
    return effective_k, neighborhood_agreement_mean, overlap_scores


@app.cell
def _(coords_a_2d, coords_b_2d, dataframe_input, np, pd):
    # For each label, compute the 2D centroid in A and in B, then report the
    # distance between the two centroids.
    # NOTE(review): A and B are projected independently, so their 2D axes are not
    # aligned with each other; treat the shift as a rough indicator only.
    labels_series = dataframe_input["Label"]
    unique_labels = sorted(labels_series.unique().tolist())

    def _centroids(coords: np.ndarray, labels: pd.Series) -> dict[str, np.ndarray]:
        """Map each label to the mean 2D position of its points."""
        label_values = labels.values
        return {
            label: np.mean(coords[label_values == label], axis=0)
            for label in unique_labels
        }

    centroids_a = _centroids(coords_a_2d, labels_series)
    centroids_b = _centroids(coords_b_2d, labels_series)

    def _shift_rows() -> list[dict]:
        """One row per label with the A-vs-B centroid distance."""
        # Built inside a function so loop variables do not leak as cell globals.
        return [
            {
                "Label": label,
                "CentroidShift(A_vs_B)": float(
                    np.linalg.norm(centroids_a[label] - centroids_b[label])
                ),
            }
            for label in unique_labels
        ]

    centroid_rows = _shift_rows()
    centroid_shift_df = pd.DataFrame(centroid_rows).sort_values(
        "CentroidShift(A_vs_B)", ascending=False
    )
    centroid_shift_df
    return centroid_shift_df, labels_series


@app.cell
def _(embeddings_a, embeddings_b, np, pairwise_distances, pd):
    # Compare the pairwise-distance structure of the two original embedding
    # spaces via Spearman rank correlation: a high rho means both models order
    # the text pairs from "closest" to "farthest" similarly.
    from scipy.stats import spearmanr

    # Cosine distances (embeddings were requested normalized at encode time).
    dist_a = pairwise_distances(embeddings_a, metric="cosine")
    dist_b = pairwise_distances(embeddings_b, metric="cosine")

    # Vectorize the strict upper triangles (each pair once, no diagonal)
    triu_idx = np.triu_indices(dist_a.shape[0], k=1)
    vec_a = dist_a[triu_idx]
    vec_b = dist_b[triu_idx]

    rho, p_val = spearmanr(vec_a, vec_b)
    pd.DataFrame({"Spearman_rho": [float(rho)], "p_value": [float(p_val)]})
    return p_val, rho


@app.cell
def _(
    NearestNeighbors,
    coords_a_2d,
    coords_b_2d,
    dataframe_input,
    labels_series,
    np,
    pd,
):
    def _label_density(coords: np.ndarray, labels: pd.Series, k: int = 5) -> pd.DataFrame:
        """Summarize, per label, the mean distance to the k nearest neighbors.

        "LocalDensity" is a mean k-NN distance, so larger values mean sparser
        surroundings. Returns one row per label with ``Size`` and
        ``MeanLocalDensity``; the latter is NaN when there are fewer than
        2 points (no neighbor exists).
        """
        n_points = int(coords.shape[0])
        if n_points < 2:
            local_density = np.nan
        else:
            # kneighbors() without a query matrix excludes the point itself, so
            # the valid range is 1..n-1 and every returned distance is a real
            # neighbor distance (no "self" column to drop).
            k_eff = max(1, min(k, n_points - 1))
            nbrs = NearestNeighbors(n_neighbors=k_eff, metric="euclidean").fit(coords)
            dists, _ = nbrs.kneighbors(return_distance=True)
            local_density = dists.mean(axis=1)

        df = pd.DataFrame({"Label": labels.values, "LocalDensity": local_density})
        return df.groupby("Label", as_index=False).agg(
            Size=("Label", "count"),
            MeanLocalDensity=("LocalDensity", "mean"),
        )

    # Choose k based on dataset size (same n - 2 cap as the neighborhood cell).
    k_density = min(5, max(1, len(dataframe_input) - 2))

    density_a = _label_density(coords_a_2d, labels_series, k=k_density)
    density_b = _label_density(coords_b_2d, labels_series, k=k_density)

    summary_labels = density_a.merge(density_b, on="Label", suffixes=("_A", "_B"))
    summary_labels
    return (summary_labels,)


@app.cell
def _(
    alt,
    centroid_shift_df,
    effective_k,
    label_domain,
    mo,
    neighborhood_agreement_mean,
    np,
    overlap_scores,
    p_val,
    pd,
    plot_df_a,
    plot_df_b,
    plot_df_combined,
    rho,
    summary_labels,
):
    # Panel summarizing the metrics and offering a CSV download of the 2D coords.
    # The k shown is the effective (clipped) k the scores were computed with,
    # not the raw slider value.
    summary_md = mo.md(
        f"""
        ### Summary
        - Neighborhood agreement (mean Overlap@{effective_k}): **{neighborhood_agreement_mean:.3f}**
        - Centroid shift (A vs B): min={centroid_shift_df['CentroidShift(A_vs_B)'].min():.3f}, max={centroid_shift_df['CentroidShift(A_vs_B)'].max():.3f}
        - Global structure similarity (Spearman on pairwise distances): **rho={float(rho):.3f}** (p={float(p_val):.2e})
        """
    )

    # A download element actually serves the file; a plain mo.ui.button would
    # only record clicks (and takes its label as a keyword, not positionally).
    export_payload = plot_df_combined.to_csv(index=False)
    export_button = mo.download(
        data=export_payload.encode("utf-8"),
        filename="embedding_coords_2d.csv",
        mimetype="text/csv",
        label="Export 2D coordinates (CSV)",
    )

    def _scatter(df: pd.DataFrame, title: str) -> alt.Chart:
        """Scatter plot of one model's 2D coordinates, colored by label."""
        return alt.Chart(df).mark_circle(size=80).encode(
            x="x:Q",
            y="y:Q",
            color=alt.Color("Label:N", scale=alt.Scale(domain=label_domain)),
            tooltip=["Label:N", "Text:N"],
        ).properties(title=title, width=360, height=320)

    mo.vstack([
        summary_md,
        mo.ui.tabs({
            "Scatter A/B": mo.ui.altair_chart(
                alt.hconcat(_scatter(plot_df_a, "A"), _scatter(plot_df_b, "B"))
            ),
            "Overlap@k (head)": mo.ui.table(
                pd.DataFrame({
                    "Point": np.arange(len(overlap_scores)),
                    f"Overlap@{effective_k}": overlap_scores,
                })
            ),
            "Centroid shift": mo.ui.table(centroid_shift_df),
            "Label summary": mo.ui.table(summary_labels),
        }),
        export_button,
    ])
    return


@app.cell
def _():
    return


if __name__ == "__main__":
    app.run()