60d6a86655
Pre-auth DoS hardening (audit H1, Critical). The control-plane middleware read the request body with io.ReadAll before authenticating and with no size cap, so an unauthenticated peer could force the server to buffer an arbitrary body in RAM (the auditor sent 400 MB and watched RSS climb to ~898 MB). - ServeHTTP now caps the buffered body before reading: a per-route ceiling (1 MiB JSON, 16 MiB /blobs) rejects an over-declared Content-Length outright and wraps the body in http.MaxBytesReader so a lying/chunked sender trips at the ceiling instead of unbounded. - handlePutBlob maps the MaxBytesReader cutoff to 413 in every auth mode. - Per-IP token-bucket rate limiter (golang.org/x/time/rate, already in the module graph) sheds floods before auth or body reads. Loopback dev stacks are unaffected (burst >> any single client's rate). Kept in-package as transport glue, not promoted to the registry, mirroring the nonceCache decision in 0003. - membershipd sets http.Server.MaxHeaderBytes and ReadHeaderTimeout. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
149 lines
5.3 KiB
Go
149 lines
5.3 KiB
Go
// Command membershipd is the unibus control-plane service: room metadata,
|
|
// member directory, sealed key distribution, and the media blob store. The data
|
|
// plane is NATS — if --nats-url is empty it starts an embedded nats-server with
|
|
// JetStream so the whole stack runs with `go run` and nothing to install.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
server "github.com/nats-io/nats-server/v2/server"
|
|
|
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
|
"github.com/enmanuel/unibus/pkg/busauth"
|
|
"github.com/enmanuel/unibus/pkg/embeddednats"
|
|
"github.com/enmanuel/unibus/pkg/membership"
|
|
)
|
|
|
|
func main() {
|
|
// Subcommand dispatch: `membershipd user ...` is the local administration CLI
|
|
// (seed/list/revoke bus users) and must be handled before the server flag set
|
|
// parses os.Args. Running the CLI on the bus host is trusted by design (whoever
|
|
// has a shell there already controls the service), which is how the first admin
|
|
// is seeded without a chicken-egg auth problem.
|
|
if len(os.Args) > 1 && os.Args[1] == "user" {
|
|
runUserCLI(os.Args[2:])
|
|
return
|
|
}
|
|
|
|
var (
|
|
bind = flag.String("bind", "127.0.0.1", "network interface to bind the HTTP API and the embedded NATS to; use 0.0.0.0 to accept LAN/remote peers")
|
|
natsURL = flag.String("nats-url", "", "external NATS url; empty starts an embedded server")
|
|
httpPort = flag.String("http-port", "8470", "HTTP port for the control-plane API")
|
|
dbPath = flag.String("db", "./local_files/unibus.db", "SQLite database path")
|
|
storeDir = flag.String("store-dir", "./local_files/blobs", "blob store directory")
|
|
natsPort = flag.Int("nats-port", 4250, "embedded NATS listen port (when --nats-url empty)")
|
|
natsStore = flag.String("nats-store", "./local_files/jetstream", "embedded JetStream store dir")
|
|
busAuth = flag.String("bus-auth", "off", "control-plane auth rollout: off|soft|enforce (feature flag bus-auth)")
|
|
tlsCert = flag.String("tls-cert", "", "PATH to the NATS server certificate (deploy/tls/server.crt); enables TLS on the embedded data plane")
|
|
tlsKey = flag.String("tls-key", "", "path to the NATS server private key (deploy/tls/server.key); required with --tls-cert")
|
|
)
|
|
flag.Parse()
|
|
|
|
authMode, err := membership.ParseAuthMode(*busAuth)
|
|
if err != nil {
|
|
log.Fatalf("%v", err)
|
|
}
|
|
|
|
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
|
log.SetPrefix("[membershipd] ")
|
|
|
|
// Control plane store first: the NATS authenticator consults IsAuthorized, so
|
|
// the store must exist before the embedded server starts.
|
|
store, err := membership.Open(*dbPath)
|
|
if err != nil {
|
|
log.Fatalf("open membership store: %v", err)
|
|
}
|
|
defer store.Close()
|
|
log.Printf("membership store: %s", *dbPath)
|
|
|
|
blobs, err := blobstore.New(*storeDir)
|
|
if err != nil {
|
|
log.Fatalf("open blob store: %v", err)
|
|
}
|
|
log.Printf("blob store: %s", *storeDir)
|
|
|
|
// Data plane: embedded or external NATS. For the embedded server, enforce
|
|
// turns on the nkey authenticator (only allowlisted identities may connect)
|
|
// and --tls-cert/--tls-key turn on TLS. An external NATS manages its own
|
|
// auth/TLS, so those flags do not apply to it.
|
|
var ns *server.Server
|
|
natsClientURL := *natsURL
|
|
if natsClientURL == "" {
|
|
cfg := embeddednats.ServerConfig{
|
|
// Bind the embedded NATS to the same interface as the HTTP API so a
|
|
// single --bind flag governs reachability: 127.0.0.1 keeps the whole
|
|
// stack loopback-only; 0.0.0.0 exposes both planes to the LAN.
|
|
StoreDir: *natsStore,
|
|
Host: *bind,
|
|
Port: *natsPort,
|
|
}
|
|
if authMode == membership.AuthEnforce {
|
|
cfg.Auth = busauth.NewNkeyAuthenticator(store.IsAuthorized)
|
|
log.Printf("NATS nkey authentication: ON (enforce)")
|
|
}
|
|
if *tlsCert != "" || *tlsKey != "" {
|
|
if *tlsCert == "" || *tlsKey == "" {
|
|
log.Fatalf("--tls-cert and --tls-key must be set together")
|
|
}
|
|
tlsCfg, err := busauth.ServerTLSConfig(*tlsCert, *tlsKey)
|
|
if err != nil {
|
|
log.Fatalf("load NATS TLS: %v", err)
|
|
}
|
|
cfg.TLS = tlsCfg
|
|
log.Printf("NATS TLS: ON (%s)", *tlsCert)
|
|
}
|
|
ns, err = embeddednats.StartServer(cfg)
|
|
if err != nil {
|
|
log.Fatalf("start embedded nats: %v", err)
|
|
}
|
|
natsClientURL = embeddednats.ClientURL(ns)
|
|
log.Printf("embedded NATS (JetStream) ready: %s", natsClientURL)
|
|
} else {
|
|
log.Printf("using external NATS: %s", natsClientURL)
|
|
}
|
|
|
|
srv := membership.NewServer(store, blobs, authMode)
|
|
log.Printf("control-plane auth: %s", authMode)
|
|
addr := *bind + ":" + *httpPort
|
|
httpSrv := &http.Server{
|
|
Addr: addr,
|
|
Handler: srv,
|
|
// Bound request header size so a peer cannot exhaust memory with huge
|
|
// headers before any body limit applies (the body ceilings live in the
|
|
// membership middleware).
|
|
MaxHeaderBytes: membership.MaxHeaderBytes,
|
|
ReadHeaderTimeout: 10 * time.Second,
|
|
}
|
|
|
|
go func() {
|
|
log.Printf("HTTP control-plane API: http://%s", addr)
|
|
log.Printf(" health: http://%s/healthz", addr)
|
|
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Fatalf("http server: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Graceful shutdown on SIGINT/SIGTERM.
|
|
stop := make(chan os.Signal, 1)
|
|
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
|
|
<-stop
|
|
log.Printf("shutting down...")
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_ = httpSrv.Shutdown(ctx)
|
|
if ns != nil {
|
|
ns.Shutdown()
|
|
ns.WaitForShutdown()
|
|
}
|
|
log.Printf("bye")
|
|
}
|