Performance : add cache on roomListItem and fullRoom
This commit is contained in:
@@ -196,8 +196,6 @@ class RustMatrixRoom(
|
||||
override fun destroy() {
|
||||
roomCoroutineScope.cancel()
|
||||
liveTimeline.close()
|
||||
innerRoom.destroy()
|
||||
roomListItem.destroy()
|
||||
}
|
||||
|
||||
override val displayName: String
|
||||
@@ -627,12 +625,13 @@ class RustMatrixRoom(
|
||||
isLive: Boolean,
|
||||
onNewSyncedEvent: () -> Unit = {},
|
||||
): Timeline {
|
||||
val timelineCoroutineScope = roomCoroutineScope.childScope(coroutineDispatchers.main, "TimelineScope-$roomId-$timeline")
|
||||
return RustTimeline(
|
||||
isKeyBackupEnabled = isKeyBackupEnabled,
|
||||
isLive = isLive,
|
||||
matrixRoom = this,
|
||||
systemClock = systemClock,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
coroutineScope = timelineCoroutineScope,
|
||||
dispatcher = roomDispatcher,
|
||||
lastLoginTimestamp = sessionData.loginTimestamp,
|
||||
onNewSyncedEvent = onNewSyncedEvent,
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package io.element.android.libraries.matrix.impl.room
|
||||
|
||||
import androidx.collection.lruCache
|
||||
import io.element.android.appconfig.TimelineConfig
|
||||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
|
||||
import io.element.android.libraries.matrix.api.core.RoomId
|
||||
@@ -41,6 +42,8 @@ import org.matrix.rustcomponents.sdk.TimelineEventTypeFilter
|
||||
import timber.log.Timber
|
||||
import org.matrix.rustcomponents.sdk.RoomListService as InnerRoomListService
|
||||
|
||||
private const val CACHE_SIZE = 16
|
||||
|
||||
class RustRoomFactory(
|
||||
private val sessionId: SessionId,
|
||||
private val notificationSettingsService: NotificationSettingsService,
|
||||
@@ -55,8 +58,23 @@ class RustRoomFactory(
|
||||
private val getSessionData: suspend () -> SessionData,
|
||||
) {
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
private val createRoomDispatcher = dispatchers.io.limitedParallelism(1)
|
||||
private val dispatcher = dispatchers.io.limitedParallelism(1)
|
||||
private val mutex = Mutex()
|
||||
private var isDestroyed: Boolean = false
|
||||
|
||||
private data class RustRoomObjects(
|
||||
val roomListItem: RoomListItem,
|
||||
val fullRoom: Room,
|
||||
)
|
||||
|
||||
private val cache = lruCache<RoomId, RustRoomObjects>(
|
||||
maxSize = CACHE_SIZE,
|
||||
onEntryRemoved = { evicted, roomId, oldRoom, _ ->
|
||||
Timber.d("On room removed from cache: $roomId, evicted: $evicted")
|
||||
oldRoom.roomListItem.close()
|
||||
oldRoom.fullRoom.close()
|
||||
}
|
||||
)
|
||||
|
||||
private val matrixRoomInfoMapper = MatrixRoomInfoMapper()
|
||||
|
||||
@@ -70,30 +88,41 @@ class RustRoomFactory(
|
||||
)
|
||||
}
|
||||
|
||||
suspend fun create(roomId: RoomId): MatrixRoom? = withContext(createRoomDispatcher) {
|
||||
var cachedPairOfRoom: Pair<RoomListItem, Room>?
|
||||
mutex.withLock {
|
||||
// Check if already in memory...
|
||||
cachedPairOfRoom = pairOfRoom(roomId)
|
||||
if (cachedPairOfRoom == null) {
|
||||
// ... otherwise, lets wait for the SS to load all rooms and check again.
|
||||
roomListService.allRooms.awaitLoaded()
|
||||
cachedPairOfRoom = pairOfRoom(roomId)
|
||||
suspend fun destroy() {
|
||||
withContext(dispatcher) {
|
||||
mutex.withLock {
|
||||
Timber.d("Destroying room factory")
|
||||
cache.evictAll()
|
||||
isDestroyed = true
|
||||
}
|
||||
}
|
||||
if (cachedPairOfRoom == null) {
|
||||
Timber.d("No room found for $roomId")
|
||||
return@withContext null
|
||||
}
|
||||
cachedPairOfRoom?.let { (roomListItem, fullRoom) ->
|
||||
}
|
||||
|
||||
suspend fun create(roomId: RoomId): MatrixRoom? = withContext(dispatcher) {
|
||||
mutex.withLock {
|
||||
if (isDestroyed) {
|
||||
Timber.d("Room factory is destroyed, returning null for $roomId")
|
||||
return@withContext null
|
||||
}
|
||||
var roomObjects: RustRoomObjects? = getRoomObjects(roomId)
|
||||
if (roomObjects == null) {
|
||||
// ... otherwise, lets wait for the SS to load all rooms and check again.
|
||||
roomListService.allRooms.awaitLoaded()
|
||||
roomObjects = getRoomObjects(roomId)
|
||||
}
|
||||
if (roomObjects == null) {
|
||||
Timber.d("No room found for $roomId, returning null")
|
||||
return@withContext null
|
||||
}
|
||||
val liveTimeline = roomObjects.fullRoom.timeline()
|
||||
RustMatrixRoom(
|
||||
sessionId = sessionId,
|
||||
isKeyBackupEnabled = isKeyBackupEnabled(),
|
||||
roomListItem = roomListItem,
|
||||
innerRoom = fullRoom,
|
||||
innerTimeline = fullRoom.timeline(),
|
||||
notificationSettingsService = notificationSettingsService,
|
||||
roomListItem = roomObjects.roomListItem,
|
||||
innerRoom = roomObjects.fullRoom,
|
||||
innerTimeline = liveTimeline,
|
||||
sessionCoroutineScope = sessionCoroutineScope,
|
||||
notificationSettingsService = notificationSettingsService,
|
||||
coroutineDispatchers = dispatchers,
|
||||
systemClock = systemClock,
|
||||
roomContentForwarder = roomContentForwarder,
|
||||
@@ -104,20 +133,28 @@ class RustRoomFactory(
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun pairOfRoom(roomId: RoomId): Pair<RoomListItem, Room>? {
|
||||
val cachedRoomListItem = innerRoomListService.roomOrNull(roomId.value)
|
||||
private suspend fun getRoomObjects(roomId: RoomId): RustRoomObjects? {
|
||||
cache[roomId]?.let {
|
||||
Timber.d("Room found in cache for $roomId")
|
||||
return it
|
||||
}
|
||||
val roomListItem = innerRoomListService.roomOrNull(roomId.value)
|
||||
if (roomListItem == null) {
|
||||
Timber.d("Room not found for $roomId")
|
||||
return null
|
||||
}
|
||||
val fullRoom = try {
|
||||
cachedRoomListItem?.fullRoomWithTimeline(filter = eventFilters)
|
||||
roomListItem.fullRoomWithTimeline(filter = eventFilters)
|
||||
} catch (e: RoomListException) {
|
||||
Timber.e(e, "Failed to get full room with timeline for $roomId")
|
||||
null
|
||||
return null
|
||||
}
|
||||
return if (cachedRoomListItem == null || fullRoom == null) {
|
||||
Timber.d("No room cached for $roomId")
|
||||
null
|
||||
} else {
|
||||
Timber.d("Found room cached for $roomId")
|
||||
Pair(cachedRoomListItem, fullRoom)
|
||||
Timber.d("Got full room with timeline for $roomId")
|
||||
return RustRoomObjects(
|
||||
roomListItem = roomListItem,
|
||||
fullRoom = fullRoom,
|
||||
).also {
|
||||
cache.put(roomId, it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
|
||||
class MatrixTimelineItemMapper(
|
||||
private val fetchDetailsForEvent: suspend (EventId) -> Result<Unit>,
|
||||
private val roomCoroutineScope: CoroutineScope,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val virtualTimelineItemMapper: VirtualTimelineItemMapper = VirtualTimelineItemMapper(),
|
||||
private val eventTimelineItemMapper: EventTimelineItemMapper = EventTimelineItemMapper(),
|
||||
) {
|
||||
@@ -49,7 +49,7 @@ class MatrixTimelineItemMapper(
|
||||
return MatrixTimelineItem.Other
|
||||
}
|
||||
|
||||
private fun fetchEventDetails(eventId: EventId) = roomCoroutineScope.launch {
|
||||
private fun fetchEventDetails(eventId: EventId) = coroutineScope.launch {
|
||||
fetchDetailsForEvent(eventId)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,13 +25,13 @@ import kotlinx.coroutines.flow.buffer
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import org.matrix.rustcomponents.sdk.PaginationStatusListener
|
||||
import org.matrix.rustcomponents.sdk.Timeline
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineInterface
|
||||
import org.matrix.rustcomponents.sdk.TimelineListener
|
||||
import timber.log.Timber
|
||||
import uniffi.matrix_sdk_ui.LiveBackPaginationStatus
|
||||
|
||||
internal fun Timeline.liveBackPaginationStatus(): Flow<LiveBackPaginationStatus> = callbackFlow {
|
||||
internal fun TimelineInterface.liveBackPaginationStatus(): Flow<LiveBackPaginationStatus> = callbackFlow {
|
||||
val listener = object : PaginationStatusListener {
|
||||
override fun onUpdate(status: LiveBackPaginationStatus) {
|
||||
trySend(status)
|
||||
@@ -45,7 +45,7 @@ internal fun Timeline.liveBackPaginationStatus(): Flow<LiveBackPaginationStatus>
|
||||
Timber.d(it, "liveBackPaginationStatus() failed")
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
||||
internal fun Timeline.timelineDiffFlow(): Flow<List<TimelineDiff>> =
|
||||
internal fun TimelineInterface.timelineDiffFlow(): Flow<List<TimelineDiff>> =
|
||||
callbackFlow {
|
||||
val listener = object : TimelineListener {
|
||||
override fun onUpdate(diff: List<TimelineDiff>) {
|
||||
@@ -62,7 +62,7 @@ internal fun Timeline.timelineDiffFlow(): Flow<List<TimelineDiff>> =
|
||||
Timber.d(it, "timelineDiffFlow() failed")
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
||||
internal suspend fun Timeline.runWithTimelineListenerRegistered(action: suspend () -> Unit) {
|
||||
internal suspend fun TimelineInterface.runWithTimelineListenerRegistered(action: suspend () -> Unit) {
|
||||
val result = addListener(NoOpTimelineListener)
|
||||
try {
|
||||
action()
|
||||
|
||||
@@ -56,6 +56,7 @@ import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.NonCancellable
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
@@ -64,6 +65,7 @@ import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.getAndUpdate
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onCompletion
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.launch
|
||||
@@ -88,9 +90,9 @@ class RustTimeline(
|
||||
private val inner: InnerTimeline,
|
||||
private val isLive: Boolean,
|
||||
systemClock: SystemClock,
|
||||
roomCoroutineScope: CoroutineScope,
|
||||
isKeyBackupEnabled: Boolean,
|
||||
private val matrixRoom: MatrixRoom,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val dispatcher: CoroutineDispatcher,
|
||||
lastLoginTimestamp: Date?,
|
||||
private val roomContentForwarder: RoomContentForwarder,
|
||||
@@ -106,7 +108,7 @@ class RustTimeline(
|
||||
private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper)
|
||||
private val timelineItemMapper = MatrixTimelineItemMapper(
|
||||
fetchDetailsForEvent = this::fetchDetailsForEvent,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
coroutineScope = coroutineScope,
|
||||
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
|
||||
eventTimelineItemMapper = EventTimelineItemMapper(
|
||||
contentMapper = timelineEventContentMapper
|
||||
@@ -124,7 +126,7 @@ class RustTimeline(
|
||||
)
|
||||
private val timelineItemsSubscriber = TimelineItemsSubscriber(
|
||||
timeline = inner,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
timelineCoroutineScope = coroutineScope,
|
||||
timelineDiffProcessor = timelineDiffProcessor,
|
||||
initLatch = initLatch,
|
||||
isInit = isInit,
|
||||
@@ -145,13 +147,11 @@ class RustTimeline(
|
||||
)
|
||||
|
||||
init {
|
||||
roomCoroutineScope.launch(dispatcher) {
|
||||
fetchMembers()
|
||||
if (isLive) {
|
||||
// When timeline is live, we need to listen to the back pagination status as
|
||||
// sdk can automatically paginate backwards.
|
||||
registerBackPaginationStatusListener()
|
||||
}
|
||||
coroutineScope.fetchMembers()
|
||||
if (isLive) {
|
||||
// When timeline is live, we need to listen to the back pagination status as
|
||||
// sdk can automatically paginate backwards.
|
||||
coroutineScope.registerBackPaginationStatusListener()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,9 +243,12 @@ class RustTimeline(
|
||||
}
|
||||
}.onStart {
|
||||
timelineItemsSubscriber.subscribeIfNeeded()
|
||||
}.onCompletion {
|
||||
timelineItemsSubscriber.unsubscribeIfNeeded()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
coroutineScope.cancel()
|
||||
inner.close()
|
||||
}
|
||||
|
||||
|
||||
@@ -40,10 +40,9 @@ private const val INITIAL_MAX_SIZE = 50
|
||||
* This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor.
|
||||
* It will also trigger a callback when a new synced event is received.
|
||||
* It will also handle the initial items and make sure they are posted before any diff.
|
||||
* When closing the room subscription, it will also unsubscribe automatically.
|
||||
*/
|
||||
internal class TimelineItemsSubscriber(
|
||||
roomCoroutineScope: CoroutineScope,
|
||||
timelineCoroutineScope: CoroutineScope,
|
||||
dispatcher: CoroutineDispatcher,
|
||||
private val timeline: Timeline,
|
||||
private val timelineDiffProcessor: MatrixTimelineDiffProcessor,
|
||||
@@ -54,8 +53,12 @@ internal class TimelineItemsSubscriber(
|
||||
private var subscriptionCount = 0
|
||||
private val mutex = Mutex()
|
||||
|
||||
private val coroutineScope = roomCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber")
|
||||
private val coroutineScope = timelineCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber")
|
||||
|
||||
/**
|
||||
* Add a subscription to the timeline and start posting items/diffs to the timelineDiffProcessor.
|
||||
* It will also trigger a callback when a new synced event is received.
|
||||
*/
|
||||
suspend fun subscribeIfNeeded() = mutex.withLock {
|
||||
if (subscriptionCount == 0) {
|
||||
timeline.timelineDiffFlow()
|
||||
@@ -70,6 +73,11 @@ internal class TimelineItemsSubscriber(
|
||||
subscriptionCount++
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a subscription to the timeline and unsubscribe if needed.
|
||||
* The timeline will be unsubscribed when the last subscription is removed.
|
||||
* If the timelineCoroutineScope is cancelled, the timeline will be unsubscribed automatically.
|
||||
*/
|
||||
suspend fun unsubscribeIfNeeded() = mutex.withLock {
|
||||
when (subscriptionCount) {
|
||||
0 -> return@withLock
|
||||
|
||||
Reference in New Issue
Block a user