package main import ( "crypto/sha256" "database/sql" "encoding/json" "fmt" "os" "path/filepath" "strings" "text/tabwriter" "time" "fn-registry/functions/infra" ) // SequenceConfig holds tunable parameters for sequence detection. type SequenceConfig struct { WindowSecs int // max gap between consecutive calls in same sequence LookbackDays int // how far back to scan calls MinOccurrences int // minimum occurrences to be a candidate MinSessions int // minimum distinct sessions to be a candidate MinSuccessRate float64 } func defaultSequenceConfig() SequenceConfig { return SequenceConfig{ WindowSecs: 30, LookbackDays: 30, MinOccurrences: 5, MinSessions: 2, MinSuccessRate: 0.9, } } // callRow is a single row from the calls table. type callRow struct { ID int64 SessionID string FunctionID string Success int TS int64 } // seqKey is the canonical identifier for a sequence. type seqKey struct { Hash string // sha256[:12] of joined function_ids FunctionIDs []string // ordered list Length int } // seqStats accumulated across sessions. type seqStats struct { Key seqKey Occurrences int Sessions map[string]struct{} // distinct session_ids SuccessCount int // all calls in sequence succeeded FirstSeen int64 LastSeen int64 } // SequenceResult is the final merged record for a sequence. type SequenceResult struct { SeqHash string FunctionIDs []string Length int Occurrences int SessionCount int SuccessCount int SuccessRate float64 FirstSeen int64 LastSeen int64 } // SequenceCandidate wraps SequenceResult with pipeline-promotion metadata. type SequenceCandidate struct { SequenceResult SuggestedName string CoveredBy string // ID of existing pipeline that covers it, or "" ProposalExists bool // true if a pending proposal already exists } // detectSequences reads calls from operations.db, computes sequences, upserts // into function_sequences, and returns candidates for promotion. func detectSequences(db *DB, cfg SequenceConfig) ([]SequenceResult, error) { cutoff := time.Now().Unix() - int64(cfg.LookbackDays)*86400 rows, err := db.conn.Query(` SELECT id, session_id, function_id, success, ts FROM calls WHERE function_id != '' AND session_id != '' AND ts >= ? ORDER BY session_id, ts`, cutoff) if err != nil { return nil, fmt.Errorf("query calls: %w", err) } defer rows.Close() var calls []callRow for rows.Next() { var c callRow if err := rows.Scan(&c.ID, &c.SessionID, &c.FunctionID, &c.Success, &c.TS); err != nil { return nil, err } calls = append(calls, c) } if err := rows.Err(); err != nil { return nil, err } // Group by session. bySession := map[string][]callRow{} for _, c := range calls { bySession[c.SessionID] = append(bySession[c.SessionID], c) } // Accumulate sequence stats. statsMap := map[string]*seqStats{} for sessID, sessionCalls := range bySession { // Slide a window of 2 and 3 over consecutive calls within the time gap. for i := 0; i < len(sessionCalls); i++ { // Try length 2. if i+1 < len(sessionCalls) { c0, c1 := sessionCalls[i], sessionCalls[i+1] if c1.TS-c0.TS <= int64(cfg.WindowSecs) { ids := []string{c0.FunctionID, c1.FunctionID} acc(statsMap, ids, sessID, c0.Success == 1 && c1.Success == 1, c0.TS, c1.TS) } } // Try length 3. if i+2 < len(sessionCalls) { c0, c1, c2 := sessionCalls[i], sessionCalls[i+1], sessionCalls[i+2] if c1.TS-c0.TS <= int64(cfg.WindowSecs) && c2.TS-c1.TS <= int64(cfg.WindowSecs) { ids := []string{c0.FunctionID, c1.FunctionID, c2.FunctionID} acc(statsMap, ids, sessID, c0.Success == 1 && c1.Success == 1 && c2.Success == 1, c0.TS, c2.TS) } } } } // Upsert into function_sequences. if err := upsertSequences(db.conn, statsMap); err != nil { return nil, fmt.Errorf("upsert sequences: %w", err) } // Return all detected sequences as SequenceResult. results := make([]SequenceResult, 0, len(statsMap)) for _, s := range statsMap { rate := 0.0 if s.Occurrences > 0 { rate = float64(s.SuccessCount) / float64(s.Occurrences) } results = append(results, SequenceResult{ SeqHash: s.Key.Hash, FunctionIDs: s.Key.FunctionIDs, Length: s.Key.Length, Occurrences: s.Occurrences, SessionCount: len(s.Sessions), SuccessCount: s.SuccessCount, SuccessRate: rate, FirstSeen: s.FirstSeen, LastSeen: s.LastSeen, }) } return results, nil } // filterCandidates applies promotion criteria to the full result set. func filterCandidates(results []SequenceResult, cfg SequenceConfig) []SequenceResult { var out []SequenceResult for _, r := range results { if r.Occurrences >= cfg.MinOccurrences && r.SessionCount >= cfg.MinSessions && r.SuccessRate >= cfg.MinSuccessRate { out = append(out, r) } } return out } // enrichCandidates looks up registry.db to check if any existing pipeline // covers the sequence and whether a pending proposal already exists. func enrichCandidates(candidates []SequenceResult, registryRoot string) ([]SequenceCandidate, error) { regPath := filepath.Join(registryRoot, "registry.db") conn, err := sql.Open("sqlite3", regPath+"?_journal_mode=WAL") if err != nil { return nil, fmt.Errorf("open registry: %w", err) } defer conn.Close() // Load pipelines with their uses_functions. type pipe struct { ID string UsesFunctions []string } prows, err := conn.Query("SELECT id, uses_functions FROM functions WHERE kind='pipeline' AND uses_functions != '[]'") if err != nil { return nil, fmt.Errorf("query pipelines: %w", err) } var pipelines []pipe for prows.Next() { var id, raw string if err := prows.Scan(&id, &raw); err != nil { continue } var ids []string _ = json.Unmarshal([]byte(raw), &ids) pipelines = append(pipelines, pipe{id, ids}) } prows.Close() // Load existing pending proposals for seq_hash. pendingSeqs := map[string]struct{}{} propRows, err := conn.Query("SELECT evidence FROM proposals WHERE status='pending' AND kind='new_pipeline'") if err == nil { for propRows.Next() { var rawEvidence string if err := propRows.Scan(&rawEvidence); err != nil { continue } var ev map[string]any if err := json.Unmarshal([]byte(rawEvidence), &ev); err != nil { continue } if h, ok := ev["sequence_hash"].(string); ok { pendingSeqs[h] = struct{}{} } } propRows.Close() } out := make([]SequenceCandidate, 0, len(candidates)) for _, c := range candidates { cand := SequenceCandidate{ SequenceResult: c, SuggestedName: suggestPipelineName(c.FunctionIDs), } // Check coverage: a pipeline covers the sequence if its uses_functions // contains all sequence function_ids as a subset. seqSet := map[string]struct{}{} for _, id := range c.FunctionIDs { seqSet[id] = struct{}{} } for _, p := range pipelines { pipeSet := map[string]struct{}{} for _, id := range p.UsesFunctions { pipeSet[id] = struct{}{} } covered := true for id := range seqSet { if _, ok := pipeSet[id]; !ok { covered = false break } } if covered { cand.CoveredBy = p.ID break } } if _, ok := pendingSeqs[c.SeqHash]; ok { cand.ProposalExists = true } out = append(out, cand) } return out, nil } // buildProposalsFromCandidates creates infra.ProposalDraft entries for uncovered, // non-duplicate candidates. func buildProposalsFromCandidates(candidates []SequenceCandidate) []infra.ProposalDraft { var drafts []infra.ProposalDraft for _, c := range candidates { if c.CoveredBy != "" || c.ProposalExists { continue } exampleSessions := []string{} // sessions not available at this point; seq_hash is enough evidence := map[string]any{ "rule_id": "sequence_promote", "sequence_hash": c.SeqHash, "function_ids": c.FunctionIDs, "occurrences": c.Occurrences, "sessions_count": c.SessionCount, "success_rate": fmt.Sprintf("%.2f", c.SuccessRate), "last_seen": c.LastSeen, "first_seen": c.FirstSeen, "session_ids": exampleSessions, "lookback_days": 30, } title := fmt.Sprintf("Promote sequence %s to pipeline", joinArrow(c.FunctionIDs)) desc := fmt.Sprintf( "Sequence `%s` appeared %d times across %d sessions "+ "(success rate %.0f%%). Promote to pipeline `%s` to make it one-shot.", joinArrow(c.FunctionIDs), c.Occurrences, c.SessionCount, c.SuccessRate*100, c.SuggestedName, ) drafts = append(drafts, infra.ProposalDraft{ ID: deterministicSeqID(c.SeqHash), Kind: "new_pipeline", TargetID: "", Title: title, Description: desc, RuleID: "sequence_promote", Evidence: evidence, }) } return drafts } // ---- Output helpers ---- func printSequenceReport(w *tabwriter.Writer, allResults []SequenceResult, candidates []SequenceCandidate, cfg SequenceConfig) { len2 := 0 len3 := 0 for _, r := range allResults { if r.Length == 2 { len2++ } else { len3++ } } fmt.Fprintf(w, "SEQUENCE DETECTION — window %ds, lookback %dd\n", cfg.WindowSecs, cfg.LookbackDays) fmt.Fprintln(w, "─────────────────────────────────────────────────") fmt.Fprintf(w, "Sequences found:\t%d\n", len(allResults)) fmt.Fprintf(w, " Length 2:\t%d\n", len2) fmt.Fprintf(w, " Length 3:\t%d\n", len3) fmt.Fprintf(w, "Promotion candidates:\t%d\n", len(candidates)) fmt.Fprintf(w, " (occ >= %d, sess >= %d, success_rate >= %.0f%%)\n", cfg.MinOccurrences, cfg.MinSessions, cfg.MinSuccessRate*100) w.Flush() if len(candidates) == 0 { fmt.Println("\nNo promotion candidates yet.") return } fmt.Println("\nCANDIDATES:") for i, c := range candidates { ago := humanAgo(c.LastSeen) fmt.Printf("[%d] occ=%d sess=%d success=%.0f%% last=%s\n", i+1, c.Occurrences, c.SessionCount, c.SuccessRate*100, ago) for j, id := range c.FunctionIDs { if j == 0 { fmt.Printf(" %s\n", id) } else { fmt.Printf(" → %s\n", id) } } fmt.Printf(" Suggested pipeline name: %s\n", c.SuggestedName) if c.CoveredBy != "" { fmt.Printf(" Already covered by: %s ✓ SKIP\n", c.CoveredBy) } else if c.ProposalExists { fmt.Printf(" Pending proposal already exists → SKIP\n") } else { fmt.Printf(" NOT COVERED → proposal candidate\n") } fmt.Println() } } func printSequenceReportJSON(allResults []SequenceResult, candidates []SequenceCandidate, cfg SequenceConfig) error { out := map[string]any{ "window_secs": cfg.WindowSecs, "lookback_days": cfg.LookbackDays, "min_occurrences": cfg.MinOccurrences, "min_sessions": cfg.MinSessions, "min_success_rate": cfg.MinSuccessRate, "sequences_total": len(allResults), "sequences_length_2": countByLen(allResults, 2), "sequences_length_3": countByLen(allResults, 3), "candidates_total": len(candidates), "candidates": candidates, } enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") return enc.Encode(out) } // ---- DB helpers ---- func upsertSequences(conn *sql.DB, statsMap map[string]*seqStats) error { tx, err := conn.Begin() if err != nil { return err } stmt, err := tx.Prepare(` INSERT INTO function_sequences (seq_hash, function_ids, length, occurrences, sessions_count, success_count, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(seq_hash) DO UPDATE SET occurrences = occurrences + excluded.occurrences, sessions_count = MAX(sessions_count, excluded.sessions_count), success_count = success_count + excluded.success_count, last_seen = MAX(last_seen, excluded.last_seen), first_seen = MIN(first_seen, excluded.first_seen)`) if err != nil { _ = tx.Rollback() return err } defer stmt.Close() for _, s := range statsMap { raw, _ := json.Marshal(s.Key.FunctionIDs) _, err := stmt.Exec( s.Key.Hash, string(raw), s.Key.Length, s.Occurrences, len(s.Sessions), s.SuccessCount, s.FirstSeen, s.LastSeen, ) if err != nil { _ = tx.Rollback() return err } } return tx.Commit() } // ---- Pure helpers ---- func seqHash(ids []string) string { key := strings.Join(ids, "|") h := sha256.Sum256([]byte(key)) return fmt.Sprintf("%x", h)[:12] } func acc(m map[string]*seqStats, ids []string, sessID string, success bool, firstTS, lastTS int64) { h := seqHash(ids) s, ok := m[h] if !ok { cp := make([]string, len(ids)) copy(cp, ids) s = &seqStats{ Key: seqKey{Hash: h, FunctionIDs: cp, Length: len(ids)}, Sessions: map[string]struct{}{}, } m[h] = s } s.Occurrences++ s.Sessions[sessID] = struct{}{} if success { s.SuccessCount++ } if s.FirstSeen == 0 || firstTS < s.FirstSeen { s.FirstSeen = firstTS } if lastTS > s.LastSeen { s.LastSeen = lastTS } } // suggestPipelineName derives a pipeline name from function IDs by extracting // meaningful tokens and joining them. func suggestPipelineName(ids []string) string { // Strip lang/domain suffix: "redeploy_cpp_app_windows_bash_infra" → "redeploy_cpp_app_windows" // Strategy: remove last two underscore-separated tokens if they look like lang/domain. langs := map[string]bool{"go": true, "py": true, "bash": true, "typescript": true} domains := map[string]bool{ "core": true, "infra": true, "finance": true, "datascience": true, "cybersecurity": true, "shell": true, "tui": true, "pipelines": true, "browser": true, "notebook": true, } strip := func(id string) string { parts := strings.Split(id, "_") n := len(parts) if n >= 2 && langs[parts[n-1]] { parts = parts[:n-1] n-- } if n >= 2 && domains[parts[n-1]] { parts = parts[:n-1] } return strings.Join(parts, "_") } tokens := make([]string, 0, len(ids)) seen := map[string]bool{} for _, id := range ids { base := strip(id) // Take the first meaningful token (verb) from each. parts := strings.SplitN(base, "_", 2) verb := parts[0] if !seen[verb] { tokens = append(tokens, verb) seen[verb] = true } if len(tokens) < 3 { // Also add the full base but trimmed. if !seen[base] { seen[base] = true } } } // Build: first_second_third_bash_pipelines style. // Fallback: just join stripped names with _and_. stripped := make([]string, len(ids)) for i, id := range ids { stripped[i] = strip(id) } return strings.Join(stripped, "_then_") + "_bash_pipelines" } func joinArrow(ids []string) string { return strings.Join(ids, " → ") } func humanAgo(ts int64) string { diff := time.Now().Unix() - ts if diff < 60 { return fmt.Sprintf("%ds ago", diff) } if diff < 3600 { return fmt.Sprintf("%dm ago", diff/60) } if diff < 86400 { return fmt.Sprintf("%dh ago", diff/3600) } return fmt.Sprintf("%dd ago", diff/86400) } func countByLen(results []SequenceResult, l int) int { n := 0 for _, r := range results { if r.Length == l { n++ } } return n } func deterministicSeqID(seqHash string) string { return "auto_seq_" + seqHash } // ---- CLI runner ---- func runSequences( dbPath string, detect bool, report bool, propose bool, formatJSON bool, rootOverride string, cfg SequenceConfig, ) { if !detect { fmt.Fprintln(os.Stderr, "sequences: specify --detect to run detection") os.Exit(2) } db, err := openDB(dbPath) if err != nil { fmt.Fprintf(os.Stderr, "error: open call_monitor db: %v\n", err) os.Exit(1) } defer db.Close() allResults, err := detectSequences(db, cfg) if err != nil { fmt.Fprintf(os.Stderr, "error: detect sequences: %v\n", err) os.Exit(1) } rawCandidates := filterCandidates(allResults, cfg) root := resolveRegistryRoot(rootOverride) var enriched []SequenceCandidate if _, err := os.Stat(filepath.Join(root, "registry.db")); err == nil { enriched, err = enrichCandidates(rawCandidates, root) if err != nil { // Non-fatal: report without enrichment. fmt.Fprintf(os.Stderr, "warning: enrich candidates: %v\n", err) for _, r := range rawCandidates { enriched = append(enriched, SequenceCandidate{SequenceResult: r, SuggestedName: suggestPipelineName(r.FunctionIDs)}) } } } else { for _, r := range rawCandidates { enriched = append(enriched, SequenceCandidate{SequenceResult: r, SuggestedName: suggestPipelineName(r.FunctionIDs)}) } } if report || !propose { if formatJSON { if err := printSequenceReportJSON(allResults, enriched, cfg); err != nil { fmt.Fprintf(os.Stderr, "error: json output: %v\n", err) os.Exit(1) } } else { tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) printSequenceReport(tw, allResults, enriched, cfg) } } if propose { if _, err := os.Stat(filepath.Join(root, "registry.db")); err != nil { fmt.Fprintf(os.Stderr, "error: registry.db not found — cannot write proposals\n") os.Exit(1) } drafts := buildProposalsFromCandidates(enriched) if len(drafts) == 0 { fmt.Println("sequences --propose: no new proposals to write (all candidates covered or already pending).") return } inserted, total, err := infra.PersistProposalDrafts(root, drafts) if err != nil { fmt.Fprintf(os.Stderr, "error: persist proposals: %v\n", err) os.Exit(1) } fmt.Printf("sequences --propose: %d new proposals inserted (out of %d candidates)\n", inserted, total) } }