package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"text/tabwriter"
	"time"
	"unicode/utf8"

	ops "fn-registry/fn_operations"
	"fn-registry/registry"
)

// opsDBName is the file name of the operations database that the ops
// commands create (cmdOpsInit) and look up (findOpsDB).
const opsDBName = "operations.db"

// cmdOps is the entry point of `fn ops`. It dispatches args[0] to the
// matching subcommand handler; a missing or unknown subcommand prints the
// usage text and exits with status 1.
func cmdOps(args []string) {
	if len(args) < 1 {
		printOpsUsage()
		os.Exit(1)
	}
	switch args[0] {
	case "init":
		cmdOpsInit(args[1:])
	case "entity":
		cmdOpsEntity(args[1:])
	case "relation":
		cmdOpsRelation(args[1:])
	case "graph":
		cmdOpsGraph()
	case "snapshot":
		cmdOpsSnapshot(args[1:])
	case "execution":
		cmdOpsExecution(args[1:])
	case "assertion":
		cmdOpsAssertion(args[1:])
	case "log":
		cmdOpsLog(args[1:])
	case "help", "-h", "--help":
		printOpsUsage()
	default:
		fmt.Fprintf(os.Stderr, "unknown ops command: %s\n", args[0])
		printOpsUsage()
		os.Exit(1)
	}
}

// printOpsUsage writes the `fn ops` help text to stdout.
func printOpsUsage() {
	fmt.Println(`fn ops — operations CLI

Usage:
  fn ops init [path]                            Crea operations.db en path (default: .)
  fn ops entity add <flags>                     Añade entity
  fn ops entity list [-d domain] [-s status]    Lista entities
  fn ops entity show <id>                       Muestra entity
  fn ops entity delete <id>                     Elimina entity
  fn ops relation add <flags>                   Añade relation
  fn ops relation list [entity_id]              Lista relations
  fn ops relation show <id>                     Muestra relation
  fn ops relation delete <id>                   Elimina relation
  fn ops graph                                  Grafo ASCII de entities y relations
  fn ops snapshot list                          Lista tipos snapshotted
  fn ops snapshot check                         Compara snapshots vs registry
  fn ops snapshot update <id>|--all             Re-snapshot desde registry

Entity flags:
  --id <id> --name <name> --type-ref <type> --source <source> --domain <domain>
  --status <status> --description <text> --tags <a,b,c> --metadata <json> --notes <text>

Relation flags:
  --id <id> --name <name> --from <entity> --to <entity> --via <via> --direction <direction>
  --status <status> --purity <purity> --weight <0.0-1.0> --description <text>
  --tags <a,b,c> --notes <text>

Execution commands:
  fn ops execution add <flags>                  Registra ejecucion
  fn ops execution list [--pipeline-id <id>] [-s status]
  fn ops execution show <id>                    Muestra ejecucion

Assertion commands:
  fn ops assertion add <flags>                  Añade assertion
  fn ops assertion list [--entity-id <id>]      Lista assertions
  fn ops assertion show <id>                    Muestra assertion
  fn ops assertion delete <id>                  Elimina assertion
  fn ops assertion eval --entity-id <id>        Evalua assertions activas
  fn ops assertion result add <flags>           Registra resultado manual
  fn ops assertion result list [--assertion-id <id>]

Log commands:
  fn ops log add <flags>                        Registra log entry
  fn ops log list [--level <level>] [--source <source>] [--limit N]
  fn ops log show <id>                          Muestra log entry

Log flags:
  --id <id> --level <level> --source <source> --entity-id <id> --execution-id <id>
  --message <text> --metadata <json>
`)
}

// --- shared CLI helpers ---

// opsFatalf writes a formatted message to stderr and exits with status 1.
// Note that os.Exit does not run deferred calls.
func opsFatalf(format string, a ...any) {
	fmt.Fprintf(os.Stderr, format, a...)
	os.Exit(1)
}

// opsFlagValue returns the argument that follows the flag at args[*i] and
// advances *i onto that value. If the flag is the last argument it exits with
// an error instead of indexing past the end of args.
func opsFlagValue(args []string, i *int) string {
	flag := args[*i]
	if *i+1 >= len(args) {
		opsFatalf("error: flag %s requires a value\n", flag)
	}
	*i++
	return args[*i]
}

// opsInt64Flag reads the value of the flag at args[*i] as a base-10 integer,
// exiting with an error when the value is missing or not a valid integer.
func opsInt64Flag(args []string, i *int) int64 {
	flag := args[*i]
	raw := opsFlagValue(args, i)
	v, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		opsFatalf("error: invalid value %q for %s: expected an integer\n", raw, flag)
	}
	return v
}

// opsPrintJSON prints "<label>: <indented JSON of v>" to stdout, or a warning
// on stderr when v cannot be marshalled.
func opsPrintJSON(label string, v any) {
	b, err := json.MarshalIndent(v, " ", " ")
	if err != nil {
		fmt.Fprintf(os.Stderr, "warning: cannot render %s: %v\n", strings.ToLower(label), err)
		return
	}
	fmt.Printf("%s: %s\n", label, b)
}

// newOpsTable returns the tabwriter used by every list command: stdout,
// two spaces of padding between columns.
func newOpsTable() *tabwriter.Writer {
	return tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
}

// --- ops init ---

// cmdOpsInit creates operations.db inside args[0] (default "."). It refuses
// to overwrite an existing file. When a template database exists under
// root()/fn_operations/project_template it is copied; otherwise a fresh
// database is created through ops.Open.
func cmdOpsInit(args []string) {
	dir := "."
	if len(args) > 0 {
		dir = args[0]
	}
	path := filepath.Join(dir, opsDBName)

	if _, err := os.Stat(path); err == nil {
		opsFatalf("operations.db already exists at %s\n", path)
	}

	// Make sure the target directory exists for both creation strategies.
	if err := os.MkdirAll(dir, 0o755); err != nil {
		opsFatalf("error creating directory: %v\n", err)
	}

	// Copy from template if available, otherwise create fresh.
	templatePath := filepath.Join(root(), "fn_operations", "project_template", "operations.db")
	if _, err := os.Stat(templatePath); err == nil {
		if err := copyOpsTemplate(templatePath, path); err != nil {
			opsFatalf("error %v\n", err)
		}
		fmt.Printf("operations.db created at %s (from template)\n", path)
		return
	}

	// Create fresh.
	db, err := ops.Open(path)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	db.Close()
	fmt.Printf("operations.db created at %s\n", path)
}

