Merge pull request #3926 from element-hq/rei/syn2mas_8_atrtdevs

syn2mas: migrate access tokens, refresh tokens and devices
This commit is contained in:
Quentin Gliech
2025-02-05 11:57:59 +01:00
committed by GitHub
43 changed files with 1640 additions and 95 deletions

1
Cargo.lock generated
View File

@@ -6113,6 +6113,7 @@ dependencies = [
"futures-util",
"insta",
"mas-config",
"mas-storage",
"mas-storage-pg",
"rand",
"serde",

View File

@@ -223,6 +223,7 @@ impl Options {
}
let mut writer = MasWriter::new(mas_connection, writer_mas_connections).await?;
let clock = SystemClock::default();
// TODO is this rng ok?
#[allow(clippy::disallowed_methods)]
let mut rng = thread_rng();
@@ -233,6 +234,7 @@ impl Options {
&mut reader,
&mut writer,
&mas_matrix.homeserver,
&clock,
&mut rng,
&provider_id_mappings,
)

View File

@@ -72,6 +72,7 @@ pub struct CompatSession {
pub state: CompatSessionState,
pub user_id: Ulid,
pub device: Option<Device>,
pub human_name: Option<String>,
pub user_session_id: Option<Ulid>,
pub created_at: DateTime<Utc>,
pub is_synapse_admin: bool,

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT compat_session_id\n , device_id\n , user_id\n , user_session_id\n , created_at\n , finished_at\n , is_synapse_admin\n , user_agent\n , last_active_at\n , last_active_ip as \"last_active_ip: IpAddr\"\n FROM compat_sessions\n WHERE compat_session_id = $1\n ",
"query": "\n SELECT compat_session_id\n , device_id\n , human_name\n , user_id\n , user_session_id\n , created_at\n , finished_at\n , is_synapse_admin\n , user_agent\n , last_active_at\n , last_active_ip as \"last_active_ip: IpAddr\"\n FROM compat_sessions\n WHERE compat_session_id = $1\n ",
"describe": {
"columns": [
{
@@ -15,41 +15,46 @@
},
{
"ordinal": 2,
"name": "human_name",
"type_info": "Text"
},
{
"ordinal": 3,
"name": "user_id",
"type_info": "Uuid"
},
{
"ordinal": 3,
"ordinal": 4,
"name": "user_session_id",
"type_info": "Uuid"
},
{
"ordinal": 4,
"ordinal": 5,
"name": "created_at",
"type_info": "Timestamptz"
},
{
"ordinal": 5,
"ordinal": 6,
"name": "finished_at",
"type_info": "Timestamptz"
},
{
"ordinal": 6,
"ordinal": 7,
"name": "is_synapse_admin",
"type_info": "Bool"
},
{
"ordinal": 7,
"ordinal": 8,
"name": "user_agent",
"type_info": "Text"
},
{
"ordinal": 8,
"ordinal": 9,
"name": "last_active_at",
"type_info": "Timestamptz"
},
{
"ordinal": 9,
"ordinal": 10,
"name": "last_active_ip: IpAddr",
"type_info": "Inet"
}
@@ -62,6 +67,7 @@
"nullable": [
false,
true,
true,
false,
true,
false,
@@ -72,5 +78,5 @@
true
]
},
"hash": "bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9"
"hash": "9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE compat_refresh_tokens\n SET consumed_at = $2\n WHERE compat_refresh_token_id = $1\n ",
"query": "\n UPDATE compat_refresh_tokens\n SET consumed_at = $2\n WHERE compat_session_id = $1\n AND consumed_at IS NULL\n ",
"describe": {
"columns": [],
"parameters": {
@@ -11,5 +11,5 @@
},
"nullable": []
},
"hash": "d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4"
"hash": "f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19"
}

View File

@@ -0,0 +1,9 @@
-- Copyright 2025 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
ALTER TABLE compat_sessions
-- Stores a human-readable name for the device.
-- syn2mas behaviour: Will be populated from the device name in Synapse.
ADD COLUMN human_name TEXT;

View File

@@ -0,0 +1,10 @@
-- Copyright 2025 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
ALTER TABLE users
-- Track whether users are guests.
-- Although guest support is not present in MAS yet, syn2mas should import
-- these users and therefore we should track their state.
ADD COLUMN is_guest BOOLEAN NOT NULL DEFAULT FALSE;

View File

@@ -64,6 +64,7 @@ mod priv_ {
pub(super) user_id: Option<Uuid>,
pub(super) scope_list: Option<Vec<String>>,
pub(super) device_id: Option<String>,
pub(super) human_name: Option<String>,
pub(super) created_at: DateTime<Utc>,
pub(super) finished_at: Option<DateTime<Utc>>,
pub(super) is_synapse_admin: Option<bool>,
@@ -91,6 +92,7 @@ impl TryFrom<AppSessionLookup> for AppSession {
user_id,
scope_list,
device_id,
human_name,
created_at,
finished_at,
is_synapse_admin,
@@ -141,6 +143,7 @@ impl TryFrom<AppSessionLookup> for AppSession {
state,
user_id: user_id.into(),
device,
human_name,
user_session_id,
created_at,
is_synapse_admin,
@@ -294,6 +297,7 @@ impl AppSessionRepository for PgAppSessionRepository<'_> {
AppSessionLookupIden::ScopeList,
)
.expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId)
.expr_as(Expr::cust("NULL"), AppSessionLookupIden::HumanName)
.expr_as(
Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)),
AppSessionLookupIden::CreatedAt,
@@ -343,6 +347,10 @@ impl AppSessionRepository for PgAppSessionRepository<'_> {
Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
AppSessionLookupIden::DeviceId,
)
.expr_as(
Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
AppSessionLookupIden::HumanName,
)
.expr_as(
Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
AppSessionLookupIden::CreatedAt,

View File

@@ -204,16 +204,25 @@ impl CompatRefreshTokenRepository for PgCompatRefreshTokenRepository<'_> {
r#"
UPDATE compat_refresh_tokens
SET consumed_at = $2
WHERE compat_refresh_token_id = $1
WHERE compat_session_id = $1
AND consumed_at IS NULL
"#,
Uuid::from(compat_refresh_token.id),
Uuid::from(compat_refresh_token.session_id),
consumed_at,
)
.traced()
.execute(&mut *self.conn)
.await?;
DatabaseError::ensure_affected_rows(&res, 1)?;
// This can affect multiple rows in case we've imported refresh tokens
// from Synapse. What we care about is that it at least affected one,
// which is what we're checking here
if res.rows_affected() == 0 {
return Err(DatabaseError::RowsAffected {
expected: 1,
actual: 0,
});
}
let compat_refresh_token = compat_refresh_token
.consume(consumed_at)

View File

@@ -48,6 +48,7 @@ impl<'c> PgCompatSessionRepository<'c> {
struct CompatSessionLookup {
compat_session_id: Uuid,
device_id: Option<String>,
human_name: Option<String>,
user_id: Uuid,
user_session_id: Option<Uuid>,
created_at: DateTime<Utc>,
@@ -85,6 +86,7 @@ impl TryFrom<CompatSessionLookup> for CompatSession {
user_id: value.user_id.into(),
user_session_id: value.user_session_id.map(Ulid::from),
device,
human_name: value.human_name,
created_at: value.created_at,
is_synapse_admin: value.is_synapse_admin,
user_agent: value.user_agent.map(UserAgent::parse),
@@ -101,6 +103,7 @@ impl TryFrom<CompatSessionLookup> for CompatSession {
struct CompatSessionAndSsoLoginLookup {
compat_session_id: Uuid,
device_id: Option<String>,
human_name: Option<String>,
user_id: Uuid,
user_session_id: Option<Uuid>,
created_at: DateTime<Utc>,
@@ -143,6 +146,7 @@ impl TryFrom<CompatSessionAndSsoLoginLookup> for (CompatSession, Option<CompatSs
state,
user_id: value.user_id.into(),
device,
human_name: value.human_name,
user_session_id: value.user_session_id.map(Ulid::from),
created_at: value.created_at,
is_synapse_admin: value.is_synapse_admin,
@@ -286,6 +290,7 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
r#"
SELECT compat_session_id
, device_id
, human_name
, user_id
, user_session_id
, created_at
@@ -356,6 +361,7 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
state: CompatSessionState::default(),
user_id: user.id,
device: Some(device),
human_name: None,
user_session_id: browser_session.map(|s| s.id),
created_at,
is_synapse_admin,
@@ -453,6 +459,10 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
CompatSessionAndSsoLoginLookupIden::DeviceId,
)
.expr_as(
Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
CompatSessionAndSsoLoginLookupIden::HumanName,
)
.expr_as(
Expr::col((CompatSessions::Table, CompatSessions::UserId)),
CompatSessionAndSsoLoginLookupIden::UserId,

View File

@@ -43,6 +43,7 @@ pub enum CompatSessions {
CompatSessionId,
UserId,
DeviceId,
HumanName,
UserSessionId,
CreatedAt,
FinishedAt,

View File

@@ -69,7 +69,13 @@ pub trait CompatRefreshTokenRepository: Send + Sync {
token: String,
) -> Result<CompatRefreshToken, Self::Error>;
/// Consume a compat refresh token
/// Consume a compat refresh token.
///
/// This also marks other refresh tokens in the same session as consumed.
/// This is desirable because the syn2mas migration process can import
/// multiple refresh tokens for one device (compat session).
/// But once the user uses one of those, the others should no longer
/// be valid.
///
/// Returns the consumed compat refresh token
///

View File

@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__users (\n user_id, username,\n created_at, locked_at,\n can_request_admin, is_guest)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::TEXT[],\n $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[],\n $5::BOOL[], $6::BOOL[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"TextArray",
"TimestamptzArray",
"TimestamptzArray",
"BoolArray",
"BoolArray"
]
},
"nullable": []
},
"hash": "06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__compat_sessions (\n compat_session_id, user_id,\n device_id, human_name,\n created_at, is_synapse_admin,\n last_active_at, last_active_ip,\n user_agent)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::UUID[],\n $3::TEXT[], $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],\n $7::TIMESTAMP WITH TIME ZONE[], $8::INET[],\n $9::TEXT[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"UuidArray",
"TextArray",
"TextArray",
"TimestamptzArray",
"BoolArray",
"TimestamptzArray",
"InetArray",
"TextArray"
]
},
"nullable": []
},
"hash": "396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c"
}

View File

@@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__compat_refresh_tokens (\n compat_refresh_token_id,\n compat_session_id,\n compat_access_token_id,\n refresh_token,\n created_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::UUID[],\n $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"UuidArray",
"UuidArray",
"TextArray",
"TimestamptzArray"
]
},
"nullable": []
},
"hash": "88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796"
}

