Files
fn_registry/functions/infra/matrix/matrix_sync_service.go
T
egutierrez e22c33ee6d refactor(infra): split de drivers pesados a subpaquetes + fix TestSSEHandler
Mueve duckdb_open, clickhouse_open, postgres_open, matrix_* y keyring_token_store
del paquete monolitico functions/infra a subpaquetes propios
(functions/infra/{duckdb,clickhouse,postgres,matrix,keyring}). El paquete infra ya
no importa los drivers (go-duckdb, clickhouse-go, pgx, mautrix, go-keyring), por lo
que las apps que solo usan funciones ligeras (process, cron, http, sqlite) dejan de
arrastrarlos. Reduccion de binarios: dag_engine 72->10MB, registry_api 70->8.7MB,
services_api 70->9MB, call_monitor 68->6.6MB, sqlite_api 70->8.9MB.

Los IDs del registry se mantienen estables (domain: infra en frontmatter). Se
preservan los build tags goolm/libolm de matrix_crypto_init.

Tambien corrige TestSSEHandler: el test leia el body con un unico Read() que con
HTTP chunked solo capturaba el primer evento; ahora usa io.ReadAll hasta EOF.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 23:48:59 +02:00

367 lines
10 KiB
Go

package matrix
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)
// MatrixSyncEvent es un evento normalizado emitido por MatrixSyncService.
// Cubre mensajes, pertenencia a sala, redacciones, reacciones, tipeo y estado.
type MatrixSyncEvent struct {
Type string `json:"type"` // "message" | "membership" | "redaction" | "reaction" | "edit" | "encrypted" | "presence" | "typing" | "read_receipt" | "room_state"
RoomID string `json:"room_id"` // ID de la sala (vacio para presencia global)
EventID string `json:"event_id"` // event_id unico Matrix (vacio para eventos efimeros)
Sender string `json:"sender"` // MXID del emisor (vacio para eventos efimeros)
Ts int64 `json:"ts"` // origin_server_ts en milisegundos
Body string `json:"body,omitempty"` // contenido de texto del evento (mensajes)
Raw interface{} `json:"raw,omitempty"` // *event.Event original para acceso completo
}
// MatrixSyncServiceConfig configura el servicio de sync loop de Matrix.
type MatrixSyncServiceConfig struct {
// Client es el *mautrix.Client ya inicializado con credenciales.
// Obligatorio.
Client *mautrix.Client
// InitialBackoffMS es el tiempo inicial de espera entre reintentos tras error (ms).
// Default: 1000 (1 segundo).
InitialBackoffMS int
// MaxBackoffMS es el techo del backoff exponencial (ms).
// Default: 60000 (60 segundos).
MaxBackoffMS int
// ChannelBuffer es la capacidad del canal Events.
// Si el consumer va lento y el buffer se llena, el sync se bloquea hasta
// que el consumer drene. Default: 256.
ChannelBuffer int
}
// MatrixSyncServiceHandle es el handle devuelto por MatrixSyncService.
type MatrixSyncServiceHandle struct {
// Events es el canal de eventos normalizados (cierra al Stop).
Events <-chan MatrixSyncEvent
// Errors recibe errores transitorios (red, 5xx, etc.).
// No fatal: el servicio reintenta con backoff. El caller decide si actuar.
// El canal cierra al Stop.
Errors <-chan error
// Stop cancela el sync loop de forma limpia e idempotente.
// Cierra Events y Errors. Seguro llamar varias veces.
Stop func()
}
// matrixSyncerWrapper envuelve DefaultSyncer para interceptar OnFailedSync
// e inyectar nuestro backoff exponencial y emision de errores al canal.
type matrixSyncerWrapper struct {
*mautrix.DefaultSyncer
errCh chan<- error
innerCtx context.Context
backoffMs *int
initialMS int
maxMS int
lastSyncOK *time.Time
}
// OnFailedSync implementa mautrix.Syncer. Emite el error al canal y devuelve
// el proximo backoff. Para errores fatales (401, M_FORBIDDEN) devuelve el
// backoff maximo y emite al canal — el caller decide via Stop().
func (w *matrixSyncerWrapper) OnFailedSync(_ *mautrix.RespSync, err error) (time.Duration, error) {
if w.innerCtx.Err() != nil {
return 0, fmt.Errorf("matrix_sync_service: context cancelado")
}
// Emitir error al canal de forma no-bloqueante
select {
case w.errCh <- fmt.Errorf("matrix_sync_service: %w", err):
default:
}
// Reset backoff si el ultimo sync exitoso fue reciente
if time.Since(*w.lastSyncOK) < 30*time.Second {
*w.backoffMs = w.initialMS
}
// Calcular duracion de espera
wait := time.Duration(*w.backoffMs) * time.Millisecond
// Backoff exponencial con techo
*w.backoffMs *= 2
if *w.backoffMs > w.maxMS {
*w.backoffMs = w.maxMS
}
// Para errores fatales, esperar el maximo pero no retornar error
// (dejamos al caller decidir via Stop)
if isFatalMatrixError(err) {
return time.Duration(w.maxMS) * time.Millisecond, nil
}
return wait, nil
}
// GetFilterJSON delega al DefaultSyncer.
func (w *matrixSyncerWrapper) GetFilterJSON(userID id.UserID) *mautrix.Filter {
return w.DefaultSyncer.GetFilterJSON(userID)
}
// ProcessResponse delega al DefaultSyncer. Actualiza lastSyncOK en exito.
func (w *matrixSyncerWrapper) ProcessResponse(ctx context.Context, resp *mautrix.RespSync, since string) error {
err := w.DefaultSyncer.ProcessResponse(ctx, resp, since)
if err == nil {
now := time.Now()
*w.lastSyncOK = now
}
return err
}
// MatrixSyncService arranca el sync loop de mautrix contra Synapse en background.
// Registra handlers para los tipos de evento mas comunes y los emite via canal.
// Implementa reconnect con backoff exponencial para errores transitorios.
//
// Requiere un *mautrix.Client ya inicializado (ver matrix_client_init).
// Opcionalmente combinar con matrix_crypto_init para descifrar m.room.encrypted.
//
// La goroutine interna vive hasta que ctx sea cancelado o se llame Stop.
// Ambas acciones cierran los canales Events y Errors.
func MatrixSyncService(ctx context.Context, cfg MatrixSyncServiceConfig) (*MatrixSyncServiceHandle, error) {
if cfg.Client == nil {
return nil, fmt.Errorf("matrix_sync_service: Client no puede ser nil")
}
// Aplicar defaults
initialBackoff := cfg.InitialBackoffMS
if initialBackoff <= 0 {
initialBackoff = 1000
}
maxBackoff := cfg.MaxBackoffMS
if maxBackoff <= 0 {
maxBackoff = 60000
}
bufSize := cfg.ChannelBuffer
if bufSize <= 0 {
bufSize = 256
}
// Context cancelable derivado del pasado
innerCtx, cancel := context.WithCancel(ctx)
// Channels
evtCh := make(chan MatrixSyncEvent, bufSize)
errCh := make(chan error, 8)
// Stop idempotente via sync.Once
var once sync.Once
stopFn := func() {
once.Do(func() {
cancel()
})
}
// Estado de backoff compartido con el wrapper
backoffMs := initialBackoff
lastSyncOK := time.Now()
// Configurar el Syncer: usar DefaultSyncer base (existente o nuevo)
var baseSyncer *mautrix.DefaultSyncer
if ds, ok := cfg.Client.Syncer.(*mautrix.DefaultSyncer); ok {
baseSyncer = ds
} else {
baseSyncer = mautrix.NewDefaultSyncer()
}
// Crear wrapper que intercepta OnFailedSync
wrapper := &matrixSyncerWrapper{
DefaultSyncer: baseSyncer,
errCh: errCh,
innerCtx: innerCtx,
backoffMs: &backoffMs,
initialMS: initialBackoff,
maxMS: maxBackoff,
lastSyncOK: &lastSyncOK,
}
cfg.Client.Syncer = wrapper
// Helper: emitir evento de forma no-bloqueante respetando ctx
emit := func(ev MatrixSyncEvent) {
select {
case evtCh <- ev:
case <-innerCtx.Done():
}
}
// Helper: extraer body de texto de Content.VeryRaw
extractBody := func(evt *event.Event) string {
raw := evt.Content.VeryRaw
if raw == nil {
return ""
}
var m map[string]interface{}
if err := json.Unmarshal(raw, &m); err != nil {
return ""
}
if b, ok := m["body"].(string); ok {
return b
}
return ""
}
// Registrar event handlers sobre el DefaultSyncer base
// m.room.message — mensajes de texto, imagen, archivo
baseSyncer.OnEventType(event.EventMessage, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "message",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Body: extractBody(evt),
Raw: evt,
})
})
// m.room.encrypted — mensajes cifrados (crypto helper los descifra si esta init)
baseSyncer.OnEventType(event.EventEncrypted, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "encrypted",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.room.redaction — redacciones de mensajes
baseSyncer.OnEventType(event.EventRedaction, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "redaction",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.reaction — reacciones emoji
baseSyncer.OnEventType(event.EventReaction, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "reaction",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.room.member — cambios de pertenencia a sala
baseSyncer.OnEventType(event.StateMember, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "membership",
RoomID: evt.RoomID.String(),
EventID: evt.ID.String(),
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.typing — efimero: quien esta escribiendo en una sala
baseSyncer.OnEventType(event.EphemeralEventTyping, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "typing",
RoomID: evt.RoomID.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.receipt — read receipts
baseSyncer.OnEventType(event.EphemeralEventReceipt, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "read_receipt",
RoomID: evt.RoomID.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// m.presence — presencia de usuarios
baseSyncer.OnEventType(event.EphemeralEventPresence, func(_ context.Context, evt *event.Event) {
emit(MatrixSyncEvent{
Type: "presence",
Sender: evt.Sender.String(),
Ts: evt.Timestamp,
Raw: evt,
})
})
// Goroutine principal
// SyncWithContext ya es un loop bloqueante que incluye retry via OnFailedSync.
// Esta goroutine solo reinicia si SyncWithContext retorna con error inesperado.
go func() {
defer func() {
cancel()
close(evtCh)
close(errCh)
}()
for {
select {
case <-innerCtx.Done():
return
default:
}
err := cfg.Client.SyncWithContext(innerCtx)
// ctx cancelado = salida limpia
if innerCtx.Err() != nil {
return
}
// SyncWithContext retorna nil si otro Sync() lo cancelo
if err == nil {
return
}
// Cualquier otro error: pequeno delay antes de reiniciar
select {
case <-innerCtx.Done():
return
case <-time.After(time.Duration(initialBackoff) * time.Millisecond):
}
}
}()
return &MatrixSyncServiceHandle{
Events: evtCh,
Errors: errCh,
Stop: stopFn,
}, nil
}
// isFatalMatrixError devuelve true si el error indica que no tiene sentido
// reintentar (token invalido, forbidden).
func isFatalMatrixError(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "M_UNKNOWN_TOKEN") ||
strings.Contains(msg, "M_FORBIDDEN") ||
strings.Contains(msg, "401")
}