Add cleanup job for finished user sessions

Implements hard deletion of user/browser sessions that have been finished for more than 30 days, but only after all child sessions are cleaned up.

User sessions can only be deleted when no child sessions exist, ensuring backchannel logout propagation continues to work correctly.
This commit is contained in:
Quentin Gliech
2026-01-22 11:13:19 +01:00
parent c508c7899e
commit 85f71d2200
7 changed files with 208 additions and 5 deletions

View File

@@ -0,0 +1,30 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH\n to_delete AS (\n SELECT user_session_id, finished_at\n FROM user_sessions us\n WHERE us.finished_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR us.finished_at >= $1)\n AND us.finished_at < $2\n -- Only delete if no oauth2_sessions reference this user_session\n AND NOT EXISTS (\n SELECT 1 FROM oauth2_sessions os\n WHERE os.user_session_id = us.user_session_id\n )\n -- Only delete if no compat_sessions reference this user_session\n AND NOT EXISTS (\n SELECT 1 FROM compat_sessions cs\n WHERE cs.user_session_id = us.user_session_id\n )\n ORDER BY us.finished_at ASC\n LIMIT $3\n FOR UPDATE OF us\n ),\n deleted_authentications AS (\n DELETE FROM user_session_authentications USING to_delete\n WHERE user_session_authentications.user_session_id = to_delete.user_session_id\n ),\n deleted_sessions AS (\n DELETE FROM user_sessions USING to_delete\n WHERE user_sessions.user_session_id = to_delete.user_session_id\n RETURNING user_sessions.finished_at\n )\n SELECT COUNT(*) as \"count!\", MAX(finished_at) as last_finished_at FROM deleted_sessions\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "count!",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "last_finished_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Timestamptz",
"Timestamptz",
"Int8"
]
},
"nullable": [
null,
null
]
},
"hash": "d95cd1b4bcfa1d7bb236d49e1956fcc9a684609956972fe4f95aac13f30b2530"
}

View File

@@ -0,0 +1,11 @@
-- no-transaction
-- Copyright 2026 Element Creations Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
-- Please see LICENSE files in the repository root for full details.
-- Adds a partial index on user_sessions.finished_at to help cleaning up
-- finished sessions: the cleanup query filters on a range of finished_at
-- values and orders by that column. Sessions which are not finished
-- (finished_at IS NULL) are never considered by the cleanup, so they are left
-- out of the index.
--
-- The index is built CONCURRENTLY to avoid blocking writes on the table while
-- it is being created. PostgreSQL refuses to do that inside a transaction
-- block, hence the `no-transaction` marker at the top of this file.
CREATE INDEX CONCURRENTLY IF NOT EXISTS "user_sessions_finished_at_idx"
ON "user_sessions" ("finished_at")
WHERE "finished_at" IS NOT NULL;

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
//
@@ -641,4 +642,69 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> {
Ok(())
}
#[tracing::instrument(
    name = "db.browser_session.cleanup_finished",
    skip_all,
    fields(
        db.query.text,
        since = since.map(tracing::field::display),
        until = %until,
        limit = limit,
    ),
    err,
)]
async fn cleanup_finished(
    &mut self,
    since: Option<DateTime<Utc>>,
    until: DateTime<Utc>,
    limit: usize,
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
    // The LIMIT parameter is bound as a signed 64-bit integer; saturate
    // instead of failing if the requested limit does not fit
    let batch_limit = i64::try_from(limit).unwrap_or(i64::MAX);

    // A single statement does all the work:
    //  - `to_delete` picks (and locks) the oldest finished sessions in the
    //    requested window which have no child session left
    //  - `deleted_authentications` removes the authentications recorded on
    //    those sessions
    //  - `deleted_sessions` removes the sessions themselves
    // and the final SELECT reports how many sessions went away, along with
    // the greatest `finished_at` among them
    let row = sqlx::query!(
        r#"
WITH
to_delete AS (
SELECT user_session_id, finished_at
FROM user_sessions us
WHERE us.finished_at IS NOT NULL
AND ($1::timestamptz IS NULL OR us.finished_at >= $1)
AND us.finished_at < $2
-- Only delete if no oauth2_sessions reference this user_session
AND NOT EXISTS (
SELECT 1 FROM oauth2_sessions os
WHERE os.user_session_id = us.user_session_id
)
-- Only delete if no compat_sessions reference this user_session
AND NOT EXISTS (
SELECT 1 FROM compat_sessions cs
WHERE cs.user_session_id = us.user_session_id
)
ORDER BY us.finished_at ASC
LIMIT $3
FOR UPDATE OF us
),
deleted_authentications AS (
DELETE FROM user_session_authentications USING to_delete
WHERE user_session_authentications.user_session_id = to_delete.user_session_id
),
deleted_sessions AS (
DELETE FROM user_sessions USING to_delete
WHERE user_sessions.user_session_id = to_delete.user_session_id
RETURNING user_sessions.finished_at
)
SELECT COUNT(*) as "count!", MAX(finished_at) as last_finished_at FROM deleted_sessions
"#,
        since,
        until,
        batch_limit,
    )
    .traced()
    .fetch_one(&mut *self.conn)
    .await?;

    // COUNT(*) comes back as a signed 64-bit integer; saturate on platforms
    // where it would not fit in a `usize`
    let deleted = usize::try_from(row.count).unwrap_or(usize::MAX);

    Ok((deleted, row.last_finished_at))
}
}

View File

