Remove the schedule_expression from the database & other fixes

This commit is contained in:
Quentin Gliech
2024-12-05 17:55:34 +01:00
parent fc7dd0ffdf
commit e0aab3740f
14 changed files with 116 additions and 53 deletions

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, status)\n VALUES ($1, $2, $3, $4, $5, $6, 'scheduled')\n ",
"query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, schedule_name, status)\n VALUES ($1, $2, $3, $4, $5, $6, $7, 'scheduled')\n ",
"describe": {
"columns": [],
"parameters": {
@@ -10,10 +10,11 @@
"Jsonb",
"Jsonb",
"Timestamptz",
"Timestamptz"
"Timestamptz",
"Text"
]
},
"nullable": []
},
"hash": "d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943"
"hash": "245cab1cf7d9cf4e94cdec91ecb4dc8e678278121efbe1f66bcdc24144d684d0"
}

View File

@@ -1,17 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, attempt, scheduled_at, status)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, 'scheduled'\n FROM queue_jobs\n WHERE queue_job_id = $4\n AND status = 'failed'\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz",
"Timestamptz",
"Uuid"
]
},
"nullable": []
},
"hash": "3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE queue_schedules\n SET last_scheduled_at = $1,\n last_scheduled_job_id = $2\n WHERE schedule_name = $3\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Uuid",
"Text"
]
},
"nullable": []
},
"hash": "3e6e3aad53b22fc53eb3ee881b29bb249b18ced57d6a4809dffc23972b3e9423"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE queue_schedules\n SET last_scheduled_at = $1,\n last_scheduled_job_id = $2\n WHERE last_scheduled_job_id = $3\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Uuid",
"Uuid"
]
},
"nullable": []
},
"hash": "5b21644dd3c094b0f2f8babb2c730554dc067d0a6cad963dd7e0c66a80b342bf"
}

View File

@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at,\n attempt, scheduled_at, schedule_name, status)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, schedule_name, 'scheduled'\n FROM queue_jobs\n WHERE queue_job_id = $4\n AND status = 'failed'\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz",
"Timestamptz",
"Uuid"
]
},
"nullable": []
},
"hash": "8f4f071f844281fb14ecd99db3261540441b14c8206038fdc4a4336bbae3f382"
}

View File

@@ -0,0 +1,32 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n queue_schedules.schedule_name,\n queue_schedules.last_scheduled_at,\n queue_jobs.status IN ('completed', 'failed') as last_scheduled_job_completed\n FROM queue_schedules\n LEFT JOIN queue_jobs\n ON queue_jobs.queue_job_id = queue_schedules.last_scheduled_job_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "schedule_name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "last_scheduled_at",
"type_info": "Timestamptz"
},
{
"ordinal": 2,
"name": "last_scheduled_job_completed",
"type_info": "Bool"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
true,
null
]
},
"hash": "9ad4e6e9bfedea476d1f47753e4738455e94eade48ad5f577e53278cc70dc266"
}

View File

@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO queue_schedules (schedule_name)\n SELECT * FROM UNNEST($1::text[]) AS t (schedule_name)\n ON CONFLICT (schedule_name) DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"TextArray"
]
},
"nullable": []
},
"hash": "f8182fd162ffb018d4f102fa7ddbc9991135065e81af8f77b5beef9405607577"
}

View File

