"""Geospatial chapter (GEOSPATIAL) for AutomaticEDA. When the dataset carries a coordinate pair (latitude/longitude), this chapter draws the points on a **geographic scatter** in an equirectangular projection (scaled so degrees of longitude are not stretched at the data's latitude) and analyses the **zone / country** the points fall in: bounding box, centroid, geographic span, and a per-region count. When there is **no** coordinate pair the chapter returns ``None`` — exactly the user requirement. Detection and the heavy lifting are delegated to pure ``eda``-group registry functions, never reimplemented here: - ``detect_latlon_columns`` — finds the (lat, lon) column pair by name + value range from the ``profile['columns']`` metadata. - ``analyze_geo_extent`` — bbox, centroid, haversine span, per-region counts and hemisphere from the raw coordinate arrays. - ``build_geo_scatter`` — deterministically down-sampled points + bbox + the aspect ratio for the equirectangular projection. This chapter only draws the matplotlib figure from that prepared data (same split as ``num_distr`` does with ``build_boxplot_stats``). The raw coordinate arrays are **not** in a standard TableProfile (it stores only per-column aggregates), so — exactly like ``modelos`` reads ``raw_numeric`` from ``ctx`` — this chapter looks for the coordinates in ``ctx`` (or ``profile``) and degrades honestly when they are absent: it still detects the columns and shows an approximate bounding box derived from the per-column ``numeric.min/max``, with a note that the raw points are needed for the map. ctx keys this chapter consumes (all optional): geo_points : dict — ``{"lats": [...], "lons": [...]}`` raw coordinate arrays. Used directly when present (forward-compatible with a calculation phase that samples them from the table). raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present and ``geo_points`` is not, the detected lat/lon columns are read from it. 
run_geo_llm : bool — when True, call ``ask_llm`` for a one-line narrative of where the points concentrate (otherwise a derived note is used). geo_llm_model : str — model id for the optional live LLM call. Contract: build_(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z". Reads everything defensively (``.get``) and never raises. """ from __future__ import annotations import math from .. import model # Pure registry functions (group ``eda``) delegated to. Imported defensively so # the chapter stays importable (degrading gracefully) if one is unavailable. try: from datascience.detect_latlon_columns import detect_latlon_columns except Exception: # noqa: BLE001 — keep the chapter importable no matter what. detect_latlon_columns = None # type: ignore[assignment] try: from datascience.analyze_geo_extent import analyze_geo_extent except Exception: # noqa: BLE001 analyze_geo_extent = None # type: ignore[assignment] try: from datascience.build_geo_scatter import build_geo_scatter except Exception: # noqa: BLE001 build_geo_scatter = None # type: ignore[assignment] CHAPTER_VERSION = "1.0.0" CHAPTER_ID = "geospatial" CHAPTER_TITLE = "Análisis geoespacial" # --------------------------------------------------------------------------- # # Formatting helpers (mirror the other chapters' defensive style). 
# --------------------------------------------------------------------------- #


def _fmt_num(value, decimals: int = 4) -> str:
    """Format a scalar for display.

    ``None`` becomes an em dash, booleans become ``sí``/``no``, integers get a
    dot as thousands separator, and floats are rounded to *decimals* places
    with trailing fractional zeros removed. Any other type is delegated to
    ``model._safe_str``.
    """
    if value is None:
        return "—"
    if isinstance(value, bool):  # must precede the int check: bool is an int.
        return "sí" if value else "no"
    if isinstance(value, int):
        return f"{value:,}".replace(",", ".")
    if isinstance(value, float):
        if math.isnan(value):
            return "NaN"
        if math.isinf(value):
            return str(value)
        text = f"{value:.{decimals}f}"
        # Only strip zeros that follow a decimal point; with ``decimals=0``
        # there is none and stripping would turn "10" into "1".
        if "." in text:
            text = text.rstrip("0").rstrip(".")
        # Tiny negatives round to "-0"; show a plain zero instead.
        return text if text not in ("", "-0") else "0"
    return model._safe_str(value)


def _fmt_coord(value, decimals: int = 4) -> str:
    """Format a coordinate in degrees (``"40.4168°"``), defensively.

    ``None`` and non-finite values render as an em dash, consistent with the
    other formatters; values that cannot be converted to float are delegated
    to ``model._safe_str``.
    """
    if value is None:
        return "—"
    try:
        degrees = float(value)
    except (TypeError, ValueError, OverflowError):
        return model._safe_str(value)
    if not math.isfinite(degrees):
        return "—"
    return f"{degrees:.{decimals}f}°"


