Schedule jobs through the new queue

This commit is contained in:
Quentin Gliech
2024-10-15 14:35:01 +02:00
parent b82483f936
commit 703bd743d6
30 changed files with 180 additions and 697 deletions

124
Cargo.lock generated
View File

@@ -194,44 +194,6 @@ version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775"
[[package]]
name = "apalis-core"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1deb48475efcdece1f23a0553209ee842f264c2a5e9bcc4928bfa6a15a044cde"
dependencies = [
"async-stream",
"async-trait",
"chrono",
"futures",
"graceful-shutdown",
"http",
"log",
"pin-project-lite",
"serde",
"strum 0.25.0",
"thiserror 1.0.69",
"tokio",
"tower 0.4.13",
"tracing",
"ulid",
]
[[package]]
name = "apalis-cron"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43310b7e0132f9520b09224fb6faafb32eec82a672aa79c09e46b5b488ed505b"
dependencies = [
"apalis-core",
"async-stream",
"chrono",
"cron",
"futures",
"tokio",
"tower 0.4.13",
]
[[package]]
name = "arbitrary"
version = "1.3.2"
@@ -391,7 +353,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"strum 0.26.3",
"strum",
"syn",
"thiserror 1.0.69",
]
@@ -638,7 +600,7 @@ dependencies = [
"serde_urlencoded",
"sync_wrapper 1.0.1",
"tokio",
"tower 0.5.1",
"tower",
"tower-layer",
"tower-service",
"tracing",
@@ -685,7 +647,7 @@ dependencies = [
"multer",
"pin-project-lite",
"serde",
"tower 0.5.1",
"tower",
"tower-layer",
"tower-service",
]
@@ -1352,17 +1314,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "cron"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07"
dependencies = [
"chrono",
"nom",
"once_cell",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
@@ -2138,17 +2089,6 @@ dependencies = [
"spinning_top",
]
[[package]]
name = "graceful-shutdown"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3effbaf774a1da3462925bb182ccf975c284cf46edca5569ea93420a657af484"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "group"
version = "0.13.0"
@@ -3186,7 +3126,7 @@ dependencies = [
"serde_with",
"thiserror 2.0.3",
"tokio",
"tower 0.5.1",
"tower",
"tracing",
"ulid",
"url",
@@ -3255,7 +3195,7 @@ dependencies = [
"sqlx",
"tokio",
"tokio-util",
"tower 0.5.1",
"tower",
"tower-http",
"tracing",
"tracing-appender",
@@ -3395,7 +3335,7 @@ dependencies = [
"time",
"tokio",
"tokio-util",
"tower 0.5.1",
"tower",
"tower-http",
"tracing",
"tracing-subscriber",
@@ -3420,7 +3360,7 @@ dependencies = [
"reqwest",
"rustls-platform-verifier",
"tokio",
"tower 0.5.1",
"tower",
"tower-http",
"tracing",
"tracing-opentelemetry",
@@ -3567,7 +3507,7 @@ dependencies = [
"tokio-rustls",
"tokio-test",
"tokio-util",
"tower 0.5.1",
"tower",
"tower-http",
"tracing",
"tracing-subscriber",
@@ -3599,7 +3539,7 @@ dependencies = [
"serde",
"serde_json",
"thiserror 2.0.3",
"tower 0.5.1",
"tower",
"tracing",
"url",
"urlencoding",
@@ -3683,7 +3623,6 @@ dependencies = [
name = "mas-storage"
version = "0.12.0"
dependencies = [
"apalis-core",
"async-trait",
"chrono",
"futures-util",
@@ -3734,8 +3673,6 @@ name = "mas-tasks"
version = "0.12.0"
dependencies = [
"anyhow",
"apalis-core",
"apalis-cron",
"async-stream",
"async-trait",
"chrono",
@@ -3759,7 +3696,7 @@ dependencies = [
"thiserror 2.0.3",
"tokio",
"tokio-util",
"tower 0.5.1",
"tower",
"tracing",
"tracing-opentelemetry",
"ulid",
@@ -3804,7 +3741,7 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-semantic-conventions",
"pin-project-lite",
"tower 0.5.1",
"tower",
"tracing",
"tracing-opentelemetry",
]
@@ -6117,35 +6054,13 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "strum"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
dependencies = [
"strum_macros 0.25.3",
]
[[package]]
name = "strum"
version = "0.26.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
dependencies = [
"strum_macros 0.26.4",
]
[[package]]
name = "strum_macros"
version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0"
dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"rustversion",
"syn",
"strum_macros",
]
[[package]]
@@ -6483,21 +6398,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.5.1"

View File

@@ -20,9 +20,11 @@ use mas_matrix::HomeserverConnection;
use mas_matrix_synapse::SynapseConnection;
use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionFilter, CompatSessionRepository},
job::JobRepositoryExt,
oauth2::OAuth2SessionFilter,
queue::{DeactivateUserJob, ProvisionUserJob, ReactivateUserJob, SyncDevicesJob},
queue::{
DeactivateUserJob, ProvisionUserJob, QueueJobRepositoryExt as _, ReactivateUserJob,
SyncDevicesJob,
},
user::{BrowserSessionFilter, UserEmailRepository, UserPasswordRepository, UserRepository},
Clock, RepositoryAccess, SystemClock,
};
@@ -365,7 +367,7 @@ impl Options {
let id = id.into();
info!(user.id = %id, "Scheduling provisioning job");
let job = ProvisionUserJob::new_for_id(id);
repo.job().schedule_job(job).await?;
repo.queue_job().schedule_job(&mut rng, &clock, job).await?;
}
repo.into_inner().commit().await?;
@@ -428,7 +430,9 @@ impl Options {
// Schedule a job to sync the devices of the user with the homeserver
warn!("Scheduling job to sync devices for the user");
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;
let txn = repo.into_inner();
if dry_run {
@@ -466,8 +470,8 @@ impl Options {
if deactivate {
warn!(%user.id, "Scheduling user deactivation");
repo.job()
.schedule_job(DeactivateUserJob::new(&user, false))
repo.queue_job()
.schedule_job(&mut rng, &clock, DeactivateUserJob::new(&user, false))
.await?;
}
@@ -490,8 +494,8 @@ impl Options {
.context("User not found")?;
warn!(%user.id, "User scheduling user reactivation");
repo.job()
.schedule_job(ReactivateUserJob::new(&user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ReactivateUserJob::new(&user))
.await?;
repo.into_inner().commit().await?;
@@ -974,7 +978,9 @@ impl UserCreationRequest<'_> {
provision_job = provision_job.set_display_name(display_name);
}
repo.job().schedule_job(provision_job).await?;
repo.queue_job()
.schedule_job(rng, clock, provision_job)
.await?;
Ok(user)
}

View File

@@ -19,10 +19,6 @@ use mas_matrix_synapse::SynapseConnection;
use mas_router::UrlBuilder;
use mas_storage::SystemClock;
use mas_storage_pg::MIGRATOR;
use rand::{
distributions::{Alphanumeric, DistString},
thread_rng,
};
use sqlx::migrate::Migrate;
use tracing::{info, info_span, warn, Instrument};
@@ -161,13 +157,8 @@ impl Options {
let mailer = mailer_from_config(&config.email, &templates)?;
mailer.test_connection().await?;
#[allow(clippy::disallowed_methods)]
let mut rng = thread_rng();
let worker_name = Alphanumeric.sample_string(&mut rng, 10);
info!(worker_name, "Starting task worker");
let monitor = mas_tasks::init(
&worker_name,
info!("Starting task worker");
mas_tasks::init(
&pool,
&mailer,
homeserver_connection.clone(),
@@ -176,21 +167,6 @@ impl Options {
shutdown.task_tracker(),
)
.await?;
// XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns,
// ideally we'd just give it a cancellation token
let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned();
shutdown.task_tracker().spawn(async move {
if let Err(e) = monitor
.run_with_signal(async move {
shutdown_future.await;
Ok(())
})
.await
{
tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed");
}
});
}
let listeners_config = config.http.listeners.clone();

View File

@@ -11,10 +11,6 @@ use figment::Figment;
use mas_config::{AppConfig, ConfigurationSection};
use mas_matrix_synapse::SynapseConnection;
use mas_router::UrlBuilder;
use rand::{
distributions::{Alphanumeric, DistString},
thread_rng,
};
use tracing::{info, info_span};
use crate::{
@@ -71,13 +67,8 @@ impl Options {
drop(config);
#[allow(clippy::disallowed_methods)]
let mut rng = thread_rng();
let worker_name = Alphanumeric.sample_string(&mut rng, 10);
info!(worker_name, "Starting task scheduler");
let monitor = mas_tasks::init(
&worker_name,
info!("Starting task scheduler");
mas_tasks::init(
&pool,
&mailer,
conn,
@@ -87,20 +78,6 @@ impl Options {
)
.await?;
// XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns,
// ideally we'd just give it a cancellation token
let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned();
shutdown.task_tracker().spawn(async move {
if let Err(e) = monitor
.run_with_signal(async move {
shutdown_future.await;
Ok(())
})
.await
{
tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed");
}
});
span.exit();
shutdown.run().await;

View File

@@ -8,7 +8,10 @@ use aide::{transform::TransformOperation, NoApi, OperationIo};
use axum::{extract::State, response::IntoResponse, Json};
use hyper::StatusCode;
use mas_matrix::BoxHomeserverConnection;
use mas_storage::{job::JobRepositoryExt, queue::ProvisionUserJob, BoxRng};
use mas_storage::{
queue::{ProvisionUserJob, QueueJobRepositoryExt as _},
BoxRng,
};
use schemars::JsonSchema;
use serde::Deserialize;
use tracing::warn;
@@ -161,8 +164,8 @@ pub async fn handler(
let user = repo.user().add(&mut rng, &clock, params.username).await?;
repo.job()
.schedule_job(ProvisionUserJob::new(&user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user))
.await?;
repo.save().await?;

View File

@@ -4,10 +4,13 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use aide::{transform::TransformOperation, OperationIo};
use aide::{transform::TransformOperation, NoApi, OperationIo};
use axum::{response::IntoResponse, Json};
use hyper::StatusCode;
use mas_storage::{job::JobRepositoryExt, queue::DeactivateUserJob};
use mas_storage::{
queue::{DeactivateUserJob, QueueJobRepositoryExt as _},
BoxRng,
};
use tracing::info;
use ulid::Ulid;
@@ -69,6 +72,7 @@ pub async fn handler(
CallContext {
mut repo, clock, ..
}: CallContext,
NoApi(mut rng): NoApi<BoxRng>,
id: UlidPathParam,
) -> Result<Json<SingleResponse<User>>, RouteError> {
let id = *id;
@@ -83,8 +87,8 @@ pub async fn handler(
}
info!("Scheduling deactivation of user {}", user.id);
repo.job()
.schedule_job(DeactivateUserJob::new(&user, true))
repo.queue_job()
.schedule_job(&mut rng, &clock, DeactivateUserJob::new(&user, true))
.await?;
repo.save().await?;
@@ -133,8 +137,9 @@ mod tests {
// It should have scheduled a deactivation job for the user
// XXX: we don't have a good way to look for the deactivation job
let job: Json<serde_json::Value> =
sqlx::query_scalar("SELECT job FROM apalis.jobs WHERE job_type = 'deactivate-user'")
let job: Json<serde_json::Value> = sqlx::query_scalar(
"SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'",
)
.fetch_one(&pool)
.await
.expect("Deactivation job to be scheduled");
@@ -174,8 +179,9 @@ mod tests {
// It should have scheduled a deactivation job for the user
// XXX: we don't have a good way to look for the deactivation job
let job: Json<serde_json::Value> =
sqlx::query_scalar("SELECT job FROM apalis.jobs WHERE job_type = 'deactivate-user'")
let job: Json<serde_json::Value> = sqlx::query_scalar(
"SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'",
)
.fetch_one(&pool)
.await
.expect("Deactivation job to be scheduled");

View File

@@ -12,9 +12,8 @@ use mas_axum_utils::sentry::SentryEventID;
use mas_data_model::TokenType;
use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionRepository},
job::JobRepositoryExt,
queue::SyncDevicesJob,
BoxClock, BoxRepository, Clock, RepositoryAccess,
queue::{QueueJobRepositoryExt as _, SyncDevicesJob},
BoxClock, BoxRepository, BoxRng, Clock, RepositoryAccess,
};
use thiserror::Error;
@@ -66,6 +65,7 @@ impl IntoResponse for RouteError {
#[tracing::instrument(name = "handlers.compat.logout.post", skip_all, err)]
pub(crate) async fn post(
clock: BoxClock,
mut rng: BoxRng,
mut repo: BoxRepository,
activity_tracker: BoundActivityTracker,
maybe_authorization: Option<TypedHeader<Authorization<Bearer>>>,
@@ -105,7 +105,9 @@ pub(crate) async fn post(
.ok_or(RouteError::InvalidAuthorization)?;
// Schedule a job to sync the devices of the user with the homeserver
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;
repo.compat_session().finish(&clock, session).await?;

View File

@@ -7,7 +7,9 @@
use anyhow::Context as _;
use async_graphql::{Context, Enum, InputObject, Object, ID};
use mas_storage::{
compat::CompatSessionRepository, job::JobRepositoryExt, queue::SyncDevicesJob, RepositoryAccess,
compat::CompatSessionRepository,
queue::{QueueJobRepositoryExt as _, SyncDevicesJob},
RepositoryAccess,
};
use crate::graphql::{
@@ -70,6 +72,7 @@ impl CompatSessionMutations {
input: EndCompatSessionInput,
) -> Result<EndCompatSessionPayload, async_graphql::Error> {
let state = ctx.state();
let mut rng = state.rng();
let compat_session_id = NodeType::CompatSession.extract_ulid(&input.compat_session_id)?;
let requester = ctx.requester();
@@ -92,7 +95,9 @@ impl CompatSessionMutations {
.context("Could not load user")?;
// Schedule a job to sync the devices of the user with the homeserver
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;
let session = repo.compat_session().finish(&clock, session).await?;

View File

@@ -9,12 +9,11 @@ use async_graphql::{Context, Description, Enum, InputObject, Object, ID};
use chrono::Duration;
use mas_data_model::{Device, TokenType};
use mas_storage::{
job::JobRepositoryExt,
oauth2::{
OAuth2AccessTokenRepository, OAuth2ClientRepository, OAuth2RefreshTokenRepository,
OAuth2SessionRepository,
},
queue::SyncDevicesJob,
queue::{QueueJobRepositoryExt as _, SyncDevicesJob},
user::UserRepository,
RepositoryAccess,
};
@@ -218,6 +217,7 @@ impl OAuth2SessionMutations {
let mut repo = state.repository().await?;
let clock = state.clock();
let mut rng = state.rng();
let session = repo.oauth2_session().lookup(oauth2_session_id).await?;
let Some(session) = session else {
@@ -236,7 +236,9 @@ impl OAuth2SessionMutations {
.context("Could not load user")?;
// Schedule a job to sync the devices of the user with the homeserver
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;
}
let session = repo.oauth2_session().finish(&clock, session).await?;

View File

@@ -7,8 +7,7 @@
use anyhow::Context as _;
use async_graphql::{Context, Description, Enum, InputObject, Object, ID};
use mas_storage::{
job::JobRepositoryExt,
queue::{DeactivateUserJob, ProvisionUserJob},
queue::{DeactivateUserJob, ProvisionUserJob, QueueJobRepositoryExt as _},
user::UserRepository,
};
use tracing::{info, warn};
@@ -399,8 +398,8 @@ impl UserMutations {
let user = repo.user().add(&mut rng, &clock, input.username).await?;
repo.job()
.schedule_job(ProvisionUserJob::new(&user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user))
.await?;
repo.save().await?;
@@ -415,6 +414,8 @@ impl UserMutations {
input: LockUserInput,
) -> Result<LockUserPayload, async_graphql::Error> {
let state = ctx.state();
let clock = state.clock();
let mut rng = state.rng();
let requester = ctx.requester();
if !requester.is_admin() {
@@ -436,8 +437,8 @@ impl UserMutations {
if deactivate {
info!("Scheduling deactivation of user {}", user.id);
repo.job()
.schedule_job(DeactivateUserJob::new(&user, deactivate))
repo.queue_job()
.schedule_job(&mut rng, &clock, DeactivateUserJob::new(&user, deactivate))
.await?;
}

View File

@@ -7,8 +7,7 @@
use anyhow::Context as _;
use async_graphql::{Context, Description, Enum, InputObject, Object, ID};
use mas_storage::{
job::JobRepositoryExt,
queue::{ProvisionUserJob, VerifyEmailJob},
queue::{ProvisionUserJob, QueueJobRepositoryExt as _, VerifyEmailJob},
user::{UserEmailRepository, UserRepository},
RepositoryAccess,
};
@@ -377,6 +376,8 @@ impl UserEmailMutations {
let state = ctx.state();
let id = NodeType::User.extract_ulid(&input.user_id)?;
let requester = ctx.requester();
let clock = state.clock();
let mut rng = state.rng();
if !requester.is_owner_or_admin(&UserId(id)) {
return Err(async_graphql::Error::new("Unauthorized"));
@@ -428,9 +429,6 @@ impl UserEmailMutations {
let (added, mut user_email) = if let Some(user_email) = existing_user_email {
(false, user_email)
} else {
let clock = state.clock();
let mut rng = state.rng();
let user_email = repo
.user_email()
.add(&mut rng, &clock, &user, input.email)
@@ -448,8 +446,8 @@ impl UserEmailMutations {
.await?;
} else {
// TODO: figure out the locale
repo.job()
.schedule_job(VerifyEmailJob::new(&user_email))
repo.queue_job()
.schedule_job(&mut rng, &clock, VerifyEmailJob::new(&user_email))
.await?;
}
}
@@ -471,6 +469,8 @@ impl UserEmailMutations {
input: SendVerificationEmailInput,
) -> Result<SendVerificationEmailPayload, async_graphql::Error> {
let state = ctx.state();
let clock = state.clock();
let mut rng = state.rng();
let user_email_id = NodeType::UserEmail.extract_ulid(&input.user_email_id)?;
let requester = ctx.requester();
@@ -490,8 +490,8 @@ impl UserEmailMutations {
let needs_verification = user_email.confirmed_at.is_none();
if needs_verification {
// TODO: figure out the locale
repo.job()
.schedule_job(VerifyEmailJob::new(&user_email))
repo.queue_job()
.schedule_job(&mut rng, &clock, VerifyEmailJob::new(&user_email))
.await?;
}
@@ -516,6 +516,7 @@ impl UserEmailMutations {
let requester = ctx.requester();
let clock = state.clock();
let mut rng = state.rng();
let mut repo = state.repository().await?;
let user_email = repo
@@ -568,8 +569,8 @@ impl UserEmailMutations {
.mark_as_verified(&clock, user_email)
.await?;
repo.job()
.schedule_job(ProvisionUserJob::new(&user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user))
.await?;
repo.save().await?;
@@ -587,6 +588,8 @@ impl UserEmailMutations {
let user_email_id = NodeType::UserEmail.extract_ulid(&input.user_email_id)?;
let requester = ctx.requester();
let mut rng = state.rng();
let clock = state.clock();
let mut repo = state.repository().await?;
let user_email = repo.user_email().lookup(user_email_id).await?;
@@ -617,8 +620,8 @@ impl UserEmailMutations {
repo.user_email().remove(user_email.clone()).await?;
// Schedule a job to update the user
repo.job()
.schedule_job(ProvisionUserJob::new(&user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user))
.await?;
repo.save().await?;

View File

@@ -14,7 +14,8 @@ use mas_data_model::TokenType;
use mas_iana::oauth::OAuthTokenTypeHint;
use mas_keystore::Encrypter;
use mas_storage::{
job::JobRepositoryExt, queue::SyncDevicesJob, BoxClock, BoxRepository, RepositoryAccess,
queue::{QueueJobRepositoryExt as _, SyncDevicesJob},
BoxClock, BoxRepository, BoxRng, RepositoryAccess,
};
use oauth2_types::{
errors::{ClientError, ClientErrorCode},
@@ -109,6 +110,7 @@ impl From<mas_data_model::TokenFormatError> for RouteError {
)]
pub(crate) async fn post(
clock: BoxClock,
mut rng: BoxRng,
State(http_client): State<reqwest::Client>,
mut repo: BoxRepository,
activity_tracker: BoundActivityTracker,
@@ -208,7 +210,9 @@ pub(crate) async fn post(
.ok_or(RouteError::UnknownToken)?;
// Schedule a job to sync the devices of the user with the homeserver
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;
}
// Now that we checked everything, we can end the session.

View File

@@ -23,8 +23,7 @@ use mas_matrix::BoxHomeserverConnection;
use mas_policy::Policy;
use mas_router::UrlBuilder;
use mas_storage::{
job::JobRepositoryExt,
queue::ProvisionUserJob,
queue::{ProvisionUserJob, QueueJobRepositoryExt as _},
upstream_oauth2::{UpstreamOAuthLinkRepository, UpstreamOAuthSessionRepository},
user::{BrowserSessionRepository, UserEmailRepository, UserRepository},
BoxClock, BoxRepository, BoxRng, RepositoryAccess,
@@ -797,7 +796,7 @@ pub(crate) async fn post(
job = job.set_display_name(name);
}
repo.job().schedule_job(job).await?;
repo.queue_job().schedule_job(&mut rng, &clock, job).await?;
// If we have an email, add it to the user
if let Some(email) = email {

View File

@@ -17,8 +17,9 @@ use mas_data_model::SiteConfig;
use mas_policy::Policy;
use mas_router::UrlBuilder;
use mas_storage::{
job::JobRepositoryExt, queue::VerifyEmailJob, user::UserEmailRepository, BoxClock,
BoxRepository, BoxRng,
queue::{QueueJobRepositoryExt as _, VerifyEmailJob},
user::UserEmailRepository,
BoxClock, BoxRepository, BoxRng,
};
use mas_templates::{EmailAddContext, ErrorContext, TemplateContext, Templates};
use serde::Deserialize;
@@ -136,8 +137,12 @@ pub(crate) async fn post(
// If the email was not confirmed, send a confirmation email & redirect to the
// verify page
let next = if user_email.confirmed_at.is_none() {
repo.job()
.schedule_job(VerifyEmailJob::new(&user_email).with_language(locale.to_string()))
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
VerifyEmailJob::new(&user_email).with_language(locale.to_string()),
)
.await?;
let next = mas_router::AccountVerifyEmail::new(user_email.id);

View File

@@ -16,8 +16,9 @@ use mas_axum_utils::{
};
use mas_router::UrlBuilder;
use mas_storage::{
job::JobRepositoryExt, queue::ProvisionUserJob, user::UserEmailRepository, BoxClock,
BoxRepository, BoxRng, RepositoryAccess,
queue::{ProvisionUserJob, QueueJobRepositoryExt as _},
user::UserEmailRepository,
BoxClock, BoxRepository, BoxRng, RepositoryAccess,
};
use mas_templates::{EmailVerificationPageContext, TemplateContext, Templates};
use serde::Deserialize;
@@ -93,6 +94,7 @@ pub(crate) async fn get(
)]
pub(crate) async fn post(
clock: BoxClock,
mut rng: BoxRng,
mut repo: BoxRepository,
cookie_jar: CookieJar,
State(url_builder): State<UrlBuilder>,
@@ -140,8 +142,8 @@ pub(crate) async fn post(
.mark_as_verified(&clock, user_email)
.await?;
repo.job()
.schedule_job(ProvisionUserJob::new(&session.user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ProvisionUserJob::new(&session.user))
.await?;
repo.save().await?;

View File

@@ -18,7 +18,8 @@ use mas_axum_utils::{
use mas_data_model::SiteConfig;
use mas_router::UrlBuilder;
use mas_storage::{
job::JobRepositoryExt, queue::SendAccountRecoveryEmailsJob, BoxClock, BoxRepository, BoxRng,
queue::{QueueJobRepositoryExt as _, SendAccountRecoveryEmailsJob},
BoxClock, BoxRepository, BoxRng,
};
use mas_templates::{EmptyContext, RecoveryProgressContext, TemplateContext, Templates};
use ulid::Ulid;
@@ -135,8 +136,12 @@ pub(crate) async fn post(
}
// Schedule a new batch of emails
repo.job()
.schedule_job(SendAccountRecoveryEmailsJob::new(&recovery_session))
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
SendAccountRecoveryEmailsJob::new(&recovery_session),
)
.await?;
repo.save().await?;

View File

@@ -21,7 +21,8 @@ use mas_axum_utils::{
use mas_data_model::{SiteConfig, UserAgent};
use mas_router::UrlBuilder;
use mas_storage::{
job::JobRepositoryExt, queue::SendAccountRecoveryEmailsJob, BoxClock, BoxRepository, BoxRng,
queue::{QueueJobRepositoryExt as _, SendAccountRecoveryEmailsJob},
BoxClock, BoxRepository, BoxRng,
};
use mas_templates::{
EmptyContext, FieldError, FormError, FormState, RecoveryStartContext, RecoveryStartFormField,
@@ -144,8 +145,12 @@ pub(crate) async fn post(
)
.await?;
repo.job()
.schedule_job(SendAccountRecoveryEmailsJob::new(&session))
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
SendAccountRecoveryEmailsJob::new(&session),
)
.await?;
repo.save().await?;

View File

@@ -24,8 +24,7 @@ use mas_matrix::BoxHomeserverConnection;
use mas_policy::Policy;
use mas_router::UrlBuilder;
use mas_storage::{
job::JobRepositoryExt,
queue::{ProvisionUserJob, VerifyEmailJob},
queue::{ProvisionUserJob, QueueJobRepositoryExt as _, VerifyEmailJob},
user::{BrowserSessionRepository, UserEmailRepository, UserPasswordRepository, UserRepository},
BoxClock, BoxRepository, BoxRng, RepositoryAccess,
};
@@ -295,12 +294,16 @@ pub(crate) async fn post(
.authenticate_with_password(&mut rng, &clock, &session, &user_password)
.await?;
repo.job()
.schedule_job(VerifyEmailJob::new(&user_email).with_language(locale.to_string()))
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
VerifyEmailJob::new(&user_email).with_language(locale.to_string()),
)
.await?;
repo.job()
.schedule_job(ProvisionUserJob::new(&user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user))
.await?;
repo.save().await?;

View File

@@ -1,16 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO apalis.jobs (job, id, job_type)\n VALUES ($1::json, $2::text, $3::text)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Json",
"Text",
"Text"
]
},
"nullable": []
},
"hash": "359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b"
}

View File

@@ -1,67 +0,0 @@
// Copyright 2024 New Vector Ltd.
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! A module containing the PostgreSQL implementation of the [`JobRepository`].
use async_trait::async_trait;
use mas_storage::job::{JobId, JobRepository, JobSubmission};
use sqlx::PgConnection;
use crate::{DatabaseError, ExecuteExt};
/// An implementation of [`JobRepository`] for a PostgreSQL connection.
pub struct PgJobRepository<'c> {
conn: &'c mut PgConnection,
}
impl<'c> PgJobRepository<'c> {
/// Create a new [`PgJobRepository`] from an active PostgreSQL connection.
#[must_use]
pub fn new(conn: &'c mut PgConnection) -> Self {
Self { conn }
}
}
#[async_trait]
impl JobRepository for PgJobRepository<'_> {
type Error = DatabaseError;
#[tracing::instrument(
name = "db.job.schedule_submission",
skip_all,
fields(
db.query.text,
job.id,
job.name = submission.name(),
),
err,
)]
async fn schedule_submission(
&mut self,
submission: JobSubmission,
) -> Result<JobId, Self::Error> {
// XXX: This does not use the clock nor the rng
let id = JobId::new();
tracing::Span::current().record("job.id", tracing::field::display(&id));
let res = sqlx::query!(
r#"
INSERT INTO apalis.jobs (job, id, job_type)
VALUES ($1::json, $2::text, $3::text)
"#,
submission.payload(),
id.to_string(),
submission.name(),
)
.traced()
.execute(&mut *self.conn)
.await?;
DatabaseError::ensure_affected_rows(&res, 1)?;
Ok(id)
}
}

View File

@@ -164,7 +164,6 @@ use sqlx::migrate::Migrator;
pub mod app_session;
pub mod compat;
pub mod job;
pub mod oauth2;
pub mod queue;
pub mod upstream_oauth2;

View File

@@ -13,7 +13,6 @@ use mas_storage::{
CompatAccessTokenRepository, CompatRefreshTokenRepository, CompatSessionRepository,
CompatSsoLoginRepository,
},
job::JobRepository,
oauth2::{
OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository,
OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository,
@@ -34,13 +33,12 @@ use crate::{
PgCompatAccessTokenRepository, PgCompatRefreshTokenRepository, PgCompatSessionRepository,
PgCompatSsoLoginRepository,
},
job::PgJobRepository,
oauth2::{
PgOAuth2AccessTokenRepository, PgOAuth2AuthorizationGrantRepository,
PgOAuth2ClientRepository, PgOAuth2DeviceCodeGrantRepository,
PgOAuth2RefreshTokenRepository, PgOAuth2SessionRepository,
},
queue::worker::PgQueueWorkerRepository,
queue::{job::PgQueueJobRepository, worker::PgQueueWorkerRepository},
upstream_oauth2::{
PgUpstreamOAuthLinkRepository, PgUpstreamOAuthProviderRepository,
PgUpstreamOAuthSessionRepository,
@@ -261,13 +259,15 @@ where
Box::new(PgCompatRefreshTokenRepository::new(self.conn.as_mut()))
}
fn job<'c>(&'c mut self) -> Box<dyn JobRepository<Error = Self::Error> + 'c> {
Box::new(PgJobRepository::new(self.conn.as_mut()))
}
fn queue_worker<'c>(
&'c mut self,
) -> Box<dyn mas_storage::queue::QueueWorkerRepository<Error = Self::Error> + 'c> {
Box::new(PgQueueWorkerRepository::new(self.conn.as_mut()))
}
fn queue_job<'c>(
&'c mut self,
) -> Box<dyn mas_storage::queue::QueueJobRepository<Error = Self::Error> + 'c> {
Box::new(PgQueueJobRepository::new(self.conn.as_mut()))
}
}

View File

@@ -14,18 +14,16 @@ workspace = true
[dependencies]
async-trait.workspace = true
chrono.workspace = true
thiserror.workspace = true
futures-util.workspace = true
apalis-core = { version = "0.4.9", features = ["tokio-comp"] }
opentelemetry.workspace = true
rand_core = "0.6.4"
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
thiserror.workspace = true
tracing-opentelemetry.workspace = true
url.workspace = true
tracing.workspace = true
ulid.workspace = true
url.workspace = true
oauth2-types.workspace = true
mas-data-model.workspace = true

View File

@@ -1,221 +0,0 @@
// Copyright 2024 New Vector Ltd.
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! Repository to schedule persistent jobs.
use std::{num::ParseIntError, ops::Deref};
pub use apalis_core::job::{Job, JobId};
use async_trait::async_trait;
use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::repository_impl;
/// A job submission to be scheduled through the repository.
pub struct JobSubmission {
name: &'static str,
payload: Value,
}
#[derive(Serialize, Deserialize)]
struct SerializableSpanContext {
trace_id: String,
span_id: String,
trace_flags: u8,
}
impl From<&SpanContext> for SerializableSpanContext {
fn from(value: &SpanContext) -> Self {
Self {
trace_id: value.trace_id().to_string(),
span_id: value.span_id().to_string(),
trace_flags: value.trace_flags().to_u8(),
}
}
}
impl TryFrom<&SerializableSpanContext> for SpanContext {
type Error = ParseIntError;
fn try_from(value: &SerializableSpanContext) -> Result<Self, Self::Error> {
Ok(SpanContext::new(
TraceId::from_hex(&value.trace_id)?,
SpanId::from_hex(&value.span_id)?,
TraceFlags::new(value.trace_flags),
// XXX: is that fine?
true,
TraceState::default(),
))
}
}
/// A wrapper for [`Job`] which adds the span context in the payload.
#[derive(Serialize, Deserialize)]
pub struct JobWithSpanContext<T> {
#[serde(skip_serializing_if = "Option::is_none")]
span_context: Option<SerializableSpanContext>,
#[serde(flatten)]
payload: T,
}
impl<J> From<J> for JobWithSpanContext<J> {
fn from(payload: J) -> Self {
Self {
span_context: None,
payload,
}
}
}
impl<J: Job> Job for JobWithSpanContext<J> {
const NAME: &'static str = J::NAME;
}
impl<T> Deref for JobWithSpanContext<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.payload
}
}
impl<T> JobWithSpanContext<T> {
/// Get the span context of the job.
///
/// # Returns
///
/// Returns [`None`] if the job has no span context, or if the span context
/// is invalid.
#[must_use]
pub fn span_context(&self) -> Option<SpanContext> {
self.span_context
.as_ref()
.and_then(|ctx| ctx.try_into().ok())
}
}
impl JobSubmission {
/// Create a new job submission out of a [`Job`].
///
/// # Panics
///
/// Panics if the job cannot be serialized.
#[must_use]
pub fn new<J: Job + Serialize>(job: J) -> Self {
let payload = serde_json::to_value(job).expect("Could not serialize job");
Self {
name: J::NAME,
payload,
}
}
/// Create a new job submission out of a [`Job`] and a [`SpanContext`].
///
/// # Panics
///
/// Panics if the job cannot be serialized.
#[must_use]
pub fn new_with_span_context<J: Job + Serialize>(job: J, span_context: &SpanContext) -> Self {
// Serialize the span context alongside the job.
let span_context = SerializableSpanContext::from(span_context);
Self::new(JobWithSpanContext {
payload: job,
span_context: Some(span_context),
})
}
/// The name of the job.
#[must_use]
pub fn name(&self) -> &'static str {
self.name
}
/// The payload of the job.
#[must_use]
pub fn payload(&self) -> &Value {
&self.payload
}
}
/// A [`JobRepository`] is used to schedule jobs to be executed by a worker.
#[async_trait]
pub trait JobRepository: Send + Sync {
/// The error type returned by the repository.
type Error;
/// Schedule a job submission to be executed at a later time.
///
/// # Parameters
///
/// * `submission` - The job to schedule.
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn schedule_submission(
&mut self,
submission: JobSubmission,
) -> Result<JobId, Self::Error>;
}
repository_impl!(JobRepository:
async fn schedule_submission(&mut self, submission: JobSubmission) -> Result<JobId, Self::Error>;
);
/// An extension trait for [`JobRepository`] to schedule jobs directly.
#[async_trait]
pub trait JobRepositoryExt {
/// The error type returned by the repository.
type Error;
/// Schedule a job to be executed at a later time.
///
/// # Parameters
///
/// * `job` - The job to schedule.
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn schedule_job<J: Job + Serialize + Send>(
&mut self,
job: J,
) -> Result<JobId, Self::Error>;
}
#[async_trait]
impl<T> JobRepositoryExt for T
where
T: JobRepository + ?Sized,
{
type Error = T::Error;
#[tracing::instrument(
name = "db.job.schedule_job",
skip_all,
fields(
job.name = J::NAME,
),
)]
async fn schedule_job<J: Job + Serialize + Send>(
&mut self,
job: J,
) -> Result<JobId, Self::Error> {
let span = tracing::Span::current();
let ctx = span.context();
let span = ctx.span();
let span_context = span.span_context();
self.schedule_submission(JobSubmission::new_with_span_context(job, span_context))
.await
}
}

View File

@@ -118,7 +118,6 @@ mod utils;
pub mod app_session;
pub mod compat;
pub mod job;
pub mod oauth2;
pub mod queue;
pub mod upstream_oauth2;

View File

@@ -46,11 +46,6 @@ impl VerifyEmailJob {
}
}
// Implemented for compatibility
impl apalis_core::job::Job for VerifyEmailJob {
const NAME: &'static str = "verify-email";
}
impl InsertableJob for VerifyEmailJob {
const QUEUE_NAME: &'static str = "verify-email";
}
@@ -101,11 +96,6 @@ impl ProvisionUserJob {
}
}
// Implemented for compatibility
impl apalis_core::job::Job for ProvisionUserJob {
const NAME: &'static str = "provision-user";
}
impl InsertableJob for ProvisionUserJob {
const QUEUE_NAME: &'static str = "provision-user";
}
@@ -134,11 +124,6 @@ impl ProvisionDeviceJob {
}
}
// Implemented for compatibility with older versions
impl apalis_core::job::Job for ProvisionDeviceJob {
const NAME: &'static str = "provision-device";
}
impl InsertableJob for ProvisionDeviceJob {
const QUEUE_NAME: &'static str = "provision-device";
}
@@ -176,11 +161,6 @@ impl DeleteDeviceJob {
}
}
// Implemented for compatibility with older versions
impl apalis_core::job::Job for DeleteDeviceJob {
const NAME: &'static str = "delete-device";
}
impl InsertableJob for DeleteDeviceJob {
const QUEUE_NAME: &'static str = "delete-device";
}
@@ -206,11 +186,6 @@ impl SyncDevicesJob {
}
}
// Implemented for compatibility with older versions
impl apalis_core::job::Job for SyncDevicesJob {
const NAME: &'static str = "sync-devices";
}
impl InsertableJob for SyncDevicesJob {
const QUEUE_NAME: &'static str = "sync-devices";
}
@@ -250,11 +225,6 @@ impl DeactivateUserJob {
}
}
// Implemented for compatibility with older versions
impl apalis_core::job::Job for DeactivateUserJob {
const NAME: &'static str = "deactivate-user";
}
impl InsertableJob for DeactivateUserJob {
const QUEUE_NAME: &'static str = "deactivate-user";
}
@@ -283,11 +253,6 @@ impl ReactivateUserJob {
}
}
// Implemented for compatibility with older versions
impl apalis_core::job::Job for ReactivateUserJob {
const NAME: &'static str = "reactivate-user";
}
impl InsertableJob for ReactivateUserJob {
const QUEUE_NAME: &'static str = "reactivate-user";
}
@@ -320,11 +285,6 @@ impl SendAccountRecoveryEmailsJob {
}
}
// Implemented for compatibility with older versions
impl apalis_core::job::Job for SendAccountRecoveryEmailsJob {
const NAME: &'static str = "send-account-recovery-email";
}
impl InsertableJob for SendAccountRecoveryEmailsJob {
const QUEUE_NAME: &'static str = "send-account-recovery-email";
}

View File

@@ -13,12 +13,11 @@ use crate::{
CompatAccessTokenRepository, CompatRefreshTokenRepository, CompatSessionRepository,
CompatSsoLoginRepository,
},
job::JobRepository,
oauth2::{
OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository,
OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository,
},
queue::QueueWorkerRepository,
queue::{QueueJobRepository, QueueWorkerRepository},
upstream_oauth2::{
UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository,
UpstreamOAuthSessionRepository,
@@ -190,11 +189,11 @@ pub trait RepositoryAccess: Send {
&'c mut self,
) -> Box<dyn CompatRefreshTokenRepository<Error = Self::Error> + 'c>;
/// Get a [`JobRepository`]
fn job<'c>(&'c mut self) -> Box<dyn JobRepository<Error = Self::Error> + 'c>;
/// Get a [`QueueWorkerRepository`]
fn queue_worker<'c>(&'c mut self) -> Box<dyn QueueWorkerRepository<Error = Self::Error> + 'c>;
/// Get a [`QueueJobRepository`]
fn queue_job<'c>(&'c mut self) -> Box<dyn QueueJobRepository<Error = Self::Error> + 'c>;
}
/// Implementations of the [`RepositoryAccess`], [`RepositoryTransaction`] and
@@ -209,13 +208,12 @@ mod impls {
CompatAccessTokenRepository, CompatRefreshTokenRepository, CompatSessionRepository,
CompatSsoLoginRepository,
},
job::JobRepository,
oauth2::{
OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository,
OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository,
OAuth2SessionRepository,
},
queue::QueueWorkerRepository,
queue::{QueueJobRepository, QueueWorkerRepository},
upstream_oauth2::{
UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository,
UpstreamOAuthSessionRepository,
@@ -407,15 +405,15 @@ mod impls {
))
}
fn job<'c>(&'c mut self) -> Box<dyn JobRepository<Error = Self::Error> + 'c> {
Box::new(MapErr::new(self.inner.job(), &mut self.mapper))
}
fn queue_worker<'c>(
&'c mut self,
) -> Box<dyn QueueWorkerRepository<Error = Self::Error> + 'c> {
Box::new(MapErr::new(self.inner.queue_worker(), &mut self.mapper))
}
fn queue_job<'c>(&'c mut self) -> Box<dyn QueueJobRepository<Error = Self::Error> + 'c> {
Box::new(MapErr::new(self.inner.queue_job(), &mut self.mapper))
}
}
impl<R: RepositoryAccess + ?Sized> RepositoryAccess for Box<R> {
@@ -535,14 +533,14 @@ mod impls {
(**self).compat_refresh_token()
}
fn job<'c>(&'c mut self) -> Box<dyn JobRepository<Error = Self::Error> + 'c> {
(**self).job()
}
fn queue_worker<'c>(
&'c mut self,
) -> Box<dyn QueueWorkerRepository<Error = Self::Error> + 'c> {
(**self).queue_worker()
}
fn queue_job<'c>(&'c mut self) -> Box<dyn QueueJobRepository<Error = Self::Error> + 'c> {
(**self).queue_job()
}
}
}

View File

@@ -13,12 +13,6 @@ workspace = true
[dependencies]
anyhow.workspace = true
apalis-core = { version = "0.4.9", features = [
"extensions",
"tokio-comp",
"storage",
] }
apalis-cron = "0.4.9"
async-stream = "0.3.6"
async-trait.workspace = true
chrono.workspace = true

View File

@@ -4,9 +4,10 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
#![allow(dead_code)]
use std::sync::Arc;
use apalis_core::{executor::TokioExecutor, layers::extensions::Extension, monitor::Monitor};
use mas_email::Mailer;
use mas_matrix::HomeserverConnection;
use mas_router::UrlBuilder;
@@ -16,18 +17,15 @@ use new_queue::QueueRunnerError;
use rand::SeedableRng;
use sqlx::{Pool, Postgres};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::debug;
use crate::storage::PostgresStorageFactory;
mod database;
mod email;
mod matrix;
// mod database;
// mod email;
// mod matrix;
mod new_queue;
mod recovery;
mod storage;
mod user;
mod utils;
// mod recovery;
// mod storage;
// mod user;
// mod utils;
#[derive(Clone)]
struct State {
@@ -55,10 +53,6 @@ impl State {
}
}
pub fn inject(&self) -> Extension<Self> {
Extension(self.clone())
}
pub fn pool(&self) -> &Pool<Postgres> {
&self.pool
}
@@ -95,58 +89,19 @@ impl State {
}
}
trait JobContextExt {
fn state(&self) -> State;
}
impl JobContextExt for apalis_core::context::JobContext {
fn state(&self) -> State {
self.data_opt::<State>()
.expect("state not injected in job context")
.clone()
}
}
/// Helper macro to build a storage-backed worker.
macro_rules! build {
($job:ty => $fn:ident, $suffix:expr, $state:expr, $factory:expr) => {{
let storage = $factory.build();
let worker_name = format!(
"{job}-{suffix}",
job = <$job as ::apalis_core::job::Job>::NAME,
suffix = $suffix
);
let builder = ::apalis_core::builder::WorkerBuilder::new(worker_name)
.layer($state.inject())
.layer(crate::utils::trace_layer())
.layer(crate::utils::metrics_layer());
let builder = ::apalis_core::storage::builder::WithStorage::with_storage_config(
builder,
storage,
|c| c.fetch_interval(std::time::Duration::from_secs(1)),
);
::apalis_core::builder::WorkerFactory::build(builder, ::apalis_core::job_fn::job_fn($fn))
}};
}
pub(crate) use build;
/// Initialise the workers.
///
/// # Errors
///
/// This function can fail if the database connection fails.
pub async fn init(
name: &str,
pool: &Pool<Postgres>,
mailer: &Mailer,
homeserver: impl HomeserverConnection<Error = anyhow::Error> + 'static,
url_builder: UrlBuilder,
cancellation_token: CancellationToken,
task_tracker: &TaskTracker,
) -> Result<Monitor<TokioExecutor>, QueueRunnerError> {
) -> Result<(), QueueRunnerError> {
let state = State::new(
pool.clone(),
SystemClock::default(),
@@ -154,21 +109,6 @@ pub async fn init(
homeserver,
url_builder,
);
let factory = PostgresStorageFactory::new(pool.clone());
let monitor = Monitor::new().executor(TokioExecutor::new());
let monitor = self::database::register(name, monitor, &state);
let monitor = self::email::register(name, monitor, &state, &factory);
let monitor = self::matrix::register(name, monitor, &state, &factory);
let monitor = self::user::register(name, monitor, &state, &factory);
let monitor = self::recovery::register(name, monitor, &state, &factory);
// TODO: we might want to grab the join handle here
// TODO: this error isn't right, I just want that to compile
factory
.listen()
.await
.map_err(QueueRunnerError::SetupListener)?;
debug!(?monitor, "workers registered");
let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?;
task_tracker.spawn(async move {
@@ -180,5 +120,5 @@ pub async fn init(
}
});
Ok(monitor)
Ok(())
}

View File

@@ -69,13 +69,8 @@ skip = [
{ name = "heck", version = "0.4.1" },
# wasmtime -> cranelift is depending on this old version
{ name = "itertools", version = "0.12.1" },
# apalis-core depends on this old version
{ name = "strum", version = "0.25.0" },
{ name = "strum_macros", version = "0.25.0" },
# For some reason, axum-core depends on this old version, even though axum is on the new one
{ name = "sync_wrapper", version = "0.1.2" },
# `apalis` depends on this old version of tower
{ name = "tower", version = "0.4.13" },
]
skip-tree = []