@@ -6,11 +6,7 @@
-- Add a table to track the state of scheduled recurring jobs.
CREATE TABLE queue_schedules (
-- A unique name for the schedule
schedule_name TEXT PRIMARY KEY,
-- The cron expression to use to schedule the job. This is there just for
-- convenience, as this is defined by the backend
schedule_expression TEXT NOT NULL,
schedule_name TEXT NOT NULL PRIMARY KEY,
-- The last time the job was scheduled. If NULL, it means that the job was
-- never scheduled.
@@ -22,7 +18,7 @@ CREATE TABLE queue_schedules (
REFERENCES queue_jobs (queue_job_id)
);
-- When a job is scheduled from a recurreing schedule, we keep a column
-- When a job is scheduled from a recurring schedule, we keep a column
-- referencing the name of the schedule
ALTER TABLE queue_jobs
ADD COLUMN schedule_name TEXT

View File

@@ -8,7 +8,7 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use mas_storage::queue::{QueueScheduleRepository, Schedule, ScheduleStatus};
use mas_storage::queue::{QueueScheduleRepository, ScheduleStatus};
use sqlx::PgConnection;
use crate::{DatabaseError, ExecuteExt};
@@ -45,22 +45,17 @@ impl From<ScheduleLookup> for ScheduleStatus {
}
#[async_trait]
impl<'c> QueueScheduleRepository for PgQueueScheduleRepository<'c> {
impl QueueScheduleRepository for PgQueueScheduleRepository<'_> {
type Error = DatabaseError;
async fn setup(&mut self, schedules: &[(&'static str, Schedule)]) -> Result<(), Self::Error> {
async fn setup(&mut self, schedules: &[&'static str]) -> Result<(), Self::Error> {
sqlx::query!(
r#"
INSERT INTO queue_schedules (schedule_name, schedule_expression)
SELECT * FROM UNNEST($1::text[], $2::text[]) AS t (schedule_name, schedule_expression)
ON CONFLICT (schedule_name) DO UPDATE
SET schedule_expression = EXCLUDED.schedule_expression
INSERT INTO queue_schedules (schedule_name)
SELECT * FROM UNNEST($1::text[]) AS t (schedule_name)
ON CONFLICT (schedule_name) DO NOTHING
"#,
&schedules.iter().map(|(name, _)| (*name).to_owned()).collect::<Vec<_>>(),
&schedules
.iter()
.map(|(_, schedule)| schedule.source().to_owned())
.collect::<Vec<_>>()
&schedules.iter().map(|&s| s.to_owned()).collect::<Vec<_>>(),
)
.traced()
.execute(&mut *self.conn)
@@ -74,7 +69,7 @@ impl<'c> QueueScheduleRepository for PgQueueScheduleRepository<'c> {
ScheduleLookup,
r#"
SELECT
queue_schedules.schedule_name as "schedule_name!",
queue_schedules.schedule_name,
queue_schedules.last_scheduled_at,
queue_jobs.status IN ('completed', 'failed') as last_scheduled_job_completed
FROM queue_schedules

View File

@@ -7,7 +7,6 @@
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use cron::Schedule;
use opentelemetry::trace::TraceContextExt;
use rand_core::RngCore;
use serde::{Deserialize, Serialize};

View File

@@ -12,7 +12,7 @@ mod worker;
pub use self::{
job::{InsertableJob, Job, JobMetadata, QueueJobRepository, QueueJobRepositoryExt},
schedule::{QueueScheduleRepository, Schedule, ScheduleStatus},
schedule::{QueueScheduleRepository, ScheduleStatus},
tasks::*,
worker::{QueueWorkerRepository, Worker},
};

View File

@@ -7,7 +7,6 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
pub use cron::Schedule;
use crate::repository_impl;
@@ -33,13 +32,12 @@ pub trait QueueScheduleRepository: Send + Sync {
///
/// # Parameters
///
/// * `schedules` - The list of schedules to setup, as a list of (name,
/// schedule)
/// * `schedules` - The list of schedules to setup
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn setup(&mut self, schedules: &[(&'static str, Schedule)]) -> Result<(), Self::Error>;
async fn setup(&mut self, schedules: &[&'static str]) -> Result<(), Self::Error>;
/// List the schedules in the repository, with the last time they were run
///
@@ -52,7 +50,7 @@ pub trait QueueScheduleRepository: Send + Sync {
repository_impl!(QueueScheduleRepository:
async fn setup(
&mut self,
schedules: &[(&'static str, Schedule)],
schedules: &[&'static str],
) -> Result<(), Self::Error>;
async fn list(&mut self) -> Result<Vec<ScheduleStatus>, Self::Error>;

View File

@@ -3,7 +3,6 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use chrono::DateTime;
use mas_data_model::{Device, User, UserEmail, UserRecoverySession};
use serde::{Deserialize, Serialize};
use ulid::Ulid;

View File

@@ -7,8 +7,9 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use cron::Schedule;
use mas_storage::{
queue::{InsertableJob, Job, JobMetadata, Schedule, Worker},
queue::{InsertableJob, Job, JobMetadata, Worker},
Clock, RepositoryAccess, RepositoryError,
};
use mas_storage_pg::{DatabaseError, PgRepository};
@@ -298,11 +299,7 @@ impl QueueWorker {
#[tracing::instrument(name = "worker.setup_schedules", skip_all, err)]
pub async fn setup_schedules(&mut self) -> Result<(), QueueRunnerError> {
let schedules: Vec<_> = self
.schedules
.iter()
.map(|s| (s.schedule_name, s.expression.clone()))
.collect();
let schedules: Vec<_> = self.schedules.iter().map(|s| s.schedule_name).collect();
// Start a transaction on the existing PgListener connection
let txn = self