def _fmt_km(value) -> str:
    """Format a distance in kilometres.

    Distances of 100 km or more are shown without decimals and with a dot as
    thousands separator; shorter ones keep one decimal. ``None`` and
    non-finite values render as an em dash.
    """
    if value is None:
        return "—"
    try:
        km = float(value)
    except (TypeError, ValueError, OverflowError):
        return model._safe_str(value)
    if not math.isfinite(km):
        return "—"
    if km >= 100:
        return f"{km:,.0f} km".replace(",", ".")
    return f"{km:.1f} km"


def _is_dict(v) -> bool:
    """Return True when *v* is a ``dict``."""
    return isinstance(v, dict)


def _clean_floats(seq) -> list:
    """Convert *seq* to a list of floats, keeping positions aligned.

    Entries that cannot be converted, and non-finite values (NaN, ±inf), are
    replaced by ``None`` rather than removed, so that a latitude list and a
    longitude list cleaned separately still pair up index by index. Returns
    ``[]`` when *seq* is ``None``, a string/bytes/dict, or not iterable.
    """
    if seq is None or isinstance(seq, (str, bytes, dict)):
        return []
    try:
        items = list(seq)
    except TypeError:
        return []
    cleaned = []
    for item in items:
        try:
            number = float(item)
        except (TypeError, ValueError, OverflowError):
            cleaned.append(None)
            continue
        cleaned.append(number if math.isfinite(number) else None)
    return cleaned


# --------------------------------------------------------------------------- #
# Resolve the (lat, lon) columns and the raw coordinate arrays.
# --------------------------------------------------------------------------- #
def _detect_columns(profile: dict) -> dict:
    """Detect the lat/lon column pair from the profile metadata, or ``{}``.

    Passes ``profile['columns']`` to ``detect_latlon_columns`` and returns its
    result when it is a dict. Returns ``{}`` when the columns list is missing
    or empty, when the registry function could not be imported, when it
    raises, or when it returns something other than a dict.
    """
    if not _is_dict(profile):
        return {}
    cols = profile.get("columns")
    if not isinstance(cols, list) or not cols or detect_latlon_columns is None:
        return {}
    try:
        det = detect_latlon_columns(cols)
    except Exception:  # noqa: BLE001 — never break the chapter.
        return {}
    return det if _is_dict(det) else {}


def _first_dict(*candidates):
    """Return the first candidate that is a non-empty dict, else ``None``."""
    for candidate in candidates:
        if isinstance(candidate, dict) and candidate:
            return candidate
    return None


def _as_coord_list(seq):
    """Return *seq* as a non-empty list, or ``None`` when it is unusable.

    Uses ``list()`` and an emptiness check instead of truthiness so that
    array-like inputs whose ``bool()`` is ambiguous are accepted, and scalars
    (not iterable) are rejected instead of raising.
    """
    if seq is None or isinstance(seq, (str, bytes, dict)):
        return None
    try:
        values = list(seq)
    except TypeError:
        return None
    return values or None


def _resolve_coords(profile: dict, ctx: dict, detected: dict):
    """Return ``(lats, lons, source_label)``.

    Order: ctx/profile ``['geo_points']`` (explicit arrays under ``lats``/
    ``lons``, or ``lat``/``lon``) → ctx/profile ``['raw_numeric']`` keyed by
    the detected lat/lon column names → ``(None, None, "none")``.

    ``source_label`` is ``"geo_points"``, ``"raw_numeric"`` or ``"none"``.
    Never raises: values that are not usable sequences are skipped.
    """
    profile = profile if _is_dict(profile) else {}
    ctx = ctx if _is_dict(ctx) else {}
    detected = detected if _is_dict(detected) else {}

    geo_points = _first_dict(ctx.get("geo_points"), profile.get("geo_points"))
    if geo_points is not None:
        lats = geo_points.get("lats")
        if lats is None:
            lats = geo_points.get("lat")
        lons = geo_points.get("lons")
        if lons is None:
            lons = geo_points.get("lon")
        lats, lons = _as_coord_list(lats), _as_coord_list(lons)
        if lats is not None and lons is not None:
            return lats, lons, "geo_points"

    lat_col = detected.get("lat_col")
    lon_col = detected.get("lon_col")
    if lat_col and lon_col:
        raw = _first_dict(ctx.get("raw_numeric"), profile.get("raw_numeric"))
        if raw is not None:
            lats = _as_coord_list(raw.get(lat_col))
            lons = _as_coord_list(raw.get(lon_col))
            if lats is not None and lons is not None:
                return lats, lons, "raw_numeric"

    return None, None, "none"


