This commit is contained in:
Quentin Gliech
2024-10-14 10:29:56 +02:00
parent ac991a6572
commit 7bbc867e2a
7 changed files with 395 additions and 1 deletions

View File

@@ -0,0 +1,37 @@
{
"db_name": "PostgreSQL",
"query": "\n -- We first grab a few jobs that are available,\n -- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently\n -- and we don't get multiple workers grabbing the same jobs\n WITH locked_jobs AS (\n SELECT queue_job_id\n FROM queue_jobs\n WHERE\n status = 'available'\n AND queue_name = ANY($1)\n ORDER BY queue_job_id ASC\n LIMIT $2\n FOR UPDATE\n SKIP LOCKED\n )\n -- then we update the status of those jobs to 'running', returning the job details\n UPDATE queue_jobs\n SET status = 'running', started_at = $3, started_by = $4\n FROM locked_jobs\n WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id\n RETURNING\n queue_jobs.queue_job_id,\n queue_jobs.payload,\n queue_jobs.metadata\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "queue_job_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "payload",
"type_info": "Jsonb"
},
{
"ordinal": 2,
"name": "metadata",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": [
"TextArray",
"Int8",
"Timestamptz",
"Uuid"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "0ac1abe7161c0e58d76d8b1e4de293c35ba4503855d52a5b62b6e86b126362f5"
}

View File

@@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at)\n VALUES ($1, $2, $3, $4, $5)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Jsonb",
"Jsonb",
"Timestamptz"
]
},
"nullable": []
},
"hash": "e291be0434ab9c346dee777e50f8e601f12c8003fe93a5ecb110d02642d14c3c"
}

View File

@@ -0,0 +1,79 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
CREATE TYPE queue_job_status AS ENUM (
-- The job is available to be picked up by a worker
'available',
-- The job is currently being processed by a worker
'running',
-- The job has been completed
'completed',
-- The worker running the job was lost
'lost'
);
CREATE TABLE queue_jobs (
queue_job_id UUID NOT NULL PRIMARY KEY,
-- The status of the job
status queue_job_status NOT NULL DEFAULT 'available',
-- When the job was created
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
-- When the job was grabbed by a worker
started_at TIMESTAMP WITH TIME ZONE,
-- Which worker is currently processing the job
started_by UUID REFERENCES queue_workers (queue_worker_id),
-- When the job was completed
completed_at TIMESTAMP WITH TIME ZONE,
-- The name of the queue this job belongs to
queue_name TEXT NOT NULL,
-- The arguments to the job
payload JSONB NOT NULL DEFAULT '{}',
-- Arbitrary metadata about the job, like the trace context
metadata JSONB NOT NULL DEFAULT '{}'
);
-- When we grab jobs, we filter on the status of the job and the queue name
-- Then we order on the `queue_job_id` column, as it is a ULID, which ensures timestamp ordering
CREATE INDEX idx_queue_jobs_status_queue_job_id
ON queue_jobs
USING BTREE (status, queue_name, queue_job_id);
-- We would like to notify workers when a job is available to wake them up
CREATE OR REPLACE FUNCTION queue_job_notify()
RETURNS TRIGGER
AS $$
DECLARE
payload json;
BEGIN
IF NEW.status = 'available' THEN
-- The idea with this trigger is to notify the queue worker that a new job
-- is available on a queue. If there are many notifications with the same
-- payload, PG will coalesce them in a single notification, which is why we
-- keep the payload simple.
payload = json_build_object('queue', NEW.queue_name);
PERFORM
pg_notify('queue_available', payload::text);
END IF;
RETURN NULL;
END;
$$
LANGUAGE plpgsql;
CREATE TRIGGER queue_job_notify_trigger
AFTER INSERT OR UPDATE OF status
ON queue_jobs
FOR EACH ROW
EXECUTE PROCEDURE queue_job_notify();

View File

