Foundation of new syn2mas tool (#3636)

This commit is contained in:
reivilibre
2025-01-02 18:21:39 +00:00
committed by reivilibre
parent ced66416e5
commit dd0299fa40
28 changed files with 1980 additions and 1 deletions

71
Cargo.lock generated
View File

@@ -861,6 +861,15 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "castaway"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0abae9be0aaf9ea96a3b1b8b1b55c602ca751eba1b1500220cea4ecbafe7c0d5"
dependencies = [
"rustversion",
]
[[package]] [[package]]
name = "cbc" name = "cbc"
version = "0.1.2" version = "0.1.2"
@@ -1081,6 +1090,20 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "compact_str"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6050c3a16ddab2e412160b31f2c871015704239bca62f72f6e5f0be631d3f644"
dependencies = [
"castaway",
"cfg-if",
"itoa",
"rustversion",
"ryu",
"static_assertions",
]
[[package]] [[package]]
name = "concurrent-queue" name = "concurrent-queue"
version = "2.5.0" version = "2.5.0"
@@ -3166,6 +3189,7 @@ dependencies = [
"mas-tasks", "mas-tasks",
"mas-templates", "mas-templates",
"mas-tower", "mas-tower",
"oauth2-types",
"opentelemetry", "opentelemetry",
"opentelemetry-http", "opentelemetry-http",
"opentelemetry-jaeger-propagator", "opentelemetry-jaeger-propagator",
@@ -3186,7 +3210,9 @@ dependencies = [
"serde_json", "serde_json",
"serde_yaml", "serde_yaml",
"sqlx", "sqlx",
"syn2mas",
"tokio", "tokio",
"tokio-stream",
"tokio-util", "tokio-util",
"tower", "tower",
"tower-http", "tower-http",
@@ -6013,6 +6039,12 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]] [[package]]
name = "static_assertions_next" name = "static_assertions_next"
version = "1.1.2" version = "1.1.2"
@@ -6075,6 +6107,23 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "syn2mas"
version = "0.13.0-rc.1"
dependencies = [
"chrono",
"compact_str",
"futures-util",
"rand",
"sqlx",
"thiserror 2.0.11",
"thiserror-ext",
"tokio",
"tracing",
"ulid",
"uuid",
]
[[package]] [[package]]
name = "sync_wrapper" name = "sync_wrapper"
version = "1.0.1" version = "1.0.1"
@@ -6141,6 +6190,28 @@ dependencies = [
"thiserror-impl 2.0.11", "thiserror-impl 2.0.11",
] ]
[[package]]
name = "thiserror-ext"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa35fd08b65a716e1a91479b00d03ed2ef4c92371a4900ceb6ec2b332f9d71df"
dependencies = [
"thiserror 1.0.69",
"thiserror-ext-derive",
]
[[package]]
name = "thiserror-ext-derive"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85ec5bcb8889378397e46bcd9f8ac636e9045f42851561e05a700667151abd18"
dependencies = [
"either",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "thiserror-impl" name = "thiserror-impl"
version = "1.0.69" version = "1.0.69"

View File

@@ -54,6 +54,7 @@ mas-tasks = { path = "./crates/tasks/", version = "=0.13.0-rc.1" }
mas-templates = { path = "./crates/templates/", version = "=0.13.0-rc.1" } mas-templates = { path = "./crates/templates/", version = "=0.13.0-rc.1" }
mas-tower = { path = "./crates/tower/", version = "=0.13.0-rc.1" } mas-tower = { path = "./crates/tower/", version = "=0.13.0-rc.1" }
oauth2-types = { path = "./crates/oauth2-types/", version = "=0.13.0-rc.1" } oauth2-types = { path = "./crates/oauth2-types/", version = "=0.13.0-rc.1" }
syn2mas = { path = "./crates/syn2mas", version = "=0.13.0-rc.1" }
# OpenAPI schema generation and validation # OpenAPI schema generation and validation
[workspace.dependencies.aide] [workspace.dependencies.aide]
@@ -65,6 +66,9 @@ features = ["axum", "axum-headers", "macros"]
version = "7.0.14" version = "7.0.14"
features = ["chrono", "url", "tracing"] features = ["chrono", "url", "tracing"]
[workspace.dependencies.async-stream]
version = "0.3.6"
# Utility to write and implement async traits # Utility to write and implement async traits
[workspace.dependencies.async-trait] [workspace.dependencies.async-trait]
version = "0.1.85" version = "0.1.85"
@@ -94,6 +98,10 @@ version = "1.9.0"
[workspace.dependencies.camino] [workspace.dependencies.camino]
version = "1.1.9" version = "1.1.9"
# Memory optimisation for short strings
[workspace.dependencies.compact_str]
version = "0.8.0"
# Time utilities # Time utilities
[workspace.dependencies.chrono] [workspace.dependencies.chrono]
version = "0.4.39" version = "0.4.39"
@@ -312,11 +320,17 @@ features = [
[workspace.dependencies.thiserror] [workspace.dependencies.thiserror]
version = "2.0.11" version = "2.0.11"
[workspace.dependencies.thiserror-ext]
version = "0.2.0"
# Async runtime # Async runtime
[workspace.dependencies.tokio] [workspace.dependencies.tokio]
version = "1.43.0" version = "1.43.0"
features = ["full"] features = ["full"]
[workspace.dependencies.tokio-stream]
version = "0.1.16"
# Useful async utilities # Useful async utilities
[workspace.dependencies.tokio-util] [workspace.dependencies.tokio-util]
version = "0.7.13" version = "0.7.13"

View File

@@ -1,4 +1,4 @@
doc-valid-idents = ["OpenID", "OAuth", "..", "PostgreSQL"] doc-valid-idents = ["OpenID", "OAuth", "..", "PostgreSQL", "SQLite"]
disallowed-methods = [ disallowed-methods = [
{ path = "rand::thread_rng", reason = "do not create rngs on the fly, pass them as parameters" }, { path = "rand::thread_rng", reason = "do not create rngs on the fly, pass them as parameters" },

View File

@@ -37,6 +37,7 @@ serde_yaml = "0.9.34"
sqlx.workspace = true sqlx.workspace = true
tokio.workspace = true tokio.workspace = true
tokio-util.workspace = true tokio-util.workspace = true
tokio-stream.workspace = true
tower.workspace = true tower.workspace = true
tower-http.workspace = true tower-http.workspace = true
url.workspace = true url.workspace = true
@@ -78,6 +79,9 @@ mas-tasks.workspace = true
mas-templates.workspace = true mas-templates.workspace = true
mas-tower.workspace = true mas-tower.workspace = true
oauth2-types.workspace = true
syn2mas.workspace = true
[build-dependencies] [build-dependencies]
anyhow.workspace = true anyhow.workspace = true
vergen-gitcl = { version = "1.0.5", features = ["rustc"] } vergen-gitcl = { version = "1.0.5", features = ["rustc"] }

View File

@@ -19,6 +19,7 @@ mod debug;
mod doctor; mod doctor;
mod manage; mod manage;
mod server; mod server;
mod syn2mas;
mod templates; mod templates;
mod worker; mod worker;
@@ -48,6 +49,10 @@ enum Subcommand {
/// Run diagnostics on the deployment /// Run diagnostics on the deployment
Doctor(self::doctor::Options), Doctor(self::doctor::Options),
/// Migrate from Synapse's built-in auth system to MAS.
#[clap(name = "syn2mas")]
Syn2Mas(self::syn2mas::Options),
} }
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@@ -75,6 +80,7 @@ impl Options {
Some(S::Templates(c)) => Box::pin(c.run(figment)).await, Some(S::Templates(c)) => Box::pin(c.run(figment)).await,
Some(S::Debug(c)) => Box::pin(c.run(figment)).await, Some(S::Debug(c)) => Box::pin(c.run(figment)).await,
Some(S::Doctor(c)) => Box::pin(c.run(figment)).await, Some(S::Doctor(c)) => Box::pin(c.run(figment)).await,
Some(S::Syn2Mas(c)) => Box::pin(c.run(figment)).await,
None => Box::pin(self::server::Options::default().run(figment)).await, None => Box::pin(self::server::Options::default().run(figment)).await,
} }
} }

View File

