Implement cleanup jobs for upstream OAuth sessions and links

Add two cleanup jobs scheduled hourly:

1. Upstream OAuth authorization sessions - removes sessions after 7 days
2. Orphaned upstream OAuth links - removes links after 7 days where user_id IS NULL. These are links created during upstream OAuth 2.0 login but never associated with a user.
This commit is contained in:
Quentin Gliech
2026-01-16 13:26:29 +01:00
parent f350b94918
commit e7c07a8f88
10 changed files with 342 additions and 1 deletions

View File

@@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH to_delete AS (\n SELECT upstream_oauth_link_id\n FROM upstream_oauth_links\n WHERE user_id IS NULL\n AND ($1::uuid IS NULL OR upstream_oauth_link_id > $1)\n AND upstream_oauth_link_id <= $2\n ORDER BY upstream_oauth_link_id\n LIMIT $3\n )\n DELETE FROM upstream_oauth_links\n USING to_delete\n WHERE upstream_oauth_links.upstream_oauth_link_id = to_delete.upstream_oauth_link_id\n RETURNING upstream_oauth_links.upstream_oauth_link_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "upstream_oauth_link_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Int8"
]
},
"nullable": [
false
]
},
"hash": "67b4a124ea3e12902dc4256cf95950508f7eb24f4c9d62b34815c8e8940e4676"
}

View File

@@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH to_delete AS (\n SELECT upstream_oauth_authorization_session_id\n FROM upstream_oauth_authorization_sessions\n WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1)\n AND upstream_oauth_authorization_session_id <= $2\n ORDER BY upstream_oauth_authorization_session_id\n LIMIT $3\n )\n DELETE FROM upstream_oauth_authorization_sessions\n USING to_delete\n WHERE upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id = to_delete.upstream_oauth_authorization_session_id\n RETURNING upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "upstream_oauth_authorization_session_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Int8"
]
},
"nullable": [
false
]
},
"hash": "da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35"
}

View File

