9ba1f86c34
Añade Execution, Assertion, AssertionResult al paquete fn_operations. Motor de evaluación de assertions con reescritura SQL automática. Bucle reactivo: ExecuteAndReact evalúa assertions y cambia status de entities (corrupted/stale) + auto-crea proposals en registry. CLI fn ops: assertion (add/list/show/delete/eval) y execution (add/list/show). Migración 002_executions_assertions.sql con FTS para assertions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
850 lines
24 KiB
Go
850 lines
24 KiB
Go
package fn_operations
|
|
|
|
import (
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func tempDB(t *testing.T) *DB {
|
|
t.Helper()
|
|
path := filepath.Join(t.TempDir(), "test.db")
|
|
db, err := Open(path)
|
|
if err != nil {
|
|
t.Fatalf("opening test db: %v", err)
|
|
}
|
|
t.Cleanup(func() { db.Close() })
|
|
return db
|
|
}
|
|
|
|
func TestOpenAndClose(t *testing.T) {
|
|
path := filepath.Join(t.TempDir(), "test.db")
|
|
db, err := Open(path)
|
|
if err != nil {
|
|
t.Fatalf("open: %v", err)
|
|
}
|
|
if err := db.Close(); err != nil {
|
|
t.Fatalf("close: %v", err)
|
|
}
|
|
if _, err := os.Stat(path); os.IsNotExist(err) {
|
|
t.Fatal("db file should exist")
|
|
}
|
|
}
|
|
|
|
func TestTypeSnapshotCRUD(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
ts := &TypeSnapshot{
|
|
ID: "ohlcv_go_finance",
|
|
Version: "1.0.0",
|
|
Lang: "go",
|
|
Algebraic: "product",
|
|
Definition: "type OHLCV struct { ... }",
|
|
Description: "Vela de mercado",
|
|
}
|
|
|
|
if err := db.InsertTypeSnapshot(ts); err != nil {
|
|
t.Fatalf("insert: %v", err)
|
|
}
|
|
|
|
got, err := db.GetTypeSnapshot("ohlcv_go_finance")
|
|
if err != nil {
|
|
t.Fatalf("get: %v", err)
|
|
}
|
|
if got == nil {
|
|
t.Fatal("expected snapshot, got nil")
|
|
}
|
|
if got.Definition != ts.Definition {
|
|
t.Errorf("definition = %q, want %q", got.Definition, ts.Definition)
|
|
}
|
|
|
|
// INSERT OR IGNORE: second insert should not overwrite
|
|
ts2 := &TypeSnapshot{
|
|
ID: "ohlcv_go_finance",
|
|
Version: "2.0.0",
|
|
Lang: "go",
|
|
Algebraic: "product",
|
|
Definition: "type OHLCV struct { CHANGED }",
|
|
Description: "Changed",
|
|
}
|
|
if err := db.InsertTypeSnapshot(ts2); err != nil {
|
|
t.Fatalf("insert duplicate: %v", err)
|
|
}
|
|
got2, _ := db.GetTypeSnapshot("ohlcv_go_finance")
|
|
if got2.Version != "1.0.0" {
|
|
t.Errorf("snapshot should be immutable, got version %q", got2.Version)
|
|
}
|
|
|
|
// Not found
|
|
missing, err := db.GetTypeSnapshot("nonexistent")
|
|
if err != nil {
|
|
t.Fatalf("get missing: %v", err)
|
|
}
|
|
if missing != nil {
|
|
t.Error("expected nil for missing snapshot")
|
|
}
|
|
|
|
all, err := db.ListTypeSnapshots()
|
|
if err != nil {
|
|
t.Fatalf("list: %v", err)
|
|
}
|
|
if len(all) != 1 {
|
|
t.Errorf("expected 1 snapshot, got %d", len(all))
|
|
}
|
|
}
|
|
|
|
func TestEntityCRUD(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
// Insert snapshot first (type_ref)
|
|
db.InsertTypeSnapshot(&TypeSnapshot{
|
|
ID: "tick_go_finance", Version: "1.0.0", Lang: "go", Algebraic: "product",
|
|
})
|
|
|
|
e := &Entity{
|
|
ID: "ticks_btcusdt_2024",
|
|
Name: "ticks_btcusdt_2024",
|
|
TypeRef: "tick_go_finance",
|
|
Status: StatusActive,
|
|
Source: "binance_api",
|
|
Domain: "market_data",
|
|
Tags: []string{"btc", "binance"},
|
|
Metadata: map[string]any{
|
|
"pair": "BTCUSDT",
|
|
"exchange": "binance",
|
|
},
|
|
}
|
|
|
|
if err := db.InsertEntity(e); err != nil {
|
|
t.Fatalf("insert: %v", err)
|
|
}
|
|
|
|
got, err := db.GetEntity("ticks_btcusdt_2024")
|
|
if err != nil {
|
|
t.Fatalf("get: %v", err)
|
|
}
|
|
if got == nil {
|
|
t.Fatal("expected entity, got nil")
|
|
}
|
|
if got.Source != "binance_api" {
|
|
t.Errorf("source = %q, want binance_api", got.Source)
|
|
}
|
|
if len(got.Tags) != 2 {
|
|
t.Errorf("tags len = %d, want 2", len(got.Tags))
|
|
}
|
|
if got.Metadata["pair"] != "BTCUSDT" {
|
|
t.Errorf("metadata pair = %v, want BTCUSDT", got.Metadata["pair"])
|
|
}
|
|
|
|
// Update
|
|
got.Status = StatusStale
|
|
if err := db.UpdateEntity(got); err != nil {
|
|
t.Fatalf("update: %v", err)
|
|
}
|
|
updated, _ := db.GetEntity("ticks_btcusdt_2024")
|
|
if updated.Status != StatusStale {
|
|
t.Errorf("status = %q, want stale", updated.Status)
|
|
}
|
|
|
|
// List
|
|
all, err := db.ListEntities("market_data", "")
|
|
if err != nil {
|
|
t.Fatalf("list: %v", err)
|
|
}
|
|
if len(all) != 1 {
|
|
t.Errorf("expected 1, got %d", len(all))
|
|
}
|
|
|
|
// Search
|
|
found, err := db.SearchEntities("btcusdt", "")
|
|
if err != nil {
|
|
t.Fatalf("search: %v", err)
|
|
}
|
|
if len(found) != 1 {
|
|
t.Errorf("search expected 1, got %d", len(found))
|
|
}
|
|
|
|
// Delete
|
|
if err := db.DeleteEntity("ticks_btcusdt_2024"); err != nil {
|
|
t.Fatalf("delete: %v", err)
|
|
}
|
|
deleted, _ := db.GetEntity("ticks_btcusdt_2024")
|
|
if deleted != nil {
|
|
t.Error("expected nil after delete")
|
|
}
|
|
}
|
|
|
|
func TestRelationCRUD(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
// Setup entities
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
|
|
r := &Relation{
|
|
ID: "a__to__b__via__transform",
|
|
Name: "TRANSFORMA",
|
|
FromEntity: "a",
|
|
ToEntity: "b",
|
|
Direction: DirUnidirectional,
|
|
Status: RelDesigned,
|
|
}
|
|
|
|
if err := InsertRelationSafe(db, r); err != nil {
|
|
t.Fatalf("insert relation: %v", err)
|
|
}
|
|
|
|
got, err := db.GetRelation("a__to__b__via__transform")
|
|
if err != nil {
|
|
t.Fatalf("get: %v", err)
|
|
}
|
|
if got == nil {
|
|
t.Fatal("expected relation, got nil")
|
|
}
|
|
if got.Name != "TRANSFORMA" {
|
|
t.Errorf("name = %q, want TRANSFORMA", got.Name)
|
|
}
|
|
|
|
// List by entity
|
|
rels, err := db.ListRelations("a")
|
|
if err != nil {
|
|
t.Fatalf("list: %v", err)
|
|
}
|
|
if len(rels) != 1 {
|
|
t.Errorf("expected 1, got %d", len(rels))
|
|
}
|
|
|
|
// Delete
|
|
if err := db.DeleteRelation("a__to__b__via__transform"); err != nil {
|
|
t.Fatalf("delete: %v", err)
|
|
}
|
|
deleted, _ := db.GetRelation("a__to__b__via__transform")
|
|
if deleted != nil {
|
|
t.Error("expected nil after delete")
|
|
}
|
|
}
|
|
|
|
func TestRelationInputs(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
db.InsertEntity(&Entity{ID: "c", Name: "c", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
|
|
r := &Relation{
|
|
ID: "multi__to__c",
|
|
Name: "ENRIQUECE",
|
|
ToEntity: "c",
|
|
Direction: DirUnidirectional,
|
|
Status: RelDesigned,
|
|
}
|
|
|
|
inputs := []RelationInput{
|
|
{ID: "i1", RelationID: "multi__to__c", EntityID: "a", Role: "base"},
|
|
{ID: "i2", RelationID: "multi__to__c", EntityID: "b", Role: "lookup"},
|
|
}
|
|
|
|
if err := InsertRelationWithInputs(db, r, inputs); err != nil {
|
|
t.Fatalf("insert with inputs: %v", err)
|
|
}
|
|
|
|
got, err := db.GetRelationInputs("multi__to__c")
|
|
if err != nil {
|
|
t.Fatalf("get inputs: %v", err)
|
|
}
|
|
if len(got) != 2 {
|
|
t.Errorf("expected 2 inputs, got %d", len(got))
|
|
}
|
|
}
|
|
|
|
func TestCycleDetectionCausal(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
db.InsertEntity(&Entity{ID: "c", Name: "c", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
|
|
// a -> b (causal)
|
|
InsertRelationSafe(db, &Relation{
|
|
ID: "ab", Name: "T1", FromEntity: "a", ToEntity: "b", Via: "fn1",
|
|
Purity: "impure", Direction: DirUnidirectional, Status: RelDesigned,
|
|
})
|
|
|
|
// b -> c (causal)
|
|
InsertRelationSafe(db, &Relation{
|
|
ID: "bc", Name: "T2", FromEntity: "b", ToEntity: "c", Via: "fn2",
|
|
Purity: "impure", Direction: DirUnidirectional, Status: RelDesigned,
|
|
})
|
|
|
|
// c -> a (causal) should fail — creates cycle
|
|
err := InsertRelationSafe(db, &Relation{
|
|
ID: "ca", Name: "T3", FromEntity: "c", ToEntity: "a", Via: "fn3",
|
|
Purity: "impure", Direction: DirUnidirectional, Status: RelDesigned,
|
|
})
|
|
if err == nil {
|
|
t.Fatal("expected cycle error, got nil")
|
|
}
|
|
}
|
|
|
|
func TestCycleDetectionSemanticAllowed(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{ID: "juan", Name: "juan", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
db.InsertEntity(&Entity{ID: "paula", Name: "paula", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
|
|
// juan -> paula (semantic, no via)
|
|
if err := InsertRelationSafe(db, &Relation{
|
|
ID: "jp", Name: "CONOCE A", FromEntity: "juan", ToEntity: "paula",
|
|
Direction: DirBidirectional, Status: RelRunning,
|
|
}); err != nil {
|
|
t.Fatalf("insert semantic: %v", err)
|
|
}
|
|
|
|
// paula -> juan (semantic, no via) — should succeed, no cycle check
|
|
if err := InsertRelationSafe(db, &Relation{
|
|
ID: "pj", Name: "CONOCE A", FromEntity: "paula", ToEntity: "juan",
|
|
Direction: DirBidirectional, Status: RelRunning,
|
|
}); err != nil {
|
|
t.Fatalf("semantic cycle should be allowed: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestValidateEntity(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
entity Entity
|
|
wantErr bool
|
|
}{
|
|
{
|
|
name: "valid",
|
|
entity: Entity{ID: "x", Name: "x", TypeRef: "t1", Status: StatusActive, Source: "test"},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "missing name",
|
|
entity: Entity{ID: "x", TypeRef: "t1", Status: StatusActive, Source: "test"},
|
|
wantErr: true,
|
|
},
|
|
{
|
|
name: "missing source",
|
|
entity: Entity{ID: "x", Name: "x", TypeRef: "t1", Status: StatusActive},
|
|
wantErr: true,
|
|
},
|
|
{
|
|
name: "missing type_ref",
|
|
entity: Entity{ID: "x", Name: "x", Status: StatusActive, Source: "test"},
|
|
wantErr: true,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
err := ValidateEntity(&tt.entity)
|
|
if (err != nil) != tt.wantErr {
|
|
t.Errorf("ValidateEntity() error = %v, wantErr %v", err, tt.wantErr)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestExecutionCRUD(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
started := time.Date(2026, 3, 28, 10, 0, 0, 0, time.UTC)
|
|
ended := time.Date(2026, 3, 28, 10, 5, 0, 0, time.UTC)
|
|
rin := int64(1000)
|
|
rout := int64(60)
|
|
|
|
e := &Execution{
|
|
ID: "exec_1",
|
|
PipelineID: "tick_to_ohlcv_go_finance",
|
|
RelationID: "rel_1",
|
|
Status: ExecSuccess,
|
|
StartedAt: started,
|
|
EndedAt: &ended,
|
|
RecordsIn: &rin,
|
|
RecordsOut: &rout,
|
|
Metrics: map[string]any{"mean_close": 42000},
|
|
}
|
|
|
|
if err := InsertExecutionSafe(db, e); err != nil {
|
|
t.Fatalf("insert: %v", err)
|
|
}
|
|
|
|
// Auto-calculated duration
|
|
if e.DurationMs == nil || *e.DurationMs != 300000 {
|
|
t.Errorf("duration_ms = %v, want 300000", e.DurationMs)
|
|
}
|
|
|
|
got, err := db.GetExecution("exec_1")
|
|
if err != nil {
|
|
t.Fatalf("get: %v", err)
|
|
}
|
|
if got == nil {
|
|
t.Fatal("expected execution, got nil")
|
|
}
|
|
if got.PipelineID != "tick_to_ohlcv_go_finance" {
|
|
t.Errorf("pipeline_id = %q", got.PipelineID)
|
|
}
|
|
if got.Metrics["mean_close"] != float64(42000) {
|
|
t.Errorf("metrics mean_close = %v", got.Metrics["mean_close"])
|
|
}
|
|
|
|
// List
|
|
all, err := db.ListExecutions("", "", "")
|
|
if err != nil {
|
|
t.Fatalf("list: %v", err)
|
|
}
|
|
if len(all) != 1 {
|
|
t.Errorf("list = %d, want 1", len(all))
|
|
}
|
|
|
|
byPipeline, _ := db.ListExecutions("tick_to_ohlcv_go_finance", "", "")
|
|
if len(byPipeline) != 1 {
|
|
t.Errorf("list by pipeline = %d, want 1", len(byPipeline))
|
|
}
|
|
|
|
byStatus, _ := db.ListExecutions("", "", ExecFailure)
|
|
if len(byStatus) != 0 {
|
|
t.Errorf("list by failure status = %d, want 0", len(byStatus))
|
|
}
|
|
}
|
|
|
|
func TestAssertionCRUD(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
|
|
Metadata: map[string]any{"close": 120, "high": 150, "low": 90}})
|
|
|
|
a := &Assertion{
|
|
ID: "assert_close_positive",
|
|
EntityID: "e1",
|
|
Name: "close positivo",
|
|
Kind: "range",
|
|
Rule: "close > 0",
|
|
Severity: SeverityCritical,
|
|
Active: true,
|
|
}
|
|
|
|
if err := InsertAssertionSafe(db, a); err != nil {
|
|
t.Fatalf("insert: %v", err)
|
|
}
|
|
|
|
got, err := db.GetAssertion("assert_close_positive")
|
|
if err != nil {
|
|
t.Fatalf("get: %v", err)
|
|
}
|
|
if got == nil {
|
|
t.Fatal("expected assertion, got nil")
|
|
}
|
|
if got.Rule != "close > 0" {
|
|
t.Errorf("rule = %q", got.Rule)
|
|
}
|
|
if !got.Active {
|
|
t.Error("expected active = true")
|
|
}
|
|
|
|
// Update
|
|
got.Active = false
|
|
if err := db.UpdateAssertion(got); err != nil {
|
|
t.Fatalf("update: %v", err)
|
|
}
|
|
updated, _ := db.GetAssertion("assert_close_positive")
|
|
if updated.Active {
|
|
t.Error("expected active = false after update")
|
|
}
|
|
|
|
// List with active filter
|
|
active := true
|
|
byActive, _ := db.ListAssertions("e1", &active)
|
|
if len(byActive) != 0 {
|
|
t.Errorf("list active = %d, want 0 (we deactivated it)", len(byActive))
|
|
}
|
|
|
|
// Search FTS
|
|
updated.Active = true
|
|
db.UpdateAssertion(updated)
|
|
found, err := db.SearchAssertions("close", "")
|
|
if err != nil {
|
|
t.Fatalf("search: %v", err)
|
|
}
|
|
if len(found) != 1 {
|
|
t.Errorf("search 'close' = %d, want 1", len(found))
|
|
}
|
|
|
|
// Delete
|
|
if err := db.DeleteAssertion("assert_close_positive"); err != nil {
|
|
t.Fatalf("delete: %v", err)
|
|
}
|
|
deleted, _ := db.GetAssertion("assert_close_positive")
|
|
if deleted != nil {
|
|
t.Error("expected nil after delete")
|
|
}
|
|
}
|
|
|
|
func TestAssertionResultCRUD(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
db.InsertAssertion(&Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning, Active: true})
|
|
|
|
ar := &AssertionResult{
|
|
ID: "ar_1",
|
|
AssertionID: "a1",
|
|
ExecutionID: "exec_1",
|
|
Status: ResultFail,
|
|
Value: map[string]any{"x": -5},
|
|
Message: "rule failed: x > 0",
|
|
EvaluatedAt: time.Now().UTC(),
|
|
}
|
|
|
|
if err := db.InsertAssertionResult(ar); err != nil {
|
|
t.Fatalf("insert: %v", err)
|
|
}
|
|
|
|
got, err := db.GetAssertionResult("ar_1")
|
|
if err != nil {
|
|
t.Fatalf("get: %v", err)
|
|
}
|
|
if got == nil {
|
|
t.Fatal("expected result, got nil")
|
|
}
|
|
if got.Status != ResultFail {
|
|
t.Errorf("status = %q, want fail", got.Status)
|
|
}
|
|
|
|
// List by assertion
|
|
byAssertion, _ := db.ListAssertionResults("a1", "")
|
|
if len(byAssertion) != 1 {
|
|
t.Errorf("list by assertion = %d, want 1", len(byAssertion))
|
|
}
|
|
|
|
// List by execution
|
|
byExec, _ := db.ListAssertionResults("", "exec_1")
|
|
if len(byExec) != 1 {
|
|
t.Errorf("list by execution = %d, want 1", len(byExec))
|
|
}
|
|
}
|
|
|
|
func TestEvalAssertion(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{
|
|
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
|
|
Metadata: map[string]any{"close": 120, "high": 150, "low": 90, "open": 100},
|
|
})
|
|
|
|
tests := []struct {
|
|
name string
|
|
rule string
|
|
kind string
|
|
wantStatus AssertionResultStatus
|
|
}{
|
|
{"range pass", "close > 0 AND close < 1000000", "range", ResultPass},
|
|
{"range fail", "close > 200", "range", ResultFail},
|
|
{"consistency pass", "low <= close AND close <= high", "consistency", ResultPass},
|
|
{"consistency fail", "low > close", "consistency", ResultFail},
|
|
{"null pass (field exists)", "close IS NOT NULL", "null", ResultPass},
|
|
{"json_extract direct", "json_extract(metadata, '$.close') > 0", "range", ResultPass},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
a := &Assertion{
|
|
ID: "test_" + tt.name, EntityID: "e1", Name: tt.name,
|
|
Kind: tt.kind, Rule: tt.rule, Severity: SeverityWarning, Active: true,
|
|
}
|
|
result, err := EvalAssertion(db, a, "")
|
|
if err != nil {
|
|
t.Fatalf("eval: %v", err)
|
|
}
|
|
if result.Status != tt.wantStatus {
|
|
t.Errorf("status = %q, want %q (message: %s)", result.Status, tt.wantStatus, result.Message)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEvalEntityAssertions(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{
|
|
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
|
|
Metadata: map[string]any{"close": 120},
|
|
})
|
|
|
|
db.InsertAssertion(&Assertion{ID: "a1", EntityID: "e1", Name: "pass", Kind: "range", Rule: "close > 0", Severity: SeverityWarning, Active: true})
|
|
db.InsertAssertion(&Assertion{ID: "a2", EntityID: "e1", Name: "fail", Kind: "range", Rule: "close > 200", Severity: SeverityCritical, Active: true})
|
|
db.InsertAssertion(&Assertion{ID: "a3", EntityID: "e1", Name: "inactive", Kind: "range", Rule: "close > 999", Severity: SeverityInfo, Active: false})
|
|
|
|
results, err := EvalEntityAssertions(db, "e1", "exec_test")
|
|
if err != nil {
|
|
t.Fatalf("eval: %v", err)
|
|
}
|
|
|
|
// Should only eval active assertions (a1, a2), not a3
|
|
if len(results) != 2 {
|
|
t.Fatalf("expected 2 results, got %d", len(results))
|
|
}
|
|
|
|
pass, fail := 0, 0
|
|
for _, r := range results {
|
|
switch r.Status {
|
|
case ResultPass:
|
|
pass++
|
|
case ResultFail:
|
|
fail++
|
|
}
|
|
if r.ExecutionID != "exec_test" {
|
|
t.Errorf("execution_id = %q, want exec_test", r.ExecutionID)
|
|
}
|
|
}
|
|
if pass != 1 || fail != 1 {
|
|
t.Errorf("pass=%d fail=%d, want 1 and 1", pass, fail)
|
|
}
|
|
|
|
// Results should be persisted
|
|
stored, _ := db.ListAssertionResults("", "exec_test")
|
|
if len(stored) != 2 {
|
|
t.Errorf("stored results = %d, want 2", len(stored))
|
|
}
|
|
}
|
|
|
|
func TestReactCriticalCorruptsEntity(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{
|
|
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
|
|
Metadata: map[string]any{"close": 120},
|
|
})
|
|
db.InsertAssertion(&Assertion{ID: "a_crit", EntityID: "e1", Name: "critical_fail", Kind: "range", Rule: "close > 200", Severity: SeverityCritical, Active: true})
|
|
|
|
// Simulate failed assertion result
|
|
results := []AssertionResult{
|
|
{ID: "ar1", AssertionID: "a_crit", Status: ResultFail, Message: "close > 200 failed", EvaluatedAt: time.Now().UTC()},
|
|
}
|
|
|
|
rr, err := React(db, nil, results)
|
|
if err != nil {
|
|
t.Fatalf("react: %v", err)
|
|
}
|
|
|
|
if len(rr.EntityUpdates) != 1 {
|
|
t.Fatalf("expected 1 entity update, got %d", len(rr.EntityUpdates))
|
|
}
|
|
if rr.EntityUpdates[0].NewStatus != StatusCorrupted {
|
|
t.Errorf("new status = %q, want corrupted", rr.EntityUpdates[0].NewStatus)
|
|
}
|
|
|
|
// Verify entity in DB
|
|
entity, _ := db.GetEntity("e1")
|
|
if entity.Status != StatusCorrupted {
|
|
t.Errorf("entity status = %q, want corrupted", entity.Status)
|
|
}
|
|
}
|
|
|
|
func TestReactWarningStalesEntity(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{
|
|
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
|
|
Metadata: map[string]any{"close": 120},
|
|
})
|
|
db.InsertAssertion(&Assertion{ID: "a_warn", EntityID: "e1", Name: "warn_fail", Kind: "range", Rule: "close > 200", Severity: SeverityWarning, Active: true})
|
|
|
|
results := []AssertionResult{
|
|
{ID: "ar1", AssertionID: "a_warn", Status: ResultFail, Message: "warning", EvaluatedAt: time.Now().UTC()},
|
|
}
|
|
|
|
rr, err := React(db, nil, results)
|
|
if err != nil {
|
|
t.Fatalf("react: %v", err)
|
|
}
|
|
|
|
if len(rr.EntityUpdates) != 1 {
|
|
t.Fatalf("expected 1 entity update, got %d", len(rr.EntityUpdates))
|
|
}
|
|
if rr.EntityUpdates[0].NewStatus != StatusStale {
|
|
t.Errorf("new status = %q, want stale", rr.EntityUpdates[0].NewStatus)
|
|
}
|
|
}
|
|
|
|
func TestReactWarningDoesNotDowngradeCorrupted(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{
|
|
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusCorrupted, Source: "test",
|
|
})
|
|
db.InsertAssertion(&Assertion{ID: "a_warn", EntityID: "e1", Name: "warn", Kind: "range", Rule: "x > 0", Severity: SeverityWarning, Active: true})
|
|
|
|
results := []AssertionResult{
|
|
{ID: "ar1", AssertionID: "a_warn", Status: ResultFail, EvaluatedAt: time.Now().UTC()},
|
|
}
|
|
|
|
rr, err := React(db, nil, results)
|
|
if err != nil {
|
|
t.Fatalf("react: %v", err)
|
|
}
|
|
|
|
// Warning should NOT change corrupted entity to stale
|
|
if len(rr.EntityUpdates) != 0 {
|
|
t.Errorf("expected no entity updates, got %d", len(rr.EntityUpdates))
|
|
}
|
|
}
|
|
|
|
func TestReactInfoNoStatusChange(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{
|
|
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
|
|
})
|
|
db.InsertAssertion(&Assertion{ID: "a_info", EntityID: "e1", Name: "info", Kind: "range", Rule: "x > 0", Severity: SeverityInfo, Active: true})
|
|
|
|
results := []AssertionResult{
|
|
{ID: "ar1", AssertionID: "a_info", Status: ResultFail, EvaluatedAt: time.Now().UTC()},
|
|
}
|
|
|
|
rr, err := React(db, nil, results)
|
|
if err != nil {
|
|
t.Fatalf("react: %v", err)
|
|
}
|
|
|
|
if len(rr.EntityUpdates) != 0 {
|
|
t.Errorf("info fail should not change status, got %d updates", len(rr.EntityUpdates))
|
|
}
|
|
}
|
|
|
|
func TestValidateExecution(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
e Execution
|
|
wantErr bool
|
|
}{
|
|
{"valid", Execution{ID: "e1", PipelineID: "p1", Status: ExecSuccess, StartedAt: time.Now()}, false},
|
|
{"missing id", Execution{PipelineID: "p1", Status: ExecSuccess, StartedAt: time.Now()}, true},
|
|
{"missing pipeline", Execution{ID: "e1", Status: ExecSuccess, StartedAt: time.Now()}, true},
|
|
{"missing status", Execution{ID: "e1", PipelineID: "p1", StartedAt: time.Now()}, true},
|
|
{"invalid status", Execution{ID: "e1", PipelineID: "p1", Status: "invalid", StartedAt: time.Now()}, true},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
err := ValidateExecution(&tt.e)
|
|
if (err != nil) != tt.wantErr {
|
|
t.Errorf("error = %v, wantErr %v", err, tt.wantErr)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidateAssertion(t *testing.T) {
|
|
known := map[string]bool{"e1": true}
|
|
tests := []struct {
|
|
name string
|
|
a Assertion
|
|
wantErr bool
|
|
}{
|
|
{"valid", Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning}, false},
|
|
{"missing entity", Assertion{ID: "a1", EntityID: "unknown", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning}, true},
|
|
{"missing rule", Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Severity: SeverityWarning}, true},
|
|
{"missing kind", Assertion{ID: "a1", EntityID: "e1", Name: "test", Rule: "x > 0", Severity: SeverityWarning}, true},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
err := ValidateAssertion(&tt.a, known)
|
|
if (err != nil) != tt.wantErr {
|
|
t.Errorf("error = %v, wantErr %v", err, tt.wantErr)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestMigrations(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
var count int
|
|
err := db.conn.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("query: %v", err)
|
|
}
|
|
if count < 2 {
|
|
t.Errorf("expected at least 2 migrations, got %d", count)
|
|
}
|
|
|
|
// Verify all new tables exist
|
|
for _, table := range []string{"executions", "assertions", "assertion_results"} {
|
|
_, err := db.conn.Exec("SELECT 1 FROM " + table + " LIMIT 1")
|
|
if err != nil {
|
|
t.Errorf("table %s should exist: %v", table, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestRecordExecutionWithResults(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
db.InsertAssertion(&Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning, Active: true})
|
|
|
|
started := time.Now().UTC()
|
|
ended := started.Add(5 * time.Second)
|
|
e := &Execution{
|
|
ID: "exec_tx", PipelineID: "p1", Status: ExecSuccess,
|
|
StartedAt: started, EndedAt: &ended,
|
|
}
|
|
results := []AssertionResult{
|
|
{ID: "ar_tx_1", AssertionID: "a1", ExecutionID: "exec_tx", Status: ResultPass, EvaluatedAt: time.Now().UTC()},
|
|
}
|
|
|
|
if err := RecordExecutionWithResults(db, e, results); err != nil {
|
|
t.Fatalf("record: %v", err)
|
|
}
|
|
|
|
// Verify both were persisted
|
|
gotExec, _ := db.GetExecution("exec_tx")
|
|
if gotExec == nil {
|
|
t.Fatal("execution not found")
|
|
}
|
|
|
|
gotResult, _ := db.GetAssertionResult("ar_tx_1")
|
|
if gotResult == nil {
|
|
t.Fatal("assertion result not found")
|
|
}
|
|
}
|
|
|
|
func TestGetEntityGraph(t *testing.T) {
|
|
db := tempDB(t)
|
|
|
|
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
|
|
db.InsertEntity(&Entity{ID: "a", Name: "a", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
db.InsertEntity(&Entity{ID: "b", Name: "b", TypeRef: "t1", Status: StatusActive, Source: "test"})
|
|
InsertRelationSafe(db, &Relation{
|
|
ID: "ab", Name: "FLUYE", FromEntity: "a", ToEntity: "b",
|
|
Direction: DirUnidirectional, Status: RelDesigned,
|
|
})
|
|
|
|
g, err := GetEntityGraph(db)
|
|
if err != nil {
|
|
t.Fatalf("graph: %v", err)
|
|
}
|
|
if len(g.Entities) != 2 {
|
|
t.Errorf("entities = %d, want 2", len(g.Entities))
|
|
}
|
|
if len(g.Relations) != 1 {
|
|
t.Errorf("relations = %d, want 1", len(g.Relations))
|
|
}
|
|
}
|