diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 6d0420077..f5efaa722 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -335,77 +335,102 @@ async fn migrate_threepids( ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids); - let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids); - let mut users_stream = pin!(synapse.read_threepids()); + let (tx, mut rx) = tokio::sync::mpsc::channel::(10 * 1024 * 1024); - while let Some(threepid_res) = users_stream.next().await { - let SynapseThreepid { - user_id: synapse_user_id, - medium, - address, - added_at, - } = threepid_res.into_synapse("reading threepid")?; - let created_at: DateTime = added_at.into(); + // create a new RNG seeded from the passed RNG so that we can move it into the + // spawned task + let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng"); + let task = tokio::spawn( + async move { + let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids); + let mut unsupported_buffer = + MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids); - let username = synapse_user_id - .extract_localpart(&state.server_name) - .into_extract_localpart(synapse_user_id.clone())? - .to_owned(); - let Some(user_infos) = state.users.get(username.as_str()).copied() else { - return Err(Error::MissingUserFromDependentTable { - table: "user_threepids".to_owned(), - user: synapse_user_id, - }); - }; + while let Some(threepid) = rx.recv().await { + let SynapseThreepid { + user_id: synapse_user_id, + medium, + address, + added_at, + } = threepid; + let created_at: DateTime = added_at.into(); - let Some(mas_user_id) = user_infos.mas_user_id else { - progress_counter.increment_skipped(); - continue; - }; + let username = synapse_user_id + .extract_localpart(&state.server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_infos) = state.users.get(username.as_str()).copied() else { + return Err(Error::MissingUserFromDependentTable { + table: "user_threepids".to_owned(), + user: synapse_user_id, + }); + }; + + let Some(mas_user_id) = user_infos.mas_user_id else { + progress_counter.increment_skipped(); + continue; + }; + + if medium == "email" { + email_buffer + .write( + &mut mas, + MasNewEmailThreepid { + user_id: mas_user_id, + user_email_id: Uuid::from(Ulid::from_datetime_with_source( + created_at.into(), + &mut rng, + )), + email: address, + created_at, + }, + ) + .await + .into_mas("writing email")?; + } else { + unsupported_buffer + .write( + &mut mas, + MasNewUnsupportedThreepid { + user_id: mas_user_id, + medium, + address, + created_at, + }, + ) + .await + .into_mas("writing unsupported threepid")?; + } + + progress_counter.increment_migrated(); + } - if medium == "email" { email_buffer - .write( - &mut mas, - MasNewEmailThreepid { - user_id: mas_user_id, - user_email_id: Uuid::from(Ulid::from_datetime_with_source( - created_at.into(), - rng, - )), - email: address, - created_at, - }, - ) + .finish(&mut mas) .await - .into_mas("writing email")?; - } else { + .into_mas("writing email threepids")?; unsupported_buffer - .write( - &mut mas, - MasNewUnsupportedThreepid { - user_id: mas_user_id, - medium, - address, - created_at, - }, - ) + .finish(&mut mas) .await - .into_mas("writing unsupported threepid")?; + .into_mas("writing unsupported threepids")?; + + Ok((mas, state)) } + .instrument(tracing::info_span!("ingest_task")), + ); - progress_counter.increment_migrated(); - } + // In case this has an error, we still want to join the task, so we look at the + // error later + let res = synapse + .read_threepids() + .map_err(|e| e.into_synapse("reading threepids")) + .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed)) + .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error)) + .await; - email_buffer - .finish(&mut mas) - .await - .into_mas("writing email threepids")?; - unsupported_buffer - .finish(&mut mas) - .await - .into_mas("writing unsupported threepids")?; + let (mas, state) = task.await.into_join("threepid write task")??; + + res?; info!( "third-party IDs migrated in {:.1}s",