@@ -0,0 +1,83 @@
use std::process::ExitCode;
use anyhow::Context;
use clap::Parser;
use figment::Figment;
use mas_config::{ConfigurationSectionExt, DatabaseConfig};
use rand::thread_rng;
use sqlx::{Connection, Either, PgConnection};
use syn2mas::{LockedMasDatabase, MasWriter, SynapseReader};
use tracing::{error, warn};
use crate::util::database_connection_from_config;
// Command-line options for the `syn2mas` subcommand.
//
// Plain `//` comments are used for the additions here on purpose: on clap-derived
// items, `///` doc comments are picked up as user-visible help text.
#[derive(Parser, Debug)]
pub(super) struct Options {
// The syn2mas operation selected on the command line.
#[command(subcommand)]
subcommand: Subcommand,
/// This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. It is only suitable for TESTING.
/// If you want to use this tool anyway, please pass this argument.
///
/// If you want to migrate from Synapse to MAS today, please use the Node.js-based tool in the MAS repository.
#[clap(long = "i-swear-i-am-just-testing-in-a-staging-environment")]
// Opt-in flag: `run` refuses to do anything unless this was passed.
experimental_accepted: bool,
}
// Operations offered under the `syn2mas` subcommand.
//
// NOTE(review): `Options::run` never reads the selected variant, so `check` and
// `migrate` currently behave the same — confirm whether `Check` should stop
// after the pre-migration checks.
#[derive(Parser, Debug)]
enum Subcommand {
Check,
Migrate,
}
/// The number of parallel writing transactions active against the MAS database.
///
/// `Options::run` opens this many additional MAS connections and passes them
/// to `MasWriter::new`.
const NUM_WRITER_CONNECTIONS: usize = 8;
impl Options {
    /// Runs the (experimental) Synapse-to-MAS migration.
    ///
    /// Steps, in order:
    ///
    /// 1. Refuse to continue unless the experimental opt-in flag was passed.
    /// 2. Connect to the Synapse database and to the MAS database.
    /// 3. Take the syn2mas advisory lock on the MAS connection.
    /// 4. Run the MAS and Synapse pre-migration checks.
    /// 5. Open the writer connections and run the migration.
    ///
    /// Returns `ExitCode::FAILURE` (without an error) when the opt-in flag is
    /// missing or when the lock is already held by someone else.
    ///
    /// # Errors
    ///
    /// Returns an error if a database connection cannot be established, if the
    /// lock query fails, if a pre-migration check fails, or if the migration
    /// itself fails.
    pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
        warn!("This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. Do not use it, except for TESTING.");
        if !self.experimental_accepted {
            error!("Please agree that you can only use this tool for testing.");
            return Ok(ExitCode::FAILURE);
        }

        // NOTE(review): `self.subcommand` is not consulted here, so `check` and
        // `migrate` both run the full migration — confirm this is intended.

        // TODO allow configuring the synapse database location
        // A connection failure is an ordinary runtime condition, so report it
        // as an error instead of panicking.
        let mut syn_conn = PgConnection::connect("postgres:///fakesyn")
            .await
            .context("could not connect to the Synapse database")?;

        let config = DatabaseConfig::extract_or_default(figment)?;

        let mut mas_connection = database_connection_from_config(&config).await?;

        // `Either::Right` means the lock is already held elsewhere.
        let Either::Left(mut mas_connection) = LockedMasDatabase::try_new(&mut mas_connection)
            .await
            .context("failed to issue query to lock database")?
        else {
            error!("Failed to acquire syn2mas lock on the database.");
            error!("This likely means that another syn2mas instance is already running!");
            return Ok(ExitCode::FAILURE);
        };

        syn2mas::mas_pre_migration_checks(&mut mas_connection).await?;
        syn2mas::synapse_pre_migration_checks(&mut syn_conn).await?;

        let mut reader = SynapseReader::new(&mut syn_conn, true).await?;

        // Dedicated connections for the writer, separate from the locked one.
        let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS);
        for _ in 0..NUM_WRITER_CONNECTIONS {
            writer_mas_connections.push(database_connection_from_config(&config).await?);
        }
        let mut writer = MasWriter::new(mas_connection, writer_mas_connections).await?;

        // TODO is this rng ok?
        #[allow(clippy::disallowed_methods)]
        let mut rng = thread_rng();

        // TODO progress reporting
        // TODO allow configuring the server name
        syn2mas::migrate(&mut reader, &mut writer, "matrix.org", &mut rng).await?;

        reader.finish().await?;
        writer.finish().await?;

        Ok(ExitCode::SUCCESS)
    }
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas_restore_indices (name, table_name, definition)\n VALUES ($1, $2, $3)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"Text"
]
},
"nullable": []
},
"hash": "07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752"
}

View File

@@ -0,0 +1,34 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n FROM pg_constraint c\n JOIN pg_namespace n ON n.oid = c.connamespace\n WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1\n AND n.nspname = current_schema;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "table_name!",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "name!",
"type_info": "Name"
},
{
"ordinal": 2,
"name": "definition!",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
null,
false,
null
]
},
"hash": "12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca"
}

View File

@@ -0,0 +1,34 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT indexname AS \"name!\", indexdef AS \"definition!\", schemaname AS \"table_name!\"\n FROM pg_indexes\n WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "name!",
"type_info": "Name"
},
{
"ordinal": 1,
"name": "definition!",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "table_name!",
"type_info": "Name"
}
],
"parameters": {
"Left": [
"Name"
]
},
"nullable": [
true,
true,
true
]
},
"hash": "486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0"
}

View File

@@ -0,0 +1,34 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n FROM pg_constraint c\n JOIN pg_namespace n ON n.oid = c.connamespace\n WHERE contype = 'f' AND confrelid::regclass::text = $1\n AND n.nspname = current_schema;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "table_name!",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "name!",
"type_info": "Name"
},
{
"ordinal": 2,
"name": "definition!",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
null,
false,
null
]
},
"hash": "5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas_restore_constraints (name, table_name, definition)\n VALUES ($1, $2, $3)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"Text"
]
},
"nullable": []
},
"hash": "69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93"
}

View File

@@ -0,0 +1,32 @@
{
"db_name": "PostgreSQL",
"query": "SELECT table_name, name, definition FROM syn2mas_restore_constraints ORDER BY order_key",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "table_name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "definition",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false
]
},
"hash": "78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84"
}

View File

@@ -0,0 +1,32 @@
{
"db_name": "PostgreSQL",
"query": "SELECT table_name, name, definition FROM syn2mas_restore_indices ORDER BY order_key",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "table_name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "definition",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false
]
},
"hash": "979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema\n AND tablename = ANY($1)\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "_dummy",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"NameArray"
]
},
"nullable": [
null
]
},
"hash": "b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76"
}

View File

@@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__user_passwords\n (user_password_id, user_id, hashed_password, created_at, version)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"UuidArray",
"TextArray",
"TimestamptzArray",
"Int4Array"
]
},
"nullable": []
},
"hash": "c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425"
}

View File

@@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__users\n (user_id, username, created_at, locked_at, can_request_admin)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"TextArray",
"TimestamptzArray",
"TimestamptzArray",
"BoolArray"
]
},
"nullable": []
},
"hash": "c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f"
}

27
crates/syn2mas/Cargo.toml Normal file
View File

@@ -0,0 +1,27 @@
[package]
name = "syn2mas"
version.workspace = true
license.workspace = true
authors.workspace = true
edition.workspace = true
homepage.workspace = true
repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
thiserror.workspace = true
thiserror-ext.workspace = true
tokio.workspace = true
sqlx.workspace = true
chrono.workspace = true
compact_str.workspace = true
tracing.workspace = true
futures-util = "0.3.30"
rand.workspace = true
uuid = "1.10.0"
ulid = { workspace = true, features = ["uuid"] }
[lints]
workspace = true

View File

