Room : makes subscribeToSync/unsubscribeFromSync suspendable and makes sure we keep subscription count
This commit is contained in:
@@ -46,6 +46,7 @@ import io.element.android.libraries.matrix.api.core.UserId
|
||||
import io.element.android.libraries.matrix.api.room.MatrixRoom
|
||||
import io.element.android.libraries.matrix.api.room.RoomMembershipObserver
|
||||
import io.element.android.services.appnavstate.api.AppNavigationStateService
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
@@ -60,6 +61,7 @@ class RoomLoadedFlowNode @AssistedInject constructor(
|
||||
private val messagesEntryPoint: MessagesEntryPoint,
|
||||
private val roomDetailsEntryPoint: RoomDetailsEntryPoint,
|
||||
private val appNavigationStateService: AppNavigationStateService,
|
||||
private val appCoroutineScope: CoroutineScope,
|
||||
roomComponentFactory: RoomComponentFactory,
|
||||
roomMembershipObserver: RoomMembershipObserver,
|
||||
) : BackstackNode<RoomLoadedFlowNode.NavTarget>(
|
||||
@@ -91,6 +93,16 @@ class RoomLoadedFlowNode @AssistedInject constructor(
|
||||
appNavigationStateService.onNavigateToRoom(id, inputs.room.roomId)
|
||||
fetchRoomMembers()
|
||||
},
|
||||
onResume = {
|
||||
appCoroutineScope.launch {
|
||||
inputs.room.subscribeToSync()
|
||||
}
|
||||
},
|
||||
onPause = {
|
||||
appCoroutineScope.launch {
|
||||
inputs.room.unsubscribeFromSync()
|
||||
}
|
||||
},
|
||||
onDestroy = {
|
||||
Timber.v("OnDestroy")
|
||||
appNavigationStateService.onLeavingRoom(id)
|
||||
@@ -162,9 +174,7 @@ class RoomLoadedFlowNode @AssistedInject constructor(
|
||||
// because this node enters 'onDestroy' before his children, so it can leads to
|
||||
// using the room in a child node where it's already closed.
|
||||
DisposableEffect(Unit) {
|
||||
inputs.room.subscribeToSync()
|
||||
onDispose {
|
||||
inputs.room.unsubscribeFromSync()
|
||||
if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
|
||||
inputs.room.destroy()
|
||||
}
|
||||
|
||||
@@ -77,9 +77,9 @@ interface MatrixRoom : Closeable {
|
||||
|
||||
fun destroy()
|
||||
|
||||
fun subscribeToSync()
|
||||
suspend fun subscribeToSync()
|
||||
|
||||
fun unsubscribeFromSync()
|
||||
suspend fun unsubscribeFromSync()
|
||||
|
||||
suspend fun userDisplayName(userId: UserId): Result<String?>
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ import io.element.android.libraries.matrix.impl.notificationsettings.RustNotific
|
||||
import io.element.android.libraries.matrix.impl.oidc.toRustAction
|
||||
import io.element.android.libraries.matrix.impl.pushers.RustPushersService
|
||||
import io.element.android.libraries.matrix.impl.room.RoomContentForwarder
|
||||
import io.element.android.libraries.matrix.impl.room.RoomSyncSubscriber
|
||||
import io.element.android.libraries.matrix.impl.room.RustMatrixRoom
|
||||
import io.element.android.libraries.matrix.impl.roomlist.RustRoomListService
|
||||
import io.element.android.libraries.matrix.impl.roomlist.roomOrNull
|
||||
@@ -114,6 +115,7 @@ class RustMatrixClient constructor(
|
||||
|
||||
private val notificationService = RustNotificationService(sessionId, notificationClient, dispatchers, clock)
|
||||
private val notificationSettingsService = RustNotificationSettingsService(notificationSettings, dispatchers)
|
||||
private val roomSyncSubscriber = RoomSyncSubscriber(innerRoomListService, dispatchers)
|
||||
|
||||
private val isLoggingOut = AtomicBoolean(false)
|
||||
|
||||
@@ -185,6 +187,7 @@ class RustMatrixClient constructor(
|
||||
systemClock = clock,
|
||||
roomContentForwarder = roomContentForwarder,
|
||||
sessionData = sessionStore.getSession(sessionId.value)!!,
|
||||
roomSyncSubscriber = roomSyncSubscriber
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -292,7 +295,6 @@ class RustMatrixClient constructor(
|
||||
runCatching { client.removeAvatar() }
|
||||
}
|
||||
|
||||
|
||||
override fun syncService(): SyncService = rustSyncService
|
||||
|
||||
override fun sessionVerificationService(): SessionVerificationService = verificationService
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.room
|
||||
|
||||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
|
||||
import io.element.android.libraries.matrix.api.core.RoomId
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.EventType
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.RequiredState
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import org.matrix.rustcomponents.sdk.RoomSubscription
|
||||
import timber.log.Timber
|
||||
|
||||
class RoomSyncSubscriber(
|
||||
private val roomListService: RoomListService,
|
||||
private val dispatchers: CoroutineDispatchers,
|
||||
) {
|
||||
|
||||
private val subscriptionCounts = HashMap<RoomId, Int>()
|
||||
private val mutex = Mutex()
|
||||
|
||||
private val settings = RoomSubscription(
|
||||
requiredState = listOf(
|
||||
RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_TOPIC, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_POWER_LEVELS, value = ""),
|
||||
),
|
||||
timelineLimit = null
|
||||
)
|
||||
|
||||
suspend fun subscribe(roomId: RoomId) = mutex.withLock {
|
||||
withContext(dispatchers.io) {
|
||||
try {
|
||||
val currentSubscription = subscriptionCounts.getOrElse(roomId) { 0 }
|
||||
if (currentSubscription == 0) {
|
||||
Timber.d("Subscribing to room $roomId}")
|
||||
roomListService.room(roomId.value).use { roomListItem ->
|
||||
roomListItem.subscribe(settings)
|
||||
}
|
||||
}
|
||||
subscriptionCounts[roomId] = currentSubscription + 1
|
||||
} catch (exception: Exception) {
|
||||
Timber.e("Failed to subscribe to room $roomId")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun unsubscribe(roomId: RoomId) = mutex.withLock {
|
||||
withContext(dispatchers.io) {
|
||||
try {
|
||||
val currentSubscription = subscriptionCounts.getOrElse(roomId) { 0 }
|
||||
when (currentSubscription) {
|
||||
0 -> return@withContext
|
||||
1 -> {
|
||||
Timber.d("Unsubscribe from room $roomId")
|
||||
roomListService.room(roomId.value).use { roomListItem ->
|
||||
roomListItem.unsubscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
subscriptionCounts[roomId] = currentSubscription - 1
|
||||
} catch (exception: Exception) {
|
||||
Timber.e("Failed to unsubscribe from room $roomId")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -40,7 +40,6 @@ import io.element.android.libraries.matrix.api.room.location.AssetType
|
||||
import io.element.android.libraries.matrix.api.room.roomMembers
|
||||
import io.element.android.libraries.matrix.api.room.roomNotificationSettings
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.EventType
|
||||
import io.element.android.libraries.matrix.impl.core.toProgressWatcher
|
||||
import io.element.android.libraries.matrix.impl.media.MediaUploadHandlerImpl
|
||||
import io.element.android.libraries.matrix.impl.media.map
|
||||
@@ -61,12 +60,10 @@ import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.EventTimelineItem
|
||||
import org.matrix.rustcomponents.sdk.RequiredState
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
import org.matrix.rustcomponents.sdk.RoomMember
|
||||
import org.matrix.rustcomponents.sdk.RoomMessageEventContentWithoutRelation
|
||||
import org.matrix.rustcomponents.sdk.RoomSubscription
|
||||
import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle
|
||||
import org.matrix.rustcomponents.sdk.messageEventContentFromHtml
|
||||
import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
|
||||
@@ -84,6 +81,7 @@ class RustMatrixRoom(
|
||||
private val systemClock: SystemClock,
|
||||
private val roomContentForwarder: RoomContentForwarder,
|
||||
private val sessionData: SessionData,
|
||||
private val roomSyncSubscriber: RoomSyncSubscriber,
|
||||
) : MatrixRoom {
|
||||
|
||||
override val roomId = RoomId(innerRoom.id())
|
||||
@@ -118,22 +116,9 @@ class RustMatrixRoom(
|
||||
|
||||
override val timeline: MatrixTimeline = _timeline
|
||||
|
||||
override fun subscribeToSync() {
|
||||
val settings = RoomSubscription(
|
||||
requiredState = listOf(
|
||||
RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_TOPIC, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_POWER_LEVELS, value = ""),
|
||||
),
|
||||
timelineLimit = null
|
||||
)
|
||||
roomListItem.subscribe(settings)
|
||||
}
|
||||
override suspend fun subscribeToSync() = roomSyncSubscriber.subscribe(roomId)
|
||||
|
||||
override fun unsubscribeFromSync() {
|
||||
roomListItem.unsubscribe()
|
||||
}
|
||||
override suspend fun unsubscribeFromSync() = roomSyncSubscriber.unsubscribe(roomId)
|
||||
|
||||
override fun destroy() {
|
||||
roomCoroutineScope.cancel()
|
||||
|
||||
@@ -157,9 +157,9 @@ class FakeMatrixRoom(
|
||||
|
||||
override val timeline: MatrixTimeline = matrixTimeline
|
||||
|
||||
override fun subscribeToSync() = Unit
|
||||
override suspend fun subscribeToSync() = Unit
|
||||
|
||||
override fun unsubscribeFromSync() = Unit
|
||||
override suspend fun unsubscribeFromSync() = Unit
|
||||
|
||||
override fun destroy() = Unit
|
||||
|
||||
|
||||
Reference in New Issue
Block a user