chore: auto-commit (95 archivos)

- cmd/fn/doctor.go
- cmd/fn/main.go
- cpp/apps/primitives_gallery/playground/tables/CMakeLists.txt
- cpp/apps/primitives_gallery/playground/tables/data_table.cpp
- cpp/apps/primitives_gallery/playground/tables/data_table_logic.cpp
- cpp/apps/primitives_gallery/playground/tables/data_table_logic.h
- cpp/apps/primitives_gallery/playground/tables/self_test.cpp
- cpp/apps/primitives_gallery/playground/tables/tql.cpp
- cpp/apps/primitives_gallery/playground/tables/viz.cpp
- cpp/apps/primitives_gallery/playground/tables/viz.h
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-13 00:50:34 +02:00
parent a2bbf23374
commit e3c8979e8d
189 changed files with 18964 additions and 330 deletions
@@ -0,0 +1,161 @@
"""Tests para vault_csv_profile."""
from __future__ import annotations
import os
import sqlite3
import sys
import tempfile
from pathlib import Path
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from vault_csv_profile import vault_csv_profile
def _make_vault(tmp: Path) -> tuple[Path, Path]:
    """Build a minimal vault rooted at ``tmp``.

    Creates ``vault_index.db`` with the ``files`` table, the contentless
    ``files_fts`` FTS5 index and the ``csv_profiles`` table.

    Returns:
        ``(vault_root, db_path)`` where ``vault_root`` is ``tmp`` itself.
    """
    schema_sql = """
        CREATE TABLE IF NOT EXISTS files (
            rowid INTEGER PRIMARY KEY AUTOINCREMENT,
            rel_path TEXT UNIQUE NOT NULL,
            size_bytes INTEGER,
            ext TEXT
        );
        CREATE VIRTUAL TABLE IF NOT EXISTS files_fts
            USING fts5(rel_path, content_text, content='', contentless_delete=1);
        CREATE TABLE IF NOT EXISTS csv_profiles (
            rel_path TEXT PRIMARY KEY,
            cols_json TEXT,
            n_rows INTEGER,
            encoding TEXT,
            date_min TEXT,
            date_max TEXT,
            profiled_at INTEGER
        );
        """
    index_db = tmp / "vault_index.db"
    connection = sqlite3.connect(str(index_db))
    connection.executescript(schema_sql)
    connection.commit()
    connection.close()
    return tmp, index_db
def _insert_file_entry(db: Path, rel_path: str):
    """Register ``rel_path`` in ``files`` so ``files_fts`` has a valid rowid to anchor to.

    The insert is ``OR IGNORE``, so registering the same path twice is harmless.
    """
    insert_sql = "INSERT OR IGNORE INTO files(rel_path, size_bytes, ext) VALUES (?, 0, '.csv')"
    connection = sqlite3.connect(str(db))
    connection.execute(insert_sql, (rel_path,))
    connection.commit()
    connection.close()
def test_csv_basic(tmp_path):
    """Profile a three-row UTF-8 CSV and check the result and the stored row."""
    vault, db = _make_vault(tmp_path)
    rel = "data/basic.csv"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text("nombre,edad,score\nAna,30,9.5\nBob,25,8.0\nCarla,35,7.5\n", encoding="utf-8")
    _insert_file_entry(db, rel)

    profile = vault_csv_profile(str(vault), rel, db_path=str(db))

    assert profile["rel_path"] == rel
    assert profile["n_rows"] == 3
    assert len(profile["cols"]) == 3
    seen_names = [column["name"] for column in profile["cols"]]
    for expected in ("nombre", "edad", "score"):
        assert expected in seen_names
    assert profile["persisted"] is True

    # The profile must also have been written to csv_profiles.
    connection = sqlite3.connect(str(db))
    stored = connection.execute(
        "SELECT n_rows FROM csv_profiles WHERE rel_path = ?", (rel,)
    ).fetchone()
    connection.close()
    assert stored is not None
    assert stored[0] == 3
