Cleanup finished OAuth 2.0 sessions (#5443)
This commit is contained in:
30
crates/storage-pg/.sqlx/query-d8f0b02952e786dd4309eac9de04a359aea3a46e5d4e07764cec56ce5d6609c0.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-d8f0b02952e786dd4309eac9de04a359aea3a46e5d4e07764cec56ce5d6609c0.json
generated
Normal file
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH\n to_delete AS (\n SELECT oauth2_session_id, finished_at\n FROM oauth2_sessions\n WHERE finished_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR finished_at >= $1)\n AND finished_at < $2\n ORDER BY finished_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n deleted_refresh_tokens AS (\n DELETE FROM oauth2_refresh_tokens USING to_delete\n WHERE oauth2_refresh_tokens.oauth2_session_id = to_delete.oauth2_session_id\n ),\n deleted_access_tokens AS (\n DELETE FROM oauth2_access_tokens USING to_delete\n WHERE oauth2_access_tokens.oauth2_session_id = to_delete.oauth2_session_id\n ),\n deleted_sessions AS (\n DELETE FROM oauth2_sessions USING to_delete\n WHERE oauth2_sessions.oauth2_session_id = to_delete.oauth2_session_id\n RETURNING oauth2_sessions.finished_at\n )\n SELECT COUNT(*) as \"count!\", MAX(finished_at) as last_finished_at FROM deleted_sessions\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "count!",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "last_finished_at",
|
||||
"type_info": "Timestamptz"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "d8f0b02952e786dd4309eac9de04a359aea3a46e5d4e07764cec56ce5d6609c0"
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
-- no-transaction
|
||||
-- Copyright 2026 Element Creations Ltd.
|
||||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
-- Please see LICENSE files in the repository root for full details.
|
||||
|
||||
-- Adds a partial index on oauth2_sessions.finished_at to help cleaning up
|
||||
-- finished sessions
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS "oauth2_sessions_finished_at_idx"
|
||||
ON "oauth2_sessions" ("finished_at")
|
||||
WHERE "finished_at" IS NOT NULL;
|
||||
@@ -1,3 +1,4 @@
|
||||
// Copyright 2025, 2026 Element Creations Ltd.
|
||||
// Copyright 2024, 2025 New Vector Ltd.
|
||||
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
@@ -592,4 +593,63 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> {
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "db.oauth2_session.cleanup_finished",
|
||||
skip_all,
|
||||
fields(
|
||||
db.query.text,
|
||||
since = since.map(tracing::field::display),
|
||||
until = %until,
|
||||
limit = limit,
|
||||
),
|
||||
err,
|
||||
)]
|
||||
async fn cleanup_finished(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
until: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
WITH
|
||||
to_delete AS (
|
||||
SELECT oauth2_session_id, finished_at
|
||||
FROM oauth2_sessions
|
||||
WHERE finished_at IS NOT NULL
|
||||
AND ($1::timestamptz IS NULL OR finished_at >= $1)
|
||||
AND finished_at < $2
|
||||
ORDER BY finished_at ASC
|
||||
LIMIT $3
|
||||
FOR UPDATE
|
||||
),
|
||||
deleted_refresh_tokens AS (
|
||||
DELETE FROM oauth2_refresh_tokens USING to_delete
|
||||
WHERE oauth2_refresh_tokens.oauth2_session_id = to_delete.oauth2_session_id
|
||||
),
|
||||
deleted_access_tokens AS (
|
||||
DELETE FROM oauth2_access_tokens USING to_delete
|
||||
WHERE oauth2_access_tokens.oauth2_session_id = to_delete.oauth2_session_id
|
||||
),
|
||||
deleted_sessions AS (
|
||||
DELETE FROM oauth2_sessions USING to_delete
|
||||
WHERE oauth2_sessions.oauth2_session_id = to_delete.oauth2_session_id
|
||||
RETURNING oauth2_sessions.finished_at
|
||||
)
|
||||
SELECT COUNT(*) as "count!", MAX(finished_at) as last_finished_at FROM deleted_sessions
|
||||
"#,
|
||||
since,
|
||||
until,
|
||||
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||
)
|
||||
.traced()
|
||||
.fetch_one(&mut *self.conn)
|
||||
.await?;
|
||||
|
||||
Ok((
|
||||
res.count.try_into().unwrap_or(usize::MAX),
|
||||
res.last_finished_at,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
// Copyright 2025, 2026 Element Creations Ltd.
|
||||
// Copyright 2024, 2025 New Vector Ltd.
|
||||
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
@@ -461,6 +462,29 @@ pub trait OAuth2SessionRepository: Send + Sync {
|
||||
session: Session,
|
||||
human_name: Option<String>,
|
||||
) -> Result<Session, Self::Error>;
|
||||
|
||||
/// Cleanup finished [`Session`]s
|
||||
///
|
||||
/// Deletes sessions finished between `since` and `until`. Returns the
|
||||
/// number of deleted sessions and the timestamp of the last deleted
|
||||
/// session for pagination.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `since`: The earliest finish time to delete (exclusive). If `None`,
|
||||
/// starts from the beginning.
|
||||
/// * `until`: The latest finish time to delete (exclusive)
|
||||
/// * `limit`: Maximum number of sessions to delete in this batch
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`Self::Error`] if the underlying repository fails
|
||||
async fn cleanup_finished(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
until: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
}
|
||||
|
||||
repository_impl!(OAuth2SessionRepository:
|
||||
@@ -526,4 +550,11 @@ repository_impl!(OAuth2SessionRepository:
|
||||
session: Session,
|
||||
human_name: Option<String>,
|
||||
) -> Result<Session, Self::Error>;
|
||||
|
||||
async fn cleanup_finished(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
until: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
);
|
||||
|
||||
@@ -366,6 +366,14 @@ impl InsertableJob for CleanupFinishedCompatSessionsJob {
|
||||
const QUEUE_NAME: &'static str = "cleanup-finished-compat-sessions";
|
||||
}
|
||||
|
||||
/// Cleanup finished OAuth 2.0 sessions
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||
pub struct CleanupFinishedOAuth2SessionsJob;
|
||||
|
||||
impl InsertableJob for CleanupFinishedOAuth2SessionsJob {
|
||||
const QUEUE_NAME: &'static str = "cleanup-finished-oauth2-sessions";
|
||||
}
|
||||
|
||||
/// Cleanup old OAuth 2.0 authorization grants
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||
pub struct CleanupOAuthAuthorizationGrantsJob;
|
||||
|
||||
@@ -12,11 +12,12 @@ use std::time::Duration;
|
||||
use async_trait::async_trait;
|
||||
use mas_storage::queue::{
|
||||
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
|
||||
CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob,
|
||||
CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob,
|
||||
CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob,
|
||||
CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob,
|
||||
CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
||||
CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob,
|
||||
CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob,
|
||||
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
|
||||
CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob,
|
||||
CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob,
|
||||
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
||||
};
|
||||
use tracing::{debug, info};
|
||||
use ulid::Ulid;
|
||||
@@ -561,6 +562,54 @@ impl RunnableJob for CleanupFinishedCompatSessionsJob {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RunnableJob for CleanupFinishedOAuth2SessionsJob {
|
||||
#[tracing::instrument(name = "job.cleanup_finished_oauth2_sessions", skip_all)]
|
||||
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
|
||||
// Cleanup OAuth2 sessions that were finished more than 30 days ago
|
||||
let until = state.clock.now() - chrono::Duration::days(30);
|
||||
let mut total = 0;
|
||||
|
||||
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
|
||||
// this is a scheduled job and it will end up being rescheduled later anyway.
|
||||
let mut since = None;
|
||||
while !context.cancellation_token.is_cancelled() {
|
||||
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||
|
||||
// This returns the number of deleted sessions, and the last finished_at
|
||||
// timestamp
|
||||
let (count, last_finished_at) = repo
|
||||
.oauth2_session()
|
||||
.cleanup_finished(since, until, BATCH_SIZE)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
repo.save().await.map_err(JobError::retry)?;
|
||||
|
||||
since = last_finished_at;
|
||||
total += count;
|
||||
|
||||
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
|
||||
// there might be more to delete
|
||||
if count != BATCH_SIZE {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if total == 0 {
|
||||
debug!("no finished OAuth2 sessions to clean up");
|
||||
} else {
|
||||
info!(count = total, "cleaned up finished OAuth2 sessions");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn timeout(&self) -> Option<Duration> {
|
||||
// This job runs every hour, so having it running it for 10 minutes is fine
|
||||
Some(Duration::from_secs(10 * 60))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RunnableJob for CleanupOAuthAuthorizationGrantsJob {
|
||||
#[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)]
|
||||
|
||||
@@ -135,6 +135,7 @@ pub async fn init(
|
||||
.register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupFinishedOAuth2SessionsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupUserRecoverySessionsJob>()
|
||||
@@ -187,6 +188,12 @@ pub async fn init(
|
||||
"0 40 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupFinishedCompatSessionsJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-finished-oauth2-sessions",
|
||||
// Run this job every hour
|
||||
"0 42 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupFinishedOAuth2SessionsJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-oauth-authorization-grants",
|
||||
// Run this job every hour
|
||||
|
||||
Reference in New Issue
Block a user