Cleanup old completed jobs from the database (#5427)
This commit is contained in:
24
crates/storage-pg/.sqlx/query-5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7.json
generated
Normal file
24
crates/storage-pg/.sqlx/query-5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7.json
generated
Normal file
@@ -0,0 +1,24 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH to_delete AS (\n SELECT queue_job_id\n FROM queue_jobs\n WHERE (status = 'completed' OR status = 'failed')\n AND ($1::uuid IS NULL OR queue_job_id > $1)\n AND queue_job_id <= $2\n ORDER BY queue_job_id\n LIMIT $3\n )\n DELETE FROM queue_jobs\n USING to_delete\n WHERE queue_jobs.queue_job_id = to_delete.queue_job_id\n RETURNING queue_jobs.queue_job_id\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "queue_job_id",
|
||||
"type_info": "Uuid"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid",
|
||||
"Uuid",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7"
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
-- Copyright 2026 Element Creations Ltd.
|
||||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
-- Please see LICENSE files in the repository root for full details.
|
||||
|
||||
-- Change the FK constraint on next_attempt_id to SET NULL on delete
|
||||
-- This allows us to clean up old completed/failed jobs without breaking retry chains
|
||||
-- The constraint is dropped and re-created under the same name in a single
-- ALTER TABLE statement. It is re-added as NOT VALID, so existing rows are
-- not checked here; a follow-up migration runs VALIDATE CONSTRAINT on it.
ALTER TABLE queue_jobs
|
||||
DROP CONSTRAINT queue_jobs_next_attempt_id_fkey,
|
||||
ADD CONSTRAINT queue_jobs_next_attempt_id_fkey
|
||||
FOREIGN KEY (next_attempt_id)
|
||||
REFERENCES queue_jobs (queue_job_id)
|
||||
ON DELETE SET NULL
|
||||
NOT VALID;
|
||||
@@ -0,0 +1,10 @@
|
||||
-- no-transaction
|
||||
-- Copyright 2026 Element Creations Ltd.
|
||||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
-- Please see LICENSE files in the repository root for full details.
|
||||
|
||||
-- Validate the FK constraint that was added in the previous migration
|
||||
-- This is done in a separate migration to avoid holding locks for too long
|
||||
-- The previous migration created the constraint with NOT VALID, so existing
-- rows have not been checked against it until this statement runs.
ALTER TABLE queue_jobs
|
||||
VALIDATE CONSTRAINT queue_jobs_next_attempt_id_fkey;
|
||||
@@ -1,3 +1,4 @@
|
||||
// Copyright 2025, 2026 Element Creations Ltd.
|
||||
// Copyright 2024, 2025 New Vector Ltd.
|
||||
//
|
||||
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
@@ -444,4 +445,54 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
|
||||
let count = res.rows_affected();
|
||||
Ok(usize::try_from(count).unwrap_or(usize::MAX))
|
||||
}
|
||||
|
||||
// Deletes one batch of finished queue jobs and reports how far it got.
//
// Rows with status 'completed' or 'failed' whose ID is strictly greater than
// `since` (when given) and at most `until` are deleted, lowest IDs first, up
// to `limit` rows. Returns the number of rows deleted together with the
// largest deleted ID (`None` when nothing was deleted), which the caller can
// pass back as `since` for the next batch.
//
// NOTE(review): the SQL literal below must stay byte-identical; the generated
// `.sqlx` query file is presumably keyed by a hash of this text, so editing
// the query requires regenerating that file.
#[tracing::instrument(
|
||||
name = "db.queue_job.cleanup",
|
||||
skip_all,
|
||||
fields(
|
||||
db.query.text,
|
||||
since = since.map(tracing::field::display),
|
||||
until = %until,
|
||||
limit = limit,
|
||||
),
|
||||
err,
|
||||
)]
|
||||
async fn cleanup(
|
||||
&mut self,
|
||||
since: Option<Ulid>,
|
||||
until: Ulid,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<Ulid>), Self::Error> {
|
||||
// Use ULID cursor-based pagination for completed and failed jobs.
|
||||
// We delete both completed and failed jobs in the same batch.
|
||||
// `MAX(uuid)` isn't a thing in Postgres, so we aggregate on the client side.
|
||||
let res = sqlx::query_scalar!(
|
||||
r#"
|
||||
WITH to_delete AS (
|
||||
SELECT queue_job_id
|
||||
FROM queue_jobs
|
||||
WHERE (status = 'completed' OR status = 'failed')
|
||||
AND ($1::uuid IS NULL OR queue_job_id > $1)
|
||||
AND queue_job_id <= $2
|
||||
ORDER BY queue_job_id
|
||||
LIMIT $3
|
||||
)
|
||||
DELETE FROM queue_jobs
|
||||
USING to_delete
|
||||
WHERE queue_jobs.queue_job_id = to_delete.queue_job_id
|
||||
RETURNING queue_jobs.queue_job_id
|
||||
"#,
|
||||
// $1: optional exclusive lower bound; a NULL disables the lower bound.
since.map(Uuid::from),
|
||||
// $2: inclusive upper bound.
Uuid::from(until),
|
||||
// $3: batch size, saturated to `i64::MAX` if `limit` does not fit in an i64.
i64::try_from(limit).unwrap_or(i64::MAX)
|
||||
)
|
||||
.traced()
|
||||
.fetch_all(&mut *self.conn)
|
||||
.await?;
|
||||
|
||||
let count = res.len();
|
||||
// Largest ID among the deleted rows; `None` if the batch was empty.
let max_id = res.into_iter().max();
|
||||
|
||||
Ok((count, max_id.map(Ulid::from)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
// Copyright 2025, 2026 Element Creations Ltd.
|
||||
// Copyright 2024, 2025 New Vector Ltd.
|
||||
//
|
||||
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
@@ -213,6 +214,30 @@ pub trait QueueJobRepository: Send + Sync {
|
||||
///
|
||||
/// Returns an error if the underlying repository fails.
|
||||
async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
|
||||
|
||||
/// Clean up old completed and failed jobs.
|
||||
///
|
||||
/// This will delete jobs with status 'completed' or 'failed' and IDs up to
|
||||
/// and including `until`. Uses ULID cursor-based pagination for efficiency.
|
||||
///
|
||||
/// Returns the number of jobs deleted and the cursor for the next batch.
/// Pass the returned cursor as `since` on the following call to continue
/// after the jobs that were already deleted.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `since`: The cursor to start from (exclusive), or `None` to start from
|
||||
/// the beginning
|
||||
/// * `until`: The maximum ULID to delete (inclusive upper bound)
|
||||
/// * `limit`: The maximum number of jobs to delete in this batch
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`Self::Error`] if the underlying repository fails
|
||||
async fn cleanup(
|
||||
&mut self,
|
||||
since: Option<Ulid>,
|
||||
until: Ulid,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<Ulid>), Self::Error>;
|
||||
}
|
||||
|
||||
repository_impl!(QueueJobRepository:
|
||||
@@ -261,6 +286,13 @@ repository_impl!(QueueJobRepository:
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
|
||||
|
||||
async fn cleanup(
|
||||
&mut self,
|
||||
since: Option<Ulid>,
|
||||
until: Ulid,
|
||||
limit: usize,
|
||||
) -> Result<(usize, Option<Ulid>), Self::Error>;
|
||||
);
|
||||
|
||||
/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
|
||||
|
||||
@@ -414,6 +414,14 @@ impl InsertableJob for CleanupUpstreamOAuthLinksJob {
|
||||
const QUEUE_NAME: &'static str = "cleanup-upstream-oauth-links";
|
||||
}
|
||||
|
||||
/// Cleanup old completed and failed queue jobs
///
/// This is a unit struct: the job carries no payload.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||
pub struct CleanupQueueJobsJob;
|
||||
|
||||
impl InsertableJob for CleanupQueueJobsJob {
|
||||
// Queue name associated with this job type.
// NOTE(review): presumably the queue the job is inserted into and consumed
// from — confirm against the `InsertableJob` trait definition.
const QUEUE_NAME: &'static str = "cleanup-queue-jobs";
|
||||
}
|
||||
|
||||
/// Scheduled job to expire inactive sessions
|
||||
///
|
||||
/// This job will trigger jobs to expire inactive compat, oauth and user
|
||||
|
||||
@@ -13,7 +13,7 @@ use async_trait::async_trait;
|
||||
use mas_storage::queue::{
|
||||
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
|
||||
CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob,
|
||||
CleanupOAuthDeviceCodeGrantsJob, CleanupRevokedOAuthAccessTokensJob,
|
||||
CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob,
|
||||
CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob,
|
||||
CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob,
|
||||
CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
||||
@@ -416,6 +416,50 @@ impl RunnableJob for CleanupUpstreamOAuthLinksJob {
|
||||
}
|
||||
}
|
||||
|
||||
// Worker-side implementation of the queue-jobs cleanup: deletes completed and
// failed queue jobs older than 30 days, one batch at a time.
#[async_trait]
|
||||
impl RunnableJob for CleanupQueueJobsJob {
|
||||
#[tracing::instrument(name = "job.cleanup_queue_jobs", skip_all)]
|
||||
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
|
||||
// Completed and failed queue jobs are kept for 30 days for debugging
|
||||
// purposes, then removed.
|
||||
let until = state.clock.now() - chrono::Duration::days(30);
|
||||
// Build the inclusive upper-bound ULID from the cutoff timestamp.
// A cutoff that does not fit in a u64 (e.g. negative) falls back to 0.
// NOTE(review): the second argument is presumably the random component,
// so `u128::MAX` should yield the largest ULID for that millisecond —
// confirm against the `ulid` crate documentation.
let until = Ulid::from_parts(
|
||||
u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN),
|
||||
u128::MAX,
|
||||
);
|
||||
let mut total = 0;
|
||||
|
||||
// Cursor for the next batch; `None` means start from the beginning.
let mut since = None;
|
||||
// Stop between batches if the job has been cancelled.
while !context.cancellation_token.is_cancelled() {
|
||||
// A fresh repository is obtained and saved for every batch.
// NOTE(review): `save()` presumably commits the batch — confirm.
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||
let (count, cursor) = repo
|
||||
.queue_job()
|
||||
.cleanup(since, until, BATCH_SIZE)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
repo.save().await.map_err(JobError::retry)?;
|
||||
since = cursor;
|
||||
total += count;
|
||||
|
||||
// A batch that is not full means there is nothing left to delete.
if count != BATCH_SIZE {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if total == 0 {
|
||||
debug!("no queue jobs to clean up");
|
||||
} else {
|
||||
info!(count = total, "cleaned up queue jobs");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// This job reports a timeout of 10 minutes.
fn timeout(&self) -> Option<Duration> {
|
||||
Some(Duration::from_secs(10 * 60))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RunnableJob for CleanupUserRegistrationsJob {
|
||||
#[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)]
|
||||
|
||||
@@ -141,6 +141,7 @@ pub async fn init(
|
||||
.register_handler::<mas_storage::queue::CleanupUserEmailAuthenticationsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupUpstreamOAuthSessionsJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupUpstreamOAuthLinksJob>()
|
||||
.register_handler::<mas_storage::queue::CleanupQueueJobsJob>()
|
||||
.register_handler::<mas_storage::queue::DeactivateUserJob>()
|
||||
.register_handler::<mas_storage::queue::DeleteDeviceJob>()
|
||||
.register_handler::<mas_storage::queue::ProvisionDeviceJob>()
|
||||
@@ -222,6 +223,12 @@ pub async fn init(
|
||||
"0 59 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupUpstreamOAuthLinksJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-queue-jobs",
|
||||
// Run this job every hour
|
||||
"0 45 * * * *".parse()?,
|
||||
mas_storage::queue::CleanupQueueJobsJob,
|
||||
)
|
||||
.add_schedule(
|
||||
"cleanup-expired-oauth-access-tokens",
|
||||
// Run this job every 4 hours
|
||||
|
||||
Reference in New Issue
Block a user