// copyOpsTemplate copies the template database at templatePath to path.
// Both files are closed before returning, and a partially written
// destination is removed when the copy or the final close fails.
func copyOpsTemplate(templatePath, path string) error {
	src, err := os.Open(templatePath)
	if err != nil {
		return fmt.Errorf("opening template: %w", err)
	}
	defer src.Close()

	dst, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("creating db: %w", err)
	}
	if _, err := io.Copy(dst, src); err != nil {
		// Best-effort cleanup; the copy error is the one worth reporting.
		_ = dst.Close()
		_ = os.Remove(path)
		return fmt.Errorf("copying template: %w", err)
	}
	// Close can surface a delayed write error, so it must be checked.
	if err := dst.Close(); err != nil {
		_ = os.Remove(path) // best-effort cleanup
		return fmt.Errorf("closing db: %w", err)
	}
	return nil
}

// --- ops entity ---

// cmdOpsEntity dispatches `fn ops entity <add|list|show|delete>`.
func cmdOpsEntity(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops entity <add|list|show|delete> ...\n")
	}
	switch args[0] {
	case "add":
		cmdOpsEntityAdd(args[1:])
	case "list":
		cmdOpsEntityList(args[1:])
	case "show":
		cmdOpsEntityShow(args[1:])
	case "delete":
		cmdOpsEntityDelete(args[1:])
	default:
		opsFatalf("unknown entity command: %s\n", args[0])
	}
}

// cmdOpsEntityAdd builds an ops.Entity from flags and inserts it through
// ops.InsertEntityWithSnapshot. --name, --type-ref and --source are required;
// the ID defaults to the name and the status to ops.StatusActive.
// Unrecognised arguments are ignored.
func cmdOpsEntityAdd(args []string) {
	var e ops.Entity
	e.Status = ops.StatusActive
	var tagsStr, metadataStr string

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--id":
			e.ID = opsFlagValue(args, &i)
		case "--name":
			e.Name = opsFlagValue(args, &i)
		case "--type-ref":
			e.TypeRef = opsFlagValue(args, &i)
		case "--source":
			e.Source = opsFlagValue(args, &i)
		case "--domain":
			e.Domain = opsFlagValue(args, &i)
		case "--status":
			e.Status = ops.EntityStatus(opsFlagValue(args, &i))
		case "--description":
			e.Description = opsFlagValue(args, &i)
		case "--tags":
			tagsStr = opsFlagValue(args, &i)
		case "--metadata":
			metadataStr = opsFlagValue(args, &i)
		case "--notes":
			e.Notes = opsFlagValue(args, &i)
		}
	}

	if e.Name == "" || e.TypeRef == "" || e.Source == "" {
		opsFatalf("required: --name, --type-ref, --source\n")
	}
	if e.ID == "" {
		e.ID = e.Name
	}
	if tagsStr != "" {
		e.Tags = strings.Split(tagsStr, ",")
	}
	if metadataStr != "" {
		// Reject malformed JSON instead of silently storing no metadata.
		if err := json.Unmarshal([]byte(metadataStr), &e.Metadata); err != nil {
			opsFatalf("error: invalid metadata JSON: %v\n", err)
		}
	}

	opsDB := openOpsDB()
	defer opsDB.Close()

	// The registry is optional here: a nil regDB is passed through when it
	// cannot be found.
	regDB := tryOpenRegistryDB()
	if regDB != nil {
		defer regDB.Close()
	}

	if err := ops.InsertEntityWithSnapshot(opsDB, regDB, &e); err != nil {
		opsFatalf("error: %v\n", err)
	}
	fmt.Printf("Entity %s added\n", e.ID)
}

// cmdOpsEntityList prints a table of entities, optionally filtered by
// domain (-d) and status (-s).
func cmdOpsEntityList(args []string) {
	var domain string
	var status ops.EntityStatus

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "-d":
			domain = opsFlagValue(args, &i)
		case "-s":
			status = ops.EntityStatus(opsFlagValue(args, &i))
		}
	}

	db := openOpsDB()
	defer db.Close()

	entities, err := db.ListEntities(domain, status)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(entities) == 0 {
		fmt.Println("No entities.")
		return
	}

	w := newOpsTable()
	fmt.Fprintln(w, "ID\tTYPE_REF\tSTATUS\tSOURCE\tDOMAIN")
	for _, e := range entities {
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", e.ID, e.TypeRef, e.Status, e.Source, e.Domain)
	}
	w.Flush()
}

// cmdOpsEntityShow prints every field of the entity whose ID is args[0].
func cmdOpsEntityShow(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops entity show <id>\n")
	}

	db := openOpsDB()
	defer db.Close()

	e, err := db.GetEntity(args[0])
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if e == nil {
		opsFatalf("entity not found: %s\n", args[0])
	}

	fmt.Printf("ID: %s\n", e.ID)
	fmt.Printf("Name: %s\n", e.Name)
	fmt.Printf("Type ref: %s\n", e.TypeRef)
	fmt.Printf("Status: %s\n", e.Status)
	fmt.Printf("Source: %s\n", e.Source)
	fmt.Printf("Domain: %s\n", e.Domain)
	fmt.Printf("Description: %s\n", e.Description)
	fmt.Printf("Tags: %s\n", strings.Join(e.Tags, ", "))
	if len(e.Metadata) > 0 {
		opsPrintJSON("Metadata", e.Metadata)
	}
	if e.Notes != "" {
		fmt.Printf("Notes: %s\n", e.Notes)
	}
	fmt.Printf("Created: %s\n", e.CreatedAt.Format("2006-01-02 15:04:05"))
	fmt.Printf("Updated: %s\n", e.UpdatedAt.Format("2006-01-02 15:04:05"))
}

// cmdOpsEntityDelete deletes the entity whose ID is args[0].
func cmdOpsEntityDelete(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops entity delete <id>\n")
	}

	db := openOpsDB()
	defer db.Close()

	if err := db.DeleteEntity(args[0]); err != nil {
		opsFatalf("error: %v\n", err)
	}
	fmt.Printf("Entity %s deleted\n", args[0])
}

// --- ops relation ---

