Log timings of migrations and index/constraint rebuilds

This commit is contained in:
Olivier 'reivilibre
2025-02-01 17:41:11 +00:00
parent 8c1a8dd8a1
commit 07dd9cd9ec
2 changed files with 62 additions and 4 deletions

View File

@@ -3,8 +3,10 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use std::time::Instant;
use sqlx::PgConnection;
use tracing::debug;
use tracing::{debug, info};
use super::{Error, IntoDatabase};
@@ -114,6 +116,8 @@ pub async fn restore_constraint(
conn: &mut PgConnection,
constraint: &ConstraintDescription,
) -> Result<(), Error> {
let start = Instant::now();
let ConstraintDescription {
name,
table_name,
@@ -128,6 +132,11 @@ pub async fn restore_constraint(
format!("failed to recreate constraint {name} on {table_name} with {definition}")
})?;
info!(
"constraint {name} rebuilt in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);
Ok(())
}
@@ -136,6 +145,8 @@ pub async fn restore_constraint(
/// The index must not exist prior to this call.
#[tracing::instrument(name = "syn2mas.restore_index", skip_all, fields(index.name = index.name))]
pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
let start = Instant::now();
let IndexDescription {
name,
table_name,
@@ -149,5 +160,10 @@ pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) ->
format!("failed to recreate index {name} on {table_name} with {definition}")
})?;
info!(
"index {name} rebuilt in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);
Ok(())
}

View File

@@ -11,7 +11,7 @@
//! This module does not implement any of the safety checks that should be run
//! *before* the migration.
use std::{collections::HashMap, pin::pin};
use std::{collections::HashMap, pin::pin, time::Instant};
use chrono::{DateTime, Utc};
use compact_str::CompactString;
@@ -20,7 +20,7 @@ use mas_storage::Clock;
use rand::RngCore;
use thiserror::Error;
use thiserror_ext::ContextInto;
use tracing::Level;
use tracing::{Level, info};
use ulid::Ulid;
use uuid::{NonNilUuid, Uuid};
@@ -177,6 +177,8 @@ async fn migrate_users(
mut state: MigrationState,
rng: &mut impl RngCore,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let mut user_buffer = MasWriteBuffer::new(&mas, MasWriter::write_users);
let mut password_buffer = MasWriteBuffer::new(&mas, MasWriter::write_passwords);
let mut users_stream = pin!(synapse.read_users());
@@ -254,6 +256,11 @@ async fn migrate_users(
.await
.into_mas("writing passwords")?;
info!(
"users migrated in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);
Ok((mas, state))
}
@@ -264,6 +271,8 @@ async fn migrate_threepids(
rng: &mut impl RngCore,
state: MigrationState,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
let mut users_stream = pin!(synapse.read_threepids());
@@ -333,6 +342,11 @@ async fn migrate_threepids(
.await
.into_mas("writing unsupported threepids")?;
info!(
"third-party IDs migrated in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);
Ok((mas, state))
}
@@ -347,6 +361,8 @@ async fn migrate_external_ids(
rng: &mut impl RngCore,
state: MigrationState,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
let mut extids_stream = pin!(synapse.read_user_external_ids());
@@ -402,7 +418,12 @@ async fn migrate_external_ids(
write_buffer
.finish(&mut mas)
.await
.into_mas("writing threepids")?;
.into_mas("writing upstream links")?;
info!(
"upstream links (external IDs) migrated in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);
Ok((mas, state))
}
@@ -422,6 +443,8 @@ async fn migrate_devices(
rng: &mut impl RngCore,
mut state: MigrationState,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let mut devices_stream = pin!(synapse.read_devices());
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
@@ -509,6 +532,11 @@ async fn migrate_devices(
.await
.into_mas("writing compat sessions")?;
info!(
"devices migrated in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);
Ok((mas, state))
}
@@ -522,6 +550,8 @@ async fn migrate_unrefreshable_access_tokens(
rng: &mut impl RngCore,
mut state: MigrationState,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
let mut deviceless_session_write_buffer =
@@ -624,6 +654,11 @@ async fn migrate_unrefreshable_access_tokens(
.await
.into_mas("writing deviceless compat sessions")?;
info!(
"non-refreshable access tokens migrated in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);
Ok((mas, state))
}
@@ -637,6 +672,8 @@ async fn migrate_refreshable_token_pairs(
rng: &mut impl RngCore,
mut state: MigrationState,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
let mut access_token_write_buffer =
MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
@@ -727,6 +764,11 @@ async fn migrate_refreshable_token_pairs(
.await
.into_mas("writing compat refresh tokens")?;
info!(
"refreshable token pairs migrated in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);
Ok((mas, state))
}