Files
matrix_client_pc/internal/infra/matrix_sync_service.go
T
Egutierrez 36a485ea26 feat: chat E2EE MVP - rooms list + timeline + composer + sync (issues 0148+0149+0150)
Backend extends MatrixService with Start()/Stop()/ListRooms()/LoadTimeline()/
SendText()/SendMarkdown(). On login the service initialises the crypto store
(cryptohelper, Olm/Megolm via goolm build tag) and a sync loop that fans
events out through Wails events ("matrix:event", "matrix:error"). Pickle
key is 32 random bytes hex-encoded in the OS keyring alongside the access
token, so the crypto SQLite store survives restarts.

Vendors 4 fresh helpers from fn_registry/functions/infra/:
  matrix_crypto_init.go (//go:build goolm || libolm)
  matrix_sync_service.go
  matrix_message_send.go
  matrix_room_list.go
Plus the existing 3 (mas_oidc_loopback, keyring_token_store, matrix_client_init).
go-sqlite3 driver pulled explicitly via sqlite_driver.go.

Frontend rewires HomeScreen as a 3-zone AppShell (sidebar / timeline /
composer). useMatrixRooms polls + reacts to the sync stream; useMatrixTimeline
loads the last 50 events of the selected room and appends live ones. New
components: RoomList, Timeline, EventBubble, Composer. Composer supports
plain text (default) and a markdown toggle; Enter sends, Shift+Enter newline.

wails.json now passes "build:tags": "goolm" by default. Tested with
wails build -tags goolm on linux/amd64 and windows/amd64.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 01:03:31 +02:00

367 lines
10 KiB
Go

package infra
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)
// MatrixSyncEvent es un evento normalizado emitido por MatrixSyncService.
// Cubre mensajes, pertenencia a sala, redacciones, reacciones, tipeo y estado.
type MatrixSyncEvent struct {
Type string `json:"type"` // "message" | "membership" | "redaction" | "reaction" | "edit" | "encrypted" | "presence" | "typing" | "read_receipt" | "room_state"
RoomID string `json:"room_id"` // ID de la sala (vacio para presencia global)
EventID string `json:"event_id"` // event_id unico Matrix (vacio para eventos efimeros)
Sender string `json:"sender"` // MXID del emisor (vacio para eventos efimeros)
Ts int64 `json:"ts"` // origin_server_ts en milisegundos
Body string `json:"body,omitempty"` // contenido de texto del evento (mensajes)
Raw interface{} `json:"raw,omitempty"` // *event.Event original para acceso completo
}
// MatrixSyncServiceConfig configura el servicio de sync loop de Matrix.
type MatrixSyncServiceConfig struct {
// Client es el *mautrix.Client ya inicializado con credenciales.
// Obligatorio.
Client *mautrix.Client
// InitialBackoffMS es el tiempo inicial de espera entre reintentos tras error (ms).
// Default: 1000 (1 segundo).
InitialBackoffMS int
// MaxBackoffMS es el techo del backoff exponencial (ms).
// Default: 60000 (60 segundos).
MaxBackoffMS int
// ChannelBuffer es la capacidad del canal Events.
// Si el consumer va lento y el buffer se llena, el sync se bloquea hasta
// que el consumer drene. Default: 256.
ChannelBuffer int
}
// MatrixSyncServiceHandle es el handle devuelto por MatrixSyncService.
type MatrixSyncServiceHandle struct {
// Events es el canal de eventos normalizados (cierra al Stop).
Events <-chan MatrixSyncEvent
// Errors recibe errores transitorios (red, 5xx, etc.).
// No fatal: el servicio reintenta con backoff. El caller decide si actuar.
// El canal cierra al Stop.
Errors <-chan error
// Stop cancela el sync loop de forma limpia e idempotente.
// Cierra Events y Errors. Seguro llamar varias veces.
Stop func()
}
// matrixSyncerWrapper envuelve DefaultSyncer para interceptar OnFailedSync
// e inyectar nuestro backoff exponencial y emision de errores al canal.
type matrixSyncerWrapper struct {
*mautrix.DefaultSyncer
errCh chan<- error
innerCtx context.Context
backoffMs *int
initialMS int
maxMS int
lastSyncOK *time.Time
}
// OnFailedSync implementa mautrix.Syncer. Emite el error al canal y devuelve
// el proximo backoff. Para errores fatales (401, M_FORBIDDEN) devuelve el
// backoff maximo y emite al canal — el caller decide via Stop().
func (w *matrixSyncerWrapper) OnFailedSync(_ *mautrix.RespSync, err error) (time.Duration, error) {
if w.innerCtx.Err() != nil {
return 0, fmt.Errorf("matrix_sync_service: context cancelado")
}
// Emitir error al canal de forma no-bloqueante
select {
case w.errCh <- fmt.Errorf("matrix_sync_service: %w", err):
default:
}
// Reset backoff si el ultimo sync exitoso fue reciente
if time.Since(*w.lastSyncOK) < 30*time.Second {
*w.backoffMs = w.initialMS
}
// Calcular duracion de espera
wait := time.Duration(*w.backoffMs) * time.Millisecond
// Backoff exponencial con techo
*w.backoffMs *= 2
if *w.backoffMs > w.maxMS {
*w.backoffMs = w.maxMS
}
// Para errores fatales, esperar el maximo pero no retornar error
// (dejamos al caller decidir via Stop)
if isFatalMatrixError(err) {
return time.Duration(w.maxMS) * time.Millisecond, nil
}
return wait, nil
}
// GetFilterJSON delega al DefaultSyncer.
func (w *matrixSyncerWrapper) GetFilterJSON(userID id.UserID) *mautrix.Filter {
return w.DefaultSyncer.GetFilterJSON(userID)
}
// ProcessResponse delega al DefaultSyncer. Actualiza lastSyncOK en exito.
func (w *matrixSyncerWrapper) ProcessResponse(ctx context.Context, resp *mautrix.RespSync, since string) error {
err := w.DefaultSyncer.ProcessResponse(ctx, resp, since)
if err == nil {
now := time.Now()
*w.lastSyncOK = now
}
return err
}
// MatrixSyncService arranca el sync loop de mautrix contra Synapse en background.
// Registra handlers para los tipos de evento mas comunes y los emite via canal.
// Implementa reconnect con backoff exponencial para errores transitorios.
//
// Requiere un *mautrix.Client ya inicializado (ver matrix_client_init).
// Opcionalmente combinar con matrix_crypto_init para descifrar m.room.encrypted.
//
// La goroutine interna vive hasta que ctx sea cancelado o se llame Stop.
// Ambas acciones cierran los canales Events y Errors.
func MatrixSyncService(ctx context.Context, cfg MatrixSyncServiceConfig) (*MatrixSyncServiceHandle, error) {
if cfg.Client == nil {
return nil, fmt.Errorf("matrix_sync_service: Client no puede ser nil")
}
// Aplicar defaults
initialBackoff := cfg.InitialBackoffMS
if initialBackoff <= 0 {
initialBackoff = 1000
}
maxBackoff := cfg.MaxBackoffMS
if maxBackoff <= 0 {
maxBackoff = 60000
}
bufSize := cfg.ChannelBuffer
if bufSize <= 0 {
bufSize = 256
}
// Context cancelable derivado del pasado
innerCtx, cancel := context.WithCancel(ctx)
// Channels
evtCh := make(chan MatrixSyncEvent, bufSize)
errCh := make(chan error, 8)
// Stop idempotente via sync.Once
var once sync.Once
stopFn := func() {
once.Do(func() {
cancel()
})
}
// Estado de backoff compartido con el wrapper
backoffMs := initialBackoff
lastSyncOK := time.Now()
// Configurar el Syncer: usar DefaultSyncer base (existente o nuevo)
var baseSyncer *mautrix.DefaultSyncer
if ds, ok := cfg.Client.Syncer.(*mautrix.DefaultSyncer); ok {
baseSyncer = ds
} else {
baseSyncer = mautrix.NewDefaultSyncer()
}
// Crear wrapper que intercepta OnFailedSync
wrapper := &matrixSyncerWrapper{
DefaultSyncer: baseSyncer,
errCh: errCh,
innerCtx: innerCtx,
backoffMs: &backoffMs,
initialMS: initialBackoff,
maxMS: maxBackoff,
lastSyncOK: &lastSyncOK,
}
cfg.Client.Syncer = wrapper
// Helper: emitir evento de forma no-bloqueante respetando ctx
emit := func(ev MatrixSyncEvent) {
select {
case evtCh <- ev:
case <-innerCtx.Done():
}
}
// Helper: extraer body de texto de Content.VeryRaw
extractBody := func(evt *event.Event) string {
raw := evt.Content.VeryRaw
if raw == nil {
return ""
}
var m map[string]interface{}
if err := json.Unmarshal(raw, &m); err != nil {
return ""
}
if b, ok := m["body"].(string); ok {
return b
}
return ""
}
// Registrar event handlers sobre el DefaultSyncer base
// m.room.message — mensajes de texto, imagen, archivo
baseSyncer.OnEventType(event.EventMessage, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "message",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Body: extractBody(evt),
Raw: evt,
})
})
// m.room.encrypted — mensajes cifrados (crypto helper los descifra si esta init)
baseSyncer.OnEventType(event.EventEncrypted, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "encrypted",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.room.redaction — redacciones de mensajes
baseSyncer.OnEventType(event.EventRedaction, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "redaction",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.reaction — reacciones emoji
baseSyncer.OnEventType(event.EventReaction, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "reaction",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.room.member — cambios de pertenencia a sala
baseSyncer.OnEventType(event.StateMember, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "membership",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.typing — efimero: quien esta escribiendo en una sala
baseSyncer.OnEventType(event.EphemeralEventTyping, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "typing",
RoomID: evt.RoomID.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.receipt — read receipts
baseSyncer.OnEventType(event.EphemeralEventReceipt, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "read_receipt",
RoomID: evt.RoomID.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.presence — presencia de usuarios
baseSyncer.OnEventType(event.EphemeralEventPresence, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "presence",
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// Goroutine principal
// SyncWithContext ya es un loop bloqueante que incluye retry via OnFailedSync.
// Esta goroutine solo reinicia si SyncWithContext retorna con error inesperado.
go func() {
defer func() {
cancel()
close(evtCh)
close(errCh)
}()
for {
select {
case <-innerCtx.Done():
return
default:
}
err := cfg.Client.SyncWithContext(innerCtx)
// ctx cancelado = salida limpia
if innerCtx.Err() != nil {
return
}
// SyncWithContext retorna nil si otro Sync() lo cancelo
if err == nil {
return
}
// Cualquier otro error: pequeno delay antes de reiniciar
select {
case <-innerCtx.Done():
return
case <-time.After(time.Duration(initialBackoff) * time.Millisecond):
}
}
}()
return &MatrixSyncServiceHandle{
Events: evtCh,
Errors: errCh,
Stop: stopFn,
}, nil
}
// isFatalMatrixError devuelve true si el error indica que no tiene sentido
// reintentar (token invalido, forbidden).
func isFatalMatrixError(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "M_UNKNOWN_TOKEN") ||
strings.Contains(msg, "M_FORBIDDEN") ||
strings.Contains(msg, "401")
}