Track skipped entities in the syn2mas progress

This commit is contained in:
Quentin Gliech
2025-03-14 14:07:57 +01:00
committed by Olivier 'reivilibre
parent 0c28e1a389
commit d7bd50f12b
5 changed files with 173 additions and 83 deletions

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, process::ExitCode, sync::atomic::Ordering, time::Duration};
use std::{collections::HashMap, process::ExitCode, time::Duration};
use anyhow::Context;
use camino::Utf8PathBuf;
@@ -289,13 +289,14 @@ async fn occasional_progress_logger(progress: Progress) {
}
ProgressStage::MigratingData {
entity,
migrated,
counter,
approx_count,
} => {
let migrated = migrated.load(Ordering::Relaxed);
let migrated = counter.migrated();
let skipped = counter.skipped();
#[allow(clippy::cast_precision_loss)]
let percent = (f64::from(migrated) / *approx_count as f64) * 100.0;
info!(name: "progress", "migrating {entity}: {migrated}/~{approx_count} (~{percent:.1}%)");
let percent = (f64::from(migrated + skipped) / *approx_count as f64) * 100.0;
info!(name: "progress", "migrating {entity}: {migrated} ({skipped} skipped) /~{approx_count} (~{percent:.1}%)");
}
ProgressStage::RebuildIndex { index_name } => {
info!(name: "progress", "still waiting for rebuild of index {index_name}");

View File

@@ -16,7 +16,7 @@ type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;
pub use self::{
mas_writer::{MasWriter, checks::mas_pre_migration_checks, locking::LockedMasDatabase},
migration::migrate,
progress::{Progress, ProgressStage},
progress::{Progress, ProgressCounter, ProgressStage},
synapse_reader::{
SynapseReader,
checks::{

View File

@@ -29,6 +29,7 @@ use self::{
constraint_pausing::{ConstraintDescription, IndexDescription},
locking::LockedMasDatabase,
};
use crate::Progress;
pub mod checks;
pub mod locking;
@@ -550,16 +551,19 @@ impl MasWriter {
conn: &mut LockedMasDatabase,
indices_to_restore: &[IndexDescription],
constraints_to_restore: &[ConstraintDescription],
progress: &Progress,
) -> Result<(), Error> {
// First restore all indices. The order is not important as far as I know.
// However the indices are needed before constraints.
for index in indices_to_restore.iter().rev() {
progress.rebuild_index(index.name.clone());
constraint_pausing::restore_index(conn.as_mut(), index).await?;
}
// Then restore all constraints.
// The order here is the reverse of drop order, since some constraints may rely
// on other constraints to work.
for constraint in constraints_to_restore.iter().rev() {
progress.rebuild_constraint(constraint.name.clone());
constraint_pausing::restore_constraint(conn.as_mut(), constraint).await?;
}
Ok(())
@@ -574,7 +578,7 @@ impl MasWriter {
///
/// - If the database connection experiences an error.
#[tracing::instrument(skip_all)]
pub async fn finish(mut self) -> Result<PgConnection, Error> {
pub async fn finish(mut self, progress: &Progress) -> Result<PgConnection, Error> {
self.write_buffer_finish_checker.check_all_finished()?;
// Commit all writer transactions to the database.
@@ -595,6 +599,7 @@ impl MasWriter {
&mut self.conn,
&self.indices_to_restore,
&self.constraints_to_restore,
progress,
)
.await?;
@@ -1148,7 +1153,7 @@ mod test {
use uuid::{NonNilUuid, Uuid};
use crate::{
LockedMasDatabase, MasWriter,
LockedMasDatabase, MasWriter, Progress,
mas_writer::{
MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
@@ -1278,7 +1283,10 @@ mod test {
.await
.expect("failed to write user");
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
let mut conn = writer
.finish(&Progress::default())
.await
.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
@@ -1312,7 +1320,10 @@ mod test {
.await
.expect("failed to write password");
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
let mut conn = writer
.finish(&Progress::default())
.await
.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
@@ -1345,7 +1356,10 @@ mod test {
.await
.expect("failed to write e-mail");
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
let mut conn = writer
.finish(&Progress::default())
.await
.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
@@ -1379,7 +1393,10 @@ mod test {
.await
.expect("failed to write phone number (unsupported threepid)");
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
let mut conn = writer
.finish(&Progress::default())
.await
.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
@@ -1415,7 +1432,10 @@ mod test {
.await
.expect("failed to write link");
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
let mut conn = writer
.finish(&Progress::default())
.await
.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
@@ -1453,7 +1473,10 @@ mod test {
.await
.expect("failed to write compat session");
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
let mut conn = writer
.finish(&Progress::default())
.await
.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
@@ -1502,7 +1525,10 @@ mod test {
.await
.expect("failed to write access token");
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
let mut conn = writer
.finish(&Progress::default())
.await
.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}
@@ -1563,7 +1589,10 @@ mod test {
.await
.expect("failed to write refresh token");
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
let mut conn = writer
.finish(&Progress::default())
.await
.expect("failed to finish MasWriter");
assert_db_snapshot!(&mut conn);
}

View File

@@ -11,14 +11,7 @@
//! This module does not implement any of the safety checks that should be run
//! *before* the migration.
use std::{
pin::pin,
sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
time::Instant,
};
use std::{pin::pin, time::Instant};
use chrono::{DateTime, Utc};
use compact_str::CompactString;
@@ -34,13 +27,13 @@ use ulid::Ulid;
use uuid::{NonNilUuid, Uuid};
use crate::{
HashMap, RandomState, SynapseReader,
HashMap, ProgressCounter, RandomState, SynapseReader,
mas_writer::{
self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
MasNewUserPassword, MasWriteBuffer, MasWriter,
},
progress::{Progress, ProgressStage},
progress::Progress,
synapse_reader::{
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
@@ -173,6 +166,10 @@ pub async fn migrate(
.u64_counter("syn2mas.entity.migrated")
.with_description("Number of entities of this type that have been migrated so far")
.build();
let skipped_otel_counter = METER
.u64_counter("syn2mas.entity.skipped")
.with_description("Number of entities of this type that have been skipped so far")
.build();
approx_total_counter.add(
counts.users as u64,
@@ -216,101 +213,81 @@ pub async fn migrate(
provider_id_mapping,
};
let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: V_ENTITY_USERS,
migrated: migrated_counter.clone(),
approx_count: counts.users as u64,
});
let progress_counter = progress.migrating_data(V_ENTITY_USERS, counts.users);
let (mas, state) = migrate_users(
&mut synapse,
mas,
state,
rng,
migrated_counter,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: V_ENTITY_THREEPIDS,
migrated: migrated_counter.clone(),
approx_count: counts.threepids as u64,
});
let progress_counter = progress.migrating_data(V_ENTITY_THREEPIDS, counts.threepids);
let (mas, state) = migrate_threepids(
&mut synapse,
mas,
rng,
state,
&migrated_counter,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: V_ENTITY_EXTERNAL_IDS,
migrated: migrated_counter.clone(),
approx_count: counts.external_ids as u64,
});
let progress_counter = progress.migrating_data(V_ENTITY_EXTERNAL_IDS, counts.external_ids);
let (mas, state) = migrate_external_ids(
&mut synapse,
mas,
rng,
state,
&migrated_counter,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
migrated: migrated_counter.clone(),
approx_count: (counts.access_tokens - counts.refresh_tokens) as u64,
});
let progress_counter = progress.migrating_data(
V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
counts.access_tokens - counts.refresh_tokens,
);
let (mas, state) = migrate_unrefreshable_access_tokens(
&mut synapse,
mas,
clock,
rng,
state,
migrated_counter,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: V_ENTITY_REFRESHABLE_TOKEN_PAIRS,
migrated: migrated_counter.clone(),
approx_count: counts.refresh_tokens as u64,
});
let progress_counter =
progress.migrating_data(V_ENTITY_REFRESHABLE_TOKEN_PAIRS, counts.refresh_tokens);
let (mas, state) = migrate_refreshable_token_pairs(
&mut synapse,
mas,
clock,
rng,
state,
&migrated_counter,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: "devices",
migrated: migrated_counter.clone(),
approx_count: counts.devices as u64,
});
let progress_counter = progress.migrating_data("devices", counts.devices);
let (mas, _state) = migrate_devices(
&mut synapse,
mas,
rng,
state,
migrated_counter,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
@@ -319,7 +296,7 @@ pub async fn migrate(
.await
.into_synapse("failed to close Synapse reader")?;
mas.finish()
mas.finish(progress)
.await
.into_mas("failed to finalise MAS database")?;
@@ -332,8 +309,9 @@ async fn migrate_users(
mut mas: MasWriter,
mut state: MigrationState,
rng: &mut impl RngCore,
progress_counter: Arc<AtomicU32>,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_USERS)];
@@ -376,6 +354,9 @@ async fn migrate_users(
if user.appservice_id.is_some() {
flags |= UserFlags::IS_APPSERVICE;
skipped_otel_counter.add(1, &otel_kv);
progress_counter.increment_skipped();
// Special case for appservice users: we don't insert them into the database
// We just record the user's information in the state and continue
state.users.insert(
@@ -409,7 +390,7 @@ async fn migrate_users(
}
migrated_otel_counter.add(1, &otel_kv);
progress_counter.fetch_add(1, Ordering::Relaxed);
progress_counter.increment_migrated();
}
user_buffer
@@ -453,8 +434,9 @@ async fn migrate_threepids(
mut mas: MasWriter,
rng: &mut impl RngCore,
state: MigrationState,
progress_counter: &AtomicU32,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)];
@@ -484,6 +466,8 @@ async fn migrate_threepids(
};
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -519,7 +503,7 @@ async fn migrate_threepids(
}
migrated_otel_counter.add(1, &otel_kv);
progress_counter.fetch_add(1, Ordering::Relaxed);
progress_counter.increment_migrated();
}
email_buffer
@@ -549,8 +533,9 @@ async fn migrate_external_ids(
mut mas: MasWriter,
rng: &mut impl RngCore,
state: MigrationState,
progress_counter: &AtomicU32,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)];
@@ -576,6 +561,8 @@ async fn migrate_external_ids(
};
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -607,7 +594,7 @@ async fn migrate_external_ids(
.into_mas("failed to write upstream link")?;
migrated_otel_counter.add(1, &otel_kv);
progress_counter.fetch_add(1, Ordering::Relaxed);
progress_counter.increment_migrated();
}
write_buffer
@@ -637,8 +624,9 @@ async fn migrate_devices(
mut mas: MasWriter,
rng: &mut impl RngCore,
mut state: MigrationState,
progress_counter: Arc<AtomicU32>,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)];
@@ -671,6 +659,8 @@ async fn migrate_devices(
};
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -729,7 +719,7 @@ async fn migrate_devices(
.into_mas("writing compat sessions")?;
migrated_otel_counter.add(1, &otel_kv);
progress_counter.fetch_add(1, Ordering::Relaxed);
progress_counter.increment_migrated();
}
write_buffer
@@ -766,14 +756,16 @@ async fn migrate_devices(
/// Migrates unrefreshable access tokens (those without an associated refresh
/// token). Some of these may be deviceless.
#[tracing::instrument(skip_all, level = Level::INFO)]
#[allow(clippy::too_many_arguments)]
async fn migrate_unrefreshable_access_tokens(
synapse: &mut SynapseReader<'_>,
mut mas: MasWriter,
clock: &dyn Clock,
rng: &mut impl RngCore,
mut state: MigrationState,
progress_counter: Arc<AtomicU32>,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(
@@ -811,6 +803,8 @@ async fn migrate_unrefreshable_access_tokens(
};
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -818,6 +812,8 @@ async fn migrate_unrefreshable_access_tokens(
|| user_infos.flags.is_guest()
|| user_infos.flags.is_appservice()
{
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
}
@@ -879,7 +875,7 @@ async fn migrate_unrefreshable_access_tokens(
.into_mas("writing compat access tokens")?;
migrated_otel_counter.add(1, &otel_kv);
progress_counter.fetch_add(1, Ordering::Relaxed);
progress_counter.increment_migrated();
}
write_buffer
.finish(&mut mas)
@@ -919,14 +915,16 @@ async fn migrate_unrefreshable_access_tokens(
/// Migrates (access token, refresh token) pairs.
/// Does not migrate non-refreshable access tokens.
#[tracing::instrument(skip_all, level = Level::INFO)]
#[allow(clippy::too_many_arguments)]
async fn migrate_refreshable_token_pairs(
synapse: &mut SynapseReader<'_>,
mut mas: MasWriter,
clock: &dyn Clock,
rng: &mut impl RngCore,
mut state: MigrationState,
progress_counter: &AtomicU32,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)];
@@ -959,6 +957,8 @@ async fn migrate_refreshable_token_pairs(
};
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -966,6 +966,8 @@ async fn migrate_refreshable_token_pairs(
|| user_infos.flags.is_guest()
|| user_infos.flags.is_appservice()
{
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
}
@@ -1011,7 +1013,7 @@ async fn migrate_refreshable_token_pairs(
.into_mas("writing compat refresh tokens")?;
migrated_otel_counter.add(1, &otel_kv);
progress_counter.fetch_add(1, Ordering::Relaxed);
progress_counter.increment_migrated();
}
access_token_write_buffer

View File

@@ -11,14 +11,72 @@ pub struct Progress {
current_stage: Arc<ArcSwap<ProgressStage>>,
}
#[derive(Clone, Default)]
pub struct ProgressCounter {
inner: Arc<ProgressCounterInner>,
}
#[derive(Default)]
struct ProgressCounterInner {
migrated: AtomicU32,
skipped: AtomicU32,
}
impl ProgressCounter {
pub fn increment_migrated(&self) {
self.inner
.migrated
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn increment_skipped(&self) {
self.inner
.skipped
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
#[must_use]
pub fn migrated(&self) -> u32 {
self.inner
.migrated
.load(std::sync::atomic::Ordering::Relaxed)
}
#[must_use]
pub fn skipped(&self) -> u32 {
self.inner
.skipped
.load(std::sync::atomic::Ordering::Relaxed)
}
}
impl Progress {
#[must_use]
pub fn migrating_data(&self, entity: &'static str, approx_count: usize) -> ProgressCounter {
let counter = ProgressCounter::default();
self.set_current_stage(ProgressStage::MigratingData {
entity,
counter: counter.clone(),
approx_count: approx_count as u64,
});
counter
}
pub fn rebuild_index(&self, index_name: String) {
self.set_current_stage(ProgressStage::RebuildIndex { index_name });
}
pub fn rebuild_constraint(&self, constraint_name: String) {
self.set_current_stage(ProgressStage::RebuildConstraint { constraint_name });
}
/// Sets the current stage of progress.
///
/// This is probably not cheap enough to use for every individual row,
/// so use of atomic integers for the fields that will be updated is
/// recommended.
#[inline]
pub fn set_current_stage(&self, stage: ProgressStage) {
fn set_current_stage(&self, stage: ProgressStage) {
self.current_stage.store(Arc::new(stage));
}
@@ -42,7 +100,7 @@ pub enum ProgressStage {
SettingUp,
MigratingData {
entity: &'static str,
migrated: Arc<AtomicU32>,
counter: ProgressCounter,
approx_count: u64,
},
RebuildIndex {