Merge pull request #957 from vector-im/feature/bma/fixRoomCrash
Rework the way we init and close the RustMatrixRoom
This commit is contained in:
@@ -20,6 +20,7 @@ import android.os.Parcelable
|
||||
import androidx.compose.runtime.Composable
|
||||
import androidx.compose.runtime.DisposableEffect
|
||||
import androidx.compose.ui.Modifier
|
||||
import androidx.lifecycle.Lifecycle
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import com.bumble.appyx.core.composable.Children
|
||||
import com.bumble.appyx.core.lifecycle.subscribe
|
||||
@@ -161,13 +162,16 @@ class RoomLoadedFlowNode @AssistedInject constructor(
|
||||
|
||||
@Composable
|
||||
override fun View(modifier: Modifier) {
|
||||
// Rely on the View Lifecycle instead of the Node Lifecycle,
|
||||
// Rely on the View Lifecycle in addition to the Node Lifecycle,
|
||||
// because this node enters 'onDestroy' before his children, so it can leads to
|
||||
// using the room in a child node where it's already closed.
|
||||
DisposableEffect(Unit) {
|
||||
inputs.room.open()
|
||||
inputs.room.subscribeToSync()
|
||||
onDispose {
|
||||
inputs.room.close()
|
||||
inputs.room.unsubscribeFromSync()
|
||||
if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
|
||||
inputs.room.destroy()
|
||||
}
|
||||
}
|
||||
}
|
||||
Children(
|
||||
|
||||
@@ -63,7 +63,11 @@ interface MatrixRoom : Closeable {
|
||||
|
||||
val timeline: MatrixTimeline
|
||||
|
||||
fun open(): Result<Unit>
|
||||
fun destroy()
|
||||
|
||||
fun subscribeToSync()
|
||||
|
||||
fun unsubscribeFromSync()
|
||||
|
||||
suspend fun userDisplayName(userId: UserId): Result<String?>
|
||||
|
||||
@@ -133,6 +137,8 @@ interface MatrixRoom : Closeable {
|
||||
zoomLevel: Int? = null,
|
||||
assetType: AssetType? = null,
|
||||
): Result<Unit>
|
||||
|
||||
override fun close() = destroy()
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -40,9 +40,6 @@ import io.element.android.libraries.matrix.impl.core.toProgressWatcher
|
||||
import io.element.android.libraries.matrix.impl.media.map
|
||||
import io.element.android.libraries.matrix.impl.room.location.toInner
|
||||
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
|
||||
import io.element.android.libraries.matrix.impl.timeline.backPaginationStatusFlow
|
||||
import io.element.android.libraries.matrix.impl.timeline.eventOrigin
|
||||
import io.element.android.libraries.matrix.impl.timeline.timelineDiffFlow
|
||||
import io.element.android.libraries.sessionstorage.api.SessionData
|
||||
import io.element.android.services.toolbox.api.systemclock.SystemClock
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
@@ -51,11 +48,7 @@ import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.EventItemOrigin
|
||||
import org.matrix.rustcomponents.sdk.RequiredState
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
@@ -88,7 +81,6 @@ class RustMatrixRoom(
|
||||
|
||||
private val roomCoroutineScope = sessionCoroutineScope.childScope(coroutineDispatchers.main, "RoomScope-$roomId")
|
||||
private val _membersStateFlow = MutableStateFlow<MatrixRoomMembersState>(MatrixRoomMembersState.Unknown)
|
||||
private val isInit = MutableStateFlow(false)
|
||||
private val _syncUpdateFlow = MutableStateFlow(0L)
|
||||
private val _timeline by lazy {
|
||||
RustMatrixTimeline(
|
||||
@@ -97,6 +89,7 @@ class RustMatrixRoom(
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
dispatcher = roomDispatcher,
|
||||
lastLoginTimestamp = sessionData.loginTimestamp,
|
||||
onNewSyncedEvent = { _syncUpdateFlow.value = systemClock.epochMillis() }
|
||||
)
|
||||
}
|
||||
|
||||
@@ -106,8 +99,7 @@ class RustMatrixRoom(
|
||||
|
||||
override val timeline: MatrixTimeline = _timeline
|
||||
|
||||
override fun open(): Result<Unit> {
|
||||
if (isInit.value) return Result.failure(IllegalStateException("Listener already registered"))
|
||||
override fun subscribeToSync() {
|
||||
val settings = RoomSubscription(
|
||||
requiredState = listOf(
|
||||
RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""),
|
||||
@@ -118,35 +110,16 @@ class RustMatrixRoom(
|
||||
timelineLimit = null
|
||||
)
|
||||
roomListItem.subscribe(settings)
|
||||
roomCoroutineScope.launch(roomDispatcher) {
|
||||
innerRoom.timelineDiffFlow { initialList ->
|
||||
_timeline.postItems(initialList)
|
||||
}.onEach { diff ->
|
||||
if (diff.eventOrigin() == EventItemOrigin.SYNC) {
|
||||
_syncUpdateFlow.value = systemClock.epochMillis()
|
||||
}
|
||||
_timeline.postDiff(diff)
|
||||
}.launchIn(this)
|
||||
|
||||
innerRoom.backPaginationStatusFlow()
|
||||
.onEach {
|
||||
_timeline.postPaginationStatus(it)
|
||||
}.launchIn(this)
|
||||
|
||||
fetchMembers()
|
||||
}
|
||||
isInit.value = true
|
||||
return Result.success(Unit)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
if (isInit.value) {
|
||||
isInit.value = false
|
||||
roomCoroutineScope.cancel()
|
||||
roomListItem.unsubscribe()
|
||||
innerRoom.destroy()
|
||||
roomListItem.destroy()
|
||||
}
|
||||
override fun unsubscribeFromSync() {
|
||||
roomListItem.unsubscribe()
|
||||
}
|
||||
|
||||
override fun destroy() {
|
||||
roomCoroutineScope.cancel()
|
||||
innerRoom.destroy()
|
||||
roomListItem.destroy()
|
||||
}
|
||||
|
||||
override val name: String?
|
||||
@@ -363,12 +336,6 @@ class RustMatrixRoom(
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun fetchMembers() = withContext(roomDispatcher) {
|
||||
runCatching {
|
||||
innerRoom.fetchMembers()
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun reportContent(eventId: EventId, reason: String, blockUserId: UserId?): Result<Unit> = withContext(roomDispatcher) {
|
||||
runCatching {
|
||||
innerRoom.reportContent(eventId = eventId.value, score = null, reason = reason)
|
||||
|
||||
@@ -26,8 +26,8 @@ import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessage
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import io.element.android.libraries.matrix.impl.timeline.postprocessor.TimelineEncryptedHistoryPostProcessor
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
@@ -37,17 +37,21 @@ import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.getAndUpdate
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.mapLatest
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.sample
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.BackPaginationStatus
|
||||
import org.matrix.rustcomponents.sdk.EventItemOrigin
|
||||
import org.matrix.rustcomponents.sdk.PaginationOptions
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.Date
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
private const val INITIAL_MAX_SIZE = 50
|
||||
|
||||
@@ -57,6 +61,7 @@ class RustMatrixTimeline(
|
||||
private val innerRoom: Room,
|
||||
private val dispatcher: CoroutineDispatcher,
|
||||
private val lastLoginTimestamp: Date?,
|
||||
private val onNewSyncedEvent: () -> Unit,
|
||||
) : MatrixTimeline {
|
||||
|
||||
private val initLatch = CompletableDeferred<Unit>()
|
||||
@@ -93,13 +98,40 @@ class RustMatrixTimeline(
|
||||
|
||||
override val paginationState: StateFlow<MatrixTimeline.PaginationState> = _paginationState.asStateFlow()
|
||||
|
||||
init {
|
||||
Timber.d("Initialize timeline for room ${matrixRoom.roomId}")
|
||||
roomCoroutineScope.launch(dispatcher) {
|
||||
innerRoom.timelineDiffFlow { initialList ->
|
||||
postItems(initialList)
|
||||
}.onEach { diff ->
|
||||
if (diff.eventOrigin() == EventItemOrigin.SYNC) {
|
||||
onNewSyncedEvent()
|
||||
}
|
||||
postDiff(diff)
|
||||
}.launchIn(this)
|
||||
|
||||
innerRoom.backPaginationStatusFlow()
|
||||
.onEach {
|
||||
postPaginationStatus(it)
|
||||
}.launchIn(this)
|
||||
|
||||
fetchMembers()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun fetchMembers() = withContext(dispatcher) {
|
||||
runCatching {
|
||||
innerRoom.fetchMembers()
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
|
||||
override val timelineItems: Flow<List<MatrixTimelineItem>> = _timelineItems.sample(50)
|
||||
.mapLatest { items ->
|
||||
encryptedHistoryPostProcessor.process(items)
|
||||
}
|
||||
|
||||
internal suspend fun postItems(items: List<TimelineItem>) {
|
||||
private suspend fun postItems(items: List<TimelineItem>) {
|
||||
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
|
||||
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
|
||||
timelineDiffProcessor.postItems(it)
|
||||
@@ -108,12 +140,12 @@ class RustMatrixTimeline(
|
||||
initLatch.complete(Unit)
|
||||
}
|
||||
|
||||
internal suspend fun postDiff(timelineDiff: TimelineDiff) {
|
||||
private suspend fun postDiff(timelineDiff: TimelineDiff) {
|
||||
initLatch.await()
|
||||
timelineDiffProcessor.postDiff(timelineDiff)
|
||||
}
|
||||
|
||||
internal fun postPaginationStatus(status: BackPaginationStatus) {
|
||||
private fun postPaginationStatus(status: BackPaginationStatus) {
|
||||
_paginationState.getAndUpdate { currentPaginationState ->
|
||||
if (hasEncryptionHistoryBanner()) {
|
||||
return@getAndUpdate currentPaginationState.copy(
|
||||
|
||||
@@ -100,7 +100,6 @@ class FakeMatrixRoom(
|
||||
private val _sentLocations = mutableListOf<SendLocationInvocation>()
|
||||
val sentLocations: List<SendLocationInvocation> = _sentLocations
|
||||
|
||||
|
||||
var invitedUserId: UserId? = null
|
||||
private set
|
||||
|
||||
@@ -128,9 +127,11 @@ class FakeMatrixRoom(
|
||||
|
||||
override val timeline: MatrixTimeline = matrixTimeline
|
||||
|
||||
override fun open(): Result<Unit> {
|
||||
return Result.success(Unit)
|
||||
}
|
||||
override fun subscribeToSync() = Unit
|
||||
|
||||
override fun unsubscribeFromSync() = Unit
|
||||
|
||||
override fun destroy() = Unit
|
||||
|
||||
override suspend fun userDisplayName(userId: UserId): Result<String?> = simulateLongTask {
|
||||
userDisplayNameResult
|
||||
@@ -283,8 +284,6 @@ class FakeMatrixRoom(
|
||||
return sendLocationResult
|
||||
}
|
||||
|
||||
override fun close() = Unit
|
||||
|
||||
fun givenLeaveRoomError(throwable: Throwable?) {
|
||||
this.leaveRoomError = throwable
|
||||
}
|
||||
|
||||
@@ -87,7 +87,6 @@ class RoomListScreen(
|
||||
Singleton.appScope.launch {
|
||||
withContext(coroutineDispatchers.io) {
|
||||
matrixClient.getRoom(roomId)!!.use { room ->
|
||||
room.open()
|
||||
room.timeline.paginateBackwards(20, 50)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user