merge: capitulo AutomaticEDA timeseries (verificado met) + funciones delegadas eda

This commit is contained in:
2026-06-30 15:45:37 +02:00
15 changed files with 2324 additions and 0 deletions
+8
View File
@@ -60,8 +60,16 @@ from .exploratory_caveats import exploratory_caveats
from .render_eda_pdf import render_eda_pdf, render_eda_pdf_relational from .render_eda_pdf import render_eda_pdf, render_eda_pdf_relational
from .render_automatic_eda_pdf import render_automatic_eda_pdf from .render_automatic_eda_pdf import render_automatic_eda_pdf
from .render_automatic_eda_pptx import render_automatic_eda_pptx from .render_automatic_eda_pptx import render_automatic_eda_pptx
from .detect_time_column import detect_time_column
from .extract_timeseries_raw import extract_timeseries_raw
from .profile_datetime import profile_datetime
from .resample_timeseries import resample_timeseries
__all__ = [ __all__ = [
"detect_time_column",
"extract_timeseries_raw",
"profile_datetime",
"resample_timeseries",
"render_automatic_eda_pdf", "render_automatic_eda_pdf",
"render_automatic_eda_pptx", "render_automatic_eda_pptx",
"decode_qr_image", "decode_qr_image",
@@ -0,0 +1,613 @@
"""Time-series chapter (TIMESERIES) for AutomaticEDA.
This chapter applies **only when the table has a date/datetime column**. When it
does, it draws — exactly the user requirement — the evolution of the data over
time (the value of each numeric column aggregated per period *and* the count of
rows per period) plus the statistical analysis of the series (stationarity,
autocorrelation, trend and seasonality). When there is no temporal column
``build_timeseries`` returns ``None``.
Data sources, read defensively and never recomputed here:
- ``profile['columns']`` — to detect the time column and the numeric columns.
Delegated to the pure registry function ``detect_time_column`` (group ``eda``).
- ``profile['series'][col]`` — the per-column time-series analysis already
produced by ``profile_table(run_series=True)``: ``stationarity`` (ADF+KPSS),
``acf_pacf`` (ACF/PACF + Ljung-Box), ``stl`` (trend/seasonal/resid +
Hyndman strengths) and the levels/returns suggestion.
- ``ctx['timeseries_raw']`` (or ``profile['timeseries_raw']``) — the *raw* ordered
series ``{time_col, t:[iso...], series:{col:[float|None]}}`` needed to draw the
value-vs-time line and the per-period row count. Exactly like ``modelos`` reads
``raw_numeric`` from ``ctx``, this chapter looks for the raw series there and
degrades honestly when it is absent (it still renders the textual analysis).
The raw series is aggregated per period with the pure registry function
``resample_timeseries`` and the datetime header is built with ``profile_datetime``
(both group ``eda``). Every figure is emitted as a lazy ``Figure`` so the
renderers rasterize and scale it to fit a whole page/slide; tables go through
``DataTable``/``KVTable`` so the paginator splits them repeating the header. No
content is ever cut.
ctx keys this chapter consumes (all optional):
timeseries_raw : dict — ``{time_col, t:[...], series:{col:[...]}}`` raw
ordered series used to draw the value-vs-time line and the row-count
panel. When absent the chapter omits those figures (with a note) and
renders only the analysis available in ``profile['series']``.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
Reads everything defensively (``.get``) and never raises.
"""
from __future__ import annotations
from .. import model
# Pure/impure registry functions (group ``eda``) consumed by this chapter,
# imported defensively so the chapter still builds (degrading the affected
# section to a note) if any of them is somehow unavailable.
try:
from datascience.detect_time_column import detect_time_column
except Exception: # noqa: BLE001 — keep the chapter importable no matter what.
detect_time_column = None # type: ignore[assignment]
try:
from datascience.profile_datetime import profile_datetime
except Exception: # noqa: BLE001
profile_datetime = None # type: ignore[assignment]
try:
from datascience.resample_timeseries import resample_timeseries
except Exception: # noqa: BLE001
resample_timeseries = None # type: ignore[assignment]
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "timeseries"
CHAPTER_TITLE = "Series temporales"
# Plain-Spanish gloss for the stationarity verdict of adf_kpss_stationarity.
_VERDICT_GLOSS = {
"stationary": "estacionaria: media y varianza estables en el tiempo; se "
"puede modelar directamente.",
"non_stationary": "no estacionaria: tiene tendencia o varianza cambiante "
"(raíz unitaria). Correlacionar o modelar sus niveles "
"produce relaciones espurias (Granger-Newbold); conviene "
"diferenciar o pasar a retornos.",
"inconclusive": "resultado no concluyente (ADF y KPSS discrepan): tratar con "
"cautela, probablemente cerca de la no estacionariedad.",
}
# OHLC-style name fragments used to collapse near-identical financial series.
_OHLC_HINTS = ("open", "high", "low", "close", "adj", "price", "vwap")
def _fmt_num(value, decimals: int = 3) -> str:
"""Compact, defensive number formatting shared with the other chapters."""
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _is_dict(v) -> bool:
return isinstance(v, dict)
# --------------------------------------------------------------------------- #
# Detection: which column is the time axis and which numeric columns to chart.
# --------------------------------------------------------------------------- #
def _detect(cols: list) -> dict:
"""Return ``{time_col, numeric_cols, ...}`` via the registry function.
Falls back to an inline scan (datetime inferred_type / datetime semantic
types) when ``detect_time_column`` is unavailable, so the chapter still works.
"""
if detect_time_column is not None:
try:
res = detect_time_column(cols)
if _is_dict(res):
return res
except Exception: # noqa: BLE001 — degrade to the inline scan.
pass
time_col = None
numeric_cols = []
for c in cols or []:
if not _is_dict(c):
continue
it = c.get("inferred_type")
sem = c.get("semantic_type")
if time_col is None and (
it == "datetime" or sem in ("datetime_iso", "date_eu")):
time_col = c.get("name")
if it == "numeric":
numeric_cols.append(c.get("name"))
return {"time_col": time_col, "numeric_cols": numeric_cols,
"time_semantic": "", "reason": "inline fallback"}
def _raw_series_for(raw: dict, col: str):
"""Return (t_list, v_list) for a column from the raw bundle, or (None, None)."""
if not _is_dict(raw):
return None, None
t = raw.get("t")
series = raw.get("series") if _is_dict(raw.get("series")) else {}
v = series.get(col)
if isinstance(t, list) and isinstance(v, list) and t and len(t) == len(v):
return t, v
return None, None
def _ohlc_groups(numeric_cols: list, raw: dict) -> dict:
"""Map each numeric column to a representative to collapse OHLC duplicates.
When several numeric columns are near-identical financial level series
(open/high/low/close/adj close), charting each one repeats the same figure
four times. We keep the first OHLC-looking column as the representative for
the *figures* and list the collapsed ones in a note; the textual analysis is
still produced for every column. Detection is by name only (cheap, no extra
data dependency) and conservative: only collapses when >=2 OHLC-like names
are present.
"""
ohlc = [c for c in numeric_cols
if isinstance(c, str) and any(h in c.lower() for h in _OHLC_HINTS)]
if len(ohlc) < 2:
return {}
representative = ohlc[0]
return {c: representative for c in ohlc if c != representative}
# --------------------------------------------------------------------------- #
# Datetime header (MUST-9.3): range / frequency / regularity / gaps.
# --------------------------------------------------------------------------- #
def _datetime_header(time_col: str, raw: dict) -> list:
"""Build the datetime profile header from the raw time axis, when present."""
blocks: list = []
t, _ = (raw.get("t"), None) if _is_dict(raw) else (None, None)
if not (isinstance(t, list) and t and profile_datetime is not None):
return blocks
try:
dt = profile_datetime(t)
except Exception: # noqa: BLE001
return blocks
if not _is_dict(dt):
return blocks
freq_gloss = {
"daily": "diaria", "weekly": "semanal", "monthly": "mensual",
"quarterly": "trimestral", "yearly": "anual",
"irregular": "irregular", "unknown": "indeterminada",
}
rows = [
("Columna de fecha", model._safe_str(time_col)),
("Rango", f"{model._safe_str(dt.get('min'))}"
f"{model._safe_str(dt.get('max'))}"),
("Observaciones", _fmt_num(dt.get("n"))),
("Fechas distintas", _fmt_num(dt.get("n_distinct"))),
("Frecuencia", freq_gloss.get(dt.get("freq"), model._safe_str(dt.get("freq")))),
("Regular", "" if dt.get("is_regular") else "no"),
]
span = dt.get("span_days")
if span is not None:
rows.append(("Duración (días)", _fmt_num(span, 1)))
n_gaps = dt.get("n_gaps")
if n_gaps is not None:
rows.append(("Huecos en la rejilla", _fmt_num(n_gaps)))
blocks.append(model.KVTable(rows=rows, title="Perfil temporal"))
note = dt.get("note")
if note:
blocks.append(model.Note(model._safe_str(note)))
return blocks
# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _parse_dates(labels: list):
"""Parse a list of ISO-ish strings/dates to datetime, dropping unparseable.
Returns (dates, kept_index) so callers can align the values list.
"""
from datetime import date, datetime
out = []
keep = []
for i, lab in enumerate(labels):
if isinstance(lab, datetime):
out.append(lab)
keep.append(i)
continue
if isinstance(lab, date):
out.append(datetime(lab.year, lab.month, lab.day))
keep.append(i)
continue
s = model._safe_str(lab).strip()
if not s:
continue
s2 = s.replace("T", " ")
parsed = None
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
try:
parsed = datetime.strptime(s2[:len(fmt) + 4] if False else s2, fmt)
break
except ValueError:
continue
if parsed is None:
try:
parsed = datetime.fromisoformat(s.replace("T", " "))
except ValueError:
continue
out.append(parsed)
keep.append(i)
return out, keep
def _make_evolution_figure(name: str, rs: dict):
"""Lazy callable: value-vs-time line + per-period row-count panel (MUST-9.1)."""
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
t_labels = rs.get("t") or []
v = rs.get("v") or []
counts = rs.get("count") or []
dates, keep = _parse_dates(t_labels)
vv = [v[i] if i < len(v) else None for i in keep]
cc = [counts[i] if i < len(counts) else 0 for i in keep]
fig, (ax_v, ax_c) = plt.subplots(
2, 1, figsize=(7.0, 4.6), sharex=True,
gridspec_kw={"height_ratios": [3.0, 1.2], "hspace": 0.12})
# Top: value aggregated per period (line; gaps where the value is None).
xs = [d for d, val in zip(dates, vv) if val is not None]
ys = [val for val in vv if val is not None]
if xs and ys:
ax_v.plot(xs, ys, color="#4e79a7", linewidth=1.4, zorder=3)
ax_v.fill_between(xs, ys, min(ys), color="#9ec6df", alpha=0.18,
zorder=1)
else:
ax_v.text(0.5, 0.5, "(sin valores numéricos)", ha="center",
va="center", fontsize=9, color="#8a8a8a",
transform=ax_v.transAxes)
ax_v.set_ylabel(name, fontsize=8)
ax_v.tick_params(labelsize=7)
ax_v.grid(axis="y", color="#eeeeee", linewidth=0.6)
for spine in ("top", "right"):
ax_v.spines[spine].set_visible(False)
# Bottom: number of observations per period (density / gaps).
if dates and cc:
# Bar width ~ median spacing so bars do not overlap nor leave gaps.
width = 1.0
if len(dates) > 1:
deltas = sorted((dates[i + 1] - dates[i]).days
for i in range(len(dates) - 1))
width = max(deltas[len(deltas) // 2] * 0.8, 1.0)
ax_c.bar(dates, cc, width=width, color="#59a14f", alpha=0.75,
align="center")
ax_c.set_ylabel("nº filas", fontsize=8)
ax_c.tick_params(labelsize=7)
ax_c.grid(axis="y", color="#eeeeee", linewidth=0.6)
for spine in ("top", "right"):
ax_c.spines[spine].set_visible(False)
ax_c.xaxis.set_major_locator(mdates.AutoDateLocator())
ax_c.xaxis.set_major_formatter(mdates.ConciseDateFormatter(
ax_c.xaxis.get_major_locator()))
freq = rs.get("freq")
suptitle = f"{name} — evolución temporal"
if freq:
suptitle += f" (agregado {freq})"
fig.suptitle(suptitle, fontsize=10, fontweight="bold", x=0.02, ha="left")
return fig
return _draw
def _make_stl_figure(stl: dict):
"""Lazy callable: the STL trend/seasonal/resid panels, or None if no values.
``stl_decompose`` only carries the component *values* for short series; for
long ones it returns just summary stats (``note``). In that case there is
nothing to plot and we return None (the caller renders the strengths as text).
"""
def _component_values(comp):
if _is_dict(comp):
vals = comp.get("values")
if isinstance(vals, list) and vals:
return [x for x in vals]
return None
trend = _component_values(stl.get("trend"))
seasonal = _component_values(stl.get("seasonal"))
resid = _component_values(stl.get("resid"))
if not any([trend, seasonal, resid]):
return None
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
panels = [("Tendencia", trend, "#4e79a7"),
("Estacional", seasonal, "#59a14f"),
("Resto", resid, "#e15759")]
panels = [(lbl, vals, col) for lbl, vals, col in panels if vals]
fig, axes = plt.subplots(len(panels), 1, figsize=(7.0, 1.4 * len(panels) + 0.6),
sharex=True)
if len(panels) == 1:
axes = [axes]
for ax, (lbl, vals, col) in zip(axes, panels):
ax.plot(range(len(vals)), vals, color=col, linewidth=1.2)
ax.set_ylabel(lbl, fontsize=8)
ax.tick_params(labelsize=7)
ax.grid(axis="y", color="#eeeeee", linewidth=0.6)
for spine in ("top", "right"):
ax.spines[spine].set_visible(False)
axes[-1].set_xlabel("índice temporal", fontsize=8)
fig.suptitle("Descomposición STL", fontsize=10, fontweight="bold",
x=0.02, ha="left")
fig.tight_layout(rect=(0, 0, 1, 0.96))
return fig
return _draw
def _make_acf_figure(acf_pacf: dict):
"""Lazy callable: the ACF stem plot with ±1.96/√n bands, or None."""
acf = acf_pacf.get("acf")
n = acf_pacf.get("n")
if not (isinstance(acf, list) and len(acf) > 1 and isinstance(n, int) and n > 0):
return None
def _draw():
import math
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
lags = list(range(len(acf)))
fig, ax = plt.subplots(figsize=(7.0, 3.2))
ax.vlines(lags, 0, acf, color="#4e79a7", linewidth=1.4)
ax.plot(lags, acf, "o", color="#4e79a7", markersize=3)
band = 1.96 / math.sqrt(n)
ax.axhspan(-band, band, color="#cccccc", alpha=0.3,
label="banda ±1.96/√n (ruido blanco)")
ax.axhline(0, color="#888888", linewidth=0.8)
ax.set_xlabel("retardo (lag)", fontsize=8)
ax.set_ylabel("ACF", fontsize=8)
ax.tick_params(labelsize=7)
ax.legend(fontsize=7, loc="upper right", framealpha=0.85)
ax.set_title("Autocorrelación (ACF): lags fuera de la banda = "
"correlación significativa", fontsize=9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Per-column textual analysis from profile['series'][col].
# --------------------------------------------------------------------------- #
def _analysis_markdown(sblock: dict) -> str:
"""One markdown block summarizing stationarity / autocorrelation / STL."""
parts: list = []
stat = sblock.get("stationarity") if _is_dict(sblock.get("stationarity")) else {}
verdict = stat.get("verdict")
if verdict:
adf = stat.get("adf") if _is_dict(stat.get("adf")) else {}
kpss = stat.get("kpss") if _is_dict(stat.get("kpss")) else {}
line = (f"**Estacionariedad:** {_VERDICT_GLOSS.get(verdict, verdict)} "
f"(ADF p={_fmt_num(adf.get('p_value'), 4)}, "
f"KPSS p={_fmt_num(kpss.get('p_value'), 4)}).")
warning = stat.get("warning")
if warning:
line += f"{model._safe_str(warning)}"
parts.append(line)
acf = sblock.get("acf_pacf") if _is_dict(sblock.get("acf_pacf")) else {}
if acf:
is_auto = acf.get("is_autocorrelated")
lb = acf.get("ljung_box") if _is_dict(acf.get("ljung_box")) else {}
sig = acf.get("significant_acf_lags") or []
if is_auto is True:
ac_line = ("**Autocorrelación:** la serie está autocorrelada "
"(Ljung-Box rechaza independencia, "
f"p={_fmt_num(lb.get('p_value'), 4)}): los valores dependen "
"de su pasado, no es ruido blanco.")
if sig:
shown = ", ".join(str(x) for x in sig[:8])
more = "" if len(sig) > 8 else ""
ac_line += f" Lags significativos: {shown}{more}."
elif is_auto is False:
ac_line = ("**Autocorrelación:** no se detecta autocorrelación "
"significativa (compatible con ruido blanco, Ljung-Box "
f"p={_fmt_num(lb.get('p_value'), 4)}).")
else:
ac_line = "**Autocorrelación:** no evaluable (datos insuficientes)."
parts.append(ac_line)
stl = sblock.get("stl") if _is_dict(sblock.get("stl")) else {}
if stl:
ts = stl.get("trend_strength")
ss = stl.get("seasonal_strength")
if ts is not None or ss is not None:
parts.append(
"**Descomposición STL:** fuerza de tendencia "
f"{_fmt_num(ts, 2)} y fuerza estacional {_fmt_num(ss, 2)} "
"(escala 01 de Hyndman: cuanto más alto, más marcada la "
"componente).")
elif stl.get("note"):
parts.append(f"**Descomposición STL:** {model._safe_str(stl.get('note'))}")
if sblock.get("levels_suggested"):
reason = sblock.get("levels_reason")
kind = sblock.get("levels_kind")
tr = sblock.get("to_returns") if _is_dict(sblock.get("to_returns")) else None
line = "**Transformación sugerida:** "
line += "pasar a retornos" if kind == "returns" else "diferenciar la serie"
if reason:
line += f"{model._safe_str(reason)}"
if tr and tr.get("mean") is not None:
line += (f" (retornos: media {_fmt_num(tr.get('mean'), 5)}, "
f"σ {_fmt_num(tr.get('std'), 5)}).")
parts.append(line)
return "\n\n".join(parts)
# --------------------------------------------------------------------------- #
# Per-column section.
# --------------------------------------------------------------------------- #
def _column_section(name: str, sblock: dict, raw: dict, collapsed_into) -> list:
"""Blocks for one numeric column: evolution figure + STL + ACF + analysis."""
blocks = [model.Heading(text=model._safe_str(name), level=2)]
# --- Value-vs-time line + per-period row count (MUST-9.1). ---
drew_evolution = False
if collapsed_into is None: # skip the figure for collapsed OHLC duplicates.
t, v = _raw_series_for(raw, name)
if t is not None and resample_timeseries is not None:
try:
rs = resample_timeseries(t, v)
except Exception: # noqa: BLE001
rs = None
if _is_dict(rs) and rs.get("t"):
blocks.append(model.Figure(
make=_make_evolution_figure(name, rs),
caption=f"Evolución de «{name}» por periodo y nº de "
f"observaciones (conteo de filas)."))
drew_evolution = True
else:
blocks.append(model.Note(
f"Serie casi idéntica a «{collapsed_into}» (grupo OHLC): se omite el "
"gráfico para no repetirlo; el análisis estadístico se mantiene."))
if not drew_evolution and collapsed_into is None:
blocks.append(model.Note(
"Gráfico de evolución temporal no disponible: falta la serie cruda "
"(pásala en ctx['timeseries_raw'] = {time_col, t, series}). Se "
"muestra solo el análisis estadístico."))
# --- STL panels (MUST-9.2). ---
stl = sblock.get("stl") if _is_dict(sblock.get("stl")) else {}
if collapsed_into is None and stl:
stl_fig = _make_stl_figure(stl)
if stl_fig is not None:
blocks.append(model.Figure(
make=stl_fig,
caption=f"Descomposición STL de «{name}»: tendencia, componente "
f"estacional y resto."))
# --- ACF figure (autocorrelation structure). ---
acf = sblock.get("acf_pacf") if _is_dict(sblock.get("acf_pacf")) else {}
if collapsed_into is None and acf:
acf_fig = _make_acf_figure(acf)
if acf_fig is not None:
blocks.append(model.Figure(
make=acf_fig,
caption=f"Función de autocorrelación de «{name}»."))
# --- Textual analysis (always, even for collapsed duplicates). ---
analysis = _analysis_markdown(sblock)
if analysis:
blocks.append(model.Markdown(text=analysis))
return blocks
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_timeseries(profile: dict, ctx: dict):
"""Build the TIMESERIES Chapter, or ``None`` if the table has no date column.
Args:
profile: the ``eda`` group TableProfile dict.
ctx: presentation context; ``ctx['timeseries_raw']`` (optional) carries
the raw ordered series used to draw the value-vs-time line and the
per-period row count.
Returns:
A ``model.Chapter`` with, per numeric column, the value-vs-time evolution
+ row-count figure, the STL panels, the ACF figure and the statistical
analysis; or ``None`` when there is no temporal column (the chapter does
not apply).
"""
profile = profile or {}
if not _is_dict(profile):
profile = {}
ctx = ctx or {}
cols = profile.get("columns") or []
det = _detect(cols)
time_col = det.get("time_col")
if not time_col:
return None # no date/datetime column -> chapter does not apply.
numeric_cols = det.get("numeric_cols") or []
series_map = profile.get("series") if _is_dict(profile.get("series")) else {}
raw = ctx.get("timeseries_raw") or profile.get("timeseries_raw")
raw = raw if _is_dict(raw) else {}
# Which columns can the chapter say anything about: those with a series
# analysis block and/or a raw series to chart. Preserve the profile order.
chartable = []
for name in numeric_cols:
has_analysis = _is_dict(series_map.get(name))
has_raw, _ = _raw_series_for(raw, name)
if has_analysis or has_raw is not None:
chartable.append(name)
if not chartable:
# A date column exists but nothing numeric to chart/analyse: still a
# valid (small) chapter — show just the datetime header if we have it.
header = _datetime_header(time_col, raw)
if not header:
return None
intro = (
f"La tabla tiene una columna temporal («{time_col}») pero no hay "
"columnas numéricas con serie analizable.")
blocks = [model.Heading(text=CHAPTER_TITLE, level=1),
model.Markdown(text=intro)] + header
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
collapsed = _ohlc_groups(chartable, raw)
intro = (
"Este capítulo analiza la evolución de la tabla en el tiempo usando la "
f"columna de fecha «{time_col}». Para cada columna numérica se muestra su "
"**evolución por periodo** (valor agregado) junto al **número de filas por "
"periodo** (densidad de observaciones), su **descomposición STL** "
"(tendencia / estacionalidad / resto) y la **función de autocorrelación**; "
"debajo, el análisis de la serie: estacionariedad (ADF + KPSS), "
"autocorrelación (Ljung-Box) y, cuando procede, la transformación "
"sugerida (retornos o diferencias) para evitar correlaciones espurias.")
blocks = [model.Heading(text=CHAPTER_TITLE, level=1),
model.Markdown(text=intro)]
blocks += _datetime_header(time_col, raw)
if collapsed:
reps = sorted(set(collapsed.values()))
collapsed_names = ", ".join(sorted(collapsed.keys()))
blocks.append(model.Note(
f"Series OHLC casi idénticas detectadas ({collapsed_names}): se "
f"grafican consolidadas en «{', '.join(reps)}» para no repetir el "
"mismo gráfico; cada columna conserva su análisis estadístico."))
for name in chartable:
sblock = series_map.get(name) if _is_dict(series_map.get(name)) else {}
blocks += _column_section(name, sblock, raw, collapsed.get(name))
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -0,0 +1,244 @@
"""Tests for the TIMESERIES chapter — DoD: golden + edges + anti-cut.
Self-contained: builds synthetic ``series`` blocks (shaped like
``profile_table(run_series=True)`` output) and a raw ``timeseries_raw`` bundle,
with no DuckDB, so the suite is fast and deterministic. Verifies that the chapter:
- returns ``None`` when there is no date/datetime column (the user requirement);
- never raises on ``None``/empty/garbage input;
- with a date column + raw series emits, per numeric column, the value-vs-time +
row-count evolution figure, the STL panels, the ACF figure and the textual
analysis (stationarity / autocorrelation / suggested transform);
- collapses near-identical OHLC series into one chart while keeping every
column's analysis;
- renders without cutting anything in both PDF and PPTX (every column heading
survives in the rendered output).
"""
import math
import os
import re
import tempfile
from pypdf import PdfReader
from datascience.automatic_eda.chapters.timeseries import (
build_timeseries, CHAPTER_VERSION, _VERDICT_GLOSS,
)
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
# --------------------------------------------------------------------------- #
# Synthetic fixtures shaped like the real profile_table(run_series=True) output.
# --------------------------------------------------------------------------- #
def _dates(n: int) -> list:
"""n consecutive daily ISO date strings starting 2021-01-01."""
from datetime import date, timedelta
start = date(2021, 1, 1)
return [(start + timedelta(days=i)).isoformat() for i in range(n)]
def _series_block(n=120, verdict="non_stationary", autocorr=True, levels=True,
with_stl_values=True):
"""A synthetic ``series`` block like _build_series_block produces."""
trend = [float(i) for i in range(n)]
seasonal = [math.sin(i / 6.0) for i in range(n)]
resid = [0.1 * ((-1) ** i) for i in range(n)]
acf = [1.0] + [max(0.0, 0.9 - 0.05 * k) for k in range(1, 21)]
block = {
"order_col": "fecha",
"ordered": True,
"n": n,
"stationarity": {
"n": n, "verdict": verdict,
"adf": {"p_value": 0.42, "stationary": False},
"kpss": {"p_value": 0.01, "stationary": False},
"warning": ("serie no estacionaria: riesgo de correlación espuria"
if verdict != "stationary" else None),
},
"acf_pacf": {
"n": n, "nlags": 20, "acf": acf,
"significant_acf_lags": [1, 2, 3, 4, 5],
"ljung_box": {"stat": 123.4, "p_value": 0.0 if autocorr else 0.7,
"lags": 20},
"is_autocorrelated": autocorr,
},
"period_source": "datetime_freq",
"stl": {
"n": n, "period": 7, "period_inferred": False, "robust": False,
"trend": {"values": trend} if with_stl_values else {
"note": "serie larga: solo estadisticos", "mean": 60.0},
"seasonal": {"values": seasonal} if with_stl_values else {"mean": 0.0},
"resid": {"values": resid} if with_stl_values else {"mean": 0.0},
"trend_strength": 0.95, "seasonal_strength": 0.42,
},
}
if levels:
block["levels_suggested"] = True
block["levels_kind"] = "returns"
block["levels_reason"] = ("columna financiera no estacionaria: usar "
"retornos evita correlación espuria.")
block["to_returns"] = {"method": "log", "mean": 0.001, "std": 0.02}
else:
block["levels_suggested"] = False
return block
def _profile(numeric_names=("precio",), n=120, with_stl_values=True):
cols = [{"name": "fecha", "inferred_type": "datetime",
"semantic_type": "datetime_iso"}]
series_map = {}
for nm in numeric_names:
cols.append({"name": nm, "inferred_type": "numeric",
"numeric": {"min": 1.0, "max": 200.0, "mean": 100.0,
"median": 95.0, "std": 40.0}})
series_map[nm] = _series_block(n=n, with_stl_values=with_stl_values)
return {"table": "cotizaciones", "n_rows": n, "n_cols": len(cols),
"columns": cols, "series": series_map}
def _ctx_raw(numeric_names=("precio",), n=120):
t = _dates(n)
series = {}
for j, nm in enumerate(numeric_names):
series[nm] = [float(100 + i + 5 * j) for i in range(n)]
return {"timeseries_raw": {"time_col": "fecha", "t": t, "series": series}}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
# --------------------------------------------------------------------------- #
# Golden.
# --------------------------------------------------------------------------- #
def test_golden_estructura_y_figuras():
ch = build_timeseries(_profile(("precio",)), _ctx_raw(("precio",)))
assert ch is not None
assert ch.id == "timeseries"
assert ch.version == CHAPTER_VERSION
kinds = [b.kind for b in ch.blocks]
assert kinds[0] == "heading" # chapter title
assert kinds[1] == "markdown" # intro
assert "kv_table" in kinds # datetime profile header (MUST-9.3)
# Per column: evolution figure + STL figure + ACF figure + analysis markdown.
figs = [b for b in ch.blocks if b.kind == "figure"]
assert len(figs) >= 3, "evolución + STL + ACF esperadas"
# Lazy makers must produce real matplotlib figures.
import matplotlib.pyplot as plt
for f in figs:
fig = f.make()
assert fig is not None
plt.close(fig)
def test_golden_evolucion_tiene_dos_paneles_valor_y_conteo():
# MUST-9.1: the evolution figure has a value panel + a row-count panel.
ch = build_timeseries(_profile(("precio",)), _ctx_raw(("precio",)))
figs = [b for b in ch.blocks if b.kind == "figure"]
import matplotlib.pyplot as plt
fig = figs[0].make() # first figure is the evolution one.
assert len(fig.axes) == 2, "panel de valor + panel de conteo de filas"
plt.close(fig)
def test_golden_analisis_textual_presente():
ch = build_timeseries(_profile(("precio",)), _ctx_raw(("precio",)))
md = " ".join(b.text for b in ch.blocks if b.kind == "markdown")
assert "Estacionariedad" in md
assert "Autocorrelación" in md
assert "STL" in md
# Verdict gloss surfaced for the non-stationary preset.
assert _VERDICT_GLOSS["non_stationary"].split(":")[0] in md
# Levels/returns suggestion surfaced.
assert "retornos" in md.lower()
# --------------------------------------------------------------------------- #
# Edges.
# --------------------------------------------------------------------------- #
def test_edge_sin_columna_fecha_devuelve_none():
prof = {"columns": [
{"name": "precio", "inferred_type": "numeric", "numeric": {"mean": 1.0}},
{"name": "ciudad", "inferred_type": "categorical",
"categorical": {"top": []}},
], "series": {"precio": _series_block()}}
assert build_timeseries(prof, {}) is None
def test_edge_none_y_vacio_no_revienta():
assert build_timeseries(None, None) is None
assert build_timeseries({}, {}) is None
assert build_timeseries({"columns": []}, {}) is None
# Date column but nothing numeric/series and no raw -> None (nothing to say).
assert build_timeseries(
{"columns": [{"name": "fecha", "inferred_type": "datetime"}]}, {}) is None
def test_edge_sin_raw_degrada_pero_mantiene_analisis():
# No ctx['timeseries_raw']: the chapter must still build (STL/ACF/analysis
# from the profile) and note that the evolution chart is unavailable.
ch = build_timeseries(_profile(("precio",)), {})
assert ch is not None
notes = " ".join(b.text for b in ch.blocks if b.kind == "note")
assert "evolución temporal no disponible" in notes
md = " ".join(b.text for b in ch.blocks if b.kind == "markdown")
assert "Estacionariedad" in md
def test_edge_stl_solo_estadisticos_no_dibuja_panel_pero_no_revienta():
# Long series: STL carries only stats (no 'values') -> no STL figure, but the
# strengths still surface in the textual analysis.
ch = build_timeseries(_profile(("precio",), with_stl_values=False),
_ctx_raw(("precio",)))
assert ch is not None
md = " ".join(b.text for b in ch.blocks if b.kind == "markdown")
assert "STL" in md
# --------------------------------------------------------------------------- #
# OHLC consolidation (MUST-9.3).
# --------------------------------------------------------------------------- #
def test_ohlc_consolidacion():
names = ("Open", "High", "Low", "Close")
ch = build_timeseries(_profile(names), _ctx_raw(names))
assert ch is not None
notes = " ".join(b.text for b in ch.blocks if b.kind == "note")
assert "OHLC" in notes
# Only the representative draws the evolution figure; the other 3 are collapsed
# so there are fewer evolution figures than columns.
captions = [b.caption or "" for b in ch.blocks if b.kind == "figure"]
evo = [c for c in captions if "Evolución" in c]
assert len(evo) < len(names), "las series OHLC deben consolidarse"
# Every column still has its analysis markdown (one heading per column).
headings = [b.text for b in ch.blocks if b.kind == "heading" and b.level == 2]
for nm in names:
assert nm in headings
# --------------------------------------------------------------------------- #
# Anti-cut: PDF + PPTX.
# --------------------------------------------------------------------------- #
def test_anti_corte_pdf_y_pptx():
names = tuple(f"serie_{i}" for i in range(6))
prof = _profile(names, n=90)
ctx = _ctx_raw(names, n=90)
ch = build_timeseries(prof, ctx)
col_headings = [b.text for b in ch.blocks if b.kind == "heading" and b.level == 2]
assert len(col_headings) == 6
with tempfile.TemporaryDirectory() as d:
pdf = os.path.join(d, "ts.pdf")
res_pdf = render_automatic_eda_pdf(
prof, pdf, {"ctx": ctx, "write_manifest": False})
assert res_pdf["path"] == pdf
txt = _pdf_text(pdf)
for nm in col_headings:
assert nm in txt, f"columna '{nm}' cortada/ausente en el PDF"
pptx = os.path.join(d, "ts.pptx")
res_pptx = render_automatic_eda_pptx(
prof, pptx, {"ctx": ctx, "write_manifest": False})
assert res_pptx["path"] == pptx
assert res_pptx["n_slides"] >= 6
@@ -0,0 +1,68 @@
---
name: detect_time_column
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def detect_time_column(columns: list) -> dict"
description: "Detecta, a partir de la lista de ColumnProfile de un TableProfile del grupo eda, cual es la columna de orden temporal y que columnas numericas hay para graficar una serie en el tiempo. Una columna es temporal si inferred_type=='datetime' o semantic_type in {datetime_iso, date_eu}; time_col es la primera temporal en orden. Es la pieza que usa el capitulo TIMESERIES del AutomaticEDA para decidir si aplica. Lectura defensiva dict-no-throw: nunca lanza, siempre devuelve las mismas claves."
tags: [eda, timeseries, datetime, profiling, column-detection, automatic-eda, datascience, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
params:
- name: columns
desc: "lista de ColumnProfile dict de un TableProfile del grupo eda. Cada elemento suele tener name, inferred_type, semantic_type y numeric. Elementos que no sean dict se ignoran; None/no-lista/vacia -> dict 'no aplica'."
output: "dict SIEMPRE con: time_col (str|None, columna temporal elegida = primera temporal), time_semantic (str, semantic_type de la temporal o ''), numeric_cols (list[str], columnas con inferred_type=='numeric' en orden), n_datetime_cols (int), datetime_cols (list[str], todas las temporales en orden de aparicion), reason (str en espanol explicando la eleccion). Nunca lanza excepcion."
tested: true
tests: ["test_golden_datetime_y_numericas", "test_deteccion_por_semantic_type_date_eu", "test_sin_columna_temporal", "test_columns_none_no_revienta", "test_columns_vacia_no_revienta", "test_columns_no_lista_no_revienta", "test_elementos_basura_se_ignoran", "test_varias_datetime_elige_la_primera"]
test_file_path: "python/functions/datascience/detect_time_column_test.py"
file_path: "python/functions/datascience/detect_time_column.py"
---
## Ejemplo
```python
from datascience import detect_time_column
columns = [
{"name": "fecha", "inferred_type": "datetime", "semantic_type": "datetime_iso"},
{"name": "ventas", "inferred_type": "numeric"},
{"name": "unidades", "inferred_type": "numeric"},
{"name": "region", "inferred_type": "text"},
]
res = detect_time_column(columns)
res["time_col"] # -> "fecha"
res["numeric_cols"] # -> ["ventas", "unidades"]
res["n_datetime_cols"] # -> 1
# Sin columna temporal: el capitulo TIMESERIES no aplica.
detect_time_column([{"name": "id", "inferred_type": "numeric"}])["time_col"] # -> None
```
## Cuando usarla
Cuando el capitulo TIMESERIES del AutomaticEDA recibe un TableProfile y necesita
decidir si la tabla admite analisis de serie temporal: si `time_col` es None no
hay eje de tiempo y el capitulo se salta; si hay `time_col` y `numeric_cols`,
úsalas como eje X (orden cronologico) y series Y. Tambien sirve para enrutar el
resto del pipeline (acf_pacf / stl_decompose / adf_kpss_stationarity) sobre las
columnas numericas detectadas.
## Gotchas
- Es pura y stdlib-only (sin numpy ni DuckDB): segura de llamar en cualquier paso.
- `time_col` se elige por ORDEN de aparicion en la lista, no por "mejor candidata".
Si hay varias columnas datetime y quieres otra, filtra `datetime_cols` tu mismo.
- Solo mira metadatos del perfil (`inferred_type`/`semantic_type`); no parsea ni
valida los valores reales de la columna. La calidad de la deteccion depende de
que el profiler (summarize_table_duckdb / infer_semantic_type) haya inferido bien.
- Las claves del semantic_type son exactamente las del profiler: `datetime_iso`
(ISO 8601) y `date_eu` (DD/MM/AAAA). Otros formatos de fecha no se detectan por
semantic_type salvo que `inferred_type` ya sea `"datetime"`.
- `numeric_cols` se basa en `inferred_type == "numeric"` (no en "integer"/"float");
si tu profiler usa otra etiqueta, normalizala antes.
@@ -0,0 +1,112 @@
"""Detecta la columna temporal y las columnas numericas de un TableProfile (grupo eda).
Funcion pura y determinista: a partir de la lista de columnas de un TableProfile
producido por el grupo de capacidad `eda` (cada elemento es un ColumnProfile dict),
decide cual es la columna de orden temporal y que columnas numericas hay disponibles
para graficar una serie en el tiempo. Es la pieza que usa el capitulo TIMESERIES del
AutomaticEDA para decidir si la tabla admite analisis de serie temporal.
Lectura 100% defensiva al estilo "dict-no-throw" del grupo eda: nunca lanza
excepcion, siempre devuelve el mismo conjunto de claves.
"""
# semantic_type que el profiler (infer_semantic_type) emite para fechas/datetimes.
_DATETIME_SEMANTICS = ("datetime_iso", "date_eu")
def detect_time_column(columns: list) -> dict:
"""Detecta la columna temporal y las numericas de una lista de ColumnProfile.
Recorre los ColumnProfile de un TableProfile y clasifica cada columna como
temporal o numerica leyendo de forma defensiva sus claves. Una columna es
temporal si su ``inferred_type == "datetime"`` o si su ``semantic_type`` esta
en {``"datetime_iso"``, ``"date_eu"``}. La columna temporal elegida
(``time_col``) es la PRIMERA temporal en el orden de la lista. Las numericas
(``numeric_cols``) son las de ``inferred_type == "numeric"``, en orden.
Funcion pura: no hace I/O, no muta el input, es determinista.
Args:
columns: lista de ColumnProfile dict del grupo eda. Cada elemento suele
tener claves como ``name``, ``inferred_type``, ``semantic_type`` y
``numeric``. Los elementos que no sean dict se ignoran. Si ``columns``
es None, no es lista o esta vacia, se devuelve el dict "no aplica".
Returns:
Siempre un dict con las mismas claves::
{
"time_col": str | None, # columna temporal elegida (None si no hay)
"time_semantic": str, # semantic_type de la temporal ("" si no aplica)
"numeric_cols": [str, ...], # columnas con inferred_type == "numeric"
"n_datetime_cols": int, # nº de columnas temporales detectadas
"datetime_cols": [str, ...],# todas las temporales, en orden de aparicion
"reason": str, # frase corta (en espanol) que explica la eleccion
}
"""
# Caso "no aplica": entrada invalida o vacia.
if not isinstance(columns, list) or not columns:
return {
"time_col": None,
"time_semantic": "",
"numeric_cols": [],
"n_datetime_cols": 0,
"datetime_cols": [],
"reason": "no se detecto columna de fecha/datetime",
}
datetime_cols: list[str] = []
datetime_semantics: list[str] = []
numeric_cols: list[str] = []
for col in columns:
# Ignora elementos que no sean dict sin fallar.
if not isinstance(col, dict):
continue
name = col.get("name")
if name is None:
name = ""
else:
name = str(name)
inferred_type = col.get("inferred_type") or ""
semantic_type = col.get("semantic_type") or ""
is_datetime = inferred_type == "datetime" or semantic_type in _DATETIME_SEMANTICS
if is_datetime:
datetime_cols.append(name)
datetime_semantics.append(semantic_type)
if inferred_type == "numeric":
numeric_cols.append(name)
if not datetime_cols:
return {
"time_col": None,
"time_semantic": "",
"numeric_cols": numeric_cols,
"n_datetime_cols": 0,
"datetime_cols": [],
"reason": "no se detecto columna de fecha/datetime",
}
time_col = datetime_cols[0]
time_semantic = datetime_semantics[0]
if len(datetime_cols) == 1:
reason = f"columna temporal '{time_col}' detectada"
else:
reason = (
f"{len(datetime_cols)} columnas temporales; se elige la primera "
f"'{time_col}'"
)
return {
"time_col": time_col,
"time_semantic": time_semantic,
"numeric_cols": numeric_cols,
"n_datetime_cols": len(datetime_cols),
"datetime_cols": datetime_cols,
"reason": reason,
}
@@ -0,0 +1,102 @@
"""Tests para detect_time_column (grupo eda). Self-contained, sin DuckDB."""
from detect_time_column import detect_time_column
def test_golden_datetime_y_numericas():
columns = [
{"name": "fecha", "inferred_type": "datetime", "semantic_type": "datetime_iso"},
{"name": "ventas", "inferred_type": "numeric"},
{"name": "unidades", "inferred_type": "numeric"},
{"name": "region", "inferred_type": "text"},
]
res = detect_time_column(columns)
assert res["time_col"] == "fecha"
assert res["time_semantic"] == "datetime_iso"
assert res["numeric_cols"] == ["ventas", "unidades"]
assert res["n_datetime_cols"] == 1
assert res["datetime_cols"] == ["fecha"]
assert isinstance(res["reason"], str) and res["reason"]
def test_deteccion_por_semantic_type_date_eu():
# inferred_type no es datetime, pero semantic_type date_eu => temporal.
columns = [
{"name": "id", "inferred_type": "numeric"},
{"name": "dia", "inferred_type": "text", "semantic_type": "date_eu"},
{"name": "importe", "inferred_type": "numeric"},
]
res = detect_time_column(columns)
assert res["time_col"] == "dia"
assert res["time_semantic"] == "date_eu"
assert res["numeric_cols"] == ["id", "importe"]
assert res["n_datetime_cols"] == 1
assert res["datetime_cols"] == ["dia"]
def test_sin_columna_temporal():
columns = [
{"name": "id", "inferred_type": "numeric"},
{"name": "nombre", "inferred_type": "text"},
{"name": "activo", "inferred_type": "boolean"},
]
res = detect_time_column(columns)
assert res["time_col"] is None
assert res["time_semantic"] == ""
assert res["numeric_cols"] == ["id"]
assert res["n_datetime_cols"] == 0
assert res["datetime_cols"] == []
assert res["reason"] == "no se detecto columna de fecha/datetime"
def test_columns_none_no_revienta():
res = detect_time_column(None)
assert res["time_col"] is None
assert res["time_semantic"] == ""
assert res["numeric_cols"] == []
assert res["n_datetime_cols"] == 0
assert res["datetime_cols"] == []
assert res["reason"] == "no se detecto columna de fecha/datetime"
def test_columns_vacia_no_revienta():
res = detect_time_column([])
assert res["time_col"] is None
assert res["numeric_cols"] == []
assert res["n_datetime_cols"] == 0
def test_columns_no_lista_no_revienta():
# Un dict (no lista) tambien debe caer en el caso "no aplica".
res = detect_time_column({"name": "fecha", "inferred_type": "datetime"})
assert res["time_col"] is None
assert res["numeric_cols"] == []
def test_elementos_basura_se_ignoran():
columns = [
None,
"no soy un dict",
42,
{"name": "ts", "inferred_type": "datetime"},
{"name": "valor", "inferred_type": "numeric"},
]
res = detect_time_column(columns)
assert res["time_col"] == "ts"
assert res["numeric_cols"] == ["valor"]
assert res["n_datetime_cols"] == 1
def test_varias_datetime_elige_la_primera():
columns = [
{"name": "created_at", "inferred_type": "datetime", "semantic_type": "datetime_iso"},
{"name": "metric", "inferred_type": "numeric"},
{"name": "updated_at", "inferred_type": "datetime", "semantic_type": "datetime_iso"},
{"name": "fecha_baja", "inferred_type": "text", "semantic_type": "date_eu"},
]
res = detect_time_column(columns)
assert res["time_col"] == "created_at"
assert res["time_semantic"] == "datetime_iso"
assert res["n_datetime_cols"] == 3
assert res["datetime_cols"] == ["created_at", "updated_at", "fecha_baja"]
assert res["numeric_cols"] == ["metric"]
@@ -0,0 +1,92 @@
---
name: extract_timeseries_raw
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def extract_timeseries_raw(query_fn, table: str, time_col: str, value_cols: list, max_rows: int = 5000) -> dict"
description: "Extrae la serie temporal CRUDA (fechas + una o varias columnas numericas) de una tabla, ordenada cronologicamente, para alimentar el render del capitulo TIMESERIES de AutomaticEDA (linea valor-vs-tiempo + conteo por periodo). Recibe un lector read-only inyectado `query_fn(sql) -> dict` (mismo contrato que duckdb_query_readonly / pg_query / el `_q` de profile_table) y NO abre ninguna conexion por su cuenta. Construye UNA sola query con identificadores escapados, ORDER BY por la columna temporal y LIMIT. Devuelve dict dict-no-throw: t (fechas ISO string), series (lista paralela float|None por columna) y n. El capitulo no toca la BD: recibe esto en ctx['timeseries_raw']. Reutilizable tambien por profile_table en una fase futura."
tags: [eda, timeseries, datascience, automatic-eda, extraction, read-only, duckdb, postgres, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [datetime]
params:
- name: query_fn
desc: "callable lector read-only del backend activo. Recibe un string SQL y devuelve un dict {'status':'ok','rows':[{col:val,...},...]} (mismo contrato que duckdb_query_readonly o el `_q` de profile_table). NO se abre ninguna conexion dentro de la funcion: toda la lectura pasa por query_fn. Si es None -> error."
- name: table
desc: "nombre de la tabla de la que extraer la serie. Se escapa con comillas dobles en la query."
- name: time_col
desc: "nombre de la columna de orden temporal. Se usa en ORDER BY (cronologico ascendente) y se filtra IS NOT NULL. Sus valores se devuelven en `t` como string ISO."
- name: value_cols
desc: "lista de nombres de columnas numericas a extraer. Cada una produce una entrada en `series` con una lista paralela a `t`. Vacia o None -> status error."
- name: max_rows
desc: "limite de filas a leer (clausula LIMIT). Default 5000. Protege el render frente a tablas enormes."
output: "dict (nunca lanza). En exito: {'status':'ok','time_col':str,'t':[str,...] (fechas ISO en orden),'series':{col:[float|None,...],...} (paralela a t por value_col, None si el valor no es convertible a float),'n':int}. En error (sin lanzar): {'status':'error','error':str,'time_col':str,'t':[],'series':{},'n':0}. Errores: query_fn None, value_cols vacia, table/time_col vacios, o query_fn devuelve status!='ok' (se propaga su error)."
tested: true
tests: ["test_golden_t_y_series_alineadas", "test_valor_no_convertible_da_none", "test_value_cols_vacia_status_error", "test_query_fn_status_error_propaga", "test_query_fn_none_da_error_sin_reventar", "test_sql_contiene_order_by_y_limit"]
test_file_path: "python/functions/datascience/extract_timeseries_raw_test.py"
file_path: "python/functions/datascience/extract_timeseries_raw.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience import extract_timeseries_raw
from infra import duckdb_query_readonly
# El lector read-only se inyecta como closure (igual que el `_q` de profile_table).
db = "data/ventas.duckdb"
def _q(sql):
return duckdb_query_readonly(db, sql)
res = extract_timeseries_raw(_q, "ventas_diarias", "fecha", ["importe", "unidades"])
# res == {
# "status": "ok",
# "time_col": "fecha",
# "t": ["2024-01-01", "2024-01-02", ...],
# "series": {"importe": [1234.5, 980.0, ...], "unidades": [12.0, 9.0, ...]},
# "n": 365,
# }
# Se entrega al capitulo TIMESERIES sin que este toque la BD:
ctx = {"timeseries_raw": res}
```
## Cuando usarla
Cuando el capitulo TIMESERIES de AutomaticEDA necesita pintar una serie
valor-vs-tiempo (o conteo por periodo) y NO debe abrir la base de datos por su
cuenta: extraes aqui las fechas + columnas numericas ordenadas y se las pasas en
`ctx['timeseries_raw']`. Usala tambien siempre que quieras la secuencia cruda
ordenada cronologicamente de una o varias columnas para alimentar otros
contrastes de serie (ADF/KPSS, ACF/PACF, STL) reutilizando un unico lector
read-only inyectado, en vez de hacer N muestreos a mano.
## Gotchas
- **Impura**: lee de la base de datos a traves de `query_fn`. No abre conexiones
por su cuenta — depende por completo del lector inyectado. Sigue el estilo
dict-no-throw del grupo `eda`: nunca lanza; ante cualquier fallo devuelve
`{"status":"error","error":...}` con `t=[]`, `series={}`, `n=0`.
- **`error_type` en el frontmatter es `error_go_core` por convencion del registry**
(toda funcion impura debe declararlo y el indexer lo exige), pero el codigo
NO lanza esa excepcion: degrada al dict de error. Es metadata, no comportamiento.
- **No loguear los datos crudos**: `t`/`series` pueden contener datos sensibles
(igual que un HAR). No volcar el dict completo a logs ni a telemetria; en
trazas usa solo `n` y los nombres de columna.
- **Alineacion por fila**: `series[col][i]` corresponde a `t[i]`. Un valor no
convertible a float se guarda como `None` (no se descarta la fila) para no
romper la alineacion temporal.
- **Orden**: el orden cronologico depende del `ORDER BY "time_col"` del backend.
Si `time_col` esta guardada como texto con formato no lexicograficamente
ordenable (p.ej. `DD/MM/YYYY`), el orden no sera el real — normaliza la columna
a date/timestamp antes, o pasa una columna ya ordenable.
- **`max_rows`**: con LIMIT, si la tabla supera `max_rows` obtienes solo el primer
tramo cronologico, no un muestreo uniforme. Sube `max_rows` si necesitas el rango
completo.
@@ -0,0 +1,122 @@
"""extract_timeseries_raw — extrae la serie temporal CRUDA de una tabla.
Lector read-only inyectado: recibe `query_fn(sql) -> dict` con el mismo contrato
que duckdb_query_readonly / pg_query (y que el `_q` de profile_table):
`{"status": "ok", "rows": [{col: val, ...}, ...]}`. Esta funcion NO abre ninguna
conexion por su cuenta — solo usa `query_fn`. Construye UNA sola query ordenada
por la columna temporal y devuelve las fechas (`t`) mas cada columna numerica en
listas paralelas (`series`), listas para alimentar el render del capitulo
TIMESERIES de AutomaticEDA (linea valor-vs-tiempo + conteo por periodo) sin que
el capitulo toque la base de datos: recibe esto en `ctx['timeseries_raw']`.
Estilo dict-no-throw del grupo `eda`: nunca lanza; captura cualquier excepcion y
degrada a `{"status": "error", "error": str, ...}`.
"""
from datetime import date, datetime
def _to_float(value):
"""Convierte un valor a float de forma defensiva. None si no es convertible."""
if value is None:
return None
if isinstance(value, bool):
# Un bool es subclase de int en Python; no es un valor de serie valido.
return None
if isinstance(value, (int, float)):
return float(value)
s = str(value).strip()
if not s:
return None
try:
return float(s)
except (TypeError, ValueError):
return None
def _to_iso(value):
"""Convierte un valor temporal a string ISO conservando el orden de la query.
date/datetime -> isoformat(); cualquier otro valor (string, etc.) -> str().
None se preserva como None.
"""
if value is None:
return None
if isinstance(value, (datetime, date)):
return value.isoformat()
return str(value)
def extract_timeseries_raw(query_fn, table, time_col, value_cols, max_rows=5000):
"""Extrae la serie temporal cruda (fechas + columnas numericas) de una tabla.
Args:
query_fn: callable lector read-only del backend activo. Recibe un string
SQL y devuelve un dict {"status": "ok", "rows": [{col: val, ...}]}
(mismo contrato que duckdb_query_readonly / el `_q` de profile_table).
No se abre ninguna conexion aqui: toda la lectura pasa por query_fn.
table: nombre de la tabla.
time_col: nombre de la columna de orden temporal.
value_cols: lista de nombres de columnas numericas a extraer.
max_rows: limite de filas (LIMIT). Default 5000.
Returns:
dict (nunca lanza):
{
"status": "ok" | "error",
"error": str, # solo si status == "error"
"time_col": str,
"t": [str, ...], # time_col como ISO string, en orden
"series": {col: [float|None, ...], ...}, # paralela a t por columna
"n": int # nº de filas devueltas
}
"""
base = {"status": "ok", "time_col": time_col, "t": [], "series": {}, "n": 0}
try:
if query_fn is None:
return {**base, "status": "error", "error": "query_fn es None"}
if not value_cols:
return {**base, "status": "error", "error": "value_cols vacío"}
if not table or not time_col:
return {
**base,
"status": "error",
"error": "table y time_col son obligatorios",
}
# Identificadores escapados con comillas dobles (como hace profile_table)
# para tolerar nombres con mayusculas/espacios/palabras reservadas.
cols_sql = ", ".join(f'"{c}"' for c in value_cols)
sql = (
f'SELECT "{time_col}", {cols_sql} FROM "{table}" '
f'WHERE "{time_col}" IS NOT NULL '
f'ORDER BY "{time_col}" '
f"LIMIT {int(max_rows)}"
)
q = query_fn(sql)
if not isinstance(q, dict) or q.get("status") != "ok":
err = (
q.get("error", "query_fn fallo")
if isinstance(q, dict)
else "query_fn no devolvio un dict"
)
return {**base, "status": "error", "error": err}
rows = q.get("rows", []) or []
t = []
series = {c: [] for c in value_cols}
for row in rows:
t.append(_to_iso(row.get(time_col)))
for c in value_cols:
series[c].append(_to_float(row.get(c)))
return {
"status": "ok",
"time_col": time_col,
"t": t,
"series": series,
"n": len(t),
}
except Exception as e: # noqa: BLE001 - dict-no-throw: degradar, nunca lanzar
return {**base, "status": "error", "error": str(e)}
@@ -0,0 +1,109 @@
"""Tests para extract_timeseries_raw.
No usa DuckDB real: inyecta un query_fn FAKE (closure) que devuelve filas
predefinidas y, opcionalmente, captura el SQL recibido para verificar la query
generada (ORDER BY por la columna temporal + LIMIT). Asi el test es
autocontenido y no depende de ningun backend.
"""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from extract_timeseries_raw import extract_timeseries_raw
def _fake_query(rows, captured=None, status="ok", error=None):
"""Crea un query_fn FAKE.
`captured` (lista opcional) recibe el SQL ejecutado para poder inspeccionarlo.
`status`/`error` permiten simular un fallo del backend.
"""
def _q(sql):
if captured is not None:
captured.append(sql)
if status != "ok":
return {"status": "error", "error": error or "boom"}
return {"status": "ok", "rows": rows}
return _q
def test_golden_t_y_series_alineadas():
"""Golden: t y series alineadas, floats convertidos, n correcto."""
rows = [
{"fecha": "2024-01-01", "ventas": "10", "stock": 5},
{"fecha": "2024-01-02", "ventas": "20.5", "stock": 7},
{"fecha": "2024-01-03", "ventas": 30, "stock": 9},
]
res = extract_timeseries_raw(_fake_query(rows), "t", "fecha", ["ventas", "stock"])
assert res["status"] == "ok"
assert res["n"] == 3
assert res["time_col"] == "fecha"
assert res["t"] == ["2024-01-01", "2024-01-02", "2024-01-03"]
assert res["series"]["ventas"] == [10.0, 20.5, 30.0]
assert res["series"]["stock"] == [5.0, 7.0, 9.0]
def test_valor_no_convertible_da_none():
"""Valor no convertible a float -> None en la serie (alineacion preservada)."""
rows = [
{"fecha": "2024-01-01", "ventas": "abc"},
{"fecha": "2024-01-02", "ventas": None},
{"fecha": "2024-01-03", "ventas": "12.5"},
]
res = extract_timeseries_raw(_fake_query(rows), "t", "fecha", ["ventas"])
assert res["status"] == "ok"
assert res["series"]["ventas"] == [None, None, 12.5]
assert res["n"] == 3
def test_value_cols_vacia_status_error():
"""value_cols vacia -> status error con t/series/n vacios."""
res = extract_timeseries_raw(_fake_query([]), "t", "fecha", [])
assert res["status"] == "error"
assert "value_cols" in res["error"]
assert res["t"] == []
assert res["series"] == {}
assert res["n"] == 0
def test_query_fn_status_error_propaga():
"""query_fn que devuelve status != ok -> se propaga como error."""
res = extract_timeseries_raw(
_fake_query([], status="error", error="db locked"),
"t",
"fecha",
["ventas"],
)
assert res["status"] == "error"
assert "db locked" in res["error"]
assert res["n"] == 0
def test_query_fn_none_da_error_sin_reventar():
"""query_fn None -> error degradado, sin excepcion."""
res = extract_timeseries_raw(None, "t", "fecha", ["ventas"])
assert res["status"] == "error"
assert res["t"] == []
assert res["n"] == 0
def test_sql_contiene_order_by_y_limit():
"""La query generada ordena por time_col y aplica el LIMIT sobre la tabla."""
captured = []
rows = [{"fecha": "2024-01-01", "ventas": 1}]
extract_timeseries_raw(
_fake_query(rows, captured),
"ventas_tbl",
"fecha",
["ventas"],
max_rows=123,
)
assert len(captured) == 1
sql = captured[0]
assert 'ORDER BY "fecha"' in sql
assert "LIMIT 123" in sql
assert 'FROM "ventas_tbl"' in sql
@@ -0,0 +1,79 @@
---
name: profile_datetime
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def profile_datetime(values: list) -> dict"
description: "Perfil minimo de una columna fecha/datetime para la cabecera del capitulo TIMESERIES de AutomaticEDA. Acepta datetime.date, datetime.datetime y strings ISO mezclados, parsea defensivamente e ignora lo no parseable (nunca lanza). Devuelve rango (min/max ISO), n, n_distinct, span_days, frecuencia inferida (daily/weekly/monthly/quarterly/yearly/irregular/unknown) a partir del paso mediano entre fechas distintas, is_regular (pasos ~constantes), n_gaps (huecos en la rejilla) y median_step_days. Solo stdlib (datetime + statistics)."
tags: [statistics, timeseries, datetime, profiling, frequency, eda, automatic_eda, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [datetime, statistics]
params:
- name: values
desc: "lista de valores fecha. Acepta datetime.date, datetime.datetime y strings ISO ('2021-06-28', '2021-06-28T00:00:00', '2021-06-28 12:00:00'). None, vacios y no parseables se ignoran; tz-aware se normaliza a naive. Si values es None o no iterable se trata como lista vacia."
output: "dict SIEMPRE presente con: 'min'/'max' (ISO date YYYY-MM-DD o None), 'n' (valores parseables), 'n_distinct' (fechas unicas), 'span_days' (float o None), 'freq' (daily|weekly|monthly|quarterly|yearly|irregular|unknown), 'is_regular' (bool), 'n_gaps' (int), 'median_step_days' (float o None) y 'note' (str). Con <2 valores o una sola fecha distinta: freq='unknown', is_regular=False, n_gaps=0, median_step_days=None y nota. Nunca lanza."
tested: true
tests: ["test_serie_diaria_regular_golden", "test_serie_mensual_freq_monthly", "test_serie_con_hueco_cuenta_gaps", "test_strings_iso_mezclados_con_datetime", "test_lista_vacia_y_none_devuelve_unknown", "test_valores_no_parseables_ignorados", "test_span_days_correcto", "test_una_sola_fecha_es_coherente"]
test_file_path: "python/functions/datascience/profile_datetime_test.py"
file_path: "python/functions/datascience/profile_datetime.py"
---
## Ejemplo
```python
from datascience import profile_datetime
from datetime import date, datetime, timedelta
# Serie diaria regular de 30 dias
fechas = [date(2021, 1, 1) + timedelta(days=i) for i in range(30)]
res = profile_datetime(fechas)
res["freq"] # -> "daily"
res["is_regular"] # -> True
res["n_gaps"] # -> 0
res["min"], res["max"] # -> ("2021-01-01", "2021-01-30")
res["span_days"] # -> 29.0
# Acepta strings ISO mezclados con objetos datetime/date; ignora lo no parseable
profile_datetime(["2021-06-28", datetime(2021, 6, 29, 12), "basura", None])["n"] # -> 2
# Columna vacia o sin fechas validas
profile_datetime([])["freq"] # -> "unknown" + note "datos insuficientes"
```
## Cuando usarla
Cuando construyes la cabecera del capitulo TIMESERIES de un EDA y necesitas
caracterizar la columna de fecha antes de modelar: que rango cubre, cada cuanto
llegan los datos (frecuencia), si la cadencia es regular y si hay huecos en la
rejilla temporal. Es el complemento de fecha al perfil numerico/categorico del
TableProfile (cierra el `datetime{}=None` pendiente). Pasale la columna de fechas
en bruto (tal cual venga de la BD: dates, datetimes o strings ISO) y usa `freq` +
`is_regular` + `n_gaps` para decidir si conviene resamplear, rellenar huecos o
desestacionalizar mas adelante.
## Gotchas
- Es pura y stdlib-only, pero la inferencia de `freq` es heuristica por bandas
sobre el **paso mediano entre fechas distintas** (se deduplica antes de medir).
Cualquier paso fuera de las bandas conocidas (incluido sub-diario, p.ej. datos
horarios) cae en `"irregular"`: no hay banda hourly.
- El analisis de frecuencia/regularidad/huecos necesita **>=2 fechas distintas**.
Con 0-1 valores parseables o una sola fecha unica, `freq="unknown"`,
`median_step_days=None` y `n_gaps=0`, pero `min`/`max`/`span_days` siguen siendo
coherentes si hay al menos una fecha.
- `min`/`max` se reportan como ISO **date** (`YYYY-MM-DD`); la hora se conserva
internamente para calcular `span_days` y `median_step_days` (que pueden ser
fraccionarios con datetimes sub-diarios) pero no aparece en min/max.
- Los datetime con zona horaria se normalizan a naive (se descarta el tzinfo) para
poder mezclarlos con fechas naive sin que las restas lancen; esto puede desplazar
la fecha en datetimes con offset grande. Para EDA es despreciable.
- `is_regular` usa tolerancia ±25% sobre el paso mediano y umbral del 80% de los
pasos dentro de banda; series de "primero de mes" (deltas 28-31) salen regulares.
- `n_gaps` solo se calcula cuando `freq` es una rejilla regular conocida; con
`freq` `"irregular"` o `"unknown"` siempre es 0.
@@ -0,0 +1,183 @@
"""Perfil minimo de una columna fecha/datetime para la cabecera TIMESERIES (grupo eda).
Funcion pura y determinista que resume una columna temporal: rango (min/max),
numero de fechas distintas, frecuencia inferida (daily/weekly/monthly/quarterly/
yearly/irregular), regularidad de los pasos, huecos respecto a la rejilla inferida
y paso mediano entre fechas consecutivas. Cierra el `datetime{}=None` que hoy deja
pendiente el TableProfile de AutomaticEDA.
Acepta valores heterogeneos (``datetime.date``, ``datetime.datetime`` y strings
ISO como ``"2021-06-28"``, ``"2021-06-28T00:00:00"`` o ``"2021-06-28 12:00:00"``),
parsea de forma defensiva, ignora lo que no se puede parsear y NUNCA lanza.
Solo usa stdlib (``datetime`` + ``statistics``).
"""
from __future__ import annotations
import statistics
from datetime import date, datetime
def _parse_one(v) -> datetime | None:
"""Parsea un valor a ``datetime`` naive, o devuelve None si no es una fecha.
Acepta ``datetime.datetime``, ``datetime.date`` y strings ISO. Cualquier
datetime con zona horaria se normaliza a naive (se descarta el tzinfo) para
poder mezclarlo con fechas naive sin que las restas lancen ``TypeError``.
"""
if v is None or isinstance(v, bool):
return None
# datetime es subclase de date: comprobar datetime primero.
if isinstance(v, datetime):
return v.replace(tzinfo=None)
if isinstance(v, date):
return datetime(v.year, v.month, v.day)
if isinstance(v, str):
s = v.strip()
if not s:
return None
try:
dt = datetime.fromisoformat(s)
except ValueError:
return None
return dt.replace(tzinfo=None)
return None
def _infer_freq(median_step_days: float) -> str:
"""Clasifica la frecuencia a partir del paso mediano (en dias) entre fechas.
Bandas con tolerancia: ~1 dia -> daily, ~7 -> weekly, 28-31 -> monthly,
89-92 -> quarterly, 360-366 -> yearly. Cualquier paso fuera de las bandas
(incluido sub-diario) -> irregular.
"""
m = median_step_days
if 0.5 <= m <= 1.5:
return "daily"
if 6.0 <= m <= 8.0:
return "weekly"
if 28.0 <= m <= 31.0:
return "monthly"
if 89.0 <= m <= 92.0:
return "quarterly"
if 360.0 <= m <= 366.0:
return "yearly"
return "irregular"
def profile_datetime(values: list) -> dict:
"""Perfila una columna de fechas para la cabecera del capitulo TIMESERIES.
Funcion pura y determinista: no hace I/O, no muta el input y nunca lanza.
El analisis de frecuencia, regularidad y huecos se hace sobre las **fechas
distintas ordenadas** (se deduplica antes de calcular los pasos): los valores
repetidos generarian pasos de 0 dias que distorsionarian el mediano y la
inferencia. ``n`` cuenta los valores parseables (con duplicados) y
``n_distinct`` las fechas unicas.
Args:
values: lista de valores fecha. Acepta ``datetime.date``,
``datetime.datetime`` y strings ISO (``"2021-06-28"``,
``"2021-06-28T00:00:00"``, ``"2021-06-28 12:00:00"``). Los valores
None, vacios o no parseables se ignoran. Si ``values`` es None o no
iterable se trata como lista vacia.
Returns:
Siempre un dict con esta forma::
{
"min": str | None, # fecha minima ISO date (YYYY-MM-DD)
"max": str | None, # fecha maxima ISO date
"n": int, # nº de valores fecha parseables
"n_distinct": int, # nº de fechas distintas
"span_days": float | None, # (max - min) en dias
"freq": str, # daily|weekly|monthly|quarterly|
# yearly|irregular|unknown
"is_regular": bool, # pasos ~constantes (tolerancia ±25%)
"n_gaps": int, # saltos > ~1.5x el paso mediano
"median_step_days": float | None, # paso mediano entre fechas
"note": str # "" o nota corta
}
Con menos de 2 valores parseables (o una sola fecha distinta) devuelve
``freq="unknown"``, ``is_regular=False``, ``n_gaps=0``,
``median_step_days=None`` y la nota correspondiente, manteniendo min/max
y span_days coherentes cuando hay al menos una fecha.
"""
base = {
"min": None,
"max": None,
"n": 0,
"n_distinct": 0,
"span_days": None,
"freq": "unknown",
"is_regular": False,
"n_gaps": 0,
"median_step_days": None,
"note": "",
}
if values is None:
values = []
try:
iterator = list(values)
except TypeError:
iterator = []
parsed: list[datetime] = []
for v in iterator:
dt = _parse_one(v)
if dt is not None:
parsed.append(dt)
n = len(parsed)
base["n"] = n
if n == 0:
base["note"] = "datos insuficientes"
return base
distinct = sorted(set(parsed))
n_distinct = len(distinct)
dt_min = min(parsed)
dt_max = max(parsed)
base["n_distinct"] = n_distinct
base["min"] = dt_min.date().isoformat()
base["max"] = dt_max.date().isoformat()
base["span_days"] = round((dt_max - dt_min).total_seconds() / 86400.0, 6)
# Sin al menos dos fechas distintas no hay pasos que medir.
if n_distinct < 2:
base["note"] = "datos insuficientes" if n < 2 else "una sola fecha distinta"
return base
steps = [
(distinct[i + 1] - distinct[i]).total_seconds() / 86400.0
for i in range(n_distinct - 1)
]
median_step = float(statistics.median(steps))
base["median_step_days"] = round(median_step, 6)
freq = _infer_freq(median_step)
base["freq"] = freq
# Regularidad: >=80% de los pasos dentro de ±25% del paso mediano.
if median_step > 0:
tol = 0.25 * median_step
within = sum(1 for s in steps if abs(s - median_step) <= tol)
base["is_regular"] = (within / len(steps)) >= 0.8
else:
base["is_regular"] = False
# Huecos: pasos que superan ~1.5x el mediano. Solo tiene sentido cuando la
# frecuencia es una rejilla regular conocida (no irregular/unknown).
if freq not in ("unknown", "irregular") and median_step > 0:
threshold = 1.5 * median_step
base["n_gaps"] = sum(1 for s in steps if s > threshold)
else:
base["n_gaps"] = 0
return base
@@ -0,0 +1,127 @@
"""Tests para profile_datetime."""
from datetime import date, datetime, timedelta
from profile_datetime import profile_datetime
def test_serie_diaria_regular_golden():
# 30 dias consecutivos: frecuencia diaria, regular, sin huecos.
fechas = [date(2021, 1, 1) + timedelta(days=i) for i in range(30)]
res = profile_datetime(fechas)
assert res["n"] == 30
assert res["n_distinct"] == 30
assert res["min"] == "2021-01-01"
assert res["max"] == "2021-01-30"
assert res["span_days"] == 29.0
assert res["freq"] == "daily"
assert res["is_regular"] is True
assert res["n_gaps"] == 0
assert res["median_step_days"] == 1.0
assert res["note"] == ""
def test_serie_mensual_freq_monthly():
# Primero de mes durante 14 meses: paso mediano ~30/31 dias -> monthly.
fechas = []
y, m = 2021, 1
for _ in range(14):
fechas.append(date(y, m, 1))
m += 1
if m > 12:
m = 1
y += 1
res = profile_datetime(fechas)
assert res["n"] == 14
assert res["freq"] == "monthly"
assert res["min"] == "2021-01-01"
assert res["max"] == "2022-02-01"
assert 28.0 <= res["median_step_days"] <= 31.0
def test_serie_con_hueco_cuenta_gaps():
# Serie diaria con un hueco de 3 dias (faltan i=7,8,9) -> n_gaps >= 1.
fechas = [
date(2021, 1, 1) + timedelta(days=i)
for i in range(20)
if i not in (7, 8, 9)
]
res = profile_datetime(fechas)
assert res["freq"] == "daily"
assert res["n_gaps"] >= 1
assert res["median_step_days"] == 1.0
def test_strings_iso_mezclados_con_datetime():
# Mezcla de strings ISO (varios formatos) y objetos datetime/date.
valores = [
"2021-06-28",
datetime(2021, 6, 29, 12, 0, 0),
"2021-06-30T00:00:00",
date(2021, 7, 1),
]
res = profile_datetime(valores)
assert res["n"] == 4
assert res["n_distinct"] == 4
assert res["min"] == "2021-06-28"
assert res["max"] == "2021-07-01"
assert res["freq"] == "daily"
assert res["note"] == ""
def test_lista_vacia_y_none_devuelve_unknown():
for entrada in ([], None):
res = profile_datetime(entrada)
assert res["n"] == 0
assert res["n_distinct"] == 0
assert res["min"] is None
assert res["max"] is None
assert res["span_days"] is None
assert res["freq"] == "unknown"
assert res["is_regular"] is False
assert res["n_gaps"] == 0
assert res["median_step_days"] is None
assert res["note"] == "datos insuficientes"
def test_valores_no_parseables_ignorados():
# Strings basura, None, ints y un date valido mezclados: ignora lo no fecha.
valores = [
"no es una fecha",
None,
"2021-01-01",
"2021-01-02",
12345,
"tampoco",
date(2021, 1, 3),
"",
]
res = profile_datetime(valores)
assert res["n"] == 3 # solo 3 fechas parseables
assert res["n_distinct"] == 3
assert res["freq"] == "daily"
assert res["min"] == "2021-01-01"
assert res["max"] == "2021-01-03"
def test_span_days_correcto():
# Dos fechas a un anio de distancia: span 365 dias -> yearly.
res = profile_datetime([date(2020, 1, 1), date(2020, 12, 31)])
assert res["n"] == 2
assert res["n_distinct"] == 2
assert res["span_days"] == 365.0
assert res["median_step_days"] == 365.0
assert res["freq"] == "yearly"
def test_una_sola_fecha_es_coherente():
# Un unico valor: min == max, span 0, freq unknown, nota datos insuficientes.
res = profile_datetime(["2021-06-28"])
assert res["n"] == 1
assert res["n_distinct"] == 1
assert res["min"] == "2021-06-28"
assert res["max"] == "2021-06-28"
assert res["span_days"] == 0.0
assert res["freq"] == "unknown"
assert res["median_step_days"] is None
assert res["note"] == "datos insuficientes"
@@ -0,0 +1,72 @@
---
name: resample_timeseries
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def resample_timeseries(t: list, v: list, freq: str = \"auto\", agg: str = \"mean\", max_points: int = 400) -> dict"
description: "Agrega una serie temporal por periodo para graficar su evolucion y el CONTEO de observaciones por bucket. Nucleo del capitulo TIMESERIES de AutomaticEDA (grupo eda): recibe las fechas y los valores YA leidos (pura, sin tocar ninguna base de datos), empareja t[i] con v[i] por indice, parsea fechas defensivamente, trunca cada fecha al inicio de su bucket (daily/weekly/monthly/quarterly/yearly), y agrega los valores numericos validos por bucket mientras cuenta TODAS las observaciones con fecha valida (densidad temporal, incluida la fila cuyo valor es None). freq='auto' infiere del delta mediano entre fechas. Si hay mas buckets que max_points hace downsampling uniforme conservando primero y ultimo. Estilo dict-no-throw: NUNCA lanza; entrada vacia o longitudes incompatibles devuelve listas vacias + note='datos insuficientes'."
tags: [eda, timeseries, resample, aggregate, profiling, datascience, time]
params:
- name: t
desc: "Lista de fechas paralela a v. Acepta strings ISO ('YYYY-MM-DD' o 'YYYY-MM-DDTHH:MM:SS', con 'Z' opcional), datetime.date o datetime.datetime. Se parsea defensivamente; los pares cuya fecha no parsea se descartan junto con su valor."
- name: v
desc: "Lista de valores numericos (float/int) paralela a t. Puede contener None o valores no numericos: se ignoran en la agregacion pero la fila sigue contando en 'count' si su fecha es valida. bool, NaN e Inf se tratan como no numericos."
- name: freq
desc: "Granularidad del bucket: 'auto' (infiere del delta mediano en dias entre fechas: <=3 daily, <=16 weekly, <=75 monthly, <=200 quarterly, mayor yearly) o explicita en {daily, weekly, monthly, quarterly, yearly}. Una frecuencia desconocida cae a 'auto'."
- name: agg
desc: "Agregacion por bucket sobre los valores numericos validos: 'mean' | 'sum' | 'median' | 'last' (valor de la observacion cronologicamente mas reciente del bucket) | 'min' | 'max'. Una agregacion desconocida cae a 'mean'."
- name: max_points
desc: "Tope de buckets en la salida. Si n_buckets > max_points hace downsampling uniforme (1 de cada k buckets equiespaciados, conservando el primero y el ultimo) para no saturar el grafico del PDF/PPTX. max_points<=0 desactiva el limite."
output: "Dict siempre con las mismas claves: t (lista de etiquetas ISO 'YYYY-MM-DD' por bucket, orden cronologico), v (lista paralela del valor agregado por bucket segun agg; None si el bucket no tiene ningun valor numerico valido), count (lista paralela del nº de observaciones con fecha valida por bucket), freq (frecuencia efectivamente usada), agg (agregacion usada), n_in (nº de pares (t,v) con fecha valida que entraron), n_buckets (nº de buckets antes del downsample), downsampled (bool, True si se aplico downsampling), note ('' o 'datos insuficientes' cuando no hay pares validos / longitudes incompatibles / listas vacias). Numericos de v en float, count en int."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: true
tests: ["test_daily_a_mensual_mean", "test_agg_sum_y_last", "test_count_cuenta_observacion_con_valor_none", "test_downsampling_respeta_max_points_y_extremos", "test_freq_auto_infiere_mensual", "test_edge_listas_vacias_o_desiguales"]
test_file_path: "python/functions/datascience/resample_timeseries_test.py"
file_path: "python/functions/datascience/resample_timeseries.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from datascience.resample_timeseries import resample_timeseries
# Serie diaria agregada a buckets mensuales: media del valor + conteo de filas.
t = ["2020-01-01", "2020-01-15", "2020-02-01", "2020-02-10", "2020-02-20"]
v = [10.0, 20.0, 30.0, 40.0, 50.0]
r = resample_timeseries(t, v, freq="monthly", agg="mean")
print(r["t"]) # ['2020-01-01', '2020-02-01']
print(r["v"]) # [15.0, 40.0]
print(r["count"]) # [2, 3] <- densidad: nº de observaciones por mes
print(r["freq"], r["downsampled"]) # monthly False
# freq='auto' infiere la granularidad del delta mediano entre fechas.
mensual = [f"2022-{m:02d}-01" for m in range(1, 13)]
print(resample_timeseries(mensual, list(range(1, 13)))["freq"]) # monthly
```
## Cuando usarla
- Usala en el capitulo TIMESERIES de `AutomaticEDA` para construir, a partir de una columna temporal (`detect_time_column`) y una columna numerica, la doble serie que el renderer dibuja: la EVOLUCION del valor agregado por periodo y el CONTEO de observaciones por periodo.
- Cuando ya tengas las fechas y los valores leidos en memoria (de DuckDB, polars, CSV, etc.) y solo necesites agregarlos por dia/semana/mes/trimestre/año sin volver a tocar la base de datos — esta funcion es pura y recibe los datos por parametro.
- Cuando quieras un downsampling controlado para que una serie muy larga (miles de fechas) quepa en un grafico de un PDF/PPTX sin saturarlo, conservando el primer y el ultimo punto.
- Cuando no sepas la cadencia de la serie: pasa `freq="auto"` y deja que la infiera del delta mediano.
## Gotchas
- Funcion pura, sin I/O y determinista. NUNCA lanza: ante entrada invalida (listas vacias, longitudes distintas o todas las fechas no parseables) devuelve listas vacias + `note="datos insuficientes"`.
- `count` cuenta OBSERVACIONES con fecha valida en el bucket (densidad temporal), aunque su valor numerico sea `None`/no numerico. `v` agrega SOLO los valores numericos validos del bucket; si no hay ninguno, `v` del bucket es `None` mientras `count` sigue reflejando las filas. No confundas `count` (filas) con el nº de valores agregados.
- `bool`, `NaN` e `Inf` se tratan como NO numericos (se ignoran en `v`). Un string que no parsea a numero tambien se ignora en `v` pero su fila cuenta si la fecha es valida.
- El truncado de bucket usa el inicio del periodo: semana = lunes ISO (`weekday()==0`), mes = dia 1, trimestre = primer dia del trimestre (ene/abr/jul/oct), año = 1 de enero. La etiqueta de cada bucket es esa fecha de inicio en ISO `YYYY-MM-DD`, no un rango.
- El downsampling (`n_buckets > max_points`) reduce la salida a `<= max_points` puntos equiespaciados conservando primero y ultimo, pero `n_buckets` SIEMPRE reporta el conteo real previo al recorte. Si necesitas todos los buckets, sube `max_points` o ponlo `<=0`.
- Las fechas con hora se truncan a su `date()` antes de agrupar: la granularidad minima es el dia (no hay buckets horarios).
- `freq` desconocida o no-string cae a `"auto"`; `agg` desconocida cae a `"mean"`. El campo devuelto refleja la opcion efectivamente usada.
@@ -0,0 +1,275 @@
"""Agrega una serie temporal por periodo para el capitulo TIMESERIES (grupo eda).
Funcion pura y determinista: recibe las fechas y los valores YA leidos (nunca
toca una base de datos ni hace I/O) y los agrega por bucket temporal para poder
graficar la evolucion de la serie y, en paralelo, el CONTEO de observaciones por
periodo (densidad temporal).
Estilo "dict-no-throw" del grupo eda: NUNCA lanza excepcion, siempre devuelve el
mismo conjunto de claves. Lectura y parseo de fechas 100% defensivos. Solo usa la
libreria estandar (``datetime``, ``statistics``, ``re``).
"""
from __future__ import annotations
import datetime
import re
import statistics
# Frecuencias soportadas, de mas fina a mas gruesa.
_FREQS = ("daily", "weekly", "monthly", "quarterly", "yearly")
# Agregaciones soportadas.
_AGGS = ("mean", "sum", "median", "last", "min", "max")
# Acepta el inicio de una fecha ISO con cualquier separador posterior
# (incluido un caracter raro entre la fecha y la hora).
_DATE_RE = re.compile(r"(\d{4})-(\d{2})-(\d{2})")
def _to_date(x) -> "datetime.date | None":
"""Parsea defensivamente un valor a ``datetime.date``; devuelve None si falla."""
if x is None:
return None
# datetime es subclase de date: comprobarlo primero.
if isinstance(x, datetime.datetime):
return x.date()
if isinstance(x, datetime.date):
return x
s = str(x).strip()
if not s:
return None
# Camino feliz: ISO completo (con o sin hora, con o sin 'Z' final).
try:
s2 = s[:-1] if s.endswith("Z") else s
return datetime.datetime.fromisoformat(s2).date()
except ValueError:
pass
# Fallback robusto: extrae el prefijo YYYY-MM-DD con cualquier separador.
m = _DATE_RE.match(s)
if m:
try:
return datetime.date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
except ValueError:
return None
return None
def _to_number(x) -> "float | None":
"""Convierte a float si es numerico finito; devuelve None en otro caso."""
if x is None:
return None
if isinstance(x, bool):
# bool es subclase de int: lo tratamos como no-numerico para una serie.
return None
try:
f = float(x)
except (TypeError, ValueError):
return None
# Descarta NaN / Inf (no agregables de forma estable).
if f != f or f in (float("inf"), float("-inf")):
return None
return f
def _infer_freq(dates_sorted: list) -> str:
"""Infiere la frecuencia desde el delta mediano (en dias) entre fechas."""
if len(dates_sorted) < 2:
return "daily"
diffs = [
(dates_sorted[i + 1] - dates_sorted[i]).days
for i in range(len(dates_sorted) - 1)
]
diffs = [d for d in diffs if d > 0] # ignora duplicados del mismo dia
if not diffs:
return "daily"
med = statistics.median(diffs)
if med <= 3:
return "daily"
if med <= 16:
return "weekly"
if med <= 75:
return "monthly"
if med <= 200:
return "quarterly"
return "yearly"
def _bucket_start(d: "datetime.date", freq: str) -> "datetime.date":
"""Trunca una fecha al inicio de su bucket segun la frecuencia."""
if freq == "weekly":
return d - datetime.timedelta(days=d.weekday()) # lunes ISO
if freq == "monthly":
return datetime.date(d.year, d.month, 1)
if freq == "quarterly":
first_month = ((d.month - 1) // 3) * 3 + 1
return datetime.date(d.year, first_month, 1)
if freq == "yearly":
return datetime.date(d.year, 1, 1)
return d # daily (o cualquier otra cosa): la propia fecha
def _downsample_indices(n: int, max_points: int) -> list:
"""Indices equiespaciados conservando primero y ultimo (<= max_points)."""
if max_points <= 0 or max_points >= n:
return list(range(n))
if max_points == 1:
return [0]
idx = sorted({round(i * (n - 1) / (max_points - 1)) for i in range(max_points)})
return idx
def _empty(freq_req: str, agg: str) -> dict:
"""Resultado canonico cuando no hay datos suficientes."""
eff_freq = freq_req if freq_req in _FREQS else "auto"
return {
"t": [],
"v": [],
"count": [],
"freq": eff_freq,
"agg": agg if agg in _AGGS else "mean",
"n_in": 0,
"n_buckets": 0,
"downsampled": False,
"note": "datos insuficientes",
}
def resample_timeseries(
t: list,
v: list,
freq: str = "auto",
agg: str = "mean",
max_points: int = 400,
) -> dict:
"""Agrega una serie temporal por periodo (buckets) para graficarla.
Empareja ``t[i]`` con ``v[i]`` por indice, descarta los pares cuya fecha no
parsea, trunca cada fecha al inicio de su bucket segun ``freq`` y agrupa. Por
cada bucket devuelve el valor agregado (``agg`` sobre los valores numericos
validos) y el CONTEO de observaciones con fecha valida (densidad temporal),
independientemente de si su valor numerico es ``None``.
Funcion pura: no hace I/O, no muta los inputs, es determinista, NUNCA lanza.
Args:
t: lista de fechas paralela a ``v``. Acepta strings ISO
(``"YYYY-MM-DD"`` o ``"YYYY-MM-DDTHH:MM:SS"``, con ``Z`` opcional),
``datetime.date`` o ``datetime.datetime``. Se parsea defensivamente;
las fechas que no parsean se descartan junto con su valor.
v: lista de valores numericos (float/int). Puede contener ``None`` o
valores no numericos: estos se ignoran en la agregacion, pero la fila
sigue contando en ``count`` (siempre que su fecha sea valida).
freq: ``"auto"`` (infiere del delta mediano entre fechas) o uno de
``"daily"``, ``"weekly"``, ``"monthly"``, ``"quarterly"``,
``"yearly"``. Una frecuencia desconocida cae a ``"auto"``.
agg: agregacion por bucket: ``"mean"``, ``"sum"``, ``"median"``,
``"last"`` (valor de la observacion cronologicamente mas reciente),
``"min"`` o ``"max"``. Una agregacion desconocida cae a ``"mean"``.
max_points: si tras agregar hay mas buckets que este limite, se hace
downsampling uniforme (1 de cada k buckets equiespaciados,
conservando el primero y el ultimo) para no saturar el grafico.
Returns:
Siempre un dict con las mismas claves::
{
"t": [str, ...], # etiqueta ISO YYYY-MM-DD de cada bucket
"v": [float|None, ...], # valor agregado por bucket (None si vacio)
"count": [int, ...], # nº de observaciones con fecha valida
"freq": str, # frecuencia efectivamente usada
"agg": str, # agregacion usada
"n_in": int, # nº de pares (t,v) con fecha valida
"n_buckets": int, # nº de buckets antes del downsample
"downsampled": bool, # True si se aplico downsampling
"note": str, # "" o nota (p.ej. "datos insuficientes")
}
"""
agg = agg if agg in _AGGS else "mean"
freq_req = freq if isinstance(freq, str) else "auto"
# Validacion de entrada: deben ser listas de igual longitud y no vacias.
if (
not isinstance(t, list)
or not isinstance(v, list)
or len(t) == 0
or len(t) != len(v)
):
return _empty(freq_req, agg)
# Empareja por indice y descarta fechas no parseables.
parsed: list = [] # (date, original_index, number_or_None)
for i, (ti, vi) in enumerate(zip(t, v)):
d = _to_date(ti)
if d is None:
continue
parsed.append((d, i, _to_number(vi)))
n_in = len(parsed)
if n_in == 0:
return _empty(freq_req, agg)
# Resuelve la frecuencia efectiva.
if freq_req in _FREQS:
eff_freq = freq_req
else:
dates_sorted = sorted(d for d, _, _ in parsed)
eff_freq = _infer_freq(dates_sorted)
# Agrupa por bucket.
buckets: dict = {}
for d, idx, num in parsed:
b = _bucket_start(d, eff_freq)
slot = buckets.get(b)
if slot is None:
slot = {"count": 0, "vals": [], "last_key": None, "last_val": None}
buckets[b] = slot
slot["count"] += 1
if num is not None:
slot["vals"].append(num)
key = (d, idx)
if slot["last_key"] is None or key > slot["last_key"]:
slot["last_key"] = key
slot["last_val"] = num
ordered = sorted(buckets.items(), key=lambda kv: kv[0])
n_buckets = len(ordered)
def _aggregate(vals: list, last_val) -> "float | None":
if not vals:
return None
if agg == "sum":
return float(sum(vals))
if agg == "median":
return float(statistics.median(vals))
if agg == "last":
return float(last_val) if last_val is not None else None
if agg == "min":
return float(min(vals))
if agg == "max":
return float(max(vals))
return float(statistics.fmean(vals)) # mean (default)
t_out = [b.isoformat() for b, _ in ordered]
v_out = [_aggregate(s["vals"], s["last_val"]) for _, s in ordered]
c_out = [s["count"] for _, s in ordered]
downsampled = False
if n_buckets > max_points > 0:
keep = _downsample_indices(n_buckets, max_points)
t_out = [t_out[i] for i in keep]
v_out = [v_out[i] for i in keep]
c_out = [c_out[i] for i in keep]
downsampled = True
return {
"t": t_out,
"v": v_out,
"count": c_out,
"freq": eff_freq,
"agg": agg,
"n_in": n_in,
"n_buckets": n_buckets,
"downsampled": downsampled,
"note": "",
}
@@ -0,0 +1,118 @@
"""Tests para resample_timeseries (grupo eda)."""
import datetime
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from resample_timeseries import resample_timeseries
def test_daily_a_mensual_mean():
# Serie diaria agregada a buckets mensuales con agg="mean".
t = [
"2020-01-01", "2020-01-15",
"2020-02-01", "2020-02-10", "2020-02-20",
]
v = [10.0, 20.0, 30.0, 40.0, 50.0]
r = resample_timeseries(t, v, freq="monthly", agg="mean")
assert r["t"] == ["2020-01-01", "2020-02-01"]
assert r["v"] == [15.0, 40.0] # (10+20)/2 ; (30+40+50)/3
assert r["count"] == [2, 3]
assert r["freq"] == "monthly"
assert r["agg"] == "mean"
assert r["n_in"] == 5
assert r["n_buckets"] == 2
assert r["downsampled"] is False
assert r["note"] == ""
def test_agg_sum_y_last():
t = [
"2020-01-01", "2020-01-15",
"2020-02-01", "2020-02-10", "2020-02-20",
]
v = [10.0, 20.0, 30.0, 40.0, 50.0]
r_sum = resample_timeseries(t, v, freq="monthly", agg="sum")
assert r_sum["v"] == [30.0, 120.0]
assert r_sum["agg"] == "sum"
# last = valor de la observacion cronologicamente mas reciente del bucket,
# aunque el orden de entrada este desordenado.
t2 = ["2020-02-20", "2020-02-01", "2020-02-10", "2020-01-15", "2020-01-01"]
v2 = [50.0, 30.0, 40.0, 20.0, 10.0]
r_last = resample_timeseries(t2, v2, freq="monthly", agg="last")
assert r_last["t"] == ["2020-01-01", "2020-02-01"]
assert r_last["v"] == [20.0, 50.0] # Jan->2020-01-15=20 ; Feb->2020-02-20=50
assert r_last["agg"] == "last"
def test_count_cuenta_observacion_con_valor_none():
# Un bucket con un valor None: count cuenta la fila, v ignora el None.
t = ["2020-03-05", "2020-03-06", "2020-03-20"]
v = [None, 7.0, 9.0]
r = resample_timeseries(t, v, freq="monthly", agg="mean")
assert r["t"] == ["2020-03-01"]
assert r["count"] == [3] # 3 filas con fecha valida
assert r["v"] == [8.0] # media de los validos: (7+9)/2
assert r["n_in"] == 3
# Bucket entero sin ningun valor numerico valido -> v = None, count sigue.
r2 = resample_timeseries(
["2020-04-01", "2020-04-02"], [None, "n/a"], freq="monthly"
)
assert r2["t"] == ["2020-04-01"]
assert r2["count"] == [2]
assert r2["v"] == [None]
def test_downsampling_respeta_max_points_y_extremos():
base = datetime.date(2021, 1, 1)
t = [(base + datetime.timedelta(days=i)).isoformat() for i in range(500)]
v = [float(i) for i in range(500)]
r = resample_timeseries(t, v, freq="daily", agg="mean", max_points=400)
assert r["n_buckets"] == 500
assert r["downsampled"] is True
assert len(r["t"]) <= 400
assert len(r["t"]) == len(r["v"]) == len(r["count"])
# Primero y ultimo bucket conservados.
assert r["t"][0] == "2021-01-01"
assert r["t"][-1] == (base + datetime.timedelta(days=499)).isoformat()
def test_freq_auto_infiere_mensual():
# Fechas separadas ~1 mes -> auto infiere "monthly".
t = [f"2022-{m:02d}-01" for m in range(1, 13)]
v = [float(m) for m in range(1, 13)]
r = resample_timeseries(t, v, freq="auto", agg="mean")
assert r["freq"] == "monthly"
assert r["n_buckets"] == 12
assert r["count"] == [1] * 12
# Fechas diarias consecutivas -> auto infiere "daily".
base = datetime.date(2023, 1, 1)
td = [(base + datetime.timedelta(days=i)).isoformat() for i in range(20)]
rd = resample_timeseries(td, [float(i) for i in range(20)], freq="auto")
assert rd["freq"] == "daily"
def test_edge_listas_vacias_o_desiguales():
vacio = resample_timeseries([], [])
assert vacio["t"] == [] and vacio["v"] == [] and vacio["count"] == []
assert vacio["note"] == "datos insuficientes"
assert vacio["n_in"] == 0 and vacio["n_buckets"] == 0
desigual = resample_timeseries(["2020-01-01", "2020-01-02"], [1.0])
assert desigual["note"] == "datos insuficientes"
assert desigual["t"] == []
# Todas las fechas invalidas -> tambien insuficiente.
invalidas = resample_timeseries(["no-fecha", "tampoco"], [1.0, 2.0])
assert invalidas["note"] == "datos insuficientes"
assert invalidas["n_in"] == 0