From 716640486e27daf3515ece35e2ac252bf3a5ee64 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 9 Jul 2025 17:17:01 +0200 Subject: [PATCH] Make the task State::clock() return a &dyn Clock instead of a BoxClock --- crates/storage/src/clock.rs | 2 +- crates/tasks/src/database.rs | 2 +- crates/tasks/src/email.rs | 2 +- crates/tasks/src/lib.rs | 12 ++++++------ crates/tasks/src/matrix.rs | 6 +++--- crates/tasks/src/new_queue.rs | 2 +- crates/tasks/src/recovery.rs | 2 +- crates/tasks/src/sessions.rs | 22 +++++++++++----------- crates/tasks/src/user.rs | 10 +++++----- 9 files changed, 30 insertions(+), 30 deletions(-) diff --git a/crates/storage/src/clock.rs b/crates/storage/src/clock.rs index df32114bb..bf31835f0 100644 --- a/crates/storage/src/clock.rs +++ b/crates/storage/src/clock.rs @@ -15,7 +15,7 @@ use std::sync::{Arc, atomic::AtomicI64}; use chrono::{DateTime, TimeZone, Utc}; /// Represents a clock which can give the current date and time -pub trait Clock: Sync { +pub trait Clock: Send + Sync { /// Get the current date and time fn now(&self) -> DateTime; } diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index a659eb265..bc14215f8 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -24,7 +24,7 @@ impl RunnableJob for CleanupExpiredTokensJob { let count = repo .oauth2_access_token() - .cleanup_revoked(&clock) + .cleanup_revoked(clock) .await .map_err(JobError::retry)?; repo.save().await.map_err(JobError::retry)?; diff --git a/crates/tasks/src/email.rs b/crates/tasks/src/email.rs index 99170d2eb..8e685843a 100644 --- a/crates/tasks/src/email.rs +++ b/crates/tasks/src/email.rs @@ -100,7 +100,7 @@ impl RunnableJob for SendEmailAuthenticationCodeJob { .user_email() .add_authentication_code( &mut rng, - &clock, + clock, Duration::minutes(5), // TODO: make this configurable &user_email_authentication, code, diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 58737574b..796f83396 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -10,7 +10,7 @@ use mas_data_model::SiteConfig; use mas_email::Mailer; use mas_matrix::HomeserverConnection; use mas_router::UrlBuilder; -use mas_storage::{BoxClock, BoxRepository, RepositoryError, RepositoryFactory, SystemClock}; +use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory, SystemClock}; use mas_storage_pg::PgRepositoryFactory; use new_queue::QueueRunnerError; use opentelemetry::metrics::Meter; @@ -39,7 +39,7 @@ static METER: LazyLock = LazyLock::new(|| { struct State { repository_factory: PgRepositoryFactory, mailer: Mailer, - clock: SystemClock, + clock: Arc, homeserver: Arc, url_builder: UrlBuilder, site_config: SiteConfig, @@ -48,7 +48,7 @@ struct State { impl State { pub fn new( repository_factory: PgRepositoryFactory, - clock: SystemClock, + clock: impl Clock + 'static, mailer: Mailer, homeserver: impl HomeserverConnection + 'static, url_builder: UrlBuilder, @@ -57,7 +57,7 @@ impl State { Self { repository_factory, mailer, - clock, + clock: Arc::new(clock), homeserver: Arc::new(homeserver), url_builder, site_config, @@ -68,8 +68,8 @@ impl State { self.repository_factory.pool() } - pub fn clock(&self) -> BoxClock { - Box::new(self.clock.clone()) + pub fn clock(&self) -> &dyn Clock { + &self.clock } pub fn mailer(&self) -> &Mailer { diff --git a/crates/tasks/src/matrix.rs b/crates/tasks/src/matrix.rs index 40e38899c..92e15f448 100644 --- a/crates/tasks/src/matrix.rs +++ b/crates/tasks/src/matrix.rs @@ -80,7 +80,7 @@ impl RunnableJob for ProvisionUserJob { // Schedule a device sync job let sync_device_job = SyncDevicesJob::new(&user); repo.queue_job() - .schedule_job(&mut rng, &clock, sync_device_job) + .schedule_job(&mut rng, clock, sync_device_job) .await .map_err(JobError::retry)?; @@ -118,7 +118,7 @@ impl RunnableJob for ProvisionDeviceJob { // Schedule a device sync job repo.queue_job() - .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user)) + .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user)) .await .map_err(JobError::retry)?; @@ -154,7 +154,7 @@ impl RunnableJob for DeleteDeviceJob { // Schedule a device sync job repo.queue_job() - .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user)) + .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user)) .await .map_err(JobError::retry)?; diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index dcb578011..777844e2f 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -246,7 +246,7 @@ impl QueueWorker { .map_err(QueueRunnerError::StartTransaction)?; let mut repo = PgRepository::from_conn(txn); - let registration = repo.queue_worker().register(&mut rng, &clock).await?; + let registration = repo.queue_worker().register(&mut rng, clock).await?; tracing::Span::current().record("worker.id", tracing::field::display(registration.id)); repo.into_inner() .commit() diff --git a/crates/tasks/src/recovery.rs b/crates/tasks/src/recovery.rs index ab51e420b..03e02d57b 100644 --- a/crates/tasks/src/recovery.rs +++ b/crates/tasks/src/recovery.rs @@ -75,7 +75,7 @@ impl RunnableJob for SendAccountRecoveryEmailsJob { let ticket = repo .user_recovery() - .add_ticket(&mut rng, &clock, &session, &email, ticket) + .add_ticket(&mut rng, clock, &session, &email, ticket) .await .map_err(JobError::retry)?; diff --git a/crates/tasks/src/sessions.rs b/crates/tasks/src/sessions.rs index f457fe04f..d10d908da 100644 --- a/crates/tasks/src/sessions.rs +++ b/crates/tasks/src/sessions.rs @@ -39,7 +39,7 @@ impl RunnableJob for ExpireInactiveSessionsJob { repo.queue_job() .schedule_job( &mut rng, - &clock, + clock, ExpireInactiveOAuthSessionsJob::new(now - ttl), ) .await @@ -50,7 +50,7 @@ impl RunnableJob for ExpireInactiveSessionsJob { repo.queue_job() .schedule_job( &mut rng, - &clock, + clock, ExpireInactiveCompatSessionsJob::new(now - ttl), ) .await @@ -61,7 +61,7 @@ impl RunnableJob for ExpireInactiveSessionsJob { repo.queue_job() .schedule_job( &mut rng, - &clock, + clock, ExpireInactiveUserSessionsJob::new(now - ttl), ) .await @@ -104,7 +104,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob { if let Some(job) = self.next(&page) { tracing::info!("Scheduling job to expire the next batch of inactive sessions"); repo.queue_job() - .schedule_job(&mut rng, &clock, job) + .schedule_job(&mut rng, clock, job) .await .map_err(JobError::retry)?; } @@ -117,7 +117,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob { repo.queue_job() .schedule_job_later( &mut rng, - &clock, + clock, SyncDevicesJob::new_for_id(user_id), clock.now() + delay, ) @@ -128,7 +128,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob { } repo.oauth2_session() - .finish(&clock, edge) + .finish(clock, edge) .await .map_err(JobError::retry)?; } @@ -168,7 +168,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob { if let Some(job) = self.next(&page) { tracing::info!("Scheduling job to expire the next batch of inactive sessions"); repo.queue_job() - .schedule_job(&mut rng, &clock, job) + .schedule_job(&mut rng, clock, job) .await .map_err(JobError::retry)?; } @@ -180,7 +180,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob { repo.queue_job() .schedule_job_later( &mut rng, - &clock, + clock, SyncDevicesJob::new_for_id(edge.user_id), clock.now() + delay, ) @@ -190,7 +190,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob { } repo.compat_session() - .finish(&clock, edge) + .finish(clock, edge) .await .map_err(JobError::retry)?; } @@ -223,14 +223,14 @@ impl RunnableJob for ExpireInactiveUserSessionsJob { if let Some(job) = self.next(&page) { tracing::info!("Scheduling job to expire the next batch of inactive sessions"); repo.queue_job() - .schedule_job(&mut rng, &clock, job) + .schedule_job(&mut rng, clock, job) .await .map_err(JobError::retry)?; } for edge in page.edges { repo.browser_session() - .finish(&clock, edge) + .finish(clock, edge) .await .map_err(JobError::retry)?; } diff --git a/crates/tasks/src/user.rs b/crates/tasks/src/user.rs index 9c9db9097..b5f64dd42 100644 --- a/crates/tasks/src/user.rs +++ b/crates/tasks/src/user.rs @@ -44,14 +44,14 @@ impl RunnableJob for DeactivateUserJob { // Let's first lock & deactivate the user let user = repo .user() - .lock(&clock, user) + .lock(clock, user) .await .context("Failed to lock user") .map_err(JobError::retry)?; let user = repo .user() - .deactivate(&clock, user) + .deactivate(clock, user) .await .context("Failed to deactivate user") .map_err(JobError::retry)?; @@ -60,7 +60,7 @@ impl RunnableJob for DeactivateUserJob { let n = repo .browser_session() .finish_bulk( - &clock, + clock, BrowserSessionFilter::new().for_user(&user).active_only(), ) .await @@ -70,7 +70,7 @@ impl RunnableJob for DeactivateUserJob { let n = repo .oauth2_session() .finish_bulk( - &clock, + clock, OAuth2SessionFilter::new().for_user(&user).active_only(), ) .await @@ -80,7 +80,7 @@ impl RunnableJob for DeactivateUserJob { let n = repo .compat_session() .finish_bulk( - &clock, + clock, CompatSessionFilter::new().for_user(&user).active_only(), ) .await