Fix HTTP request tracing and make the DNS resolver traced again
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3404,6 +3404,7 @@ dependencies = [
|
||||
name = "mas-http"
|
||||
version = "0.12.0"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"headers",
|
||||
"http",
|
||||
"hyper-util",
|
||||
@@ -3412,6 +3413,7 @@ dependencies = [
|
||||
"opentelemetry-semantic-conventions",
|
||||
"reqwest",
|
||||
"rustls-platform-verifier",
|
||||
"tower 0.5.1",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
|
||||
@@ -107,6 +107,10 @@ features = ["derive"]
|
||||
version = "0.10.19"
|
||||
features = ["env", "yaml", "test"]
|
||||
|
||||
# Utilities for dealing with futures
|
||||
[workspace.dependencies.futures-util]
|
||||
version = "0.3.31"
|
||||
|
||||
# Rate-limiting
|
||||
[workspace.dependencies.governor]
|
||||
version = "0.7.0"
|
||||
|
||||
@@ -18,7 +18,7 @@ axum-extra.workspace = true
|
||||
bytes.workspace = true
|
||||
chrono.workspace = true
|
||||
data-encoding = "2.6.0"
|
||||
futures-util = "0.3.31"
|
||||
futures-util.workspace = true
|
||||
headers.workspace = true
|
||||
http.workspace = true
|
||||
http-body.workspace = true
|
||||
|
||||
@@ -15,7 +15,7 @@ workspace = true
|
||||
# Async runtime
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
futures-util = "0.3.31"
|
||||
futures-util.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
# Logging and tracing
|
||||
|
||||
@@ -12,6 +12,7 @@ repository.workspace = true
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
futures-util.workspace = true
|
||||
headers.workspace = true
|
||||
http.workspace = true
|
||||
hyper-util.workspace = true
|
||||
@@ -20,6 +21,7 @@ opentelemetry-semantic-conventions.workspace = true
|
||||
opentelemetry.workspace = true
|
||||
reqwest.workspace = true
|
||||
rustls-platform-verifier.workspace = true
|
||||
tower.workspace = true
|
||||
tower-http.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
|
||||
@@ -3,10 +3,14 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Please see LICENSE in the repository root for full details.
|
||||
|
||||
use std::{future::Future, time::Duration};
|
||||
use std::{future::Future, str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
use futures_util::FutureExt as _;
|
||||
use headers::{ContentLength, HeaderMapExt as _, Host, UserAgent};
|
||||
use hyper_util::client::legacy::connect::HttpInfo;
|
||||
use hyper_util::client::legacy::connect::{
|
||||
dns::{GaiResolver, Name},
|
||||
HttpInfo,
|
||||
};
|
||||
use opentelemetry_http::HeaderInjector;
|
||||
use opentelemetry_semantic_conventions::{
|
||||
attribute::{HTTP_REQUEST_BODY_SIZE, HTTP_RESPONSE_BODY_SIZE},
|
||||
@@ -16,11 +20,40 @@ use opentelemetry_semantic_conventions::{
|
||||
USER_AGENT_ORIGINAL,
|
||||
},
|
||||
};
|
||||
use tower::{BoxError, Service as _};
|
||||
use tracing::Instrument;
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
|
||||
static USER_AGENT: &str = concat!("matrix-authentication-service/", env!("CARGO_PKG_VERSION"));
|
||||
|
||||
struct TracingResolver {
|
||||
inner: GaiResolver,
|
||||
}
|
||||
|
||||
impl TracingResolver {
|
||||
fn new() -> Self {
|
||||
let inner = GaiResolver::new();
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl reqwest::dns::Resolve for TracingResolver {
|
||||
fn resolve(&self, name: reqwest::dns::Name) -> reqwest::dns::Resolving {
|
||||
let span = tracing::info_span!("dns.resolve", name = name.as_str());
|
||||
let inner = &mut self.inner.clone();
|
||||
Box::pin(
|
||||
inner
|
||||
.call(Name::from_str(name.as_str()).unwrap())
|
||||
.map(|result| {
|
||||
result
|
||||
.map(|addrs| -> reqwest::dns::Addrs { Box::new(addrs) })
|
||||
.map_err(|err| -> BoxError { Box::new(err) })
|
||||
})
|
||||
.instrument(span),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new [`reqwest::Client`] with sane parameters
|
||||
///
|
||||
/// # Panics
|
||||
@@ -28,9 +61,9 @@ static USER_AGENT: &str = concat!("matrix-authentication-service/", env!("CARGO_
|
||||
/// Panics if the client fails to build, which should never happen
|
||||
#[must_use]
|
||||
pub fn client() -> reqwest::Client {
|
||||
// TODO: make the resolver tracing again
|
||||
// TODO: can/should we limit in-flight requests?
|
||||
reqwest::Client::builder()
|
||||
.dns_resolver(Arc::new(TracingResolver::new()))
|
||||
.use_preconfigured_tls(rustls_platform_verifier::tls_config())
|
||||
.user_agent(USER_AGENT)
|
||||
.timeout(Duration::from_secs(60))
|
||||
@@ -74,7 +107,6 @@ async fn send_traced(
|
||||
{ USER_AGENT_ORIGINAL } = user_agent,
|
||||
"rust.error" = tracing::field::Empty,
|
||||
);
|
||||
let _guard = span.enter();
|
||||
|
||||
// Inject the span context into the request headers
|
||||
let context = span.context();
|
||||
@@ -83,37 +115,42 @@ async fn send_traced(
|
||||
propagator.inject_context(&context, &mut injector);
|
||||
});
|
||||
|
||||
match client.execute(request).in_current_span().await {
|
||||
Ok(response) => {
|
||||
span.record("otel.status_code", "OK");
|
||||
span.record(HTTP_RESPONSE_STATUS_CODE, response.status().as_u16());
|
||||
async move {
|
||||
let span = tracing::Span::current();
|
||||
match client.execute(request).await {
|
||||
Ok(response) => {
|
||||
span.record("otel.status_code", "OK");
|
||||
span.record(HTTP_RESPONSE_STATUS_CODE, response.status().as_u16());
|
||||
|
||||
if let Some(ContentLength(content_length)) = response.headers().typed_get() {
|
||||
span.record(HTTP_RESPONSE_BODY_SIZE, content_length);
|
||||
if let Some(ContentLength(content_length)) = response.headers().typed_get() {
|
||||
span.record(HTTP_RESPONSE_BODY_SIZE, content_length);
|
||||
}
|
||||
|
||||
if let Some(http_info) = response.extensions().get::<HttpInfo>() {
|
||||
let local = http_info.local_addr();
|
||||
let remote = http_info.remote_addr();
|
||||
|
||||
let family = if local.is_ipv4() { "ipv4" } else { "ipv6" };
|
||||
span.record(NETWORK_TYPE, family);
|
||||
span.record(CLIENT_ADDRESS, remote.ip().to_string());
|
||||
span.record(CLIENT_PORT, remote.port());
|
||||
span.record(SERVER_ADDRESS, local.ip().to_string());
|
||||
span.record(SERVER_PORT, local.port());
|
||||
} else {
|
||||
tracing::warn!("No HttpInfo injected in response extensions");
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
if let Some(http_info) = response.extensions().get::<HttpInfo>() {
|
||||
let local = http_info.local_addr();
|
||||
let remote = http_info.remote_addr();
|
||||
|
||||
let family = if local.is_ipv4() { "ipv4" } else { "ipv6" };
|
||||
span.record(NETWORK_TYPE, family);
|
||||
span.record(CLIENT_ADDRESS, remote.ip().to_string());
|
||||
span.record(CLIENT_PORT, remote.port());
|
||||
span.record(SERVER_ADDRESS, local.ip().to_string());
|
||||
span.record(SERVER_PORT, local.port());
|
||||
} else {
|
||||
tracing::warn!("No HttpInfo injected in response extensions");
|
||||
Err(err) => {
|
||||
span.record("otel.status_code", "ERROR");
|
||||
span.record("rust.error", &err as &dyn std::error::Error);
|
||||
Err(err)
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
Err(err) => {
|
||||
span.record("otel.status_code", "ERROR");
|
||||
span.record("rust.error", &err as &dyn std::error::Error);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
.instrument(span)
|
||||
.await
|
||||
}
|
||||
|
||||
/// An extension trait implemented for [`reqwest::RequestBuilder`] to send a
|
||||
|
||||
@@ -17,7 +17,7 @@ async-trait.workspace = true
|
||||
camino.workspace = true
|
||||
convert_case = "0.6.0"
|
||||
csv = "1.3.0"
|
||||
futures-util = "0.3.31"
|
||||
futures-util.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -13,7 +13,7 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
bytes.workspace = true
|
||||
futures-util = "0.3.31"
|
||||
futures-util.workspace = true
|
||||
http-body.workspace = true
|
||||
hyper = { workspace = true, features = ["server"] }
|
||||
hyper-util.workspace = true
|
||||
|
||||
@@ -21,7 +21,7 @@ serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
futures-util = "0.3.31"
|
||||
futures-util.workspace = true
|
||||
opentelemetry-semantic-conventions.workspace = true
|
||||
|
||||
rand.workspace = true
|
||||
|
||||
@@ -15,7 +15,7 @@ workspace = true
|
||||
async-trait.workspace = true
|
||||
chrono.workspace = true
|
||||
thiserror.workspace = true
|
||||
futures-util = "0.3.31"
|
||||
futures-util.workspace = true
|
||||
|
||||
apalis-core = { version = "0.4.9", features = ["tokio-comp"] }
|
||||
opentelemetry.workspace = true
|
||||
|
||||
Reference in New Issue
Block a user