diff --git a/crates/storage-pg/.sqlx/query-d8f0b02952e786dd4309eac9de04a359aea3a46e5d4e07764cec56ce5d6609c0.json b/crates/storage-pg/.sqlx/query-d8f0b02952e786dd4309eac9de04a359aea3a46e5d4e07764cec56ce5d6609c0.json new file mode 100644 index 000000000..275273b93 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-d8f0b02952e786dd4309eac9de04a359aea3a46e5d4e07764cec56ce5d6609c0.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n to_delete AS (\n SELECT oauth2_session_id, finished_at\n FROM oauth2_sessions\n WHERE finished_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR finished_at >= $1)\n AND finished_at < $2\n ORDER BY finished_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n deleted_refresh_tokens AS (\n DELETE FROM oauth2_refresh_tokens USING to_delete\n WHERE oauth2_refresh_tokens.oauth2_session_id = to_delete.oauth2_session_id\n ),\n deleted_access_tokens AS (\n DELETE FROM oauth2_access_tokens USING to_delete\n WHERE oauth2_access_tokens.oauth2_session_id = to_delete.oauth2_session_id\n ),\n deleted_sessions AS (\n DELETE FROM oauth2_sessions USING to_delete\n WHERE oauth2_sessions.oauth2_session_id = to_delete.oauth2_session_id\n RETURNING oauth2_sessions.finished_at\n )\n SELECT COUNT(*) as \"count!\", MAX(finished_at) as last_finished_at FROM deleted_sessions\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_finished_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "d8f0b02952e786dd4309eac9de04a359aea3a46e5d4e07764cec56ce5d6609c0" +} diff --git a/crates/storage-pg/migrations/20260122123211_idx_oauth2_sessions_finished_at.sql b/crates/storage-pg/migrations/20260122123211_idx_oauth2_sessions_finished_at.sql new file mode 100644 index 000000000..e075b345b --- /dev/null +++ b/crates/storage-pg/migrations/20260122123211_idx_oauth2_sessions_finished_at.sql @@ -0,0 +1,11 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Adds a partial index on oauth2_sessions.finished_at to help cleaning up +-- finished sessions +CREATE INDEX CONCURRENTLY IF NOT EXISTS "oauth2_sessions_finished_at_idx" + ON "oauth2_sessions" ("finished_at") + WHERE "finished_at" IS NOT NULL; diff --git a/crates/storage-pg/src/oauth2/session.rs b/crates/storage-pg/src/oauth2/session.rs index 072691a06..a6f8dab56 100644 --- a/crates/storage-pg/src/oauth2/session.rs +++ b/crates/storage-pg/src/oauth2/session.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2022-2024 The Matrix.org Foundation C.I.C. // @@ -592,4 +593,63 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> { Ok(session) } + + #[tracing::instrument( + name = "db.oauth2_session.cleanup_finished", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + until = %until, + limit = limit, + ), + err, + )] + async fn cleanup_finished( + &mut self, + since: Option>, + until: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( + r#" + WITH + to_delete AS ( + SELECT oauth2_session_id, finished_at + FROM oauth2_sessions + WHERE finished_at IS NOT NULL + AND ($1::timestamptz IS NULL OR finished_at >= $1) + AND finished_at < $2 + ORDER BY finished_at ASC + LIMIT $3 + FOR UPDATE + ), + deleted_refresh_tokens AS ( + DELETE FROM oauth2_refresh_tokens USING to_delete + WHERE oauth2_refresh_tokens.oauth2_session_id = to_delete.oauth2_session_id + ), + deleted_access_tokens AS ( + DELETE FROM oauth2_access_tokens USING to_delete + WHERE oauth2_access_tokens.oauth2_session_id = to_delete.oauth2_session_id + ), + deleted_sessions AS ( + DELETE FROM oauth2_sessions USING to_delete + WHERE oauth2_sessions.oauth2_session_id = to_delete.oauth2_session_id + RETURNING oauth2_sessions.finished_at + ) + SELECT COUNT(*) as "count!", MAX(finished_at) as last_finished_at FROM deleted_sessions + "#, + since, + until, + i64::try_from(limit).unwrap_or(i64::MAX), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_finished_at, + )) + } } diff --git a/crates/storage/src/oauth2/session.rs b/crates/storage/src/oauth2/session.rs index 30ac1abe1..068512b14 100644 --- a/crates/storage/src/oauth2/session.rs +++ b/crates/storage/src/oauth2/session.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2022-2024 The Matrix.org Foundation C.I.C. // @@ -461,6 +462,29 @@ pub trait OAuth2SessionRepository: Send + Sync { session: Session, human_name: Option, ) -> Result; + + /// Cleanup finished [`Session`]s + /// + /// Deletes sessions finished between `since` and `until`. Returns the + /// number of deleted sessions and the timestamp of the last deleted + /// session for pagination. + /// + /// # Parameters + /// + /// * `since`: The earliest finish time to delete (exclusive). If `None`, + /// starts from the beginning. + /// * `until`: The latest finish time to delete (exclusive) + /// * `limit`: Maximum number of sessions to delete in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_finished( + &mut self, + since: Option>, + until: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(OAuth2SessionRepository: @@ -526,4 +550,11 @@ repository_impl!(OAuth2SessionRepository: session: Session, human_name: Option, ) -> Result; + + async fn cleanup_finished( + &mut self, + since: Option>, + until: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index 7ea900a47..09067976d 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -366,6 +366,14 @@ impl InsertableJob for CleanupFinishedCompatSessionsJob { const QUEUE_NAME: &'static str = "cleanup-finished-compat-sessions"; } +/// Cleanup finished OAuth 2.0 sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupFinishedOAuth2SessionsJob; + +impl InsertableJob for CleanupFinishedOAuth2SessionsJob { + const QUEUE_NAME: &'static str = "cleanup-finished-oauth2-sessions"; +} + /// Cleanup old OAuth 2.0 authorization grants #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct CleanupOAuthAuthorizationGrantsJob; diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 1d1f4e314..daa9254e0 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -12,11 +12,12 @@ use std::time::Duration; use async_trait::async_trait; use mas_storage::queue::{ CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, - CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob, - CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob, - CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob, - CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob, - CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob, + CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob, + CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, + CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, + CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob, + CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, + CleanupUserRegistrationsJob, PruneStalePolicyDataJob, }; use tracing::{debug, info}; use ulid::Ulid; @@ -561,6 +562,54 @@ impl RunnableJob for CleanupFinishedCompatSessionsJob { } } +#[async_trait] +impl RunnableJob for CleanupFinishedOAuth2SessionsJob { + #[tracing::instrument(name = "job.cleanup_finished_oauth2_sessions", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup OAuth2 sessions that were finished more than 30 days ago + let until = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted sessions, and the last finished_at + // timestamp + let (count, last_finished_at) = repo + .oauth2_session() + .cleanup_finished(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_finished_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no finished OAuth2 sessions to clean up"); + } else { + info!(count = total, "cleaned up finished OAuth2 sessions"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + #[async_trait] impl RunnableJob for CleanupOAuthAuthorizationGrantsJob { #[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)] diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 54dc8d713..3ea660e5e 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -135,6 +135,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .register_handler::() .register_handler::() .register_handler::() @@ -187,6 +188,12 @@ pub async fn init( "0 40 * * * *".parse()?, mas_storage::queue::CleanupFinishedCompatSessionsJob, ) + .add_schedule( + "cleanup-finished-oauth2-sessions", + // Run this job every hour + "0 42 * * * *".parse()?, + mas_storage::queue::CleanupFinishedOAuth2SessionsJob, + ) .add_schedule( "cleanup-oauth-authorization-grants", // Run this job every hour