feat(eda): capítulo TIMESERIES del AutomaticEDA (evolución + análisis de serie)

Capítulo nuevo build_timeseries(profile, ctx) -> Chapter|None del motor
AutomaticEDA. Cuando la tabla tiene columna de fecha/datetime, grafica la
evolución de cada columna numérica por periodo (valor agregado + conteo de filas)
y los paneles de descomposición STL y autocorrelación (ACF), con el análisis de
la serie: estacionariedad (ADF+KPSS), autocorrelación (Ljung-Box), fuerzas de
tendencia/estacionalidad (Hyndman) y la transformación sugerida (retornos o
diferencias) para evitar correlaciones espurias. Sin columna temporal devuelve
None. Consolida series OHLC casi idénticas en un único gráfico conservando el
análisis de cada columna.

La serie cruda llega por ctx['timeseries_raw'] (mismo patrón que modelos con
raw_numeric); las figuras son perezosas (Figure.make) y el paginador del núcleo
garantiza no-corte en PDF y PPTX. CHAPTER_VERSION 1.0.0.

Cubre los MUST del diseño (report 2043): MUST-9.1 (línea valor-vs-tiempo + conteo
por periodo), MUST-9.2 (paneles STL + ACF), MUST-9.3 (perfil datetime +
consolidación OHLC).

Funciones nuevas del registry (grupo eda), delegadas a fn-constructor, no inline:
- detect_time_column (pure): detecta la columna temporal y las numéricas
- profile_datetime (pure): rango/frecuencia/regularidad/huecos de la fecha
- resample_timeseries (pure): agrega la serie por periodo + conteo
- extract_timeseries_raw (impure): lee la serie cruda ordenada de DuckDB/PG

