package admin

import (
	"context"
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
	"time"

	cs "fn-registry/functions/cybersecurity"
	"github.com/enmanuel/unibus/pkg/busauth"
	"github.com/enmanuel/unibus/pkg/client"
	"github.com/enmanuel/unibus/pkg/frame"
	"github.com/enmanuel/unibus/pkg/membership"
	"github.com/enmanuel/unibus/pkg/room"
)

// NodeTarget is one cluster node the gateway probes for /healthz.
type NodeTarget struct {
	// Name is the label reported back alongside the probe result.
	Name string
	// URL is the node's base URL; "/healthz" is appended when probing.
	URL string // e.g. https://magnus.internal:8470
}

// busRepo is the live gateway: it owns the operator's admin identity, a connected
// unibus client (for crypto-bearing room operations), a CA-pinned HTTP client for
// signed control-plane GETs and node health probes, and — when available — a
// direct membership store for the user allowlist.
type busRepo struct {
	// id is the admin identity; its signing key pair authenticates signed GETs.
	id cs.Identity
	// endpoint is the endpoint ID derived from id.SignPub via frame.EndpointID.
	endpoint string
	ctrlURLs []string     // control-plane bases, tried in order (failover)
	httpc    *http.Client // CA-pinned (or plain) client for signed GETs + healthz
	cli      *client.Client
	// nodes are the targets probed, in order, by the cluster health view.
	nodes []NodeTarget
	// store is an OPTIONAL direct membership store for single-node user
	// management. When nil (the cluster default), user operations go through the
	// signed control-plane API on r.cli instead — see ListUsers/AddUser/RevokeUser.
	store        membership.Store
	storeBackend string // "control-plane" (cli) | "sqlite" (direct store fallback)
	// joinBaseURL is the base URL of the end-user client that hosts /join?token=…
	// (NOT the admin panel). The gateway builds the shareable join link from it so
	// the SPA never has to know where the client lives. Empty when unconfigured.
	joinBaseURL string
}

