fn_registry/python/functions/datascience/automatic_eda/chapters/geospatial.py
egutierrez 00cd5274bc feat(eda): GEOSPATIAL chapter for AutomaticEDA (geographic scatter + zone/country)
New chapter chapters/geospatial.py (CHAPTER_VERSION 1.0.0). When the dataset
has a coordinate pair, it draws a geographic scatter in an equirectangular
projection (the scale respects latitude so longitude is not stretched) and
analyses the extent: bounding box, centroid, span, count per zone/country,
hemispheres and an interpretation. When there are NO coordinates, build_geospatial
returns None and the chapter is omitted.

Follows the chapter contract (signature build_<id>(profile, ctx) -> Chapter|None,
defensive reads, never raises) and the modelos/num_distr pattern: it delegates the
computation to the registry's pure primitives (detect_latlon_columns,
analyze_geo_extent, build_geo_scatter) and only draws the matplotlib figure
lazily. The raw coordinates arrive via ctx['geo_points'] or
ctx['raw_numeric'] (just as modelos reads raw_numeric); without them, it degrades
to an approximate bounding box from numeric.min/max plus an honest note.

Anti-cut: uses the model's DataTable/KVTable/Figure/Markdown blocks, which the
paginator splits without cutting content. Self-contained test with golden + 6 edge
cases + anti-cut (long names + 2100 points across several regions render to PDF
and PPTX without truncation).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:29:33 +02:00
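
The commit message says the scale "respects latitude" so that longitude is not stretched. A minimal sketch of how such an aspect ratio could be computed follows. The helper name `equirectangular_aspect`, the use of the latitude midpoint, and the `max_aspect` clamp are assumptions made for illustration; the actual `build_geo_scatter` primitive is not shown in this file and may compute it differently.

```python
import math


def equirectangular_aspect(lats, max_aspect=10.0):
    """Hypothetical helper: y/x aspect ratio for a local equirectangular plot.

    One degree of longitude spans cos(lat) times the ground distance of one
    degree of latitude, so drawing y units 1/cos(lat) times larger than x
    units keeps shapes undistorted near the reference latitude.
    """
    clean = [float(v) for v in lats if v is not None]
    if not clean:
        return 1.0  # nothing to scale by: fall back to a square aspect
    mid_lat = (min(clean) + max(clean)) / 2.0  # assumed reference latitude
    cos_lat = math.cos(math.radians(mid_lat))
    # Near the poles cos(lat) tends to 0; clamp so the figure stays drawable.
    if cos_lat <= 1.0 / max_aspect:
        return max_aspect
    return 1.0 / cos_lat
```

A value computed this way could be passed to matplotlib's `ax.set_aspect(...)`, which is what the chapter does with the `aspect` entry it receives.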

478 lines
19 KiB
Python
"""Geospatial chapter (GEOSPATIAL) for AutomaticEDA.

When the dataset carries a coordinate pair (latitude/longitude), this chapter
draws the points on a **geographic scatter** in an equirectangular projection
(scaled so degrees of longitude are not stretched at the data's latitude) and
analyses the **zone / country** the points fall in: bounding box, centroid,
geographic span, and a per-region count. When there is **no** coordinate pair the
chapter returns ``None``, exactly as the user requirement specifies.

Detection and the heavy lifting are delegated to pure ``eda``-group registry
functions, never reimplemented here:

- ``detect_latlon_columns``: finds the (lat, lon) column pair by name + value
  range from the ``profile['columns']`` metadata.
- ``analyze_geo_extent``: bbox, centroid, haversine span, per-region counts and
  hemisphere from the raw coordinate arrays.
- ``build_geo_scatter``: deterministically down-sampled points + bbox + the
  aspect ratio for the equirectangular projection. This chapter only draws the
  matplotlib figure from that prepared data (same split as ``num_distr`` does
  with ``build_boxplot_stats``).

The raw coordinate arrays are **not** in a standard TableProfile (it stores only
per-column aggregates), so, exactly like ``modelos`` reads ``raw_numeric`` from
``ctx``, this chapter looks for the coordinates in ``ctx`` (or ``profile``) and
degrades honestly when they are absent: it still detects the columns and shows an
approximate bounding box derived from the per-column ``numeric.min/max``, with a
note that the raw points are needed for the map.

ctx keys this chapter consumes (all optional):
    geo_points : dict. ``{"lats": [...], "lons": [...]}`` raw coordinate arrays.
        Used directly when present (forward-compatible with a calculation phase
        that samples them from the table).
    raw_numeric : dict. ``{col: [values]}`` raw numeric columns; when present
        and ``geo_points`` is not, the detected lat/lon columns are read from it.
    run_geo_llm : bool. When True, call ``ask_llm`` for a one-line narrative of
        where the points concentrate (otherwise a derived note is used).
    geo_llm_model : str. Model id for the optional live LLM call.

Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
Reads everything defensively (``.get``) and never raises.
"""

from __future__ import annotations

import math

from .. import model

# Pure registry functions (group ``eda``) delegated to. Imported defensively so
# the chapter stays importable (degrading gracefully) if one is unavailable.
try:
    from datascience.detect_latlon_columns import detect_latlon_columns
except Exception:  # noqa: BLE001  # keep the chapter importable no matter what.
    detect_latlon_columns = None  # type: ignore[assignment]
try:
    from datascience.analyze_geo_extent import analyze_geo_extent
except Exception:  # noqa: BLE001
    analyze_geo_extent = None  # type: ignore[assignment]
try:
    from datascience.build_geo_scatter import build_geo_scatter
except Exception:  # noqa: BLE001
    build_geo_scatter = None  # type: ignore[assignment]

CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "geospatial"
CHAPTER_TITLE = "Análisis geoespacial"


# --------------------------------------------------------------------------- #
# Formatting helpers (mirror the other chapters' defensive style).
# --------------------------------------------------------------------------- #
def _fmt_num(value, decimals: int = 4) -> str:
    if value is None:
        return ""
    if isinstance(value, bool):
        return "" if value else "no"
    if isinstance(value, int):
        return f"{value:,}".replace(",", ".")
    if isinstance(value, float):
        if value != value:  # NaN
            return "NaN"
        if value in (float("inf"), float("-inf")):
            return str(value)
        text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
        return text if text else "0"
    return model._safe_str(value)


def _fmt_coord(value, decimals: int = 4) -> str:
    """Format a coordinate degree value, defensively."""
    try:
        return f"{float(value):.{decimals}f}°"
    except (TypeError, ValueError):
        return model._safe_str(value)


def _fmt_km(value) -> str:
    if value is None:
        return ""
    try:
        v = float(value)
    except (TypeError, ValueError):
        return model._safe_str(value)
    if v >= 100:
        return f"{v:,.0f} km".replace(",", ".")
    return f"{v:.1f} km"


def _is_dict(v) -> bool:
    return isinstance(v, dict)


def _clean_floats(seq) -> list:
    """Return a list of floats from an arbitrary sequence.

    Non-numeric and NaN entries become ``None`` (positions are preserved).
    """
    out = []
    if not isinstance(seq, (list, tuple)):
        return out
    for v in seq:
        try:
            f = float(v)
        except (TypeError, ValueError):
            out.append(None)
            continue
        out.append(f if f == f else None)  # NaN -> None
    return out


# --------------------------------------------------------------------------- #
# Resolve the (lat, lon) columns and the raw coordinate arrays.
# --------------------------------------------------------------------------- #
def _detect_columns(profile: dict) -> dict:
    """Detect the lat/lon column pair from the profile metadata, or {}."""
    cols = profile.get("columns")
    if not isinstance(cols, list) or not cols or detect_latlon_columns is None:
        return {}
    try:
        det = detect_latlon_columns(cols)
    except Exception:  # noqa: BLE001  # never break the chapter.
        return {}
    return det if _is_dict(det) else {}


def _resolve_coords(profile: dict, ctx: dict, detected: dict):
    """Return (lats, lons, source_label).

    Order: ctx/profile['geo_points'] (explicit arrays) → ctx/profile
    ['raw_numeric'] keyed by the detected lat/lon column names → (None, None).
    """
    gp = ctx.get("geo_points") or profile.get("geo_points")
    if _is_dict(gp):
        lats = gp.get("lats")
        if lats is None:
            lats = gp.get("lat")
        lons = gp.get("lons")
        if lons is None:
            lons = gp.get("lon")
        if lats and lons:
            return list(lats), list(lons), "geo_points"
    lat_col = (detected or {}).get("lat_col")
    lon_col = (detected or {}).get("lon_col")
    if lat_col and lon_col:
        raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
        if _is_dict(raw):
            lats = raw.get(lat_col)
            lons = raw.get(lon_col)
            if lats and lons:
                return list(lats), list(lons), "raw_numeric"
    return None, None, "none"


def _column_by_name(profile: dict, name):
    if not name:
        return None
    for col in profile.get("columns") or []:
        if isinstance(col, dict) and col.get("name") == name:
            return col
    return None


def _bbox_from_profile(profile: dict, detected: dict):
    """Approximate bbox from the per-column numeric.min/max (no raw points)."""
    lat_c = _column_by_name(profile, (detected or {}).get("lat_col"))
    lon_c = _column_by_name(profile, (detected or {}).get("lon_col"))
    lat_n = lat_c.get("numeric") if _is_dict(lat_c) else None
    lon_n = lon_c.get("numeric") if _is_dict(lon_c) else None
    if not _is_dict(lat_n) or not _is_dict(lon_n):
        return None
    try:
        return {
            "lat_min": float(lat_n.get("min")),
            "lat_max": float(lat_n.get("max")),
            "lon_min": float(lon_n.get("min")),
            "lon_max": float(lon_n.get("max")),
        }
    except (TypeError, ValueError):
        return None


# --------------------------------------------------------------------------- #
# Figure builder (lazy: matplotlib only imported when the renderer draws it).
# --------------------------------------------------------------------------- #
def _make_geo_scatter(scatter: dict, lat_col: str, lon_col: str):
    """Return a zero-arg callable drawing the geographic scatter, or None."""
    points = scatter.get("points") or []
    if not points:
        return None
    bbox = scatter.get("bbox") if _is_dict(scatter.get("bbox")) else {}
    aspect = scatter.get("aspect") or 1.0
    pad = scatter.get("pad") if _is_dict(scatter.get("pad")) else {}
    n_total = scatter.get("n_total")
    n_shown = scatter.get("n_shown")

    def _draw():
        import matplotlib
        matplotlib.use("Agg")
        import matplotlib.pyplot as plt

        xs = [p[0] for p in points if isinstance(p, (list, tuple)) and len(p) >= 2]
        ys = [p[1] for p in points if isinstance(p, (list, tuple)) and len(p) >= 2]
        fig, ax = plt.subplots(figsize=(6.6, 5.0))
        # More points -> smaller markers + lower alpha so dense clouds read as
        # density without saturating the page with ink (Tufte).
        n = max(len(xs), 1)
        size = 18 if n <= 200 else (8 if n <= 1000 else 4)
        alpha = 0.75 if n <= 200 else (0.5 if n <= 1000 else 0.35)
        ax.scatter(xs, ys, s=size, c="#2a6f97", alpha=alpha, linewidths=0,
                   zorder=3)
        # Bounding box rectangle for orientation.
        if bbox:
            try:
                lo_x, hi_x = float(bbox["lon_min"]), float(bbox["lon_max"])
                lo_y, hi_y = float(bbox["lat_min"]), float(bbox["lat_max"])
                ax.plot([lo_x, hi_x, hi_x, lo_x, lo_x],
                        [lo_y, lo_y, hi_y, hi_y, lo_y],
                        color="#e15759", linewidth=1.0, linestyle="--",
                        alpha=0.8, zorder=4, label="Bounding box")
                px = float(pad.get("lon", 0.0) or 0.0)
                py = float(pad.get("lat", 0.0) or 0.0)
                ax.set_xlim(lo_x - px, hi_x + px)
                ax.set_ylim(lo_y - py, hi_y + py)
            except (TypeError, ValueError, KeyError):
                pass
        # Equirectangular: scale Y/X so longitude is not stretched at this
        # latitude (projection integrity, Tufte). aspect = 1/cos(lat).
        try:
            ax.set_aspect(float(aspect))
        except (TypeError, ValueError):
            pass
        ax.set_xlabel(f"Longitud ({lon_col})", fontsize=8)
        ax.set_ylabel(f"Latitud ({lat_col})", fontsize=8)
        ax.tick_params(labelsize=7)
        ax.grid(color="#e6e6e6", linewidth=0.5, zorder=0)
        title = "Distribución geográfica de las coordenadas"
        if n_shown is not None and n_total is not None and n_shown < n_total:
            shown = f"(mostrando {n_shown:,} de {n_total:,} puntos)"
            title += "\n" + shown.replace(",", ".")
        ax.set_title(title, fontsize=10)
        # Only draw the legend when a labelled artist exists (the bounding box);
        # otherwise matplotlib warns about an empty legend.
        handles, _labels = ax.get_legend_handles_labels()
        if handles:
            ax.legend(loc="best", fontsize=7, frameon=True, framealpha=0.9)
        fig.tight_layout()
        return fig

    return _draw


# --------------------------------------------------------------------------- #
# Section builders.
# --------------------------------------------------------------------------- #
def _intro_block(detected: dict, lat_col: str, lon_col: str) -> list:
    conf = (detected or {}).get("confidence")
    reason = model._safe_str((detected or {}).get("reason"))
    conf_txt = ""
    if conf is not None:
        try:
            conf_txt = f" (confianza {float(conf) * 100:.0f}%)"
        except (TypeError, ValueError):
            conf_txt = ""
    text = (
        "Este dataset contiene **coordenadas geográficas**: se identificó el par "
        f"**latitud = «{lat_col}»** y **longitud = «{lon_col}»**{conf_txt}. La "
        "detección combina el nombre de la columna y el rango de sus valores "
        "(latitud en [-90, 90], longitud en [-180, 180])."
    )
    if reason:
        text += f"\n\n*Criterio de detección:* {reason}."
    return [model.Heading(text=CHAPTER_TITLE, level=1),
            model.Markdown(text=text)]


def _extent_blocks(extent: dict) -> list:
    """KVTable with bbox/centroid/span + DataTable with the per-region counts."""
    if not _is_dict(extent) or not extent.get("n_points"):
        return []
    blocks = []
    bbox = extent.get("bbox") if _is_dict(extent.get("bbox")) else {}
    centroid = extent.get("centroid") if _is_dict(extent.get("centroid")) else {}
    hemi = extent.get("hemisphere") if _is_dict(extent.get("hemisphere")) else {}
    rows = [("Puntos con coordenadas", _fmt_num(extent.get("n_points")))]
    if bbox:
        rows.append(("Latitud (mín. / máx.)",
                     f"{_fmt_coord(bbox.get('lat_min'))} a "
                     f"{_fmt_coord(bbox.get('lat_max'))}"))
        rows.append(("Longitud (mín. / máx.)",
                     f"{_fmt_coord(bbox.get('lon_min'))} a "
                     f"{_fmt_coord(bbox.get('lon_max'))}"))
    if centroid:
        rows.append(("Centroide",
                     f"{_fmt_coord(centroid.get('lat'))}, "
                     f"{_fmt_coord(centroid.get('lon'))}"))
    if extent.get("span_km") is not None:
        rows.append(("Extensión (diagonal)", _fmt_km(extent.get("span_km"))))
    if hemi:
        n, s = hemi.get("north"), hemi.get("south")
        e, w = hemi.get("east"), hemi.get("west")
        rows.append(("Hemisferios",
                     f"N {_fmt_num(n)} / S {_fmt_num(s)} · "
                     f"E {_fmt_num(e)} / O {_fmt_num(w)}"))
    blocks.append(model.KVTable(rows=rows, title="Extensión geográfica"))
    by_region = extent.get("by_region")
    if isinstance(by_region, list) and by_region:
        total = sum(r.get("count", 0) for r in by_region if _is_dict(r)) or 0
        rrows = []
        for r in by_region:
            if not _is_dict(r):
                continue
            cnt = r.get("count", 0)
            pct = (cnt / total) if total else None
            pct_txt = f"{pct * 100:.1f}%" if pct is not None else ""
            rrows.append([model._safe_str(r.get("region")), _fmt_num(cnt),
                          pct_txt])
        if rrows:
            blocks.append(model.DataTable(
                header=["Zona / país", "Puntos", "% del total"], rows=rrows,
                title="Distribución por zona",
                note="Asignación aproximada por bounding box de cada región "
                     "(no es reverse-geocoding exacto de fronteras)."))
    return blocks


def _narrative_block(profile: dict, ctx: dict, extent: dict) -> list:
    """A one-line narrative of where the points concentrate.

    Uses the derived ``note`` from analyze_geo_extent by default; optionally
    calls an LLM (ctx['run_geo_llm']) for a richer one-liner.
    """
    note = model._safe_str((extent or {}).get("note"))
    if ctx.get("run_geo_llm"):
        by_region = (extent or {}).get("by_region") or []
        bbox = (extent or {}).get("bbox") or {}
        try:
            from core.ask_llm import ask_llm
            prompt = (
                "Eres un analista de datos. En UNA frase en español, describe "
                "dónde se concentran geográficamente estos puntos. Sé concreto "
                "y no inventes precisión que los datos no tienen.\n"
                f"Conteo por zona: {by_region}\nBounding box: {bbox}."
            )
            out = ask_llm(prompt,
                          model=ctx.get("geo_llm_model",
                                        "claude-haiku-4-5-20251001"),
                          echo=False)
            if out and isinstance(out, str) and out.strip():
                note = out.strip()
        except Exception:  # noqa: BLE001  # degrade to the derived note.
            pass
    if not note:
        return []
    return [model.Markdown(text=f"**Interpretación.** {note}")]


def _no_points_block(profile: dict, detected: dict) -> list:
    """Degrade honestly when the raw coordinate arrays are not available."""
    blocks = []
    bbox = _bbox_from_profile(profile, detected)
    if bbox:
        rows = [
            ("Latitud (mín. / máx.)",
             f"{_fmt_coord(bbox.get('lat_min'))} a "
             f"{_fmt_coord(bbox.get('lat_max'))}"),
            ("Longitud (mín. / máx.)",
             f"{_fmt_coord(bbox.get('lon_min'))} a "
             f"{_fmt_coord(bbox.get('lon_max'))}"),
        ]
        blocks.append(model.KVTable(
            rows=rows, title="Extensión geográfica (aproximada)"))
    blocks.append(model.Note(
        "No se incluyeron las coordenadas crudas en el contexto, por lo que el "
        "mapa y el análisis por zona no se han dibujado. El bounding box "
        "mostrado se deriva de los mínimos y máximos por columna. Para el "
        "scatter geográfico completo, pasa los arrays en "
        "ctx['geo_points'] = {'lats': [...], 'lons': [...]} o las columnas en "
        "ctx['raw_numeric']."))
    return blocks


# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_geospatial(profile: dict, ctx: dict):
    """Build the GEOSPATIAL Chapter, or None if the dataset has no coordinates.

    Args:
        profile: the ``eda`` group TableProfile dict.
        ctx: presentation context; may carry ``geo_points``/``raw_numeric`` with
            the raw coordinate arrays and the ``run_geo_llm`` flag.

    Returns:
        A ``model.Chapter`` with the geographic scatter + zone/country analysis,
        or ``None`` when no latitude/longitude column pair is detected.
    """
    profile = profile or {}
    ctx = ctx or {}
    if not isinstance(profile, dict):
        return None
    if not isinstance(ctx, dict):
        ctx = {}  # a malformed context must not break the "never raises" contract
    detected = _detect_columns(profile)
    lats, lons, source = _resolve_coords(profile, ctx, detected)
    has_detection = bool((detected or {}).get("lat_col") and
                         (detected or {}).get("lon_col"))
    has_points = bool(lats and lons)
    if not has_detection and not has_points:
        return None  # chapter does not apply: no coordinates in this dataset.
    # Labels for axes / intro. When only raw arrays were given (no detection),
    # fall back to generic names.
    lat_col = (detected or {}).get("lat_col") or "lat"
    lon_col = (detected or {}).get("lon_col") or "lon"
    blocks = _intro_block(detected, lat_col, lon_col)
    if has_points:
        clean_lats = _clean_floats(lats)
        clean_lons = _clean_floats(lons)
        # Zone / country analysis.
        extent = {}
        if analyze_geo_extent is not None:
            try:
                extent = analyze_geo_extent(clean_lats, clean_lons) or {}
            except Exception:  # noqa: BLE001
                extent = {}
        # The geographic scatter figure (its own page/slide).
        scatter = {}
        if build_geo_scatter is not None:
            try:
                scatter = build_geo_scatter(clean_lats, clean_lons) or {}
            except Exception:  # noqa: BLE001
                scatter = {}
        maker = _make_geo_scatter(scatter, lat_col, lon_col) if scatter else None
        if maker is not None:
            blocks.append(model.Figure(
                make=maker,
                caption="Cada punto es una observación situada por sus "
                        "coordenadas; el recuadro rojo es el bounding box. La "
                        "escala respeta la latitud (proyección equirectangular)."))
        else:
            blocks.append(model.Note(
                "No se pudo construir el scatter geográfico a partir de las "
                "coordenadas proporcionadas."))
        blocks += _extent_blocks(extent)
        blocks += _narrative_block(profile, ctx, extent)
    else:
        # Columns detected but no raw points available: degrade honestly.
        blocks += _no_points_block(profile, detected)
    if not blocks:
        return None
    return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
                         version=CHAPTER_VERSION, blocks=blocks)
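
The module docstring states that `analyze_geo_extent` reports a haversine span, and the chapter labels it "Extensión (diagonal)". That registry function is not part of this file, so the following is only a sketch of what a bounding-box diagonal computed with the haversine formula could look like. The names `haversine_km` and `bbox_span_km` and the mean Earth radius constant are assumptions for illustration, not the registry's actual API.

```python
import math

EARTH_RADIUS_KM = 6371.0088  # mean Earth radius (assumed constant)


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2.0) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2.0) ** 2)
    # min(1.0, ...) guards against rounding pushing sqrt(a) just above 1.
    return 2.0 * EARTH_RADIUS_KM * math.asin(min(1.0, math.sqrt(a)))


def bbox_span_km(bbox):
    """Hypothetical span: haversine distance between opposite bbox corners."""
    return haversine_km(bbox["lat_min"], bbox["lon_min"],
                        bbox["lat_max"], bbox["lon_max"])
```

The `bbox` argument uses the same `lat_min`/`lat_max`/`lon_min`/`lon_max` keys that `_bbox_from_profile` returns, so in principle the same sketch could give a rough span for the degraded, no-raw-points path as well.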