Implement cleanup job for OAuth2 device code grants

Add cleanup job that removes device code grants older than 7 days.
Uses ULID cursor-based pagination for efficiency.

- Add cleanup method to OAuth2DeviceCodeGrantRepository
- Add CleanupOAuthDeviceCodeGrantsJob task
- Register handler and schedule to run hourly
This commit is contained in:
Quentin Gliech
2026-01-16 12:32:17 +01:00
parent fc07a32a8c
commit 67a0d0e92e
6 changed files with 177 additions and 2 deletions

View File

@@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH to_delete AS (\n SELECT oauth2_device_code_grant_id\n FROM oauth2_device_code_grant\n WHERE ($1::uuid IS NULL OR oauth2_device_code_grant_id > $1)\n AND oauth2_device_code_grant_id <= $2\n ORDER BY oauth2_device_code_grant_id\n LIMIT $3\n )\n DELETE FROM oauth2_device_code_grant\n USING to_delete\n WHERE oauth2_device_code_grant.oauth2_device_code_grant_id = to_delete.oauth2_device_code_grant_id\n RETURNING oauth2_device_code_grant.oauth2_device_code_grant_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "oauth2_device_code_grant_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Int8"
]
},
"nullable": [
false
]
},
"hash": "45d7e962d91fcdcf8284d81d04bc0737c0d20799b497089a566e2ff704d56b67"
}

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
//
@@ -469,4 +470,54 @@ impl OAuth2DeviceCodeGrantRepository for PgOAuth2DeviceCodeGrantRepository<'_> {
Ok(device_code_grant)
}
#[tracing::instrument(
name = "db.oauth2_device_code_grant.cleanup",
skip_all,
fields(
db.query.text,
since = since.map(tracing::field::display),
until = %until,
limit = limit,
),
err,
)]
async fn cleanup(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error> {
// `MAX(uuid)` isn't a thing in Postgres, so we can't just re-select the
// deleted rows and do a MAX on the `oauth2_device_code_grant_id`.
// Instead, we do the aggregation on the client side, which is a little
// less efficient, but good enough.
let res = sqlx::query_scalar!(
r#"
WITH to_delete AS (
SELECT oauth2_device_code_grant_id
FROM oauth2_device_code_grant
WHERE ($1::uuid IS NULL OR oauth2_device_code_grant_id > $1)
AND oauth2_device_code_grant_id <= $2
ORDER BY oauth2_device_code_grant_id
LIMIT $3
)
DELETE FROM oauth2_device_code_grant
USING to_delete
WHERE oauth2_device_code_grant.oauth2_device_code_grant_id = to_delete.oauth2_device_code_grant_id
RETURNING oauth2_device_code_grant.oauth2_device_code_grant_id
"#,
since.map(Uuid::from),
Uuid::from(until),
i64::try_from(limit).unwrap_or(i64::MAX)
)
.traced()
.fetch_all(&mut *self.conn)
.await?;
let count = res.len();
let max_id = res.into_iter().max();
Ok((count, max_id.map(Ulid::from)))
}
}

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
//
@@ -183,6 +184,30 @@ pub trait OAuth2DeviceCodeGrantRepository: Send + Sync {
device_code_grant: DeviceCodeGrant,
session: &Session,
) -> Result<DeviceCodeGrant, Self::Error>;
/// Cleanup old device code grants
///
/// This will delete device code grants that were created before `until`,
/// at most `limit` of them per call. Uses ULID cursor-based pagination for
/// efficiency: grant IDs are ULIDs, so a range of IDs maps to a range of
/// creation times.
///
/// Returns the number of grants deleted and the cursor for the next batch.
/// The cursor is the greatest ID that was deleted, or `None` if nothing was
/// deleted.
///
/// # Parameters
///
/// * `since`: The cursor to start from (exclusive), or `None` to start from
/// the beginning
/// * `until`: The ULID threshold (inclusive) up to which grants are deleted;
/// the scheduled cleanup job derives it from the time 7 days ago
/// * `limit`: The maximum number of grants to delete in this batch
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn cleanup(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
}
repository_impl!(OAuth2DeviceCodeGrantRepository:
@@ -225,4 +250,11 @@ repository_impl!(OAuth2DeviceCodeGrantRepository:
device_code_grant: DeviceCodeGrant,
session: &Session,
) -> Result<DeviceCodeGrant, Self::Error>;
async fn cleanup(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
);

View File

@@ -374,6 +374,14 @@ impl InsertableJob for CleanupOAuthAuthorizationGrantsJob {
const QUEUE_NAME: &'static str = "cleanup-oauth-authorization-grants";
}
/// Cleanup old OAuth 2.0 device code grants
///
/// This job carries no payload. When run, it deletes in batches the device
/// code grants that are more than 7 days old.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct CleanupOAuthDeviceCodeGrantsJob;
impl InsertableJob for CleanupOAuthDeviceCodeGrantsJob {
// Name of the queue this job type is associated with
const QUEUE_NAME: &'static str = "cleanup-oauth-device-code-grants";
}
/// Scheduled job to expire inactive sessions
///
/// This job will trigger jobs to expire inactive compat, oauth and user

View File

@@ -13,8 +13,8 @@ use async_trait::async_trait;
use mas_storage::queue::{
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob,
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
CleanupOAuthDeviceCodeGrantsJob, CleanupRevokedOAuthAccessTokensJob,
CleanupRevokedOAuthRefreshTokensJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
};
use tracing::{debug, info};
use ulid::Ulid;
@@ -373,6 +373,59 @@ impl RunnableJob for CleanupOAuthAuthorizationGrantsJob {
}
}
#[async_trait]
impl RunnableJob for CleanupOAuthDeviceCodeGrantsJob {
    #[tracing::instrument(name = "job.cleanup_oauth_device_code_grants", skip_all)]
    async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
        // Device code grants are only valid for a short time in practice, but
        // we keep them for 7 days since they help investigate abuse patterns.
        let cutoff = state.clock.now() - chrono::Duration::days(7);

