Files
egutierrez a4df450d41 fix: findOpsDB falla con error en vez de crear operations.db en la raíz
Antes, si no encontraba operations.db subiendo directorios, hacía
fallback silencioso a ./operations.db — lo que creaba la BD en la raíz
violando la regla de db_locations. Ahora retorna error explícito
indicando que se debe ejecutar fn ops init en el directorio correcto.

También elimina operations.db espuria de la raíz (2 executions de
metabase_registry creadas por el fallback).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 17:29:47 +02:00

1632 lines
36 KiB
Go

package main
import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"text/tabwriter"
"time"
ops "fn-registry/fn_operations"
"fn-registry/registry"
)
// opsDBName is the file name of the operations database: cmdOpsInit creates
// a file with this name and findOpsDB searches for it.
const opsDBName = "operations.db"
// cmdOps is the entry point for "fn ops". It dispatches on the first
// argument to the matching subcommand handler, passing along the remaining
// arguments. With no arguments, or with an unknown subcommand, it prints the
// usage text and exits with status 1.
func cmdOps(args []string) {
	if len(args) == 0 {
		printOpsUsage()
		os.Exit(1)
	}
	sub, rest := args[0], args[1:]

	// Help aliases print usage and return normally (exit status 0).
	if sub == "help" || sub == "-h" || sub == "--help" {
		printOpsUsage()
		return
	}

	handlers := map[string]func([]string){
		"init":      cmdOpsInit,
		"entity":    cmdOpsEntity,
		"relation":  cmdOpsRelation,
		"graph":     func([]string) { cmdOpsGraph() }, // graph takes no arguments
		"snapshot":  cmdOpsSnapshot,
		"execution": cmdOpsExecution,
		"assertion": cmdOpsAssertion,
		"log":       cmdOpsLog,
	}
	run, known := handlers[sub]
	if !known {
		fmt.Fprintf(os.Stderr, "unknown ops command: %s\n", sub)
		printOpsUsage()
		os.Exit(1)
	}
	run(rest)
}
// printOpsUsage writes the help text for all "fn ops" subcommands and their
// flags to standard output.
func printOpsUsage() {
fmt.Println(`fn ops — operations CLI
Usage:
fn ops init [path] Crea operations.db en path (default: .)
fn ops entity add <flags> Añade entity
fn ops entity list [-d domain] [-s status] Lista entities
fn ops entity show <id> Muestra entity
fn ops entity delete <id> Elimina entity
fn ops relation add <flags> Añade relation
fn ops relation list [entity_id] Lista relations
fn ops relation show <id> Muestra relation
fn ops relation delete <id> Elimina relation
fn ops graph Grafo ASCII de entities y relations
fn ops snapshot list Lista tipos snapshotted
fn ops snapshot check Compara snapshots vs registry
fn ops snapshot update <id>|--all Re-snapshot desde registry
Entity flags:
--id <id> --name <name> --type-ref <type_id> --source <source>
--domain <domain> --status <status> --description <desc>
--tags <t1,t2> --metadata <json> --notes <text>
Relation flags:
--id <id> --name <name> --from <entity_id> --to <entity_id>
--via <function_id> --direction <uni|bi|inverse> --status <status>
--purity <pure|impure> --weight <0.0-1.0> --description <desc>
--tags <t1,t2> --notes <text>
Execution commands:
fn ops execution add <flags> Registra ejecucion
fn ops execution list [--pipeline-id <id>] [-s status]
fn ops execution show <id> Muestra ejecucion
Assertion commands:
fn ops assertion add <flags> Añade assertion
fn ops assertion list [--entity-id <id>] Lista assertions
fn ops assertion show <id> Muestra assertion
fn ops assertion delete <id> Elimina assertion
fn ops assertion eval --entity-id <id> Evalua assertions activas
fn ops assertion result add <flags> Registra resultado manual
fn ops assertion result list [--assertion-id <id>]
Log commands:
fn ops log add <flags> Registra log entry
fn ops log list [--level <lvl>] [--source <src>] [--limit N]
fn ops log show <id> Muestra log entry
Log flags:
--id <id> --level <debug|info|warn|error> --source <src>
--entity-id <id> --execution-id <id> --message <text> --metadata <json>`)
}
// --- ops init ---
// cmdOpsInit implements "fn ops init [path]". It creates operations.db inside
// the given directory (default "."), refusing to overwrite an existing file.
// When a template database exists under root()/fn_operations/project_template
// it is copied; otherwise a fresh database is created through ops.Open.
// Any failure is reported on stderr and the process exits with status 1.
func cmdOpsInit(args []string) {
	dir := "."
	if len(args) > 0 {
		dir = args[0]
	}
	path := filepath.Join(dir, opsDBName)
	if _, err := os.Stat(path); err == nil {
		fmt.Fprintf(os.Stderr, "operations.db already exists at %s\n", path)
		os.Exit(1)
	}

	// Copy from the template if available, otherwise create fresh.
	templatePath := filepath.Join(root(), "fn_operations", "project_template", "operations.db")
	if _, err := os.Stat(templatePath); err == nil {
		if err := copyOpsTemplate(templatePath, dir, path); err != nil {
			fmt.Fprintf(os.Stderr, "%v\n", err)
			os.Exit(1)
		}
		fmt.Printf("operations.db created at %s (from template)\n", path)
		return
	}

	// Create fresh.
	db, err := ops.Open(path)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	db.Close()
	fmt.Printf("operations.db created at %s\n", path)
}

