syn2mas: implement WriteBatch for MasNewCompatSession
This commit is contained in:
22
crates/syn2mas/.sqlx/query-09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json
generated
Normal file
22
crates/syn2mas/.sqlx/query-09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json
generated
Normal file
@@ -0,0 +1,22 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO syn2mas__compat_sessions (\n compat_session_id, user_id,\n device_id, human_name,\n created_at, is_synapse_admin,\n last_active_at, last_active_ip,\n user_agent)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::UUID[],\n $3::TEXT[], $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],\n $7::TIMESTAMP WITH TIME ZONE[], $8::INET[],\n $9::TEXT[])\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"UuidArray",
|
||||
"UuidArray",
|
||||
"TextArray",
|
||||
"TextArray",
|
||||
"TimestamptzArray",
|
||||
"BoolArray",
|
||||
"TimestamptzArray",
|
||||
"InetArray",
|
||||
"TextArray"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25"
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO syn2mas__compat_sessions (\n compat_session_id, user_id,\n device_id, human_name,\n created_at, is_synapse_admin,\n last_active_at, last_active_ip,\n user_agent)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::UUID[],\n $3::TEXT[], $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],\n $7::TIMESTAMP WITH TIME ZONE[], $8::INET[],\n $9::TEXT[])\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"UuidArray",
|
||||
"UuidArray",
|
||||
"TextArray",
|
||||
"TextArray",
|
||||
"TimestamptzArray",
|
||||
"BoolArray",
|
||||
"TimestamptzArray",
|
||||
"InetArray",
|
||||
"TextArray"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c"
|
||||
}
|
||||
@@ -530,6 +530,75 @@ pub struct MasNewCompatSession {
|
||||
pub user_agent: Option<String>,
|
||||
}
|
||||
|
||||
impl WriteBatch for MasNewCompatSession {
|
||||
async fn write_batch(conn: &mut PgConnection, batch: Vec<Self>) -> Result<(), Error> {
|
||||
let mut session_ids: Vec<Uuid> = Vec::with_capacity(batch.len());
|
||||
let mut user_ids: Vec<Uuid> = Vec::with_capacity(batch.len());
|
||||
let mut device_ids: Vec<Option<String>> = Vec::with_capacity(batch.len());
|
||||
let mut human_names: Vec<Option<String>> = Vec::with_capacity(batch.len());
|
||||
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(batch.len());
|
||||
let mut is_synapse_admins: Vec<bool> = Vec::with_capacity(batch.len());
|
||||
let mut last_active_ats: Vec<Option<DateTime<Utc>>> = Vec::with_capacity(batch.len());
|
||||
let mut last_active_ips: Vec<Option<IpAddr>> = Vec::with_capacity(batch.len());
|
||||
let mut user_agents: Vec<Option<String>> = Vec::with_capacity(batch.len());
|
||||
|
||||
for MasNewCompatSession {
|
||||
session_id,
|
||||
user_id,
|
||||
device_id,
|
||||
human_name,
|
||||
created_at,
|
||||
is_synapse_admin,
|
||||
last_active_at,
|
||||
last_active_ip,
|
||||
user_agent,
|
||||
} in batch
|
||||
{
|
||||
session_ids.push(session_id);
|
||||
user_ids.push(user_id.get());
|
||||
device_ids.push(device_id);
|
||||
human_names.push(human_name);
|
||||
created_ats.push(created_at);
|
||||
is_synapse_admins.push(is_synapse_admin);
|
||||
last_active_ats.push(last_active_at);
|
||||
last_active_ips.push(last_active_ip);
|
||||
user_agents.push(user_agent);
|
||||
}
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO syn2mas__compat_sessions (
|
||||
compat_session_id, user_id,
|
||||
device_id, human_name,
|
||||
created_at, is_synapse_admin,
|
||||
last_active_at, last_active_ip,
|
||||
user_agent)
|
||||
SELECT * FROM UNNEST(
|
||||
$1::UUID[], $2::UUID[],
|
||||
$3::TEXT[], $4::TEXT[],
|
||||
$5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],
|
||||
$7::TIMESTAMP WITH TIME ZONE[], $8::INET[],
|
||||
$9::TEXT[])
|
||||
"#,
|
||||
&session_ids[..],
|
||||
&user_ids[..],
|
||||
&device_ids[..] as &[Option<String>],
|
||||
&human_names[..] as &[Option<String>],
|
||||
&created_ats[..],
|
||||
&is_synapse_admins[..],
|
||||
// We need to override the typing for arrays of optionals (sqlx limitation)
|
||||
&last_active_ats[..] as &[Option<DateTime<Utc>>],
|
||||
&last_active_ips[..] as &[Option<IpAddr>],
|
||||
&user_agents[..] as &[Option<String>],
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.into_database("writing compat sessions to MAS")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MasNewCompatAccessToken {
|
||||
pub token_id: Uuid,
|
||||
pub session_id: Uuid,
|
||||
@@ -949,70 +1018,7 @@ impl MasWriter {
|
||||
self.writer_pool
|
||||
.spawn_with_connection(move |conn| {
|
||||
Box::pin(async move {
|
||||
let mut session_ids: Vec<Uuid> = Vec::with_capacity(sessions.len());
|
||||
let mut user_ids: Vec<Uuid> = Vec::with_capacity(sessions.len());
|
||||
let mut device_ids: Vec<Option<String>> = Vec::with_capacity(sessions.len());
|
||||
let mut human_names: Vec<Option<String>> = Vec::with_capacity(sessions.len());
|
||||
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(sessions.len());
|
||||
let mut is_synapse_admins: Vec<bool> = Vec::with_capacity(sessions.len());
|
||||
let mut last_active_ats: Vec<Option<DateTime<Utc>>> =
|
||||
Vec::with_capacity(sessions.len());
|
||||
let mut last_active_ips: Vec<Option<IpAddr>> =
|
||||
Vec::with_capacity(sessions.len());
|
||||
let mut user_agents: Vec<Option<String>> = Vec::with_capacity(sessions.len());
|
||||
|
||||
for MasNewCompatSession {
|
||||
session_id,
|
||||
user_id,
|
||||
device_id,
|
||||
human_name,
|
||||
created_at,
|
||||
is_synapse_admin,
|
||||
last_active_at,
|
||||
last_active_ip,
|
||||
user_agent,
|
||||
} in sessions
|
||||
{
|
||||
session_ids.push(session_id);
|
||||
user_ids.push(user_id.get());
|
||||
device_ids.push(device_id);
|
||||
human_names.push(human_name);
|
||||
created_ats.push(created_at);
|
||||
is_synapse_admins.push(is_synapse_admin);
|
||||
last_active_ats.push(last_active_at);
|
||||
last_active_ips.push(last_active_ip);
|
||||
user_agents.push(user_agent);
|
||||
}
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO syn2mas__compat_sessions (
|
||||
compat_session_id, user_id,
|
||||
device_id, human_name,
|
||||
created_at, is_synapse_admin,
|
||||
last_active_at, last_active_ip,
|
||||
user_agent)
|
||||
SELECT * FROM UNNEST(
|
||||
$1::UUID[], $2::UUID[],
|
||||
$3::TEXT[], $4::TEXT[],
|
||||
$5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],
|
||||
$7::TIMESTAMP WITH TIME ZONE[], $8::INET[],
|
||||
$9::TEXT[])
|
||||
"#,
|
||||
&session_ids[..],
|
||||
&user_ids[..],
|
||||
&device_ids[..] as &[Option<String>],
|
||||
&human_names[..] as &[Option<String>],
|
||||
&created_ats[..],
|
||||
&is_synapse_admins[..],
|
||||
// We need to override the typing for arrays of optionals (sqlx limitation)
|
||||
&last_active_ats[..] as &[Option<DateTime<Utc>>],
|
||||
&last_active_ips[..] as &[Option<IpAddr>],
|
||||
&user_agents[..] as &[Option<String>],
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.into_database("writing compat sessions to MAS")?;
|
||||
MasNewCompatSession::write_batch(conn, sessions).await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user