New job queue: worker registration and leader election

This commit is contained in:
Quentin Gliech
2024-10-07 12:07:09 +02:00
parent df6333288e
commit 62ccd2b08c
17 changed files with 639 additions and 4 deletions

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_workers (queue_worker_id, registered_at, last_seen_at)\n VALUES ($1, $2, $2)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz"
]
},
"nullable": []
},
"hash": "12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE queue_workers\n SET shutdown_at = $2\n WHERE queue_worker_id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz"
]
},
"nullable": []
},
"hash": "5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE queue_workers\n SET shutdown_at = $1\n WHERE shutdown_at IS NULL\n AND last_seen_at < $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Timestamptz"
]
},
"nullable": []
},
"hash": "6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)\n VALUES ($1, $2, $3)\n ON CONFLICT (active)\n DO UPDATE SET expires_at = EXCLUDED.expires_at\n WHERE queue_leader.queue_worker_id = $3\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Timestamptz",
"Uuid"
]
},
"nullable": []
},
"hash": "8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE queue_workers\n SET last_seen_at = $2\n WHERE queue_worker_id = $1 AND shutdown_at IS NULL\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz"
]
},
"nullable": []
},
"hash": "966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d"
}

View File

@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM queue_leader\n WHERE expires_at < $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz"
]
},
"nullable": []
},
"hash": "ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5"
}

View File

@@ -0,0 +1,37 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
-- This table stores informations about worker, mostly to track their health
CREATE TABLE queue_workers (
queue_worker_id UUID NOT NULL PRIMARY KEY,
-- When the worker was registered
registered_at TIMESTAMP WITH TIME ZONE NOT NULL,
-- When the worker was last seen
last_seen_at TIMESTAMP WITH TIME ZONE NOT NULL,
-- When the worker was shut down
shutdown_at TIMESTAMP WITH TIME ZONE
);
-- This single-row table stores the leader of the queue
-- The leader is responsible for running maintenance tasks
CREATE UNLOGGED TABLE queue_leader (
-- This makes the row unique
active BOOLEAN NOT NULL DEFAULT TRUE UNIQUE,
-- When the leader was elected
elected_at TIMESTAMP WITH TIME ZONE NOT NULL,
-- Until when the lease is valid
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
-- The worker ID of the leader
queue_worker_id UUID NOT NULL REFERENCES queue_workers (queue_worker_id),
-- This, combined with the unique constraint, makes sure we only ever have a single row
CONSTRAINT queue_leader_active CHECK (active IS TRUE)
);

View File

@@ -166,6 +166,7 @@ pub mod app_session;
pub mod compat;
pub mod job;
pub mod oauth2;
pub mod queue;
pub mod upstream_oauth2;
pub mod user;

View File

@@ -0,0 +1,8 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! A module containing the PostgreSQL implementation of the job queue
pub mod worker;

View File

