package main import ( "encoding/json" "fmt" "io" "net/http" "os" "strconv" "time" "fn-registry/projects/element_agents/apps/matrix_client_pc/internal/infra" ) // E2EServer exposes the MatrixService methods as a localhost HTTP API for // automated testing. Only starts when env var MATRIX_CLIENT_PC_E2E=1. // Default port 8767, override with MATRIX_CLIENT_PC_E2E_PORT. // // SECURITY: this binds 127.0.0.1 only and the env var gate prevents accidental // production exposure. Tokens are accepted via POST body and written to keyring. type E2EServer struct { svc *MatrixService } func MaybeStartE2EServer(svc *MatrixService) { if os.Getenv("MATRIX_CLIENT_PC_E2E") != "1" { return } port := os.Getenv("MATRIX_CLIENT_PC_E2E_PORT") if port == "" { port = "8767" } s := &E2EServer{svc: svc} mux := http.NewServeMux() mux.HandleFunc("/inject_token", s.handleInjectToken) mux.HandleFunc("/signin_admin", s.handleSigninAdmin) mux.HandleFunc("/wipe_session", s.handleWipeSession) mux.HandleFunc("/last_user", s.handleLastUser) mux.HandleFunc("/start", s.handleStart) mux.HandleFunc("/stop", s.handleStop) mux.HandleFunc("/wipe_crypto", s.handleWipeCrypto) mux.HandleFunc("/diagnostics", s.handleDiagnostics) mux.HandleFunc("/rooms", s.handleRooms) mux.HandleFunc("/timeline", s.handleTimeline) mux.HandleFunc("/send", s.handleSend) mux.HandleFunc("/logs", s.handleLogs) mux.HandleFunc("/ping", func(w http.ResponseWriter, _ *http.Request) { writeJSON(w, 200, map[string]any{"ok": true, "ts": time.Now().Unix()}) }) host := "127.0.0.1" if os.Getenv("MATRIX_CLIENT_PC_E2E_BIND_ALL") == "1" { host = "0.0.0.0" } addr := host + ":" + port logInfo("E2E server starting", "addr", addr) handler := corsMiddleware(mux) go func() { if err := http.ListenAndServe(addr, handler); err != nil { logError("E2E server died", "err", err) } }() } func writeJSON(w http.ResponseWriter, status int, payload any) { w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(payload) } // corsMiddleware allows any origin (gated by MATRIX_CLIENT_PC_E2E=1 so it's // only active in dev/test mode) and handles preflight OPTIONS requests. func corsMiddleware(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") if r.Method == "OPTIONS" { w.WriteHeader(http.StatusNoContent) return } h.ServeHTTP(w, r) }) } type injectReq struct { AccessToken string `json:"access_token"` RefreshToken string `json:"refresh_token"` UserID string `json:"user_id"` DeviceID string `json:"device_id"` HomeserverURL string `json:"homeserver_url"` PickleKeyHex string `json:"pickle_key_hex,omitempty"` } func (s *E2EServer) handleInjectToken(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { writeJSON(w, 405, map[string]string{"error": "POST only"}) return } var req injectReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeJSON(w, 400, map[string]string{"error": err.Error()}) return } if req.UserID == "" || req.AccessToken == "" || req.DeviceID == "" { writeJSON(w, 400, map[string]string{"error": "user_id, access_token, device_id required"}) return } if req.HomeserverURL == "" { req.HomeserverURL = homeserverURL } tok := infra.Token{ AccessToken: req.AccessToken, RefreshToken: req.RefreshToken, UserID: req.UserID, DeviceID: req.DeviceID, HomeserverURL: req.HomeserverURL, Issuer: masIssuer, ClientID: masClientID, PickleKeyHex: req.PickleKeyHex, } if err := s.svc.store.Save(req.UserID, tok); err != nil { writeJSON(w, 500, map[string]string{"error": "keyring save: " + err.Error()}) return } if err := writeLastUser(req.UserID); err != nil { writeJSON(w, 500, map[string]string{"error": "last_user: " + err.Error()}) return } logInfo("E2E inject_token OK", "user_id", req.UserID, "device_id", req.DeviceID) writeJSON(w, 200, map[string]string{"status": "ok", "user_id": req.UserID}) } type signinAdminReq struct { AdminToken string `json:"admin_token"` UserID string `json:"user_id"` } // handleSigninAdmin takes an existing Matrix access token (admin or otherwise) // and resolves the user_id + device_id via whoami, then persists to keyring. // // With MAS enabled, the Synapse admin login endpoint is disabled. So this // helper does NOT mint a fresh token — it just bootstraps the app with a // token that already exists (e.g. from `pass matrix/synapse-admin-token`). // Token + device_id can also be provided via env (MATRIX_SYNAPSE_ADMIN_TOKEN). func (s *E2EServer) handleSigninAdmin(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { writeJSON(w, 405, map[string]string{"error": "POST only"}) return } var req signinAdminReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeJSON(w, 400, map[string]string{"error": err.Error()}) return } admin := req.AdminToken if admin == "" { admin = os.Getenv("MATRIX_SYNAPSE_ADMIN_TOKEN") } if admin == "" { writeJSON(w, 400, map[string]string{"error": "admin_token required (body or env MATRIX_SYNAPSE_ADMIN_TOKEN)"}) return } whoamiReq, _ := http.NewRequest("GET", homeserverURL+"/_matrix/client/v3/account/whoami", nil) whoamiReq.Header.Set("Authorization", "Bearer "+admin) hc := &http.Client{Timeout: 15 * time.Second} wresp, err := hc.Do(whoamiReq) if err != nil { writeJSON(w, 502, map[string]string{"error": "whoami http: " + err.Error()}) return } defer wresp.Body.Close() wbody, _ := io.ReadAll(wresp.Body) if wresp.StatusCode != 200 { writeJSON(w, wresp.StatusCode, map[string]string{"error": "whoami non-200", "status": strconv.Itoa(wresp.StatusCode), "body": string(wbody)}) return } var who struct { UserID string `json:"user_id"` DeviceID string `json:"device_id"` } if err := json.Unmarshal(wbody, &who); err != nil { writeJSON(w, 500, map[string]string{"error": "parse whoami", "body": string(wbody)}) return } if who.UserID == "" || who.DeviceID == "" { writeJSON(w, 500, map[string]string{"error": "user_id or device_id empty from whoami", "body": string(wbody)}) return } // Optional sanity: if caller passed user_id, verify it matches. if req.UserID != "" && req.UserID != who.UserID { writeJSON(w, 400, map[string]string{"error": fmt.Sprintf("user_id mismatch: passed %s, whoami returned %s", req.UserID, who.UserID)}) return } tok := infra.Token{ AccessToken: admin, UserID: who.UserID, DeviceID: who.DeviceID, HomeserverURL: homeserverURL, Issuer: masIssuer, ClientID: masClientID, } if err := s.svc.store.Save(who.UserID, tok); err != nil { writeJSON(w, 500, map[string]string{"error": "keyring save: " + err.Error()}) return } if err := writeLastUser(who.UserID); err != nil { writeJSON(w, 500, map[string]string{"error": "last_user: " + err.Error()}) return } logInfo("E2E signin_admin OK", "user_id", who.UserID, "device_id", who.DeviceID) writeJSON(w, 200, map[string]string{ "status": "ok", "user_id": who.UserID, "device_id": who.DeviceID, }) } type startReq struct { UserID string `json:"user_id"` } func (s *E2EServer) handleStart(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { writeJSON(w, 405, map[string]string{"error": "POST only"}) return } var req startReq _ = json.NewDecoder(r.Body).Decode(&req) if req.UserID == "" { req.UserID = readLastUser() } if req.UserID == "" { writeJSON(w, 400, map[string]string{"error": "user_id required (or set last_user.txt)"}) return } skipCrypto := r.URL.Query().Get("skip_crypto") == "true" var err error if skipCrypto { err = s.svc.StartNoCrypto(req.UserID) } else { err = s.svc.Start(req.UserID) } if err != nil { writeJSON(w, 500, map[string]string{"error": err.Error()}) return } writeJSON(w, 200, map[string]any{"status": "ok", "user_id": req.UserID, "skip_crypto": skipCrypto}) } func (s *E2EServer) handleWipeCrypto(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { writeJSON(w, 405, map[string]string{"error": "POST only"}) return } user := r.URL.Query().Get("user_id") if user == "" { user = readLastUser() } if user == "" { writeJSON(w, 400, map[string]string{"error": "user_id required"}) return } dir := userStoreDir(user) cryptoDB := dir + "/crypto.db" _ = os.Remove(cryptoDB) _ = os.Remove(cryptoDB + "-shm") _ = os.Remove(cryptoDB + "-wal") logInfo("E2E wipe_crypto", "dir", dir) writeJSON(w, 200, map[string]string{"status": "ok", "wiped": cryptoDB}) } func (s *E2EServer) handleStop(w http.ResponseWriter, _ *http.Request) { s.svc.Stop() writeJSON(w, 200, map[string]string{"status": "ok"}) } // handleLastUser returns the persisted last user_id from last_user.txt, or // empty if the file is missing. Used by the shim's GetLastUserID so the // frontend lands on LoginScreen after /wipe_session even when a session is // still active in memory. func (s *E2EServer) handleLastUser(w http.ResponseWriter, _ *http.Request) { writeJSON(w, 200, map[string]string{"user_id": readLastUser()}) } // handleWipeSession clears last_user.txt so the frontend lands on LoginScreen // on next load. Keyring entries are kept (use /wipe_crypto to also drop the // olm store). Does NOT call svc.Stop() because the matrix sync loop can be // blocked on HTTP and cause this handler to hang indefinitely. func (s *E2EServer) handleWipeSession(w http.ResponseWriter, _ *http.Request) { path := lastUserFilePath() _ = clearLastUser() logInfo("E2E wipe_session", "path", path) writeJSON(w, 200, map[string]string{"status": "ok", "wiped": path}) } func (s *E2EServer) handleDiagnostics(w http.ResponseWriter, _ *http.Request) { d := s.svc.GetDiagnostics() writeJSON(w, 200, d) } func (s *E2EServer) handleRooms(w http.ResponseWriter, _ *http.Request) { rooms, err := s.svc.ListRooms() if err != nil { writeJSON(w, 500, map[string]string{"error": err.Error()}) return } writeJSON(w, 200, map[string]any{"rooms": rooms, "count": len(rooms)}) } func (s *E2EServer) handleTimeline(w http.ResponseWriter, r *http.Request) { roomID := r.URL.Query().Get("room_id") if roomID == "" { writeJSON(w, 400, map[string]string{"error": "room_id query param required"}) return } limit, _ := strconv.Atoi(r.URL.Query().Get("limit")) if limit <= 0 { limit = 50 } evs, err := s.svc.LoadTimeline(roomID, limit) if err != nil { writeJSON(w, 500, map[string]string{"error": err.Error()}) return } writeJSON(w, 200, map[string]any{"events": evs, "count": len(evs)}) } type sendReq struct { RoomID string `json:"room_id"` Body string `json:"body"` Markdown bool `json:"markdown,omitempty"` } func (s *E2EServer) handleSend(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { writeJSON(w, 405, map[string]string{"error": "POST only"}) return } var req sendReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeJSON(w, 400, map[string]string{"error": err.Error()}) return } if req.RoomID == "" || req.Body == "" { writeJSON(w, 400, map[string]string{"error": "room_id and body required"}) return } var evID string var err error if req.Markdown { evID, err = s.svc.SendMarkdown(req.RoomID, req.Body) } else { evID, err = s.svc.SendText(req.RoomID, req.Body) } if err != nil { writeJSON(w, 500, map[string]string{"error": err.Error()}) return } writeJSON(w, 200, map[string]string{"event_id": evID}) } func (s *E2EServer) handleLogs(w http.ResponseWriter, r *http.Request) { n, _ := strconv.Atoi(r.URL.Query().Get("n")) if n <= 0 { n = 200 } lines, err := TailLog(n) if err != nil { writeJSON(w, 500, map[string]string{"error": err.Error()}) return } writeJSON(w, 200, map[string]any{"lines": lines, "count": len(lines), "path": fmt.Sprintf("%s", s.svc.GetLogPath())}) }