No crash when room is already destroyed...

This commit is contained in:
ganfra
2023-07-28 13:40:18 +02:00
parent 962596b417
commit ca4bbbc050
5 changed files with 52 additions and 28 deletions

View File

@@ -16,6 +16,7 @@
package io.element.android.libraries.matrix.impl.room
import io.element.android.libraries.core.data.tryOrNull
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
@@ -40,9 +41,15 @@ fun RoomList.loadingStateFlow(): Flow<RoomListLoadingState> =
trySendBlocking(state)
}
}
val result = loadingState(listener)
send(result.state)
result.stateStream
tryOrNull {
val result = loadingState(listener)
try {
send(result.state)
} catch (exception: Exception) {
Timber.d("loadingStateFlow() initialState failed.")
}
result.stateStream
}
}.buffer(Channel.UNLIMITED)
fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit): Flow<List<RoomListEntriesUpdate>> =
@@ -52,9 +59,27 @@ fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit):
trySendBlocking(roomEntriesUpdate)
}
}
val result = entries(listener)
onInitialList(result.entries)
result.entriesStream
tryOrNull {
val result = entries(listener)
try {
onInitialList(result.entries)
} catch (exception: Exception) {
Timber.d(exception, "entriesFlow() onInitialList failed.")
}
result.entriesStream
}
}.buffer(Channel.UNLIMITED)
fun RoomListService.stateFlow(): Flow<RoomListServiceState> =
mxCallbackFlow {
val listener = object : RoomListServiceStateListener {
override fun onUpdate(state: RoomListServiceState) {
trySendBlocking(state)
}
}
tryOrNull {
state(listener)
}
}.buffer(Channel.UNLIMITED)
fun RoomListService.roomOrNull(roomId: String): RoomListItem? {
@@ -65,13 +90,3 @@ fun RoomListService.roomOrNull(roomId: String): RoomListItem? {
return null
}
}
fun RoomListService.stateFlow(): Flow<RoomListServiceState> =
mxCallbackFlow {
val listener = object : RoomListServiceStateListener {
override fun onUpdate(state: RoomListServiceState) {
trySendBlocking(state)
}
}
state(listener)
}.buffer(Channel.UNLIMITED)

View File

@@ -16,6 +16,7 @@
package io.element.android.libraries.matrix.impl.sync
import io.element.android.libraries.core.data.tryOrNull
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
@@ -32,5 +33,7 @@ fun SyncService.stateFlow(): Flow<SyncServiceState> =
trySendBlocking(state)
}
}
state(listener)
tryOrNull {
state(listener)
}
}.buffer(Channel.UNLIMITED)

View File

@@ -16,6 +16,8 @@
package io.element.android.libraries.matrix.impl.timeline
import io.element.android.libraries.core.data.tryOrNull
import io.element.android.libraries.matrix.impl.util.cancelAndDestroy
import io.element.android.libraries.matrix.impl.util.destroyAll
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
import kotlinx.coroutines.channels.Channel
@@ -35,14 +37,14 @@ import timber.log.Timber
internal fun Room.timelineDiffFlow(onInitialList: suspend (List<TimelineItem>) -> Unit): Flow<TimelineDiff> =
callbackFlow {
val roomId = id()
Timber.d("Open timelineDiffFlow for room $roomId")
val listener = object : TimelineListener {
override fun onUpdate(diff: TimelineDiff) {
trySendBlocking(diff)
}
}
var result: RoomTimelineListenerResult? = null
val roomId = tryOrNull { id() }
Timber.d("Open timelineDiffFlow for room $roomId")
try {
result = addTimelineListener(listener)
onInitialList(result.items)
@@ -51,8 +53,7 @@ internal fun Room.timelineDiffFlow(onInitialList: suspend (List<TimelineItem>) -
}
awaitClose {
Timber.d("Close timelineDiffFlow for room $roomId")
result?.itemsStream?.cancel()
result?.itemsStream?.destroy()
result?.itemsStream?.cancelAndDestroy()
result?.items?.destroyAll()
}
}.buffer(Channel.UNLIMITED)
@@ -64,5 +65,7 @@ internal fun Room.backPaginationStatusFlow(): Flow<BackPaginationStatus> =
trySendBlocking(status)
}
}
subscribeToBackPaginationStatus(listener)
tryOrNull {
subscribeToBackPaginationStatus(listener)
}
}.buffer(Channel.UNLIMITED)

View File

@@ -21,11 +21,10 @@ import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow
import org.matrix.rustcomponents.sdk.TaskHandle
internal fun <T> mxCallbackFlow(block: suspend ProducerScope<T>.() -> TaskHandle) =
internal fun <T> mxCallbackFlow(block: suspend ProducerScope<T>.() -> TaskHandle?) =
callbackFlow {
val token: TaskHandle = block(this)
val token: TaskHandle? = block(this)
awaitClose {
token.cancel()
token.destroy()
token?.cancelAndDestroy()
}
}

View File

@@ -28,9 +28,13 @@ class TaskHandleBag(private val tokens: MutableSet<TaskHandle> = CopyOnWriteArra
fun dispose() {
tokens.forEach {
it.cancel()
it.destroy()
it.cancelAndDestroy()
}
tokens.clear()
}
}
fun TaskHandle.cancelAndDestroy() {
cancel()
destroy()
}