Performance : subscribe to timeline items only when necessary
This commit is contained in:
@@ -56,8 +56,6 @@ import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.NonCancellable
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
@@ -67,32 +65,28 @@ import kotlinx.coroutines.flow.getAndUpdate
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.FormattedBody
|
||||
import org.matrix.rustcomponents.sdk.MessageFormat
|
||||
import org.matrix.rustcomponents.sdk.RoomMessageEventContentWithoutRelation
|
||||
import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle
|
||||
import org.matrix.rustcomponents.sdk.TimelineChange
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import org.matrix.rustcomponents.sdk.messageEventContentFromHtml
|
||||
import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
|
||||
import org.matrix.rustcomponents.sdk.use
|
||||
import timber.log.Timber
|
||||
import uniffi.matrix_sdk_ui.EventItemOrigin
|
||||
import uniffi.matrix_sdk_ui.LiveBackPaginationStatus
|
||||
import java.io.File
|
||||
import java.util.Date
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline
|
||||
|
||||
private const val INITIAL_MAX_SIZE = 50
|
||||
private const val PAGINATION_SIZE = 50
|
||||
|
||||
class RustTimeline(
|
||||
private val inner: InnerTimeline,
|
||||
isLive: Boolean,
|
||||
private val isLive: Boolean,
|
||||
systemClock: SystemClock,
|
||||
roomCoroutineScope: CoroutineScope,
|
||||
isKeyBackupEnabled: Boolean,
|
||||
@@ -100,7 +94,7 @@ class RustTimeline(
|
||||
private val dispatcher: CoroutineDispatcher,
|
||||
lastLoginTimestamp: Date?,
|
||||
private val roomContentForwarder: RoomContentForwarder,
|
||||
private val onNewSyncedEvent: () -> Unit,
|
||||
onNewSyncedEvent: () -> Unit,
|
||||
) : Timeline {
|
||||
private val initLatch = CompletableDeferred<Unit>()
|
||||
private val isInit = AtomicBoolean(false)
|
||||
@@ -108,20 +102,9 @@ class RustTimeline(
|
||||
private val _timelineItems: MutableStateFlow<List<MatrixTimelineItem>> =
|
||||
MutableStateFlow(emptyList())
|
||||
|
||||
private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor(
|
||||
lastLoginTimestamp = lastLoginTimestamp,
|
||||
isRoomEncrypted = matrixRoom.isEncrypted,
|
||||
isKeyBackupEnabled = isKeyBackupEnabled,
|
||||
dispatcher = dispatcher,
|
||||
)
|
||||
|
||||
private val roomBeginningPostProcessor = RoomBeginningPostProcessor()
|
||||
private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock)
|
||||
private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive)
|
||||
|
||||
private val timelineEventContentMapper = TimelineEventContentMapper()
|
||||
private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper)
|
||||
private val timelineItemFactory = MatrixTimelineItemMapper(
|
||||
private val timelineItemMapper = MatrixTimelineItemMapper(
|
||||
fetchDetailsForEvent = this::fetchDetailsForEvent,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
|
||||
@@ -129,11 +112,29 @@ class RustTimeline(
|
||||
contentMapper = timelineEventContentMapper
|
||||
)
|
||||
)
|
||||
|
||||
private val timelineDiffProcessor = MatrixTimelineDiffProcessor(
|
||||
timelineItems = _timelineItems,
|
||||
timelineItemFactory = timelineItemFactory,
|
||||
timelineItemFactory = timelineItemMapper,
|
||||
)
|
||||
private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor(
|
||||
lastLoginTimestamp = lastLoginTimestamp,
|
||||
isRoomEncrypted = matrixRoom.isEncrypted,
|
||||
isKeyBackupEnabled = isKeyBackupEnabled,
|
||||
dispatcher = dispatcher,
|
||||
)
|
||||
private val timelineItemsSubscriber = TimelineItemsSubscriber(
|
||||
timeline = inner,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
timelineDiffProcessor = timelineDiffProcessor,
|
||||
initLatch = initLatch,
|
||||
isInit = isInit,
|
||||
dispatcher = dispatcher,
|
||||
onNewSyncedEvent = onNewSyncedEvent,
|
||||
)
|
||||
|
||||
private val roomBeginningPostProcessor = RoomBeginningPostProcessor()
|
||||
private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock)
|
||||
private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive)
|
||||
|
||||
private val backPaginationStatus = MutableStateFlow(
|
||||
Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true)
|
||||
@@ -145,36 +146,28 @@ class RustTimeline(
|
||||
|
||||
init {
|
||||
roomCoroutineScope.launch(dispatcher) {
|
||||
inner.timelineDiffFlow()
|
||||
.onEach { diffs ->
|
||||
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
|
||||
onNewSyncedEvent()
|
||||
}
|
||||
postDiffs(diffs)
|
||||
}
|
||||
.launchIn(this)
|
||||
|
||||
launch {
|
||||
fetchMembers()
|
||||
}
|
||||
|
||||
fetchMembers()
|
||||
if (isLive) {
|
||||
// When timeline is live, we need to listen to the back pagination status as
|
||||
// sdk can automatically paginate backwards.
|
||||
inner.liveBackPaginationStatus()
|
||||
.onEach { backPaginationStatus ->
|
||||
updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) {
|
||||
when (backPaginationStatus) {
|
||||
is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline)
|
||||
is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true)
|
||||
}
|
||||
}
|
||||
}
|
||||
.launchIn(this)
|
||||
registerBackPaginationStatusListener()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun CoroutineScope.registerBackPaginationStatusListener() {
|
||||
inner.liveBackPaginationStatus()
|
||||
.onEach { backPaginationStatus ->
|
||||
updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) {
|
||||
when (backPaginationStatus) {
|
||||
is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline)
|
||||
is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true)
|
||||
}
|
||||
}
|
||||
}
|
||||
.launchIn(this)
|
||||
}
|
||||
|
||||
override val membershipChangeEventReceived: Flow<Unit> = timelineDiffProcessor.membershipChangeEventReceived
|
||||
|
||||
override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result<Unit> {
|
||||
@@ -248,13 +241,15 @@ class RustTimeline(
|
||||
// Keep lastForwardIndicatorsPostProcessor last
|
||||
.let { items -> lastForwardIndicatorsPostProcessor.process(items) }
|
||||
}
|
||||
}.onStart {
|
||||
timelineItemsSubscriber.subscribeIfNeeded()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
inner.close()
|
||||
}
|
||||
|
||||
private suspend fun fetchMembers() = withContext(dispatcher) {
|
||||
private fun CoroutineScope.fetchMembers() = launch(dispatcher) {
|
||||
initLatch.await()
|
||||
try {
|
||||
inner.fetchMembers()
|
||||
@@ -263,32 +258,6 @@ class RustTimeline(
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
|
||||
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
|
||||
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
|
||||
ensureActive()
|
||||
timelineDiffProcessor.postItems(it)
|
||||
}
|
||||
isInit.set(true)
|
||||
initLatch.complete(Unit)
|
||||
}
|
||||
|
||||
private suspend fun postDiffs(diffs: List<TimelineDiff>) {
|
||||
val diffsToProcess = diffs.toMutableList()
|
||||
if (!isInit.get()) {
|
||||
val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET }
|
||||
if (resetDiff != null) {
|
||||
// Keep using the postItems logic so we can post the timelineItems asap.
|
||||
postItems(resetDiff.reset() ?: emptyList())
|
||||
diffsToProcess.remove(resetDiff)
|
||||
}
|
||||
}
|
||||
initLatch.await()
|
||||
if (diffsToProcess.isNotEmpty()) {
|
||||
timelineDiffProcessor.postDiffs(diffsToProcess)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun sendMessage(body: String, htmlBody: String?, mentions: List<Mention>): Result<Unit> = withContext(dispatcher) {
|
||||
messageEventContentFromParts(body, htmlBody).withMentions(mentions.map()).use { content ->
|
||||
runCatching<Unit> {
|
||||
@@ -550,12 +519,6 @@ class RustTimeline(
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
|
||||
return runCatching {
|
||||
inner.fetchDetailsForEvent(eventId.value)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun loadReplyDetails(eventId: EventId): InReplyTo = withContext(dispatcher) {
|
||||
val timelineItem = _timelineItems.value.firstOrNull { timelineItem ->
|
||||
timelineItem is MatrixTimelineItem.Event && timelineItem.eventId == eventId
|
||||
@@ -572,4 +535,10 @@ class RustTimeline(
|
||||
inner.loadReplyDetails(eventId.value).use(inReplyToMapper::map)
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
|
||||
return runCatching {
|
||||
inner.fetchDetailsForEvent(eventId.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Copyright (c) 2024 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.timeline
|
||||
|
||||
import io.element.android.libraries.core.coroutine.childScope
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.cancelChildren
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import org.matrix.rustcomponents.sdk.Timeline
|
||||
import org.matrix.rustcomponents.sdk.TimelineChange
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import uniffi.matrix_sdk_ui.EventItemOrigin
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
private const val INITIAL_MAX_SIZE = 50
|
||||
|
||||
/**
|
||||
* This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor.
|
||||
* It will also trigger a callback when a new synced event is received.
|
||||
* It will also handle the initial items and make sure they are posted before any diff.
|
||||
* When closing the room subscription, it will also unsubscribe automatically.
|
||||
*/
|
||||
internal class TimelineItemsSubscriber(
|
||||
roomCoroutineScope: CoroutineScope,
|
||||
dispatcher: CoroutineDispatcher,
|
||||
private val timeline: Timeline,
|
||||
private val timelineDiffProcessor: MatrixTimelineDiffProcessor,
|
||||
private val initLatch: CompletableDeferred<Unit>,
|
||||
private val isInit: AtomicBoolean,
|
||||
private val onNewSyncedEvent: () -> Unit,
|
||||
) {
|
||||
private var subscriptionCount = 0
|
||||
private val mutex = Mutex()
|
||||
|
||||
private val coroutineScope = roomCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber")
|
||||
|
||||
suspend fun subscribeIfNeeded() = mutex.withLock {
|
||||
if (subscriptionCount == 0) {
|
||||
timeline.timelineDiffFlow()
|
||||
.onEach { diffs ->
|
||||
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
|
||||
onNewSyncedEvent()
|
||||
}
|
||||
postDiffs(diffs)
|
||||
}
|
||||
.launchIn(coroutineScope)
|
||||
}
|
||||
subscriptionCount++
|
||||
}
|
||||
|
||||
suspend fun unsubscribeIfNeeded() = mutex.withLock {
|
||||
when (subscriptionCount) {
|
||||
0 -> return@withLock
|
||||
1 -> {
|
||||
coroutineScope.coroutineContext.cancelChildren()
|
||||
}
|
||||
}
|
||||
subscriptionCount--
|
||||
}
|
||||
|
||||
private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
|
||||
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
|
||||
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
|
||||
ensureActive()
|
||||
timelineDiffProcessor.postItems(it)
|
||||
}
|
||||
isInit.set(true)
|
||||
initLatch.complete(Unit)
|
||||
}
|
||||
|
||||
private suspend fun postDiffs(diffs: List<TimelineDiff>) {
|
||||
val diffsToProcess = diffs.toMutableList()
|
||||
if (!isInit.get()) {
|
||||
val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET }
|
||||
if (resetDiff != null) {
|
||||
// Keep using the postItems logic so we can post the timelineItems asap.
|
||||
postItems(resetDiff.reset() ?: emptyList())
|
||||
diffsToProcess.remove(resetDiff)
|
||||
}
|
||||
}
|
||||
initLatch.await()
|
||||
if (diffsToProcess.isNotEmpty()) {
|
||||
timelineDiffProcessor.postDiffs(diffsToProcess)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user