Files
services_api/main.go
T
2026-05-19 00:31:23 +02:00

363 lines
8.8 KiB
Go

package main
import (
	"context"
	"database/sql"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	_ "github.com/mattn/go-sqlite3"
)
// Server bundles the monitor's configuration, its operations database handle
// and the current list of check targets shared by the HTTP handlers and the
// background check loop.
type Server struct {
	// Set when the Server is built in main and only read afterwards.
	selfPC       string  // identifier of this machine, as returned by readSelfPC
	registryRoot string  // fn_registry root handed to loadTargets
	opsDB        *sql.DB // database that check results are upserted into

	// mu guards targets: readers take RLock and copy, refreshTargets swaps the
	// slice under Lock.
	mu      sync.RWMutex
	targets []Target
}
// main parses flags, opens the operations database and loads the check
// targets. With -once it runs a single check cycle and exits. Otherwise it
// starts the background check loop and serves the HTTP API on -bind.
func main() {
	addr := flag.String("bind", "127.0.0.1:8485", "HTTP bind address")
	registry := flag.String("registry", defaultRegistryRoot(), "fn_registry root")
	opsPath := flag.String("db", "", "operations.db path (default: <app_dir>/operations.db)")
	interval := flag.Duration("interval", 15*time.Second, "check loop interval")
	once := flag.Bool("once", false, "run one check cycle and exit (smoke test)")
	flag.Parse()

	if *opsPath == "" {
		// Default to the executable's directory. If that cannot be resolved,
		// fall back to the working directory, and log it so a DB in an
		// unexpected place is not a silent surprise.
		dir := "."
		if exe, err := os.Executable(); err != nil {
			log.Printf("services_api: cannot resolve executable path (%v); defaulting -db to the working directory", err)
		} else {
			dir = filepath.Dir(exe)
		}
		*opsPath = filepath.Join(dir, "operations.db")
	}

	selfPC := readSelfPC()
	log.Printf("services_api: selfPC=%q registry=%q ops=%q", selfPC, *registry, *opsPath)

	db, err := openOpsDB(*opsPath)
	if err != nil {
		log.Fatalf("open ops db: %v", err)
	}
	defer db.Close()

	srv := &Server{registryRoot: *registry, selfPC: selfPC, opsDB: db}
	if err := srv.refreshTargets(); err != nil {
		log.Fatalf("load targets: %v", err)
	}

	if *once {
		srv.runOneCycle()
		log.Printf("once: completed %d checks", len(srv.targets))
		return
	}

	// Background loop; it runs for the lifetime of the process.
	go srv.checkLoop(*interval)

	mux := http.NewServeMux()
	mux.HandleFunc("/api/health", srv.handleHealth)
	mux.HandleFunc("/api/services", srv.handleServices)
	mux.HandleFunc("/api/check", srv.handleForceCheck)
	mux.HandleFunc("/api/pcs", srv.handlePCs)
	mux.HandleFunc("/api/action/", srv.handleAction)

	httpSrv := &http.Server{
		Addr:    *addr,
		Handler: mux,
		// Bound how long a client may dribble request headers, and how long
		// idle keep-alive connections are held open. There is intentionally
		// no WriteTimeout: /api/check waits up to 60s for a check cycle, and
		// restartTarget's duration is not bounded in this handler.
		ReadHeaderTimeout: 10 * time.Second,
		IdleTimeout:       2 * time.Minute,
	}
	log.Printf("services_api listening on %s", *addr)
	if err := httpSrv.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}
