Remove usage of async-uniffi as it leads to a deadlocks and memory leaks (#1381)
This commit is contained in:
1
changelog.d/1381.misc
Normal file
1
changelog.d/1381.misc
Normal file
@@ -0,0 +1 @@
|
||||
Remove usage of async-uniffi as it leads to a deadlocks and memory leaks.
|
||||
@@ -97,8 +97,8 @@ class RustMatrixClient constructor(
|
||||
private val innerRoomListService = syncService.roomListService()
|
||||
private val sessionDispatcher = dispatchers.io.limitedParallelism(64)
|
||||
private val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, "Session-${sessionId}")
|
||||
private val rustSyncService = RustSyncService(syncService, sessionCoroutineScope)
|
||||
private val verificationService = RustSessionVerificationService(rustSyncService)
|
||||
private val rustSyncService = RustSyncService(syncService, dispatchers, sessionCoroutineScope)
|
||||
private val verificationService = RustSessionVerificationService(rustSyncService, dispatchers)
|
||||
private val pushersService = RustPushersService(
|
||||
client = client,
|
||||
dispatchers = dispatchers,
|
||||
|
||||
@@ -53,8 +53,7 @@ class RustMatrixClientFactory @Inject constructor(
|
||||
|
||||
client.restoreSession(sessionData.toSession())
|
||||
|
||||
val syncService = client.syncService()
|
||||
.finish()
|
||||
val syncService = client.syncService().finishBlocking()
|
||||
|
||||
RustMatrixClient(
|
||||
client = client,
|
||||
|
||||
@@ -47,15 +47,19 @@ class RustNotificationSettingsService(
|
||||
notificationSettings.setDelegate(notificationSettingsDelegate)
|
||||
}
|
||||
|
||||
override suspend fun getRoomNotificationSettings(roomId: RoomId, isEncrypted: Boolean, isOneToOne: Boolean): Result<RoomNotificationSettings> =
|
||||
override suspend fun getRoomNotificationSettings(roomId: RoomId, isEncrypted: Boolean, isOneToOne: Boolean): Result<RoomNotificationSettings> = withContext(
|
||||
dispatchers.io
|
||||
) {
|
||||
runCatching {
|
||||
notificationSettings.getRoomNotificationSettings(roomId.value, isEncrypted, isOneToOne).let(RoomNotificationSettingsMapper::map)
|
||||
notificationSettings.getRoomNotificationSettingsBlocking(roomId.value, isEncrypted, isOneToOne).let(RoomNotificationSettingsMapper::map)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun getDefaultRoomNotificationMode(isEncrypted: Boolean, isOneToOne: Boolean): Result<RoomNotificationMode> =
|
||||
override suspend fun getDefaultRoomNotificationMode(isEncrypted: Boolean, isOneToOne: Boolean): Result<RoomNotificationMode> = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
notificationSettings.getDefaultRoomNotificationMode(isEncrypted, isOneToOne).let(RoomNotificationSettingsMapper::mapMode)
|
||||
notificationSettings.getDefaultRoomNotificationModeBlocking(isEncrypted, isOneToOne).let(RoomNotificationSettingsMapper::mapMode)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun setDefaultRoomNotificationMode(
|
||||
isEncrypted: Boolean,
|
||||
@@ -63,19 +67,19 @@ class RustNotificationSettingsService(
|
||||
isOneToOne: Boolean
|
||||
): Result<Unit> = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
notificationSettings.setDefaultRoomNotificationMode(isEncrypted, isOneToOne, mode.let(RoomNotificationSettingsMapper::mapMode))
|
||||
notificationSettings.setDefaultRoomNotificationModeBlocking(isEncrypted, isOneToOne, mode.let(RoomNotificationSettingsMapper::mapMode))
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun setRoomNotificationMode(roomId: RoomId, mode: RoomNotificationMode): Result<Unit> = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
notificationSettings.setRoomNotificationMode(roomId.value, mode.let(RoomNotificationSettingsMapper::mapMode))
|
||||
notificationSettings.setRoomNotificationModeBlocking(roomId.value, mode.let(RoomNotificationSettingsMapper::mapMode))
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun restoreDefaultRoomNotificationMode(roomId: RoomId): Result<Unit> = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
notificationSettings.restoreDefaultRoomNotificationMode(roomId.value)
|
||||
notificationSettings.restoreDefaultRoomNotificationModeBlocking(roomId.value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,31 +87,31 @@ class RustNotificationSettingsService(
|
||||
|
||||
override suspend fun unmuteRoom(roomId: RoomId, isEncrypted: Boolean, isOneToOne: Boolean) = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
notificationSettings.unmuteRoom(roomId.value, isEncrypted, isOneToOne)
|
||||
notificationSettings.unmuteRoomBlocking(roomId.value, isEncrypted, isOneToOne)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun isRoomMentionEnabled(): Result<Boolean> = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
notificationSettings.isRoomMentionEnabled()
|
||||
notificationSettings.isRoomMentionEnabledBlocking()
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun setRoomMentionEnabled(enabled: Boolean): Result<Unit> = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
notificationSettings.setRoomMentionEnabled(enabled)
|
||||
notificationSettings.setRoomMentionEnabledBlocking(enabled)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun isCallEnabled(): Result<Boolean> = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
notificationSettings.isCallEnabled()
|
||||
notificationSettings.isCallEnabledBlocking()
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun setCallEnabled(enabled: Boolean): Result<Unit> = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
notificationSettings.setCallEnabled(enabled)
|
||||
notificationSettings.setCallEnabledBlocking(enabled)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,11 +55,11 @@ import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.coroutines.yield
|
||||
import org.matrix.rustcomponents.sdk.RequiredState
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
@@ -188,12 +188,12 @@ class RustMatrixRoom(
|
||||
_membersStateFlow.value = MatrixRoomMembersState.Pending(prevRoomMembers = currentMembers)
|
||||
var rustMembers: List<RoomMember>? = null
|
||||
try {
|
||||
rustMembers = innerRoom.members().use { membersIterator ->
|
||||
rustMembers = innerRoom.membersBlocking().use { membersIterator ->
|
||||
buildList {
|
||||
while (true) {
|
||||
// Loading the whole membersIterator as a stop-gap measure.
|
||||
// We should probably implement some sort of paging in the future.
|
||||
yield()
|
||||
ensureActive()
|
||||
addAll(membersIterator.nextChunk(1000u) ?: break)
|
||||
}
|
||||
}
|
||||
@@ -294,27 +294,27 @@ class RustMatrixRoom(
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun canUserInvite(userId: UserId): Result<Boolean> {
|
||||
return runCatching {
|
||||
innerRoom.canUserInvite(userId.value)
|
||||
override suspend fun canUserInvite(userId: UserId): Result<Boolean> = withContext(roomMembersDispatcher) {
|
||||
runCatching {
|
||||
innerRoom.canUserInviteBlocking(userId.value)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun canUserRedact(userId: UserId): Result<Boolean> {
|
||||
return runCatching {
|
||||
innerRoom.canUserRedact(userId.value)
|
||||
override suspend fun canUserRedact(userId: UserId): Result<Boolean> = withContext(roomMembersDispatcher) {
|
||||
runCatching {
|
||||
innerRoom.canUserRedactBlocking(userId.value)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun canUserSendState(userId: UserId, type: StateEventType): Result<Boolean> {
|
||||
return runCatching {
|
||||
innerRoom.canUserSendState(userId.value, type.map())
|
||||
override suspend fun canUserSendState(userId: UserId, type: StateEventType): Result<Boolean> = withContext(roomMembersDispatcher) {
|
||||
runCatching {
|
||||
innerRoom.canUserSendStateBlocking(userId.value, type.map())
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun canUserSendMessage(userId: UserId, type: MessageEventType): Result<Boolean> {
|
||||
return runCatching {
|
||||
innerRoom.canUserSendMessage(userId.value, type.map())
|
||||
override suspend fun canUserSendMessage(userId: UserId, type: MessageEventType): Result<Boolean> = withContext(roomMembersDispatcher) {
|
||||
runCatching {
|
||||
innerRoom.canUserSendMessageBlocking(userId.value, type.map())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package io.element.android.libraries.matrix.impl.sync
|
||||
|
||||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
|
||||
import io.element.android.libraries.matrix.api.sync.SyncService
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
@@ -25,27 +26,33 @@ import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.SyncServiceInterface
|
||||
import org.matrix.rustcomponents.sdk.SyncServiceState
|
||||
import timber.log.Timber
|
||||
|
||||
class RustSyncService(
|
||||
private val innerSyncService: SyncServiceInterface,
|
||||
private val dispatchers: CoroutineDispatchers,
|
||||
sessionCoroutineScope: CoroutineScope
|
||||
) : SyncService {
|
||||
|
||||
override suspend fun startSync() = runCatching {
|
||||
Timber.i("Start sync")
|
||||
innerSyncService.start()
|
||||
}.onFailure {
|
||||
Timber.d("Start sync failed: $it")
|
||||
override suspend fun startSync() = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
Timber.i("Start sync")
|
||||
innerSyncService.startBlocking()
|
||||
}.onFailure {
|
||||
Timber.d("Start sync failed: $it")
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun stopSync() = runCatching {
|
||||
Timber.i("Stop sync")
|
||||
innerSyncService.stop()
|
||||
}.onFailure {
|
||||
Timber.d("Stop sync failed: $it")
|
||||
override suspend fun stopSync() = withContext(dispatchers.io){
|
||||
runCatching {
|
||||
Timber.i("Stop sync")
|
||||
innerSyncService.stopBlocking()
|
||||
}.onFailure {
|
||||
Timber.d("Stop sync failed: $it")
|
||||
}
|
||||
}
|
||||
|
||||
override val syncState: StateFlow<SyncState> =
|
||||
|
||||
@@ -44,7 +44,7 @@ internal fun Room.timelineDiffFlow(onInitialList: suspend (List<TimelineItem>) -
|
||||
}
|
||||
val roomId = id()
|
||||
Timber.d("Open timelineDiffFlow for room $roomId")
|
||||
val result = addTimelineListener(listener)
|
||||
val result = addTimelineListenerBlocking(listener)
|
||||
try {
|
||||
onInitialList(result.items)
|
||||
} catch (exception: Exception) {
|
||||
|
||||
@@ -124,7 +124,7 @@ class RustMatrixTimeline(
|
||||
|
||||
private suspend fun fetchMembers() = withContext(dispatcher) {
|
||||
initLatch.await()
|
||||
innerRoom.fetchMembers()
|
||||
innerRoom.fetchMembersBlocking()
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package io.element.android.libraries.matrix.impl.verification
|
||||
|
||||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
|
||||
import io.element.android.libraries.core.data.tryOrNull
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import io.element.android.libraries.matrix.api.verification.SessionVerificationService
|
||||
@@ -27,6 +28,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.SessionVerificationController
|
||||
import org.matrix.rustcomponents.sdk.SessionVerificationControllerDelegate
|
||||
import org.matrix.rustcomponents.sdk.SessionVerificationControllerInterface
|
||||
@@ -35,6 +37,7 @@ import javax.inject.Inject
|
||||
|
||||
class RustSessionVerificationService @Inject constructor(
|
||||
private val syncService: RustSyncService,
|
||||
private val dispatchers: CoroutineDispatchers,
|
||||
) : SessionVerificationService, SessionVerificationControllerDelegate {
|
||||
|
||||
var verificationController: SessionVerificationControllerInterface? = null
|
||||
@@ -61,21 +64,31 @@ class RustSessionVerificationService @Inject constructor(
|
||||
syncState == SyncState.Running && verificationStatus == SessionVerifiedStatus.NotVerified
|
||||
}
|
||||
|
||||
override suspend fun requestVerification() = tryOrFail {
|
||||
verificationController?.requestVerification()
|
||||
override suspend fun requestVerification() {
|
||||
tryOrFail {
|
||||
verificationController?.requestVerificationBlocking()
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun cancelVerification() = tryOrFail { verificationController?.cancelVerification() }
|
||||
|
||||
override suspend fun approveVerification() = tryOrFail { verificationController?.approveVerification() }
|
||||
|
||||
override suspend fun declineVerification() = tryOrFail { verificationController?.declineVerification() }
|
||||
|
||||
override suspend fun startVerification() = tryOrFail {
|
||||
verificationController?.startSasVerification()
|
||||
override suspend fun cancelVerification() {
|
||||
tryOrFail { verificationController?.cancelVerificationBlocking() }
|
||||
}
|
||||
|
||||
private suspend fun tryOrFail(block: suspend () -> Unit) {
|
||||
override suspend fun approveVerification() {
|
||||
tryOrFail { verificationController?.approveVerificationBlocking() }
|
||||
}
|
||||
|
||||
override suspend fun declineVerification() {
|
||||
tryOrFail { verificationController?.declineVerificationBlocking() }
|
||||
}
|
||||
|
||||
override suspend fun startVerification() {
|
||||
tryOrFail {
|
||||
verificationController?.startSasVerificationBlocking()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun tryOrFail(block: suspend () -> Unit) = withContext(dispatchers.io) {
|
||||
runCatching {
|
||||
block()
|
||||
}.onFailure { didFail() }
|
||||
|
||||
Reference in New Issue
Block a user