Retry failed jobs

This commit is contained in:
Quentin Gliech
2024-11-20 18:36:45 +01:00
parent ae5374d458
commit 7113e0ddf6
8 changed files with 301 additions and 28 deletions

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE queue_jobs\n SET next_attempt_id = $1\n WHERE queue_job_id = $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Uuid"
]
},
"nullable": []
},
"hash": "07cd2da428f0984513b4ce58e526c35c9c236ea8beb6696e5740fa45655e59f3"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, attempt)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1\n FROM queue_jobs\n WHERE queue_job_id = $3\n AND status = 'failed'\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz",
"Uuid"
]
},
"nullable": []
},
"hash": "47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n -- We first grab a few jobs that are available,\n -- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently\n -- and we don't get multiple workers grabbing the same jobs\n WITH locked_jobs AS (\n SELECT queue_job_id\n FROM queue_jobs\n WHERE\n status = 'available'\n AND queue_name = ANY($1)\n ORDER BY queue_job_id ASC\n LIMIT $2\n FOR UPDATE\n SKIP LOCKED\n )\n -- then we update the status of those jobs to 'running', returning the job details\n UPDATE queue_jobs\n SET status = 'running', started_at = $3, started_by = $4\n FROM locked_jobs\n WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id\n RETURNING\n queue_jobs.queue_job_id,\n queue_jobs.queue_name,\n queue_jobs.payload,\n queue_jobs.metadata\n ",
"query": "\n -- We first grab a few jobs that are available,\n -- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently\n -- and we don't get multiple workers grabbing the same jobs\n WITH locked_jobs AS (\n SELECT queue_job_id\n FROM queue_jobs\n WHERE\n status = 'available'\n AND queue_name = ANY($1)\n ORDER BY queue_job_id ASC\n LIMIT $2\n FOR UPDATE\n SKIP LOCKED\n )\n -- then we update the status of those jobs to 'running', returning the job details\n UPDATE queue_jobs\n SET status = 'running', started_at = $3, started_by = $4\n FROM locked_jobs\n WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id\n RETURNING\n queue_jobs.queue_job_id,\n queue_jobs.queue_name,\n queue_jobs.payload,\n queue_jobs.metadata,\n queue_jobs.attempt\n ",
"describe": {
"columns": [
{
@@ -22,6 +22,11 @@
"ordinal": 3,
"name": "metadata",
"type_info": "Jsonb"
},
{
"ordinal": 4,
"name": "attempt",
"type_info": "Int4"
}
],
"parameters": {
@@ -36,8 +41,9 @@
false,
false,
false,
false,
false
]
},
"hash": "9f2fae84d17991a179f93c4ea43b411aa9f15e7beccfd6212787c3452d35d061"
"hash": "707d78340069627aba9f18bbe5ac1388d6723f82549d88d704d9c939b9d35c49"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE queue_jobs\n SET\n status = 'failed',\n failed_at = $1,\n failed_reason = $2\n WHERE\n queue_job_id = $3\n AND status = 'running'\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Text",
"Uuid"
]
},
"nullable": []
},
"hash": "f50b7fb5a2c09e7b7e89e2addb0ca42c790c101a3fc9442862b5885d5116325a"
}

View File

@@ -0,0 +1,17 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
-- Add a new status for failed jobs
ALTER TYPE "queue_job_status" ADD VALUE 'failed';
ALTER TABLE "queue_jobs"
-- When the job failed
ADD COLUMN "failed_at" TIMESTAMP WITH TIME ZONE,
-- Error message of the failure
ADD COLUMN "failed_reason" TEXT,
-- How many times we've already tried to run the job
ADD COLUMN "attempt" INTEGER NOT NULL DEFAULT 0,
-- The next attempt, if it was retried
ADD COLUMN "next_attempt_id" UUID REFERENCES "queue_jobs" ("queue_job_id");

View File

