package membership

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"strconv"
	"strings"
	"time"

	cs "fn-registry/functions/cybersecurity"
	"golang.org/x/time/rate"

	"github.com/enmanuel/unibus/pkg/blobstore"
	"github.com/enmanuel/unibus/pkg/frame"
	"github.com/nats-io/nats.go/jetstream"
)

// Body-size ceilings for the control plane. They bound how much an unauthenticated
// peer can make the server buffer in RAM before the request is even authenticated
// (the signature is verified over the full body, so the body must be read — but
// not unboundedly). maxControlBodyBytes covers JSON metadata requests; /blobs gets
// a separate, larger ceiling because media ciphertext is legitimately bigger. A
// request whose declared Content-Length already exceeds its ceiling is rejected
// before a single byte is buffered.
const (
	maxControlBodyBytes = 1 << 20  // 1 MiB for JSON control-plane requests
	maxBlobBytes        = 16 << 20 // 16 MiB for a single media blob upload

	// MaxHeaderBytes caps request header size; wired into the http.Server by the
	// command. Exported so the bound lives next to its body-size siblings. The
	// value is the same as net/http's DefaultMaxHeaderBytes; it is spelled out
	// here so the limit is explicit rather than inherited.
	MaxHeaderBytes = 1 << 20 // 1 MiB

	// maxInflightBytes is the GLOBAL cap on request-body bytes buffered across all
	// concurrent requests (audit N2). The per-request ceilings above bound one
	// request; this bounds the sum, so a concurrent (even multi-IP) flood of
	// max-size uploads cannot drive the resident set without limit. 128 MiB allows
	// ~8 concurrent 16 MiB blob uploads or ~128 concurrent control requests before
	// further POSTs are shed with 503 — generous for an interactive bus, bounded
	// for an attacker.
	maxInflightBytes = 128 << 20 // 128 MiB
)