@@ -0,0 +1,30 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! # Checks
//!
//! This module provides safety checks to run against a Synapse database before running the Synapse-to-MAS migration.
use sqlx::PgConnection;
use thiserror::Error;
use crate::mas_writer;
/// Errors that the Synapse pre-migration checks can report.
#[derive(Debug, Error)]
pub enum Error {
/// A problem with the MAS database, wrapping the MAS writer's check error.
#[error("problem with MAS database: {0}")]
MasDatabase(#[source] mas_writer::checks::Error),
/// A database query failed; converted automatically from `sqlx::Error`.
#[error("query failed: {0}")]
Sqlx(#[from] sqlx::Error),
}
/// Runs safety checks against the Synapse database before migrating.
///
/// This is currently a placeholder: it performs no checks, does not use
/// `synapse_connection`, and always returns `Ok(())`.
///
/// # Errors
///
/// None at present.
#[tracing::instrument(skip_all)]
pub async fn synapse_pre_migration_checks(
synapse_connection: &mut PgConnection,
) -> Result<(), Error> {
// TODO check that the database looks like a Synapse database and is sane for migration
Ok(())
}

16
crates/syn2mas/src/lib.rs Normal file
View File

@@ -0,0 +1,16 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
mod mas_writer;
mod synapse_reader;
mod checks;
mod migration;
pub use self::checks::synapse_pre_migration_checks;
pub use self::mas_writer::locking::LockedMasDatabase;
pub use self::mas_writer::{checks::mas_pre_migration_checks, MasWriter};
pub use self::migration::migrate;
pub use self::synapse_reader::SynapseReader;

View File

@@ -0,0 +1,74 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! # MAS Database Checks
//!
//! This module provides safety checks to run against a MAS database before running the Synapse-to-MAS migration.
use thiserror::Error;
use thiserror_ext::ContextInto;
use super::{is_syn2mas_in_progress, locking::LockedMasDatabase, MAS_TABLES_AFFECTED_BY_MIGRATION};
/// Errors that the MAS pre-migration checks can report.
#[derive(Debug, Error, ContextInto)]
pub enum Error {
/// A row was found in `table`, so the MAS database is not empty.
#[error("the MAS database is not empty: rows found in at least `{table}`")]
MasDatabaseNotEmpty { table: &'static str },
/// Probing `table` failed, which may mean this is not a MAS database.
#[error("query against {table} failed — is this actually a MAS database?")]
MaybeNotMas {
#[source]
source: sqlx::Error,
table: &'static str,
},
/// Any other database error, passed through unchanged.
#[error(transparent)]
Sqlx(#[from] sqlx::Error),
/// The check for an in-progress syn2mas run failed.
#[error("unable to check if syn2mas is already in progress")]
UnableToCheckInProgress(#[source] super::Error),
}
/// Check that a MAS database is ready for being migrated to.
///
/// Concretely, this checks that the database is empty.
///
/// If syn2mas is already in progress on this database, the checks are skipped.
///
/// # Errors
///
/// Errors are returned under the following circumstances:
///
/// - If any database access error occurs.
/// - If any MAS tables involved in the migration are not empty.
/// - If we can't check whether syn2mas is already in progress on this database or not.
#[tracing::instrument(skip_all)]
pub async fn mas_pre_migration_checks<'a>(
mas_connection: &mut LockedMasDatabase<'a>,
) -> Result<(), Error> {
if is_syn2mas_in_progress(mas_connection.as_mut())
.await
.map_err(Error::UnableToCheckInProgress)?
{
// syn2mas already in progress, so we already performed the checks
return Ok(());
}
// Check that the database looks like a MAS database and that it is also an empty database.
for &table in MAS_TABLES_AFFECTED_BY_MIGRATION {
let row_present = sqlx::query(&format!("SELECT 1 AS dummy FROM {table} LIMIT 1"))
.fetch_optional(mas_connection.as_mut())
.await
.into_maybe_not_mas(table)?
.is_some();
if row_present {
return Err(Error::MasDatabaseNotEmpty { table });
}
}
Ok(())
}

View File

@@ -0,0 +1,151 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use sqlx::PgConnection;
use tracing::debug;
use super::{Error, IntoDatabase};
/// Description of a constraint, which allows recreating it later.
pub struct ConstraintDescription {
/// Name of the constraint.
pub name: String,
/// Table the constraint is defined on.
pub table_name: String,
/// Constraint definition text, as appended after `ADD CONSTRAINT <name>` when restoring.
pub definition: String,
}
/// Description of an index, which allows recreating it later.
pub struct IndexDescription {
/// Name of the index.
pub name: String,
/// Name associated with the index; used only in error messages when restoring.
pub table_name: String,
/// Full statement that recreates the index; executed as-is when restoring.
pub definition: String,
}
/// Look up and return the definitions of the constraints on a given table.
///
/// Only constraints with `contype` of `'f'`, `'p'` or `'u'` in the current
/// schema are returned.
///
/// # Errors
///
/// Returns a database error if the catalog query fails.
pub async fn describe_constraints_on_table(
conn: &mut PgConnection,
table_name: &str,
) -> Result<Vec<ConstraintDescription>, Error> {
// The query text must stay in sync with the sqlx offline query cache.
sqlx::query_as!(
ConstraintDescription,
r#"
SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
FROM pg_constraint c
JOIN pg_namespace n ON n.oid = c.connamespace
WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1
AND n.nspname = current_schema;
"#,
table_name
).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read constraint definitions of {table_name}"))
}
/// Look up and return the definitions of foreign-key constraints whose
/// target table is the one specified.
///
/// The returned `table_name` is the table the constraint is defined on
/// (`conrelid`), not the target table (`confrelid`).
///
/// # Errors
///
/// Returns a database error if the catalog query fails.
pub async fn describe_foreign_key_constraints_to_table(
conn: &mut PgConnection,
target_table_name: &str,
) -> Result<Vec<ConstraintDescription>, Error> {
// The query text must stay in sync with the sqlx offline query cache.
sqlx::query_as!(
ConstraintDescription,
r#"
SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
FROM pg_constraint c
JOIN pg_namespace n ON n.oid = c.connamespace
WHERE contype = 'f' AND confrelid::regclass::text = $1
AND n.nspname = current_schema;
"#,
target_table_name
).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read FK constraint definitions targetting {target_table_name}"))
}
/// Look up and return the definitions of all indices on a given table.
///
/// Only indices in the current schema are returned.
///
/// NOTE(review): the query selects `schemaname AS "table_name!"`, so the
/// `table_name` field of each result holds the schema name rather than the
/// table name. This looks unintended (`tablename` was probably meant). Fixing
/// it changes the query text, so the sqlx offline query cache must be
/// regenerated at the same time.
///
/// # Errors
///
/// Returns a database error if the catalog query fails.
pub async fn describe_indices_on_table(
conn: &mut PgConnection,
table_name: &str,
) -> Result<Vec<IndexDescription>, Error> {
sqlx::query_as!(
IndexDescription,
r#"
SELECT indexname AS "name!", indexdef AS "definition!", schemaname AS "table_name!"
FROM pg_indexes
WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL
"#,
table_name
).fetch_all(&mut *conn).await.into_database("cannot search for indices")
}
/// Drops a constraint from the database.
///
/// The constraint must exist prior to this call.
pub async fn drop_constraint(
conn: &mut PgConnection,
constraint: &ConstraintDescription,
) -> Result<(), Error> {
let name = &constraint.name;
let table_name = &constraint.table_name;
debug!("dropping constraint {name} on table {table_name}");
sqlx::query(&format!("ALTER TABLE {table_name} DROP CONSTRAINT {name};"))
.execute(&mut *conn)
.await
.into_database_with(|| format!("failed to drop constraint {name} on {table_name}"))?;
Ok(())
}
/// Drops an index from the database.
///
/// The index must exist prior to this call.
pub async fn drop_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
let index_name = &index.name;
debug!("dropping index {index_name}");
sqlx::query(&format!("DROP INDEX {index_name};"))
.execute(&mut *conn)
.await
.into_database_with(|| format!("failed to temporarily drop {index_name}"))?;
Ok(())
}
/// Recreates a constraint from its saved description.
///
/// The constraint is expected not to exist when this is called.
///
/// # Errors
///
/// Returns a database error if the `ALTER TABLE` statement fails.
pub async fn restore_constraint(
    conn: &mut PgConnection,
    constraint: &ConstraintDescription,
) -> Result<(), Error> {
    let name = &constraint.name;
    let table_name = &constraint.table_name;
    let definition = &constraint.definition;

    let statement = format!("ALTER TABLE {table_name} ADD CONSTRAINT {name} {definition};");
    sqlx::query(&statement)
        .execute(conn)
        .await
        .into_database_with(|| {
            format!("failed to recreate constraint {name} on {table_name} with {definition}")
        })?;

    Ok(())
}
/// Recreates an index from its saved description.
///
/// The saved definition is executed as a complete statement. The index is
/// expected not to exist when this is called.
///
/// # Errors
///
/// Returns a database error if executing the definition fails.
pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
    let name = &index.name;
    let table_name = &index.table_name;
    let definition = &index.definition;

    let statement = format!("{definition};");
    sqlx::query(&statement)
        .execute(conn)
        .await
        .into_database_with(|| {
            format!("failed to recreate index {name} on {table_name} with {definition}")
        })?;

    Ok(())
}

View File

@@ -0,0 +1,58 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use std::sync::LazyLock;
use sqlx::{
postgres::{PgAdvisoryLock, PgAdvisoryLockGuard},
Either, PgConnection,
};
/// Advisory lock used by syn2mas, created from the key string `syn2mas-maswriter`.
///
/// Constructed lazily on first use.
static SYN2MAS_ADVISORY_LOCK: LazyLock<PgAdvisoryLock> =
LazyLock::new(|| PgAdvisoryLock::new("syn2mas-maswriter"));
/// A wrapper around a Postgres connection which holds a session-wide advisory lock
/// preventing concurrent access by other syn2mas instances.
pub struct LockedMasDatabase<'conn> {
// Guard for `SYN2MAS_ADVISORY_LOCK`, wrapping the borrowed connection.
inner: PgAdvisoryLockGuard<'static, &'conn mut PgConnection>,
}
impl<'conn> LockedMasDatabase<'conn> {
    /// Tries to take the syn2mas advisory lock on the given MAS connection.
    ///
    /// Returns `Either::Left` with a `LockedMasDatabase` when the lock was acquired.
    /// Otherwise the connection is handed back to the caller in `Either::Right`.
    ///
    /// # Errors
    ///
    /// Errors are returned for underlying database errors.
    pub async fn try_new(
        mas_connection: &'conn mut PgConnection,
    ) -> Result<Either<Self, &'conn mut PgConnection>, sqlx::Error> {
        let outcome = SYN2MAS_ADVISORY_LOCK.try_acquire(mas_connection).await?;

