syn2mas: Migrate threepids to MAS (#3878)

* Add a table to hold unsupported threepids

* Migrate threepids from Synapse to MAS
This commit is contained in:
reivilibre
2025-01-27 17:25:00 +00:00
committed by reivilibre
parent 5179d41214
commit 1dc172dcf7
15 changed files with 502 additions and 96 deletions

View File

@@ -12,10 +12,12 @@ use tracing::{error, warn};
use crate::util::database_connection_from_config;
/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are errors preventing migration.
/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are
/// errors preventing migration.
const EXIT_CODE_CHECK_ERRORS: u8 = 10;
/// The exit code used by `syn2mas check` when there are warnings which should be considered prior to migration.
/// The exit code used by `syn2mas check` when there are warnings which should
/// be considered prior to migration.
const EXIT_CODE_CHECK_WARNINGS: u8 = 11;
#[derive(Parser, Debug)]
@@ -23,32 +25,38 @@ pub(super) struct Options {
#[command(subcommand)]
subcommand: Subcommand,
/// This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. It is only suitable for TESTING.
/// If you want to use this tool anyway, please pass this argument.
/// This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. It is
/// only suitable for TESTING. If you want to use this tool anyway,
/// please pass this argument.
///
/// If you want to migrate from Synapse to MAS today, please use the Node.js-based tool in the MAS repository.
/// If you want to migrate from Synapse to MAS today, please use the
/// Node.js-based tool in the MAS repository.
#[clap(long = "i-swear-i-am-just-testing-in-a-staging-environment")]
experimental_accepted: bool,
/// Path to the Synapse configuration (in YAML format).
/// May be specified multiple times if multiple Synapse configuration files are in use.
/// May be specified multiple times if multiple Synapse configuration files
/// are in use.
#[clap(long = "synapse-config")]
synapse_configuration_files: Vec<Utf8PathBuf>,
/// Override the Synapse database URI.
/// syn2mas normally loads the Synapse database connection details from the Synapse configuration.
/// However, it may sometimes be necessary to override the database URI and in that case this flag can be used.
/// syn2mas normally loads the Synapse database connection details from the
/// Synapse configuration. However, it may sometimes be necessary to
/// override the database URI and in that case this flag can be used.
///
/// Should be a connection URI of the following general form:
/// ```text
/// postgresql://[user[:password]@][host][:port][/dbname][?param1=value1&...]
/// ```
/// To use a UNIX socket at a custom path, the host should be a path to a socket, but in the URI string
/// it must be URI-encoded by replacing `/` with `%2F`.
/// To use a UNIX socket at a custom path, the host should be a path to a
/// socket, but in the URI string it must be URI-encoded by replacing
/// `/` with `%2F`.
///
/// Finally, any missing values will be loaded from the libpq-compatible environment variables
/// `PGHOST`, `PGPORT`, `PGUSER`, `PGDATABASE`, `PGPASSWORD`, etc.
/// It is valid to specify the URL `postgresql:` and configure all values through those environment variables.
/// Finally, any missing values will be loaded from the libpq-compatible
/// environment variables `PGHOST`, `PGPORT`, `PGUSER`, `PGDATABASE`,
/// `PGPASSWORD`, etc. It is valid to specify the URL `postgresql:` and
/// configure all values through those environment variables.
#[clap(long = "synapse-database-uri")]
synapse_database_uri: Option<PgConnectOptions>,
}

View File

@@ -0,0 +1,30 @@
-- Copyright 2025 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
-- Tracks third-party ID associations that have been verified but are
-- not currently supported by MAS.
-- This is currently used when importing third-party IDs from Synapse,
-- which historically could verify at least phone numbers.
-- E-mail associations will not be stored in this table because those are natively
-- supported by MAS; see the `user_emails` table.
CREATE TABLE user_unsupported_third_party_ids(
-- The owner of the third-party ID assocation
user_id UUID NOT NULL
REFERENCES users(user_id) ON DELETE CASCADE,
-- What type of association is this?
medium TEXT NOT NULL,
-- The address of the associated ID, e.g. a phone number or other identifier.
address TEXT NOT NULL,
-- When the association was created
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
PRIMARY KEY (user_id, medium, address)
);

View File

@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__user_unsupported_third_party_ids\n (user_id, medium, address, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"TextArray",
"TextArray",
"TimestamptzArray"
]
},
"nullable": []
},
"hash": "b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b"
}