// cmdOpsRelation dispatches `fn ops relation <add|list|show|delete>`.
func cmdOpsRelation(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops relation <add|list|show|delete> ...\n")
	}
	switch args[0] {
	case "add":
		cmdOpsRelationAdd(args[1:])
	case "list":
		cmdOpsRelationList(args[1:])
	case "show":
		cmdOpsRelationShow(args[1:])
	case "delete":
		cmdOpsRelationDelete(args[1:])
	default:
		opsFatalf("unknown relation command: %s\n", args[0])
	}
}

// cmdOpsRelationAdd builds an ops.Relation from flags and inserts it through
// ops.InsertRelationSafe. --name and --to are required. When no --id is given
// and --from is set, the ID is derived as
// "<from>__to__<to>__via__<via>" with via defaulting to "semantic".
// Unrecognised arguments are ignored.
func cmdOpsRelationAdd(args []string) {
	var r ops.Relation
	r.Direction = ops.DirUnidirectional
	r.Status = ops.RelDesigned
	var tagsStr string

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--id":
			r.ID = opsFlagValue(args, &i)
		case "--name":
			r.Name = opsFlagValue(args, &i)
		case "--from":
			r.FromEntity = opsFlagValue(args, &i)
		case "--to":
			r.ToEntity = opsFlagValue(args, &i)
		case "--via":
			r.Via = opsFlagValue(args, &i)
		case "--direction":
			r.Direction = ops.Direction(opsFlagValue(args, &i))
		case "--status":
			r.Status = ops.RelationStatus(opsFlagValue(args, &i))
		case "--purity":
			r.Purity = opsFlagValue(args, &i)
		case "--weight":
			raw := opsFlagValue(args, &i)
			weight, err := strconv.ParseFloat(raw, 64)
			if err != nil {
				opsFatalf("error: invalid value %q for --weight: expected a number\n", raw)
			}
			r.Weight = &weight
		case "--description":
			r.Description = opsFlagValue(args, &i)
		case "--tags":
			tagsStr = opsFlagValue(args, &i)
		case "--notes":
			r.Notes = opsFlagValue(args, &i)
		}
	}

	if r.Name == "" || r.ToEntity == "" {
		opsFatalf("required: --name, --to (and --from for simple relations)\n")
	}
	if r.ID == "" && r.FromEntity != "" {
		via := r.Via
		if via == "" {
			via = "semantic"
		}
		r.ID = fmt.Sprintf("%s__to__%s__via__%s", r.FromEntity, r.ToEntity, via)
	}
	if tagsStr != "" {
		r.Tags = strings.Split(tagsStr, ",")
	}

	db := openOpsDB()
	defer db.Close()

	if err := ops.InsertRelationSafe(db, &r); err != nil {
		opsFatalf("error: %v\n", err)
	}
	fmt.Printf("Relation %s added\n", r.ID)
}

// cmdOpsRelationList prints a table of relations; args[0], when present, is
// passed to ListRelations as the entity ID filter.
func cmdOpsRelationList(args []string) {
	var entityID string
	if len(args) > 0 {
		entityID = args[0]
	}

	db := openOpsDB()
	defer db.Close()

	rels, err := db.ListRelations(entityID)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(rels) == 0 {
		fmt.Println("No relations.")
		return
	}

	w := newOpsTable()
	fmt.Fprintln(w, "ID\tNAME\tFROM\tTO\tVIA\tDIRECTION\tSTATUS")
	for _, r := range rels {
		from := r.FromEntity
		if from == "" {
			from = "(inputs)"
		}
		via := r.Via
		if via == "" {
			via = "-"
		}
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", r.ID, r.Name, from, r.ToEntity, via, r.Direction, r.Status)
	}
	w.Flush()
}

// cmdOpsRelationShow prints every field of the relation whose ID is args[0],
// followed by its inputs when it has any.
func cmdOpsRelationShow(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops relation show <id>\n")
	}

	db := openOpsDB()
	defer db.Close()

	r, err := db.GetRelation(args[0])
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if r == nil {
		opsFatalf("relation not found: %s\n", args[0])
	}

	fmt.Printf("ID: %s\n", r.ID)
	fmt.Printf("Name: %s\n", r.Name)
	fmt.Printf("From: %s\n", r.FromEntity)
	fmt.Printf("To: %s\n", r.ToEntity)
	fmt.Printf("Via: %s\n", r.Via)
	fmt.Printf("Description: %s\n", r.Description)
	fmt.Printf("Purity: %s\n", r.Purity)
	fmt.Printf("Direction: %s\n", r.Direction)
	if r.Weight != nil {
		fmt.Printf("Weight: %.2f\n", *r.Weight)
	}
	fmt.Printf("Status: %s\n", r.Status)
	fmt.Printf("Tags: %s\n", strings.Join(r.Tags, ", "))
	if r.Notes != "" {
		fmt.Printf("Notes: %s\n", r.Notes)
	}

	// Show inputs if any. A lookup failure is reported but does not abort,
	// since the relation itself was already printed.
	inputs, err := db.GetRelationInputs(r.ID)
	if err != nil {
		fmt.Fprintf(os.Stderr, "warning: cannot load relation inputs: %v\n", err)
		return
	}
	if len(inputs) > 0 {
		fmt.Println("\nInputs:")
		for _, ri := range inputs {
			ord := ""
			if ri.Order != nil {
				ord = fmt.Sprintf(" (order: %d)", *ri.Order)
			}
			fmt.Printf(" %s [%s]%s\n", ri.EntityID, ri.Role, ord)
		}
	}
}

// cmdOpsRelationDelete deletes the relation whose ID is args[0].
func cmdOpsRelationDelete(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops relation delete <id>\n")
	}

	db := openOpsDB()
	defer db.Close()

	if err := db.DeleteRelation(args[0]); err != nil {
		opsFatalf("error: %v\n", err)
	}
	fmt.Printf("Relation %s deleted\n", args[0])
}

// --- ops graph ---

