SyncOrchestrator: restore the initial sync step (#4242)
* SyncOrchestrator: restore the initial sync step * Try having internal and public functions to be able to unit test the initial sync and the state changes separately, as well as the initial sync followed by a state change * Only manually start sync if the `SyncService` was previously stopped, don't do it for `Offline` state
This commit is contained in:
committed by
GitHub
parent
a28796106b
commit
51f5087ef2
@@ -7,6 +7,7 @@
|
||||
|
||||
package io.element.android.appnav.di
|
||||
|
||||
import androidx.annotation.VisibleForTesting
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
@@ -21,9 +22,9 @@ import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.onCompletion
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
@@ -53,13 +54,28 @@ class SyncOrchestrator @AssistedInject constructor(
|
||||
*
|
||||
* Before observing the state, a first attempt at starting the sync service will happen if it's not already running.
|
||||
*/
|
||||
@OptIn(FlowPreview::class)
|
||||
fun start() {
|
||||
if (!started.compareAndSet(false, true)) {
|
||||
Timber.tag(tag).d("already started, exiting early")
|
||||
return
|
||||
}
|
||||
|
||||
coroutineScope.launch {
|
||||
// Perform an initial sync if the sync service is not running, to check whether the homeserver is accessible
|
||||
// Otherwise, if the device is offline the sync service will never start and the SyncState will be Idle, not Offline
|
||||
Timber.tag(tag).d("performing initial sync attempt")
|
||||
syncService.startSync()
|
||||
|
||||
// Wait until the sync service is not idle, either it will be running or in error/offline state
|
||||
syncService.syncState.first { it != SyncState.Idle }
|
||||
|
||||
observeStates()
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(FlowPreview::class)
|
||||
@VisibleForTesting(otherwise = VisibleForTesting.PRIVATE)
|
||||
internal fun observeStates() = coroutineScope.launch {
|
||||
Timber.tag(tag).d("start observing the app and network state")
|
||||
|
||||
combine(
|
||||
@@ -76,7 +92,7 @@ class SyncOrchestrator @AssistedInject constructor(
|
||||
Timber.tag(tag).d("isAppActive=$isAppActive, isNetworkAvailable=$isNetworkAvailable")
|
||||
if (syncState == SyncState.Running && !isAppActive) {
|
||||
SyncStateAction.StopSync
|
||||
} else if (syncState != SyncState.Running && isAppActive && isNetworkAvailable) {
|
||||
} else if (syncState == SyncState.Idle && isAppActive && isNetworkAvailable) {
|
||||
SyncStateAction.StartSync
|
||||
} else {
|
||||
SyncStateAction.NoOp
|
||||
@@ -87,7 +103,10 @@ class SyncOrchestrator @AssistedInject constructor(
|
||||
// Don't stop the sync immediately, wait a bit to avoid starting/stopping the sync too often
|
||||
if (action == SyncStateAction.StopSync) 3.seconds else 0.seconds
|
||||
}
|
||||
.onEach { action ->
|
||||
.onCompletion {
|
||||
Timber.tag(tag).d("has been stopped")
|
||||
}
|
||||
.collect { action ->
|
||||
when (action) {
|
||||
SyncStateAction.StartSync -> {
|
||||
syncService.startSync()
|
||||
@@ -98,10 +117,6 @@ class SyncOrchestrator @AssistedInject constructor(
|
||||
SyncStateAction.NoOp -> Unit
|
||||
}
|
||||
}
|
||||
.onCompletion {
|
||||
Timber.tag(tag).d("has been stopped")
|
||||
}
|
||||
.launchIn(coroutineScope)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,6 +31,50 @@ class SyncOrchestratorTest {
|
||||
@get:Rule
|
||||
val warmUpRule = WarmUpRule()
|
||||
|
||||
@Test
|
||||
fun `when the sync wasn't running before, an initial sync will take place, even with no network`() = runTest {
|
||||
val startSyncRecorder = lambdaRecorder<Result<Unit>> { Result.success(Unit) }
|
||||
val syncService = FakeSyncService(initialSyncState = SyncState.Idle).apply {
|
||||
startSyncLambda = startSyncRecorder
|
||||
}
|
||||
val networkMonitor = FakeNetworkMonitor(initialStatus = NetworkStatus.Disconnected)
|
||||
val syncOrchestrator = createSyncOrchestrator(
|
||||
syncService = syncService,
|
||||
networkMonitor = networkMonitor,
|
||||
)
|
||||
|
||||
// We start observing with an initial sync
|
||||
syncOrchestrator.start()
|
||||
|
||||
// Advance the time just enough to make sure the initial sync has run
|
||||
advanceTimeBy(1.milliseconds)
|
||||
startSyncRecorder.assertions().isCalledOnce()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `when the sync wasn't running before, an initial sync will take place`() = runTest {
|
||||
val startSyncRecorder = lambdaRecorder<Result<Unit>> { Result.success(Unit) }
|
||||
val syncService = FakeSyncService(initialSyncState = SyncState.Idle).apply {
|
||||
startSyncLambda = startSyncRecorder
|
||||
}
|
||||
val networkMonitor = FakeNetworkMonitor(initialStatus = NetworkStatus.Connected)
|
||||
val syncOrchestrator = createSyncOrchestrator(
|
||||
syncService = syncService,
|
||||
networkMonitor = networkMonitor,
|
||||
)
|
||||
|
||||
// We start observing with an initial sync
|
||||
syncOrchestrator.start()
|
||||
|
||||
// Advance the time just enough to make sure the initial sync has run
|
||||
advanceTimeBy(1.milliseconds)
|
||||
startSyncRecorder.assertions().isCalledOnce()
|
||||
|
||||
// If we wait for a while, the sync will not be started again by the observer since it's already running
|
||||
advanceTimeBy(10.seconds)
|
||||
startSyncRecorder.assertions().isCalledOnce()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `when the app goes to background and the sync was running, it will be stopped after a delay`() = runTest {
|
||||
val stopSyncRecorder = lambdaRecorder<Result<Unit>> { Result.success(Unit) }
|
||||
@@ -46,7 +90,7 @@ class SyncOrchestratorTest {
|
||||
)
|
||||
|
||||
// We start observing
|
||||
syncOrchestrator.start()
|
||||
syncOrchestrator.observeStates()
|
||||
|
||||
// Advance the time to make sure the orchestrator has had time to start processing the inputs
|
||||
advanceTimeBy(100.milliseconds)
|
||||
@@ -78,7 +122,7 @@ class SyncOrchestratorTest {
|
||||
)
|
||||
|
||||
// We start observing
|
||||
syncOrchestrator.start()
|
||||
syncOrchestrator.observeStates()
|
||||
|
||||
// Advance the time to make sure the orchestrator has had time to start processing the inputs
|
||||
advanceTimeBy(100.milliseconds)
|
||||
@@ -126,7 +170,7 @@ class SyncOrchestratorTest {
|
||||
)
|
||||
|
||||
// We start observing
|
||||
syncOrchestrator.start()
|
||||
syncOrchestrator.observeStates()
|
||||
|
||||
// Advance the time to make sure the orchestrator has had time to start processing the inputs
|
||||
advanceTimeBy(100.milliseconds)
|
||||
@@ -169,7 +213,7 @@ class SyncOrchestratorTest {
|
||||
)
|
||||
|
||||
// We start observing
|
||||
syncOrchestrator.start()
|
||||
syncOrchestrator.observeStates()
|
||||
|
||||
// Advance the time to make sure the orchestrator has had time to start processing the inputs
|
||||
advanceTimeBy(100.milliseconds)
|
||||
@@ -213,7 +257,7 @@ class SyncOrchestratorTest {
|
||||
)
|
||||
|
||||
// We start observing
|
||||
syncOrchestrator.start()
|
||||
syncOrchestrator.observeStates()
|
||||
|
||||
// Advance the time to make sure the orchestrator has had time to start processing the inputs
|
||||
advanceTimeBy(100.milliseconds)
|
||||
@@ -256,7 +300,7 @@ class SyncOrchestratorTest {
|
||||
)
|
||||
|
||||
// We start observing
|
||||
syncOrchestrator.start()
|
||||
syncOrchestrator.observeStates()
|
||||
|
||||
// Advance the time to make sure the orchestrator has had time to start processing the inputs
|
||||
advanceTimeBy(100.milliseconds)
|
||||
@@ -285,7 +329,7 @@ class SyncOrchestratorTest {
|
||||
)
|
||||
|
||||
// We start observing
|
||||
syncOrchestrator.start()
|
||||
syncOrchestrator.observeStates()
|
||||
|
||||
// This should still not trigger a sync, since there is no network
|
||||
advanceTimeBy(10.seconds)
|
||||
|
||||
Reference in New Issue
Block a user