diff --git a/crates/storage-pg/.sqlx/query-e02ea83d195cb58fa8525e66a6ac1dddae3f1dfb1ef48494f6aee3fd03abe6f6.json b/crates/storage-pg/.sqlx/query-e02ea83d195cb58fa8525e66a6ac1dddae3f1dfb1ef48494f6aee3fd03abe6f6.json new file mode 100644 index 000000000..3fe700729 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-e02ea83d195cb58fa8525e66a6ac1dddae3f1dfb1ef48494f6aee3fd03abe6f6.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n to_delete AS (\n SELECT user_email_authentication_id\n FROM user_email_authentications\n WHERE ($1::uuid IS NULL OR user_email_authentication_id > $1)\n AND user_email_authentication_id <= $2\n ORDER BY user_email_authentication_id\n LIMIT $3\n ),\n deleted_codes AS (\n DELETE FROM user_email_authentication_codes\n USING to_delete\n WHERE user_email_authentication_codes.user_email_authentication_id = to_delete.user_email_authentication_id\n RETURNING user_email_authentication_codes.user_email_authentication_code_id\n )\n DELETE FROM user_email_authentications\n USING to_delete\n WHERE user_email_authentications.user_email_authentication_id = to_delete.user_email_authentication_id\n RETURNING user_email_authentications.user_email_authentication_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "user_email_authentication_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "e02ea83d195cb58fa8525e66a6ac1dddae3f1dfb1ef48494f6aee3fd03abe6f6" +} diff --git a/crates/storage-pg/src/user/email.rs b/crates/storage-pg/src/user/email.rs index 05122ac7a..501807058 100644 --- a/crates/storage-pg/src/user/email.rs +++ b/crates/storage-pg/src/user/email.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2022-2024 The Matrix.org Foundation C.I.C. 
//
@@ -757,4 +758,60 @@ impl UserEmailRepository for PgUserEmailRepository<'_> {
         user_email_authentication.completed_at = Some(completed_at);
         Ok(user_email_authentication)
     }
+
+    #[tracing::instrument(
+        name = "db.user_email.cleanup_authentications",
+        skip_all,
+        fields(
+            db.query.text,
+            since = since.map(tracing::field::display),
+            until = %until,
+            limit = limit,
+        ),
+        err,
+    )]
+    async fn cleanup_authentications(
+        &mut self,
+        since: Option<Ulid>,
+        until: Ulid,
+        limit: usize,
+    ) -> Result<(usize, Option<Ulid>), Self::Error> {
+        // Use ULID cursor-based pagination. Since ULIDs contain a timestamp,
+        // we can efficiently delete old authentications without needing an index.
+        // `MAX(uuid)` isn't a thing in Postgres, so we aggregate on the client side.
+        let res = sqlx::query_scalar!(
+            r#"
+                WITH
+                    to_delete AS (
+                        SELECT user_email_authentication_id
+                        FROM user_email_authentications
+                        WHERE ($1::uuid IS NULL OR user_email_authentication_id > $1)
+                          AND user_email_authentication_id <= $2
+                        ORDER BY user_email_authentication_id
+                        LIMIT $3
+                    ),
+                    deleted_codes AS (
+                        DELETE FROM user_email_authentication_codes
+                        USING to_delete
+                        WHERE user_email_authentication_codes.user_email_authentication_id = to_delete.user_email_authentication_id
+                        RETURNING user_email_authentication_codes.user_email_authentication_code_id
+                    )
+                DELETE FROM user_email_authentications
+                USING to_delete
+                WHERE user_email_authentications.user_email_authentication_id = to_delete.user_email_authentication_id
+                RETURNING user_email_authentications.user_email_authentication_id
+            "#,
+            since.map(Uuid::from),
+            Uuid::from(until),
+            i64::try_from(limit).unwrap_or(i64::MAX)
+        )
+        .traced()
+        .fetch_all(&mut *self.conn)
+        .await?;
+
+        let count = res.len();
+        let max_id = res.into_iter().max();
+
+        Ok((count, max_id.map(Ulid::from)))
+    }
 }
diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs
index c331fff4b..7a1ffbba6 100644
--- a/crates/storage/src/queue/tasks.rs
+++ 
b/crates/storage/src/queue/tasks.rs
@@ -390,6 +390,14 @@ impl InsertableJob for CleanupUserRecoverySessionsJob {
     const QUEUE_NAME: &'static str = "cleanup-user-recovery-sessions";
 }
 
+/// Cleanup old user email authentications
+#[derive(Serialize, Deserialize, Debug, Clone, Default)]
+pub struct CleanupUserEmailAuthenticationsJob;
+
+impl InsertableJob for CleanupUserEmailAuthenticationsJob {
+    const QUEUE_NAME: &'static str = "cleanup-user-email-authentications";
+}
+
 /// Scheduled job to expire inactive sessions
 ///
 /// This job will trigger jobs to expire inactive compat, oauth and user
diff --git a/crates/storage/src/user/email.rs b/crates/storage/src/user/email.rs
index f73414130..eadf3dfb9 100644
--- a/crates/storage/src/user/email.rs
+++ b/crates/storage/src/user/email.rs
@@ -1,3 +1,4 @@
+// Copyright 2025, 2026 Element Creations Ltd.
 // Copyright 2024, 2025 New Vector Ltd.
 // Copyright 2022-2024 The Matrix.org Foundation C.I.C.
 //
