feat(infra): sqlite_apply_versioned_migrations + dedup fn_operations + registry

Promueve patron versionado (schema_migrations + tx por archivo) al registry
como sqlite_apply_versioned_migrations_go_infra. Migra fn_operations/migrate.go
y registry/migrate.go al consumirla. ~200 LOC duplicadas eliminadas.

- functions/infra/sqlite_apply_versioned_migrations.{go,md,_test.go}: nueva,
  5/5 tests pass. Generaliza fs.FS + dir param (fn_operations usaba embed.FS
  hardcoded). Distinta de sqlite_apply_migrations_go_infra (naive split-by-`;`,
  idempotent-by-error) — esta hace tracking explicito + transactions.
- fn_operations/migrate.go: 111 LOC -> 17. Wrapper sobre infra.ApplyVersionedMigrations.
- registry/migrate.go: idem. Mismo patron copy-paste, ahora unificado.

Smoke: ./fn ops init crea operations.db con schema_migrations poblada.
fn_operations + registry tests: PASS. fn index registra nueva fn (1091 total).
This commit is contained in:
2026-05-09 12:50:51 +02:00
parent f65178025d
commit 618c07a12c
5 changed files with 378 additions and 210 deletions
+6 -105
View File
@@ -3,115 +3,16 @@ package fn_operations
import (
"database/sql"
"embed"
"fmt"
"path"
"sort"
"strconv"
"strings"
"time"
"fn-registry/functions/infra"
)
//go:embed migrations/*.sql
var migrationsFS embed.FS
const migrationTableSQL = `
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT NOT NULL
);`
// migrate applies pending migrations to the database.
// migrate applies pending migrations to the database via the registry's
// sqlite_apply_versioned_migrations_go_infra (schema_migrations tracking
// + per-file transactions).
func migrate(conn *sql.DB) error {
if _, err := conn.Exec(migrationTableSQL); err != nil {
return fmt.Errorf("creating schema_migrations table: %w", err)
}
current, err := currentVersion(conn)
if err != nil {
return err
}
files, err := listMigrations()
if err != nil {
return err
}
for _, mf := range files {
if mf.version <= current {
continue
}
content, err := migrationsFS.ReadFile(path.Join("migrations", mf.filename))
if err != nil {
return fmt.Errorf("reading migration %s: %w", mf.filename, err)
}
tx, err := conn.Begin()
if err != nil {
return fmt.Errorf("beginning transaction for migration %d: %w", mf.version, err)
}
if _, err := tx.Exec(string(content)); err != nil {
tx.Rollback()
return fmt.Errorf("applying migration %s: %w", mf.filename, err)
}
if _, err := tx.Exec(
"INSERT INTO schema_migrations (version, name, applied_at) VALUES (?, ?, ?)",
mf.version, mf.filename, time.Now().UTC().Format(time.RFC3339),
); err != nil {
tx.Rollback()
return fmt.Errorf("recording migration %s: %w", mf.filename, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing migration %s: %w", mf.filename, err)
}
}
return nil
}
func currentVersion(conn *sql.DB) (int, error) {
var v int
err := conn.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&v)
if err != nil {
return 0, fmt.Errorf("reading current schema version: %w", err)
}
return v, nil
}
type migrationFile struct {
version int
filename string
}
func listMigrations() ([]migrationFile, error) {
entries, err := migrationsFS.ReadDir("migrations")
if err != nil {
return nil, fmt.Errorf("reading migrations directory: %w", err)
}
var files []migrationFile
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".sql") {
continue
}
parts := strings.SplitN(e.Name(), "_", 2)
if len(parts) < 2 {
continue
}
v, err := strconv.Atoi(parts[0])
if err != nil {
continue
}
files = append(files, migrationFile{version: v, filename: e.Name()})
}
sort.Slice(files, func(i, j int) bool {
return files[i].version < files[j].version
})
return files, nil
return infra.ApplyVersionedMigrations(conn, migrationsFS, "migrations")
}
@@ -0,0 +1,127 @@
package infra
import (
"database/sql"
"fmt"
"io/fs"
"path"
"sort"
"strconv"
"strings"
"time"
)
const createSchemaMigrationsTable = `
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT NOT NULL
);`
// ApplyVersionedMigrations applies pending SQLite migrations from fsys, tracking
// applied versions in a schema_migrations table. Each migration runs in its
// own transaction; on error the tx is rolled back and the function returns.
//
// Migration filenames must be NNN_name.sql (e.g. 001_init.sql,
// 002_add_users.sql). The numeric prefix is the version. Files without a
// numeric prefix or with non-.sql extensions are skipped.
//
// dir is the directory inside fsys containing the migrations (e.g.
// "migrations"). Idempotent: migrations whose version <= current are skipped.
func ApplyVersionedMigrations(conn *sql.DB, fsys fs.FS, dir string) error {
if _, err := conn.Exec(createSchemaMigrationsTable); err != nil {
return fmt.Errorf("apply_versioned_migrations: create schema_migrations: %w", err)
}
current, err := versionedMigrationsCurrentVersion(conn)
if err != nil {
return err
}
files, err := versionedMigrationsList(fsys, dir)
if err != nil {
return err
}
for _, mf := range files {
if mf.version <= current {
continue
}
content, err := fs.ReadFile(fsys, path.Join(dir, mf.filename))
if err != nil {
return fmt.Errorf("apply_versioned_migrations: read %s: %w", mf.filename, err)
}
tx, err := conn.Begin()
if err != nil {
return fmt.Errorf("apply_versioned_migrations: begin tx for %s: %w", mf.filename, err)
}
if _, err := tx.Exec(string(content)); err != nil {
tx.Rollback() //nolint:errcheck
return fmt.Errorf("apply_versioned_migrations: exec %s: %w", mf.filename, err)
}
if _, err := tx.Exec(
"INSERT INTO schema_migrations (version, name, applied_at) VALUES (?, ?, ?)",
mf.version, mf.filename, time.Now().UTC().Format(time.RFC3339),
); err != nil {
tx.Rollback() //nolint:errcheck
return fmt.Errorf("apply_versioned_migrations: record %s: %w", mf.filename, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("apply_versioned_migrations: commit %s: %w", mf.filename, err)
}
}
return nil
}
// versionedMigrationsCurrentVersion returns MAX(version) from schema_migrations,
// or 0 if the table is empty.
func versionedMigrationsCurrentVersion(conn *sql.DB) (int, error) {
var v int
err := conn.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&v)
if err != nil {
return 0, fmt.Errorf("apply_versioned_migrations: read current version: %w", err)
}
return v, nil
}
type versionedMigrationFile struct {
version int
filename string
}
// versionedMigrationsList reads dir from fsys and returns .sql files with a
// numeric NNN_ prefix, sorted by version ascending.
func versionedMigrationsList(fsys fs.FS, dir string) ([]versionedMigrationFile, error) {
entries, err := fs.ReadDir(fsys, dir)
if err != nil {
return nil, fmt.Errorf("apply_versioned_migrations: read dir %q: %w", dir, err)
}
var files []versionedMigrationFile
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".sql") {
continue
}
parts := strings.SplitN(e.Name(), "_", 2)
if len(parts) < 2 {
continue
}
v, err := strconv.Atoi(parts[0])
if err != nil {
continue
}
files = append(files, versionedMigrationFile{version: v, filename: e.Name()})
}
sort.Slice(files, func(i, j int) bool {
return files[i].version < files[j].version
})
return files, nil
}
@@ -0,0 +1,81 @@
---
name: sqlite_apply_versioned_migrations
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func ApplyVersionedMigrations(conn *sql.DB, fsys fs.FS, dir string) error"
description: "Aplica migraciones SQLite pendientes desde un fs.FS con tracking explicito de versiones en schema_migrations. Cada migracion corre en su propia transaccion; si falla se hace rollback y se retorna el error sin avanzar la version."
tags: [sqlite, migrations, schema, versioned, transactional, embed, infra]
uses_functions: []
uses_types: [error_go_core]
returns: []
returns_optional: false
error_type: "error_go_core"
imports:
- "database/sql"
- "io/fs"
- "fmt"
- "path"
- "sort"
- "strconv"
- "strings"
- "time"
tested: true
tests:
- "aplica todas desde cero y registra schema_migrations"
- "idempotente por version, no vuelve a aplicar"
- "migracion intermedia falla, version anterior no avanza"
- "archivos sin prefijo numerico se ignoran"
- "dir vacio no error y no crea schema_migrations"
test_file_path: "functions/infra/sqlite_apply_versioned_migrations_test.go"
file_path: "functions/infra/sqlite_apply_versioned_migrations.go"
params:
- name: conn
desc: "Conexion SQLite abierta. Debe apuntar a la base de datos donde se gestionaran las migraciones."
- name: fsys
desc: "Sistema de archivos (embed.FS, os.DirFS, fstest.MapFS, etc.) que contiene el directorio de migraciones."
- name: dir
desc: "Ruta del directorio dentro de fsys que contiene los archivos .sql (ej. 'migrations')."
output: "nil si todas las migraciones pendientes se aplicaron correctamente; error descriptivo con el nombre del archivo que fallo en caso contrario."
---
## Ejemplo
```go
//go:embed migrations/*.sql
var migrationsFS embed.FS
func openDB(path string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", path+"?_foreign_keys=on&_journal_mode=WAL")
if err != nil {
return nil, err
}
if err := infra.ApplyVersionedMigrations(db, migrationsFS, "migrations"); err != nil {
db.Close()
return nil, fmt.Errorf("migrations: %w", err)
}
return db, nil
}
```
## Diferencias vs sqlite_apply_migrations_go_infra (naive)
| Aspecto | `sqlite_apply_versioned_migrations` (esta) | `sqlite_apply_migrations` (naive) |
|---|---|---|
| Tracking | Tabla `schema_migrations` — sabe exactamente que versiones estan aplicadas | Sin tabla de tracking — reaplica todo cada vez |
| Idempotencia | Por numero de version (`version <= current` se salta) | Por error — ignora "duplicate column / already exists" |
| Transacciones | Una transaccion por archivo — rollback limpio si falla | Sin transacciones — sentencias sueltas |
| Parsing SQL | Confia en SQLite multi-statement (`tx.Exec` del contenido completo) | Split manual por `;` (fragil con strings) |
| Uso ideal | Apps con `operations.db` propias, BDs con datos vivos, deploy multi-PC | Bootstrap rapido, scripts de seed, migraciones sin estado persistente |
**Regla practica:** usa `sqlite_apply_versioned_migrations` cuando necesites saber que se aplico, cuando, y garantizar que un fallo no deja la BD a medio migrar. Usa `sqlite_apply_migrations` para scripts de seed o inicializacion que no importa repetir.
## Notas
- La funcion esta adaptada directamente de `fn_operations/migrate.go` — el patron probado en produccion del registry.
- `schema_migrations` guarda `version` (INTEGER PK), `name` (filename), `applied_at` (RFC3339 UTC).
- El SQL de cada archivo se ejecuta con una sola llamada `tx.Exec(content)` sin split por `;`. Esto funciona correctamente con el driver `go-sqlite3` (CGO) que soporta multi-statement. No usar con drivers pure-Go que no soporten multi-statement.
- Archivos sin prefijo numerico parseable o sin extension `.sql` se ignoran silenciosamente.
- Compatible con `embed.FS`, `os.DirFS`, y `fstest.MapFS` (util en tests).
@@ -0,0 +1,158 @@
package infra
import (
"database/sql"
"testing"
"testing/fstest"
_ "github.com/mattn/go-sqlite3"
)
func newTestDB(t *testing.T) *sql.DB {
t.Helper()
db, err := sql.Open("sqlite3", ":memory:")
if err != nil {
t.Fatalf("open db: %v", err)
}
t.Cleanup(func() { db.Close() })
return db
}
func TestApplyVersionedMigrations(t *testing.T) {
t.Run("aplica todas desde cero y registra schema_migrations", func(t *testing.T) {
db := newTestDB(t)
fsys := fstest.MapFS{
"migrations/001_init.sql": {Data: []byte("CREATE TABLE users (id INTEGER PRIMARY KEY);")},
"migrations/002_add_email.sql": {Data: []byte("ALTER TABLE users ADD COLUMN email TEXT;")},
}
if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Verify schema_migrations has 2 rows
var count int
if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count); err != nil {
t.Fatalf("count schema_migrations: %v", err)
}
if count != 2 {
t.Errorf("schema_migrations rows: got %d, want 2", count)
}
// Verify MAX(version) == 2
var maxV int
if err := db.QueryRow("SELECT MAX(version) FROM schema_migrations").Scan(&maxV); err != nil {
t.Fatalf("max version: %v", err)
}
if maxV != 2 {
t.Errorf("max version: got %d, want 2", maxV)
}
// Verify the table from migration 001 exists
if _, err := db.Exec("INSERT INTO users (id) VALUES (1)"); err != nil {
t.Errorf("users table not created: %v", err)
}
})
t.Run("idempotente por version, no vuelve a aplicar", func(t *testing.T) {
db := newTestDB(t)
fsys := fstest.MapFS{
"migrations/001_init.sql": {Data: []byte("CREATE TABLE things (id INTEGER PRIMARY KEY);")},
}
// First run
if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil {
t.Fatalf("first run: %v", err)
}
// Second run — must not error even though CREATE TABLE would fail normally
if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil {
t.Fatalf("second run: %v", err)
}
var count int
if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count); err != nil {
t.Fatalf("count: %v", err)
}
if count != 1 {
t.Errorf("expected 1 row in schema_migrations, got %d", count)
}
})
t.Run("migracion intermedia falla, version anterior no avanza", func(t *testing.T) {
db := newTestDB(t)
fsys := fstest.MapFS{
"migrations/001_init.sql": {Data: []byte("CREATE TABLE ok (id INTEGER PRIMARY KEY);")},
"migrations/002_bad.sql": {Data: []byte("THIS IS NOT VALID SQL !!!;")},
"migrations/003_more.sql": {Data: []byte("CREATE TABLE more (id INTEGER PRIMARY KEY);")},
}
err := ApplyVersionedMigrations(db, fsys, "migrations")
if err == nil {
t.Fatal("expected error from bad migration, got nil")
}
// Only version 1 should be recorded
var maxV int
if err2 := db.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&maxV); err2 != nil {
t.Fatalf("read version: %v", err2)
}
if maxV != 1 {
t.Errorf("max version after failure: got %d, want 1", maxV)
}
// Migration 003 table must NOT exist
if _, err2 := db.Exec("INSERT INTO more (id) VALUES (1)"); err2 == nil {
t.Error("table 'more' should not exist after failed migration 002")
}
})
t.Run("archivos sin prefijo numerico se ignoran", func(t *testing.T) {
db := newTestDB(t)
fsys := fstest.MapFS{
"migrations/README.sql": {Data: []byte("-- this should be ignored")},
"migrations/init.sql": {Data: []byte("CREATE TABLE bad (id INTEGER PRIMARY KEY);")},
"migrations/001_good.sql": {Data: []byte("CREATE TABLE good (id INTEGER PRIMARY KEY);")},
}
if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
var count int
if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count); err != nil {
t.Fatalf("count: %v", err)
}
if count != 1 {
t.Errorf("expected 1 migration applied, got %d", count)
}
// Only 'good' table should exist
if _, err := db.Exec("INSERT INTO good (id) VALUES (1)"); err != nil {
t.Errorf("table 'good' should exist: %v", err)
}
if _, err := db.Exec("INSERT INTO bad (id) VALUES (1)"); err == nil {
t.Error("table 'bad' should NOT exist")
}
})
t.Run("dir vacio no error y no crea schema_migrations", func(t *testing.T) {
db := newTestDB(t)
fsys := fstest.MapFS{
"migrations/.keep": {Data: []byte("")},
}
if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil {
t.Fatalf("unexpected error on empty dir: %v", err)
}
// schema_migrations IS created (the CREATE TABLE IF NOT EXISTS runs regardless)
// but it must be empty
var count int
if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count); err != nil {
t.Fatalf("count schema_migrations: %v", err)
}
if count != 0 {
t.Errorf("expected 0 rows in schema_migrations, got %d", count)
}
})
}
+6 -105
View File
@@ -3,115 +3,16 @@ package registry
import (
"database/sql"
"embed"
"fmt"
"path"
"sort"
"strconv"
"strings"
"time"
"fn-registry/functions/infra"
)
//go:embed migrations/*.sql
var migrationsFS embed.FS
const migrationTableSQL = `
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT NOT NULL
);`
// migrate applies pending migrations to the database.
// migrate applies pending migrations to the registry database via the
// registry's sqlite_apply_versioned_migrations_go_infra (schema_migrations
// tracking + per-file transactions).
func migrate(conn *sql.DB) error {
if _, err := conn.Exec(migrationTableSQL); err != nil {
return fmt.Errorf("creating schema_migrations table: %w", err)
}
current, err := currentVersion(conn)
if err != nil {
return err
}
files, err := listMigrations()
if err != nil {
return err
}
for _, mf := range files {
if mf.version <= current {
continue
}
content, err := migrationsFS.ReadFile(path.Join("migrations", mf.filename))
if err != nil {
return fmt.Errorf("reading migration %s: %w", mf.filename, err)
}
tx, err := conn.Begin()
if err != nil {
return fmt.Errorf("beginning transaction for migration %d: %w", mf.version, err)
}
if _, err := tx.Exec(string(content)); err != nil {
tx.Rollback()
return fmt.Errorf("applying migration %s: %w", mf.filename, err)
}
if _, err := tx.Exec(
"INSERT INTO schema_migrations (version, name, applied_at) VALUES (?, ?, ?)",
mf.version, mf.filename, time.Now().UTC().Format(time.RFC3339),
); err != nil {
tx.Rollback()
return fmt.Errorf("recording migration %s: %w", mf.filename, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing migration %s: %w", mf.filename, err)
}
}
return nil
}
func currentVersion(conn *sql.DB) (int, error) {
var v int
err := conn.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&v)
if err != nil {
return 0, fmt.Errorf("reading current schema version: %w", err)
}
return v, nil
}
type migrationFile struct {
version int
filename string
}
func listMigrations() ([]migrationFile, error) {
entries, err := migrationsFS.ReadDir("migrations")
if err != nil {
return nil, fmt.Errorf("reading migrations directory: %w", err)
}
var files []migrationFile
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".sql") {
continue
}
parts := strings.SplitN(e.Name(), "_", 2)
if len(parts) < 2 {
continue
}
v, err := strconv.Atoi(parts[0])
if err != nil {
continue
}
files = append(files, migrationFile{version: v, filename: e.Name()})
}
sort.Slice(files, func(i, j int) bool {
return files[i].version < files[j].version
})
return files, nil
return infra.ApplyVersionedMigrations(conn, migrationsFS, "migrations")
}