@@ -0,0 +1,237 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! A module containing the PostgreSQL implementation of the
//! [`QueueWorkerRepository`].
use async_trait::async_trait;
use chrono::Duration;
use mas_storage::{
queue::{QueueWorkerRepository, Worker},
Clock,
};
use rand::RngCore;
use sqlx::PgConnection;
use ulid::Ulid;
use uuid::Uuid;
use crate::{DatabaseError, ExecuteExt};
/// An implementation of [`QueueWorkerRepository`] for a PostgreSQL connection.
pub struct PgQueueWorkerRepository<'c> {
conn: &'c mut PgConnection,
}
impl<'c> PgQueueWorkerRepository<'c> {
/// Create a new [`PgQueueWorkerRepository`] from an active PostgreSQL
/// connection.
#[must_use]
pub fn new(conn: &'c mut PgConnection) -> Self {
Self { conn }
}
}
#[async_trait]
impl QueueWorkerRepository for PgQueueWorkerRepository<'_> {
type Error = DatabaseError;
#[tracing::instrument(
name = "db.queue_worker.register",
skip_all,
fields(
worker.id,
db.query.text,
),
err,
)]
async fn register(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
) -> Result<Worker, Self::Error> {
let now = clock.now();
let worker_id = Ulid::from_datetime_with_source(now.into(), rng);
tracing::Span::current().record("worker.id", tracing::field::display(worker_id));
sqlx::query!(
r#"
INSERT INTO queue_workers (queue_worker_id, registered_at, last_seen_at)
VALUES ($1, $2, $2)
"#,
Uuid::from(worker_id),
now,
)
.traced()
.execute(&mut *self.conn)
.await?;
Ok(Worker { id: worker_id })
}
#[tracing::instrument(
name = "db.queue_worker.heartbeat",
skip_all,
fields(
%worker.id,
db.query.text,
),
err,
)]
async fn heartbeat(
&mut self,
clock: &dyn Clock,
worker: Worker,
) -> Result<Worker, Self::Error> {
let now = clock.now();
let res = sqlx::query!(
r#"
UPDATE queue_workers
SET last_seen_at = $2
WHERE queue_worker_id = $1 AND shutdown_at IS NULL
"#,
Uuid::from(worker.id),
now,
)
.traced()
.execute(&mut *self.conn)
.await?;
// If no row was updated, the worker was shutdown so we return an error
DatabaseError::ensure_affected_rows(&res, 1)?;
Ok(worker)
}
#[tracing::instrument(
name = "db.queue_worker.shutdown",
skip_all,
fields(
%worker.id,
db.query.text,
),
err,
)]
async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error> {
let now = clock.now();
let res = sqlx::query!(
r#"
UPDATE queue_workers
SET shutdown_at = $2
WHERE queue_worker_id = $1
"#,
Uuid::from(worker.id),
now,
)
.traced()
.execute(&mut *self.conn)
.await?;
DatabaseError::ensure_affected_rows(&res, 1)?;
Ok(())
}
#[tracing::instrument(
name = "db.queue_worker.shutdown_dead_workers",
skip_all,
fields(
db.query.text,
),
err,
)]
async fn shutdown_dead_workers(
&mut self,
clock: &dyn Clock,
threshold: Duration,
) -> Result<(), Self::Error> {
let now = clock.now();
sqlx::query!(
r#"
UPDATE queue_workers
SET shutdown_at = $1
WHERE shutdown_at IS NULL
AND last_seen_at < $2
"#,
now,
now - threshold,
)
.traced()
.execute(&mut *self.conn)
.await?;
Ok(())
}
#[tracing::instrument(
name = "db.queue_worker.remove_leader_lease_if_expired",
skip_all,
fields(
db.query.text,
),
err,
)]
async fn remove_leader_lease_if_expired(
&mut self,
clock: &dyn Clock,
) -> Result<(), Self::Error> {
let now = clock.now();
sqlx::query!(
r#"
DELETE FROM queue_leader
WHERE expires_at < $1
"#,
now,
)
.traced()
.execute(&mut *self.conn)
.await?;
Ok(())
}
#[tracing::instrument(
name = "db.queue_worker.try_get_leader_lease",
skip_all,
fields(
%worker.id,
db.query.text,
),
err,
)]
async fn try_get_leader_lease(
&mut self,
clock: &dyn Clock,
worker: &Worker,
) -> Result<bool, Self::Error> {
let now = clock.now();
let ttl = Duration::seconds(5);
// The queue_leader table is meant to only have a single row, which conflicts on
// the `active` column
// If there is a conflict, we update the `expires_at` column ONLY IF the current
// leader is ourselves.
let res = sqlx::query!(
r#"
INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)
VALUES ($1, $2, $3)
ON CONFLICT (active)
DO UPDATE SET expires_at = EXCLUDED.expires_at
WHERE queue_leader.queue_worker_id = $3
"#,
now,
now + ttl,
Uuid::from(worker.id)
)
.traced()
.execute(&mut *self.conn)
.await?;
// We can then detect whether we are the leader or not by checking how many rows
// were affected by the upsert
let am_i_the_leader = res.rows_affected() == 1;
Ok(am_i_the_leader)
}
}

View File

@@ -40,6 +40,7 @@ use crate::{
PgOAuth2ClientRepository, PgOAuth2DeviceCodeGrantRepository,
PgOAuth2RefreshTokenRepository, PgOAuth2SessionRepository,
},
queue::worker::PgQueueWorkerRepository,
upstream_oauth2::{
PgUpstreamOAuthLinkRepository, PgUpstreamOAuthProviderRepository,
PgUpstreamOAuthSessionRepository,
@@ -263,4 +264,10 @@ where
fn job<'c>(&'c mut self) -> Box<dyn JobRepository<Error = Self::Error> + 'c> {
Box::new(PgJobRepository::new(self.conn.as_mut()))
}
fn queue_worker<'c>(
&'c mut self,
) -> Box<dyn mas_storage::queue::QueueWorkerRepository<Error = Self::Error> + 'c> {
Box::new(PgQueueWorkerRepository::new(self.conn.as_mut()))
}
}

