f65271dc92
Wire the bus's new account surface into the admin gateway:
- POST /api/invites, GET /api/invites: mint and list single-use registration
invites (CreateInvite/ListInvites on the Repo). The gateway pre-builds the
shareable join link (JoinURL) from a configurable end-user client base URL so
the SPA does not need to know where the client lives.
- DELETE /api/users/{pub}: hard-delete (purge) a user, distinct from the existing
revoke.
- Both backends covered: signed control-plane (cluster default) via the unibus
client's CreateInvite/ListInvites/DeleteUser, and the direct membership store
(single-node --db fallback). For the direct store, ListInvites filters to
pending (the control plane already does so server-side).
- New --join-base-url flag / UNIBUS_JOIN_BASE_URL env feeds the join link base
URL (the END-USER client, NOT the panel's own URL); surfaced on /api/me.
- Mock repo gains the same methods for UI iteration.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
512 lines
16 KiB
Go
512 lines
16 KiB
Go
package admin
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
cs "fn-registry/functions/cybersecurity"
|
|
|
|
"github.com/enmanuel/unibus/pkg/busauth"
|
|
"github.com/enmanuel/unibus/pkg/client"
|
|
"github.com/enmanuel/unibus/pkg/frame"
|
|
"github.com/enmanuel/unibus/pkg/membership"
|
|
"github.com/enmanuel/unibus/pkg/room"
|
|
)
|
|
|
|
// NodeTarget describes a single cluster node whose /healthz endpoint the
// gateway probes.
type NodeTarget struct {
	// Name is the label carried through to the node's health result.
	Name string
	// URL is the node's base URL, e.g. https://magnus.internal:8470.
	URL string
}
|
|
|
|
// busRepo is the live gateway: it owns the operator's admin identity, a connected
|
|
// unibus client (for crypto-bearing room operations), a CA-pinned HTTP client for
|
|
// signed control-plane GETs and node health probes, and — when available — a
|
|
// direct membership store for the user allowlist.
|
|
type busRepo struct {
|
|
id cs.Identity
|
|
endpoint string
|
|
ctrlURLs []string // control-plane bases, tried in order (failover)
|
|
httpc *http.Client // CA-pinned (or plain) client for signed GETs + healthz
|
|
cli *client.Client
|
|
nodes []NodeTarget
|
|
|
|
// store is an OPTIONAL direct membership store for single-node user
|
|
// management. When nil (the cluster default), user operations go through the
|
|
// signed control-plane API on r.cli instead — see ListUsers/AddUser/RevokeUser.
|
|
store membership.Store
|
|
storeBackend string // "control-plane" (cli) | "sqlite" (direct store fallback)
|
|
|
|
// joinBaseURL is the base URL of the end-user client that hosts /join?token=…
|
|
// (NOT the admin panel). The gateway builds the shareable join link from it so
|
|
// the SPA never has to know where the client lives. Empty when unconfigured.
|
|
joinBaseURL string
|
|
}
|
|
|
|
// BusConfig wires a live gateway.
|
|
type BusConfig struct {
|
|
Identity cs.Identity
|
|
NatsURL string
|
|
CtrlURL string // primary control-plane base
|
|
CtrlURLs []string // additional control-plane bases (cluster failover)
|
|
NatsURLs []string // additional NATS seeds (cluster failover)
|
|
CAPath string // bus CA; empty => plaintext dev connection
|
|
Nodes []NodeTarget // nodes to probe for /healthz
|
|
Store membership.Store
|
|
StoreBackend string
|
|
// JoinBaseURL is the end-user client base URL used to build invite join links.
|
|
JoinBaseURL string
|
|
}
|
|
|
|
// NewBusRepo connects the unibus client with the admin identity and builds the
|
|
// CA-pinned HTTP client used for signed GETs and node health probes. The client
|
|
// connection follows the same posture seam every peer uses (client.Connect): a
|
|
// non-empty CA path means TLS + nkey, empty means plaintext dev.
|
|
func NewBusRepo(cfg BusConfig) (*busRepo, error) {
|
|
opts := client.Options{
|
|
CtrlURLs: cfg.CtrlURLs,
|
|
NatsServers: cfg.NatsURLs,
|
|
}
|
|
if cfg.CAPath != "" {
|
|
tlsCfg, err := busauth.LoadCATLSConfig(cfg.CAPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("admin: load bus CA %q: %w", cfg.CAPath, err)
|
|
}
|
|
opts.UseNkey = true
|
|
opts.TLS = tlsCfg
|
|
opts.CtrlTLS = tlsCfg
|
|
}
|
|
cli, err := client.NewWithOptions(cfg.NatsURL, cfg.CtrlURL, cfg.Identity, opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("admin: connect bus client: %w", err)
|
|
}
|
|
|
|
httpc := &http.Client{Timeout: 8 * time.Second}
|
|
if cfg.CAPath != "" {
|
|
tlsCfg, err := busauth.LoadCATLSConfig(cfg.CAPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("admin: load bus CA for http %q: %w", cfg.CAPath, err)
|
|
}
|
|
httpc.Transport = &http.Transport{TLSClientConfig: tlsCfg}
|
|
}
|
|
|
|
ctrlURLs := append([]string{cfg.CtrlURL}, cfg.CtrlURLs...)
|
|
// With no direct store, user management rides the signed control-plane API
|
|
// (works in cluster). A direct store is an explicit single-node fallback.
|
|
backend := "control-plane"
|
|
if cfg.Store != nil {
|
|
backend = cfg.StoreBackend
|
|
}
|
|
return &busRepo{
|
|
id: cfg.Identity,
|
|
endpoint: frame.EndpointID(cfg.Identity.SignPub),
|
|
ctrlURLs: ctrlURLs,
|
|
httpc: httpc,
|
|
cli: cli,
|
|
nodes: cfg.Nodes,
|
|
store: cfg.Store,
|
|
storeBackend: backend,
|
|
joinBaseURL: strings.TrimRight(cfg.JoinBaseURL, "/"),
|
|
}, nil
|
|
}
|
|
|
|
// joinURL builds the shareable registration link for a token from the configured
|
|
// client base URL. It returns "" when no base URL is configured, so the SPA can
|
|
// fall back to its own origin (and warn that the link should be configured).
|
|
func (r *busRepo) joinURL(token string) string {
|
|
if r.joinBaseURL == "" {
|
|
return ""
|
|
}
|
|
return r.joinBaseURL + "/join?token=" + token
|
|
}
|
|
|
|
// Close releases the bus client connection.
|
|
func (r *busRepo) Close() error {
|
|
if r.cli != nil {
|
|
return r.cli.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *busRepo) Me(context.Context) MeInfo {
|
|
return MeInfo{
|
|
Endpoint: r.endpoint,
|
|
SignPub: hex.EncodeToString(r.id.SignPub),
|
|
UsersBackend: r.storeBackend,
|
|
Mock: false,
|
|
JoinBaseURL: r.joinBaseURL,
|
|
}
|
|
}
|
|
|
|
// ---- cluster health -------------------------------------------------------
|
|
|
|
// healthzResp mirrors the JSON body membershipd returns from GET /healthz.
type healthzResp struct {
	// Status is the node's self-reported state; probeNode treats "ok" as up.
	Status string `json:"status"`
	// Posture holds the node's reported posture flags and store name.
	Posture struct {
		Enforce bool   `json:"enforce"`
		ACL     bool   `json:"acl"`
		TLS     bool   `json:"tls"`
		Cluster bool   `json:"cluster"`
		Store   string `json:"store"`
	} `json:"posture"`
}
|
|
|
|
func (r *busRepo) Cluster(ctx context.Context) []NodeHealth {
|
|
out := make([]NodeHealth, 0, len(r.nodes))
|
|
for _, n := range r.nodes {
|
|
out = append(out, r.probeNode(ctx, n))
|
|
}
|
|
return out
|
|
}
|
|
|
|
// probeNode does an unauthenticated GET /healthz (the one auth-exempt route) and
|
|
// maps the response to NodeHealth. Any transport or decode failure is reported
|
|
// as down with the error, never panicking the whole cluster view.
|
|
func (r *busRepo) probeNode(ctx context.Context, n NodeTarget) NodeHealth {
|
|
nh := NodeHealth{Name: n.Name, URL: n.URL}
|
|
url := strings.TrimRight(n.URL, "/") + "/healthz"
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
if err != nil {
|
|
nh.Error = err.Error()
|
|
return nh
|
|
}
|
|
start := time.Now()
|
|
resp, err := r.httpc.Do(req)
|
|
nh.LatencyMs = time.Since(start).Milliseconds()
|
|
if err != nil {
|
|
nh.Error = err.Error()
|
|
return nh
|
|
}
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<16))
|
|
if resp.StatusCode != http.StatusOK {
|
|
nh.Error = fmt.Sprintf("HTTP %d", resp.StatusCode)
|
|
return nh
|
|
}
|
|
var hr healthzResp
|
|
if err := json.Unmarshal(body, &hr); err != nil {
|
|
nh.Error = "bad healthz json: " + err.Error()
|
|
return nh
|
|
}
|
|
nh.Up = hr.Status == "ok"
|
|
nh.Posture = Posture{
|
|
Enforce: hr.Posture.Enforce,
|
|
ACL: hr.Posture.ACL,
|
|
TLS: hr.Posture.TLS,
|
|
Cluster: hr.Posture.Cluster,
|
|
Store: hr.Posture.Store,
|
|
}
|
|
return nh
|
|
}
|
|
|
|
// ---- rooms ----------------------------------------------------------------
|
|
|
|
func (r *busRepo) ListRooms(context.Context) ([]RoomView, error) {
|
|
rooms, err := r.cli.ListMyRooms()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]RoomView, 0, len(rooms))
|
|
for _, rm := range rooms {
|
|
out = append(out, RoomView{
|
|
RoomID: rm.RoomID,
|
|
Subject: rm.Subject,
|
|
Epoch: rm.Epoch,
|
|
Encrypt: rm.Policy.Encrypt,
|
|
Persist: rm.Policy.Persist,
|
|
SignMsgs: rm.Policy.SignMsgs,
|
|
Role: rm.Role,
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (r *busRepo) CreateRoom(_ context.Context, req CreateRoomReq) (RoomView, error) {
|
|
p := room.Policy{Encrypt: req.Encrypt, Persist: req.Persist, SignMsgs: req.SignMsgs}
|
|
roomID, err := r.cli.CreateRoom(req.Subject, p)
|
|
if err != nil {
|
|
return RoomView{}, err
|
|
}
|
|
// Under a per-subject ACL the admin's frozen NATS permissions do not yet cover
|
|
// the new room's subject; refresh the session so subsequent data-plane use of
|
|
// this room works. On a non-ACL bus this is a harmless reconnect.
|
|
_ = r.cli.RefreshSession()
|
|
return RoomView{
|
|
RoomID: roomID,
|
|
Subject: req.Subject,
|
|
Epoch: 1,
|
|
Encrypt: req.Encrypt,
|
|
Persist: req.Persist,
|
|
SignMsgs: req.SignMsgs,
|
|
Role: "owner",
|
|
}, nil
|
|
}
|
|
|
|
func (r *busRepo) Invite(_ context.Context, roomID string, req InviteReq) error {
|
|
signPub, err := hex.DecodeString(strings.TrimSpace(req.SignPub))
|
|
if err != nil || len(signPub) != 32 {
|
|
return fmt.Errorf("admin: invite: sign_pub must be 32-byte hex")
|
|
}
|
|
kexPub, err := hex.DecodeString(strings.TrimSpace(req.KexPub))
|
|
if err != nil || len(kexPub) != 32 {
|
|
return fmt.Errorf("admin: invite: kex_pub must be 32-byte hex")
|
|
}
|
|
endpoint := strings.TrimSpace(req.Endpoint)
|
|
if endpoint == "" {
|
|
endpoint = frame.EndpointID(signPub)
|
|
}
|
|
return r.cli.Invite(roomID, client.Endpoint{ID: endpoint, SignPub: signPub, KexPub: kexPub})
|
|
}
|
|
|
|
func (r *busRepo) KickMember(_ context.Context, roomID, endpoint string) error {
|
|
return r.cli.Kick(roomID, endpoint)
|
|
}
|
|
|
|
// ListMembers performs a signed GET /rooms/{id}/members. The unibus client does
|
|
// not export a member listing, so the gateway builds the request with the
|
|
// canonical signing construction the bus owns (membership.CanonicalRequest +
|
|
// cs.SignEd25519) — reusing the bus's single source of truth for the byte layout
|
|
// rather than reimplementing signing. The admin must be a member of the room
|
|
// (it is, for rooms it owns) or the control plane answers 403.
|
|
func (r *busRepo) ListMembers(_ context.Context, roomID string) ([]MemberView, error) {
|
|
path := "/rooms/" + roomID + "/members"
|
|
body, err := r.signedGET(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var wire []struct {
|
|
Endpoint string `json:"endpoint"`
|
|
Role string `json:"role"`
|
|
SignPub []byte `json:"sign_pub"`
|
|
KexPub []byte `json:"kex_pub"`
|
|
}
|
|
if err := json.Unmarshal(body, &wire); err != nil {
|
|
return nil, fmt.Errorf("admin: decode members: %w", err)
|
|
}
|
|
out := make([]MemberView, 0, len(wire))
|
|
for _, m := range wire {
|
|
out = append(out, MemberView{
|
|
Endpoint: m.Endpoint,
|
|
Role: m.Role,
|
|
SignPub: hex.EncodeToString(m.SignPub),
|
|
KexPub: hex.EncodeToString(m.KexPub),
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// signedGET issues a transport-authenticated GET against each control-plane base
|
|
// in turn (failover), signing the canonical request bytes with the admin's
|
|
// Ed25519 key under the same X-Unibus-* header scheme the bus client uses.
|
|
func (r *busRepo) signedGET(path string) ([]byte, error) {
|
|
var lastErr error
|
|
for _, base := range r.ctrlURLs {
|
|
req, err := http.NewRequest(http.MethodGet, strings.TrimRight(base, "/")+path, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ts := strconv.FormatInt(time.Now().Unix(), 10)
|
|
nonceRaw := make([]byte, 16)
|
|
if _, err := rand.Read(nonceRaw); err != nil {
|
|
return nil, fmt.Errorf("admin: nonce: %w", err)
|
|
}
|
|
nonce := base64.StdEncoding.EncodeToString(nonceRaw)
|
|
canonical := membership.CanonicalRequest(http.MethodGet, path, ts, nonce, nil)
|
|
sig := cs.SignEd25519(r.id.SignPriv, canonical)
|
|
req.Header.Set("X-Unibus-Pub", hex.EncodeToString(r.id.SignPub))
|
|
req.Header.Set("X-Unibus-Ts", ts)
|
|
req.Header.Set("X-Unibus-Nonce", nonce)
|
|
req.Header.Set("X-Unibus-Sig", base64.StdEncoding.EncodeToString(sig))
|
|
|
|
resp, err := r.httpc.Do(req)
|
|
if err != nil {
|
|
lastErr = err
|
|
continue // dead node: try the next control plane
|
|
}
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
|
if resp.StatusCode >= 300 {
|
|
var er struct {
|
|
Error string `json:"error"`
|
|
}
|
|
if json.Unmarshal(body, &er) == nil && er.Error != "" {
|
|
return nil, fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
|
|
}
|
|
return nil, fmt.Errorf("admin: GET %s -> %d", path, resp.StatusCode)
|
|
}
|
|
return body, nil
|
|
}
|
|
return nil, fmt.Errorf("admin: GET %s: all control planes failed: %w", path, lastErr)
|
|
}
|
|
|
|
// ---- users ----------------------------------------------------------------
|
|
//
|
|
// User management has two backends. The cluster default has no direct store
|
|
// (r.store == nil): every operation goes through the unibus client's admin-only
|
|
// HTTP endpoints (GET/POST /users, POST /users/{signpub}/revoke), each request
|
|
// signed as the operator's admin identity and verified by the bus's requireAdmin
|
|
// against the same store the room handlers use — so it works in cluster without
|
|
// KV/SQLite access. A single-node deployment may instead pass --db to manage the
|
|
// SQLite store directly; that path is kept as an explicit fallback.
|
|
|
|
// UsersWritable reports whether the Users tab can mutate the allowlist. The live
|
|
// gateway always can: either it holds a direct store, or it signs as an admin
|
|
// against the control plane. (A non-admin signer is rejected at request time by
|
|
// the bus with 403; that is an authorization outcome, not a missing capability.)
|
|
func (r *busRepo) UsersWritable() bool { return true }
|
|
|
|
func (r *busRepo) ListUsers(context.Context) ([]UserView, error) {
|
|
if r.store == nil {
|
|
users, err := r.cli.ListUsers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]UserView, 0, len(users))
|
|
for _, u := range users {
|
|
out = append(out, UserView(u))
|
|
}
|
|
return out, nil
|
|
}
|
|
users, err := r.store.ListUsers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]UserView, 0, len(users))
|
|
for _, u := range users {
|
|
out = append(out, UserView{
|
|
SignPub: u.SignPub,
|
|
Handle: u.Handle,
|
|
Role: u.Role,
|
|
Status: u.Status,
|
|
CreatedAt: u.CreatedAt,
|
|
RevokedAt: u.RevokedAt,
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (r *busRepo) AddUser(_ context.Context, req AddUserReq) error {
|
|
if r.store == nil {
|
|
return r.cli.AddUser(req.SignPub, req.Handle, req.Role)
|
|
}
|
|
return r.store.AddUser(req.SignPub, req.Handle, req.Role)
|
|
}
|
|
|
|
func (r *busRepo) RevokeUser(_ context.Context, signPub string) error {
|
|
if r.store == nil {
|
|
return r.cli.RevokeUser(signPub)
|
|
}
|
|
return r.store.RevokeUser(signPub)
|
|
}
|
|
|
|
// DeleteUser hard-deletes a user (purge), distinct from RevokeUser. Like the
|
|
// other user ops it goes through the signed control plane in cluster, or the
|
|
// direct store in the single-node fallback.
|
|
func (r *busRepo) DeleteUser(_ context.Context, signPub string) error {
|
|
if r.store == nil {
|
|
return r.cli.DeleteUser(signPub)
|
|
}
|
|
return r.store.DeleteUser(signPub)
|
|
}
|
|
|
|
// ---- invites --------------------------------------------------------------
|
|
|
|
// CreateInvite mints a single-use registration invite and returns it with the
|
|
// shareable join link pre-built. Cluster path goes through the signed control
|
|
// plane; the single-node fallback hits the store directly.
|
|
func (r *busRepo) CreateInvite(_ context.Context, req CreateInviteReq) (InviteView, error) {
|
|
if r.store == nil {
|
|
inv, err := r.cli.CreateInvite(req.Handle, req.Role, req.TTLSecs)
|
|
if err != nil {
|
|
return InviteView{}, err
|
|
}
|
|
return InviteView{
|
|
Token: inv.Token,
|
|
Handle: inv.Handle,
|
|
Role: inv.Role,
|
|
ExpiresAt: inv.ExpiresAt,
|
|
JoinURL: r.joinURL(inv.Token),
|
|
}, nil
|
|
}
|
|
inv, err := r.store.CreateInvite(req.Handle, req.Role, req.TTLSecs)
|
|
if err != nil {
|
|
return InviteView{}, err
|
|
}
|
|
return InviteView{
|
|
Token: inv.Token,
|
|
Handle: inv.Handle,
|
|
Role: inv.Role,
|
|
ExpiresAt: inv.ExpiresAt,
|
|
CreatedAt: inv.CreatedAt,
|
|
JoinURL: r.joinURL(inv.Token),
|
|
}, nil
|
|
}
|
|
|
|
// ListInvites returns the PENDING invites (not used, not expired) with their join
|
|
// links. The control-plane GET /invites already filters to pending; the direct
|
|
// store returns everything, so we filter here for parity.
|
|
func (r *busRepo) ListInvites(_ context.Context) ([]InviteView, error) {
|
|
if r.store == nil {
|
|
invs, err := r.cli.ListInvites()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]InviteView, 0, len(invs))
|
|
for _, inv := range invs {
|
|
out = append(out, InviteView{
|
|
Token: inv.Token,
|
|
Handle: inv.Handle,
|
|
Role: inv.Role,
|
|
ExpiresAt: inv.ExpiresAt,
|
|
Used: inv.Used,
|
|
CreatedAt: inv.CreatedAt,
|
|
JoinURL: r.joinURL(inv.Token),
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
invs, err := r.store.ListInvites()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]InviteView, 0, len(invs))
|
|
for _, inv := range invs {
|
|
if !invitePending(inv.ExpiresAt, inv.Used) {
|
|
continue
|
|
}
|
|
out = append(out, InviteView{
|
|
Token: inv.Token,
|
|
Handle: inv.Handle,
|
|
Role: inv.Role,
|
|
ExpiresAt: inv.ExpiresAt,
|
|
Used: inv.Used,
|
|
CreatedAt: inv.CreatedAt,
|
|
JoinURL: r.joinURL(inv.Token),
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// invitePending reports whether an invite is still live: it has not been used
// and the current time is strictly before its deadline. expiresAt is parsed as
// RFC 3339 (fractional seconds allowed); a deadline that fails to parse counts
// as expired, so malformed data fails closed.
func invitePending(expiresAt string, used bool) bool {
	if used {
		return false
	}
	if deadline, err := time.Parse(time.RFC3339Nano, expiresAt); err == nil {
		return deadline.After(time.Now().UTC())
	}
	return false
}
|