4bce095964
Mueve duckdb_open, clickhouse_open, postgres_open, matrix_* y keyring_token_store
del paquete monolitico functions/infra a subpaquetes propios
(functions/infra/{duckdb,clickhouse,postgres,matrix,keyring}). El paquete infra ya
no importa los drivers (go-duckdb, clickhouse-go, pgx, mautrix, go-keyring), por lo
que las apps que solo usan funciones ligeras (process, cron, http, sqlite) dejan de
arrastrarlos. Reduccion de binarios: dag_engine 72->10MB, registry_api 70->8.7MB,
services_api 70->9MB, call_monitor 68->6.6MB, sqlite_api 70->8.9MB.
Los IDs del registry se mantienen estables (domain: infra en frontmatter). Se
preservan los build tags goolm/libolm de matrix_crypto_init.
Tambien corrige TestSSEHandler: el test leia el body con un unico Read() que con
HTTP chunked solo capturaba el primer evento; ahora usa io.ReadAll hasta EOF.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
367 lines
10 KiB
Go
367 lines
10 KiB
Go
package matrix
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"maunium.net/go/mautrix"
|
|
"maunium.net/go/mautrix/event"
|
|
"maunium.net/go/mautrix/id"
|
|
)
|
|
|
|
// MatrixSyncEvent es un evento normalizado emitido por MatrixSyncService.
|
|
// Cubre mensajes, pertenencia a sala, redacciones, reacciones, tipeo y estado.
|
|
type MatrixSyncEvent struct {
|
|
Type string `json:"type"` // "message" | "membership" | "redaction" | "reaction" | "edit" | "encrypted" | "presence" | "typing" | "read_receipt" | "room_state"
|
|
RoomID string `json:"room_id"` // ID de la sala (vacio para presencia global)
|
|
EventID string `json:"event_id"` // event_id unico Matrix (vacio para eventos efimeros)
|
|
Sender string `json:"sender"` // MXID del emisor (vacio para eventos efimeros)
|
|
Ts int64 `json:"ts"` // origin_server_ts en milisegundos
|
|
Body string `json:"body,omitempty"` // contenido de texto del evento (mensajes)
|
|
Raw interface{} `json:"raw,omitempty"` // *event.Event original para acceso completo
|
|
}
|
|
|
|
// MatrixSyncServiceConfig configura el servicio de sync loop de Matrix.
|
|
type MatrixSyncServiceConfig struct {
|
|
// Client es el *mautrix.Client ya inicializado con credenciales.
|
|
// Obligatorio.
|
|
Client *mautrix.Client
|
|
|
|
// InitialBackoffMS es el tiempo inicial de espera entre reintentos tras error (ms).
|
|
// Default: 1000 (1 segundo).
|
|
InitialBackoffMS int
|
|
|
|
// MaxBackoffMS es el techo del backoff exponencial (ms).
|
|
// Default: 60000 (60 segundos).
|
|
MaxBackoffMS int
|
|
|
|
// ChannelBuffer es la capacidad del canal Events.
|
|
// Si el consumer va lento y el buffer se llena, el sync se bloquea hasta
|
|
// que el consumer drene. Default: 256.
|
|
ChannelBuffer int
|
|
}
|
|
|
|
// MatrixSyncServiceHandle es el handle devuelto por MatrixSyncService.
|
|
type MatrixSyncServiceHandle struct {
|
|
// Events es el canal de eventos normalizados (cierra al Stop).
|
|
Events <-chan MatrixSyncEvent
|
|
|
|
// Errors recibe errores transitorios (red, 5xx, etc.).
|
|
// No fatal: el servicio reintenta con backoff. El caller decide si actuar.
|
|
// El canal cierra al Stop.
|
|
Errors <-chan error
|
|
|
|
// Stop cancela el sync loop de forma limpia e idempotente.
|
|
// Cierra Events y Errors. Seguro llamar varias veces.
|
|
Stop func()
|
|
}
|
|
|
|
// matrixSyncerWrapper envuelve DefaultSyncer para interceptar OnFailedSync
|
|
// e inyectar nuestro backoff exponencial y emision de errores al canal.
|
|
type matrixSyncerWrapper struct {
|
|
*mautrix.DefaultSyncer
|
|
errCh chan<- error
|
|
innerCtx context.Context
|
|
backoffMs *int
|
|
initialMS int
|
|
maxMS int
|
|
lastSyncOK *time.Time
|
|
}
|
|
|
|
// OnFailedSync implementa mautrix.Syncer. Emite el error al canal y devuelve
|
|
// el proximo backoff. Para errores fatales (401, M_FORBIDDEN) devuelve el
|
|
// backoff maximo y emite al canal — el caller decide via Stop().
|
|
func (w *matrixSyncerWrapper) OnFailedSync(_ *mautrix.RespSync, err error) (time.Duration, error) {
|
|
if w.innerCtx.Err() != nil {
|
|
return 0, fmt.Errorf("matrix_sync_service: context cancelado")
|
|
}
|
|
|
|
// Emitir error al canal de forma no-bloqueante
|
|
select {
|
|
case w.errCh <- fmt.Errorf("matrix_sync_service: %w", err):
|
|
default:
|
|
}
|
|
|
|
// Reset backoff si el ultimo sync exitoso fue reciente
|
|
if time.Since(*w.lastSyncOK) < 30*time.Second {
|
|
*w.backoffMs = w.initialMS
|
|
}
|
|
|
|
// Calcular duracion de espera
|
|
wait := time.Duration(*w.backoffMs) * time.Millisecond
|
|
|
|
// Backoff exponencial con techo
|
|
*w.backoffMs *= 2
|
|
if *w.backoffMs > w.maxMS {
|
|
*w.backoffMs = w.maxMS
|
|
}
|
|
|
|
// Para errores fatales, esperar el maximo pero no retornar error
|
|
// (dejamos al caller decidir via Stop)
|
|
if isFatalMatrixError(err) {
|
|
return time.Duration(w.maxMS) * time.Millisecond, nil
|
|
}
|
|
|
|
return wait, nil
|
|
}
|
|
|
|
// GetFilterJSON delega al DefaultSyncer.
|
|
func (w *matrixSyncerWrapper) GetFilterJSON(userID id.UserID) *mautrix.Filter {
|
|
return w.DefaultSyncer.GetFilterJSON(userID)
|
|
}
|
|
|
|
// ProcessResponse delega al DefaultSyncer. Actualiza lastSyncOK en exito.
|
|
func (w *matrixSyncerWrapper) ProcessResponse(ctx context.Context, resp *mautrix.RespSync, since string) error {
|
|
err := w.DefaultSyncer.ProcessResponse(ctx, resp, since)
|
|
if err == nil {
|
|
now := time.Now()
|
|
*w.lastSyncOK = now
|
|
}
|
|
return err
|
|
}
|
|
|
|
// MatrixSyncService arranca el sync loop de mautrix contra Synapse en background.
|
|
// Registra handlers para los tipos de evento mas comunes y los emite via canal.
|
|
// Implementa reconnect con backoff exponencial para errores transitorios.
|
|
//
|
|
// Requiere un *mautrix.Client ya inicializado (ver matrix_client_init).
|
|
// Opcionalmente combinar con matrix_crypto_init para descifrar m.room.encrypted.
|
|
//
|
|
// La goroutine interna vive hasta que ctx sea cancelado o se llame Stop.
|
|
// Ambas acciones cierran los canales Events y Errors.
|
|
func MatrixSyncService(ctx context.Context, cfg MatrixSyncServiceConfig) (*MatrixSyncServiceHandle, error) {
|
|
if cfg.Client == nil {
|
|
return nil, fmt.Errorf("matrix_sync_service: Client no puede ser nil")
|
|
}
|
|
|
|
// Aplicar defaults
|
|
initialBackoff := cfg.InitialBackoffMS
|
|
if initialBackoff <= 0 {
|
|
initialBackoff = 1000
|
|
}
|
|
maxBackoff := cfg.MaxBackoffMS
|
|
if maxBackoff <= 0 {
|
|
maxBackoff = 60000
|
|
}
|
|
bufSize := cfg.ChannelBuffer
|
|
if bufSize <= 0 {
|
|
bufSize = 256
|
|
}
|
|
|
|
// Context cancelable derivado del pasado
|
|
innerCtx, cancel := context.WithCancel(ctx)
|
|
|
|
// Channels
|
|
evtCh := make(chan MatrixSyncEvent, bufSize)
|
|
errCh := make(chan error, 8)
|
|
|
|
// Stop idempotente via sync.Once
|
|
var once sync.Once
|
|
stopFn := func() {
|
|
once.Do(func() {
|
|
cancel()
|
|
})
|
|
}
|
|
|
|
// Estado de backoff compartido con el wrapper
|
|
backoffMs := initialBackoff
|
|
lastSyncOK := time.Now()
|
|
|
|
// Configurar el Syncer: usar DefaultSyncer base (existente o nuevo)
|
|
var baseSyncer *mautrix.DefaultSyncer
|
|
if ds, ok := cfg.Client.Syncer.(*mautrix.DefaultSyncer); ok {
|
|
baseSyncer = ds
|
|
} else {
|
|
baseSyncer = mautrix.NewDefaultSyncer()
|
|
}
|
|
|
|
// Crear wrapper que intercepta OnFailedSync
|
|
wrapper := &matrixSyncerWrapper{
|
|
DefaultSyncer: baseSyncer,
|
|
errCh: errCh,
|
|
innerCtx: innerCtx,
|
|
backoffMs: &backoffMs,
|
|
initialMS: initialBackoff,
|
|
maxMS: maxBackoff,
|
|
lastSyncOK: &lastSyncOK,
|
|
}
|
|
cfg.Client.Syncer = wrapper
|
|
|
|
// Helper: emitir evento de forma no-bloqueante respetando ctx
|
|
emit := func(ev MatrixSyncEvent) {
|
|
select {
|
|
case evtCh <- ev:
|
|
case <-innerCtx.Done():
|
|
}
|
|
}
|
|
|
|
// Helper: extraer body de texto de Content.VeryRaw
|
|
extractBody := func(evt *event.Event) string {
|
|
raw := evt.Content.VeryRaw
|
|
if raw == nil {
|
|
return ""
|
|
}
|
|
var m map[string]interface{}
|
|
if err := json.Unmarshal(raw, &m); err != nil {
|
|
return ""
|
|
}
|
|
if b, ok := m["body"].(string); ok {
|
|
return b
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// Registrar event handlers sobre el DefaultSyncer base
|
|
|
|
// m.room.message — mensajes de texto, imagen, archivo
|
|
baseSyncer.OnEventType(event.EventMessage, func(_ context.Context, evt *event.Event) {
|
|
emit(MatrixSyncEvent{
|
|
Type: "message",
|
|
RoomID: evt.RoomID.String(),
|
|
EventID: evt.ID.String(),
|
|
Sender: evt.Sender.String(),
|
|
Ts: evt.Timestamp,
|
|
Body: extractBody(evt),
|
|
Raw: evt,
|
|
})
|
|
})
|
|
|
|
// m.room.encrypted — mensajes cifrados (crypto helper los descifra si esta init)
|
|
baseSyncer.OnEventType(event.EventEncrypted, func(_ context.Context, evt *event.Event) {
|
|
emit(MatrixSyncEvent{
|
|
Type: "encrypted",
|
|
RoomID: evt.RoomID.String(),
|
|
EventID: evt.ID.String(),
|
|
Sender: evt.Sender.String(),
|
|
Ts: evt.Timestamp,
|
|
Raw: evt,
|
|
})
|
|
})
|
|
|
|
// m.room.redaction — redacciones de mensajes
|
|
baseSyncer.OnEventType(event.EventRedaction, func(_ context.Context, evt *event.Event) {
|
|
emit(MatrixSyncEvent{
|
|
Type: "redaction",
|
|
RoomID: evt.RoomID.String(),
|
|
EventID: evt.ID.String(),
|
|
Sender: evt.Sender.String(),
|
|
Ts: evt.Timestamp,
|
|
Raw: evt,
|
|
})
|
|
})
|
|
|
|
// m.reaction — reacciones emoji
|
|
baseSyncer.OnEventType(event.EventReaction, func(_ context.Context, evt *event.Event) {
|
|
emit(MatrixSyncEvent{
|
|
Type: "reaction",
|
|
RoomID: evt.RoomID.String(),
|
|
EventID: evt.ID.String(),
|
|
Sender: evt.Sender.String(),
|
|
Ts: evt.Timestamp,
|
|
Raw: evt,
|
|
})
|
|
})
|
|
|
|
// m.room.member — cambios de pertenencia a sala
|
|
baseSyncer.OnEventType(event.StateMember, func(_ context.Context, evt *event.Event) {
|
|
emit(MatrixSyncEvent{
|
|
Type: "membership",
|
|
RoomID: evt.RoomID.String(),
|
|
EventID: evt.ID.String(),
|
|
Sender: evt.Sender.String(),
|
|
Ts: evt.Timestamp,
|
|
Raw: evt,
|
|
})
|
|
})
|
|
|
|
// m.typing — efimero: quien esta escribiendo en una sala
|
|
baseSyncer.OnEventType(event.EphemeralEventTyping, func(_ context.Context, evt *event.Event) {
|
|
emit(MatrixSyncEvent{
|
|
Type: "typing",
|
|
RoomID: evt.RoomID.String(),
|
|
Ts: evt.Timestamp,
|
|
Raw: evt,
|
|
})
|
|
})
|
|
|
|
// m.receipt — read receipts
|
|
baseSyncer.OnEventType(event.EphemeralEventReceipt, func(_ context.Context, evt *event.Event) {
|
|
emit(MatrixSyncEvent{
|
|
Type: "read_receipt",
|
|
RoomID: evt.RoomID.String(),
|
|
Ts: evt.Timestamp,
|
|
Raw: evt,
|
|
})
|
|
})
|
|
|
|
// m.presence — presencia de usuarios
|
|
baseSyncer.OnEventType(event.EphemeralEventPresence, func(_ context.Context, evt *event.Event) {
|
|
emit(MatrixSyncEvent{
|
|
Type: "presence",
|
|
Sender: evt.Sender.String(),
|
|
Ts: evt.Timestamp,
|
|
Raw: evt,
|
|
})
|
|
})
|
|
|
|
// Goroutine principal
|
|
// SyncWithContext ya es un loop bloqueante que incluye retry via OnFailedSync.
|
|
// Esta goroutine solo reinicia si SyncWithContext retorna con error inesperado.
|
|
go func() {
|
|
defer func() {
|
|
cancel()
|
|
close(evtCh)
|
|
close(errCh)
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-innerCtx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
err := cfg.Client.SyncWithContext(innerCtx)
|
|
|
|
// ctx cancelado = salida limpia
|
|
if innerCtx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
// SyncWithContext retorna nil si otro Sync() lo cancelo
|
|
if err == nil {
|
|
return
|
|
}
|
|
|
|
// Cualquier otro error: pequeno delay antes de reiniciar
|
|
select {
|
|
case <-innerCtx.Done():
|
|
return
|
|
case <-time.After(time.Duration(initialBackoff) * time.Millisecond):
|
|
}
|
|
}
|
|
}()
|
|
|
|
return &MatrixSyncServiceHandle{
|
|
Events: evtCh,
|
|
Errors: errCh,
|
|
Stop: stopFn,
|
|
}, nil
|
|
}
|
|
|
|
// isFatalMatrixError devuelve true si el error indica que no tiene sentido
|
|
// reintentar (token invalido, forbidden).
|
|
func isFatalMatrixError(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
msg := err.Error()
|
|
return strings.Contains(msg, "M_UNKNOWN_TOKEN") ||
|
|
strings.Contains(msg, "M_FORBIDDEN") ||
|
|
strings.Contains(msg, "401")
|
|
}
|