feat(0003c): membershipd migrate-to-kv (idempotent SQLite -> JetStream KV)

This is the one-time data move that decentralization needs (issue 0003c):
it copies the entire control-plane state from the local SQLite database
into the replicated JetStream KV buckets, taking a backup first.

pkg/membership:
- Snapshot / SealedKeyRecord: a backend-agnostic dump of the whole
  control plane (rooms with their real epoch, members, every sealed-key
  row across epochs, users with status).
- (*sqliteStore).ExportSnapshot and (*jetstreamStore).ExportSnapshot read
  a full Snapshot from each backend; (*jetstreamStore).importSnapshot
  writes one with raw Puts (preserving epoch/status, not resetting to
  defaults) so the migration is faithful and idempotent (every write is
  an overwrite, so re-running converges).
- MigrateSQLiteToKV orchestrates export -> import; BackupSQLite makes a
  consistent copy via SQLite's VACUUM INTO before any migration.

cmd/membershipd:
- `membershipd migrate-to-kv --db <path> --nats-url <url> [--replicas N]
  [--ca <cert>] [--no-backup]` backs up the SQLite file, connects to the
  cluster's NATS, and migrates. Dispatched on the host like `user`.

Tests (DoD: golden + edge + parity):
- TestMigrateSQLiteToKVParity: seed a representative SQLite (two rooms,
  one rekeyed to epoch 2, members, a revoked user); after migration the
  KV ExportSnapshot equals the SQLite ExportSnapshot.
- TestMigrateSQLiteToKVIdempotent: running the migration twice yields the
  same KV state.
- TestBackupSQLiteCreatesConsistentCopy: the backup reopens with
  identical data.
