package main import ( "context" "encoding/json" "fmt" "net/http" "time" "fn-registry/functions/infra" "nhooyr.io/websocket" ) const ( sseHeartbeat = 25 * time.Second wsChatHeartbeat = 30 * time.Second wsChatReadLimit = 64 * 1024 wsChatWriteWait = 5 * time.Second typingDebounceMs = 1500 ) // handleEventStream serves the per-user SSE channel. // // One stream per browser tab. Auto-reconnect lives on the client (browsers // retry EventSource by default). The server publishes: // // board.* — column/card mutations (broadcast to every user). // message.created — chat message added on any card (broadcast). // notification.* — private events for one recipient (UserID set). func handleEventStream(hub *EventHub) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { userID, _ := infra.UserIDFromContext(r.Context(), userCtxKey) if userID == "" { infra.HTTPErrorResponse(w, infra.HTTPError{Status: http.StatusUnauthorized, Code: "unauthorized", Message: "session required"}) return } flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache, no-transform") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") w.WriteHeader(http.StatusOK) // Initial flush so the browser knows the stream is open. fmt.Fprint(w, ": hello\n\n") flusher.Flush() ch := hub.SubscribeUser(userID) defer hub.UnsubscribeUser(userID, ch) ticker := time.NewTicker(sseHeartbeat) defer ticker.Stop() for { select { case <-r.Context().Done(): return case <-ticker.C: if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil { return } flusher.Flush() case ev, ok := <-ch: if !ok { return } if ev.UserID != "" && ev.UserID != userID { // Defensive: hub already routes private events but the // broadcast path could leak if a future change adds // fan-out. Skip explicitly. continue } b, err := json.Marshal(ev) if err != nil { continue } if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev.Type, b); err != nil { return } flusher.Flush() } } } } // cardChatWSIn is the message sent by the browser over the per-card WS. type cardChatWSIn struct { Type string `json:"type"` // "send" | "typing" Body string `json:"body,omitempty"` // only for "send" } // cardChatWSOut is the message the server pushes to subscribers of a card. // // Types: // // message.created — new CardMessage (full payload). // typing — UserID is typing (no body). // error — server-side error, connection stays open. type cardChatWSOut struct { Type string `json:"type"` Message *CardMessage `json:"message,omitempty"` UserID string `json:"user_id,omitempty"` Error string `json:"error,omitempty"` } // handleCardChatWS upgrades the request to WebSocket and provides bidirectional // realtime chat for a single card. Each connection is subscribed to the // card's event channel; sends originating from this connection are persisted // then republished through the hub so peer connections (including this one) // see them. func handleCardChatWS(db *DB, hub *EventHub) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cardID := r.PathValue("id") userID, _ := infra.UserIDFromContext(r.Context(), userCtxKey) if userID == "" { infra.HTTPErrorResponse(w, infra.HTTPError{Status: http.StatusUnauthorized, Code: "unauthorized", Message: "session required"}) return } // Confirm card exists before upgrading to avoid leaking goroutines on // invalid IDs. var exists int if err := db.conn.QueryRow(`SELECT 1 FROM cards WHERE id = ?`, cardID).Scan(&exists); err != nil { notFound(w, "card not found") return } conn, err := infra.WSUpgrader(w, r, []string{"*"}) if err != nil { return } defer conn.Close(websocket.StatusInternalError, "internal") conn.SetReadLimit(wsChatReadLimit) ch := hub.SubscribeCard(cardID) defer hub.UnsubscribeCard(cardID, ch) ctx, cancel := context.WithCancel(r.Context()) defer cancel() // Writer goroutine: forward hub events to this socket. go func() { ticker := time.NewTicker(wsChatHeartbeat) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: wctx, c := context.WithTimeout(ctx, wsChatWriteWait) _ = conn.Ping(wctx) c() case ev, ok := <-ch: if !ok { return } if ev.CardID != cardID { continue } out := cardChatWSOut{Type: ev.Type} switch ev.Type { case "message.created": var m CardMessage if err := json.Unmarshal(ev.Payload, &m); err == nil { out.Message = &m } case "card.typing": var p struct { UserID string `json:"user_id"` } _ = json.Unmarshal(ev.Payload, &p) // Skip echoing the typer's own indicator. if p.UserID == userID { continue } out.UserID = p.UserID default: continue } b, err := json.Marshal(out) if err != nil { continue } wctx, c := context.WithTimeout(ctx, wsChatWriteWait) if err := conn.Write(wctx, websocket.MessageText, b); err != nil { c() cancel() return } c() } } }() // Reader loop: persist sends and broadcast typing. for { _, raw, err := conn.Read(ctx) if err != nil { return } var in cardChatWSIn if err := json.Unmarshal(raw, &in); err != nil { continue } switch in.Type { case "send": if in.Body == "" { continue } if _, _, _, err := db.CreateCardMessageAndNotify(cardID, userID, in.Body, hub); err != nil { b, _ := json.Marshal(cardChatWSOut{Type: "error", Error: err.Error()}) wctx, c := context.WithTimeout(ctx, wsChatWriteWait) _ = conn.Write(wctx, websocket.MessageText, b) c() } case "typing": hub.PublishJSON("card.typing", cardID, "", map[string]string{"user_id": userID}) } } } } // Notification HTTP handlers. func handleListNotifications(db *DB) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { userID, _ := infra.UserIDFromContext(r.Context(), userCtxKey) if userID == "" { infra.HTTPErrorResponse(w, infra.HTTPError{Status: http.StatusUnauthorized, Code: "unauthorized", Message: "session required"}) return } onlyUnread := r.URL.Query().Get("unread") == "1" limit := 50 out, err := db.ListNotifications(userID, onlyUnread, limit) if err != nil { serverError(w, err) return } infra.HTTPJSONResponse(w, http.StatusOK, out) } } func handleUnreadCount(db *DB) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { userID, _ := infra.UserIDFromContext(r.Context(), userCtxKey) if userID == "" { infra.HTTPErrorResponse(w, infra.HTTPError{Status: http.StatusUnauthorized, Code: "unauthorized", Message: "session required"}) return } n, err := db.CountUnreadNotifications(userID) if err != nil { serverError(w, err) return } infra.HTTPJSONResponse(w, http.StatusOK, map[string]int{"count": n}) } } func handleMarkNotificationRead(db *DB, hub *EventHub) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { userID, _ := infra.UserIDFromContext(r.Context(), userCtxKey) if userID == "" { infra.HTTPErrorResponse(w, infra.HTTPError{Status: http.StatusUnauthorized, Code: "unauthorized", Message: "session required"}) return } id := r.PathValue("id") if err := db.MarkNotificationRead(userID, id); err != nil { serverError(w, err) return } if hub != nil { hub.PublishJSON("notification.read", "", userID, map[string]string{"id": id}) } w.WriteHeader(http.StatusNoContent) } } func handleMarkAllNotificationsRead(db *DB, hub *EventHub) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { userID, _ := infra.UserIDFromContext(r.Context(), userCtxKey) if userID == "" { infra.HTTPErrorResponse(w, infra.HTTPError{Status: http.StatusUnauthorized, Code: "unauthorized", Message: "session required"}) return } n, err := db.MarkAllNotificationsRead(userID) if err != nil { serverError(w, err) return } if hub != nil { hub.PublishJSON("notification.read_all", "", userID, map[string]int{"count": n}) } infra.HTTPJSONResponse(w, http.StatusOK, map[string]int{"count": n}) } }