diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt index 9cdb8d1366..9f48e59995 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt @@ -56,8 +56,6 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow @@ -67,32 +65,28 @@ import kotlinx.coroutines.flow.getAndUpdate import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.FormattedBody import org.matrix.rustcomponents.sdk.MessageFormat import org.matrix.rustcomponents.sdk.RoomMessageEventContentWithoutRelation import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle -import org.matrix.rustcomponents.sdk.TimelineChange -import org.matrix.rustcomponents.sdk.TimelineDiff -import org.matrix.rustcomponents.sdk.TimelineItem import org.matrix.rustcomponents.sdk.messageEventContentFromHtml import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown import org.matrix.rustcomponents.sdk.use import timber.log.Timber -import uniffi.matrix_sdk_ui.EventItemOrigin import uniffi.matrix_sdk_ui.LiveBackPaginationStatus import java.io.File import java.util.Date import java.util.concurrent.atomic.AtomicBoolean import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline -private const val INITIAL_MAX_SIZE = 50 private const val PAGINATION_SIZE = 50 class RustTimeline( private val inner: InnerTimeline, - isLive: Boolean, + private val isLive: Boolean, systemClock: SystemClock, roomCoroutineScope: CoroutineScope, isKeyBackupEnabled: Boolean, @@ -100,7 +94,7 @@ class RustTimeline( private val dispatcher: CoroutineDispatcher, lastLoginTimestamp: Date?, private val roomContentForwarder: RoomContentForwarder, - private val onNewSyncedEvent: () -> Unit, + onNewSyncedEvent: () -> Unit, ) : Timeline { private val initLatch = CompletableDeferred() private val isInit = AtomicBoolean(false) @@ -108,20 +102,9 @@ class RustTimeline( private val _timelineItems: MutableStateFlow> = MutableStateFlow(emptyList()) - private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor( - lastLoginTimestamp = lastLoginTimestamp, - isRoomEncrypted = matrixRoom.isEncrypted, - isKeyBackupEnabled = isKeyBackupEnabled, - dispatcher = dispatcher, - ) - - private val roomBeginningPostProcessor = RoomBeginningPostProcessor() - private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock) - private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive) - private val timelineEventContentMapper = TimelineEventContentMapper() private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper) - private val timelineItemFactory = MatrixTimelineItemMapper( + private val timelineItemMapper = MatrixTimelineItemMapper( fetchDetailsForEvent = this::fetchDetailsForEvent, roomCoroutineScope = roomCoroutineScope, virtualTimelineItemMapper = VirtualTimelineItemMapper(), @@ -129,11 +112,29 @@ class RustTimeline( contentMapper = timelineEventContentMapper ) ) - private val timelineDiffProcessor = MatrixTimelineDiffProcessor( timelineItems = _timelineItems, - timelineItemFactory = timelineItemFactory, + timelineItemFactory = timelineItemMapper, ) + private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor( + lastLoginTimestamp = lastLoginTimestamp, + isRoomEncrypted = matrixRoom.isEncrypted, + isKeyBackupEnabled = isKeyBackupEnabled, + dispatcher = dispatcher, + ) + private val timelineItemsSubscriber = TimelineItemsSubscriber( + timeline = inner, + roomCoroutineScope = roomCoroutineScope, + timelineDiffProcessor = timelineDiffProcessor, + initLatch = initLatch, + isInit = isInit, + dispatcher = dispatcher, + onNewSyncedEvent = onNewSyncedEvent, + ) + + private val roomBeginningPostProcessor = RoomBeginningPostProcessor() + private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock) + private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive) private val backPaginationStatus = MutableStateFlow( Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true) @@ -145,36 +146,28 @@ class RustTimeline( init { roomCoroutineScope.launch(dispatcher) { - inner.timelineDiffFlow() - .onEach { diffs -> - if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) { - onNewSyncedEvent() - } - postDiffs(diffs) - } - .launchIn(this) - - launch { - fetchMembers() - } - + fetchMembers() if (isLive) { // When timeline is live, we need to listen to the back pagination status as // sdk can automatically paginate backwards. - inner.liveBackPaginationStatus() - .onEach { backPaginationStatus -> - updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) { - when (backPaginationStatus) { - is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline) - is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true) - } - } - } - .launchIn(this) + registerBackPaginationStatusListener() } } } + private fun CoroutineScope.registerBackPaginationStatusListener() { + inner.liveBackPaginationStatus() + .onEach { backPaginationStatus -> + updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) { + when (backPaginationStatus) { + is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline) + is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true) + } + } + } + .launchIn(this) + } + override val membershipChangeEventReceived: Flow = timelineDiffProcessor.membershipChangeEventReceived override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result { @@ -248,13 +241,15 @@ class RustTimeline( // Keep lastForwardIndicatorsPostProcessor last .let { items -> lastForwardIndicatorsPostProcessor.process(items) } } + }.onStart { + timelineItemsSubscriber.subscribeIfNeeded() } override fun close() { inner.close() } - private suspend fun fetchMembers() = withContext(dispatcher) { + private fun CoroutineScope.fetchMembers() = launch(dispatcher) { initLatch.await() try { inner.fetchMembers() @@ -263,32 +258,6 @@ class RustTimeline( } } - private suspend fun postItems(items: List) = coroutineScope { - // Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap. - items.chunked(INITIAL_MAX_SIZE).reversed().forEach { - ensureActive() - timelineDiffProcessor.postItems(it) - } - isInit.set(true) - initLatch.complete(Unit) - } - - private suspend fun postDiffs(diffs: List) { - val diffsToProcess = diffs.toMutableList() - if (!isInit.get()) { - val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET } - if (resetDiff != null) { - // Keep using the postItems logic so we can post the timelineItems asap. - postItems(resetDiff.reset() ?: emptyList()) - diffsToProcess.remove(resetDiff) - } - } - initLatch.await() - if (diffsToProcess.isNotEmpty()) { - timelineDiffProcessor.postDiffs(diffsToProcess) - } - } - override suspend fun sendMessage(body: String, htmlBody: String?, mentions: List): Result = withContext(dispatcher) { messageEventContentFromParts(body, htmlBody).withMentions(mentions.map()).use { content -> runCatching { @@ -550,12 +519,6 @@ class RustTimeline( } } - private suspend fun fetchDetailsForEvent(eventId: EventId): Result { - return runCatching { - inner.fetchDetailsForEvent(eventId.value) - } - } - override suspend fun loadReplyDetails(eventId: EventId): InReplyTo = withContext(dispatcher) { val timelineItem = _timelineItems.value.firstOrNull { timelineItem -> timelineItem is MatrixTimelineItem.Event && timelineItem.eventId == eventId @@ -572,4 +535,10 @@ class RustTimeline( inner.loadReplyDetails(eventId.value).use(inReplyToMapper::map) } } + + private suspend fun fetchDetailsForEvent(eventId: EventId): Result { + return runCatching { + inner.fetchDetailsForEvent(eventId.value) + } + } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt new file mode 100644 index 0000000000..0205ac20e5 --- /dev/null +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2024 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.element.android.libraries.matrix.impl.timeline + +import io.element.android.libraries.core.coroutine.childScope +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import org.matrix.rustcomponents.sdk.Timeline +import org.matrix.rustcomponents.sdk.TimelineChange +import org.matrix.rustcomponents.sdk.TimelineDiff +import org.matrix.rustcomponents.sdk.TimelineItem +import uniffi.matrix_sdk_ui.EventItemOrigin +import java.util.concurrent.atomic.AtomicBoolean + +private const val INITIAL_MAX_SIZE = 50 + +/** + * This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor. + * It will also trigger a callback when a new synced event is received. + * It will also handle the initial items and make sure they are posted before any diff. + * When closing the room subscription, it will also unsubscribe automatically. + */ +internal class TimelineItemsSubscriber( + roomCoroutineScope: CoroutineScope, + dispatcher: CoroutineDispatcher, + private val timeline: Timeline, + private val timelineDiffProcessor: MatrixTimelineDiffProcessor, + private val initLatch: CompletableDeferred, + private val isInit: AtomicBoolean, + private val onNewSyncedEvent: () -> Unit, +) { + private var subscriptionCount = 0 + private val mutex = Mutex() + + private val coroutineScope = roomCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber") + + suspend fun subscribeIfNeeded() = mutex.withLock { + if (subscriptionCount == 0) { + timeline.timelineDiffFlow() + .onEach { diffs -> + if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) { + onNewSyncedEvent() + } + postDiffs(diffs) + } + .launchIn(coroutineScope) + } + subscriptionCount++ + } + + suspend fun unsubscribeIfNeeded() = mutex.withLock { + when (subscriptionCount) { + 0 -> return@withLock + 1 -> { + coroutineScope.coroutineContext.cancelChildren() + } + } + subscriptionCount-- + } + + private suspend fun postItems(items: List) = coroutineScope { + // Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap. + items.chunked(INITIAL_MAX_SIZE).reversed().forEach { + ensureActive() + timelineDiffProcessor.postItems(it) + } + isInit.set(true) + initLatch.complete(Unit) + } + + private suspend fun postDiffs(diffs: List) { + val diffsToProcess = diffs.toMutableList() + if (!isInit.get()) { + val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET } + if (resetDiff != null) { + // Keep using the postItems logic so we can post the timelineItems asap. + postItems(resetDiff.reset() ?: emptyList()) + diffsToProcess.remove(resetDiff) + } + } + initLatch.await() + if (diffsToProcess.isNotEmpty()) { + timelineDiffProcessor.postDiffs(diffsToProcess) + } + } +}