From 35cd982e6f7b50ec08c22fe9067df4683aa77649 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 18 Apr 2025 11:32:26 +0200 Subject: [PATCH] syn2mas: refactor the metrics logic in the progress module We don't need to carry around the various meters. Just make them global. --- crates/syn2mas/src/migration.rs | 175 ++++---------------------------- crates/syn2mas/src/progress.rs | 108 ++++++++++++++++++-- crates/syn2mas/src/telemetry.rs | 23 +---- 3 files changed, 126 insertions(+), 180 deletions(-) diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index efefc25d7..37612d1d3 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -17,7 +17,6 @@ use chrono::{DateTime, Utc}; use compact_str::CompactString; use futures_util::{SinkExt, StreamExt as _, TryFutureExt, TryStreamExt as _}; use mas_storage::Clock; -use opentelemetry::{KeyValue, metrics::Counter}; use rand::{RngCore, SeedableRng}; use thiserror::Error; use thiserror_ext::ContextInto; @@ -33,16 +32,11 @@ use crate::{ MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, MasWriteBuffer, MasWriter, }, - progress::Progress, + progress::{EntityType, Progress}, synapse_reader::{ self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser, }, - telemetry::{ - K_ENTITY, METER, V_ENTITY_DEVICES, V_ENTITY_EXTERNAL_IDS, - V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, V_ENTITY_REFRESHABLE_TOKEN_PAIRS, - V_ENTITY_THREEPIDS, V_ENTITY_USERS, - }, }; #[derive(Debug, Error, ContextInto)] @@ -146,7 +140,7 @@ struct MigrationState { /// /// - An underlying database access error, either to MAS or to Synapse. /// - Invalid data in the Synapse database. 
-#[allow(clippy::implicit_hasher, clippy::too_many_lines)] +#[allow(clippy::implicit_hasher)] pub async fn migrate( mut synapse: SynapseReader<'_>, mas: MasWriter, @@ -158,49 +152,6 @@ pub async fn migrate( ) -> Result<(), Error> { let counts = synapse.count_rows().await.into_synapse("counting users")?; - let approx_total_counter = METER - .u64_counter("syn2mas.entity.approx_total") - .with_description("Approximate number of entities of this type to be migrated") - .build(); - let migrated_otel_counter = METER - .u64_counter("syn2mas.entity.migrated") - .with_description("Number of entities of this type that have been migrated so far") - .build(); - let skipped_otel_counter = METER - .u64_counter("syn2mas.entity.skipped") - .with_description("Number of entities of this type that have been skipped so far") - .build(); - - approx_total_counter.add( - counts.users as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_USERS)], - ); - approx_total_counter.add( - counts.devices as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)], - ); - approx_total_counter.add( - counts.threepids as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)], - ); - approx_total_counter.add( - counts.external_ids as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)], - ); - // assume 1 refreshable access token per refresh token. 
- let approx_nonrefreshable_access_tokens = counts.access_tokens - counts.refresh_tokens; - approx_total_counter.add( - approx_nonrefreshable_access_tokens as u64, - &[KeyValue::new( - K_ENTITY, - V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, - )], - ); - approx_total_counter.add( - counts.refresh_tokens as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)], - ); - let state = MigrationState { server_name, // We oversize the hashmaps, as the estimates are innaccurate, and we would like to avoid @@ -213,83 +164,32 @@ pub async fn migrate( provider_id_mapping, }; - let progress_counter = progress.migrating_data(V_ENTITY_USERS, counts.users); - let (mas, state) = migrate_users( - &mut synapse, - mas, - state, - rng, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; + let progress_counter = progress.migrating_data(EntityType::Users, counts.users); + let (mas, state) = migrate_users(&mut synapse, mas, state, rng, progress_counter).await?; - let progress_counter = progress.migrating_data(V_ENTITY_THREEPIDS, counts.threepids); - let (mas, state) = migrate_threepids( - &mut synapse, - mas, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; + let progress_counter = progress.migrating_data(EntityType::ThreePids, counts.threepids); + let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state, progress_counter).await?; - let progress_counter = progress.migrating_data(V_ENTITY_EXTERNAL_IDS, counts.external_ids); - let (mas, state) = migrate_external_ids( - &mut synapse, - mas, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; + let progress_counter = progress.migrating_data(EntityType::ExternalIds, counts.external_ids); + let (mas, state) = + migrate_external_ids(&mut synapse, mas, rng, state, progress_counter).await?; let progress_counter = progress.migrating_data( - 
V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, + EntityType::NonRefreshableAccessTokens, counts.access_tokens - counts.refresh_tokens, ); - let (mas, state) = migrate_unrefreshable_access_tokens( - &mut synapse, - mas, - clock, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; + let (mas, state) = + migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state, progress_counter) + .await?; let progress_counter = - progress.migrating_data(V_ENTITY_REFRESHABLE_TOKEN_PAIRS, counts.refresh_tokens); - let (mas, state) = migrate_refreshable_token_pairs( - &mut synapse, - mas, - clock, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; + progress.migrating_data(EntityType::RefreshableTokens, counts.refresh_tokens); + let (mas, state) = + migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state, progress_counter) + .await?; - let progress_counter = progress.migrating_data("devices", counts.devices); - let (mas, _state) = migrate_devices( - &mut synapse, - mas, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; + let progress_counter = progress.migrating_data(EntityType::Devices, counts.devices); + let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state, progress_counter).await?; synapse .finish() @@ -310,11 +210,8 @@ async fn migrate_users( mut state: MigrationState, rng: &mut impl RngCore, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_USERS)]; let (tx, mut rx) = tokio::sync::mpsc::channel::(10 * 1024 * 1024); @@ -356,7 +253,6 @@ async fn migrate_users( if user.appservice_id.is_some() { flags |= UserFlags::IS_APPSERVICE; - skipped_otel_counter.add(1, &otel_kv); 
progress_counter.increment_skipped(); // Special case for appservice users: we don't insert them into the database @@ -391,7 +287,6 @@ async fn migrate_users( .into_mas("writing password")?; } - migrated_otel_counter.add(1, &otel_kv); progress_counter.increment_migrated(); } @@ -437,11 +332,8 @@ async fn migrate_threepids( rng: &mut impl RngCore, state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)]; let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids); let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids); @@ -469,7 +361,6 @@ async fn migrate_threepids( let Some(mas_user_id) = user_infos.mas_user_id else { progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); continue; }; @@ -504,7 +395,6 @@ async fn migrate_threepids( .into_mas("writing unsupported threepid")?; } - migrated_otel_counter.add(1, &otel_kv); progress_counter.increment_migrated(); } @@ -536,11 +426,8 @@ async fn migrate_external_ids( rng: &mut impl RngCore, state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)]; let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links); let mut extids_stream = pin!(synapse.read_user_external_ids()); @@ -564,7 +451,6 @@ async fn migrate_external_ids( let Some(mas_user_id) = user_infos.mas_user_id else { progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); continue; }; @@ -595,7 +481,6 @@ async fn migrate_external_ids( .await .into_mas("failed to write upstream link")?; - migrated_otel_counter.add(1, 
&otel_kv); progress_counter.increment_migrated(); } @@ -627,11 +512,8 @@ async fn migrate_devices( rng: &mut impl RngCore, mut state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)]; let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024); @@ -664,7 +546,6 @@ async fn migrate_devices( let Some(mas_user_id) = user_infos.mas_user_id else { progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); continue; }; @@ -721,7 +602,6 @@ async fn migrate_devices( .await .into_mas("writing compat sessions")?; - migrated_otel_counter.add(1, &otel_kv); progress_counter.increment_migrated(); } @@ -759,7 +639,6 @@ async fn migrate_devices( /// Migrates unrefreshable access tokens (those without an associated refresh /// token). Some of these may be deviceless. #[tracing::instrument(skip_all, level = Level::INFO)] -#[allow(clippy::too_many_arguments)] async fn migrate_unrefreshable_access_tokens( synapse: &mut SynapseReader<'_>, mut mas: MasWriter, @@ -767,14 +646,8 @@ async fn migrate_unrefreshable_access_tokens( rng: &mut impl RngCore, mut state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new( - K_ENTITY, - V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, - )]; let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024); @@ -809,7 +682,6 @@ async fn migrate_unrefreshable_access_tokens( let Some(mas_user_id) = user_infos.mas_user_id else { progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); continue; }; @@ -818,7 +690,6 @@ async fn migrate_unrefreshable_access_tokens( || user_infos.flags.is_appservice() { progress_counter.increment_skipped(); - 
skipped_otel_counter.add(1, &otel_kv); continue; } @@ -879,7 +750,6 @@ async fn migrate_unrefreshable_access_tokens( .await .into_mas("writing compat access tokens")?; - migrated_otel_counter.add(1, &otel_kv); progress_counter.increment_migrated(); } write_buffer @@ -920,7 +790,6 @@ async fn migrate_unrefreshable_access_tokens( /// Migrates (access token, refresh token) pairs. /// Does not migrate non-refreshable access tokens. #[tracing::instrument(skip_all, level = Level::INFO)] -#[allow(clippy::too_many_arguments)] async fn migrate_refreshable_token_pairs( synapse: &mut SynapseReader<'_>, mut mas: MasWriter, @@ -928,11 +797,8 @@ async fn migrate_refreshable_token_pairs( rng: &mut impl RngCore, mut state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)]; let mut token_stream = pin!(synapse.read_refreshable_token_pairs()); let mut access_token_write_buffer = @@ -963,7 +829,6 @@ async fn migrate_refreshable_token_pairs( let Some(mas_user_id) = user_infos.mas_user_id else { progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); continue; }; @@ -972,7 +837,6 @@ async fn migrate_refreshable_token_pairs( || user_infos.flags.is_appservice() { progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); continue; } @@ -1017,7 +881,6 @@ async fn migrate_refreshable_token_pairs( .await .into_mas("writing compat refresh tokens")?; - migrated_otel_counter.add(1, &otel_kv); progress_counter.increment_migrated(); } diff --git a/crates/syn2mas/src/progress.rs b/crates/syn2mas/src/progress.rs index e5f61d292..3c67825ce 100644 --- a/crates/syn2mas/src/progress.rs +++ b/crates/syn2mas/src/progress.rs @@ -1,6 +1,89 @@ -use std::sync::{Arc, atomic::AtomicU32}; +// Copyright 2025 New Vector Ltd. 
+// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +use std::sync::{Arc, LazyLock, atomic::AtomicU32}; use arc_swap::ArcSwap; +use opentelemetry::{ + KeyValue, + metrics::{Counter, Gauge}, +}; + +use crate::telemetry::METER; + +/// A gauge that tracks the approximate number of entities of a given type +/// that will be migrated. +pub static APPROX_TOTAL_GAUGE: LazyLock<Gauge<u64>> = LazyLock::new(|| { + METER + .u64_gauge("syn2mas.entity.approx_total") + .with_description("Approximate number of entities of this type to be migrated") + .build() +}); + +/// A counter that tracks the number of entities of a given type that have +/// been migrated so far. +pub static MIGRATED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| { + METER + .u64_counter("syn2mas.entity.migrated") + .with_description("Number of entities of this type that have been migrated so far") + .build() +}); + +/// A counter that tracks the number of entities of a given type that have +/// been skipped so far. +pub static SKIPPED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| { + METER + .u64_counter("syn2mas.entity.skipped") + .with_description("Number of entities of this type that have been skipped so far") + .build() +}); + +/// Enum representing the different types of entities that syn2mas can migrate. 
+#[derive(Debug, Clone, Copy)] +pub enum EntityType { + /// Represents users + Users, + + /// Represents devices + Devices, + + /// Represents third-party IDs + ThreePids, + + /// Represents external IDs + ExternalIds, + + /// Represents non-refreshable access tokens + NonRefreshableAccessTokens, + + /// Represents refreshable access tokens + RefreshableTokens, +} + +impl std::fmt::Display for EntityType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +} + +impl EntityType { + pub const fn name(self) -> &'static str { + match self { + Self::Users => "users", + Self::Devices => "devices", + Self::ThreePids => "threepids", + Self::ExternalIds => "external_ids", + Self::NonRefreshableAccessTokens => "nonrefreshable_access_tokens", + Self::RefreshableTokens => "refreshable_tokens", + } + } + + pub fn as_kv(self) -> KeyValue { + KeyValue::new("entity", self.name()) + } +} /// Tracker for the progress of the migration /// @@ -11,25 +94,37 @@ pub struct Progress { current_stage: Arc<ArcSwap<ProgressStage>>, } -#[derive(Clone, Default)] +#[derive(Clone)] pub struct ProgressCounter { inner: Arc<ProgressCounterInner>, } -#[derive(Default)] struct ProgressCounterInner { + kv: [KeyValue; 1], migrated: AtomicU32, skipped: AtomicU32, } impl ProgressCounter { + fn new(entity: EntityType) -> Self { + Self { + inner: Arc::new(ProgressCounterInner { + kv: [entity.as_kv()], + migrated: AtomicU32::new(0), + skipped: AtomicU32::new(0), + }), + } + } + pub fn increment_migrated(&self) { + MIGRATED_COUNTER.add(1, &self.inner.kv); self.inner .migrated .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } pub fn increment_skipped(&self) { + SKIPPED_COUNTER.add(1, &self.inner.kv); self.inner .skipped .fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -52,8 +147,9 @@ impl ProgressCounter { impl Progress { #[must_use] - pub fn migrating_data(&self, entity: &'static str, approx_count: usize) -> ProgressCounter { - let counter = ProgressCounter::default(); + pub fn 
migrating_data(&self, entity: EntityType, approx_count: usize) -> ProgressCounter { + let counter = ProgressCounter::new(entity); + APPROX_TOTAL_GAUGE.record(approx_count as u64, &[entity.as_kv()]); self.set_current_stage(ProgressStage::MigratingData { entity, counter: counter.clone(), @@ -99,7 +195,7 @@ impl Default for Progress { pub enum ProgressStage { SettingUp, MigratingData { - entity: &'static str, + entity: EntityType, counter: ProgressCounter, approx_count: u64, }, diff --git a/crates/syn2mas/src/telemetry.rs b/crates/syn2mas/src/telemetry.rs index 5c1c0a54a..e9a3385fb 100644 --- a/crates/syn2mas/src/telemetry.rs +++ b/crates/syn2mas/src/telemetry.rs @@ -1,3 +1,8 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + use std::sync::LazyLock; use opentelemetry::{InstrumentationScope, metrics::Meter}; @@ -12,21 +17,3 @@ static SCOPE: LazyLock<InstrumentationScope> = LazyLock::new(|| { pub static METER: LazyLock<Meter> = LazyLock::new(|| opentelemetry::global::meter_with_scope(SCOPE.clone())); - -/// Attribute key for syn2mas.entity metrics representing what entity. -pub const K_ENTITY: &str = "entity"; - -/// Attribute value for syn2mas.entity metrics representing users. -pub const V_ENTITY_USERS: &str = "users"; -/// Attribute value for syn2mas.entity metrics representing devices. -pub const V_ENTITY_DEVICES: &str = "devices"; -/// Attribute value for syn2mas.entity metrics representing threepids. -pub const V_ENTITY_THREEPIDS: &str = "threepids"; -/// Attribute value for syn2mas.entity metrics representing external IDs. -pub const V_ENTITY_EXTERNAL_IDS: &str = "external_ids"; -/// Attribute value for syn2mas.entity metrics representing non-refreshable -/// access token entities. -pub const V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS: &str = "nonrefreshable_access_tokens"; -/// Attribute value for syn2mas.entity metrics representing refreshable -/// access/refresh token pairs. 
-pub const V_ENTITY_REFRESHABLE_TOKEN_PAIRS: &str = "refreshable_token_pairs";