From fb8a03cf0c4f2fce23271f392f864b821eae50ad Mon Sep 17 00:00:00 2001 From: agent Date: Sun, 7 Jun 2026 21:14:08 +0200 Subject: [PATCH] feat(webgw): web gateway peer (REST + SSE) for the chat SPA MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add cmd/webgw: a single Go binary that holds the operator's bus identity, connects to the bus as a real authenticated peer (pkg/client), and exposes a small REST + SSE API the browser consumes. The browser never signs, never speaks NATS, and never sees a private key. Endpoints (all under /api, gated by a session cookie except login): POST /api/login unlock a session with the operator passphrase POST /api/logout GET /api/me operator identity the gateway acts as GET /api/rooms ListMyRooms POST /api/rooms CreateRoom (default policy: encrypted+persisted+signed) POST /api/rooms/{id}/join Join (fetch room key) POST /api/rooms/{id}/send Publish (sealed + signed by the peer) GET /api/rooms/{id}/stream SSE of decrypted frames (history then live) Design notes: - One fan-out hub per room: a single bus subscription is multiplexed to N SSE clients, avoiding the per-(room,endpoint) durable-consumer contention that multiple Subscribe calls would cause. - Posture seam mirrors unibus_admin/clientcheck: empty --ca = plaintext dev, non-empty = TLS+nkey on both planes; RefreshSession after a membership change only under the secured (ACL) posture. - Identity loaded from `pass` or a 0600 file, held only in memory. - Session auth: passphrase compared in constant time; opaque HttpOnly cookie so EventSource (which cannot set headers) can authenticate the stream. TRUST MODEL: room content stays end-to-end encrypted on the bus. The gateway reads plaintext only because it acts AS the operator's client — a legitimate member of each room holding the room key. The per-browser wallet (WebCrypto) that moves decryption into the browser is phase 2. Co-Authored-By: Claude Opus 4.8 (1M context) --- cmd/webgw/gateway.go | 246 ++++++++++++++++++++++++++++++++++ cmd/webgw/hub.go | 140 ++++++++++++++++++++ cmd/webgw/identity.go | 98 ++++++++++++++ cmd/webgw/main.go | 180 +++++++++++++++++++++++++ cmd/webgw/server.go | 301 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 965 insertions(+) create mode 100644 cmd/webgw/gateway.go create mode 100644 cmd/webgw/hub.go create mode 100644 cmd/webgw/identity.go create mode 100644 cmd/webgw/main.go create mode 100644 cmd/webgw/server.go diff --git a/cmd/webgw/gateway.go b/cmd/webgw/gateway.go new file mode 100644 index 00000000..738f761b --- /dev/null +++ b/cmd/webgw/gateway.go @@ -0,0 +1,246 @@ +package main + +import ( + "encoding/hex" + "fmt" + "strings" + "sync" + + cs "fn-registry/functions/cybersecurity" + + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/room" +) + +// gateway is the live web gateway: it owns the operator's identity and a single +// connected unibus client, and turns the bus's crypto-bearing API into the plain +// REST/SSE surface the browser consumes. The browser never signs, never speaks +// NATS, and never sees a private key — the gateway is the legitimate room member +// that seals/opens payloads on the browser's behalf. +// +// TRUST MODEL: content stays end-to-end encrypted on the wire. The gateway can +// read plaintext because it acts AS the operator's client — a real member of +// each room, holding the room key K like any peer. It is the same trust a native +// desktop client has. In the wallet phase (per-browser WebCrypto identity) the +// decryption can move into the browser; today, for the single-operator MVP, the +// gateway decrypts server-side and pushes cleartext over a loopback/authenticated +// SSE channel. +type gateway struct { + id cs.Identity + endpoint string + cli *client.Client + refreshACL bool // call RefreshSession after a membership change (needed under a per-subject ACL bus) + + mu sync.Mutex + hubs map[string]*roomHub // roomID -> live fan-out of decrypted frames to SSE clients +} + +// gatewayConfig wires a live gateway. +type gatewayConfig struct { + Identity cs.Identity + NatsURL string + CtrlURL string + CtrlURLs []string + NatsURLs []string + CAPath string // bus CA; empty => plaintext dev connection (matches a loopback membershipd) +} + +// newGateway connects the unibus client with the operator identity following the +// same posture seam every peer uses: a non-empty CA path means TLS + nkey, empty +// means plaintext dev. When a CA is configured the bus is assumed to enforce a +// per-subject ACL, so membership changes trigger a session refresh. +func newGateway(cfg gatewayConfig) (*gateway, error) { + opts := client.Options{ + CtrlURLs: cfg.CtrlURLs, + NatsServers: cfg.NatsURLs, + } + if cfg.CAPath != "" { + tlsCfg, err := busauth.LoadCATLSConfig(cfg.CAPath) + if err != nil { + return nil, fmt.Errorf("webgw: load bus CA %q: %w", cfg.CAPath, err) + } + opts.UseNkey = true + opts.TLS = tlsCfg + opts.CtrlTLS = tlsCfg + } + cli, err := client.NewWithOptions(cfg.NatsURL, cfg.CtrlURL, cfg.Identity, opts) + if err != nil { + return nil, fmt.Errorf("webgw: connect bus client: %w", err) + } + return &gateway{ + id: cfg.Identity, + endpoint: frame.EndpointID(cfg.Identity.SignPub), + cli: cli, + refreshACL: cfg.CAPath != "", + hubs: map[string]*roomHub{}, + }, nil +} + +// Close stops every hub and releases the bus client connection. +func (g *gateway) Close() error { + g.mu.Lock() + for _, h := range g.hubs { + h.stop() + } + g.hubs = map[string]*roomHub{} + g.mu.Unlock() + if g.cli != nil { + return g.cli.Close() + } + return nil +} + +// ---- wire types (browser-facing JSON) ------------------------------------ + +// meInfo is what GET /api/me returns: the operator identity the gateway acts as. +type meInfo struct { + Endpoint string `json:"endpoint"` + SignPub string `json:"sign_pub"` +} + +// roomWire is the browser view of a room. It deliberately omits messages: those +// stream over SSE (GET /api/rooms/{id}/stream), not in the room list. +type roomWire struct { + ID string `json:"id"` + Subject string `json:"subject"` + Name string `json:"name"` + Epoch int `json:"epoch"` + Encrypt bool `json:"encrypt"` + Persist bool `json:"persist"` + SignMsgs bool `json:"sign_msgs"` + Role string `json:"role"` +} + +// createRoomReq is the POST /api/rooms body. Encrypt/Persist/SignMsgs are +// pointers so an omitted field falls back to the chat default rather than to the +// Go zero value (false). The common case — the browser sending only {subject, +// encrypted} — maps encrypted onto all three (the Matrix-like chat policy). +type createRoomReq struct { + Subject string `json:"subject"` + Encrypted *bool `json:"encrypted,omitempty"` + Encrypt *bool `json:"encrypt,omitempty"` + Persist *bool `json:"persist,omitempty"` + SignMsgs *bool `json:"sign_msgs,omitempty"` +} + +// policy resolves the requested policy. A bare {subject} defaults to the +// Matrix-like chat room (encrypted + persisted + signed) so a created room keeps +// durable, end-to-end-encrypted, authored history. Callers can override any leg. +func (r createRoomReq) policy() room.Policy { + enc, per, sig := true, true, true + if r.Encrypted != nil { + enc, per, sig = *r.Encrypted, *r.Encrypted, *r.Encrypted + } + if r.Encrypt != nil { + enc = *r.Encrypt + } + if r.Persist != nil { + per = *r.Persist + } + if r.SignMsgs != nil { + sig = *r.SignMsgs + } + return room.Policy{Encrypt: enc, Persist: per, SignMsgs: sig} +} + +// sendReq is the POST /api/rooms/{id}/send body. +type sendReq struct { + Body string `json:"body"` +} + +// msgWire is one decrypted message pushed over SSE. +type msgWire struct { + ID string `json:"id"` + Sender string `json:"sender"` + Body string `json:"body"` + TS int64 `json:"ts"` // epoch ms (decoded from the frame's ULID id) + Mine bool `json:"mine"` +} + +// ---- operations ----------------------------------------------------------- + +func (g *gateway) me() meInfo { + return meInfo{Endpoint: g.endpoint, SignPub: hex.EncodeToString(g.id.SignPub)} +} + +// subjectName derives a short, human-friendly room name from its bus subject by +// dropping the leading namespace segment (room., test., proc., agent.). It is a +// display nicety only; the canonical identity stays the subject/room id. +func subjectName(subject string) string { + for _, p := range []string{"room.", "test.", "proc.", "agent.", "rpc."} { + if strings.HasPrefix(subject, p) { + return strings.TrimPrefix(subject, p) + } + } + return subject +} + +func (g *gateway) listRooms() ([]roomWire, error) { + rooms, err := g.cli.ListMyRooms() + if err != nil { + return nil, err + } + out := make([]roomWire, 0, len(rooms)) + for _, rm := range rooms { + out = append(out, roomWire{ + ID: rm.RoomID, + Subject: rm.Subject, + Name: subjectName(rm.Subject), + Epoch: rm.Epoch, + Encrypt: rm.Policy.Encrypt, + Persist: rm.Policy.Persist, + SignMsgs: rm.Policy.SignMsgs, + Role: rm.Role, + }) + } + return out, nil +} + +func (g *gateway) createRoom(req createRoomReq) (roomWire, error) { + subject := strings.TrimSpace(req.Subject) + if subject == "" { + return roomWire{}, fmt.Errorf("webgw: subject required") + } + p := req.policy() + roomID, err := g.cli.CreateRoom(subject, p) + if err != nil { + return roomWire{}, err + } + // Under a per-subject ACL the operator's frozen NATS permissions do not yet + // cover the new room's subject; refresh so subsequent data-plane use works. On + // a plaintext/non-ACL dev bus this is unnecessary and would needlessly drop any + // live SSE subscriptions, so it is gated on the secured posture. + if g.refreshACL { + _ = g.cli.RefreshSession() + } + return roomWire{ + ID: roomID, + Subject: subject, + Name: subjectName(subject), + Epoch: 1, + Encrypt: p.Encrypt, + Persist: p.Persist, + SignMsgs: p.SignMsgs, + Role: "owner", + }, nil +} + +// join resolves room metadata and (for encrypted rooms) fetches the room key so +// the gateway can later open payloads. Idempotent. +func (g *gateway) join(roomID string) error { + if err := g.cli.Join(roomID); err != nil { + return err + } + if g.refreshACL { + _ = g.cli.RefreshSession() + } + return nil +} + +// send publishes plaintext to a room. The unibus client seals it with the room +// key (encrypted rooms) and signs it (signed rooms) before it leaves the process. +func (g *gateway) send(roomID, body string) error { + return g.cli.Publish(roomID, []byte(body)) +} diff --git a/cmd/webgw/hub.go b/cmd/webgw/hub.go new file mode 100644 index 00000000..523dea03 --- /dev/null +++ b/cmd/webgw/hub.go @@ -0,0 +1,140 @@ +package main + +import ( + "sync" + + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/oklog/ulid/v2" +) + +// roomHub multiplexes ONE unibus room subscription to MANY SSE clients. The +// unibus client derives a per-(room, endpoint) durable consumer name, so a +// second Subscribe for the same room from the same operator would contend for +// the same durable (load-balanced delivery) rather than each browser receiving +// every message. The hub holds a single subscription per room and fans each +// decrypted frame out to every connected browser, which also means the gateway +// opens at most one bus subscription per room regardless of how many tabs watch +// it. +type roomHub struct { + roomID string + myEndpoint string + sub *client.Sub + + mu sync.Mutex + clients map[chan msgWire]struct{} +} + +// frameTS decodes the millisecond timestamp embedded in a frame's ULID id. A +// malformed id (should not happen for bus-produced frames) yields 0, which the +// browser renders without crashing. +func frameTS(msgID string) int64 { + id, err := ulid.Parse(msgID) + if err != nil { + return 0 + } + return int64(id.Time()) +} + +// newRoomHub opens the single bus subscription for roomID and starts fanning +// decrypted frames out to registered clients. The room must already be joined +// (so the gateway holds the room key) before this is called. +func newRoomHub(cli *client.Client, roomID, myEndpoint string) (*roomHub, error) { + h := &roomHub{ + roomID: roomID, + myEndpoint: myEndpoint, + clients: map[chan msgWire]struct{}{}, + } + sub, err := cli.Subscribe(roomID, func(f frame.Frame, plaintext []byte) { + m := msgWire{ + ID: f.MsgID, + Sender: f.Sender, + Body: string(plaintext), + TS: frameTS(f.MsgID), + Mine: f.Sender == myEndpoint, + } + h.broadcast(m) + }) + if err != nil { + return nil, err + } + h.sub = sub + return h, nil +} + +// broadcast delivers a message to every registered client without blocking the +// NATS delivery goroutine: a client whose buffer is full (a stalled browser) +// drops this frame rather than stalling the whole room. +func (h *roomHub) broadcast(m msgWire) { + h.mu.Lock() + defer h.mu.Unlock() + for ch := range h.clients { + select { + case ch <- m: + default: + } + } +} + +// add registers a new SSE client channel. +func (h *roomHub) add(ch chan msgWire) { + h.mu.Lock() + defer h.mu.Unlock() + h.clients[ch] = struct{}{} +} + +// stop unsubscribes from the bus. Local delivery ends; for a persisted room the +// durable consumer's ack position stays on the server, so a later subscription +// with the same operator resumes from where it left off. +func (h *roomHub) stop() { + if h.sub != nil { + _ = h.sub.Unsubscribe() + } +} + +// openStream joins the room (idempotent; fetches the room key for encrypted +// rooms), attaches an SSE client to the room's hub (creating it on first watcher), +// and returns the client's message channel plus a cleanup func. The cleanup +// detaches the client and, when it was the last watcher, tears down the room's +// single bus subscription. +func (g *gateway) openStream(roomID string) (chan msgWire, func(), error) { + if err := g.join(roomID); err != nil { + return nil, nil, err + } + g.mu.Lock() + h := g.hubs[roomID] + if h == nil { + var err error + h, err = newRoomHub(g.cli, roomID, g.endpoint) + if err != nil { + g.mu.Unlock() + return nil, nil, err + } + g.hubs[roomID] = h + } + g.mu.Unlock() + + // Buffer so a brief render hitch in the browser does not drop live frames; a + // sustained stall still drops (broadcast is non-blocking) rather than wedging + // the room. + ch := make(chan msgWire, 64) + h.add(ch) + + // cleanup takes g.mu before h.mu (the single, consistent lock order) so a + // concurrent openStream that re-creates the hub cannot race the teardown. + cleanup := func() { + g.mu.Lock() + defer g.mu.Unlock() + h.mu.Lock() + delete(h.clients, ch) + empty := len(h.clients) == 0 + h.mu.Unlock() + if empty { + if cur := g.hubs[roomID]; cur == h { + delete(g.hubs, roomID) + h.stop() + } + } + } + return ch, cleanup, nil +} diff --git a/cmd/webgw/identity.go b/cmd/webgw/identity.go new file mode 100644 index 00000000..41ee3b08 --- /dev/null +++ b/cmd/webgw/identity.go @@ -0,0 +1,98 @@ +package main + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "os" + "os/exec" + + cs "fn-registry/functions/cybersecurity" +) + +// identityJSON mirrors the on-disk / pass-stored identity format shared across +// the unibus tooling: the four keypair halves, each std-base64. It is the SAME +// shape the bus client persists (pkg/client identity file) and the operator's +// `pass` entry unibus/operator-identity, so the web gateway loads the operator's +// identity without a divergent serialization. Kept in lockstep with +// unibus_admin/internal/admin/identity.go. +type identityJSON struct { + SignPub string `json:"sign_pub"` + SignPriv string `json:"sign_priv"` + KexPub string `json:"kex_pub"` + KexPriv string `json:"kex_priv"` +} + +// decodeIdentity turns the JSON identity bytes into a cs.Identity. The private +// halves stay only in memory; this never writes them anywhere. +func decodeIdentity(raw []byte) (cs.Identity, error) { + var f identityJSON + if err := json.Unmarshal(raw, &f); err != nil { + return cs.Identity{}, fmt.Errorf("webgw: parse identity json: %w", err) + } + dec := base64.StdEncoding.DecodeString + signPub, err := dec(f.SignPub) + if err != nil { + return cs.Identity{}, fmt.Errorf("webgw: decode sign_pub: %w", err) + } + signPriv, err := dec(f.SignPriv) + if err != nil { + return cs.Identity{}, fmt.Errorf("webgw: decode sign_priv: %w", err) + } + kexPub, err := dec(f.KexPub) + if err != nil { + return cs.Identity{}, fmt.Errorf("webgw: decode kex_pub: %w", err) + } + kexPriv, err := dec(f.KexPriv) + if err != nil { + return cs.Identity{}, fmt.Errorf("webgw: decode kex_priv: %w", err) + } + if len(signPub) != 32 || len(signPriv) != 64 || len(kexPub) != 32 || len(kexPriv) != 32 { + return cs.Identity{}, fmt.Errorf("webgw: identity has wrong key sizes (sign_pub=%d sign_priv=%d kex_pub=%d kex_priv=%d)", + len(signPub), len(signPriv), len(kexPub), len(kexPriv)) + } + return cs.Identity{SignPub: signPub, SignPriv: signPriv, KexPub: kexPub, KexPriv: kexPriv}, nil +} + +// loadIdentityFromFile reads a 0600 identity JSON file (the same format the bus +// client writes) and decodes it. Used on a deploy host where `pass` is not +// available and the operator identity is delivered as a protected file. +func loadIdentityFromFile(path string) (cs.Identity, error) { + raw, err := os.ReadFile(path) + if err != nil { + return cs.Identity{}, fmt.Errorf("webgw: read identity file %q: %w", path, err) + } + return decodeIdentity(raw) +} + +// loadIdentityFromPass shells out to `pass show ` and decodes the JSON +// identity it returns. The secret is held only in memory; this process never +// writes it to disk or argv. Used in local operator workflows where the GNU +// password store holds unibus/operator-identity. +func loadIdentityFromPass(entry string) (cs.Identity, error) { + out, err := exec.Command("pass", "show", entry).Output() + if err != nil { + return cs.Identity{}, fmt.Errorf("webgw: pass show %q: %w", entry, err) + } + return decodeIdentity(out) +} + +// loadPassValue returns the first line of a `pass show ` for non-identity +// secrets (e.g. the unlock passphrase). Empty entry yields an empty string and +// no error, so callers can treat "no pass entry configured" as "not set". +func loadPassValue(entry string) (string, error) { + if entry == "" { + return "", nil + } + out, err := exec.Command("pass", "show", entry).Output() + if err != nil { + return "", fmt.Errorf("webgw: pass show %q: %w", entry, err) + } + s := string(out) + for i := 0; i < len(s); i++ { + if s[i] == '\n' || s[i] == '\r' { + return s[:i], nil + } + } + return s, nil +} diff --git a/cmd/webgw/main.go b/cmd/webgw/main.go new file mode 100644 index 00000000..c34181dd --- /dev/null +++ b/cmd/webgw/main.go @@ -0,0 +1,180 @@ +// Command webgw is the web gateway for the unibus chat SPA. It is a single Go +// binary that holds the operator's bus identity, connects to the bus as a real +// authenticated peer (pkg/client), and exposes a small REST + SSE API the +// browser consumes. The browser never signs, never speaks NATS, and never sees a +// private key: it authenticates to the gateway with a passphrase and thereafter +// holds only an opaque session cookie. +// +// TRUST MODEL (MVP, single operator): room content stays end-to-end encrypted on +// the bus. The gateway can read plaintext because it acts AS the operator's +// client — a legitimate member of each room holding the room key. Decryption +// happens server-side in this process; cleartext then crosses an authenticated +// (loopback or TLS-fronted) SSE channel to the browser. The wallet phase (issue: +// per-browser WebCrypto identity) can move decryption into the browser; see the +// report for the FASE 2 plan. +// +// # local dev against a loopback membershipd (plaintext), operator from pass: +// webgw --identity-pass unibus/operator-identity \ +// --ctrl-url http://127.0.0.1:8470 --nats-url nats://127.0.0.1:4250 +// +// # secured cluster (TLS + nkey on both planes), identity from a 0600 file: +// webgw --ca ca.crt --identity-file operator.id \ +// --ctrl-url https://node-a:8470 --nats-url nats://node-a:4250 \ +// --ctrl-urls https://node-b:8470,https://node-c:8470 \ +// --nats-urls nats://node-b:4250,nats://node-c:4250 +package main + +import ( + "context" + "flag" + "log" + "net/http" + "os" + "os/signal" + "path/filepath" + "strings" + "syscall" + "time" + + cs "fn-registry/functions/cybersecurity" +) + +func main() { + var ( + bind = flag.String("bind", "127.0.0.1", "interface to bind the gateway HTTP server to (loopback by default)") + port = flag.String("port", "8481", "gateway HTTP port") + ctrlURL = flag.String("ctrl-url", "http://127.0.0.1:8470", "primary unibus control-plane base URL") + ctrlURLs = flag.String("ctrl-urls", "", "comma-separated ADDITIONAL control-plane base URLs (cluster failover)") + natsURL = flag.String("nats-url", "nats://127.0.0.1:4250", "primary NATS URL") + natsURLs = flag.String("nats-urls", "", "comma-separated ADDITIONAL NATS seed URLs (cluster failover)") + caPath = flag.String("ca", "", "bus CA cert path; set to talk TLS+nkey to a secured bus (empty = plaintext dev)") + identityFile = flag.String("identity-file", "", "path to the operator identity JSON file (0600). Mutually exclusive with --identity-pass") + identityPass = flag.String("identity-pass", "", "pass(1) entry holding the operator identity JSON, e.g. unibus/operator-identity") + unlockPass = flag.String("unlock-pass", "", "literal passphrase the browser must send to unlock a session (dev). Prefer --unlock-pass-entry") + unlockEntry = flag.String("unlock-pass-entry", "unibus/admin-panel-password", "pass(1) entry holding the unlock passphrase (used when --unlock-pass is empty)") + webDir = flag.String("web-dir", "", "OPTIONAL path to the built SPA (web/dist) to serve. Empty = API only (use vite dev server)") + ) + flag.Parse() + + log.SetFlags(log.LstdFlags | log.Lmsgprefix) + log.SetPrefix("[webgw] ") + + id, err := loadIdentity(*identityFile, *identityPass) + if err != nil { + log.Fatalf("%v", err) + } + + unlock := *unlockPass + if unlock == "" { + unlock, err = loadPassValue(*unlockEntry) + if err != nil { + log.Fatalf("resolve unlock passphrase: %v", err) + } + } + if unlock == "" { + log.Fatalf("an unlock passphrase is required: set --unlock-pass or a non-empty --unlock-pass-entry (default unibus/admin-panel-password)") + } + + resolvedWebDir := resolveWebDir(*webDir) + + gw, err := newGateway(gatewayConfig{ + Identity: id, + NatsURL: *natsURL, + CtrlURL: *ctrlURL, + CtrlURLs: splitCSV(*ctrlURLs), + NatsURLs: splitCSV(*natsURLs), + CAPath: *caPath, + }) + if err != nil { + log.Fatalf("%v", err) + } + defer gw.Close() + + log.Printf("operator endpoint: %s", gw.endpoint) + log.Printf("control plane: %s (+%d failover)", *ctrlURL, len(splitCSV(*ctrlURLs))) + tls := "OFF (plaintext dev)" + if *caPath != "" { + tls = "ON (CA " + *caPath + ")" + } + log.Printf("bus TLS+nkey: %s", tls) + if resolvedWebDir != "" { + log.Printf("serving SPA from: %s", resolvedWebDir) + } else { + log.Printf("API only (no --web-dir): use the vite dev server with a /api+stream proxy") + } + + srv := newServer(gw, unlock, resolvedWebDir) + addr := *bind + ":" + *port + httpSrv := &http.Server{ + Addr: addr, + Handler: srv, + // No global write timeout: SSE streams are long-lived. Header timeout still + // bounds slowloris on the request line/headers. + ReadHeaderTimeout: 10 * time.Second, + } + + go func() { + log.Printf("web gateway: http://%s", addr) + if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("http server: %v", err) + } + }() + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + <-stop + log.Printf("shutting down...") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = httpSrv.Shutdown(ctx) + log.Printf("bye") +} + +// loadIdentity resolves the operator identity from exactly one of --identity-file +// or --identity-pass. +func loadIdentity(file, passEntry string) (cs.Identity, error) { + switch { + case file != "" && passEntry != "": + return cs.Identity{}, errFlag("set only one of --identity-file or --identity-pass") + case file != "": + return loadIdentityFromFile(file) + case passEntry != "": + return loadIdentityFromPass(passEntry) + default: + return cs.Identity{}, errFlag("an identity is required: pass --identity-file or --identity-pass ") + } +} + +// resolveWebDir validates the --web-dir flag. An empty flag means API-only. A +// non-empty dir is kept only if it actually holds an index.html, so a typo logs +// "API only" rather than serving 404s. +func resolveWebDir(dir string) string { + if dir == "" { + return "" + } + abs, err := filepath.Abs(dir) + if err != nil { + log.Printf("WARN --web-dir %q: %v; serving API only", dir, err) + return "" + } + if !statFile(filepath.Join(abs, "index.html")) { + log.Printf("WARN --web-dir %q has no index.html; serving API only", abs) + return "" + } + return abs +} + +type flagErr string + +func (e flagErr) Error() string { return string(e) } +func errFlag(s string) error { return flagErr("webgw: " + s) } + +func splitCSV(s string) []string { + var out []string + for _, p := range strings.Split(s, ",") { + if p = strings.TrimSpace(p); p != "" { + out = append(out, p) + } + } + return out +} diff --git a/cmd/webgw/server.go b/cmd/webgw/server.go new file mode 100644 index 00000000..7277b48f --- /dev/null +++ b/cmd/webgw/server.go @@ -0,0 +1,301 @@ +package main + +import ( + "crypto/rand" + "crypto/subtle" + "encoding/hex" + "encoding/json" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +// sessionCookie is the name of the gateway's session cookie. The browser sends +// it automatically on same-origin fetches AND on EventSource (SSE) connections — +// EventSource cannot set custom headers, so a cookie is the only way to +// authenticate the stream. It is HttpOnly so page JS can never read the token. +const sessionCookie = "unibus_session" + +// server is the gateway's HTTP surface: a small REST/SSE API under /api gated by +// a session cookie, plus an optional static file server for the built SPA. The +// gateway's privileged operator identity never leaves the process; the browser +// authenticates with a passphrase and thereafter holds only an opaque session +// token. +type server struct { + gw *gateway + unlock string // passphrase that unlocks a session (compared in constant time) + webDir string // optional path to the built SPA (web/dist); empty = API only + mux *http.ServeMux + + mu sync.Mutex + sessions map[string]time.Time // token -> issued-at +} + +func newServer(gw *gateway, unlock, webDir string) *server { + s := &server{ + gw: gw, + unlock: unlock, + webDir: webDir, + mux: http.NewServeMux(), + sessions: map[string]time.Time{}, + } + s.routes() + return s +} + +func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) } + +func (s *server) routes() { + // Liveness, unauthenticated (systemd / deploy smoke). + s.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + }) + + // Auth: login is the only /api route reachable without a session. + s.mux.HandleFunc("POST /api/login", s.handleLogin) + s.mux.HandleFunc("POST /api/logout", s.auth(s.handleLogout)) + s.mux.HandleFunc("GET /api/me", s.auth(s.handleMe)) + + s.mux.HandleFunc("GET /api/rooms", s.auth(s.handleListRooms)) + s.mux.HandleFunc("POST /api/rooms", s.auth(s.handleCreateRoom)) + s.mux.HandleFunc("POST /api/rooms/{id}/join", s.auth(s.handleJoin)) + s.mux.HandleFunc("POST /api/rooms/{id}/send", s.auth(s.handleSend)) + s.mux.HandleFunc("GET /api/rooms/{id}/stream", s.auth(s.handleStream)) + + // Everything else is the SPA (when --web-dir is set). Registered last. + if s.webDir != "" { + s.mux.Handle("/", s.spaHandler()) + } +} + +// ---- auth ----------------------------------------------------------------- + +// auth wraps a handler so it runs only with a valid session cookie. A missing or +// unknown token yields 401, which the SPA treats as "show the login screen". +func (s *server) auth(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + c, err := r.Cookie(sessionCookie) + if err != nil || !s.validSession(c.Value) { + writeErr(w, http.StatusUnauthorized, "not authenticated") + return + } + next(w, r) + } +} + +func (s *server) validSession(token string) bool { + if token == "" { + return false + } + s.mu.Lock() + defer s.mu.Unlock() + _, ok := s.sessions[token] + return ok +} + +func (s *server) handleLogin(w http.ResponseWriter, r *http.Request) { + var req struct { + Passphrase string `json:"passphrase"` + } + if !decode(w, r, &req) { + return + } + // Constant-time compare so a wrong passphrase cannot be timed character by + // character. An empty configured passphrase never matches (main refuses to + // start without one, so this is defense in depth). + if s.unlock == "" || subtle.ConstantTimeCompare([]byte(req.Passphrase), []byte(s.unlock)) != 1 { + writeErr(w, http.StatusUnauthorized, "wrong passphrase") + return + } + tok := newToken() + s.mu.Lock() + s.sessions[tok] = time.Now() + s.mu.Unlock() + + http.SetCookie(w, &http.Cookie{ + Name: sessionCookie, + Value: tok, + Path: "/", + HttpOnly: true, + SameSite: http.SameSiteLaxMode, + }) + writeJSON(w, http.StatusOK, s.gw.me()) +} + +func (s *server) handleLogout(w http.ResponseWriter, r *http.Request) { + if c, err := r.Cookie(sessionCookie); err == nil { + s.mu.Lock() + delete(s.sessions, c.Value) + s.mu.Unlock() + } + http.SetCookie(w, &http.Cookie{Name: sessionCookie, Value: "", Path: "/", MaxAge: -1, HttpOnly: true}) + writeJSON(w, http.StatusOK, map[string]string{"status": "logged_out"}) +} + +func (s *server) handleMe(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, s.gw.me()) +} + +// ---- rooms ---------------------------------------------------------------- + +func (s *server) handleListRooms(w http.ResponseWriter, _ *http.Request) { + rooms, err := s.gw.listRooms() + if err != nil { + writeErr(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusOK, rooms) +} + +func (s *server) handleCreateRoom(w http.ResponseWriter, r *http.Request) { + var req createRoomReq + if !decode(w, r, &req) { + return + } + rv, err := s.gw.createRoom(req) + if err != nil { + writeErr(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusCreated, rv) +} + +func (s *server) handleJoin(w http.ResponseWriter, r *http.Request) { + if err := s.gw.join(r.PathValue("id")); err != nil { + writeErr(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "joined"}) +} + +func (s *server) handleSend(w http.ResponseWriter, r *http.Request) { + var req sendReq + if !decode(w, r, &req) { + return + } + if strings.TrimSpace(req.Body) == "" { + writeErr(w, http.StatusBadRequest, "body required") + return + } + if err := s.gw.send(r.PathValue("id"), req.Body); err != nil { + writeErr(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "sent"}) +} + +// handleStream is the SSE endpoint: it joins the room, attaches to the room's +// fan-out hub, and streams each decrypted message as a `data:` event. For a +// persisted room the hub's underlying subscription delivers history first +// (scrollback) and then live messages; for an ephemeral room only live messages +// flow. The stream ends when the browser disconnects (ctx cancelled). +func (s *server) handleStream(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + writeErr(w, http.StatusInternalServerError, "streaming unsupported") + return + } + ch, cleanup, err := s.gw.openStream(r.PathValue("id")) + if err != nil { + writeErr(w, http.StatusBadGateway, err.Error()) + return + } + defer cleanup() + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") // disable proxy buffering (nginx/caddy) + w.WriteHeader(http.StatusOK) + // An initial comment opens the stream immediately so the browser's + // EventSource fires `onopen` without waiting for the first message. + _, _ = w.Write([]byte(": connected\n\n")) + flusher.Flush() + + ctx := r.Context() + ping := time.NewTicker(25 * time.Second) + defer ping.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ping.C: + // Comment line keeps idle proxies from closing the connection. + if _, err := w.Write([]byte(": ping\n\n")); err != nil { + return + } + flusher.Flush() + case m := <-ch: + b, err := json.Marshal(m) + if err != nil { + continue + } + if _, err := w.Write([]byte("data: " + string(b) + "\n\n")); err != nil { + return + } + flusher.Flush() + } + } +} + +// ---- SPA serving (optional) ----------------------------------------------- + +// spaHandler serves the built SPA from s.webDir. A request for an existing asset +// is served directly; any other path (a client-side route) falls back to +// index.html so the SPA router can take over. /api and /healthz are matched first. +func (s *server) spaHandler() http.Handler { + root := http.Dir(s.webDir) + fileServer := http.FileServer(root) + index := filepath.Join(s.webDir, "index.html") + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + p := strings.TrimPrefix(r.URL.Path, "/") + if p == "" { + http.ServeFile(w, r, index) + return + } + if f, err := root.Open(p); err == nil { + _ = f.Close() + fileServer.ServeHTTP(w, r) + return + } + http.ServeFile(w, r, index) // unknown path -> SPA client-side routing + }) +} + +// ---- helpers -------------------------------------------------------------- + +func newToken() string { + b := make([]byte, 32) + _, _ = rand.Read(b) + return hex.EncodeToString(b) +} + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) +} + +func writeErr(w http.ResponseWriter, code int, msg string) { + writeJSON(w, code, map[string]string{"error": msg}) +} + +// decode reads a JSON body into v, writing a 400 and returning false on failure. +func decode(w http.ResponseWriter, r *http.Request, v any) bool { + defer r.Body.Close() + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(v); err != nil { + writeErr(w, http.StatusBadRequest, "bad json: "+err.Error()) + return false + } + return true +} + +// statFile reports whether path exists and is a regular file (used to validate +// --web-dir at startup so a typo surfaces as a clear log line, not 404s later). +func statFile(path string) bool { + fi, err := os.Stat(path) + return err == nil && !fi.IsDir() +}