"""vault_csv_profile — Perfila un CSV del vault y persiste metadata en vault_index.db.""" from __future__ import annotations import sqlite3 import time from pathlib import Path def _detect_encoding(path: Path) -> str: """Detecta encoding del archivo con chardet o por intentos.""" try: import chardet with open(path, "rb") as f: raw = f.read(min(65536, path.stat().st_size)) result = chardet.detect(raw) if result and result.get("encoding") and result.get("confidence", 0) >= 0.6: return result["encoding"] except Exception: pass for enc in ("utf-8-sig", "utf-8", "latin-1", "cp1252"): try: with open(path, encoding=enc) as f: f.read(4096) return enc except (UnicodeDecodeError, LookupError): continue return "utf-8?" def _read_with_polars(path: Path, encoding: str) -> tuple[list[dict], int]: """Lee CSV con polars. Retorna (cols, n_rows).""" import polars as pl enc = encoding.rstrip("?").replace("utf-8-sig", "utf8").replace("utf-8", "utf8") if enc not in ("utf8", "utf-8"): enc = "utf8" lf = pl.scan_csv(path, encoding="utf8", ignore_errors=True, infer_schema_length=1000) schema = lf.collect_schema() cols = [{"name": name, "dtype": str(dtype)} for name, dtype in schema.items()] n_rows = lf.select(pl.len()).collect().item() return cols, n_rows def _read_with_pandas(path: Path, encoding: str) -> tuple[list[dict], int]: """Fallback: lee CSV con pandas.""" import pandas as pd enc = encoding.rstrip("?") or "utf-8" df = pd.read_csv(path, encoding=enc, encoding_errors="replace", nrows=None) cols = [{"name": col, "dtype": str(df[col].dtype)} for col in df.columns] n_rows = len(df) return cols, n_rows def _detect_dates(path: Path, encoding: str) -> tuple[str | None, str | None]: """Intenta detectar columna de fecha y retorna (date_min, date_max) en ISO.""" try: import polars as pl lf = pl.scan_csv(path, encoding="utf8", ignore_errors=True, infer_schema_length=0) schema = lf.collect_schema() df = lf.collect() for col_name, dtype in schema.items(): if "Date" in str(dtype) or "Datetime" in str(dtype): series = df[col_name].drop_nulls() if len(series) > 0: mn = series.min() mx = series.max() return str(mn)[:10], str(mx)[:10] # Intenta parsear columnas string como fecha for col_name, dtype in schema.items(): if "Utf8" not in str(dtype) and "String" not in str(dtype): continue series = df[col_name].drop_nulls() if len(series) == 0: continue try: parsed = series.str.to_date(strict=False) valid = parsed.drop_nulls() if len(valid) / max(len(series), 1) >= 0.8: mn = valid.min() mx = valid.max() return str(mn)[:10], str(mx)[:10] except Exception: continue except Exception: pass return None, None def _build_fts_text(path: Path, cols: list[dict], encoding: str) -> str: """Construye content_text para files_fts: nombres de cols + primeras 5 filas.""" col_names = " ".join(c["name"] for c in cols) try: import polars as pl lf = pl.scan_csv(path, encoding="utf8", ignore_errors=True) sample = lf.head(5).collect() rows_text = " ".join( " ".join(str(v) for v in row) for row in sample.iter_rows() ) return f"{col_names} {rows_text}".strip() except Exception: pass return col_names def vault_csv_profile( vault_path: str, rel_path: str, db_path: str | None = None, ) -> dict: """Perfila un CSV del vault: schema, n_rows, encoding, fechas; persiste en vault_index.db. Args: vault_path: Ruta absoluta a la raiz del vault. rel_path: Ruta relativa al CSV dentro del vault. db_path: Override de la ruta a vault_index.db. Por defecto /vault_index.db. Returns: Dict con rel_path, cols, n_rows, encoding, date_min, date_max, persisted. Raises: RuntimeError: Si el archivo no existe o no se puede leer. """ vault = Path(vault_path) csv_file = vault / rel_path if not csv_file.exists(): raise RuntimeError(f"vault_csv_profile: archivo no encontrado: {csv_file}") db = Path(db_path) if db_path else vault / "vault_index.db" # Resultado por defecto para CSV vacío result: dict = { "rel_path": rel_path, "cols": [], "n_rows": 0, "encoding": "utf-8", "date_min": None, "date_max": None, "persisted": False, } # Detectar encoding encoding = _detect_encoding(csv_file) result["encoding"] = encoding # Leer schema y n_rows — short-circuit para archivos vacíos if csv_file.stat().st_size == 0: cols, n_rows = [], 0 else: try: cols, n_rows = _read_with_polars(csv_file, encoding) except Exception: try: cols, n_rows = _read_with_pandas(csv_file, encoding) except Exception as exc: raise RuntimeError(f"vault_csv_profile: no se pudo leer {rel_path}: {exc}") from exc result["cols"] = cols result["n_rows"] = n_rows # Detección de fechas (solo si hay filas) if n_rows > 0 and cols: date_min, date_max = _detect_dates(csv_file, encoding) result["date_min"] = date_min result["date_max"] = date_max # Construir texto para FTS fts_text = _build_fts_text(csv_file, cols, encoding) if cols else "" # Persistir en vault_index.db if db.exists(): conn = sqlite3.connect(str(db)) try: cols_json = __import__("json").dumps(cols) now = int(time.time()) conn.execute( """ INSERT INTO csv_profiles(rel_path, cols_json, n_rows, encoding, date_min, date_max, profiled_at) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(rel_path) DO UPDATE SET cols_json=excluded.cols_json, n_rows=excluded.n_rows, encoding=excluded.encoding, date_min=excluded.date_min, date_max=excluded.date_max, profiled_at=excluded.profiled_at """, (rel_path, cols_json, n_rows, encoding, result["date_min"], result["date_max"], now), ) # Actualizar files_fts (rowid debe coincidir con files) conn.execute("DELETE FROM files_fts WHERE rel_path = ?", (rel_path,)) conn.execute( """ INSERT INTO files_fts(rowid, rel_path, content_text) VALUES ((SELECT rowid FROM files WHERE rel_path = ?), ?, ?) """, (rel_path, rel_path, fts_text), ) conn.commit() result["persisted"] = True except Exception: conn.rollback() raise finally: conn.close() return result