e3c8979e8d
- cmd/fn/doctor.go - cmd/fn/main.go - cpp/apps/primitives_gallery/playground/tables/CMakeLists.txt - cpp/apps/primitives_gallery/playground/tables/data_table.cpp - cpp/apps/primitives_gallery/playground/tables/data_table_logic.cpp - cpp/apps/primitives_gallery/playground/tables/data_table_logic.h - cpp/apps/primitives_gallery/playground/tables/self_test.cpp - cpp/apps/primitives_gallery/playground/tables/tql.cpp - cpp/apps/primitives_gallery/playground/tables/viz.cpp - cpp/apps/primitives_gallery/playground/tables/viz.h - ... Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
217 lines
7.2 KiB
Python
217 lines
7.2 KiB
Python
"""vault_csv_profile — Perfila un CSV del vault y persiste metadata en vault_index.db."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
import time
|
|
from pathlib import Path
|
|
|
|
|
|
def _detect_encoding(path: Path) -> str:
    """Guess the text encoding of the file at ``path``.

    Strategy:
      1. If ``chardet`` can be imported, run it over the first 64 KiB and
         accept its answer when the reported confidence is at least 0.6.
      2. Otherwise (or when chardet is unsure) try to decode the first
         4096 characters with a fixed list of candidates, in order.

    Args:
        path: File to inspect.

    Returns:
        An encoding name. The value ``"utf-8?"`` (note the trailing ``?``)
        means no candidate decoded cleanly.
    """
    try:
        import chardet

        with open(path, "rb") as f:
            raw = f.read(65536)
        guess = chardet.detect(raw)
        if guess and guess.get("encoding") and guess.get("confidence", 0) >= 0.6:
            return guess["encoding"]
    except Exception:
        # Deliberate best-effort: chardet is optional, and any failure here
        # simply hands over to the trial-decoding loop below.
        pass

    # Order matters: latin-1 maps every possible byte and therefore never
    # fails, so it must be the last candidate. cp1252 leaves a few bytes
    # undefined (e.g. 0x81) and can fail, which is why it is tried first.
    for enc in ("utf-8-sig", "utf-8", "cp1252", "latin-1"):
        try:
            with open(path, encoding=enc) as f:
                f.read(4096)
            return enc
        except (UnicodeDecodeError, LookupError):
            continue

    # Defensive fallback; the "?" suffix flags the guess as uncertain.
    return "utf-8?"
|
|
|
|
|
|
def _read_with_polars(path: Path, encoding: str) -> tuple[list[dict], int]:
    """Read a CSV's schema and row count lazily with polars.

    Args:
        path: CSV file to scan.
        encoding: Detected encoding. It is accepted so the signature matches
            ``_read_with_pandas`` but it is not forwarded: the scan always
            uses ``"utf8"``. The previous normalisation of this value was
            dead code (its result was never passed to polars).
            NOTE(review): non-UTF-8 files are presumably expected to raise
            here so that the caller can fall back to pandas - confirm.

    Returns:
        ``(cols, n_rows)`` where ``cols`` is a list of
        ``{"name": ..., "dtype": ...}`` dicts (dtype is ``str()`` of the
        polars dtype) and ``n_rows`` is the number of data rows.
    """
    import polars as pl

    # The schema is inferred from the first 1000 rows; values that do not fit
    # the inferred dtype are tolerated because of ignore_errors=True.
    lf = pl.scan_csv(path, encoding="utf8", ignore_errors=True, infer_schema_length=1000)
    schema = lf.collect_schema()
    cols = [{"name": name, "dtype": str(dtype)} for name, dtype in schema.items()]
    n_rows = lf.select(pl.len()).collect().item()
    return cols, n_rows
|
|
|
|
|
|
def _read_with_pandas(path: Path, encoding: str) -> tuple[list[dict], int]:
    """Fallback reader: load the whole CSV with pandas.

    Args:
        path: CSV file to read.
        encoding: Encoding name; any trailing ``?`` characters are stripped,
            and an empty result falls back to ``"utf-8"``.

    Returns:
        ``(cols, n_rows)`` where ``cols`` is a list of
        ``{"name": ..., "dtype": ...}`` dicts (dtype is ``str()`` of the
        pandas dtype) and ``n_rows`` is the length of the DataFrame.
    """
    import pandas as pd

    codec = encoding.rstrip("?")
    if not codec:
        codec = "utf-8"

    # Undecodable bytes are replaced rather than raising.
    frame = pd.read_csv(path, encoding=codec, encoding_errors="replace", nrows=None)
    columns = [
        {"name": name, "dtype": str(dtype)} for name, dtype in frame.dtypes.items()
    ]
    return columns, len(frame)
|
|
|
|
|
|
def _detect_dates(path: Path, encoding: str) -> tuple[str | None, str | None]:
    """Find a date column and return its ``(date_min, date_max)`` as ISO strings.

    The CSV is loaded with polars. Columns are examined in schema order in
    two passes: first any column whose dtype is already Date/Datetime, then
    string columns that can be parsed as dates with at least 80% of their
    non-null values surviving the parse. The first column that qualifies wins.

    Args:
        path: CSV file to inspect.
        encoding: Not used by this function; the scan always uses ``"utf8"``.

    Returns:
        ``(min, max)`` truncated to the first 10 characters (``YYYY-MM-DD``),
        or ``(None, None)`` when no column qualifies or anything fails
        (including polars not being importable).
    """
    try:
        import polars as pl

        lazy = pl.scan_csv(path, encoding="utf8", ignore_errors=True, infer_schema_length=0)
        schema = lazy.collect_schema()
        frame = lazy.collect()

        def _iso_bounds(values) -> tuple[str, str]:
            # Keep only the date part of the string representation.
            lowest = values.min()
            highest = values.max()
            return str(lowest)[:10], str(highest)[:10]

        # Pass 1: columns that already carry a temporal dtype.
        # NOTE(review): with infer_schema_length=0 polars presumably reads
        # every column as String, so this pass may never match - confirm.
        for name, dtype in schema.items():
            kind = str(dtype)
            if not ("Date" in kind or "Datetime" in kind):
                continue
            non_null = frame[name].drop_nulls()
            if len(non_null) > 0:
                return _iso_bounds(non_null)

        # Pass 2: try to parse string columns as dates.
        for name, dtype in schema.items():
            kind = str(dtype)
            if "Utf8" not in kind and "String" not in kind:
                continue
            non_null = frame[name].drop_nulls()
            if len(non_null) == 0:
                continue
            try:
                parsed = non_null.str.to_date(strict=False).drop_nulls()
                if len(parsed) / max(len(non_null), 1) >= 0.8:
                    return _iso_bounds(parsed)
            except Exception:
                # Column is not date-like; move on to the next one.
                continue
    except Exception:
        # Best-effort: date detection must never break profiling.
        pass
    return None, None
|
|
|
|
|
|
def _build_fts_text(path: Path, cols: list[dict], encoding: str) -> str:
    """Build the ``content_text`` for ``files_fts``: column names plus the first 5 rows.

    Args:
        path: CSV file to sample.
        cols: Column descriptors; only the ``"name"`` key is read.
        encoding: Not used by this function; the scan always uses ``"utf8"``.

    Returns:
        Column names followed by the sampled cell values, space-separated.
        Null cells are skipped so the literal text ``"None"`` does not end up
        in the search index. If sampling fails for any reason (including
        polars not being importable) only the column names are returned.
    """
    col_names = " ".join(c["name"] for c in cols)
    try:
        import polars as pl

        sample = pl.scan_csv(path, encoding="utf8", ignore_errors=True).head(5).collect()
        rows_text = " ".join(
            str(value)
            for row in sample.iter_rows()
            for value in row
            if value is not None
        )
        return f"{col_names} {rows_text}".strip()
    except Exception:
        # Best-effort: fall back to the column names alone.
        pass
    return col_names
|
|
|
|
|
|
def vault_csv_profile(
    vault_path: str,
    rel_path: str,
    db_path: str | None = None,
) -> dict:
    """Profile a vault CSV (schema, row count, encoding, date range) and persist it.

    Args:
        vault_path: Absolute path to the vault root.
        rel_path: Path of the CSV relative to the vault root.
        db_path: Override for the location of ``vault_index.db``. Defaults to
            ``<vault_path>/vault_index.db``.

    Returns:
        Dict with the keys ``rel_path``, ``cols``, ``n_rows``, ``encoding``,
        ``date_min``, ``date_max`` and ``persisted``. ``persisted`` is True
        only when the database file exists and the write was committed.

    Raises:
        RuntimeError: If the file does not exist or cannot be read by either
            the polars or the pandas reader.
        Exception: Any error raised while writing to the database is
            re-raised after the transaction is rolled back.
    """
    import json

    vault = Path(vault_path)
    csv_file = vault / rel_path
    if not csv_file.exists():
        raise RuntimeError(f"vault_csv_profile: archivo no encontrado: {csv_file}")

    db = Path(db_path) if db_path else vault / "vault_index.db"

    encoding = _detect_encoding(csv_file)

    # Schema and row count. Empty files short-circuit to an empty profile
    # instead of being handed to the CSV readers.
    cols: list[dict]
    if csv_file.stat().st_size == 0:
        cols, n_rows = [], 0
    else:
        try:
            cols, n_rows = _read_with_polars(csv_file, encoding)
        except Exception:
            # polars failed for any reason; retry with the pandas fallback.
            try:
                cols, n_rows = _read_with_pandas(csv_file, encoding)
            except Exception as exc:
                raise RuntimeError(f"vault_csv_profile: no se pudo leer {rel_path}: {exc}") from exc

    # Date detection only makes sense when there is data.
    date_min = date_max = None
    if n_rows > 0 and cols:
        date_min, date_max = _detect_dates(csv_file, encoding)

    fts_text = _build_fts_text(csv_file, cols, encoding) if cols else ""

    result: dict = {
        "rel_path": rel_path,
        "cols": cols,
        "n_rows": n_rows,
        "encoding": encoding,
        "date_min": date_min,
        "date_max": date_max,
        "persisted": False,
    }

    # Persistence is skipped when the index database does not exist yet.
    if not db.exists():
        return result

    conn = sqlite3.connect(str(db))
    try:
        conn.execute(
            """
            INSERT INTO csv_profiles(rel_path, cols_json, n_rows, encoding, date_min, date_max, profiled_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(rel_path) DO UPDATE SET
                cols_json=excluded.cols_json,
                n_rows=excluded.n_rows,
                encoding=excluded.encoding,
                date_min=excluded.date_min,
                date_max=excluded.date_max,
                profiled_at=excluded.profiled_at
            """,
            (rel_path, json.dumps(cols), n_rows, encoding, date_min, date_max, int(time.time())),
        )
        # Refresh the full-text entry. The rowid is looked up in `files` so
        # the FTS row lines up with the file's row there.
        # NOTE(review): if `files` has no row for rel_path the subquery yields
        # NULL - confirm that is acceptable for files_fts.
        conn.execute("DELETE FROM files_fts WHERE rel_path = ?", (rel_path,))
        conn.execute(
            """
            INSERT INTO files_fts(rowid, rel_path, content_text)
            VALUES ((SELECT rowid FROM files WHERE rel_path = ?), ?, ?)
            """,
            (rel_path, rel_path, fts_text),
        )
        conn.commit()
        result["persisted"] = True
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    return result
|