From 62ccd2b08cedc461cb52f9c99eedbd19fac935f9 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Mon, 7 Oct 2024 12:07:09 +0200 Subject: [PATCH] New job queue: worker registration and leader election --- ...0f3f086e4e62c6de9d6864a6a11a2470ebe62.json | 15 ++ ...0dc74505b22c681322bd99b62c2a540c6cd35.json | 15 ++ ...5b531b9873f4139eadcbf1450e726b9a27379.json | 15 ++ ...d356a4ed86fd33400066e422545ffc55f9aa9.json | 16 ++ ...327d03b29fe413d57cce21c67b6d539f59e7d.json | 15 ++ ...84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json | 14 ++ .../20241004075132_queue_worker.sql | 37 +++ crates/storage-pg/src/lib.rs | 1 + crates/storage-pg/src/queue/mod.rs | 8 + crates/storage-pg/src/queue/worker.rs | 237 ++++++++++++++++++ crates/storage-pg/src/repository.rs | 7 + crates/storage/src/lib.rs | 1 + crates/storage/src/queue/mod.rs | 10 + crates/storage/src/queue/worker.rs | 130 ++++++++++ crates/storage/src/repository.rs | 17 ++ crates/tasks/src/lib.rs | 24 +- crates/tasks/src/new_queue.rs | 81 ++++++ 17 files changed, 639 insertions(+), 4 deletions(-) create mode 100644 crates/storage-pg/.sqlx/query-12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62.json create mode 100644 crates/storage-pg/.sqlx/query-5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35.json create mode 100644 crates/storage-pg/.sqlx/query-6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379.json create mode 100644 crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json create mode 100644 crates/storage-pg/.sqlx/query-966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d.json create mode 100644 crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json create mode 100644 crates/storage-pg/migrations/20241004075132_queue_worker.sql create mode 100644 crates/storage-pg/src/queue/mod.rs create mode 100644 crates/storage-pg/src/queue/worker.rs create mode 100644 crates/storage/src/queue/mod.rs create mode 100644 crates/storage/src/queue/worker.rs create mode 100644 crates/tasks/src/new_queue.rs diff --git a/crates/storage-pg/.sqlx/query-12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62.json b/crates/storage-pg/.sqlx/query-12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62.json new file mode 100644 index 000000000..dce1983fe --- /dev/null +++ b/crates/storage-pg/.sqlx/query-12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_workers (queue_worker_id, registered_at, last_seen_at)\n VALUES ($1, $2, $2)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62" +} diff --git a/crates/storage-pg/.sqlx/query-5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35.json b/crates/storage-pg/.sqlx/query-5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35.json new file mode 100644 index 000000000..364a1c6b6 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_workers\n SET shutdown_at = $2\n WHERE queue_worker_id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35" +} diff --git a/crates/storage-pg/.sqlx/query-6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379.json b/crates/storage-pg/.sqlx/query-6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379.json new file mode 100644 index 000000000..4898fc432 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_workers\n SET shutdown_at = $1\n WHERE shutdown_at IS NULL\n AND last_seen_at < $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379" +} diff --git a/crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json b/crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json new file mode 100644 index 000000000..9195a9d4d --- /dev/null +++ b/crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)\n VALUES ($1, $2, $3)\n ON CONFLICT (active)\n DO UPDATE SET expires_at = EXCLUDED.expires_at\n WHERE queue_leader.queue_worker_id = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9" +} diff --git a/crates/storage-pg/.sqlx/query-966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d.json b/crates/storage-pg/.sqlx/query-966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d.json new file mode 100644 index 000000000..3e1fb3580 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_workers\n SET last_seen_at = $2\n WHERE queue_worker_id = $1 AND shutdown_at IS NULL\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d" +} diff --git a/crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json b/crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json new file mode 100644 index 000000000..af6213a8a --- /dev/null +++ b/crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM queue_leader\n WHERE expires_at < $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5" +} diff --git a/crates/storage-pg/migrations/20241004075132_queue_worker.sql b/crates/storage-pg/migrations/20241004075132_queue_worker.sql new file mode 100644 index 000000000..07b49d22d --- /dev/null +++ b/crates/storage-pg/migrations/20241004075132_queue_worker.sql @@ -0,0 +1,37 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- This table stores informations about worker, mostly to track their health +CREATE TABLE queue_workers ( + queue_worker_id UUID NOT NULL PRIMARY KEY, + + -- When the worker was registered + registered_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- When the worker was last seen + last_seen_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- When the worker was shut down + shutdown_at TIMESTAMP WITH TIME ZONE +); + +-- This single-row table stores the leader of the queue +-- The leader is responsible for running maintenance tasks +CREATE UNLOGGED TABLE queue_leader ( + -- This makes the row unique + active BOOLEAN NOT NULL DEFAULT TRUE UNIQUE, + + -- When the leader was elected + elected_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- Until when the lease is valid + expires_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- The worker ID of the leader + queue_worker_id UUID NOT NULL REFERENCES queue_workers (queue_worker_id), + + -- This, combined with the unique constraint, makes sure we only ever have a single row + CONSTRAINT queue_leader_active CHECK (active IS TRUE) +); diff --git a/crates/storage-pg/src/lib.rs b/crates/storage-pg/src/lib.rs index aa7fadd55..e16303278 100644 --- a/crates/storage-pg/src/lib.rs +++ b/crates/storage-pg/src/lib.rs @@ -166,6 +166,7 @@ pub mod app_session; pub mod compat; pub mod job; pub mod oauth2; +pub mod queue; pub mod upstream_oauth2; pub mod user; diff --git a/crates/storage-pg/src/queue/mod.rs b/crates/storage-pg/src/queue/mod.rs new file mode 100644 index 000000000..b6ba8295e --- /dev/null +++ b/crates/storage-pg/src/queue/mod.rs @@ -0,0 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! A module containing the PostgreSQL implementation of the job queue + +pub mod worker; diff --git a/crates/storage-pg/src/queue/worker.rs b/crates/storage-pg/src/queue/worker.rs new file mode 100644 index 000000000..2aaacc64b --- /dev/null +++ b/crates/storage-pg/src/queue/worker.rs @@ -0,0 +1,237 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! A module containing the PostgreSQL implementation of the +//! [`QueueWorkerRepository`]. + +use async_trait::async_trait; +use chrono::Duration; +use mas_storage::{ + queue::{QueueWorkerRepository, Worker}, + Clock, +}; +use rand::RngCore; +use sqlx::PgConnection; +use ulid::Ulid; +use uuid::Uuid; + +use crate::{DatabaseError, ExecuteExt}; + +/// An implementation of [`QueueWorkerRepository`] for a PostgreSQL connection. +pub struct PgQueueWorkerRepository<'c> { + conn: &'c mut PgConnection, +} + +impl<'c> PgQueueWorkerRepository<'c> { + /// Create a new [`PgQueueWorkerRepository`] from an active PostgreSQL + /// connection. + #[must_use] + pub fn new(conn: &'c mut PgConnection) -> Self { + Self { conn } + } +} + +#[async_trait] +impl QueueWorkerRepository for PgQueueWorkerRepository<'_> { + type Error = DatabaseError; + + #[tracing::instrument( + name = "db.queue_worker.register", + skip_all, + fields( + worker.id, + db.query.text, + ), + err, + )] + async fn register( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + ) -> Result { + let now = clock.now(); + let worker_id = Ulid::from_datetime_with_source(now.into(), rng); + tracing::Span::current().record("worker.id", tracing::field::display(worker_id)); + + sqlx::query!( + r#" + INSERT INTO queue_workers (queue_worker_id, registered_at, last_seen_at) + VALUES ($1, $2, $2) + "#, + Uuid::from(worker_id), + now, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(Worker { id: worker_id }) + } + + #[tracing::instrument( + name = "db.queue_worker.heartbeat", + skip_all, + fields( + %worker.id, + db.query.text, + ), + err, + )] + async fn heartbeat( + &mut self, + clock: &dyn Clock, + worker: Worker, + ) -> Result { + let now = clock.now(); + let res = sqlx::query!( + r#" + UPDATE queue_workers + SET last_seen_at = $2 + WHERE queue_worker_id = $1 AND shutdown_at IS NULL + "#, + Uuid::from(worker.id), + now, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + // If no row was updated, the worker was shutdown so we return an error + DatabaseError::ensure_affected_rows(&res, 1)?; + + Ok(worker) + } + + #[tracing::instrument( + name = "db.queue_worker.shutdown", + skip_all, + fields( + %worker.id, + db.query.text, + ), + err, + )] + async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error> { + let now = clock.now(); + let res = sqlx::query!( + r#" + UPDATE queue_workers + SET shutdown_at = $2 + WHERE queue_worker_id = $1 + "#, + Uuid::from(worker.id), + now, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + DatabaseError::ensure_affected_rows(&res, 1)?; + + Ok(()) + } + + #[tracing::instrument( + name = "db.queue_worker.shutdown_dead_workers", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn shutdown_dead_workers( + &mut self, + clock: &dyn Clock, + threshold: Duration, + ) -> Result<(), Self::Error> { + let now = clock.now(); + sqlx::query!( + r#" + UPDATE queue_workers + SET shutdown_at = $1 + WHERE shutdown_at IS NULL + AND last_seen_at < $2 + "#, + now, + now - threshold, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(()) + } + + #[tracing::instrument( + name = "db.queue_worker.remove_leader_lease_if_expired", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn remove_leader_lease_if_expired( + &mut self, + clock: &dyn Clock, + ) -> Result<(), Self::Error> { + let now = clock.now(); + sqlx::query!( + r#" + DELETE FROM queue_leader + WHERE expires_at < $1 + "#, + now, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(()) + } + + #[tracing::instrument( + name = "db.queue_worker.try_get_leader_lease", + skip_all, + fields( + %worker.id, + db.query.text, + ), + err, + )] + async fn try_get_leader_lease( + &mut self, + clock: &dyn Clock, + worker: &Worker, + ) -> Result { + let now = clock.now(); + let ttl = Duration::seconds(5); + // The queue_leader table is meant to only have a single row, which conflicts on + // the `active` column + + // If there is a conflict, we update the `expires_at` column ONLY IF the current + // leader is ourselves. + let res = sqlx::query!( + r#" + INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id) + VALUES ($1, $2, $3) + ON CONFLICT (active) + DO UPDATE SET expires_at = EXCLUDED.expires_at + WHERE queue_leader.queue_worker_id = $3 + "#, + now, + now + ttl, + Uuid::from(worker.id) + ) + .traced() + .execute(&mut *self.conn) + .await?; + + // We can then detect whether we are the leader or not by checking how many rows + // were affected by the upsert + let am_i_the_leader = res.rows_affected() == 1; + + Ok(am_i_the_leader) + } +} diff --git a/crates/storage-pg/src/repository.rs b/crates/storage-pg/src/repository.rs index 284f5e2dc..99580467c 100644 --- a/crates/storage-pg/src/repository.rs +++ b/crates/storage-pg/src/repository.rs @@ -40,6 +40,7 @@ use crate::{ PgOAuth2ClientRepository, PgOAuth2DeviceCodeGrantRepository, PgOAuth2RefreshTokenRepository, PgOAuth2SessionRepository, }, + queue::worker::PgQueueWorkerRepository, upstream_oauth2::{ PgUpstreamOAuthLinkRepository, PgUpstreamOAuthProviderRepository, PgUpstreamOAuthSessionRepository, @@ -263,4 +264,10 @@ where fn job<'c>(&'c mut self) -> Box + 'c> { Box::new(PgJobRepository::new(self.conn.as_mut())) } + + fn queue_worker<'c>( + &'c mut self, + ) -> Box + 'c> { + Box::new(PgQueueWorkerRepository::new(self.conn.as_mut())) + } } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index f2f699ba6..30dc553de 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -120,6 +120,7 @@ pub mod app_session; pub mod compat; pub mod job; pub mod oauth2; +pub mod queue; pub mod upstream_oauth2; pub mod user; diff --git a/crates/storage/src/queue/mod.rs b/crates/storage/src/queue/mod.rs new file mode 100644 index 000000000..4ca97ec5e --- /dev/null +++ b/crates/storage/src/queue/mod.rs @@ -0,0 +1,10 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! A module containing repositories for the job queue + +mod worker; + +pub use self::worker::{QueueWorkerRepository, Worker}; diff --git a/crates/storage/src/queue/worker.rs b/crates/storage/src/queue/worker.rs new file mode 100644 index 000000000..dfb9699e6 --- /dev/null +++ b/crates/storage/src/queue/worker.rs @@ -0,0 +1,130 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! Repository to interact with workers in the job queue + +use async_trait::async_trait; +use chrono::Duration; +use rand_core::RngCore; +use ulid::Ulid; + +use crate::{repository_impl, Clock}; + +/// A worker is an entity which can execute jobs. +pub struct Worker { + /// The ID of the worker. + pub id: Ulid, +} + +/// A [`QueueWorkerRepository`] is used to schedule jobs to be executed by a +/// worker. +#[async_trait] +pub trait QueueWorkerRepository: Send + Sync { + /// The error type returned by the repository. + type Error; + + /// Register a new worker. + /// + /// Returns a reference to the worker. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn register( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + ) -> Result; + + /// Send a heartbeat for the given worker. + /// + /// Returns the updated worker. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails or if the worker was + /// shutdown. + async fn heartbeat(&mut self, clock: &dyn Clock, worker: Worker) + -> Result; + + /// Mark the given worker as shutdown. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error>; + + /// Find dead workers and shut them down. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn shutdown_dead_workers( + &mut self, + clock: &dyn Clock, + threshold: Duration, + ) -> Result<(), Self::Error>; + + /// Remove the leader lease if it is expired, sending a notification to + /// trigger a new leader election. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn remove_leader_lease_if_expired( + &mut self, + clock: &dyn Clock, + ) -> Result<(), Self::Error>; + + /// Try to get the leader lease, renewing it if we already have it + /// + /// Returns `true` if we got the leader lease, `false` if we didn't + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn try_get_leader_lease( + &mut self, + clock: &dyn Clock, + worker: &Worker, + ) -> Result; +} + +repository_impl!(QueueWorkerRepository: + async fn register( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + ) -> Result; + + async fn heartbeat( + &mut self, + clock: &dyn Clock, + worker: Worker, + ) -> Result; + + async fn shutdown( + &mut self, + clock: &dyn Clock, + worker: Worker, + ) -> Result<(), Self::Error>; + + async fn shutdown_dead_workers( + &mut self, + clock: &dyn Clock, + threshold: Duration, + ) -> Result<(), Self::Error>; + + async fn remove_leader_lease_if_expired( + &mut self, + clock: &dyn Clock, + ) -> Result<(), Self::Error>; + + async fn try_get_leader_lease( + &mut self, + clock: &dyn Clock, + worker: &Worker, + ) -> Result; +); diff --git a/crates/storage/src/repository.rs b/crates/storage/src/repository.rs index a78d51d1d..55d19d281 100644 --- a/crates/storage/src/repository.rs +++ b/crates/storage/src/repository.rs @@ -18,6 +18,7 @@ use crate::{ OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, + queue::QueueWorkerRepository, upstream_oauth2::{ UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository, UpstreamOAuthSessionRepository, @@ -191,6 +192,9 @@ pub trait RepositoryAccess: Send { /// Get a [`JobRepository`] fn job<'c>(&'c mut self) -> Box + 'c>; + + /// Get a [`QueueWorkerRepository`] + fn queue_worker<'c>(&'c mut self) -> Box + 'c>; } /// Implementations of the [`RepositoryAccess`], [`RepositoryTransaction`] and @@ -211,6 +215,7 @@ mod impls { OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, + queue::QueueWorkerRepository, upstream_oauth2::{ UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository, UpstreamOAuthSessionRepository, @@ -405,6 +410,12 @@ mod impls { fn job<'c>(&'c mut self) -> Box + 'c> { Box::new(MapErr::new(self.inner.job(), &mut self.mapper)) } + + fn queue_worker<'c>( + &'c mut self, + ) -> Box + 'c> { + Box::new(MapErr::new(self.inner.queue_worker(), &mut self.mapper)) + } } impl RepositoryAccess for Box { @@ -527,5 +538,11 @@ mod impls { fn job<'c>(&'c mut self) -> Box + 'c> { (**self).job() } + + fn queue_worker<'c>( + &'c mut self, + ) -> Box + 'c> { + (**self).queue_worker() + } } } diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 8d012bdae..52e9683cc 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -10,8 +10,8 @@ use apalis_core::{executor::TokioExecutor, layers::extensions::Extension, monito use mas_email::Mailer; use mas_matrix::HomeserverConnection; use mas_router::UrlBuilder; -use mas_storage::{BoxClock, BoxRepository, SystemClock}; -use mas_storage_pg::{DatabaseError, PgRepository}; +use mas_storage::{BoxClock, BoxRepository, RepositoryError, SystemClock}; +use mas_storage_pg::PgRepository; use rand::SeedableRng; use sqlx::{Pool, Postgres}; use tracing::debug; @@ -21,6 +21,7 @@ use crate::storage::PostgresStorageFactory; mod database; mod email; mod matrix; +mod new_queue; mod recovery; mod storage; mod user; @@ -74,8 +75,11 @@ impl State { rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng") } - pub async fn repository(&self) -> Result { - let repo = PgRepository::from_pool(self.pool()).await?.boxed(); + pub async fn repository(&self) -> Result { + let repo = PgRepository::from_pool(self.pool()) + .await + .map_err(RepositoryError::from_error)? + .boxed(); Ok(repo) } @@ -156,5 +160,17 @@ pub async fn init( // TODO: we might want to grab the join handle here factory.listen().await?; debug!(?monitor, "workers registered"); + + // TODO: this is just spawning the task in the background, we probably actually + // want to wrap that in a structure, and handle graceful shutdown correctly + tokio::spawn(async move { + if let Err(e) = self::new_queue::run(state).await { + tracing::error!( + error = &e as &dyn std::error::Error, + "Failed to run new queue" + ); + } + }); + Ok(monitor) } diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs new file mode 100644 index 000000000..eabf17aa6 --- /dev/null +++ b/crates/tasks/src/new_queue.rs @@ -0,0 +1,81 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +use chrono::Duration; +use mas_storage::{RepositoryAccess, RepositoryError}; + +use crate::State; + +pub async fn run(state: State) -> Result<(), RepositoryError> { + let span = tracing::info_span!("worker.init", worker.id = tracing::field::Empty); + let guard = span.enter(); + let mut repo = state.repository().await?; + let mut rng = state.rng(); + let clock = state.clock(); + + let mut worker = repo.queue_worker().register(&mut rng, &clock).await?; + span.record("worker.id", tracing::field::display(worker.id)); + repo.save().await?; + + tracing::info!("Registered worker"); + drop(guard); + + let mut was_i_the_leader = false; + + // Record when we last sent a heartbeat + let mut last_heartbeat = clock.now(); + + loop { + // This is to make sure we wake up every second to do the maintenance tasks + // Later we might wait on other events, like a PG notification + let wakeup_sleep = tokio::time::sleep(std::time::Duration::from_secs(1)); + wakeup_sleep.await; + + let span = tracing::info_span!("worker.tick", %worker.id); + let _guard = span.enter(); + + tracing::debug!("Tick"); + let now = clock.now(); + let mut repo = state.repository().await?; + + // We send a heartbeat every minute, to avoid writing to the database too often + // on a logged table + if now - last_heartbeat >= chrono::Duration::minutes(1) { + tracing::info!("Sending heartbeat"); + worker = repo.queue_worker().heartbeat(&clock, worker).await?; + last_heartbeat = now; + } + + // Remove any dead worker leader leases + repo.queue_worker() + .remove_leader_lease_if_expired(&clock) + .await?; + + // Try to become (or stay) the leader + let am_i_the_leader = repo + .queue_worker() + .try_get_leader_lease(&clock, &worker) + .await?; + + // Log any changes in leadership + if !was_i_the_leader && am_i_the_leader { + tracing::info!("I'm the leader now"); + } else if was_i_the_leader && !am_i_the_leader { + tracing::warn!("I am no longer the leader"); + } + was_i_the_leader = am_i_the_leader; + + // The leader does all the maintenance work + if am_i_the_leader { + // We also check if the worker is dead, and if so, we shutdown all the dead + // workers that haven't checked in the last two minutes + repo.queue_worker() + .shutdown_dead_workers(&clock, Duration::minutes(2)) + .await?; + } + + repo.save().await?; + } +}