Files
egutierrez 2524340759 chore: auto-commit (21 archivos)
- app.md
- backend/dist/assets/index-D_Kep7Fb.js
- backend/dist/index.html
- backend/handlers.go
- backend/main.go
- frontend/src/App.tsx
- frontend/src/api.ts
- frontend/src/components/CardChatPanel.tsx
- frontend/src/components/LoginPage.tsx
- frontend/src/types.ts
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 18:17:04 +02:00

298 lines
8.4 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"fn-registry/functions/infra"
"nhooyr.io/websocket"
)
// Server-Sent Events tuning.
const (
	// sseHeartbeat is the interval between ": ping" comment lines written
	// to an open event stream.
	sseHeartbeat = 25 * time.Second
)

// Per-card chat WebSocket tuning.
const (
	// wsChatHeartbeat is the interval between WebSocket pings.
	wsChatHeartbeat = 30 * time.Second

	// wsChatReadLimit is the value handed to the connection's SetReadLimit
	// (64 KiB).
	wsChatReadLimit = 64 << 10

	// wsChatWriteWait bounds each individual ping or write on the socket.
	wsChatWriteWait = 5 * time.Second

	// typingDebounceMs is not referenced in this part of the file.
	// NOTE(review): presumably a typing-indicator debounce in milliseconds,
	// going by the name; confirm against its users.
	typingDebounceMs = 1500
)
// handleEventStream serves the per-user Server-Sent Events channel.
//
// Each browser tab opens its own stream; reconnecting is left to the client
// (browsers retry EventSource by default). The server publishes:
//
//   - board.*: column/card mutations (broadcast to every user).
//   - message.created: chat message added on any card (broadcast).
//   - notification.*: private events for one recipient (UserID set).
//
// A request without a user ID in its context is rejected through
// infra.HTTPErrorResponse with an "unauthorized" error, and a ResponseWriter
// that cannot flush gets a 500. Otherwise the handler streams until the
// request context is done, the hub closes the subscription channel, or a
// write fails. A ": ping" comment line is written every sseHeartbeat.
func handleEventStream(hub *EventHub) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		uid, _ := infra.UserIDFromContext(r.Context(), userCtxKey)
		if uid == "" {
			infra.HTTPErrorResponse(w, infra.HTTPError{
				Status:  http.StatusUnauthorized,
				Code:    "unauthorized",
				Message: "session required",
			})
			return
		}

		stream, canFlush := w.(http.Flusher)
		if !canFlush {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}

		hdr := w.Header()
		hdr.Set("Content-Type", "text/event-stream")
		hdr.Set("Cache-Control", "no-cache, no-transform")
		hdr.Set("Connection", "keep-alive")
		// Ask buffering reverse proxies (e.g. nginx) to pass bytes through.
		hdr.Set("X-Accel-Buffering", "no")
		w.WriteHeader(http.StatusOK)

		// Send a comment line right away so the browser sees the stream open.
		fmt.Fprint(w, ": hello\n\n")
		stream.Flush()

		events := hub.SubscribeUser(uid)
		defer hub.UnsubscribeUser(uid, events)

		heartbeat := time.NewTicker(sseHeartbeat)
		defer heartbeat.Stop()

		done := r.Context().Done()
		for {
			select {
			case <-done:
				return

			case <-heartbeat.C:
				if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
					return
				}
				stream.Flush()

			case ev, open := <-events:
				if !open {
					return
				}
				// Defensive: the hub already routes private events, but
				// never hand another recipient's event to this stream.
				addressed := ev.UserID != ""
				if addressed && ev.UserID != uid {
					continue
				}
				payload, err := json.Marshal(ev)
				if err != nil {
					// An event that cannot be encoded is dropped; the
					// stream itself stays open.
					continue
				}
				if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev.Type, payload); err != nil {
					return
				}
				stream.Flush()
			}
		}
	}
}
// cardChatWSIn is one JSON frame received from the browser on the per-card
// WebSocket.
type cardChatWSIn struct {
	// Type selects the action: "send" or "typing".
	Type string `json:"type"`
	// Body is the message text; only meaningful when Type is "send".
	Body string `json:"body,omitempty"`
}
// cardChatWSOut is one JSON frame the server pushes to a card's WebSocket
// subscribers.
//
// Values of Type as produced by handleCardChatWS:
//
//   - message.created: a new CardMessage, carried in Message.
//   - card.typing: the user in UserID is typing (no message body).
//   - error: a server-side error described in Error; the connection stays
//     open.
//
// NOTE(review): the typing frame carries the hub event type verbatim, i.e.
// "card.typing" rather than "typing"; confirm the client matches on that
// value.
type cardChatWSOut struct {
	// Type identifies the frame kind (see the list above).
	Type string `json:"type"`
	// Message is set for message.created frames.
	Message *CardMessage `json:"message,omitempty"`
	// UserID is set for typing frames.
	UserID string `json:"user_id,omitempty"`
	// Error is set for error frames.
	Error string `json:"error,omitempty"`
}
// handleCardChatWS upgrades the request to a WebSocket and provides
// bidirectional realtime chat for a single card (path value "id").
//
// Each connection is subscribed to the card's event channel on the hub.
// Frames received from the browser are handled by the reader loop:
//
//   - {"type":"send","body":...}: the message is persisted through
//     db.CreateCardMessageAndNotify, which is handed the hub so peers
//     (including this connection) see it. A failure is reported to this
//     socket as an "error" frame and the connection stays open.
//   - {"type":"typing"}: a card.typing event is published on the hub.
//
// Frames that are not valid JSON, empty "send" bodies, and unknown types are
// ignored.
//
// A writer goroutine forwards message.created and card.typing hub events to
// the socket and pings the peer every wsChatHeartbeat. The reader and writer
// share one cancellable context: when either side stops (read error, write
// or ping failure, hub channel closed, request cancelled) the other is
// stopped too and the deferred cleanup unsubscribes and closes the socket.
func handleCardChatWS(db *DB, hub *EventHub) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		cardID := r.PathValue("id")
		userID, _ := infra.UserIDFromContext(r.Context(), userCtxKey)
		if userID == "" {
			infra.HTTPErrorResponse(w, infra.HTTPError{Status: http.StatusUnauthorized, Code: "unauthorized", Message: "session required"})
			return
		}

		// Confirm the card exists before upgrading to avoid leaking
		// goroutines on invalid IDs.
		// NOTE(review): every query error, not only "no rows", is reported
		// as "card not found"; a database outage therefore looks like a 404.
		var exists int
		if err := db.conn.QueryRow(`SELECT 1 FROM cards WHERE id = ?`, cardID).Scan(&exists); err != nil {
			notFound(w, "card not found")
			return
		}

		// NOTE(review): "*" looks like an allow-any-origin pattern; confirm
		// against infra.WSUpgrader that cross-site pages cannot open this
		// socket with the user's session.
		conn, err := infra.WSUpgrader(w, r, []string{"*"})
		if err != nil {
			return
		}
		defer conn.Close(websocket.StatusInternalError, "internal")
		conn.SetReadLimit(wsChatReadLimit)

		ch := hub.SubscribeCard(cardID)
		defer hub.UnsubscribeCard(cardID, ch)

		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()

		// Writer goroutine: forward hub events to this socket.
		go func() {
			// Whatever stops the writer must stop the reader as well;
			// otherwise the handler would keep a socket open that can no
			// longer receive events.
			defer cancel()

			ticker := time.NewTicker(wsChatHeartbeat)
			defer ticker.Stop()

			for {
				select {
				case <-ctx.Done():
					return

				case <-ticker.C:
					wctx, c := context.WithTimeout(ctx, wsChatWriteWait)
					pingErr := conn.Ping(wctx)
					c()
					if pingErr != nil {
						// Peer did not answer in time, or the connection
						// is already gone: tear the session down.
						return
					}

				case ev, ok := <-ch:
					if !ok {
						return
					}
					if ev.CardID != cardID {
						continue
					}
					out := cardChatWSOut{Type: ev.Type}
					switch ev.Type {
					case "message.created":
						var m CardMessage
						if err := json.Unmarshal(ev.Payload, &m); err != nil {
							// Do not forward an envelope without its
							// message.
							continue
						}
						out.Message = &m
					case "card.typing":
						var p struct {
							UserID string `json:"user_id"`
						}
						if err := json.Unmarshal(ev.Payload, &p); err != nil {
							// A typing indicator without a user is
							// meaningless to the client.
							continue
						}
						// Skip echoing the typer's own indicator.
						if p.UserID == userID {
							continue
						}
						out.UserID = p.UserID
					default:
						continue
					}
					b, err := json.Marshal(out)
					if err != nil {
						continue
					}
					wctx, c := context.WithTimeout(ctx, wsChatWriteWait)
					err = conn.Write(wctx, websocket.MessageText, b)
					c()
					if err != nil {
						return
					}
				}
			}
		}()

		// Reader loop: persist sends and broadcast typing.
		for {
			_, raw, err := conn.Read(ctx)
			if err != nil {
				return
			}
			var in cardChatWSIn
			if err := json.Unmarshal(raw, &in); err != nil {
				continue
			}
			switch in.Type {
			case "send":
				if in.Body == "" {
					continue
				}
				if _, _, _, err := db.CreateCardMessageAndNotify(cardID, userID, in.Body, hub); err != nil {
					b, _ := json.Marshal(cardChatWSOut{Type: "error", Error: err.Error()})
					wctx, c := context.WithTimeout(ctx, wsChatWriteWait)
					// Best effort: if this write fails the socket is
					// broken and the next Read ends the loop.
					_ = conn.Write(wctx, websocket.MessageText, b)
					c()
				}
			case "typing":
				hub.PublishJSON("card.typing", cardID, "", map[string]string{"user_id": userID})
			}
		}
	}
}
// Notification HTTP handlers.

