chore: auto-commit (11 archivos)
- app.md - call_monitor - db.go - main.go - operations.db - operations.db-shm - operations.db-wal - migrations/006_function_sequences.sql - migrations/007_calls_command_snippet.sql - sequences.go - ... Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -116,6 +116,28 @@ CGO_ENABLED=1 go build -tags fts5 -o call_monitor .
|
||||
- 0085e..h: clusterizacion, proposals automaticas, gating
|
||||
- p95 en mean_duration_ms via percentile calc o ext.
|
||||
|
||||
## Automation (systemd user timer)
|
||||
|
||||
`sequences --detect --propose --report` corre cada 6h via systemd user timer (00:00, 06:00, 12:00, 18:00). Detecta secuencias A→B(→C) repetidas en `calls` y genera proposals `new_pipeline` en `registry.db` (idempotente — dedupea contra proposals existentes).
|
||||
|
||||
Unit files versionados en `systemd/` para sincronizar entre PCs. Activacion en un PC nuevo:
|
||||
|
||||
```bash
|
||||
cp systemd/call_monitor_sequences.* ~/.config/systemd/user/
|
||||
systemctl --user daemon-reload
|
||||
systemctl --user enable --now call_monitor_sequences.timer
|
||||
```
|
||||
|
||||
Verificar:
|
||||
|
||||
```bash
|
||||
systemctl --user list-timers call_monitor_sequences.timer
|
||||
systemctl --user status call_monitor_sequences.service
|
||||
journalctl --user -u call_monitor_sequences.service -n 30 --no-pager
|
||||
```
|
||||
|
||||
Para correr manualmente fuera del schedule: `systemctl --user start call_monitor_sequences.service`.
|
||||
|
||||
## Notas
|
||||
|
||||
- BD vive **junto al binario** (`<exe_dir>/operations.db`) por defecto, no en el cwd del agente. Hook puede pasar `--db` explicito si conviene.
|
||||
|
||||
Binary file not shown.
@@ -33,7 +33,7 @@ type tableCount struct {
|
||||
}
|
||||
|
||||
func (d *DB) tableCounts() ([]tableCount, error) {
|
||||
tables := []string{"sessions", "calls", "code_writes", "test_runs", "e2e_runs_fn", "violations", "patterns", "function_versions", "copied_code"}
|
||||
tables := []string{"sessions", "calls", "code_writes", "test_runs", "e2e_runs_fn", "violations", "patterns", "function_versions", "copied_code", "function_sequences"}
|
||||
out := make([]tableCount, 0, len(tables))
|
||||
for _, t := range tables {
|
||||
var n int64
|
||||
|
||||
@@ -44,6 +44,24 @@ func main() {
|
||||
dry := fs.Bool("dry-run", false, "Generate drafts without persisting to registry.db.proposals.")
|
||||
fs.Parse(os.Args[2:])
|
||||
runPropose(*root, *dry)
|
||||
case "sequences":
|
||||
detect := fs.Bool("detect", false, "Run sequence detection (required).")
|
||||
report := fs.Bool("report", false, "Print human-readable report (implied when --propose is not set).")
|
||||
propose := fs.Bool("propose", false, "Write new_pipeline proposals to registry.db for uncovered candidates.")
|
||||
formatJSON := fs.Bool("format", false, "Output JSON instead of text (use with --report or --detect).")
|
||||
root := fs.String("root", "", "Path to fn_registry root (default: walk up from cwd).")
|
||||
minOcc := fs.Int("min-occurrences", 5, "Minimum occurrences to be a candidate.")
|
||||
windowSecs := fs.Int("window-secs", 30, "Max gap in seconds between consecutive calls in a sequence.")
|
||||
lookbackDays := fs.Int("lookback-days", 30, "How many days of calls to scan.")
|
||||
fs.Parse(os.Args[2:])
|
||||
cfg := SequenceConfig{
|
||||
WindowSecs: *windowSecs,
|
||||
LookbackDays: *lookbackDays,
|
||||
MinOccurrences: *minOcc,
|
||||
MinSessions: 2,
|
||||
MinSuccessRate: 0.9,
|
||||
}
|
||||
runSequences(resolveDB(*dbPath), *detect, *report, *propose, *formatJSON, *root, cfg)
|
||||
case "-h", "--help", "help":
|
||||
usage()
|
||||
default:
|
||||
@@ -64,6 +82,14 @@ SUBCOMANDOS:
|
||||
status Resumen: conteo de filas por tabla + top funciones por calls_total.
|
||||
snapshot Lee registry.db.functions y snapshotea (function_id, content_hash) en
|
||||
function_versions con source='index'. Idempotente: solo inserta nuevas tuplas.
|
||||
sequences Detecta secuencias A→B(→C) repetidas candidatas a pipeline one-shot.
|
||||
--detect Ejecutar deteccion (obligatorio).
|
||||
--report Imprimir reporte human-readable (default cuando no se usa --propose).
|
||||
--propose Escribir proposals new_pipeline a registry.db.
|
||||
--format Output JSON en vez de texto.
|
||||
--min-occurrences N Minimo de ocurrencias para ser candidata (default 5).
|
||||
--window-secs N Max gap en segundos entre calls consecutivas (default 30).
|
||||
--lookback-days N Dias de historial a escanear (default 30).
|
||||
|
||||
FLAGS GLOBALES:
|
||||
--db PATH Ruta a operations.db (default: ./operations.db junto al binario).
|
||||
@@ -73,7 +99,10 @@ EJEMPLOS:
|
||||
call_monitor init
|
||||
call_monitor status --top 20
|
||||
call_monitor snapshot
|
||||
call_monitor snapshot --registry /home/lucas/fn_registry/registry.db`)
|
||||
call_monitor snapshot --registry /home/lucas/fn_registry/registry.db
|
||||
call_monitor sequences --detect --report
|
||||
call_monitor sequences --detect --propose
|
||||
call_monitor sequences --detect --report --format --min-occurrences 3`)
|
||||
}
|
||||
|
||||
func resolveRegistryDB(override string) string {
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
-- function_sequences: repeated A→B(→C) sequences of agent calls.
-- Issue 0087, part B. Rows are candidates for promotion to a one-shot pipeline.
-- Additive and idempotent (every statement uses IF NOT EXISTS).
-- Applied via embed.FS when operations.db is opened.

CREATE TABLE IF NOT EXISTS function_sequences (
    seq_hash       TEXT    PRIMARY KEY,
    function_ids   TEXT    NOT NULL,            -- JSON array: ["fn_a","fn_b"] or ["fn_a","fn_b","fn_c"]
    length         INTEGER NOT NULL,
    occurrences    INTEGER NOT NULL DEFAULT 0,
    sessions_count INTEGER NOT NULL DEFAULT 0,
    success_count  INTEGER NOT NULL DEFAULT 0,
    last_seen      INTEGER NOT NULL,
    first_seen     INTEGER NOT NULL
);

-- Secondary indexes on the columns used to rank and filter sequences.
CREATE INDEX IF NOT EXISTS idx_function_sequences_occurrences
    ON function_sequences (occurrences);
CREATE INDEX IF NOT EXISTS idx_function_sequences_last_seen
    ON function_sequences (last_seen);
CREATE INDEX IF NOT EXISTS idx_function_sequences_length
    ON function_sequences (length);
|
||||
@@ -0,0 +1,13 @@
|
||||
-- 007_calls_command_snippet.sql — issue 0087
--
-- Adds the command_snippet column to `calls`. It stores a fragment of the
-- original command/heredoc when the call does NOT hit a registry function
-- (function_id == ''). The Monitor tab of registry_dashboard uses it to show
-- "what code Claude launched" on rows that are not registry calls.
--
--   - Only populated when function_id is empty (PostToolUse hook policy).
--   - Truncated to at most 200 chars. Redacted: sensitive args (private paths,
--     tokens, passwords) are replaced by <REDACTED> before being persisted.
--   - Additive. SQLite < 3.35 does not support DROP COLUMN, so this column
--     stays forever — which is the intention.
--
-- NOTE(review): the original header also calls this migration idempotent, but
-- ALTER TABLE ... ADD COLUMN fails with "duplicate column name" when run a
-- second time. This presumably relies on the migration runner applying each
-- file exactly once — confirm against the runner.
ALTER TABLE calls
    ADD COLUMN command_snippet TEXT NOT NULL DEFAULT '';
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
+622
@@ -0,0 +1,622 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"fn-registry/functions/infra"
|
||||
)
|
||||
|
||||
// SequenceConfig groups the tunable thresholds used to detect call sequences
// and to decide which of them qualify as promotion candidates.
type SequenceConfig struct {
	// WindowSecs is the largest gap, in seconds, allowed between two
	// consecutive calls for them to belong to the same sequence.
	WindowSecs int
	// LookbackDays limits how many days of call history are scanned.
	LookbackDays int
	// MinOccurrences is the occurrence count a sequence needs to be a candidate.
	MinOccurrences int
	// MinSessions is the number of distinct sessions a sequence needs to be a candidate.
	MinSessions int
	// MinSuccessRate is the lowest accepted success rate for a candidate.
	MinSuccessRate float64
}
|
||||
|
||||
func defaultSequenceConfig() SequenceConfig {
|
||||
return SequenceConfig{
|
||||
WindowSecs: 30,
|
||||
LookbackDays: 30,
|
||||
MinOccurrences: 5,
|
||||
MinSessions: 2,
|
||||
MinSuccessRate: 0.9,
|
||||
}
|
||||
}
|
||||
|
||||
// callRow mirrors the columns read from one row of the calls table.
type callRow struct {
	ID                    int64
	SessionID, FunctionID string
	// Success is compared against 1 by the detector to decide whether the
	// call succeeded.
	Success int
	// TS is the call timestamp; the detector compares it against Unix seconds.
	TS int64
}
|
||||
|
||||
// seqKey identifies a sequence independently of where it was observed.
type seqKey struct {
	// Hash is the first 12 hex characters of the SHA-256 of the joined function IDs.
	Hash string
	// FunctionIDs lists the called functions in call order.
	FunctionIDs []string
	// Length is the number of function IDs in the sequence.
	Length int
}
|
||||
|
||||
// seqStats accumulated across sessions.
|
||||
type seqStats struct {
|
||||
Key seqKey
|
||||
Occurrences int
|
||||
Sessions map[string]struct{} // distinct session_ids
|
||||
SuccessCount int // all calls in sequence succeeded
|
||||
FirstSeen int64
|
||||
LastSeen int64
|
||||
}
|
||||
|
||||
// SequenceResult is the flattened, per-sequence record produced once detection
// has finished aggregating. Field names double as JSON keys in the JSON report,
// so they must not be renamed.
type SequenceResult struct {
	SeqHash     string
	FunctionIDs []string

	Length, Occurrences, SessionCount, SuccessCount int

	// SuccessRate is SuccessCount divided by Occurrences (0 when there are no occurrences).
	SuccessRate float64

	FirstSeen, LastSeen int64
}
|
||||
|
||||
// SequenceCandidate wraps SequenceResult with pipeline-promotion metadata.
|
||||
type SequenceCandidate struct {
|
||||
SequenceResult
|
||||
SuggestedName string
|
||||
CoveredBy string // ID of existing pipeline that covers it, or ""
|
||||
ProposalExists bool // true if a pending proposal already exists
|
||||
}
|
||||
|
||||
// detectSequences reads calls from operations.db, computes sequences, upserts
|
||||
// into function_sequences, and returns candidates for promotion.
|
||||
func detectSequences(db *DB, cfg SequenceConfig) ([]SequenceResult, error) {
|
||||
cutoff := time.Now().Unix() - int64(cfg.LookbackDays)*86400
|
||||
|
||||
rows, err := db.conn.Query(`
|
||||
SELECT id, session_id, function_id, success, ts
|
||||
FROM calls
|
||||
WHERE function_id != ''
|
||||
AND session_id != ''
|
||||
AND ts >= ?
|
||||
ORDER BY session_id, ts`, cutoff)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query calls: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var calls []callRow
|
||||
for rows.Next() {
|
||||
var c callRow
|
||||
if err := rows.Scan(&c.ID, &c.SessionID, &c.FunctionID, &c.Success, &c.TS); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
calls = append(calls, c)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Group by session.
|
||||
bySession := map[string][]callRow{}
|
||||
for _, c := range calls {
|
||||
bySession[c.SessionID] = append(bySession[c.SessionID], c)
|
||||
}
|
||||
|
||||
// Accumulate sequence stats.
|
||||
statsMap := map[string]*seqStats{}
|
||||
|
||||
for sessID, sessionCalls := range bySession {
|
||||
// Slide a window of 2 and 3 over consecutive calls within the time gap.
|
||||
for i := 0; i < len(sessionCalls); i++ {
|
||||
// Try length 2.
|
||||
if i+1 < len(sessionCalls) {
|
||||
c0, c1 := sessionCalls[i], sessionCalls[i+1]
|
||||
if c1.TS-c0.TS <= int64(cfg.WindowSecs) {
|
||||
ids := []string{c0.FunctionID, c1.FunctionID}
|
||||
acc(statsMap, ids, sessID, c0.Success == 1 && c1.Success == 1, c0.TS, c1.TS)
|
||||
}
|
||||
}
|
||||
// Try length 3.
|
||||
if i+2 < len(sessionCalls) {
|
||||
c0, c1, c2 := sessionCalls[i], sessionCalls[i+1], sessionCalls[i+2]
|
||||
if c1.TS-c0.TS <= int64(cfg.WindowSecs) && c2.TS-c1.TS <= int64(cfg.WindowSecs) {
|
||||
ids := []string{c0.FunctionID, c1.FunctionID, c2.FunctionID}
|
||||
acc(statsMap, ids, sessID, c0.Success == 1 && c1.Success == 1 && c2.Success == 1, c0.TS, c2.TS)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Upsert into function_sequences.
|
||||
if err := upsertSequences(db.conn, statsMap); err != nil {
|
||||
return nil, fmt.Errorf("upsert sequences: %w", err)
|
||||
}
|
||||
|
||||
// Return all detected sequences as SequenceResult.
|
||||
results := make([]SequenceResult, 0, len(statsMap))
|
||||
for _, s := range statsMap {
|
||||
rate := 0.0
|
||||
if s.Occurrences > 0 {
|
||||
rate = float64(s.SuccessCount) / float64(s.Occurrences)
|
||||
}
|
||||
results = append(results, SequenceResult{
|
||||
SeqHash: s.Key.Hash,
|
||||
FunctionIDs: s.Key.FunctionIDs,
|
||||
Length: s.Key.Length,
|
||||
Occurrences: s.Occurrences,
|
||||
SessionCount: len(s.Sessions),
|
||||
SuccessCount: s.SuccessCount,
|
||||
SuccessRate: rate,
|
||||
FirstSeen: s.FirstSeen,
|
||||
LastSeen: s.LastSeen,
|
||||
})
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// filterCandidates applies promotion criteria to the full result set.
|
||||
func filterCandidates(results []SequenceResult, cfg SequenceConfig) []SequenceResult {
|
||||
var out []SequenceResult
|
||||
for _, r := range results {
|
||||
if r.Occurrences >= cfg.MinOccurrences &&
|
||||
r.SessionCount >= cfg.MinSessions &&
|
||||
r.SuccessRate >= cfg.MinSuccessRate {
|
||||
out = append(out, r)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// enrichCandidates looks up registry.db to check if any existing pipeline
|
||||
// covers the sequence and whether a pending proposal already exists.
|
||||
func enrichCandidates(candidates []SequenceResult, registryRoot string) ([]SequenceCandidate, error) {
|
||||
regPath := filepath.Join(registryRoot, "registry.db")
|
||||
conn, err := sql.Open("sqlite3", regPath+"?_journal_mode=WAL")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open registry: %w", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Load pipelines with their uses_functions.
|
||||
type pipe struct {
|
||||
ID string
|
||||
UsesFunctions []string
|
||||
}
|
||||
prows, err := conn.Query("SELECT id, uses_functions FROM functions WHERE kind='pipeline' AND uses_functions != '[]'")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query pipelines: %w", err)
|
||||
}
|
||||
var pipelines []pipe
|
||||
for prows.Next() {
|
||||
var id, raw string
|
||||
if err := prows.Scan(&id, &raw); err != nil {
|
||||
continue
|
||||
}
|
||||
var ids []string
|
||||
_ = json.Unmarshal([]byte(raw), &ids)
|
||||
pipelines = append(pipelines, pipe{id, ids})
|
||||
}
|
||||
prows.Close()
|
||||
|
||||
// Load existing pending proposals for seq_hash.
|
||||
pendingSeqs := map[string]struct{}{}
|
||||
propRows, err := conn.Query("SELECT evidence FROM proposals WHERE status='pending' AND kind='new_pipeline'")
|
||||
if err == nil {
|
||||
for propRows.Next() {
|
||||
var rawEvidence string
|
||||
if err := propRows.Scan(&rawEvidence); err != nil {
|
||||
continue
|
||||
}
|
||||
var ev map[string]any
|
||||
if err := json.Unmarshal([]byte(rawEvidence), &ev); err != nil {
|
||||
continue
|
||||
}
|
||||
if h, ok := ev["sequence_hash"].(string); ok {
|
||||
pendingSeqs[h] = struct{}{}
|
||||
}
|
||||
}
|
||||
propRows.Close()
|
||||
}
|
||||
|
||||
out := make([]SequenceCandidate, 0, len(candidates))
|
||||
for _, c := range candidates {
|
||||
cand := SequenceCandidate{
|
||||
SequenceResult: c,
|
||||
SuggestedName: suggestPipelineName(c.FunctionIDs),
|
||||
}
|
||||
|
||||
// Check coverage: a pipeline covers the sequence if its uses_functions
|
||||
// contains all sequence function_ids as a subset.
|
||||
seqSet := map[string]struct{}{}
|
||||
for _, id := range c.FunctionIDs {
|
||||
seqSet[id] = struct{}{}
|
||||
}
|
||||
for _, p := range pipelines {
|
||||
pipeSet := map[string]struct{}{}
|
||||
for _, id := range p.UsesFunctions {
|
||||
pipeSet[id] = struct{}{}
|
||||
}
|
||||
covered := true
|
||||
for id := range seqSet {
|
||||
if _, ok := pipeSet[id]; !ok {
|
||||
covered = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if covered {
|
||||
cand.CoveredBy = p.ID
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := pendingSeqs[c.SeqHash]; ok {
|
||||
cand.ProposalExists = true
|
||||
}
|
||||
|
||||
out = append(out, cand)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// buildProposalsFromCandidates creates infra.ProposalDraft entries for uncovered,
|
||||
// non-duplicate candidates.
|
||||
func buildProposalsFromCandidates(candidates []SequenceCandidate) []infra.ProposalDraft {
|
||||
var drafts []infra.ProposalDraft
|
||||
for _, c := range candidates {
|
||||
if c.CoveredBy != "" || c.ProposalExists {
|
||||
continue
|
||||
}
|
||||
exampleSessions := []string{} // sessions not available at this point; seq_hash is enough
|
||||
evidence := map[string]any{
|
||||
"rule_id": "sequence_promote",
|
||||
"sequence_hash": c.SeqHash,
|
||||
"function_ids": c.FunctionIDs,
|
||||
"occurrences": c.Occurrences,
|
||||
"sessions_count": c.SessionCount,
|
||||
"success_rate": fmt.Sprintf("%.2f", c.SuccessRate),
|
||||
"last_seen": c.LastSeen,
|
||||
"first_seen": c.FirstSeen,
|
||||
"session_ids": exampleSessions,
|
||||
"lookback_days": 30,
|
||||
}
|
||||
title := fmt.Sprintf("Promote sequence %s to pipeline", joinArrow(c.FunctionIDs))
|
||||
desc := fmt.Sprintf(
|
||||
"Sequence `%s` appeared %d times across %d sessions "+
|
||||
"(success rate %.0f%%). Promote to pipeline `%s` to make it one-shot.",
|
||||
joinArrow(c.FunctionIDs), c.Occurrences, c.SessionCount, c.SuccessRate*100, c.SuggestedName,
|
||||
)
|
||||
drafts = append(drafts, infra.ProposalDraft{
|
||||
ID: deterministicSeqID(c.SeqHash),
|
||||
Kind: "new_pipeline",
|
||||
TargetID: "",
|
||||
Title: title,
|
||||
Description: desc,
|
||||
RuleID: "sequence_promote",
|
||||
Evidence: evidence,
|
||||
})
|
||||
}
|
||||
return drafts
|
||||
}
|
||||
|
||||
// ---- Output helpers ----
|
||||
|
||||
func printSequenceReport(w *tabwriter.Writer, allResults []SequenceResult, candidates []SequenceCandidate, cfg SequenceConfig) {
|
||||
len2 := 0
|
||||
len3 := 0
|
||||
for _, r := range allResults {
|
||||
if r.Length == 2 {
|
||||
len2++
|
||||
} else {
|
||||
len3++
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, "SEQUENCE DETECTION — window %ds, lookback %dd\n", cfg.WindowSecs, cfg.LookbackDays)
|
||||
fmt.Fprintln(w, "─────────────────────────────────────────────────")
|
||||
fmt.Fprintf(w, "Sequences found:\t%d\n", len(allResults))
|
||||
fmt.Fprintf(w, " Length 2:\t%d\n", len2)
|
||||
fmt.Fprintf(w, " Length 3:\t%d\n", len3)
|
||||
fmt.Fprintf(w, "Promotion candidates:\t%d\n", len(candidates))
|
||||
fmt.Fprintf(w, " (occ >= %d, sess >= %d, success_rate >= %.0f%%)\n",
|
||||
cfg.MinOccurrences, cfg.MinSessions, cfg.MinSuccessRate*100)
|
||||
w.Flush()
|
||||
|
||||
if len(candidates) == 0 {
|
||||
fmt.Println("\nNo promotion candidates yet.")
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("\nCANDIDATES:")
|
||||
for i, c := range candidates {
|
||||
ago := humanAgo(c.LastSeen)
|
||||
fmt.Printf("[%d] occ=%d sess=%d success=%.0f%% last=%s\n",
|
||||
i+1, c.Occurrences, c.SessionCount, c.SuccessRate*100, ago)
|
||||
for j, id := range c.FunctionIDs {
|
||||
if j == 0 {
|
||||
fmt.Printf(" %s\n", id)
|
||||
} else {
|
||||
fmt.Printf(" → %s\n", id)
|
||||
}
|
||||
}
|
||||
fmt.Printf(" Suggested pipeline name: %s\n", c.SuggestedName)
|
||||
if c.CoveredBy != "" {
|
||||
fmt.Printf(" Already covered by: %s ✓ SKIP\n", c.CoveredBy)
|
||||
} else if c.ProposalExists {
|
||||
fmt.Printf(" Pending proposal already exists → SKIP\n")
|
||||
} else {
|
||||
fmt.Printf(" NOT COVERED → proposal candidate\n")
|
||||
}
|
||||
fmt.Println()
|
||||
}
|
||||
}
|
||||
|
||||
func printSequenceReportJSON(allResults []SequenceResult, candidates []SequenceCandidate, cfg SequenceConfig) error {
|
||||
out := map[string]any{
|
||||
"window_secs": cfg.WindowSecs,
|
||||
"lookback_days": cfg.LookbackDays,
|
||||
"min_occurrences": cfg.MinOccurrences,
|
||||
"min_sessions": cfg.MinSessions,
|
||||
"min_success_rate": cfg.MinSuccessRate,
|
||||
"sequences_total": len(allResults),
|
||||
"sequences_length_2": countByLen(allResults, 2),
|
||||
"sequences_length_3": countByLen(allResults, 3),
|
||||
"candidates_total": len(candidates),
|
||||
"candidates": candidates,
|
||||
}
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(out)
|
||||
}
|
||||
|
||||
// ---- DB helpers ----
|
||||
|
||||
func upsertSequences(conn *sql.DB, statsMap map[string]*seqStats) error {
|
||||
tx, err := conn.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stmt, err := tx.Prepare(`
|
||||
INSERT INTO function_sequences
|
||||
(seq_hash, function_ids, length, occurrences, sessions_count, success_count, first_seen, last_seen)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(seq_hash) DO UPDATE SET
|
||||
occurrences = occurrences + excluded.occurrences,
|
||||
sessions_count = MAX(sessions_count, excluded.sessions_count),
|
||||
success_count = success_count + excluded.success_count,
|
||||
last_seen = MAX(last_seen, excluded.last_seen),
|
||||
first_seen = MIN(first_seen, excluded.first_seen)`)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return err
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
for _, s := range statsMap {
|
||||
raw, _ := json.Marshal(s.Key.FunctionIDs)
|
||||
_, err := stmt.Exec(
|
||||
s.Key.Hash,
|
||||
string(raw),
|
||||
s.Key.Length,
|
||||
s.Occurrences,
|
||||
len(s.Sessions),
|
||||
s.SuccessCount,
|
||||
s.FirstSeen,
|
||||
s.LastSeen,
|
||||
)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return err
|
||||
}
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// ---- Pure helpers ----
|
||||
|
||||
// seqHash returns the first 12 hexadecimal characters of the SHA-256 digest of
// the function IDs joined with "|". Order matters: the same IDs in a different
// order produce a different hash.
func seqHash(ids []string) string {
	digest := sha256.Sum256([]byte(strings.Join(ids, "|")))
	// 6 bytes render as exactly 12 hex characters.
	return fmt.Sprintf("%x", digest[:6])
}
|
||||
|
||||
func acc(m map[string]*seqStats, ids []string, sessID string, success bool, firstTS, lastTS int64) {
|
||||
h := seqHash(ids)
|
||||
s, ok := m[h]
|
||||
if !ok {
|
||||
cp := make([]string, len(ids))
|
||||
copy(cp, ids)
|
||||
s = &seqStats{
|
||||
Key: seqKey{Hash: h, FunctionIDs: cp, Length: len(ids)},
|
||||
Sessions: map[string]struct{}{},
|
||||
}
|
||||
m[h] = s
|
||||
}
|
||||
s.Occurrences++
|
||||
s.Sessions[sessID] = struct{}{}
|
||||
if success {
|
||||
s.SuccessCount++
|
||||
}
|
||||
if s.FirstSeen == 0 || firstTS < s.FirstSeen {
|
||||
s.FirstSeen = firstTS
|
||||
}
|
||||
if lastTS > s.LastSeen {
|
||||
s.LastSeen = lastTS
|
||||
}
|
||||
}
|
||||
|
||||
// suggestPipelineName derives a pipeline name from the function IDs of a
// sequence. Each ID has its trailing language/domain tokens removed, the
// remaining names are joined with "_then_", and "_bash_pipelines" is appended.
//
// Example: ["redeploy_cpp_app_windows_bash_infra", "fetch_data_py_finance"]
// becomes "redeploy_cpp_app_windows_then_fetch_data_bash_pipelines".
func suggestPipelineName(ids []string) string {
	langs := map[string]bool{"go": true, "py": true, "bash": true, "typescript": true}
	domains := map[string]bool{
		"core": true, "infra": true, "finance": true, "datascience": true,
		"cybersecurity": true, "shell": true, "tui": true, "pipelines": true,
		"browser": true, "notebook": true,
	}

	// strip removes at most one trailing domain token and one trailing
	// language token, in whichever order they appear, and always leaves at
	// least one token. IDs end in "<lang>_<domain>", so the domain has to be
	// tested first; testing the language first left "_bash" behind in
	// "..._bash_infra".
	strip := func(id string) string {
		parts := strings.Split(id, "_")
		langDone, domainDone := false, false
		for len(parts) >= 2 {
			last := parts[len(parts)-1]
			switch {
			case !domainDone && domains[last]:
				domainDone = true
			case !langDone && langs[last]:
				langDone = true
			default:
				return strings.Join(parts, "_")
			}
			parts = parts[:len(parts)-1]
		}
		return strings.Join(parts, "_")
	}

	stripped := make([]string, len(ids))
	for i, id := range ids {
		stripped[i] = strip(id)
	}
	return strings.Join(stripped, "_then_") + "_bash_pipelines"
}
|
||||
|
||||
// joinArrow renders the IDs as a single "a → b → c" string.
func joinArrow(ids []string) string {
	var b strings.Builder
	for i, id := range ids {
		if i > 0 {
			b.WriteString(" → ")
		}
		b.WriteString(id)
	}
	return b.String()
}
|
||||
|
||||
// humanAgo formats how long ago the Unix timestamp ts (in seconds) was,
// using the largest fitting unit: "<n>s ago", "<n>m ago", "<n>h ago" or
// "<n>d ago". Timestamps in the future — possible when rows come from a
// machine whose clock runs ahead — are reported as "0s ago" rather than as a
// negative duration.
func humanAgo(ts int64) string {
	diff := time.Now().Unix() - ts
	if diff < 0 {
		diff = 0
	}
	switch {
	case diff < 60:
		return fmt.Sprintf("%ds ago", diff)
	case diff < 3600:
		return fmt.Sprintf("%dm ago", diff/60)
	case diff < 86400:
		return fmt.Sprintf("%dh ago", diff/3600)
	default:
		return fmt.Sprintf("%dd ago", diff/86400)
	}
}
|
||||
|
||||
func countByLen(results []SequenceResult, l int) int {
|
||||
n := 0
|
||||
for _, r := range results {
|
||||
if r.Length == l {
|
||||
n++
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// deterministicSeqID builds the proposal ID for a sequence hash by prefixing it
// with "auto_seq_", so the same sequence always maps to the same ID.
func deterministicSeqID(seqHash string) string {
	const prefix = "auto_seq_"
	return prefix + seqHash
}
|
||||
|
||||
// ---- CLI runner ----
|
||||
|
||||
func runSequences(
|
||||
dbPath string,
|
||||
detect bool,
|
||||
report bool,
|
||||
propose bool,
|
||||
formatJSON bool,
|
||||
rootOverride string,
|
||||
cfg SequenceConfig,
|
||||
) {
|
||||
if !detect {
|
||||
fmt.Fprintln(os.Stderr, "sequences: specify --detect to run detection")
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
db, err := openDB(dbPath)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error: open call_monitor db: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
allResults, err := detectSequences(db, cfg)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error: detect sequences: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
rawCandidates := filterCandidates(allResults, cfg)
|
||||
|
||||
root := resolveRegistryRoot(rootOverride)
|
||||
var enriched []SequenceCandidate
|
||||
|
||||
if _, err := os.Stat(filepath.Join(root, "registry.db")); err == nil {
|
||||
enriched, err = enrichCandidates(rawCandidates, root)
|
||||
if err != nil {
|
||||
// Non-fatal: report without enrichment.
|
||||
fmt.Fprintf(os.Stderr, "warning: enrich candidates: %v\n", err)
|
||||
for _, r := range rawCandidates {
|
||||
enriched = append(enriched, SequenceCandidate{SequenceResult: r, SuggestedName: suggestPipelineName(r.FunctionIDs)})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for _, r := range rawCandidates {
|
||||
enriched = append(enriched, SequenceCandidate{SequenceResult: r, SuggestedName: suggestPipelineName(r.FunctionIDs)})
|
||||
}
|
||||
}
|
||||
|
||||
if report || !propose {
|
||||
if formatJSON {
|
||||
if err := printSequenceReportJSON(allResults, enriched, cfg); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error: json output: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
} else {
|
||||
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
||||
printSequenceReport(tw, allResults, enriched, cfg)
|
||||
}
|
||||
}
|
||||
|
||||
if propose {
|
||||
if _, err := os.Stat(filepath.Join(root, "registry.db")); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error: registry.db not found — cannot write proposals\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
drafts := buildProposalsFromCandidates(enriched)
|
||||
if len(drafts) == 0 {
|
||||
fmt.Println("sequences --propose: no new proposals to write (all candidates covered or already pending).")
|
||||
return
|
||||
}
|
||||
inserted, total, err := infra.PersistProposalDrafts(root, drafts)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error: persist proposals: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("sequences --propose: %d new proposals inserted (out of %d candidates)\n", inserted, total)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
# Oneshot user service that runs call_monitor sequence detection.
# Paths use the %h specifier (the user's home directory) instead of a fixed
# /home/<user>, so the same unit file works on every PC where the repository
# lives at ~/fn_registry.
[Unit]
Description=Detect repeated function-call sequences and propose pipelines (issue 0087)
After=network.target

[Service]
Type=oneshot
WorkingDirectory=%h/fn_registry/projects/fn_monitoring/apps/call_monitor
ExecStart=%h/fn_registry/projects/fn_monitoring/apps/call_monitor/call_monitor sequences --detect --propose --report
StandardOutput=journal
StandardError=journal
Environment="FN_REGISTRY_ROOT=%h/fn_registry"
Environment="HOME=%h"

[Install]
WantedBy=default.target
|
||||
@@ -0,0 +1,10 @@
|
||||
# Timer that triggers call_monitor_sequences.service four times a day.
[Unit]
Description=Run call_monitor sequences detection every 6 hours (issue 0087)

[Timer]
# Every day at 00:00, 06:00, 12:00 and 18:00.
OnCalendar=*-*-* 00,06,12,18:00:00
# Catch up on a run that was missed while the timer was inactive.
Persistent=true
Unit=call_monitor_sequences.service

[Install]
WantedBy=timers.target
|
||||
Reference in New Issue
Block a user