syn2mas: disable logging of slow statements, better access token query perf (#4208)
This commit is contained in:
@@ -15,7 +15,7 @@ use sqlx::{Connection, Either, PgConnection, postgres::PgConnectOptions, types::
|
||||
use syn2mas::{LockedMasDatabase, MasWriter, SynapseReader, synapse_config};
|
||||
use tracing::{Instrument, error, info_span, warn};
|
||||
|
||||
use crate::util::database_connection_from_config;
|
||||
use crate::util::{DatabaseConnectOptions, database_connection_from_config_with_options};
|
||||
|
||||
/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are
|
||||
/// errors preventing migration.
|
||||
@@ -114,7 +114,13 @@ impl Options {
|
||||
|
||||
let config = DatabaseConfig::extract_or_default(figment)?;
|
||||
|
||||
let mut mas_connection = database_connection_from_config(&config).await?;
|
||||
let mut mas_connection = database_connection_from_config_with_options(
|
||||
&config,
|
||||
&DatabaseConnectOptions {
|
||||
log_slow_statements: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
MIGRATOR
|
||||
.run(&mut mas_connection)
|
||||
@@ -225,7 +231,15 @@ impl Options {
|
||||
let reader = SynapseReader::new(&mut syn_conn, true).await?;
|
||||
let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS);
|
||||
for _ in 0..NUM_WRITER_CONNECTIONS {
|
||||
writer_mas_connections.push(database_connection_from_config(&config).await?);
|
||||
writer_mas_connections.push(
|
||||
database_connection_from_config_with_options(
|
||||
&config,
|
||||
&DatabaseConnectOptions {
|
||||
log_slow_statements: false,
|
||||
},
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
let writer = MasWriter::new(mas_connection, writer_mas_connections).await?;
|
||||
|
||||
|
||||
@@ -231,6 +231,7 @@ pub async fn templates_from_config(
|
||||
|
||||
fn database_connect_options_from_config(
|
||||
config: &DatabaseConfig,
|
||||
opts: &DatabaseConnectOptions,
|
||||
) -> Result<PgConnectOptions, anyhow::Error> {
|
||||
let options = if let Some(uri) = config.uri.as_deref() {
|
||||
uri.parse()
|
||||
@@ -315,9 +316,11 @@ fn database_connect_options_from_config(
|
||||
None => options,
|
||||
};
|
||||
|
||||
let options = options
|
||||
.log_statements(LevelFilter::Debug)
|
||||
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(100));
|
||||
let mut options = options.log_statements(LevelFilter::Debug);
|
||||
|
||||
if opts.log_slow_statements {
|
||||
options = options.log_slow_statements(LevelFilter::Warn, Duration::from_millis(100));
|
||||
}
|
||||
|
||||
Ok(options)
|
||||
}
|
||||
@@ -325,7 +328,7 @@ fn database_connect_options_from_config(
|
||||
/// Create a database connection pool from the configuration
|
||||
#[tracing::instrument(name = "db.connect", skip_all, err(Debug))]
|
||||
pub async fn database_pool_from_config(config: &DatabaseConfig) -> Result<PgPool, anyhow::Error> {
|
||||
let options = database_connect_options_from_config(config)?;
|
||||
let options = database_connect_options_from_config(config, &DatabaseConnectOptions::default())?;
|
||||
PgPoolOptions::new()
|
||||
.max_connections(config.max_connections.into())
|
||||
.min_connections(config.min_connections)
|
||||
@@ -337,12 +340,37 @@ pub async fn database_pool_from_config(config: &DatabaseConfig) -> Result<PgPool
|
||||
.context("could not connect to the database")
|
||||
}
|
||||
|
||||
pub struct DatabaseConnectOptions {
|
||||
pub log_slow_statements: bool,
|
||||
}
|
||||
|
||||
impl Default for DatabaseConnectOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
log_slow_statements: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a single database connection from the configuration
|
||||
#[tracing::instrument(name = "db.connect", skip_all, err(Debug))]
|
||||
pub async fn database_connection_from_config(
|
||||
config: &DatabaseConfig,
|
||||
) -> Result<PgConnection, anyhow::Error> {
|
||||
database_connect_options_from_config(config)?
|
||||
database_connect_options_from_config(config, &DatabaseConnectOptions::default())?
|
||||
.connect()
|
||||
.await
|
||||
.context("could not connect to the database")
|
||||
}
|
||||
|
||||
/// Create a single database connection from the configuration,
|
||||
/// with specific options.
|
||||
#[tracing::instrument(name = "db.connect", skip_all, err(Debug))]
|
||||
pub async fn database_connection_from_config_with_options(
|
||||
config: &DatabaseConfig,
|
||||
options: &DatabaseConnectOptions,
|
||||
) -> Result<PgConnection, anyhow::Error> {
|
||||
database_connect_options_from_config(config, options)?
|
||||
.connect()
|
||||
.await
|
||||
.context("could not connect to the database")
|
||||
|
||||
@@ -449,7 +449,7 @@ impl<'conn> SynapseReader<'conn> {
|
||||
INNER JOIN devices USING (user_id, device_id)
|
||||
WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
|
||||
|
||||
UNION
|
||||
UNION ALL
|
||||
|
||||
SELECT
|
||||
at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
|
||||
@@ -478,7 +478,7 @@ impl<'conn> SynapseReader<'conn> {
|
||||
SELECT
|
||||
rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
|
||||
FROM refresh_tokens rt0
|
||||
INNER JOIN devices USING (device_id)
|
||||
INNER JOIN devices USING (user_id, device_id)
|
||||
INNER JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
|
||||
LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
|
||||
WHERE NOT at1.used OR at1.used IS NULL
|
||||
@@ -505,7 +505,6 @@ mod test {
|
||||
},
|
||||
};
|
||||
|
||||
// TODO test me
|
||||
static MIGRATOR: Migrator = sqlx::migrate!("./test_synapse_migrations");
|
||||
|
||||
#[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice"))]
|
||||
|
||||
Reference in New Issue
Block a user