Create a few basic logging contexts
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -3165,6 +3165,7 @@ dependencies = [
|
||||
"itertools 0.14.0",
|
||||
"listenfd",
|
||||
"mas-config",
|
||||
"mas-context",
|
||||
"mas-data-model",
|
||||
"mas-email",
|
||||
"mas-handlers",
|
||||
@@ -3317,6 +3318,7 @@ dependencies = [
|
||||
"lettre",
|
||||
"mas-axum-utils",
|
||||
"mas-config",
|
||||
"mas-context",
|
||||
"mas-data-model",
|
||||
"mas-http",
|
||||
"mas-i18n",
|
||||
@@ -3685,6 +3687,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"cron",
|
||||
"mas-context",
|
||||
"mas-data-model",
|
||||
"mas-email",
|
||||
"mas-i18n",
|
||||
|
||||
@@ -66,6 +66,7 @@ sentry-tracing.workspace = true
|
||||
sentry-tower.workspace = true
|
||||
|
||||
mas-config.workspace = true
|
||||
mas-context.workspace = true
|
||||
mas-data-model.workspace = true
|
||||
mas-email.workspace = true
|
||||
mas-handlers.workspace = true
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::{convert::Infallible, net::IpAddr, sync::Arc, time::Instant};
|
||||
|
||||
use axum::extract::{FromRef, FromRequestParts};
|
||||
use ipnetwork::IpNetwork;
|
||||
use mas_context::LogContext;
|
||||
use mas_data_model::SiteConfig;
|
||||
use mas_handlers::{
|
||||
ActivityTracker, BoundActivityTracker, CookieManager, ErrorWrapper, GraphQLSchema, Limiter,
|
||||
@@ -92,35 +93,36 @@ impl AppState {
|
||||
let http_client = self.http_client.clone();
|
||||
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let conn = match pool.acquire().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
LogContext::new("metadata-cache-warmup")
|
||||
.run(async move || {
|
||||
let conn = match pool.acquire().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
error = &e as &dyn std::error::Error,
|
||||
"Failed to acquire a database connection"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut repo = PgRepository::from_conn(conn);
|
||||
|
||||
if let Err(e) = metadata_cache
|
||||
.warm_up_and_run(
|
||||
&http_client,
|
||||
std::time::Duration::from_secs(60 * 15),
|
||||
&mut repo,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
error = &e as &dyn std::error::Error,
|
||||
"Failed to acquire a database connection"
|
||||
"Failed to warm up the metadata cache"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut repo = PgRepository::from_conn(conn);
|
||||
|
||||
if let Err(e) = metadata_cache
|
||||
.warm_up_and_run(
|
||||
&http_client,
|
||||
std::time::Duration::from_secs(60 * 15),
|
||||
&mut repo,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
error = &e as &dyn std::error::Error,
|
||||
"Failed to warm up the metadata cache"
|
||||
);
|
||||
}
|
||||
}
|
||||
.instrument(tracing::info_span!("metadata_cache.background_warmup")),
|
||||
})
|
||||
.instrument(tracing::info_span!("metadata_cache.background_warmup")),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ use itertools::Itertools;
|
||||
use mas_config::{
|
||||
AppConfig, ClientsConfig, ConfigurationSection, ConfigurationSectionExt, UpstreamOAuth2Config,
|
||||
};
|
||||
use mas_context::LogContext;
|
||||
use mas_handlers::{ActivityTracker, CookieManager, Limiter, MetadataCache};
|
||||
use mas_listener::server::Server;
|
||||
use mas_router::UrlBuilder;
|
||||
@@ -316,11 +317,13 @@ impl Options {
|
||||
|
||||
shutdown
|
||||
.task_tracker()
|
||||
.spawn(mas_listener::server::run_servers(
|
||||
servers,
|
||||
shutdown.soft_shutdown_token(),
|
||||
shutdown.hard_shutdown_token(),
|
||||
));
|
||||
.spawn(LogContext::new("run-servers").run(|| {
|
||||
mas_listener::server::run_servers(
|
||||
servers,
|
||||
shutdown.soft_shutdown_token(),
|
||||
shutdown.hard_shutdown_token(),
|
||||
)
|
||||
}));
|
||||
|
||||
let exit_code = shutdown.run().await;
|
||||
|
||||
|
||||
@@ -277,6 +277,9 @@ pub fn build_router(
|
||||
span.record("otel.status_code", "OK");
|
||||
}),
|
||||
)
|
||||
.layer(mas_context::LogContextLayer::new(|req| {
|
||||
otel_http_method(req).into()
|
||||
}))
|
||||
.layer(SentryHttpLayer::new())
|
||||
.layer(NewSentryLayer::new_from_top())
|
||||
.with_state(state)
|
||||
|
||||
@@ -12,6 +12,7 @@ use mas_config::{
|
||||
EmailTransportKind, ExperimentalConfig, HomeserverKind, MatrixConfig, PasswordsConfig,
|
||||
PolicyConfig, TemplatesConfig,
|
||||
};
|
||||
use mas_context::LogContext;
|
||||
use mas_data_model::{SessionExpirationConfig, SiteConfig};
|
||||
use mas_email::{MailTransport, Mailer};
|
||||
use mas_handlers::passwords::PasswordManager;
|
||||
@@ -109,20 +110,23 @@ pub fn test_mailer_in_background(mailer: &Mailer, timeout: Duration) {
|
||||
let mailer = mailer.clone();
|
||||
|
||||
let span = tracing::info_span!("cli.test_mailer");
|
||||
tokio::spawn(async move {
|
||||
match tokio::time::timeout(timeout, mailer.test_connection()).await {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(err)) => {
|
||||
tracing::warn!(
|
||||
error = &err as &dyn std::error::Error,
|
||||
"Could not connect to the mail backend, tasks sending mails may fail!"
|
||||
);
|
||||
tokio::spawn(
|
||||
LogContext::new("mailer-test").run(async move || {
|
||||
match tokio::time::timeout(timeout, mailer.test_connection()).await {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(err)) => {
|
||||
tracing::warn!(
|
||||
error = &err as &dyn std::error::Error,
|
||||
"Could not connect to the mail backend, tasks sending mails may fail!"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::warn!("Timed out while testing the mail backend connection, tasks sending mails may fail!");
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::warn!("Timed out while testing the mail backend connection, tasks sending mails may fail!");
|
||||
}
|
||||
}
|
||||
}.instrument(span));
|
||||
})
|
||||
.instrument(span)
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn policy_factory_from_config(
|
||||
|
||||
@@ -90,6 +90,7 @@ ulid.workspace = true
|
||||
|
||||
mas-axum-utils.workspace = true
|
||||
mas-config.workspace = true
|
||||
mas-context.workspace = true
|
||||
mas-data-model.workspace = true
|
||||
mas-http.workspace = true
|
||||
mas-i18n.workspace = true
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use mas_context::LogContext;
|
||||
use mas_data_model::{
|
||||
UpstreamOAuthProvider, UpstreamOAuthProviderDiscoveryMode, UpstreamOAuthProviderPkceMode,
|
||||
};
|
||||
@@ -197,7 +198,9 @@ impl MetadataCache {
|
||||
loop {
|
||||
// Re-fetch the known metadata at the given interval
|
||||
tokio::time::sleep(interval).await;
|
||||
cache.refresh_all(&client).await;
|
||||
LogContext::new("metadata-cache-refresh")
|
||||
.run(|| cache.refresh_all(&client))
|
||||
.await;
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ ulid.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
mas-context.workspace = true
|
||||
mas-data-model.workspace = true
|
||||
mas-email.workspace = true
|
||||
mas-i18n.workspace = true
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::{collections::HashMap, sync::Arc};
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use cron::Schedule;
|
||||
use mas_context::LogContext;
|
||||
use mas_storage::{
|
||||
Clock, RepositoryAccess, RepositoryError,
|
||||
queue::{InsertableJob, Job, JobMetadata, Worker},
|
||||
@@ -337,7 +338,9 @@ impl QueueWorker {
|
||||
self.setup_schedules().await?;
|
||||
|
||||
while !self.cancellation_token.is_cancelled() {
|
||||
self.run_loop().await?;
|
||||
LogContext::new("worker-run-loop")
|
||||
.run(|| self.run_loop())
|
||||
.await?;
|
||||
}
|
||||
|
||||
self.shutdown().await?;
|
||||
@@ -771,16 +774,18 @@ impl JobTracker {
|
||||
fn spawn_job(&mut self, state: State, context: JobContext, payload: JobPayload) {
|
||||
let factory = self.factories.get(context.queue_name.as_str()).cloned();
|
||||
let task = {
|
||||
let log_context = LogContext::new(format!("worker-job-{}", context.queue_name));
|
||||
let context = context.clone();
|
||||
let span = context.span();
|
||||
async move {
|
||||
// We should never crash, but in case we do, we do that in the task and
|
||||
// don't crash the worker
|
||||
let job = factory.expect("unknown job factory")(payload);
|
||||
tracing::info!("Running job");
|
||||
job.run(&state, context).await
|
||||
}
|
||||
.instrument(span)
|
||||
log_context
|
||||
.run(async move || {
|
||||
// We should never crash, but in case we do, we do that in the task and
|
||||
// don't crash the worker
|
||||
let job = factory.expect("unknown job factory")(payload);
|
||||
tracing::info!("Running job");
|
||||
job.run(&state, context).await
|
||||
})
|
||||
.instrument(span)
|
||||
};
|
||||
|
||||
self.in_flight_jobs.add(
|
||||
|
||||
Reference in New Issue
Block a user