syn2mas: implement WriteBatch for MasNewUnsupportedThreepid

This commit is contained in:
Quentin Gliech
2025-04-22 14:30:51 +02:00
parent 2450e2e480
commit 028a993dc6
3 changed files with 63 additions and 50 deletions

View File

@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__user_unsupported_third_party_ids\n (user_id, medium, address, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"TextArray",
"TextArray",
"TimestamptzArray"
]
},
"nullable": []
},
"hash": "204cf4811150a7fdeafa9373647a9cd62ac3c9e58155882858c6056e2ef6c30d"
}

View File

@@ -1,17 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__user_unsupported_third_party_ids\n (user_id, medium, address, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"TextArray",
"TextArray",
"TimestamptzArray"
]
},
"nullable": []
},
"hash": "b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b"
}

View File

@@ -431,6 +431,45 @@ pub struct MasNewUnsupportedThreepid {
pub created_at: DateTime<Utc>,
}
impl WriteBatch for MasNewUnsupportedThreepid {
async fn write_batch(conn: &mut PgConnection, batch: Vec<Self>) -> Result<(), Error> {
let mut user_ids: Vec<Uuid> = Vec::with_capacity(batch.len());
let mut mediums: Vec<String> = Vec::with_capacity(batch.len());
let mut addresses: Vec<String> = Vec::with_capacity(batch.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(batch.len());
for MasNewUnsupportedThreepid {
user_id,
medium,
address,
created_at,
} in batch
{
user_ids.push(user_id.get());
mediums.push(medium);
addresses.push(address);
created_ats.push(created_at);
}
sqlx::query!(
r#"
INSERT INTO syn2mas__user_unsupported_third_party_ids
(user_id, medium, address, created_at)
SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])
"#,
&user_ids[..],
&mediums[..],
&addresses[..],
&created_ats[..],
)
.execute(&mut *conn)
.await
.into_database("writing unsupported threepids to MAS")?;
Ok(())
}
}
pub struct MasNewUpstreamOauthLink {
pub link_id: Uuid,
pub user_id: NonNilUuid,
@@ -835,41 +874,15 @@ impl MasWriter {
&mut self,
threepids: Vec<MasNewUnsupportedThreepid>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut user_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
let mut mediums: Vec<String> = Vec::with_capacity(threepids.len());
let mut addresses: Vec<String> = Vec::with_capacity(threepids.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(threepids.len());
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewUnsupportedThreepid::write_batch(conn, threepids).await?;
for MasNewUnsupportedThreepid {
user_id,
medium,
address,
created_at,
} in threepids
{
user_ids.push(user_id.get());
mediums.push(medium);
addresses.push(address);
created_ats.push(created_at);
}
sqlx::query!(
r#"
INSERT INTO syn2mas__user_unsupported_third_party_ids
(user_id, medium, address, created_at)
SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])
"#,
&user_ids[..],
&mediums[..],
&addresses[..],
&created_ats[..],
).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?;
Ok(())
Ok(())
})
})
}).boxed()
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]