268 lines
7.0 KiB
Go
268 lines
7.0 KiB
Go
package main
|
|
|
|
import (
|
|
"net"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"path/filepath"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
cs "fn-registry/functions/cybersecurity"
|
|
|
|
"github.com/enmanuel/unibus/pkg/blobstore"
|
|
"github.com/enmanuel/unibus/pkg/client"
|
|
"github.com/enmanuel/unibus/pkg/embeddednats"
|
|
"github.com/enmanuel/unibus/pkg/frame"
|
|
"github.com/enmanuel/unibus/pkg/membership"
|
|
"github.com/enmanuel/unibus/pkg/room"
|
|
)
|
|
|
|
// testHarness boots an isolated embedded NATS server + in-process membershipd on
|
|
// their OWN free ports (never the productive 8470/4250 nor the user's running
|
|
// playground on 7700/8480/4260) and tears everything down by handle. This mirrors
|
|
// the unibus client_test harness so the echobot is exercised against the real bus.
|
|
type testHarness struct {
|
|
natsURL string
|
|
ctrlURL string
|
|
}
|
|
|
|
func freePort(t *testing.T) int {
|
|
t.Helper()
|
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
|
if err != nil {
|
|
t.Fatalf("free port: %v", err)
|
|
}
|
|
defer l.Close()
|
|
return l.Addr().(*net.TCPAddr).Port
|
|
}
|
|
|
|
func newHarness(t *testing.T) *testHarness {
|
|
t.Helper()
|
|
dir := t.TempDir()
|
|
|
|
ns, err := embeddednats.Start(filepath.Join(dir, "js"), freePort(t))
|
|
if err != nil {
|
|
t.Fatalf("embedded nats: %v", err)
|
|
}
|
|
|
|
store, err := membership.Open(filepath.Join(dir, "unibus.db"))
|
|
if err != nil {
|
|
ns.Shutdown()
|
|
t.Fatalf("membership store: %v", err)
|
|
}
|
|
blobs, err := blobstore.New(filepath.Join(dir, "blobs"))
|
|
if err != nil {
|
|
store.Close()
|
|
ns.Shutdown()
|
|
t.Fatalf("blob store: %v", err)
|
|
}
|
|
srv := membership.NewServer(store, blobs)
|
|
httpts := httptest.NewServer(srv)
|
|
|
|
t.Cleanup(func() {
|
|
httpts.Close()
|
|
store.Close()
|
|
ns.Shutdown()
|
|
ns.WaitForShutdown()
|
|
})
|
|
|
|
return &testHarness{natsURL: embeddednats.ClientURL(ns), ctrlURL: httpts.URL}
|
|
}
|
|
|
|
func waitHealth(t *testing.T, ctrlURL string) {
|
|
t.Helper()
|
|
deadline := time.Now().Add(3 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
resp, err := http.Get(ctrlURL + "/healthz")
|
|
if err == nil && resp.StatusCode == 200 {
|
|
resp.Body.Close()
|
|
return
|
|
}
|
|
if resp != nil {
|
|
resp.Body.Close()
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
t.Fatalf("membershipd never became healthy")
|
|
}
|
|
|
|
func mustIdentity(t *testing.T) cs.Identity {
|
|
t.Helper()
|
|
id, err := cs.GenerateIdentity()
|
|
if err != nil {
|
|
t.Fatalf("generate identity: %v", err)
|
|
}
|
|
return id
|
|
}
|
|
|
|
func waitFor(mu *sync.Mutex, slice *[]string, pred func([]string) bool, timeout time.Duration) bool {
|
|
deadline := time.Now().Add(timeout)
|
|
for time.Now().Before(deadline) {
|
|
mu.Lock()
|
|
cp := append([]string(nil), (*slice)...)
|
|
mu.Unlock()
|
|
if pred(cp) {
|
|
return true
|
|
}
|
|
time.Sleep(25 * time.Millisecond)
|
|
}
|
|
return false
|
|
}
|
|
|
|
func snapshot(mu *sync.Mutex, slice *[]string) []string {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
return append([]string(nil), (*slice)...)
|
|
}
|
|
|
|
func contains(rs []string, want string) bool {
|
|
for _, r := range rs {
|
|
if r == want {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// startEchobot wires up the echobot's chat + rpc behaviour against the given bus,
|
|
// using the same logic the main() entry point runs. It returns the bot client and
|
|
// its endpoint id so callers can assert the anti-loop guard. Cleanup is registered
|
|
// on the test.
|
|
func startEchobot(t *testing.T, h *testHarness, roomSubject, rpcSubject string) (*client.Client, string) {
|
|
t.Helper()
|
|
bot, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
|
if err != nil {
|
|
t.Fatalf("connect echobot: %v", err)
|
|
}
|
|
selfID := bot.Endpoint().ID
|
|
|
|
chatRoom, err := bot.CreateRoom(roomSubject, room.ModeNATS)
|
|
if err != nil {
|
|
bot.Close()
|
|
t.Fatalf("echobot create chat room: %v", err)
|
|
}
|
|
chatSub, err := bot.Subscribe(chatRoom, func(f frame.Frame, plaintext []byte) {
|
|
if f.Sender == selfID {
|
|
return // anti-loop guard
|
|
}
|
|
_ = bot.Publish(chatRoom, []byte("echo: "+string(plaintext)))
|
|
})
|
|
if err != nil {
|
|
bot.Close()
|
|
t.Fatalf("echobot subscribe chat: %v", err)
|
|
}
|
|
rpcSub, err := bot.Reply(rpcSubject, func(body []byte) []byte {
|
|
return []byte("echo: " + string(body))
|
|
})
|
|
if err != nil {
|
|
chatSub.Unsubscribe()
|
|
bot.Close()
|
|
t.Fatalf("echobot reply: %v", err)
|
|
}
|
|
|
|
t.Cleanup(func() {
|
|
rpcSub.Unsubscribe()
|
|
chatSub.Unsubscribe()
|
|
bot.Close()
|
|
})
|
|
return bot, selfID
|
|
}
|
|
|
|
// TestChatEcho: a "human" peer publishes "hola" on the echo subject; the echobot
|
|
// replies "echo: hola". Asserts the human receives the echo and that the echobot
|
|
// never echoes its own messages (no infinite loop).
|
|
func TestChatEcho(t *testing.T) {
|
|
h := newHarness(t)
|
|
waitHealth(t, h.ctrlURL)
|
|
|
|
const subject = "room.echo.test"
|
|
_, botID := startEchobot(t, h, subject, "rpc.echo.test")
|
|
|
|
human, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
|
if err != nil {
|
|
t.Fatalf("connect human: %v", err)
|
|
}
|
|
defer human.Close()
|
|
|
|
humanRoom, err := human.CreateRoom(subject, room.ModeNATS)
|
|
if err != nil {
|
|
t.Fatalf("human create room: %v", err)
|
|
}
|
|
|
|
var mu sync.Mutex
|
|
var received []string
|
|
var echoSenders []string
|
|
hsub, err := human.Subscribe(humanRoom, func(f frame.Frame, plaintext []byte) {
|
|
mu.Lock()
|
|
received = append(received, string(plaintext))
|
|
if string(plaintext) == "echo: hola" {
|
|
echoSenders = append(echoSenders, f.Sender)
|
|
}
|
|
mu.Unlock()
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("human subscribe: %v", err)
|
|
}
|
|
defer hsub.Unsubscribe()
|
|
|
|
// Let both subscriptions settle before publishing.
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
if err := human.Publish(humanRoom, []byte("hola")); err != nil {
|
|
t.Fatalf("human publish: %v", err)
|
|
}
|
|
|
|
if !waitFor(&mu, &received, func(rs []string) bool { return contains(rs, "echo: hola") }, 2*time.Second) {
|
|
t.Fatalf("human never received the echo; got %v", snapshot(&mu, &received))
|
|
}
|
|
|
|
// The echo must come from the bot, not the human (sanity on routing).
|
|
mu.Lock()
|
|
senders := append([]string(nil), echoSenders...)
|
|
mu.Unlock()
|
|
for _, s := range senders {
|
|
if s != botID {
|
|
t.Fatalf("echo came from %q, expected echobot %q", s, botID)
|
|
}
|
|
}
|
|
|
|
// Anti-loop: give the bus time to spin if the guard were broken, then assert
|
|
// the bot did not re-echo its own "echo: hola" into "echo: echo: hola".
|
|
time.Sleep(500 * time.Millisecond)
|
|
for _, r := range snapshot(&mu, &received) {
|
|
if r == "echo: echo: hola" {
|
|
t.Fatalf("anti-loop guard broken: bot echoed its own message (%q)", r)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestRPCEcho: a process peer issues Request(rpc-subject, "ping") and gets back
|
|
// "echo: ping". The unibus client library exposes request/reply, so this mode is
|
|
// fully supported (see client.go: Client.Request / Client.Reply).
|
|
func TestRPCEcho(t *testing.T) {
|
|
h := newHarness(t)
|
|
waitHealth(t, h.ctrlURL)
|
|
|
|
const rpcSubject = "rpc.echo.test"
|
|
startEchobot(t, h, "room.echo.test", rpcSubject)
|
|
|
|
caller, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
|
|
if err != nil {
|
|
t.Fatalf("connect caller: %v", err)
|
|
}
|
|
defer caller.Close()
|
|
|
|
// Give the responder time to subscribe.
|
|
time.Sleep(150 * time.Millisecond)
|
|
|
|
resp, err := caller.Request(rpcSubject, []byte("ping"), 2*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("rpc request: %v", err)
|
|
}
|
|
if got, want := string(resp), "echo: ping"; got != want {
|
|
t.Fatalf("rpc echo mismatch: got %q want %q", got, want)
|
|
}
|
|
}
|