Compare commits

..

2 Commits

Author SHA1 Message Date
egutierrez 00cd5274bc feat(eda): capítulo GEOSPATIAL del AutomaticEDA (scatter geográfico + zona/país)
Capítulo nuevo chapters/geospatial.py (CHAPTER_VERSION 1.0.0). Cuando el dataset
tiene un par de coordenadas, dibuja un scatter geográfico en proyección
equirectangular (la escala respeta la latitud para no estirar la longitud) y
analiza la extensión: bounding box, centroide, span, conteo por zona/país,
hemisferios y una interpretación. Cuando NO hay coordenadas, build_geospatial
devuelve None y el capítulo se omite.

Sigue el contrato de capítulos (firma build_<id>(profile, ctx) -> Chapter|None,
lectura defensiva, nunca lanza) y el patrón de modelos/num_distr: delega el
cálculo a las primitivas puras del registry (detect_latlon_columns,
analyze_geo_extent, build_geo_scatter) y solo dibuja la figura matplotlib de
forma perezosa. Las coordenadas crudas llegan por ctx['geo_points'] o
ctx['raw_numeric'] (como modelos lee raw_numeric); sin ellas, degrada con un
bounding box aproximado de numeric.min/max y una nota honesta.

Anti-cortes: usa DataTable/KVTable/Figure/Markdown del modelo, que el paginador
parte sin cortar. Test self-contained con golden + 6 edges + anti-cut (nombres
largos + 2100 puntos en varias regiones renderizan a PDF y PPTX sin truncar).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:29:33 +02:00
egutierrez cd658cc703 feat(eda): primitivas geoespaciales del grupo eda (detección lat/lon + extensión + scatter)
Tres funciones puras nuevas del dominio datascience (tags eda + geospatial) que
sostienen el capítulo GEOSPATIAL del AutomaticEDA, delegadas a fn-constructor:

- detect_latlon_columns: identifica el par (lat, lon) por nombre de columna +
  rango de valores ([-90,90] / [-180,180]) desde profile['columns']. Devuelve
  {lat_col, lon_col, confidence, reason}. 9 tests.
- analyze_geo_extent: bbox, centroide, span haversine, conteo por zona/país
  (lookup offline con bounding boxes embebidos, KISS sin geopandas) y
  hemisferios. 7 tests.
- build_geo_scatter: prepara los puntos del scatter en orden [lon, lat] con
  downsampling determinista por paso fijo + aspect equirectangular 1/cos(lat)
  clampado. 6 tests.

