diff --git a/crates/syn2mas/.sqlx/query-204cf4811150a7fdeafa9373647a9cd62ac3c9e58155882858c6056e2ef6c30d.json b/crates/syn2mas/.sqlx/query-204cf4811150a7fdeafa9373647a9cd62ac3c9e58155882858c6056e2ef6c30d.json new file mode 100644 index 000000000..464dd9007 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-204cf4811150a7fdeafa9373647a9cd62ac3c9e58155882858c6056e2ef6c30d.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__user_unsupported_third_party_ids\n (user_id, medium, address, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "TextArray", + "TextArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "204cf4811150a7fdeafa9373647a9cd62ac3c9e58155882858c6056e2ef6c30d" +} diff --git a/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json b/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json deleted file mode 100644 index b44dfc605..000000000 --- a/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO syn2mas__user_unsupported_third_party_ids\n (user_id, medium, address, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "UuidArray", - "TextArray", - "TextArray", - "TimestamptzArray" - ] - }, - "nullable": [] - }, - "hash": "b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b" -} diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index fe2391ae6..cbb4ad3b0 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -431,6 +431,45 @@ pub struct MasNewUnsupportedThreepid { pub created_at: DateTime, } +impl WriteBatch for MasNewUnsupportedThreepid { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut user_ids: Vec = Vec::with_capacity(batch.len()); + let mut mediums: Vec = Vec::with_capacity(batch.len()); + let mut addresses: Vec = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + + for MasNewUnsupportedThreepid { + user_id, + medium, + address, + created_at, + } in batch + { + user_ids.push(user_id.get()); + mediums.push(medium); + addresses.push(address); + created_ats.push(created_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__user_unsupported_third_party_ids + (user_id, medium, address, created_at) + SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[]) + "#, + &user_ids[..], + &mediums[..], + &addresses[..], + &created_ats[..], + ) + .execute(&mut *conn) + .await + .into_database("writing unsupported threepids to MAS")?; + + Ok(()) + } +} + pub struct MasNewUpstreamOauthLink { pub link_id: Uuid, pub user_id: NonNilUuid, @@ -835,41 +874,15 @@ impl MasWriter { &mut self, threepids: Vec, ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool.spawn_with_connection(move |conn| { - Box::pin(async move { - let mut user_ids: Vec = Vec::with_capacity(threepids.len()); - let mut mediums: Vec = Vec::with_capacity(threepids.len()); - let mut addresses: Vec = Vec::with_capacity(threepids.len()); - let mut created_ats: Vec> = Vec::with_capacity(threepids.len()); + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + MasNewUnsupportedThreepid::write_batch(conn, threepids).await?; - for MasNewUnsupportedThreepid { - user_id, - medium, - address, - created_at, - } in threepids - { - user_ids.push(user_id.get()); - mediums.push(medium); - addresses.push(address); - created_ats.push(created_at); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__user_unsupported_third_party_ids - (user_id, medium, address, created_at) - SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[]) - "#, - &user_ids[..], - &mediums[..], - &addresses[..], - &created_ats[..], - ).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?; - - Ok(()) + Ok(()) + }) }) - }).boxed() + .boxed() } #[tracing::instrument(skip_all, level = Level::DEBUG)]