From 49bf5287b331f4644f7ee50aebd817a19f978da3 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 14 Oct 2021 17:39:49 +0200 Subject: [PATCH] Propagate parent trace context --- Cargo.lock | 14 ++++ crates/cli/Cargo.toml | 1 + crates/cli/src/server.rs | 82 +++++++++++++++++--- crates/cli/src/telemetry.rs | 20 ++++- crates/core/src/handlers/oauth2/discovery.rs | 16 ++-- 5 files changed, 117 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ddbd99c30..bd5060f50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1397,6 +1397,7 @@ dependencies = [ "mas-config", "mas-core", "opentelemetry", + "opentelemetry-http", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "schemars", @@ -1764,6 +1765,19 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "opentelemetry-http" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d50ceb0b0e8b75cb3e388a2571a807c8228dabc5d6670f317b6eb21301095373" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "opentelemetry", +] + [[package]] name = "opentelemetry-otlp" version = "0.9.0" diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 02bf453c5..4a86c8320 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -28,6 +28,7 @@ url = "2.2.2" mas-config = { path = "../config" } mas-core = { path = "../core" } +opentelemetry-http = "0.5.0" [dev-dependencies] indoc = "1.0.3" diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index 2edcb4c47..42c0d7d99 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -19,21 +19,21 @@ use std::{ use anyhow::Context; use clap::Clap; -use hyper::{header, Server}; +use hyper::{header, Server, Version}; use mas_config::RootConfig; use mas_core::{ storage::MIGRATOR, tasks::{self, TaskQueue}, templates::Templates, }; +use opentelemetry_http::HeaderExtractor; use tower::{make::Shared, ServiceBuilder}; use tower_http::{ compression::CompressionLayer, sensitive_headers::SetSensitiveHeadersLayer, - trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}, - LatencyUnit, + trace::{MakeSpan, OnResponse, TraceLayer}, }; -use tracing::info; +use tracing::{field, info}; use super::RootCommand; @@ -44,6 +44,72 @@ pub(super) struct ServerCommand { migrate: bool, } +#[derive(Debug, Clone, Default)] +struct OtelMakeSpan; + +impl MakeSpan for OtelMakeSpan { + fn make_span(&mut self, request: &hyper::Request) -> tracing::Span { + // Extract the context from the headers + let headers = request.headers(); + let extractor = HeaderExtractor(headers); + + let cx = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor) + }); + + // Attach the context so when the request span is created it gets properly + // parented + let _guard = cx.attach(); + + let version = match request.version() { + Version::HTTP_09 => "0.9", + Version::HTTP_10 => "1.0", + Version::HTTP_11 => "1.1", + Version::HTTP_2 => "2.0", + Version::HTTP_3 => "3.0", + _ => "", + }; + + let span = tracing::info_span!( + "request", + http.method = %request.method(), + http.target = %request.uri(), + http.flavor = version, + http.status_code = field::Empty, + http.user_agent = field::Empty, + otel.kind = "server", + otel.status_code = field::Empty, + ); + + if let Some(user_agent) = headers + .get(header::USER_AGENT) + .and_then(|s| s.to_str().ok()) + { + span.record("http.user_agent", &user_agent); + } + + span + } +} + +#[derive(Debug, Clone, Default)] +struct OtelOnResponse; + +impl OnResponse for OtelOnResponse { + fn on_response(self, response: &hyper::Response, _latency: Duration, span: &tracing::Span) { + let s = response.status(); + let status = if s.is_success() { + "ok" + } else if s.is_client_error() || s.is_server_error() { + "error" + } else { + "unset" + }; + span.record("otel.status_code", &status); + span.record("http.status_code", &s.as_u16()); + } +} + impl ServerCommand { pub async fn run(&self, root: &RootCommand) -> anyhow::Result<()> { let config: RootConfig = root.load_config()?; @@ -80,12 +146,8 @@ impl ServerCommand { // Add high level tracing/logging to all requests .layer( TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::new().include_headers(true)) - .on_response( - DefaultOnResponse::new() - .include_headers(true) - .latency_unit(LatencyUnit::Micros), - ), + .make_span_with(OtelMakeSpan) + .on_response(OtelOnResponse), ) // Set a timeout .timeout(Duration::from_secs(10)) diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs index 470973e60..8067fc795 100644 --- a/crates/cli/src/telemetry.rs +++ b/crates/cli/src/telemetry.rs @@ -18,12 +18,19 @@ use futures::stream::{Stream, StreamExt}; use mas_config::{MetricsConfig, TelemetryConfig, TracingConfig}; use opentelemetry::{ global, - sdk::{self, trace::Tracer, Resource}, + propagation::TextMapPropagator, + sdk::{ + self, + propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, + trace::Tracer, + Resource, + }, }; use opentelemetry_semantic_conventions as semcov; pub fn setup(config: &TelemetryConfig) -> anyhow::Result> { global::set_error_handler(|e| tracing::error!("{}", e))?; + global::set_text_map_propagator(propagator()); let tracer = tracer(&config.tracing)?; meter(&config.metrics)?; @@ -34,6 +41,17 @@ pub fn shutdown() { global::shutdown_tracer_provider(); } +fn propagator() -> impl TextMapPropagator { + // TODO: make this configurable + let baggage_propagator = BaggagePropagator::new(); + let trace_context_propagator = TraceContextPropagator::new(); + + TextMapCompositePropagator::new(vec![ + Box::new(baggage_propagator), + Box::new(trace_context_propagator), + ]) +} + #[cfg(feature = "otlp")] fn otlp_tracer(endpoint: &Option) -> anyhow::Result { use opentelemetry_otlp::WithExportConfig; diff --git a/crates/core/src/handlers/oauth2/discovery.rs b/crates/core/src/handlers/oauth2/discovery.rs index 083d5f88d..766216ae1 100644 --- a/crates/core/src/handlers/oauth2/discovery.rs +++ b/crates/core/src/handlers/oauth2/discovery.rs @@ -14,6 +14,7 @@ use std::collections::HashSet; +use hyper::Method; use oauth2_types::{ oidc::Metadata, pkce::CodeChallengeMethod, @@ -86,10 +87,15 @@ pub(super) fn filter( code_challenge_methods_supported, }; - let cors = warp::cors().allow_any_origin(); + // TODO: get the headers list from the global opentelemetry propagators + let cors = warp::cors() + .allow_method(Method::GET) + .allow_any_origin() + .allow_headers(["traceparent"]); - warp::path!(".well-known" / "openid-configuration") - .and(warp::get()) - .map(move || warp::reply::json(&metadata)) - .with(cors) + warp::path!(".well-known" / "openid-configuration").and( + warp::get() + .map(move || warp::reply::json(&metadata)) + .with(cors), + ) }