diff --git a/crates/storage-pg/.sqlx/query-5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7.json b/crates/storage-pg/.sqlx/query-5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7.json new file mode 100644 index 000000000..b712d9e63 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_delete AS (\n SELECT queue_job_id\n FROM queue_jobs\n WHERE (status = 'completed' OR status = 'failed')\n AND ($1::uuid IS NULL OR queue_job_id > $1)\n AND queue_job_id <= $2\n ORDER BY queue_job_id\n LIMIT $3\n )\n DELETE FROM queue_jobs\n USING to_delete\n WHERE queue_jobs.queue_job_id = to_delete.queue_job_id\n RETURNING queue_jobs.queue_job_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "queue_job_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7" +} diff --git a/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql b/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql new file mode 100644 index 000000000..c94934500 --- /dev/null +++ b/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql @@ -0,0 +1,14 @@ +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Change the FK constraint on next_attempt_id to SET NULL on delete +-- This allows us to clean up old completed/failed jobs without breaking retry chains +ALTER TABLE queue_jobs + DROP CONSTRAINT queue_jobs_next_attempt_id_fkey, + ADD CONSTRAINT queue_jobs_next_attempt_id_fkey + FOREIGN KEY (next_attempt_id) + REFERENCES queue_jobs (queue_job_id) + ON DELETE SET NULL + NOT VALID; diff --git a/crates/storage-pg/migrations/20260116000004_queue_jobs_next_attempt_set_null_validate.sql b/crates/storage-pg/migrations/20260116000004_queue_jobs_next_attempt_set_null_validate.sql new file mode 100644 index 000000000..b29424a4f --- /dev/null +++ b/crates/storage-pg/migrations/20260116000004_queue_jobs_next_attempt_set_null_validate.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Validate the FK constraint that was added in the previous migration +-- This is done in a separate migration to avoid holding locks for too long +ALTER TABLE queue_jobs + VALIDATE CONSTRAINT queue_jobs_next_attempt_id_fkey; diff --git a/crates/storage-pg/src/queue/job.rs b/crates/storage-pg/src/queue/job.rs index a7f9e5591..f3d1da697 100644 --- a/crates/storage-pg/src/queue/job.rs +++ b/crates/storage-pg/src/queue/job.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial @@ -444,4 +445,54 @@ impl QueueJobRepository for PgQueueJobRepository<'_> { let count = res.rows_affected(); Ok(usize::try_from(count).unwrap_or(usize::MAX)) } + + #[tracing::instrument( + name = "db.queue_job.cleanup", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + until = %until, + limit = limit, + ), + err, + )] + async fn cleanup( + &mut self, + since: Option, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option), Self::Error> { + // Use ULID cursor-based pagination for completed and failed jobs. + // We delete both completed and failed jobs in the same batch. + // `MAX(uuid)` isn't a thing in Postgres, so we aggregate on the client side. + let res = sqlx::query_scalar!( + r#" + WITH to_delete AS ( + SELECT queue_job_id + FROM queue_jobs + WHERE (status = 'completed' OR status = 'failed') + AND ($1::uuid IS NULL OR queue_job_id > $1) + AND queue_job_id <= $2 + ORDER BY queue_job_id + LIMIT $3 + ) + DELETE FROM queue_jobs + USING to_delete + WHERE queue_jobs.queue_job_id = to_delete.queue_job_id + RETURNING queue_jobs.queue_job_id + "#, + since.map(Uuid::from), + Uuid::from(until), + i64::try_from(limit).unwrap_or(i64::MAX) + ) + .traced() + .fetch_all(&mut *self.conn) + .await?; + + let count = res.len(); + let max_id = res.into_iter().max(); + + Ok((count, max_id.map(Ulid::from))) + } } diff --git a/crates/storage/src/queue/job.rs b/crates/storage/src/queue/job.rs index 1fd21f442..a436e0669 100644 --- a/crates/storage/src/queue/job.rs +++ b/crates/storage/src/queue/job.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial @@ -213,6 +214,30 @@ pub trait QueueJobRepository: Send + Sync { /// /// Returns an error if the underlying repository fails. async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result; + + /// Cleanup old completed and failed jobs + /// + /// This will delete jobs with status 'completed' or 'failed' and IDs up to + /// and including `until`. Uses ULID cursor-based pagination for efficiency. + /// + /// Returns the number of jobs deleted and the cursor for the next batch + /// + /// # Parameters + /// + /// * `since`: The cursor to start from (exclusive), or `None` to start from + /// the beginning + /// * `until`: The maximum ULID to delete (inclusive upper bound) + /// * `limit`: The maximum number of jobs to delete in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup( + &mut self, + since: Option, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option), Self::Error>; } repository_impl!(QueueJobRepository: @@ -261,6 +286,13 @@ repository_impl!(QueueJobRepository: ) -> Result<(), Self::Error>; async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result; + + async fn cleanup( + &mut self, + since: Option, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option), Self::Error>; ); /// Extension trait for [`QueueJobRepository`] to help adding a job to the queue diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index 500a7eced..7ea900a47 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -414,6 +414,14 @@ impl InsertableJob for CleanupUpstreamOAuthLinksJob { const QUEUE_NAME: &'static str = "cleanup-upstream-oauth-links"; } +/// Cleanup old completed and failed queue jobs +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupQueueJobsJob; + +impl InsertableJob for CleanupQueueJobsJob { + const QUEUE_NAME: &'static str = "cleanup-queue-jobs"; +} + /// Scheduled job to expire inactive sessions /// /// This job will trigger jobs to expire inactive compat, oauth and user diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index fe92e5a99..c3876631f 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -13,7 +13,7 @@ use async_trait::async_trait; use mas_storage::queue::{ CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob, - CleanupOAuthDeviceCodeGrantsJob, CleanupRevokedOAuthAccessTokensJob, + CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob, @@ -416,6 +416,50 @@ impl RunnableJob for CleanupUpstreamOAuthLinksJob { } } +#[async_trait] +impl RunnableJob for CleanupQueueJobsJob { + #[tracing::instrument(name = "job.cleanup_queue_jobs", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove completed and failed queue jobs after 30 days. + // Keep them for debugging purposes. + let until = state.clock.now() - chrono::Duration::days(30); + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let (count, cursor) = repo + .queue_job() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no queue jobs to clean up"); + } else { + info!(count = total, "cleaned up queue jobs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} + #[async_trait] impl RunnableJob for CleanupUserRegistrationsJob { #[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)] diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 5278caed3..54dc8d713 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -141,6 +141,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .register_handler::() .register_handler::() .register_handler::() @@ -222,6 +223,12 @@ pub async fn init( "0 59 * * * *".parse()?, mas_storage::queue::CleanupUpstreamOAuthLinksJob, ) + .add_schedule( + "cleanup-queue-jobs", + // Run this job every hour + "0 45 * * * *".parse()?, + mas_storage::queue::CleanupQueueJobsJob, + ) .add_schedule( "cleanup-expired-oauth-access-tokens", // Run this job every 4 hours