Make the task State::clock() return a &dyn Clock instead of a BoxClock

This commit is contained in:
Quentin Gliech
2025-07-09 17:17:01 +02:00
parent 912fafe9bd
commit 716640486e
9 changed files with 30 additions and 30 deletions

View File

@@ -15,7 +15,7 @@ use std::sync::{Arc, atomic::AtomicI64};
use chrono::{DateTime, TimeZone, Utc};
/// Represents a clock which can give the current date and time
pub trait Clock: Sync {
pub trait Clock: Send + Sync {
/// Get the current date and time
fn now(&self) -> DateTime<Utc>;
}

View File

@@ -24,7 +24,7 @@ impl RunnableJob for CleanupExpiredTokensJob {
let count = repo
.oauth2_access_token()
.cleanup_revoked(&clock)
.cleanup_revoked(clock)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;

View File

@@ -100,7 +100,7 @@ impl RunnableJob for SendEmailAuthenticationCodeJob {
.user_email()
.add_authentication_code(
&mut rng,
&clock,
clock,
Duration::minutes(5), // TODO: make this configurable
&user_email_authentication,
code,

View File

@@ -10,7 +10,7 @@ use mas_data_model::SiteConfig;
use mas_email::Mailer;
use mas_matrix::HomeserverConnection;
use mas_router::UrlBuilder;
use mas_storage::{BoxClock, BoxRepository, RepositoryError, RepositoryFactory, SystemClock};
use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory, SystemClock};
use mas_storage_pg::PgRepositoryFactory;
use new_queue::QueueRunnerError;
use opentelemetry::metrics::Meter;
@@ -39,7 +39,7 @@ static METER: LazyLock<Meter> = LazyLock::new(|| {
struct State {
repository_factory: PgRepositoryFactory,
mailer: Mailer,
clock: SystemClock,
clock: Arc<dyn Clock>,
homeserver: Arc<dyn HomeserverConnection>,
url_builder: UrlBuilder,
site_config: SiteConfig,
@@ -48,7 +48,7 @@ struct State {
impl State {
pub fn new(
repository_factory: PgRepositoryFactory,
clock: SystemClock,
clock: impl Clock + 'static,
mailer: Mailer,
homeserver: impl HomeserverConnection + 'static,
url_builder: UrlBuilder,
@@ -57,7 +57,7 @@ impl State {
Self {
repository_factory,
mailer,
clock,
clock: Arc::new(clock),
homeserver: Arc::new(homeserver),
url_builder,
site_config,
@@ -68,8 +68,8 @@ impl State {
self.repository_factory.pool()
}
pub fn clock(&self) -> BoxClock {
Box::new(self.clock.clone())
pub fn clock(&self) -> &dyn Clock {
&self.clock
}
pub fn mailer(&self) -> &Mailer {

View File

@@ -80,7 +80,7 @@ impl RunnableJob for ProvisionUserJob {
// Schedule a device sync job
let sync_device_job = SyncDevicesJob::new(&user);
repo.queue_job()
.schedule_job(&mut rng, &clock, sync_device_job)
.schedule_job(&mut rng, clock, sync_device_job)
.await
.map_err(JobError::retry)?;
@@ -118,7 +118,7 @@ impl RunnableJob for ProvisionDeviceJob {
// Schedule a device sync job
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
.await
.map_err(JobError::retry)?;
@@ -154,7 +154,7 @@ impl RunnableJob for DeleteDeviceJob {
// Schedule a device sync job
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
.await
.map_err(JobError::retry)?;

View File

@@ -246,7 +246,7 @@ impl QueueWorker {
.map_err(QueueRunnerError::StartTransaction)?;
let mut repo = PgRepository::from_conn(txn);
let registration = repo.queue_worker().register(&mut rng, &clock).await?;
let registration = repo.queue_worker().register(&mut rng, clock).await?;
tracing::Span::current().record("worker.id", tracing::field::display(registration.id));
repo.into_inner()
.commit()

View File

@@ -75,7 +75,7 @@ impl RunnableJob for SendAccountRecoveryEmailsJob {
let ticket = repo
.user_recovery()
.add_ticket(&mut rng, &clock, &session, &email, ticket)
.add_ticket(&mut rng, clock, &session, &email, ticket)
.await
.map_err(JobError::retry)?;

View File

@@ -39,7 +39,7 @@ impl RunnableJob for ExpireInactiveSessionsJob {
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
clock,
ExpireInactiveOAuthSessionsJob::new(now - ttl),
)
.await
@@ -50,7 +50,7 @@ impl RunnableJob for ExpireInactiveSessionsJob {
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
clock,
ExpireInactiveCompatSessionsJob::new(now - ttl),
)
.await
@@ -61,7 +61,7 @@ impl RunnableJob for ExpireInactiveSessionsJob {
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
clock,
ExpireInactiveUserSessionsJob::new(now - ttl),
)
.await
@@ -104,7 +104,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob {
if let Some(job) = self.next(&page) {
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
repo.queue_job()
.schedule_job(&mut rng, &clock, job)
.schedule_job(&mut rng, clock, job)
.await
.map_err(JobError::retry)?;
}
@@ -117,7 +117,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob {
repo.queue_job()
.schedule_job_later(
&mut rng,
&clock,
clock,
SyncDevicesJob::new_for_id(user_id),
clock.now() + delay,
)
@@ -128,7 +128,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob {
}
repo.oauth2_session()
.finish(&clock, edge)
.finish(clock, edge)
.await
.map_err(JobError::retry)?;
}
@@ -168,7 +168,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob {
if let Some(job) = self.next(&page) {
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
repo.queue_job()
.schedule_job(&mut rng, &clock, job)
.schedule_job(&mut rng, clock, job)
.await
.map_err(JobError::retry)?;
}
@@ -180,7 +180,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob {
repo.queue_job()
.schedule_job_later(
&mut rng,
&clock,
clock,
SyncDevicesJob::new_for_id(edge.user_id),
clock.now() + delay,
)
@@ -190,7 +190,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob {
}
repo.compat_session()
.finish(&clock, edge)
.finish(clock, edge)
.await
.map_err(JobError::retry)?;
}
@@ -223,14 +223,14 @@ impl RunnableJob for ExpireInactiveUserSessionsJob {
if let Some(job) = self.next(&page) {
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
repo.queue_job()
.schedule_job(&mut rng, &clock, job)
.schedule_job(&mut rng, clock, job)
.await
.map_err(JobError::retry)?;
}
for edge in page.edges {
repo.browser_session()
.finish(&clock, edge)
.finish(clock, edge)
.await
.map_err(JobError::retry)?;
}

View File

@@ -44,14 +44,14 @@ impl RunnableJob for DeactivateUserJob {
// Let's first lock & deactivate the user
let user = repo
.user()
.lock(&clock, user)
.lock(clock, user)
.await
.context("Failed to lock user")
.map_err(JobError::retry)?;
let user = repo
.user()
.deactivate(&clock, user)
.deactivate(clock, user)
.await
.context("Failed to deactivate user")
.map_err(JobError::retry)?;
@@ -60,7 +60,7 @@ impl RunnableJob for DeactivateUserJob {
let n = repo
.browser_session()
.finish_bulk(
&clock,
clock,
BrowserSessionFilter::new().for_user(&user).active_only(),
)
.await
@@ -70,7 +70,7 @@ impl RunnableJob for DeactivateUserJob {
let n = repo
.oauth2_session()
.finish_bulk(
&clock,
clock,
OAuth2SessionFilter::new().for_user(&user).active_only(),
)
.await
@@ -80,7 +80,7 @@ impl RunnableJob for DeactivateUserJob {
let n = repo
.compat_session()
.finish_bulk(
&clock,
clock,
CompatSessionFilter::new().for_user(&user).active_only(),
)
.await