@@ -334,6 +335,32 @@ pub trait UserEmailRepository: Send + Sync {
         authentication: UserEmailAuthentication,
         upstream_oauth_authorization_session: &UpstreamOAuthAuthorizationSession,
     ) -> Result;
+
+    /// Cleanup old email authentications
+    ///
+    /// This will delete email authentications with IDs up to and including
+    /// `until`. Uses ULID cursor-based pagination for efficiency.
+    /// The associated authentication codes are deleted along with them.
+    ///
+    /// Returns the number of authentications deleted and the cursor for the
+    /// next batch
+    ///
+    /// # Parameters
+    ///
+    /// * `since`: The cursor to start from (exclusive), or `None` to start from
+    ///   the beginning
+    /// * `until`: The maximum ULID to delete (inclusive upper bound)
+    /// * `limit`: The maximum number of authentications to delete in this batch
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Self::Error`] if the underlying repository fails
+    async fn cleanup_authentications(
+        &mut self,
+        since: Option<Ulid>,
+        until: Ulid,
+        limit: usize,
+    ) -> Result<(usize, Option<Ulid>), Self::Error>;
 }
 
 repository_impl!(UserEmailRepository:
@@ -409,4 +436,11 @@ repository_impl!(UserEmailRepository:
         authentication: UserEmailAuthentication,
         upstream_oauth_authorization_session: &UpstreamOAuthAuthorizationSession,
     ) -> Result;
+
+    async fn cleanup_authentications(
+        &mut self,
+        since: Option<Ulid>,
+        until: Ulid,
+        limit: usize,
+    ) -> Result<(usize, Option<Ulid>), Self::Error>;
 );
diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs
index a58885329..28c825c4c 100644
--- a/crates/tasks/src/database.rs
+++ b/crates/tasks/src/database.rs
@@ -14,8 +14,8 @@ use mas_storage::queue::{
     CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
     CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob,
     CleanupOAuthDeviceCodeGrantsJob, CleanupRevokedOAuthAccessTokensJob,
-    CleanupRevokedOAuthRefreshTokensJob, CleanupUserRecoverySessionsJob,
-    CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
+    CleanupRevokedOAuthRefreshTokensJob, CleanupUserEmailAuthenticationsJob,
+    CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
 };
 use tracing::{debug, info};
 use ulid::Ulid;
@@ -273,6 +273,60 @@ impl RunnableJob for CleanupUserRecoverySessionsJob {
     }
 }
 
+#[async_trait]
+impl RunnableJob for CleanupUserEmailAuthenticationsJob {
+    #[tracing::instrument(name = "job.cleanup_user_email_authentications", skip_all)]
+    async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
+        // Remove email authentications after 7 days. They are in practice only
+        // valid for a short time (codes expire after 10 minutes), but keeping
+        // them around helps investigate abuse patterns.
+        let until = state.clock.now() - chrono::Duration::days(7);
+        // We use the fact that ULIDs include the creation time in their first 48 bits
+        // as a cursor
+        let until = Ulid::from_parts(
+            u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
+            u128::MAX,
+        );
+        let mut total = 0;
+
+        // Run until we get cancelled. We don't schedule a retry if we get cancelled, as
+        // this is a scheduled job and it will end up being rescheduled later anyway.
+        let mut since = None;
+        while !context.cancellation_token.is_cancelled() {
+            let mut repo = state.repository().await.map_err(JobError::retry)?;
+            // This returns the number of deleted authentications, and the greatest ULID
+            // processed
+            let (count, cursor) = repo
+                .user_email()
+                .cleanup_authentications(since, until, BATCH_SIZE)
+                .await
+                .map_err(JobError::retry)?;
+            repo.save().await.map_err(JobError::retry)?;
+            since = cursor;
+            total += count;
+
+            // Check how many we deleted. If we deleted exactly BATCH_SIZE,
+            // there might be more to delete
+            if count != BATCH_SIZE {
+                break;
+            }
+        }
+
+        if total == 0 {
+            debug!("no user email authentications to clean up");
+        } else {
+            info!(count = total, "cleaned up user email authentications");
+        }
+
+        Ok(())
+    }
+
+    fn timeout(&self) -> Option<Duration> {
+        // This job runs every hour, so letting it run for up to 10 minutes is fine
+        Some(Duration::from_secs(10 * 60))
+    }
+}
+
 #[async_trait]
 impl RunnableJob for CleanupUserRegistrationsJob {
     #[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)]
diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs
index fa5d4f188..a830a207f 100644
--- a/crates/tasks/src/lib.rs
+++ b/crates/tasks/src/lib.rs
@@ -138,6 +138,7 @@ pub async fn init(
         .register_handler::()
         .register_handler::()
         .register_handler::()
+        .register_handler::<mas_storage::queue::CleanupUserEmailAuthenticationsJob>()
         .register_handler::()
         .register_handler::()
         .register_handler::()
@@ -201,6 +202,12 @@ pub async fn init(
             "0 56 * * * *".parse()?,
             mas_storage::queue::CleanupUserRecoverySessionsJob,
         )
+        .add_schedule(
+            "cleanup-user-email-authentications",
+            // Run this job every hour
+            "0 57 * * * *".parse()?,
+            mas_storage::queue::CleanupUserEmailAuthenticationsJob,
+        )
         .add_schedule(
             "cleanup-expired-oauth-access-tokens",
             // Run this job every 4 hours