package main

import (
	"crypto/rand"
	"crypto/subtle"
	"encoding/hex"
	"encoding/json"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"
)

// sessionCookie is the name of the gateway's session cookie. The browser sends
// it automatically on same-origin fetches AND on EventSource (SSE) connections.
// EventSource cannot set custom headers, so a cookie is the only way to
// authenticate the stream. It is HttpOnly so page JS can never read the token.
const sessionCookie = "unibus_session"

// server is the gateway's HTTP surface: a small REST/SSE API under /api plus an
// optional static file server for the built SPA.
//
// Two ways to get a session:
//   - POST /api/session — the WALLET model. The browser hands its own bus
//     identity (unlocked from its local encrypted key) and the gateway connects a
//     dedicated bus client AS that user. Per-user, the primary path.
//   - POST /api/login — the legacy operator passphrase. Binds the session to the
//     single shared operator gateway. Kept for backward compatibility.
//
// Onboarding is a separate unauthenticated route:
//   - POST /api/register — the WALLET onboarding. The invite token authorizes
//     the call; it consumes a token and publishes the new user's PUBLIC identity
//     to the bus allowlist.
type server struct { operatorGW *gateway // shared operator client (legacy passphrase login) busTemplate gatewayConfig // bus connection config; Identity is overridden per user session registrar *registrar // POST /api/register backend (mock + proxy) unlock string // passphrase that unlocks an operator session (constant-time compare) webDir string // optional path to the built SPA (web/dist); empty = API only mux *http.ServeMux sessions *sessionStore } func newServer(operatorGW *gateway, busTemplate gatewayConfig, registrar *registrar, unlock, webDir string) *server { s := &server{ operatorGW: operatorGW, busTemplate: busTemplate, registrar: registrar, unlock: unlock, webDir: webDir, mux: http.NewServeMux(), sessions: newSessionStore(), } s.routes() return s } func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) } func (s *server) routes() { // Liveness, unauthenticated (systemd / deploy smoke). s.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) }) // Unauthenticated onboarding / auth routes. s.mux.HandleFunc("POST /api/register", s.handleRegister) // invite token authorizes s.mux.HandleFunc("POST /api/session", s.handleSession) // wallet: per-user identity s.mux.HandleFunc("POST /api/login", s.handleLogin) // legacy operator passphrase // Session-gated routes. s.mux.HandleFunc("POST /api/logout", s.auth(s.handleLogout)) s.mux.HandleFunc("GET /api/me", s.auth(s.handleMe)) s.mux.HandleFunc("GET /api/rooms", s.auth(s.handleListRooms)) s.mux.HandleFunc("POST /api/rooms", s.auth(s.handleCreateRoom)) s.mux.HandleFunc("POST /api/rooms/{id}/join", s.auth(s.handleJoin)) s.mux.HandleFunc("POST /api/rooms/{id}/send", s.auth(s.handleSend)) s.mux.HandleFunc("GET /api/rooms/{id}/stream", s.auth(s.handleStream)) // Everything else is the SPA (when --web-dir is set). Registered last. 
if s.webDir != "" { s.mux.Handle("/", s.spaHandler()) } } // meResp is the identity view returned by /api/session, /api/login and /api/me: // the bus endpoint the session acts as, its signing public key, and the display // handle. type meResp struct { Endpoint string `json:"endpoint"` SignPub string `json:"sign_pub"` Handle string `json:"handle"` } // ---- auth ----------------------------------------------------------------- // auth wraps a handler so it runs only with a valid session cookie, resolving the // session (and thus the per-user gateway) it belongs to. A missing or unknown // token yields 401, which the SPA treats as "show the login screen". func (s *server) auth(next func(http.ResponseWriter, *http.Request, *session)) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { c, err := r.Cookie(sessionCookie) if err != nil { writeErr(w, http.StatusUnauthorized, "not authenticated") return } sess, ok := s.sessions.get(c.Value) if !ok { writeErr(w, http.StatusUnauthorized, "not authenticated") return } next(w, r, sess) } } // handleLogin is the legacy operator passphrase login: it unlocks a session bound // to the shared operator gateway. The wallet path (POST /api/session) is // preferred; this remains for backward compatibility with the single-operator MVP. func (s *server) handleLogin(w http.ResponseWriter, r *http.Request) { var req struct { Passphrase string `json:"passphrase"` } if !decode(w, r, &req) { return } // Constant-time compare so a wrong passphrase cannot be timed character by // character. An empty configured passphrase never matches. 
if s.unlock == "" || subtle.ConstantTimeCompare([]byte(req.Passphrase), []byte(s.unlock)) != 1 { writeErr(w, http.StatusUnauthorized, "wrong passphrase") return } tok := newToken() handle := s.operatorGW.endpoint if len(handle) > 8 { handle = handle[:8] } s.sessions.put(tok, &session{gw: s.operatorGW, owned: false, handle: handle, issuedAt: time.Now()}) http.SetCookie(w, &http.Cookie{ Name: sessionCookie, Value: tok, Path: "/", HttpOnly: true, SameSite: http.SameSiteLaxMode, }) writeJSON(w, http.StatusOK, meResp{Endpoint: s.operatorGW.endpoint, SignPub: hex.EncodeToString(s.operatorGW.id.SignPub), Handle: handle}) } func (s *server) handleLogout(w http.ResponseWriter, r *http.Request, _ *session) { if c, err := r.Cookie(sessionCookie); err == nil { if sess, ok := s.sessions.drop(c.Value); ok && sess.owned && sess.gw != nil { // Per-user session: tear down its bus client so the private key and the // NATS connection do not outlive the session. _ = sess.gw.Close() } } http.SetCookie(w, &http.Cookie{Name: sessionCookie, Value: "", Path: "/", MaxAge: -1, HttpOnly: true}) writeJSON(w, http.StatusOK, map[string]string{"status": "logged_out"}) } func (s *server) handleMe(w http.ResponseWriter, _ *http.Request, sess *session) { writeJSON(w, http.StatusOK, meResp{ Endpoint: sess.gw.endpoint, SignPub: hex.EncodeToString(sess.gw.id.SignPub), Handle: sess.handle, }) } // ---- rooms ---------------------------------------------------------------- func (s *server) handleListRooms(w http.ResponseWriter, _ *http.Request, sess *session) { rooms, err := sess.gw.listRooms() if err != nil { writeErr(w, http.StatusBadGateway, err.Error()) return } writeJSON(w, http.StatusOK, rooms) } func (s *server) handleCreateRoom(w http.ResponseWriter, r *http.Request, sess *session) { var req createRoomReq if !decode(w, r, &req) { return } rv, err := sess.gw.createRoom(req) if err != nil { writeErr(w, http.StatusBadGateway, err.Error()) return } writeJSON(w, http.StatusCreated, rv) } func (s 
*server) handleJoin(w http.ResponseWriter, r *http.Request, sess *session) { if err := sess.gw.join(r.PathValue("id")); err != nil { writeErr(w, http.StatusBadGateway, err.Error()) return } writeJSON(w, http.StatusOK, map[string]string{"status": "joined"}) } func (s *server) handleSend(w http.ResponseWriter, r *http.Request, sess *session) { var req sendReq if !decode(w, r, &req) { return } if strings.TrimSpace(req.Body) == "" { writeErr(w, http.StatusBadRequest, "body required") return } if err := sess.gw.send(r.PathValue("id"), req.Body); err != nil { writeErr(w, http.StatusBadGateway, err.Error()) return } writeJSON(w, http.StatusOK, map[string]string{"status": "sent"}) } // handleStream is the SSE endpoint: it joins the room, attaches to the session's // fan-out hub, and streams each decrypted message as a `data:` event. For a // persisted room the hub's underlying subscription delivers history first // (scrollback) and then live messages; for an ephemeral room only live messages // flow. The stream ends when the browser disconnects (ctx cancelled). func (s *server) handleStream(w http.ResponseWriter, r *http.Request, sess *session) { flusher, ok := w.(http.Flusher) if !ok { writeErr(w, http.StatusInternalServerError, "streaming unsupported") return } ch, cleanup, err := sess.gw.openStream(r.PathValue("id")) if err != nil { writeErr(w, http.StatusBadGateway, err.Error()) return } defer cleanup() w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") // disable proxy buffering (nginx/caddy) w.WriteHeader(http.StatusOK) // An initial comment opens the stream immediately so the browser's // EventSource fires `onopen` without waiting for the first message. 
_, _ = w.Write([]byte(": connected\n\n")) flusher.Flush() ctx := r.Context() ping := time.NewTicker(25 * time.Second) defer ping.Stop() for { select { case <-ctx.Done(): return case <-ping.C: // Comment line keeps idle proxies from closing the connection. if _, err := w.Write([]byte(": ping\n\n")); err != nil { return } flusher.Flush() case m := <-ch: b, err := json.Marshal(m) if err != nil { continue } if _, err := w.Write([]byte("data: " + string(b) + "\n\n")); err != nil { return } flusher.Flush() } } } // ---- SPA serving (optional) ----------------------------------------------- // spaHandler serves the built SPA from s.webDir. A request for an existing asset // is served directly; any other path (a client-side route) falls back to // index.html so the SPA router can take over. /api and /healthz are matched first. func (s *server) spaHandler() http.Handler { root := http.Dir(s.webDir) fileServer := http.FileServer(root) index := filepath.Join(s.webDir, "index.html") return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { p := strings.TrimPrefix(r.URL.Path, "/") if p == "" { http.ServeFile(w, r, index) return } if f, err := root.Open(p); err == nil { _ = f.Close() fileServer.ServeHTTP(w, r) return } http.ServeFile(w, r, index) // unknown path -> SPA client-side routing }) } // ---- helpers -------------------------------------------------------------- func newToken() string { b := make([]byte, 32) _, _ = rand.Read(b) return hex.EncodeToString(b) } func writeJSON(w http.ResponseWriter, code int, v any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) _ = json.NewEncoder(w).Encode(v) } func writeErr(w http.ResponseWriter, code int, msg string) { writeJSON(w, code, map[string]string{"error": msg}) } // decode reads a JSON body into v, writing a 400 and returning false on failure. 
func decode(w http.ResponseWriter, r *http.Request, v any) bool { defer r.Body.Close() if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(v); err != nil { writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) return false } return true } // statFile reports whether path exists and is a regular file (used to validate // --web-dir at startup so a typo surfaces as a clear log line, not 404s later). func statFile(path string) bool { fi, err := os.Stat(path) return err == nil && !fi.IsDir() }