9ba1f86c34
Añade Execution, Assertion, AssertionResult al paquete fn_operations. Motor de evaluación de assertions con reescritura SQL automática. Bucle reactivo: ExecuteAndReact evalúa assertions y cambia status de entities (corrupted/stale) + auto-crea proposals en registry. CLI fn ops: assertion (add/list/show/delete/eval) y execution (add/list/show). Migración 002_executions_assertions.sql con FTS para assertions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
561 lines
15 KiB
Go
561 lines
15 KiB
Go
package fn_operations
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
"fn-registry/registry"
|
|
)
|
|
|
|
// InsertEntityWithSnapshot inserts an entity, snapshotting its type from the registry if needed.
|
|
// registryDB can be nil if the type is already snapshotted.
|
|
func InsertEntityWithSnapshot(opsDB *DB, registryDB *registry.DB, e *Entity) error {
|
|
if err := ValidateEntity(e); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check if type is already snapshotted
|
|
snap, err := opsDB.GetTypeSnapshot(e.TypeRef)
|
|
if err != nil {
|
|
return fmt.Errorf("checking type snapshot: %w", err)
|
|
}
|
|
|
|
if snap == nil {
|
|
// Need to fetch from registry
|
|
if registryDB == nil {
|
|
return fmt.Errorf("type %q not found in local snapshots and no registry provided", e.TypeRef)
|
|
}
|
|
if err := SnapshotType(opsDB, registryDB, e.TypeRef); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return opsDB.InsertEntity(e)
|
|
}
|
|
|
|
// SnapshotType fetches a type from the registry and copies it to types_snapshot.
|
|
func SnapshotType(opsDB *DB, registryDB *registry.DB, typeID string) error {
|
|
t, err := registryDB.GetType(typeID)
|
|
if err != nil {
|
|
return fmt.Errorf("fetching type %q from registry: %w", typeID, err)
|
|
}
|
|
|
|
snap := &TypeSnapshot{
|
|
ID: t.ID,
|
|
Version: t.Version,
|
|
Lang: t.Lang,
|
|
Algebraic: string(t.Algebraic),
|
|
Definition: t.Definition,
|
|
Description: t.Description,
|
|
SnappedAt: time.Now().UTC(),
|
|
}
|
|
|
|
return opsDB.InsertTypeSnapshot(snap)
|
|
}
|
|
|
|
// InsertRelationSafe validates, checks for cycles, and inserts a relation.
|
|
func InsertRelationSafe(db *DB, r *Relation) error {
|
|
entities, err := buildEntitySet(db)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := ValidateRelation(r, entities); err != nil {
|
|
return err
|
|
}
|
|
|
|
// from_entity is required when not using relation_inputs
|
|
if r.FromEntity == "" {
|
|
return fmt.Errorf("from_entity is required (use InsertRelationWithInputs for multi-input relations)")
|
|
}
|
|
|
|
// Cycle detection only for causal relations
|
|
if r.Via != "" {
|
|
if err := DetectCycle(db, r.FromEntity, r.ToEntity); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return db.InsertRelation(r)
|
|
}
|
|
|
|
// InsertRelationWithInputs validates and inserts a relation with multiple inputs in a transaction.
|
|
func InsertRelationWithInputs(db *DB, r *Relation, inputs []RelationInput) error {
|
|
entities, err := buildEntitySet(db)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := ValidateRelation(r, entities); err != nil {
|
|
return err
|
|
}
|
|
if err := ValidateRelationInputs(inputs, entities); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Cycle detection for each input if causal
|
|
if r.Via != "" {
|
|
for _, input := range inputs {
|
|
if err := DetectCycle(db, input.EntityID, r.ToEntity); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
tx, err := db.Conn().Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("beginning transaction: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
// Insert relation
|
|
now := time.Now().UTC()
|
|
if r.CreatedAt.IsZero() {
|
|
r.CreatedAt = now
|
|
}
|
|
r.UpdatedAt = now
|
|
|
|
var startedAt, endedAt *string
|
|
if r.StartedAt != nil {
|
|
s := r.StartedAt.Format(time.RFC3339)
|
|
startedAt = &s
|
|
}
|
|
if r.EndedAt != nil {
|
|
s := r.EndedAt.Format(time.RFC3339)
|
|
endedAt = &s
|
|
}
|
|
|
|
_, err = tx.Exec(`
|
|
INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
r.ID, r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
|
|
r.Purity, string(r.Direction), r.Weight, string(r.Status),
|
|
startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes,
|
|
r.CreatedAt.Format(time.RFC3339), r.UpdatedAt.Format(time.RFC3339),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("inserting relation: %w", err)
|
|
}
|
|
|
|
// Insert inputs
|
|
for _, ri := range inputs {
|
|
_, err = tx.Exec(`
|
|
INSERT INTO relation_inputs (id, relation_id, entity_id, role, "order")
|
|
VALUES (?, ?, ?, ?, ?)`,
|
|
ri.ID, ri.RelationID, ri.EntityID, ri.Role, ri.Order,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("inserting relation_input: %w", err)
|
|
}
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// Graph holds the full entity-relation graph for a project.
|
|
type Graph struct {
|
|
Entities []Entity
|
|
Relations []Relation
|
|
Inputs map[string][]RelationInput
|
|
}
|
|
|
|
// GetEntityGraph returns all entities and relations for visualization.
|
|
func GetEntityGraph(db *DB) (*Graph, error) {
|
|
entities, err := db.ListEntities("", "")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing entities: %w", err)
|
|
}
|
|
|
|
relations, err := db.ListRelations("")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing relations: %w", err)
|
|
}
|
|
|
|
inputs := map[string][]RelationInput{}
|
|
for _, r := range relations {
|
|
ri, err := db.GetRelationInputs(r.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting inputs for relation %s: %w", r.ID, err)
|
|
}
|
|
if len(ri) > 0 {
|
|
inputs[r.ID] = ri
|
|
}
|
|
}
|
|
|
|
return &Graph{
|
|
Entities: entities,
|
|
Relations: relations,
|
|
Inputs: inputs,
|
|
}, nil
|
|
}
|
|
|
|
// SnapshotStatus classifies a local type snapshot relative to the registry.
type SnapshotStatus string

// Values a SnapshotStatus can take.
const (
	// SnapshotUpToDate means the local and registry versions are equal.
	SnapshotUpToDate = SnapshotStatus("up_to_date")
	// SnapshotOutdated means the registry version differs from the local one.
	SnapshotOutdated = SnapshotStatus("outdated")
	// SnapshotMissing means the type exists locally but not in the registry.
	SnapshotMissing = SnapshotStatus("missing")
)
|
|
|
|
// SnapshotCheckResult holds the comparison for one type snapshot.
|
|
type SnapshotCheckResult struct {
|
|
ID string
|
|
LocalVersion string
|
|
RegistryVersion string
|
|
Status SnapshotStatus
|
|
}
|
|
|
|
// CheckSnapshots compares all local snapshots against the registry.
|
|
func CheckSnapshots(opsDB *DB, registryDB *registry.DB) ([]SnapshotCheckResult, error) {
|
|
snaps, err := opsDB.ListTypeSnapshots()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing snapshots: %w", err)
|
|
}
|
|
|
|
var results []SnapshotCheckResult
|
|
for _, snap := range snaps {
|
|
regType, err := registryDB.GetType(snap.ID)
|
|
if err != nil {
|
|
// Not found in registry
|
|
results = append(results, SnapshotCheckResult{
|
|
ID: snap.ID,
|
|
LocalVersion: snap.Version,
|
|
Status: SnapshotMissing,
|
|
})
|
|
continue
|
|
}
|
|
|
|
status := SnapshotUpToDate
|
|
if regType.Version != snap.Version {
|
|
status = SnapshotOutdated
|
|
}
|
|
|
|
results = append(results, SnapshotCheckResult{
|
|
ID: snap.ID,
|
|
LocalVersion: snap.Version,
|
|
RegistryVersion: regType.Version,
|
|
Status: status,
|
|
})
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
// UpdateSnapshot re-snapshots a type from the registry, replacing the local copy.
|
|
// Returns the old and new definitions for diffing.
|
|
func UpdateSnapshot(opsDB *DB, registryDB *registry.DB, typeID string) (old, new_ *TypeSnapshot, err error) {
|
|
// Get current local snapshot
|
|
oldSnap, err := opsDB.GetTypeSnapshot(typeID)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("reading local snapshot: %w", err)
|
|
}
|
|
if oldSnap == nil {
|
|
return nil, nil, fmt.Errorf("type %q not found in local snapshots", typeID)
|
|
}
|
|
|
|
// Get current registry type
|
|
regType, err := registryDB.GetType(typeID)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("fetching type %q from registry: %w", typeID, err)
|
|
}
|
|
|
|
// Build new snapshot
|
|
newSnap := &TypeSnapshot{
|
|
ID: regType.ID,
|
|
Version: regType.Version,
|
|
Lang: regType.Lang,
|
|
Algebraic: string(regType.Algebraic),
|
|
Definition: regType.Definition,
|
|
Description: regType.Description,
|
|
SnappedAt: time.Now().UTC(),
|
|
}
|
|
|
|
if err := opsDB.UpdateTypeSnapshot(newSnap); err != nil {
|
|
return nil, nil, fmt.Errorf("updating snapshot: %w", err)
|
|
}
|
|
|
|
return oldSnap, newSnap, nil
|
|
}
|
|
|
|
// InsertExecutionSafe validates and inserts an execution.
|
|
// Auto-calculates duration_ms if both started_at and ended_at are set.
|
|
func InsertExecutionSafe(db *DB, e *Execution) error {
|
|
if err := ValidateExecution(e); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Auto-calculate duration
|
|
if e.EndedAt != nil && !e.StartedAt.IsZero() && e.DurationMs == nil {
|
|
ms := e.EndedAt.Sub(e.StartedAt).Milliseconds()
|
|
e.DurationMs = &ms
|
|
}
|
|
|
|
return db.InsertExecution(e)
|
|
}
|
|
|
|
// InsertAssertionSafe validates that the entity exists, then inserts the assertion.
|
|
func InsertAssertionSafe(db *DB, a *Assertion) error {
|
|
entities, err := buildEntitySet(db)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := ValidateAssertion(a, entities); err != nil {
|
|
return err
|
|
}
|
|
|
|
return db.InsertAssertion(a)
|
|
}
|
|
|
|
// RecordExecutionWithResults inserts an execution and its assertion results in a transaction.
|
|
func RecordExecutionWithResults(db *DB, e *Execution, results []AssertionResult) error {
|
|
if err := ValidateExecution(e); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Auto-calculate duration
|
|
if e.EndedAt != nil && !e.StartedAt.IsZero() && e.DurationMs == nil {
|
|
ms := e.EndedAt.Sub(e.StartedAt).Milliseconds()
|
|
e.DurationMs = &ms
|
|
}
|
|
|
|
tx, err := db.Conn().Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("beginning transaction: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
// Insert execution
|
|
if e.CreatedAt.IsZero() {
|
|
e.CreatedAt = time.Now().UTC()
|
|
}
|
|
var endedAt *string
|
|
if e.EndedAt != nil {
|
|
s := e.EndedAt.Format(time.RFC3339)
|
|
endedAt = &s
|
|
}
|
|
_, err = tx.Exec(`
|
|
INSERT OR REPLACE INTO executions (
|
|
id, pipeline_id, relation_id, status, started_at, ended_at,
|
|
duration_ms, records_in, records_out, error, metrics, created_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
e.ID, e.PipelineID, e.RelationID, string(e.Status),
|
|
e.StartedAt.Format(time.RFC3339), endedAt,
|
|
e.DurationMs, e.RecordsIn, e.RecordsOut, e.Error,
|
|
marshalJSON(e.Metrics), e.CreatedAt.Format(time.RFC3339),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("inserting execution: %w", err)
|
|
}
|
|
|
|
// Insert assertion results
|
|
for _, ar := range results {
|
|
if ar.EvaluatedAt.IsZero() {
|
|
ar.EvaluatedAt = time.Now().UTC()
|
|
}
|
|
_, err = tx.Exec(`
|
|
INSERT OR REPLACE INTO assertion_results (
|
|
id, assertion_id, execution_id, status, value, message, evaluated_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
|
ar.ID, ar.AssertionID, ar.ExecutionID, string(ar.Status),
|
|
marshalJSON(ar.Value), ar.Message, ar.EvaluatedAt.Format(time.RFC3339),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("inserting assertion_result: %w", err)
|
|
}
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// --- Reactive loop ---
|
|
|
|
// ReactiveResult holds what the reactive loop did after analyzing assertion results.
|
|
type ReactiveResult struct {
|
|
EntityUpdates []EntityStatusChange
|
|
Proposals []string // IDs of proposals created
|
|
}
|
|
|
|
// EntityStatusChange records a status transition triggered by assertions.
|
|
type EntityStatusChange struct {
|
|
EntityID string
|
|
OldStatus EntityStatus
|
|
NewStatus EntityStatus
|
|
Reason string
|
|
}
|
|
|
|
// React analyzes assertion results and applies the severity rules:
|
|
// - critical fail → entity.status = corrupted
|
|
// - warning fail → entity.status = stale (only if currently active)
|
|
// - info fail → no status change
|
|
//
|
|
// If registryDB is provided and critical failures exist, a proposal is auto-created.
|
|
// Returns what changed so the caller can log or display it.
|
|
func React(opsDB *DB, registryDB *registry.DB, results []AssertionResult) (*ReactiveResult, error) {
|
|
rr := &ReactiveResult{}
|
|
|
|
// Group failures by entity, tracking worst severity per entity.
|
|
type entityImpact struct {
|
|
worstSeverity Severity
|
|
failures []AssertionResult
|
|
assertionIDs []string
|
|
}
|
|
impacts := map[string]*entityImpact{}
|
|
|
|
for _, ar := range results {
|
|
if ar.Status != ResultFail {
|
|
continue
|
|
}
|
|
|
|
// Look up the assertion to get entity_id and severity
|
|
a, err := opsDB.GetAssertion(ar.AssertionID)
|
|
if err != nil || a == nil {
|
|
continue
|
|
}
|
|
|
|
imp, ok := impacts[a.EntityID]
|
|
if !ok {
|
|
imp = &entityImpact{}
|
|
impacts[a.EntityID] = imp
|
|
}
|
|
|
|
imp.failures = append(imp.failures, ar)
|
|
imp.assertionIDs = append(imp.assertionIDs, ar.AssertionID)
|
|
|
|
// Track worst severity (critical > warning > info)
|
|
if severityRank(a.Severity) > severityRank(imp.worstSeverity) {
|
|
imp.worstSeverity = a.Severity
|
|
}
|
|
}
|
|
|
|
// Apply status changes per entity
|
|
for entityID, imp := range impacts {
|
|
entity, err := opsDB.GetEntity(entityID)
|
|
if err != nil || entity == nil {
|
|
continue
|
|
}
|
|
|
|
var newStatus EntityStatus
|
|
switch imp.worstSeverity {
|
|
case SeverityCritical:
|
|
newStatus = StatusCorrupted
|
|
case SeverityWarning:
|
|
// Only degrade active → stale, don't touch corrupted/archived
|
|
if entity.Status == StatusActive {
|
|
newStatus = StatusStale
|
|
}
|
|
default:
|
|
continue // info: no status change
|
|
}
|
|
|
|
if newStatus == "" || newStatus == entity.Status {
|
|
continue
|
|
}
|
|
|
|
oldStatus := entity.Status
|
|
entity.Status = newStatus
|
|
if err := opsDB.UpdateEntity(entity); err != nil {
|
|
return nil, fmt.Errorf("updating entity %s status: %w", entityID, err)
|
|
}
|
|
|
|
reason := fmt.Sprintf("%s assertion(s) failed: %v", imp.worstSeverity, imp.assertionIDs)
|
|
rr.EntityUpdates = append(rr.EntityUpdates, EntityStatusChange{
|
|
EntityID: entityID,
|
|
OldStatus: oldStatus,
|
|
NewStatus: newStatus,
|
|
Reason: reason,
|
|
})
|
|
}
|
|
|
|
// Create proposals in registry for critical failures
|
|
if registryDB != nil {
|
|
for entityID, imp := range impacts {
|
|
if imp.worstSeverity != SeverityCritical {
|
|
continue
|
|
}
|
|
|
|
// Build evidence from failures
|
|
failureDetails := make([]map[string]any, 0, len(imp.failures))
|
|
for _, f := range imp.failures {
|
|
failureDetails = append(failureDetails, map[string]any{
|
|
"assertion_id": f.AssertionID,
|
|
"execution_id": f.ExecutionID,
|
|
"message": f.Message,
|
|
"value": f.Value,
|
|
})
|
|
}
|
|
|
|
p := ®istry.Proposal{
|
|
ID: fmt.Sprintf("proposal_react_%s_%d", entityID, time.Now().UnixNano()),
|
|
Kind: registry.ProposalImproveFunction,
|
|
Title: fmt.Sprintf("Critical assertion failures on entity %s", entityID),
|
|
Description: fmt.Sprintf("%d critical assertion(s) failed. Entity marked as corrupted.", len(imp.failures)),
|
|
Evidence: map[string]any{
|
|
"entity_id": entityID,
|
|
"assertion_ids": imp.assertionIDs,
|
|
"failures": failureDetails,
|
|
},
|
|
Status: registry.ProposalPending,
|
|
CreatedBy: "reactive_loop",
|
|
}
|
|
|
|
if err := registryDB.InsertProposal(p); err != nil {
|
|
return nil, fmt.Errorf("creating proposal for entity %s: %w", entityID, err)
|
|
}
|
|
|
|
rr.Proposals = append(rr.Proposals, p.ID)
|
|
}
|
|
}
|
|
|
|
return rr, nil
|
|
}
|
|
|
|
// ExecuteAndReact is the full autonomous loop step:
|
|
// 1. Record the execution
|
|
// 2. Evaluate all active assertions on affected entities
|
|
// 3. React to failures (update entity status, create proposals)
|
|
func ExecuteAndReact(opsDB *DB, registryDB *registry.DB, e *Execution, entityIDs []string) (*ReactiveResult, error) {
|
|
// 1. Record execution
|
|
if err := InsertExecutionSafe(opsDB, e); err != nil {
|
|
return nil, fmt.Errorf("recording execution: %w", err)
|
|
}
|
|
|
|
// 2. Evaluate assertions for each affected entity
|
|
var allResults []AssertionResult
|
|
for _, entityID := range entityIDs {
|
|
results, err := EvalEntityAssertions(opsDB, entityID, e.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("evaluating assertions for entity %s: %w", entityID, err)
|
|
}
|
|
allResults = append(allResults, results...)
|
|
}
|
|
|
|
// 3. React
|
|
return React(opsDB, registryDB, allResults)
|
|
}
|
|
|
|
func severityRank(s Severity) int {
|
|
switch s {
|
|
case SeverityCritical:
|
|
return 3
|
|
case SeverityWarning:
|
|
return 2
|
|
case SeverityInfo:
|
|
return 1
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
func buildEntitySet(db *DB) (map[string]bool, error) {
|
|
all, err := db.ListEntities("", "")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("building entity set: %w", err)
|
|
}
|
|
set := make(map[string]bool, len(all))
|
|
for _, e := range all {
|
|
set[e.ID] = true
|
|
}
|
|
return set, nil
|
|
}
|