diff --git a/crates/storage-pg/.sqlx/query-d95cd1b4bcfa1d7bb236d49e1956fcc9a684609956972fe4f95aac13f30b2530.json b/crates/storage-pg/.sqlx/query-d95cd1b4bcfa1d7bb236d49e1956fcc9a684609956972fe4f95aac13f30b2530.json new file mode 100644 index 000000000..119530215 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-d95cd1b4bcfa1d7bb236d49e1956fcc9a684609956972fe4f95aac13f30b2530.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n to_delete AS (\n SELECT user_session_id, finished_at\n FROM user_sessions us\n WHERE us.finished_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR us.finished_at >= $1)\n AND us.finished_at < $2\n -- Only delete if no oauth2_sessions reference this user_session\n AND NOT EXISTS (\n SELECT 1 FROM oauth2_sessions os\n WHERE os.user_session_id = us.user_session_id\n )\n -- Only delete if no compat_sessions reference this user_session\n AND NOT EXISTS (\n SELECT 1 FROM compat_sessions cs\n WHERE cs.user_session_id = us.user_session_id\n )\n ORDER BY us.finished_at ASC\n LIMIT $3\n FOR UPDATE OF us\n ),\n deleted_authentications AS (\n DELETE FROM user_session_authentications USING to_delete\n WHERE user_session_authentications.user_session_id = to_delete.user_session_id\n ),\n deleted_sessions AS (\n DELETE FROM user_sessions USING to_delete\n WHERE user_sessions.user_session_id = to_delete.user_session_id\n RETURNING user_sessions.finished_at\n )\n SELECT COUNT(*) as \"count!\", MAX(finished_at) as last_finished_at FROM deleted_sessions\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_finished_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "d95cd1b4bcfa1d7bb236d49e1956fcc9a684609956972fe4f95aac13f30b2530" +} diff --git a/crates/storage-pg/migrations/20260122113523_compat_sessions_user_session_no_action.sql b/crates/storage-pg/migrations/20260122113523_compat_sessions_user_session_no_action.sql new file mode 100644 index 000000000..d86ea2291 --- /dev/null +++ b/crates/storage-pg/migrations/20260122113523_compat_sessions_user_session_no_action.sql @@ -0,0 +1,19 @@ +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Change compat_sessions.user_session_id FK from ON DELETE SET NULL to NO ACTION +-- This ensures user_sessions cannot be deleted while compat_sessions reference them, +-- which is required for backchannel logout propagation to work correctly. +-- +-- Uses NOT VALID to avoid scanning the entire table while holding a lock. +-- A separate migration will validate the constraint. + +ALTER TABLE compat_sessions + DROP CONSTRAINT compat_sessions_user_session_id_fkey, + ADD CONSTRAINT compat_sessions_user_session_id_fkey + FOREIGN KEY (user_session_id) + REFERENCES user_sessions (user_session_id) + ON DELETE NO ACTION + NOT VALID; diff --git a/crates/storage-pg/migrations/20260122114353_compat_sessions_user_session_validate.sql b/crates/storage-pg/migrations/20260122114353_compat_sessions_user_session_validate.sql new file mode 100644 index 000000000..4a5a7f23d --- /dev/null +++ b/crates/storage-pg/migrations/20260122114353_compat_sessions_user_session_validate.sql @@ -0,0 +1,9 @@ +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Validate the constraint added in the previous migration. +-- This scans the table but does not hold an exclusive lock. +ALTER TABLE compat_sessions + VALIDATE CONSTRAINT compat_sessions_user_session_id_fkey; diff --git a/crates/storage-pg/migrations/20260122124231_idx_user_sessions_finished_at.sql b/crates/storage-pg/migrations/20260122124231_idx_user_sessions_finished_at.sql new file mode 100644 index 000000000..5f09a7ada --- /dev/null +++ b/crates/storage-pg/migrations/20260122124231_idx_user_sessions_finished_at.sql @@ -0,0 +1,11 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Adds a partial index on user_sessions.finished_at to help cleaning up +-- finished sessions +CREATE INDEX CONCURRENTLY IF NOT EXISTS "user_sessions_finished_at_idx" + ON "user_sessions" ("finished_at") + WHERE "finished_at" IS NOT NULL; diff --git a/crates/storage-pg/src/user/session.rs b/crates/storage-pg/src/user/session.rs index 9c546695e..0af857f57 100644 --- a/crates/storage-pg/src/user/session.rs +++ b/crates/storage-pg/src/user/session.rs @@ -630,4 +630,69 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> { Ok(()) } + + #[tracing::instrument( + name = "db.browser_session.cleanup_finished", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + until = %until, + limit = limit, + ), + err, + )] + async fn cleanup_finished( + &mut self, + since: Option>, + until: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( + r#" + WITH + to_delete AS ( + SELECT user_session_id, finished_at + FROM user_sessions us + WHERE us.finished_at IS NOT NULL + AND ($1::timestamptz IS NULL OR us.finished_at >= $1) + AND us.finished_at < $2 + -- Only delete if no oauth2_sessions reference this user_session + AND NOT EXISTS ( + SELECT 1 FROM oauth2_sessions os + WHERE os.user_session_id = us.user_session_id + ) + -- Only delete if no compat_sessions reference this user_session + AND NOT EXISTS ( + SELECT 1 FROM compat_sessions cs + WHERE cs.user_session_id = us.user_session_id + ) + ORDER BY us.finished_at ASC + LIMIT $3 + FOR UPDATE OF us + ), + deleted_authentications AS ( + DELETE FROM user_session_authentications USING to_delete + WHERE user_session_authentications.user_session_id = to_delete.user_session_id + ), + deleted_sessions AS ( + DELETE FROM user_sessions USING to_delete + WHERE user_sessions.user_session_id = to_delete.user_session_id + RETURNING user_sessions.finished_at + ) + SELECT COUNT(*) as "count!", MAX(finished_at) as last_finished_at FROM deleted_sessions + "#, + since, + until, + i64::try_from(limit).unwrap_or(i64::MAX), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_finished_at, + )) + } } diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index 09067976d..edf3a3c5e 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -374,6 +374,14 @@ impl InsertableJob for CleanupFinishedOAuth2SessionsJob { const QUEUE_NAME: &'static str = "cleanup-finished-oauth2-sessions"; } +/// Cleanup finished user/browser sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupFinishedUserSessionsJob; + +impl InsertableJob for CleanupFinishedUserSessionsJob { + const QUEUE_NAME: &'static str = "cleanup-finished-user-sessions"; +} + /// Cleanup old OAuth 2.0 authorization grants #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct CleanupOAuthAuthorizationGrantsJob; diff --git a/crates/storage/src/user/session.rs b/crates/storage/src/user/session.rs index 78e2f6ebd..fb7517b02 100644 --- a/crates/storage/src/user/session.rs +++ b/crates/storage/src/user/session.rs @@ -307,6 +307,30 @@ pub trait BrowserSessionRepository: Send + Sync { &mut self, activity: Vec<(Ulid, DateTime, Option)>, ) -> Result<(), Self::Error>; + + /// Cleanup finished [`BrowserSession`]s + /// + /// Deletes sessions finished between `since` and `until`, but only if they + /// have no child sessions (`compat_sessions` or `oauth2_sessions`). Returns + /// the number of deleted sessions and the timestamp of the last deleted + /// session for pagination. + /// + /// # Parameters + /// + /// * `since`: The earliest finish time to delete (exclusive). If `None`, + /// starts from the beginning. + /// * `until`: The latest finish time to delete (exclusive) + /// * `limit`: Maximum number of sessions to delete in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_finished( + &mut self, + since: Option>, + until: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(BrowserSessionRepository: @@ -363,4 +387,11 @@ repository_impl!(BrowserSessionRepository: &mut self, activity: Vec<(Ulid, DateTime, Option)>, ) -> Result<(), Self::Error>; + + async fn cleanup_finished( + &mut self, + since: Option>, + until: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index daa9254e0..0df35dbfd 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -13,11 +13,11 @@ use async_trait::async_trait; use mas_storage::queue::{ CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob, - CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, - CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, - CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob, - CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, - CleanupUserRegistrationsJob, PruneStalePolicyDataJob, + CleanupFinishedUserSessionsJob, CleanupOAuthAuthorizationGrantsJob, + CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob, + CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob, + CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob, + CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob, }; use tracing::{debug, info}; use ulid::Ulid; @@ -610,6 +610,55 @@ impl RunnableJob for CleanupFinishedOAuth2SessionsJob { } } +#[async_trait] +impl RunnableJob for CleanupFinishedUserSessionsJob { + #[tracing::instrument(name = "job.cleanup_finished_user_sessions", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup user/browser sessions that were finished more than 30 days ago + let until = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted sessions, and the last finished_at + // timestamp. Only deletes sessions that have no child sessions + // (compat_sessions or oauth2_sessions). + let (count, last_finished_at) = repo + .browser_session() + .cleanup_finished(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_finished_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no finished user sessions to clean up"); + } else { + info!(count = total, "cleaned up finished user sessions"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + #[async_trait] impl RunnableJob for CleanupOAuthAuthorizationGrantsJob { #[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)] diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 3ea660e5e..d4f58b23b 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -136,6 +136,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .register_handler::() .register_handler::() .register_handler::() @@ -194,6 +195,12 @@ pub async fn init( "0 42 * * * *".parse()?, mas_storage::queue::CleanupFinishedOAuth2SessionsJob, ) + .add_schedule( + "cleanup-finished-user-sessions", + // Run this job every hour + "0 44 * * * *".parse()?, + mas_storage::queue::CleanupFinishedUserSessionsJob, + ) .add_schedule( "cleanup-oauth-authorization-grants", // Run this job every hour