Message queuing : branches the enable sending queue mechanism.
This commit is contained in:
@@ -40,6 +40,7 @@ import dagger.assisted.AssistedInject
|
||||
import im.vector.app.features.analytics.plan.JoinedRoom
|
||||
import io.element.android.anvilannotations.ContributesNode
|
||||
import io.element.android.appnav.loggedin.LoggedInNode
|
||||
import io.element.android.appnav.loggedin.SendingQueue
|
||||
import io.element.android.appnav.room.RoomFlowNode
|
||||
import io.element.android.appnav.room.RoomNavigationTarget
|
||||
import io.element.android.appnav.room.joined.JoinedRoomLoadedFlowNode
|
||||
@@ -99,6 +100,7 @@ class LoggedInFlowNode @AssistedInject constructor(
|
||||
private val ftueService: FtueService,
|
||||
private val roomDirectoryEntryPoint: RoomDirectoryEntryPoint,
|
||||
private val matrixClient: MatrixClient,
|
||||
private val sendingQueue: SendingQueue,
|
||||
snackbarDispatcher: SnackbarDispatcher,
|
||||
) : BaseFlowNode<LoggedInFlowNode.NavTarget>(
|
||||
backstack = BackStack(
|
||||
@@ -154,6 +156,11 @@ class LoggedInFlowNode @AssistedInject constructor(
|
||||
}
|
||||
)
|
||||
observeSyncStateAndNetworkStatus()
|
||||
setupSendingQueue()
|
||||
}
|
||||
|
||||
private fun setupSendingQueue() {
|
||||
sendingQueue.setupWith(lifecycleScope)
|
||||
}
|
||||
|
||||
@OptIn(FlowPreview::class)
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright (c) 2024 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.appnav.loggedin
|
||||
|
||||
import io.element.android.features.networkmonitor.api.NetworkMonitor
|
||||
import io.element.android.features.networkmonitor.api.NetworkStatus
|
||||
import io.element.android.libraries.di.SessionScope
|
||||
import io.element.android.libraries.di.SingleIn
|
||||
import io.element.android.libraries.matrix.api.MatrixClient
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.inject.Inject
|
||||
|
||||
private const val MIN_RETRY_DELAY = 250L
|
||||
private const val MAX_RETRY_DELAY = 5000L
|
||||
|
||||
@SingleIn(SessionScope::class)
|
||||
class SendingQueue @Inject constructor(
|
||||
private val matrixClient: MatrixClient,
|
||||
private val networkMonitor: NetworkMonitor,
|
||||
) {
|
||||
|
||||
private val retryCount = AtomicInteger(0)
|
||||
|
||||
fun setupWith(coroutineScope: CoroutineScope) {
|
||||
combine(
|
||||
networkMonitor.connectivity,
|
||||
matrixClient.sendingQueueStatus(),
|
||||
) { networkStatus, isSendingQueueEnabled ->
|
||||
Pair(networkStatus, isSendingQueueEnabled)
|
||||
}.onEach { (networkStatus, isSendingQueueEnabled) ->
|
||||
Timber.d("Network status: $networkStatus, isSendingQueueEnabled: $isSendingQueueEnabled")
|
||||
if (networkStatus == NetworkStatus.Online && !isSendingQueueEnabled) {
|
||||
val retryDelay = (MIN_RETRY_DELAY * retryCount.incrementAndGet()).coerceIn(MIN_RETRY_DELAY, MAX_RETRY_DELAY)
|
||||
Timber.d("Retry enabling sending queue in $retryDelay ms")
|
||||
delay(retryDelay)
|
||||
} else {
|
||||
retryCount.set(0)
|
||||
}
|
||||
matrixClient.setSendingQueueEnabled(enabled = networkStatus == NetworkStatus.Online)
|
||||
}.launchIn(coroutineScope)
|
||||
}
|
||||
}
|
||||
@@ -114,7 +114,7 @@ interface MatrixClient : Closeable {
|
||||
* so it's required to manually re-enable it as soon as
|
||||
* connectivity is back on the device.
|
||||
*/
|
||||
suspend fun enableSendingQueue(enable: Boolean)
|
||||
suspend fun setSendingQueueEnabled(enabled: Boolean)
|
||||
|
||||
/**
|
||||
* Returns the current status of the sending queue as a [StateFlow].
|
||||
|
||||
@@ -552,8 +552,9 @@ class RustMatrixClient(
|
||||
}.distinctUntilChanged()
|
||||
}
|
||||
|
||||
override suspend fun enableSendingQueue(enable: Boolean) = withContext(sessionDispatcher) {
|
||||
client.enableSendingQueue(enable)
|
||||
override suspend fun setSendingQueueEnabled(enabled: Boolean) = withContext(sessionDispatcher) {
|
||||
Timber.i("setSendingQueueEnabled($enabled)")
|
||||
client.enableSendingQueue(enabled)
|
||||
}
|
||||
|
||||
override fun sendingQueueStatus(): StateFlow<Boolean> = mxCallbackFlow {
|
||||
|
||||
@@ -304,7 +304,7 @@ class FakeMatrixClient(
|
||||
// no-op
|
||||
}
|
||||
|
||||
override suspend fun enableSendingQueue(enable: Boolean) = enableSendingQueueLambda(enable)
|
||||
override suspend fun setSendingQueueEnabled(enable: Boolean) = enableSendingQueueLambda(enable)
|
||||
|
||||
var sendingQueueStatusFlow = MutableStateFlow(true)
|
||||
override fun sendingQueueStatus(): StateFlow<Boolean> = sendingQueueStatusFlow
|
||||
|
||||
Reference in New Issue
Block a user