syn2mas: add a buffered channel for writing threepids

This commit is contained in:
Quentin Gliech
2025-04-22 13:40:14 +02:00
parent 1fcf650322
commit ef81b3ce4f

View File

@@ -335,77 +335,102 @@ async fn migrate_threepids(
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
let mut users_stream = pin!(synapse.read_threepids());
let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseThreepid>(10 * 1024 * 1024);
while let Some(threepid_res) = users_stream.next().await {
let SynapseThreepid {
user_id: synapse_user_id,
medium,
address,
added_at,
} = threepid_res.into_synapse("reading threepid")?;
let created_at: DateTime<Utc> = added_at.into();
// create a new RNG seeded from the passed RNG so that we can move it into the
// spawned task
let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
let task = tokio::spawn(
async move {
let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
let mut unsupported_buffer =
MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
let username = synapse_user_id
.extract_localpart(&state.server_name)
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_infos) = state.users.get(username.as_str()).copied() else {
return Err(Error::MissingUserFromDependentTable {
table: "user_threepids".to_owned(),
user: synapse_user_id,
});
};
while let Some(threepid) = rx.recv().await {
let SynapseThreepid {
user_id: synapse_user_id,
medium,
address,
added_at,
} = threepid;
let created_at: DateTime<Utc> = added_at.into();
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
continue;
};
let username = synapse_user_id
.extract_localpart(&state.server_name)
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_infos) = state.users.get(username.as_str()).copied() else {
return Err(Error::MissingUserFromDependentTable {
table: "user_threepids".to_owned(),
user: synapse_user_id,
});
};
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
continue;
};
if medium == "email" {
email_buffer
.write(
&mut mas,
MasNewEmailThreepid {
user_id: mas_user_id,
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
created_at.into(),
&mut rng,
)),
email: address,
created_at,
},
)
.await
.into_mas("writing email")?;
} else {
unsupported_buffer
.write(
&mut mas,
MasNewUnsupportedThreepid {
user_id: mas_user_id,
medium,
address,
created_at,
},
)
.await
.into_mas("writing unsupported threepid")?;
}
progress_counter.increment_migrated();
}
if medium == "email" {
email_buffer
.write(
&mut mas,
MasNewEmailThreepid {
user_id: mas_user_id,
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
created_at.into(),
rng,
)),
email: address,
created_at,
},
)
.finish(&mut mas)
.await
.into_mas("writing email")?;
} else {
.into_mas("writing email threepids")?;
unsupported_buffer
.write(
&mut mas,
MasNewUnsupportedThreepid {
user_id: mas_user_id,
medium,
address,
created_at,
},
)
.finish(&mut mas)
.await
.into_mas("writing unsupported threepid")?;
.into_mas("writing unsupported threepids")?;
Ok((mas, state))
}
.instrument(tracing::info_span!("ingest_task")),
);
progress_counter.increment_migrated();
}
// In case this has an error, we still want to join the task, so we look at the
// error later
let res = synapse
.read_threepids()
.map_err(|e| e.into_synapse("reading threepids"))
.forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
.inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
.await;
email_buffer
.finish(&mut mas)
.await
.into_mas("writing email threepids")?;
unsupported_buffer
.finish(&mut mas)
.await
.into_mas("writing unsupported threepids")?;
let (mas, state) = task.await.into_join("threepid write task")??;
res?;
info!(
"third-party IDs migrated in {:.1}s",