def _column_by_name(profile: dict, name):
    """Return the column dict in ``profile['columns']`` named *name*, or None."""
    if not name or not _is_dict(profile):
        return None
    columns = profile.get("columns")
    if not isinstance(columns, (list, tuple)):
        return None
    for col in columns:
        if isinstance(col, dict) and col.get("name") == name:
            return col
    return None


def _bbox_from_profile(profile: dict, detected: dict):
    """Approximate bbox from the per-column ``numeric.min/max`` (no raw points).

    Returns a dict with ``lat_min``, ``lat_max``, ``lon_min`` and ``lon_max``
    as floats, or ``None`` when either column or its ``numeric`` aggregates
    are missing, or when any bound is not a finite number.
    """
    detected = detected if _is_dict(detected) else {}
    lat_c = _column_by_name(profile, detected.get("lat_col"))
    lon_c = _column_by_name(profile, detected.get("lon_col"))
    lat_n = lat_c.get("numeric") if _is_dict(lat_c) else None
    lon_n = lon_c.get("numeric") if _is_dict(lon_c) else None
    if not _is_dict(lat_n) or not _is_dict(lon_n):
        return None
    try:
        bbox = {
            "lat_min": float(lat_n.get("min")),
            "lat_max": float(lat_n.get("max")),
            "lon_min": float(lon_n.get("min")),
            "lon_max": float(lon_n.get("max")),
        }
    except (TypeError, ValueError, OverflowError):
        return None
    # A NaN/inf bound would only render as a meaningless "nan°" row.
    if not all(math.isfinite(bound) for bound in bbox.values()):
        return None
    return bbox


# --------------------------------------------------------------------------- #
# Figure builder (lazy: matplotlib only imported when the renderer draws it).
# --------------------------------------------------------------------------- #
def _as_count(value):
    """Coerce *value* to ``int``, or return ``None`` when it is not a number."""
    if value is None or isinstance(value, bool):
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _finite_xy(points) -> tuple:
    """Split *points* into parallel ``(xs, ys)`` float lists.

    Only list/tuple entries with at least two items whose first two items
    convert to finite floats are kept; everything else is skipped so that a
    single malformed point cannot make the plot call fail.
    """
    xs, ys = [], []
    try:
        iterator = iter(points)
    except TypeError:
        return xs, ys
    for point in iterator:
        if not isinstance(point, (list, tuple)) or len(point) < 2:
            continue
        try:
            x, y = float(point[0]), float(point[1])
        except (TypeError, ValueError, OverflowError):
            continue
        if math.isfinite(x) and math.isfinite(y):
            xs.append(x)
            ys.append(y)
    return xs, ys