        Ok(match outcome {
            Either::Left(inner) => Either::Left(LockedMasDatabase { inner }),
            Either::Right(unlocked) => Either::Right(unlocked),
        })
    }

    /// Gives up the advisory lock and returns the underlying connection.
    ///
    /// # Errors
    ///
    /// Errors are returned for underlying database errors.
    pub async fn unlock(self) -> Result<&'conn mut PgConnection, sqlx::Error> {
        let Self { inner } = self;
        inner.release_now().await
    }
}
/// Gives mutable access to the underlying connection while the lock is held.
impl AsMut<PgConnection> for LockedMasDatabase<'_> {
fn as_mut(&mut self) -> &mut PgConnection {
// Delegates to the lock guard's own `as_mut`.
self.inner.as_mut()
}
}

View File

@@ -0,0 +1,671 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! # MAS Writer
//!
//! This module is responsible for writing new records to MAS' database.
use std::fmt::Display;
use chrono::{DateTime, Utc};
use futures_util::{future::BoxFuture, TryStreamExt};
use sqlx::{query, query_as, Executor, PgConnection};
use thiserror::Error;
use thiserror_ext::{Construct, ContextInto};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{error, info, warn, Level};
use uuid::Uuid;
use self::{
constraint_pausing::{ConstraintDescription, IndexDescription},
locking::LockedMasDatabase,
};
pub mod checks;
pub mod locking;
mod constraint_pausing;
/// Errors that the MAS writer can report.
#[derive(Debug, Error, Construct, ContextInto)]
pub enum Error {
/// A database operation failed; `context` describes what was being attempted.
#[error("database error whilst {context}")]
Database {
#[source]
source: sqlx::Error,
context: String,
},
/// The writer connection pool handed out an error instead of a connection.
#[error("writer connection pool shut down due to error")]
#[allow(clippy::enum_variant_names)]
WriterConnectionPoolError,
/// The database is in an unexpected state; the string explains how.
#[error("inconsistent database: {0}")]
Inconsistent(String),
/// Several errors collected together.
#[error("{0}")]
Multiple(MultipleErrors),
}
/// A collection of errors, displayed as a bulleted list.
#[derive(Debug)]
pub struct MultipleErrors {
// The collected errors, in the order they were supplied.
errors: Vec<Error>,
}
impl Display for MultipleErrors {
    /// Writes the header `multiple errors`, then each error on its own line
    /// prefixed with `- `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("multiple errors")?;
        self.errors
            .iter()
            .try_for_each(|error| write!(f, "\n- {error}"))
    }
}
impl From<Vec<Error>> for MultipleErrors {
fn from(value: Vec<Error>) -> Self {
MultipleErrors { errors: value }
}
}
/// A pool of writer connections, circulated through a bounded channel.
///
/// Each channel item is either a connection or the error produced by the task
/// that last used it.
struct WriterConnectionPool {
/// How many connections are in circulation
num_connections: usize,
/// A receiver handle to get a writer connection
/// The writer connection will be mid-transaction!
connection_rx: Receiver<Result<PgConnection, Error>>,
/// A sender handle to return a writer connection to the pool
/// The connection should still be mid-transaction!
connection_tx: Sender<Result<PgConnection, Error>>,
}
impl WriterConnectionPool {
/// Builds a pool from already-open connections.
///
/// The channel capacity equals the number of connections, and every
/// connection is queued immediately as `Ok`.
///
/// NOTE(review): tokio's bounded `mpsc::channel` rejects a capacity of zero,
/// so an empty `connections` presumably panics here — confirm.
pub fn new(connections: Vec<PgConnection>) -> Self {
let num_connections = connections.len();
let (connection_tx, connection_rx) = mpsc::channel(num_connections);
for connection in connections {
connection_tx
.try_send(Ok(connection))
.expect("there should be room for this connection");
}
WriterConnectionPool {
num_connections,
connection_rx,
connection_tx,
}
}
/// Waits for a pooled connection and spawns a Tokio task that runs `task` with it.
///
/// This returns as soon as the task has been spawned; it does not wait for
/// the task to complete. When the task succeeds, the connection is sent back
/// to the pool. When it fails, the error is logged and sent back to the pool
/// in place of the connection.
///
/// # Errors
///
/// Returns `Error::WriterConnectionPoolError` if the item received from the
/// pool is an error left by an earlier task. That error is put back into the
/// pool first.
pub async fn spawn_with_connection<F>(&mut self, task: F) -> Result<(), Error>
where
F: for<'conn> FnOnce(&'conn mut PgConnection) -> BoxFuture<'conn, Result<(), Error>>
+ Send
+ Sync
+ 'static,
{
match self.connection_rx.recv().await {
Some(Ok(mut connection)) => {
// The spawned task needs its own sender to hand the connection back.
let connection_tx = self.connection_tx.clone();
tokio::task::spawn(async move {
let to_return = match task(&mut connection).await {
Ok(()) => Ok(connection),
Err(error) => {
error!("error in writer: {error}");
Err(error)
}
};
// This should always succeed in sending unless we're already shutting
// down for some other reason.
let _: Result<_, _> = connection_tx.send(to_return).await;
});
Ok(())
}
Some(Err(error)) => {
// This should always succeed in sending unless we're already shutting
// down for some other reason.
let _: Result<_, _> = self.connection_tx.send(Err(error)).await;
Err(Error::WriterConnectionPoolError)
}
None => {
unreachable!("we still hold a reference to the sender, so this shouldn't happen")
}
}
}
/// Finishes writing to the database, committing all changes.
///
/// Every item is drained from the pool: connections get a `COMMIT;`, and
/// errors are collected.
///
/// # Errors
///
/// - If any errors were returned to the pool.
/// - If committing the changes failed.
///
/// # Panics
///
/// - If connections were not returned to the pool. (This indicates a serious bug.)
pub async fn finish(self) -> Result<(), Vec<Error>> {
let mut errors = Vec::new();
let Self {
num_connections,
mut connection_rx,
connection_tx,
} = self;
// Drop the sender handle so we gracefully allow the receiver to close
drop(connection_tx);
let mut finished_connections = 0;
// `recv` yields `None` once every sender (including task clones) is dropped.
while let Some(connection_or_error) = connection_rx.recv().await {
finished_connections += 1;
match connection_or_error {
Ok(mut connection) => {
if let Err(err) = query("COMMIT;").execute(&mut connection).await {
errors.push(err.into_database("commit writer transaction"));
}
}
Err(error) => {
errors.push(error);
}
}
}
assert_eq!(finished_connections, num_connections, "syn2mas had a bug: connections went missing {finished_connections} != {num_connections}");
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}
}
/// Writes migrated records to the MAS database.
pub struct MasWriter<'c> {
// Main connection, holding the syn2mas advisory lock.
conn: LockedMasDatabase<'c>,
// Pool of additional connections used for writing.
writer_pool: WriterConnectionPool,
// Descriptions of indices to be recreated later.
indices_to_restore: Vec<IndexDescription>,
// Descriptions of constraints to be recreated later.
constraints_to_restore: Vec<ConstraintDescription>,
}
/// A new user record to be written to MAS.
// NOTE(review): the fields appear to mirror the columns inserted into
// `syn2mas__users` — confirm against the insert query.
pub struct MasNewUser {
pub user_id: Uuid,
pub username: String,
pub created_at: DateTime<Utc>,
/// `None` when the user is not locked — presumably; confirm against the writer.
pub locked_at: Option<DateTime<Utc>>,
pub can_request_admin: bool,
}
/// A new user password record to be written to MAS.
// NOTE(review): the fields appear to mirror columns inserted into
// `syn2mas__user_passwords` — confirm against the insert query.
pub struct MasNewUserPassword {
pub user_password_id: Uuid,
/// ID of the user this password belongs to — presumably a `MasNewUser::user_id`; confirm.
pub user_id: Uuid,
pub hashed_password: String,
pub created_at: DateTime<Utc>,
}
/// List of all MAS tables that are written to by syn2mas.
///
/// The pre-migration checks require each of these tables to be empty. When a
/// partial migration is resumed, the `syn2mas__`-prefixed form of each name is
/// truncated.
pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &["users", "user_passwords"];
/// Detect whether a syn2mas migration has started on the given database.
///
/// Concretely, this checks for the presence of syn2mas restoration tables
/// (`syn2mas_restore_constraints` and `syn2mas_restore_indices`) in the
/// current schema.
///
/// Returns `true` if syn2mas has started, or `false` if it hasn't.
///
/// # Errors
///
/// Errors are returned under the following circumstances:
///
/// - If any database error occurs whilst querying the database.
/// - If some, but not all, syn2mas restoration tables are present.
/// (This shouldn't be possible without syn2mas having been sabotaged!)
pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result<bool, Error> {
// Names of tables used for syn2mas resumption
// Must be `String`s, not just `&str`, for the query.
let restore_table_names = vec![
"syn2mas_restore_constraints".to_owned(),
"syn2mas_restore_indices".to_owned(),
];
// One row is returned per restoration table that exists, so the row count
// is the number of tables found.
let num_resumption_tables = query!(
r#"
SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema
AND tablename = ANY($1)
"#,
&restore_table_names,
)
.fetch_all(conn.as_mut())
.await
.into_database("failed to query count of resumption tables")?
.len();
// None present: not started. All present: started. Anything else is inconsistent.
if num_resumption_tables == 0 {
Ok(false)
} else if num_resumption_tables == restore_table_names.len() {
Ok(true)
} else {
Err(Error::inconsistent(
"some, but not all, syn2mas resumption tables were found",
))
}
}
impl<'conn> MasWriter<'conn> {
/// Creates a new MAS writer.
///
/// If a previous syn2mas run was interrupted, the partially-written tables are truncated and
/// the saved index and constraint definitions are loaded back from the restore tables.
/// Otherwise the temporary tables are created, the indices and constraints of the affected
/// tables are dropped, and their definitions are saved so that they can be restored later.
///
/// Once the schema changes are committed, a transaction is opened on every writer connection.
///
/// # Errors
///
/// Errors are returned in the following conditions:
///
/// - If the database connection experiences an error.
#[allow(clippy::missing_panics_doc)] // not real
#[tracing::instrument(skip_all)]
pub async fn new(
    mut conn: LockedMasDatabase<'conn>,
    mut writer_connections: Vec<PgConnection>,
) -> Result<Self, Error> {
    // Given that we don't have any concurrent transactions here,
    // the READ COMMITTED isolation level is sufficient.
    query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;")
        .execute(conn.as_mut())
        .await
        .into_database("begin MAS transaction")?;

    let syn2mas_started = is_syn2mas_in_progress(conn.as_mut()).await?;

    let indices_to_restore;
    let constraints_to_restore;

    if syn2mas_started {
        // We are resuming from a partially-done syn2mas migration
        // We should reset the database so that we're starting from scratch.
        warn!("Partial syn2mas migration has already been done; resetting.");
        for table in MAS_TABLES_AFFECTED_BY_MIGRATION {
            query(&format!("TRUNCATE syn2mas__{table};"))
                .execute(conn.as_mut())
                .await
                .into_database_with(|| format!("failed to truncate table syn2mas__{table}"))?;
        }

        // The definitions were saved in drop order; `order_key` preserves that order.
        indices_to_restore = query_as!(
            IndexDescription,
            "SELECT table_name, name, definition FROM syn2mas_restore_indices ORDER BY order_key"
        )
        .fetch_all(conn.as_mut())
        .await
        .into_database("failed to get syn2mas restore data (index descriptions)")?;
        constraints_to_restore = query_as!(
            ConstraintDescription,
            "SELECT table_name, name, definition FROM syn2mas_restore_constraints ORDER BY order_key"
        )
        .fetch_all(conn.as_mut())
        .await
        .into_database("failed to get syn2mas restore data (constraint descriptions)")?;
    } else {
        info!("Starting new syn2mas migration");

        conn.as_mut()
            .execute_many(include_str!("syn2mas_temporary_tables.sql"))
            // We don't care about any query results
            .try_collect::<Vec<_>>()
            .await
            .into_database("could not create temporary tables")?;

        // Pause (temporarily drop) indices and constraints in order to improve
        // performance of bulk data loading.
        (indices_to_restore, constraints_to_restore) =
            Self::pause_indices(conn.as_mut()).await?;

        // Persist these index and constraint definitions.
        for IndexDescription {
            name,
            table_name,
            definition,
        } in &indices_to_restore
        {
            query!(
                r#"
INSERT INTO syn2mas_restore_indices (name, table_name, definition)
VALUES ($1, $2, $3)
"#,
                name,
                table_name,
                definition
            )
            .execute(conn.as_mut())
            .await
            .into_database("failed to save restore data (index)")?;
        }
        for ConstraintDescription {
            name,
            table_name,
            definition,
        } in &constraints_to_restore
        {
            query!(
                r#"
INSERT INTO syn2mas_restore_constraints (name, table_name, definition)
VALUES ($1, $2, $3)
"#,
                name,
                table_name,
                definition
            )
            .execute(conn.as_mut())
            .await
            .into_database("failed to save restore data (constraint)")?;
        }
    }

    query("COMMIT;")
        .execute(conn.as_mut())
        .await
        .into_database("commit MAS transaction")?;

    // Now after all the schema changes have been done, begin writer transactions
    for writer_connection in &mut writer_connections {
        query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;")
            .execute(&mut *writer_connection)
            .await
            .into_database("begin MAS writer transaction")?;
    }

    Ok(Self {
        conn,
        writer_pool: WriterConnectionPool::new(writer_connections),
        indices_to_restore,
        constraints_to_restore,
    })
}
/// Drops the constraints and indices of every table in `MAS_TABLES_AFFECTED_BY_MIGRATION`
/// and returns their descriptions, in the order in which they were dropped.
#[tracing::instrument(skip_all)]
async fn pause_indices(
    conn: &mut PgConnection,
) -> Result<(Vec<IndexDescription>, Vec<ConstraintDescription>), Error> {
    let mut dropped_indices = Vec::new();
    let mut dropped_constraints = Vec::new();

    for unprefixed_table in MAS_TABLES_AFFECTED_BY_MIGRATION.iter().copied() {
        let table = format!("syn2mas__{unprefixed_table}");

        // Foreign keys pointing at this table have to go first.
        let incoming_foreign_keys =
            constraint_pausing::describe_foreign_key_constraints_to_table(&mut *conn, &table)
                .await?;
        for foreign_key in incoming_foreign_keys {
            constraint_pausing::drop_constraint(&mut *conn, &foreign_key).await?;
            dropped_constraints.push(foreign_key);
        }

        // With the incoming foreign keys gone, the table's own constraints can be dropped.
        let own_constraints =
            constraint_pausing::describe_constraints_on_table(&mut *conn, &table).await?;
        for own_constraint in own_constraints {
            constraint_pausing::drop_constraint(&mut *conn, &own_constraint).await?;
            dropped_constraints.push(own_constraint);
        }

        // Indices come last, once no constraint is left on the table.
        let table_indices =
            constraint_pausing::describe_indices_on_table(&mut *conn, &table).await?;
        for table_index in table_indices {
            constraint_pausing::drop_index(&mut *conn, &table_index).await?;
            dropped_indices.push(table_index);
        }
    }

    Ok((dropped_indices, dropped_constraints))
}
/// Recreates the given indices and then the given constraints,
/// each walked in the reverse of the order in which they are listed.
async fn restore_indices(
    conn: &mut LockedMasDatabase<'_>,
    indices_to_restore: &[IndexDescription],
    constraints_to_restore: &[ConstraintDescription],
) -> Result<(), Error> {
    // Indices are restored before constraints, because the constraints need them.
    // As far as is known, the relative order of the indices themselves does not matter.
    for index_description in indices_to_restore.iter().rev() {
        constraint_pausing::restore_index(conn.as_mut(), index_description).await?;
    }

    // Constraints are restored in reverse drop order,
    // since some constraints may rely on other constraints to work.
    for constraint_description in constraints_to_restore.iter().rev() {
        constraint_pausing::restore_constraint(conn.as_mut(), constraint_description).await?;
    }

    Ok(())
}
/// Finish writing to the MAS database, flushing and committing all changes.
///
/// # Errors
///
/// Errors are returned in the following conditions:
///
/// - If the database connection experiences an error.
#[tracing::instrument(skip_all)]
pub async fn finish(mut self) -> Result<(), Error> {
// Commit all writer transactions to the database.
self.writer_pool
.finish()
.await
.map_err(|errors| Error::Multiple(MultipleErrors::from(errors)))?;
// Now all the data has been migrated, finish off by restoring indices and constraints!
query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;")
.execute(self.conn.as_mut())
.await
.into_database("begin MAS transaction")?;
Self::restore_indices(
&mut self.conn,
&self.indices_to_restore,
&self.constraints_to_restore,
)
.await?;
self.conn
.as_mut()
.execute_many(include_str!("syn2mas_revert_temporary_tables.sql"))
// We don't care about any query results
.try_collect::<Vec<_>>()
.await
.into_database("could not revert temporary tables")?;
query("COMMIT;")
.execute(self.conn.as_mut())
.await
.into_database("ending MAS transaction")?;
self.conn
.unlock()
.await
.into_database("could not unlock MAS database")?;
Ok(())
}
/// Write a batch of users to the database.
///
/// The batch is handed to the writer pool and inserted into `syn2mas__users`
/// with a single statement. An empty batch results in no statement being executed.
///
/// # Errors
///
/// Errors are returned in the following conditions:
///
/// - If the database writer connection pool had an error.
#[allow(clippy::missing_panics_doc)] // not a real panic
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub async fn write_users(&mut self, users: Vec<MasNewUser>) -> Result<(), Error> {
    self.writer_pool
        .spawn_with_connection(move |conn| {
            Box::pin(async move {
                // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement
                // without having to change the statement SQL thus altering the query plan.
                // See <https://github.com/launchbadge/sqlx/blob/main/FAQ.md#how-can-i-bind-an-array-to-a-values-clause-how-can-i-do-bulk-inserts>.
                // In the future we could consider using sqlx's support for `PgCopyIn` / the `COPY FROM STDIN` statement,
                // which is allegedly the best for insert performance, but is less simple to encode.
                if users.is_empty() {
                    return Ok(());
                }

                // Split the rows into one array per column.
                let row_count = users.len();
                let mut user_ids: Vec<Uuid> = Vec::with_capacity(row_count);
                let mut usernames: Vec<String> = Vec::with_capacity(row_count);
                let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(row_count);
                let mut locked_ats: Vec<Option<DateTime<Utc>>> = Vec::with_capacity(row_count);
                let mut can_request_admins: Vec<bool> = Vec::with_capacity(row_count);
                for user in users {
                    user_ids.push(user.user_id);
                    usernames.push(user.username);
                    created_ats.push(user.created_at);
                    locked_ats.push(user.locked_at);
                    can_request_admins.push(user.can_request_admin);
                }

                sqlx::query!(
                    r#"
INSERT INTO syn2mas__users
(user_id, username, created_at, locked_at, can_request_admin)
SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[])
"#,
                    &user_ids[..],
                    &usernames[..],
                    &created_ats[..],
                    // We need to override the typing for arrays of optionals (sqlx limitation)
                    &locked_ats[..] as &[Option<DateTime<Utc>>],
                    &can_request_admins[..],
                )
                .execute(&mut *conn)
                .await
                .into_database("writing users to MAS")?;

                Ok(())
            })
        })
        .await
}
/// Write a batch of user passwords to the database.
///
/// The batch is handed to the writer pool and inserted into `syn2mas__user_passwords`
/// with a single statement. An empty batch results in no statement being executed.
///
/// # Errors
///
/// Errors are returned in the following conditions:
///
/// - If the database writer connection pool had an error.
#[allow(clippy::missing_panics_doc)] // not a real panic
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub async fn write_passwords(
    &mut self,
    passwords: Vec<MasNewUserPassword>,
) -> Result<(), Error> {
    self.writer_pool
        .spawn_with_connection(move |conn| {
            Box::pin(async move {
                if passwords.is_empty() {
                    return Ok(());
                }

                // Split the rows into one array per column, for the `UNNEST` bulk insert.
                let mut user_password_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
                let mut user_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
                let mut hashed_passwords: Vec<String> = Vec::with_capacity(passwords.len());
                let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(passwords.len());
                let mut versions: Vec<i32> = Vec::with_capacity(passwords.len());
                for MasNewUserPassword {
                    user_password_id,
                    user_id,
                    hashed_password,
                    created_at,
                } in passwords
                {
                    user_password_ids.push(user_password_id);
                    user_ids.push(user_id);
                    hashed_passwords.push(hashed_password);
                    created_ats.push(created_at);
                    // TODO hardcoding version to `1` may not be correct long-term?
                    versions.push(1);
                }

                sqlx::query!(
                    r#"
INSERT INTO syn2mas__user_passwords
(user_password_id, user_id, hashed_password, created_at, version)
SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[])
"#,
                    &user_password_ids[..],
                    &user_ids[..],
                    &hashed_passwords[..],
                    &created_ats[..],
                    &versions[..],
                )
                .execute(&mut *conn)
                .await
                // Distinct context from `write_users`, so that failures can be told apart.
                .into_database("writing user passwords to MAS")?;

                Ok(())
            })
        })
        .await
}
}
// How many entries to buffer at once, before writing a batch of rows to the database.
// TODO tune: not much difference was seen between 4k and 64k
// (4k: 13.5~14s, 64k: 12.5~13s). Streaming the whole way would be better,
// especially for DB latency, but this is probably fine;
// we also wouldn't be able to stream to two tables at once.
const WRITE_BUFFER_BATCH_SIZE: usize = 4096;
/// Buffers users and their passwords, and writes them to a [`MasWriter`] in batches
/// of `WRITE_BUFFER_BATCH_SIZE` rows.
pub struct MasUserWriteBuffer<'writer, 'conn> {
/// Users that have not been handed to the writer yet.
users: Vec<MasNewUser>,
/// Passwords that have not been handed to the writer yet.
passwords: Vec<MasNewUserPassword>,
/// Writer that receives the batches.
writer: &'writer mut MasWriter<'conn>,
}
impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> {
    /// Creates an empty buffer that writes its batches to `writer`.
    pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
        Self {
            users: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
            passwords: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
            writer,
        }
    }

    /// Flushes the buffered users, then the buffered passwords, and consumes the buffer.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by the underlying writer.
    pub async fn finish(mut self) -> Result<(), Error> {
        self.flush_users().await?;
        self.flush_passwords().await
    }

    /// Hands all buffered users to the writer and empties the buffer.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the underlying writer, if any.
    pub async fn flush_users(&mut self) -> Result<(), Error> {
        // Timing notes from the original author:
        // via copy: 13s
        // not via copy: 14s
        // difference probably gets worse with latency
        let batch = std::mem::take(&mut self.users);
        self.writer.write_users(batch).await?;
        // `take` left an unallocated vector behind; make room for the next batch.
        self.users.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
        Ok(())
    }

    /// Hands all buffered passwords to the writer and empties the buffer.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the underlying writer, if any.
    pub async fn flush_passwords(&mut self) -> Result<(), Error> {
        let batch = std::mem::take(&mut self.passwords);
        self.writer.write_passwords(batch).await?;
        // `take` left an unallocated vector behind; make room for the next batch.
        self.passwords.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
        Ok(())
    }

    /// Buffers a user, flushing the users once a full batch has accumulated.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the underlying writer if a flush was needed and failed.
    pub async fn write_user(&mut self, user: MasNewUser) -> Result<(), Error> {
        self.users.push(user);
        if self.users.len() < WRITE_BUFFER_BATCH_SIZE {
            return Ok(());
        }
        self.flush_users().await
    }

    /// Buffers a password, flushing the passwords once a full batch has accumulated.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the underlying writer if a flush was needed and failed.
    pub async fn write_password(&mut self, password: MasNewUserPassword) -> Result<(), Error> {
        self.passwords.push(password);
        if self.passwords.len() < WRITE_BUFFER_BATCH_SIZE {
            return Ok(());
        }
        self.flush_passwords().await
    }
}
#[cfg(test)]
mod test {
// TODO test me
}

