syn2mas: implement WriteBatch for MasNewEmailThreepid

This commit is contained in:
Quentin Gliech
2025-04-22 14:25:13 +02:00
parent 4c081152a9
commit 2450e2e480
3 changed files with 62 additions and 52 deletions

View File

@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__user_emails\n (user_email_id, user_id, email, created_at, confirmed_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"UuidArray",
"TextArray",
"TimestamptzArray"
]
},
"nullable": []
},
"hash": "08ad2855f0baaaed9d6af23c8bf035e9a087ff27b06e804464a432d93e5a25f1"
}

View File

@@ -1,17 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO syn2mas__user_emails\n (user_email_id, user_id, email, created_at, confirmed_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"UuidArray",
"TextArray",
"TimestamptzArray"
]
},
"nullable": []
},
"hash": "dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00"
}

View File

@@ -386,6 +386,44 @@ pub struct MasNewEmailThreepid {
pub created_at: DateTime<Utc>,
}
impl WriteBatch for MasNewEmailThreepid {
async fn write_batch(conn: &mut PgConnection, batch: Vec<Self>) -> Result<(), Error> {
let mut user_email_ids: Vec<Uuid> = Vec::with_capacity(batch.len());
let mut user_ids: Vec<Uuid> = Vec::with_capacity(batch.len());
let mut emails: Vec<String> = Vec::with_capacity(batch.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(batch.len());
for MasNewEmailThreepid {
user_email_id,
user_id,
email,
created_at,
} in batch
{
user_email_ids.push(user_email_id);
user_ids.push(user_id.get());
emails.push(email);
created_ats.push(created_at);
}
// `confirmed_at` is going to get removed in a future MAS release,
// so just populate with `created_at`
sqlx::query!(
r#"
INSERT INTO syn2mas__user_emails
(user_email_id, user_id, email, created_at, confirmed_at)
SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])
"#,
&user_email_ids[..],
&user_ids[..],
&emails[..],
&created_ats[..],
).execute(&mut *conn).await.into_database("writing emails to MAS")?;
Ok(())
}
}
pub struct MasNewUnsupportedThreepid {
pub user_id: NonNilUuid,
pub medium: String,
@@ -781,43 +819,15 @@ impl MasWriter {
&mut self,
threepids: Vec<MasNewEmailThreepid>,
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut user_email_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
let mut user_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
let mut emails: Vec<String> = Vec::with_capacity(threepids.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(threepids.len());
self.writer_pool
.spawn_with_connection(move |conn| {
Box::pin(async move {
MasNewEmailThreepid::write_batch(conn, threepids).await?;
for MasNewEmailThreepid {
user_email_id,
user_id,
email,
created_at,
} in threepids
{
user_email_ids.push(user_email_id);
user_ids.push(user_id.get());
emails.push(email);
created_ats.push(created_at);
}
// `confirmed_at` is going to get removed in a future MAS release,
// so just populate with `created_at`
sqlx::query!(
r#"
INSERT INTO syn2mas__user_emails
(user_email_id, user_id, email, created_at, confirmed_at)
SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])
"#,
&user_email_ids[..],
&user_ids[..],
&emails[..],
&created_ats[..],
).execute(&mut *conn).await.into_database("writing emails to MAS")?;
Ok(())
Ok(())
})
})
}).boxed()
.boxed()
}
#[tracing::instrument(skip_all, level = Level::DEBUG)]