chore: sync from fn-registry agent
This commit is contained in:
@@ -0,0 +1,145 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"embed"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"fn-registry/functions/infra"
|
||||
)
|
||||
|
||||
//go:embed migrations/*.sql
|
||||
var migrationsFS embed.FS
|
||||
|
||||
// openOpsDB opens the operations.db and applies pending migrations.
|
||||
func openOpsDB(path string) (*sql.DB, error) {
|
||||
db, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_busy_timeout=5000")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open ops db: %w", err)
|
||||
}
|
||||
if err := infra.ApplyVersionedMigrations(db, migrationsFS, "migrations"); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("apply migrations: %w", err)
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// upsertState writes the latest check into service_state. When `overall` changed
|
||||
// vs previous row, also appends a row to service_transition for audit.
|
||||
func upsertState(db *sql.DB, c ServiceCheck) error {
|
||||
now := time.Now().Unix()
|
||||
|
||||
var prevOverall string
|
||||
row := db.QueryRow(
|
||||
"SELECT overall FROM service_state WHERE app_id = ? AND pc_id = ?",
|
||||
c.AppID, c.PCID,
|
||||
)
|
||||
if err := row.Scan(&prevOverall); err != nil && err != sql.ErrNoRows {
|
||||
return err
|
||||
}
|
||||
|
||||
changeTS := now
|
||||
if prevOverall == c.Overall {
|
||||
// Preserve last_change_ts.
|
||||
var oldChange int64
|
||||
_ = db.QueryRow(
|
||||
"SELECT last_change_ts FROM service_state WHERE app_id = ? AND pc_id = ?",
|
||||
c.AppID, c.PCID,
|
||||
).Scan(&oldChange)
|
||||
if oldChange > 0 {
|
||||
changeTS = oldChange
|
||||
}
|
||||
}
|
||||
|
||||
listening := 0
|
||||
if c.PortListening {
|
||||
listening = 1
|
||||
}
|
||||
|
||||
_, err := db.Exec(`
|
||||
INSERT INTO service_state
|
||||
(app_id, pc_id, systemd_state, port_listening, http_status, http_latency_ms,
|
||||
last_check_ts, last_change_ts, last_error, overall)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(app_id, pc_id) DO UPDATE SET
|
||||
systemd_state = excluded.systemd_state,
|
||||
port_listening = excluded.port_listening,
|
||||
http_status = excluded.http_status,
|
||||
http_latency_ms = excluded.http_latency_ms,
|
||||
last_check_ts = excluded.last_check_ts,
|
||||
last_change_ts = excluded.last_change_ts,
|
||||
last_error = excluded.last_error,
|
||||
overall = excluded.overall
|
||||
`, c.AppID, c.PCID, c.SystemdState, listening, c.HTTPStatus, c.HTTPLatencyMs,
|
||||
now, changeTS, c.Error, c.Overall)
|
||||
if err != nil {
|
||||
return fmt.Errorf("upsert state: %w", err)
|
||||
}
|
||||
|
||||
if prevOverall != "" && prevOverall != c.Overall {
|
||||
_, err = db.Exec(`
|
||||
INSERT INTO service_transition (ts, app_id, pc_id, from_state, to_state, detail)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
`, now, c.AppID, c.PCID, prevOverall, c.Overall, c.Error)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// StateRow is one row of /api/services output.
|
||||
type StateRow struct {
|
||||
AppID string `json:"app_id"`
|
||||
AppName string `json:"app_name"`
|
||||
PCID string `json:"pc_id"`
|
||||
IsSelf bool `json:"is_self"`
|
||||
Reachable bool `json:"reachable"`
|
||||
Runtime string `json:"runtime"`
|
||||
Port int `json:"port"`
|
||||
HealthEndpoint string `json:"health_endpoint"`
|
||||
SystemdUnit string `json:"systemd_unit"`
|
||||
SystemdScope string `json:"systemd_scope"`
|
||||
SystemdState string `json:"systemd_state"`
|
||||
PortListening bool `json:"port_listening"`
|
||||
HTTPStatus int `json:"http_status"`
|
||||
HTTPLatencyMs int `json:"http_latency_ms"`
|
||||
LastCheckTS int64 `json:"last_check_ts"`
|
||||
LastChangeTS int64 `json:"last_change_ts"`
|
||||
LastError string `json:"last_error"`
|
||||
Overall string `json:"overall"`
|
||||
}
|
||||
|
||||
// loadStates returns the latest snapshot from service_state joined with the
|
||||
// declared service metadata from registry.db (already merged in memory by the
|
||||
// caller through the targetsCache).
|
||||
func loadStates(opsDB *sql.DB, targets []Target, selfPC string) ([]StateRow, error) {
|
||||
rows := make([]StateRow, 0, len(targets))
|
||||
for _, t := range targets {
|
||||
r := StateRow{
|
||||
AppID: t.AppID,
|
||||
AppName: t.Name,
|
||||
PCID: t.PCID,
|
||||
IsSelf: t.PCID == selfPC,
|
||||
Runtime: t.Runtime,
|
||||
Port: t.Port,
|
||||
HealthEndpoint: t.HealthEndpoint,
|
||||
SystemdUnit: t.SystemdUnit,
|
||||
SystemdScope: t.SystemdScope,
|
||||
Overall: "unknown",
|
||||
}
|
||||
var listening int
|
||||
err := opsDB.QueryRow(`
|
||||
SELECT systemd_state, port_listening, http_status, http_latency_ms,
|
||||
last_check_ts, last_change_ts, last_error, overall
|
||||
FROM service_state WHERE app_id = ? AND pc_id = ?
|
||||
`, t.AppID, t.PCID).Scan(
|
||||
&r.SystemdState, &listening, &r.HTTPStatus, &r.HTTPLatencyMs,
|
||||
&r.LastCheckTS, &r.LastChangeTS, &r.LastError, &r.Overall,
|
||||
)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return nil, err
|
||||
}
|
||||
r.PortListening = listening != 0
|
||||
rows = append(rows, r)
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
Reference in New Issue
Block a user