diff --git a/crates/storage-pg/.sqlx/query-d4bc51c30f1119ea9d117fb565ec554d63c8773040679a77e99ac3fa24cec71d.json b/crates/storage-pg/.sqlx/query-d4bc51c30f1119ea9d117fb565ec554d63c8773040679a77e99ac3fa24cec71d.json new file mode 100644 index 000000000..7403c7cb7 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-d4bc51c30f1119ea9d117fb565ec554d63c8773040679a77e99ac3fa24cec71d.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_delete AS (\n SELECT oauth2_authorization_grant_id\n FROM oauth2_authorization_grants\n WHERE ($1::uuid IS NULL OR oauth2_authorization_grant_id > $1)\n AND oauth2_authorization_grant_id <= $2\n ORDER BY oauth2_authorization_grant_id\n LIMIT $3\n )\n DELETE FROM oauth2_authorization_grants\n USING to_delete\n WHERE oauth2_authorization_grants.oauth2_authorization_grant_id = to_delete.oauth2_authorization_grant_id\n RETURNING oauth2_authorization_grants.oauth2_authorization_grant_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "oauth2_authorization_grant_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "d4bc51c30f1119ea9d117fb565ec554d63c8773040679a77e99ac3fa24cec71d" +} diff --git a/crates/storage-pg/src/oauth2/authorization_grant.rs b/crates/storage-pg/src/oauth2/authorization_grant.rs index b8af0e535..827a930ab 100644 --- a/crates/storage-pg/src/oauth2/authorization_grant.rs +++ b/crates/storage-pg/src/oauth2/authorization_grant.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2021-2024 The Matrix.org Foundation C.I.C. // @@ -448,4 +449,54 @@ impl OAuth2AuthorizationGrantRepository for PgOAuth2AuthorizationGrantRepository Ok(grant) } + + #[tracing::instrument( + name = "db.oauth2_authorization_grant.cleanup", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + until = %until, + limit = limit, + ), + err, + )] + async fn cleanup( + &mut self, + since: Option, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option), Self::Error> { + // `MAX(uuid)` isn't a thing in Postgres, so we can't just re-select the + // deleted rows and do a MAX on the `oauth2_authorization_grant_id`. + // Instead, we do the aggregation on the client side, which is a little + // less efficient, but good enough. + let res = sqlx::query_scalar!( + r#" + WITH to_delete AS ( + SELECT oauth2_authorization_grant_id + FROM oauth2_authorization_grants + WHERE ($1::uuid IS NULL OR oauth2_authorization_grant_id > $1) + AND oauth2_authorization_grant_id <= $2 + ORDER BY oauth2_authorization_grant_id + LIMIT $3 + ) + DELETE FROM oauth2_authorization_grants + USING to_delete + WHERE oauth2_authorization_grants.oauth2_authorization_grant_id = to_delete.oauth2_authorization_grant_id + RETURNING oauth2_authorization_grants.oauth2_authorization_grant_id + "#, + since.map(Uuid::from), + Uuid::from(until), + i64::try_from(limit).unwrap_or(i64::MAX) + ) + .traced() + .fetch_all(&mut *self.conn) + .await?; + + let count = res.len(); + let max_id = res.into_iter().max(); + + Ok((count, max_id.map(Ulid::from))) + } } diff --git a/crates/storage/src/oauth2/authorization_grant.rs b/crates/storage/src/oauth2/authorization_grant.rs index c019f6bd7..c0f1030e3 100644 --- a/crates/storage/src/oauth2/authorization_grant.rs +++ b/crates/storage/src/oauth2/authorization_grant.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2021-2024 The Matrix.org Foundation C.I.C. // @@ -127,6 +128,30 @@ pub trait OAuth2AuthorizationGrantRepository: Send + Sync { clock: &dyn Clock, authorization_grant: AuthorizationGrant, ) -> Result; + + /// Cleanup old authorization grants + /// + /// This will delete authorization grants with IDs up to and including + /// `until`. Uses ULID cursor-based pagination for efficiency. + /// + /// Returns the number of grants deleted and the cursor for the next batch + /// + /// # Parameters + /// + /// * `since`: The cursor to start from (exclusive), or `None` to start from + /// the beginning + /// * `until`: The maximum ULID to delete (inclusive upper bound) + /// * `limit`: The maximum number of grants to delete in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup( + &mut self, + since: Option, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option), Self::Error>; } repository_impl!(OAuth2AuthorizationGrantRepository: @@ -163,4 +188,11 @@ repository_impl!(OAuth2AuthorizationGrantRepository: clock: &dyn Clock, authorization_grant: AuthorizationGrant, ) -> Result; + + async fn cleanup( + &mut self, + since: Option, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option), Self::Error>; ); diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index f7742efd0..cf5564a7e 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -366,6 +366,14 @@ impl InsertableJob for CleanupFinishedCompatSessionsJob { const QUEUE_NAME: &'static str = "cleanup-finished-compat-sessions"; } +/// Cleanup old OAuth 2.0 authorization grants +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupOAuthAuthorizationGrantsJob; + +impl InsertableJob for CleanupOAuthAuthorizationGrantsJob { + const QUEUE_NAME: &'static str = "cleanup-oauth-authorization-grants"; +} + /// Scheduled job to expire inactive sessions /// /// This job will trigger jobs to expire inactive compat, oauth and user diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index ce9d5f96f..543520642 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -12,8 +12,9 @@ use std::time::Duration; use async_trait::async_trait; use mas_storage::queue::{ CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, - CleanupFinishedCompatSessionsJob, CleanupRevokedOAuthAccessTokensJob, - CleanupRevokedOAuthRefreshTokensJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob, + CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob, + CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, + CleanupUserRegistrationsJob, PruneStalePolicyDataJob, }; use tracing::{debug, info}; use ulid::Ulid; @@ -319,6 +320,59 @@ impl RunnableJob for CleanupFinishedCompatSessionsJob { } } +#[async_trait] +impl RunnableJob for CleanupOAuthAuthorizationGrantsJob { + #[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove authorization grants after 7 days. They are in practice only + // valid for a short time, but keeping them around helps investigate abuse + // patterns. + let until = state.clock.now() - chrono::Duration::days(7); + // We use the fact that ULIDs include the creation time in their first 48 bits + // as a cursor + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + // This returns the number of deleted grants, and the greatest ULID processed + let (count, cursor) = repo + .oauth2_authorization_grant() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no authorization grants to clean up"); + } else { + info!(count = total, "cleaned up authorization grants"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + #[async_trait] impl RunnableJob for PruneStalePolicyDataJob { #[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)] diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index e2d0734f2..7e2b1918d 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -135,6 +135,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .register_handler::() .register_handler::() .register_handler::() @@ -180,6 +181,12 @@ pub async fn init( "0 40 * * * *".parse()?, mas_storage::queue::CleanupFinishedCompatSessionsJob, ) + .add_schedule( + "cleanup-oauth-authorization-grants", + // Run this job every hour + "0 50 * * * *".parse()?, + mas_storage::queue::CleanupOAuthAuthorizationGrantsJob, + ) .add_schedule( "cleanup-expired-oauth-access-tokens", // Run this job every 4 hours