diff --git a/cmd/membershipd/main.go b/cmd/membershipd/main.go index ee509f5..8d0e767 100644 --- a/cmd/membershipd/main.go +++ b/cmd/membershipd/main.go @@ -33,6 +33,13 @@ func main() { runUserCLI(os.Args[2:]) return } + // `membershipd migrate-to-kv` is the one-time, idempotent SQLite->JetStream KV + // data move for decentralization (issue 0003c). Like the user CLI it runs on + // the host and is dispatched before the server flag set parses os.Args. + if len(os.Args) > 1 && os.Args[1] == "migrate-to-kv" { + runMigrateCLI(os.Args[2:]) + return + } var ( bind = flag.String("bind", "127.0.0.1", "network interface to bind the HTTP API and the embedded NATS to; use 0.0.0.0 to accept LAN/remote peers") diff --git a/cmd/membershipd/migrate_cli.go b/cmd/membershipd/migrate_cli.go new file mode 100644 index 0000000..997b0ad --- /dev/null +++ b/cmd/membershipd/migrate_cli.go @@ -0,0 +1,87 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// runMigrateCLI implements `membershipd migrate-to-kv`, the idempotent move of +// the control-plane state from the local SQLite database into replicated +// JetStream KV (issue 0003c). It backs up the SQLite file first (VACUUM INTO), +// then connects to the target NATS and copies every room/member/key/user into +// the KV buckets. Re-running it converges to the same state. +// +// It runs on the bus host (no auth on the control-plane side), connecting to the +// cluster's NATS; --ca pins TLS when the data plane is secured. +func runMigrateCLI(args []string) { + fs := flag.NewFlagSet("migrate-to-kv", flag.ExitOnError) + dbPath := fs.String("db", defaultDBPath, "SQLite database path to migrate FROM") + natsURL := fs.String("nats-url", "", "NATS url of the cluster to migrate INTO (required)") + ca := fs.String("ca", "", "CA cert to pin TLS on the NATS connection (optional)") + replicas := fs.Int("replicas", 1, "KV replication factor (1 for a 1-2 node rollout, 3 for HA quorum)") + noBackup := fs.Bool("no-backup", false, "skip the SQLite backup before migrating (NOT recommended)") + _ = fs.Parse(args) + + if *natsURL == "" { + fmt.Fprintln(os.Stderr, "membershipd migrate-to-kv: --nats-url is required (the cluster to write the KV buckets into)") + os.Exit(2) + } + + // Back up the SQLite database first so a botched migration can be undone. + var backupPath string + if !*noBackup { + bak, err := membership.BackupSQLite(*dbPath) + if err != nil { + fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: backup failed: %v\n", err) + os.Exit(1) + } + backupPath = bak + fmt.Printf("backed up %s -> %s\n", *dbPath, backupPath) + } + + // Connect to the target NATS (optionally TLS-pinned to the bus CA). + natsOpts := []nats.Option{nats.Name("unibus-migrate")} + if *ca != "" { + tlsCfg, err := busauth.LoadCATLSConfig(*ca) + if err != nil { + fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: load CA: %v\n", err) + os.Exit(1) + } + natsOpts = append(natsOpts, nats.Secure(tlsCfg)) + } + nc, err := nats.Connect(*natsURL, natsOpts...) + if err != nil { + fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: connect %q: %v\n", *natsURL, err) + os.Exit(1) + } + defer nc.Close() + + js, err := jetstream.New(nc) + if err != nil { + fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: jetstream: %v\n", err) + os.Exit(1) + } + + report, err := membership.MigrateSQLiteToKV(*dbPath, js, membership.JetStreamConfig{ + Replicas: *replicas, + OpTimeout: 30 * time.Second, + }) + if err != nil { + fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: %v\n", err) + os.Exit(1) + } + report.BackupPath = backupPath + + fmt.Printf("migrated to KV (replicas=%d): %d rooms, %d members, %d keys, %d users\n", + *replicas, report.Rooms, report.Members, report.Keys, report.Users) + if backupPath != "" { + fmt.Printf("rollback: restore %s if needed\n", backupPath) + } +} diff --git a/pkg/membership/jetstream_store.go b/pkg/membership/jetstream_store.go index 0ab63a5..74f561e 100644 --- a/pkg/membership/jetstream_store.go +++ b/pkg/membership/jetstream_store.go @@ -37,6 +37,7 @@ import ( "fmt" "sort" "strconv" + "strings" "time" "github.com/nats-io/nats.go/jetstream" @@ -508,3 +509,125 @@ func (s *jetstreamStore) HasAdmin() bool { } return false } + +// ---- snapshot import / export (issue 0003c migration) --------------------- + +// importSnapshot writes a full Snapshot into the KV buckets, preserving each +// room's epoch and each user's status (Put, not CreateRoom/AddUser, so the exact +// state is reproduced rather than reset to defaults). Idempotent: every write is +// an overwrite, so re-running the migration converges. +func (s *jetstreamStore) importSnapshot(snap *Snapshot) error { + ctx, cancel := s.ctx() + defer cancel() + for _, r := range snap.Rooms { + b, err := json.Marshal(r) + if err != nil { + return fmt.Errorf("import: marshal room %q: %w", r.RoomID, err) + } + if _, err := s.rooms.Put(ctx, r.RoomID, b); err != nil { + return fmt.Errorf("import: put room %q: %w", r.RoomID, err) + } + } + for roomID, members := range snap.Members { + for _, m := range members { + if err := s.putMember(ctx, roomID, m); err != nil { + return fmt.Errorf("import: %w", err) + } + } + } + for _, rec := range snap.Keys { + if _, err := s.keys.Put(ctx, sealedKey(rec.RoomID, rec.Endpoint, rec.Epoch), rec.Sealed); err != nil { + return fmt.Errorf("import: put key %q/%q@%d: %w", rec.RoomID, rec.Endpoint, rec.Epoch, err) + } + } + for _, u := range snap.Users { + b, err := json.Marshal(u) + if err != nil { + return fmt.Errorf("import: marshal user %q: %w", u.SignPub, err) + } + if _, err := s.users.Put(ctx, normalizeSignPub(u.SignPub), b); err != nil { + return fmt.Errorf("import: put user %q: %w", u.SignPub, err) + } + } + return nil +} + +// ExportSnapshot reads the entire KV control-plane state back into a Snapshot, +// so the migration's parity test can compare it against the SQLite source. +func (s *jetstreamStore) ExportSnapshot() (*Snapshot, error) { + snap := &Snapshot{Members: map[string][]Member{}} + + roomEntries, err := s.watchAll(s.rooms) + if err != nil { + return nil, fmt.Errorf("export kv: rooms: %w", err) + } + for _, e := range roomEntries { + var r RoomInfo + if err := json.Unmarshal(e.Value(), &r); err != nil { + return nil, fmt.Errorf("export kv: unmarshal room: %w", err) + } + snap.Rooms = append(snap.Rooms, r) + } + + memberEntries, err := s.watchAll(s.members) + if err != nil { + return nil, fmt.Errorf("export kv: members: %w", err) + } + for _, e := range memberEntries { + // Key is "."; neither segment contains a dot. + roomID := strings.SplitN(e.Key(), ".", 2)[0] + var m Member + if err := json.Unmarshal(e.Value(), &m); err != nil { + return nil, fmt.Errorf("export kv: unmarshal member: %w", err) + } + snap.Members[roomID] = append(snap.Members[roomID], m) + } + + keyEntries, err := s.watchAll(s.keys) + if err != nil { + return nil, fmt.Errorf("export kv: keys: %w", err) + } + for _, e := range keyEntries { + // Key is "..". + parts := strings.Split(e.Key(), ".") + if len(parts) != 3 { + continue + } + epoch, err := strconv.Atoi(parts[2]) + if err != nil { + continue + } + snap.Keys = append(snap.Keys, SealedKeyRecord{RoomID: parts[0], Endpoint: parts[1], Epoch: epoch, Sealed: e.Value()}) + } + + users, err := s.ListUsers() + if err != nil { + return nil, fmt.Errorf("export kv: users: %w", err) + } + snap.Users = users + return snap, nil +} + +// watchAll collects every current entry of a bucket (no key filter), draining +// the watcher to its initial-snapshot nil marker. +func (s *jetstreamStore) watchAll(kv jetstream.KeyValue) ([]jetstream.KeyValueEntry, error) { + ctx, cancel := s.ctx() + defer cancel() + w, err := kv.WatchAll(ctx, jetstream.IgnoreDeletes()) + if err != nil { + return nil, err + } + defer w.Stop() + var out []jetstream.KeyValueEntry + for { + select { + case e := <-w.Updates(): + if e == nil { + return out, nil + } + out = append(out, e) + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} diff --git a/pkg/membership/migrate.go b/pkg/membership/migrate.go new file mode 100644 index 0000000..6a12d5d --- /dev/null +++ b/pkg/membership/migrate.go @@ -0,0 +1,176 @@ +package membership + +// Migration from the local SQLite control plane to replicated JetStream KV +// (issue 0003c). It is the one-time, idempotent data move that decentralization +// needs: read the entire SQLite state, write it into the KV buckets. Re-running +// it is safe (every KV write is an overwrite), so a partial/interrupted run is +// recovered by running again, and a parity test can assert the two stores hold +// the same state before and after. + +import ( + "database/sql" + "fmt" + "strings" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +// SealedKeyRecord is one row of room_keys: the sealed room key for an endpoint +// at a given epoch. It is the unit the snapshot carries so a backend can be +// imported with the exact epoch history (CreateRoom/AddMember alone could not +// reproduce a multi-epoch room). +type SealedKeyRecord struct { + RoomID string + Endpoint string + Epoch int + Sealed []byte +} + +// Snapshot is the complete control-plane state, backend-agnostic. It is what +// ExportSnapshot produces and importSnapshot consumes, so the SQLite->KV +// migration and the parity test both work in terms of it. +type Snapshot struct { + Rooms []RoomInfo + Members map[string][]Member // roomID -> members + Keys []SealedKeyRecord + Users []User +} + +// MigrateReport summarizes what a migration moved, for the operator log. +type MigrateReport struct { + BackupPath string + Rooms int + Members int + Keys int + Users int +} + +// MigrateSQLiteToKV reads the SQLite store at sqlitePath and writes its entire +// state into the JetStream KV buckets on js (created with cfg.Replicas). It is +// idempotent: re-running converges to the same state. The caller is responsible +// for backing up the SQLite file first (BackupSQLite) — this function only +// reads it. +func MigrateSQLiteToKV(sqlitePath string, js jetstream.JetStream, cfg JetStreamConfig) (*MigrateReport, error) { + src, err := openSQLite(sqlitePath) + if err != nil { + return nil, fmt.Errorf("migrate: open sqlite %q: %w", sqlitePath, err) + } + defer src.Close() + + snap, err := src.ExportSnapshot() + if err != nil { + return nil, fmt.Errorf("migrate: export sqlite: %w", err) + } + + dst, err := OpenJetStream(js, cfg) + if err != nil { + return nil, fmt.Errorf("migrate: open kv: %w", err) + } + kv := dst.(*jetstreamStore) + if err := kv.importSnapshot(snap); err != nil { + return nil, fmt.Errorf("migrate: import to kv: %w", err) + } + + members := 0 + for _, ms := range snap.Members { + members += len(ms) + } + return &MigrateReport{ + Rooms: len(snap.Rooms), + Members: members, + Keys: len(snap.Keys), + Users: len(snap.Users), + }, nil +} + +// BackupSQLite makes a consistent copy of the SQLite database next to it, +// named ".bak.", using SQLite's own VACUUM INTO (which writes a +// transactionally-consistent snapshot even with a live WAL). It returns the +// backup path. Always call this before MigrateSQLiteToKV so a botched migration +// can be undone. +func BackupSQLite(path string) (string, error) { + dst := fmt.Sprintf("%s.bak.%d", path, time.Now().UnixNano()) + db, err := sql.Open("sqlite", "file:"+path+"?_pragma=busy_timeout(5000)") + if err != nil { + return "", fmt.Errorf("backup: open %q: %w", path, err) + } + defer db.Close() + if err := db.Ping(); err != nil { + return "", fmt.Errorf("backup: ping %q: %w", path, err) + } + // VACUUM INTO writes a fresh, consistent database file; the literal path is + // safely single-quoted (it is operator-supplied, never network input). + if _, err := db.Exec("VACUUM INTO '" + strings.ReplaceAll(dst, "'", "''") + "'"); err != nil { + return "", fmt.Errorf("backup: VACUUM INTO %q: %w", dst, err) + } + return dst, nil +} + +// ---- SQLite export -------------------------------------------------------- + +// ExportSnapshot reads the entire SQLite control-plane state into a Snapshot. +func (s *sqliteStore) ExportSnapshot() (*Snapshot, error) { + snap := &Snapshot{Members: map[string][]Member{}} + + rows, err := s.db.Query(`SELECT room_id, subject, key_epoch, encrypt, persist, sign_msgs, owner_endpoint FROM rooms ORDER BY room_id`) + if err != nil { + return nil, fmt.Errorf("export: query rooms: %w", err) + } + for rows.Next() { + var r RoomInfo + var enc, per, sgn int + if err := rows.Scan(&r.RoomID, &r.Subject, &r.Epoch, &enc, &per, &sgn, &r.OwnerEndpoint); err != nil { + rows.Close() + return nil, fmt.Errorf("export: scan room: %w", err) + } + r.Encrypt, r.Persist, r.SignMsgs = enc != 0, per != 0, sgn != 0 + snap.Rooms = append(snap.Rooms, r) + } + rows.Close() + if err := rows.Err(); err != nil { + return nil, err + } + + mrows, err := s.db.Query(`SELECT room_id, endpoint, role, sign_pub, kex_pub FROM members ORDER BY room_id, endpoint`) + if err != nil { + return nil, fmt.Errorf("export: query members: %w", err) + } + for mrows.Next() { + var roomID string + var m Member + if err := mrows.Scan(&roomID, &m.Endpoint, &m.Role, &m.SignPub, &m.KexPub); err != nil { + mrows.Close() + return nil, fmt.Errorf("export: scan member: %w", err) + } + snap.Members[roomID] = append(snap.Members[roomID], m) + } + mrows.Close() + if err := mrows.Err(); err != nil { + return nil, err + } + + krows, err := s.db.Query(`SELECT room_id, epoch, endpoint, sealed_key FROM room_keys ORDER BY room_id, endpoint, epoch`) + if err != nil { + return nil, fmt.Errorf("export: query room_keys: %w", err) + } + for krows.Next() { + var rec SealedKeyRecord + if err := krows.Scan(&rec.RoomID, &rec.Epoch, &rec.Endpoint, &rec.Sealed); err != nil { + krows.Close() + return nil, fmt.Errorf("export: scan room_key: %w", err) + } + snap.Keys = append(snap.Keys, rec) + } + krows.Close() + if err := krows.Err(); err != nil { + return nil, err + } + + users, err := s.ListUsers() + if err != nil { + return nil, fmt.Errorf("export: list users: %w", err) + } + snap.Users = users + return snap, nil +} diff --git a/pkg/membership/migrate_test.go b/pkg/membership/migrate_test.go new file mode 100644 index 0000000..6ab71da --- /dev/null +++ b/pkg/membership/migrate_test.go @@ -0,0 +1,195 @@ +package membership + +import ( + "path/filepath" + "reflect" + "sort" + "testing" + "time" + + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// seedSQLite populates a SQLite store with a representative control plane: two +// rooms (one rekeyed to epoch 2 with a removed member's keys left behind), a few +// members and sealed keys, and a user allowlist with one revoked entry. It +// returns the populated *sqliteStore and its file path. +func seedSQLite(t *testing.T) (*sqliteStore, string) { + t.Helper() + path := filepath.Join(t.TempDir(), "seed.db") + s, err := openSQLite(path) + if err != nil { + t.Fatalf("openSQLite: %v", err) + } + + r1 := RoomInfo{RoomID: newULID(), Subject: "room.alpha", Encrypt: true, Persist: true, SignMsgs: true, OwnerEndpoint: "ep-owner1"} + if err := s.CreateRoom(r1, []byte("o1-sign"), []byte("o1-kex"), []byte("o1-sealed-e1")); err != nil { + t.Fatalf("create r1: %v", err) + } + if err := s.AddMember(r1.RoomID, Member{Endpoint: "ep-bob", Role: "member", SignPub: []byte("bob-sign"), KexPub: []byte("bob-kex")}, 1, []byte("bob-sealed-e1")); err != nil { + t.Fatalf("add bob: %v", err) + } + // Rekey r1 to epoch 2 (owner keeps a key at the new epoch). + if err := s.BumpEpoch(r1.RoomID, 2); err != nil { + t.Fatalf("bump: %v", err) + } + if err := s.PutSealedKeys(r1.RoomID, 2, map[string][]byte{"ep-owner1": []byte("o1-sealed-e2")}); err != nil { + t.Fatalf("put keys e2: %v", err) + } + + r2 := RoomInfo{RoomID: newULID(), Subject: "room.beta", Encrypt: false, Persist: false, SignMsgs: false, OwnerEndpoint: "ep-owner2"} + if err := s.CreateRoom(r2, []byte("o2-sign"), []byte("o2-kex"), nil); err != nil { + t.Fatalf("create r2: %v", err) + } + + if err := s.AddUser("aa11", "alice", RoleAdmin); err != nil { + t.Fatalf("add alice: %v", err) + } + if err := s.AddUser("bb22", "bob", RoleMember); err != nil { + t.Fatalf("add bob user: %v", err) + } + if err := s.AddUser("cc33", "carol", RoleMember); err != nil { + t.Fatalf("add carol: %v", err) + } + if err := s.RevokeUser("cc33"); err != nil { + t.Fatalf("revoke carol: %v", err) + } + return s, path +} + +// normalizeSnapshot sorts every slice in a Snapshot so two snapshots from +// different backends can be compared regardless of enumeration order. +func normalizeSnapshot(snap *Snapshot) { + sort.Slice(snap.Rooms, func(i, j int) bool { return snap.Rooms[i].RoomID < snap.Rooms[j].RoomID }) + for _, ms := range snap.Members { + sort.Slice(ms, func(i, j int) bool { return ms[i].Endpoint < ms[j].Endpoint }) + } + sort.Slice(snap.Keys, func(i, j int) bool { + a, b := snap.Keys[i], snap.Keys[j] + if a.RoomID != b.RoomID { + return a.RoomID < b.RoomID + } + if a.Endpoint != b.Endpoint { + return a.Endpoint < b.Endpoint + } + return a.Epoch < b.Epoch + }) + sort.Slice(snap.Users, func(i, j int) bool { return snap.Users[i].SignPub < snap.Users[j].SignPub }) +} + +func newJS(t *testing.T) jetstream.JetStream { + t.Helper() + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), + Host: "127.0.0.1", + Port: kvFreePort(t), + }) + if err != nil { + t.Fatalf("embedded nats: %v", err) + } + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + ns.Shutdown() + t.Fatalf("nats connect: %v", err) + } + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + ns.Shutdown() + t.Fatalf("jetstream: %v", err) + } + t.Cleanup(func() { nc.Close(); ns.Shutdown(); ns.WaitForShutdown() }) + return js +} + +// TestMigrateSQLiteToKVParity is the parity test the issue mandates: after the +// migration, the KV store holds exactly the SQLite source's state. +func TestMigrateSQLiteToKVParity(t *testing.T) { + src, path := seedSQLite(t) + srcSnap, err := src.ExportSnapshot() + if err != nil { + t.Fatalf("export sqlite: %v", err) + } + src.Close() // release the file before the migration reopens it + + js := newJS(t) + report, err := MigrateSQLiteToKV(path, js, JetStreamConfig{Replicas: 1, OpTimeout: 5 * time.Second}) + if err != nil { + t.Fatalf("migrate: %v", err) + } + if report.Rooms != 2 || report.Users != 3 { + t.Fatalf("report mismatch: %+v", report) + } + + kv, err := OpenJetStream(js, JetStreamConfig{Replicas: 1, OpTimeout: 5 * time.Second}) + if err != nil { + t.Fatalf("open kv: %v", err) + } + kvSnap, err := kv.(*jetstreamStore).ExportSnapshot() + if err != nil { + t.Fatalf("export kv: %v", err) + } + + normalizeSnapshot(srcSnap) + normalizeSnapshot(kvSnap) + if !reflect.DeepEqual(srcSnap, kvSnap) { + t.Fatalf("parity mismatch after migration:\n sqlite=%+v\n kv= %+v", srcSnap, kvSnap) + } +} + +// TestMigrateSQLiteToKVIdempotent: running the migration twice converges to the +// same KV state (every write is an overwrite). A second run must not duplicate +// or corrupt anything. +func TestMigrateSQLiteToKVIdempotent(t *testing.T) { + src, path := seedSQLite(t) + srcSnap, _ := src.ExportSnapshot() + src.Close() + + js := newJS(t) + if _, err := MigrateSQLiteToKV(path, js, JetStreamConfig{Replicas: 1}); err != nil { + t.Fatalf("migrate run 1: %v", err) + } + if _, err := MigrateSQLiteToKV(path, js, JetStreamConfig{Replicas: 1}); err != nil { + t.Fatalf("migrate run 2: %v", err) + } + + kv, _ := OpenJetStream(js, JetStreamConfig{Replicas: 1}) + kvSnap, err := kv.(*jetstreamStore).ExportSnapshot() + if err != nil { + t.Fatalf("export kv: %v", err) + } + normalizeSnapshot(srcSnap) + normalizeSnapshot(kvSnap) + if !reflect.DeepEqual(srcSnap, kvSnap) { + t.Fatalf("idempotency broken: a second migration changed the KV state\n sqlite=%+v\n kv= %+v", srcSnap, kvSnap) + } +} + +// TestBackupSQLiteCreatesConsistentCopy verifies the pre-migration backup is a +// real, openable copy holding the same data. +func TestBackupSQLiteCreatesConsistentCopy(t *testing.T) { + src, path := seedSQLite(t) + srcSnap, _ := src.ExportSnapshot() + src.Close() + + bak, err := BackupSQLite(path) + if err != nil { + t.Fatalf("backup: %v", err) + } + restored, err := openSQLite(bak) + if err != nil { + t.Fatalf("open backup: %v", err) + } + defer restored.Close() + bakSnap, err := restored.ExportSnapshot() + if err != nil { + t.Fatalf("export backup: %v", err) + } + normalizeSnapshot(srcSnap) + normalizeSnapshot(bakSnap) + if !reflect.DeepEqual(srcSnap, bakSnap) { + t.Fatalf("backup is not a faithful copy") + } +}