Split the cleanup jobs into multiple files

This commit is contained in:
Quentin Gliech
2026-01-30 06:28:05 +01:00
parent f8de48a086
commit a059f32f16
9 changed files with 1093 additions and 979 deletions

View File

@@ -0,0 +1,88 @@
// Copyright 2026 Element Creations Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.
//! Miscellaneous cleanup tasks
use std::time::Duration;
use async_trait::async_trait;
use mas_storage::queue::{CleanupQueueJobsJob, PruneStalePolicyDataJob};
use tracing::{debug, info};
use ulid::Ulid;
use super::BATCH_SIZE;
use crate::{
State,
new_queue::{JobContext, JobError, RunnableJob},
};
#[async_trait]
impl RunnableJob for CleanupQueueJobsJob {
#[tracing::instrument(name = "job.cleanup_queue_jobs", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove completed and failed queue jobs after 30 days.
// Keep them for debugging purposes.
let until = state.clock.now() - chrono::Duration::days(30);
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, cursor) = repo
.queue_job()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no queue jobs to clean up");
} else {
info!(count = total, "cleaned up queue jobs");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for PruneStalePolicyDataJob {
#[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)]
async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// Keep the last 10 policy data
let count = repo
.policy_data()
.prune(10)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
if count == 0 {
debug!("no stale policy data to prune");
} else {
info!(count, "pruned stale policy data");
}
Ok(())
}
}

View File

@@ -0,0 +1,24 @@
// Copyright 2026 Element Creations Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.
//! Database cleanup tasks
//!
//! This module contains tasks for cleaning up old data from the database.
//! Tasks are grouped by domain:
//!
//! - [`tokens`]: OAuth token cleanup (access and refresh tokens)
//! - [`sessions`]: Session cleanup (compat, `OAuth2`, user sessions and their
//! IPs)
//! - [`oauth`]: OAuth grants and upstream OAuth cleanup
//! - [`user`]: User-related cleanup (registrations, recovery, email auth)
//! - [`misc`]: Miscellaneous cleanup (queue jobs, policy data)
mod misc;
mod oauth;
mod sessions;
mod tokens;
mod user;
pub(crate) const BATCH_SIZE: usize = 1000;

View File

