Implement cleanup job for OAuth2 authorization grants
Add cleanup job that removes authorization grants older than 7 days. Uses ULID cursor-based pagination for efficiency. - Add cleanup method to OAuth2AuthorizationGrantRepository trait - Add CleanupOAuthAuthorizationGrantsJob task - Register handler and schedule to run hourly
This commit is contained in:
24
crates/storage-pg/.sqlx/query-d4bc51c30f1119ea9d117fb565ec554d63c8773040679a77e99ac3fa24cec71d.json
generated
Normal file
24
crates/storage-pg/.sqlx/query-d4bc51c30f1119ea9d117fb565ec554d63c8773040679a77e99ac3fa24cec71d.json
generated
Normal file
@@ -0,0 +1,24 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH to_delete AS (\n SELECT oauth2_authorization_grant_id\n FROM oauth2_authorization_grants\n WHERE ($1::uuid IS NULL OR oauth2_authorization_grant_id > $1)\n AND oauth2_authorization_grant_id <= $2\n ORDER BY oauth2_authorization_grant_id\n LIMIT $3\n )\n DELETE FROM oauth2_authorization_grants\n USING to_delete\n WHERE oauth2_authorization_grants.oauth2_authorization_grant_id = to_delete.oauth2_authorization_grant_id\n RETURNING oauth2_authorization_grants.oauth2_authorization_grant_id\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "oauth2_authorization_grant_id",
|
||||
"type_info": "Uuid"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid",
|
||||
"Uuid",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "d4bc51c30f1119ea9d117fb565ec554d63c8773040679a77e99ac3fa24cec71d"
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
// Copyright 2025, 2026 Element Creations Ltd.
|
||||
// Copyright 2024, 2025 New Vector Ltd.
|
||||
// Copyright 2021-2024 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
@@ -448,4 +449,54 @@ impl OAuth2AuthorizationGrantRepository for PgOAuth2AuthorizationGrantRepository
|
||||
|
||||
Ok(grant)
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "db.oauth2_authorization_grant.cleanup",
|
||||
skip_all,
|
||||
fields(
|
||||
db.query.text,
|
||||
since = since.map(tracing::field::display),
|
||||
until = %until,
|
||||
limit = limit,
|
||||
),
|
||||
err,
|
||||
)]
|
||||
async fn cleanup(
|
||||
&mut self,
|
||||
since: Option<Ulid>,
|
||||
until: Ulid,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<Ulid>), Self::Error> {
|
||||
// `MAX(uuid)` isn't a thing in Postgres, so we can't just re-select the
|
||||
// deleted rows and do a MAX on the `oauth2_authorization_grant_id`.
|
||||
// Instead, we do the aggregation on the client side, which is a little
|
||||
// less efficient, but good enough.
|
||||
let res = sqlx::query_scalar!(
|
||||
r#"
|
||||
WITH to_delete AS (
|
||||
SELECT oauth2_authorization_grant_id
|
||||
FROM oauth2_authorization_grants
|
||||
WHERE ($1::uuid IS NULL OR oauth2_authorization_grant_id > $1)
|
||||
AND oauth2_authorization_grant_id <= $2
|
||||
ORDER BY oauth2_authorization_grant_id
|
||||
LIMIT $3
|
||||
)
|
||||
DELETE FROM oauth2_authorization_grants
|
||||
USING to_delete
|
||||
WHERE oauth2_authorization_grants.oauth2_authorization_grant_id = to_delete.oauth2_authorization_grant_id
|
||||
RETURNING oauth2_authorization_grants.oauth2_authorization_grant_id
|
||||
"#,
|
||||
since.map(Uuid::from),
|
||||
Uuid::from(until),
|
||||
i64::try_from(limit).unwrap_or(i64::MAX)
|
||||
)
|
||||
.traced()
|
||||
.fetch_all(&mut *self.conn)
|
||||
.await?;
|
||||
|
||||
let count = res.len();
|
||||
let max_id = res.into_iter().max();
|
||||
|
||||
Ok((count, max_id.map(Ulid::from)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
// Copyright 2025, 2026 Element Creations Ltd.
|
||||
// Copyright 2024, 2025 New Vector Ltd.
|
||||
// Copyright 2021-2024 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
@@ -127,6 +128,30 @@ pub trait OAuth2AuthorizationGrantRepository: Send + Sync {
|
||||
clock: &dyn Clock,
|
||||
authorization_grant: AuthorizationGrant,
|
||||
) -> Result<AuthorizationGrant, Self::Error>;
|
||||
|
||||
/// Cleanup old authorization grants
|
||||
///
|
||||
/// This will delete authorization grants with IDs up to and including
|
||||
/// `until`. Uses ULID cursor-based pagination for efficiency.
|
||||
///
|
||||
/// Returns the number of grants deleted and the cursor for the next batch
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `since`: The cursor to start from (exclusive), or `None` to start from
|
||||
/// the beginning
|
||||
/// * `until`: The maximum ULID to delete (inclusive upper bound)
|
||||
/// * `limit`: The maximum number of grants to delete in this batch
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`Self::Error`] if the underlying repository fails
|
||||
async fn cleanup(
|
||||
&mut self,
|
||||
since: Option<Ulid>,
|
||||
until: Ulid,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<Ulid>), Self::Error>;
|
||||
}
|
||||
|
||||
repository_impl!(OAuth2AuthorizationGrantRepository:
|
||||
@@ -163,4 +188,11 @@ repository_impl!(OAuth2AuthorizationGrantRepository:
|
||||
clock: &dyn Clock,
|
||||
authorization_grant: AuthorizationGrant,
|
||||
) -> Result<AuthorizationGrant, Self::Error>;
|
||||
|
||||
async fn cleanup(
|
||||
&mut self,
|
||||
since: Option<Ulid>,
|
||||
until: Ulid,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<Ulid>), Self::Error>;
|
||||
);
|
||||
|
||||
@@ -366,6 +366,14 @@ impl InsertableJob for CleanupFinishedCompatSessionsJob {
|
||||
const QUEUE_NAME: &'static str = "cleanup-finished-compat-sessions";
|
||||
}
|
||||
|
||||
/// Cleanup old OAuth 2.0 authorization grants
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||
pub struct CleanupOAuthAuthorizationGrantsJob;
|
||||
|
||||
impl InsertableJob for CleanupOAuthAuthorizationGrantsJob {
|
||||
const QUEUE_NAME: &'static str = "cleanup-oauth-authorization-grants";
|
||||
}
|
||||
|
||||
/// Scheduled job to expire inactive sessions
|
||||
///
|
||||
/// This job will trigger jobs to expire inactive compat, oauth and user
|
||||
|
||||
@@ -12,8 +12,9 @@ use std::time::Duration;
|
||||
use async_trait::async_trait;
|
||||
use mas_storage::queue::{
|
||||
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
|
||||
CleanupFinishedCompatSessionsJob, CleanupRevokedOAuthAccessTokensJob,
|
||||
CleanupRevokedOAuthRefreshTokensJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
||||
CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob,
|
||||
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
|
||||
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
||||
};
|
||||
use tracing::{debug, info};
|
||||
use ulid::Ulid;
|
||||
@@ -319,6 +320,59 @@ impl RunnableJob for CleanupFinishedCompatSessionsJob {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RunnableJob for CleanupOAuthAuthorizationGrantsJob {
|
||||
#[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)]
|
||||
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
|
||||
// Remove authorization grants after 7 days. They are in practice only
|
||||
// valid for a short time, but keeping them around helps investigate abuse
|
||||
// patterns.
|
||||
let until = state.clock.now() - chrono::Duration::days(7);
|
||||
// We use the fact that ULIDs include the creation time in their first 48 bits
|
||||
// as a cursor
|
||||
let until = Ulid::from_parts(
|
||||
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
|
||||
u128::MAX,
|
||||
);
|
||||
let mut total = 0;
|
||||
|
||||
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
|
||||
// this is a scheduled job and it will end up being rescheduled later anyway.
|
||||
let mut since = None;
|
||||
while !context.cancellation_token.is_cancelled() {
|
||||
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||
// This returns the number of deleted grants, and the greatest ULID processed
|
||||
let (count, cursor) = repo
|
||||
.oauth2_authorization_grant()
|
||||
.cleanup(since, until, BATCH_SIZE)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
repo.save().await.map_err(JobError::retry)?;
|
||||
since = cursor;
|
||||
total += count;
|
||||
|
||||
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
|
||||
// there might be more to delete
|
||||
if count != BATCH_SIZE {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if total == 0 {
|
||||
debug!("no authorization grants to clean up");
|
||||
} else {
|
||||
info!(count = total, "cleaned up authorization grants");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn timeout(&self) -> Option<Duration> {
|
||||
// This job runs every hour, so having it running it for 10 minutes is fine
|
||||
Some(Duration::from_secs(10 * 60))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RunnableJob for PruneStalePolicyDataJob {
|
||||
#[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)]
|
||||
|
||||
@@ -135,6 +135,7 @@ pub async fn init(
|
||||
.register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
|
||||
.register_handler::<mas_storage::queue::DeactivateUserJob>()
|
||||
.register_handler::<mas_storage::queue::DeleteDeviceJob>()
|
||||
.register_handler::<mas_storage::queue::ProvisionDeviceJob>()
|
||||
@@ -180,6 +181,12 @@ pub async fn init(
|
||||
"0 40 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupFinishedCompatSessionsJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-oauth-authorization-grants",
|
||||
// Run this job every hour
|
||||
"0 50 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupOAuthAuthorizationGrantsJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-expired-oauth-access-tokens",
|
||||
// Run this job every 4 hours
|
||||
|
||||
Reference in New Issue
Block a user