From 3fa53d285e3bc7686702878cb2e4fefe23a4292d Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 14 Jan 2026 13:23:28 +0100 Subject: [PATCH] Cleanup old user registrations from the database --- ...f61efbeb1ec24e2c694e1673347bae993762d.json | 24 +++++++++ crates/storage-pg/src/user/registration.rs | 48 +++++++++++++++++ crates/storage/src/queue/tasks.rs | 8 +++ crates/storage/src/user/registration.rs | 28 ++++++++++ crates/tasks/src/database.rs | 54 ++++++++++++++++++- crates/tasks/src/lib.rs | 7 +++ 6 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 crates/storage-pg/.sqlx/query-a50eb326c3522f971f6ee7e13dff61efbeb1ec24e2c694e1673347bae993762d.json diff --git a/crates/storage-pg/.sqlx/query-a50eb326c3522f971f6ee7e13dff61efbeb1ec24e2c694e1673347bae993762d.json b/crates/storage-pg/.sqlx/query-a50eb326c3522f971f6ee7e13dff61efbeb1ec24e2c694e1673347bae993762d.json new file mode 100644 index 000000000..f1e857508 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-a50eb326c3522f971f6ee7e13dff61efbeb1ec24e2c694e1673347bae993762d.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_delete AS (\n SELECT user_registration_id\n FROM user_registrations\n WHERE ($1::uuid IS NULL OR user_registration_id > $1)\n AND user_registration_id <= $2\n ORDER BY user_registration_id\n LIMIT $3\n )\n DELETE FROM user_registrations\n USING to_delete\n WHERE user_registrations.user_registration_id = to_delete.user_registration_id\n RETURNING user_registrations.user_registration_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "user_registration_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "a50eb326c3522f971f6ee7e13dff61efbeb1ec24e2c694e1673347bae993762d" +} diff --git a/crates/storage-pg/src/user/registration.rs b/crates/storage-pg/src/user/registration.rs index 69f14ff93..fff1dd0ad 100644 --- 
a/crates/storage-pg/src/user/registration.rs
+++ b/crates/storage-pg/src/user/registration.rs
@@ -1,3 +1,4 @@
+// Copyright 2025, 2026 Element Creations Ltd.
 // Copyright 2025 New Vector Ltd.
 //
 // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
@@ -469,6 +470,53 @@ impl UserRegistrationRepository for PgUserRegistrationRepository<'_> {
 
         Ok(user_registration)
     }
