diff --git a/app.md b/app.md index ff82ef6..f3dfc95 100644 --- a/app.md +++ b/app.md @@ -116,6 +116,28 @@ CGO_ENABLED=1 go build -tags fts5 -o call_monitor . - 0085e..h: clusterizacion, proposals automaticas, gating - p95 en mean_duration_ms via percentile calc o ext. +## Automation (systemd user timer) + +`sequences --detect --propose --report` corre cada 6h via systemd user timer (00:00, 06:00, 12:00, 18:00). Detecta secuencias A→B(→C) repetidas en `calls` y genera proposals `new_pipeline` en `registry.db` (idempotente — dedupea contra proposals existentes). + +Unit files versionados en `systemd/` para sincronizar entre PCs. Activacion en un PC nuevo: + +```bash +cp systemd/call_monitor_sequences.* ~/.config/systemd/user/ +systemctl --user daemon-reload +systemctl --user enable --now call_monitor_sequences.timer +``` + +Verificar: + +```bash +systemctl --user list-timers call_monitor_sequences.timer +systemctl --user status call_monitor_sequences.service +journalctl --user -u call_monitor_sequences.service -n 30 --no-pager +``` + +Para correr manualmente fuera del schedule: `systemctl --user start call_monitor_sequences.service`. + ## Notas - BD vive **junto al binario** (`/operations.db`) por defecto, no en el cwd del agente. Hook puede pasar `--db` explicito si conviene. diff --git a/call_monitor b/call_monitor index aeaa8ce..667a324 100755 Binary files a/call_monitor and b/call_monitor differ diff --git a/db.go b/db.go index f66008b..46973ae 100644 --- a/db.go +++ b/db.go @@ -33,7 +33,7 @@ type tableCount struct { } func (d *DB) tableCounts() ([]tableCount, error) { - tables := []string{"sessions", "calls", "code_writes", "test_runs", "e2e_runs_fn", "violations", "patterns", "function_versions", "copied_code"} + tables := []string{"sessions", "calls", "code_writes", "test_runs", "e2e_runs_fn", "violations", "patterns", "function_versions", "copied_code", "function_sequences"} out := make([]tableCount, 0, len(tables)) for _, t := range tables { var n int64 diff --git a/main.go b/main.go index 5aee331..6a2f677 100644 --- a/main.go +++ b/main.go @@ -44,6 +44,24 @@ func main() { dry := fs.Bool("dry-run", false, "Generate drafts without persisting to registry.db.proposals.") fs.Parse(os.Args[2:]) runPropose(*root, *dry) + case "sequences": + detect := fs.Bool("detect", false, "Run sequence detection (required).") + report := fs.Bool("report", false, "Print human-readable report (implied when --propose is not set).") + propose := fs.Bool("propose", false, "Write new_pipeline proposals to registry.db for uncovered candidates.") + formatJSON := fs.Bool("format", false, "Output JSON instead of text (use with --report or --detect).") + root := fs.String("root", "", "Path to fn_registry root (default: walk up from cwd).") + minOcc := fs.Int("min-occurrences", 5, "Minimum occurrences to be a candidate.") + windowSecs := fs.Int("window-secs", 30, "Max gap in seconds between consecutive calls in a sequence.") + lookbackDays := fs.Int("lookback-days", 30, "How many days of calls to scan.") + fs.Parse(os.Args[2:]) + cfg := SequenceConfig{ + WindowSecs: *windowSecs, + LookbackDays: *lookbackDays, + MinOccurrences: *minOcc, + MinSessions: 2, + MinSuccessRate: 0.9, + } + runSequences(resolveDB(*dbPath), *detect, *report, *propose, *formatJSON, *root, cfg) case "-h", "--help", "help": usage() default: @@ -60,10 +78,18 @@ USO: call_monitor [flags] SUBCOMANDOS: - init Crea/abre operations.db y aplica migraciones (idempotente). - status Resumen: conteo de filas por tabla + top funciones por calls_total. - snapshot Lee registry.db.functions y snapshotea (function_id, content_hash) en - function_versions con source='index'. Idempotente: solo inserta nuevas tuplas. + init Crea/abre operations.db y aplica migraciones (idempotente). + status Resumen: conteo de filas por tabla + top funciones por calls_total. + snapshot Lee registry.db.functions y snapshotea (function_id, content_hash) en + function_versions con source='index'. Idempotente: solo inserta nuevas tuplas. + sequences Detecta secuencias A→B(→C) repetidas candidatas a pipeline one-shot. + --detect Ejecutar deteccion (obligatorio). + --report Imprimir reporte human-readable (default cuando no se usa --propose). + --propose Escribir proposals new_pipeline a registry.db. + --format Output JSON en vez de texto. + --min-occurrences N Minimo de ocurrencias para ser candidata (default 5). + --window-secs N Max gap en segundos entre calls consecutivas (default 30). + --lookback-days N Dias de historial a escanear (default 30). FLAGS GLOBALES: --db PATH Ruta a operations.db (default: ./operations.db junto al binario). @@ -73,7 +99,10 @@ EJEMPLOS: call_monitor init call_monitor status --top 20 call_monitor snapshot - call_monitor snapshot --registry /home/lucas/fn_registry/registry.db`) + call_monitor snapshot --registry /home/lucas/fn_registry/registry.db + call_monitor sequences --detect --report + call_monitor sequences --detect --propose + call_monitor sequences --detect --report --format --min-occurrences 3`) } func resolveRegistryDB(override string) string { diff --git a/migrations/006_function_sequences.sql b/migrations/006_function_sequences.sql new file mode 100644 index 0000000..e2d124b --- /dev/null +++ b/migrations/006_function_sequences.sql @@ -0,0 +1,18 @@ +-- function_sequences: secuencias A→B(→C) repetidas de calls del agente. +-- Issue 0087 parte B. Candidatas a promoverse a pipeline one-shot. +-- Aditivo. Idempotente. Aplicado via embed.FS al abrir operations.db. + +CREATE TABLE IF NOT EXISTS function_sequences ( + seq_hash TEXT PRIMARY KEY, + function_ids TEXT NOT NULL, -- JSON array ["fn_a","fn_b"] o ["fn_a","fn_b","fn_c"] + length INTEGER NOT NULL, + occurrences INTEGER NOT NULL DEFAULT 0, + sessions_count INTEGER NOT NULL DEFAULT 0, + success_count INTEGER NOT NULL DEFAULT 0, + last_seen INTEGER NOT NULL, + first_seen INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_function_sequences_occurrences ON function_sequences(occurrences); +CREATE INDEX IF NOT EXISTS idx_function_sequences_last_seen ON function_sequences(last_seen); +CREATE INDEX IF NOT EXISTS idx_function_sequences_length ON function_sequences(length); diff --git a/migrations/007_calls_command_snippet.sql b/migrations/007_calls_command_snippet.sql new file mode 100644 index 0000000..70cecf8 --- /dev/null +++ b/migrations/007_calls_command_snippet.sql @@ -0,0 +1,13 @@ +-- 007_calls_command_snippet.sql — issue 0087 +-- +-- Anade columna command_snippet a `calls` para guardar un fragmento del +-- comando/heredoc original cuando NO golpea una funcion del registry +-- (function_id == ''). Sirve al Monitor tab del registry_dashboard para +-- mostrar "que codigo lanzo Claude" en filas que no son llamadas registry. +-- +-- - Solo se rellena cuando function_id IS empty (politica del hook PostToolUse). +-- - Truncado a 200 chars maximo. Redactado: args sensibles (paths privados, +-- tokens, passwords) se reemplazan por antes de persistir. +-- - Aditivo, idempotente. SQLite < 3.35 no soporta DROP COLUMN, asi que +-- esta columna se queda para siempre — perfecto, es nuestra intencion. +ALTER TABLE calls ADD COLUMN command_snippet TEXT NOT NULL DEFAULT ''; diff --git a/operations.db b/operations.db index 98c88fc..b538bca 100644 Binary files a/operations.db and b/operations.db differ diff --git a/operations.db-shm b/operations.db-shm index a4c8204..1130b52 100644 Binary files a/operations.db-shm and b/operations.db-shm differ diff --git a/operations.db-wal b/operations.db-wal index d1def94..a12a7ff 100644 Binary files a/operations.db-wal and b/operations.db-wal differ diff --git a/sequences.go b/sequences.go new file mode 100644 index 0000000..d4b5f2a --- /dev/null +++ b/sequences.go @@ -0,0 +1,622 @@ +package main + +import ( + "crypto/sha256" + "database/sql" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "text/tabwriter" + "time" + + "fn-registry/functions/infra" +) + +// SequenceConfig holds tunable parameters for sequence detection. +type SequenceConfig struct { + WindowSecs int // max gap between consecutive calls in same sequence + LookbackDays int // how far back to scan calls + MinOccurrences int // minimum occurrences to be a candidate + MinSessions int // minimum distinct sessions to be a candidate + MinSuccessRate float64 +} + +func defaultSequenceConfig() SequenceConfig { + return SequenceConfig{ + WindowSecs: 30, + LookbackDays: 30, + MinOccurrences: 5, + MinSessions: 2, + MinSuccessRate: 0.9, + } +} + +// callRow is a single row from the calls table. +type callRow struct { + ID int64 + SessionID string + FunctionID string + Success int + TS int64 +} + +// seqKey is the canonical identifier for a sequence. +type seqKey struct { + Hash string // sha256[:12] of joined function_ids + FunctionIDs []string // ordered list + Length int +} + +// seqStats accumulated across sessions. +type seqStats struct { + Key seqKey + Occurrences int + Sessions map[string]struct{} // distinct session_ids + SuccessCount int // all calls in sequence succeeded + FirstSeen int64 + LastSeen int64 +} + +// SequenceResult is the final merged record for a sequence. +type SequenceResult struct { + SeqHash string + FunctionIDs []string + Length int + Occurrences int + SessionCount int + SuccessCount int + SuccessRate float64 + FirstSeen int64 + LastSeen int64 +} + +// SequenceCandidate wraps SequenceResult with pipeline-promotion metadata. +type SequenceCandidate struct { + SequenceResult + SuggestedName string + CoveredBy string // ID of existing pipeline that covers it, or "" + ProposalExists bool // true if a pending proposal already exists +} + +// detectSequences reads calls from operations.db, computes sequences, upserts +// into function_sequences, and returns candidates for promotion. +func detectSequences(db *DB, cfg SequenceConfig) ([]SequenceResult, error) { + cutoff := time.Now().Unix() - int64(cfg.LookbackDays)*86400 + + rows, err := db.conn.Query(` + SELECT id, session_id, function_id, success, ts + FROM calls + WHERE function_id != '' + AND session_id != '' + AND ts >= ? + ORDER BY session_id, ts`, cutoff) + if err != nil { + return nil, fmt.Errorf("query calls: %w", err) + } + defer rows.Close() + + var calls []callRow + for rows.Next() { + var c callRow + if err := rows.Scan(&c.ID, &c.SessionID, &c.FunctionID, &c.Success, &c.TS); err != nil { + return nil, err + } + calls = append(calls, c) + } + if err := rows.Err(); err != nil { + return nil, err + } + + // Group by session. + bySession := map[string][]callRow{} + for _, c := range calls { + bySession[c.SessionID] = append(bySession[c.SessionID], c) + } + + // Accumulate sequence stats. + statsMap := map[string]*seqStats{} + + for sessID, sessionCalls := range bySession { + // Slide a window of 2 and 3 over consecutive calls within the time gap. + for i := 0; i < len(sessionCalls); i++ { + // Try length 2. + if i+1 < len(sessionCalls) { + c0, c1 := sessionCalls[i], sessionCalls[i+1] + if c1.TS-c0.TS <= int64(cfg.WindowSecs) { + ids := []string{c0.FunctionID, c1.FunctionID} + acc(statsMap, ids, sessID, c0.Success == 1 && c1.Success == 1, c0.TS, c1.TS) + } + } + // Try length 3. + if i+2 < len(sessionCalls) { + c0, c1, c2 := sessionCalls[i], sessionCalls[i+1], sessionCalls[i+2] + if c1.TS-c0.TS <= int64(cfg.WindowSecs) && c2.TS-c1.TS <= int64(cfg.WindowSecs) { + ids := []string{c0.FunctionID, c1.FunctionID, c2.FunctionID} + acc(statsMap, ids, sessID, c0.Success == 1 && c1.Success == 1 && c2.Success == 1, c0.TS, c2.TS) + } + } + } + } + + // Upsert into function_sequences. + if err := upsertSequences(db.conn, statsMap); err != nil { + return nil, fmt.Errorf("upsert sequences: %w", err) + } + + // Return all detected sequences as SequenceResult. + results := make([]SequenceResult, 0, len(statsMap)) + for _, s := range statsMap { + rate := 0.0 + if s.Occurrences > 0 { + rate = float64(s.SuccessCount) / float64(s.Occurrences) + } + results = append(results, SequenceResult{ + SeqHash: s.Key.Hash, + FunctionIDs: s.Key.FunctionIDs, + Length: s.Key.Length, + Occurrences: s.Occurrences, + SessionCount: len(s.Sessions), + SuccessCount: s.SuccessCount, + SuccessRate: rate, + FirstSeen: s.FirstSeen, + LastSeen: s.LastSeen, + }) + } + return results, nil +} + +// filterCandidates applies promotion criteria to the full result set. +func filterCandidates(results []SequenceResult, cfg SequenceConfig) []SequenceResult { + var out []SequenceResult + for _, r := range results { + if r.Occurrences >= cfg.MinOccurrences && + r.SessionCount >= cfg.MinSessions && + r.SuccessRate >= cfg.MinSuccessRate { + out = append(out, r) + } + } + return out +} + +// enrichCandidates looks up registry.db to check if any existing pipeline +// covers the sequence and whether a pending proposal already exists. +func enrichCandidates(candidates []SequenceResult, registryRoot string) ([]SequenceCandidate, error) { + regPath := filepath.Join(registryRoot, "registry.db") + conn, err := sql.Open("sqlite3", regPath+"?_journal_mode=WAL") + if err != nil { + return nil, fmt.Errorf("open registry: %w", err) + } + defer conn.Close() + + // Load pipelines with their uses_functions. + type pipe struct { + ID string + UsesFunctions []string + } + prows, err := conn.Query("SELECT id, uses_functions FROM functions WHERE kind='pipeline' AND uses_functions != '[]'") + if err != nil { + return nil, fmt.Errorf("query pipelines: %w", err) + } + var pipelines []pipe + for prows.Next() { + var id, raw string + if err := prows.Scan(&id, &raw); err != nil { + continue + } + var ids []string + _ = json.Unmarshal([]byte(raw), &ids) + pipelines = append(pipelines, pipe{id, ids}) + } + prows.Close() + + // Load existing pending proposals for seq_hash. + pendingSeqs := map[string]struct{}{} + propRows, err := conn.Query("SELECT evidence FROM proposals WHERE status='pending' AND kind='new_pipeline'") + if err == nil { + for propRows.Next() { + var rawEvidence string + if err := propRows.Scan(&rawEvidence); err != nil { + continue + } + var ev map[string]any + if err := json.Unmarshal([]byte(rawEvidence), &ev); err != nil { + continue + } + if h, ok := ev["sequence_hash"].(string); ok { + pendingSeqs[h] = struct{}{} + } + } + propRows.Close() + } + + out := make([]SequenceCandidate, 0, len(candidates)) + for _, c := range candidates { + cand := SequenceCandidate{ + SequenceResult: c, + SuggestedName: suggestPipelineName(c.FunctionIDs), + } + + // Check coverage: a pipeline covers the sequence if its uses_functions + // contains all sequence function_ids as a subset. + seqSet := map[string]struct{}{} + for _, id := range c.FunctionIDs { + seqSet[id] = struct{}{} + } + for _, p := range pipelines { + pipeSet := map[string]struct{}{} + for _, id := range p.UsesFunctions { + pipeSet[id] = struct{}{} + } + covered := true + for id := range seqSet { + if _, ok := pipeSet[id]; !ok { + covered = false + break + } + } + if covered { + cand.CoveredBy = p.ID + break + } + } + + if _, ok := pendingSeqs[c.SeqHash]; ok { + cand.ProposalExists = true + } + + out = append(out, cand) + } + return out, nil +} + +// buildProposalsFromCandidates creates infra.ProposalDraft entries for uncovered, +// non-duplicate candidates. +func buildProposalsFromCandidates(candidates []SequenceCandidate) []infra.ProposalDraft { + var drafts []infra.ProposalDraft + for _, c := range candidates { + if c.CoveredBy != "" || c.ProposalExists { + continue + } + exampleSessions := []string{} // sessions not available at this point; seq_hash is enough + evidence := map[string]any{ + "rule_id": "sequence_promote", + "sequence_hash": c.SeqHash, + "function_ids": c.FunctionIDs, + "occurrences": c.Occurrences, + "sessions_count": c.SessionCount, + "success_rate": fmt.Sprintf("%.2f", c.SuccessRate), + "last_seen": c.LastSeen, + "first_seen": c.FirstSeen, + "session_ids": exampleSessions, + "lookback_days": 30, + } + title := fmt.Sprintf("Promote sequence %s to pipeline", joinArrow(c.FunctionIDs)) + desc := fmt.Sprintf( + "Sequence `%s` appeared %d times across %d sessions "+ + "(success rate %.0f%%). Promote to pipeline `%s` to make it one-shot.", + joinArrow(c.FunctionIDs), c.Occurrences, c.SessionCount, c.SuccessRate*100, c.SuggestedName, + ) + drafts = append(drafts, infra.ProposalDraft{ + ID: deterministicSeqID(c.SeqHash), + Kind: "new_pipeline", + TargetID: "", + Title: title, + Description: desc, + RuleID: "sequence_promote", + Evidence: evidence, + }) + } + return drafts +} + +// ---- Output helpers ---- + +func printSequenceReport(w *tabwriter.Writer, allResults []SequenceResult, candidates []SequenceCandidate, cfg SequenceConfig) { + len2 := 0 + len3 := 0 + for _, r := range allResults { + if r.Length == 2 { + len2++ + } else { + len3++ + } + } + + fmt.Fprintf(w, "SEQUENCE DETECTION — window %ds, lookback %dd\n", cfg.WindowSecs, cfg.LookbackDays) + fmt.Fprintln(w, "─────────────────────────────────────────────────") + fmt.Fprintf(w, "Sequences found:\t%d\n", len(allResults)) + fmt.Fprintf(w, " Length 2:\t%d\n", len2) + fmt.Fprintf(w, " Length 3:\t%d\n", len3) + fmt.Fprintf(w, "Promotion candidates:\t%d\n", len(candidates)) + fmt.Fprintf(w, " (occ >= %d, sess >= %d, success_rate >= %.0f%%)\n", + cfg.MinOccurrences, cfg.MinSessions, cfg.MinSuccessRate*100) + w.Flush() + + if len(candidates) == 0 { + fmt.Println("\nNo promotion candidates yet.") + return + } + + fmt.Println("\nCANDIDATES:") + for i, c := range candidates { + ago := humanAgo(c.LastSeen) + fmt.Printf("[%d] occ=%d sess=%d success=%.0f%% last=%s\n", + i+1, c.Occurrences, c.SessionCount, c.SuccessRate*100, ago) + for j, id := range c.FunctionIDs { + if j == 0 { + fmt.Printf(" %s\n", id) + } else { + fmt.Printf(" → %s\n", id) + } + } + fmt.Printf(" Suggested pipeline name: %s\n", c.SuggestedName) + if c.CoveredBy != "" { + fmt.Printf(" Already covered by: %s ✓ SKIP\n", c.CoveredBy) + } else if c.ProposalExists { + fmt.Printf(" Pending proposal already exists → SKIP\n") + } else { + fmt.Printf(" NOT COVERED → proposal candidate\n") + } + fmt.Println() + } +} + +func printSequenceReportJSON(allResults []SequenceResult, candidates []SequenceCandidate, cfg SequenceConfig) error { + out := map[string]any{ + "window_secs": cfg.WindowSecs, + "lookback_days": cfg.LookbackDays, + "min_occurrences": cfg.MinOccurrences, + "min_sessions": cfg.MinSessions, + "min_success_rate": cfg.MinSuccessRate, + "sequences_total": len(allResults), + "sequences_length_2": countByLen(allResults, 2), + "sequences_length_3": countByLen(allResults, 3), + "candidates_total": len(candidates), + "candidates": candidates, + } + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + return enc.Encode(out) +} + +// ---- DB helpers ---- + +func upsertSequences(conn *sql.DB, statsMap map[string]*seqStats) error { + tx, err := conn.Begin() + if err != nil { + return err + } + stmt, err := tx.Prepare(` + INSERT INTO function_sequences + (seq_hash, function_ids, length, occurrences, sessions_count, success_count, first_seen, last_seen) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(seq_hash) DO UPDATE SET + occurrences = occurrences + excluded.occurrences, + sessions_count = MAX(sessions_count, excluded.sessions_count), + success_count = success_count + excluded.success_count, + last_seen = MAX(last_seen, excluded.last_seen), + first_seen = MIN(first_seen, excluded.first_seen)`) + if err != nil { + _ = tx.Rollback() + return err + } + defer stmt.Close() + + for _, s := range statsMap { + raw, _ := json.Marshal(s.Key.FunctionIDs) + _, err := stmt.Exec( + s.Key.Hash, + string(raw), + s.Key.Length, + s.Occurrences, + len(s.Sessions), + s.SuccessCount, + s.FirstSeen, + s.LastSeen, + ) + if err != nil { + _ = tx.Rollback() + return err + } + } + return tx.Commit() +} + +// ---- Pure helpers ---- + +func seqHash(ids []string) string { + key := strings.Join(ids, "|") + h := sha256.Sum256([]byte(key)) + return fmt.Sprintf("%x", h)[:12] +} + +func acc(m map[string]*seqStats, ids []string, sessID string, success bool, firstTS, lastTS int64) { + h := seqHash(ids) + s, ok := m[h] + if !ok { + cp := make([]string, len(ids)) + copy(cp, ids) + s = &seqStats{ + Key: seqKey{Hash: h, FunctionIDs: cp, Length: len(ids)}, + Sessions: map[string]struct{}{}, + } + m[h] = s + } + s.Occurrences++ + s.Sessions[sessID] = struct{}{} + if success { + s.SuccessCount++ + } + if s.FirstSeen == 0 || firstTS < s.FirstSeen { + s.FirstSeen = firstTS + } + if lastTS > s.LastSeen { + s.LastSeen = lastTS + } +} + +// suggestPipelineName derives a pipeline name from function IDs by extracting +// meaningful tokens and joining them. +func suggestPipelineName(ids []string) string { + // Strip lang/domain suffix: "redeploy_cpp_app_windows_bash_infra" → "redeploy_cpp_app_windows" + // Strategy: remove last two underscore-separated tokens if they look like lang/domain. + langs := map[string]bool{"go": true, "py": true, "bash": true, "typescript": true} + domains := map[string]bool{ + "core": true, "infra": true, "finance": true, "datascience": true, + "cybersecurity": true, "shell": true, "tui": true, "pipelines": true, + "browser": true, "notebook": true, + } + + strip := func(id string) string { + parts := strings.Split(id, "_") + n := len(parts) + if n >= 2 && langs[parts[n-1]] { + parts = parts[:n-1] + n-- + } + if n >= 2 && domains[parts[n-1]] { + parts = parts[:n-1] + } + return strings.Join(parts, "_") + } + + tokens := make([]string, 0, len(ids)) + seen := map[string]bool{} + for _, id := range ids { + base := strip(id) + // Take the first meaningful token (verb) from each. + parts := strings.SplitN(base, "_", 2) + verb := parts[0] + if !seen[verb] { + tokens = append(tokens, verb) + seen[verb] = true + } + if len(tokens) < 3 { + // Also add the full base but trimmed. + if !seen[base] { + seen[base] = true + } + } + } + // Build: first_second_third_bash_pipelines style. + // Fallback: just join stripped names with _and_. + stripped := make([]string, len(ids)) + for i, id := range ids { + stripped[i] = strip(id) + } + return strings.Join(stripped, "_then_") + "_bash_pipelines" +} + +func joinArrow(ids []string) string { + return strings.Join(ids, " → ") +} + +func humanAgo(ts int64) string { + diff := time.Now().Unix() - ts + if diff < 60 { + return fmt.Sprintf("%ds ago", diff) + } + if diff < 3600 { + return fmt.Sprintf("%dm ago", diff/60) + } + if diff < 86400 { + return fmt.Sprintf("%dh ago", diff/3600) + } + return fmt.Sprintf("%dd ago", diff/86400) +} + +func countByLen(results []SequenceResult, l int) int { + n := 0 + for _, r := range results { + if r.Length == l { + n++ + } + } + return n +} + +func deterministicSeqID(seqHash string) string { + return "auto_seq_" + seqHash +} + +// ---- CLI runner ---- + +func runSequences( + dbPath string, + detect bool, + report bool, + propose bool, + formatJSON bool, + rootOverride string, + cfg SequenceConfig, +) { + if !detect { + fmt.Fprintln(os.Stderr, "sequences: specify --detect to run detection") + os.Exit(2) + } + + db, err := openDB(dbPath) + if err != nil { + fmt.Fprintf(os.Stderr, "error: open call_monitor db: %v\n", err) + os.Exit(1) + } + defer db.Close() + + allResults, err := detectSequences(db, cfg) + if err != nil { + fmt.Fprintf(os.Stderr, "error: detect sequences: %v\n", err) + os.Exit(1) + } + + rawCandidates := filterCandidates(allResults, cfg) + + root := resolveRegistryRoot(rootOverride) + var enriched []SequenceCandidate + + if _, err := os.Stat(filepath.Join(root, "registry.db")); err == nil { + enriched, err = enrichCandidates(rawCandidates, root) + if err != nil { + // Non-fatal: report without enrichment. + fmt.Fprintf(os.Stderr, "warning: enrich candidates: %v\n", err) + for _, r := range rawCandidates { + enriched = append(enriched, SequenceCandidate{SequenceResult: r, SuggestedName: suggestPipelineName(r.FunctionIDs)}) + } + } + } else { + for _, r := range rawCandidates { + enriched = append(enriched, SequenceCandidate{SequenceResult: r, SuggestedName: suggestPipelineName(r.FunctionIDs)}) + } + } + + if report || !propose { + if formatJSON { + if err := printSequenceReportJSON(allResults, enriched, cfg); err != nil { + fmt.Fprintf(os.Stderr, "error: json output: %v\n", err) + os.Exit(1) + } + } else { + tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + printSequenceReport(tw, allResults, enriched, cfg) + } + } + + if propose { + if _, err := os.Stat(filepath.Join(root, "registry.db")); err != nil { + fmt.Fprintf(os.Stderr, "error: registry.db not found — cannot write proposals\n") + os.Exit(1) + } + drafts := buildProposalsFromCandidates(enriched) + if len(drafts) == 0 { + fmt.Println("sequences --propose: no new proposals to write (all candidates covered or already pending).") + return + } + inserted, total, err := infra.PersistProposalDrafts(root, drafts) + if err != nil { + fmt.Fprintf(os.Stderr, "error: persist proposals: %v\n", err) + os.Exit(1) + } + fmt.Printf("sequences --propose: %d new proposals inserted (out of %d candidates)\n", inserted, total) + } +} diff --git a/systemd/call_monitor_sequences.service b/systemd/call_monitor_sequences.service new file mode 100644 index 0000000..023f0fa --- /dev/null +++ b/systemd/call_monitor_sequences.service @@ -0,0 +1,15 @@ +[Unit] +Description=Detect repeated function-call sequences and propose pipelines (issue 0087) +After=network.target + +[Service] +Type=oneshot +WorkingDirectory=/home/lucas/fn_registry/projects/fn_monitoring/apps/call_monitor +ExecStart=/home/lucas/fn_registry/projects/fn_monitoring/apps/call_monitor/call_monitor sequences --detect --propose --report +StandardOutput=journal +StandardError=journal +Environment="FN_REGISTRY_ROOT=/home/lucas/fn_registry" +Environment="HOME=/home/lucas" + +[Install] +WantedBy=default.target diff --git a/systemd/call_monitor_sequences.timer b/systemd/call_monitor_sequences.timer new file mode 100644 index 0000000..f393e8b --- /dev/null +++ b/systemd/call_monitor_sequences.timer @@ -0,0 +1,10 @@ +[Unit] +Description=Run call_monitor sequences detection every 6 hours (issue 0087) + +[Timer] +OnCalendar=*-*-* 00/6:00:00 +Persistent=true +Unit=call_monitor_sequences.service + +[Install] +WantedBy=timers.target