def _make_geo_scatter(scatter: dict, lat_col: str, lon_col: str):
    """Return a zero-arg callable drawing the geographic scatter, or None.

    Args:
        scatter: prepared plot data; the keys read here are ``points``,
            ``bbox``, ``aspect``, ``pad``, ``n_total`` and ``n_shown``.
        lat_col: latitude column name, used in the Y axis label.
        lon_col: longitude column name, used in the X axis label.

    Returns:
        A callable that imports matplotlib, draws the figure and returns it,
        or ``None`` when *scatter* is not a dict or holds no plottable point.
    """
    if not isinstance(scatter, dict):
        return None
    # The first item of each point goes on the X axis and the second on Y.
    xs, ys = _finite_xy(scatter.get("points"))
    if not xs:
        return None

    bbox = scatter.get("bbox") if isinstance(scatter.get("bbox"), dict) else {}
    pad = scatter.get("pad") if isinstance(scatter.get("pad"), dict) else {}
    aspect = scatter.get("aspect") or 1.0
    n_total = _as_count(scatter.get("n_total"))
    n_shown = _as_count(scatter.get("n_shown"))

    def _draw():
        import matplotlib

        matplotlib.use("Agg")
        import matplotlib.pyplot as plt

        fig, ax = plt.subplots(figsize=(6.6, 5.0))
        try:
            # More points -> smaller markers + lower alpha so dense clouds
            # read as density without saturating the page with ink (Tufte).
            n = len(xs)
            if n <= 200:
                size, alpha = 18, 0.75
            elif n <= 1000:
                size, alpha = 8, 0.5
            else:
                size, alpha = 4, 0.35
            ax.scatter(xs, ys, s=size, c="#2a6f97", alpha=alpha,
                       linewidths=0, zorder=3)

            # Bounding box rectangle for orientation. All numbers are parsed
            # before anything is drawn so a bad value leaves the axes clean.
            if bbox:
                try:
                    lo_x, hi_x = float(bbox["lon_min"]), float(bbox["lon_max"])
                    lo_y, hi_y = float(bbox["lat_min"]), float(bbox["lat_max"])
                    px = float(pad.get("lon", 0.0) or 0.0)
                    py = float(pad.get("lat", 0.0) or 0.0)
                    ax.plot([lo_x, hi_x, hi_x, lo_x, lo_x],
                            [lo_y, lo_y, hi_y, hi_y, lo_y],
                            color="#e15759", linewidth=1.0, linestyle="--",
                            alpha=0.8, zorder=4, label="Bounding box")
                    ax.set_xlim(lo_x - px, hi_x + px)
                    ax.set_ylim(lo_y - py, hi_y + py)
                except (TypeError, ValueError, KeyError):
                    pass

            # Equirectangular: scale Y/X so longitude is not stretched at
            # this latitude (projection integrity, Tufte). aspect = 1/cos(lat).
            try:
                ax.set_aspect(float(aspect))
            except (TypeError, ValueError):
                pass

            ax.set_xlabel(f"Longitud ({lon_col})", fontsize=8)
            ax.set_ylabel(f"Latitud ({lat_col})", fontsize=8)
            ax.tick_params(labelsize=7)
            ax.grid(color="#e6e6e6", linewidth=0.5, zorder=0)
            title = "Distribución geográfica de las coordenadas"
            if n_shown is not None and n_total is not None and n_shown < n_total:
                title += (
                    f"\n(mostrando {n_shown:,} de {n_total:,} puntos)"
                ).replace(",", ".")
            ax.set_title(title, fontsize=10)
            # Only draw a legend when a labelled artist exists (the bbox may
            # be absent); otherwise matplotlib emits a warning.
            handles, _labels = ax.get_legend_handles_labels()
            if handles:
                ax.legend(loc="best", fontsize=7, frameon=True, framealpha=0.9)
            fig.tight_layout()
        except Exception:
            # Do not leave a half-drawn figure registered in pyplot.
            plt.close(fig)
            raise
        return fig

    return _draw


# --------------------------------------------------------------------------- #
# Section builders.
# --------------------------------------------------------------------------- #
def _intro_block(detected: dict, lat_col: str, lon_col: str) -> list:
    """Return the chapter heading plus the introductory paragraph.

    The paragraph names the latitude/longitude columns and, when *detected*
    carries them, the detection confidence (as a percentage) and the reason.
    """
    info = detected if isinstance(detected, dict) else {}

    conf_txt = ""
    conf = info.get("confidence")
    if conf is not None:
        try:
            pct = float(conf) * 100
        except (TypeError, ValueError):
            pct = None
        if pct is not None and math.isfinite(pct):
            conf_txt = f" (confianza {pct:.0f}%)"

    text = (
        "Este dataset contiene **coordenadas geográficas**: se identificó el par "
        f"**latitud = «{lat_col}»** y **longitud = «{lon_col}»**{conf_txt}. La "
        "detección combina el nombre de la columna y el rango de sus valores "
        "(latitud en [−90, 90], longitud en [−180, 180])."
    )

    raw_reason = info.get("reason")
    reason = model._safe_str(raw_reason).strip() if raw_reason else ""
    if reason:
        # Avoid a doubled period when the reason is already a full sentence.
        closing = "" if reason.endswith((".", "!", "?", "…")) else "."
        text += f"\n\n*Criterio de detección:* {reason}{closing}"
    return [model.Heading(text=CHAPTER_TITLE, level=1), model.Markdown(text=text)]


def _region_shares(by_region) -> list:
    """Return ``(region, count, share)`` tuples for the per-region counts.

    Non-dict entries are skipped and a missing or non-numeric ``count`` is
    treated as 0. ``share`` is ``count / total`` or ``None`` when the total is
    not positive.
    """
    if not isinstance(by_region, list):
        return []
    entries = [
        (entry.get("region"), _as_count(entry.get("count")) or 0)
        for entry in by_region
        if isinstance(entry, dict)
    ]
    total = sum(count for _region, count in entries)
    return [
        (region, count, (count / total) if total > 0 else None)
        for region, count in entries
    ]