View File

@@ -120,6 +120,7 @@ pub mod app_session;
pub mod compat;
pub mod job;
pub mod oauth2;
pub mod queue;
pub mod upstream_oauth2;
pub mod user;

View File

@@ -0,0 +1,10 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! A module containing repositories for the job queue
mod worker;
pub use self::worker::{QueueWorkerRepository, Worker};

View File

@@ -0,0 +1,130 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! Repository to interact with workers in the job queue
use async_trait::async_trait;
use chrono::Duration;
use rand_core::RngCore;
use ulid::Ulid;
use crate::{repository_impl, Clock};
/// A worker is an entity which can execute jobs.
pub struct Worker {
/// The ID of the worker.
pub id: Ulid,
}
/// A [`QueueWorkerRepository`] is used to schedule jobs to be executed by a
/// worker.
#[async_trait]
pub trait QueueWorkerRepository: Send + Sync {
/// The error type returned by the repository.
type Error;
/// Register a new worker.
///
/// Returns a reference to the worker.
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn register(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
) -> Result<Worker, Self::Error>;
/// Send a heartbeat for the given worker.
///
/// Returns the updated worker.
///
/// # Errors
///
/// Returns an error if the underlying repository fails or if the worker was
/// shutdown.
async fn heartbeat(&mut self, clock: &dyn Clock, worker: Worker)
-> Result<Worker, Self::Error>;
/// Mark the given worker as shutdown.
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error>;
/// Find dead workers and shut them down.
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn shutdown_dead_workers(
&mut self,
clock: &dyn Clock,
threshold: Duration,
) -> Result<(), Self::Error>;
/// Remove the leader lease if it is expired, sending a notification to
/// trigger a new leader election.
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn remove_leader_lease_if_expired(
&mut self,
clock: &dyn Clock,
) -> Result<(), Self::Error>;
/// Try to get the leader lease, renewing it if we already have it
///
/// Returns `true` if we got the leader lease, `false` if we didn't
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn try_get_leader_lease(
&mut self,
clock: &dyn Clock,
worker: &Worker,
) -> Result<bool, Self::Error>;
}
repository_impl!(QueueWorkerRepository:
async fn register(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
) -> Result<Worker, Self::Error>;
async fn heartbeat(
&mut self,
clock: &dyn Clock,
worker: Worker,
) -> Result<Worker, Self::Error>;
async fn shutdown(
&mut self,
clock: &dyn Clock,
worker: Worker,
) -> Result<(), Self::Error>;
async fn shutdown_dead_workers(
&mut self,
clock: &dyn Clock,
threshold: Duration,
) -> Result<(), Self::Error>;
async fn remove_leader_lease_if_expired(
&mut self,
clock: &dyn Clock,
) -> Result<(), Self::Error>;
async fn try_get_leader_lease(
&mut self,
clock: &dyn Clock,
worker: &Worker,
) -> Result<bool, Self::Error>;
);

View File

@@ -18,6 +18,7 @@ use crate::{
OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository,
OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository,
},
queue::QueueWorkerRepository,
upstream_oauth2::{
UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository,
UpstreamOAuthSessionRepository,
@@ -191,6 +192,9 @@ pub trait RepositoryAccess: Send {
/// Get a [`JobRepository`]
fn job<'c>(&'c mut self) -> Box<dyn JobRepository<Error = Self::Error> + 'c>;
/// Get a [`QueueWorkerRepository`]
fn queue_worker<'c>(&'c mut self) -> Box<dyn QueueWorkerRepository<Error = Self::Error> + 'c>;
}
/// Implementations of the [`RepositoryAccess`], [`RepositoryTransaction`] and
@@ -211,6 +215,7 @@ mod impls {
OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository,
OAuth2SessionRepository,
},
queue::QueueWorkerRepository,
upstream_oauth2::{
UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository,
UpstreamOAuthSessionRepository,
@@ -405,6 +410,12 @@ mod impls {
fn job<'c>(&'c mut self) -> Box<dyn JobRepository<Error = Self::Error> + 'c> {
Box::new(MapErr::new(self.inner.job(), &mut self.mapper))
}
fn queue_worker<'c>(
&'c mut self,
) -> Box<dyn QueueWorkerRepository<Error = Self::Error> + 'c> {
Box::new(MapErr::new(self.inner.queue_worker(), &mut self.mapper))
}
}
impl<R: RepositoryAccess + ?Sized> RepositoryAccess for Box<R> {
@@ -527,5 +538,11 @@ mod impls {
fn job<'c>(&'c mut self) -> Box<dyn JobRepository<Error = Self::Error> + 'c> {
(**self).job()
}
fn queue_worker<'c>(
&'c mut self,
) -> Box<dyn QueueWorkerRepository<Error = Self::Error> + 'c> {
(**self).queue_worker()
}
}
}

