diff --git a/pkg/client/client.go b/pkg/client/client.go index 69de476..bcd7f95 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -51,7 +51,7 @@ type Client struct { endpoint string nc *nats.Conn js jetstream.JetStream // durable plane for rooms with Policy.Persist - ctrlURL string + ctrlURLs []string // control-plane HTTP endpoints, tried in order (failover) http *http.Client mu sync.RWMutex @@ -77,6 +77,33 @@ type Options struct { // secured independently (a test may TLS one and not the other); production // sets both to the same CA via Connect. Nil keeps the control plane plaintext. CtrlTLS *tls.Config + // NatsServers are ADDITIONAL NATS seed URLs for cluster failover (issue + // 0003e), beyond the primary natsURL passed to the constructor. With more + // than one server nats.go reconnects to a surviving node automatically when + // the one a client is attached to dies, so a node loss is transparent. + NatsServers []string + // CtrlURLs are ADDITIONAL control-plane HTTP endpoints (one per node) beyond + // the primary ctrlURL. Each request is tried against them in order until one + // answers, so the control plane survives a node loss too. With the + // decentralized KV store every node serves the same state, so any of them + // can answer any request. + CtrlURLs []string +} + +// dedupNonEmpty returns the input with empty strings dropped and duplicates +// removed, preserving order. Used to build the NATS seed list and control-plane +// list from a primary URL plus optional extras without a redundant entry. 
// dedupNonEmpty returns the entries of in with surrounding whitespace trimmed,
// blank entries dropped and duplicates removed, preserving first-occurrence
// order. It is used to build the NATS seed list and the control-plane list from
// a primary URL plus optional extras without a redundant entry.
//
// Trimming matters because these lists commonly come from comma-split flags or
// environment variables ("a, b"): without it " b" and "b" would be kept as two
// distinct endpoints and a whitespace-only entry would survive as a bogus URL.
//
// The result is nil when no entry survives.
func dedupNonEmpty(in []string) []string {
	seen := make(map[string]struct{}, len(in))
	var out []string
	for _, s := range in {
		s = strings.TrimSpace(s)
		if s == "" {
			continue
		}
		if _, dup := seen[s]; dup {
			continue
		}
		seen[s] = struct{}{}
		out = append(out, s)
	}
	return out
}
Obtaining it does not require any // stream to exist yet and has no effect on cleartext/ephemeral rooms — those @@ -151,7 +191,7 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli endpoint: frame.EndpointID(id.SignPub), nc: nc, js: js, - ctrlURL: ctrlURL, + ctrlURLs: dedupNonEmpty(append([]string{ctrlURL}, opts.CtrlURLs...)), http: httpClient, keyCache: map[string]map[int][]byte{}, signCache: map[string][]byte{}, @@ -169,6 +209,15 @@ func (c *Client) Close() error { return nil } +// ConnectedServer returns the URL of the NATS node this client is currently +// attached to (empty when disconnected). It is observability for cluster +// failover: after a node dies, this reports the surviving node nats.go +// reconnected to. IsConnected reports whether the data-plane link is up. +func (c *Client) ConnectedServer() string { return c.nc.ConnectedUrl() } + +// IsConnected reports whether the NATS data-plane connection is currently up. +func (c *Client) IsConnected() bool { return c.nc.IsConnected() } + // ---- key cache ------------------------------------------------------------ func (c *Client) cacheKey(roomID string, epoch int, k []byte) { @@ -203,36 +252,45 @@ func (c *Client) doJSON(method, path string, body, out any) error { } bodyBytes = b } - req, err := c.newSignedRequest(method, path, bodyBytes) - if err != nil { - return err - } - if body != nil { - req.Header.Set("Content-Type", "application/json") - } - resp, err := c.http.Do(req) - if err != nil { - return fmt.Errorf("client: do %s %s: %w", method, path, err) - } - defer resp.Body.Close() - respBody, _ := io.ReadAll(resp.Body) - if resp.StatusCode >= 300 { - // Surface the server's structured {"error": "..."} message when present, - // instead of leaking the raw HTTP envelope (method, path, status, JSON body). - var er struct { - Error string `json:"error"` + // Try each control-plane endpoint in order. 
A transport error (a dead node) + // falls over to the next; an HTTP response (any status) is authoritative and + // returned, since every node serves the same state. Each attempt is freshly + // signed (new nonce), so a failed-over retry is never seen as a replay. + var lastErr error + for _, base := range c.ctrlURLs { + req, err := c.newSignedRequestTo(base, method, path, bodyBytes) + if err != nil { + return err } - if json.Unmarshal(respBody, &er) == nil && er.Error != "" { - return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode) + if body != nil { + req.Header.Set("Content-Type", "application/json") } - return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody)) - } - if out != nil { - if err := json.Unmarshal(respBody, out); err != nil { - return fmt.Errorf("client: decode response: %w", err) + resp, err := c.http.Do(req) + if err != nil { + lastErr = err + continue // dead node: try the next control plane } + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= 300 { + // Surface the server's structured {"error": "..."} message when present, + // instead of leaking the raw HTTP envelope (method, path, status, body). 
+ var er struct { + Error string `json:"error"` + } + if json.Unmarshal(respBody, &er) == nil && er.Error != "" { + return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode) + } + return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody)) + } + if out != nil { + if err := json.Unmarshal(respBody, out); err != nil { + return fmt.Errorf("client: decode response: %w", err) + } + } + return nil } - return nil + return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr) } // signRequest signs the canonical bytes of req (req must already have its Sig @@ -246,22 +304,25 @@ func (c *Client) signRequest(req any) []byte { return cs.SignEd25519(c.id.SignPriv, b) } -// newSignedRequest builds an *http.Request to the control plane and attaches the -// transport authentication headers (X-Unibus-Pub/Ts/Nonce/Sig) signing the -// canonical request bytes with this peer's Ed25519 key. path is the request URI -// (path plus any query); body is the raw request body (nil for GET). The server -// (membership.authenticate) verifies these headers under the bus-auth flag. +// newSignedRequestTo builds an *http.Request to the control-plane endpoint +// `base` and attaches the transport authentication headers +// (X-Unibus-Pub/Ts/Nonce/Sig) signing the canonical request bytes with this +// peer's Ed25519 key. path is the request URI (path plus any query); body is the +// raw request body (nil for GET). The server (membership.authenticate) verifies +// these headers under the bus-auth flag. The signature covers method+path+ts+ +// nonce+sha256(body), NOT the host, so the same request can be addressed to any +// node — and each failover attempt mints a fresh nonce so it is never a replay. // // Signing happens on every request — including GETs — so that under enforce the // server can authenticate the caller and reject unregistered or revoked // identities uniformly. 
The canonical construction is the single source of truth // in membership.CanonicalRequest, shared by both sides. -func (c *Client) newSignedRequest(method, path string, body []byte) (*http.Request, error) { +func (c *Client) newSignedRequestTo(base, method, path string, body []byte) (*http.Request, error) { var rdr io.Reader if body != nil { rdr = bytes.NewReader(body) } - req, err := http.NewRequest(method, c.ctrlURL+path, rdr) + req, err := http.NewRequest(method, base+path, rdr) if err != nil { return nil, fmt.Errorf("client: new request: %w", err) } @@ -887,40 +948,50 @@ func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) { } func (c *Client) putBlob(ciphertext []byte) (string, error) { - req, err := c.newSignedRequest("POST", "/blobs", ciphertext) - if err != nil { - return "", err + var lastErr error + for _, base := range c.ctrlURLs { + req, err := c.newSignedRequestTo(base, "POST", "/blobs", ciphertext) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/octet-stream") + resp, err := c.http.Do(req) + if err != nil { + lastErr = err + continue // dead node: try the next control plane + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= 300 { + return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body)) + } + var r blobResp + if err := json.Unmarshal(body, &r); err != nil { + return "", fmt.Errorf("client: decode blob resp: %w", err) + } + return r.Hash, nil } - req.Header.Set("Content-Type", "application/octet-stream") - resp, err := c.http.Do(req) - if err != nil { - return "", fmt.Errorf("client: put blob: %w", err) - } - defer resp.Body.Close() - body, _ := io.ReadAll(resp.Body) - if resp.StatusCode >= 300 { - return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body)) - } - var r blobResp - if err := json.Unmarshal(body, &r); err != nil { - return "", fmt.Errorf("client: decode blob resp: %w", err) - } - return r.Hash, 
nil + return "", fmt.Errorf("client: put blob: all control planes failed: %w", lastErr) } func (c *Client) getBlob(hash string) ([]byte, error) { - req, err := c.newSignedRequest("GET", "/blobs/"+hash, nil) - if err != nil { - return nil, err + var lastErr error + for _, base := range c.ctrlURLs { + req, err := c.newSignedRequestTo(base, "GET", "/blobs/"+hash, nil) + if err != nil { + return nil, err + } + resp, err := c.http.Do(req) + if err != nil { + lastErr = err + continue // dead node: try the next control plane + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body)) + } + return io.ReadAll(resp.Body) } - resp, err := c.http.Do(req) - if err != nil { - return nil, fmt.Errorf("client: get blob: %w", err) - } - defer resp.Body.Close() - if resp.StatusCode >= 300 { - body, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body)) - } - return io.ReadAll(resp.Body) + return nil, fmt.Errorf("client: get blob: all control planes failed: %w", lastErr) } diff --git a/pkg/client/failover_test.go b/pkg/client/failover_test.go new file mode 100644 index 0000000..da6f5cf --- /dev/null +++ b/pkg/client/failover_test.go @@ -0,0 +1,185 @@ +package client_test + +import ( + "fmt" + "net/http/httptest" + "path/filepath" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/membership" + "github.com/enmanuel/unibus/pkg/room" + server "github.com/nats-io/nats-server/v2/server" +) + +// startClusterNode boots a clustered embedded NATS node (auth off, no route TLS: +// this test exercises client failover, not route security — that is covered in +// pkg/embeddednats). 
// portOf extracts the port of a NATS URL so two spellings of the same endpoint
// (ConnectedServer() may report a different host than ClientURL()) can be
// compared. It returns "" when the URL carries no explicit port.
//
// The authority is isolated first — scheme, path/query/fragment and userinfo
// are stripped — so inputs such as "nats://host" (no port), "nats://h:4222/"
// (trailing slash) or "nats://[::1]" (bracketed IPv6 without port) do not yield
// garbage like "//host", "4222/" or "1]".
func portOf(natsURL string) string {
	hostport := natsURL
	if _, rest, ok := strings.Cut(hostport, "://"); ok {
		hostport = rest
	}
	if i := strings.IndexAny(hostport, "/?#"); i >= 0 {
		hostport = hostport[:i]
	}
	if i := strings.LastIndex(hostport, "@"); i >= 0 {
		hostport = hostport[i+1:]
	}
	i := strings.LastIndex(hostport, ":")
	// A ']' after the last colon means that colon is inside an IPv6 literal,
	// not a host/port separator.
	if i < 0 || strings.Contains(hostport[i:], "]") {
		return ""
	}
	return hostport[i+1:]
}
Auth off keeps the test focused on data-plane failover. + dir := t.TempDir() + store, err := membership.Open(filepath.Join(dir, "unibus.db")) + if err != nil { + t.Fatalf("store: %v", err) + } + t.Cleanup(func() { store.Close() }) + blobs, err := blobstore.New(filepath.Join(dir, "blobs")) + if err != nil { + t.Fatalf("blobs: %v", err) + } + ctrl := httptest.NewServer(membership.NewServer(store, blobs, membership.AuthOff)) + t.Cleanup(ctrl.Close) + + url0 := n0.ClientURL() + url1 := n1.ClientURL() + + // A seeds BOTH nodes (failover list); B connects directly to n1. + a, err := client.NewWithOptions(url0, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url1}}) + if err != nil { + t.Fatalf("connect A: %v", err) + } + defer a.Close() + b, err := client.NewWithOptions(url1, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url0}}) + if err != nil { + t.Fatalf("connect B: %v", err) + } + defer b.Close() + + roomID, err := a.CreateRoom("room.failover", room.ModeNATS) + if err != nil { + t.Fatalf("A create room: %v", err) + } + + var mu sync.Mutex + var got []string + sub, err := a.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) { + mu.Lock() + got = append(got, string(plaintext)) + mu.Unlock() + }) + if err != nil { + t.Fatalf("A subscribe: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(200 * time.Millisecond) + + // Pre-kill sanity: B publishes, A receives across the cluster. + if err := b.Publish(roomID, []byte("before-kill")); err != nil { + t.Fatalf("B publish 1: %v", err) + } + if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "before-kill") }, 3*time.Second) { + t.Fatalf("A did not receive the pre-kill message; got %v", snapshot(&mu, &got)) + } + + // Identify and KILL the node A is attached to, forcing a reconnect. 
+ attached := a.ConnectedServer() + killPort := portOf(attached) + victim, ok := nodes[killPort] + if !ok { + t.Fatalf("A is attached to an unknown node %q (port %q)", attached, killPort) + } + survivorURL := url1 + if killPort == strconv.Itoa(p1) { + survivorURL = url0 + } + victim.Shutdown() + victim.WaitForShutdown() + + // A must reconnect to the surviving node. + deadline := time.Now().Add(8 * time.Second) + for time.Now().Before(deadline) { + if a.IsConnected() && portOf(a.ConnectedServer()) == portOf(survivorURL) { + break + } + time.Sleep(100 * time.Millisecond) + } + if !a.IsConnected() || portOf(a.ConnectedServer()) != portOf(survivorURL) { + t.Fatalf("A did not fail over to the surviving node (now on %q, want port %s)", a.ConnectedServer(), portOf(survivorURL)) + } + + // Make B publish from the surviving node and confirm A still receives — + // the session (its subscription) survived the failover. + if survivorURL == url0 { + // B's primary was n1 (killed); ensure B is on the survivor too. + deadline := time.Now().Add(8 * time.Second) + for time.Now().Before(deadline) && portOf(b.ConnectedServer()) != portOf(survivorURL) { + time.Sleep(100 * time.Millisecond) + } + } + if err := b.Publish(roomID, []byte("after-kill")); err != nil { + t.Fatalf("B publish 2: %v", err) + } + if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "after-kill") }, 6*time.Second) { + t.Fatalf("A did not receive a message after failover; got %v", snapshot(&mu, &got)) + } +} + +func contains(rs []string, want string) bool { + for _, r := range rs { + if r == want { + return true + } + } + return false +}