Plus a binary smoke (seed user -> run server -> migrate-to-kv -> re-run):
backup written, 1 user migrated, second run identical.
This commit is contained in:
agent
2026-06-07 15:09:56 +02:00
parent b8c9b2b652
commit 9013ea5e33
5 changed files with 588 additions and 0 deletions
+7
View File
@@ -33,6 +33,13 @@ func main() {
runUserCLI(os.Args[2:])
return
}
// `membershipd migrate-to-kv` is the one-time, idempotent SQLite->JetStream KV
// data move for decentralization (issue 0003c). Like the user CLI it runs on
// the host and is dispatched before the server flag set parses os.Args.
if len(os.Args) > 1 && os.Args[1] == "migrate-to-kv" {
runMigrateCLI(os.Args[2:])
return
}
var (
bind = flag.String("bind", "127.0.0.1", "network interface to bind the HTTP API and the embedded NATS to; use 0.0.0.0 to accept LAN/remote peers")
+87
View File
@@ -0,0 +1,87 @@
package main
import (
"flag"
"fmt"
"os"
"time"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// runMigrateCLI implements `membershipd migrate-to-kv`, the idempotent move of
// the control-plane state from the local SQLite database into replicated
// JetStream KV (issue 0003c). It backs up the SQLite file first (VACUUM INTO),
// then connects to the target NATS and copies every room/member/key/user into
// the KV buckets. Re-running it converges to the same state.
//
// It runs on the bus host (no auth on the control-plane side), connecting to the
// cluster's NATS; --ca pins TLS when the data plane is secured.
func runMigrateCLI(args []string) {
fs := flag.NewFlagSet("migrate-to-kv", flag.ExitOnError)
dbPath := fs.String("db", defaultDBPath, "SQLite database path to migrate FROM")
natsURL := fs.String("nats-url", "", "NATS url of the cluster to migrate INTO (required)")
ca := fs.String("ca", "", "CA cert to pin TLS on the NATS connection (optional)")
replicas := fs.Int("replicas", 1, "KV replication factor (1 for a 1-2 node rollout, 3 for HA quorum)")
noBackup := fs.Bool("no-backup", false, "skip the SQLite backup before migrating (NOT recommended)")
_ = fs.Parse(args)
if *natsURL == "" {
fmt.Fprintln(os.Stderr, "membershipd migrate-to-kv: --nats-url is required (the cluster to write the KV buckets into)")
os.Exit(2)
}
// Back up the SQLite database first so a botched migration can be undone.
var backupPath string
if !*noBackup {
bak, err := membership.BackupSQLite(*dbPath)
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: backup failed: %v\n", err)
os.Exit(1)
}
backupPath = bak
fmt.Printf("backed up %s -> %s\n", *dbPath, backupPath)
}
// Connect to the target NATS (optionally TLS-pinned to the bus CA).
natsOpts := []nats.Option{nats.Name("unibus-migrate")}
if *ca != "" {
tlsCfg, err := busauth.LoadCATLSConfig(*ca)
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: load CA: %v\n", err)
os.Exit(1)
}
natsOpts = append(natsOpts, nats.Secure(tlsCfg))
}
nc, err := nats.Connect(*natsURL, natsOpts...)
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: connect %q: %v\n", *natsURL, err)
os.Exit(1)
}
defer nc.Close()
js, err := jetstream.New(nc)
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: jetstream: %v\n", err)
os.Exit(1)
}
report, err := membership.MigrateSQLiteToKV(*dbPath, js, membership.JetStreamConfig{
Replicas: *replicas,
OpTimeout: 30 * time.Second,
})
if err != nil {
fmt.Fprintf(os.Stderr, "membershipd migrate-to-kv: %v\n", err)
os.Exit(1)
}
report.BackupPath = backupPath
fmt.Printf("migrated to KV (replicas=%d): %d rooms, %d members, %d keys, %d users\n",
*replicas, report.Rooms, report.Members, report.Keys, report.Users)
if backupPath != "" {
fmt.Printf("rollback: restore %s if needed\n", backupPath)
}
}
+123
View File
@@ -37,6 +37,7 @@ import (
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/nats-io/nats.go/jetstream"
@@ -508,3 +509,125 @@ func (s *jetstreamStore) HasAdmin() bool {
}
return false
}
// ---- snapshot import / export (issue 0003c migration) ---------------------
// importSnapshot replays a complete Snapshot into the KV buckets. Rooms and
// users are stored with a plain Put of their JSON encoding (not through
// CreateRoom/AddUser), so each room keeps its epoch and each user keeps its
// status instead of being reset to defaults. Because every write overwrites
// whatever is already stored under the key, running the import again converges
// to the same state.
//
// It stops at, and returns, the first error; earlier writes stay in place.
func (s *jetstreamStore) importSnapshot(snap *Snapshot) error {
	// NOTE(review): one context covers every Put below. If s.ctx() carries a
	// per-operation timeout, a large snapshot has to fit inside it — confirm.
	ctx, cancel := s.ctx()
	defer cancel()

	// Rooms: JSON value keyed by room ID.
	for i := range snap.Rooms {
		room := snap.Rooms[i]
		payload, err := json.Marshal(room)
		if err != nil {
			return fmt.Errorf("import: marshal room %q: %w", room.RoomID, err)
		}
		_, err = s.rooms.Put(ctx, room.RoomID, payload)
		if err != nil {
			return fmt.Errorf("import: put room %q: %w", room.RoomID, err)
		}
	}

	// Members: delegated to putMember, one call per (room, member) pair.
	for roomID := range snap.Members {
		for _, member := range snap.Members[roomID] {
			err := s.putMember(ctx, roomID, member)
			if err != nil {
				return fmt.Errorf("import: %w", err)
			}
		}
	}

	// Sealed keys: raw bytes keyed by room, endpoint and epoch.
	for i := range snap.Keys {
		k := snap.Keys[i]
		_, err := s.keys.Put(ctx, sealedKey(k.RoomID, k.Endpoint, k.Epoch), k.Sealed)
		if err != nil {
			return fmt.Errorf("import: put key %q/%q@%d: %w", k.RoomID, k.Endpoint, k.Epoch, err)
		}
	}

	// Users: JSON value keyed by the normalized signing public key.
	for i := range snap.Users {
		user := snap.Users[i]
		payload, err := json.Marshal(user)
		if err != nil {
			return fmt.Errorf("import: marshal user %q: %w", user.SignPub, err)
		}
		_, err = s.users.Put(ctx, normalizeSignPub(user.SignPub), payload)
		if err != nil {
			return fmt.Errorf("import: put user %q: %w", user.SignPub, err)
		}
	}

	return nil
}
// ExportSnapshot rebuilds a Snapshot from the current contents of the KV
// buckets, which lets the migration's parity test compare the KV side against
// the SQLite source.
//
// Rooms and members are decoded from their JSON values; a value that fails to
// decode aborts the export. Entries in the keys bucket whose key is not of the
// form "<roomID>.<endpoint>.<epoch>" with a numeric epoch are skipped.
func (s *jetstreamStore) ExportSnapshot() (*Snapshot, error) {
	out := &Snapshot{Members: map[string][]Member{}}

	entries, err := s.watchAll(s.rooms)
	if err != nil {
		return nil, fmt.Errorf("export kv: rooms: %w", err)
	}
	for _, entry := range entries {
		var room RoomInfo
		if decodeErr := json.Unmarshal(entry.Value(), &room); decodeErr != nil {
			return nil, fmt.Errorf("export kv: unmarshal room: %w", decodeErr)
		}
		out.Rooms = append(out.Rooms, room)
	}

	entries, err = s.watchAll(s.members)
	if err != nil {
		return nil, fmt.Errorf("export kv: members: %w", err)
	}
	for _, entry := range entries {
		// Key is "<roomID>.<endpoint>"; neither segment contains a dot, so the
		// room ID is everything before the first dot.
		roomID, _, _ := strings.Cut(entry.Key(), ".")
		var member Member
		if decodeErr := json.Unmarshal(entry.Value(), &member); decodeErr != nil {
			return nil, fmt.Errorf("export kv: unmarshal member: %w", decodeErr)
		}
		out.Members[roomID] = append(out.Members[roomID], member)
	}

	entries, err = s.watchAll(s.keys)
	if err != nil {
		return nil, fmt.Errorf("export kv: keys: %w", err)
	}
	for _, entry := range entries {
		// Key is "<roomID>.<endpoint>.<epoch>".
		segs := strings.Split(entry.Key(), ".")
		if len(segs) != 3 {
			continue
		}
		epoch, convErr := strconv.Atoi(segs[2])
		if convErr != nil {
			continue
		}
		out.Keys = append(out.Keys, SealedKeyRecord{
			RoomID:   segs[0],
			Endpoint: segs[1],
			Epoch:    epoch,
			Sealed:   entry.Value(),
		})
	}

	users, err := s.ListUsers()
	if err != nil {
		return nil, fmt.Errorf("export kv: users: %w", err)
	}
	out.Users = users

	return out, nil
}
// watchAll collects every current entry of a bucket (no key filter, deletes
// ignored), draining the watcher up to the nil entry that marks the end of its
// initial snapshot.
//
// It returns an error if the watcher cannot be created, if the store context
// ends first, or if the updates channel is closed before the nil marker was
// seen. In all of those cases no partial result is returned.
func (s *jetstreamStore) watchAll(kv jetstream.KeyValue) ([]jetstream.KeyValueEntry, error) {
	ctx, cancel := s.ctx()
	defer cancel()

	w, err := kv.WatchAll(ctx, jetstream.IgnoreDeletes())
	if err != nil {
		return nil, err
	}
	defer w.Stop()

	var out []jetstream.KeyValueEntry
	for {
		select {
		case e, ok := <-w.Updates():
			if !ok {
				// A receive from a closed channel also yields nil. Without
				// this check a watcher that went away early would look exactly
				// like the end-of-snapshot marker and a truncated listing would
				// be reported as complete.
				return nil, fmt.Errorf("watch %q: updates channel closed before the initial snapshot completed", kv.Bucket())
			}
			if e == nil {
				return out, nil
			}
			out = append(out, e)
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}
+176
View File
@@ -0,0 +1,176 @@
package membership
// Migration from the local SQLite control plane to replicated JetStream KV
// (issue 0003c). It is the one-time, idempotent data move that decentralization
// needs: read the entire SQLite state, write it into the KV buckets. Re-running
// it is safe (every KV write is an overwrite), so a partial/interrupted run is
// recovered by running again, and a parity test can assert the two stores hold
// the same state before and after.
import (
"database/sql"
"fmt"
"strings"
"time"
"github.com/nats-io/nats.go/jetstream"
)
// SealedKeyRecord is one row of room_keys: the sealed room key for an endpoint
// at a given epoch. It is the unit the snapshot carries so a backend can be
// imported with the exact epoch history (CreateRoom/AddMember alone could not
// reproduce a multi-epoch room).
type SealedKeyRecord struct {
// RoomID is the room the key belongs to (room_keys.room_id).
RoomID string
// Endpoint is the endpoint the key is sealed for (room_keys.endpoint).
Endpoint string
// Epoch is the key epoch this row belongs to (room_keys.epoch).
Epoch int
// Sealed is the stored key blob (room_keys.sealed_key), carried verbatim.
Sealed []byte
}
// Snapshot is the complete control-plane state, backend-agnostic. It is what
// ExportSnapshot produces and importSnapshot consumes, so the SQLite->KV
// migration and the parity test both work in terms of it.
//
// Neither exporter guarantees a common ordering of the slices; sort them
// before comparing two snapshots from different backends.
type Snapshot struct {
// Rooms holds one entry per room, including its current epoch.
Rooms []RoomInfo
// Members groups each room's members under the room's ID.
Members map[string][]Member // roomID -> members
// Keys holds every sealed-key row, across all rooms, endpoints and epochs.
Keys []SealedKeyRecord
// Users is the user list as returned by the backend's ListUsers.
Users []User
}
// MigrateReport summarizes what a migration moved, for the operator log.
type MigrateReport struct {
// BackupPath is the pre-migration SQLite backup, if one was taken.
// MigrateSQLiteToKV leaves it empty; the caller that made the backup fills
// it in.
BackupPath string
// Rooms is the number of rooms in the migrated snapshot.
Rooms int
// Members is the number of members summed over all rooms.
Members int
// Keys is the number of sealed-key rows in the migrated snapshot.
Keys int
// Users is the number of users in the migrated snapshot.
Users int
}
// MigrateSQLiteToKV reads the SQLite store at sqlitePath and writes its entire
// state into the JetStream KV buckets on js (created with cfg.Replicas). It is
// idempotent: re-running converges to the same state. The caller is responsible
// for backing up the SQLite file first (BackupSQLite) — this function only
// reads it.
//
// On success it returns a report with the number of rooms, members, keys and
// users that were written; BackupPath is left empty for the caller to fill in.
// On failure it returns a nil report and an error prefixed with "migrate:".
func MigrateSQLiteToKV(sqlitePath string, js jetstream.JetStream, cfg JetStreamConfig) (*MigrateReport, error) {
	src, err := openSQLite(sqlitePath)
	if err != nil {
		return nil, fmt.Errorf("migrate: open sqlite %q: %w", sqlitePath, err)
	}
	defer src.Close()

	snap, err := src.ExportSnapshot()
	if err != nil {
		return nil, fmt.Errorf("migrate: export sqlite: %w", err)
	}

	dst, err := OpenJetStream(js, cfg)
	if err != nil {
		return nil, fmt.Errorf("migrate: open kv: %w", err)
	}
	// importSnapshot only exists on the concrete KV store. Check the assertion
	// so an unexpected implementation is reported as an error rather than
	// panicking inside library code.
	kv, ok := dst.(*jetstreamStore)
	if !ok {
		return nil, fmt.Errorf("migrate: open kv: unexpected store type %T", dst)
	}
	if err := kv.importSnapshot(snap); err != nil {
		return nil, fmt.Errorf("migrate: import to kv: %w", err)
	}

	members := 0
	for _, ms := range snap.Members {
		members += len(ms)
	}
	return &MigrateReport{
		Rooms:   len(snap.Rooms),
		Members: members,
		Keys:    len(snap.Keys),
		Users:   len(snap.Users),
	}, nil
}
// BackupSQLite makes a consistent copy of the SQLite database next to it,
// named "<path>.bak.<unixnano>", using SQLite's own VACUUM INTO (which writes a
// transactionally-consistent snapshot even with a live WAL). It returns the
// backup path. Always call this before MigrateSQLiteToKV so a botched migration
// can be undone.
//
// The source is opened with mode=rw, i.e. read-write but never create: a
// missing or mistyped path is reported as an error instead of silently
// creating an empty database and "backing up" nothing.
//
// On any failure the returned path is empty.
func BackupSQLite(path string) (string, error) {
	dst := fmt.Sprintf("%s.bak.%d", path, time.Now().UnixNano())

	db, err := sql.Open("sqlite", "file:"+path+"?mode=rw&_pragma=busy_timeout(5000)")
	if err != nil {
		return "", fmt.Errorf("backup: open %q: %w", path, err)
	}
	defer db.Close()

	// sql.Open is lazy; Ping forces the file to actually be opened so a bad
	// path fails here, before anything is written.
	if err := db.Ping(); err != nil {
		return "", fmt.Errorf("backup: ping %q: %w", path, err)
	}

	// VACUUM INTO writes a fresh, consistent database file; the literal path is
	// safely single-quoted (it is operator-supplied, never network input).
	if _, err := db.Exec("VACUUM INTO '" + strings.ReplaceAll(dst, "'", "''") + "'"); err != nil {
		return "", fmt.Errorf("backup: VACUUM INTO %q: %w", dst, err)
	}
	return dst, nil
}
// ---- SQLite export --------------------------------------------------------
// ExportSnapshot reads the entire SQLite control-plane state into a Snapshot:
// all rooms, all members grouped by room, every room_keys row, and the user
// list from ListUsers. Rows come back ordered by their primary identifiers.
//
// Any query, scan or iteration error aborts the export and is returned with an
// "export:" prefix; no partial Snapshot is returned.
//
// NOTE(review): the tables are read with separate queries rather than inside
// one transaction, so a snapshot taken while another writer is active may mix
// states — confirm the database is quiescent when this is called.
func (s *sqliteStore) ExportSnapshot() (*Snapshot, error) {
	snap := &Snapshot{Members: map[string][]Member{}}

	if err := s.exportRoomsInto(snap); err != nil {
		return nil, err
	}
	if err := s.exportMembersInto(snap); err != nil {
		return nil, err
	}
	if err := s.exportKeysInto(snap); err != nil {
		return nil, err
	}

	users, err := s.ListUsers()
	if err != nil {
		return nil, fmt.Errorf("export: list users: %w", err)
	}
	snap.Users = users
	return snap, nil
}

// exportRoomsInto appends every row of the rooms table to snap.Rooms.
func (s *sqliteStore) exportRoomsInto(snap *Snapshot) error {
	rows, err := s.db.Query(`SELECT room_id, subject, key_epoch, encrypt, persist, sign_msgs, owner_endpoint FROM rooms ORDER BY room_id`)
	if err != nil {
		return fmt.Errorf("export: query rooms: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var r RoomInfo
		// The flag columns are stored as integers; non-zero means true.
		var enc, per, sgn int
		if err := rows.Scan(&r.RoomID, &r.Subject, &r.Epoch, &enc, &per, &sgn, &r.OwnerEndpoint); err != nil {
			return fmt.Errorf("export: scan room: %w", err)
		}
		r.Encrypt, r.Persist, r.SignMsgs = enc != 0, per != 0, sgn != 0
		snap.Rooms = append(snap.Rooms, r)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("export: iterate rooms: %w", err)
	}
	return nil
}

// exportMembersInto appends every row of the members table to snap.Members,
// grouped under the row's room ID.
func (s *sqliteStore) exportMembersInto(snap *Snapshot) error {
	rows, err := s.db.Query(`SELECT room_id, endpoint, role, sign_pub, kex_pub FROM members ORDER BY room_id, endpoint`)
	if err != nil {
		return fmt.Errorf("export: query members: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var roomID string
		var m Member
		if err := rows.Scan(&roomID, &m.Endpoint, &m.Role, &m.SignPub, &m.KexPub); err != nil {
			return fmt.Errorf("export: scan member: %w", err)
		}
		snap.Members[roomID] = append(snap.Members[roomID], m)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("export: iterate members: %w", err)
	}
	return nil
}

// exportKeysInto appends every row of the room_keys table to snap.Keys.
func (s *sqliteStore) exportKeysInto(snap *Snapshot) error {
	rows, err := s.db.Query(`SELECT room_id, epoch, endpoint, sealed_key FROM room_keys ORDER BY room_id, endpoint, epoch`)
	if err != nil {
		return fmt.Errorf("export: query room_keys: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var rec SealedKeyRecord
		if err := rows.Scan(&rec.RoomID, &rec.Epoch, &rec.Endpoint, &rec.Sealed); err != nil {
			return fmt.Errorf("export: scan room_key: %w", err)
		}
		snap.Keys = append(snap.Keys, rec)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("export: iterate room_keys: %w", err)
	}
	return nil
}
+195
View File
@@ -0,0 +1,195 @@
package membership
import (
"path/filepath"
"reflect"
"sort"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// seedSQLite builds a fresh SQLite store in a per-test temp directory and
// fills it with a representative control plane:
//
//   - room.alpha: encrypted/persisted/signed, owner plus member ep-bob, then
//     bumped to epoch 2 with a new sealed key for the owner only;
//   - room.beta: all flags off, created with a nil sealed key;
//   - three users (alice as admin, bob and carol as members), carol revoked.
//
// It returns the still-open store together with the database file path; the
// caller closes the store. Any seeding error fails the test immediately.
func seedSQLite(t *testing.T) (*sqliteStore, string) {
	t.Helper()

	dbFile := filepath.Join(t.TempDir(), "seed.db")
	store, err := openSQLite(dbFile)
	if err != nil {
		t.Fatalf("openSQLite: %v", err)
	}

	// must aborts the test with "<step>: <error>" when a seeding step fails.
	must := func(step string, stepErr error) {
		t.Helper()
		if stepErr != nil {
			t.Fatalf("%s: %v", step, stepErr)
		}
	}

	alpha := RoomInfo{
		RoomID:        newULID(),
		Subject:       "room.alpha",
		Encrypt:       true,
		Persist:       true,
		SignMsgs:      true,
		OwnerEndpoint: "ep-owner1",
	}
	must("create r1", store.CreateRoom(alpha, []byte("o1-sign"), []byte("o1-kex"), []byte("o1-sealed-e1")))

	bob := Member{Endpoint: "ep-bob", Role: "member", SignPub: []byte("bob-sign"), KexPub: []byte("bob-kex")}
	must("add bob", store.AddMember(alpha.RoomID, bob, 1, []byte("bob-sealed-e1")))

	// Rekey r1 to epoch 2 (owner keeps a key at the new epoch).
	must("bump", store.BumpEpoch(alpha.RoomID, 2))
	must("put keys e2", store.PutSealedKeys(alpha.RoomID, 2, map[string][]byte{"ep-owner1": []byte("o1-sealed-e2")}))

	beta := RoomInfo{
		RoomID:        newULID(),
		Subject:       "room.beta",
		Encrypt:       false,
		Persist:       false,
		SignMsgs:      false,
		OwnerEndpoint: "ep-owner2",
	}
	must("create r2", store.CreateRoom(beta, []byte("o2-sign"), []byte("o2-kex"), nil))

	must("add alice", store.AddUser("aa11", "alice", RoleAdmin))
	must("add bob user", store.AddUser("bb22", "bob", RoleMember))
	must("add carol", store.AddUser("cc33", "carol", RoleMember))
	must("revoke carol", store.RevokeUser("cc33"))

	return store, dbFile
}
// normalizeSnapshot puts a Snapshot into canonical order, in place, so that
// snapshots exported by different backends compare equal no matter how each
// backend enumerated its data: rooms by RoomID, each room's members by
// Endpoint, keys by (RoomID, Endpoint, Epoch) and users by SignPub.
func normalizeSnapshot(snap *Snapshot) {
	rooms, keys, users := snap.Rooms, snap.Keys, snap.Users

	sort.Slice(rooms, func(a, b int) bool {
		return rooms[a].RoomID < rooms[b].RoomID
	})

	// Sorting the slice value reorders the shared backing array, so the map
	// entry does not need to be written back.
	for roomID := range snap.Members {
		members := snap.Members[roomID]
		sort.Slice(members, func(a, b int) bool {
			return members[a].Endpoint < members[b].Endpoint
		})
	}

	sort.Slice(keys, func(a, b int) bool {
		x, y := keys[a], keys[b]
		switch {
		case x.RoomID != y.RoomID:
			return x.RoomID < y.RoomID
		case x.Endpoint != y.Endpoint:
			return x.Endpoint < y.Endpoint
		default:
			return x.Epoch < y.Epoch
		}
	})

	sort.Slice(users, func(a, b int) bool {
		return users[a].SignPub < users[b].SignPub
	})
}
// newJS starts an embedded NATS server on 127.0.0.1 (store directory in a
// per-test temp dir, port from kvFreePort), connects a client to it and returns
// a JetStream handle on that connection. The connection and the server are torn
// down when the test finishes; if setup fails part-way, whatever was already
// started is shut down before the test is failed.
func newJS(t *testing.T) jetstream.JetStream {
	t.Helper()

	cfg := embeddednats.ServerConfig{
		StoreDir: t.TempDir(),
		Host:     "127.0.0.1",
		Port:     kvFreePort(t),
	}
	srv, err := embeddednats.StartServer(cfg)
	if err != nil {
		t.Fatalf("embedded nats: %v", err)
	}

	conn, err := nats.Connect(srv.ClientURL())
	if err != nil {
		srv.Shutdown()
		t.Fatalf("nats connect: %v", err)
	}

	stream, err := jetstream.New(conn)
	if err != nil {
		conn.Close()
		srv.Shutdown()
		t.Fatalf("jetstream: %v", err)
	}

	// Close the client first, then stop the server and wait for it to exit.
	t.Cleanup(func() {
		conn.Close()
		srv.Shutdown()
		srv.WaitForShutdown()
	})
	return stream
}
// TestMigrateSQLiteToKVParity is the parity test the issue mandates: it seeds a
// SQLite store, migrates it, and requires that the snapshot exported from KV
// equals the snapshot exported from the SQLite source (after both are put into
// canonical order). It also checks the room and user counts in the report.
func TestMigrateSQLiteToKVParity(t *testing.T) {
	store, dbFile := seedSQLite(t)
	want, err := store.ExportSnapshot()
	if err != nil {
		t.Fatalf("export sqlite: %v", err)
	}
	store.Close() // release the file before the migration reopens it

	js := newJS(t)
	cfg := JetStreamConfig{Replicas: 1, OpTimeout: 5 * time.Second}

	report, err := MigrateSQLiteToKV(dbFile, js, cfg)
	if err != nil {
		t.Fatalf("migrate: %v", err)
	}
	if roomsOK, usersOK := report.Rooms == 2, report.Users == 3; !roomsOK || !usersOK {
		t.Fatalf("report mismatch: %+v", report)
	}

	opened, err := OpenJetStream(js, cfg)
	if err != nil {
		t.Fatalf("open kv: %v", err)
	}
	got, err := opened.(*jetstreamStore).ExportSnapshot()
	if err != nil {
		t.Fatalf("export kv: %v", err)
	}

	normalizeSnapshot(want)
	normalizeSnapshot(got)
	if equal := reflect.DeepEqual(want, got); !equal {
		t.Fatalf("parity mismatch after migration:\n sqlite=%+v\n kv= %+v", want, got)
	}
}
// TestMigrateSQLiteToKVIdempotent: running the migration twice converges to the
// same KV state (every write is an overwrite). A second run must not duplicate
// or corrupt anything.
//
// The KV state is exported after each run. The two exports must be equal to
// each other (the second run changed nothing) and equal to the SQLite source
// (the converged state is the right one).
func TestMigrateSQLiteToKVIdempotent(t *testing.T) {
	src, path := seedSQLite(t)
	srcSnap, err := src.ExportSnapshot()
	if err != nil {
		t.Fatalf("export sqlite: %v", err)
	}
	src.Close()

	js := newJS(t)
	cfg := JetStreamConfig{Replicas: 1}

	// exportKV opens the KV store and returns its normalized snapshot, failing
	// the test (instead of panicking on a nil value) if any step goes wrong.
	exportKV := func(stage string) *Snapshot {
		t.Helper()
		kv, err := OpenJetStream(js, cfg)
		if err != nil {
			t.Fatalf("%s: open kv: %v", stage, err)
		}
		store, ok := kv.(*jetstreamStore)
		if !ok {
			t.Fatalf("%s: OpenJetStream returned %T, want *jetstreamStore", stage, kv)
		}
		snap, err := store.ExportSnapshot()
		if err != nil {
			t.Fatalf("%s: export kv: %v", stage, err)
		}
		normalizeSnapshot(snap)
		return snap
	}

	if _, err := MigrateSQLiteToKV(path, js, cfg); err != nil {
		t.Fatalf("migrate run 1: %v", err)
	}
	afterFirst := exportKV("after run 1")

	if _, err := MigrateSQLiteToKV(path, js, cfg); err != nil {
		t.Fatalf("migrate run 2: %v", err)
	}
	afterSecond := exportKV("after run 2")

	if !reflect.DeepEqual(afterFirst, afterSecond) {
		t.Fatalf("idempotency broken: a second migration changed the KV state\n run1=%+v\n run2=%+v", afterFirst, afterSecond)
	}

	normalizeSnapshot(srcSnap)
	if !reflect.DeepEqual(srcSnap, afterSecond) {
		t.Fatalf("kv state after two runs differs from the sqlite source\n sqlite=%+v\n kv= %+v", srcSnap, afterSecond)
	}
}
// TestBackupSQLiteCreatesConsistentCopy verifies the pre-migration backup is a
// real, openable copy holding the same data: the backup must be a different
// file from the source, must reopen as a SQLite store, and must export the same
// snapshot as the source did.
func TestBackupSQLiteCreatesConsistentCopy(t *testing.T) {
	src, path := seedSQLite(t)
	srcSnap, err := src.ExportSnapshot()
	if err != nil {
		t.Fatalf("export source: %v", err)
	}
	src.Close()

	bak, err := BackupSQLite(path)
	if err != nil {
		t.Fatalf("backup: %v", err)
	}
	if bak == path {
		t.Fatalf("backup path %q is the source database itself", bak)
	}

	restored, err := openSQLite(bak)
	if err != nil {
		t.Fatalf("open backup: %v", err)
	}
	defer restored.Close()

	bakSnap, err := restored.ExportSnapshot()
	if err != nil {
		t.Fatalf("export backup: %v", err)
	}

	normalizeSnapshot(srcSnap)
	normalizeSnapshot(bakSnap)
	if !reflect.DeepEqual(srcSnap, bakSnap) {
		t.Fatalf("backup is not a faithful copy")
	}
}