Sending queue : adjust to match the latest rust api
This commit is contained in:
@@ -41,7 +41,7 @@ import dagger.assisted.AssistedInject
|
||||
import im.vector.app.features.analytics.plan.JoinedRoom
|
||||
import io.element.android.anvilannotations.ContributesNode
|
||||
import io.element.android.appnav.loggedin.LoggedInNode
|
||||
import io.element.android.appnav.loggedin.SendingQueue
|
||||
import io.element.android.appnav.loggedin.SendQueues
|
||||
import io.element.android.appnav.room.RoomFlowNode
|
||||
import io.element.android.appnav.room.RoomNavigationTarget
|
||||
import io.element.android.appnav.room.joined.JoinedRoomLoadedFlowNode
|
||||
@@ -103,7 +103,7 @@ class LoggedInFlowNode @AssistedInject constructor(
|
||||
private val roomDirectoryEntryPoint: RoomDirectoryEntryPoint,
|
||||
private val shareEntryPoint: ShareEntryPoint,
|
||||
private val matrixClient: MatrixClient,
|
||||
private val sendingQueue: SendingQueue,
|
||||
private val sendingQueue: SendQueues,
|
||||
snackbarDispatcher: SnackbarDispatcher,
|
||||
) : BaseFlowNode<LoggedInFlowNode.NavTarget>(
|
||||
backstack = BackStack(
|
||||
|
||||
@@ -22,44 +22,39 @@ import io.element.android.features.networkmonitor.api.NetworkStatus
|
||||
import io.element.android.libraries.di.SessionScope
|
||||
import io.element.android.libraries.di.SingleIn
|
||||
import io.element.android.libraries.matrix.api.MatrixClient
|
||||
import io.element.android.libraries.matrix.api.core.RoomId
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.inject.Inject
|
||||
|
||||
private const val SENDING_QUEUE_MIN_RETRY_DELAY = 250L
|
||||
|
||||
@VisibleForTesting
|
||||
const val SENDING_QUEUE_MAX_RETRY_DELAY = 5000L
|
||||
const val SENDING_QUEUE_RETRY_DELAY = 1500L
|
||||
|
||||
@SingleIn(SessionScope::class)
|
||||
class SendingQueue @Inject constructor(
|
||||
class SendQueues @Inject constructor(
|
||||
private val matrixClient: MatrixClient,
|
||||
private val networkMonitor: NetworkMonitor,
|
||||
) {
|
||||
private val retryCount = AtomicInteger(0)
|
||||
|
||||
fun launchIn(coroutineScope: CoroutineScope) {
|
||||
combine(
|
||||
networkMonitor.connectivity,
|
||||
matrixClient.sendingQueueStatus(),
|
||||
) { networkStatus, isSendingQueueEnabled ->
|
||||
Pair(networkStatus, isSendingQueueEnabled)
|
||||
}.onEach { (networkStatus, isSendingQueueEnabled) ->
|
||||
Timber.d("Network status: $networkStatus, isSendingQueueEnabled: $isSendingQueueEnabled")
|
||||
if (networkStatus == NetworkStatus.Online && !isSendingQueueEnabled) {
|
||||
val retryDelay =
|
||||
(SENDING_QUEUE_MIN_RETRY_DELAY * retryCount.incrementAndGet()).coerceIn(SENDING_QUEUE_MIN_RETRY_DELAY, SENDING_QUEUE_MAX_RETRY_DELAY)
|
||||
Timber.d("Retry enabling sending queue in $retryDelay ms")
|
||||
delay(retryDelay)
|
||||
} else {
|
||||
retryCount.set(0)
|
||||
networkMonitor.connectivity
|
||||
.onEach { networkStatus ->
|
||||
matrixClient.setAllSendQueuesEnabled(enabled = networkStatus == NetworkStatus.Online)
|
||||
}
|
||||
matrixClient.setSendingQueueEnabled(enabled = networkStatus == NetworkStatus.Online)
|
||||
}.launchIn(coroutineScope)
|
||||
.launchIn(coroutineScope)
|
||||
|
||||
matrixClient.sendQueueDisabledFlow()
|
||||
.onEach { roomId: RoomId ->
|
||||
Timber.d("Send queue disabled for room $roomId")
|
||||
if (networkMonitor.connectivity.value == NetworkStatus.Online) {
|
||||
delay(SENDING_QUEUE_RETRY_DELAY)
|
||||
matrixClient.getRoom(roomId)?.use { room ->
|
||||
room.setSendQueueEnabled(enabled = true)
|
||||
}
|
||||
}
|
||||
}.launchIn(coroutineScope)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,118 @@
|
||||
/*
|
||||
* Copyright (c) 2024 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.appnav.loggedin
|
||||
|
||||
import io.element.android.features.networkmonitor.api.NetworkStatus
|
||||
import io.element.android.features.networkmonitor.test.FakeNetworkMonitor
|
||||
import io.element.android.libraries.matrix.api.core.RoomId
|
||||
import io.element.android.libraries.matrix.test.FakeMatrixClient
|
||||
import io.element.android.libraries.matrix.test.room.FakeMatrixRoom
|
||||
import io.element.android.tests.testutils.lambda.assert
|
||||
import io.element.android.tests.testutils.lambda.lambdaRecorder
|
||||
import io.element.android.tests.testutils.lambda.value
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.test.advanceTimeBy
|
||||
import kotlinx.coroutines.test.runCurrent
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.junit.Test
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class) class SendQueuesTest {
|
||||
private val matrixClient = FakeMatrixClient()
|
||||
private val room = FakeMatrixRoom()
|
||||
private val networkMonitor = FakeNetworkMonitor()
|
||||
private val sut = SendQueues(matrixClient, networkMonitor)
|
||||
|
||||
@Test
|
||||
fun `test network status online and sending queue failed`() = runTest {
|
||||
|
||||
val sendQueueDisabledFlow = MutableSharedFlow<RoomId>(replay = 1)
|
||||
val setAllSendQueuesEnabledLambda = lambdaRecorder { _: Boolean -> }
|
||||
matrixClient.sendQueueDisabledFlow = sendQueueDisabledFlow
|
||||
matrixClient.setAllSendQueuesEnabledLambda = setAllSendQueuesEnabledLambda
|
||||
matrixClient.givenGetRoomResult(room.roomId, room)
|
||||
|
||||
val setRoomSendQueueEnabledLambda = lambdaRecorder { _: Boolean -> }
|
||||
room.setSendQueueEnabledLambda = setRoomSendQueueEnabledLambda
|
||||
|
||||
sut.launchIn(backgroundScope)
|
||||
|
||||
sendQueueDisabledFlow.emit(room.roomId)
|
||||
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
|
||||
runCurrent()
|
||||
|
||||
assert(setAllSendQueuesEnabledLambda)
|
||||
.isCalledOnce()
|
||||
.with(value(true))
|
||||
|
||||
assert(setRoomSendQueueEnabledLambda)
|
||||
.isCalledOnce()
|
||||
.with(value(true))
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test network status offline and sending queue failed`() = runTest {
|
||||
|
||||
val sendQueueDisabledFlow = MutableSharedFlow<RoomId>(replay = 1)
|
||||
|
||||
val setAllSendQueuesEnabledLambda = lambdaRecorder { _: Boolean -> }
|
||||
matrixClient.sendQueueDisabledFlow = sendQueueDisabledFlow
|
||||
matrixClient.setAllSendQueuesEnabledLambda = setAllSendQueuesEnabledLambda
|
||||
networkMonitor.connectivity.value = NetworkStatus.Offline
|
||||
matrixClient.givenGetRoomResult(room.roomId, room)
|
||||
|
||||
val setRoomSendQueueEnabledLambda = lambdaRecorder { _: Boolean -> }
|
||||
room.setSendQueueEnabledLambda = setRoomSendQueueEnabledLambda
|
||||
|
||||
sut.launchIn(backgroundScope)
|
||||
|
||||
sendQueueDisabledFlow.emit(room.roomId)
|
||||
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
|
||||
runCurrent()
|
||||
|
||||
assert(setAllSendQueuesEnabledLambda)
|
||||
.isCalledOnce()
|
||||
.with(value(false))
|
||||
|
||||
assert(setRoomSendQueueEnabledLambda)
|
||||
.isNeverCalled()
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test network status getting offline and online`() = runTest {
|
||||
|
||||
val setEnableSendingQueueLambda = lambdaRecorder { _: Boolean -> }
|
||||
matrixClient.setAllSendQueuesEnabledLambda = setEnableSendingQueueLambda
|
||||
|
||||
sut.launchIn(backgroundScope)
|
||||
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
|
||||
networkMonitor.connectivity.value = NetworkStatus.Offline
|
||||
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
|
||||
networkMonitor.connectivity.value = NetworkStatus.Online
|
||||
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
|
||||
|
||||
assert(setEnableSendingQueueLambda)
|
||||
.isCalledExactly(3)
|
||||
.withSequence(
|
||||
listOf(value(true)),
|
||||
listOf(value(false)),
|
||||
listOf(value(true)),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1,79 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2024 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.appnav.loggedin
|
||||
|
||||
import io.element.android.features.networkmonitor.api.NetworkStatus
|
||||
import io.element.android.features.networkmonitor.test.FakeNetworkMonitor
|
||||
import io.element.android.libraries.matrix.test.FakeMatrixClient
|
||||
import io.element.android.tests.testutils.lambda.assert
|
||||
import io.element.android.tests.testutils.lambda.lambdaRecorder
|
||||
import io.element.android.tests.testutils.lambda.value
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.test.advanceTimeBy
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.junit.Test
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class) class SendingQueueTest {
|
||||
private val matrixClient = FakeMatrixClient()
|
||||
private val networkMonitor = FakeNetworkMonitor()
|
||||
private val sut = SendingQueue(matrixClient, networkMonitor)
|
||||
|
||||
@Test
|
||||
fun `test network status online and sending queue is disabled`() = runTest {
|
||||
val sendingQueueStatusFlow = MutableStateFlow(false)
|
||||
val setEnableSendingQueueLambda = lambdaRecorder { _: Boolean -> }
|
||||
matrixClient.sendingQueueStatusFlow = sendingQueueStatusFlow
|
||||
matrixClient.setSendingQueueEnabledLambda = setEnableSendingQueueLambda
|
||||
|
||||
sut.launchIn(backgroundScope)
|
||||
|
||||
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
|
||||
sendingQueueStatusFlow.value = true
|
||||
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
|
||||
|
||||
assert(setEnableSendingQueueLambda)
|
||||
.isCalledExactly(2)
|
||||
.withSequence(
|
||||
listOf(value(true)),
|
||||
listOf(value(true))
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test network status getting offline and online`() = runTest {
|
||||
val sendingQueueStatusFlow = MutableStateFlow(true)
|
||||
val setEnableSendingQueueLambda = lambdaRecorder { _: Boolean -> }
|
||||
matrixClient.sendingQueueStatusFlow = sendingQueueStatusFlow
|
||||
matrixClient.setSendingQueueEnabledLambda = setEnableSendingQueueLambda
|
||||
|
||||
sut.launchIn(backgroundScope)
|
||||
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
|
||||
networkMonitor.connectivity.value = NetworkStatus.Offline
|
||||
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
|
||||
networkMonitor.connectivity.value = NetworkStatus.Online
|
||||
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
|
||||
|
||||
assert(setEnableSendingQueueLambda)
|
||||
.isCalledExactly(3)
|
||||
.withSequence(
|
||||
listOf(value(true)),
|
||||
listOf(value(false)),
|
||||
listOf(value(true)),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -114,11 +114,11 @@ interface MatrixClient : Closeable {
|
||||
* so it's required to manually re-enable it as soon as
|
||||
* connectivity is back on the device.
|
||||
*/
|
||||
suspend fun setSendingQueueEnabled(enabled: Boolean)
|
||||
suspend fun setAllSendQueuesEnabled(enabled: Boolean)
|
||||
|
||||
/**
|
||||
* Returns the current status of the sending queue as a [StateFlow].
|
||||
* If true, the sending queue is enabled.
|
||||
* Returns a flow of room IDs that have send queue being disabled.
|
||||
* This flow will emit a new value whenever the send queue is disabled for a room.
|
||||
*/
|
||||
fun sendingQueueStatus(): StateFlow<Boolean>
|
||||
fun sendQueueDisabledFlow(): Flow<RoomId>
|
||||
}
|
||||
|
||||
@@ -335,5 +335,7 @@ interface MatrixRoom : Closeable {
|
||||
*/
|
||||
suspend fun sendCallNotificationIfNeeded(): Result<Unit>
|
||||
|
||||
suspend fun setSendQueueEnabled(enabled: Boolean)
|
||||
|
||||
override fun close() = destroy()
|
||||
}
|
||||
|
||||
@@ -95,10 +95,11 @@ import kotlinx.coroutines.withTimeout
|
||||
import org.matrix.rustcomponents.sdk.BackupState
|
||||
import org.matrix.rustcomponents.sdk.Client
|
||||
import org.matrix.rustcomponents.sdk.ClientDelegate
|
||||
import org.matrix.rustcomponents.sdk.ClientException
|
||||
import org.matrix.rustcomponents.sdk.IgnoredUsersListener
|
||||
import org.matrix.rustcomponents.sdk.NotificationProcessSetup
|
||||
import org.matrix.rustcomponents.sdk.PowerLevels
|
||||
import org.matrix.rustcomponents.sdk.SendingQueueStatusListener
|
||||
import org.matrix.rustcomponents.sdk.SendQueueRoomErrorListener
|
||||
import org.matrix.rustcomponents.sdk.TaskHandle
|
||||
import org.matrix.rustcomponents.sdk.use
|
||||
import timber.log.Timber
|
||||
@@ -554,20 +555,18 @@ class RustMatrixClient(
|
||||
}.distinctUntilChanged()
|
||||
}
|
||||
|
||||
override suspend fun setSendingQueueEnabled(enabled: Boolean) = withContext(sessionDispatcher) {
|
||||
Timber.i("setSendingQueueEnabled($enabled)")
|
||||
client.enableSendingQueue(enabled)
|
||||
override suspend fun setAllSendQueuesEnabled(enabled: Boolean) = withContext(sessionDispatcher) {
|
||||
Timber.i("setAllSendQueuesEnabled($enabled)")
|
||||
client.enableAllSendQueues(enabled)
|
||||
}
|
||||
|
||||
override fun sendingQueueStatus(): StateFlow<Boolean> = mxCallbackFlow {
|
||||
client.subscribeToSendingQueueStatus(object : SendingQueueStatusListener {
|
||||
override fun onValue(newValue: Boolean) {
|
||||
channel.trySend(newValue)
|
||||
override fun sendQueueDisabledFlow(): Flow<RoomId> = mxCallbackFlow {
|
||||
client.subscribeToSendQueueStatus(object : SendQueueRoomErrorListener {
|
||||
override fun onError(roomId: String, error: ClientException) {
|
||||
trySend(RoomId(roomId))
|
||||
}
|
||||
})
|
||||
}
|
||||
.buffer(Channel.UNLIMITED)
|
||||
.stateIn(sessionCoroutineScope, started = SharingStarted.Eagerly, initialValue = true)
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
||||
private suspend fun File.getCacheSize(
|
||||
includeCryptoDb: Boolean = false,
|
||||
|
||||
@@ -84,6 +84,7 @@ import org.matrix.rustcomponents.sdk.WidgetCapabilities
|
||||
import org.matrix.rustcomponents.sdk.WidgetCapabilitiesProvider
|
||||
import org.matrix.rustcomponents.sdk.getElementCallRequiredPermissions
|
||||
import org.matrix.rustcomponents.sdk.use
|
||||
import timber.log.Timber
|
||||
import uniffi.matrix_sdk.RoomPowerLevelChanges
|
||||
import java.io.File
|
||||
import org.matrix.rustcomponents.sdk.Room as InnerRoom
|
||||
@@ -594,6 +595,11 @@ class RustMatrixRoom(
|
||||
innerRoom.sendCallNotificationIfNeeded()
|
||||
}
|
||||
|
||||
override suspend fun setSendQueueEnabled(enabled: Boolean) = withContext(roomDispatcher) {
|
||||
Timber.d("setSendQueuesEnabled: $enabled")
|
||||
innerRoom.enableSendQueue(enabled)
|
||||
}
|
||||
|
||||
private fun createTimeline(
|
||||
timeline: InnerTimeline,
|
||||
isLive: Boolean,
|
||||
|
||||
@@ -54,8 +54,10 @@ import kotlinx.collections.immutable.ImmutableList
|
||||
import kotlinx.collections.immutable.persistentListOf
|
||||
import kotlinx.collections.immutable.toImmutableList
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.emptyFlow
|
||||
import kotlinx.coroutines.flow.flowOf
|
||||
import kotlinx.coroutines.test.TestScope
|
||||
import java.util.Optional
|
||||
@@ -300,12 +302,12 @@ class FakeMatrixClient(
|
||||
|
||||
override fun getRoomInfoFlow(roomId: RoomId) = getRoomInfoFlowLambda(roomId)
|
||||
|
||||
var setSendingQueueEnabledLambda = lambdaRecorder(ensureNeverCalled = true) { _: Boolean ->
|
||||
var setAllSendQueuesEnabledLambda = lambdaRecorder(ensureNeverCalled = true) { _: Boolean ->
|
||||
// no-op
|
||||
}
|
||||
|
||||
override suspend fun setSendingQueueEnabled(enabled: Boolean) = setSendingQueueEnabledLambda(enabled)
|
||||
override suspend fun setAllSendQueuesEnabled(enabled: Boolean) = setAllSendQueuesEnabledLambda(enabled)
|
||||
|
||||
var sendingQueueStatusFlow = MutableStateFlow(true)
|
||||
override fun sendingQueueStatus(): StateFlow<Boolean> = sendingQueueStatusFlow
|
||||
var sendQueueDisabledFlow = emptyFlow<RoomId>()
|
||||
override fun sendQueueDisabledFlow(): Flow<RoomId> = sendQueueDisabledFlow
|
||||
}
|
||||
|
||||
@@ -525,6 +525,9 @@ class FakeMatrixRoom(
|
||||
return sendCallNotificationIfNeededResult()
|
||||
}
|
||||
|
||||
var setSendQueueEnabledLambda = { _: Boolean -> }
|
||||
override suspend fun setSendQueueEnabled(enabled: Boolean) = setSendQueueEnabledLambda(enabled)
|
||||
|
||||
override fun getWidgetDriver(widgetSettings: MatrixWidgetSettings): Result<MatrixWidgetDriver> = getWidgetDriverResult
|
||||
|
||||
fun givenRoomMembersState(state: MatrixRoomMembersState) {
|
||||
|
||||
Reference in New Issue
Block a user