def _extent_blocks(extent: dict) -> list:
    """KVTable with bbox/centroid/span + DataTable with the per-region counts.

    Returns ``[]`` when *extent* is not a dict or reports no points.
    """
    if not isinstance(extent, dict) or not extent.get("n_points"):
        return []

    def _sub(key) -> dict:
        value = extent.get(key)
        return value if isinstance(value, dict) else {}

    bbox, centroid, hemi = _sub("bbox"), _sub("centroid"), _sub("hemisphere")

    rows = [("Puntos con coordenadas", _fmt_num(extent.get("n_points")))]
    if bbox:
        rows.append(("Latitud (mín. / máx.)",
                     f"{_fmt_coord(bbox.get('lat_min'))} a "
                     f"{_fmt_coord(bbox.get('lat_max'))}"))
        rows.append(("Longitud (mín. / máx.)",
                     f"{_fmt_coord(bbox.get('lon_min'))} a "
                     f"{_fmt_coord(bbox.get('lon_max'))}"))
    if centroid:
        rows.append(("Centroide",
                     f"{_fmt_coord(centroid.get('lat'))}, "
                     f"{_fmt_coord(centroid.get('lon'))}"))
    if extent.get("span_km") is not None:
        rows.append(("Extensión (diagonal)", _fmt_km(extent.get("span_km"))))
    if hemi:
        n, s = hemi.get("north"), hemi.get("south")
        e, w = hemi.get("east"), hemi.get("west")
        rows.append(("Hemisferios",
                     f"N {_fmt_num(n)} / S {_fmt_num(s)} · "
                     f"E {_fmt_num(e)} / O {_fmt_num(w)}"))
    blocks = [model.KVTable(rows=rows, title="Extensión geográfica")]

    region_rows = [
        [model._safe_str(region), _fmt_num(count),
         f"{share * 100:.1f}%" if share is not None else "—"]
        for region, count, share in _region_shares(extent.get("by_region"))
    ]
    if region_rows:
        blocks.append(model.DataTable(
            header=["Zona / país", "Puntos", "% del total"],
            rows=region_rows,
            title="Distribución por zona",
            note="Asignación aproximada por bounding box de cada región "
                 "(no es reverse-geocoding exacto de fronteras)."))
    return blocks


# Model id used for the optional narrative when ctx gives none.
_DEFAULT_GEO_LLM_MODEL = "claude-haiku-4-5-20251001"


def _narrative_block(profile: dict, ctx: dict, extent: dict) -> list:
    """A one-line narrative of where the points concentrate.

    Uses the ``note`` carried by *extent* by default; when
    ``ctx['run_geo_llm']`` is truthy it asks ``ask_llm`` for a one-liner and
    uses that instead if a non-empty string comes back. Any failure of the
    LLM path falls back to the derived note. Returns ``[]`` when there is no
    text to show. *profile* is accepted for signature parity and not read.
    """
    extent = extent if isinstance(extent, dict) else {}
    ctx = ctx if isinstance(ctx, dict) else {}

    raw_note = extent.get("note")
    note = model._safe_str(raw_note).strip() if raw_note else ""

    if ctx.get("run_geo_llm"):
        by_region = extent.get("by_region") or []
        bbox = extent.get("bbox") or {}
        try:
            from core.ask_llm import ask_llm

            prompt = (
                "Eres un analista de datos. En UNA frase en español, describe "
                "dónde se concentran geográficamente estos puntos. Sé concreto "
                "y no inventes precisión que los datos no tienen.\n"
                f"Conteo por zona: {by_region}\nBounding box: {bbox}."
            )
            # ``or`` also covers a key that is present but set to None/"".
            llm_model = ctx.get("geo_llm_model") or _DEFAULT_GEO_LLM_MODEL
            out = ask_llm(prompt, model=llm_model, echo=False)
            if isinstance(out, str) and out.strip():
                note = out.strip()
        except Exception:  # noqa: BLE001 — degrade to the derived note.
            pass

    if not note:
        return []
    return [model.Markdown(text=f"**Interpretación.** {note}")]


