syn2mas: implement WriteBatch for MasNewUpstreamOauthLink
This commit is contained in:
18
crates/syn2mas/.sqlx/query-026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd.json
generated
Normal file
18
crates/syn2mas/.sqlx/query-026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd.json
generated
Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO syn2mas__upstream_oauth_links\n (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[])\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"UuidArray",
|
||||
"UuidArray",
|
||||
"UuidArray",
|
||||
"TextArray",
|
||||
"TimestamptzArray"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd"
|
||||
}
|
||||
@@ -1,18 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO syn2mas__upstream_oauth_links\n (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[])\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"UuidArray",
|
||||
"UuidArray",
|
||||
"UuidArray",
|
||||
"TextArray",
|
||||
"TimestamptzArray"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "d79fd99ebed9033711f96113005096c848ae87c43b6430246ef3b6a1dc6a7a32"
|
||||
}
|
||||
@@ -478,6 +478,46 @@ pub struct MasNewUpstreamOauthLink {
|
||||
pub created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl WriteBatch for MasNewUpstreamOauthLink {
|
||||
async fn write_batch(conn: &mut PgConnection, batch: Vec<Self>) -> Result<(), Error> {
|
||||
let mut link_ids: Vec<Uuid> = Vec::with_capacity(batch.len());
|
||||
let mut user_ids: Vec<Uuid> = Vec::with_capacity(batch.len());
|
||||
let mut upstream_provider_ids: Vec<Uuid> = Vec::with_capacity(batch.len());
|
||||
let mut subjects: Vec<String> = Vec::with_capacity(batch.len());
|
||||
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(batch.len());
|
||||
|
||||
for MasNewUpstreamOauthLink {
|
||||
link_id,
|
||||
user_id,
|
||||
upstream_provider_id,
|
||||
subject,
|
||||
created_at,
|
||||
} in batch
|
||||
{
|
||||
link_ids.push(link_id);
|
||||
user_ids.push(user_id.get());
|
||||
upstream_provider_ids.push(upstream_provider_id);
|
||||
subjects.push(subject);
|
||||
created_ats.push(created_at);
|
||||
}
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO syn2mas__upstream_oauth_links
|
||||
(upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at)
|
||||
SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[])
|
||||
"#,
|
||||
&link_ids[..],
|
||||
&user_ids[..],
|
||||
&upstream_provider_ids[..],
|
||||
&subjects[..],
|
||||
&created_ats[..],
|
||||
).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MasNewCompatSession {
|
||||
pub session_id: Uuid,
|
||||
pub user_id: NonNilUuid,
|
||||
@@ -890,45 +930,15 @@ impl MasWriter {
|
||||
&mut self,
|
||||
links: Vec<MasNewUpstreamOauthLink>,
|
||||
) -> BoxFuture<'_, Result<(), Error>> {
|
||||
self.writer_pool.spawn_with_connection(move |conn| {
|
||||
Box::pin(async move {
|
||||
let mut link_ids: Vec<Uuid> = Vec::with_capacity(links.len());
|
||||
let mut user_ids: Vec<Uuid> = Vec::with_capacity(links.len());
|
||||
let mut upstream_provider_ids: Vec<Uuid> = Vec::with_capacity(links.len());
|
||||
let mut subjects: Vec<String> = Vec::with_capacity(links.len());
|
||||
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(links.len());
|
||||
self.writer_pool
|
||||
.spawn_with_connection(move |conn| {
|
||||
Box::pin(async move {
|
||||
MasNewUpstreamOauthLink::write_batch(conn, links).await?;
|
||||
|
||||
for MasNewUpstreamOauthLink {
|
||||
link_id,
|
||||
user_id,
|
||||
upstream_provider_id,
|
||||
subject,
|
||||
created_at,
|
||||
} in links
|
||||
{
|
||||
link_ids.push(link_id);
|
||||
user_ids.push(user_id.get());
|
||||
upstream_provider_ids.push(upstream_provider_id);
|
||||
subjects.push(subject);
|
||||
created_ats.push(created_at);
|
||||
}
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO syn2mas__upstream_oauth_links
|
||||
(upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at)
|
||||
SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[])
|
||||
"#,
|
||||
&link_ids[..],
|
||||
&user_ids[..],
|
||||
&upstream_provider_ids[..],
|
||||
&subjects[..],
|
||||
&created_ats[..],
|
||||
).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?;
|
||||
|
||||
Ok(())
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
}).boxed()
|
||||
.boxed()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, level = Level::DEBUG)]
|
||||
|
||||
Reference in New Issue
Block a user