// cmdOpsGraph prints the entity graph returned by ops.GetEntityGraph as
// text: one line per entity, then one line per relation. Relations that have
// inputs list them joined with " + "; the others show from/to with an arrow
// reflecting the direction.
func cmdOpsGraph() {
	db := openOpsDB()
	defer db.Close()

	g, err := ops.GetEntityGraph(db)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(g.Entities) == 0 {
		fmt.Println("Empty graph.")
		return
	}

	fmt.Println("Entities:")
	for _, e := range g.Entities {
		fmt.Printf(" [%s] (%s) status:%s source:%s\n", e.ID, e.TypeRef, e.Status, e.Source)
	}

	if len(g.Relations) == 0 {
		return
	}
	fmt.Println("\nRelations:")
	for _, r := range g.Relations {
		via := ""
		if r.Via != "" {
			via = fmt.Sprintf(" via:%s", r.Via)
		}

		if inputs, hasInputs := g.Inputs[r.ID]; hasInputs {
			sources := make([]string, len(inputs))
			for i, ri := range inputs {
				sources[i] = fmt.Sprintf("%s[%s]", ri.EntityID, ri.Role)
			}
			fmt.Printf(" (%s) %s → %s%s\n", strings.Join(sources, " + "), r.Name, r.ToEntity, via)
			continue
		}

		from := r.FromEntity
		if from == "" {
			from = "?"
		}
		dir := "→"
		if r.Direction == ops.DirBidirectional {
			dir = "↔"
		}
		fmt.Printf(" %s %s %s %s%s\n", from, dir, r.Name, r.ToEntity, via)
	}
}

// --- ops snapshot ---

// cmdOpsSnapshot dispatches `fn ops snapshot <list|check|update>`.
func cmdOpsSnapshot(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops snapshot <list|check|update> [id|--all]\n")
	}
	switch args[0] {
	case "list":
		cmdOpsSnapshotList()
	case "check":
		cmdOpsSnapshotCheck()
	case "update":
		cmdOpsSnapshotUpdate(args[1:])
	default:
		opsFatalf("unknown snapshot command: %s\n", args[0])
	}
}

// cmdOpsSnapshotList prints a table of the type snapshots stored in the
// operations database.
func cmdOpsSnapshotList() {
	db := openOpsDB()
	defer db.Close()

	snaps, err := db.ListTypeSnapshots()
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(snaps) == 0 {
		fmt.Println("No type snapshots.")
		return
	}

	w := newOpsTable()
	fmt.Fprintln(w, "ID\tVERSION\tLANG\tALGEBRAIC\tSNAPPED_AT")
	for _, s := range snaps {
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", s.ID, s.Version, s.Lang, s.Algebraic, s.SnappedAt.Format("2006-01-02 15:04"))
	}
	w.Flush()
}

// cmdOpsSnapshotCheck prints, for every result of ops.CheckSnapshots, the
// local and registry versions plus a status marker. It requires the registry
// database.
func cmdOpsSnapshotCheck() {
	opsDB := openOpsDB()
	defer opsDB.Close()

	regDB := requireRegistryDB()
	defer regDB.Close()

	results, err := ops.CheckSnapshots(opsDB, regDB)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(results) == 0 {
		fmt.Println("No type snapshots to check.")
		return
	}

	w := newOpsTable()
	fmt.Fprintln(w, "ID\tLOCAL\tREGISTRY\tSTATUS")
	for _, r := range results {
		regVer := r.RegistryVersion
		if regVer == "" {
			regVer = "-"
		}
		status := "✓"
		switch r.Status {
		case ops.SnapshotOutdated:
			status = "← OUTDATED"
		case ops.SnapshotMissing:
			status = "⚠ not in registry"
		}
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", r.ID, r.LocalVersion, regVer, status)
	}
	w.Flush()
}

// cmdOpsSnapshotUpdate re-snapshots one type (args[0] is its ID) or, with
// --all, every snapshot that ops.CheckSnapshots reports as outdated. In --all
// mode individual failures are reported and skipped, and the command exits
// with status 1 at the end if any update failed.
func cmdOpsSnapshotUpdate(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops snapshot update <id> | --all\n")
	}

	opsDB := openOpsDB()
	defer opsDB.Close()

	regDB := requireRegistryDB()
	defer regDB.Close()

	if args[0] != "--all" {
		// Update a single snapshot.
		typeID := args[0]
		prev, cur, err := ops.UpdateSnapshot(opsDB, regDB, typeID)
		if err != nil {
			opsFatalf("error: %v\n", err)
		}
		printSnapshotDiff(typeID, prev, cur)
		return
	}

	// Update all outdated snapshots.
	results, err := ops.CheckSnapshots(opsDB, regDB)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}

	updated, failed := 0, 0
	for _, r := range results {
		if r.Status != ops.SnapshotOutdated {
			continue
		}
		prev, cur, err := ops.UpdateSnapshot(opsDB, regDB, r.ID)
		if err != nil {
			fmt.Fprintf(os.Stderr, " error updating %s: %v\n", r.ID, err)
			failed++
			continue
		}
		printSnapshotDiff(r.ID, prev, cur)
		updated++
	}

	switch {
	case updated > 0:
		fmt.Printf("\n%d snapshot(s) updated.\n", updated)
	case failed == 0:
		// Only claim everything is current when nothing failed.
		fmt.Println("All snapshots are up to date.")
	}
	if failed > 0 {
		opsFatalf("%d snapshot(s) failed to update.\n", failed)
	}
}

// printSnapshotDiff prints the version change of snapshot id and, when they
// differ, the old/new definition (prefixed per line with - / +) and
// description. A nil snapshot is treated as an empty one.
func printSnapshotDiff(id string, prev, cur *ops.TypeSnapshot) {
	if prev == nil {
		prev = &ops.TypeSnapshot{}
	}
	if cur == nil {
		cur = &ops.TypeSnapshot{}
	}

	fmt.Printf("Updated %s: %s → %s\n", id, prev.Version, cur.Version)
	if prev.Definition != cur.Definition {
		fmt.Println(" Definition changed:")
		fmt.Printf(" - %s\n", strings.ReplaceAll(strings.TrimSpace(prev.Definition), "\n", "\n - "))
		fmt.Printf(" + %s\n", strings.ReplaceAll(strings.TrimSpace(cur.Definition), "\n", "\n + "))
	}
	if prev.Description != cur.Description {
		fmt.Printf(" Description: %q → %q\n", prev.Description, cur.Description)
	}
}

// requireRegistryDB returns the registry database found by
// tryOpenRegistryDB, or exits with status 1 when none can be opened.
func requireRegistryDB() *registry.DB {
	db := tryOpenRegistryDB()
	if db == nil {
		fmt.Fprintln(os.Stderr, "error: cannot find registry.db")
		fmt.Fprintln(os.Stderr, "Set FN_REGISTRY_ROOT or run from the registry directory.")
		os.Exit(1)
	}
	return db
}