@@ -37,6 +37,7 @@ struct JobReservationResult {
queue_name: String,
payload: serde_json::Value,
metadata: serde_json::Value,
attempt: i32,
}
impl TryFrom<JobReservationResult> for Job {
@@ -54,11 +55,19 @@ impl TryFrom<JobReservationResult> for Job {
.source(e)
})?;
let attempt = value.attempt.try_into().map_err(|e| {
DatabaseInconsistencyError::on("queue_jobs")
.column("attempt")
.row(id)
.source(e)
})?;
Ok(Self {
id,
queue_name,
payload,
metadata,
attempt,
})
}
}
@@ -152,7 +161,8 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
queue_jobs.queue_job_id,
queue_jobs.queue_name,
queue_jobs.payload,
queue_jobs.metadata
queue_jobs.metadata,
queue_jobs.attempt
"#,
&queues,
max_count,
@@ -199,4 +209,103 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
Ok(())
}
#[tracing::instrument(
name = "db.queue_job.mark_as_failed",
skip_all,
fields(
db.query.text,
job.id = %id,
),
err
)]
async fn mark_as_failed(
&mut self,
clock: &dyn Clock,
id: Ulid,
reason: &str,
) -> Result<(), Self::Error> {
let now = clock.now();
let res = sqlx::query!(
r#"
UPDATE queue_jobs
SET
status = 'failed',
failed_at = $1,
failed_reason = $2
WHERE
queue_job_id = $3
AND status = 'running'
"#,
now,
reason,
Uuid::from(id),
)
.traced()
.execute(&mut *self.conn)
.await?;
DatabaseError::ensure_affected_rows(&res, 1)?;
Ok(())
}
#[tracing::instrument(
name = "db.queue_job.retry",
skip_all,
fields(
db.query.text,
job.id = %id,
),
err
)]
async fn retry(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
id: Ulid,
) -> Result<(), Self::Error> {
let now = clock.now();
let new_id = Ulid::from_datetime_with_source(now.into(), rng);
// Create a new job with the same payload and metadata, but a new ID and
// increment the attempt
// We make sure we do this only for 'failed' jobs
let res = sqlx::query!(
r#"
INSERT INTO queue_jobs
(queue_job_id, queue_name, payload, metadata, created_at, attempt)
SELECT $1, queue_name, payload, metadata, $2, attempt + 1
FROM queue_jobs
WHERE queue_job_id = $3
AND status = 'failed'
"#,
Uuid::from(new_id),
now,
Uuid::from(id),
)
.traced()
.execute(&mut *self.conn)
.await?;
DatabaseError::ensure_affected_rows(&res, 1)?;
// Update the old job to point to the new attempt
let res = sqlx::query!(
r#"
UPDATE queue_jobs
SET next_attempt_id = $1
WHERE queue_job_id = $2
"#,
Uuid::from(new_id),
Uuid::from(id),
)
.traced()
.execute(&mut *self.conn)
.await?;
DatabaseError::ensure_affected_rows(&res, 1)?;
Ok(())
}
}

View File

@@ -28,6 +28,9 @@ pub struct Job {
/// Arbitrary metadata about the job
pub metadata: JobMetadata,
/// Which attempt it is
pub attempt: usize,
}
/// Metadata stored alongside the job
@@ -127,12 +130,48 @@ pub trait QueueJobRepository: Send + Sync {
/// # Parameters
///
/// * `clock` - The clock used to generate timestamps
/// * `job` - The job to mark as completed
/// * `id` - The ID of the job to mark as completed
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
/// Marks a job as failed.
///
/// # Parameters
///
/// * `clock` - The clock used to generate timestamps
/// * `id` - The ID of the job to mark as failed
/// * `reason` - The reason for the failure
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn mark_as_failed(
&mut self,
clock: &dyn Clock,
id: Ulid,
reason: &str,
) -> Result<(), Self::Error>;
/// Retry a job.
///
/// # Parameters
///
/// * `rng` - The random number generator used to generate a new job ID
/// * `clock` - The clock used to generate timestamps
/// * `id` - The ID of the job to reschedule
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn retry(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
id: Ulid,
) -> Result<(), Self::Error>;
}
repository_impl!(QueueJobRepository:
@@ -154,6 +193,19 @@ repository_impl!(QueueJobRepository:
) -> Result<Vec<Job>, Self::Error>;
async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
async fn mark_as_failed(&mut self,
clock: &dyn Clock,
id: Ulid,
reason: &str,
) -> Result<(), Self::Error>;
async fn retry(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
id: Ulid,
) -> Result<(), Self::Error>;
);
/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue

