Add chunk operator on flow

This commit is contained in:
ganfra
2022-11-07 18:42:07 +01:00
parent bff815e2c5
commit 7907fce9e5
3 changed files with 99 additions and 5 deletions

View File

@@ -1,2 +1,88 @@
package io.element.android.x.core.data.flow
import android.os.SystemClock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.selects.select
@ExperimentalCoroutinesApi
fun <T> Flow<T>.chunk(durationInMillis: Long): Flow<List<T>> {
require(durationInMillis > 0) { "Duration should be greater than 0" }
return flow {
coroutineScope {
val events = ArrayList<T>()
val ticker = fixedPeriodTicker(durationInMillis)
try {
val upstreamValues = produce(capacity = Channel.CONFLATED) {
collect { value -> send(value) }
}
while (isActive) {
var hasTimedOut = false
select<Unit> {
upstreamValues.onReceive {
events.add(it)
}
ticker.onReceive {
hasTimedOut = true
}
}
if (hasTimedOut && events.isNotEmpty()) {
emit(events.toList())
events.clear()
}
}
} catch (e: ClosedReceiveChannelException) {
// drain remaining events
if (events.isNotEmpty()) emit(events.toList())
} finally {
ticker.cancel()
}
}
}
}
@ExperimentalCoroutinesApi
fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> = flow {
var windowStartTime = SystemClock.elapsedRealtime()
var emitted = false
collect { value ->
val currentTime = SystemClock.elapsedRealtime()
val delta = currentTime - windowStartTime
if (delta >= windowDuration) {
windowStartTime += delta / windowDuration * windowDuration
emitted = false
}
if (!emitted) {
emit(value)
emitted = true
}
}
}
@ExperimentalCoroutinesApi
fun tickerFlow(scope: CoroutineScope, delayMillis: Long, initialDelayMillis: Long = delayMillis): Flow<Unit> {
return scope.fixedPeriodTicker(delayMillis, initialDelayMillis).consumeAsFlow()
}
@ExperimentalCoroutinesApi
private fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel<Unit> {
require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
return produce(capacity = 0) {
delay(initialDelayMillis)
while (true) {
channel.send(Unit)
delay(delayMillis)
}
}
}

View File

@@ -1,6 +1,7 @@
package io.element.android.x.matrix.room
import io.element.android.x.core.data.CoroutineDispatchers
import io.element.android.x.core.data.flow.chunk
import io.element.android.x.matrix.room.message.RoomMessageFactory
import io.element.android.x.matrix.sync.roomListDiff
import io.element.android.x.matrix.sync.state
@@ -34,9 +35,12 @@ internal class RustRoomSummaryDataSource(
fun startSync(){
slidingSyncView.roomListDiff()
.onEach { diff ->
.chunk(100)
.onEach { diffs ->
updateRoomSummaries {
applyDiff(diff)
diffs.forEach {
applyDiff(it)
}
}
}.launchIn(coroutineScope)

View File

@@ -1,6 +1,7 @@
package io.element.android.x.matrix.timeline
import io.element.android.x.core.data.CoroutineDispatchers
import io.element.android.x.core.data.flow.chunk
import io.element.android.x.matrix.core.EventId
import io.element.android.x.matrix.room.MatrixRoom
import io.element.android.x.matrix.room.timelineDiff
@@ -42,9 +43,12 @@ class MatrixTimeline(
private fun diffFlow(): Flow<Unit> {
return room.timelineDiff()
.onEach { timelineDiff ->
.chunk(100)
.onEach { timelineDiffs ->
updateTimelineItems {
applyDiff(timelineDiff)
timelineDiffs.onEach {
applyDiff(it)
}
}
}.map { }
}
@@ -107,7 +111,7 @@ class MatrixTimeline(
room.addTimelineListener(timelineListener)
}
fun dispose(){
fun dispose() {
room.removeTimeline()
}