def _no_points_block(profile: dict, detected: dict) -> list:
    """Degrade honestly when the raw coordinate arrays are not available.

    Shows an approximate bounding box table when one can be derived from the
    per-column min/max, followed by a note explaining how to supply the raw
    points. The note only mentions the bounding box when one is shown.
    """
    blocks = []
    bbox = _bbox_from_profile(profile, detected)
    if bbox:
        rows = [
            ("Latitud (mín. / máx.)",
             f"{_fmt_coord(bbox.get('lat_min'))} a "
             f"{_fmt_coord(bbox.get('lat_max'))}"),
            ("Longitud (mín. / máx.)",
             f"{_fmt_coord(bbox.get('lon_min'))} a "
             f"{_fmt_coord(bbox.get('lon_max'))}"),
        ]
        blocks.append(model.KVTable(
            rows=rows, title="Extensión geográfica (aproximada)"))

    sentences = [
        "No se incluyeron las coordenadas crudas en el contexto, por lo que el "
        "mapa y el análisis por zona no se han dibujado.",
    ]
    if bbox:
        sentences.append(
            "El bounding box mostrado se deriva de los mínimos y máximos por "
            "columna.")
    sentences.append(
        "Para el scatter geográfico completo, pasa los arrays en "
        "ctx['geo_points'] = {'lats': [...], 'lons': [...]} o las columnas en "
        "ctx['raw_numeric'].")
    blocks.append(model.Note(" ".join(sentences)))
    return blocks


# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_geospatial(profile: dict, ctx: dict):
    """Build the GEOSPATIAL Chapter, or None if the dataset has no coordinates.

    Args:
        profile: the ``eda`` group TableProfile dict.
        ctx: presentation context; may carry ``geo_points``/``raw_numeric``
            with the raw coordinate arrays and the ``run_geo_llm`` flag.
            Anything that is not a dict is treated as an empty context.

    Returns:
        A ``model.Chapter`` with the geographic scatter + zone/country
        analysis, or ``None`` when *profile* is not a dict or when neither a
        latitude/longitude column pair nor raw points are found.
    """
    profile = profile or {}
    if not isinstance(profile, dict):
        return None
    if not isinstance(ctx, dict):
        ctx = {}

    detected = _detect_columns(profile)
    if not isinstance(detected, dict):
        detected = {}
    try:
        lats, lons, _source = _resolve_coords(profile, ctx, detected)
    except Exception:  # noqa: BLE001 — malformed arrays must not break the chapter.
        lats, lons = None, None

    has_detection = bool(detected.get("lat_col") and detected.get("lon_col"))
    has_points = bool(lats and lons)
    if not has_detection and not has_points:
        return None  # chapter does not apply: no coordinates in this dataset.

    # Labels for axes / intro. When only raw arrays were given (no detection),
    # fall back to generic names.
    lat_col = detected.get("lat_col") or "lat"
    lon_col = detected.get("lon_col") or "lon"

    blocks = _intro_block(detected, lat_col, lon_col)

    if has_points:
        clean_lats = _clean_floats(lats)
        clean_lons = _clean_floats(lons)

        # Zone / country analysis.
        extent = {}
        if analyze_geo_extent is not None:
            try:
                extent = analyze_geo_extent(clean_lats, clean_lons) or {}
            except Exception:  # noqa: BLE001
                extent = {}
        if not isinstance(extent, dict):
            extent = {}

        # The geographic scatter figure (its own page/slide).
        scatter = {}
        if build_geo_scatter is not None:
            try:
                scatter = build_geo_scatter(clean_lats, clean_lons) or {}
            except Exception:  # noqa: BLE001
                scatter = {}
        if not isinstance(scatter, dict):
            scatter = {}

        maker = _make_geo_scatter(scatter, lat_col, lon_col) if scatter else None
        if maker is not None:
            blocks.append(model.Figure(
                make=maker,
                caption="Cada punto es una observación situada por sus "
                        "coordenadas; el recuadro rojo es el bounding box. La "
                        "escala respeta la latitud (proyección equirectangular)."))
        else:
            blocks.append(model.Note(
                "No se pudo construir el scatter geográfico a partir de las "
                "coordenadas proporcionadas."))

        blocks += _extent_blocks(extent)
        blocks += _narrative_block(profile, ctx, extent)
    else:
        # Columns detected but no raw points available — degrade honestly.
        blocks += _no_points_block(profile, detected)

    if not blocks:
        return None
    return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
                         version=CHAPTER_VERSION, blocks=blocks)