363 lines
8.8 KiB
Go
363 lines
8.8 KiB
Go
package main
|
|
|
|
import (
	"context"
	"database/sql"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	_ "github.com/mattn/go-sqlite3"
)
|
|
|
|
type Server struct {
|
|
registryRoot string
|
|
selfPC string
|
|
opsDB *sql.DB
|
|
|
|
mu sync.RWMutex
|
|
targets []Target
|
|
}
|
|
|
|
func main() {
|
|
addr := flag.String("bind", "127.0.0.1:8485", "HTTP bind address")
|
|
registry := flag.String("registry", defaultRegistryRoot(), "fn_registry root")
|
|
opsPath := flag.String("db", "", "operations.db path (default: <app_dir>/operations.db)")
|
|
interval := flag.Duration("interval", 15*time.Second, "check loop interval")
|
|
once := flag.Bool("once", false, "run one check cycle and exit (smoke test)")
|
|
flag.Parse()
|
|
|
|
if *opsPath == "" {
|
|
exe, _ := os.Executable()
|
|
*opsPath = filepath.Join(filepath.Dir(exe), "operations.db")
|
|
}
|
|
|
|
selfPC := readSelfPC()
|
|
log.Printf("services_api: selfPC=%q registry=%q ops=%q", selfPC, *registry, *opsPath)
|
|
|
|
db, err := openOpsDB(*opsPath)
|
|
if err != nil {
|
|
log.Fatalf("open ops db: %v", err)
|
|
}
|
|
defer db.Close()
|
|
|
|
srv := &Server{registryRoot: *registry, selfPC: selfPC, opsDB: db}
|
|
if err := srv.refreshTargets(); err != nil {
|
|
log.Fatalf("load targets: %v", err)
|
|
}
|
|
|
|
if *once {
|
|
srv.runOneCycle()
|
|
log.Printf("once: completed %d checks", len(srv.targets))
|
|
return
|
|
}
|
|
|
|
// Background loop.
|
|
go srv.checkLoop(*interval)
|
|
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/api/health", srv.handleHealth)
|
|
mux.HandleFunc("/api/services", srv.handleServices)
|
|
mux.HandleFunc("/api/check", srv.handleForceCheck)
|
|
mux.HandleFunc("/api/pcs", srv.handlePCs)
|
|
mux.HandleFunc("/api/action/", srv.handleAction)
|
|
|
|
log.Printf("services_api listening on %s", *addr)
|
|
if err := http.ListenAndServe(*addr, mux); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// defaultRegistryRoot picks the default value for the -registry flag.
//
// Resolution order:
//  1. the FN_REGISTRY_ROOT environment variable, when non-empty;
//  2. the first directory containing registry.db, starting at the directory
//     of the running executable and climbing at most six levels;
//  3. a fixed fallback path.
func defaultRegistryRoot() string {
	const (
		marker   = "registry.db"
		maxDepth = 6
		fallback = "/home/lucas/fn_registry"
	)

	if root := os.Getenv("FN_REGISTRY_ROOT"); root != "" {
		return root
	}

	// The error is deliberately dropped: on failure exePath is "" and the
	// walk simply starts at ".".
	exePath, _ := os.Executable()
	current := filepath.Dir(exePath)

	for depth := 0; depth < maxDepth; depth++ {
		_, statErr := os.Stat(filepath.Join(current, marker))
		if statErr == nil {
			return current
		}
		up := filepath.Dir(current)
		if up == current {
			// Reached the top of the tree without finding the marker.
			return fallback
		}
		current = up
	}
	return fallback
}
|
|
|
|
// readSelfPC returns the identifier of the local machine, read from the
// .fn_pc file in the user's home directory with surrounding whitespace
// trimmed.
//
// It returns "unknown" when the home directory cannot be determined, when the
// file cannot be read, or when the file holds nothing but whitespace, so
// callers never receive an empty identifier and a stray .fn_pc in the working
// directory is never picked up by accident.
func readSelfPC() string {
	const unknown = "unknown"

	home, err := os.UserHomeDir()
	if err != nil {
		return unknown
	}
	b, err := os.ReadFile(filepath.Join(home, ".fn_pc"))
	if err != nil {
		return unknown
	}
	if id := strings.TrimSpace(string(b)); id != "" {
		return id
	}
	return unknown
}
|
|
|
|
func (s *Server) refreshTargets() error {
|
|
ts, err := loadTargets(s.registryRoot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.mu.Lock()
|
|
s.targets = ts
|
|
s.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) runOneCycle() {
|
|
s.mu.RLock()
|
|
targets := append([]Target(nil), s.targets...)
|
|
s.mu.RUnlock()
|
|
|
|
// Parallel probes (each remote SSH call can stall up to ~10s).
|
|
// Cap concurrency to avoid overwhelming SSH agent / sockets.
|
|
const maxParallel = 8
|
|
sem := make(chan struct{}, maxParallel)
|
|
results := make(chan ServiceCheck, len(targets))
|
|
var wg sync.WaitGroup
|
|
|
|
for _, t := range targets {
|
|
wg.Add(1)
|
|
sem <- struct{}{}
|
|
go func(tt Target) {
|
|
defer wg.Done()
|
|
defer func() { <-sem }()
|
|
done := make(chan ServiceCheck, 1)
|
|
go func() { done <- probeTarget(tt, s.selfPC) }()
|
|
select {
|
|
case c := <-done:
|
|
results <- c
|
|
case <-time.After(20 * time.Second):
|
|
results <- ServiceCheck{
|
|
AppID: tt.AppID,
|
|
PCID: tt.PCID,
|
|
SystemdState: "timeout",
|
|
Error: "probe exceeded 20s",
|
|
Overall: "no-route",
|
|
}
|
|
}
|
|
}(t)
|
|
}
|
|
|
|
go func() { wg.Wait(); close(results) }()
|
|
|
|
for c := range results {
|
|
if err := upsertState(s.opsDB, c); err != nil {
|
|
log.Printf("upsert %s/%s: %v", c.AppID, c.PCID, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) checkLoop(interval time.Duration) {
|
|
tick := time.NewTicker(interval)
|
|
defer tick.Stop()
|
|
// First cycle immediately, then on interval.
|
|
s.runOneCycle()
|
|
refreshEvery := 5 * time.Minute
|
|
lastRefresh := time.Now()
|
|
for {
|
|
<-tick.C
|
|
if time.Since(lastRefresh) > refreshEvery {
|
|
if err := s.refreshTargets(); err != nil {
|
|
log.Printf("refresh targets: %v", err)
|
|
}
|
|
lastRefresh = time.Now()
|
|
}
|
|
s.runOneCycle()
|
|
}
|
|
}
|
|
|
|
// HTTP handlers ------------------------------------------------------------
|
|
|
|
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "self_pc": s.selfPC})
|
|
}
|
|
|
|
func (s *Server) handleServices(w http.ResponseWriter, r *http.Request) {
|
|
s.mu.RLock()
|
|
targets := append([]Target(nil), s.targets...)
|
|
s.mu.RUnlock()
|
|
|
|
rows, err := loadStates(s.opsDB, targets, s.selfPC)
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]any{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
// Compute reachable per PC: a PC is unreachable when all its services
|
|
// returned no-route (we never got systemd state) AND it is not selfPC.
|
|
noRouteByPC := map[string]int{}
|
|
totalByPC := map[string]int{}
|
|
for _, r := range rows {
|
|
totalByPC[r.PCID]++
|
|
if r.Overall == "no-route" {
|
|
noRouteByPC[r.PCID]++
|
|
}
|
|
}
|
|
reachableByPC := map[string]bool{}
|
|
for pc, total := range totalByPC {
|
|
if pc == s.selfPC {
|
|
reachableByPC[pc] = true
|
|
continue
|
|
}
|
|
reachableByPC[pc] = noRouteByPC[pc] < total
|
|
}
|
|
for i := range rows {
|
|
rows[i].Reachable = reachableByPC[rows[i].PCID]
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"self_pc": s.selfPC,
|
|
"services": rows,
|
|
"ts": time.Now().Unix(),
|
|
"pcs": pcsSummary(reachableByPC, totalByPC, s.selfPC),
|
|
})
|
|
}
|
|
|
|
// pcsSummary builds the per-PC summary array embedded in the /api/services
// response.
//
// It emits one entry per key of total, ordered by pc_id so that repeated calls
// with the same input produce the same output (map iteration order alone is
// random). Each entry carries:
//   - "pc_id": the PC identifier
//   - "is_self": whether it equals selfPC
//   - "reachable": reachable[pc] (false when the key is absent)
//   - "services_count": total[pc]
//
// The result is never nil, so it encodes as a JSON array even when empty.
func pcsSummary(reachable map[string]bool, total map[string]int, selfPC string) []map[string]any {
	ids := make([]string, 0, len(total))
	for pc := range total {
		ids = append(ids, pc)
	}
	sort.Strings(ids)

	out := make([]map[string]any, 0, len(ids))
	for _, pc := range ids {
		out = append(out, map[string]any{
			"pc_id":          pc,
			"is_self":        pc == selfPC,
			"reachable":      reachable[pc],
			"services_count": total[pc],
		})
	}
	return out
}
|
|
|
|
func (s *Server) handleForceCheck(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
|
|
defer cancel()
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.runOneCycle()
|
|
close(done)
|
|
}()
|
|
select {
|
|
case <-done:
|
|
s.handleServices(w, r)
|
|
case <-ctx.Done():
|
|
writeJSON(w, http.StatusGatewayTimeout, map[string]any{"error": "check loop timed out"})
|
|
}
|
|
}
|
|
|
|
// handleAction handles POST /api/action/{app_id}/{pc_id}/restart.
|
|
// Currently the only supported action is "restart"; we keep the URL
|
|
// shape general for stop/start in v2.
|
|
func (s *Server) handleAction(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
// URL: /api/action/{app_id}/{pc_id}/{verb}
|
|
path := strings.TrimPrefix(r.URL.Path, "/api/action/")
|
|
parts := strings.Split(path, "/")
|
|
if len(parts) != 3 {
|
|
writeJSON(w, http.StatusBadRequest, map[string]any{
|
|
"error": "expected /api/action/{app_id}/{pc_id}/{verb}",
|
|
})
|
|
return
|
|
}
|
|
appID, pcID, verb := parts[0], parts[1], parts[2]
|
|
if verb != "restart" {
|
|
writeJSON(w, http.StatusBadRequest, map[string]any{
|
|
"error": "unsupported verb (allowed: restart)",
|
|
})
|
|
return
|
|
}
|
|
|
|
// Lookup target.
|
|
s.mu.RLock()
|
|
var t *Target
|
|
for i := range s.targets {
|
|
if s.targets[i].AppID == appID && s.targets[i].PCID == pcID {
|
|
t = &s.targets[i]
|
|
break
|
|
}
|
|
}
|
|
s.mu.RUnlock()
|
|
if t == nil {
|
|
writeJSON(w, http.StatusNotFound, map[string]any{
|
|
"error": "target not found",
|
|
})
|
|
return
|
|
}
|
|
|
|
stdout, stderr, code, err := restartTarget(*t, s.selfPC)
|
|
if err != nil && code == -1 {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]any{
|
|
"app_id": appID,
|
|
"pc_id": pcID,
|
|
"verb": verb,
|
|
"error": err.Error(),
|
|
"stderr": stderr,
|
|
"exit_code": code,
|
|
})
|
|
return
|
|
}
|
|
status := http.StatusOK
|
|
if code != 0 {
|
|
status = http.StatusBadGateway
|
|
}
|
|
// Trigger a check cycle so UI refresh shows the new state immediately.
|
|
go s.runOneCycle()
|
|
|
|
writeJSON(w, status, map[string]any{
|
|
"app_id": appID,
|
|
"pc_id": pcID,
|
|
"verb": verb,
|
|
"exit_code": code,
|
|
"stdout": strings.TrimSpace(stdout),
|
|
"stderr": strings.TrimSpace(stderr),
|
|
})
|
|
}
|
|
|
|
func (s *Server) handlePCs(w http.ResponseWriter, r *http.Request) {
|
|
s.mu.RLock()
|
|
targets := append([]Target(nil), s.targets...)
|
|
s.mu.RUnlock()
|
|
|
|
counts := map[string]int{}
|
|
for _, t := range targets {
|
|
counts[t.PCID]++
|
|
}
|
|
out := make([]map[string]any, 0, len(counts))
|
|
for pc, n := range counts {
|
|
out = append(out, map[string]any{
|
|
"pc_id": pc,
|
|
"is_self": pc == s.selfPC,
|
|
"services_count": n,
|
|
})
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"pcs": out, "self_pc": s.selfPC})
|
|
}
|
|
|
|
// writeJSON sends v as an indented JSON body with the given status code and a
// Content-Type of application/json.
//
// The status line is written before encoding starts, so an encoding or write
// failure cannot change the status any more; it is logged instead of being
// silently dropped.
func writeJSON(w http.ResponseWriter, code int, v any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)

	enc := json.NewEncoder(w)
	enc.SetIndent("", " ")
	if err := enc.Encode(v); err != nil {
		log.Printf("write json response: %v", err)
	}
}
|
|
|
|
// Blank-identifier reference that keeps the fmt import in use; without it the
// compiler would reject the import as unused.
|
|
var _ = fmt.Sprintf
|