Files
egutierrez 134d8cf59e chore: auto-commit (11 archivos)
- app.md
- call_monitor
- db.go
- main.go
- operations.db
- operations.db-shm
- operations.db-wal
- migrations/006_function_sequences.sql
- migrations/007_calls_command_snippet.sql
- sequences.go
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 02:06:45 +02:00

623 lines
17 KiB
Go

package main
import (
	"crypto/sha256"
	"database/sql"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"text/tabwriter"
	"time"

	"fn-registry/functions/infra"
)
// SequenceConfig holds the tunable parameters for sequence detection and for
// selecting which detected sequences qualify as promotion candidates.
type SequenceConfig struct {
// WindowSecs is compared against the difference between the ts values of two
// consecutive calls; a larger gap ends the sequence.
WindowSecs int // max gap between consecutive calls in same sequence
// LookbackDays is converted to seconds (days * 86400) and subtracted from the
// current Unix time to obtain the oldest ts that is still scanned.
LookbackDays int // how far back to scan calls
MinOccurrences int // minimum occurrences to be a candidate
MinSessions int // minimum distinct sessions to be a candidate
// MinSuccessRate is the minimum SuccessCount/Occurrences ratio (0..1) a
// sequence needs to be a candidate.
MinSuccessRate float64
}
// defaultSequenceConfig returns the baseline settings: a 30-second gap between
// calls, a 30-day lookback, and promotion thresholds of 5 occurrences,
// 2 distinct sessions and a 90% success rate.
func defaultSequenceConfig() SequenceConfig {
	var cfg SequenceConfig

	// Detection window.
	cfg.WindowSecs = 30
	cfg.LookbackDays = 30

	// Promotion thresholds.
	cfg.MinOccurrences = 5
	cfg.MinSessions = 2
	cfg.MinSuccessRate = 0.9

	return cfg
}
// callRow is a single row from the calls table, holding the columns selected
// by detectSequences (id, session_id, function_id, success, ts).
type callRow struct {
ID int64
SessionID string
FunctionID string
// Success is treated as "succeeded" only when it equals 1.
Success int
// TS is compared against a Unix-seconds cutoff and against WindowSecs, so it
// is handled as a Unix timestamp in seconds.
TS int64
}
// seqKey is the canonical identifier for a sequence: the ordered function IDs
// plus the short hash derived from them by seqHash.
type seqKey struct {
// Hash is the first 12 hex characters of the SHA-256 of the function IDs
// joined with "|".
Hash string // sha256[:12] of joined function_ids
FunctionIDs []string // ordered list
// Length is len(FunctionIDs) at the time the key was created.
Length int
}
// seqStats holds the counters accumulated for one sequence across all scanned
// sessions; acc updates it once per observed occurrence.
type seqStats struct {
Key seqKey
// Occurrences is incremented for every observation of the sequence.
Occurrences int
Sessions map[string]struct{} // distinct session_ids
// SuccessCount counts the occurrences in which every call of the sequence
// succeeded.
SuccessCount int // all calls in sequence succeeded
// FirstSeen is the smallest start timestamp observed; 0 means "not set yet".
FirstSeen int64
// LastSeen is the largest end timestamp observed.
LastSeen int64
}
// SequenceResult is the final merged record for a sequence, built by
// detectSequences from the accumulated seqStats.
type SequenceResult struct {
SeqHash string
FunctionIDs []string
Length int
Occurrences int
// SessionCount is the number of distinct sessions the sequence appeared in.
SessionCount int
SuccessCount int
// SuccessRate is SuccessCount / Occurrences, or 0 when Occurrences is 0.
SuccessRate float64
FirstSeen int64
LastSeen int64
}
// SequenceCandidate wraps SequenceResult with pipeline-promotion metadata.
// The SequenceResult fields are embedded, so they are promoted onto the
// candidate itself.
type SequenceCandidate struct {
SequenceResult
// SuggestedName is the pipeline name produced by suggestPipelineName.
SuggestedName string
CoveredBy string // ID of existing pipeline that covers it, or ""
ProposalExists bool // true if a pending proposal already exists
}
// detectSequences reads the calls of the last cfg.LookbackDays days from the
// calls table, finds every run of 2 or 3 consecutive calls within a session
// whose neighbouring calls are at most cfg.WindowSecs apart, upserts the
// aggregated counters into function_sequences, and returns ALL detected
// sequences (not only promotion candidates; use filterCandidates for that).
//
// The returned slice is sorted by descending occurrence count, ties broken by
// sequence hash, so that reports and JSON output are stable between runs.
func detectSequences(db *DB, cfg SequenceConfig) ([]SequenceResult, error) {
	cutoff := time.Now().Unix() - int64(cfg.LookbackDays)*86400

	bySession, err := loadCallsBySession(db.conn, cutoff)
	if err != nil {
		return nil, err
	}

	statsMap := accumulateSequenceStats(bySession, int64(cfg.WindowSecs))

	if err := upsertSequences(db.conn, statsMap); err != nil {
		return nil, fmt.Errorf("upsert sequences: %w", err)
	}

	results := make([]SequenceResult, 0, len(statsMap))
	for _, s := range statsMap {
		rate := 0.0
		if s.Occurrences > 0 {
			rate = float64(s.SuccessCount) / float64(s.Occurrences)
		}
		results = append(results, SequenceResult{
			SeqHash:      s.Key.Hash,
			FunctionIDs:  s.Key.FunctionIDs,
			Length:       s.Key.Length,
			Occurrences:  s.Occurrences,
			SessionCount: len(s.Sessions),
			SuccessCount: s.SuccessCount,
			SuccessRate:  rate,
			FirstSeen:    s.FirstSeen,
			LastSeen:     s.LastSeen,
		})
	}

	// Map iteration order is random; sort so callers see a stable order.
	sort.Slice(results, func(i, j int) bool {
		if results[i].Occurrences != results[j].Occurrences {
			return results[i].Occurrences > results[j].Occurrences
		}
		return results[i].SeqHash < results[j].SeqHash
	})
	return results, nil
}