// BusConfig wires a live gateway.
type BusConfig struct { Identity cs.Identity NatsURL string CtrlURL string // primary control-plane base CtrlURLs []string // additional control-plane bases (cluster failover) NatsURLs []string // additional NATS seeds (cluster failover) CAPath string // bus CA; empty => plaintext dev connection Nodes []NodeTarget // nodes to probe for /healthz Store membership.Store StoreBackend string // JoinBaseURL is the end-user client base URL used to build invite join links. JoinBaseURL string } // NewBusRepo connects the unibus client with the admin identity and builds the // CA-pinned HTTP client used for signed GETs and node health probes. The client // connection follows the same posture seam every peer uses (client.Connect): a // non-empty CA path means TLS + nkey, empty means plaintext dev. func NewBusRepo(cfg BusConfig) (*busRepo, error) { opts := client.Options{ CtrlURLs: cfg.CtrlURLs, NatsServers: cfg.NatsURLs, } if cfg.CAPath != "" { tlsCfg, err := busauth.LoadCATLSConfig(cfg.CAPath) if err != nil { return nil, fmt.Errorf("admin: load bus CA %q: %w", cfg.CAPath, err) } opts.UseNkey = true opts.TLS = tlsCfg opts.CtrlTLS = tlsCfg } cli, err := client.NewWithOptions(cfg.NatsURL, cfg.CtrlURL, cfg.Identity, opts) if err != nil { return nil, fmt.Errorf("admin: connect bus client: %w", err) } httpc := &http.Client{Timeout: 8 * time.Second} if cfg.CAPath != "" { tlsCfg, err := busauth.LoadCATLSConfig(cfg.CAPath) if err != nil { return nil, fmt.Errorf("admin: load bus CA for http %q: %w", cfg.CAPath, err) } httpc.Transport = &http.Transport{TLSClientConfig: tlsCfg} } ctrlURLs := append([]string{cfg.CtrlURL}, cfg.CtrlURLs...) // With no direct store, user management rides the signed control-plane API // (works in cluster). A direct store is an explicit single-node fallback. 
backend := "control-plane" if cfg.Store != nil { backend = cfg.StoreBackend } return &busRepo{ id: cfg.Identity, endpoint: frame.EndpointID(cfg.Identity.SignPub), ctrlURLs: ctrlURLs, httpc: httpc, cli: cli, nodes: cfg.Nodes, store: cfg.Store, storeBackend: backend, joinBaseURL: strings.TrimRight(cfg.JoinBaseURL, "/"), }, nil } // joinURL builds the shareable registration link for a token from the configured // client base URL. It returns "" when no base URL is configured, so the SPA can // fall back to its own origin (and warn that the link should be configured). func (r *busRepo) joinURL(token string) string { if r.joinBaseURL == "" { return "" } return r.joinBaseURL + "/join?token=" + token } // Close releases the bus client connection. func (r *busRepo) Close() error { if r.cli != nil { return r.cli.Close() } return nil } func (r *busRepo) Me(context.Context) MeInfo { return MeInfo{ Endpoint: r.endpoint, SignPub: hex.EncodeToString(r.id.SignPub), UsersBackend: r.storeBackend, Mock: false, JoinBaseURL: r.joinBaseURL, } } // ---- cluster health ------------------------------------------------------- // healthzResp is the shape membershipd's GET /healthz returns. type healthzResp struct { Status string `json:"status"` Posture struct { Enforce bool `json:"enforce"` ACL bool `json:"acl"` TLS bool `json:"tls"` Cluster bool `json:"cluster"` Store string `json:"store"` } `json:"posture"` } func (r *busRepo) Cluster(ctx context.Context) []NodeHealth { out := make([]NodeHealth, 0, len(r.nodes)) for _, n := range r.nodes { out = append(out, r.probeNode(ctx, n)) } return out } // probeNode does an unauthenticated GET /healthz (the one auth-exempt route) and // maps the response to NodeHealth. Any transport or decode failure is reported // as down with the error, never panicking the whole cluster view. 
func (r *busRepo) probeNode(ctx context.Context, n NodeTarget) NodeHealth { nh := NodeHealth{Name: n.Name, URL: n.URL} url := strings.TrimRight(n.URL, "/") + "/healthz" req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { nh.Error = err.Error() return nh } start := time.Now() resp, err := r.httpc.Do(req) nh.LatencyMs = time.Since(start).Milliseconds() if err != nil { nh.Error = err.Error() return nh } defer resp.Body.Close() body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<16)) if resp.StatusCode != http.StatusOK { nh.Error = fmt.Sprintf("HTTP %d", resp.StatusCode) return nh } var hr healthzResp if err := json.Unmarshal(body, &hr); err != nil { nh.Error = "bad healthz json: " + err.Error() return nh } nh.Up = hr.Status == "ok" nh.Posture = Posture{ Enforce: hr.Posture.Enforce, ACL: hr.Posture.ACL, TLS: hr.Posture.TLS, Cluster: hr.Posture.Cluster, Store: hr.Posture.Store, } return nh } // ---- rooms ---------------------------------------------------------------- func (r *busRepo) ListRooms(context.Context) ([]RoomView, error) { rooms, err := r.cli.ListMyRooms() if err != nil { return nil, err } out := make([]RoomView, 0, len(rooms)) for _, rm := range rooms { out = append(out, RoomView{ RoomID: rm.RoomID, Subject: rm.Subject, Epoch: rm.Epoch, Encrypt: rm.Policy.Encrypt, Persist: rm.Policy.Persist, SignMsgs: rm.Policy.SignMsgs, Role: rm.Role, }) } return out, nil } func (r *busRepo) CreateRoom(_ context.Context, req CreateRoomReq) (RoomView, error) { p := room.Policy{Encrypt: req.Encrypt, Persist: req.Persist, SignMsgs: req.SignMsgs} roomID, err := r.cli.CreateRoom(req.Subject, p) if err != nil { return RoomView{}, err } // Under a per-subject ACL the admin's frozen NATS permissions do not yet cover // the new room's subject; refresh the session so subsequent data-plane use of // this room works. On a non-ACL bus this is a harmless reconnect. 
_ = r.cli.RefreshSession() return RoomView{ RoomID: roomID, Subject: req.Subject, Epoch: 1, Encrypt: req.Encrypt, Persist: req.Persist, SignMsgs: req.SignMsgs, Role: "owner", }, nil } func (r *busRepo) Invite(_ context.Context, roomID string, req InviteReq) error { signPub, err := hex.DecodeString(strings.TrimSpace(req.SignPub)) if err != nil || len(signPub) != 32 { return fmt.Errorf("admin: invite: sign_pub must be 32-byte hex") } kexPub, err := hex.DecodeString(strings.TrimSpace(req.KexPub)) if err != nil || len(kexPub) != 32 { return fmt.Errorf("admin: invite: kex_pub must be 32-byte hex") } endpoint := strings.TrimSpace(req.Endpoint) if endpoint == "" { endpoint = frame.EndpointID(signPub) } return r.cli.Invite(roomID, client.Endpoint{ID: endpoint, SignPub: signPub, KexPub: kexPub}) } func (r *busRepo) KickMember(_ context.Context, roomID, endpoint string) error { return r.cli.Kick(roomID, endpoint) } // ListMembers performs a signed GET /rooms/{id}/members. The unibus client does // not export a member listing, so the gateway builds the request with the // canonical signing construction the bus owns (membership.CanonicalRequest + // cs.SignEd25519) — reusing the bus's single source of truth for the byte layout // rather than reimplementing signing. The admin must be a member of the room // (it is, for rooms it owns) or the control plane answers 403. 
func (r *busRepo) ListMembers(_ context.Context, roomID string) ([]MemberView, error) { path := "/rooms/" + roomID + "/members" body, err := r.signedGET(path) if err != nil { return nil, err } var wire []struct { Endpoint string `json:"endpoint"` Role string `json:"role"` SignPub []byte `json:"sign_pub"` KexPub []byte `json:"kex_pub"` } if err := json.Unmarshal(body, &wire); err != nil { return nil, fmt.Errorf("admin: decode members: %w", err) } out := make([]MemberView, 0, len(wire)) for _, m := range wire { out = append(out, MemberView{ Endpoint: m.Endpoint, Role: m.Role, SignPub: hex.EncodeToString(m.SignPub), KexPub: hex.EncodeToString(m.KexPub), }) } return out, nil } // signedGET issues a transport-authenticated GET against each control-plane base // in turn (failover), signing the canonical request bytes with the admin's // Ed25519 key under the same X-Unibus-* header scheme the bus client uses. func (r *busRepo) signedGET(path string) ([]byte, error) { var lastErr error for _, base := range r.ctrlURLs { req, err := http.NewRequest(http.MethodGet, strings.TrimRight(base, "/")+path, nil) if err != nil { return nil, err } ts := strconv.FormatInt(time.Now().Unix(), 10) nonceRaw := make([]byte, 16) if _, err := rand.Read(nonceRaw); err != nil { return nil, fmt.Errorf("admin: nonce: %w", err) } nonce := base64.StdEncoding.EncodeToString(nonceRaw) canonical := membership.CanonicalRequest(http.MethodGet, path, ts, nonce, nil) sig := cs.SignEd25519(r.id.SignPriv, canonical) req.Header.Set("X-Unibus-Pub", hex.EncodeToString(r.id.SignPub)) req.Header.Set("X-Unibus-Ts", ts) req.Header.Set("X-Unibus-Nonce", nonce) req.Header.Set("X-Unibus-Sig", base64.StdEncoding.EncodeToString(sig)) resp, err := r.httpc.Do(req) if err != nil { lastErr = err continue // dead node: try the next control plane } defer resp.Body.Close() body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) if resp.StatusCode >= 300 { var er struct { Error string `json:"error"` } if json.Unmarshal(body, 
&er) == nil && er.Error != "" { return nil, fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode) } return nil, fmt.Errorf("admin: GET %s -> %d", path, resp.StatusCode) } return body, nil } return nil, fmt.Errorf("admin: GET %s: all control planes failed: %w", path, lastErr) } // ---- users ---------------------------------------------------------------- // // User management has two backends. The cluster default has no direct store // (r.store == nil): every operation goes through the unibus client's admin-only // HTTP endpoints (GET/POST /users, POST /users/{signpub}/revoke), each request // signed as the operator's admin identity and verified by the bus's requireAdmin // against the same store the room handlers use — so it works in cluster without // KV/SQLite access. A single-node deployment may instead pass --db to manage the // SQLite store directly; that path is kept as an explicit fallback. // UsersWritable reports whether the Users tab can mutate the allowlist. The live // gateway always can: either it holds a direct store, or it signs as an admin // against the control plane. (A non-admin signer is rejected at request time by // the bus with 403; that is an authorization outcome, not a missing capability.) 
func (r *busRepo) UsersWritable() bool { return true } func (r *busRepo) ListUsers(context.Context) ([]UserView, error) { if r.store == nil { users, err := r.cli.ListUsers() if err != nil { return nil, err } out := make([]UserView, 0, len(users)) for _, u := range users { out = append(out, UserView(u)) } return out, nil } users, err := r.store.ListUsers() if err != nil { return nil, err } out := make([]UserView, 0, len(users)) for _, u := range users { out = append(out, UserView{ SignPub: u.SignPub, Handle: u.Handle, Role: u.Role, Status: u.Status, CreatedAt: u.CreatedAt, RevokedAt: u.RevokedAt, }) } return out, nil } func (r *busRepo) AddUser(_ context.Context, req AddUserReq) error { if r.store == nil { return r.cli.AddUser(req.SignPub, req.Handle, req.Role) } return r.store.AddUser(req.SignPub, req.Handle, req.Role) } func (r *busRepo) RevokeUser(_ context.Context, signPub string) error { if r.store == nil { return r.cli.RevokeUser(signPub) } return r.store.RevokeUser(signPub) } // DeleteUser hard-deletes a user (purge), distinct from RevokeUser. Like the // other user ops it goes through the signed control plane in cluster, or the // direct store in the single-node fallback. func (r *busRepo) DeleteUser(_ context.Context, signPub string) error { if r.store == nil { return r.cli.DeleteUser(signPub) } return r.store.DeleteUser(signPub) } // ---- invites -------------------------------------------------------------- // CreateInvite mints a single-use registration invite and returns it with the // shareable join link pre-built. Cluster path goes through the signed control // plane; the single-node fallback hits the store directly. 
func (r *busRepo) CreateInvite(_ context.Context, req CreateInviteReq) (InviteView, error) { if r.store == nil { inv, err := r.cli.CreateInvite(req.Handle, req.Role, req.TTLSecs) if err != nil { return InviteView{}, err } return InviteView{ Token: inv.Token, Handle: inv.Handle, Role: inv.Role, ExpiresAt: inv.ExpiresAt, JoinURL: r.joinURL(inv.Token), }, nil } inv, err := r.store.CreateInvite(req.Handle, req.Role, req.TTLSecs) if err != nil { return InviteView{}, err } return InviteView{ Token: inv.Token, Handle: inv.Handle, Role: inv.Role, ExpiresAt: inv.ExpiresAt, CreatedAt: inv.CreatedAt, JoinURL: r.joinURL(inv.Token), }, nil } // ListInvites returns the PENDING invites (not used, not expired) with their join // links. The control-plane GET /invites already filters to pending; the direct // store returns everything, so we filter here for parity. func (r *busRepo) ListInvites(_ context.Context) ([]InviteView, error) { if r.store == nil { invs, err := r.cli.ListInvites() if err != nil { return nil, err } out := make([]InviteView, 0, len(invs)) for _, inv := range invs { out = append(out, InviteView{ Token: inv.Token, Handle: inv.Handle, Role: inv.Role, ExpiresAt: inv.ExpiresAt, Used: inv.Used, CreatedAt: inv.CreatedAt, JoinURL: r.joinURL(inv.Token), }) } return out, nil } invs, err := r.store.ListInvites() if err != nil { return nil, err } out := make([]InviteView, 0, len(invs)) for _, inv := range invs { if !invitePending(inv.ExpiresAt, inv.Used) { continue } out = append(out, InviteView{ Token: inv.Token, Handle: inv.Handle, Role: inv.Role, ExpiresAt: inv.ExpiresAt, Used: inv.Used, CreatedAt: inv.CreatedAt, JoinURL: r.joinURL(inv.Token), }) } return out, nil } // invitePending reports whether an invite is live (not used, not past its // deadline). A malformed deadline is treated as expired (fail closed). 
func invitePending(expiresAt string, used bool) bool {
	// A consumed invite is never pending, whatever its deadline says.
	if used {
		return false
	}
	// The deadline is an RFC 3339 timestamp; one that fails to parse counts as
	// already expired. Pending means the deadline is strictly in the future.
	deadline, parseErr := time.Parse(time.RFC3339Nano, expiresAt)
	return parseErr == nil && deadline.After(time.Now().UTC())
}