package main import ( "context" "database/sql" "encoding/json" "flag" "fmt" "log" "net/http" "os" "path/filepath" "strings" "sync" "time" _ "github.com/mattn/go-sqlite3" ) type Server struct { registryRoot string selfPC string opsDB *sql.DB mu sync.RWMutex targets []Target } func main() { addr := flag.String("bind", "127.0.0.1:8485", "HTTP bind address") registry := flag.String("registry", defaultRegistryRoot(), "fn_registry root") opsPath := flag.String("db", "", "operations.db path (default: /operations.db)") interval := flag.Duration("interval", 15*time.Second, "check loop interval") once := flag.Bool("once", false, "run one check cycle and exit (smoke test)") flag.Parse() if *opsPath == "" { exe, _ := os.Executable() *opsPath = filepath.Join(filepath.Dir(exe), "operations.db") } selfPC := readSelfPC() log.Printf("services_api: selfPC=%q registry=%q ops=%q", selfPC, *registry, *opsPath) db, err := openOpsDB(*opsPath) if err != nil { log.Fatalf("open ops db: %v", err) } defer db.Close() srv := &Server{registryRoot: *registry, selfPC: selfPC, opsDB: db} if err := srv.refreshTargets(); err != nil { log.Fatalf("load targets: %v", err) } if *once { srv.runOneCycle() log.Printf("once: completed %d checks", len(srv.targets)) return } // Background loop. go srv.checkLoop(*interval) mux := http.NewServeMux() mux.HandleFunc("/api/health", srv.handleHealth) mux.HandleFunc("/api/services", srv.handleServices) mux.HandleFunc("/api/check", srv.handleForceCheck) mux.HandleFunc("/api/pcs", srv.handlePCs) mux.HandleFunc("/api/action/", srv.handleAction) log.Printf("services_api listening on %s", *addr) if err := http.ListenAndServe(*addr, mux); err != nil { log.Fatal(err) } } func defaultRegistryRoot() string { if v := os.Getenv("FN_REGISTRY_ROOT"); v != "" { return v } exe, _ := os.Executable() // Walk up looking for registry.db. dir := filepath.Dir(exe) for i := 0; i < 6; i++ { if _, err := os.Stat(filepath.Join(dir, "registry.db")); err == nil { return dir } parent := filepath.Dir(dir) if parent == dir { break } dir = parent } return "/home/lucas/fn_registry" } func readSelfPC() string { home, _ := os.UserHomeDir() b, err := os.ReadFile(filepath.Join(home, ".fn_pc")) if err != nil { return "unknown" } return strings.TrimSpace(string(b)) } func (s *Server) refreshTargets() error { ts, err := loadTargets(s.registryRoot) if err != nil { return err } s.mu.Lock() s.targets = ts s.mu.Unlock() return nil } func (s *Server) runOneCycle() { s.mu.RLock() targets := append([]Target(nil), s.targets...) s.mu.RUnlock() // Parallel probes (each remote SSH call can stall up to ~10s). // Cap concurrency to avoid overwhelming SSH agent / sockets. const maxParallel = 8 sem := make(chan struct{}, maxParallel) results := make(chan ServiceCheck, len(targets)) var wg sync.WaitGroup for _, t := range targets { wg.Add(1) sem <- struct{}{} go func(tt Target) { defer wg.Done() defer func() { <-sem }() done := make(chan ServiceCheck, 1) go func() { done <- probeTarget(tt, s.selfPC) }() select { case c := <-done: results <- c case <-time.After(20 * time.Second): results <- ServiceCheck{ AppID: tt.AppID, PCID: tt.PCID, SystemdState: "timeout", Error: "probe exceeded 20s", Overall: "no-route", } } }(t) } go func() { wg.Wait(); close(results) }() for c := range results { if err := upsertState(s.opsDB, c); err != nil { log.Printf("upsert %s/%s: %v", c.AppID, c.PCID, err) } } } func (s *Server) checkLoop(interval time.Duration) { tick := time.NewTicker(interval) defer tick.Stop() // First cycle immediately, then on interval. s.runOneCycle() refreshEvery := 5 * time.Minute lastRefresh := time.Now() for { <-tick.C if time.Since(lastRefresh) > refreshEvery { if err := s.refreshTargets(); err != nil { log.Printf("refresh targets: %v", err) } lastRefresh = time.Now() } s.runOneCycle() } } // HTTP handlers ------------------------------------------------------------ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "self_pc": s.selfPC}) } func (s *Server) handleServices(w http.ResponseWriter, r *http.Request) { s.mu.RLock() targets := append([]Target(nil), s.targets...) s.mu.RUnlock() rows, err := loadStates(s.opsDB, targets, s.selfPC) if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]any{"error": err.Error()}) return } // Compute reachable per PC: a PC is unreachable when all its services // returned no-route (we never got systemd state) AND it is not selfPC. noRouteByPC := map[string]int{} totalByPC := map[string]int{} for _, r := range rows { totalByPC[r.PCID]++ if r.Overall == "no-route" { noRouteByPC[r.PCID]++ } } reachableByPC := map[string]bool{} for pc, total := range totalByPC { if pc == s.selfPC { reachableByPC[pc] = true continue } reachableByPC[pc] = noRouteByPC[pc] < total } for i := range rows { rows[i].Reachable = reachableByPC[rows[i].PCID] } writeJSON(w, http.StatusOK, map[string]any{ "self_pc": s.selfPC, "services": rows, "ts": time.Now().Unix(), "pcs": pcsSummary(reachableByPC, totalByPC, s.selfPC), }) } // pcsSummary builds the array used in /api/services and /api/pcs. func pcsSummary(reachable map[string]bool, total map[string]int, selfPC string) []map[string]any { out := make([]map[string]any, 0, len(total)) for pc, n := range total { out = append(out, map[string]any{ "pc_id": pc, "is_self": pc == selfPC, "reachable": reachable[pc], "services_count": n, }) } return out } func (s *Server) handleForceCheck(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second) defer cancel() done := make(chan struct{}) go func() { s.runOneCycle() close(done) }() select { case <-done: s.handleServices(w, r) case <-ctx.Done(): writeJSON(w, http.StatusGatewayTimeout, map[string]any{"error": "check loop timed out"}) } } // handleAction handles POST /api/action/{app_id}/{pc_id}/restart. // Currently the only supported action is "restart"; we keep the URL // shape general for stop/start in v2. func (s *Server) handleAction(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } // URL: /api/action/{app_id}/{pc_id}/{verb} path := strings.TrimPrefix(r.URL.Path, "/api/action/") parts := strings.Split(path, "/") if len(parts) != 3 { writeJSON(w, http.StatusBadRequest, map[string]any{ "error": "expected /api/action/{app_id}/{pc_id}/{verb}", }) return } appID, pcID, verb := parts[0], parts[1], parts[2] if verb != "restart" { writeJSON(w, http.StatusBadRequest, map[string]any{ "error": "unsupported verb (allowed: restart)", }) return } // Lookup target. s.mu.RLock() var t *Target for i := range s.targets { if s.targets[i].AppID == appID && s.targets[i].PCID == pcID { t = &s.targets[i] break } } s.mu.RUnlock() if t == nil { writeJSON(w, http.StatusNotFound, map[string]any{ "error": "target not found", }) return } stdout, stderr, code, err := restartTarget(*t, s.selfPC) if err != nil && code == -1 { writeJSON(w, http.StatusInternalServerError, map[string]any{ "app_id": appID, "pc_id": pcID, "verb": verb, "error": err.Error(), "stderr": stderr, "exit_code": code, }) return } status := http.StatusOK if code != 0 { status = http.StatusBadGateway } // Trigger a check cycle so UI refresh shows the new state immediately. go s.runOneCycle() writeJSON(w, status, map[string]any{ "app_id": appID, "pc_id": pcID, "verb": verb, "exit_code": code, "stdout": strings.TrimSpace(stdout), "stderr": strings.TrimSpace(stderr), }) } func (s *Server) handlePCs(w http.ResponseWriter, r *http.Request) { s.mu.RLock() targets := append([]Target(nil), s.targets...) s.mu.RUnlock() counts := map[string]int{} for _, t := range targets { counts[t.PCID]++ } out := make([]map[string]any, 0, len(counts)) for pc, n := range counts { out = append(out, map[string]any{ "pc_id": pc, "is_self": pc == s.selfPC, "services_count": n, }) } writeJSON(w, http.StatusOK, map[string]any{"pcs": out, "self_pc": s.selfPC}) } func writeJSON(w http.ResponseWriter, code int, v any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) enc := json.NewEncoder(w) enc.SetIndent("", " ") _ = enc.Encode(v) } // stub to silence unused import var _ = fmt.Sprintf