diff --git a/fn_operations/db.go b/fn_operations/db.go new file mode 100644 index 00000000..0ce57f2a --- /dev/null +++ b/fn_operations/db.go @@ -0,0 +1,140 @@ +package fn_operations + +import ( + "database/sql" + "fmt" + "os" + "path/filepath" + + _ "github.com/mattn/go-sqlite3" +) + +const schemaSQL = ` +CREATE TABLE IF NOT EXISTS types_snapshot ( + id TEXT PRIMARY KEY, + version TEXT NOT NULL DEFAULT '1.0.0', + lang TEXT NOT NULL, + algebraic TEXT NOT NULL CHECK(algebraic IN ('product','sum')), + definition TEXT NOT NULL DEFAULT '', + description TEXT NOT NULL DEFAULT '', + snapped_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS entities ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + type_ref TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active','stale','corrupted','archived')), + description TEXT NOT NULL DEFAULT '', + domain TEXT NOT NULL DEFAULT '', + tags TEXT NOT NULL DEFAULT '[]', + source TEXT NOT NULL, + metadata TEXT NOT NULL DEFAULT '{}', + notes TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS relations ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + from_entity TEXT NOT NULL DEFAULT '', + to_entity TEXT NOT NULL, + via TEXT NOT NULL DEFAULT '', + description TEXT NOT NULL DEFAULT '', + purity TEXT NOT NULL DEFAULT '' CHECK(purity IN ('','pure','impure')), + direction TEXT NOT NULL DEFAULT 'unidirectional' CHECK(direction IN ('unidirectional','bidirectional','inverse')), + weight REAL, + status TEXT NOT NULL DEFAULT 'designed' CHECK(status IN ('designed','implemented','tested','running','deprecated')), + started_at TEXT, + ended_at TEXT, + "order" INTEGER, + tags TEXT NOT NULL DEFAULT '[]', + notes TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS relation_inputs ( + id TEXT PRIMARY KEY, + relation_id TEXT NOT NULL REFERENCES relations(id) ON DELETE CASCADE, + entity_id TEXT NOT NULL REFERENCES entities(id), + role TEXT NOT NULL, + "order" INTEGER +); + +CREATE VIRTUAL TABLE IF NOT EXISTS entities_fts USING fts5( + id, + name, + description, + tags, + domain, + content='entities', + content_rowid='rowid' +); + +-- Triggers to keep entities FTS in sync +CREATE TRIGGER IF NOT EXISTS entities_ai AFTER INSERT ON entities BEGIN + INSERT INTO entities_fts(rowid, id, name, description, tags, domain) + VALUES (new.rowid, new.id, new.name, new.description, new.tags, new.domain); +END; + +CREATE TRIGGER IF NOT EXISTS entities_ad AFTER DELETE ON entities BEGIN + INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags, domain) + VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags, old.domain); +END; + +CREATE TRIGGER IF NOT EXISTS entities_au AFTER UPDATE ON entities BEGIN + INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags, domain) + VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags, old.domain); + INSERT INTO entities_fts(rowid, id, name, description, tags, domain) + VALUES (new.rowid, new.id, new.name, new.description, new.tags, new.domain); +END; +` + +// DB wraps a SQLite connection for an operations database. +type DB struct { + conn *sql.DB + path string +} + +// Open opens or creates an operations database at the given path. +func Open(path string) (*DB, error) { + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("creating db directory: %w", err) + } + + conn, err := sql.Open("sqlite3", path+"?_foreign_keys=on") + if err != nil { + return nil, fmt.Errorf("opening database: %w", err) + } + + if _, err := conn.Exec("PRAGMA journal_mode=WAL"); err != nil { + conn.Close() + return nil, fmt.Errorf("setting WAL mode: %w", err) + } + + if _, err := conn.Exec(schemaSQL); err != nil { + conn.Close() + return nil, fmt.Errorf("applying schema: %w", err) + } + + return &DB{conn: conn, path: path}, nil +} + +// Conn returns the underlying sql.DB for transaction use. +func (db *DB) Conn() *sql.DB { + return db.conn +} + +// Close closes the database connection. +func (db *DB) Close() error { + return db.conn.Close() +} + +// Drop removes the database file. +func (db *DB) Drop() error { + db.Close() + return os.Remove(db.path) +} diff --git a/fn_operations/models.go b/fn_operations/models.go new file mode 100644 index 00000000..2eefeab3 --- /dev/null +++ b/fn_operations/models.go @@ -0,0 +1,90 @@ +package fn_operations + +import "time" + +// EntityStatus represents the lifecycle state of an entity. +type EntityStatus string + +const ( + StatusActive EntityStatus = "active" + StatusStale EntityStatus = "stale" + StatusCorrupted EntityStatus = "corrupted" + StatusArchived EntityStatus = "archived" +) + +// RelationStatus represents the lifecycle state of a relation. +type RelationStatus string + +const ( + RelDesigned RelationStatus = "designed" + RelImplemented RelationStatus = "implemented" + RelTested RelationStatus = "tested" + RelRunning RelationStatus = "running" + RelDeprecated RelationStatus = "deprecated" +) + +// Direction represents the directionality of a relation. +type Direction string + +const ( + DirUnidirectional Direction = "unidirectional" + DirBidirectional Direction = "bidirectional" + DirInverse Direction = "inverse" +) + +// Entity is a concrete instance of a registry type within a project context. +type Entity struct { + ID string `json:"id"` + Name string `json:"name"` + TypeRef string `json:"type_ref"` + Status EntityStatus `json:"status"` + Description string `json:"description"` + Domain string `json:"domain"` + Tags []string `json:"tags"` + Source string `json:"source"` + Metadata map[string]any `json:"metadata"` + Notes string `json:"notes"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// Relation describes how one entity connects to or transforms into another. +type Relation struct { + ID string `json:"id"` + Name string `json:"name"` + FromEntity string `json:"from_entity"` + ToEntity string `json:"to_entity"` + Via string `json:"via"` + Description string `json:"description"` + Purity string `json:"purity"` + Direction Direction `json:"direction"` + Weight *float64 `json:"weight"` + Status RelationStatus `json:"status"` + StartedAt *time.Time `json:"started_at"` + EndedAt *time.Time `json:"ended_at"` + Order *int `json:"order"` + Tags []string `json:"tags"` + Notes string `json:"notes"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// RelationInput represents one input entity in a multi-input relation. +type RelationInput struct { + ID string `json:"id"` + RelationID string `json:"relation_id"` + EntityID string `json:"entity_id"` + Role string `json:"role"` + Order *int `json:"order"` +} + +// TypeSnapshot is an immutable copy of a registry type at point of use. +type TypeSnapshot struct { + ID string `json:"id"` + Version string `json:"version"` + Lang string `json:"lang"` + Algebraic string `json:"algebraic"` + Definition string `json:"definition"` + Description string `json:"description"` + SnappedAt time.Time `json:"snapped_at"` +} diff --git a/fn_operations/operations.go b/fn_operations/operations.go new file mode 100644 index 00000000..d52fc9a5 --- /dev/null +++ b/fn_operations/operations.go @@ -0,0 +1,202 @@ +package fn_operations + +import ( + "fmt" + "time" + + "fn-registry/registry" +) + +// InsertEntityWithSnapshot inserts an entity, snapshotting its type from the registry if needed. +// registryDB can be nil if the type is already snapshotted. +func InsertEntityWithSnapshot(opsDB *DB, registryDB *registry.DB, e *Entity) error { + if err := ValidateEntity(e); err != nil { + return err + } + + // Check if type is already snapshotted + snap, err := opsDB.GetTypeSnapshot(e.TypeRef) + if err != nil { + return fmt.Errorf("checking type snapshot: %w", err) + } + + if snap == nil { + // Need to fetch from registry + if registryDB == nil { + return fmt.Errorf("type %q not found in local snapshots and no registry provided", e.TypeRef) + } + if err := SnapshotType(opsDB, registryDB, e.TypeRef); err != nil { + return err + } + } + + return opsDB.InsertEntity(e) +} + +// SnapshotType fetches a type from the registry and copies it to types_snapshot. +func SnapshotType(opsDB *DB, registryDB *registry.DB, typeID string) error { + t, err := registryDB.GetType(typeID) + if err != nil { + return fmt.Errorf("fetching type %q from registry: %w", typeID, err) + } + + snap := &TypeSnapshot{ + ID: t.ID, + Version: t.Version, + Lang: t.Lang, + Algebraic: string(t.Algebraic), + Definition: t.Definition, + Description: t.Description, + SnappedAt: time.Now().UTC(), + } + + return opsDB.InsertTypeSnapshot(snap) +} + +// InsertRelationSafe validates, checks for cycles, and inserts a relation. +func InsertRelationSafe(db *DB, r *Relation) error { + entities, err := buildEntitySet(db) + if err != nil { + return err + } + + if err := ValidateRelation(r, entities); err != nil { + return err + } + + // from_entity is required when not using relation_inputs + if r.FromEntity == "" { + return fmt.Errorf("from_entity is required (use InsertRelationWithInputs for multi-input relations)") + } + + // Cycle detection only for causal relations + if r.Via != "" { + if err := DetectCycle(db, r.FromEntity, r.ToEntity); err != nil { + return err + } + } + + return db.InsertRelation(r) +} + +// InsertRelationWithInputs validates and inserts a relation with multiple inputs in a transaction. +func InsertRelationWithInputs(db *DB, r *Relation, inputs []RelationInput) error { + entities, err := buildEntitySet(db) + if err != nil { + return err + } + + if err := ValidateRelation(r, entities); err != nil { + return err + } + if err := ValidateRelationInputs(inputs, entities); err != nil { + return err + } + + // Cycle detection for each input if causal + if r.Via != "" { + for _, input := range inputs { + if err := DetectCycle(db, input.EntityID, r.ToEntity); err != nil { + return err + } + } + } + + tx, err := db.Conn().Begin() + if err != nil { + return fmt.Errorf("beginning transaction: %w", err) + } + defer tx.Rollback() + + // Insert relation + now := time.Now().UTC() + if r.CreatedAt.IsZero() { + r.CreatedAt = now + } + r.UpdatedAt = now + + var startedAt, endedAt *string + if r.StartedAt != nil { + s := r.StartedAt.Format(time.RFC3339) + startedAt = &s + } + if r.EndedAt != nil { + s := r.EndedAt.Format(time.RFC3339) + endedAt = &s + } + + _, err = tx.Exec(` + INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + r.ID, r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description, + r.Purity, string(r.Direction), r.Weight, string(r.Status), + startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes, + r.CreatedAt.Format(time.RFC3339), r.UpdatedAt.Format(time.RFC3339), + ) + if err != nil { + return fmt.Errorf("inserting relation: %w", err) + } + + // Insert inputs + for _, ri := range inputs { + _, err = tx.Exec(` + INSERT INTO relation_inputs (id, relation_id, entity_id, role, "order") + VALUES (?, ?, ?, ?, ?)`, + ri.ID, ri.RelationID, ri.EntityID, ri.Role, ri.Order, + ) + if err != nil { + return fmt.Errorf("inserting relation_input: %w", err) + } + } + + return tx.Commit() +} + +// Graph holds the full entity-relation graph for a project. +type Graph struct { + Entities []Entity + Relations []Relation + Inputs map[string][]RelationInput +} + +// GetEntityGraph returns all entities and relations for visualization. +func GetEntityGraph(db *DB) (*Graph, error) { + entities, err := db.ListEntities("", "") + if err != nil { + return nil, fmt.Errorf("listing entities: %w", err) + } + + relations, err := db.ListRelations("") + if err != nil { + return nil, fmt.Errorf("listing relations: %w", err) + } + + inputs := map[string][]RelationInput{} + for _, r := range relations { + ri, err := db.GetRelationInputs(r.ID) + if err != nil { + return nil, fmt.Errorf("getting inputs for relation %s: %w", r.ID, err) + } + if len(ri) > 0 { + inputs[r.ID] = ri + } + } + + return &Graph{ + Entities: entities, + Relations: relations, + Inputs: inputs, + }, nil +} + +func buildEntitySet(db *DB) (map[string]bool, error) { + all, err := db.ListEntities("", "") + if err != nil { + return nil, fmt.Errorf("building entity set: %w", err) + } + set := make(map[string]bool, len(all)) + for _, e := range all { + set[e.ID] = true + } + return set, nil +} diff --git a/fn_operations/operations_test.go b/fn_operations/operations_test.go new file mode 100644 index 00000000..23b81ec7 --- /dev/null +++ b/fn_operations/operations_test.go @@ -0,0 +1,375 @@ +package fn_operations + +import ( + "os" + "path/filepath" + "testing" +) + +func tempDB(t *testing.T) *DB { + t.Helper() + path := filepath.Join(t.TempDir(), "test.db") + db, err := Open(path) + if err != nil { + t.Fatalf("opening test db: %v", err) + } + t.Cleanup(func() { db.Close() }) + return db +} + +func TestOpenAndClose(t *testing.T) { + path := filepath.Join(t.TempDir(), "test.db") + db, err := Open(path) + if err != nil { + t.Fatalf("open: %v", err) + } + if err := db.Close(); err != nil { + t.Fatalf("close: %v", err) + } + if _, err := os.Stat(path); os.IsNotExist(err) { + t.Fatal("db file should exist") + } +} + +func TestTypeSnapshotCRUD(t *testing.T) { + db := tempDB(t) + + ts := &TypeSnapshot{ + ID: "ohlcv_go_finance", + Version: "1.0.0", + Lang: "go", + Algebraic: "product", + Definition: "type OHLCV struct { ... }", + Description: "Vela de mercado", + } + + if err := db.InsertTypeSnapshot(ts); err != nil { + t.Fatalf("insert: %v", err) + } + + got, err := db.GetTypeSnapshot("ohlcv_go_finance") + if err != nil { + t.Fatalf("get: %v", err) + } + if got == nil { + t.Fatal("expected snapshot, got nil") + } + if got.Definition != ts.Definition { + t.Errorf("definition = %q, want %q", got.Definition, ts.Definition) + } + + // INSERT OR IGNORE: second insert should not overwrite + ts2 := &TypeSnapshot{ + ID: "ohlcv_go_finance", + Version: "2.0.0", + Lang: "go", + Algebraic: "product", + Definition: "type OHLCV struct { CHANGED }", + Description: "Changed", + } + if err := db.InsertTypeSnapshot(ts2); err != nil { + t.Fatalf("insert duplicate: %v", err) + } + got2, _ := db.GetTypeSnapshot("ohlcv_go_finance") + if got2.Version != "1.0.0" { + t.Errorf("snapshot should be immutable, got version %q", got2.Version) + } + + // Not found + missing, err := db.GetTypeSnapshot("nonexistent") + if err != nil { + t.Fatalf("get missing: %v", err) + } + if missing != nil { + t.Error("expected nil for missing snapshot") + } + + all, err := db.ListTypeSnapshots() + if err != nil { + t.Fatalf("list: %v", err) + } + if len(all) != 1 { + t.Errorf("expected 1 snapshot, got %d", len(all)) + } +} + +func TestEntityCRUD(t *testing.T) { + db := tempDB(t) + + // Insert snapshot first (type_ref) + db.InsertTypeSnapshot(&TypeSnapshot{ + ID: "tick_go_finance", Version: "1.0.0", Lang: "go", Algebraic: "product", + }) + + e := &Entity{ + ID: "ticks_btcusdt_2024", + Name: "ticks_btcusdt_2024", + TypeRef: "tick_go_finance", + Status: StatusActive, + Source: "binance_api", + Domain: "market_data", + Tags: []string{"btc", "binance"}, + Metadata: map[string]any{ + "pair": "BTCUSDT", + "exchange": "binance", + }, + } + + if err := db.InsertEntity(e); err != nil { + t.Fatalf("insert: %v", err) + } + + got, err := db.GetEntity("ticks_btcusdt_2024") + if err != nil { + t.Fatalf("get: %v", err) + } + if got == nil { + t.Fatal("expected entity, got nil") + } + if got.Source != "binance_api" { + t.Errorf("source = %q, want binance_api", got.Source) + } + if len(got.Tags) != 2 { + t.Errorf("tags len = %d, want 2", len(got.Tags)) + } + if got.Metadata["pair"] != "BTCUSDT" { + t.Errorf("metadata pair = %v, want BTCUSDT", got.Metadata["pair"]) + } + + // Update + got.Status = StatusStale + if err := db.UpdateEntity(got); err != nil { + t.Fatalf("update: %v", err) + } + updated, _ := db.GetEntity("ticks_btcusdt_2024") + if updated.Status != StatusStale { + t.Errorf("status = %q, want stale", updated.Status) + } + + // List + all, err := db.ListEntities("market_data", "") + if err != nil { + t.Fatalf("list: %v", err) + } + if len(all) != 1 { + t.Errorf("expected 1, got %d", len(all)) + } + + // Search + found, err := db.SearchEntities("btcusdt", "") + if err != nil { + t.Fatalf("search: %v", err) + } + if len(found) != 1 { + t.Errorf("search expected 1, got %d", len(found)) + } + + // Delete + if err := db.DeleteEntity("ticks_btcusdt_2024"); err != nil { + t.Fatalf("delete: %v", err) + } + deleted, _ := db.GetEntity("ticks_btcusdt_2024") + if deleted != nil { + t.Error("expected nil after delete") + } +} + +func TestRelationCRUD(t *testing.T) { + db := tempDB(t) + + // Setup entities + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"}) + db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"}) + + r := &Relation{ + ID: "a__to__b__via__transform", + Name: "TRANSFORMA", + FromEntity: "a", + ToEntity: "b", + Direction: DirUnidirectional, + Status: RelDesigned, + } + + if err := InsertRelationSafe(db, r); err != nil { + t.Fatalf("insert relation: %v", err) + } + + got, err := db.GetRelation("a__to__b__via__transform") + if err != nil { + t.Fatalf("get: %v", err) + } + if got == nil { + t.Fatal("expected relation, got nil") + } + if got.Name != "TRANSFORMA" { + t.Errorf("name = %q, want TRANSFORMA", got.Name) + } + + // List by entity + rels, err := db.ListRelations("a") + if err != nil { + t.Fatalf("list: %v", err) + } + if len(rels) != 1 { + t.Errorf("expected 1, got %d", len(rels)) + } + + // Delete + if err := db.DeleteRelation("a__to__b__via__transform"); err != nil { + t.Fatalf("delete: %v", err) + } + deleted, _ := db.GetRelation("a__to__b__via__transform") + if deleted != nil { + t.Error("expected nil after delete") + } +} + +func TestRelationInputs(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"}) + db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"}) + db.InsertEntity(&Entity{ID: "c", Name: "c", TypeRef: "t1", Status: StatusActive, Source: "test"}) + + r := &Relation{ + ID: "multi__to__c", + Name: "ENRIQUECE", + ToEntity: "c", + Direction: DirUnidirectional, + Status: RelDesigned, + } + + inputs := []RelationInput{ + {ID: "i1", RelationID: "multi__to__c", EntityID: "a", Role: "base"}, + {ID: "i2", RelationID: "multi__to__c", EntityID: "b", Role: "lookup"}, + } + + if err := InsertRelationWithInputs(db, r, inputs); err != nil { + t.Fatalf("insert with inputs: %v", err) + } + + got, err := db.GetRelationInputs("multi__to__c") + if err != nil { + t.Fatalf("get inputs: %v", err) + } + if len(got) != 2 { + t.Errorf("expected 2 inputs, got %d", len(got)) + } +} + +func TestCycleDetectionCausal(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"}) + db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"}) + db.InsertEntity(&Entity{ID: "c", Name: "c", TypeRef: "t1", Status: StatusActive, Source: "test"}) + + // a -> b (causal) + InsertRelationSafe(db, &Relation{ + ID: "ab", Name: "T1", FromEntity: "a", ToEntity: "b", Via: "fn1", + Purity: "impure", Direction: DirUnidirectional, Status: RelDesigned, + }) + + // b -> c (causal) + InsertRelationSafe(db, &Relation{ + ID: "bc", Name: "T2", FromEntity: "b", ToEntity: "c", Via: "fn2", + Purity: "impure", Direction: DirUnidirectional, Status: RelDesigned, + }) + + // c -> a (causal) should fail — creates cycle + err := InsertRelationSafe(db, &Relation{ + ID: "ca", Name: "T3", FromEntity: "c", ToEntity: "a", Via: "fn3", + Purity: "impure", Direction: DirUnidirectional, Status: RelDesigned, + }) + if err == nil { + t.Fatal("expected cycle error, got nil") + } +} + +func TestCycleDetectionSemanticAllowed(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ID: "juan", Name: "juan", TypeRef: "t1", Status: StatusActive, Source: "test"}) + db.InsertEntity(&Entity{ID: "paula", Name: "paula", TypeRef: "t1", Status: StatusActive, Source: "test"}) + + // juan -> paula (semantic, no via) + if err := InsertRelationSafe(db, &Relation{ + ID: "jp", Name: "CONOCE A", FromEntity: "juan", ToEntity: "paula", + Direction: DirBidirectional, Status: RelRunning, + }); err != nil { + t.Fatalf("insert semantic: %v", err) + } + + // paula -> juan (semantic, no via) — should succeed, no cycle check + if err := InsertRelationSafe(db, &Relation{ + ID: "pj", Name: "CONOCE A", FromEntity: "paula", ToEntity: "juan", + Direction: DirBidirectional, Status: RelRunning, + }); err != nil { + t.Fatalf("semantic cycle should be allowed: %v", err) + } +} + +func TestValidateEntity(t *testing.T) { + tests := []struct { + name string + entity Entity + wantErr bool + }{ + { + name: "valid", + entity: Entity{ID: "x", Name: "x", TypeRef: "t1", Status: StatusActive, Source: "test"}, + wantErr: false, + }, + { + name: "missing name", + entity: Entity{ID: "x", TypeRef: "t1", Status: StatusActive, Source: "test"}, + wantErr: true, + }, + { + name: "missing source", + entity: Entity{ID: "x", Name: "x", TypeRef: "t1", Status: StatusActive}, + wantErr: true, + }, + { + name: "missing type_ref", + entity: Entity{ID: "x", Name: "x", Status: StatusActive, Source: "test"}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ValidateEntity(&tt.entity) + if (err != nil) != tt.wantErr { + t.Errorf("ValidateEntity() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestGetEntityGraph(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"}) + db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"}) + InsertRelationSafe(db, &Relation{ + ID: "ab", Name: "FLUYE", FromEntity: "a", ToEntity: "b", + Direction: DirUnidirectional, Status: RelDesigned, + }) + + g, err := GetEntityGraph(db) + if err != nil { + t.Fatalf("graph: %v", err) + } + if len(g.Entities) != 2 { + t.Errorf("entities = %d, want 2", len(g.Entities)) + } + if len(g.Relations) != 1 { + t.Errorf("relations = %d, want 1", len(g.Relations)) + } +} diff --git a/fn_operations/store.go b/fn_operations/store.go new file mode 100644 index 00000000..52f2389a --- /dev/null +++ b/fn_operations/store.go @@ -0,0 +1,459 @@ +package fn_operations + +import ( + "database/sql" + "encoding/json" + "fmt" + "strings" + "time" +) + +func marshalStrings(ss []string) string { + if ss == nil { + ss = []string{} + } + b, _ := json.Marshal(ss) + return string(b) +} + +func unmarshalStrings(s string) []string { + var out []string + json.Unmarshal([]byte(s), &out) + if out == nil { + out = []string{} + } + return out +} + +func marshalJSON(v map[string]any) string { + if v == nil { + v = map[string]any{} + } + b, _ := json.Marshal(v) + return string(b) +} + +func unmarshalJSON(s string) map[string]any { + var out map[string]any + json.Unmarshal([]byte(s), &out) + if out == nil { + out = map[string]any{} + } + return out +} + +// --- TypeSnapshot CRUD --- + +// InsertTypeSnapshot inserts a type snapshot. +func (db *DB) InsertTypeSnapshot(ts *TypeSnapshot) error { + if ts.SnappedAt.IsZero() { + ts.SnappedAt = time.Now().UTC() + } + _, err := db.conn.Exec(` + INSERT OR IGNORE INTO types_snapshot (id, version, lang, algebraic, definition, description, snapped_at) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + ts.ID, ts.Version, ts.Lang, ts.Algebraic, ts.Definition, ts.Description, + ts.SnappedAt.Format(time.RFC3339), + ) + return err +} + +// GetTypeSnapshot returns a type snapshot by ID. +func (db *DB) GetTypeSnapshot(id string) (*TypeSnapshot, error) { + row := db.conn.QueryRow("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot WHERE id = ?", id) + var ts TypeSnapshot + var snappedAt string + err := row.Scan(&ts.ID, &ts.Version, &ts.Lang, &ts.Algebraic, &ts.Definition, &ts.Description, &snappedAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("scanning type_snapshot: %w", err) + } + ts.SnappedAt, _ = time.Parse(time.RFC3339, snappedAt) + return &ts, nil +} + +// ListTypeSnapshots returns all type snapshots. +func (db *DB) ListTypeSnapshots() ([]TypeSnapshot, error) { + rows, err := db.conn.Query("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot ORDER BY id") + if err != nil { + return nil, err + } + defer rows.Close() + + var result []TypeSnapshot + for rows.Next() { + var ts TypeSnapshot + var snappedAt string + if err := rows.Scan(&ts.ID, &ts.Version, &ts.Lang, &ts.Algebraic, &ts.Definition, &ts.Description, &snappedAt); err != nil { + return nil, fmt.Errorf("scanning type_snapshot: %w", err) + } + ts.SnappedAt, _ = time.Parse(time.RFC3339, snappedAt) + result = append(result, ts) + } + return result, nil +} + +// --- Entity CRUD --- + +// InsertEntity inserts or replaces an entity. +func (db *DB) InsertEntity(e *Entity) error { + now := time.Now().UTC() + if e.CreatedAt.IsZero() { + e.CreatedAt = now + } + e.UpdatedAt = now + + _, err := db.conn.Exec(` + INSERT OR REPLACE INTO entities (id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + e.ID, e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain, + marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes, + e.CreatedAt.Format(time.RFC3339), e.UpdatedAt.Format(time.RFC3339), + ) + return err +} + +// GetEntity returns an entity by ID. +func (db *DB) GetEntity(id string) (*Entity, error) { + row := db.conn.QueryRow(` + SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at + FROM entities WHERE id = ?`, id) + + var e Entity + var tagsJSON, metadataJSON, createdAt, updatedAt string + err := row.Scan(&e.ID, &e.Name, &e.TypeRef, &e.Status, &e.Description, &e.Domain, + &tagsJSON, &e.Source, &metadataJSON, &e.Notes, &createdAt, &updatedAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("scanning entity: %w", err) + } + e.Tags = unmarshalStrings(tagsJSON) + e.Metadata = unmarshalJSON(metadataJSON) + e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt) + e.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt) + return &e, nil +} + +// UpdateEntity updates an existing entity. +func (db *DB) UpdateEntity(e *Entity) error { + e.UpdatedAt = time.Now().UTC() + _, err := db.conn.Exec(` + UPDATE entities SET name=?, type_ref=?, status=?, description=?, domain=?, tags=?, source=?, metadata=?, notes=?, updated_at=? + WHERE id=?`, + e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain, + marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes, + e.UpdatedAt.Format(time.RFC3339), e.ID, + ) + return err +} + +// DeleteEntity removes an entity by ID. +func (db *DB) DeleteEntity(id string) error { + _, err := db.conn.Exec("DELETE FROM entities WHERE id = ?", id) + return err +} + +// ListEntities returns entities filtered by domain and/or status. +func (db *DB) ListEntities(domain string, status EntityStatus) ([]Entity, error) { + where := []string{} + args := []any{} + if domain != "" { + where = append(where, "domain = ?") + args = append(args, domain) + } + if status != "" { + where = append(where, "status = ?") + args = append(args, string(status)) + } + + q := "SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at FROM entities" + if len(where) > 0 { + q += " WHERE " + strings.Join(where, " AND ") + } + q += " ORDER BY name" + + rows, err := db.conn.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + return scanEntities(rows) +} + +// SearchEntities performs FTS search on entities. +func (db *DB) SearchEntities(query, domain string) ([]Entity, error) { + where := []string{} + args := []any{} + if query != "" { + where = append(where, "e.id IN (SELECT id FROM entities_fts WHERE entities_fts MATCH ?)") + args = append(args, query) + } + if domain != "" { + where = append(where, "e.domain = ?") + args = append(args, domain) + } + + q := "SELECT e.id, e.name, e.type_ref, e.status, e.description, e.domain, e.tags, e.source, e.metadata, e.notes, e.created_at, e.updated_at FROM entities e" + if len(where) > 0 { + q += " WHERE " + strings.Join(where, " AND ") + } + q += " ORDER BY e.name" + + rows, err := db.conn.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + return scanEntities(rows) +} + +func scanEntities(rows *sql.Rows) ([]Entity, error) { + var result []Entity + for rows.Next() { + var e Entity + var tagsJSON, metadataJSON, createdAt, updatedAt string + if err := rows.Scan(&e.ID, &e.Name, &e.TypeRef, &e.Status, &e.Description, &e.Domain, + &tagsJSON, &e.Source, &metadataJSON, &e.Notes, &createdAt, &updatedAt); err != nil { + return nil, fmt.Errorf("scanning entity: %w", err) + } + e.Tags = unmarshalStrings(tagsJSON) + e.Metadata = unmarshalJSON(metadataJSON) + e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt) + e.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt) + result = append(result, e) + } + return result, nil +} + +// --- Relation CRUD --- + +// InsertRelation inserts or replaces a relation. +func (db *DB) InsertRelation(r *Relation) error { + now := time.Now().UTC() + if r.CreatedAt.IsZero() { + r.CreatedAt = now + } + r.UpdatedAt = now + + var startedAt, endedAt *string + if r.StartedAt != nil { + s := r.StartedAt.Format(time.RFC3339) + startedAt = &s + } + if r.EndedAt != nil { + s := r.EndedAt.Format(time.RFC3339) + endedAt = &s + } + + _, err := db.conn.Exec(` + INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + r.ID, r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description, + r.Purity, string(r.Direction), r.Weight, string(r.Status), + startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes, + r.CreatedAt.Format(time.RFC3339), r.UpdatedAt.Format(time.RFC3339), + ) + return err +} + +// GetRelation returns a relation by ID. +func (db *DB) GetRelation(id string) (*Relation, error) { + row := db.conn.QueryRow(` + SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at + FROM relations WHERE id = ?`, id) + return scanRelation(row) +} + +// UpdateRelation updates an existing relation. +func (db *DB) UpdateRelation(r *Relation) error { + r.UpdatedAt = time.Now().UTC() + + var startedAt, endedAt *string + if r.StartedAt != nil { + s := r.StartedAt.Format(time.RFC3339) + startedAt = &s + } + if r.EndedAt != nil { + s := r.EndedAt.Format(time.RFC3339) + endedAt = &s + } + + _, err := db.conn.Exec(` + UPDATE relations SET name=?, from_entity=?, to_entity=?, via=?, description=?, purity=?, direction=?, weight=?, status=?, started_at=?, ended_at=?, "order"=?, tags=?, notes=?, updated_at=? + WHERE id=?`, + r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description, + r.Purity, string(r.Direction), r.Weight, string(r.Status), + startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes, + r.UpdatedAt.Format(time.RFC3339), r.ID, + ) + return err +} + +// DeleteRelation removes a relation by ID (cascades to relation_inputs). +func (db *DB) DeleteRelation(id string) error { + _, err := db.conn.Exec("DELETE FROM relations WHERE id = ?", id) + return err +} + +// ListRelations returns all relations, optionally filtered by entity involvement. +func (db *DB) ListRelations(entityID string) ([]Relation, error) { + q := `SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at FROM relations` + var args []any + if entityID != "" { + q += " WHERE from_entity = ? OR to_entity = ?" + args = append(args, entityID, entityID) + } + q += " ORDER BY name" + + rows, err := db.conn.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []Relation + for rows.Next() { + r, err := scanRelationFromRows(rows) + if err != nil { + return nil, err + } + result = append(result, *r) + } + return result, nil +} + +// GetRelationsFrom returns all relations where from_entity matches. +func (db *DB) GetRelationsFrom(entityID string) ([]Relation, error) { + rows, err := db.conn.Query(` + SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at + FROM relations WHERE from_entity = ? ORDER BY name`, entityID) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []Relation + for rows.Next() { + r, err := scanRelationFromRows(rows) + if err != nil { + return nil, err + } + result = append(result, *r) + } + return result, nil +} + +// GetRelationsTo returns all relations where to_entity matches. +func (db *DB) GetRelationsTo(entityID string) ([]Relation, error) { + rows, err := db.conn.Query(` + SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at + FROM relations WHERE to_entity = ? ORDER BY name`, entityID) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []Relation + for rows.Next() { + r, err := scanRelationFromRows(rows) + if err != nil { + return nil, err + } + result = append(result, *r) + } + return result, nil +} + +func scanRelation(row *sql.Row) (*Relation, error) { + var r Relation + var tagsJSON, createdAt, updatedAt string + var startedAt, endedAt *string + err := row.Scan(&r.ID, &r.Name, &r.FromEntity, &r.ToEntity, &r.Via, &r.Description, + &r.Purity, &r.Direction, &r.Weight, &r.Status, + &startedAt, &endedAt, &r.Order, &tagsJSON, &r.Notes, &createdAt, &updatedAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("scanning relation: %w", err) + } + r.Tags = unmarshalStrings(tagsJSON) + r.CreatedAt, _ = time.Parse(time.RFC3339, createdAt) + r.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt) + if startedAt != nil { + t, _ := time.Parse(time.RFC3339, *startedAt) + r.StartedAt = &t + } + if endedAt != nil { + t, _ := time.Parse(time.RFC3339, *endedAt) + r.EndedAt = &t + } + return &r, nil +} + +func scanRelationFromRows(rows *sql.Rows) (*Relation, error) { + var r Relation + var tagsJSON, createdAt, updatedAt string + var startedAt, endedAt *string + err := rows.Scan(&r.ID, &r.Name, &r.FromEntity, &r.ToEntity, &r.Via, &r.Description, + &r.Purity, &r.Direction, &r.Weight, &r.Status, + &startedAt, &endedAt, &r.Order, &tagsJSON, &r.Notes, &createdAt, &updatedAt) + if err != nil { + return nil, fmt.Errorf("scanning relation: %w", err) + } + r.Tags = unmarshalStrings(tagsJSON) + r.CreatedAt, _ = time.Parse(time.RFC3339, createdAt) + r.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt) + if startedAt != nil { + t, _ := time.Parse(time.RFC3339, *startedAt) + r.StartedAt = &t + } + if endedAt != nil { + t, _ := time.Parse(time.RFC3339, *endedAt) + r.EndedAt = &t + } + return &r, nil +} + +// --- RelationInput CRUD --- + +// InsertRelationInput inserts a relation input. +func (db *DB) InsertRelationInput(ri *RelationInput) error { + _, err := db.conn.Exec(` + INSERT INTO relation_inputs (id, relation_id, entity_id, role, "order") + VALUES (?, ?, ?, ?, ?)`, + ri.ID, ri.RelationID, ri.EntityID, ri.Role, ri.Order, + ) + return err +} + +// GetRelationInputs returns all inputs for a relation. +func (db *DB) GetRelationInputs(relationID string) ([]RelationInput, error) { + rows, err := db.conn.Query(` + SELECT id, relation_id, entity_id, role, "order" + FROM relation_inputs WHERE relation_id = ? ORDER BY "order"`, relationID) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []RelationInput + for rows.Next() { + var ri RelationInput + if err := rows.Scan(&ri.ID, &ri.RelationID, &ri.EntityID, &ri.Role, &ri.Order); err != nil { + return nil, fmt.Errorf("scanning relation_input: %w", err) + } + result = append(result, ri) + } + return result, nil +} + +// DeleteRelationInputs removes all inputs for a relation. +func (db *DB) DeleteRelationInputs(relationID string) error { + _, err := db.conn.Exec("DELETE FROM relation_inputs WHERE relation_id = ?", relationID) + return err +} diff --git a/fn_operations/validate.go b/fn_operations/validate.go new file mode 100644 index 00000000..5eb759a0 --- /dev/null +++ b/fn_operations/validate.go @@ -0,0 +1,181 @@ +package fn_operations + +import ( + "fmt" + "strings" +) + +// ValidationError represents one or more integrity violations. +type ValidationError struct { + ID string + Errors []string +} + +func (v *ValidationError) Error() string { + return fmt.Sprintf("%s: %s", v.ID, strings.Join(v.Errors, "; ")) +} + +// ValidateEntity checks entity integrity rules. +func ValidateEntity(e *Entity) *ValidationError { + var errs []string + + if e.ID == "" { + errs = append(errs, "id is required") + } + if e.Name == "" { + errs = append(errs, "name is required") + } + if e.TypeRef == "" { + errs = append(errs, "type_ref is required") + } + if e.Source == "" { + errs = append(errs, "source is required") + } + + switch e.Status { + case StatusActive, StatusStale, StatusCorrupted, StatusArchived: + case "": + errs = append(errs, "status is required") + default: + errs = append(errs, fmt.Sprintf("invalid status: %s", e.Status)) + } + + if len(errs) > 0 { + return &ValidationError{ID: e.ID, Errors: errs} + } + return nil +} + +// ValidateRelation checks relation integrity rules. +// knownEntities is a set of entity IDs that exist. +func ValidateRelation(r *Relation, knownEntities map[string]bool) *ValidationError { + var errs []string + + if r.ID == "" { + errs = append(errs, "id is required") + } + if r.Name == "" { + errs = append(errs, "name is required") + } + if r.ToEntity == "" { + errs = append(errs, "to_entity is required") + } + + // from_entity or relation_inputs — validated at operation level + if r.FromEntity != "" && r.ToEntity != "" && r.FromEntity == r.ToEntity { + errs = append(errs, "from_entity and to_entity cannot be the same") + } + + if r.FromEntity != "" && !knownEntities[r.FromEntity] { + errs = append(errs, fmt.Sprintf("from_entity references unknown entity: %s", r.FromEntity)) + } + if r.ToEntity != "" && !knownEntities[r.ToEntity] { + errs = append(errs, fmt.Sprintf("to_entity references unknown entity: %s", r.ToEntity)) + } + + if r.Weight != nil { + if *r.Weight < 0.0 || *r.Weight > 1.0 { + errs = append(errs, "weight must be between 0.0 and 1.0") + } + } + + if r.StartedAt != nil && r.EndedAt != nil { + if r.StartedAt.After(*r.EndedAt) { + errs = append(errs, "started_at must be before ended_at") + } + } + + switch r.Direction { + case DirUnidirectional, DirBidirectional, DirInverse, "": + default: + errs = append(errs, fmt.Sprintf("invalid direction: %s", r.Direction)) + } + + if len(errs) > 0 { + return &ValidationError{ID: r.ID, Errors: errs} + } + return nil +} + +// ValidateRelationInputs checks relation_inputs integrity. +func ValidateRelationInputs(inputs []RelationInput, knownEntities map[string]bool) *ValidationError { + var errs []string + + if len(inputs) < 2 { + errs = append(errs, "relation_inputs must have at least 2 entries") + } + + for i, ri := range inputs { + if ri.RelationID == "" { + errs = append(errs, fmt.Sprintf("input[%d]: relation_id is required", i)) + } + if ri.EntityID == "" { + errs = append(errs, fmt.Sprintf("input[%d]: entity_id is required", i)) + } + if ri.Role == "" { + errs = append(errs, fmt.Sprintf("input[%d]: role is required", i)) + } + if ri.EntityID != "" && !knownEntities[ri.EntityID] { + errs = append(errs, fmt.Sprintf("input[%d]: entity_id references unknown entity: %s", i, ri.EntityID)) + } + } + + if len(errs) > 0 { + id := "relation_inputs" + if len(inputs) > 0 { + id = inputs[0].RelationID + } + return &ValidationError{ID: id, Errors: errs} + } + return nil +} + +// DetectCycle checks if adding a causal relation (from -> to) creates a cycle. +// Only considers relations where via != "" (causal/transformational). +// Semantic relations (via == "") are exempt from cycle detection. +func DetectCycle(db *DB, fromEntity, toEntity string) error { + if fromEntity == "" || toEntity == "" { + return nil + } + + // BFS from toEntity following only causal relations. + // If we reach fromEntity, there's a cycle. + visited := map[string]bool{} + queue := []string{toEntity} + + for len(queue) > 0 { + current := queue[0] + queue = queue[1:] + + if visited[current] { + continue + } + visited[current] = true + + if current == fromEntity { + return fmt.Errorf("cycle detected: adding relation %s -> %s would create a causal cycle", fromEntity, toEntity) + } + + // Follow causal relations from current entity + rows, err := db.conn.Query(` + SELECT to_entity FROM relations + WHERE from_entity = ? AND via != ''`, current) + if err != nil { + return fmt.Errorf("querying relations for cycle detection: %w", err) + } + + for rows.Next() { + var next string + if err := rows.Scan(&next); err != nil { + rows.Close() + return err + } + if !visited[next] { + queue = append(queue, next) + } + } + rows.Close() + } + + return nil +}