// loadCallsBySession returns the calls with ts >= cutoff grouped by session.
// Rows with an empty function_id or session_id are excluded by the query, and
// each session's calls keep the ts order the query produces.
func loadCallsBySession(conn *sql.DB, cutoff int64) (map[string][]callRow, error) {
	rows, err := conn.Query(`
		SELECT id, session_id, function_id, success, ts
		FROM calls
		WHERE function_id != ''
		AND session_id != ''
		AND ts >= ?
		ORDER BY session_id, ts`, cutoff)
	if err != nil {
		return nil, fmt.Errorf("query calls: %w", err)
	}
	defer rows.Close()

	bySession := map[string][]callRow{}
	for rows.Next() {
		var c callRow
		if err := rows.Scan(&c.ID, &c.SessionID, &c.FunctionID, &c.Success, &c.TS); err != nil {
			return nil, fmt.Errorf("scan call row: %w", err)
		}
		bySession[c.SessionID] = append(bySession[c.SessionID], c)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("read calls: %w", err)
	}
	return bySession, nil
}

// accumulateSequenceStats slides over each session's calls and records every
// sequence of length 2 and 3 whose consecutive calls are at most windowSecs
// apart. An occurrence counts as successful only if all of its calls have
// Success == 1.
func accumulateSequenceStats(bySession map[string][]callRow, windowSecs int64) map[string]*seqStats {
	const maxSeqLen = 3

	stats := map[string]*seqStats{}
	// Scratch buffer reused for every window; acc copies the IDs it keeps.
	ids := make([]string, 0, maxSeqLen)

	for sessID, calls := range bySession {
		for i := range calls {
			ids = append(ids[:0], calls[i].FunctionID)
			allOK := calls[i].Success == 1

			for j := i + 1; j < len(calls) && j-i < maxSeqLen; j++ {
				// A gap that is too large ends this sequence and every longer
				// one starting at i.
				if calls[j].TS-calls[j-1].TS > windowSecs {
					break
				}
				ids = append(ids, calls[j].FunctionID)
				allOK = allOK && calls[j].Success == 1
				acc(stats, ids, sessID, allOK, calls[i].TS, calls[j].TS)
			}
		}
	}
	return stats
}
// filterCandidates keeps the results that satisfy every promotion threshold in
// cfg: minimum occurrences, minimum distinct sessions and minimum success
// rate. Input order is preserved; the result is nil when nothing qualifies.
func filterCandidates(results []SequenceResult, cfg SequenceConfig) []SequenceResult {
	var kept []SequenceResult
	for i := range results {
		r := results[i]
		if r.Occurrences < cfg.MinOccurrences || r.SessionCount < cfg.MinSessions {
			continue
		}
		meetsRate := r.SuccessRate >= cfg.MinSuccessRate
		if !meetsRate {
			continue
		}
		kept = append(kept, r)
	}
	return kept
}
// pipelineFunctionSet is an existing pipeline together with the set of
// function IDs listed in its uses_functions column.
type pipelineFunctionSet struct {
	id   string
	uses map[string]struct{}
}