// --- helpers ---

// findOpsDB looks for operations.db in the current working directory and
// then in each parent directory up to the filesystem root, returning the
// first path found.
func findOpsDB() (string, error) {
	start, err := os.Getwd()
	if err != nil {
		return "", fmt.Errorf("cannot determine working directory: %w", err)
	}

	for dir := start; ; {
		path := filepath.Join(dir, opsDBName)
		if _, err := os.Stat(path); err == nil {
			return path, nil
		}
		parent := filepath.Dir(dir)
		if parent == dir {
			// Reached the root: Dir no longer shortens the path.
			break
		}
		dir = parent
	}
	// Report the directory the search started from, not where it ended.
	return "", fmt.Errorf("operations.db not found (searched from %s up to /)\nRun 'fn ops init <path>' to create one inside an app directory", start)
}

// openOpsDB locates and opens operations.db, exiting with status 1 when it
// cannot be found or opened. The caller is responsible for closing it.
func openOpsDB() *ops.DB {
	path, err := findOpsDB()
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	db, err := ops.Open(path)
	if err != nil {
		opsFatalf("error opening operations.db: %v\n", err)
	}
	return db
}

// --- Execution subcommands ---

// cmdOpsExecution dispatches `fn ops execution <add|list|show>`.
func cmdOpsExecution(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops execution <add|list|show>\n")
	}
	switch args[0] {
	case "add":
		cmdOpsExecutionAdd(args[1:])
	case "list":
		cmdOpsExecutionList(args[1:])
	case "show":
		cmdOpsExecutionShow(args[1:])
	default:
		opsFatalf("unknown execution command: %s\n", args[0])
	}
}

// cmdOpsExecutionAdd records an execution through ops.InsertExecutionSafe.
// --pipeline-id and --status are required. The ID defaults to
// "exec_<unix-nanos>", --started-at defaults to the current UTC time, and
// both timestamps must be RFC 3339. Unrecognised arguments are ignored.
func cmdOpsExecutionAdd(args []string) {
	var id, pipelineID, relationID, status, startedAtStr, endedAtStr, errorMsg, metricsStr string
	var recordsIn, recordsOut *int64

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--id":
			id = opsFlagValue(args, &i)
		case "--pipeline-id":
			pipelineID = opsFlagValue(args, &i)
		case "--relation-id":
			relationID = opsFlagValue(args, &i)
		case "--status", "-s":
			status = opsFlagValue(args, &i)
		case "--started-at":
			startedAtStr = opsFlagValue(args, &i)
		case "--ended-at":
			endedAtStr = opsFlagValue(args, &i)
		case "--records-in":
			v := opsInt64Flag(args, &i)
			recordsIn = &v
		case "--records-out":
			v := opsInt64Flag(args, &i)
			recordsOut = &v
		case "--error":
			errorMsg = opsFlagValue(args, &i)
		case "--metrics":
			metricsStr = opsFlagValue(args, &i)
		}
	}

	if pipelineID == "" || status == "" {
		opsFatalf("error: --pipeline-id and --status are required\n")
	}
	if id == "" {
		id = fmt.Sprintf("exec_%d", timeNow().UnixNano())
	}

	startedAt := timeNow()
	if startedAtStr != "" {
		t, err := time.Parse(time.RFC3339, startedAtStr)
		if err != nil {
			opsFatalf("error: invalid started-at: %v\n", err)
		}
		startedAt = t
	}

	var endedAt *time.Time
	if endedAtStr != "" {
		t, err := time.Parse(time.RFC3339, endedAtStr)
		if err != nil {
			opsFatalf("error: invalid ended-at: %v\n", err)
		}
		endedAt = &t
	}

	var metrics map[string]any
	if metricsStr != "" {
		if err := json.Unmarshal([]byte(metricsStr), &metrics); err != nil {
			opsFatalf("error: invalid metrics JSON: %v\n", err)
		}
	}

	e := &ops.Execution{
		ID:         id,
		PipelineID: pipelineID,
		RelationID: relationID,
		Status:     ops.ExecutionStatus(status),
		StartedAt:  startedAt,
		EndedAt:    endedAt,
		RecordsIn:  recordsIn,
		RecordsOut: recordsOut,
		Error:      errorMsg,
		Metrics:    metrics,
	}

	db := openOpsDB()
	defer db.Close()

	if err := ops.InsertExecutionSafe(db, e); err != nil {
		opsFatalf("error: %v\n", err)
	}
	fmt.Printf("Recorded execution: %s\n", e.ID)
}

// cmdOpsExecutionList prints a table of executions filtered by
// --pipeline-id, --relation-id and -s/--status. Nil duration and record
// counts are rendered as "-".
func cmdOpsExecutionList(args []string) {
	var pipelineID, relationID, status string

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--pipeline-id":
			pipelineID = opsFlagValue(args, &i)
		case "--relation-id":
			relationID = opsFlagValue(args, &i)
		case "--status", "-s": // --status mirrors `execution add`
			status = opsFlagValue(args, &i)
		}
	}

	db := openOpsDB()
	defer db.Close()

	execs, err := db.ListExecutions(pipelineID, relationID, ops.ExecutionStatus(status))
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(execs) == 0 {
		fmt.Println("No executions.")
		return
	}

	// optional renders a nil counter as "-".
	optional := func(p *int64) string {
		if p == nil {
			return "-"
		}
		return fmt.Sprintf("%d", *p)
	}

	w := newOpsTable()
	fmt.Fprintln(w, "ID\tPIPELINE\tSTATUS\tDURATION_MS\tRECORDS_IN\tRECORDS_OUT")
	for _, e := range execs {
		dur := "-"
		if e.DurationMs != nil {
			dur = fmt.Sprintf("%d", *e.DurationMs)
		}
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", e.ID, e.PipelineID, e.Status, dur, optional(e.RecordsIn), optional(e.RecordsOut))
	}
	w.Flush()
}

