Merge issue/0001b-ctrl-auth: signed control-plane auth + anti-replay

Phase 0001b of issue 0001. The control plane (membershipd HTTP) now supports
the bus-auth rollout off->soft->enforce: clients sign every request with their
Ed25519 identity (headers over method/path/ts/nonce/sha256(body)); the server
verifies the signature, clock skew (+/-30s), nonce replay (60s TTL cache), and
the user allowlist. Revocation denies access on the next request without a
restart. Default stays off so master keeps working.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-07 12:32:06 +02:00
7 changed files with 566 additions and 21 deletions
+8 -1
View File
@@ -40,9 +40,15 @@ func main() {
storeDir = flag.String("store-dir", "./local_files/blobs", "blob store directory")
natsPort = flag.Int("nats-port", 4250, "embedded NATS listen port (when --nats-url empty)")
natsStore = flag.String("nats-store", "./local_files/jetstream", "embedded JetStream store dir")
busAuth = flag.String("bus-auth", "off", "control-plane auth rollout: off|soft|enforce (feature flag bus-auth)")
)
flag.Parse()
authMode, err := membership.ParseAuthMode(*busAuth)
if err != nil {
log.Fatalf("%v", err)
}
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[membershipd] ")
@@ -78,7 +84,8 @@ func main() {
}
log.Printf("blob store: %s", *storeDir)
srv := membership.NewServer(store, blobs)
srv := membership.NewServer(store, blobs, authMode)
log.Printf("control-plane auth: %s", authMode)
addr := *bind + ":" + *httpPort
httpSrv := &http.Server{Addr: addr, Handler: srv}
+55 -8
View File
@@ -16,16 +16,20 @@ import (
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/frame"
"github.com/enmanuel/unibus/pkg/membership"
"github.com/enmanuel/unibus/pkg/room"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
@@ -116,17 +120,17 @@ func (c *Client) getCachedKey(roomID string, epoch int) ([]byte, bool) {
// ---- control-plane HTTP helpers ------------------------------------------
func (c *Client) doJSON(method, path string, body, out any) error {
var rdr io.Reader
var bodyBytes []byte
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("client: marshal request: %w", err)
}
rdr = bytes.NewReader(b)
bodyBytes = b
}
req, err := http.NewRequest(method, c.ctrlURL+path, rdr)
req, err := c.newSignedRequest(method, path, bodyBytes)
if err != nil {
return fmt.Errorf("client: new request: %w", err)
return err
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
@@ -158,12 +162,51 @@ func (c *Client) doJSON(method, path string, body, out any) error {
// signRequest signs the canonical bytes of req (req must already have its Sig
// field cleared) with the client's Ed25519 key. It is symmetric with the
// server's verifyOwnerSig.
// server's verifyOwnerSig. This is the PAYLOAD-level owner signature that
// authorizes room operations (invite/rekey) by ownership — distinct from the
// transport-level request signature applied by newSignedRequest below, which
// authenticates the caller's identity on every request.
func (c *Client) signRequest(req any) []byte {
b, _ := json.Marshal(req)
return cs.SignEd25519(c.id.SignPriv, b)
}
// newSignedRequest builds an *http.Request to the control plane and attaches the
// transport authentication headers (X-Unibus-Pub/Ts/Nonce/Sig) signing the
// canonical request bytes with this peer's Ed25519 key. path is the request URI
// (path plus any query); body is the raw request body (nil for GET). The server
// (membership.authenticate) verifies these headers under the bus-auth flag.
//
// Signing happens on every request — including GETs — so that under enforce the
// server can authenticate the caller and reject unregistered or revoked
// identities uniformly. The canonical construction is the single source of truth
// in membership.CanonicalRequest, shared by both sides.
func (c *Client) newSignedRequest(method, path string, body []byte) (*http.Request, error) {
var rdr io.Reader
if body != nil {
rdr = bytes.NewReader(body)
}
req, err := http.NewRequest(method, c.ctrlURL+path, rdr)
if err != nil {
return nil, fmt.Errorf("client: new request: %w", err)
}
ts := strconv.FormatInt(time.Now().Unix(), 10)
nonceRaw := make([]byte, 16)
if _, err := rand.Read(nonceRaw); err != nil {
return nil, fmt.Errorf("client: generate nonce: %w", err)
}
nonce := base64.StdEncoding.EncodeToString(nonceRaw)
canonical := membership.CanonicalRequest(method, path, ts, nonce, body)
sig := cs.SignEd25519(c.id.SignPriv, canonical)
req.Header.Set("X-Unibus-Pub", hex.EncodeToString(c.id.SignPub))
req.Header.Set("X-Unibus-Ts", ts)
req.Header.Set("X-Unibus-Nonce", nonce)
req.Header.Set("X-Unibus-Sig", base64.StdEncoding.EncodeToString(sig))
return req, nil
}
// ---- mirror of server wire types (control plane) -------------------------
type policyJSON struct {
@@ -769,9 +812,9 @@ func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) {
}
func (c *Client) putBlob(ciphertext []byte) (string, error) {
req, err := http.NewRequest("POST", c.ctrlURL+"/blobs", bytes.NewReader(ciphertext))
req, err := c.newSignedRequest("POST", "/blobs", ciphertext)
if err != nil {
return "", fmt.Errorf("client: new blob request: %w", err)
return "", err
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := c.http.Do(req)
@@ -791,7 +834,11 @@ func (c *Client) putBlob(ciphertext []byte) (string, error) {
}
func (c *Client) getBlob(hash string) ([]byte, error) {
resp, err := c.http.Get(c.ctrlURL + "/blobs/" + hash)
req, err := c.newSignedRequest("GET", "/blobs/"+hash, nil)
if err != nil {
return nil, err
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("client: get blob: %w", err)
}
+64 -3
View File
@@ -1,10 +1,12 @@
package client_test
import (
"encoding/hex"
"net"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@@ -27,6 +29,7 @@ type testHarness struct {
ctrlURL string
ns *server.Server
httpts *httptest.Server
store *membership.Store
}
func freePort(t *testing.T) int {
@@ -39,7 +42,12 @@ func freePort(t *testing.T) int {
return l.Addr().(*net.TCPAddr).Port
}
func newHarness(t *testing.T) *testHarness {
func newHarness(t *testing.T) *testHarness { return newHarnessMode(t, membership.AuthOff) }
// newHarnessMode is newHarness with an explicit control-plane auth mode, so auth
// tests can boot the real server in enforce/soft and exercise it through the
// production client (which signs every request).
func newHarnessMode(t *testing.T, mode membership.AuthMode) *testHarness {
t.Helper()
dir := t.TempDir()
@@ -58,10 +66,10 @@ func newHarness(t *testing.T) *testHarness {
ns.Shutdown()
t.Fatalf("blob store: %v", err)
}
srv := membership.NewServer(store, blobs)
srv := membership.NewServer(store, blobs, mode)
httpts := httptest.NewServer(srv)
h := &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL, ns: ns, httpts: httpts}
h := &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL, ns: ns, httpts: httpts, store: store}
t.Cleanup(func() {
httpts.Close()
store.Close()
@@ -71,6 +79,15 @@ func newHarness(t *testing.T) *testHarness {
return h
}
// registerClient adds a peer's signing identity to the bus allowlist so its
// signed control-plane requests pass under enforce.
func registerClient(t *testing.T, h *testHarness, c *client.Client, handle, role string) {
t.Helper()
if err := h.store.AddUser(hex.EncodeToString(c.Endpoint().SignPub), handle, role); err != nil {
t.Fatalf("register %s: %v", handle, err)
}
}
func waitHealth(t *testing.T, ctrlURL string) {
t.Helper()
deadline := time.Now().Add(3 * time.Second)
@@ -455,6 +472,50 @@ func TestListMyRoomsDiscovery(t *testing.T) {
}
}
// TestControlPlaneAuthEnforceE2E closes the loop end to end with the production
// client against a server in enforce mode: a registered peer's signed requests
// are accepted (golden), and an unregistered peer is rejected with 401 on its
// first control-plane call (error path). This proves the client's real
// signature construction matches the server's verification.
func TestControlPlaneAuthEnforceE2E(t *testing.T) {
h := newHarnessMode(t, membership.AuthEnforce)
waitHealth(t, h.ctrlURL)
a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect A: %v", err)
}
defer a.Close()
registerClient(t, h, a, "alice", membership.RoleAdmin)
// Golden: registered peer's signed request is accepted.
if _, err := a.CreateRoom("room.enforced", room.ModeNATS); err != nil {
t.Fatalf("registered peer should create a room under enforce: %v", err)
}
// Error path: an unregistered peer is rejected on its first control-plane call.
b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect B: %v", err)
}
defer b.Close()
_, err = b.CreateRoom("room.denied", room.ModeNATS)
if err == nil {
t.Fatalf("unregistered peer must be rejected under enforce")
}
if !strings.Contains(err.Error(), "401") && !strings.Contains(strings.ToLower(err.Error()), "unauthorized") {
t.Fatalf("expected a 401/unauthorized error, got %v", err)
}
// Revocation takes effect without restart: revoke A, its next request fails.
if err := h.store.RevokeUser(hex.EncodeToString(a.Endpoint().SignPub)); err != nil {
t.Fatalf("revoke A: %v", err)
}
if _, err := a.CreateRoom("room.after-revoke", room.ModeNATS); err == nil {
t.Fatalf("revoked peer must be rejected without a server restart")
}
}
// ---- test helpers ---------------------------------------------------------
type collector struct {
+185
View File
@@ -0,0 +1,185 @@
package membership
import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"fmt"
"net/http"
"strconv"
"sync"
"time"
cs "fn-registry/functions/cybersecurity"
)
// AuthMode is the control-plane authentication rollout state (feature flag
// bus-auth). It governs how the HTTP middleware treats a request whose signature
// is missing, invalid, replayed, skewed, or from an unregistered identity.
//
// AuthOff — do not verify anything (legacy behavior; default).
// AuthSoft — verify and LOG rejections, but let the request through. Lets
// clients migrate to signing without an outage.
// AuthEnforce — reject unauthenticated requests with 401.
type AuthMode int
const (
AuthOff AuthMode = iota
AuthSoft
AuthEnforce
)
func (m AuthMode) String() string {
switch m {
case AuthOff:
return "off"
case AuthSoft:
return "soft"
case AuthEnforce:
return "enforce"
default:
return "unknown"
}
}
// ParseAuthMode maps the bus-auth flag string to an AuthMode.
func ParseAuthMode(s string) (AuthMode, error) {
switch s {
case "off", "":
return AuthOff, nil
case "soft":
return AuthSoft, nil
case "enforce":
return AuthEnforce, nil
default:
return AuthOff, fmt.Errorf("membership: invalid bus-auth mode %q (want off|soft|enforce)", s)
}
}
// Control-plane signature headers. The client signs the canonical bytes of the
// request and presents these; the server reconstructs the canonical bytes and
// verifies. See canonicalRequest for the exact byte layout.
const (
hdrPub = "X-Unibus-Pub" // signer Ed25519 public key, lowercase hex
hdrTs = "X-Unibus-Ts" // unix seconds (string)
hdrNonce = "X-Unibus-Nonce" // 16 random bytes, std base64
hdrSig = "X-Unibus-Sig" // Ed25519 signature over canonical, std base64
)
// Anti-replay parameters. A request is accepted only if its timestamp is within
// clockSkew of now; nonces are remembered for nonceTTL so a captured request
// cannot be replayed inside its acceptance window. nonceTTL must be >= the full
// acceptance window (2*clockSkew) so a replay can never outlive its memory.
const (
clockSkew = 30 * time.Second
nonceTTL = 60 * time.Second
)
// CanonicalRequest returns the exact bytes that are signed and verified for a
// control-plane request:
//
// method "\n" path "\n" ts "\n" nonce "\n" hex(sha256(body))
//
// path is the request URI (path plus raw query) so query parameters (endpoint,
// epoch) are covered by the signature. It is exported so the client library and
// tests sign with the identical construction — the one place this format lives.
func CanonicalRequest(method, path, ts, nonce string, body []byte) []byte {
sum := sha256.Sum256(body)
return []byte(method + "\n" + path + "\n" + ts + "\n" + nonce + "\n" + hex.EncodeToString(sum[:]))
}
// nonceCache remembers recently-seen nonces to reject replays. It is an
// in-memory map guarded by a mutex with lazy expiry — sufficient for a single
// membershipd process (the spec's chosen tradeoff over a server-issued nonce
// round-trip). A distributed deployment would need a shared store.
type nonceCache struct {
mu sync.Mutex
seen map[string]time.Time
ttl time.Duration
}
func newNonceCache(ttl time.Duration) *nonceCache {
return &nonceCache{seen: make(map[string]time.Time), ttl: ttl}
}
// rememberOrReject records nonce and returns true if it was unseen, or false if
// it is a replay (still live in the cache). Expired entries are pruned lazily on
// each call so the map cannot grow without bound under steady traffic.
func (n *nonceCache) rememberOrReject(nonce string, now time.Time) bool {
n.mu.Lock()
defer n.mu.Unlock()
for k, exp := range n.seen {
if exp.Before(now) {
delete(n.seen, k)
}
}
if exp, ok := n.seen[nonce]; ok && !exp.Before(now) {
return false
}
n.seen[nonce] = now.Add(n.ttl)
return true
}
// authResult is what a successful authentication yields: the verified signing
// key (hex) and the authorized user record. Handlers may use it for fine-grained
// authorization (e.g. role checks) in later phases.
type authResult struct {
pubHex string
user User
}
// authenticate verifies the signature headers on r against body and the user
// allowlist. It returns an error describing the first failing check; the
// middleware decides whether that error blocks (enforce) or only logs (soft).
//
// Order matters: cheap, non-cryptographic checks (header presence, key shape,
// clock skew) run first; the Ed25519 verification runs before the replay cache
// is touched so an attacker cannot poison the cache with unsigned nonces; the
// allowlist lookup runs last.
func (s *Server) authenticate(r *http.Request, body []byte, now time.Time) (authResult, error) {
pubHex := r.Header.Get(hdrPub)
ts := r.Header.Get(hdrTs)
nonce := r.Header.Get(hdrNonce)
sigB64 := r.Header.Get(hdrSig)
if pubHex == "" || ts == "" || nonce == "" || sigB64 == "" {
return authResult{}, fmt.Errorf("missing auth headers")
}
pub, err := hex.DecodeString(pubHex)
if err != nil || len(pub) != 32 {
return authResult{}, fmt.Errorf("malformed %s (want 32-byte Ed25519 hex)", hdrPub)
}
tsInt, err := strconv.ParseInt(ts, 10, 64)
if err != nil {
return authResult{}, fmt.Errorf("malformed %s", hdrTs)
}
if d := now.Unix() - tsInt; d > int64(clockSkew/time.Second) || d < -int64(clockSkew/time.Second) {
return authResult{}, fmt.Errorf("timestamp out of range (skew %ds)", d)
}
sig, err := base64.StdEncoding.DecodeString(sigB64)
if err != nil {
return authResult{}, fmt.Errorf("malformed %s", hdrSig)
}
canonical := CanonicalRequest(r.Method, r.URL.RequestURI(), ts, nonce, body)
if !cs.VerifyEd25519(pub, canonical, sig) {
return authResult{}, fmt.Errorf("invalid signature")
}
if !s.nonces.rememberOrReject(nonce, now) {
return authResult{}, fmt.Errorf("replayed nonce")
}
if !s.store.IsAuthorized(pubHex) {
return authResult{}, fmt.Errorf("identity not authorized")
}
user, err := s.store.GetUser(pubHex)
if err != nil {
// IsAuthorized passed but the row vanished (race with revoke): fail closed.
return authResult{}, fmt.Errorf("identity not authorized")
}
return authResult{pubHex: pubHex, user: user}, nil
}
+194
View File
@@ -0,0 +1,194 @@
package membership
import (
"bytes"
"encoding/base64"
"encoding/hex"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"strconv"
"testing"
"time"
cs "fn-registry/functions/cybersecurity"
"github.com/enmanuel/unibus/pkg/blobstore"
)
// authHarness boots an in-process membershipd HTTP server in the given auth mode
// with a fresh store + blob store, and seeds one active admin ("alice").
type authHarness struct {
ts *httptest.Server
store *Store
alice cs.Identity
alicePub string // hex
}
func newAuthHarness(t *testing.T, mode AuthMode) *authHarness {
t.Helper()
dir := t.TempDir()
store, err := Open(filepath.Join(dir, "unibus.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
if err != nil {
t.Fatalf("open blobs: %v", err)
}
alice, err := cs.GenerateIdentity()
if err != nil {
t.Fatalf("identity: %v", err)
}
alicePub := hex.EncodeToString(alice.SignPub)
if err := store.AddUser(alicePub, "alice", RoleAdmin); err != nil {
t.Fatalf("seed admin: %v", err)
}
srv := NewServer(store, blobs, mode)
ts := httptest.NewServer(srv)
t.Cleanup(func() {
ts.Close()
store.Close()
})
return &authHarness{ts: ts, store: store, alice: alice, alicePub: alicePub}
}
// signedReq builds a control-plane request signed by id, with explicit ts/nonce
// so tests can force skew and replay. It signs via the same CanonicalRequest the
// production client uses, so the test verifies the real wire contract.
func signedReq(t *testing.T, base, method, path string, body []byte, id cs.Identity, ts int64, nonce string) *http.Request {
t.Helper()
var rdr io.Reader
if body != nil {
rdr = bytes.NewReader(body)
}
req, err := http.NewRequest(method, base+path, rdr)
if err != nil {
t.Fatalf("new request: %v", err)
}
tss := strconv.FormatInt(ts, 10)
canonical := CanonicalRequest(method, path, tss, nonce, body)
sig := cs.SignEd25519(id.SignPriv, canonical)
req.Header.Set(hdrPub, hex.EncodeToString(id.SignPub))
req.Header.Set(hdrTs, tss)
req.Header.Set(hdrNonce, nonce)
req.Header.Set(hdrSig, base64.StdEncoding.EncodeToString(sig))
return req
}
func do(t *testing.T, req *http.Request) (int, string) {
t.Helper()
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("do request: %v", err)
}
defer resp.Body.Close()
b, _ := io.ReadAll(resp.Body)
return resp.StatusCode, string(b)
}
const okPath = "/members/alice-endpoint/rooms" // always 200 with an empty list
// Golden: a request signed by a registered, active identity is accepted.
func TestAuthGoldenAccepted(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
now := time.Now().Unix()
code, _ := do(t, signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-golden"))
if code != http.StatusOK {
t.Fatalf("golden signed request should be 200, got %d", code)
}
}
// Error path: a structurally valid signature from an identity that is NOT in the
// allowlist is rejected with 401.
func TestAuthUnregisteredRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
bob, _ := cs.GenerateIdentity()
now := time.Now().Unix()
code, body := do(t, signedReq(t, h.ts.URL, "GET", okPath, nil, bob, now, "nonce-bob"))
if code != http.StatusUnauthorized {
t.Fatalf("unregistered identity should be 401, got %d (%s)", code, body)
}
}
// Error path: replaying a captured request (same nonce + signature) is rejected.
func TestAuthReplayRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
now := time.Now().Unix()
first := signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-replay")
if code, body := do(t, first); code != http.StatusOK {
t.Fatalf("first request should be 200, got %d (%s)", code, body)
}
// Identical ts + nonce + signature: a replay.
second := signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, now, "nonce-replay")
if code, body := do(t, second); code != http.StatusUnauthorized {
t.Fatalf("replayed request should be 401, got %d (%s)", code, body)
}
}
// Error path: a timestamp outside the ±30s window is rejected even with a valid
// signature (defends against long-delayed captured requests).
func TestAuthClockSkewRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
stale := time.Now().Unix() - 120
code, body := do(t, signedReq(t, h.ts.URL, "GET", okPath, nil, h.alice, stale, "nonce-skew"))
if code != http.StatusUnauthorized {
t.Fatalf("clock-skewed request should be 401, got %d (%s)", code, body)
}
}
// Error path: tampering the body after signing invalidates the signature.
func TestAuthTamperedBodyRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
now := time.Now().Unix()
req := signedReq(t, h.ts.URL, "POST", "/rooms", []byte(`{"subject":"x"}`), h.alice, now, "nonce-tamper")
// Swap the body for different bytes the signature does not cover.
req.Body = io.NopCloser(bytes.NewReader([]byte(`{"subject":"evil"}`)))
req.ContentLength = int64(len(`{"subject":"evil"}`))
code, body := do(t, req)
if code != http.StatusUnauthorized {
t.Fatalf("tampered body should be 401, got %d (%s)", code, body)
}
}
// Error path: missing auth headers under enforce are rejected.
func TestAuthMissingHeadersRejected(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
req, _ := http.NewRequest("GET", h.ts.URL+okPath, nil)
code, _ := do(t, req)
if code != http.StatusUnauthorized {
t.Fatalf("unsigned request under enforce should be 401, got %d", code)
}
}
// Exemption: the health probe bypasses auth even under enforce.
func TestAuthHealthExempt(t *testing.T) {
h := newAuthHarness(t, AuthEnforce)
req, _ := http.NewRequest("GET", h.ts.URL+"/healthz", nil)
code, _ := do(t, req)
if code != http.StatusOK {
t.Fatalf("/healthz must be reachable without auth, got %d", code)
}
}
// Soft mode: an unauthenticated request is logged but allowed through, so
// clients can migrate without an outage.
func TestAuthSoftAllowsUnauthenticated(t *testing.T) {
h := newAuthHarness(t, AuthSoft)
req, _ := http.NewRequest("GET", h.ts.URL+okPath, nil)
code, _ := do(t, req)
if code != http.StatusOK {
t.Fatalf("soft mode should allow unsigned request, got %d", code)
}
}
// Off mode (default for legacy callers): no verification at all.
func TestAuthOffNoVerification(t *testing.T) {
h := newAuthHarness(t, AuthOff)
req, _ := http.NewRequest("GET", h.ts.URL+okPath, nil)
code, _ := do(t, req)
if code != http.StatusOK {
t.Fatalf("off mode should allow unsigned request, got %d", code)
}
}
+57 -8
View File
@@ -1,14 +1,17 @@
package membership
import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"strconv"
"strings"
"time"
cs "fn-registry/functions/cybersecurity"
@@ -24,20 +27,66 @@ import (
// rate limiting, and read endpoints (GET) are unauthenticated. Hardening
// (mTLS, capabilities, rate limits) is a later phase.
type Server struct {
store *Store
blobs *blobstore.Store
mux *http.ServeMux
store *Store
blobs *blobstore.Store
mux *http.ServeMux
authMode AuthMode
nonces *nonceCache
}
// NewServer wires the membership store and blob store into an http.Handler.
func NewServer(store *Store, blobs *blobstore.Store) *Server {
s := &Server{store: store, blobs: blobs, mux: http.NewServeMux()}
// NewServer wires the membership store and blob store into an http.Handler. The
// authMode selects the control-plane auth rollout state (AuthOff for callers and
// tests that have not migrated to signed requests yet).
func NewServer(store *Store, blobs *blobstore.Store, authMode AuthMode) *Server {
s := &Server{
store: store,
blobs: blobs,
mux: http.NewServeMux(),
authMode: authMode,
nonces: newNonceCache(nonceTTL),
}
s.routes()
return s
}
// ServeHTTP satisfies http.Handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) }
// ServeHTTP satisfies http.Handler. It runs the control-plane auth middleware
// (signature verification + anti-replay + allowlist) ahead of the router
// according to authMode, then dispatches to the matched handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.authMode == AuthOff || isAuthExempt(r) {
s.mux.ServeHTTP(w, r)
return
}
// Buffer the body so the signature can be verified over it and the handler
// still reads it. Bodies on the control plane are small (JSON metadata or a
// media blob already capped upstream), so full buffering is acceptable.
body, err := io.ReadAll(r.Body)
if err != nil {
writeErr(w, http.StatusBadRequest, "read body: "+err.Error())
return
}
_ = r.Body.Close()
r.Body = io.NopCloser(bytes.NewReader(body))
if _, err := s.authenticate(r, body, time.Now()); err != nil {
if s.authMode == AuthSoft {
log.Printf("[auth] soft: would reject %s %s: %v", r.Method, r.URL.Path, err)
s.mux.ServeHTTP(w, r)
return
}
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
return
}
s.mux.ServeHTTP(w, r)
}
// isAuthExempt lists requests that bypass control-plane auth even under enforce.
// Only the unauthenticated health probe qualifies: it carries no data and is
// needed by load balancers / smoke checks / systemd before any identity exists.
func isAuthExempt(r *http.Request) bool {
return r.Method == http.MethodGet && r.URL.Path == "/healthz"
}
func (s *Server) routes() {
s.mux.HandleFunc("GET /healthz", s.handleHealth)
+3 -1
View File
@@ -860,7 +860,9 @@ func main() {
ns.Shutdown()
log.Fatalf("open blob store: %v", err)
}
ctrlSrv := &http.Server{Addr: ctrlAddr, Handler: membership.NewServer(store, blobs)}
// AuthOff: the playground is a local dev gateway that has not migrated to
// signed control-plane requests yet (tracked in phase 0001e of issue 0001).
ctrlSrv := &http.Server{Addr: ctrlAddr, Handler: membership.NewServer(store, blobs, membership.AuthOff)}
go func() {
if err := ctrlSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("control plane: %v", err)