package main import ( "sync" "sync/atomic" "testing" "time" ) func TestEventHub_BroadcastToAllUsers(t *testing.T) { hub := NewEventHub() chA := hub.SubscribeUser("alice") chB := hub.SubscribeUser("bob") defer hub.UnsubscribeUser("alice", chA) defer hub.UnsubscribeUser("bob", chB) hub.PublishJSON("card.updated", "c1", "", map[string]string{"id": "c1"}) for _, ch := range []chan Event{chA, chB} { select { case ev := <-ch: if ev.Type != "card.updated" { t.Fatalf("type = %q, want card.updated", ev.Type) } case <-time.After(time.Second): t.Fatal("timeout waiting for event") } } } func TestEventHub_PrivateUserEvent(t *testing.T) { hub := NewEventHub() chA := hub.SubscribeUser("alice") chB := hub.SubscribeUser("bob") defer hub.UnsubscribeUser("alice", chA) defer hub.UnsubscribeUser("bob", chB) hub.PublishJSON("notification.created", "", "alice", map[string]string{"foo": "bar"}) select { case ev := <-chA: if ev.UserID != "alice" { t.Fatalf("user_id = %q, want alice", ev.UserID) } case <-time.After(time.Second): t.Fatal("alice did not get private event") } select { case ev := <-chB: t.Fatalf("bob received private event for alice: %+v", ev) case <-time.After(100 * time.Millisecond): // expected } } func TestEventHub_CardSubscription(t *testing.T) { hub := NewEventHub() ch := hub.SubscribeCard("card-1") defer hub.UnsubscribeCard("card-1", ch) hub.PublishJSON("message.created", "card-1", "", map[string]string{"id": "m1"}) hub.PublishJSON("message.created", "card-2", "", map[string]string{"id": "m2"}) select { case ev := <-ch: if ev.CardID != "card-1" { t.Fatalf("card_id = %q, want card-1", ev.CardID) } case <-time.After(time.Second): t.Fatal("timeout") } select { case ev := <-ch: t.Fatalf("received unexpected event for other card: %+v", ev) case <-time.After(100 * time.Millisecond): } } func TestEventHub_DropPolicyOnSlowConsumer(t *testing.T) { hub := NewEventHub() ch := hub.SubscribeUser("slow") defer hub.UnsubscribeUser("slow", ch) // Fill the buffer + N extra to force drops. const extra = 50 for i := 0; i < eventBufSize+extra; i++ { hub.PublishJSON("noise", "", "slow", nil) } if got := hub.DropCount(); got < extra { t.Fatalf("DropCount = %d, want >= %d", got, extra) } } func TestEventHub_UnsubscribeRemoves(t *testing.T) { hub := NewEventHub() ch := hub.SubscribeUser("alice") hub.UnsubscribeUser("alice", ch) // channel must be closed select { case _, ok := <-ch: if ok { t.Fatal("expected closed channel") } default: // channel could be drained-and-closed } // Publish should not panic and should not deliver anywhere. hub.PublishJSON("noise", "", "alice", nil) } func TestEventHub_ConcurrentPublishers(t *testing.T) { hub := NewEventHub() ch := hub.SubscribeUser("u") defer hub.UnsubscribeUser("u", ch) var received atomic.Uint64 done := make(chan struct{}) go func() { for range ch { received.Add(1) } close(done) }() var wg sync.WaitGroup const writers = 10 const each = 100 for i := 0; i < writers; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < each; j++ { hub.PublishJSON("ping", "", "u", nil) } }() } wg.Wait() // Give the consumer time to drain. time.Sleep(200 * time.Millisecond) got := received.Load() dropped := hub.DropCount() if got+dropped < writers*each { t.Fatalf("received=%d drop=%d want sum >= %d", got, dropped, writers*each) } }