Files
call_monitor/main.go
T
egutierrez fa2d4a3177 chore: auto-commit (7 archivos)
- app.md
- call_monitor
- main.go
- operations.db
- operations.db-shm
- operations.db-wal
- daemon.go

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 00:31:34 +02:00

284 lines
8.9 KiB
Go

// call_monitor: telemetria de invocaciones del agente al fn_registry.
// Issue 0085. Persiste eventos en operations.db local. Hook PostToolUse (0085b)
// y wrapper Python (0085c) escriben aqui. registry_dashboard lee via sqlite_api.
package main
import (
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
)
const defaultDBName = "operations.db"
func main() {
if len(os.Args) < 2 {
usage()
os.Exit(2)
}
sub := os.Args[1]
fs := flag.NewFlagSet(sub, flag.ExitOnError)
dbPath := fs.String("db", "", "Path to operations.db (default: ./operations.db relative to this binary's directory).")
switch sub {
case "init":
fs.Parse(os.Args[2:])
runInit(resolveDB(*dbPath))
case "status":
topN := fs.Int("top", 10, "Top N functions to list.")
fs.Parse(os.Args[2:])
runStatus(resolveDB(*dbPath), *topN)
case "snapshot":
registry := fs.String("registry", "", "Path to registry.db (default: walk up from cwd until found).")
fs.Parse(os.Args[2:])
runSnapshot(resolveDB(*dbPath), *registry)
case "copied-code":
root := fs.String("root", "", "Path to fn_registry root (default: walk up from cwd until registry.db found).")
fs.Parse(os.Args[2:])
runCopiedCode(resolveDB(*dbPath), *root)
case "propose":
root := fs.String("root", "", "Path to fn_registry root (default: walk up from cwd).")
dry := fs.Bool("dry-run", false, "Generate drafts without persisting to registry.db.proposals.")
fs.Parse(os.Args[2:])
runPropose(*root, *dry)
case "sequences":
detect := fs.Bool("detect", false, "Run sequence detection (required).")
report := fs.Bool("report", false, "Print human-readable report (implied when --propose is not set).")
propose := fs.Bool("propose", false, "Write new_pipeline proposals to registry.db for uncovered candidates.")
formatJSON := fs.Bool("format", false, "Output JSON instead of text (use with --report or --detect).")
root := fs.String("root", "", "Path to fn_registry root (default: walk up from cwd).")
minOcc := fs.Int("min-occurrences", 5, "Minimum occurrences to be a candidate.")
windowSecs := fs.Int("window-secs", 30, "Max gap in seconds between consecutive calls in a sequence.")
lookbackDays := fs.Int("lookback-days", 30, "How many days of calls to scan.")
fs.Parse(os.Args[2:])
cfg := SequenceConfig{
WindowSecs: *windowSecs,
LookbackDays: *lookbackDays,
MinOccurrences: *minOcc,
MinSessions: 2,
MinSuccessRate: 0.9,
}
runSequences(resolveDB(*dbPath), *detect, *report, *propose, *formatJSON, *root, cfg)
case "cluster-patterns":
minOcc := fs.Int("min-occurrences", 3, "Minimum occurrences to keep a cluster.")
lookback := fs.Int("lookback-days", 30, "How many days of calls to scan.")
persist := fs.Bool("persist", false, "Upsert clusters into the patterns table.")
formatJSON := fs.Bool("json", false, "Output JSON instead of text.")
toolsCSV := fs.String("tools", "heredoc_py,heredoc_bash,sqlite_direct,bash_other", "Comma-separated tool_used values to consider.")
fs.Parse(os.Args[2:])
tools := splitCSV(*toolsCSV)
runClusterPatterns(resolveDB(*dbPath), ClusterConfig{
MinOccurrences: *minOcc,
LookbackDays: *lookback,
Tools: tools,
}, *persist, *formatJSON)
case "daemon":
registry := fs.String("registry", "", "Path to registry.db (default: walk up from cwd until found).")
interval := fs.Duration("interval", 5*time.Minute, "How often to run snapshot+sequences cycle.")
fs.Parse(os.Args[2:])
runDaemon(resolveDB(*dbPath), *registry, *interval)
case "-h", "--help", "help":
usage()
default:
fmt.Fprintf(os.Stderr, "unknown subcommand: %s\n\n", sub)
usage()
os.Exit(2)
}
}
func usage() {
fmt.Fprintln(os.Stderr, `call_monitor — telemetria de invocaciones del agente al fn_registry
USO:
call_monitor <subcomando> [flags]
SUBCOMANDOS:
init Crea/abre operations.db y aplica migraciones (idempotente).
status Resumen: conteo de filas por tabla + top funciones por calls_total.
snapshot Lee registry.db.functions y snapshotea (function_id, content_hash) en
function_versions con source='index'. Idempotente: solo inserta nuevas tuplas.
sequences Detecta secuencias A→B(→C) repetidas candidatas a pipeline one-shot.
--detect Ejecutar deteccion (obligatorio).
--report Imprimir reporte human-readable (default cuando no se usa --propose).
--propose Escribir proposals new_pipeline a registry.db.
--format Output JSON en vez de texto.
--min-occurrences N Minimo de ocurrencias para ser candidata (default 5).
--window-secs N Max gap en segundos entre calls consecutivas (default 30).
--lookback-days N Dias de historial a escanear (default 30).
FLAGS GLOBALES:
--db PATH Ruta a operations.db (default: ./operations.db junto al binario).
--registry PATH (subcomando snapshot) Ruta a registry.db. Default: walk up.
EJEMPLOS:
call_monitor init
call_monitor status --top 20
call_monitor snapshot
call_monitor snapshot --registry /home/lucas/fn_registry/registry.db
call_monitor sequences --detect --report
call_monitor sequences --detect --propose
call_monitor sequences --detect --report --format --min-occurrences 3`)
}
func resolveRegistryDB(override string) string {
if override != "" {
return override
}
exe, err := os.Executable()
if err != nil {
return "registry.db"
}
dir := filepath.Dir(exe)
for {
candidate := filepath.Join(dir, "registry.db")
if _, err := os.Stat(candidate); err == nil {
return candidate
}
parent := filepath.Dir(dir)
if parent == dir {
break
}
dir = parent
}
return "registry.db"
}
func resolveRegistryRoot(override string) string {
if override != "" {
return override
}
exe, err := os.Executable()
if err != nil {
return "."
}
dir := filepath.Dir(exe)
for {
if _, err := os.Stat(filepath.Join(dir, "registry.db")); err == nil {
return dir
}
parent := filepath.Dir(dir)
if parent == dir {
break
}
dir = parent
}
return "."
}
func runCopiedCode(callDBPath, rootOverride string) {
db, err := openDB(callDBPath)
if err != nil {
fmt.Fprintf(os.Stderr, "error: open call_monitor db: %v\n", err)
os.Exit(1)
}
defer db.Close()
root := resolveRegistryRoot(rootOverride)
if _, err := os.Stat(filepath.Join(root, "registry.db")); err != nil {
fmt.Fprintf(os.Stderr, "error: registry.db not found under %s\n", root)
os.Exit(1)
}
inserted, total, err := persistCopiedCode(db, root)
if err != nil {
fmt.Fprintf(os.Stderr, "error: copied-code: %v\n", err)
os.Exit(1)
}
fmt.Printf("copied-code: %d total match(es) detected, %d newly inserted into copied_code.\n", total, inserted)
}
func runSnapshot(callDBPath, registryOverride string) {
db, err := openDB(callDBPath)
if err != nil {
fmt.Fprintf(os.Stderr, "error: open call_monitor db: %v\n", err)
os.Exit(1)
}
defer db.Close()
registryPath := resolveRegistryDB(registryOverride)
if _, err := os.Stat(registryPath); err != nil {
fmt.Fprintf(os.Stderr, "error: registry.db not found at %s\n", registryPath)
os.Exit(1)
}
inserted, seen, err := snapshotFromRegistry(db, registryPath)
if err != nil {
fmt.Fprintf(os.Stderr, "error: snapshot: %v\n", err)
os.Exit(1)
}
fmt.Printf("snapshot: %d new versions inserted (out of %d functions seen with content_hash)\n", inserted, seen)
}
func resolveDB(override string) string {
if override != "" {
return override
}
exe, err := os.Executable()
if err == nil {
return filepath.Join(filepath.Dir(exe), defaultDBName)
}
return defaultDBName
}
func runInit(path string) {
db, err := openDB(path)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
defer db.Close()
abs, _ := filepath.Abs(path)
fmt.Printf("call_monitor.operations.db ready: %s\n", abs)
}
func runStatus(path string, topN int) {
db, err := openDB(path)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
defer db.Close()
counts, err := db.tableCounts()
if err != nil {
fmt.Fprintf(os.Stderr, "table counts: %v\n", err)
os.Exit(1)
}
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintln(tw, "=== Tables ===")
fmt.Fprintln(tw, "TABLE\tROWS")
for _, c := range counts {
fmt.Fprintf(tw, "%s\t%d\n", c.Name, c.Rows)
}
tw.Flush()
top, err := db.topFunctions(topN)
if err != nil {
fmt.Fprintf(os.Stderr, "top functions: %v\n", err)
os.Exit(1)
}
fmt.Println()
tw = tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintf(tw, "=== Top %d functions (by calls_total) ===\n", topN)
if len(top) == 0 {
fmt.Fprintln(tw, "(no calls recorded yet)")
tw.Flush()
return
}
fmt.Fprintln(tw, "FUNCTION_ID\tCALLS\tCALLS_7D\tERRORS\tERROR_RATE\tMEAN_MS\tLAST_USED_TS")
for _, s := range top {
last := ""
if s.LastUsedAt.Valid {
last = fmt.Sprintf("%d", s.LastUsedAt.Int64)
}
fmt.Fprintf(tw, "%s\t%d\t%d\t%d\t%.2f\t%.0f\t%s\n",
s.FunctionID, s.CallsTotal, s.Calls7d, s.ErrorsTotal, s.ErrorRate, s.MeanDurationMs, last)
}
tw.Flush()
}