Verificación: 69 tests verdes (capítulo 9 + funciones 28 + núcleo/renderers);
golden real sobre seattle-weather (estacional) y aapl (OHLC) con PDF+PPTX sin
cortar nada (cols_cortadas=[]).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-30 15:35:42 +02:00
parent 415154d9a3
commit a69d14d38e
15 changed files with 2324 additions and 0 deletions
@@ -0,0 +1,613 @@
"""Time-series chapter (TIMESERIES) for AutomaticEDA.
This chapter applies **only when the table has a date/datetime column**. When it
does, it draws — exactly the user requirement — the evolution of the data over
time (the value of each numeric column aggregated per period *and* the count of
rows per period) plus the statistical analysis of the series (stationarity,
autocorrelation, trend and seasonality). When there is no temporal column
``build_timeseries`` returns ``None``.
Data sources, read defensively and never recomputed here:
- ``profile['columns']`` — to detect the time column and the numeric columns.
Delegated to the pure registry function ``detect_time_column`` (group ``eda``).
- ``profile['series'][col]`` — the per-column time-series analysis already
produced by ``profile_table(run_series=True)``: ``stationarity`` (ADF+KPSS),
``acf_pacf`` (ACF/PACF + Ljung-Box), ``stl`` (trend/seasonal/resid +
Hyndman strengths) and the levels/returns suggestion.
- ``ctx['timeseries_raw']`` (or ``profile['timeseries_raw']``) — the *raw* ordered
series ``{time_col, t:[iso...], series:{col:[float|None]}}`` needed to draw the
value-vs-time line and the per-period row count. Exactly like ``modelos`` reads
``raw_numeric`` from ``ctx``, this chapter looks for the raw series there and
degrades honestly when it is absent (it still renders the textual analysis).
The raw series is aggregated per period with the pure registry function
``resample_timeseries`` and the datetime header is built with ``profile_datetime``
(both group ``eda``). Every figure is emitted as a lazy ``Figure`` so the
renderers rasterize and scale it to fit a whole page/slide; tables go through
``DataTable``/``KVTable`` so the paginator splits them repeating the header. No
content is ever cut.
ctx keys this chapter consumes (all optional):
timeseries_raw : dict — ``{time_col, t:[...], series:{col:[...]}}`` raw
ordered series used to draw the value-vs-time line and the row-count
panel. When absent the chapter omits those figures (with a note) and
renders only the analysis available in ``profile['series']``.
Contract: build_<id>(profile, ctx) -> Chapter | None ; CHAPTER_VERSION = "x.y.z".
Reads everything defensively (``.get``) and never raises.
"""
from __future__ import annotations
from .. import model
# Pure/impure registry functions (group ``eda``) consumed by this chapter,
# imported defensively so the chapter still builds (degrading the affected
# section to a note) if any of them is somehow unavailable.
try:
from datascience.detect_time_column import detect_time_column
except Exception: # noqa: BLE001 — keep the chapter importable no matter what.
detect_time_column = None # type: ignore[assignment]
try:
from datascience.profile_datetime import profile_datetime
except Exception: # noqa: BLE001
profile_datetime = None # type: ignore[assignment]
try:
from datascience.resample_timeseries import resample_timeseries
except Exception: # noqa: BLE001
resample_timeseries = None # type: ignore[assignment]
CHAPTER_VERSION = "1.0.0"
CHAPTER_ID = "timeseries"
CHAPTER_TITLE = "Series temporales"
# Plain-Spanish gloss for the stationarity verdict of adf_kpss_stationarity.
_VERDICT_GLOSS = {
"stationary": "estacionaria: media y varianza estables en el tiempo; se "
"puede modelar directamente.",
"non_stationary": "no estacionaria: tiene tendencia o varianza cambiante "
"(raíz unitaria). Correlacionar o modelar sus niveles "
"produce relaciones espurias (Granger-Newbold); conviene "
"diferenciar o pasar a retornos.",
"inconclusive": "resultado no concluyente (ADF y KPSS discrepan): tratar con "
"cautela, probablemente cerca de la no estacionariedad.",
}
# OHLC-style name fragments used to collapse near-identical financial series.
_OHLC_HINTS = ("open", "high", "low", "close", "adj", "price", "vwap")
def _fmt_num(value, decimals: int = 3) -> str:
"""Compact, defensive number formatting shared with the other chapters."""
if value is None:
return ""
if isinstance(value, bool):
return "" if value else "no"
if isinstance(value, int):
return f"{value:,}".replace(",", ".")
if isinstance(value, float):
if value != value: # NaN
return "NaN"
if value in (float("inf"), float("-inf")):
return str(value)
text = f"{value:.{decimals}f}".rstrip("0").rstrip(".")
return text if text else "0"
return model._safe_str(value)
def _is_dict(v) -> bool:
return isinstance(v, dict)
# --------------------------------------------------------------------------- #
# Detection: which column is the time axis and which numeric columns to chart.
# --------------------------------------------------------------------------- #
def _detect(cols: list) -> dict:
"""Return ``{time_col, numeric_cols, ...}`` via the registry function.
Falls back to an inline scan (datetime inferred_type / datetime semantic
types) when ``detect_time_column`` is unavailable, so the chapter still works.
"""
if detect_time_column is not None:
try:
res = detect_time_column(cols)
if _is_dict(res):
return res
except Exception: # noqa: BLE001 — degrade to the inline scan.
pass
time_col = None
numeric_cols = []
for c in cols or []:
if not _is_dict(c):
continue
it = c.get("inferred_type")
sem = c.get("semantic_type")
if time_col is None and (
it == "datetime" or sem in ("datetime_iso", "date_eu")):
time_col = c.get("name")
if it == "numeric":
numeric_cols.append(c.get("name"))
return {"time_col": time_col, "numeric_cols": numeric_cols,
"time_semantic": "", "reason": "inline fallback"}
def _raw_series_for(raw: dict, col: str):
"""Return (t_list, v_list) for a column from the raw bundle, or (None, None)."""
if not _is_dict(raw):
return None, None
t = raw.get("t")
series = raw.get("series") if _is_dict(raw.get("series")) else {}
v = series.get(col)
if isinstance(t, list) and isinstance(v, list) and t and len(t) == len(v):
return t, v
return None, None
def _ohlc_groups(numeric_cols: list, raw: dict) -> dict:
"""Map each numeric column to a representative to collapse OHLC duplicates.
When several numeric columns are near-identical financial level series
(open/high/low/close/adj close), charting each one repeats the same figure
four times. We keep the first OHLC-looking column as the representative for
the *figures* and list the collapsed ones in a note; the textual analysis is
still produced for every column. Detection is by name only (cheap, no extra
data dependency) and conservative: only collapses when >=2 OHLC-like names
are present.
"""
ohlc = [c for c in numeric_cols
if isinstance(c, str) and any(h in c.lower() for h in _OHLC_HINTS)]
if len(ohlc) < 2:
return {}
representative = ohlc[0]
return {c: representative for c in ohlc if c != representative}
# --------------------------------------------------------------------------- #
# Datetime header (MUST-9.3): range / frequency / regularity / gaps.
# --------------------------------------------------------------------------- #
def _datetime_header(time_col: str, raw: dict) -> list:
"""Build the datetime profile header from the raw time axis, when present."""
blocks: list = []
t, _ = (raw.get("t"), None) if _is_dict(raw) else (None, None)
if not (isinstance(t, list) and t and profile_datetime is not None):
return blocks
try:
dt = profile_datetime(t)
except Exception: # noqa: BLE001
return blocks
if not _is_dict(dt):
return blocks
freq_gloss = {
"daily": "diaria", "weekly": "semanal", "monthly": "mensual",
"quarterly": "trimestral", "yearly": "anual",
"irregular": "irregular", "unknown": "indeterminada",
}
rows = [
("Columna de fecha", model._safe_str(time_col)),
("Rango", f"{model._safe_str(dt.get('min'))}"
f"{model._safe_str(dt.get('max'))}"),
("Observaciones", _fmt_num(dt.get("n"))),
("Fechas distintas", _fmt_num(dt.get("n_distinct"))),
("Frecuencia", freq_gloss.get(dt.get("freq"), model._safe_str(dt.get("freq")))),
("Regular", "" if dt.get("is_regular") else "no"),
]
span = dt.get("span_days")
if span is not None:
rows.append(("Duración (días)", _fmt_num(span, 1)))
n_gaps = dt.get("n_gaps")
if n_gaps is not None:
rows.append(("Huecos en la rejilla", _fmt_num(n_gaps)))
blocks.append(model.KVTable(rows=rows, title="Perfil temporal"))
note = dt.get("note")
if note:
blocks.append(model.Note(model._safe_str(note)))
return blocks
# --------------------------------------------------------------------------- #
# Figure builders (lazy: matplotlib only imported when the renderer draws them).
# --------------------------------------------------------------------------- #
def _parse_dates(labels: list):
"""Parse a list of ISO-ish strings/dates to datetime, dropping unparseable.
Returns (dates, kept_index) so callers can align the values list.
"""
from datetime import date, datetime
out = []
keep = []
for i, lab in enumerate(labels):
if isinstance(lab, datetime):
out.append(lab)
keep.append(i)
continue
if isinstance(lab, date):
out.append(datetime(lab.year, lab.month, lab.day))
keep.append(i)
continue
s = model._safe_str(lab).strip()
if not s:
continue
s2 = s.replace("T", " ")
parsed = None
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
try:
parsed = datetime.strptime(s2[:len(fmt) + 4] if False else s2, fmt)
break
except ValueError:
continue
if parsed is None:
try:
parsed = datetime.fromisoformat(s.replace("T", " "))
except ValueError:
continue
out.append(parsed)
keep.append(i)
return out, keep
def _make_evolution_figure(name: str, rs: dict):
"""Lazy callable: value-vs-time line + per-period row-count panel (MUST-9.1)."""
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
t_labels = rs.get("t") or []
v = rs.get("v") or []
counts = rs.get("count") or []
dates, keep = _parse_dates(t_labels)
vv = [v[i] if i < len(v) else None for i in keep]
cc = [counts[i] if i < len(counts) else 0 for i in keep]
fig, (ax_v, ax_c) = plt.subplots(
2, 1, figsize=(7.0, 4.6), sharex=True,
gridspec_kw={"height_ratios": [3.0, 1.2], "hspace": 0.12})
# Top: value aggregated per period (line; gaps where the value is None).
xs = [d for d, val in zip(dates, vv) if val is not None]
ys = [val for val in vv if val is not None]
if xs and ys:
ax_v.plot(xs, ys, color="#4e79a7", linewidth=1.4, zorder=3)
ax_v.fill_between(xs, ys, min(ys), color="#9ec6df", alpha=0.18,
zorder=1)
else:
ax_v.text(0.5, 0.5, "(sin valores numéricos)", ha="center",
va="center", fontsize=9, color="#8a8a8a",
transform=ax_v.transAxes)
ax_v.set_ylabel(name, fontsize=8)
ax_v.tick_params(labelsize=7)
ax_v.grid(axis="y", color="#eeeeee", linewidth=0.6)
for spine in ("top", "right"):
ax_v.spines[spine].set_visible(False)
# Bottom: number of observations per period (density / gaps).
if dates and cc:
# Bar width ~ median spacing so bars do not overlap nor leave gaps.
width = 1.0
if len(dates) > 1:
deltas = sorted((dates[i + 1] - dates[i]).days
for i in range(len(dates) - 1))
width = max(deltas[len(deltas) // 2] * 0.8, 1.0)
ax_c.bar(dates, cc, width=width, color="#59a14f", alpha=0.75,
align="center")
ax_c.set_ylabel("nº filas", fontsize=8)
ax_c.tick_params(labelsize=7)
ax_c.grid(axis="y", color="#eeeeee", linewidth=0.6)
for spine in ("top", "right"):
ax_c.spines[spine].set_visible(False)
ax_c.xaxis.set_major_locator(mdates.AutoDateLocator())
ax_c.xaxis.set_major_formatter(mdates.ConciseDateFormatter(
ax_c.xaxis.get_major_locator()))
freq = rs.get("freq")
suptitle = f"{name} — evolución temporal"
if freq:
suptitle += f" (agregado {freq})"
fig.suptitle(suptitle, fontsize=10, fontweight="bold", x=0.02, ha="left")
return fig
return _draw
def _make_stl_figure(stl: dict):
"""Lazy callable: the STL trend/seasonal/resid panels, or None if no values.
``stl_decompose`` only carries the component *values* for short series; for
long ones it returns just summary stats (``note``). In that case there is
nothing to plot and we return None (the caller renders the strengths as text).
"""
def _component_values(comp):
if _is_dict(comp):
vals = comp.get("values")
if isinstance(vals, list) and vals:
return [x for x in vals]
return None
trend = _component_values(stl.get("trend"))
seasonal = _component_values(stl.get("seasonal"))
resid = _component_values(stl.get("resid"))
if not any([trend, seasonal, resid]):
return None
def _draw():
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
panels = [("Tendencia", trend, "#4e79a7"),
("Estacional", seasonal, "#59a14f"),
("Resto", resid, "#e15759")]
panels = [(lbl, vals, col) for lbl, vals, col in panels if vals]
fig, axes = plt.subplots(len(panels), 1, figsize=(7.0, 1.4 * len(panels) + 0.6),
sharex=True)
if len(panels) == 1:
axes = [axes]
for ax, (lbl, vals, col) in zip(axes, panels):
ax.plot(range(len(vals)), vals, color=col, linewidth=1.2)
ax.set_ylabel(lbl, fontsize=8)
ax.tick_params(labelsize=7)
ax.grid(axis="y", color="#eeeeee", linewidth=0.6)
for spine in ("top", "right"):
ax.spines[spine].set_visible(False)
axes[-1].set_xlabel("índice temporal", fontsize=8)
fig.suptitle("Descomposición STL", fontsize=10, fontweight="bold",
x=0.02, ha="left")
fig.tight_layout(rect=(0, 0, 1, 0.96))
return fig
return _draw
def _make_acf_figure(acf_pacf: dict):
"""Lazy callable: the ACF stem plot with ±1.96/√n bands, or None."""
acf = acf_pacf.get("acf")
n = acf_pacf.get("n")
if not (isinstance(acf, list) and len(acf) > 1 and isinstance(n, int) and n > 0):
return None
def _draw():
import math
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
lags = list(range(len(acf)))
fig, ax = plt.subplots(figsize=(7.0, 3.2))
ax.vlines(lags, 0, acf, color="#4e79a7", linewidth=1.4)
ax.plot(lags, acf, "o", color="#4e79a7", markersize=3)
band = 1.96 / math.sqrt(n)
ax.axhspan(-band, band, color="#cccccc", alpha=0.3,
label="banda ±1.96/√n (ruido blanco)")
ax.axhline(0, color="#888888", linewidth=0.8)
ax.set_xlabel("retardo (lag)", fontsize=8)
ax.set_ylabel("ACF", fontsize=8)
ax.tick_params(labelsize=7)
ax.legend(fontsize=7, loc="upper right", framealpha=0.85)
ax.set_title("Autocorrelación (ACF): lags fuera de la banda = "
"correlación significativa", fontsize=9)
fig.tight_layout()
return fig
return _draw
# --------------------------------------------------------------------------- #
# Per-column textual analysis from profile['series'][col].
# --------------------------------------------------------------------------- #
def _analysis_markdown(sblock: dict) -> str:
"""One markdown block summarizing stationarity / autocorrelation / STL."""
parts: list = []
stat = sblock.get("stationarity") if _is_dict(sblock.get("stationarity")) else {}
verdict = stat.get("verdict")
if verdict:
adf = stat.get("adf") if _is_dict(stat.get("adf")) else {}
kpss = stat.get("kpss") if _is_dict(stat.get("kpss")) else {}
line = (f"**Estacionariedad:** {_VERDICT_GLOSS.get(verdict, verdict)} "
f"(ADF p={_fmt_num(adf.get('p_value'), 4)}, "
f"KPSS p={_fmt_num(kpss.get('p_value'), 4)}).")
warning = stat.get("warning")
if warning:
line += f"{model._safe_str(warning)}"
parts.append(line)
acf = sblock.get("acf_pacf") if _is_dict(sblock.get("acf_pacf")) else {}
if acf:
is_auto = acf.get("is_autocorrelated")
lb = acf.get("ljung_box") if _is_dict(acf.get("ljung_box")) else {}
sig = acf.get("significant_acf_lags") or []
if is_auto is True:
ac_line = ("**Autocorrelación:** la serie está autocorrelada "
"(Ljung-Box rechaza independencia, "
f"p={_fmt_num(lb.get('p_value'), 4)}): los valores dependen "
"de su pasado, no es ruido blanco.")
if sig:
shown = ", ".join(str(x) for x in sig[:8])
more = "" if len(sig) > 8 else ""
ac_line += f" Lags significativos: {shown}{more}."
elif is_auto is False:
ac_line = ("**Autocorrelación:** no se detecta autocorrelación "
"significativa (compatible con ruido blanco, Ljung-Box "
f"p={_fmt_num(lb.get('p_value'), 4)}).")
else:
ac_line = "**Autocorrelación:** no evaluable (datos insuficientes)."
parts.append(ac_line)
stl = sblock.get("stl") if _is_dict(sblock.get("stl")) else {}
if stl:
ts = stl.get("trend_strength")
ss = stl.get("seasonal_strength")
if ts is not None or ss is not None:
parts.append(
"**Descomposición STL:** fuerza de tendencia "
f"{_fmt_num(ts, 2)} y fuerza estacional {_fmt_num(ss, 2)} "
"(escala 01 de Hyndman: cuanto más alto, más marcada la "
"componente).")
elif stl.get("note"):
parts.append(f"**Descomposición STL:** {model._safe_str(stl.get('note'))}")
if sblock.get("levels_suggested"):
reason = sblock.get("levels_reason")
kind = sblock.get("levels_kind")
tr = sblock.get("to_returns") if _is_dict(sblock.get("to_returns")) else None
line = "**Transformación sugerida:** "
line += "pasar a retornos" if kind == "returns" else "diferenciar la serie"
if reason:
line += f"{model._safe_str(reason)}"
if tr and tr.get("mean") is not None:
line += (f" (retornos: media {_fmt_num(tr.get('mean'), 5)}, "
f"σ {_fmt_num(tr.get('std'), 5)}).")
parts.append(line)
return "\n\n".join(parts)
# --------------------------------------------------------------------------- #
# Per-column section.
# --------------------------------------------------------------------------- #
def _column_section(name: str, sblock: dict, raw: dict, collapsed_into) -> list:
"""Blocks for one numeric column: evolution figure + STL + ACF + analysis."""
blocks = [model.Heading(text=model._safe_str(name), level=2)]
# --- Value-vs-time line + per-period row count (MUST-9.1). ---
drew_evolution = False
if collapsed_into is None: # skip the figure for collapsed OHLC duplicates.
t, v = _raw_series_for(raw, name)
if t is not None and resample_timeseries is not None:
try:
rs = resample_timeseries(t, v)
except Exception: # noqa: BLE001
rs = None
if _is_dict(rs) and rs.get("t"):
blocks.append(model.Figure(
make=_make_evolution_figure(name, rs),
caption=f"Evolución de «{name}» por periodo y nº de "
f"observaciones (conteo de filas)."))
drew_evolution = True
else:
blocks.append(model.Note(
f"Serie casi idéntica a «{collapsed_into}» (grupo OHLC): se omite el "
"gráfico para no repetirlo; el análisis estadístico se mantiene."))
if not drew_evolution and collapsed_into is None:
blocks.append(model.Note(
"Gráfico de evolución temporal no disponible: falta la serie cruda "
"(pásala en ctx['timeseries_raw'] = {time_col, t, series}). Se "
"muestra solo el análisis estadístico."))
# --- STL panels (MUST-9.2). ---
stl = sblock.get("stl") if _is_dict(sblock.get("stl")) else {}
if collapsed_into is None and stl:
stl_fig = _make_stl_figure(stl)
if stl_fig is not None:
blocks.append(model.Figure(
make=stl_fig,
caption=f"Descomposición STL de «{name}»: tendencia, componente "
f"estacional y resto."))
# --- ACF figure (autocorrelation structure). ---
acf = sblock.get("acf_pacf") if _is_dict(sblock.get("acf_pacf")) else {}
if collapsed_into is None and acf:
acf_fig = _make_acf_figure(acf)
if acf_fig is not None:
blocks.append(model.Figure(
make=acf_fig,
caption=f"Función de autocorrelación de «{name}»."))
# --- Textual analysis (always, even for collapsed duplicates). ---
analysis = _analysis_markdown(sblock)
if analysis:
blocks.append(model.Markdown(text=analysis))
return blocks
# --------------------------------------------------------------------------- #
# Entry point.
# --------------------------------------------------------------------------- #
def build_timeseries(profile: dict, ctx: dict):
"""Build the TIMESERIES Chapter, or ``None`` if the table has no date column.
Args:
profile: the ``eda`` group TableProfile dict.
ctx: presentation context; ``ctx['timeseries_raw']`` (optional) carries
the raw ordered series used to draw the value-vs-time line and the
per-period row count.
Returns:
A ``model.Chapter`` with, per numeric column, the value-vs-time evolution
+ row-count figure, the STL panels, the ACF figure and the statistical
analysis; or ``None`` when there is no temporal column (the chapter does
not apply).
"""
profile = profile or {}
if not _is_dict(profile):
profile = {}
ctx = ctx or {}
cols = profile.get("columns") or []
det = _detect(cols)
time_col = det.get("time_col")
if not time_col:
return None # no date/datetime column -> chapter does not apply.
numeric_cols = det.get("numeric_cols") or []
series_map = profile.get("series") if _is_dict(profile.get("series")) else {}
raw = ctx.get("timeseries_raw") or profile.get("timeseries_raw")
raw = raw if _is_dict(raw) else {}
# Which columns can the chapter say anything about: those with a series
# analysis block and/or a raw series to chart. Preserve the profile order.
chartable = []
for name in numeric_cols:
has_analysis = _is_dict(series_map.get(name))
has_raw, _ = _raw_series_for(raw, name)
if has_analysis or has_raw is not None:
chartable.append(name)
if not chartable:
# A date column exists but nothing numeric to chart/analyse: still a
# valid (small) chapter — show just the datetime header if we have it.
header = _datetime_header(time_col, raw)
if not header:
return None
intro = (
f"La tabla tiene una columna temporal («{time_col}») pero no hay "
"columnas numéricas con serie analizable.")
blocks = [model.Heading(text=CHAPTER_TITLE, level=1),
model.Markdown(text=intro)] + header
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
collapsed = _ohlc_groups(chartable, raw)
intro = (
"Este capítulo analiza la evolución de la tabla en el tiempo usando la "
f"columna de fecha «{time_col}». Para cada columna numérica se muestra su "
"**evolución por periodo** (valor agregado) junto al **número de filas por "
"periodo** (densidad de observaciones), su **descomposición STL** "
"(tendencia / estacionalidad / resto) y la **función de autocorrelación**; "
"debajo, el análisis de la serie: estacionariedad (ADF + KPSS), "
"autocorrelación (Ljung-Box) y, cuando procede, la transformación "
"sugerida (retornos o diferencias) para evitar correlaciones espurias.")
blocks = [model.Heading(text=CHAPTER_TITLE, level=1),
model.Markdown(text=intro)]
blocks += _datetime_header(time_col, raw)
if collapsed:
reps = sorted(set(collapsed.values()))
collapsed_names = ", ".join(sorted(collapsed.keys()))
blocks.append(model.Note(
f"Series OHLC casi idénticas detectadas ({collapsed_names}): se "
f"grafican consolidadas en «{', '.join(reps)}» para no repetir el "
"mismo gráfico; cada columna conserva su análisis estadístico."))
for name in chartable:
sblock = series_map.get(name) if _is_dict(series_map.get(name)) else {}
blocks += _column_section(name, sblock, raw, collapsed.get(name))
return model.Chapter(id=CHAPTER_ID, title=CHAPTER_TITLE,
version=CHAPTER_VERSION, blocks=blocks)
@@ -0,0 +1,244 @@
"""Tests for the TIMESERIES chapter — DoD: golden + edges + anti-cut.
Self-contained: builds synthetic ``series`` blocks (shaped like
``profile_table(run_series=True)`` output) and a raw ``timeseries_raw`` bundle,
with no DuckDB, so the suite is fast and deterministic. Verifies that the chapter:
- returns ``None`` when there is no date/datetime column (the user requirement);
- never raises on ``None``/empty/garbage input;
- with a date column + raw series emits, per numeric column, the value-vs-time +
row-count evolution figure, the STL panels, the ACF figure and the textual
analysis (stationarity / autocorrelation / suggested transform);
- collapses near-identical OHLC series into one chart while keeping every
column's analysis;
- renders without cutting anything in both PDF and PPTX (every column heading
survives in the rendered output).
"""
import math
import os
import re
import tempfile
from pypdf import PdfReader
from datascience.automatic_eda.chapters.timeseries import (
build_timeseries, CHAPTER_VERSION, _VERDICT_GLOSS,
)
from datascience.render_automatic_eda_pdf import render_automatic_eda_pdf
from datascience.render_automatic_eda_pptx import render_automatic_eda_pptx
# --------------------------------------------------------------------------- #
# Synthetic fixtures shaped like the real profile_table(run_series=True) output.
# --------------------------------------------------------------------------- #
def _dates(n: int) -> list:
"""n consecutive daily ISO date strings starting 2021-01-01."""
from datetime import date, timedelta
start = date(2021, 1, 1)
return [(start + timedelta(days=i)).isoformat() for i in range(n)]
def _series_block(n=120, verdict="non_stationary", autocorr=True, levels=True,
with_stl_values=True):
"""A synthetic ``series`` block like _build_series_block produces."""
trend = [float(i) for i in range(n)]
seasonal = [math.sin(i / 6.0) for i in range(n)]
resid = [0.1 * ((-1) ** i) for i in range(n)]
acf = [1.0] + [max(0.0, 0.9 - 0.05 * k) for k in range(1, 21)]
block = {
"order_col": "fecha",
"ordered": True,
"n": n,
"stationarity": {
"n": n, "verdict": verdict,
"adf": {"p_value": 0.42, "stationary": False},
"kpss": {"p_value": 0.01, "stationary": False},
"warning": ("serie no estacionaria: riesgo de correlación espuria"
if verdict != "stationary" else None),
},
"acf_pacf": {
"n": n, "nlags": 20, "acf": acf,
"significant_acf_lags": [1, 2, 3, 4, 5],
"ljung_box": {"stat": 123.4, "p_value": 0.0 if autocorr else 0.7,
"lags": 20},
"is_autocorrelated": autocorr,
},
"period_source": "datetime_freq",
"stl": {
"n": n, "period": 7, "period_inferred": False, "robust": False,
"trend": {"values": trend} if with_stl_values else {
"note": "serie larga: solo estadisticos", "mean": 60.0},
"seasonal": {"values": seasonal} if with_stl_values else {"mean": 0.0},
"resid": {"values": resid} if with_stl_values else {"mean": 0.0},
"trend_strength": 0.95, "seasonal_strength": 0.42,
},
}
if levels:
block["levels_suggested"] = True
block["levels_kind"] = "returns"
block["levels_reason"] = ("columna financiera no estacionaria: usar "
"retornos evita correlación espuria.")
block["to_returns"] = {"method": "log", "mean": 0.001, "std": 0.02}
else:
block["levels_suggested"] = False
return block
def _profile(numeric_names=("precio",), n=120, with_stl_values=True):
cols = [{"name": "fecha", "inferred_type": "datetime",
"semantic_type": "datetime_iso"}]
series_map = {}
for nm in numeric_names:
cols.append({"name": nm, "inferred_type": "numeric",
"numeric": {"min": 1.0, "max": 200.0, "mean": 100.0,
"median": 95.0, "std": 40.0}})
series_map[nm] = _series_block(n=n, with_stl_values=with_stl_values)
return {"table": "cotizaciones", "n_rows": n, "n_cols": len(cols),
"columns": cols, "series": series_map}
def _ctx_raw(numeric_names=("precio",), n=120):
t = _dates(n)
series = {}
for j, nm in enumerate(numeric_names):
series[nm] = [float(100 + i + 5 * j) for i in range(n)]
return {"timeseries_raw": {"time_col": "fecha", "t": t, "series": series}}
def _pdf_text(path: str) -> str:
txt = "".join((pg.extract_text() or "") for pg in PdfReader(path).pages)
return re.sub(r"\s+", " ", txt)
# --------------------------------------------------------------------------- #
# Golden.
# --------------------------------------------------------------------------- #
def test_golden_estructura_y_figuras():
ch = build_timeseries(_profile(("precio",)), _ctx_raw(("precio",)))
assert ch is not None
assert ch.id == "timeseries"
assert ch.version == CHAPTER_VERSION
kinds = [b.kind for b in ch.blocks]
assert kinds[0] == "heading" # chapter title
assert kinds[1] == "markdown" # intro
assert "kv_table" in kinds # datetime profile header (MUST-9.3)
# Per column: evolution figure + STL figure + ACF figure + analysis markdown.
figs = [b for b in ch.blocks if b.kind == "figure"]
assert len(figs) >= 3, "evolución + STL + ACF esperadas"
# Lazy makers must produce real matplotlib figures.
import matplotlib.pyplot as plt
for f in figs:
fig = f.make()
assert fig is not None
plt.close(fig)
def test_golden_evolucion_tiene_dos_paneles_valor_y_conteo():
# MUST-9.1: the evolution figure has a value panel + a row-count panel.
ch = build_timeseries(_profile(("precio",)), _ctx_raw(("precio",)))
figs = [b for b in ch.blocks if b.kind == "figure"]
import matplotlib.pyplot as plt
fig = figs[0].make() # first figure is the evolution one.
assert len(fig.axes) == 2, "panel de valor + panel de conteo de filas"
plt.close(fig)
def test_golden_analisis_textual_presente():
ch = build_timeseries(_profile(("precio",)), _ctx_raw(("precio",)))
md = " ".join(b.text for b in ch.blocks if b.kind == "markdown")
assert "Estacionariedad" in md
assert "Autocorrelación" in md
assert "STL" in md
# Verdict gloss surfaced for the non-stationary preset.
assert _VERDICT_GLOSS["non_stationary"].split(":")[0] in md
# Levels/returns suggestion surfaced.
assert "retornos" in md.lower()
# --------------------------------------------------------------------------- #
# Edges.
# --------------------------------------------------------------------------- #
def test_edge_sin_columna_fecha_devuelve_none():
prof = {"columns": [
{"name": "precio", "inferred_type": "numeric", "numeric": {"mean": 1.0}},
{"name": "ciudad", "inferred_type": "categorical",
"categorical": {"top": []}},
], "series": {"precio": _series_block()}}
assert build_timeseries(prof, {}) is None
def test_edge_none_y_vacio_no_revienta():
assert build_timeseries(None, None) is None
assert build_timeseries({}, {}) is None
assert build_timeseries({"columns": []}, {}) is None
# Date column but nothing numeric/series and no raw -> None (nothing to say).
assert build_timeseries(
{"columns": [{"name": "fecha", "inferred_type": "datetime"}]}, {}) is None
def test_edge_sin_raw_degrada_pero_mantiene_analisis():
# No ctx['timeseries_raw']: the chapter must still build (STL/ACF/analysis
# from the profile) and note that the evolution chart is unavailable.
ch = build_timeseries(_profile(("precio",)), {})
assert ch is not None
notes = " ".join(b.text for b in ch.blocks if b.kind == "note")
assert "evolución temporal no disponible" in notes
md = " ".join(b.text for b in ch.blocks if b.kind == "markdown")
assert "Estacionariedad" in md
def test_edge_stl_solo_estadisticos_no_dibuja_panel_pero_no_revienta():
# Long series: STL carries only stats (no 'values') -> no STL figure, but the
# strengths still surface in the textual analysis.
ch = build_timeseries(_profile(("precio",), with_stl_values=False),
_ctx_raw(("precio",)))
assert ch is not None
md = " ".join(b.text for b in ch.blocks if b.kind == "markdown")
assert "STL" in md
# --------------------------------------------------------------------------- #
# OHLC consolidation (MUST-9.3).
# --------------------------------------------------------------------------- #
def test_ohlc_consolidacion():
names = ("Open", "High", "Low", "Close")
ch = build_timeseries(_profile(names), _ctx_raw(names))
assert ch is not None
notes = " ".join(b.text for b in ch.blocks if b.kind == "note")
assert "OHLC" in notes
# Only the representative draws the evolution figure; the other 3 are collapsed
# so there are fewer evolution figures than columns.
captions = [b.caption or "" for b in ch.blocks if b.kind == "figure"]
evo = [c for c in captions if "Evolución" in c]
assert len(evo) < len(names), "las series OHLC deben consolidarse"
# Every column still has its analysis markdown (one heading per column).
headings = [b.text for b in ch.blocks if b.kind == "heading" and b.level == 2]
for nm in names:
assert nm in headings
# --------------------------------------------------------------------------- #
# Anti-cut: PDF + PPTX.
# --------------------------------------------------------------------------- #
def test_anti_corte_pdf_y_pptx():
names = tuple(f"serie_{i}" for i in range(6))
prof = _profile(names, n=90)
ctx = _ctx_raw(names, n=90)
ch = build_timeseries(prof, ctx)
col_headings = [b.text for b in ch.blocks if b.kind == "heading" and b.level == 2]
assert len(col_headings) == 6
with tempfile.TemporaryDirectory() as d:
pdf = os.path.join(d, "ts.pdf")
res_pdf = render_automatic_eda_pdf(
prof, pdf, {"ctx": ctx, "write_manifest": False})
assert res_pdf["path"] == pdf
txt = _pdf_text(pdf)
for nm in col_headings:
assert nm in txt, f"columna '{nm}' cortada/ausente en el PDF"
pptx = os.path.join(d, "ts.pptx")
res_pptx = render_automatic_eda_pptx(
prof, pptx, {"ctx": ctx, "write_manifest": False})
assert res_pptx["path"] == pptx
assert res_pptx["n_slides"] >= 6