diff --git a/crates/storage-pg/.sqlx/query-359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b.json b/crates/storage-pg/.sqlx/query-359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b.json new file mode 100644 index 000000000..941ae4366 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO apalis.jobs (job, id, job_type)\n VALUES ($1::json, $2::text, $3::text)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Json", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b" +} diff --git a/crates/storage-pg/.sqlx/query-b753790eecbbb4bcd87b9e9a1d1b0dd6c3b50e82ffbfee356e2cf755d72f00be.json b/crates/storage-pg/.sqlx/query-b753790eecbbb4bcd87b9e9a1d1b0dd6c3b50e82ffbfee356e2cf755d72f00be.json deleted file mode 100644 index 1858b964a..000000000 --- a/crates/storage-pg/.sqlx/query-b753790eecbbb4bcd87b9e9a1d1b0dd6c3b50e82ffbfee356e2cf755d72f00be.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT id as \"id!\"\n FROM apalis.push_job($1::text, $2::json, 'Pending', now(), 25)\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id!", - "type_info": "Text" - } - ], - "parameters": { - "Left": [ - "Text", - "Json" - ] - }, - "nullable": [ - null - ] - }, - "hash": "b753790eecbbb4bcd87b9e9a1d1b0dd6c3b50e82ffbfee356e2cf755d72f00be" -} diff --git a/crates/storage-pg/migrations/20220709210445_add_job_fn.sql b/crates/storage-pg/migrations/20220709210445_add_job_fn.sql deleted file mode 100644 index 26b56e1d3..000000000 --- a/crates/storage-pg/migrations/20220709210445_add_job_fn.sql +++ /dev/null @@ -1,46 +0,0 @@ -CREATE OR REPLACE FUNCTION apalis.push_job( - job_type text, - job json DEFAULT NULL :: json, - job_id text DEFAULT NULL :: text, - status text DEFAULT 'Pending' :: text, - run_at timestamptz DEFAULT NOW() :: timestamptz, - max_attempts integer DEFAULT 25 :: integer -) RETURNS apalis.jobs AS $$ - - DECLARE - v_job_row apalis.jobs; - v_job_id text; - - BEGIN - IF job_type is not NULL and length(job_type) > 512 THEN raise exception 'Job_type is too long (max length: 512).' USING errcode = 'APAJT'; - END IF; - - IF max_attempts < 1 THEN raise exception 'Job maximum attempts must be at least 1.' USING errcode = 'APAMA'; - end IF; - - SELECT - uuid_in( - md5(random() :: text || now() :: text) :: cstring - ) INTO v_job_id; - INSERT INTO - apalis.jobs - VALUES - ( - job, - v_job_id, - job_type, - status, - 0, - max_attempts, - run_at, - NULL, - NULL, - NULL, - NULL - ) - returning * INTO v_job_row; - RETURN v_job_row; -END; -$$ LANGUAGE plpgsql volatile; - - diff --git a/crates/storage-pg/migrations/20230330210841_replace_add_job_fn.sql b/crates/storage-pg/migrations/20230330210841_replace_add_job_fn.sql deleted file mode 100644 index 3562e96ff..000000000 --- a/crates/storage-pg/migrations/20230330210841_replace_add_job_fn.sql +++ /dev/null @@ -1,106 +0,0 @@ -CREATE EXTENSION IF NOT EXISTS pgcrypto; - -CREATE OR REPLACE FUNCTION generate_ulid() -RETURNS TEXT -AS $$ -DECLARE - -- Crockford's Base32 - encoding BYTEA = '0123456789ABCDEFGHJKMNPQRSTVWXYZ'; - timestamp BYTEA = E'\\000\\000\\000\\000\\000\\000'; - output TEXT = ''; - - unix_time BIGINT; - ulid BYTEA; -BEGIN - -- 6 timestamp bytes - unix_time = (EXTRACT(EPOCH FROM CLOCK_TIMESTAMP()) * 1000)::BIGINT; - timestamp = SET_BYTE(timestamp, 0, (unix_time >> 40)::BIT(8)::INTEGER); - timestamp = SET_BYTE(timestamp, 1, (unix_time >> 32)::BIT(8)::INTEGER); - timestamp = SET_BYTE(timestamp, 2, (unix_time >> 24)::BIT(8)::INTEGER); - timestamp = SET_BYTE(timestamp, 3, (unix_time >> 16)::BIT(8)::INTEGER); - timestamp = SET_BYTE(timestamp, 4, (unix_time >> 8)::BIT(8)::INTEGER); - timestamp = SET_BYTE(timestamp, 5, unix_time::BIT(8)::INTEGER); - - -- 10 entropy bytes - ulid = timestamp || gen_random_bytes(10); - - -- Encode the timestamp - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 224) >> 5)); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 31))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 1) & 248) >> 3)); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 1) & 7) << 2) | ((GET_BYTE(ulid, 2) & 192) >> 6))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 2) & 62) >> 1)); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 2) & 1) << 4) | ((GET_BYTE(ulid, 3) & 240) >> 4))); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 3) & 15) << 1) | ((GET_BYTE(ulid, 4) & 128) >> 7))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 4) & 124) >> 2)); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 4) & 3) << 3) | ((GET_BYTE(ulid, 5) & 224) >> 5))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 5) & 31))); - - -- Encode the entropy - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 6) & 248) >> 3)); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 6) & 7) << 2) | ((GET_BYTE(ulid, 7) & 192) >> 6))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 7) & 62) >> 1)); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 7) & 1) << 4) | ((GET_BYTE(ulid, 8) & 240) >> 4))); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 8) & 15) << 1) | ((GET_BYTE(ulid, 9) & 128) >> 7))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 9) & 124) >> 2)); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 9) & 3) << 3) | ((GET_BYTE(ulid, 10) & 224) >> 5))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 10) & 31))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 11) & 248) >> 3)); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 11) & 7) << 2) | ((GET_BYTE(ulid, 12) & 192) >> 6))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 12) & 62) >> 1)); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 12) & 1) << 4) | ((GET_BYTE(ulid, 13) & 240) >> 4))); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 13) & 15) << 1) | ((GET_BYTE(ulid, 14) & 128) >> 7))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 14) & 124) >> 2)); - output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 14) & 3) << 3) | ((GET_BYTE(ulid, 15) & 224) >> 5))); - output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 15) & 31))); - - RETURN output; -END -$$ -LANGUAGE plpgsql -VOLATILE; - - -CREATE OR REPLACE FUNCTION apalis.push_job( - job_type text, - job json DEFAULT NULL :: json, - status text DEFAULT 'Pending' :: text, - run_at timestamptz DEFAULT NOW() :: timestamptz, - max_attempts integer DEFAULT 25 :: integer -) RETURNS apalis.jobs AS $$ - - DECLARE - v_job_row apalis.jobs; - v_job_id text; - - BEGIN - IF job_type is not NULL and length(job_type) > 512 THEN raise exception 'Job_type is too long (max length: 512).' USING errcode = 'APAJT'; - END IF; - - IF max_attempts < 1 THEN raise exception 'Job maximum attempts must be at least 1.' USING errcode = 'APAMA'; - end IF; - - SELECT - CONCAT('JID-' || generate_ulid()) INTO v_job_id; - INSERT INTO - apalis.jobs - VALUES - ( - job, - v_job_id, - job_type, - status, - 0, - max_attempts, - run_at, - NULL, - NULL, - NULL, - NULL - ) - returning * INTO v_job_row; - RETURN v_job_row; -END; -$$ LANGUAGE plpgsql volatile; - - diff --git a/crates/storage-pg/migrations/20230408110421_drop_old_push_job.sql b/crates/storage-pg/migrations/20230408110421_drop_old_push_job.sql deleted file mode 100644 index 7d799b9ba..000000000 --- a/crates/storage-pg/migrations/20230408110421_drop_old_push_job.sql +++ /dev/null @@ -1,8 +0,0 @@ -DROP FUNCTION IF EXISTS apalis.push_job( - job_type text, - job json, - job_id text, - status text, - run_at timestamptz, - max_attempts integer -); \ No newline at end of file diff --git a/crates/storage-pg/migrations/20230823125247_drop_apalis_push_job.sql b/crates/storage-pg/migrations/20230823125247_drop_apalis_push_job.sql new file mode 100644 index 000000000..1ef75bd83 --- /dev/null +++ b/crates/storage-pg/migrations/20230823125247_drop_apalis_push_job.sql @@ -0,0 +1,53 @@ +-- Copyright 2023 The Matrix.org Foundation C.I.C. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +/** + * This fully drops any existing push_job functions, as we're not relying on them anymore + */ + +-- Temporarily change the client_min_messages to suppress the NOTICEs +SET client_min_messages = 'ERROR'; + +DROP FUNCTION IF EXISTS apalis.push_job( + job_type text, + job json, + job_id text, + status text, + run_at timestamptz, + max_attempts integer +); + +DROP FUNCTION IF EXISTS apalis.push_job( + job_type text, + job json, + status text, + run_at timestamptz, + max_attempts integer +); + +-- Reset the client_min_messages +RESET client_min_messages; + +/** + * Remove the old applied migrations in case they were applied: + * - 20220709210445_add_job_fn.sql + * - 20230330210841_replace_add_job_fn.sql + * - 20230408110421_drop_old_push_job.sql + */ +DELETE FROM public._sqlx_migrations +WHERE version IN ( + 20220709210445, + 20230330210841, + 20230408110421 +); \ No newline at end of file diff --git a/crates/storage-pg/src/job.rs b/crates/storage-pg/src/job.rs index f151bec3c..3c6b0645b 100644 --- a/crates/storage-pg/src/job.rs +++ b/crates/storage-pg/src/job.rs @@ -18,7 +18,7 @@ use async_trait::async_trait; use mas_storage::job::{JobId, JobRepository, JobSubmission}; use sqlx::PgConnection; -use crate::{errors::DatabaseInconsistencyError, DatabaseError, ExecuteExt}; +use crate::{DatabaseError, ExecuteExt}; /// An implementation of [`JobRepository`] for a PostgreSQL connection. pub struct PgJobRepository<'c> { @@ -43,7 +43,7 @@ impl<'c> JobRepository for PgJobRepository<'c> { fields( db.statement, job.id, - job.name, + job.name = submission.name(), ), err, )] @@ -51,25 +51,24 @@ impl<'c> JobRepository for PgJobRepository<'c> { &mut self, submission: JobSubmission, ) -> Result { - // XXX: The apalis.push_job function is not unique, so we have to specify all - // the arguments - let res = sqlx::query_scalar!( + // XXX: This does not use the clock nor the rng + let id = JobId::new(); + tracing::Span::current().record("job.id", tracing::field::display(&id)); + + let res = sqlx::query!( r#" - SELECT id as "id!" - FROM apalis.push_job($1::text, $2::json, 'Pending', now(), 25) + INSERT INTO apalis.jobs (job, id, job_type) + VALUES ($1::json, $2::text, $3::text) "#, - submission.name(), submission.payload(), + id.to_string(), + submission.name(), ) .traced() - .fetch_one(&mut *self.conn) + .execute(&mut *self.conn) .await?; - let id = res - .parse() - .map_err(|source| DatabaseInconsistencyError::on("apalis.push_job").source(source))?; - - tracing::Span::current().record("job.id", tracing::field::display(&id)); + DatabaseError::ensure_affected_rows(&res, 1)?; Ok(id) } diff --git a/crates/storage-pg/src/lib.rs b/crates/storage-pg/src/lib.rs index 4a822986a..151d8d2da 100644 --- a/crates/storage-pg/src/lib.rs +++ b/crates/storage-pg/src/lib.rs @@ -193,4 +193,13 @@ pub(crate) use self::errors::DatabaseInconsistencyError; pub use self::{errors::DatabaseError, repository::PgRepository, tracing::ExecuteExt}; /// Embedded migrations, allowing them to run on startup -pub static MIGRATOR: Migrator = sqlx::migrate!(); +pub static MIGRATOR: Migrator = { + // XXX: The macro does not let us ignore missing migrations, so we have to do it + // like this. See https://github.com/launchbadge/sqlx/issues/1788 + let mut m = sqlx::migrate!(); + + // We manually removed some migrations because they made us depend on the + // `pgcrypto` extension. See: https://github.com/matrix-org/matrix-authentication-service/issues/1557 + m.ignore_missing = true; + m +};