Coroutine: introduce scoped dispatcher with limitedParalellism

This commit is contained in:
ganfra
2023-07-11 11:41:24 +02:00
parent c2c81d3747
commit d02f7fb871
5 changed files with 68 additions and 55 deletions

View File

@@ -54,7 +54,7 @@ import io.element.android.libraries.matrix.impl.verification.RustSessionVerifica
import io.element.android.libraries.sessionstorage.api.SessionStore
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
@@ -73,6 +73,7 @@ import org.matrix.rustcomponents.sdk.CreateRoomParameters as RustCreateRoomParam
import org.matrix.rustcomponents.sdk.RoomPreset as RustRoomPreset
import org.matrix.rustcomponents.sdk.RoomVisibility as RustRoomVisibility
@OptIn(ExperimentalCoroutinesApi::class)
class RustMatrixClient constructor(
private val client: Client,
private val sessionStore: SessionStore,
@@ -85,6 +86,7 @@ class RustMatrixClient constructor(
override val sessionId: UserId = UserId(client.userId())
private val roomListService = client.roomListServiceWithEncryption()
private val sessionDispatcher = dispatchers.io.limitedParallelism(64)
private val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, "Session-${sessionId}")
private val verificationService = RustSessionVerificationService()
private val syncService = RustSyncService(roomListService, sessionCoroutineScope)
@@ -92,6 +94,7 @@ class RustMatrixClient constructor(
client = client,
dispatchers = dispatchers,
)
private val notificationService = RustNotificationService(client)
private val clientDelegate = object : ClientDelegate {
@@ -105,7 +108,7 @@ class RustMatrixClient constructor(
RustRoomSummaryDataSource(
roomListService = roomListService,
sessionCoroutineScope = sessionCoroutineScope,
coroutineDispatchers = dispatchers,
dispatcher = sessionDispatcher,
)
override val roomSummaryDataSource: RoomSummaryDataSource
@@ -150,7 +153,7 @@ class RustMatrixClient constructor(
)
}
private suspend fun pairOfRoom(roomId: RoomId): Pair<RoomListItem, Room>? = withContext(dispatchers.io) {
private suspend fun pairOfRoom(roomId: RoomId): Pair<RoomListItem, Room>? = withContext(sessionDispatcher) {
val cachedRoomListItem = roomListService.roomOrNull(roomId.value)
val fullRoom = cachedRoomListItem?.fullRoom()
if (cachedRoomListItem == null || fullRoom == null) {
@@ -165,19 +168,19 @@ class RustMatrixClient constructor(
return roomId?.let { getRoom(it) }
}
override suspend fun ignoreUser(userId: UserId): Result<Unit> = withContext(dispatchers.io) {
override suspend fun ignoreUser(userId: UserId): Result<Unit> = withContext(sessionDispatcher) {
runCatching {
client.ignoreUser(userId.value)
}
}
override suspend fun unignoreUser(userId: UserId): Result<Unit> = withContext(dispatchers.io) {
override suspend fun unignoreUser(userId: UserId): Result<Unit> = withContext(sessionDispatcher) {
runCatching {
client.unignoreUser(userId.value)
}
}
override suspend fun createRoom(createRoomParams: CreateRoomParameters): Result<RoomId> = withContext(dispatchers.io) {
override suspend fun createRoom(createRoomParams: CreateRoomParameters): Result<RoomId> = withContext(sessionDispatcher) {
runCatching {
val rustParams = RustCreateRoomParameters(
name = createRoomParams.name,
@@ -221,14 +224,14 @@ class RustMatrixClient constructor(
return createRoom(createRoomParams)
}
override suspend fun getProfile(userId: UserId): Result<MatrixUser> = withContext(Dispatchers.IO) {
override suspend fun getProfile(userId: UserId): Result<MatrixUser> = withContext(sessionDispatcher) {
runCatching {
client.getProfile(userId.value).let(UserProfileMapper::map)
}
}
override suspend fun searchUsers(searchTerm: String, limit: Long): Result<MatrixSearchUserResults> =
withContext(dispatchers.io) {
withContext(sessionDispatcher) {
runCatching {
client.searchUsers(searchTerm, limit.toULong()).let(UserSearchResultMapper::map)
}
@@ -260,7 +263,7 @@ class RustMatrixClient constructor(
baseDirectory.deleteSessionDirectory(userID = sessionId.value, deleteCryptoDb = false)
}
override suspend fun logout() = withContext(dispatchers.io) {
override suspend fun logout() = withContext(sessionDispatcher) {
try {
client.logout()
} catch (failure: Throwable) {
@@ -271,20 +274,20 @@ class RustMatrixClient constructor(
sessionStore.removeSession(sessionId.value)
}
override suspend fun loadUserDisplayName(): Result<String> = withContext(dispatchers.io) {
override suspend fun loadUserDisplayName(): Result<String> = withContext(sessionDispatcher) {
runCatching {
client.displayName()
}
}
override suspend fun loadUserAvatarURLString(): Result<String?> = withContext(dispatchers.io) {
override suspend fun loadUserAvatarURLString(): Result<String?> = withContext(sessionDispatcher) {
runCatching {
client.avatarUrl()
}
}
@OptIn(ExperimentalUnsignedTypes::class)
override suspend fun uploadMedia(mimeType: String, data: ByteArray, progressCallback: ProgressCallback?): Result<String> = withContext(dispatchers.io) {
override suspend fun uploadMedia(mimeType: String, data: ByteArray, progressCallback: ProgressCallback?): Result<String> = withContext(sessionDispatcher) {
runCatching {
client.uploadMedia(mimeType, data.toUByteArray().toList(), progressCallback?.toProgressWatcher())
}
@@ -305,7 +308,7 @@ class RustMatrixClient constructor(
private suspend fun File.getCacheSize(
userID: String,
includeCryptoDb: Boolean = false,
): Long = withContext(dispatchers.io) {
): Long = withContext(sessionDispatcher) {
// Rust sanitises the user ID replacing invalid characters with an _
val sanitisedUserID = userID.replace(":", "_")
val sessionDirectory = File(this@getCacheSize, sanitisedUserID)
@@ -327,7 +330,7 @@ class RustMatrixClient constructor(
private suspend fun File.deleteSessionDirectory(
userID: String,
deleteCryptoDb: Boolean = false,
): Boolean = withContext(dispatchers.io) {
): Boolean = withContext(sessionDispatcher) {
// Rust sanitises the user ID replacing invalid characters with an _
val sanitisedUserID = userID.replace(":", "_")
val sessionDirectory = File(this@deleteSessionDirectory, sanitisedUserID)

View File

@@ -20,6 +20,7 @@ import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.matrix.api.media.MatrixMediaLoader
import io.element.android.libraries.matrix.api.media.MediaFile
import io.element.android.libraries.matrix.api.media.MediaSource
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.Client
import org.matrix.rustcomponents.sdk.mediaSourceFromUrl
@@ -29,10 +30,12 @@ import org.matrix.rustcomponents.sdk.MediaSource as RustMediaSource
class RustMediaLoader(
baseCacheDirectory: File,
private val dispatchers: CoroutineDispatchers,
dispatchers: CoroutineDispatchers,
private val innerClient: Client,
) : MatrixMediaLoader {
@OptIn(ExperimentalCoroutinesApi::class)
private val mediaDispatcher = dispatchers.io.limitedParallelism(32)
private val cacheDirectory = File(baseCacheDirectory, "temp/media").apply {
if (!exists()) {
mkdirs()
@@ -41,7 +44,7 @@ class RustMediaLoader(
@OptIn(ExperimentalUnsignedTypes::class)
override suspend fun loadMediaContent(source: MediaSource): Result<ByteArray> =
withContext(dispatchers.io) {
withContext(mediaDispatcher) {
runCatching {
source.toRustMediaSource().use { source ->
innerClient.getMediaContent(source).toUByteArray().toByteArray()
@@ -55,7 +58,7 @@ class RustMediaLoader(
width: Long,
height: Long
): Result<ByteArray> =
withContext(dispatchers.io) {
withContext(mediaDispatcher) {
runCatching {
source.toRustMediaSource().use { mediaSource ->
innerClient.getMediaThumbnail(
@@ -68,7 +71,7 @@ class RustMediaLoader(
}
override suspend fun downloadMediaFile(source: MediaSource, mimeType: String?, body: String?): Result<MediaFile> =
withContext(dispatchers.io) {
withContext(mediaDispatcher) {
runCatching {
source.toRustMediaSource().use { mediaSource ->
val mediaFile = innerClient.getMediaFile(

View File

@@ -23,7 +23,6 @@ import io.element.android.libraries.matrix.api.core.ProgressCallback
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.room.location.AssetType
import io.element.android.libraries.matrix.api.media.AudioInfo
import io.element.android.libraries.matrix.api.media.FileInfo
import io.element.android.libraries.matrix.api.media.ImageInfo
@@ -32,17 +31,19 @@ import io.element.android.libraries.matrix.api.room.MatrixRoom
import io.element.android.libraries.matrix.api.room.MatrixRoomMembersState
import io.element.android.libraries.matrix.api.room.MessageEventType
import io.element.android.libraries.matrix.api.room.StateEventType
import io.element.android.libraries.matrix.api.room.location.AssetType
import io.element.android.libraries.matrix.api.room.roomMembers
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
import io.element.android.libraries.matrix.api.timeline.item.event.EventType
import io.element.android.libraries.matrix.impl.core.toProgressWatcher
import io.element.android.libraries.matrix.impl.room.location.toInner
import io.element.android.libraries.matrix.impl.media.map
import io.element.android.libraries.matrix.impl.room.location.toInner
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
import io.element.android.libraries.matrix.impl.timeline.backPaginationStatusFlow
import io.element.android.libraries.matrix.impl.timeline.timelineDiffFlow
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
@@ -62,6 +63,7 @@ import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
import timber.log.Timber
import java.io.File
@OptIn(ExperimentalCoroutinesApi::class)
class RustMatrixRoom(
override val sessionId: SessionId,
private val roomListItem: RoomListItem,
@@ -74,6 +76,11 @@ class RustMatrixRoom(
override val roomId = RoomId(innerRoom.id())
// Create a dispatcher for all room methods...
private val roomDispatcher = coroutineDispatchers.io.limitedParallelism(32)
//...except getMember methods as it could quickly fill the roomDispatcher...
private val roomMembersDispatcher = coroutineDispatchers.io.limitedParallelism(8)
private val roomCoroutineScope = sessionCoroutineScope.childScope(coroutineDispatchers.main, "RoomScope-$roomId")
private val _membersStateFlow = MutableStateFlow<MatrixRoomMembersState>(MatrixRoomMembersState.Unknown)
private val isInit = MutableStateFlow(false)
@@ -83,7 +90,7 @@ class RustMatrixRoom(
matrixRoom = this,
innerRoom = innerRoom,
roomCoroutineScope = roomCoroutineScope,
coroutineDispatchers = coroutineDispatchers
dispatcher = roomDispatcher
)
}
@@ -105,7 +112,7 @@ class RustMatrixRoom(
timelineLimit = null
)
roomListItem.subscribe(settings)
roomCoroutineScope.launch(coroutineDispatchers.computation) {
roomCoroutineScope.launch(roomDispatcher) {
innerRoom.timelineDiffFlow { initialList ->
_timeline.postItems(initialList)
}.onEach {
@@ -175,7 +182,7 @@ class RustMatrixRoom(
override val activeMemberCount: Long
get() = innerRoom.activeMembersCount().toLong()
override suspend fun updateMembers(): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun updateMembers(): Result<Unit> = withContext(roomMembersDispatcher) {
val currentState = _membersStateFlow.value
val currentMembers = currentState.roomMembers()
_membersStateFlow.value = MatrixRoomMembersState.Pending(prevRoomMembers = currentMembers)
@@ -189,20 +196,20 @@ class RustMatrixRoom(
}
override suspend fun userDisplayName(userId: UserId): Result<String?> =
withContext(coroutineDispatchers.io) {
withContext(roomDispatcher) {
runCatching {
innerRoom.memberDisplayName(userId.value)
}
}
override suspend fun userAvatarUrl(userId: UserId): Result<String?> =
withContext(coroutineDispatchers.io) {
withContext(roomDispatcher) {
runCatching {
innerRoom.memberAvatarUrl(userId.value)
}
}
override suspend fun sendMessage(message: String): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun sendMessage(message: String): Result<Unit> = withContext(roomDispatcher) {
val transactionId = genTransactionId()
messageEventContentFromMarkdown(message).use { content ->
runCatching {
@@ -211,7 +218,7 @@ class RustMatrixRoom(
}
}
override suspend fun editMessage(originalEventId: EventId?, transactionId: String?, message: String): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun editMessage(originalEventId: EventId?, transactionId: String?, message: String): Result<Unit> = withContext(roomDispatcher) {
if (originalEventId != null) {
runCatching {
innerRoom.edit(/* TODO use content */ message, originalEventId.value, transactionId)
@@ -224,7 +231,7 @@ class RustMatrixRoom(
}
}
override suspend fun replyMessage(eventId: EventId, message: String): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun replyMessage(eventId: EventId, message: String): Result<Unit> = withContext(roomDispatcher) {
val transactionId = genTransactionId()
// val content = messageEventContentFromMarkdown(message)
runCatching {
@@ -232,50 +239,50 @@ class RustMatrixRoom(
}
}
override suspend fun redactEvent(eventId: EventId, reason: String?) = withContext(coroutineDispatchers.io) {
override suspend fun redactEvent(eventId: EventId, reason: String?) = withContext(roomDispatcher) {
val transactionId = genTransactionId()
runCatching {
innerRoom.redact(eventId.value, reason, transactionId)
}
}
override suspend fun leave(): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun leave(): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.leave()
}
}
override suspend fun acceptInvitation(): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun acceptInvitation(): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.acceptInvitation()
}
}
override suspend fun rejectInvitation(): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun rejectInvitation(): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.rejectInvitation()
}
}
override suspend fun inviteUserById(id: UserId): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun inviteUserById(id: UserId): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.inviteUserById(id.value)
}
}
override suspend fun canInvite(): Result<Boolean> = withContext(coroutineDispatchers.io) {
override suspend fun canInvite(): Result<Boolean> = withContext(roomMembersDispatcher) {
runCatching {
innerRoom.member(sessionId.value).use(RoomMember::canInvite)
}
}
override suspend fun canSendStateEvent(type: StateEventType): Result<Boolean> = withContext(coroutineDispatchers.io) {
override suspend fun canSendStateEvent(type: StateEventType): Result<Boolean> = withContext(roomMembersDispatcher) {
runCatching {
innerRoom.member(sessionId.value).use { it.canSendState(type.map()) }
}
}
override suspend fun canSendEvent(type: MessageEventType): Result<Boolean> = withContext(coroutineDispatchers.io) {
override suspend fun canSendEvent(type: MessageEventType): Result<Boolean> = withContext(roomMembersDispatcher) {
runCatching {
innerRoom.member(sessionId.value).use { it.canSendMessage(type.map()) }
}
@@ -305,13 +312,13 @@ class RustMatrixRoom(
}
}
override suspend fun toggleReaction(emoji: String, eventId: EventId): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun toggleReaction(emoji: String, eventId: EventId): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.toggleReaction(key = emoji, eventId = eventId.value)
}
}
override suspend fun forwardEvent(eventId: EventId, roomIds: List<RoomId>): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun forwardEvent(eventId: EventId, roomIds: List<RoomId>): Result<Unit> = withContext(roomDispatcher) {
runCatching {
roomContentForwarder.forward(fromRoom = innerRoom, eventId = eventId, toRoomIds = roomIds)
}.onFailure {
@@ -320,14 +327,14 @@ class RustMatrixRoom(
}
override suspend fun retrySendMessage(transactionId: String): Result<Unit> =
withContext(coroutineDispatchers.io) {
withContext(roomDispatcher) {
runCatching {
innerRoom.retrySend(transactionId)
}
}
override suspend fun cancelSend(transactionId: String): Result<Unit> =
withContext(coroutineDispatchers.io) {
withContext(roomDispatcher) {
runCatching {
innerRoom.cancelSend(transactionId)
}
@@ -335,40 +342,40 @@ class RustMatrixRoom(
@OptIn(ExperimentalUnsignedTypes::class)
override suspend fun updateAvatar(mimeType: String, data: ByteArray): Result<Unit> =
withContext(coroutineDispatchers.io) {
withContext(roomDispatcher) {
runCatching {
innerRoom.uploadAvatar(mimeType, data.toUByteArray().toList())
}
}
override suspend fun removeAvatar(): Result<Unit> =
withContext(coroutineDispatchers.io) {
withContext(roomDispatcher) {
runCatching {
innerRoom.removeAvatar()
}
}
override suspend fun setName(name: String): Result<Unit> =
withContext(coroutineDispatchers.io) {
withContext(roomDispatcher) {
runCatching {
innerRoom.setName(name)
}
}
override suspend fun setTopic(topic: String): Result<Unit> =
withContext(coroutineDispatchers.io) {
withContext(roomDispatcher) {
runCatching {
innerRoom.setTopic(topic)
}
}
private suspend fun fetchMembers() = withContext(coroutineDispatchers.io) {
private suspend fun fetchMembers() = withContext(roomDispatcher) {
runCatching {
innerRoom.fetchMembers()
}
}
override suspend fun reportContent(eventId: EventId, reason: String, blockUserId: UserId?): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun reportContent(eventId: EventId, reason: String, blockUserId: UserId?): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.reportContent(eventId = eventId.value, score = null, reason = reason)
if (blockUserId != null) {
@@ -383,7 +390,7 @@ class RustMatrixRoom(
description: String?,
zoomLevel: Int?,
assetType: AssetType?,
): Result<Unit> = withContext(coroutineDispatchers.io) {
): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.sendLocation(
body = body,

View File

@@ -16,9 +16,9 @@
package io.element.android.libraries.matrix.impl.room
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.matrix.api.room.RoomSummary
import io.element.android.libraries.matrix.api.room.RoomSummaryDataSource
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
@@ -41,7 +41,7 @@ import timber.log.Timber
internal class RustRoomSummaryDataSource(
private val roomListService: RoomListService,
private val sessionCoroutineScope: CoroutineScope,
coroutineDispatchers: CoroutineDispatchers,
dispatcher: CoroutineDispatcher,
roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory(),
) : RoomSummaryDataSource {
@@ -53,7 +53,7 @@ internal class RustRoomSummaryDataSource(
private val inviteRoomsListProcessor = RoomSummaryListProcessor(inviteRooms, roomListService, roomSummaryDetailsFactory, shouldFetchFullRoom = true)
init {
sessionCoroutineScope.launch(coroutineDispatchers.computation) {
sessionCoroutineScope.launch(dispatcher) {
val allRooms = roomListService.allRooms()
allRooms
.observeEntriesWithProcessor(allRoomsListProcessor)

View File

@@ -16,7 +16,6 @@
package io.element.android.libraries.matrix.impl.timeline
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.room.MatrixRoom
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
@@ -25,6 +24,7 @@ import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessage
import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper
import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper
import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
@@ -45,7 +45,7 @@ class RustMatrixTimeline(
roomCoroutineScope: CoroutineScope,
private val matrixRoom: MatrixRoom,
private val innerRoom: Room,
private val coroutineDispatchers: CoroutineDispatchers,
private val dispatcher: CoroutineDispatcher,
) : MatrixTimeline {
private val _timelineItems: MutableStateFlow<List<MatrixTimelineItem>> =
@@ -109,13 +109,13 @@ class RustMatrixTimeline(
}
}
override suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> = withContext(dispatcher) {
runCatching {
innerRoom.fetchDetailsForEvent(eventId.value)
}
}
override suspend fun paginateBackwards(requestSize: Int, untilNumberOfItems: Int): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun paginateBackwards(requestSize: Int, untilNumberOfItems: Int): Result<Unit> = withContext(dispatcher) {
runCatching {
Timber.v("Start back paginating for room ${matrixRoom.roomId} ")
val paginationOptions = PaginationOptions.UntilNumItems(
@@ -131,7 +131,7 @@ class RustMatrixTimeline(
}
}
override suspend fun sendReadReceipt(eventId: EventId) = withContext(coroutineDispatchers.io) {
override suspend fun sendReadReceipt(eventId: EventId) = withContext(dispatcher) {
runCatching {
innerRoom.sendReadReceipt(eventId = eventId.value)
}