From 1056949149484b4f3b29436670dfb458ced15ae4 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 22 Apr 2025 16:30:40 +0200 Subject: [PATCH] syn2mas: remove the `MasWriter::write_` methods and replaced them in tests --- crates/syn2mas/src/mas_writer/mod.rs | 640 ++++++++++++++------------- 1 file changed, 331 insertions(+), 309 deletions(-) diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index 257e07a11..5e3a0f0ef 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -22,7 +22,7 @@ use sqlx::{Executor, PgConnection, query, query_as}; use thiserror::Error; use thiserror_ext::{Construct, ContextInto}; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tracing::{Instrument, Level, error, info, warn}; +use tracing::{Instrument, error, info, warn}; use uuid::{NonNilUuid, Uuid}; use self::{ @@ -1022,144 +1022,6 @@ impl MasWriter { Ok(conn) } - - /// Write a batch of users to the database. - /// - /// # Errors - /// - /// Errors are returned in the following conditions: - /// - /// - If the database writer connection pool had an error. - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_users(&mut self, users: Vec) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - MasNewUser::write_batch(conn, users).await?; - - Ok(()) - }) - }) - .boxed() - } - - /// Write a batch of user passwords to the database. - /// - /// # Errors - /// - /// Errors are returned in the following conditions: - /// - /// - If the database writer connection pool had an error. - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_passwords( - &mut self, - passwords: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - MasNewUserPassword::write_batch(conn, passwords).await?; - - Ok(()) - }) - }) - .boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_email_threepids( - &mut self, - threepids: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - MasNewEmailThreepid::write_batch(conn, threepids).await?; - - Ok(()) - }) - }) - .boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_unsupported_threepids( - &mut self, - threepids: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - MasNewUnsupportedThreepid::write_batch(conn, threepids).await?; - - Ok(()) - }) - }) - .boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_upstream_oauth_links( - &mut self, - links: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - MasNewUpstreamOauthLink::write_batch(conn, links).await?; - - Ok(()) - }) - }) - .boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_compat_sessions( - &mut self, - sessions: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - MasNewCompatSession::write_batch(conn, sessions).await?; - - Ok(()) - }) - }) - .boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_compat_access_tokens( - &mut self, - tokens: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - MasNewCompatAccessToken::write_batch(conn, tokens).await?; - - Ok(()) - }) - }) - .boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_compat_refresh_tokens( - &mut self, - tokens: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - MasNewCompatRefreshToken::write_batch(conn, tokens).await?; - Ok(()) - }) - }) - .boxed() - } } // How many entries to buffer at once, before writing a batch of rows to the @@ -1228,7 +1090,7 @@ mod test { mas_writer::{ MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, - MasNewUserPassword, + MasNewUserPassword, MasWriteBuffer, }, }; @@ -1340,20 +1202,29 @@ mod test { #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] async fn test_write_user(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; + let mut buffer = MasWriteBuffer::new(&writer); - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); + buffer + .finish(&mut writer) + .await + .expect("failed to finish MasWriter"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1369,28 +1240,47 @@ mod test { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: USER_ID, - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut password_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: USER_ID, + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_passwords(vec![MasNewUserPassword { - user_password_id: Uuid::from_u128(42u128), - user_id: USER_ID, - hashed_password: "$bcrypt$aaaaaaaaaaa".to_owned(), - created_at: DateTime::default(), - }]) + + password_buffer + .write( + &mut writer, + MasNewUserPassword { + user_password_id: Uuid::from_u128(42u128), + user_id: USER_ID, + hashed_password: "$bcrypt$aaaaaaaaaaa".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write password"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish MasWriteBuffer"); + password_buffer + .finish(&mut writer) + .await + .expect("failed to finish MasWriteBuffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1404,29 +1294,47 @@ mod test { async fn test_write_user_with_email(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut email_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_email_threepids(vec![MasNewEmailThreepid { - user_email_id: Uuid::from_u128(2u128), - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - email: "alice@example.org".to_owned(), - created_at: DateTime::default(), - }]) + email_buffer + .write( + &mut writer, + MasNewEmailThreepid { + user_email_id: Uuid::from_u128(2u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + email: "alice@example.org".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write e-mail"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + email_buffer + .finish(&mut writer) + .await + .expect("failed to finish email buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1441,29 +1349,47 @@ mod test { async fn test_write_user_with_unsupported_threepid(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut threepid_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_unsupported_threepids(vec![MasNewUnsupportedThreepid { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - medium: "msisdn".to_owned(), - address: "441189998819991197253".to_owned(), - created_at: DateTime::default(), - }]) + threepid_buffer + .write( + &mut writer, + MasNewUnsupportedThreepid { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + medium: "msisdn".to_owned(), + address: "441189998819991197253".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write phone number (unsupported threepid)"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + threepid_buffer + .finish(&mut writer) + .await + .expect("failed to finish threepid buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1479,30 +1405,48 @@ mod test { async fn test_write_user_with_upstream_provider_link(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut link_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_upstream_oauth_links(vec![MasNewUpstreamOauthLink { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - link_id: Uuid::from_u128(3u128), - upstream_provider_id: Uuid::from_u128(4u128), - subject: "12345.67890".to_owned(), - created_at: DateTime::default(), - }]) + link_buffer + .write( + &mut writer, + MasNewUpstreamOauthLink { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + link_id: Uuid::from_u128(3u128), + upstream_provider_id: Uuid::from_u128(4u128), + subject: "12345.67890".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write link"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + link_buffer + .finish(&mut writer) + .await + .expect("failed to finish link buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1516,34 +1460,52 @@ mod test { async fn test_write_user_with_device(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut session_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_compat_sessions(vec![MasNewCompatSession { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - session_id: Uuid::from_u128(5u128), - created_at: DateTime::default(), - device_id: Some("ADEVICE".to_owned()), - human_name: Some("alice's pinephone".to_owned()), - is_synapse_admin: true, - last_active_at: Some(DateTime::default()), - last_active_ip: Some("203.0.113.1".parse().unwrap()), - user_agent: Some("Browser/5.0".to_owned()), - }]) + session_buffer + .write( + &mut writer, + MasNewCompatSession { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: Some("alice's pinephone".to_owned()), + is_synapse_admin: true, + last_active_at: Some(DateTime::default()), + last_active_ip: Some("203.0.113.1".parse().unwrap()), + user_agent: Some("Browser/5.0".to_owned()), + }, + ) .await .expect("failed to write compat session"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + session_buffer + .finish(&mut writer) + .await + .expect("failed to finish session buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1557,45 +1519,71 @@ mod test { async fn test_write_user_with_access_token(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut session_buffer = MasWriteBuffer::new(&writer); + let mut token_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_compat_sessions(vec![MasNewCompatSession { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - session_id: Uuid::from_u128(5u128), - created_at: DateTime::default(), - device_id: Some("ADEVICE".to_owned()), - human_name: None, - is_synapse_admin: false, - last_active_at: None, - last_active_ip: None, - user_agent: None, - }]) + session_buffer + .write( + &mut writer, + MasNewCompatSession { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: None, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }, + ) .await .expect("failed to write compat session"); - writer - .write_compat_access_tokens(vec![MasNewCompatAccessToken { - token_id: Uuid::from_u128(6u128), - session_id: Uuid::from_u128(5u128), - access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), - created_at: DateTime::default(), - expires_at: None, - }]) + token_buffer + .write( + &mut writer, + MasNewCompatAccessToken { + token_id: Uuid::from_u128(6u128), + session_id: Uuid::from_u128(5u128), + access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + expires_at: None, + }, + ) .await .expect("failed to write access token"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + session_buffer + .finish(&mut writer) + .await + .expect("failed to finish session buffer"); + token_buffer + .finish(&mut writer) + .await + .expect("failed to finish token buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1610,56 +1598,90 @@ mod test { async fn test_write_user_with_refresh_token(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut session_buffer = MasWriteBuffer::new(&writer); + let mut token_buffer = MasWriteBuffer::new(&writer); + let mut refresh_token_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_compat_sessions(vec![MasNewCompatSession { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - session_id: Uuid::from_u128(5u128), - created_at: DateTime::default(), - device_id: Some("ADEVICE".to_owned()), - human_name: None, - is_synapse_admin: false, - last_active_at: None, - last_active_ip: None, - user_agent: None, - }]) + session_buffer + .write( + &mut writer, + MasNewCompatSession { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: None, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }, + ) .await .expect("failed to write compat session"); - writer - .write_compat_access_tokens(vec![MasNewCompatAccessToken { - token_id: Uuid::from_u128(6u128), - session_id: Uuid::from_u128(5u128), - access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), - created_at: DateTime::default(), - expires_at: None, - }]) + token_buffer + .write( + &mut writer, + MasNewCompatAccessToken { + token_id: Uuid::from_u128(6u128), + session_id: Uuid::from_u128(5u128), + access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + expires_at: None, + }, + ) .await .expect("failed to write access token"); - writer - .write_compat_refresh_tokens(vec![MasNewCompatRefreshToken { - refresh_token_id: Uuid::from_u128(7u128), - session_id: Uuid::from_u128(5u128), - access_token_id: Uuid::from_u128(6u128), - refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv".to_owned(), - created_at: DateTime::default(), - }]) + refresh_token_buffer + .write( + &mut writer, + MasNewCompatRefreshToken { + refresh_token_id: Uuid::from_u128(7u128), + session_id: Uuid::from_u128(5u128), + access_token_id: Uuid::from_u128(6u128), + refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write refresh token"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + session_buffer + .finish(&mut writer) + .await + .expect("failed to finish session buffer"); + token_buffer + .finish(&mut writer) + .await + .expect("failed to finish token buffer"); + refresh_token_buffer + .finish(&mut writer) + .await + .expect("failed to finish refresh token buffer"); + let mut conn = writer .finish(&Progress::default()) .await