Cleanup old user registrations from the database

This commit is contained in:
Quentin Gliech
2026-01-14 13:23:28 +01:00
parent d4d4cd7cd1
commit 3fa53d285e
6 changed files with 168 additions and 1 deletions

View File

@@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH to_delete AS (\n SELECT user_registration_id\n FROM user_registrations\n WHERE ($1::uuid IS NULL OR user_registration_id > $1)\n AND user_registration_id <= $2\n ORDER BY user_registration_id\n LIMIT $3\n )\n DELETE FROM user_registrations\n USING to_delete\n WHERE user_registrations.user_registration_id = to_delete.user_registration_id\n RETURNING user_registrations.user_registration_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "user_registration_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Int8"
]
},
"nullable": [
false
]
},
"hash": "a50eb326c3522f971f6ee7e13dff61efbeb1ec24e2c694e1673347bae993762d"
}

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
@@ -469,6 +470,53 @@ impl UserRegistrationRepository for PgUserRegistrationRepository<'_> {
Ok(user_registration)
}
#[tracing::instrument(
name = "db.user_registration.cleanup",
skip_all,
fields(
db.query.text,
),
err,
)]
async fn cleanup(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error> {
// `MAX(uuid)` isn't a thing in Postgres, so we can't just re-select the
// deleted rows and do a MAX on the `user_registration_id`.
// Instead, we do the aggregation on the client side, which is a little
// less efficient, but good enough.
let res = sqlx::query_scalar!(
r#"
WITH to_delete AS (
SELECT user_registration_id
FROM user_registrations
WHERE ($1::uuid IS NULL OR user_registration_id > $1)
AND user_registration_id <= $2
ORDER BY user_registration_id
LIMIT $3
)
DELETE FROM user_registrations
USING to_delete
WHERE user_registrations.user_registration_id = to_delete.user_registration_id
RETURNING user_registrations.user_registration_id
"#,
since.map(Uuid::from),
Uuid::from(until),
i64::try_from(limit).unwrap_or(i64::MAX)
)
.traced()
.fetch_all(&mut *self.conn)
.await?;
let count = res.len();
let max_id = res.into_iter().max();
Ok((count, max_id.map(Ulid::from)))
}
}
#[cfg(test)]

View File

@@ -350,6 +350,14 @@ impl InsertableJob for CleanupConsumedOAuthRefreshTokensJob {
const QUEUE_NAME: &'static str = "cleanup-consumed-oauth-refresh-tokens";
}
/// Cleanup old user registrations
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct CleanupUserRegistrationsJob;
impl InsertableJob for CleanupUserRegistrationsJob {
const QUEUE_NAME: &'static str = "cleanup-user-registrations";
}
/// Scheduled job to expire inactive sessions
///
/// This job will trigger jobs to expire inactive compat, oauth and user

View File

@@ -1,3 +1,4 @@
// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
@@ -199,6 +200,27 @@ pub trait UserRegistrationRepository: Send + Sync {
clock: &dyn Clock,
user_registration: UserRegistration,
) -> Result<UserRegistration, Self::Error>;
/// Cleanup [`UserRegistration`]s between the given IDs.
///
/// Returns the number of registrations deleted, as well as the ID of the
/// last registration deleted.
///
/// # Parameters
///
/// * `since`: An optional ID to start from
/// * `until`: The ID until which to clean up registrations
/// * `limit`: The maximum number of registrations to clean up
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn cleanup(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
}
repository_impl!(UserRegistrationRepository:
@@ -248,4 +270,10 @@ repository_impl!(UserRegistrationRepository:
clock: &dyn Clock,
user_registration: UserRegistration,
) -> Result<UserRegistration, Self::Error>;
async fn cleanup(
&mut self,
since: Option<Ulid>,
until: Ulid,
limit: usize,
) -> Result<(usize, Option<Ulid>), Self::Error>;
);

View File

@@ -13,9 +13,10 @@ use async_trait::async_trait;
use mas_storage::queue::{
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
PruneStalePolicyDataJob,
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
};
use tracing::{debug, info};
use ulid::Ulid;
use crate::{
State,
@@ -214,6 +215,57 @@ impl RunnableJob for CleanupConsumedOAuthRefreshTokensJob {
}
}
#[async_trait]
impl RunnableJob for CleanupUserRegistrationsJob {
#[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)]
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
// Remove user registrations after 24h. They are in practice only valid for 1h
let until = state.clock.now() - chrono::Duration::hours(24);
// We use the fact that ULIDs include the creation time in their first 48 bits
// as a cursor
let until = Ulid::from_parts(
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
u128::MAX,
);
let mut total = 0;
// Run until we get cancelled. We don't schedule a retry if we get cancelled, as
// this is a scheduled job and it will end up being rescheduled later anyway.
let mut since = None;
while !context.cancellation_token.is_cancelled() {
let mut repo = state.repository().await.map_err(JobError::retry)?;
// This returns the number of deleted registrations, and the greatest ULID
// processed
let (count, cursor) = repo
.user_registration()
.cleanup(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;
since = cursor;
total += count;
// Check how many we deleted. If we deleted exactly BATCH_SIZE,
// there might be more to delete
if count != BATCH_SIZE {
break;
}
}
if total == 0 {
debug!("no user registrations to clean up");
} else {
info!(count = total, "cleaned up user registrations");
}
Ok(())
}
fn timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(60))
}
}
#[async_trait]
impl RunnableJob for PruneStalePolicyDataJob {
#[tracing::instrument(name = "job.prune_stale_policy_data", skip_all)]

View File

@@ -133,6 +133,7 @@ pub async fn init(
.register_handler::<mas_storage::queue::CleanupExpiredOAuthAccessTokensJob>()
.register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
.register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
.register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
.register_handler::<mas_storage::queue::DeactivateUserJob>()
.register_handler::<mas_storage::queue::DeleteDeviceJob>()
.register_handler::<mas_storage::queue::ProvisionDeviceJob>()
@@ -166,6 +167,12 @@ pub async fn init(
"0 20 * * * *".parse()?,
mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob,
)
.add_schedule(
"cleanup-user-registrations",
// Run this job every hour
"0 30 * * * *".parse()?,
mas_storage::queue::CleanupUserRegistrationsJob,
)
.add_schedule(
"cleanup-expired-oauth-access-tokens",
// Run this job every 4 hours