Throttle capabilities updates to once per day (or when the app is next launched). (#5454)

* Throttle capabilities updates to once per day (or when the app is next launched).

* Add a setupSubcriptions method to shorten the init.
This commit is contained in:
Doug
2026-04-21 12:43:40 +01:00
committed by GitHub
parent b16bbdcc72
commit 9983e23a4c

View File

@@ -65,7 +65,6 @@ class ClientProxy: ClientProxyProtocol {
let spaceService: SpaceServiceProxyProtocol
let capabilities: HomeserverCapabilitiesProxyProtocol
private var capabilitiesRefreshTask: Task<Void, Never>?
let eventStringBuilder: RoomEventStringBuilder
@@ -251,58 +250,9 @@ class ClientProxy: ClientProxyProtocol {
try await client.setUtdDelegate(utdDelegate: ClientDecryptionErrorDelegate(actionsSubject: actionsSubject))
networkMonitor.reachabilityPublisher
.removeDuplicates()
.receive(on: DispatchQueue.main)
.sink { [weak self] reachability in
if reachability == .reachable {
self?.startSync()
}
}
.store(in: &cancellables)
loadUserAvatarURLFromCache()
ignoredUsersListenerTaskHandle = client.subscribeToIgnoredUsers(listener: SDKListener { [weak self] ignoredUsers in
self?.ignoredUsersSubject.send(ignoredUsers)
})
await updateVerificationState(client.encryption().verificationState())
verificationStateListenerTaskHandle = client.encryption().verificationStateListener(listener: SDKListener { [weak self] verificationState in
Task { await self?.updateVerificationState(verificationState) }
})
sendQueueStatusListenerTaskHandle = client.subscribeToSendQueueStatus(listener: SDKListener { [weak self] roomID, error in
MXLog.error("Send queue failed in room: \(roomID) with error: \(error)")
self?.sendQueueStatusSubject.send(false)
})
sendQueueUpdatesListenerTaskHandle = try? await client.subscribeToSendQueueUpdates(listener: SDKListener { _, update in
switch update {
case .newLocalEvent(let transactionID):
analyticsService.signpost.startTransaction(.sendMessage(uuid: transactionID))
case .sentEvent(let transactionID, _):
analyticsService.signpost.finishTransaction(.sendMessage(uuid: transactionID))
default:
break
}
})
sendQueueStatusSubject
.combineLatest(homeserverReachabilityPublisher)
.debounce(for: 1.0, scheduler: DispatchQueue.main)
.sink { enabled, reachability in
MXLog.info("Send queue status changed to enabled: \(enabled), homeserver reachability: \(reachability)")
if enabled == false, reachability == .reachable {
MXLog.info("Enabling all send queues")
Task {
await client.enableAllSendQueues(enable: true)
}
}
}
.store(in: &cancellables)
await setupSubscriptions()
Task {
do {
@@ -1039,6 +989,66 @@ class ClientProxy: ClientProxyProtocol {
// MARK: - Private
private func setupSubscriptions() async {
networkMonitor.reachabilityPublisher
.removeDuplicates()
.receive(on: DispatchQueue.main)
.sink { [weak self] reachability in
if reachability == .reachable {
self?.startSync()
}
}
.store(in: &cancellables)
ignoredUsersListenerTaskHandle = client.subscribeToIgnoredUsers(listener: SDKListener { [weak self] ignoredUsers in
self?.ignoredUsersSubject.send(ignoredUsers)
})
await updateVerificationState(client.encryption().verificationState())
verificationStateListenerTaskHandle = client.encryption().verificationStateListener(listener: SDKListener { [weak self] verificationState in
Task { await self?.updateVerificationState(verificationState) }
})
sendQueueStatusListenerTaskHandle = client.subscribeToSendQueueStatus(listener: SDKListener { [weak self] roomID, error in
MXLog.error("Send queue failed in room: \(roomID) with error: \(error)")
self?.sendQueueStatusSubject.send(false)
})
sendQueueUpdatesListenerTaskHandle = try? await client.subscribeToSendQueueUpdates(listener: SDKListener { [analyticsService] _, update in
switch update {
case .newLocalEvent(let transactionID):
analyticsService.signpost.startTransaction(.sendMessage(uuid: transactionID))
case .sentEvent(let transactionID, _):
analyticsService.signpost.finishTransaction(.sendMessage(uuid: transactionID))
default:
break
}
})
sendQueueStatusSubject
.combineLatest(homeserverReachabilityPublisher)
.debounce(for: 1.0, scheduler: DispatchQueue.main)
.sink { [client] enabled, reachability in
MXLog.info("Send queue status changed to enabled: \(enabled), homeserver reachability: \(reachability)")
if enabled == false, reachability == .reachable {
MXLog.info("Enabling all send queues")
Task {
await client.enableAllSendQueues(enable: true)
}
}
}
.store(in: &cancellables)
actionsPublisher
.filter(\.isSyncUpdate)
.throttle(for: 86400, scheduler: DispatchQueue.main, latest: true)
.sink { [weak self] _ in
Task { await self?.capabilities.refresh() }
}
.store(in: &cancellables)
}
private func cacheAccountURL() async {
// Calling this function for the first time will cache the account URL in volatile memory for 24 hrs on the SDK.
_ = try? await client.accountUrl(action: nil)
@@ -1142,13 +1152,6 @@ class ClientProxy: ClientProxyProtocol {
if ignoredUsersSubject.value == nil {
updateIgnoredUsers()
}
if capabilitiesRefreshTask == nil {
capabilitiesRefreshTask = Task { [weak self] in
await self?.capabilities.refresh()
self?.capabilitiesRefreshTask = nil
}
}
case .error, .terminated:
break // The sync service is responsible for handling error and termination
}