View File

@@ -0,0 +1,12 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
-- This script should revert what `syn2mas_temporary_tables.sql` does.
-- Remove the tables that held the saved constraint and index definitions.
DROP TABLE syn2mas_restore_constraints;
DROP TABLE syn2mas_restore_indices;
-- Give the tables touched by the migration their original names back.
ALTER TABLE syn2mas__users RENAME TO users;
ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords;

View File

@@ -0,0 +1,41 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
-- # syn2mas Temporary Tables
-- This file takes a MAS database and:
--
-- 1. creates temporary tables used by syn2mas for storing restore data
-- 2. renames important tables with the `syn2mas__` prefix, to prevent
-- running MAS instances from having any opportunity to see or modify
-- the partial data in the database, especially whilst it is not protected
-- by constraints.
--
-- All changes in this file must be reverted by `syn2mas_revert_temporary_tables.sql`
-- in the same directory.
-- corresponds to `ConstraintDescription`
CREATE TABLE syn2mas_restore_constraints (
-- synthetic auto-incrementing ID so we can load these in order
order_key SERIAL NOT NULL PRIMARY KEY,
-- table that the constraint belongs to
table_name TEXT NOT NULL,
-- name of the constraint
name TEXT NOT NULL,
-- definition of the constraint, as needed to recreate it
definition TEXT NOT NULL
);
-- corresponds to `IndexDescription`
CREATE TABLE syn2mas_restore_indices (
-- synthetic auto-incrementing ID so we can load these in order
order_key SERIAL NOT NULL PRIMARY KEY,
-- table that the index belongs to
table_name TEXT NOT NULL,
-- name of the index
name TEXT NOT NULL,
-- definition of the index, as needed to recreate it
definition TEXT NOT NULL
);
-- Now we rename all tables that we touch during the migration.
-- The tables renamed here are the ones listed in `MAS_TABLES_AFFECTED_BY_MIGRATION`.
ALTER TABLE users RENAME TO syn2mas__users;
ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords;

