From c292da7ac99604498c904a7c73c362c977077c27 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 22 Apr 2025 13:44:52 +0200 Subject: [PATCH] syn2mas: add a buffered channel for writing external IDs --- crates/syn2mas/src/migration.rs | 132 +++++++++++++++++++------------- 1 file changed, 77 insertions(+), 55 deletions(-) diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index f5efaa722..fb2ea5487 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -440,10 +440,6 @@ async fn migrate_threepids( Ok((mas, state)) } -/// # Parameters -/// -/// - `provider_id_mapping`: mapping from Synapse `auth_provider` ID to UUID of -/// the upstream provider in MAS. #[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_external_ids( synapse: &mut SynapseReader<'_>, @@ -454,65 +450,91 @@ async fn migrate_external_ids( ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links); - let mut extids_stream = pin!(synapse.read_user_external_ids()); + let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseExternalId>(10 * 1024 * 1024); - while let Some(extid_res) = extids_stream.next().await { - let SynapseExternalId { - user_id: synapse_user_id, - auth_provider, - external_id: subject, - } = extid_res.into_synapse("reading external ID")?; - let username = synapse_user_id - .extract_localpart(&state.server_name) - .into_extract_localpart(synapse_user_id.clone())? 
- .to_owned(); - let Some(user_infos) = state.users.get(username.as_str()).copied() else { - return Err(Error::MissingUserFromDependentTable { - table: "user_external_ids".to_owned(), - user: synapse_user_id, - }); - }; + // create a new RNG seeded from the passed RNG so that we can move it into the + // spawned task + let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng"); + let task = tokio::spawn( + async move { + let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links); - let Some(mas_user_id) = user_infos.mas_user_id else { - progress_counter.increment_skipped(); - continue; - }; + while let Some(extid) = rx.recv().await { + let SynapseExternalId { + user_id: synapse_user_id, + auth_provider, + external_id: subject, + } = extid; + let username = synapse_user_id + .extract_localpart(&state.server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_infos) = state.users.get(username.as_str()).copied() else { + return Err(Error::MissingUserFromDependentTable { + table: "user_external_ids".to_owned(), + user: synapse_user_id, + }); + }; - let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider) else { - return Err(Error::MissingAuthProviderMapping { - synapse_id: auth_provider, - user: synapse_user_id, - }); - }; + let Some(mas_user_id) = user_infos.mas_user_id else { + progress_counter.increment_skipped(); + continue; + }; - // To save having to store user creation times, extract it from the ULID - // This gives millisecond precision — good enough. 
- let user_created_ts = Ulid::from(mas_user_id.get()).datetime(); + let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider) + else { + return Err(Error::MissingAuthProviderMapping { + synapse_id: auth_provider, + user: synapse_user_id, + }); + }; - let link_id: Uuid = Ulid::from_datetime_with_source(user_created_ts, rng).into(); + // To save having to store user creation times, extract it from the ULID + // This gives millisecond precision — good enough. + let user_created_ts = Ulid::from(mas_user_id.get()).datetime(); - write_buffer - .write( - &mut mas, - MasNewUpstreamOauthLink { - link_id, - user_id: mas_user_id, - upstream_provider_id, - subject, - created_at: user_created_ts.into(), - }, - ) - .await - .into_mas("failed to write upstream link")?; + let link_id: Uuid = + Ulid::from_datetime_with_source(user_created_ts, &mut rng).into(); - progress_counter.increment_migrated(); - } + write_buffer + .write( + &mut mas, + MasNewUpstreamOauthLink { + link_id, + user_id: mas_user_id, + upstream_provider_id, + subject, + created_at: user_created_ts.into(), + }, + ) + .await + .into_mas("failed to write upstream link")?; - write_buffer - .finish(&mut mas) - .await - .into_mas("writing upstream links")?; + progress_counter.increment_migrated(); + } + + write_buffer + .finish(&mut mas) + .await + .into_mas("writing upstream links")?; + + Ok((mas, state)) + } + .instrument(tracing::info_span!("ingest_task")), + ); + + // In case this has an error, we still want to join the task, so we look at the + // error later + let res = synapse + .read_user_external_ids() + .map_err(|e| e.into_synapse("reading external ID")) + .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed)) + .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error)) + .await; + + let (mas, state) = task.await.into_join("external IDs write task")??; + + res?; info!( "upstream links (external IDs) migrated in {:.1}s",