@@ -374,6 +374,14 @@ impl InsertableJob for CleanupFinishedOAuth2SessionsJob {
const QUEUE_NAME: &'static str = "cleanup-finished-oauth2-sessions";
}
/// Cleanup finished user/browser sessions
///
/// This job carries no payload: it triggers the hard deletion of finished
/// user/browser sessions which no longer have any child session
/// (`compat_sessions` or `oauth2_sessions`).
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct CleanupFinishedUserSessionsJob;
impl InsertableJob for CleanupFinishedUserSessionsJob {
// Queue name for this job type
const QUEUE_NAME: &'static str = "cleanup-finished-user-sessions";
}
/// Cleanup old OAuth 2.0 authorization grants
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct CleanupOAuthAuthorizationGrantsJob;

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
//
@@ -307,6 +308,30 @@ pub trait BrowserSessionRepository: Send + Sync {
&mut self,
activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
) -> Result<(), Self::Error>;
/// Cleanup finished [`BrowserSession`]s
///
/// Deletes sessions finished between `since` and `until`, but only if they
/// have no child sessions (`compat_sessions` or `oauth2_sessions`). Returns
/// the number of deleted sessions and the latest finish time among the
/// sessions deleted in this batch, which can be passed back as `since` to
/// continue the cleanup. That timestamp is `None` if nothing was deleted.
///
/// # Parameters
///
/// * `since`: The earliest finish time to delete (inclusive, so that
///   sessions sharing the finish time returned by a previous batch are not
///   skipped). If `None`, starts from the beginning.
/// * `until`: The latest finish time to delete (exclusive)
/// * `limit`: Maximum number of sessions to delete in this batch
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn cleanup_finished(
&mut self,
since: Option<DateTime<Utc>>,
until: DateTime<Utc>,
limit: usize,
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
}
repository_impl!(BrowserSessionRepository:
@@ -363,4 +388,11 @@ repository_impl!(BrowserSessionRepository:
&mut self,
activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
) -> Result<(), Self::Error>;
async fn cleanup_finished(
&mut self,
since: Option<DateTime<Utc>>,
until: DateTime<Utc>,
limit: usize,
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
);

View File

@@ -13,11 +13,11 @@ use async_trait::async_trait;
use mas_storage::queue::{
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob,
CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob,
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob,
CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob,
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
CleanupFinishedUserSessionsJob, CleanupOAuthAuthorizationGrantsJob,
CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob,
CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob,
CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob,
CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
};
use tracing::{debug, info};
use ulid::Ulid;
@@ -610,6 +610,55 @@ impl RunnableJob for CleanupFinishedOAuth2SessionsJob {
}
}
#[async_trait]
impl RunnableJob for CleanupFinishedUserSessionsJob {
    /// Hard-deletes, one batch at a time, the user/browser sessions which
    /// were finished more than 30 days ago and have no child session left.
    #[tracing::instrument(name = "job.cleanup_finished_user_sessions", skip_all)]
    async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
        // Only sessions finished before this point in time (30 days ago) are
        // eligible for deletion
        let cutoff = state.clock.now() - chrono::Duration::days(30);

        // Running count of deleted sessions, and the `finished_at` timestamp
        // from which the next batch resumes
        let mut deleted_total = 0;
        let mut cursor = None;

        loop {
            // Stop as soon as we get cancelled. No retry is scheduled in that
            // case: this is a scheduled job, so it will end up being
            // rescheduled later anyway.
            if context.cancellation_token.is_cancelled() {
                break;
            }

            // Each batch is deleted and saved through its own repository
            let mut repo = state.repository().await.map_err(JobError::retry)?;

            // Only sessions without child sessions (compat_sessions or
            // oauth2_sessions) get deleted. We get back how many sessions
            // were deleted and the last `finished_at` timestamp seen.
            let (deleted, last_finished_at) = repo
                .browser_session()
                .cleanup_finished(cursor, cutoff, BATCH_SIZE)
                .await
                .map_err(JobError::retry)?;
            repo.save().await.map_err(JobError::retry)?;

            cursor = last_finished_at;
            deleted_total += deleted;

            // A batch of exactly BATCH_SIZE sessions means there might be
            // more to delete; anything else means we are done
            if deleted != BATCH_SIZE {
                break;
            }
        }

        match deleted_total {
            0 => debug!("no finished user sessions to clean up"),
            _ => info!(count = deleted_total, "cleaned up finished user sessions"),
        }

        Ok(())
    }

    /// Upper bound on how long a single run of this job may take.
    fn timeout(&self) -> Option<Duration> {
        // This job runs every hour, so letting it run for up to 10 minutes is
        // fine
        let ten_minutes = Duration::from_secs(10 * 60);
        Some(ten_minutes)
    }
}
#[async_trait]
impl RunnableJob for CleanupOAuthAuthorizationGrantsJob {
#[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)]

View File

@@ -136,6 +136,7 @@ pub async fn init(
.register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
.register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
.register_handler::<mas_storage::queue::CleanupFinishedOAuth2SessionsJob>()
.register_handler::<mas_storage::queue::CleanupFinishedUserSessionsJob>()
.register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
.register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
.register_handler::<mas_storage::queue::CleanupUserRecoverySessionsJob>()
@@ -194,6 +195,12 @@ pub async fn init(
"0 42 * * * *".parse()?,
mas_storage::queue::CleanupFinishedOAuth2SessionsJob,
)
.add_schedule(
"cleanup-finished-user-sessions",
// Run this job every hour
"0 44 * * * *".parse()?,
mas_storage::queue::CleanupFinishedUserSessionsJob,
)
.add_schedule(
"cleanup-oauth-authorization-grants",
// Run this job every hour