def test_csv_date_detection(tmp_path):
    """A column of ISO dates must produce date_min/date_max bounds."""
    vault, db = _make_vault(tmp_path)
    rel = "data/fechas.csv"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = "fecha,valor\n2023-01-01,100\n2023-06-15,200\n2023-12-31,300\n"
    target.write_text(payload, encoding="utf-8")
    _insert_file_entry(db, rel)

    profile = vault_csv_profile(str(vault), rel, db_path=str(db))

    assert profile["date_min"] is not None
    assert profile["date_max"] is not None
    lowest, highest = profile["date_min"], profile["date_max"]
    assert lowest <= "2023-01-01"
    assert highest >= "2023-12-31"
def test_csv_encoding_latin1(tmp_path):
    """A Latin-1 encoded CSV must still be profiled and persisted."""
    vault, db = _make_vault(tmp_path)
    rel = "data/tildes.csv"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    raw_bytes = "ciudad,poblacion\nMálaga,500000\nCórdoba,320000\n".encode("latin-1")
    target.write_bytes(raw_bytes)
    _insert_file_entry(db, rel)

    profile = vault_csv_profile(str(vault), rel, db_path=str(db))

    assert profile["n_rows"] == 2
    detected = profile["encoding"]
    assert detected != "utf-8?"
    # Any non-empty encoding name is accepted here.
    assert detected
    assert profile["persisted"] is True
def test_csv_empty(tmp_path):
    """A zero-byte CSV yields an empty profile with no date bounds."""
    vault, db = _make_vault(tmp_path)
    rel = "data/empty.csv"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text("", encoding="utf-8")
    _insert_file_entry(db, rel)

    profile = vault_csv_profile(str(vault), rel, db_path=str(db))

    assert profile["n_rows"] == 0
    assert profile["cols"] == []
    for bound in ("date_min", "date_max"):
        assert profile[bound] is None
def test_csv_persists_fts(tmp_path):
    """Column names of a profiled CSV must be searchable through FTS5 MATCH."""
    vault, db = _make_vault(tmp_path)
    rel = "data/fts_test.csv"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text("producto,precio\nManzana,1.5\nPera,2.0\n", encoding="utf-8")
    _insert_file_entry(db, rel)

    vault_csv_profile(str(vault), rel, db_path=str(db))

    # A contentless FTS5 table cannot be read back column by column, so
    # indexing is verified with MATCH queries instead.
    connection = sqlite3.connect(str(db))
    hit_producto = connection.execute(
        "SELECT rowid FROM files_fts WHERE files_fts MATCH 'producto'",
    ).fetchone()
    hit_precio = connection.execute(
        "SELECT rowid FROM files_fts WHERE files_fts MATCH 'precio'",
    ).fetchone()
    connection.close()

    assert hit_producto is not None, "FTS no encontró 'producto'"
    assert hit_precio is not None, "FTS no encontró 'precio'"
@@ -0,0 +1,147 @@
"""Tests para vault_pdf_extract."""
from __future__ import annotations
import os
import sqlite3
import sys
from pathlib import Path
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from vault_pdf_extract import vault_pdf_extract
def _make_vault(tmp: Path) -> tuple[Path, Path]:
    """Build a minimal vault rooted at ``tmp``.

    Creates ``vault_index.db`` with the ``files`` table, the contentless
    ``files_fts`` FTS5 index and the ``pdf_extracts`` table.

    Returns:
        ``(vault_root, db_path)`` where ``vault_root`` is ``tmp`` itself.
    """
    schema_sql = """
        CREATE TABLE IF NOT EXISTS files (
            rowid INTEGER PRIMARY KEY AUTOINCREMENT,
            rel_path TEXT UNIQUE NOT NULL,
            size_bytes INTEGER,
            ext TEXT
        );
        CREATE VIRTUAL TABLE IF NOT EXISTS files_fts
            USING fts5(rel_path, content_text, content='', contentless_delete=1);
        CREATE TABLE IF NOT EXISTS pdf_extracts (
            rel_path TEXT PRIMARY KEY,
            page_count INTEGER,
            text_len INTEGER,
            extracted_to TEXT,
            extracted_at INTEGER
        );
        """
    index_db = tmp / "vault_index.db"
    connection = sqlite3.connect(str(index_db))
    connection.executescript(schema_sql)
    connection.commit()
    connection.close()
    return tmp, index_db
