package main import ( "encoding/json" "sync" "sync/atomic" "time" ) // EventHub is an in-process pub/sub used to push board mutations and // notifications to connected clients (SSE for board-wide events, WS for // per-card chat). Drop policy on slow consumers: best-effort send; if a // subscriber's buffered channel is full the event is dropped and the // hub increments dropCount. Clients are expected to reconcile state via // a full reload when reconnecting. type EventHub struct { mu sync.RWMutex userSubs map[string]map[chan Event]struct{} cardSubs map[string]map[chan Event]struct{} dropCount uint64 } // Event is the envelope broadcast to subscribers. // // Type — discriminator (e.g. "card.updated", "message.created"). // CardID — set when payload pertains to a specific card. // UserID — set for per-user private events (e.g. notifications). Empty // means broadcast to every user subscriber. // Payload — arbitrary JSON describing the change. // TS — RFC3339 timestamp. type Event struct { Type string `json:"type"` CardID string `json:"card_id,omitempty"` UserID string `json:"user_id,omitempty"` Payload json.RawMessage `json:"payload,omitempty"` TS string `json:"ts"` } const eventBufSize = 64 func NewEventHub() *EventHub { return &EventHub{ userSubs: map[string]map[chan Event]struct{}{}, cardSubs: map[string]map[chan Event]struct{}{}, } } // SubscribeUser returns a channel that receives every public event plus // private events targeted at userID. Caller MUST eventually call // UnsubscribeUser to release resources. func (h *EventHub) SubscribeUser(userID string) chan Event { ch := make(chan Event, eventBufSize) h.mu.Lock() set, ok := h.userSubs[userID] if !ok { set = map[chan Event]struct{}{} h.userSubs[userID] = set } set[ch] = struct{}{} h.mu.Unlock() return ch } func (h *EventHub) UnsubscribeUser(userID string, ch chan Event) { h.mu.Lock() if set, ok := h.userSubs[userID]; ok { delete(set, ch) if len(set) == 0 { delete(h.userSubs, userID) } } h.mu.Unlock() close(ch) } // SubscribeCard returns a channel that receives events scoped to cardID // (chat messages + typing indicators). func (h *EventHub) SubscribeCard(cardID string) chan Event { ch := make(chan Event, eventBufSize) h.mu.Lock() set, ok := h.cardSubs[cardID] if !ok { set = map[chan Event]struct{}{} h.cardSubs[cardID] = set } set[ch] = struct{}{} h.mu.Unlock() return ch } func (h *EventHub) UnsubscribeCard(cardID string, ch chan Event) { h.mu.Lock() if set, ok := h.cardSubs[cardID]; ok { delete(set, ch) if len(set) == 0 { delete(h.cardSubs, cardID) } } h.mu.Unlock() close(ch) } // Publish delivers ev to every matching subscriber. If ev.UserID is set // it is delivered ONLY to that user's subscribers; otherwise it fans out // to all user subscribers. Card subscribers ALWAYS receive events that // match ev.CardID. Best-effort: full channels are skipped. func (h *EventHub) Publish(ev Event) { if ev.TS == "" { ev.TS = time.Now().UTC().Format(time.RFC3339) } h.mu.RLock() defer h.mu.RUnlock() deliver := func(ch chan Event) { select { case ch <- ev: default: atomic.AddUint64(&h.dropCount, 1) } } if ev.UserID != "" { if set, ok := h.userSubs[ev.UserID]; ok { for ch := range set { deliver(ch) } } } else { for _, set := range h.userSubs { for ch := range set { deliver(ch) } } } if ev.CardID != "" { if set, ok := h.cardSubs[ev.CardID]; ok { for ch := range set { deliver(ch) } } } } func (h *EventHub) DropCount() uint64 { return atomic.LoadUint64(&h.dropCount) } // PublishJSON marshals payload and publishes a single Event. func (h *EventHub) PublishJSON(typ, cardID, userID string, payload interface{}) { var raw json.RawMessage if payload != nil { b, err := json.Marshal(payload) if err == nil { raw = b } } h.Publish(Event{Type: typ, CardID: cardID, UserID: userID, Payload: raw}) }