View File

@@ -1,18 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__users\n (user_id, username, created_at, locked_at, can_request_admin)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"TextArray",
"TimestamptzArray",
"TimestamptzArray",
"BoolArray"
]
},
"nullable": []
},
"hash": "c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f"
}

View File

@@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__compat_access_tokens (\n compat_access_token_id,\n compat_session_id,\n access_token,\n created_at,\n expires_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::TEXT[],\n $4::TIMESTAMP WITH TIME ZONE[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"UuidArray",
"TextArray",
"TimestamptzArray",
"TimestamptzArray"
]
},
"nullable": []
},
"hash": "d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7"
}

View File

@@ -28,6 +28,7 @@ uuid = "1.10.0"
ulid = { workspace = true, features = ["uuid"] }
mas-config.workspace = true
mas-storage.workspace = true
[dev-dependencies]
mas-storage-pg.workspace = true

View File

@@ -7,7 +7,7 @@
//!
//! This module is responsible for writing new records to MAS' database.
use std::fmt::Display;
use std::{fmt::Display, net::IpAddr};
use chrono::{DateTime, Utc};
use futures_util::{future::BoxFuture, FutureExt, TryStreamExt};
@@ -199,6 +199,10 @@ pub struct MasNewUser {
pub created_at: DateTime<Utc>,
pub locked_at: Option<DateTime<Utc>>,
pub can_request_admin: bool,
/// Whether the user was a Synapse guest.
/// Although MAS doesn't support guest access, it's still useful to track
/// for the future.
pub is_guest: bool,
}
pub struct MasNewUserPassword {
@@ -230,6 +234,34 @@ pub struct MasNewUpstreamOauthLink {
pub created_at: DateTime<Utc>,
}
pub struct MasNewCompatSession {
pub session_id: Uuid,
pub user_id: Uuid,
pub device_id: Option<String>,
pub human_name: Option<String>,
pub created_at: DateTime<Utc>,
pub is_synapse_admin: bool,
pub last_active_at: Option<DateTime<Utc>>,
pub last_active_ip: Option<IpAddr>,
pub user_agent: Option<String>,
}
pub struct MasNewCompatAccessToken {
pub token_id: Uuid,
pub session_id: Uuid,
pub access_token: String,
pub created_at: DateTime<Utc>,
pub expires_at: Option<DateTime<Utc>>,
}
pub struct MasNewCompatRefreshToken {
pub refresh_token_id: Uuid,
pub session_id: Uuid,
pub access_token_id: Uuid,
pub refresh_token: String,
pub created_at: DateTime<Utc>,
}
/// The 'version' of the password hashing scheme used for passwords when they
/// are migrated from Synapse to MAS.
/// This is version 1, as in the previous syn2mas script.
@@ -243,6 +275,9 @@ pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &[
"user_emails",
"user_unsupported_third_party_ids",
"upstream_oauth_links",
"compat_sessions",
"compat_access_tokens",
"compat_refresh_tokens",
];
/// Detect whether a syn2mas migration has started on the given database.
@@ -532,52 +567,66 @@ impl<'conn> MasWriter<'conn> {
#[allow(clippy::missing_panics_doc)] // not a real panic
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_users(&mut self, users: Vec<MasNewUser>) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
// `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement
// without having to change the statement SQL thus altering the query plan.
// See <https://github.com/launchbadge/sqlx/blob/main/FAQ.md#how-can-i-bind-an-array-to-a-values-clause-how-can-i-do-bulk-inserts>.
// In the future we could consider using sqlx's support for `PgCopyIn` / the `COPY FROM STDIN` statement,
// which is allegedly the best for insert performance, but is less simple to encode.
if users.is_empty() {
return Ok(());
}
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
// `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows
// in one statement without having to change the statement
// SQL thus altering the query plan. See <https://github.com/launchbadge/sqlx/blob/main/FAQ.md#how-can-i-bind-an-array-to-a-values-clause-how-can-i-do-bulk-inserts>.
// In the future we could consider using sqlx's support for `PgCopyIn` / the
// `COPY FROM STDIN` statement, which is allegedly the best
// for insert performance, but is less simple to encode.
let mut user_ids: Vec<Uuid> = Vec::with_capacity(users.len());
let mut usernames: Vec<String> = Vec::with_capacity(users.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(users.len());
let mut locked_ats: Vec<Option<DateTime<Utc>>> =
Vec::with_capacity(users.len());
let mut can_request_admins: Vec<bool> = Vec::with_capacity(users.len());
let mut is_guests: Vec<bool> = Vec::with_capacity(users.len());
for MasNewUser {
user_id,
username,
created_at,
locked_at,
can_request_admin,
is_guest,
} in users
{
user_ids.push(user_id);
usernames.push(username);
created_ats.push(created_at);
locked_ats.push(locked_at);
can_request_admins.push(can_request_admin);
is_guests.push(is_guest);
}
let mut user_ids: Vec<Uuid> = Vec::with_capacity(users.len());
let mut usernames: Vec<String> = Vec::with_capacity(users.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(users.len());
let mut locked_ats: Vec<Option<DateTime<Utc>>> = Vec::with_capacity(users.len());
let mut can_request_admins: Vec<bool> = Vec::with_capacity(users.len());
for MasNewUser {
user_id,
username,
created_at,
locked_at,
can_request_admin,
} in users
{
user_ids.push(user_id);
usernames.push(username);
created_ats.push(created_at);
locked_ats.push(locked_at);
can_request_admins.push(can_request_admin);
}
sqlx::query!(
r#"
INSERT INTO syn2mas__users (
user_id, username,
created_at, locked_at,
can_request_admin, is_guest)
SELECT * FROM UNNEST(
$1::UUID[], $2::TEXT[],
$3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[],
$5::BOOL[], $6::BOOL[])
"#,
&user_ids[..],
&usernames[..],
&created_ats[..],
// We need to override the typing for arrays of optionals (sqlx limitation)
&locked_ats[..] as &[Option<DateTime<Utc>>],
&can_request_admins[..],
&is_guests[..],
)
.execute(&mut *conn)
.await
.into_database("writing users to MAS")?;
sqlx::query!(
r#"
INSERT INTO syn2mas__users
(user_id, username, created_at, locked_at, can_request_admin)
SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[])
"#,
&user_ids[..],
&usernames[..],
&created_ats[..],
// We need to override the typing for arrays of optionals (sqlx limitation)
&locked_ats[..] as &[Option<DateTime<Utc>>],
&can_request_admins[..],
).execute(&mut *conn).await.into_database("writing users to MAS")?;
Ok(())
})).boxed()
Ok(())
})
})
.boxed()
}
/// Write a batch of user passwords to the database.
@@ -761,6 +810,207 @@ impl<'conn> MasWriter<'conn> {
})
}).boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_compat_sessions(
&mut self,
sessions: Vec<MasNewCompatSession>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut session_ids: Vec<Uuid> = Vec::with_capacity(sessions.len());
let mut user_ids: Vec<Uuid> = Vec::with_capacity(sessions.len());
let mut device_ids: Vec<Option<String>> = Vec::with_capacity(sessions.len());
let mut human_names: Vec<Option<String>> = Vec::with_capacity(sessions.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(sessions.len());
let mut is_synapse_admins: Vec<bool> = Vec::with_capacity(sessions.len());
let mut last_active_ats: Vec<Option<DateTime<Utc>>> =
Vec::with_capacity(sessions.len());
let mut last_active_ips: Vec<Option<IpAddr>> =
Vec::with_capacity(sessions.len());
let mut user_agents: Vec<Option<String>> = Vec::with_capacity(sessions.len());
for MasNewCompatSession {
session_id,
user_id,
device_id,
human_name,
created_at,
is_synapse_admin,
last_active_at,
last_active_ip,
user_agent,
} in sessions
{
session_ids.push(session_id);
user_ids.push(user_id);
device_ids.push(device_id);
human_names.push(human_name);
created_ats.push(created_at);
is_synapse_admins.push(is_synapse_admin);
last_active_ats.push(last_active_at);
last_active_ips.push(last_active_ip);
user_agents.push(user_agent);
}
sqlx::query!(
r#"
INSERT INTO syn2mas__compat_sessions (
compat_session_id, user_id,
device_id, human_name,
created_at, is_synapse_admin,
last_active_at, last_active_ip,
user_agent)
SELECT * FROM UNNEST(
$1::UUID[], $2::UUID[],
$3::TEXT[], $4::TEXT[],
$5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],
$7::TIMESTAMP WITH TIME ZONE[], $8::INET[],
$9::TEXT[])
"#,
&session_ids[..],
&user_ids[..],
&device_ids[..] as &[Option<String>],
&human_names[..] as &[Option<String>],
&created_ats[..],
&is_synapse_admins[..],
// We need to override the typing for arrays of optionals (sqlx limitation)
&last_active_ats[..] as &[Option<DateTime<Utc>>],
&last_active_ips[..] as &[Option<IpAddr>],
&user_agents[..] as &[Option<String>],
)
.execute(&mut *conn)
.await
.into_database("writing compat sessions to MAS")?;
Ok(())
})
})
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_compat_access_tokens(
&mut self,
tokens: Vec<MasNewCompatAccessToken>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut token_ids: Vec<Uuid> = Vec::with_capacity(tokens.len());
let mut session_ids: Vec<Uuid> = Vec::with_capacity(tokens.len());
let mut access_tokens: Vec<String> = Vec::with_capacity(tokens.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(tokens.len());
let mut expires_ats: Vec<Option<DateTime<Utc>>> =
Vec::with_capacity(tokens.len());
for MasNewCompatAccessToken {
token_id,
session_id,
access_token,
created_at,
expires_at,
} in tokens
{
token_ids.push(token_id);
session_ids.push(session_id);
access_tokens.push(access_token);
created_ats.push(created_at);
expires_ats.push(expires_at);
}
sqlx::query!(
r#"
INSERT INTO syn2mas__compat_access_tokens (
compat_access_token_id,
compat_session_id,
access_token,
created_at,
expires_at)
SELECT * FROM UNNEST(
$1::UUID[],
$2::UUID[],
$3::TEXT[],
$4::TIMESTAMP WITH TIME ZONE[],
$5::TIMESTAMP WITH TIME ZONE[])
"#,
&token_ids[..],
&session_ids[..],
&access_tokens[..],
&created_ats[..],
// We need to override the typing for arrays of optionals (sqlx limitation)
&expires_ats[..] as &[Option<DateTime<Utc>>],
)
.execute(&mut *conn)
.await
.into_database("writing compat access tokens to MAS")?;
Ok(())
})
})
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_compat_refresh_tokens(
&mut self,
tokens: Vec<MasNewCompatRefreshToken>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut refresh_token_ids: Vec<Uuid> = Vec::with_capacity(tokens.len());
let mut session_ids: Vec<Uuid> = Vec::with_capacity(tokens.len());
let mut access_token_ids: Vec<Uuid> = Vec::with_capacity(tokens.len());
let mut refresh_tokens: Vec<String> = Vec::with_capacity(tokens.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(tokens.len());
for MasNewCompatRefreshToken {
refresh_token_id,
session_id,
access_token_id,
refresh_token,
created_at,
} in tokens
{
refresh_token_ids.push(refresh_token_id);
session_ids.push(session_id);
access_token_ids.push(access_token_id);
refresh_tokens.push(refresh_token);
created_ats.push(created_at);
}
sqlx::query!(
r#"
INSERT INTO syn2mas__compat_refresh_tokens (
compat_refresh_token_id,
compat_session_id,
compat_access_token_id,
refresh_token,
created_at)
SELECT * FROM UNNEST(
$1::UUID[],
$2::UUID[],
$3::UUID[],
$4::TEXT[],
$5::TIMESTAMP WITH TIME ZONE[])
"#,
&refresh_token_ids[..],
&session_ids[..],
&access_token_ids[..],
&refresh_tokens[..],
&created_ats[..],
)
.execute(&mut *conn)
.await
.into_database("writing compat refresh tokens to MAS")?;
Ok(())
})
})
.boxed()
}
}
// How many entries to buffer at once, before writing a batch of rows to the
@@ -839,6 +1089,7 @@ mod test {
use crate::{
mas_writer::{
MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
MasNewUserPassword,
},
@@ -964,6 +1215,7 @@ mod test {
created_at: DateTime::default(),
locked_at: None,
can_request_admin: false,
is_guest: false,
}])
.await
.expect("failed to write user");
@@ -988,6 +1240,7 @@ mod test {
created_at: DateTime::default(),
locked_at: None,
can_request_admin: false,
is_guest: false,
}])
.await
.expect("failed to write user");
@@ -1019,6 +1272,7 @@ mod test {
created_at: DateTime::default(),
locked_at: None,
can_request_admin: false,
is_guest: false,
}])
.await
.expect("failed to write user");
@@ -1052,6 +1306,7 @@ mod test {
created_at: DateTime::default(),
locked_at: None,
can_request_admin: false,
is_guest: false,
}])
.await
.expect("failed to write user");
@@ -1086,6 +1341,7 @@ mod test {
created_at: DateTime::default(),
locked_at: None,
can_request_admin: false,
is_guest: false,
}])
.await
.expect("failed to write user");
@@ -1105,4 +1361,152 @@ mod test {
assert_db_snapshot!(&mut conn);
}
/// Tests writing a single user, with a device (compat session).
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_write_user_with_device(pool: PgPool) {
let mut conn = pool.acquire().await.unwrap();
let mut writer = make_mas_writer(&pool, &mut conn).await;
writer
.write_users(vec![MasNewUser {
user_id: Uuid::from_u128(1u128),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
can_request_admin: false,
is_guest: false,
}])
.await
.expect("failed to write user");
writer
.write_compat_sessions(vec![MasNewCompatSession {
user_id: Uuid::from_u128(1u128),
session_id: Uuid::from_u128(5u128),
created_at: DateTime::default(),
device_id: Some("ADEVICE".to_owned()),
human_name: Some("alice's pinephone".to_owned()),
is_synapse_admin: true,
last_active_at: Some(DateTime::default()),
last_active_ip: Some("203.0.113.1".parse().unwrap()),
user_agent: Some("Browser/5.0".to_owned()),
}])
.await
.expect("failed to write compat session");
writer.finish().await.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
/// Tests writing a single user, with a device and an access token.
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_write_user_with_access_token(pool: PgPool) {
let mut conn = pool.acquire().await.unwrap();
let mut writer = make_mas_writer(&pool, &mut conn).await;
writer
.write_users(vec![MasNewUser {
user_id: Uuid::from_u128(1u128),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
can_request_admin: false,
is_guest: false,
}])
.await
.expect("failed to write user");
writer
.write_compat_sessions(vec![MasNewCompatSession {
user_id: Uuid::from_u128(1u128),
session_id: Uuid::from_u128(5u128),
created_at: DateTime::default(),
device_id: Some("ADEVICE".to_owned()),
human_name: None,
is_synapse_admin: false,
last_active_at: None,
last_active_ip: None,
user_agent: None,
}])
.await
.expect("failed to write compat session");
writer
.write_compat_access_tokens(vec![MasNewCompatAccessToken {
token_id: Uuid::from_u128(6u128),
session_id: Uuid::from_u128(5u128),
access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(),
created_at: DateTime::default(),
expires_at: None,
}])
.await
.expect("failed to write access token");
writer.finish().await.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
/// Tests writing a single user, with a device, an access token and a
/// refresh token.
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_write_user_with_refresh_token(pool: PgPool) {
let mut conn = pool.acquire().await.unwrap();
let mut writer = make_mas_writer(&pool, &mut conn).await;
writer
.write_users(vec![MasNewUser {
user_id: Uuid::from_u128(1u128),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
can_request_admin: false,
is_guest: false,
}])
.await
.expect("failed to write user");
writer
.write_compat_sessions(vec![MasNewCompatSession {
user_id: Uuid::from_u128(1u128),
session_id: Uuid::from_u128(5u128),
created_at: DateTime::default(),
device_id: Some("ADEVICE".to_owned()),
human_name: None,
is_synapse_admin: false,
last_active_at: None,
last_active_ip: None,
user_agent: None,
}])
.await
.expect("failed to write compat session");
writer
.write_compat_access_tokens(vec![MasNewCompatAccessToken {
token_id: Uuid::from_u128(6u128),
session_id: Uuid::from_u128(5u128),
access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(),
created_at: DateTime::default(),
expires_at: None,
}])
.await
.expect("failed to write access token");
writer
.write_compat_refresh_tokens(vec![MasNewCompatRefreshToken {
refresh_token_id: Uuid::from_u128(7u128),
session_id: Uuid::from_u128(5u128),
access_token_id: Uuid::from_u128(6u128),
refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv".to_owned(),
created_at: DateTime::default(),
}])
.await
.expect("failed to write refresh token");
writer.finish().await.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
}

View File

@@ -5,6 +5,7 @@ expression: db_snapshot
users:
- can_request_admin: "false"
created_at: "1970-01-01 00:00:00+00"
is_guest: "false"
locked_at: ~
primary_user_email_id: ~
user_id: 00000000-0000-0000-0000-000000000001

View File

@@ -0,0 +1,30 @@
---
source: crates/syn2mas/src/mas_writer/mod.rs
expression: db_snapshot
---
compat_access_tokens:
- access_token: syt_zxcvzxcvzxcvzxcv_zxcv
compat_access_token_id: 00000000-0000-0000-0000-000000000006
compat_session_id: 00000000-0000-0000-0000-000000000005
created_at: "1970-01-01 00:00:00+00"
expires_at: ~
compat_sessions:
- compat_session_id: 00000000-0000-0000-0000-000000000005
created_at: "1970-01-01 00:00:00+00"
device_id: ADEVICE
finished_at: ~
human_name: ~
is_synapse_admin: "false"
last_active_at: ~
last_active_ip: ~
user_agent: ~
user_id: 00000000-0000-0000-0000-000000000001
user_session_id: ~
users:
- can_request_admin: "false"
created_at: "1970-01-01 00:00:00+00"
is_guest: "false"
locked_at: ~
primary_user_email_id: ~
user_id: 00000000-0000-0000-0000-000000000001
username: alice

View File

@@ -0,0 +1,24 @@
---
source: crates/syn2mas/src/mas_writer/mod.rs
expression: db_snapshot
---
compat_sessions:
- compat_session_id: 00000000-0000-0000-0000-000000000005
created_at: "1970-01-01 00:00:00+00"
device_id: ADEVICE
finished_at: ~
human_name: "alice's pinephone"
is_synapse_admin: "true"
last_active_at: "1970-01-01 00:00:00+00"
last_active_ip: 203.0.113.1/32
user_agent: Browser/5.0
user_id: 00000000-0000-0000-0000-000000000001
user_session_id: ~
users:
- can_request_admin: "false"
created_at: "1970-01-01 00:00:00+00"
is_guest: "false"
locked_at: ~
primary_user_email_id: ~
user_id: 00000000-0000-0000-0000-000000000001
username: alice

View File

@@ -11,6 +11,7 @@ user_emails:
users:
- can_request_admin: "false"
created_at: "1970-01-01 00:00:00+00"
is_guest: "false"
locked_at: ~
primary_user_email_id: ~
user_id: 00000000-0000-0000-0000-000000000001

View File

@@ -12,6 +12,7 @@ user_passwords:
users:
- can_request_admin: "false"
created_at: "1970-01-01 00:00:00+00"
is_guest: "false"
locked_at: ~
primary_user_email_id: ~
user_id: 00000000-0000-0000-0000-000000000001

View File

@@ -0,0 +1,37 @@
---
source: crates/syn2mas/src/mas_writer/mod.rs
expression: db_snapshot
---
compat_access_tokens:
- access_token: syt_zxcvzxcvzxcvzxcv_zxcv
compat_access_token_id: 00000000-0000-0000-0000-000000000006
compat_session_id: 00000000-0000-0000-0000-000000000005
created_at: "1970-01-01 00:00:00+00"
expires_at: ~
compat_refresh_tokens:
- compat_access_token_id: 00000000-0000-0000-0000-000000000006
compat_refresh_token_id: 00000000-0000-0000-0000-000000000007
compat_session_id: 00000000-0000-0000-0000-000000000005
consumed_at: ~
created_at: "1970-01-01 00:00:00+00"
refresh_token: syr_zxcvzxcvzxcvzxcv_zxcv
compat_sessions:
- compat_session_id: 00000000-0000-0000-0000-000000000005
created_at: "1970-01-01 00:00:00+00"
device_id: ADEVICE
finished_at: ~
human_name: ~
is_synapse_admin: "false"
last_active_at: ~
last_active_ip: ~
user_agent: ~
user_id: 00000000-0000-0000-0000-000000000001
user_session_id: ~
users:
- can_request_admin: "false"
created_at: "1970-01-01 00:00:00+00"
is_guest: "false"
locked_at: ~
primary_user_email_id: ~
user_id: 00000000-0000-0000-0000-000000000001
username: alice

View File

@@ -10,6 +10,7 @@ user_unsupported_third_party_ids:
users:
- can_request_admin: "false"
created_at: "1970-01-01 00:00:00+00"
is_guest: "false"
locked_at: ~
primary_user_email_id: ~
user_id: 00000000-0000-0000-0000-000000000001

View File

@@ -36,6 +36,7 @@ upstream_oauth_providers:
users:
- can_request_admin: "false"
created_at: "1970-01-01 00:00:00+00"
is_guest: "false"
locked_at: ~
primary_user_email_id: ~
user_id: 00000000-0000-0000-0000-000000000001

View File

@@ -13,3 +13,6 @@ ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords;
ALTER TABLE syn2mas__user_emails RENAME TO user_emails;
ALTER TABLE syn2mas__user_unsupported_third_party_ids RENAME TO user_unsupported_third_party_ids;
ALTER TABLE syn2mas__upstream_oauth_links RENAME TO upstream_oauth_links;
ALTER TABLE syn2mas__compat_sessions RENAME TO compat_sessions;
ALTER TABLE syn2mas__compat_access_tokens RENAME TO compat_access_tokens;
ALTER TABLE syn2mas__compat_refresh_tokens RENAME TO compat_refresh_tokens;

View File

@@ -42,3 +42,6 @@ ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords;
ALTER TABLE user_emails RENAME TO syn2mas__user_emails;
ALTER TABLE user_unsupported_third_party_ids RENAME TO syn2mas__user_unsupported_third_party_ids;
ALTER TABLE upstream_oauth_links RENAME TO syn2mas__upstream_oauth_links;
ALTER TABLE compat_sessions RENAME TO syn2mas__compat_sessions;
ALTER TABLE compat_access_tokens RENAME TO syn2mas__compat_access_tokens;
ALTER TABLE compat_refresh_tokens RENAME TO syn2mas__compat_refresh_tokens;

View File

@@ -11,11 +11,15 @@
//! This module does not implement any of the safety checks that should be run
//! *before* the migration.
use std::{collections::HashMap, pin::pin};
use std::{
collections::{HashMap, HashSet},
pin::pin,
};
use chrono::{DateTime, Utc};
use compact_str::CompactString;
use futures_util::StreamExt as _;
use mas_storage::Clock;
use rand::RngCore;
use thiserror::Error;
use thiserror_ext::ContextInto;
@@ -25,11 +29,13 @@ use uuid::Uuid;
use crate::{
mas_writer::{
self, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
MasNewUserPassword, MasWriteBuffer, MasWriter,
},
synapse_reader::{
self, ExtractLocalpartError, FullUserId, SynapseExternalId, SynapseThreepid, SynapseUser,
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
},
SynapseReader,
};
@@ -66,6 +72,9 @@ pub enum Error {
struct UsersMigrated {
/// Lookup table from user localpart to that user's UUID in MAS.
user_localparts_to_uuid: HashMap<CompactString, Uuid>,
/// Set of user UUIDs that correspond to Synapse admins
synapse_admins: HashSet<Uuid>,
}
/// Performs a migration from Synapse's database to MAS' database.
@@ -85,6 +94,7 @@ pub async fn migrate(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
server_name: &str,
clock: &dyn Clock,
rng: &mut impl RngCore,
provider_id_mapping: &HashMap<String, Uuid>,
) -> Result<(), Error> {
@@ -96,7 +106,7 @@ pub async fn migrate(
counts
.users
.try_into()
.expect("More than usize::MAX users — wow!"),
.expect("More than usize::MAX users — unable to handle this many!"),
server_name,
rng,
)
@@ -121,6 +131,48 @@ pub async fn migrate(
)
.await?;
// `(MAS user_id, device_id)` mapped to `compat_session` ULID
let mut devices_to_compat_sessions: HashMap<(Uuid, CompactString), Uuid> =
HashMap::with_capacity(
counts
.devices
.try_into()
.expect("More than usize::MAX devices — unable to handle this many!"),
);
migrate_unrefreshable_access_tokens(
synapse,
mas,
server_name,
clock,
rng,
&migrated_users.user_localparts_to_uuid,
&mut devices_to_compat_sessions,
)
.await?;
migrate_refreshable_token_pairs(
synapse,
mas,
server_name,
clock,
rng,
&migrated_users.user_localparts_to_uuid,
&mut devices_to_compat_sessions,
)
.await?;
migrate_devices(
synapse,
mas,
server_name,
rng,
&migrated_users.user_localparts_to_uuid,
&mut devices_to_compat_sessions,
&migrated_users.synapse_admins,
)
.await?;
Ok(())
}
@@ -137,11 +189,19 @@ async fn migrate_users(
let mut users_stream = pin!(synapse.read_users());
// TODO is 1:1 capacity enough for a hashmap?
let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint);
let mut synapse_admins = HashSet::new();
while let Some(user_res) = users_stream.next().await {
let user = user_res.into_synapse("reading user")?;
let (mas_user, mas_password_opt) = transform_user(&user, server_name, rng)?;
if bool::from(user.admin) {
// Note down the fact that this user is a Synapse admin,
// because we will grant their existing devices the Synapse admin
// flag
synapse_admins.insert(mas_user.user_id);
}
user_localparts_to_uuid.insert(CompactString::new(&mas_user.username), mas_user.user_id);
user_buffer
@@ -165,6 +225,7 @@ async fn migrate_users(
Ok(UsersMigrated {
user_localparts_to_uuid,
synapse_admins,
})
}
@@ -194,6 +255,9 @@ async fn migrate_threepids(
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
if is_likely_appservice(&username) {
continue;
}
return Err(Error::MissingUserFromDependentTable {
table: "user_threepids".to_owned(),
user: synapse_user_id,
@@ -271,6 +335,9 @@ async fn migrate_external_ids(
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
if is_likely_appservice(&username) {
continue;
}
return Err(Error::MissingUserFromDependentTable {
table: "user_external_ids".to_owned(),
user: synapse_user_id,
@@ -313,6 +380,308 @@ async fn migrate_external_ids(
Ok(())
}
/// Migrate devices from Synapse to MAS (as compat sessions).
///
/// In order to get the right session creation timestamps, the access tokens
/// must counterintuitively be migrated first, with the ULIDs passed in as
/// `devices`.
///
/// This is because only access tokens store a timestamp that in any way
/// resembles a creation timestamp.
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_devices(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
server_name: &str,
rng: &mut impl RngCore,
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
devices: &mut HashMap<(Uuid, CompactString), Uuid>,
synapse_admins: &HashSet<Uuid>,
) -> Result<(), Error> {
let mut devices_stream = pin!(synapse.read_devices());
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
while let Some(device_res) = devices_stream.next().await {
let SynapseDevice {
user_id: synapse_user_id,
device_id,
display_name,
last_seen,
ip,
user_agent,
} = device_res.into_synapse("reading Synapse device")?;
let username = synapse_user_id
.extract_localpart(server_name)
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
if is_likely_appservice(&username) {
continue;
}
return Err(Error::MissingUserFromDependentTable {
table: "devices".to_owned(),
user: synapse_user_id,
});
};
let session_id = *devices
.entry((user_id, CompactString::new(&device_id)))
.or_insert_with(||
// We don't have a creation time for this device (as it has no access token),
// so use now as a least-evil fallback.
Ulid::with_source(rng).into());
let created_at = Ulid::from(session_id).datetime().into();
// As we're using a real IP type in the MAS database, it is possible
// that we encounter invalid IP addresses in the Synapse database.
// In that case, we should ignore them, but still log a warning.
let last_active_ip = ip.and_then(|ip| {
ip.parse()
.map_err(|e| {
tracing::warn!(
error = &e as &dyn std::error::Error,
mxid = %synapse_user_id,
%device_id,
%ip,
"Failed to parse device IP, ignoring"
);
})
.ok()
});
// TODO skip access tokens for deactivated users
write_buffer
.write(
mas,
MasNewCompatSession {
session_id,
user_id,
device_id: Some(device_id),
human_name: display_name,
created_at,
is_synapse_admin: synapse_admins.contains(&user_id),
last_active_at: last_seen.map(DateTime::from),
last_active_ip,
user_agent,
},
)
.await
.into_mas("writing compat sessions")?;
}
write_buffer
.finish(mas)
.await
.into_mas("writing compat sessions")?;
Ok(())
}
/// Migrates unrefreshable access tokens (those without an associated refresh
/// token). Some of these may be deviceless.
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_unrefreshable_access_tokens(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
server_name: &str,
clock: &dyn Clock,
rng: &mut impl RngCore,
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
devices: &mut HashMap<(Uuid, CompactString), Uuid>,
) -> Result<(), Error> {
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
let mut deviceless_session_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
while let Some(token_res) = token_stream.next().await {
let SynapseAccessToken {
user_id: synapse_user_id,
device_id,
token,
valid_until_ms,
last_validated,
} = token_res.into_synapse("reading Synapse access token")?;
let username = synapse_user_id
.extract_localpart(server_name)
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
if is_likely_appservice(&username) {
continue;
}
return Err(Error::MissingUserFromDependentTable {
table: "access_tokens".to_owned(),
user: synapse_user_id,
});
};
// It's not always accurate, but last_validated is *often* the creation time of
// the device If we don't have one, then use the current time as a
// fallback.
let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from);
let session_id = if let Some(device_id) = device_id {
// Use the existing device_id if this is the second token for a device
*devices
.entry((user_id, CompactString::new(&device_id)))
.or_insert_with(|| {
Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng))
})
} else {
// If this is a deviceless access token, create a deviceless compat session
// for it (since otherwise we won't create one whilst migrating devices)
let deviceless_session_id =
Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
deviceless_session_write_buffer
.write(
mas,
MasNewCompatSession {
session_id: deviceless_session_id,
user_id,
device_id: None,
human_name: None,
created_at,
is_synapse_admin: false,
last_active_at: None,
last_active_ip: None,
user_agent: None,
},
)
.await
.into_mas("failed to write deviceless compat sessions")?;
deviceless_session_id
};
let token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
// TODO skip access tokens for deactivated users
write_buffer
.write(
mas,
MasNewCompatAccessToken {
token_id,
session_id,
access_token: token,
created_at,
expires_at: valid_until_ms.map(DateTime::from),
},
)
.await
.into_mas("writing compat access tokens")?;
}
write_buffer
.finish(mas)
.await
.into_mas("writing compat access tokens")?;
deviceless_session_write_buffer
.finish(mas)
.await
.into_mas("writing deviceless compat sessions")?;
Ok(())
}
/// Migrates (access token, refresh token) pairs.
/// Does not migrate non-refreshable access tokens.
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_refreshable_token_pairs(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
server_name: &str,
clock: &dyn Clock,
rng: &mut impl RngCore,
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
devices: &mut HashMap<(Uuid, CompactString), Uuid>,
) -> Result<(), Error> {
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
let mut access_token_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
let mut refresh_token_write_buffer =
MasWriteBuffer::new(MasWriter::write_compat_refresh_tokens);
while let Some(token_res) = token_stream.next().await {
let SynapseRefreshableTokenPair {
user_id: synapse_user_id,
device_id,
access_token,
refresh_token,
valid_until_ms,
last_validated,
} = token_res.into_synapse("reading Synapse refresh token")?;
let username = synapse_user_id
.extract_localpart(server_name)
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
if is_likely_appservice(&username) {
continue;
}
return Err(Error::MissingUserFromDependentTable {
table: "refresh_tokens".to_owned(),
user: synapse_user_id,
});
};
// It's not always accurate, but last_validated is *often* the creation time of
// the device If we don't have one, then use the current time as a
// fallback.
let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from);
// Use the existing device_id if this is the second token for a device
let session_id = *devices
.entry((user_id, CompactString::new(&device_id)))
.or_insert_with(|| Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)));
let access_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
let refresh_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
// TODO skip access tokens for deactivated users
access_token_write_buffer
.write(
mas,
MasNewCompatAccessToken {
token_id: access_token_id,
session_id,
access_token,
created_at,
expires_at: valid_until_ms.map(DateTime::from),
},
)
.await
.into_mas("writing compat access tokens")?;
refresh_token_write_buffer
.write(
mas,
MasNewCompatRefreshToken {
refresh_token_id,
session_id,
access_token_id,
refresh_token,
created_at,
},
)
.await
.into_mas("writing compat refresh tokens")?;
}
access_token_write_buffer
.finish(mas)
.await
.into_mas("writing compat access tokens")?;
refresh_token_write_buffer
.finish(mas)
.await
.into_mas("writing compat refresh tokens")?;
Ok(())
}
fn transform_user(
user: &SynapseUser,
server_name: &str,
@@ -333,6 +702,7 @@ fn transform_user(
created_at: user.creation_ts.into(),
locked_at: bool::from(user.deactivated).then_some(user.creation_ts.into()),
can_request_admin: bool::from(user.admin),
is_guest: bool::from(user.is_guest),
};
let mas_password = user
@@ -350,3 +720,15 @@ fn transform_user(
Ok((new_user, mas_password))
}
/// Returns true if and only if the given localpart looks like it would belong
/// to an application service user.
/// The rule here is that it must start with an underscore.
/// Synapse reserves these by default, but there is no hard rule prohibiting
/// other namespaces from being reserved, so this is not a robust check.
// TODO replace with a more robust mechanism, if we even care about this sanity check
// e.g. read application service registration files.
#[inline]
fn is_likely_appservice(localpart: &str) -> bool {
localpart.starts_with('_')
}

View File

@@ -0,0 +1,14 @@
INSERT INTO access_tokens
(
id,
user_id,
device_id,
token
)
VALUES
(
42,
'@alice:example.com',
'ADEVICE',
'syt_aaaaaaaaaaaaaa_aaaa'
);

View File

@@ -0,0 +1,16 @@
INSERT INTO access_tokens
(
id,
user_id,
device_id,
token,
puppets_user_id
)
VALUES
(
42,
'@alice:example.com',
NULL,
'syt_pupupupupup_eett',
'@bob:example.com'
);

View File

@@ -0,0 +1,56 @@
INSERT INTO access_tokens
(
id,
user_id,
device_id,
token,
refresh_token_id,
used
)
VALUES
(
42,
'@alice:example.com',
'ADEVICE',
'syt_aaaaaaaaaaaaaa_aaaa',
7,
TRUE
),
(
43,
'@alice:example.com',
'ADEVICE',
'syt_AAAAAAAAAAAAAA_AAAA',
8,
TRUE
);
INSERT INTO refresh_tokens
(
id,
user_id,
device_id,
token,
next_token_id,
expiry_ts,
ultimate_session_expiry_ts
)
VALUES
(
7,
'@alice:example.com',
'ADEVICE',
'syr_bbbbbbbbbbbbb_bbbb',
8,
1738096199000,
1778096199000
),
(
8,
'@alice:example.com',
'ADEVICE',
'syr_cccccccccccc_cccc',
NULL,
1748096199000,
1778096199000
);

View File

@@ -0,0 +1,56 @@
INSERT INTO access_tokens
(
id,
user_id,
device_id,
token,
refresh_token_id,
used
)
VALUES
(
42,
'@alice:example.com',
'ADEVICE',
'syt_aaaaaaaaaaaaaa_aaaa',
7,
TRUE
),
(
43,
'@alice:example.com',
'ADEVICE',
'syt_AAAAAAAAAAAAAA_AAAA',
8,
FALSE
);
INSERT INTO refresh_tokens
(
id,
user_id,
device_id,
token,
next_token_id,
expiry_ts,
ultimate_session_expiry_ts
)
VALUES
(
7,
'@alice:example.com',
'ADEVICE',
'syr_bbbbbbbbbbbbb_bbbb',
8,
1738096199000,
1778096199000
),
(
8,
'@alice:example.com',
'ADEVICE',
'syr_cccccccccccc_cccc',
NULL,
1748096199000,
1778096199000
);

View File

@@ -0,0 +1,38 @@
INSERT INTO devices
(
user_id,
device_id,
display_name,
last_seen,
ip,
user_agent,
hidden
)
VALUES
(
'@alice:example.com',
'ADEVICE',
'Matrix Console',
1623366000000,
'203.0.113.1',
'Browser/5.0 (X12; ComputerOS 64; rv:1024.0)',
FALSE
),
(
'@alice:example.com',
'master signing key',
NULL,
NULL,
NULL,
NULL,
TRUE
),
(
'@alice:example.com',
'self_signing signing key',
NULL,
NULL,
NULL,
NULL,
TRUE
);

View File

@@ -12,7 +12,7 @@ use std::fmt::Display;
use chrono::{DateTime, Utc};
use futures_util::{Stream, TryStreamExt};
use sqlx::{query, Acquire, FromRow, PgConnection, Postgres, Row, Transaction, Type};
use sqlx::{query, Acquire, FromRow, PgConnection, Postgres, Transaction, Type};
use thiserror::Error;
use thiserror_ext::ContextInto;
@@ -187,8 +187,10 @@ pub struct SynapseUser {
pub deactivated: SynapseBool,
/// When the user was created
pub creation_ts: SecondsTimestamp,
// TODO ...
// TODO is_guest
/// Whether the user is a guest.
/// Note that not all numeric user IDs are guests; guests can upgrade their
/// account!
pub is_guest: SynapseBool,
// TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts)
}
@@ -209,19 +211,59 @@ pub struct SynapseExternalId {
pub external_id: String,
}
/// Row of the `devices` table in Synapse.
#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
pub struct SynapseDevice {
pub user_id: FullUserId,
pub device_id: String,
pub display_name: Option<String>,
pub last_seen: Option<MillisecondsTimestamp>,
pub ip: Option<String>,
pub user_agent: Option<String>,
}
/// Row of the `access_tokens` table in Synapse.
#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
pub struct SynapseAccessToken {
pub user_id: FullUserId,
pub device_id: Option<String>,
pub token: String,
pub valid_until_ms: Option<MillisecondsTimestamp>,
pub last_validated: Option<MillisecondsTimestamp>,
}
/// Row of the `refresh_tokens` table in Synapse.
#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
pub struct SynapseRefreshableTokenPair {
pub user_id: FullUserId,
pub device_id: String,
pub access_token: String,
pub refresh_token: String,
pub valid_until_ms: Option<MillisecondsTimestamp>,
pub last_validated: Option<MillisecondsTimestamp>,
}
/// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on.
///
/// This is a safety measure against other processes changing the data
/// underneath our feet. It's still not a good idea to run Synapse at the same
/// time as the migration.
// TODO not complete!
const TABLES_TO_LOCK: &[&str] = &["users", "user_threepids", "user_external_ids"];
const TABLES_TO_LOCK: &[&str] = &[
"users",
"user_threepids",
"user_external_ids",
"devices",
"access_tokens",
"refresh_tokens",
];
/// Number of migratable rows in various Synapse tables.
/// Used to estimate progress.
#[derive(Clone, Debug)]
pub struct SynapseRowCounts {
pub users: i64,
pub devices: i64,
}
pub struct SynapseReader<'c> {
@@ -292,19 +334,27 @@ impl<'conn> SynapseReader<'conn> {
///
/// - An underlying database error
pub async fn count_rows(&mut self) -> Result<SynapseRowCounts, Error> {
let users = sqlx::query(
let users: i64 = sqlx::query_scalar(
"
SELECT COUNT(1) FROM users
WHERE appservice_id IS NULL AND is_guest = 0
WHERE appservice_id IS NULL
",
)
.fetch_one(&mut *self.txn)
.await
.into_database("counting Synapse users")?
.try_get::<i64, _>(0)
.into_database("couldn't decode count of Synapse users table")?;
.into_database("counting Synapse users")?;
Ok(SynapseRowCounts { users })
let devices = sqlx::query_scalar(
"
SELECT COUNT(1) FROM devices
WHERE NOT hidden
",
)
.fetch_one(&mut *self.txn)
.await
.into_database("counting Synapse devices")?;
Ok(SynapseRowCounts { users, devices })
}
/// Reads Synapse users, excluding application service users (which do not
@@ -313,9 +363,9 @@ impl<'conn> SynapseReader<'conn> {
sqlx::query_as(
"
SELECT
name, password_hash, admin, deactivated, creation_ts
name, password_hash, admin, deactivated, creation_ts, is_guest
FROM users
WHERE appservice_id IS NULL AND is_guest = 0
WHERE appservice_id IS NULL
",
)
.fetch(&mut *self.txn)
@@ -350,6 +400,66 @@ impl<'conn> SynapseReader<'conn> {
.fetch(&mut *self.txn)
.map_err(|err| err.into_database("reading Synapse user external IDs"))
}
/// Reads devices from the Synapse database.
/// Does not include so-called 'hidden' devices, which are just a mechanism
/// for storing various signing keys shared between the real devices.
pub fn read_devices(&mut self) -> impl Stream<Item = Result<SynapseDevice, Error>> + '_ {
sqlx::query_as(
"
SELECT
user_id, device_id, display_name, last_seen, ip, user_agent
FROM devices
WHERE NOT hidden
",
)
.fetch(&mut *self.txn)
.map_err(|err| err.into_database("reading Synapse devices"))
}
/// Reads unrefreshable access tokens from the Synapse database.
/// This does not include access tokens used for puppetting users, as those
/// are not supported by MAS.
pub fn read_unrefreshable_access_tokens(
&mut self,
) -> impl Stream<Item = Result<SynapseAccessToken, Error>> + '_ {
sqlx::query_as(
"
SELECT
at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
FROM access_tokens at0
WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
",
)
.fetch(&mut *self.txn)
.map_err(|err| err.into_database("reading Synapse access tokens"))
}
/// Reads (access token, refresh token) pairs from the Synapse database.
/// This does not include token pairs which have been made obsolete
/// by using the refresh token and then acknowledging the
/// successor access token by using it to authenticate a request.
///
/// The `expiry_ts` and `ultimate_session_expiry_ts` columns are ignored as
/// they are not implemented in MAS.
/// Further, they are unused by any real-world deployment to the best of
/// our knowledge.
pub fn read_refreshable_token_pairs(
&mut self,
) -> impl Stream<Item = Result<SynapseRefreshableTokenPair, Error>> + '_ {
sqlx::query_as(
"
SELECT
rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
FROM refresh_tokens rt0
LEFT JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
WHERE NOT at1.used OR at1.used IS NULL
",
)
.fetch(&mut *self.txn)
.map_err(|err| err.into_database("reading Synapse refresh tokens"))
}
}
#[cfg(test)]
@@ -361,7 +471,10 @@ mod test {
use sqlx::{migrate::Migrator, PgPool};
use crate::{
synapse_reader::{SynapseExternalId, SynapseThreepid, SynapseUser},
synapse_reader::{
SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshableTokenPair,
SynapseThreepid, SynapseUser,
},
SynapseReader,
};
@@ -415,4 +528,114 @@ mod test {
assert_debug_snapshot!(external_ids);
}
#[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice", "devices_alice"))]
async fn test_read_devices(pool: PgPool) {
let mut conn = pool.acquire().await.expect("failed to get connection");
let mut reader = SynapseReader::new(&mut conn, false)
.await
.expect("failed to make SynapseReader");
let devices: BTreeSet<SynapseDevice> = reader
.read_devices()
.try_collect()
.await
.expect("failed to read Synapse devices");
assert_debug_snapshot!(devices);
}
#[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice", "access_token_alice"))]
async fn test_read_access_token(pool: PgPool) {
let mut conn = pool.acquire().await.expect("failed to get connection");
let mut reader = SynapseReader::new(&mut conn, false)
.await
.expect("failed to make SynapseReader");
let access_tokens: BTreeSet<SynapseAccessToken> = reader
.read_unrefreshable_access_tokens()
.try_collect()
.await
.expect("failed to read Synapse access tokens");
assert_debug_snapshot!(access_tokens);
}
/// Tests that puppetting access tokens are ignored.
#[sqlx::test(
migrator = "MIGRATOR",
fixtures("user_alice", "access_token_alice_with_puppet")
)]
async fn test_read_access_token_puppet(pool: PgPool) {
let mut conn = pool.acquire().await.expect("failed to get connection");
let mut reader = SynapseReader::new(&mut conn, false)
.await
.expect("failed to make SynapseReader");
let access_tokens: BTreeSet<SynapseAccessToken> = reader
.read_unrefreshable_access_tokens()
.try_collect()
.await
.expect("failed to read Synapse access tokens");
assert!(access_tokens.is_empty());
}
#[sqlx::test(
migrator = "MIGRATOR",
fixtures("user_alice", "access_token_alice_with_refresh_token")
)]
async fn test_read_access_and_refresh_tokens(pool: PgPool) {
let mut conn = pool.acquire().await.expect("failed to get connection");
let mut reader = SynapseReader::new(&mut conn, false)
.await
.expect("failed to make SynapseReader");
let access_tokens: BTreeSet<SynapseAccessToken> = reader
.read_unrefreshable_access_tokens()
.try_collect()
.await
.expect("failed to read Synapse access tokens");
let refresh_tokens: BTreeSet<SynapseRefreshableTokenPair> = reader
.read_refreshable_token_pairs()
.try_collect()
.await
.expect("failed to read Synapse refresh tokens");
assert!(
access_tokens.is_empty(),
"there are no unrefreshable access tokens"
);
assert_debug_snapshot!(refresh_tokens);
}
#[sqlx::test(
migrator = "MIGRATOR",
fixtures("user_alice", "access_token_alice_with_unused_refresh_token")
)]
async fn test_read_access_and_unused_refresh_tokens(pool: PgPool) {
let mut conn = pool.acquire().await.expect("failed to get connection");
let mut reader = SynapseReader::new(&mut conn, false)
.await
.expect("failed to make SynapseReader");
let access_tokens: BTreeSet<SynapseAccessToken> = reader
.read_unrefreshable_access_tokens()
.try_collect()
.await
.expect("failed to read Synapse access tokens");
let refresh_tokens: BTreeSet<SynapseRefreshableTokenPair> = reader
.read_refreshable_token_pairs()
.try_collect()
.await
.expect("failed to read Synapse refresh tokens");
assert!(
access_tokens.is_empty(),
"there are no unrefreshable access tokens"
);
assert_debug_snapshot!(refresh_tokens);
}
}

