diff --git a/cmd/clientcheck/main.go b/cmd/clientcheck/main.go new file mode 100644 index 0000000..abcd5ba --- /dev/null +++ b/cmd/clientcheck/main.go @@ -0,0 +1,260 @@ +// Command clientcheck is an end-to-end verification client for a live unibus +// cluster (issue 0011 GAP B). The 0011 chaos test validated only the control +// plane (healthz + meta/stream-leader failover + KV readable with 2/3); it never +// connected an authenticated bus client (nkey + TLS) to create a room and +// publish/subscribe through it, least of all across a node loss. clientcheck does +// exactly that with a real identity (the operator), so the data-plane end-to-end +// path — connect, create an E2E room, publish, receive decrypted — is exercised +// against the running cluster, including while a node is stopped. +// +// It is a reusable tool, not a throwaway script: point it at the cluster's CA, +// an identity file, and the NATS + control-plane seed lists. +// +// # golden: connect, create an E2E room, publish N, confirm N decrypted back +// clientcheck --ca ca.crt --identity-file operator.id \ +// --nats-seeds nats://A:4250,nats://B:4250,nats://C:4250 \ +// --ctrl-seeds https://A:8470,https://B:8470,https://C:8470 --messages 5 +// +// # loop: publish a counter every interval for the duration, logging the node +// # it is attached to — stop a node mid-run (systemctl stop membershipd-cluster) +// # and watch it fail over to a survivor and keep receiving (quorum 2/3). +// clientcheck ... --mode loop --duration 45s --interval 1s +package main + +import ( + "crypto/rand" + "encoding/hex" + "flag" + "fmt" + "log" + "sort" + "strings" + "sync" + "time" + + "github.com/enmanuel/unibus/pkg/busauth" + "github.com/enmanuel/unibus/pkg/client" + "github.com/enmanuel/unibus/pkg/frame" + "github.com/enmanuel/unibus/pkg/room" +) + +func main() { + var ( + caPath = flag.String("ca", "", "bus CA cert pinning TLS on both planes (required for a secured cluster)") + idFile = flag.String("identity-file", "", "path to the client identity JSON (e.g. `pass show unibus/operator-identity` written 0600) (required)") + natsSeeds = flag.String("nats-seeds", "", "comma-separated NATS urls of the cluster nodes (required)") + ctrlSeeds = flag.String("ctrl-seeds", "", "comma-separated control-plane https urls of the cluster nodes (required)") + subject = flag.String("subject", "test.gapcheck", "test room subject PREFIX; a random token is appended so runs never collide with real rooms") + messages = flag.Int("messages", 5, "golden mode: number of messages to publish and expect back") + mode = flag.String("mode", "golden", "golden (publish N, verify N decrypted) | loop (publish a counter for --duration, for failover testing)") + duration = flag.Duration("duration", 30*time.Second, "loop mode: how long to keep publishing") + interval = flag.Duration("interval", 1*time.Second, "loop mode: delay between published messages") + ) + flag.Parse() + + if *idFile == "" || *natsSeeds == "" || *ctrlSeeds == "" { + log.Fatalf("clientcheck: --identity-file, --nats-seeds and --ctrl-seeds are required") + } + + id, err := client.LoadIdentity(*idFile) + if err != nil { + log.Fatalf("clientcheck: load identity: %v", err) + } + natsList := splitCSV(*natsSeeds) + ctrlList := splitCSV(*ctrlSeeds) + if len(natsList) == 0 || len(ctrlList) == 0 { + log.Fatalf("clientcheck: empty --nats-seeds or --ctrl-seeds") + } + + // Build the secure client options: nkey on the data plane, TLS pinned to the + // bus CA on both planes, and the FULL seed lists so nats.go fails over to a + // surviving node when the attached one dies (the failover this tool verifies). + opts := client.Options{ + NatsServers: natsList[1:], + CtrlURLs: ctrlList[1:], + } + if *caPath != "" { + tlsCfg, err := busauth.LoadCATLSConfig(*caPath) + if err != nil { + log.Fatalf("clientcheck: load CA: %v", err) + } + opts.UseNkey = true + opts.TLS = tlsCfg + opts.CtrlTLS = tlsCfg + for _, u := range ctrlList { + if !strings.HasPrefix(u, "https://") { + log.Fatalf("clientcheck: control URL %q must be https:// when --ca is set", u) + } + } + } + + c, err := client.NewWithOptions(natsList[0], ctrlList[0], id, opts) + if err != nil { + log.Fatalf("clientcheck: connect: %v", err) + } + defer c.Close() + log.Printf("connected: endpoint=%s nats=%s", c.Endpoint().ID, c.ConnectedServer()) + + // Create an EPHEMERAL E2E room (encrypted + signed, NOT persisted): the test + // stays end-to-end encrypted (the cluster requires encryption on a public + // bind) while leaving no durable JetStream stream behind. The random subject + // token guarantees the room is unique and never a real room. + rnd := make([]byte, 8) + if _, err := rand.Read(rnd); err != nil { + log.Fatalf("clientcheck: random: %v", err) + } + subj := fmt.Sprintf("%s.%s", *subject, hex.EncodeToString(rnd)) + policy := room.Policy{Encrypt: true, Persist: false, SignMsgs: true} + roomID, err := c.CreateRoom(subj, policy) + if err != nil { + log.Fatalf("clientcheck: create room: %v", err) + } + log.Printf("created E2E room: id=%s subject=%s (encrypt=%v sign=%v persist=%v)", roomID, subj, policy.Encrypt, policy.SignMsgs, policy.Persist) + + // Under the per-subject ACL, NATS freezes permissions at connect time, so the + // just-created room's subject is not yet publishable/subscribable on the live + // connection. RefreshSession reconnects so the authenticator re-derives the + // ACL (now including this room) — the post-0006 contract every client follows + // after a membership change. + if err := c.RefreshSession(); err != nil { + log.Fatalf("clientcheck: refresh session: %v", err) + } + + switch *mode { + case "golden": + runGolden(c, roomID, *messages) + case "loop": + runLoop(c, roomID, *duration, *interval) + default: + log.Fatalf("clientcheck: --mode must be golden or loop, got %q", *mode) + } +} + +// runGolden subscribes, publishes n messages, and asserts all n come back +// decrypted. Exits non-zero if any are missing. +func runGolden(c *client.Client, roomID string, n int) { + var mu sync.Mutex + got := map[string]bool{} + sub, err := c.Subscribe(roomID, func(_ frame.Frame, plaintext []byte) { + mu.Lock() + got[string(plaintext)] = true + mu.Unlock() + }) + if err != nil { + log.Fatalf("clientcheck: subscribe: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(300 * time.Millisecond) // let the subscription settle + + want := make([]string, n) + for i := 0; i < n; i++ { + msg := fmt.Sprintf("gapcheck-e2e-%d", i) + want[i] = msg + if err := c.Publish(roomID, []byte(msg)); err != nil { + log.Fatalf("clientcheck: publish %d: %v", i, err) + } + } + log.Printf("published %d messages to %s; waiting for decrypted echoes...", n, roomID) + + deadline := time.Now().Add(15 * time.Second) + for time.Now().Before(deadline) { + mu.Lock() + have := len(got) + mu.Unlock() + if have >= n { + break + } + time.Sleep(100 * time.Millisecond) + } + + mu.Lock() + defer mu.Unlock() + missing := 0 + for _, w := range want { + if !got[w] { + missing++ + log.Printf(" MISSING: %q", w) + } + } + log.Printf("connected node at finish: %s", c.ConnectedServer()) + if missing > 0 { + log.Fatalf("GOLDEN FAIL: %d/%d messages not received decrypted", missing, n) + } + log.Printf("GOLDEN OK: all %d messages received and decrypted end-to-end", n) +} + +// runLoop publishes a numbered message every interval for the duration and logs +// the count received plus the node currently attached, so an operator stopping a +// cluster node mid-run sees the client fail over to a survivor and keep receiving +// (quorum 2/3). It is the live failover-with-a-connected-client test the 0011 +// chaos run never performed. +func runLoop(c *client.Client, roomID string, duration, interval time.Duration) { + var mu sync.Mutex + received := 0 + servers := map[string]int{} // node -> #ticks observed attached + sub, err := c.Subscribe(roomID, func(_ frame.Frame, _ []byte) { + mu.Lock() + received++ + mu.Unlock() + }) + if err != nil { + log.Fatalf("clientcheck: subscribe: %v", err) + } + defer sub.Unsubscribe() + time.Sleep(300 * time.Millisecond) + + log.Printf("loop: publishing every %s for %s — stop a node now to test failover", interval, duration) + end := time.Now().Add(duration) + sent := 0 + for time.Now().Before(end) { + msg := fmt.Sprintf("gapcheck-loop-%d", sent) + err := c.Publish(roomID, []byte(msg)) + sent++ + mu.Lock() + recv := received + mu.Unlock() + node := c.ConnectedServer() + up := c.IsConnected() + if node != "" { + mu.Lock() + servers[node]++ + mu.Unlock() + } + pubStatus := "ok" + if err != nil { + pubStatus = "ERR:" + err.Error() + } + log.Printf(" t=%2ds sent=%d recv=%d up=%v node=%s publish=%s", + sent, sent, recv, up, node, pubStatus) + time.Sleep(interval) + } + + mu.Lock() + defer mu.Unlock() + log.Printf("loop done: sent=%d received=%d", sent, received) + nodes := make([]string, 0, len(servers)) + for n := range servers { + nodes = append(nodes, n) + } + sort.Strings(nodes) + for _, n := range nodes { + log.Printf(" attached to %s for %d ticks", n, servers[n]) + } + if len(servers) > 1 { + log.Printf("FAILOVER OBSERVED: client was attached to %d distinct nodes across the run", len(servers)) + } + if received == 0 { + log.Fatalf("LOOP FAIL: received 0 messages") + } + log.Printf("LOOP OK: client kept receiving across the run (received=%d)", received) +} + +func splitCSV(s string) []string { + var out []string + for _, p := range strings.Split(s, ",") { + if p = strings.TrimSpace(p); p != "" { + out = append(out, p) + } + } + return out +}