View File

@@ -0,0 +1,165 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! # Migration
//!
//! This module provides the high-level logic for performing the Synapse-to-MAS database migration.
//!
//! This module does not implement any of the safety checks that should be run *before* the migration.
use std::{collections::HashMap, pin::pin};
use chrono::{DateTime, Utc};
use compact_str::CompactString;
use futures_util::StreamExt as _;
use rand::RngCore;
use thiserror::Error;
use thiserror_ext::ContextInto;
use ulid::Ulid;
use uuid::Uuid;
use crate::{
mas_writer::{self, MasNewUser, MasNewUserPassword, MasUserWriteBuffer, MasWriter},
synapse_reader::{self, ExtractLocalpartError, FullUserId, SynapseUser},
SynapseReader,
};
/// Errors that can occur whilst performing the migration.
#[derive(Debug, Error, ContextInto)]
pub enum Error {
/// The Synapse reader reported an error; `context` says what was being done.
#[error("error when reading synapse DB ({context}): {source}")]
Synapse {
source: synapse_reader::Error,
context: String,
},
/// The MAS writer reported an error; `context` says what was being done.
#[error("error when writing to MAS DB ({context}): {source}")]
Mas {
source: mas_writer::Error,
context: String,
},
/// The localpart could not be extracted from the user ID `user`.
#[error("failed to extract localpart of {user:?}: {source}")]
ExtractLocalpart {
source: ExtractLocalpartError,
user: FullUserId,
},
}
/// Result of migrating the users.
struct UsersMigrated {
/// Lookup table from user localpart to that user's UUID in MAS.
user_localparts_to_uuid: HashMap<CompactString, Uuid>,
}
/// Performs a migration from Synapse's database to MAS' database.
///
/// The rows are counted first, so that the user migration can be given a size hint.
///
/// # Panics
///
/// - If there are more than `usize::MAX` users
///
/// # Errors
///
/// Errors are returned under the following circumstances:
///
/// - An underlying database access error, either to MAS or to Synapse.
/// - Invalid data in the Synapse database.
pub async fn migrate(
    synapse: &mut SynapseReader<'_>,
    mas: &mut MasWriter<'_>,
    server_name: &str,
    rng: &mut impl RngCore,
) -> Result<(), Error> {
    let counts = synapse.count_rows().await.into_synapse("counting users")?;
    let user_count_hint =
        usize::try_from(counts.users).expect("More than usize::MAX users — wow!");

    migrate_users(synapse, mas, user_count_hint, server_name, rng).await?;

    Ok(())
}
/// Streams the users out of Synapse, transforms them and writes them (and their passwords,
/// if any) to MAS through a write buffer.
///
/// Returns the lookup table from localpart to the MAS user ID of every migrated user.
/// `user_count_hint` is only used to size that lookup table up front.
async fn migrate_users(
    synapse: &mut SynapseReader<'_>,
    mas: &mut MasWriter<'_>,
    user_count_hint: usize,
    server_name: &str,
    rng: &mut impl RngCore,
) -> Result<UsersMigrated, Error> {
    let mut buffer = MasUserWriteBuffer::new(mas);
    let mut synapse_users = pin!(synapse.read_users());
    // TODO is 1:1 capacity enough for a hashmap?
    let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint);

    while let Some(next_user) = synapse_users.next().await {
        let synapse_user = next_user.into_synapse("reading user")?;
        let (mas_user, mas_password) = transform_user(&synapse_user, server_name, rng)?;

        // Remember which MAS user ID the localpart was given.
        let localpart = CompactString::new(&mas_user.username);
        user_localparts_to_uuid.insert(localpart, mas_user.user_id);

        buffer
            .write_user(mas_user)
            .await
            .into_mas("writing user")?;

        if let Some(password) = mas_password {
            buffer
                .write_password(password)
                .await
                .into_mas("writing password")?;
        }
    }

    // Write out whatever is still sitting in the buffer.
    buffer
        .finish()
        .await
        .into_mas("writing users & passwords")?;

    Ok(UsersMigrated {
        user_localparts_to_uuid,
    })
}
fn transform_user(
user: &SynapseUser,
server_name: &str,
rng: &mut impl RngCore,
) -> Result<(MasNewUser, Option<MasNewUserPassword>), Error> {
let username = user
.name
.extract_localpart(server_name)
.into_extract_localpart(user.name.clone())?
.to_owned();
let new_user = MasNewUser {
user_id: Uuid::from(Ulid::from_datetime_with_source(
DateTime::<Utc>::from(user.creation_ts).into(),
rng,
)),
username,
created_at: user.creation_ts.into(),
locked_at: bool::from(user.deactivated).then_some(user.creation_ts.into()),
can_request_admin: bool::from(user.admin),
};
let mas_password = user
.password_hash
.clone()
.map(|password_hash| MasNewUserPassword {
user_password_id: Uuid::from(Ulid::from_datetime_with_source(
DateTime::<Utc>::from(user.creation_ts).into(),
rng,
)),
user_id: new_user.user_id,
hashed_password: password_hash,
created_at: new_user.created_at,
});
Ok((new_user, mas_password))
}

