diff --git a/crates/storage-pg/.sqlx/query-67b4a124ea3e12902dc4256cf95950508f7eb24f4c9d62b34815c8e8940e4676.json b/crates/storage-pg/.sqlx/query-67b4a124ea3e12902dc4256cf95950508f7eb24f4c9d62b34815c8e8940e4676.json new file mode 100644 index 000000000..b9a82c629 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-67b4a124ea3e12902dc4256cf95950508f7eb24f4c9d62b34815c8e8940e4676.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_delete AS (\n SELECT upstream_oauth_link_id\n FROM upstream_oauth_links\n WHERE user_id IS NULL\n AND ($1::uuid IS NULL OR upstream_oauth_link_id > $1)\n AND upstream_oauth_link_id <= $2\n ORDER BY upstream_oauth_link_id\n LIMIT $3\n )\n DELETE FROM upstream_oauth_links\n USING to_delete\n WHERE upstream_oauth_links.upstream_oauth_link_id = to_delete.upstream_oauth_link_id\n RETURNING upstream_oauth_links.upstream_oauth_link_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "upstream_oauth_link_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "67b4a124ea3e12902dc4256cf95950508f7eb24f4c9d62b34815c8e8940e4676" +} diff --git a/crates/storage-pg/.sqlx/query-da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35.json b/crates/storage-pg/.sqlx/query-da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35.json new file mode 100644 index 000000000..2ced2b555 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_delete AS (\n SELECT upstream_oauth_authorization_session_id\n FROM upstream_oauth_authorization_sessions\n WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1)\n AND upstream_oauth_authorization_session_id <= $2\n ORDER BY upstream_oauth_authorization_session_id\n LIMIT $3\n )\n DELETE FROM upstream_oauth_authorization_sessions\n 
USING to_delete\n WHERE upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id = to_delete.upstream_oauth_authorization_session_id\n RETURNING upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "upstream_oauth_authorization_session_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35" +} diff --git a/crates/storage-pg/migrations/20260116000002_idx_upstream_oauth_links_orphaned.sql b/crates/storage-pg/migrations/20260116000002_idx_upstream_oauth_links_orphaned.sql new file mode 100644 index 000000000..97d324a27 --- /dev/null +++ b/crates/storage-pg/migrations/20260116000002_idx_upstream_oauth_links_orphaned.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Add partial index for cleanup of orphaned upstream OAuth links +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_upstream_oauth_links_orphaned + ON upstream_oauth_links (upstream_oauth_link_id) + WHERE user_id IS NULL; diff --git a/crates/storage-pg/src/upstream_oauth2/link.rs b/crates/storage-pg/src/upstream_oauth2/link.rs index c43dd8a18..720d0f251 100644 --- a/crates/storage-pg/src/upstream_oauth2/link.rs +++ b/crates/storage-pg/src/upstream_oauth2/link.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2022-2024 The Matrix.org Foundation C.I.C. 
// @@ -442,4 +443,54 @@ impl UpstreamOAuthLinkRepository for PgUpstreamOAuthLinkRepository<'_> { Ok(()) } + + #[tracing::instrument( + name = "db.upstream_oauth_link.cleanup_orphaned", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + until = %until, + limit = limit, + ), + err, + )] + async fn cleanup_orphaned( + &mut self, + since: Option<Ulid>, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option<Ulid>), Self::Error> { + // Use ULID cursor-based pagination for orphaned links only. + // We only delete links that have no user associated with them. + // `MAX(uuid)` isn't a thing in Postgres, so we aggregate on the client side. + let res = sqlx::query_scalar!( + r#" + WITH to_delete AS ( + SELECT upstream_oauth_link_id + FROM upstream_oauth_links + WHERE user_id IS NULL + AND ($1::uuid IS NULL OR upstream_oauth_link_id > $1) + AND upstream_oauth_link_id <= $2 + ORDER BY upstream_oauth_link_id + LIMIT $3 + ) + DELETE FROM upstream_oauth_links + USING to_delete + WHERE upstream_oauth_links.upstream_oauth_link_id = to_delete.upstream_oauth_link_id + RETURNING upstream_oauth_links.upstream_oauth_link_id + "#, + since.map(Uuid::from), + Uuid::from(until), + i64::try_from(limit).unwrap_or(i64::MAX) + ) + .traced() + .fetch_all(&mut *self.conn) + .await?; + + let count = res.len(); + let max_id = res.into_iter().max(); + + Ok((count, max_id.map(Ulid::from))) + } } diff --git a/crates/storage-pg/src/upstream_oauth2/session.rs b/crates/storage-pg/src/upstream_oauth2/session.rs index b961c4f8c..ebd45cb45 100644 --- a/crates/storage-pg/src/upstream_oauth2/session.rs +++ b/crates/storage-pg/src/upstream_oauth2/session.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2022-2024 The Matrix.org Foundation C.I.C. 
// @@ -564,4 +565,53 @@ impl UpstreamOAuthSessionRepository for PgUpstreamOAuthSessionRepository<'_> { .try_into() .map_err(DatabaseError::to_invalid_operation) } + + #[tracing::instrument( + name = "db.upstream_oauth_authorization_session.cleanup", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + until = %until, + limit = limit, + ), + err, + )] + async fn cleanup( + &mut self, + since: Option<Ulid>, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option<Ulid>), Self::Error> { + // Use ULID cursor-based pagination over the session IDs. + // NOTE: no state filter here, every session in the ID range is deleted. + // `MAX(uuid)` isn't a thing in Postgres, so we aggregate on the client side. + let res = sqlx::query_scalar!( + r#" + WITH to_delete AS ( + SELECT upstream_oauth_authorization_session_id + FROM upstream_oauth_authorization_sessions + WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1) + AND upstream_oauth_authorization_session_id <= $2 + ORDER BY upstream_oauth_authorization_session_id + LIMIT $3 + ) + DELETE FROM upstream_oauth_authorization_sessions + USING to_delete + WHERE upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id = to_delete.upstream_oauth_authorization_session_id + RETURNING upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id + "#, + since.map(Uuid::from), + Uuid::from(until), + i64::try_from(limit).unwrap_or(i64::MAX) + ) + .traced() + .fetch_all(&mut *self.conn) + .await?; + + let count = res.len(); + let max_id = res.into_iter().max(); + + Ok((count, max_id.map(Ulid::from))) + } } diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index 7a1ffbba6..500a7eced 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -398,6 +398,22 @@ impl InsertableJob for CleanupUserEmailAuthenticationsJob { const QUEUE_NAME: &'static str = "cleanup-user-email-authentications"; } +/// Cleanup old 
pending upstream OAuth authorization sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupUpstreamOAuthSessionsJob; + +impl InsertableJob for CleanupUpstreamOAuthSessionsJob { + const QUEUE_NAME: &'static str = "cleanup-upstream-oauth-sessions"; +} + +/// Cleanup orphaned upstream OAuth links (links with no user attached) +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupUpstreamOAuthLinksJob; + +impl InsertableJob for CleanupUpstreamOAuthLinksJob { + const QUEUE_NAME: &'static str = "cleanup-upstream-oauth-links"; +} + /// Scheduled job to expire inactive sessions /// /// This job will trigger jobs to expire inactive compat, oauth and user diff --git a/crates/storage/src/upstream_oauth2/link.rs b/crates/storage/src/upstream_oauth2/link.rs index 092ad8d31..27e3ca677 100644 --- a/crates/storage/src/upstream_oauth2/link.rs +++ b/crates/storage/src/upstream_oauth2/link.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2022-2024 The Matrix.org Foundation C.I.C. // @@ -216,6 +217,30 @@ pub trait UpstreamOAuthLinkRepository: Send + Sync { clock: &dyn Clock, upstream_oauth_link: UpstreamOAuthLink, ) -> Result<(), Self::Error>; + + /// Cleanup orphaned upstream OAuth links + /// + /// This will delete orphaned links (where `user_id IS NULL`) with IDs up to + /// and including `until`. Uses ULID cursor-based pagination for efficiency. 
+ /// + /// Returns the number of links deleted and the cursor for the next batch + /// + /// # Parameters + /// + /// * `since`: The cursor to start from (exclusive), or `None` to start from + /// the beginning + /// * `until`: The maximum ULID to delete (inclusive upper bound) + /// * `limit`: The maximum number of links to delete in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_orphaned( + &mut self, + since: Option<Ulid>, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option<Ulid>), Self::Error>; } repository_impl!(UpstreamOAuthLinkRepository: @@ -251,4 +276,11 @@ repository_impl!(UpstreamOAuthLinkRepository: async fn count(&mut self, filter: UpstreamOAuthLinkFilter<'_>) -> Result; async fn remove(&mut self, clock: &dyn Clock, upstream_oauth_link: UpstreamOAuthLink) -> Result<(), Self::Error>; + + async fn cleanup_orphaned( + &mut self, + since: Option<Ulid>, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option<Ulid>), Self::Error>; ); diff --git a/crates/storage/src/upstream_oauth2/session.rs b/crates/storage/src/upstream_oauth2/session.rs index f9a902ec2..04c5a90f1 100644 --- a/crates/storage/src/upstream_oauth2/session.rs +++ b/crates/storage/src/upstream_oauth2/session.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2022-2024 The Matrix.org Foundation C.I.C. // @@ -205,6 +206,29 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync { /// Returns [`Self::Error`] if the underlying repository fails async fn count(&mut self, filter: UpstreamOAuthSessionFilter<'_>) -> Result; + + /// Cleanup old authorization sessions + /// + /// This will delete sessions with IDs up to and including `until`. 
+ /// + /// Returns the number of sessions deleted and the cursor for the next batch + /// + /// # Parameters + /// + /// * `since`: The cursor to start from (exclusive), or `None` to start from + /// the beginning + /// * `until`: The maximum ULID to delete (inclusive upper bound) + /// * `limit`: The maximum number of sessions to delete in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup( + &mut self, + since: Option<Ulid>, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option<Ulid>), Self::Error>; } repository_impl!(UpstreamOAuthSessionRepository: @@ -247,4 +271,11 @@ repository_impl!(UpstreamOAuthSessionRepository: ) -> Result, Self::Error>; async fn count(&mut self, filter: UpstreamOAuthSessionFilter<'_>) -> Result; + + async fn cleanup( + &mut self, + since: Option<Ulid>, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option<Ulid>), Self::Error>; ); diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 28c825c4c..fe92e5a99 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -14,7 +14,8 @@ use mas_storage::queue::{ CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupRevokedOAuthAccessTokensJob, - CleanupRevokedOAuthRefreshTokensJob, CleanupUserEmailAuthenticationsJob, + CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob, + CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob, }; use tracing::{debug, info}; @@ -327,6 +328,94 @@ impl RunnableJob for CleanupUserEmailAuthenticationsJob { } } +#[async_trait] +impl RunnableJob for CleanupUpstreamOAuthSessionsJob { + #[tracing::instrument(name = "job.cleanup_upstream_oauth_sessions", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> 
Result<(), JobError> { + // Remove pending upstream OAuth authorization sessions after 7 days. + let until = state.clock.now() - chrono::Duration::days(7); + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let (count, cursor) = repo + .upstream_oauth_session() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no pending upstream OAuth sessions to clean up"); + } else { + info!(count = total, "cleaned up pending upstream OAuth sessions"); + } + + Ok(()) + } + + fn timeout(&self) -> Option<Duration> { + // This job runs every hour, so letting it run for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupUpstreamOAuthLinksJob { + #[tracing::instrument(name = "job.cleanup_upstream_oauth_links", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove orphaned upstream OAuth links after 7 days. 
+ let until = state.clock.now() - chrono::Duration::days(7); + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let (count, cursor) = repo + .upstream_oauth_link() + .cleanup_orphaned(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no orphaned upstream OAuth links to clean up"); + } else { + info!(count = total, "cleaned up orphaned upstream OAuth links"); + } + + Ok(()) + } + + fn timeout(&self) -> Option<Duration> { + // This job runs every hour, so letting it run for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + #[async_trait] impl RunnableJob for CleanupUserRegistrationsJob { #[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)] diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index a830a207f..5278caed3 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -139,6 +139,8 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::<mas_storage::queue::CleanupUpstreamOAuthSessionsJob>() + .register_handler::<mas_storage::queue::CleanupUpstreamOAuthLinksJob>() .register_handler::() .register_handler::() .register_handler::() @@ -208,6 +210,18 @@ pub async fn init( "0 57 * * * *".parse()?, mas_storage::queue::CleanupUserEmailAuthenticationsJob, ) + .add_schedule( + "cleanup-upstream-oauth-sessions", + // Run this job every hour + "0 58 * * * *".parse()?, + mas_storage::queue::CleanupUpstreamOAuthSessionsJob, + ) + .add_schedule( + "cleanup-upstream-oauth-links", + // Run this job every hour + "0 59 * * * *".parse()?, + mas_storage::queue::CleanupUpstreamOAuthLinksJob, + ) .add_schedule( "cleanup-expired-oauth-access-tokens", // Run this job every 4 hours