package client_test

import (
	"fmt"
	"net/http/httptest"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/enmanuel/unibus/pkg/blobstore"
	"github.com/enmanuel/unibus/pkg/client"
	"github.com/enmanuel/unibus/pkg/embeddednats"
	"github.com/enmanuel/unibus/pkg/frame"
	"github.com/enmanuel/unibus/pkg/membership"
	"github.com/enmanuel/unibus/pkg/room"
	server "github.com/nats-io/nats-server/v2/server"
)

// startClusterNode launches one member of the "unibus-failover" embedded NATS
// cluster on 127.0.0.1 and registers its shutdown with t.Cleanup.
//
// clientPort is where clients connect, routePort is where peers route in, and
// peerRoutePorts lists the route ports of the other members to dial. Auth and
// route TLS are deliberately left off: this file tests client failover, while
// route security is exercised in pkg/embeddednats.
func startClusterNode(t *testing.T, name string, clientPort, routePort int, peerRoutePorts []int) *server.Server {
	t.Helper()

	routes := make([]string, len(peerRoutePorts))
	for i := range peerRoutePorts {
		routes[i] = fmt.Sprintf("nats://127.0.0.1:%d", peerRoutePorts[i])
	}

	cluster := &embeddednats.ClusterConfig{
		Name:   "unibus-failover",
		Host:   "127.0.0.1",
		Port:   routePort,
		Routes: routes,
	}
	cfg := embeddednats.ServerConfig{
		StoreDir:   t.TempDir(),
		Host:       "127.0.0.1",
		Port:       clientPort,
		ServerName: name,
		Cluster:    cluster,
	}

	ns, err := embeddednats.StartServer(cfg)
	if err != nil {
		t.Fatalf("start node %s: %v", name, err)
	}
	t.Cleanup(func() {
		ns.Shutdown()
		ns.WaitForShutdown()
	})
	return ns
}

// waitClusterRoutes polls ns every 50ms for up to 8s until it reports at least
// one route, failing the test if none ever appears.
func waitClusterRoutes(t *testing.T, ns *server.Server) {
	t.Helper()
	for deadline := time.Now().Add(8 * time.Second); time.Now().Before(deadline); time.Sleep(50 * time.Millisecond) {
		if ns.NumRoutes() > 0 {
			return
		}
	}
	t.Fatalf("node %q never formed a route", ns.Name())
}

// portOf returns the port part of a NATS URL so that ConnectedServer() can be
// matched against ClientURL() even when the two spell the host differently.
func portOf(natsURL string) string { i := strings.LastIndex(natsURL, ":") if i < 0 { return "" } return natsURL[i+1:] } // TestClientFailoverAcrossNodes is the issue's edge case: a client connected to // node A keeps its session when A is killed — nats.go reconnects it to node B // and it keeps receiving messages published on the surviving node. func TestClientFailoverAcrossNodes(t *testing.T) { rp0, rp1 := freePort(t), freePort(t) p0, p1 := freePort(t), freePort(t) n0 := startClusterNode(t, "n0", p0, rp0, []int{rp1}) n1 := startClusterNode(t, "n1", p1, rp1, []int{rp0}) waitClusterRoutes(t, n0) waitClusterRoutes(t, n1) nodes := map[string]*server.Server{strconv.Itoa(p0): n0, strconv.Itoa(p1): n1} // Control plane: one in-process membershipd (metadata only; the data plane is // the NATS cluster). Auth off keeps the test focused on data-plane failover. dir := t.TempDir() store, err := membership.Open(filepath.Join(dir, "unibus.db")) if err != nil { t.Fatalf("store: %v", err) } t.Cleanup(func() { store.Close() }) blobs, err := blobstore.New(filepath.Join(dir, "blobs")) if err != nil { t.Fatalf("blobs: %v", err) } ctrl := httptest.NewServer(membership.NewServer(store, blobs, membership.AuthOff)) t.Cleanup(ctrl.Close) url0 := n0.ClientURL() url1 := n1.ClientURL() // A seeds BOTH nodes (failover list); B connects directly to n1. 
a, err := client.NewWithOptions(url0, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url1}}) if err != nil { t.Fatalf("connect A: %v", err) } defer a.Close() b, err := client.NewWithOptions(url1, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url0}}) if err != nil { t.Fatalf("connect B: %v", err) } defer b.Close() roomID, err := a.CreateRoom("room.failover", room.ModeNATS) if err != nil { t.Fatalf("A create room: %v", err) } var mu sync.Mutex var got []string sub, err := a.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) { mu.Lock() got = append(got, string(plaintext)) mu.Unlock() }) if err != nil { t.Fatalf("A subscribe: %v", err) } defer sub.Unsubscribe() time.Sleep(200 * time.Millisecond) // Pre-kill sanity: B publishes, A receives across the cluster. if err := b.Publish(roomID, []byte("before-kill")); err != nil { t.Fatalf("B publish 1: %v", err) } if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "before-kill") }, 3*time.Second) { t.Fatalf("A did not receive the pre-kill message; got %v", snapshot(&mu, &got)) } // Identify and KILL the node A is attached to, forcing a reconnect. attached := a.ConnectedServer() killPort := portOf(attached) victim, ok := nodes[killPort] if !ok { t.Fatalf("A is attached to an unknown node %q (port %q)", attached, killPort) } survivorURL := url1 if killPort == strconv.Itoa(p1) { survivorURL = url0 } victim.Shutdown() victim.WaitForShutdown() // A must reconnect to the surviving node. 
deadline := time.Now().Add(8 * time.Second) for time.Now().Before(deadline) { if a.IsConnected() && portOf(a.ConnectedServer()) == portOf(survivorURL) { break } time.Sleep(100 * time.Millisecond) } if !a.IsConnected() || portOf(a.ConnectedServer()) != portOf(survivorURL) { t.Fatalf("A did not fail over to the surviving node (now on %q, want port %s)", a.ConnectedServer(), portOf(survivorURL)) } // Make B publish from the surviving node and confirm A still receives — // the session (its subscription) survived the failover. if survivorURL == url0 { // B's primary was n1 (killed); ensure B is on the survivor too. deadline := time.Now().Add(8 * time.Second) for time.Now().Before(deadline) && portOf(b.ConnectedServer()) != portOf(survivorURL) { time.Sleep(100 * time.Millisecond) } } if err := b.Publish(roomID, []byte("after-kill")); err != nil { t.Fatalf("B publish 2: %v", err) } if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "after-kill") }, 6*time.Second) { t.Fatalf("A did not receive a message after failover; got %v", snapshot(&mu, &got)) } } func contains(rs []string, want string) bool { for _, r := range rs { if r == want { return true } } return false }