feat: fn_operations library — entities, relations, types_snapshot con FTS y ciclos

Paquete Go completo con modelos (Entity, Relation, RelationInput, TypeSnapshot),
DB SQLite con WAL + FTS5 en entities, CRUD para las 4 tablas, validacion de
integridad, deteccion de ciclos solo en relaciones causales (via != ''), y
operaciones de alto nivel con snapshot automatico de tipos del registry.
9 tests, todos pasan.
This commit is contained in:
2026-03-28 04:37:50 +01:00
parent 4acb411915
commit 67401cb967
6 changed files with 1447 additions and 0 deletions
+140
View File
@@ -0,0 +1,140 @@
package fn_operations
import (
"database/sql"
"fmt"
"os"
"path/filepath"
_ "github.com/mattn/go-sqlite3"
)
const schemaSQL = `
CREATE TABLE IF NOT EXISTS types_snapshot (
id TEXT PRIMARY KEY,
version TEXT NOT NULL DEFAULT '1.0.0',
lang TEXT NOT NULL,
algebraic TEXT NOT NULL CHECK(algebraic IN ('product','sum')),
definition TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
snapped_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS entities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type_ref TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active','stale','corrupted','archived')),
description TEXT NOT NULL DEFAULT '',
domain TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
source TEXT NOT NULL,
metadata TEXT NOT NULL DEFAULT '{}',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS relations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
from_entity TEXT NOT NULL DEFAULT '',
to_entity TEXT NOT NULL,
via TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
purity TEXT NOT NULL DEFAULT '' CHECK(purity IN ('','pure','impure')),
direction TEXT NOT NULL DEFAULT 'unidirectional' CHECK(direction IN ('unidirectional','bidirectional','inverse')),
weight REAL,
status TEXT NOT NULL DEFAULT 'designed' CHECK(status IN ('designed','implemented','tested','running','deprecated')),
started_at TEXT,
ended_at TEXT,
"order" INTEGER,
tags TEXT NOT NULL DEFAULT '[]',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS relation_inputs (
id TEXT PRIMARY KEY,
relation_id TEXT NOT NULL REFERENCES relations(id) ON DELETE CASCADE,
entity_id TEXT NOT NULL REFERENCES entities(id),
role TEXT NOT NULL,
"order" INTEGER
);
CREATE VIRTUAL TABLE IF NOT EXISTS entities_fts USING fts5(
id,
name,
description,
tags,
domain,
content='entities',
content_rowid='rowid'
);
-- Triggers to keep entities FTS in sync
CREATE TRIGGER IF NOT EXISTS entities_ai AFTER INSERT ON entities BEGIN
INSERT INTO entities_fts(rowid, id, name, description, tags, domain)
VALUES (new.rowid, new.id, new.name, new.description, new.tags, new.domain);
END;
CREATE TRIGGER IF NOT EXISTS entities_ad AFTER DELETE ON entities BEGIN
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags, domain)
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags, old.domain);
END;
CREATE TRIGGER IF NOT EXISTS entities_au AFTER UPDATE ON entities BEGIN
INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags, domain)
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags, old.domain);
INSERT INTO entities_fts(rowid, id, name, description, tags, domain)
VALUES (new.rowid, new.id, new.name, new.description, new.tags, new.domain);
END;
`
// DB wraps a SQLite connection for an operations database.
type DB struct {
conn *sql.DB
path string
}
// Open opens or creates an operations database at the given path.
func Open(path string) (*DB, error) {
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("creating db directory: %w", err)
}
conn, err := sql.Open("sqlite3", path+"?_foreign_keys=on")
if err != nil {
return nil, fmt.Errorf("opening database: %w", err)
}
if _, err := conn.Exec("PRAGMA journal_mode=WAL"); err != nil {
conn.Close()
return nil, fmt.Errorf("setting WAL mode: %w", err)
}
if _, err := conn.Exec(schemaSQL); err != nil {
conn.Close()
return nil, fmt.Errorf("applying schema: %w", err)
}
return &DB{conn: conn, path: path}, nil
}
// Conn returns the underlying sql.DB for transaction use.
func (db *DB) Conn() *sql.DB {
return db.conn
}
// Close closes the database connection.
func (db *DB) Close() error {
return db.conn.Close()
}
// Drop removes the database file.
func (db *DB) Drop() error {
db.Close()
return os.Remove(db.path)
}
+90
View File
@@ -0,0 +1,90 @@
package fn_operations
import "time"
// EntityStatus represents the lifecycle state of an entity.
type EntityStatus string
const (
StatusActive EntityStatus = "active"
StatusStale EntityStatus = "stale"
StatusCorrupted EntityStatus = "corrupted"
StatusArchived EntityStatus = "archived"
)
// RelationStatus represents the lifecycle state of a relation.
type RelationStatus string
const (
RelDesigned RelationStatus = "designed"
RelImplemented RelationStatus = "implemented"
RelTested RelationStatus = "tested"
RelRunning RelationStatus = "running"
RelDeprecated RelationStatus = "deprecated"
)
// Direction represents the directionality of a relation.
type Direction string
const (
DirUnidirectional Direction = "unidirectional"
DirBidirectional Direction = "bidirectional"
DirInverse Direction = "inverse"
)
// Entity is a concrete instance of a registry type within a project context.
type Entity struct {
ID string `json:"id"`
Name string `json:"name"`
TypeRef string `json:"type_ref"`
Status EntityStatus `json:"status"`
Description string `json:"description"`
Domain string `json:"domain"`
Tags []string `json:"tags"`
Source string `json:"source"`
Metadata map[string]any `json:"metadata"`
Notes string `json:"notes"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// Relation describes how one entity connects to or transforms into another.
type Relation struct {
ID string `json:"id"`
Name string `json:"name"`
FromEntity string `json:"from_entity"`
ToEntity string `json:"to_entity"`
Via string `json:"via"`
Description string `json:"description"`
Purity string `json:"purity"`
Direction Direction `json:"direction"`
Weight *float64 `json:"weight"`
Status RelationStatus `json:"status"`
StartedAt *time.Time `json:"started_at"`
EndedAt *time.Time `json:"ended_at"`
Order *int `json:"order"`
Tags []string `json:"tags"`
Notes string `json:"notes"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// RelationInput represents one input entity in a multi-input relation.
type RelationInput struct {
ID string `json:"id"`
RelationID string `json:"relation_id"`
EntityID string `json:"entity_id"`
Role string `json:"role"`
Order *int `json:"order"`
}
// TypeSnapshot is an immutable copy of a registry type at point of use.
type TypeSnapshot struct {
ID string `json:"id"`
Version string `json:"version"`
Lang string `json:"lang"`
Algebraic string `json:"algebraic"`
Definition string `json:"definition"`
Description string `json:"description"`
SnappedAt time.Time `json:"snapped_at"`
}
+202
View File
@@ -0,0 +1,202 @@
package fn_operations
import (
"fmt"
"time"
"fn-registry/registry"
)
// InsertEntityWithSnapshot inserts an entity, snapshotting its type from the registry if needed.
// registryDB can be nil if the type is already snapshotted.
func InsertEntityWithSnapshot(opsDB *DB, registryDB *registry.DB, e *Entity) error {
if err := ValidateEntity(e); err != nil {
return err
}
// Check if type is already snapshotted
snap, err := opsDB.GetTypeSnapshot(e.TypeRef)
if err != nil {
return fmt.Errorf("checking type snapshot: %w", err)
}
if snap == nil {
// Need to fetch from registry
if registryDB == nil {
return fmt.Errorf("type %q not found in local snapshots and no registry provided", e.TypeRef)
}
if err := SnapshotType(opsDB, registryDB, e.TypeRef); err != nil {
return err
}
}
return opsDB.InsertEntity(e)
}
// SnapshotType fetches a type from the registry and copies it to types_snapshot.
func SnapshotType(opsDB *DB, registryDB *registry.DB, typeID string) error {
t, err := registryDB.GetType(typeID)
if err != nil {
return fmt.Errorf("fetching type %q from registry: %w", typeID, err)
}
snap := &TypeSnapshot{
ID: t.ID,
Version: t.Version,
Lang: t.Lang,
Algebraic: string(t.Algebraic),
Definition: t.Definition,
Description: t.Description,
SnappedAt: time.Now().UTC(),
}
return opsDB.InsertTypeSnapshot(snap)
}
// InsertRelationSafe validates, checks for cycles, and inserts a relation.
func InsertRelationSafe(db *DB, r *Relation) error {
entities, err := buildEntitySet(db)
if err != nil {
return err
}
if err := ValidateRelation(r, entities); err != nil {
return err
}
// from_entity is required when not using relation_inputs
if r.FromEntity == "" {
return fmt.Errorf("from_entity is required (use InsertRelationWithInputs for multi-input relations)")
}
// Cycle detection only for causal relations
if r.Via != "" {
if err := DetectCycle(db, r.FromEntity, r.ToEntity); err != nil {
return err
}
}
return db.InsertRelation(r)
}
// InsertRelationWithInputs validates and inserts a relation with multiple inputs in a transaction.
func InsertRelationWithInputs(db *DB, r *Relation, inputs []RelationInput) error {
entities, err := buildEntitySet(db)
if err != nil {
return err
}
if err := ValidateRelation(r, entities); err != nil {
return err
}
if err := ValidateRelationInputs(inputs, entities); err != nil {
return err
}
// Cycle detection for each input if causal
if r.Via != "" {
for _, input := range inputs {
if err := DetectCycle(db, input.EntityID, r.ToEntity); err != nil {
return err
}
}
}
tx, err := db.Conn().Begin()
if err != nil {
return fmt.Errorf("beginning transaction: %w", err)
}
defer tx.Rollback()
// Insert relation
now := time.Now().UTC()
if r.CreatedAt.IsZero() {
r.CreatedAt = now
}
r.UpdatedAt = now
var startedAt, endedAt *string
if r.StartedAt != nil {
s := r.StartedAt.Format(time.RFC3339)
startedAt = &s
}
if r.EndedAt != nil {
s := r.EndedAt.Format(time.RFC3339)
endedAt = &s
}
_, err = tx.Exec(`
INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
r.ID, r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
r.Purity, string(r.Direction), r.Weight, string(r.Status),
startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes,
r.CreatedAt.Format(time.RFC3339), r.UpdatedAt.Format(time.RFC3339),
)
if err != nil {
return fmt.Errorf("inserting relation: %w", err)
}
// Insert inputs
for _, ri := range inputs {
_, err = tx.Exec(`
INSERT INTO relation_inputs (id, relation_id, entity_id, role, "order")
VALUES (?, ?, ?, ?, ?)`,
ri.ID, ri.RelationID, ri.EntityID, ri.Role, ri.Order,
)
if err != nil {
return fmt.Errorf("inserting relation_input: %w", err)
}
}
return tx.Commit()
}
// Graph holds the full entity-relation graph for a project.
type Graph struct {
Entities []Entity
Relations []Relation
Inputs map[string][]RelationInput
}
// GetEntityGraph returns all entities and relations for visualization.
func GetEntityGraph(db *DB) (*Graph, error) {
entities, err := db.ListEntities("", "")
if err != nil {
return nil, fmt.Errorf("listing entities: %w", err)
}
relations, err := db.ListRelations("")
if err != nil {
return nil, fmt.Errorf("listing relations: %w", err)
}
inputs := map[string][]RelationInput{}
for _, r := range relations {
ri, err := db.GetRelationInputs(r.ID)
if err != nil {
return nil, fmt.Errorf("getting inputs for relation %s: %w", r.ID, err)
}
if len(ri) > 0 {
inputs[r.ID] = ri
}
}
return &Graph{
Entities: entities,
Relations: relations,
Inputs: inputs,
}, nil
}
func buildEntitySet(db *DB) (map[string]bool, error) {
all, err := db.ListEntities("", "")
if err != nil {
return nil, fmt.Errorf("building entity set: %w", err)
}
set := make(map[string]bool, len(all))
for _, e := range all {
set[e.ID] = true
}
return set, nil
}
+375
View File
@@ -0,0 +1,375 @@
package fn_operations
import (
"os"
"path/filepath"
"testing"
)
func tempDB(t *testing.T) *DB {
t.Helper()
path := filepath.Join(t.TempDir(), "test.db")
db, err := Open(path)
if err != nil {
t.Fatalf("opening test db: %v", err)
}
t.Cleanup(func() { db.Close() })
return db
}
func TestOpenAndClose(t *testing.T) {
path := filepath.Join(t.TempDir(), "test.db")
db, err := Open(path)
if err != nil {
t.Fatalf("open: %v", err)
}
if err := db.Close(); err != nil {
t.Fatalf("close: %v", err)
}
if _, err := os.Stat(path); os.IsNotExist(err) {
t.Fatal("db file should exist")
}
}
func TestTypeSnapshotCRUD(t *testing.T) {
db := tempDB(t)
ts := &TypeSnapshot{
ID: "ohlcv_go_finance",
Version: "1.0.0",
Lang: "go",
Algebraic: "product",
Definition: "type OHLCV struct { ... }",
Description: "Vela de mercado",
}
if err := db.InsertTypeSnapshot(ts); err != nil {
t.Fatalf("insert: %v", err)
}
got, err := db.GetTypeSnapshot("ohlcv_go_finance")
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected snapshot, got nil")
}
if got.Definition != ts.Definition {
t.Errorf("definition = %q, want %q", got.Definition, ts.Definition)
}
// INSERT OR IGNORE: second insert should not overwrite
ts2 := &TypeSnapshot{
ID: "ohlcv_go_finance",
Version: "2.0.0",
Lang: "go",
Algebraic: "product",
Definition: "type OHLCV struct { CHANGED }",
Description: "Changed",
}
if err := db.InsertTypeSnapshot(ts2); err != nil {
t.Fatalf("insert duplicate: %v", err)
}
got2, _ := db.GetTypeSnapshot("ohlcv_go_finance")
if got2.Version != "1.0.0" {
t.Errorf("snapshot should be immutable, got version %q", got2.Version)
}
// Not found
missing, err := db.GetTypeSnapshot("nonexistent")
if err != nil {
t.Fatalf("get missing: %v", err)
}
if missing != nil {
t.Error("expected nil for missing snapshot")
}
all, err := db.ListTypeSnapshots()
if err != nil {
t.Fatalf("list: %v", err)
}
if len(all) != 1 {
t.Errorf("expected 1 snapshot, got %d", len(all))
}
}
func TestEntityCRUD(t *testing.T) {
db := tempDB(t)
// Insert snapshot first (type_ref)
db.InsertTypeSnapshot(&TypeSnapshot{
ID: "tick_go_finance", Version: "1.0.0", Lang: "go", Algebraic: "product",
})
e := &Entity{
ID: "ticks_btcusdt_2024",
Name: "ticks_btcusdt_2024",
TypeRef: "tick_go_finance",
Status: StatusActive,
Source: "binance_api",
Domain: "market_data",
Tags: []string{"btc", "binance"},
Metadata: map[string]any{
"pair": "BTCUSDT",
"exchange": "binance",
},
}
if err := db.InsertEntity(e); err != nil {
t.Fatalf("insert: %v", err)
}
got, err := db.GetEntity("ticks_btcusdt_2024")
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected entity, got nil")
}
if got.Source != "binance_api" {
t.Errorf("source = %q, want binance_api", got.Source)
}
if len(got.Tags) != 2 {
t.Errorf("tags len = %d, want 2", len(got.Tags))
}
if got.Metadata["pair"] != "BTCUSDT" {
t.Errorf("metadata pair = %v, want BTCUSDT", got.Metadata["pair"])
}
// Update
got.Status = StatusStale
if err := db.UpdateEntity(got); err != nil {
t.Fatalf("update: %v", err)
}
updated, _ := db.GetEntity("ticks_btcusdt_2024")
if updated.Status != StatusStale {
t.Errorf("status = %q, want stale", updated.Status)
}
// List
all, err := db.ListEntities("market_data", "")
if err != nil {
t.Fatalf("list: %v", err)
}
if len(all) != 1 {
t.Errorf("expected 1, got %d", len(all))
}
// Search
found, err := db.SearchEntities("btcusdt", "")
if err != nil {
t.Fatalf("search: %v", err)
}
if len(found) != 1 {
t.Errorf("search expected 1, got %d", len(found))
}
// Delete
if err := db.DeleteEntity("ticks_btcusdt_2024"); err != nil {
t.Fatalf("delete: %v", err)
}
deleted, _ := db.GetEntity("ticks_btcusdt_2024")
if deleted != nil {
t.Error("expected nil after delete")
}
}
func TestRelationCRUD(t *testing.T) {
db := tempDB(t)
// Setup entities
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"})
db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"})
r := &Relation{
ID: "a__to__b__via__transform",
Name: "TRANSFORMA",
FromEntity: "a",
ToEntity: "b",
Direction: DirUnidirectional,
Status: RelDesigned,
}
if err := InsertRelationSafe(db, r); err != nil {
t.Fatalf("insert relation: %v", err)
}
got, err := db.GetRelation("a__to__b__via__transform")
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected relation, got nil")
}
if got.Name != "TRANSFORMA" {
t.Errorf("name = %q, want TRANSFORMA", got.Name)
}
// List by entity
rels, err := db.ListRelations("a")
if err != nil {
t.Fatalf("list: %v", err)
}
if len(rels) != 1 {
t.Errorf("expected 1, got %d", len(rels))
}
// Delete
if err := db.DeleteRelation("a__to__b__via__transform"); err != nil {
t.Fatalf("delete: %v", err)
}
deleted, _ := db.GetRelation("a__to__b__via__transform")
if deleted != nil {
t.Error("expected nil after delete")
}
}
func TestRelationInputs(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"})
db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"})
db.InsertEntity(&Entity{ID: "c", Name: "c", TypeRef: "t1", Status: StatusActive, Source: "test"})
r := &Relation{
ID: "multi__to__c",
Name: "ENRIQUECE",
ToEntity: "c",
Direction: DirUnidirectional,
Status: RelDesigned,
}
inputs := []RelationInput{
{ID: "i1", RelationID: "multi__to__c", EntityID: "a", Role: "base"},
{ID: "i2", RelationID: "multi__to__c", EntityID: "b", Role: "lookup"},
}
if err := InsertRelationWithInputs(db, r, inputs); err != nil {
t.Fatalf("insert with inputs: %v", err)
}
got, err := db.GetRelationInputs("multi__to__c")
if err != nil {
t.Fatalf("get inputs: %v", err)
}
if len(got) != 2 {
t.Errorf("expected 2 inputs, got %d", len(got))
}
}
func TestCycleDetectionCausal(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"})
db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"})
db.InsertEntity(&Entity{ID: "c", Name: "c", TypeRef: "t1", Status: StatusActive, Source: "test"})
// a -> b (causal)
InsertRelationSafe(db, &Relation{
ID: "ab", Name: "T1", FromEntity: "a", ToEntity: "b", Via: "fn1",
Purity: "impure", Direction: DirUnidirectional, Status: RelDesigned,
})
// b -> c (causal)
InsertRelationSafe(db, &Relation{
ID: "bc", Name: "T2", FromEntity: "b", ToEntity: "c", Via: "fn2",
Purity: "impure", Direction: DirUnidirectional, Status: RelDesigned,
})
// c -> a (causal) should fail — creates cycle
err := InsertRelationSafe(db, &Relation{
ID: "ca", Name: "T3", FromEntity: "c", ToEntity: "a", Via: "fn3",
Purity: "impure", Direction: DirUnidirectional, Status: RelDesigned,
})
if err == nil {
t.Fatal("expected cycle error, got nil")
}
}
func TestCycleDetectionSemanticAllowed(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{ID: "juan", Name: "juan", TypeRef: "t1", Status: StatusActive, Source: "test"})
db.InsertEntity(&Entity{ID: "paula", Name: "paula", TypeRef: "t1", Status: StatusActive, Source: "test"})
// juan -> paula (semantic, no via)
if err := InsertRelationSafe(db, &Relation{
ID: "jp", Name: "CONOCE A", FromEntity: "juan", ToEntity: "paula",
Direction: DirBidirectional, Status: RelRunning,
}); err != nil {
t.Fatalf("insert semantic: %v", err)
}
// paula -> juan (semantic, no via) — should succeed, no cycle check
if err := InsertRelationSafe(db, &Relation{
ID: "pj", Name: "CONOCE A", FromEntity: "paula", ToEntity: "juan",
Direction: DirBidirectional, Status: RelRunning,
}); err != nil {
t.Fatalf("semantic cycle should be allowed: %v", err)
}
}
func TestValidateEntity(t *testing.T) {
tests := []struct {
name string
entity Entity
wantErr bool
}{
{
name: "valid",
entity: Entity{ID: "x", Name: "x", TypeRef: "t1", Status: StatusActive, Source: "test"},
wantErr: false,
},
{
name: "missing name",
entity: Entity{ID: "x", TypeRef: "t1", Status: StatusActive, Source: "test"},
wantErr: true,
},
{
name: "missing source",
entity: Entity{ID: "x", Name: "x", TypeRef: "t1", Status: StatusActive},
wantErr: true,
},
{
name: "missing type_ref",
entity: Entity{ID: "x", Name: "x", Status: StatusActive, Source: "test"},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ValidateEntity(&tt.entity)
if (err != nil) != tt.wantErr {
t.Errorf("ValidateEntity() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestGetEntityGraph(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"})
db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"})
InsertRelationSafe(db, &Relation{
ID: "ab", Name: "FLUYE", FromEntity: "a", ToEntity: "b",
Direction: DirUnidirectional, Status: RelDesigned,
})
g, err := GetEntityGraph(db)
if err != nil {
t.Fatalf("graph: %v", err)
}
if len(g.Entities) != 2 {
t.Errorf("entities = %d, want 2", len(g.Entities))
}
if len(g.Relations) != 1 {
t.Errorf("relations = %d, want 1", len(g.Relations))
}
}
+459
View File
@@ -0,0 +1,459 @@
package fn_operations
import (
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
)
func marshalStrings(ss []string) string {
if ss == nil {
ss = []string{}
}
b, _ := json.Marshal(ss)
return string(b)
}
func unmarshalStrings(s string) []string {
var out []string
json.Unmarshal([]byte(s), &out)
if out == nil {
out = []string{}
}
return out
}
func marshalJSON(v map[string]any) string {
if v == nil {
v = map[string]any{}
}
b, _ := json.Marshal(v)
return string(b)
}
func unmarshalJSON(s string) map[string]any {
var out map[string]any
json.Unmarshal([]byte(s), &out)
if out == nil {
out = map[string]any{}
}
return out
}
// --- TypeSnapshot CRUD ---
// InsertTypeSnapshot inserts a type snapshot.
func (db *DB) InsertTypeSnapshot(ts *TypeSnapshot) error {
if ts.SnappedAt.IsZero() {
ts.SnappedAt = time.Now().UTC()
}
_, err := db.conn.Exec(`
INSERT OR IGNORE INTO types_snapshot (id, version, lang, algebraic, definition, description, snapped_at)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
ts.ID, ts.Version, ts.Lang, ts.Algebraic, ts.Definition, ts.Description,
ts.SnappedAt.Format(time.RFC3339),
)
return err
}
// GetTypeSnapshot returns a type snapshot by ID.
func (db *DB) GetTypeSnapshot(id string) (*TypeSnapshot, error) {
row := db.conn.QueryRow("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot WHERE id = ?", id)
var ts TypeSnapshot
var snappedAt string
err := row.Scan(&ts.ID, &ts.Version, &ts.Lang, &ts.Algebraic, &ts.Definition, &ts.Description, &snappedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("scanning type_snapshot: %w", err)
}
ts.SnappedAt, _ = time.Parse(time.RFC3339, snappedAt)
return &ts, nil
}
// ListTypeSnapshots returns all type snapshots.
func (db *DB) ListTypeSnapshots() ([]TypeSnapshot, error) {
rows, err := db.conn.Query("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot ORDER BY id")
if err != nil {
return nil, err
}
defer rows.Close()
var result []TypeSnapshot
for rows.Next() {
var ts TypeSnapshot
var snappedAt string
if err := rows.Scan(&ts.ID, &ts.Version, &ts.Lang, &ts.Algebraic, &ts.Definition, &ts.Description, &snappedAt); err != nil {
return nil, fmt.Errorf("scanning type_snapshot: %w", err)
}
ts.SnappedAt, _ = time.Parse(time.RFC3339, snappedAt)
result = append(result, ts)
}
return result, nil
}
// --- Entity CRUD ---
// InsertEntity inserts or replaces an entity.
func (db *DB) InsertEntity(e *Entity) error {
now := time.Now().UTC()
if e.CreatedAt.IsZero() {
e.CreatedAt = now
}
e.UpdatedAt = now
_, err := db.conn.Exec(`
INSERT OR REPLACE INTO entities (id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain,
marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes,
e.CreatedAt.Format(time.RFC3339), e.UpdatedAt.Format(time.RFC3339),
)
return err
}
// GetEntity returns an entity by ID.
func (db *DB) GetEntity(id string) (*Entity, error) {
row := db.conn.QueryRow(`
SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at
FROM entities WHERE id = ?`, id)
var e Entity
var tagsJSON, metadataJSON, createdAt, updatedAt string
err := row.Scan(&e.ID, &e.Name, &e.TypeRef, &e.Status, &e.Description, &e.Domain,
&tagsJSON, &e.Source, &metadataJSON, &e.Notes, &createdAt, &updatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("scanning entity: %w", err)
}
e.Tags = unmarshalStrings(tagsJSON)
e.Metadata = unmarshalJSON(metadataJSON)
e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
e.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
return &e, nil
}
// UpdateEntity updates an existing entity.
func (db *DB) UpdateEntity(e *Entity) error {
e.UpdatedAt = time.Now().UTC()
_, err := db.conn.Exec(`
UPDATE entities SET name=?, type_ref=?, status=?, description=?, domain=?, tags=?, source=?, metadata=?, notes=?, updated_at=?
WHERE id=?`,
e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain,
marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes,
e.UpdatedAt.Format(time.RFC3339), e.ID,
)
return err
}
// DeleteEntity removes an entity by ID.
func (db *DB) DeleteEntity(id string) error {
_, err := db.conn.Exec("DELETE FROM entities WHERE id = ?", id)
return err
}
// ListEntities returns entities filtered by domain and/or status.
func (db *DB) ListEntities(domain string, status EntityStatus) ([]Entity, error) {
where := []string{}
args := []any{}
if domain != "" {
where = append(where, "domain = ?")
args = append(args, domain)
}
if status != "" {
where = append(where, "status = ?")
args = append(args, string(status))
}
q := "SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at FROM entities"
if len(where) > 0 {
q += " WHERE " + strings.Join(where, " AND ")
}
q += " ORDER BY name"
rows, err := db.conn.Query(q, args...)
if err != nil {
return nil, err
}
defer rows.Close()
return scanEntities(rows)
}
// SearchEntities performs FTS search on entities.
func (db *DB) SearchEntities(query, domain string) ([]Entity, error) {
where := []string{}
args := []any{}
if query != "" {
where = append(where, "e.id IN (SELECT id FROM entities_fts WHERE entities_fts MATCH ?)")
args = append(args, query)
}
if domain != "" {
where = append(where, "e.domain = ?")
args = append(args, domain)
}
q := "SELECT e.id, e.name, e.type_ref, e.status, e.description, e.domain, e.tags, e.source, e.metadata, e.notes, e.created_at, e.updated_at FROM entities e"
if len(where) > 0 {
q += " WHERE " + strings.Join(where, " AND ")
}
q += " ORDER BY e.name"
rows, err := db.conn.Query(q, args...)
if err != nil {
return nil, err
}
defer rows.Close()
return scanEntities(rows)
}
func scanEntities(rows *sql.Rows) ([]Entity, error) {
var result []Entity
for rows.Next() {
var e Entity
var tagsJSON, metadataJSON, createdAt, updatedAt string
if err := rows.Scan(&e.ID, &e.Name, &e.TypeRef, &e.Status, &e.Description, &e.Domain,
&tagsJSON, &e.Source, &metadataJSON, &e.Notes, &createdAt, &updatedAt); err != nil {
return nil, fmt.Errorf("scanning entity: %w", err)
}
e.Tags = unmarshalStrings(tagsJSON)
e.Metadata = unmarshalJSON(metadataJSON)
e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
e.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
result = append(result, e)
}
return result, nil
}
// --- Relation CRUD ---
// InsertRelation inserts or replaces a relation.
func (db *DB) InsertRelation(r *Relation) error {
now := time.Now().UTC()
if r.CreatedAt.IsZero() {
r.CreatedAt = now
}
r.UpdatedAt = now
var startedAt, endedAt *string
if r.StartedAt != nil {
s := r.StartedAt.Format(time.RFC3339)
startedAt = &s
}
if r.EndedAt != nil {
s := r.EndedAt.Format(time.RFC3339)
endedAt = &s
}
_, err := db.conn.Exec(`
INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
r.ID, r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
r.Purity, string(r.Direction), r.Weight, string(r.Status),
startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes,
r.CreatedAt.Format(time.RFC3339), r.UpdatedAt.Format(time.RFC3339),
)
return err
}
// GetRelation returns a relation by ID.
func (db *DB) GetRelation(id string) (*Relation, error) {
row := db.conn.QueryRow(`
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
FROM relations WHERE id = ?`, id)
return scanRelation(row)
}
// UpdateRelation updates an existing relation.
func (db *DB) UpdateRelation(r *Relation) error {
r.UpdatedAt = time.Now().UTC()
var startedAt, endedAt *string
if r.StartedAt != nil {
s := r.StartedAt.Format(time.RFC3339)
startedAt = &s
}
if r.EndedAt != nil {
s := r.EndedAt.Format(time.RFC3339)
endedAt = &s
}
_, err := db.conn.Exec(`
UPDATE relations SET name=?, from_entity=?, to_entity=?, via=?, description=?, purity=?, direction=?, weight=?, status=?, started_at=?, ended_at=?, "order"=?, tags=?, notes=?, updated_at=?
WHERE id=?`,
r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
r.Purity, string(r.Direction), r.Weight, string(r.Status),
startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes,
r.UpdatedAt.Format(time.RFC3339), r.ID,
)
return err
}
// DeleteRelation removes a relation by ID (cascades to relation_inputs).
func (db *DB) DeleteRelation(id string) error {
_, err := db.conn.Exec("DELETE FROM relations WHERE id = ?", id)
return err
}
// ListRelations returns all relations, optionally filtered by entity involvement.
func (db *DB) ListRelations(entityID string) ([]Relation, error) {
q := `SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at FROM relations`
var args []any
if entityID != "" {
q += " WHERE from_entity = ? OR to_entity = ?"
args = append(args, entityID, entityID)
}
q += " ORDER BY name"
rows, err := db.conn.Query(q, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var result []Relation
for rows.Next() {
r, err := scanRelationFromRows(rows)
if err != nil {
return nil, err
}
result = append(result, *r)
}
return result, nil
}
// GetRelationsFrom returns all relations where from_entity matches.
func (db *DB) GetRelationsFrom(entityID string) ([]Relation, error) {
rows, err := db.conn.Query(`
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
FROM relations WHERE from_entity = ? ORDER BY name`, entityID)
if err != nil {
return nil, err
}
defer rows.Close()
var result []Relation
for rows.Next() {
r, err := scanRelationFromRows(rows)
if err != nil {
return nil, err
}
result = append(result, *r)
}
return result, nil
}
// GetRelationsTo returns all relations where to_entity matches.
func (db *DB) GetRelationsTo(entityID string) ([]Relation, error) {
rows, err := db.conn.Query(`
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
FROM relations WHERE to_entity = ? ORDER BY name`, entityID)
if err != nil {
return nil, err
}
defer rows.Close()
var result []Relation
for rows.Next() {
r, err := scanRelationFromRows(rows)
if err != nil {
return nil, err
}
result = append(result, *r)
}
return result, nil
}
func scanRelation(row *sql.Row) (*Relation, error) {
var r Relation
var tagsJSON, createdAt, updatedAt string
var startedAt, endedAt *string
err := row.Scan(&r.ID, &r.Name, &r.FromEntity, &r.ToEntity, &r.Via, &r.Description,
&r.Purity, &r.Direction, &r.Weight, &r.Status,
&startedAt, &endedAt, &r.Order, &tagsJSON, &r.Notes, &createdAt, &updatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("scanning relation: %w", err)
}
r.Tags = unmarshalStrings(tagsJSON)
r.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
r.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
if startedAt != nil {
t, _ := time.Parse(time.RFC3339, *startedAt)
r.StartedAt = &t
}
if endedAt != nil {
t, _ := time.Parse(time.RFC3339, *endedAt)
r.EndedAt = &t
}
return &r, nil
}
func scanRelationFromRows(rows *sql.Rows) (*Relation, error) {
var r Relation
var tagsJSON, createdAt, updatedAt string
var startedAt, endedAt *string
err := rows.Scan(&r.ID, &r.Name, &r.FromEntity, &r.ToEntity, &r.Via, &r.Description,
&r.Purity, &r.Direction, &r.Weight, &r.Status,
&startedAt, &endedAt, &r.Order, &tagsJSON, &r.Notes, &createdAt, &updatedAt)
if err != nil {
return nil, fmt.Errorf("scanning relation: %w", err)
}
r.Tags = unmarshalStrings(tagsJSON)
r.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
r.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
if startedAt != nil {
t, _ := time.Parse(time.RFC3339, *startedAt)
r.StartedAt = &t
}
if endedAt != nil {
t, _ := time.Parse(time.RFC3339, *endedAt)
r.EndedAt = &t
}
return &r, nil
}
// --- RelationInput CRUD ---
// InsertRelationInput inserts a relation input.
func (db *DB) InsertRelationInput(ri *RelationInput) error {
_, err := db.conn.Exec(`
INSERT INTO relation_inputs (id, relation_id, entity_id, role, "order")
VALUES (?, ?, ?, ?, ?)`,
ri.ID, ri.RelationID, ri.EntityID, ri.Role, ri.Order,
)
return err
}
// GetRelationInputs returns all inputs for a relation.
func (db *DB) GetRelationInputs(relationID string) ([]RelationInput, error) {
rows, err := db.conn.Query(`
SELECT id, relation_id, entity_id, role, "order"
FROM relation_inputs WHERE relation_id = ? ORDER BY "order"`, relationID)
if err != nil {
return nil, err
}
defer rows.Close()
var result []RelationInput
for rows.Next() {
var ri RelationInput
if err := rows.Scan(&ri.ID, &ri.RelationID, &ri.EntityID, &ri.Role, &ri.Order); err != nil {
return nil, fmt.Errorf("scanning relation_input: %w", err)
}
result = append(result, ri)
}
return result, nil
}
// DeleteRelationInputs removes all inputs for a relation.
func (db *DB) DeleteRelationInputs(relationID string) error {
_, err := db.conn.Exec("DELETE FROM relation_inputs WHERE relation_id = ?", relationID)
return err
}
+181
View File
@@ -0,0 +1,181 @@
package fn_operations
import (
"fmt"
"strings"
)
// ValidationError represents one or more integrity violations.
type ValidationError struct {
ID string
Errors []string
}
func (v *ValidationError) Error() string {
return fmt.Sprintf("%s: %s", v.ID, strings.Join(v.Errors, "; "))
}
// ValidateEntity checks entity integrity rules.
func ValidateEntity(e *Entity) *ValidationError {
var errs []string
if e.ID == "" {
errs = append(errs, "id is required")
}
if e.Name == "" {
errs = append(errs, "name is required")
}
if e.TypeRef == "" {
errs = append(errs, "type_ref is required")
}
if e.Source == "" {
errs = append(errs, "source is required")
}
switch e.Status {
case StatusActive, StatusStale, StatusCorrupted, StatusArchived:
case "":
errs = append(errs, "status is required")
default:
errs = append(errs, fmt.Sprintf("invalid status: %s", e.Status))
}
if len(errs) > 0 {
return &ValidationError{ID: e.ID, Errors: errs}
}
return nil
}
// ValidateRelation checks relation integrity rules.
// knownEntities is a set of entity IDs that exist.
func ValidateRelation(r *Relation, knownEntities map[string]bool) *ValidationError {
var errs []string
if r.ID == "" {
errs = append(errs, "id is required")
}
if r.Name == "" {
errs = append(errs, "name is required")
}
if r.ToEntity == "" {
errs = append(errs, "to_entity is required")
}
// from_entity or relation_inputs — validated at operation level
if r.FromEntity != "" && r.ToEntity != "" && r.FromEntity == r.ToEntity {
errs = append(errs, "from_entity and to_entity cannot be the same")
}
if r.FromEntity != "" && !knownEntities[r.FromEntity] {
errs = append(errs, fmt.Sprintf("from_entity references unknown entity: %s", r.FromEntity))
}
if r.ToEntity != "" && !knownEntities[r.ToEntity] {
errs = append(errs, fmt.Sprintf("to_entity references unknown entity: %s", r.ToEntity))
}
if r.Weight != nil {
if *r.Weight < 0.0 || *r.Weight > 1.0 {
errs = append(errs, "weight must be between 0.0 and 1.0")
}
}
if r.StartedAt != nil && r.EndedAt != nil {
if r.StartedAt.After(*r.EndedAt) {
errs = append(errs, "started_at must be before ended_at")
}
}
switch r.Direction {
case DirUnidirectional, DirBidirectional, DirInverse, "":
default:
errs = append(errs, fmt.Sprintf("invalid direction: %s", r.Direction))
}
if len(errs) > 0 {
return &ValidationError{ID: r.ID, Errors: errs}
}
return nil
}
// ValidateRelationInputs checks relation_inputs integrity.
func ValidateRelationInputs(inputs []RelationInput, knownEntities map[string]bool) *ValidationError {
var errs []string
if len(inputs) < 2 {
errs = append(errs, "relation_inputs must have at least 2 entries")
}
for i, ri := range inputs {
if ri.RelationID == "" {
errs = append(errs, fmt.Sprintf("input[%d]: relation_id is required", i))
}
if ri.EntityID == "" {
errs = append(errs, fmt.Sprintf("input[%d]: entity_id is required", i))
}
if ri.Role == "" {
errs = append(errs, fmt.Sprintf("input[%d]: role is required", i))
}
if ri.EntityID != "" && !knownEntities[ri.EntityID] {
errs = append(errs, fmt.Sprintf("input[%d]: entity_id references unknown entity: %s", i, ri.EntityID))
}
}
if len(errs) > 0 {
id := "relation_inputs"
if len(inputs) > 0 {
id = inputs[0].RelationID
}
return &ValidationError{ID: id, Errors: errs}
}
return nil
}
// DetectCycle checks if adding a causal relation (from -> to) creates a cycle.
// Only considers relations where via != "" (causal/transformational).
// Semantic relations (via == "") are exempt from cycle detection.
func DetectCycle(db *DB, fromEntity, toEntity string) error {
if fromEntity == "" || toEntity == "" {
return nil
}
// BFS from toEntity following only causal relations.
// If we reach fromEntity, there's a cycle.
visited := map[string]bool{}
queue := []string{toEntity}
for len(queue) > 0 {
current := queue[0]
queue = queue[1:]
if visited[current] {
continue
}
visited[current] = true
if current == fromEntity {
return fmt.Errorf("cycle detected: adding relation %s -> %s would create a causal cycle", fromEntity, toEntity)
}
// Follow causal relations from current entity
rows, err := db.conn.Query(`
SELECT to_entity FROM relations
WHERE from_entity = ? AND via != ''`, current)
if err != nil {
return fmt.Errorf("querying relations for cycle detection: %w", err)
}
for rows.Next() {
var next string
if err := rows.Scan(&next); err != nil {
rows.Close()
return err
}
if !visited[next] {
queue = append(queue, next)
}
}
rows.Close()
}
return nil
}