Allow running jobs from the job queue in tests (#4775)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3341,6 +3341,7 @@ dependencies = [
|
||||
"mas-config",
|
||||
"mas-context",
|
||||
"mas-data-model",
|
||||
"mas-email",
|
||||
"mas-http",
|
||||
"mas-i18n",
|
||||
"mas-iana",
|
||||
@@ -3352,6 +3353,7 @@ dependencies = [
|
||||
"mas-router",
|
||||
"mas-storage",
|
||||
"mas-storage-pg",
|
||||
"mas-tasks",
|
||||
"mas-templates",
|
||||
"mime",
|
||||
"minijinja",
|
||||
|
||||
@@ -173,8 +173,9 @@ impl Options {
|
||||
test_mailer_in_background(&mailer, Duration::from_secs(30));
|
||||
|
||||
info!("Starting task worker");
|
||||
mas_tasks::init(
|
||||
mas_tasks::init_and_run(
|
||||
PgRepositoryFactory::new(pool.clone()),
|
||||
SystemClock::default(),
|
||||
&mailer,
|
||||
homeserver_connection.clone(),
|
||||
url_builder.clone(),
|
||||
|
||||
@@ -10,6 +10,7 @@ use clap::Parser;
|
||||
use figment::Figment;
|
||||
use mas_config::{AppConfig, ConfigurationSection};
|
||||
use mas_router::UrlBuilder;
|
||||
use mas_storage::SystemClock;
|
||||
use mas_storage_pg::PgRepositoryFactory;
|
||||
use tracing::{info, info_span};
|
||||
|
||||
@@ -63,8 +64,9 @@ impl Options {
|
||||
drop(config);
|
||||
|
||||
info!("Starting task scheduler");
|
||||
mas_tasks::init(
|
||||
mas_tasks::init_and_run(
|
||||
PgRepositoryFactory::new(pool.clone()),
|
||||
SystemClock::default(),
|
||||
&mailer,
|
||||
conn,
|
||||
url_builder,
|
||||
|
||||
@@ -72,6 +72,7 @@ mas-axum-utils.workspace = true
|
||||
mas-config.workspace = true
|
||||
mas-context.workspace = true
|
||||
mas-data-model.workspace = true
|
||||
mas-email.workspace = true
|
||||
mas-http.workspace = true
|
||||
mas-i18n.workspace = true
|
||||
mas-iana.workspace = true
|
||||
@@ -83,6 +84,7 @@ mas-policy.workspace = true
|
||||
mas-router.workspace = true
|
||||
mas-storage.workspace = true
|
||||
mas-storage-pg.workspace = true
|
||||
mas-tasks.workspace = true
|
||||
mas-templates.workspace = true
|
||||
oauth2-types.workspace = true
|
||||
zxcvbn.workspace = true
|
||||
|
||||
@@ -105,8 +105,9 @@ pub async fn handler(
|
||||
mod tests {
|
||||
use chrono::Duration;
|
||||
use hyper::{Request, StatusCode};
|
||||
use insta::assert_json_snapshot;
|
||||
use mas_storage::{Clock, RepositoryAccess, user::UserRepository};
|
||||
use sqlx::{PgPool, types::Json};
|
||||
use sqlx::PgPool;
|
||||
|
||||
use crate::test_utils::{RequestBuilderExt, ResponseExt, TestState, setup};
|
||||
|
||||
@@ -137,15 +138,37 @@ mod tests {
|
||||
serde_json::json!(state.clock.now())
|
||||
);
|
||||
|
||||
// It should have scheduled a deactivation job for the user
|
||||
// XXX: we don't have a good way to look for the deactivation job
|
||||
let job: Json<serde_json::Value> = sqlx::query_scalar(
|
||||
"SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'",
|
||||
)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.expect("Deactivation job to be scheduled");
|
||||
assert_eq!(job["user_id"], serde_json::json!(user.id));
|
||||
// Make sure to run the jobs in the queue
|
||||
state.run_jobs_in_queue().await;
|
||||
|
||||
let request = Request::get(format!("/api/admin/v1/users/{}", user.id))
|
||||
.bearer(&token)
|
||||
.empty();
|
||||
let response = state.request(request).await;
|
||||
response.assert_status(StatusCode::OK);
|
||||
let body: serde_json::Value = response.json();
|
||||
|
||||
assert_json_snapshot!(body, @r#"
|
||||
{
|
||||
"data": {
|
||||
"type": "user",
|
||||
"id": "01FSHN9AG0MZAA6S4AF7CTV32E",
|
||||
"attributes": {
|
||||
"username": "alice",
|
||||
"created_at": "2022-01-16T14:40:00Z",
|
||||
"locked_at": "2022-01-16T14:40:00Z",
|
||||
"deactivated_at": "2022-01-16T14:40:00Z",
|
||||
"admin": false
|
||||
},
|
||||
"links": {
|
||||
"self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E"
|
||||
}
|
||||
},
|
||||
"links": {
|
||||
"self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E"
|
||||
}
|
||||
}
|
||||
"#);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
|
||||
@@ -179,15 +202,37 @@ mod tests {
|
||||
serde_json::json!(state.clock.now())
|
||||
);
|
||||
|
||||
// It should have scheduled a deactivation job for the user
|
||||
// XXX: we don't have a good way to look for the deactivation job
|
||||
let job: Json<serde_json::Value> = sqlx::query_scalar(
|
||||
"SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'",
|
||||
)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.expect("Deactivation job to be scheduled");
|
||||
assert_eq!(job["user_id"], serde_json::json!(user.id));
|
||||
// Make sure to run the jobs in the queue
|
||||
state.run_jobs_in_queue().await;
|
||||
|
||||
let request = Request::get(format!("/api/admin/v1/users/{}", user.id))
|
||||
.bearer(&token)
|
||||
.empty();
|
||||
let response = state.request(request).await;
|
||||
response.assert_status(StatusCode::OK);
|
||||
let body: serde_json::Value = response.json();
|
||||
|
||||
assert_json_snapshot!(body, @r#"
|
||||
{
|
||||
"data": {
|
||||
"type": "user",
|
||||
"id": "01FSHN9AG0MZAA6S4AF7CTV32E",
|
||||
"attributes": {
|
||||
"username": "alice",
|
||||
"created_at": "2022-01-16T14:40:00Z",
|
||||
"locked_at": "2022-01-16T14:40:00Z",
|
||||
"deactivated_at": "2022-01-16T14:41:00Z",
|
||||
"admin": false
|
||||
},
|
||||
"links": {
|
||||
"self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E"
|
||||
}
|
||||
},
|
||||
"links": {
|
||||
"self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E"
|
||||
}
|
||||
}
|
||||
"#);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
|
||||
|
||||
@@ -29,6 +29,7 @@ use mas_axum_utils::{
|
||||
};
|
||||
use mas_config::RateLimitingConfig;
|
||||
use mas_data_model::SiteConfig;
|
||||
use mas_email::{MailTransport, Mailer};
|
||||
use mas_i18n::Translator;
|
||||
use mas_keystore::{Encrypter, JsonWebKey, JsonWebKeySet, Keystore, PrivateKey};
|
||||
use mas_matrix::{HomeserverConnection, MockHomeserverConnection};
|
||||
@@ -39,6 +40,7 @@ use mas_storage::{
|
||||
clock::MockClock,
|
||||
};
|
||||
use mas_storage_pg::PgRepositoryFactory;
|
||||
use mas_tasks::QueueWorker;
|
||||
use mas_templates::{SiteConfigExt, Templates};
|
||||
use oauth2_types::{registration::ClientRegistrationResponse, requests::AccessTokenResponse};
|
||||
use rand::SeedableRng;
|
||||
@@ -113,6 +115,7 @@ pub(crate) struct TestState {
|
||||
pub rng: Arc<Mutex<ChaChaRng>>,
|
||||
pub http_client: reqwest::Client,
|
||||
pub task_tracker: TaskTracker,
|
||||
queue_worker: Arc<tokio::sync::Mutex<QueueWorker>>,
|
||||
|
||||
#[allow(dead_code)] // It is used, as it will cancel the CancellationToken when dropped
|
||||
cancellation_drop_guard: Arc<DropGuard>,
|
||||
@@ -235,6 +238,27 @@ impl TestState {
|
||||
shutdown_token.child_token(),
|
||||
);
|
||||
|
||||
let mailer = Mailer::new(
|
||||
templates.clone(),
|
||||
MailTransport::blackhole(),
|
||||
"hello@example.com".parse().unwrap(),
|
||||
"hello@example.com".parse().unwrap(),
|
||||
);
|
||||
|
||||
let queue_worker = mas_tasks::init(
|
||||
PgRepositoryFactory::new(pool.clone()),
|
||||
Arc::clone(&clock),
|
||||
&mailer,
|
||||
homeserver_connection.clone(),
|
||||
url_builder.clone(),
|
||||
&site_config,
|
||||
shutdown_token.child_token(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let queue_worker = Arc::new(tokio::sync::Mutex::new(queue_worker));
|
||||
|
||||
Ok(Self {
|
||||
repository_factory: PgRepositoryFactory::new(pool),
|
||||
templates,
|
||||
@@ -254,10 +278,19 @@ impl TestState {
|
||||
rng,
|
||||
http_client,
|
||||
task_tracker,
|
||||
queue_worker,
|
||||
cancellation_drop_guard: Arc::new(shutdown_token.drop_guard()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Run all the available jobs in the queue.
|
||||
///
|
||||
/// Panics if it fails to run the jobs (but not on job failures!)
|
||||
pub async fn run_jobs_in_queue(&self) {
|
||||
let mut queue = self.queue_worker.lock().await;
|
||||
queue.process_all_jobs_in_tests().await.unwrap();
|
||||
}
|
||||
|
||||
/// Reset the test utils to a fresh state, with the same configuration.
|
||||
pub async fn reset(self) -> Self {
|
||||
let site_config = self.site_config.clone();
|
||||
|
||||
@@ -15,7 +15,7 @@ use std::sync::{Arc, atomic::AtomicI64};
|
||||
use chrono::{DateTime, TimeZone, Utc};
|
||||
|
||||
/// Represents a clock which can give the current date and time
|
||||
pub trait Clock: Sync {
|
||||
pub trait Clock: Send + Sync {
|
||||
/// Get the current date and time
|
||||
fn now(&self) -> DateTime<Utc>;
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ impl RunnableJob for CleanupExpiredTokensJob {
|
||||
|
||||
let count = repo
|
||||
.oauth2_access_token()
|
||||
.cleanup_revoked(&clock)
|
||||
.cleanup_revoked(clock)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
repo.save().await.map_err(JobError::retry)?;
|
||||
|
||||
@@ -100,7 +100,7 @@ impl RunnableJob for SendEmailAuthenticationCodeJob {
|
||||
.user_email()
|
||||
.add_authentication_code(
|
||||
&mut rng,
|
||||
&clock,
|
||||
clock,
|
||||
Duration::minutes(5), // TODO: make this configurable
|
||||
&user_email_authentication,
|
||||
code,
|
||||
|
||||
@@ -10,7 +10,7 @@ use mas_data_model::SiteConfig;
|
||||
use mas_email::Mailer;
|
||||
use mas_matrix::HomeserverConnection;
|
||||
use mas_router::UrlBuilder;
|
||||
use mas_storage::{BoxClock, BoxRepository, RepositoryError, RepositoryFactory, SystemClock};
|
||||
use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory};
|
||||
use mas_storage_pg::PgRepositoryFactory;
|
||||
use new_queue::QueueRunnerError;
|
||||
use opentelemetry::metrics::Meter;
|
||||
@@ -18,6 +18,8 @@ use rand::SeedableRng;
|
||||
use sqlx::{Pool, Postgres};
|
||||
use tokio_util::{sync::CancellationToken, task::TaskTracker};
|
||||
|
||||
pub use crate::new_queue::QueueWorker;
|
||||
|
||||
mod database;
|
||||
mod email;
|
||||
mod matrix;
|
||||
@@ -39,7 +41,7 @@ static METER: LazyLock<Meter> = LazyLock::new(|| {
|
||||
struct State {
|
||||
repository_factory: PgRepositoryFactory,
|
||||
mailer: Mailer,
|
||||
clock: SystemClock,
|
||||
clock: Arc<dyn Clock>,
|
||||
homeserver: Arc<dyn HomeserverConnection>,
|
||||
url_builder: UrlBuilder,
|
||||
site_config: SiteConfig,
|
||||
@@ -48,7 +50,7 @@ struct State {
|
||||
impl State {
|
||||
pub fn new(
|
||||
repository_factory: PgRepositoryFactory,
|
||||
clock: SystemClock,
|
||||
clock: impl Clock + 'static,
|
||||
mailer: Mailer,
|
||||
homeserver: impl HomeserverConnection + 'static,
|
||||
url_builder: UrlBuilder,
|
||||
@@ -57,7 +59,7 @@ impl State {
|
||||
Self {
|
||||
repository_factory,
|
||||
mailer,
|
||||
clock,
|
||||
clock: Arc::new(clock),
|
||||
homeserver: Arc::new(homeserver),
|
||||
url_builder,
|
||||
site_config,
|
||||
@@ -68,8 +70,8 @@ impl State {
|
||||
self.repository_factory.pool()
|
||||
}
|
||||
|
||||
pub fn clock(&self) -> BoxClock {
|
||||
Box::new(self.clock.clone())
|
||||
pub fn clock(&self) -> &dyn Clock {
|
||||
&self.clock
|
||||
}
|
||||
|
||||
pub fn mailer(&self) -> &Mailer {
|
||||
@@ -99,29 +101,31 @@ impl State {
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialise the workers.
|
||||
/// Initialise the worker, without running it.
|
||||
///
|
||||
/// This is mostly useful for tests.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This function can fail if the database connection fails.
|
||||
pub async fn init(
|
||||
repository_factory: PgRepositoryFactory,
|
||||
clock: impl Clock + 'static,
|
||||
mailer: &Mailer,
|
||||
homeserver: impl HomeserverConnection + 'static,
|
||||
url_builder: UrlBuilder,
|
||||
site_config: &SiteConfig,
|
||||
cancellation_token: CancellationToken,
|
||||
task_tracker: &TaskTracker,
|
||||
) -> Result<(), QueueRunnerError> {
|
||||
) -> Result<QueueWorker, QueueRunnerError> {
|
||||
let state = State::new(
|
||||
repository_factory,
|
||||
SystemClock::default(),
|
||||
clock,
|
||||
mailer.clone(),
|
||||
homeserver,
|
||||
url_builder,
|
||||
site_config.clone(),
|
||||
);
|
||||
let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?;
|
||||
let mut worker = QueueWorker::new(state, cancellation_token).await?;
|
||||
|
||||
worker
|
||||
.register_handler::<mas_storage::queue::CleanupExpiredTokensJob>()
|
||||
@@ -157,6 +161,36 @@ pub async fn init(
|
||||
mas_storage::queue::PruneStalePolicyDataJob,
|
||||
);
|
||||
|
||||
Ok(worker)
|
||||
}
|
||||
|
||||
/// Initialise the worker and run it.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This function can fail if the database connection fails.
|
||||
#[expect(clippy::too_many_arguments, reason = "this is fine")]
|
||||
pub async fn init_and_run(
|
||||
repository_factory: PgRepositoryFactory,
|
||||
clock: impl Clock + 'static,
|
||||
mailer: &Mailer,
|
||||
homeserver: impl HomeserverConnection + 'static,
|
||||
url_builder: UrlBuilder,
|
||||
site_config: &SiteConfig,
|
||||
cancellation_token: CancellationToken,
|
||||
task_tracker: &TaskTracker,
|
||||
) -> Result<(), QueueRunnerError> {
|
||||
let worker = init(
|
||||
repository_factory,
|
||||
clock,
|
||||
mailer,
|
||||
homeserver,
|
||||
url_builder,
|
||||
site_config,
|
||||
cancellation_token,
|
||||
)
|
||||
.await?;
|
||||
|
||||
task_tracker.spawn(worker.run());
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -80,7 +80,7 @@ impl RunnableJob for ProvisionUserJob {
|
||||
// Schedule a device sync job
|
||||
let sync_device_job = SyncDevicesJob::new(&user);
|
||||
repo.queue_job()
|
||||
.schedule_job(&mut rng, &clock, sync_device_job)
|
||||
.schedule_job(&mut rng, clock, sync_device_job)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
|
||||
@@ -118,7 +118,7 @@ impl RunnableJob for ProvisionDeviceJob {
|
||||
|
||||
// Schedule a device sync job
|
||||
repo.queue_job()
|
||||
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
|
||||
.schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
|
||||
@@ -154,7 +154,7 @@ impl RunnableJob for DeleteDeviceJob {
|
||||
|
||||
// Schedule a device sync job
|
||||
repo.queue_job()
|
||||
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
|
||||
.schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ use opentelemetry::{
|
||||
metrics::{Counter, Histogram, UpDownCounter},
|
||||
};
|
||||
use rand::{Rng, RngCore, distributions::Uniform};
|
||||
use rand_chacha::ChaChaRng;
|
||||
use serde::de::DeserializeOwned;
|
||||
use sqlx::{
|
||||
Acquire, Either,
|
||||
@@ -195,8 +194,6 @@ struct ScheduleDefinition {
|
||||
}
|
||||
|
||||
pub struct QueueWorker {
|
||||
rng: ChaChaRng,
|
||||
clock: Box<dyn Clock + Send>,
|
||||
listener: PgListener,
|
||||
registration: Worker,
|
||||
am_i_leader: bool,
|
||||
@@ -217,7 +214,7 @@ impl QueueWorker {
|
||||
skip_all,
|
||||
fields(worker.id)
|
||||
)]
|
||||
pub async fn new(
|
||||
pub(crate) async fn new(
|
||||
state: State,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Result<Self, QueueRunnerError> {
|
||||
@@ -246,7 +243,7 @@ impl QueueWorker {
|
||||
.map_err(QueueRunnerError::StartTransaction)?;
|
||||
let mut repo = PgRepository::from_conn(txn);
|
||||
|
||||
let registration = repo.queue_worker().register(&mut rng, &clock).await?;
|
||||
let registration = repo.queue_worker().register(&mut rng, clock).await?;
|
||||
tracing::Span::current().record("worker.id", tracing::field::display(registration.id));
|
||||
repo.into_inner()
|
||||
.commit()
|
||||
@@ -278,8 +275,6 @@ impl QueueWorker {
|
||||
let cancellation_guard = cancellation_token.clone().drop_guard();
|
||||
|
||||
Ok(Self {
|
||||
rng,
|
||||
clock,
|
||||
listener,
|
||||
registration,
|
||||
am_i_leader: false,
|
||||
@@ -294,7 +289,7 @@ impl QueueWorker {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn register_handler<T: RunnableJob + InsertableJob>(&mut self) -> &mut Self {
|
||||
pub(crate) fn register_handler<T: RunnableJob + InsertableJob>(&mut self) -> &mut Self {
|
||||
// There is a potential panic here, which is fine as it's going to be caught
|
||||
// within the job task
|
||||
let factory = |payload: JobPayload| {
|
||||
@@ -307,7 +302,7 @@ impl QueueWorker {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn add_schedule<T: InsertableJob>(
|
||||
pub(crate) fn add_schedule<T: InsertableJob>(
|
||||
&mut self,
|
||||
schedule_name: &'static str,
|
||||
expression: Schedule,
|
||||
@@ -325,7 +320,7 @@ impl QueueWorker {
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn run(mut self) {
|
||||
pub(crate) async fn run(mut self) {
|
||||
if let Err(e) = self.run_inner().await {
|
||||
tracing::error!(
|
||||
error = &e as &dyn std::error::Error,
|
||||
@@ -349,7 +344,7 @@ impl QueueWorker {
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "worker.setup_schedules", skip_all)]
|
||||
pub async fn setup_schedules(&mut self) -> Result<(), QueueRunnerError> {
|
||||
pub(crate) async fn setup_schedules(&mut self) -> Result<(), QueueRunnerError> {
|
||||
let schedules: Vec<_> = self.schedules.iter().map(|s| s.schedule_name).collect();
|
||||
|
||||
// Start a transaction on the existing PgListener connection
|
||||
@@ -397,6 +392,9 @@ impl QueueWorker {
|
||||
async fn shutdown(&mut self) -> Result<(), QueueRunnerError> {
|
||||
tracing::info!("Shutting down worker");
|
||||
|
||||
let clock = self.state.clock();
|
||||
let mut rng = self.state.rng();
|
||||
|
||||
// Start a transaction on the existing PgListener connection
|
||||
let txn = self
|
||||
.listener
|
||||
@@ -421,13 +419,13 @@ impl QueueWorker {
|
||||
|
||||
// Wait for all the jobs to finish
|
||||
self.tracker
|
||||
.process_jobs(&mut self.rng, &self.clock, &mut repo, true)
|
||||
.process_jobs(&mut rng, clock, &mut repo, true)
|
||||
.await?;
|
||||
|
||||
// Tell the other workers we're shutting down
|
||||
// This also releases the leader election lease
|
||||
repo.queue_worker()
|
||||
.shutdown(&self.clock, &self.registration)
|
||||
.shutdown(clock, &self.registration)
|
||||
.await?;
|
||||
|
||||
repo.into_inner()
|
||||
@@ -440,12 +438,12 @@ impl QueueWorker {
|
||||
|
||||
#[tracing::instrument(name = "worker.wait_until_wakeup", skip_all)]
|
||||
async fn wait_until_wakeup(&mut self) -> Result<(), QueueRunnerError> {
|
||||
let mut rng = self.state.rng();
|
||||
|
||||
// This is to make sure we wake up every second to do the maintenance tasks
|
||||
// We add a little bit of random jitter to the duration, so that we don't get
|
||||
// fully synced workers waking up at the same time after each notification
|
||||
let sleep_duration = self
|
||||
.rng
|
||||
.sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION));
|
||||
let sleep_duration = rng.sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION));
|
||||
let wakeup_sleep = tokio::time::sleep(sleep_duration);
|
||||
|
||||
tokio::select! {
|
||||
@@ -490,7 +488,9 @@ impl QueueWorker {
|
||||
)]
|
||||
async fn tick(&mut self) -> Result<(), QueueRunnerError> {
|
||||
tracing::debug!("Tick");
|
||||
let now = self.clock.now();
|
||||
let clock = self.state.clock();
|
||||
let mut rng = self.state.rng();
|
||||
let now = clock.now();
|
||||
|
||||
// Start a transaction on the existing PgListener connection
|
||||
let txn = self
|
||||
@@ -505,25 +505,25 @@ impl QueueWorker {
|
||||
if now - self.last_heartbeat >= chrono::Duration::minutes(1) {
|
||||
tracing::info!("Sending heartbeat");
|
||||
repo.queue_worker()
|
||||
.heartbeat(&self.clock, &self.registration)
|
||||
.heartbeat(clock, &self.registration)
|
||||
.await?;
|
||||
self.last_heartbeat = now;
|
||||
}
|
||||
|
||||
// Remove any dead worker leader leases
|
||||
repo.queue_worker()
|
||||
.remove_leader_lease_if_expired(&self.clock)
|
||||
.remove_leader_lease_if_expired(clock)
|
||||
.await?;
|
||||
|
||||
// Try to become (or stay) the leader
|
||||
let leader = repo
|
||||
.queue_worker()
|
||||
.try_get_leader_lease(&self.clock, &self.registration)
|
||||
.try_get_leader_lease(clock, &self.registration)
|
||||
.await?;
|
||||
|
||||
// Process any job task which finished
|
||||
self.tracker
|
||||
.process_jobs(&mut self.rng, &self.clock, &mut repo, false)
|
||||
.process_jobs(&mut rng, clock, &mut repo, false)
|
||||
.await?;
|
||||
|
||||
// Compute how many jobs we should fetch at most
|
||||
@@ -538,7 +538,7 @@ impl QueueWorker {
|
||||
let queues = self.tracker.queues();
|
||||
let jobs = repo
|
||||
.queue_job()
|
||||
.reserve(&self.clock, &self.registration, &queues, max_jobs_to_fetch)
|
||||
.reserve(clock, &self.registration, &queues, max_jobs_to_fetch)
|
||||
.await?;
|
||||
|
||||
for Job {
|
||||
@@ -592,6 +592,9 @@ impl QueueWorker {
|
||||
return Err(QueueRunnerError::NotLeader);
|
||||
}
|
||||
|
||||
let clock = self.state.clock();
|
||||
let mut rng = self.state.rng();
|
||||
|
||||
// Start a transaction on the existing PgListener connection
|
||||
let txn = self
|
||||
.listener
|
||||
@@ -633,7 +636,7 @@ impl QueueWorker {
|
||||
// Look at the state of schedules in the database
|
||||
let schedules_status = repo.queue_schedule().list().await?;
|
||||
|
||||
let now = self.clock.now();
|
||||
let now = clock.now();
|
||||
for schedule in &self.schedules {
|
||||
// Find the schedule status from the database
|
||||
let Some(schedule_status) = schedules_status
|
||||
@@ -670,8 +673,8 @@ impl QueueWorker {
|
||||
|
||||
repo.queue_job()
|
||||
.schedule_later(
|
||||
&mut self.rng,
|
||||
&self.clock,
|
||||
&mut rng,
|
||||
clock,
|
||||
schedule.queue_name,
|
||||
schedule.payload.clone(),
|
||||
serde_json::json!({}),
|
||||
@@ -684,16 +687,13 @@ impl QueueWorker {
|
||||
// We also check if the worker is dead, and if so, we shutdown all the dead
|
||||
// workers that haven't checked in the last two minutes
|
||||
repo.queue_worker()
|
||||
.shutdown_dead_workers(&self.clock, Duration::minutes(2))
|
||||
.shutdown_dead_workers(clock, Duration::minutes(2))
|
||||
.await?;
|
||||
|
||||
// TODO: mark tasks those workers had as lost
|
||||
|
||||
// Mark all the scheduled jobs as available
|
||||
let scheduled = repo
|
||||
.queue_job()
|
||||
.schedule_available_jobs(&self.clock)
|
||||
.await?;
|
||||
let scheduled = repo.queue_job().schedule_available_jobs(clock).await?;
|
||||
match scheduled {
|
||||
0 => {}
|
||||
1 => tracing::info!("One scheduled job marked as available"),
|
||||
@@ -713,6 +713,73 @@ impl QueueWorker {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Process all the pending jobs in the queue.
|
||||
/// This should only be called in tests!
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This function can fail if the database connection fails.
|
||||
pub async fn process_all_jobs_in_tests(&mut self) -> Result<(), QueueRunnerError> {
|
||||
// I swear, I'm the leader!
|
||||
self.am_i_leader = true;
|
||||
|
||||
// First, perform the leader duties. This will make sure that we schedule
|
||||
// recurring jobs.
|
||||
self.perform_leader_duties().await?;
|
||||
|
||||
let clock = self.state.clock();
|
||||
let mut rng = self.state.rng();
|
||||
|
||||
// Grab the connection from the PgListener
|
||||
let txn = self
|
||||
.listener
|
||||
.begin()
|
||||
.await
|
||||
.map_err(QueueRunnerError::StartTransaction)?;
|
||||
let mut repo = PgRepository::from_conn(txn);
|
||||
|
||||
// Spawn all the jobs in the database
|
||||
let queues = self.tracker.queues();
|
||||
let jobs = repo
|
||||
.queue_job()
|
||||
// I really hope that we don't spawn more than 10k jobs in tests
|
||||
.reserve(clock, &self.registration, &queues, 10_000)
|
||||
.await?;
|
||||
|
||||
for Job {
|
||||
id,
|
||||
queue_name,
|
||||
payload,
|
||||
metadata,
|
||||
attempt,
|
||||
} in jobs
|
||||
{
|
||||
let cancellation_token = self.cancellation_token.child_token();
|
||||
let start = Instant::now();
|
||||
let context = JobContext {
|
||||
id,
|
||||
metadata,
|
||||
queue_name,
|
||||
attempt,
|
||||
start,
|
||||
cancellation_token,
|
||||
};
|
||||
|
||||
self.tracker.spawn_job(self.state.clone(), context, payload);
|
||||
}
|
||||
|
||||
self.tracker
|
||||
.process_jobs(&mut rng, clock, &mut repo, true)
|
||||
.await?;
|
||||
|
||||
repo.into_inner()
|
||||
.commit()
|
||||
.await
|
||||
.map_err(QueueRunnerError::CommitTransaction)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks running jobs
|
||||
|
||||
@@ -75,7 +75,7 @@ impl RunnableJob for SendAccountRecoveryEmailsJob {
|
||||
|
||||
let ticket = repo
|
||||
.user_recovery()
|
||||
.add_ticket(&mut rng, &clock, &session, &email, ticket)
|
||||
.add_ticket(&mut rng, clock, &session, &email, ticket)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ impl RunnableJob for ExpireInactiveSessionsJob {
|
||||
repo.queue_job()
|
||||
.schedule_job(
|
||||
&mut rng,
|
||||
&clock,
|
||||
clock,
|
||||
ExpireInactiveOAuthSessionsJob::new(now - ttl),
|
||||
)
|
||||
.await
|
||||
@@ -50,7 +50,7 @@ impl RunnableJob for ExpireInactiveSessionsJob {
|
||||
repo.queue_job()
|
||||
.schedule_job(
|
||||
&mut rng,
|
||||
&clock,
|
||||
clock,
|
||||
ExpireInactiveCompatSessionsJob::new(now - ttl),
|
||||
)
|
||||
.await
|
||||
@@ -61,7 +61,7 @@ impl RunnableJob for ExpireInactiveSessionsJob {
|
||||
repo.queue_job()
|
||||
.schedule_job(
|
||||
&mut rng,
|
||||
&clock,
|
||||
clock,
|
||||
ExpireInactiveUserSessionsJob::new(now - ttl),
|
||||
)
|
||||
.await
|
||||
@@ -104,7 +104,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob {
|
||||
if let Some(job) = self.next(&page) {
|
||||
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
|
||||
repo.queue_job()
|
||||
.schedule_job(&mut rng, &clock, job)
|
||||
.schedule_job(&mut rng, clock, job)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
}
|
||||
@@ -117,7 +117,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob {
|
||||
repo.queue_job()
|
||||
.schedule_job_later(
|
||||
&mut rng,
|
||||
&clock,
|
||||
clock,
|
||||
SyncDevicesJob::new_for_id(user_id),
|
||||
clock.now() + delay,
|
||||
)
|
||||
@@ -128,7 +128,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob {
|
||||
}
|
||||
|
||||
repo.oauth2_session()
|
||||
.finish(&clock, edge)
|
||||
.finish(clock, edge)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
}
|
||||
@@ -168,7 +168,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob {
|
||||
if let Some(job) = self.next(&page) {
|
||||
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
|
||||
repo.queue_job()
|
||||
.schedule_job(&mut rng, &clock, job)
|
||||
.schedule_job(&mut rng, clock, job)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
}
|
||||
@@ -180,7 +180,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob {
|
||||
repo.queue_job()
|
||||
.schedule_job_later(
|
||||
&mut rng,
|
||||
&clock,
|
||||
clock,
|
||||
SyncDevicesJob::new_for_id(edge.user_id),
|
||||
clock.now() + delay,
|
||||
)
|
||||
@@ -190,7 +190,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob {
|
||||
}
|
||||
|
||||
repo.compat_session()
|
||||
.finish(&clock, edge)
|
||||
.finish(clock, edge)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
}
|
||||
@@ -223,14 +223,14 @@ impl RunnableJob for ExpireInactiveUserSessionsJob {
|
||||
if let Some(job) = self.next(&page) {
|
||||
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
|
||||
repo.queue_job()
|
||||
.schedule_job(&mut rng, &clock, job)
|
||||
.schedule_job(&mut rng, clock, job)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
}
|
||||
|
||||
for edge in page.edges {
|
||||
repo.browser_session()
|
||||
.finish(&clock, edge)
|
||||
.finish(clock, edge)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
}
|
||||
|
||||
@@ -44,14 +44,14 @@ impl RunnableJob for DeactivateUserJob {
|
||||
// Let's first lock & deactivate the user
|
||||
let user = repo
|
||||
.user()
|
||||
.lock(&clock, user)
|
||||
.lock(clock, user)
|
||||
.await
|
||||
.context("Failed to lock user")
|
||||
.map_err(JobError::retry)?;
|
||||
|
||||
let user = repo
|
||||
.user()
|
||||
.deactivate(&clock, user)
|
||||
.deactivate(clock, user)
|
||||
.await
|
||||
.context("Failed to deactivate user")
|
||||
.map_err(JobError::retry)?;
|
||||
@@ -60,7 +60,7 @@ impl RunnableJob for DeactivateUserJob {
|
||||
let n = repo
|
||||
.browser_session()
|
||||
.finish_bulk(
|
||||
&clock,
|
||||
clock,
|
||||
BrowserSessionFilter::new().for_user(&user).active_only(),
|
||||
)
|
||||
.await
|
||||
@@ -70,7 +70,7 @@ impl RunnableJob for DeactivateUserJob {
|
||||
let n = repo
|
||||
.oauth2_session()
|
||||
.finish_bulk(
|
||||
&clock,
|
||||
clock,
|
||||
OAuth2SessionFilter::new().for_user(&user).active_only(),
|
||||
)
|
||||
.await
|
||||
@@ -80,7 +80,7 @@ impl RunnableJob for DeactivateUserJob {
|
||||
let n = repo
|
||||
.compat_session()
|
||||
.finish_bulk(
|
||||
&clock,
|
||||
clock,
|
||||
CompatSessionFilter::new().for_user(&user).active_only(),
|
||||
)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user