Files
fn_registry/python/functions/datascience/vault_csv_profile.py
T
egutierrez e3c8979e8d chore: auto-commit (95 archivos)
- cmd/fn/doctor.go
- cmd/fn/main.go
- cpp/apps/primitives_gallery/playground/tables/CMakeLists.txt
- cpp/apps/primitives_gallery/playground/tables/data_table.cpp
- cpp/apps/primitives_gallery/playground/tables/data_table_logic.cpp
- cpp/apps/primitives_gallery/playground/tables/data_table_logic.h
- cpp/apps/primitives_gallery/playground/tables/self_test.cpp
- cpp/apps/primitives_gallery/playground/tables/tql.cpp
- cpp/apps/primitives_gallery/playground/tables/viz.cpp
- cpp/apps/primitives_gallery/playground/tables/viz.h
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 00:50:34 +02:00

217 lines
7.2 KiB
Python

"""vault_csv_profile — Perfila un CSV del vault y persiste metadata en vault_index.db."""
from __future__ import annotations
import sqlite3
import time
from pathlib import Path
def _detect_encoding(path: Path) -> str:
    """Guess the text encoding of the file at *path*.

    Detection runs in two stages:

    1. If ``chardet`` can be imported, up to the first 64 KiB of the file are
       sampled and its guess is returned when the reported confidence is at
       least 0.6. Any failure in this stage is ignored (best effort).
    2. Otherwise a fixed list of codecs is tried in order by decoding the
       beginning of the file; the first codec that decodes cleanly wins.

    Args:
        path: File to inspect.

    Returns:
        The name of the detected encoding, or ``"utf-8?"`` when no candidate
        codec was able to decode the sample.
    """
    try:
        import chardet

        with open(path, "rb") as f:
            raw = f.read(min(65536, path.stat().st_size))
        result = chardet.detect(raw)
        if result and result.get("encoding") and result.get("confidence", 0) >= 0.6:
            return result["encoding"]
    except Exception:
        # chardet is optional: a missing package or a failed detection simply
        # falls through to the codec trial below.
        pass

    # Order matters:
    #   * "utf-8-sig" accepts input with or without a BOM, so a separate
    #     "utf-8" attempt after it could never be reached.
    #   * "cp1252" leaves a few byte values undefined and can therefore reject
    #     input; "latin-1" maps every byte and must be tried last, otherwise
    #     it would shadow "cp1252" completely.
    for enc in ("utf-8-sig", "cp1252", "latin-1"):
        try:
            with open(path, encoding=enc) as f:
                f.read(4096)
            return enc
        except (UnicodeDecodeError, LookupError):
            continue
    # Defensive fallback; the trailing "?" marks the value as a guess.
    return "utf-8?"
def _read_with_polars(path: Path, encoding: str) -> tuple[list[dict], int]:
    """Read the CSV schema and row count with a lazy polars scan.

    Args:
        path: CSV file to scan.
        encoding: Detected encoding of the file. It is accepted so the
            signature matches ``_read_with_pandas`` but it is not forwarded:
            the scan is always performed with ``encoding="utf8"``. Any
            exception raised here is expected to be handled by the caller.

    Returns:
        A ``(cols, n_rows)`` tuple where ``cols`` is a list of
        ``{"name": ..., "dtype": ...}`` dicts (dtype rendered with ``str``)
        and ``n_rows`` is the number of rows reported by the scan.
    """
    import polars as pl

    # Schema is inferred from the first 1000 rows; values that do not fit the
    # inferred dtype are tolerated via ignore_errors instead of aborting.
    lf = pl.scan_csv(path, encoding="utf8", ignore_errors=True, infer_schema_length=1000)
    schema = lf.collect_schema()
    cols = [{"name": name, "dtype": str(dtype)} for name, dtype in schema.items()]
    n_rows = lf.select(pl.len()).collect().item()
    return cols, n_rows
