RoomList: make the main room list working (WIP)
This commit is contained in:
@@ -19,6 +19,15 @@ package io.element.android.libraries.matrix.api.room
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
|
||||
interface RoomSummaryDataSource {
|
||||
|
||||
enum class LoadingState {
|
||||
NotLoaded,
|
||||
PreLoaded,
|
||||
PartiallyLoaded,
|
||||
FullyLoaded,
|
||||
}
|
||||
|
||||
fun loadingState(): StateFlow<LoadingState>
|
||||
fun roomSummaries(): StateFlow<List<RoomSummary>>
|
||||
fun setSlidingSyncRange(range: IntRange)
|
||||
}
|
||||
|
||||
@@ -31,7 +31,6 @@ import io.element.android.libraries.matrix.api.pusher.PushersService
|
||||
import io.element.android.libraries.matrix.api.room.MatrixRoom
|
||||
import io.element.android.libraries.matrix.api.room.RoomMembershipObserver
|
||||
import io.element.android.libraries.matrix.api.room.RoomSummaryDataSource
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.EventType
|
||||
import io.element.android.libraries.matrix.api.user.MatrixSearchUserResults
|
||||
import io.element.android.libraries.matrix.api.user.MatrixUser
|
||||
import io.element.android.libraries.matrix.api.verification.SessionVerificationService
|
||||
@@ -40,7 +39,6 @@ import io.element.android.libraries.matrix.impl.notification.RustNotificationSer
|
||||
import io.element.android.libraries.matrix.impl.pushers.RustPushersService
|
||||
import io.element.android.libraries.matrix.impl.room.RustMatrixRoom
|
||||
import io.element.android.libraries.matrix.impl.room.RustRoomSummaryDataSource
|
||||
import io.element.android.libraries.matrix.impl.sync.SlidingSyncObserverProxy
|
||||
import io.element.android.libraries.matrix.impl.usersearch.UserProfileMapper
|
||||
import io.element.android.libraries.matrix.impl.usersearch.UserSearchResultMapper
|
||||
import io.element.android.libraries.matrix.impl.verification.RustSessionVerificationService
|
||||
@@ -50,21 +48,12 @@ import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.coroutines.withTimeout
|
||||
import org.matrix.rustcomponents.sdk.Client
|
||||
import org.matrix.rustcomponents.sdk.ClientDelegate
|
||||
import org.matrix.rustcomponents.sdk.RequiredState
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncList
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListBuilder
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListOnceBuilt
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncRequestListFilters
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncSelectiveModeBuilder
|
||||
import org.matrix.rustcomponents.sdk.TaskHandle
|
||||
import org.matrix.rustcomponents.sdk.use
|
||||
import timber.log.Timber
|
||||
@@ -101,75 +90,11 @@ class RustMatrixClient constructor(
|
||||
}
|
||||
}
|
||||
|
||||
private val visibleRoomsSlidingSyncFilters = SlidingSyncRequestListFilters(
|
||||
isDm = null,
|
||||
spaces = emptyList(),
|
||||
isEncrypted = null,
|
||||
isInvite = false,
|
||||
isTombstoned = false,
|
||||
roomTypes = emptyList(),
|
||||
notRoomTypes = listOf("m.space"),
|
||||
roomNameLike = null,
|
||||
tags = emptyList(),
|
||||
notTags = emptyList()
|
||||
)
|
||||
private val roomList = client.roomList()
|
||||
|
||||
private val visibleRoomsSlidingSyncList = MutableSharedFlow<SlidingSyncList>(replay = 1)
|
||||
private val visibleRoomsSlidingSyncListBuilder = SlidingSyncListBuilder("CurrentlyVisibleRooms")
|
||||
.timelineLimit(limit = 1u)
|
||||
.requiredState(
|
||||
requiredState = listOf(
|
||||
RequiredState(key = EventType.STATE_ROOM_AVATAR, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_ENCRYPTION, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""),
|
||||
)
|
||||
)
|
||||
.filters(visibleRoomsSlidingSyncFilters)
|
||||
.syncModeSelective(SlidingSyncSelectiveModeBuilder().addRange(0u, 20u))
|
||||
.onceBuilt(object : SlidingSyncListOnceBuilt {
|
||||
override fun updateList(list: SlidingSyncList): SlidingSyncList {
|
||||
visibleRoomsSlidingSyncList.tryEmit(list)
|
||||
return list
|
||||
}
|
||||
})
|
||||
|
||||
private val invitesSlidingSyncFilters = visibleRoomsSlidingSyncFilters.copy(isInvite = true)
|
||||
|
||||
private val invitesSlidingSyncList = MutableSharedFlow<SlidingSyncList>(replay = 1)
|
||||
private val invitesSlidingSyncListBuilder = SlidingSyncListBuilder("CurrentInvites")
|
||||
.timelineLimit(limit = 1u)
|
||||
.requiredState(
|
||||
requiredState = listOf(
|
||||
RequiredState(key = EventType.STATE_ROOM_AVATAR, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_ENCRYPTION, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""),
|
||||
)
|
||||
)
|
||||
.filters(invitesSlidingSyncFilters)
|
||||
.syncModeSelective(SlidingSyncSelectiveModeBuilder().addRange(0u, 20u))
|
||||
.onceBuilt(object : SlidingSyncListOnceBuilt {
|
||||
override fun updateList(list: SlidingSyncList): SlidingSyncList {
|
||||
invitesSlidingSyncList.tryEmit(list)
|
||||
return list
|
||||
}
|
||||
})
|
||||
|
||||
private val slidingSync = client
|
||||
.slidingSync("ElementX")
|
||||
// .homeserver("https://slidingsync.lab.matrix.org")
|
||||
.withCommonExtensions()
|
||||
.addList(visibleRoomsSlidingSyncListBuilder)
|
||||
.addList(invitesSlidingSyncListBuilder)
|
||||
.use {
|
||||
it.build()
|
||||
}
|
||||
|
||||
private val slidingSyncObserverProxy = SlidingSyncObserverProxy(coroutineScope)
|
||||
private val rustRoomSummaryDataSource: RustRoomSummaryDataSource =
|
||||
RustRoomSummaryDataSource(
|
||||
slidingSyncObserverProxy.updateSummaryFlow,
|
||||
slidingSync,
|
||||
visibleRoomsSlidingSyncList,
|
||||
roomList,
|
||||
dispatchers,
|
||||
)
|
||||
|
||||
@@ -178,9 +103,7 @@ class RustMatrixClient constructor(
|
||||
|
||||
private val rustInvitesDataSource: RustRoomSummaryDataSource =
|
||||
RustRoomSummaryDataSource(
|
||||
slidingSyncObserverProxy.updateSummaryFlow,
|
||||
slidingSync,
|
||||
invitesSlidingSyncList,
|
||||
roomList,
|
||||
dispatchers,
|
||||
)
|
||||
|
||||
@@ -199,25 +122,19 @@ class RustMatrixClient constructor(
|
||||
|
||||
init {
|
||||
client.setDelegate(clientDelegate)
|
||||
rustRoomSummaryDataSource.init()
|
||||
rustInvitesDataSource.init()
|
||||
slidingSync.setObserver(slidingSyncObserverProxy)
|
||||
slidingSyncUpdateJob = slidingSyncObserverProxy.updateSummaryFlow
|
||||
.onEach { onSlidingSyncUpdate() }
|
||||
.launchIn(coroutineScope)
|
||||
rustRoomSummaryDataSource.subscribeIfNeeded()
|
||||
//rustInvitesDataSource.init()
|
||||
}
|
||||
|
||||
override fun getRoom(roomId: RoomId): MatrixRoom? {
|
||||
val slidingSyncRoom = slidingSync.getRoom(roomId.value) ?: return null
|
||||
val fullRoom = slidingSyncRoom.fullRoom() ?: return null
|
||||
val roomListItem = roomList.room(roomId.value)
|
||||
val fullRoom = roomListItem.fullRoom()
|
||||
return RustMatrixRoom(
|
||||
sessionId = sessionId,
|
||||
slidingSyncUpdateFlow = slidingSyncObserverProxy.updateSummaryFlow,
|
||||
slidingSyncRoom = slidingSyncRoom,
|
||||
roomListItem = roomListItem,
|
||||
innerRoom = fullRoom,
|
||||
coroutineScope = coroutineScope,
|
||||
coroutineDispatchers = dispatchers,
|
||||
clock = clock,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -261,9 +178,11 @@ class RustMatrixClient constructor(
|
||||
|
||||
// Wait to receive the room back from the sync
|
||||
withTimeout(30_000L) {
|
||||
slidingSyncObserverProxy.updateSummaryFlow.filter { roomId.value in it.rooms }.first()
|
||||
roomSummaryDataSource.roomSummaries()
|
||||
.filter { roomSummaries ->
|
||||
roomSummaries.map { it.identifier() }.contains(roomId.value)
|
||||
}.first()
|
||||
}
|
||||
|
||||
roomId
|
||||
}
|
||||
}
|
||||
@@ -301,7 +220,7 @@ class RustMatrixClient constructor(
|
||||
|
||||
override fun startSync() {
|
||||
if (isSyncing.compareAndSet(false, true)) {
|
||||
slidingSyncObserverToken = slidingSync.sync()
|
||||
slidingSyncObserverToken = roomList.sync()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -314,16 +233,11 @@ class RustMatrixClient constructor(
|
||||
override fun close() {
|
||||
slidingSyncUpdateJob?.cancel()
|
||||
stopSync()
|
||||
slidingSync.setObserver(null)
|
||||
rustRoomSummaryDataSource.close()
|
||||
rustInvitesDataSource.close()
|
||||
client.setDelegate(null)
|
||||
visibleRoomsSlidingSyncListBuilder.destroy()
|
||||
invitesSlidingSyncListBuilder.destroy()
|
||||
visibleRoomsSlidingSyncList.resetReplayCache()
|
||||
invitesSlidingSyncList.resetReplayCache()
|
||||
slidingSync.destroy()
|
||||
verificationService.destroy()
|
||||
roomList.destroy()
|
||||
client.destroy()
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
package io.element.android.libraries.matrix.impl.room
|
||||
|
||||
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import org.matrix.rustcomponents.sdk.RoomList
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesListener
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntry
|
||||
import org.matrix.rustcomponents.sdk.RoomListState
|
||||
import org.matrix.rustcomponents.sdk.RoomListStateListener
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListLoadingState
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListStateObserver
|
||||
|
||||
fun RoomList.stateFlow(): Flow<RoomListState> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : RoomListStateListener {
|
||||
override fun onUpdate(state: RoomListState) {
|
||||
trySendBlocking(state)
|
||||
}
|
||||
}
|
||||
state(listener)
|
||||
}
|
||||
|
||||
fun RoomList.loadingStateFlow(): Flow<SlidingSyncListLoadingState> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : SlidingSyncListStateObserver {
|
||||
override fun didReceiveUpdate(newState: SlidingSyncListLoadingState) {
|
||||
trySendBlocking(newState)
|
||||
}
|
||||
}
|
||||
val result = entriesLoadingState(listener)
|
||||
send(result.entriesLoadingState)
|
||||
result.entriesLoadingStateStream
|
||||
}
|
||||
|
||||
fun RoomList.roomListEntriesUpdateFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit): Flow<RoomListEntriesUpdate> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : RoomListEntriesListener {
|
||||
override fun onUpdate(roomEntriesUpdate: RoomListEntriesUpdate) {
|
||||
trySendBlocking(roomEntriesUpdate)
|
||||
}
|
||||
}
|
||||
val result = entries(listener)
|
||||
onInitialList(result.entries)
|
||||
result.entriesStream
|
||||
}
|
||||
|
||||
@@ -20,27 +20,24 @@ import io.element.android.libraries.matrix.api.core.RoomId
|
||||
import io.element.android.libraries.matrix.api.room.RoomSummaryDetails
|
||||
import io.element.android.libraries.matrix.impl.room.message.RoomMessageFactory
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncRoom
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
|
||||
class RoomSummaryDetailsFactory(private val roomMessageFactory: RoomMessageFactory = RoomMessageFactory()) {
|
||||
|
||||
fun create(slidingSyncRoom: SlidingSyncRoom, room: Room?): RoomSummaryDetails {
|
||||
val latestRoomMessage = slidingSyncRoom.latestRoomMessage()?.use {
|
||||
fun create(roomListItem: RoomListItem, room: Room?): RoomSummaryDetails {
|
||||
val latestRoomMessage = roomListItem.latestEvent()?.use {
|
||||
roomMessageFactory.create(it)
|
||||
}
|
||||
|
||||
return RoomSummaryDetails(
|
||||
roomId = RoomId(slidingSyncRoom.roomId()),
|
||||
name = slidingSyncRoom.name() ?: slidingSyncRoom.roomId(),
|
||||
roomId = RoomId(roomListItem.id()),
|
||||
name = roomListItem.name() ?: roomListItem.id(),
|
||||
canonicalAlias = room?.canonicalAlias(),
|
||||
isDirect = room?.isDirect() ?: false,
|
||||
avatarURLString = room?.avatarUrl(),
|
||||
unreadNotificationCount = slidingSyncRoom.unreadNotifications().use { it.notificationCount().toInt() },
|
||||
unreadNotificationCount = roomListItem.unreadNotifications().use { it.notificationCount().toInt() },
|
||||
lastMessage = latestRoomMessage,
|
||||
lastMessageTimestamp = latestRoomMessage?.originServerTs,
|
||||
inviter = room?.inviter()?.let(RoomMemberMapper::map),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -34,30 +34,28 @@ import io.element.android.libraries.matrix.impl.media.map
|
||||
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
|
||||
import io.element.android.services.toolbox.api.systemclock.SystemClock
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.emptyFlow
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
import org.matrix.rustcomponents.sdk.RoomMember
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncRoom
|
||||
import org.matrix.rustcomponents.sdk.UpdateSummary
|
||||
import org.matrix.rustcomponents.sdk.genTransactionId
|
||||
import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
|
||||
import java.io.File
|
||||
|
||||
class RustMatrixRoom(
|
||||
override val sessionId: SessionId,
|
||||
private val slidingSyncUpdateFlow: Flow<UpdateSummary>,
|
||||
private val slidingSyncRoom: SlidingSyncRoom,
|
||||
private val roomListItem: RoomListItem,
|
||||
private val innerRoom: Room,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val coroutineDispatchers: CoroutineDispatchers,
|
||||
private val clock: SystemClock,
|
||||
) : MatrixRoom {
|
||||
|
||||
override val membersStateFlow: StateFlow<MatrixRoomMembersState>
|
||||
@@ -69,21 +67,15 @@ class RustMatrixRoom(
|
||||
RustMatrixTimeline(
|
||||
matrixRoom = this,
|
||||
innerRoom = innerRoom,
|
||||
slidingSyncRoom = slidingSyncRoom,
|
||||
roomListItem = roomListItem,
|
||||
coroutineScope = coroutineScope,
|
||||
coroutineDispatchers = coroutineDispatchers
|
||||
)
|
||||
}
|
||||
|
||||
override fun syncUpdateFlow(): Flow<Long> {
|
||||
return slidingSyncUpdateFlow
|
||||
.filter {
|
||||
it.rooms.contains(roomId.value)
|
||||
}
|
||||
.map {
|
||||
clock.epochMillis()
|
||||
}
|
||||
.onStart { emit(clock.epochMillis()) }
|
||||
//TODO branch this somehow...
|
||||
return emptyFlow()
|
||||
}
|
||||
|
||||
override fun timeline(): MatrixTimeline {
|
||||
@@ -92,14 +84,14 @@ class RustMatrixRoom(
|
||||
|
||||
override fun close() {
|
||||
innerRoom.destroy()
|
||||
slidingSyncRoom.destroy()
|
||||
roomListItem.destroy()
|
||||
}
|
||||
|
||||
override val roomId = RoomId(innerRoom.id())
|
||||
|
||||
override val name: String?
|
||||
get() {
|
||||
return slidingSyncRoom.name()
|
||||
return roomListItem.name()
|
||||
}
|
||||
|
||||
override val bestName: String
|
||||
|
||||
@@ -19,36 +19,27 @@ package io.element.android.libraries.matrix.impl.room
|
||||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
|
||||
import io.element.android.libraries.matrix.api.room.RoomSummary
|
||||
import io.element.android.libraries.matrix.api.room.RoomSummaryDataSource
|
||||
import io.element.android.libraries.matrix.impl.sync.roomListDiff
|
||||
import io.element.android.libraries.matrix.impl.sync.state
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.firstOrNull
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.RoomList
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntry
|
||||
import org.matrix.rustcomponents.sdk.SlidingSync
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncList
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListRoomsListDiff
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncSelectiveModeBuilder
|
||||
import org.matrix.rustcomponents.sdk.RoomListInput
|
||||
import org.matrix.rustcomponents.sdk.RoomListRange
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListLoadingState
|
||||
import org.matrix.rustcomponents.sdk.UpdateSummary
|
||||
import timber.log.Timber
|
||||
import java.io.Closeable
|
||||
import java.util.UUID
|
||||
|
||||
internal class RustRoomSummaryDataSource(
|
||||
private val slidingSyncUpdateFlow: Flow<UpdateSummary>,
|
||||
private val slidingSync: SlidingSync,
|
||||
private val slidingSyncListFlow: Flow<SlidingSyncList>,
|
||||
private val roomList: RoomList,
|
||||
private val coroutineDispatchers: CoroutineDispatchers,
|
||||
private val roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory(),
|
||||
) : RoomSummaryDataSource, Closeable {
|
||||
@@ -56,39 +47,24 @@ internal class RustRoomSummaryDataSource(
|
||||
private val coroutineScope = CoroutineScope(SupervisorJob() + coroutineDispatchers.io)
|
||||
|
||||
private val roomSummaries = MutableStateFlow<List<RoomSummary>>(emptyList())
|
||||
private val state = MutableStateFlow(SlidingSyncListLoadingState.NOT_LOADED)
|
||||
private val loadingState = MutableStateFlow(RoomSummaryDataSource.LoadingState.NotLoaded)
|
||||
|
||||
fun init() {
|
||||
fun subscribeIfNeeded() {
|
||||
coroutineScope.launch {
|
||||
val slidingSyncList = slidingSyncListFlow.first()
|
||||
val summaries = slidingSyncList.currentRoomList().map(::buildSummaryForRoomListEntry)
|
||||
updateRoomSummaries {
|
||||
addAll(summaries)
|
||||
}
|
||||
|
||||
slidingSyncList.roomListDiff(this)
|
||||
.onEach { diffs ->
|
||||
updateRoomSummaries {
|
||||
applyDiff(diffs)
|
||||
}
|
||||
roomList.roomListEntriesUpdateFlow { roomListEntries ->
|
||||
val summaries = roomListEntries.map(::buildSummaryForRoomListEntry)
|
||||
updateRoomSummaries {
|
||||
addAll(summaries)
|
||||
}
|
||||
.launchIn(this)
|
||||
|
||||
slidingSyncList.state(this)
|
||||
.onEach { slidingSyncState ->
|
||||
Timber.v("New sliding sync state: $slidingSyncState")
|
||||
state.value = slidingSyncState
|
||||
}.launchIn(this)
|
||||
}.onEach {
|
||||
updateRoomSummaries {
|
||||
applyUpdate(it)
|
||||
}
|
||||
}.launchIn(this)
|
||||
}
|
||||
|
||||
slidingSyncUpdateFlow
|
||||
.onEach {
|
||||
didReceiveSyncUpdate(it)
|
||||
}.launchIn(coroutineScope)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
runBlocking { slidingSyncListFlow.firstOrNull() }?.close()
|
||||
coroutineScope.cancel()
|
||||
}
|
||||
|
||||
@@ -96,77 +72,64 @@ internal class RustRoomSummaryDataSource(
|
||||
return roomSummaries
|
||||
}
|
||||
|
||||
override fun loadingState(): StateFlow<RoomSummaryDataSource.LoadingState> {
|
||||
return loadingState
|
||||
}
|
||||
|
||||
override fun setSlidingSyncRange(range: IntRange) {
|
||||
Timber.v("setVisibleRange=$range")
|
||||
coroutineScope.launch {
|
||||
val slidingSyncMode = SlidingSyncSelectiveModeBuilder()
|
||||
.addRange(range.first.toUInt(), range.last.toUInt())
|
||||
slidingSyncListFlow.first().setSyncMode(slidingSyncMode)
|
||||
val ranges = listOf(RoomListRange(range.first.toUInt(), range.last.toUInt()))
|
||||
roomList.applyInput(
|
||||
RoomListInput.Viewport(ranges)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun didReceiveSyncUpdate(summary: UpdateSummary) {
|
||||
Timber.v("UpdateRooms with identifiers: ${summary.rooms}")
|
||||
if (state.value != SlidingSyncListLoadingState.FULLY_LOADED) {
|
||||
return
|
||||
}
|
||||
updateRoomSummaries {
|
||||
for (identifier in summary.rooms) {
|
||||
val index = indexOfFirst { it.identifier() == identifier }
|
||||
if (index == -1) {
|
||||
continue
|
||||
}
|
||||
val updatedRoomSummary = buildRoomSummaryForIdentifier(identifier)
|
||||
set(index, updatedRoomSummary)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun MutableList<RoomSummary>.applyDiff(diff: SlidingSyncListRoomsListDiff) {
|
||||
private fun MutableList<RoomSummary>.applyUpdate(update: RoomListEntriesUpdate) {
|
||||
fun MutableList<RoomSummary>.fillUntil(untilIndex: Int) {
|
||||
repeat((size - 1 until untilIndex).count()) {
|
||||
add(buildEmptyRoomSummary())
|
||||
}
|
||||
}
|
||||
Timber.v("ApplyDiff: $diff for list with size: $size")
|
||||
when (diff) {
|
||||
is SlidingSyncListRoomsListDiff.Append -> {
|
||||
val roomSummaries = diff.values.map {
|
||||
when (update) {
|
||||
is RoomListEntriesUpdate.Append -> {
|
||||
val roomSummaries = update.values.map {
|
||||
buildSummaryForRoomListEntry(it)
|
||||
}
|
||||
addAll(roomSummaries)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.PushBack -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(diff.value)
|
||||
is RoomListEntriesUpdate.PushBack -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(update.value)
|
||||
add(roomSummary)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.PushFront -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(diff.value)
|
||||
is RoomListEntriesUpdate.PushFront -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(update.value)
|
||||
add(0, roomSummary)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.Set -> {
|
||||
fillUntil(diff.index.toInt())
|
||||
val roomSummary = buildSummaryForRoomListEntry(diff.value)
|
||||
set(diff.index.toInt(), roomSummary)
|
||||
is RoomListEntriesUpdate.Set -> {
|
||||
fillUntil(update.index.toInt())
|
||||
val roomSummary = buildSummaryForRoomListEntry(update.value)
|
||||
set(update.index.toInt(), roomSummary)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.Insert -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(diff.value)
|
||||
add(diff.index.toInt(), roomSummary)
|
||||
is RoomListEntriesUpdate.Insert -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(update.value)
|
||||
add(update.index.toInt(), roomSummary)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.Remove -> {
|
||||
removeAt(diff.index.toInt())
|
||||
is RoomListEntriesUpdate.Remove -> {
|
||||
removeAt(update.index.toInt())
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.Reset -> {
|
||||
is RoomListEntriesUpdate.Reset -> {
|
||||
clear()
|
||||
addAll(diff.values.map { buildSummaryForRoomListEntry(it) })
|
||||
addAll(update.values.map { buildSummaryForRoomListEntry(it) })
|
||||
}
|
||||
SlidingSyncListRoomsListDiff.PopBack -> {
|
||||
RoomListEntriesUpdate.PopBack -> {
|
||||
removeFirstOrNull()
|
||||
}
|
||||
SlidingSyncListRoomsListDiff.PopFront -> {
|
||||
RoomListEntriesUpdate.PopFront -> {
|
||||
removeLastOrNull()
|
||||
}
|
||||
SlidingSyncListRoomsListDiff.Clear -> {
|
||||
RoomListEntriesUpdate.Clear -> {
|
||||
clear()
|
||||
}
|
||||
}
|
||||
@@ -185,14 +148,13 @@ internal class RustRoomSummaryDataSource(
|
||||
}
|
||||
|
||||
private fun buildRoomSummaryForIdentifier(identifier: String): RoomSummary {
|
||||
val slidingSyncRoom = slidingSync.getRoom(identifier) ?: return RoomSummary.Empty(identifier)
|
||||
val fullRoom = slidingSyncRoom.fullRoom()
|
||||
val roomSummary = RoomSummary.Filled(
|
||||
details = roomSummaryDetailsFactory.create(slidingSyncRoom, fullRoom)
|
||||
)
|
||||
fullRoom?.destroy()
|
||||
slidingSyncRoom.destroy()
|
||||
return roomSummary
|
||||
return roomList.room(identifier).use { roomListItem ->
|
||||
roomListItem.fullRoom().use { fullRoom ->
|
||||
RoomSummary.Filled(
|
||||
details = roomSummaryDetailsFactory.create(roomListItem, fullRoom)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun updateRoomSummaries(block: MutableList<RoomSummary>.() -> Unit) =
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.sync
|
||||
|
||||
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.launch
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncList
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListLoadingState
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListRoomListObserver
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListRoomsCountObserver
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListRoomsListDiff
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListStateObserver
|
||||
|
||||
fun SlidingSyncList.roomListDiff(scope: CoroutineScope): Flow<SlidingSyncListRoomsListDiff> =
|
||||
mxCallbackFlow {
|
||||
val observer = object : SlidingSyncListRoomListObserver {
|
||||
override fun didReceiveUpdate(diff: SlidingSyncListRoomsListDiff) {
|
||||
scope.launch {
|
||||
send(diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
observeRoomList(observer)
|
||||
}
|
||||
|
||||
fun SlidingSyncList.state(scope: CoroutineScope): Flow<SlidingSyncListLoadingState> = mxCallbackFlow {
|
||||
val observer = object : SlidingSyncListStateObserver {
|
||||
override fun didReceiveUpdate(newState: SlidingSyncListLoadingState) {
|
||||
scope.launch {
|
||||
send(newState)
|
||||
}
|
||||
}
|
||||
}
|
||||
observeState(observer)
|
||||
}
|
||||
|
||||
fun SlidingSyncList.roomsCount(scope: CoroutineScope): Flow<UInt> = mxCallbackFlow {
|
||||
val observer = object : SlidingSyncListRoomsCountObserver {
|
||||
override fun didReceiveUpdate(count: UInt) {
|
||||
scope.launch {
|
||||
send(count)
|
||||
}
|
||||
}
|
||||
}
|
||||
observeRoomsCount(observer)
|
||||
}
|
||||
@@ -1,43 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.sync
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.asSharedFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncObserver
|
||||
import org.matrix.rustcomponents.sdk.UpdateSummary
|
||||
|
||||
// Sounds like a reasonable buffer size before it suspends emitting new items.
|
||||
private const val BUFFER_SIZE = 64
|
||||
|
||||
class SlidingSyncObserverProxy(
|
||||
private val coroutineScope: CoroutineScope,
|
||||
) : SlidingSyncObserver {
|
||||
|
||||
private val updateSummaryMutableFlow =
|
||||
MutableSharedFlow<UpdateSummary>(extraBufferCapacity = BUFFER_SIZE)
|
||||
val updateSummaryFlow: SharedFlow<UpdateSummary> = updateSummaryMutableFlow.asSharedFlow()
|
||||
|
||||
override fun didReceiveSyncUpdate(summary: UpdateSummary) {
|
||||
coroutineScope.launch {
|
||||
updateSummaryMutableFlow.emit(summary)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -37,10 +37,10 @@ internal class MatrixTimelineDiffProcessor(
|
||||
private val timelineItemFactory: MatrixTimelineItemMapper,
|
||||
) : TimelineListener {
|
||||
|
||||
override fun onUpdate(update: TimelineDiff) {
|
||||
override fun onUpdate(diff: TimelineDiff) {
|
||||
coroutineScope.launch {
|
||||
updateTimelineItems {
|
||||
applyDiff(update)
|
||||
applyDiff(diff)
|
||||
}
|
||||
when (val firstItem = timelineItems.value.firstOrNull()) {
|
||||
is MatrixTimelineItem.Virtual -> updateBackPaginationState(firstItem.virtual)
|
||||
|
||||
@@ -38,6 +38,7 @@ import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.PaginationOptions
|
||||
import org.matrix.rustcomponents.sdk.RequiredState
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
import org.matrix.rustcomponents.sdk.RoomSubscription
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncRoom
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
@@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean
|
||||
class RustMatrixTimeline(
|
||||
private val matrixRoom: MatrixRoom,
|
||||
private val innerRoom: Room,
|
||||
private val slidingSyncRoom: SlidingSyncRoom,
|
||||
private val roomListItem: RoomListItem,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val coroutineDispatchers: CoroutineDispatchers,
|
||||
) : MatrixTimeline {
|
||||
@@ -166,12 +167,12 @@ class RustMatrixTimeline(
|
||||
),
|
||||
timelineLimit = null
|
||||
)
|
||||
slidingSyncRoom.subscribeToRoom(settings)
|
||||
val result = slidingSyncRoom.addTimelineListener(timelineListener)
|
||||
roomListItem.subscribe(settings)
|
||||
val result = innerRoom.addTimelineListener(timelineListener)
|
||||
launch {
|
||||
fetchMembers()
|
||||
}
|
||||
listenerTokens += result.taskHandle
|
||||
listenerTokens += result.itemsStream
|
||||
result.items
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user