@@ -0,0 +1,150 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! A module containing the PostgreSQL implementation of the
//! [`QueueJobRepository`].
use async_trait::async_trait;
use mas_storage::{
queue::{Job, QueueJobRepository, Worker},
Clock,
};
use rand::RngCore;
use sqlx::PgConnection;
use ulid::Ulid;
use uuid::Uuid;
use crate::{DatabaseError, ExecuteExt};
/// An implementation of [`QueueJobRepository`] for a PostgreSQL connection.
pub struct PgQueueJobRepository<'c> {
conn: &'c mut PgConnection,
}
impl<'c> PgQueueJobRepository<'c> {
/// Create a new [`PgQueueJobRepository`] from an active PostgreSQL
/// connection.
#[must_use]
pub fn new(conn: &'c mut PgConnection) -> Self {
Self { conn }
}
}
#[async_trait]
impl QueueJobRepository for PgQueueJobRepository<'_> {
type Error = DatabaseError;
#[tracing::instrument(
name = "db.queue_job.schedule",
fields(
queue_job.id,
queue_job.queue_name = queue_name,
db.query.text,
),
skip_all,
err,
)]
async fn schedule(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
queue_name: &str,
payload: serde_json::Value,
metadata: serde_json::Value,
) -> Result<(), Self::Error> {
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), rng);
tracing::Span::current().record("queue_job.id", tracing::field::display(id));
sqlx::query!(
r#"
INSERT INTO queue_jobs
(queue_job_id, queue_name, payload, metadata, created_at)
VALUES ($1, $2, $3, $4, $5)
"#,
Uuid::from(id),
queue_name,
payload,
metadata,
created_at,
)
.traced()
.execute(&mut *self.conn)
.await?;
Ok(())
}
#[tracing::instrument(
name = "db.queue_job.get_available",
fields(
db.query.text,
),
skip_all,
err,
)]
async fn get_available(
&mut self,
clock: &dyn Clock,
worker: &Worker,
queues: &[&str],
max_count: usize,
) -> Result<Vec<Job>, Self::Error> {
let now = clock.now();
let max_count = i64::try_from(max_count).unwrap_or(i64::MAX);
let queues: Vec<String> = queues.iter().map(|&s| s.to_owned()).collect();
sqlx::query!(
r#"
-- We first grab a few jobs that are available,
-- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently
-- and we don't get multiple workers grabbing the same jobs
WITH locked_jobs AS (
SELECT queue_job_id
FROM queue_jobs
WHERE
status = 'available'
AND queue_name = ANY($1)
ORDER BY queue_job_id ASC
LIMIT $2
FOR UPDATE
SKIP LOCKED
)
-- then we update the status of those jobs to 'running', returning the job details
UPDATE queue_jobs
SET status = 'running', started_at = $3, started_by = $4
FROM locked_jobs
WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id
RETURNING
queue_jobs.queue_job_id,
queue_jobs.payload,
queue_jobs.metadata
"#,
&queues,
max_count,
now,
Uuid::from(worker.id),
)
.traced()
.fetch_all(&mut *self.conn)
.await?;
todo!()
}
#[tracing::instrument(
name = "db.queue_job.mark_completed",
fields(
queue_job.id = %job.id,
db.query.text,
),
skip_all,
err,
)]
async fn mark_completed(&mut self, clock: &dyn Clock, job: Job) -> Result<(), Self::Error> {
let _ = clock;
let _ = job;
todo!()
}
}

View File

@@ -5,4 +5,5 @@
//! A module containing the PostgreSQL implementation of the job queue
pub mod job;
pub mod worker;

View File

@@ -0,0 +1,105 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! Repository to interact with jobs in the job queue
use async_trait::async_trait;
use rand_core::RngCore;
use ulid::Ulid;
use super::Worker;
use crate::{repository_impl, Clock};
enum JobState {
/// The job is available to be picked up by a worker
Available,
/// The job is currently being processed by a worker
Running,
/// The job has been completed
Completed,
/// The worker running the job was lost
Lost,
}
/// Represents a job in the job queue
pub struct Job {
/// The ID of the job
pub id: Ulid,
}
/// A [`QueueJobRepository`] is used to schedule jobs to be executed by a
/// worker.
#[async_trait]
pub trait QueueJobRepository: Send + Sync {
/// The error type returned by the repository.
type Error;
/// Schedule a job to be executed as soon as possible by a worker.
///
/// # Parameters
///
/// * `rng` - The random number generator used to generate a new job ID
/// * `clock` - The clock used to generate timestamps
/// * `queue_name` - The name of the queue to schedule the job on
/// * `payload` - The payload of the job
/// * `metadata` - Arbitrary metadata about the job scheduled immediately.
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn schedule(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
queue_name: &str,
payload: serde_json::Value,
metadata: serde_json::Value,
) -> Result<(), Self::Error>;
/// Get and lock a batch of jobs that are ready to be executed.
/// This will transition them to a [`JobState::Running`] state.
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn get_available(
&mut self,
clock: &dyn Clock,
worker: &Worker,
queues: &[&str],
max_count: usize,
) -> Result<Vec<Job>, Self::Error>;
/// Mark the given job as completed.
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn mark_completed(&mut self, clock: &dyn Clock, job: Job) -> Result<(), Self::Error>;
}
repository_impl!(QueueJobRepository:
async fn schedule(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
queue_name: &str,
payload: serde_json::Value,
metadata: serde_json::Value,
) -> Result<(), Self::Error>;
async fn get_available(
&mut self,
clock: &dyn Clock,
worker: &Worker,
queues: &[&str],
max_count: usize,
) -> Result<Vec<Job>, Self::Error>;
async fn mark_completed(&mut self, clock: &dyn Clock, job: Job) -> Result<(), Self::Error>;
);

View File

@@ -5,6 +5,10 @@
//! A module containing repositories for the job queue
mod job;
mod worker;
pub use self::worker::{QueueWorkerRepository, Worker};
pub use self::{
job::{Job, QueueJobRepository},
worker::{QueueWorkerRepository, Worker},
};