syn2mas: remove the MasWriter::write_ methods and replaced them in tests

This commit is contained in:
Quentin Gliech
2025-04-22 16:30:40 +02:00
parent 47009a8800
commit 1056949149

View File

@@ -22,7 +22,7 @@ use sqlx::{Executor, PgConnection, query, query_as};
use thiserror::Error;
use thiserror_ext::{Construct, ContextInto};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{Instrument, Level, error, info, warn};
use tracing::{Instrument, error, info, warn};
use uuid::{NonNilUuid, Uuid};
use self::{
@@ -1022,144 +1022,6 @@ impl MasWriter {
Ok(conn)
}
/// Write a batch of users to the database.
///
/// # Errors
///
/// Errors are returned in the following conditions:
///
/// - If the database writer connection pool had an error.
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_users(&mut self, users: Vec<MasNewUser>) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewUser::write_batch(conn, users).await?;
Ok(())
})
})
.boxed()
}
/// Write a batch of user passwords to the database.
///
/// # Errors
///
/// Errors are returned in the following conditions:
///
/// - If the database writer connection pool had an error.
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_passwords(
&mut self,
passwords: Vec<MasNewUserPassword>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewUserPassword::write_batch(conn, passwords).await?;
Ok(())
})
})
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_email_threepids(
&mut self,
threepids: Vec<MasNewEmailThreepid>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewEmailThreepid::write_batch(conn, threepids).await?;
Ok(())
})
})
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_unsupported_threepids(
&mut self,
threepids: Vec<MasNewUnsupportedThreepid>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewUnsupportedThreepid::write_batch(conn, threepids).await?;
Ok(())
})
})
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_upstream_oauth_links(
&mut self,
links: Vec<MasNewUpstreamOauthLink>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewUpstreamOauthLink::write_batch(conn, links).await?;
Ok(())
})
})
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_compat_sessions(
&mut self,
sessions: Vec<MasNewCompatSession>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewCompatSession::write_batch(conn, sessions).await?;
Ok(())
})
})
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_compat_access_tokens(
&mut self,
tokens: Vec<MasNewCompatAccessToken>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewCompatAccessToken::write_batch(conn, tokens).await?;
Ok(())
})
})
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_compat_refresh_tokens(
&mut self,
tokens: Vec<MasNewCompatRefreshToken>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewCompatRefreshToken::write_batch(conn, tokens).await?;
Ok(())
})
})
.boxed()
}
}
// How many entries to buffer at once, before writing a batch of rows to the
@@ -1228,7 +1090,7 @@ mod test {
mas_writer::{
MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
MasNewUserPassword,
MasNewUserPassword, MasWriteBuffer,
},
};
@@ -1340,20 +1202,29 @@ mod test {
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_write_user(pool: PgPool) {
let mut writer = make_mas_writer(&pool).await;
let mut buffer = MasWriteBuffer::new(&writer);
writer
.write_users(vec![MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
}])
buffer
.write(
&mut writer,
MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
},
)
.await
.expect("failed to write user");
buffer
.finish(&mut writer)
.await
.expect("failed to finish MasWriter");
let mut conn = writer
.finish(&Progress::default())
.await
@@ -1369,28 +1240,47 @@ mod test {
let mut writer = make_mas_writer(&pool).await;
writer
.write_users(vec![MasNewUser {
user_id: USER_ID,
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
}])
let mut user_buffer = MasWriteBuffer::new(&writer);
let mut password_buffer = MasWriteBuffer::new(&writer);
user_buffer
.write(
&mut writer,
MasNewUser {
user_id: USER_ID,
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
},
)
.await
.expect("failed to write user");
writer
.write_passwords(vec![MasNewUserPassword {
user_password_id: Uuid::from_u128(42u128),
user_id: USER_ID,
hashed_password: "$bcrypt$aaaaaaaaaaa".to_owned(),
created_at: DateTime::default(),
}])
password_buffer
.write(
&mut writer,
MasNewUserPassword {
user_password_id: Uuid::from_u128(42u128),
user_id: USER_ID,
hashed_password: "$bcrypt$aaaaaaaaaaa".to_owned(),
created_at: DateTime::default(),
},
)
.await
.expect("failed to write password");
user_buffer
.finish(&mut writer)
.await
.expect("failed to finish MasWriteBuffer");
password_buffer
.finish(&mut writer)
.await
.expect("failed to finish MasWriteBuffer");
let mut conn = writer
.finish(&Progress::default())
.await
@@ -1404,29 +1294,47 @@ mod test {
async fn test_write_user_with_email(pool: PgPool) {
let mut writer = make_mas_writer(&pool).await;
writer
.write_users(vec![MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
}])
let mut user_buffer = MasWriteBuffer::new(&writer);
let mut email_buffer = MasWriteBuffer::new(&writer);
user_buffer
.write(
&mut writer,
MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
},
)
.await
.expect("failed to write user");
writer
.write_email_threepids(vec![MasNewEmailThreepid {
user_email_id: Uuid::from_u128(2u128),
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
email: "alice@example.org".to_owned(),
created_at: DateTime::default(),
}])
email_buffer
.write(
&mut writer,
MasNewEmailThreepid {
user_email_id: Uuid::from_u128(2u128),
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
email: "alice@example.org".to_owned(),
created_at: DateTime::default(),
},
)
.await
.expect("failed to write e-mail");
user_buffer
.finish(&mut writer)
.await
.expect("failed to finish user buffer");
email_buffer
.finish(&mut writer)
.await
.expect("failed to finish email buffer");
let mut conn = writer
.finish(&Progress::default())
.await
@@ -1441,29 +1349,47 @@ mod test {
async fn test_write_user_with_unsupported_threepid(pool: PgPool) {
let mut writer = make_mas_writer(&pool).await;
writer
.write_users(vec![MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
}])
let mut user_buffer = MasWriteBuffer::new(&writer);
let mut threepid_buffer = MasWriteBuffer::new(&writer);
user_buffer
.write(
&mut writer,
MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
},
)
.await
.expect("failed to write user");
writer
.write_unsupported_threepids(vec![MasNewUnsupportedThreepid {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
medium: "msisdn".to_owned(),
address: "441189998819991197253".to_owned(),
created_at: DateTime::default(),
}])
threepid_buffer
.write(
&mut writer,
MasNewUnsupportedThreepid {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
medium: "msisdn".to_owned(),
address: "441189998819991197253".to_owned(),
created_at: DateTime::default(),
},
)
.await
.expect("failed to write phone number (unsupported threepid)");
user_buffer
.finish(&mut writer)
.await
.expect("failed to finish user buffer");
threepid_buffer
.finish(&mut writer)
.await
.expect("failed to finish threepid buffer");
let mut conn = writer
.finish(&Progress::default())
.await
@@ -1479,30 +1405,48 @@ mod test {
async fn test_write_user_with_upstream_provider_link(pool: PgPool) {
let mut writer = make_mas_writer(&pool).await;
writer
.write_users(vec![MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
}])
let mut user_buffer = MasWriteBuffer::new(&writer);
let mut link_buffer = MasWriteBuffer::new(&writer);
user_buffer
.write(
&mut writer,
MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
},
)
.await
.expect("failed to write user");
writer
.write_upstream_oauth_links(vec![MasNewUpstreamOauthLink {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
link_id: Uuid::from_u128(3u128),
upstream_provider_id: Uuid::from_u128(4u128),
subject: "12345.67890".to_owned(),
created_at: DateTime::default(),
}])
link_buffer
.write(
&mut writer,
MasNewUpstreamOauthLink {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
link_id: Uuid::from_u128(3u128),
upstream_provider_id: Uuid::from_u128(4u128),
subject: "12345.67890".to_owned(),
created_at: DateTime::default(),
},
)
.await
.expect("failed to write link");
user_buffer
.finish(&mut writer)
.await
.expect("failed to finish user buffer");
link_buffer
.finish(&mut writer)
.await
.expect("failed to finish link buffer");
let mut conn = writer
.finish(&Progress::default())
.await
@@ -1516,34 +1460,52 @@ mod test {
async fn test_write_user_with_device(pool: PgPool) {
let mut writer = make_mas_writer(&pool).await;
writer
.write_users(vec![MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
}])
let mut user_buffer = MasWriteBuffer::new(&writer);
let mut session_buffer = MasWriteBuffer::new(&writer);
user_buffer
.write(
&mut writer,
MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
},
)
.await
.expect("failed to write user");
writer
.write_compat_sessions(vec![MasNewCompatSession {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
session_id: Uuid::from_u128(5u128),
created_at: DateTime::default(),
device_id: Some("ADEVICE".to_owned()),
human_name: Some("alice's pinephone".to_owned()),
is_synapse_admin: true,
last_active_at: Some(DateTime::default()),
last_active_ip: Some("203.0.113.1".parse().unwrap()),
user_agent: Some("Browser/5.0".to_owned()),
}])
session_buffer
.write(
&mut writer,
MasNewCompatSession {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
session_id: Uuid::from_u128(5u128),
created_at: DateTime::default(),
device_id: Some("ADEVICE".to_owned()),
human_name: Some("alice's pinephone".to_owned()),
is_synapse_admin: true,
last_active_at: Some(DateTime::default()),
last_active_ip: Some("203.0.113.1".parse().unwrap()),
user_agent: Some("Browser/5.0".to_owned()),
},
)
.await
.expect("failed to write compat session");
user_buffer
.finish(&mut writer)
.await
.expect("failed to finish user buffer");
session_buffer
.finish(&mut writer)
.await
.expect("failed to finish session buffer");
let mut conn = writer
.finish(&Progress::default())
.await
@@ -1557,45 +1519,71 @@ mod test {
async fn test_write_user_with_access_token(pool: PgPool) {
let mut writer = make_mas_writer(&pool).await;
writer
.write_users(vec![MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
}])
let mut user_buffer = MasWriteBuffer::new(&writer);
let mut session_buffer = MasWriteBuffer::new(&writer);
let mut token_buffer = MasWriteBuffer::new(&writer);
user_buffer
.write(
&mut writer,
MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
},
)
.await
.expect("failed to write user");
writer
.write_compat_sessions(vec![MasNewCompatSession {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
session_id: Uuid::from_u128(5u128),
created_at: DateTime::default(),
device_id: Some("ADEVICE".to_owned()),
human_name: None,
is_synapse_admin: false,
last_active_at: None,
last_active_ip: None,
user_agent: None,
}])
session_buffer
.write(
&mut writer,
MasNewCompatSession {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
session_id: Uuid::from_u128(5u128),
created_at: DateTime::default(),
device_id: Some("ADEVICE".to_owned()),
human_name: None,
is_synapse_admin: false,
last_active_at: None,
last_active_ip: None,
user_agent: None,
},
)
.await
.expect("failed to write compat session");
writer
.write_compat_access_tokens(vec![MasNewCompatAccessToken {
token_id: Uuid::from_u128(6u128),
session_id: Uuid::from_u128(5u128),
access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(),
created_at: DateTime::default(),
expires_at: None,
}])
token_buffer
.write(
&mut writer,
MasNewCompatAccessToken {
token_id: Uuid::from_u128(6u128),
session_id: Uuid::from_u128(5u128),
access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(),
created_at: DateTime::default(),
expires_at: None,
},
)
.await
.expect("failed to write access token");
user_buffer
.finish(&mut writer)
.await
.expect("failed to finish user buffer");
session_buffer
.finish(&mut writer)
.await
.expect("failed to finish session buffer");
token_buffer
.finish(&mut writer)
.await
.expect("failed to finish token buffer");
let mut conn = writer
.finish(&Progress::default())
.await
@@ -1610,56 +1598,90 @@ mod test {
async fn test_write_user_with_refresh_token(pool: PgPool) {
let mut writer = make_mas_writer(&pool).await;
writer
.write_users(vec![MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
}])
let mut user_buffer = MasWriteBuffer::new(&writer);
let mut session_buffer = MasWriteBuffer::new(&writer);
let mut token_buffer = MasWriteBuffer::new(&writer);
let mut refresh_token_buffer = MasWriteBuffer::new(&writer);
user_buffer
.write(
&mut writer,
MasNewUser {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
username: "alice".to_owned(),
created_at: DateTime::default(),
locked_at: None,
deactivated_at: None,
can_request_admin: false,
is_guest: false,
},
)
.await
.expect("failed to write user");
writer
.write_compat_sessions(vec![MasNewCompatSession {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
session_id: Uuid::from_u128(5u128),
created_at: DateTime::default(),
device_id: Some("ADEVICE".to_owned()),
human_name: None,
is_synapse_admin: false,
last_active_at: None,
last_active_ip: None,
user_agent: None,
}])
session_buffer
.write(
&mut writer,
MasNewCompatSession {
user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
session_id: Uuid::from_u128(5u128),
created_at: DateTime::default(),
device_id: Some("ADEVICE".to_owned()),
human_name: None,
is_synapse_admin: false,
last_active_at: None,
last_active_ip: None,
user_agent: None,
},
)
.await
.expect("failed to write compat session");
writer
.write_compat_access_tokens(vec![MasNewCompatAccessToken {
token_id: Uuid::from_u128(6u128),
session_id: Uuid::from_u128(5u128),
access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(),
created_at: DateTime::default(),
expires_at: None,
}])
token_buffer
.write(
&mut writer,
MasNewCompatAccessToken {
token_id: Uuid::from_u128(6u128),
session_id: Uuid::from_u128(5u128),
access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(),
created_at: DateTime::default(),
expires_at: None,
},
)
.await
.expect("failed to write access token");
writer
.write_compat_refresh_tokens(vec![MasNewCompatRefreshToken {
refresh_token_id: Uuid::from_u128(7u128),
session_id: Uuid::from_u128(5u128),
access_token_id: Uuid::from_u128(6u128),
refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv".to_owned(),
created_at: DateTime::default(),
}])
refresh_token_buffer
.write(
&mut writer,
MasNewCompatRefreshToken {
refresh_token_id: Uuid::from_u128(7u128),
session_id: Uuid::from_u128(5u128),
access_token_id: Uuid::from_u128(6u128),
refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv".to_owned(),
created_at: DateTime::default(),
},
)
.await
.expect("failed to write refresh token");
user_buffer
.finish(&mut writer)
.await
.expect("failed to finish user buffer");
session_buffer
.finish(&mut writer)
.await
.expect("failed to finish session buffer");
token_buffer
.finish(&mut writer)
.await
.expect("failed to finish token buffer");
refresh_token_buffer
.finish(&mut writer)
.await
.expect("failed to finish refresh token buffer");
let mut conn = writer
.finish(&Progress::default())
.await