View File

@@ -10,8 +10,8 @@ use apalis_core::{executor::TokioExecutor, layers::extensions::Extension, monito
use mas_email::Mailer;
use mas_matrix::HomeserverConnection;
use mas_router::UrlBuilder;
use mas_storage::{BoxClock, BoxRepository, SystemClock};
use mas_storage_pg::{DatabaseError, PgRepository};
use mas_storage::{BoxClock, BoxRepository, RepositoryError, SystemClock};
use mas_storage_pg::PgRepository;
use rand::SeedableRng;
use sqlx::{Pool, Postgres};
use tracing::debug;
@@ -21,6 +21,7 @@ use crate::storage::PostgresStorageFactory;
mod database;
mod email;
mod matrix;
mod new_queue;
mod recovery;
mod storage;
mod user;
@@ -74,8 +75,11 @@ impl State {
rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng")
}
pub async fn repository(&self) -> Result<BoxRepository, DatabaseError> {
let repo = PgRepository::from_pool(self.pool()).await?.boxed();
pub async fn repository(&self) -> Result<BoxRepository, RepositoryError> {
let repo = PgRepository::from_pool(self.pool())
.await
.map_err(RepositoryError::from_error)?
.boxed();
Ok(repo)
}
@@ -156,5 +160,17 @@ pub async fn init(
// TODO: we might want to grab the join handle here
factory.listen().await?;
debug!(?monitor, "workers registered");
// TODO: this is just spawning the task in the background, we probably actually
// want to wrap that in a structure, and handle graceful shutdown correctly
tokio::spawn(async move {
if let Err(e) = self::new_queue::run(state).await {
tracing::error!(
error = &e as &dyn std::error::Error,
"Failed to run new queue"
);
}
});
Ok(monitor)
}

View File

@@ -0,0 +1,81 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use chrono::Duration;
use mas_storage::{RepositoryAccess, RepositoryError};
use crate::State;
pub async fn run(state: State) -> Result<(), RepositoryError> {
let span = tracing::info_span!("worker.init", worker.id = tracing::field::Empty);
let guard = span.enter();
let mut repo = state.repository().await?;
let mut rng = state.rng();
let clock = state.clock();
let mut worker = repo.queue_worker().register(&mut rng, &clock).await?;
span.record("worker.id", tracing::field::display(worker.id));
repo.save().await?;
tracing::info!("Registered worker");
drop(guard);
let mut was_i_the_leader = false;
// Record when we last sent a heartbeat
let mut last_heartbeat = clock.now();
loop {
// This is to make sure we wake up every second to do the maintenance tasks
// Later we might wait on other events, like a PG notification
let wakeup_sleep = tokio::time::sleep(std::time::Duration::from_secs(1));
wakeup_sleep.await;
let span = tracing::info_span!("worker.tick", %worker.id);
let _guard = span.enter();
tracing::debug!("Tick");
let now = clock.now();
let mut repo = state.repository().await?;
// We send a heartbeat every minute, to avoid writing to the database too often
// on a logged table
if now - last_heartbeat >= chrono::Duration::minutes(1) {
tracing::info!("Sending heartbeat");
worker = repo.queue_worker().heartbeat(&clock, worker).await?;
last_heartbeat = now;
}
// Remove any dead worker leader leases
repo.queue_worker()
.remove_leader_lease_if_expired(&clock)
.await?;
// Try to become (or stay) the leader
let am_i_the_leader = repo
.queue_worker()
.try_get_leader_lease(&clock, &worker)
.await?;
// Log any changes in leadership
if !was_i_the_leader && am_i_the_leader {
tracing::info!("I'm the leader now");
} else if was_i_the_leader && !am_i_the_leader {
tracing::warn!("I am no longer the leader");
}
was_i_the_leader = am_i_the_leader;
// The leader does all the maintenance work
if am_i_the_leader {
// We also check if the worker is dead, and if so, we shutdown all the dead
// workers that haven't checked in the last two minutes
repo.queue_worker()
.shutdown_dead_workers(&clock, Duration::minutes(2))
.await?;
}
repo.save().await?;
}
}