Cleanup revoked refresh tokens
This commit is contained in:
30
crates/storage-pg/.sqlx/query-31e8bf68ff70a436fd0b6787ac8e2777f9327708b450d048638a162343478cc6.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-31e8bf68ff70a436fd0b6787ac8e2777f9327708b450d048638a162343478cc6.json
generated
Normal file
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH\n to_delete AS (\n SELECT oauth2_refresh_token_id\n FROM oauth2_refresh_tokens\n WHERE revoked_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR revoked_at >= $1::timestamptz)\n AND revoked_at < $2::timestamptz\n ORDER BY revoked_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n\n deleted AS (\n DELETE FROM oauth2_refresh_tokens\n USING to_delete\n WHERE oauth2_refresh_tokens.oauth2_refresh_token_id = to_delete.oauth2_refresh_token_id\n RETURNING oauth2_refresh_tokens.revoked_at\n )\n\n SELECT\n COUNT(*) as \"count!\",\n MAX(revoked_at) as last_revoked_at\n FROM deleted\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "count!",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "last_revoked_at",
|
||||
"type_info": "Timestamptz"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "31e8bf68ff70a436fd0b6787ac8e2777f9327708b450d048638a162343478cc6"
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
-- no-transaction
|
||||
-- Copyright 2026 Element Creations Ltd.
|
||||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
-- Please see LICENSE in the repository root for full details.
|
||||
|
||||
-- This adds an index on the revoked_at field on oauth2_refresh_tokens to speed up cleaning them up
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS oauth_refresh_tokens_revoked_at_idx
|
||||
ON oauth2_refresh_tokens (revoked_at) WHERE revoked_at IS NOT NULL;
|
||||
@@ -1,3 +1,4 @@
|
||||
// Copyright 2025, 2026 Element Creations Ltd.
|
||||
// Copyright 2024, 2025 New Vector Ltd.
|
||||
// Copyright 2021-2024 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
@@ -281,4 +282,58 @@ impl OAuth2RefreshTokenRepository for PgOAuth2RefreshTokenRepository<'_> {
|
||||
.revoke(revoked_at)
|
||||
.map_err(DatabaseError::to_invalid_operation)
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "db.oauth2_refresh_token.cleanup_revoked",
|
||||
skip_all,
|
||||
fields(
|
||||
db.query.text,
|
||||
),
|
||||
err,
|
||||
)]
|
||||
async fn cleanup_revoked(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
until: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
WITH
|
||||
to_delete AS (
|
||||
SELECT oauth2_refresh_token_id
|
||||
FROM oauth2_refresh_tokens
|
||||
WHERE revoked_at IS NOT NULL
|
||||
AND ($1::timestamptz IS NULL OR revoked_at >= $1::timestamptz)
|
||||
AND revoked_at < $2::timestamptz
|
||||
ORDER BY revoked_at ASC
|
||||
LIMIT $3
|
||||
FOR UPDATE
|
||||
),
|
||||
|
||||
deleted AS (
|
||||
DELETE FROM oauth2_refresh_tokens
|
||||
USING to_delete
|
||||
WHERE oauth2_refresh_tokens.oauth2_refresh_token_id = to_delete.oauth2_refresh_token_id
|
||||
RETURNING oauth2_refresh_tokens.revoked_at
|
||||
)
|
||||
|
||||
SELECT
|
||||
COUNT(*) as "count!",
|
||||
MAX(revoked_at) as last_revoked_at
|
||||
FROM deleted
|
||||
"#,
|
||||
since,
|
||||
until,
|
||||
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||
)
|
||||
.traced()
|
||||
.fetch_one(&mut *self.conn)
|
||||
.await?;
|
||||
|
||||
Ok((
|
||||
res.count.try_into().unwrap_or(usize::MAX),
|
||||
res.last_revoked_at,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
// Copyright 2025, 2026 Element Creations Ltd.
|
||||
// Copyright 2024, 2025 New Vector Ltd.
|
||||
// Copyright 2021-2024 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
@@ -111,6 +112,27 @@ pub trait OAuth2RefreshTokenRepository: Send + Sync {
|
||||
clock: &dyn Clock,
|
||||
refresh_token: RefreshToken,
|
||||
) -> Result<RefreshToken, Self::Error>;
|
||||
|
||||
/// Cleanup revoked refresh tokens that were revoked before a certain time
|
||||
///
|
||||
/// Returns the number of deleted tokens and the last `revoked_at` timestamp
|
||||
/// processed
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `since`: An optional timestamp to start from
|
||||
/// * `until`: The timestamp before which to revoke tokens
|
||||
/// * `limit`: The maximum number of tokens to revoke
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`Self::Error`] if the underlying repository fails
|
||||
async fn cleanup_revoked(
|
||||
&mut self,
|
||||
since: Option<chrono::DateTime<chrono::Utc>>,
|
||||
until: chrono::DateTime<chrono::Utc>,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<chrono::DateTime<chrono::Utc>>), Self::Error>;
|
||||
}
|
||||
|
||||
repository_impl!(OAuth2RefreshTokenRepository:
|
||||
@@ -142,4 +164,11 @@ repository_impl!(OAuth2RefreshTokenRepository:
|
||||
clock: &dyn Clock,
|
||||
refresh_token: RefreshToken,
|
||||
) -> Result<RefreshToken, Self::Error>;
|
||||
|
||||
async fn cleanup_revoked(
|
||||
&mut self,
|
||||
since: Option<chrono::DateTime<chrono::Utc>>,
|
||||
until: chrono::DateTime<chrono::Utc>,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<chrono::DateTime<chrono::Utc>>), Self::Error>;
|
||||
);
|
||||
|
||||
@@ -334,6 +334,14 @@ impl InsertableJob for CleanupExpiredOAuthAccessTokensJob {
|
||||
const QUEUE_NAME: &'static str = "cleanup-expired-oauth-access-tokens";
|
||||
}
|
||||
|
||||
/// Cleanup revoked OAuth 2.0 refresh tokens
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||
pub struct CleanupRevokedOAuthRefreshTokensJob;
|
||||
|
||||
impl InsertableJob for CleanupRevokedOAuthRefreshTokensJob {
|
||||
const QUEUE_NAME: &'static str = "cleanup-revoked-oauth-refresh-tokens";
|
||||
}
|
||||
|
||||
/// Scheduled job to expire inactive sessions
|
||||
///
|
||||
/// This job will trigger jobs to expire inactive compat, oauth and user
|
||||
|
||||
@@ -11,7 +11,8 @@ use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use mas_storage::queue::{
|
||||
CleanupExpiredOAuthAccessTokensJob, CleanupRevokedOAuthAccessTokensJob, PruneStalePolicyDataJob,
|
||||
CleanupExpiredOAuthAccessTokensJob, CleanupRevokedOAuthAccessTokensJob,
|
||||
CleanupRevokedOAuthRefreshTokensJob, PruneStalePolicyDataJob,
|
||||
};
|
||||
use tracing::{debug, info};
|
||||
|
||||
@@ -120,6 +121,52 @@ impl RunnableJob for CleanupExpiredOAuthAccessTokensJob {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RunnableJob for CleanupRevokedOAuthRefreshTokensJob {
|
||||
#[tracing::instrument(name = "job.cleanup_revoked_oauth_refresh_tokens", skip_all)]
|
||||
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
|
||||
// Cleanup tokens that were revoked more than an hour ago
|
||||
let until = state.clock.now() - chrono::Duration::hours(1);
|
||||
let mut total = 0;
|
||||
|
||||
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
|
||||
// this is a scheduled job and it will end up being rescheduled later anyway.
|
||||
let mut since = None;
|
||||
while !context.cancellation_token.is_cancelled() {
|
||||
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||
|
||||
// This returns the number of deleted tokens, and the last revoked_at timestamp
|
||||
let (count, last_revoked_at) = repo
|
||||
.oauth2_refresh_token()
|
||||
.cleanup_revoked(since, until, BATCH_SIZE)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
repo.save().await.map_err(JobError::retry)?;
|
||||
|
||||
since = last_revoked_at;
|
||||
total += count;
|
||||
|
||||
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
|
||||
// there might be more to delete
|
||||
if count != BATCH_SIZE {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if total == 0 {
|
||||
debug!("no token to clean up");
|
||||
} else {
|
||||
info!(count = total, "cleaned up revoked tokens");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn timeout(&self) -> Option<Duration> {
|
||||
Some(Duration::from_secs(60))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RunnableJob for PruneStalePolicyDataJob {
|
||||
#[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)]
|
||||
|
||||
@@ -131,6 +131,7 @@ pub async fn init(
|
||||
worker
|
||||
.register_handler::<mas_storage::queue::CleanupRevokedOAuthAccessTokensJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupExpiredOAuthAccessTokensJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
|
||||
.register_handler::<mas_storage::queue::DeactivateUserJob>()
|
||||
.register_handler::<mas_storage::queue::DeleteDeviceJob>()
|
||||
.register_handler::<mas_storage::queue::ProvisionDeviceJob>()
|
||||
@@ -152,6 +153,12 @@ pub async fn init(
|
||||
"0 0 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupRevokedOAuthAccessTokensJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-revoked-oauth-refresh-tokens",
|
||||
// Run this job every hour
|
||||
"0 10 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-expired-oauth-access-tokens",
|
||||
// Run this job every 4 hours
|
||||
|
||||
Reference in New Issue
Block a user