Allow scheduling jobs in the future

Also retries jobs with an exponential backoff.
This commit is contained in:
Quentin Gliech
2024-11-22 16:58:38 +01:00
parent 5ab826c778
commit a01201f954
9 changed files with 277 additions and 25 deletions

View File

@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, attempt, scheduled_at, status)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, 'scheduled'\n FROM queue_jobs\n WHERE queue_job_id = $4\n AND status = 'failed'\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz",
"Timestamptz",
"Uuid"
]
},
"nullable": []
},
"hash": "3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936"
}

View File

@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE queue_jobs\n SET status = 'available'\n WHERE\n status = 'scheduled'\n AND scheduled_at <= $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz"
]
},
"nullable": []
},
"hash": "3c7960a2eb2edd71bc71177fc0fb2e83858c9944893b8f3a0f0131e8a9b7a494"
}

View File

@@ -1,16 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, attempt)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1\n FROM queue_jobs\n WHERE queue_job_id = $3\n AND status = 'failed'\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz",
"Uuid"
]
},
"nullable": []
},
"hash": "47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38"
}

View File

@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, status)\n VALUES ($1, $2, $3, $4, $5, $6, 'scheduled')\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Jsonb",
"Jsonb",
"Timestamptz",
"Timestamptz"
]
},
"nullable": []
},
"hash": "d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943"
}

View File

@@ -0,0 +1,11 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
-- Add a new status for scheduled jobs
ALTER TYPE "queue_job_status" ADD VALUE 'scheduled';
ALTER TABLE "queue_jobs"
-- When the job is scheduled to run
ADD COLUMN "scheduled_at" TIMESTAMP WITH TIME ZONE;

View File

@@ -0,0 +1,9 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
-- Add a partial index on scheduled jobs
CREATE INDEX "queue_jobs_scheduled_at_idx"
ON "queue_jobs" ("scheduled_at")
WHERE "status" = 'scheduled';

View File

@@ -7,6 +7,7 @@
//! [`QueueJobRepository`].
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use mas_storage::{
queue::{Job, QueueJobRepository, Worker},
Clock,
@@ -117,6 +118,50 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
Ok(())
}
#[tracing::instrument(
name = "db.queue_job.schedule_later",
fields(
queue_job.id,
queue_job.queue_name = queue_name,
queue_job.scheduled_at = %scheduled_at,
db.query.text,
),
skip_all,
err,
)]
async fn schedule_later(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
queue_name: &str,
payload: serde_json::Value,
metadata: serde_json::Value,
scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error> {
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), rng);
tracing::Span::current().record("queue_job.id", tracing::field::display(id));
sqlx::query!(
r#"
INSERT INTO queue_jobs
(queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, status)
VALUES ($1, $2, $3, $4, $5, $6, 'scheduled')
"#,
Uuid::from(id),
queue_name,
payload,
metadata,
created_at,
scheduled_at,
)
.traced()
.execute(&mut *self.conn)
.await?;
Ok(())
}
#[tracing::instrument(
name = "db.queue_job.reserve",
skip_all,
@@ -264,8 +309,10 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
id: Ulid,
delay: Duration,
) -> Result<(), Self::Error> {
let now = clock.now();
let scheduled_at = now + delay;
let new_id = Ulid::from_datetime_with_source(now.into(), rng);
// Create a new job with the same payload and metadata, but a new ID and
@@ -274,14 +321,15 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
let res = sqlx::query!(
r#"
INSERT INTO queue_jobs
(queue_job_id, queue_name, payload, metadata, created_at, attempt)
SELECT $1, queue_name, payload, metadata, $2, attempt + 1
(queue_job_id, queue_name, payload, metadata, created_at, attempt, scheduled_at, status)
SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, 'scheduled'
FROM queue_jobs
WHERE queue_job_id = $3
WHERE queue_job_id = $4
AND status = 'failed'
"#,
Uuid::from(new_id),
now,
scheduled_at,
Uuid::from(id),
)
.traced()
@@ -308,4 +356,32 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
Ok(())
}
#[tracing::instrument(
name = "db.queue_job.schedule_available_jobs",
skip_all,
fields(
db.query.text,
),
err
)]
async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error> {
let now = clock.now();
let res = sqlx::query!(
r#"
UPDATE queue_jobs
SET status = 'available'
WHERE
status = 'scheduled'
AND scheduled_at <= $1
"#,
now,
)
.traced()
.execute(&mut *self.conn)
.await?;
let count = res.rows_affected();
Ok(usize::try_from(count).unwrap_or(usize::MAX))
}
}

View File