// covers reports whether every ID in ids is used by the pipeline. Order and
// repetition are ignored; an empty ids list is trivially covered.
func (p pipelineFunctionSet) covers(ids []string) bool {
	for _, id := range ids {
		if _, ok := p.uses[id]; !ok {
			return false
		}
	}
	return true
}

// enrichCandidates opens registry.db under registryRoot and annotates each
// candidate with a suggested pipeline name, the ID of the first existing
// pipeline whose uses_functions contains all of the sequence's function IDs
// (CoveredBy), and whether a pending new_pipeline proposal already references
// the sequence hash (ProposalExists).
//
// It returns an error when the registry cannot be opened or the pipelines
// cannot be read. The pending-proposal lookup is best-effort.
func enrichCandidates(candidates []SequenceResult, registryRoot string) ([]SequenceCandidate, error) {
	regPath := filepath.Join(registryRoot, "registry.db")
	conn, err := sql.Open("sqlite3", regPath+"?_journal_mode=WAL")
	if err != nil {
		return nil, fmt.Errorf("open registry: %w", err)
	}
	defer conn.Close()

	pipelines, err := loadPipelineFunctionSets(conn)
	if err != nil {
		return nil, err
	}
	pendingSeqs := loadPendingSequenceHashes(conn)

	out := make([]SequenceCandidate, 0, len(candidates))
	for _, c := range candidates {
		cand := SequenceCandidate{
			SequenceResult: c,
			SuggestedName:  suggestPipelineName(c.FunctionIDs),
		}
		for _, p := range pipelines {
			if p.covers(c.FunctionIDs) {
				cand.CoveredBy = p.id
				break
			}
		}
		_, cand.ProposalExists = pendingSeqs[c.SeqHash]
		out = append(out, cand)
	}
	return out, nil
}

// loadPipelineFunctionSets reads every pipeline with a non-empty
// uses_functions list and builds its function-ID set once, so the coverage
// check does not rebuild it per candidate. Rows that fail to scan or whose
// uses_functions is not a JSON string array are skipped.
func loadPipelineFunctionSets(conn *sql.DB) ([]pipelineFunctionSet, error) {
	rows, err := conn.Query("SELECT id, uses_functions FROM functions WHERE kind='pipeline' AND uses_functions != '[]'")
	if err != nil {
		return nil, fmt.Errorf("query pipelines: %w", err)
	}
	defer rows.Close()

	var pipelines []pipelineFunctionSet
	for rows.Next() {
		var id, raw string
		if err := rows.Scan(&id, &raw); err != nil {
			continue
		}
		var ids []string
		if err := json.Unmarshal([]byte(raw), &ids); err != nil {
			// An unreadable list cannot cover any sequence.
			continue
		}
		uses := make(map[string]struct{}, len(ids))
		for _, fid := range ids {
			uses[fid] = struct{}{}
		}
		pipelines = append(pipelines, pipelineFunctionSet{id: id, uses: uses})
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("read pipelines: %w", err)
	}
	return pipelines, nil
}

