9b96537aa6
A cluster is only as secure as its weakest node: the data plane forwards every
subject between nodes, so one node running without enforced auth lets an
unauthenticated peer Subscribe(">") on it and harvest the traffic forwarded from
the ACL'd nodes.
- validateClusterConfig now takes the auth mode and REFUSES to join a cluster
unless --bus-auth enforce, regardless of bind (a clustered node is a production
node; there is no safe dev cluster without auth). This binary therefore cannot
BE the weak node.
- Server.Posture {enforce,acl,tls,cluster,store} is published on /healthz
(non-secret operational metadata, probe stays unauthenticated) so a monitor or peer
can detect a cluster member not running enforce+ACL+TLS — covering a peer that
runs a tampered/old binary outside this node's control.
Tests:
- TestAttack0008_N1: a clustered node with --bus-auth off is refused; the same
node with enforce + full route security is allowed.
- TestClusterConfigPolicy: extended with off/soft clustered cases (refused) and
the mode parameter throughout.
- TestHealthExposesPosture: /healthz returns the posture booleans + store backend.
CGO_ENABLED=0 go build/vet/test green; govulncheck 0 reachable.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
677 lines
25 KiB
Go
677 lines
25 KiB
Go
package membership
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
cs "fn-registry/functions/cybersecurity"
|
|
|
|
"golang.org/x/time/rate"
|
|
|
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
|
"github.com/enmanuel/unibus/pkg/frame"
|
|
"github.com/nats-io/nats.go/jetstream"
|
|
)
|
|
|
|
// Request-size ceilings for the control plane. The signature is verified over
// the full body, so the body has to be read before the caller is authenticated;
// these limits bound how much an unauthenticated peer can make the server hold
// in RAM while that happens. JSON metadata requests share one ceiling and
// /blobs gets a separate, larger one because media ciphertext is legitimately
// bigger. A request whose declared Content-Length already exceeds its ceiling
// is rejected before a single byte is buffered.
const (
	// maxControlBodyBytes is the per-request ceiling for JSON control-plane
	// bodies: 1 MiB.
	maxControlBodyBytes = 1 << 20

	// maxBlobBytes is the per-request ceiling for one media blob upload: 16 MiB.
	maxBlobBytes = 16 << 20

	// MaxHeaderBytes caps the request header size at 1 MiB. The command wires it
	// into the http.Server; it is exported so the bound can live here, next to
	// the body-size limits.
	MaxHeaderBytes = 1 << 20

	// maxInflightBytes is the GLOBAL cap (128 MiB) on request-body bytes buffered
	// across all concurrent requests (audit N2). The ceilings above bound a
	// single request; this one bounds their sum, so a concurrent, even multi-IP,
	// flood of max-size uploads cannot grow the resident set without limit. It
	// leaves room for roughly 8 concurrent 16 MiB blob uploads or roughly 128
	// concurrent control requests before further POSTs are shed with 503.
	maxInflightBytes = 128 << 20
)
|
|
|
|
// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
|
|
// human/agent bus rather than a high-QPS API: a steady ~20 req/s with a burst of
|
|
// 40 absorbs a chat client's bursty polling while throttling a flood. Loopback
|
|
// dev stacks pass r<=0 to disable limiting entirely.
|
|
const (
|
|
defaultRatePerSec = rate.Limit(20)
|
|
defaultRateBurst = 40
|
|
rateBucketTTL = 10 * time.Minute
|
|
)
|
|
|
|
// Server is the HTTP control plane: the authoritative source of room metadata,
|
|
// membership, and per-epoch sealed keys. The data plane (messages) is NATS.
|
|
//
|
|
// Auth model (v1): mutating endpoints require an Ed25519 signature from the
|
|
// room owner over the canonical bytes of the request (the request body with the
|
|
// "sig" field cleared). v1 trusts the internal network: there is no TLS, no
|
|
// rate limiting, and read endpoints (GET) are unauthenticated. Hardening
|
|
// (mTLS, capabilities, rate limits) is a later phase.
|
|
type Server struct {
|
|
store Store
|
|
blobs blobstore.Store
|
|
mux *http.ServeMux
|
|
authMode AuthMode
|
|
nonces nonceStore
|
|
limiter *ipRateLimiter
|
|
inflight *inflightLimiter
|
|
|
|
// RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS)
|
|
// rooms. It is the minimum-defensive control for the data plane (audit H4):
|
|
// the embedded NATS has no per-subject ACL, so a cleartext room is readable by
|
|
// any registered peer that knows (or guesses) its subject. Forcing every room
|
|
// to be end-to-end encrypted keeps message CONTENT confidential even when the
|
|
// transport offers no subject isolation. The command sets this on a public
|
|
// (non-loopback) bind. See dev/0004d-dataplane-acl.md for the full rationale
|
|
// and the residual metadata exposure this does NOT close.
|
|
RequireEncryptedRooms bool
|
|
|
|
// Posture is the node's security posture, surfaced on /healthz so an operator
|
|
// or a peer can detect a node NOT running the homogeneous enforce+ACL+TLS
|
|
// posture a secure cluster requires (audit 0008 N1). It is set by the command;
|
|
// the zero value (all false) reflects an unsecured dev node.
|
|
Posture Posture
|
|
}
|
|
|
|
// Posture is the security posture a membershipd node runs with. It holds only
// non-secret operational metadata (four booleans and the store backend name)
// and is published on /healthz, so a monitor can flag a cluster member that is
// not enforce+ACL+TLS: the weak node that would let an unauthenticated peer
// harvest the cluster's forwarded traffic (audit 0008 N1).
type Posture struct {
	Enforce bool   `json:"enforce"`
	ACL     bool   `json:"acl"`
	TLS     bool   `json:"tls"`
	Cluster bool   `json:"cluster"`
	Store   string `json:"store"` // backend name: "sqlite" | "kv"
}
|
|
|
|
// NewServer wires the membership store and blob store into an http.Handler. The
|
|
// authMode selects the control-plane auth rollout state (AuthOff for callers and
|
|
// tests that have not migrated to signed requests yet). It installs a per-IP
|
|
// rate limiter with the package defaults; loopback dev behavior is unchanged
|
|
// because the burst comfortably exceeds any single client's request rate.
|
|
func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
|
|
s := &Server{
|
|
store: store,
|
|
blobs: blobs,
|
|
mux: http.NewServeMux(),
|
|
authMode: authMode,
|
|
nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries),
|
|
limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL),
|
|
inflight: newInflightLimiter(maxInflightBytes),
|
|
}
|
|
s.routes()
|
|
return s
|
|
}
|
|
|
|
// UseReplicatedNonces switches the server's anti-replay store from the
|
|
// per-process in-memory cache to a JetStream KV bucket shared across the cluster
|
|
// (issue 0003e). It MUST be called on every node of a multi-node deployment:
|
|
// otherwise a request captured on one node can be replayed to another whose
|
|
// local cache never saw the nonce. replicas is the bucket's replication factor
|
|
// (R1..R3). The TTL matches the in-memory cache (nonceTTL = 2*clockSkew), so a
|
|
// replay can never outlive its memory.
|
|
func (s *Server) UseReplicatedNonces(js jetstream.JetStream, replicas int) error {
|
|
ns, err := newKVNonceStore(js, nonceTTL, replicas, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.nonces = ns
|
|
return nil
|
|
}
|
|
|
|
// ServeHTTP satisfies http.Handler. It runs the control-plane auth middleware
|
|
// (signature verification + anti-replay + allowlist) ahead of the router
|
|
// according to authMode, then dispatches to the matched handler.
|
|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
now := time.Now()
|
|
|
|
// Per-IP rate limit runs first, ahead of auth and body reads, so a flood is
|
|
// shed at the cheapest possible point. The health probe is exempt so liveness
|
|
// checks are never throttled.
|
|
if !isAuthExempt(r) && !s.limiter.allow(clientIP(r), now) {
|
|
writeErr(w, http.StatusTooManyRequests, "rate limit exceeded")
|
|
return
|
|
}
|
|
|
|
// Cap how much body we will buffer, BEFORE reading a single byte. The ceiling
|
|
// is per-route: /blobs may legitimately carry a media ciphertext, everything
|
|
// else is small JSON. A declared Content-Length over the ceiling is rejected
|
|
// outright (no buffering); MaxBytesReader then guards against a lying or
|
|
// chunked sender by failing the read once the limit is crossed. This is the
|
|
// fix for the pre-auth DoS: without it an unauthenticated peer could make the
|
|
// server buffer an unbounded body in RAM before authenticate() ever ran.
|
|
limit := int64(maxControlBodyBytes)
|
|
if r.Method == http.MethodPost && r.URL.Path == "/blobs" {
|
|
limit = int64(maxBlobBytes)
|
|
}
|
|
if r.ContentLength > limit {
|
|
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
|
|
return
|
|
}
|
|
r.Body = http.MaxBytesReader(w, r.Body, limit)
|
|
|
|
// Aggregate memory bound (audit N2): the per-request ceiling above and the
|
|
// per-IP rate limit do not cap the TOTAL bytes buffered across concurrent
|
|
// requests. A POST reserves its worst-case buffered size (its route ceiling)
|
|
// from a global limiter before the body is read, and is shed with 503 when the
|
|
// cap is reached, so the resident set stays bounded under a concurrent (even
|
|
// multi-IP) upload flood instead of growing linearly with the number of
|
|
// connections. Reservation is released when the request finishes. Only POSTs
|
|
// buffer a body; GETs carry none, so they do not consume the budget.
|
|
if r.Method == http.MethodPost {
|
|
if !s.inflight.tryAcquire(limit) {
|
|
writeErr(w, http.StatusServiceUnavailable, "server busy: too many concurrent uploads in flight")
|
|
return
|
|
}
|
|
defer s.inflight.release(limit)
|
|
}
|
|
|
|
if s.authMode == AuthOff || isAuthExempt(r) {
|
|
s.mux.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
|
|
// Buffer the (now bounded) body so the signature can be verified over it and
|
|
// the handler still reads it.
|
|
body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
if isBodyTooLarge(err) {
|
|
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
|
|
return
|
|
}
|
|
writeErr(w, http.StatusBadRequest, "read body")
|
|
return
|
|
}
|
|
_ = r.Body.Close()
|
|
r.Body = io.NopCloser(bytes.NewReader(body))
|
|
|
|
res, err := s.authenticate(r, body, now)
|
|
if err != nil {
|
|
if s.authMode == AuthSoft {
|
|
log.Printf("[auth] soft: would reject %s %s: %v", r.Method, r.URL.Path, err)
|
|
s.mux.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error())
|
|
return
|
|
}
|
|
// Carry the authenticated signer's endpoint into the handler so room handlers
|
|
// can authorize by membership (audit H3). Only set on a verified identity.
|
|
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint)))
|
|
}
|
|
|
|
// isBodyTooLarge reports whether err is, or wraps, the *http.MaxBytesError that
// http.MaxBytesReader returns once a body crosses its limit. Callers use it to
// answer 413 instead of a generic 400. A nil err yields false.
func isBodyTooLarge(err error) bool {
	target := new(*http.MaxBytesError)
	return errors.As(err, target)
}
|
|
|
|
// ctxKey is this package's private type for request-context keys. Because the
// type is unexported, keys of this type cannot collide with keys set by other
// packages.
type ctxKey int

