chore: sync from fn-registry agent
This commit is contained in:
+100
@@ -0,0 +1,100 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"fn-registry/functions/infra"
|
||||
)
|
||||
|
||||
// snapshotFromRegistry reads registry.db.functions and inserts one row in
|
||||
// function_versions per (function_id, content_hash) tuple with source='index'.
|
||||
// Duplicate rows (same hash for same function from the same source) are
|
||||
// silently ignored — so this can be re-run after every `fn index` to capture
|
||||
// only NEW versions.
|
||||
//
|
||||
// Returns (inserted_rows, total_seen, error).
|
||||
func snapshotFromRegistry(callDB *DB, registryPath string) (int, int, error) {
|
||||
rconn, err := infra.SQLiteOpen(registryPath, "")
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("open registry: %w", err)
|
||||
}
|
||||
defer rconn.Close()
|
||||
|
||||
rows, err := rconn.Query("SELECT id, content_hash, version FROM functions WHERE content_hash != ''")
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("query functions: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
now := time.Now().UTC().Unix()
|
||||
tx, err := callDB.conn.Begin()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
stmt, err := tx.Prepare(`INSERT OR IGNORE INTO function_versions
|
||||
(function_id, content_hash, version, snapped_at, source, lines_added, lines_removed)
|
||||
VALUES (?, ?, ?, ?, 'index', 0, 0)`)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, 0, err
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
inserted, seen := 0, 0
|
||||
for rows.Next() {
|
||||
var id, hash, version string
|
||||
if err := rows.Scan(&id, &hash, &version); err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, 0, err
|
||||
}
|
||||
seen++
|
||||
res, err := stmt.Exec(id, hash, version, now)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, 0, err
|
||||
}
|
||||
if n, _ := res.RowsAffected(); n > 0 {
|
||||
inserted++
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, 0, err
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
return inserted, seen, nil
|
||||
}
|
||||
|
||||
// versionsSummary returns aggregate stats per function_id from function_versions.
|
||||
func (d *DB) versionsSummary(limit int) ([]funcVersionSummary, error) {
|
||||
rows, err := d.conn.Query(`
|
||||
SELECT function_id,
|
||||
COUNT(DISTINCT content_hash) AS versions,
|
||||
MAX(snapped_at) AS last_snapped_at
|
||||
FROM function_versions
|
||||
GROUP BY function_id
|
||||
ORDER BY versions DESC, last_snapped_at DESC
|
||||
LIMIT ?`, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []funcVersionSummary
|
||||
for rows.Next() {
|
||||
var s funcVersionSummary
|
||||
if err := rows.Scan(&s.FunctionID, &s.Versions, &s.LastSnappedAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, s)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
type funcVersionSummary struct {
|
||||
FunctionID string
|
||||
Versions int64
|
||||
LastSnappedAt int64
|
||||
}
|
||||
Reference in New Issue
Block a user