syn2mas: refactor the metrics logic in the progress module

We don't need to carry around the various meters. Just make them global.
This commit is contained in:
Quentin Gliech
2025-04-18 11:32:26 +02:00
parent bb6a6e8081
commit 35cd982e6f
3 changed files with 126 additions and 180 deletions

View File

@@ -17,7 +17,6 @@ use chrono::{DateTime, Utc};
use compact_str::CompactString;
use futures_util::{SinkExt, StreamExt as _, TryFutureExt, TryStreamExt as _};
use mas_storage::Clock;
use opentelemetry::{KeyValue, metrics::Counter};
use rand::{RngCore, SeedableRng};
use thiserror::Error;
use thiserror_ext::ContextInto;
@@ -33,16 +32,11 @@ use crate::{
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
MasNewUserPassword, MasWriteBuffer, MasWriter,
},
progress::Progress,
progress::{EntityType, Progress},
synapse_reader::{
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
},
telemetry::{
K_ENTITY, METER, V_ENTITY_DEVICES, V_ENTITY_EXTERNAL_IDS,
V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, V_ENTITY_REFRESHABLE_TOKEN_PAIRS,
V_ENTITY_THREEPIDS, V_ENTITY_USERS,
},
};
#[derive(Debug, Error, ContextInto)]
@@ -146,7 +140,7 @@ struct MigrationState {
///
/// - An underlying database access error, either to MAS or to Synapse.
/// - Invalid data in the Synapse database.
#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
#[allow(clippy::implicit_hasher)]
pub async fn migrate(
mut synapse: SynapseReader<'_>,
mas: MasWriter,
@@ -158,49 +152,6 @@ pub async fn migrate(
) -> Result<(), Error> {
let counts = synapse.count_rows().await.into_synapse("counting users")?;
let approx_total_counter = METER
.u64_counter("syn2mas.entity.approx_total")
.with_description("Approximate number of entities of this type to be migrated")
.build();
let migrated_otel_counter = METER
.u64_counter("syn2mas.entity.migrated")
.with_description("Number of entities of this type that have been migrated so far")
.build();
let skipped_otel_counter = METER
.u64_counter("syn2mas.entity.skipped")
.with_description("Number of entities of this type that have been skipped so far")
.build();
approx_total_counter.add(
counts.users as u64,
&[KeyValue::new(K_ENTITY, V_ENTITY_USERS)],
);
approx_total_counter.add(
counts.devices as u64,
&[KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)],
);
approx_total_counter.add(
counts.threepids as u64,
&[KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)],
);
approx_total_counter.add(
counts.external_ids as u64,
&[KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)],
);
// assume 1 refreshable access token per refresh token.
let approx_nonrefreshable_access_tokens = counts.access_tokens - counts.refresh_tokens;
approx_total_counter.add(
approx_nonrefreshable_access_tokens as u64,
&[KeyValue::new(
K_ENTITY,
V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
)],
);
approx_total_counter.add(
counts.refresh_tokens as u64,
&[KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)],
);
let state = MigrationState {
server_name,
// We oversize the hashmaps, as the estimates are inaccurate, and we would like to avoid
@@ -213,83 +164,32 @@ pub async fn migrate(
provider_id_mapping,
};
let progress_counter = progress.migrating_data(V_ENTITY_USERS, counts.users);
let (mas, state) = migrate_users(
&mut synapse,
mas,
state,
rng,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let progress_counter = progress.migrating_data(EntityType::Users, counts.users);
let (mas, state) = migrate_users(&mut synapse, mas, state, rng, progress_counter).await?;
let progress_counter = progress.migrating_data(V_ENTITY_THREEPIDS, counts.threepids);
let (mas, state) = migrate_threepids(
&mut synapse,
mas,
rng,
state,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let progress_counter = progress.migrating_data(EntityType::ThreePids, counts.threepids);
let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state, progress_counter).await?;
let progress_counter = progress.migrating_data(V_ENTITY_EXTERNAL_IDS, counts.external_ids);
let (mas, state) = migrate_external_ids(
&mut synapse,
mas,
rng,
state,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let progress_counter = progress.migrating_data(EntityType::ExternalIds, counts.external_ids);
let (mas, state) =
migrate_external_ids(&mut synapse, mas, rng, state, progress_counter).await?;
let progress_counter = progress.migrating_data(
V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
EntityType::NonRefreshableAccessTokens,
counts.access_tokens - counts.refresh_tokens,
);
let (mas, state) = migrate_unrefreshable_access_tokens(
&mut synapse,
mas,
clock,
rng,
state,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let (mas, state) =
migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state, progress_counter)
.await?;
let progress_counter =
progress.migrating_data(V_ENTITY_REFRESHABLE_TOKEN_PAIRS, counts.refresh_tokens);
let (mas, state) = migrate_refreshable_token_pairs(
&mut synapse,
mas,
clock,
rng,
state,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
progress.migrating_data(EntityType::RefreshableTokens, counts.refresh_tokens);
let (mas, state) =
migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state, progress_counter)
.await?;
let progress_counter = progress.migrating_data("devices", counts.devices);
let (mas, _state) = migrate_devices(
&mut synapse,
mas,
rng,
state,
progress_counter,
migrated_otel_counter.clone(),
skipped_otel_counter.clone(),
)
.await?;
let progress_counter = progress.migrating_data(EntityType::Devices, counts.devices);
let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state, progress_counter).await?;
synapse
.finish()
@@ -310,11 +210,8 @@ async fn migrate_users(
mut state: MigrationState,
rng: &mut impl RngCore,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_USERS)];
let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseUser>(10 * 1024 * 1024);
@@ -356,7 +253,6 @@ async fn migrate_users(
if user.appservice_id.is_some() {
flags |= UserFlags::IS_APPSERVICE;
skipped_otel_counter.add(1, &otel_kv);
progress_counter.increment_skipped();
// Special case for appservice users: we don't insert them into the database
@@ -391,7 +287,6 @@ async fn migrate_users(
.into_mas("writing password")?;
}
migrated_otel_counter.add(1, &otel_kv);
progress_counter.increment_migrated();
}
@@ -437,11 +332,8 @@ async fn migrate_threepids(
rng: &mut impl RngCore,
state: MigrationState,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)];
let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
@@ -469,7 +361,6 @@ async fn migrate_threepids(
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -504,7 +395,6 @@ async fn migrate_threepids(
.into_mas("writing unsupported threepid")?;
}
migrated_otel_counter.add(1, &otel_kv);
progress_counter.increment_migrated();
}
@@ -536,11 +426,8 @@ async fn migrate_external_ids(
rng: &mut impl RngCore,
state: MigrationState,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)];
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
let mut extids_stream = pin!(synapse.read_user_external_ids());
@@ -564,7 +451,6 @@ async fn migrate_external_ids(
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -595,7 +481,6 @@ async fn migrate_external_ids(
.await
.into_mas("failed to write upstream link")?;
migrated_otel_counter.add(1, &otel_kv);
progress_counter.increment_migrated();
}
@@ -627,11 +512,8 @@ async fn migrate_devices(
rng: &mut impl RngCore,
mut state: MigrationState,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)];
let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
@@ -664,7 +546,6 @@ async fn migrate_devices(
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -721,7 +602,6 @@ async fn migrate_devices(
.await
.into_mas("writing compat sessions")?;
migrated_otel_counter.add(1, &otel_kv);
progress_counter.increment_migrated();
}
@@ -759,7 +639,6 @@ async fn migrate_devices(
/// Migrates unrefreshable access tokens (those without an associated refresh
/// token). Some of these may be deviceless.
#[tracing::instrument(skip_all, level = Level::INFO)]
#[allow(clippy::too_many_arguments)]
async fn migrate_unrefreshable_access_tokens(
synapse: &mut SynapseReader<'_>,
mut mas: MasWriter,
@@ -767,14 +646,8 @@ async fn migrate_unrefreshable_access_tokens(
rng: &mut impl RngCore,
mut state: MigrationState,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(
K_ENTITY,
V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
)];
let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
@@ -809,7 +682,6 @@ async fn migrate_unrefreshable_access_tokens(
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -818,7 +690,6 @@ async fn migrate_unrefreshable_access_tokens(
|| user_infos.flags.is_appservice()
{
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
}
@@ -879,7 +750,6 @@ async fn migrate_unrefreshable_access_tokens(
.await
.into_mas("writing compat access tokens")?;
migrated_otel_counter.add(1, &otel_kv);
progress_counter.increment_migrated();
}
write_buffer
@@ -920,7 +790,6 @@ async fn migrate_unrefreshable_access_tokens(
/// Migrates (access token, refresh token) pairs.
/// Does not migrate non-refreshable access tokens.
#[tracing::instrument(skip_all, level = Level::INFO)]
#[allow(clippy::too_many_arguments)]
async fn migrate_refreshable_token_pairs(
synapse: &mut SynapseReader<'_>,
mut mas: MasWriter,
@@ -928,11 +797,8 @@ async fn migrate_refreshable_token_pairs(
rng: &mut impl RngCore,
mut state: MigrationState,
progress_counter: ProgressCounter,
migrated_otel_counter: Counter<u64>,
skipped_otel_counter: Counter<u64>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)];
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
let mut access_token_write_buffer =
@@ -963,7 +829,6 @@ async fn migrate_refreshable_token_pairs(
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
};
@@ -972,7 +837,6 @@ async fn migrate_refreshable_token_pairs(
|| user_infos.flags.is_appservice()
{
progress_counter.increment_skipped();
skipped_otel_counter.add(1, &otel_kv);
continue;
}
@@ -1017,7 +881,6 @@ async fn migrate_refreshable_token_pairs(
.await
.into_mas("writing compat refresh tokens")?;
migrated_otel_counter.add(1, &otel_kv);
progress_counter.increment_migrated();
}

View File

@@ -1,6 +1,89 @@
use std::sync::{Arc, atomic::AtomicU32};
// Copyright 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use std::sync::{Arc, LazyLock, atomic::AtomicU32};
use arc_swap::ArcSwap;
use opentelemetry::{
KeyValue,
metrics::{Counter, Gauge},
};
use crate::telemetry::METER;
/// Gauge holding the estimated number of entities of each type that the
/// migration is expected to process.
///
/// The instrument is created from [`METER`] the first time this static is
/// dereferenced, under the name `syn2mas.entity.approx_total`.
pub static APPROX_TOTAL_GAUGE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
    let builder = METER
        .u64_gauge("syn2mas.entity.approx_total")
        .with_description("Approximate number of entities of this type to be migrated");
    builder.build()
});
/// Counter of entities that have been migrated so far, per entity type.
///
/// The instrument is created from [`METER`] the first time this static is
/// dereferenced, under the name `syn2mas.entity.migrated`.
pub static MIGRATED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
    let builder = METER
        .u64_counter("syn2mas.entity.migrated")
        .with_description("Number of entities of this type that have been migrated so far");
    builder.build()
});
/// Counter of entities that have been skipped so far, per entity type.
///
/// The instrument is created from [`METER`] the first time this static is
/// dereferenced, under the name `syn2mas.entity.skipped`.
pub static SKIPPED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
    let builder = METER
        .u64_counter("syn2mas.entity.skipped")
        .with_description("Number of entities of this type that have been skipped so far");
    builder.build()
});
/// The kinds of entities that syn2mas can migrate.
///
/// The type is a plain, field-less enum: it is `Copy`, can be compared for
/// equality and can be used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityType {
    /// Users
    Users,

    /// Devices
    Devices,

    /// Third-party IDs (threepids)
    ThreePids,

    /// External IDs
    ExternalIds,

    /// Access tokens that have no associated refresh token
    NonRefreshableAccessTokens,

    /// (access token, refresh token) pairs
    RefreshableTokens,
}
/// Displays the entity type as its identifier, i.e. exactly the string that
/// `EntityType::name` returns.
impl std::fmt::Display for EntityType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}
impl EntityType {
    /// Returns the identifier of this entity type.
    ///
    /// The same string is used as the value of the `entity` metric attribute
    /// (see [`EntityType::as_kv`]) and as the `Display` output.
    #[must_use]
    pub const fn name(self) -> &'static str {
        match self {
            Self::Users => "users",
            Self::Devices => "devices",
            Self::ThreePids => "threepids",
            Self::ExternalIds => "external_ids",
            Self::NonRefreshableAccessTokens => "nonrefreshable_access_tokens",
            // This is the attribute value the `syn2mas.entity.*` metrics were
            // reported with before this enum existed; keeping it unchanged
            // avoids splitting existing time series under a new label value.
            Self::RefreshableTokens => "refreshable_token_pairs",
        }
    }

    /// Builds the `entity` attribute identifying this entity type, for use
    /// when recording a measurement on one of the `syn2mas.entity.*`
    /// instruments.
    #[must_use]
    pub fn as_kv(self) -> KeyValue {
        KeyValue::new("entity", self.name())
    }
}
/// Tracker for the progress of the migration
///
@@ -11,25 +94,37 @@ pub struct Progress {
current_stage: Arc<ArcSwap<ProgressStage>>,
}
/// Handle used to record how many entities have been migrated or skipped.
///
/// The state lives behind an `Arc`, so every clone of a counter refers to
/// the same underlying `ProgressCounterInner`.
#[derive(Clone, Default)]
#[derive(Clone)]
pub struct ProgressCounter {
inner: Arc<ProgressCounterInner>,
}
/// State shared between all clones of a `ProgressCounter`.
#[derive(Default)]
struct ProgressCounterInner {
/// Attribute set passed to `MIGRATED_COUNTER` / `SKIPPED_COUNTER` each time
/// the counter is incremented.
kv: [KeyValue; 1],
/// Number of entities recorded as migrated through this counter.
migrated: AtomicU32,
/// Number of entities recorded as skipped through this counter.
skipped: AtomicU32,
}
impl ProgressCounter {
fn new(entity: EntityType) -> Self {
Self {
inner: Arc::new(ProgressCounterInner {
kv: [entity.as_kv()],
migrated: AtomicU32::new(0),
skipped: AtomicU32::new(0),
}),
}
}
pub fn increment_migrated(&self) {
MIGRATED_COUNTER.add(1, &self.inner.kv);
self.inner
.migrated
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn increment_skipped(&self) {
SKIPPED_COUNTER.add(1, &self.inner.kv);
self.inner
.skipped
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@@ -52,8 +147,9 @@ impl ProgressCounter {
impl Progress {
#[must_use]
pub fn migrating_data(&self, entity: &'static str, approx_count: usize) -> ProgressCounter {
let counter = ProgressCounter::default();
pub fn migrating_data(&self, entity: EntityType, approx_count: usize) -> ProgressCounter {
let counter = ProgressCounter::new(entity);
APPROX_TOTAL_GAUGE.record(approx_count as u64, &[entity.as_kv()]);
self.set_current_stage(ProgressStage::MigratingData {
entity,
counter: counter.clone(),
@@ -99,7 +195,7 @@ impl Default for Progress {
pub enum ProgressStage {
SettingUp,
MigratingData {
entity: &'static str,
entity: EntityType,
counter: ProgressCounter,
approx_count: u64,
},

View File

@@ -1,3 +1,8 @@
// Copyright 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use std::sync::LazyLock;
use opentelemetry::{InstrumentationScope, metrics::Meter};
@@ -12,21 +17,3 @@ static SCOPE: LazyLock<InstrumentationScope> = LazyLock::new(|| {
pub static METER: LazyLock<Meter> =
LazyLock::new(|| opentelemetry::global::meter_with_scope(SCOPE.clone()));
/// Attribute key for syn2mas.entity metrics representing what entity.
pub const K_ENTITY: &str = "entity";
/// Attribute value for syn2mas.entity metrics representing users.
pub const V_ENTITY_USERS: &str = "users";
/// Attribute value for syn2mas.entity metrics representing devices.
pub const V_ENTITY_DEVICES: &str = "devices";
/// Attribute value for syn2mas.entity metrics representing threepids.
pub const V_ENTITY_THREEPIDS: &str = "threepids";
/// Attribute value for syn2mas.entity metrics representing external IDs.
pub const V_ENTITY_EXTERNAL_IDS: &str = "external_ids";
/// Attribute value for syn2mas.entity metrics representing non-refreshable
/// access token entities.
pub const V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS: &str = "nonrefreshable_access_tokens";
/// Attribute value for syn2mas.entity metrics representing refreshable
/// access/refresh token pairs.
pub const V_ENTITY_REFRESHABLE_TOKEN_PAIRS: &str = "refreshable_token_pairs";