Registradas en datascience/__init__.py. Todas pure, params_schema completo,
.md autosuficiente (Ejemplo + Cuando usarla + Gotchas).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:29:33 +02:00
26 changed files with 1891 additions and 2324 deletions
+6 -8
View File
@@ -44,6 +44,9 @@ from .trend_slope import trend_slope
from .run_eda_models import run_eda_models
from .project_clusters_2d import project_clusters_2d
from .describe_clusters_llm import describe_clusters_llm
from .detect_latlon_columns import detect_latlon_columns
from .analyze_geo_extent import analyze_geo_extent
from .build_geo_scatter import build_geo_scatter
from .eda_llm_insights import eda_llm_insights
from .build_eda_notebook import build_eda_notebook
from .decode_qr_image import decode_qr_image
@@ -57,16 +60,8 @@ from .exploratory_caveats import exploratory_caveats
from .render_eda_pdf import render_eda_pdf, render_eda_pdf_relational
from .render_automatic_eda_pdf import render_automatic_eda_pdf
from .render_automatic_eda_pptx import render_automatic_eda_pptx
from .detect_time_column import detect_time_column
from .extract_timeseries_raw import extract_timeseries_raw
from .profile_datetime import profile_datetime
from .resample_timeseries import resample_timeseries
__all__ = [
"detect_time_column",
"extract_timeseries_raw",
"profile_datetime",
"resample_timeseries",
"render_automatic_eda_pdf",
"render_automatic_eda_pptx",
"decode_qr_image",
@@ -98,6 +93,9 @@ __all__ = [
"run_eda_models",
"project_clusters_2d",
"describe_clusters_llm",
"detect_latlon_columns",
"analyze_geo_extent",
"build_geo_scatter",
"eda_llm_insights",
"build_eda_notebook",
"describe_numeric",
@@ -0,0 +1,61 @@
---
name: analyze_geo_extent
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def analyze_geo_extent(lats: list, lons: list) -> dict"
description: "Calcula la extension geografica de una nube de coordenadas (lat/lon) y asigna cada punto a un pais/region mediante un lookup OFFLINE contra una tabla de bounding boxes embebida como constante. Devuelve bounding box, centroide, span de la diagonal (haversine), conteo por region (top-8 + Otros), reparto por hemisferios y una frase resumen en ES. Lectura defensiva: descarta pares None/NaN/fuera de rango y NUNCA lanza. Solo stdlib (math); sin geopandas/shapely. Las cajas de paises son rectangulos aproximados, no reverse-geocoding exacto."
tags: [eda, geospatial, geo, coordinates, bounding-box, haversine, datascience]
params:
- name: lats
desc: "Lista de latitudes en grados, rango valido [-90, 90]. Se empareja por indice con lons (gana la longitud minima comun si difieren). Cada valor puede ser None/NaN/no-numerico/fuera de rango: se lee defensivo y se descarta el par."
- name: lons
desc: "Lista de longitudes en grados, rango valido [-180, 180]. Paralela a lats, emparejada por indice. Valores None/NaN/no-numericos/fuera de rango se descartan junto con su par."
output: "Dict con el resumen geografico: {n_points=pares validos usados, bbox={lat_min,lat_max,lon_min,lon_max} o None, centroid={lat,lon}=media de lat/lon validos o None, span_km=distancia haversine (radio 6371 km) de la diagonal SO->NE del bbox, by_region=[{region,count}] descendente por count limitado a top-8 con el resto agregado en 'Otros', hemisphere={north,south,east,west} (ecuador->norte, meridiano 0->este), note=frase ES resumen}. Si no hay pares validos devuelve la forma cero: n_points 0, bbox None, centroid None, span_km 0.0, by_region [], hemisphere a ceros y note 'sin coordenadas validas'. Puntos que no caen en ninguna caja -> region 'Oceano/Otros'."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: true
tests: ["test_nube_en_espana", "test_dos_paises_distintos", "test_listas_vacias", "test_pares_invalidos_filtrados", "test_longitudes_desbalanceadas", "test_span_km_haversine_par_conocido", "test_no_lanza_con_entradas_raras"]
test_file_path: "python/functions/datascience/analyze_geo_extent_test.py"
file_path: "python/functions/datascience/analyze_geo_extent.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.analyze_geo_extent import analyze_geo_extent
# Nube de puntos alrededor de Madrid + un punto en Paris.
lats = [40.4, 40.0, 41.0, 48.8]
lons = [-3.7, -3.5, -4.0, 2.3]
res = analyze_geo_extent(lats, lons)
print(res["n_points"]) # 4
print(res["by_region"]) # [{'region': 'España', 'count': 3}, {'region': 'Francia', 'count': 1}]
print(round(res["span_km"], 1)) # diagonal SO->NE del bbox en km
print(res["hemisphere"]) # {'north': 4, 'south': 0, 'east': 1, 'west': 3}
print(res["note"]) # los puntos se concentran en España (3 de 4)
```
## Cuando usarla
- Usala en el perfilado EDA (grupo `eda`) cuando una tabla tenga columnas de latitud y longitud y quieras un resumen geografico rapido: donde se concentran los puntos, cuanto territorio cubren y a que paises/regiones caen, sin montar geopandas ni un reverse-geocoder.
- Cuando necesites un capitulo `geospatial` del `AutomaticEDA`: alimenta el bbox + centroide para centrar un mapa, el `span_km` para elegir el zoom, y `by_region` para una tabla de conteos por pais.
- Cuando quieras detectar datos sucios de coordenadas (mezcla de hemisferios inesperada, puntos en `Oceano/Otros`, span enorme) antes de seguir el analisis.
## Gotchas
- Funcion pura, sin I/O ni red y determinista: mismas entradas -> misma salida. Lectura defensiva, NUNCA lanza; pares con None/NaN o fuera de rango ([-90,90] lat, [-180,180] lon) se descartan en silencio.
- El lookup de region es una **aproximacion rectangular**: cada pais/region es un bounding box, NO su frontera real. Un punto en el mar cerca de una costa, o en una esquina del rectangulo, puede asignarse a un pais vecino. No es reverse-geocoding exacto — para precision real hace falta un shapefile (fuera de scope por KISS).
- Cajas solapadas se resuelven por orden: gana la PRIMERA que contiene el punto. Los paises se listan antes que los continentes (fallback), y entre vecinos el mas estrecho/occidental va primero (Portugal antes que España, Chile antes que Argentina, EEUU contiguo antes que Canada). Un punto que no cae en ninguna caja -> `Oceano/Otros`.
- La tabla cubre ~24 paises grandes + 6 regiones continentales; paises pequeños o no listados caen a su continente o a `Oceano/Otros`. No incluye territorios insulares lejanos (Canarias, Hawaii, etc.).
- `span_km` es la diagonal del bounding box (esquina SO a NE), no la dispersion real de la nube ni el area; con un solo punto valido el bbox es degenerado y `span_km` es 0.0.
- El ecuador (`lat == 0`) cuenta como hemisferio norte y el meridiano 0 (`lon == 0`) como este, por convencion `>= 0`.
@@ -0,0 +1,209 @@
"""analyze_geo_extent — geographic extent of a cloud of coordinates (EDA `geospatial`).
Pure function: no I/O, no network, deterministic. Given two parallel lists of
latitudes and longitudes it derives the bounding box, centroid, diagonal span
(haversine), per-region counts and hemisphere split of the points, and assigns
each point to a country/region via an OFFLINE lookup against a table of
rectangular bounding boxes embedded as a constant (`_REGION_BBOXES`).
It never reads files, never hits the network and depends only on `math`. The
country boxes are deliberately coarse rectangles (a KISS approximation, NOT a
reverse-geocoder). Reading is defensive throughout and the function NEVER
raises: invalid pairs (None / NaN / out of range) are silently discarded and an
empty cloud yields a zeroed result the caller can skip.
"""
import math
# Earth mean radius in km used by the haversine formula.
_EARTH_RADIUS_KM = 6371.0
# How many distinct regions to surface in `by_region` before collapsing the
# remainder into a single "Otros" bucket.
_TOP_REGIONS = 8
# Offline region lookup: (name, lat_min, lat_max, lon_min, lon_max).
#
# Specific countries are listed FIRST and continental fallbacks LAST: each point
# is assigned to the FIRST box that contains it, so the more specific country box
# wins over the broad continent box. Boxes are coarse rectangles approximating
# the mainland extent of each region; overlapping neighbours are ordered so the
# narrower/more-western country claims its coastal points (e.g. Portugal before
# Spain, Chile before Argentina, the contiguous US before Canada).
_REGION_BBOXES = (
# --- countries (specific) ---
("Portugal", 36.9, 42.2, -9.6, -6.2),
("España", 36.0, 43.8, -9.4, 3.4),
("Francia", 41.3, 51.1, -5.2, 9.6),
("Reino Unido", 49.9, 58.7, -8.6, 1.8),
("Irlanda", 51.4, 55.4, -10.6, -5.9),
("Países Bajos", 50.7, 53.6, 3.3, 7.2),
("Bélgica", 49.5, 51.5, 2.5, 6.4),
("Suiza", 45.8, 47.8, 5.9, 10.5),
("Alemania", 47.3, 55.1, 5.9, 15.0),
("Italia", 36.6, 47.1, 6.6, 18.5),
("Marruecos", 27.7, 35.9, -13.2, -1.0),
("Egipto", 22.0, 31.7, 25.0, 35.0),
("Sudáfrica", -34.8, -22.1, 16.5, 32.9),
("China", 18.0, 53.6, 73.5, 135.1),
("Japón", 24.0, 45.6, 122.9, 145.9),
("India", 6.7, 35.5, 68.1, 97.4),
("Australia", -43.7, -10.0, 112.9, 153.7),
("México", 14.5, 32.7, -118.4, -86.7),
("Estados Unidos", 24.4, 49.4, -125.0, -66.9),
("Canadá", 41.7, 83.1, -141.0, -52.6),
("Chile", -55.9, -17.5, -75.6, -66.4),
("Argentina", -55.1, -21.8, -73.6, -53.6),
("Brasil", -33.8, 5.3, -74.0, -34.8),
("Rusia", 41.2, 77.0, 19.6, 180.0),
# --- continental fallbacks (broad) ---
("Europa", 34.0, 72.0, -25.0, 45.0),
("África", -35.0, 37.5, -18.0, 52.0),
("Asia", 5.0, 78.0, 26.0, 180.0),
("América del Norte", 7.0, 84.0, -168.0, -52.0),
("América del Sur", -56.0, 13.0, -82.0, -34.0),
("Oceanía", -50.0, 0.0, 110.0, 180.0),
)
def _coord(value, limit):
"""Coerce a coordinate to a valid float in [-limit, limit] or None.
bool is a subclass of int but never a real coordinate, so True/False are
treated as missing. NaN and out-of-range values are rejected.
"""
if value is None or isinstance(value, bool):
return None
try:
f = float(value)
except (TypeError, ValueError):
return None
# NaN is the only value that is not equal to itself.
if f != f or f < -limit or f > limit:
return None
return f
def _haversine_km(lat1, lon1, lat2, lon2):
"""Great-circle distance in km between two (lat, lon) points in degrees."""
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2.0) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2.0) ** 2
return 2.0 * _EARTH_RADIUS_KM * math.asin(min(1.0, math.sqrt(a)))
def _region_of(lat, lon):
"""Return the name of the first embedded box containing (lat, lon)."""
for name, lat_min, lat_max, lon_min, lon_max in _REGION_BBOXES:
if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
return name
return "Océano/Otros"
def _empty_result():
"""Result shape when there are no valid coordinate pairs."""
return {
"n_points": 0,
"bbox": None,
"centroid": None,
"span_km": 0.0,
"by_region": [],
"hemisphere": {"north": 0, "south": 0, "east": 0, "west": 0},
"note": "sin coordenadas validas",
}
def analyze_geo_extent(lats: list, lons: list) -> dict:
"""Summarise the geographic extent of a cloud of lat/lon coordinates.
Pairs `lats[i]` with `lons[i]` by index (over the common length when the two
lists differ in size), discards any pair where either value is None / NaN or
outside [-90, 90] (lat) / [-180, 180] (lon), and derives the bounding box,
centroid, diagonal span, per-region counts and hemisphere split. Each valid
point is matched to a country/region by an offline lookup against coarse
rectangular bounding boxes (`_REGION_BBOXES`).
Args:
lats: List of latitudes in degrees ([-90, 90]); read defensively.
lons: List of longitudes in degrees ([-180, 180]); read defensively.
Paired with `lats` by index; the shorter length wins when they differ.
Returns:
Dict with the geographic summary:
{n_points, bbox={lat_min,lat_max,lon_min,lon_max}, centroid={lat,lon},
span_km (haversine of the SW->NE bbox diagonal), by_region=[{region,count}]
(descending, top-8 with the rest folded into "Otros"),
hemisphere={north,south,east,west}, note (Spanish summary phrase)}.
With no valid pairs returns the zeroed shape: n_points 0, bbox None,
centroid None, span_km 0.0, empty by_region, zeroed hemisphere and the
note "sin coordenadas validas". Never raises.
"""
if not isinstance(lats, (list, tuple)) or not isinstance(lons, (list, tuple)):
return _empty_result()
valid = []
# zip already stops at the shorter list -> unbalanced lengths are handled.
for raw_lat, raw_lon in zip(lats, lons):
lat = _coord(raw_lat, 90.0)
lon = _coord(raw_lon, 180.0)
if lat is None or lon is None:
continue
valid.append((lat, lon))
if not valid:
return _empty_result()
n = len(valid)
lat_vals = [p[0] for p in valid]
lon_vals = [p[1] for p in valid]
lat_min, lat_max = min(lat_vals), max(lat_vals)
lon_min, lon_max = min(lon_vals), max(lon_vals)
centroid_lat = sum(lat_vals) / n
centroid_lon = sum(lon_vals) / n
# Diagonal span: SW corner (lat_min, lon_min) to NE corner (lat_max, lon_max).
span_km = _haversine_km(lat_min, lon_min, lat_max, lon_max)
# Hemisphere split: the equator/prime-meridian go to north/east respectively.
north = sum(1 for lat in lat_vals if lat >= 0.0)
south = n - north
east = sum(1 for lon in lon_vals if lon >= 0.0)
west = n - east
# Count points per region (offline bbox lookup).
counts = {}
for lat, lon in valid:
region = _region_of(lat, lon)
counts[region] = counts.get(region, 0) + 1
# Descending by count, then by name for a deterministic tie-break.
ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
by_region = [{"region": name, "count": count} for name, count in ranked[:_TOP_REGIONS]]
rest = sum(count for _, count in ranked[_TOP_REGIONS:])
if rest > 0:
by_region.append({"region": "Otros", "count": rest})
top_region, top_count = ranked[0]
note = (
"los puntos se concentran en {region} ({count} de {n})".format(
region=top_region, count=top_count, n=n
)
)
return {
"n_points": n,
"bbox": {
"lat_min": lat_min,
"lat_max": lat_max,
"lon_min": lon_min,
"lon_max": lon_max,
},
"centroid": {"lat": centroid_lat, "lon": centroid_lon},
"span_km": span_km,
"by_region": by_region,
"hemisphere": {"north": north, "south": south, "east": east, "west": west},
"note": note,
}
@@ -0,0 +1,126 @@
"""Tests para analyze_geo_extent."""
import math
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from analyze_geo_extent import analyze_geo_extent, _haversine_km
# Keys that a non-empty result dict must always contain.
_EXPECTED_KEYS = {
"n_points", "bbox", "centroid", "span_km",
"by_region", "hemisphere", "note",
}
def test_nube_en_espana():
"""Golden: nube de puntos alrededor de Madrid -> region top = España."""
# Cuatro puntos en torno a Madrid (lat ~40, lon ~-3.7), con algo de spread.
lats = [40.4, 40.0, 41.0, 39.5]
lons = [-3.7, -3.5, -4.0, -3.2]
res = analyze_geo_extent(lats, lons)
assert set(res.keys()) == _EXPECTED_KEYS
assert res["n_points"] == 4
# Todos caen en España -> by_region una sola entrada.
assert res["by_region"][0]["region"] == "España"
assert res["by_region"][0]["count"] == 4
# Centroide coherente: media de lat y lon.
assert math.isclose(res["centroid"]["lat"], sum(lats) / 4, rel_tol=1e-9)
assert math.isclose(res["centroid"]["lon"], sum(lons) / 4, rel_tol=1e-9)
# bbox correcto.
assert res["bbox"]["lat_min"] == 39.5
assert res["bbox"]["lat_max"] == 41.0
assert res["bbox"]["lon_min"] == -4.0
assert res["bbox"]["lon_max"] == -3.2
# Hay spread -> diagonal > 0.
assert res["span_km"] > 0.0
# Hemisferio norte (lat>0) y oeste (lon<0).
assert res["hemisphere"]["north"] == 4
assert res["hemisphere"]["south"] == 0
assert res["hemisphere"]["east"] == 0
assert res["hemisphere"]["west"] == 4
assert "España" in res["note"]
def test_dos_paises_distintos():
"""Golden: puntos en España y Francia -> by_region con 2 entradas."""
# Madrid (España) x2 y Paris (Francia) x1.
lats = [40.4, 40.0, 48.8]
lons = [-3.7, -3.5, 2.3]
res = analyze_geo_extent(lats, lons)
assert res["n_points"] == 3
regions = {entry["region"]: entry["count"] for entry in res["by_region"]}
assert regions == {"España": 2, "Francia": 1}
# Orden descendente por count: España (2) antes que Francia (1).
assert res["by_region"][0]["region"] == "España"
assert res["by_region"][0]["count"] == 2
# Madrid y Paris ambos hemisferio norte; Paris lon>0 -> 1 east, 2 west.
assert res["hemisphere"]["north"] == 3
assert res["hemisphere"]["east"] == 1
assert res["hemisphere"]["west"] == 2
def test_listas_vacias():
"""Edge: listas vacias -> n_points 0, bbox None, sin lanzar."""
res = analyze_geo_extent([], [])
assert res["n_points"] == 0
assert res["bbox"] is None
assert res["centroid"] is None
assert res["span_km"] == 0.0
assert res["by_region"] == []
assert res["hemisphere"] == {"north": 0, "south": 0, "east": 0, "west": 0}
assert res["note"] == "sin coordenadas validas"
def test_pares_invalidos_filtrados():
"""Edge: None / NaN / fuera de rango se descartan, no lanza."""
nan = float("nan")
lats = [40.4, None, nan, 91.0, -200.0, 40.0]
lons = [-3.7, -3.5, -3.0, 2.0, 5.0, -3.5]
# Validos: indices 0 y 5 (lat 91 fuera de rango, lon -200 fuera de rango,
# None y NaN descartados).
res = analyze_geo_extent(lats, lons)
assert res["n_points"] == 2
assert res["by_region"][0]["region"] == "España"
assert res["by_region"][0]["count"] == 2
def test_longitudes_desbalanceadas():
"""Edge: len(lats) != len(lons) usa el minimo comun sin lanzar."""
lats = [40.4, 40.0, 41.0, 39.5] # 4 elementos
lons = [-3.7, -3.5] # 2 elementos
res = analyze_geo_extent(lats, lons)
# Solo se emparejan los 2 primeros.
assert res["n_points"] == 2
assert res["bbox"]["lat_min"] == 40.0
assert res["bbox"]["lat_max"] == 40.4
def test_span_km_haversine_par_conocido():
"""Edge: span_km coincide con haversine de la diagonal del bbox."""
# Dos puntos: (0, 0) y (0, 1). bbox diagonal = mismos dos puntos.
res = analyze_geo_extent([0.0, 0.0], [0.0, 1.0])
# 1 grado de longitud en el ecuador ~ 111.19 km.
expected = _haversine_km(0.0, 0.0, 0.0, 1.0)
assert math.isclose(res["span_km"], expected, rel_tol=1e-9)
assert math.isclose(res["span_km"], 111.19, abs_tol=0.5)
def test_no_lanza_con_entradas_raras():
"""Edge: tipos no-lista o None devuelven la forma vacia sin lanzar."""
assert analyze_geo_extent(None, None)["n_points"] == 0
assert analyze_geo_extent("foo", "bar")["n_points"] == 0
# Strings dentro de las listas se descartan como invalidos.
res = analyze_geo_extent(["x", 40.0], [None, -3.5])
assert res["n_points"] == 1
@@ -0,0 +1,477 @@
"""Geospatial chapter (GEOSPATIAL) for AutomaticEDA.
When the dataset carries a coordinate pair (latitude/longitude), this chapter
draws the points on a **geographic scatter** in an equirectangular projection
(scaled so degrees of longitude are not stretched at the data's latitude) and
analyses the **zone / country** the points fall in: bounding box, centroid,
geographic span, and a per-region count. When there is **no** coordinate pair the
chapter returns ``None`` — exactly the user requirement.
Detection and the heavy lifting are delegated to pure ``eda``-group registry
functions, never reimplemented here:
- ``detect_latlon_columns`` — finds the (lat, lon) column pair by name + value
range from the ``profile['columns']`` metadata.
- ``analyze_geo_extent`` — bbox, centroid, haversine span, per-region counts and
hemisphere from the raw coordinate arrays.
- ``build_geo_scatter`` — deterministically down-sampled points + bbox + the
aspect ratio for the equirectangular projection. This chapter only draws the
matplotlib figure from that prepared data (same split as ``num_distr`` does
with ``build_boxplot_stats``).
The raw coordinate arrays are **not** in a standard TableProfile (it stores only
per-column aggregates), so — exactly like ``modelos`` reads ``raw_numeric`` from
``ctx`` — this chapter looks for the coordinates in ``ctx`` (or ``profile``) and
degrades honestly when they are absent: it still detects the columns and shows an
approximate bounding box derived from the per-column ``numeric.min/max``, with a
note that the raw points are needed for the map.
ctx keys this chapter consumes (all optional):
geo_points : dict — ``{"lats": [...], "lons": [...]}`` raw coordinate arrays.
Used directly when present (forward-compatible with a calculation phase
that samples them from the table).
raw_numeric : dict — ``{col: [values]}`` raw numeric columns; when present
and ``geo_points`` is not, the detected lat/lon columns are read from it.
run_geo_llm : bool — when True, call ``ask_llm`` for a one-line narrative of
where the points concentrate (otherwise a derived note is used).
geo_llm_model : str — model id for the optional live LLM call.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
Reads everything defensively (``.get``) and never raises.
"""
from __future__ import annotations
import math
from .. import model
# Pure registry functions (group ``eda``) delegated to. Imported defensively so
# the chapter stays importable (degrading gracefully) if one is unavailable.
try:
from datascience.detect_latlon_columns import detect_latlon_columns
except Exception: # noqa: BLE001 — keep the chapter importable no matter what.
detect_latlon_columns = None # type: ignore[assignment]
try:
from datascience.analyze_geo_extent import analyze_geo_extent
except Exception: # noqa: BLE001
analyze_geo_extent = None # type: ignore[assignment]
try:
from datascience.build_geo_scatter import build_geo_scatter
except Exception: # noqa: BLE001
build_geo_scatter = None # type: ignore[assignment]
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "geospatial"
CHAPTER_TITLE = "Análisis geoespacial"
# --------------------------------------------------------------------------- #
# Formatting helpers (mirror the other chapters' defensive style).
# --------------------------------------------------------------------------- #
def _fmt_num(value, decimals: int = 4) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _fmt_coord(value, decimals: int = 4) -> str:
"""Format a coordinate degree value, defensively."""
try:
return f"{float(value):.{decimals}f}°"
except (TypeError, ValueError):
return model._safe_str(value)
def _fmt_km(value) -> str:
if value is None:
return ""
try:
v = float(value)
except (TypeError, ValueError):
return model._safe_str(value)
if v >= 100:
return f"{v:,.0f} km".replace(",", ".")
return f"{v:.1f} km"
def _is_dict(v) -> bool:
return isinstance(v, dict)
def _clean_floats(seq) -> list:
"""Return a list of floats from an arbitrary sequence (drop None/NaN)."""
out = []
if not isinstance(seq, (list, tuple)):
return out
for v in seq:
try:
f = float(v)
except (TypeError, ValueError):
out.append(None)
continue
out.append(f if f == f else None) # NaN -> None
return out
# --------------------------------------------------------------------------- #
# Resolve the (lat, lon) columns and the raw coordinate arrays.
# --------------------------------------------------------------------------- #
def _detect_columns(profile: dict) -> dict:
"""Detect the lat/lon column pair from the profile metadata, or {}."""
cols = profile.get("columns")
if not isinstance(cols, list) or not cols or detect_latlon_columns is None:
return {}
try:
det = detect_latlon_columns(cols)
except Exception: # noqa: BLE001 — never break the chapter.
return {}
return det if _is_dict(det) else {}
def _resolve_coords(profile: dict, ctx: dict, detected: dict):
"""Return (lats, lons, source_label).
Order: ctx/profile['geo_points'] (explicit arrays) → ctx/profile
['raw_numeric'] keyed by the detected lat/lon column names → (None, None).
"""
gp = ctx.get("geo_points") or profile.get("geo_points")
if _is_dict(gp):
lats = gp.get("lats")
if lats is None:
lats = gp.get("lat")
lons = gp.get("lons")
if lons is None:
lons = gp.get("lon")
if lats and lons:
return list(lats), list(lons), "geo_points"
lat_col = (detected or {}).get("lat_col")
lon_col = (detected or {}).get("lon_col")
if lat_col and lon_col:
raw = ctx.get("raw_numeric") or profile.get("raw_numeric")
if _is_dict(raw):
lats = raw.get(lat_col)
lons = raw.get(lon_col)
if lats and lons:
return list(lats), list(lons), "raw_numeric"
return None, None, "none"
def _column_by_name(profile: dict, name):
if not name:
return None
for col in profile.get("columns") or []:
if isinstance(col, dict) and col.get("name") == name:
return col
return None
def _bbox_from_profile(profile: dict, detected: dict):
"""Approximate bbox from the per-column numeric.min/max (no raw points)."""
lat_c = _column_by_name(profile, (detected or {}).get("lat_col"))
lon_c = _column_by_name(profile, (detected or {}).get("lon_col"))
lat_n = lat_c.get("numeric") if _is_dict(lat_c) else None
lon_n = lon_c.get("numeric") if _is_dict(lon_c) else None
if not _is_dict(lat_n) or not _is_dict(lon_n):
return None
try:
return {
"lat_min": float(lat_n.get("min")),
"lat_max": float(lat_n.get("max")),
"lon_min": float(lon_n.get("min")),
"lon_max": float(lon_n.get("max")),
}
except (TypeError, ValueError):
return None
# --------------------------------------------------------------------------- #
# Figure builder (lazy: matplotlib only imported when the renderer draws it).
# --------------------------------------------------------------------------- #
def _make_geo_scatter(scatter: dict, lat_col: str, lon_col: str):
"""Return a zero-arg callable drawing the geographic scatter, or None."""
points = scatter.get("points") or []
if not points:
return None
bbox = scatter.get("bbox") if _is_dict(scatter.get("bbox")) else {}
aspect = scatter.get("aspect") or 1.0
pad = scatter.get("pad") if _is_dict(scatter.get("pad")) else {}
n_total = scatter.get("n_total")
n_shown = scatter.get("n_shown")
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
xs = [p[0] for p in points if isinstance(p, (list, tuple)) and len(p) >= 2]
ys = [p[1] for p in points if isinstance(p, (list, tuple)) and len(p) >= 2]
fig, ax = plt.subplots(figsize=(6.6, 5.0))
# More points -> smaller markers + lower alpha so dense clouds read as
# density without saturating the page with ink (Tufte).
n = max(len(xs), 1)
size = 18 if n <= 200 else (8 if n <= 1000 else 4)
alpha = 0.75 if n <= 200 else (0.5 if n <= 1000 else 0.35)
ax.scatter(xs, ys, s=size, c="#2a6f97", alpha=alpha, linewidths=0,
zorder=3)
# Bounding box rectangle for orientation.
if bbox:
try:
lo_x, hi_x = float(bbox["lon_min"]), float(bbox["lon_max"])
lo_y, hi_y = float(bbox["lat_min"]), float(bbox["lat_max"])
ax.plot([lo_x, hi_x, hi_x, lo_x, lo_x],
[lo_y, lo_y, hi_y, hi_y, lo_y],
color="#e15759", linewidth=1.0, linestyle="--",
alpha=0.8, zorder=4, label="Bounding box")
px = float(pad.get("lon", 0.0) or 0.0)
py = float(pad.get("lat", 0.0) or 0.0)
ax.set_xlim(lo_x - px, hi_x + px)
ax.set_ylim(lo_y - py, hi_y + py)
except (TypeError, ValueError, KeyError):
pass
# Equirectangular: scale Y/X so longitude is not stretched at this
# latitude (integridad de proyección, Tufte). aspect = 1/cos(lat).
try:
ax.set_aspect(float(aspect))
except (TypeError, ValueError):
pass
ax.set_xlabel(f"Longitud ({lon_col})", fontsize=8)
ax.set_ylabel(f"Latitud ({lat_col})", fontsize=8)
ax.tick_params(labelsize=7)
ax.grid(color="#e6e6e6", linewidth=0.5, zorder=0)
title = "Distribución geográfica de las coordenadas"
if n_shown is not None and n_total is not None and n_shown < n_total:
title += f"\n(mostrando {n_shown:,} de {n_total:,} puntos)".replace(",", ".")
ax.set_title(title, fontsize=10)
ax.legend(loc="best", fontsize=7, frameon=True, framealpha=0.9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Section builders.
# --------------------------------------------------------------------------- #
def _intro_block(detected: dict, lat_col: str, lon_col: str) -> list:
conf = (detected or {}).get("confidence")
reason = model._safe_str((detected or {}).get("reason"))
conf_txt = ""
if conf is not None:
try:
conf_txt = f" (confianza {float(conf) * 100:.0f}%)"
except (TypeError, ValueError):
conf_txt = ""
text = (
"Este dataset contiene **coordenadas geográficas**: se identificó el par "
f"**latitud = «{lat_col}»** y **longitud = «{lon_col}»**{conf_txt}. La "
"detección combina el nombre de la columna y el rango de sus valores "
"(latitud en [90, 90], longitud en [180, 180])."
)
if reason:
text += f"\n\n*Criterio de detección:* {reason}."
return [model.Heading(text=CHAPTER_TITLE, level=1),
model.Markdown(text=text)]
def _extent_blocks(extent: dict) -> list:
"""KVTable with bbox/centroid/span + DataTable with the per-region counts."""
if not _is_dict(extent) or not extent.get("n_points"):
return []
blocks = []
bbox = extent.get("bbox") if _is_dict(extent.get("bbox")) else {}
centroid = extent.get("centroid") if _is_dict(extent.get("centroid")) else {}
hemi = extent.get("hemisphere") if _is_dict(extent.get("hemisphere")) else {}
rows = [("Puntos con coordenadas", _fmt_num(extent.get("n_points")))]
if bbox:
rows.append(("Latitud (mín. / máx.)",
f"{_fmt_coord(bbox.get('lat_min'))} a "
f"{_fmt_coord(bbox.get('lat_max'))}"))
rows.append(("Longitud (mín. / máx.)",
f"{_fmt_coord(bbox.get('lon_min'))} a "
f"{_fmt_coord(bbox.get('lon_max'))}"))
if centroid:
rows.append(("Centroide",
f"{_fmt_coord(centroid.get('lat'))}, "
f"{_fmt_coord(centroid.get('lon'))}"))
if extent.get("span_km") is not None:
rows.append(("Extensión (diagonal)", _fmt_km(extent.get("span_km"))))
if hemi:
n, s = hemi.get("north"), hemi.get("south")
e, w = hemi.get("east"), hemi.get("west")
rows.append(("Hemisferios",
f"N {_fmt_num(n)} / S {_fmt_num(s)} · "
f"E {_fmt_num(e)} / O {_fmt_num(w)}"))
blocks.append(model.KVTable(rows=rows, title="Extensión geográfica"))
by_region = extent.get("by_region")
if isinstance(by_region, list) and by_region:
total = sum(r.get("count", 0) for r in by_region if _is_dict(r)) or 0
rrows = []
for r in by_region:
if not _is_dict(r):
continue
cnt = r.get("count", 0)
pct = (cnt / total) if total else None
pct_txt = f"{pct * 100:.1f}%" if pct is not None else ""
rrows.append([model._safe_str(r.get("region")), _fmt_num(cnt),
pct_txt])
if rrows:
blocks.append(model.DataTable(
header=["Zona / país", "Puntos", "% del total"], rows=rrows,
title="Distribución por zona",
note="Asignación aproximada por bounding box de cada región "
"(no es reverse-geocoding exacto de fronteras)."))
return blocks
def _narrative_block(profile: dict, ctx: dict, extent: dict) -> list:
"""A one-line narrative of where the points concentrate.
Uses the derived ``note`` from analyze_geo_extent by default; optionally
calls an LLM (ctx['run_geo_llm']) for a richer one-liner.
"""
note = model._safe_str((extent or {}).get("note"))
if ctx.get("run_geo_llm"):
by_region = (extent or {}).get("by_region") or []
bbox = (extent or {}).get("bbox") or {}
try:
from core.ask_llm import ask_llm
prompt = (
"Eres un analista de datos. En UNA frase en español, describe "
"dónde se concentran geográficamente estos puntos. Sé concreto "
"y no inventes precisión que los datos no tienen.\n"
f"Conteo por zona: {by_region}\nBounding box: {bbox}."
)
out = ask_llm(prompt,
model=ctx.get("geo_llm_model",
"claude-haiku-4-5-20251001"),
echo=False)
if out and isinstance(out, str) and out.strip():
note = out.strip()
except Exception: # noqa: BLE001 — degrade to the derived note.
pass
if not note:
return []
return [model.Markdown(text=f"**Interpretación.** {note}")]
def _no_points_block(profile: dict, detected: dict) -> list:
"""Degrade honestly when the raw coordinate arrays are not available."""
blocks = []
bbox = _bbox_from_profile(profile, detected)
if bbox:
rows = [
("Latitud (mín. / máx.)",
f"{_fmt_coord(bbox.get('lat_min'))} a "
f"{_fmt_coord(bbox.get('lat_max'))}"),
("Longitud (mín. / máx.)",
f"{_fmt_coord(bbox.get('lon_min'))} a "
f"{_fmt_coord(bbox.get('lon_max'))}"),
]
blocks.append(model.KVTable(
rows=rows, title="Extensión geográfica (aproximada)"))
blocks.append(model.Note(
"No se incluyeron las coordenadas crudas en el contexto, por lo que el "
"mapa y el análisis por zona no se han dibujado. El bounding box "
"mostrado se deriva de los mínimos y máximos por columna. Para el "
"scatter geográfico completo, pasa los arrays en "
"ctx['geo_points'] = {'lats': [...], 'lons': [...]} o las columnas en "
"ctx['raw_numeric']."))
return blocks
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_geospatial(profile: dict, ctx: dict):
"""Build the GEOSPATIAL Chapter, or None if the dataset has no coordinates.
Args:
profile: the ``eda`` group TableProfile dict.
ctx: presentation context; may carry ``geo_points``/``raw_numeric`` with
the raw coordinate arrays and the ``run_geo_llm`` flag.
Returns:
A ``model.Chapter`` with the geographic scatter + zone/country analysis,
or ``None`` when no latitude/longitude column pair is detected.
"""
profile = profile or {}
ctx = ctx or {}
if not isinstance(profile, dict):
return None
detected = _detect_columns(profile)
lats, lons, source = _resolve_coords(profile, ctx, detected)
has_detection = bool((detected or {}).get("lat_col") and
(detected or {}).get("lon_col"))
has_points = bool(lats and lons)
if not has_detection and not has_points:
return None # chapter does not apply: no coordinates in this dataset.
# Labels for axes / intro. When only raw arrays were given (no detection),
# fall back to generic names.
lat_col = (detected or {}).get("lat_col") or "lat"
lon_col = (detected or {}).get("lon_col") or "lon"
blocks = _intro_block(detected, lat_col, lon_col)
if has_points:
clean_lats = _clean_floats(lats)
clean_lons = _clean_floats(lons)
# Zone / country analysis.
extent = {}
if analyze_geo_extent is not None:
try:
extent = analyze_geo_extent(clean_lats, clean_lons) or {}
except Exception: # noqa: BLE001
extent = {}
# The geographic scatter figure (its own page/slide).
scatter = {}
if build_geo_scatter is not None:
try:
scatter = build_geo_scatter(clean_lats, clean_lons) or {}
except Exception: # noqa: BLE001
scatter = {}
maker = _make_geo_scatter(scatter, lat_col, lon_col) if scatter else None
if maker is not None:
blocks.append(model.Figure(
make=maker,
caption="Cada punto es una observación situada por sus "
"coordenadas; el recuadro rojo es el bounding box. La "
"escala respeta la latitud (proyección equirectangular)."))
else:
blocks.append(model.Note(
"No se pudo construir el scatter geográfico a partir de las "
"coordenadas proporcionadas."))
blocks += _extent_blocks(extent)
blocks += _narrative_block(profile, ctx, extent)
else:
# Columns detected but no raw points available — degrade honestly.
blocks += _no_points_block(profile, detected)
if not blocks:
return None
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -0,0 +1,245 @@
"""Tests for the GEOSPATIAL chapter — DoD: golden + edges + anti-cut.
Self-contained: builds synthetic TableProfiles (no DuckDB) so the suite is fast
and deterministic. The raw coordinate arrays are passed through ``ctx`` exactly
as the chapter's contract documents (``ctx['geo_points']`` / ``ctx['raw_numeric']``).
Verifies that the chapter detects the lat/lon pair, draws the geographic scatter
figure, analyses the zone/country (bounding box + per-region counts), returns
None when there are no coordinates, degrades honestly when the raw points are
absent, and that a profile with long column names + many points + several
regions renders to PDF and PPTX without cutting any text (long content wraps, it
is never truncated).
"""
import os
import re
import tempfile
from pypdf import PdfReader
from pptx import Presentation
from datascience.automatic_eda.chapters.geospatial import (
build_geospatial,
CHAPTER_VERSION,
)
from datascience.automatic_eda import build_document, render_pdf, render_pptx
# --------------------------------------------------------------------------- #
# Synthetic data helpers
# --------------------------------------------------------------------------- #
def _grid(lat0: float, lon0: float, n: int, spread: float = 1.0):
"""A small deterministic cloud of n points around (lat0, lon0)."""
lats, lons = [], []
for i in range(n):
# deterministic pseudo-spread, no randomness.
f = (i % 11) / 11.0 - 0.5
g = (i % 7) / 7.0 - 0.5
lats.append(lat0 + f * spread)
lons.append(lon0 + g * spread)
return lats, lons
def _profile_with_coords(lat_name="lat", lon_name="lon", lats=None, lons=None):
"""A profile carrying a lat/lon column pair with valid ranges."""
lats = lats if lats is not None else [40.4, 41.0, 39.8, 40.1]
lons = lons if lons is not None else [-3.7, -3.6, -4.0, -3.9]
return {
"table": "lugares",
"columns": [
{"name": lat_name, "inferred_type": "numeric",
"numeric": {"min": min(lats), "max": max(lats),
"mean": sum(lats) / len(lats)}},
{"name": lon_name, "inferred_type": "numeric",
"numeric": {"min": min(lons), "max": max(lons),
"mean": sum(lons) / len(lons)}},
{"name": "valor", "inferred_type": "numeric",
"numeric": {"min": 0, "max": 100, "mean": 50}},
],
}
def _ctx_points(lats, lons):
return {"geo_points": {"lats": lats, "lons": lons}}
def _kinds(chapter):
return [getattr(b, "kind", None) for b in chapter.blocks]
def _tables(chapter):
return [b for b in chapter.blocks if getattr(b, "kind", None) == "data_table"]
def _figures(chapter):
return [b for b in chapter.blocks if getattr(b, "kind", None) == "figure"]
# --------------------------------------------------------------------------- #
# Golden
# --------------------------------------------------------------------------- #
def test_golden_estructura_y_version():
lats, lons = [40.4, 41.0, 39.8, 40.1], [-3.7, -3.6, -4.0, -3.9]
ch = build_geospatial(_profile_with_coords(lats=lats, lons=lons),
_ctx_points(lats, lons))
assert ch is not None
assert ch.id == "geospatial"
assert ch.version == CHAPTER_VERSION
kinds = _kinds(ch)
# intro heading + markdown + scatter figure + extent kv + per-region table.
assert "heading" in kinds
assert "markdown" in kinds
assert "figure" in kinds, "falta el scatter geográfico"
assert "kv_table" in kinds, "falta la tabla de extensión"
def test_golden_detecta_columnas_y_nombra_ejes():
lats, lons = _grid(40.4, -3.7, 30, spread=0.8)
prof = _profile_with_coords("latitude", "longitude", lats, lons)
ch = build_geospatial(prof, _ctx_points(lats, lons))
intro = [b for b in ch.blocks if b.kind == "markdown"][0].text
assert "latitude" in intro and "longitude" in intro
def test_golden_figura_es_perezosa_y_dibujable():
lats, lons = _grid(40.4, -3.7, 50, spread=0.6)
ch = build_geospatial(_profile_with_coords(lats=lats, lons=lons),
_ctx_points(lats, lons))
fig_block = _figures(ch)[0]
assert fig_block.make is not None and fig_block.fig is None # lazy
fig = fig_block.make() # must draw without raising
assert fig is not None
import matplotlib.pyplot as plt
plt.close(fig)
def test_golden_analisis_por_zona_espana():
lats, lons = _grid(40.4, -3.7, 40, spread=0.5) # Madrid area
ch = build_geospatial(_profile_with_coords(lats=lats, lons=lons),
_ctx_points(lats, lons))
tables = _tables(ch)
region_tbl = [t for t in tables if "zona" in (t.title or "").lower()]
assert region_tbl, "falta la tabla por zona/país"
flat = " ".join(" ".join(str(c) for c in r) for r in region_tbl[0].rows)
# Spain-area points must resolve to a Spain/European region, not empty.
assert region_tbl[0].rows
assert any(c for c in (region_tbl[0].rows[0]))
def test_golden_raw_numeric_source():
"""Coordinates can also come from ctx['raw_numeric'] keyed by detected cols."""
lats, lons = _grid(48.85, 2.35, 25, spread=0.4) # Paris area
prof = _profile_with_coords("lat", "lon", lats, lons)
ctx = {"raw_numeric": {"lat": lats, "lon": lons}}
ch = build_geospatial(prof, ctx)
assert ch is not None
assert _figures(ch), "el scatter debe construirse desde raw_numeric"
# --------------------------------------------------------------------------- #
# Edges
# --------------------------------------------------------------------------- #
def test_edge_sin_coordenadas_devuelve_none():
prof = {
"table": "ventas",
"columns": [
{"name": "precio", "inferred_type": "numeric",
"numeric": {"min": 0, "max": 1000}},
{"name": "categoria", "inferred_type": "text"},
],
}
assert build_geospatial(prof, {}) is None
def test_edge_none_y_vacio_no_rompen():
assert build_geospatial(None, None) is None
assert build_geospatial({}, {}) is None
assert build_geospatial({"columns": []}, {}) is None
assert build_geospatial("not a dict", {}) is None
def test_edge_nombre_lat_pero_rango_invalido_no_aplica():
"""A column named 'lat' whose values are out of [-90,90] is NOT a coordinate."""
prof = {
"table": "x",
"columns": [
{"name": "lat", "inferred_type": "numeric",
"numeric": {"min": 1000, "max": 9999}},
{"name": "lon", "inferred_type": "numeric",
"numeric": {"min": 1000, "max": 9999}},
],
}
assert build_geospatial(prof, {}) is None
def test_edge_columnas_detectadas_sin_puntos_degrada():
"""Detected lat/lon but no raw arrays -> honest note + approx bbox, no crash."""
prof = _profile_with_coords(lats=[40.0, 41.0], lons=[-3.0, -4.0])
ch = build_geospatial(prof, {}) # no geo_points / raw_numeric
assert ch is not None
assert not _figures(ch), "sin puntos no debe dibujarse el scatter"
notes = [b for b in ch.blocks if b.kind == "note"]
assert notes and "coordenadas crudas" in notes[0].text
def test_edge_coordenadas_con_nan_se_filtran():
lats = [40.4, float("nan"), 41.0, None, 39.8]
lons = [-3.7, -3.6, float("nan"), -3.9, -4.0]
ch = build_geospatial(_profile_with_coords(lats=[39.8, 41.0],
lons=[-4.0, -3.6]),
_ctx_points(lats, lons))
assert ch is not None # must not raise on NaN/None
# --------------------------------------------------------------------------- #
# Anti-cut: long names + many points + several regions render without truncation
# --------------------------------------------------------------------------- #
def _multiregion_points(per: int = 700):
"""Points spread across Spain, France and the USA to fill the region table."""
lats, lons = [], []
for (la, lo) in ((40.4, -3.7), (48.85, 2.35), (39.0, -98.0)):
gl, gn = _grid(la, lo, per, spread=2.0)
lats += gl
lons += gn
return lats, lons
def test_anticut_pdf_y_pptx_no_truncan():
lat_name = "latitud_geografica_del_punto_de_observacion_registrado"
lon_name = "longitud_geografica_del_punto_de_observacion_registrado"
lats, lons = _multiregion_points(700)
prof = _profile_with_coords(lat_name, lon_name, lats, lons)
ctx = {"geo_points": {"lats": lats, "lons": lons}}
full = build_document(prof, ctx)
assert any(c.id == "geospatial" for c in full)
chapters = [c for c in full if c.id == "geospatial"]
with tempfile.TemporaryDirectory() as d:
pdf = os.path.join(d, "g.pdf")
pptx = os.path.join(d, "g.pptx")
rp = render_pdf(chapters, pdf, {"title": "EDA"})
rx = render_pptx(chapters, pptx, {"title": "EDA"})
assert os.path.exists(pdf) and os.path.exists(pptx)
assert (rp or {}).get("n_pages", 0) >= 1
# PDF: the long lat column name survives whole (wraps, not cut) and there
# is no truncation marker in this chapter.
pdf_txt = "".join((pg.extract_text() or "") for pg in PdfReader(pdf).pages)
assert "" not in pdf_txt and "..." not in pdf_txt
norm = re.sub(r"\s+", "", pdf_txt)
assert lat_name in norm, "el nombre largo de la columna se cortó en el PDF"
# PPTX: long name present in some shape/cell, untruncated.
allt = []
for s in Presentation(pptx).slides:
for sh in s.shapes:
if sh.has_text_frame:
allt.append(sh.text_frame.text)
if sh.has_table:
for row in sh.table.rows:
for c in row.cells:
allt.append(c.text)
joined = re.sub(r"\s+", "", "\n".join(allt))
assert lat_name in joined, "el nombre largo de la columna se cortó en el PPTX"
@@ -1,613 +0,0 @@
"""Time-series chapter (TIMESERIES) for AutomaticEDA.
This chapter applies **only when the table has a date/datetime column**. When it
does, it draws — exactly the user requirement — the evolution of the data over
time (the value of each numeric column aggregated per period *and* the count of
rows per period) plus the statistical analysis of the series (stationarity,
autocorrelation, trend and seasonality). When there is no temporal column
``build_timeseries`` returns ``None``.
Data sources, read defensively and never recomputed here:
- ``profile['columns']`` — to detect the time column and the numeric columns.
Delegated to the pure registry function ``detect_time_column`` (group ``eda``).
- ``profile['series'][col]`` — the per-column time-series analysis already
produced by ``profile_table(run_series=True)``: ``stationarity`` (ADF+KPSS),
``acf_pacf`` (ACF/PACF + Ljung-Box), ``stl`` (trend/seasonal/resid +
Hyndman strengths) and the levels/returns suggestion.
- ``ctx['timeseries_raw']`` (or ``profile['timeseries_raw']``) — the *raw* ordered
series ``{time_col, t:[iso...], series:{col:[float|None]}}`` needed to draw the
value-vs-time line and the per-period row count. Exactly like ``modelos`` reads
``raw_numeric`` from ``ctx``, this chapter looks for the raw series there and
degrades honestly when it is absent (it still renders the textual analysis).
The raw series is aggregated per period with the pure registry function
``resample_timeseries`` and the datetime header is built with ``profile_datetime``
(both group ``eda``). Every figure is emitted as a lazy ``Figure`` so the
renderers rasterize and scale it to fit a whole page/slide; tables go through
``DataTable``/``KVTable`` so the paginator splits them repeating the header. No
content is ever cut.
ctx keys this chapter consumes (all optional):
timeseries_raw : dict — ``{time_col, t:[...], series:{col:[...]}}`` raw
ordered series used to draw the value-vs-time line and the row-count
panel. When absent the chapter omits those figures (with a note) and
renders only the analysis available in ``profile['series']``.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
Reads everything defensively (``.get``) and never raises.
"""
from __future__ import annotations
from .. import model
# Pure/impure registry functions (group ``eda``) consumed by this chapter,
# imported defensively so the chapter still builds (degrading the affected
# section to a note) if any of them is somehow unavailable.
try:
from datascience.detect_time_column import detect_time_column
except Exception: # noqa: BLE001 — keep the chapter importable no matter what.
detect_time_column = None # type: ignore[assignment]
try:
from datascience.profile_datetime import profile_datetime
except Exception: # noqa: BLE001
profile_datetime = None # type: ignore[assignment]
try:
from datascience.resample_timeseries import resample_timeseries
except Exception: # noqa: BLE001
resample_timeseries = None # type: ignore[assignment]
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "timeseries"
CHAPTER_TITLE = "Series temporales"
# Plain-Spanish gloss for the stationarity verdict of adf_kpss_stationarity.
_VERDICT_GLOSS = {
"stationary": "estacionaria: media y varianza estables en el tiempo; se "
"puede modelar directamente.",
"non_stationary": "no estacionaria: tiene tendencia o varianza cambiante "
"(raíz unitaria). Correlacionar o modelar sus niveles "
"produce relaciones espurias (Granger-Newbold); conviene "
"diferenciar o pasar a retornos.",
"inconclusive": "resultado no concluyente (ADF y KPSS discrepan): tratar con "
"cautela, probablemente cerca de la no estacionariedad.",
}
# OHLC-style name fragments used to collapse near-identical financial series.
_OHLC_HINTS = ("open", "high", "low", "close", "adj", "price", "vwap")
def _fmt_num(value, decimals: int = 3) -> str:
"""Compact, defensive number formatting shared with the other chapters."""
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _is_dict(v) -> bool:
return isinstance(v, dict)
# --------------------------------------------------------------------------- #
# Detection: which column is the time axis and which numeric columns to chart.
# --------------------------------------------------------------------------- #
def _detect(cols: list) -> dict:
"""Return ``{time_col, numeric_cols, ...}`` via the registry function.
Falls back to an inline scan (datetime inferred_type / datetime semantic
types) when ``detect_time_column`` is unavailable, so the chapter still works.
"""
if detect_time_column is not None:
try:
res = detect_time_column(cols)
if _is_dict(res):
return res
except Exception: # noqa: BLE001 — degrade to the inline scan.
pass
time_col = None
numeric_cols = []
for c in cols or []:
if not _is_dict(c):
continue
it = c.get("inferred_type")
sem = c.get("semantic_type")
if time_col is None and (
it == "datetime" or sem in ("datetime_iso", "date_eu")):
time_col = c.get("name")
if it == "numeric":
numeric_cols.append(c.get("name"))
return {"time_col": time_col, "numeric_cols": numeric_cols,
"time_semantic": "", "reason": "inline fallback"}
def _raw_series_for(raw: dict, col: str):
"""Return (t_list, v_list) for a column from the raw bundle, or (None, None)."""
if not _is_dict(raw):
return None, None
t = raw.get("t")
series = raw.get("series") if _is_dict(raw.get("series")) else {}
v = series.get(col)
if isinstance(t, list) and isinstance(v, list) and t and len(t) == len(v):
return t, v
return None, None
def _ohlc_groups(numeric_cols: list, raw: dict) -> dict:
"""Map each numeric column to a representative to collapse OHLC duplicates.
When several numeric columns are near-identical financial level series
(open/high/low/close/adj close), charting each one repeats the same figure
four times. We keep the first OHLC-looking column as the representative for
the *figures* and list the collapsed ones in a note; the textual analysis is
still produced for every column. Detection is by name only (cheap, no extra
data dependency) and conservative: only collapses when >=2 OHLC-like names
are present.
"""
ohlc = [c for c in numeric_cols
if isinstance(c, str) and any(h in c.lower() for h in _OHLC_HINTS)]
if len(ohlc) < 2:
return {}
representative = ohlc[0]
return {c: representative for c in ohlc if c != representative}
# --------------------------------------------------------------------------- #
# Datetime header (MUST-9.3): range / frequency / regularity / gaps.
# --------------------------------------------------------------------------- #
def _datetime_header(time_col: str, raw: dict) -> list:
"""Build the datetime profile header from the raw time axis, when present."""
blocks: list = []
t, _ = (raw.get("t"), None) if _is_dict(raw) else (None, None)
if not (isinstance(t, list) and t and profile_datetime is not None):
return blocks
try:
dt = profile_datetime(t)
except Exception: # noqa: BLE001
return blocks
if not _is_dict(dt):
return blocks
freq_gloss = {
"daily": "diaria", "weekly": "semanal", "monthly": "mensual",
"quarterly": "trimestral", "yearly": "anual",
"irregular": "irregular", "unknown": "indeterminada",
}
rows = [
("Columna de fecha", model._safe_str(time_col)),
("Rango", f"{model._safe_str(dt.get('min'))}"
f"{model._safe_str(dt.get('max'))}"),
("Observaciones", _fmt_num(dt.get("n"))),
("Fechas distintas", _fmt_num(dt.get("n_distinct"))),
("Frecuencia", freq_gloss.get(dt.get("freq"), model._safe_str(dt.get("freq")))),
("Regular", "" if dt.get("is_regular") else "no"),
]
span = dt.get("span_days")
if span is not None:
rows.append(("Duración (días)", _fmt_num(span, 1)))
n_gaps = dt.get("n_gaps")
if n_gaps is not None:
rows.append(("Huecos en la rejilla", _fmt_num(n_gaps)))
blocks.append(model.KVTable(rows=rows, title="Perfil temporal"))
note = dt.get("note")
if note:
blocks.append(model.Note(model._safe_str(note)))
return blocks
# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _parse_dates(labels: list):
"""Parse a list of ISO-ish strings/dates to datetime, dropping unparseable.
Returns (dates, kept_index) so callers can align the values list.
"""
from datetime import date, datetime
out = []
keep = []
for i, lab in enumerate(labels):
if isinstance(lab, datetime):
out.append(lab)
keep.append(i)
continue
if isinstance(lab, date):
out.append(datetime(lab.year, lab.month, lab.day))
keep.append(i)
continue
s = model._safe_str(lab).strip()
if not s:
continue
s2 = s.replace("T", " ")
parsed = None
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
try:
parsed = datetime.strptime(s2[:len(fmt) + 4] if False else s2, fmt)
break
except ValueError:
continue
if parsed is None:
try:
parsed = datetime.fromisoformat(s.replace("T", " "))
except ValueError:
continue
out.append(parsed)
keep.append(i)
return out, keep
def _make_evolution_figure(name: str, rs: dict):
"""Lazy callable: value-vs-time line + per-period row-count panel (MUST-9.1)."""
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
t_labels = rs.get("t") or []
v = rs.get("v") or []
counts = rs.get("count") or []
dates, keep = _parse_dates(t_labels)
vv = [v[i] if i < len(v) else None for i in keep]
cc = [counts[i] if i < len(counts) else 0 for i in keep]
fig, (ax_v, ax_c) = plt.subplots(
2, 1, figsize=(7.0, 4.6), sharex=True,
gridspec_kw={"height_ratios": [3.0, 1.2], "hspace": 0.12})
# Top: value aggregated per period (line; gaps where the value is None).
xs = [d for d, val in zip(dates, vv) if val is not None]
ys = [val for val in vv if val is not None]
if xs and ys:
ax_v.plot(xs, ys, color="#4e79a7", linewidth=1.4, zorder=3)
ax_v.fill_between(xs, ys, min(ys), color="#9ec6df", alpha=0.18,
zorder=1)
else:
ax_v.text(0.5, 0.5, "(sin valores numéricos)", ha="center",
va="center", fontsize=9, color="#8a8a8a",
transform=ax_v.transAxes)
ax_v.set_ylabel(name, fontsize=8)
ax_v.tick_params(labelsize=7)
ax_v.grid(axis="y", color="#eeeeee", linewidth=0.6)
for spine in ("top", "right"):
ax_v.spines[spine].set_visible(False)
# Bottom: number of observations per period (density / gaps).
if dates and cc:
# Bar width ~ median spacing so bars do not overlap nor leave gaps.
width = 1.0
if len(dates) > 1:
deltas = sorted((dates[i + 1] - dates[i]).days
for i in range(len(dates) - 1))
width = max(deltas[len(deltas) // 2] * 0.8, 1.0)
ax_c.bar(dates, cc, width=width, color="#59a14f", alpha=0.75,
align="center")
ax_c.set_ylabel("nº filas", fontsize=8)
ax_c.tick_params(labelsize=7)
ax_c.grid(axis="y", color="#eeeeee", linewidth=0.6)
for spine in ("top", "right"):
ax_c.spines[spine].set_visible(False)
ax_c.xaxis.set_major_locator(mdates.AutoDateLocator())
ax_c.xaxis.set_major_formatter(mdates.ConciseDateFormatter(
ax_c.xaxis.get_major_locator()))
freq = rs.get("freq")
suptitle = f"{name} — evolución temporal"
if freq:
suptitle += f" (agregado {freq})"
fig.suptitle(suptitle, fontsize=10, fontweight="bold", x=0.02, ha="left")
return fig
return _draw
def _make_stl_figure(stl: dict):
"""Lazy callable: the STL trend/seasonal/resid panels, or None if no values.
``stl_decompose`` only carries the component *values* for short series; for
long ones it returns just summary stats (``note``). In that case there is
nothing to plot and we return None (the caller renders the strengths as text).
"""
def _component_values(comp):
if _is_dict(comp):
vals = comp.get("values")
if isinstance(vals, list) and vals:
return [x for x in vals]
return None
trend = _component_values(stl.get("trend"))
seasonal = _component_values(stl.get("seasonal"))
resid = _component_values(stl.get("resid"))
if not any([trend, seasonal, resid]):
return None
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
panels = [("Tendencia", trend, "#4e79a7"),
("Estacional", seasonal, "#59a14f"),
("Resto", resid, "#e15759")]
panels = [(lbl, vals, col) for lbl, vals, col in panels if vals]
fig, axes = plt.subplots(len(panels), 1, figsize=(7.0, 1.4 * len(panels) + 0.6),
sharex=True)
if len(panels) == 1:
axes = [axes]
for ax, (lbl, vals, col) in zip(axes, panels):
ax.plot(range(len(vals)), vals, color=col, linewidth=1.2)
ax.set_ylabel(lbl, fontsize=8)
ax.tick_params(labelsize=7)
ax.grid(axis="y", color="#eeeeee", linewidth=0.6)
for spine in ("top", "right"):
ax.spines[spine].set_visible(False)
axes[-1].set_xlabel("índice temporal", fontsize=8)
fig.suptitle("Descomposición STL", fontsize=10, fontweight="bold",
x=0.02, ha="left")
fig.tight_layout(rect=(0, 0, 1, 0.96))
return fig
return _draw
def _make_acf_figure(acf_pacf: dict):
"""Lazy callable: the ACF stem plot with ±1.96/√n bands, or None."""
acf = acf_pacf.get("acf")
n = acf_pacf.get("n")
if not (isinstance(acf, list) and len(acf) > 1 and isinstance(n, int) and n > 0):
return None
def _draw():
import math
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
lags = list(range(len(acf)))
fig, ax = plt.subplots(figsize=(7.0, 3.2))
ax.vlines(lags, 0, acf, color="#4e79a7", linewidth=1.4)
ax.plot(lags, acf, "o", color="#4e79a7", markersize=3)
band = 1.96 / math.sqrt(n)
ax.axhspan(-band, band, color="#cccccc", alpha=0.3,
label="banda ±1.96/√n (ruido blanco)")
ax.axhline(0, color="#888888", linewidth=0.8)
ax.set_xlabel("retardo (lag)", fontsize=8)
ax.set_ylabel("ACF", fontsize=8)
ax.tick_params(labelsize=7)
ax.legend(fontsize=7, loc="upper right", framealpha=0.85)
ax.set_title("Autocorrelación (ACF): lags fuera de la banda = "
"correlación significativa", fontsize=9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Per-column textual analysis from profile['series'][col].
# --------------------------------------------------------------------------- #
def _analysis_markdown(sblock: dict) -> str:
"""One markdown block summarizing stationarity / autocorrelation / STL."""
parts: list = []
stat = sblock.get("stationarity") if _is_dict(sblock.get("stationarity")) else {}
verdict = stat.get("verdict")
if verdict:
adf = stat.get("adf") if _is_dict(stat.get("adf")) else {}
kpss = stat.get("kpss") if _is_dict(stat.get("kpss")) else {}
line = (f"**Estacionariedad:** {_VERDICT_GLOSS.get(verdict, verdict)} "
f"(ADF p={_fmt_num(adf.get('p_value'), 4)}, "
f"KPSS p={_fmt_num(kpss.get('p_value'), 4)}).")
warning = stat.get("warning")
if warning:
line += f"{model._safe_str(warning)}"
parts.append(line)
acf = sblock.get("acf_pacf") if _is_dict(sblock.get("acf_pacf")) else {}
if acf:
is_auto = acf.get("is_autocorrelated")
lb = acf.get("ljung_box") if _is_dict(acf.get("ljung_box")) else {}
sig = acf.get("significant_acf_lags") or []
if is_auto is True:
ac_line = ("**Autocorrelación:** la serie está autocorrelada "
"(Ljung-Box rechaza independencia, "
f"p={_fmt_num(lb.get('p_value'), 4)}): los valores dependen "
"de su pasado, no es ruido blanco.")
if sig:
shown = ", ".join(str(x) for x in sig[:8])
more = "" if len(sig) > 8 else ""
ac_line += f" Lags significativos: {shown}{more}."
elif is_auto is False:
ac_line = ("**Autocorrelación:** no se detecta autocorrelación "
"significativa (compatible con ruido blanco, Ljung-Box "
f"p={_fmt_num(lb.get('p_value'), 4)}).")
else:
ac_line = "**Autocorrelación:** no evaluable (datos insuficientes)."
parts.append(ac_line)
stl = sblock.get("stl") if _is_dict(sblock.get("stl")) else {}
if stl:
ts = stl.get("trend_strength")
ss = stl.get("seasonal_strength")
if ts is not None or ss is not None:
parts.append(
"**Descomposición STL:** fuerza de tendencia "
f"{_fmt_num(ts, 2)} y fuerza estacional {_fmt_num(ss, 2)} "
"(escala 01 de Hyndman: cuanto más alto, más marcada la "
"componente).")
elif stl.get("note"):
parts.append(f"**Descomposición STL:** {model._safe_str(stl.get('note'))}")
if sblock.get("levels_suggested"):
reason = sblock.get("levels_reason")
kind = sblock.get("levels_kind")
tr = sblock.get("to_returns") if _is_dict(sblock.get("to_returns")) else None
line = "**Transformación sugerida:** "
line += "pasar a retornos" if kind == "returns" else "diferenciar la serie"
if reason:
line += f"{model._safe_str(reason)}"
if tr and tr.get("mean") is not None:
line += (f" (retornos: media {_fmt_num(tr.get('mean'), 5)}, "
f"σ {_fmt_num(tr.get('std'), 5)}).")
parts.append(line)
return "\n\n".join(parts)
# --------------------------------------------------------------------------- #
# Per-column section.
# --------------------------------------------------------------------------- #
def _column_section(name: str, sblock: dict, raw: dict, collapsed_into) -> list:
"""Blocks for one numeric column: evolution figure + STL + ACF + analysis."""
blocks = [model.Heading(text=model._safe_str(name), level=2)]
# --- Value-vs-time line + per-period row count (MUST-9.1). ---
drew_evolution = False
if collapsed_into is None: # skip the figure for collapsed OHLC duplicates.
t, v = _raw_series_for(raw, name)
if t is not None and resample_timeseries is not None:
try:
rs = resample_timeseries(t, v)
except Exception: # noqa: BLE001
rs = None
if _is_dict(rs) and rs.get("t"):
blocks.append(model.Figure(
make=_make_evolution_figure(name, rs),
caption=f"Evolución de «{name}» por periodo y nº de "
f"observaciones (conteo de filas)."))
drew_evolution = True
else:
blocks.append(model.Note(
f"Serie casi idéntica a «{collapsed_into}» (grupo OHLC): se omite el "
"gráfico para no repetirlo; el análisis estadístico se mantiene."))
if not drew_evolution and collapsed_into is None:
blocks.append(model.Note(
"Gráfico de evolución temporal no disponible: falta la serie cruda "
"(pásala en ctx['timeseries_raw'] = {time_col, t, series}). Se "
"muestra solo el análisis estadístico."))
# --- STL panels (MUST-9.2). ---
stl = sblock.get("stl") if _is_dict(sblock.get("stl")) else {}
if collapsed_into is None and stl:
stl_fig = _make_stl_figure(stl)
if stl_fig is not None:
blocks.append(model.Figure(
make=stl_fig,
caption=f"Descomposición STL de «{name}»: tendencia, componente "
f"estacional y resto."))
# --- ACF figure (autocorrelation structure). ---
acf = sblock.get("acf_pacf") if _is_dict(sblock.get("acf_pacf")) else {}
if collapsed_into is None and acf:
acf_fig = _make_acf_figure(acf)
if acf_fig is not None:
blocks.append(model.Figure(
make=acf_fig,
caption=f"Función de autocorrelación de «{name}»."))
# --- Textual analysis (always, even for collapsed duplicates). ---
analysis = _analysis_markdown(sblock)
if analysis:
blocks.append(model.Markdown(text=analysis))
return blocks
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_timeseries(profile: dict, ctx: dict):
"""Build the TIMESERIES Chapter, or ``None`` if the table has no date column.
Args:
profile: the ``eda`` group TableProfile dict.
ctx: presentation context; ``ctx['timeseries_raw']`` (optional) carries
the raw ordered series used to draw the value-vs-time line and the
per-period row count.
Returns:
A ``model.Chapter`` with, per numeric column, the value-vs-time evolution
+ row-count figure, the STL panels, the ACF figure and the statistical
analysis; or ``None`` when there is no temporal column (the chapter does
not apply).
"""
profile = profile or {}
if not _is_dict(profile):
profile = {}
ctx = ctx or {}
cols = profile.get("columns") or []
det = _detect(cols)
time_col = det.get("time_col")
if not time_col:
return None # no date/datetime column -> chapter does not apply.
numeric_cols = det.get("numeric_cols") or []
series_map = profile.get("series") if _is_dict(profile.get("series")) else {}
raw = ctx.get("timeseries_raw") or profile.get("timeseries_raw")
raw = raw if _is_dict(raw) else {}
# Which columns can the chapter say anything about: those with a series
# analysis block and/or a raw series to chart. Preserve the profile order.
chartable = []
for name in numeric_cols:
has_analysis = _is_dict(series_map.get(name))
has_raw, _ = _raw_series_for(raw, name)
if has_analysis or has_raw is not None:
chartable.append(name)
if not chartable:
# A date column exists but nothing numeric to chart/analyse: still a
# valid (small) chapter — show just the datetime header if we have it.
header = _datetime_header(time_col, raw)
if not header:
return None
intro = (
f"La tabla tiene una columna temporal («{time_col}») pero no hay "
"columnas numéricas con serie analizable.")
blocks = [model.Heading(text=CHAPTER_TITLE, level=1),
model.Markdown(text=intro)] + header
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
collapsed = _ohlc_groups(chartable, raw)
intro = (
"Este capítulo analiza la evolución de la tabla en el tiempo usando la "
f"columna de fecha «{time_col}». Para cada columna numérica se muestra su "
"**evolución por periodo** (valor agregado) junto al **número de filas por "
"periodo** (densidad de observaciones), su **descomposición STL** "
"(tendencia / estacionalidad / resto) y la **función de autocorrelación**; "
"debajo, el análisis de la serie: estacionariedad (ADF + KPSS), "
"autocorrelación (Ljung-Box) y, cuando procede, la transformación "
"sugerida (retornos o diferencias) para evitar correlaciones espurias.")
blocks = [model.Heading(text=CHAPTER_TITLE, level=1),
model.Markdown(text=intro)]
blocks += _datetime_header(time_col, raw)
if collapsed:
reps = sorted(set(collapsed.values()))
collapsed_names = ", ".join(sorted(collapsed.keys()))
blocks.append(model.Note(
f"Series OHLC casi idénticas detectadas ({collapsed_names}): se "
f"grafican consolidadas en «{', '.join(reps)}» para no repetir el "
"mismo gráfico; cada columna conserva su análisis estadístico."))
for name in chartable:
sblock = series_map.get(name) if _is_dict(series_map.get(name)) else {}
blocks += _column_section(name, sblock, raw, collapsed.get(name))
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -1,244 +0,0 @@
"""Tests for the TIMESERIES chapter — DoD: golden + edges + anti-cut.
Self-contained: builds synthetic ``series`` blocks (shaped like
``profile_table(run_series=True)`` output) and a raw ``timeseries_raw`` bundle,
with no DuckDB, so the suite is fast and deterministic. Verifies that the chapter:
- returns ``None`` when there is no date/datetime column (the user requirement);
- never raises on ``None``/empty/garbage input;
- with a date column + raw series emits, per numeric column, the value-vs-time +
row-count evolution figure, the STL panels, the ACF figure and the textual
analysis (stationarity / autocorrelation / suggested transform);
- collapses near-identical OHLC series into one chart while keeping every
column's analysis;
- renders without cutting anything in both PDF and PPTX (every column heading
survives in the rendered output).
"""
import math
import os
import re
import tempfile
from pypdf import PdfReader
from datascience.automatic_eda.chapters.timeseries import (
build_timeseries, CHAPTER_VERSION, _VERDICT_GLOSS,
)
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
# --------------------------------------------------------------------------- #
# Synthetic fixtures shaped like the real profile_table(run_series=True) output.
# --------------------------------------------------------------------------- #
def _dates(n: int) -> list:
"""n consecutive daily ISO date strings starting 2021-01-01."""
from datetime import date, timedelta
start = date(2021, 1, 1)
return [(start + timedelta(days=i)).isoformat() for i in range(n)]
def _series_block(n=120, verdict="non_stationary", autocorr=True, levels=True,
with_stl_values=True):
"""A synthetic ``series`` block like _build_series_block produces."""
trend = [float(i) for i in range(n)]
seasonal = [math.sin(i / 6.0) for i in range(n)]
resid = [0.1 * ((-1) ** i) for i in range(n)]
acf = [1.0] + [max(0.0, 0.9 - 0.05 * k) for k in range(1, 21)]
block = {
"order_col": "fecha",
"ordered": True,
"n": n,
"stationarity": {
"n": n, "verdict": verdict,
"adf": {"p_value": 0.42, "stationary": False},
"kpss": {"p_value": 0.01, "stationary": False},
"warning": ("serie no estacionaria: riesgo de correlación espuria"
if verdict != "stationary" else None),
},
"acf_pacf": {
"n": n, "nlags": 20, "acf": acf,
"significant_acf_lags": [1, 2, 3, 4, 5],
"ljung_box": {"stat": 123.4, "p_value": 0.0 if autocorr else 0.7,
"lags": 20},
"is_autocorrelated": autocorr,
},
"period_source": "datetime_freq",
"stl": {
"n": n, "period": 7, "period_inferred": False, "robust": False,
"trend": {"values": trend} if with_stl_values else {
"note": "serie larga: solo estadisticos", "mean": 60.0},
"seasonal": {"values": seasonal} if with_stl_values else {"mean": 0.0},
"resid": {"values": resid} if with_stl_values else {"mean": 0.0},
"trend_strength": 0.95, "seasonal_strength": 0.42,
},
}
if levels:
block["levels_suggested"] = True
block["levels_kind"] = "returns"
block["levels_reason"] = ("columna financiera no estacionaria: usar "
"retornos evita correlación espuria.")
block["to_returns"] = {"method": "log", "mean": 0.001, "std": 0.02}
else:
block["levels_suggested"] = False
return block
def _profile(numeric_names=("precio",), n=120, with_stl_values=True):
cols = [{"name": "fecha", "inferred_type": "datetime",
"semantic_type": "datetime_iso"}]
series_map = {}
for nm in numeric_names:
cols.append({"name": nm, "inferred_type": "numeric",
"numeric": {"min": 1.0, "max": 200.0, "mean": 100.0,
"median": 95.0, "std": 40.0}})
series_map[nm] = _series_block(n=n, with_stl_values=with_stl_values)
return {"table": "cotizaciones", "n_rows": n, "n_cols": len(cols),
"columns": cols, "series": series_map}
def _ctx_raw(numeric_names=("precio",), n=120):
t = _dates(n)
series = {}
for j, nm in enumerate(numeric_names):
series[nm] = [float(100 + i + 5 * j) for i in range(n)]
return {"timeseries_raw": {"time_col": "fecha", "t": t, "series": series}}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
# --------------------------------------------------------------------------- #
# Golden.
# --------------------------------------------------------------------------- #
def test_golden_estructura_y_figuras():
ch = build_timeseries(_profile(("precio",)), _ctx_raw(("precio",)))
assert ch is not None
assert ch.id == "timeseries"
assert ch.version == CHAPTER_VERSION
kinds = [b.kind for b in ch.blocks]
assert kinds[0] == "heading" # chapter title
assert kinds[1] == "markdown" # intro
assert "kv_table" in kinds # datetime profile header (MUST-9.3)
# Per column: evolution figure + STL figure + ACF figure + analysis markdown.
figs = [b for b in ch.blocks if b.kind == "figure"]
assert len(figs) >= 3, "evolución + STL + ACF esperadas"
# Lazy makers must produce real matplotlib figures.
import matplotlib.pyplot as plt
for f in figs:
fig = f.make()
assert fig is not None
plt.close(fig)
def test_golden_evolucion_tiene_dos_paneles_valor_y_conteo():
# MUST-9.1: the evolution figure has a value panel + a row-count panel.
ch = build_timeseries(_profile(("precio",)), _ctx_raw(("precio",)))
figs = [b for b in ch.blocks if b.kind == "figure"]
import matplotlib.pyplot as plt
fig = figs[0].make() # first figure is the evolution one.
assert len(fig.axes) == 2, "panel de valor + panel de conteo de filas"
plt.close(fig)
def test_golden_analisis_textual_presente():
ch = build_timeseries(_profile(("precio",)), _ctx_raw(("precio",)))
md = " ".join(b.text for b in ch.blocks if b.kind == "markdown")
assert "Estacionariedad" in md
assert "Autocorrelación" in md
assert "STL" in md
# Verdict gloss surfaced for the non-stationary preset.
assert _VERDICT_GLOSS["non_stationary"].split(":")[0] in md
# Levels/returns suggestion surfaced.
assert "retornos" in md.lower()
# --------------------------------------------------------------------------- #
# Edges.
# --------------------------------------------------------------------------- #
def test_edge_sin_columna_fecha_devuelve_none():
prof = {"columns": [
{"name": "precio", "inferred_type": "numeric", "numeric": {"mean": 1.0}},
{"name": "ciudad", "inferred_type": "categorical",
"categorical": {"top": []}},
], "series": {"precio": _series_block()}}
assert build_timeseries(prof, {}) is None
def test_edge_none_y_vacio_no_revienta():
assert build_timeseries(None, None) is None
assert build_timeseries({}, {}) is None
assert build_timeseries({"columns": []}, {}) is None
# Date column but nothing numeric/series and no raw -> None (nothing to say).
assert build_timeseries(
{"columns": [{"name": "fecha", "inferred_type": "datetime"}]}, {}) is None
def test_edge_sin_raw_degrada_pero_mantiene_analisis():
# No ctx['timeseries_raw']: the chapter must still build (STL/ACF/analysis
# from the profile) and note that the evolution chart is unavailable.
ch = build_timeseries(_profile(("precio",)), {})
assert ch is not None
notes = " ".join(b.text for b in ch.blocks if b.kind == "note")
assert "evolución temporal no disponible" in notes
md = " ".join(b.text for b in ch.blocks if b.kind == "markdown")
assert "Estacionariedad" in md
def test_edge_stl_solo_estadisticos_no_dibuja_panel_pero_no_revienta():
# Long series: STL carries only stats (no 'values') -> no STL figure, but the
# strengths still surface in the textual analysis.
ch = build_timeseries(_profile(("precio",), with_stl_values=False),
_ctx_raw(("precio",)))
assert ch is not None
md = " ".join(b.text for b in ch.blocks if b.kind == "markdown")
assert "STL" in md
# --------------------------------------------------------------------------- #
# OHLC consolidation (MUST-9.3).
# --------------------------------------------------------------------------- #
def test_ohlc_consolidacion():
names = ("Open", "High", "Low", "Close")
ch = build_timeseries(_profile(names), _ctx_raw(names))
assert ch is not None
notes = " ".join(b.text for b in ch.blocks if b.kind == "note")
assert "OHLC" in notes
# Only the representative draws the evolution figure; the other 3 are collapsed
# so there are fewer evolution figures than columns.
captions = [b.caption or "" for b in ch.blocks if b.kind == "figure"]
evo = [c for c in captions if "Evolución" in c]
assert len(evo) < len(names), "las series OHLC deben consolidarse"
# Every column still has its analysis markdown (one heading per column).
headings = [b.text for b in ch.blocks if b.kind == "heading" and b.level == 2]
for nm in names:
assert nm in headings
# --------------------------------------------------------------------------- #
# Anti-cut: PDF + PPTX.
# --------------------------------------------------------------------------- #
def test_anti_corte_pdf_y_pptx():
names = tuple(f"serie_{i}" for i in range(6))
prof = _profile(names, n=90)
ctx = _ctx_raw(names, n=90)
ch = build_timeseries(prof, ctx)
col_headings = [b.text for b in ch.blocks if b.kind == "heading" and b.level == 2]
assert len(col_headings) == 6
with tempfile.TemporaryDirectory() as d:
pdf = os.path.join(d, "ts.pdf")
res_pdf = render_automatic_eda_pdf(
prof, pdf, {"ctx": ctx, "write_manifest": False})
assert res_pdf["path"] == pdf
txt = _pdf_text(pdf)
for nm in col_headings:
assert nm in txt, f"columna '{nm}' cortada/ausente en el PDF"
pptx = os.path.join(d, "ts.pptx")
res_pptx = render_automatic_eda_pptx(
prof, pptx, {"ctx": ctx, "write_manifest": False})
assert res_pptx["path"] == pptx
assert res_pptx["n_slides"] >= 6
@@ -0,0 +1,68 @@
---
name: build_geo_scatter
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def build_geo_scatter(lats: list, lons: list, max_points: int = 2000) -> dict"
description: "Prepara los datos de un scatter geografico en proyeccion equirectangular para el grupo eda. Empareja lats/lons por indice, descarta pares None/NaN/inf/bool o fuera de rango (lat en [-90,90], lon en [-180,180]) y aplica downsampling DETERMINISTA por paso fijo (pairs[::step]) cuando hay mas pares validos que max_points, para no saturar el PDF/PPTX en moviles. Devuelve los puntos en orden [lon, lat] listos para ax.scatter, el bbox, el aspect 1/cos(centroid_lat) clampado a [0.3,5.0] y un pad sugerido (~5% del rango con suelo minimo). Lectura defensiva; NUNCA lanza ni dibuja: el capitulo se encarga de matplotlib."
tags: [eda, geospatial, datascience, scatter, map, downsample, equirectangular, profiling]
params:
- name: lats
desc: "Lista (o tupla) de latitudes en grados, paralela a lons. Se empareja por indice. Un valor None, NaN, infinito, bool o fuera de [-90,90] descarta ese par. Lectura defensiva."
- name: lons
desc: "Lista (o tupla) de longitudes en grados, paralela a lats. Un valor None, NaN, infinito, bool o fuera de [-180,180] descarta ese par."
- name: max_points
desc: "Tope de puntos a devolver (default 2000). Si los pares validos superan el tope, se hace downsampling determinista por paso fijo step=ceil(n_total/max_points) tomando pairs[::step] (NO aleatorio, reproducible). Un valor no entero o <=0 desactiva el downsampling."
output: "Dict listo para dibujar: {points: [[lon, lat], ...] en orden x=lon/y=lat para ax.scatter; n_total: pares validos antes del downsample (int); n_shown: puntos devueltos tras el downsample (int); downsampled: bool (n_shown<n_total); bbox: {lat_min, lat_max, lon_min, lon_max} o None si no hay puntos; aspect: 1/cos(centroid_lat) clampado a [0.3,5.0] para no estirar la proyeccion equirectangular; pad: {lon, lat} ~5% del rango respectivo con suelo minimo 0.01 grados}. Si no hay pares validos: points=[], n_total=0, n_shown=0, downsampled=False, bbox=None, aspect=1.0, pad={lon:0.0, lat:0.0}."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: true
tests: ["test_geo_scatter_nube_espana", "test_downsampling_determinista_y_reproducible", "test_listas_vacias_no_lanza", "test_un_solo_punto_pad_minimo_y_aspect_finito", "test_filtra_none_nan_y_fuera_de_rango", "test_latitud_alta_aspect_clamped"]
test_file_path: "python/functions/datascience/build_geo_scatter_test.py"
file_path: "python/functions/datascience/build_geo_scatter.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.build_geo_scatter import build_geo_scatter
# Nube de coordenadas (lat, lon) alrededor de Madrid:
lats = [40.0, 41.0, 39.0, 40.5]
lons = [-3.7, -3.0, -4.0, -3.5]
geo = build_geo_scatter(lats, lons, max_points=2000)
print(geo["points"][0]) # [-3.7, 40.0] -> orden [x=lon, y=lat]
print(geo["bbox"]) # {'lat_min': 39.0, 'lat_max': 41.0, 'lon_min': -4.0, 'lon_max': -3.0}
print(round(geo["aspect"], 3)) # 1.308 -> ensancha el eje x en latitudes medias
print(geo["pad"]) # {'lon': 0.05, 'lat': 0.1} -> margen ~5%
# El capitulo dibuja con matplotlib (esta funcion NO dibuja):
# xs = [p[0] for p in geo["points"]]; ys = [p[1] for p in geo["points"]]
# ax.scatter(xs, ys); ax.set_aspect(geo["aspect"])
# ax.set_xlim(geo["bbox"]["lon_min"] - geo["pad"]["lon"], geo["bbox"]["lon_max"] + geo["pad"]["lon"])
# ax.set_ylim(geo["bbox"]["lat_min"] - geo["pad"]["lat"], geo["bbox"]["lat_max"] + geo["pad"]["lat"])
```
## Cuando usarla
- Usala antes de dibujar un scatter geografico (mapa de puntos en proyeccion equirectangular) en el capitulo geospatial de `AutomaticEDA`: limpia los pares de coordenadas, los reduce a un tamano razonable para el PDF/PPTX y te da bbox, aspect y pad listos para fijar los ejes.
- Cuando tengas dos columnas de lat/lon ya extraidas y quieras un punto de entrada determinista (mismo dataset -> mismo dibujo) que no sature el documento en moviles.
- Cuando necesites el aspect correcto para que un grado de longitud no se vea estirado respecto a uno de latitud (integridad visual, Tufte) sin calcularlo a mano.
## Gotchas
- Funcion pura, sin I/O y determinista. NO dibuja: solo PREPARA los datos; el capitulo se encarga de matplotlib. Lectura defensiva: pares con None/NaN/inf/bool o coordenadas fuera de rango se descartan en silencio y NUNCA lanza.
- El downsampling es DETERMINISTA por paso fijo (`step = ceil(n_total / max_points)`, `pairs[::step]`), NO aleatorio: la misma entrada produce siempre la misma salida (reproducible en tests). El primer punto mostrado es siempre el primer par valido. No es un muestreo uniforme aleatorio — es un barrido regular del orden de entrada.
- `points` va en orden `[lon, lat]` (x, y), no `[lat, lon]`: pasalo directo a `ax.scatter(xs, ys)` sin invertir. Confundir el orden espeja el mapa.
- `aspect = 1/cos(centroid_lat)` se clampa a `[0.3, 5.0]`. En latitudes altas `cos -> 0` y el valor real explota: por encima de ~78 grados el aspect queda fijado en 5.0. Si el centroide cae justo en un polo (`+-90`) se usa el clamp en vez de dividir por cero.
- `pad` es ~5% del rango de cada eje con un suelo minimo de `0.01` grados: con un solo punto o todos iguales (rango 0) el pad cae al suelo para que el punto no quede en una linea. En el caso sin puntos validos el pad es `{lon:0.0, lat:0.0}` y `bbox` es `None`.
- `bbox`, `aspect` y `pad` se calculan sobre los puntos YA mostrados (tras el downsample), de modo que los ejes encajan exactamente con lo que se dibuja.
@@ -0,0 +1,153 @@
"""build_geo_scatter — prepare points for a geographic scatter (EDA `geospatial`).
Pure function: no I/O, deterministic. Takes two parallel lists of latitudes and
longitudes and returns the data a caller needs to draw a geographic scatter in an
equirectangular projection: cleaned points in [lon, lat] order, a bounding box, a
projection aspect ratio and a suggested axis padding.
It NEVER draws anything (no matplotlib) — the chapter that consumes this output is
responsible for the rendering. Reading is defensive throughout and the function
NEVER raises: malformed pairs (None, NaN, infinity or out-of-range coordinates)
are silently dropped and an empty/valid result is always returned.
To keep the rendered PDF/PPTX light on phones, when the number of valid pairs
exceeds `max_points` the points are down-sampled DETERMINISTICALLY by a fixed
step (`pairs[::step]`), never randomly, so the result is reproducible.
"""
import math
# Minimum axis padding (in degrees) so a single point or a zero-range cloud is
# never drawn glued to the axis border (it would collapse to a line).
_MIN_PAD = 0.01
# Aspect ratio clamp. 1/cos(lat) blows up near the poles; clamp keeps the render
# sane (Tufte: do not let the projection stretch the cloud out of proportion).
_ASPECT_MIN = 0.3
_ASPECT_MAX = 5.0
def _coord(value):
"""Coerce to a finite float defensively; return None for invalid coordinates.
bool is a subclass of int, but a real latitude/longitude is never a bool, so
True/False are treated as missing instead of coercing to 1.0/0.0. NaN and
+/-infinity are never valid coordinates either.
"""
if value is None or isinstance(value, bool):
return None
try:
coord = float(value)
except (TypeError, ValueError):
return None
if math.isnan(coord) or math.isinf(coord):
return None
return coord
def build_geo_scatter(lats: list, lons: list, max_points: int = 2000) -> dict:
"""Prepare the data for a geographic scatter in equirectangular projection.
Pairs `lats` and `lons` by index, drops invalid pairs, optionally
down-samples deterministically, and derives the geometry (bbox, aspect, pad)
a caller needs to draw the cloud. No raw rendering is performed.
Args:
lats: List (or tuple) of latitudes in degrees. Paired by index with
`lons`. A value that is None, NaN, infinite, bool or outside
[-90, 90] discards that pair. Read defensively.
lons: List (or tuple) of longitudes in degrees, parallel to `lats`. A
value outside [-180, 180] (or None/NaN/inf/bool) discards that pair.
max_points: Cap on the number of points returned. When the number of
valid pairs exceeds this cap, the points are down-sampled by a fixed
step `ceil(n_total / max_points)` taking `pairs[::step]` — DETERMINISTIC,
not random, so the output is reproducible. A non-positive or non-int
value disables down-sampling.
Returns:
Dict ready for a caller's ax.scatter:
{points: [[lon, lat], ...] (x=lon, y=lat order), n_total: valid pairs
before down-sampling, n_shown: points returned, downsampled: bool,
bbox: {lat_min, lat_max, lon_min, lon_max} or None, aspect: 1/cos(centroid
lat) clamped to [0.3, 5.0], pad: {lon, lat} ~5% of each range with a small
floor}. When there are no valid pairs returns points=[], n_total=0,
n_shown=0, downsampled=False, bbox=None, aspect=1.0, pad={lon:0.0, lat:0.0}.
"""
pairs = [] # each item is (lon, lat) — already in [x, y] order
if isinstance(lats, (list, tuple)) and isinstance(lons, (list, tuple)):
n = min(len(lats), len(lons))
for i in range(n):
lat = _coord(lats[i])
lon = _coord(lons[i])
if lat is None or lon is None:
continue
if lat < -90.0 or lat > 90.0:
continue
if lon < -180.0 or lon > 180.0:
continue
pairs.append((lon, lat))
n_total = len(pairs)
if n_total == 0:
return {
"points": [],
"n_total": 0,
"n_shown": 0,
"downsampled": False,
"bbox": None,
"aspect": 1.0,
"pad": {"lon": 0.0, "lat": 0.0},
}
# Deterministic down-sampling by a fixed step. Reproducible: same input ->
# same output, no randomness.
if (
isinstance(max_points, int)
and not isinstance(max_points, bool)
and max_points > 0
and n_total > max_points
):
step = math.ceil(n_total / max_points)
sampled = pairs[::step]
else:
sampled = pairs
points = [[lon, lat] for (lon, lat) in sampled]
n_shown = len(points)
downsampled = n_shown < n_total
lons_s = [p[0] for p in sampled]
lats_s = [p[1] for p in sampled]
lon_min, lon_max = min(lons_s), max(lons_s)
lat_min, lat_max = min(lats_s), max(lats_s)
bbox = {
"lat_min": lat_min,
"lat_max": lat_max,
"lon_min": lon_min,
"lon_max": lon_max,
}
# Aspect for an equirectangular projection: stretch the x axis by 1/cos(lat)
# at the cloud centroid so a degree of longitude reads at its real width.
centroid_lat = sum(lats_s) / len(lats_s)
cos_lat = math.cos(math.radians(centroid_lat))
if cos_lat < 1e-12: # centroid at (or numerically at) a pole
aspect = _ASPECT_MAX
else:
aspect = 1.0 / cos_lat
aspect = max(_ASPECT_MIN, min(_ASPECT_MAX, aspect))
# Padding ~5% of each range, with a small floor so a zero-range cloud (single
# point / all identical) still gets a non-zero margin.
pad_lon = max(0.05 * (lon_max - lon_min), _MIN_PAD)
pad_lat = max(0.05 * (lat_max - lat_min), _MIN_PAD)
return {
"points": points,
"n_total": n_total,
"n_shown": n_shown,
"downsampled": downsampled,
"bbox": bbox,
"aspect": aspect,
"pad": {"lon": pad_lon, "lat": pad_lat},
}
@@ -0,0 +1,140 @@
"""Tests para build_geo_scatter."""
import math
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from build_geo_scatter import build_geo_scatter
# Keys that a non-empty result dict must always contain.
_EXPECTED_KEYS = {
"points", "n_total", "n_shown", "downsampled", "bbox", "aspect", "pad",
}
def test_geo_scatter_nube_espana():
"""Golden: nube en Espana -> points en orden [lon, lat], bbox, aspect>1, pad 5%."""
# Cuatro puntos alrededor de Madrid (lat ~40, lon negativo).
lats = [40.0, 41.0, 39.0, 40.5]
lons = [-3.7, -3.0, -4.0, -3.5]
r = build_geo_scatter(lats, lons)
assert set(r.keys()) == _EXPECTED_KEYS
# points en orden [x=lon, y=lat]: primer elemento lon (negativo), segundo lat (~40).
assert r["points"] == [[-3.7, 40.0], [-3.0, 41.0], [-4.0, 39.0], [-3.5, 40.5]]
for lon, lat in r["points"]:
assert lon < 0.0 # longitudes de Espana son negativas
assert 36.0 < lat < 44.0 # latitudes peninsulares
# Sin downsampling: 4 < 2000.
assert r["n_total"] == 4
assert r["n_shown"] == 4
assert r["downsampled"] is False
# bbox correcto.
assert r["bbox"] == {
"lat_min": 39.0, "lat_max": 41.0,
"lon_min": -4.0, "lon_max": -3.0,
}
# aspect = 1/cos(centroid_lat); centroid = 40.125 -> ~1.31 > 1.
centroid_lat = (40.0 + 41.0 + 39.0 + 40.5) / 4.0
expected_aspect = 1.0 / math.cos(math.radians(centroid_lat))
assert r["aspect"] > 1.0
assert abs(r["aspect"] - expected_aspect) < 1e-9
assert abs(r["aspect"] - 1.305) < 0.02 # cos(40) ~ 0.77
# pad 5% del rango (lon_range=1.0 -> 0.05 ; lat_range=2.0 -> 0.1).
assert abs(r["pad"]["lon"] - 0.05) < 1e-9
assert abs(r["pad"]["lat"] - 0.10) < 1e-9
def test_downsampling_determinista_y_reproducible():
"""Golden: 5000 puntos, max_points=2000 -> n_shown<=2000, downsampled, reproducible."""
lats = [40.0 + (i % 100) * 0.01 for i in range(5000)]
lons = [-3.0 - (i % 100) * 0.01 for i in range(5000)]
r1 = build_geo_scatter(lats, lons, max_points=2000)
assert r1["n_total"] == 5000
assert r1["n_shown"] <= 2000
assert r1["downsampled"] is True
# step = ceil(5000/2000) = 3 -> len(pairs[::3]) = 1667.
assert r1["n_shown"] == 1667
# Determinista: dos llamadas con la misma entrada dan exactamente lo mismo.
r2 = build_geo_scatter(lats, lons, max_points=2000)
assert r1 == r2
assert r1["points"] == r2["points"]
# El primer punto del downsample es el primer par valido (step parte de 0).
assert r1["points"][0] == [lons[0], lats[0]]
def test_listas_vacias_no_lanza():
"""Edge: listas vacias / None -> points [] sin lanzar."""
r = build_geo_scatter([], [])
assert r["points"] == []
assert r["n_total"] == 0
assert r["n_shown"] == 0
assert r["downsampled"] is False
assert r["bbox"] is None
assert r["aspect"] == 1.0
assert r["pad"] == {"lon": 0.0, "lat": 0.0}
# None como entrada tampoco lanza.
assert build_geo_scatter(None, None)["points"] == []
assert build_geo_scatter([40.0], None)["n_total"] == 0
assert build_geo_scatter(None, [-3.0])["n_total"] == 0
def test_un_solo_punto_pad_minimo_y_aspect_finito():
"""Edge: un solo punto -> pad minimo no cero, bbox degenerado, aspect finito."""
r = build_geo_scatter([40.0], [-3.7])
assert r["n_total"] == 1
assert r["n_shown"] == 1
assert r["points"] == [[-3.7, 40.0]]
assert r["downsampled"] is False
assert r["bbox"] == {
"lat_min": 40.0, "lat_max": 40.0,
"lon_min": -3.7, "lon_max": -3.7,
}
# rango 0 -> pad cae al floor minimo (no cero).
assert r["pad"]["lon"] == 0.01
assert r["pad"]["lat"] == 0.01
# aspect finito y dentro del clamp.
assert math.isfinite(r["aspect"])
assert 0.3 <= r["aspect"] <= 5.0
def test_filtra_none_nan_y_fuera_de_rango():
"""Edge: pares con None/NaN/fuera de rango se descartan por indice."""
nan = float("nan")
inf = float("inf")
# i=0 i=1 i=2 i=3 i=4 i=5 i=6
lats = [40.0, None, nan, 200.0, 41.0, 39.0, inf]
lons = [-3.0, -3.5, -3.6, -3.7, 999.0, -4.0, -2.0]
r = build_geo_scatter(lats, lons)
# Validos solo i=0 (40,-3.0) e i=5 (39,-4.0):
# i=1 lat None, i=2 lat NaN, i=3 lat 200 fuera de rango,
# i=4 lon 999 fuera de rango, i=6 lat inf.
assert r["n_total"] == 2
assert r["points"] == [[-3.0, 40.0], [-4.0, 39.0]]
assert r["bbox"] == {
"lat_min": 39.0, "lat_max": 40.0,
"lon_min": -4.0, "lon_max": -3.0,
}
def test_latitud_alta_aspect_clamped():
"""Edge: latitudes ~85 -> aspect clamped <= 5.0."""
r = build_geo_scatter([85.0, 85.0, 84.0], [10.0, 11.0, 9.0])
# cos(~84.7) ~ 0.093 -> 1/0.093 ~ 10.7 -> clamp a 5.0.
assert r["aspect"] <= 5.0
assert r["aspect"] == 5.0
assert math.isfinite(r["aspect"])
@@ -0,0 +1,67 @@
---
name: detect_latlon_columns
id: detect_latlon_columns_py_datascience
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def detect_latlon_columns(columns: list, samples: dict | None = None) -> dict"
description: "Detecta un par (latitud, longitud) entre las columnas de un TableProfile del grupo eda combinando heuristica de nombre (latitude/longitude/lat/lon/lng + x/y debiles) con validacion de rango obligatoria (latitud en [-90,90], longitud en [-180,180]). Lee defensivamente con .get; NUNCA lanza. Usa el sub-bloque numeric.min/max o, si falta, la lista de samples opcional. Devuelve SIEMPRE un dict {lat_col, lon_col, confidence, reason}; si no hay par valido, las columnas van a None y confidence a 0.0."
tags: [eda, geospatial, profiling, latlon, coordinates, detection, datascience]
params:
- name: columns
desc: "Lista de dicts ColumnProfile (el campo `columns` de un TableProfile del grupo eda). Cada dict se lee con .get; solo `name` (str) es obligatorio. Se consultan `inferred_type` (p.ej. 'numeric') y el sub-dict `numeric` con `min`/`max` (floats) para validar el rango. Entradas no-dict o sin name se ignoran sin lanzar."
- name: samples
desc: "Opcional {nombre_columna: [valores...]} para validar el rango cuando una columna no trae numeric.min/max. Los valores nulos se ignoran; si algun valor no nulo no es numerico la columna no se considera coordenada. Si es None u omitido, solo se usa el bloque numeric."
output: "Dict SIEMPRE presente con la forma {lat_col: str|None, lon_col: str|None, confidence: float en [0,1], reason: str en espanol}. En exito, lat_col y lon_col nombran columnas distintas; confidence ~1.0 para par con nombre fuerte (latitude/longitude/lat/lon/lng) + rango valido y ~0.7 para par debil (x/y) + rango. En fallo, ambas columnas None, confidence 0.0 y reason explica por que (sin columnas, nombre sin match, rango fuera de bounds, falta uno de los dos ejes...)."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: true
tests: ["test_par_latitude_longitude_fuerte", "test_par_lat_lon_abreviado", "test_par_x_y_debil_con_rango_valido", "test_nombre_lat_lon_pero_rango_fuera_no_detecta", "test_par_fuerte_prevalece_sobre_debil", "test_entradas_vacias_o_invalidas_no_lanzan", "test_solo_latitud_sin_longitud_no_detecta", "test_deteccion_por_samples_cuando_falta_numeric", "test_samples_fuera_de_rango_descarta"]
test_file_path: "python/functions/datascience/detect_latlon_columns_test.py"
file_path: "python/functions/datascience/detect_latlon_columns.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.detect_latlon_columns import detect_latlon_columns
# Columnas tal y como vienen en profile['columns'] de un TableProfile del grupo eda:
columns = [
{"name": "id", "inferred_type": "numeric", "numeric": {"min": 1, "max": 9999}},
{"name": "latitude", "inferred_type": "numeric", "numeric": {"min": -45.0, "max": 45.0}},
{"name": "longitude", "inferred_type": "numeric", "numeric": {"min": -120.0, "max": 120.0}},
]
res = detect_latlon_columns(columns)
print(res["lat_col"], res["lon_col"], res["confidence"])
# latitude longitude 1.0
# Sin bloque numeric, validando el rango con samples:
cols2 = [{"name": "lat"}, {"name": "lon"}]
samples = {"lat": [10.5, 20.0, 30.25], "lon": [-40.0, 50.5, 60.0]}
print(detect_latlon_columns(cols2, samples)["lat_col"]) # lat
```
## Cuando usarla
- Usala al perfilar una tabla en `AutomaticEDA` para decidir si tiene geometria de puntos: cuando `detect_latlon_columns` devuelve un par con `confidence` alta, el capitulo geospatial puede dibujar un mapa, calcular un bounding box o proponer un cluster espacial.
- Antes de un analisis geoespacial (alpha shape, convex hull, joins por proximidad) para localizar automaticamente que columnas son la latitud y la longitud sin pedirlo al usuario.
- Cuando recibas un `TableProfile` del grupo `eda` y quieras enrutar columnas a sub-analisis por tipo semantico: este es el detector del par lat/lon, complementario a `infer_semantic_type`.
## Gotchas
- Funcion pura, sin I/O y determinista. Lectura defensiva con `.get`: NUNCA lanza. Cualquier input malformado (None, no-lista, entradas no-dict, claves ausentes) devuelve el dict de fallo con `lat_col`/`lon_col` en None y `confidence` 0.0.
- **El nombre solo no basta**: una columna `latitude` cuyo rango se sale de `[-90, 90]` se descarta (no es coordenada real). Igual para `longitude` fuera de `[-180, 180]`. La validacion de rango es obligatoria.
- El rango de latitud `[-90, 90]` es un subconjunto del de longitud `[-180, 180]`, por eso el nombre es necesario para desambiguar cual eje es cual; una columna numerica en `[-90, 90]` sin nombre que sugiera lat/lon no se detecta.
- Los nombres genericos `x`/`y` (y `x_coord`/`y_coord`) son candidatos **debiles**: solo forman par si el rango encaja y existe la otra mitad (un `x`/`lon` para la `y`, un `y`/`lat` para la `x`). Un `y` suelto sin pareja devuelve None.
- Requiere AMBOS ejes para considerar exito. Si solo encuentra latitud o solo longitud, devuelve el dict de fallo (no media coordenada).
- `samples` solo se consulta cuando falta `numeric.min`/`numeric.max`. Si una columna trae el bloque numeric, ese manda aunque pases samples para ella.
- El matching de nombre es por subcadena normalizada (se quitan `_`, `-` y espacios), asi que nombres como `plate` (contiene "lat") podrian marcarse como candidatos por nombre — pero solo pasarian si su rango cae en `[-90, 90]` y hay una longitud pareja, filtro que en la practica descarta los falsos positivos.
@@ -0,0 +1,198 @@
"""detect_latlon_columns — detect a (latitude, longitude) column pair in an EDA profile.
Pure function: no I/O, deterministic. Takes the `columns` list of a TableProfile
(group `eda`) and decides whether two of its columns form a geographic coordinate
pair (latitude + longitude), combining a name heuristic with a value-range check.
The detection is intentionally conservative: a name hint alone is never enough. A
column is only accepted as latitude/longitude if its numeric range fits inside the
valid coordinate bounds ([-90, 90] for latitude, [-180, 180] for longitude). When
the `numeric` sub-block is absent the optional `samples` argument is used instead.
Reading is fully defensive (.get throughout) and the function NEVER raises: any
malformed input (None, non-list, non-dict entries, missing keys) simply yields a
no-pair result {"lat_col": None, "lon_col": None, "confidence": 0.0, "reason": ...}.
"""
import re
# Collapse the separators a column name may use (snake_case, kebab-case, spaces)
# so that "y_coord", "y-coord" and "y coord" all normalize to the same token.
_SEP_RE = re.compile(r"[\s_\-]+")
# Name-match strengths: a strong, unambiguous coordinate name vs a weak generic
# axis name (x / y) that only counts when the range also fits and a partner exists.
_STRONG = 0.6
_WEAK = 0.3
_RANGE_BONUS = 0.4 # added once the mandatory range validation passes
def _normalize(name):
"""Lowercase a column name and strip separator chars (_, -, whitespace)."""
if not isinstance(name, str):
return ""
return _SEP_RE.sub("", name.strip().lower())
def _num(value):
"""Coerce to float defensively; return None for None/bool/non-numeric."""
# bool is a subclass of int; a coordinate value is never a real bool, so treat
# True/False as missing instead of silently coercing to 1.0/0.0.
if value is None or isinstance(value, bool):
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _lat_name_strength(nn):
"""Strength of a normalized name as a latitude candidate (0=no match)."""
if not nn:
return 0.0
# "lat", "latitude", "latitud" all contain the "lat" stem.
if "lat" in nn:
return _STRONG
# Weak generic axis name: only useful when paired with an x/lon partner.
if nn in ("y", "ycoord", "ycoordinate", "ycoordinates"):
return _WEAK
return 0.0
def _lon_name_strength(nn):
"""Strength of a normalized name as a longitude candidate (0=no match)."""
if not nn:
return 0.0
# "lon", "long", "longitude", "longitud" share the "lon" stem; "lng" is separate.
if "lon" in nn or "lng" in nn:
return _STRONG
if nn in ("x", "xcoord", "xcoordinate", "xcoordinates"):
return _WEAK
return 0.0
def _col_range(col, sample_values):
"""Return (min, max) floats for a column, or (None, None) if not numeric.
Prefers the `numeric` sub-block min/max (the output of describe_numeric); falls
back to the provided sample list. A column is only treated as numeric when both
extremes are derivable: from the numeric block, or from samples whose every
non-null value coerces to a number.
"""
if isinstance(col, dict):
numeric = col.get("numeric")
if isinstance(numeric, dict):
mn = _num(numeric.get("min"))
mx = _num(numeric.get("max"))
if mn is not None and mx is not None:
return mn, mx
# Fall back to samples when the numeric block is missing or incomplete.
if isinstance(sample_values, (list, tuple)):
non_null = [v for v in sample_values if v is not None]
if non_null:
coerced = [_num(v) for v in non_null]
# Any non-numeric sample means we cannot trust the column as numeric.
if all(c is not None for c in coerced):
return min(coerced), max(coerced)
return None, None
def _no_pair(reason):
"""Canonical empty result: no coordinate pair detected."""
return {"lat_col": None, "lon_col": None, "confidence": 0.0, "reason": reason}
def detect_latlon_columns(columns: list, samples: dict | None = None) -> dict:
"""Detect a (latitude, longitude) column pair from an eda TableProfile.
Combines a name heuristic (latitude/longitude/lat/lon/lng + weak x/y) with a
mandatory range validation: the chosen latitude must sit in [-90, 90] and the
longitude in [-180, 180]. A name hint whose range does not fit is discarded.
Both sides are required for success; if only one is found, no pair is returned.
Args:
columns: List of ColumnProfile dicts (the `columns` of a TableProfile).
Each dict is read defensively with .get; only `name` is required.
`numeric.min` / `numeric.max` (and optionally `inferred_type`) are used
for the range check when present.
samples: Optional {column_name: [values...]} used to validate the range
when a column lacks `numeric.min`/`numeric.max`. If None/omitted, only
the `numeric` sub-block is consulted.
Returns:
Always a dict {"lat_col": str|None, "lon_col": str|None,
"confidence": float, "reason": str}. On success lat_col and lon_col name
the detected pair (distinct columns) and confidence is in [0, 1]: a pair
validated by a strong name on both sides scores ~1.0, a weak x/y pair ~0.7.
On failure both columns are None and confidence is 0.0.
"""
if not isinstance(columns, (list, tuple)) or len(columns) == 0:
return _no_pair("sin columnas que inspeccionar")
sample_map = samples if isinstance(samples, dict) else {}
# (column_name, confidence) for each side. Confidence already includes the
# range bonus because membership in the list implies the range was validated.
lat_candidates = []
lon_candidates = []
for col in columns:
if not isinstance(col, dict):
continue
name = col.get("name")
if not isinstance(name, str) or not name:
continue
nn = _normalize(name)
lat_strength = _lat_name_strength(nn)
lon_strength = _lon_name_strength(nn)
if lat_strength == 0.0 and lon_strength == 0.0:
continue # name gives no coordinate hint; skip.
mn, mx = _col_range(col, sample_map.get(name))
is_numeric = mn is not None and mx is not None
if not is_numeric:
continue # range cannot be validated -> not a coordinate.
if lat_strength > 0.0 and mn >= -90.0 and mx <= 90.0:
lat_candidates.append((name, lat_strength + _RANGE_BONUS))
if lon_strength > 0.0 and mn >= -180.0 and mx <= 180.0:
lon_candidates.append((name, lon_strength + _RANGE_BONUS))
if not lat_candidates and not lon_candidates:
return _no_pair("ninguna columna sugiere latitud ni longitud por nombre+rango")
if not lat_candidates:
return _no_pair("no se encontro columna de latitud valida (nombre+rango en [-90,90])")
if not lon_candidates:
return _no_pair("no se encontro columna de longitud valida (nombre+rango en [-180,180])")
# Pick the distinct pair with the highest combined confidence. First match wins
# on ties to keep the result deterministic by input order.
best = None # (combined, lat_name, lon_name, lat_c, lon_c)
for lat_name, lat_c in lat_candidates:
for lon_name, lon_c in lon_candidates:
if lat_name == lon_name:
continue # a column cannot be both axes of the same pair.
combined = (lat_c + lon_c) / 2.0
if best is None or combined > best[0]:
best = (combined, lat_name, lon_name, lat_c, lon_c)
if best is None:
return _no_pair("solo una columna sirve para ambos ejes; no hay par lat/lon distinto")
combined, lat_name, lon_name, lat_c, lon_c = best
confidence = max(0.0, min(1.0, combined))
lat_label = "fuerte" if lat_c >= 0.9 else "debil"
lon_label = "fuerte" if lon_c >= 0.9 else "debil"
reason = (
f"par lat='{lat_name}' (nombre {lat_label}) / lon='{lon_name}' "
f"(nombre {lon_label}) con rango valido"
)
return {
"lat_col": lat_name,
"lon_col": lon_name,
"confidence": confidence,
"reason": reason,
}
@@ -0,0 +1,141 @@
"""Tests para detect_latlon_columns."""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from detect_latlon_columns import detect_latlon_columns
# Keys that every result dict (success or failure) must expose.
_EXPECTED_KEYS = {"lat_col", "lon_col", "confidence", "reason"}
def _col(name, mn=None, mx=None, inferred="numeric"):
"""Build a minimal ColumnProfile-like dict for the tests."""
col = {"name": name, "inferred_type": inferred}
if mn is not None or mx is not None:
col["numeric"] = {"min": mn, "max": mx}
return col
def test_par_latitude_longitude_fuerte():
"""Golden: nombres latitude/longitude con rango valido -> par con confianza alta."""
columns = [
_col("id", mn=1, mx=9999, inferred="numeric"),
_col("latitude", mn=-45.0, mx=45.0),
_col("longitude", mn=-120.0, mx=120.0),
]
res = detect_latlon_columns(columns)
assert set(res.keys()) == _EXPECTED_KEYS
assert res["lat_col"] == "latitude"
assert res["lon_col"] == "longitude"
# Nombre fuerte (0.6) + rango (0.4) en ambos lados -> 1.0.
assert abs(res["confidence"] - 1.0) < 1e-9
assert "rango valido" in res["reason"]
def test_par_lat_lon_abreviado():
"""Golden: nombres abreviados lat/lon tambien se detectan como fuertes."""
columns = [
_col("lat", mn=40.0, mx=43.0),
_col("lon", mn=-4.0, mx=-1.0),
_col("precio", mn=0.0, mx=500.0),
]
res = detect_latlon_columns(columns)
assert res["lat_col"] == "lat"
assert res["lon_col"] == "lon"
assert abs(res["confidence"] - 1.0) < 1e-9
def test_par_x_y_debil_con_rango_valido():
"""Edge: x/y genericos solo cuentan como par debil cuando el rango encaja."""
columns = [
_col("y_coord", mn=-10.0, mx=10.0), # debil latitud
_col("x_coord", mn=-150.0, mx=150.0), # debil longitud
]
res = detect_latlon_columns(columns)
assert res["lat_col"] == "y_coord"
assert res["lon_col"] == "x_coord"
# Nombre debil (0.3) + rango (0.4) -> 0.7 en ambos lados.
assert abs(res["confidence"] - 0.7) < 1e-9
def test_nombre_lat_lon_pero_rango_fuera_no_detecta():
"""Edge: nombre lat/lon con rango fuera de bounds -> NO es coordenada."""
columns = [
_col("latitude", mn=-200.0, mx=200.0), # fuera de [-90, 90]
_col("longitude", mn=-120.0, mx=120.0), # valido, pero sin par lat
]
res = detect_latlon_columns(columns)
assert res["lat_col"] is None
assert res["lon_col"] is None
assert res["confidence"] == 0.0
assert isinstance(res["reason"], str) and res["reason"]
def test_par_fuerte_prevalece_sobre_debil():
"""Edge: con candidatos fuertes y debiles, gana el par de mayor confianza."""
columns = [
_col("latitude", mn=-45.0, mx=45.0), # fuerte lat
_col("y", mn=-30.0, mx=30.0), # debil lat
_col("longitude", mn=-120.0, mx=120.0), # fuerte lon
_col("x", mn=-100.0, mx=100.0), # debil lon
]
res = detect_latlon_columns(columns)
assert res["lat_col"] == "latitude"
assert res["lon_col"] == "longitude"
assert abs(res["confidence"] - 1.0) < 1e-9
def test_entradas_vacias_o_invalidas_no_lanzan():
"""Edge: sin columnas / vacio / no-lista / entradas no-dict -> dict None sin lanzar."""
for bad in ([], None, "no soy lista", 42, [1, 2, 3], [{}], [{"foo": "bar"}]):
res = detect_latlon_columns(bad)
assert set(res.keys()) == _EXPECTED_KEYS
assert res["lat_col"] is None
assert res["lon_col"] is None
assert res["confidence"] == 0.0
assert isinstance(res["reason"], str)
def test_solo_latitud_sin_longitud_no_detecta():
"""Edge: solo hay latitud valida, falta la longitud -> sin par."""
columns = [
_col("latitude", mn=-45.0, mx=45.0),
_col("temperatura", mn=-5.0, mx=40.0),
]
res = detect_latlon_columns(columns)
assert res["lat_col"] is None
assert res["lon_col"] is None
assert res["confidence"] == 0.0
def test_deteccion_por_samples_cuando_falta_numeric():
"""Edge: sin bloque numeric, el rango se valida con samples."""
columns = [
{"name": "lat"}, # sin numeric ni inferred_type
{"name": "lon"},
]
samples = {
"lat": [10.5, 20.0, None, 30.25], # todos dentro de [-90, 90]
"lon": [-40.0, 50.5, 60.0], # todos dentro de [-180, 180]
}
res = detect_latlon_columns(columns, samples)
assert res["lat_col"] == "lat"
assert res["lon_col"] == "lon"
assert abs(res["confidence"] - 1.0) < 1e-9
def test_samples_fuera_de_rango_descarta():
"""Edge: samples fuera de bounds invalidan la columna pese al nombre fuerte."""
columns = [{"name": "lat"}, {"name": "lon"}]
samples = {
"lat": [10.0, 95.0], # 95 > 90 -> latitud invalida
"lon": [-40.0, 50.0],
}
res = detect_latlon_columns(columns, samples)
assert res["lat_col"] is None
assert res["lon_col"] is None
assert res["confidence"] == 0.0
@@ -1,68 +0,0 @@
---
name: detect_time_column
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def detect_time_column(columns: list) -> dict"
description: "Detecta, a partir de la lista de ColumnProfile de un TableProfile del grupo eda, cual es la columna de orden temporal y que columnas numericas hay para graficar una serie en el tiempo. Una columna es temporal si inferred_type=='datetime' o semantic_type in {datetime_iso, date_eu}; time_col es la primera temporal en orden. Es la pieza que usa el capitulo TIMESERIES del AutomaticEDA para decidir si aplica. Lectura defensiva dict-no-throw: nunca lanza, siempre devuelve las mismas claves."
tags: [eda, timeseries, datetime, profiling, column-detection, automatic-eda, datascience, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
params:
- name: columns
desc: "lista de ColumnProfile dict de un TableProfile del grupo eda. Cada elemento suele tener name, inferred_type, semantic_type y numeric. Elementos que no sean dict se ignoran; None/no-lista/vacia -> dict 'no aplica'."
output: "dict SIEMPRE con: time_col (str|None, columna temporal elegida = primera temporal), time_semantic (str, semantic_type de la temporal o ''), numeric_cols (list[str], columnas con inferred_type=='numeric' en orden), n_datetime_cols (int), datetime_cols (list[str], todas las temporales en orden de aparicion), reason (str en espanol explicando la eleccion). Nunca lanza excepcion."
tested: true
tests: ["test_golden_datetime_y_numericas", "test_deteccion_por_semantic_type_date_eu", "test_sin_columna_temporal", "test_columns_none_no_revienta", "test_columns_vacia_no_revienta", "test_columns_no_lista_no_revienta", "test_elementos_basura_se_ignoran", "test_varias_datetime_elige_la_primera"]
test_file_path: "python/functions/datascience/detect_time_column_test.py"
file_path: "python/functions/datascience/detect_time_column.py"
---
## Ejemplo
```python
from datascience import detect_time_column
columns = [
{"name": "fecha", "inferred_type": "datetime", "semantic_type": "datetime_iso"},
{"name": "ventas", "inferred_type": "numeric"},
{"name": "unidades", "inferred_type": "numeric"},
{"name": "region", "inferred_type": "text"},
]
res = detect_time_column(columns)
res["time_col"] # -> "fecha"
res["numeric_cols"] # -> ["ventas", "unidades"]
res["n_datetime_cols"] # -> 1
# Sin columna temporal: el capitulo TIMESERIES no aplica.
detect_time_column([{"name": "id", "inferred_type": "numeric"}])["time_col"] # -> None
```
## Cuando usarla
Cuando el capitulo TIMESERIES del AutomaticEDA recibe un TableProfile y necesita
decidir si la tabla admite analisis de serie temporal: si `time_col` es None no
hay eje de tiempo y el capitulo se salta; si hay `time_col` y `numeric_cols`,
úsalas como eje X (orden cronologico) y series Y. Tambien sirve para enrutar el
resto del pipeline (acf_pacf / stl_decompose / adf_kpss_stationarity) sobre las
columnas numericas detectadas.
## Gotchas
- Es pura y stdlib-only (sin numpy ni DuckDB): segura de llamar en cualquier paso.
- `time_col` se elige por ORDEN de aparicion en la lista, no por "mejor candidata".
Si hay varias columnas datetime y quieres otra, filtra `datetime_cols` tu mismo.
- Solo mira metadatos del perfil (`inferred_type`/`semantic_type`); no parsea ni
valida los valores reales de la columna. La calidad de la deteccion depende de
que el profiler (summarize_table_duckdb / infer_semantic_type) haya inferido bien.
- Las claves del semantic_type son exactamente las del profiler: `datetime_iso`
(ISO 8601) y `date_eu` (DD/MM/AAAA). Otros formatos de fecha no se detectan por
semantic_type salvo que `inferred_type` ya sea `"datetime"`.
- `numeric_cols` se basa en `inferred_type == "numeric"` (no en "integer"/"float");
si tu profiler usa otra etiqueta, normalizala antes.
@@ -1,112 +0,0 @@
"""Detecta la columna temporal y las columnas numericas de un TableProfile (grupo eda).
Funcion pura y determinista: a partir de la lista de columnas de un TableProfile
producido por el grupo de capacidad `eda` (cada elemento es un ColumnProfile dict),
decide cual es la columna de orden temporal y que columnas numericas hay disponibles
para graficar una serie en el tiempo. Es la pieza que usa el capitulo TIMESERIES del
AutomaticEDA para decidir si la tabla admite analisis de serie temporal.
Lectura 100% defensiva al estilo "dict-no-throw" del grupo eda: nunca lanza
excepcion, siempre devuelve el mismo conjunto de claves.
"""
# semantic_type que el profiler (infer_semantic_type) emite para fechas/datetimes.
_DATETIME_SEMANTICS = ("datetime_iso", "date_eu")
def detect_time_column(columns: list) -> dict:
"""Detecta la columna temporal y las numericas de una lista de ColumnProfile.
Recorre los ColumnProfile de un TableProfile y clasifica cada columna como
temporal o numerica leyendo de forma defensiva sus claves. Una columna es
temporal si su ``inferred_type == "datetime"`` o si su ``semantic_type`` esta
en {``"datetime_iso"``, ``"date_eu"``}. La columna temporal elegida
(``time_col``) es la PRIMERA temporal en el orden de la lista. Las numericas
(``numeric_cols``) son las de ``inferred_type == "numeric"``, en orden.
Funcion pura: no hace I/O, no muta el input, es determinista.
Args:
columns: lista de ColumnProfile dict del grupo eda. Cada elemento suele
tener claves como ``name``, ``inferred_type``, ``semantic_type`` y
``numeric``. Los elementos que no sean dict se ignoran. Si ``columns``
es None, no es lista o esta vacia, se devuelve el dict "no aplica".
Returns:
Siempre un dict con las mismas claves::
{
"time_col": str | None, # columna temporal elegida (None si no hay)
"time_semantic": str, # semantic_type de la temporal ("" si no aplica)
"numeric_cols": [str, ...], # columnas con inferred_type == "numeric"
"n_datetime_cols": int, # nº de columnas temporales detectadas
"datetime_cols": [str, ...],# todas las temporales, en orden de aparicion
"reason": str, # frase corta (en espanol) que explica la eleccion
}
"""
# Caso "no aplica": entrada invalida o vacia.
if not isinstance(columns, list) or not columns:
return {
"time_col": None,
"time_semantic": "",
"numeric_cols": [],
"n_datetime_cols": 0,
"datetime_cols": [],
"reason": "no se detecto columna de fecha/datetime",
}
datetime_cols: list[str] = []
datetime_semantics: list[str] = []
numeric_cols: list[str] = []
for col in columns:
# Ignora elementos que no sean dict sin fallar.
if not isinstance(col, dict):
continue
name = col.get("name")
if name is None:
name = ""
else:
name = str(name)
inferred_type = col.get("inferred_type") or ""
semantic_type = col.get("semantic_type") or ""
is_datetime = inferred_type == "datetime" or semantic_type in _DATETIME_SEMANTICS
if is_datetime:
datetime_cols.append(name)
datetime_semantics.append(semantic_type)
if inferred_type == "numeric":
numeric_cols.append(name)
if not datetime_cols:
return {
"time_col": None,
"time_semantic": "",
"numeric_cols": numeric_cols,
"n_datetime_cols": 0,
"datetime_cols": [],
"reason": "no se detecto columna de fecha/datetime",
}
time_col = datetime_cols[0]
time_semantic = datetime_semantics[0]
if len(datetime_cols) == 1:
reason = f"columna temporal '{time_col}' detectada"
else:
reason = (
f"{len(datetime_cols)} columnas temporales; se elige la primera "
f"'{time_col}'"
)
return {
"time_col": time_col,
"time_semantic": time_semantic,
"numeric_cols": numeric_cols,
"n_datetime_cols": len(datetime_cols),
"datetime_cols": datetime_cols,
"reason": reason,
}
@@ -1,102 +0,0 @@
"""Tests para detect_time_column (grupo eda). Self-contained, sin DuckDB."""
from detect_time_column import detect_time_column
def test_golden_datetime_y_numericas():
columns = [
{"name": "fecha", "inferred_type": "datetime", "semantic_type": "datetime_iso"},
{"name": "ventas", "inferred_type": "numeric"},
{"name": "unidades", "inferred_type": "numeric"},
{"name": "region", "inferred_type": "text"},
]
res = detect_time_column(columns)
assert res["time_col"] == "fecha"
assert res["time_semantic"] == "datetime_iso"
assert res["numeric_cols"] == ["ventas", "unidades"]
assert res["n_datetime_cols"] == 1
assert res["datetime_cols"] == ["fecha"]
assert isinstance(res["reason"], str) and res["reason"]
def test_deteccion_por_semantic_type_date_eu():
# inferred_type no es datetime, pero semantic_type date_eu => temporal.
columns = [
{"name": "id", "inferred_type": "numeric"},
{"name": "dia", "inferred_type": "text", "semantic_type": "date_eu"},
{"name": "importe", "inferred_type": "numeric"},
]
res = detect_time_column(columns)
assert res["time_col"] == "dia"
assert res["time_semantic"] == "date_eu"
assert res["numeric_cols"] == ["id", "importe"]
assert res["n_datetime_cols"] == 1
assert res["datetime_cols"] == ["dia"]
def test_sin_columna_temporal():
columns = [
{"name": "id", "inferred_type": "numeric"},
{"name": "nombre", "inferred_type": "text"},
{"name": "activo", "inferred_type": "boolean"},
]
res = detect_time_column(columns)
assert res["time_col"] is None
assert res["time_semantic"] == ""
assert res["numeric_cols"] == ["id"]
assert res["n_datetime_cols"] == 0
assert res["datetime_cols"] == []
assert res["reason"] == "no se detecto columna de fecha/datetime"
def test_columns_none_no_revienta():
res = detect_time_column(None)
assert res["time_col"] is None
assert res["time_semantic"] == ""
assert res["numeric_cols"] == []
assert res["n_datetime_cols"] == 0
assert res["datetime_cols"] == []
assert res["reason"] == "no se detecto columna de fecha/datetime"
def test_columns_vacia_no_revienta():
res = detect_time_column([])
assert res["time_col"] is None
assert res["numeric_cols"] == []
assert res["n_datetime_cols"] == 0
def test_columns_no_lista_no_revienta():
# Un dict (no lista) tambien debe caer en el caso "no aplica".
res = detect_time_column({"name": "fecha", "inferred_type": "datetime"})
assert res["time_col"] is None
assert res["numeric_cols"] == []
def test_elementos_basura_se_ignoran():
columns = [
None,
"no soy un dict",
42,
{"name": "ts", "inferred_type": "datetime"},
{"name": "valor", "inferred_type": "numeric"},
]
res = detect_time_column(columns)
assert res["time_col"] == "ts"
assert res["numeric_cols"] == ["valor"]
assert res["n_datetime_cols"] == 1
def test_varias_datetime_elige_la_primera():
columns = [
{"name": "created_at", "inferred_type": "datetime", "semantic_type": "datetime_iso"},
{"name": "metric", "inferred_type": "numeric"},
{"name": "updated_at", "inferred_type": "datetime", "semantic_type": "datetime_iso"},
{"name": "fecha_baja", "inferred_type": "text", "semantic_type": "date_eu"},
]
res = detect_time_column(columns)
assert res["time_col"] == "created_at"
assert res["time_semantic"] == "datetime_iso"
assert res["n_datetime_cols"] == 3
assert res["datetime_cols"] == ["created_at", "updated_at", "fecha_baja"]
assert res["numeric_cols"] == ["metric"]
@@ -1,92 +0,0 @@
---
name: extract_timeseries_raw
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def extract_timeseries_raw(query_fn, table: str, time_col: str, value_cols: list, max_rows: int = 5000) -> dict"
description: "Extrae la serie temporal CRUDA (fechas + una o varias columnas numericas) de una tabla, ordenada cronologicamente, para alimentar el render del capitulo TIMESERIES de AutomaticEDA (linea valor-vs-tiempo + conteo por periodo). Recibe un lector read-only inyectado `query_fn(sql) -> dict` (mismo contrato que duckdb_query_readonly / pg_query / el `_q` de profile_table) y NO abre ninguna conexion por su cuenta. Construye UNA sola query con identificadores escapados, ORDER BY por la columna temporal y LIMIT. Devuelve dict dict-no-throw: t (fechas ISO string), series (lista paralela float|None por columna) y n. El capitulo no toca la BD: recibe esto en ctx['timeseries_raw']. Reutilizable tambien por profile_table en una fase futura."
tags: [eda, timeseries, datascience, automatic-eda, extraction, read-only, duckdb, postgres, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [datetime]
params:
- name: query_fn
desc: "callable lector read-only del backend activo. Recibe un string SQL y devuelve un dict {'status':'ok','rows':[{col:val,...},...]} (mismo contrato que duckdb_query_readonly o el `_q` de profile_table). NO se abre ninguna conexion dentro de la funcion: toda la lectura pasa por query_fn. Si es None -> error."
- name: table
desc: "nombre de la tabla de la que extraer la serie. Se escapa con comillas dobles en la query."
- name: time_col
desc: "nombre de la columna de orden temporal. Se usa en ORDER BY (cronologico ascendente) y se filtra IS NOT NULL. Sus valores se devuelven en `t` como string ISO."
- name: value_cols
desc: "lista de nombres de columnas numericas a extraer. Cada una produce una entrada en `series` con una lista paralela a `t`. Vacia o None -> status error."
- name: max_rows
desc: "limite de filas a leer (clausula LIMIT). Default 5000. Protege el render frente a tablas enormes."
output: "dict (nunca lanza). En exito: {'status':'ok','time_col':str,'t':[str,...] (fechas ISO en orden),'series':{col:[float|None,...],...} (paralela a t por value_col, None si el valor no es convertible a float),'n':int}. En error (sin lanzar): {'status':'error','error':str,'time_col':str,'t':[],'series':{},'n':0}. Errores: query_fn None, value_cols vacia, table/time_col vacios, o query_fn devuelve status!='ok' (se propaga su error)."
tested: true
tests: ["test_golden_t_y_series_alineadas", "test_valor_no_convertible_da_none", "test_value_cols_vacia_status_error", "test_query_fn_status_error_propaga", "test_query_fn_none_da_error_sin_reventar", "test_sql_contiene_order_by_y_limit"]
test_file_path: "python/functions/datascience/extract_timeseries_raw_test.py"
file_path: "python/functions/datascience/extract_timeseries_raw.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience import extract_timeseries_raw
from infra import duckdb_query_readonly
# El lector read-only se inyecta como closure (igual que el `_q` de profile_table).
db = "data/ventas.duckdb"
def _q(sql):
return duckdb_query_readonly(db, sql)
res = extract_timeseries_raw(_q, "ventas_diarias", "fecha", ["importe", "unidades"])
# res == {
# "status": "ok",
# "time_col": "fecha",
# "t": ["2024-01-01", "2024-01-02", ...],
# "series": {"importe": [1234.5, 980.0, ...], "unidades": [12.0, 9.0, ...]},
# "n": 365,
# }
# Se entrega al capitulo TIMESERIES sin que este toque la BD:
ctx = {"timeseries_raw": res}
```
## Cuando usarla
Cuando el capitulo TIMESERIES de AutomaticEDA necesita pintar una serie
valor-vs-tiempo (o conteo por periodo) y NO debe abrir la base de datos por su
cuenta: extraes aqui las fechas + columnas numericas ordenadas y se las pasas en
`ctx['timeseries_raw']`. Usala tambien siempre que quieras la secuencia cruda
ordenada cronologicamente de una o varias columnas para alimentar otros
contrastes de serie (ADF/KPSS, ACF/PACF, STL) reutilizando un unico lector
read-only inyectado, en vez de hacer N muestreos a mano.
## Gotchas
- **Impura**: lee de la base de datos a traves de `query_fn`. No abre conexiones
por su cuenta — depende por completo del lector inyectado. Sigue el estilo
dict-no-throw del grupo `eda`: nunca lanza; ante cualquier fallo devuelve
`{"status":"error","error":...}` con `t=[]`, `series={}`, `n=0`.
- **`error_type` en el frontmatter es `error_go_core` por convencion del registry**
(toda funcion impura debe declararlo y el indexer lo exige), pero el codigo
NO lanza esa excepcion: degrada al dict de error. Es metadata, no comportamiento.
- **No loguear los datos crudos**: `t`/`series` pueden contener datos sensibles
(igual que un HAR). No volcar el dict completo a logs ni a telemetria; en
trazas usa solo `n` y los nombres de columna.
- **Alineacion por fila**: `series[col][i]` corresponde a `t[i]`. Un valor no
convertible a float se guarda como `None` (no se descarta la fila) para no
romper la alineacion temporal.
- **Orden**: el orden cronologico depende del `ORDER BY "time_col"` del backend.
Si `time_col` esta guardada como texto con formato no lexicograficamente
ordenable (p.ej. `DD/MM/YYYY`), el orden no sera el real — normaliza la columna
a date/timestamp antes, o pasa una columna ya ordenable.
- **`max_rows`**: con LIMIT, si la tabla supera `max_rows` obtienes solo el primer
tramo cronologico, no un muestreo uniforme. Sube `max_rows` si necesitas el rango
completo.
@@ -1,122 +0,0 @@
"""extract_timeseries_raw — extrae la serie temporal CRUDA de una tabla.
Lector read-only inyectado: recibe `query_fn(sql) -> dict` con el mismo contrato
que duckdb_query_readonly / pg_query (y que el `_q` de profile_table):
`{"status": "ok", "rows": [{col: val, ...}, ...]}`. Esta funcion NO abre ninguna
conexion por su cuenta — solo usa `query_fn`. Construye UNA sola query ordenada
por la columna temporal y devuelve las fechas (`t`) mas cada columna numerica en
listas paralelas (`series`), listas para alimentar el render del capitulo
TIMESERIES de AutomaticEDA (linea valor-vs-tiempo + conteo por periodo) sin que
el capitulo toque la base de datos: recibe esto en `ctx['timeseries_raw']`.
Estilo dict-no-throw del grupo `eda`: nunca lanza; captura cualquier excepcion y
degrada a `{"status": "error", "error": str, ...}`.
"""
from datetime import date, datetime
def _to_float(value):
"""Convierte un valor a float de forma defensiva. None si no es convertible."""
if value is None:
return None
if isinstance(value, bool):
# Un bool es subclase de int en Python; no es un valor de serie valido.
return None
if isinstance(value, (int, float)):
return float(value)
s = str(value).strip()
if not s:
return None
try:
return float(s)
except (TypeError, ValueError):
return None
def _to_iso(value):
"""Convierte un valor temporal a string ISO conservando el orden de la query.
date/datetime -> isoformat(); cualquier otro valor (string, etc.) -> str().
None se preserva como None.
"""
if value is None:
return None
if isinstance(value, (datetime, date)):
return value.isoformat()
return str(value)
def extract_timeseries_raw(query_fn, table, time_col, value_cols, max_rows=5000):
"""Extrae la serie temporal cruda (fechas + columnas numericas) de una tabla.
Args:
query_fn: callable lector read-only del backend activo. Recibe un string
SQL y devuelve un dict {"status": "ok", "rows": [{col: val, ...}]}
(mismo contrato que duckdb_query_readonly / el `_q` de profile_table).
No se abre ninguna conexion aqui: toda la lectura pasa por query_fn.
table: nombre de la tabla.
time_col: nombre de la columna de orden temporal.
value_cols: lista de nombres de columnas numericas a extraer.
max_rows: limite de filas (LIMIT). Default 5000.
Returns:
dict (nunca lanza):
{
"status": "ok" | "error",
"error": str, # solo si status == "error"
"time_col": str,
"t": [str, ...], # time_col como ISO string, en orden
"series": {col: [float|None, ...], ...}, # paralela a t por columna
"n": int # nº de filas devueltas
}
"""
base = {"status": "ok", "time_col": time_col, "t": [], "series": {}, "n": 0}
try:
if query_fn is None:
return {**base, "status": "error", "error": "query_fn es None"}
if not value_cols:
return {**base, "status": "error", "error": "value_cols vacío"}
if not table or not time_col:
return {
**base,
"status": "error",
"error": "table y time_col son obligatorios",
}
# Identificadores escapados con comillas dobles (como hace profile_table)
# para tolerar nombres con mayusculas/espacios/palabras reservadas.
cols_sql = ", ".join(f'"{c}"' for c in value_cols)
sql = (
f'SELECT "{time_col}", {cols_sql} FROM "{table}" '
f'WHERE "{time_col}" IS NOT NULL '
f'ORDER BY "{time_col}" '
f"LIMIT {int(max_rows)}"
)
q = query_fn(sql)
if not isinstance(q, dict) or q.get("status") != "ok":
err = (
q.get("error", "query_fn fallo")
if isinstance(q, dict)
else "query_fn no devolvio un dict"
)
return {**base, "status": "error", "error": err}
rows = q.get("rows", []) or []
t = []
series = {c: [] for c in value_cols}
for row in rows:
t.append(_to_iso(row.get(time_col)))
for c in value_cols:
series[c].append(_to_float(row.get(c)))
return {
"status": "ok",
"time_col": time_col,
"t": t,
"series": series,
"n": len(t),
}
except Exception as e: # noqa: BLE001 - dict-no-throw: degradar, nunca lanzar
return {**base, "status": "error", "error": str(e)}
@@ -1,109 +0,0 @@
"""Tests para extract_timeseries_raw.
No usa DuckDB real: inyecta un query_fn FAKE (closure) que devuelve filas
predefinidas y, opcionalmente, captura el SQL recibido para verificar la query
generada (ORDER BY por la columna temporal + LIMIT). Asi el test es
autocontenido y no depende de ningun backend.
"""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from extract_timeseries_raw import extract_timeseries_raw
def _fake_query(rows, captured=None, status="ok", error=None):
"""Crea un query_fn FAKE.
`captured` (lista opcional) recibe el SQL ejecutado para poder inspeccionarlo.
`status`/`error` permiten simular un fallo del backend.
"""
def _q(sql):
if captured is not None:
captured.append(sql)
if status != "ok":
return {"status": "error", "error": error or "boom"}
return {"status": "ok", "rows": rows}
return _q
def test_golden_t_y_series_alineadas():
"""Golden: t y series alineadas, floats convertidos, n correcto."""
rows = [
{"fecha": "2024-01-01", "ventas": "10", "stock": 5},
{"fecha": "2024-01-02", "ventas": "20.5", "stock": 7},
{"fecha": "2024-01-03", "ventas": 30, "stock": 9},
]
res = extract_timeseries_raw(_fake_query(rows), "t", "fecha", ["ventas", "stock"])
assert res["status"] == "ok"
assert res["n"] == 3
assert res["time_col"] == "fecha"
assert res["t"] == ["2024-01-01", "2024-01-02", "2024-01-03"]
assert res["series"]["ventas"] == [10.0, 20.5, 30.0]
assert res["series"]["stock"] == [5.0, 7.0, 9.0]
def test_valor_no_convertible_da_none():
"""Valor no convertible a float -> None en la serie (alineacion preservada)."""
rows = [
{"fecha": "2024-01-01", "ventas": "abc"},
{"fecha": "2024-01-02", "ventas": None},
{"fecha": "2024-01-03", "ventas": "12.5"},
]
res = extract_timeseries_raw(_fake_query(rows), "t", "fecha", ["ventas"])
assert res["status"] == "ok"
assert res["series"]["ventas"] == [None, None, 12.5]
assert res["n"] == 3
def test_value_cols_vacia_status_error():
"""value_cols vacia -> status error con t/series/n vacios."""
res = extract_timeseries_raw(_fake_query([]), "t", "fecha", [])
assert res["status"] == "error"
assert "value_cols" in res["error"]
assert res["t"] == []
assert res["series"] == {}
assert res["n"] == 0
def test_query_fn_status_error_propaga():
"""query_fn que devuelve status != ok -> se propaga como error."""
res = extract_timeseries_raw(
_fake_query([], status="error", error="db locked"),
"t",
"fecha",
["ventas"],
)
assert res["status"] == "error"
assert "db locked" in res["error"]
assert res["n"] == 0
def test_query_fn_none_da_error_sin_reventar():
"""query_fn None -> error degradado, sin excepcion."""
res = extract_timeseries_raw(None, "t", "fecha", ["ventas"])
assert res["status"] == "error"
assert res["t"] == []
assert res["n"] == 0
def test_sql_contiene_order_by_y_limit():
"""La query generada ordena por time_col y aplica el LIMIT sobre la tabla."""
captured = []
rows = [{"fecha": "2024-01-01", "ventas": 1}]
extract_timeseries_raw(
_fake_query(rows, captured),
"ventas_tbl",
"fecha",
["ventas"],
max_rows=123,
)
assert len(captured) == 1
sql = captured[0]
assert 'ORDER BY "fecha"' in sql
assert "LIMIT 123" in sql
assert 'FROM "ventas_tbl"' in sql
@@ -1,79 +0,0 @@
---
name: profile_datetime
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def profile_datetime(values: list) -> dict"
description: "Perfil minimo de una columna fecha/datetime para la cabecera del capitulo TIMESERIES de AutomaticEDA. Acepta datetime.date, datetime.datetime y strings ISO mezclados, parsea defensivamente e ignora lo no parseable (nunca lanza). Devuelve rango (min/max ISO), n, n_distinct, span_days, frecuencia inferida (daily/weekly/monthly/quarterly/yearly/irregular/unknown) a partir del paso mediano entre fechas distintas, is_regular (pasos ~constantes), n_gaps (huecos en la rejilla) y median_step_days. Solo stdlib (datetime + statistics)."
tags: [statistics, timeseries, datetime, profiling, frequency, eda, automatic_eda, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [datetime, statistics]
params:
- name: values
desc: "lista de valores fecha. Acepta datetime.date, datetime.datetime y strings ISO ('2021-06-28', '2021-06-28T00:00:00', '2021-06-28 12:00:00'). None, vacios y no parseables se ignoran; tz-aware se normaliza a naive. Si values es None o no iterable se trata como lista vacia."
output: "dict SIEMPRE presente con: 'min'/'max' (ISO date YYYY-MM-DD o None), 'n' (valores parseables), 'n_distinct' (fechas unicas), 'span_days' (float o None), 'freq' (daily|weekly|monthly|quarterly|yearly|irregular|unknown), 'is_regular' (bool), 'n_gaps' (int), 'median_step_days' (float o None) y 'note' (str). Con <2 valores o una sola fecha distinta: freq='unknown', is_regular=False, n_gaps=0, median_step_days=None y nota. Nunca lanza."
tested: true
tests: ["test_serie_diaria_regular_golden", "test_serie_mensual_freq_monthly", "test_serie_con_hueco_cuenta_gaps", "test_strings_iso_mezclados_con_datetime", "test_lista_vacia_y_none_devuelve_unknown", "test_valores_no_parseables_ignorados", "test_span_days_correcto", "test_una_sola_fecha_es_coherente"]
test_file_path: "python/functions/datascience/profile_datetime_test.py"
file_path: "python/functions/datascience/profile_datetime.py"
---
## Ejemplo
```python
from datascience import profile_datetime
from datetime import date, datetime, timedelta
# Serie diaria regular de 30 dias
fechas = [date(2021, 1, 1) + timedelta(days=i) for i in range(30)]
res = profile_datetime(fechas)
res["freq"] # -> "daily"
res["is_regular"] # -> True
res["n_gaps"] # -> 0
res["min"], res["max"] # -> ("2021-01-01", "2021-01-30")
res["span_days"] # -> 29.0
# Acepta strings ISO mezclados con objetos datetime/date; ignora lo no parseable
profile_datetime(["2021-06-28", datetime(2021, 6, 29, 12), "basura", None])["n"] # -> 2
# Columna vacia o sin fechas validas
profile_datetime([])["freq"] # -> "unknown" + note "datos insuficientes"
```
## Cuando usarla
Cuando construyes la cabecera del capitulo TIMESERIES de un EDA y necesitas
caracterizar la columna de fecha antes de modelar: que rango cubre, cada cuanto
llegan los datos (frecuencia), si la cadencia es regular y si hay huecos en la
rejilla temporal. Es el complemento de fecha al perfil numerico/categorico del
TableProfile (cierra el `datetime{}=None` pendiente). Pasale la columna de fechas
en bruto (tal cual venga de la BD: dates, datetimes o strings ISO) y usa `freq` +
`is_regular` + `n_gaps` para decidir si conviene resamplear, rellenar huecos o
desestacionalizar mas adelante.
## Gotchas
- Es pura y stdlib-only, pero la inferencia de `freq` es heuristica por bandas
sobre el **paso mediano entre fechas distintas** (se deduplica antes de medir).
Cualquier paso fuera de las bandas conocidas (incluido sub-diario, p.ej. datos
horarios) cae en `"irregular"`: no hay banda hourly.
- El analisis de frecuencia/regularidad/huecos necesita **>=2 fechas distintas**.
Con 0-1 valores parseables o una sola fecha unica, `freq="unknown"`,
`median_step_days=None` y `n_gaps=0`, pero `min`/`max`/`span_days` siguen siendo
coherentes si hay al menos una fecha.
- `min`/`max` se reportan como ISO **date** (`YYYY-MM-DD`); la hora se conserva
internamente para calcular `span_days` y `median_step_days` (que pueden ser
fraccionarios con datetimes sub-diarios) pero no aparece en min/max.
- Los datetime con zona horaria se normalizan a naive (se descarta el tzinfo) para
poder mezclarlos con fechas naive sin que las restas lancen; esto puede desplazar
la fecha en datetimes con offset grande. Para EDA es despreciable.
- `is_regular` usa tolerancia ±25% sobre el paso mediano y umbral del 80% de los
pasos dentro de banda; series de "primero de mes" (deltas 28-31) salen regulares.
- `n_gaps` solo se calcula cuando `freq` es una rejilla regular conocida; con
`freq` `"irregular"` o `"unknown"` siempre es 0.
@@ -1,183 +0,0 @@
"""Perfil minimo de una columna fecha/datetime para la cabecera TIMESERIES (grupo eda).
Funcion pura y determinista que resume una columna temporal: rango (min/max),
numero de fechas distintas, frecuencia inferida (daily/weekly/monthly/quarterly/
yearly/irregular), regularidad de los pasos, huecos respecto a la rejilla inferida
y paso mediano entre fechas consecutivas. Cierra el `datetime{}=None` que hoy deja
pendiente el TableProfile de AutomaticEDA.
Acepta valores heterogeneos (``datetime.date``, ``datetime.datetime`` y strings
ISO como ``"2021-06-28"``, ``"2021-06-28T00:00:00"`` o ``"2021-06-28 12:00:00"``),
parsea de forma defensiva, ignora lo que no se puede parsear y NUNCA lanza.
Solo usa stdlib (``datetime`` + ``statistics``).
"""
from __future__ import annotations
import statistics
from datetime import date, datetime
def _parse_one(v) -> datetime | None:
"""Parsea un valor a ``datetime`` naive, o devuelve None si no es una fecha.
Acepta ``datetime.datetime``, ``datetime.date`` y strings ISO. Cualquier
datetime con zona horaria se normaliza a naive (se descarta el tzinfo) para
poder mezclarlo con fechas naive sin que las restas lancen ``TypeError``.
"""
if v is None or isinstance(v, bool):
return None
# datetime es subclase de date: comprobar datetime primero.
if isinstance(v, datetime):
return v.replace(tzinfo=None)
if isinstance(v, date):
return datetime(v.year, v.month, v.day)
if isinstance(v, str):
s = v.strip()
if not s:
return None
try:
dt = datetime.fromisoformat(s)
except ValueError:
return None
return dt.replace(tzinfo=None)
return None
def _infer_freq(median_step_days: float) -> str:
"""Clasifica la frecuencia a partir del paso mediano (en dias) entre fechas.
Bandas con tolerancia: ~1 dia -> daily, ~7 -> weekly, 28-31 -> monthly,
89-92 -> quarterly, 360-366 -> yearly. Cualquier paso fuera de las bandas
(incluido sub-diario) -> irregular.
"""
m = median_step_days
if 0.5 <= m <= 1.5:
return "daily"
if 6.0 <= m <= 8.0:
return "weekly"
if 28.0 <= m <= 31.0:
return "monthly"
if 89.0 <= m <= 92.0:
return "quarterly"
if 360.0 <= m <= 366.0:
return "yearly"
return "irregular"
def profile_datetime(values: list) -> dict:
"""Perfila una columna de fechas para la cabecera del capitulo TIMESERIES.
Funcion pura y determinista: no hace I/O, no muta el input y nunca lanza.
El analisis de frecuencia, regularidad y huecos se hace sobre las **fechas
distintas ordenadas** (se deduplica antes de calcular los pasos): los valores
repetidos generarian pasos de 0 dias que distorsionarian el mediano y la
inferencia. ``n`` cuenta los valores parseables (con duplicados) y
``n_distinct`` las fechas unicas.
Args:
values: lista de valores fecha. Acepta ``datetime.date``,
``datetime.datetime`` y strings ISO (``"2021-06-28"``,
``"2021-06-28T00:00:00"``, ``"2021-06-28 12:00:00"``). Los valores
None, vacios o no parseables se ignoran. Si ``values`` es None o no
iterable se trata como lista vacia.
Returns:
Siempre un dict con esta forma::
{
"min": str | None, # fecha minima ISO date (YYYY-MM-DD)
"max": str | None, # fecha maxima ISO date
"n": int, # nº de valores fecha parseables
"n_distinct": int, # nº de fechas distintas
"span_days": float | None, # (max - min) en dias
"freq": str, # daily|weekly|monthly|quarterly|
# yearly|irregular|unknown
"is_regular": bool, # pasos ~constantes (tolerancia ±25%)
"n_gaps": int, # saltos > ~1.5x el paso mediano
"median_step_days": float | None, # paso mediano entre fechas
"note": str # "" o nota corta
}
Con menos de 2 valores parseables (o una sola fecha distinta) devuelve
``freq="unknown"``, ``is_regular=False``, ``n_gaps=0``,
``median_step_days=None`` y la nota correspondiente, manteniendo min/max
y span_days coherentes cuando hay al menos una fecha.
"""
base = {
"min": None,
"max": None,
"n": 0,
"n_distinct": 0,
"span_days": None,
"freq": "unknown",
"is_regular": False,
"n_gaps": 0,
"median_step_days": None,
"note": "",
}
if values is None:
values = []
try:
iterator = list(values)
except TypeError:
iterator = []
parsed: list[datetime] = []
for v in iterator:
dt = _parse_one(v)
if dt is not None:
parsed.append(dt)
n = len(parsed)
base["n"] = n
if n == 0:
base["note"] = "datos insuficientes"
return base
distinct = sorted(set(parsed))
n_distinct = len(distinct)
dt_min = min(parsed)
dt_max = max(parsed)
base["n_distinct"] = n_distinct
base["min"] = dt_min.date().isoformat()
base["max"] = dt_max.date().isoformat()
base["span_days"] = round((dt_max - dt_min).total_seconds() / 86400.0, 6)
# Sin al menos dos fechas distintas no hay pasos que medir.
if n_distinct < 2:
base["note"] = "datos insuficientes" if n < 2 else "una sola fecha distinta"
return base
steps = [
(distinct[i + 1] - distinct[i]).total_seconds() / 86400.0
for i in range(n_distinct - 1)
]
median_step = float(statistics.median(steps))
base["median_step_days"] = round(median_step, 6)
freq = _infer_freq(median_step)
base["freq"] = freq
# Regularidad: >=80% de los pasos dentro de ±25% del paso mediano.
if median_step > 0:
tol = 0.25 * median_step
within = sum(1 for s in steps if abs(s - median_step) <= tol)
base["is_regular"] = (within / len(steps)) >= 0.8
else:
base["is_regular"] = False
# Huecos: pasos que superan ~1.5x el mediano. Solo tiene sentido cuando la
# frecuencia es una rejilla regular conocida (no irregular/unknown).
if freq not in ("unknown", "irregular") and median_step > 0:
threshold = 1.5 * median_step
base["n_gaps"] = sum(1 for s in steps if s > threshold)
else:
base["n_gaps"] = 0
return base
@@ -1,127 +0,0 @@
"""Tests para profile_datetime."""
from datetime import date, datetime, timedelta
from profile_datetime import profile_datetime
def test_serie_diaria_regular_golden():
# 30 dias consecutivos: frecuencia diaria, regular, sin huecos.
fechas = [date(2021, 1, 1) + timedelta(days=i) for i in range(30)]
res = profile_datetime(fechas)
assert res["n"] == 30
assert res["n_distinct"] == 30
assert res["min"] == "2021-01-01"
assert res["max"] == "2021-01-30"
assert res["span_days"] == 29.0
assert res["freq"] == "daily"
assert res["is_regular"] is True
assert res["n_gaps"] == 0
assert res["median_step_days"] == 1.0
assert res["note"] == ""
def test_serie_mensual_freq_monthly():
# Primero de mes durante 14 meses: paso mediano ~30/31 dias -> monthly.
fechas = []
y, m = 2021, 1
for _ in range(14):
fechas.append(date(y, m, 1))
m += 1
if m > 12:
m = 1
y += 1
res = profile_datetime(fechas)
assert res["n"] == 14
assert res["freq"] == "monthly"
assert res["min"] == "2021-01-01"
assert res["max"] == "2022-02-01"
assert 28.0 <= res["median_step_days"] <= 31.0
def test_serie_con_hueco_cuenta_gaps():
# Serie diaria con un hueco de 3 dias (faltan i=7,8,9) -> n_gaps >= 1.
fechas = [
date(2021, 1, 1) + timedelta(days=i)
for i in range(20)
if i not in (7, 8, 9)
]
res = profile_datetime(fechas)
assert res["freq"] == "daily"
assert res["n_gaps"] >= 1
assert res["median_step_days"] == 1.0
def test_strings_iso_mezclados_con_datetime():
# Mezcla de strings ISO (varios formatos) y objetos datetime/date.
valores = [
"2021-06-28",
datetime(2021, 6, 29, 12, 0, 0),
"2021-06-30T00:00:00",
date(2021, 7, 1),
]
res = profile_datetime(valores)
assert res["n"] == 4
assert res["n_distinct"] == 4
assert res["min"] == "2021-06-28"
assert res["max"] == "2021-07-01"
assert res["freq"] == "daily"
assert res["note"] == ""
def test_lista_vacia_y_none_devuelve_unknown():
for entrada in ([], None):
res = profile_datetime(entrada)
assert res["n"] == 0
assert res["n_distinct"] == 0
assert res["min"] is None
assert res["max"] is None
assert res["span_days"] is None
assert res["freq"] == "unknown"
assert res["is_regular"] is False
assert res["n_gaps"] == 0
assert res["median_step_days"] is None
assert res["note"] == "datos insuficientes"
def test_valores_no_parseables_ignorados():
# Strings basura, None, ints y un date valido mezclados: ignora lo no fecha.
valores = [
"no es una fecha",
None,
"2021-01-01",
"2021-01-02",
12345,
"tampoco",
date(2021, 1, 3),
"",
]
res = profile_datetime(valores)
assert res["n"] == 3 # solo 3 fechas parseables
assert res["n_distinct"] == 3
assert res["freq"] == "daily"
assert res["min"] == "2021-01-01"
assert res["max"] == "2021-01-03"
def test_span_days_correcto():
# Dos fechas a un anio de distancia: span 365 dias -> yearly.
res = profile_datetime([date(2020, 1, 1), date(2020, 12, 31)])
assert res["n"] == 2
assert res["n_distinct"] == 2
assert res["span_days"] == 365.0
assert res["median_step_days"] == 365.0
assert res["freq"] == "yearly"
def test_una_sola_fecha_es_coherente():
# Un unico valor: min == max, span 0, freq unknown, nota datos insuficientes.
res = profile_datetime(["2021-06-28"])
assert res["n"] == 1
assert res["n_distinct"] == 1
assert res["min"] == "2021-06-28"
assert res["max"] == "2021-06-28"
assert res["span_days"] == 0.0
assert res["freq"] == "unknown"
assert res["median_step_days"] is None
assert res["note"] == "datos insuficientes"
@@ -1,72 +0,0 @@
---
name: resample_timeseries
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def resample_timeseries(t: list, v: list, freq: str = \"auto\", agg: str = \"mean\", max_points: int = 400) -> dict"
description: "Agrega una serie temporal por periodo para graficar su evolucion y el CONTEO de observaciones por bucket. Nucleo del capitulo TIMESERIES de AutomaticEDA (grupo eda): recibe las fechas y los valores YA leidos (pura, sin tocar ninguna base de datos), empareja t[i] con v[i] por indice, parsea fechas defensivamente, trunca cada fecha al inicio de su bucket (daily/weekly/monthly/quarterly/yearly), y agrega los valores numericos validos por bucket mientras cuenta TODAS las observaciones con fecha valida (densidad temporal, incluida la fila cuyo valor es None). freq='auto' infiere del delta mediano entre fechas. Si hay mas buckets que max_points hace downsampling uniforme conservando primero y ultimo. Estilo dict-no-throw: NUNCA lanza; entrada vacia o longitudes incompatibles devuelve listas vacias + note='datos insuficientes'."
tags: [eda, timeseries, resample, aggregate, profiling, datascience, time]
params:
- name: t
desc: "Lista de fechas paralela a v. Acepta strings ISO ('YYYY-MM-DD' o 'YYYY-MM-DDTHH:MM:SS', con 'Z' opcional), datetime.date o datetime.datetime. Se parsea defensivamente; los pares cuya fecha no parsea se descartan junto con su valor."
- name: v
desc: "Lista de valores numericos (float/int) paralela a t. Puede contener None o valores no numericos: se ignoran en la agregacion pero la fila sigue contando en 'count' si su fecha es valida. bool, NaN e Inf se tratan como no numericos."
- name: freq
desc: "Granularidad del bucket: 'auto' (infiere del delta mediano en dias entre fechas: <=3 daily, <=16 weekly, <=75 monthly, <=200 quarterly, mayor yearly) o explicita en {daily, weekly, monthly, quarterly, yearly}. Una frecuencia desconocida cae a 'auto'."
- name: agg
desc: "Agregacion por bucket sobre los valores numericos validos: 'mean' | 'sum' | 'median' | 'last' (valor de la observacion cronologicamente mas reciente del bucket) | 'min' | 'max'. Una agregacion desconocida cae a 'mean'."
- name: max_points
desc: "Tope de buckets en la salida. Si n_buckets > max_points hace downsampling uniforme (1 de cada k buckets equiespaciados, conservando el primero y el ultimo) para no saturar el grafico del PDF/PPTX. max_points<=0 desactiva el limite."
output: "Dict siempre con las mismas claves: t (lista de etiquetas ISO 'YYYY-MM-DD' por bucket, orden cronologico), v (lista paralela del valor agregado por bucket segun agg; None si el bucket no tiene ningun valor numerico valido), count (lista paralela del nº de observaciones con fecha valida por bucket), freq (frecuencia efectivamente usada), agg (agregacion usada), n_in (nº de pares (t,v) con fecha valida que entraron), n_buckets (nº de buckets antes del downsample), downsampled (bool, True si se aplico downsampling), note ('' o 'datos insuficientes' cuando no hay pares validos / longitudes incompatibles / listas vacias). Numericos de v en float, count en int."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: true
tests: ["test_daily_a_mensual_mean", "test_agg_sum_y_last", "test_count_cuenta_observacion_con_valor_none", "test_downsampling_respeta_max_points_y_extremos", "test_freq_auto_infiere_mensual", "test_edge_listas_vacias_o_desiguales"]
test_file_path: "python/functions/datascience/resample_timeseries_test.py"
file_path: "python/functions/datascience/resample_timeseries.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.resample_timeseries import resample_timeseries
# Serie diaria agregada a buckets mensuales: media del valor + conteo de filas.
t = ["2020-01-01", "2020-01-15", "2020-02-01", "2020-02-10", "2020-02-20"]
v = [10.0, 20.0, 30.0, 40.0, 50.0]
r = resample_timeseries(t, v, freq="monthly", agg="mean")
print(r["t"]) # ['2020-01-01', '2020-02-01']
print(r["v"]) # [15.0, 40.0]
print(r["count"]) # [2, 3] <- densidad: nº de observaciones por mes
print(r["freq"], r["downsampled"]) # monthly False
# freq='auto' infiere la granularidad del delta mediano entre fechas.
mensual = [f"2022-{m:02d}-01" for m in range(1, 13)]
print(resample_timeseries(mensual, list(range(1, 13)))["freq"]) # monthly
```
## Cuando usarla
- Usala en el capitulo TIMESERIES de `AutomaticEDA` para construir, a partir de una columna temporal (`detect_time_column`) y una columna numerica, la doble serie que el renderer dibuja: la EVOLUCION del valor agregado por periodo y el CONTEO de observaciones por periodo.
- Cuando ya tengas las fechas y los valores leidos en memoria (de DuckDB, polars, CSV, etc.) y solo necesites agregarlos por dia/semana/mes/trimestre/año sin volver a tocar la base de datos — esta funcion es pura y recibe los datos por parametro.
- Cuando quieras un downsampling controlado para que una serie muy larga (miles de fechas) quepa en un grafico de un PDF/PPTX sin saturarlo, conservando el primer y el ultimo punto.
- Cuando no sepas la cadencia de la serie: pasa `freq="auto"` y deja que la infiera del delta mediano.
## Gotchas
- Funcion pura, sin I/O y determinista. NUNCA lanza: ante entrada invalida (listas vacias, longitudes distintas o todas las fechas no parseables) devuelve listas vacias + `note="datos insuficientes"`.
- `count` cuenta OBSERVACIONES con fecha valida en el bucket (densidad temporal), aunque su valor numerico sea `None`/no numerico. `v` agrega SOLO los valores numericos validos del bucket; si no hay ninguno, `v` del bucket es `None` mientras `count` sigue reflejando las filas. No confundas `count` (filas) con el nº de valores agregados.
- `bool`, `NaN` e `Inf` se tratan como NO numericos (se ignoran en `v`). Un string que no parsea a numero tambien se ignora en `v` pero su fila cuenta si la fecha es valida.
- El truncado de bucket usa el inicio del periodo: semana = lunes ISO (`weekday()==0`), mes = dia 1, trimestre = primer dia del trimestre (ene/abr/jul/oct), año = 1 de enero. La etiqueta de cada bucket es esa fecha de inicio en ISO `YYYY-MM-DD`, no un rango.
- El downsampling (`n_buckets > max_points`) reduce la salida a `<= max_points` puntos equiespaciados conservando primero y ultimo, pero `n_buckets` SIEMPRE reporta el conteo real previo al recorte. Si necesitas todos los buckets, sube `max_points` o ponlo `<=0`.
- Las fechas con hora se truncan a su `date()` antes de agrupar: la granularidad minima es el dia (no hay buckets horarios).
- `freq` desconocida o no-string cae a `"auto"`; `agg` desconocida cae a `"mean"`. El campo devuelto refleja la opcion efectivamente usada.
@@ -1,275 +0,0 @@
"""Agrega una serie temporal por periodo para el capitulo TIMESERIES (grupo eda).
Funcion pura y determinista: recibe las fechas y los valores YA leidos (nunca
toca una base de datos ni hace I/O) y los agrega por bucket temporal para poder
graficar la evolucion de la serie y, en paralelo, el CONTEO de observaciones por
periodo (densidad temporal).
Estilo "dict-no-throw" del grupo eda: NUNCA lanza excepcion, siempre devuelve el
mismo conjunto de claves. Lectura y parseo de fechas 100% defensivos. Solo usa la
libreria estandar (``datetime``, ``statistics``, ``re``).
"""
from __future__ import annotations
import datetime
import re
import statistics
# Frecuencias soportadas, de mas fina a mas gruesa.
_FREQS = ("daily", "weekly", "monthly", "quarterly", "yearly")
# Agregaciones soportadas.
_AGGS = ("mean", "sum", "median", "last", "min", "max")
# Acepta el inicio de una fecha ISO con cualquier separador posterior
# (incluido un caracter raro entre la fecha y la hora).
_DATE_RE = re.compile(r"(\d{4})-(\d{2})-(\d{2})")
def _to_date(x) -> "datetime.date | None":
"""Parsea defensivamente un valor a ``datetime.date``; devuelve None si falla."""
if x is None:
return None
# datetime es subclase de date: comprobarlo primero.
if isinstance(x, datetime.datetime):
return x.date()
if isinstance(x, datetime.date):
return x
s = str(x).strip()
if not s:
return None
# Camino feliz: ISO completo (con o sin hora, con o sin 'Z' final).
try:
s2 = s[:-1] if s.endswith("Z") else s
return datetime.datetime.fromisoformat(s2).date()
except ValueError:
pass
# Fallback robusto: extrae el prefijo YYYY-MM-DD con cualquier separador.
m = _DATE_RE.match(s)
if m:
try:
return datetime.date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
except ValueError:
return None
return None
def _to_number(x) -> "float | None":
"""Convierte a float si es numerico finito; devuelve None en otro caso."""
if x is None:
return None
if isinstance(x, bool):
# bool es subclase de int: lo tratamos como no-numerico para una serie.
return None
try:
f = float(x)
except (TypeError, ValueError):
return None
# Descarta NaN / Inf (no agregables de forma estable).
if f != f or f in (float("inf"), float("-inf")):
return None
return f
def _infer_freq(dates_sorted: list) -> str:
"""Infiere la frecuencia desde el delta mediano (en dias) entre fechas."""
if len(dates_sorted) < 2:
return "daily"
diffs = [
(dates_sorted[i + 1] - dates_sorted[i]).days
for i in range(len(dates_sorted) - 1)
]
diffs = [d for d in diffs if d > 0] # ignora duplicados del mismo dia
if not diffs:
return "daily"
med = statistics.median(diffs)
if med <= 3:
return "daily"
if med <= 16:
return "weekly"
if med <= 75:
return "monthly"
if med <= 200:
return "quarterly"
return "yearly"
def _bucket_start(d: "datetime.date", freq: str) -> "datetime.date":
"""Trunca una fecha al inicio de su bucket segun la frecuencia."""
if freq == "weekly":
return d - datetime.timedelta(days=d.weekday()) # lunes ISO
if freq == "monthly":
return datetime.date(d.year, d.month, 1)
if freq == "quarterly":
first_month = ((d.month - 1) // 3) * 3 + 1
return datetime.date(d.year, first_month, 1)
if freq == "yearly":
return datetime.date(d.year, 1, 1)
return d # daily (o cualquier otra cosa): la propia fecha
def _downsample_indices(n: int, max_points: int) -> list:
"""Indices equiespaciados conservando primero y ultimo (<= max_points)."""
if max_points <= 0 or max_points >= n:
return list(range(n))
if max_points == 1:
return [0]
idx = sorted({round(i * (n - 1) / (max_points - 1)) for i in range(max_points)})
return idx
def _empty(freq_req: str, agg: str) -> dict:
"""Resultado canonico cuando no hay datos suficientes."""
eff_freq = freq_req if freq_req in _FREQS else "auto"
return {
"t": [],
"v": [],
"count": [],
"freq": eff_freq,
"agg": agg if agg in _AGGS else "mean",
"n_in": 0,
"n_buckets": 0,
"downsampled": False,
"note": "datos insuficientes",
}
def resample_timeseries(
t: list,
v: list,
freq: str = "auto",
agg: str = "mean",
max_points: int = 400,
) -> dict:
"""Agrega una serie temporal por periodo (buckets) para graficarla.
Empareja ``t[i]`` con ``v[i]`` por indice, descarta los pares cuya fecha no
parsea, trunca cada fecha al inicio de su bucket segun ``freq`` y agrupa. Por
cada bucket devuelve el valor agregado (``agg`` sobre los valores numericos
validos) y el CONTEO de observaciones con fecha valida (densidad temporal),
independientemente de si su valor numerico es ``None``.
Funcion pura: no hace I/O, no muta los inputs, es determinista, NUNCA lanza.
Args:
t: lista de fechas paralela a ``v``. Acepta strings ISO
(``"YYYY-MM-DD"`` o ``"YYYY-MM-DDTHH:MM:SS"``, con ``Z`` opcional),
``datetime.date`` o ``datetime.datetime``. Se parsea defensivamente;
las fechas que no parsean se descartan junto con su valor.
v: lista de valores numericos (float/int). Puede contener ``None`` o
valores no numericos: estos se ignoran en la agregacion, pero la fila
sigue contando en ``count`` (siempre que su fecha sea valida).
freq: ``"auto"`` (infiere del delta mediano entre fechas) o uno de
``"daily"``, ``"weekly"``, ``"monthly"``, ``"quarterly"``,
``"yearly"``. Una frecuencia desconocida cae a ``"auto"``.
agg: agregacion por bucket: ``"mean"``, ``"sum"``, ``"median"``,
``"last"`` (valor de la observacion cronologicamente mas reciente),
``"min"`` o ``"max"``. Una agregacion desconocida cae a ``"mean"``.
max_points: si tras agregar hay mas buckets que este limite, se hace
downsampling uniforme (1 de cada k buckets equiespaciados,
conservando el primero y el ultimo) para no saturar el grafico.
Returns:
Siempre un dict con las mismas claves::
{
"t": [str, ...], # etiqueta ISO YYYY-MM-DD de cada bucket
"v": [float|None, ...], # valor agregado por bucket (None si vacio)
"count": [int, ...], # nº de observaciones con fecha valida
"freq": str, # frecuencia efectivamente usada
"agg": str, # agregacion usada
"n_in": int, # nº de pares (t,v) con fecha valida
"n_buckets": int, # nº de buckets antes del downsample
"downsampled": bool, # True si se aplico downsampling
"note": str, # "" o nota (p.ej. "datos insuficientes")
}
"""
agg = agg if agg in _AGGS else "mean"
freq_req = freq if isinstance(freq, str) else "auto"
# Validacion de entrada: deben ser listas de igual longitud y no vacias.
if (
not isinstance(t, list)
or not isinstance(v, list)
or len(t) == 0
or len(t) != len(v)
):
return _empty(freq_req, agg)
# Empareja por indice y descarta fechas no parseables.
parsed: list = [] # (date, original_index, number_or_None)
for i, (ti, vi) in enumerate(zip(t, v)):
d = _to_date(ti)
if d is None:
continue
parsed.append((d, i, _to_number(vi)))
n_in = len(parsed)
if n_in == 0:
return _empty(freq_req, agg)
# Resuelve la frecuencia efectiva.
if freq_req in _FREQS:
eff_freq = freq_req
else:
dates_sorted = sorted(d for d, _, _ in parsed)
eff_freq = _infer_freq(dates_sorted)
# Agrupa por bucket.
buckets: dict = {}
for d, idx, num in parsed:
b = _bucket_start(d, eff_freq)
slot = buckets.get(b)
if slot is None:
slot = {"count": 0, "vals": [], "last_key": None, "last_val": None}
buckets[b] = slot
slot["count"] += 1
if num is not None:
slot["vals"].append(num)
key = (d, idx)
if slot["last_key"] is None or key > slot["last_key"]:
slot["last_key"] = key
slot["last_val"] = num
ordered = sorted(buckets.items(), key=lambda kv: kv[0])
n_buckets = len(ordered)
def _aggregate(vals: list, last_val) -> "float | None":
if not vals:
return None
if agg == "sum":
return float(sum(vals))
if agg == "median":
return float(statistics.median(vals))
if agg == "last":
return float(last_val) if last_val is not None else None
if agg == "min":
return float(min(vals))
if agg == "max":
return float(max(vals))
return float(statistics.fmean(vals)) # mean (default)
t_out = [b.isoformat() for b, _ in ordered]
v_out = [_aggregate(s["vals"], s["last_val"]) for _, s in ordered]
c_out = [s["count"] for _, s in ordered]
downsampled = False
if n_buckets > max_points > 0:
keep = _downsample_indices(n_buckets, max_points)
t_out = [t_out[i] for i in keep]
v_out = [v_out[i] for i in keep]
c_out = [c_out[i] for i in keep]
downsampled = True
return {
"t": t_out,
"v": v_out,
"count": c_out,
"freq": eff_freq,
"agg": agg,
"n_in": n_in,
"n_buckets": n_buckets,
"downsampled": downsampled,
"note": "",
}
@@ -1,118 +0,0 @@
"""Tests para resample_timeseries (grupo eda)."""
import datetime
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from resample_timeseries import resample_timeseries
def test_daily_a_mensual_mean():
# Serie diaria agregada a buckets mensuales con agg="mean".
t = [
"2020-01-01", "2020-01-15",
"2020-02-01", "2020-02-10", "2020-02-20",
]
v = [10.0, 20.0, 30.0, 40.0, 50.0]
r = resample_timeseries(t, v, freq="monthly", agg="mean")
assert r["t"] == ["2020-01-01", "2020-02-01"]
assert r["v"] == [15.0, 40.0] # (10+20)/2 ; (30+40+50)/3
assert r["count"] == [2, 3]
assert r["freq"] == "monthly"
assert r["agg"] == "mean"
assert r["n_in"] == 5
assert r["n_buckets"] == 2
assert r["downsampled"] is False
assert r["note"] == ""
def test_agg_sum_y_last():
t = [
"2020-01-01", "2020-01-15",
"2020-02-01", "2020-02-10", "2020-02-20",
]
v = [10.0, 20.0, 30.0, 40.0, 50.0]
r_sum = resample_timeseries(t, v, freq="monthly", agg="sum")
assert r_sum["v"] == [30.0, 120.0]
assert r_sum["agg"] == "sum"
# last = valor de la observacion cronologicamente mas reciente del bucket,
# aunque el orden de entrada este desordenado.
t2 = ["2020-02-20", "2020-02-01", "2020-02-10", "2020-01-15", "2020-01-01"]
v2 = [50.0, 30.0, 40.0, 20.0, 10.0]
r_last = resample_timeseries(t2, v2, freq="monthly", agg="last")
assert r_last["t"] == ["2020-01-01", "2020-02-01"]
assert r_last["v"] == [20.0, 50.0] # Jan->2020-01-15=20 ; Feb->2020-02-20=50
assert r_last["agg"] == "last"
def test_count_cuenta_observacion_con_valor_none():
# Un bucket con un valor None: count cuenta la fila, v ignora el None.
t = ["2020-03-05", "2020-03-06", "2020-03-20"]
v = [None, 7.0, 9.0]
r = resample_timeseries(t, v, freq="monthly", agg="mean")
assert r["t"] == ["2020-03-01"]
assert r["count"] == [3] # 3 filas con fecha valida
assert r["v"] == [8.0] # media de los validos: (7+9)/2
assert r["n_in"] == 3
# Bucket entero sin ningun valor numerico valido -> v = None, count sigue.
r2 = resample_timeseries(
["2020-04-01", "2020-04-02"], [None, "n/a"], freq="monthly"
)
assert r2["t"] == ["2020-04-01"]
assert r2["count"] == [2]
assert r2["v"] == [None]
def test_downsampling_respeta_max_points_y_extremos():
base = datetime.date(2021, 1, 1)
t = [(base + datetime.timedelta(days=i)).isoformat() for i in range(500)]
v = [float(i) for i in range(500)]
r = resample_timeseries(t, v, freq="daily", agg="mean", max_points=400)
assert r["n_buckets"] == 500
assert r["downsampled"] is True
assert len(r["t"]) <= 400
assert len(r["t"]) == len(r["v"]) == len(r["count"])
# Primero y ultimo bucket conservados.
assert r["t"][0] == "2021-01-01"
assert r["t"][-1] == (base + datetime.timedelta(days=499)).isoformat()
def test_freq_auto_infiere_mensual():
# Fechas separadas ~1 mes -> auto infiere "monthly".
t = [f"2022-{m:02d}-01" for m in range(1, 13)]
v = [float(m) for m in range(1, 13)]
r = resample_timeseries(t, v, freq="auto", agg="mean")
assert r["freq"] == "monthly"
assert r["n_buckets"] == 12
assert r["count"] == [1] * 12
# Fechas diarias consecutivas -> auto infiere "daily".
base = datetime.date(2023, 1, 1)
td = [(base + datetime.timedelta(days=i)).isoformat() for i in range(20)]
rd = resample_timeseries(td, [float(i) for i in range(20)], freq="auto")
assert rd["freq"] == "daily"
def test_edge_listas_vacias_o_desiguales():
vacio = resample_timeseries([], [])
assert vacio["t"] == [] and vacio["v"] == [] and vacio["count"] == []
assert vacio["note"] == "datos insuficientes"
assert vacio["n_in"] == 0 and vacio["n_buckets"] == 0
desigual = resample_timeseries(["2020-01-01", "2020-01-02"], [1.0])
assert desigual["note"] == "datos insuficientes"
assert desigual["t"] == []
# Todas las fechas invalidas -> tambien insuficiente.
invalidas = resample_timeseries(["no-fecha", "tampoco"], [1.0, 2.0])
assert invalidas["note"] == "datos insuficientes"
assert invalidas["n_in"] == 0