diff --git a/ElementX/Sources/Services/Client/ClientProxy.swift b/ElementX/Sources/Services/Client/ClientProxy.swift index 17b3f05cf..979c90fb8 100644 --- a/ElementX/Sources/Services/Client/ClientProxy.swift +++ b/ElementX/Sources/Services/Client/ClientProxy.swift @@ -65,7 +65,6 @@ class ClientProxy: ClientProxyProtocol { let spaceService: SpaceServiceProxyProtocol let capabilities: HomeserverCapabilitiesProxyProtocol - private var capabilitiesRefreshTask: Task? let eventStringBuilder: RoomEventStringBuilder @@ -251,58 +250,9 @@ class ClientProxy: ClientProxyProtocol { try await client.setUtdDelegate(utdDelegate: ClientDecryptionErrorDelegate(actionsSubject: actionsSubject)) - networkMonitor.reachabilityPublisher - .removeDuplicates() - .receive(on: DispatchQueue.main) - .sink { [weak self] reachability in - if reachability == .reachable { - self?.startSync() - } - } - .store(in: &cancellables) - loadUserAvatarURLFromCache() - ignoredUsersListenerTaskHandle = client.subscribeToIgnoredUsers(listener: SDKListener { [weak self] ignoredUsers in - self?.ignoredUsersSubject.send(ignoredUsers) - }) - - await updateVerificationState(client.encryption().verificationState()) - - verificationStateListenerTaskHandle = client.encryption().verificationStateListener(listener: SDKListener { [weak self] verificationState in - Task { await self?.updateVerificationState(verificationState) } - }) - - sendQueueStatusListenerTaskHandle = client.subscribeToSendQueueStatus(listener: SDKListener { [weak self] roomID, error in - MXLog.error("Send queue failed in room: \(roomID) with error: \(error)") - self?.sendQueueStatusSubject.send(false) - }) - - sendQueueUpdatesListenerTaskHandle = try? await client.subscribeToSendQueueUpdates(listener: SDKListener { _, update in - switch update { - case .newLocalEvent(let transactionID): - analyticsService.signpost.startTransaction(.sendMessage(uuid: transactionID)) - case .sentEvent(let transactionID, _): - analyticsService.signpost.finishTransaction(.sendMessage(uuid: transactionID)) - default: - break - } - }) - - sendQueueStatusSubject - .combineLatest(homeserverReachabilityPublisher) - .debounce(for: 1.0, scheduler: DispatchQueue.main) - .sink { enabled, reachability in - MXLog.info("Send queue status changed to enabled: \(enabled), homeserver reachability: \(reachability)") - - if enabled == false, reachability == .reachable { - MXLog.info("Enabling all send queues") - Task { - await client.enableAllSendQueues(enable: true) - } - } - } - .store(in: &cancellables) + await setupSubscriptions() Task { do { @@ -1039,6 +989,66 @@ class ClientProxy: ClientProxyProtocol { // MARK: - Private + private func setupSubscriptions() async { + networkMonitor.reachabilityPublisher + .removeDuplicates() + .receive(on: DispatchQueue.main) + .sink { [weak self] reachability in + if reachability == .reachable { + self?.startSync() + } + } + .store(in: &cancellables) + + ignoredUsersListenerTaskHandle = client.subscribeToIgnoredUsers(listener: SDKListener { [weak self] ignoredUsers in + self?.ignoredUsersSubject.send(ignoredUsers) + }) + + await updateVerificationState(client.encryption().verificationState()) + verificationStateListenerTaskHandle = client.encryption().verificationStateListener(listener: SDKListener { [weak self] verificationState in + Task { await self?.updateVerificationState(verificationState) } + }) + + sendQueueStatusListenerTaskHandle = client.subscribeToSendQueueStatus(listener: SDKListener { [weak self] roomID, error in + MXLog.error("Send queue failed in room: \(roomID) with error: \(error)") + self?.sendQueueStatusSubject.send(false) + }) + + sendQueueUpdatesListenerTaskHandle = try? await client.subscribeToSendQueueUpdates(listener: SDKListener { [analyticsService] _, update in + switch update { + case .newLocalEvent(let transactionID): + analyticsService.signpost.startTransaction(.sendMessage(uuid: transactionID)) + case .sentEvent(let transactionID, _): + analyticsService.signpost.finishTransaction(.sendMessage(uuid: transactionID)) + default: + break + } + }) + + sendQueueStatusSubject + .combineLatest(homeserverReachabilityPublisher) + .debounce(for: 1.0, scheduler: DispatchQueue.main) + .sink { [client] enabled, reachability in + MXLog.info("Send queue status changed to enabled: \(enabled), homeserver reachability: \(reachability)") + + if enabled == false, reachability == .reachable { + MXLog.info("Enabling all send queues") + Task { + await client.enableAllSendQueues(enable: true) + } + } + } + .store(in: &cancellables) + + actionsPublisher + .filter(\.isSyncUpdate) + .throttle(for: 86400, scheduler: DispatchQueue.main, latest: true) + .sink { [weak self] _ in + Task { await self?.capabilities.refresh() } + } + .store(in: &cancellables) + } + private func cacheAccountURL() async { // Calling this function for the first time will cache the account URL in volatile memory for 24 hrs on the SDK. _ = try? await client.accountUrl(action: nil) @@ -1142,13 +1152,6 @@ class ClientProxy: ClientProxyProtocol { if ignoredUsersSubject.value == nil { updateIgnoredUsers() } - - if capabilitiesRefreshTask == nil { - capabilitiesRefreshTask = Task { [weak self] in - await self?.capabilities.refresh() - self?.capabilitiesRefreshTask = nil - } - } case .error, .terminated: break // The sync service is responsible for handling error and termination }