const (
	// ctxSignerEndpoint is the context key under which the authenticated
	// signer's endpoint id is stored.
	ctxSignerEndpoint ctxKey = iota
)
|
|
|
|
// withSigner returns a context carrying the authenticated signer's endpoint id.
|
|
func withSigner(ctx context.Context, endpoint string) context.Context {
|
|
return context.WithValue(ctx, ctxSignerEndpoint, endpoint)
|
|
}
|
|
|
|
// signerEndpoint returns the authenticated signer's endpoint id and whether one
|
|
// is present. It is absent under AuthOff (no verification) and when a soft-mode
|
|
// request was let through unauthenticated — in both cases membership
|
|
// authorization is skipped, preserving dev/legacy behavior.
|
|
func signerEndpoint(r *http.Request) (string, bool) {
|
|
v, ok := r.Context().Value(ctxSignerEndpoint).(string)
|
|
return v, ok && v != ""
|
|
}
|
|
|
|
// requireMember authorizes a room request by membership (audit H3): it returns
|
|
// the signer endpoint and true when the request may proceed, or writes 403 and
|
|
// returns false when an authenticated signer is not a member of roomID. When no
|
|
// authenticated signer is present (AuthOff/dev, or soft pass-through) it allows
|
|
// the request — membership is only enforced once the caller's identity is known.
|
|
func (s *Server) requireMember(w http.ResponseWriter, r *http.Request, roomID string) (string, bool) {
|
|
signer, ok := signerEndpoint(r)
|
|
if !ok {
|
|
return "", true
|
|
}
|
|
if _, err := s.store.GetMember(roomID, signer); err != nil {
|
|
writeErr(w, http.StatusForbidden, "forbidden: not a member of this room")
|
|
return signer, false
|
|
}
|
|
return signer, true
|
|
}
|
|
|
|
// isAuthExempt reports whether r bypasses control-plane auth even under
// enforce. Only the health probe, GET /healthz, qualifies: it carries no data
// and is needed by load balancers, smoke checks and systemd before any
// identity exists.
func isAuthExempt(r *http.Request) bool {
	if r.Method != http.MethodGet {
		return false
	}
	return r.URL.Path == "/healthz"
}
|
|
|
|
func (s *Server) routes() {
|
|
s.mux.HandleFunc("GET /healthz", s.handleHealth)
|
|
s.mux.HandleFunc("POST /rooms", s.handleCreateRoom)
|
|
s.mux.HandleFunc("POST /rooms/{id}/invite", s.handleInvite)
|
|
s.mux.HandleFunc("GET /rooms/{id}/key", s.handleGetKey)
|
|
s.mux.HandleFunc("GET /rooms/{id}/members", s.handleListMembers)
|
|
s.mux.HandleFunc("GET /members/{endpoint}/rooms", s.handleListMemberRooms)
|
|
s.mux.HandleFunc("POST /rooms/{id}/rekey", s.handleRekey)
|
|
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
|
|
s.mux.HandleFunc("POST /blobs", s.handlePutBlob)
|
|
s.mux.HandleFunc("GET /blobs/{hash}", s.handleGetBlob)
|
|
}
|
|
|
|
// ---- wire types -----------------------------------------------------------
|
|
|
|
// policyJSON is the wire form of a room's policy flags.
type policyJSON struct {
	Encrypt  bool `json:"encrypt"`   // room is encrypted
	Persist  bool `json:"persist"`   // persistence flag, stored with the room
	SignMsgs bool `json:"sign_msgs"` // message-signing flag, stored with the room
}
|
|
|
|
// endpointJSON is the wire form of an identity: its endpoint id plus its two
// public keys.
type endpointJSON struct {
	Endpoint string `json:"endpoint"`
	SignPub  []byte `json:"sign_pub"` // public signing key
	KexPub   []byte `json:"kex_pub"`  // public key-exchange key
}
|
|
|
|
type createRoomReq struct {
|
|
Subject string `json:"subject"`
|
|
Policy policyJSON `json:"policy"`
|
|
Owner endpointJSON `json:"owner"`
|
|
SealedKeySelf []byte `json:"sealed_key_self"`
|
|
}
|
|
|
|
// createRoomResp is the success body of POST /rooms: the id of the new room.
type createRoomResp struct {
	RoomID string `json:"room_id"`
}
|
|
|
|
type inviteReq struct {
|
|
By string `json:"by"` // owner endpoint id
|
|
Sig []byte `json:"sig"` // Ed25519 over canonical(request with sig cleared)
|
|
Member endpointJSON `json:"member"`
|
|
SealedKey []byte `json:"sealed_key"`
|
|
}
|
|
|
|
// keyResp is the success body of GET /rooms/{id}/key: a sealed key and the
// epoch the store returned it for.
type keyResp struct {
	Epoch     int    `json:"epoch"`
	SealedKey []byte `json:"sealed_key"`
}
|
|
|
|
// memberJSON is one entry of the GET /rooms/{id}/members response.
type memberJSON struct {
	Endpoint string `json:"endpoint"`
	Role     string `json:"role"`
	SignPub  []byte `json:"sign_pub"`
	KexPub   []byte `json:"kex_pub"`
}
|
|
|
|
type roomResp struct {
|
|
Subject string `json:"subject"`
|
|
Epoch int `json:"epoch"`
|
|
Policy policyJSON `json:"policy"`
|
|
}
|
|
|
|
type memberRoomJSON struct {
|
|
RoomID string `json:"room_id"`
|
|
Subject string `json:"subject"`
|
|
Epoch int `json:"epoch"`
|
|
Policy policyJSON `json:"policy"`
|
|
Role string `json:"role"`
|
|
}
|
|
|
|
// rekeyKey pairs an endpoint with the sealed key to store for it in a rekey
// request.
type rekeyKey struct {
	Endpoint  string `json:"endpoint"`
	SealedKey []byte `json:"sealed_key"`
}
|
|
|
|
type rekeyReq struct {
|
|
By string `json:"by"`
|
|
Sig []byte `json:"sig"`
|
|
NewEpoch int `json:"new_epoch"`
|
|
Keys []rekeyKey `json:"keys"`
|
|
Remove []string `json:"remove"`
|
|
}
|
|
|
|
// blobResp is the success body of POST /blobs: the hash the blob store returned
// for the uploaded data.
type blobResp struct {
	Hash string `json:"hash"`
}
|
|
|
|
// ---- helpers --------------------------------------------------------------
|
|
|
|
// writeJSON sends v as a JSON response with the given status code. The
// Content-Type header is set before the status line is written.
func writeJSON(w http.ResponseWriter, code int, v any) {
	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.WriteHeader(code)

	// Best effort: the status is already on the wire, so an encode or write
	// failure cannot be reported to the client any more.
	enc := json.NewEncoder(w)
	_ = enc.Encode(v)
}
|
|
|
|
func writeErr(w http.ResponseWriter, code int, msg string) {
|
|
writeJSON(w, code, map[string]string{"error": msg})
|
|
}
|
|
|
|
// writeServerErr logs the internal error detail and returns ONLY a generic
|
|
// message to the client (audit H12): raw store/blob errors embed SQL fragments
|
|
// and filesystem paths, which must not leak to a caller. Use it for any error
|
|
// that originates inside the server (5xx, or a not-found wrapping a store error).
|
|
func writeServerErr(w http.ResponseWriter, r *http.Request, code int, publicMsg string, err error) {
|
|
log.Printf("[handler] %s %s -> %d: %v", r.Method, r.URL.Path, code, err)
|
|
writeErr(w, code, publicMsg)
|
|
}
|
|
|
|
// canonicalSig returns the bytes a request signature is verified over: v
// re-marshaled as JSON. The caller passes a copy of the request whose Sig
// field is already zeroed, which is symmetric with how the client signs.
// If v cannot be marshaled the result is nil.
func canonicalSig(v any) []byte {
	encoded, err := json.Marshal(v)
	if err != nil {
		return nil
	}
	return encoded
}
|
|
|
|
// verifyOwnerSig checks that sig is a valid Ed25519 signature by the room owner
|
|
// over canonical(reqWithSigCleared). It returns the owner Member on success.
|
|
func (s *Server) verifyOwnerSig(roomID, by string, sig, canonical []byte) (Member, error) {
|
|
info, err := s.store.GetRoom(roomID)
|
|
if err != nil {
|
|
return Member{}, fmt.Errorf("room not found")
|
|
}
|
|
if by != info.OwnerEndpoint {
|
|
return Member{}, fmt.Errorf("requester %q is not the room owner", by)
|
|
}
|
|
owner, err := s.store.GetMember(roomID, by)
|
|
if err != nil {
|
|
return Member{}, fmt.Errorf("owner member not found")
|
|
}
|
|
if !cs.VerifyEd25519(owner.SignPub, canonical, sig) {
|
|
return Member{}, fmt.Errorf("invalid owner signature")
|
|
}
|
|
return owner, nil
|
|
}
|
|
|
|
// ---- handlers -------------------------------------------------------------
|
|
|
|
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
|
|
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "posture": s.Posture})
|
|
}
|
|
|
|
func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) {
|
|
var req createRoomReq
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
|
|
return
|
|
}
|
|
if req.Subject == "" || req.Owner.Endpoint == "" {
|
|
writeErr(w, http.StatusBadRequest, "subject and owner.endpoint required")
|
|
return
|
|
}
|
|
// Data-plane minimum defense (audit H4): on a public deployment cleartext
|
|
// rooms are disabled, so no message ever rides the un-ACL'd NATS subject in
|
|
// the clear for another registered peer to sniff.
|
|
if s.RequireEncryptedRooms && !req.Policy.Encrypt {
|
|
writeErr(w, http.StatusForbidden,
|
|
"cleartext rooms are disabled on this deployment; create an encrypted (Matrix-policy) room")
|
|
return
|
|
}
|
|
// Owner binding (audit H6): the declared owner must BE the authenticated
|
|
// signer — both the endpoint id and the signing key. Otherwise a registered
|
|
// peer could create rooms in another identity's name. Enforced only when an
|
|
// authenticated signer is present (AuthOff/dev trusts the caller).
|
|
if signer, ok := signerEndpoint(r); ok {
|
|
if req.Owner.Endpoint != signer || frame.EndpointID(req.Owner.SignPub) != signer {
|
|
writeErr(w, http.StatusForbidden, "forbidden: room owner must be the authenticated signer")
|
|
return
|
|
}
|
|
}
|
|
roomID := newULID()
|
|
info := RoomInfo{
|
|
RoomID: roomID,
|
|
Subject: req.Subject,
|
|
Encrypt: req.Policy.Encrypt,
|
|
Persist: req.Policy.Persist,
|
|
SignMsgs: req.Policy.SignMsgs,
|
|
OwnerEndpoint: req.Owner.Endpoint,
|
|
}
|
|
if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil {
|
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusCreated, createRoomResp{RoomID: roomID})
|
|
}
|
|
|
|
func (s *Server) handleInvite(w http.ResponseWriter, r *http.Request) {
|
|
roomID := r.PathValue("id")
|
|
var req inviteReq
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
|
|
return
|
|
}
|
|
// Canonical bytes = the request with Sig cleared.
|
|
sig := req.Sig
|
|
req.Sig = nil
|
|
if _, err := s.verifyOwnerSig(roomID, req.By, sig, canonicalSig(req)); err != nil {
|
|
writeErr(w, http.StatusForbidden, err.Error())
|
|
return
|
|
}
|
|
info, err := s.store.GetRoom(roomID)
|
|
if err != nil {
|
|
writeServerErr(w, r, http.StatusNotFound, "room not found", err)
|
|
return
|
|
}
|
|
m := Member{
|
|
Endpoint: req.Member.Endpoint,
|
|
Role: "member",
|
|
SignPub: req.Member.SignPub,
|
|
KexPub: req.Member.KexPub,
|
|
}
|
|
if err := s.store.AddMember(roomID, m, info.Epoch, req.SealedKey); err != nil {
|
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "invited"})
|
|
}
|
|
|
|
func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) {
|
|
roomID := r.PathValue("id")
|
|
endpoint := r.URL.Query().Get("endpoint")
|
|
if endpoint == "" {
|
|
writeErr(w, http.StatusBadRequest, "endpoint query param required")
|
|
return
|
|
}
|
|
// A sealed room key is sealed to one identity's X25519 key. Serving it only to
|
|
// that identity (the signer) stops a registered peer from harvesting another
|
|
// member's sealed key (audit H3). Membership is implied by owning a sealed key,
|
|
// but we also require the signer to be a member for defense in depth.
|
|
if signer, ok := signerEndpoint(r); ok {
|
|
if endpoint != signer {
|
|
writeErr(w, http.StatusForbidden, "forbidden: may only fetch your own sealed key")
|
|
return
|
|
}
|
|
if _, err := s.store.GetMember(roomID, signer); err != nil {
|
|
writeErr(w, http.StatusForbidden, "forbidden: not a member of this room")
|
|
return
|
|
}
|
|
}
|
|
epoch := 0
|
|
if e := r.URL.Query().Get("epoch"); e != "" {
|
|
if n, err := strconv.Atoi(e); err == nil {
|
|
epoch = n
|
|
}
|
|
}
|
|
ep, sealed, err := s.store.GetSealedKey(roomID, endpoint, epoch)
|
|
if err != nil {
|
|
if errors.Is(err, ErrNotFound) {
|
|
writeErr(w, http.StatusForbidden,
|
|
"not invited to this encrypted room: no key has been sealed for your identity. Ask the room owner to invite you before joining.")
|
|
return
|
|
}
|
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, keyResp{Epoch: ep, SealedKey: sealed})
|
|
}
|
|
|
|
func (s *Server) handleListMembers(w http.ResponseWriter, r *http.Request) {
|
|
roomID := r.PathValue("id")
|
|
// Membership authorization (audit H3): the member list exposes every member's
|
|
// sign_pub + kex_pub, so it must not be served to a non-member.
|
|
if _, ok := s.requireMember(w, r, roomID); !ok {
|
|
return
|
|
}
|
|
members, err := s.store.ListMembers(roomID)
|
|
if err != nil {
|
|
writeErr(w, http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
out := make([]memberJSON, 0, len(members))
|
|
for _, m := range members {
|
|
out = append(out, memberJSON{Endpoint: m.Endpoint, Role: m.Role, SignPub: m.SignPub, KexPub: m.KexPub})
|
|
}
|
|
writeJSON(w, http.StatusOK, out)
|
|
}
|
|
|
|
func (s *Server) handleListMemberRooms(w http.ResponseWriter, r *http.Request) {
|
|
endpoint := r.PathValue("endpoint")
|
|
if endpoint == "" {
|
|
writeErr(w, http.StatusBadRequest, "endpoint required")
|
|
return
|
|
}
|
|
// A peer may only enumerate its OWN room directory (audit H3): otherwise any
|
|
// registered identity could map another's entire social graph of rooms.
|
|
if signer, ok := signerEndpoint(r); ok && endpoint != signer {
|
|
writeErr(w, http.StatusForbidden, "forbidden: may only list your own rooms")
|
|
return
|
|
}
|
|
rooms, err := s.store.ListRoomsForEndpoint(endpoint)
|
|
if err != nil {
|
|
writeErr(w, http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
out := make([]memberRoomJSON, 0, len(rooms))
|
|
for _, rm := range rooms {
|
|
out = append(out, memberRoomJSON{
|
|
RoomID: rm.RoomID,
|
|
Subject: rm.Subject,
|
|
Epoch: rm.Epoch,
|
|
Policy: policyJSON{Encrypt: rm.Encrypt, Persist: rm.Persist, SignMsgs: rm.SignMsgs},
|
|
Role: rm.Role,
|
|
})
|
|
}
|
|
writeJSON(w, http.StatusOK, out)
|
|
}
|
|
|
|
func (s *Server) handleGetRoom(w http.ResponseWriter, r *http.Request) {
|
|
roomID := r.PathValue("id")
|
|
if _, ok := s.requireMember(w, r, roomID); !ok {
|
|
return
|
|
}
|
|
info, err := s.store.GetRoom(roomID)
|
|
if err != nil {
|
|
writeErr(w, http.StatusNotFound, "room not found")
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, roomResp{
|
|
Subject: info.Subject,
|
|
Epoch: info.Epoch,
|
|
Policy: policyJSON{Encrypt: info.Encrypt, Persist: info.Persist, SignMsgs: info.SignMsgs},
|
|
})
|
|
}
|
|
|
|
func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) {
|
|
roomID := r.PathValue("id")
|
|
var req rekeyReq
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeErr(w, http.StatusBadRequest, "bad json: "+err.Error())
|
|
return
|
|
}
|
|
sig := req.Sig
|
|
req.Sig = nil
|
|
if _, err := s.verifyOwnerSig(roomID, req.By, sig, canonicalSig(req)); err != nil {
|
|
writeErr(w, http.StatusForbidden, err.Error())
|
|
return
|
|
}
|
|
if req.NewEpoch <= 0 {
|
|
writeErr(w, http.StatusBadRequest, "new_epoch must be > 0")
|
|
return
|
|
}
|
|
// Bump epoch, then store the fresh sealed keys for the remaining members,
|
|
// then remove the kicked/left members.
|
|
if err := s.store.BumpEpoch(roomID, req.NewEpoch); err != nil {
|
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
|
return
|
|
}
|
|
keys := make(map[string][]byte, len(req.Keys))
|
|
for _, k := range req.Keys {
|
|
keys[k.Endpoint] = k.SealedKey
|
|
}
|
|
if len(keys) > 0 {
|
|
if err := s.store.PutSealedKeys(roomID, req.NewEpoch, keys); err != nil {
|
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
|
return
|
|
}
|
|
}
|
|
for _, ep := range req.Remove {
|
|
if err := s.store.RemoveMember(roomID, ep); err != nil {
|
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
|
return
|
|
}
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"status": "rekeyed", "epoch": req.NewEpoch})
|
|
}
|
|
|
|
func (s *Server) handlePutBlob(w http.ResponseWriter, r *http.Request) {
|
|
// The body arrives already bounded: ServeHTTP wraps it in a MaxBytesReader
|
|
// (maxBlobBytes) and rejects an over-declared Content-Length before this
|
|
// handler runs, in every auth mode. Reading here therefore cannot buffer
|
|
// more than the ceiling; a sender that lies about its length (e.g. chunked)
|
|
// trips MaxBytesReader and we map that to 413 rather than a generic 400.
|
|
data, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
if isBodyTooLarge(err) {
|
|
writeErr(w, http.StatusRequestEntityTooLarge, "request body too large")
|
|
return
|
|
}
|
|
writeErr(w, http.StatusBadRequest, "read body")
|
|
return
|
|
}
|
|
hash, err := s.blobs.Put(data)
|
|
if err != nil {
|
|
writeServerErr(w, r, http.StatusInternalServerError, "internal error", err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, blobResp{Hash: hash})
|
|
}
|
|
|
|
func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) {
|
|
hash := r.PathValue("hash")
|
|
if strings.ContainsAny(hash, "/\\.") {
|
|
writeErr(w, http.StatusBadRequest, "invalid hash")
|
|
return
|
|
}
|
|
data, err := s.blobs.Get(hash)
|
|
if err != nil {
|
|
writeServerErr(w, r, http.StatusNotFound, "not found", err)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/octet-stream")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write(data)
|
|
}
|