diff --git a/cmd/fn/main.go b/cmd/fn/main.go index e8415f00..497ecead 100644 --- a/cmd/fn/main.go +++ b/cmd/fn/main.go @@ -31,6 +31,8 @@ func main() { cmdAdd(os.Args[2:]) case "ops": cmdOps(os.Args[2:]) + case "proposal": + cmdProposal(os.Args[2:]) case "help", "-h", "--help": printUsage() default: @@ -49,7 +51,8 @@ Usage: fn list [-k kind] [-d domain] [-l lang] fn show Muestra entrada completa fn add [-k kind] Abre $EDITOR con template - fn ops Gestiona operations.db (fn ops help)`) + fn ops Gestiona operations.db (fn ops help) + fn proposal Gestiona proposals`) } func root() string { diff --git a/cmd/fn/ops.go b/cmd/fn/ops.go index bb57331e..82761aee 100644 --- a/cmd/fn/ops.go +++ b/cmd/fn/ops.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" "text/tabwriter" + "time" ops "fn-registry/fn_operations" "fn-registry/registry" @@ -32,6 +33,10 @@ func cmdOps(args []string) { cmdOpsGraph() case "snapshot": cmdOpsSnapshot(args[1:]) + case "execution": + cmdOpsExecution(args[1:]) + case "assertion": + cmdOpsAssertion(args[1:]) case "help", "-h", "--help": printOpsUsage() default: @@ -68,7 +73,21 @@ Relation flags: --id --name --from --to --via --direction --status --purity --weight <0.0-1.0> --description - --tags --notes `) + --tags --notes + +Execution commands: + fn ops execution add Registra ejecucion + fn ops execution list [--pipeline-id ] [-s status] + fn ops execution show Muestra ejecucion + +Assertion commands: + fn ops assertion add Añade assertion + fn ops assertion list [--entity-id ] Lista assertions + fn ops assertion show Muestra assertion + fn ops assertion delete Elimina assertion + fn ops assertion eval --entity-id Evalua assertions activas + fn ops assertion result add Registra resultado manual + fn ops assertion result list [--assertion-id ]`) } // --- ops init --- @@ -722,6 +741,663 @@ func openOpsDB() *ops.DB { return db } +// --- Execution subcommands --- + +func cmdOpsExecution(args []string) { + if len(args) < 1 { + fmt.Fprintln(os.Stderr, "usage: fn ops execution ") + os.Exit(1) + } + switch args[0] { + case "add": + cmdOpsExecutionAdd(args[1:]) + case "list": + cmdOpsExecutionList(args[1:]) + case "show": + cmdOpsExecutionShow(args[1:]) + default: + fmt.Fprintf(os.Stderr, "unknown execution command: %s\n", args[0]) + os.Exit(1) + } +} + +func cmdOpsExecutionAdd(args []string) { + var id, pipelineID, relationID, status, startedAtStr, endedAtStr, errorMsg, metricsStr string + var recordsIn, recordsOut *int64 + i := 0 + for i < len(args) { + switch args[i] { + case "--id": + i++ + id = args[i] + case "--pipeline-id": + i++ + pipelineID = args[i] + case "--relation-id": + i++ + relationID = args[i] + case "--status", "-s": + i++ + status = args[i] + case "--started-at": + i++ + startedAtStr = args[i] + case "--ended-at": + i++ + endedAtStr = args[i] + case "--records-in": + i++ + v := parseInt64(args[i]) + recordsIn = &v + case "--records-out": + i++ + v := parseInt64(args[i]) + recordsOut = &v + case "--error": + i++ + errorMsg = args[i] + case "--metrics": + i++ + metricsStr = args[i] + } + i++ + } + + if pipelineID == "" || status == "" { + fmt.Fprintln(os.Stderr, "error: --pipeline-id and --status are required") + os.Exit(1) + } + + if id == "" { + id = fmt.Sprintf("exec_%d", timeNow().UnixNano()) + } + + var startedAt time.Time + if startedAtStr != "" { + var err error + startedAt, err = time.Parse(time.RFC3339, startedAtStr) + if err != nil { + fmt.Fprintf(os.Stderr, "error: invalid started-at: %v\n", err) + os.Exit(1) + } + } else { + startedAt = timeNow() + } + + var endedAt *time.Time + if endedAtStr != "" { + t, err := time.Parse(time.RFC3339, endedAtStr) + if err != nil { + fmt.Fprintf(os.Stderr, "error: invalid ended-at: %v\n", err) + os.Exit(1) + } + endedAt = &t + } + + var metrics map[string]any + if metricsStr != "" { + if err := json.Unmarshal([]byte(metricsStr), &metrics); err != nil { + fmt.Fprintf(os.Stderr, "error: invalid metrics JSON: %v\n", err) + os.Exit(1) + } + } + + e := &ops.Execution{ + ID: id, + PipelineID: pipelineID, + RelationID: relationID, + Status: ops.ExecutionStatus(status), + StartedAt: startedAt, + EndedAt: endedAt, + RecordsIn: recordsIn, + RecordsOut: recordsOut, + Error: errorMsg, + Metrics: metrics, + } + + db := openOpsDB() + defer db.Close() + + if err := ops.InsertExecutionSafe(db, e); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + fmt.Printf("Recorded execution: %s\n", e.ID) +} + +func cmdOpsExecutionList(args []string) { + var pipelineID, relationID, status string + i := 0 + for i < len(args) { + switch args[i] { + case "--pipeline-id": + i++ + pipelineID = args[i] + case "--relation-id": + i++ + relationID = args[i] + case "-s": + i++ + status = args[i] + } + i++ + } + + db := openOpsDB() + defer db.Close() + + execs, err := db.ListExecutions(pipelineID, relationID, ops.ExecutionStatus(status)) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + if len(execs) == 0 { + fmt.Println("No executions.") + return + } + + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + fmt.Fprintln(w, "ID\tPIPELINE\tSTATUS\tDURATION_MS\tRECORDS_IN\tRECORDS_OUT") + for _, e := range execs { + dur := "-" + if e.DurationMs != nil { + dur = fmt.Sprintf("%d", *e.DurationMs) + } + rin := "-" + if e.RecordsIn != nil { + rin = fmt.Sprintf("%d", *e.RecordsIn) + } + rout := "-" + if e.RecordsOut != nil { + rout = fmt.Sprintf("%d", *e.RecordsOut) + } + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", e.ID, e.PipelineID, e.Status, dur, rin, rout) + } + w.Flush() +} + +func cmdOpsExecutionShow(args []string) { + if len(args) < 1 { + fmt.Fprintln(os.Stderr, "usage: fn ops execution show ") + os.Exit(1) + } + + db := openOpsDB() + defer db.Close() + + e, err := db.GetExecution(args[0]) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + if e == nil { + fmt.Fprintf(os.Stderr, "execution %q not found\n", args[0]) + os.Exit(1) + } + + fmt.Printf("ID: %s\n", e.ID) + fmt.Printf("Pipeline: %s\n", e.PipelineID) + if e.RelationID != "" { + fmt.Printf("Relation: %s\n", e.RelationID) + } + fmt.Printf("Status: %s\n", e.Status) + fmt.Printf("Started at: %s\n", e.StartedAt.Format(time.RFC3339)) + if e.EndedAt != nil { + fmt.Printf("Ended at: %s\n", e.EndedAt.Format(time.RFC3339)) + } + if e.DurationMs != nil { + fmt.Printf("Duration: %d ms\n", *e.DurationMs) + } + if e.RecordsIn != nil { + fmt.Printf("Records in: %d\n", *e.RecordsIn) + } + if e.RecordsOut != nil { + fmt.Printf("Records out: %d\n", *e.RecordsOut) + } + if e.Error != "" { + fmt.Printf("Error: %s\n", e.Error) + } + if len(e.Metrics) > 0 { + m, _ := json.MarshalIndent(e.Metrics, " ", " ") + fmt.Printf("Metrics: %s\n", string(m)) + } +} + +// --- Assertion subcommands --- + +func cmdOpsAssertion(args []string) { + if len(args) < 1 { + fmt.Fprintln(os.Stderr, "usage: fn ops assertion ") + os.Exit(1) + } + switch args[0] { + case "add": + cmdOpsAssertionAdd(args[1:]) + case "list": + cmdOpsAssertionList(args[1:]) + case "show": + cmdOpsAssertionShow(args[1:]) + case "delete": + cmdOpsAssertionDelete(args[1:]) + case "eval": + cmdOpsAssertionEval(args[1:]) + case "result": + cmdOpsAssertionResult(args[1:]) + default: + fmt.Fprintf(os.Stderr, "unknown assertion command: %s\n", args[0]) + os.Exit(1) + } +} + +func cmdOpsAssertionAdd(args []string) { + var id, entityID, name, kind, rule, severity, description string + i := 0 + for i < len(args) { + switch args[i] { + case "--id": + i++ + id = args[i] + case "--entity-id": + i++ + entityID = args[i] + case "--name": + i++ + name = args[i] + case "--kind": + i++ + kind = args[i] + case "--rule": + i++ + rule = args[i] + case "--severity": + i++ + severity = args[i] + case "--description": + i++ + description = args[i] + } + i++ + } + + if entityID == "" || name == "" || kind == "" || rule == "" { + fmt.Fprintln(os.Stderr, "error: --entity-id, --name, --kind, and --rule are required") + os.Exit(1) + } + + if id == "" { + id = fmt.Sprintf("assert_%s_%d", name, timeNow().UnixNano()) + } + if severity == "" { + severity = "warning" + } + + a := &ops.Assertion{ + ID: id, + EntityID: entityID, + Name: name, + Kind: kind, + Rule: rule, + Severity: ops.Severity(severity), + Description: description, + Active: true, + } + + db := openOpsDB() + defer db.Close() + + if err := ops.InsertAssertionSafe(db, a); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + fmt.Printf("Created assertion: %s\n", a.ID) +} + +func cmdOpsAssertionList(args []string) { + var entityID string + var activeFilter *bool + i := 0 + for i < len(args) { + switch args[i] { + case "--entity-id": + i++ + entityID = args[i] + case "--active": + v := true + activeFilter = &v + case "--inactive": + v := false + activeFilter = &v + } + i++ + } + + db := openOpsDB() + defer db.Close() + + assertions, err := db.ListAssertions(entityID, activeFilter) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + if len(assertions) == 0 { + fmt.Println("No assertions.") + return + } + + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + fmt.Fprintln(w, "ID\tENTITY\tNAME\tKIND\tSEVERITY\tACTIVE") + for _, a := range assertions { + active := "yes" + if !a.Active { + active = "no" + } + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", a.ID, a.EntityID, a.Name, a.Kind, a.Severity, active) + } + w.Flush() +} + +func cmdOpsAssertionShow(args []string) { + if len(args) < 1 { + fmt.Fprintln(os.Stderr, "usage: fn ops assertion show ") + os.Exit(1) + } + + db := openOpsDB() + defer db.Close() + + a, err := db.GetAssertion(args[0]) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + if a == nil { + fmt.Fprintf(os.Stderr, "assertion %q not found\n", args[0]) + os.Exit(1) + } + + active := "yes" + if !a.Active { + active = "no" + } + + fmt.Printf("ID: %s\n", a.ID) + fmt.Printf("Entity: %s\n", a.EntityID) + fmt.Printf("Name: %s\n", a.Name) + fmt.Printf("Kind: %s\n", a.Kind) + fmt.Printf("Rule: %s\n", a.Rule) + fmt.Printf("Severity: %s\n", a.Severity) + fmt.Printf("Description: %s\n", a.Description) + fmt.Printf("Active: %s\n", active) + fmt.Printf("Created: %s\n", a.CreatedAt.Format(time.RFC3339)) + + // Show recent results + results, err := db.ListAssertionResults(a.ID, "") + if err == nil && len(results) > 0 { + fmt.Printf("\nRecent results:\n") + limit := 5 + if len(results) < limit { + limit = len(results) + } + for _, r := range results[:limit] { + fmt.Printf(" [%s] %s — %s %s\n", r.EvaluatedAt.Format(time.RFC3339), r.Status, r.Message, formatResultValue(r.Value)) + } + } +} + +func cmdOpsAssertionDelete(args []string) { + if len(args) < 1 { + fmt.Fprintln(os.Stderr, "usage: fn ops assertion delete ") + os.Exit(1) + } + + db := openOpsDB() + defer db.Close() + + if err := db.DeleteAssertion(args[0]); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + fmt.Printf("Deleted assertion: %s\n", args[0]) +} + +func cmdOpsAssertionEval(args []string) { + var entityID, executionID string + react := false + i := 0 + for i < len(args) { + switch args[i] { + case "--entity-id": + i++ + entityID = args[i] + case "--execution-id": + i++ + executionID = args[i] + case "--react": + react = true + } + i++ + } + + if entityID == "" { + fmt.Fprintln(os.Stderr, "error: --entity-id is required") + os.Exit(1) + } + + db := openOpsDB() + defer db.Close() + + results, err := ops.EvalEntityAssertions(db, entityID, executionID) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + if len(results) == 0 { + fmt.Println("No active assertions for this entity.") + return + } + + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + fmt.Fprintln(w, "ASSERTION\tSTATUS\tMESSAGE") + for _, r := range results { + fmt.Fprintf(w, "%s\t%s\t%s\n", r.AssertionID, r.Status, truncate(r.Message, 60)) + } + w.Flush() + + // Summary + pass, fail, skip := 0, 0, 0 + for _, r := range results { + switch r.Status { + case ops.ResultPass: + pass++ + case ops.ResultFail: + fail++ + case ops.ResultSkip: + skip++ + } + } + fmt.Printf("\nResults: %d pass, %d fail, %d skip\n", pass, fail, skip) + + // Reactive loop: apply severity rules + if react && fail > 0 { + var regDB *registry.DB + regDB = tryOpenRegistryDB() + // regDB can be nil — React handles it (just won't create proposals) + + rr, err := ops.React(db, regDB, results) + if regDB != nil { + defer regDB.Close() + } + if err != nil { + fmt.Fprintf(os.Stderr, "error in reactive loop: %v\n", err) + os.Exit(1) + } + + if len(rr.EntityUpdates) > 0 { + fmt.Println("\nEntity status changes:") + for _, u := range rr.EntityUpdates { + fmt.Printf(" %s: %s -> %s (%s)\n", u.EntityID, u.OldStatus, u.NewStatus, u.Reason) + } + } + if len(rr.Proposals) > 0 { + fmt.Println("\nProposals created:") + for _, pid := range rr.Proposals { + fmt.Printf(" %s\n", pid) + } + } + } else if !react && fail > 0 { + fmt.Println("\nTip: use --react to apply severity rules (update entity status, create proposals)") + } +} + +// --- Assertion Result subcommands --- + +func cmdOpsAssertionResult(args []string) { + if len(args) < 1 { + fmt.Fprintln(os.Stderr, "usage: fn ops assertion result ") + os.Exit(1) + } + switch args[0] { + case "add": + cmdOpsAssertionResultAdd(args[1:]) + case "list": + cmdOpsAssertionResultList(args[1:]) + default: + fmt.Fprintf(os.Stderr, "unknown assertion result command: %s\n", args[0]) + os.Exit(1) + } +} + +func cmdOpsAssertionResultAdd(args []string) { + var id, assertionID, executionID, status, valueStr, message string + i := 0 + for i < len(args) { + switch args[i] { + case "--id": + i++ + id = args[i] + case "--assertion-id": + i++ + assertionID = args[i] + case "--execution-id": + i++ + executionID = args[i] + case "--status": + i++ + status = args[i] + case "--value": + i++ + valueStr = args[i] + case "--message": + i++ + message = args[i] + } + i++ + } + + if assertionID == "" || status == "" { + fmt.Fprintln(os.Stderr, "error: --assertion-id and --status are required") + os.Exit(1) + } + + if id == "" { + id = fmt.Sprintf("ar_%d", timeNow().UnixNano()) + } + + var value map[string]any + if valueStr != "" { + if err := json.Unmarshal([]byte(valueStr), &value); err != nil { + fmt.Fprintf(os.Stderr, "error: invalid value JSON: %v\n", err) + os.Exit(1) + } + } + + ar := &ops.AssertionResult{ + ID: id, + AssertionID: assertionID, + ExecutionID: executionID, + Status: ops.AssertionResultStatus(status), + Value: value, + Message: message, + EvaluatedAt: timeNow(), + } + + db := openOpsDB() + defer db.Close() + + if err := db.InsertAssertionResult(ar); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + fmt.Printf("Recorded result: %s (%s)\n", ar.ID, ar.Status) +} + +func cmdOpsAssertionResultList(args []string) { + var assertionID, executionID string + i := 0 + for i < len(args) { + switch args[i] { + case "--assertion-id": + i++ + assertionID = args[i] + case "--execution-id": + i++ + executionID = args[i] + } + i++ + } + + db := openOpsDB() + defer db.Close() + + results, err := db.ListAssertionResults(assertionID, executionID) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + if len(results) == 0 { + fmt.Println("No assertion results.") + return + } + + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + fmt.Fprintln(w, "ID\tASSERTION\tEXECUTION\tSTATUS\tEVALUATED_AT") + for _, r := range results { + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", r.ID, r.AssertionID, r.ExecutionID, r.Status, r.EvaluatedAt.Format(time.RFC3339)) + } + w.Flush() +} + +// --- helpers for new commands --- + +func timeNow() time.Time { + return time.Now().UTC() +} + +func parseInt64(s string) int64 { + var v int64 + fmt.Sscanf(s, "%d", &v) + return v +} + +func formatResultValue(v map[string]any) string { + if len(v) == 0 { + return "" + } + b, _ := json.Marshal(v) + s := string(b) + if len(s) > 40 { + return s[:37] + "..." + } + return s +} + func tryOpenRegistryDB() *registry.DB { // Try FN_REGISTRY_ROOT env var first if envRoot := os.Getenv("FN_REGISTRY_ROOT"); envRoot != "" { diff --git a/fn_operations/db.go b/fn_operations/db.go index 0ce57f2a..1aacc124 100644 --- a/fn_operations/db.go +++ b/fn_operations/db.go @@ -9,89 +9,6 @@ import ( _ "github.com/mattn/go-sqlite3" ) -const schemaSQL = ` -CREATE TABLE IF NOT EXISTS types_snapshot ( - id TEXT PRIMARY KEY, - version TEXT NOT NULL DEFAULT '1.0.0', - lang TEXT NOT NULL, - algebraic TEXT NOT NULL CHECK(algebraic IN ('product','sum')), - definition TEXT NOT NULL DEFAULT '', - description TEXT NOT NULL DEFAULT '', - snapped_at TEXT NOT NULL -); - -CREATE TABLE IF NOT EXISTS entities ( - id TEXT PRIMARY KEY, - name TEXT NOT NULL, - type_ref TEXT NOT NULL, - status TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active','stale','corrupted','archived')), - description TEXT NOT NULL DEFAULT '', - domain TEXT NOT NULL DEFAULT '', - tags TEXT NOT NULL DEFAULT '[]', - source TEXT NOT NULL, - metadata TEXT NOT NULL DEFAULT '{}', - notes TEXT NOT NULL DEFAULT '', - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL -); - -CREATE TABLE IF NOT EXISTS relations ( - id TEXT PRIMARY KEY, - name TEXT NOT NULL, - from_entity TEXT NOT NULL DEFAULT '', - to_entity TEXT NOT NULL, - via TEXT NOT NULL DEFAULT '', - description TEXT NOT NULL DEFAULT '', - purity TEXT NOT NULL DEFAULT '' CHECK(purity IN ('','pure','impure')), - direction TEXT NOT NULL DEFAULT 'unidirectional' CHECK(direction IN ('unidirectional','bidirectional','inverse')), - weight REAL, - status TEXT NOT NULL DEFAULT 'designed' CHECK(status IN ('designed','implemented','tested','running','deprecated')), - started_at TEXT, - ended_at TEXT, - "order" INTEGER, - tags TEXT NOT NULL DEFAULT '[]', - notes TEXT NOT NULL DEFAULT '', - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL -); - -CREATE TABLE IF NOT EXISTS relation_inputs ( - id TEXT PRIMARY KEY, - relation_id TEXT NOT NULL REFERENCES relations(id) ON DELETE CASCADE, - entity_id TEXT NOT NULL REFERENCES entities(id), - role TEXT NOT NULL, - "order" INTEGER -); - -CREATE VIRTUAL TABLE IF NOT EXISTS entities_fts USING fts5( - id, - name, - description, - tags, - domain, - content='entities', - content_rowid='rowid' -); - --- Triggers to keep entities FTS in sync -CREATE TRIGGER IF NOT EXISTS entities_ai AFTER INSERT ON entities BEGIN - INSERT INTO entities_fts(rowid, id, name, description, tags, domain) - VALUES (new.rowid, new.id, new.name, new.description, new.tags, new.domain); -END; - -CREATE TRIGGER IF NOT EXISTS entities_ad AFTER DELETE ON entities BEGIN - INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags, domain) - VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags, old.domain); -END; - -CREATE TRIGGER IF NOT EXISTS entities_au AFTER UPDATE ON entities BEGIN - INSERT INTO entities_fts(entities_fts, rowid, id, name, description, tags, domain) - VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags, old.domain); - INSERT INTO entities_fts(rowid, id, name, description, tags, domain) - VALUES (new.rowid, new.id, new.name, new.description, new.tags, new.domain); -END; -` - // DB wraps a SQLite connection for an operations database. type DB struct { conn *sql.DB @@ -115,9 +32,9 @@ func Open(path string) (*DB, error) { return nil, fmt.Errorf("setting WAL mode: %w", err) } - if _, err := conn.Exec(schemaSQL); err != nil { + if err := migrate(conn); err != nil { conn.Close() - return nil, fmt.Errorf("applying schema: %w", err) + return nil, fmt.Errorf("running migrations: %w", err) } return &DB{conn: conn, path: path}, nil diff --git a/fn_operations/eval.go b/fn_operations/eval.go new file mode 100644 index 00000000..542ec1f8 --- /dev/null +++ b/fn_operations/eval.go @@ -0,0 +1,117 @@ +package fn_operations + +import ( + "fmt" + "regexp" + "strings" + "time" +) + +// fieldPattern matches bare field names (word chars) that are NOT already inside +// a json_extract call, a SQL keyword, a string literal, or a number. +var fieldPattern = regexp.MustCompile(`\b([a-zA-Z_][a-zA-Z0-9_]*)\b`) + +// sqlKeywords that should not be rewritten to json_extract. +var sqlKeywords = map[string]bool{ + "AND": true, "OR": true, "NOT": true, "IS": true, "NULL": true, + "IN": true, "BETWEEN": true, "LIKE": true, "GLOB": true, + "TRUE": true, "FALSE": true, "CASE": true, "WHEN": true, + "THEN": true, "ELSE": true, "END": true, "SELECT": true, + "FROM": true, "WHERE": true, "AS": true, "CAST": true, +} + +// sqlFunctions that should not be rewritten. +var sqlFunctions = map[string]bool{ + "json_extract": true, "datetime": true, "now": true, + "abs": true, "avg": true, "count": true, "max": true, "min": true, + "sum": true, "total": true, "length": true, "typeof": true, + "coalesce": true, "ifnull": true, "nullif": true, + "upper": true, "lower": true, "trim": true, "replace": true, + "substr": true, "instr": true, "round": true, +} + +// rewriteRule transforms a rule expression into SQL that operates on entity metadata. +// Bare field names are rewritten to json_extract(metadata, '$.field'). +// If the rule already uses json_extract, it is left as-is. +func rewriteRule(rule string) string { + if strings.Contains(rule, "json_extract") { + return rule + } + + return fieldPattern.ReplaceAllStringFunc(rule, func(match string) string { + upper := strings.ToUpper(match) + if sqlKeywords[upper] { + return match + } + if sqlFunctions[strings.ToLower(match)] { + return match + } + // Skip numeric-looking tokens (shouldn't match the regex, but be safe) + if match[0] >= '0' && match[0] <= '9' { + return match + } + return fmt.Sprintf("json_extract(metadata, '$.%s')", match) + }) +} + +// EvalAssertion evaluates a single assertion against its entity's metadata. +// Returns a result with pass/fail/skip status. +func EvalAssertion(db *DB, a *Assertion, executionID string) (*AssertionResult, error) { + result := &AssertionResult{ + ID: fmt.Sprintf("ar_%s_%d", a.ID, time.Now().UnixNano()), + AssertionID: a.ID, + ExecutionID: executionID, + EvaluatedAt: time.Now().UTC(), + } + + rewritten := rewriteRule(a.Rule) + + q := fmt.Sprintf(` + SELECT CASE WHEN (%s) THEN 'pass' ELSE 'fail' END, + metadata + FROM entities WHERE id = ?`, rewritten) + + var status, metadataJSON string + err := db.conn.QueryRow(q, a.EntityID).Scan(&status, &metadataJSON) + if err != nil { + result.Status = ResultSkip + result.Message = fmt.Sprintf("evaluation error: %v", err) + return result, nil + } + + if status == "pass" { + result.Status = ResultPass + } else { + result.Status = ResultFail + result.Value = unmarshalJSON(metadataJSON) + result.Message = fmt.Sprintf("rule failed: %s", a.Rule) + } + + return result, nil +} + +// EvalEntityAssertions evaluates all active assertions for an entity. +// Returns results and persists them in the database. +func EvalEntityAssertions(db *DB, entityID, executionID string) ([]AssertionResult, error) { + active := true + assertions, err := db.ListAssertions(entityID, &active) + if err != nil { + return nil, fmt.Errorf("listing assertions for entity %s: %w", entityID, err) + } + + var results []AssertionResult + for _, a := range assertions { + ar, err := EvalAssertion(db, &a, executionID) + if err != nil { + return nil, fmt.Errorf("evaluating assertion %s: %w", a.ID, err) + } + + if err := db.InsertAssertionResult(ar); err != nil { + return nil, fmt.Errorf("storing result for assertion %s: %w", a.ID, err) + } + + results = append(results, *ar) + } + + return results, nil +} diff --git a/fn_operations/migrate.go b/fn_operations/migrate.go new file mode 100644 index 00000000..782b97f4 --- /dev/null +++ b/fn_operations/migrate.go @@ -0,0 +1,117 @@ +package fn_operations + +import ( + "database/sql" + "embed" + "fmt" + "path" + "sort" + "strconv" + "strings" + "time" +) + +//go:embed migrations/*.sql +var migrationsFS embed.FS + +const migrationTableSQL = ` +CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + name TEXT NOT NULL, + applied_at TEXT NOT NULL +);` + +// migrate applies pending migrations to the database. +func migrate(conn *sql.DB) error { + if _, err := conn.Exec(migrationTableSQL); err != nil { + return fmt.Errorf("creating schema_migrations table: %w", err) + } + + current, err := currentVersion(conn) + if err != nil { + return err + } + + files, err := listMigrations() + if err != nil { + return err + } + + for _, mf := range files { + if mf.version <= current { + continue + } + + content, err := migrationsFS.ReadFile(path.Join("migrations", mf.filename)) + if err != nil { + return fmt.Errorf("reading migration %s: %w", mf.filename, err) + } + + tx, err := conn.Begin() + if err != nil { + return fmt.Errorf("beginning transaction for migration %d: %w", mf.version, err) + } + + if _, err := tx.Exec(string(content)); err != nil { + tx.Rollback() + return fmt.Errorf("applying migration %s: %w", mf.filename, err) + } + + if _, err := tx.Exec( + "INSERT INTO schema_migrations (version, name, applied_at) VALUES (?, ?, ?)", + mf.version, mf.filename, time.Now().UTC().Format(time.RFC3339), + ); err != nil { + tx.Rollback() + return fmt.Errorf("recording migration %s: %w", mf.filename, err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("committing migration %s: %w", mf.filename, err) + } + } + + return nil +} + +func currentVersion(conn *sql.DB) (int, error) { + var v int + err := conn.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&v) + if err != nil { + return 0, fmt.Errorf("reading current schema version: %w", err) + } + return v, nil +} + +type migrationFile struct { + version int + filename string +} + +func listMigrations() ([]migrationFile, error) { + entries, err := migrationsFS.ReadDir("migrations") + if err != nil { + return nil, fmt.Errorf("reading migrations directory: %w", err) + } + + var files []migrationFile + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".sql") { + continue + } + parts := strings.SplitN(e.Name(), "_", 2) + if len(parts) < 2 { + continue + } + v, err := strconv.Atoi(parts[0]) + if err != nil { + continue + } + files = append(files, migrationFile{version: v, filename: e.Name()}) + } + + sort.Slice(files, func(i, j int) bool { + return files[i].version < files[j].version + }) + + return files, nil +} diff --git a/fn_operations/migrations/002_executions_assertions.sql b/fn_operations/migrations/002_executions_assertions.sql new file mode 100644 index 00000000..993c8ce6 --- /dev/null +++ b/fn_operations/migrations/002_executions_assertions.sql @@ -0,0 +1,65 @@ +-- Executions, assertions, and assertion_results tables. +-- Closes the autonomous improvement loop: execute -> assert -> analyze. + +CREATE TABLE IF NOT EXISTS executions ( + id TEXT PRIMARY KEY, + pipeline_id TEXT NOT NULL, + relation_id TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL CHECK(status IN ('success','failure','partial')), + started_at TEXT NOT NULL, + ended_at TEXT, + duration_ms INTEGER, + records_in INTEGER, + records_out INTEGER, + error TEXT NOT NULL DEFAULT '', + metrics TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS assertions ( + id TEXT PRIMARY KEY, + entity_id TEXT NOT NULL REFERENCES entities(id), + name TEXT NOT NULL, + kind TEXT NOT NULL, + rule TEXT NOT NULL, + severity TEXT NOT NULL DEFAULT 'warning' CHECK(severity IN ('critical','warning','info')), + description TEXT NOT NULL DEFAULT '', + active INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS assertion_results ( + id TEXT PRIMARY KEY, + assertion_id TEXT NOT NULL REFERENCES assertions(id), + execution_id TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL CHECK(status IN ('pass','fail','skip')), + value TEXT NOT NULL DEFAULT '{}', + message TEXT NOT NULL DEFAULT '', + evaluated_at TEXT NOT NULL +); + +CREATE VIRTUAL TABLE IF NOT EXISTS assertions_fts USING fts5( + id, + name, + description, + rule, + content='assertions', + content_rowid='rowid' +); + +CREATE TRIGGER IF NOT EXISTS assertions_ai AFTER INSERT ON assertions BEGIN + INSERT INTO assertions_fts(rowid, id, name, description, rule) + VALUES (new.rowid, new.id, new.name, new.description, new.rule); +END; + +CREATE TRIGGER IF NOT EXISTS assertions_ad AFTER DELETE ON assertions BEGIN + INSERT INTO assertions_fts(assertions_fts, rowid, id, name, description, rule) + VALUES ('delete', old.rowid, old.id, old.name, old.description, old.rule); +END; + +CREATE TRIGGER IF NOT EXISTS assertions_au AFTER UPDATE ON assertions BEGIN + INSERT INTO assertions_fts(assertions_fts, rowid, id, name, description, rule) + VALUES ('delete', old.rowid, old.id, old.name, old.description, old.rule); + INSERT INTO assertions_fts(rowid, id, name, description, rule) + VALUES (new.rowid, new.id, new.name, new.description, new.rule); +END; diff --git a/fn_operations/models.go b/fn_operations/models.go index 2eefeab3..3e1da45f 100644 --- a/fn_operations/models.go +++ b/fn_operations/models.go @@ -78,6 +78,73 @@ type RelationInput struct { Order *int `json:"order"` } +// ExecutionStatus represents the result of a pipeline execution. +type ExecutionStatus string + +const ( + ExecSuccess ExecutionStatus = "success" + ExecFailure ExecutionStatus = "failure" + ExecPartial ExecutionStatus = "partial" +) + +// Severity classifies the impact of an assertion failure. +type Severity string + +const ( + SeverityCritical Severity = "critical" + SeverityWarning Severity = "warning" + SeverityInfo Severity = "info" +) + +// AssertionResultStatus represents the outcome of an assertion evaluation. +type AssertionResultStatus string + +const ( + ResultPass AssertionResultStatus = "pass" + ResultFail AssertionResultStatus = "fail" + ResultSkip AssertionResultStatus = "skip" +) + +// Execution records a pipeline run with its metrics and outcome. +type Execution struct { + ID string `json:"id"` + PipelineID string `json:"pipeline_id"` + RelationID string `json:"relation_id"` + Status ExecutionStatus `json:"status"` + StartedAt time.Time `json:"started_at"` + EndedAt *time.Time `json:"ended_at"` + DurationMs *int64 `json:"duration_ms"` + RecordsIn *int64 `json:"records_in"` + RecordsOut *int64 `json:"records_out"` + Error string `json:"error"` + Metrics map[string]any `json:"metrics"` + CreatedAt time.Time `json:"created_at"` +} + +// Assertion is a formal quality rule evaluated against an entity. +type Assertion struct { + ID string `json:"id"` + EntityID string `json:"entity_id"` + Name string `json:"name"` + Kind string `json:"kind"` // free text: range, null, statistical, consistency, freshness, ... + Rule string `json:"rule"` + Severity Severity `json:"severity"` + Description string `json:"description"` + Active bool `json:"active"` + CreatedAt time.Time `json:"created_at"` +} + +// AssertionResult records one evaluation of an assertion. +type AssertionResult struct { + ID string `json:"id"` + AssertionID string `json:"assertion_id"` + ExecutionID string `json:"execution_id"` + Status AssertionResultStatus `json:"status"` + Value map[string]any `json:"value"` + Message string `json:"message"` + EvaluatedAt time.Time `json:"evaluated_at"` +} + // TypeSnapshot is an immutable copy of a registry type at point of use. type TypeSnapshot struct { ID string `json:"id"` diff --git a/fn_operations/operations.go b/fn_operations/operations.go index 973c3f64..58af8fae 100644 --- a/fn_operations/operations.go +++ b/fn_operations/operations.go @@ -277,6 +277,276 @@ func UpdateSnapshot(opsDB *DB, registryDB *registry.DB, typeID string) (old, new return oldSnap, newSnap, nil } +// InsertExecutionSafe validates and inserts an execution. +// Auto-calculates duration_ms if both started_at and ended_at are set. +func InsertExecutionSafe(db *DB, e *Execution) error { + if err := ValidateExecution(e); err != nil { + return err + } + + // Auto-calculate duration + if e.EndedAt != nil && !e.StartedAt.IsZero() && e.DurationMs == nil { + ms := e.EndedAt.Sub(e.StartedAt).Milliseconds() + e.DurationMs = &ms + } + + return db.InsertExecution(e) +} + +// InsertAssertionSafe validates that the entity exists, then inserts the assertion. +func InsertAssertionSafe(db *DB, a *Assertion) error { + entities, err := buildEntitySet(db) + if err != nil { + return err + } + + if err := ValidateAssertion(a, entities); err != nil { + return err + } + + return db.InsertAssertion(a) +} + +// RecordExecutionWithResults inserts an execution and its assertion results in a transaction. +func RecordExecutionWithResults(db *DB, e *Execution, results []AssertionResult) error { + if err := ValidateExecution(e); err != nil { + return err + } + + // Auto-calculate duration + if e.EndedAt != nil && !e.StartedAt.IsZero() && e.DurationMs == nil { + ms := e.EndedAt.Sub(e.StartedAt).Milliseconds() + e.DurationMs = &ms + } + + tx, err := db.Conn().Begin() + if err != nil { + return fmt.Errorf("beginning transaction: %w", err) + } + defer tx.Rollback() + + // Insert execution + if e.CreatedAt.IsZero() { + e.CreatedAt = time.Now().UTC() + } + var endedAt *string + if e.EndedAt != nil { + s := e.EndedAt.Format(time.RFC3339) + endedAt = &s + } + _, err = tx.Exec(` + INSERT OR REPLACE INTO executions ( + id, pipeline_id, relation_id, status, started_at, ended_at, + duration_ms, records_in, records_out, error, metrics, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + e.ID, e.PipelineID, e.RelationID, string(e.Status), + e.StartedAt.Format(time.RFC3339), endedAt, + e.DurationMs, e.RecordsIn, e.RecordsOut, e.Error, + marshalJSON(e.Metrics), e.CreatedAt.Format(time.RFC3339), + ) + if err != nil { + return fmt.Errorf("inserting execution: %w", err) + } + + // Insert assertion results + for _, ar := range results { + if ar.EvaluatedAt.IsZero() { + ar.EvaluatedAt = time.Now().UTC() + } + _, err = tx.Exec(` + INSERT OR REPLACE INTO assertion_results ( + id, assertion_id, execution_id, status, value, message, evaluated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?)`, + ar.ID, ar.AssertionID, ar.ExecutionID, string(ar.Status), + marshalJSON(ar.Value), ar.Message, ar.EvaluatedAt.Format(time.RFC3339), + ) + if err != nil { + return fmt.Errorf("inserting assertion_result: %w", err) + } + } + + return tx.Commit() +} + +// --- Reactive loop --- + +// ReactiveResult holds what the reactive loop did after analyzing assertion results. +type ReactiveResult struct { + EntityUpdates []EntityStatusChange + Proposals []string // IDs of proposals created +} + +// EntityStatusChange records a status transition triggered by assertions. +type EntityStatusChange struct { + EntityID string + OldStatus EntityStatus + NewStatus EntityStatus + Reason string +} + +// React analyzes assertion results and applies the severity rules: +// - critical fail → entity.status = corrupted +// - warning fail → entity.status = stale (only if currently active) +// - info fail → no status change +// +// If registryDB is provided and critical failures exist, a proposal is auto-created. +// Returns what changed so the caller can log or display it. +func React(opsDB *DB, registryDB *registry.DB, results []AssertionResult) (*ReactiveResult, error) { + rr := &ReactiveResult{} + + // Group failures by entity, tracking worst severity per entity. + type entityImpact struct { + worstSeverity Severity + failures []AssertionResult + assertionIDs []string + } + impacts := map[string]*entityImpact{} + + for _, ar := range results { + if ar.Status != ResultFail { + continue + } + + // Look up the assertion to get entity_id and severity + a, err := opsDB.GetAssertion(ar.AssertionID) + if err != nil || a == nil { + continue + } + + imp, ok := impacts[a.EntityID] + if !ok { + imp = &entityImpact{} + impacts[a.EntityID] = imp + } + + imp.failures = append(imp.failures, ar) + imp.assertionIDs = append(imp.assertionIDs, ar.AssertionID) + + // Track worst severity (critical > warning > info) + if severityRank(a.Severity) > severityRank(imp.worstSeverity) { + imp.worstSeverity = a.Severity + } + } + + // Apply status changes per entity + for entityID, imp := range impacts { + entity, err := opsDB.GetEntity(entityID) + if err != nil || entity == nil { + continue + } + + var newStatus EntityStatus + switch imp.worstSeverity { + case SeverityCritical: + newStatus = StatusCorrupted + case SeverityWarning: + // Only degrade active → stale, don't touch corrupted/archived + if entity.Status == StatusActive { + newStatus = StatusStale + } + default: + continue // info: no status change + } + + if newStatus == "" || newStatus == entity.Status { + continue + } + + oldStatus := entity.Status + entity.Status = newStatus + if err := opsDB.UpdateEntity(entity); err != nil { + return nil, fmt.Errorf("updating entity %s status: %w", entityID, err) + } + + reason := fmt.Sprintf("%s assertion(s) failed: %v", imp.worstSeverity, imp.assertionIDs) + rr.EntityUpdates = append(rr.EntityUpdates, EntityStatusChange{ + EntityID: entityID, + OldStatus: oldStatus, + NewStatus: newStatus, + Reason: reason, + }) + } + + // Create proposals in registry for critical failures + if registryDB != nil { + for entityID, imp := range impacts { + if imp.worstSeverity != SeverityCritical { + continue + } + + // Build evidence from failures + failureDetails := make([]map[string]any, 0, len(imp.failures)) + for _, f := range imp.failures { + failureDetails = append(failureDetails, map[string]any{ + "assertion_id": f.AssertionID, + "execution_id": f.ExecutionID, + "message": f.Message, + "value": f.Value, + }) + } + + p := ®istry.Proposal{ + ID: fmt.Sprintf("proposal_react_%s_%d", entityID, time.Now().UnixNano()), + Kind: registry.ProposalImproveFunction, + Title: fmt.Sprintf("Critical assertion failures on entity %s", entityID), + Description: fmt.Sprintf("%d critical assertion(s) failed. Entity marked as corrupted.", len(imp.failures)), + Evidence: map[string]any{ + "entity_id": entityID, + "assertion_ids": imp.assertionIDs, + "failures": failureDetails, + }, + Status: registry.ProposalPending, + CreatedBy: "reactive_loop", + } + + if err := registryDB.InsertProposal(p); err != nil { + return nil, fmt.Errorf("creating proposal for entity %s: %w", entityID, err) + } + + rr.Proposals = append(rr.Proposals, p.ID) + } + } + + return rr, nil +} + +// ExecuteAndReact is the full autonomous loop step: +// 1. Record the execution +// 2. Evaluate all active assertions on affected entities +// 3. React to failures (update entity status, create proposals) +func ExecuteAndReact(opsDB *DB, registryDB *registry.DB, e *Execution, entityIDs []string) (*ReactiveResult, error) { + // 1. Record execution + if err := InsertExecutionSafe(opsDB, e); err != nil { + return nil, fmt.Errorf("recording execution: %w", err) + } + + // 2. Evaluate assertions for each affected entity + var allResults []AssertionResult + for _, entityID := range entityIDs { + results, err := EvalEntityAssertions(opsDB, entityID, e.ID) + if err != nil { + return nil, fmt.Errorf("evaluating assertions for entity %s: %w", entityID, err) + } + allResults = append(allResults, results...) + } + + // 3. React + return React(opsDB, registryDB, allResults) +} + +func severityRank(s Severity) int { + switch s { + case SeverityCritical: + return 3 + case SeverityWarning: + return 2 + case SeverityInfo: + return 1 + default: + return 0 + } +} + func buildEntitySet(db *DB) (map[string]bool, error) { all, err := db.ListEntities("", "") if err != nil { diff --git a/fn_operations/operations_test.go b/fn_operations/operations_test.go index 23b81ec7..3d708984 100644 --- a/fn_operations/operations_test.go +++ b/fn_operations/operations_test.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" "testing" + "time" ) func tempDB(t *testing.T) *DB { @@ -351,6 +352,479 @@ func TestValidateEntity(t *testing.T) { } } +func TestExecutionCRUD(t *testing.T) { + db := tempDB(t) + + started := time.Date(2026, 3, 28, 10, 0, 0, 0, time.UTC) + ended := time.Date(2026, 3, 28, 10, 5, 0, 0, time.UTC) + rin := int64(1000) + rout := int64(60) + + e := &Execution{ + ID: "exec_1", + PipelineID: "tick_to_ohlcv_go_finance", + RelationID: "rel_1", + Status: ExecSuccess, + StartedAt: started, + EndedAt: &ended, + RecordsIn: &rin, + RecordsOut: &rout, + Metrics: map[string]any{"mean_close": 42000}, + } + + if err := InsertExecutionSafe(db, e); err != nil { + t.Fatalf("insert: %v", err) + } + + // Auto-calculated duration + if e.DurationMs == nil || *e.DurationMs != 300000 { + t.Errorf("duration_ms = %v, want 300000", e.DurationMs) + } + + got, err := db.GetExecution("exec_1") + if err != nil { + t.Fatalf("get: %v", err) + } + if got == nil { + t.Fatal("expected execution, got nil") + } + if got.PipelineID != "tick_to_ohlcv_go_finance" { + t.Errorf("pipeline_id = %q", got.PipelineID) + } + if got.Metrics["mean_close"] != float64(42000) { + t.Errorf("metrics mean_close = %v", got.Metrics["mean_close"]) + } + + // List + all, err := db.ListExecutions("", "", "") + if err != nil { + t.Fatalf("list: %v", err) + } + if len(all) != 1 { + t.Errorf("list = %d, want 1", len(all)) + } + + byPipeline, _ := db.ListExecutions("tick_to_ohlcv_go_finance", "", "") + if len(byPipeline) != 1 { + t.Errorf("list by pipeline = %d, want 1", len(byPipeline)) + } + + byStatus, _ := db.ListExecutions("", "", ExecFailure) + if len(byStatus) != 0 { + t.Errorf("list by failure status = %d, want 0", len(byStatus)) + } +} + +func TestAssertionCRUD(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test", + Metadata: map[string]any{"close": 120, "high": 150, "low": 90}}) + + a := &Assertion{ + ID: "assert_close_positive", + EntityID: "e1", + Name: "close positivo", + Kind: "range", + Rule: "close > 0", + Severity: SeverityCritical, + Active: true, + } + + if err := InsertAssertionSafe(db, a); err != nil { + t.Fatalf("insert: %v", err) + } + + got, err := db.GetAssertion("assert_close_positive") + if err != nil { + t.Fatalf("get: %v", err) + } + if got == nil { + t.Fatal("expected assertion, got nil") + } + if got.Rule != "close > 0" { + t.Errorf("rule = %q", got.Rule) + } + if !got.Active { + t.Error("expected active = true") + } + + // Update + got.Active = false + if err := db.UpdateAssertion(got); err != nil { + t.Fatalf("update: %v", err) + } + updated, _ := db.GetAssertion("assert_close_positive") + if updated.Active { + t.Error("expected active = false after update") + } + + // List with active filter + active := true + byActive, _ := db.ListAssertions("e1", &active) + if len(byActive) != 0 { + t.Errorf("list active = %d, want 0 (we deactivated it)", len(byActive)) + } + + // Search FTS + updated.Active = true + db.UpdateAssertion(updated) + found, err := db.SearchAssertions("close", "") + if err != nil { + t.Fatalf("search: %v", err) + } + if len(found) != 1 { + t.Errorf("search 'close' = %d, want 1", len(found)) + } + + // Delete + if err := db.DeleteAssertion("assert_close_positive"); err != nil { + t.Fatalf("delete: %v", err) + } + deleted, _ := db.GetAssertion("assert_close_positive") + if deleted != nil { + t.Error("expected nil after delete") + } +} + +func TestAssertionResultCRUD(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test"}) + db.InsertAssertion(&Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning, Active: true}) + + ar := &AssertionResult{ + ID: "ar_1", + AssertionID: "a1", + ExecutionID: "exec_1", + Status: ResultFail, + Value: map[string]any{"x": -5}, + Message: "rule failed: x > 0", + EvaluatedAt: time.Now().UTC(), + } + + if err := db.InsertAssertionResult(ar); err != nil { + t.Fatalf("insert: %v", err) + } + + got, err := db.GetAssertionResult("ar_1") + if err != nil { + t.Fatalf("get: %v", err) + } + if got == nil { + t.Fatal("expected result, got nil") + } + if got.Status != ResultFail { + t.Errorf("status = %q, want fail", got.Status) + } + + // List by assertion + byAssertion, _ := db.ListAssertionResults("a1", "") + if len(byAssertion) != 1 { + t.Errorf("list by assertion = %d, want 1", len(byAssertion)) + } + + // List by execution + byExec, _ := db.ListAssertionResults("", "exec_1") + if len(byExec) != 1 { + t.Errorf("list by execution = %d, want 1", len(byExec)) + } +} + +func TestEvalAssertion(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ + ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test", + Metadata: map[string]any{"close": 120, "high": 150, "low": 90, "open": 100}, + }) + + tests := []struct { + name string + rule string + kind string + wantStatus AssertionResultStatus + }{ + {"range pass", "close > 0 AND close < 1000000", "range", ResultPass}, + {"range fail", "close > 200", "range", ResultFail}, + {"consistency pass", "low <= close AND close <= high", "consistency", ResultPass}, + {"consistency fail", "low > close", "consistency", ResultFail}, + {"null pass (field exists)", "close IS NOT NULL", "null", ResultPass}, + {"json_extract direct", "json_extract(metadata, '$.close') > 0", "range", ResultPass}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := &Assertion{ + ID: "test_" + tt.name, EntityID: "e1", Name: tt.name, + Kind: tt.kind, Rule: tt.rule, Severity: SeverityWarning, Active: true, + } + result, err := EvalAssertion(db, a, "") + if err != nil { + t.Fatalf("eval: %v", err) + } + if result.Status != tt.wantStatus { + t.Errorf("status = %q, want %q (message: %s)", result.Status, tt.wantStatus, result.Message) + } + }) + } +} + +func TestEvalEntityAssertions(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ + ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test", + Metadata: map[string]any{"close": 120}, + }) + + db.InsertAssertion(&Assertion{ID: "a1", EntityID: "e1", Name: "pass", Kind: "range", Rule: "close > 0", Severity: SeverityWarning, Active: true}) + db.InsertAssertion(&Assertion{ID: "a2", EntityID: "e1", Name: "fail", Kind: "range", Rule: "close > 200", Severity: SeverityCritical, Active: true}) + db.InsertAssertion(&Assertion{ID: "a3", EntityID: "e1", Name: "inactive", Kind: "range", Rule: "close > 999", Severity: SeverityInfo, Active: false}) + + results, err := EvalEntityAssertions(db, "e1", "exec_test") + if err != nil { + t.Fatalf("eval: %v", err) + } + + // Should only eval active assertions (a1, a2), not a3 + if len(results) != 2 { + t.Fatalf("expected 2 results, got %d", len(results)) + } + + pass, fail := 0, 0 + for _, r := range results { + switch r.Status { + case ResultPass: + pass++ + case ResultFail: + fail++ + } + if r.ExecutionID != "exec_test" { + t.Errorf("execution_id = %q, want exec_test", r.ExecutionID) + } + } + if pass != 1 || fail != 1 { + t.Errorf("pass=%d fail=%d, want 1 and 1", pass, fail) + } + + // Results should be persisted + stored, _ := db.ListAssertionResults("", "exec_test") + if len(stored) != 2 { + t.Errorf("stored results = %d, want 2", len(stored)) + } +} + +func TestReactCriticalCorruptsEntity(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ + ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test", + Metadata: map[string]any{"close": 120}, + }) + db.InsertAssertion(&Assertion{ID: "a_crit", EntityID: "e1", Name: "critical_fail", Kind: "range", Rule: "close > 200", Severity: SeverityCritical, Active: true}) + + // Simulate failed assertion result + results := []AssertionResult{ + {ID: "ar1", AssertionID: "a_crit", Status: ResultFail, Message: "close > 200 failed", EvaluatedAt: time.Now().UTC()}, + } + + rr, err := React(db, nil, results) + if err != nil { + t.Fatalf("react: %v", err) + } + + if len(rr.EntityUpdates) != 1 { + t.Fatalf("expected 1 entity update, got %d", len(rr.EntityUpdates)) + } + if rr.EntityUpdates[0].NewStatus != StatusCorrupted { + t.Errorf("new status = %q, want corrupted", rr.EntityUpdates[0].NewStatus) + } + + // Verify entity in DB + entity, _ := db.GetEntity("e1") + if entity.Status != StatusCorrupted { + t.Errorf("entity status = %q, want corrupted", entity.Status) + } +} + +func TestReactWarningStalesEntity(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ + ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test", + Metadata: map[string]any{"close": 120}, + }) + db.InsertAssertion(&Assertion{ID: "a_warn", EntityID: "e1", Name: "warn_fail", Kind: "range", Rule: "close > 200", Severity: SeverityWarning, Active: true}) + + results := []AssertionResult{ + {ID: "ar1", AssertionID: "a_warn", Status: ResultFail, Message: "warning", EvaluatedAt: time.Now().UTC()}, + } + + rr, err := React(db, nil, results) + if err != nil { + t.Fatalf("react: %v", err) + } + + if len(rr.EntityUpdates) != 1 { + t.Fatalf("expected 1 entity update, got %d", len(rr.EntityUpdates)) + } + if rr.EntityUpdates[0].NewStatus != StatusStale { + t.Errorf("new status = %q, want stale", rr.EntityUpdates[0].NewStatus) + } +} + +func TestReactWarningDoesNotDowngradeCorrupted(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ + ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusCorrupted, Source: "test", + }) + db.InsertAssertion(&Assertion{ID: "a_warn", EntityID: "e1", Name: "warn", Kind: "range", Rule: "x > 0", Severity: SeverityWarning, Active: true}) + + results := []AssertionResult{ + {ID: "ar1", AssertionID: "a_warn", Status: ResultFail, EvaluatedAt: time.Now().UTC()}, + } + + rr, err := React(db, nil, results) + if err != nil { + t.Fatalf("react: %v", err) + } + + // Warning should NOT change corrupted entity to stale + if len(rr.EntityUpdates) != 0 { + t.Errorf("expected no entity updates, got %d", len(rr.EntityUpdates)) + } +} + +func TestReactInfoNoStatusChange(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ + ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test", + }) + db.InsertAssertion(&Assertion{ID: "a_info", EntityID: "e1", Name: "info", Kind: "range", Rule: "x > 0", Severity: SeverityInfo, Active: true}) + + results := []AssertionResult{ + {ID: "ar1", AssertionID: "a_info", Status: ResultFail, EvaluatedAt: time.Now().UTC()}, + } + + rr, err := React(db, nil, results) + if err != nil { + t.Fatalf("react: %v", err) + } + + if len(rr.EntityUpdates) != 0 { + t.Errorf("info fail should not change status, got %d updates", len(rr.EntityUpdates)) + } +} + +func TestValidateExecution(t *testing.T) { + tests := []struct { + name string + e Execution + wantErr bool + }{ + {"valid", Execution{ID: "e1", PipelineID: "p1", Status: ExecSuccess, StartedAt: time.Now()}, false}, + {"missing id", Execution{PipelineID: "p1", Status: ExecSuccess, StartedAt: time.Now()}, true}, + {"missing pipeline", Execution{ID: "e1", Status: ExecSuccess, StartedAt: time.Now()}, true}, + {"missing status", Execution{ID: "e1", PipelineID: "p1", StartedAt: time.Now()}, true}, + {"invalid status", Execution{ID: "e1", PipelineID: "p1", Status: "invalid", StartedAt: time.Now()}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ValidateExecution(&tt.e) + if (err != nil) != tt.wantErr { + t.Errorf("error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestValidateAssertion(t *testing.T) { + known := map[string]bool{"e1": true} + tests := []struct { + name string + a Assertion + wantErr bool + }{ + {"valid", Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning}, false}, + {"missing entity", Assertion{ID: "a1", EntityID: "unknown", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning}, true}, + {"missing rule", Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Severity: SeverityWarning}, true}, + {"missing kind", Assertion{ID: "a1", EntityID: "e1", Name: "test", Rule: "x > 0", Severity: SeverityWarning}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ValidateAssertion(&tt.a, known) + if (err != nil) != tt.wantErr { + t.Errorf("error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestMigrations(t *testing.T) { + db := tempDB(t) + + var count int + err := db.conn.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count) + if err != nil { + t.Fatalf("query: %v", err) + } + if count < 2 { + t.Errorf("expected at least 2 migrations, got %d", count) + } + + // Verify all new tables exist + for _, table := range []string{"executions", "assertions", "assertion_results"} { + _, err := db.conn.Exec("SELECT 1 FROM " + table + " LIMIT 1") + if err != nil { + t.Errorf("table %s should exist: %v", table, err) + } + } +} + +func TestRecordExecutionWithResults(t *testing.T) { + db := tempDB(t) + + db.InsertTypeSnapshot(&TypeSnapshot{ID: "t1", Version: "1.0.0", Lang: "go", Algebraic: "product"}) + db.InsertEntity(&Entity{ID: "e1", Name: "e1", TypeRef: "t1", Status: StatusActive, Source: "test"}) + db.InsertAssertion(&Assertion{ID: "a1", EntityID: "e1", Name: "test", Kind: "range", Rule: "x > 0", Severity: SeverityWarning, Active: true}) + + started := time.Now().UTC() + ended := started.Add(5 * time.Second) + e := &Execution{ + ID: "exec_tx", PipelineID: "p1", Status: ExecSuccess, + StartedAt: started, EndedAt: &ended, + } + results := []AssertionResult{ + {ID: "ar_tx_1", AssertionID: "a1", ExecutionID: "exec_tx", Status: ResultPass, EvaluatedAt: time.Now().UTC()}, + } + + if err := RecordExecutionWithResults(db, e, results); err != nil { + t.Fatalf("record: %v", err) + } + + // Verify both were persisted + gotExec, _ := db.GetExecution("exec_tx") + if gotExec == nil { + t.Fatal("execution not found") + } + + gotResult, _ := db.GetAssertionResult("ar_tx_1") + if gotResult == nil { + t.Fatal("assertion result not found") + } +} + func TestGetEntityGraph(t *testing.T) { db := tempDB(t) diff --git a/fn_operations/store.go b/fn_operations/store.go index ed1f47c3..592b8e7d 100644 --- a/fn_operations/store.go +++ b/fn_operations/store.go @@ -471,3 +471,346 @@ func (db *DB) DeleteRelationInputs(relationID string) error { _, err := db.conn.Exec("DELETE FROM relation_inputs WHERE relation_id = ?", relationID) return err } + +// --- Execution CRUD --- + +// InsertExecution inserts an execution record. +func (db *DB) InsertExecution(e *Execution) error { + if e.CreatedAt.IsZero() { + e.CreatedAt = time.Now().UTC() + } + + var endedAt *string + if e.EndedAt != nil { + s := e.EndedAt.Format(time.RFC3339) + endedAt = &s + } + + _, err := db.conn.Exec(` + INSERT OR REPLACE INTO executions ( + id, pipeline_id, relation_id, status, started_at, ended_at, + duration_ms, records_in, records_out, error, metrics, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + e.ID, e.PipelineID, e.RelationID, string(e.Status), + e.StartedAt.Format(time.RFC3339), endedAt, + e.DurationMs, e.RecordsIn, e.RecordsOut, e.Error, + marshalJSON(e.Metrics), e.CreatedAt.Format(time.RFC3339), + ) + return err +} + +// GetExecution returns an execution by ID. +func (db *DB) GetExecution(id string) (*Execution, error) { + row := db.conn.QueryRow(` + SELECT id, pipeline_id, relation_id, status, started_at, ended_at, + duration_ms, records_in, records_out, error, metrics, created_at + FROM executions WHERE id = ?`, id) + + var e Execution + var metricsJSON, createdAt, startedAt string + var endedAt *string + err := row.Scan(&e.ID, &e.PipelineID, &e.RelationID, &e.Status, + &startedAt, &endedAt, + &e.DurationMs, &e.RecordsIn, &e.RecordsOut, &e.Error, + &metricsJSON, &createdAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("scanning execution: %w", err) + } + e.Metrics = unmarshalJSON(metricsJSON) + e.StartedAt, _ = time.Parse(time.RFC3339, startedAt) + e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt) + if endedAt != nil { + t, _ := time.Parse(time.RFC3339, *endedAt) + e.EndedAt = &t + } + return &e, nil +} + +// ListExecutions returns executions filtered by pipeline, relation, and/or status. +func (db *DB) ListExecutions(pipelineID, relationID string, status ExecutionStatus) ([]Execution, error) { + where := []string{} + args := []any{} + if pipelineID != "" { + where = append(where, "pipeline_id = ?") + args = append(args, pipelineID) + } + if relationID != "" { + where = append(where, "relation_id = ?") + args = append(args, relationID) + } + if status != "" { + where = append(where, "status = ?") + args = append(args, string(status)) + } + + q := `SELECT id, pipeline_id, relation_id, status, started_at, ended_at, + duration_ms, records_in, records_out, error, metrics, created_at + FROM executions` + if len(where) > 0 { + q += " WHERE " + strings.Join(where, " AND ") + } + q += " ORDER BY created_at DESC" + + rows, err := db.conn.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + return scanExecutions(rows) +} + +func scanExecutions(rows *sql.Rows) ([]Execution, error) { + var result []Execution + for rows.Next() { + var e Execution + var metricsJSON, createdAt, startedAt string + var endedAt *string + if err := rows.Scan(&e.ID, &e.PipelineID, &e.RelationID, &e.Status, + &startedAt, &endedAt, + &e.DurationMs, &e.RecordsIn, &e.RecordsOut, &e.Error, + &metricsJSON, &createdAt); err != nil { + return nil, fmt.Errorf("scanning execution: %w", err) + } + e.Metrics = unmarshalJSON(metricsJSON) + e.StartedAt, _ = time.Parse(time.RFC3339, startedAt) + e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt) + if endedAt != nil { + t, _ := time.Parse(time.RFC3339, *endedAt) + e.EndedAt = &t + } + result = append(result, e) + } + return result, nil +} + +// --- Assertion CRUD --- + +// InsertAssertion inserts or replaces an assertion. +func (db *DB) InsertAssertion(a *Assertion) error { + if a.CreatedAt.IsZero() { + a.CreatedAt = time.Now().UTC() + } + + active := 0 + if a.Active { + active = 1 + } + + _, err := db.conn.Exec(` + INSERT OR REPLACE INTO assertions ( + id, entity_id, name, kind, rule, severity, description, active, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + a.ID, a.EntityID, a.Name, a.Kind, a.Rule, + string(a.Severity), a.Description, active, + a.CreatedAt.Format(time.RFC3339), + ) + return err +} + +// GetAssertion returns an assertion by ID. +func (db *DB) GetAssertion(id string) (*Assertion, error) { + row := db.conn.QueryRow(` + SELECT id, entity_id, name, kind, rule, severity, description, active, created_at + FROM assertions WHERE id = ?`, id) + + var a Assertion + var active int + var createdAt string + err := row.Scan(&a.ID, &a.EntityID, &a.Name, &a.Kind, &a.Rule, + &a.Severity, &a.Description, &active, &createdAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("scanning assertion: %w", err) + } + a.Active = active == 1 + a.CreatedAt, _ = time.Parse(time.RFC3339, createdAt) + return &a, nil +} + +// UpdateAssertion updates an existing assertion. +func (db *DB) UpdateAssertion(a *Assertion) error { + active := 0 + if a.Active { + active = 1 + } + + _, err := db.conn.Exec(` + UPDATE assertions SET entity_id=?, name=?, kind=?, rule=?, severity=?, + description=?, active=? + WHERE id=?`, + a.EntityID, a.Name, a.Kind, a.Rule, string(a.Severity), + a.Description, active, a.ID, + ) + return err +} + +// DeleteAssertion removes an assertion by ID. +func (db *DB) DeleteAssertion(id string) error { + _, err := db.conn.Exec("DELETE FROM assertions WHERE id = ?", id) + return err +} + +// ListAssertions returns assertions filtered by entity and/or active state. +func (db *DB) ListAssertions(entityID string, active *bool) ([]Assertion, error) { + where := []string{} + args := []any{} + if entityID != "" { + where = append(where, "entity_id = ?") + args = append(args, entityID) + } + if active != nil { + v := 0 + if *active { + v = 1 + } + where = append(where, "active = ?") + args = append(args, v) + } + + q := `SELECT id, entity_id, name, kind, rule, severity, description, active, created_at + FROM assertions` + if len(where) > 0 { + q += " WHERE " + strings.Join(where, " AND ") + } + q += " ORDER BY name" + + rows, err := db.conn.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + return scanAssertions(rows) +} + +// SearchAssertions performs FTS search on assertions. +func (db *DB) SearchAssertions(query, entityID string) ([]Assertion, error) { + where := []string{} + args := []any{} + if query != "" { + where = append(where, "a.id IN (SELECT id FROM assertions_fts WHERE assertions_fts MATCH ?)") + args = append(args, query) + } + if entityID != "" { + where = append(where, "a.entity_id = ?") + args = append(args, entityID) + } + + q := `SELECT a.id, a.entity_id, a.name, a.kind, a.rule, a.severity, a.description, a.active, a.created_at + FROM assertions a` + if len(where) > 0 { + q += " WHERE " + strings.Join(where, " AND ") + } + q += " ORDER BY a.name" + + rows, err := db.conn.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + return scanAssertions(rows) +} + +func scanAssertions(rows *sql.Rows) ([]Assertion, error) { + var result []Assertion + for rows.Next() { + var a Assertion + var active int + var createdAt string + if err := rows.Scan(&a.ID, &a.EntityID, &a.Name, &a.Kind, &a.Rule, + &a.Severity, &a.Description, &active, &createdAt); err != nil { + return nil, fmt.Errorf("scanning assertion: %w", err) + } + a.Active = active == 1 + a.CreatedAt, _ = time.Parse(time.RFC3339, createdAt) + result = append(result, a) + } + return result, nil +} + +// --- AssertionResult CRUD --- + +// InsertAssertionResult inserts an assertion result. +func (db *DB) InsertAssertionResult(ar *AssertionResult) error { + if ar.EvaluatedAt.IsZero() { + ar.EvaluatedAt = time.Now().UTC() + } + + _, err := db.conn.Exec(` + INSERT OR REPLACE INTO assertion_results ( + id, assertion_id, execution_id, status, value, message, evaluated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?)`, + ar.ID, ar.AssertionID, ar.ExecutionID, string(ar.Status), + marshalJSON(ar.Value), ar.Message, ar.EvaluatedAt.Format(time.RFC3339), + ) + return err +} + +// GetAssertionResult returns an assertion result by ID. +func (db *DB) GetAssertionResult(id string) (*AssertionResult, error) { + row := db.conn.QueryRow(` + SELECT id, assertion_id, execution_id, status, value, message, evaluated_at + FROM assertion_results WHERE id = ?`, id) + + var ar AssertionResult + var valueJSON, evaluatedAt string + err := row.Scan(&ar.ID, &ar.AssertionID, &ar.ExecutionID, &ar.Status, + &valueJSON, &ar.Message, &evaluatedAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("scanning assertion_result: %w", err) + } + ar.Value = unmarshalJSON(valueJSON) + ar.EvaluatedAt, _ = time.Parse(time.RFC3339, evaluatedAt) + return &ar, nil +} + +// ListAssertionResults returns results filtered by assertion and/or execution. +func (db *DB) ListAssertionResults(assertionID, executionID string) ([]AssertionResult, error) { + where := []string{} + args := []any{} + if assertionID != "" { + where = append(where, "assertion_id = ?") + args = append(args, assertionID) + } + if executionID != "" { + where = append(where, "execution_id = ?") + args = append(args, executionID) + } + + q := `SELECT id, assertion_id, execution_id, status, value, message, evaluated_at + FROM assertion_results` + if len(where) > 0 { + q += " WHERE " + strings.Join(where, " AND ") + } + q += " ORDER BY evaluated_at DESC" + + rows, err := db.conn.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + return scanAssertionResults(rows) +} + +func scanAssertionResults(rows *sql.Rows) ([]AssertionResult, error) { + var result []AssertionResult + for rows.Next() { + var ar AssertionResult + var valueJSON, evaluatedAt string + if err := rows.Scan(&ar.ID, &ar.AssertionID, &ar.ExecutionID, &ar.Status, + &valueJSON, &ar.Message, &evaluatedAt); err != nil { + return nil, fmt.Errorf("scanning assertion_result: %w", err) + } + ar.Value = unmarshalJSON(valueJSON) + ar.EvaluatedAt, _ = time.Parse(time.RFC3339, evaluatedAt) + result = append(result, ar) + } + return result, nil +} diff --git a/fn_operations/validate.go b/fn_operations/validate.go index 5eb759a0..6719c879 100644 --- a/fn_operations/validate.go +++ b/fn_operations/validate.go @@ -130,6 +130,100 @@ func ValidateRelationInputs(inputs []RelationInput, knownEntities map[string]boo return nil } +// ValidateExecution checks execution integrity rules. +func ValidateExecution(e *Execution) *ValidationError { + var errs []string + + if e.ID == "" { + errs = append(errs, "id is required") + } + if e.PipelineID == "" { + errs = append(errs, "pipeline_id is required") + } + + switch e.Status { + case ExecSuccess, ExecFailure, ExecPartial: + case "": + errs = append(errs, "status is required") + default: + errs = append(errs, fmt.Sprintf("invalid status: %s", e.Status)) + } + + if e.StartedAt.IsZero() { + errs = append(errs, "started_at is required") + } + + if len(errs) > 0 { + return &ValidationError{ID: e.ID, Errors: errs} + } + return nil +} + +// ValidateAssertion checks assertion integrity rules. +func ValidateAssertion(a *Assertion, knownEntities map[string]bool) *ValidationError { + var errs []string + + if a.ID == "" { + errs = append(errs, "id is required") + } + if a.EntityID == "" { + errs = append(errs, "entity_id is required") + } else if knownEntities != nil && !knownEntities[a.EntityID] { + errs = append(errs, fmt.Sprintf("entity_id references unknown entity: %s", a.EntityID)) + } + if a.Name == "" { + errs = append(errs, "name is required") + } + if a.Kind == "" { + errs = append(errs, "kind is required") + } + if a.Rule == "" { + errs = append(errs, "rule is required") + } + + switch a.Severity { + case SeverityCritical, SeverityWarning, SeverityInfo: + case "": + errs = append(errs, "severity is required") + default: + errs = append(errs, fmt.Sprintf("invalid severity: %s", a.Severity)) + } + + if len(errs) > 0 { + return &ValidationError{ID: a.ID, Errors: errs} + } + return nil +} + +// ValidateAssertionResult checks assertion result integrity. +func ValidateAssertionResult(ar *AssertionResult) *ValidationError { + var errs []string + + if ar.ID == "" { + errs = append(errs, "id is required") + } + if ar.AssertionID == "" { + errs = append(errs, "assertion_id is required") + } + + switch ar.Status { + case ResultPass, ResultFail, ResultSkip: + case "": + errs = append(errs, "status is required") + default: + errs = append(errs, fmt.Sprintf("invalid status: %s", ar.Status)) + } + + if ar.EvaluatedAt.IsZero() { + errs = append(errs, "evaluated_at is required") + } + + if len(errs) > 0 { + return &ValidationError{ID: ar.ID, Errors: errs} + } + return nil +} + // DetectCycle checks if adding a causal relation (from -> to) creates a cycle. // Only considers relations where via != "" (causal/transformational). // Semantic relations (via == "") are exempt from cycle detection.