@@ -0,0 +1,10 @@
-- no-transaction
-- Copyright 2026 Element Creations Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
-- Please see LICENSE files in the repository root for full details.
-- Add a partial index for the cleanup of orphaned upstream OAuth links.
--
-- Only rows with `user_id IS NULL` are indexed: this is the predicate the
-- orphaned-link cleanup query filters on while it paginates over
-- `upstream_oauth_link_id`.
--
-- `CREATE INDEX CONCURRENTLY` cannot run inside a transaction block, which is
-- why this migration starts with the `-- no-transaction` marker.
-- `IF NOT EXISTS` turns the statement into a no-op when the index is already
-- present.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_upstream_oauth_links_orphaned
ON upstream_oauth_links (upstream_oauth_link_id)
WHERE user_id IS NULL;

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
//
@@ -442,4 +443,54 @@ impl UpstreamOAuthLinkRepository for PgUpstreamOAuthLinkRepository<'_> {
Ok(())
}
/// Delete one batch of orphaned upstream OAuth links.
///
/// Removes up to `limit` links that have no user (`user_id IS NULL`) and whose
/// ID is greater than `since` (when given) and at most `until`.
///
/// Returns the number of deleted rows together with the greatest deleted ID,
/// or `None` for the latter when nothing was deleted.
///
/// # Errors
///
/// Returns an error if the underlying database query fails.
#[tracing::instrument(
    name = "db.upstream_oauth_link.cleanup_orphaned",
    skip_all,
    fields(
        db.query.text,
        // With `skip_all`, a bare field name only declares an empty field, so
        // the pagination parameters are recorded explicitly here.
        since = since.map(tracing::field::display),
        until = %until,
        limit = limit,
    ),
    err,
)]
async fn cleanup_orphaned(
    &mut self,
    since: Option<Ulid>,
    until: Ulid,
    limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error> {
    // ULID cursor-based pagination over orphaned links only: `since` is an
    // exclusive lower bound, `until` an inclusive upper bound.
    // `MAX(uuid)` isn't a thing in Postgres, so the greatest deleted ID is
    // computed on the client side from the returned rows.
    //
    // NOTE(review): the checked-in sqlx query metadata appears to be keyed on
    // this exact query text; regenerate it if the SQL is ever changed.
    let res = sqlx::query_scalar!(
        r#"
WITH to_delete AS (
SELECT upstream_oauth_link_id
FROM upstream_oauth_links
WHERE user_id IS NULL
AND ($1::uuid IS NULL OR upstream_oauth_link_id > $1)
AND upstream_oauth_link_id <= $2
ORDER BY upstream_oauth_link_id
LIMIT $3
)
DELETE FROM upstream_oauth_links
USING to_delete
WHERE upstream_oauth_links.upstream_oauth_link_id = to_delete.upstream_oauth_link_id
RETURNING upstream_oauth_links.upstream_oauth_link_id
"#,
        since.map(Uuid::from),
        Uuid::from(until),
        // A limit that does not fit in an `i64` is clamped to the maximum.
        i64::try_from(limit).unwrap_or(i64::MAX)
    )
    .traced()
    .fetch_all(&mut *self.conn)
    .await?;

    let count = res.len();
    // The rows returned by `DELETE … RETURNING` are reduced with `max()`
    // rather than relying on their order.
    let max_id = res.into_iter().max();
    Ok((count, max_id.map(Ulid::from)))
}
}

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
//
@@ -564,4 +565,53 @@ impl UpstreamOAuthSessionRepository for PgUpstreamOAuthSessionRepository<'_> {
.try_into()
.map_err(DatabaseError::to_invalid_operation)
}
/// Delete one batch of upstream OAuth authorization sessions.
///
/// Removes up to `limit` sessions whose ID is greater than `since` (when
/// given) and at most `until`.
///
/// Returns the number of deleted rows together with the greatest deleted ID,
/// or `None` for the latter when nothing was deleted.
///
/// # Errors
///
/// Returns an error if the underlying database query fails.
#[tracing::instrument(
    name = "db.upstream_oauth_authorization_session.cleanup",
    skip_all,
    fields(
        db.query.text,
        since = since.map(tracing::field::display),
        until = %until,
        limit = limit,
    ),
    err,
)]
async fn cleanup(
    &mut self,
    since: Option<Ulid>,
    until: Ulid,
    limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error> {
    // ULID cursor-based pagination: `since` is an exclusive lower bound,
    // `until` an inclusive upper bound.
    //
    // The query has no predicate on the session state, so *every* session in
    // the ID range is deleted, completed or not.
    // NOTE(review): earlier comments here said only pending sessions were
    // removed; confirm whether a state filter is missing or deleting all old
    // sessions is intended.
    //
    // `MAX(uuid)` isn't a thing in Postgres, so the greatest deleted ID is
    // computed on the client side from the returned rows.
    let res = sqlx::query_scalar!(
        r#"
WITH to_delete AS (
SELECT upstream_oauth_authorization_session_id
FROM upstream_oauth_authorization_sessions
WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1)
AND upstream_oauth_authorization_session_id <= $2
ORDER BY upstream_oauth_authorization_session_id
LIMIT $3
)
DELETE FROM upstream_oauth_authorization_sessions
USING to_delete
WHERE upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id = to_delete.upstream_oauth_authorization_session_id
RETURNING upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id
"#,
        since.map(Uuid::from),
        Uuid::from(until),
        // A limit that does not fit in an `i64` is clamped to the maximum.
        i64::try_from(limit).unwrap_or(i64::MAX)
    )
    .traced()
    .fetch_all(&mut *self.conn)
    .await?;

    let deleted = res.len();
    let cursor = res.into_iter().max().map(Ulid::from);
    Ok((deleted, cursor))
}
}

View File