        // ULIDs carry their creation time in their first 48 bits, so a ULID
        // built from the cutoff timestamp (with an all-ones random part) can
        // serve as the upper bound of the range to delete.
        let cutoff_millis = u64::try_from(cutoff.timestamp_millis()).unwrap_or(u64::MIN);
        let until = Ulid::from_parts(cutoff_millis, u128::MAX);

        let mut since = None;
        let mut total = 0;

        // Keep deleting batches until we are cancelled or run out of grants.
        // No retry is scheduled on cancellation: this is a scheduled job, so
        // it will be run again later anyway.
        loop {
            if context.cancellation_token.is_cancelled() {
                break;
            }

            let mut repo = state.repository().await.map_err(JobError::retry)?;

            // Yields how many grants were deleted, and the greatest ULID
            // processed in this batch
            let (count, cursor) = repo
                .oauth2_device_code_grant()
                .cleanup(since, until, BATCH_SIZE)
                .await
                .map_err(JobError::retry)?;
            repo.save().await.map_err(JobError::retry)?;

            total += count;
            since = cursor;

            // Only a full batch (exactly BATCH_SIZE deletions) suggests that
            // there might be more left to delete
            if count != BATCH_SIZE {
                break;
            }
        }

        match total {
            0 => debug!("no device code grants to clean up"),
            deleted => info!(count = deleted, "cleaned up device code grants"),
        }

        Ok(())
    }

    fn timeout(&self) -> Option<Duration> {
        // The job is scheduled every hour, so letting a single run take up to
        // 10 minutes is fine
        let ten_minutes = Duration::from_secs(10 * 60);
        Some(ten_minutes)
    }
}
#[async_trait]
impl RunnableJob for PruneStalePolicyDataJob {
#[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)]

View File

@@ -136,6 +136,7 @@ pub async fn init(
.register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
.register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
.register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
.register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
.register_handler::<mas_storage::queue::DeactivateUserJob>()
.register_handler::<mas_storage::queue::DeleteDeviceJob>()
.register_handler::<mas_storage::queue::ProvisionDeviceJob>()
@@ -187,6 +188,12 @@ pub async fn init(
"0 50 * * * *".parse()?,
mas_storage::queue::CleanupOAuthAuthorizationGrantsJob,
)
.add_schedule(
"cleanup-oauth-device-code-grants",
// Run this job every hour
"0 55 * * * *".parse()?,
mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob,
)
.add_schedule(
"cleanup-expired-oauth-access-tokens",
// Run this job every 4 hours