View File

@@ -0,0 +1,16 @@
---
source: crates/syn2mas/src/synapse_reader/mod.rs
expression: refresh_tokens
---
{
SynapseRefreshableTokenPair {
user_id: FullUserId(
"@alice:example.com",
),
device_id: "ADEVICE",
access_token: "syt_AAAAAAAAAAAAAA_AAAA",
refresh_token: "syr_cccccccccccc_cccc",
valid_until_ms: None,
last_validated: None,
},
}

View File

@@ -0,0 +1,26 @@
---
source: crates/syn2mas/src/synapse_reader/mod.rs
expression: refresh_tokens
---
{
SynapseRefreshableTokenPair {
user_id: FullUserId(
"@alice:example.com",
),
device_id: "ADEVICE",
access_token: "syt_AAAAAAAAAAAAAA_AAAA",
refresh_token: "syr_cccccccccccc_cccc",
valid_until_ms: None,
last_validated: None,
},
SynapseRefreshableTokenPair {
user_id: FullUserId(
"@alice:example.com",
),
device_id: "ADEVICE",
access_token: "syt_aaaaaaaaaaaaaa_aaaa",
refresh_token: "syr_bbbbbbbbbbbbb_bbbb",
valid_until_ms: None,
last_validated: None,
},
}

View File

