diff --git a/crates/storage-pg/.sqlx/query-07cd2da428f0984513b4ce58e526c35c9c236ea8beb6696e5740fa45655e59f3.json b/crates/storage-pg/.sqlx/query-07cd2da428f0984513b4ce58e526c35c9c236ea8beb6696e5740fa45655e59f3.json new file mode 100644 index 000000000..e5ffe95e2 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-07cd2da428f0984513b4ce58e526c35c9c236ea8beb6696e5740fa45655e59f3.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_jobs\n SET next_attempt_id = $1\n WHERE queue_job_id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "07cd2da428f0984513b4ce58e526c35c9c236ea8beb6696e5740fa45655e59f3" +} diff --git a/crates/storage-pg/.sqlx/query-47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38.json b/crates/storage-pg/.sqlx/query-47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38.json new file mode 100644 index 000000000..2962db553 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, attempt)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1\n FROM queue_jobs\n WHERE queue_job_id = $3\n AND status = 'failed'\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38" +} diff --git a/crates/storage-pg/.sqlx/query-9f2fae84d17991a179f93c4ea43b411aa9f15e7beccfd6212787c3452d35d061.json b/crates/storage-pg/.sqlx/query-707d78340069627aba9f18bbe5ac1388d6723f82549d88d704d9c939b9d35c49.json similarity index 87% rename from crates/storage-pg/.sqlx/query-9f2fae84d17991a179f93c4ea43b411aa9f15e7beccfd6212787c3452d35d061.json rename to crates/storage-pg/.sqlx/query-707d78340069627aba9f18bbe5ac1388d6723f82549d88d704d9c939b9d35c49.json index 67f1ad132..88eb81f9f 100644 --- a/crates/storage-pg/.sqlx/query-9f2fae84d17991a179f93c4ea43b411aa9f15e7beccfd6212787c3452d35d061.json +++ b/crates/storage-pg/.sqlx/query-707d78340069627aba9f18bbe5ac1388d6723f82549d88d704d9c939b9d35c49.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n -- We first grab a few jobs that are available,\n -- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently\n -- and we don't get multiple workers grabbing the same jobs\n WITH locked_jobs AS (\n SELECT queue_job_id\n FROM queue_jobs\n WHERE\n status = 'available'\n AND queue_name = ANY($1)\n ORDER BY queue_job_id ASC\n LIMIT $2\n FOR UPDATE\n SKIP LOCKED\n )\n -- then we update the status of those jobs to 'running', returning the job details\n UPDATE queue_jobs\n SET status = 'running', started_at = $3, started_by = $4\n FROM locked_jobs\n WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id\n RETURNING\n queue_jobs.queue_job_id,\n queue_jobs.queue_name,\n queue_jobs.payload,\n queue_jobs.metadata\n ", + "query": "\n -- We first grab a few jobs that are available,\n -- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently\n -- and we don't get multiple workers grabbing the same jobs\n WITH locked_jobs AS (\n SELECT queue_job_id\n FROM queue_jobs\n WHERE\n status = 'available'\n AND queue_name = ANY($1)\n ORDER BY queue_job_id ASC\n LIMIT $2\n FOR UPDATE\n SKIP LOCKED\n )\n -- then we update the status of those jobs to 'running', returning the job details\n UPDATE queue_jobs\n SET status = 'running', started_at = $3, started_by = $4\n FROM locked_jobs\n WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id\n RETURNING\n queue_jobs.queue_job_id,\n queue_jobs.queue_name,\n queue_jobs.payload,\n queue_jobs.metadata,\n queue_jobs.attempt\n ", "describe": { "columns": [ { @@ -22,6 +22,11 @@ "ordinal": 3, "name": "metadata", "type_info": "Jsonb" + }, + { + "ordinal": 4, + "name": "attempt", + "type_info": "Int4" } ], "parameters": { @@ -36,8 +41,9 @@ false, false, false, + false, false ] }, - "hash": "9f2fae84d17991a179f93c4ea43b411aa9f15e7beccfd6212787c3452d35d061" + "hash": "707d78340069627aba9f18bbe5ac1388d6723f82549d88d704d9c939b9d35c49" } diff --git a/crates/storage-pg/.sqlx/query-f50b7fb5a2c09e7b7e89e2addb0ca42c790c101a3fc9442862b5885d5116325a.json b/crates/storage-pg/.sqlx/query-f50b7fb5a2c09e7b7e89e2addb0ca42c790c101a3fc9442862b5885d5116325a.json new file mode 100644 index 000000000..df75b11b1 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-f50b7fb5a2c09e7b7e89e2addb0ca42c790c101a3fc9442862b5885d5116325a.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_jobs\n SET\n status = 'failed',\n failed_at = $1,\n failed_reason = $2\n WHERE\n queue_job_id = $3\n AND status = 'running'\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Text", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "f50b7fb5a2c09e7b7e89e2addb0ca42c790c101a3fc9442862b5885d5116325a" +} diff --git a/crates/storage-pg/migrations/20241120163320_queue_job_failures.sql b/crates/storage-pg/migrations/20241120163320_queue_job_failures.sql new file mode 100644 index 000000000..0407d6342 --- /dev/null +++ b/crates/storage-pg/migrations/20241120163320_queue_job_failures.sql @@ -0,0 +1,17 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- Add a new status for failed jobs +ALTER TYPE "queue_job_status" ADD VALUE 'failed'; + +ALTER TABLE "queue_jobs" + -- When the job failed + ADD COLUMN "failed_at" TIMESTAMP WITH TIME ZONE, + -- Error message of the failure + ADD COLUMN "failed_reason" TEXT, + -- How many times we've already tried to run the job + ADD COLUMN "attempt" INTEGER NOT NULL DEFAULT 0, + -- The next attempt, if it was retried + ADD COLUMN "next_attempt_id" UUID REFERENCES "queue_jobs" ("queue_job_id"); diff --git a/crates/storage-pg/src/queue/job.rs b/crates/storage-pg/src/queue/job.rs index 02ceed793..769a9eb49 100644 --- a/crates/storage-pg/src/queue/job.rs +++ b/crates/storage-pg/src/queue/job.rs @@ -37,6 +37,7 @@ struct JobReservationResult { queue_name: String, payload: serde_json::Value, metadata: serde_json::Value, + attempt: i32, } impl TryFrom for Job { @@ -54,11 +55,19 @@ impl TryFrom for Job { .source(e) })?; + let attempt = value.attempt.try_into().map_err(|e| { + DatabaseInconsistencyError::on("queue_jobs") + .column("attempt") + .row(id) + .source(e) + })?; + Ok(Self { id, queue_name, payload, metadata, + attempt, }) } } @@ -152,7 +161,8 @@ impl QueueJobRepository for PgQueueJobRepository<'_> { queue_jobs.queue_job_id, queue_jobs.queue_name, queue_jobs.payload, - queue_jobs.metadata + queue_jobs.metadata, + queue_jobs.attempt "#, &queues, max_count, @@ -199,4 +209,103 @@ impl QueueJobRepository for PgQueueJobRepository<'_> { Ok(()) } + + #[tracing::instrument( + name = "db.queue_job.mark_as_failed", + skip_all, + fields( + db.query.text, + job.id = %id, + ), + err + )] + async fn mark_as_failed( + &mut self, + clock: &dyn Clock, + id: Ulid, + reason: &str, + ) -> Result<(), Self::Error> { + let now = clock.now(); + let res = sqlx::query!( + r#" + UPDATE queue_jobs + SET + status = 'failed', + failed_at = $1, + failed_reason = $2 + WHERE + queue_job_id = $3 + AND status = 'running' + "#, + now, + reason, + Uuid::from(id), + ) + .traced() + .execute(&mut *self.conn) + .await?; + + DatabaseError::ensure_affected_rows(&res, 1)?; + + Ok(()) + } + + #[tracing::instrument( + name = "db.queue_job.retry", + skip_all, + fields( + db.query.text, + job.id = %id, + ), + err + )] + async fn retry( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + id: Ulid, + ) -> Result<(), Self::Error> { + let now = clock.now(); + let new_id = Ulid::from_datetime_with_source(now.into(), rng); + + // Create a new job with the same payload and metadata, but a new ID and + // increment the attempt + // We make sure we do this only for 'failed' jobs + let res = sqlx::query!( + r#" + INSERT INTO queue_jobs + (queue_job_id, queue_name, payload, metadata, created_at, attempt) + SELECT $1, queue_name, payload, metadata, $2, attempt + 1 + FROM queue_jobs + WHERE queue_job_id = $3 + AND status = 'failed' + "#, + Uuid::from(new_id), + now, + Uuid::from(id), + ) + .traced() + .execute(&mut *self.conn) + .await?; + + DatabaseError::ensure_affected_rows(&res, 1)?; + + // Update the old job to point to the new attempt + let res = sqlx::query!( + r#" + UPDATE queue_jobs + SET next_attempt_id = $1 + WHERE queue_job_id = $2 + "#, + Uuid::from(new_id), + Uuid::from(id), + ) + .traced() + .execute(&mut *self.conn) + .await?; + + DatabaseError::ensure_affected_rows(&res, 1)?; + + Ok(()) + } } diff --git a/crates/storage/src/queue/job.rs b/crates/storage/src/queue/job.rs index 13df586d7..9a24fa649 100644 --- a/crates/storage/src/queue/job.rs +++ b/crates/storage/src/queue/job.rs @@ -28,6 +28,9 @@ pub struct Job { /// Arbitrary metadata about the job pub metadata: JobMetadata, + + /// Which attempt it is + pub attempt: usize, } /// Metadata stored alongside the job @@ -127,12 +130,48 @@ pub trait QueueJobRepository: Send + Sync { /// # Parameters /// /// * `clock` - The clock used to generate timestamps - /// * `job` - The job to mark as completed + /// * `id` - The ID of the job to mark as completed /// /// # Errors /// /// Returns an error if the underlying repository fails. async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>; + + /// Marks a job as failed. + /// + /// # Parameters + /// + /// * `clock` - The clock used to generate timestamps + /// * `id` - The ID of the job to mark as failed + /// * `reason` - The reason for the failure + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn mark_as_failed( + &mut self, + clock: &dyn Clock, + id: Ulid, + reason: &str, + ) -> Result<(), Self::Error>; + + /// Retry a job. + /// + /// # Parameters + /// + /// * `rng` - The random number generator used to generate a new job ID + /// * `clock` - The clock used to generate timestamps + /// * `id` - The ID of the job to reschedule + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn retry( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + id: Ulid, + ) -> Result<(), Self::Error>; } repository_impl!(QueueJobRepository: @@ -154,6 +193,19 @@ repository_impl!(QueueJobRepository: ) -> Result, Self::Error>; async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>; + + async fn mark_as_failed(&mut self, + clock: &dyn Clock, + id: Ulid, + reason: &str, + ) -> Result<(), Self::Error>; + + async fn retry( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + id: Ulid, + ) -> Result<(), Self::Error>; ); /// Extension trait for [`QueueJobRepository`] to help adding a job to the queue diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index ba707cff8..143b83ece 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -35,6 +35,7 @@ pub struct JobContext { pub id: Ulid, pub metadata: JobMetadata, pub queue_name: String, + pub attempt: usize, pub cancellation_token: CancellationToken, } @@ -156,6 +157,9 @@ const MAX_CONCURRENT_JOBS: usize = 10; // How many jobs can we fetch at once const MAX_JOBS_TO_FETCH: usize = 5; +// How many attempts a job should be retried +const MAX_ATTEMPTS: usize = 5; + type JobFactory = Arc Box + Send + Sync>; pub struct QueueWorker { @@ -280,6 +284,8 @@ impl QueueWorker { async fn shutdown(&mut self) -> Result<(), QueueRunnerError> { tracing::info!("Shutting down worker"); + // TODO: collect running jobs + // Start a transaction on the existing PgListener connection let txn = self .listener @@ -397,7 +403,7 @@ impl QueueWorker { while let Some(result) = self.last_join_result.take() { // TODO: add metrics to track the job status and the time it took - let context = match result { + match result { Ok((id, Ok(()))) => { // The job succeeded let context = self @@ -408,10 +414,13 @@ impl QueueWorker { tracing::info!( job.id = %context.id, job.queue_name = %context.queue_name, + job.attempt = %context.attempt, "Job completed" ); - context + repo.queue_job() + .mark_as_completed(&self.clock, context.id) + .await?; } Ok((id, Err(e))) => { // The job failed @@ -420,29 +429,48 @@ impl QueueWorker { .remove(&id) .expect("Job context not found"); + let reason = format!("{:?}", e.error); + repo.queue_job() + .mark_as_failed(&self.clock, context.id, &reason) + .await?; + match e.decision { JobErrorDecision::Fail => { tracing::error!( error = &e as &dyn std::error::Error, job.id = %context.id, job.queue_name = %context.queue_name, - "Job failed" + job.attempt = %context.attempt, + "Job failed, not retrying" ); } JobErrorDecision::Retry => { - tracing::warn!( - error = &e as &dyn std::error::Error, - job.id = %context.id, - job.queue_name = %context.queue_name, - "Job failed, will retry" - ); + if context.attempt < MAX_ATTEMPTS { + tracing::warn!( + error = &e as &dyn std::error::Error, + job.id = %context.id, + job.queue_name = %context.queue_name, + job.attempt = %context.attempt, + "Job failed, will retry" + ); - // TODO: reschedule the job + // TODO: retry with an exponential backoff, once we know how to + // schedule jobs in the future + repo.queue_job() + .retry(&mut self.rng, &self.clock, context.id) + .await?; + } else { + tracing::error!( + error = &e as &dyn std::error::Error, + job.id = %context.id, + job.queue_name = %context.queue_name, + job.attempt = %context.attempt, + "Job failed too many times, abandonning" + ); + } } } - - context } Err(e) => { // The job crashed (or was cancelled) @@ -452,23 +480,35 @@ impl QueueWorker { .remove(&id) .expect("Job context not found"); - tracing::error!( - error = &e as &dyn std::error::Error, - job.id = %context.id, - job.queue_name = %context.queue_name, - "Job crashed" - ); + let reason = e.to_string(); + repo.queue_job() + .mark_as_failed(&self.clock, context.id, &reason) + .await?; - // TODO: reschedule the job + if context.attempt < MAX_ATTEMPTS { + tracing::warn!( + error = &e as &dyn std::error::Error, + job.id = %context.id, + job.queue_name = %context.queue_name, + job.attempt = %context.attempt, + "Job crashed, will retry" + ); - context + repo.queue_job() + .retry(&mut self.rng, &self.clock, context.id) + .await?; + } else { + tracing::error!( + error = &e as &dyn std::error::Error, + job.id = %context.id, + job.queue_name = %context.queue_name, + job.attempt = %context.attempt, + "Job crashed too many times, abandonning" + ); + } } }; - repo.queue_job() - .mark_as_completed(&self.clock, context.id) - .await?; - self.last_join_result = self.running_jobs.try_join_next_with_id(); } @@ -492,6 +532,7 @@ impl QueueWorker { queue_name, payload, metadata, + attempt, } in jobs { let cancellation_token = self.cancellation_token.child_token(); @@ -500,6 +541,7 @@ impl QueueWorker { id, metadata, queue_name, + attempt, cancellation_token, };