// cmdOpsExecutionShow prints the execution whose ID is args[0]; optional
// fields are printed only when set.
func cmdOpsExecutionShow(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops execution show <id>\n")
	}

	db := openOpsDB()
	defer db.Close()

	e, err := db.GetExecution(args[0])
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if e == nil {
		opsFatalf("execution %q not found\n", args[0])
	}

	fmt.Printf("ID: %s\n", e.ID)
	fmt.Printf("Pipeline: %s\n", e.PipelineID)
	if e.RelationID != "" {
		fmt.Printf("Relation: %s\n", e.RelationID)
	}
	fmt.Printf("Status: %s\n", e.Status)
	fmt.Printf("Started at: %s\n", e.StartedAt.Format(time.RFC3339))
	if e.EndedAt != nil {
		fmt.Printf("Ended at: %s\n", e.EndedAt.Format(time.RFC3339))
	}
	if e.DurationMs != nil {
		fmt.Printf("Duration: %d ms\n", *e.DurationMs)
	}
	if e.RecordsIn != nil {
		fmt.Printf("Records in: %d\n", *e.RecordsIn)
	}
	if e.RecordsOut != nil {
		fmt.Printf("Records out: %d\n", *e.RecordsOut)
	}
	if e.Error != "" {
		fmt.Printf("Error: %s\n", e.Error)
	}
	if len(e.Metrics) > 0 {
		opsPrintJSON("Metrics", e.Metrics)
	}
}

// --- Assertion subcommands ---

// cmdOpsAssertion dispatches
// `fn ops assertion <add|list|show|delete|eval|result>`.
func cmdOpsAssertion(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops assertion <add|list|show|delete|eval|result>\n")
	}
	switch args[0] {
	case "add":
		cmdOpsAssertionAdd(args[1:])
	case "list":
		cmdOpsAssertionList(args[1:])
	case "show":
		cmdOpsAssertionShow(args[1:])
	case "delete":
		cmdOpsAssertionDelete(args[1:])
	case "eval":
		cmdOpsAssertionEval(args[1:])
	case "result":
		cmdOpsAssertionResult(args[1:])
	default:
		opsFatalf("unknown assertion command: %s\n", args[0])
	}
}

// cmdOpsAssertionAdd creates an active assertion through
// ops.InsertAssertionSafe. --entity-id, --name, --kind and --rule are
// required; the ID defaults to "assert_<name>_<unix-nanos>" and the severity
// to "warning". Unrecognised arguments are ignored.
func cmdOpsAssertionAdd(args []string) {
	var id, entityID, name, kind, rule, severity, description string

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--id":
			id = opsFlagValue(args, &i)
		case "--entity-id":
			entityID = opsFlagValue(args, &i)
		case "--name":
			name = opsFlagValue(args, &i)
		case "--kind":
			kind = opsFlagValue(args, &i)
		case "--rule":
			rule = opsFlagValue(args, &i)
		case "--severity":
			severity = opsFlagValue(args, &i)
		case "--description":
			description = opsFlagValue(args, &i)
		}
	}

	if entityID == "" || name == "" || kind == "" || rule == "" {
		opsFatalf("error: --entity-id, --name, --kind, and --rule are required\n")
	}
	if id == "" {
		id = fmt.Sprintf("assert_%s_%d", name, timeNow().UnixNano())
	}
	if severity == "" {
		severity = "warning"
	}

	a := &ops.Assertion{
		ID:          id,
		EntityID:    entityID,
		Name:        name,
		Kind:        kind,
		Rule:        rule,
		Severity:    ops.Severity(severity),
		Description: description,
		Active:      true,
	}

	db := openOpsDB()
	defer db.Close()

	if err := ops.InsertAssertionSafe(db, a); err != nil {
		opsFatalf("error: %v\n", err)
	}
	fmt.Printf("Created assertion: %s\n", a.ID)
}

// cmdOpsAssertionList prints a table of assertions, optionally filtered by
// --entity-id and by --active / --inactive (no filter when neither is given).
func cmdOpsAssertionList(args []string) {
	var entityID string
	var activeFilter *bool

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--entity-id":
			entityID = opsFlagValue(args, &i)
		case "--active":
			v := true
			activeFilter = &v
		case "--inactive":
			v := false
			activeFilter = &v
		}
	}

	db := openOpsDB()
	defer db.Close()

	assertions, err := db.ListAssertions(entityID, activeFilter)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(assertions) == 0 {
		fmt.Println("No assertions.")
		return
	}

	w := newOpsTable()
	fmt.Fprintln(w, "ID\tENTITY\tNAME\tKIND\tSEVERITY\tACTIVE")
	for _, a := range assertions {
		active := "yes"
		if !a.Active {
			active = "no"
		}
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", a.ID, a.EntityID, a.Name, a.Kind, a.Severity, active)
	}
	w.Flush()
}

// cmdOpsAssertionShow prints the assertion whose ID is args[0], followed by
// at most the first five entries returned by ListAssertionResults for it.
func cmdOpsAssertionShow(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops assertion show <id>\n")
	}

	db := openOpsDB()
	defer db.Close()

	a, err := db.GetAssertion(args[0])
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if a == nil {
		opsFatalf("assertion %q not found\n", args[0])
	}

	active := "yes"
	if !a.Active {
		active = "no"
	}

	fmt.Printf("ID: %s\n", a.ID)
	fmt.Printf("Entity: %s\n", a.EntityID)
	fmt.Printf("Name: %s\n", a.Name)
	fmt.Printf("Kind: %s\n", a.Kind)
	fmt.Printf("Rule: %s\n", a.Rule)
	fmt.Printf("Severity: %s\n", a.Severity)
	fmt.Printf("Description: %s\n", a.Description)
	fmt.Printf("Active: %s\n", active)
	fmt.Printf("Created: %s\n", a.CreatedAt.Format(time.RFC3339))

	// Recent results are a best-effort extra: a lookup error just omits them.
	results, err := db.ListAssertionResults(a.ID, "")
	if err != nil || len(results) == 0 {
		return
	}
	fmt.Printf("\nRecent results:\n")
	limit := 5
	if len(results) < limit {
		limit = len(results)
	}
	for _, r := range results[:limit] {
		fmt.Printf(" [%s] %s — %s %s\n", r.EvaluatedAt.Format(time.RFC3339), r.Status, r.Message, formatResultValue(r.Value))
	}
}

// cmdOpsAssertionDelete deletes the assertion whose ID is args[0].
func cmdOpsAssertionDelete(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops assertion delete <id>\n")
	}

	db := openOpsDB()
	defer db.Close()

	if err := db.DeleteAssertion(args[0]); err != nil {
		opsFatalf("error: %v\n", err)
	}
	fmt.Printf("Deleted assertion: %s\n", args[0])
}

