From d1d73af3ce6f930cd22a7e6e76df47bdd579a8bd Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 7 Feb 2025 12:57:21 +0100 Subject: [PATCH] Remove the lifetime parameter from MasWriter --- crates/syn2mas/src/mas_writer/mod.rs | 30 +++++++++++++--------------- crates/syn2mas/src/migration.rs | 14 ++++++------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index a28821e31..e5260ad4f 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -185,10 +185,8 @@ impl WriterConnectionPool { } } -pub struct MasWriter<'c> { +pub struct MasWriter { conn: LockedMasDatabase, - // Temporary phantom data, so that we don't remove the lifetime parameter yet - phantom: std::marker::PhantomData<&'c ()>, writer_pool: WriterConnectionPool, indices_to_restore: Vec, @@ -326,7 +324,7 @@ pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result { +impl MasWriter { /// Creates a new MAS writer. /// /// # Errors @@ -448,7 +446,7 @@ impl MasWriter<'_> { Ok(Self { conn, - phantom: std::marker::PhantomData, + writer_pool: WriterConnectionPool::new(writer_connections), indices_to_restore, constraints_to_restore, @@ -1027,8 +1025,8 @@ const WRITE_BUFFER_BATCH_SIZE: usize = 4096; /// A function that can accept and flush buffers from a `MasWriteBuffer`. /// Intended uses are the methods on `MasWriter` such as `write_users`. -type WriteBufferFlusher<'conn, T> = - for<'a> fn(&'a mut MasWriter<'conn>, Vec) -> BoxFuture<'a, Result<(), Error>>; +type WriteBufferFlusher = + for<'a> fn(&'a mut MasWriter, Vec) -> BoxFuture<'a, Result<(), Error>>; /// A buffer for writing rows to the MAS database. /// Generic over the type of rows. @@ -1036,14 +1034,14 @@ type WriteBufferFlusher<'conn, T> = /// # Panics /// /// Panics if dropped before `finish()` has been called. -pub struct MasWriteBuffer<'conn, T> { +pub struct MasWriteBuffer { rows: Vec, - flusher: WriteBufferFlusher<'conn, T>, + flusher: WriteBufferFlusher, finished: bool, } -impl<'conn, T> MasWriteBuffer<'conn, T> { - pub fn new(flusher: WriteBufferFlusher<'conn, T>) -> Self { +impl MasWriteBuffer { + pub fn new(flusher: WriteBufferFlusher) -> Self { MasWriteBuffer { rows: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE), flusher, @@ -1051,13 +1049,13 @@ impl<'conn, T> MasWriteBuffer<'conn, T> { } } - pub async fn finish(mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> { + pub async fn finish(mut self, writer: &mut MasWriter) -> Result<(), Error> { self.finished = true; self.flush(writer).await?; Ok(()) } - pub async fn flush(&mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> { + pub async fn flush(&mut self, writer: &mut MasWriter) -> Result<(), Error> { if self.rows.is_empty() { return Ok(()); } @@ -1067,7 +1065,7 @@ impl<'conn, T> MasWriteBuffer<'conn, T> { Ok(()) } - pub async fn write(&mut self, writer: &mut MasWriter<'conn>, row: T) -> Result<(), Error> { + pub async fn write(&mut self, writer: &mut MasWriter, row: T) -> Result<(), Error> { self.rows.push(row); if self.rows.len() >= WRITE_BUFFER_BATCH_SIZE { self.flush(writer).await?; @@ -1076,7 +1074,7 @@ impl<'conn, T> MasWriteBuffer<'conn, T> { } } -impl Drop for MasWriteBuffer<'_, T> { +impl Drop for MasWriteBuffer { fn drop(&mut self) { assert!(self.finished, "MasWriteBuffer dropped but not finished!"); } @@ -1185,7 +1183,7 @@ mod test { /// Runs some code with a `MasWriter`. /// /// The callback is responsible for `finish`ing the `MasWriter`. - async fn make_mas_writer(pool: &PgPool) -> MasWriter<'static> { + async fn make_mas_writer(pool: &PgPool) -> MasWriter { let main_conn = pool.acquire().await.unwrap().detach(); let mut writer_conns = Vec::new(); for _ in 0..2 { diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index f46586c03..18d6746be 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -92,7 +92,7 @@ struct UsersMigrated { #[allow(clippy::implicit_hasher)] pub async fn migrate( synapse: &mut SynapseReader<'_>, - mas: &mut MasWriter<'_>, + mas: &mut MasWriter, server_name: &str, clock: &dyn Clock, rng: &mut impl RngCore, @@ -179,7 +179,7 @@ pub async fn migrate( #[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_users( synapse: &mut SynapseReader<'_>, - mas: &mut MasWriter<'_>, + mas: &mut MasWriter, user_count_hint: usize, server_name: &str, rng: &mut impl RngCore, @@ -232,7 +232,7 @@ async fn migrate_users( #[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_threepids( synapse: &mut SynapseReader<'_>, - mas: &mut MasWriter<'_>, + mas: &mut MasWriter, server_name: &str, rng: &mut impl RngCore, user_localparts_to_uuid: &HashMap, @@ -315,7 +315,7 @@ async fn migrate_threepids( #[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_external_ids( synapse: &mut SynapseReader<'_>, - mas: &mut MasWriter<'_>, + mas: &mut MasWriter, server_name: &str, rng: &mut impl RngCore, user_localparts_to_uuid: &HashMap, @@ -391,7 +391,7 @@ async fn migrate_external_ids( #[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_devices( synapse: &mut SynapseReader<'_>, - mas: &mut MasWriter<'_>, + mas: &mut MasWriter, server_name: &str, rng: &mut impl RngCore, user_localparts_to_uuid: &HashMap, @@ -483,7 +483,7 @@ async fn migrate_devices( #[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_unrefreshable_access_tokens( synapse: &mut SynapseReader<'_>, - mas: &mut MasWriter<'_>, + mas: &mut MasWriter, server_name: &str, clock: &dyn Clock, rng: &mut impl RngCore, @@ -591,7 +591,7 @@ async fn migrate_unrefreshable_access_tokens( #[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_refreshable_token_pairs( synapse: &mut SynapseReader<'_>, - mas: &mut MasWriter<'_>, + mas: &mut MasWriter, server_name: &str, clock: &dyn Clock, rng: &mut impl RngCore,