View File

@@ -0,0 +1,265 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! # Synapse Database Reader
//!
//! This module provides facilities for streaming relevant types of database records from a Synapse database.
use chrono::{DateTime, Utc};
use futures_util::{Stream, TryStreamExt};
use sqlx::{query, Acquire, FromRow, PgConnection, Postgres, Row, Transaction, Type};
use thiserror::Error;
use thiserror_ext::ContextInto;
/// Errors that can occur whilst reading from the Synapse database.
#[derive(Debug, Error, ContextInto)]
pub enum Error {
/// A database operation failed; `context` says what was being done.
#[error("database error whilst {context}")]
Database {
#[source]
source: sqlx::Error,
context: String,
},
}
/// A full user ID as read from the Synapse database,
/// expected to be of the form `@localpart:server_name`.
#[derive(Clone, Debug, sqlx::Decode)]
pub struct FullUserId(pub String);
// A `FullUserId` reports the same Postgres type as a plain `String`.
impl Type<Postgres> for FullUserId {
fn type_info() -> <sqlx::Postgres as sqlx::Database>::TypeInfo {
<String as Type<Postgres>>::type_info()
}
}
/// Reasons why the localpart could not be extracted from a [`FullUserId`].
#[derive(Debug, Error)]
pub enum ExtractLocalpartError {
/// The user ID does not begin with `@`.
#[error("user ID does not start with `@` sigil")]
NoAtSigil,
/// The user ID has no `:` between the localpart and the server name.
#[error("user ID does not have a `:` separator")]
NoSeparator,
/// The server name in the user ID is not the expected one.
#[error("wrong server name: expected {expected:?}, got {found:?}")]
WrongServerName { expected: String, found: String },
}
impl FullUserId {
    /// Returns the localpart of the User ID, after checking that the User ID
    /// carries the expected server name.
    ///
    /// # Errors
    ///
    /// Only a handful of basic validity checks are performed: the User ID must start with `@`
    /// and must contain a `:` separator. The User ID grammar is not checked fully.
    ///
    /// An error is also returned if the server name differs from `expected_server_name`.
    pub fn extract_localpart(
        &self,
        expected_server_name: &str,
    ) -> Result<&str, ExtractLocalpartError> {
        let without_sigil = self
            .0
            .strip_prefix('@')
            .ok_or(ExtractLocalpartError::NoAtSigil)?;

        // The first `:` separates the localpart from the server name.
        let (localpart, server_name) = without_sigil
            .split_once(':')
            .ok_or(ExtractLocalpartError::NoSeparator)?;

        if server_name == expected_server_name {
            Ok(localpart)
        } else {
            Err(ExtractLocalpartError::WrongServerName {
                expected: expected_server_name.to_owned(),
                found: server_name.to_owned(),
            })
        }
    }
}
/// A boolean as stored by Synapse.
///
/// Synapse stores booleans as 0 or 1, due to compatibility with old SQLite versions
/// that did not have native boolean support.
#[derive(Copy, Clone, Debug)]
pub struct SynapseBool(bool);

