syn2mas: add a buffered channel for writing external IDs

This commit is contained in:
Quentin Gliech
2025-04-22 13:44:52 +02:00
parent ef81b3ce4f
commit c292da7ac9

View File

@@ -440,10 +440,6 @@ async fn migrate_threepids(
Ok((mas, state))
}
/// # Parameters
///
/// - `provider_id_mapping`: mapping from Synapse `auth_provider` ID to UUID of
/// the upstream provider in MAS.
#[tracing::instrument(skip_all, level = Level::INFO)]
async fn migrate_external_ids(
synapse: &mut SynapseReader<'_>,
@@ -454,65 +450,91 @@ async fn migrate_external_ids(
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
let mut extids_stream = pin!(synapse.read_user_external_ids());
let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseExternalId>(10 * 1024 * 1024);
while let Some(extid_res) = extids_stream.next().await {
let SynapseExternalId {
user_id: synapse_user_id,
auth_provider,
external_id: subject,
} = extid_res.into_synapse("reading external ID")?;
let username = synapse_user_id
.extract_localpart(&state.server_name)
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_infos) = state.users.get(username.as_str()).copied() else {
return Err(Error::MissingUserFromDependentTable {
table: "user_external_ids".to_owned(),
user: synapse_user_id,
});
};
// create a new RNG seeded from the passed RNG so that we can move it into the
// spawned task
let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
let task = tokio::spawn(
async move {
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
continue;
};
while let Some(extid) = rx.recv().await {
let SynapseExternalId {
user_id: synapse_user_id,
auth_provider,
external_id: subject,
} = extid;
let username = synapse_user_id
.extract_localpart(&state.server_name)
.into_extract_localpart(synapse_user_id.clone())?
.to_owned();
let Some(user_infos) = state.users.get(username.as_str()).copied() else {
return Err(Error::MissingUserFromDependentTable {
table: "user_external_ids".to_owned(),
user: synapse_user_id,
});
};
let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider) else {
return Err(Error::MissingAuthProviderMapping {
synapse_id: auth_provider,
user: synapse_user_id,
});
};
let Some(mas_user_id) = user_infos.mas_user_id else {
progress_counter.increment_skipped();
continue;
};
// To save having to store user creation times, extract it from the ULID
// This gives millisecond precision — good enough.
let user_created_ts = Ulid::from(mas_user_id.get()).datetime();
let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider)
else {
return Err(Error::MissingAuthProviderMapping {
synapse_id: auth_provider,
user: synapse_user_id,
});
};
let link_id: Uuid = Ulid::from_datetime_with_source(user_created_ts, rng).into();
// To save having to store user creation times, extract it from the ULID
// This gives millisecond precision — good enough.
let user_created_ts = Ulid::from(mas_user_id.get()).datetime();
write_buffer
.write(
&mut mas,
MasNewUpstreamOauthLink {
link_id,
user_id: mas_user_id,
upstream_provider_id,
subject,
created_at: user_created_ts.into(),
},
)
.await
.into_mas("failed to write upstream link")?;
let link_id: Uuid =
Ulid::from_datetime_with_source(user_created_ts, &mut rng).into();
progress_counter.increment_migrated();
}
write_buffer
.write(
&mut mas,
MasNewUpstreamOauthLink {
link_id,
user_id: mas_user_id,
upstream_provider_id,
subject,
created_at: user_created_ts.into(),
},
)
.await
.into_mas("failed to write upstream link")?;
write_buffer
.finish(&mut mas)
.await
.into_mas("writing upstream links")?;
progress_counter.increment_migrated();
}
write_buffer
.finish(&mut mas)
.await
.into_mas("writing upstream links")?;
Ok((mas, state))
}
.instrument(tracing::info_span!("ingest_task")),
);
// In case this has an error, we still want to join the task, so we look at the
// error later
let res = synapse
.read_user_external_ids()
.map_err(|e| e.into_synapse("reading external ID"))
.forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
.inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
.await;
let (mas, state) = task.await.into_join("external IDs write task")??;
res?;
info!(
"upstream links (external IDs) migrated in {:.1}s",