def _read_with_pandas(path: Path, encoding: str) -> tuple[list[dict], int]:
    """Fallback reader: load the CSV with pandas and describe it.

    Args:
        path: CSV file to load.
        encoding: Detected encoding; trailing ``?`` markers are stripped and
            an empty result defaults to ``"utf-8"``.

    Returns:
        A ``(cols, n_rows)`` tuple where ``cols`` is a list of
        ``{"name": ..., "dtype": ...}`` dicts and ``n_rows`` is the number of
        rows in the loaded frame.
    """
    import pandas as pd

    codec = encoding.rstrip("?")
    if not codec:
        codec = "utf-8"

    # Undecodable bytes are replaced rather than aborting the read.
    frame = pd.read_csv(path, encoding=codec, encoding_errors="replace", nrows=None)
    columns = [
        {"name": label, "dtype": str(kind)}
        for label, kind in frame.dtypes.items()
    ]
    return columns, len(frame)
def _detect_dates(path: Path, encoding: str) -> tuple[str | None, str | None]:
    """Find the first date-like column and return its range.

    Two passes are made over the columns, in schema order:

    1. Columns whose dtype name contains ``Date``/``Datetime`` are used
       directly.
    2. String columns are parsed as dates; a column qualifies when at least
       80% of its non-null values parse.

    Any failure (including polars being unavailable) yields ``(None, None)``.

    NOTE(review): the scan uses ``infer_schema_length=0``, which presumably
    makes polars read every column as a string, so pass 1 may never match —
    confirm against the polars documentation.

    Args:
        path: CSV file to inspect.
        encoding: Detected encoding; currently unused, the scan is done as
            ``"utf8"``.

    Returns:
        ``(date_min, date_max)`` as the first 10 characters of the string
        form of the min/max values, or ``(None, None)`` if nothing qualifies.
    """
    try:
        import polars as pl

        scan = pl.scan_csv(path, encoding="utf8", ignore_errors=True, infer_schema_length=0)
        schema = scan.collect_schema()
        table = scan.collect()
        dtype_names = {name: str(dtype) for name, dtype in schema.items()}

        # Pass 1: columns already typed as temporal.
        for name, dtype_name in dtype_names.items():
            if "Date" not in dtype_name and "Datetime" not in dtype_name:
                continue
            values = table[name].drop_nulls()
            if len(values) > 0:
                lowest = values.min()
                highest = values.max()
                return str(lowest)[:10], str(highest)[:10]

        # Pass 2: try to parse string columns as dates.
        for name, dtype_name in dtype_names.items():
            if not ("Utf8" in dtype_name or "String" in dtype_name):
                continue
            values = table[name].drop_nulls()
            total = len(values)
            if total == 0:
                continue
            try:
                dates = values.str.to_date(strict=False).drop_nulls()
                if len(dates) / max(total, 1) >= 0.8:
                    lowest = dates.min()
                    highest = dates.max()
                    return str(lowest)[:10], str(highest)[:10]
            except Exception:
                continue
    except Exception:
        pass
    return None, None
def _build_fts_text(path: Path, cols: list[dict], encoding: str) -> str:
    """Build the ``content_text`` value for ``files_fts``.

    The text is the space-joined column names followed by the stringified
    values of the first 5 rows. If the sample cannot be read for any reason,
    only the column names are returned.

    Args:
        path: CSV file to sample.
        cols: Column descriptors; only the ``"name"`` key is read.
        encoding: Detected encoding; currently unused, the scan is done as
            ``"utf8"``.

    Returns:
        The searchable text, stripped of leading/trailing whitespace when a
        sample was read.
    """
    header = " ".join(col["name"] for col in cols)
    try:
        import polars as pl

        preview = pl.scan_csv(path, encoding="utf8", ignore_errors=True).head(5).collect()
        row_chunks = []
        for record in preview.iter_rows():
            row_chunks.append(" ".join(map(str, record)))
        body = " ".join(row_chunks)
    except Exception:
        # Sampling is best effort: fall back to the column names alone.
        return header
    return f"{header} {body}".strip()