// loadPendingSequenceHashes collects the sequence_hash values found in the
// evidence of pending new_pipeline proposals. Failures are deliberately
// ignored and yield a (possibly partial) set.
// NOTE(review): presumably tolerated because the proposals table may be
// absent in some registries; confirm.
func loadPendingSequenceHashes(conn *sql.DB) map[string]struct{} {
	pending := map[string]struct{}{}
	rows, err := conn.Query("SELECT evidence FROM proposals WHERE status='pending' AND kind='new_pipeline'")
	if err != nil {
		return pending
	}
	defer rows.Close()

	for rows.Next() {
		var rawEvidence string
		if err := rows.Scan(&rawEvidence); err != nil {
			continue
		}
		var ev map[string]any
		if err := json.Unmarshal([]byte(rawEvidence), &ev); err != nil {
			continue
		}
		if h, ok := ev["sequence_hash"].(string); ok {
			pending[h] = struct{}{}
		}
	}
	return pending
}
// buildProposalsFromCandidates turns every candidate that is neither covered
// by an existing pipeline nor already pending into an infra.ProposalDraft of
// kind "new_pipeline". The draft ID is derived from the sequence hash, so the
// same sequence always maps to the same proposal ID. Returns nil when no
// candidate qualifies.
func buildProposalsFromCandidates(candidates []SequenceCandidate) []infra.ProposalDraft {
	const descFormat = "Sequence `%s` appeared %d times across %d sessions " +
		"(success rate %.0f%%). Promote to pipeline `%s` to make it one-shot."

	var drafts []infra.ProposalDraft
	for i := range candidates {
		cand := &candidates[i]
		if cand.ProposalExists || cand.CoveredBy != "" {
			continue
		}

		arrow := joinArrow(cand.FunctionIDs)
		draft := infra.ProposalDraft{
			ID:       deterministicSeqID(cand.SeqHash),
			Kind:     "new_pipeline",
			TargetID: "",
			Title:    fmt.Sprintf("Promote sequence %s to pipeline", arrow),
			Description: fmt.Sprintf(descFormat,
				arrow, cand.Occurrences, cand.SessionCount, cand.SuccessRate*100, cand.SuggestedName),
			RuleID: "sequence_promote",
			Evidence: map[string]any{
				"rule_id":        "sequence_promote",
				"sequence_hash":  cand.SeqHash,
				"function_ids":   cand.FunctionIDs,
				"occurrences":    cand.Occurrences,
				"sessions_count": cand.SessionCount,
				"success_rate":   fmt.Sprintf("%.2f", cand.SuccessRate),
				"last_seen":      cand.LastSeen,
				"first_seen":     cand.FirstSeen,
				// Session IDs are not available here; an empty (non-nil) list
				// is stored and the sequence hash identifies the sequence.
				"session_ids":   []string{},
				"lookback_days": 30,
			},
		}
		drafts = append(drafts, draft)
	}
	return drafts
}
// ---- Output helpers ----

