package main import ( "net" "net/http" "net/http/httptest" "path/filepath" "sync" "testing" "time" cs "fn-registry/functions/cybersecurity" "github.com/enmanuel/unibus/pkg/blobstore" "github.com/enmanuel/unibus/pkg/client" "github.com/enmanuel/unibus/pkg/embeddednats" "github.com/enmanuel/unibus/pkg/frame" "github.com/enmanuel/unibus/pkg/membership" "github.com/enmanuel/unibus/pkg/room" ) // testHarness boots an isolated embedded NATS server + in-process membershipd on // their OWN free ports (never the productive 8470/4250 nor the user's running // playground on 7700/8480/4260) and tears everything down by handle. This mirrors // the unibus client_test harness so the echobot is exercised against the real bus. type testHarness struct { natsURL string ctrlURL string } func freePort(t *testing.T) int { t.Helper() l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("free port: %v", err) } defer l.Close() return l.Addr().(*net.TCPAddr).Port } func newHarness(t *testing.T) *testHarness { t.Helper() dir := t.TempDir() ns, err := embeddednats.Start(filepath.Join(dir, "js"), freePort(t)) if err != nil { t.Fatalf("embedded nats: %v", err) } store, err := membership.Open(filepath.Join(dir, "unibus.db")) if err != nil { ns.Shutdown() t.Fatalf("membership store: %v", err) } blobs, err := blobstore.New(filepath.Join(dir, "blobs")) if err != nil { store.Close() ns.Shutdown() t.Fatalf("blob store: %v", err) } srv := membership.NewServer(store, blobs) httpts := httptest.NewServer(srv) t.Cleanup(func() { httpts.Close() store.Close() ns.Shutdown() ns.WaitForShutdown() }) return &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL} } func waitHealth(t *testing.T, ctrlURL string) { t.Helper() deadline := time.Now().Add(3 * time.Second) for time.Now().Before(deadline) { resp, err := http.Get(ctrlURL + "/healthz") if err == nil && resp.StatusCode == 200 { resp.Body.Close() return } if resp != nil { resp.Body.Close() } time.Sleep(50 * time.Millisecond) } t.Fatalf("membershipd never became healthy") } func mustIdentity(t *testing.T) cs.Identity { t.Helper() id, err := cs.GenerateIdentity() if err != nil { t.Fatalf("generate identity: %v", err) } return id } func waitFor(mu *sync.Mutex, slice *[]string, pred func([]string) bool, timeout time.Duration) bool { deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { mu.Lock() cp := append([]string(nil), (*slice)...) mu.Unlock() if pred(cp) { return true } time.Sleep(25 * time.Millisecond) } return false } func snapshot(mu *sync.Mutex, slice *[]string) []string { mu.Lock() defer mu.Unlock() return append([]string(nil), (*slice)...) } func contains(rs []string, want string) bool { for _, r := range rs { if r == want { return true } } return false } // startEchobot wires up the echobot's chat + rpc behaviour against the given bus, // using the same logic the main() entry point runs. It returns the bot client and // its endpoint id so callers can assert the anti-loop guard. Cleanup is registered // on the test. func startEchobot(t *testing.T, h *testHarness, roomSubject, rpcSubject string) (*client.Client, string) { t.Helper() bot, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) if err != nil { t.Fatalf("connect echobot: %v", err) } selfID := bot.Endpoint().ID chatRoom, err := bot.CreateRoom(roomSubject, room.ModeNATS) if err != nil { bot.Close() t.Fatalf("echobot create chat room: %v", err) } chatSub, err := bot.Subscribe(chatRoom, func(f frame.Frame, plaintext []byte) { if f.Sender == selfID { return // anti-loop guard } _ = bot.Publish(chatRoom, []byte("echo: "+string(plaintext))) }) if err != nil { bot.Close() t.Fatalf("echobot subscribe chat: %v", err) } rpcSub, err := bot.Reply(rpcSubject, func(body []byte) []byte { return []byte("echo: " + string(body)) }) if err != nil { chatSub.Unsubscribe() bot.Close() t.Fatalf("echobot reply: %v", err) } t.Cleanup(func() { rpcSub.Unsubscribe() chatSub.Unsubscribe() bot.Close() }) return bot, selfID } // TestChatEcho: a "human" peer publishes "hola" on the echo subject; the echobot // replies "echo: hola". Asserts the human receives the echo and that the echobot // never echoes its own messages (no infinite loop). func TestChatEcho(t *testing.T) { h := newHarness(t) waitHealth(t, h.ctrlURL) const subject = "room.echo.test" _, botID := startEchobot(t, h, subject, "rpc.echo.test") human, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) if err != nil { t.Fatalf("connect human: %v", err) } defer human.Close() humanRoom, err := human.CreateRoom(subject, room.ModeNATS) if err != nil { t.Fatalf("human create room: %v", err) } var mu sync.Mutex var received []string var echoSenders []string hsub, err := human.Subscribe(humanRoom, func(f frame.Frame, plaintext []byte) { mu.Lock() received = append(received, string(plaintext)) if string(plaintext) == "echo: hola" { echoSenders = append(echoSenders, f.Sender) } mu.Unlock() }) if err != nil { t.Fatalf("human subscribe: %v", err) } defer hsub.Unsubscribe() // Let both subscriptions settle before publishing. time.Sleep(200 * time.Millisecond) if err := human.Publish(humanRoom, []byte("hola")); err != nil { t.Fatalf("human publish: %v", err) } if !waitFor(&mu, &received, func(rs []string) bool { return contains(rs, "echo: hola") }, 2*time.Second) { t.Fatalf("human never received the echo; got %v", snapshot(&mu, &received)) } // The echo must come from the bot, not the human (sanity on routing). mu.Lock() senders := append([]string(nil), echoSenders...) mu.Unlock() for _, s := range senders { if s != botID { t.Fatalf("echo came from %q, expected echobot %q", s, botID) } } // Anti-loop: give the bus time to spin if the guard were broken, then assert // the bot did not re-echo its own "echo: hola" into "echo: echo: hola". time.Sleep(500 * time.Millisecond) for _, r := range snapshot(&mu, &received) { if r == "echo: echo: hola" { t.Fatalf("anti-loop guard broken: bot echoed its own message (%q)", r) } } } // TestRPCEcho: a process peer issues Request(rpc-subject, "ping") and gets back // "echo: ping". The unibus client library exposes request/reply, so this mode is // fully supported (see client.go: Client.Request / Client.Reply). func TestRPCEcho(t *testing.T) { h := newHarness(t) waitHealth(t, h.ctrlURL) const rpcSubject = "rpc.echo.test" startEchobot(t, h, "room.echo.test", rpcSubject) caller, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t)) if err != nil { t.Fatalf("connect caller: %v", err) } defer caller.Close() // Give the responder time to subscribe. time.Sleep(150 * time.Millisecond) resp, err := caller.Request(rpcSubject, []byte("ping"), 2*time.Second) if err != nil { t.Fatalf("rpc request: %v", err) } if got, want := string(resp), "echo: ping"; got != want { t.Fatalf("rpc echo mismatch: got %q want %q", got, want) } }