e3c8979e8d
- cmd/fn/doctor.go - cmd/fn/main.go - cpp/apps/primitives_gallery/playground/tables/CMakeLists.txt - cpp/apps/primitives_gallery/playground/tables/data_table.cpp - cpp/apps/primitives_gallery/playground/tables/data_table_logic.cpp - cpp/apps/primitives_gallery/playground/tables/data_table_logic.h - cpp/apps/primitives_gallery/playground/tables/self_test.cpp - cpp/apps/primitives_gallery/playground/tables/tql.cpp - cpp/apps/primitives_gallery/playground/tables/viz.cpp - cpp/apps/primitives_gallery/playground/tables/viz.h - ... Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
155 lines
4.7 KiB
Go
155 lines
4.7 KiB
Go
package infra
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// WriteReport carries the row counts produced by a single VaultIndexWrite call.
type WriteReport struct {
	// Inserted is the number of rows newly inserted into files.
	Inserted int
	// Updated is the number of rows updated (upserted) in files.
	Updated int
	// Pruned is the number of rows deleted from files (only when prune=true).
	Pruned int
	// FTS is the number of rows inserted into files_fts.
	FTS int
}
|
|
|
|
// VaultIndexWrite upserts a slice of VaultFile into the vault_index.db opened
|
|
// as db, updates the files_fts FTS5 table, and optionally prunes stale rows.
|
|
//
|
|
// All changes run inside a single transaction.
|
|
//
|
|
// Counting strategy: the set of rel_paths already in the DB is read before the
|
|
// loop. An upsert is counted as Inserted if the rel_path was absent, Updated if
|
|
// it was present. This avoids N+1 queries while remaining correct.
|
|
//
|
|
// FTS5: all affected rows are deleted and re-inserted with rel_path and empty
|
|
// content_text. Downstream profilers (csv_profiles, pdf_extracts, knowledge_docs)
|
|
// are responsible for populating content_text with meaningful text.
|
|
//
|
|
// Prune: if prune=true, every row in files whose rel_path is NOT in the provided
|
|
// slice is deleted. Cascades to csv_profiles, pdf_extracts, knowledge_docs via FK.
|
|
func VaultIndexWrite(db *sql.DB, files []VaultFile, prune bool) (WriteReport, error) {
|
|
var report WriteReport
|
|
if len(files) == 0 && !prune {
|
|
return report, nil
|
|
}
|
|
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
return report, fmt.Errorf("vault_index_write: begin tx: %w", err)
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
tx.Rollback() //nolint:errcheck
|
|
}
|
|
}()
|
|
|
|
// Load existing rel_paths into a set to distinguish insert vs update.
|
|
existing := make(map[string]struct{})
|
|
rows, err := tx.Query(`SELECT rel_path FROM files`)
|
|
if err != nil {
|
|
return report, fmt.Errorf("vault_index_write: query existing: %w", err)
|
|
}
|
|
for rows.Next() {
|
|
var rp string
|
|
if err := rows.Scan(&rp); err != nil {
|
|
rows.Close()
|
|
return report, fmt.Errorf("vault_index_write: scan existing: %w", err)
|
|
}
|
|
existing[rp] = struct{}{}
|
|
}
|
|
rows.Close()
|
|
if err := rows.Err(); err != nil {
|
|
return report, fmt.Errorf("vault_index_write: rows err: %w", err)
|
|
}
|
|
|
|
now := time.Now().Unix()
|
|
|
|
upsertStmt, err := tx.Prepare(`
|
|
INSERT INTO files (rel_path, size, mtime, sha256, mime, ext, bucket, sub_bucket, indexed_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(rel_path) DO UPDATE SET
|
|
size = excluded.size,
|
|
mtime = excluded.mtime,
|
|
sha256 = excluded.sha256,
|
|
mime = excluded.mime,
|
|
ext = excluded.ext,
|
|
bucket = excluded.bucket,
|
|
sub_bucket = excluded.sub_bucket,
|
|
indexed_at = excluded.indexed_at
|
|
`)
|
|
if err != nil {
|
|
return report, fmt.Errorf("vault_index_write: prepare upsert: %w", err)
|
|
}
|
|
defer upsertStmt.Close()
|
|
|
|
ftsDeleteStmt, err := tx.Prepare(`DELETE FROM files_fts WHERE rel_path = ?`)
|
|
if err != nil {
|
|
return report, fmt.Errorf("vault_index_write: prepare fts delete: %w", err)
|
|
}
|
|
defer ftsDeleteStmt.Close()
|
|
|
|
ftsInsertStmt, err := tx.Prepare(`INSERT INTO files_fts(rel_path, content_text) VALUES (?, '')`)
|
|
if err != nil {
|
|
return report, fmt.Errorf("vault_index_write: prepare fts insert: %w", err)
|
|
}
|
|
defer ftsInsertStmt.Close()
|
|
|
|
for _, f := range files {
|
|
_, err = upsertStmt.Exec(
|
|
f.RelPath, f.Size, f.Mtime, f.Sha256,
|
|
f.Mime, f.Ext, f.Bucket, f.SubBucket, now,
|
|
)
|
|
if err != nil {
|
|
return report, fmt.Errorf("vault_index_write: upsert %q: %w", f.RelPath, err)
|
|
}
|
|
|
|
if _, wasExisting := existing[f.RelPath]; wasExisting {
|
|
report.Updated++
|
|
} else {
|
|
report.Inserted++
|
|
}
|
|
|
|
// Refresh FTS row.
|
|
if _, err = ftsDeleteStmt.Exec(f.RelPath); err != nil {
|
|
return report, fmt.Errorf("vault_index_write: fts delete %q: %w", f.RelPath, err)
|
|
}
|
|
if _, err = ftsInsertStmt.Exec(f.RelPath); err != nil {
|
|
return report, fmt.Errorf("vault_index_write: fts insert %q: %w", f.RelPath, err)
|
|
}
|
|
report.FTS++
|
|
}
|
|
|
|
// Prune rows not present in the incoming slice.
|
|
if prune && len(files) > 0 {
|
|
keep := make([]string, len(files))
|
|
for i, f := range files {
|
|
keep[i] = "'" + strings.ReplaceAll(f.RelPath, "'", "''") + "'"
|
|
}
|
|
inClause := strings.Join(keep, ",")
|
|
res, err := tx.Exec(fmt.Sprintf(
|
|
`DELETE FROM files WHERE rel_path NOT IN (%s)`, inClause,
|
|
))
|
|
if err != nil {
|
|
return report, fmt.Errorf("vault_index_write: prune: %w", err)
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
report.Pruned = int(n)
|
|
} else if prune && len(files) == 0 {
|
|
// prune=true with empty slice means delete everything.
|
|
res, err := tx.Exec(`DELETE FROM files`)
|
|
if err != nil {
|
|
return report, fmt.Errorf("vault_index_write: prune all: %w", err)
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
report.Pruned = int(n)
|
|
}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
return report, fmt.Errorf("vault_index_write: commit: %w", err)
|
|
}
|
|
return report, nil
|
|
}
|