+
+    #[tracing::instrument(
+        name = "db.user_registration.cleanup",
+        skip_all,
+        fields(
+            db.query.text,
+        ),
+        err,
+    )]
+    async fn cleanup(
+        &mut self,
+        since: Option<Ulid>,
+        until: Ulid,
+        limit: usize,
+    ) -> Result<(usize, Option<Ulid>), Self::Error> {
+        // `MAX(uuid)` isn't a thing in Postgres, so we can't just re-select the
+        // deleted rows and do a MAX on the `user_registration_id`.
+        // Instead, we do the aggregation on the client side, which is a little
+        // less efficient, but good enough.
+        let res = sqlx::query_scalar!(
+            r#"
+                WITH to_delete AS (
+                    SELECT user_registration_id
+                    FROM user_registrations
+                    WHERE ($1::uuid IS NULL OR user_registration_id > $1)
+                    AND user_registration_id <= $2
+                    ORDER BY user_registration_id
+                    LIMIT $3
+                )
+                DELETE FROM user_registrations
+                USING to_delete
+                WHERE user_registrations.user_registration_id = to_delete.user_registration_id
+                RETURNING user_registrations.user_registration_id
+            "#,
+            since.map(Uuid::from),
+            Uuid::from(until),
+            i64::try_from(limit).unwrap_or(i64::MAX)
+        )
+        .traced()
+        .fetch_all(&mut *self.conn)
+        .await?;
+
+        let count = res.len();
+        let max_id = res.into_iter().max();
+
+        Ok((count, max_id.map(Ulid::from)))
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs
index 7abfd13c8..a7326eb35 100644
--- a/crates/storage/src/queue/tasks.rs
+++ b/crates/storage/src/queue/tasks.rs
@@ -350,6 +350,14 @@ impl InsertableJob for CleanupConsumedOAuthRefreshTokensJob {
     const QUEUE_NAME: &'static str = "cleanup-consumed-oauth-refresh-tokens";
 }
 
+/// Cleanup old user registrations
+#[derive(Serialize, Deserialize, Debug, Clone, Default)]
+pub struct CleanupUserRegistrationsJob;
+
+impl InsertableJob for CleanupUserRegistrationsJob {
+    const QUEUE_NAME: &'static str = "cleanup-user-registrations";
+}
+
 /// Scheduled job to expire inactive sessions
 ///
 /// This job will trigger jobs to expire inactive compat, oauth and user
diff --git a/crates/storage/src/user/registration.rs b/crates/storage/src/user/registration.rs
index 77c85b932..dc2a73f9e 100644
--- a/crates/storage/src/user/registration.rs
+++ b/crates/storage/src/user/registration.rs
@@ -1,3 +1,4 @@
+// Copyright 2025, 2026 Element Creations Ltd.
 // Copyright 2025 New Vector Ltd.
 //
 // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
@@ -199,6 +200,27 @@ pub trait UserRegistrationRepository: Send + Sync {
         clock: &dyn Clock,
         user_registration: UserRegistration,
     ) -> Result<UserRegistration, Self::Error>;
+
+    /// Cleanup [`UserRegistration`]s between the given IDs.
+    ///
+    /// Returns the number of registrations deleted, as well as the ID of the
+    /// last registration deleted.
+    ///
+    /// # Parameters
+    ///
+    /// * `since`: An optional ID to start from
+    /// * `until`: The ID until which to clean up registrations
+    /// * `limit`: The maximum number of registrations to clean up
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Self::Error`] if the underlying repository fails
+    async fn cleanup(
+        &mut self,
+        since: Option<Ulid>,
+        until: Ulid,
+        limit: usize,
+    ) -> Result<(usize, Option<Ulid>), Self::Error>;
 }
 
 repository_impl!(UserRegistrationRepository:
@@ -248,4 +270,10 @@ repository_impl!(UserRegistrationRepository:
         clock: &dyn Clock,
         user_registration: UserRegistration,
     ) -> Result<UserRegistration, Self::Error>;
+    async fn cleanup(
+        &mut self,
+        since: Option<Ulid>,
+        until: Ulid,
+        limit: usize,
+    ) -> Result<(usize, Option<Ulid>), Self::Error>;
 );
diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs
index 9e6e67658..3c074c503 100644
--- a/crates/tasks/src/database.rs
+++ b/crates/tasks/src/database.rs
@@ -13,9 +13,10 @@ use async_trait::async_trait;
 use mas_storage::queue::{
     CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
     CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
-    PruneStalePolicyDataJob,
+    CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
 };
 use tracing::{debug, info};
+use ulid::Ulid;
 
 use crate::{
     State,
@@ -214,6 +215,57 @@ impl RunnableJob for CleanupConsumedOAuthRefreshTokensJob {
     }
 }
 
+#[async_trait]
+impl RunnableJob for CleanupUserRegistrationsJob {
+    #[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)]
+    async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
+        // Remove user registrations after 24h. They are in practice only valid for 1h
+        let until = state.clock.now() - chrono::Duration::hours(24);
+        // We use the fact that ULIDs include the creation time in their first 48 bits
+        // as a cursor
+        let until = Ulid::from_parts(
+            u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
+            u128::MAX,
+        );
+        let mut total = 0;
+
+        // Run until we get cancelled. We don't schedule a retry if we get cancelled, as
+        // this is a scheduled job and it will end up being rescheduled later anyway.
+        let mut since = None;
+        while !context.cancellation_token.is_cancelled() {
+            let mut repo = state.repository().await.map_err(JobError::retry)?;
+            // This returns the number of deleted registrations, and the greatest ULID
+            // processed
+            let (count, cursor) = repo
+                .user_registration()
+                .cleanup(since, until, BATCH_SIZE)
+                .await
+                .map_err(JobError::retry)?;
+            repo.save().await.map_err(JobError::retry)?;
+            since = cursor;
+            total += count;
+
+            // Check how many we deleted. If we deleted exactly BATCH_SIZE,
+            // there might be more to delete
+            if count != BATCH_SIZE {
+                break;
+            }
+        }
+
+        if total == 0 {
+            debug!("no user registrations to clean up");
+        } else {
+            info!(count = total, "cleaned up user registrations");
+        }
+
+        Ok(())
+    }
+
+    fn timeout(&self) -> Option<Duration> {
+        Some(Duration::from_secs(60))
+    }
+}
+
 #[async_trait]
 impl RunnableJob for PruneStalePolicyDataJob {
     #[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)]
diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs
index f2570049b..f0e2e0f98 100644
--- a/crates/tasks/src/lib.rs
+++ b/crates/tasks/src/lib.rs
@@ -133,6 +133,7 @@ pub async fn init(
         .register_handler::()
         .register_handler::()
         .register_handler::()
+        .register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
         .register_handler::()
         .register_handler::()
         .register_handler::()
@@ -166,6 +167,12 @@ pub async fn init(
             "0 20 * * * *".parse()?,
             mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob,
         )
+        .add_schedule(
+            "cleanup-user-registrations",
+            // Run this
job every hour + "0 30 * * * *".parse()?, + mas_storage::queue::CleanupUserRegistrationsJob, + ) .add_schedule( "cleanup-expired-oauth-access-tokens", // Run this job every 4 hours