View File

@@ -35,6 +35,7 @@ pub struct JobContext {
pub id: Ulid,
pub metadata: JobMetadata,
pub queue_name: String,
pub attempt: usize,
pub cancellation_token: CancellationToken,
}
@@ -156,6 +157,9 @@ const MAX_CONCURRENT_JOBS: usize = 10;
// How many jobs can we fetch at once
const MAX_JOBS_TO_FETCH: usize = 5;
// How many attempts a job should be retried
const MAX_ATTEMPTS: usize = 5;
type JobFactory = Arc<dyn Fn(JobPayload) -> Box<dyn RunnableJob> + Send + Sync>;
pub struct QueueWorker {
@@ -280,6 +284,8 @@ impl QueueWorker {
async fn shutdown(&mut self) -> Result<(), QueueRunnerError> {
tracing::info!("Shutting down worker");
// TODO: collect running jobs
// Start a transaction on the existing PgListener connection
let txn = self
.listener
@@ -397,7 +403,7 @@ impl QueueWorker {
while let Some(result) = self.last_join_result.take() {
// TODO: add metrics to track the job status and the time it took
let context = match result {
match result {
Ok((id, Ok(()))) => {
// The job succeeded
let context = self
@@ -408,10 +414,13 @@ impl QueueWorker {
tracing::info!(
job.id = %context.id,
job.queue_name = %context.queue_name,
job.attempt = %context.attempt,
"Job completed"
);
context
repo.queue_job()
.mark_as_completed(&self.clock, context.id)
.await?;
}
Ok((id, Err(e))) => {
// The job failed
@@ -420,29 +429,48 @@ impl QueueWorker {
.remove(&id)
.expect("Job context not found");
let reason = format!("{:?}", e.error);
repo.queue_job()
.mark_as_failed(&self.clock, context.id, &reason)
.await?;
match e.decision {
JobErrorDecision::Fail => {
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
"Job failed"
job.attempt = %context.attempt,
"Job failed, not retrying"
);
}
JobErrorDecision::Retry => {
tracing::warn!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
"Job failed, will retry"
);
if context.attempt < MAX_ATTEMPTS {
tracing::warn!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.attempt = %context.attempt,
"Job failed, will retry"
);
// TODO: reschedule the job
// TODO: retry with an exponential backoff, once we know how to
// schedule jobs in the future
repo.queue_job()
.retry(&mut self.rng, &self.clock, context.id)
.await?;
} else {
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.attempt = %context.attempt,
"Job failed too many times, abandonning"
);
}
}
}
context
}
Err(e) => {
// The job crashed (or was cancelled)
@@ -452,23 +480,35 @@ impl QueueWorker {
.remove(&id)
.expect("Job context not found");
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
"Job crashed"
);
let reason = e.to_string();
repo.queue_job()
.mark_as_failed(&self.clock, context.id, &reason)
.await?;
// TODO: reschedule the job
if context.attempt < MAX_ATTEMPTS {
tracing::warn!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.attempt = %context.attempt,
"Job crashed, will retry"
);
context
repo.queue_job()
.retry(&mut self.rng, &self.clock, context.id)
.await?;
} else {
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.attempt = %context.attempt,
"Job crashed too many times, abandonning"
);
}
}
};
repo.queue_job()
.mark_as_completed(&self.clock, context.id)
.await?;
self.last_join_result = self.running_jobs.try_join_next_with_id();
}
@@ -492,6 +532,7 @@ impl QueueWorker {
queue_name,
payload,
metadata,
attempt,
} in jobs
{
let cancellation_token = self.cancellation_token.child_token();
@@ -500,6 +541,7 @@ impl QueueWorker {
id,
metadata,
queue_name,
attempt,
cancellation_token,
};