diff --git a/crates/storage-pg/.sqlx/query-d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943.json b/crates/storage-pg/.sqlx/query-245cab1cf7d9cf4e94cdec91ecb4dc8e678278121efbe1f66bcdc24144d684d0.json similarity index 60% rename from crates/storage-pg/.sqlx/query-d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943.json rename to crates/storage-pg/.sqlx/query-245cab1cf7d9cf4e94cdec91ecb4dc8e678278121efbe1f66bcdc24144d684d0.json index f87d2dff4..b6635baa8 100644 --- a/crates/storage-pg/.sqlx/query-d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943.json +++ b/crates/storage-pg/.sqlx/query-245cab1cf7d9cf4e94cdec91ecb4dc8e678278121efbe1f66bcdc24144d684d0.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, status)\n VALUES ($1, $2, $3, $4, $5, $6, 'scheduled')\n ", + "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, schedule_name, status)\n VALUES ($1, $2, $3, $4, $5, $6, $7, 'scheduled')\n ", "describe": { "columns": [], "parameters": { @@ -10,10 +10,11 @@ "Jsonb", "Jsonb", "Timestamptz", - "Timestamptz" + "Timestamptz", + "Text" ] }, "nullable": [] }, - "hash": "d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943" + "hash": "245cab1cf7d9cf4e94cdec91ecb4dc8e678278121efbe1f66bcdc24144d684d0" } diff --git a/crates/storage-pg/.sqlx/query-3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936.json b/crates/storage-pg/.sqlx/query-3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936.json deleted file mode 100644 index c65354f92..000000000 --- a/crates/storage-pg/.sqlx/query-3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, attempt, scheduled_at, status)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, 'scheduled'\n FROM queue_jobs\n WHERE queue_job_id = $4\n AND status = 'failed'\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Timestamptz", - "Timestamptz", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936" -} diff --git a/crates/storage-pg/.sqlx/query-3e6e3aad53b22fc53eb3ee881b29bb249b18ced57d6a4809dffc23972b3e9423.json b/crates/storage-pg/.sqlx/query-3e6e3aad53b22fc53eb3ee881b29bb249b18ced57d6a4809dffc23972b3e9423.json new file mode 100644 index 000000000..a930b70e9 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-3e6e3aad53b22fc53eb3ee881b29bb249b18ced57d6a4809dffc23972b3e9423.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_schedules\n SET last_scheduled_at = $1,\n last_scheduled_job_id = $2\n WHERE schedule_name = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "3e6e3aad53b22fc53eb3ee881b29bb249b18ced57d6a4809dffc23972b3e9423" +} diff --git a/crates/storage-pg/.sqlx/query-5b21644dd3c094b0f2f8babb2c730554dc067d0a6cad963dd7e0c66a80b342bf.json b/crates/storage-pg/.sqlx/query-5b21644dd3c094b0f2f8babb2c730554dc067d0a6cad963dd7e0c66a80b342bf.json new file mode 100644 index 000000000..ea5a5fb0a --- /dev/null +++ b/crates/storage-pg/.sqlx/query-5b21644dd3c094b0f2f8babb2c730554dc067d0a6cad963dd7e0c66a80b342bf.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_schedules\n SET last_scheduled_at = $1,\n last_scheduled_job_id = $2\n WHERE last_scheduled_job_id = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Uuid", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "5b21644dd3c094b0f2f8babb2c730554dc067d0a6cad963dd7e0c66a80b342bf" +} diff --git a/crates/storage-pg/.sqlx/query-8f4f071f844281fb14ecd99db3261540441b14c8206038fdc4a4336bbae3f382.json b/crates/storage-pg/.sqlx/query-8f4f071f844281fb14ecd99db3261540441b14c8206038fdc4a4336bbae3f382.json new file mode 100644 index 000000000..304e477e6 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-8f4f071f844281fb14ecd99db3261540441b14c8206038fdc4a4336bbae3f382.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at,\n attempt, scheduled_at, schedule_name, status)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, schedule_name, 'scheduled'\n FROM queue_jobs\n WHERE queue_job_id = $4\n AND status = 'failed'\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz", + "Timestamptz", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "8f4f071f844281fb14ecd99db3261540441b14c8206038fdc4a4336bbae3f382" +} diff --git a/crates/storage-pg/.sqlx/query-9ad4e6e9bfedea476d1f47753e4738455e94eade48ad5f577e53278cc70dc266.json b/crates/storage-pg/.sqlx/query-9ad4e6e9bfedea476d1f47753e4738455e94eade48ad5f577e53278cc70dc266.json new file mode 100644 index 000000000..6a0c3b950 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-9ad4e6e9bfedea476d1f47753e4738455e94eade48ad5f577e53278cc70dc266.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n queue_schedules.schedule_name,\n queue_schedules.last_scheduled_at,\n queue_jobs.status IN ('completed', 'failed') as last_scheduled_job_completed\n FROM queue_schedules\n LEFT JOIN queue_jobs\n ON queue_jobs.queue_job_id = queue_schedules.last_scheduled_job_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "schedule_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "last_scheduled_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "last_scheduled_job_completed", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + true, + null + ] + }, + "hash": "9ad4e6e9bfedea476d1f47753e4738455e94eade48ad5f577e53278cc70dc266" +} diff --git a/crates/storage-pg/.sqlx/query-f8182fd162ffb018d4f102fa7ddbc9991135065e81af8f77b5beef9405607577.json b/crates/storage-pg/.sqlx/query-f8182fd162ffb018d4f102fa7ddbc9991135065e81af8f77b5beef9405607577.json new file mode 100644 index 000000000..1a715f579 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-f8182fd162ffb018d4f102fa7ddbc9991135065e81af8f77b5beef9405607577.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_schedules (schedule_name)\n SELECT * FROM UNNEST($1::text[]) AS t (schedule_name)\n ON CONFLICT (schedule_name) DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "TextArray" + ] + }, + "nullable": [] + }, + "hash": "f8182fd162ffb018d4f102fa7ddbc9991135065e81af8f77b5beef9405607577" +} diff --git a/crates/storage-pg/migrations/20241125110803_queue_job_recurrent.sql b/crates/storage-pg/migrations/20241125110803_queue_job_recurrent.sql index 18c28803c..814e073c4 100644 --- a/crates/storage-pg/migrations/20241125110803_queue_job_recurrent.sql +++ b/crates/storage-pg/migrations/20241125110803_queue_job_recurrent.sql @@ -6,11 +6,7 @@ -- Add a table to track the state of scheduled recurring jobs. CREATE TABLE queue_schedules ( -- A unique name for the schedule - schedule_name TEXT PRIMARY KEY, - - -- The cron expression to use to schedule the job. This is there just for - -- convenience, as this is defined by the backend - schedule_expression TEXT NOT NULL, + schedule_name TEXT NOT NULL PRIMARY KEY, -- The last time the job was scheduled. If NULL, it means that the job was -- never scheduled. @@ -22,7 +18,7 @@ CREATE TABLE queue_schedules ( REFERENCES queue_jobs (queue_job_id) ); --- When a job is scheduled from a recurreing schedule, we keep a column +-- When a job is scheduled from a recurring schedule, we keep a column -- referencing the name of the schedule ALTER TABLE queue_jobs ADD COLUMN schedule_name TEXT diff --git a/crates/storage-pg/src/queue/schedule.rs b/crates/storage-pg/src/queue/schedule.rs index 41f4cb7cf..3594cee7e 100644 --- a/crates/storage-pg/src/queue/schedule.rs +++ b/crates/storage-pg/src/queue/schedule.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use mas_storage::queue::{QueueScheduleRepository, Schedule, ScheduleStatus}; +use mas_storage::queue::{QueueScheduleRepository, ScheduleStatus}; use sqlx::PgConnection; use crate::{DatabaseError, ExecuteExt}; @@ -45,22 +45,17 @@ impl From for ScheduleStatus { } #[async_trait] -impl<'c> QueueScheduleRepository for PgQueueScheduleRepository<'c> { +impl QueueScheduleRepository for PgQueueScheduleRepository<'_> { type Error = DatabaseError; - async fn setup(&mut self, schedules: &[(&'static str, Schedule)]) -> Result<(), Self::Error> { + async fn setup(&mut self, schedules: &[&'static str]) -> Result<(), Self::Error> { sqlx::query!( r#" - INSERT INTO queue_schedules (schedule_name, schedule_expression) - SELECT * FROM UNNEST($1::text[], $2::text[]) AS t (schedule_name, schedule_expression) - ON CONFLICT (schedule_name) DO UPDATE - SET schedule_expression = EXCLUDED.schedule_expression + INSERT INTO queue_schedules (schedule_name) + SELECT * FROM UNNEST($1::text[]) AS t (schedule_name) + ON CONFLICT (schedule_name) DO NOTHING "#, - &schedules.iter().map(|(name, _)| (*name).to_owned()).collect::>(), - &schedules - .iter() - .map(|(_, schedule)| schedule.source().to_owned()) - .collect::>() + &schedules.iter().map(|&s| s.to_owned()).collect::>(), ) .traced() .execute(&mut *self.conn) @@ -74,7 +69,7 @@ impl<'c> QueueScheduleRepository for PgQueueScheduleRepository<'c> { ScheduleLookup, r#" SELECT - queue_schedules.schedule_name as "schedule_name!", + queue_schedules.schedule_name, queue_schedules.last_scheduled_at, queue_jobs.status IN ('completed', 'failed') as last_scheduled_job_completed FROM queue_schedules diff --git a/crates/storage/src/queue/job.rs b/crates/storage/src/queue/job.rs index e4c9f7235..298d2f758 100644 --- a/crates/storage/src/queue/job.rs +++ b/crates/storage/src/queue/job.rs @@ -7,7 +7,6 @@ use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; -use cron::Schedule; use opentelemetry::trace::TraceContextExt; use rand_core::RngCore; use serde::{Deserialize, Serialize}; diff --git a/crates/storage/src/queue/mod.rs b/crates/storage/src/queue/mod.rs index a41bd4438..03d969bbb 100644 --- a/crates/storage/src/queue/mod.rs +++ b/crates/storage/src/queue/mod.rs @@ -12,7 +12,7 @@ mod worker; pub use self::{ job::{InsertableJob, Job, JobMetadata, QueueJobRepository, QueueJobRepositoryExt}, - schedule::{QueueScheduleRepository, Schedule, ScheduleStatus}, + schedule::{QueueScheduleRepository, ScheduleStatus}, tasks::*, worker::{QueueWorkerRepository, Worker}, }; diff --git a/crates/storage/src/queue/schedule.rs b/crates/storage/src/queue/schedule.rs index aaee5d325..aaa83e5d2 100644 --- a/crates/storage/src/queue/schedule.rs +++ b/crates/storage/src/queue/schedule.rs @@ -7,7 +7,6 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -pub use cron::Schedule; use crate::repository_impl; @@ -33,13 +32,12 @@ pub trait QueueScheduleRepository: Send + Sync { /// /// # Parameters /// - /// * `schedules` - The list of schedules to setup, as a list of (name, - /// schedule) + /// * `schedules` - The list of schedules to setup /// /// # Errors /// /// Returns an error if the underlying repository fails. - async fn setup(&mut self, schedules: &[(&'static str, Schedule)]) -> Result<(), Self::Error>; + async fn setup(&mut self, schedules: &[&'static str]) -> Result<(), Self::Error>; /// List the schedules in the repository, with the last time they were run /// @@ -52,7 +50,7 @@ pub trait QueueScheduleRepository: Send + Sync { repository_impl!(QueueScheduleRepository: async fn setup( &mut self, - schedules: &[(&'static str, Schedule)], + schedules: &[&'static str], ) -> Result<(), Self::Error>; async fn list(&mut self) -> Result, Self::Error>; diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index a193f2037..fe8b1f9e5 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -3,7 +3,6 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository root for full details. -use chrono::DateTime; use mas_data_model::{Device, User, UserEmail, UserRecoverySession}; use serde::{Deserialize, Serialize}; use ulid::Ulid; diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index afba9c73e..3eab5e53c 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -7,8 +7,9 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; +use cron::Schedule; use mas_storage::{ - queue::{InsertableJob, Job, JobMetadata, Schedule, Worker}, + queue::{InsertableJob, Job, JobMetadata, Worker}, Clock, RepositoryAccess, RepositoryError, }; use mas_storage_pg::{DatabaseError, PgRepository}; @@ -298,11 +299,7 @@ impl QueueWorker { #[tracing::instrument(name = "worker.setup_schedules", skip_all, err)] pub async fn setup_schedules(&mut self) -> Result<(), QueueRunnerError> { - let schedules: Vec<_> = self - .schedules - .iter() - .map(|s| (s.schedule_name, s.expression.clone())) - .collect(); + let schedules: Vec<_> = self.schedules.iter().map(|s| s.schedule_name).collect(); // Start a transaction on the existing PgListener connection let txn = self