// printSequenceReport writes the human-readable detection report to w: a
// summary (totals per sequence length and the promotion thresholds from cfg)
// followed by one entry per candidate with its counters, its function chain,
// the suggested pipeline name and its coverage/proposal status.
//
// All output goes through w, which is flushed before returning. The summary
// is flushed on its own first so its tab-aligned columns are emitted before
// the candidate section.
func printSequenceReport(w *tabwriter.Writer, allResults []SequenceResult, candidates []SequenceCandidate, cfg SequenceConfig) {
	defer w.Flush()

	len2, len3 := 0, 0
	for _, r := range allResults {
		switch r.Length {
		case 2:
			len2++
		case 3:
			len3++
		}
	}

	fmt.Fprintf(w, "SEQUENCE DETECTION — window %ds, lookback %dd\n", cfg.WindowSecs, cfg.LookbackDays)
	fmt.Fprintln(w, "─────────────────────────────────────────────────")
	fmt.Fprintf(w, "Sequences found:\t%d\n", len(allResults))
	fmt.Fprintf(w, " Length 2:\t%d\n", len2)
	fmt.Fprintf(w, " Length 3:\t%d\n", len3)
	fmt.Fprintf(w, "Promotion candidates:\t%d\n", len(candidates))
	fmt.Fprintf(w, " (occ >= %d, sess >= %d, success_rate >= %.0f%%)\n",
		cfg.MinOccurrences, cfg.MinSessions, cfg.MinSuccessRate*100)
	w.Flush()

	if len(candidates) == 0 {
		fmt.Fprintln(w, "\nNo promotion candidates yet.")
		return
	}

	// The lines below contain no tabs, so the tabwriter passes them through
	// without alignment.
	fmt.Fprintln(w, "\nCANDIDATES:")
	for i, c := range candidates {
		fmt.Fprintf(w, "[%d] occ=%d sess=%d success=%.0f%% last=%s\n",
			i+1, c.Occurrences, c.SessionCount, c.SuccessRate*100, humanAgo(c.LastSeen))
		for j, id := range c.FunctionIDs {
			if j == 0 {
				fmt.Fprintf(w, " %s\n", id)
			} else {
				fmt.Fprintf(w, " → %s\n", id)
			}
		}
		fmt.Fprintf(w, " Suggested pipeline name: %s\n", c.SuggestedName)
		switch {
		case c.CoveredBy != "":
			fmt.Fprintf(w, " Already covered by: %s ✓ SKIP\n", c.CoveredBy)
		case c.ProposalExists:
			fmt.Fprintf(w, " Pending proposal already exists → SKIP\n")
		default:
			fmt.Fprintf(w, " NOT COVERED → proposal candidate\n")
		}
		fmt.Fprintln(w)
	}
}
// printSequenceReportJSON writes the detection report to standard output as
// an indented JSON object containing the configuration used, the sequence
// totals (overall and per length) and the candidate list. It returns the
// encoder's error, if any.
func printSequenceReportJSON(allResults []SequenceResult, candidates []SequenceCandidate, cfg SequenceConfig) error {
	payload := make(map[string]any, 10)

	// Configuration echo.
	payload["window_secs"] = cfg.WindowSecs
	payload["lookback_days"] = cfg.LookbackDays
	payload["min_occurrences"] = cfg.MinOccurrences
	payload["min_sessions"] = cfg.MinSessions
	payload["min_success_rate"] = cfg.MinSuccessRate

	// Detection totals.
	payload["sequences_total"] = len(allResults)
	payload["sequences_length_2"] = countByLen(allResults, 2)
	payload["sequences_length_3"] = countByLen(allResults, 3)

	// Candidates.
	payload["candidates_total"] = len(candidates)
	payload["candidates"] = candidates

	encoder := json.NewEncoder(os.Stdout)
	encoder.SetIndent("", " ")
	return encoder.Encode(payload)
}
// ---- DB helpers ----

// upsertSequences writes one row per entry of statsMap into
// function_sequences inside a single transaction, keyed by seq_hash.
//
// statsMap is recomputed from the whole lookback window on every detection
// run, so for an existing row the counters (occurrences, sessions_count,
// success_count) are REPLACED with the freshly computed window values. Adding
// them to the stored values, as was done previously, counted the same calls
// again on every run. first_seen and last_seen only ever widen (MIN / MAX).
//
// Any failure rolls the transaction back and is returned to the caller.
func upsertSequences(conn *sql.DB, statsMap map[string]*seqStats) error {
	tx, err := conn.Begin()
	if err != nil {
		return err
	}
	// After a successful Commit this Rollback is a no-op that reports
	// sql.ErrTxDone, which is safe to ignore.
	defer func() { _ = tx.Rollback() }()

	stmt, err := tx.Prepare(`
		INSERT INTO function_sequences
		(seq_hash, function_ids, length, occurrences, sessions_count, success_count, first_seen, last_seen)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
		ON CONFLICT(seq_hash) DO UPDATE SET
		occurrences = excluded.occurrences,
		sessions_count = excluded.sessions_count,
		success_count = excluded.success_count,
		last_seen = MAX(last_seen, excluded.last_seen),
		first_seen = MIN(first_seen, excluded.first_seen)`)
	if err != nil {
		return err
	}
	defer stmt.Close()

	for _, s := range statsMap {
		raw, err := json.Marshal(s.Key.FunctionIDs)
		if err != nil {
			return fmt.Errorf("encode function ids for %s: %w", s.Key.Hash, err)
		}
		if _, err := stmt.Exec(
			s.Key.Hash,
			string(raw),
			s.Key.Length,
			s.Occurrences,
			len(s.Sessions),
			s.SuccessCount,
			s.FirstSeen,
			s.LastSeen,
		); err != nil {
			return err
		}
	}
	return tx.Commit()
}
// ---- Pure helpers ----

