diff --git a/crates/syn2mas/.sqlx/query-026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd.json b/crates/syn2mas/.sqlx/query-026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd.json new file mode 100644 index 000000000..fa5f442ed --- /dev/null +++ b/crates/syn2mas/.sqlx/query-026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__upstream_oauth_links\n (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd" +} diff --git a/crates/syn2mas/.sqlx/query-d79fd99ebed9033711f96113005096c848ae87c43b6430246ef3b6a1dc6a7a32.json b/crates/syn2mas/.sqlx/query-d79fd99ebed9033711f96113005096c848ae87c43b6430246ef3b6a1dc6a7a32.json deleted file mode 100644 index f6ac32781..000000000 --- a/crates/syn2mas/.sqlx/query-d79fd99ebed9033711f96113005096c848ae87c43b6430246ef3b6a1dc6a7a32.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO syn2mas__upstream_oauth_links\n (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[])\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "UuidArray", - "UuidArray", - "UuidArray", - "TextArray", - "TimestamptzArray" - ] - }, - "nullable": [] - }, - "hash": "d79fd99ebed9033711f96113005096c848ae87c43b6430246ef3b6a1dc6a7a32" -} diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index cbb4ad3b0..0f7ccf715 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -478,6 +478,46 @@ pub struct MasNewUpstreamOauthLink { pub created_at: DateTime, } +impl WriteBatch for MasNewUpstreamOauthLink { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut link_ids: Vec = Vec::with_capacity(batch.len()); + let mut user_ids: Vec = Vec::with_capacity(batch.len()); + let mut upstream_provider_ids: Vec = Vec::with_capacity(batch.len()); + let mut subjects: Vec = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + + for MasNewUpstreamOauthLink { + link_id, + user_id, + upstream_provider_id, + subject, + created_at, + } in batch + { + link_ids.push(link_id); + user_ids.push(user_id.get()); + upstream_provider_ids.push(upstream_provider_id); + subjects.push(subject); + created_ats.push(created_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__upstream_oauth_links + (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at) + SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[]) + "#, + &link_ids[..], + &user_ids[..], + &upstream_provider_ids[..], + &subjects[..], + &created_ats[..], + ).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?; + + Ok(()) + } +} + pub struct MasNewCompatSession { pub session_id: Uuid, pub user_id: NonNilUuid, @@ -890,45 +930,15 @@ impl MasWriter { &mut self, links: Vec, ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool.spawn_with_connection(move |conn| { - Box::pin(async move { - let mut link_ids: Vec = Vec::with_capacity(links.len()); - let mut user_ids: Vec = Vec::with_capacity(links.len()); - let mut upstream_provider_ids: Vec = Vec::with_capacity(links.len()); - let mut subjects: Vec = Vec::with_capacity(links.len()); - let mut created_ats: Vec> = Vec::with_capacity(links.len()); + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + MasNewUpstreamOauthLink::write_batch(conn, links).await?; - for MasNewUpstreamOauthLink { - link_id, - user_id, - upstream_provider_id, - subject, - created_at, - } in links - { - link_ids.push(link_id); - user_ids.push(user_id.get()); - upstream_provider_ids.push(upstream_provider_id); - subjects.push(subject); - created_ats.push(created_at); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__upstream_oauth_links - (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at) - SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[]) - "#, - &link_ids[..], - &user_ids[..], - &upstream_provider_ids[..], - &subjects[..], - &created_ats[..], - ).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?; - - Ok(()) + Ok(()) + }) }) - }).boxed() + .boxed() } #[tracing::instrument(skip_all, level = Level::DEBUG)]