@@ -6,6 +6,7 @@
//! Repository to interact with jobs in the job queue
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use opentelemetry::trace::TraceContextExt;
use rand_core::RngCore;
use serde::{Deserialize, Serialize};
@@ -105,6 +106,30 @@ pub trait QueueJobRepository: Send + Sync {
metadata: serde_json::Value,
) -> Result<(), Self::Error>;
/// Schedule a job to be executed at a later date by a worker.
///
/// # Parameters
///
/// * `rng` - The random number generator used to generate a new job ID
/// * `clock` - The clock used to generate timestamps
/// * `queue_name` - The name of the queue to schedule the job on
/// * `payload` - The payload of the job
/// * `metadata` - Arbitrary metadata about the job scheduled immediately.
/// * `scheduled_at` - The date and time to schedule the job for
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn schedule_later(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
queue_name: &str,
payload: serde_json::Value,
metadata: serde_json::Value,
scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error>;
/// Reserve multiple jobs from multiple queues
///
/// # Parameters
@@ -171,7 +196,18 @@ pub trait QueueJobRepository: Send + Sync {
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
id: Ulid,
delay: Duration,
) -> Result<(), Self::Error>;
/// Mark all scheduled jobs past their scheduled date as available to be
/// executed.
///
/// Returns the number of jobs that were marked as available.
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
}
repository_impl!(QueueJobRepository:
@@ -184,6 +220,16 @@ repository_impl!(QueueJobRepository:
metadata: serde_json::Value,
) -> Result<(), Self::Error>;
async fn schedule_later(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
queue_name: &str,
payload: serde_json::Value,
metadata: serde_json::Value,
scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error>;
async fn reserve(
&mut self,
clock: &dyn Clock,
@@ -205,7 +251,10 @@ repository_impl!(QueueJobRepository:
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
id: Ulid,
delay: Duration,
) -> Result<(), Self::Error>;
async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
);
/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
@@ -230,6 +279,26 @@ pub trait QueueJobRepositoryExt: QueueJobRepository {
clock: &dyn Clock,
job: J,
) -> Result<(), Self::Error>;
/// Schedule a job to be executed at a later date by a worker.
///
/// # Parameters
///
/// * `rng` - The random number generator used to generate a new job ID
/// * `clock` - The clock used to generate timestamps
/// * `job` - The job to schedule
/// * `scheduled_at` - The date and time to schedule the job for
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn schedule_job_later<J: InsertableJob>(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
job: J,
scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error>;
}
#[async_trait]
@@ -263,4 +332,32 @@ where
self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata)
.await
}
#[tracing::instrument(
name = "db.queue_job.schedule_job_later",
fields(
queue_job.queue_name = J::QUEUE_NAME,
),
skip_all,
)]
async fn schedule_job_later<J: InsertableJob>(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
job: J,
scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error> {
// Grab the span context from the current span
let span = tracing::Span::current();
let ctx = span.context();
let span = ctx.span();
let span_context = span.span_context();
let metadata = JobMetadata::new(span_context);
let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
let payload = serde_json::to_value(job).expect("Could not serialize job");
self.schedule_later(rng, clock, J::QUEUE_NAME, payload, metadata, scheduled_at)
.await
}
}

View File

@@ -160,6 +160,14 @@ const MAX_JOBS_TO_FETCH: usize = 5;
// How many attempts a job should be retried
const MAX_ATTEMPTS: usize = 5;
/// Returns the delay to wait before retrying a job
///
/// Uses an exponential backoff: 1s, 2s, 4s, 8s, 16s
fn retry_delay(attempt: usize) -> Duration {
let attempt = u32::try_from(attempt).unwrap_or(u32::MAX);
Duration::milliseconds(2_i64.saturating_pow(attempt) * 1000)
}
type JobResult = Result<(), JobError>;
type JobFactory = Arc<dyn Fn(JobPayload) -> Box<dyn RunnableJob> + Send + Sync>;
@@ -516,6 +524,17 @@ impl QueueWorker {
// TODO: mark tasks those workers had as lost
// Mark all the scheduled jobs as available
let scheduled = repo
.queue_job()
.schedule_available_jobs(&self.clock)
.await?;
match scheduled {
0 => {}
1 => tracing::warn!("One scheduled job marked as available"),
n => tracing::warn!("{n} scheduled jobs marked as available"),
}
// Release the leader lock
let txn = repo
.into_inner()
@@ -669,17 +688,19 @@ impl JobTracker {
JobErrorDecision::Retry => {
if context.attempt < MAX_ATTEMPTS {
let delay = retry_delay(context.attempt);
tracing::warn!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.attempt = %context.attempt,
"Job failed, will retry"
"Job failed, will retry in {}s",
delay.num_seconds()
);
// TODO: retry with an exponential backoff, once we know how to
// schedule jobs in the future
repo.queue_job().retry(&mut *rng, clock, context.id).await?;
repo.queue_job()
.retry(&mut *rng, clock, context.id, delay)
.await?;
} else {
tracing::error!(
error = &e as &dyn std::error::Error,
@@ -707,15 +728,19 @@ impl JobTracker {
.await?;
if context.attempt < MAX_ATTEMPTS {
let delay = retry_delay(context.attempt);
tracing::warn!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.attempt = %context.attempt,
"Job crashed, will retry"
"Job crashed, will retry in {}s",
delay.num_seconds()
);
repo.queue_job().retry(&mut *rng, clock, context.id).await?;
repo.queue_job()
.retry(&mut *rng, clock, context.id, delay)
.await?;
} else {
tracing::error!(
error = &e as &dyn std::error::Error,