package fn_operations import ( "fmt" "time" "fn-registry/registry" ) // InsertEntityWithSnapshot inserts an entity, snapshotting its type from the registry if needed. // registryDB can be nil if the type is already snapshotted. func InsertEntityWithSnapshot(opsDB *DB, registryDB *registry.DB, e *Entity) error { if err := ValidateEntity(e); err != nil { return err } // Check if type is already snapshotted snap, err := opsDB.GetTypeSnapshot(e.TypeRef) if err != nil { return fmt.Errorf("checking type snapshot: %w", err) } if snap == nil { // Need to fetch from registry if registryDB == nil { return fmt.Errorf("type %q not found in local snapshots and no registry provided", e.TypeRef) } if err := SnapshotType(opsDB, registryDB, e.TypeRef); err != nil { return err } } return opsDB.InsertEntity(e) } // SnapshotType fetches a type from the registry and copies it to types_snapshot. func SnapshotType(opsDB *DB, registryDB *registry.DB, typeID string) error { t, err := registryDB.GetType(typeID) if err != nil { return fmt.Errorf("fetching type %q from registry: %w", typeID, err) } snap := &TypeSnapshot{ ID: t.ID, Version: t.Version, Lang: t.Lang, Algebraic: string(t.Algebraic), Definition: t.Definition, Description: t.Description, SnappedAt: time.Now().UTC(), } return opsDB.InsertTypeSnapshot(snap) } // InsertRelationSafe validates, checks for cycles, and inserts a relation. func InsertRelationSafe(db *DB, r *Relation) error { entities, err := buildEntitySet(db) if err != nil { return err } if err := ValidateRelation(r, entities); err != nil { return err } // from_entity is required when not using relation_inputs if r.FromEntity == "" { return fmt.Errorf("from_entity is required (use InsertRelationWithInputs for multi-input relations)") } // Cycle detection only for causal relations if r.Via != "" { if err := DetectCycle(db, r.FromEntity, r.ToEntity); err != nil { return err } } return db.InsertRelation(r) } // InsertRelationWithInputs validates and inserts a relation with multiple inputs in a transaction. func InsertRelationWithInputs(db *DB, r *Relation, inputs []RelationInput) error { entities, err := buildEntitySet(db) if err != nil { return err } if err := ValidateRelation(r, entities); err != nil { return err } if err := ValidateRelationInputs(inputs, entities); err != nil { return err } // Cycle detection for each input if causal if r.Via != "" { for _, input := range inputs { if err := DetectCycle(db, input.EntityID, r.ToEntity); err != nil { return err } } } tx, err := db.Conn().Begin() if err != nil { return fmt.Errorf("beginning transaction: %w", err) } defer tx.Rollback() // Insert relation now := time.Now().UTC() if r.CreatedAt.IsZero() { r.CreatedAt = now } r.UpdatedAt = now var startedAt, endedAt *string if r.StartedAt != nil { s := r.StartedAt.Format(time.RFC3339) startedAt = &s } if r.EndedAt != nil { s := r.EndedAt.Format(time.RFC3339) endedAt = &s } _, err = tx.Exec(` INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, r.ID, r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description, r.Purity, string(r.Direction), r.Weight, string(r.Status), startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes, r.CreatedAt.Format(time.RFC3339), r.UpdatedAt.Format(time.RFC3339), ) if err != nil { return fmt.Errorf("inserting relation: %w", err) } // Insert inputs for _, ri := range inputs { _, err = tx.Exec(` INSERT INTO relation_inputs (id, relation_id, entity_id, role, "order") VALUES (?, ?, ?, ?, ?)`, ri.ID, ri.RelationID, ri.EntityID, ri.Role, ri.Order, ) if err != nil { return fmt.Errorf("inserting relation_input: %w", err) } } return tx.Commit() } // Graph holds the full entity-relation graph for a project. type Graph struct { Entities []Entity Relations []Relation Inputs map[string][]RelationInput } // GetEntityGraph returns all entities and relations for visualization. func GetEntityGraph(db *DB) (*Graph, error) { entities, err := db.ListEntities("", "") if err != nil { return nil, fmt.Errorf("listing entities: %w", err) } relations, err := db.ListRelations("") if err != nil { return nil, fmt.Errorf("listing relations: %w", err) } inputs := map[string][]RelationInput{} for _, r := range relations { ri, err := db.GetRelationInputs(r.ID) if err != nil { return nil, fmt.Errorf("getting inputs for relation %s: %w", r.ID, err) } if len(ri) > 0 { inputs[r.ID] = ri } } return &Graph{ Entities: entities, Relations: relations, Inputs: inputs, }, nil } // SnapshotStatus describes the state of a snapshot vs the registry. type SnapshotStatus string const ( SnapshotUpToDate SnapshotStatus = "up_to_date" SnapshotOutdated SnapshotStatus = "outdated" SnapshotMissing SnapshotStatus = "missing" // exists locally but not in registry ) // SnapshotCheckResult holds the comparison for one type snapshot. type SnapshotCheckResult struct { ID string LocalVersion string RegistryVersion string Status SnapshotStatus } // CheckSnapshots compares all local snapshots against the registry. func CheckSnapshots(opsDB *DB, registryDB *registry.DB) ([]SnapshotCheckResult, error) { snaps, err := opsDB.ListTypeSnapshots() if err != nil { return nil, fmt.Errorf("listing snapshots: %w", err) } var results []SnapshotCheckResult for _, snap := range snaps { regType, err := registryDB.GetType(snap.ID) if err != nil { // Not found in registry results = append(results, SnapshotCheckResult{ ID: snap.ID, LocalVersion: snap.Version, Status: SnapshotMissing, }) continue } status := SnapshotUpToDate if regType.Version != snap.Version { status = SnapshotOutdated } results = append(results, SnapshotCheckResult{ ID: snap.ID, LocalVersion: snap.Version, RegistryVersion: regType.Version, Status: status, }) } return results, nil } // UpdateSnapshot re-snapshots a type from the registry, replacing the local copy. // Returns the old and new definitions for diffing. func UpdateSnapshot(opsDB *DB, registryDB *registry.DB, typeID string) (old, new_ *TypeSnapshot, err error) { // Get current local snapshot oldSnap, err := opsDB.GetTypeSnapshot(typeID) if err != nil { return nil, nil, fmt.Errorf("reading local snapshot: %w", err) } if oldSnap == nil { return nil, nil, fmt.Errorf("type %q not found in local snapshots", typeID) } // Get current registry type regType, err := registryDB.GetType(typeID) if err != nil { return nil, nil, fmt.Errorf("fetching type %q from registry: %w", typeID, err) } // Build new snapshot newSnap := &TypeSnapshot{ ID: regType.ID, Version: regType.Version, Lang: regType.Lang, Algebraic: string(regType.Algebraic), Definition: regType.Definition, Description: regType.Description, SnappedAt: time.Now().UTC(), } if err := opsDB.UpdateTypeSnapshot(newSnap); err != nil { return nil, nil, fmt.Errorf("updating snapshot: %w", err) } return oldSnap, newSnap, nil } // InsertExecutionSafe validates and inserts an execution. // Auto-calculates duration_ms if both started_at and ended_at are set. func InsertExecutionSafe(db *DB, e *Execution) error { if err := ValidateExecution(e); err != nil { return err } // Auto-calculate duration if e.EndedAt != nil && !e.StartedAt.IsZero() && e.DurationMs == nil { ms := e.EndedAt.Sub(e.StartedAt).Milliseconds() e.DurationMs = &ms } return db.InsertExecution(e) } // InsertAssertionSafe validates that the entity exists, then inserts the assertion. func InsertAssertionSafe(db *DB, a *Assertion) error { entities, err := buildEntitySet(db) if err != nil { return err } if err := ValidateAssertion(a, entities); err != nil { return err } return db.InsertAssertion(a) } // RecordExecutionWithResults inserts an execution and its assertion results in a transaction. func RecordExecutionWithResults(db *DB, e *Execution, results []AssertionResult) error { if err := ValidateExecution(e); err != nil { return err } // Auto-calculate duration if e.EndedAt != nil && !e.StartedAt.IsZero() && e.DurationMs == nil { ms := e.EndedAt.Sub(e.StartedAt).Milliseconds() e.DurationMs = &ms } tx, err := db.Conn().Begin() if err != nil { return fmt.Errorf("beginning transaction: %w", err) } defer tx.Rollback() // Insert execution if e.CreatedAt.IsZero() { e.CreatedAt = time.Now().UTC() } var endedAt *string if e.EndedAt != nil { s := e.EndedAt.Format(time.RFC3339) endedAt = &s } _, err = tx.Exec(` INSERT OR REPLACE INTO executions ( id, pipeline_id, relation_id, status, started_at, ended_at, duration_ms, records_in, records_out, error, metrics, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.ID, e.PipelineID, e.RelationID, string(e.Status), e.StartedAt.Format(time.RFC3339), endedAt, e.DurationMs, e.RecordsIn, e.RecordsOut, e.Error, marshalJSON(e.Metrics), e.CreatedAt.Format(time.RFC3339), ) if err != nil { return fmt.Errorf("inserting execution: %w", err) } // Insert assertion results for _, ar := range results { if ar.EvaluatedAt.IsZero() { ar.EvaluatedAt = time.Now().UTC() } _, err = tx.Exec(` INSERT OR REPLACE INTO assertion_results ( id, assertion_id, execution_id, status, value, message, evaluated_at ) VALUES (?, ?, ?, ?, ?, ?, ?)`, ar.ID, ar.AssertionID, ar.ExecutionID, string(ar.Status), marshalJSON(ar.Value), ar.Message, ar.EvaluatedAt.Format(time.RFC3339), ) if err != nil { return fmt.Errorf("inserting assertion_result: %w", err) } } return tx.Commit() } // --- Reactive loop --- // ReactiveResult holds what the reactive loop did after analyzing assertion results. type ReactiveResult struct { EntityUpdates []EntityStatusChange Proposals []string // IDs of proposals created } // EntityStatusChange records a status transition triggered by assertions. type EntityStatusChange struct { EntityID string OldStatus EntityStatus NewStatus EntityStatus Reason string } // React analyzes assertion results and applies the severity rules: // - critical fail → entity.status = corrupted // - warning fail → entity.status = stale (only if currently active) // - info fail → no status change // // If registryDB is provided and critical failures exist, a proposal is auto-created. // Returns what changed so the caller can log or display it. func React(opsDB *DB, registryDB *registry.DB, results []AssertionResult) (*ReactiveResult, error) { rr := &ReactiveResult{} // Group failures by entity, tracking worst severity per entity. type entityImpact struct { worstSeverity Severity failures []AssertionResult assertionIDs []string } impacts := map[string]*entityImpact{} for _, ar := range results { if ar.Status != ResultFail { continue } // Look up the assertion to get entity_id and severity a, err := opsDB.GetAssertion(ar.AssertionID) if err != nil || a == nil { continue } imp, ok := impacts[a.EntityID] if !ok { imp = &entityImpact{} impacts[a.EntityID] = imp } imp.failures = append(imp.failures, ar) imp.assertionIDs = append(imp.assertionIDs, ar.AssertionID) // Track worst severity (critical > warning > info) if severityRank(a.Severity) > severityRank(imp.worstSeverity) { imp.worstSeverity = a.Severity } } // Apply status changes per entity for entityID, imp := range impacts { entity, err := opsDB.GetEntity(entityID) if err != nil || entity == nil { continue } var newStatus EntityStatus switch imp.worstSeverity { case SeverityCritical: newStatus = StatusCorrupted case SeverityWarning: // Only degrade active → stale, don't touch corrupted/archived if entity.Status == StatusActive { newStatus = StatusStale } default: continue // info: no status change } if newStatus == "" || newStatus == entity.Status { continue } oldStatus := entity.Status entity.Status = newStatus if err := opsDB.UpdateEntity(entity); err != nil { return nil, fmt.Errorf("updating entity %s status: %w", entityID, err) } reason := fmt.Sprintf("%s assertion(s) failed: %v", imp.worstSeverity, imp.assertionIDs) rr.EntityUpdates = append(rr.EntityUpdates, EntityStatusChange{ EntityID: entityID, OldStatus: oldStatus, NewStatus: newStatus, Reason: reason, }) } // Create proposals in registry for critical failures if registryDB != nil { for entityID, imp := range impacts { if imp.worstSeverity != SeverityCritical { continue } // Build evidence from failures failureDetails := make([]map[string]any, 0, len(imp.failures)) for _, f := range imp.failures { failureDetails = append(failureDetails, map[string]any{ "assertion_id": f.AssertionID, "execution_id": f.ExecutionID, "message": f.Message, "value": f.Value, }) } p := ®istry.Proposal{ ID: fmt.Sprintf("proposal_react_%s_%d", entityID, time.Now().UnixNano()), Kind: registry.ProposalImproveFunction, Title: fmt.Sprintf("Critical assertion failures on entity %s", entityID), Description: fmt.Sprintf("%d critical assertion(s) failed. Entity marked as corrupted.", len(imp.failures)), Evidence: map[string]any{ "entity_id": entityID, "assertion_ids": imp.assertionIDs, "failures": failureDetails, }, Status: registry.ProposalPending, CreatedBy: "reactive_loop", } if err := registryDB.InsertProposal(p); err != nil { return nil, fmt.Errorf("creating proposal for entity %s: %w", entityID, err) } rr.Proposals = append(rr.Proposals, p.ID) } } return rr, nil } // ExecuteAndReact is the full autonomous loop step: // 1. Record the execution // 2. Evaluate all active assertions on affected entities // 3. React to failures (update entity status, create proposals) func ExecuteAndReact(opsDB *DB, registryDB *registry.DB, e *Execution, entityIDs []string) (*ReactiveResult, error) { // 1. Record execution if err := InsertExecutionSafe(opsDB, e); err != nil { return nil, fmt.Errorf("recording execution: %w", err) } // 2. Evaluate assertions for each affected entity var allResults []AssertionResult for _, entityID := range entityIDs { results, err := EvalEntityAssertions(opsDB, entityID, e.ID) if err != nil { return nil, fmt.Errorf("evaluating assertions for entity %s: %w", entityID, err) } allResults = append(allResults, results...) } // 3. React return React(opsDB, registryDB, allResults) } func severityRank(s Severity) int { switch s { case SeverityCritical: return 3 case SeverityWarning: return 2 case SeverityInfo: return 1 default: return 0 } } func buildEntitySet(db *DB) (map[string]bool, error) { all, err := db.ListEntities("", "") if err != nil { return nil, fmt.Errorf("building entity set: %w", err) } set := make(map[string]bool, len(all)) for _, e := range all { set[e.ID] = true } return set, nil }