Pinned messages : fix timeline provider subscription
This commit is contained in:
@@ -18,11 +18,13 @@ import io.element.android.libraries.matrix.api.room.MatrixRoom
|
||||
import io.element.android.libraries.matrix.api.timeline.Timeline
|
||||
import io.element.android.libraries.matrix.api.timeline.TimelineProvider
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onCompletion
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import javax.inject.Inject
|
||||
|
||||
@@ -32,7 +34,8 @@ class PinnedEventsTimelineProvider @Inject constructor(
|
||||
private val networkMonitor: NetworkMonitor,
|
||||
private val featureFlagService: FeatureFlagService,
|
||||
) : TimelineProvider {
|
||||
private val _timelineStateFlow: MutableStateFlow<AsyncData<Timeline>> = MutableStateFlow(AsyncData.Uninitialized)
|
||||
private val _timelineStateFlow: MutableStateFlow<AsyncData<Timeline>> =
|
||||
MutableStateFlow(AsyncData.Uninitialized)
|
||||
|
||||
override fun activeTimelineFlow(): StateFlow<Timeline?> {
|
||||
return _timelineStateFlow
|
||||
@@ -44,25 +47,46 @@ class PinnedEventsTimelineProvider @Inject constructor(
|
||||
val timelineStateFlow = _timelineStateFlow
|
||||
|
||||
fun launchIn(scope: CoroutineScope) {
|
||||
_timelineStateFlow.subscriptionCount
|
||||
.map { count -> count > 0 }
|
||||
.distinctUntilChanged()
|
||||
.onEach { isActive ->
|
||||
if (isActive) {
|
||||
onActive()
|
||||
} else {
|
||||
onInactive()
|
||||
}
|
||||
}
|
||||
.launchIn(scope)
|
||||
}
|
||||
|
||||
private suspend fun onActive() = coroutineScope {
|
||||
combine(
|
||||
featureFlagService.isFeatureEnabledFlow(FeatureFlags.PinnedEvents),
|
||||
networkMonitor.connectivity
|
||||
) {
|
||||
// do not use connectivity here as data can be loaded from cache, it's just to trigger retry if needed
|
||||
isEnabled, _ ->
|
||||
) { isEnabled, _ ->
|
||||
// do not use connectivity here as data can be loaded from cache, it's just to trigger retry if needed
|
||||
isEnabled
|
||||
}
|
||||
.onEach { isFeatureEnabled ->
|
||||
if (isFeatureEnabled) {
|
||||
loadTimelineIfNeeded()
|
||||
} else {
|
||||
_timelineStateFlow.value = AsyncData.Uninitialized
|
||||
resetTimeline()
|
||||
}
|
||||
}
|
||||
.onCompletion {
|
||||
invokeOnTimeline { close() }
|
||||
}
|
||||
.launchIn(scope)
|
||||
.launchIn(this)
|
||||
}
|
||||
|
||||
private suspend fun onInactive() {
|
||||
resetTimeline()
|
||||
}
|
||||
|
||||
private suspend fun resetTimeline() {
|
||||
invokeOnTimeline {
|
||||
close()
|
||||
}
|
||||
_timelineStateFlow.emit(AsyncData.Uninitialized)
|
||||
}
|
||||
|
||||
suspend fun invokeOnTimeline(action: suspend Timeline.() -> Unit) {
|
||||
|
||||
Reference in New Issue
Block a user