package main import ( "context" "database/sql" "fmt" "net" "net/http" "os/exec" "strconv" "strings" "time" "fn-registry/functions/infra" ) // Target is one (app, pc) combination derived from registry.db apps tagged // service + service_targets. type Target struct { AppID string Name string Runtime string Port int HealthEndpoint string HealthTimeoutS int SystemdUnit string SystemdScope string IsLocalOnly bool PCID string } // ServiceCheck is the result of one probe of a target. type ServiceCheck struct { AppID string PCID string SystemdState string PortListening bool HTTPStatus int HTTPLatencyMs int Error string Overall string } // loadTargets queries registry.db for the cross product of services x pc_targets. func loadTargets(registryRoot string) ([]Target, error) { dbPath := registryRoot + "/registry.db" db, err := sql.Open("sqlite3", dbPath+"?mode=ro&_journal_mode=WAL") if err != nil { return nil, fmt.Errorf("open registry.db: %w", err) } defer db.Close() rows, err := db.Query(` SELECT a.id, a.name, COALESCE(a.service_runtime,''), COALESCE(a.service_port,0), COALESCE(a.service_health_endpoint,''), COALESCE(a.service_health_timeout_s,3), COALESCE(a.service_systemd_unit,''), COALESCE(a.service_systemd_scope,''), COALESCE(a.service_is_local_only,0), t.pc_id FROM apps a JOIN service_targets t ON t.app_id = a.id WHERE a.tags LIKE '%service%' ORDER BY a.name, t.pc_id `) if err != nil { return nil, fmt.Errorf("query targets: %w", err) } defer rows.Close() var out []Target for rows.Next() { var t Target var localOnly int if err := rows.Scan( &t.AppID, &t.Name, &t.Runtime, &t.Port, &t.HealthEndpoint, &t.HealthTimeoutS, &t.SystemdUnit, &t.SystemdScope, &localOnly, &t.PCID, ); err != nil { return nil, err } t.IsLocalOnly = localOnly != 0 if t.HealthTimeoutS <= 0 { t.HealthTimeoutS = 3 } out = append(out, t) } return out, rows.Err() } // probeTarget probes one target and returns a ServiceCheck. func probeTarget(t Target, selfPC string) ServiceCheck { c := ServiceCheck{AppID: t.AppID, PCID: t.PCID} // Local check only when the target lives on selfPC. Remote PCs always go // over SSH; if SSH has no route the probe returns "no-route" — that is // exactly what we want to surface "PC unreachable" upstream. if t.PCID == selfPC { probeLocal(&c, t) } else { probeRemote(&c, t) } c.Overall = computeOverall(t, c) return c } func probeLocal(c *ServiceCheck, t Target) { // systemd state. if t.SystemdUnit != "" { c.SystemdState = systemctlIsActiveLocal(t.SystemdUnit, t.SystemdScope) } else { c.SystemdState = "n/a" } // Port listening (TCP localhost). if t.Port > 0 { c.PortListening = portListeningLocal(t.Port, time.Duration(t.HealthTimeoutS)*time.Second) } // HTTP health. if t.Port > 0 && t.HealthEndpoint != "" { status, latency, err := httpProbe("127.0.0.1", t.Port, t.HealthEndpoint, time.Duration(t.HealthTimeoutS)*time.Second) c.HTTPStatus = status c.HTTPLatencyMs = latency if err != nil { c.Error = err.Error() } } } func probeRemote(c *ServiceCheck, t Target) { conn := infra.SSHConn{Host: t.PCID} // docker-compose runtime: state via `docker compose ls`, no systemctl. if t.Runtime == "docker-compose" { probeDockerComposeRemote(c, t, conn) return } // systemd state via ssh. if t.SystemdUnit != "" { cmd := fmt.Sprintf("systemctl is-active %s", shellEscape(t.SystemdUnit)) if t.SystemdScope == "user" { cmd = "systemctl --user is-active " + shellEscape(t.SystemdUnit) } stdout, stderr, code, err := infra.SSHExec(conn, cmd) // SSH transport failure (ssh exit 255 = resolve/connect refused/auth) // or local exec error → mark unreachable. systemctl exit codes 3 ("inactive"), // 4 ("unit not found") are still semantic answers from a reachable host. if err != nil || code == 255 || code < 0 { c.SystemdState = "no-route" detail := strings.TrimSpace(stderr) if err != nil { detail = err.Error() } c.Error = detail return } // systemctl returns "inactive" exit 3 when unit is stopped OR missing. // Distinguish via stderr (unit not found). stderrLower := strings.ToLower(stderr) if strings.Contains(stderrLower, "could not be found") || strings.Contains(stderrLower, "not-found") || strings.Contains(stderrLower, "not found") { c.SystemdState = "not-installed" return } c.SystemdState = strings.TrimSpace(stdout) if c.SystemdState == "" { c.SystemdState = "unknown" } } else { c.SystemdState = "n/a" } if t.Port > 0 { // Port listening via ss on remote. cmd := fmt.Sprintf("ss -tln '( sport = :%d )' | tail -n +2 | head -n1", t.Port) stdout, _, _, err := infra.SSHExec(conn, cmd) if err == nil && strings.TrimSpace(stdout) != "" { c.PortListening = true } // HTTP probe via curl on remote. if t.HealthEndpoint != "" { url := fmt.Sprintf("http://127.0.0.1:%d%s", t.Port, t.HealthEndpoint) fmtCmd := `curl -s -o /dev/null -w "%{http_code} %{time_total}" -m ` + strconv.Itoa(t.HealthTimeoutS) + " " + shellEscape(url) stdout, stderr, _, err := infra.SSHExec(conn, fmtCmd) if err != nil { c.Error = strings.TrimSpace(stderr) return } parts := strings.Fields(strings.TrimSpace(stdout)) if len(parts) >= 1 { if code, perr := strconv.Atoi(parts[0]); perr == nil { c.HTTPStatus = code } } if len(parts) >= 2 { if secs, perr := strconv.ParseFloat(parts[1], 64); perr == nil { c.HTTPLatencyMs = int(secs * 1000) } } } } } func computeOverall(t Target, c ServiceCheck) string { if strings.HasPrefix(c.SystemdState, "no-route") { return "no-route" } if c.SystemdState == "not-installed" { return "not-installed" } // When the probe returned a systemd is-active style answer, route through // the systemd branch regardless of the declared runtime. This covers the // case where an app advertises `runtime: docker-compose` (because it lives // on the primary VPS) but also runs as a bare systemd unit on a developer // PC. The PC-local probe always asks systemctl; trust its output. switch c.SystemdState { case "active", "inactive", "failed", "activating", "deactivating", "reloading", "unknown", "n/a": // fallthrough to systemd-style aggregation below default: if t.Runtime == "docker-compose" { if c.SystemdState == "" { return "unknown" } ran, exited, _ := parseComposeCounts(c.SystemdState) if ran == 0 { return "down" } if exited > 0 { return "degraded" } if t.Port > 0 && t.HealthEndpoint != "" && (c.HTTPStatus > 0 && (c.HTTPStatus < 200 || c.HTTPStatus >= 400)) { return "degraded" } return "ok" } } // Systemd-runtime apps. if strings.HasPrefix(t.Runtime, "systemd-") || t.Runtime == "stdio" || c.SystemdState == "active" || c.SystemdState == "inactive" || c.SystemdState == "failed" { if c.SystemdState != "active" { return "down" } if t.Port > 0 { if !c.PortListening { return "degraded" } if t.HealthEndpoint != "" && (c.HTTPStatus < 200 || c.HTTPStatus >= 400) { return "degraded" } } return "ok" } // manual / unknown runtime: lean on port + http only. if t.Port > 0 { if !c.PortListening { return "down" } if t.HealthEndpoint != "" && (c.HTTPStatus < 200 || c.HTTPStatus >= 400) { return "degraded" } return "ok" } return "unknown" } // parseComposeCounts parses a docker-compose `ls` status string like // "running(10)", "running(8) | exited(2)" and returns (running, exited, total). // Tolerant: unknown tokens map to 0. func parseComposeCounts(s string) (running, exited, total int) { parts := strings.Split(s, "|") for _, p := range parts { p = strings.TrimSpace(p) // Format: kind(N) open := strings.Index(p, "(") closeIdx := strings.Index(p, ")") if open == -1 || closeIdx == -1 || closeIdx < open { continue } kind := strings.TrimSpace(p[:open]) n, err := strconv.Atoi(strings.TrimSpace(p[open+1 : closeIdx])) if err != nil { continue } switch kind { case "running": running += n case "exited", "stopped", "dead": exited += n } total += n } return } // systemctlIsActiveLocal runs `systemctl [--user] is-active ` and returns // a normalized state. Distinguishes "not-installed" (unit file absent) from // "inactive" (unit present but stopped). Both are unhealthy but mean different // things for the human: not-installed = forget systemctl, deploy the unit. func systemctlIsActiveLocal(unit, scope string) string { args := []string{"is-active", unit} if scope == "user" { args = append([]string{"--user"}, args...) } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() cmd := exec.CommandContext(ctx, "systemctl", args...) var so, se strings.Builder cmd.Stdout = &so cmd.Stderr = &se _ = cmd.Run() state := strings.TrimSpace(so.String()) stderr := strings.ToLower(se.String()) if strings.Contains(stderr, "could not be found") || strings.Contains(stderr, "not-found") || strings.Contains(stderr, "not found") { return "not-installed" } if state == "" { return "unknown" } return state } func portListeningLocal(port int, timeout time.Duration) bool { conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), timeout) if err != nil { return false } conn.Close() return true } func httpProbe(host string, port int, path string, timeout time.Duration) (int, int, error) { client := &http.Client{Timeout: timeout} url := fmt.Sprintf("http://%s:%d%s", host, port, path) start := time.Now() resp, err := client.Get(url) latencyMs := int(time.Since(start) / time.Millisecond) if err != nil { return 0, latencyMs, err } defer resp.Body.Close() return resp.StatusCode, latencyMs, nil } // probeDockerComposeRemote queries `docker compose ls --format json` over SSH // and maps the running/expected ratio into a synthetic systemd_state-like value // + port/HTTP probes already declared. // // Project name defaults to t.Name (the app's frontmatter `name`). Override // later via a `docker_compose_project` field if needed. func probeDockerComposeRemote(c *ServiceCheck, t Target, conn infra.SSHConn) { project := t.Name // `docker compose ls --format json` returns array of {Name, Status, ConfigFiles}. // Status string examples: "running(10)", "running(8) | exited(2)", "exited(5)". cmd := "docker compose ls --all --format json" stdout, stderr, code, err := infra.SSHExec(conn, cmd) if err != nil || code != 0 { c.SystemdState = "no-route" detail := strings.TrimSpace(stderr) if err != nil { detail = err.Error() } c.Error = detail return } // Tolerant parse: find the JSON object whose "Name" matches the project. // Try variants: the app's name, hyphenized, and slashes-stripped. candidates := []string{project, strings.ReplaceAll(project, "_", "-")} var status string var found bool var matched string for _, cand := range candidates { if cand == "" { continue } if s, ok := extractComposeStatus(stdout, cand); ok { status, found, matched = s, true, cand break } } if !found { c.SystemdState = "not-installed" c.Error = "no docker compose project named " + project + " (tried " + strings.Join(candidates, ", ") + ")" return } _ = matched // reserved for future telemetry c.SystemdState = status if t.Port > 0 { probeRemotePort(c, t, conn) } } // extractComposeStatus parses the JSON array emitted by // `docker compose ls --format json` and returns the Status field of the entry // with Name == project. Tolerant to whitespace and field ordering. func extractComposeStatus(jsonBlob, project string) (string, bool) { // Naive but reliable: find `"Name":""` and the nearest // `"Status":"..."` field within the same object. needle := `"Name":"` + project + `"` idx := strings.Index(jsonBlob, needle) if idx == -1 { // Try alternative: "Name": "project" with space. needle2 := `"Name": "` + project + `"` idx = strings.Index(jsonBlob, needle2) if idx == -1 { return "", false } } // Bound the search window to the surrounding object. open := strings.LastIndex(jsonBlob[:idx], "{") close := strings.Index(jsonBlob[idx:], "}") if open == -1 || close == -1 { return "", false } obj := jsonBlob[open : idx+close+1] // Find "Status":"..." const stKey = `"Status":` si := strings.Index(obj, stKey) if si == -1 { return "", false } // Skip whitespace and opening quote. rest := obj[si+len(stKey):] rest = strings.TrimLeft(rest, " \t") if !strings.HasPrefix(rest, `"`) { return "", false } rest = rest[1:] end := strings.Index(rest, `"`) if end == -1 { return "", false } return rest[:end], true } // probeRemotePort runs the port + http probes when a docker compose project // declares a port that should be reachable on the remote host's loopback. func probeRemotePort(c *ServiceCheck, t Target, conn infra.SSHConn) { cmd := fmt.Sprintf("ss -tln '( sport = :%d )' | tail -n +2 | head -n1", t.Port) stdout, _, _, err := infra.SSHExec(conn, cmd) if err == nil && strings.TrimSpace(stdout) != "" { c.PortListening = true } if t.HealthEndpoint == "" { return } url := fmt.Sprintf("http://127.0.0.1:%d%s", t.Port, t.HealthEndpoint) fmtCmd := `curl -s -o /dev/null -w "%{http_code} %{time_total}" -m ` + strconv.Itoa(t.HealthTimeoutS) + " " + shellEscape(url) stdout, stderr, _, err := infra.SSHExec(conn, fmtCmd) if err != nil { c.Error = strings.TrimSpace(stderr) return } parts := strings.Fields(strings.TrimSpace(stdout)) if len(parts) >= 1 { if code, perr := strconv.Atoi(parts[0]); perr == nil { c.HTTPStatus = code } } if len(parts) >= 2 { if secs, perr := strconv.ParseFloat(parts[1], 64); perr == nil { c.HTTPLatencyMs = int(secs * 1000) } } } // restartTarget runs `systemctl [--user] restart ` locally or via SSH // depending on whether the target lives on selfPC or a remote PC. // // Returns (stdout, stderr, exit_code, transport_error). // - transport_error != nil when ssh / exec failed to even run (DNS, network). // In that case exit_code is -1. // - exit_code != 0 when the command ran but systemctl rejected the request // (unit not found, permission denied, etc.). transport_error is nil. func restartTarget(t Target, selfPC string) (string, string, int, error) { if t.SystemdUnit == "" { return "", "no systemd_unit declared in app.md `service:` block", -1, fmt.Errorf("no systemd_unit") } if t.PCID == selfPC || t.IsLocalOnly { args := []string{"restart", t.SystemdUnit} if t.SystemdScope == "user" { args = append([]string{"--user"}, args...) } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() cmd := exec.CommandContext(ctx, "systemctl", args...) var so, se strings.Builder cmd.Stdout = &so cmd.Stderr = &se err := cmd.Run() code := 0 if exitErr, ok := err.(*exec.ExitError); ok { code = exitErr.ExitCode() err = nil } else if err != nil { return so.String(), se.String(), -1, err } return so.String(), se.String(), code, nil } // Remote via SSH. conn := infra.SSHConn{Host: t.PCID} cmd := fmt.Sprintf("systemctl restart %s", shellEscape(t.SystemdUnit)) if t.SystemdScope == "user" { cmd = "systemctl --user restart " + shellEscape(t.SystemdUnit) } stdout, stderr, code, err := infra.SSHExec(conn, cmd) return stdout, stderr, code, err } func shellEscape(s string) string { // Conservative single-quote escape for arguments embedded in remote commands. return "'" + strings.ReplaceAll(s, "'", "'\\''") + "'" }