impl<'r> sqlx::Decode<'r, Postgres> for SynapseBool {
    /// Decodes the value as an `i16`; anything other than zero counts as `true`.
    fn decode(
        value: <Postgres as sqlx::Database>::ValueRef<'r>,
    ) -> Result<Self, sqlx::error::BoxDynError> {
        let boolean_int = <i16 as sqlx::Decode<Postgres>>::decode(value)?;
        Ok(SynapseBool(boolean_int != 0))
    }
}

impl sqlx::Type<Postgres> for SynapseBool {
    /// Reports the same Postgres type as `i16`.
    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
        <i16 as sqlx::Type<Postgres>>::type_info()
    }
}

impl From<SynapseBool> for bool {
    fn from(synapse_bool: SynapseBool) -> Self {
        synapse_bool.0
    }
}
/// A timestamp stored as the number of seconds since the Unix epoch.
/// Note that Synapse stores MOST timestamps as numbers of **milliseconds** since the Unix epoch.
/// But some timestamps are still stored in seconds.
#[derive(Copy, Clone, Debug)]
pub struct SecondsTimestamp(DateTime<Utc>);

impl From<SecondsTimestamp> for DateTime<Utc> {
    fn from(SecondsTimestamp(value): SecondsTimestamp) -> Self {
        value
    }
}

impl<'r> sqlx::Decode<'r, Postgres> for SecondsTimestamp {
    /// Decodes the value as an `i64` number of seconds since the Unix epoch.
    ///
    /// A value that cannot be represented as a `DateTime<Utc>` yields a decode error,
    /// rather than overflowing whilst being converted.
    fn decode(
        value: <Postgres as sqlx::Database>::ValueRef<'r>,
    ) -> Result<Self, sqlx::error::BoxDynError> {
        let seconds_since_epoch = <i64 as sqlx::Decode<Postgres>>::decode(value)?;
        DateTime::<Utc>::from_timestamp(seconds_since_epoch, 0)
            .map(SecondsTimestamp)
            .ok_or_else(|| {
                format!("timestamp out of range: {seconds_since_epoch} seconds since the Unix epoch")
                    .into()
            })
    }
}

impl sqlx::Type<Postgres> for SecondsTimestamp {
    /// Reports the same Postgres type as `i64`.
    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
        <i64 as sqlx::Type<Postgres>>::type_info()
    }
}
/// A user as read from the Synapse `users` table.
#[derive(Clone, Debug, FromRow)]
pub struct SynapseUser {
/// Full User ID of the user
pub name: FullUserId,
/// Password hash string for the user. Optional (null if no password is set).
pub password_hash: Option<String>,
/// Whether the user is a Synapse Admin
pub admin: SynapseBool,
/// Whether the user is deactivated
pub deactivated: SynapseBool,
/// When the user was created
pub creation_ts: SecondsTimestamp,
// TODO ...
// TODO is_guest
// TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts)
}
/// List of Synapse tables that we should acquire a lock on.
///
/// Outside of dry runs this is an `EXCLUSIVE` lock; dry runs take an `ACCESS SHARE` lock instead.
///
/// This is a safety measure against other processes changing the data underneath our feet.
/// It's still not a good idea to run Synapse at the same time as the migration.
// TODO not complete!
const TABLES_TO_LOCK: &[&str] = &["users"];
/// Number of migratable rows in various Synapse tables.
/// Used to estimate progress.
#[derive(Clone, Debug)]
pub struct SynapseRowCounts {
/// Number of users that are neither application service users nor guests.
pub users: i64,
}
/// Reader for the Synapse database.
pub struct SynapseReader<'c> {
/// Transaction on the Synapse database in which all reads are performed.
txn: Transaction<'c, Postgres>,
}
impl<'conn> SynapseReader<'conn> {
    /// Create a new Synapse reader, which entails creating a transaction and locking Synapse tables.
    ///
    /// The tables are locked in `EXCLUSIVE` mode, or in `ACCESS SHARE` mode for a dry run.
    /// The locks are requested with `NOWAIT`.
    ///
    /// # Errors
    ///
    /// Errors are returned under the following circumstances:
    ///
    /// - An underlying database error
    /// - If we can't lock the Synapse tables (pointing to the fact that Synapse may still be running)
    pub async fn new(
        synapse_connection: &'conn mut PgConnection,
        dry_run: bool,
    ) -> Result<Self, Error> {
        let mut txn = synapse_connection
            .begin()
            .await
            .into_database("begin transaction")?;

        let set_transaction =
            query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;");
        set_transaction
            .execute(&mut *txn)
            .await
            .into_database("set transaction")?;

        // We expect dry runs to be done alongside Synapse running, so we don't want to
        // interfere with Synapse's database access in that case.
        let lock_type = match dry_run {
            true => "ACCESS SHARE",
            false => "EXCLUSIVE",
        };

        for table in TABLES_TO_LOCK {
            let lock_statement = format!("LOCK TABLE {table} IN {lock_type} MODE NOWAIT;");
            query(&lock_statement)
                .execute(&mut *txn)
                .await
                .into_database_with(|| format!("locking Synapse table `{table}`"))?;
        }

        Ok(Self { txn })
    }

    /// Finishes the Synapse reader, committing the transaction.
    ///
    /// # Errors
    ///
    /// Errors are returned under the following circumstances:
    ///
    /// - An underlying database error whilst committing the transaction.
    pub async fn finish(self) -> Result<(), Error> {
        // TODO enforce that this is called somehow.
        let Self { txn } = self;
        txn.commit().await.into_database("end transaction")?;
        Ok(())
    }

    /// Counts the rows in the Synapse database to get an estimate of how large the migration is going to be.
    ///
    /// Application service users and guests are not counted.
    ///
    /// # Errors
    ///
    /// Errors are returned under the following circumstances:
    ///
    /// - An underlying database error
    pub async fn count_rows(&mut self) -> Result<SynapseRowCounts, Error> {
        let count_row = sqlx::query(
            "
SELECT COUNT(1) FROM users
WHERE appservice_id IS NULL AND is_guest = 0
",
        )
        .fetch_one(&mut *self.txn)
        .await
        .into_database("counting Synapse users")?;

        let users = count_row
            .try_get::<i64, _>(0)
            .into_database("couldn't decode count of Synapse users table")?;

        Ok(SynapseRowCounts { users })
    }

    /// Reads Synapse users from the database, excluding application service users and guests.
    pub fn read_users(&mut self) -> impl Stream<Item = Result<SynapseUser, Error>> + '_ {
        let users_query = sqlx::query_as(
            "
SELECT
name, password_hash, admin, deactivated, creation_ts
FROM users
WHERE appservice_id IS NULL AND is_guest = 0
",
        );
        users_query
            .fetch(&mut *self.txn)
            .map_err(|err| err.into_database("reading Synapse users"))
    }
}
#[cfg(test)]
mod test {
// TODO test me
}

35
misc/sqlx_update.sh Executable file
View File

@@ -0,0 +1,35 @@
#!/bin/sh
# Updates the sqlx query info of every crate that uses sqlx.
# Requires DATABASE_URL to point at a MAS database.
set -eu

if [ "${DATABASE_URL+defined}" != defined ]; then
  echo "You need to set DATABASE_URL" >&2
  exit 1
fi

if [ "$DATABASE_URL" = "postgres:" ]; then
  # Hacky, but psql doesn't accept `postgres:` on its own like sqlx does
  export DATABASE_URL="postgres:///"
fi

# Quoted throughout, so that a checkout path containing spaces works.
crates_dir="$(dirname "$(realpath "$0")")/../crates"

CRATES_WITH_SQLX="storage-pg syn2mas"

# Set to 1 as soon as preparing any crate fails; used as the exit status.
status=0

for crate in $CRATES_WITH_SQLX; do
  echo "=== Updating sqlx query info for $crate ==="

  if [ "$crate" = syn2mas ]; then
    # We need to apply the syn2mas_temporary_tables.sql one-off 'migration'
    # for checking the syn2mas queries

    # not evident from the help text, but psql accepts connection URLs as the dbname
    psql --dbname="$DATABASE_URL" --single-transaction --file="${crates_dir}/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql"
  fi

  # Keep going on failure, so that the syn2mas temporary tables below still get reverted,
  # but remember the failure so that the script does not report success.
  if ! (cd "$crates_dir/$crate" && cargo sqlx prepare); then
    echo "(failed to prepare for $crate)" >&2
    status=1
  fi

  if [ "$crate" = syn2mas ]; then
    # Revert syn2mas temporary tables
    psql --dbname="$DATABASE_URL" --single-transaction --file="${crates_dir}/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql"
  fi
done

exit "$status"