// copyOpsTemplate copies the template database at templatePath to path,
// creating dir first. The destination is closed explicitly so that a failed
// flush is reported, and a partially written file is removed so that a later
// "fn ops init" is not blocked by the already-exists check.
func copyOpsTemplate(templatePath, dir, path string) error {
	src, err := os.Open(templatePath)
	if err != nil {
		return fmt.Errorf("error opening template: %w", err)
	}
	defer src.Close()

	if err := os.MkdirAll(dir, 0o755); err != nil {
		return fmt.Errorf("error creating directory: %w", err)
	}
	dst, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("error creating db: %w", err)
	}
	if _, err := io.Copy(dst, src); err != nil {
		dst.Close()
		os.Remove(path)
		return fmt.Errorf("error copying template: %w", err)
	}
	if err := dst.Close(); err != nil {
		os.Remove(path)
		return fmt.Errorf("error copying template: %w", err)
	}
	return nil
}
// --- ops entity ---
// cmdOpsEntity routes "fn ops entity <subcommand>" to the add, list, show or
// delete handler. A missing or unknown subcommand is reported on stderr and
// the process exits with status 1.
func cmdOpsEntity(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops entity <add|list|show|delete> ...")
		os.Exit(1)
	}
	sub, rest := args[0], args[1:]
	handlers := map[string]func([]string){
		"add":    cmdOpsEntityAdd,
		"list":   cmdOpsEntityList,
		"show":   cmdOpsEntityShow,
		"delete": cmdOpsEntityDelete,
	}
	run, known := handlers[sub]
	if !known {
		fmt.Fprintf(os.Stderr, "unknown entity command: %s\n", sub)
		os.Exit(1)
	}
	run(rest)
}
// cmdOpsEntityAdd implements "fn ops entity add". It builds an ops.Entity
// from the command-line flags and inserts it together with a type snapshot.
//
// --name, --type-ref and --source are required; --id defaults to the name and
// --status defaults to ops.StatusActive. --tags is a comma-separated list and
// --metadata must be valid JSON. Unrecognised arguments are ignored.
// Errors are written to stderr and terminate the process with status 1.
func cmdOpsEntityAdd(args []string) {
	var e ops.Entity
	e.Status = ops.StatusActive
	var tagsStr, metadataStr string

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--id":
			e.ID = value()
		case "--name":
			e.Name = value()
		case "--type-ref":
			e.TypeRef = value()
		case "--source":
			e.Source = value()
		case "--domain":
			e.Domain = value()
		case "--status":
			e.Status = ops.EntityStatus(value())
		case "--description":
			e.Description = value()
		case "--tags":
			tagsStr = value()
		case "--metadata":
			metadataStr = value()
		case "--notes":
			e.Notes = value()
		}
	}

	if e.Name == "" || e.TypeRef == "" || e.Source == "" {
		fmt.Fprintln(os.Stderr, "required: --name, --type-ref, --source")
		os.Exit(1)
	}
	if e.ID == "" {
		e.ID = e.Name
	}
	if tagsStr != "" {
		e.Tags = strings.Split(tagsStr, ",")
	}
	if metadataStr != "" {
		// Reject malformed JSON rather than silently storing no metadata.
		if err := json.Unmarshal([]byte(metadataStr), &e.Metadata); err != nil {
			fmt.Fprintf(os.Stderr, "error: invalid metadata JSON: %v\n", err)
			os.Exit(1)
		}
	}

	opsDB := openOpsDB()
	defer opsDB.Close()

	// The registry is optional here: regDB may be nil and is passed through
	// as such to InsertEntityWithSnapshot.
	regDB := tryOpenRegistryDB()
	if regDB != nil {
		defer regDB.Close()
	}

	if err := ops.InsertEntityWithSnapshot(opsDB, regDB, &e); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Entity %s added\n", e.ID)
}
// cmdOpsEntityList implements "fn ops entity list [-d domain] [-s status]".
// It prints the matching entities as a tab-aligned table, or "No entities."
// when the query returns nothing. Errors go to stderr with exit status 1.
func cmdOpsEntityList(args []string) {
	var domain string
	var status ops.EntityStatus

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "-d":
			domain = value()
		case "-s":
			status = ops.EntityStatus(value())
		}
	}

	db := openOpsDB()
	defer db.Close()
	entities, err := db.ListEntities(domain, status)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(entities) == 0 {
		fmt.Println("No entities.")
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(w, "ID\tTYPE_REF\tSTATUS\tSOURCE\tDOMAIN")
	for _, e := range entities {
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", e.ID, e.TypeRef, e.Status, e.Source, e.Domain)
	}
	w.Flush()
}
// cmdOpsEntityShow implements "fn ops entity show <id>". It prints every
// field of the entity, one per line; metadata and notes are printed only
// when present. A lookup error or a missing entity is reported on stderr and
// exits with status 1.
func cmdOpsEntityShow(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops entity show <id>")
		os.Exit(1)
	}
	id := args[0]

	db := openOpsDB()
	defer db.Close()

	ent, err := db.GetEntity(id)
	switch {
	case err != nil:
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	case ent == nil:
		fmt.Fprintf(os.Stderr, "entity not found: %s\n", id)
		os.Exit(1)
	}

	const stamp = "2006-01-02 15:04:05"
	fmt.Printf("ID: %s\n", ent.ID)
	fmt.Printf("Name: %s\n", ent.Name)
	fmt.Printf("Type ref: %s\n", ent.TypeRef)
	fmt.Printf("Status: %s\n", ent.Status)
	fmt.Printf("Source: %s\n", ent.Source)
	fmt.Printf("Domain: %s\n", ent.Domain)
	fmt.Printf("Description: %s\n", ent.Description)
	fmt.Printf("Tags: %s\n", strings.Join(ent.Tags, ", "))
	if len(ent.Metadata) != 0 {
		// Marshal error is ignored: the output is display-only.
		pretty, _ := json.MarshalIndent(ent.Metadata, " ", " ")
		fmt.Printf("Metadata: %s\n", pretty)
	}
	if ent.Notes != "" {
		fmt.Printf("Notes: %s\n", ent.Notes)
	}
	fmt.Printf("Created: %s\n", ent.CreatedAt.Format(stamp))
	fmt.Printf("Updated: %s\n", ent.UpdatedAt.Format(stamp))
}
// cmdOpsEntityDelete implements "fn ops entity delete <id>". It deletes the
// entity with the given ID and confirms on stdout; a missing argument or a
// delete error is reported on stderr with exit status 1.
func cmdOpsEntityDelete(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops entity delete <id>")
		os.Exit(1)
	}
	id := args[0]

	db := openOpsDB()
	defer db.Close()

	err := db.DeleteEntity(id)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Entity %s deleted\n", id)
}
// --- ops relation ---
// cmdOpsRelation routes "fn ops relation <subcommand>" to the add, list,
// show or delete handler. A missing or unknown subcommand is reported on
// stderr and the process exits with status 1.
func cmdOpsRelation(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops relation <add|list|show|delete> ...")
		os.Exit(1)
	}
	sub, rest := args[0], args[1:]
	handlers := map[string]func([]string){
		"add":    cmdOpsRelationAdd,
		"list":   cmdOpsRelationList,
		"show":   cmdOpsRelationShow,
		"delete": cmdOpsRelationDelete,
	}
	run, known := handlers[sub]
	if !known {
		fmt.Fprintf(os.Stderr, "unknown relation command: %s\n", sub)
		os.Exit(1)
	}
	run(rest)
}
// cmdOpsRelationAdd implements "fn ops relation add". It builds an
// ops.Relation from the command-line flags and inserts it through
// ops.InsertRelationSafe.
//
// --name and --to are required. Direction defaults to
// ops.DirUnidirectional and status to ops.RelDesigned. When --id is omitted
// and --from is given, the ID is derived as "<from>__to__<to>__via__<via>",
// with "semantic" standing in for an empty --via. --weight must parse as a
// number. Unrecognised arguments are ignored. Errors are written to stderr
// and terminate the process with status 1.
func cmdOpsRelationAdd(args []string) {
	var r ops.Relation
	r.Direction = ops.DirUnidirectional
	r.Status = ops.RelDesigned
	var tagsStr string

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--id":
			r.ID = value()
		case "--name":
			r.Name = value()
		case "--from":
			r.FromEntity = value()
		case "--to":
			r.ToEntity = value()
		case "--via":
			r.Via = value()
		case "--direction":
			r.Direction = ops.Direction(value())
		case "--status":
			r.Status = ops.RelationStatus(value())
		case "--purity":
			r.Purity = value()
		case "--weight":
			raw := value()
			var w float64
			// Reject unparsable input instead of silently storing 0.
			if _, err := fmt.Sscanf(raw, "%f", &w); err != nil {
				fmt.Fprintf(os.Stderr, "error: invalid --weight %q: %v\n", raw, err)
				os.Exit(1)
			}
			r.Weight = &w
		case "--description":
			r.Description = value()
		case "--tags":
			tagsStr = value()
		case "--notes":
			r.Notes = value()
		}
	}

	if r.Name == "" || r.ToEntity == "" {
		fmt.Fprintln(os.Stderr, "required: --name, --to (and --from for simple relations)")
		os.Exit(1)
	}
	if r.ID == "" && r.FromEntity != "" {
		via := "semantic"
		if r.Via != "" {
			via = r.Via
		}
		r.ID = fmt.Sprintf("%s__to__%s__via__%s", r.FromEntity, r.ToEntity, via)
	}
	if tagsStr != "" {
		r.Tags = strings.Split(tagsStr, ",")
	}

	db := openOpsDB()
	defer db.Close()
	if err := ops.InsertRelationSafe(db, &r); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Relation %s added\n", r.ID)
}
// cmdOpsRelationList implements "fn ops relation list [entity_id]". It
// prints the relations returned by ListRelations as a tab-aligned table. An
// empty source entity is shown as "(inputs)" and an empty via as "-".
// Prints "No relations." when there are none.
func cmdOpsRelationList(args []string) {
	entityID := ""
	if len(args) != 0 {
		entityID = args[0]
	}

	db := openOpsDB()
	defer db.Close()

	relations, err := db.ListRelations(entityID)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(relations) == 0 {
		fmt.Println("No relations.")
		return
	}

	// orElse substitutes a placeholder for an empty column value.
	orElse := func(s, placeholder string) string {
		if s == "" {
			return placeholder
		}
		return s
	}

	tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(tw, "ID\tNAME\tFROM\tTO\tVIA\tDIRECTION\tSTATUS")
	for _, rel := range relations {
		fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
			rel.ID, rel.Name, orElse(rel.FromEntity, "(inputs)"), rel.ToEntity,
			orElse(rel.Via, "-"), rel.Direction, rel.Status)
	}
	tw.Flush()
}
// cmdOpsRelationShow implements "fn ops relation show <id>". It prints the
// relation's fields, then its inputs (entity, role and optional order) when
// it has any. A lookup error or a missing relation exits with status 1. A
// failure to load the inputs is reported as a warning only, because the
// relation itself has already been printed.
func cmdOpsRelationShow(args []string) {
	if len(args) < 1 {
		fmt.Fprintln(os.Stderr, "usage: fn ops relation show <id>")
		os.Exit(1)
	}
	db := openOpsDB()
	defer db.Close()

	r, err := db.GetRelation(args[0])
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if r == nil {
		fmt.Fprintf(os.Stderr, "relation not found: %s\n", args[0])
		os.Exit(1)
	}

	fmt.Printf("ID: %s\n", r.ID)
	fmt.Printf("Name: %s\n", r.Name)
	fmt.Printf("From: %s\n", r.FromEntity)
	fmt.Printf("To: %s\n", r.ToEntity)
	fmt.Printf("Via: %s\n", r.Via)
	fmt.Printf("Description: %s\n", r.Description)
	fmt.Printf("Purity: %s\n", r.Purity)
	fmt.Printf("Direction: %s\n", r.Direction)
	if r.Weight != nil {
		fmt.Printf("Weight: %.2f\n", *r.Weight)
	}
	fmt.Printf("Status: %s\n", r.Status)
	fmt.Printf("Tags: %s\n", strings.Join(r.Tags, ", "))
	if r.Notes != "" {
		fmt.Printf("Notes: %s\n", r.Notes)
	}

	// Show inputs if any.
	inputs, err := db.GetRelationInputs(r.ID)
	if err != nil {
		fmt.Fprintf(os.Stderr, "warning: could not load relation inputs: %v\n", err)
		return
	}
	if len(inputs) > 0 {
		fmt.Println("\nInputs:")
		for _, ri := range inputs {
			ord := ""
			if ri.Order != nil {
				ord = fmt.Sprintf(" (order: %d)", *ri.Order)
			}
			fmt.Printf(" %s [%s]%s\n", ri.EntityID, ri.Role, ord)
		}
	}
}
// cmdOpsRelationDelete implements "fn ops relation delete <id>". It deletes
// the relation with the given ID and confirms on stdout; a missing argument
// or a delete error is reported on stderr with exit status 1.
func cmdOpsRelationDelete(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops relation delete <id>")
		os.Exit(1)
	}
	id := args[0]

	db := openOpsDB()
	defer db.Close()

	err := db.DeleteRelation(id)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Relation %s deleted\n", id)
}
// --- ops graph ---
// cmdOpsGraph implements "fn ops graph". It prints a plain-text listing of
// all entities followed by all relations. A relation that has an entry in
// the graph's Inputs map is rendered as "(a[role] + b[role]) name → to";
// any other relation is rendered as "from → name to", using "↔" for
// bidirectional relations and "?" for an empty source. Prints "Empty graph."
// when there are no entities.
func cmdOpsGraph() {
	db := openOpsDB()
	defer db.Close()

	graph, err := ops.GetEntityGraph(db)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(graph.Entities) == 0 {
		fmt.Println("Empty graph.")
		return
	}

	fmt.Println("Entities:")
	for _, ent := range graph.Entities {
		fmt.Printf(" [%s] (%s) status:%s source:%s\n", ent.ID, ent.TypeRef, ent.Status, ent.Source)
	}

	if len(graph.Relations) == 0 {
		return
	}
	fmt.Println("\nRelations:")
	for _, rel := range graph.Relations {
		suffix := ""
		if rel.Via != "" {
			suffix = fmt.Sprintf(" via:%s", rel.Via)
		}

		// Multi-input relation: list every input with its role.
		if ins, ok := graph.Inputs[rel.ID]; ok {
			parts := make([]string, 0, len(ins))
			for _, in := range ins {
				parts = append(parts, fmt.Sprintf("%s[%s]", in.EntityID, in.Role))
			}
			fmt.Printf(" (%s) %s → %s%s\n",
				strings.Join(parts, " + "), rel.Name, rel.ToEntity, suffix)
			continue
		}

		// Simple relation: a single source entity and a direction arrow.
		source := rel.FromEntity
		if source == "" {
			source = "?"
		}
		arrow := "→"
		if rel.Direction == ops.DirBidirectional {
			arrow = "↔"
		}
		fmt.Printf(" %s %s %s %s%s\n", source, arrow, rel.Name, rel.ToEntity, suffix)
	}
}
// --- ops snapshot ---
func cmdOpsSnapshot(args []string) {
if len(args) < 1 {
fmt.Fprintln(os.Stderr, "usage: fn ops snapshot <list|check|update> [id|--all]")
os.Exit(1)
}
switch args[0] {
case "list":
cmdOpsSnapshotList()
case "check":
cmdOpsSnapshotCheck()
case "update":
cmdOpsSnapshotUpdate(args[1:])
default:
fmt.Fprintf(os.Stderr, "unknown snapshot command: %s\n", args[0])
os.Exit(1)
}
}
// cmdOpsSnapshotList implements "fn ops snapshot list". It prints every type
// snapshot stored in operations.db as a tab-aligned table, or
// "No type snapshots." when there are none.
func cmdOpsSnapshotList() {
	db := openOpsDB()
	defer db.Close()

	snapshots, err := db.ListTypeSnapshots()
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(snapshots) == 0 {
		fmt.Println("No type snapshots.")
		return
	}

	const stamp = "2006-01-02 15:04"
	tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(tw, "ID\tVERSION\tLANG\tALGEBRAIC\tSNAPPED_AT")
	for _, snap := range snapshots {
		fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\n",
			snap.ID, snap.Version, snap.Lang, snap.Algebraic, snap.SnappedAt.Format(stamp))
	}
	tw.Flush()
}
// cmdOpsSnapshotCheck implements "fn ops snapshot check". It compares the
// local type snapshots against the registry via ops.CheckSnapshots and
// prints one row per snapshot with its local version, registry version
// ("-" when empty) and a status marker. Requires registry.db to be
// locatable; exits with status 1 otherwise.
func cmdOpsSnapshotCheck() {
	opsDB := openOpsDB()
	defer opsDB.Close()
	regDB := requireRegistryDB()
	defer regDB.Close()

	checks, err := ops.CheckSnapshots(opsDB, regDB)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(checks) == 0 {
		fmt.Println("No type snapshots to check.")
		return
	}

	tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(tw, "ID\tLOCAL\tREGISTRY\tSTATUS")
	for _, chk := range checks {
		registryVersion := chk.RegistryVersion
		if registryVersion == "" {
			registryVersion = "-"
		}

		// Any status other than outdated or missing is shown as a check mark.
		marker := "✓"
		if chk.Status == ops.SnapshotOutdated {
			marker = "← OUTDATED"
		} else if chk.Status == ops.SnapshotMissing {
			marker = "⚠ not in registry"
		}
		fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", chk.ID, chk.LocalVersion, registryVersion, marker)
	}
	tw.Flush()
}
// cmdOpsSnapshotUpdate implements "fn ops snapshot update <type_id> | --all".
//
// With a type ID it re-snapshots that single type from the registry and
// prints the diff. With --all it re-snapshots every snapshot that
// ops.CheckSnapshots reports as outdated. Individual failures are reported
// on stderr without stopping the run, but the process exits with status 1 if
// any update failed, and "All snapshots are up to date." is printed only
// when nothing was updated and nothing failed.
func cmdOpsSnapshotUpdate(args []string) {
	if len(args) < 1 {
		fmt.Fprintln(os.Stderr, "usage: fn ops snapshot update <type_id> | --all")
		os.Exit(1)
	}
	opsDB := openOpsDB()
	defer opsDB.Close()
	regDB := requireRegistryDB()
	defer regDB.Close()

	if args[0] == "--all" {
		// Update all outdated snapshots.
		results, err := ops.CheckSnapshots(opsDB, regDB)
		if err != nil {
			fmt.Fprintf(os.Stderr, "error: %v\n", err)
			os.Exit(1)
		}
		updated, failed := 0, 0
		for _, r := range results {
			if r.Status != ops.SnapshotOutdated {
				continue
			}
			before, after, err := ops.UpdateSnapshot(opsDB, regDB, r.ID)
			if err != nil {
				fmt.Fprintf(os.Stderr, " error updating %s: %v\n", r.ID, err)
				failed++
				continue
			}
			printSnapshotDiff(r.ID, before, after)
			updated++
		}
		switch {
		case updated > 0:
			fmt.Printf("\n%d snapshot(s) updated.\n", updated)
		case failed == 0:
			fmt.Println("All snapshots are up to date.")
		}
		if failed > 0 {
			// Do not report success when outdated snapshots remain.
			fmt.Fprintf(os.Stderr, "%d snapshot(s) failed to update.\n", failed)
			os.Exit(1)
		}
		return
	}

	// Update a single snapshot.
	typeID := args[0]
	before, after, err := ops.UpdateSnapshot(opsDB, regDB, typeID)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	printSnapshotDiff(typeID, before, after)
}
// printSnapshotDiff prints a summary of a snapshot update: the version
// change, and, when they differ, the old and new definitions (each line
// prefixed with "-" or "+") and the old and new descriptions.
func printSnapshotDiff(id string, old, new_ *ops.TypeSnapshot) {
	// marked trims the definition and prefixes every continuation line with
	// the given diff marker; the first line's marker is added by the caller.
	marked := func(definition, marker string) string {
		return strings.ReplaceAll(strings.TrimSpace(definition), "\n", "\n "+marker+" ")
	}

	fmt.Printf("Updated %s: %s → %s\n", id, old.Version, new_.Version)
	if new_.Definition != old.Definition {
		fmt.Println(" Definition changed:")
		fmt.Printf(" - %s\n", marked(old.Definition, "-"))
		fmt.Printf(" + %s\n", marked(new_.Definition, "+"))
	}
	if new_.Description != old.Description {
		fmt.Printf(" Description: %q → %q\n", old.Description, new_.Description)
	}
}
// requireRegistryDB returns an open registry database, or prints a hint on
// stderr and exits with status 1 when tryOpenRegistryDB finds none.
func requireRegistryDB() *registry.DB {
	if db := tryOpenRegistryDB(); db != nil {
		return db
	}
	fmt.Fprintln(os.Stderr, "error: cannot find registry.db")
	fmt.Fprintln(os.Stderr, "Set FN_REGISTRY_ROOT or run from the registry directory.")
	os.Exit(1)
	return nil // unreachable; os.Exit does not return
}
// --- helpers ---
// findOpsDB locates operations.db by checking the current working directory
// and then each parent directory in turn, up to the filesystem root.
//
// It returns the path of the first match. If the working directory cannot be
// determined, or no operations.db exists on the way up, it returns an error;
// it never falls back to creating or guessing a path. The not-found error
// names the directory the search started from and the root it stopped at.
func findOpsDB() (string, error) {
	start, err := os.Getwd()
	if err != nil {
		return "", fmt.Errorf("cannot determine working directory: %w", err)
	}

	dir := start
	for {
		candidate := filepath.Join(dir, opsDBName)
		if _, err := os.Stat(candidate); err == nil {
			return candidate, nil
		}
		parent := filepath.Dir(dir)
		if parent == dir {
			// filepath.Dir is a fixed point at the root: nothing left to search.
			break
		}
		dir = parent
	}
	// Report start, not dir: by now dir has been walked up to the root.
	return "", fmt.Errorf("operations.db not found (searched from %s up to %s)\nRun 'fn ops init <path>' to create one inside an app directory", start, dir)
}
// openOpsDB locates operations.db with findOpsDB and opens it. If the file
// cannot be found or opened, the error is printed to stderr and the process
// exits with status 1, so callers always receive a usable handle.
func openOpsDB() *ops.DB {
	location, findErr := findOpsDB()
	if findErr != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", findErr)
		os.Exit(1)
	}

	handle, openErr := ops.Open(location)
	if openErr != nil {
		fmt.Fprintf(os.Stderr, "error opening operations.db: %v\n", openErr)
		os.Exit(1)
	}
	return handle
}
// --- Execution subcommands ---
// cmdOpsExecution routes "fn ops execution <subcommand>" to the add, list or
// show handler. A missing or unknown subcommand is reported on stderr and
// the process exits with status 1.
func cmdOpsExecution(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops execution <add|list|show>")
		os.Exit(1)
	}
	sub, rest := args[0], args[1:]
	handlers := map[string]func([]string){
		"add":  cmdOpsExecutionAdd,
		"list": cmdOpsExecutionList,
		"show": cmdOpsExecutionShow,
	}
	run, known := handlers[sub]
	if !known {
		fmt.Fprintf(os.Stderr, "unknown execution command: %s\n", sub)
		os.Exit(1)
	}
	run(rest)
}
// cmdOpsExecutionAdd implements "fn ops execution add". It builds an
// ops.Execution from the command-line flags and records it through
// ops.InsertExecutionSafe.
//
// --pipeline-id and --status (or -s) are required. --id defaults to
// "exec_<unix-nanos>" and --started-at to the current UTC time. --started-at
// and --ended-at must be RFC 3339 timestamps; --metrics must be valid JSON.
// --records-in and --records-out are parsed with parseInt64. Unrecognised
// arguments are ignored. Errors are written to stderr and terminate the
// process with status 1.
func cmdOpsExecutionAdd(args []string) {
	var id, pipelineID, relationID, status, startedAtStr, endedAtStr, errorMsg, metricsStr string
	var recordsIn, recordsOut *int64

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--id":
			id = value()
		case "--pipeline-id":
			pipelineID = value()
		case "--relation-id":
			relationID = value()
		case "--status", "-s":
			status = value()
		case "--started-at":
			startedAtStr = value()
		case "--ended-at":
			endedAtStr = value()
		case "--records-in":
			v := parseInt64(value())
			recordsIn = &v
		case "--records-out":
			v := parseInt64(value())
			recordsOut = &v
		case "--error":
			errorMsg = value()
		case "--metrics":
			metricsStr = value()
		}
	}

	if pipelineID == "" || status == "" {
		fmt.Fprintln(os.Stderr, "error: --pipeline-id and --status are required")
		os.Exit(1)
	}
	if id == "" {
		id = fmt.Sprintf("exec_%d", timeNow().UnixNano())
	}

	var startedAt time.Time
	if startedAtStr != "" {
		var err error
		startedAt, err = time.Parse(time.RFC3339, startedAtStr)
		if err != nil {
			fmt.Fprintf(os.Stderr, "error: invalid started-at: %v\n", err)
			os.Exit(1)
		}
	} else {
		startedAt = timeNow()
	}

	// endedAt stays nil when --ended-at is not given.
	var endedAt *time.Time
	if endedAtStr != "" {
		t, err := time.Parse(time.RFC3339, endedAtStr)
		if err != nil {
			fmt.Fprintf(os.Stderr, "error: invalid ended-at: %v\n", err)
			os.Exit(1)
		}
		endedAt = &t
	}

	var metrics map[string]any
	if metricsStr != "" {
		if err := json.Unmarshal([]byte(metricsStr), &metrics); err != nil {
			fmt.Fprintf(os.Stderr, "error: invalid metrics JSON: %v\n", err)
			os.Exit(1)
		}
	}

	e := &ops.Execution{
		ID:         id,
		PipelineID: pipelineID,
		RelationID: relationID,
		Status:     ops.ExecutionStatus(status),
		StartedAt:  startedAt,
		EndedAt:    endedAt,
		RecordsIn:  recordsIn,
		RecordsOut: recordsOut,
		Error:      errorMsg,
		Metrics:    metrics,
	}

	db := openOpsDB()
	defer db.Close()
	if err := ops.InsertExecutionSafe(db, e); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Recorded execution: %s\n", e.ID)
}
// cmdOpsExecutionList implements "fn ops execution list". It filters by
// --pipeline-id, --relation-id and -s/--status (all optional) and prints the
// matching executions as a tab-aligned table; unset duration and record
// counts are shown as "-". Prints "No executions." when there are none.
func cmdOpsExecutionList(args []string) {
	var pipelineID, relationID, status string

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--pipeline-id":
			pipelineID = value()
		case "--relation-id":
			relationID = value()
		case "-s", "--status": // --status accepted for parity with "execution add"
			status = value()
		}
	}

	db := openOpsDB()
	defer db.Close()
	execs, err := db.ListExecutions(pipelineID, relationID, ops.ExecutionStatus(status))
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(execs) == 0 {
		fmt.Println("No executions.")
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(w, "ID\tPIPELINE\tSTATUS\tDURATION_MS\tRECORDS_IN\tRECORDS_OUT")
	for _, e := range execs {
		dur := "-"
		if e.DurationMs != nil {
			dur = fmt.Sprintf("%d", *e.DurationMs)
		}
		rin := "-"
		if e.RecordsIn != nil {
			rin = fmt.Sprintf("%d", *e.RecordsIn)
		}
		rout := "-"
		if e.RecordsOut != nil {
			rout = fmt.Sprintf("%d", *e.RecordsOut)
		}
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", e.ID, e.PipelineID, e.Status, dur, rin, rout)
	}
	w.Flush()
}
// cmdOpsExecutionShow implements "fn ops execution show <id>". It prints the
// execution's fields one per line, skipping optional fields that are unset
// (relation, end time, duration, record counts, error, metrics). A lookup
// error or a missing execution is reported on stderr with exit status 1.
func cmdOpsExecutionShow(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops execution show <id>")
		os.Exit(1)
	}
	id := args[0]

	db := openOpsDB()
	defer db.Close()

	ex, err := db.GetExecution(id)
	switch {
	case err != nil:
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	case ex == nil:
		fmt.Fprintf(os.Stderr, "execution %q not found\n", id)
		os.Exit(1)
	}

	fmt.Printf("ID: %s\n", ex.ID)
	fmt.Printf("Pipeline: %s\n", ex.PipelineID)
	if ex.RelationID != "" {
		fmt.Printf("Relation: %s\n", ex.RelationID)
	}
	fmt.Printf("Status: %s\n", ex.Status)
	fmt.Printf("Started at: %s\n", ex.StartedAt.Format(time.RFC3339))
	if ended := ex.EndedAt; ended != nil {
		fmt.Printf("Ended at: %s\n", ended.Format(time.RFC3339))
	}
	if ms := ex.DurationMs; ms != nil {
		fmt.Printf("Duration: %d ms\n", *ms)
	}
	if in := ex.RecordsIn; in != nil {
		fmt.Printf("Records in: %d\n", *in)
	}
	if out := ex.RecordsOut; out != nil {
		fmt.Printf("Records out: %d\n", *out)
	}
	if ex.Error != "" {
		fmt.Printf("Error: %s\n", ex.Error)
	}
	if len(ex.Metrics) != 0 {
		// Marshal error is ignored: the output is display-only.
		pretty, _ := json.MarshalIndent(ex.Metrics, " ", " ")
		fmt.Printf("Metrics: %s\n", string(pretty))
	}
}
// --- Assertion subcommands ---
// cmdOpsAssertion routes "fn ops assertion <subcommand>" to the add, list,
// show, delete, eval or result handler. A missing or unknown subcommand is
// reported on stderr and the process exits with status 1.
func cmdOpsAssertion(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops assertion <add|list|show|delete|eval|result>")
		os.Exit(1)
	}
	sub, rest := args[0], args[1:]
	handlers := map[string]func([]string){
		"add":    cmdOpsAssertionAdd,
		"list":   cmdOpsAssertionList,
		"show":   cmdOpsAssertionShow,
		"delete": cmdOpsAssertionDelete,
		"eval":   cmdOpsAssertionEval,
		"result": cmdOpsAssertionResult,
	}
	run, known := handlers[sub]
	if !known {
		fmt.Fprintf(os.Stderr, "unknown assertion command: %s\n", sub)
		os.Exit(1)
	}
	run(rest)
}
// cmdOpsAssertionAdd implements "fn ops assertion add". It builds an active
// ops.Assertion from the command-line flags and inserts it through
// ops.InsertAssertionSafe.
//
// --entity-id, --name, --kind and --rule are required. --id defaults to
// "assert_<name>_<unix-nanos>" and --severity to "warning". Unrecognised
// arguments are ignored. Errors are written to stderr and terminate the
// process with status 1.
func cmdOpsAssertionAdd(args []string) {
	var id, entityID, name, kind, rule, severity, description string

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		flagName := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", flagName)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--id":
			id = value()
		case "--entity-id":
			entityID = value()
		case "--name":
			name = value()
		case "--kind":
			kind = value()
		case "--rule":
			rule = value()
		case "--severity":
			severity = value()
		case "--description":
			description = value()
		}
	}

	if entityID == "" || name == "" || kind == "" || rule == "" {
		fmt.Fprintln(os.Stderr, "error: --entity-id, --name, --kind, and --rule are required")
		os.Exit(1)
	}
	if id == "" {
		id = fmt.Sprintf("assert_%s_%d", name, timeNow().UnixNano())
	}
	if severity == "" {
		severity = "warning"
	}

	a := &ops.Assertion{
		ID:          id,
		EntityID:    entityID,
		Name:        name,
		Kind:        kind,
		Rule:        rule,
		Severity:    ops.Severity(severity),
		Description: description,
		Active:      true,
	}

	db := openOpsDB()
	defer db.Close()
	if err := ops.InsertAssertionSafe(db, a); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Created assertion: %s\n", a.ID)
}
// cmdOpsAssertionList implements "fn ops assertion list". It optionally
// filters by --entity-id and by --active / --inactive (no filter when
// neither is given; the last one wins when both are), and prints the
// matching assertions as a tab-aligned table. Prints "No assertions." when
// there are none.
func cmdOpsAssertionList(args []string) {
	var entityID string
	var activeFilter *bool // nil means "do not filter on the active flag"

	for i := 0; i < len(args); i++ {
		switch args[i] {
		case "--entity-id":
			// Exit with an error instead of panicking when the value is missing.
			if i+1 >= len(args) {
				fmt.Fprintf(os.Stderr, "error: %s requires a value\n", args[i])
				os.Exit(1)
			}
			i++
			entityID = args[i]
		case "--active":
			v := true
			activeFilter = &v
		case "--inactive":
			v := false
			activeFilter = &v
		}
	}

	db := openOpsDB()
	defer db.Close()
	assertions, err := db.ListAssertions(entityID, activeFilter)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(assertions) == 0 {
		fmt.Println("No assertions.")
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(w, "ID\tENTITY\tNAME\tKIND\tSEVERITY\tACTIVE")
	for _, a := range assertions {
		active := "yes"
		if !a.Active {
			active = "no"
		}
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", a.ID, a.EntityID, a.Name, a.Kind, a.Severity, active)
	}
	w.Flush()
}
// cmdOpsAssertionShow implements "fn ops assertion show <id>". It prints the
// assertion's fields and then up to the first five entries returned by
// ListAssertionResults for it. A failure to list results is silently
// skipped; a lookup error or a missing assertion exits with status 1.
func cmdOpsAssertionShow(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops assertion show <id>")
		os.Exit(1)
	}
	id := args[0]

	db := openOpsDB()
	defer db.Close()

	asr, err := db.GetAssertion(id)
	switch {
	case err != nil:
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	case asr == nil:
		fmt.Fprintf(os.Stderr, "assertion %q not found\n", id)
		os.Exit(1)
	}

	activeLabel := "no"
	if asr.Active {
		activeLabel = "yes"
	}
	fmt.Printf("ID: %s\n", asr.ID)
	fmt.Printf("Entity: %s\n", asr.EntityID)
	fmt.Printf("Name: %s\n", asr.Name)
	fmt.Printf("Kind: %s\n", asr.Kind)
	fmt.Printf("Rule: %s\n", asr.Rule)
	fmt.Printf("Severity: %s\n", asr.Severity)
	fmt.Printf("Description: %s\n", asr.Description)
	fmt.Printf("Active: %s\n", activeLabel)
	fmt.Printf("Created: %s\n", asr.CreatedAt.Format(time.RFC3339))

	// Show recent results (best effort).
	history, listErr := db.ListAssertionResults(asr.ID, "")
	if listErr != nil || len(history) == 0 {
		return
	}
	const maxShown = 5
	if len(history) > maxShown {
		history = history[:maxShown]
	}
	fmt.Printf("\nRecent results:\n")
	for _, res := range history {
		fmt.Printf(" [%s] %s — %s %s\n", res.EvaluatedAt.Format(time.RFC3339), res.Status, res.Message, formatResultValue(res.Value))
	}
}
// cmdOpsAssertionDelete implements "fn ops assertion delete <id>". It
// deletes the assertion with the given ID and confirms on stdout; a missing
// argument or a delete error is reported on stderr with exit status 1.
func cmdOpsAssertionDelete(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops assertion delete <id>")
		os.Exit(1)
	}
	id := args[0]

	db := openOpsDB()
	defer db.Close()

	err := db.DeleteAssertion(id)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Deleted assertion: %s\n", id)
}
// cmdOpsAssertionEval implements "fn ops assertion eval". It evaluates the
// assertions of the entity given by --entity-id (required), optionally tied
// to --execution-id, prints one row per result and a pass/fail/skip summary.
//
// With --react and at least one failure, it passes the results to ops.React
// and prints the entity status changes and proposals it reports. The
// registry database is optional for that step and may be nil. Without
// --react, failures only produce a hint about the flag.
func cmdOpsAssertionEval(args []string) {
	var entityID, executionID string
	react := false

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--entity-id":
			entityID = value()
		case "--execution-id":
			executionID = value()
		case "--react":
			react = true
		}
	}

	if entityID == "" {
		fmt.Fprintln(os.Stderr, "error: --entity-id is required")
		os.Exit(1)
	}

	db := openOpsDB()
	defer db.Close()
	results, err := ops.EvalEntityAssertions(db, entityID, executionID)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(results) == 0 {
		fmt.Println("No active assertions for this entity.")
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(w, "ASSERTION\tSTATUS\tMESSAGE")
	for _, r := range results {
		fmt.Fprintf(w, "%s\t%s\t%s\n", r.AssertionID, r.Status, truncate(r.Message, 60))
	}
	w.Flush()

	// Summary.
	pass, fail, skip := 0, 0, 0
	for _, r := range results {
		switch r.Status {
		case ops.ResultPass:
			pass++
		case ops.ResultFail:
			fail++
		case ops.ResultSkip:
			skip++
		}
	}
	fmt.Printf("\nResults: %d pass, %d fail, %d skip\n", pass, fail, skip)

	if fail == 0 {
		return
	}
	if !react {
		fmt.Println("\nTip: use --react to apply severity rules (update entity status, create proposals)")
		return
	}

	// Reactive loop: apply severity rules. regDB may be nil; it is passed
	// through to ops.React as such.
	regDB := tryOpenRegistryDB()
	if regDB != nil {
		defer regDB.Close()
	}
	rr, err := ops.React(db, regDB, results)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error in reactive loop: %v\n", err)
		os.Exit(1)
	}
	if len(rr.EntityUpdates) > 0 {
		fmt.Println("\nEntity status changes:")
		for _, u := range rr.EntityUpdates {
			fmt.Printf(" %s: %s -> %s (%s)\n", u.EntityID, u.OldStatus, u.NewStatus, u.Reason)
		}
	}
	if len(rr.Proposals) > 0 {
		fmt.Println("\nProposals created:")
		for _, pid := range rr.Proposals {
			fmt.Printf(" %s\n", pid)
		}
	}
}
// --- Assertion Result subcommands ---
// cmdOpsAssertionResult routes "fn ops assertion result <subcommand>" to the
// add or list handler. A missing or unknown subcommand is reported on stderr
// and the process exits with status 1.
func cmdOpsAssertionResult(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops assertion result <add|list>")
		os.Exit(1)
	}
	sub, rest := args[0], args[1:]
	handlers := map[string]func([]string){
		"add":  cmdOpsAssertionResultAdd,
		"list": cmdOpsAssertionResultList,
	}
	run, known := handlers[sub]
	if !known {
		fmt.Fprintf(os.Stderr, "unknown assertion result command: %s\n", sub)
		os.Exit(1)
	}
	run(rest)
}
// cmdOpsAssertionResultAdd implements "fn ops assertion result add". It
// records a manually supplied assertion result, stamped with the current UTC
// time.
//
// --assertion-id and --status are required. --id defaults to
// "ar_<unix-nanos>" and --value must be a JSON object. Unrecognised
// arguments are ignored. Errors are written to stderr and terminate the
// process with status 1.
func cmdOpsAssertionResultAdd(args []string) {
	var id, assertionID, executionID, status, valueStr, message string

	i := 0
	// next consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	next := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--id":
			id = next()
		case "--assertion-id":
			assertionID = next()
		case "--execution-id":
			executionID = next()
		case "--status":
			status = next()
		case "--value":
			valueStr = next()
		case "--message":
			message = next()
		}
	}

	if assertionID == "" || status == "" {
		fmt.Fprintln(os.Stderr, "error: --assertion-id and --status are required")
		os.Exit(1)
	}
	if id == "" {
		id = fmt.Sprintf("ar_%d", timeNow().UnixNano())
	}

	var value map[string]any
	if valueStr != "" {
		if err := json.Unmarshal([]byte(valueStr), &value); err != nil {
			fmt.Fprintf(os.Stderr, "error: invalid value JSON: %v\n", err)
			os.Exit(1)
		}
	}

	ar := &ops.AssertionResult{
		ID:          id,
		AssertionID: assertionID,
		ExecutionID: executionID,
		Status:      ops.AssertionResultStatus(status),
		Value:       value,
		Message:     message,
		EvaluatedAt: timeNow(),
	}

	db := openOpsDB()
	defer db.Close()
	if err := db.InsertAssertionResult(ar); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Recorded result: %s (%s)\n", ar.ID, ar.Status)
}
// cmdOpsAssertionResultList implements "fn ops assertion result list". It
// optionally filters by --assertion-id and --execution-id and prints the
// matching results as a tab-aligned table, or "No assertion results." when
// there are none.
func cmdOpsAssertionResultList(args []string) {
	var assertionID, executionID string

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--assertion-id":
			assertionID = value()
		case "--execution-id":
			executionID = value()
		}
	}

	db := openOpsDB()
	defer db.Close()
	results, err := db.ListAssertionResults(assertionID, executionID)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(results) == 0 {
		fmt.Println("No assertion results.")
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(w, "ID\tASSERTION\tEXECUTION\tSTATUS\tEVALUATED_AT")
	for _, r := range results {
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", r.ID, r.AssertionID, r.ExecutionID, r.Status, r.EvaluatedAt.Format(time.RFC3339))
	}
	w.Flush()
}
// --- helpers for new commands ---
// timeNow returns the current instant expressed in the UTC location.
func timeNow() time.Time {
	now := time.Now()
	return now.In(time.UTC)
}
// parseInt64 leniently parses a decimal integer from s. Parsing is
// best-effort: the scan error is deliberately discarded, so input that does
// not start with a number yields 0.
func parseInt64(s string) int64 {
	n := int64(0)
	_, _ = fmt.Sscanf(s, "%d", &n)
	return n
}
// formatResultValue renders an assertion result value as compact JSON for
// one-line display. An empty or nil map, or a value that cannot be
// marshalled, yields "". Output longer than 40 characters is cut to its
// first 37 characters followed by "...".
//
// Truncation counts runes rather than bytes, so a multi-byte UTF-8 character
// is never split and the result is always valid UTF-8.
func formatResultValue(v map[string]any) string {
	if len(v) == 0 {
		return ""
	}
	b, err := json.Marshal(v)
	if err != nil {
		return ""
	}
	const (
		maxLen = 40
		keep   = 37 // leaves room for the "..." suffix
	)
	runes := []rune(string(b))
	if len(runes) > maxLen {
		return string(runes[:keep]) + "..."
	}
	return string(b)
}
// --- Log subcommands ---
// cmdOpsLog routes "fn ops log <subcommand>" to the add, list or show
// handler. A missing or unknown subcommand is reported on stderr and the
// process exits with status 1.
func cmdOpsLog(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops log <add|list|show>")
		os.Exit(1)
	}
	sub, rest := args[0], args[1:]
	handlers := map[string]func([]string){
		"add":  cmdOpsLogAdd,
		"list": cmdOpsLogList,
		"show": cmdOpsLogShow,
	}
	run, known := handlers[sub]
	if !known {
		fmt.Fprintf(os.Stderr, "unknown log command: %s\n", sub)
		os.Exit(1)
	}
	run(rest)
}
// cmdOpsLogAdd implements "fn ops log add". It builds an ops.Log from the
// command-line flags and inserts it.
//
// The message comes from --message / -m or, when that is absent or empty,
// from standard input (trimmed); an empty message is an error. --level
// defaults to "info" and --id to "log_<unix-nanos>". --metadata must be
// valid JSON. Unrecognised arguments are ignored. Errors are written to
// stderr and terminate the process with status 1.
func cmdOpsLogAdd(args []string) {
	var id, level, source, entityID, executionID, message, metadataStr string

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--id":
			id = value()
		case "--level":
			level = value()
		case "--source":
			source = value()
		case "--entity-id":
			entityID = value()
		case "--execution-id":
			executionID = value()
		case "--message", "-m":
			message = value()
		case "--metadata":
			metadataStr = value()
		}
	}

	if message == "" {
		// Read the message from stdin if no message flag was given.
		b, err := io.ReadAll(os.Stdin)
		if err == nil && len(b) > 0 {
			message = strings.TrimSpace(string(b))
		}
	}
	if message == "" {
		fmt.Fprintln(os.Stderr, "error: --message is required (or pipe to stdin)")
		os.Exit(1)
	}
	if level == "" {
		level = "info"
	}
	if id == "" {
		id = fmt.Sprintf("log_%d", timeNow().UnixNano())
	}

	var metadata map[string]any
	if metadataStr != "" {
		if err := json.Unmarshal([]byte(metadataStr), &metadata); err != nil {
			fmt.Fprintf(os.Stderr, "error: invalid metadata JSON: %v\n", err)
			os.Exit(1)
		}
	}

	l := &ops.Log{
		ID:          id,
		Level:       ops.LogLevel(level),
		Source:      source,
		EntityID:    entityID,
		ExecutionID: executionID,
		Message:     message,
		Metadata:    metadata,
	}

	db := openOpsDB()
	defer db.Close()
	if err := db.InsertLog(l); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Logged: [%s] %s\n", l.Level, truncate(l.Message, 60))
}
// cmdOpsLogList implements "fn ops log list". It optionally filters by
// --level, --source, --entity-id and --execution-id, passes --limit / -n
// (default 50, parsed with parseInt64) to ListLogs, and prints the entries
// as a tab-aligned table with messages shortened via truncate. Prints
// "No logs." when there are none.
func cmdOpsLogList(args []string) {
	var level, source, entityID, executionID string
	limit := 50

	i := 0
	// value consumes the argument following the flag at args[i]. It exits
	// with an error instead of panicking when the flag is the last argument.
	value := func() string {
		name := args[i]
		i++
		if i >= len(args) {
			fmt.Fprintf(os.Stderr, "error: %s requires a value\n", name)
			os.Exit(1)
		}
		return args[i]
	}
	for ; i < len(args); i++ {
		switch args[i] {
		case "--level":
			level = value()
		case "--source":
			source = value()
		case "--entity-id":
			entityID = value()
		case "--execution-id":
			executionID = value()
		case "--limit", "-n":
			limit = int(parseInt64(value()))
		}
	}

	db := openOpsDB()
	defer db.Close()
	logs, err := db.ListLogs(ops.LogLevel(level), source, entityID, executionID, limit)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if len(logs) == 0 {
		fmt.Println("No logs.")
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(w, "CREATED_AT\tLEVEL\tSOURCE\tMESSAGE")
	for _, l := range logs {
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", l.CreatedAt.Format(time.RFC3339), l.Level, l.Source, truncate(l.Message, 60))
	}
	w.Flush()
}
// cmdOpsLogShow implements "fn ops log show <id>". It prints the log entry's
// fields one per line, skipping the entity, execution and metadata lines
// when they are empty. A lookup error or a missing entry is reported on
// stderr with exit status 1.
func cmdOpsLogShow(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: fn ops log show <id>")
		os.Exit(1)
	}
	id := args[0]

	db := openOpsDB()
	defer db.Close()

	entry, err := db.GetLog(id)
	switch {
	case err != nil:
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	case entry == nil:
		fmt.Fprintf(os.Stderr, "log %q not found\n", id)
		os.Exit(1)
	}

	fmt.Printf("ID: %s\n", entry.ID)
	fmt.Printf("Level: %s\n", entry.Level)
	fmt.Printf("Source: %s\n", entry.Source)
	if entry.EntityID != "" {
		fmt.Printf("Entity: %s\n", entry.EntityID)
	}
	if entry.ExecutionID != "" {
		fmt.Printf("Execution: %s\n", entry.ExecutionID)
	}
	fmt.Printf("Message: %s\n", entry.Message)
	if len(entry.Metadata) != 0 {
		// Marshal error is ignored: the output is display-only.
		pretty, _ := json.MarshalIndent(entry.Metadata, " ", " ")
		fmt.Printf("Metadata: %s\n", string(pretty))
	}
	fmt.Printf("Created: %s\n", entry.CreatedAt.Format(time.RFC3339))
}
// tryOpenRegistryDB looks for the registry database file (dbName) in, in
// order: the directory named by the FN_REGISTRY_ROOT environment variable,
// the directory returned by root(), and the directory containing the running
// executable. It returns the first database that exists and opens without
// error, or nil when none does. Later locations are only probed when the
// earlier ones fail.
func tryOpenRegistryDB() *registry.DB {
	// probe tries to open dbName inside dir; ok reports whether the file
	// exists and registry.Open succeeded.
	probe := func(dir string) (db *registry.DB, ok bool) {
		candidate := filepath.Join(dir, dbName)
		if _, statErr := os.Stat(candidate); statErr != nil {
			return nil, false
		}
		opened, openErr := registry.Open(candidate)
		if openErr != nil {
			return nil, false
		}
		return opened, true
	}

	// 1. Explicit override through the environment.
	if envRoot := os.Getenv("FN_REGISTRY_ROOT"); envRoot != "" {
		if db, ok := probe(envRoot); ok {
			return db
		}
	}

	// 2. The project root as reported by root().
	if db, ok := probe(root()); ok {
		return db
	}

	// 3. Next to the executable.
	if exe, exeErr := os.Executable(); exeErr == nil {
		if db, ok := probe(filepath.Dir(exe)); ok {
			return db
		}
	}
	return nil
}