@@ -398,6 +398,22 @@ impl InsertableJob for CleanupUserEmailAuthenticationsJob {
const QUEUE_NAME: &'static str = "cleanup-user-email-authentications";
}
/// Cleanup old upstream OAuth authorization sessions
///
/// This job carries no payload; its queue name is
/// `cleanup-upstream-oauth-sessions`.
///
/// NOTE(review): this was described as cleaning up *pending* sessions only,
/// but the repository query introduced with it does not filter on the session
/// state. Confirm which behaviour is intended.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct CleanupUpstreamOAuthSessionsJob;
impl InsertableJob for CleanupUpstreamOAuthSessionsJob {
const QUEUE_NAME: &'static str = "cleanup-upstream-oauth-sessions";
}
/// Cleanup orphaned upstream OAuth links
///
/// This job carries no payload; its queue name is
/// `cleanup-upstream-oauth-links`.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct CleanupUpstreamOAuthLinksJob;
impl InsertableJob for CleanupUpstreamOAuthLinksJob {
const QUEUE_NAME: &'static str = "cleanup-upstream-oauth-links";
}
/// Scheduled job to expire inactive sessions
///
/// This job will trigger jobs to expire inactive compat, oauth and user

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
//
@@ -216,6 +217,30 @@ pub trait UpstreamOAuthLinkRepository: Send + Sync {
clock: &dyn Clock,
upstream_oauth_link: UpstreamOAuthLink,
) -> Result<(), Self::Error>;
/// Cleanup orphaned upstream OAuth links
///
/// This will delete orphaned links (where `user_id IS NULL`) with IDs up to
/// and including `until`. Uses ULID cursor-based pagination for efficiency.
///
/// Returns the number of links deleted in this batch and the cursor to pass
/// as `since` for the next batch. The PostgreSQL implementation returns the
/// greatest deleted ID as the cursor, or `None` when nothing was deleted.
///
/// # Parameters
///
/// * `since`: The cursor to start from (exclusive), or `None` to start from
/// the beginning
/// * `until`: The maximum ULID to delete (inclusive upper bound)
/// * `limit`: The maximum number of links to delete in this batch
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn cleanup_orphaned(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
}
repository_impl!(UpstreamOAuthLinkRepository:
@@ -251,4 +276,11 @@ repository_impl!(UpstreamOAuthLinkRepository:
async fn count(&mut self, filter: UpstreamOAuthLinkFilter<'_>) -> Result<usize, Self::Error>;
async fn remove(&mut self, clock: &dyn Clock, upstream_oauth_link: UpstreamOAuthLink) -> Result<(), Self::Error>;
async fn cleanup_orphaned(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
);

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
//
@@ -205,6 +206,29 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync {
/// Returns [`Self::Error`] if the underlying repository fails
async fn count(&mut self, filter: UpstreamOAuthSessionFilter<'_>)
-> Result<usize, Self::Error>;
/// Cleanup old authorization sessions
///
/// This will delete sessions with IDs up to and including `until`. No filter
/// on the session state is part of this contract.
///
/// Returns the number of sessions deleted in this batch and the cursor to
/// pass as `since` for the next batch. The PostgreSQL implementation returns
/// the greatest deleted ID as the cursor, or `None` when nothing was deleted.
///
/// # Parameters
///
/// * `since`: The cursor to start from (exclusive), or `None` to start from
/// the beginning
/// * `until`: The maximum ULID to delete (inclusive upper bound)
/// * `limit`: The maximum number of sessions to delete in this batch
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn cleanup(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
}
repository_impl!(UpstreamOAuthSessionRepository:
@@ -247,4 +271,11 @@ repository_impl!(UpstreamOAuthSessionRepository:
) -> Result<Page<UpstreamOAuthAuthorizationSession>, Self::Error>;
async fn count(&mut self, filter: UpstreamOAuthSessionFilter<'_>) -> Result<usize, Self::Error>;
async fn cleanup(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
);

View File

@@ -14,7 +14,8 @@ use mas_storage::queue::{
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob,
CleanupOAuthDeviceCodeGrantsJob, CleanupRevokedOAuthAccessTokensJob,
CleanupRevokedOAuthRefreshTokensJob, CleanupUserEmailAuthenticationsJob,
CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob,
CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob,
CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
};
use tracing::{debug, info};
@@ -327,6 +328,94 @@ impl RunnableJob for CleanupUserEmailAuthenticationsJob {
}
}
#[async_trait]
impl RunnableJob for CleanupUpstreamOAuthSessionsJob {
    /// Deletes upstream OAuth authorization sessions older than 7 days, one
    /// batch of `BATCH_SIZE` rows per transaction, until a batch comes back
    /// short or the job is cancelled.
    #[tracing::instrument(name = "job.cleanup_upstream_oauth_sessions", skip_all)]
    async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
        // Anything with an ID generated more than 7 days ago is eligible.
        // A pre-epoch timestamp is clamped to 0, and the random part of the
        // ULID is maxed out to build the upper bound for that millisecond.
        let cutoff = state.clock.now() - chrono::Duration::days(7);
        let cutoff_millis = u64::try_from(cutoff.timestamp_millis()).unwrap_or(u64::MIN);
        let until = Ulid::from_parts(cutoff_millis, u128::MAX);

        let mut deleted = 0;
        let mut cursor = None;
        loop {
            if context.cancellation_token.is_cancelled() {
                break;
            }

            // Each batch gets its own repository, saved right after the batch.
            let mut repo = state.repository().await.map_err(JobError::retry)?;
            let (batch, next) = repo
                .upstream_oauth_session()
                .cleanup(cursor, until, BATCH_SIZE)
                .await
                .map_err(JobError::retry)?;
            repo.save().await.map_err(JobError::retry)?;

            cursor = next;
            deleted += batch;

            // A short batch means there is nothing left in the range.
            if batch != BATCH_SIZE {
                break;
            }
        }

        match deleted {
            0 => debug!("no pending upstream OAuth sessions to clean up"),
            n => info!(count = n, "cleaned up pending upstream OAuth sessions"),
        }

        Ok(())
    }

    /// Caps a single run at 10 minutes.
    fn timeout(&self) -> Option<Duration> {
        // The job is scheduled hourly, so letting it run for up to 10 minutes
        // is fine.
        Some(Duration::from_secs(10 * 60))
    }
}
#[async_trait]
impl RunnableJob for CleanupUpstreamOAuthLinksJob {
    /// Deletes orphaned upstream OAuth links older than 7 days, one batch of
    /// `BATCH_SIZE` rows per transaction, until a batch comes back short or
    /// the job is cancelled.
    #[tracing::instrument(name = "job.cleanup_upstream_oauth_links", skip_all)]
    async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
        // Anything with an ID generated more than 7 days ago is eligible.
        // A pre-epoch timestamp is clamped to 0, and the random part of the
        // ULID is maxed out to build the upper bound for that millisecond.
        let cutoff = state.clock.now() - chrono::Duration::days(7);
        let cutoff_millis = u64::try_from(cutoff.timestamp_millis()).unwrap_or(u64::MIN);
        let until = Ulid::from_parts(cutoff_millis, u128::MAX);

        let mut deleted = 0;
        let mut cursor = None;
        loop {
            if context.cancellation_token.is_cancelled() {
                break;
            }

            // Each batch gets its own repository, saved right after the batch.
            let mut repo = state.repository().await.map_err(JobError::retry)?;
            let (batch, next) = repo
                .upstream_oauth_link()
                .cleanup_orphaned(cursor, until, BATCH_SIZE)
                .await
                .map_err(JobError::retry)?;
            repo.save().await.map_err(JobError::retry)?;

            cursor = next;
            deleted += batch;

            // A short batch means there is nothing left in the range.
            if batch != BATCH_SIZE {
                break;
            }
        }

        match deleted {
            0 => debug!("no orphaned upstream OAuth links to clean up"),
            n => info!(count = n, "cleaned up orphaned upstream OAuth links"),
        }

        Ok(())
    }

    /// Caps a single run at 10 minutes.
    fn timeout(&self) -> Option<Duration> {
        // The job is scheduled hourly, so letting it run for up to 10 minutes
        // is fine.
        Some(Duration::from_secs(10 * 60))
    }
}
#[async_trait]
impl RunnableJob for CleanupUserRegistrationsJob {
#[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)]

View File

@@ -139,6 +139,8 @@ pub async fn init(
.register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
.register_handler::<mas_storage::queue::CleanupUserRecoverySessionsJob>()
.register_handler::<mas_storage::queue::CleanupUserEmailAuthenticationsJob>()
.register_handler::<mas_storage::queue::CleanupUpstreamOAuthSessionsJob>()
.register_handler::<mas_storage::queue::CleanupUpstreamOAuthLinksJob>()
.register_handler::<mas_storage::queue::DeactivateUserJob>()
.register_handler::<mas_storage::queue::DeleteDeviceJob>()
.register_handler::<mas_storage::queue::ProvisionDeviceJob>()
@@ -208,6 +210,18 @@ pub async fn init(
"0 57 * * * *".parse()?,
mas_storage::queue::CleanupUserEmailAuthenticationsJob,
)
.add_schedule(
"cleanup-upstream-oauth-sessions",
// Run this job every hour
"0 58 * * * *".parse()?,
mas_storage::queue::CleanupUpstreamOAuthSessionsJob,
)
.add_schedule(
"cleanup-upstream-oauth-links",
// Run this job every hour
"0 59 * * * *".parse()?,
mas_storage::queue::CleanupUpstreamOAuthLinksJob,
)
.add_schedule(
"cleanup-expired-oauth-access-tokens",
// Run this job every 4 hours