Try to fix some issues with roomList and timeline...

This commit is contained in:
ganfra
2022-11-14 18:06:44 +01:00
parent 089e1da4b7
commit bf57b54fee
10 changed files with 107 additions and 78 deletions

View File

@@ -67,6 +67,7 @@ class MessagesViewModel(
}
private fun handleInit() {
timeline.initialize()
room.syncUpdateFlow()
.onEach {
val avatarData =
@@ -97,5 +98,6 @@ class MessagesViewModel(
override fun onCleared() {
super.onCleared()
timeline.dispose()
}
}

View File

@@ -6,4 +6,5 @@ data class CoroutineDispatchers(
val io: CoroutineDispatcher,
val computation: CoroutineDispatcher,
val main: CoroutineDispatcher,
val diffUpdateDispatcher: CoroutineDispatcher,
)

View File

@@ -6,14 +6,15 @@ import io.element.android.x.matrix.session.SessionStore
import io.element.android.x.matrix.util.logError
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.AuthenticationService
import org.matrix.rustcomponents.sdk.Client
import org.matrix.rustcomponents.sdk.ClientBuilder
import java.io.File
import java.util.*
import java.util.concurrent.Executors
class Matrix(
private val coroutineScope: CoroutineScope,
@@ -22,7 +23,8 @@ class Matrix(
private val coroutineDispatchers = CoroutineDispatchers(
io = Dispatchers.IO,
computation = Dispatchers.Default,
main = Dispatchers.Main
main = Dispatchers.Main,
diffUpdateDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
)
private val baseFolder = File(context.filesDir, "matrix")
private val sessionStore = SessionStore(context)

View File

@@ -57,7 +57,7 @@ class MatrixClient internal constructor(
.addView(slidingSyncView)
.build()
private val slidingSyncObserverProxy = SlidingSyncObserverProxy(coroutineScope)
private val slidingSyncObserverProxy = SlidingSyncObserverProxy(coroutineScope, dispatchers)
private val roomSummaryDataSource: RustRoomSummaryDataSource =
RustRoomSummaryDataSource(
slidingSyncObserverProxy.updateSummaryFlow,
@@ -81,6 +81,7 @@ class MatrixClient internal constructor(
slidingSyncUpdateFlow = slidingSyncObserverProxy.updateSummaryFlow,
slidingSyncRoom = slidingSyncRoom,
room = room,
coroutineScope = coroutineScope,
coroutineDispatchers = dispatchers
)
}

View File

@@ -4,6 +4,7 @@ import io.element.android.x.core.coroutine.CoroutineDispatchers
import io.element.android.x.matrix.core.RoomId
import io.element.android.x.matrix.core.UserId
import io.element.android.x.matrix.timeline.MatrixTimeline
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
@@ -15,6 +16,7 @@ class MatrixRoom(
private val slidingSyncUpdateFlow: Flow<UpdateSummary>,
private val slidingSyncRoom: SlidingSyncRoom,
private val room: Room,
private val coroutineScope: CoroutineScope,
private val coroutineDispatchers: CoroutineDispatchers,
) {
@@ -28,7 +30,7 @@ class MatrixRoom(
}
fun timeline(): MatrixTimeline {
return MatrixTimeline(this, room, coroutineDispatchers)
return MatrixTimeline(this, room, coroutineScope, coroutineDispatchers)
}
val roomId = RoomId(room.id())

View File

@@ -1,17 +1,21 @@
package io.element.android.x.matrix.room
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineListener
fun Room.timelineDiff(): Flow<TimelineDiff> = callbackFlow {
fun Room.timelineDiff(scope: CoroutineScope): Flow<TimelineDiff> = callbackFlow {
val listener = object : TimelineListener {
override fun onUpdate(update: TimelineDiff) {
trySend(update)
scope.launch {
send(update)
}
}
}
addTimelineListener(listener)

View File

@@ -9,7 +9,6 @@ import org.matrix.rustcomponents.sdk.*
import timber.log.Timber
import java.io.Closeable
import java.util.*
import java.util.concurrent.Executors
interface RoomSummaryDataSource {
fun roomSummaries(): Flow<List<RoomSummary>>
@@ -23,8 +22,7 @@ internal class RustRoomSummaryDataSource(
private val roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory()
) : RoomSummaryDataSource, Closeable {
private val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val coroutineScope = CoroutineScope(SupervisorJob() + singleDispatcher)
private val coroutineScope = CoroutineScope(SupervisorJob() + coroutineDispatchers.io)
private val roomSummaries = MutableStateFlow<List<RoomSummary>>(emptyList())
private val state = MutableStateFlow(SlidingSyncState.COLD)
@@ -32,30 +30,31 @@ internal class RustRoomSummaryDataSource(
fun startSync() {
coroutineScope.launch {
updateRoomSummaries {
clear()
addAll(
slidingSyncView.currentRoomsList().map(::buildSummaryForRoomListEntry)
)
}
slidingSyncView.roomListDiff()
.onEach { diffs ->
updateRoomSummaries {
applyDiff(diffs)
}
}.collect()
slidingSyncView.state()
.onEach { slidingSyncState ->
Timber.v("New sliding sync state: $slidingSyncState")
state.value = slidingSyncState
}.collect()
slidingSyncUpdateFlow
.onEach {
didReceiveSyncUpdate(it)
}.collect()
}
slidingSyncUpdateFlow
.onEach {
didReceiveSyncUpdate(it)
}.launchIn(coroutineScope)
slidingSyncView.roomListDiff(coroutineScope)
.onEach { diffs ->
updateRoomSummaries {
applyDiff(diffs)
}
}
.launchIn(coroutineScope)
slidingSyncView.state(coroutineScope)
.onEach { slidingSyncState ->
Timber.v("New sliding sync state: $slidingSyncState")
state.value = slidingSyncState
}.launchIn(coroutineScope)
}
fun stopSync() {
@@ -70,7 +69,7 @@ internal class RustRoomSummaryDataSource(
return roomSummaries.sample(50)
}
private fun didReceiveSyncUpdate(summary: UpdateSummary) {
private suspend fun didReceiveSyncUpdate(summary: UpdateSummary) {
Timber.v("UpdateRooms with identifiers: ${summary.rooms}")
if (state.value != SlidingSyncState.LIVE) {
return
@@ -141,19 +140,18 @@ internal class RustRoomSummaryDataSource(
)
}
private fun updateRoomSummaries(block: MutableList<RoomSummary>.() -> Unit) {
private suspend fun updateRoomSummaries(block: MutableList<RoomSummary>.() -> Unit) = withContext(coroutineDispatchers.diffUpdateDispatcher){
val mutableRoomSummaries = roomSummaries.value.toMutableList()
block(mutableRoomSummaries)
roomSummaries.value = mutableRoomSummaries
}
}
fun SlidingSyncViewRoomsListDiff.isInvalidation(): Boolean {
return when (this) {
is SlidingSyncViewRoomsListDiff.InsertAt -> this.value is RoomListEntry.Invalidated
is SlidingSyncViewRoomsListDiff.UpdateAt -> this.value is RoomListEntry.Invalidated
is SlidingSyncViewRoomsListDiff.Push -> this.value is RoomListEntry.Invalidated
else -> false
fun SlidingSyncViewRoomsListDiff.isInvalidation(): Boolean {
return when (this) {
is SlidingSyncViewRoomsListDiff.InsertAt -> this.value is RoomListEntry.Invalidated
is SlidingSyncViewRoomsListDiff.UpdateAt -> this.value is RoomListEntry.Invalidated
is SlidingSyncViewRoomsListDiff.Push -> this.value is RoomListEntry.Invalidated
else -> false
}
}
}
}

View File

@@ -1,21 +1,28 @@
package io.element.android.x.matrix.sync
import io.element.android.x.core.coroutine.CoroutineDispatchers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import org.matrix.rustcomponents.sdk.SlidingSyncObserver
import org.matrix.rustcomponents.sdk.UpdateSummary
import org.matrix.rustcomponents.sdk.setupTracing
class SlidingSyncObserverProxy(private val coroutineScope: CoroutineScope) : SlidingSyncObserver {
// Sounds like a reasonable buffer size before it suspends emitting new items.
private const val BUFFER_SIZE = 64
class SlidingSyncObserverProxy(
private val coroutineScope: CoroutineScope,
private val coroutineDispatchers: CoroutineDispatchers
) : SlidingSyncObserver {
private val updateSummaryMutableFlow = MutableSharedFlow<UpdateSummary>()
val updateSummaryFlow: Flow<UpdateSummary> = updateSummaryMutableFlow
private val updateSummaryMutableFlow =
MutableSharedFlow<UpdateSummary>(extraBufferCapacity = BUFFER_SIZE)
val updateSummaryFlow: SharedFlow<UpdateSummary> = updateSummaryMutableFlow.asSharedFlow()
override fun didReceiveSyncUpdate(summary: UpdateSummary) {
if (summary.rooms.isEmpty()) return
coroutineScope.launch {
coroutineScope.launch(coroutineDispatchers.io) {
updateSummaryMutableFlow.emit(summary)
}
}

View File

@@ -1,31 +1,40 @@
package io.element.android.x.matrix.sync
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import mxCallbackFlow
import org.matrix.rustcomponents.sdk.*
fun SlidingSyncView.roomListDiff(): Flow<SlidingSyncViewRoomsListDiff> = mxCallbackFlow {
val observer = object : SlidingSyncViewRoomListObserver {
override fun didReceiveUpdate(diff: SlidingSyncViewRoomsListDiff) {
trySend(diff)
fun SlidingSyncView.roomListDiff(scope: CoroutineScope): Flow<SlidingSyncViewRoomsListDiff> =
mxCallbackFlow {
val observer = object : SlidingSyncViewRoomListObserver {
override fun didReceiveUpdate(diff: SlidingSyncViewRoomsListDiff) {
scope.launch {
send(diff)
}
}
}
observeRoomList(observer)
}
observeRoomList(observer)
}
fun SlidingSyncView.state(): Flow<SlidingSyncState> = mxCallbackFlow {
fun SlidingSyncView.state(scope: CoroutineScope): Flow<SlidingSyncState> = mxCallbackFlow {
val observer = object : SlidingSyncViewStateObserver {
override fun didReceiveUpdate(newState: SlidingSyncState) {
trySend(newState)
scope.launch {
send(newState)
}
}
}
observeState(observer)
}
fun SlidingSyncView.roomsCount(): Flow<UInt> = mxCallbackFlow {
fun SlidingSyncView.roomsCount(scope: CoroutineScope): Flow<UInt> = mxCallbackFlow {
val observer = object : SlidingSyncViewRoomsCountObserver {
override fun didReceiveUpdate(count: UInt) {
trySend(count)
scope.launch {
send(count)
}
}
}
observeRoomsCount(observer)

View File

@@ -1,11 +1,13 @@
package io.element.android.x.matrix.timeline
import io.element.android.x.core.coroutine.CoroutineDispatchers
import io.element.android.x.core.data.flow.chunk
import io.element.android.x.matrix.core.EventId
import io.element.android.x.matrix.room.MatrixRoom
import io.element.android.x.matrix.room.timelineDiff
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.*
import timber.log.Timber
@@ -14,8 +16,9 @@ import java.util.*
class MatrixTimeline(
private val matrixRoom: MatrixRoom,
private val room: Room,
private val coroutineScope: CoroutineScope,
private val coroutineDispatchers: CoroutineDispatchers,
) {
) : TimelineListener {
interface Callback {
fun onUpdatedTimelineItem(eventId: EventId)
fun onStartedBackPaginating()
@@ -30,9 +33,7 @@ class MatrixTimeline(
fun timelineItems(): Flow<List<MatrixTimelineItem>> {
return diffFlow().combine(timelineItems) { _, _ ->
timelineItems.value
}.sample(50)
return timelineItems.sample(50)
}
val hasMoreToLoad: Boolean
@@ -41,17 +42,6 @@ class MatrixTimeline(
}
private fun diffFlow(): Flow<Unit> {
return room.timelineDiff()
.onEach { timelineDiffs ->
//Timber.v("Apply ${timelineDiffs.size} diffs on thread: ${Thread.currentThread()}")
updateTimelineItems {
applyDiff(timelineDiffs)
}
}.map { }
.flowOn(coroutineDispatchers.computation)
}
private fun MutableList<MatrixTimelineItem>.applyDiff(diff: TimelineDiff) {
when (diff.change()) {
TimelineChange.PUSH -> {
@@ -107,16 +97,21 @@ class MatrixTimeline(
}
}
private fun updateTimelineItems(block: MutableList<MatrixTimelineItem>.() -> Unit) {
val mutableTimelineItems = timelineItems.value.toMutableList()
block(mutableTimelineItems)
timelineItems.value = mutableTimelineItems
}
private suspend fun updateTimelineItems(block: MutableList<MatrixTimelineItem>.() -> Unit) =
withContext(coroutineDispatchers.diffUpdateDispatcher) {
val mutableTimelineItems = timelineItems.value.toMutableList()
block(mutableTimelineItems)
timelineItems.value = mutableTimelineItems
}
fun addListener(timelineListener: TimelineListener) {
room.addTimelineListener(timelineListener)
}
fun initialize() {
addListener(this)
}
fun dispose() {
room.removeTimeline()
}
@@ -128,4 +123,12 @@ class MatrixTimeline(
return matrixRoom.sendMessage(message)
}
override fun onUpdate(update: TimelineDiff) {
coroutineScope.launch {
updateTimelineItems {
applyDiff(update)
}
}
}
}