diff --git a/Cargo.lock b/Cargo.lock index 46876de76..06d1ba21c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3404,6 +3404,7 @@ dependencies = [ name = "mas-http" version = "0.12.0" dependencies = [ + "futures-util", "headers", "http", "hyper-util", @@ -3412,6 +3413,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "reqwest", "rustls-platform-verifier", + "tower 0.5.1", "tower-http", "tracing", "tracing-opentelemetry", diff --git a/Cargo.toml b/Cargo.toml index f1177edfc..e5d3ae134 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,10 @@ features = ["derive"] version = "0.10.19" features = ["env", "yaml", "test"] +# Utilities for dealing with futures +[workspace.dependencies.futures-util] +version = "0.3.31" + # Rate-limiting [workspace.dependencies.governor] version = "0.7.0" diff --git a/crates/axum-utils/Cargo.toml b/crates/axum-utils/Cargo.toml index e23e065b6..24e679314 100644 --- a/crates/axum-utils/Cargo.toml +++ b/crates/axum-utils/Cargo.toml @@ -18,7 +18,7 @@ axum-extra.workspace = true bytes.workspace = true chrono.workspace = true data-encoding = "2.6.0" -futures-util = "0.3.31" +futures-util.workspace = true headers.workspace = true http.workspace = true http-body.workspace = true diff --git a/crates/handlers/Cargo.toml b/crates/handlers/Cargo.toml index d7fb3e850..bca0126b9 100644 --- a/crates/handlers/Cargo.toml +++ b/crates/handlers/Cargo.toml @@ -15,7 +15,7 @@ workspace = true # Async runtime tokio.workspace = true tokio-util.workspace = true -futures-util = "0.3.31" +futures-util.workspace = true async-trait.workspace = true # Logging and tracing diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 1f86794a2..1bb5a6254 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true workspace = true [dependencies] +futures-util.workspace = true headers.workspace = true http.workspace = true hyper-util.workspace = true @@ -20,6 +21,7 @@ opentelemetry-semantic-conventions.workspace = true opentelemetry.workspace = true reqwest.workspace = true rustls-platform-verifier.workspace = true +tower.workspace = true tower-http.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true diff --git a/crates/http/src/reqwest.rs b/crates/http/src/reqwest.rs index 9ed10e5f4..7a1fd947d 100644 --- a/crates/http/src/reqwest.rs +++ b/crates/http/src/reqwest.rs @@ -3,10 +3,14 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository root for full details. -use std::{future::Future, time::Duration}; +use std::{future::Future, str::FromStr, sync::Arc, time::Duration}; +use futures_util::FutureExt as _; use headers::{ContentLength, HeaderMapExt as _, Host, UserAgent}; -use hyper_util::client::legacy::connect::HttpInfo; +use hyper_util::client::legacy::connect::{ + dns::{GaiResolver, Name}, + HttpInfo, +}; use opentelemetry_http::HeaderInjector; use opentelemetry_semantic_conventions::{ attribute::{HTTP_REQUEST_BODY_SIZE, HTTP_RESPONSE_BODY_SIZE}, @@ -16,11 +20,40 @@ use opentelemetry_semantic_conventions::{ USER_AGENT_ORIGINAL, }, }; +use tower::{BoxError, Service as _}; use tracing::Instrument; use tracing_opentelemetry::OpenTelemetrySpanExt; static USER_AGENT: &str = concat!("matrix-authentication-service/", env!("CARGO_PKG_VERSION")); +struct TracingResolver { + inner: GaiResolver, +} + +impl TracingResolver { + fn new() -> Self { + let inner = GaiResolver::new(); + Self { inner } + } +} + +impl reqwest::dns::Resolve for TracingResolver { + fn resolve(&self, name: reqwest::dns::Name) -> reqwest::dns::Resolving { + let span = tracing::info_span!("dns.resolve", name = name.as_str()); + let inner = &mut self.inner.clone(); + Box::pin( + inner + .call(Name::from_str(name.as_str()).unwrap()) + .map(|result| { + result + .map(|addrs| -> reqwest::dns::Addrs { Box::new(addrs) }) + .map_err(|err| -> BoxError { Box::new(err) }) + }) + .instrument(span), + ) + } +} + /// Create a new [`reqwest::Client`] with sane parameters /// /// # Panics @@ -28,9 +61,9 @@ static USER_AGENT: &str = concat!("matrix-authentication-service/", env!("CARGO_ /// Panics if the client fails to build, which should never happen #[must_use] pub fn client() -> reqwest::Client { - // TODO: make the resolver tracing again // TODO: can/should we limit in-flight requests? reqwest::Client::builder() + .dns_resolver(Arc::new(TracingResolver::new())) .use_preconfigured_tls(rustls_platform_verifier::tls_config()) .user_agent(USER_AGENT) .timeout(Duration::from_secs(60)) @@ -74,7 +107,6 @@ async fn send_traced( { USER_AGENT_ORIGINAL } = user_agent, "rust.error" = tracing::field::Empty, ); - let _guard = span.enter(); // Inject the span context into the request headers let context = span.context(); @@ -83,37 +115,42 @@ async fn send_traced( propagator.inject_context(&context, &mut injector); }); - match client.execute(request).in_current_span().await { - Ok(response) => { - span.record("otel.status_code", "OK"); - span.record(HTTP_RESPONSE_STATUS_CODE, response.status().as_u16()); + async move { + let span = tracing::Span::current(); + match client.execute(request).await { + Ok(response) => { + span.record("otel.status_code", "OK"); + span.record(HTTP_RESPONSE_STATUS_CODE, response.status().as_u16()); - if let Some(ContentLength(content_length)) = response.headers().typed_get() { - span.record(HTTP_RESPONSE_BODY_SIZE, content_length); + if let Some(ContentLength(content_length)) = response.headers().typed_get() { + span.record(HTTP_RESPONSE_BODY_SIZE, content_length); + } + + if let Some(http_info) = response.extensions().get::() { + let local = http_info.local_addr(); + let remote = http_info.remote_addr(); + + let family = if local.is_ipv4() { "ipv4" } else { "ipv6" }; + span.record(NETWORK_TYPE, family); + span.record(CLIENT_ADDRESS, remote.ip().to_string()); + span.record(CLIENT_PORT, remote.port()); + span.record(SERVER_ADDRESS, local.ip().to_string()); + span.record(SERVER_PORT, local.port()); + } else { + tracing::warn!("No HttpInfo injected in response extensions"); + } + + Ok(response) } - - if let Some(http_info) = response.extensions().get::() { - let local = http_info.local_addr(); - let remote = http_info.remote_addr(); - - let family = if local.is_ipv4() { "ipv4" } else { "ipv6" }; - span.record(NETWORK_TYPE, family); - span.record(CLIENT_ADDRESS, remote.ip().to_string()); - span.record(CLIENT_PORT, remote.port()); - span.record(SERVER_ADDRESS, local.ip().to_string()); - span.record(SERVER_PORT, local.port()); - } else { - tracing::warn!("No HttpInfo injected in response extensions"); + Err(err) => { + span.record("otel.status_code", "ERROR"); + span.record("rust.error", &err as &dyn std::error::Error); + Err(err) } - - Ok(response) - } - Err(err) => { - span.record("otel.status_code", "ERROR"); - span.record("rust.error", &err as &dyn std::error::Error); - Err(err) } } + .instrument(span) + .await } /// An extension trait implemented for [`reqwest::RequestBuilder`] to send a diff --git a/crates/iana-codegen/Cargo.toml b/crates/iana-codegen/Cargo.toml index 6e81d2901..d6b592039 100644 --- a/crates/iana-codegen/Cargo.toml +++ b/crates/iana-codegen/Cargo.toml @@ -17,7 +17,7 @@ async-trait.workspace = true camino.workspace = true convert_case = "0.6.0" csv = "1.3.0" -futures-util = "0.3.31" +futures-util.workspace = true reqwest.workspace = true serde.workspace = true tokio.workspace = true diff --git a/crates/listener/Cargo.toml b/crates/listener/Cargo.toml index 19b22e84b..4d0101901 100644 --- a/crates/listener/Cargo.toml +++ b/crates/listener/Cargo.toml @@ -13,7 +13,7 @@ workspace = true [dependencies] bytes.workspace = true -futures-util = "0.3.31" +futures-util.workspace = true http-body.workspace = true hyper = { workspace = true, features = ["server"] } hyper-util.workspace = true diff --git a/crates/storage-pg/Cargo.toml b/crates/storage-pg/Cargo.toml index 78d13631d..8f354184d 100644 --- a/crates/storage-pg/Cargo.toml +++ b/crates/storage-pg/Cargo.toml @@ -21,7 +21,7 @@ serde.workspace = true serde_json.workspace = true thiserror.workspace = true tracing.workspace = true -futures-util = "0.3.31" +futures-util.workspace = true opentelemetry-semantic-conventions.workspace = true rand.workspace = true diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index d6039f389..53b62c916 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -15,7 +15,7 @@ workspace = true async-trait.workspace = true chrono.workspace = true thiserror.workspace = true -futures-util = "0.3.31" +futures-util.workspace = true apalis-core = { version = "0.4.9", features = ["tokio-comp"] } opentelemetry.workspace = true