Implement cleanup job for email authentications

Add scheduled cleanup job that removes old user email authentications
after 7 days. Runs every hour.
This commit is contained in:
Quentin Gliech
2026-01-16 13:26:18 +01:00
parent e6e793f46f
commit f350b94918
6 changed files with 186 additions and 2 deletions

View File

@@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH\n to_delete AS (\n SELECT user_email_authentication_id\n FROM user_email_authentications\n WHERE ($1::uuid IS NULL OR user_email_authentication_id > $1)\n AND user_email_authentication_id <= $2\n ORDER BY user_email_authentication_id\n LIMIT $3\n ),\n deleted_codes AS (\n DELETE FROM user_email_authentication_codes\n USING to_delete\n WHERE user_email_authentication_codes.user_email_authentication_id = to_delete.user_email_authentication_id\n RETURNING user_email_authentication_codes.user_email_authentication_code_id\n )\n DELETE FROM user_email_authentications\n USING to_delete\n WHERE user_email_authentications.user_email_authentication_id = to_delete.user_email_authentication_id\n RETURNING user_email_authentications.user_email_authentication_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "user_email_authentication_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Int8"
]
},
"nullable": [
false
]
},
"hash": "e02ea83d195cb58fa8525e66a6ac1dddae3f1dfb1ef48494f6aee3fd03abe6f6"
}

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
//
@@ -757,4 +758,60 @@ impl UserEmailRepository for PgUserEmailRepository<'_> {
user_email_authentication.completed_at = Some(completed_at);
Ok(user_email_authentication)
}
#[tracing::instrument(
name = "db.user_email.cleanup_authentications",
skip_all,
fields(
db.query.text,
since = since.map(tracing::field::display),
until = %until,
limit = limit,
),
err,
)]
async fn cleanup_authentications(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error> {
// Use ULID cursor-based pagination. Since ULIDs contain a timestamp,
// we can efficiently delete old authentications without needing an index.
// `MAX(uuid)` isn't a thing in Postgres, so we aggregate on the client side.
let res = sqlx::query_scalar!(
r#"
WITH
to_delete AS (
SELECT user_email_authentication_id
FROM user_email_authentications
WHERE ($1::uuid IS NULL OR user_email_authentication_id > $1)
AND user_email_authentication_id <= $2
ORDER BY user_email_authentication_id
LIMIT $3
),
deleted_codes AS (
DELETE FROM user_email_authentication_codes
USING to_delete
WHERE user_email_authentication_codes.user_email_authentication_id = to_delete.user_email_authentication_id
RETURNING user_email_authentication_codes.user_email_authentication_code_id
)
DELETE FROM user_email_authentications
USING to_delete
WHERE user_email_authentications.user_email_authentication_id = to_delete.user_email_authentication_id
RETURNING user_email_authentications.user_email_authentication_id
"#,
since.map(Uuid::from),
Uuid::from(until),
i64::try_from(limit).unwrap_or(i64::MAX)
)
.traced()
.fetch_all(&mut *self.conn)
.await?;
let count = res.len();
let max_id = res.into_iter().max();
Ok((count, max_id.map(Ulid::from)))
}
}

View File

@@ -390,6 +390,14 @@ impl InsertableJob for CleanupUserRecoverySessionsJob {
const QUEUE_NAME: &'static str = "cleanup-user-recovery-sessions";
}
/// Cleanup old user email authentications
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct CleanupUserEmailAuthenticationsJob;
impl InsertableJob for CleanupUserEmailAuthenticationsJob {
const QUEUE_NAME: &'static str = "cleanup-user-email-authentications";
}
/// Scheduled job to expire inactive sessions
///
/// This job will trigger jobs to expire inactive compat, oauth and user

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
//
@@ -334,6 +335,32 @@ pub trait UserEmailRepository: Send + Sync {
authentication: UserEmailAuthentication,
upstream_oauth_authorization_session: &UpstreamOAuthAuthorizationSession,
) -> Result<UserEmailAuthentication, Self::Error>;
/// Cleanup old email authentications
///
/// This will delete email authentications with IDs up to and including
/// `until`. Uses ULID cursor-based pagination for efficiency.
/// Authentication codes will cascade-delete automatically.
///
/// Returns the number of authentications deleted and the cursor for the
/// next batch
///
/// # Parameters
///
/// * `since`: The cursor to start from (exclusive), or `None` to start from
/// the beginning
/// * `until`: The maximum ULID to delete (inclusive upper bound)
/// * `limit`: The maximum number of authentications to delete in this batch
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn cleanup_authentications(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
}
repository_impl!(UserEmailRepository:
@@ -409,4 +436,11 @@ repository_impl!(UserEmailRepository:
authentication: UserEmailAuthentication,
upstream_oauth_authorization_session: &UpstreamOAuthAuthorizationSession,
) -> Result<UserEmailAuthentication, Self::Error>;
async fn cleanup_authentications(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
);

View File

@@ -14,8 +14,8 @@ use mas_storage::queue::{
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob,
CleanupOAuthDeviceCodeGrantsJob, CleanupRevokedOAuthAccessTokensJob,
CleanupRevokedOAuthRefreshTokensJob, CleanupUserRecoverySessionsJob,
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
CleanupRevokedOAuthRefreshTokensJob, CleanupUserEmailAuthenticationsJob,
CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
};
use tracing::{debug, info};
use ulid::Ulid;
@@ -273,6 +273,60 @@ impl RunnableJob for CleanupUserRecoverySessionsJob {
}
}
#[async_trait]
impl RunnableJob for CleanupUserEmailAuthenticationsJob {
#[tracing::instrument(name = "job.cleanup_user_email_authentications", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove email authentications after 7 days. They are in practice only
// valid for a short time (codes expire after 10 minutes), but keeping
// them around helps investigate abuse patterns.
let until = state.clock.now() - chrono::Duration::days(7);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted authentications, and the greatest ULID
// processed
let (count, cursor) = repo
.user_email()
.cleanup_authentications(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user email authentications to clean up");
} else {
info!(count = total, "cleaned up user email authentications");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUserRegistrationsJob {
#[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)]

View File

@@ -138,6 +138,7 @@ pub async fn init(
.register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
.register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
.register_handler::<mas_storage::queue::CleanupUserRecoverySessionsJob>()
.register_handler::<mas_storage::queue::CleanupUserEmailAuthenticationsJob>()
.register_handler::<mas_storage::queue::DeactivateUserJob>()
.register_handler::<mas_storage::queue::DeleteDeviceJob>()
.register_handler::<mas_storage::queue::ProvisionDeviceJob>()
@@ -201,6 +202,12 @@ pub async fn init(
"0 56 * * * *".parse()?,
mas_storage::queue::CleanupUserRecoverySessionsJob,
)
.add_schedule(
"cleanup-user-email-authentications",
// Run this job every hour
"0 57 * * * *".parse()?,
mas_storage::queue::CleanupUserEmailAuthenticationsJob,
)
.add_schedule(
"cleanup-expired-oauth-access-tokens",
// Run this job every 4 hours