package main import ( "context" "crypto/rand" "encoding/hex" "errors" "fmt" "path/filepath" "sync" "time" "fn-registry/projects/element_agents/apps/matrix_client_pc/internal/infra" "github.com/wailsapp/wails/v2/pkg/runtime" "maunium.net/go/mautrix" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" ) // Constants are operator-configurable later via settings UI. Hardcoded for issue 0147 MVP. const ( homeserverURL = "https://matrix-af2f3d.organic-machine.com" masIssuer = "https://auth-af2f3d.organic-machine.com/" masClientID = "3DC4XQ2ZKN2TJ0BYVJ54FK7M6Y" loopbackPort = 8765 keyringServiceName = "fn_registry.matrix_client_pc" oidcTimeoutSeconds = 300 ) var defaultScopes = []string{ "openid", "urn:matrix:org.matrix.msc2967.client:api:*", } // MatrixService is bound to the Wails frontend. type MatrixService struct { ctx context.Context mu sync.Mutex store *infra.KeyringTokenStore client *mautrix.Client sync *infra.MatrixSyncServiceHandle crypto *infra.MatrixCryptoInitResult userID string lastError string // last surfaced error message (for diagnostics panel) errorTs time.Time } func NewMatrixService() *MatrixService { return &MatrixService{ store: infra.NewKeyringTokenStore(keyringServiceName), } } func (s *MatrixService) SetContext(ctx context.Context) { s.ctx = ctx } // SessionView is the safe-to-send JSON for the frontend (no tokens). type SessionView struct { UserID string `json:"user_id"` DeviceID string `json:"device_id"` HomeserverURL string `json:"homeserver_url"` HasToken bool `json:"has_token"` ExpiresAt string `json:"expires_at,omitempty"` } // MatrixEvent is the exportable, JSON-friendly shape of a Matrix event for the frontend. type MatrixEvent struct { EventID string `json:"event_id"` RoomID string `json:"room_id"` Sender string `json:"sender"` Type string `json:"type"` Ts int64 `json:"ts"` Body string `json:"body,omitempty"` EncryptedRaw bool `json:"encrypted_raw"` } // SyncEventView is what we emit through Wails events. Mirrors MatrixSyncEvent but without // the raw event pointer that wouldn't survive JSON serialization. type SyncEventView struct { Type string `json:"type"` RoomID string `json:"room_id"` EventID string `json:"event_id"` Sender string `json:"sender"` Ts int64 `json:"ts"` Body string `json:"body,omitempty"` } // Login launches the OAuth2 PKCE flow against MAS. Blocks until completion or timeout. // Returns the user_id of the authenticated session. func (s *MatrixService) Login() (string, error) { s.mu.Lock() defer s.mu.Unlock() // Generate a fresh client-side device_id and request a device-bound scope // so MAS issues a token tied to this device. Without this scope MAS does // NOT bind the token to a device, whoami returns empty device_id, and // MatrixCryptoInit hangs because device-keys upload has nowhere to land. // MSC2967: urn:matrix:org.matrix.msc2967.client:device:<10-char-id> deviceIDForLogin := generateDeviceID() scopes := append([]string{}, defaultScopes...) scopes = append(scopes, "urn:matrix:org.matrix.msc2967.client:device:"+deviceIDForLogin) logInfo("Login start", "client_id", masClientID, "loopback", loopbackPort, "issuer", masIssuer, "device_id", deviceIDForLogin) cfg := infra.MasOidcLoopbackConfig{ Issuer: masIssuer, ClientID: masClientID, Scopes: scopes, LoopbackPort: loopbackPort, OpenBrowser: true, TimeoutSeconds: oidcTimeoutSeconds, } res, err := infra.MasOidcLoopback(cfg) if err != nil { s.recordError(fmt.Errorf("oidc loopback: %w", err)) return "", fmt.Errorf("oidc: %w", err) } logInfo("oidc loopback OK", "token_type", res.TokenType, "expires_in", res.ExpiresIn, "scope", res.Scope) // Pre-fetch user_id by hitting /whoami directly (mautrix requires UserID at NewClient). userID, deviceID, err := whoami(s.ctx, homeserverURL, res.AccessToken) if err != nil { s.recordError(fmt.Errorf("whoami after oidc: %w", err)) return "", fmt.Errorf("whoami: %w", err) } // Fallback: some MAS deployments don't echo device_id in /whoami even when // the token IS device-bound. We requested a specific device: scope, so // the binding exists — use that id as the canonical device_id. if deviceID == "" { deviceID = deviceIDForLogin logWarn("whoami returned empty device_id — using client-generated id from device-scope", "device_id", deviceID) } logInfo("whoami OK", "user_id", userID, "device_id", deviceID) clientCfg := infra.MatrixClientInitConfig{ HomeserverURL: homeserverURL, UserID: userID, DeviceID: deviceID, AccessToken: res.AccessToken, StoreDir: userStoreDir(userID), EnableCrypto: false, // crypto goes through MatrixCryptoInit in Start() } if _, err := infra.MatrixClientInit(clientCfg); err != nil { return "", fmt.Errorf("matrix init: %w", err) } tok := infra.Token{ AccessToken: res.AccessToken, RefreshToken: res.RefreshToken, UserID: userID, DeviceID: deviceID, HomeserverURL: homeserverURL, Issuer: masIssuer, ClientID: masClientID, } if res.ExpiresIn > 0 { tok.ExpiresAt = time.Now().Add(time.Duration(res.ExpiresIn) * time.Second) } if err := s.store.Save(userID, tok); err != nil { logError("keyring save failed", "err", err, "user_id", userID) return "", fmt.Errorf("keyring save: %w", err) } if err := writeLastUser(userID); err != nil { logWarn("write last_user.txt failed (non-fatal)", "err", err) } logInfo("login complete + token persisted", "user_id", userID) return userID, nil } // GetLastUserID returns the last-logged-in user ID persisted in /matrix_client_pc/last_user.txt. // Empty string if never logged in or if file unreadable. func (s *MatrixService) GetLastUserID() string { return readLastUser() } // GetSession returns the persisted session for the given user_id. func (s *MatrixService) GetSession(userID string) (*SessionView, error) { if userID == "" { return nil, errors.New("user_id required (v0.1.0 multi-account index TODO)") } tok, err := s.store.Load(userID) if err != nil { if errors.Is(err, infra.ErrNotFound) { return nil, nil } return nil, fmt.Errorf("keyring load: %w", err) } view := &SessionView{ UserID: tok.UserID, DeviceID: tok.DeviceID, HomeserverURL: tok.HomeserverURL, HasToken: tok.AccessToken != "", } if !tok.ExpiresAt.IsZero() { view.ExpiresAt = tok.ExpiresAt.Format(time.RFC3339) } return view, nil } // Logout deletes the persisted token + stops sync. func (s *MatrixService) Logout(userID string) error { s.mu.Lock() defer s.mu.Unlock() if userID == "" { return errors.New("user_id required") } if s.sync != nil { s.sync.Stop() s.sync = nil } s.client = nil s.crypto = nil s.userID = "" if err := clearLastUser(); err != nil { logWarn("clear last_user.txt failed (non-fatal)", "err", err) } return s.store.Delete(userID) } // Diagnostics is a snapshot of the live Matrix service state, used by the frontend // "comprobar chats" panel. Safe to call any time (returns zero values if not started). type Diagnostics struct { Started bool `json:"started"` UserID string `json:"user_id"` HomeserverURL string `json:"homeserver_url"` ClientReady bool `json:"client_ready"` CryptoInitialized bool `json:"crypto_initialized"` SyncActive bool `json:"sync_active"` RoomsCount int `json:"rooms_count"` EncryptedRooms int `json:"encrypted_rooms"` DMsCount int `json:"dms_count"` LastError string `json:"last_error,omitempty"` } // recordError stores the last error surfaced by Login/Start/Send/etc. for the // diagnostics panel + E2E server. func (s *MatrixService) recordError(err error) { if err == nil { return } s.mu.Lock() s.lastError = err.Error() s.errorTs = time.Now() s.mu.Unlock() logError("recorded error", "err", err) } // GetDiagnostics returns a live snapshot of service state + a fresh ListRooms count. func (s *MatrixService) GetDiagnostics() Diagnostics { s.mu.Lock() d := Diagnostics{ Started: s.sync != nil, UserID: s.userID, HomeserverURL: homeserverURL, ClientReady: s.client != nil, CryptoInitialized: s.crypto != nil, SyncActive: s.sync != nil, LastError: s.lastError, } client := s.client s.mu.Unlock() if client != nil { rooms, err := infra.MatrixRoomList(s.ctx, infra.MatrixRoomListConfig{Client: client}) if err != nil { d.LastError = err.Error() logWarn("diagnostics: room list error", "err", err) } else { d.RoomsCount = len(rooms) for _, r := range rooms { if r.IsEncrypted { d.EncryptedRooms++ } if r.IsDirect { d.DMsCount++ } } } } logInfo("GetDiagnostics", "rooms", d.RoomsCount, "encrypted", d.EncryptedRooms, "dms", d.DMsCount) return d } // Stop shuts down the sync loop without deleting credentials. Safe to call multiple times. func (s *MatrixService) Stop() { s.mu.Lock() defer s.mu.Unlock() if s.sync != nil { s.sync.Stop() s.sync = nil } } // StartNoCrypto initializes the Matrix client + sync loop WITHOUT E2EE. // Useful for E2E tests + admin tokens (which lack MAS OAuth session and can't // complete the cryptohelper upload). Encrypted rooms will show as "Encrypted" // placeholder bubbles; unencrypted rooms work normally. func (s *MatrixService) StartNoCrypto(userID string) error { return s.startInternal(userID, true) } // Start initializes the Matrix client + crypto + sync loop for the given user. // Must be called after Login() or after a successful GetSession() for a returning user. // Idempotent: safe to call multiple times for the same user. func (s *MatrixService) Start(userID string) error { return s.startInternal(userID, false) } func (s *MatrixService) startInternal(userID string, skipCrypto bool) error { s.mu.Lock() defer s.mu.Unlock() if userID == "" { return errors.New("user_id required") } // Idempotent for same user if s.sync != nil && s.userID == userID { return nil } // Different user or restart: stop previous if s.sync != nil { s.sync.Stop() s.sync = nil } logInfo("Start invoked", "user_id", userID) tok, err := s.store.Load(userID) if err != nil { logError("keyring load failed", "err", err, "user_id", userID) return fmt.Errorf("keyring load: %w", err) } logInfo("token loaded from keyring", "user_id", tok.UserID, "device_id", tok.DeviceID, "homeserver", tok.HomeserverURL, "client_id", tok.ClientID, "has_refresh", tok.RefreshToken != "", "expires_at", tok.ExpiresAt, "now", time.Now(), ) storeDir := userStoreDir(userID) clientCfg := infra.MatrixClientInitConfig{ HomeserverURL: tok.HomeserverURL, UserID: tok.UserID, DeviceID: tok.DeviceID, AccessToken: tok.AccessToken, StoreDir: storeDir, EnableCrypto: false, } clientRes, err := infra.MatrixClientInit(clientCfg) if err != nil { logError("matrix client init failed (token rejected by Synapse)", "err", err, "user_id", userID, "hint", "Token may be stale — call Logout(user_id) then Login() again", ) return fmt.Errorf("matrix init: %w (token rejected — re-login required)", err) } logInfo("matrix client init OK", "store_dir", storeDir, "device_id", string(clientRes.Client.DeviceID)) // Defensive: if DeviceID still empty after init, retry whoami + persist back. // Happens when keyring has stale token (saved before whoami fixed) or when // MAS-issued token's whoami response omits device_id (some servers do this). if clientRes.Client.DeviceID == "" { logWarn("client.DeviceID empty after init — retrying whoami") uid, did, werr := whoami(s.ctx, tok.HomeserverURL, tok.AccessToken) if werr != nil { logError("whoami retry failed", "err", werr) return fmt.Errorf("whoami retry: %w", werr) } if did == "" { logError("Synapse whoami returned empty device_id — MAS session likely lacks device binding", "user_id", uid, ) return fmt.Errorf("synapse whoami did not return device_id — re-login required to bind a device") } clientRes.Client.DeviceID = id.DeviceID(did) tok.DeviceID = did _ = s.store.Save(userID, *tok) logInfo("whoami retry OK + persisted", "device_id", did) } // Pickle key: load from keyring (hex), or generate fresh and persist. pickleKey, err := s.loadOrCreatePickleKey(tok) if err != nil { return fmt.Errorf("pickle key: %w", err) } cryptoStorePath := filepath.Join(storeDir, "crypto.db") if skipCrypto { logWarn("crypto init SKIPPED — encrypted rooms wont decrypt", "user_id", userID) syncRes, err := infra.MatrixSyncService(s.ctx, infra.MatrixSyncServiceConfig{ Client: clientRes.Client, }) if err != nil { s.recordError(fmt.Errorf("sync service start (no crypto): %w", err)) return fmt.Errorf("matrix sync: %w", err) } s.client = clientRes.Client s.sync = syncRes s.userID = userID go s.fanout() logInfo("StartNoCrypto complete", "user_id", userID) return nil } // Start sync FIRST so the app is usable immediately. Crypto runs best-effort // in background — if it hangs/fails, encrypted rooms show placeholder but // the app remains responsive. Plain rooms work fully either way. syncRes, err := infra.MatrixSyncService(s.ctx, infra.MatrixSyncServiceConfig{ Client: clientRes.Client, }) if err != nil { s.recordError(fmt.Errorf("sync service start: %w", err)) return fmt.Errorf("matrix sync: %w", err) } logInfo("sync service started") s.client = clientRes.Client s.sync = syncRes s.userID = userID go s.fanout() // Crypto best-effort with heartbeat + timeout. Runs OUTSIDE s.mu so a hang // here does NOT block subsequent service calls. The 45s ceiling matches // 3x the longest observed cryptohelper handshake on warm MAS. go s.tryCryptoInit(clientRes.Client, cryptoStorePath, pickleKey) logInfo("Start complete (crypto initializing in background)", "user_id", userID) return nil } // tryCryptoInit runs MatrixCryptoInit out-of-band with progress heartbeats. // Logs every 5s while pending. On success: attaches helper to client.Crypto so // future SendMessageEvent encrypts automatically. On error/timeout: logs and // proceeds — app continues to work on plain rooms; encrypted rooms show as // EncryptedRaw=true and Send to them returns M_FORBIDDEN until crypto recovers. func (s *MatrixService) tryCryptoInit(client *mautrix.Client, storePath string, pickleKey []byte) { const initTimeout = 45 * time.Second const beatEvery = 5 * time.Second logInfo("calling MatrixCryptoInit (best-effort, background)", "store", storePath, "timeout_s", int(initTimeout/time.Second)) cryptoCtx, cancel := context.WithTimeout(s.ctx, initTimeout) defer cancel() done := make(chan struct{}) var ( cryptoRes *infra.MatrixCryptoInitResult cryptoErr error ) go func() { defer close(done) cryptoRes, cryptoErr = infra.MatrixCryptoInit(cryptoCtx, infra.MatrixCryptoInitConfig{ Client: client, StorePath: storePath, PickleKey: pickleKey, }) }() start := time.Now() tick := time.NewTicker(beatEvery) defer tick.Stop() for { select { case <-done: if cryptoErr != nil { s.recordError(fmt.Errorf("crypto init: %w (store=%s)", cryptoErr, storePath)) logError("crypto init failed — continuing without E2EE", "err", cryptoErr, "elapsed_s", int(time.Since(start)/time.Second), "crypto_store", storePath, "hint", "encrypted rooms show placeholder; plain rooms work; investigate MAS UIA on /keys/upload") return } s.mu.Lock() s.crypto = cryptoRes s.mu.Unlock() logInfo("crypto init OK", "store", storePath, "elapsed_s", int(time.Since(start)/time.Second)) return case <-tick.C: logInfo("crypto init still running", "elapsed_s", int(time.Since(start)/time.Second), "store", storePath) case <-cryptoCtx.Done(): // Timeout fires before goroutine returns; wait for goroutine to observe ctx // cancellation (usually immediate). If mautrix ignores ctx, goroutine leaks // but the app stays usable. logWarn("crypto init exceeded timeout — app continues without E2EE", "elapsed_s", int(time.Since(start)/time.Second), "store", storePath, "hint", "goroutine may continue draining if mautrix ignores ctx; encrypted rooms wont decrypt this session") return } } } // GetLogTail returns the last n lines of the app log file for the diagnostics UI. func (s *MatrixService) GetLogTail(n int) ([]string, error) { if n <= 0 { n = 200 } return TailLog(n) } // GetLogPath returns the absolute path to the log file (for the diagnostics UI). func (s *MatrixService) GetLogPath() string { if globalLogger == nil { return "" } return globalLogger.Path() } func (s *MatrixService) fanout() { if s.ctx == nil || s.sync == nil { return } events := s.sync.Events errs := s.sync.Errors for { select { case <-s.ctx.Done(): return case ev, ok := <-events: if !ok { return } view := SyncEventView{ Type: ev.Type, RoomID: ev.RoomID, EventID: ev.EventID, Sender: ev.Sender, Ts: ev.Ts, Body: ev.Body, } runtime.EventsEmit(s.ctx, "matrix:event", view) case e, ok := <-errs: if !ok { return } if e != nil { runtime.EventsEmit(s.ctx, "matrix:error", e.Error()) } } } } // ListRooms returns the joined rooms with summary metadata. func (s *MatrixService) ListRooms() ([]infra.RoomSummary, error) { s.mu.Lock() client := s.client s.mu.Unlock() if client == nil { return nil, errors.New("matrix service not started — call Start() first") } return infra.MatrixRoomList(s.ctx, infra.MatrixRoomListConfig{Client: client}) } // LoadTimeline fetches the last N messages of a room (most recent first). func (s *MatrixService) LoadTimeline(roomID string, limit int) ([]MatrixEvent, error) { s.mu.Lock() client := s.client s.mu.Unlock() if client == nil { return nil, errors.New("matrix service not started — call Start() first") } if limit <= 0 { limit = 50 } msgs, err := client.Messages(s.ctx, id.RoomID(roomID), "", "", mautrix.DirectionBackward, nil, limit) if err != nil { return nil, fmt.Errorf("messages: %w", err) } out := make([]MatrixEvent, 0, len(msgs.Chunk)) for _, evt := range msgs.Chunk { out = append(out, eventToView(evt)) } return out, nil } func eventToView(evt *event.Event) MatrixEvent { view := MatrixEvent{ EventID: evt.ID.String(), RoomID: evt.RoomID.String(), Sender: evt.Sender.String(), Type: evt.Type.String(), Ts: evt.Timestamp, } if evt.Type == event.EventEncrypted { view.EncryptedRaw = true return view } // Try to parse and extract body for messages. if evt.Type == event.EventMessage { _ = evt.Content.ParseRaw(evt.Type) if mc, ok := evt.Content.Parsed.(*event.MessageEventContent); ok && mc != nil { view.Body = mc.Body } } return view } // SendText sends a plain text message to the given room. func (s *MatrixService) SendText(roomID, body string) (string, error) { s.mu.Lock() client := s.client s.mu.Unlock() if client == nil { return "", errors.New("matrix service not started — call Start() first") } evID, err := infra.MatrixSendText(s.ctx, client, id.RoomID(roomID), body) if err != nil { return "", err } return string(evID), nil } // SendMarkdown sends a markdown-formatted message (rendered + sanitized HTML). func (s *MatrixService) SendMarkdown(roomID, md string) (string, error) { s.mu.Lock() client := s.client s.mu.Unlock() if client == nil { return "", errors.New("matrix service not started — call Start() first") } evID, err := infra.MatrixSendMarkdown(s.ctx, client, id.RoomID(roomID), md) if err != nil { return "", err } return string(evID), nil } // generateDeviceID produces a 10-character uppercase alphanumeric device_id // suitable for MAS MSC2967 device-scope. Matches the format Element clients // use (e.g. RZXAYCAWAY). func generateDeviceID() string { const alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" buf := make([]byte, 10) if _, err := rand.Read(buf); err != nil { // Fallback to time-based id; rand.Read on Windows is reliable so this is rare. return fmt.Sprintf("DEV%07d", time.Now().UnixNano()%10000000) } for i, b := range buf { buf[i] = alphabet[int(b)%len(alphabet)] } return string(buf) } // loadOrCreatePickleKey returns the 32-byte pickle key for the user. // If absent in keyring, generates fresh random bytes, hex-encodes them, persists, and returns. func (s *MatrixService) loadOrCreatePickleKey(tok *infra.Token) ([]byte, error) { if tok.PickleKeyHex != "" { key, err := hex.DecodeString(tok.PickleKeyHex) if err == nil && len(key) == 32 { return key, nil } // Malformed key in keyring — fall through and regenerate. } buf := make([]byte, 32) if _, err := rand.Read(buf); err != nil { return nil, fmt.Errorf("rand: %w", err) } tok.PickleKeyHex = hex.EncodeToString(buf) if err := s.store.Save(tok.UserID, *tok); err != nil { return nil, fmt.Errorf("save pickle key: %w", err) } return buf, nil }