change(room members): makes sure to subscribe to timeline items changes

This commit is contained in:
ganfra
2025-11-07 20:15:18 +01:00
parent 5f2453b128
commit a3c81d5f25
10 changed files with 162 additions and 106 deletions

View File

@@ -54,6 +54,7 @@ interface Timeline : AutoCloseable {
val mode: Mode
val membershipChangeEventReceived: Flow<Unit>
val onSyncedEventReceived: Flow<Unit>
suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result<Unit>
suspend fun markAsRead(receiptType: ReceiptType): Result<Unit>
suspend fun paginate(direction: PaginationDirection): Result<Boolean>
@@ -233,4 +234,5 @@ interface Timeline : AutoCloseable {
* Get the latest event id of the timeline.
*/
suspend fun getLatestEventId(): Result<EventId?>
}

View File

@@ -50,6 +50,7 @@ import io.element.android.libraries.matrix.impl.widget.generateWidgetWebViewUrl
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted.Companion.WhileSubscribed
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.drop
@@ -57,6 +58,7 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.DateDividerMode
import org.matrix.rustcomponents.sdk.IdentityStatusChangeListener
@@ -91,8 +93,6 @@ class JoinedRustRoom(
private val roomDispatcher = coroutineDispatchers.io.limitedParallelism(32)
private val innerRoom = baseRoom.innerRoom
override val syncUpdateFlow = MutableStateFlow(0L)
override val roomTypingMembersFlow: Flow<List<UserId>> = mxCallbackFlow {
val initial = emptyList<UserId>()
channel.trySend(initial)
@@ -135,11 +135,21 @@ class JoinedRustRoom(
override val roomNotificationSettingsStateFlow = MutableStateFlow<RoomNotificationSettingsState>(RoomNotificationSettingsState.Unknown)
override val liveTimeline = liveInnerTimeline.map(mode = Timeline.Mode.Live) {
syncUpdateFlow.value = systemClock.epochMillis()
}
override val liveTimeline = liveInnerTimeline.map(mode = Timeline.Mode.Live)
override val syncUpdateFlow = liveTimeline
.onSyncedEventReceived.map { systemClock.epochMillis() }
.stateIn(
scope = roomCoroutineScope,
started = WhileSubscribed(),
initialValue = systemClock.epochMillis(),
)
init {
subscribeToRoomMembersChange()
}
private fun subscribeToRoomMembersChange() {
val powerLevelChanges = roomInfoFlow.map { it.roomPowerLevels }.distinctUntilChanged()
val membershipChanges = liveTimeline.membershipChangeEventReceived.onStart { emit(Unit) }
combine(membershipChanges, powerLevelChanges) { _, _ -> }
@@ -478,7 +488,6 @@ class JoinedRustRoom(
private fun InnerTimeline.map(
mode: Timeline.Mode,
onNewSyncedEvent: () -> Unit = {},
): Timeline {
val timelineCoroutineScope = roomCoroutineScope.childScope(coroutineDispatchers.main, "TimelineScope-$roomId-$this")
return RustTimeline(
@@ -489,7 +498,6 @@ class JoinedRustRoom(
coroutineScope = timelineCoroutineScope,
dispatcher = roomDispatcher,
roomContentForwarder = roomContentForwarder,
onNewSyncedEvent = onNewSyncedEvent,
)
}
}

View File

@@ -7,9 +7,10 @@
package io.element.android.libraries.matrix.impl.timeline
import androidx.compose.ui.util.fastForEach
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
import io.element.android.libraries.matrix.api.timeline.item.event.RoomMembershipContent
import kotlinx.coroutines.flow.Flow
import io.element.android.libraries.matrix.api.timeline.item.event.TimelineItemEventOrigin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.sync.Mutex
@@ -20,58 +21,60 @@ import timber.log.Timber
internal class MatrixTimelineDiffProcessor(
private val timelineItems: MutableSharedFlow<List<MatrixTimelineItem>>,
private val timelineItemFactory: MatrixTimelineItemMapper,
private val membershipChangeEventReceivedFlow: MutableSharedFlow<Unit>,
private val syncedEventReceivedFlow: MutableSharedFlow<Unit>,
private val timelineItemMapper: MatrixTimelineItemMapper,
) {
private val mutex = Mutex()
private val _membershipChangeEventReceived = MutableSharedFlow<Unit>(extraBufferCapacity = 1)
val membershipChangeEventReceived: Flow<Unit> = _membershipChangeEventReceived
suspend fun postDiffs(diffs: List<TimelineDiff>) {
updateTimelineItems {
mutex.withLock {
Timber.v("Update timeline items from postDiffs (with ${diffs.size} items) on ${Thread.currentThread()}")
diffs.forEach { diff ->
applyDiff(diff)
val result = processDiffs(diffs)
timelineItems.emit(result.items())
if (result.hasNewEventsFromSync()) {
syncedEventReceivedFlow.emit(Unit)
}
if (result.hasMembershipChangeEventFromSync()) {
membershipChangeEventReceivedFlow.emit(Unit)
}
}
}
private suspend fun updateTimelineItems(block: MutableList<MatrixTimelineItem>.() -> Unit) =
mutex.withLock {
val mutableTimelineItems = if (timelineItems.replayCache.isNotEmpty()) {
timelineItems.first().toMutableList()
} else {
mutableListOf()
}
block(mutableTimelineItems)
timelineItems.tryEmit(mutableTimelineItems)
private suspend fun processDiffs(diffs: List<TimelineDiff>): DiffingResult {
val mutableTimelineItems = if (timelineItems.replayCache.isNotEmpty()) {
timelineItems.first().toMutableList()
} else {
mutableListOf()
}
val result = DiffingResult(items = mutableTimelineItems)
diffs.forEach { diff ->
result.applyDiff(diff)
}
return result
}
private fun MutableList<MatrixTimelineItem>.applyDiff(diff: TimelineDiff) {
private fun DiffingResult.applyDiff(diff: TimelineDiff) {
when (diff) {
is TimelineDiff.Append -> {
val items = diff.values.map { it.asMatrixTimelineItem() }
addAll(items)
diff.values.fastForEach { item ->
add(item.map())
}
}
is TimelineDiff.PushBack -> {
val item = diff.value.asMatrixTimelineItem()
if (item is MatrixTimelineItem.Event && item.event.content is RoomMembershipContent) {
// TODO - This is a temporary solution to notify the room screen about membership changes
// Ideally, this should be implemented by the Rust SDK
_membershipChangeEventReceived.tryEmit(Unit)
}
val item = diff.value.map()
add(item)
}
is TimelineDiff.PushFront -> {
val item = diff.value.asMatrixTimelineItem()
val item = diff.value.map()
add(0, item)
}
is TimelineDiff.Set -> {
val item = diff.value.asMatrixTimelineItem()
val item = diff.value.map()
set(diff.index.toInt(), item)
}
is TimelineDiff.Insert -> {
val item = diff.value.asMatrixTimelineItem()
val item = diff.value.map()
add(diff.index.toInt(), item)
}
is TimelineDiff.Remove -> {
@@ -79,25 +82,92 @@ internal class MatrixTimelineDiffProcessor(
}
is TimelineDiff.Reset -> {
clear()
val items = diff.values.map { it.asMatrixTimelineItem() }
addAll(items)
diff.values.fastForEach { item ->
add(item.map())
}
}
TimelineDiff.PopFront -> {
removeFirstOrNull()
removeFirst()
}
TimelineDiff.PopBack -> {
removeLastOrNull()
removeLast()
}
TimelineDiff.Clear -> {
clear()
}
is TimelineDiff.Truncate -> {
subList(diff.length.toInt(), size).clear()
truncate(diff.length.toInt())
}
}
}
private fun TimelineItem.asMatrixTimelineItem(): MatrixTimelineItem {
return timelineItemFactory.map(this)
private fun TimelineItem.map(): MatrixTimelineItem {
return timelineItemMapper.map(this)
}
}
private class DiffingResult(
private val items: MutableList<MatrixTimelineItem>,
private var hasNewEventsFromSync: Boolean = false,
private var hasMembershipChangeEventFromSync: Boolean = false,
) {
fun items(): List<MatrixTimelineItem> = items
fun hasNewEventsFromSync(): Boolean = hasNewEventsFromSync
fun hasMembershipChangeEventFromSync(): Boolean = hasMembershipChangeEventFromSync
fun add(item: MatrixTimelineItem) {
processItem(item)
items.add(item)
}
fun add(index: Int, item: MatrixTimelineItem) {
processItem(item)
items.add(index, item)
}
fun set(index: Int, item: MatrixTimelineItem) {
processItem(item)
items[index] = item
}
fun removeAt(index: Int) {
items.removeAt(index)
}
fun removeFirst() {
items.removeFirstOrNull()
}
fun removeLast() {
items.removeLastOrNull()
}
fun truncate(length: Int) {
items.subList(length, items.size).clear()
}
fun clear() {
items.clear()
}
private fun processItem(item: MatrixTimelineItem) {
if (skipProcessing()) return
when (item) {
is MatrixTimelineItem.Event -> {
if (item.event.origin == TimelineItemEventOrigin.SYNC) {
hasNewEventsFromSync = true
when (item.event.content) {
is RoomMembershipContent -> hasMembershipChangeEventFromSync = true
else -> Unit
}
}
}
else -> Unit
}
}
private fun skipProcessing(): Boolean {
return hasNewEventsFromSync && hasMembershipChangeEventFromSync
}
}

View File

@@ -80,16 +80,18 @@ private const val PAGINATION_SIZE = 50
class RustTimeline(
private val inner: InnerTimeline,
override val mode: Timeline.Mode,
systemClock: SystemClock,
private val systemClock: SystemClock,
private val joinedRoom: JoinedRoom,
private val coroutineScope: CoroutineScope,
private val dispatcher: CoroutineDispatcher,
private val roomContentForwarder: RoomContentForwarder,
onNewSyncedEvent: () -> Unit,
) : Timeline {
private val _timelineItems: MutableSharedFlow<List<MatrixTimelineItem>> =
MutableSharedFlow(replay = 1, extraBufferCapacity = Int.MAX_VALUE)
private val _membershipChangeEventReceived = MutableSharedFlow<Unit>(extraBufferCapacity = 1)
private val _onSyncedEventReceived: MutableSharedFlow<Unit> = MutableSharedFlow(extraBufferCapacity = 1)
private val timelineEventContentMapper = TimelineEventContentMapper()
private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper)
private val timelineItemMapper = MatrixTimelineItemMapper(
@@ -98,18 +100,19 @@ class RustTimeline(
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
eventTimelineItemMapper = EventTimelineItemMapper(
contentMapper = timelineEventContentMapper
)
),
)
private val timelineDiffProcessor = MatrixTimelineDiffProcessor(
timelineItems = _timelineItems,
timelineItemFactory = timelineItemMapper,
membershipChangeEventReceivedFlow = _membershipChangeEventReceived,
syncedEventReceivedFlow = _onSyncedEventReceived,
timelineItemMapper = timelineItemMapper,
)
private val timelineItemsSubscriber = TimelineItemsSubscriber(
timeline = inner,
timelineCoroutineScope = coroutineScope,
timelineDiffProcessor = timelineDiffProcessor,
dispatcher = dispatcher,
onNewSyncedEvent = onNewSyncedEvent,
)
private val roomBeginningPostProcessor = RoomBeginningPostProcessor(mode)
@@ -151,7 +154,13 @@ class RustTimeline(
.launchIn(this)
}
override val membershipChangeEventReceived: Flow<Unit> = timelineDiffProcessor.membershipChangeEventReceived
override val membershipChangeEventReceived: Flow<Unit> = _membershipChangeEventReceived
.onStart { timelineItemsSubscriber.subscribeIfNeeded() }
.onCompletion { timelineItemsSubscriber.unsubscribeIfNeeded() }
override val onSyncedEventReceived: Flow<Unit> = _onSyncedEventReceived
.onStart { timelineItemsSubscriber.subscribeIfNeeded() }
.onCompletion { timelineItemsSubscriber.unsubscribeIfNeeded() }
override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result<Unit> = withContext(dispatcher) {
runCatchingExceptions {

View File

@@ -1,32 +0,0 @@
/*
* Copyright 2023, 2024 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.matrix.impl.timeline
import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineItem
import uniffi.matrix_sdk_ui.EventItemOrigin
/**
* Tries to get an event origin from the TimelineDiff.
* If there is multiple events in the diff, uses the first one as it should be a good indicator.
*/
internal fun TimelineDiff.eventOrigin(): EventItemOrigin? {
return when (this) {
is TimelineDiff.Append -> values.firstOrNull()?.eventOrigin()
is TimelineDiff.PushBack -> value.eventOrigin()
is TimelineDiff.PushFront -> value.eventOrigin()
is TimelineDiff.Set -> value.eventOrigin()
is TimelineDiff.Insert -> value.eventOrigin()
is TimelineDiff.Reset -> values.firstOrNull()?.eventOrigin()
else -> null
}
}
private fun TimelineItem.eventOrigin(): EventItemOrigin? {
return asEvent()?.origin
}

View File

@@ -11,13 +11,11 @@ import io.element.android.libraries.core.coroutine.childScope
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.matrix.rustcomponents.sdk.Timeline
import uniffi.matrix_sdk_ui.EventItemOrigin
/**
* This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor.
@@ -28,7 +26,6 @@ internal class TimelineItemsSubscriber(
dispatcher: CoroutineDispatcher,
private val timeline: Timeline,
private val timelineDiffProcessor: MatrixTimelineDiffProcessor,
private val onNewSyncedEvent: () -> Unit,
) {
private var subscriptionCount = 0
private val mutex = Mutex()
@@ -43,9 +40,6 @@ internal class TimelineItemsSubscriber(
if (subscriptionCount == 0) {
timeline.timelineDiffFlow()
.onEach { diffs ->
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
onNewSyncedEvent()
}
timelineDiffProcessor.postDiffs(diffs)
}
.launchIn(coroutineScope)

View File

@@ -168,10 +168,12 @@ class MatrixTimelineDiffProcessorTest {
}
internal fun TestScope.createMatrixTimelineDiffProcessor(
timelineItems: MutableSharedFlow<List<MatrixTimelineItem>>,
): MatrixTimelineDiffProcessor {
timelineItems: MutableSharedFlow<List<MatrixTimelineItem>> = MutableSharedFlow(),
membershipChangeEventReceivedFlow: MutableSharedFlow<Unit> = MutableSharedFlow(),
syncedEventReceivedFlow: MutableSharedFlow<Unit> = MutableSharedFlow(),
): MatrixTimelineDiffProcessor {
val timelineEventContentMapper = TimelineEventContentMapper()
val timelineItemMapper = MatrixTimelineItemMapper(
val timelineItemFactory = MatrixTimelineItemMapper(
fetchDetailsForEvent = { _ -> Result.success(Unit) },
coroutineScope = this,
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
@@ -181,6 +183,8 @@ internal fun TestScope.createMatrixTimelineDiffProcessor(
)
return MatrixTimelineDiffProcessor(
timelineItems = timelineItems,
timelineItemFactory = timelineItemMapper,
membershipChangeEventReceivedFlow = membershipChangeEventReceivedFlow,
syncedEventReceivedFlow = syncedEventReceivedFlow,
timelineItemMapper = timelineItemFactory,
)
}

View File

@@ -98,7 +98,6 @@ private fun TestScope.createRustTimeline(
coroutineScope: CoroutineScope = backgroundScope,
dispatcher: CoroutineDispatcher = testCoroutineDispatchers().io,
roomContentForwarder: RoomContentForwarder = RoomContentForwarder(FakeFfiRoomListService()),
onNewSyncedEvent: () -> Unit = {},
): RustTimeline {
return RustTimeline(
inner = inner,
@@ -108,6 +107,5 @@ private fun TestScope.createRustTimeline(
coroutineScope = coroutineScope,
dispatcher = dispatcher,
roomContentForwarder = roomContentForwarder,
onNewSyncedEvent = onNewSyncedEvent,
)
}

View File

@@ -13,8 +13,6 @@ import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
import io.element.android.libraries.matrix.impl.fixtures.factories.aRustEventTimelineItem
import io.element.android.libraries.matrix.impl.fixtures.fakes.FakeFfiTimeline
import io.element.android.libraries.matrix.impl.fixtures.fakes.FakeFfiTimelineItem
import io.element.android.tests.testutils.lambda.lambdaError
import io.element.android.tests.testutils.lambda.lambdaRecorder
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.test.StandardTestDispatcher
@@ -35,9 +33,12 @@ class TimelineItemsSubscriberTest {
val timelineItems: MutableSharedFlow<List<MatrixTimelineItem>> =
MutableSharedFlow(replay = 1, extraBufferCapacity = Int.MAX_VALUE)
val timeline = FakeFfiTimeline()
val diffProcessor = createMatrixTimelineDiffProcessor(
timelineItems = timelineItems,
)
val timelineItemsSubscriber = createTimelineItemsSubscriber(
timeline = timeline,
timelineItems = timelineItems,
timelineDiffProcessor = diffProcessor,
)
timelineItems.test {
timelineItemsSubscriber.subscribeIfNeeded()
@@ -56,9 +57,12 @@ class TimelineItemsSubscriberTest {
val timelineItems: MutableSharedFlow<List<MatrixTimelineItem>> =
MutableSharedFlow(replay = 1, extraBufferCapacity = Int.MAX_VALUE)
val timeline = FakeFfiTimeline()
val diffProcessor = createMatrixTimelineDiffProcessor(
timelineItems = timelineItems,
)
val timelineItemsSubscriber = createTimelineItemsSubscriber(
timeline = timeline,
timelineItems = timelineItems,
timelineDiffProcessor = diffProcessor,
)
timelineItems.test {
timelineItemsSubscriber.subscribeIfNeeded()
@@ -73,15 +77,16 @@ class TimelineItemsSubscriberTest {
@Ignore("JNA direct mapping has broken unit tests with FFI fakes")
@Test
fun `when timeline emits an item with SYNC origin, the callback onNewSyncedEvent is invoked`() = runTest {
fun `when timeline emits an item with SYNC origin`() = runTest {
val timelineItems: MutableSharedFlow<List<MatrixTimelineItem>> =
MutableSharedFlow(replay = 1, extraBufferCapacity = Int.MAX_VALUE)
val timeline = FakeFfiTimeline()
val onNewSyncedEventRecorder = lambdaRecorder<Unit> { }
val diffProcessor = createMatrixTimelineDiffProcessor(
timelineItems = timelineItems,
)
val timelineItemsSubscriber = createTimelineItemsSubscriber(
timeline = timeline,
timelineItems = timelineItems,
onNewSyncedEvent = onNewSyncedEventRecorder,
timelineDiffProcessor = diffProcessor,
)
timelineItems.test {
timelineItemsSubscriber.subscribeIfNeeded()
@@ -100,7 +105,6 @@ class TimelineItemsSubscriberTest {
assertThat(final).isNotEmpty()
timelineItemsSubscriber.unsubscribeIfNeeded()
}
onNewSyncedEventRecorder.assertions().isCalledOnce()
}
@Ignore("JNA direct mapping has broken unit tests with FFI fakes")
@@ -116,14 +120,12 @@ class TimelineItemsSubscriberTest {
private fun TestScope.createTimelineItemsSubscriber(
timeline: Timeline = FakeFfiTimeline(),
timelineItems: MutableSharedFlow<List<MatrixTimelineItem>> = MutableSharedFlow(replay = 1, extraBufferCapacity = Int.MAX_VALUE),
onNewSyncedEvent: () -> Unit = { lambdaError() },
timelineDiffProcessor: MatrixTimelineDiffProcessor = createMatrixTimelineDiffProcessor(),
): TimelineItemsSubscriber {
return TimelineItemsSubscriber(
timelineCoroutineScope = backgroundScope,
dispatcher = StandardTestDispatcher(testScheduler),
timeline = timeline,
timelineDiffProcessor = createMatrixTimelineDiffProcessor(timelineItems),
onNewSyncedEvent = onNewSyncedEvent,
timelineDiffProcessor = timelineDiffProcessor,
)
}

View File

@@ -47,6 +47,7 @@ class FakeTimeline(
)
),
override val membershipChangeEventReceived: Flow<Unit> = MutableSharedFlow(),
override val onSyncedEventReceived: Flow<Unit> = MutableSharedFlow(),
private val cancelSendResult: (TransactionId) -> Result<Unit> = { lambdaError() },
override val mode: Timeline.Mode = Timeline.Mode.Live,
private val markAsReadResult: (ReceiptType) -> Result<Unit> = { lambdaError() },