// seqHash returns the identifier of an ordered list of function IDs: the
// first 12 lowercase hex characters of the SHA-256 of the IDs joined with "|".
func seqHash(ids []string) string {
	digest := sha256.Sum256([]byte(strings.Join(ids, "|")))
	// 6 bytes format as exactly 12 hex characters.
	return fmt.Sprintf("%x", digest[:6])
}
// acc records one occurrence of the sequence ids, seen in session sessID, in
// m (keyed by sequence hash). The entry is created on first sight with its own
// copy of ids, so callers may reuse the slice afterwards. success marks an
// occurrence in which every call succeeded; firstTS and lastTS widen the
// entry's FirstSeen/LastSeen range.
func acc(m map[string]*seqStats, ids []string, sessID string, success bool, firstTS, lastTS int64) {
	hash := seqHash(ids)

	entry, exists := m[hash]
	if !exists {
		owned := make([]string, len(ids))
		copy(owned, ids)
		entry = &seqStats{
			Key:      seqKey{Hash: hash, FunctionIDs: owned, Length: len(ids)},
			Sessions: map[string]struct{}{},
		}
		m[hash] = entry
	}

	entry.Sessions[sessID] = struct{}{}
	entry.Occurrences++
	if success {
		entry.SuccessCount++
	}

	// A FirstSeen of 0 means no timestamp has been recorded yet.
	unset := entry.FirstSeen == 0
	if unset || firstTS < entry.FirstSeen {
		entry.FirstSeen = firstTS
	}
	if entry.LastSeen < lastTS {
		entry.LastSeen = lastTS
	}
}
// suggestPipelineName derives a pipeline name from the function IDs of a
// sequence. Each ID is reduced to its base name by removing a trailing domain
// token and then a trailing language token (IDs are shaped
// "<name>_<lang>_<domain>"), e.g.
//
//	"redeploy_cpp_app_windows_bash_infra" → "redeploy_cpp_app_windows"
//
// The base names are joined with "_then_" and suffixed with "_bash_pipelines".
// At least one token of every ID is always kept.
func suggestPipelineName(ids []string) string {
	isLang := func(tok string) bool {
		switch tok {
		case "go", "py", "bash", "typescript":
			return true
		}
		return false
	}
	isDomain := func(tok string) bool {
		switch tok {
		case "core", "infra", "finance", "datascience", "cybersecurity",
			"shell", "tui", "pipelines", "browser", "notebook":
			return true
		}
		return false
	}

	stripped := make([]string, len(ids))
	for i, id := range ids {
		parts := strings.Split(id, "_")
		// The domain is the LAST token, so it has to go first; only then is
		// the language token exposed at the end.
		if n := len(parts); n >= 2 && isDomain(parts[n-1]) {
			parts = parts[:n-1]
		}
		if n := len(parts); n >= 2 && isLang(parts[n-1]) {
			parts = parts[:n-1]
		}
		stripped[i] = strings.Join(parts, "_")
	}
	return strings.Join(stripped, "_then_") + "_bash_pipelines"
}
// joinArrow renders a list of function IDs as a chain separated by " → ".
// An empty list yields the empty string.
func joinArrow(ids []string) string {
	const arrow = " → "

	var b strings.Builder
	for i, id := range ids {
		if i > 0 {
			b.WriteString(arrow)
		}
		b.WriteString(id)
	}
	return b.String()
}
// humanAgo formats the age of the Unix timestamp ts (seconds) relative to the
// current time as "<n>s ago", "<n>m ago", "<n>h ago" or "<n>d ago", using the
// largest unit that fits. A timestamp in the future (e.g. clock skew) is
// reported as "0s ago" instead of a negative age.
func humanAgo(ts int64) string {
	diff := time.Now().Unix() - ts
	if diff < 0 {
		diff = 0
	}
	switch {
	case diff < 60:
		return fmt.Sprintf("%ds ago", diff)
	case diff < 3600:
		return fmt.Sprintf("%dm ago", diff/60)
	case diff < 86400:
		return fmt.Sprintf("%dh ago", diff/3600)
	default:
		return fmt.Sprintf("%dd ago", diff/86400)
	}
}
// countByLen returns how many of the results have exactly l function IDs
// according to their Length field.
func countByLen(results []SequenceResult, l int) int {
	matches := 0
	for i := range results {
		if results[i].Length != l {
			continue
		}
		matches++
	}
	return matches
}
// deterministicSeqID builds the proposal ID for a sequence by prefixing its
// hash with "auto_seq_", so the same sequence always yields the same ID.
func deterministicSeqID(seqHash string) string {
	const prefix = "auto_seq_"
	id := prefix + seqHash
	return id
}
// ---- CLI runner ----

