Cleanup finished user/browser sessions (#5444)
This commit is contained in:
30
crates/storage-pg/.sqlx/query-d95cd1b4bcfa1d7bb236d49e1956fcc9a684609956972fe4f95aac13f30b2530.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-d95cd1b4bcfa1d7bb236d49e1956fcc9a684609956972fe4f95aac13f30b2530.json
generated
Normal file
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH\n to_delete AS (\n SELECT user_session_id, finished_at\n FROM user_sessions us\n WHERE us.finished_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR us.finished_at >= $1)\n AND us.finished_at < $2\n -- Only delete if no oauth2_sessions reference this user_session\n AND NOT EXISTS (\n SELECT 1 FROM oauth2_sessions os\n WHERE os.user_session_id = us.user_session_id\n )\n -- Only delete if no compat_sessions reference this user_session\n AND NOT EXISTS (\n SELECT 1 FROM compat_sessions cs\n WHERE cs.user_session_id = us.user_session_id\n )\n ORDER BY us.finished_at ASC\n LIMIT $3\n FOR UPDATE OF us\n ),\n deleted_authentications AS (\n DELETE FROM user_session_authentications USING to_delete\n WHERE user_session_authentications.user_session_id = to_delete.user_session_id\n ),\n deleted_sessions AS (\n DELETE FROM user_sessions USING to_delete\n WHERE user_sessions.user_session_id = to_delete.user_session_id\n RETURNING user_sessions.finished_at\n )\n SELECT COUNT(*) as \"count!\", MAX(finished_at) as last_finished_at FROM deleted_sessions\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "count!",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "last_finished_at",
|
||||
"type_info": "Timestamptz"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "d95cd1b4bcfa1d7bb236d49e1956fcc9a684609956972fe4f95aac13f30b2530"
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
-- Copyright 2026 Element Creations Ltd.
|
||||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
-- Please see LICENSE files in the repository root for full details.
|
||||
|
||||
-- Change compat_sessions.user_session_id FK from ON DELETE SET NULL to NO ACTION
|
||||
-- This ensures user_sessions cannot be deleted while compat_sessions reference them,
|
||||
-- which is required for backchannel logout propagation to work correctly.
|
||||
--
|
||||
-- Uses NOT VALID to avoid scanning the entire table while holding a lock.
|
||||
-- A separate migration will validate the constraint.
|
||||
|
||||
ALTER TABLE compat_sessions
|
||||
DROP CONSTRAINT compat_sessions_user_session_id_fkey,
|
||||
ADD CONSTRAINT compat_sessions_user_session_id_fkey
|
||||
FOREIGN KEY (user_session_id)
|
||||
REFERENCES user_sessions (user_session_id)
|
||||
ON DELETE NO ACTION
|
||||
NOT VALID;
|
||||
@@ -0,0 +1,9 @@
|
||||
-- Copyright 2026 Element Creations Ltd.
|
||||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
-- Please see LICENSE files in the repository root for full details.
|
||||
|
||||
-- Validate the constraint added in the previous migration.
|
||||
-- This scans the table but does not hold an exclusive lock.
|
||||
ALTER TABLE compat_sessions
|
||||
VALIDATE CONSTRAINT compat_sessions_user_session_id_fkey;
|
||||
@@ -0,0 +1,11 @@
|
||||
-- no-transaction
|
||||
-- Copyright 2026 Element Creations Ltd.
|
||||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
-- Please see LICENSE files in the repository root for full details.
|
||||
|
||||
-- Adds a partial index on user_sessions.finished_at to help cleaning up
|
||||
-- finished sessions
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS "user_sessions_finished_at_idx"
|
||||
ON "user_sessions" ("finished_at")
|
||||
WHERE "finished_at" IS NOT NULL;
|
||||
@@ -630,4 +630,69 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "db.browser_session.cleanup_finished",
|
||||
skip_all,
|
||||
fields(
|
||||
db.query.text,
|
||||
since = since.map(tracing::field::display),
|
||||
until = %until,
|
||||
limit = limit,
|
||||
),
|
||||
err,
|
||||
)]
|
||||
async fn cleanup_finished(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
until: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
WITH
|
||||
to_delete AS (
|
||||
SELECT user_session_id, finished_at
|
||||
FROM user_sessions us
|
||||
WHERE us.finished_at IS NOT NULL
|
||||
AND ($1::timestamptz IS NULL OR us.finished_at >= $1)
|
||||
AND us.finished_at < $2
|
||||
-- Only delete if no oauth2_sessions reference this user_session
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM oauth2_sessions os
|
||||
WHERE os.user_session_id = us.user_session_id
|
||||
)
|
||||
-- Only delete if no compat_sessions reference this user_session
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM compat_sessions cs
|
||||
WHERE cs.user_session_id = us.user_session_id
|
||||
)
|
||||
ORDER BY us.finished_at ASC
|
||||
LIMIT $3
|
||||
FOR UPDATE OF us
|
||||
),
|
||||
deleted_authentications AS (
|
||||
DELETE FROM user_session_authentications USING to_delete
|
||||
WHERE user_session_authentications.user_session_id = to_delete.user_session_id
|
||||
),
|
||||
deleted_sessions AS (
|
||||
DELETE FROM user_sessions USING to_delete
|
||||
WHERE user_sessions.user_session_id = to_delete.user_session_id
|
||||
RETURNING user_sessions.finished_at
|
||||
)
|
||||
SELECT COUNT(*) as "count!", MAX(finished_at) as last_finished_at FROM deleted_sessions
|
||||
"#,
|
||||
since,
|
||||
until,
|
||||
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||
)
|
||||
.traced()
|
||||
.fetch_one(&mut *self.conn)
|
||||
.await?;
|
||||
|
||||
Ok((
|
||||
res.count.try_into().unwrap_or(usize::MAX),
|
||||
res.last_finished_at,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -374,6 +374,14 @@ impl InsertableJob for CleanupFinishedOAuth2SessionsJob {
|
||||
const QUEUE_NAME: &'static str = "cleanup-finished-oauth2-sessions";
|
||||
}
|
||||
|
||||
/// Cleanup finished user/browser sessions
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||
pub struct CleanupFinishedUserSessionsJob;
|
||||
|
||||
impl InsertableJob for CleanupFinishedUserSessionsJob {
|
||||
const QUEUE_NAME: &'static str = "cleanup-finished-user-sessions";
|
||||
}
|
||||
|
||||
/// Cleanup old OAuth 2.0 authorization grants
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||
pub struct CleanupOAuthAuthorizationGrantsJob;
|
||||
|
||||
@@ -307,6 +307,30 @@ pub trait BrowserSessionRepository: Send + Sync {
|
||||
&mut self,
|
||||
activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
/// Cleanup finished [`BrowserSession`]s
|
||||
///
|
||||
/// Deletes sessions finished between `since` and `until`, but only if they
|
||||
/// have no child sessions (`compat_sessions` or `oauth2_sessions`). Returns
|
||||
/// the number of deleted sessions and the timestamp of the last deleted
|
||||
/// session for pagination.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `since`: The earliest finish time to delete (exclusive). If `None`,
|
||||
/// starts from the beginning.
|
||||
/// * `until`: The latest finish time to delete (exclusive)
|
||||
/// * `limit`: Maximum number of sessions to delete in this batch
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`Self::Error`] if the underlying repository fails
|
||||
async fn cleanup_finished(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
until: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
}
|
||||
|
||||
repository_impl!(BrowserSessionRepository:
|
||||
@@ -363,4 +387,11 @@ repository_impl!(BrowserSessionRepository:
|
||||
&mut self,
|
||||
activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
async fn cleanup_finished(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
until: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
);
|
||||
|
||||
@@ -13,11 +13,11 @@ use async_trait::async_trait;
|
||||
use mas_storage::queue::{
|
||||
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
|
||||
CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob,
|
||||
CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob,
|
||||
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
|
||||
CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob,
|
||||
CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob,
|
||||
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
||||
CleanupFinishedUserSessionsJob, CleanupOAuthAuthorizationGrantsJob,
|
||||
CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob,
|
||||
CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob,
|
||||
CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob,
|
||||
CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
||||
};
|
||||
use tracing::{debug, info};
|
||||
use ulid::Ulid;
|
||||
@@ -610,6 +610,55 @@ impl RunnableJob for CleanupFinishedOAuth2SessionsJob {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RunnableJob for CleanupFinishedUserSessionsJob {
|
||||
#[tracing::instrument(name = "job.cleanup_finished_user_sessions", skip_all)]
|
||||
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
|
||||
// Cleanup user/browser sessions that were finished more than 30 days ago
|
||||
let until = state.clock.now() - chrono::Duration::days(30);
|
||||
let mut total = 0;
|
||||
|
||||
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
|
||||
// this is a scheduled job and it will end up being rescheduled later anyway.
|
||||
let mut since = None;
|
||||
while !context.cancellation_token.is_cancelled() {
|
||||
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||
|
||||
// This returns the number of deleted sessions, and the last finished_at
|
||||
// timestamp. Only deletes sessions that have no child sessions
|
||||
// (compat_sessions or oauth2_sessions).
|
||||
let (count, last_finished_at) = repo
|
||||
.browser_session()
|
||||
.cleanup_finished(since, until, BATCH_SIZE)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
repo.save().await.map_err(JobError::retry)?;
|
||||
|
||||
since = last_finished_at;
|
||||
total += count;
|
||||
|
||||
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
|
||||
// there might be more to delete
|
||||
if count != BATCH_SIZE {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if total == 0 {
|
||||
debug!("no finished user sessions to clean up");
|
||||
} else {
|
||||
info!(count = total, "cleaned up finished user sessions");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn timeout(&self) -> Option<Duration> {
|
||||
// This job runs every hour, so having it running it for 10 minutes is fine
|
||||
Some(Duration::from_secs(10 * 60))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RunnableJob for CleanupOAuthAuthorizationGrantsJob {
|
||||
#[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)]
|
||||
|
||||
@@ -136,6 +136,7 @@ pub async fn init(
|
||||
.register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupFinishedOAuth2SessionsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupFinishedUserSessionsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupUserRecoverySessionsJob>()
|
||||
@@ -194,6 +195,12 @@ pub async fn init(
|
||||
"0 42 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupFinishedOAuth2SessionsJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-finished-user-sessions",
|
||||
// Run this job every hour
|
||||
"0 44 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupFinishedUserSessionsJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-oauth-authorization-grants",
|
||||
// Run this job every hour
|
||||
|
||||
Reference in New Issue
Block a user