Files
services_api/check.go
T
2026-05-19 00:31:23 +02:00

533 lines
15 KiB
Go

package main
import (
"context"
"database/sql"
"fmt"
"net"
"net/http"
"os/exec"
"strconv"
"strings"
"time"
"fn-registry/functions/infra"
)
// Target is one (app, pc) combination derived from registry.db apps tagged
// service + service_targets.
//
// loadTargets fills every field from the apps.service_* columns (NULLs fall
// back to SQL COALESCE defaults) joined with service_targets.pc_id.
type Target struct {
	// AppID and Name are the app's registry id and name; Runtime is the
	// declared service runtime (values seen in this file: "docker-compose",
	// "systemd-*", "stdio").
	AppID, Name, Runtime string

	// Port is the TCP port probed on the host's 127.0.0.1; 0 means none.
	Port int

	// HealthEndpoint is the HTTP path appended to http://127.0.0.1:<Port>.
	HealthEndpoint string

	// HealthTimeoutS is the per-probe timeout in seconds (clamped to 3 when
	// the stored value is not positive).
	HealthTimeoutS int

	// SystemdUnit is the unit name; SystemdScope "user" selects
	// `systemctl --user`.
	SystemdUnit, SystemdScope string

	// IsLocalOnly makes restartTarget run systemctl on this machine even when
	// PCID differs from the local PC id.
	IsLocalOnly bool

	// PCID identifies the PC; it is also used as the SSH host for remote probes.
	PCID string
}
// ServiceCheck is the result of one probe of a target: raw observations
// filled in by probeLocal / probeRemote plus the verdict from computeOverall.
type ServiceCheck struct {
	// AppID and PCID identify the probed Target.
	AppID, PCID string

	// SystemdState is the `systemctl is-active` output, the compose status
	// string (e.g. "running(2)") for remote docker-compose targets, or one of
	// the synthetic values "n/a", "unknown", "not-installed", "no-route".
	SystemdState string

	// PortListening reports whether the port was found accepting/listening.
	PortListening bool

	// HTTPStatus is the health-probe status code (0 when not probed or no
	// response); HTTPLatencyMs is the measured probe time in milliseconds.
	HTTPStatus, HTTPLatencyMs int

	// Error carries probe failure detail, if any.
	Error string

	// Overall is computeOverall's verdict: "ok", "degraded", "down",
	// "unknown", "no-route" or "not-installed".
	Overall string
}
// loadTargets queries registry.db (at registryRoot/registry.db) for the cross
// product of services x pc_targets: one Target per (app, service_targets row).
//
// Apps are selected with a substring match on apps.tags (LIKE '%service%').
// NOTE(review): that also matches tags that merely contain "service" — confirm
// the tag format if that matters.
//
// The database is opened read-only via the "sqlite3" database/sql driver,
// which must be registered elsewhere in the package (not visible here).
// NULL service columns fall back to the SQL COALESCE defaults, and a
// non-positive health timeout is clamped to 3 seconds.
//
// On any error the returned slice is nil; errors are wrapped with the step
// that failed (open, query, scan, iterate).
func loadTargets(registryRoot string) ([]Target, error) {
	dbPath := registryRoot + "/registry.db"
	db, err := sql.Open("sqlite3", dbPath+"?mode=ro&_journal_mode=WAL")
	if err != nil {
		return nil, fmt.Errorf("open registry.db: %w", err)
	}
	defer db.Close()

	rows, err := db.Query(`
		SELECT a.id, a.name,
			COALESCE(a.service_runtime,''),
			COALESCE(a.service_port,0),
			COALESCE(a.service_health_endpoint,''),
			COALESCE(a.service_health_timeout_s,3),
			COALESCE(a.service_systemd_unit,''),
			COALESCE(a.service_systemd_scope,''),
			COALESCE(a.service_is_local_only,0),
			t.pc_id
		FROM apps a
		JOIN service_targets t ON t.app_id = a.id
		WHERE a.tags LIKE '%service%'
		ORDER BY a.name, t.pc_id
	`)
	if err != nil {
		return nil, fmt.Errorf("query targets: %w", err)
	}
	defer rows.Close()

	var out []Target
	for rows.Next() {
		var t Target
		var localOnly int
		if err := rows.Scan(
			&t.AppID, &t.Name, &t.Runtime, &t.Port, &t.HealthEndpoint, &t.HealthTimeoutS,
			&t.SystemdUnit, &t.SystemdScope, &localOnly, &t.PCID,
		); err != nil {
			return nil, fmt.Errorf("scan target row: %w", err)
		}
		t.IsLocalOnly = localOnly != 0
		if t.HealthTimeoutS <= 0 {
			t.HealthTimeoutS = 3
		}
		out = append(out, t)
	}
	// rows.Next returning false may hide a driver error; surface it instead of
	// handing back a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate targets: %w", err)
	}
	return out, nil
}
// probeTarget probes one target and returns a ServiceCheck with Overall set.
//
// A target on selfPC is probed directly on this machine (probeLocal); every
// other PC goes over SSH (probeRemote). An SSH path with no route yields
// "no-route", which is exactly what surfaces "PC unreachable" upstream.
func probeTarget(t Target, selfPC string) ServiceCheck {
	check := ServiceCheck{AppID: t.AppID, PCID: t.PCID}

	probe := probeRemote
	if t.PCID == selfPC {
		probe = probeLocal
	}
	probe(&check, t)

	check.Overall = computeOverall(t, check)
	return check
}
// probeLocal fills c by probing t on this machine:
//   - SystemdState from `systemctl [--user] is-active` when a unit is declared,
//     otherwise "n/a";
//   - PortListening via a TCP dial to 127.0.0.1:<Port> when a port is declared;
//   - HTTPStatus / HTTPLatencyMs (and Error on failure) via an HTTP GET of the
//     health endpoint when both a port and an endpoint are declared.
//
// The dial and the HTTP request each use t.HealthTimeoutS as their timeout.
func probeLocal(c *ServiceCheck, t Target) {
	timeout := time.Duration(t.HealthTimeoutS) * time.Second

	c.SystemdState = "n/a"
	if t.SystemdUnit != "" {
		c.SystemdState = systemctlIsActiveLocal(t.SystemdUnit, t.SystemdScope)
	}

	if t.Port <= 0 {
		return
	}
	c.PortListening = portListeningLocal(t.Port, timeout)

	if t.HealthEndpoint == "" {
		return
	}
	status, latency, err := httpProbe("127.0.0.1", t.Port, t.HealthEndpoint, timeout)
	c.HTTPStatus, c.HTTPLatencyMs = status, latency
	if err != nil {
		c.Error = err.Error()
	}
}
// probeRemote fills c for a target on another PC, running every probe over
// SSH (infra.SSHExec) with t.PCID as the host.
//
// docker-compose targets are delegated entirely to probeDockerComposeRemote.
// For other runtimes:
//   - SystemdState comes from `systemctl [--user] is-active <unit>` ("n/a"
//     when no unit is declared, "not-installed" when systemctl reports the
//     unit missing, "no-route" on SSH transport failure);
//   - when a port is declared, the listening + HTTP health probes run through
//     probeRemotePort, the same helper the docker-compose path uses.
func probeRemote(c *ServiceCheck, t Target) {
	conn := infra.SSHConn{Host: t.PCID}

	// docker-compose runtime: state via `docker compose ls`, no systemctl.
	if t.Runtime == "docker-compose" {
		probeDockerComposeRemote(c, t, conn)
		return
	}

	if t.SystemdUnit == "" {
		c.SystemdState = "n/a"
	} else {
		cmd := "systemctl is-active " + shellEscape(t.SystemdUnit)
		if t.SystemdScope == "user" {
			cmd = "systemctl --user is-active " + shellEscape(t.SystemdUnit)
		}
		stdout, stderr, code, err := infra.SSHExec(conn, cmd)
		// SSH transport failure (ssh exit 255 = resolve/connect refused/auth)
		// or local exec error → mark unreachable. systemctl exit codes 3
		// ("inactive") and 4 ("unit not found") are still semantic answers
		// from a reachable host.
		if err != nil || code == 255 || code < 0 {
			c.SystemdState = "no-route"
			detail := strings.TrimSpace(stderr)
			if err != nil {
				detail = err.Error()
			}
			c.Error = detail
			return
		}
		// is-active exits non-zero both for a stopped and for a missing unit;
		// stderr is what tells the two apart.
		stderrLower := strings.ToLower(stderr)
		if strings.Contains(stderrLower, "could not be found") ||
			strings.Contains(stderrLower, "not-found") ||
			strings.Contains(stderrLower, "not found") {
			c.SystemdState = "not-installed"
			return
		}
		c.SystemdState = strings.TrimSpace(stdout)
		if c.SystemdState == "" {
			c.SystemdState = "unknown"
		}
	}

	// Port listening + HTTP health: shared with the docker-compose path so the
	// two remote probes cannot drift apart.
	if t.Port > 0 {
		probeRemotePort(c, t, conn)
	}
}
// computeOverall folds the raw probe results in c into one verdict: "no-route",
// "not-installed", "down", "degraded", "ok" or "unknown".
//
// Precedence:
//  1. A "no-route…" or "not-installed" state passes straight through.
//  2. A docker-compose target whose state is NOT a systemctl-style answer is
//     judged from its compose status string (via parseComposeCounts): no
//     running container → down; any exited/stopped/dead → degraded; an HTTP
//     status outside 200-399 → degraded. HTTPStatus 0 (no response) and
//     PortListening are not consulted in this branch.
//  3. systemd-managed targets (runtime "systemd-*" or "stdio", or a state of
//     active/inactive/failed) must be "active" (else down); with a port they
//     also need PortListening and, if a health endpoint exists, an HTTP status
//     in 200-399 (else degraded).
//  4. Everything else relies on port + HTTP alone (no listener → down); with
//     no port declared the verdict is "unknown".
//
// A systemctl-style answer wins over a declared docker-compose runtime: an app
// may run under compose on the primary VPS but as a bare systemd unit on a
// developer PC, where the local probe always asks systemctl.
func computeOverall(t Target, c ServiceCheck) string {
	state := c.SystemdState
	switch {
	case strings.HasPrefix(state, "no-route"):
		return "no-route"
	case state == "not-installed":
		return "not-installed"
	}

	httpBad := c.HTTPStatus < 200 || c.HTTPStatus >= 400

	systemctlAnswer := false
	switch state {
	case "active", "inactive", "failed", "activating", "deactivating", "reloading", "unknown", "n/a":
		systemctlAnswer = true
	}

	if !systemctlAnswer && t.Runtime == "docker-compose" {
		if state == "" {
			return "unknown"
		}
		running, exited, _ := parseComposeCounts(state)
		switch {
		case running == 0:
			return "down"
		case exited > 0:
			return "degraded"
		case t.Port > 0 && t.HealthEndpoint != "" && c.HTTPStatus > 0 && httpBad:
			return "degraded"
		}
		return "ok"
	}

	systemdManaged := strings.HasPrefix(t.Runtime, "systemd-") || t.Runtime == "stdio" ||
		state == "active" || state == "inactive" || state == "failed"
	if systemdManaged {
		if state != "active" {
			return "down"
		}
		if t.Port > 0 && (!c.PortListening || (t.HealthEndpoint != "" && httpBad)) {
			return "degraded"
		}
		return "ok"
	}

	// manual / unknown runtime: lean on port + http only.
	if t.Port <= 0 {
		return "unknown"
	}
	if !c.PortListening {
		return "down"
	}
	if t.HealthEndpoint != "" && httpBad {
		return "degraded"
	}
	return "ok"
}
// parseComposeCounts parses a docker compose `ls` status string and returns
// (running, exited, total) container counts.
//
// Accepted forms: "running(10)", "exited(1), running(2)" (the ", "-joined
// form docker compose prints) and "running(8) | exited(2)". Each token is
// kind(N):
//   - "running" adds to running;
//   - "exited", "stopped" and "dead" add to exited;
//   - every well-formed token (any kind) adds to total.
//
// Tolerant: malformed tokens, non-numeric or negative counts are ignored, so an
// empty or unrecognised string yields (0, 0, 0).
func parseComposeCounts(s string) (running, exited, total int) {
	isSeparator := func(r rune) bool { return r == '|' || r == ',' }
	for _, tok := range strings.FieldsFunc(s, isSeparator) {
		tok = strings.TrimSpace(tok)
		openIdx := strings.Index(tok, "(")
		closeIdx := strings.Index(tok, ")")
		if openIdx == -1 || closeIdx == -1 || closeIdx < openIdx {
			continue
		}
		n, err := strconv.Atoi(strings.TrimSpace(tok[openIdx+1 : closeIdx]))
		if err != nil || n < 0 {
			continue
		}
		switch strings.TrimSpace(tok[:openIdx]) {
		case "running":
			running += n
		case "exited", "stopped", "dead":
			exited += n
		}
		total += n
	}
	return running, exited, total
}
// systemctlIsActiveLocal runs `systemctl [--user] is-active <unit>` on this
// machine (3 s timeout) and returns a normalized state:
//   - "not-installed" when stderr says the unit could not be found;
//   - otherwise systemctl's trimmed stdout (e.g. "active", "inactive");
//   - "unknown" when stdout is empty.
//
// "not-installed" (unit file absent) and "inactive" (unit present but stopped)
// are both unhealthy but mean different things for the human: not-installed =
// forget systemctl, deploy the unit.
func systemctlIsActiveLocal(unit, scope string) string {
	var args []string
	if scope == "user" {
		args = append(args, "--user")
	}
	args = append(args, "is-active", unit)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	var stdout, stderr strings.Builder
	cmd := exec.CommandContext(ctx, "systemctl", args...)
	cmd.Stdout, cmd.Stderr = &stdout, &stderr
	// The run error is ignored on purpose: is-active exits non-zero for every
	// state except "active", and the printed state is what callers need.
	_ = cmd.Run()

	lowerErr := strings.ToLower(stderr.String())
	for _, marker := range []string{"could not be found", "not-found", "not found"} {
		if strings.Contains(lowerErr, marker) {
			return "not-installed"
		}
	}

	if state := strings.TrimSpace(stdout.String()); state != "" {
		return state
	}
	return "unknown"
}
// portListeningLocal reports whether a TCP connection to 127.0.0.1:port can be
// established within timeout. The probe connection is closed immediately.
func portListeningLocal(port int, timeout time.Duration) bool {
	addr := net.JoinHostPort("127.0.0.1", fmt.Sprint(port))
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err == nil {
		conn.Close()
	}
	return err == nil
}
// httpProbe issues a GET to http://host:port/path with the given overall
// timeout and returns (status code, elapsed milliseconds, error).
//
// The host is joined with net.JoinHostPort so IPv6 literals such as "::1" get
// the required brackets, and a path missing its leading "/" is normalized
// (otherwise "health" would be glued onto the port). On error the status is 0
// and the latency is still reported. Latency is measured until the response
// headers arrive; the body is not read.
func httpProbe(host string, port int, path string, timeout time.Duration) (int, int, error) {
	if !strings.HasPrefix(path, "/") {
		path = "/" + path
	}
	url := "http://" + net.JoinHostPort(host, strconv.Itoa(port)) + path

	client := &http.Client{Timeout: timeout}
	start := time.Now()
	resp, err := client.Get(url)
	latencyMs := int(time.Since(start) / time.Millisecond)
	if err != nil {
		return 0, latencyMs, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, latencyMs, nil
}
// probeDockerComposeRemote fills c for a docker-compose target on a remote PC.
//
// It runs `docker compose ls --all --format json` over SSH and stores the
// matching project's compose status string (e.g. "running(2)") in
// c.SystemdState; computeOverall later turns that into a verdict. When a port
// is declared, the port/HTTP probes from probeRemotePort run as well.
//
// c.SystemdState outcomes:
//   - "no-route": SSH transport failed (exec error, ssh exit 255, or a negative
//     exit code). Only this case means the PC itself is unreachable, matching
//     the rule used by the systemctl path in probeRemote.
//   - "unknown": the host answered but `docker compose ls` exited non-zero
//     (e.g. docker or the compose plugin missing, socket permission denied).
//     c.Error carries the detail; port/HTTP probes still run.
//   - "not-installed": no compose project matched any candidate name.
//   - otherwise: the compose Status string of the matched project.
//
// The project name derives from t.Name (the app's frontmatter `name`).
// Candidates, in order: t.Name, t.Name with "_" → "-", then the lower-case
// forms of both (compose project names are lower-case); empty and duplicate
// candidates are skipped. Override later via a `docker_compose_project` field
// if needed.
func probeDockerComposeRemote(c *ServiceCheck, t Target, conn infra.SSHConn) {
	project := t.Name

	// `docker compose ls --format json` returns an array of
	// {Name, Status, ConfigFiles}; Status looks like "running(10)" or
	// "exited(2), running(8)".
	stdout, stderr, code, err := infra.SSHExec(conn, "docker compose ls --all --format json")
	if err != nil || code == 255 || code < 0 {
		c.SystemdState = "no-route"
		detail := strings.TrimSpace(stderr)
		if err != nil {
			detail = err.Error()
		}
		c.Error = detail
		return
	}
	if code != 0 {
		// Reachable host, failing docker command: not a routing problem.
		c.SystemdState = "unknown"
		detail := strings.TrimSpace(stderr)
		if detail == "" {
			detail = "docker compose ls exited with code " + strconv.Itoa(code)
		}
		c.Error = detail
		if t.Port > 0 {
			probeRemotePort(c, t, conn)
		}
		return
	}

	hyphenated := strings.ReplaceAll(project, "_", "-")
	seen := make(map[string]bool, 4)
	var candidates []string
	for _, cand := range []string{project, hyphenated, strings.ToLower(project), strings.ToLower(hyphenated)} {
		if cand == "" || seen[cand] {
			continue
		}
		seen[cand] = true
		candidates = append(candidates, cand)
	}

	for _, cand := range candidates {
		if status, ok := extractComposeStatus(stdout, cand); ok {
			c.SystemdState = status
			if t.Port > 0 {
				probeRemotePort(c, t, conn)
			}
			return
		}
	}

	c.SystemdState = "not-installed"
	c.Error = "no docker compose project named " + project +
		" (tried " + strings.Join(candidates, ", ") + ")"
}
// extractComposeStatus scans the JSON array emitted by
// `docker compose ls --format json` for the object whose Name equals project
// and returns that object's Status string.
//
// This is a text scan, not a JSON decode. The Name key must appear as
// `"Name":"<project>"` or `"Name": "<project>"`. The enclosing object is taken
// to span from the nearest "{" before the match to the first "}" after it, and
// "Status" may sit anywhere inside that span, followed by optional spaces/tabs
// after the colon. It returns ("", false) when any piece is missing.
func extractComposeStatus(jsonBlob, project string) (string, bool) {
	nameAt := -1
	for _, key := range []string{`"Name":"` + project + `"`, `"Name": "` + project + `"`} {
		nameAt = strings.Index(jsonBlob, key)
		if nameAt != -1 {
			break
		}
	}
	if nameAt == -1 {
		return "", false
	}

	objStart := strings.LastIndex(jsonBlob[:nameAt], "{")
	objSpan := strings.Index(jsonBlob[nameAt:], "}")
	if objStart == -1 || objSpan == -1 {
		return "", false
	}
	object := jsonBlob[objStart : nameAt+objSpan+1]

	const statusKey = `"Status":`
	keyAt := strings.Index(object, statusKey)
	if keyAt == -1 {
		return "", false
	}

	value := strings.TrimLeft(object[keyAt+len(statusKey):], " \t")
	if !strings.HasPrefix(value, `"`) {
		return "", false
	}
	value = value[1:]
	if quoteAt := strings.Index(value, `"`); quoteAt != -1 {
		return value[:quoteAt], true
	}
	return "", false
}
// probeRemotePort runs the port + HTTP probes over SSH for a target that
// declares t.Port, checking the remote host's loopback.
//
//   - c.PortListening becomes true when `ss -tln` lists a listener on t.Port;
//     an SSH error or empty output leaves it unchanged.
//   - When t.HealthEndpoint is set, curl on the remote host requests
//     http://127.0.0.1:<port><endpoint> with `-m t.HealthTimeoutS` and prints
//     "<http_code> <time_total>"; those fill c.HTTPStatus and c.HTTPLatencyMs.
//     curl prints "000" when it gets no response, which leaves HTTPStatus 0.
//   - If the SSH call running curl fails, c.Error records the detail: trimmed
//     stderr, or the error text when stderr is empty.
//
// A health endpoint missing its leading "/" is normalized so the URL stays
// well-formed.
func probeRemotePort(c *ServiceCheck, t Target, conn infra.SSHConn) {
	listenCmd := fmt.Sprintf("ss -tln '( sport = :%d )' | tail -n +2 | head -n1", t.Port)
	if out, _, _, err := infra.SSHExec(conn, listenCmd); err == nil && strings.TrimSpace(out) != "" {
		c.PortListening = true
	}

	if t.HealthEndpoint == "" {
		return
	}
	endpoint := t.HealthEndpoint
	if !strings.HasPrefix(endpoint, "/") {
		endpoint = "/" + endpoint
	}
	url := fmt.Sprintf("http://127.0.0.1:%d%s", t.Port, endpoint)
	curlCmd := `curl -s -o /dev/null -w "%{http_code} %{time_total}" -m ` +
		strconv.Itoa(t.HealthTimeoutS) + " " + shellEscape(url)
	stdout, stderr, _, err := infra.SSHExec(conn, curlCmd)
	if err != nil {
		detail := strings.TrimSpace(stderr)
		if detail == "" {
			detail = err.Error()
		}
		c.Error = detail
		return
	}

	parts := strings.Fields(strings.TrimSpace(stdout))
	if len(parts) >= 1 {
		if code, perr := strconv.Atoi(parts[0]); perr == nil {
			c.HTTPStatus = code
		}
	}
	if len(parts) >= 2 {
		if secs, perr := strconv.ParseFloat(parts[1], 64); perr == nil {
			c.HTTPLatencyMs = int(secs * 1000)
		}
	}
}
// restartTarget runs `systemctl [--user] restart <unit>` locally or via SSH.
// The local path is used when the target lives on selfPC or is flagged
// IsLocalOnly; every other target is restarted over SSH on t.PCID.
//
// Returns (stdout, stderr, exit_code, transport_error).
//   - transport_error != nil when no systemd unit is declared, when the local
//     exec failed to even run, or when the local systemctl call hit its 30 s
//     deadline and was killed. In those cases exit_code is -1.
//   - exit_code != 0 with transport_error == nil when systemctl ran but
//     rejected the request (unit not found, permission denied, etc.).
//
// The remote branch returns infra.SSHExec's results unchanged.
// NOTE(review): its exit-code conventions for transport failures are defined
// by that helper — confirm they match the contract above.
func restartTarget(t Target, selfPC string) (string, string, int, error) {
	if t.SystemdUnit == "" {
		return "", "no systemd_unit declared in app.md `service:` block", -1,
			fmt.Errorf("no systemd_unit")
	}

	if t.PCID == selfPC || t.IsLocalOnly {
		args := []string{"restart", t.SystemdUnit}
		if t.SystemdScope == "user" {
			args = append([]string{"--user"}, args...)
		}
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		cmd := exec.CommandContext(ctx, "systemctl", args...)
		var so, se strings.Builder
		cmd.Stdout = &so
		cmd.Stderr = &se
		err := cmd.Run()
		if err != nil && ctx.Err() != nil {
			// The deadline killed systemctl: its signal exit status (-1) is not
			// a verdict from systemctl, so report it as a failure to run.
			return so.String(), se.String(), -1,
				fmt.Errorf("systemctl restart %s: %w", t.SystemdUnit, ctx.Err())
		}
		if exitErr, ok := err.(*exec.ExitError); ok {
			return so.String(), se.String(), exitErr.ExitCode(), nil
		}
		if err != nil {
			return so.String(), se.String(), -1, err
		}
		return so.String(), se.String(), 0, nil
	}

	// Remote via SSH.
	conn := infra.SSHConn{Host: t.PCID}
	cmd := fmt.Sprintf("systemctl restart %s", shellEscape(t.SystemdUnit))
	if t.SystemdScope == "user" {
		cmd = "systemctl --user restart " + shellEscape(t.SystemdUnit)
	}
	stdout, stderr, code, err := infra.SSHExec(conn, cmd)
	return stdout, stderr, code, err
}
// shellEscape wraps s in single quotes for safe embedding in a remote shell
// command. Each embedded single quote becomes '\'' (close the quote, emit an
// escaped quote, reopen), so the shell sees s verbatim.
func shellEscape(s string) string {
	const quotedQuote = `'\''`
	return "'" + strings.Join(strings.Split(s, "'"), quotedQuote) + "'"
}