// handleListNotifications responds with the calling user's notifications as
// JSON. The query parameter unread=1 restricts the listing to unread items;
// at most 50 entries are requested from the database.
func handleListNotifications(db *DB) http.HandlerFunc {
	const pageSize = 50

	return func(w http.ResponseWriter, r *http.Request) {
		uid, _ := infra.UserIDFromContext(r.Context(), userCtxKey)
		if uid == "" {
			infra.HTTPErrorResponse(w, infra.HTTPError{
				Status:  http.StatusUnauthorized,
				Code:    "unauthorized",
				Message: "session required",
			})
			return
		}

		unreadOnly := r.URL.Query().Get("unread") == "1"
		items, err := db.ListNotifications(uid, unreadOnly, pageSize)
		if err != nil {
			serverError(w, err)
			return
		}
		infra.HTTPJSONResponse(w, http.StatusOK, items)
	}
}
// handleUnreadCount responds with {"count": n}, where n is the value
// returned by db.CountUnreadNotifications for the calling user.
func handleUnreadCount(db *DB) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		uid, _ := infra.UserIDFromContext(r.Context(), userCtxKey)
		if uid == "" {
			infra.HTTPErrorResponse(w, infra.HTTPError{
				Status:  http.StatusUnauthorized,
				Code:    "unauthorized",
				Message: "session required",
			})
			return
		}

		unread, err := db.CountUnreadNotifications(uid)
		if err != nil {
			serverError(w, err)
			return
		}

		body := map[string]int{"count": unread}
		infra.HTTPJSONResponse(w, http.StatusOK, body)
	}
}
// handleMarkNotificationRead marks the notification named by the "id" path
// value as read for the calling user and answers 204 No Content. When hub is
// non-nil, a notification.read event carrying the id is published for that
// user.
func handleMarkNotificationRead(db *DB, hub *EventHub) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		uid, _ := infra.UserIDFromContext(r.Context(), userCtxKey)
		if uid == "" {
			infra.HTTPErrorResponse(w, infra.HTTPError{
				Status:  http.StatusUnauthorized,
				Code:    "unauthorized",
				Message: "session required",
			})
			return
		}

		notifID := r.PathValue("id")
		err := db.MarkNotificationRead(uid, notifID)
		if err != nil {
			serverError(w, err)
			return
		}

		// The hub is optional; without one there is nobody to notify.
		if hub != nil {
			hub.PublishJSON("notification.read", "", uid, map[string]string{"id": notifID})
		}
		w.WriteHeader(http.StatusNoContent)
	}
}
// handleMarkAllNotificationsRead marks every notification of the calling
// user as read and responds with {"count": n}, where n is the value returned
// by db.MarkAllNotificationsRead. When hub is non-nil, a
// notification.read_all event carrying the same count is published for that
// user.
func handleMarkAllNotificationsRead(db *DB, hub *EventHub) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		uid, _ := infra.UserIDFromContext(r.Context(), userCtxKey)
		if uid == "" {
			infra.HTTPErrorResponse(w, infra.HTTPError{
				Status:  http.StatusUnauthorized,
				Code:    "unauthorized",
				Message: "session required",
			})
			return
		}

		marked, err := db.MarkAllNotificationsRead(uid)
		if err != nil {
			serverError(w, err)
			return
		}

		// The hub is optional; without one there is nobody to notify.
		if hub != nil {
			hub.PublishJSON("notification.read_all", "", uid, map[string]int{"count": marked})
		}
		infra.HTTPJSONResponse(w, http.StatusOK, map[string]int{"count": marked})
	}
}