// cmdOpsAssertionEval evaluates the assertions of --entity-id (required)
// through ops.EvalEntityAssertions, prints one row per result and a
// pass/fail/skip summary. With --react and at least one failure it also runs
// ops.React and prints the entity status changes and proposals it returns.
func cmdOpsAssertionEval(args []string) {
	var entityID, executionID string
	react := false

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--entity-id":
			entityID = opsFlagValue(args, &i)
		case "--execution-id":
			executionID = opsFlagValue(args, &i)
		case "--react":
			react = true
		}
	}

	if entityID == "" {
		opsFatalf("error: --entity-id is required\n")
	}

	db := openOpsDB()
	defer db.Close()

	results, err := ops.EvalEntityAssertions(db, entityID, executionID)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(results) == 0 {
		fmt.Println("No active assertions for this entity.")
		return
	}

	w := newOpsTable()
	fmt.Fprintln(w, "ASSERTION\tSTATUS\tMESSAGE")
	for _, r := range results {
		fmt.Fprintf(w, "%s\t%s\t%s\n", r.AssertionID, r.Status, truncate(r.Message, 60))
	}
	w.Flush()

	// Summary.
	pass, fail, skip := 0, 0, 0
	for _, r := range results {
		switch r.Status {
		case ops.ResultPass:
			pass++
		case ops.ResultFail:
			fail++
		case ops.ResultSkip:
			skip++
		}
	}
	fmt.Printf("\nResults: %d pass, %d fail, %d skip\n", pass, fail, skip)

	if fail == 0 {
		return
	}
	if !react {
		fmt.Println("\nTip: use --react to apply severity rules (update entity status, create proposals)")
		return
	}

	// Reactive loop: apply severity rules. regDB may be nil; it is passed to
	// React as-is. It is closed right after the call so the handle is
	// released even on the error exit below.
	regDB := tryOpenRegistryDB()
	rr, err := ops.React(db, regDB, results)
	if regDB != nil {
		regDB.Close()
	}
	if err != nil {
		opsFatalf("error in reactive loop: %v\n", err)
	}

	if len(rr.EntityUpdates) > 0 {
		fmt.Println("\nEntity status changes:")
		for _, u := range rr.EntityUpdates {
			fmt.Printf(" %s: %s -> %s (%s)\n", u.EntityID, u.OldStatus, u.NewStatus, u.Reason)
		}
	}
	if len(rr.Proposals) > 0 {
		fmt.Println("\nProposals created:")
		for _, pid := range rr.Proposals {
			fmt.Printf(" %s\n", pid)
		}
	}
}

// --- Assertion Result subcommands ---

// cmdOpsAssertionResult dispatches `fn ops assertion result <add|list>`.
func cmdOpsAssertionResult(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops assertion result <add|list>\n")
	}
	switch args[0] {
	case "add":
		cmdOpsAssertionResultAdd(args[1:])
	case "list":
		cmdOpsAssertionResultList(args[1:])
	default:
		opsFatalf("unknown assertion result command: %s\n", args[0])
	}
}

// cmdOpsAssertionResultAdd records a manual assertion result.
// --assertion-id and --status are required; the ID defaults to
// "ar_<unix-nanos>", --value must be a JSON object, and the evaluation time
// is the current UTC time. Unrecognised arguments are ignored.
func cmdOpsAssertionResultAdd(args []string) {
	var id, assertionID, executionID, status, valueStr, message string

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--id":
			id = opsFlagValue(args, &i)
		case "--assertion-id":
			assertionID = opsFlagValue(args, &i)
		case "--execution-id":
			executionID = opsFlagValue(args, &i)
		case "--status":
			status = opsFlagValue(args, &i)
		case "--value":
			valueStr = opsFlagValue(args, &i)
		case "--message":
			message = opsFlagValue(args, &i)
		}
	}

	if assertionID == "" || status == "" {
		opsFatalf("error: --assertion-id and --status are required\n")
	}
	if id == "" {
		id = fmt.Sprintf("ar_%d", timeNow().UnixNano())
	}

	var value map[string]any
	if valueStr != "" {
		if err := json.Unmarshal([]byte(valueStr), &value); err != nil {
			opsFatalf("error: invalid value JSON: %v\n", err)
		}
	}

	ar := &ops.AssertionResult{
		ID:          id,
		AssertionID: assertionID,
		ExecutionID: executionID,
		Status:      ops.AssertionResultStatus(status),
		Value:       value,
		Message:     message,
		EvaluatedAt: timeNow(),
	}

	db := openOpsDB()
	defer db.Close()

	if err := db.InsertAssertionResult(ar); err != nil {
		opsFatalf("error: %v\n", err)
	}
	fmt.Printf("Recorded result: %s (%s)\n", ar.ID, ar.Status)
}

// cmdOpsAssertionResultList prints a table of assertion results filtered by
// --assertion-id and --execution-id.
func cmdOpsAssertionResultList(args []string) {
	var assertionID, executionID string

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--assertion-id":
			assertionID = opsFlagValue(args, &i)
		case "--execution-id":
			executionID = opsFlagValue(args, &i)
		}
	}

	db := openOpsDB()
	defer db.Close()

	results, err := db.ListAssertionResults(assertionID, executionID)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(results) == 0 {
		fmt.Println("No assertion results.")
		return
	}

	w := newOpsTable()
	fmt.Fprintln(w, "ID\tASSERTION\tEXECUTION\tSTATUS\tEVALUATED_AT")
	for _, r := range results {
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", r.ID, r.AssertionID, r.ExecutionID, r.Status, r.EvaluatedAt.Format(time.RFC3339))
	}
	w.Flush()
}

// --- helpers for new commands ---

// timeNow returns the current time in UTC.
func timeNow() time.Time {
	return time.Now().UTC()
}

// parseInt64 scans a decimal integer from s and returns 0 when s does not
// start with one. The lenient behaviour is kept for existing callers; the
// flag parsers in this file use opsInt64Flag, which rejects invalid input.
func parseInt64(s string) int64 {
	var v int64
	// Scan failure intentionally leaves v at 0 (documented contract).
	_, _ = fmt.Sscanf(s, "%d", &v)
	return v
}

