00cd5274bc
Capítulo nuevo chapters/geospatial.py (CHAPTER_VERSION 1.0.0). Cuando el dataset tiene un par de coordenadas, dibuja un scatter geográfico en proyección equirectangular (la escala respeta la latitud para no estirar la longitud) y analiza la extensión: bounding box, centroide, span, conteo por zona/país, hemisferios y una interpretación. Cuando NO hay coordenadas, build_geospatial devuelve None y el capítulo se omite. Sigue el contrato de capítulos (firma build_<id>(profile, ctx) -> Chapter|None, lectura defensiva, nunca lanza) y el patrón de modelos/num_distr: delega el cálculo a las primitivas puras del registry (detect_latlon_columns, analyze_geo_extent, build_geo_scatter) y solo dibuja la figura matplotlib de forma perezosa. Las coordenadas crudas llegan por ctx['geo_points'] o ctx['raw_numeric'] (como modelos lee raw_numeric); sin ellas, degrada con un bounding box aproximado de numeric.min/max y una nota honesta. Anti-cortes: usa DataTable/KVTable/Figure/Markdown del modelo, que el paginador parte sin cortar. Test self-contained con golden + 6 edges + anti-cut (nombres largos + 2100 puntos en varias regiones renderizan a PDF y PPTX sin truncar). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
478 lines
19 KiB
Python
478 lines
19 KiB
Python
"""Geospatial chapter (GEOSPATIAL) for AutomaticEDA.
|
||
|
||
When the dataset carries a coordinate pair (latitude/longitude), this chapter
|
||
draws the points on a **geographic scatter** in an equirectangular projection
|
||
(scaled so degrees of longitude are not stretched at the data's latitude) and
|
||
analyses the **zone / country** the points fall in: bounding box, centroid,
|
||
geographic span, and a per-region count. When there is **no** coordinate pair the
|
||
chapter returns ``None`` — exactly the user requirement.
|
||
|
||
Detection and the heavy lifting are delegated to pure ``eda``-group registry
|
||
functions, never reimplemented here:
|
||
|
||
- ``detect_latlon_columns`` — finds the (lat, lon) column pair by name + value
|
||
range from the ``profile['columns']`` metadata.
|
||
- ``analyze_geo_extent`` — bbox, centroid, haversine span, per-region counts and
|
||
hemisphere from the raw coordinate arrays.
|
||
- ``build_geo_scatter`` — deterministically down-sampled points + bbox + the
|
||
aspect ratio for the equirectangular projection. This chapter only draws the
|
||
matplotlib figure from that prepared data (same split as ``num_distr`` does
|
||
with ``build_boxplot_stats``).
|
||
|
||
The raw coordinate arrays are **not** in a standard TableProfile (it stores only
|
||
per-column aggregates), so — exactly like ``modelos`` reads ``raw_numeric`` from
|
||
``ctx`` — this chapter looks for the coordinates in ``ctx`` (or ``profile``) and
|
||
degrades honestly when they are absent: it still detects the columns and shows an
|
||
approximate bounding box derived from the per-column ``numeric.min/max``, with a
|
||
note that the raw points are needed for the map.
|
||
|
||
ctx keys this chapter consumes (all optional):
|
||
geo_points : dict — ``{"lats": [...], "lons": [...]}`` raw coordinate arrays.
|
||
Used directly when present (forward-compatible with a calculation phase
|
||
that samples them from the table).
|
||
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
|
||
and ``geo_points`` is not, the detected lat/lon columns are read from it.
|
||
run_geo_llm : bool — when True, call ``ask_llm`` for a one-line narrative of
|
||
where the points concentrate (otherwise a derived note is used).
|
||
geo_llm_model : str — model id for the optional live LLM call.
|
||
|
||
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
|
||
Reads everything defensively (``.get``) and never raises.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import math
|
||
|
||
from .. import model
|
||
|
||
# Pure registry functions (group ``eda``) delegated to. Imported defensively so
|
||
# the chapter stays importable (degrading gracefully) if one is unavailable.
|
||
try:
|
||
from datascience.detect_latlon_columns import detect_latlon_columns
|
||
except Exception: # noqa: BLE001 — keep the chapter importable no matter what.
|
||
detect_latlon_columns = None # type: ignore[assignment]
|
||
try:
|
||
from datascience.analyze_geo_extent import analyze_geo_extent
|
||
except Exception: # noqa: BLE001
|
||
analyze_geo_extent = None # type: ignore[assignment]
|
||
try:
|
||
from datascience.build_geo_scatter import build_geo_scatter
|
||
except Exception: # noqa: BLE001
|
||
build_geo_scatter = None # type: ignore[assignment]
|
||
|
||
CHAPTER_VERSION = "1.0.0"
|
||
CHAPTER_ID = "geospatial"
|
||
CHAPTER_TITLE = "Análisis geoespacial"
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Formatting helpers (mirror the other chapters' defensive style).
|
||
# --------------------------------------------------------------------------- #
|
||
def _fmt_num(value, decimals: int = 4) -> str:
|
||
if value is None:
|
||
return "—"
|
||
if isinstance(value, bool):
|
||
return "sí" if value else "no"
|
||
if isinstance(value, int):
|
||
return f"{value:,}".replace(",", ".")
|
||
if isinstance(value, float):
|
||
if value != value: # NaN
|
||
return "NaN"
|
||
if value in (float("inf"), float("-inf")):
|
||
return str(value)
|
||
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
|
||
return text if text else "0"
|
||
return model._safe_str(value)
|
||
|
||
|
||
def _fmt_coord(value, decimals: int = 4) -> str:
|
||
"""Format a coordinate degree value, defensively."""
|
||
try:
|
||
return f"{float(value):.{decimals}f}°"
|
||
except (TypeError, ValueError):
|
||
return model._safe_str(value)
|
||
|
||
|
||
def _fmt_km(value) -> str:
|
||
if value is None:
|
||
return "—"
|
||
try:
|
||
v = float(value)
|
||
except (TypeError, ValueError):
|
||
return model._safe_str(value)
|
||
if v >= 100:
|
||
return f"{v:,.0f} km".replace(",", ".")
|
||
return f"{v:.1f} km"
|
||
|
||
|
||
def _is_dict(v) -> bool:
|
||
return isinstance(v, dict)
|
||
|
||
|
||
def _clean_floats(seq) -> list:
|
||
"""Return a list of floats from an arbitrary sequence (drop None/NaN)."""
|
||
out = []
|
||
if not isinstance(seq, (list, tuple)):
|
||
return out
|
||
for v in seq:
|
||
try:
|
||
f = float(v)
|
||
except (TypeError, ValueError):
|
||
out.append(None)
|
||
continue
|
||
out.append(f if f == f else None) # NaN -> None
|
||
return out
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Resolve the (lat, lon) columns and the raw coordinate arrays.
|
||
# --------------------------------------------------------------------------- #
|
||
def _detect_columns(profile: dict) -> dict:
|
||
"""Detect the lat/lon column pair from the profile metadata, or {}."""
|
||
cols = profile.get("columns")
|
||
if not isinstance(cols, list) or not cols or detect_latlon_columns is None:
|
||
return {}
|
||
try:
|
||
det = detect_latlon_columns(cols)
|
||
except Exception: # noqa: BLE001 — never break the chapter.
|
||
return {}
|
||
return det if _is_dict(det) else {}
|
||
|
||
|
||
def _resolve_coords(profile: dict, ctx: dict, detected: dict):
|
||
"""Return (lats, lons, source_label).
|
||
|
||
Order: ctx/profile['geo_points'] (explicit arrays) → ctx/profile
|
||
['raw_numeric'] keyed by the detected lat/lon column names → (None, None).
|
||
"""
|
||
gp = ctx.get("geo_points") or profile.get("geo_points")
|
||
if _is_dict(gp):
|
||
lats = gp.get("lats")
|
||
if lats is None:
|
||
lats = gp.get("lat")
|
||
lons = gp.get("lons")
|
||
if lons is None:
|
||
lons = gp.get("lon")
|
||
if lats and lons:
|
||
return list(lats), list(lons), "geo_points"
|
||
|
||
lat_col = (detected or {}).get("lat_col")
|
||
lon_col = (detected or {}).get("lon_col")
|
||
if lat_col and lon_col:
|
||
raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
|
||
if _is_dict(raw):
|
||
lats = raw.get(lat_col)
|
||
lons = raw.get(lon_col)
|
||
if lats and lons:
|
||
return list(lats), list(lons), "raw_numeric"
|
||
return None, None, "none"
|
||
|
||
|
||
def _column_by_name(profile: dict, name):
|
||
if not name:
|
||
return None
|
||
for col in profile.get("columns") or []:
|
||
if isinstance(col, dict) and col.get("name") == name:
|
||
return col
|
||
return None
|
||
|
||
|
||
def _bbox_from_profile(profile: dict, detected: dict):
|
||
"""Approximate bbox from the per-column numeric.min/max (no raw points)."""
|
||
lat_c = _column_by_name(profile, (detected or {}).get("lat_col"))
|
||
lon_c = _column_by_name(profile, (detected or {}).get("lon_col"))
|
||
lat_n = lat_c.get("numeric") if _is_dict(lat_c) else None
|
||
lon_n = lon_c.get("numeric") if _is_dict(lon_c) else None
|
||
if not _is_dict(lat_n) or not _is_dict(lon_n):
|
||
return None
|
||
try:
|
||
return {
|
||
"lat_min": float(lat_n.get("min")),
|
||
"lat_max": float(lat_n.get("max")),
|
||
"lon_min": float(lon_n.get("min")),
|
||
"lon_max": float(lon_n.get("max")),
|
||
}
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Figure builder (lazy: matplotlib only imported when the renderer draws it).
|
||
# --------------------------------------------------------------------------- #
|
||
def _make_geo_scatter(scatter: dict, lat_col: str, lon_col: str):
|
||
"""Return a zero-arg callable drawing the geographic scatter, or None."""
|
||
points = scatter.get("points") or []
|
||
if not points:
|
||
return None
|
||
bbox = scatter.get("bbox") if _is_dict(scatter.get("bbox")) else {}
|
||
aspect = scatter.get("aspect") or 1.0
|
||
pad = scatter.get("pad") if _is_dict(scatter.get("pad")) else {}
|
||
n_total = scatter.get("n_total")
|
||
n_shown = scatter.get("n_shown")
|
||
|
||
def _draw():
|
||
import matplotlib
|
||
matplotlib.use("Agg")
|
||
import matplotlib.pyplot as plt
|
||
|
||
xs = [p[0] for p in points if isinstance(p, (list, tuple)) and len(p) >= 2]
|
||
ys = [p[1] for p in points if isinstance(p, (list, tuple)) and len(p) >= 2]
|
||
|
||
fig, ax = plt.subplots(figsize=(6.6, 5.0))
|
||
# More points -> smaller markers + lower alpha so dense clouds read as
|
||
# density without saturating the page with ink (Tufte).
|
||
n = max(len(xs), 1)
|
||
size = 18 if n <= 200 else (8 if n <= 1000 else 4)
|
||
alpha = 0.75 if n <= 200 else (0.5 if n <= 1000 else 0.35)
|
||
ax.scatter(xs, ys, s=size, c="#2a6f97", alpha=alpha, linewidths=0,
|
||
zorder=3)
|
||
|
||
# Bounding box rectangle for orientation.
|
||
if bbox:
|
||
try:
|
||
lo_x, hi_x = float(bbox["lon_min"]), float(bbox["lon_max"])
|
||
lo_y, hi_y = float(bbox["lat_min"]), float(bbox["lat_max"])
|
||
ax.plot([lo_x, hi_x, hi_x, lo_x, lo_x],
|
||
[lo_y, lo_y, hi_y, hi_y, lo_y],
|
||
color="#e15759", linewidth=1.0, linestyle="--",
|
||
alpha=0.8, zorder=4, label="Bounding box")
|
||
px = float(pad.get("lon", 0.0) or 0.0)
|
||
py = float(pad.get("lat", 0.0) or 0.0)
|
||
ax.set_xlim(lo_x - px, hi_x + px)
|
||
ax.set_ylim(lo_y - py, hi_y + py)
|
||
except (TypeError, ValueError, KeyError):
|
||
pass
|
||
|
||
# Equirectangular: scale Y/X so longitude is not stretched at this
|
||
# latitude (integridad de proyección, Tufte). aspect = 1/cos(lat).
|
||
try:
|
||
ax.set_aspect(float(aspect))
|
||
except (TypeError, ValueError):
|
||
pass
|
||
|
||
ax.set_xlabel(f"Longitud ({lon_col})", fontsize=8)
|
||
ax.set_ylabel(f"Latitud ({lat_col})", fontsize=8)
|
||
ax.tick_params(labelsize=7)
|
||
ax.grid(color="#e6e6e6", linewidth=0.5, zorder=0)
|
||
title = "Distribución geográfica de las coordenadas"
|
||
if n_shown is not None and n_total is not None and n_shown < n_total:
|
||
title += f"\n(mostrando {n_shown:,} de {n_total:,} puntos)".replace(",", ".")
|
||
ax.set_title(title, fontsize=10)
|
||
ax.legend(loc="best", fontsize=7, frameon=True, framealpha=0.9)
|
||
fig.tight_layout()
|
||
return fig
|
||
|
||
return _draw
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Section builders.
|
||
# --------------------------------------------------------------------------- #
|
||
def _intro_block(detected: dict, lat_col: str, lon_col: str) -> list:
|
||
conf = (detected or {}).get("confidence")
|
||
reason = model._safe_str((detected or {}).get("reason"))
|
||
conf_txt = ""
|
||
if conf is not None:
|
||
try:
|
||
conf_txt = f" (confianza {float(conf) * 100:.0f}%)"
|
||
except (TypeError, ValueError):
|
||
conf_txt = ""
|
||
text = (
|
||
"Este dataset contiene **coordenadas geográficas**: se identificó el par "
|
||
f"**latitud = «{lat_col}»** y **longitud = «{lon_col}»**{conf_txt}. La "
|
||
"detección combina el nombre de la columna y el rango de sus valores "
|
||
"(latitud en [−90, 90], longitud en [−180, 180])."
|
||
)
|
||
if reason:
|
||
text += f"\n\n*Criterio de detección:* {reason}."
|
||
return [model.Heading(text=CHAPTER_TITLE, level=1),
|
||
model.Markdown(text=text)]
|
||
|
||
|
||
def _extent_blocks(extent: dict) -> list:
|
||
"""KVTable with bbox/centroid/span + DataTable with the per-region counts."""
|
||
if not _is_dict(extent) or not extent.get("n_points"):
|
||
return []
|
||
blocks = []
|
||
bbox = extent.get("bbox") if _is_dict(extent.get("bbox")) else {}
|
||
centroid = extent.get("centroid") if _is_dict(extent.get("centroid")) else {}
|
||
hemi = extent.get("hemisphere") if _is_dict(extent.get("hemisphere")) else {}
|
||
|
||
rows = [("Puntos con coordenadas", _fmt_num(extent.get("n_points")))]
|
||
if bbox:
|
||
rows.append(("Latitud (mín. / máx.)",
|
||
f"{_fmt_coord(bbox.get('lat_min'))} a "
|
||
f"{_fmt_coord(bbox.get('lat_max'))}"))
|
||
rows.append(("Longitud (mín. / máx.)",
|
||
f"{_fmt_coord(bbox.get('lon_min'))} a "
|
||
f"{_fmt_coord(bbox.get('lon_max'))}"))
|
||
if centroid:
|
||
rows.append(("Centroide",
|
||
f"{_fmt_coord(centroid.get('lat'))}, "
|
||
f"{_fmt_coord(centroid.get('lon'))}"))
|
||
if extent.get("span_km") is not None:
|
||
rows.append(("Extensión (diagonal)", _fmt_km(extent.get("span_km"))))
|
||
if hemi:
|
||
n, s = hemi.get("north"), hemi.get("south")
|
||
e, w = hemi.get("east"), hemi.get("west")
|
||
rows.append(("Hemisferios",
|
||
f"N {_fmt_num(n)} / S {_fmt_num(s)} · "
|
||
f"E {_fmt_num(e)} / O {_fmt_num(w)}"))
|
||
blocks.append(model.KVTable(rows=rows, title="Extensión geográfica"))
|
||
|
||
by_region = extent.get("by_region")
|
||
if isinstance(by_region, list) and by_region:
|
||
total = sum(r.get("count", 0) for r in by_region if _is_dict(r)) or 0
|
||
rrows = []
|
||
for r in by_region:
|
||
if not _is_dict(r):
|
||
continue
|
||
cnt = r.get("count", 0)
|
||
pct = (cnt / total) if total else None
|
||
pct_txt = f"{pct * 100:.1f}%" if pct is not None else "—"
|
||
rrows.append([model._safe_str(r.get("region")), _fmt_num(cnt),
|
||
pct_txt])
|
||
if rrows:
|
||
blocks.append(model.DataTable(
|
||
header=["Zona / país", "Puntos", "% del total"], rows=rrows,
|
||
title="Distribución por zona",
|
||
note="Asignación aproximada por bounding box de cada región "
|
||
"(no es reverse-geocoding exacto de fronteras)."))
|
||
return blocks
|
||
|
||
|
||
def _narrative_block(profile: dict, ctx: dict, extent: dict) -> list:
|
||
"""A one-line narrative of where the points concentrate.
|
||
|
||
Uses the derived ``note`` from analyze_geo_extent by default; optionally
|
||
calls an LLM (ctx['run_geo_llm']) for a richer one-liner.
|
||
"""
|
||
note = model._safe_str((extent or {}).get("note"))
|
||
if ctx.get("run_geo_llm"):
|
||
by_region = (extent or {}).get("by_region") or []
|
||
bbox = (extent or {}).get("bbox") or {}
|
||
try:
|
||
from core.ask_llm import ask_llm
|
||
prompt = (
|
||
"Eres un analista de datos. En UNA frase en español, describe "
|
||
"dónde se concentran geográficamente estos puntos. Sé concreto "
|
||
"y no inventes precisión que los datos no tienen.\n"
|
||
f"Conteo por zona: {by_region}\nBounding box: {bbox}."
|
||
)
|
||
out = ask_llm(prompt,
|
||
model=ctx.get("geo_llm_model",
|
||
"claude-haiku-4-5-20251001"),
|
||
echo=False)
|
||
if out and isinstance(out, str) and out.strip():
|
||
note = out.strip()
|
||
except Exception: # noqa: BLE001 — degrade to the derived note.
|
||
pass
|
||
if not note:
|
||
return []
|
||
return [model.Markdown(text=f"**Interpretación.** {note}")]
|
||
|
||
|
||
def _no_points_block(profile: dict, detected: dict) -> list:
|
||
"""Degrade honestly when the raw coordinate arrays are not available."""
|
||
blocks = []
|
||
bbox = _bbox_from_profile(profile, detected)
|
||
if bbox:
|
||
rows = [
|
||
("Latitud (mín. / máx.)",
|
||
f"{_fmt_coord(bbox.get('lat_min'))} a "
|
||
f"{_fmt_coord(bbox.get('lat_max'))}"),
|
||
("Longitud (mín. / máx.)",
|
||
f"{_fmt_coord(bbox.get('lon_min'))} a "
|
||
f"{_fmt_coord(bbox.get('lon_max'))}"),
|
||
]
|
||
blocks.append(model.KVTable(
|
||
rows=rows, title="Extensión geográfica (aproximada)"))
|
||
blocks.append(model.Note(
|
||
"No se incluyeron las coordenadas crudas en el contexto, por lo que el "
|
||
"mapa y el análisis por zona no se han dibujado. El bounding box "
|
||
"mostrado se deriva de los mínimos y máximos por columna. Para el "
|
||
"scatter geográfico completo, pasa los arrays en "
|
||
"ctx['geo_points'] = {'lats': [...], 'lons': [...]} o las columnas en "
|
||
"ctx['raw_numeric']."))
|
||
return blocks
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Entry point.
|
||
# --------------------------------------------------------------------------- #
|
||
def build_geospatial(profile: dict, ctx: dict):
|
||
"""Build the GEOSPATIAL Chapter, or None if the dataset has no coordinates.
|
||
|
||
Args:
|
||
profile: the ``eda`` group TableProfile dict.
|
||
ctx: presentation context; may carry ``geo_points``/``raw_numeric`` with
|
||
the raw coordinate arrays and the ``run_geo_llm`` flag.
|
||
|
||
Returns:
|
||
A ``model.Chapter`` with the geographic scatter + zone/country analysis,
|
||
or ``None`` when no latitude/longitude column pair is detected.
|
||
"""
|
||
profile = profile or {}
|
||
ctx = ctx or {}
|
||
if not isinstance(profile, dict):
|
||
return None
|
||
|
||
detected = _detect_columns(profile)
|
||
lats, lons, source = _resolve_coords(profile, ctx, detected)
|
||
|
||
has_detection = bool((detected or {}).get("lat_col") and
|
||
(detected or {}).get("lon_col"))
|
||
has_points = bool(lats and lons)
|
||
if not has_detection and not has_points:
|
||
return None # chapter does not apply: no coordinates in this dataset.
|
||
|
||
# Labels for axes / intro. When only raw arrays were given (no detection),
|
||
# fall back to generic names.
|
||
lat_col = (detected or {}).get("lat_col") or "lat"
|
||
lon_col = (detected or {}).get("lon_col") or "lon"
|
||
|
||
blocks = _intro_block(detected, lat_col, lon_col)
|
||
|
||
if has_points:
|
||
clean_lats = _clean_floats(lats)
|
||
clean_lons = _clean_floats(lons)
|
||
|
||
# Zone / country analysis.
|
||
extent = {}
|
||
if analyze_geo_extent is not None:
|
||
try:
|
||
extent = analyze_geo_extent(clean_lats, clean_lons) or {}
|
||
except Exception: # noqa: BLE001
|
||
extent = {}
|
||
|
||
# The geographic scatter figure (its own page/slide).
|
||
scatter = {}
|
||
if build_geo_scatter is not None:
|
||
try:
|
||
scatter = build_geo_scatter(clean_lats, clean_lons) or {}
|
||
except Exception: # noqa: BLE001
|
||
scatter = {}
|
||
maker = _make_geo_scatter(scatter, lat_col, lon_col) if scatter else None
|
||
if maker is not None:
|
||
blocks.append(model.Figure(
|
||
make=maker,
|
||
caption="Cada punto es una observación situada por sus "
|
||
"coordenadas; el recuadro rojo es el bounding box. La "
|
||
"escala respeta la latitud (proyección equirectangular)."))
|
||
else:
|
||
blocks.append(model.Note(
|
||
"No se pudo construir el scatter geográfico a partir de las "
|
||
"coordenadas proporcionadas."))
|
||
|
||
blocks += _extent_blocks(extent)
|
||
blocks += _narrative_block(profile, ctx, extent)
|
||
else:
|
||
# Columns detected but no raw points available — degrade honestly.
|
||
blocks += _no_points_block(profile, detected)
|
||
|
||
if not blocks:
|
||
return None
|
||
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
|
||
version=CHAPTER_VERSION, blocks=blocks)
|