def _insert_file_entry(db: Path, rel_path: str):
    """Register ``rel_path`` in ``files`` with a ``.pdf`` extension and size 0.

    The insert is ``OR IGNORE``, so registering the same path twice is harmless.
    """
    insert_sql = "INSERT OR IGNORE INTO files(rel_path, size_bytes, ext) VALUES (?, 0, '.pdf')"
    connection = sqlite3.connect(str(db))
    connection.execute(insert_sql, (rel_path,))
    connection.commit()
    connection.close()
def _make_pdf(path: Path, text: str = "Hello vault PDF.\nPage two content."):
    """Write a single-page PDF containing ``text`` to ``path`` using PyMuPDF."""
    import fitz

    document = fitz.open()
    # Insert the text at point (72, 72) of a freshly created page.
    document.new_page().insert_text((72, 72), text)
    document.save(str(path))
    document.close()
def test_pdf_extract_basic(tmp_path):
    """Extract a generated PDF and check the result and the stored row."""
    vault, db = _make_vault(tmp_path)
    rel = "docs/test.pdf"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    _make_pdf(target)
    _insert_file_entry(db, rel)

    outcome = vault_pdf_extract(str(vault), rel, db_path=str(db))

    assert outcome["rel_path"] == rel
    assert outcome["page_count"] >= 1
    assert outcome["text_len"] > 0
    assert outcome["persisted"] is True

    connection = sqlite3.connect(str(db))
    stored = connection.execute(
        "SELECT page_count, text_len FROM pdf_extracts WHERE rel_path=?", (rel,)
    ).fetchone()
    connection.close()
    assert stored is not None
    stored_pages, stored_len = stored[0], stored[1]
    assert stored_pages >= 1
    assert stored_len > 0
def test_pdf_dump_text_creates_file(tmp_path):
    """With dump_text=True the extracted text must land in a non-empty file."""
    vault, db = _make_vault(tmp_path)
    rel = "docs/dump.pdf"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    _make_pdf(target, "Contenido para dump a disco.")
    _insert_file_entry(db, rel)
    # Pre-create data/processed/ so that directory is the one used for the dump.
    processed = vault / "data" / "processed"
    processed.mkdir(parents=True, exist_ok=True)

    outcome = vault_pdf_extract(str(vault), rel, db_path=str(db), dump_text=True)

    assert outcome["extracted_to"] is not None
    dumped = vault / outcome["extracted_to"]
    assert dumped.exists()
    assert dumped.stat().st_size > 0
def test_pdf_no_dump(tmp_path):
    """With dump_text=False no text file path is reported."""
    vault, db = _make_vault(tmp_path)
    rel = "docs/nodump.pdf"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    _make_pdf(target, "No se debe volcar a disco.")
    _insert_file_entry(db, rel)

    outcome = vault_pdf_extract(str(vault), rel, db_path=str(db), dump_text=False)

    assert outcome["extracted_to"] is None
def test_pdf_persists_to_fts(tmp_path):
    """Text extracted from the PDF must be searchable through FTS5 MATCH."""
    vault, db = _make_vault(tmp_path)
    rel = "docs/fts.pdf"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    _make_pdf(target, "Texto especial para FTS xyzpdftest.")
    _insert_file_entry(db, rel)

    vault_pdf_extract(str(vault), rel, db_path=str(db), dump_text=False)

    # A contentless FTS5 table cannot be read back column by column, so
    # indexing is verified with a MATCH query instead.
    connection = sqlite3.connect(str(db))
    hit = connection.execute(
        "SELECT rowid FROM files_fts WHERE files_fts MATCH 'xyzpdftest'",
    ).fetchone()
    connection.close()

    assert hit is not None, "FTS no encontró el texto del PDF"
def test_pdf_corrupt_errors(tmp_path):
    """Garbage bytes with a PDF header must surface as a RuntimeError."""
    vault, db = _make_vault(tmp_path)
    rel = "docs/corrupt.pdf"
    target = vault / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    garbage = b"%PDF-1.4 garbage bytes \x00\x01\x02 not a real pdf"
    target.write_bytes(garbage)
    _insert_file_entry(db, rel)

    with pytest.raises(RuntimeError, match="corrupto|inválido|PDF"):
        vault_pdf_extract(str(vault), rel, db_path=str(db))