@@ -0,0 +1,216 @@
// Copyright 2026 Element Creations Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.
//! OAuth grants and upstream OAuth cleanup tasks
use std::time::Duration;
use async_trait::async_trait;
use mas_storage::queue::{
CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob,
CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob,
};
use tracing::{debug, info};
use ulid::Ulid;
use super::BATCH_SIZE;
use crate::{
State,
new_queue::{JobContext, JobError, RunnableJob},
};
#[async_trait]
impl RunnableJob for CleanupOAuthAuthorizationGrantsJob {
#[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove authorization grants after 7 days. They are in practice only
// valid for a short time, but keeping them around helps investigate abuse
// patterns.
let until = state.clock.now() - chrono::Duration::days(7);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted grants, and the greatest ULID processed
let (count, cursor) = repo
.oauth2_authorization_grant()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no authorization grants to clean up");
} else {
info!(count = total, "cleaned up authorization grants");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupOAuthDeviceCodeGrantsJob {
#[tracing::instrument(name = "job.cleanup_oauth_device_code_grants", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove device code grants after 7 days. They are in practice only
// valid for a short time, but keeping them around helps investigate abuse
// patterns.
let until = state.clock.now() - chrono::Duration::days(7);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted grants, and the greatest ULID processed
let (count, cursor) = repo
.oauth2_device_code_grant()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no device code grants to clean up");
} else {
info!(count = total, "cleaned up device code grants");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUpstreamOAuthSessionsJob {
#[tracing::instrument(name = "job.cleanup_upstream_oauth_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove pending upstream OAuth authorization sessions after 7 days.
let until = state.clock.now() - chrono::Duration::days(7);
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, cursor) = repo
.upstream_oauth_session()
.cleanup_orphaned(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no pending upstream OAuth sessions to clean up");
} else {
info!(count = total, "cleaned up pending upstream OAuth sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUpstreamOAuthLinksJob {
#[tracing::instrument(name = "job.cleanup_upstream_oauth_links", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove orphaned upstream OAuth links after 7 days.
let until = state.clock.now() - chrono::Duration::days(7);
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, cursor) = repo
.upstream_oauth_link()
.cleanup_orphaned(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no orphaned upstream OAuth links to clean up");
} else {
info!(count = total, "cleaned up orphaned upstream OAuth links");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}

View File

@@ -0,0 +1,290 @@
// Copyright 2026 Element Creations Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.
//! Session cleanup tasks
use std::time::Duration;
use async_trait::async_trait;
use mas_storage::queue::{
CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob,
CleanupFinishedUserSessionsJob, CleanupInactiveCompatSessionIpsJob,
CleanupInactiveOAuth2SessionIpsJob, CleanupInactiveUserSessionIpsJob,
};
use tracing::{debug, info};
use super::BATCH_SIZE;
use crate::{
State,
new_queue::{JobContext, JobError, RunnableJob},
};
#[async_trait]
impl RunnableJob for CleanupFinishedCompatSessionsJob {
#[tracing::instrument(name = "job.cleanup_finished_compat_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup compat sessions that were finished more than 30 days ago
let until = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted sessions, and the last finished_at
// timestamp
let (count, last_finished_at) = repo
.compat_session()
.cleanup_finished(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_finished_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no finished compat sessions to clean up");
} else {
info!(count = total, "cleaned up finished compat sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupFinishedOAuth2SessionsJob {
#[tracing::instrument(name = "job.cleanup_finished_oauth2_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup OAuth2 sessions that were finished more than 30 days ago
let until = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted sessions, and the last finished_at
// timestamp
let (count, last_finished_at) = repo
.oauth2_session()
.cleanup_finished(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_finished_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no finished OAuth2 sessions to clean up");
} else {
info!(count = total, "cleaned up finished OAuth2 sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupFinishedUserSessionsJob {
#[tracing::instrument(name = "job.cleanup_finished_user_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup user/browser sessions that were finished more than 30 days ago
let until = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted sessions, and the last finished_at
// timestamp. Only deletes sessions that have no child sessions
// (compat_sessions or oauth2_sessions).
let (count, last_finished_at) = repo
.browser_session()
.cleanup_finished(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_finished_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no finished user sessions to clean up");
} else {
info!(count = total, "cleaned up finished user sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob {
#[tracing::instrument(name = "job.cleanup_inactive_oauth2_session_ips", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Clear IPs from sessions inactive for 30+ days
let threshold = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, last_active_at) = repo
.oauth2_session()
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_active_at;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no OAuth2 session IPs to clean up");
} else {
info!(count = total, "cleaned up inactive OAuth2 session IPs");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupInactiveCompatSessionIpsJob {
#[tracing::instrument(name = "job.cleanup_inactive_compat_session_ips", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Clear IPs from sessions inactive for 30+ days
let threshold = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, last_active_at) = repo
.compat_session()
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_active_at;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no compat session IPs to clean up");
} else {
info!(count = total, "cleaned up inactive compat session IPs");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupInactiveUserSessionIpsJob {
#[tracing::instrument(name = "job.cleanup_inactive_user_session_ips", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Clear IPs from sessions inactive for 30+ days
let threshold = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, last_active_at) = repo
.browser_session()
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_active_at;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user session IPs to clean up");
} else {
info!(count = total, "cleaned up inactive user session IPs");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(10 * 60))
}
}

View File

@@ -0,0 +1,214 @@
// Copyright 2026 Element Creations Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.
//! OAuth token cleanup tasks
use std::time::Duration;
use async_trait::async_trait;
use mas_storage::queue::{
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
};
use tracing::{debug, info};
use super::BATCH_SIZE;
use crate::{
State,
new_queue::{JobContext, JobError, RunnableJob},
};
#[async_trait]
impl RunnableJob for CleanupRevokedOAuthAccessTokensJob {
#[tracing::instrument(name = "job.cleanup_revoked_oauth_access_tokens", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup tokens that were revoked more than an hour ago
let until = state.clock.now() - chrono::Duration::hours(1);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted tokens, and the last revoked_at timestamp
let (count, last_revoked_at) = repo
.oauth2_access_token()
.cleanup_revoked(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_revoked_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no token to clean up");
} else {
info!(count = total, "cleaned up revoked tokens");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupExpiredOAuthAccessTokensJob {
#[tracing::instrument(name = "job.cleanup_expired_oauth_access_tokens", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup tokens that expired more than a month ago
// It is important to keep them around for a bit because of refresh
// token idempotency. When we see a refresh token twice, we allow
// reusing it *only* if both the next refresh token and the next access
// tokens were not used. By keeping expired access tokens around for a
// month, we cannot make the *correct* decision, we will assume that the
// token wasn't used. Refer to the token refresh logic for details.
let until = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted tokens, and the last expires_at timestamp
let (count, last_expires_at) = repo
.oauth2_access_token()
.cleanup_expired(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_expires_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no token to clean up");
} else {
info!(count = total, "cleaned up expired tokens");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(60))
}
}
#[async_trait]
impl RunnableJob for CleanupRevokedOAuthRefreshTokensJob {
#[tracing::instrument(name = "job.cleanup_revoked_oauth_refresh_tokens", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup tokens that were revoked more than an hour ago
let until = state.clock.now() - chrono::Duration::hours(1);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted tokens, and the last revoked_at timestamp
let (count, last_revoked_at) = repo
.oauth2_refresh_token()
.cleanup_revoked(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_revoked_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no token to clean up");
} else {
info!(count = total, "cleaned up revoked tokens");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupConsumedOAuthRefreshTokensJob {
#[tracing::instrument(name = "job.cleanup_consumed_oauth_refresh_tokens", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup tokens that were consumed more than an hour ago
let until = state.clock.now() - chrono::Duration::hours(1);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted tokens, and the last consumed_at timestamp
let (count, last_consumed_at) = repo
.oauth2_refresh_token()
.cleanup_consumed(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_consumed_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no token to clean up");
} else {
info!(count = total, "cleaned up consumed tokens");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}

View File

@@ -0,0 +1,181 @@
// Copyright 2026 Element Creations Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.
//! User-related cleanup tasks
use std::time::Duration;
use async_trait::async_trait;
use mas_storage::queue::{
CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob,
};
use tracing::{debug, info};
use ulid::Ulid;
use super::BATCH_SIZE;
use crate::{
State,
new_queue::{JobContext, JobError, RunnableJob},
};
#[async_trait]
impl RunnableJob for CleanupUserRegistrationsJob {
#[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove user registrations after 30 days. They are in practice only
// valid for 1h, but keeping them around helps investigate abuse patterns.
let until = state.clock.now() - chrono::Duration::days(30);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted registrations, and the greatest ULID
// processed
let (count, cursor) = repo
.user_registration()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user registrations to clean up");
} else {
info!(count = total, "cleaned up user registrations");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUserRecoverySessionsJob {
#[tracing::instrument(name = "job.cleanup_user_recovery_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove recovery sessions after 7 days. They are in practice only
// valid for a short time (tickets expire after 10 minutes), but keeping
// them around helps investigate abuse patterns.
let until = state.clock.now() - chrono::Duration::days(7);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted sessions, and the greatest ULID processed
let (count, cursor) = repo
.user_recovery()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user recovery sessions to clean up");
} else {
info!(count = total, "cleaned up user recovery sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUserEmailAuthenticationsJob {
#[tracing::instrument(name = "job.cleanup_user_email_authentications", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove email authentications after 7 days. They are in practice only
// valid for a short time (codes expire after 10 minutes), but keeping
// them around helps investigate abuse patterns.
let until = state.clock.now() - chrono::Duration::days(7);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted authentications, and the greatest ULID
// processed
let (count, cursor) = repo
.user_email()
.cleanup_authentications(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user email authentications to clean up");
} else {
info!(count = total, "cleaned up user email authentications");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}

View File

@@ -1,916 +0,0 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.
//! Database-related tasks
use std::time::Duration;
use async_trait::async_trait;
use mas_storage::queue::{
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob,
CleanupFinishedUserSessionsJob, CleanupInactiveCompatSessionIpsJob,
CleanupInactiveOAuth2SessionIpsJob, CleanupInactiveUserSessionIpsJob,
CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob,
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob,
CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob,
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
};
use tracing::{debug, info};
use ulid::Ulid;
use crate::{
State,
new_queue::{JobContext, JobError, RunnableJob},
};
const BATCH_SIZE: usize = 1000;
#[async_trait]
impl RunnableJob for CleanupRevokedOAuthAccessTokensJob {
#[tracing::instrument(name = "job.cleanup_revoked_oauth_access_tokens", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup tokens that were revoked more than an hour ago
let until = state.clock.now() - chrono::Duration::hours(1);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted tokens, and the last revoked_at timestamp
let (count, last_revoked_at) = repo
.oauth2_access_token()
.cleanup_revoked(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_revoked_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no token to clean up");
} else {
info!(count = total, "cleaned up revoked tokens");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupExpiredOAuthAccessTokensJob {
#[tracing::instrument(name = "job.cleanup_expired_oauth_access_tokens", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup tokens that expired more than a month ago
// It is important to keep them around for a bit because of refresh
// token idempotency. When we see a refresh token twice, we allow
// reusing it *only* if both the next refresh token and the next access
// tokens were not used. By keeping expired access tokens around for a
// month, we cannot make the *correct* decision, we will assume that the
// token wasn't used. Refer to the token refresh logic for details.
let until = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted tokens, and the last expires_at timestamp
let (count, last_expires_at) = repo
.oauth2_access_token()
.cleanup_expired(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_expires_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no token to clean up");
} else {
info!(count = total, "cleaned up expired tokens");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(60))
}
}
#[async_trait]
impl RunnableJob for CleanupRevokedOAuthRefreshTokensJob {
#[tracing::instrument(name = "job.cleanup_revoked_oauth_refresh_tokens", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup tokens that were revoked more than an hour ago
let until = state.clock.now() - chrono::Duration::hours(1);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted tokens, and the last revoked_at timestamp
let (count, last_revoked_at) = repo
.oauth2_refresh_token()
.cleanup_revoked(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_revoked_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no token to clean up");
} else {
info!(count = total, "cleaned up revoked tokens");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupConsumedOAuthRefreshTokensJob {
#[tracing::instrument(name = "job.cleanup_consumed_oauth_refresh_tokens", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup tokens that were consumed more than an hour ago
let until = state.clock.now() - chrono::Duration::hours(1);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted tokens, and the last consumed_at timestamp
let (count, last_consumed_at) = repo
.oauth2_refresh_token()
.cleanup_consumed(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_consumed_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no token to clean up");
} else {
info!(count = total, "cleaned up consumed tokens");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUserRecoverySessionsJob {
#[tracing::instrument(name = "job.cleanup_user_recovery_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove recovery sessions after 7 days. They are in practice only
// valid for a short time (tickets expire after 10 minutes), but keeping
// them around helps investigate abuse patterns.
let until = state.clock.now() - chrono::Duration::days(7);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted sessions, and the greatest ULID processed
let (count, cursor) = repo
.user_recovery()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user recovery sessions to clean up");
} else {
info!(count = total, "cleaned up user recovery sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUserEmailAuthenticationsJob {
#[tracing::instrument(name = "job.cleanup_user_email_authentications", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove email authentications after 7 days. They are in practice only
// valid for a short time (codes expire after 10 minutes), but keeping
// them around helps investigate abuse patterns.
let until = state.clock.now() - chrono::Duration::days(7);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted authentications, and the greatest ULID
// processed
let (count, cursor) = repo
.user_email()
.cleanup_authentications(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user email authentications to clean up");
} else {
info!(count = total, "cleaned up user email authentications");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUpstreamOAuthSessionsJob {
#[tracing::instrument(name = "job.cleanup_upstream_oauth_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove pending upstream OAuth authorization sessions after 7 days.
let until = state.clock.now() - chrono::Duration::days(7);
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, cursor) = repo
.upstream_oauth_session()
.cleanup_orphaned(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no pending upstream OAuth sessions to clean up");
} else {
info!(count = total, "cleaned up pending upstream OAuth sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUpstreamOAuthLinksJob {
#[tracing::instrument(name = "job.cleanup_upstream_oauth_links", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove orphaned upstream OAuth links after 7 days.
let until = state.clock.now() - chrono::Duration::days(7);
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, cursor) = repo
.upstream_oauth_link()
.cleanup_orphaned(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no orphaned upstream OAuth links to clean up");
} else {
info!(count = total, "cleaned up orphaned upstream OAuth links");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupQueueJobsJob {
#[tracing::instrument(name = "job.cleanup_queue_jobs", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove completed and failed queue jobs after 30 days.
// Keep them for debugging purposes.
let until = state.clock.now() - chrono::Duration::days(30);
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, cursor) = repo
.queue_job()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no queue jobs to clean up");
} else {
info!(count = total, "cleaned up queue jobs");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupUserRegistrationsJob {
#[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove user registrations after 30 days. They are in practice only
// valid for 1h, but keeping them around helps investigate abuse patterns.
let until = state.clock.now() - chrono::Duration::days(30);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted registrations, and the greatest ULID
// processed
let (count, cursor) = repo
.user_registration()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user registrations to clean up");
} else {
info!(count = total, "cleaned up user registrations");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupFinishedCompatSessionsJob {
#[tracing::instrument(name = "job.cleanup_finished_compat_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup compat sessions that were finished more than 30 days ago
let until = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted sessions, and the last finished_at
// timestamp
let (count, last_finished_at) = repo
.compat_session()
.cleanup_finished(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_finished_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no finished compat sessions to clean up");
} else {
info!(count = total, "cleaned up finished compat sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupFinishedOAuth2SessionsJob {
#[tracing::instrument(name = "job.cleanup_finished_oauth2_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup OAuth2 sessions that were finished more than 30 days ago
let until = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted sessions, and the last finished_at
// timestamp
let (count, last_finished_at) = repo
.oauth2_session()
.cleanup_finished(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_finished_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no finished OAuth2 sessions to clean up");
} else {
info!(count = total, "cleaned up finished OAuth2 sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupFinishedUserSessionsJob {
#[tracing::instrument(name = "job.cleanup_finished_user_sessions", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Cleanup user/browser sessions that were finished more than 30 days ago
let until = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted sessions, and the last finished_at
// timestamp. Only deletes sessions that have no child sessions
// (compat_sessions or oauth2_sessions).
let (count, last_finished_at) = repo
.browser_session()
.cleanup_finished(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_finished_at;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no finished user sessions to clean up");
} else {
info!(count = total, "cleaned up finished user sessions");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupOAuthAuthorizationGrantsJob {
#[tracing::instrument(name = "job.cleanup_oauth_authorization_grants", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove authorization grants after 7 days. They are in practice only
// valid for a short time, but keeping them around helps investigate abuse
// patterns.
let until = state.clock.now() - chrono::Duration::days(7);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted grants, and the greatest ULID processed
let (count, cursor) = repo
.oauth2_authorization_grant()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no authorization grants to clean up");
} else {
info!(count = total, "cleaned up authorization grants");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupOAuthDeviceCodeGrantsJob {
#[tracing::instrument(name = "job.cleanup_oauth_device_code_grants", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove device code grants after 7 days. They are in practice only
// valid for a short time, but keeping them around helps investigate abuse
// patterns.
let until = state.clock.now() - chrono::Duration::days(7);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted grants, and the greatest ULID processed
let (count, cursor) = repo
.oauth2_device_code_grant()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no device code grants to clean up");
} else {
info!(count = total, "cleaned up device code grants");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
// This job runs every hour, so having it running it for 10 minutes is fine
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for PruneStalePolicyDataJob {
#[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)]
async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// Keep the last 10 policy data
let count = repo
.policy_data()
.prune(10)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
if count == 0 {
debug!("no stale policy data to prune");
} else {
info!(count, "pruned stale policy data");
}
Ok(())
}
}
#[async_trait]
impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob {
#[tracing::instrument(name = "job.cleanup_inactive_oauth2_session_ips", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Clear IPs from sessions inactive for 30+ days
let threshold = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, last_active_at) = repo
.oauth2_session()
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_active_at;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no OAuth2 session IPs to clean up");
} else {
info!(count = total, "cleaned up inactive OAuth2 session IPs");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupInactiveCompatSessionIpsJob {
#[tracing::instrument(name = "job.cleanup_inactive_compat_session_ips", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Clear IPs from sessions inactive for 30+ days
let threshold = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, last_active_at) = repo
.compat_session()
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_active_at;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no compat session IPs to clean up");
} else {
info!(count = total, "cleaned up inactive compat session IPs");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(10 * 60))
}
}
#[async_trait]
impl RunnableJob for CleanupInactiveUserSessionIpsJob {
#[tracing::instrument(name = "job.cleanup_inactive_user_session_ips", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Clear IPs from sessions inactive for 30+ days
let threshold = state.clock.now() - chrono::Duration::days(30);
let mut total = 0;
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, last_active_at) = repo
.browser_session()
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = last_active_at;
total += count;
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user session IPs to clean up");
} else {
info!(count = total, "cleaned up inactive user session IPs");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(10 * 60))
}
}

View File

@@ -21,7 +21,7 @@ use tokio_util::{sync::CancellationToken, task::TaskTracker};
pub use crate::new_queue::QueueWorker;
mod database;
mod cleanup;
mod email;
mod matrix;
mod new_queue;
@@ -162,125 +162,128 @@ pub async fn init(
.register_handler::<mas_storage::queue::CleanupInactiveCompatSessionIpsJob>()
.register_handler::<mas_storage::queue::CleanupInactiveUserSessionIpsJob>()
.register_deprecated_queue("cleanup-expired-tokens")
// Recurring jobs are spread across the hour at ~5 minute intervals
// to avoid clustering and distribute database load evenly.
.add_schedule(
"cleanup-revoked-oauth-access-tokens",
// Run this job every hour
// Run this job every hour at minute 0
"0 0 * * * *".parse()?,
mas_storage::queue::CleanupRevokedOAuthAccessTokensJob,
)
.add_schedule(
"cleanup-revoked-oauth-refresh-tokens",
// Run this job every hour
"0 10 * * * *".parse()?,
// Run this job every hour at minute 5
"0 5 * * * *".parse()?,
mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob,
)
.add_schedule(
"cleanup-consumed-oauth-refresh-tokens",
// Run this job every hour
"0 20 * * * *".parse()?,
// Run this job every hour at minute 5 (safe to parallelize with revoked)
"0 5 * * * *".parse()?,
mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob,
)
.add_schedule(
"cleanup-user-registrations",
// Run this job every hour
"0 30 * * * *".parse()?,
mas_storage::queue::CleanupUserRegistrationsJob,
)
.add_schedule(
"cleanup-finished-compat-sessions",
// Run this job every hour
"0 40 * * * *".parse()?,
// Run this job every hour at minute 10
"0 10 * * * *".parse()?,
mas_storage::queue::CleanupFinishedCompatSessionsJob,
)
.add_schedule(
"cleanup-finished-oauth2-sessions",
// Run this job every hour
"0 42 * * * *".parse()?,
// Run this job every hour at minute 15
"0 15 * * * *".parse()?,
mas_storage::queue::CleanupFinishedOAuth2SessionsJob,
)
.add_schedule(
"cleanup-finished-user-sessions",
// Run this job every hour
"0 44 * * * *".parse()?,
// Run this job every hour at minute 20
"0 20 * * * *".parse()?,
mas_storage::queue::CleanupFinishedUserSessionsJob,
)
.add_schedule(
"cleanup-inactive-oauth2-session-ips",
// Run this job every hour at minute 25
"0 25 * * * *".parse()?,
mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob,
)
.add_schedule(
"cleanup-inactive-compat-session-ips",
// Run this job every hour at minute 25
"0 25 * * * *".parse()?,
mas_storage::queue::CleanupInactiveCompatSessionIpsJob,
)
.add_schedule(
"cleanup-inactive-user-session-ips",
// Run this job every hour at minute 25
"0 25 * * * *".parse()?,
mas_storage::queue::CleanupInactiveUserSessionIpsJob,
)
.add_schedule(
"cleanup-oauth-authorization-grants",
// Run this job every hour
"0 50 * * * *".parse()?,
// Run this job every hour at minute 30
"0 30 * * * *".parse()?,
mas_storage::queue::CleanupOAuthAuthorizationGrantsJob,
)
.add_schedule(
"cleanup-oauth-device-code-grants",
// Run this job every hour
"0 55 * * * *".parse()?,
// Run this job every hour at minute 35
"0 35 * * * *".parse()?,
mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob,
)
.add_schedule(
"cleanup-user-recovery-sessions",
// Run this job every hour
"0 56 * * * *".parse()?,
mas_storage::queue::CleanupUserRecoverySessionsJob,
)
.add_schedule(
"cleanup-user-email-authentications",
// Run this job every hour
"0 57 * * * *".parse()?,
mas_storage::queue::CleanupUserEmailAuthenticationsJob,
)
.add_schedule(
"cleanup-upstream-oauth-sessions",
// Run this job every hour
"0 58 * * * *".parse()?,
// Run this job every hour at minute 40 (independent, safe to parallelize)
"0 40 * * * *".parse()?,
mas_storage::queue::CleanupUpstreamOAuthSessionsJob,
)
.add_schedule(
"cleanup-upstream-oauth-links",
// Run this job every hour
"0 59 * * * *".parse()?,
// Run this job every hour at minute 40
"0 40 * * * *".parse()?,
mas_storage::queue::CleanupUpstreamOAuthLinksJob,
)
// User cleanup jobs (minutes 45, 50)
.add_schedule(
"cleanup-user-registrations",
// Run this job every hour at minute 45
"0 45 * * * *".parse()?,
mas_storage::queue::CleanupUserRegistrationsJob,
)
.add_schedule(
"cleanup-user-recovery-sessions",
// Run this job every hour at minute 50
"0 50 * * * *".parse()?,
mas_storage::queue::CleanupUserRecoverySessionsJob,
)
.add_schedule(
"cleanup-user-email-authentications",
// Run this job every hour at minute 50
"0 50 * * * *".parse()?,
mas_storage::queue::CleanupUserEmailAuthenticationsJob,
)
.add_schedule(
"cleanup-queue-jobs",
// Run this job every hour
"0 45 * * * *".parse()?,
// Run this job every hour at minute 55
"0 55 * * * *".parse()?,
mas_storage::queue::CleanupQueueJobsJob,
)
.add_schedule(
"cleanup-expired-oauth-access-tokens",
// Run this job every 4 hours
// Run this job every 4 hours at minute 5
"0 5 */4 * * *".parse()?,
mas_storage::queue::CleanupExpiredOAuthAccessTokensJob,
)
.add_schedule(
"expire-inactive-sessions",
// Run this job every 15 minutes
// Run this job every 15 minutes at second 30
"30 */15 * * * *".parse()?,
mas_storage::queue::ExpireInactiveSessionsJob,
)
.add_schedule(
"prune-stale-policy-data",
// Run once a day
// Run once a day at 2:00 AM
"0 0 2 * * *".parse()?,
mas_storage::queue::PruneStalePolicyDataJob,
)
.add_schedule(
"cleanup-inactive-oauth2-session-ips",
// Run this job every hour
"0 46 * * * *".parse()?,
mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob,
)
.add_schedule(
"cleanup-inactive-compat-session-ips",
// Run this job every hour
"0 47 * * * *".parse()?,
mas_storage::queue::CleanupInactiveCompatSessionIpsJob,
)
.add_schedule(
"cleanup-inactive-user-session-ips",
// Run this job every hour
"0 48 * * * *".parse()?,
mas_storage::queue::CleanupInactiveUserSessionIpsJob,
);
Ok(worker)