Files
unibus_admin/internal/admin/repo_bus.go
T
Egutierrez 8d893d216b feat: scaffold unibus_admin gateway (Go REST + embed SPA placeholder)
Single Go binary: serves an embedded Mantine SPA and a small REST API over the
unibus control plane. Holds the operator ADMIN identity, signs every
control-plane request, never exposes a private key to the browser.

- internal/admin: Repo interface + mock + bus implementations, REST server
- repo_bus: rooms via pkg/client, members via signed GET (CanonicalRequest +
  SignEd25519), cluster via /healthz (CA-pinned), users via membership.Store
- identity loaded from pass entry or 0600 file (operator-identity JSON)
- go build CGO_ENABLED=0 green; go vet clean

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 19:27:49 +02:00

367 lines
11 KiB
Go

package admin
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/busauth"
"github.com/enmanuel/unibus/pkg/client"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/enmanuel/unibus/pkg/room"
)
// NodeTarget identifies one cluster node that the gateway probes with an
// unauthenticated GET /healthz.
type NodeTarget struct {
// Name is copied verbatim into the NodeHealth entry reported for this node.
Name string
// URL is the node's base URL; trailing slashes are trimmed and "/healthz" is
// appended when the node is probed.
URL string // e.g. https://magnus.internal:8470
}
// busRepo is the live gateway: it owns the operator's admin identity, a connected
// unibus client (for crypto-bearing room operations), a CA-pinned HTTP client for
// signed control-plane GETs and node health probes, and — when available — a
// direct membership store for the user allowlist. Instances are built by
// NewBusRepo and released with Close.
type busRepo struct {
// id is the admin identity; its SignPriv key signs control-plane GETs and its
// SignPub is advertised in the X-Unibus-Pub header and by Me.
id cs.Identity
// endpoint is frame.EndpointID(id.SignPub), computed once in NewBusRepo.
endpoint string
ctrlURLs []string // control-plane bases, tried in order (failover)
httpc *http.Client // CA-pinned (or plain) client for signed GETs + healthz
// cli is the connected bus client used for room listing, creation, invites
// and kicks.
cli *client.Client
// nodes are the targets probed by Cluster, in this order.
nodes []NodeTarget
store membership.Store // optional; nil => Users tab degraded
storeBackend string // "sqlite" | "kv" | "none"
}
// BusConfig carries everything NewBusRepo needs to wire a live gateway.
type BusConfig struct {
// Identity is the admin identity handed to the bus client and used to sign
// control-plane GETs.
Identity cs.Identity
// NatsURL is the primary NATS URL passed to the bus client.
NatsURL string
CtrlURL string // primary control-plane base
CtrlURLs []string // additional control-plane bases (cluster failover)
NatsURLs []string // additional NATS seeds (cluster failover)
CAPath string // bus CA; empty => plaintext dev connection
Nodes []NodeTarget // nodes to probe for /healthz
// Store is the optional membership store backing the user operations; when it
// is nil those operations return ErrUsersUnavailable.
Store membership.Store
// StoreBackend labels the store in Me's UsersBackend field; NewBusRepo
// replaces it with "none" when Store is nil.
StoreBackend string
}
// NewBusRepo connects the unibus client with the admin identity and builds the
// HTTP client used for signed GETs and node health probes. The client
// connection follows the same posture seam every peer uses (client.Connect): a
// non-empty CA path means TLS + nkey, empty means plaintext dev.
//
// The bus CA is loaded exactly once, before any connection is opened, so a bad
// CA path fails fast and can never leave a connected bus client behind. The
// HTTP transport receives its own clone of the TLS configuration so it does not
// share a mutable config with the bus client.
//
// It returns an error when the CA cannot be loaded or the bus client cannot
// connect.
func NewBusRepo(cfg BusConfig) (*busRepo, error) {
	opts := client.Options{
		CtrlURLs:    cfg.CtrlURLs,
		NatsServers: cfg.NatsURLs,
	}
	// 8s bounds every signed GET and health probe issued through this client.
	httpc := &http.Client{Timeout: 8 * time.Second}

	if cfg.CAPath != "" {
		tlsCfg, err := busauth.LoadCATLSConfig(cfg.CAPath)
		if err != nil {
			return nil, fmt.Errorf("admin: load bus CA %q: %w", cfg.CAPath, err)
		}
		opts.UseNkey = true
		opts.TLS = tlsCfg
		opts.CtrlTLS = tlsCfg
		httpc.Transport = &http.Transport{TLSClientConfig: tlsCfg.Clone()}
	}

	cli, err := client.NewWithOptions(cfg.NatsURL, cfg.CtrlURL, cfg.Identity, opts)
	if err != nil {
		return nil, fmt.Errorf("admin: connect bus client: %w", err)
	}

	// Primary base first, then the failover bases, in a slice that does not
	// alias the caller's cfg.CtrlURLs.
	ctrlURLs := make([]string, 0, 1+len(cfg.CtrlURLs))
	ctrlURLs = append(ctrlURLs, cfg.CtrlURL)
	ctrlURLs = append(ctrlURLs, cfg.CtrlURLs...)

	// Without a store the reported backend is always "none", whatever label the
	// caller supplied.
	backend := cfg.StoreBackend
	if cfg.Store == nil {
		backend = "none"
	}

	return &busRepo{
		id:           cfg.Identity,
		endpoint:     frame.EndpointID(cfg.Identity.SignPub),
		ctrlURLs:     ctrlURLs,
		httpc:        httpc,
		cli:          cli,
		nodes:        cfg.Nodes,
		store:        cfg.Store,
		storeBackend: backend,
	}, nil
}
// Close shuts down the bus client connection and returns whatever error the
// client reports. A repo that holds no client closes cleanly with a nil error.
func (r *busRepo) Close() error {
	if r.cli == nil {
		return nil
	}
	return r.cli.Close()
}
// Me describes the gateway's own admin identity: its endpoint ID, its
// hex-encoded signing public key, and the label of the users backend. Mock is
// always false for the live repo.
func (r *busRepo) Me(context.Context) MeInfo {
	var info MeInfo
	info.Endpoint = r.endpoint
	info.SignPub = hex.EncodeToString(r.id.SignPub)
	info.UsersBackend = r.storeBackend
	info.Mock = false
	return info
}
// ---- cluster health -------------------------------------------------------
// healthzResp is the shape membershipd's GET /healthz returns. probeNode
// decodes into it and copies the posture flags into NodeHealth.Posture.
type healthzResp struct {
// Status is compared against "ok" to decide whether the node is up.
Status string `json:"status"`
// Posture mirrors the node's self-reported posture; each field is copied
// one-to-one into the Posture value of the health report.
Posture struct {
Enforce bool `json:"enforce"`
ACL bool `json:"acl"`
TLS bool `json:"tls"`
Cluster bool `json:"cluster"`
Store string `json:"store"`
} `json:"posture"`
}
// Cluster probes every configured node one after another, in configuration
// order, and returns one NodeHealth per node. The result is never nil; with no
// nodes configured it is an empty slice.
func (r *busRepo) Cluster(ctx context.Context) []NodeHealth {
	report := make([]NodeHealth, len(r.nodes))
	for i := range r.nodes {
		report[i] = r.probeNode(ctx, r.nodes[i])
	}
	return report
}
// probeNode does an unauthenticated GET /healthz (the one auth-exempt route) and
// maps the response to NodeHealth. Any request, transport, read or decode
// failure is reported in NodeHealth.Error with Up left false; it never aborts
// the whole cluster view.
//
// LatencyMs covers the round trip up to the response headers and is recorded
// even when the request fails. At most 64 KiB of the body is read.
func (r *busRepo) probeNode(ctx context.Context, n NodeTarget) NodeHealth {
	nh := NodeHealth{Name: n.Name, URL: n.URL}

	endpoint := strings.TrimRight(n.URL, "/") + "/healthz"
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		nh.Error = err.Error()
		return nh
	}

	start := time.Now()
	resp, err := r.httpc.Do(req)
	nh.LatencyMs = time.Since(start).Milliseconds()
	if err != nil {
		nh.Error = err.Error()
		return nh
	}
	defer resp.Body.Close()

	// Read the body even on a non-200 status so the connection can be reused.
	body, readErr := io.ReadAll(io.LimitReader(resp.Body, 1<<16))
	if resp.StatusCode != http.StatusOK {
		nh.Error = fmt.Sprintf("HTTP %d", resp.StatusCode)
		return nh
	}
	// A failed read is reported as such rather than being misattributed to
	// malformed JSON from the truncated payload.
	if readErr != nil {
		nh.Error = "read healthz body: " + readErr.Error()
		return nh
	}

	var hr healthzResp
	if err := json.Unmarshal(body, &hr); err != nil {
		nh.Error = "bad healthz json: " + err.Error()
		return nh
	}

	nh.Up = hr.Status == "ok"
	nh.Posture = Posture{
		Enforce: hr.Posture.Enforce,
		ACL:     hr.Posture.ACL,
		TLS:     hr.Posture.TLS,
		Cluster: hr.Posture.Cluster,
		Store:   hr.Posture.Store,
	}
	return nh
}
// ---- rooms ----------------------------------------------------------------
// ListRooms returns the rooms the bus client reports for the admin identity,
// flattening each room's policy into the RoomView. A client error is returned
// unchanged; on success the slice is non-nil even when there are no rooms.
func (r *busRepo) ListRooms(context.Context) ([]RoomView, error) {
	mine, err := r.cli.ListMyRooms()
	if err != nil {
		return nil, err
	}

	views := make([]RoomView, 0, len(mine))
	for _, rm := range mine {
		var v RoomView
		v.RoomID = rm.RoomID
		v.Subject = rm.Subject
		v.Epoch = rm.Epoch
		v.Encrypt = rm.Policy.Encrypt
		v.Persist = rm.Policy.Persist
		v.SignMsgs = rm.Policy.SignMsgs
		v.Role = rm.Role
		views = append(views, v)
	}
	return views, nil
}
// CreateRoom creates a room on req.Subject with the requested policy and
// returns a view of it. The view is assembled locally from the request: epoch 1
// and role "owner" are filled in here, not read back from the bus. A creation
// error is returned unchanged with a zero RoomView.
func (r *busRepo) CreateRoom(_ context.Context, req CreateRoomReq) (RoomView, error) {
	policy := room.Policy{
		Encrypt:  req.Encrypt,
		Persist:  req.Persist,
		SignMsgs: req.SignMsgs,
	}

	id, err := r.cli.CreateRoom(req.Subject, policy)
	if err != nil {
		return RoomView{}, err
	}

	// Under a per-subject ACL the admin's frozen NATS permissions do not yet
	// cover the new room's subject; refreshing the session lets later
	// data-plane use of this room work. On a non-ACL bus this is a harmless
	// reconnect. The result is deliberately discarded: the room already exists
	// at this point.
	_ = r.cli.RefreshSession()

	var view RoomView
	view.RoomID = id
	view.Subject = req.Subject
	view.Epoch = 1
	view.Encrypt = req.Encrypt
	view.Persist = req.Persist
	view.SignMsgs = req.SignMsgs
	view.Role = "owner"
	return view, nil
}
// Invite adds an endpoint to roomID. req.SignPub and req.KexPub must each be
// hex strings (surrounding whitespace ignored) that decode to exactly 32 bytes;
// otherwise an error naming the offending field is returned. When req.Endpoint
// is blank the endpoint ID is derived from the signing key via frame.EndpointID.
func (r *busRepo) Invite(_ context.Context, roomID string, req InviteReq) error {
	// decode32 parses a trimmed hex string and accepts it only when it yields
	// exactly 32 bytes.
	decode32 := func(s string) ([]byte, bool) {
		raw, err := hex.DecodeString(strings.TrimSpace(s))
		return raw, err == nil && len(raw) == 32
	}

	signPub, ok := decode32(req.SignPub)
	if !ok {
		return fmt.Errorf("admin: invite: sign_pub must be 32-byte hex")
	}
	kexPub, ok := decode32(req.KexPub)
	if !ok {
		return fmt.Errorf("admin: invite: kex_pub must be 32-byte hex")
	}

	id := strings.TrimSpace(req.Endpoint)
	if id == "" {
		id = frame.EndpointID(signPub)
	}

	peer := client.Endpoint{ID: id, SignPub: signPub, KexPub: kexPub}
	return r.cli.Invite(roomID, peer)
}
// KickMember removes endpoint from roomID by delegating to the bus client and
// returns the client's error unchanged.
func (r *busRepo) KickMember(_ context.Context, roomID, endpoint string) error {
	err := r.cli.Kick(roomID, endpoint)
	return err
}
// ListMembers performs a signed GET /rooms/{id}/members. The unibus client does
// not export a member listing, so the gateway builds the request with the
// canonical signing construction the bus owns (membership.CanonicalRequest +
// cs.SignEd25519) — reusing the bus's single source of truth for the byte layout
// rather than reimplementing signing. The admin must be a member of the room
// (it is, for rooms it owns) or the control plane answers 403.
//
// The wire format carries the public keys as JSON byte strings; they are
// re-encoded as hex in the returned views. On success the slice is non-nil.
//
// NOTE(review): roomID is concatenated into the signed path unescaped — confirm
// callers only pass IDs that are safe as a single path segment.
func (r *busRepo) ListMembers(_ context.Context, roomID string) ([]MemberView, error) {
	type wireMember struct {
		Endpoint string `json:"endpoint"`
		Role     string `json:"role"`
		SignPub  []byte `json:"sign_pub"`
		KexPub   []byte `json:"kex_pub"`
	}

	raw, err := r.signedGET("/rooms/" + roomID + "/members")
	if err != nil {
		return nil, err
	}

	var members []wireMember
	if err = json.Unmarshal(raw, &members); err != nil {
		return nil, fmt.Errorf("admin: decode members: %w", err)
	}

	views := make([]MemberView, len(members))
	for i, m := range members {
		views[i] = MemberView{
			Endpoint: m.Endpoint,
			Role:     m.Role,
			SignPub:  hex.EncodeToString(m.SignPub),
			KexPub:   hex.EncodeToString(m.KexPub),
		}
	}
	return views, nil
}
// signedGET issues a transport-authenticated GET against each control-plane base
// in turn (failover), signing the canonical request bytes with the admin's
// Ed25519 key under the same X-Unibus-* header scheme the bus client uses.
//
// Every attempt carries a fresh timestamp and a fresh 16-byte random nonce. A
// base is skipped, and the next one tried, when its URL cannot be turned into
// a request, the transport fails, or the response body cannot be read in full.
// An HTTP status >= 300 is an answer, not a dead node: it is returned
// immediately as an error, using the server's JSON "error" field when present.
// At most 1 MiB of the response body is read.
//
// NOTE(review): the request is not bound to a caller context; only the HTTP
// client's own Timeout limits each attempt.
func (r *busRepo) signedGET(path string) ([]byte, error) {
	if len(r.ctrlURLs) == 0 {
		return nil, fmt.Errorf("admin: GET %s: no control-plane URL configured", path)
	}

	pubHex := hex.EncodeToString(r.id.SignPub)
	var lastErr error
	for _, base := range r.ctrlURLs {
		req, err := http.NewRequest(http.MethodGet, strings.TrimRight(base, "/")+path, nil)
		if err != nil {
			// One malformed base must not prevent failover to the others.
			lastErr = err
			continue
		}

		ts := strconv.FormatInt(time.Now().Unix(), 10)
		nonceRaw := make([]byte, 16)
		if _, err := rand.Read(nonceRaw); err != nil {
			// A failing system RNG will not recover on the next base.
			return nil, fmt.Errorf("admin: nonce: %w", err)
		}
		nonce := base64.StdEncoding.EncodeToString(nonceRaw)

		canonical := membership.CanonicalRequest(http.MethodGet, path, ts, nonce, nil)
		sig := cs.SignEd25519(r.id.SignPriv, canonical)
		req.Header.Set("X-Unibus-Pub", pubHex)
		req.Header.Set("X-Unibus-Ts", ts)
		req.Header.Set("X-Unibus-Nonce", nonce)
		req.Header.Set("X-Unibus-Sig", base64.StdEncoding.EncodeToString(sig))

		resp, err := r.httpc.Do(req)
		if err != nil {
			lastErr = err
			continue // dead node: try the next control plane
		}
		// Close right away instead of deferring inside the loop, so the body is
		// released before any further attempt.
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
		_ = resp.Body.Close()

		if resp.StatusCode >= 300 {
			var er struct {
				Error string `json:"error"`
			}
			if json.Unmarshal(body, &er) == nil && er.Error != "" {
				return nil, fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
			}
			return nil, fmt.Errorf("admin: GET %s -> %d", path, resp.StatusCode)
		}
		if readErr != nil {
			// A truncated success body is a transport failure, not a result.
			lastErr = fmt.Errorf("read response body: %w", readErr)
			continue
		}
		return body, nil
	}
	return nil, fmt.Errorf("admin: GET %s: all control planes failed: %w", path, lastErr)
}
// ---- users ----------------------------------------------------------------
// UsersWritable reports whether a membership store is attached, i.e. whether
// the user operations can do anything other than return ErrUsersUnavailable.
func (r *busRepo) UsersWritable() bool {
	return r.store != nil
}
// ListUsers returns every user recorded in the membership store as a UserView.
// It returns ErrUsersUnavailable when no store is attached and passes a store
// error through unchanged. On success the slice is non-nil.
func (r *busRepo) ListUsers(context.Context) ([]UserView, error) {
	store := r.store
	if store == nil {
		return nil, ErrUsersUnavailable
	}

	records, err := store.ListUsers()
	if err != nil {
		return nil, err
	}

	views := make([]UserView, 0, len(records))
	for _, rec := range records {
		var v UserView
		v.SignPub = rec.SignPub
		v.Handle = rec.Handle
		v.Role = rec.Role
		v.Status = rec.Status
		v.CreatedAt = rec.CreatedAt
		v.RevokedAt = rec.RevokedAt
		views = append(views, v)
	}
	return views, nil
}
// AddUser records a user (signing key, handle, role) in the membership store.
// It returns ErrUsersUnavailable when no store is attached; otherwise the
// store's own result is returned unchanged.
func (r *busRepo) AddUser(_ context.Context, req AddUserReq) error {
	if store := r.store; store != nil {
		return store.AddUser(req.SignPub, req.Handle, req.Role)
	}
	return ErrUsersUnavailable
}
// RevokeUser revokes the user identified by signPub in the membership store.
// It returns ErrUsersUnavailable when no store is attached; otherwise the
// store's own result is returned unchanged.
func (r *busRepo) RevokeUser(_ context.Context, signPub string) error {
	if store := r.store; store != nil {
		return store.RevokeUser(signPub)
	}
	return ErrUsersUnavailable
}