From a059f32f167461e4ea49074bbbef3d65d4b36f90 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 30 Jan 2026 06:28:05 +0100 Subject: [PATCH] Split the cleanup jobs into multiple files --- crates/tasks/src/cleanup/misc.rs | 88 +++ crates/tasks/src/cleanup/mod.rs | 24 + crates/tasks/src/cleanup/oauth.rs | 216 +++++++ crates/tasks/src/cleanup/sessions.rs | 290 +++++++++ crates/tasks/src/cleanup/tokens.rs | 214 +++++++ crates/tasks/src/cleanup/user.rs | 181 ++++++ crates/tasks/src/database.rs | 916 --------------------------- crates/tasks/src/lib.rs | 125 ++-- docs/development/cleanup-jobs.md | 18 +- 9 files changed, 1093 insertions(+), 979 deletions(-) create mode 100644 crates/tasks/src/cleanup/misc.rs create mode 100644 crates/tasks/src/cleanup/mod.rs create mode 100644 crates/tasks/src/cleanup/oauth.rs create mode 100644 crates/tasks/src/cleanup/sessions.rs create mode 100644 crates/tasks/src/cleanup/tokens.rs create mode 100644 crates/tasks/src/cleanup/user.rs delete mode 100644 crates/tasks/src/database.rs diff --git a/crates/tasks/src/cleanup/misc.rs b/crates/tasks/src/cleanup/misc.rs new file mode 100644 index 000000000..52fd62e5c --- /dev/null +++ b/crates/tasks/src/cleanup/misc.rs @@ -0,0 +1,88 @@ +// Copyright 2026 Element Creations Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +//! Miscellaneous cleanup tasks + +use std::time::Duration; + +use async_trait::async_trait; +use mas_storage::queue::{CleanupQueueJobsJob, PruneStalePolicyDataJob}; +use tracing::{debug, info}; +use ulid::Ulid; + +use super::BATCH_SIZE; +use crate::{ + State, + new_queue::{JobContext, JobError, RunnableJob}, +}; + +#[async_trait] +impl RunnableJob for CleanupQueueJobsJob { + #[tracing::instrument(name = "job.cleanup_queue_jobs", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove completed and failed queue jobs after 30 days. + // Keep them for debugging purposes. + let until = state.clock.now() - chrono::Duration::days(30); + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let (count, cursor) = repo + .queue_job() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no queue jobs to clean up"); + } else { + info!(count = total, "cleaned up queue jobs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for PruneStalePolicyDataJob { + #[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)] + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // Keep the last 10 policy data + let count = repo + .policy_data() + .prune(10) + .await + .map_err(JobError::retry)?; + + repo.save().await.map_err(JobError::retry)?; + + if count == 0 { + debug!("no stale policy data to prune"); + } else { + info!(count, "pruned stale policy data"); + } + + Ok(()) + } +} diff --git a/crates/tasks/src/cleanup/mod.rs b/crates/tasks/src/cleanup/mod.rs new file mode 100644 index 000000000..02ace1ded --- /dev/null +++ b/crates/tasks/src/cleanup/mod.rs @@ -0,0 +1,24 @@ +// Copyright 2026 Element Creations Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +//! Database cleanup tasks +//! +//! This module contains tasks for cleaning up old data from the database. +//! Tasks are grouped by domain: +//! +//! - [`tokens`]: OAuth token cleanup (access and refresh tokens) +//! - [`sessions`]: Session cleanup (compat, `OAuth2`, user sessions and their +//! IPs) +//! - [`oauth`]: OAuth grants and upstream OAuth cleanup +//! - [`user`]: User-related cleanup (registrations, recovery, email auth) +//! - [`misc`]: Miscellaneous cleanup (queue jobs, policy data) + +mod misc; +mod oauth; +mod sessions; +mod tokens; +mod user; + +pub(crate) const BATCH_SIZE: usize = 1000; diff --git a/crates/tasks/src/cleanup/oauth.rs b/crates/tasks/src/cleanup/oauth.rs new file mode 100644 index 000000000..2a201d4df --- /dev/null +++ b/crates/tasks/src/cleanup/oauth.rs @@ -0,0 +1,216 @@ +// Copyright 2026 Element Creations Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +//! OAuth grants and upstream OAuth cleanup tasks + +use std::time::Duration; + +use async_trait::async_trait; +use mas_storage::queue::{ + CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, + CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob, +}; +use tracing::{debug, info}; +use ulid::Ulid; + +use super::BATCH_SIZE; +use crate::{ + State, + new_queue::{JobContext, JobError, RunnableJob}, +}; + +#[async_trait] +impl RunnableJob for CleanupOAuthAuthorizationGrantsJob { + #[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove authorization grants after 7 days. They are in practice only + // valid for a short time, but keeping them around helps investigate abuse + // patterns. + let until = state.clock.now() - chrono::Duration::days(7); + // We use the fact that ULIDs include the creation time in their first 48 bits + // as a cursor + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + // This returns the number of deleted grants, and the greatest ULID processed + let (count, cursor) = repo + .oauth2_authorization_grant() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no authorization grants to clean up"); + } else { + info!(count = total, "cleaned up authorization grants"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupOAuthDeviceCodeGrantsJob { + #[tracing::instrument(name = "job.cleanup_oauth_device_code_grants", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove device code grants after 7 days. They are in practice only + // valid for a short time, but keeping them around helps investigate abuse + // patterns. + let until = state.clock.now() - chrono::Duration::days(7); + // We use the fact that ULIDs include the creation time in their first 48 bits + // as a cursor + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + // This returns the number of deleted grants, and the greatest ULID processed + let (count, cursor) = repo + .oauth2_device_code_grant() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no device code grants to clean up"); + } else { + info!(count = total, "cleaned up device code grants"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupUpstreamOAuthSessionsJob { + #[tracing::instrument(name = "job.cleanup_upstream_oauth_sessions", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove pending upstream OAuth authorization sessions after 7 days. + let until = state.clock.now() - chrono::Duration::days(7); + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let (count, cursor) = repo + .upstream_oauth_session() + .cleanup_orphaned(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no pending upstream OAuth sessions to clean up"); + } else { + info!(count = total, "cleaned up pending upstream OAuth sessions"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupUpstreamOAuthLinksJob { + #[tracing::instrument(name = "job.cleanup_upstream_oauth_links", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove orphaned upstream OAuth links after 7 days. + let until = state.clock.now() - chrono::Duration::days(7); + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let (count, cursor) = repo + .upstream_oauth_link() + .cleanup_orphaned(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no orphaned upstream OAuth links to clean up"); + } else { + info!(count = total, "cleaned up orphaned upstream OAuth links"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} diff --git a/crates/tasks/src/cleanup/sessions.rs b/crates/tasks/src/cleanup/sessions.rs new file mode 100644 index 000000000..0a11a6b99 --- /dev/null +++ b/crates/tasks/src/cleanup/sessions.rs @@ -0,0 +1,290 @@ +// Copyright 2026 Element Creations Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +//! Session cleanup tasks + +use std::time::Duration; + +use async_trait::async_trait; +use mas_storage::queue::{ + CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob, + CleanupFinishedUserSessionsJob, CleanupInactiveCompatSessionIpsJob, + CleanupInactiveOAuth2SessionIpsJob, CleanupInactiveUserSessionIpsJob, +}; +use tracing::{debug, info}; + +use super::BATCH_SIZE; +use crate::{ + State, + new_queue::{JobContext, JobError, RunnableJob}, +}; + +#[async_trait] +impl RunnableJob for CleanupFinishedCompatSessionsJob { + #[tracing::instrument(name = "job.cleanup_finished_compat_sessions", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup compat sessions that were finished more than 30 days ago + let until = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted sessions, and the last finished_at + // timestamp + let (count, last_finished_at) = repo + .compat_session() + .cleanup_finished(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_finished_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no finished compat sessions to clean up"); + } else { + info!(count = total, "cleaned up finished compat sessions"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupFinishedOAuth2SessionsJob { + #[tracing::instrument(name = "job.cleanup_finished_oauth2_sessions", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup OAuth2 sessions that were finished more than 30 days ago + let until = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted sessions, and the last finished_at + // timestamp + let (count, last_finished_at) = repo + .oauth2_session() + .cleanup_finished(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_finished_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no finished OAuth2 sessions to clean up"); + } else { + info!(count = total, "cleaned up finished OAuth2 sessions"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupFinishedUserSessionsJob { + #[tracing::instrument(name = "job.cleanup_finished_user_sessions", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup user/browser sessions that were finished more than 30 days ago + let until = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted sessions, and the last finished_at + // timestamp. Only deletes sessions that have no child sessions + // (compat_sessions or oauth2_sessions). + let (count, last_finished_at) = repo + .browser_session() + .cleanup_finished(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_finished_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no finished user sessions to clean up"); + } else { + info!(count = total, "cleaned up finished user sessions"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob { + #[tracing::instrument(name = "job.cleanup_inactive_oauth2_session_ips", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Clear IPs from sessions inactive for 30+ days + let threshold = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let (count, last_active_at) = repo + .oauth2_session() + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_active_at; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no OAuth2 session IPs to clean up"); + } else { + info!(count = total, "cleaned up inactive OAuth2 session IPs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupInactiveCompatSessionIpsJob { + #[tracing::instrument(name = "job.cleanup_inactive_compat_session_ips", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Clear IPs from sessions inactive for 30+ days + let threshold = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let (count, last_active_at) = repo + .compat_session() + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_active_at; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no compat session IPs to clean up"); + } else { + info!(count = total, "cleaned up inactive compat session IPs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupInactiveUserSessionIpsJob { + #[tracing::instrument(name = "job.cleanup_inactive_user_session_ips", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Clear IPs from sessions inactive for 30+ days + let threshold = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let (count, last_active_at) = repo + .browser_session() + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_active_at; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no user session IPs to clean up"); + } else { + info!(count = total, "cleaned up inactive user session IPs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} diff --git a/crates/tasks/src/cleanup/tokens.rs b/crates/tasks/src/cleanup/tokens.rs new file mode 100644 index 000000000..dd91de2b9 --- /dev/null +++ b/crates/tasks/src/cleanup/tokens.rs @@ -0,0 +1,214 @@ +// Copyright 2026 Element Creations Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +//! OAuth token cleanup tasks + +use std::time::Duration; + +use async_trait::async_trait; +use mas_storage::queue::{ + CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, + CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, +}; +use tracing::{debug, info}; + +use super::BATCH_SIZE; +use crate::{ + State, + new_queue::{JobContext, JobError, RunnableJob}, +}; + +#[async_trait] +impl RunnableJob for CleanupRevokedOAuthAccessTokensJob { + #[tracing::instrument(name = "job.cleanup_revoked_oauth_access_tokens", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup tokens that were revoked more than an hour ago + let until = state.clock.now() - chrono::Duration::hours(1); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted tokens, and the last revoked_at timestamp + let (count, last_revoked_at) = repo + .oauth2_access_token() + .cleanup_revoked(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_revoked_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no token to clean up"); + } else { + info!(count = total, "cleaned up revoked tokens"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupExpiredOAuthAccessTokensJob { + #[tracing::instrument(name = "job.cleanup_expired_oauth_access_tokens", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup tokens that expired more than a month ago + // It is important to keep them around for a bit because of refresh + // token idempotency. When we see a refresh token twice, we allow + // reusing it *only* if both the next refresh token and the next access + // tokens were not used. By keeping expired access tokens around for a + // month, we cannot make the *correct* decision, we will assume that the + // token wasn't used. Refer to the token refresh logic for details. + let until = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted tokens, and the last expires_at timestamp + let (count, last_expires_at) = repo + .oauth2_access_token() + .cleanup_expired(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_expires_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no token to clean up"); + } else { + info!(count = total, "cleaned up expired tokens"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupRevokedOAuthRefreshTokensJob { + #[tracing::instrument(name = "job.cleanup_revoked_oauth_refresh_tokens", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup tokens that were revoked more than an hour ago + let until = state.clock.now() - chrono::Duration::hours(1); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted tokens, and the last revoked_at timestamp + let (count, last_revoked_at) = repo + .oauth2_refresh_token() + .cleanup_revoked(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_revoked_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no token to clean up"); + } else { + info!(count = total, "cleaned up revoked tokens"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupConsumedOAuthRefreshTokensJob { + #[tracing::instrument(name = "job.cleanup_consumed_oauth_refresh_tokens", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup tokens that were consumed more than an hour ago + let until = state.clock.now() - chrono::Duration::hours(1); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + // This returns the number of deleted tokens, and the last consumed_at timestamp + let (count, last_consumed_at) = repo + .oauth2_refresh_token() + .cleanup_consumed(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_consumed_at; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no token to clean up"); + } else { + info!(count = total, "cleaned up consumed tokens"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} diff --git a/crates/tasks/src/cleanup/user.rs b/crates/tasks/src/cleanup/user.rs new file mode 100644 index 000000000..d682c1b51 --- /dev/null +++ b/crates/tasks/src/cleanup/user.rs @@ -0,0 +1,181 @@ +// Copyright 2026 Element Creations Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +//! User-related cleanup tasks + +use std::time::Duration; + +use async_trait::async_trait; +use mas_storage::queue::{ + CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, +}; +use tracing::{debug, info}; +use ulid::Ulid; + +use super::BATCH_SIZE; +use crate::{ + State, + new_queue::{JobContext, JobError, RunnableJob}, +}; + +#[async_trait] +impl RunnableJob for CleanupUserRegistrationsJob { + #[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove user registrations after 30 days. They are in practice only + // valid for 1h, but keeping them around helps investigate abuse patterns. + let until = state.clock.now() - chrono::Duration::days(30); + // We use the fact that ULIDs include the creation time in their first 48 bits + // as a cursor + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + // This returns the number of deleted registrations, and the greatest ULID + // processed + let (count, cursor) = repo + .user_registration() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no user registrations to clean up"); + } else { + info!(count = total, "cleaned up user registrations"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupUserRecoverySessionsJob { + #[tracing::instrument(name = "job.cleanup_user_recovery_sessions", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove recovery sessions after 7 days. They are in practice only + // valid for a short time (tickets expire after 10 minutes), but keeping + // them around helps investigate abuse patterns. + let until = state.clock.now() - chrono::Duration::days(7); + // We use the fact that ULIDs include the creation time in their first 48 bits + // as a cursor + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + // This returns the number of deleted sessions, and the greatest ULID processed + let (count, cursor) = repo + .user_recovery() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no user recovery sessions to clean up"); + } else { + info!(count = total, "cleaned up user recovery sessions"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupUserEmailAuthenticationsJob { + #[tracing::instrument(name = "job.cleanup_user_email_authentications", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove email authentications after 7 days. They are in practice only + // valid for a short time (codes expire after 10 minutes), but keeping + // them around helps investigate abuse patterns. + let until = state.clock.now() - chrono::Duration::days(7); + // We use the fact that ULIDs include the creation time in their first 48 bits + // as a cursor + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + // Run until we get cancelled. We don't schedule a retry if we get cancelled, as + // this is a scheduled job and it will end up being rescheduled later anyway. + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + // This returns the number of deleted authentications, and the greatest ULID + // processed + let (count, cursor) = repo + .user_email() + .cleanup_authentications(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + // Check how many we deleted. If we deleted exactly BATCH_SIZE, + // there might be more to delete + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no user email authentications to clean up"); + } else { + info!(count = total, "cleaned up user email authentications"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + // This job runs every hour, so having it running it for 10 minutes is fine + Some(Duration::from_secs(10 * 60)) + } +} diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs deleted file mode 100644 index c3fb46f78..000000000 --- a/crates/tasks/src/database.rs +++ /dev/null @@ -1,916 +0,0 @@ -// Copyright 2025, 2026 Element Creations Ltd. -// Copyright 2024, 2025 New Vector Ltd. -// Copyright 2023, 2024 The Matrix.org Foundation C.I.C. -// -// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial -// Please see LICENSE files in the repository root for full details. - -//! Database-related tasks - -use std::time::Duration; - -use async_trait::async_trait; -use mas_storage::queue::{ - CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, - CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob, - CleanupFinishedUserSessionsJob, CleanupInactiveCompatSessionIpsJob, - CleanupInactiveOAuth2SessionIpsJob, CleanupInactiveUserSessionIpsJob, - CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, - CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, - CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob, - CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, - CleanupUserRegistrationsJob, PruneStalePolicyDataJob, -}; -use tracing::{debug, info}; -use ulid::Ulid; - -use crate::{ - State, - new_queue::{JobContext, JobError, RunnableJob}, -}; - -const BATCH_SIZE: usize = 1000; - -#[async_trait] -impl RunnableJob for CleanupRevokedOAuthAccessTokensJob { - #[tracing::instrument(name = "job.cleanup_revoked_oauth_access_tokens", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Cleanup tokens that were revoked more than an hour ago - let until = state.clock.now() - chrono::Duration::hours(1); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - // This returns the number of deleted tokens, and the last revoked_at timestamp - let (count, last_revoked_at) = repo - .oauth2_access_token() - .cleanup_revoked(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_revoked_at; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no token to clean up"); - } else { - info!(count = total, "cleaned up revoked tokens"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupExpiredOAuthAccessTokensJob { - #[tracing::instrument(name = "job.cleanup_expired_oauth_access_tokens", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Cleanup tokens that expired more than a month ago - // It is important to keep them around for a bit because of refresh - // token idempotency. When we see a refresh token twice, we allow - // reusing it *only* if both the next refresh token and the next access - // tokens were not used. By keeping expired access tokens around for a - // month, we cannot make the *correct* decision, we will assume that the - // token wasn't used. Refer to the token refresh logic for details. - let until = state.clock.now() - chrono::Duration::days(30); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - // This returns the number of deleted tokens, and the last expires_at timestamp - let (count, last_expires_at) = repo - .oauth2_access_token() - .cleanup_expired(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_expires_at; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no token to clean up"); - } else { - info!(count = total, "cleaned up expired tokens"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - Some(Duration::from_secs(60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupRevokedOAuthRefreshTokensJob { - #[tracing::instrument(name = "job.cleanup_revoked_oauth_refresh_tokens", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Cleanup tokens that were revoked more than an hour ago - let until = state.clock.now() - chrono::Duration::hours(1); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - // This returns the number of deleted tokens, and the last revoked_at timestamp - let (count, last_revoked_at) = repo - .oauth2_refresh_token() - .cleanup_revoked(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_revoked_at; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no token to clean up"); - } else { - info!(count = total, "cleaned up revoked tokens"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupConsumedOAuthRefreshTokensJob { - #[tracing::instrument(name = "job.cleanup_consumed_oauth_refresh_tokens", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Cleanup tokens that were consumed more than an hour ago - let until = state.clock.now() - chrono::Duration::hours(1); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - // This returns the number of deleted tokens, and the last consumed_at timestamp - let (count, last_consumed_at) = repo - .oauth2_refresh_token() - .cleanup_consumed(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_consumed_at; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no token to clean up"); - } else { - info!(count = total, "cleaned up consumed tokens"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupUserRecoverySessionsJob { - #[tracing::instrument(name = "job.cleanup_user_recovery_sessions", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Remove recovery sessions after 7 days. They are in practice only - // valid for a short time (tickets expire after 10 minutes), but keeping - // them around helps investigate abuse patterns. - let until = state.clock.now() - chrono::Duration::days(7); - // We use the fact that ULIDs include the creation time in their first 48 bits - // as a cursor - let until = Ulid::from_parts( - u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), - u128::MAX, - ); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - // This returns the number of deleted sessions, and the greatest ULID processed - let (count, cursor) = repo - .user_recovery() - .cleanup(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - since = cursor; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no user recovery sessions to clean up"); - } else { - info!(count = total, "cleaned up user recovery sessions"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupUserEmailAuthenticationsJob { - #[tracing::instrument(name = "job.cleanup_user_email_authentications", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Remove email authentications after 7 days. They are in practice only - // valid for a short time (codes expire after 10 minutes), but keeping - // them around helps investigate abuse patterns. - let until = state.clock.now() - chrono::Duration::days(7); - // We use the fact that ULIDs include the creation time in their first 48 bits - // as a cursor - let until = Ulid::from_parts( - u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), - u128::MAX, - ); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - // This returns the number of deleted authentications, and the greatest ULID - // processed - let (count, cursor) = repo - .user_email() - .cleanup_authentications(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - since = cursor; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no user email authentications to clean up"); - } else { - info!(count = total, "cleaned up user email authentications"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupUpstreamOAuthSessionsJob { - #[tracing::instrument(name = "job.cleanup_upstream_oauth_sessions", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Remove pending upstream OAuth authorization sessions after 7 days. - let until = state.clock.now() - chrono::Duration::days(7); - let until = Ulid::from_parts( - u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), - u128::MAX, - ); - let mut total = 0; - - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - let (count, cursor) = repo - .upstream_oauth_session() - .cleanup_orphaned(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - since = cursor; - total += count; - - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no pending upstream OAuth sessions to clean up"); - } else { - info!(count = total, "cleaned up pending upstream OAuth sessions"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupUpstreamOAuthLinksJob { - #[tracing::instrument(name = "job.cleanup_upstream_oauth_links", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Remove orphaned upstream OAuth links after 7 days. - let until = state.clock.now() - chrono::Duration::days(7); - let until = Ulid::from_parts( - u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), - u128::MAX, - ); - let mut total = 0; - - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - let (count, cursor) = repo - .upstream_oauth_link() - .cleanup_orphaned(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - since = cursor; - total += count; - - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no orphaned upstream OAuth links to clean up"); - } else { - info!(count = total, "cleaned up orphaned upstream OAuth links"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupQueueJobsJob { - #[tracing::instrument(name = "job.cleanup_queue_jobs", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Remove completed and failed queue jobs after 30 days. - // Keep them for debugging purposes. - let until = state.clock.now() - chrono::Duration::days(30); - let until = Ulid::from_parts( - u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), - u128::MAX, - ); - let mut total = 0; - - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - let (count, cursor) = repo - .queue_job() - .cleanup(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - since = cursor; - total += count; - - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no queue jobs to clean up"); - } else { - info!(count = total, "cleaned up queue jobs"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupUserRegistrationsJob { - #[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Remove user registrations after 30 days. They are in practice only - // valid for 1h, but keeping them around helps investigate abuse patterns. - let until = state.clock.now() - chrono::Duration::days(30); - // We use the fact that ULIDs include the creation time in their first 48 bits - // as a cursor - let until = Ulid::from_parts( - u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), - u128::MAX, - ); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - // This returns the number of deleted registrations, and the greatest ULID - // processed - let (count, cursor) = repo - .user_registration() - .cleanup(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - since = cursor; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no user registrations to clean up"); - } else { - info!(count = total, "cleaned up user registrations"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupFinishedCompatSessionsJob { - #[tracing::instrument(name = "job.cleanup_finished_compat_sessions", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Cleanup compat sessions that were finished more than 30 days ago - let until = state.clock.now() - chrono::Duration::days(30); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - // This returns the number of deleted sessions, and the last finished_at - // timestamp - let (count, last_finished_at) = repo - .compat_session() - .cleanup_finished(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_finished_at; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no finished compat sessions to clean up"); - } else { - info!(count = total, "cleaned up finished compat sessions"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupFinishedOAuth2SessionsJob { - #[tracing::instrument(name = "job.cleanup_finished_oauth2_sessions", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Cleanup OAuth2 sessions that were finished more than 30 days ago - let until = state.clock.now() - chrono::Duration::days(30); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - // This returns the number of deleted sessions, and the last finished_at - // timestamp - let (count, last_finished_at) = repo - .oauth2_session() - .cleanup_finished(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_finished_at; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no finished OAuth2 sessions to clean up"); - } else { - info!(count = total, "cleaned up finished OAuth2 sessions"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupFinishedUserSessionsJob { - #[tracing::instrument(name = "job.cleanup_finished_user_sessions", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Cleanup user/browser sessions that were finished more than 30 days ago - let until = state.clock.now() - chrono::Duration::days(30); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - // This returns the number of deleted sessions, and the last finished_at - // timestamp. Only deletes sessions that have no child sessions - // (compat_sessions or oauth2_sessions). - let (count, last_finished_at) = repo - .browser_session() - .cleanup_finished(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_finished_at; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no finished user sessions to clean up"); - } else { - info!(count = total, "cleaned up finished user sessions"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupOAuthAuthorizationGrantsJob { - #[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Remove authorization grants after 7 days. They are in practice only - // valid for a short time, but keeping them around helps investigate abuse - // patterns. - let until = state.clock.now() - chrono::Duration::days(7); - // We use the fact that ULIDs include the creation time in their first 48 bits - // as a cursor - let until = Ulid::from_parts( - u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), - u128::MAX, - ); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - // This returns the number of deleted grants, and the greatest ULID processed - let (count, cursor) = repo - .oauth2_authorization_grant() - .cleanup(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - since = cursor; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no authorization grants to clean up"); - } else { - info!(count = total, "cleaned up authorization grants"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupOAuthDeviceCodeGrantsJob { - #[tracing::instrument(name = "job.cleanup_oauth_device_code_grants", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Remove device code grants after 7 days. They are in practice only - // valid for a short time, but keeping them around helps investigate abuse - // patterns. - let until = state.clock.now() - chrono::Duration::days(7); - // We use the fact that ULIDs include the creation time in their first 48 bits - // as a cursor - let until = Ulid::from_parts( - u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), - u128::MAX, - ); - let mut total = 0; - - // Run until we get cancelled. We don't schedule a retry if we get cancelled, as - // this is a scheduled job and it will end up being rescheduled later anyway. - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - // This returns the number of deleted grants, and the greatest ULID processed - let (count, cursor) = repo - .oauth2_device_code_grant() - .cleanup(since, until, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - since = cursor; - total += count; - - // Check how many we deleted. If we deleted exactly BATCH_SIZE, - // there might be more to delete - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no device code grants to clean up"); - } else { - info!(count = total, "cleaned up device code grants"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - // This job runs every hour, so having it running it for 10 minutes is fine - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for PruneStalePolicyDataJob { - #[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)] - async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - // Keep the last 10 policy data - let count = repo - .policy_data() - .prune(10) - .await - .map_err(JobError::retry)?; - - repo.save().await.map_err(JobError::retry)?; - - if count == 0 { - debug!("no stale policy data to prune"); - } else { - info!(count, "pruned stale policy data"); - } - - Ok(()) - } -} - -#[async_trait] -impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob { - #[tracing::instrument(name = "job.cleanup_inactive_oauth2_session_ips", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Clear IPs from sessions inactive for 30+ days - let threshold = state.clock.now() - chrono::Duration::days(30); - let mut total = 0; - - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - let (count, last_active_at) = repo - .oauth2_session() - .cleanup_inactive_ips(since, threshold, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_active_at; - total += count; - - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no OAuth2 session IPs to clean up"); - } else { - info!(count = total, "cleaned up inactive OAuth2 session IPs"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupInactiveCompatSessionIpsJob { - #[tracing::instrument(name = "job.cleanup_inactive_compat_session_ips", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Clear IPs from sessions inactive for 30+ days - let threshold = state.clock.now() - chrono::Duration::days(30); - let mut total = 0; - - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - let (count, last_active_at) = repo - .compat_session() - .cleanup_inactive_ips(since, threshold, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_active_at; - total += count; - - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no compat session IPs to clean up"); - } else { - info!(count = total, "cleaned up inactive compat session IPs"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - Some(Duration::from_secs(10 * 60)) - } -} - -#[async_trait] -impl RunnableJob for CleanupInactiveUserSessionIpsJob { - #[tracing::instrument(name = "job.cleanup_inactive_user_session_ips", skip_all)] - async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { - // Clear IPs from sessions inactive for 30+ days - let threshold = state.clock.now() - chrono::Duration::days(30); - let mut total = 0; - - let mut since = None; - while !context.cancellation_token.is_cancelled() { - let mut repo = state.repository().await.map_err(JobError::retry)?; - - let (count, last_active_at) = repo - .browser_session() - .cleanup_inactive_ips(since, threshold, BATCH_SIZE) - .await - .map_err(JobError::retry)?; - repo.save().await.map_err(JobError::retry)?; - - since = last_active_at; - total += count; - - if count != BATCH_SIZE { - break; - } - } - - if total == 0 { - debug!("no user session IPs to clean up"); - } else { - info!(count = total, "cleaned up inactive user session IPs"); - } - - Ok(()) - } - - fn timeout(&self) -> Option { - Some(Duration::from_secs(10 * 60)) - } -} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index ca339f96f..33748ae8e 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -21,7 +21,7 @@ use tokio_util::{sync::CancellationToken, task::TaskTracker}; pub use crate::new_queue::QueueWorker; -mod database; +mod cleanup; mod email; mod matrix; mod new_queue; @@ -162,125 +162,128 @@ pub async fn init( .register_handler::() .register_handler::() .register_deprecated_queue("cleanup-expired-tokens") + // Recurring jobs are spread across the hour at ~5 minute intervals + // to avoid clustering and distribute database load evenly. .add_schedule( "cleanup-revoked-oauth-access-tokens", - // Run this job every hour + // Run this job every hour at minute 0 "0 0 * * * *".parse()?, mas_storage::queue::CleanupRevokedOAuthAccessTokensJob, ) .add_schedule( "cleanup-revoked-oauth-refresh-tokens", - // Run this job every hour - "0 10 * * * *".parse()?, + // Run this job every hour at minute 5 + "0 5 * * * *".parse()?, mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob, ) .add_schedule( "cleanup-consumed-oauth-refresh-tokens", - // Run this job every hour - "0 20 * * * *".parse()?, + // Run this job every hour at minute 5 (safe to parallelize with revoked) + "0 5 * * * *".parse()?, mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob, ) - .add_schedule( - "cleanup-user-registrations", - // Run this job every hour - "0 30 * * * *".parse()?, - mas_storage::queue::CleanupUserRegistrationsJob, - ) .add_schedule( "cleanup-finished-compat-sessions", - // Run this job every hour - "0 40 * * * *".parse()?, + // Run this job every hour at minute 10 + "0 10 * * * *".parse()?, mas_storage::queue::CleanupFinishedCompatSessionsJob, ) .add_schedule( "cleanup-finished-oauth2-sessions", - // Run this job every hour - "0 42 * * * *".parse()?, + // Run this job every hour at minute 15 + "0 15 * * * *".parse()?, mas_storage::queue::CleanupFinishedOAuth2SessionsJob, ) .add_schedule( "cleanup-finished-user-sessions", - // Run this job every hour - "0 44 * * * *".parse()?, + // Run this job every hour at minute 20 + "0 20 * * * *".parse()?, mas_storage::queue::CleanupFinishedUserSessionsJob, ) + .add_schedule( + "cleanup-inactive-oauth2-session-ips", + // Run this job every hour at minute 25 + "0 25 * * * *".parse()?, + mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob, + ) + .add_schedule( + "cleanup-inactive-compat-session-ips", + // Run this job every hour at minute 25 + "0 25 * * * *".parse()?, + mas_storage::queue::CleanupInactiveCompatSessionIpsJob, + ) + .add_schedule( + "cleanup-inactive-user-session-ips", + // Run this job every hour at minute 25 + "0 25 * * * *".parse()?, + mas_storage::queue::CleanupInactiveUserSessionIpsJob, + ) .add_schedule( "cleanup-oauth-authorization-grants", - // Run this job every hour - "0 50 * * * *".parse()?, + // Run this job every hour at minute 30 + "0 30 * * * *".parse()?, mas_storage::queue::CleanupOAuthAuthorizationGrantsJob, ) .add_schedule( "cleanup-oauth-device-code-grants", - // Run this job every hour - "0 55 * * * *".parse()?, + // Run this job every hour at minute 35 + "0 35 * * * *".parse()?, mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob, ) - .add_schedule( - "cleanup-user-recovery-sessions", - // Run this job every hour - "0 56 * * * *".parse()?, - mas_storage::queue::CleanupUserRecoverySessionsJob, - ) - .add_schedule( - "cleanup-user-email-authentications", - // Run this job every hour - "0 57 * * * *".parse()?, - mas_storage::queue::CleanupUserEmailAuthenticationsJob, - ) .add_schedule( "cleanup-upstream-oauth-sessions", - // Run this job every hour - "0 58 * * * *".parse()?, + // Run this job every hour at minute 40 (independent, safe to parallelize) + "0 40 * * * *".parse()?, mas_storage::queue::CleanupUpstreamOAuthSessionsJob, ) .add_schedule( "cleanup-upstream-oauth-links", - // Run this job every hour - "0 59 * * * *".parse()?, + // Run this job every hour at minute 40 + "0 40 * * * *".parse()?, mas_storage::queue::CleanupUpstreamOAuthLinksJob, ) + // User cleanup jobs (minutes 45, 50) + .add_schedule( + "cleanup-user-registrations", + // Run this job every hour at minute 45 + "0 45 * * * *".parse()?, + mas_storage::queue::CleanupUserRegistrationsJob, + ) + .add_schedule( + "cleanup-user-recovery-sessions", + // Run this job every hour at minute 50 + "0 50 * * * *".parse()?, + mas_storage::queue::CleanupUserRecoverySessionsJob, + ) + .add_schedule( + "cleanup-user-email-authentications", + // Run this job every hour at minute 50 + "0 50 * * * *".parse()?, + mas_storage::queue::CleanupUserEmailAuthenticationsJob, + ) .add_schedule( "cleanup-queue-jobs", - // Run this job every hour - "0 45 * * * *".parse()?, + // Run this job every hour at minute 55 + "0 55 * * * *".parse()?, mas_storage::queue::CleanupQueueJobsJob, ) .add_schedule( "cleanup-expired-oauth-access-tokens", - // Run this job every 4 hours + // Run this job every 4 hours at minute 5 "0 5 */4 * * *".parse()?, mas_storage::queue::CleanupExpiredOAuthAccessTokensJob, ) .add_schedule( "expire-inactive-sessions", - // Run this job every 15 minutes + // Run this job every 15 minutes at second 30 "30 */15 * * * *".parse()?, mas_storage::queue::ExpireInactiveSessionsJob, ) .add_schedule( "prune-stale-policy-data", - // Run once a day + // Run once a day at 2:00 AM "0 0 2 * * *".parse()?, mas_storage::queue::PruneStalePolicyDataJob, - ) - .add_schedule( - "cleanup-inactive-oauth2-session-ips", - // Run this job every hour - "0 46 * * * *".parse()?, - mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob, - ) - .add_schedule( - "cleanup-inactive-compat-session-ips", - // Run this job every hour - "0 47 * * * *".parse()?, - mas_storage::queue::CleanupInactiveCompatSessionIpsJob, - ) - .add_schedule( - "cleanup-inactive-user-session-ips", - // Run this job every hour - "0 48 * * * *".parse()?, - mas_storage::queue::CleanupInactiveUserSessionIpsJob, ); Ok(worker) diff --git a/docs/development/cleanup-jobs.md b/docs/development/cleanup-jobs.md index 639d0e7a3..f64cb7f21 100644 --- a/docs/development/cleanup-jobs.md +++ b/docs/development/cleanup-jobs.md @@ -11,9 +11,23 @@ Cleanup jobs are scheduled tasks that hard-delete old data from the database. Th 1. **Job struct** in `crates/storage/src/queue/tasks.rs` - Defines the job and queue name 2. **Storage trait** in `crates/storage/src/{domain}/` - Declares the cleanup method interface 3. **PostgreSQL implementation** in `crates/storage-pg/src/{domain}/` - Implements the actual cleanup logic -4. **Job runner** in `crates/tasks/src/database.rs` - Implements the `RunnableJob` trait with batching logic +4. **Job runner** in `crates/tasks/src/cleanup/` - Implements the `RunnableJob` trait with batching logic 5. **Registration** in `crates/tasks/src/lib.rs` - Registers the handler and schedules execution +### Module Structure + +The cleanup job implementations are organized into submodules by domain: + +``` +crates/tasks/src/cleanup/ +├── mod.rs # Re-exports, shared BATCH_SIZE constant +├── tokens.rs # OAuth token cleanup (access and refresh tokens) +├── sessions.rs # Session cleanup (compat, OAuth2, user sessions and their IPs) +├── oauth.rs # OAuth grants and upstream OAuth cleanup +├── user.rs # User-related cleanup (registrations, recovery, email auth) +└── misc.rs # Queue jobs, policy data cleanup +``` + ## All Cleanup Jobs | Job | Entity | Retention | Notes | @@ -183,7 +197,7 @@ The partial index (`WHERE timestamp_col IS NOT NULL`) makes queries more efficie ### 5. Implement RunnableJob -In `crates/tasks/src/database.rs`: +In the appropriate submodule under `crates/tasks/src/cleanup/` (e.g., `tokens.rs`, `sessions.rs`, `oauth.rs`, `user.rs`, or `misc.rs`): ```rust #[async_trait]