From 86000613ac2f0a578cdf1dd99b6ee51104f133a5 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 22 Apr 2025 14:37:32 +0200 Subject: [PATCH] syn2mas: implement WriteBatch for MasNewCompatSession --- ...233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json | 22 +++ ...e36f4ef03e1224a0a89a921e5a3d398a5d35c.json | 22 --- crates/syn2mas/src/mas_writer/mod.rs | 134 +++++++++--------- 3 files changed, 92 insertions(+), 86 deletions(-) create mode 100644 crates/syn2mas/.sqlx/query-09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json delete mode 100644 crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json diff --git a/crates/syn2mas/.sqlx/query-09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json b/crates/syn2mas/.sqlx/query-09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json new file mode 100644 index 000000000..97e8a07a0 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__compat_sessions (\n compat_session_id, user_id,\n device_id, human_name,\n created_at, is_synapse_admin,\n last_active_at, last_active_ip,\n user_agent)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::UUID[],\n $3::TEXT[], $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],\n $7::TIMESTAMP WITH TIME ZONE[], $8::INET[],\n $9::TEXT[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TextArray", + "TimestamptzArray", + "BoolArray", + "TimestamptzArray", + "InetArray", + "TextArray" + ] + }, + "nullable": [] + }, + "hash": "09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25" +} diff --git a/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json b/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json deleted file mode 100644 index 521e4facd..000000000 --- a/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO syn2mas__compat_sessions (\n compat_session_id, user_id,\n device_id, human_name,\n created_at, is_synapse_admin,\n last_active_at, last_active_ip,\n user_agent)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::UUID[],\n $3::TEXT[], $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],\n $7::TIMESTAMP WITH TIME ZONE[], $8::INET[],\n $9::TEXT[])\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "UuidArray", - "UuidArray", - "TextArray", - "TextArray", - "TimestamptzArray", - "BoolArray", - "TimestamptzArray", - "InetArray", - "TextArray" - ] - }, - "nullable": [] - }, - "hash": "396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c" -} diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index 0f7ccf715..b0b8250c1 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -530,6 +530,75 @@ pub struct MasNewCompatSession { pub user_agent: Option, } +impl WriteBatch for MasNewCompatSession { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut session_ids: Vec = Vec::with_capacity(batch.len()); + let mut user_ids: Vec = Vec::with_capacity(batch.len()); + let mut device_ids: Vec> = Vec::with_capacity(batch.len()); + let mut human_names: Vec> = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + let mut is_synapse_admins: Vec = Vec::with_capacity(batch.len()); + let mut last_active_ats: Vec>> = Vec::with_capacity(batch.len()); + let mut last_active_ips: Vec> = Vec::with_capacity(batch.len()); + let mut user_agents: Vec> = Vec::with_capacity(batch.len()); + + for MasNewCompatSession { + session_id, + user_id, + device_id, + human_name, + created_at, + is_synapse_admin, + last_active_at, + last_active_ip, + user_agent, + } in batch + { + session_ids.push(session_id); + user_ids.push(user_id.get()); + device_ids.push(device_id); + human_names.push(human_name); + created_ats.push(created_at); + is_synapse_admins.push(is_synapse_admin); + last_active_ats.push(last_active_at); + last_active_ips.push(last_active_ip); + user_agents.push(user_agent); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_sessions ( + compat_session_id, user_id, + device_id, human_name, + created_at, is_synapse_admin, + last_active_at, last_active_ip, + user_agent) + SELECT * FROM UNNEST( + $1::UUID[], $2::UUID[], + $3::TEXT[], $4::TEXT[], + $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[], + $7::TIMESTAMP WITH TIME ZONE[], $8::INET[], + $9::TEXT[]) + "#, + &session_ids[..], + &user_ids[..], + &device_ids[..] as &[Option], + &human_names[..] as &[Option], + &created_ats[..], + &is_synapse_admins[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &last_active_ats[..] as &[Option>], + &last_active_ips[..] as &[Option], + &user_agents[..] as &[Option], + ) + .execute(&mut *conn) + .await + .into_database("writing compat sessions to MAS")?; + + Ok(()) + } +} + pub struct MasNewCompatAccessToken { pub token_id: Uuid, pub session_id: Uuid, @@ -949,70 +1018,7 @@ impl MasWriter { self.writer_pool .spawn_with_connection(move |conn| { Box::pin(async move { - let mut session_ids: Vec = Vec::with_capacity(sessions.len()); - let mut user_ids: Vec = Vec::with_capacity(sessions.len()); - let mut device_ids: Vec> = Vec::with_capacity(sessions.len()); - let mut human_names: Vec> = Vec::with_capacity(sessions.len()); - let mut created_ats: Vec> = Vec::with_capacity(sessions.len()); - let mut is_synapse_admins: Vec = Vec::with_capacity(sessions.len()); - let mut last_active_ats: Vec>> = - Vec::with_capacity(sessions.len()); - let mut last_active_ips: Vec> = - Vec::with_capacity(sessions.len()); - let mut user_agents: Vec> = Vec::with_capacity(sessions.len()); - - for MasNewCompatSession { - session_id, - user_id, - device_id, - human_name, - created_at, - is_synapse_admin, - last_active_at, - last_active_ip, - user_agent, - } in sessions - { - session_ids.push(session_id); - user_ids.push(user_id.get()); - device_ids.push(device_id); - human_names.push(human_name); - created_ats.push(created_at); - is_synapse_admins.push(is_synapse_admin); - last_active_ats.push(last_active_at); - last_active_ips.push(last_active_ip); - user_agents.push(user_agent); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__compat_sessions ( - compat_session_id, user_id, - device_id, human_name, - created_at, is_synapse_admin, - last_active_at, last_active_ip, - user_agent) - SELECT * FROM UNNEST( - $1::UUID[], $2::UUID[], - $3::TEXT[], $4::TEXT[], - $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[], - $7::TIMESTAMP WITH TIME ZONE[], $8::INET[], - $9::TEXT[]) - "#, - &session_ids[..], - &user_ids[..], - &device_ids[..] as &[Option], - &human_names[..] as &[Option], - &created_ats[..], - &is_synapse_admins[..], - // We need to override the typing for arrays of optionals (sqlx limitation) - &last_active_ats[..] as &[Option>], - &last_active_ips[..] as &[Option], - &user_agents[..] as &[Option], - ) - .execute(&mut *conn) - .await - .into_database("writing compat sessions to MAS")?; + MasNewCompatSession::write_batch(conn, sessions).await?; Ok(()) })