diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt index 5d92bfde0b..8e7047aaa4 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.RoomList import org.matrix.rustcomponents.sdk.RoomListEntriesListener import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate @@ -41,15 +42,15 @@ fun RoomList.loadingStateFlow(): Flow = trySendBlocking(state) } } - tryOrNull { - val result = loadingState(listener) - try { - send(result.state) - } catch (exception: Exception) { - Timber.d("loadingStateFlow() initialState failed.") - } - result.stateStream + val result = loadingState(listener) + try { + send(result.state) + } catch (exception: Exception) { + Timber.d("loadingStateFlow() initialState failed.") } + result.stateStream + }.catch { + Timber.d(it, "loadingStateFlow() failed") }.buffer(Channel.UNLIMITED) fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): Flow> = @@ -59,15 +60,15 @@ fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): trySendBlocking(roomEntriesUpdate) } } - tryOrNull { - val result = entries(listener) - try { - onInitialList(result.entries) - } catch (exception: Exception) { - Timber.d(exception, "entriesFlow() onInitialList failed.") - } - result.entriesStream + val result = entries(listener) + try { + onInitialList(result.entries) + } catch (exception: Exception) { + Timber.d("entriesFlow() onInitialList failed.") } + result.entriesStream + }.catch { + Timber.d(it, "entriesFlow() failed") }.buffer(Channel.UNLIMITED) fun RoomListService.stateFlow(): Flow = diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt index 3c5bb26e30..9f8bccb1d0 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt @@ -26,10 +26,10 @@ import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.BackPaginationStatus import org.matrix.rustcomponents.sdk.BackPaginationStatusListener import org.matrix.rustcomponents.sdk.Room -import org.matrix.rustcomponents.sdk.RoomTimelineListenerResult import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem import org.matrix.rustcomponents.sdk.TimelineListener @@ -42,20 +42,21 @@ internal fun Room.timelineDiffFlow(onInitialList: suspend (List) - trySendBlocking(diff) } } - var result: RoomTimelineListenerResult? = null - val roomId = tryOrNull { id() } + val roomId = id() Timber.d("Open timelineDiffFlow for room $roomId") + val result = addTimelineListener(listener) try { - result = addTimelineListener(listener) onInitialList(result.items) } catch (exception: Exception) { Timber.d(exception, "Catch failure in timelineDiffFlow of room $roomId") } awaitClose { Timber.d("Close timelineDiffFlow for room $roomId") - result?.itemsStream?.cancelAndDestroy() - result?.items?.destroyAll() + result.itemsStream.cancelAndDestroy() + result.items.destroyAll() } + }.catch { + Timber.d(it, "timelineDiffFlow() failed") }.buffer(Channel.UNLIMITED) internal fun Room.backPaginationStatusFlow(): Flow = diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt index 283b5f7076..fbf393e587 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt @@ -23,8 +23,8 @@ import org.matrix.rustcomponents.sdk.TaskHandle internal fun mxCallbackFlow(block: suspend ProducerScope.() -> TaskHandle?) = callbackFlow { - val token: TaskHandle? = block(this) + val taskHandle: TaskHandle? = block(this) awaitClose { - token?.cancelAndDestroy() + taskHandle?.cancelAndDestroy() } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt index 937e73e72e..5842ba1546 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt @@ -19,22 +19,22 @@ package io.element.android.libraries.matrix.impl.util import org.matrix.rustcomponents.sdk.TaskHandle import java.util.concurrent.CopyOnWriteArraySet -class TaskHandleBag(private val tokens: MutableSet = CopyOnWriteArraySet()) : Set by tokens { - - operator fun plusAssign(taskHandle: TaskHandle?) { - if (taskHandle == null) return - tokens += taskHandle - } - - fun dispose() { - tokens.forEach { - it.cancelAndDestroy() - } - tokens.clear() - } -} - fun TaskHandle.cancelAndDestroy() { cancel() destroy() } + +class TaskHandleBag(private val taskHandles: MutableSet = CopyOnWriteArraySet()) : Set by taskHandles { + + operator fun plusAssign(taskHandle: TaskHandle?) { + if (taskHandle == null) return + taskHandles += taskHandle + } + + fun dispose() { + taskHandles.forEach { + it.cancelAndDestroy() + } + taskHandles.clear() + } +}