diff --git a/crates/storage-pg/.sqlx/query-036e9e2cb7271782e48700fecd3fdd80f596ed433f37f2528c7edbdc88b13646.json b/crates/storage-pg/.sqlx/query-036e9e2cb7271782e48700fecd3fdd80f596ed433f37f2528c7edbdc88b13646.json deleted file mode 100644 index 68eb6347c..000000000 --- a/crates/storage-pg/.sqlx/query-036e9e2cb7271782e48700fecd3fdd80f596ed433f37f2528c7edbdc88b13646.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM oauth2_consents\n WHERE oauth2_client_id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "036e9e2cb7271782e48700fecd3fdd80f596ed433f37f2528c7edbdc88b13646" -} diff --git a/crates/storage-pg/.sqlx/query-494ca16f0f00f977a3031924a15318aa7346917e5c8a37bb0f5b2b3067588009.json b/crates/storage-pg/.sqlx/query-494ca16f0f00f977a3031924a15318aa7346917e5c8a37bb0f5b2b3067588009.json new file mode 100644 index 000000000..f7e0bd73c --- /dev/null +++ b/crates/storage-pg/.sqlx/query-494ca16f0f00f977a3031924a15318aa7346917e5c8a37bb0f5b2b3067588009.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n to_delete AS (\n SELECT compat_session_id, finished_at\n FROM compat_sessions\n WHERE finished_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR finished_at >= $1)\n AND finished_at < $2\n ORDER BY finished_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n\n -- Delete refresh tokens first because they reference access tokens\n deleted_refresh_tokens AS (\n DELETE FROM compat_refresh_tokens\n USING to_delete\n WHERE compat_refresh_tokens.compat_session_id = to_delete.compat_session_id\n ),\n\n deleted_access_tokens AS (\n DELETE FROM compat_access_tokens\n USING to_delete\n WHERE compat_access_tokens.compat_session_id = to_delete.compat_session_id\n ),\n\n deleted_sso_logins AS (\n DELETE FROM compat_sso_logins\n USING to_delete\n WHERE compat_sso_logins.compat_session_id = to_delete.compat_session_id\n ),\n\n deleted_sessions AS (\n DELETE FROM compat_sessions\n USING to_delete\n WHERE compat_sessions.compat_session_id = to_delete.compat_session_id\n RETURNING compat_sessions.finished_at\n )\n\n SELECT\n COUNT(*) as \"count!\",\n MAX(finished_at) as last_finished_at\n FROM deleted_sessions\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_finished_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "494ca16f0f00f977a3031924a15318aa7346917e5c8a37bb0f5b2b3067588009" +} diff --git a/crates/storage-pg/migrations/20260115111313_idx_compat_sessions_finished_at.sql b/crates/storage-pg/migrations/20260115111313_idx_compat_sessions_finished_at.sql new file mode 100644 index 000000000..8e5acb2cc --- /dev/null +++ b/crates/storage-pg/migrations/20260115111313_idx_compat_sessions_finished_at.sql @@ -0,0 +1,11 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Index to efficiently query finished compat sessions for cleanup +-- Only includes non-null finished_at values since we filter on finished_at IS NOT NULL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "compat_sessions_finished_at_idx" + ON "compat_sessions" ("finished_at") + WHERE "finished_at" IS NOT NULL; diff --git a/crates/storage-pg/src/compat/session.rs b/crates/storage-pg/src/compat/session.rs index 0fb21c487..df071c752 100644 --- a/crates/storage-pg/src/compat/session.rs +++ b/crates/storage-pg/src/compat/session.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2023, 2024 The Matrix.org Foundation C.I.C. // @@ -684,4 +685,77 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { Ok(compat_session) } + + #[tracing::instrument( + name = "db.compat_session.cleanup_finished", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn cleanup_finished( + &mut self, + since: Option>, + until: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( + r#" + WITH + to_delete AS ( + SELECT compat_session_id, finished_at + FROM compat_sessions + WHERE finished_at IS NOT NULL + AND ($1::timestamptz IS NULL OR finished_at >= $1) + AND finished_at < $2 + ORDER BY finished_at ASC + LIMIT $3 + FOR UPDATE + ), + + -- Delete refresh tokens first because they reference access tokens + deleted_refresh_tokens AS ( + DELETE FROM compat_refresh_tokens + USING to_delete + WHERE compat_refresh_tokens.compat_session_id = to_delete.compat_session_id + ), + + deleted_access_tokens AS ( + DELETE FROM compat_access_tokens + USING to_delete + WHERE compat_access_tokens.compat_session_id = to_delete.compat_session_id + ), + + deleted_sso_logins AS ( + DELETE FROM compat_sso_logins + USING to_delete + WHERE compat_sso_logins.compat_session_id = to_delete.compat_session_id + ), + + deleted_sessions AS ( + DELETE FROM compat_sessions + USING to_delete + WHERE compat_sessions.compat_session_id = to_delete.compat_session_id + RETURNING compat_sessions.finished_at + ) + + SELECT + COUNT(*) as "count!", + MAX(finished_at) as last_finished_at + FROM deleted_sessions + "#, + since, + until, + i64::try_from(limit).unwrap_or(i64::MAX), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_finished_at, + )) + } } diff --git a/crates/storage/src/compat/session.rs b/crates/storage/src/compat/session.rs index d16351c0a..0ed335ed9 100644 --- a/crates/storage/src/compat/session.rs +++ b/crates/storage/src/compat/session.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // Copyright 2023, 2024 The Matrix.org Foundation C.I.C. // @@ -361,6 +362,30 @@ pub trait CompatSessionRepository: Send + Sync { compat_session: CompatSession, human_name: Option, ) -> Result; + + /// Cleanup finished [`CompatSession`]s and their associated tokens. + /// + /// This deletes compat sessions that have been finished, along with their + /// associated access tokens, refresh tokens, and SSO logins. + /// + /// Returns the number of sessions deleted and the timestamp of the last + /// deleted session's `finished_at`, which can be used for pagination. + /// + /// # Parameters + /// + /// * `since`: Only delete sessions finished at or after this timestamp + /// * `until`: Only delete sessions finished before this timestamp + /// * `limit`: Maximum number of sessions to delete + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_finished( + &mut self, + since: Option>, + until: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(CompatSessionRepository: @@ -413,4 +438,11 @@ repository_impl!(CompatSessionRepository: compat_session: CompatSession, human_name: Option, ) -> Result; + + async fn cleanup_finished( + &mut self, + since: Option>, + until: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index a7326eb35..f7742efd0 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -358,6 +358,14 @@ impl InsertableJob for CleanupUserRegistrationsJob { const QUEUE_NAME: &'static str = "cleanup-user-registrations"; } +/// Cleanup finished compat sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupFinishedCompatSessionsJob; + +impl InsertableJob for CleanupFinishedCompatSessionsJob { + const QUEUE_NAME: &'static str = "cleanup-finished-compat-sessions"; +} + /// Scheduled job to expire inactive sessions /// /// This job will trigger jobs to expire inactive compat, oauth and user diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 039750a74..6f94c4aee 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -12,8 +12,8 @@ use std::time::Duration; use async_trait::async_trait; use mas_storage::queue::{ CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, - CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, - CleanupUserRegistrationsJob, PruneStalePolicyDataJob, + CleanupFinishedCompatSessionsJob, CleanupRevokedOAuthAccessTokensJob, + CleanupRevokedOAuthRefreshTokensJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob, }; use tracing::{debug, info}; use ulid::Ulid; @@ -267,6 +267,53 @@ impl RunnableJob for CleanupUserRegistrationsJob { } } +#[async_trait] +impl RunnableJob for CleanupFinishedCompatSessionsJob { + #[tracing::instrument(name = "job.cleanup_finished_compat_sessions", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup compat sessions that were finished more than 30 days ago + let until = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted sessions, and the last finished_at + // timestamp + let (count, last_finished_at) = repo + .compat_session() + .cleanup_finished(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_finished_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no finished compat sessions to clean up"); + } else { + info!(count = total, "cleaned up finished compat sessions"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(60)) + } +} + #[async_trait] impl RunnableJob for PruneStalePolicyDataJob { #[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)] diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index f0e2e0f98..e2d0734f2 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -134,6 +134,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .register_handler::() .register_handler::() .register_handler::() @@ -173,6 +174,12 @@ pub async fn init( "0 30 * * * *".parse()?, mas_storage::queue::CleanupUserRegistrationsJob, ) + .add_schedule( + "cleanup-finished-compat-sessions", + // Run this job every hour + "0 40 * * * *".parse()?, + mas_storage::queue::CleanupFinishedCompatSessionsJob, + ) .add_schedule( "cleanup-expired-oauth-access-tokens", // Run this job every 4 hours