Compare commits

...

2 Commits

Author SHA1 Message Date
egutierrez 00cd5274bc feat(eda): capítulo GEOSPATIAL del AutomaticEDA (scatter geográfico + zona/país)
Capítulo nuevo chapters/geospatial.py (CHAPTER_VERSION 1.0.0). Cuando el dataset
tiene un par de coordenadas, dibuja un scatter geográfico en proyección
equirectangular (la escala respeta la latitud para no estirar la longitud) y
analiza la extensión: bounding box, centroide, span, conteo por zona/país,
hemisferios y una interpretación. Cuando NO hay coordenadas, build_geospatial
devuelve None y el capítulo se omite.

Sigue el contrato de capítulos (firma build_<id>(profile, ctx) -> Chapter|None,
lectura defensiva, nunca lanza) y el patrón de modelos/num_distr: delega el
cálculo a las primitivas puras del registry (detect_latlon_columns,
analyze_geo_extent, build_geo_scatter) y solo dibuja la figura matplotlib de
forma perezosa. Las coordenadas crudas llegan por ctx['geo_points'] o
ctx['raw_numeric'] (como modelos lee raw_numeric); sin ellas, degrada con un
bounding box aproximado de numeric.min/max y una nota honesta.

Anti-cortes: usa DataTable/KVTable/Figure/Markdown del modelo, que el paginador
parte sin cortar. Test self-contained con golden + 6 edges + anti-cut (nombres
largos + 2100 puntos en varias regiones renderizan a PDF y PPTX sin truncar).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:29:33 +02:00
egutierrez cd658cc703 feat(eda): primitivas geoespaciales del grupo eda (detección lat/lon + extensión + scatter)
Tres funciones puras nuevas del dominio datascience (tags eda + geospatial) que
sostienen el capítulo GEOSPATIAL del AutomaticEDA, delegadas a fn-constructor:

- detect_latlon_columns: identifica el par (lat, lon) por nombre de columna +
  rango de valores ([-90,90] / [-180,180]) desde profile['columns']. Devuelve
  {lat_col, lon_col, confidence, reason}. 9 tests.
- analyze_geo_extent: bbox, centroide, span haversine, conteo por zona/país
  (lookup offline con bounding boxes embebidos, KISS sin geopandas) y
  hemisferios. 7 tests.
- build_geo_scatter: prepara los puntos del scatter en orden [lon, lat] con
  downsampling determinista por paso fijo + aspect equirectangular 1/cos(lat)
  clampado. 6 tests.

