From bf57b54fee848c054a0afa0c54185d1647467b41 Mon Sep 17 00:00:00 2001 From: ganfra Date: Mon, 14 Nov 2022 18:06:44 +0100 Subject: [PATCH] Try to fix some issues with roomList and timeline... --- .../x/features/messages/MessagesViewModel.kt | 2 + .../x/core/coroutine/CoroutineDispatchers.kt | 1 + .../io/element/android/x/matrix/Matrix.kt | 6 +- .../element/android/x/matrix/MatrixClient.kt | 3 +- .../android/x/matrix/room/MatrixRoom.kt | 4 +- .../x/matrix/room/RoomListenerFlows.kt | 8 ++- .../x/matrix/room/RoomSummaryDataSource.kt | 64 +++++++++---------- .../x/matrix/sync/SlidingSyncObserverProxy.kt | 19 ++++-- .../x/matrix/sync/SlidingSyncViewFlows.kt | 29 ++++++--- .../x/matrix/timeline/MatrixTimeline.kt | 49 +++++++------- 10 files changed, 107 insertions(+), 78 deletions(-) diff --git a/features/messages/src/main/java/io/element/android/x/features/messages/MessagesViewModel.kt b/features/messages/src/main/java/io/element/android/x/features/messages/MessagesViewModel.kt index 3e012d1fdc..a71d8ac037 100644 --- a/features/messages/src/main/java/io/element/android/x/features/messages/MessagesViewModel.kt +++ b/features/messages/src/main/java/io/element/android/x/features/messages/MessagesViewModel.kt @@ -67,6 +67,7 @@ class MessagesViewModel( } private fun handleInit() { + timeline.initialize() room.syncUpdateFlow() .onEach { val avatarData = @@ -97,5 +98,6 @@ class MessagesViewModel( override fun onCleared() { super.onCleared() + timeline.dispose() } } \ No newline at end of file diff --git a/libraries/core/src/main/java/io/element/android/x/core/coroutine/CoroutineDispatchers.kt b/libraries/core/src/main/java/io/element/android/x/core/coroutine/CoroutineDispatchers.kt index ba0a6a3a2f..f7414fb40b 100644 --- a/libraries/core/src/main/java/io/element/android/x/core/coroutine/CoroutineDispatchers.kt +++ b/libraries/core/src/main/java/io/element/android/x/core/coroutine/CoroutineDispatchers.kt @@ -6,4 +6,5 @@ data class CoroutineDispatchers( val io: CoroutineDispatcher, val computation: CoroutineDispatcher, val main: CoroutineDispatcher, + val diffUpdateDispatcher: CoroutineDispatcher, ) diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/Matrix.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/Matrix.kt index 3252a9ebad..716f626861 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/Matrix.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/Matrix.kt @@ -6,14 +6,15 @@ import io.element.android.x.matrix.session.SessionStore import io.element.android.x.matrix.util.logError import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.flow.* -import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.AuthenticationService import org.matrix.rustcomponents.sdk.Client import org.matrix.rustcomponents.sdk.ClientBuilder import java.io.File import java.util.* +import java.util.concurrent.Executors class Matrix( private val coroutineScope: CoroutineScope, @@ -22,7 +23,8 @@ class Matrix( private val coroutineDispatchers = CoroutineDispatchers( io = Dispatchers.IO, computation = Dispatchers.Default, - main = Dispatchers.Main + main = Dispatchers.Main, + diffUpdateDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() ) private val baseFolder = File(context.filesDir, "matrix") private val sessionStore = SessionStore(context) diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/MatrixClient.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/MatrixClient.kt index 24eca22cf4..77858899ea 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/MatrixClient.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/MatrixClient.kt @@ -57,7 +57,7 @@ class MatrixClient internal constructor( .addView(slidingSyncView) .build() - private val slidingSyncObserverProxy = SlidingSyncObserverProxy(coroutineScope) + private val slidingSyncObserverProxy = SlidingSyncObserverProxy(coroutineScope, dispatchers) private val roomSummaryDataSource: RustRoomSummaryDataSource = RustRoomSummaryDataSource( slidingSyncObserverProxy.updateSummaryFlow, @@ -81,6 +81,7 @@ class MatrixClient internal constructor( slidingSyncUpdateFlow = slidingSyncObserverProxy.updateSummaryFlow, slidingSyncRoom = slidingSyncRoom, room = room, + coroutineScope = coroutineScope, coroutineDispatchers = dispatchers ) } diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/room/MatrixRoom.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/room/MatrixRoom.kt index 86a845a52c..509396efe5 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/room/MatrixRoom.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/room/MatrixRoom.kt @@ -4,6 +4,7 @@ import io.element.android.x.core.coroutine.CoroutineDispatchers import io.element.android.x.matrix.core.RoomId import io.element.android.x.matrix.core.UserId import io.element.android.x.matrix.timeline.MatrixTimeline +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map @@ -15,6 +16,7 @@ class MatrixRoom( private val slidingSyncUpdateFlow: Flow, private val slidingSyncRoom: SlidingSyncRoom, private val room: Room, + private val coroutineScope: CoroutineScope, private val coroutineDispatchers: CoroutineDispatchers, ) { @@ -28,7 +30,7 @@ class MatrixRoom( } fun timeline(): MatrixTimeline { - return MatrixTimeline(this, room, coroutineDispatchers) + return MatrixTimeline(this, room, coroutineScope, coroutineDispatchers) } val roomId = RoomId(room.id()) diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomListenerFlows.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomListenerFlows.kt index 5b87942159..691798b6e3 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomListenerFlows.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomListenerFlows.kt @@ -1,17 +1,21 @@ package io.element.android.x.matrix.room +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.launch import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineListener -fun Room.timelineDiff(): Flow = callbackFlow { +fun Room.timelineDiff(scope: CoroutineScope): Flow = callbackFlow { val listener = object : TimelineListener { override fun onUpdate(update: TimelineDiff) { - trySend(update) + scope.launch { + send(update) + } } } addTimelineListener(listener) diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomSummaryDataSource.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomSummaryDataSource.kt index bfa2725f99..60799ad58a 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomSummaryDataSource.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomSummaryDataSource.kt @@ -9,7 +9,6 @@ import org.matrix.rustcomponents.sdk.* import timber.log.Timber import java.io.Closeable import java.util.* -import java.util.concurrent.Executors interface RoomSummaryDataSource { fun roomSummaries(): Flow> @@ -23,8 +22,7 @@ internal class RustRoomSummaryDataSource( private val roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory() ) : RoomSummaryDataSource, Closeable { - private val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - private val coroutineScope = CoroutineScope(SupervisorJob() + singleDispatcher) + private val coroutineScope = CoroutineScope(SupervisorJob() + coroutineDispatchers.io) private val roomSummaries = MutableStateFlow>(emptyList()) private val state = MutableStateFlow(SlidingSyncState.COLD) @@ -32,30 +30,31 @@ internal class RustRoomSummaryDataSource( fun startSync() { coroutineScope.launch { updateRoomSummaries { - clear() addAll( slidingSyncView.currentRoomsList().map(::buildSummaryForRoomListEntry) ) } - - slidingSyncView.roomListDiff() - .onEach { diffs -> - updateRoomSummaries { - applyDiff(diffs) - } - }.collect() - - slidingSyncView.state() - .onEach { slidingSyncState -> - Timber.v("New sliding sync state: $slidingSyncState") - state.value = slidingSyncState - }.collect() - - slidingSyncUpdateFlow - .onEach { - didReceiveSyncUpdate(it) - }.collect() } + + slidingSyncUpdateFlow + .onEach { + didReceiveSyncUpdate(it) + }.launchIn(coroutineScope) + + slidingSyncView.roomListDiff(coroutineScope) + .onEach { diffs -> + updateRoomSummaries { + applyDiff(diffs) + } + } + .launchIn(coroutineScope) + + slidingSyncView.state(coroutineScope) + .onEach { slidingSyncState -> + Timber.v("New sliding sync state: $slidingSyncState") + state.value = slidingSyncState + }.launchIn(coroutineScope) + } fun stopSync() { @@ -70,7 +69,7 @@ internal class RustRoomSummaryDataSource( return roomSummaries.sample(50) } - private fun didReceiveSyncUpdate(summary: UpdateSummary) { + private suspend fun didReceiveSyncUpdate(summary: UpdateSummary) { Timber.v("UpdateRooms with identifiers: ${summary.rooms}") if (state.value != SlidingSyncState.LIVE) { return @@ -141,19 +140,18 @@ internal class RustRoomSummaryDataSource( ) } - private fun updateRoomSummaries(block: MutableList.() -> Unit) { + private suspend fun updateRoomSummaries(block: MutableList.() -> Unit) = withContext(coroutineDispatchers.diffUpdateDispatcher){ val mutableRoomSummaries = roomSummaries.value.toMutableList() block(mutableRoomSummaries) roomSummaries.value = mutableRoomSummaries } -} - -fun SlidingSyncViewRoomsListDiff.isInvalidation(): Boolean { - return when (this) { - is SlidingSyncViewRoomsListDiff.InsertAt -> this.value is RoomListEntry.Invalidated - is SlidingSyncViewRoomsListDiff.UpdateAt -> this.value is RoomListEntry.Invalidated - is SlidingSyncViewRoomsListDiff.Push -> this.value is RoomListEntry.Invalidated - else -> false + fun SlidingSyncViewRoomsListDiff.isInvalidation(): Boolean { + return when (this) { + is SlidingSyncViewRoomsListDiff.InsertAt -> this.value is RoomListEntry.Invalidated + is SlidingSyncViewRoomsListDiff.UpdateAt -> this.value is RoomListEntry.Invalidated + is SlidingSyncViewRoomsListDiff.Push -> this.value is RoomListEntry.Invalidated + else -> false + } } -} +} \ No newline at end of file diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/sync/SlidingSyncObserverProxy.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/sync/SlidingSyncObserverProxy.kt index 08e8b69dd1..b5365cdf9a 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/sync/SlidingSyncObserverProxy.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/sync/SlidingSyncObserverProxy.kt @@ -1,21 +1,28 @@ package io.element.android.x.matrix.sync +import io.element.android.x.core.coroutine.CoroutineDispatchers import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.launch import org.matrix.rustcomponents.sdk.SlidingSyncObserver import org.matrix.rustcomponents.sdk.UpdateSummary -import org.matrix.rustcomponents.sdk.setupTracing -class SlidingSyncObserverProxy(private val coroutineScope: CoroutineScope) : SlidingSyncObserver { +// Sounds like a reasonable buffer size before it suspends emitting new items. +private const val BUFFER_SIZE = 64 +class SlidingSyncObserverProxy( + private val coroutineScope: CoroutineScope, + private val coroutineDispatchers: CoroutineDispatchers +) : SlidingSyncObserver { - private val updateSummaryMutableFlow = MutableSharedFlow() - val updateSummaryFlow: Flow = updateSummaryMutableFlow + private val updateSummaryMutableFlow = + MutableSharedFlow(extraBufferCapacity = BUFFER_SIZE) + val updateSummaryFlow: SharedFlow = updateSummaryMutableFlow.asSharedFlow() override fun didReceiveSyncUpdate(summary: UpdateSummary) { if (summary.rooms.isEmpty()) return - coroutineScope.launch { + coroutineScope.launch(coroutineDispatchers.io) { updateSummaryMutableFlow.emit(summary) } } diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/sync/SlidingSyncViewFlows.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/sync/SlidingSyncViewFlows.kt index f9b4aeeb76..8eaa386390 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/sync/SlidingSyncViewFlows.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/sync/SlidingSyncViewFlows.kt @@ -1,31 +1,40 @@ package io.element.android.x.matrix.sync +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch import mxCallbackFlow import org.matrix.rustcomponents.sdk.* -fun SlidingSyncView.roomListDiff(): Flow = mxCallbackFlow { - val observer = object : SlidingSyncViewRoomListObserver { - override fun didReceiveUpdate(diff: SlidingSyncViewRoomsListDiff) { - trySend(diff) +fun SlidingSyncView.roomListDiff(scope: CoroutineScope): Flow = + mxCallbackFlow { + val observer = object : SlidingSyncViewRoomListObserver { + override fun didReceiveUpdate(diff: SlidingSyncViewRoomsListDiff) { + scope.launch { + send(diff) + } + } } + observeRoomList(observer) } - observeRoomList(observer) -} -fun SlidingSyncView.state(): Flow = mxCallbackFlow { +fun SlidingSyncView.state(scope: CoroutineScope): Flow = mxCallbackFlow { val observer = object : SlidingSyncViewStateObserver { override fun didReceiveUpdate(newState: SlidingSyncState) { - trySend(newState) + scope.launch { + send(newState) + } } } observeState(observer) } -fun SlidingSyncView.roomsCount(): Flow = mxCallbackFlow { +fun SlidingSyncView.roomsCount(scope: CoroutineScope): Flow = mxCallbackFlow { val observer = object : SlidingSyncViewRoomsCountObserver { override fun didReceiveUpdate(count: UInt) { - trySend(count) + scope.launch { + send(count) + } } } observeRoomsCount(observer) diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/timeline/MatrixTimeline.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/timeline/MatrixTimeline.kt index f2e9fd1498..641d281385 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/timeline/MatrixTimeline.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/timeline/MatrixTimeline.kt @@ -1,11 +1,13 @@ package io.element.android.x.matrix.timeline import io.element.android.x.core.coroutine.CoroutineDispatchers -import io.element.android.x.core.data.flow.chunk import io.element.android.x.matrix.core.EventId import io.element.android.x.matrix.room.MatrixRoom -import io.element.android.x.matrix.room.timelineDiff -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.sample +import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.* import timber.log.Timber @@ -14,8 +16,9 @@ import java.util.* class MatrixTimeline( private val matrixRoom: MatrixRoom, private val room: Room, + private val coroutineScope: CoroutineScope, private val coroutineDispatchers: CoroutineDispatchers, -) { +) : TimelineListener { interface Callback { fun onUpdatedTimelineItem(eventId: EventId) fun onStartedBackPaginating() @@ -30,9 +33,7 @@ class MatrixTimeline( fun timelineItems(): Flow> { - return diffFlow().combine(timelineItems) { _, _ -> - timelineItems.value - }.sample(50) + return timelineItems.sample(50) } val hasMoreToLoad: Boolean @@ -41,17 +42,6 @@ class MatrixTimeline( } - private fun diffFlow(): Flow { - return room.timelineDiff() - .onEach { timelineDiffs -> - //Timber.v("Apply ${timelineDiffs.size} diffs on thread: ${Thread.currentThread()}") - updateTimelineItems { - applyDiff(timelineDiffs) - } - }.map { } - .flowOn(coroutineDispatchers.computation) - } - private fun MutableList.applyDiff(diff: TimelineDiff) { when (diff.change()) { TimelineChange.PUSH -> { @@ -107,16 +97,21 @@ class MatrixTimeline( } } - private fun updateTimelineItems(block: MutableList.() -> Unit) { - val mutableTimelineItems = timelineItems.value.toMutableList() - block(mutableTimelineItems) - timelineItems.value = mutableTimelineItems - } + private suspend fun updateTimelineItems(block: MutableList.() -> Unit) = + withContext(coroutineDispatchers.diffUpdateDispatcher) { + val mutableTimelineItems = timelineItems.value.toMutableList() + block(mutableTimelineItems) + timelineItems.value = mutableTimelineItems + } fun addListener(timelineListener: TimelineListener) { room.addTimelineListener(timelineListener) } + fun initialize() { + addListener(this) + } + fun dispose() { room.removeTimeline() } @@ -128,4 +123,12 @@ class MatrixTimeline( return matrixRoom.sendMessage(message) } + override fun onUpdate(update: TimelineDiff) { + coroutineScope.launch { + updateTimelineItems { + applyDiff(update) + } + } + } + } \ No newline at end of file