package fn_operations import ( "fmt" "strings" ) // ValidationError represents one or more integrity violations. type ValidationError struct { ID string Errors []string } func (v *ValidationError) Error() string { return fmt.Sprintf("%s: %s", v.ID, strings.Join(v.Errors, "; ")) } // ValidateEntity checks entity integrity rules. func ValidateEntity(e *Entity) *ValidationError { var errs []string if e.ID == "" { errs = append(errs, "id is required") } if e.Name == "" { errs = append(errs, "name is required") } if e.TypeRef == "" { errs = append(errs, "type_ref is required") } if e.Source == "" { errs = append(errs, "source is required") } switch e.Status { case StatusActive, StatusStale, StatusCorrupted, StatusArchived: case "": errs = append(errs, "status is required") default: errs = append(errs, fmt.Sprintf("invalid status: %s", e.Status)) } if len(errs) > 0 { return &ValidationError{ID: e.ID, Errors: errs} } return nil } // ValidateRelation checks relation integrity rules. // knownEntities is a set of entity IDs that exist. func ValidateRelation(r *Relation, knownEntities map[string]bool) *ValidationError { var errs []string if r.ID == "" { errs = append(errs, "id is required") } if r.Name == "" { errs = append(errs, "name is required") } if r.ToEntity == "" { errs = append(errs, "to_entity is required") } // from_entity or relation_inputs — validated at operation level if r.FromEntity != "" && r.ToEntity != "" && r.FromEntity == r.ToEntity { errs = append(errs, "from_entity and to_entity cannot be the same") } if r.FromEntity != "" && !knownEntities[r.FromEntity] { errs = append(errs, fmt.Sprintf("from_entity references unknown entity: %s", r.FromEntity)) } if r.ToEntity != "" && !knownEntities[r.ToEntity] { errs = append(errs, fmt.Sprintf("to_entity references unknown entity: %s", r.ToEntity)) } if r.Weight != nil { if *r.Weight < 0.0 || *r.Weight > 1.0 { errs = append(errs, "weight must be between 0.0 and 1.0") } } if r.StartedAt != nil && r.EndedAt != nil { if r.StartedAt.After(*r.EndedAt) { errs = append(errs, "started_at must be before ended_at") } } switch r.Direction { case DirUnidirectional, DirBidirectional, DirInverse, "": default: errs = append(errs, fmt.Sprintf("invalid direction: %s", r.Direction)) } if len(errs) > 0 { return &ValidationError{ID: r.ID, Errors: errs} } return nil } // ValidateRelationInputs checks relation_inputs integrity. func ValidateRelationInputs(inputs []RelationInput, knownEntities map[string]bool) *ValidationError { var errs []string if len(inputs) < 2 { errs = append(errs, "relation_inputs must have at least 2 entries") } for i, ri := range inputs { if ri.RelationID == "" { errs = append(errs, fmt.Sprintf("input[%d]: relation_id is required", i)) } if ri.EntityID == "" { errs = append(errs, fmt.Sprintf("input[%d]: entity_id is required", i)) } if ri.Role == "" { errs = append(errs, fmt.Sprintf("input[%d]: role is required", i)) } if ri.EntityID != "" && !knownEntities[ri.EntityID] { errs = append(errs, fmt.Sprintf("input[%d]: entity_id references unknown entity: %s", i, ri.EntityID)) } } if len(errs) > 0 { id := "relation_inputs" if len(inputs) > 0 { id = inputs[0].RelationID } return &ValidationError{ID: id, Errors: errs} } return nil } // ValidateExecution checks execution integrity rules. func ValidateExecution(e *Execution) *ValidationError { var errs []string if e.ID == "" { errs = append(errs, "id is required") } if e.PipelineID == "" { errs = append(errs, "pipeline_id is required") } switch e.Status { case ExecSuccess, ExecFailure, ExecPartial: case "": errs = append(errs, "status is required") default: errs = append(errs, fmt.Sprintf("invalid status: %s", e.Status)) } if e.StartedAt.IsZero() { errs = append(errs, "started_at is required") } if len(errs) > 0 { return &ValidationError{ID: e.ID, Errors: errs} } return nil } // ValidateAssertion checks assertion integrity rules. func ValidateAssertion(a *Assertion, knownEntities map[string]bool) *ValidationError { var errs []string if a.ID == "" { errs = append(errs, "id is required") } if a.EntityID == "" { errs = append(errs, "entity_id is required") } else if knownEntities != nil && !knownEntities[a.EntityID] { errs = append(errs, fmt.Sprintf("entity_id references unknown entity: %s", a.EntityID)) } if a.Name == "" { errs = append(errs, "name is required") } if a.Kind == "" { errs = append(errs, "kind is required") } if a.Rule == "" { errs = append(errs, "rule is required") } switch a.Severity { case SeverityCritical, SeverityWarning, SeverityInfo: case "": errs = append(errs, "severity is required") default: errs = append(errs, fmt.Sprintf("invalid severity: %s", a.Severity)) } if len(errs) > 0 { return &ValidationError{ID: a.ID, Errors: errs} } return nil } // ValidateAssertionResult checks assertion result integrity. func ValidateAssertionResult(ar *AssertionResult) *ValidationError { var errs []string if ar.ID == "" { errs = append(errs, "id is required") } if ar.AssertionID == "" { errs = append(errs, "assertion_id is required") } switch ar.Status { case ResultPass, ResultFail, ResultSkip: case "": errs = append(errs, "status is required") default: errs = append(errs, fmt.Sprintf("invalid status: %s", ar.Status)) } if ar.EvaluatedAt.IsZero() { errs = append(errs, "evaluated_at is required") } if len(errs) > 0 { return &ValidationError{ID: ar.ID, Errors: errs} } return nil } // DetectCycle checks if adding a causal relation (from -> to) creates a cycle. // Only considers relations where via != "" (causal/transformational). // Semantic relations (via == "") are exempt from cycle detection. func DetectCycle(db *DB, fromEntity, toEntity string) error { if fromEntity == "" || toEntity == "" { return nil } // BFS from toEntity following only causal relations. // If we reach fromEntity, there's a cycle. visited := map[string]bool{} queue := []string{toEntity} for len(queue) > 0 { current := queue[0] queue = queue[1:] if visited[current] { continue } visited[current] = true if current == fromEntity { return fmt.Errorf("cycle detected: adding relation %s -> %s would create a causal cycle", fromEntity, toEntity) } // Follow causal relations from current entity rows, err := db.conn.Query(` SELECT to_entity FROM relations WHERE from_entity = ? AND via != ''`, current) if err != nil { return fmt.Errorf("querying relations for cycle detection: %w", err) } for rows.Next() { var next string if err := rows.Scan(&next); err != nil { rows.Close() return err } if !visited[next] { queue = append(queue, next) } } rows.Close() } return nil }