Sync notifications using WorkManager (#5545)

* Initial implementation of notification sync using `WorkManager`

* Use custom `MetroWorkerFactory` to allow assisted injection in WorkManager Workers

* Add tests for `FetchNotificationWorker`. Create `FakeNotificationResolverQueue` to help testing.

* Add more tests, fix Konsist checks

* Add tests for `SyncNotificationWorkManagerRequest`

* Simplify `FakeNotificationResolverQueue`
This commit is contained in:
Jorge Martin Espinosa
2025-10-17 11:51:27 +02:00
committed by GitHub
parent 42c775d740
commit 597c9b473a
38 changed files with 968 additions and 98 deletions

View File

@@ -34,10 +34,17 @@
<provider
android:name="androidx.startup.InitializationProvider"
android:authorities="${applicationId}.androidx-startup"
android:exported="false">
android:exported="false"
tools:node="merge">
<meta-data
android:name='androidx.lifecycle.ProcessLifecycleInitializer'
android:value='androidx.startup' />
<!-- Remove to use Application workManagerConfiguration -->
<meta-data
android:name="androidx.work.WorkManagerInitializer"
android:value="androidx.startup"
tools:node="remove" />
</provider>
<!--
@@ -175,7 +182,6 @@
android:name="android.support.FILE_PROVIDER_PATHS"
android:resource="@xml/file_providers" />
</provider>
</application>
</manifest>

View File

@@ -9,17 +9,23 @@ package io.element.android.x
import android.app.Application
import androidx.startup.AppInitializer
import androidx.work.Configuration
import dev.zacsweers.metro.createGraphFactory
import io.element.android.features.cachecleaner.api.CacheCleanerInitializer
import io.element.android.libraries.di.DependencyInjectionGraphOwner
import io.element.android.libraries.workmanager.api.di.MetroWorkerFactory
import io.element.android.x.di.AppGraph
import io.element.android.x.info.logApplicationInfo
import io.element.android.x.initializer.CrashInitializer
import io.element.android.x.initializer.PlatformInitializer
class ElementXApplication : Application(), DependencyInjectionGraphOwner {
class ElementXApplication : Application(), DependencyInjectionGraphOwner, Configuration.Provider {
override val graph: AppGraph = createGraphFactory<AppGraph.Factory>().create(this)
override val workManagerConfiguration: Configuration = Configuration.Builder()
.setWorkerFactory(MetroWorkerFactory(graph.workerProviders))
.build()
override fun onCreate() {
super.onCreate()
AppInitializer.getInstance(this).apply {
@@ -27,6 +33,7 @@ class ElementXApplication : Application(), DependencyInjectionGraphOwner {
initializeComponent(PlatformInitializer::class.java)
initializeComponent(CacheCleanerInitializer::class.java)
}
logApplicationInfo(this)
}
}

View File

@@ -8,16 +8,24 @@
package io.element.android.x.di
import android.content.Context
import androidx.work.ListenableWorker
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.DependencyGraph
import dev.zacsweers.metro.Multibinds
import dev.zacsweers.metro.Provides
import io.element.android.libraries.architecture.NodeFactoriesBindings
import io.element.android.libraries.di.annotations.ApplicationContext
import io.element.android.libraries.workmanager.api.di.MetroWorkerFactory
import kotlin.reflect.KClass
@DependencyGraph(AppScope::class)
interface AppGraph : NodeFactoriesBindings {
val sessionGraphFactory: SessionGraph.Factory
@Multibinds
val workerProviders:
Map<KClass<out ListenableWorker>, MetroWorkerFactory.WorkerInstanceFactory<*>>
@DependencyGraph.Factory
interface Factory {
fun create(

View File

@@ -34,10 +34,12 @@ dependencies {
implementation(projects.libraries.testtags)
implementation(projects.libraries.uiStrings)
implementation(projects.libraries.dateformatter.api)
implementation(projects.libraries.workmanager.api)
api(projects.features.logout.api)
testCommonDependencies(libs, true)
testImplementation(projects.libraries.matrix.test)
testImplementation(projects.libraries.featureflag.test)
testImplementation(projects.libraries.sessionStorage.test)
testImplementation(projects.libraries.workmanager.test)
}

View File

@@ -26,6 +26,7 @@ import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.api.encryption.BackupState
import io.element.android.libraries.matrix.api.encryption.BackupUploadState
import io.element.android.libraries.matrix.api.encryption.EncryptionService
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -34,6 +35,7 @@ import kotlinx.coroutines.launch
class LogoutPresenter(
private val matrixClient: MatrixClient,
private val encryptionService: EncryptionService,
private val workManagerScheduler: WorkManagerScheduler,
) : Presenter<LogoutState> {
@Composable
override fun present(): LogoutState {
@@ -109,6 +111,9 @@ class LogoutPresenter(
ignoreSdkError: Boolean,
) = launch {
suspend {
// Cancel any pending work (e.g. notification sync)
workManagerScheduler.cancel(matrixClient.sessionId)
matrixClient.logout(userInitiated = true, ignoreSdkError)
}.runCatchingUpdatingState(logoutAction)
}

View File

@@ -14,6 +14,7 @@ import app.cash.turbine.test
import com.google.common.truth.Truth.assertThat
import io.element.android.libraries.architecture.AsyncAction
import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.encryption.BackupState
import io.element.android.libraries.matrix.api.encryption.BackupUploadState
import io.element.android.libraries.matrix.api.encryption.EncryptionService
@@ -21,7 +22,9 @@ import io.element.android.libraries.matrix.api.encryption.RecoveryState
import io.element.android.libraries.matrix.test.AN_EXCEPTION
import io.element.android.libraries.matrix.test.FakeMatrixClient
import io.element.android.libraries.matrix.test.encryption.FakeEncryptionService
import io.element.android.libraries.workmanager.test.FakeWorkManagerScheduler
import io.element.android.tests.testutils.WarmUpRule
import io.element.android.tests.testutils.lambda.lambdaRecorder
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest
@@ -145,7 +148,9 @@ class LogoutPresenterTest {
@Test
fun `present - logout then confirm`() = runTest {
val presenter = createLogoutPresenter()
val cancelWorkManagerJobsLambda = lambdaRecorder<SessionId, Unit> {}
val workManagerScheduler = FakeWorkManagerScheduler(cancelLambda = cancelWorkManagerJobsLambda)
val presenter = createLogoutPresenter(workManagerScheduler = workManagerScheduler)
moleculeFlow(RecompositionMode.Immediate) {
presenter.present()
}.test {
@@ -158,6 +163,8 @@ class LogoutPresenterTest {
assertThat(loadingState.logoutAction).isInstanceOf(AsyncAction.Loading::class.java)
val successState = awaitItem()
assertThat(successState.logoutAction).isInstanceOf(AsyncAction.Success::class.java)
cancelWorkManagerJobsLambda.assertions().isCalledOnce()
}
}
@@ -230,7 +237,9 @@ class LogoutPresenterTest {
internal fun createLogoutPresenter(
matrixClient: MatrixClient = FakeMatrixClient(),
encryptionService: EncryptionService = FakeEncryptionService(),
workManagerScheduler: FakeWorkManagerScheduler = FakeWorkManagerScheduler(cancelLambda = {}),
): LogoutPresenter = LogoutPresenter(
matrixClient = matrixClient,
encryptionService = encryptionService,
workManagerScheduler = workManagerScheduler,
)

View File

@@ -163,7 +163,7 @@ class DefaultBugReporterTest {
assertThat(foundValues["file"]).contains(fakePushRules)
// device_key now added given they are not null
// so is the push_rules value
// so is the file value for the included push_rules
assertThat(progressValues.size).isEqualTo(EXPECTED_NUMBER_OF_PROGRESS_VALUE + 2)
server.shutdown()

View File

@@ -20,6 +20,7 @@ lifecycle = "2.9.2"
activity = "1.11.0"
media3 = "1.8.0"
camera = "1.5.1"
work = "2.10.5"
# Compose
compose_bom = "2025.07.00"
@@ -94,6 +95,7 @@ androidx_camera_lifecycle = { module = "androidx.camera:camera-lifecycle", versi
androidx_camera_view = { module = "androidx.camera:camera-view", version.ref = "camera" }
androidx_camera_camera2 = { module = "androidx.camera:camera-camera2", version.ref = "camera" }
androidx_javascriptengine = "androidx.javascriptengine:javascriptengine:1.0.0"
androidx_workmanager_runtime = { module = "androidx.work:work-runtime-ktx", version.ref = "work" }
androidx_recyclerview = "androidx.recyclerview:recyclerview:1.4.0"
androidx_browser = "androidx.browser:browser:1.9.0"

View File

@@ -109,4 +109,12 @@ enum class FeatureFlags(
defaultValue = { false },
isFinished = false,
),
SyncNotificationsWithWorkManager(
key = "feature.sync_notifications_with_workmanager",
title = "Sync notifications with WorkManager",
description = "Use WorkManager to schedule notification sync tasks when a push is received." +
" This should improve reliability and battery usage.",
defaultValue = { false },
isFinished = false,
),
}

View File

@@ -0,0 +1,19 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.api.push
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
data class NotificationEventRequest(
val sessionId: SessionId,
val roomId: RoomId,
val eventId: EventId,
val providerInfo: String,
)

View File

@@ -0,0 +1,12 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.api.push
fun interface SyncOnNotifiableEvent {
suspend operator fun invoke(requests: List<NotificationEventRequest>)
}

View File

@@ -49,10 +49,12 @@ dependencies {
implementation(projects.libraries.network)
implementation(projects.libraries.matrix.api)
implementation(projects.libraries.matrixui)
implementation(projects.features.networkmonitor.api)
implementation(projects.libraries.preferences.api)
implementation(projects.libraries.sessionStorage.api)
implementation(projects.libraries.uiStrings)
implementation(projects.libraries.troubleshoot.api)
implementation(projects.libraries.workmanager.api)
implementation(projects.features.call.api)
implementation(projects.features.lockscreen.api)
implementation(projects.libraries.featureflag.api)
@@ -73,8 +75,10 @@ dependencies {
testImplementation(projects.libraries.pushproviders.test)
testImplementation(projects.libraries.pushstore.test)
testImplementation(projects.libraries.troubleshoot.test)
testImplementation(projects.libraries.workmanager.test)
testImplementation(projects.features.call.test)
testImplementation(projects.features.lockscreen.test)
testImplementation(projects.features.networkmonitor.test)
testImplementation(projects.services.appnavstate.test)
testImplementation(projects.services.toolbox.impl)
testImplementation(projects.services.toolbox.test)

View File

@@ -44,6 +44,7 @@ import io.element.android.libraries.matrix.api.timeline.item.event.TextMessageTy
import io.element.android.libraries.matrix.api.timeline.item.event.VideoMessageType
import io.element.android.libraries.matrix.api.timeline.item.event.VoiceMessageType
import io.element.android.libraries.matrix.ui.messages.toPlainText
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.impl.R
import io.element.android.libraries.push.impl.notifications.model.InviteNotifiableEvent
import io.element.android.libraries.push.impl.notifications.model.NotifiableMessageEvent
@@ -95,7 +96,10 @@ class DefaultNotifiableEventResolver(
val client = matrixClientProvider.getOrRestore(sessionId).getOrElse {
return Result.failure(IllegalStateException("Couldn't get or restore client for session $sessionId"))
}
val ids = notificationEventRequests.groupBy { it.roomId }.mapValues { (_, value) -> value.map { it.eventId } }
val ids = notificationEventRequests.groupBy { it.roomId }
.mapValues { (_, requests) ->
requests.map { it.eventId }
}
// TODO this notificationData is not always valid at the moment, sometimes the Rust SDK can't fetch the matching event
val notificationsResult = client.notificationService.getNotifications(ids)

View File

@@ -8,13 +8,16 @@
package io.element.android.libraries.push.impl.notifications
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import dev.zacsweers.metro.Inject
import dev.zacsweers.metro.SingleIn
import io.element.android.libraries.di.annotations.AppCoroutineScope
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import io.element.android.libraries.push.impl.workmanager.SyncNotificationWorkManagerRequest
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
@@ -27,18 +30,26 @@ import kotlinx.coroutines.launch
import timber.log.Timber
import kotlin.time.Duration.Companion.milliseconds
interface NotificationResolverQueue {
val results: SharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>>
suspend fun enqueue(request: NotificationEventRequest)
}
/**
* This class is responsible for periodically batching notification requests and resolving them in a single call,
* so that we can avoid having to resolve each notification individually in the SDK.
*/
@OptIn(ExperimentalCoroutinesApi::class)
@SingleIn(AppScope::class)
@ContributesBinding(AppScope::class)
@Inject
class NotificationResolverQueue(
class DefaultNotificationResolverQueue(
private val notifiableEventResolver: NotifiableEventResolver,
@AppCoroutineScope
private val appCoroutineScope: CoroutineScope,
) {
private val workManagerScheduler: WorkManagerScheduler,
private val featureFlagService: FeatureFlagService,
) : NotificationResolverQueue {
companion object {
private const val BATCH_WINDOW_MS = 250L
}
@@ -50,7 +61,7 @@ class NotificationResolverQueue(
* A flow that emits pairs of a list of notification event requests and a map of the resolved events.
* The map contains the original request as the key and the resolved event as the value.
*/
val results: SharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>> = MutableSharedFlow()
override val results = MutableSharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>>()
/**
* Enqueues a notification event request to be resolved.
@@ -58,7 +69,7 @@ class NotificationResolverQueue(
*
* @param request The notification event request to enqueue.
*/
suspend fun enqueue(request: NotificationEventRequest) {
override suspend fun enqueue(request: NotificationEventRequest) {
// Cancel previous processing job if it exists, acting as a debounce operation
Timber.d("Cancelling job: $currentProcessingJob")
currentProcessingJob?.cancel()
@@ -77,10 +88,15 @@ class NotificationResolverQueue(
appCoroutineScope.launch {
val groupedRequestsById = buildList {
while (!requestQueue.isEmpty) {
requestQueue.receiveCatching().getOrNull()?.let(this::add)
requestQueue.receiveCatching().getOrNull()?.let(::add)
}
}.groupBy { it.sessionId }
if (featureFlagService.isFeatureEnabled(FeatureFlags.SyncNotificationsWithWorkManager)) {
for ((sessionId, requests) in groupedRequestsById) {
workManagerScheduler.submit(SyncNotificationWorkManagerRequest(sessionId, requests))
}
} else {
val sessionIds = groupedRequestsById.keys
for (sessionId in sessionIds) {
val requests = groupedRequestsById[sessionId].orEmpty()
@@ -89,16 +105,10 @@ class NotificationResolverQueue(
launch {
// No need for a Mutex since the SDK already has one internally
val notifications = notifiableEventResolver.resolveEvents(sessionId, requests).getOrNull().orEmpty()
(results as MutableSharedFlow).emit(requests to notifications)
results.emit(requests to notifications)
}
}
}
}
}
}
data class NotificationEventRequest(
val sessionId: SessionId,
val roomId: RoomId,
val eventId: EventId,
val providerInfo: String,
)

View File

@@ -16,7 +16,11 @@ import io.element.android.features.call.api.ElementCallEntryPoint
import io.element.android.libraries.core.log.logger.LoggerTag
import io.element.android.libraries.core.meta.BuildMeta
import io.element.android.libraries.di.annotations.AppCoroutineScope
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.matrix.api.exception.NotificationResolverException
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.history.PushHistoryService
import io.element.android.libraries.push.impl.history.onDiagnosticPush
import io.element.android.libraries.push.impl.history.onInvalidPushReceived
@@ -24,7 +28,6 @@ import io.element.android.libraries.push.impl.history.onSuccess
import io.element.android.libraries.push.impl.history.onUnableToResolveEvent
import io.element.android.libraries.push.impl.history.onUnableToRetrieveSession
import io.element.android.libraries.push.impl.notifications.FallbackNotificationFactory
import io.element.android.libraries.push.impl.notifications.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.push.impl.notifications.channels.NotificationChannels
import io.element.android.libraries.push.impl.notifications.model.FallbackNotifiableEvent
@@ -65,6 +68,8 @@ class DefaultPushHandler(
@AppCoroutineScope
private val appCoroutineScope: CoroutineScope,
private val fallbackNotificationFactory: FallbackNotificationFactory,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
private val featureFlagService: FeatureFlagService,
) : PushHandler {
init {
processPushEventResults()
@@ -196,6 +201,10 @@ class DefaultPushHandler(
if (nonRingingCallEvents.isNotEmpty()) {
onNotifiableEventReceived.onNotifiableEventsReceived(nonRingingCallEvents)
}
if (!featureFlagService.isFeatureEnabled(FeatureFlags.SyncNotificationsWithWorkManager)) {
syncOnNotifiableEvent(requests)
}
}
.launchIn(appCoroutineScope)
}

View File

@@ -7,41 +7,45 @@
package io.element.android.libraries.push.impl.push
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import dev.zacsweers.metro.Inject
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.matrix.api.MatrixClientProvider
import io.element.android.libraries.push.impl.notifications.model.NotifiableEvent
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.services.appnavstate.api.AppForegroundStateService
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import timber.log.Timber
import kotlin.time.Duration.Companion.seconds
@ContributesBinding(AppScope::class)
@Inject
class SyncOnNotifiableEvent(
class DefaultSyncOnNotifiableEvent(
private val matrixClientProvider: MatrixClientProvider,
private val featureFlagService: FeatureFlagService,
private val appForegroundStateService: AppForegroundStateService,
private val dispatchers: CoroutineDispatchers,
) {
suspend operator fun invoke(notifiableEvents: List<NotifiableEvent>) = withContext(dispatchers.io) {
) : SyncOnNotifiableEvent {
override suspend operator fun invoke(requests: List<NotificationEventRequest>) = withContext(dispatchers.io) {
if (!featureFlagService.isFeatureEnabled(FeatureFlags.SyncOnPush)) {
return@withContext
}
try {
val eventsBySession = notifiableEvents.groupBy { it.sessionId }
val eventsBySession = requests.groupBy { it.sessionId }
appForegroundStateService.updateIsSyncingNotificationEvent(true)
Timber.d("Starting opportunistic room list sync | In foreground: ${appForegroundStateService.isInForeground.value}")
for ((sessionId, events) in eventsBySession) {
val client = matrixClientProvider.getOrRestore(sessionId).getOrNull() ?: continue
val eventsByRoomId = events.groupBy { it.roomId }
val roomIds = events.map { it.roomId }.distinct()
client.roomListService.subscribeToVisibleRooms(eventsByRoomId.keys.toList())
client.roomListService.subscribeToVisibleRooms(roomIds)
if (!appForegroundStateService.isInForeground.value) {
// Give the sync some time to complete in background

View File

@@ -27,11 +27,9 @@ class DefaultOnNotifiableEventReceived(
private val defaultNotificationDrawerManager: DefaultNotificationDrawerManager,
@AppCoroutineScope
private val coroutineScope: CoroutineScope,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
) : OnNotifiableEventReceived {
override fun onNotifiableEventsReceived(notifiableEvents: List<NotifiableEvent>) {
coroutineScope.launch {
launch { syncOnNotifiableEvent(notifiableEvents) }
defaultNotificationDrawerManager.onNotifiableEventsReceived(notifiableEvents.filter { it !is NotifiableRingingCallEvent })
}
}

View File

@@ -0,0 +1,123 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.Assisted
import dev.zacsweers.metro.AssistedFactory
import dev.zacsweers.metro.AssistedInject
import dev.zacsweers.metro.ContributesIntoMap
import dev.zacsweers.metro.binding
import io.element.android.features.networkmonitor.api.NetworkMonitor
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.core.extensions.runCatchingExceptions
import io.element.android.libraries.di.annotations.ApplicationContext
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.notifications.NotifiableEventResolver
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import io.element.android.libraries.workmanager.api.di.MetroWorkerFactory
import io.element.android.libraries.workmanager.api.di.WorkerKey
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.serialization.json.Json
import timber.log.Timber
import kotlin.time.Duration.Companion.seconds
@AssistedInject
class FetchNotificationsWorker(
@Assisted workerParams: WorkerParameters,
@ApplicationContext context: Context,
private val networkMonitor: NetworkMonitor,
private val eventResolver: NotifiableEventResolver,
private val queue: NotificationResolverQueue,
private val workManagerScheduler: WorkManagerScheduler,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
private val coroutineDispatchers: CoroutineDispatchers,
private val json: Json,
) : CoroutineWorker(context, workerParams) {
override suspend fun doWork(): Result = withContext(coroutineDispatchers.io) {
Timber.d("FetchNotificationsWorker started")
val rawRequestsJson = inputData.getString("requests") ?: return@withContext Result.failure()
val requests = runCatchingExceptions {
json.decodeFromString<List<SyncNotificationWorkManagerRequest.Data>>(rawRequestsJson).map { it.toRequest() }
}.getOrElse {
Timber.e(it, "Failed to deserialize notification requests")
return@withContext Result.failure()
}
Timber.d("Deserialized ${requests.size} requests")
// Wait for network to be available, but not more than 10 seconds
val hasNetwork = withTimeoutOrNull(10.seconds) {
networkMonitor.connectivity.first { it == NetworkStatus.Connected }
} != null
if (!hasNetwork) {
Timber.w("No network, retrying later")
return@withContext Result.retry()
}
val failedSyncForSessions = mutableSetOf<SessionId>()
val groupedRequests = requests.groupBy { it.sessionId }
for ((sessionId, notificationRequests) in groupedRequests) {
Timber.d("Processing notification requests for session $sessionId")
eventResolver.resolveEvents(sessionId, notificationRequests)
.fold(
onSuccess = { result ->
// Update the resolved results in the queue
(queue.results as MutableSharedFlow).emit(requests to result)
},
onFailure = {
failedSyncForSessions += sessionId
Timber.e(it, "Failed to resolve notification events for session $sessionId")
}
)
}
// If there were failures for whole sessions, we retry all their requests
if (failedSyncForSessions.isNotEmpty()) {
for (failedSessionId in failedSyncForSessions) {
val requestsToRetry = groupedRequests[failedSessionId] ?: continue
Timber.d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId")
workManagerScheduler.submit(SyncNotificationWorkManagerRequest(failedSessionId, requestsToRetry))
}
}
Timber.d("Notifications processed successfully")
performOpportunisticSyncIfNeeded(groupedRequests)
Result.success()
}
private suspend fun performOpportunisticSyncIfNeeded(
groupedRequests: Map<SessionId, List<NotificationEventRequest>>,
) {
for ((sessionId, notificationRequests) in groupedRequests) {
runCatchingExceptions {
syncOnNotifiableEvent(notificationRequests)
}.onFailure {
Timber.e(it, "Failed to sync on notifiable events for session $sessionId")
}
}
}
@ContributesIntoMap(AppScope::class, binding = binding<MetroWorkerFactory.WorkerInstanceFactory<*>>())
@WorkerKey(FetchNotificationsWorker::class)
@AssistedFactory
abstract class Factory : MetroWorkerFactory.WorkerInstanceFactory<FetchNotificationsWorker>
}

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.OutOfQuotaPolicy
import androidx.work.WorkRequest
import androidx.work.workDataOf
import io.element.android.libraries.core.extensions.runCatchingExceptions
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.workmanager.api.WorkManagerRequest
import io.element.android.libraries.workmanager.api.WorkManagerRequestType
import io.element.android.libraries.workmanager.api.workManagerTag
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import timber.log.Timber
import java.security.InvalidParameterException
class SyncNotificationWorkManagerRequest(
private val sessionId: SessionId,
private val notificationEventRequests: List<NotificationEventRequest>,
) : WorkManagerRequest {
private val json = Json { ignoreUnknownKeys = true }
override fun build(): Result<WorkRequest> {
if (notificationEventRequests.isEmpty()) {
return Result.failure(InvalidParameterException("notificationEventRequests cannot be empty"))
}
val json = runCatchingExceptions { json.encodeToString(notificationEventRequests.map { it.toData() }) }
.getOrElse {
Timber.e(it, "Failed to serialize notification requests")
return Result.failure(it)
}
Timber.d("Scheduling ${notificationEventRequests.size} notification requests with WorkManager for $sessionId")
return Result.success(
OneTimeWorkRequestBuilder<FetchNotificationsWorker>()
.setInputData(workDataOf("requests" to json))
.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
.setTraceTag(workManagerTag(sessionId, WorkManagerRequestType.NOTIFICATION_SYNC))
// TODO investigate using this instead of the resolver queue
// .setInputMerger()
.build()
)
}
@Serializable
data class Data(
@SerialName("session_id")
val sessionId: String,
@SerialName("room_id")
val roomId: String,
@SerialName("event_id")
val eventId: String,
@SerialName("provider_info")
val providerInfo: String,
) {
fun toRequest(): NotificationEventRequest {
return NotificationEventRequest(
sessionId = SessionId(sessionId),
roomId = RoomId(roomId),
eventId = EventId(eventId),
providerInfo = providerInfo,
)
}
}
}
private fun NotificationEventRequest.toData(): SyncNotificationWorkManagerRequest.Data {
return SyncNotificationWorkManagerRequest.Data(
sessionId = sessionId.value,
roomId = roomId.value,
eventId = eventId.value,
providerInfo = providerInfo,
)
}

View File

@@ -44,6 +44,7 @@ import io.element.android.libraries.matrix.test.FakeMatrixClientProvider
import io.element.android.libraries.matrix.test.notification.FakeNotificationService
import io.element.android.libraries.matrix.test.notification.aNotificationData
import io.element.android.libraries.matrix.test.permalink.FakePermalinkParser
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.fake.FakeNotificationMediaRepo
import io.element.android.libraries.push.impl.notifications.fixtures.aNotifiableMessageEvent
import io.element.android.libraries.push.impl.notifications.model.FallbackNotifiableEvent

View File

@@ -8,6 +8,7 @@
package io.element.android.libraries.push.impl.notifications
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import io.element.android.tests.testutils.lambda.lambdaError

View File

@@ -0,0 +1,28 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.notifications.fixtures
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.test.AN_EVENT_ID
import io.element.android.libraries.matrix.test.A_ROOM_ID
import io.element.android.libraries.matrix.test.A_SESSION_ID
import io.element.android.libraries.push.api.push.NotificationEventRequest
fun aNotificationEventRequest(
sessionId: SessionId = A_SESSION_ID,
roomId: RoomId = A_ROOM_ID,
eventId: EventId = AN_EVENT_ID,
providerInfo: String = "providerInfo",
) = NotificationEventRequest(
sessionId = sessionId,
roomId = roomId,
eventId = eventId,
providerInfo = providerInfo,
)

View File

@@ -14,6 +14,8 @@ import com.google.common.truth.Truth.assertThat
import io.element.android.features.call.api.CallType
import io.element.android.features.call.test.FakeElementCallEntryPoint
import io.element.android.libraries.core.meta.BuildMeta
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.featureflag.test.FakeFeatureFlagService
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
@@ -28,12 +30,13 @@ import io.element.android.libraries.matrix.test.A_SECRET
import io.element.android.libraries.matrix.test.A_SESSION_ID
import io.element.android.libraries.matrix.test.A_USER_ID
import io.element.android.libraries.matrix.test.core.aBuildMeta
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.history.FakePushHistoryService
import io.element.android.libraries.push.impl.history.PushHistoryService
import io.element.android.libraries.push.impl.notifications.DefaultNotificationResolverQueue
import io.element.android.libraries.push.impl.notifications.FakeNotifiableEventResolver
import io.element.android.libraries.push.impl.notifications.FallbackNotificationFactory
import io.element.android.libraries.push.impl.notifications.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.push.impl.notifications.channels.FakeNotificationChannels
import io.element.android.libraries.push.impl.notifications.fixtures.aNotifiableCallEvent
import io.element.android.libraries.push.impl.notifications.fixtures.aNotifiableMessageEvent
@@ -48,6 +51,8 @@ import io.element.android.libraries.pushstore.api.clientsecret.PushClientSecret
import io.element.android.libraries.pushstore.test.userpushstore.FakeUserPushStore
import io.element.android.libraries.pushstore.test.userpushstore.FakeUserPushStoreFactory
import io.element.android.libraries.pushstore.test.userpushstore.clientsecret.FakePushClientSecret
import io.element.android.libraries.workmanager.api.WorkManagerRequest
import io.element.android.libraries.workmanager.test.FakeWorkManagerScheduler
import io.element.android.services.toolbox.test.strings.FakeStringProvider
import io.element.android.services.toolbox.test.systemclock.FakeSystemClock
import io.element.android.tests.testutils.lambda.any
@@ -131,6 +136,45 @@ class DefaultPushHandlerTest {
.isCalledOnce()
}
@Test
fun `when classical PushData is received and the workmanager flag is enabled, the work is scheduled`() = runTest {
val aNotifiableMessageEvent = aNotifiableMessageEvent()
val notifiableEventResult =
lambdaRecorder<SessionId, List<NotificationEventRequest>, Result<Map<NotificationEventRequest, Result<ResolvedPushEvent>>>> { _, _ ->
val request = NotificationEventRequest(A_SESSION_ID, A_ROOM_ID, AN_EVENT_ID, A_PUSHER_INFO)
Result.success(mapOf(request to Result.success(ResolvedPushEvent.Event(aNotifiableMessageEvent))))
}
val incrementPushCounterResult = lambdaRecorder<Unit> {}
val aPushData = PushData(
eventId = AN_EVENT_ID,
roomId = A_ROOM_ID,
unread = 0,
clientSecret = A_SECRET,
)
val featureFlagService = FakeFeatureFlagService(mapOf(FeatureFlags.SyncNotificationsWithWorkManager.key to true))
val submitWorkLambda = lambdaRecorder<WorkManagerRequest, Unit> {}
val workManagerScheduler = FakeWorkManagerScheduler(submitLambda = submitWorkLambda)
val defaultPushHandler = createDefaultPushHandler(
notifiableEventsResult = notifiableEventResult,
pushClientSecret = FakePushClientSecret(
getUserIdFromSecretResult = { A_USER_ID }
),
incrementPushCounterResult = incrementPushCounterResult,
featureFlagService = featureFlagService,
workManagerScheduler = workManagerScheduler,
)
defaultPushHandler.handle(aPushData, A_PUSHER_INFO)
advanceTimeBy(300.milliseconds)
submitWorkLambda.assertions().isCalledOnce()
incrementPushCounterResult.assertions()
.isCalledOnce()
}
@Test
fun `when classical PushData is received, but notifications are disabled, nothing happen`() =
runTest {
@@ -644,6 +688,9 @@ class DefaultPushHandlerTest {
elementCallEntryPoint: FakeElementCallEntryPoint = FakeElementCallEntryPoint(),
notificationChannels: FakeNotificationChannels = FakeNotificationChannels(),
pushHistoryService: PushHistoryService = FakePushHistoryService(),
syncOnNotifiableEvent: SyncOnNotifiableEvent = SyncOnNotifiableEvent {},
featureFlagService: FakeFeatureFlagService = FakeFeatureFlagService(),
workManagerScheduler: FakeWorkManagerScheduler = FakeWorkManagerScheduler(),
): DefaultPushHandler {
return DefaultPushHandler(
onNotifiableEventReceived = FakeOnNotifiableEventReceived(onNotifiableEventsReceived),
@@ -661,12 +708,20 @@ class DefaultPushHandlerTest {
elementCallEntryPoint = elementCallEntryPoint,
notificationChannels = notificationChannels,
pushHistoryService = pushHistoryService,
resolverQueue = NotificationResolverQueue(notifiableEventResolver = FakeNotifiableEventResolver(notifiableEventsResult), backgroundScope),
// We don't use a fake here so we can perform tests that are a bit more end to end
resolverQueue = DefaultNotificationResolverQueue(
notifiableEventResolver = FakeNotifiableEventResolver(notifiableEventsResult),
appCoroutineScope = backgroundScope,
workManagerScheduler = workManagerScheduler,
featureFlagService = featureFlagService,
),
appCoroutineScope = backgroundScope,
fallbackNotificationFactory = FallbackNotificationFactory(
clock = FakeSystemClock(),
stringProvider = FakeStringProvider(),
)
),
syncOnNotifiableEvent = syncOnNotifiableEvent,
featureFlagService = featureFlagService,
)
}
}

View File

@@ -18,22 +18,17 @@ import io.element.android.libraries.matrix.test.FakeMatrixClient
import io.element.android.libraries.matrix.test.FakeMatrixClientProvider
import io.element.android.libraries.matrix.test.room.FakeBaseRoom
import io.element.android.libraries.matrix.test.room.FakeJoinedRoom
import io.element.android.libraries.matrix.test.room.aRoomInfo
import io.element.android.libraries.matrix.test.sync.FakeSyncService
import io.element.android.libraries.push.impl.notifications.fixtures.aNotifiableCallEvent
import io.element.android.libraries.push.impl.notifications.fixtures.aNotifiableMessageEvent
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.notifications.fixtures.aNotificationEventRequest
import io.element.android.services.appnavstate.test.FakeAppForegroundStateService
import io.element.android.tests.testutils.lambda.assert
import io.element.android.tests.testutils.lambda.lambdaRecorder
import io.element.android.tests.testutils.testCoroutineDispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runTest
import org.junit.Test
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.time.Duration.Companion.seconds
class SyncOnNotifiableEventTest {
private val startSyncLambda = lambdaRecorder<Result<Unit>> { Result.success(Unit) }
@@ -57,60 +52,19 @@ class SyncOnNotifiableEventTest {
givenGetRoomResult(A_ROOM_ID, room)
}
private val notifiableEvent = aNotifiableMessageEvent()
private val incomingCallNotifiableEvent = aNotifiableCallEvent()
private val notificationRequest = aNotificationEventRequest()
@Test
fun `when feature flag is disabled, nothing happens`() = runTest {
val sut = createSyncOnNotifiableEvent(client = client, isSyncOnPushEnabled = false)
sut(listOf(notifiableEvent))
sut(listOf(notificationRequest))
assert(startSyncLambda).isNeverCalled()
assert(stopSyncLambda).isNeverCalled()
assert(subscribeToSyncLambda).isNeverCalled()
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `when feature flag is enabled, a ringing call waits until the room is in 'in-call' state`() = runTest {
val appForegroundStateService = FakeAppForegroundStateService(
initialForegroundValue = false,
)
val sut = createSyncOnNotifiableEvent(client = client, appForegroundStateService = appForegroundStateService, isSyncOnPushEnabled = true)
val unlocked = AtomicBoolean(false)
launch {
advanceTimeBy(1.seconds)
unlocked.set(true)
room.givenRoomInfo(aRoomInfo(hasRoomCall = true))
}
sut(listOf(incomingCallNotifiableEvent))
// The process was completed before the timeout
assertThat(unlocked.get()).isTrue()
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `when feature flag is enabled, a ringing call waits until the room is in 'in-call' state or timeouts`() = runTest {
val appForegroundStateService = FakeAppForegroundStateService(
initialForegroundValue = false,
)
val sut = createSyncOnNotifiableEvent(client = client, appForegroundStateService = appForegroundStateService, isSyncOnPushEnabled = true)
val unlocked = AtomicBoolean(false)
launch {
advanceTimeBy(120.seconds)
unlocked.set(true)
room.givenRoomInfo(aRoomInfo(hasRoomCall = true))
}
sut(listOf(incomingCallNotifiableEvent))
// Didn't unlock before the timeout
assertThat(unlocked.get()).isFalse()
}
@Test
fun `when feature flag is enabled and app is in background, sync is started and stopped`() = runTest {
val appForegroundStateService = FakeAppForegroundStateService(
@@ -120,7 +74,7 @@ class SyncOnNotifiableEventTest {
appForegroundStateService.isSyncingNotificationEvent.test {
syncService.emitSyncState(SyncState.Running)
sut(listOf(notifiableEvent))
sut(listOf(notificationRequest))
// It's initially false
assertThat(awaitItem()).isFalse()
@@ -141,8 +95,8 @@ class SyncOnNotifiableEventTest {
val sut = createSyncOnNotifiableEvent(client = client, appForegroundStateService = appForegroundStateService, isSyncOnPushEnabled = true)
appForegroundStateService.isSyncingNotificationEvent.test {
launch { sut(listOf(notifiableEvent)) }
launch { sut(listOf(notifiableEvent)) }
launch { sut(listOf(notificationRequest)) }
launch { sut(listOf(notificationRequest)) }
// It's initially false
assertThat(awaitItem()).isFalse()
@@ -168,7 +122,7 @@ class SyncOnNotifiableEventTest {
)
)
val matrixClientProvider = FakeMatrixClientProvider { Result.success(client) }
return SyncOnNotifiableEvent(
return DefaultSyncOnNotifiableEvent(
matrixClientProvider = matrixClientProvider,
featureFlagService = featureFlagService,
appForegroundStateService = appForegroundStateService,

View File

@@ -0,0 +1,206 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.platform.app.InstrumentationRegistry
import androidx.work.Data
import androidx.work.ListenableWorker
import androidx.work.WorkerParameters
import androidx.work.impl.utils.taskexecutor.WorkManagerTaskExecutor
import androidx.work.workDataOf
import com.google.common.truth.Truth.assertThat
import com.google.common.util.concurrent.ListenableFuture
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.features.networkmonitor.test.FakeNetworkMonitor
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.notifications.FakeNotifiableEventResolver
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.push.impl.notifications.fixtures.aNotifiableMessageEvent
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import io.element.android.libraries.push.test.notifications.FakeNotificationResolverQueue
import io.element.android.libraries.workmanager.api.WorkManagerRequest
import io.element.android.libraries.workmanager.api.di.MetroWorkerFactory
import io.element.android.libraries.workmanager.test.FakeWorkManagerScheduler
import io.element.android.tests.testutils.lambda.lambdaRecorder
import io.element.android.tests.testutils.testCoroutineDispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runTest
import kotlinx.serialization.json.Json
import org.junit.Test
import org.junit.runner.RunWith
import java.util.UUID
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalCoroutinesApi::class)
@RunWith(AndroidJUnit4::class)
class FetchNotificationWorkerTest {
@Test
fun `test - success`() = runTest {
var synced = false
val syncOnNotifiableEventLambda = SyncOnNotifiableEvent { synced = true }
val queue = FakeNotificationResolverQueue(
processingLambda = { Result.success(ResolvedPushEvent.Event(aNotifiableMessageEvent())) }
)
val worker = createWorker(
input = """
[
{
"session_id": "@alice:matrix.org",
"room_id": "!roomid:matrix.org",
"event_id": "$1436ebk:matrix.org",
"provider_info": "some_info"
}
]
""".trimIndent(),
queue = queue,
syncOnNotifiableEvent = syncOnNotifiableEventLambda,
)
val result = worker.doWork()
// The process finished successfully
assertThat(result).isEqualTo(ListenableWorker.Result.success())
// A result was emitted
assertThat(queue.results.replayCache).isNotEmpty()
// An opportunistic sync was triggered
assertThat(synced).isTrue()
}
@Test
fun `test - invalid input fails the work`() = runTest {
val worker = createWorker(
input = """
[
{
"session_id": "!alice:matrix.org",
"room_id": "!roomid:matrix.org",
"event_id": "$1436ebk:matrix.org",
"provider_info": "some_info"
}
]
""".trimIndent(),
)
val result = worker.doWork()
// The process failed
assertThat(result).isEqualTo(ListenableWorker.Result.failure())
}
@Test
fun `test - no network connectivity fails the work`() = runTest {
val networkMonitor = FakeNetworkMonitor(initialStatus = NetworkStatus.Disconnected)
val worker = createWorker(
input = """
[
{
"session_id": "@alice:matrix.org",
"room_id": "!roomid:matrix.org",
"event_id": "$1436ebk:matrix.org",
"provider_info": "some_info"
}
]
""".trimIndent(),
networkMonitor = networkMonitor,
)
val result = worker.doWork()
advanceTimeBy(10.seconds)
// The process failed due to a timeout in getting the network connectivity, a retry is scheduled
assertThat(result).isEqualTo(ListenableWorker.Result.retry())
}
@Test
fun `test - failing to resolve events re-schedules the work`() = runTest {
val submitWorkerLambda = lambdaRecorder<WorkManagerRequest, Unit> {}
val scheduler = FakeWorkManagerScheduler(submitLambda = submitWorkerLambda)
val resolver = FakeNotifiableEventResolver(
resolveEventsResult = { _, _ -> Result.failure(Exception("Failed to resolve events")) }
)
val worker = createWorker(
input = """
[
{
"session_id": "@alice:matrix.org",
"room_id": "!roomid:matrix.org",
"event_id": "$1436ebk:matrix.org",
"provider_info": "some_info"
}
]
""".trimIndent(),
eventResolver = resolver,
workManagerScheduler = scheduler,
)
val result = worker.doWork()
// The process was considered successful, but a retry was scheduled due to the failure to resolve events
assertThat(result).isEqualTo(ListenableWorker.Result.success())
submitWorkerLambda.assertions().isCalledOnce()
}
private fun TestScope.createWorker(
input: String,
networkMonitor: FakeNetworkMonitor = FakeNetworkMonitor(),
eventResolver: FakeNotifiableEventResolver = FakeNotifiableEventResolver(resolveEventsResult = { _, _ -> Result.success(emptyMap()) }),
queue: NotificationResolverQueue = FakeNotificationResolverQueue(
processingLambda = { Result.success(ResolvedPushEvent.Event(aNotifiableMessageEvent())) }
),
workManagerScheduler: FakeWorkManagerScheduler = FakeWorkManagerScheduler(),
syncOnNotifiableEvent: SyncOnNotifiableEvent = SyncOnNotifiableEvent {},
) = FetchNotificationsWorker(
workerParams = createWorkerParams(workDataOf("requests" to input)),
context = InstrumentationRegistry.getInstrumentation().context,
networkMonitor = networkMonitor,
eventResolver = eventResolver,
queue = queue,
workManagerScheduler = workManagerScheduler,
syncOnNotifiableEvent = syncOnNotifiableEvent,
coroutineDispatchers = testCoroutineDispatchers(),
json = Json {},
)
private fun TestScope.createWorkerParams(
inputData: Data = Data.EMPTY,
): WorkerParameters = WorkerParameters(
UUID.randomUUID(),
inputData,
emptySet(),
WorkerParameters.RuntimeExtras(),
0,
0,
Executors.newSingleThreadExecutor(),
backgroundScope.coroutineContext,
WorkManagerTaskExecutor(Executors.newSingleThreadExecutor()),
MetroWorkerFactory(emptyMap()),
{ context, id, data -> FakeListenableFuture() },
{ context, id, foregroundInfo -> FakeListenableFuture() },
)
}
class FakeListenableFuture<T> : ListenableFuture<T> {
override fun addListener(listener: Runnable, executor: Executor) = Unit
override fun cancel(mayInterruptIfRunning: Boolean): Boolean = true
override fun get(): T? = null
override fun get(timeout: Long, unit: TimeUnit?): T? = null
override fun isCancelled(): Boolean = false
override fun isDone(): Boolean = false
}

View File

@@ -0,0 +1,50 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import androidx.work.OneTimeWorkRequest
import androidx.work.hasKeyWithValueOfType
import com.google.common.truth.Truth.assertThat
import io.element.android.libraries.matrix.test.A_SESSION_ID
import io.element.android.libraries.push.impl.notifications.fixtures.aNotificationEventRequest
import io.element.android.libraries.workmanager.api.WorkManagerRequestType
import io.element.android.libraries.workmanager.api.workManagerTag
import kotlinx.coroutines.test.runTest
import org.junit.Test
class SyncNotificationWorkManagerRequestTest {
@Test
fun `build - success`() = runTest {
val request = SyncNotificationWorkManagerRequest(
sessionId = A_SESSION_ID,
notificationEventRequests = listOf(aNotificationEventRequest())
)
val result = request.build()
assertThat(result.isSuccess).isTrue()
result.getOrNull()!!.run {
assertThat(this).isInstanceOf(OneTimeWorkRequest::class.java)
assertThat(workSpec.input.hasKeyWithValueOfType<String>("requests")).isTrue()
assertThat(workSpec.expedited).isTrue()
assertThat(workSpec.traceTag).isEqualTo(workManagerTag(A_SESSION_ID, WorkManagerRequestType.NOTIFICATION_SYNC))
}
}
@Test
fun `build - empty list of requests fails`() = runTest {
val request = SyncNotificationWorkManagerRequest(
sessionId = A_SESSION_ID,
notificationEventRequests = emptyList()
)
val result = request.build()
assertThat(result.isFailure).isTrue()
}
// TODO add test for invalid serialization (how?)
}

View File

@@ -0,0 +1,23 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.test.notifications
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import kotlinx.coroutines.flow.MutableSharedFlow
class FakeNotificationResolverQueue(
private val processingLambda: suspend (NotificationEventRequest) -> Result<ResolvedPushEvent>,
) : NotificationResolverQueue {
override val results = MutableSharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>>(replay = 1)
override suspend fun enqueue(request: NotificationEventRequest) {
results.emit(listOf(request) to mapOf(request to processingLambda(request)))
}
}

View File

@@ -0,0 +1,23 @@
import extension.setupDependencyInjection
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
plugins {
id("io.element.android-library")
}
android {
namespace = "io.element.android.libraries.workmanager.api"
}
setupDependencyInjection()
dependencies {
api(libs.androidx.workmanager.runtime)
implementation(projects.libraries.matrix.api)
}

View File

@@ -0,0 +1,14 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.workmanager.api
import androidx.work.WorkRequest
interface WorkManagerRequest {
fun build(): Result<WorkRequest>
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.workmanager.api
import io.element.android.libraries.matrix.api.core.SessionId
interface WorkManagerScheduler {
fun submit(workManagerRequest: WorkManagerRequest)
fun cancel(sessionId: SessionId)
}
fun workManagerTag(sessionId: SessionId, requestType: WorkManagerRequestType): String {
val prefix = when (requestType) {
WorkManagerRequestType.NOTIFICATION_SYNC -> "notifications"
}
return "$prefix-$sessionId"
}
enum class WorkManagerRequestType {
NOTIFICATION_SYNC,
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.workmanager.api.di
import android.content.Context
import androidx.work.ListenableWorker
import androidx.work.WorkerFactory
import androidx.work.WorkerParameters
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import dev.zacsweers.metro.Inject
import kotlin.reflect.KClass
@ContributesBinding(AppScope::class)
@Inject
class MetroWorkerFactory(
val workerProviders: Map<KClass<out ListenableWorker>, WorkerInstanceFactory<*>>
) : WorkerFactory() {
override fun createWorker(
appContext: Context,
workerClassName: String,
workerParameters: WorkerParameters,
): ListenableWorker? {
return workerProviders[Class.forName(workerClassName).kotlin]?.create(workerParameters)
}
interface WorkerInstanceFactory<T : ListenableWorker> {
fun create(params: WorkerParameters): T
}
}

View File

@@ -0,0 +1,18 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.workmanager.api.di
import androidx.work.ListenableWorker
import dev.zacsweers.metro.MapKey
import kotlin.reflect.KClass
/** A [MapKey] annotation for binding Worker in a multibinding map. */
@MapKey
@Target(AnnotationTarget.CLASS, AnnotationTarget.PROPERTY)
@Retention(AnnotationRetention.RUNTIME)
annotation class WorkerKey(val value: KClass<out ListenableWorker>)

View File

@@ -0,0 +1,24 @@
import extension.setupDependencyInjection
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
plugins {
id("io.element.android-library")
}
android {
namespace = "io.element.android.libraries.workmanager.impl"
}
setupDependencyInjection()
dependencies {
api(projects.libraries.workmanager.api)
implementation(projects.libraries.core)
implementation(projects.libraries.matrix.api)
implementation(projects.libraries.di)
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.workmanager.impl
import android.content.Context
import androidx.work.WorkManager
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import dev.zacsweers.metro.Inject
import io.element.android.libraries.di.annotations.ApplicationContext
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.workmanager.api.WorkManagerRequest
import io.element.android.libraries.workmanager.api.WorkManagerRequestType
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import io.element.android.libraries.workmanager.api.workManagerTag
import timber.log.Timber
@ContributesBinding(AppScope::class)
@Inject
class DefaultWorkManagerScheduler(
@ApplicationContext private val context: Context,
) : WorkManagerScheduler {
private val workManager by lazy { WorkManager.getInstance(context) }
override fun submit(workManagerRequest: WorkManagerRequest) {
workManagerRequest.build().fold(
onSuccess = {
workManager.enqueue(it)
},
onFailure = {
Timber.e(it, "Failed to build WorkManager request $workManagerRequest")
}
)
}
override fun cancel(sessionId: SessionId) {
Timber.d("Cancelling work for sessionId: $sessionId")
for (requestType in WorkManagerRequestType.entries) {
workManager.cancelAllWorkByTag(workManagerTag(sessionId, requestType))
}
}
}

View File

@@ -0,0 +1,19 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
plugins {
id("io.element.android-library")
}
android {
namespace = "io.element.android.libraries.workmanager.test"
}
dependencies {
api(projects.libraries.workmanager.api)
implementation(projects.libraries.matrix.api)
implementation(projects.tests.testutils)
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.workmanager.test
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.workmanager.api.WorkManagerRequest
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import io.element.android.tests.testutils.lambda.lambdaError
class FakeWorkManagerScheduler(
private val submitLambda: (WorkManagerRequest) -> Unit = { lambdaError() },
private val cancelLambda: (SessionId) -> Unit = { lambdaError() },
) : WorkManagerScheduler {
override fun submit(workManagerRequest: WorkManagerRequest) {
submitLambda(workManagerRequest)
}
override fun cancel(sessionId: SessionId) {
cancelLambda(sessionId)
}
}

View File

@@ -118,6 +118,7 @@ fun DependencyHandlerScope.allLibrariesImpl() {
implementation(project(":libraries:fullscreenintent:impl"))
implementation(project(":libraries:wellknown:impl"))
implementation(project(":libraries:oidc:impl"))
implementation(project(":libraries:workmanager:impl"))
}
fun DependencyHandlerScope.allServicesImpl() {

View File

@@ -100,6 +100,7 @@ class KonsistClassNameTest {
.withoutName(
"FakeFileSystem",
"FakeImageLoader",
"FakeListenableFuture",
)
.assertTrue {
val interfaceName = it.name
@@ -149,6 +150,7 @@ class KonsistClassNameTest {
"Factory",
"TimelineController",
"TimelineMediaGalleryDataSource",
"MetroWorkerFactory",
)
.withoutNameStartingWith(
"Accompanist",