Registradas en datascience/__init__.py. Todas pure, params_schema completo,
.md autosuficiente (Ejemplo + Cuando usarla + Gotchas).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:29:33 +02:00
12 changed files with 1891 additions and 0 deletions
+6
View File
@@ -44,6 +44,9 @@ from .trend_slope import trend_slope
from .run_eda_models import run_eda_models
from .project_clusters_2d import project_clusters_2d
from .describe_clusters_llm import describe_clusters_llm
from .detect_latlon_columns import detect_latlon_columns
from .analyze_geo_extent import analyze_geo_extent
from .build_geo_scatter import build_geo_scatter
from .eda_llm_insights import eda_llm_insights
from .build_eda_notebook import build_eda_notebook
from .decode_qr_image import decode_qr_image
@@ -90,6 +93,9 @@ __all__ = [
"run_eda_models",
"project_clusters_2d",
"describe_clusters_llm",
"detect_latlon_columns",
"analyze_geo_extent",
"build_geo_scatter",
"eda_llm_insights",
"build_eda_notebook",
"describe_numeric",
@@ -0,0 +1,61 @@
---
name: analyze_geo_extent
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def analyze_geo_extent(lats: list, lons: list) -> dict"
description: "Calcula la extension geografica de una nube de coordenadas (lat/lon) y asigna cada punto a un pais/region mediante un lookup OFFLINE contra una tabla de bounding boxes embebida como constante. Devuelve bounding box, centroide, span de la diagonal (haversine), conteo por region (top-8 + Otros), reparto por hemisferios y una frase resumen en ES. Lectura defensiva: descarta pares None/NaN/fuera de rango y NUNCA lanza. Solo stdlib (math); sin geopandas/shapely. Las cajas de paises son rectangulos aproximados, no reverse-geocoding exacto."
tags: [eda, geospatial, geo, coordinates, bounding-box, haversine, datascience]
params:
- name: lats
desc: "Lista de latitudes en grados, rango valido [-90, 90]. Se empareja por indice con lons (gana la longitud minima comun si difieren). Cada valor puede ser None/NaN/no-numerico/fuera de rango: se lee defensivo y se descarta el par."
- name: lons
desc: "Lista de longitudes en grados, rango valido [-180, 180]. Paralela a lats, emparejada por indice. Valores None/NaN/no-numericos/fuera de rango se descartan junto con su par."
output: "Dict con el resumen geografico: {n_points=pares validos usados, bbox={lat_min,lat_max,lon_min,lon_max} o None, centroid={lat,lon}=media de lat/lon validos o None, span_km=distancia haversine (radio 6371 km) de la diagonal SO->NE del bbox, by_region=[{region,count}] descendente por count limitado a top-8 con el resto agregado en 'Otros', hemisphere={north,south,east,west} (ecuador->norte, meridiano 0->este), note=frase ES resumen}. Si no hay pares validos devuelve la forma cero: n_points 0, bbox None, centroid None, span_km 0.0, by_region [], hemisphere a ceros y note 'sin coordenadas validas'. Puntos que no caen en ninguna caja -> region 'Oceano/Otros'."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: true
tests: ["test_nube_en_espana", "test_dos_paises_distintos", "test_listas_vacias", "test_pares_invalidos_filtrados", "test_longitudes_desbalanceadas", "test_span_km_haversine_par_conocido", "test_no_lanza_con_entradas_raras"]
test_file_path: "python/functions/datascience/analyze_geo_extent_test.py"
file_path: "python/functions/datascience/analyze_geo_extent.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.analyze_geo_extent import analyze_geo_extent
# Nube de puntos alrededor de Madrid + un punto en Paris.
lats = [40.4, 40.0, 41.0, 48.8]
lons = [-3.7, -3.5, -4.0, 2.3]
res = analyze_geo_extent(lats, lons)
print(res["n_points"]) # 4
print(res["by_region"]) # [{'region': 'España', 'count': 3}, {'region': 'Francia', 'count': 1}]
print(round(res["span_km"], 1)) # diagonal SO->NE del bbox en km
print(res["hemisphere"]) # {'north': 4, 'south': 0, 'east': 1, 'west': 3}
print(res["note"]) # los puntos se concentran en España (3 de 4)
```
## Cuando usarla
- Usala en el perfilado EDA (grupo `eda`) cuando una tabla tenga columnas de latitud y longitud y quieras un resumen geografico rapido: donde se concentran los puntos, cuanto territorio cubren y a que paises/regiones caen, sin montar geopandas ni un reverse-geocoder.
- Cuando necesites un capitulo `geospatial` del `AutomaticEDA`: alimenta el bbox + centroide para centrar un mapa, el `span_km` para elegir el zoom, y `by_region` para una tabla de conteos por pais.
- Cuando quieras detectar datos sucios de coordenadas (mezcla de hemisferios inesperada, puntos en `Oceano/Otros`, span enorme) antes de seguir el analisis.
## Gotchas
- Funcion pura, sin I/O ni red y determinista: mismas entradas -> misma salida. Lectura defensiva, NUNCA lanza; pares con None/NaN o fuera de rango ([-90,90] lat, [-180,180] lon) se descartan en silencio.
- El lookup de region es una **aproximacion rectangular**: cada pais/region es un bounding box, NO su frontera real. Un punto en el mar cerca de una costa, o en una esquina del rectangulo, puede asignarse a un pais vecino. No es reverse-geocoding exacto — para precision real hace falta un shapefile (fuera de scope por KISS).
- Cajas solapadas se resuelven por orden: gana la PRIMERA que contiene el punto. Los paises se listan antes que los continentes (fallback), y entre vecinos el mas estrecho/occidental va primero (Portugal antes que España, Chile antes que Argentina, EEUU contiguo antes que Canada). Un punto que no cae en ninguna caja -> `Oceano/Otros`.
- La tabla cubre ~24 paises grandes + 6 regiones continentales; paises pequeños o no listados caen a su continente o a `Oceano/Otros`. No incluye territorios insulares lejanos (Canarias, Hawaii, etc.).
- `span_km` es la diagonal del bounding box (esquina SO a NE), no la dispersion real de la nube ni el area; con un solo punto valido el bbox es degenerado y `span_km` es 0.0.
- El ecuador (`lat == 0`) cuenta como hemisferio norte y el meridiano 0 (`lon == 0`) como este, por convencion `>= 0`.
@@ -0,0 +1,209 @@
"""analyze_geo_extent — geographic extent of a cloud of coordinates (EDA `geospatial`).
Pure function: no I/O, no network, deterministic. Given two parallel lists of
latitudes and longitudes it derives the bounding box, centroid, diagonal span
(haversine), per-region counts and hemisphere split of the points, and assigns
each point to a country/region via an OFFLINE lookup against a table of
rectangular bounding boxes embedded as a constant (`_REGION_BBOXES`).
It never reads files, never hits the network and depends only on `math`. The
country boxes are deliberately coarse rectangles (a KISS approximation, NOT a
reverse-geocoder). Reading is defensive throughout and the function NEVER
raises: invalid pairs (None / NaN / out of range) are silently discarded and an
empty cloud yields a zeroed result the caller can skip.
"""
import math
# Earth mean radius in km used by the haversine formula.
_EARTH_RADIUS_KM = 6371.0
# How many distinct regions to surface in `by_region` before collapsing the
# remainder into a single "Otros" bucket.
_TOP_REGIONS = 8
# Offline region lookup: (name, lat_min, lat_max, lon_min, lon_max).
#
# Specific countries are listed FIRST and continental fallbacks LAST: each point
# is assigned to the FIRST box that contains it, so the more specific country box
# wins over the broad continent box. Boxes are coarse rectangles approximating
# the mainland extent of each region; overlapping neighbours are ordered so the
# narrower/more-western country claims its coastal points (e.g. Portugal before
# Spain, Chile before Argentina, the contiguous US before Canada).
_REGION_BBOXES = (
# --- countries (specific) ---
("Portugal", 36.9, 42.2, -9.6, -6.2),
("España", 36.0, 43.8, -9.4, 3.4),
("Francia", 41.3, 51.1, -5.2, 9.6),
("Reino Unido", 49.9, 58.7, -8.6, 1.8),
("Irlanda", 51.4, 55.4, -10.6, -5.9),
("Países Bajos", 50.7, 53.6, 3.3, 7.2),
("Bélgica", 49.5, 51.5, 2.5, 6.4),
("Suiza", 45.8, 47.8, 5.9, 10.5),
("Alemania", 47.3, 55.1, 5.9, 15.0),
("Italia", 36.6, 47.1, 6.6, 18.5),
("Marruecos", 27.7, 35.9, -13.2, -1.0),
("Egipto", 22.0, 31.7, 25.0, 35.0),
("Sudáfrica", -34.8, -22.1, 16.5, 32.9),
("China", 18.0, 53.6, 73.5, 135.1),
("Japón", 24.0, 45.6, 122.9, 145.9),
("India", 6.7, 35.5, 68.1, 97.4),
("Australia", -43.7, -10.0, 112.9, 153.7),
("México", 14.5, 32.7, -118.4, -86.7),
("Estados Unidos", 24.4, 49.4, -125.0, -66.9),
("Canadá", 41.7, 83.1, -141.0, -52.6),
("Chile", -55.9, -17.5, -75.6, -66.4),
("Argentina", -55.1, -21.8, -73.6, -53.6),
("Brasil", -33.8, 5.3, -74.0, -34.8),
("Rusia", 41.2, 77.0, 19.6, 180.0),
# --- continental fallbacks (broad) ---
("Europa", 34.0, 72.0, -25.0, 45.0),
("África", -35.0, 37.5, -18.0, 52.0),
("Asia", 5.0, 78.0, 26.0, 180.0),
("América del Norte", 7.0, 84.0, -168.0, -52.0),
("América del Sur", -56.0, 13.0, -82.0, -34.0),
("Oceanía", -50.0, 0.0, 110.0, 180.0),
)
def _coord(value, limit):
"""Coerce a coordinate to a valid float in [-limit, limit] or None.
bool is a subclass of int but never a real coordinate, so True/False are
treated as missing. NaN and out-of-range values are rejected.
"""
if value is None or isinstance(value, bool):
return None
try:
f = float(value)
except (TypeError, ValueError):
return None
# NaN is the only value that is not equal to itself.
if f != f or f < -limit or f > limit:
return None
return f
def _haversine_km(lat1, lon1, lat2, lon2):
"""Great-circle distance in km between two (lat, lon) points in degrees."""
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2.0) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2.0) ** 2
return 2.0 * _EARTH_RADIUS_KM * math.asin(min(1.0, math.sqrt(a)))
def _region_of(lat, lon):
"""Return the name of the first embedded box containing (lat, lon)."""
for name, lat_min, lat_max, lon_min, lon_max in _REGION_BBOXES:
if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
return name
return "Océano/Otros"
def _empty_result():
"""Result shape when there are no valid coordinate pairs."""
return {
"n_points": 0,
"bbox": None,
"centroid": None,
"span_km": 0.0,
"by_region": [],
"hemisphere": {"north": 0, "south": 0, "east": 0, "west": 0},
"note": "sin coordenadas validas",
}
def analyze_geo_extent(lats: list, lons: list) -> dict:
"""Summarise the geographic extent of a cloud of lat/lon coordinates.
Pairs `lats[i]` with `lons[i]` by index (over the common length when the two
lists differ in size), discards any pair where either value is None / NaN or
outside [-90, 90] (lat) / [-180, 180] (lon), and derives the bounding box,
centroid, diagonal span, per-region counts and hemisphere split. Each valid
point is matched to a country/region by an offline lookup against coarse
rectangular bounding boxes (`_REGION_BBOXES`).
Args:
lats: List of latitudes in degrees ([-90, 90]); read defensively.
lons: List of longitudes in degrees ([-180, 180]); read defensively.
Paired with `lats` by index; the shorter length wins when they differ.
Returns:
Dict with the geographic summary:
{n_points, bbox={lat_min,lat_max,lon_min,lon_max}, centroid={lat,lon},
span_km (haversine of the SW->NE bbox diagonal), by_region=[{region,count}]
(descending, top-8 with the rest folded into "Otros"),
hemisphere={north,south,east,west}, note (Spanish summary phrase)}.
With no valid pairs returns the zeroed shape: n_points 0, bbox None,
centroid None, span_km 0.0, empty by_region, zeroed hemisphere and the
note "sin coordenadas validas". Never raises.
"""
if not isinstance(lats, (list, tuple)) or not isinstance(lons, (list, tuple)):
return _empty_result()
valid = []
# zip already stops at the shorter list -> unbalanced lengths are handled.
for raw_lat, raw_lon in zip(lats, lons):
lat = _coord(raw_lat, 90.0)
lon = _coord(raw_lon, 180.0)
if lat is None or lon is None:
continue
valid.append((lat, lon))
if not valid:
return _empty_result()
n = len(valid)
lat_vals = [p[0] for p in valid]
lon_vals = [p[1] for p in valid]
lat_min, lat_max = min(lat_vals), max(lat_vals)
lon_min, lon_max = min(lon_vals), max(lon_vals)
centroid_lat = sum(lat_vals) / n
centroid_lon = sum(lon_vals) / n
# Diagonal span: SW corner (lat_min, lon_min) to NE corner (lat_max, lon_max).
span_km = _haversine_km(lat_min, lon_min, lat_max, lon_max)
# Hemisphere split: the equator/prime-meridian go to north/east respectively.
north = sum(1 for lat in lat_vals if lat >= 0.0)
south = n - north
east = sum(1 for lon in lon_vals if lon >= 0.0)
west = n - east
# Count points per region (offline bbox lookup).
counts = {}
for lat, lon in valid:
region = _region_of(lat, lon)
counts[region] = counts.get(region, 0) + 1
# Descending by count, then by name for a deterministic tie-break.
ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
by_region = [{"region": name, "count": count} for name, count in ranked[:_TOP_REGIONS]]
rest = sum(count for _, count in ranked[_TOP_REGIONS:])
if rest > 0:
by_region.append({"region": "Otros", "count": rest})
top_region, top_count = ranked[0]
note = (
"los puntos se concentran en {region} ({count} de {n})".format(
region=top_region, count=top_count, n=n
)
)
return {
"n_points": n,
"bbox": {
"lat_min": lat_min,
"lat_max": lat_max,
"lon_min": lon_min,
"lon_max": lon_max,
},
"centroid": {"lat": centroid_lat, "lon": centroid_lon},
"span_km": span_km,
"by_region": by_region,
"hemisphere": {"north": north, "south": south, "east": east, "west": west},
"note": note,
}
@@ -0,0 +1,126 @@
"""Tests para analyze_geo_extent."""
import math
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from analyze_geo_extent import analyze_geo_extent, _haversine_km
# Keys that a non-empty result dict must always contain.
_EXPECTED_KEYS = {
"n_points", "bbox", "centroid", "span_km",
"by_region", "hemisphere", "note",
}
def test_nube_en_espana():
"""Golden: nube de puntos alrededor de Madrid -> region top = España."""
# Cuatro puntos en torno a Madrid (lat ~40, lon ~-3.7), con algo de spread.
lats = [40.4, 40.0, 41.0, 39.5]
lons = [-3.7, -3.5, -4.0, -3.2]
res = analyze_geo_extent(lats, lons)
assert set(res.keys()) == _EXPECTED_KEYS
assert res["n_points"] == 4
# Todos caen en España -> by_region una sola entrada.
assert res["by_region"][0]["region"] == "España"
assert res["by_region"][0]["count"] == 4
# Centroide coherente: media de lat y lon.
assert math.isclose(res["centroid"]["lat"], sum(lats) / 4, rel_tol=1e-9)
assert math.isclose(res["centroid"]["lon"], sum(lons) / 4, rel_tol=1e-9)
# bbox correcto.
assert res["bbox"]["lat_min"] == 39.5
assert res["bbox"]["lat_max"] == 41.0
assert res["bbox"]["lon_min"] == -4.0
assert res["bbox"]["lon_max"] == -3.2
# Hay spread -> diagonal > 0.
assert res["span_km"] > 0.0
# Hemisferio norte (lat>0) y oeste (lon<0).
assert res["hemisphere"]["north"] == 4
assert res["hemisphere"]["south"] == 0
assert res["hemisphere"]["east"] == 0
assert res["hemisphere"]["west"] == 4
assert "España" in res["note"]
def test_dos_paises_distintos():
"""Golden: puntos en España y Francia -> by_region con 2 entradas."""
# Madrid (España) x2 y Paris (Francia) x1.
lats = [40.4, 40.0, 48.8]
lons = [-3.7, -3.5, 2.3]
res = analyze_geo_extent(lats, lons)
assert res["n_points"] == 3
regions = {entry["region"]: entry["count"] for entry in res["by_region"]}
assert regions == {"España": 2, "Francia": 1}
# Orden descendente por count: España (2) antes que Francia (1).
assert res["by_region"][0]["region"] == "España"
assert res["by_region"][0]["count"] == 2
# Madrid y Paris ambos hemisferio norte; Paris lon>0 -> 1 east, 2 west.
assert res["hemisphere"]["north"] == 3
assert res["hemisphere"]["east"] == 1
assert res["hemisphere"]["west"] == 2
def test_listas_vacias():
"""Edge: listas vacias -> n_points 0, bbox None, sin lanzar."""
res = analyze_geo_extent([], [])
assert res["n_points"] == 0
assert res["bbox"] is None
assert res["centroid"] is None
assert res["span_km"] == 0.0
assert res["by_region"] == []
assert res["hemisphere"] == {"north": 0, "south": 0, "east": 0, "west": 0}
assert res["note"] == "sin coordenadas validas"
def test_pares_invalidos_filtrados():
"""Edge: None / NaN / fuera de rango se descartan, no lanza."""
nan = float("nan")
lats = [40.4, None, nan, 91.0, -200.0, 40.0]
lons = [-3.7, -3.5, -3.0, 2.0, 5.0, -3.5]
# Validos: indices 0 y 5 (lat 91 fuera de rango, lon -200 fuera de rango,
# None y NaN descartados).
res = analyze_geo_extent(lats, lons)
assert res["n_points"] == 2
assert res["by_region"][0]["region"] == "España"
assert res["by_region"][0]["count"] == 2
def test_longitudes_desbalanceadas():
"""Edge: len(lats) != len(lons) usa el minimo comun sin lanzar."""
lats = [40.4, 40.0, 41.0, 39.5] # 4 elementos
lons = [-3.7, -3.5] # 2 elementos
res = analyze_geo_extent(lats, lons)
# Solo se emparejan los 2 primeros.
assert res["n_points"] == 2
assert res["bbox"]["lat_min"] == 40.0
assert res["bbox"]["lat_max"] == 40.4
def test_span_km_haversine_par_conocido():
"""Edge: span_km coincide con haversine de la diagonal del bbox."""
# Dos puntos: (0, 0) y (0, 1). bbox diagonal = mismos dos puntos.
res = analyze_geo_extent([0.0, 0.0], [0.0, 1.0])
# 1 grado de longitud en el ecuador ~ 111.19 km.
expected = _haversine_km(0.0, 0.0, 0.0, 1.0)
assert math.isclose(res["span_km"], expected, rel_tol=1e-9)
assert math.isclose(res["span_km"], 111.19, abs_tol=0.5)
def test_no_lanza_con_entradas_raras():
"""Edge: tipos no-lista o None devuelven la forma vacia sin lanzar."""
assert analyze_geo_extent(None, None)["n_points"] == 0
assert analyze_geo_extent("foo", "bar")["n_points"] == 0
# Strings dentro de las listas se descartan como invalidos.
res = analyze_geo_extent(["x", 40.0], [None, -3.5])
assert res["n_points"] == 1
@@ -0,0 +1,477 @@
"""Geospatial chapter (GEOSPATIAL) for AutomaticEDA.
When the dataset carries a coordinate pair (latitude/longitude), this chapter
draws the points on a **geographic scatter** in an equirectangular projection
(scaled so degrees of longitude are not stretched at the data's latitude) and
analyses the **zone / country** the points fall in: bounding box, centroid,
geographic span, and a per-region count. When there is **no** coordinate pair the
chapter returns ``None`` — exactly the user requirement.
Detection and the heavy lifting are delegated to pure ``eda``-group registry
functions, never reimplemented here:
- ``detect_latlon_columns`` — finds the (lat, lon) column pair by name + value
range from the ``profile['columns']`` metadata.
- ``analyze_geo_extent`` — bbox, centroid, haversine span, per-region counts and
hemisphere from the raw coordinate arrays.
- ``build_geo_scatter`` — deterministically down-sampled points + bbox + the
aspect ratio for the equirectangular projection. This chapter only draws the
matplotlib figure from that prepared data (same split as ``num_distr`` does
with ``build_boxplot_stats``).
The raw coordinate arrays are **not** in a standard TableProfile (it stores only
per-column aggregates), so — exactly like ``modelos`` reads ``raw_numeric`` from
``ctx`` — this chapter looks for the coordinates in ``ctx`` (or ``profile``) and
degrades honestly when they are absent: it still detects the columns and shows an
approximate bounding box derived from the per-column ``numeric.min/max``, with a
note that the raw points are needed for the map.
ctx keys this chapter consumes (all optional):
geo_points : dict — ``{"lats": [...], "lons": [...]}`` raw coordinate arrays.
Used directly when present (forward-compatible with a calculation phase
that samples them from the table).
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
and ``geo_points`` is not, the detected lat/lon columns are read from it.
run_geo_llm : bool — when True, call ``ask_llm`` for a one-line narrative of
where the points concentrate (otherwise a derived note is used).
geo_llm_model : str — model id for the optional live LLM call.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
Reads everything defensively (``.get``) and never raises.
"""
from __future__ import annotations
import math
from .. import model
# Pure registry functions (group ``eda``) delegated to. Imported defensively so
# the chapter stays importable (degrading gracefully) if one is unavailable.
try:
from datascience.detect_latlon_columns import detect_latlon_columns
except Exception: # noqa: BLE001 — keep the chapter importable no matter what.
detect_latlon_columns = None # type: ignore[assignment]
try:
from datascience.analyze_geo_extent import analyze_geo_extent
except Exception: # noqa: BLE001
analyze_geo_extent = None # type: ignore[assignment]
try:
from datascience.build_geo_scatter import build_geo_scatter
except Exception: # noqa: BLE001
build_geo_scatter = None # type: ignore[assignment]
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "geospatial"
CHAPTER_TITLE = "Análisis geoespacial"
# --------------------------------------------------------------------------- #
# Formatting helpers (mirror the other chapters' defensive style).
# --------------------------------------------------------------------------- #
def _fmt_num(value, decimals: int = 4) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _fmt_coord(value, decimals: int = 4) -> str:
"""Format a coordinate degree value, defensively."""
try:
return f"{float(value):.{decimals}f}°"
except (TypeError, ValueError):
return model._safe_str(value)
def _fmt_km(value) -> str:
if value is None:
return ""
try:
v = float(value)
except (TypeError, ValueError):
return model._safe_str(value)
if v >= 100:
return f"{v:,.0f} km".replace(",", ".")
return f"{v:.1f} km"
def _is_dict(v) -> bool:
return isinstance(v, dict)
def _clean_floats(seq) -> list:
"""Return a list of floats from an arbitrary sequence (drop None/NaN)."""
out = []
if not isinstance(seq, (list, tuple)):
return out
for v in seq:
try:
f = float(v)
except (TypeError, ValueError):
out.append(None)
continue
out.append(f if f == f else None) # NaN -> None
return out
# --------------------------------------------------------------------------- #
# Resolve the (lat, lon) columns and the raw coordinate arrays.
# --------------------------------------------------------------------------- #
def _detect_columns(profile: dict) -> dict:
"""Detect the lat/lon column pair from the profile metadata, or {}."""
cols = profile.get("columns")
if not isinstance(cols, list) or not cols or detect_latlon_columns is None:
return {}
try:
det = detect_latlon_columns(cols)
except Exception: # noqa: BLE001 — never break the chapter.
return {}
return det if _is_dict(det) else {}
def _resolve_coords(profile: dict, ctx: dict, detected: dict):
"""Return (lats, lons, source_label).
Order: ctx/profile['geo_points'] (explicit arrays) → ctx/profile
['raw_numeric'] keyed by the detected lat/lon column names → (None, None).
"""
gp = ctx.get("geo_points") or profile.get("geo_points")
if _is_dict(gp):
lats = gp.get("lats")
if lats is None:
lats = gp.get("lat")
lons = gp.get("lons")
if lons is None:
lons = gp.get("lon")
if lats and lons:
return list(lats), list(lons), "geo_points"
lat_col = (detected or {}).get("lat_col")
lon_col = (detected or {}).get("lon_col")
if lat_col and lon_col:
raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
if _is_dict(raw):
lats = raw.get(lat_col)
lons = raw.get(lon_col)
if lats and lons:
return list(lats), list(lons), "raw_numeric"
return None, None, "none"
def _column_by_name(profile: dict, name):
if not name:
return None
for col in profile.get("columns") or []:
if isinstance(col, dict) and col.get("name") == name:
return col
return None
def _bbox_from_profile(profile: dict, detected: dict):
"""Approximate bbox from the per-column numeric.min/max (no raw points)."""
lat_c = _column_by_name(profile, (detected or {}).get("lat_col"))
lon_c = _column_by_name(profile, (detected or {}).get("lon_col"))
lat_n = lat_c.get("numeric") if _is_dict(lat_c) else None
lon_n = lon_c.get("numeric") if _is_dict(lon_c) else None
if not _is_dict(lat_n) or not _is_dict(lon_n):
return None
try:
return {
"lat_min": float(lat_n.get("min")),
"lat_max": float(lat_n.get("max")),
"lon_min": float(lon_n.get("min")),
"lon_max": float(lon_n.get("max")),
}
except (TypeError, ValueError):
return None
# --------------------------------------------------------------------------- #
# Figure builder (lazy: matplotlib only imported when the renderer draws it).
# --------------------------------------------------------------------------- #
def _make_geo_scatter(scatter: dict, lat_col: str, lon_col: str):
"""Return a zero-arg callable drawing the geographic scatter, or None."""
points = scatter.get("points") or []
if not points:
return None
bbox = scatter.get("bbox") if _is_dict(scatter.get("bbox")) else {}
aspect = scatter.get("aspect") or 1.0
pad = scatter.get("pad") if _is_dict(scatter.get("pad")) else {}
n_total = scatter.get("n_total")
n_shown = scatter.get("n_shown")
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
xs = [p[0] for p in points if isinstance(p, (list, tuple)) and len(p) >= 2]
ys = [p[1] for p in points if isinstance(p, (list, tuple)) and len(p) >= 2]
fig, ax = plt.subplots(figsize=(6.6, 5.0))
# More points -> smaller markers + lower alpha so dense clouds read as
# density without saturating the page with ink (Tufte).
n = max(len(xs), 1)
size = 18 if n <= 200 else (8 if n <= 1000 else 4)
alpha = 0.75 if n <= 200 else (0.5 if n <= 1000 else 0.35)
ax.scatter(xs, ys, s=size, c="#2a6f97", alpha=alpha, linewidths=0,
zorder=3)
# Bounding box rectangle for orientation.
if bbox:
try:
lo_x, hi_x = float(bbox["lon_min"]), float(bbox["lon_max"])
lo_y, hi_y = float(bbox["lat_min"]), float(bbox["lat_max"])
ax.plot([lo_x, hi_x, hi_x, lo_x, lo_x],
[lo_y, lo_y, hi_y, hi_y, lo_y],
color="#e15759", linewidth=1.0, linestyle="--",
alpha=0.8, zorder=4, label="Bounding box")
px = float(pad.get("lon", 0.0) or 0.0)
py = float(pad.get("lat", 0.0) or 0.0)
ax.set_xlim(lo_x - px, hi_x + px)
ax.set_ylim(lo_y - py, hi_y + py)
except (TypeError, ValueError, KeyError):
pass
# Equirectangular: scale Y/X so longitude is not stretched at this
# latitude (integridad de proyección, Tufte). aspect = 1/cos(lat).
try:
ax.set_aspect(float(aspect))
except (TypeError, ValueError):
pass
ax.set_xlabel(f"Longitud ({lon_col})", fontsize=8)
ax.set_ylabel(f"Latitud ({lat_col})", fontsize=8)
ax.tick_params(labelsize=7)
ax.grid(color="#e6e6e6", linewidth=0.5, zorder=0)
title = "Distribución geográfica de las coordenadas"
if n_shown is not None and n_total is not None and n_shown < n_total:
title += f"\n(mostrando {n_shown:,} de {n_total:,} puntos)".replace(",", ".")
ax.set_title(title, fontsize=10)
ax.legend(loc="best", fontsize=7, frameon=True, framealpha=0.9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Section builders.
# --------------------------------------------------------------------------- #
def _intro_block(detected: dict, lat_col: str, lon_col: str) -> list:
conf = (detected or {}).get("confidence")
reason = model._safe_str((detected or {}).get("reason"))
conf_txt = ""
if conf is not None:
try:
conf_txt = f" (confianza {float(conf) * 100:.0f}%)"
except (TypeError, ValueError):
conf_txt = ""
text = (
"Este dataset contiene **coordenadas geográficas**: se identificó el par "
f"**latitud = «{lat_col}»** y **longitud = «{lon_col}»**{conf_txt}. La "
"detección combina el nombre de la columna y el rango de sus valores "
"(latitud en [90, 90], longitud en [180, 180])."
)
if reason:
text += f"\n\n*Criterio de detección:* {reason}."
return [model.Heading(text=CHAPTER_TITLE, level=1),
model.Markdown(text=text)]
def _extent_blocks(extent: dict) -> list:
"""KVTable with bbox/centroid/span + DataTable with the per-region counts."""
if not _is_dict(extent) or not extent.get("n_points"):
return []
blocks = []
bbox = extent.get("bbox") if _is_dict(extent.get("bbox")) else {}
centroid = extent.get("centroid") if _is_dict(extent.get("centroid")) else {}
hemi = extent.get("hemisphere") if _is_dict(extent.get("hemisphere")) else {}
rows = [("Puntos con coordenadas", _fmt_num(extent.get("n_points")))]
if bbox:
rows.append(("Latitud (mín. / máx.)",
f"{_fmt_coord(bbox.get('lat_min'))} a "
f"{_fmt_coord(bbox.get('lat_max'))}"))
rows.append(("Longitud (mín. / máx.)",
f"{_fmt_coord(bbox.get('lon_min'))} a "
f"{_fmt_coord(bbox.get('lon_max'))}"))
if centroid:
rows.append(("Centroide",
f"{_fmt_coord(centroid.get('lat'))}, "
f"{_fmt_coord(centroid.get('lon'))}"))
if extent.get("span_km") is not None:
rows.append(("Extensión (diagonal)", _fmt_km(extent.get("span_km"))))
if hemi:
n, s = hemi.get("north"), hemi.get("south")
e, w = hemi.get("east"), hemi.get("west")
rows.append(("Hemisferios",
f"N {_fmt_num(n)} / S {_fmt_num(s)} · "
f"E {_fmt_num(e)} / O {_fmt_num(w)}"))
blocks.append(model.KVTable(rows=rows, title="Extensión geográfica"))
by_region = extent.get("by_region")
if isinstance(by_region, list) and by_region:
total = sum(r.get("count", 0) for r in by_region if _is_dict(r)) or 0
rrows = []
for r in by_region:
if not _is_dict(r):
continue
cnt = r.get("count", 0)
pct = (cnt / total) if total else None
pct_txt = f"{pct * 100:.1f}%" if pct is not None else ""
rrows.append([model._safe_str(r.get("region")), _fmt_num(cnt),
pct_txt])
if rrows:
blocks.append(model.DataTable(
header=["Zona / país", "Puntos", "% del total"], rows=rrows,
title="Distribución por zona",
note="Asignación aproximada por bounding box de cada región "
"(no es reverse-geocoding exacto de fronteras)."))
return blocks
def _narrative_block(profile: dict, ctx: dict, extent: dict) -> list:
"""A one-line narrative of where the points concentrate.
Uses the derived ``note`` from analyze_geo_extent by default; optionally
calls an LLM (ctx['run_geo_llm']) for a richer one-liner.
"""
note = model._safe_str((extent or {}).get("note"))
if ctx.get("run_geo_llm"):
by_region = (extent or {}).get("by_region") or []
bbox = (extent or {}).get("bbox") or {}
try:
from core.ask_llm import ask_llm
prompt = (
"Eres un analista de datos. En UNA frase en español, describe "
"dónde se concentran geográficamente estos puntos. Sé concreto "
"y no inventes precisión que los datos no tienen.\n"
f"Conteo por zona: {by_region}\nBounding box: {bbox}."
)
out = ask_llm(prompt,
model=ctx.get("geo_llm_model",
"claude-haiku-4-5-20251001"),
echo=False)
if out and isinstance(out, str) and out.strip():
note = out.strip()
except Exception: # noqa: BLE001 — degrade to the derived note.
pass
if not note:
return []
return [model.Markdown(text=f"**Interpretación.** {note}")]
def _no_points_block(profile: dict, detected: dict) -> list:
"""Degrade honestly when the raw coordinate arrays are not available."""
blocks = []
bbox = _bbox_from_profile(profile, detected)
if bbox:
rows = [
("Latitud (mín. / máx.)",
f"{_fmt_coord(bbox.get('lat_min'))} a "
f"{_fmt_coord(bbox.get('lat_max'))}"),
("Longitud (mín. / máx.)",
f"{_fmt_coord(bbox.get('lon_min'))} a "
f"{_fmt_coord(bbox.get('lon_max'))}"),
]
blocks.append(model.KVTable(
rows=rows, title="Extensión geográfica (aproximada)"))
blocks.append(model.Note(
"No se incluyeron las coordenadas crudas en el contexto, por lo que el "
"mapa y el análisis por zona no se han dibujado. El bounding box "
"mostrado se deriva de los mínimos y máximos por columna. Para el "
"scatter geográfico completo, pasa los arrays en "
"ctx['geo_points'] = {'lats': [...], 'lons': [...]} o las columnas en "
"ctx['raw_numeric']."))
return blocks
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_geospatial(profile: dict, ctx: dict):
"""Build the GEOSPATIAL Chapter, or None if the dataset has no coordinates.
Args:
profile: the ``eda`` group TableProfile dict.
ctx: presentation context; may carry ``geo_points``/``raw_numeric`` with
the raw coordinate arrays and the ``run_geo_llm`` flag.
Returns:
A ``model.Chapter`` with the geographic scatter + zone/country analysis,
or ``None`` when no latitude/longitude column pair is detected.
"""
profile = profile or {}
ctx = ctx or {}
if not isinstance(profile, dict):
return None
detected = _detect_columns(profile)
lats, lons, source = _resolve_coords(profile, ctx, detected)
has_detection = bool((detected or {}).get("lat_col") and
(detected or {}).get("lon_col"))
has_points = bool(lats and lons)
if not has_detection and not has_points:
return None # chapter does not apply: no coordinates in this dataset.
# Labels for axes / intro. When only raw arrays were given (no detection),
# fall back to generic names.
lat_col = (detected or {}).get("lat_col") or "lat"
lon_col = (detected or {}).get("lon_col") or "lon"
blocks = _intro_block(detected, lat_col, lon_col)
if has_points:
clean_lats = _clean_floats(lats)
clean_lons = _clean_floats(lons)
# Zone / country analysis.
extent = {}
if analyze_geo_extent is not None:
try:
extent = analyze_geo_extent(clean_lats, clean_lons) or {}
except Exception: # noqa: BLE001
extent = {}
# The geographic scatter figure (its own page/slide).
scatter = {}
if build_geo_scatter is not None:
try:
scatter = build_geo_scatter(clean_lats, clean_lons) or {}
except Exception: # noqa: BLE001
scatter = {}
maker = _make_geo_scatter(scatter, lat_col, lon_col) if scatter else None
if maker is not None:
blocks.append(model.Figure(
make=maker,
caption="Cada punto es una observación situada por sus "
"coordenadas; el recuadro rojo es el bounding box. La "
"escala respeta la latitud (proyección equirectangular)."))
else:
blocks.append(model.Note(
"No se pudo construir el scatter geográfico a partir de las "
"coordenadas proporcionadas."))
blocks += _extent_blocks(extent)
blocks += _narrative_block(profile, ctx, extent)
else:
# Columns detected but no raw points available — degrade honestly.
blocks += _no_points_block(profile, detected)
if not blocks:
return None
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -0,0 +1,245 @@
"""Tests for the GEOSPATIAL chapter — DoD: golden + edges + anti-cut.
Self-contained: builds synthetic TableProfiles (no DuckDB) so the suite is fast
and deterministic. The raw coordinate arrays are passed through ``ctx`` exactly
as the chapter's contract documents (``ctx['geo_points']`` / ``ctx['raw_numeric']``).
Verifies that the chapter detects the lat/lon pair, draws the geographic scatter
figure, analyses the zone/country (bounding box + per-region counts), returns
None when there are no coordinates, degrades honestly when the raw points are
absent, and that a profile with long column names + many points + several
regions renders to PDF and PPTX without cutting any text (long content wraps, it
is never truncated).
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.chapters.geospatial import (
build_geospatial,
CHAPTER_VERSION,
)
from datascience.automatic_eda import build_document, render_pdf, render_pptx
# --------------------------------------------------------------------------- #
# Synthetic data helpers
# --------------------------------------------------------------------------- #
def _grid(lat0: float, lon0: float, n: int, spread: float = 1.0):
"""A small deterministic cloud of n points around (lat0, lon0)."""
lats, lons = [], []
for i in range(n):
# deterministic pseudo-spread, no randomness.
f = (i % 11) / 11.0 - 0.5
g = (i % 7) / 7.0 - 0.5
lats.append(lat0 + f * spread)
lons.append(lon0 + g * spread)
return lats, lons
def _profile_with_coords(lat_name="lat", lon_name="lon", lats=None, lons=None):
"""A profile carrying a lat/lon column pair with valid ranges."""
lats = lats if lats is not None else [40.4, 41.0, 39.8, 40.1]
lons = lons if lons is not None else [-3.7, -3.6, -4.0, -3.9]
return {
"table": "lugares",
"columns": [
{"name": lat_name, "inferred_type": "numeric",
"numeric": {"min": min(lats), "max": max(lats),
"mean": sum(lats) / len(lats)}},
{"name": lon_name, "inferred_type": "numeric",
"numeric": {"min": min(lons), "max": max(lons),
"mean": sum(lons) / len(lons)}},
{"name": "valor", "inferred_type": "numeric",
"numeric": {"min": 0, "max": 100, "mean": 50}},
],
}
def _ctx_points(lats, lons):
return {"geo_points": {"lats": lats, "lons": lons}}
def _kinds(chapter):
return [getattr(b, "kind", None) for b in chapter.blocks]
def _tables(chapter):
return [b for b in chapter.blocks if getattr(b, "kind", None) == "data_table"]
def _figures(chapter):
return [b for b in chapter.blocks if getattr(b, "kind", None) == "figure"]
# --------------------------------------------------------------------------- #
# Golden
# --------------------------------------------------------------------------- #
def test_golden_estructura_y_version():
lats, lons = [40.4, 41.0, 39.8, 40.1], [-3.7, -3.6, -4.0, -3.9]
ch = build_geospatial(_profile_with_coords(lats=lats, lons=lons),
_ctx_points(lats, lons))
assert ch is not None
assert ch.id == "geospatial"
assert ch.version == CHAPTER_VERSION
kinds = _kinds(ch)
# intro heading + markdown + scatter figure + extent kv + per-region table.
assert "heading" in kinds
assert "markdown" in kinds
assert "figure" in kinds, "falta el scatter geográfico"
assert "kv_table" in kinds, "falta la tabla de extensión"
def test_golden_detecta_columnas_y_nombra_ejes():
lats, lons = _grid(40.4, -3.7, 30, spread=0.8)
prof = _profile_with_coords("latitude", "longitude", lats, lons)
ch = build_geospatial(prof, _ctx_points(lats, lons))
intro = [b for b in ch.blocks if b.kind == "markdown"][0].text
assert "latitude" in intro and "longitude" in intro
def test_golden_figura_es_perezosa_y_dibujable():
lats, lons = _grid(40.4, -3.7, 50, spread=0.6)
ch = build_geospatial(_profile_with_coords(lats=lats, lons=lons),
_ctx_points(lats, lons))
fig_block = _figures(ch)[0]
assert fig_block.make is not None and fig_block.fig is None # lazy
fig = fig_block.make() # must draw without raising
assert fig is not None
import matplotlib.pyplot as plt
plt.close(fig)
def test_golden_analisis_por_zona_espana():
lats, lons = _grid(40.4, -3.7, 40, spread=0.5) # Madrid area
ch = build_geospatial(_profile_with_coords(lats=lats, lons=lons),
_ctx_points(lats, lons))
tables = _tables(ch)
region_tbl = [t for t in tables if "zona" in (t.title or "").lower()]
assert region_tbl, "falta la tabla por zona/país"
flat = " ".join(" ".join(str(c) for c in r) for r in region_tbl[0].rows)
# Spain-area points must resolve to a Spain/European region, not empty.
assert region_tbl[0].rows
assert any(c for c in (region_tbl[0].rows[0]))
def test_golden_raw_numeric_source():
"""Coordinates can also come from ctx['raw_numeric'] keyed by detected cols."""
lats, lons = _grid(48.85, 2.35, 25, spread=0.4) # Paris area
prof = _profile_with_coords("lat", "lon", lats, lons)
ctx = {"raw_numeric": {"lat": lats, "lon": lons}}
ch = build_geospatial(prof, ctx)
assert ch is not None
assert _figures(ch), "el scatter debe construirse desde raw_numeric"
# --------------------------------------------------------------------------- #
# Edges
# --------------------------------------------------------------------------- #
def test_edge_sin_coordenadas_devuelve_none():
prof = {
"table": "ventas",
"columns": [
{"name": "precio", "inferred_type": "numeric",
"numeric": {"min": 0, "max": 1000}},
{"name": "categoria", "inferred_type": "text"},
],
}
assert build_geospatial(prof, {}) is None
def test_edge_none_y_vacio_no_rompen():
assert build_geospatial(None, None) is None
assert build_geospatial({}, {}) is None
assert build_geospatial({"columns": []}, {}) is None
assert build_geospatial("not a dict", {}) is None
def test_edge_nombre_lat_pero_rango_invalido_no_aplica():
"""A column named 'lat' whose values are out of [-90,90] is NOT a coordinate."""
prof = {
"table": "x",
"columns": [
{"name": "lat", "inferred_type": "numeric",
"numeric": {"min": 1000, "max": 9999}},
{"name": "lon", "inferred_type": "numeric",
"numeric": {"min": 1000, "max": 9999}},
],
}
assert build_geospatial(prof, {}) is None
def test_edge_columnas_detectadas_sin_puntos_degrada():
"""Detected lat/lon but no raw arrays -> honest note + approx bbox, no crash."""
prof = _profile_with_coords(lats=[40.0, 41.0], lons=[-3.0, -4.0])
ch = build_geospatial(prof, {}) # no geo_points / raw_numeric
assert ch is not None
assert not _figures(ch), "sin puntos no debe dibujarse el scatter"
notes = [b for b in ch.blocks if b.kind == "note"]
assert notes and "coordenadas crudas" in notes[0].text
def test_edge_coordenadas_con_nan_se_filtran():
lats = [40.4, float("nan"), 41.0, None, 39.8]
lons = [-3.7, -3.6, float("nan"), -3.9, -4.0]
ch = build_geospatial(_profile_with_coords(lats=[39.8, 41.0],
lons=[-4.0, -3.6]),
_ctx_points(lats, lons))
assert ch is not None # must not raise on NaN/None
# --------------------------------------------------------------------------- #
# Anti-cut: long names + many points + several regions render without truncation
# --------------------------------------------------------------------------- #
def _multiregion_points(per: int = 700):
"""Points spread across Spain, France and the USA to fill the region table."""
lats, lons = [], []
for (la, lo) in ((40.4, -3.7), (48.85, 2.35), (39.0, -98.0)):
gl, gn = _grid(la, lo, per, spread=2.0)
lats += gl
lons += gn
return lats, lons
def test_anticut_pdf_y_pptx_no_truncan():
lat_name = "latitud_geografica_del_punto_de_observacion_registrado"
lon_name = "longitud_geografica_del_punto_de_observacion_registrado"
lats, lons = _multiregion_points(700)
prof = _profile_with_coords(lat_name, lon_name, lats, lons)
ctx = {"geo_points": {"lats": lats, "lons": lons}}
full = build_document(prof, ctx)
assert any(c.id == "geospatial" for c in full)
chapters = [c for c in full if c.id == "geospatial"]
with tempfile.TemporaryDirectory() as d:
pdf = os.path.join(d, "g.pdf")
pptx = os.path.join(d, "g.pptx")
rp = render_pdf(chapters, pdf, {"title": "EDA"})
rx = render_pptx(chapters, pptx, {"title": "EDA"})
assert os.path.exists(pdf) and os.path.exists(pptx)
assert (rp or {}).get("n_pages", 0) >= 1
# PDF: the long lat column name survives whole (wraps, not cut) and there
# is no truncation marker in this chapter.
pdf_txt = "".join((pg.extract_text() or "") for pg in PdfReader(pdf).pages)
assert "" not in pdf_txt and "..." not in pdf_txt
norm = re.sub(r"\s+", "", pdf_txt)
assert lat_name in norm, "el nombre largo de la columna se cortó en el PDF"
# PPTX: long name present in some shape/cell, untruncated.
allt = []
for s in Presentation(pptx).slides:
for sh in s.shapes:
if sh.has_text_frame:
allt.append(sh.text_frame.text)
if sh.has_table:
for row in sh.table.rows:
for c in row.cells:
allt.append(c.text)
joined = re.sub(r"\s+", "", "\n".join(allt))
assert lat_name in joined, "el nombre largo de la columna se cortó en el PPTX"
@@ -0,0 +1,68 @@
---
name: build_geo_scatter
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def build_geo_scatter(lats: list, lons: list, max_points: int = 2000) -> dict"
description: "Prepara los datos de un scatter geografico en proyeccion equirectangular para el grupo eda. Empareja lats/lons por indice, descarta pares None/NaN/inf/bool o fuera de rango (lat en [-90,90], lon en [-180,180]) y aplica downsampling DETERMINISTA por paso fijo (pairs[::step]) cuando hay mas pares validos que max_points, para no saturar el PDF/PPTX en moviles. Devuelve los puntos en orden [lon, lat] listos para ax.scatter, el bbox, el aspect 1/cos(centroid_lat) clampado a [0.3,5.0] y un pad sugerido (~5% del rango con suelo minimo). Lectura defensiva; NUNCA lanza ni dibuja: el capitulo se encarga de matplotlib."
tags: [eda, geospatial, datascience, scatter, map, downsample, equirectangular, profiling]
params:
- name: lats
desc: "Lista (o tupla) de latitudes en grados, paralela a lons. Se empareja por indice. Un valor None, NaN, infinito, bool o fuera de [-90,90] descarta ese par. Lectura defensiva."
- name: lons
desc: "Lista (o tupla) de longitudes en grados, paralela a lats. Un valor None, NaN, infinito, bool o fuera de [-180,180] descarta ese par."
- name: max_points
desc: "Tope de puntos a devolver (default 2000). Si los pares validos superan el tope, se hace downsampling determinista por paso fijo step=ceil(n_total/max_points) tomando pairs[::step] (NO aleatorio, reproducible). Un valor no entero o <=0 desactiva el downsampling."
output: "Dict listo para dibujar: {points: [[lon, lat], ...] en orden x=lon/y=lat para ax.scatter; n_total: pares validos antes del downsample (int); n_shown: puntos devueltos tras el downsample (int); downsampled: bool (n_shown<n_total); bbox: {lat_min, lat_max, lon_min, lon_max} o None si no hay puntos; aspect: 1/cos(centroid_lat) clampado a [0.3,5.0] para no estirar la proyeccion equirectangular; pad: {lon, lat} ~5% del rango respectivo con suelo minimo 0.01 grados}. Si no hay pares validos: points=[], n_total=0, n_shown=0, downsampled=False, bbox=None, aspect=1.0, pad={lon:0.0, lat:0.0}."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: true
tests: ["test_geo_scatter_nube_espana", "test_downsampling_determinista_y_reproducible", "test_listas_vacias_no_lanza", "test_un_solo_punto_pad_minimo_y_aspect_finito", "test_filtra_none_nan_y_fuera_de_rango", "test_latitud_alta_aspect_clamped"]
test_file_path: "python/functions/datascience/build_geo_scatter_test.py"
file_path: "python/functions/datascience/build_geo_scatter.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.build_geo_scatter import build_geo_scatter
# Nube de coordenadas (lat, lon) alrededor de Madrid:
lats = [40.0, 41.0, 39.0, 40.5]
lons = [-3.7, -3.0, -4.0, -3.5]
geo = build_geo_scatter(lats, lons, max_points=2000)
print(geo["points"][0]) # [-3.7, 40.0] -> orden [x=lon, y=lat]
print(geo["bbox"]) # {'lat_min': 39.0, 'lat_max': 41.0, 'lon_min': -4.0, 'lon_max': -3.0}
print(round(geo["aspect"], 3)) # 1.308 -> ensancha el eje x en latitudes medias
print(geo["pad"]) # {'lon': 0.05, 'lat': 0.1} -> margen ~5%
# El capitulo dibuja con matplotlib (esta funcion NO dibuja):
# xs = [p[0] for p in geo["points"]]; ys = [p[1] for p in geo["points"]]
# ax.scatter(xs, ys); ax.set_aspect(geo["aspect"])
# ax.set_xlim(geo["bbox"]["lon_min"] - geo["pad"]["lon"], geo["bbox"]["lon_max"] + geo["pad"]["lon"])
# ax.set_ylim(geo["bbox"]["lat_min"] - geo["pad"]["lat"], geo["bbox"]["lat_max"] + geo["pad"]["lat"])
```
## Cuando usarla
- Usala antes de dibujar un scatter geografico (mapa de puntos en proyeccion equirectangular) en el capitulo geospatial de `AutomaticEDA`: limpia los pares de coordenadas, los reduce a un tamano razonable para el PDF/PPTX y te da bbox, aspect y pad listos para fijar los ejes.
- Cuando tengas dos columnas de lat/lon ya extraidas y quieras un punto de entrada determinista (mismo dataset -> mismo dibujo) que no sature el documento en moviles.
- Cuando necesites el aspect correcto para que un grado de longitud no se vea estirado respecto a uno de latitud (integridad visual, Tufte) sin calcularlo a mano.
## Gotchas
- Funcion pura, sin I/O y determinista. NO dibuja: solo PREPARA los datos; el capitulo se encarga de matplotlib. Lectura defensiva: pares con None/NaN/inf/bool o coordenadas fuera de rango se descartan en silencio y NUNCA lanza.
- El downsampling es DETERMINISTA por paso fijo (`step = ceil(n_total / max_points)`, `pairs[::step]`), NO aleatorio: la misma entrada produce siempre la misma salida (reproducible en tests). El primer punto mostrado es siempre el primer par valido. No es un muestreo uniforme aleatorio — es un barrido regular del orden de entrada.
- `points` va en orden `[lon, lat]` (x, y), no `[lat, lon]`: pasalo directo a `ax.scatter(xs, ys)` sin invertir. Confundir el orden espeja el mapa.
- `aspect = 1/cos(centroid_lat)` se clampa a `[0.3, 5.0]`. En latitudes altas `cos -> 0` y el valor real explota: por encima de ~78 grados el aspect queda fijado en 5.0. Si el centroide cae justo en un polo (`+-90`) se usa el clamp en vez de dividir por cero.
- `pad` es ~5% del rango de cada eje con un suelo minimo de `0.01` grados: con un solo punto o todos iguales (rango 0) el pad cae al suelo para que el punto no quede en una linea. En el caso sin puntos validos el pad es `{lon:0.0, lat:0.0}` y `bbox` es `None`.
- `bbox`, `aspect` y `pad` se calculan sobre los puntos YA mostrados (tras el downsample), de modo que los ejes encajan exactamente con lo que se dibuja.
@@ -0,0 +1,153 @@
"""build_geo_scatter — prepare points for a geographic scatter (EDA `geospatial`).
Pure function: no I/O, deterministic. Takes two parallel lists of latitudes and
longitudes and returns the data a caller needs to draw a geographic scatter in an
equirectangular projection: cleaned points in [lon, lat] order, a bounding box, a
projection aspect ratio and a suggested axis padding.
It NEVER draws anything (no matplotlib) — the chapter that consumes this output is
responsible for the rendering. Reading is defensive throughout and the function
NEVER raises: malformed pairs (None, NaN, infinity or out-of-range coordinates)
are silently dropped and an empty/valid result is always returned.
To keep the rendered PDF/PPTX light on phones, when the number of valid pairs
exceeds `max_points` the points are down-sampled DETERMINISTICALLY by a fixed
step (`pairs[::step]`), never randomly, so the result is reproducible.
"""
import math
# Minimum axis padding (in degrees) so a single point or a zero-range cloud is
# never drawn glued to the axis border (it would collapse to a line).
_MIN_PAD = 0.01
# Aspect ratio clamp. 1/cos(lat) blows up near the poles; clamp keeps the render
# sane (Tufte: do not let the projection stretch the cloud out of proportion).
_ASPECT_MIN = 0.3
_ASPECT_MAX = 5.0
def _coord(value):
"""Coerce to a finite float defensively; return None for invalid coordinates.
bool is a subclass of int, but a real latitude/longitude is never a bool, so
True/False are treated as missing instead of coercing to 1.0/0.0. NaN and
+/-infinity are never valid coordinates either.
"""
if value is None or isinstance(value, bool):
return None
try:
coord = float(value)
except (TypeError, ValueError):
return None
if math.isnan(coord) or math.isinf(coord):
return None
return coord
def build_geo_scatter(lats: list, lons: list, max_points: int = 2000) -> dict:
"""Prepare the data for a geographic scatter in equirectangular projection.
Pairs `lats` and `lons` by index, drops invalid pairs, optionally
down-samples deterministically, and derives the geometry (bbox, aspect, pad)
a caller needs to draw the cloud. No raw rendering is performed.
Args:
lats: List (or tuple) of latitudes in degrees. Paired by index with
`lons`. A value that is None, NaN, infinite, bool or outside
[-90, 90] discards that pair. Read defensively.
lons: List (or tuple) of longitudes in degrees, parallel to `lats`. A
value outside [-180, 180] (or None/NaN/inf/bool) discards that pair.
max_points: Cap on the number of points returned. When the number of
valid pairs exceeds this cap, the points are down-sampled by a fixed
step `ceil(n_total / max_points)` taking `pairs[::step]` — DETERMINISTIC,
not random, so the output is reproducible. A non-positive or non-int
value disables down-sampling.
Returns:
Dict ready for a caller's ax.scatter:
{points: [[lon, lat], ...] (x=lon, y=lat order), n_total: valid pairs
before down-sampling, n_shown: points returned, downsampled: bool,
bbox: {lat_min, lat_max, lon_min, lon_max} or None, aspect: 1/cos(centroid
lat) clamped to [0.3, 5.0], pad: {lon, lat} ~5% of each range with a small
floor}. When there are no valid pairs returns points=[], n_total=0,
n_shown=0, downsampled=False, bbox=None, aspect=1.0, pad={lon:0.0, lat:0.0}.
"""
pairs = [] # each item is (lon, lat) — already in [x, y] order
if isinstance(lats, (list, tuple)) and isinstance(lons, (list, tuple)):
n = min(len(lats), len(lons))
for i in range(n):
lat = _coord(lats[i])
lon = _coord(lons[i])
if lat is None or lon is None:
continue
if lat < -90.0 or lat > 90.0:
continue
if lon < -180.0 or lon > 180.0:
continue
pairs.append((lon, lat))
n_total = len(pairs)
if n_total == 0:
return {
"points": [],
"n_total": 0,
"n_shown": 0,
"downsampled": False,
"bbox": None,
"aspect": 1.0,
"pad": {"lon": 0.0, "lat": 0.0},
}
# Deterministic down-sampling by a fixed step. Reproducible: same input ->
# same output, no randomness.
if (
isinstance(max_points, int)
and not isinstance(max_points, bool)
and max_points > 0
and n_total > max_points
):
step = math.ceil(n_total / max_points)
sampled = pairs[::step]
else:
sampled = pairs
points = [[lon, lat] for (lon, lat) in sampled]
n_shown = len(points)
downsampled = n_shown < n_total
lons_s = [p[0] for p in sampled]
lats_s = [p[1] for p in sampled]
lon_min, lon_max = min(lons_s), max(lons_s)
lat_min, lat_max = min(lats_s), max(lats_s)
bbox = {
"lat_min": lat_min,
"lat_max": lat_max,
"lon_min": lon_min,
"lon_max": lon_max,
}
# Aspect for an equirectangular projection: stretch the x axis by 1/cos(lat)
# at the cloud centroid so a degree of longitude reads at its real width.
centroid_lat = sum(lats_s) / len(lats_s)
cos_lat = math.cos(math.radians(centroid_lat))
if cos_lat < 1e-12: # centroid at (or numerically at) a pole
aspect = _ASPECT_MAX
else:
aspect = 1.0 / cos_lat
aspect = max(_ASPECT_MIN, min(_ASPECT_MAX, aspect))
# Padding ~5% of each range, with a small floor so a zero-range cloud (single
# point / all identical) still gets a non-zero margin.
pad_lon = max(0.05 * (lon_max - lon_min), _MIN_PAD)
pad_lat = max(0.05 * (lat_max - lat_min), _MIN_PAD)
return {
"points": points,
"n_total": n_total,
"n_shown": n_shown,
"downsampled": downsampled,
"bbox": bbox,
"aspect": aspect,
"pad": {"lon": pad_lon, "lat": pad_lat},
}
@@ -0,0 +1,140 @@
"""Tests para build_geo_scatter."""
import math
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from build_geo_scatter import build_geo_scatter
# Keys that a non-empty result dict must always contain.
_EXPECTED_KEYS = {
"points", "n_total", "n_shown", "downsampled", "bbox", "aspect", "pad",
}
def test_geo_scatter_nube_espana():
"""Golden: nube en Espana -> points en orden [lon, lat], bbox, aspect>1, pad 5%."""
# Cuatro puntos alrededor de Madrid (lat ~40, lon negativo).
lats = [40.0, 41.0, 39.0, 40.5]
lons = [-3.7, -3.0, -4.0, -3.5]
r = build_geo_scatter(lats, lons)
assert set(r.keys()) == _EXPECTED_KEYS
# points en orden [x=lon, y=lat]: primer elemento lon (negativo), segundo lat (~40).
assert r["points"] == [[-3.7, 40.0], [-3.0, 41.0], [-4.0, 39.0], [-3.5, 40.5]]
for lon, lat in r["points"]:
assert lon < 0.0 # longitudes de Espana son negativas
assert 36.0 < lat < 44.0 # latitudes peninsulares
# Sin downsampling: 4 < 2000.
assert r["n_total"] == 4
assert r["n_shown"] == 4
assert r["downsampled"] is False
# bbox correcto.
assert r["bbox"] == {
"lat_min": 39.0, "lat_max": 41.0,
"lon_min": -4.0, "lon_max": -3.0,
}
# aspect = 1/cos(centroid_lat); centroid = 40.125 -> ~1.31 > 1.
centroid_lat = (40.0 + 41.0 + 39.0 + 40.5) / 4.0
expected_aspect = 1.0 / math.cos(math.radians(centroid_lat))
assert r["aspect"] > 1.0
assert abs(r["aspect"] - expected_aspect) < 1e-9
assert abs(r["aspect"] - 1.305) < 0.02 # cos(40) ~ 0.77
# pad 5% del rango (lon_range=1.0 -> 0.05 ; lat_range=2.0 -> 0.1).
assert abs(r["pad"]["lon"] - 0.05) < 1e-9
assert abs(r["pad"]["lat"] - 0.10) < 1e-9
def test_downsampling_determinista_y_reproducible():
"""Golden: 5000 puntos, max_points=2000 -> n_shown<=2000, downsampled, reproducible."""
lats = [40.0 + (i % 100) * 0.01 for i in range(5000)]
lons = [-3.0 - (i % 100) * 0.01 for i in range(5000)]
r1 = build_geo_scatter(lats, lons, max_points=2000)
assert r1["n_total"] == 5000
assert r1["n_shown"] <= 2000
assert r1["downsampled"] is True
# step = ceil(5000/2000) = 3 -> len(pairs[::3]) = 1667.
assert r1["n_shown"] == 1667
# Determinista: dos llamadas con la misma entrada dan exactamente lo mismo.
r2 = build_geo_scatter(lats, lons, max_points=2000)
assert r1 == r2
assert r1["points"] == r2["points"]
# El primer punto del downsample es el primer par valido (step parte de 0).
assert r1["points"][0] == [lons[0], lats[0]]
def test_listas_vacias_no_lanza():
"""Edge: listas vacias / None -> points [] sin lanzar."""
r = build_geo_scatter([], [])
assert r["points"] == []
assert r["n_total"] == 0
assert r["n_shown"] == 0
assert r["downsampled"] is False
assert r["bbox"] is None
assert r["aspect"] == 1.0
assert r["pad"] == {"lon": 0.0, "lat": 0.0}
# None como entrada tampoco lanza.
assert build_geo_scatter(None, None)["points"] == []
assert build_geo_scatter([40.0], None)["n_total"] == 0
assert build_geo_scatter(None, [-3.0])["n_total"] == 0
def test_un_solo_punto_pad_minimo_y_aspect_finito():
"""Edge: un solo punto -> pad minimo no cero, bbox degenerado, aspect finito."""
r = build_geo_scatter([40.0], [-3.7])
assert r["n_total"] == 1
assert r["n_shown"] == 1
assert r["points"] == [[-3.7, 40.0]]
assert r["downsampled"] is False
assert r["bbox"] == {
"lat_min": 40.0, "lat_max": 40.0,
"lon_min": -3.7, "lon_max": -3.7,
}
# rango 0 -> pad cae al floor minimo (no cero).
assert r["pad"]["lon"] == 0.01
assert r["pad"]["lat"] == 0.01
# aspect finito y dentro del clamp.
assert math.isfinite(r["aspect"])
assert 0.3 <= r["aspect"] <= 5.0
def test_filtra_none_nan_y_fuera_de_rango():
"""Edge: pares con None/NaN/fuera de rango se descartan por indice."""
nan = float("nan")
inf = float("inf")
# i=0 i=1 i=2 i=3 i=4 i=5 i=6
lats = [40.0, None, nan, 200.0, 41.0, 39.0, inf]
lons = [-3.0, -3.5, -3.6, -3.7, 999.0, -4.0, -2.0]
r = build_geo_scatter(lats, lons)
# Validos solo i=0 (40,-3.0) e i=5 (39,-4.0):
# i=1 lat None, i=2 lat NaN, i=3 lat 200 fuera de rango,
# i=4 lon 999 fuera de rango, i=6 lat inf.
assert r["n_total"] == 2
assert r["points"] == [[-3.0, 40.0], [-4.0, 39.0]]
assert r["bbox"] == {
"lat_min": 39.0, "lat_max": 40.0,
"lon_min": -4.0, "lon_max": -3.0,
}
def test_latitud_alta_aspect_clamped():
"""Edge: latitudes ~85 -> aspect clamped <= 5.0."""
r = build_geo_scatter([85.0, 85.0, 84.0], [10.0, 11.0, 9.0])
# cos(~84.7) ~ 0.093 -> 1/0.093 ~ 10.7 -> clamp a 5.0.
assert r["aspect"] <= 5.0
assert r["aspect"] == 5.0
assert math.isfinite(r["aspect"])
@@ -0,0 +1,67 @@
---
name: detect_latlon_columns
id: detect_latlon_columns_py_datascience
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def detect_latlon_columns(columns: list, samples: dict | None = None) -> dict"
description: "Detecta un par (latitud, longitud) entre las columnas de un TableProfile del grupo eda combinando heuristica de nombre (latitude/longitude/lat/lon/lng + x/y debiles) con validacion de rango obligatoria (latitud en [-90,90], longitud en [-180,180]). Lee defensivamente con .get; NUNCA lanza. Usa el sub-bloque numeric.min/max o, si falta, la lista de samples opcional. Devuelve SIEMPRE un dict {lat_col, lon_col, confidence, reason}; si no hay par valido, las columnas van a None y confidence a 0.0."
tags: [eda, geospatial, profiling, latlon, coordinates, detection, datascience]
params:
- name: columns
desc: "Lista de dicts ColumnProfile (el campo `columns` de un TableProfile del grupo eda). Cada dict se lee con .get; solo `name` (str) es obligatorio. Se consultan `inferred_type` (p.ej. 'numeric') y el sub-dict `numeric` con `min`/`max` (floats) para validar el rango. Entradas no-dict o sin name se ignoran sin lanzar."
- name: samples
desc: "Opcional {nombre_columna: [valores...]} para validar el rango cuando una columna no trae numeric.min/max. Los valores nulos se ignoran; si algun valor no nulo no es numerico la columna no se considera coordenada. Si es None u omitido, solo se usa el bloque numeric."
output: "Dict SIEMPRE presente con la forma {lat_col: str|None, lon_col: str|None, confidence: float en [0,1], reason: str en espanol}. En exito, lat_col y lon_col nombran columnas distintas; confidence ~1.0 para par con nombre fuerte (latitude/longitude/lat/lon/lng) + rango valido y ~0.7 para par debil (x/y) + rango. En fallo, ambas columnas None, confidence 0.0 y reason explica por que (sin columnas, nombre sin match, rango fuera de bounds, falta uno de los dos ejes...)."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: true
tests: ["test_par_latitude_longitude_fuerte", "test_par_lat_lon_abreviado", "test_par_x_y_debil_con_rango_valido", "test_nombre_lat_lon_pero_rango_fuera_no_detecta", "test_par_fuerte_prevalece_sobre_debil", "test_entradas_vacias_o_invalidas_no_lanzan", "test_solo_latitud_sin_longitud_no_detecta", "test_deteccion_por_samples_cuando_falta_numeric", "test_samples_fuera_de_rango_descarta"]
test_file_path: "python/functions/datascience/detect_latlon_columns_test.py"
file_path: "python/functions/datascience/detect_latlon_columns.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.detect_latlon_columns import detect_latlon_columns
# Columnas tal y como vienen en profile['columns'] de un TableProfile del grupo eda:
columns = [
{"name": "id", "inferred_type": "numeric", "numeric": {"min": 1, "max": 9999}},
{"name": "latitude", "inferred_type": "numeric", "numeric": {"min": -45.0, "max": 45.0}},
{"name": "longitude", "inferred_type": "numeric", "numeric": {"min": -120.0, "max": 120.0}},
]
res = detect_latlon_columns(columns)
print(res["lat_col"], res["lon_col"], res["confidence"])
# latitude longitude 1.0
# Sin bloque numeric, validando el rango con samples:
cols2 = [{"name": "lat"}, {"name": "lon"}]
samples = {"lat": [10.5, 20.0, 30.25], "lon": [-40.0, 50.5, 60.0]}
print(detect_latlon_columns(cols2, samples)["lat_col"]) # lat
```
## Cuando usarla
- Usala al perfilar una tabla en `AutomaticEDA` para decidir si tiene geometria de puntos: cuando `detect_latlon_columns` devuelve un par con `confidence` alta, el capitulo geospatial puede dibujar un mapa, calcular un bounding box o proponer un cluster espacial.
- Antes de un analisis geoespacial (alpha shape, convex hull, joins por proximidad) para localizar automaticamente que columnas son la latitud y la longitud sin pedirlo al usuario.
- Cuando recibas un `TableProfile` del grupo `eda` y quieras enrutar columnas a sub-analisis por tipo semantico: este es el detector del par lat/lon, complementario a `infer_semantic_type`.
## Gotchas
- Funcion pura, sin I/O y determinista. Lectura defensiva con `.get`: NUNCA lanza. Cualquier input malformado (None, no-lista, entradas no-dict, claves ausentes) devuelve el dict de fallo con `lat_col`/`lon_col` en None y `confidence` 0.0.
- **El nombre solo no basta**: una columna `latitude` cuyo rango se sale de `[-90, 90]` se descarta (no es coordenada real). Igual para `longitude` fuera de `[-180, 180]`. La validacion de rango es obligatoria.
- El rango de latitud `[-90, 90]` es un subconjunto del de longitud `[-180, 180]`, por eso el nombre es necesario para desambiguar cual eje es cual; una columna numerica en `[-90, 90]` sin nombre que sugiera lat/lon no se detecta.
- Los nombres genericos `x`/`y` (y `x_coord`/`y_coord`) son candidatos **debiles**: solo forman par si el rango encaja y existe la otra mitad (un `x`/`lon` para la `y`, un `y`/`lat` para la `x`). Un `y` suelto sin pareja devuelve None.
- Requiere AMBOS ejes para considerar exito. Si solo encuentra latitud o solo longitud, devuelve el dict de fallo (no media coordenada).
- `samples` solo se consulta cuando falta `numeric.min`/`numeric.max`. Si una columna trae el bloque numeric, ese manda aunque pases samples para ella.
- El matching de nombre es por subcadena normalizada (se quitan `_`, `-` y espacios), asi que nombres como `plate` (contiene "lat") podrian marcarse como candidatos por nombre — pero solo pasarian si su rango cae en `[-90, 90]` y hay una longitud pareja, filtro que en la practica descarta los falsos positivos.
@@ -0,0 +1,198 @@
"""detect_latlon_columns — detect a (latitude, longitude) column pair in an EDA profile.
Pure function: no I/O, deterministic. Takes the `columns` list of a TableProfile
(group `eda`) and decides whether two of its columns form a geographic coordinate
pair (latitude + longitude), combining a name heuristic with a value-range check.
The detection is intentionally conservative: a name hint alone is never enough. A
column is only accepted as latitude/longitude if its numeric range fits inside the
valid coordinate bounds ([-90, 90] for latitude, [-180, 180] for longitude). When
the `numeric` sub-block is absent the optional `samples` argument is used instead.
Reading is fully defensive (.get throughout) and the function NEVER raises: any
malformed input (None, non-list, non-dict entries, missing keys) simply yields a
no-pair result {"lat_col": None, "lon_col": None, "confidence": 0.0, "reason": ...}.
"""
import re
# Collapse the separators a column name may use (snake_case, kebab-case, spaces)
# so that "y_coord", "y-coord" and "y coord" all normalize to the same token.
_SEP_RE = re.compile(r"[\s_\-]+")
# Name-match strengths: a strong, unambiguous coordinate name vs a weak generic
# axis name (x / y) that only counts when the range also fits and a partner exists.
_STRONG = 0.6
_WEAK = 0.3
_RANGE_BONUS = 0.4 # added once the mandatory range validation passes
def _normalize(name):
"""Lowercase a column name and strip separator chars (_, -, whitespace)."""
if not isinstance(name, str):
return ""
return _SEP_RE.sub("", name.strip().lower())
def _num(value):
"""Coerce to float defensively; return None for None/bool/non-numeric."""
# bool is a subclass of int; a coordinate value is never a real bool, so treat
# True/False as missing instead of silently coercing to 1.0/0.0.
if value is None or isinstance(value, bool):
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _lat_name_strength(nn):
"""Strength of a normalized name as a latitude candidate (0=no match)."""
if not nn:
return 0.0
# "lat", "latitude", "latitud" all contain the "lat" stem.
if "lat" in nn:
return _STRONG
# Weak generic axis name: only useful when paired with an x/lon partner.
if nn in ("y", "ycoord", "ycoordinate", "ycoordinates"):
return _WEAK
return 0.0
def _lon_name_strength(nn):
"""Strength of a normalized name as a longitude candidate (0=no match)."""
if not nn:
return 0.0
# "lon", "long", "longitude", "longitud" share the "lon" stem; "lng" is separate.
if "lon" in nn or "lng" in nn:
return _STRONG
if nn in ("x", "xcoord", "xcoordinate", "xcoordinates"):
return _WEAK
return 0.0
def _col_range(col, sample_values):
"""Return (min, max) floats for a column, or (None, None) if not numeric.
Prefers the `numeric` sub-block min/max (the output of describe_numeric); falls
back to the provided sample list. A column is only treated as numeric when both
extremes are derivable: from the numeric block, or from samples whose every
non-null value coerces to a number.
"""
if isinstance(col, dict):
numeric = col.get("numeric")
if isinstance(numeric, dict):
mn = _num(numeric.get("min"))
mx = _num(numeric.get("max"))
if mn is not None and mx is not None:
return mn, mx
# Fall back to samples when the numeric block is missing or incomplete.
if isinstance(sample_values, (list, tuple)):
non_null = [v for v in sample_values if v is not None]
if non_null:
coerced = [_num(v) for v in non_null]
# Any non-numeric sample means we cannot trust the column as numeric.
if all(c is not None for c in coerced):
return min(coerced), max(coerced)
return None, None
def _no_pair(reason):
"""Canonical empty result: no coordinate pair detected."""
return {"lat_col": None, "lon_col": None, "confidence": 0.0, "reason": reason}
def detect_latlon_columns(columns: list, samples: dict | None = None) -> dict:
"""Detect a (latitude, longitude) column pair from an eda TableProfile.
Combines a name heuristic (latitude/longitude/lat/lon/lng + weak x/y) with a
mandatory range validation: the chosen latitude must sit in [-90, 90] and the
longitude in [-180, 180]. A name hint whose range does not fit is discarded.
Both sides are required for success; if only one is found, no pair is returned.
Args:
columns: List of ColumnProfile dicts (the `columns` of a TableProfile).
Each dict is read defensively with .get; only `name` is required.
`numeric.min` / `numeric.max` (and optionally `inferred_type`) are used
for the range check when present.
samples: Optional {column_name: [values...]} used to validate the range
when a column lacks `numeric.min`/`numeric.max`. If None/omitted, only
the `numeric` sub-block is consulted.
Returns:
Always a dict {"lat_col": str|None, "lon_col": str|None,
"confidence": float, "reason": str}. On success lat_col and lon_col name
the detected pair (distinct columns) and confidence is in [0, 1]: a pair
validated by a strong name on both sides scores ~1.0, a weak x/y pair ~0.7.
On failure both columns are None and confidence is 0.0.
"""
if not isinstance(columns, (list, tuple)) or len(columns) == 0:
return _no_pair("sin columnas que inspeccionar")
sample_map = samples if isinstance(samples, dict) else {}
# (column_name, confidence) for each side. Confidence already includes the
# range bonus because membership in the list implies the range was validated.
lat_candidates = []
lon_candidates = []
for col in columns:
if not isinstance(col, dict):
continue
name = col.get("name")
if not isinstance(name, str) or not name:
continue
nn = _normalize(name)
lat_strength = _lat_name_strength(nn)
lon_strength = _lon_name_strength(nn)
if lat_strength == 0.0 and lon_strength == 0.0:
continue # name gives no coordinate hint; skip.
mn, mx = _col_range(col, sample_map.get(name))
is_numeric = mn is not None and mx is not None
if not is_numeric:
continue # range cannot be validated -> not a coordinate.
if lat_strength > 0.0 and mn >= -90.0 and mx <= 90.0:
lat_candidates.append((name, lat_strength + _RANGE_BONUS))
if lon_strength > 0.0 and mn >= -180.0 and mx <= 180.0:
lon_candidates.append((name, lon_strength + _RANGE_BONUS))
if not lat_candidates and not lon_candidates:
return _no_pair("ninguna columna sugiere latitud ni longitud por nombre+rango")
if not lat_candidates:
return _no_pair("no se encontro columna de latitud valida (nombre+rango en [-90,90])")
if not lon_candidates:
return _no_pair("no se encontro columna de longitud valida (nombre+rango en [-180,180])")
# Pick the distinct pair with the highest combined confidence. First match wins
# on ties to keep the result deterministic by input order.
best = None # (combined, lat_name, lon_name, lat_c, lon_c)
for lat_name, lat_c in lat_candidates:
for lon_name, lon_c in lon_candidates:
if lat_name == lon_name:
continue # a column cannot be both axes of the same pair.
combined = (lat_c + lon_c) / 2.0
if best is None or combined > best[0]:
best = (combined, lat_name, lon_name, lat_c, lon_c)
if best is None:
return _no_pair("solo una columna sirve para ambos ejes; no hay par lat/lon distinto")
combined, lat_name, lon_name, lat_c, lon_c = best
confidence = max(0.0, min(1.0, combined))
lat_label = "fuerte" if lat_c >= 0.9 else "debil"
lon_label = "fuerte" if lon_c >= 0.9 else "debil"
reason = (
f"par lat='{lat_name}' (nombre {lat_label}) / lon='{lon_name}' "
f"(nombre {lon_label}) con rango valido"
)
return {
"lat_col": lat_name,
"lon_col": lon_name,
"confidence": confidence,
"reason": reason,
}
@@ -0,0 +1,141 @@
"""Tests para detect_latlon_columns."""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from detect_latlon_columns import detect_latlon_columns
# Keys that every result dict (success or failure) must expose.
_EXPECTED_KEYS = {"lat_col", "lon_col", "confidence", "reason"}
def _col(name, mn=None, mx=None, inferred="numeric"):
"""Build a minimal ColumnProfile-like dict for the tests."""
col = {"name": name, "inferred_type": inferred}
if mn is not None or mx is not None:
col["numeric"] = {"min": mn, "max": mx}
return col
def test_par_latitude_longitude_fuerte():
"""Golden: nombres latitude/longitude con rango valido -> par con confianza alta."""
columns = [
_col("id", mn=1, mx=9999, inferred="numeric"),
_col("latitude", mn=-45.0, mx=45.0),
_col("longitude", mn=-120.0, mx=120.0),
]
res = detect_latlon_columns(columns)
assert set(res.keys()) == _EXPECTED_KEYS
assert res["lat_col"] == "latitude"
assert res["lon_col"] == "longitude"
# Nombre fuerte (0.6) + rango (0.4) en ambos lados -> 1.0.
assert abs(res["confidence"] - 1.0) < 1e-9
assert "rango valido" in res["reason"]
def test_par_lat_lon_abreviado():
"""Golden: nombres abreviados lat/lon tambien se detectan como fuertes."""
columns = [
_col("lat", mn=40.0, mx=43.0),
_col("lon", mn=-4.0, mx=-1.0),
_col("precio", mn=0.0, mx=500.0),
]
res = detect_latlon_columns(columns)
assert res["lat_col"] == "lat"
assert res["lon_col"] == "lon"
assert abs(res["confidence"] - 1.0) < 1e-9
def test_par_x_y_debil_con_rango_valido():
"""Edge: x/y genericos solo cuentan como par debil cuando el rango encaja."""
columns = [
_col("y_coord", mn=-10.0, mx=10.0), # debil latitud
_col("x_coord", mn=-150.0, mx=150.0), # debil longitud
]
res = detect_latlon_columns(columns)
assert res["lat_col"] == "y_coord"
assert res["lon_col"] == "x_coord"
# Nombre debil (0.3) + rango (0.4) -> 0.7 en ambos lados.
assert abs(res["confidence"] - 0.7) < 1e-9
def test_nombre_lat_lon_pero_rango_fuera_no_detecta():
"""Edge: nombre lat/lon con rango fuera de bounds -> NO es coordenada."""
columns = [
_col("latitude", mn=-200.0, mx=200.0), # fuera de [-90, 90]
_col("longitude", mn=-120.0, mx=120.0), # valido, pero sin par lat
]
res = detect_latlon_columns(columns)
assert res["lat_col"] is None
assert res["lon_col"] is None
assert res["confidence"] == 0.0
assert isinstance(res["reason"], str) and res["reason"]
def test_par_fuerte_prevalece_sobre_debil():
"""Edge: con candidatos fuertes y debiles, gana el par de mayor confianza."""
columns = [
_col("latitude", mn=-45.0, mx=45.0), # fuerte lat
_col("y", mn=-30.0, mx=30.0), # debil lat
_col("longitude", mn=-120.0, mx=120.0), # fuerte lon
_col("x", mn=-100.0, mx=100.0), # debil lon
]
res = detect_latlon_columns(columns)
assert res["lat_col"] == "latitude"
assert res["lon_col"] == "longitude"
assert abs(res["confidence"] - 1.0) < 1e-9
def test_entradas_vacias_o_invalidas_no_lanzan():
"""Edge: sin columnas / vacio / no-lista / entradas no-dict -> dict None sin lanzar."""
for bad in ([], None, "no soy lista", 42, [1, 2, 3], [{}], [{"foo": "bar"}]):
res = detect_latlon_columns(bad)
assert set(res.keys()) == _EXPECTED_KEYS
assert res["lat_col"] is None
assert res["lon_col"] is None
assert res["confidence"] == 0.0
assert isinstance(res["reason"], str)
def test_solo_latitud_sin_longitud_no_detecta():
"""Edge: solo hay latitud valida, falta la longitud -> sin par."""
columns = [
_col("latitude", mn=-45.0, mx=45.0),
_col("temperatura", mn=-5.0, mx=40.0),
]
res = detect_latlon_columns(columns)
assert res["lat_col"] is None
assert res["lon_col"] is None
assert res["confidence"] == 0.0
def test_deteccion_por_samples_cuando_falta_numeric():
"""Edge: sin bloque numeric, el rango se valida con samples."""
columns = [
{"name": "lat"}, # sin numeric ni inferred_type
{"name": "lon"},
]
samples = {
"lat": [10.5, 20.0, None, 30.25], # todos dentro de [-90, 90]
"lon": [-40.0, 50.5, 60.0], # todos dentro de [-180, 180]
}
res = detect_latlon_columns(columns, samples)
assert res["lat_col"] == "lat"
assert res["lon_col"] == "lon"
assert abs(res["confidence"] - 1.0) < 1e-9
def test_samples_fuera_de_rango_descarta():
"""Edge: samples fuera de bounds invalidan la columna pese al nombre fuerte."""
columns = [{"name": "lat"}, {"name": "lon"}]
samples = {
"lat": [10.0, 95.0], # 95 > 90 -> latitud invalida
"lon": [-40.0, 50.0],
}
res = detect_latlon_columns(columns, samples)
assert res["lat_col"] is None
assert res["lon_col"] is None
assert res["confidence"] == 0.0