533 lines
15 KiB
Go
533 lines
15 KiB
Go
package main
|
|
|
|
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"fn-registry/functions/infra"
)
|
|
|
|
// Target describes one (app, pc) pairing to probe. loadTargets builds these by
// joining apps tagged as services with their service_targets rows in
// registry.db.
type Target struct {
	// AppID, Name and Runtime mirror apps.id, apps.name and
	// apps.service_runtime (empty string when the runtime is NULL).
	AppID, Name, Runtime string
	// Port is apps.service_port; 0 means no port was declared.
	Port int
	// HealthEndpoint is the HTTP path appended to host:port by the health probes.
	HealthEndpoint string
	// HealthTimeoutS is the probe timeout in seconds; loadTargets defaults it to 3.
	HealthTimeoutS int
	// SystemdUnit names the unit to query; a SystemdScope of "user" selects
	// `systemctl --user`.
	SystemdUnit, SystemdScope string
	// IsLocalOnly is true when apps.service_is_local_only is non-zero.
	IsLocalOnly bool
	// PCID is service_targets.pc_id. It is compared with the local PC id and
	// used as the SSH host for remote targets.
	PCID string
}
|
|
|
|
// ServiceCheck holds everything a single probe of one Target produced.
type ServiceCheck struct {
	// AppID and PCID identify the probed target. SystemdState is the
	// `systemctl is-active` answer, one of the synthetic values "n/a",
	// "no-route", "not-installed" or "unknown", or - for docker-compose
	// targets - the compose status string such as "running(2)".
	AppID, PCID, SystemdState string
	// PortListening is set when the port probe found a listener.
	PortListening bool
	// HTTPStatus is the health endpoint's status code and HTTPLatencyMs the
	// request latency in milliseconds; both stay 0 when no probe ran.
	HTTPStatus, HTTPLatencyMs int
	// Error carries the detail of a failed probe step. Overall is the verdict
	// computed by computeOverall ("ok", "degraded", "down", "unknown",
	// "no-route" or "not-installed").
	Error, Overall string
}
|
|
|
|
// loadTargets queries registry.db for the cross product of services x pc_targets.
|
|
func loadTargets(registryRoot string) ([]Target, error) {
|
|
dbPath := registryRoot + "/registry.db"
|
|
db, err := sql.Open("sqlite3", dbPath+"?mode=ro&_journal_mode=WAL")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open registry.db: %w", err)
|
|
}
|
|
defer db.Close()
|
|
|
|
rows, err := db.Query(`
|
|
SELECT a.id, a.name,
|
|
COALESCE(a.service_runtime,''),
|
|
COALESCE(a.service_port,0),
|
|
COALESCE(a.service_health_endpoint,''),
|
|
COALESCE(a.service_health_timeout_s,3),
|
|
COALESCE(a.service_systemd_unit,''),
|
|
COALESCE(a.service_systemd_scope,''),
|
|
COALESCE(a.service_is_local_only,0),
|
|
t.pc_id
|
|
FROM apps a
|
|
JOIN service_targets t ON t.app_id = a.id
|
|
WHERE a.tags LIKE '%service%'
|
|
ORDER BY a.name, t.pc_id
|
|
`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query targets: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []Target
|
|
for rows.Next() {
|
|
var t Target
|
|
var localOnly int
|
|
if err := rows.Scan(
|
|
&t.AppID, &t.Name, &t.Runtime, &t.Port, &t.HealthEndpoint, &t.HealthTimeoutS,
|
|
&t.SystemdUnit, &t.SystemdScope, &localOnly, &t.PCID,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
t.IsLocalOnly = localOnly != 0
|
|
if t.HealthTimeoutS <= 0 {
|
|
t.HealthTimeoutS = 3
|
|
}
|
|
out = append(out, t)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// probeTarget probes one target and returns a ServiceCheck.
|
|
func probeTarget(t Target, selfPC string) ServiceCheck {
|
|
c := ServiceCheck{AppID: t.AppID, PCID: t.PCID}
|
|
|
|
// Local check only when the target lives on selfPC. Remote PCs always go
|
|
// over SSH; if SSH has no route the probe returns "no-route" — that is
|
|
// exactly what we want to surface "PC unreachable" upstream.
|
|
if t.PCID == selfPC {
|
|
probeLocal(&c, t)
|
|
} else {
|
|
probeRemote(&c, t)
|
|
}
|
|
c.Overall = computeOverall(t, c)
|
|
return c
|
|
}
|
|
|
|
func probeLocal(c *ServiceCheck, t Target) {
|
|
// systemd state.
|
|
if t.SystemdUnit != "" {
|
|
c.SystemdState = systemctlIsActiveLocal(t.SystemdUnit, t.SystemdScope)
|
|
} else {
|
|
c.SystemdState = "n/a"
|
|
}
|
|
|
|
// Port listening (TCP localhost).
|
|
if t.Port > 0 {
|
|
c.PortListening = portListeningLocal(t.Port, time.Duration(t.HealthTimeoutS)*time.Second)
|
|
}
|
|
|
|
// HTTP health.
|
|
if t.Port > 0 && t.HealthEndpoint != "" {
|
|
status, latency, err := httpProbe("127.0.0.1", t.Port, t.HealthEndpoint, time.Duration(t.HealthTimeoutS)*time.Second)
|
|
c.HTTPStatus = status
|
|
c.HTTPLatencyMs = latency
|
|
if err != nil {
|
|
c.Error = err.Error()
|
|
}
|
|
}
|
|
}
|
|
|
|
func probeRemote(c *ServiceCheck, t Target) {
|
|
conn := infra.SSHConn{Host: t.PCID}
|
|
|
|
// docker-compose runtime: state via `docker compose ls`, no systemctl.
|
|
if t.Runtime == "docker-compose" {
|
|
probeDockerComposeRemote(c, t, conn)
|
|
return
|
|
}
|
|
|
|
// systemd state via ssh.
|
|
if t.SystemdUnit != "" {
|
|
cmd := fmt.Sprintf("systemctl is-active %s", shellEscape(t.SystemdUnit))
|
|
if t.SystemdScope == "user" {
|
|
cmd = "systemctl --user is-active " + shellEscape(t.SystemdUnit)
|
|
}
|
|
stdout, stderr, code, err := infra.SSHExec(conn, cmd)
|
|
// SSH transport failure (ssh exit 255 = resolve/connect refused/auth)
|
|
// or local exec error → mark unreachable. systemctl exit codes 3 ("inactive"),
|
|
// 4 ("unit not found") are still semantic answers from a reachable host.
|
|
if err != nil || code == 255 || code < 0 {
|
|
c.SystemdState = "no-route"
|
|
detail := strings.TrimSpace(stderr)
|
|
if err != nil {
|
|
detail = err.Error()
|
|
}
|
|
c.Error = detail
|
|
return
|
|
}
|
|
// systemctl returns "inactive" exit 3 when unit is stopped OR missing.
|
|
// Distinguish via stderr (unit not found).
|
|
stderrLower := strings.ToLower(stderr)
|
|
if strings.Contains(stderrLower, "could not be found") ||
|
|
strings.Contains(stderrLower, "not-found") ||
|
|
strings.Contains(stderrLower, "not found") {
|
|
c.SystemdState = "not-installed"
|
|
return
|
|
}
|
|
c.SystemdState = strings.TrimSpace(stdout)
|
|
if c.SystemdState == "" {
|
|
c.SystemdState = "unknown"
|
|
}
|
|
} else {
|
|
c.SystemdState = "n/a"
|
|
}
|
|
|
|
if t.Port > 0 {
|
|
// Port listening via ss on remote.
|
|
cmd := fmt.Sprintf("ss -tln '( sport = :%d )' | tail -n +2 | head -n1", t.Port)
|
|
stdout, _, _, err := infra.SSHExec(conn, cmd)
|
|
if err == nil && strings.TrimSpace(stdout) != "" {
|
|
c.PortListening = true
|
|
}
|
|
|
|
// HTTP probe via curl on remote.
|
|
if t.HealthEndpoint != "" {
|
|
url := fmt.Sprintf("http://127.0.0.1:%d%s", t.Port, t.HealthEndpoint)
|
|
fmtCmd := `curl -s -o /dev/null -w "%{http_code} %{time_total}" -m ` +
|
|
strconv.Itoa(t.HealthTimeoutS) + " " + shellEscape(url)
|
|
stdout, stderr, _, err := infra.SSHExec(conn, fmtCmd)
|
|
if err != nil {
|
|
c.Error = strings.TrimSpace(stderr)
|
|
return
|
|
}
|
|
parts := strings.Fields(strings.TrimSpace(stdout))
|
|
if len(parts) >= 1 {
|
|
if code, perr := strconv.Atoi(parts[0]); perr == nil {
|
|
c.HTTPStatus = code
|
|
}
|
|
}
|
|
if len(parts) >= 2 {
|
|
if secs, perr := strconv.ParseFloat(parts[1], 64); perr == nil {
|
|
c.HTTPLatencyMs = int(secs * 1000)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func computeOverall(t Target, c ServiceCheck) string {
|
|
if strings.HasPrefix(c.SystemdState, "no-route") {
|
|
return "no-route"
|
|
}
|
|
if c.SystemdState == "not-installed" {
|
|
return "not-installed"
|
|
}
|
|
|
|
// When the probe returned a systemd is-active style answer, route through
|
|
// the systemd branch regardless of the declared runtime. This covers the
|
|
// case where an app advertises `runtime: docker-compose` (because it lives
|
|
// on the primary VPS) but also runs as a bare systemd unit on a developer
|
|
// PC. The PC-local probe always asks systemctl; trust its output.
|
|
switch c.SystemdState {
|
|
case "active", "inactive", "failed", "activating", "deactivating", "reloading", "unknown", "n/a":
|
|
// fallthrough to systemd-style aggregation below
|
|
default:
|
|
if t.Runtime == "docker-compose" {
|
|
if c.SystemdState == "" {
|
|
return "unknown"
|
|
}
|
|
ran, exited, _ := parseComposeCounts(c.SystemdState)
|
|
if ran == 0 {
|
|
return "down"
|
|
}
|
|
if exited > 0 {
|
|
return "degraded"
|
|
}
|
|
if t.Port > 0 && t.HealthEndpoint != "" &&
|
|
(c.HTTPStatus > 0 && (c.HTTPStatus < 200 || c.HTTPStatus >= 400)) {
|
|
return "degraded"
|
|
}
|
|
return "ok"
|
|
}
|
|
}
|
|
|
|
// Systemd-runtime apps.
|
|
if strings.HasPrefix(t.Runtime, "systemd-") || t.Runtime == "stdio" || c.SystemdState == "active" || c.SystemdState == "inactive" || c.SystemdState == "failed" {
|
|
if c.SystemdState != "active" {
|
|
return "down"
|
|
}
|
|
if t.Port > 0 {
|
|
if !c.PortListening {
|
|
return "degraded"
|
|
}
|
|
if t.HealthEndpoint != "" && (c.HTTPStatus < 200 || c.HTTPStatus >= 400) {
|
|
return "degraded"
|
|
}
|
|
}
|
|
return "ok"
|
|
}
|
|
|
|
// manual / unknown runtime: lean on port + http only.
|
|
if t.Port > 0 {
|
|
if !c.PortListening {
|
|
return "down"
|
|
}
|
|
if t.HealthEndpoint != "" && (c.HTTPStatus < 200 || c.HTTPStatus >= 400) {
|
|
return "degraded"
|
|
}
|
|
return "ok"
|
|
}
|
|
|
|
return "unknown"
|
|
}
|
|
|
|
// parseComposeCounts reads a `docker compose ls` status string such as
// "running(10)" or "running(8) | exited(2)" and returns the number of running
// containers, the number of exited/stopped/dead containers, and the sum over
// every well-formed "kind(N)" segment.
//
// Segments that are not of the form kind(N) with an integer N are skipped;
// kinds other than those named above only contribute to total.
func parseComposeCounts(s string) (running, exited, total int) {
	for _, segment := range strings.Split(s, "|") {
		segment = strings.TrimSpace(segment)

		lparen := strings.IndexByte(segment, '(')
		rparen := strings.IndexByte(segment, ')')
		// Also rejects a missing ")" (rparen == -1) and a ")" before the "(".
		if lparen < 0 || rparen < lparen {
			continue
		}

		count, err := strconv.Atoi(strings.TrimSpace(segment[lparen+1 : rparen]))
		if err != nil {
			continue
		}

		label := strings.TrimSpace(segment[:lparen])
		if label == "running" {
			running += count
		} else if label == "exited" || label == "stopped" || label == "dead" {
			exited += count
		}
		total += count
	}
	return running, exited, total
}
|
|
|
|
// systemctlIsActiveLocal executes `systemctl [--user] is-active <unit>` with a
// three second timeout and normalizes the answer.
//
// It returns "not-installed" when stderr indicates the unit does not exist,
// "unknown" when systemctl printed nothing on stdout, and the trimmed stdout
// otherwise. "not-installed" is kept apart from "inactive" because the remedy
// differs: the unit has to be deployed, not started.
func systemctlIsActiveLocal(unit, scope string) string {
	argv := make([]string, 0, 3)
	if scope == "user" {
		argv = append(argv, "--user")
	}
	argv = append(argv, "is-active", unit)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	var outBuf, errBuf strings.Builder
	proc := exec.CommandContext(ctx, "systemctl", argv...)
	proc.Stdout = &outBuf
	proc.Stderr = &errBuf
	// The run error is deliberately discarded; the captured output decides
	// the result.
	_ = proc.Run()

	diagnostics := strings.ToLower(errBuf.String())
	for _, marker := range []string{"could not be found", "not-found", "not found"} {
		if strings.Contains(diagnostics, marker) {
			return "not-installed"
		}
	}

	if reported := strings.TrimSpace(outBuf.String()); reported != "" {
		return reported
	}
	return "unknown"
}
|
|
|
|
// portListeningLocal reports whether a TCP connection to 127.0.0.1:port can be
// established within timeout. The connection is closed again immediately.
func portListeningLocal(port int, timeout time.Duration) bool {
	addr := fmt.Sprintf("127.0.0.1:%d", port)
	if probe, dialErr := net.DialTimeout("tcp", addr, timeout); dialErr == nil {
		probe.Close()
		return true
	}
	return false
}
|
|
|
|
// httpProbe issues one GET to http://host:port/path and reports the outcome.
//
// It returns the response status code, the elapsed time in milliseconds, and
// the request error. On error the status is 0 while the latency still reflects
// how long the attempt took. The whole request is bounded by timeout.
//
// host may be an IPv6 literal (it is bracketed as needed), and a path without
// a leading slash gets one so it cannot fuse with the port number.
func httpProbe(host string, port int, path string, timeout time.Duration) (int, int, error) {
	if !strings.HasPrefix(path, "/") {
		path = "/" + path
	}
	target := "http://" + net.JoinHostPort(host, strconv.Itoa(port)) + path

	client := &http.Client{Timeout: timeout}
	start := time.Now()
	resp, err := client.Get(target)
	latencyMs := int(time.Since(start) / time.Millisecond)
	if err != nil {
		return 0, latencyMs, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, latencyMs, nil
}
|
|
|
|
// probeDockerComposeRemote queries `docker compose ls --format json` over SSH
|
|
// and maps the running/expected ratio into a synthetic systemd_state-like value
|
|
// + port/HTTP probes already declared.
|
|
//
|
|
// Project name defaults to t.Name (the app's frontmatter `name`). Override
|
|
// later via a `docker_compose_project` field if needed.
|
|
func probeDockerComposeRemote(c *ServiceCheck, t Target, conn infra.SSHConn) {
|
|
project := t.Name
|
|
|
|
// `docker compose ls --format json` returns array of {Name, Status, ConfigFiles}.
|
|
// Status string examples: "running(10)", "running(8) | exited(2)", "exited(5)".
|
|
cmd := "docker compose ls --all --format json"
|
|
stdout, stderr, code, err := infra.SSHExec(conn, cmd)
|
|
if err != nil || code != 0 {
|
|
c.SystemdState = "no-route"
|
|
detail := strings.TrimSpace(stderr)
|
|
if err != nil {
|
|
detail = err.Error()
|
|
}
|
|
c.Error = detail
|
|
return
|
|
}
|
|
|
|
// Tolerant parse: find the JSON object whose "Name" matches the project.
|
|
// Try variants: the app's name, hyphenized, and slashes-stripped.
|
|
candidates := []string{project, strings.ReplaceAll(project, "_", "-")}
|
|
var status string
|
|
var found bool
|
|
var matched string
|
|
for _, cand := range candidates {
|
|
if cand == "" {
|
|
continue
|
|
}
|
|
if s, ok := extractComposeStatus(stdout, cand); ok {
|
|
status, found, matched = s, true, cand
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
c.SystemdState = "not-installed"
|
|
c.Error = "no docker compose project named " + project +
|
|
" (tried " + strings.Join(candidates, ", ") + ")"
|
|
return
|
|
}
|
|
_ = matched // reserved for future telemetry
|
|
|
|
c.SystemdState = status
|
|
if t.Port > 0 {
|
|
probeRemotePort(c, t, conn)
|
|
}
|
|
}
|
|
|
|
// extractComposeStatus returns the Status of the entry named project in the
// output of `docker compose ls --format json`, and whether such an entry with
// a string Status exists.
//
// The blob is decoded as a JSON array first, which makes the lookup
// independent of whitespace and field order. When the blob cannot be decoded
// (for example because other text surrounds the JSON), it falls back to a
// plain substring scan.
func extractComposeStatus(jsonBlob, project string) (string, bool) {
	if status, found, parsed := composeStatusFromJSON(jsonBlob, project); parsed {
		return status, found
	}
	return composeStatusByScan(jsonBlob, project)
}

// composeStatusFromJSON decodes the outermost [...] span of blob as an array
// of compose projects. parsed is false when no such span exists or it is not
// valid JSON of that shape; found tells whether project was present with a
// Status field.
func composeStatusFromJSON(blob, project string) (status string, found, parsed bool) {
	start := strings.Index(blob, "[")
	end := strings.LastIndex(blob, "]")
	if start == -1 || end < start {
		return "", false, false
	}
	var entries []struct {
		Name string
		// Pointer so that an absent Status can be told apart from an empty one.
		Status *string
	}
	if err := json.Unmarshal([]byte(blob[start:end+1]), &entries); err != nil {
		return "", false, false
	}
	for _, e := range entries {
		if e.Name == project && e.Status != nil {
			return *e.Status, true, true
		}
	}
	return "", false, true
}

// composeStatusByScan is the substring-based fallback: it locates
// `"Name":"<project>"` (optionally with one space after the colon), bounds the
// search to the braces around it, and returns the quoted value that follows
// `"Status":` inside that span.
func composeStatusByScan(blob, project string) (string, bool) {
	idx := strings.Index(blob, `"Name":"`+project+`"`)
	if idx == -1 {
		idx = strings.Index(blob, `"Name": "`+project+`"`)
		if idx == -1 {
			return "", false
		}
	}
	// Bound the search window to the surrounding object.
	objStart := strings.LastIndex(blob[:idx], "{")
	objEnd := strings.Index(blob[idx:], "}")
	if objStart == -1 || objEnd == -1 {
		return "", false
	}
	obj := blob[objStart : idx+objEnd+1]

	const stKey = `"Status":`
	si := strings.Index(obj, stKey)
	if si == -1 {
		return "", false
	}
	// Skip whitespace, then require the opening quote of the value.
	rest := strings.TrimLeft(obj[si+len(stKey):], " \t")
	if !strings.HasPrefix(rest, `"`) {
		return "", false
	}
	rest = rest[1:]
	end := strings.Index(rest, `"`)
	if end == -1 {
		return "", false
	}
	return rest[:end], true
}
|
|
|
|
// probeRemotePort runs the port + http probes when a docker compose project
|
|
// declares a port that should be reachable on the remote host's loopback.
|
|
func probeRemotePort(c *ServiceCheck, t Target, conn infra.SSHConn) {
|
|
cmd := fmt.Sprintf("ss -tln '( sport = :%d )' | tail -n +2 | head -n1", t.Port)
|
|
stdout, _, _, err := infra.SSHExec(conn, cmd)
|
|
if err == nil && strings.TrimSpace(stdout) != "" {
|
|
c.PortListening = true
|
|
}
|
|
if t.HealthEndpoint == "" {
|
|
return
|
|
}
|
|
url := fmt.Sprintf("http://127.0.0.1:%d%s", t.Port, t.HealthEndpoint)
|
|
fmtCmd := `curl -s -o /dev/null -w "%{http_code} %{time_total}" -m ` +
|
|
strconv.Itoa(t.HealthTimeoutS) + " " + shellEscape(url)
|
|
stdout, stderr, _, err := infra.SSHExec(conn, fmtCmd)
|
|
if err != nil {
|
|
c.Error = strings.TrimSpace(stderr)
|
|
return
|
|
}
|
|
parts := strings.Fields(strings.TrimSpace(stdout))
|
|
if len(parts) >= 1 {
|
|
if code, perr := strconv.Atoi(parts[0]); perr == nil {
|
|
c.HTTPStatus = code
|
|
}
|
|
}
|
|
if len(parts) >= 2 {
|
|
if secs, perr := strconv.ParseFloat(parts[1], 64); perr == nil {
|
|
c.HTTPLatencyMs = int(secs * 1000)
|
|
}
|
|
}
|
|
}
|
|
|
|
// restartTarget runs `systemctl [--user] restart <unit>` locally or via SSH
|
|
// depending on whether the target lives on selfPC or a remote PC.
|
|
//
|
|
// Returns (stdout, stderr, exit_code, transport_error).
|
|
// - transport_error != nil when ssh / exec failed to even run (DNS, network).
|
|
// In that case exit_code is -1.
|
|
// - exit_code != 0 when the command ran but systemctl rejected the request
|
|
// (unit not found, permission denied, etc.). transport_error is nil.
|
|
func restartTarget(t Target, selfPC string) (string, string, int, error) {
|
|
if t.SystemdUnit == "" {
|
|
return "", "no systemd_unit declared in app.md `service:` block", -1,
|
|
fmt.Errorf("no systemd_unit")
|
|
}
|
|
|
|
if t.PCID == selfPC || t.IsLocalOnly {
|
|
args := []string{"restart", t.SystemdUnit}
|
|
if t.SystemdScope == "user" {
|
|
args = append([]string{"--user"}, args...)
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
cmd := exec.CommandContext(ctx, "systemctl", args...)
|
|
var so, se strings.Builder
|
|
cmd.Stdout = &so
|
|
cmd.Stderr = &se
|
|
err := cmd.Run()
|
|
code := 0
|
|
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
code = exitErr.ExitCode()
|
|
err = nil
|
|
} else if err != nil {
|
|
return so.String(), se.String(), -1, err
|
|
}
|
|
return so.String(), se.String(), code, nil
|
|
}
|
|
|
|
// Remote via SSH.
|
|
conn := infra.SSHConn{Host: t.PCID}
|
|
cmd := fmt.Sprintf("systemctl restart %s", shellEscape(t.SystemdUnit))
|
|
if t.SystemdScope == "user" {
|
|
cmd = "systemctl --user restart " + shellEscape(t.SystemdUnit)
|
|
}
|
|
stdout, stderr, code, err := infra.SSHExec(conn, cmd)
|
|
return stdout, stderr, code, err
|
|
}
|
|
|
|
// shellEscape wraps s in single quotes for use as one argument in a remote
// shell command. Each embedded single quote is rewritten as '\'' (close the
// quote, emit an escaped quote, reopen the quote).
func shellEscape(s string) string {
	pieces := strings.Split(s, "'")
	return "'" + strings.Join(pieces, `'\''`) + "'"
}
|