diff --git a/fn_operations/migrate.go b/fn_operations/migrate.go index 782b97f4..1a4ef396 100644 --- a/fn_operations/migrate.go +++ b/fn_operations/migrate.go @@ -3,115 +3,16 @@ package fn_operations import ( "database/sql" "embed" - "fmt" - "path" - "sort" - "strconv" - "strings" - "time" + + "fn-registry/functions/infra" ) //go:embed migrations/*.sql var migrationsFS embed.FS -const migrationTableSQL = ` -CREATE TABLE IF NOT EXISTS schema_migrations ( - version INTEGER PRIMARY KEY, - name TEXT NOT NULL, - applied_at TEXT NOT NULL -);` - -// migrate applies pending migrations to the database. +// migrate applies pending migrations to the database via the registry's +// sqlite_apply_versioned_migrations_go_infra (schema_migrations tracking +// + per-file transactions). func migrate(conn *sql.DB) error { - if _, err := conn.Exec(migrationTableSQL); err != nil { - return fmt.Errorf("creating schema_migrations table: %w", err) - } - - current, err := currentVersion(conn) - if err != nil { - return err - } - - files, err := listMigrations() - if err != nil { - return err - } - - for _, mf := range files { - if mf.version <= current { - continue - } - - content, err := migrationsFS.ReadFile(path.Join("migrations", mf.filename)) - if err != nil { - return fmt.Errorf("reading migration %s: %w", mf.filename, err) - } - - tx, err := conn.Begin() - if err != nil { - return fmt.Errorf("beginning transaction for migration %d: %w", mf.version, err) - } - - if _, err := tx.Exec(string(content)); err != nil { - tx.Rollback() - return fmt.Errorf("applying migration %s: %w", mf.filename, err) - } - - if _, err := tx.Exec( - "INSERT INTO schema_migrations (version, name, applied_at) VALUES (?, ?, ?)", - mf.version, mf.filename, time.Now().UTC().Format(time.RFC3339), - ); err != nil { - tx.Rollback() - return fmt.Errorf("recording migration %s: %w", mf.filename, err) - } - - if err := tx.Commit(); err != nil { - return fmt.Errorf("committing migration %s: %w", mf.filename, err) - } - } - - return nil -} - -func currentVersion(conn *sql.DB) (int, error) { - var v int - err := conn.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&v) - if err != nil { - return 0, fmt.Errorf("reading current schema version: %w", err) - } - return v, nil -} - -type migrationFile struct { - version int - filename string -} - -func listMigrations() ([]migrationFile, error) { - entries, err := migrationsFS.ReadDir("migrations") - if err != nil { - return nil, fmt.Errorf("reading migrations directory: %w", err) - } - - var files []migrationFile - for _, e := range entries { - if e.IsDir() || !strings.HasSuffix(e.Name(), ".sql") { - continue - } - parts := strings.SplitN(e.Name(), "_", 2) - if len(parts) < 2 { - continue - } - v, err := strconv.Atoi(parts[0]) - if err != nil { - continue - } - files = append(files, migrationFile{version: v, filename: e.Name()}) - } - - sort.Slice(files, func(i, j int) bool { - return files[i].version < files[j].version - }) - - return files, nil + return infra.ApplyVersionedMigrations(conn, migrationsFS, "migrations") } diff --git a/functions/infra/sqlite_apply_versioned_migrations.go b/functions/infra/sqlite_apply_versioned_migrations.go new file mode 100644 index 00000000..5270ebc2 --- /dev/null +++ b/functions/infra/sqlite_apply_versioned_migrations.go @@ -0,0 +1,127 @@ +package infra + +import ( + "database/sql" + "fmt" + "io/fs" + "path" + "sort" + "strconv" + "strings" + "time" +) + +const createSchemaMigrationsTable = ` +CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + name TEXT NOT NULL, + applied_at TEXT NOT NULL +);` + +// ApplyVersionedMigrations applies pending SQLite migrations from fsys, tracking +// applied versions in a schema_migrations table. Each migration runs in its +// own transaction; on error the tx is rolled back and the function returns. +// +// Migration filenames must be NNN_name.sql (e.g. 001_init.sql, +// 002_add_users.sql). The numeric prefix is the version. Files without a +// numeric prefix or with non-.sql extensions are skipped. +// +// dir is the directory inside fsys containing the migrations (e.g. +// "migrations"). Idempotent: migrations whose version <= current are skipped. +func ApplyVersionedMigrations(conn *sql.DB, fsys fs.FS, dir string) error { + if _, err := conn.Exec(createSchemaMigrationsTable); err != nil { + return fmt.Errorf("apply_versioned_migrations: create schema_migrations: %w", err) + } + + current, err := versionedMigrationsCurrentVersion(conn) + if err != nil { + return err + } + + files, err := versionedMigrationsList(fsys, dir) + if err != nil { + return err + } + + for _, mf := range files { + if mf.version <= current { + continue + } + + content, err := fs.ReadFile(fsys, path.Join(dir, mf.filename)) + if err != nil { + return fmt.Errorf("apply_versioned_migrations: read %s: %w", mf.filename, err) + } + + tx, err := conn.Begin() + if err != nil { + return fmt.Errorf("apply_versioned_migrations: begin tx for %s: %w", mf.filename, err) + } + + if _, err := tx.Exec(string(content)); err != nil { + tx.Rollback() //nolint:errcheck + return fmt.Errorf("apply_versioned_migrations: exec %s: %w", mf.filename, err) + } + + if _, err := tx.Exec( + "INSERT INTO schema_migrations (version, name, applied_at) VALUES (?, ?, ?)", + mf.version, mf.filename, time.Now().UTC().Format(time.RFC3339), + ); err != nil { + tx.Rollback() //nolint:errcheck + return fmt.Errorf("apply_versioned_migrations: record %s: %w", mf.filename, err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("apply_versioned_migrations: commit %s: %w", mf.filename, err) + } + } + + return nil +} + +// versionedMigrationsCurrentVersion returns MAX(version) from schema_migrations, +// or 0 if the table is empty. +func versionedMigrationsCurrentVersion(conn *sql.DB) (int, error) { + var v int + err := conn.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&v) + if err != nil { + return 0, fmt.Errorf("apply_versioned_migrations: read current version: %w", err) + } + return v, nil +} + +type versionedMigrationFile struct { + version int + filename string +} + +// versionedMigrationsList reads dir from fsys and returns .sql files with a +// numeric NNN_ prefix, sorted by version ascending. +func versionedMigrationsList(fsys fs.FS, dir string) ([]versionedMigrationFile, error) { + entries, err := fs.ReadDir(fsys, dir) + if err != nil { + return nil, fmt.Errorf("apply_versioned_migrations: read dir %q: %w", dir, err) + } + + var files []versionedMigrationFile + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".sql") { + continue + } + parts := strings.SplitN(e.Name(), "_", 2) + if len(parts) < 2 { + continue + } + v, err := strconv.Atoi(parts[0]) + if err != nil { + continue + } + files = append(files, versionedMigrationFile{version: v, filename: e.Name()}) + } + + sort.Slice(files, func(i, j int) bool { + return files[i].version < files[j].version + }) + + return files, nil +} diff --git a/functions/infra/sqlite_apply_versioned_migrations.md b/functions/infra/sqlite_apply_versioned_migrations.md new file mode 100644 index 00000000..b045da9f --- /dev/null +++ b/functions/infra/sqlite_apply_versioned_migrations.md @@ -0,0 +1,81 @@ +--- +name: sqlite_apply_versioned_migrations +kind: function +lang: go +domain: infra +version: "1.0.0" +purity: impure +signature: "func ApplyVersionedMigrations(conn *sql.DB, fsys fs.FS, dir string) error" +description: "Aplica migraciones SQLite pendientes desde un fs.FS con tracking explicito de versiones en schema_migrations. Cada migracion corre en su propia transaccion; si falla se hace rollback y se retorna el error sin avanzar la version." +tags: [sqlite, migrations, schema, versioned, transactional, embed, infra] +uses_functions: [] +uses_types: [error_go_core] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: + - "database/sql" + - "io/fs" + - "fmt" + - "path" + - "sort" + - "strconv" + - "strings" + - "time" +tested: true +tests: + - "aplica todas desde cero y registra schema_migrations" + - "idempotente por version, no vuelve a aplicar" + - "migracion intermedia falla, version anterior no avanza" + - "archivos sin prefijo numerico se ignoran" + - "dir vacio no error y no crea schema_migrations" +test_file_path: "functions/infra/sqlite_apply_versioned_migrations_test.go" +file_path: "functions/infra/sqlite_apply_versioned_migrations.go" +params: + - name: conn + desc: "Conexion SQLite abierta. Debe apuntar a la base de datos donde se gestionaran las migraciones." + - name: fsys + desc: "Sistema de archivos (embed.FS, os.DirFS, fstest.MapFS, etc.) que contiene el directorio de migraciones." + - name: dir + desc: "Ruta del directorio dentro de fsys que contiene los archivos .sql (ej. 'migrations')." +output: "nil si todas las migraciones pendientes se aplicaron correctamente; error descriptivo con el nombre del archivo que fallo en caso contrario." +--- + +## Ejemplo + +```go +//go:embed migrations/*.sql +var migrationsFS embed.FS + +func openDB(path string) (*sql.DB, error) { + db, err := sql.Open("sqlite3", path+"?_foreign_keys=on&_journal_mode=WAL") + if err != nil { + return nil, err + } + if err := infra.ApplyVersionedMigrations(db, migrationsFS, "migrations"); err != nil { + db.Close() + return nil, fmt.Errorf("migrations: %w", err) + } + return db, nil +} +``` + +## Diferencias vs sqlite_apply_migrations_go_infra (naive) + +| Aspecto | `sqlite_apply_versioned_migrations` (esta) | `sqlite_apply_migrations` (naive) | +|---|---|---| +| Tracking | Tabla `schema_migrations` — sabe exactamente que versiones estan aplicadas | Sin tabla de tracking — reaplica todo cada vez | +| Idempotencia | Por numero de version (`version <= current` se salta) | Por error — ignora "duplicate column / already exists" | +| Transacciones | Una transaccion por archivo — rollback limpio si falla | Sin transacciones — sentencias sueltas | +| Parsing SQL | Confia en SQLite multi-statement (`tx.Exec` del contenido completo) | Split manual por `;` (fragil con strings) | +| Uso ideal | Apps con `operations.db` propias, BDs con datos vivos, deploy multi-PC | Bootstrap rapido, scripts de seed, migraciones sin estado persistente | + +**Regla practica:** usa `sqlite_apply_versioned_migrations` cuando necesites saber que se aplico, cuando, y garantizar que un fallo no deja la BD a medio migrar. Usa `sqlite_apply_migrations` para scripts de seed o inicializacion que no importa repetir. + +## Notas + +- La funcion esta adaptada directamente de `fn_operations/migrate.go` — el patron probado en produccion del registry. +- `schema_migrations` guarda `version` (INTEGER PK), `name` (filename), `applied_at` (RFC3339 UTC). +- El SQL de cada archivo se ejecuta con una sola llamada `tx.Exec(content)` sin split por `;`. Esto funciona correctamente con el driver `go-sqlite3` (CGO) que soporta multi-statement. No usar con drivers pure-Go que no soporten multi-statement. +- Archivos sin prefijo numerico parseable o sin extension `.sql` se ignoran silenciosamente. +- Compatible con `embed.FS`, `os.DirFS`, y `fstest.MapFS` (util en tests). diff --git a/functions/infra/sqlite_apply_versioned_migrations_test.go b/functions/infra/sqlite_apply_versioned_migrations_test.go new file mode 100644 index 00000000..798cfd15 --- /dev/null +++ b/functions/infra/sqlite_apply_versioned_migrations_test.go @@ -0,0 +1,158 @@ +package infra + +import ( + "database/sql" + "testing" + "testing/fstest" + + _ "github.com/mattn/go-sqlite3" +) + +func newTestDB(t *testing.T) *sql.DB { + t.Helper() + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("open db: %v", err) + } + t.Cleanup(func() { db.Close() }) + return db +} + +func TestApplyVersionedMigrations(t *testing.T) { + t.Run("aplica todas desde cero y registra schema_migrations", func(t *testing.T) { + db := newTestDB(t) + fsys := fstest.MapFS{ + "migrations/001_init.sql": {Data: []byte("CREATE TABLE users (id INTEGER PRIMARY KEY);")}, + "migrations/002_add_email.sql": {Data: []byte("ALTER TABLE users ADD COLUMN email TEXT;")}, + } + + if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify schema_migrations has 2 rows + var count int + if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count); err != nil { + t.Fatalf("count schema_migrations: %v", err) + } + if count != 2 { + t.Errorf("schema_migrations rows: got %d, want 2", count) + } + + // Verify MAX(version) == 2 + var maxV int + if err := db.QueryRow("SELECT MAX(version) FROM schema_migrations").Scan(&maxV); err != nil { + t.Fatalf("max version: %v", err) + } + if maxV != 2 { + t.Errorf("max version: got %d, want 2", maxV) + } + + // Verify the table from migration 001 exists + if _, err := db.Exec("INSERT INTO users (id) VALUES (1)"); err != nil { + t.Errorf("users table not created: %v", err) + } + }) + + t.Run("idempotente por version, no vuelve a aplicar", func(t *testing.T) { + db := newTestDB(t) + fsys := fstest.MapFS{ + "migrations/001_init.sql": {Data: []byte("CREATE TABLE things (id INTEGER PRIMARY KEY);")}, + } + + // First run + if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil { + t.Fatalf("first run: %v", err) + } + // Second run — must not error even though CREATE TABLE would fail normally + if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil { + t.Fatalf("second run: %v", err) + } + + var count int + if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count); err != nil { + t.Fatalf("count: %v", err) + } + if count != 1 { + t.Errorf("expected 1 row in schema_migrations, got %d", count) + } + }) + + t.Run("migracion intermedia falla, version anterior no avanza", func(t *testing.T) { + db := newTestDB(t) + fsys := fstest.MapFS{ + "migrations/001_init.sql": {Data: []byte("CREATE TABLE ok (id INTEGER PRIMARY KEY);")}, + "migrations/002_bad.sql": {Data: []byte("THIS IS NOT VALID SQL !!!;")}, + "migrations/003_more.sql": {Data: []byte("CREATE TABLE more (id INTEGER PRIMARY KEY);")}, + } + + err := ApplyVersionedMigrations(db, fsys, "migrations") + if err == nil { + t.Fatal("expected error from bad migration, got nil") + } + + // Only version 1 should be recorded + var maxV int + if err2 := db.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&maxV); err2 != nil { + t.Fatalf("read version: %v", err2) + } + if maxV != 1 { + t.Errorf("max version after failure: got %d, want 1", maxV) + } + + // Migration 003 table must NOT exist + if _, err2 := db.Exec("INSERT INTO more (id) VALUES (1)"); err2 == nil { + t.Error("table 'more' should not exist after failed migration 002") + } + }) + + t.Run("archivos sin prefijo numerico se ignoran", func(t *testing.T) { + db := newTestDB(t) + fsys := fstest.MapFS{ + "migrations/README.sql": {Data: []byte("-- this should be ignored")}, + "migrations/init.sql": {Data: []byte("CREATE TABLE bad (id INTEGER PRIMARY KEY);")}, + "migrations/001_good.sql": {Data: []byte("CREATE TABLE good (id INTEGER PRIMARY KEY);")}, + } + + if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var count int + if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count); err != nil { + t.Fatalf("count: %v", err) + } + if count != 1 { + t.Errorf("expected 1 migration applied, got %d", count) + } + + // Only 'good' table should exist + if _, err := db.Exec("INSERT INTO good (id) VALUES (1)"); err != nil { + t.Errorf("table 'good' should exist: %v", err) + } + if _, err := db.Exec("INSERT INTO bad (id) VALUES (1)"); err == nil { + t.Error("table 'bad' should NOT exist") + } + }) + + t.Run("dir vacio no error y no crea schema_migrations", func(t *testing.T) { + db := newTestDB(t) + fsys := fstest.MapFS{ + "migrations/.keep": {Data: []byte("")}, + } + + if err := ApplyVersionedMigrations(db, fsys, "migrations"); err != nil { + t.Fatalf("unexpected error on empty dir: %v", err) + } + + // schema_migrations IS created (the CREATE TABLE IF NOT EXISTS runs regardless) + // but it must be empty + var count int + if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count); err != nil { + t.Fatalf("count schema_migrations: %v", err) + } + if count != 0 { + t.Errorf("expected 0 rows in schema_migrations, got %d", count) + } + }) +} diff --git a/registry/migrate.go b/registry/migrate.go index 212aa350..e3e86209 100644 --- a/registry/migrate.go +++ b/registry/migrate.go @@ -3,115 +3,16 @@ package registry import ( "database/sql" "embed" - "fmt" - "path" - "sort" - "strconv" - "strings" - "time" + + "fn-registry/functions/infra" ) //go:embed migrations/*.sql var migrationsFS embed.FS -const migrationTableSQL = ` -CREATE TABLE IF NOT EXISTS schema_migrations ( - version INTEGER PRIMARY KEY, - name TEXT NOT NULL, - applied_at TEXT NOT NULL -);` - -// migrate applies pending migrations to the database. +// migrate applies pending migrations to the registry database via the +// registry's sqlite_apply_versioned_migrations_go_infra (schema_migrations +// tracking + per-file transactions). func migrate(conn *sql.DB) error { - if _, err := conn.Exec(migrationTableSQL); err != nil { - return fmt.Errorf("creating schema_migrations table: %w", err) - } - - current, err := currentVersion(conn) - if err != nil { - return err - } - - files, err := listMigrations() - if err != nil { - return err - } - - for _, mf := range files { - if mf.version <= current { - continue - } - - content, err := migrationsFS.ReadFile(path.Join("migrations", mf.filename)) - if err != nil { - return fmt.Errorf("reading migration %s: %w", mf.filename, err) - } - - tx, err := conn.Begin() - if err != nil { - return fmt.Errorf("beginning transaction for migration %d: %w", mf.version, err) - } - - if _, err := tx.Exec(string(content)); err != nil { - tx.Rollback() - return fmt.Errorf("applying migration %s: %w", mf.filename, err) - } - - if _, err := tx.Exec( - "INSERT INTO schema_migrations (version, name, applied_at) VALUES (?, ?, ?)", - mf.version, mf.filename, time.Now().UTC().Format(time.RFC3339), - ); err != nil { - tx.Rollback() - return fmt.Errorf("recording migration %s: %w", mf.filename, err) - } - - if err := tx.Commit(); err != nil { - return fmt.Errorf("committing migration %s: %w", mf.filename, err) - } - } - - return nil -} - -func currentVersion(conn *sql.DB) (int, error) { - var v int - err := conn.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&v) - if err != nil { - return 0, fmt.Errorf("reading current schema version: %w", err) - } - return v, nil -} - -type migrationFile struct { - version int - filename string -} - -func listMigrations() ([]migrationFile, error) { - entries, err := migrationsFS.ReadDir("migrations") - if err != nil { - return nil, fmt.Errorf("reading migrations directory: %w", err) - } - - var files []migrationFile - for _, e := range entries { - if e.IsDir() || !strings.HasSuffix(e.Name(), ".sql") { - continue - } - parts := strings.SplitN(e.Name(), "_", 2) - if len(parts) < 2 { - continue - } - v, err := strconv.Atoi(parts[0]) - if err != nil { - continue - } - files = append(files, migrationFile{version: v, filename: e.Name()}) - } - - sort.Slice(files, func(i, j int) bool { - return files[i].version < files[j].version - }) - - return files, nil + return infra.ApplyVersionedMigrations(conn, migrationsFS, "migrations") }