View File

@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__user_emails\n (user_email_id, user_id, email, created_at, confirmed_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"UuidArray",
"TextArray",
"TimestamptzArray"
]
},
"nullable": []
},
"hash": "dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00"
}

View File

@@ -27,6 +27,8 @@ rand.workspace = true
uuid = "1.10.0"
ulid = { workspace = true, features = ["uuid"] }
mas-config.workspace = true
[dev-dependencies]
mas-storage-pg.workspace = true
@@ -34,7 +36,5 @@ anyhow.workspace = true
insta.workspace = true
serde.workspace = true
mas-config.workspace = true
[lints]
workspace = true

View File

@@ -8,11 +8,13 @@ mod synapse_reader;
mod migration;
pub use self::mas_writer::locking::LockedMasDatabase;
pub use self::mas_writer::{checks::mas_pre_migration_checks, MasWriter};
pub use self::migration::migrate;
pub use self::synapse_reader::checks::{
synapse_config_check, synapse_config_check_against_mas_config, synapse_database_check,
pub use self::{
mas_writer::{checks::mas_pre_migration_checks, locking::LockedMasDatabase, MasWriter},
migration::migrate,
synapse_reader::{
checks::{
synapse_config_check, synapse_config_check_against_mas_config, synapse_database_check,
},
config as synapse_config, SynapseReader,
},
};
pub use self::synapse_reader::config as synapse_config;
pub use self::synapse_reader::SynapseReader;

View File

@@ -5,7 +5,8 @@
//! # MAS Database Checks
//!
//! This module provides safety checks to run against a MAS database before running the Synapse-to-MAS migration.
//! This module provides safety checks to run against a MAS database before
//! running the Synapse-to-MAS migration.
use thiserror::Error;
use thiserror_ext::ContextInto;
@@ -43,7 +44,8 @@ pub enum Error {
///
/// - If any database access error occurs.
/// - If any MAS tables involved in the migration are not empty.
/// - If we can't check whether syn2mas is already in progress on this database or not.
/// - If we can't check whether syn2mas is already in progress on this database
/// or not.
#[tracing::instrument(skip_all)]
pub async fn mas_pre_migration_checks<'a>(
mas_connection: &mut LockedMasDatabase<'a>,
@@ -56,7 +58,8 @@ pub async fn mas_pre_migration_checks<'a>(
return Ok(());
}
// Check that the database looks like a MAS database and that it is also an empty database.
// Check that the database looks like a MAS database and that it is also an
// empty database.
for &table in MAS_TABLES_AFFECTED_BY_MIGRATION {
let row_present = sqlx::query(&format!("SELECT 1 AS dummy FROM {table} LIMIT 1"))

View File

@@ -13,17 +13,19 @@ use sqlx::{
static SYN2MAS_ADVISORY_LOCK: LazyLock<PgAdvisoryLock> =
LazyLock::new(|| PgAdvisoryLock::new("syn2mas-maswriter"));
/// A wrapper around a Postgres connection which holds a session-wide advisory lock
/// preventing concurrent access by other syn2mas instances.
/// A wrapper around a Postgres connection which holds a session-wide advisory
/// lock preventing concurrent access by other syn2mas instances.
pub struct LockedMasDatabase<'conn> {
inner: PgAdvisoryLockGuard<'static, &'conn mut PgConnection>,
}
impl<'conn> LockedMasDatabase<'conn> {
/// Attempts to lock the MAS database against concurrent access by other syn2mas instances.
/// Attempts to lock the MAS database against concurrent access by other
/// syn2mas instances.
///
/// If the lock can be acquired, returns a `LockedMasDatabase`.
/// If the lock cannot be acquired, returns the connection back to the caller wrapped in `Either::Right`.
/// If the lock cannot be acquired, returns the connection back to the
/// caller wrapped in `Either::Right`.
///
/// # Errors
///

View File

@@ -146,7 +146,8 @@ impl WriterConnectionPool {
///
/// # Panics
///
/// - If connections were not returned to the pool. (This indicates a serious bug.)
/// - If connections were not returned to the pool. (This indicates a
/// serious bug.)
pub async fn finish(self) -> Result<(), Vec<Error>> {
let mut errors = Vec::new();
@@ -207,14 +208,33 @@ pub struct MasNewUserPassword {
pub created_at: DateTime<Utc>,
}
/// The 'version' of the password hashing scheme used for passwords when they are
/// migrated from Synapse to MAS.
pub struct MasNewEmailThreepid {
pub user_email_id: Uuid,
pub user_id: Uuid,
pub email: String,
pub created_at: DateTime<Utc>,
}
pub struct MasNewUnsupportedThreepid {
pub user_id: Uuid,
pub medium: String,
pub address: String,
pub created_at: DateTime<Utc>,
}
/// The 'version' of the password hashing scheme used for passwords when they
/// are migrated from Synapse to MAS.
/// This is version 1, as in the previous syn2mas script.
// TODO hardcoding version to `1` may not be correct long-term?
pub const MIGRATED_PASSWORD_VERSION: u16 = 1;
/// List of all MAS tables that are written to by syn2mas.
pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &["users", "user_passwords"];
pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &[
"users",
"user_passwords",
"user_emails",
"user_unsupported_third_party_ids",
];
/// Detect whether a syn2mas migration has started on the given database.
///
@@ -227,8 +247,8 @@ pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &["users", "user_passwords
/// Errors are returned under the following circumstances:
///
/// - If any database error occurs whilst querying the database.
/// - If some, but not all, syn2mas restoration tables are present.
/// (This shouldn't be possible without syn2mas having been sabotaged!)
/// - If some, but not all, syn2mas restoration tables are present. (This
/// shouldn't be possible without syn2mas having been sabotaged!)
pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result<bool, Error> {
// Names of tables used for syn2mas resumption
// Must be `String`s, not just `&str`, for the query.
@@ -457,7 +477,8 @@ impl<'conn> MasWriter<'conn> {
.await
.map_err(|errors| Error::Multiple(MultipleErrors::from(errors)))?;
// Now all the data has been migrated, finish off by restoring indices and constraints!
// Now all the data has been migrated, finish off by restoring indices and
// constraints!
query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;")
.execute(self.conn.as_mut())
@@ -563,11 +584,11 @@ impl<'conn> MasWriter<'conn> {
&mut self,
passwords: Vec<MasNewUserPassword>,
) -> Result<(), Error> {
self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
if passwords.is_empty() {
return Ok(());
}
if passwords.is_empty() {
return Ok(());
}
self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
let mut user_password_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
let mut user_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
let mut hashed_passwords: Vec<String> = Vec::with_capacity(passwords.len());
@@ -603,12 +624,107 @@ impl<'conn> MasWriter<'conn> {
Ok(())
})).await
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub async fn write_email_threepids(
&mut self,
threepids: Vec<MasNewEmailThreepid>,
) -> Result<(), Error> {
if threepids.is_empty() {
return Ok(());
}
self.writer_pool.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut user_email_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
let mut user_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
let mut emails: Vec<String> = Vec::with_capacity(threepids.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(threepids.len());
for MasNewEmailThreepid {
user_email_id,
user_id,
email,
created_at,
} in threepids
{
user_email_ids.push(user_email_id);
user_ids.push(user_id);
emails.push(email);
created_ats.push(created_at);
}
// `confirmed_at` is going to get removed in a future MAS release,
// so just populate with `created_at`
sqlx::query!(
r#"
INSERT INTO syn2mas__user_emails
(user_email_id, user_id, email, created_at, confirmed_at)
SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])
"#,
&user_email_ids[..],
&user_ids[..],
&emails[..],
&created_ats[..],
).execute(&mut *conn).await.into_database("writing emails to MAS")?;
Ok(())
})
}).await
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub async fn write_unsupported_threepids(
&mut self,
threepids: Vec<MasNewUnsupportedThreepid>,
) -> Result<(), Error> {
if threepids.is_empty() {
return Ok(());
}
self.writer_pool.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut user_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
let mut mediums: Vec<String> = Vec::with_capacity(threepids.len());
let mut addresses: Vec<String> = Vec::with_capacity(threepids.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(threepids.len());
for MasNewUnsupportedThreepid {
user_id,
medium,
address,
created_at,
} in threepids
{
user_ids.push(user_id);
mediums.push(medium);
addresses.push(address);
created_ats.push(created_at);
}
// `confirmed_at` is going to get removed in a future MAS release,
// so just populate with `created_at`
sqlx::query!(
r#"
INSERT INTO syn2mas__user_unsupported_third_party_ids
(user_id, medium, address, created_at)
SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])
"#,
&user_ids[..],
&mediums[..],
&addresses[..],
&created_ats[..],
).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?;
Ok(())
})
}).await
}
}
// How many entries to buffer at once, before writing a batch of rows to the database.
// TODO tune: didn't see that much difference between 4k and 64k
// (4k: 13.5~14, 64k: 12.5~13s — streaming the whole way would be better, especially for DB latency, but probably fiiine
// and also we won't be able to stream to two tables at once...)
// How many entries to buffer at once, before writing a batch of rows to the
// database. TODO tune: didn't see that much difference between 4k and 64k
// (4k: 13.5~14, 64k: 12.5~13s — streaming the whole way would be better,
// especially for DB latency, but probably fiiine and also we won't be able to
// stream to two tables at once...)
const WRITE_BUFFER_BATCH_SIZE: usize = 4096;
pub struct MasUserWriteBuffer<'writer, 'conn> {
@@ -670,13 +786,69 @@ impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> {
}
}
pub struct MasThreepidWriteBuffer<'writer, 'conn> {
email: Vec<MasNewEmailThreepid>,
unsupported: Vec<MasNewUnsupportedThreepid>,
writer: &'writer mut MasWriter<'conn>,
}
impl<'writer, 'conn> MasThreepidWriteBuffer<'writer, 'conn> {
pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
MasThreepidWriteBuffer {
email: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
unsupported: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
writer,
}
}
pub async fn finish(mut self) -> Result<(), Error> {
self.flush_emails().await?;
self.flush_unsupported().await?;
Ok(())
}
pub async fn flush_emails(&mut self) -> Result<(), Error> {
self.writer
.write_email_threepids(std::mem::take(&mut self.email))
.await?;
self.email.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
Ok(())
}
pub async fn flush_unsupported(&mut self) -> Result<(), Error> {
self.writer
.write_unsupported_threepids(std::mem::take(&mut self.unsupported))
.await?;
self.unsupported.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
Ok(())
}
pub async fn write_email(&mut self, user: MasNewEmailThreepid) -> Result<(), Error> {
self.email.push(user);
if self.email.len() >= WRITE_BUFFER_BATCH_SIZE {
self.flush_emails().await?;
}
Ok(())
}
pub async fn write_password(
&mut self,
unsupported: MasNewUnsupportedThreepid,
) -> Result<(), Error> {
self.unsupported.push(unsupported);
if self.unsupported.len() >= WRITE_BUFFER_BATCH_SIZE {
self.flush_unsupported().await?;
}
Ok(())
}
}
#[cfg(test)]
mod test {
use std::collections::{BTreeMap, BTreeSet};
use chrono::DateTime;
use futures_util::TryStreamExt;
use serde::Serialize;
use sqlx::{Column, PgConnection, PgPool, Row};
use uuid::Uuid;
@@ -707,9 +879,11 @@ mod test {
const SKIPPED_TABLES: &[&str] = &["_sqlx_migrations"];
/// Produces a serialisable snapshot of a database, usable for snapshot testing
/// Produces a serialisable snapshot of a database, usable for snapshot
/// testing
///
/// For brevity, empty tables, as well as [`SKIPPED_TABLES`], will not be included in the snapshot.
/// For brevity, empty tables, as well as [`SKIPPED_TABLES`], will not be
/// included in the snapshot.
async fn snapshot_database(conn: &mut PgConnection) -> DatabaseSnapshot {
let mut out = DatabaseSnapshot::default();
let table_names: Vec<String> = sqlx::query_scalar(

View File

@@ -10,3 +10,5 @@ DROP TABLE syn2mas_restore_indices;
ALTER TABLE syn2mas__users RENAME TO users;
ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords;
ALTER TABLE syn2mas__user_emails RENAME TO user_emails;
ALTER TABLE syn2mas__user_unsupported_third_party_ids RENAME TO user_unsupported_third_party_ids;

View File

@@ -39,3 +39,5 @@ CREATE TABLE syn2mas_restore_indices (
-- Now we rename all tables that we touch during the migration.
ALTER TABLE users RENAME TO syn2mas__users;
ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords;
ALTER TABLE user_emails RENAME TO syn2mas__user_emails;
ALTER TABLE user_unsupported_third_party_ids RENAME TO syn2mas__user_unsupported_third_party_ids;

View File

@@ -5,9 +5,11 @@
//! # Migration
//!
//! This module provides the high-level logic for performing the Synapse-to-MAS database migration.
//! This module provides the high-level logic for performing the Synapse-to-MAS
//! database migration.
//!
//! This module does not implement any of the safety checks that should be run *before* the migration.
//! This module does not implement any of the safety checks that should be run
//! *before* the migration.
use std::{collections::HashMap, pin::pin};
@@ -17,12 +19,16 @@ use futures_util::StreamExt as _;
use rand::RngCore;
use thiserror::Error;
use thiserror_ext::ContextInto;
use tracing::Level;
use ulid::Ulid;
use uuid::Uuid;
use crate::{
mas_writer::{self, MasNewUser, MasNewUserPassword, MasUserWriteBuffer, MasWriter},
synapse_reader::{self, ExtractLocalpartError, FullUserId, SynapseUser},
mas_writer::{
self, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUser, MasNewUserPassword,
MasThreepidWriteBuffer, MasUserWriteBuffer, MasWriter,
},
synapse_reader::{self, ExtractLocalpartError, FullUserId, SynapseThreepid, SynapseUser},
SynapseReader,
};
@@ -70,7 +76,7 @@ pub async fn migrate(
) -> Result<(), Error> {
let counts = synapse.count_rows().await.into_synapse("counting users")?;
migrate_users(
let migrated_users = migrate_users(
synapse,
mas,
counts
@@ -82,9 +88,19 @@ pub async fn migrate(
)
.await?;
migrate_threepids(
synapse,
mas,
server_name,
rng,
&migrated_users.user_localparts_to_uuid,
)
.await?;
Ok(())
}
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_users(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
@@ -126,6 +142,65 @@ async fn migrate_users(
})
}
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_threepids(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
server_name: &str,
rng: &mut impl RngCore,
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
) -> Result<(), Error> {
let mut write_buffer = MasThreepidWriteBuffer::new(mas);
let mut users_stream = pin!(synapse.read_threepids());
while let Some(threepid_res) = users_stream.next().await {
let SynapseThreepid {
user_id: synapse_user_id,
medium,
address,
added_at,
} = threepid_res.into_synapse("reading threepid")?;
let created_at: DateTime<Utc> = added_at.into();
let username = synapse_user_id
.extract_localpart(server_name)
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
todo!()
};
if medium == "email" {
write_buffer
.write_email(MasNewEmailThreepid {
user_id,
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
created_at.into(),
rng,
)),
email: address,
created_at,
})
.await
.into_mas("writing email")?;
} else {
write_buffer
.write_password(MasNewUnsupportedThreepid {
user_id,
medium,
address,
created_at,
})
.await
.into_mas("writing unsupported threepid")?;
}
}
write_buffer.finish().await.into_mas("writing threepids")?;
Ok(())
}
fn transform_user(
user: &SynapseUser,
server_name: &str,

View File

@@ -5,7 +5,8 @@
//! # Synapse Checks
//!
//! This module provides safety checks to run against a Synapse database before running the Synapse-to-MAS migration.
//! This module provides safety checks to run against a Synapse database before
//! running the Synapse-to-MAS migration.
use figment::Figment;
use mas_config::{
@@ -15,9 +16,8 @@ use mas_config::{
use sqlx::{prelude::FromRow, query_as, query_scalar, PgConnection};
use thiserror::Error;
use crate::mas_writer::MIGRATED_PASSWORD_VERSION;
use super::config::Config;
use crate::mas_writer::MIGRATED_PASSWORD_VERSION;
#[derive(Debug, Error)]
pub enum Error {
@@ -31,7 +31,8 @@ pub enum Error {
MasPasswordConfig(#[source] anyhow::Error),
}
/// An error found whilst checking the Synapse database, that should block a migration.
/// An error found whilst checking the Synapse database, that should block a
/// migration.
#[derive(Debug, Error)]
pub enum CheckError {
#[error("MAS config is missing a password hashing scheme with version '1'")]
@@ -74,8 +75,9 @@ pub enum CheckError {
},
}
/// A potential hazard found whilst checking the Synapse database, that should be presented
/// to the operator to check they are aware of a caveat before proceeding with the migration.
/// A potential hazard found whilst checking the Synapse database, that should
/// be presented to the operator to check they are aware of a caveat before
/// proceeding with the migration.
#[derive(Debug, Error)]
pub enum CheckWarning {
#[error("Synapse config contains OIDC auth configuration (issuer: {issuer:?}) which will need to be manually mapped to an upstream OpenID Connect Provider during migration.")]
@@ -148,12 +150,14 @@ pub fn synapse_config_check(synapse_config: &Config) -> (Vec<CheckWarning>, Vec<
(warnings, errors)
}
/// Check that the given Synapse configuration is sane for migration to a MAS with the given MAS configuration.
/// Check that the given Synapse configuration is sane for migration to a MAS
/// with the given MAS configuration.
///
/// # Errors
///
/// - If any necessary section of MAS config cannot be parsed.
/// - If the MAS password configuration (including any necessary secrets) can't be loaded.
/// - If the MAS password configuration (including any necessary secrets) can't
/// be loaded.
pub async fn synapse_config_check_against_mas_config(
synapse: &Config,
mas: &Figment,
@@ -169,8 +173,9 @@ pub async fn synapse_config_check_against_mas_config(
let mas_matrix = MatrixConfig::extract(mas)?;
// Look for the MAS password hashing scheme that will be used for imported Synapse passwords,
// then check the configuration matches so that Synapse passwords will be compatible with MAS.
// Look for the MAS password hashing scheme that will be used for imported
// Synapse passwords, then check the configuration matches so that Synapse
// passwords will be compatible with MAS.
if let Some((_, algorithm, _, secret)) = mas_password_schemes
.iter()
.find(|(version, _, _, _)| *version == MIGRATED_PASSWORD_VERSION)
@@ -215,11 +220,13 @@ pub async fn synapse_config_check_against_mas_config(
Ok((warnings, errors))
}
/// Check that the Synapse database is sane for migration. Returns a list of warnings and errors.
/// Check that the Synapse database is sane for migration. Returns a list of
/// warnings and errors.
///
/// # Errors
///
/// - If there is some database connection error, or the given database is not a Synapse database.
/// - If there is some database connection error, or the given database is not a
/// Synapse database.
/// - If the OAuth2 section of the MAS configuration could not be parsed.
#[tracing::instrument(skip_all)]
pub async fn synapse_database_check(

View File

@@ -11,7 +11,8 @@ use serde::Deserialize;
use sqlx::postgres::PgConnectOptions;
/// The root of a Synapse configuration.
/// This struct only includes fields which the Synapse-to-MAS migration is interested in.
/// This struct only includes fields which the Synapse-to-MAS migration is
/// interested in.
///
/// See: <https://element-hq.github.io/synapse/latest/usage/configuration/config_documentation.html>
#[derive(Deserialize)]
@@ -29,7 +30,8 @@ pub struct Config {
#[serde(default)]
pub enable_registration_captcha: bool,
/// Normally this defaults to true, but when MAS integration is enabled in Synapse it defaults to false.
/// Normally this defaults to true, but when MAS integration is enabled in
/// Synapse it defaults to false.
#[serde(default)]
pub enable_3pid_changes: bool,
@@ -118,8 +120,9 @@ impl Config {
/// See: <https://element-hq.github.io/synapse/latest/usage/configuration/config_documentation.html#database>
#[derive(Deserialize)]
pub struct DatabaseSection {
/// Expecting `psycopg2` for Postgres or `sqlite3` for `SQLite3`, but may be an arbitrary string and future versions
/// of Synapse may support other database drivers, e.g. psycopg3.
/// Expecting `psycopg2` for Postgres or `sqlite3` for `SQLite3`, but may be
/// an arbitrary string and future versions of Synapse may support other
/// database drivers, e.g. psycopg3.
pub name: String,
#[serde(default)]
pub args: DatabaseArgsSuboption,
@@ -133,12 +136,14 @@ pub const SYNAPSE_DATABASE_DRIVER_NAME_SQLITE3: &str = "sqlite3";
impl DatabaseSection {
/// Process the configuration into Postgres connection options.
///
/// Environment variables and libpq defaults will be used as fallback for any missing values;
/// this should match what Synapse does.
/// But note that if syn2mas is not run in the same context (host, user, environment variables)
/// as Synapse normally runs, then the connection options may not be valid.
/// Environment variables and libpq defaults will be used as fallback for
/// any missing values; this should match what Synapse does.
/// But note that if syn2mas is not run in the same context (host, user,
/// environment variables) as Synapse normally runs, then the connection
/// options may not be valid.
///
/// Returns `None` if this database configuration is not configured for Postgres.
/// Returns `None` if this database configuration is not configured for
/// Postgres.
#[must_use]
pub fn to_sqlx_postgres(&self) -> Option<PgConnectOptions> {
if self.name != SYNAPSE_DATABASE_DRIVER_NAME_PSYCOPG2 {
@@ -167,7 +172,8 @@ impl DatabaseSection {
}
/// The `args` suboption of the `database` section of the Synapse configuration.
/// This struct assumes Postgres is in use and does not represent fields used by SQLite.
/// This struct assumes Postgres is in use and does not represent fields used by
/// SQLite.
#[derive(Deserialize, Default)]
pub struct DatabaseArgsSuboption {
pub user: Option<String>,
@@ -199,7 +205,8 @@ impl Default for PasswordSection {
}
}
/// A section that we only care about whether it's enabled or not, but is not enabled by default.
/// A section that we only care about whether it's enabled or not, but is not
/// enabled by default.
#[derive(Default, Deserialize)]
pub struct EnableableSection {
#[serde(default)]
@@ -208,11 +215,12 @@ pub struct EnableableSection {
#[derive(Clone, Deserialize)]
pub struct OidcProvider {
/// At least for `oidc_config`, if the dict is present but left empty then the config should be ignored,
/// so this field must be optional.
/// At least for `oidc_config`, if the dict is present but left empty then
/// the config should be ignored, so this field must be optional.
pub issuer: Option<String>,
/// Required, except for the old `oidc_config` where this is implied to be "oidc".
/// Required, except for the old `oidc_config` where this is implied to be
/// "oidc".
pub idp_id: Option<String>,
}

View File

@@ -5,7 +5,8 @@
//! # Synapse Database Reader
//!
//! This module provides facilities for streaming relevant types of database records from a Synapse database.
//! This module provides facilities for streaming relevant types of database
//! records from a Synapse database.
use chrono::{DateTime, Utc};
use futures_util::{Stream, TryStreamExt};
@@ -46,13 +47,13 @@ pub enum ExtractLocalpartError {
}
impl FullUserId {
/// Extract the localpart from the User ID, asserting that the User ID has the correct
/// server name.
/// Extract the localpart from the User ID, asserting that the User ID has
/// the correct server name.
///
/// # Errors
///
/// A handful of basic validity checks are performed and an error may be returned
/// if the User ID is not valid.
/// A handful of basic validity checks are performed and an error may be
/// returned if the User ID is not valid.
/// However, the User ID grammar is not checked fully.
///
/// If the wrong server name is asserted, returns an error.
@@ -80,8 +81,8 @@ impl FullUserId {
}
/// A Synapse boolean.
/// Synapse stores booleans as 0 or 1, due to compatibility with old SQLite versions
/// that did not have native boolean support.
/// Synapse stores booleans as 0 or 1, due to compatibility with old SQLite
/// versions that did not have native boolean support.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct SynapseBool(bool);
@@ -107,8 +108,8 @@ impl From<SynapseBool> for bool {
}
/// A timestamp stored as the number of seconds since the Unix epoch.
/// Note that Synapse stores MOST timestamps as numbers of **milliseconds** since the Unix epoch.
/// But some timestamps are still stored in seconds.
/// Note that Synapse stores MOST timestamps as numbers of **milliseconds**
/// since the Unix epoch. But some timestamps are still stored in seconds.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct SecondsTimestamp(DateTime<Utc>);
@@ -122,9 +123,9 @@ impl<'r> sqlx::Decode<'r, Postgres> for SecondsTimestamp {
fn decode(
value: <Postgres as sqlx::Database>::ValueRef<'r>,
) -> Result<Self, sqlx::error::BoxDynError> {
<i64 as sqlx::Decode<Postgres>>::decode(value).map(|milliseconds_since_epoch| {
<i64 as sqlx::Decode<Postgres>>::decode(value).map(|seconds_since_epoch| {
SecondsTimestamp(DateTime::from_timestamp_nanos(
milliseconds_since_epoch * 1_000_000_000,
seconds_since_epoch * 1_000_000_000,
))
})
}
@@ -136,11 +137,41 @@ impl sqlx::Type<Postgres> for SecondsTimestamp {
}
}
/// A timestamp stored as the number of milliseconds since the Unix epoch.
/// Note that Synapse stores some timestamps in seconds.
#[derive(Copy, Clone, Debug)]
pub struct MillisecondsTimestamp(DateTime<Utc>);
impl From<MillisecondsTimestamp> for DateTime<Utc> {
fn from(MillisecondsTimestamp(value): MillisecondsTimestamp) -> Self {
value
}
}
impl<'r> sqlx::Decode<'r, Postgres> for MillisecondsTimestamp {
fn decode(
value: <Postgres as sqlx::Database>::ValueRef<'r>,
) -> Result<Self, sqlx::error::BoxDynError> {
<i64 as sqlx::Decode<Postgres>>::decode(value).map(|milliseconds_since_epoch| {
MillisecondsTimestamp(DateTime::from_timestamp_nanos(
milliseconds_since_epoch * 1_000_000,
))
})
}
}
impl sqlx::Type<Postgres> for MillisecondsTimestamp {
fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
<i64 as sqlx::Type<Postgres>>::type_info()
}
}
#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
pub struct SynapseUser {
/// Full User ID of the user
pub name: FullUserId,
/// Password hash string for the user. Optional (null if no password is set).
/// Password hash string for the user. Optional (null if no password is
/// set).
pub password_hash: Option<String>,
/// Whether the user is a Synapse Admin
pub admin: SynapseBool,
@@ -153,10 +184,20 @@ pub struct SynapseUser {
// TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts)
}
/// Row of the `user_threepids` table in Synapse.
#[derive(Clone, Debug, FromRow)]
pub struct SynapseThreepid {
pub user_id: FullUserId,
pub medium: String,
pub address: String,
pub added_at: MillisecondsTimestamp,
}
/// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on.
///
/// This is a safety measure against other processes changing the data underneath our feet.
/// It's still not a good idea to run Synapse at the same time as the migration.
/// This is a safety measure against other processes changing the data
/// underneath our feet. It's still not a good idea to run Synapse at the same
/// time as the migration.
// TODO not complete!
const TABLES_TO_LOCK: &[&str] = &["users"];
@@ -172,14 +213,16 @@ pub struct SynapseReader<'c> {
}
impl<'conn> SynapseReader<'conn> {
/// Create a new Synapse reader, which entails creating a transaction and locking Synapse tables.
/// Create a new Synapse reader, which entails creating a transaction and
/// locking Synapse tables.
///
/// # Errors
///
/// Errors are returned under the following circumstances:
///
/// - An underlying database error
/// - If we can't lock the Synapse tables (pointing to the fact that Synapse may still be running)
/// - If we can't lock the Synapse tables (pointing to the fact that Synapse
/// may still be running)
pub async fn new(
synapse_connection: &'conn mut PgConnection,
dry_run: bool,
@@ -224,7 +267,8 @@ impl<'conn> SynapseReader<'conn> {
Ok(())
}
/// Counts the rows in the Synapse database to get an estimate of how large the migration is going to be.
/// Counts the rows in the Synapse database to get an estimate of how large
/// the migration is going to be.
///
/// # Errors
///
@@ -247,7 +291,8 @@ impl<'conn> SynapseReader<'conn> {
Ok(SynapseRowCounts { users })
}
/// Reads Synapse users, excluding application service users (which do not need to be migrated), from the database.
/// Reads Synapse users, excluding application service users (which do not
/// need to be migrated), from the database.
pub fn read_users(&mut self) -> impl Stream<Item = Result<SynapseUser, Error>> + '_ {
sqlx::query_as(
"
@@ -260,6 +305,20 @@ impl<'conn> SynapseReader<'conn> {
.fetch(&mut *self.txn)
.map_err(|err| err.into_database("reading Synapse users"))
}
/// Reads threepids (such as e-mail and phone number associations) from
/// Synapse.
pub fn read_threepids(&mut self) -> impl Stream<Item = Result<SynapseThreepid, Error>> + '_ {
sqlx::query_as(
"
SELECT
user_id, medium, address, added_at
FROM user_threepids
",
)
.fetch(&mut *self.txn)
.map_err(|err| err.into_database("reading Synapse threepids"))
}
}
#[cfg(test)]