// runSequences is the entry point of the "sequences" command. It requires
// detect to be set (exit status 2 otherwise), runs detection against the
// database at dbPath, prints a report (JSON when formatJSON is set) when
// report is set or propose is not, and with propose persists proposals for
// uncovered candidates into the registry. Any failure is reported on stderr
// and ends the process with exit status 1.
//
// The work happens in runSequencesExitCode so that its deferred database
// Close runs before os.Exit is called; os.Exit skips pending defers.
func runSequences(
	dbPath string,
	detect bool,
	report bool,
	propose bool,
	formatJSON bool,
	rootOverride string,
	cfg SequenceConfig,
) {
	if !detect {
		fmt.Fprintln(os.Stderr, "sequences: specify --detect to run detection")
		os.Exit(2)
	}
	if code := runSequencesExitCode(dbPath, report, propose, formatJSON, rootOverride, cfg); code != 0 {
		os.Exit(code)
	}
}

// runSequencesExitCode performs detection, reporting and proposal writing and
// returns the process exit status (0 on success, 1 on error).
func runSequencesExitCode(dbPath string, report, propose, formatJSON bool, rootOverride string, cfg SequenceConfig) int {
	db, err := openDB(dbPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: open call_monitor db: %v\n", err)
		return 1
	}
	defer db.Close()

	allResults, err := detectSequences(db, cfg)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: detect sequences: %v\n", err)
		return 1
	}
	rawCandidates := filterCandidates(allResults, cfg)

	root := resolveRegistryRoot(rootOverride)
	_, statErr := os.Stat(filepath.Join(root, "registry.db"))
	hasRegistry := statErr == nil

	var enriched []SequenceCandidate
	if hasRegistry {
		enriched, err = enrichCandidates(rawCandidates, root)
		if err != nil {
			// Non-fatal: report without enrichment.
			fmt.Fprintf(os.Stderr, "warning: enrich candidates: %v\n", err)
			enriched = unenrichedCandidates(rawCandidates)
		}
	} else {
		enriched = unenrichedCandidates(rawCandidates)
	}

	if report || !propose {
		if formatJSON {
			if err := printSequenceReportJSON(allResults, enriched, cfg); err != nil {
				fmt.Fprintf(os.Stderr, "error: json output: %v\n", err)
				return 1
			}
		} else {
			tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
			printSequenceReport(tw, allResults, enriched, cfg)
		}
	}

	if !propose {
		return 0
	}
	if !hasRegistry {
		fmt.Fprintf(os.Stderr, "error: registry.db not found — cannot write proposals\n")
		return 1
	}
	drafts := buildProposalsFromCandidates(enriched)
	if len(drafts) == 0 {
		fmt.Println("sequences --propose: no new proposals to write (all candidates covered or already pending).")
		return 0
	}
	inserted, total, err := infra.PersistProposalDrafts(root, drafts)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: persist proposals: %v\n", err)
		return 1
	}
	fmt.Printf("sequences --propose: %d new proposals inserted (out of %d candidates)\n", inserted, total)
	return 0
}

// unenrichedCandidates wraps raw results as candidates that carry only a
// suggested name, for use when the registry cannot be consulted. It returns
// nil for an empty input.
func unenrichedCandidates(raw []SequenceResult) []SequenceCandidate {
	var out []SequenceCandidate
	for _, r := range raw {
		out = append(out, SequenceCandidate{SequenceResult: r, SuggestedName: suggestPipelineName(r.FunctionIDs)})
	}
	return out
}