diff --git a/Cargo.lock b/Cargo.lock index 86a87f359..96915d6f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3165,6 +3165,7 @@ dependencies = [ "itertools 0.14.0", "listenfd", "mas-config", + "mas-context", "mas-data-model", "mas-email", "mas-handlers", @@ -3317,6 +3318,7 @@ dependencies = [ "lettre", "mas-axum-utils", "mas-config", + "mas-context", "mas-data-model", "mas-http", "mas-i18n", @@ -3685,6 +3687,7 @@ dependencies = [ "async-trait", "chrono", "cron", + "mas-context", "mas-data-model", "mas-email", "mas-i18n", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index cc40545cb..e181930c8 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -66,6 +66,7 @@ sentry-tracing.workspace = true sentry-tower.workspace = true mas-config.workspace = true +mas-context.workspace = true mas-data-model.workspace = true mas-email.workspace = true mas-handlers.workspace = true diff --git a/crates/cli/src/app_state.rs b/crates/cli/src/app_state.rs index 0fb6064ea..cd4ae44ad 100644 --- a/crates/cli/src/app_state.rs +++ b/crates/cli/src/app_state.rs @@ -8,6 +8,7 @@ use std::{convert::Infallible, net::IpAddr, sync::Arc, time::Instant}; use axum::extract::{FromRef, FromRequestParts}; use ipnetwork::IpNetwork; +use mas_context::LogContext; use mas_data_model::SiteConfig; use mas_handlers::{ ActivityTracker, BoundActivityTracker, CookieManager, ErrorWrapper, GraphQLSchema, Limiter, @@ -92,35 +93,36 @@ impl AppState { let http_client = self.http_client.clone(); tokio::spawn( - async move { - let conn = match pool.acquire().await { - Ok(conn) => conn, - Err(e) => { + LogContext::new("metadata-cache-warmup") + .run(async move || { + let conn = match pool.acquire().await { + Ok(conn) => conn, + Err(e) => { + tracing::error!( + error = &e as &dyn std::error::Error, + "Failed to acquire a database connection" + ); + return; + } + }; + + let mut repo = PgRepository::from_conn(conn); + + if let Err(e) = metadata_cache + .warm_up_and_run( + &http_client, + std::time::Duration::from_secs(60 * 15), + &mut repo, + ) + .await + { tracing::error!( error = &e as &dyn std::error::Error, - "Failed to acquire a database connection" + "Failed to warm up the metadata cache" ); - return; } - }; - - let mut repo = PgRepository::from_conn(conn); - - if let Err(e) = metadata_cache - .warm_up_and_run( - &http_client, - std::time::Duration::from_secs(60 * 15), - &mut repo, - ) - .await - { - tracing::error!( - error = &e as &dyn std::error::Error, - "Failed to warm up the metadata cache" - ); - } - } - .instrument(tracing::info_span!("metadata_cache.background_warmup")), + }) + .instrument(tracing::info_span!("metadata_cache.background_warmup")), ); } } diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index 15074c684..37d4f5392 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -13,6 +13,7 @@ use itertools::Itertools; use mas_config::{ AppConfig, ClientsConfig, ConfigurationSection, ConfigurationSectionExt, UpstreamOAuth2Config, }; +use mas_context::LogContext; use mas_handlers::{ActivityTracker, CookieManager, Limiter, MetadataCache}; use mas_listener::server::Server; use mas_router::UrlBuilder; @@ -316,11 +317,13 @@ impl Options { shutdown .task_tracker() - .spawn(mas_listener::server::run_servers( - servers, - shutdown.soft_shutdown_token(), - shutdown.hard_shutdown_token(), - )); + .spawn(LogContext::new("run-servers").run(|| { + mas_listener::server::run_servers( + servers, + shutdown.soft_shutdown_token(), + shutdown.hard_shutdown_token(), + ) + })); let exit_code = shutdown.run().await; diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index b40cc0d44..ecc846f4f 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -277,6 +277,9 @@ pub fn build_router( span.record("otel.status_code", "OK"); }), ) + .layer(mas_context::LogContextLayer::new(|req| { + otel_http_method(req).into() + })) .layer(SentryHttpLayer::new()) .layer(NewSentryLayer::new_from_top()) .with_state(state) diff --git a/crates/cli/src/util.rs b/crates/cli/src/util.rs index 2daad2e91..3d74fb4bf 100644 --- a/crates/cli/src/util.rs +++ b/crates/cli/src/util.rs @@ -12,6 +12,7 @@ use mas_config::{ EmailTransportKind, ExperimentalConfig, HomeserverKind, MatrixConfig, PasswordsConfig, PolicyConfig, TemplatesConfig, }; +use mas_context::LogContext; use mas_data_model::{SessionExpirationConfig, SiteConfig}; use mas_email::{MailTransport, Mailer}; use mas_handlers::passwords::PasswordManager; @@ -109,20 +110,23 @@ pub fn test_mailer_in_background(mailer: &Mailer, timeout: Duration) { let mailer = mailer.clone(); let span = tracing::info_span!("cli.test_mailer"); - tokio::spawn(async move { - match tokio::time::timeout(timeout, mailer.test_connection()).await { - Ok(Ok(())) => {} - Ok(Err(err)) => { - tracing::warn!( - error = &err as &dyn std::error::Error, - "Could not connect to the mail backend, tasks sending mails may fail!" - ); + tokio::spawn( + LogContext::new("mailer-test").run(async move || { + match tokio::time::timeout(timeout, mailer.test_connection()).await { + Ok(Ok(())) => {} + Ok(Err(err)) => { + tracing::warn!( + error = &err as &dyn std::error::Error, + "Could not connect to the mail backend, tasks sending mails may fail!" + ); + } + Err(_) => { + tracing::warn!("Timed out while testing the mail backend connection, tasks sending mails may fail!"); + } } - Err(_) => { - tracing::warn!("Timed out while testing the mail backend connection, tasks sending mails may fail!"); - } - } - }.instrument(span)); + }) + .instrument(span) + ); } pub async fn policy_factory_from_config( diff --git a/crates/handlers/Cargo.toml b/crates/handlers/Cargo.toml index 65c7bbb6f..21cf1ded1 100644 --- a/crates/handlers/Cargo.toml +++ b/crates/handlers/Cargo.toml @@ -90,6 +90,7 @@ ulid.workspace = true mas-axum-utils.workspace = true mas-config.workspace = true +mas-context.workspace = true mas-data-model.workspace = true mas-http.workspace = true mas-i18n.workspace = true diff --git a/crates/handlers/src/upstream_oauth2/cache.rs b/crates/handlers/src/upstream_oauth2/cache.rs index 02a202745..54253cac9 100644 --- a/crates/handlers/src/upstream_oauth2/cache.rs +++ b/crates/handlers/src/upstream_oauth2/cache.rs @@ -6,6 +6,7 @@ use std::{collections::HashMap, sync::Arc}; +use mas_context::LogContext; use mas_data_model::{ UpstreamOAuthProvider, UpstreamOAuthProviderDiscoveryMode, UpstreamOAuthProviderPkceMode, }; @@ -197,7 +198,9 @@ impl MetadataCache { loop { // Re-fetch the known metadata at the given interval tokio::time::sleep(interval).await; - cache.refresh_all(&client).await; + LogContext::new("metadata-cache-refresh") + .run(|| cache.refresh_all(&client)) + .await; } })) } diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index 306e5cede..18eb740d5 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -30,6 +30,7 @@ ulid.workspace = true serde.workspace = true serde_json.workspace = true +mas-context.workspace = true mas-data-model.workspace = true mas-email.workspace = true mas-i18n.workspace = true diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index b78d9014a..a2021c364 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -8,6 +8,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; use cron::Schedule; +use mas_context::LogContext; use mas_storage::{ Clock, RepositoryAccess, RepositoryError, queue::{InsertableJob, Job, JobMetadata, Worker}, @@ -337,7 +338,9 @@ impl QueueWorker { self.setup_schedules().await?; while !self.cancellation_token.is_cancelled() { - self.run_loop().await?; + LogContext::new("worker-run-loop") + .run(|| self.run_loop()) + .await?; } self.shutdown().await?; @@ -771,16 +774,18 @@ impl JobTracker { fn spawn_job(&mut self, state: State, context: JobContext, payload: JobPayload) { let factory = self.factories.get(context.queue_name.as_str()).cloned(); let task = { + let log_context = LogContext::new(format!("worker-job-{}", context.queue_name)); let context = context.clone(); let span = context.span(); - async move { - // We should never crash, but in case we do, we do that in the task and - // don't crash the worker - let job = factory.expect("unknown job factory")(payload); - tracing::info!("Running job"); - job.run(&state, context).await - } - .instrument(span) + log_context + .run(async move || { + // We should never crash, but in case we do, we do that in the task and + // don't crash the worker + let job = factory.expect("unknown job factory")(payload); + tracing::info!("Running job"); + job.run(&state, context).await + }) + .instrument(span) }; self.in_flight_jobs.add(