Split access tokens between refreshable and unrefreshable ones

This commit is contained in:
Olivier 'reivilibre
2025-01-30 16:55:44 +00:00
committed by Quentin Gliech
parent 73ccecf8e8
commit dc305ddc40
6 changed files with 118 additions and 105 deletions

View File

@@ -29,13 +29,13 @@ use uuid::Uuid;
use crate::{
mas_writer::{
self, MasNewCompatAccessToken, MasNewCompatSession, MasNewEmailThreepid,
MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword,
MasWriteBuffer, MasWriter,
self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
MasNewUserPassword, MasWriteBuffer, MasWriter,
},
synapse_reader::{
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
SynapseExternalId, SynapseRefreshToken, SynapseThreepid, SynapseUser,
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
},
SynapseReader,
};
@@ -140,7 +140,7 @@ pub async fn migrate(
.expect("More than usize::MAX devices — unable to handle this many!"),
);
migrate_access_tokens(
migrate_unrefreshable_access_tokens(
synapse,
mas,
server_name,
@@ -151,10 +151,11 @@ pub async fn migrate(
)
.await?;
migrate_refresh_tokens(
migrate_refreshable_token_pairs(
synapse,
mas,
server_name,
clock,
rng,
&migrated_users.user_localparts_to_uuid,
&mut devices_to_compat_sessions,
@@ -468,8 +469,10 @@ async fn migrate_devices(
Ok(())
}
/// Migrates unrefreshable access tokens (those without an associated refresh
/// token). Some of these may be deviceless.
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_access_tokens(
async fn migrate_unrefreshable_access_tokens(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
server_name: &str,
@@ -478,7 +481,7 @@ async fn migrate_access_tokens(
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
devices: &mut HashMap<(Uuid, CompactString), Uuid>,
) -> Result<(), Error> {
let mut token_stream = pin!(synapse.read_access_tokens());
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
let mut deviceless_session_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
@@ -497,7 +500,7 @@ async fn migrate_access_tokens(
.to_owned();
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
return Err(Error::MissingUserFromDependentTable {
table: "devices".to_owned(),
table: "access_tokens".to_owned(),
user: synapse_user_id,
});
};
@@ -571,24 +574,31 @@ async fn migrate_access_tokens(
Ok(())
}
/// Migrates (access token, refresh token) pairs.
/// Does not migrate non-refreshable access tokens.
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_refresh_tokens(
async fn migrate_refreshable_token_pairs(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
server_name: &str,
clock: &dyn Clock,
rng: &mut impl RngCore,
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
devices: &mut HashMap<(Uuid, CompactString), Uuid>,
) -> Result<(), Error> {
let mut token_stream = pin!(synapse.read_refresh_tokens());
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_refresh_tokens);
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
let mut access_token_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
let mut refresh_token_write_buffer =
MasWriteBuffer::new(MasWriter::write_compat_refresh_tokens);
while let Some(token_res) = token_stream.next().await {
let SynapseRefreshToken {
let SynapseRefreshableTokenPair {
user_id: synapse_user_id,
device_id,
token,
id,
access_token,
refresh_token,
valid_until_ms,
last_validated,
} = token_res.into_synapse("reading Synapse refresh token")?;
let username = synapse_user_id
@@ -597,15 +607,59 @@ async fn migrate_refresh_tokens(
.to_owned();
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
return Err(Error::MissingUserFromDependentTable {
table: "devices".to_owned(),
table: "refresh_tokens".to_owned(),
user: synapse_user_id,
});
};
todo!()
// It's not always accurate, but last_validated is *often* the creation time of
// the device If we don't have one, then use the current time as a
// fallback.
let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from);
// Use the existing device_id if this is the second token for a device
let session_id = *devices
.entry((user_id, CompactString::new(&device_id)))
.or_insert_with(|| Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)));
let access_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
let refresh_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
// TODO skip access tokens for deactivated users
access_token_write_buffer
.write(
mas,
MasNewCompatAccessToken {
token_id: access_token_id,
session_id,
access_token,
created_at,
expires_at: valid_until_ms.map(DateTime::from),
},
)
.await
.into_mas("writing compat access tokens")?;
refresh_token_write_buffer
.write(
mas,
MasNewCompatRefreshToken {
refresh_token_id,
session_id,
access_token_id,
refresh_token,
created_at,
},
)
.await
.into_mas("writing compat refresh tokens")?;
}
write_buffer
access_token_write_buffer
.finish(mas)
.await
.into_mas("writing compat access tokens")?;
refresh_token_write_buffer
.finish(mas)
.await
.into_mas("writing compat refresh tokens")?;

View File

@@ -232,11 +232,13 @@ pub struct SynapseAccessToken {
/// Row of the `refresh_tokens` table in Synapse.
#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
pub struct SynapseRefreshToken {
pub id: i64,
pub struct SynapseRefreshableTokenPair {
pub user_id: FullUserId,
pub device_id: String,
pub token: String,
pub access_token: String,
pub refresh_token: String,
pub valid_until_ms: Option<MillisecondsTimestamp>,
pub last_validated: Option<MillisecondsTimestamp>,
}
/// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on.
@@ -413,13 +415,10 @@ impl<'conn> SynapseReader<'conn> {
.map_err(|err| err.into_database("reading Synapse devices"))
}
/// Reads access tokens from the Synapse database.
/// Reads unrefreshable access tokens from the Synapse database.
/// This does not include access tokens used for puppetting users, as those
/// are not supported by MAS. This also does not include access tokens
/// which have been made obsolete by using the associated refresh token
/// and then acknowledging the successor access token by using it to
/// authenticate a request.
pub fn read_access_tokens(
/// are not supported by MAS.
pub fn read_unrefreshable_access_tokens(
&mut self,
) -> impl Stream<Item = Result<SynapseAccessToken, Error>> + '_ {
sqlx::query_as(
@@ -427,17 +426,15 @@ impl<'conn> SynapseReader<'conn> {
SELECT
at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
FROM access_tokens at0
LEFT JOIN refresh_tokens rt0 ON at0.refresh_token_id = rt0.id
LEFT JOIN access_tokens at1 ON rt0.next_token_id = at1.refresh_token_id
WHERE at0.puppets_user_id IS NULL AND (NOT at1.used OR at1.used IS NULL)
WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
",
)
.fetch(&mut *self.txn)
.map_err(|err| err.into_database("reading Synapse access tokens"))
}
/// Reads refresh tokens from the Synapse database.
/// This also does not include refresh tokens which have been made obsolete
/// Reads (access token, refresh token) pairs from the Synapse database.
/// This does not include token pairs which have been made obsolete
/// by using the refresh token and then acknowledging the
/// successor access token by using it to authenticate a request.
///
@@ -445,14 +442,15 @@ impl<'conn> SynapseReader<'conn> {
/// they are not implemented in MAS.
/// Further, they are unused by any real-world deployment to the best of
/// our knowledge.
pub fn read_refresh_tokens(
pub fn read_refreshable_token_pairs(
&mut self,
) -> impl Stream<Item = Result<SynapseRefreshToken, Error>> + '_ {
) -> impl Stream<Item = Result<SynapseRefreshableTokenPair, Error>> + '_ {
sqlx::query_as(
"
SELECT
rt0.id, rt0.user_id, rt0.device_id, rt0.token, rt0.next_token_id
rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
FROM refresh_tokens rt0
LEFT JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
WHERE NOT at1.used OR at1.used IS NULL
",
@@ -472,7 +470,7 @@ mod test {
use crate::{
synapse_reader::{
SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshToken,
SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshableTokenPair,
SynapseThreepid, SynapseUser,
},
SynapseReader,
@@ -553,7 +551,7 @@ mod test {
.expect("failed to make SynapseReader");
let access_tokens: BTreeSet<SynapseAccessToken> = reader
.read_access_tokens()
.read_unrefreshable_access_tokens()
.try_collect()
.await
.expect("failed to read Synapse access tokens");
@@ -573,7 +571,7 @@ mod test {
.expect("failed to make SynapseReader");
let access_tokens: BTreeSet<SynapseAccessToken> = reader
.read_access_tokens()
.read_unrefreshable_access_tokens()
.try_collect()
.await
.expect("failed to read Synapse access tokens");
@@ -592,18 +590,21 @@ mod test {
.expect("failed to make SynapseReader");
let access_tokens: BTreeSet<SynapseAccessToken> = reader
.read_access_tokens()
.read_unrefreshable_access_tokens()
.try_collect()
.await
.expect("failed to read Synapse access tokens");
let refresh_tokens: BTreeSet<SynapseRefreshToken> = reader
.read_refresh_tokens()
let refresh_tokens: BTreeSet<SynapseRefreshableTokenPair> = reader
.read_refreshable_token_pairs()
.try_collect()
.await
.expect("failed to read Synapse refresh tokens");
assert_debug_snapshot!(access_tokens);
assert!(
access_tokens.is_empty(),
"there are no unrefreshable access tokens"
);
assert_debug_snapshot!(refresh_tokens);
}
@@ -618,18 +619,21 @@ mod test {
.expect("failed to make SynapseReader");
let access_tokens: BTreeSet<SynapseAccessToken> = reader
.read_access_tokens()
.read_unrefreshable_access_tokens()
.try_collect()
.await
.expect("failed to read Synapse access tokens");
let refresh_tokens: BTreeSet<SynapseRefreshToken> = reader
.read_refresh_tokens()
let refresh_tokens: BTreeSet<SynapseRefreshableTokenPair> = reader
.read_refreshable_token_pairs()
.try_collect()
.await
.expect("failed to read Synapse refresh tokens");
assert_debug_snapshot!(access_tokens);
assert!(
access_tokens.is_empty(),
"there are no unrefreshable access tokens"
);
assert_debug_snapshot!(refresh_tokens);
}
}

View File

@@ -1,14 +0,0 @@
---
source: crates/syn2mas/src/synapse_reader/mod.rs
expression: refresh_tokens
---
{
SynapseRefreshToken {
id: 8,
user_id: FullUserId(
"@alice:example.com",
),
device_id: "ADEVICE",
token: "syr_cccccccccccc_cccc",
},
}

View File

@@ -1,16 +1,15 @@
---
source: crates/syn2mas/src/synapse_reader/mod.rs
expression: access_tokens
expression: refresh_tokens
---
{
SynapseAccessToken {
SynapseRefreshableTokenPair {
user_id: FullUserId(
"@alice:example.com",
),
device_id: Some(
"ADEVICE",
),
token: "syt_AAAAAAAAAAAAAA_AAAA",
device_id: "ADEVICE",
access_token: "syt_AAAAAAAAAAAAAA_AAAA",
refresh_token: "syr_cccccccccccc_cccc",
valid_until_ms: None,
last_validated: None,
},

View File

@@ -1,22 +0,0 @@
---
source: crates/syn2mas/src/synapse_reader/mod.rs
expression: refresh_tokens
---
{
SynapseRefreshToken {
id: 7,
user_id: FullUserId(
"@alice:example.com",
),
device_id: "ADEVICE",
token: "syr_bbbbbbbbbbbbb_bbbb",
},
SynapseRefreshToken {
id: 8,
user_id: FullUserId(
"@alice:example.com",
),
device_id: "ADEVICE",
token: "syr_cccccccccccc_cccc",
},
}

View File

@@ -1,34 +1,26 @@
---
source: crates/syn2mas/src/synapse_reader/mod.rs
expression: access_tokens
expression: refresh_tokens
---
{
SynapseAccessToken {
SynapseRefreshableTokenPair {
user_id: FullUserId(
"@alice:example.com",
),
device_id: Some(
"ADEVICE",
),
token: "syt_AAAAAAAAAAAAAA_AAAA",
device_id: "ADEVICE",
access_token: "syt_AAAAAAAAAAAAAA_AAAA",
refresh_token: "syr_cccccccccccc_cccc",
valid_until_ms: None,
last_validated: None,
refresh_token_id: Some(
8,
),
},
SynapseAccessToken {
SynapseRefreshableTokenPair {
user_id: FullUserId(
"@alice:example.com",
),
device_id: Some(
"ADEVICE",
),
token: "syt_aaaaaaaaaaaaaa_aaaa",
device_id: "ADEVICE",
access_token: "syt_aaaaaaaaaaaaaa_aaaa",
refresh_token: "syr_bbbbbbbbbbbbb_bbbb",
valid_until_ms: None,
last_validated: None,
refresh_token_id: Some(
7,
),
},
}