@@ -0,0 +1,61 @@
---
name: vault_csv_profile
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def vault_csv_profile(vault_path: str, rel_path: str, db_path: str | None = None) -> dict"
description: "Perfila un CSV del vault: detecta encoding, lee schema con polars, extrae n_rows y columnas de fecha; persiste en csv_profiles y actualiza files_fts para búsqueda por contenido."
tags: [vault, csv, profiling, polars, encoding, datascience, fts]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [sqlite3, time, pathlib, json, polars, chardet]
params:
- name: vault_path
desc: "Ruta absoluta a la raiz del vault donde vive el CSV y vault_index.db."
- name: rel_path
desc: "Ruta relativa al CSV dentro del vault (ej. 'data/raw/ventas.csv')."
- name: db_path
desc: "Override opcional de la ruta a vault_index.db. Por defecto <vault_path>/vault_index.db."
output: "Dict con: rel_path (str), cols (list de {name, dtype}), n_rows (int), encoding (str), date_min/date_max (ISO yyyy-mm-dd o None), persisted (bool)."
tested: true
tests:
- "test_csv_basic"
- "test_csv_date_detection"
- "test_csv_encoding_latin1"
- "test_csv_empty"
- "test_csv_persists_fts"
test_file_path: "python/functions/datascience/tests/test_vault_csv_profile.py"
file_path: "python/functions/datascience/vault_csv_profile.py"
---
## Ejemplo
```python
from vault_csv_profile import vault_csv_profile
result = vault_csv_profile("/vaults/mi_vault", "data/raw/ventas.csv")
# {
# "rel_path": "data/raw/ventas.csv",
# "cols": [{"name": "fecha", "dtype": "String"}, {"name": "importe", "dtype": "Float64"}],
# "n_rows": 1500,
# "encoding": "utf-8",
# "date_min": "2023-01-01",
# "date_max": "2023-12-31",
# "persisted": True
# }
```
## Notas
- Usa polars (lazy scan) como motor principal; pandas como fallback.
- Detección de encoding: chardet con confianza >= 0.6, luego intentos utf-8-sig → utf-8 → latin-1 → cp1252.
- Detección de fechas: columnas Date/Datetime nativas de polars, o columnas String con ≥80% de valores parseables como fecha.
- El FTS text incluye nombres de columnas + primeras 5 filas concatenadas.
- Upsert en csv_profiles por rel_path; el rowid de files_fts se ancla al rowid de la tabla files para que vault_search funcione correctamente.
- Si vault_index.db no existe, la función retorna el dict sin intentar persistir (persisted=False).
- Dependencias: polars, chardet (ambas instaladas en python/.venv con uv add).
@@ -0,0 +1,216 @@
"""vault_csv_profile — Perfila un CSV del vault y persiste metadata en vault_index.db."""
from __future__ import annotations
import sqlite3
import time
from pathlib import Path
def _detect_encoding(path: Path) -> str:
    """Guess the text encoding of ``path``.

    chardet is consulted first on the first 64 KiB of the file; its answer is
    accepted only when the reported confidence is at least 0.6.  Otherwise a
    fixed list of candidates is tried by decoding the first 4096 characters.

    "utf-8-sig" is only offered as a candidate when the file actually starts
    with a UTF-8 byte-order mark: that codec decodes any valid UTF-8, so trying
    it unconditionally would label every BOM-less UTF-8 file as "utf-8-sig".

    Args:
        path: File to inspect.

    Returns:
        An encoding name, or "utf-8?" if no candidate could decode the sample.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    import codecs

    with open(path, "rb") as f:
        head = f.read(65536)

    try:
        import chardet
    except ImportError:
        chardet = None  # chardet is optional; fall through to trial decoding
    if chardet is not None:
        try:
            guess = chardet.detect(head)
        except Exception:
            # Detection is best effort: any chardet failure just means we
            # fall back to trial decoding below.
            guess = None
        if guess and guess.get("encoding") and guess.get("confidence", 0) >= 0.6:
            return guess["encoding"]

    candidates = ["utf-8", "latin-1", "cp1252"]
    if head.startswith(codecs.BOM_UTF8):
        candidates.insert(0, "utf-8-sig")
    # NOTE: latin-1 maps every byte to a character, so it acts as a catch-all
    # and the entries after it are only reached if it is unavailable.
    for enc in candidates:
        try:
            with open(path, encoding=enc) as f:
                f.read(4096)
        except (UnicodeDecodeError, LookupError):
            continue
        return enc
    return "utf-8?"
def _read_with_polars(path: Path, encoding: str) -> tuple[list[dict], int]:
    """Read the CSV schema and row count with polars.

    UTF-8 compatible encodings use a lazy scan, so only the schema and the row
    count are computed.  ``scan_csv`` accepts only utf8/utf8-lossy, so any
    other detected encoding goes through ``read_csv``, which decodes the input
    in memory with the given codec before parsing.  Previously the detected
    encoding was normalised and then discarded, so every file was read as utf8.

    Args:
        path: CSV file to read.
        encoding: Detected encoding name; a trailing "?" (uncertain guess) is
            ignored.

    Returns:
        ``(cols, n_rows)`` where ``cols`` is a list of ``{"name", "dtype"}``
        dicts in file order.
    """
    import polars as pl

    codec = encoding.rstrip("?").lower().replace("_", "-")
    if codec in ("", "utf-8", "utf8", "utf-8-sig", "ascii"):
        lf = pl.scan_csv(path, encoding="utf8", ignore_errors=True, infer_schema_length=1000)
        schema = lf.collect_schema()
        n_rows = lf.select(pl.len()).collect().item()
    else:
        df = pl.read_csv(path, encoding=codec, ignore_errors=True, infer_schema_length=1000)
        schema = df.schema
        n_rows = df.height
    cols = [{"name": name, "dtype": str(dtype)} for name, dtype in schema.items()]
    return cols, n_rows
def _read_with_pandas(path: Path, encoding: str) -> tuple[list[dict], int]:
    """Fallback reader: load the whole CSV with pandas.

    A trailing "?" on ``encoding`` is stripped and an empty name defaults to
    "utf-8"; undecodable bytes are replaced rather than raising.

    Returns:
        ``(cols, n_rows)`` where ``cols`` is a list of ``{"name", "dtype"}``
        dicts in file order.
    """
    import pandas as pd

    codec = encoding.rstrip("?")
    if not codec:
        codec = "utf-8"
    frame = pd.read_csv(path, encoding=codec, encoding_errors="replace", nrows=None)
    described = [
        {"name": label, "dtype": str(kind)} for label, kind in frame.dtypes.items()
    ]
    return described, len(frame)
def _detect_dates(path: Path, encoding: str) -> tuple[str | None, str | None]:
    """Find the first date-like column and return its (min, max) as ISO dates.

    The file is read with the detected ``encoding``.  Previously it was always
    read as utf8, so files in another encoding silently produced no dates.
    Columns are inspected in file order: native Date/Datetime columns first,
    then string columns in which at least 80% of the non-null values parse as
    dates.

    Detection is best effort: any failure (polars missing, unreadable file,
    unknown codec) yields ``(None, None)`` instead of raising.

    Args:
        path: CSV file to inspect.
        encoding: Detected encoding name; a trailing "?" is ignored.

    Returns:
        ``(date_min, date_max)`` as ``yyyy-mm-dd`` strings, or ``(None, None)``.
    """
    try:
        import polars as pl

        codec = encoding.rstrip("?").lower().replace("_", "-")
        if codec in ("", "utf-8", "utf8", "utf-8-sig", "ascii"):
            codec = "utf8"
        # infer_schema_length=0 makes polars read every column as a string,
        # so the native-date pass below only matters if that setting changes.
        df = pl.read_csv(path, encoding=codec, ignore_errors=True, infer_schema_length=0)
        schema = df.schema

        for col_name, dtype in schema.items():
            if "Date" not in str(dtype):  # also matches "Datetime"
                continue
            series = df[col_name].drop_nulls()
            if len(series) > 0:
                return str(series.min())[:10], str(series.max())[:10]

        # Try to parse string columns as dates.
        for col_name, dtype in schema.items():
            if "Utf8" not in str(dtype) and "String" not in str(dtype):
                continue
            series = df[col_name].drop_nulls()
            if len(series) == 0:
                continue
            try:
                parsed = series.str.to_date(strict=False)
            except Exception:
                continue  # not a date column; move on to the next one
            valid = parsed.drop_nulls()
            if len(valid) / len(series) >= 0.8:
                return str(valid.min())[:10], str(valid.max())[:10]
    except Exception:
        # Deliberate best effort: date bounds are optional metadata.
        pass
    return None, None
def _build_fts_text(path: Path, cols: list[dict], encoding: str) -> str:
    """Build the ``content_text`` indexed in files_fts for a CSV.

    The text is the column names followed by the values of the first five
    rows, all space-separated.  The sample is read with the detected
    ``encoding`` (previously always utf8), and null cells are skipped so the
    literal token "None" is not indexed.

    If the sample cannot be read for any reason, only the column names are
    returned.

    Args:
        path: CSV file to sample.
        cols: Column descriptors; only their ``"name"`` entries are used.
        encoding: Detected encoding name; a trailing "?" is ignored.
    """
    col_names = " ".join(c["name"] for c in cols)
    try:
        import polars as pl

        codec = encoding.rstrip("?").lower().replace("_", "-")
        if codec in ("", "utf-8", "utf8", "utf-8-sig", "ascii"):
            codec = "utf8"
        sample = pl.read_csv(path, encoding=codec, ignore_errors=True, n_rows=5)
        rows_text = " ".join(
            " ".join(str(v) for v in row if v is not None)
            for row in sample.iter_rows()
        )
    except Exception:
        # Best effort: the column names alone are still useful for search.
        return col_names
    return f"{col_names} {rows_text}".strip()
def vault_csv_profile(
    vault_path: str,
    rel_path: str,
    db_path: str | None = None,
) -> dict:
    """Profile a CSV stored in the vault and persist its metadata.

    Detects the encoding, reads the schema and row count (polars first, pandas
    as fallback), looks for a date column and, when the index database exists,
    upserts the profile into ``csv_profiles`` and refreshes the file's entry
    in ``files_fts``.

    Args:
        vault_path: Absolute path to the vault root.
        rel_path: Path of the CSV relative to the vault root.
        db_path: Optional override for the index database path; defaults to
            ``<vault_path>/vault_index.db``.

    Returns:
        Dict with ``rel_path``, ``cols``, ``n_rows``, ``encoding``,
        ``date_min``, ``date_max`` and ``persisted``.  ``persisted`` is False
        when the index database does not exist.

    Raises:
        RuntimeError: If the file does not exist or cannot be read by either
            reader.
        sqlite3.Error: If persisting fails; the transaction is rolled back.
    """
    import json

    vault = Path(vault_path)
    csv_file = vault / rel_path
    if not csv_file.exists():
        raise RuntimeError(f"vault_csv_profile: archivo no encontrado: {csv_file}")
    db = Path(db_path) if db_path else vault / "vault_index.db"

    encoding = _detect_encoding(csv_file)

    # Schema and row count; a zero-byte file short-circuits both readers.
    if csv_file.stat().st_size == 0:
        cols, n_rows = [], 0
    else:
        try:
            cols, n_rows = _read_with_polars(csv_file, encoding)
        except Exception:
            try:
                cols, n_rows = _read_with_pandas(csv_file, encoding)
            except Exception as exc:
                raise RuntimeError(f"vault_csv_profile: no se pudo leer {rel_path}: {exc}") from exc

    # Date detection only makes sense when there is data.
    date_min = date_max = None
    if n_rows > 0 and cols:
        date_min, date_max = _detect_dates(csv_file, encoding)

    fts_text = _build_fts_text(csv_file, cols, encoding) if cols else ""

    result: dict = {
        "rel_path": rel_path,
        "cols": cols,
        "n_rows": n_rows,
        "encoding": encoding,
        "date_min": date_min,
        "date_max": date_max,
        "persisted": False,
    }
    if not db.exists():
        return result

    conn = sqlite3.connect(str(db))
    try:
        conn.execute(
            """
            INSERT INTO csv_profiles(rel_path, cols_json, n_rows, encoding, date_min, date_max, profiled_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(rel_path) DO UPDATE SET
                cols_json=excluded.cols_json,
                n_rows=excluded.n_rows,
                encoding=excluded.encoding,
                date_min=excluded.date_min,
                date_max=excluded.date_max,
                profiled_at=excluded.profiled_at
            """,
            (rel_path, json.dumps(cols), n_rows, encoding, date_min, date_max, int(time.time())),
        )
        # The FTS rowid must equal the rowid in `files`.  The old entry is
        # deleted by rowid: a contentless FTS5 table returns NULL for column
        # reads, so filtering on rel_path matches nothing and would leave
        # stale tokens behind on every re-profile.
        file_row = conn.execute(
            "SELECT rowid FROM files WHERE rel_path = ?", (rel_path,)
        ).fetchone()
        if file_row is not None:
            conn.execute("DELETE FROM files_fts WHERE rowid = ?", (file_row[0],))
            conn.execute(
                "INSERT INTO files_fts(rowid, rel_path, content_text) VALUES (?, ?, ?)",
                (file_row[0], rel_path, fts_text),
            )
        # When the file is not registered in `files` the FTS update is
        # skipped: an auto-assigned rowid would not be anchored to any file.
        conn.commit()
        result["persisted"] = True
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
    return result
@@ -0,0 +1,60 @@
---
name: vault_pdf_extract
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def vault_pdf_extract(vault_path: str, rel_path: str, db_path: str | None = None, dump_text: bool = True) -> dict"
description: "Extrae texto de un PDF del vault con PyMuPDF; persiste page_count y text_len en pdf_extracts; vuelca texto a .txt en data/processed/ o .vault_extracts/; actualiza files_fts para búsqueda por contenido."
tags: [vault, pdf, extract, pymupdf, fts, datascience]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [sqlite3, time, pathlib, fitz]
params:
- name: vault_path
desc: "Ruta absoluta a la raiz del vault donde vive el PDF y vault_index.db."
- name: rel_path
desc: "Ruta relativa al PDF dentro del vault (ej. 'docs/informe.pdf')."
- name: db_path
desc: "Override opcional de la ruta a vault_index.db. Por defecto <vault_path>/vault_index.db."
- name: dump_text
desc: "Si True (default), escribe el texto extraído a un .txt. La carpeta destino es data/processed/ si existe, si no .vault_extracts/."
output: "Dict con: rel_path (str), page_count (int), text_len (int), extracted_to (ruta relativa al .txt o None), persisted (bool)."
tested: true
tests:
- "test_pdf_extract_basic"
- "test_pdf_dump_text_creates_file"
- "test_pdf_no_dump"
- "test_pdf_persists_to_fts"
- "test_pdf_corrupt_errors"
test_file_path: "python/functions/datascience/tests/test_vault_pdf_extract.py"
file_path: "python/functions/datascience/vault_pdf_extract.py"
---
## Ejemplo
```python
from vault_pdf_extract import vault_pdf_extract
result = vault_pdf_extract("/vaults/mi_vault", "docs/informe_anual.pdf")
# {
# "rel_path": "docs/informe_anual.pdf",
# "page_count": 24,
# "text_len": 45210,
# "extracted_to": "data/processed/informe_anual.txt",
# "persisted": True
# }
```
## Notas
- Requiere PyMuPDF (paquete `pymupdf`, importado como `fitz`). Ya instalado en python/.venv.
- El texto se trunca a 10 × 1024 × 1024 caracteres (≈10 MB para texto ASCII) antes de insertarlo en files_fts para evitar tablas FTS5 masivas.
- Layout de volcado: si `<vault_path>/data/processed/` existe, se usa; si no, se crea `<vault_path>/.vault_extracts/`.
- PDFs corruptos levantan RuntimeError con mensaje descriptivo.
- El rowid de files_fts se ancla al rowid de la tabla files (subquery) para que vault_search funcione correctamente.
- Si vault_index.db no existe, retorna el dict sin intentar persistir (persisted=False).
@@ -0,0 +1,121 @@
"""vault_pdf_extract — Extrae texto de un PDF del vault y persiste en vault_index.db."""
from __future__ import annotations
import sqlite3
import time
from pathlib import Path
def vault_pdf_extract(
    vault_path: str,
    rel_path: str,
    db_path: str | None = None,
    dump_text: bool = True,
) -> dict:
    """Extract the text of a PDF stored in the vault and persist its metadata.

    Reads every page with PyMuPDF, optionally dumps the text to a ``.txt``
    file and, when the index database exists, upserts a row into
    ``pdf_extracts`` and refreshes the file's entry in ``files_fts``.

    Args:
        vault_path: Absolute path to the vault root.
        rel_path: Path of the PDF relative to the vault root.
        db_path: Optional override for the index database path; defaults to
            ``<vault_path>/vault_index.db``.
        dump_text: If True, write the extracted text to
            ``data/processed/<stem>.txt`` when that directory exists, else to
            ``.vault_extracts/<stem>.txt``.  Nothing is written when the
            extracted text is blank.

    Returns:
        Dict with ``rel_path``, ``page_count``, ``text_len``, ``extracted_to``
        (path relative to the vault, or None) and ``persisted``.

    Raises:
        RuntimeError: If PyMuPDF is missing, the PDF does not exist, or it
            cannot be opened.
        sqlite3.Error: If persisting fails; the transaction is rolled back.
    """
    try:
        import fitz  # PyMuPDF
    except ImportError as exc:
        raise RuntimeError(
            "vault_pdf_extract requiere PyMuPDF. Instalar con: uv add pymupdf"
        ) from exc

    vault = Path(vault_path)
    pdf_file = vault / rel_path
    if not pdf_file.exists():
        raise RuntimeError(f"vault_pdf_extract: archivo no encontrado: {pdf_file}")
    db = Path(db_path) if db_path else vault / "vault_index.db"

    try:
        doc = fitz.open(str(pdf_file))
    except Exception as exc:
        raise RuntimeError(f"vault_pdf_extract: PDF corrupto o inválido ({rel_path}): {exc}") from exc

    # Close the document even if reading the pages fails part-way.
    try:
        page_count = doc.page_count
        text_parts: list[str] = []
        for page in doc:
            try:
                text_parts.append(page.get_text())
            except Exception:
                # Best effort: an unreadable page contributes no text.
                text_parts.append("")
    finally:
        doc.close()

    full_text = "\n".join(text_parts)
    text_len = len(full_text)

    # Cap what goes into FTS.  The slice counts characters, not bytes.
    _MAX_FTS = 10 * 1024 * 1024
    fts_text = full_text[:_MAX_FTS]

    # Dump the text to disk.
    extracted_to: str | None = None
    if dump_text and full_text.strip():
        # Prefer data/processed/ when it exists; otherwise use .vault_extracts/.
        out_dir = vault / "data" / "processed"
        if not out_dir.exists():
            out_dir = vault / ".vault_extracts"
        out_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): the file name is the PDF stem only, so two PDFs with
        # the same name in different folders share one dump file.
        txt_path = out_dir / f"{Path(rel_path).stem}.txt"
        txt_path.write_text(full_text, encoding="utf-8")
        # POSIX separators keep the stored path in the same style as rel_path.
        extracted_to = txt_path.relative_to(vault).as_posix()

    # Persist into vault_index.db.
    persisted = False
    if db.exists():
        conn = sqlite3.connect(str(db))
        try:
            conn.execute(
                """
                INSERT INTO pdf_extracts(rel_path, page_count, text_len, extracted_to, extracted_at)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(rel_path) DO UPDATE SET
                    page_count=excluded.page_count,
                    text_len=excluded.text_len,
                    extracted_to=excluded.extracted_to,
                    extracted_at=excluded.extracted_at
                """,
                (rel_path, page_count, text_len, extracted_to, int(time.time())),
            )
            # The FTS rowid must equal the rowid in `files`.  The old entry
            # is deleted by rowid: a contentless FTS5 table returns NULL for
            # column reads, so filtering on rel_path matches nothing and
            # would leave stale tokens behind on every re-extraction.
            file_row = conn.execute(
                "SELECT rowid FROM files WHERE rel_path = ?", (rel_path,)
            ).fetchone()
            if file_row is not None:
                conn.execute("DELETE FROM files_fts WHERE rowid = ?", (file_row[0],))
                if fts_text.strip():
                    conn.execute(
                        "INSERT INTO files_fts(rowid, rel_path, content_text) VALUES (?, ?, ?)",
                        (file_row[0], rel_path, fts_text),
                    )
            # When the file is not registered in `files` the FTS update is
            # skipped: an auto-assigned rowid would not be anchored to any file.
            conn.commit()
            persisted = True
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    return {
        "rel_path": rel_path,
        "page_count": page_count,
        "text_len": text_len,
        "extracted_to": extracted_to,
        "persisted": persisted,
    }