def _persist_profile(
    db: Path,
    rel_path: str,
    cols: list[dict],
    n_rows: int,
    encoding: str,
    date_min: str | None,
    date_max: str | None,
    fts_text: str,
) -> None:
    """Upsert the profile row and refresh the FTS entry in one transaction."""
    import json

    cols_json = json.dumps(cols)
    now = int(time.time())

    conn = sqlite3.connect(str(db))
    try:
        conn.execute(
            """
            INSERT INTO csv_profiles(rel_path, cols_json, n_rows, encoding, date_min, date_max, profiled_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(rel_path) DO UPDATE SET
            cols_json=excluded.cols_json,
            n_rows=excluded.n_rows,
            encoding=excluded.encoding,
            date_min=excluded.date_min,
            date_max=excluded.date_max,
            profiled_at=excluded.profiled_at
            """,
            (rel_path, cols_json, n_rows, encoding, date_min, date_max, now),
        )
        # Refresh files_fts: the rowid is taken from the matching row in
        # `files` so both tables stay aligned.
        conn.execute("DELETE FROM files_fts WHERE rel_path = ?", (rel_path,))
        conn.execute(
            """
            INSERT INTO files_fts(rowid, rel_path, content_text)
            VALUES ((SELECT rowid FROM files WHERE rel_path = ?), ?, ?)
            """,
            (rel_path, rel_path, fts_text),
        )
        conn.commit()
    except Exception:
        # Leave the database untouched on any failure, then surface the error.
        conn.rollback()
        raise
    finally:
        conn.close()


def vault_csv_profile(
    vault_path: str,
    rel_path: str,
    db_path: str | None = None,
) -> dict:
    """Profile a CSV inside the vault and persist its metadata.

    The profile consists of the column schema, row count, detected encoding
    and the range of the first date-like column. When the index database
    exists, the profile is upserted into ``csv_profiles`` and the file's
    ``files_fts`` entry is rebuilt; when it does not exist, persistence is
    skipped and ``persisted`` stays ``False``.

    Args:
        vault_path: Absolute path to the vault root.
        rel_path: Path of the CSV relative to the vault root.
        db_path: Override for the index database location. Defaults to
            ``<vault_path>/vault_index.db``.

    Returns:
        Dict with the keys ``rel_path``, ``cols``, ``n_rows``, ``encoding``,
        ``date_min``, ``date_max`` and ``persisted``.

    Raises:
        RuntimeError: If the file does not exist or neither reader could
            load it.
        Exception: Errors raised while writing to the database (for example
            ``sqlite3`` errors) are re-raised after the transaction is
            rolled back.
    """
    vault = Path(vault_path)
    csv_file = vault / rel_path
    if not csv_file.exists():
        raise RuntimeError(f"vault_csv_profile: archivo no encontrado: {csv_file}")
    db = Path(db_path) if db_path else vault / "vault_index.db"

    encoding = _detect_encoding(csv_file)

    # Schema and row count; empty files short-circuit without invoking a reader.
    if csv_file.stat().st_size == 0:
        cols, n_rows = [], 0
    else:
        try:
            cols, n_rows = _read_with_polars(csv_file, encoding)
        except Exception:
            # polars is the preferred reader; pandas is the fallback.
            try:
                cols, n_rows = _read_with_pandas(csv_file, encoding)
            except Exception as exc:
                raise RuntimeError(f"vault_csv_profile: no se pudo leer {rel_path}: {exc}") from exc

    # Date detection only makes sense when there is data to inspect.
    date_min: str | None = None
    date_max: str | None = None
    if n_rows > 0 and cols:
        date_min, date_max = _detect_dates(csv_file, encoding)

    fts_text = _build_fts_text(csv_file, cols, encoding) if cols else ""

    persisted = False
    if db.exists():
        _persist_profile(db, rel_path, cols, n_rows, encoding, date_min, date_max, fts_text)
        persisted = True

    return {
        "rel_path": rel_path,
        "cols": cols,
        "n_rows": n_rows,
        "encoding": encoding,
        "date_min": date_min,
        "date_max": date_max,
        "persisted": persisted,
    }