diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineDiffProcessor.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineDiffProcessor.kt index d44f112f38..58a6f75ef1 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineDiffProcessor.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineDiffProcessor.kt @@ -27,7 +27,6 @@ import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.TimelineChange import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem -import org.matrix.rustcomponents.sdk.TimelineListener internal class MatrixTimelineDiffProcessor( private val paginationState: MutableStateFlow, @@ -35,9 +34,9 @@ internal class MatrixTimelineDiffProcessor( private val coroutineScope: CoroutineScope, private val diffDispatcher: CoroutineDispatcher, private val timelineItemFactory: MatrixTimelineItemMapper, -) : TimelineListener { +) { - override fun onUpdate(diff: TimelineDiff) { + fun onUpdate(diff: TimelineDiff) { coroutineScope.launch { updateTimelineItems { applyDiff(diff) @@ -123,5 +122,4 @@ internal class MatrixTimelineDiffProcessor( private fun TimelineItem.asMatrixTimelineItem(): MatrixTimelineItem { return timelineItemFactory.map(this) } - } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt index 980c427737..36044e8c11 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt @@ -32,6 +32,8 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.sample import kotlinx.coroutines.launch import kotlinx.coroutines.withContext @@ -40,9 +42,6 @@ import org.matrix.rustcomponents.sdk.RequiredState import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.RoomListItem import org.matrix.rustcomponents.sdk.RoomSubscription -import org.matrix.rustcomponents.sdk.SlidingSyncRoom -import org.matrix.rustcomponents.sdk.TimelineItem -import org.matrix.rustcomponents.sdk.TimelineListener import timber.log.Timber import java.util.concurrent.atomic.AtomicBoolean @@ -74,7 +73,7 @@ class RustMatrixTimeline( ) ) - private val innerTimelineListener = MatrixTimelineDiffProcessor( + private val timelineDiffProcessor = MatrixTimelineDiffProcessor( paginationState = paginationState, timelineItems = timelineItems, coroutineScope = coroutineScope, @@ -95,13 +94,8 @@ class RustMatrixTimeline( override fun initialize() { Timber.v("Init timeline for room ${matrixRoom.roomId}") coroutineScope.launch { - val result = addListener(innerTimelineListener) - result - .onSuccess { timelineItems -> - val matrixTimelineItems = timelineItems.map(timelineItemFactory::map) - withContext(coroutineDispatchers.diffUpdateDispatcher) { - this@RustMatrixTimeline.timelineItems.value = matrixTimelineItems - } + subscribeAndAddListener(this) + .onSuccess { isInit.set(true) } .onFailure { @@ -156,8 +150,8 @@ class RustMatrixTimeline( } } - private suspend fun addListener(timelineListener: TimelineListener): Result> = withContext(coroutineDispatchers.io) { - runCatching { + private fun subscribeAndAddListener(coroutineScope: CoroutineScope): Result { + return runCatching { val settings = RoomSubscription( requiredState = listOf( RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""), @@ -168,12 +162,11 @@ class RustMatrixTimeline( timelineLimit = null ) roomListItem.subscribe(settings) - val result = innerRoom.addTimelineListener(timelineListener) - launch { - fetchMembers() - } - listenerTokens += result.itemsStream - result.items + innerRoom.timelineDiffFlow { initialList -> + timelineItems.value = initialList.map(timelineItemFactory::map) + }.onEach { + timelineDiffProcessor.onUpdate(it) + }.launchIn(coroutineScope) } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineDiffFlow.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineDiffFlow.kt new file mode 100644 index 0000000000..92ddac8f0a --- /dev/null +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineDiffFlow.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2023 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.element.android.libraries.matrix.impl.timeline + +import io.element.android.libraries.matrix.impl.util.mxCallbackFlow +import kotlinx.coroutines.channels.trySendBlocking +import kotlinx.coroutines.flow.Flow +import org.matrix.rustcomponents.sdk.Room +import org.matrix.rustcomponents.sdk.TimelineDiff +import org.matrix.rustcomponents.sdk.TimelineItem +import org.matrix.rustcomponents.sdk.TimelineListener + +internal fun Room.timelineDiffFlow(onInitialList: suspend (List) -> Unit): Flow = + mxCallbackFlow { + val listener = object : TimelineListener { + override fun onUpdate(diff: TimelineDiff) { + trySendBlocking(diff) + } + } + val result = addTimelineListener(listener) + onInitialList(result.items) + result.itemsStream + }