From 09e619e9a01860c4f58ef29901ecd4b36df6a158 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 28 Jan 2025 20:37:46 +0000 Subject: [PATCH 01/11] Add `SynapseReader` support for access tokens and refresh tokens --- .../fixtures/access_token_alice.sql | 14 ++ .../access_token_alice_with_puppet.sql | 16 ++ .../access_token_alice_with_refresh_token.sql | 56 ++++++ ..._token_alice_with_unused_refresh_token.sql | 56 ++++++ crates/syn2mas/src/synapse_reader/mod.rs | 161 +++++++++++++++++- ...est__read_access_and_refresh_tokens-2.snap | 14 ++ ..._test__read_access_and_refresh_tokens.snap | 20 +++ ...ad_access_and_unused_refresh_tokens-2.snap | 22 +++ ...read_access_and_unused_refresh_tokens.snap | 34 ++++ ...napse_reader__test__read_access_token.snap | 18 ++ ...250128201100_access_and_refresh_tokens.sql | 28 +++ 11 files changed, 438 insertions(+), 1 deletion(-) create mode 100644 crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql create mode 100644 crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql create mode 100644 crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql create mode 100644 crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql create mode 100644 crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens-2.snap create mode 100644 crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap create mode 100644 crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens-2.snap create mode 100644 crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap create mode 100644 crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap create mode 100644 crates/syn2mas/test_synapse_migrations/20250128201100_access_and_refresh_tokens.sql diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql new file mode 100644 index 000000000..d9f9a4a7b --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql @@ -0,0 +1,14 @@ +INSERT INTO access_tokens + ( + id, + user_id, + device_id, + token + ) + VALUES + ( + 42, + '@alice:example.com', + 'ADEVICE', + 'syt_aaaaaaaaaaaaaa_aaaa' + ); diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql new file mode 100644 index 000000000..6bdfb0d9c --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql @@ -0,0 +1,16 @@ +INSERT INTO access_tokens + ( + id, + user_id, + device_id, + token, + puppets_user_id + ) + VALUES + ( + 42, + '@alice:example.com', + NULL, + 'syt_pupupupupup_eett', + '@bob:example.com' + ); diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql new file mode 100644 index 000000000..554ae4458 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql @@ -0,0 +1,56 @@ +INSERT INTO access_tokens + ( + id, + user_id, + device_id, + token, + refresh_token_id, + used + ) + VALUES + ( + 42, + '@alice:example.com', + 'ADEVICE', + 'syt_aaaaaaaaaaaaaa_aaaa', + 7, + TRUE + ), + ( + 43, + '@alice:example.com', + 'ADEVICE', + 'syt_AAAAAAAAAAAAAA_AAAA', + 8, + TRUE + ); + +INSERT INTO refresh_tokens + ( + id, + user_id, + device_id, + token, + next_token_id, + expiry_ts, + ultimate_session_expiry_ts + ) + VALUES + ( + 7, + '@alice:example.com', + 'ADEVICE', + 'syr_bbbbbbbbbbbbb_bbbb', + 8, + 1738096199000, + 1778096199000 + ), + ( + 8, + '@alice:example.com', + 'ADEVICE', + 'syr_cccccccccccc_cccc', + NULL, + 1748096199000, + 1778096199000 + ); diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql new file mode 100644 index 000000000..42bfddf01 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql @@ -0,0 +1,56 @@ +INSERT INTO access_tokens + ( + id, + user_id, + device_id, + token, + refresh_token_id, + used + ) + VALUES + ( + 42, + '@alice:example.com', + 'ADEVICE', + 'syt_aaaaaaaaaaaaaa_aaaa', + 7, + TRUE + ), + ( + 43, + '@alice:example.com', + 'ADEVICE', + 'syt_AAAAAAAAAAAAAA_AAAA', + 8, + FALSE + ); + +INSERT INTO refresh_tokens + ( + id, + user_id, + device_id, + token, + next_token_id, + expiry_ts, + ultimate_session_expiry_ts + ) + VALUES + ( + 7, + '@alice:example.com', + 'ADEVICE', + 'syr_bbbbbbbbbbbbb_bbbb', + 8, + 1738096199000, + 1778096199000 + ), + ( + 8, + '@alice:example.com', + 'ADEVICE', + 'syr_cccccccccccc_cccc', + NULL, + 1748096199000, + 1778096199000 + ); diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs index 7f3b28784..7fe138e2b 100644 --- a/crates/syn2mas/src/synapse_reader/mod.rs +++ b/crates/syn2mas/src/synapse_reader/mod.rs @@ -209,6 +209,26 @@ pub struct SynapseExternalId { pub external_id: String, } +/// Row of the `access_tokens` table in Synapse. +#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)] +pub struct SynapseAccessToken { + pub user_id: FullUserId, + pub device_id: Option, + pub token: String, + pub valid_until_ms: Option, + pub last_validated: Option, + pub refresh_token_id: Option, +} + +/// Row of the `refresh_tokens` table in Synapse. +#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)] +pub struct SynapseRefreshToken { + pub id: i64, + pub user_id: FullUserId, + pub device_id: String, + pub token: String, +} + /// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on. /// /// This is a safety measure against other processes changing the data @@ -350,6 +370,54 @@ impl<'conn> SynapseReader<'conn> { .fetch(&mut *self.txn) .map_err(|err| err.into_database("reading Synapse user external IDs")) } + + /// Reads access tokens from the Synapse database. + /// This does not include access tokens used for puppetting users, as those + /// are not supported by MAS. This also does not include access tokens + /// which have been made obsolete by using the associated refresh token + /// and then acknowledging the successor access token by using it to + /// authenticate a request. + pub fn read_access_tokens( + &mut self, + ) -> impl Stream> + '_ { + sqlx::query_as( + " + SELECT + at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated, at0.refresh_token_id + FROM access_tokens at0 + LEFT JOIN refresh_tokens rt0 ON at0.refresh_token_id = rt0.id + LEFT JOIN access_tokens at1 ON rt0.next_token_id = at1.refresh_token_id + WHERE at0.puppets_user_id IS NULL AND (NOT at1.used OR at1.used IS NULL) + ", + ) + .fetch(&mut *self.txn) + .map_err(|err| err.into_database("reading Synapse access tokens")) + } + + /// Reads refresh tokens from the Synapse database. + /// This also does not include refresh tokens which have been made obsolete + /// by using the refresh token and then acknowledging the + /// successor access token by using it to authenticate a request. + /// + /// The `expiry_ts` and `ultimate_session_expiry_ts` columns are ignored as + /// they are not implemented in MAS. + /// Further, they are unused by any real-world deployment to the best of + /// our knowledge. + pub fn read_refresh_tokens( + &mut self, + ) -> impl Stream> + '_ { + sqlx::query_as( + " + SELECT + rt0.id, rt0.user_id, rt0.device_id, rt0.token, rt0.next_token_id + FROM refresh_tokens rt0 + LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id + WHERE NOT at1.used OR at1.used IS NULL + ", + ) + .fetch(&mut *self.txn) + .map_err(|err| err.into_database("reading Synapse refresh tokens")) + } } #[cfg(test)] @@ -361,7 +429,10 @@ mod test { use sqlx::{migrate::Migrator, PgPool}; use crate::{ - synapse_reader::{SynapseExternalId, SynapseThreepid, SynapseUser}, + synapse_reader::{ + SynapseAccessToken, SynapseExternalId, SynapseRefreshToken, SynapseThreepid, + SynapseUser, + }, SynapseReader, }; @@ -415,4 +486,92 @@ mod test { assert_debug_snapshot!(external_ids); } + + #[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice", "access_token_alice"))] + async fn test_read_access_token(pool: PgPool) { + let mut conn = pool.acquire().await.expect("failed to get connection"); + let mut reader = SynapseReader::new(&mut conn, false) + .await + .expect("failed to make SynapseReader"); + + let access_tokens: BTreeSet = reader + .read_access_tokens() + .try_collect() + .await + .expect("failed to read Synapse access tokens"); + + assert_debug_snapshot!(access_tokens); + } + + /// Tests that puppetting access tokens are ignored. + #[sqlx::test( + migrator = "MIGRATOR", + fixtures("user_alice", "access_token_alice_with_puppet") + )] + async fn test_read_access_token_puppet(pool: PgPool) { + let mut conn = pool.acquire().await.expect("failed to get connection"); + let mut reader = SynapseReader::new(&mut conn, false) + .await + .expect("failed to make SynapseReader"); + + let access_tokens: BTreeSet = reader + .read_access_tokens() + .try_collect() + .await + .expect("failed to read Synapse access tokens"); + + assert!(access_tokens.is_empty()); + } + + #[sqlx::test( + migrator = "MIGRATOR", + fixtures("user_alice", "access_token_alice_with_refresh_token") + )] + async fn test_read_access_and_refresh_tokens(pool: PgPool) { + let mut conn = pool.acquire().await.expect("failed to get connection"); + let mut reader = SynapseReader::new(&mut conn, false) + .await + .expect("failed to make SynapseReader"); + + let access_tokens: BTreeSet = reader + .read_access_tokens() + .try_collect() + .await + .expect("failed to read Synapse access tokens"); + + let refresh_tokens: BTreeSet = reader + .read_refresh_tokens() + .try_collect() + .await + .expect("failed to read Synapse refresh tokens"); + + assert_debug_snapshot!(access_tokens); + assert_debug_snapshot!(refresh_tokens); + } + + #[sqlx::test( + migrator = "MIGRATOR", + fixtures("user_alice", "access_token_alice_with_unused_refresh_token") + )] + async fn test_read_access_and_unused_refresh_tokens(pool: PgPool) { + let mut conn = pool.acquire().await.expect("failed to get connection"); + let mut reader = SynapseReader::new(&mut conn, false) + .await + .expect("failed to make SynapseReader"); + + let access_tokens: BTreeSet = reader + .read_access_tokens() + .try_collect() + .await + .expect("failed to read Synapse access tokens"); + + let refresh_tokens: BTreeSet = reader + .read_refresh_tokens() + .try_collect() + .await + .expect("failed to read Synapse refresh tokens"); + + assert_debug_snapshot!(access_tokens); + assert_debug_snapshot!(refresh_tokens); + } } diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens-2.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens-2.snap new file mode 100644 index 000000000..d3ebc062e --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens-2.snap @@ -0,0 +1,14 @@ +--- +source: crates/syn2mas/src/synapse_reader/mod.rs +expression: refresh_tokens +--- +{ + SynapseRefreshToken { + id: 8, + user_id: FullUserId( + "@alice:example.com", + ), + device_id: "ADEVICE", + token: "syr_cccccccccccc_cccc", + }, +} diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap new file mode 100644 index 000000000..6bcaf42d6 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap @@ -0,0 +1,20 @@ +--- +source: crates/syn2mas/src/synapse_reader/mod.rs +expression: access_tokens +--- +{ + SynapseAccessToken { + user_id: FullUserId( + "@alice:example.com", + ), + device_id: Some( + "ADEVICE", + ), + token: "syt_AAAAAAAAAAAAAA_AAAA", + valid_until_ms: None, + last_validated: None, + refresh_token_id: Some( + 8, + ), + }, +} diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens-2.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens-2.snap new file mode 100644 index 000000000..e26bf9c0f --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens-2.snap @@ -0,0 +1,22 @@ +--- +source: crates/syn2mas/src/synapse_reader/mod.rs +expression: refresh_tokens +--- +{ + SynapseRefreshToken { + id: 7, + user_id: FullUserId( + "@alice:example.com", + ), + device_id: "ADEVICE", + token: "syr_bbbbbbbbbbbbb_bbbb", + }, + SynapseRefreshToken { + id: 8, + user_id: FullUserId( + "@alice:example.com", + ), + device_id: "ADEVICE", + token: "syr_cccccccccccc_cccc", + }, +} diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap new file mode 100644 index 000000000..0c84d8f6b --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap @@ -0,0 +1,34 @@ +--- +source: crates/syn2mas/src/synapse_reader/mod.rs +expression: access_tokens +--- +{ + SynapseAccessToken { + user_id: FullUserId( + "@alice:example.com", + ), + device_id: Some( + "ADEVICE", + ), + token: "syt_AAAAAAAAAAAAAA_AAAA", + valid_until_ms: None, + last_validated: None, + refresh_token_id: Some( + 8, + ), + }, + SynapseAccessToken { + user_id: FullUserId( + "@alice:example.com", + ), + device_id: Some( + "ADEVICE", + ), + token: "syt_aaaaaaaaaaaaaa_aaaa", + valid_until_ms: None, + last_validated: None, + refresh_token_id: Some( + 7, + ), + }, +} diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap new file mode 100644 index 000000000..1e761d446 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap @@ -0,0 +1,18 @@ +--- +source: crates/syn2mas/src/synapse_reader/mod.rs +expression: access_tokens +--- +{ + SynapseAccessToken { + user_id: FullUserId( + "@alice:example.com", + ), + device_id: Some( + "ADEVICE", + ), + token: "syt_aaaaaaaaaaaaaa_aaaa", + valid_until_ms: None, + last_validated: None, + refresh_token_id: None, + }, +} diff --git a/crates/syn2mas/test_synapse_migrations/20250128201100_access_and_refresh_tokens.sql b/crates/syn2mas/test_synapse_migrations/20250128201100_access_and_refresh_tokens.sql new file mode 100644 index 000000000..fef25bbbb --- /dev/null +++ b/crates/syn2mas/test_synapse_migrations/20250128201100_access_and_refresh_tokens.sql @@ -0,0 +1,28 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- Brings in the `access_tokens` and `refresh_tokens` tables from Synapse + +CREATE TABLE access_tokens ( + id bigint NOT NULL, + user_id text NOT NULL, + device_id text, + token text NOT NULL, + valid_until_ms bigint, + puppets_user_id text, + last_validated bigint, + refresh_token_id bigint, + used boolean +); + +CREATE TABLE refresh_tokens ( + id bigint NOT NULL, + user_id text NOT NULL, + device_id text NOT NULL, + token text NOT NULL, + next_token_id bigint, + expiry_ts bigint, + ultimate_session_expiry_ts bigint +); From 221ab042c67c69e6b955f519281b3b4b1442e8f3 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 29 Jan 2025 13:26:38 +0000 Subject: [PATCH 02/11] When consuming a compat refresh token, consume others in the session --- ...9a111f3f6b468a91bf0d5b574795bf8c80605f19.json} | 4 ++-- crates/storage-pg/src/compat/refresh_token.rs | 15 ++++++++++++--- crates/storage/src/compat/refresh_token.rs | 8 +++++++- 3 files changed, 21 insertions(+), 6 deletions(-) rename crates/storage-pg/.sqlx/{query-d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4.json => query-f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19.json} (53%) diff --git a/crates/storage-pg/.sqlx/query-d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4.json b/crates/storage-pg/.sqlx/query-f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19.json similarity index 53% rename from crates/storage-pg/.sqlx/query-d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4.json rename to crates/storage-pg/.sqlx/query-f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19.json index fc2a0d3ae..0d597ca3b 100644 --- a/crates/storage-pg/.sqlx/query-d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4.json +++ b/crates/storage-pg/.sqlx/query-f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE compat_refresh_tokens\n SET consumed_at = $2\n WHERE compat_refresh_token_id = $1\n ", + "query": "\n UPDATE compat_refresh_tokens\n SET consumed_at = $2\n WHERE compat_session_id = $1\n AND consumed_at IS NULL\n ", "describe": { "columns": [], "parameters": { @@ -11,5 +11,5 @@ }, "nullable": [] }, - "hash": "d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4" + "hash": "f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19" } diff --git a/crates/storage-pg/src/compat/refresh_token.rs b/crates/storage-pg/src/compat/refresh_token.rs index 70e3f109c..2c2939c54 100644 --- a/crates/storage-pg/src/compat/refresh_token.rs +++ b/crates/storage-pg/src/compat/refresh_token.rs @@ -204,16 +204,25 @@ impl CompatRefreshTokenRepository for PgCompatRefreshTokenRepository<'_> { r#" UPDATE compat_refresh_tokens SET consumed_at = $2 - WHERE compat_refresh_token_id = $1 + WHERE compat_session_id = $1 + AND consumed_at IS NULL "#, - Uuid::from(compat_refresh_token.id), + Uuid::from(compat_refresh_token.session_id), consumed_at, ) .traced() .execute(&mut *self.conn) .await?; - DatabaseError::ensure_affected_rows(&res, 1)?; + // This can affect multiple rows in case we've imported refresh tokens + // from Synapse. What we care about is that it at least affected one, + // which is what we're checking here + if res.rows_affected() == 0 { + return Err(DatabaseError::RowsAffected { + expected: 1, + actual: 0, + }); + } let compat_refresh_token = compat_refresh_token .consume(consumed_at) diff --git a/crates/storage/src/compat/refresh_token.rs b/crates/storage/src/compat/refresh_token.rs index c0b3029c0..4bcbd4d55 100644 --- a/crates/storage/src/compat/refresh_token.rs +++ b/crates/storage/src/compat/refresh_token.rs @@ -69,7 +69,13 @@ pub trait CompatRefreshTokenRepository: Send + Sync { token: String, ) -> Result; - /// Consume a compat refresh token + /// Consume a compat refresh token. + /// + /// This also marks other refresh tokens in the same session as consumed. + /// This is desirable because the syn2mas migration process can import + /// multiple refresh tokens for one device (compat session). + /// But once the user uses one of those, the others should no longer + /// be valid. /// /// Returns the consumed compat refresh token /// From c1493a6521e307c1943688a165487a6eef955cdc Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 29 Jan 2025 14:00:27 +0000 Subject: [PATCH 03/11] Add stubs for migrating devices, access tokens and refresh tokens --- crates/syn2mas/src/migration.rs | 61 +++++++++++++++++++++++- crates/syn2mas/src/synapse_reader/mod.rs | 28 ++++++++--- 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 250db90f2..9155ccf65 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -68,6 +68,12 @@ struct UsersMigrated { user_localparts_to_uuid: HashMap, } +struct DevicesMigrated { + /// Lookup table from `(user_id, device_id)` pairs to the + /// UUID of the `compat_session` in MAS + devices_to_uuid: HashMap<(CompactString, CompactString), Uuid>, +} + /// Performs a migration from Synapse's database to MAS' database. /// /// # Panics @@ -96,7 +102,7 @@ pub async fn migrate( counts .users .try_into() - .expect("More than usize::MAX users — wow!"), + .expect("More than usize::MAX users — unable to handle this many!"), server_name, rng, ) @@ -121,6 +127,29 @@ pub async fn migrate( ) .await?; + let migrated_devices = migrate_devices( + synapse, + mas, + counts + .devices + .try_into() + .expect("More than usize::MAX devices — unable to handle this many!"), + server_name, + rng, + &migrated_users.user_localparts_to_uuid, + ) + .await?; + + migrate_access_and_refresh_tokens( + synapse, + mas, + server_name, + rng, + &migrated_users.user_localparts_to_uuid, + &migrated_devices.devices_to_uuid, + ) + .await?; + Ok(()) } @@ -312,6 +341,36 @@ async fn migrate_external_ids( Ok(()) } +#[tracing::instrument(skip_all, level = Level::INFO)] +async fn migrate_devices( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + device_count_hint: usize, + server_name: &str, + rng: &mut impl RngCore, + user_localparts_to_uuid: &HashMap, +) -> Result { + // TODO is 1:1 enough capacity for a HashMap? + let mut devices_to_uuid = HashMap::with_capacity(device_count_hint); + + todo!(); + + Ok(DevicesMigrated { devices_to_uuid }) +} + +#[tracing::instrument(skip_all, level = Level::INFO)] +async fn migrate_access_and_refresh_tokens( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + server_name: &str, + rng: &mut impl RngCore, + user_localparts_to_uuid: &HashMap, + devices: &HashMap<(CompactString, CompactString), Uuid>, +) -> Result<(), Error> { + todo!(); + + Ok(()) +} fn transform_user( user: &SynapseUser, diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs index 7fe138e2b..9aa9bf754 100644 --- a/crates/syn2mas/src/synapse_reader/mod.rs +++ b/crates/syn2mas/src/synapse_reader/mod.rs @@ -235,13 +235,21 @@ pub struct SynapseRefreshToken { /// underneath our feet. It's still not a good idea to run Synapse at the same /// time as the migration. // TODO not complete! -const TABLES_TO_LOCK: &[&str] = &["users", "user_threepids", "user_external_ids"]; +const TABLES_TO_LOCK: &[&str] = &[ + "users", + "user_threepids", + "user_external_ids", + "devices", + "access_tokens", + "refresh_tokens", +]; /// Number of migratable rows in various Synapse tables. /// Used to estimate progress. #[derive(Clone, Debug)] pub struct SynapseRowCounts { pub users: i64, + pub devices: i64, } pub struct SynapseReader<'c> { @@ -312,7 +320,7 @@ impl<'conn> SynapseReader<'conn> { /// /// - An underlying database error pub async fn count_rows(&mut self) -> Result { - let users = sqlx::query( + let users: i64 = sqlx::query_scalar( " SELECT COUNT(1) FROM users WHERE appservice_id IS NULL AND is_guest = 0 @@ -320,11 +328,19 @@ impl<'conn> SynapseReader<'conn> { ) .fetch_one(&mut *self.txn) .await - .into_database("counting Synapse users")? - .try_get::(0) - .into_database("couldn't decode count of Synapse users table")?; + .into_database("counting Synapse users")?; - Ok(SynapseRowCounts { users }) + let devices = sqlx::query_scalar( + " + SELECT COUNT(1) FROM devices + WHERE NOT hidden + ", + ) + .fetch_one(&mut *self.txn) + .await + .into_database("counting Synapse devices")?; + + Ok(SynapseRowCounts { users, devices }) } /// Reads Synapse users, excluding application service users (which do not From 719681f3e7c4e7f6e9d5ef12e52e69b52395f924 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 29 Jan 2025 14:15:38 +0000 Subject: [PATCH 04/11] Add `SynapseReader` support for devices --- .../synapse_reader/fixtures/devices_alice.sql | 38 +++++++++++++++ crates/syn2mas/src/synapse_reader/mod.rs | 47 ++++++++++++++++++- ...s__synapse_reader__test__read_devices.snap | 26 ++++++++++ .../20250129140230_devices.sql | 15 ++++++ 4 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql create mode 100644 crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_devices.snap create mode 100644 crates/syn2mas/test_synapse_migrations/20250129140230_devices.sql diff --git a/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql new file mode 100644 index 000000000..c7f0691d6 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql @@ -0,0 +1,38 @@ +INSERT INTO devices + ( + user_id, + device_id, + display_name, + last_seen, + ip, + user_agent, + hidden + ) + VALUES + ( + '@alice:example.com', + 'ADEVICE', + 'Matrix Console', + 1623366000000, + '203.0.113.1', + 'Browser/5.0 (X12; ComputerOS 64; rv:1024.0)', + FALSE + ), + ( + '@alice:example.com', + 'master signing key', + NULL, + NULL, + NULL, + NULL, + TRUE + ), + ( + '@alice:example.com', + 'self_signing signing key', + NULL, + NULL, + NULL, + NULL, + TRUE + ); diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs index 9aa9bf754..bad012a1f 100644 --- a/crates/syn2mas/src/synapse_reader/mod.rs +++ b/crates/syn2mas/src/synapse_reader/mod.rs @@ -209,6 +209,17 @@ pub struct SynapseExternalId { pub external_id: String, } +/// Row of the `devices` table in Synapse. +#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)] +pub struct SynapseDevice { + pub user_id: FullUserId, + pub device_id: String, + pub display_name: Option, + pub last_seen: Option, + pub ip: Option, + pub user_agent: Option, +} + /// Row of the `access_tokens` table in Synapse. #[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)] pub struct SynapseAccessToken { @@ -387,6 +398,22 @@ impl<'conn> SynapseReader<'conn> { .map_err(|err| err.into_database("reading Synapse user external IDs")) } + /// Reads devices from the Synapse database. + /// Does not include so-called 'hidden' devices, which are just a mechanism + /// for storing various signing keys shared between the real devices. + pub fn read_devices(&mut self) -> impl Stream> + '_ { + sqlx::query_as( + " + SELECT + user_id, device_id, display_name, last_seen, ip, user_agent + FROM devices + WHERE NOT hidden + ", + ) + .fetch(&mut *self.txn) + .map_err(|err| err.into_database("reading Synapse devices")) + } + /// Reads access tokens from the Synapse database. /// This does not include access tokens used for puppetting users, as those /// are not supported by MAS. This also does not include access tokens @@ -446,8 +473,8 @@ mod test { use crate::{ synapse_reader::{ - SynapseAccessToken, SynapseExternalId, SynapseRefreshToken, SynapseThreepid, - SynapseUser, + SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshToken, + SynapseThreepid, SynapseUser, }, SynapseReader, }; @@ -503,6 +530,22 @@ mod test { assert_debug_snapshot!(external_ids); } + #[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice", "devices_alice"))] + async fn test_read_devices(pool: PgPool) { + let mut conn = pool.acquire().await.expect("failed to get connection"); + let mut reader = SynapseReader::new(&mut conn, false) + .await + .expect("failed to make SynapseReader"); + + let devices: BTreeSet = reader + .read_devices() + .try_collect() + .await + .expect("failed to read Synapse devices"); + + assert_debug_snapshot!(devices); + } + #[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice", "access_token_alice"))] async fn test_read_access_token(pool: PgPool) { let mut conn = pool.acquire().await.expect("failed to get connection"); diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_devices.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_devices.snap new file mode 100644 index 000000000..a8ca1dd61 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_devices.snap @@ -0,0 +1,26 @@ +--- +source: crates/syn2mas/src/synapse_reader/mod.rs +expression: devices +--- +{ + SynapseDevice { + user_id: FullUserId( + "@alice:example.com", + ), + device_id: "ADEVICE", + display_name: Some( + "Matrix Console", + ), + last_seen: Some( + MillisecondsTimestamp( + 2021-06-10T23:00:00Z, + ), + ), + ip: Some( + "203.0.113.1", + ), + user_agent: Some( + "Browser/5.0 (X12; ComputerOS 64; rv:1024.0)", + ), + }, +} diff --git a/crates/syn2mas/test_synapse_migrations/20250129140230_devices.sql b/crates/syn2mas/test_synapse_migrations/20250129140230_devices.sql new file mode 100644 index 000000000..8f9ae723b --- /dev/null +++ b/crates/syn2mas/test_synapse_migrations/20250129140230_devices.sql @@ -0,0 +1,15 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- Brings in the `devices` table from Synapse +CREATE TABLE devices ( + user_id text NOT NULL, + device_id text NOT NULL, + display_name text, + last_seen bigint, + ip text, + user_agent text, + hidden boolean DEFAULT false +); From 3034819b7d49318dbeb574fa1ca28373a183f578 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 29 Jan 2025 16:17:07 +0000 Subject: [PATCH 05/11] Introduce optional `human_name` column on `compat_sessions` --- crates/data-model/src/compat/session.rs | 1 + ...9cb16f9f8df08fa258cc907007fb9bcd0bc7.json} | 24 ++++++++++++------- ...0129154003_compat_sessions_device_name.sql | 9 +++++++ crates/storage-pg/src/app_session.rs | 8 +++++++ crates/storage-pg/src/compat/session.rs | 10 ++++++++ crates/storage-pg/src/iden.rs | 1 + 6 files changed, 44 insertions(+), 9 deletions(-) rename crates/storage-pg/.sqlx/{query-bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9.json => query-9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7.json} (67%) create mode 100644 crates/storage-pg/migrations/20250129154003_compat_sessions_device_name.sql diff --git a/crates/data-model/src/compat/session.rs b/crates/data-model/src/compat/session.rs index e07c0fb7d..fc660c3f0 100644 --- a/crates/data-model/src/compat/session.rs +++ b/crates/data-model/src/compat/session.rs @@ -72,6 +72,7 @@ pub struct CompatSession { pub state: CompatSessionState, pub user_id: Ulid, pub device: Option, + pub human_name: Option, pub user_session_id: Option, pub created_at: DateTime, pub is_synapse_admin: bool, diff --git a/crates/storage-pg/.sqlx/query-bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9.json b/crates/storage-pg/.sqlx/query-9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7.json similarity index 67% rename from crates/storage-pg/.sqlx/query-bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9.json rename to crates/storage-pg/.sqlx/query-9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7.json index 1360f4ba8..e6b0897f7 100644 --- a/crates/storage-pg/.sqlx/query-bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9.json +++ b/crates/storage-pg/.sqlx/query-9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT compat_session_id\n , device_id\n , user_id\n , user_session_id\n , created_at\n , finished_at\n , is_synapse_admin\n , user_agent\n , last_active_at\n , last_active_ip as \"last_active_ip: IpAddr\"\n FROM compat_sessions\n WHERE compat_session_id = $1\n ", + "query": "\n SELECT compat_session_id\n , device_id\n , human_name\n , user_id\n , user_session_id\n , created_at\n , finished_at\n , is_synapse_admin\n , user_agent\n , last_active_at\n , last_active_ip as \"last_active_ip: IpAddr\"\n FROM compat_sessions\n WHERE compat_session_id = $1\n ", "describe": { "columns": [ { @@ -15,41 +15,46 @@ }, { "ordinal": 2, + "name": "human_name", + "type_info": "Text" + }, + { + "ordinal": 3, "name": "user_id", "type_info": "Uuid" }, { - "ordinal": 3, + "ordinal": 4, "name": "user_session_id", "type_info": "Uuid" }, { - "ordinal": 4, + "ordinal": 5, "name": "created_at", "type_info": "Timestamptz" }, { - "ordinal": 5, + "ordinal": 6, "name": "finished_at", "type_info": "Timestamptz" }, { - "ordinal": 6, + "ordinal": 7, "name": "is_synapse_admin", "type_info": "Bool" }, { - "ordinal": 7, + "ordinal": 8, "name": "user_agent", "type_info": "Text" }, { - "ordinal": 8, + "ordinal": 9, "name": "last_active_at", "type_info": "Timestamptz" }, { - "ordinal": 9, + "ordinal": 10, "name": "last_active_ip: IpAddr", "type_info": "Inet" } @@ -62,6 +67,7 @@ "nullable": [ false, true, + true, false, true, false, @@ -72,5 +78,5 @@ true ] }, - "hash": "bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9" + "hash": "9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7" } diff --git a/crates/storage-pg/migrations/20250129154003_compat_sessions_device_name.sql b/crates/storage-pg/migrations/20250129154003_compat_sessions_device_name.sql new file mode 100644 index 000000000..28294d5db --- /dev/null +++ b/crates/storage-pg/migrations/20250129154003_compat_sessions_device_name.sql @@ -0,0 +1,9 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +ALTER TABLE compat_sessions + -- Stores a human-readable name for the device. + -- syn2mas behaviour: Will be populated from the device name in Synapse. + ADD COLUMN human_name TEXT; diff --git a/crates/storage-pg/src/app_session.rs b/crates/storage-pg/src/app_session.rs index 7a71036c9..b812e37e9 100644 --- a/crates/storage-pg/src/app_session.rs +++ b/crates/storage-pg/src/app_session.rs @@ -64,6 +64,7 @@ mod priv_ { pub(super) user_id: Option, pub(super) scope_list: Option>, pub(super) device_id: Option, + pub(super) human_name: Option, pub(super) created_at: DateTime, pub(super) finished_at: Option>, pub(super) is_synapse_admin: Option, @@ -91,6 +92,7 @@ impl TryFrom for AppSession { user_id, scope_list, device_id, + human_name, created_at, finished_at, is_synapse_admin, @@ -141,6 +143,7 @@ impl TryFrom for AppSession { state, user_id: user_id.into(), device, + human_name, user_session_id, created_at, is_synapse_admin, @@ -294,6 +297,7 @@ impl AppSessionRepository for PgAppSessionRepository<'_> { AppSessionLookupIden::ScopeList, ) .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId) + .expr_as(Expr::cust("NULL"), AppSessionLookupIden::HumanName) .expr_as( Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)), AppSessionLookupIden::CreatedAt, @@ -343,6 +347,10 @@ impl AppSessionRepository for PgAppSessionRepository<'_> { Expr::col((CompatSessions::Table, CompatSessions::DeviceId)), AppSessionLookupIden::DeviceId, ) + .expr_as( + Expr::col((CompatSessions::Table, CompatSessions::HumanName)), + AppSessionLookupIden::HumanName, + ) .expr_as( Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)), AppSessionLookupIden::CreatedAt, diff --git a/crates/storage-pg/src/compat/session.rs b/crates/storage-pg/src/compat/session.rs index de5ea6b2f..7d9fa1264 100644 --- a/crates/storage-pg/src/compat/session.rs +++ b/crates/storage-pg/src/compat/session.rs @@ -48,6 +48,7 @@ impl<'c> PgCompatSessionRepository<'c> { struct CompatSessionLookup { compat_session_id: Uuid, device_id: Option, + human_name: Option, user_id: Uuid, user_session_id: Option, created_at: DateTime, @@ -85,6 +86,7 @@ impl TryFrom for CompatSession { user_id: value.user_id.into(), user_session_id: value.user_session_id.map(Ulid::from), device, + human_name: value.human_name, created_at: value.created_at, is_synapse_admin: value.is_synapse_admin, user_agent: value.user_agent.map(UserAgent::parse), @@ -101,6 +103,7 @@ impl TryFrom for CompatSession { struct CompatSessionAndSsoLoginLookup { compat_session_id: Uuid, device_id: Option, + human_name: Option, user_id: Uuid, user_session_id: Option, created_at: DateTime, @@ -143,6 +146,7 @@ impl TryFrom for (CompatSession, Option { r#" SELECT compat_session_id , device_id + , human_name , user_id , user_session_id , created_at @@ -356,6 +361,7 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { state: CompatSessionState::default(), user_id: user.id, device: Some(device), + human_name: None, user_session_id: browser_session.map(|s| s.id), created_at, is_synapse_admin, @@ -453,6 +459,10 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { Expr::col((CompatSessions::Table, CompatSessions::DeviceId)), CompatSessionAndSsoLoginLookupIden::DeviceId, ) + .expr_as( + Expr::col((CompatSessions::Table, CompatSessions::HumanName)), + CompatSessionAndSsoLoginLookupIden::HumanName, + ) .expr_as( Expr::col((CompatSessions::Table, CompatSessions::UserId)), CompatSessionAndSsoLoginLookupIden::UserId, diff --git a/crates/storage-pg/src/iden.rs b/crates/storage-pg/src/iden.rs index 8393cb247..951764806 100644 --- a/crates/storage-pg/src/iden.rs +++ b/crates/storage-pg/src/iden.rs @@ -43,6 +43,7 @@ pub enum CompatSessions { CompatSessionId, UserId, DeviceId, + HumanName, UserSessionId, CreatedAt, FinishedAt, From 0cd8106624fdc8834ecda1cf27a793ad701bb567 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 29 Jan 2025 15:37:09 +0000 Subject: [PATCH 06/11] Add `MasWriter` support for compat sessions --- ...e36f4ef03e1224a0a89a921e5a3d398a5d35c.json | 22 +++ crates/syn2mas/src/mas_writer/mod.rs | 134 ++++++++++++++- ..._writer__test__write_user_with_device.snap | 23 +++ .../syn2mas_revert_temporary_tables.sql | 3 + .../mas_writer/syn2mas_temporary_tables.sql | 3 + crates/syn2mas/src/migration.rs | 157 ++++++++++++++---- 6 files changed, 310 insertions(+), 32 deletions(-) create mode 100644 crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json create mode 100644 crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap diff --git a/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json b/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json new file mode 100644 index 000000000..521e4facd --- /dev/null +++ b/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__compat_sessions (\n compat_session_id, user_id,\n device_id, human_name,\n created_at, is_synapse_admin,\n last_active_at, last_active_ip,\n user_agent)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::UUID[],\n $3::TEXT[], $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],\n $7::TIMESTAMP WITH TIME ZONE[], $8::INET[],\n $9::TEXT[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TextArray", + "TimestamptzArray", + "BoolArray", + "TimestamptzArray", + "InetArray", + "TextArray" + ] + }, + "nullable": [] + }, + "hash": "396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c" +} diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index f46a10399..f54405933 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -7,7 +7,7 @@ //! //! This module is responsible for writing new records to MAS' database. -use std::fmt::Display; +use std::{fmt::Display, net::IpAddr}; use chrono::{DateTime, Utc}; use futures_util::{future::BoxFuture, FutureExt, TryStreamExt}; @@ -230,6 +230,18 @@ pub struct MasNewUpstreamOauthLink { pub created_at: DateTime, } +pub struct MasNewCompatSession { + pub session_id: Uuid, + pub user_id: Uuid, + pub device_id: String, + pub human_name: Option, + pub created_at: DateTime, + pub is_synapse_admin: bool, + pub last_active_at: Option>, + pub last_active_ip: Option, + pub user_agent: Option, +} + /// The 'version' of the password hashing scheme used for passwords when they /// are migrated from Synapse to MAS. /// This is version 1, as in the previous syn2mas script. @@ -761,6 +773,85 @@ impl<'conn> MasWriter<'conn> { }) }).boxed() } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub fn write_compat_sessions( + &mut self, + sessions: Vec, + ) -> BoxFuture<'_, Result<(), Error>> { + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + let mut session_ids: Vec = Vec::with_capacity(sessions.len()); + let mut user_ids: Vec = Vec::with_capacity(sessions.len()); + let mut device_ids: Vec = Vec::with_capacity(sessions.len()); + let mut human_names: Vec> = Vec::with_capacity(sessions.len()); + let mut created_ats: Vec> = Vec::with_capacity(sessions.len()); + let mut is_synapse_admins: Vec = Vec::with_capacity(sessions.len()); + let mut last_active_ats: Vec>> = + Vec::with_capacity(sessions.len()); + let mut last_active_ips: Vec> = + Vec::with_capacity(sessions.len()); + let mut user_agents: Vec> = Vec::with_capacity(sessions.len()); + + for MasNewCompatSession { + session_id, + user_id, + device_id, + human_name, + created_at, + is_synapse_admin, + last_active_at, + last_active_ip, + user_agent, + } in sessions + { + session_ids.push(session_id); + user_ids.push(user_id); + device_ids.push(device_id); + human_names.push(human_name); + created_ats.push(created_at); + is_synapse_admins.push(is_synapse_admin); + last_active_ats.push(last_active_at); + last_active_ips.push(last_active_ip); + user_agents.push(user_agent); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_sessions ( + compat_session_id, user_id, + device_id, human_name, + created_at, is_synapse_admin, + last_active_at, last_active_ip, + user_agent) + SELECT * FROM UNNEST( + $1::UUID[], $2::UUID[], + $3::TEXT[], $4::TEXT[], + $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[], + $7::TIMESTAMP WITH TIME ZONE[], $8::INET[], + $9::TEXT[]) + "#, + &session_ids[..], + &user_ids[..], + &device_ids[..], + &human_names[..] as &[Option], + &created_ats[..], + &is_synapse_admins[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &last_active_ats[..] as &[Option>], + &last_active_ips[..] as &[Option], + &user_agents[..] as &[Option], + ) + .execute(&mut *conn) + .await + .into_database("writing compat sessions to MAS")?; + + Ok(()) + }) + }) + .boxed() + } } // How many entries to buffer at once, before writing a batch of rows to the @@ -839,8 +930,8 @@ mod test { use crate::{ mas_writer::{ - MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, - MasNewUserPassword, + MasNewCompatSession, MasNewEmailThreepid, MasNewUnsupportedThreepid, + MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, }, LockedMasDatabase, MasWriter, }; @@ -1105,4 +1196,41 @@ mod test { assert_db_snapshot!(&mut conn); } + + /// Tests writing a single user, with a device (compat session). + #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] + async fn test_write_user_with_device(pool: PgPool) { + let mut conn = pool.acquire().await.unwrap(); + let mut writer = make_mas_writer(&pool, &mut conn).await; + + writer + .write_users(vec![MasNewUser { + user_id: Uuid::from_u128(1u128), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + can_request_admin: false, + }]) + .await + .expect("failed to write user"); + + writer + .write_compat_sessions(vec![MasNewCompatSession { + user_id: Uuid::from_u128(1u128), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: "ADEVICE".to_owned(), + human_name: Some("alice's pinephone".to_owned()), + is_synapse_admin: true, + last_active_at: Some(DateTime::default()), + last_active_ip: Some("203.0.113.1".parse().unwrap()), + user_agent: Some("Browser/5.0".to_owned()), + }]) + .await + .expect("failed to write link"); + + writer.finish().await.expect("failed to finish MasWriter"); + + assert_db_snapshot!(&mut conn); + } } diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap new file mode 100644 index 000000000..6e9dd8e14 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap @@ -0,0 +1,23 @@ +--- +source: crates/syn2mas/src/mas_writer/mod.rs +expression: db_snapshot +--- +compat_sessions: + - compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + device_id: ADEVICE + finished_at: ~ + human_name: "alice's pinephone" + is_synapse_admin: "true" + last_active_at: "1970-01-01 00:00:00+00" + last_active_ip: 203.0.113.1/32 + user_agent: Browser/5.0 + user_id: 00000000-0000-0000-0000-000000000001 + user_session_id: ~ +users: + - can_request_admin: "false" + created_at: "1970-01-01 00:00:00+00" + locked_at: ~ + primary_user_email_id: ~ + user_id: 00000000-0000-0000-0000-000000000001 + username: alice diff --git a/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql index ee27b6ba8..e1df06f94 100644 --- a/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql +++ b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql @@ -13,3 +13,6 @@ ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords; ALTER TABLE syn2mas__user_emails RENAME TO user_emails; ALTER TABLE syn2mas__user_unsupported_third_party_ids RENAME TO user_unsupported_third_party_ids; ALTER TABLE syn2mas__upstream_oauth_links RENAME TO upstream_oauth_links; +ALTER TABLE syn2mas__compat_sessions RENAME TO compat_sessions; +ALTER TABLE syn2mas__compat_access_tokens RENAME TO compat_access_tokens; +ALTER TABLE syn2mas__compat_refresh_tokens RENAME TO compat_refresh_tokens; diff --git a/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql index 4d07d2469..9cda82881 100644 --- a/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql +++ b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql @@ -42,3 +42,6 @@ ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords; ALTER TABLE user_emails RENAME TO syn2mas__user_emails; ALTER TABLE user_unsupported_third_party_ids RENAME TO syn2mas__user_unsupported_third_party_ids; ALTER TABLE upstream_oauth_links RENAME TO syn2mas__upstream_oauth_links; +ALTER TABLE compat_sessions RENAME TO syn2mas__compat_sessions; +ALTER TABLE compat_access_tokens RENAME TO syn2mas__compat_access_tokens; +ALTER TABLE compat_refresh_tokens RENAME TO syn2mas__compat_refresh_tokens; diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 9155ccf65..05b25551b 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -11,7 +11,10 @@ //! This module does not implement any of the safety checks that should be run //! *before* the migration. -use std::{collections::HashMap, pin::pin}; +use std::{ + collections::{HashMap, HashSet}, + pin::pin, +}; use chrono::{DateTime, Utc}; use compact_str::CompactString; @@ -25,11 +28,12 @@ use uuid::Uuid; use crate::{ mas_writer::{ - self, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, - MasNewUserPassword, MasWriteBuffer, MasWriter, + self, MasNewCompatSession, MasNewEmailThreepid, MasNewUnsupportedThreepid, + MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, MasWriteBuffer, MasWriter, }, synapse_reader::{ - self, ExtractLocalpartError, FullUserId, SynapseExternalId, SynapseThreepid, SynapseUser, + self, ExtractLocalpartError, FullUserId, SynapseDevice, SynapseExternalId, SynapseThreepid, + SynapseUser, }, SynapseReader, }; @@ -66,12 +70,9 @@ pub enum Error { struct UsersMigrated { /// Lookup table from user localpart to that user's UUID in MAS. user_localparts_to_uuid: HashMap, -} -struct DevicesMigrated { - /// Lookup table from `(user_id, device_id)` pairs to the - /// UUID of the `compat_session` in MAS - devices_to_uuid: HashMap<(CompactString, CompactString), Uuid>, + /// Set of user UUIDs that correspond to Synapse admins + synapse_admins: HashSet, } /// Performs a migration from Synapse's database to MAS' database. @@ -127,18 +128,14 @@ pub async fn migrate( ) .await?; - let migrated_devices = migrate_devices( - synapse, - mas, - counts - .devices - .try_into() - .expect("More than usize::MAX devices — unable to handle this many!"), - server_name, - rng, - &migrated_users.user_localparts_to_uuid, - ) - .await?; + // `(MAS user_id, device_id)` mapped to `compat_session` ULID + let mut devices_to_compat_sessions: HashMap<(Uuid, CompactString), Uuid> = + HashMap::with_capacity( + counts + .devices + .try_into() + .expect("More than usize::MAX devices — unable to handle this many!"), + ); migrate_access_and_refresh_tokens( synapse, @@ -146,7 +143,18 @@ pub async fn migrate( server_name, rng, &migrated_users.user_localparts_to_uuid, - &migrated_devices.devices_to_uuid, + &mut devices_to_compat_sessions, + ) + .await?; + + migrate_devices( + synapse, + mas, + server_name, + rng, + &migrated_users.user_localparts_to_uuid, + &mut devices_to_compat_sessions, + &migrated_users.synapse_admins, ) .await?; @@ -166,11 +174,19 @@ async fn migrate_users( let mut users_stream = pin!(synapse.read_users()); // TODO is 1:1 capacity enough for a hashmap? let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint); + let mut synapse_admins = HashSet::new(); while let Some(user_res) = users_stream.next().await { let user = user_res.into_synapse("reading user")?; let (mas_user, mas_password_opt) = transform_user(&user, server_name, rng)?; + if bool::from(user.admin) { + // Note down the fact that this user is a Synapse admin, + // because we will grant their existing devices the Synapse admin + // flag + synapse_admins.insert(mas_user.user_id); + } + user_localparts_to_uuid.insert(CompactString::new(&mas_user.username), mas_user.user_id); user_buffer @@ -194,6 +210,7 @@ async fn migrate_users( Ok(UsersMigrated { user_localparts_to_uuid, + synapse_admins, }) } @@ -341,21 +358,100 @@ async fn migrate_external_ids( Ok(()) } + +/// Migrate devices from Synapse to MAS (as compat sessions). +/// +/// In order to get the right session creation timestamps, the access tokens +/// must counterintuitively be migrated first, with the ULIDs passed in as +/// `devices`. +/// +/// This is because only access tokens store a timestamp that in any way +/// resembles a creation timestamp. #[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_devices( synapse: &mut SynapseReader<'_>, mas: &mut MasWriter<'_>, - device_count_hint: usize, server_name: &str, rng: &mut impl RngCore, user_localparts_to_uuid: &HashMap, -) -> Result { - // TODO is 1:1 enough capacity for a HashMap? - let mut devices_to_uuid = HashMap::with_capacity(device_count_hint); + devices: &mut HashMap<(Uuid, CompactString), Uuid>, + synapse_admins: &HashSet, +) -> Result<(), Error> { + let mut devices_stream = pin!(synapse.read_devices()); + let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions); - todo!(); + while let Some(device_res) = devices_stream.next().await { + let SynapseDevice { + user_id: synapse_user_id, + device_id, + display_name, + last_seen, + ip, + user_agent, + } = device_res.into_synapse("reading Synapse device")?; - Ok(DevicesMigrated { devices_to_uuid }) + let username = synapse_user_id + .extract_localpart(server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + return Err(Error::MissingUserFromDependentTable { + table: "devices".to_owned(), + user: synapse_user_id, + }); + }; + + let session_id = *devices + .entry((user_id, CompactString::new(&device_id))) + .or_insert_with(|| + // We don't have a creation time for this device (as it has no access token), + // so use now as a least-evil fallback. + Ulid::with_source(rng).into()); + let created_at = Ulid::from(session_id).datetime().into(); + + // As we're using a real IP type in the MAS database, it is possible + // that we encounter invalid IP addresses in the Synapse database. + // In that case, we should ignore them, but still log a warning. + let last_active_ip = ip.and_then(|ip| { + ip.parse() + .map_err(|e| { + tracing::warn!( + error = &e as &dyn std::error::Error, + mxid = %synapse_user_id, + %device_id, + %ip, + "Failed to parse device IP, ignoring" + ); + }) + .ok() + }); + + // TODO skip access tokens for deactivated users + write_buffer + .write( + mas, + MasNewCompatSession { + session_id, + user_id, + device_id, + human_name: display_name, + created_at, + is_synapse_admin: synapse_admins.contains(&user_id), + last_active_at: last_seen.map(DateTime::from), + last_active_ip, + user_agent, + }, + ) + .await + .into_mas("writing compat sessions")?; + } + + write_buffer + .finish(mas) + .await + .into_mas("writing compat sessions")?; + + Ok(()) } #[tracing::instrument(skip_all, level = Level::INFO)] @@ -365,8 +461,11 @@ async fn migrate_access_and_refresh_tokens( server_name: &str, rng: &mut impl RngCore, user_localparts_to_uuid: &HashMap, - devices: &HashMap<(CompactString, CompactString), Uuid>, + devices: &mut HashMap<(Uuid, CompactString), Uuid>, ) -> Result<(), Error> { + let mut access_token_stream = pin!(synapse.read_access_tokens()); + // let mut write_buffer = + // MasWriteBuffer::new(MasWriter::write_compat_access_token); todo!(); Ok(()) From 900413cc4331e97be55e9d71010a83781aca82d4 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 30 Jan 2025 11:56:46 +0000 Subject: [PATCH 07/11] Add `MasWriter` support for compat access tokens --- ...ad976c3a0ff238046872b17d3f412beda62c7.json | 18 +++ crates/syn2mas/src/mas_writer/mod.rs | 127 +++++++++++++++++- ...r__test__write_user_with_access_token.snap | 29 ++++ 3 files changed, 171 insertions(+), 3 deletions(-) create mode 100644 crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json create mode 100644 crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap diff --git a/crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json b/crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json new file mode 100644 index 000000000..eb406d23b --- /dev/null +++ b/crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__compat_access_tokens (\n compat_access_token_id,\n compat_session_id,\n access_token,\n created_at,\n expires_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::TEXT[],\n $4::TIMESTAMP WITH TIME ZONE[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7" +} diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index f54405933..d9b001bd3 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -242,6 +242,14 @@ pub struct MasNewCompatSession { pub user_agent: Option, } +pub struct MasNewCompatAccessToken { + pub token_id: Uuid, + pub session_id: Uuid, + pub access_token: String, + pub created_at: DateTime, + pub expires_at: Option>, +} + /// The 'version' of the password hashing scheme used for passwords when they /// are migrated from Synapse to MAS. /// This is version 1, as in the previous syn2mas script. @@ -255,6 +263,9 @@ pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &[ "user_emails", "user_unsupported_third_party_ids", "upstream_oauth_links", + "compat_sessions", + "compat_access_tokens", + "compat_refresh_tokens", ]; /// Detect whether a syn2mas migration has started on the given database. @@ -852,6 +863,68 @@ impl<'conn> MasWriter<'conn> { }) .boxed() } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub fn write_compat_access_tokens( + &mut self, + tokens: Vec, + ) -> BoxFuture<'_, Result<(), Error>> { + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + let mut token_ids: Vec = Vec::with_capacity(tokens.len()); + let mut session_ids: Vec = Vec::with_capacity(tokens.len()); + let mut access_tokens: Vec = Vec::with_capacity(tokens.len()); + let mut created_ats: Vec> = Vec::with_capacity(tokens.len()); + let mut expires_ats: Vec>> = + Vec::with_capacity(tokens.len()); + + for MasNewCompatAccessToken { + token_id, + session_id, + access_token, + created_at, + expires_at, + } in tokens + { + token_ids.push(token_id); + session_ids.push(session_id); + access_tokens.push(access_token); + created_ats.push(created_at); + expires_ats.push(expires_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_access_tokens ( + compat_access_token_id, + compat_session_id, + access_token, + created_at, + expires_at) + SELECT * FROM UNNEST( + $1::UUID[], + $2::UUID[], + $3::TEXT[], + $4::TIMESTAMP WITH TIME ZONE[], + $5::TIMESTAMP WITH TIME ZONE[]) + "#, + &token_ids[..], + &session_ids[..], + &access_tokens[..], + &created_ats[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &expires_ats[..] as &[Option>], + ) + .execute(&mut *conn) + .await + .into_database("writing compat access tokens to MAS")?; + + Ok(()) + }) + }) + .boxed() + } } // How many entries to buffer at once, before writing a batch of rows to the @@ -930,8 +1003,8 @@ mod test { use crate::{ mas_writer::{ - MasNewCompatSession, MasNewEmailThreepid, MasNewUnsupportedThreepid, - MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, + MasNewCompatAccessToken, MasNewCompatSession, MasNewEmailThreepid, + MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, }, LockedMasDatabase, MasWriter, }; @@ -1227,7 +1300,55 @@ mod test { user_agent: Some("Browser/5.0".to_owned()), }]) .await - .expect("failed to write link"); + .expect("failed to write compat session"); + + writer.finish().await.expect("failed to finish MasWriter"); + + assert_db_snapshot!(&mut conn); + } + + /// Tests writing a single user, with a device and an access token. + #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] + async fn test_write_user_with_access_token(pool: PgPool) { + let mut conn = pool.acquire().await.unwrap(); + let mut writer = make_mas_writer(&pool, &mut conn).await; + + writer + .write_users(vec![MasNewUser { + user_id: Uuid::from_u128(1u128), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + can_request_admin: false, + }]) + .await + .expect("failed to write user"); + + writer + .write_compat_sessions(vec![MasNewCompatSession { + user_id: Uuid::from_u128(1u128), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: "ADEVICE".to_owned(), + human_name: None, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }]) + .await + .expect("failed to write compat session"); + + writer + .write_compat_access_tokens(vec![MasNewCompatAccessToken { + token_id: Uuid::from_u128(6u128), + session_id: Uuid::from_u128(5u128), + access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + expires_at: None, + }]) + .await + .expect("failed to write access token"); writer.finish().await.expect("failed to finish MasWriter"); diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap new file mode 100644 index 000000000..81cb99515 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap @@ -0,0 +1,29 @@ +--- +source: crates/syn2mas/src/mas_writer/mod.rs +expression: db_snapshot +--- +compat_access_tokens: + - access_token: syt_zxcvzxcvzxcvzxcv_zxcv + compat_access_token_id: 00000000-0000-0000-0000-000000000006 + compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + expires_at: ~ +compat_sessions: + - compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + device_id: ADEVICE + finished_at: ~ + human_name: ~ + is_synapse_admin: "false" + last_active_at: ~ + last_active_ip: ~ + user_agent: ~ + user_id: 00000000-0000-0000-0000-000000000001 + user_session_id: ~ +users: + - can_request_admin: "false" + created_at: "1970-01-01 00:00:00+00" + locked_at: ~ + primary_user_email_id: ~ + user_id: 00000000-0000-0000-0000-000000000001 + username: alice From 73ccecf8e883b804d8ec726176d3d1ed9bde3309 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 30 Jan 2025 16:33:19 +0000 Subject: [PATCH 08/11] Add `MasWriter` support for compat refresh tokens + some migration progress --- Cargo.lock | 1 + crates/cli/src/commands/syn2mas.rs | 2 + ...e5d8cac3836701fc24922f4f0e8b98d330796.json | 18 ++ crates/syn2mas/Cargo.toml | 1 + crates/syn2mas/src/mas_writer/mod.rs | 143 ++++++++++++++- ...__test__write_user_with_refresh_token.snap | 36 ++++ crates/syn2mas/src/migration.rs | 164 ++++++++++++++++-- crates/syn2mas/src/synapse_reader/mod.rs | 5 +- ..._test__read_access_and_refresh_tokens.snap | 3 - ...napse_reader__test__read_access_token.snap | 1 - 10 files changed, 349 insertions(+), 25 deletions(-) create mode 100644 crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json create mode 100644 crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap diff --git a/Cargo.lock b/Cargo.lock index 5d39fe716..2b325bc96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6113,6 +6113,7 @@ dependencies = [ "futures-util", "insta", "mas-config", + "mas-storage", "mas-storage-pg", "rand", "serde", diff --git a/crates/cli/src/commands/syn2mas.rs b/crates/cli/src/commands/syn2mas.rs index e9bd199bf..63fe440eb 100644 --- a/crates/cli/src/commands/syn2mas.rs +++ b/crates/cli/src/commands/syn2mas.rs @@ -223,6 +223,7 @@ impl Options { } let mut writer = MasWriter::new(mas_connection, writer_mas_connections).await?; + let clock = SystemClock::default(); // TODO is this rng ok? #[allow(clippy::disallowed_methods)] let mut rng = thread_rng(); @@ -233,6 +234,7 @@ impl Options { &mut reader, &mut writer, &mas_matrix.homeserver, + &clock, &mut rng, &provider_id_mappings, ) diff --git a/crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json b/crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json new file mode 100644 index 000000000..cb251624d --- /dev/null +++ b/crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__compat_refresh_tokens (\n compat_refresh_token_id,\n compat_session_id,\n compat_access_token_id,\n refresh_token,\n created_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::UUID[],\n $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796" +} diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml index a7075c7f0..2fc8dd777 100644 --- a/crates/syn2mas/Cargo.toml +++ b/crates/syn2mas/Cargo.toml @@ -28,6 +28,7 @@ uuid = "1.10.0" ulid = { workspace = true, features = ["uuid"] } mas-config.workspace = true +mas-storage.workspace = true [dev-dependencies] mas-storage-pg.workspace = true diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index d9b001bd3..6706b15b2 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -233,7 +233,7 @@ pub struct MasNewUpstreamOauthLink { pub struct MasNewCompatSession { pub session_id: Uuid, pub user_id: Uuid, - pub device_id: String, + pub device_id: Option, pub human_name: Option, pub created_at: DateTime, pub is_synapse_admin: bool, @@ -250,6 +250,14 @@ pub struct MasNewCompatAccessToken { pub expires_at: Option>, } +pub struct MasNewCompatRefreshToken { + pub refresh_token_id: Uuid, + pub session_id: Uuid, + pub access_token_id: Uuid, + pub refresh_token: String, + pub created_at: DateTime, +} + /// The 'version' of the password hashing scheme used for passwords when they /// are migrated from Synapse to MAS. /// This is version 1, as in the previous syn2mas script. @@ -795,7 +803,7 @@ impl<'conn> MasWriter<'conn> { Box::pin(async move { let mut session_ids: Vec = Vec::with_capacity(sessions.len()); let mut user_ids: Vec = Vec::with_capacity(sessions.len()); - let mut device_ids: Vec = Vec::with_capacity(sessions.len()); + let mut device_ids: Vec> = Vec::with_capacity(sessions.len()); let mut human_names: Vec> = Vec::with_capacity(sessions.len()); let mut created_ats: Vec> = Vec::with_capacity(sessions.len()); let mut is_synapse_admins: Vec = Vec::with_capacity(sessions.len()); @@ -845,7 +853,7 @@ impl<'conn> MasWriter<'conn> { "#, &session_ids[..], &user_ids[..], - &device_ids[..], + &device_ids[..] as &[Option], &human_names[..] as &[Option], &created_ats[..], &is_synapse_admins[..], @@ -925,6 +933,66 @@ impl<'conn> MasWriter<'conn> { }) .boxed() } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub fn write_compat_refresh_tokens( + &mut self, + tokens: Vec, + ) -> BoxFuture<'_, Result<(), Error>> { + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + let mut refresh_token_ids: Vec = Vec::with_capacity(tokens.len()); + let mut session_ids: Vec = Vec::with_capacity(tokens.len()); + let mut access_token_ids: Vec = Vec::with_capacity(tokens.len()); + let mut refresh_tokens: Vec = Vec::with_capacity(tokens.len()); + let mut created_ats: Vec> = Vec::with_capacity(tokens.len()); + + for MasNewCompatRefreshToken { + refresh_token_id, + session_id, + access_token_id, + refresh_token, + created_at, + } in tokens + { + refresh_token_ids.push(refresh_token_id); + session_ids.push(session_id); + access_token_ids.push(access_token_id); + refresh_tokens.push(refresh_token); + created_ats.push(created_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_refresh_tokens ( + compat_refresh_token_id, + compat_session_id, + compat_access_token_id, + refresh_token, + created_at) + SELECT * FROM UNNEST( + $1::UUID[], + $2::UUID[], + $3::UUID[], + $4::TEXT[], + $5::TIMESTAMP WITH TIME ZONE[]) + "#, + &refresh_token_ids[..], + &session_ids[..], + &access_token_ids[..], + &refresh_tokens[..], + &created_ats[..], + ) + .execute(&mut *conn) + .await + .into_database("writing compat refresh tokens to MAS")?; + + Ok(()) + }) + }) + .boxed() + } } // How many entries to buffer at once, before writing a batch of rows to the @@ -1003,8 +1071,9 @@ mod test { use crate::{ mas_writer::{ - MasNewCompatAccessToken, MasNewCompatSession, MasNewEmailThreepid, - MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, + MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession, + MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, + MasNewUserPassword, }, LockedMasDatabase, MasWriter, }; @@ -1292,7 +1361,7 @@ mod test { user_id: Uuid::from_u128(1u128), session_id: Uuid::from_u128(5u128), created_at: DateTime::default(), - device_id: "ADEVICE".to_owned(), + device_id: Some("ADEVICE".to_owned()), human_name: Some("alice's pinephone".to_owned()), is_synapse_admin: true, last_active_at: Some(DateTime::default()), @@ -1329,7 +1398,7 @@ mod test { user_id: Uuid::from_u128(1u128), session_id: Uuid::from_u128(5u128), created_at: DateTime::default(), - device_id: "ADEVICE".to_owned(), + device_id: Some("ADEVICE".to_owned()), human_name: None, is_synapse_admin: false, last_active_at: None, @@ -1354,4 +1423,64 @@ mod test { assert_db_snapshot!(&mut conn); } + + /// Tests writing a single user, with a device, an access token and a + /// refresh token. + #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] + async fn test_write_user_with_refresh_token(pool: PgPool) { + let mut conn = pool.acquire().await.unwrap(); + let mut writer = make_mas_writer(&pool, &mut conn).await; + + writer + .write_users(vec![MasNewUser { + user_id: Uuid::from_u128(1u128), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + can_request_admin: false, + }]) + .await + .expect("failed to write user"); + + writer + .write_compat_sessions(vec![MasNewCompatSession { + user_id: Uuid::from_u128(1u128), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: None, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }]) + .await + .expect("failed to write compat session"); + + writer + .write_compat_access_tokens(vec![MasNewCompatAccessToken { + token_id: Uuid::from_u128(6u128), + session_id: Uuid::from_u128(5u128), + access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + expires_at: None, + }]) + .await + .expect("failed to write access token"); + + writer + .write_compat_refresh_tokens(vec![MasNewCompatRefreshToken { + refresh_token_id: Uuid::from_u128(7u128), + session_id: Uuid::from_u128(5u128), + access_token_id: Uuid::from_u128(6u128), + refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + }]) + .await + .expect("failed to write refresh token"); + + writer.finish().await.expect("failed to finish MasWriter"); + + assert_db_snapshot!(&mut conn); + } } diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap new file mode 100644 index 000000000..0e7714f96 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap @@ -0,0 +1,36 @@ +--- +source: crates/syn2mas/src/mas_writer/mod.rs +expression: db_snapshot +--- +compat_access_tokens: + - access_token: syt_zxcvzxcvzxcvzxcv_zxcv + compat_access_token_id: 00000000-0000-0000-0000-000000000006 + compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + expires_at: ~ +compat_refresh_tokens: + - compat_access_token_id: 00000000-0000-0000-0000-000000000006 + compat_refresh_token_id: 00000000-0000-0000-0000-000000000007 + compat_session_id: 00000000-0000-0000-0000-000000000005 + consumed_at: ~ + created_at: "1970-01-01 00:00:00+00" + refresh_token: syr_zxcvzxcvzxcvzxcv_zxcv +compat_sessions: + - compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + device_id: ADEVICE + finished_at: ~ + human_name: ~ + is_synapse_admin: "false" + last_active_at: ~ + last_active_ip: ~ + user_agent: ~ + user_id: 00000000-0000-0000-0000-000000000001 + user_session_id: ~ +users: + - can_request_admin: "false" + created_at: "1970-01-01 00:00:00+00" + locked_at: ~ + primary_user_email_id: ~ + user_id: 00000000-0000-0000-0000-000000000001 + username: alice diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 05b25551b..2b0533c00 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -19,6 +19,7 @@ use std::{ use chrono::{DateTime, Utc}; use compact_str::CompactString; use futures_util::StreamExt as _; +use mas_storage::Clock; use rand::RngCore; use thiserror::Error; use thiserror_ext::ContextInto; @@ -28,12 +29,13 @@ use uuid::Uuid; use crate::{ mas_writer::{ - self, MasNewCompatSession, MasNewEmailThreepid, MasNewUnsupportedThreepid, - MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, MasWriteBuffer, MasWriter, + self, MasNewCompatAccessToken, MasNewCompatSession, MasNewEmailThreepid, + MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, + MasWriteBuffer, MasWriter, }, synapse_reader::{ - self, ExtractLocalpartError, FullUserId, SynapseDevice, SynapseExternalId, SynapseThreepid, - SynapseUser, + self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice, + SynapseExternalId, SynapseRefreshToken, SynapseThreepid, SynapseUser, }, SynapseReader, }; @@ -92,6 +94,7 @@ pub async fn migrate( synapse: &mut SynapseReader<'_>, mas: &mut MasWriter<'_>, server_name: &str, + clock: &dyn Clock, rng: &mut impl RngCore, provider_id_mapping: &HashMap, ) -> Result<(), Error> { @@ -137,7 +140,18 @@ pub async fn migrate( .expect("More than usize::MAX devices — unable to handle this many!"), ); - migrate_access_and_refresh_tokens( + migrate_access_tokens( + synapse, + mas, + server_name, + clock, + rng, + &migrated_users.user_localparts_to_uuid, + &mut devices_to_compat_sessions, + ) + .await?; + + migrate_refresh_tokens( synapse, mas, server_name, @@ -433,7 +447,7 @@ async fn migrate_devices( MasNewCompatSession { session_id, user_id, - device_id, + device_id: Some(device_id), human_name: display_name, created_at, is_synapse_admin: synapse_admins.contains(&user_id), @@ -455,7 +469,110 @@ async fn migrate_devices( } #[tracing::instrument(skip_all, level = Level::INFO)] -async fn migrate_access_and_refresh_tokens( +async fn migrate_access_tokens( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + server_name: &str, + clock: &dyn Clock, + rng: &mut impl RngCore, + user_localparts_to_uuid: &HashMap, + devices: &mut HashMap<(Uuid, CompactString), Uuid>, +) -> Result<(), Error> { + let mut token_stream = pin!(synapse.read_access_tokens()); + let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens); + let mut deviceless_session_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions); + + while let Some(token_res) = token_stream.next().await { + let SynapseAccessToken { + user_id: synapse_user_id, + device_id, + token, + valid_until_ms, + last_validated, + } = token_res.into_synapse("reading Synapse access token")?; + + let username = synapse_user_id + .extract_localpart(server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + return Err(Error::MissingUserFromDependentTable { + table: "devices".to_owned(), + user: synapse_user_id, + }); + }; + + // It's not always accurate, but last_validated is *often* the creation time of + // the device If we don't have one, then use the current time as a + // fallback. + let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from); + + let session_id = if let Some(device_id) = device_id { + // Use the existing device_id if this is the second token for a device + *devices + .entry((user_id, CompactString::new(&device_id))) + .or_insert_with(|| { + Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)) + }) + } else { + // If this is a deviceless access token, create a deviceless compat session + // for it (since otherwise we won't create one whilst migrating devices) + let deviceless_session_id = + Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); + + deviceless_session_write_buffer + .write( + mas, + MasNewCompatSession { + session_id: deviceless_session_id, + user_id, + device_id: None, + human_name: None, + created_at, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }, + ) + .await + .into_mas("failed to write deviceless compat sessions")?; + + deviceless_session_id + }; + + let token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); + + // TODO skip access tokens for deactivated users + write_buffer + .write( + mas, + MasNewCompatAccessToken { + token_id, + session_id, + access_token: token, + created_at, + expires_at: valid_until_ms.map(DateTime::from), + }, + ) + .await + .into_mas("writing compat access tokens")?; + } + + write_buffer + .finish(mas) + .await + .into_mas("writing compat access tokens")?; + deviceless_session_write_buffer + .finish(mas) + .await + .into_mas("writing deviceless compat sessions")?; + + Ok(()) +} + +#[tracing::instrument(skip_all, level = Level::INFO)] +async fn migrate_refresh_tokens( synapse: &mut SynapseReader<'_>, mas: &mut MasWriter<'_>, server_name: &str, @@ -463,10 +580,35 @@ async fn migrate_access_and_refresh_tokens( user_localparts_to_uuid: &HashMap, devices: &mut HashMap<(Uuid, CompactString), Uuid>, ) -> Result<(), Error> { - let mut access_token_stream = pin!(synapse.read_access_tokens()); - // let mut write_buffer = - // MasWriteBuffer::new(MasWriter::write_compat_access_token); - todo!(); + let mut token_stream = pin!(synapse.read_refresh_tokens()); + let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_refresh_tokens); + + while let Some(token_res) = token_stream.next().await { + let SynapseRefreshToken { + user_id: synapse_user_id, + device_id, + token, + id, + } = token_res.into_synapse("reading Synapse refresh token")?; + + let username = synapse_user_id + .extract_localpart(server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + return Err(Error::MissingUserFromDependentTable { + table: "devices".to_owned(), + user: synapse_user_id, + }); + }; + + todo!() + } + + write_buffer + .finish(mas) + .await + .into_mas("writing compat refresh tokens")?; Ok(()) } diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs index bad012a1f..ff28fafb4 100644 --- a/crates/syn2mas/src/synapse_reader/mod.rs +++ b/crates/syn2mas/src/synapse_reader/mod.rs @@ -12,7 +12,7 @@ use std::fmt::Display; use chrono::{DateTime, Utc}; use futures_util::{Stream, TryStreamExt}; -use sqlx::{query, Acquire, FromRow, PgConnection, Postgres, Row, Transaction, Type}; +use sqlx::{query, Acquire, FromRow, PgConnection, Postgres, Transaction, Type}; use thiserror::Error; use thiserror_ext::ContextInto; @@ -228,7 +228,6 @@ pub struct SynapseAccessToken { pub token: String, pub valid_until_ms: Option, pub last_validated: Option, - pub refresh_token_id: Option, } /// Row of the `refresh_tokens` table in Synapse. @@ -426,7 +425,7 @@ impl<'conn> SynapseReader<'conn> { sqlx::query_as( " SELECT - at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated, at0.refresh_token_id + at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated FROM access_tokens at0 LEFT JOIN refresh_tokens rt0 ON at0.refresh_token_id = rt0.id LEFT JOIN access_tokens at1 ON rt0.next_token_id = at1.refresh_token_id diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap index 6bcaf42d6..4bdbe2a4b 100644 --- a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap @@ -13,8 +13,5 @@ expression: access_tokens token: "syt_AAAAAAAAAAAAAA_AAAA", valid_until_ms: None, last_validated: None, - refresh_token_id: Some( - 8, - ), }, } diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap index 1e761d446..038f6bde2 100644 --- a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap @@ -13,6 +13,5 @@ expression: access_tokens token: "syt_aaaaaaaaaaaaaa_aaaa", valid_until_ms: None, last_validated: None, - refresh_token_id: None, }, } From dc305ddc40ae494a5fe6f41910c7c56c2990e750 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 30 Jan 2025 16:55:44 +0000 Subject: [PATCH 09/11] Split access tokens between refreshable and unrefreshable ones --- crates/syn2mas/src/migration.rs | 90 +++++++++++++++---- crates/syn2mas/src/synapse_reader/mod.rs | 60 +++++++------ ...est__read_access_and_refresh_tokens-2.snap | 14 --- ..._test__read_access_and_refresh_tokens.snap | 11 ++- ...ad_access_and_unused_refresh_tokens-2.snap | 22 ----- ...read_access_and_unused_refresh_tokens.snap | 26 ++---- 6 files changed, 118 insertions(+), 105 deletions(-) delete mode 100644 crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens-2.snap delete mode 100644 crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens-2.snap diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 2b0533c00..90e06b43e 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -29,13 +29,13 @@ use uuid::Uuid; use crate::{ mas_writer::{ - self, MasNewCompatAccessToken, MasNewCompatSession, MasNewEmailThreepid, - MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, - MasWriteBuffer, MasWriter, + self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession, + MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, + MasNewUserPassword, MasWriteBuffer, MasWriter, }, synapse_reader::{ self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice, - SynapseExternalId, SynapseRefreshToken, SynapseThreepid, SynapseUser, + SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser, }, SynapseReader, }; @@ -140,7 +140,7 @@ pub async fn migrate( .expect("More than usize::MAX devices — unable to handle this many!"), ); - migrate_access_tokens( + migrate_unrefreshable_access_tokens( synapse, mas, server_name, @@ -151,10 +151,11 @@ pub async fn migrate( ) .await?; - migrate_refresh_tokens( + migrate_refreshable_token_pairs( synapse, mas, server_name, + clock, rng, &migrated_users.user_localparts_to_uuid, &mut devices_to_compat_sessions, @@ -468,8 +469,10 @@ async fn migrate_devices( Ok(()) } +/// Migrates unrefreshable access tokens (those without an associated refresh +/// token). Some of these may be deviceless. #[tracing::instrument(skip_all, level = Level::INFO)] -async fn migrate_access_tokens( +async fn migrate_unrefreshable_access_tokens( synapse: &mut SynapseReader<'_>, mas: &mut MasWriter<'_>, server_name: &str, @@ -478,7 +481,7 @@ async fn migrate_access_tokens( user_localparts_to_uuid: &HashMap, devices: &mut HashMap<(Uuid, CompactString), Uuid>, ) -> Result<(), Error> { - let mut token_stream = pin!(synapse.read_access_tokens()); + let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens()); let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens); let mut deviceless_session_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions); @@ -497,7 +500,7 @@ async fn migrate_access_tokens( .to_owned(); let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { return Err(Error::MissingUserFromDependentTable { - table: "devices".to_owned(), + table: "access_tokens".to_owned(), user: synapse_user_id, }); }; @@ -571,24 +574,31 @@ async fn migrate_access_tokens( Ok(()) } +/// Migrates (access token, refresh token) pairs. +/// Does not migrate non-refreshable access tokens. #[tracing::instrument(skip_all, level = Level::INFO)] -async fn migrate_refresh_tokens( +async fn migrate_refreshable_token_pairs( synapse: &mut SynapseReader<'_>, mas: &mut MasWriter<'_>, server_name: &str, + clock: &dyn Clock, rng: &mut impl RngCore, user_localparts_to_uuid: &HashMap, devices: &mut HashMap<(Uuid, CompactString), Uuid>, ) -> Result<(), Error> { - let mut token_stream = pin!(synapse.read_refresh_tokens()); - let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_refresh_tokens); + let mut token_stream = pin!(synapse.read_refreshable_token_pairs()); + let mut access_token_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens); + let mut refresh_token_write_buffer = + MasWriteBuffer::new(MasWriter::write_compat_refresh_tokens); while let Some(token_res) = token_stream.next().await { - let SynapseRefreshToken { + let SynapseRefreshableTokenPair { user_id: synapse_user_id, device_id, - token, - id, + access_token, + refresh_token, + valid_until_ms, + last_validated, } = token_res.into_synapse("reading Synapse refresh token")?; let username = synapse_user_id @@ -597,15 +607,59 @@ async fn migrate_refresh_tokens( .to_owned(); let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { return Err(Error::MissingUserFromDependentTable { - table: "devices".to_owned(), + table: "refresh_tokens".to_owned(), user: synapse_user_id, }); }; - todo!() + // It's not always accurate, but last_validated is *often* the creation time of + // the device If we don't have one, then use the current time as a + // fallback. + let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from); + + // Use the existing device_id if this is the second token for a device + let session_id = *devices + .entry((user_id, CompactString::new(&device_id))) + .or_insert_with(|| Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng))); + + let access_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); + let refresh_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); + + // TODO skip access tokens for deactivated users + access_token_write_buffer + .write( + mas, + MasNewCompatAccessToken { + token_id: access_token_id, + session_id, + access_token, + created_at, + expires_at: valid_until_ms.map(DateTime::from), + }, + ) + .await + .into_mas("writing compat access tokens")?; + refresh_token_write_buffer + .write( + mas, + MasNewCompatRefreshToken { + refresh_token_id, + session_id, + access_token_id, + refresh_token, + created_at, + }, + ) + .await + .into_mas("writing compat refresh tokens")?; } - write_buffer + access_token_write_buffer + .finish(mas) + .await + .into_mas("writing compat access tokens")?; + + refresh_token_write_buffer .finish(mas) .await .into_mas("writing compat refresh tokens")?; diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs index ff28fafb4..93f7fcf53 100644 --- a/crates/syn2mas/src/synapse_reader/mod.rs +++ b/crates/syn2mas/src/synapse_reader/mod.rs @@ -232,11 +232,13 @@ pub struct SynapseAccessToken { /// Row of the `refresh_tokens` table in Synapse. #[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)] -pub struct SynapseRefreshToken { - pub id: i64, +pub struct SynapseRefreshableTokenPair { pub user_id: FullUserId, pub device_id: String, - pub token: String, + pub access_token: String, + pub refresh_token: String, + pub valid_until_ms: Option, + pub last_validated: Option, } /// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on. @@ -413,13 +415,10 @@ impl<'conn> SynapseReader<'conn> { .map_err(|err| err.into_database("reading Synapse devices")) } - /// Reads access tokens from the Synapse database. + /// Reads unrefreshable access tokens from the Synapse database. /// This does not include access tokens used for puppetting users, as those - /// are not supported by MAS. This also does not include access tokens - /// which have been made obsolete by using the associated refresh token - /// and then acknowledging the successor access token by using it to - /// authenticate a request. - pub fn read_access_tokens( + /// are not supported by MAS. + pub fn read_unrefreshable_access_tokens( &mut self, ) -> impl Stream> + '_ { sqlx::query_as( @@ -427,17 +426,15 @@ impl<'conn> SynapseReader<'conn> { SELECT at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated FROM access_tokens at0 - LEFT JOIN refresh_tokens rt0 ON at0.refresh_token_id = rt0.id - LEFT JOIN access_tokens at1 ON rt0.next_token_id = at1.refresh_token_id - WHERE at0.puppets_user_id IS NULL AND (NOT at1.used OR at1.used IS NULL) + WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL ", ) .fetch(&mut *self.txn) .map_err(|err| err.into_database("reading Synapse access tokens")) } - /// Reads refresh tokens from the Synapse database. - /// This also does not include refresh tokens which have been made obsolete + /// Reads (access token, refresh token) pairs from the Synapse database. + /// This does not include token pairs which have been made obsolete /// by using the refresh token and then acknowledging the /// successor access token by using it to authenticate a request. /// @@ -445,14 +442,15 @@ impl<'conn> SynapseReader<'conn> { /// they are not implemented in MAS. /// Further, they are unused by any real-world deployment to the best of /// our knowledge. - pub fn read_refresh_tokens( + pub fn read_refreshable_token_pairs( &mut self, - ) -> impl Stream> + '_ { + ) -> impl Stream> + '_ { sqlx::query_as( " SELECT - rt0.id, rt0.user_id, rt0.device_id, rt0.token, rt0.next_token_id + rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated FROM refresh_tokens rt0 + LEFT JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id WHERE NOT at1.used OR at1.used IS NULL ", @@ -472,7 +470,7 @@ mod test { use crate::{ synapse_reader::{ - SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshToken, + SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser, }, SynapseReader, @@ -553,7 +551,7 @@ mod test { .expect("failed to make SynapseReader"); let access_tokens: BTreeSet = reader - .read_access_tokens() + .read_unrefreshable_access_tokens() .try_collect() .await .expect("failed to read Synapse access tokens"); @@ -573,7 +571,7 @@ mod test { .expect("failed to make SynapseReader"); let access_tokens: BTreeSet = reader - .read_access_tokens() + .read_unrefreshable_access_tokens() .try_collect() .await .expect("failed to read Synapse access tokens"); @@ -592,18 +590,21 @@ mod test { .expect("failed to make SynapseReader"); let access_tokens: BTreeSet = reader - .read_access_tokens() + .read_unrefreshable_access_tokens() .try_collect() .await .expect("failed to read Synapse access tokens"); - let refresh_tokens: BTreeSet = reader - .read_refresh_tokens() + let refresh_tokens: BTreeSet = reader + .read_refreshable_token_pairs() .try_collect() .await .expect("failed to read Synapse refresh tokens"); - assert_debug_snapshot!(access_tokens); + assert!( + access_tokens.is_empty(), + "there are no unrefreshable access tokens" + ); assert_debug_snapshot!(refresh_tokens); } @@ -618,18 +619,21 @@ mod test { .expect("failed to make SynapseReader"); let access_tokens: BTreeSet = reader - .read_access_tokens() + .read_unrefreshable_access_tokens() .try_collect() .await .expect("failed to read Synapse access tokens"); - let refresh_tokens: BTreeSet = reader - .read_refresh_tokens() + let refresh_tokens: BTreeSet = reader + .read_refreshable_token_pairs() .try_collect() .await .expect("failed to read Synapse refresh tokens"); - assert_debug_snapshot!(access_tokens); + assert!( + access_tokens.is_empty(), + "there are no unrefreshable access tokens" + ); assert_debug_snapshot!(refresh_tokens); } } diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens-2.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens-2.snap deleted file mode 100644 index d3ebc062e..000000000 --- a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens-2.snap +++ /dev/null @@ -1,14 +0,0 @@ ---- -source: crates/syn2mas/src/synapse_reader/mod.rs -expression: refresh_tokens ---- -{ - SynapseRefreshToken { - id: 8, - user_id: FullUserId( - "@alice:example.com", - ), - device_id: "ADEVICE", - token: "syr_cccccccccccc_cccc", - }, -} diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap index 4bdbe2a4b..fa0ce3a19 100644 --- a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap @@ -1,16 +1,15 @@ --- source: crates/syn2mas/src/synapse_reader/mod.rs -expression: access_tokens +expression: refresh_tokens --- { - SynapseAccessToken { + SynapseRefreshableTokenPair { user_id: FullUserId( "@alice:example.com", ), - device_id: Some( - "ADEVICE", - ), - token: "syt_AAAAAAAAAAAAAA_AAAA", + device_id: "ADEVICE", + access_token: "syt_AAAAAAAAAAAAAA_AAAA", + refresh_token: "syr_cccccccccccc_cccc", valid_until_ms: None, last_validated: None, }, diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens-2.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens-2.snap deleted file mode 100644 index e26bf9c0f..000000000 --- a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens-2.snap +++ /dev/null @@ -1,22 +0,0 @@ ---- -source: crates/syn2mas/src/synapse_reader/mod.rs -expression: refresh_tokens ---- -{ - SynapseRefreshToken { - id: 7, - user_id: FullUserId( - "@alice:example.com", - ), - device_id: "ADEVICE", - token: "syr_bbbbbbbbbbbbb_bbbb", - }, - SynapseRefreshToken { - id: 8, - user_id: FullUserId( - "@alice:example.com", - ), - device_id: "ADEVICE", - token: "syr_cccccccccccc_cccc", - }, -} diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap index 0c84d8f6b..cb34a5931 100644 --- a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap @@ -1,34 +1,26 @@ --- source: crates/syn2mas/src/synapse_reader/mod.rs -expression: access_tokens +expression: refresh_tokens --- { - SynapseAccessToken { + SynapseRefreshableTokenPair { user_id: FullUserId( "@alice:example.com", ), - device_id: Some( - "ADEVICE", - ), - token: "syt_AAAAAAAAAAAAAA_AAAA", + device_id: "ADEVICE", + access_token: "syt_AAAAAAAAAAAAAA_AAAA", + refresh_token: "syr_cccccccccccc_cccc", valid_until_ms: None, last_validated: None, - refresh_token_id: Some( - 8, - ), }, - SynapseAccessToken { + SynapseRefreshableTokenPair { user_id: FullUserId( "@alice:example.com", ), - device_id: Some( - "ADEVICE", - ), - token: "syt_aaaaaaaaaaaaaa_aaaa", + device_id: "ADEVICE", + access_token: "syt_aaaaaaaaaaaaaa_aaaa", + refresh_token: "syr_bbbbbbbbbbbbb_bbbb", valid_until_ms: None, last_validated: None, - refresh_token_id: Some( - 7, - ), }, } From ac58b4f3262977f2e75f63e0eb7561c7d8207823 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 30 Jan 2025 17:17:32 +0000 Subject: [PATCH 10/11] Support reading and writing guests --- .../20250130170011_user_is_guest.sql | 10 ++ ...c9e3b5de7af6a497ad84036ae104576ae0575.json | 19 +++ ...d57488c930fe431311e53e5e1af6fb1d4e56f.json | 18 --- crates/syn2mas/src/mas_writer/mod.rs | 114 +++++++++++------- ...syn2mas__mas_writer__test__write_user.snap | 1 + ...r__test__write_user_with_access_token.snap | 1 + ..._writer__test__write_user_with_device.snap | 1 + ...s_writer__test__write_user_with_email.snap | 1 + ...riter__test__write_user_with_password.snap | 1 + ...__test__write_user_with_refresh_token.snap | 1 + ..._write_user_with_unsupported_threepid.snap | 1 + ...rite_user_with_upstream_provider_link.snap | 1 + crates/syn2mas/src/migration.rs | 1 + crates/syn2mas/src/synapse_reader/mod.rs | 12 +- ...mas__synapse_reader__test__read_users.snap | 3 + 15 files changed, 118 insertions(+), 67 deletions(-) create mode 100644 crates/storage-pg/migrations/20250130170011_user_is_guest.sql create mode 100644 crates/syn2mas/.sqlx/query-06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575.json delete mode 100644 crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json diff --git a/crates/storage-pg/migrations/20250130170011_user_is_guest.sql b/crates/storage-pg/migrations/20250130170011_user_is_guest.sql new file mode 100644 index 000000000..1ca8ce573 --- /dev/null +++ b/crates/storage-pg/migrations/20250130170011_user_is_guest.sql @@ -0,0 +1,10 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +ALTER TABLE users + -- Track whether users are guests. + -- Although guest support is not present in MAS yet, syn2mas should import + -- these users and therefore we should track their state. + ADD COLUMN is_guest BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/crates/syn2mas/.sqlx/query-06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575.json b/crates/syn2mas/.sqlx/query-06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575.json new file mode 100644 index 000000000..b52cece0d --- /dev/null +++ b/crates/syn2mas/.sqlx/query-06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__users (\n user_id, username,\n created_at, locked_at,\n can_request_admin, is_guest)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::TEXT[],\n $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[],\n $5::BOOL[], $6::BOOL[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "TextArray", + "TimestamptzArray", + "TimestamptzArray", + "BoolArray", + "BoolArray" + ] + }, + "nullable": [] + }, + "hash": "06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575" +} diff --git a/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json b/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json deleted file mode 100644 index d8be21736..000000000 --- a/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO syn2mas__users\n (user_id, username, created_at, locked_at, can_request_admin)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[])\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "UuidArray", - "TextArray", - "TimestamptzArray", - "TimestamptzArray", - "BoolArray" - ] - }, - "nullable": [] - }, - "hash": "c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f" -} diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index 6706b15b2..f1362702c 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -199,6 +199,10 @@ pub struct MasNewUser { pub created_at: DateTime, pub locked_at: Option>, pub can_request_admin: bool, + /// Whether the user was a Synapse guest. + /// Although MAS doesn't support guest access, it's still useful to track + /// for the future. + pub is_guest: bool, } pub struct MasNewUserPassword { @@ -563,52 +567,66 @@ impl<'conn> MasWriter<'conn> { #[allow(clippy::missing_panics_doc)] // not a real panic #[tracing::instrument(skip_all, level = Level::DEBUG)] pub fn write_users(&mut self, users: Vec) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { - // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement - // without having to change the statement SQL thus altering the query plan. - // See . - // In the future we could consider using sqlx's support for `PgCopyIn` / the `COPY FROM STDIN` statement, - // which is allegedly the best for insert performance, but is less simple to encode. - if users.is_empty() { - return Ok(()); - } + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows + // in one statement without having to change the statement + // SQL thus altering the query plan. See . + // In the future we could consider using sqlx's support for `PgCopyIn` / the + // `COPY FROM STDIN` statement, which is allegedly the best + // for insert performance, but is less simple to encode. + let mut user_ids: Vec = Vec::with_capacity(users.len()); + let mut usernames: Vec = Vec::with_capacity(users.len()); + let mut created_ats: Vec> = Vec::with_capacity(users.len()); + let mut locked_ats: Vec>> = + Vec::with_capacity(users.len()); + let mut can_request_admins: Vec = Vec::with_capacity(users.len()); + let mut is_guests: Vec = Vec::with_capacity(users.len()); + for MasNewUser { + user_id, + username, + created_at, + locked_at, + can_request_admin, + is_guest, + } in users + { + user_ids.push(user_id); + usernames.push(username); + created_ats.push(created_at); + locked_ats.push(locked_at); + can_request_admins.push(can_request_admin); + is_guests.push(is_guest); + } - let mut user_ids: Vec = Vec::with_capacity(users.len()); - let mut usernames: Vec = Vec::with_capacity(users.len()); - let mut created_ats: Vec> = Vec::with_capacity(users.len()); - let mut locked_ats: Vec>> = Vec::with_capacity(users.len()); - let mut can_request_admins: Vec = Vec::with_capacity(users.len()); - for MasNewUser { - user_id, - username, - created_at, - locked_at, - can_request_admin, - } in users - { - user_ids.push(user_id); - usernames.push(username); - created_ats.push(created_at); - locked_ats.push(locked_at); - can_request_admins.push(can_request_admin); - } + sqlx::query!( + r#" + INSERT INTO syn2mas__users ( + user_id, username, + created_at, locked_at, + can_request_admin, is_guest) + SELECT * FROM UNNEST( + $1::UUID[], $2::TEXT[], + $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], + $5::BOOL[], $6::BOOL[]) + "#, + &user_ids[..], + &usernames[..], + &created_ats[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &locked_ats[..] as &[Option>], + &can_request_admins[..], + &is_guests[..], + ) + .execute(&mut *conn) + .await + .into_database("writing users to MAS")?; - sqlx::query!( - r#" - INSERT INTO syn2mas__users - (user_id, username, created_at, locked_at, can_request_admin) - SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[]) - "#, - &user_ids[..], - &usernames[..], - &created_ats[..], - // We need to override the typing for arrays of optionals (sqlx limitation) - &locked_ats[..] as &[Option>], - &can_request_admins[..], - ).execute(&mut *conn).await.into_database("writing users to MAS")?; - - Ok(()) - })).boxed() + Ok(()) + }) + }) + .boxed() } /// Write a batch of user passwords to the database. @@ -1197,6 +1215,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1221,6 +1240,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1252,6 +1272,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1285,6 +1306,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1319,6 +1341,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1352,6 +1375,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1389,6 +1413,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1438,6 +1463,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap index 62d12ad5a..3bb6d1c07 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap @@ -5,6 +5,7 @@ expression: db_snapshot users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap index 81cb99515..e1c069c2e 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap @@ -23,6 +23,7 @@ compat_sessions: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap index 6e9dd8e14..1e7e95d9e 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap @@ -17,6 +17,7 @@ compat_sessions: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_email.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_email.snap index 6d0e5b6a9..c4f7d2247 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_email.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_email.snap @@ -11,6 +11,7 @@ user_emails: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap index 13f8db6a8..4c1253026 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap @@ -12,6 +12,7 @@ user_passwords: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap index 0e7714f96..71ad9efee 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap @@ -30,6 +30,7 @@ compat_sessions: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_unsupported_threepid.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_unsupported_threepid.snap index 79805555a..3b70125f8 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_unsupported_threepid.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_unsupported_threepid.snap @@ -10,6 +10,7 @@ user_unsupported_third_party_ids: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_upstream_provider_link.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_upstream_provider_link.snap index 76393c6ca..821eb9e17 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_upstream_provider_link.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_upstream_provider_link.snap @@ -36,6 +36,7 @@ upstream_oauth_providers: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 90e06b43e..dff034ca9 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -687,6 +687,7 @@ fn transform_user( created_at: user.creation_ts.into(), locked_at: bool::from(user.deactivated).then_some(user.creation_ts.into()), can_request_admin: bool::from(user.admin), + is_guest: bool::from(user.is_guest), }; let mas_password = user diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs index 93f7fcf53..5d5d3303f 100644 --- a/crates/syn2mas/src/synapse_reader/mod.rs +++ b/crates/syn2mas/src/synapse_reader/mod.rs @@ -187,8 +187,10 @@ pub struct SynapseUser { pub deactivated: SynapseBool, /// When the user was created pub creation_ts: SecondsTimestamp, - // TODO ... - // TODO is_guest + /// Whether the user is a guest. + /// Note that not all numeric user IDs are guests; guests can upgrade their + /// account! + pub is_guest: SynapseBool, // TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts) } @@ -335,7 +337,7 @@ impl<'conn> SynapseReader<'conn> { let users: i64 = sqlx::query_scalar( " SELECT COUNT(1) FROM users - WHERE appservice_id IS NULL AND is_guest = 0 + WHERE appservice_id IS NULL ", ) .fetch_one(&mut *self.txn) @@ -361,9 +363,9 @@ impl<'conn> SynapseReader<'conn> { sqlx::query_as( " SELECT - name, password_hash, admin, deactivated, creation_ts + name, password_hash, admin, deactivated, creation_ts, is_guest FROM users - WHERE appservice_id IS NULL AND is_guest = 0 + WHERE appservice_id IS NULL ", ) .fetch(&mut *self.txn) diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap index a1ec760f1..77fb9e347 100644 --- a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap @@ -19,5 +19,8 @@ expression: users creation_ts: SecondsTimestamp( 2018-06-30T21:26:02Z, ), + is_guest: SynapseBool( + false, + ), }, } From dacfa9045ddc995f7ff661c1f4b849a740d5a660 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 30 Jan 2025 17:42:22 +0000 Subject: [PATCH 11/11] Don't return errors when finding rows dependent upon appservice users --- crates/syn2mas/src/migration.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index dff034ca9..f46586c03 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -255,6 +255,9 @@ async fn migrate_threepids( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } return Err(Error::MissingUserFromDependentTable { table: "user_threepids".to_owned(), user: synapse_user_id, @@ -332,6 +335,9 @@ async fn migrate_external_ids( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } return Err(Error::MissingUserFromDependentTable { table: "user_external_ids".to_owned(), user: synapse_user_id, @@ -410,6 +416,9 @@ async fn migrate_devices( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } return Err(Error::MissingUserFromDependentTable { table: "devices".to_owned(), user: synapse_user_id, @@ -499,6 +508,9 @@ async fn migrate_unrefreshable_access_tokens( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } return Err(Error::MissingUserFromDependentTable { table: "access_tokens".to_owned(), user: synapse_user_id, @@ -606,6 +618,9 @@ async fn migrate_refreshable_token_pairs( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } return Err(Error::MissingUserFromDependentTable { table: "refresh_tokens".to_owned(), user: synapse_user_id, @@ -705,3 +720,15 @@ fn transform_user( Ok((new_user, mas_password)) } + +/// Returns true if and only if the given localpart looks like it would belong +/// to an application service user. +/// The rule here is that it must start with an underscore. +/// Synapse reserves these by default, but there is no hard rule prohibiting +/// other namespaces from being reserved, so this is not a robust check. +// TODO replace with a more robust mechanism, if we even care about this sanity check +// e.g. read application service registration files. +#[inline] +fn is_likely_appservice(localpart: &str) -> bool { + localpart.starts_with('_') +}