Files
unibus_admin/internal/admin/server.go
T
egutierrez c412941e4c feat: route Users management through the signed control-plane API
The gateway previously managed the bus allowlist only via a direct
membership store opened with --db, falling back to a "none" backend that
left the Users tab degraded in cluster (the control plane exposed no user
HTTP endpoint). The unibus control plane now exposes an admin-only user
API (GET/POST /users, POST /users/{signpub}/revoke), and pkg/client wraps
it with ListUsers/AddUser/RevokeUser that sign each request.

busRepo now drives those client methods whenever no direct store is
configured (the cluster default), so user management works in cluster
without KV/SQLite access — the bus verifies the operator's admin identity
with requireAdmin and writes to the same store the room handlers use. A
direct store (--db) is kept as an explicit single-node fallback. The
reported users_backend becomes "control-plane" (or "sqlite" with --db),
and ErrUsersUnavailable / the "none" path are removed since a connected
gateway can always reach the API.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 21:10:27 +02:00

234 lines
7.0 KiB
Go

package admin
import (
	"context"
	"encoding/json"
	"errors"
	"io/fs"
	"log"
	"net/http"
	"strings"
	"time"
)
// Server is the HTTP surface of the admin panel: a small REST API under /api and
// the embedded SPA on every other path. It is intentionally unauthenticated at
// this layer — the deployment fronts it with Caddy basic-auth and the gateway
// itself binds to loopback, so the network boundary is the auth boundary. The
// gateway's privileged identity never leaves this process.
type Server struct {
// repo is the backend every /api handler delegates to.
repo Repo
// spa serves the embedded single-page app; it is mounted as the "/" catch-all.
spa http.Handler
// mux routes incoming requests; ServeHTTP forwards to it.
mux *http.ServeMux
}
// NewServer builds a Server backed by repo and registers every route on a fresh
// mux. spaFiles must be rooted at the SPA's dist directory, i.e. index.html and
// assets/ sit at the top level of the filesystem.
func NewServer(repo Repo, spaFiles fs.FS) *Server {
	srv := new(Server)
	srv.repo = repo
	srv.spa = spaHandler(spaFiles)
	srv.mux = http.NewServeMux()

	// Routes are registered last: they reference both repo-backed handlers and
	// the SPA handler assigned above.
	srv.routes()
	return srv
}
// ServeHTTP implements http.Handler by handing the request to the server's mux.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.mux.ServeHTTP(w, r)
}
// routes registers the liveness probe, the REST API and the SPA catch-all on
// the server's mux.
func (s *Server) routes() {
	// Liveness of the admin gateway itself (for systemd / deploy smoke checks).
	// This is not the bus nodes' /healthz, which is surfaced under /api/cluster.
	healthz := func(w http.ResponseWriter, _ *http.Request) {
		writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
	}

	endpoints := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"GET /healthz", healthz},
		{"GET /api/me", s.handleMe},
		{"GET /api/cluster", s.handleCluster},
		{"GET /api/rooms", s.handleListRooms},
		{"POST /api/rooms", s.handleCreateRoom},
		{"GET /api/rooms/{id}/members", s.handleListMembers},
		{"POST /api/rooms/{id}/invite", s.handleInvite},
		{"POST /api/rooms/{id}/kick", s.handleKick},
		{"GET /api/users", s.handleListUsers},
		{"POST /api/users", s.handleAddUser},
		{"POST /api/users/revoke", s.handleRevokeUser},
	}
	for _, ep := range endpoints {
		s.mux.HandleFunc(ep.pattern, ep.handler)
	}

	// Anything not matched above belongs to the SPA (and its assets).
	s.mux.Handle("/", s.spa)
}
// ---- handlers -------------------------------------------------------------
// handleMe answers GET /api/me with whatever the repo reports from Me.
func (s *Server) handleMe(w http.ResponseWriter, r *http.Request) {
	me := s.repo.Me(r.Context())
	writeJSON(w, http.StatusOK, me)
}
// handleCluster answers GET /api/cluster with the repo's cluster report. The
// repo call runs under the withTimeout deadline rather than the bare request
// context.
func (s *Server) handleCluster(w http.ResponseWriter, r *http.Request) {
	bounded, release := withTimeout(r)
	defer release()

	report := s.repo.Cluster(bounded)
	writeJSON(w, http.StatusOK, report)
}
// handleListRooms answers GET /api/rooms with the repo's room list, or with a
// 502 carrying the repo's error text when the lookup fails.
func (s *Server) handleListRooms(w http.ResponseWriter, r *http.Request) {
	switch list, err := s.repo.ListRooms(r.Context()); {
	case err != nil:
		writeErr(w, http.StatusBadGateway, err.Error())
	default:
		writeJSON(w, http.StatusOK, list)
	}
}
// handleCreateRoom answers POST /api/rooms. It decodes a CreateRoomReq, rejects
// a blank subject with 400, and otherwise asks the repo to create the room:
// 201 with the repo's result on success, 502 with the repo's error text on
// failure.
func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
	var req CreateRoomReq
	if !decode(w, r, &req) {
		return // decode has already written the error response
	}

	// Normalize before validating so the value handed to the repo is the same
	// one that passed the check; previously " foo " was validated trimmed but
	// forwarded with its surrounding whitespace.
	req.Subject = strings.TrimSpace(req.Subject)
	if req.Subject == "" {
		writeErr(w, http.StatusBadRequest, "subject required")
		return
	}

	rv, err := s.repo.CreateRoom(r.Context(), req)
	if err != nil {
		writeErr(w, http.StatusBadGateway, err.Error())
		return
	}
	writeJSON(w, http.StatusCreated, rv)
}
// handleListMembers answers GET /api/rooms/{id}/members with the repo's member
// list for the room named by the {id} path segment, or with a 502 carrying the
// repo's error text.
func (s *Server) handleListMembers(w http.ResponseWriter, r *http.Request) {
	roomID := r.PathValue("id")

	roster, err := s.repo.ListMembers(r.Context(), roomID)
	if err != nil {
		writeErr(w, http.StatusBadGateway, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, roster)
}
// handleInvite answers POST /api/rooms/{id}/invite. The decoded InviteReq is
// passed to the repo together with the {id} path segment; success yields
// {"status":"invited"}, a repo failure yields a 502 with the error text.
func (s *Server) handleInvite(w http.ResponseWriter, r *http.Request) {
	var body InviteReq
	if ok := decode(w, r, &body); !ok {
		return // decode has already written the error response
	}

	roomID := r.PathValue("id")
	err := s.repo.Invite(r.Context(), roomID, body)
	if err != nil {
		writeErr(w, http.StatusBadGateway, err.Error())
		return
	}

	writeJSON(w, http.StatusOK, map[string]string{"status": "invited"})
}
// handleKick answers POST /api/rooms/{id}/kick. The JSON body must carry a
// non-blank "endpoint"; that endpoint is forwarded to the repo's KickMember
// for the room named by {id}. Success yields {"status":"rekeyed"}, a repo
// failure yields a 502 with the error text.
func (s *Server) handleKick(w http.ResponseWriter, r *http.Request) {
	var body struct {
		Endpoint string `json:"endpoint"`
	}
	if ok := decode(w, r, &body); !ok {
		return // decode has already written the error response
	}

	if blank := strings.TrimSpace(body.Endpoint) == ""; blank {
		writeErr(w, http.StatusBadRequest, "endpoint required")
		return
	}

	roomID := r.PathValue("id")
	err := s.repo.KickMember(r.Context(), roomID, body.Endpoint)
	if err != nil {
		writeErr(w, http.StatusBadGateway, err.Error())
		return
	}

	writeJSON(w, http.StatusOK, map[string]string{"status": "rekeyed"})
}
// handleListUsers answers GET /api/users with the repo's user list, or with a
// 502 carrying the repo's error text when the lookup fails.
func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) {
	switch list, err := s.repo.ListUsers(r.Context()); {
	case err != nil:
		writeErr(w, http.StatusBadGateway, err.Error())
	default:
		writeJSON(w, http.StatusOK, list)
	}
}
// handleAddUser answers POST /api/users. It decodes an AddUserReq, requires
// both sign_pub and handle to be non-blank (400 otherwise), and asks the repo
// to add the user: 201 {"status":"added"} on success, 502 with the repo's
// error text on failure.
func (s *Server) handleAddUser(w http.ResponseWriter, r *http.Request) {
	var req AddUserReq
	if !decode(w, r, &req) {
		return // decode has already written the error response
	}

	// Normalize before validating so the repo receives exactly the values that
	// passed the check; previously pasted keys or handles with stray
	// surrounding whitespace were validated trimmed but stored untrimmed.
	req.SignPub = strings.TrimSpace(req.SignPub)
	req.Handle = strings.TrimSpace(req.Handle)
	if req.SignPub == "" || req.Handle == "" {
		writeErr(w, http.StatusBadRequest, "sign_pub and handle required")
		return
	}

	if err := s.repo.AddUser(r.Context(), req); err != nil {
		writeErr(w, http.StatusBadGateway, err.Error())
		return
	}
	writeJSON(w, http.StatusCreated, map[string]string{"status": "added"})
}
// handleRevokeUser answers POST /api/users/revoke. The JSON body must carry a
// non-blank "sign_pub", which is forwarded to the repo's RevokeUser. Success
// yields {"status":"revoked"}, a repo failure yields a 502 with the error text.
func (s *Server) handleRevokeUser(w http.ResponseWriter, r *http.Request) {
	var body struct {
		SignPub string `json:"sign_pub"`
	}
	if ok := decode(w, r, &body); !ok {
		return // decode has already written the error response
	}

	if blank := strings.TrimSpace(body.SignPub) == ""; blank {
		writeErr(w, http.StatusBadRequest, "sign_pub required")
		return
	}

	err := s.repo.RevokeUser(r.Context(), body.SignPub)
	if err != nil {
		writeErr(w, http.StatusBadGateway, err.Error())
		return
	}

	writeJSON(w, http.StatusOK, map[string]string{"status": "revoked"})
}
// ---- SPA serving ----------------------------------------------------------
// spaHandler serves the embedded SPA from files. A GET/HEAD request for an
// existing asset is served directly; any other GET/HEAD path (a client-side
// route) falls back to index.html so the SPA router can take over.
//
// The handler is mounted as the mux's "/" catch-all, so it also receives every
// request that no API pattern matched: unknown /api paths and registered paths
// hit with the wrong method. Those are answered here instead of being given
// index.html with a 200:
//   - "/api" and anything under "/api/" gets a JSON 404 ({"error":"not found"});
//   - any method other than GET or HEAD gets a 405 with an Allow header.
func spaHandler(files fs.FS) http.Handler {
	fileServer := http.FileServer(http.FS(files))
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// An API client must never mistake the SPA shell for a successful API
		// response, so unmatched API paths are a hard 404 in the API's own
		// error shape.
		if r.URL.Path == "/api" || strings.HasPrefix(r.URL.Path, "/api/") {
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusNotFound)
			// Best effort: the status is already on the wire, nothing to do on
			// a failed write.
			_, _ = w.Write([]byte(`{"error":"not found"}` + "\n"))
			return
		}

		// Static content is read-only; http.FileServer itself does not check
		// the method, so do it here.
		if r.Method != http.MethodGet && r.Method != http.MethodHead {
			w.Header().Set("Allow", "GET, HEAD")
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}

		p := strings.TrimPrefix(r.URL.Path, "/")
		if p == "" {
			p = "index.html"
		}
		if f, err := files.Open(p); err == nil {
			_ = f.Close() // existence probe only; the file server reopens it
			fileServer.ServeHTTP(w, r)
			return
		}

		// Unknown path: serve index.html for SPA client-side routing. The
		// request is cloned so the caller's URL is left untouched.
		r2 := r.Clone(r.Context())
		r2.URL.Path = "/"
		fileServer.ServeHTTP(w, r2)
	})
}
// ---- helpers --------------------------------------------------------------
// withTimeout derives a context from the request's own context with a fixed
// six-second deadline, so a request-scoped operation (e.g. probing every
// cluster node) cannot hang the handler on a slow or dead node. The caller must
// invoke the returned cancel function.
func withTimeout(r *http.Request) (context.Context, context.CancelFunc) {
	const budget = 6 * time.Second

	parent := r.Context()
	return context.WithTimeout(parent, budget)
}
// writeJSON sends v as a JSON response with the given status code and an
// application/json content type.
//
// The value is marshalled before anything is written, so a value that cannot
// be encoded produces a logged 500 with {"error":"internal error"} instead of
// the requested status with an empty body. On success the bytes written are
// the JSON encoding of v followed by a newline.
func writeJSON(w http.ResponseWriter, code int, v any) {
	payload, err := json.Marshal(v)
	if err != nil {
		log.Printf("[admin] encode response: %v", err)
		code = http.StatusInternalServerError
		payload = []byte(`{"error":"internal error"}`)
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	// The trailing newline matches what json.Encoder.Encode emits. A failed
	// write means the client went away after the header was sent; there is
	// nothing useful left to do with that error.
	_, _ = w.Write(append(payload, '\n'))
}
// writeErr sends msg to the client as a JSON object of the form
// {"error": msg} with the given status code.
func writeErr(w http.ResponseWriter, code int, msg string) {
	body := map[string]string{"error": msg}
	writeJSON(w, code, body)
}
// decode reads a JSON request body (capped at 1 MiB) into v. On failure it
// logs the error, writes the error response itself and returns false, so the
// caller only has to return:
//   - 413 when the body exceeds the size cap;
//   - 400 with "bad json: <detail>" for any other read or syntax error.
//
// It returns true when v was populated.
func decode(w http.ResponseWriter, r *http.Request, v any) bool {
	const maxBody = 1 << 20 // 1 MiB

	defer r.Body.Close()

	limited := http.MaxBytesReader(w, r.Body, maxBody)
	err := json.NewDecoder(limited).Decode(v)
	if err == nil {
		return true
	}
	log.Printf("[admin] decode body: %v", err)

	// An oversized body is not malformed JSON; report it as such so the client
	// can tell the two apart.
	var tooBig *http.MaxBytesError
	if errors.As(err, &tooBig) {
		writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
		return false
	}

	writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
	return false
}