Cleanup finished compat sessions after 30 days
This commit is contained in:
@@ -1,14 +0,0 @@
|
|||||||
{
|
|
||||||
"db_name": "PostgreSQL",
|
|
||||||
"query": "\n DELETE FROM oauth2_consents\n WHERE oauth2_client_id = $1\n ",
|
|
||||||
"describe": {
|
|
||||||
"columns": [],
|
|
||||||
"parameters": {
|
|
||||||
"Left": [
|
|
||||||
"Uuid"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"nullable": []
|
|
||||||
},
|
|
||||||
"hash": "036e9e2cb7271782e48700fecd3fdd80f596ed433f37f2528c7edbdc88b13646"
|
|
||||||
}
|
|
||||||
30
crates/storage-pg/.sqlx/query-494ca16f0f00f977a3031924a15318aa7346917e5c8a37bb0f5b2b3067588009.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-494ca16f0f00f977a3031924a15318aa7346917e5c8a37bb0f5b2b3067588009.json
generated
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
{
|
||||||
|
"db_name": "PostgreSQL",
|
||||||
|
"query": "\n WITH\n to_delete AS (\n SELECT compat_session_id, finished_at\n FROM compat_sessions\n WHERE finished_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR finished_at >= $1)\n AND finished_at < $2\n ORDER BY finished_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n\n -- Delete refresh tokens first because they reference access tokens\n deleted_refresh_tokens AS (\n DELETE FROM compat_refresh_tokens\n USING to_delete\n WHERE compat_refresh_tokens.compat_session_id = to_delete.compat_session_id\n ),\n\n deleted_access_tokens AS (\n DELETE FROM compat_access_tokens\n USING to_delete\n WHERE compat_access_tokens.compat_session_id = to_delete.compat_session_id\n ),\n\n deleted_sso_logins AS (\n DELETE FROM compat_sso_logins\n USING to_delete\n WHERE compat_sso_logins.compat_session_id = to_delete.compat_session_id\n ),\n\n deleted_sessions AS (\n DELETE FROM compat_sessions\n USING to_delete\n WHERE compat_sessions.compat_session_id = to_delete.compat_session_id\n RETURNING compat_sessions.finished_at\n )\n\n SELECT\n COUNT(*) as \"count!\",\n MAX(finished_at) as last_finished_at\n FROM deleted_sessions\n ",
|
||||||
|
"describe": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"ordinal": 0,
|
||||||
|
"name": "count!",
|
||||||
|
"type_info": "Int8"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ordinal": 1,
|
||||||
|
"name": "last_finished_at",
|
||||||
|
"type_info": "Timestamptz"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"parameters": {
|
||||||
|
"Left": [
|
||||||
|
"Timestamptz",
|
||||||
|
"Timestamptz",
|
||||||
|
"Int8"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"nullable": [
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": "494ca16f0f00f977a3031924a15318aa7346917e5c8a37bb0f5b2b3067588009"
|
||||||
|
}
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
-- no-transaction
|
||||||
|
-- Copyright 2026 Element Creations Ltd.
|
||||||
|
--
|
||||||
|
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||||
|
-- Please see LICENSE files in the repository root for full details.
|
||||||
|
|
||||||
|
-- Index to efficiently query finished compat sessions for cleanup
|
||||||
|
-- Only includes non-null finished_at values since we filter on finished_at IS NOT NULL
|
||||||
|
CREATE INDEX CONCURRENTLY IF NOT EXISTS "compat_sessions_finished_at_idx"
|
||||||
|
ON "compat_sessions" ("finished_at")
|
||||||
|
WHERE "finished_at" IS NOT NULL;
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
// Copyright 2025, 2026 Element Creations Ltd.
|
||||||
// Copyright 2024, 2025 New Vector Ltd.
|
// Copyright 2024, 2025 New Vector Ltd.
|
||||||
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
|
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
|
||||||
//
|
//
|
||||||
@@ -684,4 +685,77 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
|
|||||||
|
|
||||||
Ok(compat_session)
|
Ok(compat_session)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "db.compat_session.cleanup_finished",
|
||||||
|
skip_all,
|
||||||
|
fields(
|
||||||
|
db.query.text,
|
||||||
|
),
|
||||||
|
err,
|
||||||
|
)]
|
||||||
|
async fn cleanup_finished(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
until: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||||
|
let res = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
WITH
|
||||||
|
to_delete AS (
|
||||||
|
SELECT compat_session_id, finished_at
|
||||||
|
FROM compat_sessions
|
||||||
|
WHERE finished_at IS NOT NULL
|
||||||
|
AND ($1::timestamptz IS NULL OR finished_at >= $1)
|
||||||
|
AND finished_at < $2
|
||||||
|
ORDER BY finished_at ASC
|
||||||
|
LIMIT $3
|
||||||
|
FOR UPDATE
|
||||||
|
),
|
||||||
|
|
||||||
|
-- Delete refresh tokens first because they reference access tokens
|
||||||
|
deleted_refresh_tokens AS (
|
||||||
|
DELETE FROM compat_refresh_tokens
|
||||||
|
USING to_delete
|
||||||
|
WHERE compat_refresh_tokens.compat_session_id = to_delete.compat_session_id
|
||||||
|
),
|
||||||
|
|
||||||
|
deleted_access_tokens AS (
|
||||||
|
DELETE FROM compat_access_tokens
|
||||||
|
USING to_delete
|
||||||
|
WHERE compat_access_tokens.compat_session_id = to_delete.compat_session_id
|
||||||
|
),
|
||||||
|
|
||||||
|
deleted_sso_logins AS (
|
||||||
|
DELETE FROM compat_sso_logins
|
||||||
|
USING to_delete
|
||||||
|
WHERE compat_sso_logins.compat_session_id = to_delete.compat_session_id
|
||||||
|
),
|
||||||
|
|
||||||
|
deleted_sessions AS (
|
||||||
|
DELETE FROM compat_sessions
|
||||||
|
USING to_delete
|
||||||
|
WHERE compat_sessions.compat_session_id = to_delete.compat_session_id
|
||||||
|
RETURNING compat_sessions.finished_at
|
||||||
|
)
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
COUNT(*) as "count!",
|
||||||
|
MAX(finished_at) as last_finished_at
|
||||||
|
FROM deleted_sessions
|
||||||
|
"#,
|
||||||
|
since,
|
||||||
|
until,
|
||||||
|
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||||
|
)
|
||||||
|
.traced()
|
||||||
|
.fetch_one(&mut *self.conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
res.count.try_into().unwrap_or(usize::MAX),
|
||||||
|
res.last_finished_at,
|
||||||
|
))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
// Copyright 2025, 2026 Element Creations Ltd.
|
||||||
// Copyright 2024, 2025 New Vector Ltd.
|
// Copyright 2024, 2025 New Vector Ltd.
|
||||||
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
|
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
|
||||||
//
|
//
|
||||||
@@ -361,6 +362,30 @@ pub trait CompatSessionRepository: Send + Sync {
|
|||||||
compat_session: CompatSession,
|
compat_session: CompatSession,
|
||||||
human_name: Option<String>,
|
human_name: Option<String>,
|
||||||
) -> Result<CompatSession, Self::Error>;
|
) -> Result<CompatSession, Self::Error>;
|
||||||
|
|
||||||
|
/// Cleanup finished [`CompatSession`]s and their associated tokens.
|
||||||
|
///
|
||||||
|
/// This deletes compat sessions that have been finished, along with their
|
||||||
|
/// associated access tokens, refresh tokens, and SSO logins.
|
||||||
|
///
|
||||||
|
/// Returns the number of sessions deleted and the timestamp of the last
|
||||||
|
/// deleted session's `finished_at`, which can be used for pagination.
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
///
|
||||||
|
/// * `since`: Only delete sessions finished at or after this timestamp
|
||||||
|
/// * `until`: Only delete sessions finished before this timestamp
|
||||||
|
/// * `limit`: Maximum number of sessions to delete
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns [`Self::Error`] if the underlying repository fails
|
||||||
|
async fn cleanup_finished(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
until: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
repository_impl!(CompatSessionRepository:
|
repository_impl!(CompatSessionRepository:
|
||||||
@@ -413,4 +438,11 @@ repository_impl!(CompatSessionRepository:
|
|||||||
compat_session: CompatSession,
|
compat_session: CompatSession,
|
||||||
human_name: Option<String>,
|
human_name: Option<String>,
|
||||||
) -> Result<CompatSession, Self::Error>;
|
) -> Result<CompatSession, Self::Error>;
|
||||||
|
|
||||||
|
async fn cleanup_finished(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
until: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -358,6 +358,14 @@ impl InsertableJob for CleanupUserRegistrationsJob {
|
|||||||
const QUEUE_NAME: &'static str = "cleanup-user-registrations";
|
const QUEUE_NAME: &'static str = "cleanup-user-registrations";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Cleanup finished compat sessions
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||||
|
pub struct CleanupFinishedCompatSessionsJob;
|
||||||
|
|
||||||
|
impl InsertableJob for CleanupFinishedCompatSessionsJob {
|
||||||
|
const QUEUE_NAME: &'static str = "cleanup-finished-compat-sessions";
|
||||||
|
}
|
||||||
|
|
||||||
/// Scheduled job to expire inactive sessions
|
/// Scheduled job to expire inactive sessions
|
||||||
///
|
///
|
||||||
/// This job will trigger jobs to expire inactive compat, oauth and user
|
/// This job will trigger jobs to expire inactive compat, oauth and user
|
||||||
|
|||||||
@@ -12,8 +12,8 @@ use std::time::Duration;
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use mas_storage::queue::{
|
use mas_storage::queue::{
|
||||||
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
|
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
|
||||||
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
|
CleanupFinishedCompatSessionsJob, CleanupRevokedOAuthAccessTokensJob,
|
||||||
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
CleanupRevokedOAuthRefreshTokensJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
||||||
};
|
};
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
use ulid::Ulid;
|
use ulid::Ulid;
|
||||||
@@ -267,6 +267,53 @@ impl RunnableJob for CleanupUserRegistrationsJob {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RunnableJob for CleanupFinishedCompatSessionsJob {
|
||||||
|
#[tracing::instrument(name = "job.cleanup_finished_compat_sessions", skip_all)]
|
||||||
|
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
|
||||||
|
// Cleanup compat sessions that were finished more than 30 days ago
|
||||||
|
let until = state.clock.now() - chrono::Duration::days(30);
|
||||||
|
let mut total = 0;
|
||||||
|
|
||||||
|
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
|
||||||
|
// this is a scheduled job and it will end up being rescheduled later anyway.
|
||||||
|
let mut since = None;
|
||||||
|
while !context.cancellation_token.is_cancelled() {
|
||||||
|
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||||
|
|
||||||
|
// This returns the number of deleted sessions, and the last finished_at
|
||||||
|
// timestamp
|
||||||
|
let (count, last_finished_at) = repo
|
||||||
|
.compat_session()
|
||||||
|
.cleanup_finished(since, until, BATCH_SIZE)
|
||||||
|
.await
|
||||||
|
.map_err(JobError::retry)?;
|
||||||
|
repo.save().await.map_err(JobError::retry)?;
|
||||||
|
|
||||||
|
since = last_finished_at;
|
||||||
|
total += count;
|
||||||
|
|
||||||
|
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
|
||||||
|
// there might be more to delete
|
||||||
|
if count != BATCH_SIZE {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if total == 0 {
|
||||||
|
debug!("no finished compat sessions to clean up");
|
||||||
|
} else {
|
||||||
|
info!(count = total, "cleaned up finished compat sessions");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn timeout(&self) -> Option<Duration> {
|
||||||
|
Some(Duration::from_secs(60))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl RunnableJob for PruneStalePolicyDataJob {
|
impl RunnableJob for PruneStalePolicyDataJob {
|
||||||
#[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)]
|
#[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)]
|
||||||
|
|||||||
@@ -134,6 +134,7 @@ pub async fn init(
|
|||||||
.register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
|
.register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
|
||||||
.register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
|
.register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
|
||||||
.register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
|
.register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
|
||||||
|
.register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
|
||||||
.register_handler::<mas_storage::queue::DeactivateUserJob>()
|
.register_handler::<mas_storage::queue::DeactivateUserJob>()
|
||||||
.register_handler::<mas_storage::queue::DeleteDeviceJob>()
|
.register_handler::<mas_storage::queue::DeleteDeviceJob>()
|
||||||
.register_handler::<mas_storage::queue::ProvisionDeviceJob>()
|
.register_handler::<mas_storage::queue::ProvisionDeviceJob>()
|
||||||
@@ -173,6 +174,12 @@ pub async fn init(
|
|||||||
"0 30 * * * *".parse()?,
|
"0 30 * * * *".parse()?,
|
||||||
mas_storage::queue::CleanupUserRegistrationsJob,
|
mas_storage::queue::CleanupUserRegistrationsJob,
|
||||||
)
|
)
|
||||||
|
.add_schedule(
|
||||||
|
"cleanup-finished-compat-sessions",
|
||||||
|
// Run this job every hour
|
||||||
|
"0 40 * * * *".parse()?,
|
||||||
|
mas_storage::queue::CleanupFinishedCompatSessionsJob,
|
||||||
|
)
|
||||||
.add_schedule(
|
.add_schedule(
|
||||||
"cleanup-expired-oauth-access-tokens",
|
"cleanup-expired-oauth-access-tokens",
|
||||||
// Run this job every 4 hours
|
// Run this job every 4 hours
|
||||||
|
|||||||
Reference in New Issue
Block a user