// Per-IP rate-limit defaults for the control plane. Tuned for an interactive
// human/agent bus rather than a high-QPS API: a steady ~20 req/s with a burst of
// 40 absorbs a chat client's bursty polling while throttling a flood. Loopback
// dev stacks pass r<=0 to disable limiting entirely.
const ( defaultRatePerSec = rate.Limit(20) defaultRateBurst = 40 rateBucketTTL = 10 * time.Minute ) // Server is the HTTP control plane: the authoritative source of room metadata, // membership, and per-epoch sealed keys. The data plane (messages) is NATS. // // Auth model (v1): mutating endpoints require an Ed25519 signature from the // room owner over the canonical bytes of the request (the request body with the // "sig" field cleared). v1 trusts the internal network: there is no TLS, no // rate limiting, and read endpoints (GET) are unauthenticated. Hardening // (mTLS, capabilities, rate limits) is a later phase. type Server struct { store Store blobs blobstore.Store mux *http.ServeMux authMode AuthMode nonces nonceStore limiter *ipRateLimiter inflight *inflightLimiter // RequireEncryptedRooms, when true, refuses to create cleartext (ModeNATS) // rooms. It is the minimum-defensive control for the data plane (audit H4): // the embedded NATS has no per-subject ACL, so a cleartext room is readable by // any registered peer that knows (or guesses) its subject. Forcing every room // to be end-to-end encrypted keeps message CONTENT confidential even when the // transport offers no subject isolation. The command sets this on a public // (non-loopback) bind. See dev/0004d-dataplane-acl.md for the full rationale // and the residual metadata exposure this does NOT close. RequireEncryptedRooms bool // Posture is the node's security posture, surfaced on /healthz so an operator // or a peer can detect a node NOT running the homogeneous enforce+ACL+TLS // posture a secure cluster requires (audit 0008 N1). It is set by the command; // the zero value (all false) reflects an unsecured dev node. Posture Posture } // Posture describes the security posture a membershipd node runs with. 
It is // non-secret operational metadata (booleans + the store backend name), published // on /healthz so a monitor can flag a cluster member that is not enforce+ACL+TLS // — the weak node that would let an unauthenticated peer harvest the cluster's // forwarded traffic (audit 0008 N1). type Posture struct { Enforce bool `json:"enforce"` ACL bool `json:"acl"` TLS bool `json:"tls"` Cluster bool `json:"cluster"` Store string `json:"store"` // "sqlite" | "kv" } // NewServer wires the membership store and blob store into an http.Handler. The // authMode selects the control-plane auth rollout state (AuthOff for callers and // tests that have not migrated to signed requests yet). It installs a per-IP // rate limiter with the package defaults; loopback dev behavior is unchanged // because the burst comfortably exceeds any single client's request rate. func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server { s := &Server{ store: store, blobs: blobs, mux: http.NewServeMux(), authMode: authMode, nonces: newMemNonceCache(nonceTTL, maxNonceCacheEntries), limiter: newIPRateLimiter(defaultRatePerSec, defaultRateBurst, rateBucketTTL), inflight: newInflightLimiter(maxInflightBytes), } s.routes() return s } // UseReplicatedNonces switches the server's anti-replay store from the // per-process in-memory cache to a JetStream KV bucket shared across the cluster // (issue 0003e). It MUST be called on every node of a multi-node deployment: // otherwise a request captured on one node can be replayed to another whose // local cache never saw the nonce. replicas is the bucket's replication factor // (R1..R3). The TTL matches the in-memory cache (nonceTTL = 2*clockSkew), so a // replay can never outlive its memory. func (s *Server) UseReplicatedNonces(js jetstream.JetStream, replicas int) error { ns, err := newKVNonceStore(js, nonceTTL, replicas, 0) if err != nil { return err } s.nonces = ns return nil } // ServeHTTP satisfies http.Handler. 
It runs the control-plane auth middleware // (signature verification + anti-replay + allowlist) ahead of the router // according to authMode, then dispatches to the matched handler. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { now := time.Now() // Per-IP rate limit runs first, ahead of auth and body reads, so a flood is // shed at the cheapest possible point. ONLY the health probe is exempt so // liveness checks are never throttled — note this is isRateExempt, NOT // isAuthExempt: POST /register is auth-exempt (no admin signature) but stays // rate-limited, since it is the one un-signed path that mutates the allowlist. if !isRateExempt(r) && !s.limiter.allow(clientIP(r), now) { writeErr(w, http.StatusTooManyRequests, "rate limit exceeded") return } // Cap how much body we will buffer, BEFORE reading a single byte. The ceiling // is per-route: /blobs may legitimately carry a media ciphertext, everything // else is small JSON. A declared Content-Length over the ceiling is rejected // outright (no buffering); MaxBytesReader then guards against a lying or // chunked sender by failing the read once the limit is crossed. This is the // fix for the pre-auth DoS: without it an unauthenticated peer could make the // server buffer an unbounded body in RAM before authenticate() ever ran. limit := int64(maxControlBodyBytes) if r.Method == http.MethodPost && r.URL.Path == "/blobs" { limit = int64(maxBlobBytes) } if r.ContentLength > limit { writeErr(w, http.StatusRequestEntityTooLarge, "request body too large") return } r.Body = http.MaxBytesReader(w, r.Body, limit) // Aggregate memory bound (audit N2): the per-request ceiling above and the // per-IP rate limit do not cap the TOTAL bytes buffered across concurrent // requests. 
A POST reserves its worst-case buffered size (its route ceiling) // from a global limiter before the body is read, and is shed with 503 when the // cap is reached, so the resident set stays bounded under a concurrent (even // multi-IP) upload flood instead of growing linearly with the number of // connections. Reservation is released when the request finishes. Only POSTs // buffer a body; GETs carry none, so they do not consume the budget. if r.Method == http.MethodPost { if !s.inflight.tryAcquire(limit) { writeErr(w, http.StatusServiceUnavailable, "server busy: too many concurrent uploads in flight") return } defer s.inflight.release(limit) } if s.authMode == AuthOff || isAuthExempt(r) { s.mux.ServeHTTP(w, r) return } // Buffer the (now bounded) body so the signature can be verified over it and // the handler still reads it. body, err := io.ReadAll(r.Body) if err != nil { if isBodyTooLarge(err) { writeErr(w, http.StatusRequestEntityTooLarge, "request body too large") return } writeErr(w, http.StatusBadRequest, "read body") return } _ = r.Body.Close() r.Body = io.NopCloser(bytes.NewReader(body)) res, err := s.authenticate(r, body, now) if err != nil { if s.authMode == AuthSoft { log.Printf("[auth] soft: would reject %s %s: %v", r.Method, r.URL.Path, err) s.mux.ServeHTTP(w, r) return } writeErr(w, http.StatusUnauthorized, "unauthorized: "+err.Error()) return } // Carry the authenticated signer's endpoint AND signing key into the handler. // Room handlers authorize by membership via the endpoint (audit H3); the // user-management handlers authorize by role via the signing key (the endpoint // id is a one-way hash of the key, so it cannot be reversed to look the signer // up in the user allowlist). Both are set only on a verified identity. 
s.mux.ServeHTTP(w, r.WithContext(withSigner(r.Context(), res.endpoint, res.pubHex))) } // isBodyTooLarge reports whether err is the sentinel returned by MaxBytesReader // when the body exceeds its limit, so the middleware can map it to 413. func isBodyTooLarge(err error) bool { var maxErr *http.MaxBytesError return errors.As(err, &maxErr) } // ctxKey is the unexported type for this package's request-context keys, so the // values cannot collide with keys set by other packages. type ctxKey int const ( ctxSignerEndpoint ctxKey = iota ctxSignerPub ) // withSigner returns a context carrying the authenticated signer's endpoint id // and signing public key (lowercase hex). The endpoint authorizes room // membership; the signing key authorizes user-management by role, because the // endpoint id is a one-way hash of the key (base64url(sha256(signPub))) and so // cannot be reversed to look the signer up in the user allowlist. func withSigner(ctx context.Context, endpoint, pubHex string) context.Context { ctx = context.WithValue(ctx, ctxSignerEndpoint, endpoint) return context.WithValue(ctx, ctxSignerPub, pubHex) } // signerEndpoint returns the authenticated signer's endpoint id and whether one // is present. It is absent under AuthOff (no verification) and when a soft-mode // request was let through unauthenticated — in both cases membership // authorization is skipped, preserving dev/legacy behavior. func signerEndpoint(r *http.Request) (string, bool) { v, ok := r.Context().Value(ctxSignerEndpoint).(string) return v, ok && v != "" } // signerPubHex returns the authenticated signer's signing public key (lowercase // hex) and whether one is present. Like signerEndpoint it is absent under // AuthOff and on a soft-mode pass-through; the user-management handlers treat // that absence as "no admin identity" and deny (default-deny), since a // privilege-granting operation must never run without a verified admin. 
func signerPubHex(r *http.Request) (string, bool) { v, ok := r.Context().Value(ctxSignerPub).(string) return v, ok && v != "" } // requireMember authorizes a room request by membership (audit H3): it returns // the signer endpoint and true when the request may proceed, or writes 403 and // returns false when an authenticated signer is not a member of roomID. When no // authenticated signer is present (AuthOff/dev, or soft pass-through) it allows // the request — membership is only enforced once the caller's identity is known. func (s *Server) requireMember(w http.ResponseWriter, r *http.Request, roomID string) (string, bool) { signer, ok := signerEndpoint(r) if !ok { return "", true } if _, err := s.store.GetMember(roomID, signer); err != nil { writeErr(w, http.StatusForbidden, "forbidden: not a member of this room") return signer, false } return signer, true } // requireAdmin authorizes a user-management request: it returns the signer's // signing-key hex and true ONLY when the authenticated signer is a user with // role admin and active status; otherwise it writes 403 and returns false. // // Default-deny, with no dev relaxation: unlike requireMember (which allows a // request when no authenticated signer is present, preserving AuthOff/dev // behavior for room reads), this denies whenever the signer is absent or is not // a verified active admin. The user-management endpoints grant and revoke bus // access, so they must never be reachable without a verified admin identity — // the store is consulted on every call so a just-revoked admin is denied // immediately, and any store error fails closed. 
func (s *Server) requireAdmin(w http.ResponseWriter, r *http.Request) (string, bool) { pubHex, ok := signerPubHex(r) if !ok { writeErr(w, http.StatusForbidden, "forbidden: admin role required") return "", false } u, err := s.store.GetUser(pubHex) if err != nil || u.Role != RoleAdmin || u.Status != StatusActive { writeErr(w, http.StatusForbidden, "forbidden: admin role required") return "", false } return pubHex, true } // isRateExempt lists requests that bypass the per-IP rate limiter. Only the // health probe qualifies: a load balancer / systemd / smoke check polls it and // must never be throttled. Everything else — including POST /register — is rate // limited. func isRateExempt(r *http.Request) bool { return r.Method == http.MethodGet && r.URL.Path == "/healthz" } // isAuthExempt lists requests that bypass control-plane signature auth even under // enforce. Two qualify: // - GET /healthz: carries no data, needed before any identity exists. // - POST /register: the wallet-model join path. The registering identity is not // yet in the allowlist, so it CANNOT produce an accepted admin signature; // authorization is the single-use bearer invite token, validated inside the // handler (ConsumeInvite). It stays rate-limited (see isRateExempt) and // strictly validates the hex keys before spending the token. 
func isAuthExempt(r *http.Request) bool { if r.Method == http.MethodGet && r.URL.Path == "/healthz" { return true } return r.Method == http.MethodPost && r.URL.Path == "/register" } func (s *Server) routes() { s.mux.HandleFunc("GET /healthz", s.handleHealth) s.mux.HandleFunc("POST /rooms", s.handleCreateRoom) s.mux.HandleFunc("POST /rooms/{id}/invite", s.handleInvite) s.mux.HandleFunc("GET /rooms/{id}/key", s.handleGetKey) s.mux.HandleFunc("GET /rooms/{id}/members", s.handleListMembers) s.mux.HandleFunc("GET /members/{endpoint}/rooms", s.handleListMemberRooms) s.mux.HandleFunc("POST /rooms/{id}/rekey", s.handleRekey) s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom) s.mux.HandleFunc("POST /blobs", s.handlePutBlob) s.mux.HandleFunc("GET /blobs/{hash}", s.handleGetBlob) // User-management (admin-only) — the HTTP-signed equivalent of the local // `membershipd user` CLI, so the admin panel manages the bus allowlist by // signing as an admin instead of needing direct store/KV access. All three // pass through requireAdmin; they hit the same store the room handlers do. s.mux.HandleFunc("GET /users", s.handleListUsers) s.mux.HandleFunc("POST /users", s.handleAddUser) s.mux.HandleFunc("POST /users/{signpub}/revoke", s.handleRevokeUser) // Hard-delete (purge) a user — distinct from revoke (status flip). Admin-only. s.mux.HandleFunc("DELETE /users/{signpub}", s.handleDeleteUser) // Invites — the wallet-model account-creation path. The admin mints a // single-use link (POST /invites, admin-only); the new user's client redeems // it without an admin signature (POST /register, token-authorized). Listing // and cancelling a pending invite are admin-only. 
s.mux.HandleFunc("POST /invites", s.handleCreateInvite) s.mux.HandleFunc("GET /invites", s.handleListInvites) s.mux.HandleFunc("DELETE /invites/{token}", s.handleCancelInvite) s.mux.HandleFunc("POST /register", s.handleRegister) } // ---- wire types ----------------------------------------------------------- type policyJSON struct { Encrypt bool `json:"encrypt"` Persist bool `json:"persist"` SignMsgs bool `json:"sign_msgs"` } type endpointJSON struct { Endpoint string `json:"endpoint"` SignPub []byte `json:"sign_pub"` KexPub []byte `json:"kex_pub"` } type createRoomReq struct { Subject string `json:"subject"` Policy policyJSON `json:"policy"` Owner endpointJSON `json:"owner"` SealedKeySelf []byte `json:"sealed_key_self"` } type createRoomResp struct { RoomID string `json:"room_id"` } type inviteReq struct { By string `json:"by"` // owner endpoint id Sig []byte `json:"sig"` // Ed25519 over canonical(request with sig cleared) Member endpointJSON `json:"member"` SealedKey []byte `json:"sealed_key"` } type keyResp struct { Epoch int `json:"epoch"` SealedKey []byte `json:"sealed_key"` } type memberJSON struct { Endpoint string `json:"endpoint"` Role string `json:"role"` SignPub []byte `json:"sign_pub"` KexPub []byte `json:"kex_pub"` } type roomResp struct { Subject string `json:"subject"` Epoch int `json:"epoch"` Policy policyJSON `json:"policy"` } type memberRoomJSON struct { RoomID string `json:"room_id"` Subject string `json:"subject"` Epoch int `json:"epoch"` Policy policyJSON `json:"policy"` Role string `json:"role"` } type rekeyKey struct { Endpoint string `json:"endpoint"` SealedKey []byte `json:"sealed_key"` } type rekeyReq struct { By string `json:"by"` Sig []byte `json:"sig"` NewEpoch int `json:"new_epoch"` Keys []rekeyKey `json:"keys"` Remove []string `json:"remove"` } type blobResp struct { Hash string `json:"hash"` } // userJSON is the wire representation of a bus user on the admin endpoints. 
It // carries the full record the panel needs to render the allowlist, including // status (so revoked users are visible) and the timestamps. revoked_at is // omitted for an active user. type userJSON struct { SignPub string `json:"sign_pub"` Handle string `json:"handle"` Role string `json:"role"` Status string `json:"status"` CreatedAt string `json:"created_at"` RevokedAt string `json:"revoked_at,omitempty"` } // addUserReq is the POST /users body: the new user's Ed25519 signing key // (64-hex), human handle, and role. role is optional and defaults to member. type addUserReq struct { SignPub string `json:"sign_pub"` Handle string `json:"handle"` Role string `json:"role"` } // createInviteReq is the POST /invites body (admin-only): the handle and role the // future user will receive (fixed here, NOT chosen by the registering client) and // an optional TTL in seconds (non-positive uses the 7-day default). type createInviteReq struct { Handle string `json:"handle"` Role string `json:"role"` TTLSecs int `json:"ttl_secs"` } // createInviteResp is the POST /invites reply: the bearer token to put in the // join link and its absolute expiry. The token is shown ONCE here; the admin // copies the link immediately. type createInviteResp struct { Token string `json:"token"` ExpiresAt string `json:"expires_at"` } // inviteJSON is the wire representation of a pending invite on GET /invites. It // omits the audit fields (used_*) because the listing is of pending invites only; // used_at is carried so a client can render "expires in N". type inviteJSON struct { Token string `json:"token"` Handle string `json:"handle"` Role string `json:"role"` ExpiresAt string `json:"expires_at"` Used bool `json:"used"` CreatedAt string `json:"created_at"` } // registerReq is the POST /register body. It is the ONLY allowlist-mutating // request that carries no admin signature: the bearer Token authorizes it. 
The // client supplies its freshly-generated public keys (sign_pub = Ed25519 identity, // kex_pub = X25519 key-exchange), both 64-hex. The handle and role come from the // invite, never from this body — the client cannot escalate. type registerReq struct { Token string `json:"token"` SignPub string `json:"sign_pub"` KexPub string `json:"kex_pub"` } // ---- helpers -------------------------------------------------------------- func writeJSON(w http.ResponseWriter, code int, v any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) _ = json.NewEncoder(w).Encode(v) } func writeErr(w http.ResponseWriter, code int, msg string) { writeJSON(w, code, map[string]string{"error": msg}) } // writeServerErr logs the internal error detail and returns ONLY a generic // message to the client (audit H12): raw store/blob errors embed SQL fragments // and filesystem paths, which must not leak to a caller. Use it for any error // that originates inside the server (5xx, or a not-found wrapping a store error). func writeServerErr(w http.ResponseWriter, r *http.Request, code int, publicMsg string, err error) { log.Printf("[handler] %s %s -> %d: %v", r.Method, r.URL.Path, code, err) writeErr(w, code, publicMsg) } // canonicalSig returns the bytes to verify for a request: the request struct // re-marshaled with its Sig field cleared. The caller passes a copy with Sig // already zeroed. This is symmetric with how the client signs. func canonicalSig(v any) []byte { b, _ := json.Marshal(v) return b } // verifyOwnerSig checks that sig is a valid Ed25519 signature by the room owner // over canonical(reqWithSigCleared). It returns the owner Member on success. 
func (s *Server) verifyOwnerSig(roomID, by string, sig, canonical []byte) (Member, error) { info, err := s.store.GetRoom(roomID) if err != nil { return Member{}, fmt.Errorf("room not found") } if by != info.OwnerEndpoint { return Member{}, fmt.Errorf("requester %q is not the room owner", by) } owner, err := s.store.GetMember(roomID, by) if err != nil { return Member{}, fmt.Errorf("owner member not found") } if !cs.VerifyEd25519(owner.SignPub, canonical, sig) { return Member{}, fmt.Errorf("invalid owner signature") } return owner, nil } // ---- handlers ------------------------------------------------------------- func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "posture": s.Posture}) } func (s *Server) handleCreateRoom(w http.ResponseWriter, r *http.Request) { var req createRoomReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) return } if req.Subject == "" || req.Owner.Endpoint == "" { writeErr(w, http.StatusBadRequest, "subject and owner.endpoint required") return } // Data-plane minimum defense (audit H4): on a public deployment cleartext // rooms are disabled, so no message ever rides the un-ACL'd NATS subject in // the clear for another registered peer to sniff. if s.RequireEncryptedRooms && !req.Policy.Encrypt { writeErr(w, http.StatusForbidden, "cleartext rooms are disabled on this deployment; create an encrypted (Matrix-policy) room") return } // Owner binding (audit H6): the declared owner must BE the authenticated // signer — both the endpoint id and the signing key. Otherwise a registered // peer could create rooms in another identity's name. Enforced only when an // authenticated signer is present (AuthOff/dev trusts the caller). 
if signer, ok := signerEndpoint(r); ok { if req.Owner.Endpoint != signer || frame.EndpointID(req.Owner.SignPub) != signer { writeErr(w, http.StatusForbidden, "forbidden: room owner must be the authenticated signer") return } } roomID := newULID() info := RoomInfo{ RoomID: roomID, Subject: req.Subject, Encrypt: req.Policy.Encrypt, Persist: req.Policy.Persist, SignMsgs: req.Policy.SignMsgs, OwnerEndpoint: req.Owner.Endpoint, } if err := s.store.CreateRoom(info, req.Owner.SignPub, req.Owner.KexPub, req.SealedKeySelf); err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } writeJSON(w, http.StatusCreated, createRoomResp{RoomID: roomID}) } func (s *Server) handleInvite(w http.ResponseWriter, r *http.Request) { roomID := r.PathValue("id") var req inviteReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) return } // Canonical bytes = the request with Sig cleared. sig := req.Sig req.Sig = nil if _, err := s.verifyOwnerSig(roomID, req.By, sig, canonicalSig(req)); err != nil { writeErr(w, http.StatusForbidden, err.Error()) return } info, err := s.store.GetRoom(roomID) if err != nil { writeServerErr(w, r, http.StatusNotFound, "room not found", err) return } m := Member{ Endpoint: req.Member.Endpoint, Role: "member", SignPub: req.Member.SignPub, KexPub: req.Member.KexPub, } if err := s.store.AddMember(roomID, m, info.Epoch, req.SealedKey); err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } writeJSON(w, http.StatusOK, map[string]string{"status": "invited"}) } func (s *Server) handleGetKey(w http.ResponseWriter, r *http.Request) { roomID := r.PathValue("id") endpoint := r.URL.Query().Get("endpoint") if endpoint == "" { writeErr(w, http.StatusBadRequest, "endpoint query param required") return } // A sealed room key is sealed to one identity's X25519 key. 
Serving it only to // that identity (the signer) stops a registered peer from harvesting another // member's sealed key (audit H3). Membership is implied by owning a sealed key, // but we also require the signer to be a member for defense in depth. if signer, ok := signerEndpoint(r); ok { if endpoint != signer { writeErr(w, http.StatusForbidden, "forbidden: may only fetch your own sealed key") return } if _, err := s.store.GetMember(roomID, signer); err != nil { writeErr(w, http.StatusForbidden, "forbidden: not a member of this room") return } } epoch := 0 if e := r.URL.Query().Get("epoch"); e != "" { if n, err := strconv.Atoi(e); err == nil { epoch = n } } ep, sealed, err := s.store.GetSealedKey(roomID, endpoint, epoch) if err != nil { if errors.Is(err, ErrNotFound) { writeErr(w, http.StatusForbidden, "not invited to this encrypted room: no key has been sealed for your identity. Ask the room owner to invite you before joining.") return } writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } writeJSON(w, http.StatusOK, keyResp{Epoch: ep, SealedKey: sealed}) } func (s *Server) handleListMembers(w http.ResponseWriter, r *http.Request) { roomID := r.PathValue("id") // Membership authorization (audit H3): the member list exposes every member's // sign_pub + kex_pub, so it must not be served to a non-member. 
if _, ok := s.requireMember(w, r, roomID); !ok { return } members, err := s.store.ListMembers(roomID) if err != nil { writeErr(w, http.StatusInternalServerError, "internal error") return } out := make([]memberJSON, 0, len(members)) for _, m := range members { out = append(out, memberJSON{Endpoint: m.Endpoint, Role: m.Role, SignPub: m.SignPub, KexPub: m.KexPub}) } writeJSON(w, http.StatusOK, out) } func (s *Server) handleListMemberRooms(w http.ResponseWriter, r *http.Request) { endpoint := r.PathValue("endpoint") if endpoint == "" { writeErr(w, http.StatusBadRequest, "endpoint required") return } // A peer may only enumerate its OWN room directory (audit H3): otherwise any // registered identity could map another's entire social graph of rooms. if signer, ok := signerEndpoint(r); ok && endpoint != signer { writeErr(w, http.StatusForbidden, "forbidden: may only list your own rooms") return } rooms, err := s.store.ListRoomsForEndpoint(endpoint) if err != nil { writeErr(w, http.StatusInternalServerError, "internal error") return } out := make([]memberRoomJSON, 0, len(rooms)) for _, rm := range rooms { out = append(out, memberRoomJSON{ RoomID: rm.RoomID, Subject: rm.Subject, Epoch: rm.Epoch, Policy: policyJSON{Encrypt: rm.Encrypt, Persist: rm.Persist, SignMsgs: rm.SignMsgs}, Role: rm.Role, }) } writeJSON(w, http.StatusOK, out) } func (s *Server) handleGetRoom(w http.ResponseWriter, r *http.Request) { roomID := r.PathValue("id") if _, ok := s.requireMember(w, r, roomID); !ok { return } info, err := s.store.GetRoom(roomID) if err != nil { writeErr(w, http.StatusNotFound, "room not found") return } writeJSON(w, http.StatusOK, roomResp{ Subject: info.Subject, Epoch: info.Epoch, Policy: policyJSON{Encrypt: info.Encrypt, Persist: info.Persist, SignMsgs: info.SignMsgs}, }) } func (s *Server) handleRekey(w http.ResponseWriter, r *http.Request) { roomID := r.PathValue("id") var req rekeyReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeErr(w, 
http.StatusBadRequest, "bad json: "+err.Error()) return } sig := req.Sig req.Sig = nil if _, err := s.verifyOwnerSig(roomID, req.By, sig, canonicalSig(req)); err != nil { writeErr(w, http.StatusForbidden, err.Error()) return } if req.NewEpoch <= 0 { writeErr(w, http.StatusBadRequest, "new_epoch must be > 0") return } // Bump epoch, then store the fresh sealed keys for the remaining members, // then remove the kicked/left members. if err := s.store.BumpEpoch(roomID, req.NewEpoch); err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } keys := make(map[string][]byte, len(req.Keys)) for _, k := range req.Keys { keys[k.Endpoint] = k.SealedKey } if len(keys) > 0 { if err := s.store.PutSealedKeys(roomID, req.NewEpoch, keys); err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } } for _, ep := range req.Remove { if err := s.store.RemoveMember(roomID, ep); err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } } writeJSON(w, http.StatusOK, map[string]any{"status": "rekeyed", "epoch": req.NewEpoch}) } func (s *Server) handlePutBlob(w http.ResponseWriter, r *http.Request) { // The body arrives already bounded: ServeHTTP wraps it in a MaxBytesReader // (maxBlobBytes) and rejects an over-declared Content-Length before this // handler runs, in every auth mode. Reading here therefore cannot buffer // more than the ceiling; a sender that lies about its length (e.g. chunked) // trips MaxBytesReader and we map that to 413 rather than a generic 400. 
data, err := io.ReadAll(r.Body) if err != nil { if isBodyTooLarge(err) { writeErr(w, http.StatusRequestEntityTooLarge, "request body too large") return } writeErr(w, http.StatusBadRequest, "read body") return } hash, err := s.blobs.Put(data) if err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } writeJSON(w, http.StatusOK, blobResp{Hash: hash}) } func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) { hash := r.PathValue("hash") if strings.ContainsAny(hash, "/\\.") { writeErr(w, http.StatusBadRequest, "invalid hash") return } data, err := s.blobs.Get(hash) if err != nil { writeServerErr(w, r, http.StatusNotFound, "not found", err) return } w.Header().Set("Content-Type", "application/octet-stream") w.WriteHeader(http.StatusOK) _, _ = w.Write(data) } // ---- user-management handlers (admin-only) -------------------------------- // handleListUsers returns the full bus allowlist, including revoked users, so an // admin sees the complete picture (a revoked identity stays auditable). Admin-only. func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) { if _, ok := s.requireAdmin(w, r); !ok { return } users, err := s.store.ListUsers() if err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } out := make([]userJSON, 0, len(users)) for _, u := range users { out = append(out, userJSON{ SignPub: u.SignPub, Handle: u.Handle, Role: u.Role, Status: u.Status, CreatedAt: u.CreatedAt, RevokedAt: u.RevokedAt, }) } writeJSON(w, http.StatusOK, out) } // handleAddUser registers a new bus user from an admin-supplied Ed25519 signing // key. It mirrors the `membershipd user add` CLI: the key must be 64-hex, the // role must be admin or member (empty defaults to member), and re-adding an // already-registered key is a 409 that leaves the existing row untouched — no // silent upsert that could flip a role or clobber status. Admin-only. 
func (s *Server) handleAddUser(w http.ResponseWriter, r *http.Request) { if _, ok := s.requireAdmin(w, r); !ok { return } var req addUserReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) return } if req.SignPub == "" || req.Handle == "" { writeErr(w, http.StatusBadRequest, "sign_pub and handle required") return } if err := ValidateSignPubHex(req.SignPub); err != nil { writeErr(w, http.StatusBadRequest, err.Error()) return } role := req.Role if role == "" { role = RoleMember } if role != RoleAdmin && role != RoleMember { writeErr(w, http.StatusBadRequest, fmt.Sprintf("invalid role %q (want %q or %q)", role, RoleAdmin, RoleMember)) return } if err := s.store.AddUser(req.SignPub, req.Handle, role); err != nil { if errors.Is(err, ErrUserExists) { // Idempotency contract (mirrors the CLI): re-adding a key is an explicit, // non-destructive conflict. To replace a user, revoke then add again. writeErr(w, http.StatusConflict, "user already registered (unchanged); revoke it first to replace") return } writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } writeJSON(w, http.StatusCreated, map[string]string{"status": "added"}) } // handleRevokeUser revokes a bus user by signing key. Revocation is a status // flip (no hard delete) so the identity stays auditable and IsAuthorized denies // it on both planes immediately. Revoking an unknown or already-revoked key is a // 404. Admin-only. 
func (s *Server) handleRevokeUser(w http.ResponseWriter, r *http.Request) { if _, ok := s.requireAdmin(w, r); !ok { return } signPub := r.PathValue("signpub") if err := ValidateSignPubHex(signPub); err != nil { writeErr(w, http.StatusBadRequest, err.Error()) return } if err := s.store.RevokeUser(signPub); err != nil { writeServerErr(w, r, http.StatusNotFound, "no active user with that key", err) return } writeJSON(w, http.StatusOK, map[string]string{"status": "revoked"}) } // handleDeleteUser hard-deletes a bus user by signing key — the purge that the // admin panel's "Eliminar" (permanent) action maps to, distinct from revoke's // status flip. The row is removed entirely (no audit trail kept); use revoke when // an auditable record must remain. Deleting an unknown key is a 404. Admin-only. // // Security note: like revoke, this does NOT special-case the last admin — an // admin can delete the final admin and lock the HTTP user-management surface. The // recovery seam is the local `membershipd user add` CLI (which re-seeds an admin // directly against the store), the same chicken-egg breaker that seeds the first // admin. func (s *Server) handleDeleteUser(w http.ResponseWriter, r *http.Request) { if _, ok := s.requireAdmin(w, r); !ok { return } signPub := r.PathValue("signpub") if err := ValidateSignPubHex(signPub); err != nil { writeErr(w, http.StatusBadRequest, err.Error()) return } if err := s.store.DeleteUser(signPub); err != nil { if errors.Is(err, ErrNotFound) { writeErr(w, http.StatusNotFound, "no user with that key") return } writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } writeJSON(w, http.StatusOK, map[string]string{"status": "deleted"}) } // ---- invite handlers ------------------------------------------------------ // handleCreateInvite mints a single-use registration invite. 
The handle and role // are fixed here by the admin; the role is validated (admin|member, empty -> // member) so an unknown role is a clean 400 rather than an opaque 500. The reply // carries the bearer token and its expiry — the admin turns the token into the // join link. Admin-only. func (s *Server) handleCreateInvite(w http.ResponseWriter, r *http.Request) { if _, ok := s.requireAdmin(w, r); !ok { return } var req createInviteReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) return } if req.Handle == "" { writeErr(w, http.StatusBadRequest, "handle required") return } if req.Role != "" && req.Role != RoleAdmin && req.Role != RoleMember { writeErr(w, http.StatusBadRequest, fmt.Sprintf("invalid role %q (want %q or %q)", req.Role, RoleAdmin, RoleMember)) return } inv, err := s.store.CreateInvite(req.Handle, req.Role, req.TTLSecs) if err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } writeJSON(w, http.StatusCreated, createInviteResp{Token: inv.Token, ExpiresAt: inv.ExpiresAt}) } // handleListInvites returns the PENDING invites (not yet used and not expired), so // the admin panel shows only live links worth copying. Consumed/expired invites // are filtered out here rather than at the store, which exposes the full set for // other callers. Admin-only. 
func (s *Server) handleListInvites(w http.ResponseWriter, r *http.Request) { if _, ok := s.requireAdmin(w, r); !ok { return } invites, err := s.store.ListInvites() if err != nil { writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } out := make([]inviteJSON, 0, len(invites)) for _, inv := range invites { if inv.Used || inviteIsExpired(inv.ExpiresAt) { continue // pending only } out = append(out, inviteJSON{ Token: inv.Token, Handle: inv.Handle, Role: inv.Role, ExpiresAt: inv.ExpiresAt, Used: inv.Used, CreatedAt: inv.CreatedAt, }) } writeJSON(w, http.StatusOK, out) } // handleCancelInvite cancels (hard-deletes) a pending invite, so an admin can // revoke a link before it is redeemed. Cancelling an unknown token is a 404. // Admin-only. func (s *Server) handleCancelInvite(w http.ResponseWriter, r *http.Request) { if _, ok := s.requireAdmin(w, r); !ok { return } token := r.PathValue("token") if token == "" { writeErr(w, http.StatusBadRequest, "token required") return } if err := s.store.CancelInvite(token); err != nil { if errors.Is(err, ErrNotFound) { writeErr(w, http.StatusNotFound, "no such invite") return } writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) return } writeJSON(w, http.StatusOK, map[string]string{"status": "cancelled"}) } // handleRegister redeems an invite: the wallet-model join path. It is auth-exempt // (no admin signature; see isAuthExempt) but rate-limited and strictly validated. // The client presents the single-use token plus its freshly-generated public keys // (sign_pub Ed25519, kex_pub X25519). Both keys are validated as 64-hex BEFORE the // token is spent, the handle and role come from the invite (never this body), and // ConsumeInvite enforces single-use atomically. Errors map to precise codes so a // client can tell "unknown" from "used" from "expired". 
func (s *Server) handleRegister(w http.ResponseWriter, r *http.Request) { var req registerReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) return } if req.Token == "" { writeErr(w, http.StatusBadRequest, "token required") return } if err := ValidateSignPubHex(req.SignPub); err != nil { writeErr(w, http.StatusBadRequest, err.Error()) return } if err := ValidateKexPubHex(req.KexPub); err != nil { writeErr(w, http.StatusBadRequest, err.Error()) return } err := s.store.ConsumeInvite(req.Token, req.SignPub, req.KexPub) switch { case err == nil: writeJSON(w, http.StatusCreated, map[string]string{"status": "registered"}) case errors.Is(err, ErrNotFound): writeErr(w, http.StatusNotFound, "invalid or unknown invite token") case errors.Is(err, ErrInviteUsed): writeErr(w, http.StatusConflict, "invite already used") case errors.Is(err, ErrInviteExpired): writeErr(w, http.StatusGone, "invite expired") case errors.Is(err, ErrUserExists): writeErr(w, http.StatusConflict, "identity already registered") default: writeServerErr(w, r, http.StatusInternalServerError, "internal error", err) } }