From 29cbb7d5056ae00d137e97218ba5d0457b12d755 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Fri, 24 Jun 2022 19:05:19 -0700 Subject: [PATCH 1/4] opentelemetry: Update otel to 0.18.0 ## Motivation Support the latest OpenTelemetry specification. ## Solution Update `opentelemetry` to the latest `0.18.x` release. Breaking changes in the metrics spec have removed value recorders and added histograms so the metrics layer's `value.` prefix has been changed to `histogram.` and behaves accordingly. Additionally the `PushController` configuration for the metrics layer has been simplified to accept a `BasicController` that can act in either push or pull modes. Finally trace sampling in the sdk's `PreSampledTracer` impl has been updated to match the sampling logic in https://github.com/open-telemetry/opentelemetry-rust/pull/839. --- tracing-opentelemetry/Cargo.toml | 10 +- tracing-opentelemetry/src/metrics.rs | 93 ++++--- tracing-opentelemetry/src/subscriber.rs | 109 ++++---- tracing-opentelemetry/src/tracer.rs | 26 +- .../tests/metrics_publishing.rs | 248 +++++++++--------- .../tests/trace_state_propagation.rs | 21 +- 6 files changed, 260 insertions(+), 247 deletions(-) diff --git a/tracing-opentelemetry/Cargo.toml b/tracing-opentelemetry/Cargo.toml index ec7d4a8606..36e51e5a96 100644 --- a/tracing-opentelemetry/Cargo.toml +++ b/tracing-opentelemetry/Cargo.toml @@ -25,20 +25,16 @@ default = ["tracing-log", "metrics"] metrics = ["opentelemetry/metrics"] [dependencies] -opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] } +opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] } tracing = { path = "../tracing", version = "0.2", default-features = false, features = ["std"] } tracing-core = { path = "../tracing-core", version = "0.2" } tracing-subscriber = { path = "../tracing-subscriber", version = "0.3", default-features = false, features = ["registry", "std"] } tracing-log = { path = "../tracing-log", version = "0.2", default-features = false, optional = true } once_cell = "1.13.0" -# Fix minimal-versions; opentelemetry specifies async-trait = "0.1" which breaks -async-trait = "0.1.20" - [dev-dependencies] -async-trait = "0.1.56" criterion = { version = "0.3.6", default_features = false } -opentelemetry-jaeger = "0.16.0" +opentelemetry-jaeger = "0.17.0" futures-util = { version = "0.3.21", default-features = false } tokio = { version = "1.20.0", features = ["full"] } tokio-stream = "0.1.9" @@ -52,4 +48,4 @@ harness = false [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] \ No newline at end of file +rustdoc-args = ["--cfg", "docsrs"] diff --git a/tracing-opentelemetry/src/metrics.rs b/tracing-opentelemetry/src/metrics.rs index 76c0ed2d37..147a9f8833 100644 --- a/tracing-opentelemetry/src/metrics.rs +++ b/tracing-opentelemetry/src/metrics.rs @@ -3,8 +3,9 @@ use tracing::{field::Visit, Collect}; use tracing_core::Field; use opentelemetry::{ - metrics::{Counter, Meter, MeterProvider, UpDownCounter, ValueRecorder}, - sdk::metrics::PushController, + metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}, + sdk::metrics::controllers::BasicController, + Context as OtelContext, }; use tracing_subscriber::{registry::LookupSpan, subscribe::Context, Subscribe}; @@ -13,7 +14,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter."; const METRIC_PREFIX_COUNTER: &str = "counter."; -const METRIC_PREFIX_VALUE: &str = "value."; +const METRIC_PREFIX_HISTOGRAM: &str = "histogram."; const I64_MAX: u64 = i64::MAX as u64; #[derive(Default)] @@ -22,9 +23,9 @@ pub(crate) struct Instruments { f64_counter: MetricsMap>, i64_up_down_counter: MetricsMap>, f64_up_down_counter: MetricsMap>, - u64_value_recorder: MetricsMap>, - i64_value_recorder: MetricsMap>, - f64_value_recorder: MetricsMap>, + u64_histogram: MetricsMap>, + i64_histogram: MetricsMap>, + f64_histogram: MetricsMap>, } type MetricsMap = RwLock>; @@ -35,14 +36,15 @@ pub(crate) enum InstrumentType { CounterF64(f64), UpDownCounterI64(i64), UpDownCounterF64(f64), - ValueRecorderU64(u64), - ValueRecorderI64(i64), - ValueRecorderF64(f64), + HistogramU64(u64), + HistogramI64(i64), + HistogramF64(f64), } impl Instruments { pub(crate) fn update_metric( &self, + cx: &OtelContext, meter: &Meter, instrument_type: InstrumentType, metric_name: &'static str, @@ -76,7 +78,7 @@ impl Instruments { &self.u64_counter, metric_name, || meter.u64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::CounterF64(value) => { @@ -84,7 +86,7 @@ impl Instruments { &self.f64_counter, metric_name, || meter.f64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterI64(value) => { @@ -92,7 +94,7 @@ impl Instruments { &self.i64_up_down_counter, metric_name, || meter.i64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterF64(value) => { @@ -100,31 +102,31 @@ impl Instruments { &self.f64_up_down_counter, metric_name, || meter.f64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } - InstrumentType::ValueRecorderU64(value) => { + InstrumentType::HistogramU64(value) => { update_or_insert( - &self.u64_value_recorder, + &self.u64_histogram, metric_name, - || meter.u64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.u64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderI64(value) => { + InstrumentType::HistogramI64(value) => { update_or_insert( - &self.i64_value_recorder, + &self.i64_histogram, metric_name, - || meter.i64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.i64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderF64(value) => { + InstrumentType::HistogramF64(value) => { update_or_insert( - &self.f64_value_recorder, + &self.f64_histogram, metric_name, - || meter.f64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.f64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } }; @@ -142,8 +144,10 @@ impl<'a> Visit for MetricVisitor<'a> { } fn record_u64(&mut self, field: &Field, value: u64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value), metric_name, @@ -151,6 +155,7 @@ impl<'a> Visit for MetricVisitor<'a> { } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { if value <= I64_MAX { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value as i64), metric_name, @@ -163,54 +168,63 @@ impl<'a> Visit for MetricVisitor<'a> { value ); } - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderU64(value), + InstrumentType::HistogramU64(value), metric_name, ); } } fn record_f64(&mut self, field: &Field, value: f64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterF64(value), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterF64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderF64(value), + InstrumentType::HistogramF64(value), metric_name, ); } } fn record_i64(&mut self, field: &Field, value: i64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value as u64), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderI64(value), + InstrumentType::HistogramI64(value), metric_name, ); } @@ -232,14 +246,14 @@ impl<'a> Visit for MetricVisitor<'a> { /// use tracing_opentelemetry::MetricsSubscriber; /// use tracing_subscriber::subscribe::CollectExt; /// use tracing_subscriber::Registry; -/// # use opentelemetry::sdk::metrics::PushController; +/// # use opentelemetry::sdk::metrics::controllers::BasicController; /// -/// // Constructing a PushController is out-of-scope for the docs here, but there +/// // Constructing a BasicController is out-of-scope for the docs here, but there /// // are examples in the opentelemetry repository. See: -/// // https://github.com/open-telemetry/opentelemetry-rust/blob/c13a11e62a68eacd8c41a0742a0d097808e28fbd/examples/basic-otlp/src/main.rs#L39-L53 -/// # let push_controller: PushController = unimplemented!(); +/// // https://github.com/open-telemetry/opentelemetry-rust/blob/d4b9befea04bcc7fc19319a6ebf5b5070131c486/examples/basic-otlp/src/main.rs#L35-L52 +/// # let controller: BasicController = unimplemented!(); /// -/// let opentelemetry_metrics = MetricsSubscriber::new(push_controller); +/// let opentelemetry_metrics = MetricsSubscriber::new(controller); /// let collector = Registry::default().with(opentelemetry_metrics); /// tracing::collect::set_global_default(collector).unwrap(); /// ``` @@ -328,10 +342,9 @@ pub struct MetricsSubscriber { impl MetricsSubscriber { /// Create a new instance of MetricsSubscriber. - pub fn new(push_controller: PushController) -> Self { - let meter = push_controller - .provider() - .meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION)); + pub fn new(controller: BasicController) -> Self { + let meter = + controller.versioned_meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION), None); MetricsSubscriber { meter, instruments: Default::default(), diff --git a/tracing-opentelemetry/src/subscriber.rs b/tracing-opentelemetry/src/subscriber.rs index f0936271e8..6450b7f7bd 100644 --- a/tracing-opentelemetry/src/subscriber.rs +++ b/tracing-opentelemetry/src/subscriber.rs @@ -1,7 +1,7 @@ use crate::{OtelData, PreSampledTracer}; use once_cell::unsync; use opentelemetry::{ - trace::{self as otel, noop, TraceContextExt}, + trace::{self as otel, noop, OrderMap, TraceContextExt}, Context as OtelContext, Key, KeyValue, Value, }; use std::fmt; @@ -100,12 +100,11 @@ fn str_to_span_kind(s: &str) -> Option { } } -fn str_to_status_code(s: &str) -> Option { +fn str_to_status(s: &str) -> otel::Status { match s { - s if s.eq_ignore_ascii_case("unset") => Some(otel::StatusCode::Unset), - s if s.eq_ignore_ascii_case("ok") => Some(otel::StatusCode::Ok), - s if s.eq_ignore_ascii_case("error") => Some(otel::StatusCode::Error), - _ => None, + s if s.eq_ignore_ascii_case("ok") => otel::Status::Ok, + s if s.eq_ignore_ascii_case("error") => otel::Status::error(""), + _ => otel::Status::Unset, } } @@ -199,7 +198,7 @@ impl<'a> SpanAttributeVisitor<'a> { fn record(&mut self, attribute: KeyValue) { debug_assert!(self.0.attributes.is_some()); if let Some(v) = self.0.attributes.as_mut() { - v.push(attribute); + v.insert(attribute.key, attribute.value); } } } @@ -233,8 +232,8 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { match field.name() { SPAN_NAME_FIELD => self.0.name = value.to_string().into(), SPAN_KIND_FIELD => self.0.span_kind = str_to_span_kind(value), - SPAN_STATUS_CODE_FIELD => self.0.status_code = str_to_status_code(value), - SPAN_STATUS_MESSAGE_FIELD => self.0.status_message = Some(value.to_owned().into()), + SPAN_STATUS_CODE_FIELD => self.0.status = str_to_status(value), + SPAN_STATUS_MESSAGE_FIELD => self.0.status = otel::Status::error(value.to_string()), _ => self.record(KeyValue::new(field.name(), value.to_string())), } } @@ -247,11 +246,9 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { match field.name() { SPAN_NAME_FIELD => self.0.name = format!("{:?}", value).into(), SPAN_KIND_FIELD => self.0.span_kind = str_to_span_kind(&format!("{:?}", value)), - SPAN_STATUS_CODE_FIELD => { - self.0.status_code = str_to_status_code(&format!("{:?}", value)) - } + SPAN_STATUS_CODE_FIELD => self.0.status = str_to_status(&format!("{:?}", value)), SPAN_STATUS_MESSAGE_FIELD => { - self.0.status_message = Some(format!("{:?}", value).into()) + self.0.status = otel::Status::error(format!("{:?}", value)) } _ => self.record(Key::new(field.name()).string(format!("{:?}", value))), } @@ -277,7 +274,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -314,7 +311,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -506,7 +503,7 @@ where builder.trace_id = Some(self.tracer.new_trace_id()); } - let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity( + let builder_attrs = builder.attributes.get_or_insert(OrderMap::with_capacity( attrs.fields().len() + self.extra_span_attrs(), )); @@ -514,26 +511,26 @@ where let meta = attrs.metadata(); if let Some(filename) = meta.file() { - builder_attrs.push(KeyValue::new("code.filepath", filename)); + builder_attrs.insert("code.filepath".into(), filename.into()); } if let Some(module) = meta.module_path() { - builder_attrs.push(KeyValue::new("code.namespace", module)); + builder_attrs.insert("code.namespace".into(), module.into()); } if let Some(line) = meta.line() { - builder_attrs.push(KeyValue::new("code.lineno", line as i64)); + builder_attrs.insert("code.lineno".into(), (line as i64).into()); } } if self.with_threads { - THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64))); + THREAD_ID.with(|id| builder_attrs.insert("thread.id".into(), (**id as i64).into())); if let Some(name) = std::thread::current().name() { // TODO(eliza): it's a bummer that we have to allocate here, but // we can't easily get the string as a `static`. it would be // nice if `opentelemetry` could also take `Arc`s as // `String` values... - builder_attrs.push(KeyValue::new("thread.name", name.to_owned())); + builder_attrs.insert("thread.name".into(), name.to_owned().into()); } } @@ -653,8 +650,10 @@ where let mut extensions = span.extensions_mut(); if let Some(OtelData { builder, .. }) = extensions.get_mut::() { - if builder.status_code.is_none() && *meta.level() == tracing_core::Level::ERROR { - builder.status_code = Some(otel::StatusCode::Error); + if builder.status == otel::Status::Unset + && *meta.level() == tracing_core::Level::ERROR + { + builder.status = otel::Status::error("") } if self.location { @@ -712,15 +711,14 @@ where if self.tracked_inactivity { // Append busy/idle timings when enabled. if let Some(timings) = extensions.get_mut::() { - let busy_ns = KeyValue::new("busy_ns", timings.busy); - let idle_ns = KeyValue::new("idle_ns", timings.idle); - - if let Some(ref mut attributes) = builder.attributes { - attributes.push(busy_ns); - attributes.push(idle_ns); - } else { - builder.attributes = Some(vec![busy_ns, idle_ns]); - } + let busy_ns = Key::new("busy_ns"); + let idle_ns = Key::new("idle_ns"); + + let attributes = builder + .attributes + .get_or_insert_with(|| OrderMap::with_capacity(2)); + attributes.insert(busy_ns, timings.busy.into()); + attributes.insert(idle_ns, timings.idle.into()); } } @@ -773,7 +771,7 @@ fn thread_id_integer(id: thread::ThreadId) -> u64 { mod tests { use super::*; use crate::OtelData; - use opentelemetry::trace::{noop, SpanKind, TraceFlags}; + use opentelemetry::trace::{noop, TraceFlags}; use std::{ borrow::Cow, collections::HashMap, @@ -849,7 +847,7 @@ mod tests { false } fn set_attribute(&mut self, _attribute: KeyValue) {} - fn set_status(&mut self, _code: otel::StatusCode, _message: String) {} + fn set_status(&mut self, _status: otel::Status) {} fn update_name>>(&mut self, _new_name: T) {} fn end_with_timestamp(&mut self, _timestamp: SystemTime) {} } @@ -881,7 +879,7 @@ mod tests { tracing_subscriber::registry().with(subscriber().with_tracer(tracer.clone())); tracing::collect::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_kind = tracer.with_data(|data| data.builder.span_kind.clone()); @@ -895,11 +893,19 @@ mod tests { tracing_subscriber::registry().with(subscriber().with_tracer(tracer.clone())); tracing::collect::with_default(subscriber, || { - tracing::debug_span!("request", otel.status_code = ?otel::StatusCode::Ok); + tracing::debug_span!("request", otel.status_code = ?otel::Status::Ok); }); + let recorded_status = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); - let recorded_status_code = tracer.with_data(|data| data.builder.status_code); - assert_eq!(recorded_status_code, Some(otel::StatusCode::Ok)) + assert_eq!(recorded_status, otel::Status::Ok) } #[test] @@ -914,8 +920,17 @@ mod tests { tracing::debug_span!("request", otel.status_message = message); }); - let recorded_status_message = tracer.with_data(|data| data.builder.status_message.clone()); - assert_eq!(recorded_status_message, Some(message.into())) + let recorded_status_message = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); + + assert_eq!(recorded_status_message, otel::Status::error(message)) } #[test] @@ -934,7 +949,7 @@ mod tests { let _g = existing_cx.attach(); tracing::collect::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_trace_id = @@ -958,7 +973,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"idle_ns")); assert!(keys.contains(&"busy_ns")); @@ -977,7 +992,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"code.filepath")); assert!(keys.contains(&"code.namespace")); @@ -1000,7 +1015,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"code.filepath")); assert!(!keys.contains(&"code.namespace")); @@ -1012,7 +1027,7 @@ mod tests { let thread = thread::current(); let expected_name = thread .name() - .map(|name| Value::String(Cow::Owned(name.to_owned()))); + .map(|name| Value::String(name.to_owned().into())); let expected_id = Value::I64(thread_id_integer(thread.id()) as i64); let tracer = TestTracer(Arc::new(Mutex::new(None))); @@ -1026,7 +1041,7 @@ mod tests { let attributes = tracer .with_data(|data| data.builder.attributes.as_ref().unwrap().clone()) .drain(..) - .map(|keyval| (keyval.key.as_str().to_string(), keyval.value)) + .map(|(key, value)| (key.as_str().to_string(), value)) .collect::>(); assert_eq!(attributes.get("thread.name"), expected_name.as_ref()); assert_eq!(attributes.get("thread.id"), Some(&expected_id)); @@ -1045,7 +1060,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"thread.name")); assert!(!keys.contains(&"thread.id")); diff --git a/tracing-opentelemetry/src/tracer.rs b/tracing-opentelemetry/src/tracer.rs index 23e574450d..52513c36ad 100644 --- a/tracing-opentelemetry/src/tracer.rs +++ b/tracing-opentelemetry/src/tracer.rs @@ -1,9 +1,10 @@ -use opentelemetry::sdk::trace::{SamplingDecision, SamplingResult, Tracer, TracerProvider}; +use opentelemetry::sdk::trace::{Tracer, TracerProvider}; +use opentelemetry::trace::OrderMap; use opentelemetry::{ trace as otel, trace::{ - noop, SpanBuilder, SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId, - TraceState, + noop, SamplingDecision, SamplingResult, SpanBuilder, SpanContext, SpanId, SpanKind, + TraceContextExt, TraceFlags, TraceId, TraceState, }, Context as OtelContext, }; @@ -73,19 +74,18 @@ impl PreSampledTracer for Tracer { let builder = &mut data.builder; // Gather trace state - let (no_parent, trace_id, remote_parent, parent_trace_flags) = - current_trace_state(builder, parent_cx, &provider); + let (trace_id, parent_trace_flags) = current_trace_state(builder, parent_cx, &provider); // Sample or defer to existing sampling decisions let (flags, trace_state) = if let Some(result) = &builder.sampling_result { process_sampling_result(result, parent_trace_flags) - } else if no_parent || remote_parent { + } else { builder.sampling_result = Some(provider.config().sampler.should_sample( Some(parent_cx), trace_id, &builder.name, builder.span_kind.as_ref().unwrap_or(&SpanKind::Internal), - builder.attributes.as_deref().unwrap_or(&[]), + builder.attributes.as_ref().unwrap_or(&OrderMap::default()), builder.links.as_deref().unwrap_or(&[]), self.instrumentation_library(), )); @@ -94,12 +94,6 @@ impl PreSampledTracer for Tracer { builder.sampling_result.as_ref().unwrap(), parent_trace_flags, ) - } else { - // has parent that is local - Some(( - parent_trace_flags, - parent_cx.span().span_context().trace_state().clone(), - )) } .unwrap_or_default(); @@ -125,18 +119,16 @@ fn current_trace_state( builder: &SpanBuilder, parent_cx: &OtelContext, provider: &TracerProvider, -) -> (bool, TraceId, bool, TraceFlags) { +) -> (TraceId, TraceFlags) { if parent_cx.has_active_span() { let span = parent_cx.span(); let sc = span.span_context(); - (false, sc.trace_id(), sc.is_remote(), sc.trace_flags()) + (sc.trace_id(), sc.trace_flags()) } else { ( - true, builder .trace_id .unwrap_or_else(|| provider.config().id_generator.new_trace_id()), - false, Default::default(), ) } diff --git a/tracing-opentelemetry/tests/metrics_publishing.rs b/tracing-opentelemetry/tests/metrics_publishing.rs index 9b8d8a6bab..c3a143987b 100644 --- a/tracing-opentelemetry/tests/metrics_publishing.rs +++ b/tracing-opentelemetry/tests/metrics_publishing.rs @@ -1,26 +1,21 @@ -#![cfg(feature = "metrics")] -use async_trait::async_trait; -use futures_util::{Stream, StreamExt as _}; use opentelemetry::{ - metrics::{Descriptor, InstrumentKind}, - metrics::{Number, NumberKind}, + metrics::MetricsError, sdk::{ - export::{ - metrics::{ - CheckpointSet, ExportKind, ExportKindFor, ExportKindSelector, - Exporter as MetricsExporter, Points, Sum, - }, - trace::{SpanData, SpanExporter}, + export::metrics::{ + aggregation::{self, Histogram, Sum, TemporalitySelector}, + InstrumentationLibraryReader, }, metrics::{ - aggregators::{ArrayAggregator, SumAggregator}, - selectors::simple::Selector, + aggregators::{HistogramAggregator, SumAggregator}, + controllers::BasicController, + processors, + sdk_api::{Descriptor, InstrumentKind, Number, NumberKind}, + selectors, }, }, - Key, Value, + Context, }; use std::cmp::Ordering; -use std::time::Duration; use tracing::Collect; use tracing_opentelemetry::MetricsSubscriber; use tracing_subscriber::prelude::*; @@ -30,7 +25,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; #[tokio::test] async fn u64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -40,11 +35,13 @@ async fn u64_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world = 1_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn u64_counter_is_exported_i64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world2".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -54,11 +51,13 @@ async fn u64_counter_is_exported_i64_at_instrumentation_point() { tracing::collect::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world2 = 1_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "float_hello_world".to_string(), InstrumentKind::Counter, NumberKind::F64, @@ -68,11 +67,13 @@ async fn f64_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(monotonic_counter.float_hello_world = 1.000000123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -82,11 +83,13 @@ async fn i64_up_down_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(counter.pebcak = -5_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak2".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -96,11 +99,13 @@ async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { tracing::collect::with_default(subscriber, || { tracing::info!(counter.pebcak2 = 5_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak_blah".to_string(), InstrumentKind::UpDownCounter, NumberKind::F64, @@ -110,13 +115,15 @@ async fn f64_up_down_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(counter.pebcak_blah = 99.123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn u64_value_is_exported() { - let subscriber = init_subscriber( +async fn u64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::U64, Number::from(9_u64), ); @@ -124,13 +131,15 @@ async fn u64_value_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(value.abcdefg = 9_u64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn i64_value_is_exported() { - let subscriber = init_subscriber( +async fn i64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_auenatsou".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::I64, Number::from(-19_i64), ); @@ -138,13 +147,15 @@ async fn i64_value_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(value.abcdefg_auenatsou = -19_i64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn f64_value_is_exported() { - let subscriber = init_subscriber( +async fn f64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_racecar".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::F64, Number::from(777.0012_f64), ); @@ -152,6 +163,8 @@ async fn f64_value_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(value.abcdefg_racecar = 777.0012_f64); }); + + exporter.export().unwrap(); } fn init_subscriber( @@ -159,24 +172,25 @@ fn init_subscriber( expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, -) -> impl Collect + 'static { +) -> (impl Collect + 'static, TestExporter) { + let controller = opentelemetry::sdk::metrics::controllers::basic(processors::factory( + selectors::simple::histogram(vec![-10.0, 100.0]), + aggregation::cumulative_temporality_selector(), + )) + .build(); + let exporter = TestExporter { expected_metric_name, expected_instrument_kind, expected_number_kind, expected_value, + controller: controller.clone(), }; - let push_controller = opentelemetry::sdk::metrics::controllers::push( - Selector::Exact, - ExportKindSelector::Stateless, + ( + tracing_subscriber::registry().with(MetricsSubscriber::new(controller)), exporter, - tokio::spawn, - delayed_interval, ) - .build(); - - tracing_subscriber::registry().with(MetricsSubscriber::new(push_controller)) } #[derive(Clone, Debug)] @@ -185,100 +199,84 @@ struct TestExporter { expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, + controller: BasicController, } -#[async_trait] -impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut _batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - Ok(()) - } -} +impl TestExporter { + fn export(&self) -> Result<(), MetricsError> { + self.controller.collect(&Context::current())?; + self.controller.try_for_each(&mut |library, reader| { + reader.try_for_each(self, &mut |record| { + assert_eq!(self.expected_metric_name, record.descriptor().name()); + assert_eq!( + self.expected_instrument_kind, + *record.descriptor().instrument_kind() + ); + assert_eq!( + self.expected_number_kind, + *record.descriptor().number_kind() + ); + match self.expected_instrument_kind { + InstrumentKind::Counter | InstrumentKind::UpDownCounter => { + let number = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .sum() + .unwrap(); + + assert_eq!( + Ordering::Equal, + number + .partial_cmp(&NumberKind::U64, &self.expected_value) + .unwrap() + ); + } + InstrumentKind::Histogram => { + let histogram = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .histogram() + .unwrap(); + + let counts = histogram.counts(); + if dbg!(self.expected_value.to_i64(&self.expected_number_kind)) > 100 { + assert_eq!(counts, &[0.0, 0.0, 1.0]); + } else if self.expected_value.to_i64(&self.expected_number_kind) > 0 { + assert_eq!(counts, &[0.0, 1.0, 0.0]); + } else { + assert_eq!(counts, &[1.0, 0.0, 0.0]); + } + } + _ => panic!( + "InstrumentKind {:?} not currently supported!", + self.expected_instrument_kind + ), + }; -impl MetricsExporter for TestExporter { - fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> opentelemetry::metrics::Result<()> { - checkpoint_set.try_for_each(self, &mut |record| { - assert_eq!(self.expected_metric_name, record.descriptor().name()); - assert_eq!( - self.expected_instrument_kind, - *record.descriptor().instrument_kind() - ); - assert_eq!( - self.expected_number_kind, - *record.descriptor().number_kind() - ); - let number = match self.expected_instrument_kind { - InstrumentKind::Counter | InstrumentKind::UpDownCounter => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .sum() - .unwrap(), - InstrumentKind::ValueRecorder => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .points() - .unwrap()[0] - .clone(), - _ => panic!( - "InstrumentKind {:?} not currently supported!", - self.expected_instrument_kind - ), - }; - assert_eq!( - Ordering::Equal, - number - .partial_cmp(&NumberKind::U64, &self.expected_value) - .unwrap() - ); - - // The following are the same regardless of the individual metric. - assert_eq!( - INSTRUMENTATION_LIBRARY_NAME, - record.descriptor().instrumentation_library().name - ); - assert_eq!( - CARGO_PKG_VERSION, - record.descriptor().instrumentation_version().unwrap() - ); - assert_eq!( - Value::String("unknown_service".into()), - record - .resource() - .get(Key::new("service.name".to_string())) - .unwrap() - ); - - opentelemetry::metrics::Result::Ok(()) + // The following are the same regardless of the individual metric. + assert_eq!(INSTRUMENTATION_LIBRARY_NAME, library.name); + assert_eq!(CARGO_PKG_VERSION, library.version.as_ref().unwrap()); + + Ok(()) + }) }) } } -impl ExportKindFor for TestExporter { - fn export_kind_for(&self, _descriptor: &Descriptor) -> ExportKind { +impl TemporalitySelector for TestExporter { + fn temporality_for( + &self, + _descriptor: &Descriptor, + _kind: &aggregation::AggregationKind, + ) -> aggregation::Temporality { // I don't think the value here makes a difference since // we are just testing a single metric. - ExportKind::Cumulative + aggregation::Temporality::Cumulative } } - -// From opentelemetry::sdk::util:: -// For some reason I can't pull it in from the other crate, it gives -// could not find `util` in `sdk` -/// Helper which wraps `tokio::time::interval` and makes it return a stream -fn tokio_interval_stream(period: std::time::Duration) -> tokio_stream::wrappers::IntervalStream { - tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(period)) -} - -// https://github.com/open-telemetry/opentelemetry-rust/blob/2585d109bf90d53d57c91e19c758dca8c36f5512/examples/basic-otlp/src/main.rs#L34-L37 -// Skip first immediate tick from tokio, not needed for async_std. -fn delayed_interval(duration: Duration) -> impl Stream { - tokio_interval_stream(duration).skip(0) -} diff --git a/tracing-opentelemetry/tests/trace_state_propagation.rs b/tracing-opentelemetry/tests/trace_state_propagation.rs index 5382411b50..bc80e1364b 100644 --- a/tracing-opentelemetry/tests/trace_state_propagation.rs +++ b/tracing-opentelemetry/tests/trace_state_propagation.rs @@ -1,8 +1,8 @@ -use async_trait::async_trait; +use futures_util::future::BoxFuture; use opentelemetry::{ propagation::TextMapPropagator, sdk::{ - export::trace::{SpanData, SpanExporter}, + export::trace::{ExportResult, SpanData, SpanExporter}, propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, trace::{Tracer, TracerProvider}, }, @@ -158,15 +158,14 @@ fn build_sampled_context() -> (Context, impl Collect, TestExporter, TracerProvid #[derive(Clone, Default, Debug)] struct TestExporter(Arc>>); -#[async_trait] impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - if let Ok(mut inner) = self.0.lock() { - inner.append(&mut batch); - } - Ok(()) + fn export(&mut self, mut batch: Vec) -> BoxFuture<'static, ExportResult> { + let spans = self.0.clone(); + Box::pin(async move { + if let Ok(mut inner) = spans.lock() { + inner.append(&mut batch); + } + Ok(()) + }) } } From 50cac30aed0dbbf6fc9d3b00c913d37790d279ce Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Wed, 14 Sep 2022 11:41:22 -0700 Subject: [PATCH 2/4] Update MSRV to 1.56 --- .github/workflows/CI.yml | 4 ++++ tracing-opentelemetry/Cargo.toml | 2 +- tracing-opentelemetry/README.md | 4 ++-- tracing-opentelemetry/src/lib.rs | 4 ++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 829383e3d3..52ce13bffe 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -155,11 +155,15 @@ jobs: toolchain: 1.49.0 - subcrate: tracing-subscriber toolchain: 1.49.0 + - subcrate: tracing-opentelemetry + toolchain: 1.49.0 include: - subcrate: tracing-appender toolchain: 1.53.0 - subcrate: tracing-subscriber toolchain: 1.50.0 + - subcrate: tracing-opentelemetry + toolchain: 1.56.0 steps: - uses: actions/checkout@v3 - name: install Rust nightly diff --git a/tracing-opentelemetry/Cargo.toml b/tracing-opentelemetry/Cargo.toml index 36e51e5a96..33d5830976 100644 --- a/tracing-opentelemetry/Cargo.toml +++ b/tracing-opentelemetry/Cargo.toml @@ -17,7 +17,7 @@ categories = [ keywords = ["tracing", "opentelemetry", "jaeger", "zipkin", "async"] license = "MIT" edition = "2018" -rust-version = "1.46.0" +rust-version = "1.56.0" [features] default = ["tracing-log", "metrics"] diff --git a/tracing-opentelemetry/README.md b/tracing-opentelemetry/README.md index dade3d7eca..796b2386cf 100644 --- a/tracing-opentelemetry/README.md +++ b/tracing-opentelemetry/README.md @@ -50,7 +50,7 @@ The crate provides the following types: [`tracing`]: https://crates.io/crates/tracing [OpenTelemetry]: https://opentelemetry.io/ -*Compiler support: [requires `rustc` 1.49+][msrv]* +*Compiler support: [requires `rustc` 1.56+][msrv]* [msrv]: #supported-rust-versions @@ -110,7 +110,7 @@ $ firefox http://localhost:16686/ ## Supported Rust Versions Tracing Opentelemetry is built against the latest stable release. The minimum -supported version is 1.46. The current Tracing version is not guaranteed to +supported version is 1.56. The current Tracing version is not guaranteed to build on Rust versions earlier than the minimum supported version. Tracing follows the same compiler support policies as the rest of the Tokio diff --git a/tracing-opentelemetry/src/lib.rs b/tracing-opentelemetry/src/lib.rs index 71017ea8d3..7764c68543 100644 --- a/tracing-opentelemetry/src/lib.rs +++ b/tracing-opentelemetry/src/lib.rs @@ -9,7 +9,7 @@ //! [OpenTelemetry]: https://opentelemetry.io //! [`tracing`]: https://github.com/tokio-rs/tracing //! -//! *Compiler support: [requires `rustc` 1.49+][msrv]* +//! *Compiler support: [requires `rustc` 1.56+][msrv]* //! //! [msrv]: #supported-rust-versions //! @@ -86,7 +86,7 @@ //! ## Supported Rust Versions //! //! Tracing is built against the latest stable release. The minimum supported -//! version is 1.49. The current Tracing version is not guaranteed to build on +//! version is 1.56. The current Tracing version is not guaranteed to build on //! Rust versions earlier than the minimum supported version. //! //! Tracing follows the same compiler support policies as the rest of the Tokio From dd9a1a6970c0dadabdb04594df9bc2dda98e5a6d Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Wed, 14 Sep 2022 11:56:58 -0700 Subject: [PATCH 3/4] Update examples --- examples/Cargo.toml | 4 ++-- examples/examples/opentelemetry.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index cd4eb2a3d6..4d33691a3a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -52,8 +52,8 @@ inferno = "0.11.6" tempfile = "3.3.0" # opentelemetry example -opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] } -opentelemetry-jaeger = "0.16.0" +opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] } +opentelemetry-jaeger = "0.17.0" # fmt examples snafu = "0.6.10" diff --git a/examples/examples/opentelemetry.rs b/examples/examples/opentelemetry.rs index af7661d280..12e96d3f15 100644 --- a/examples/examples/opentelemetry.rs +++ b/examples/examples/opentelemetry.rs @@ -19,7 +19,7 @@ fn main() -> Result<(), Box> { // Install an otel pipeline with a simple span processor that exports data one at a time when // spans end. See the `install_batch` option on each exporter's pipeline builder to see how to // export in batches. - let tracer = opentelemetry_jaeger::new_pipeline() + let tracer = opentelemetry_jaeger::new_agent_pipeline() .with_service_name("report_example") .install_simple()?; let opentelemetry = tracing_opentelemetry::subscriber().with_tracer(tracer); From 67e1fa4a17b4542e8e2f169c5261f2c406868225 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Thu, 15 Sep 2022 16:37:42 -0700 Subject: [PATCH 4/4] Fix async-trait dep --- tracing-opentelemetry/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tracing-opentelemetry/Cargo.toml b/tracing-opentelemetry/Cargo.toml index 33d5830976..868f0a2d91 100644 --- a/tracing-opentelemetry/Cargo.toml +++ b/tracing-opentelemetry/Cargo.toml @@ -32,7 +32,11 @@ tracing-subscriber = { path = "../tracing-subscriber", version = "0.3", default- tracing-log = { path = "../tracing-log", version = "0.2", default-features = false, optional = true } once_cell = "1.13.0" +# Fix minimal-versions; opentelemetry specifies async-trait = "0.1" which breaks +async-trait = "0.1.20" + [dev-dependencies] +async-trait = "0.1.56" criterion = { version = "0.3.6", default_features = false } opentelemetry-jaeger = "0.17.0" futures-util = { version = "0.3.21", default-features = false }