// formatResultValue renders v as compact JSON for one-line display. An empty
// map (or a value that cannot be marshalled) yields "". Output longer than
// 40 bytes is cut to at most 37 bytes plus "...", backing off to a rune
// boundary so a multi-byte character is never split.
func formatResultValue(v map[string]any) string {
	if len(v) == 0 {
		return ""
	}
	b, err := json.Marshal(v)
	if err != nil {
		return ""
	}
	s := string(b)
	if len(s) <= 40 {
		return s
	}
	cut := 37
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut] + "..."
}

// --- Log subcommands ---

// cmdOpsLog dispatches `fn ops log <add|list|show>`.
func cmdOpsLog(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops log <add|list|show>\n")
	}
	switch args[0] {
	case "add":
		cmdOpsLogAdd(args[1:])
	case "list":
		cmdOpsLogList(args[1:])
	case "show":
		cmdOpsLogShow(args[1:])
	default:
		opsFatalf("unknown log command: %s\n", args[0])
	}
}

// cmdOpsLogAdd inserts a log entry. The message comes from --message/-m or,
// when that is absent, from stdin (read to EOF and trimmed). The level
// defaults to "info" and the ID to "log_<unix-nanos>"; --metadata must be a
// JSON object. Unrecognised arguments are ignored.
func cmdOpsLogAdd(args []string) {
	var id, level, source, entityID, executionID, message, metadataStr string

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--id":
			id = opsFlagValue(args, &i)
		case "--level":
			level = opsFlagValue(args, &i)
		case "--source":
			source = opsFlagValue(args, &i)
		case "--entity-id":
			entityID = opsFlagValue(args, &i)
		case "--execution-id":
			executionID = opsFlagValue(args, &i)
		case "--message", "-m":
			message = opsFlagValue(args, &i)
		case "--metadata":
			metadataStr = opsFlagValue(args, &i)
		}
	}

	if message == "" {
		// Read from stdin if no message flag. A read error is treated like
		// empty input and falls through to the "required" error below.
		b, err := io.ReadAll(os.Stdin)
		if err == nil && len(b) > 0 {
			message = strings.TrimSpace(string(b))
		}
	}
	if message == "" {
		opsFatalf("error: --message is required (or pipe to stdin)\n")
	}

	if level == "" {
		level = "info"
	}
	if id == "" {
		id = fmt.Sprintf("log_%d", timeNow().UnixNano())
	}

	var metadata map[string]any
	if metadataStr != "" {
		if err := json.Unmarshal([]byte(metadataStr), &metadata); err != nil {
			opsFatalf("error: invalid metadata JSON: %v\n", err)
		}
	}

	l := &ops.Log{
		ID:          id,
		Level:       ops.LogLevel(level),
		Source:      source,
		EntityID:    entityID,
		ExecutionID: executionID,
		Message:     message,
		Metadata:    metadata,
	}

	db := openOpsDB()
	defer db.Close()

	if err := db.InsertLog(l); err != nil {
		opsFatalf("error: %v\n", err)
	}
	fmt.Printf("Logged: [%s] %s\n", l.Level, truncate(l.Message, 60))
}

// cmdOpsLogList prints a table of log entries filtered by --level, --source,
// --entity-id and --execution-id. --limit/-n is passed to ListLogs and
// defaults to 50.
func cmdOpsLogList(args []string) {
	var level, source, entityID, executionID string
	limit := 50

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--level":
			level = opsFlagValue(args, &i)
		case "--source":
			source = opsFlagValue(args, &i)
		case "--entity-id":
			entityID = opsFlagValue(args, &i)
		case "--execution-id":
			executionID = opsFlagValue(args, &i)
		case "--limit", "-n":
			limit = int(opsInt64Flag(args, &i))
		}
	}

	db := openOpsDB()
	defer db.Close()

	logs, err := db.ListLogs(ops.LogLevel(level), source, entityID, executionID, limit)
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if len(logs) == 0 {
		fmt.Println("No logs.")
		return
	}

	w := newOpsTable()
	fmt.Fprintln(w, "CREATED_AT\tLEVEL\tSOURCE\tMESSAGE")
	for _, l := range logs {
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", l.CreatedAt.Format(time.RFC3339), l.Level, l.Source, truncate(l.Message, 60))
	}
	w.Flush()
}

// cmdOpsLogShow prints the log entry whose ID is args[0]; optional fields
// are printed only when set.
func cmdOpsLogShow(args []string) {
	if len(args) < 1 {
		opsFatalf("usage: fn ops log show <id>\n")
	}

	db := openOpsDB()
	defer db.Close()

	l, err := db.GetLog(args[0])
	if err != nil {
		opsFatalf("error: %v\n", err)
	}
	if l == nil {
		opsFatalf("log %q not found\n", args[0])
	}

	fmt.Printf("ID: %s\n", l.ID)
	fmt.Printf("Level: %s\n", l.Level)
	fmt.Printf("Source: %s\n", l.Source)
	if l.EntityID != "" {
		fmt.Printf("Entity: %s\n", l.EntityID)
	}
	if l.ExecutionID != "" {
		fmt.Printf("Execution: %s\n", l.ExecutionID)
	}
	fmt.Printf("Message: %s\n", l.Message)
	if len(l.Metadata) > 0 {
		opsPrintJSON("Metadata", l.Metadata)
	}
	fmt.Printf("Created: %s\n", l.CreatedAt.Format(time.RFC3339))
}

// tryOpenRegistryDB tries to open the registry database (file name dbName)
// from, in order: $FN_REGISTRY_ROOT, the directory returned by root(), and
// the directory of the running executable. It returns the first database
// that exists and opens successfully, or nil when none does; the caller is
// responsible for closing a non-nil result.
func tryOpenRegistryDB() *registry.DB {
	// openIn returns the database in dir, or nil if the file is missing or
	// cannot be opened.
	openIn := func(dir string) *registry.DB {
		path := filepath.Join(dir, dbName)
		if _, err := os.Stat(path); err != nil {
			return nil
		}
		db, err := registry.Open(path)
		if err != nil {
			return nil
		}
		return db
	}

	// Try FN_REGISTRY_ROOT env var first.
	if envRoot := os.Getenv("FN_REGISTRY_ROOT"); envRoot != "" {
		if db := openIn(envRoot); db != nil {
			return db
		}
	}

	// Then root() — evaluated only if the env lookup did not succeed.
	if db := openIn(root()); db != nil {
		return db
	}

	// Finally the executable's directory.
	if exe, err := os.Executable(); err == nil {
		if db := openIn(filepath.Dir(exe)); db != nil {
			return db
		}
	}
	return nil
}