misc : simplify timeline diff logic
This commit is contained in:
@@ -28,14 +28,6 @@ internal class MatrixTimelineDiffProcessor(
|
||||
private val _membershipChangeEventReceived = MutableSharedFlow<Unit>(extraBufferCapacity = 1)
|
||||
val membershipChangeEventReceived: Flow<Unit> = _membershipChangeEventReceived
|
||||
|
||||
suspend fun postItems(items: List<TimelineItem>) {
|
||||
updateTimelineItems {
|
||||
Timber.v("Update timeline items from postItems (with ${items.size} items) on ${Thread.currentThread()}")
|
||||
val mappedItems = items.map { it.asMatrixTimelineItem() }
|
||||
addAll(0, mappedItems)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun postDiffs(diffs: List<TimelineDiff>) {
|
||||
updateTimelineItems {
|
||||
Timber.v("Update timeline items from postDiffs (with ${diffs.size} items) on ${Thread.currentThread()}")
|
||||
|
||||
@@ -48,7 +48,6 @@ import io.element.android.libraries.matrix.impl.timeline.postprocessor.TypingNot
|
||||
import io.element.android.libraries.matrix.impl.timeline.reply.InReplyToMapper
|
||||
import io.element.android.libraries.matrix.impl.util.MessageEventContent
|
||||
import io.element.android.services.toolbox.api.systemclock.SystemClock
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
@@ -95,9 +94,6 @@ class RustTimeline(
|
||||
private val featureFlagsService: FeatureFlagService,
|
||||
onNewSyncedEvent: () -> Unit,
|
||||
) : Timeline {
|
||||
private val initLatch = CompletableDeferred<Unit>()
|
||||
private val isTimelineInitialized = MutableStateFlow(false)
|
||||
|
||||
private val _timelineItems: MutableSharedFlow<List<MatrixTimelineItem>> =
|
||||
MutableSharedFlow(replay = 1, extraBufferCapacity = Int.MAX_VALUE)
|
||||
|
||||
@@ -119,8 +115,6 @@ class RustTimeline(
|
||||
timeline = inner,
|
||||
timelineCoroutineScope = coroutineScope,
|
||||
timelineDiffProcessor = timelineDiffProcessor,
|
||||
initLatch = initLatch,
|
||||
isTimelineInitialized = isTimelineInitialized,
|
||||
dispatcher = dispatcher,
|
||||
onNewSyncedEvent = onNewSyncedEvent,
|
||||
)
|
||||
@@ -181,7 +175,6 @@ class RustTimeline(
|
||||
// Use NonCancellable to avoid breaking the timeline when the coroutine is cancelled.
|
||||
override suspend fun paginate(direction: Timeline.PaginationDirection): Result<Boolean> = withContext(NonCancellable) {
|
||||
withContext(dispatcher) {
|
||||
initLatch.await()
|
||||
runCatchingExceptions {
|
||||
if (!canPaginate(direction)) throw TimelineException.CannotPaginate
|
||||
updatePaginationStatus(direction) { it.copy(isPaginating = true) }
|
||||
@@ -203,7 +196,6 @@ class RustTimeline(
|
||||
}
|
||||
|
||||
private fun canPaginate(direction: Timeline.PaginationDirection): Boolean {
|
||||
if (!isTimelineInitialized.value) return false
|
||||
return when (direction) {
|
||||
Timeline.PaginationDirection.BACKWARDS -> backwardPaginationStatus.value.canPaginate
|
||||
Timeline.PaginationDirection.FORWARDS -> forwardPaginationStatus.value.canPaginate
|
||||
@@ -215,12 +207,10 @@ class RustTimeline(
|
||||
backwardPaginationStatus,
|
||||
forwardPaginationStatus,
|
||||
joinedRoom.roomInfoFlow.map { it.creator to it.isDm }.distinctUntilChanged(),
|
||||
isTimelineInitialized,
|
||||
) { timelineItems,
|
||||
backwardPaginationStatus,
|
||||
forwardPaginationStatus,
|
||||
(roomCreator, isDm),
|
||||
isTimelineInitialized ->
|
||||
(roomCreator, isDm) ->
|
||||
withContext(dispatcher) {
|
||||
timelineItems
|
||||
.let { items ->
|
||||
@@ -234,7 +224,6 @@ class RustTimeline(
|
||||
.let { items ->
|
||||
loadingIndicatorsPostProcessor.process(
|
||||
items = items,
|
||||
isTimelineInitialized = isTimelineInitialized,
|
||||
hasMoreToLoadBackward = backwardPaginationStatus.hasMoreToLoad,
|
||||
hasMoreToLoadForward = forwardPaginationStatus.hasMoreToLoad,
|
||||
)
|
||||
@@ -244,10 +233,7 @@ class RustTimeline(
|
||||
}
|
||||
// Keep lastForwardIndicatorsPostProcessor last
|
||||
.let { items ->
|
||||
lastForwardIndicatorsPostProcessor.process(
|
||||
items = items,
|
||||
isTimelineInitialized = isTimelineInitialized,
|
||||
)
|
||||
lastForwardIndicatorsPostProcessor.process(items = items)
|
||||
}
|
||||
}
|
||||
}.onStart {
|
||||
@@ -262,7 +248,6 @@ class RustTimeline(
|
||||
}
|
||||
|
||||
private fun CoroutineScope.fetchMembers() = launch(dispatcher) {
|
||||
initLatch.await()
|
||||
try {
|
||||
inner.fetchMembers()
|
||||
} catch (exception: Exception) {
|
||||
|
||||
@@ -8,37 +8,26 @@
|
||||
package io.element.android.libraries.matrix.impl.timeline
|
||||
|
||||
import io.element.android.libraries.core.coroutine.childScope
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.cancelChildren
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import org.matrix.rustcomponents.sdk.Timeline
|
||||
import org.matrix.rustcomponents.sdk.TimelineChange
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import uniffi.matrix_sdk_ui.EventItemOrigin
|
||||
|
||||
private const val INITIAL_MAX_SIZE = 50
|
||||
|
||||
/**
|
||||
* This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor.
|
||||
* It will also trigger a callback when a new synced event is received.
|
||||
* It will also handle the initial items and make sure they are posted before any diff.
|
||||
*/
|
||||
internal class TimelineItemsSubscriber(
|
||||
timelineCoroutineScope: CoroutineScope,
|
||||
dispatcher: CoroutineDispatcher,
|
||||
private val timeline: Timeline,
|
||||
private val timelineDiffProcessor: MatrixTimelineDiffProcessor,
|
||||
private val initLatch: CompletableDeferred<Unit>,
|
||||
private val isTimelineInitialized: MutableStateFlow<Boolean>,
|
||||
private val onNewSyncedEvent: () -> Unit,
|
||||
) {
|
||||
private var subscriptionCount = 0
|
||||
@@ -57,7 +46,7 @@ internal class TimelineItemsSubscriber(
|
||||
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
|
||||
onNewSyncedEvent()
|
||||
}
|
||||
postDiffs(diffs)
|
||||
timelineDiffProcessor.postDiffs(diffs)
|
||||
}
|
||||
.launchIn(coroutineScope)
|
||||
}
|
||||
@@ -78,35 +67,4 @@ internal class TimelineItemsSubscriber(
|
||||
}
|
||||
subscriptionCount--
|
||||
}
|
||||
|
||||
private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
|
||||
if (items.isEmpty()) {
|
||||
// Makes sure to post empty list if there is no item, so you can handle empty state.
|
||||
timelineDiffProcessor.postItems(emptyList())
|
||||
} else {
|
||||
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
|
||||
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
|
||||
ensureActive()
|
||||
timelineDiffProcessor.postItems(it)
|
||||
}
|
||||
}
|
||||
isTimelineInitialized.value = true
|
||||
initLatch.complete(Unit)
|
||||
}
|
||||
|
||||
private suspend fun postDiffs(diffs: List<TimelineDiff>) {
|
||||
val diffsToProcess = diffs.toMutableList()
|
||||
if (!isTimelineInitialized.value) {
|
||||
val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET }
|
||||
if (resetDiff != null) {
|
||||
// Keep using the postItems logic so we can post the timelineItems asap.
|
||||
postItems(resetDiff.reset() ?: emptyList())
|
||||
diffsToProcess.remove(resetDiff)
|
||||
}
|
||||
}
|
||||
initLatch.await()
|
||||
if (diffsToProcess.isNotEmpty()) {
|
||||
timelineDiffProcessor.postDiffs(diffsToProcess)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,9 +22,7 @@ class LastForwardIndicatorsPostProcessor(
|
||||
|
||||
fun process(
|
||||
items: List<MatrixTimelineItem>,
|
||||
isTimelineInitialized: Boolean,
|
||||
): List<MatrixTimelineItem> {
|
||||
if (!isTimelineInitialized) return items
|
||||
// We don't need to add the last forward indicator if we are not in the FOCUSED_ON_EVENT mode
|
||||
if (mode != Timeline.Mode.FOCUSED_ON_EVENT) {
|
||||
return items
|
||||
|
||||
@@ -16,11 +16,9 @@ import io.element.android.services.toolbox.api.systemclock.SystemClock
|
||||
class LoadingIndicatorsPostProcessor(private val systemClock: SystemClock) {
|
||||
fun process(
|
||||
items: List<MatrixTimelineItem>,
|
||||
isTimelineInitialized: Boolean,
|
||||
hasMoreToLoadBackward: Boolean,
|
||||
hasMoreToLoadForward: Boolean,
|
||||
): List<MatrixTimelineItem> {
|
||||
if (!isTimelineInitialized) return items
|
||||
val shouldAddForwardLoadingIndicator = hasMoreToLoadForward && items.isNotEmpty()
|
||||
val currentTimestamp = systemClock.epochMillis()
|
||||
return buildList {
|
||||
|
||||
@@ -57,11 +57,6 @@ class RustTimelineTest {
|
||||
)
|
||||
)
|
||||
)
|
||||
with(awaitItem()) {
|
||||
assertThat(size).isEqualTo(1)
|
||||
// Typing notification
|
||||
assertThat((get(0) as MatrixTimelineItem.Virtual).virtual).isEqualTo(VirtualTimelineItem.TypingNotification)
|
||||
}
|
||||
with(awaitItem()) {
|
||||
assertThat(size).isEqualTo(2)
|
||||
// The loading
|
||||
|
||||
@@ -16,10 +16,8 @@ import io.element.android.libraries.matrix.impl.fixtures.fakes.FakeFfiTimelineDi
|
||||
import io.element.android.libraries.matrix.impl.fixtures.fakes.FakeFfiTimelineItem
|
||||
import io.element.android.tests.testutils.lambda.lambdaError
|
||||
import io.element.android.tests.testutils.lambda.lambdaRecorder
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.test.StandardTestDispatcher
|
||||
import kotlinx.coroutines.test.TestScope
|
||||
import kotlinx.coroutines.test.runCurrent
|
||||
@@ -116,8 +114,6 @@ class TimelineItemsSubscriberTest {
|
||||
private fun TestScope.createTimelineItemsSubscriber(
|
||||
timeline: Timeline = FakeFfiTimeline(),
|
||||
timelineItems: MutableSharedFlow<List<MatrixTimelineItem>> = MutableSharedFlow(replay = 1, extraBufferCapacity = Int.MAX_VALUE),
|
||||
initLatch: CompletableDeferred<Unit> = CompletableDeferred(),
|
||||
isTimelineInitialized: MutableStateFlow<Boolean> = MutableStateFlow(false),
|
||||
onNewSyncedEvent: () -> Unit = { lambdaError() },
|
||||
): TimelineItemsSubscriber {
|
||||
return TimelineItemsSubscriber(
|
||||
@@ -125,8 +121,6 @@ private fun TestScope.createTimelineItemsSubscriber(
|
||||
dispatcher = StandardTestDispatcher(testScheduler),
|
||||
timeline = timeline,
|
||||
timelineDiffProcessor = createMatrixTimelineDiffProcessor(timelineItems),
|
||||
initLatch = initLatch,
|
||||
isTimelineInitialized = isTimelineInitialized,
|
||||
onNewSyncedEvent = onNewSyncedEvent,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -18,21 +18,14 @@ class LastForwardIndicatorsPostProcessorTest {
|
||||
@Test
|
||||
fun `LastForwardIndicatorsPostProcessor does not alter the items with mode not FOCUSED_ON_EVENT`() {
|
||||
val sut = LastForwardIndicatorsPostProcessor(Timeline.Mode.LIVE)
|
||||
val result = sut.process(listOf(messageEvent), true)
|
||||
assertThat(result).containsExactly(messageEvent)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `LastForwardIndicatorsPostProcessor does not alter the items with mode FOCUSED_ON_EVENT but timeline not initialized`() {
|
||||
val sut = LastForwardIndicatorsPostProcessor(Timeline.Mode.FOCUSED_ON_EVENT)
|
||||
val result = sut.process(listOf(messageEvent), false)
|
||||
val result = sut.process(listOf(messageEvent))
|
||||
assertThat(result).containsExactly(messageEvent)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `LastForwardIndicatorsPostProcessor add virtual items`() {
|
||||
val sut = LastForwardIndicatorsPostProcessor(Timeline.Mode.FOCUSED_ON_EVENT)
|
||||
val result = sut.process(listOf(messageEvent), true)
|
||||
val result = sut.process(listOf(messageEvent))
|
||||
assertThat(result).containsExactly(
|
||||
messageEvent,
|
||||
MatrixTimelineItem.Virtual(
|
||||
@@ -45,7 +38,7 @@ class LastForwardIndicatorsPostProcessorTest {
|
||||
@Test
|
||||
fun `LastForwardIndicatorsPostProcessor add virtual items on empty list`() {
|
||||
val sut = LastForwardIndicatorsPostProcessor(Timeline.Mode.FOCUSED_ON_EVENT)
|
||||
val result = sut.process(listOf(), true)
|
||||
val result = sut.process(listOf())
|
||||
assertThat(result).containsExactly(
|
||||
MatrixTimelineItem.Virtual(
|
||||
uniqueId = UniqueId("last_forward_indicator_fake_id"),
|
||||
@@ -58,9 +51,9 @@ class LastForwardIndicatorsPostProcessorTest {
|
||||
fun `LastForwardIndicatorsPostProcessor add virtual items but does not alter the list if called a second time`() {
|
||||
val sut = LastForwardIndicatorsPostProcessor(Timeline.Mode.FOCUSED_ON_EVENT)
|
||||
// Process a first time
|
||||
sut.process(listOf(messageEvent), true)
|
||||
sut.process(listOf(messageEvent))
|
||||
// Process a second time with the same Event
|
||||
val result = sut.process(listOf(messageEvent), true)
|
||||
val result = sut.process(listOf(messageEvent))
|
||||
assertThat(result).containsExactly(
|
||||
messageEvent,
|
||||
MatrixTimelineItem.Virtual(
|
||||
@@ -74,9 +67,9 @@ class LastForwardIndicatorsPostProcessorTest {
|
||||
fun `LastForwardIndicatorsPostProcessor add virtual items each time it is called with new Events`() {
|
||||
val sut = LastForwardIndicatorsPostProcessor(Timeline.Mode.FOCUSED_ON_EVENT)
|
||||
// Process a first time
|
||||
sut.process(listOf(dayEvent, messageEvent), true)
|
||||
sut.process(listOf(dayEvent, messageEvent))
|
||||
// Process a second time with the same Event
|
||||
val result = sut.process(listOf(dayEvent, messageEvent, messageEvent2), true)
|
||||
val result = sut.process(listOf(dayEvent, messageEvent, messageEvent2))
|
||||
assertThat(result).containsExactly(
|
||||
dayEvent,
|
||||
messageEvent,
|
||||
|
||||
@@ -16,25 +16,12 @@ import io.element.android.services.toolbox.test.systemclock.FakeSystemClock
|
||||
import org.junit.Test
|
||||
|
||||
class LoadingIndicatorsPostProcessorTest {
|
||||
@Test
|
||||
fun `LoadingIndicatorsPostProcessor does not alter the items is the timeline is not initialized`() {
|
||||
val sut = LoadingIndicatorsPostProcessor(FakeSystemClock())
|
||||
val result = sut.process(
|
||||
items = listOf(messageEvent, messageEvent2),
|
||||
isTimelineInitialized = false,
|
||||
hasMoreToLoadBackward = true,
|
||||
hasMoreToLoadForward = true,
|
||||
)
|
||||
assertThat(result).containsExactly(messageEvent, messageEvent2)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `LoadingIndicatorsPostProcessor adds Loading indicator at the top of the list if hasMoreToLoadBackward is true`() {
|
||||
val clock = FakeSystemClock()
|
||||
val sut = LoadingIndicatorsPostProcessor(clock)
|
||||
val result = sut.process(
|
||||
items = listOf(messageEvent, messageEvent2),
|
||||
isTimelineInitialized = true,
|
||||
hasMoreToLoadBackward = true,
|
||||
hasMoreToLoadForward = false,
|
||||
)
|
||||
@@ -57,7 +44,6 @@ class LoadingIndicatorsPostProcessorTest {
|
||||
val sut = LoadingIndicatorsPostProcessor(clock)
|
||||
val result = sut.process(
|
||||
items = listOf(messageEvent, messageEvent2),
|
||||
isTimelineInitialized = true,
|
||||
hasMoreToLoadBackward = false,
|
||||
hasMoreToLoadForward = true,
|
||||
)
|
||||
@@ -80,7 +66,6 @@ class LoadingIndicatorsPostProcessorTest {
|
||||
val sut = LoadingIndicatorsPostProcessor(clock)
|
||||
val result = sut.process(
|
||||
items = listOf(messageEvent, messageEvent2),
|
||||
isTimelineInitialized = true,
|
||||
hasMoreToLoadBackward = true,
|
||||
hasMoreToLoadForward = true,
|
||||
)
|
||||
@@ -110,7 +95,6 @@ class LoadingIndicatorsPostProcessorTest {
|
||||
val sut = LoadingIndicatorsPostProcessor(clock)
|
||||
val result = sut.process(
|
||||
items = listOf(),
|
||||
isTimelineInitialized = true,
|
||||
hasMoreToLoadBackward = true,
|
||||
hasMoreToLoadForward = true,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user