feat: executions, assertions y bucle reactivo en fn_operations

Añade Execution, Assertion, AssertionResult al paquete fn_operations.
Motor de evaluación de assertions con reescritura SQL automática.
Bucle reactivo: ExecuteAndReact evalúa assertions y cambia status de
entities (corrupted/stale) + auto-crea proposals en registry.
CLI fn ops: assertion (add/list/show/delete/eval) y execution (add/list/show).
Migración 002_executions_assertions.sql con FTS para assertions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-28 17:13:37 +01:00
parent 8d98faccd9
commit 9ba1f86c34
11 changed files with 2230 additions and 87 deletions
+474
View File
@@ -4,6 +4,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
)
func tempDB(t *testing.T) *DB {
@@ -351,6 +352,479 @@ func TestValidateEntity(t *testing.T) {
}
}
func TestExecutionCRUD(t *testing.T) {
db := tempDB(t)
started := time.Date(2026, 3, 28, 10, 0, 0, 0, time.UTC)
ended := time.Date(2026, 3, 28, 10, 5, 0, 0, time.UTC)
rin := int64(1000)
rout := int64(60)
e := &Execution{
ID: "exec_1",
PipelineID: "tick_to_ohlcv_go_finance",
RelationID: "rel_1",
Status: ExecSuccess,
StartedAt: started,
EndedAt: &ended,
RecordsIn: &rin,
RecordsOut: &rout,
Metrics: map[string]any{"mean_close": 42000},
}
if err := InsertExecutionSafe(db, e); err != nil {
t.Fatalf("insert: %v", err)
}
// Auto-calculated duration
if e.DurationMs == nil || *e.DurationMs != 300000 {
t.Errorf("duration_ms = %v, want 300000", e.DurationMs)
}
got, err := db.GetExecution("exec_1")
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected execution, got nil")
}
if got.PipelineID != "tick_to_ohlcv_go_finance" {
t.Errorf("pipeline_id = %q", got.PipelineID)
}
if got.Metrics["mean_close"] != float64(42000) {
t.Errorf("metrics mean_close = %v", got.Metrics["mean_close"])
}
// List
all, err := db.ListExecutions("", "", "")
if err != nil {
t.Fatalf("list: %v", err)
}
if len(all) != 1 {
t.Errorf("list = %d, want 1", len(all))
}
byPipeline, _ := db.ListExecutions("tick_to_ohlcv_go_finance", "", "")
if len(byPipeline) != 1 {
t.Errorf("list by pipeline = %d, want 1", len(byPipeline))
}
byStatus, _ := db.ListExecutions("", "", ExecFailure)
if len(byStatus) != 0 {
t.Errorf("list by failure status = %d, want 0", len(byStatus))
}
}
func TestAssertionCRUD(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
Metadata: map[string]any{"close": 120, "high": 150, "low": 90}})
a := &Assertion{
ID: "assert_close_positive",
EntityID: "e1",
Name: "close positivo",
Kind: "range",
Rule: "close > 0",
Severity: SeverityCritical,
Active: true,
}
if err := InsertAssertionSafe(db, a); err != nil {
t.Fatalf("insert: %v", err)
}
got, err := db.GetAssertion("assert_close_positive")
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected assertion, got nil")
}
if got.Rule != "close > 0" {
t.Errorf("rule = %q", got.Rule)
}
if !got.Active {
t.Error("expected active = true")
}
// Update
got.Active = false
if err := db.UpdateAssertion(got); err != nil {
t.Fatalf("update: %v", err)
}
updated, _ := db.GetAssertion("assert_close_positive")
if updated.Active {
t.Error("expected active = false after update")
}
// List with active filter
active := true
byActive, _ := db.ListAssertions("e1", &active)
if len(byActive) != 0 {
t.Errorf("list active = %d, want 0 (we deactivated it)", len(byActive))
}
// Search FTS
updated.Active = true
db.UpdateAssertion(updated)
found, err := db.SearchAssertions("close", "")
if err != nil {
t.Fatalf("search: %v", err)
}
if len(found) != 1 {
t.Errorf("search 'close' = %d, want 1", len(found))
}
// Delete
if err := db.DeleteAssertion("assert_close_positive"); err != nil {
t.Fatalf("delete: %v", err)
}
deleted, _ := db.GetAssertion("assert_close_positive")
if deleted != nil {
t.Error("expected nil after delete")
}
}
func TestAssertionResultCRUD(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test"})
db.InsertAssertion(&Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning, Active: true})
ar := &AssertionResult{
ID: "ar_1",
AssertionID: "a1",
ExecutionID: "exec_1",
Status: ResultFail,
Value: map[string]any{"x": -5},
Message: "rule failed: x > 0",
EvaluatedAt: time.Now().UTC(),
}
if err := db.InsertAssertionResult(ar); err != nil {
t.Fatalf("insert: %v", err)
}
got, err := db.GetAssertionResult("ar_1")
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected result, got nil")
}
if got.Status != ResultFail {
t.Errorf("status = %q, want fail", got.Status)
}
// List by assertion
byAssertion, _ := db.ListAssertionResults("a1", "")
if len(byAssertion) != 1 {
t.Errorf("list by assertion = %d, want 1", len(byAssertion))
}
// List by execution
byExec, _ := db.ListAssertionResults("", "exec_1")
if len(byExec) != 1 {
t.Errorf("list by execution = %d, want 1", len(byExec))
}
}
func TestEvalAssertion(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
Metadata: map[string]any{"close": 120, "high": 150, "low": 90, "open": 100},
})
tests := []struct {
name string
rule string
kind string
wantStatus AssertionResultStatus
}{
{"range pass", "close > 0 AND close < 1000000", "range", ResultPass},
{"range fail", "close > 200", "range", ResultFail},
{"consistency pass", "low <= close AND close <= high", "consistency", ResultPass},
{"consistency fail", "low > close", "consistency", ResultFail},
{"null pass (field exists)", "close IS NOT NULL", "null", ResultPass},
{"json_extract direct", "json_extract(metadata, '$.close') > 0", "range", ResultPass},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := &Assertion{
ID: "test_" + tt.name, EntityID: "e1", Name: tt.name,
Kind: tt.kind, Rule: tt.rule, Severity: SeverityWarning, Active: true,
}
result, err := EvalAssertion(db, a, "")
if err != nil {
t.Fatalf("eval: %v", err)
}
if result.Status != tt.wantStatus {
t.Errorf("status = %q, want %q (message: %s)", result.Status, tt.wantStatus, result.Message)
}
})
}
}
func TestEvalEntityAssertions(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
Metadata: map[string]any{"close": 120},
})
db.InsertAssertion(&Assertion{ID: "a1", EntityID: "e1", Name: "pass", Kind: "range", Rule: "close > 0", Severity: SeverityWarning, Active: true})
db.InsertAssertion(&Assertion{ID: "a2", EntityID: "e1", Name: "fail", Kind: "range", Rule: "close > 200", Severity: SeverityCritical, Active: true})
db.InsertAssertion(&Assertion{ID: "a3", EntityID: "e1", Name: "inactive", Kind: "range", Rule: "close > 999", Severity: SeverityInfo, Active: false})
results, err := EvalEntityAssertions(db, "e1", "exec_test")
if err != nil {
t.Fatalf("eval: %v", err)
}
// Should only eval active assertions (a1, a2), not a3
if len(results) != 2 {
t.Fatalf("expected 2 results, got %d", len(results))
}
pass, fail := 0, 0
for _, r := range results {
switch r.Status {
case ResultPass:
pass++
case ResultFail:
fail++
}
if r.ExecutionID != "exec_test" {
t.Errorf("execution_id = %q, want exec_test", r.ExecutionID)
}
}
if pass != 1 || fail != 1 {
t.Errorf("pass=%d fail=%d, want 1 and 1", pass, fail)
}
// Results should be persisted
stored, _ := db.ListAssertionResults("", "exec_test")
if len(stored) != 2 {
t.Errorf("stored results = %d, want 2", len(stored))
}
}
func TestReactCriticalCorruptsEntity(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
Metadata: map[string]any{"close": 120},
})
db.InsertAssertion(&Assertion{ID: "a_crit", EntityID: "e1", Name: "critical_fail", Kind: "range", Rule: "close > 200", Severity: SeverityCritical, Active: true})
// Simulate failed assertion result
results := []AssertionResult{
{ID: "ar1", AssertionID: "a_crit", Status: ResultFail, Message: "close > 200 failed", EvaluatedAt: time.Now().UTC()},
}
rr, err := React(db, nil, results)
if err != nil {
t.Fatalf("react: %v", err)
}
if len(rr.EntityUpdates) != 1 {
t.Fatalf("expected 1 entity update, got %d", len(rr.EntityUpdates))
}
if rr.EntityUpdates[0].NewStatus != StatusCorrupted {
t.Errorf("new status = %q, want corrupted", rr.EntityUpdates[0].NewStatus)
}
// Verify entity in DB
entity, _ := db.GetEntity("e1")
if entity.Status != StatusCorrupted {
t.Errorf("entity status = %q, want corrupted", entity.Status)
}
}
func TestReactWarningStalesEntity(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
Metadata: map[string]any{"close": 120},
})
db.InsertAssertion(&Assertion{ID: "a_warn", EntityID: "e1", Name: "warn_fail", Kind: "range", Rule: "close > 200", Severity: SeverityWarning, Active: true})
results := []AssertionResult{
{ID: "ar1", AssertionID: "a_warn", Status: ResultFail, Message: "warning", EvaluatedAt: time.Now().UTC()},
}
rr, err := React(db, nil, results)
if err != nil {
t.Fatalf("react: %v", err)
}
if len(rr.EntityUpdates) != 1 {
t.Fatalf("expected 1 entity update, got %d", len(rr.EntityUpdates))
}
if rr.EntityUpdates[0].NewStatus != StatusStale {
t.Errorf("new status = %q, want stale", rr.EntityUpdates[0].NewStatus)
}
}
func TestReactWarningDoesNotDowngradeCorrupted(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusCorrupted, Source: "test",
})
db.InsertAssertion(&Assertion{ID: "a_warn", EntityID: "e1", Name: "warn", Kind: "range", Rule: "x > 0", Severity: SeverityWarning, Active: true})
results := []AssertionResult{
{ID: "ar1", AssertionID: "a_warn", Status: ResultFail, EvaluatedAt: time.Now().UTC()},
}
rr, err := React(db, nil, results)
if err != nil {
t.Fatalf("react: %v", err)
}
// Warning should NOT change corrupted entity to stale
if len(rr.EntityUpdates) != 0 {
t.Errorf("expected no entity updates, got %d", len(rr.EntityUpdates))
}
}
func TestReactInfoNoStatusChange(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{
ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test",
})
db.InsertAssertion(&Assertion{ID: "a_info", EntityID: "e1", Name: "info", Kind: "range", Rule: "x > 0", Severity: SeverityInfo, Active: true})
results := []AssertionResult{
{ID: "ar1", AssertionID: "a_info", Status: ResultFail, EvaluatedAt: time.Now().UTC()},
}
rr, err := React(db, nil, results)
if err != nil {
t.Fatalf("react: %v", err)
}
if len(rr.EntityUpdates) != 0 {
t.Errorf("info fail should not change status, got %d updates", len(rr.EntityUpdates))
}
}
func TestValidateExecution(t *testing.T) {
tests := []struct {
name string
e Execution
wantErr bool
}{
{"valid", Execution{ID: "e1", PipelineID: "p1", Status: ExecSuccess, StartedAt: time.Now()}, false},
{"missing id", Execution{PipelineID: "p1", Status: ExecSuccess, StartedAt: time.Now()}, true},
{"missing pipeline", Execution{ID: "e1", Status: ExecSuccess, StartedAt: time.Now()}, true},
{"missing status", Execution{ID: "e1", PipelineID: "p1", StartedAt: time.Now()}, true},
{"invalid status", Execution{ID: "e1", PipelineID: "p1", Status: "invalid", StartedAt: time.Now()}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ValidateExecution(&tt.e)
if (err != nil) != tt.wantErr {
t.Errorf("error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestValidateAssertion(t *testing.T) {
known := map[string]bool{"e1": true}
tests := []struct {
name string
a Assertion
wantErr bool
}{
{"valid", Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning}, false},
{"missing entity", Assertion{ID: "a1", EntityID: "unknown", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning}, true},
{"missing rule", Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Severity: SeverityWarning}, true},
{"missing kind", Assertion{ID: "a1", EntityID: "e1", Name: "test", Rule: "x > 0", Severity: SeverityWarning}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ValidateAssertion(&tt.a, known)
if (err != nil) != tt.wantErr {
t.Errorf("error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestMigrations(t *testing.T) {
db := tempDB(t)
var count int
err := db.conn.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count)
if err != nil {
t.Fatalf("query: %v", err)
}
if count < 2 {
t.Errorf("expected at least 2 migrations, got %d", count)
}
// Verify all new tables exist
for _, table := range []string{"executions", "assertions", "assertion_results"} {
_, err := db.conn.Exec("SELECT 1 FROM " + table + " LIMIT 1")
if err != nil {
t.Errorf("table %s should exist: %v", table, err)
}
}
}
func TestRecordExecutionWithResults(t *testing.T) {
db := tempDB(t)
db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"})
db.InsertEntity(&Entity{ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test"})
db.InsertAssertion(&Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning, Active: true})
started := time.Now().UTC()
ended := started.Add(5 * time.Second)
e := &Execution{
ID: "exec_tx", PipelineID: "p1", Status: ExecSuccess,
StartedAt: started, EndedAt: &ended,
}
results := []AssertionResult{
{ID: "ar_tx_1", AssertionID: "a1", ExecutionID: "exec_tx", Status: ResultPass, EvaluatedAt: time.Now().UTC()},
}
if err := RecordExecutionWithResults(db, e, results); err != nil {
t.Fatalf("record: %v", err)
}
// Verify both were persisted
gotExec, _ := db.GetExecution("exec_tx")
if gotExec == nil {
t.Fatal("execution not found")
}
gotResult, _ := db.GetAssertionResult("ar_tx_1")
if gotResult == nil {
t.Fatal("assertion result not found")
}
}
func TestGetEntityGraph(t *testing.T) {
db := tempDB(t)