diff --git a/crates/storage-pg/.sqlx/query-0ac1abe7161c0e58d76d8b1e4de293c35ba4503855d52a5b62b6e86b126362f5.json b/crates/storage-pg/.sqlx/query-0ac1abe7161c0e58d76d8b1e4de293c35ba4503855d52a5b62b6e86b126362f5.json new file mode 100644 index 000000000..6488ff09c --- /dev/null +++ b/crates/storage-pg/.sqlx/query-0ac1abe7161c0e58d76d8b1e4de293c35ba4503855d52a5b62b6e86b126362f5.json @@ -0,0 +1,37 @@ +{ + "db_name": "PostgreSQL", + "query": "\n -- We first grab a few jobs that are available,\n -- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently\n -- and we don't get multiple workers grabbing the same jobs\n WITH locked_jobs AS (\n SELECT queue_job_id\n FROM queue_jobs\n WHERE\n status = 'available'\n AND queue_name = ANY($1)\n ORDER BY queue_job_id ASC\n LIMIT $2\n FOR UPDATE\n SKIP LOCKED\n )\n -- then we update the status of those jobs to 'running', returning the job details\n UPDATE queue_jobs\n SET status = 'running', started_at = $3, started_by = $4\n FROM locked_jobs\n WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id\n RETURNING\n queue_jobs.queue_job_id,\n queue_jobs.payload,\n queue_jobs.metadata\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "queue_job_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "payload", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "metadata", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "TextArray", + "Int8", + "Timestamptz", + "Uuid" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "0ac1abe7161c0e58d76d8b1e4de293c35ba4503855d52a5b62b6e86b126362f5" +} diff --git a/crates/storage-pg/.sqlx/query-e291be0434ab9c346dee777e50f8e601f12c8003fe93a5ecb110d02642d14c3c.json b/crates/storage-pg/.sqlx/query-e291be0434ab9c346dee777e50f8e601f12c8003fe93a5ecb110d02642d14c3c.json new file mode 100644 index 000000000..84ac12de9 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-e291be0434ab9c346dee777e50f8e601f12c8003fe93a5ecb110d02642d14c3c.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at)\n VALUES ($1, $2, $3, $4, $5)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Jsonb", + "Jsonb", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "e291be0434ab9c346dee777e50f8e601f12c8003fe93a5ecb110d02642d14c3c" +} diff --git a/crates/storage-pg/migrations/20241004121132_queue_job.sql b/crates/storage-pg/migrations/20241004121132_queue_job.sql new file mode 100644 index 000000000..859377d52 --- /dev/null +++ b/crates/storage-pg/migrations/20241004121132_queue_job.sql @@ -0,0 +1,79 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +CREATE TYPE queue_job_status AS ENUM ( + -- The job is available to be picked up by a worker + 'available', + + -- The job is currently being processed by a worker + 'running', + + -- The job has been completed + 'completed', + + -- The worker running the job was lost + 'lost' +); + +CREATE TABLE queue_jobs ( + queue_job_id UUID NOT NULL PRIMARY KEY, + + -- The status of the job + status queue_job_status NOT NULL DEFAULT 'available', + + -- When the job was created + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- When the job was grabbed by a worker + started_at TIMESTAMP WITH TIME ZONE, + + -- Which worker is currently processing the job + started_by UUID REFERENCES queue_workers (queue_worker_id), + + -- When the job was completed + completed_at TIMESTAMP WITH TIME ZONE, + + -- The name of the queue this job belongs to + queue_name TEXT NOT NULL, + + -- The arguments to the job + payload JSONB NOT NULL DEFAULT '{}', + + -- Arbitrary metadata about the job, like the trace context + metadata JSONB NOT NULL DEFAULT '{}' +); + +-- When we grab jobs, we filter on the status of the job and the queue name +-- Then we order on the `queue_job_id` column, as it is a ULID, which ensures timestamp ordering +CREATE INDEX idx_queue_jobs_status_queue_job_id + ON queue_jobs + USING BTREE (status, queue_name, queue_job_id); + +-- We would like to notify workers when a job is available to wake them up +CREATE OR REPLACE FUNCTION queue_job_notify() + RETURNS TRIGGER + AS $$ +DECLARE + payload json; +BEGIN + IF NEW.status = 'available' THEN + -- The idea with this trigger is to notify the queue worker that a new job + -- is available on a queue. If there are many notifications with the same + -- payload, PG will coalesce them in a single notification, which is why we + -- keep the payload simple. + payload = json_build_object('queue', NEW.queue_name); + PERFORM + pg_notify('queue_available', payload::text); + END IF; + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER queue_job_notify_trigger + AFTER INSERT OR UPDATE OF status + ON queue_jobs + FOR EACH ROW + EXECUTE PROCEDURE queue_job_notify(); diff --git a/crates/storage-pg/src/queue/job.rs b/crates/storage-pg/src/queue/job.rs new file mode 100644 index 000000000..47185243e --- /dev/null +++ b/crates/storage-pg/src/queue/job.rs @@ -0,0 +1,150 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! A module containing the PostgreSQL implementation of the +//! [`QueueJobRepository`]. + +use async_trait::async_trait; +use mas_storage::{ + queue::{Job, QueueJobRepository, Worker}, + Clock, +}; +use rand::RngCore; +use sqlx::PgConnection; +use ulid::Ulid; +use uuid::Uuid; + +use crate::{DatabaseError, ExecuteExt}; + +/// An implementation of [`QueueJobRepository`] for a PostgreSQL connection. +pub struct PgQueueJobRepository<'c> { + conn: &'c mut PgConnection, +} + +impl<'c> PgQueueJobRepository<'c> { + /// Create a new [`PgQueueJobRepository`] from an active PostgreSQL + /// connection. + #[must_use] + pub fn new(conn: &'c mut PgConnection) -> Self { + Self { conn } + } +} + +#[async_trait] +impl QueueJobRepository for PgQueueJobRepository<'_> { + type Error = DatabaseError; + + #[tracing::instrument( + name = "db.queue_job.schedule", + fields( + queue_job.id, + queue_job.queue_name = queue_name, + db.query.text, + ), + skip_all, + err, + )] + async fn schedule( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + queue_name: &str, + payload: serde_json::Value, + metadata: serde_json::Value, + ) -> Result<(), Self::Error> { + let created_at = clock.now(); + let id = Ulid::from_datetime_with_source(created_at.into(), rng); + tracing::Span::current().record("queue_job.id", tracing::field::display(id)); + + sqlx::query!( + r#" + INSERT INTO queue_jobs + (queue_job_id, queue_name, payload, metadata, created_at) + VALUES ($1, $2, $3, $4, $5) + "#, + Uuid::from(id), + queue_name, + payload, + metadata, + created_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(()) + } + + #[tracing::instrument( + name = "db.queue_job.get_available", + fields( + db.query.text, + ), + skip_all, + err, + )] + async fn get_available( + &mut self, + clock: &dyn Clock, + worker: &Worker, + queues: &[&str], + max_count: usize, + ) -> Result, Self::Error> { + let now = clock.now(); + let max_count = i64::try_from(max_count).unwrap_or(i64::MAX); + let queues: Vec = queues.iter().map(|&s| s.to_owned()).collect(); + sqlx::query!( + r#" + -- We first grab a few jobs that are available, + -- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently + -- and we don't get multiple workers grabbing the same jobs + WITH locked_jobs AS ( + SELECT queue_job_id + FROM queue_jobs + WHERE + status = 'available' + AND queue_name = ANY($1) + ORDER BY queue_job_id ASC + LIMIT $2 + FOR UPDATE + SKIP LOCKED + ) + -- then we update the status of those jobs to 'running', returning the job details + UPDATE queue_jobs + SET status = 'running', started_at = $3, started_by = $4 + FROM locked_jobs + WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id + RETURNING + queue_jobs.queue_job_id, + queue_jobs.payload, + queue_jobs.metadata + "#, + &queues, + max_count, + now, + Uuid::from(worker.id), + ) + .traced() + .fetch_all(&mut *self.conn) + .await?; + + todo!() + } + + #[tracing::instrument( + name = "db.queue_job.mark_completed", + fields( + queue_job.id = %job.id, + db.query.text, + ), + skip_all, + err, + )] + async fn mark_completed(&mut self, clock: &dyn Clock, job: Job) -> Result<(), Self::Error> { + let _ = clock; + let _ = job; + todo!() + } +} diff --git a/crates/storage-pg/src/queue/mod.rs b/crates/storage-pg/src/queue/mod.rs index b6ba8295e..eca02b809 100644 --- a/crates/storage-pg/src/queue/mod.rs +++ b/crates/storage-pg/src/queue/mod.rs @@ -5,4 +5,5 @@ //! A module containing the PostgreSQL implementation of the job queue +pub mod job; pub mod worker; diff --git a/crates/storage/src/queue/job.rs b/crates/storage/src/queue/job.rs new file mode 100644 index 000000000..a96780ff5 --- /dev/null +++ b/crates/storage/src/queue/job.rs @@ -0,0 +1,105 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! Repository to interact with jobs in the job queue + +use async_trait::async_trait; +use rand_core::RngCore; +use ulid::Ulid; + +use super::Worker; +use crate::{repository_impl, Clock}; + +enum JobState { + /// The job is available to be picked up by a worker + Available, + + /// The job is currently being processed by a worker + Running, + + /// The job has been completed + Completed, + + /// The worker running the job was lost + Lost, +} + +/// Represents a job in the job queue +pub struct Job { + /// The ID of the job + pub id: Ulid, +} + +/// A [`QueueJobRepository`] is used to schedule jobs to be executed by a +/// worker. +#[async_trait] +pub trait QueueJobRepository: Send + Sync { + /// The error type returned by the repository. + type Error; + + /// Schedule a job to be executed as soon as possible by a worker. + /// + /// # Parameters + /// + /// * `rng` - The random number generator used to generate a new job ID + /// * `clock` - The clock used to generate timestamps + /// * `queue_name` - The name of the queue to schedule the job on + /// * `payload` - The payload of the job + /// * `metadata` - Arbitrary metadata about the job scheduled immediately. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn schedule( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + queue_name: &str, + payload: serde_json::Value, + metadata: serde_json::Value, + ) -> Result<(), Self::Error>; + + /// Get and lock a batch of jobs that are ready to be executed. + /// This will transition them to a [`JobState::Running`] state. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn get_available( + &mut self, + clock: &dyn Clock, + worker: &Worker, + queues: &[&str], + max_count: usize, + ) -> Result, Self::Error>; + + /// Mark the given job as completed. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn mark_completed(&mut self, clock: &dyn Clock, job: Job) -> Result<(), Self::Error>; +} + +repository_impl!(QueueJobRepository: + async fn schedule( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + queue_name: &str, + payload: serde_json::Value, + metadata: serde_json::Value, + ) -> Result<(), Self::Error>; + + async fn get_available( + &mut self, + clock: &dyn Clock, + worker: &Worker, + queues: &[&str], + max_count: usize, + ) -> Result, Self::Error>; + + async fn mark_completed(&mut self, clock: &dyn Clock, job: Job) -> Result<(), Self::Error>; +); diff --git a/crates/storage/src/queue/mod.rs b/crates/storage/src/queue/mod.rs index 4ca97ec5e..a9757aed1 100644 --- a/crates/storage/src/queue/mod.rs +++ b/crates/storage/src/queue/mod.rs @@ -5,6 +5,10 @@ //! A module containing repositories for the job queue +mod job; mod worker; -pub use self::worker::{QueueWorkerRepository, Worker}; +pub use self::{ + job::{Job, QueueJobRepository}, + worker::{QueueWorkerRepository, Worker}, +};