Remove the lifetime parameter from MasWriter

This commit is contained in:
Quentin Gliech
2025-02-07 12:57:21 +01:00
parent 260834bc01
commit d1d73af3ce
2 changed files with 21 additions and 23 deletions

View File

@@ -185,10 +185,8 @@ impl WriterConnectionPool {
}
}
pub struct MasWriter<'c> {
pub struct MasWriter {
conn: LockedMasDatabase,
// Temporary phantom data, so that we don't remove the lifetime parameter yet
phantom: std::marker::PhantomData<&'c ()>,
writer_pool: WriterConnectionPool,
indices_to_restore: Vec<IndexDescription>,
@@ -326,7 +324,7 @@ pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result<bool, Err
}
}
impl MasWriter<'_> {
impl MasWriter {
/// Creates a new MAS writer.
///
/// # Errors
@@ -448,7 +446,7 @@ impl MasWriter<'_> {
Ok(Self {
conn,
phantom: std::marker::PhantomData,
writer_pool: WriterConnectionPool::new(writer_connections),
indices_to_restore,
constraints_to_restore,
@@ -1027,8 +1025,8 @@ const WRITE_BUFFER_BATCH_SIZE: usize = 4096;
/// A function that can accept and flush buffers from a `MasWriteBuffer`.
/// Intended uses are the methods on `MasWriter` such as `write_users`.
type WriteBufferFlusher<'conn, T> =
for<'a> fn(&'a mut MasWriter<'conn>, Vec<T>) -> BoxFuture<'a, Result<(), Error>>;
type WriteBufferFlusher<T> =
for<'a> fn(&'a mut MasWriter, Vec<T>) -> BoxFuture<'a, Result<(), Error>>;
/// A buffer for writing rows to the MAS database.
/// Generic over the type of rows.
@@ -1036,14 +1034,14 @@ type WriteBufferFlusher<'conn, T> =
/// # Panics
///
/// Panics if dropped before `finish()` has been called.
pub struct MasWriteBuffer<'conn, T> {
pub struct MasWriteBuffer<T> {
rows: Vec<T>,
flusher: WriteBufferFlusher<'conn, T>,
flusher: WriteBufferFlusher<T>,
finished: bool,
}
impl<'conn, T> MasWriteBuffer<'conn, T> {
pub fn new(flusher: WriteBufferFlusher<'conn, T>) -> Self {
impl<T> MasWriteBuffer<T> {
pub fn new(flusher: WriteBufferFlusher<T>) -> Self {
MasWriteBuffer {
rows: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
flusher,
@@ -1051,13 +1049,13 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
}
}
pub async fn finish(mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> {
pub async fn finish(mut self, writer: &mut MasWriter) -> Result<(), Error> {
self.finished = true;
self.flush(writer).await?;
Ok(())
}
pub async fn flush(&mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> {
pub async fn flush(&mut self, writer: &mut MasWriter) -> Result<(), Error> {
if self.rows.is_empty() {
return Ok(());
}
@@ -1067,7 +1065,7 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
Ok(())
}
pub async fn write(&mut self, writer: &mut MasWriter<'conn>, row: T) -> Result<(), Error> {
pub async fn write(&mut self, writer: &mut MasWriter, row: T) -> Result<(), Error> {
self.rows.push(row);
if self.rows.len() >= WRITE_BUFFER_BATCH_SIZE {
self.flush(writer).await?;
@@ -1076,7 +1074,7 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
}
}
impl<T> Drop for MasWriteBuffer<'_, T> {
impl<T> Drop for MasWriteBuffer<T> {
fn drop(&mut self) {
assert!(self.finished, "MasWriteBuffer dropped but not finished!");
}
@@ -1185,7 +1183,7 @@ mod test {
/// Runs some code with a `MasWriter`.
///
/// The callback is responsible for `finish`ing the `MasWriter`.
async fn make_mas_writer(pool: &PgPool) -> MasWriter<'static> {
async fn make_mas_writer(pool: &PgPool) -> MasWriter {
let main_conn = pool.acquire().await.unwrap().detach();
let mut writer_conns = Vec::new();
for _ in 0..2 {

View File

@@ -92,7 +92,7 @@ struct UsersMigrated {
#[allow(clippy::implicit_hasher)]
pub async fn migrate(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
mas: &mut MasWriter,
server_name: &str,
clock: &dyn Clock,
rng: &mut impl RngCore,
@@ -179,7 +179,7 @@ pub async fn migrate(
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_users(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
mas: &mut MasWriter,
user_count_hint: usize,
server_name: &str,
rng: &mut impl RngCore,
@@ -232,7 +232,7 @@ async fn migrate_users(
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_threepids(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
mas: &mut MasWriter,
server_name: &str,
rng: &mut impl RngCore,
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
@@ -315,7 +315,7 @@ async fn migrate_threepids(
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_external_ids(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
mas: &mut MasWriter,
server_name: &str,
rng: &mut impl RngCore,
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
@@ -391,7 +391,7 @@ async fn migrate_external_ids(
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_devices(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
mas: &mut MasWriter,
server_name: &str,
rng: &mut impl RngCore,
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
@@ -483,7 +483,7 @@ async fn migrate_devices(
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_unrefreshable_access_tokens(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
mas: &mut MasWriter,
server_name: &str,
clock: &dyn Clock,
rng: &mut impl RngCore,
@@ -591,7 +591,7 @@ async fn migrate_unrefreshable_access_tokens(
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_refreshable_token_pairs(
synapse: &mut SynapseReader<'_>,
mas: &mut MasWriter<'_>,
mas: &mut MasWriter,
server_name: &str,
clock: &dyn Clock,
rng: &mut impl RngCore,