@@ -0,0 +1,17 @@
---
source: crates/syn2mas/src/synapse_reader/mod.rs
expression: access_tokens
---
{
SynapseAccessToken {
user_id: FullUserId(
"@alice:example.com",
),
device_id: Some(
"ADEVICE",
),
token: "syt_aaaaaaaaaaaaaa_aaaa",
valid_until_ms: None,
last_validated: None,
},
}

View File

@@ -0,0 +1,26 @@
---
source: crates/syn2mas/src/synapse_reader/mod.rs
expression: devices
---
{
SynapseDevice {
user_id: FullUserId(
"@alice:example.com",
),
device_id: "ADEVICE",
display_name: Some(
"Matrix Console",
),
last_seen: Some(
MillisecondsTimestamp(
2021-06-10T23:00:00Z,
),
),
ip: Some(
"203.0.113.1",
),
user_agent: Some(
"Browser/5.0 (X12; ComputerOS 64; rv:1024.0)",
),
},
}

View File

@@ -19,5 +19,8 @@ expression: users
creation_ts: SecondsTimestamp(
2018-06-30T21:26:02Z,
),
is_guest: SynapseBool(
false,
),
},
}

View File

@@ -0,0 +1,28 @@
-- Copyright 2025 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
-- Brings in the `access_tokens` and `refresh_tokens` tables from Synapse
CREATE TABLE access_tokens (
id bigint NOT NULL,
user_id text NOT NULL,
device_id text,
token text NOT NULL,
valid_until_ms bigint,
puppets_user_id text,
last_validated bigint,
refresh_token_id bigint,
used boolean
);
CREATE TABLE refresh_tokens (
id bigint NOT NULL,
user_id text NOT NULL,
device_id text NOT NULL,
token text NOT NULL,
next_token_id bigint,
expiry_ts bigint,
ultimate_session_expiry_ts bigint
);

View File

@@ -0,0 +1,15 @@
-- Copyright 2025 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
-- Brings in the `devices` table from Synapse
CREATE TABLE devices (
user_id text NOT NULL,
device_id text NOT NULL,
display_name text,
last_seen bigint,
ip text,
user_agent text,
hidden boolean DEFAULT false
);