// defaultRegistryRoot returns the default value of the -registry flag. It
// tries these sources in order:
//   - $FN_REGISTRY_ROOT, when non-empty;
//   - the nearest directory containing registry.db, searching the
//     executable's directory and then up to five of its ancestors;
//   - a fixed fallback path.
func defaultRegistryRoot() string {
	if root := os.Getenv("FN_REGISTRY_ROOT"); root != "" {
		return root
	}
	exe, _ := os.Executable() // error ignored; the search then starts from filepath.Dir of whatever was returned
	for dir, hops := filepath.Dir(exe), 0; hops < 6; hops++ {
		if _, statErr := os.Stat(filepath.Join(dir, "registry.db")); statErr == nil {
			return dir
		}
		parent := filepath.Dir(dir)
		if parent == dir { // filesystem root reached; nothing further up
			break
		}
		dir = parent
	}
	// NOTE(review): machine-specific fallback; override with FN_REGISTRY_ROOT or -registry.
	return "/home/lucas/fn_registry"
}
// readSelfPC returns this machine's PC identifier: the contents of ~/.fn_pc
// with surrounding whitespace trimmed, or "unknown" if that file cannot be
// read.
func readSelfPC() string {
	home, _ := os.UserHomeDir() // on error home is "", so the path below becomes relative
	raw, err := os.ReadFile(filepath.Join(home, ".fn_pc"))
	if err == nil {
		return strings.TrimSpace(string(raw))
	}
	return "unknown"
}
// refreshTargets reloads the check targets from the registry. On success it
// replaces s.targets under the write lock. On error s.targets is left as it
// was and the error is returned unchanged.
func (s *Server) refreshTargets() error {
	loaded, err := loadTargets(s.registryRoot)
	if err == nil {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.targets = loaded
	}
	return err
}
// runOneCycle probes every current target once and stores each result with
// upsertState. It works on a snapshot of s.targets taken under the read lock
// and runs at most maxParallel probes at a time. Any probe that has not
// answered within 20s is replaced by a synthetic "timeout"/"no-route" result.
// It returns once every target's result has been upserted; upsert failures
// are logged, not returned.
//
// NOTE(review): nothing here stops two cycles from running at once, and
// checkLoop, handleForceCheck and handleAction can each start one.
func (s *Server) runOneCycle() {
	s.mu.RLock()
	// Copy, so probes run without holding the lock and a concurrent
	// refreshTargets cannot change the list mid-cycle.
	targets := append([]Target(nil), s.targets...)
	s.mu.RUnlock()
	// Parallel probes (each remote SSH call can stall up to ~10s).
	// Cap concurrency to avoid overwhelming SSH agent / sockets.
	const maxParallel = 8
	sem := make(chan struct{}, maxParallel)
	// Buffered to len(targets): each worker sends exactly one result, so sends never block.
	results := make(chan ServiceCheck, len(targets))
	var wg sync.WaitGroup
	for _, t := range targets {
		wg.Add(1)
		// Acquire a slot before spawning, so at most maxParallel workers are in flight.
		sem <- struct{}{}
		go func(tt Target) {
			defer wg.Done()
			defer func() { <-sem }()
			// The probe runs in its own goroutine so it can be abandoned on
			// timeout. done is buffered, so that goroutine can still deliver
			// its result and exit after the timeout branch was taken.
			done := make(chan ServiceCheck, 1)
			go func() { done <- probeTarget(tt, s.selfPC) }()
			select {
			case c := <-done:
				results <- c
			case <-time.After(20 * time.Second):
				// NOTE(review): the sem slot is released when this worker
				// returns, while the abandoned probe may still be running.
				// Stalled probes are therefore not counted against maxParallel.
				results <- ServiceCheck{
					AppID:        tt.AppID,
					PCID:         tt.PCID,
					SystemdState: "timeout",
					Error:        "probe exceeded 20s",
					Overall:      "no-route",
				}
			}
		}(t)
	}
	// Close results once every worker has sent, which ends the range below.
	go func() { wg.Wait(); close(results) }()
	// Upserts happen only on this goroutine, one result at a time.
	for c := range results {
		if err := upsertState(s.opsDB, c); err != nil {
			log.Printf("upsert %s/%s: %v", c.AppID, c.PCID, err)
		}
	}
}
// checkLoop never returns. It runs one check cycle immediately and then one
// per interval tick. On a tick where more than five minutes have passed since
// the last target reload, it first reloads the targets via refreshTargets.
// A reload error is logged and the cycle still runs.
func (s *Server) checkLoop(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	const targetRefreshPeriod = 5 * time.Minute

	s.runOneCycle()
	lastRefresh := time.Now()

	for range ticker.C {
		if time.Since(lastRefresh) > targetRefreshPeriod {
			if err := s.refreshTargets(); err != nil {
				log.Printf("refresh targets: %v", err)
			}
			lastRefresh = time.Now()
		}
		s.runOneCycle()
	}
}
// HTTP handlers ------------------------------------------------------------
// handleHealth is a liveness endpoint. It always answers 200 with status "ok"
// and this instance's PC identifier.
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
	payload := map[string]any{
		"self_pc": s.selfPC,
		"status":  "ok",
	}
	writeJSON(w, http.StatusOK, payload)
}
// handleServices returns the stored state of every known target, each marked
// with whether its PC currently looks reachable, plus a per-PC summary and a
// Unix timestamp. A loadStates failure yields a 500 with the error message.
func (s *Server) handleServices(w http.ResponseWriter, r *http.Request) {
	s.mu.RLock()
	snapshot := append([]Target(nil), s.targets...)
	s.mu.RUnlock()

	states, err := loadStates(s.opsDB, snapshot, s.selfPC)
	if err != nil {
		writeJSON(w, http.StatusInternalServerError, map[string]any{"error": err.Error()})
		return
	}

	// A PC counts as reachable if it is this machine, or if at least one of
	// its services produced something other than "no-route" (i.e. we got a
	// systemd state back from it).
	totals := make(map[string]int)
	noRoute := make(map[string]int)
	for _, st := range states {
		totals[st.PCID]++
		if st.Overall == "no-route" {
			noRoute[st.PCID]++
		}
	}
	reachable := make(map[string]bool, len(totals))
	for pc, n := range totals {
		reachable[pc] = pc == s.selfPC || noRoute[pc] < n
	}
	for i := range states {
		states[i].Reachable = reachable[states[i].PCID]
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"self_pc":  s.selfPC,
		"services": states,
		"ts":       time.Now().Unix(),
		"pcs":      pcsSummary(reachable, totals, s.selfPC),
	})
}
// pcsSummary builds the per-PC array that handleServices returns under "pcs".
// There is one entry per key of total, sorted by pc_id so that the response
// order is stable between requests. A PC absent from reachable is reported as
// not reachable. The result is never nil, so an empty summary encodes as [].
func pcsSummary(reachable map[string]bool, total map[string]int, selfPC string) []map[string]any {
	ids := make([]string, 0, len(total))
	for pc := range total {
		ids = append(ids, pc)
	}
	// Map iteration order is randomized; sort so clients see a fixed order.
	sort.Strings(ids)

	out := make([]map[string]any, 0, len(ids))
	for _, pc := range ids {
		out = append(out, map[string]any{
			"pc_id":          pc,
			"is_self":        pc == selfPC,
			"reachable":      reachable[pc],
			"services_count": total[pc],
		})
	}
	return out
}
// handleForceCheck serves POST /api/check. It runs a full check cycle and then
// answers exactly as /api/services does. If the cycle has not finished within
// 60s, or the request context ends first, it answers 504 instead. The cycle
// keeps running in the background and still upserts its results. Any other
// method gets 405 with an Allow header.
func (s *Server) handleForceCheck(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		// RFC 9110 requires a 405 response to list the allowed methods.
		w.Header().Set("Allow", http.MethodPost)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		s.runOneCycle()
	}()

	select {
	case <-done:
		s.handleServices(w, r)
	case <-ctx.Done():
		writeJSON(w, http.StatusGatewayTimeout, map[string]any{"error": "check loop timed out"})
	}
}
// handleAction handles POST /api/action/{app_id}/{pc_id}/{verb}.
// Currently the only supported verb is "restart"; the URL shape is kept
// general so stop/start can be added later.
//
// Responses:
//   - 405 (with Allow) for non-POST requests;
//   - 400 for a malformed path or unsupported verb;
//   - 404 for an unknown target;
//   - 500 when restartTarget fails with exit code -1;
//   - otherwise 200 for exit code 0, or 502 for a non-zero exit code, with
//     the trimmed stdout/stderr and, if restartTarget also returned an error,
//     its message under "error".
//
// In the 200/502 cases a check cycle is started in the background so the UI
// sees the new state soon.
func (s *Server) handleAction(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.Header().Set("Allow", http.MethodPost)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	// URL: /api/action/{app_id}/{pc_id}/{verb}
	path := strings.TrimPrefix(r.URL.Path, "/api/action/")
	parts := strings.Split(path, "/")
	if len(parts) != 3 {
		writeJSON(w, http.StatusBadRequest, map[string]any{
			"error": "expected /api/action/{app_id}/{pc_id}/{verb}",
		})
		return
	}
	appID, pcID, verb := parts[0], parts[1], parts[2]
	if verb != "restart" {
		writeJSON(w, http.StatusBadRequest, map[string]any{
			"error": "unsupported verb (allowed: restart)",
		})
		return
	}

	// Copy the matching target while still holding the read lock, instead of
	// keeping a pointer into s.targets and dereferencing it after RUnlock.
	var target Target
	found := false
	s.mu.RLock()
	for i := range s.targets {
		if s.targets[i].AppID == appID && s.targets[i].PCID == pcID {
			target, found = s.targets[i], true
			break
		}
	}
	s.mu.RUnlock()
	if !found {
		writeJSON(w, http.StatusNotFound, map[string]any{
			"error": "target not found",
		})
		return
	}

	stdout, stderr, code, err := restartTarget(target, s.selfPC)
	if err != nil && code == -1 {
		writeJSON(w, http.StatusInternalServerError, map[string]any{
			"app_id":    appID,
			"pc_id":     pcID,
			"verb":      verb,
			"error":     err.Error(),
			"stderr":    stderr,
			"exit_code": code,
		})
		return
	}
	status := http.StatusOK
	if code != 0 {
		status = http.StatusBadGateway
	}
	// Trigger a check cycle so UI refresh shows the new state immediately.
	go s.runOneCycle()

	resp := map[string]any{
		"app_id":    appID,
		"pc_id":     pcID,
		"verb":      verb,
		"exit_code": code,
		"stdout":    strings.TrimSpace(stdout),
		"stderr":    strings.TrimSpace(stderr),
	}
	if err != nil {
		// restartTarget reported an error together with a real exit code
		// (presumably the command ran but failed); surface it instead of
		// dropping it.
		resp["error"] = err.Error()
	}
	writeJSON(w, status, resp)
}
// handlePCs returns one entry per PC that has at least one configured target.
// Each entry has the PC's target count and whether it is this machine, and
// entries are sorted by pc_id so the response order is stable. Unlike
// /api/services, it reports no reachability.
func (s *Server) handlePCs(w http.ResponseWriter, r *http.Request) {
	counts := make(map[string]int)
	s.mu.RLock()
	for _, t := range s.targets {
		counts[t.PCID]++
	}
	s.mu.RUnlock()

	ids := make([]string, 0, len(counts))
	for pc := range counts {
		ids = append(ids, pc)
	}
	// Map iteration order is randomized; sort so clients see a fixed order.
	sort.Strings(ids)

	out := make([]map[string]any, 0, len(ids))
	for _, pc := range ids {
		out = append(out, map[string]any{
			"pc_id":          pc,
			"is_self":        pc == s.selfPC,
			"services_count": counts[pc],
		})
	}
	writeJSON(w, http.StatusOK, map[string]any{"pcs": out, "self_pc": s.selfPC})
}
// writeJSON writes v as indented JSON followed by a newline, with
// Content-Type application/json and the given status code.
//
// v is encoded before anything is written. If v cannot be encoded (for
// example it contains a channel or a NaN float), the error is logged and a
// 500 with a small JSON error body is sent instead of the caller's status and
// an empty or truncated body. Errors writing the body are ignored, since the
// status has already gone out and there is no one left to tell.
func writeJSON(w http.ResponseWriter, code int, v any) {
	body, err := json.MarshalIndent(v, "", " ")
	if err != nil {
		log.Printf("writeJSON: encode response: %v", err)
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte(`{"error": "internal: response encoding failed"}` + "\n"))
		return
	}
	// Same bytes json.Encoder.Encode emits: the indented value plus '\n'.
	body = append(body, '\n')
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	_, _ = w.Write(body)
}
// Blank reference to fmt so the "fmt" import still compiles even when no other
// code in this file uses it (Go rejects unused imports).
var _ = fmt.Sprintf