Files

290 lines
9.2 KiB
Go

// Package membership implements the authoritative control plane of unibus:
// room metadata, member directory, and per-epoch sealed room keys.
//
// The data plane (actual messages) is NATS; this package owns the SQLite-backed
// state that NATS cannot: who is in a room, what their public keys are, and the
// encrypted room key K each member needs to participate at a given epoch.
//
// Migrations are embedded and applied idempotently on Open. The embedded copy
// under pkg/membership/migrations mirrors the module-root migrations/ directory
// (kept in sync); both are additive-only per .claude/rules/db_migrations.md.
package membership
import (
"database/sql"
"embed"
"fmt"
"io/fs"
"sort"
"strings"
"time"
// modernc.org/sqlite registers the pure-Go "sqlite" driver (no CGO).
_ "modernc.org/sqlite"
)
// migrationsFS holds the SQL migration files matched by migrations/*.sql,
// embedded into the binary at build time. applyMigrations globs and reads the
// files from here, so no migration files are read from disk at runtime.
//
//go:embed migrations/*.sql
var migrationsFS embed.FS
// Member is a participant of a room with their published public keys.
// The struct tags fix the JSON field names used when a Member is encoded or
// decoded.
type Member struct {
// Endpoint identifies the member within a room (members.endpoint column).
Endpoint string `json:"endpoint"`
// Role is the member's role in the room; CreateRoom writes "owner" for the
// room creator, AddMember writes whatever the caller supplies.
Role string `json:"role"`
// SignPub is the member's published signing public key, stored as raw bytes.
SignPub []byte `json:"sign_pub"`
// KexPub is the member's published key-exchange public key, stored as raw
// bytes. NOTE(review): presumably the key the room key is sealed to — confirm
// against the crypto layer.
KexPub []byte `json:"kex_pub"`
}
// RoomInfo is the metadata of a room. GetRoom fills it from a row of the rooms
// table; CreateRoom writes it (except Epoch) into a new row.
type RoomInfo struct {
// RoomID is the room's identifier; rooms are looked up by it.
RoomID string
// Subject is stored in rooms.subject. NOTE(review): presumably the NATS
// subject carrying the room's messages — confirm against the data-plane code.
Subject string
// Epoch is the room's current key epoch (rooms.key_epoch). CreateRoom ignores
// this field and always writes 1.
Epoch int
// Encrypt is persisted as 0/1. When set, CreateRoom also stores the owner's
// sealed room key for epoch 1.
Encrypt bool
// Persist is persisted as 0/1; its effect is defined outside this file.
Persist bool
// SignMsgs is persisted as 0/1. NOTE(review): presumably whether messages
// must be signed — verify against the consumer of this flag.
SignMsgs bool
// OwnerEndpoint is the endpoint of the room owner, which CreateRoom also
// registers as a member with role "owner".
OwnerEndpoint string
}
// Store is the SQLite-backed membership/key store. It wraps a single *sql.DB:
// obtain one with Open and release it with Close.
type Store struct {
// db is the database handle every Store method runs its statements on.
db *sql.DB
}
// Open returns a Store backed by the SQLite database file at path, creating
// the file if needed. The connection is configured with a 5000 ms busy_timeout
// and WAL journaling, and every embedded migration is applied before the Store
// is handed back.
//
// If the ping or the migrations fail, the database handle is closed and a nil
// Store is returned together with the error.
func Open(path string) (*Store, error) {
	// busy_timeout avoids spurious "database is locked" errors under concurrent
	// HTTP handlers. foreign_keys is left off: referential integrity is managed
	// in code.
	const pragmas = "_pragma=busy_timeout(5000)&_pragma=journal_mode(WAL)"

	handle, err := sql.Open("sqlite", "file:"+path+"?"+pragmas)
	if err != nil {
		return nil, fmt.Errorf("membership: open db: %w", err)
	}

	// sql.Open may defer connecting; Ping forces a real connection now.
	if pingErr := handle.Ping(); pingErr != nil {
		handle.Close()
		return nil, fmt.Errorf("membership: ping db: %w", pingErr)
	}

	store := &Store{db: handle}
	if migErr := store.applyMigrations(); migErr != nil {
		handle.Close()
		return nil, migErr
	}
	return store, nil
}
// Close releases the underlying database handle and reports any error the
// handle returns while closing.
func (s *Store) Close() error {
	return s.db.Close()
}
// applyMigrations executes every embedded migrations/*.sql file in lexical
// order of its path. SQLite DDL such as ADD COLUMN is not idempotent, so
// errors whose text contains "duplicate column" or "already exists" are taken
// to mean the migration was applied on an earlier Open and are skipped; any
// other error aborts and is returned wrapped with the file name.
//
// NOTE(review): each file is handed to a single Exec call. Whether statements
// after a tolerated failure in the same file still run depends on the driver —
// confirm before putting several non-idempotent statements in one file.
func (s *Store) applyMigrations() error {
	paths, err := fs.Glob(migrationsFS, "migrations/*.sql")
	if err != nil {
		return fmt.Errorf("membership: glob migrations: %w", err)
	}
	sort.Strings(paths)

	for i := range paths {
		name := paths[i]

		script, readErr := migrationsFS.ReadFile(name)
		if readErr != nil {
			return fmt.Errorf("membership: read %s: %w", name, readErr)
		}

		_, execErr := s.db.Exec(string(script))
		if execErr == nil {
			continue
		}

		// Matching on error text is the only signal available here for
		// "this schema change already exists".
		text := execErr.Error()
		if strings.Contains(text, "duplicate column") || strings.Contains(text, "already exists") {
			continue
		}
		return fmt.Errorf("membership: apply %s: %w", name, execErr)
	}
	return nil
}
// nowRFC3339 returns the current time, converted to UTC and formatted with the
// time.RFC3339Nano layout. It supplies the timestamp columns written by this
// package.
func nowRFC3339() string {
	utcNow := time.Now().UTC()
	return utcNow.Format(time.RFC3339Nano)
}
// CreateRoom inserts a room at epoch 1, registers the owner as a member with
// role "owner" and, when info.Encrypt is set, stores ownerSealedKey as the
// owner's sealed key for epoch 1. info.Epoch is ignored: a new room always
// starts at epoch 1.
//
// All inserts run in one transaction, so a failure at any step leaves nothing
// behind. Idempotent inserts are not used: a duplicate room_id returns an
// error. Every error, including a failed commit, is wrapped with a
// "membership: ..." prefix.
func (s *Store) CreateRoom(info RoomInfo, ownerSignPub, ownerKexPub, ownerSealedKey []byte) error {
	const (
		insertRoom = "INSERT INTO rooms (room_id, subject, key_epoch, encrypt, persist, sign_msgs, owner_endpoint, created_at)\n" +
			"VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
		insertOwner = "INSERT INTO members (room_id, endpoint, role, joined_at, sign_pub, kex_pub)\n" +
			"VALUES (?, ?, 'owner', ?, ?, ?)"
		insertOwnerKey = `INSERT INTO room_keys (room_id, epoch, endpoint, sealed_key) VALUES (?, 1, ?, ?)`
	)

	tx, err := s.db.Begin()
	if err != nil {
		return fmt.Errorf("membership: begin: %w", err)
	}
	// Undoes the inserts on every early return. After a successful Commit the
	// transaction is already finished, so the ignored error is harmless.
	defer tx.Rollback()

	// One timestamp is shared by the room's created_at and the owner's joined_at.
	now := nowRFC3339()

	if _, err := tx.Exec(
		insertRoom,
		info.RoomID, info.Subject, 1,
		b2i(info.Encrypt), b2i(info.Persist), b2i(info.SignMsgs),
		info.OwnerEndpoint, now,
	); err != nil {
		return fmt.Errorf("membership: insert room: %w", err)
	}

	if _, err := tx.Exec(
		insertOwner,
		info.RoomID, info.OwnerEndpoint, now, ownerSignPub, ownerKexPub,
	); err != nil {
		return fmt.Errorf("membership: insert owner member: %w", err)
	}

	// Unencrypted rooms have no room key, so nothing is stored for them.
	if info.Encrypt {
		if _, err := tx.Exec(
			insertOwnerKey,
			info.RoomID, info.OwnerEndpoint, ownerSealedKey,
		); err != nil {
			return fmt.Errorf("membership: insert owner key: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("membership: commit: %w", err)
	}
	return nil
}
// GetRoom loads the metadata of the room identified by roomID, including its
// current key epoch. The encrypt/persist/sign_msgs columns are stored as
// integers and mapped to booleans (non-zero means true).
//
// On failure a zero RoomInfo is returned with the error wrapped; a missing
// room surfaces as the wrapped error from Row.Scan (sql.ErrNoRows).
func (s *Store) GetRoom(roomID string) (RoomInfo, error) {
	const query = "SELECT room_id, subject, key_epoch, encrypt, persist, sign_msgs, owner_endpoint\n" +
		"FROM rooms WHERE room_id = ?"

	var (
		room                               RoomInfo
		encryptFlag, persistFlag, signFlag int
	)

	row := s.db.QueryRow(query, roomID)
	if err := row.Scan(
		&room.RoomID, &room.Subject, &room.Epoch,
		&encryptFlag, &persistFlag, &signFlag,
		&room.OwnerEndpoint,
	); err != nil {
		return RoomInfo{}, fmt.Errorf("membership: get room %q: %w", roomID, err)
	}

	room.Encrypt = encryptFlag != 0
	room.Persist = persistFlag != 0
	room.SignMsgs = signFlag != 0
	return room, nil
}
// AddMember inserts m as a member of roomID with the role given in m.Role and,
// when sealedKey is non-empty, stores it as that member's sealed room key for
// the supplied epoch. An empty sealedKey skips the key insert, and epoch is
// then unused.
//
// Both inserts run in one transaction, so a failed key insert also undoes the
// member insert. Every error, including a failed commit, is wrapped with a
// "membership: ..." prefix.
func (s *Store) AddMember(roomID string, m Member, epoch int, sealedKey []byte) error {
	const (
		insertMember = "INSERT INTO members (room_id, endpoint, role, joined_at, sign_pub, kex_pub)\n" +
			"VALUES (?, ?, ?, ?, ?, ?)"
		insertKey = `INSERT INTO room_keys (room_id, epoch, endpoint, sealed_key) VALUES (?, ?, ?, ?)`
	)

	tx, err := s.db.Begin()
	if err != nil {
		return fmt.Errorf("membership: begin: %w", err)
	}
	// Undoes the inserts on every early return. After a successful Commit the
	// transaction is already finished, so the ignored error is harmless.
	defer tx.Rollback()

	if _, err := tx.Exec(
		insertMember,
		roomID, m.Endpoint, m.Role, nowRFC3339(), m.SignPub, m.KexPub,
	); err != nil {
		return fmt.Errorf("membership: insert member: %w", err)
	}

	if len(sealedKey) > 0 {
		if _, err := tx.Exec(
			insertKey,
			roomID, epoch, m.Endpoint, sealedKey,
		); err != nil {
			return fmt.Errorf("membership: insert member key: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("membership: commit: %w", err)
	}
	return nil
}
// GetMember looks up the member identified by endpoint inside roomID and
// returns its endpoint, role and public keys.
//
// On failure a zero Member is returned with the error wrapped; an unknown
// room/endpoint pair surfaces as the wrapped error from Row.Scan
// (sql.ErrNoRows).
func (s *Store) GetMember(roomID, endpoint string) (Member, error) {
	const query = `SELECT endpoint, role, sign_pub, kex_pub FROM members WHERE room_id = ? AND endpoint = ?`

	var found Member
	row := s.db.QueryRow(query, roomID, endpoint)
	if err := row.Scan(&found.Endpoint, &found.Role, &found.SignPub, &found.KexPub); err != nil {
		return Member{}, fmt.Errorf("membership: get member %q/%q: %w", roomID, endpoint, err)
	}
	return found, nil
}
// ListMembers returns all members of roomID ordered by endpoint. A room with
// no members (or an unknown room) yields a nil slice and a nil error.
//
// On any failure — running the query, scanning a row, or an error raised
// while iterating — the result is nil and the error is wrapped with a
// "membership: ..." prefix; partial results are never returned.
func (s *Store) ListMembers(roomID string) ([]Member, error) {
	rows, err := s.db.Query(
		`SELECT endpoint, role, sign_pub, kex_pub FROM members WHERE room_id = ? ORDER BY endpoint`,
		roomID,
	)
	if err != nil {
		return nil, fmt.Errorf("membership: list members %q: %w", roomID, err)
	}
	defer rows.Close()

	var members []Member
	for rows.Next() {
		var m Member
		if err := rows.Scan(&m.Endpoint, &m.Role, &m.SignPub, &m.KexPub); err != nil {
			return nil, fmt.Errorf("membership: scan member: %w", err)
		}
		members = append(members, m)
	}
	// rows.Next returning false can mean "done" or "failed"; rows.Err tells
	// the two apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("membership: list members %q: %w", roomID, err)
	}
	return members, nil
}
// GetSealedKey fetches the sealed room key stored for endpoint in roomID.
// With epoch > 0 the key for exactly that epoch is returned; with epoch <= 0
// the row with the highest epoch for that endpoint is returned instead.
//
// It returns the epoch of the row found and the sealed key bytes. On failure
// it returns (0, nil, err) with the error wrapped; a missing row surfaces as
// the wrapped error from Row.Scan (sql.ErrNoRows).
func (s *Store) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) {
	const (
		latestQuery = "SELECT epoch, sealed_key FROM room_keys\n" +
			"WHERE room_id = ? AND endpoint = ? ORDER BY epoch DESC LIMIT 1"
		exactQuery = "SELECT epoch, sealed_key FROM room_keys\n" +
			"WHERE room_id = ? AND endpoint = ? AND epoch = ?"
	)

	// Pick the statement and its arguments first, then run a single query.
	query, args := exactQuery, []interface{}{roomID, endpoint, epoch}
	if epoch <= 0 {
		query, args = latestQuery, []interface{}{roomID, endpoint}
	}

	var (
		foundEpoch int
		sealedKey  []byte
	)
	if err := s.db.QueryRow(query, args...).Scan(&foundEpoch, &sealedKey); err != nil {
		return 0, nil, fmt.Errorf("membership: get sealed key %q/%q@%d: %w", roomID, endpoint, epoch, err)
	}
	return foundEpoch, sealedKey, nil
}
// PutSealedKeys stores a batch of sealed keys for roomID at the given epoch.
// keys maps endpoint -> sealed key bytes. Each entry is upserted on
// (room_id, epoch, endpoint), so a rekey can overwrite stale entries.
//
// The whole batch runs in one transaction: if any entry fails, none of the
// batch is stored. Every error, including a failed commit, is wrapped with a
// "membership: ..." prefix.
func (s *Store) PutSealedKeys(roomID string, epoch int, keys map[string][]byte) error {
	const upsertKey = "INSERT INTO room_keys (room_id, epoch, endpoint, sealed_key) VALUES (?, ?, ?, ?)\n" +
		"ON CONFLICT(room_id, epoch, endpoint) DO UPDATE SET sealed_key = excluded.sealed_key"

	tx, err := s.db.Begin()
	if err != nil {
		return fmt.Errorf("membership: begin: %w", err)
	}
	// Undoes the batch on every early return. After a successful Commit the
	// transaction is already finished, so the ignored error is harmless.
	defer tx.Rollback()

	for endpoint, sealed := range keys {
		if _, err := tx.Exec(upsertKey, roomID, epoch, endpoint, sealed); err != nil {
			return fmt.Errorf("membership: put sealed key for %q: %w", endpoint, err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("membership: commit: %w", err)
	}
	return nil
}
// BumpEpoch sets the current key_epoch of roomID to newEpoch. The value is
// written as given; no check is made that it is larger than the stored epoch.
//
// If no room with that ID exists the update touches no rows; this is reported
// as an error wrapping sql.ErrNoRows (the same sentinel a failed GetRoom
// lookup wraps) instead of silently succeeding. Execution errors are wrapped
// with the same "membership: bump epoch ..." prefix.
func (s *Store) BumpEpoch(roomID string, newEpoch int) error {
	res, err := s.db.Exec(`UPDATE rooms SET key_epoch = ? WHERE room_id = ?`, newEpoch, roomID)
	if err != nil {
		return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, err)
	}

	// If the driver cannot report the affected-row count, the UPDATE itself
	// still succeeded, so only a definite count of zero is treated as failure.
	if affected, countErr := res.RowsAffected(); countErr == nil && affected == 0 {
		return fmt.Errorf("membership: bump epoch %q->%d: %w", roomID, newEpoch, sql.ErrNoRows)
	}
	return nil
}
// RemoveMember deletes the members row for endpoint in roomID. Only the
// members table is touched: sealed keys already stored for that endpoint at
// past epochs are left intact (they encrypt only data that member could
// already read). Removing an endpoint that is not a member is not reported as
// an error.
func (s *Store) RemoveMember(roomID, endpoint string) error {
	const stmt = `DELETE FROM members WHERE room_id = ? AND endpoint = ?`

	_, err := s.db.Exec(stmt, roomID, endpoint)
	if err == nil {
		return nil
	}
	return fmt.Errorf("membership: remove member %q/%q: %w", roomID, endpoint, err)
}
// b2i converts a bool to the integer form stored in the rooms table's flag
// columns: 1 for true, 0 for false.
func b2i(b bool) int {
	var n int
	if b {
		n = 1
	}
	return n
}