feat(0003e/1): client failover over a list of seeds and control planes
The client (issue 0003e, part 1) accepts a LIST of NATS seeds and a LIST of control-plane URLs so a node loss is transparent. pkg/client: - Options.NatsServers: extra NATS seeds beyond the primary. The client connects to the joined seed list with MaxReconnects(-1) + RetryOnFailedConnect, so nats.go fails over to a surviving node when the one a client is attached to dies and rejoins a node that comes back. - Options.CtrlURLs: extra control-plane endpoints. doJSON/putBlob/getBlob now try each endpoint in order, falling over on a transport error to the next (an HTTP response from any node is authoritative — every node serves the same state under the KV store). newSignedRequest becomes newSignedRequestTo(base, ...); each failover attempt mints a fresh nonce (the signature covers method+path+ts+nonce+body, not the host), so a retried request is never seen as a replay. - ConnectedServer()/IsConnected(): observability for which node the data plane is attached to, for ops and failover tests. - New/Connect/NewWithOptions keep their signatures (a single URL = a one-element list), so worker/chat/mobile/playground are unchanged. Test (DoD edge — the issue's "kill node A" case): - TestClientFailoverAcrossNodes: A seeds two clustered nodes, subscribes, receives a cross-node message; the node A is attached to is KILLED; A reconnects to the survivor and still receives messages — session intact.
This commit is contained in:
+140
-69
@@ -51,7 +51,7 @@ type Client struct {
|
||||
endpoint string
|
||||
nc *nats.Conn
|
||||
js jetstream.JetStream // durable plane for rooms with Policy.Persist
|
||||
ctrlURL string
|
||||
ctrlURLs []string // control-plane HTTP endpoints, tried in order (failover)
|
||||
http *http.Client
|
||||
|
||||
mu sync.RWMutex
|
||||
@@ -77,6 +77,33 @@ type Options struct {
|
||||
// secured independently (a test may TLS one and not the other); production
|
||||
// sets both to the same CA via Connect. Nil keeps the control plane plaintext.
|
||||
CtrlTLS *tls.Config
|
||||
// NatsServers are ADDITIONAL NATS seed URLs for cluster failover (issue
|
||||
// 0003e), beyond the primary natsURL passed to the constructor. With more
|
||||
// than one server nats.go reconnects to a surviving node automatically when
|
||||
// the one a client is attached to dies, so a node loss is transparent.
|
||||
NatsServers []string
|
||||
// CtrlURLs are ADDITIONAL control-plane HTTP endpoints (one per node) beyond
|
||||
// the primary ctrlURL. Each request is tried against them in order until one
|
||||
// answers, so the control plane survives a node loss too. With the
|
||||
// decentralized KV store every node serves the same state, so any of them
|
||||
// can answer any request.
|
||||
CtrlURLs []string
|
||||
}
|
||||
|
||||
// dedupNonEmpty returns the input with empty strings dropped and duplicates
|
||||
// removed, preserving order. Used to build the NATS seed list and control-plane
|
||||
// list from a primary URL plus optional extras without a redundant entry.
|
||||
func dedupNonEmpty(in []string) []string {
|
||||
seen := map[string]bool{}
|
||||
var out []string
|
||||
for _, s := range in {
|
||||
if s == "" || seen[s] {
|
||||
continue
|
||||
}
|
||||
seen[s] = true
|
||||
out = append(out, s)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// New connects to NATS and records the control-plane URL with default Options
|
||||
@@ -116,7 +143,20 @@ func Connect(natsURL, ctrlURL string, id cs.Identity, caPath string) (*Client, e
|
||||
// so every peer (worker, chat, mobile, gateway) gets identical behavior by
|
||||
// passing the same Options.
|
||||
func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Client, error) {
|
||||
natsOpts := []nats.Option{nats.Name("unibus-client")}
|
||||
// Seed list = primary + extras. With more than one seed, nats.go fails over
|
||||
// to a surviving node on disconnect; MaxReconnects(-1) keeps it retrying
|
||||
// indefinitely so a node coming back is rejoined rather than given up on.
|
||||
natsServers := dedupNonEmpty(append([]string{natsURL}, opts.NatsServers...))
|
||||
natsOpts := []nats.Option{
|
||||
nats.Name("unibus-client"),
|
||||
nats.MaxReconnects(-1),
|
||||
nats.ReconnectWait(250 * time.Millisecond),
|
||||
}
|
||||
if len(natsServers) > 1 {
|
||||
// Try every seed on the initial connect too, so startup tolerates one
|
||||
// seed being down.
|
||||
natsOpts = append(natsOpts, nats.RetryOnFailedConnect(true))
|
||||
}
|
||||
if opts.UseNkey {
|
||||
nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv)
|
||||
if err != nil {
|
||||
@@ -127,9 +167,9 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli
|
||||
if opts.TLS != nil {
|
||||
natsOpts = append(natsOpts, nats.Secure(opts.TLS))
|
||||
}
|
||||
nc, err := nats.Connect(natsURL, natsOpts...)
|
||||
nc, err := nats.Connect(strings.Join(natsServers, ","), natsOpts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("client: connect nats %q: %w", natsURL, err)
|
||||
return nil, fmt.Errorf("client: connect nats %v: %w", natsServers, err)
|
||||
}
|
||||
// JetStream context for the durable plane. Obtaining it does not require any
|
||||
// stream to exist yet and has no effect on cleartext/ephemeral rooms — those
|
||||
@@ -151,7 +191,7 @@ func NewWithOptions(natsURL, ctrlURL string, id cs.Identity, opts Options) (*Cli
|
||||
endpoint: frame.EndpointID(id.SignPub),
|
||||
nc: nc,
|
||||
js: js,
|
||||
ctrlURL: ctrlURL,
|
||||
ctrlURLs: dedupNonEmpty(append([]string{ctrlURL}, opts.CtrlURLs...)),
|
||||
http: httpClient,
|
||||
keyCache: map[string]map[int][]byte{},
|
||||
signCache: map[string][]byte{},
|
||||
@@ -169,6 +209,15 @@ func (c *Client) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConnectedServer returns the URL of the NATS node this client is currently
|
||||
// attached to (empty when disconnected). It is observability for cluster
|
||||
// failover: after a node dies, this reports the surviving node nats.go
|
||||
// reconnected to. IsConnected reports whether the data-plane link is up.
|
||||
func (c *Client) ConnectedServer() string { return c.nc.ConnectedUrl() }
|
||||
|
||||
// IsConnected reports whether the NATS data-plane connection is currently up.
|
||||
func (c *Client) IsConnected() bool { return c.nc.IsConnected() }
|
||||
|
||||
// ---- key cache ------------------------------------------------------------
|
||||
|
||||
func (c *Client) cacheKey(roomID string, epoch int, k []byte) {
|
||||
@@ -203,36 +252,45 @@ func (c *Client) doJSON(method, path string, body, out any) error {
|
||||
}
|
||||
bodyBytes = b
|
||||
}
|
||||
req, err := c.newSignedRequest(method, path, bodyBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if body != nil {
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("client: do %s %s: %w", method, path, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
if resp.StatusCode >= 300 {
|
||||
// Surface the server's structured {"error": "..."} message when present,
|
||||
// instead of leaking the raw HTTP envelope (method, path, status, JSON body).
|
||||
var er struct {
|
||||
Error string `json:"error"`
|
||||
// Try each control-plane endpoint in order. A transport error (a dead node)
|
||||
// falls over to the next; an HTTP response (any status) is authoritative and
|
||||
// returned, since every node serves the same state. Each attempt is freshly
|
||||
// signed (new nonce), so a failed-over retry is never seen as a replay.
|
||||
var lastErr error
|
||||
for _, base := range c.ctrlURLs {
|
||||
req, err := c.newSignedRequestTo(base, method, path, bodyBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if json.Unmarshal(respBody, &er) == nil && er.Error != "" {
|
||||
return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
|
||||
if body != nil {
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody))
|
||||
}
|
||||
if out != nil {
|
||||
if err := json.Unmarshal(respBody, out); err != nil {
|
||||
return fmt.Errorf("client: decode response: %w", err)
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue // dead node: try the next control plane
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
if resp.StatusCode >= 300 {
|
||||
// Surface the server's structured {"error": "..."} message when present,
|
||||
// instead of leaking the raw HTTP envelope (method, path, status, body).
|
||||
var er struct {
|
||||
Error string `json:"error"`
|
||||
}
|
||||
if json.Unmarshal(respBody, &er) == nil && er.Error != "" {
|
||||
return fmt.Errorf("%s (HTTP %d)", er.Error, resp.StatusCode)
|
||||
}
|
||||
return fmt.Errorf("client: %s %s -> %d: %s", method, path, resp.StatusCode, string(respBody))
|
||||
}
|
||||
if out != nil {
|
||||
if err := json.Unmarshal(respBody, out); err != nil {
|
||||
return fmt.Errorf("client: decode response: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("client: %s %s: all control planes failed: %w", method, path, lastErr)
|
||||
}
|
||||
|
||||
// signRequest signs the canonical bytes of req (req must already have its Sig
|
||||
@@ -246,22 +304,25 @@ func (c *Client) signRequest(req any) []byte {
|
||||
return cs.SignEd25519(c.id.SignPriv, b)
|
||||
}
|
||||
|
||||
// newSignedRequest builds an *http.Request to the control plane and attaches the
|
||||
// transport authentication headers (X-Unibus-Pub/Ts/Nonce/Sig) signing the
|
||||
// canonical request bytes with this peer's Ed25519 key. path is the request URI
|
||||
// (path plus any query); body is the raw request body (nil for GET). The server
|
||||
// (membership.authenticate) verifies these headers under the bus-auth flag.
|
||||
// newSignedRequestTo builds an *http.Request to the control-plane endpoint
|
||||
// `base` and attaches the transport authentication headers
|
||||
// (X-Unibus-Pub/Ts/Nonce/Sig) signing the canonical request bytes with this
|
||||
// peer's Ed25519 key. path is the request URI (path plus any query); body is the
|
||||
// raw request body (nil for GET). The server (membership.authenticate) verifies
|
||||
// these headers under the bus-auth flag. The signature covers method+path+ts+
|
||||
// nonce+sha256(body), NOT the host, so the same request can be addressed to any
|
||||
// node — and each failover attempt mints a fresh nonce so it is never a replay.
|
||||
//
|
||||
// Signing happens on every request — including GETs — so that under enforce the
|
||||
// server can authenticate the caller and reject unregistered or revoked
|
||||
// identities uniformly. The canonical construction is the single source of truth
|
||||
// in membership.CanonicalRequest, shared by both sides.
|
||||
func (c *Client) newSignedRequest(method, path string, body []byte) (*http.Request, error) {
|
||||
func (c *Client) newSignedRequestTo(base, method, path string, body []byte) (*http.Request, error) {
|
||||
var rdr io.Reader
|
||||
if body != nil {
|
||||
rdr = bytes.NewReader(body)
|
||||
}
|
||||
req, err := http.NewRequest(method, c.ctrlURL+path, rdr)
|
||||
req, err := http.NewRequest(method, base+path, rdr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("client: new request: %w", err)
|
||||
}
|
||||
@@ -887,40 +948,50 @@ func (c *Client) FetchMedia(roomID string, f frame.Frame) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (c *Client) putBlob(ciphertext []byte) (string, error) {
|
||||
req, err := c.newSignedRequest("POST", "/blobs", ciphertext)
|
||||
if err != nil {
|
||||
return "", err
|
||||
var lastErr error
|
||||
for _, base := range c.ctrlURLs {
|
||||
req, err := c.newSignedRequestTo(base, "POST", "/blobs", ciphertext)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/octet-stream")
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue // dead node: try the next control plane
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if resp.StatusCode >= 300 {
|
||||
return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
var r blobResp
|
||||
if err := json.Unmarshal(body, &r); err != nil {
|
||||
return "", fmt.Errorf("client: decode blob resp: %w", err)
|
||||
}
|
||||
return r.Hash, nil
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/octet-stream")
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("client: put blob: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if resp.StatusCode >= 300 {
|
||||
return "", fmt.Errorf("client: put blob -> %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
var r blobResp
|
||||
if err := json.Unmarshal(body, &r); err != nil {
|
||||
return "", fmt.Errorf("client: decode blob resp: %w", err)
|
||||
}
|
||||
return r.Hash, nil
|
||||
return "", fmt.Errorf("client: put blob: all control planes failed: %w", lastErr)
|
||||
}
|
||||
|
||||
func (c *Client) getBlob(hash string) ([]byte, error) {
|
||||
req, err := c.newSignedRequest("GET", "/blobs/"+hash, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var lastErr error
|
||||
for _, base := range c.ctrlURLs {
|
||||
req, err := c.newSignedRequestTo(base, "GET", "/blobs/"+hash, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue // dead node: try the next control plane
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 300 {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
return io.ReadAll(resp.Body)
|
||||
}
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("client: get blob: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 300 {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("client: get blob -> %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
return io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("client: get blob: all control planes failed: %w", lastErr)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,185 @@
|
||||
package client_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/enmanuel/unibus/pkg/blobstore"
|
||||
"github.com/enmanuel/unibus/pkg/client"
|
||||
"github.com/enmanuel/unibus/pkg/embeddednats"
|
||||
"github.com/enmanuel/unibus/pkg/frame"
|
||||
"github.com/enmanuel/unibus/pkg/membership"
|
||||
"github.com/enmanuel/unibus/pkg/room"
|
||||
server "github.com/nats-io/nats-server/v2/server"
|
||||
)
|
||||
|
||||
// startClusterNode boots a clustered embedded NATS node (auth off, no route TLS:
|
||||
// this test exercises client failover, not route security — that is covered in
|
||||
// pkg/embeddednats).
|
||||
func startClusterNode(t *testing.T, name string, clientPort, routePort int, peerRoutePorts []int) *server.Server {
|
||||
t.Helper()
|
||||
routes := make([]string, 0, len(peerRoutePorts))
|
||||
for _, p := range peerRoutePorts {
|
||||
routes = append(routes, fmt.Sprintf("nats://127.0.0.1:%d", p))
|
||||
}
|
||||
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
|
||||
StoreDir: t.TempDir(),
|
||||
Host: "127.0.0.1",
|
||||
Port: clientPort,
|
||||
ServerName: name,
|
||||
Cluster: &embeddednats.ClusterConfig{Name: "unibus-failover", Host: "127.0.0.1", Port: routePort, Routes: routes},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("start node %s: %v", name, err)
|
||||
}
|
||||
t.Cleanup(func() { ns.Shutdown(); ns.WaitForShutdown() })
|
||||
return ns
|
||||
}
|
||||
|
||||
func waitClusterRoutes(t *testing.T, ns *server.Server) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(8 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if ns.NumRoutes() >= 1 {
|
||||
return
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("node %q never formed a route", ns.Name())
|
||||
}
|
||||
|
||||
// portOf extracts the :port of a nats URL for matching ConnectedServer() (which
|
||||
// may report a different host spelling than ClientURL()).
|
||||
func portOf(natsURL string) string {
|
||||
i := strings.LastIndex(natsURL, ":")
|
||||
if i < 0 {
|
||||
return ""
|
||||
}
|
||||
return natsURL[i+1:]
|
||||
}
|
||||
|
||||
// TestClientFailoverAcrossNodes is the issue's edge case: a client connected to
|
||||
// node A keeps its session when A is killed — nats.go reconnects it to node B
|
||||
// and it keeps receiving messages published on the surviving node.
|
||||
func TestClientFailoverAcrossNodes(t *testing.T) {
|
||||
rp0, rp1 := freePort(t), freePort(t)
|
||||
p0, p1 := freePort(t), freePort(t)
|
||||
n0 := startClusterNode(t, "n0", p0, rp0, []int{rp1})
|
||||
n1 := startClusterNode(t, "n1", p1, rp1, []int{rp0})
|
||||
waitClusterRoutes(t, n0)
|
||||
waitClusterRoutes(t, n1)
|
||||
nodes := map[string]*server.Server{strconv.Itoa(p0): n0, strconv.Itoa(p1): n1}
|
||||
|
||||
// Control plane: one in-process membershipd (metadata only; the data plane is
|
||||
// the NATS cluster). Auth off keeps the test focused on data-plane failover.
|
||||
dir := t.TempDir()
|
||||
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { store.Close() })
|
||||
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
|
||||
if err != nil {
|
||||
t.Fatalf("blobs: %v", err)
|
||||
}
|
||||
ctrl := httptest.NewServer(membership.NewServer(store, blobs, membership.AuthOff))
|
||||
t.Cleanup(ctrl.Close)
|
||||
|
||||
url0 := n0.ClientURL()
|
||||
url1 := n1.ClientURL()
|
||||
|
||||
// A seeds BOTH nodes (failover list); B connects directly to n1.
|
||||
a, err := client.NewWithOptions(url0, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url1}})
|
||||
if err != nil {
|
||||
t.Fatalf("connect A: %v", err)
|
||||
}
|
||||
defer a.Close()
|
||||
b, err := client.NewWithOptions(url1, ctrl.URL, mustIdentity(t), client.Options{NatsServers: []string{url0}})
|
||||
if err != nil {
|
||||
t.Fatalf("connect B: %v", err)
|
||||
}
|
||||
defer b.Close()
|
||||
|
||||
roomID, err := a.CreateRoom("room.failover", room.ModeNATS)
|
||||
if err != nil {
|
||||
t.Fatalf("A create room: %v", err)
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
var got []string
|
||||
sub, err := a.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) {
|
||||
mu.Lock()
|
||||
got = append(got, string(plaintext))
|
||||
mu.Unlock()
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("A subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Pre-kill sanity: B publishes, A receives across the cluster.
|
||||
if err := b.Publish(roomID, []byte("before-kill")); err != nil {
|
||||
t.Fatalf("B publish 1: %v", err)
|
||||
}
|
||||
if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "before-kill") }, 3*time.Second) {
|
||||
t.Fatalf("A did not receive the pre-kill message; got %v", snapshot(&mu, &got))
|
||||
}
|
||||
|
||||
// Identify and KILL the node A is attached to, forcing a reconnect.
|
||||
attached := a.ConnectedServer()
|
||||
killPort := portOf(attached)
|
||||
victim, ok := nodes[killPort]
|
||||
if !ok {
|
||||
t.Fatalf("A is attached to an unknown node %q (port %q)", attached, killPort)
|
||||
}
|
||||
survivorURL := url1
|
||||
if killPort == strconv.Itoa(p1) {
|
||||
survivorURL = url0
|
||||
}
|
||||
victim.Shutdown()
|
||||
victim.WaitForShutdown()
|
||||
|
||||
// A must reconnect to the surviving node.
|
||||
deadline := time.Now().Add(8 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if a.IsConnected() && portOf(a.ConnectedServer()) == portOf(survivorURL) {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
if !a.IsConnected() || portOf(a.ConnectedServer()) != portOf(survivorURL) {
|
||||
t.Fatalf("A did not fail over to the surviving node (now on %q, want port %s)", a.ConnectedServer(), portOf(survivorURL))
|
||||
}
|
||||
|
||||
// Make B publish from the surviving node and confirm A still receives —
|
||||
// the session (its subscription) survived the failover.
|
||||
if survivorURL == url0 {
|
||||
// B's primary was n1 (killed); ensure B is on the survivor too.
|
||||
deadline := time.Now().Add(8 * time.Second)
|
||||
for time.Now().Before(deadline) && portOf(b.ConnectedServer()) != portOf(survivorURL) {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
if err := b.Publish(roomID, []byte("after-kill")); err != nil {
|
||||
t.Fatalf("B publish 2: %v", err)
|
||||
}
|
||||
if !waitFor(&mu, &got, func(rs []string) bool { return contains(rs, "after-kill") }, 6*time.Second) {
|
||||
t.Fatalf("A did not receive a message after failover; got %v", snapshot(&mu, &got))
|
||||
}
|
||||
}
|
||||
|
||||
func contains(rs []string, want string) bool {
|
||||
for _, r := range rs {
|
||||
if r == want {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
Reference in New Issue
Block a user