diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 829383e3d3..52ce13bffe 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -155,11 +155,15 @@ jobs: toolchain: 1.49.0 - subcrate: tracing-subscriber toolchain: 1.49.0 + - subcrate: tracing-opentelemetry + toolchain: 1.49.0 include: - subcrate: tracing-appender toolchain: 1.53.0 - subcrate: tracing-subscriber toolchain: 1.50.0 + - subcrate: tracing-opentelemetry + toolchain: 1.56.0 steps: - uses: actions/checkout@v3 - name: install Rust nightly diff --git a/examples/Cargo.toml b/examples/Cargo.toml index cd4eb2a3d6..4d33691a3a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -52,8 +52,8 @@ inferno = "0.11.6" tempfile = "3.3.0" # opentelemetry example -opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] } -opentelemetry-jaeger = "0.16.0" +opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] } +opentelemetry-jaeger = "0.17.0" # fmt examples snafu = "0.6.10" diff --git a/examples/examples/opentelemetry.rs b/examples/examples/opentelemetry.rs index af7661d280..12e96d3f15 100644 --- a/examples/examples/opentelemetry.rs +++ b/examples/examples/opentelemetry.rs @@ -19,7 +19,7 @@ fn main() -> Result<(), Box> { // Install an otel pipeline with a simple span processor that exports data one at a time when // spans end. See the `install_batch` option on each exporter's pipeline builder to see how to // export in batches. - let tracer = opentelemetry_jaeger::new_pipeline() + let tracer = opentelemetry_jaeger::new_agent_pipeline() .with_service_name("report_example") .install_simple()?; let opentelemetry = tracing_opentelemetry::subscriber().with_tracer(tracer); diff --git a/tracing-opentelemetry/Cargo.toml b/tracing-opentelemetry/Cargo.toml index ec7d4a8606..868f0a2d91 100644 --- a/tracing-opentelemetry/Cargo.toml +++ b/tracing-opentelemetry/Cargo.toml @@ -17,7 +17,7 @@ categories = [ keywords = ["tracing", "opentelemetry", "jaeger", "zipkin", "async"] license = "MIT" edition = "2018" -rust-version = "1.46.0" +rust-version = "1.56.0" [features] default = ["tracing-log", "metrics"] @@ -25,7 +25,7 @@ default = ["tracing-log", "metrics"] metrics = ["opentelemetry/metrics"] [dependencies] -opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] } +opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] } tracing = { path = "../tracing", version = "0.2", default-features = false, features = ["std"] } tracing-core = { path = "../tracing-core", version = "0.2" } tracing-subscriber = { path = "../tracing-subscriber", version = "0.3", default-features = false, features = ["registry", "std"] } @@ -38,7 +38,7 @@ async-trait = "0.1.20" [dev-dependencies] async-trait = "0.1.56" criterion = { version = "0.3.6", default_features = false } -opentelemetry-jaeger = "0.16.0" +opentelemetry-jaeger = "0.17.0" futures-util = { version = "0.3.21", default-features = false } tokio = { version = "1.20.0", features = ["full"] } tokio-stream = "0.1.9" @@ -52,4 +52,4 @@ harness = false [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] \ No newline at end of file +rustdoc-args = ["--cfg", "docsrs"] diff --git a/tracing-opentelemetry/README.md b/tracing-opentelemetry/README.md index dade3d7eca..796b2386cf 100644 --- a/tracing-opentelemetry/README.md +++ b/tracing-opentelemetry/README.md @@ -50,7 +50,7 @@ The crate provides the following types: [`tracing`]: https://crates.io/crates/tracing [OpenTelemetry]: https://opentelemetry.io/ -*Compiler support: [requires `rustc` 1.49+][msrv]* +*Compiler support: [requires `rustc` 1.56+][msrv]* [msrv]: #supported-rust-versions @@ -110,7 +110,7 @@ $ firefox http://localhost:16686/ ## Supported Rust Versions Tracing Opentelemetry is built against the latest stable release. The minimum -supported version is 1.46. The current Tracing version is not guaranteed to +supported version is 1.56. The current Tracing version is not guaranteed to build on Rust versions earlier than the minimum supported version. Tracing follows the same compiler support policies as the rest of the Tokio diff --git a/tracing-opentelemetry/src/lib.rs b/tracing-opentelemetry/src/lib.rs index 71017ea8d3..7764c68543 100644 --- a/tracing-opentelemetry/src/lib.rs +++ b/tracing-opentelemetry/src/lib.rs @@ -9,7 +9,7 @@ //! [OpenTelemetry]: https://opentelemetry.io //! [`tracing`]: https://github.com/tokio-rs/tracing //! -//! *Compiler support: [requires `rustc` 1.49+][msrv]* +//! *Compiler support: [requires `rustc` 1.56+][msrv]* //! //! [msrv]: #supported-rust-versions //! @@ -86,7 +86,7 @@ //! ## Supported Rust Versions //! //! Tracing is built against the latest stable release. The minimum supported -//! version is 1.49. The current Tracing version is not guaranteed to build on +//! version is 1.56. The current Tracing version is not guaranteed to build on //! Rust versions earlier than the minimum supported version. //! //! Tracing follows the same compiler support policies as the rest of the Tokio diff --git a/tracing-opentelemetry/src/metrics.rs b/tracing-opentelemetry/src/metrics.rs index 76c0ed2d37..147a9f8833 100644 --- a/tracing-opentelemetry/src/metrics.rs +++ b/tracing-opentelemetry/src/metrics.rs @@ -3,8 +3,9 @@ use tracing::{field::Visit, Collect}; use tracing_core::Field; use opentelemetry::{ - metrics::{Counter, Meter, MeterProvider, UpDownCounter, ValueRecorder}, - sdk::metrics::PushController, + metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}, + sdk::metrics::controllers::BasicController, + Context as OtelContext, }; use tracing_subscriber::{registry::LookupSpan, subscribe::Context, Subscribe}; @@ -13,7 +14,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter."; const METRIC_PREFIX_COUNTER: &str = "counter."; -const METRIC_PREFIX_VALUE: &str = "value."; +const METRIC_PREFIX_HISTOGRAM: &str = "histogram."; const I64_MAX: u64 = i64::MAX as u64; #[derive(Default)] @@ -22,9 +23,9 @@ pub(crate) struct Instruments { f64_counter: MetricsMap>, i64_up_down_counter: MetricsMap>, f64_up_down_counter: MetricsMap>, - u64_value_recorder: MetricsMap>, - i64_value_recorder: MetricsMap>, - f64_value_recorder: MetricsMap>, + u64_histogram: MetricsMap>, + i64_histogram: MetricsMap>, + f64_histogram: MetricsMap>, } type MetricsMap = RwLock>; @@ -35,14 +36,15 @@ pub(crate) enum InstrumentType { CounterF64(f64), UpDownCounterI64(i64), UpDownCounterF64(f64), - ValueRecorderU64(u64), - ValueRecorderI64(i64), - ValueRecorderF64(f64), + HistogramU64(u64), + HistogramI64(i64), + HistogramF64(f64), } impl Instruments { pub(crate) fn update_metric( &self, + cx: &OtelContext, meter: &Meter, instrument_type: InstrumentType, metric_name: &'static str, @@ -76,7 +78,7 @@ impl Instruments { &self.u64_counter, metric_name, || meter.u64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::CounterF64(value) => { @@ -84,7 +86,7 @@ impl Instruments { &self.f64_counter, metric_name, || meter.f64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterI64(value) => { @@ -92,7 +94,7 @@ impl Instruments { &self.i64_up_down_counter, metric_name, || meter.i64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterF64(value) => { @@ -100,31 +102,31 @@ impl Instruments { &self.f64_up_down_counter, metric_name, || meter.f64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } - InstrumentType::ValueRecorderU64(value) => { + InstrumentType::HistogramU64(value) => { update_or_insert( - &self.u64_value_recorder, + &self.u64_histogram, metric_name, - || meter.u64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.u64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderI64(value) => { + InstrumentType::HistogramI64(value) => { update_or_insert( - &self.i64_value_recorder, + &self.i64_histogram, metric_name, - || meter.i64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.i64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderF64(value) => { + InstrumentType::HistogramF64(value) => { update_or_insert( - &self.f64_value_recorder, + &self.f64_histogram, metric_name, - || meter.f64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.f64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } }; @@ -142,8 +144,10 @@ impl<'a> Visit for MetricVisitor<'a> { } fn record_u64(&mut self, field: &Field, value: u64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value), metric_name, @@ -151,6 +155,7 @@ impl<'a> Visit for MetricVisitor<'a> { } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { if value <= I64_MAX { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value as i64), metric_name, @@ -163,54 +168,63 @@ impl<'a> Visit for MetricVisitor<'a> { value ); } - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderU64(value), + InstrumentType::HistogramU64(value), metric_name, ); } } fn record_f64(&mut self, field: &Field, value: f64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterF64(value), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterF64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderF64(value), + InstrumentType::HistogramF64(value), metric_name, ); } } fn record_i64(&mut self, field: &Field, value: i64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value as u64), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderI64(value), + InstrumentType::HistogramI64(value), metric_name, ); } @@ -232,14 +246,14 @@ impl<'a> Visit for MetricVisitor<'a> { /// use tracing_opentelemetry::MetricsSubscriber; /// use tracing_subscriber::subscribe::CollectExt; /// use tracing_subscriber::Registry; -/// # use opentelemetry::sdk::metrics::PushController; +/// # use opentelemetry::sdk::metrics::controllers::BasicController; /// -/// // Constructing a PushController is out-of-scope for the docs here, but there +/// // Constructing a BasicController is out-of-scope for the docs here, but there /// // are examples in the opentelemetry repository. See: -/// // https://github.com/open-telemetry/opentelemetry-rust/blob/c13a11e62a68eacd8c41a0742a0d097808e28fbd/examples/basic-otlp/src/main.rs#L39-L53 -/// # let push_controller: PushController = unimplemented!(); +/// // https://github.com/open-telemetry/opentelemetry-rust/blob/d4b9befea04bcc7fc19319a6ebf5b5070131c486/examples/basic-otlp/src/main.rs#L35-L52 +/// # let controller: BasicController = unimplemented!(); /// -/// let opentelemetry_metrics = MetricsSubscriber::new(push_controller); +/// let opentelemetry_metrics = MetricsSubscriber::new(controller); /// let collector = Registry::default().with(opentelemetry_metrics); /// tracing::collect::set_global_default(collector).unwrap(); /// ``` @@ -328,10 +342,9 @@ pub struct MetricsSubscriber { impl MetricsSubscriber { /// Create a new instance of MetricsSubscriber. - pub fn new(push_controller: PushController) -> Self { - let meter = push_controller - .provider() - .meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION)); + pub fn new(controller: BasicController) -> Self { + let meter = + controller.versioned_meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION), None); MetricsSubscriber { meter, instruments: Default::default(), diff --git a/tracing-opentelemetry/src/subscriber.rs b/tracing-opentelemetry/src/subscriber.rs index f0936271e8..6450b7f7bd 100644 --- a/tracing-opentelemetry/src/subscriber.rs +++ b/tracing-opentelemetry/src/subscriber.rs @@ -1,7 +1,7 @@ use crate::{OtelData, PreSampledTracer}; use once_cell::unsync; use opentelemetry::{ - trace::{self as otel, noop, TraceContextExt}, + trace::{self as otel, noop, OrderMap, TraceContextExt}, Context as OtelContext, Key, KeyValue, Value, }; use std::fmt; @@ -100,12 +100,11 @@ fn str_to_span_kind(s: &str) -> Option { } } -fn str_to_status_code(s: &str) -> Option { +fn str_to_status(s: &str) -> otel::Status { match s { - s if s.eq_ignore_ascii_case("unset") => Some(otel::StatusCode::Unset), - s if s.eq_ignore_ascii_case("ok") => Some(otel::StatusCode::Ok), - s if s.eq_ignore_ascii_case("error") => Some(otel::StatusCode::Error), - _ => None, + s if s.eq_ignore_ascii_case("ok") => otel::Status::Ok, + s if s.eq_ignore_ascii_case("error") => otel::Status::error(""), + _ => otel::Status::Unset, } } @@ -199,7 +198,7 @@ impl<'a> SpanAttributeVisitor<'a> { fn record(&mut self, attribute: KeyValue) { debug_assert!(self.0.attributes.is_some()); if let Some(v) = self.0.attributes.as_mut() { - v.push(attribute); + v.insert(attribute.key, attribute.value); } } } @@ -233,8 +232,8 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { match field.name() { SPAN_NAME_FIELD => self.0.name = value.to_string().into(), SPAN_KIND_FIELD => self.0.span_kind = str_to_span_kind(value), - SPAN_STATUS_CODE_FIELD => self.0.status_code = str_to_status_code(value), - SPAN_STATUS_MESSAGE_FIELD => self.0.status_message = Some(value.to_owned().into()), + SPAN_STATUS_CODE_FIELD => self.0.status = str_to_status(value), + SPAN_STATUS_MESSAGE_FIELD => self.0.status = otel::Status::error(value.to_string()), _ => self.record(KeyValue::new(field.name(), value.to_string())), } } @@ -247,11 +246,9 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { match field.name() { SPAN_NAME_FIELD => self.0.name = format!("{:?}", value).into(), SPAN_KIND_FIELD => self.0.span_kind = str_to_span_kind(&format!("{:?}", value)), - SPAN_STATUS_CODE_FIELD => { - self.0.status_code = str_to_status_code(&format!("{:?}", value)) - } + SPAN_STATUS_CODE_FIELD => self.0.status = str_to_status(&format!("{:?}", value)), SPAN_STATUS_MESSAGE_FIELD => { - self.0.status_message = Some(format!("{:?}", value).into()) + self.0.status = otel::Status::error(format!("{:?}", value)) } _ => self.record(Key::new(field.name()).string(format!("{:?}", value))), } @@ -277,7 +274,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -314,7 +311,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -506,7 +503,7 @@ where builder.trace_id = Some(self.tracer.new_trace_id()); } - let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity( + let builder_attrs = builder.attributes.get_or_insert(OrderMap::with_capacity( attrs.fields().len() + self.extra_span_attrs(), )); @@ -514,26 +511,26 @@ where let meta = attrs.metadata(); if let Some(filename) = meta.file() { - builder_attrs.push(KeyValue::new("code.filepath", filename)); + builder_attrs.insert("code.filepath".into(), filename.into()); } if let Some(module) = meta.module_path() { - builder_attrs.push(KeyValue::new("code.namespace", module)); + builder_attrs.insert("code.namespace".into(), module.into()); } if let Some(line) = meta.line() { - builder_attrs.push(KeyValue::new("code.lineno", line as i64)); + builder_attrs.insert("code.lineno".into(), (line as i64).into()); } } if self.with_threads { - THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64))); + THREAD_ID.with(|id| builder_attrs.insert("thread.id".into(), (**id as i64).into())); if let Some(name) = std::thread::current().name() { // TODO(eliza): it's a bummer that we have to allocate here, but // we can't easily get the string as a `static`. it would be // nice if `opentelemetry` could also take `Arc`s as // `String` values... - builder_attrs.push(KeyValue::new("thread.name", name.to_owned())); + builder_attrs.insert("thread.name".into(), name.to_owned().into()); } } @@ -653,8 +650,10 @@ where let mut extensions = span.extensions_mut(); if let Some(OtelData { builder, .. }) = extensions.get_mut::() { - if builder.status_code.is_none() && *meta.level() == tracing_core::Level::ERROR { - builder.status_code = Some(otel::StatusCode::Error); + if builder.status == otel::Status::Unset + && *meta.level() == tracing_core::Level::ERROR + { + builder.status = otel::Status::error("") } if self.location { @@ -712,15 +711,14 @@ where if self.tracked_inactivity { // Append busy/idle timings when enabled. if let Some(timings) = extensions.get_mut::() { - let busy_ns = KeyValue::new("busy_ns", timings.busy); - let idle_ns = KeyValue::new("idle_ns", timings.idle); - - if let Some(ref mut attributes) = builder.attributes { - attributes.push(busy_ns); - attributes.push(idle_ns); - } else { - builder.attributes = Some(vec![busy_ns, idle_ns]); - } + let busy_ns = Key::new("busy_ns"); + let idle_ns = Key::new("idle_ns"); + + let attributes = builder + .attributes + .get_or_insert_with(|| OrderMap::with_capacity(2)); + attributes.insert(busy_ns, timings.busy.into()); + attributes.insert(idle_ns, timings.idle.into()); } } @@ -773,7 +771,7 @@ fn thread_id_integer(id: thread::ThreadId) -> u64 { mod tests { use super::*; use crate::OtelData; - use opentelemetry::trace::{noop, SpanKind, TraceFlags}; + use opentelemetry::trace::{noop, TraceFlags}; use std::{ borrow::Cow, collections::HashMap, @@ -849,7 +847,7 @@ mod tests { false } fn set_attribute(&mut self, _attribute: KeyValue) {} - fn set_status(&mut self, _code: otel::StatusCode, _message: String) {} + fn set_status(&mut self, _status: otel::Status) {} fn update_name>>(&mut self, _new_name: T) {} fn end_with_timestamp(&mut self, _timestamp: SystemTime) {} } @@ -881,7 +879,7 @@ mod tests { tracing_subscriber::registry().with(subscriber().with_tracer(tracer.clone())); tracing::collect::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_kind = tracer.with_data(|data| data.builder.span_kind.clone()); @@ -895,11 +893,19 @@ mod tests { tracing_subscriber::registry().with(subscriber().with_tracer(tracer.clone())); tracing::collect::with_default(subscriber, || { - tracing::debug_span!("request", otel.status_code = ?otel::StatusCode::Ok); + tracing::debug_span!("request", otel.status_code = ?otel::Status::Ok); }); + let recorded_status = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); - let recorded_status_code = tracer.with_data(|data| data.builder.status_code); - assert_eq!(recorded_status_code, Some(otel::StatusCode::Ok)) + assert_eq!(recorded_status, otel::Status::Ok) } #[test] @@ -914,8 +920,17 @@ mod tests { tracing::debug_span!("request", otel.status_message = message); }); - let recorded_status_message = tracer.with_data(|data| data.builder.status_message.clone()); - assert_eq!(recorded_status_message, Some(message.into())) + let recorded_status_message = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); + + assert_eq!(recorded_status_message, otel::Status::error(message)) } #[test] @@ -934,7 +949,7 @@ mod tests { let _g = existing_cx.attach(); tracing::collect::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_trace_id = @@ -958,7 +973,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"idle_ns")); assert!(keys.contains(&"busy_ns")); @@ -977,7 +992,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"code.filepath")); assert!(keys.contains(&"code.namespace")); @@ -1000,7 +1015,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"code.filepath")); assert!(!keys.contains(&"code.namespace")); @@ -1012,7 +1027,7 @@ mod tests { let thread = thread::current(); let expected_name = thread .name() - .map(|name| Value::String(Cow::Owned(name.to_owned()))); + .map(|name| Value::String(name.to_owned().into())); let expected_id = Value::I64(thread_id_integer(thread.id()) as i64); let tracer = TestTracer(Arc::new(Mutex::new(None))); @@ -1026,7 +1041,7 @@ mod tests { let attributes = tracer .with_data(|data| data.builder.attributes.as_ref().unwrap().clone()) .drain(..) - .map(|keyval| (keyval.key.as_str().to_string(), keyval.value)) + .map(|(key, value)| (key.as_str().to_string(), value)) .collect::>(); assert_eq!(attributes.get("thread.name"), expected_name.as_ref()); assert_eq!(attributes.get("thread.id"), Some(&expected_id)); @@ -1045,7 +1060,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"thread.name")); assert!(!keys.contains(&"thread.id")); diff --git a/tracing-opentelemetry/src/tracer.rs b/tracing-opentelemetry/src/tracer.rs index 23e574450d..52513c36ad 100644 --- a/tracing-opentelemetry/src/tracer.rs +++ b/tracing-opentelemetry/src/tracer.rs @@ -1,9 +1,10 @@ -use opentelemetry::sdk::trace::{SamplingDecision, SamplingResult, Tracer, TracerProvider}; +use opentelemetry::sdk::trace::{Tracer, TracerProvider}; +use opentelemetry::trace::OrderMap; use opentelemetry::{ trace as otel, trace::{ - noop, SpanBuilder, SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId, - TraceState, + noop, SamplingDecision, SamplingResult, SpanBuilder, SpanContext, SpanId, SpanKind, + TraceContextExt, TraceFlags, TraceId, TraceState, }, Context as OtelContext, }; @@ -73,19 +74,18 @@ impl PreSampledTracer for Tracer { let builder = &mut data.builder; // Gather trace state - let (no_parent, trace_id, remote_parent, parent_trace_flags) = - current_trace_state(builder, parent_cx, &provider); + let (trace_id, parent_trace_flags) = current_trace_state(builder, parent_cx, &provider); // Sample or defer to existing sampling decisions let (flags, trace_state) = if let Some(result) = &builder.sampling_result { process_sampling_result(result, parent_trace_flags) - } else if no_parent || remote_parent { + } else { builder.sampling_result = Some(provider.config().sampler.should_sample( Some(parent_cx), trace_id, &builder.name, builder.span_kind.as_ref().unwrap_or(&SpanKind::Internal), - builder.attributes.as_deref().unwrap_or(&[]), + builder.attributes.as_ref().unwrap_or(&OrderMap::default()), builder.links.as_deref().unwrap_or(&[]), self.instrumentation_library(), )); @@ -94,12 +94,6 @@ impl PreSampledTracer for Tracer { builder.sampling_result.as_ref().unwrap(), parent_trace_flags, ) - } else { - // has parent that is local - Some(( - parent_trace_flags, - parent_cx.span().span_context().trace_state().clone(), - )) } .unwrap_or_default(); @@ -125,18 +119,16 @@ fn current_trace_state( builder: &SpanBuilder, parent_cx: &OtelContext, provider: &TracerProvider, -) -> (bool, TraceId, bool, TraceFlags) { +) -> (TraceId, TraceFlags) { if parent_cx.has_active_span() { let span = parent_cx.span(); let sc = span.span_context(); - (false, sc.trace_id(), sc.is_remote(), sc.trace_flags()) + (sc.trace_id(), sc.trace_flags()) } else { ( - true, builder .trace_id .unwrap_or_else(|| provider.config().id_generator.new_trace_id()), - false, Default::default(), ) } diff --git a/tracing-opentelemetry/tests/metrics_publishing.rs b/tracing-opentelemetry/tests/metrics_publishing.rs index 9b8d8a6bab..c3a143987b 100644 --- a/tracing-opentelemetry/tests/metrics_publishing.rs +++ b/tracing-opentelemetry/tests/metrics_publishing.rs @@ -1,26 +1,21 @@ -#![cfg(feature = "metrics")] -use async_trait::async_trait; -use futures_util::{Stream, StreamExt as _}; use opentelemetry::{ - metrics::{Descriptor, InstrumentKind}, - metrics::{Number, NumberKind}, + metrics::MetricsError, sdk::{ - export::{ - metrics::{ - CheckpointSet, ExportKind, ExportKindFor, ExportKindSelector, - Exporter as MetricsExporter, Points, Sum, - }, - trace::{SpanData, SpanExporter}, + export::metrics::{ + aggregation::{self, Histogram, Sum, TemporalitySelector}, + InstrumentationLibraryReader, }, metrics::{ - aggregators::{ArrayAggregator, SumAggregator}, - selectors::simple::Selector, + aggregators::{HistogramAggregator, SumAggregator}, + controllers::BasicController, + processors, + sdk_api::{Descriptor, InstrumentKind, Number, NumberKind}, + selectors, }, }, - Key, Value, + Context, }; use std::cmp::Ordering; -use std::time::Duration; use tracing::Collect; use tracing_opentelemetry::MetricsSubscriber; use tracing_subscriber::prelude::*; @@ -30,7 +25,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; #[tokio::test] async fn u64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -40,11 +35,13 @@ async fn u64_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world = 1_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn u64_counter_is_exported_i64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world2".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -54,11 +51,13 @@ async fn u64_counter_is_exported_i64_at_instrumentation_point() { tracing::collect::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world2 = 1_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "float_hello_world".to_string(), InstrumentKind::Counter, NumberKind::F64, @@ -68,11 +67,13 @@ async fn f64_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(monotonic_counter.float_hello_world = 1.000000123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -82,11 +83,13 @@ async fn i64_up_down_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(counter.pebcak = -5_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak2".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -96,11 +99,13 @@ async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { tracing::collect::with_default(subscriber, || { tracing::info!(counter.pebcak2 = 5_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak_blah".to_string(), InstrumentKind::UpDownCounter, NumberKind::F64, @@ -110,13 +115,15 @@ async fn f64_up_down_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(counter.pebcak_blah = 99.123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn u64_value_is_exported() { - let subscriber = init_subscriber( +async fn u64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::U64, Number::from(9_u64), ); @@ -124,13 +131,15 @@ async fn u64_value_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(value.abcdefg = 9_u64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn i64_value_is_exported() { - let subscriber = init_subscriber( +async fn i64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_auenatsou".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::I64, Number::from(-19_i64), ); @@ -138,13 +147,15 @@ async fn i64_value_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(value.abcdefg_auenatsou = -19_i64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn f64_value_is_exported() { - let subscriber = init_subscriber( +async fn f64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_racecar".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::F64, Number::from(777.0012_f64), ); @@ -152,6 +163,8 @@ async fn f64_value_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(value.abcdefg_racecar = 777.0012_f64); }); + + exporter.export().unwrap(); } fn init_subscriber( @@ -159,24 +172,25 @@ fn init_subscriber( expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, -) -> impl Collect + 'static { +) -> (impl Collect + 'static, TestExporter) { + let controller = opentelemetry::sdk::metrics::controllers::basic(processors::factory( + selectors::simple::histogram(vec![-10.0, 100.0]), + aggregation::cumulative_temporality_selector(), + )) + .build(); + let exporter = TestExporter { expected_metric_name, expected_instrument_kind, expected_number_kind, expected_value, + controller: controller.clone(), }; - let push_controller = opentelemetry::sdk::metrics::controllers::push( - Selector::Exact, - ExportKindSelector::Stateless, + ( + tracing_subscriber::registry().with(MetricsSubscriber::new(controller)), exporter, - tokio::spawn, - delayed_interval, ) - .build(); - - tracing_subscriber::registry().with(MetricsSubscriber::new(push_controller)) } #[derive(Clone, Debug)] @@ -185,100 +199,84 @@ struct TestExporter { expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, + controller: BasicController, } -#[async_trait] -impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut _batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - Ok(()) - } -} +impl TestExporter { + fn export(&self) -> Result<(), MetricsError> { + self.controller.collect(&Context::current())?; + self.controller.try_for_each(&mut |library, reader| { + reader.try_for_each(self, &mut |record| { + assert_eq!(self.expected_metric_name, record.descriptor().name()); + assert_eq!( + self.expected_instrument_kind, + *record.descriptor().instrument_kind() + ); + assert_eq!( + self.expected_number_kind, + *record.descriptor().number_kind() + ); + match self.expected_instrument_kind { + InstrumentKind::Counter | InstrumentKind::UpDownCounter => { + let number = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .sum() + .unwrap(); + + assert_eq!( + Ordering::Equal, + number + .partial_cmp(&NumberKind::U64, &self.expected_value) + .unwrap() + ); + } + InstrumentKind::Histogram => { + let histogram = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .histogram() + .unwrap(); + + let counts = histogram.counts(); + if dbg!(self.expected_value.to_i64(&self.expected_number_kind)) > 100 { + assert_eq!(counts, &[0.0, 0.0, 1.0]); + } else if self.expected_value.to_i64(&self.expected_number_kind) > 0 { + assert_eq!(counts, &[0.0, 1.0, 0.0]); + } else { + assert_eq!(counts, &[1.0, 0.0, 0.0]); + } + } + _ => panic!( + "InstrumentKind {:?} not currently supported!", + self.expected_instrument_kind + ), + }; -impl MetricsExporter for TestExporter { - fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> opentelemetry::metrics::Result<()> { - checkpoint_set.try_for_each(self, &mut |record| { - assert_eq!(self.expected_metric_name, record.descriptor().name()); - assert_eq!( - self.expected_instrument_kind, - *record.descriptor().instrument_kind() - ); - assert_eq!( - self.expected_number_kind, - *record.descriptor().number_kind() - ); - let number = match self.expected_instrument_kind { - InstrumentKind::Counter | InstrumentKind::UpDownCounter => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .sum() - .unwrap(), - InstrumentKind::ValueRecorder => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .points() - .unwrap()[0] - .clone(), - _ => panic!( - "InstrumentKind {:?} not currently supported!", - self.expected_instrument_kind - ), - }; - assert_eq!( - Ordering::Equal, - number - .partial_cmp(&NumberKind::U64, &self.expected_value) - .unwrap() - ); - - // The following are the same regardless of the individual metric. - assert_eq!( - INSTRUMENTATION_LIBRARY_NAME, - record.descriptor().instrumentation_library().name - ); - assert_eq!( - CARGO_PKG_VERSION, - record.descriptor().instrumentation_version().unwrap() - ); - assert_eq!( - Value::String("unknown_service".into()), - record - .resource() - .get(Key::new("service.name".to_string())) - .unwrap() - ); - - opentelemetry::metrics::Result::Ok(()) + // The following are the same regardless of the individual metric. + assert_eq!(INSTRUMENTATION_LIBRARY_NAME, library.name); + assert_eq!(CARGO_PKG_VERSION, library.version.as_ref().unwrap()); + + Ok(()) + }) }) } } -impl ExportKindFor for TestExporter { - fn export_kind_for(&self, _descriptor: &Descriptor) -> ExportKind { +impl TemporalitySelector for TestExporter { + fn temporality_for( + &self, + _descriptor: &Descriptor, + _kind: &aggregation::AggregationKind, + ) -> aggregation::Temporality { // I don't think the value here makes a difference since // we are just testing a single metric. - ExportKind::Cumulative + aggregation::Temporality::Cumulative } } - -// From opentelemetry::sdk::util:: -// For some reason I can't pull it in from the other crate, it gives -// could not find `util` in `sdk` -/// Helper which wraps `tokio::time::interval` and makes it return a stream -fn tokio_interval_stream(period: std::time::Duration) -> tokio_stream::wrappers::IntervalStream { - tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(period)) -} - -// https://github.com/open-telemetry/opentelemetry-rust/blob/2585d109bf90d53d57c91e19c758dca8c36f5512/examples/basic-otlp/src/main.rs#L34-L37 -// Skip first immediate tick from tokio, not needed for async_std. -fn delayed_interval(duration: Duration) -> impl Stream { - tokio_interval_stream(duration).skip(0) -} diff --git a/tracing-opentelemetry/tests/trace_state_propagation.rs b/tracing-opentelemetry/tests/trace_state_propagation.rs index 5382411b50..bc80e1364b 100644 --- a/tracing-opentelemetry/tests/trace_state_propagation.rs +++ b/tracing-opentelemetry/tests/trace_state_propagation.rs @@ -1,8 +1,8 @@ -use async_trait::async_trait; +use futures_util::future::BoxFuture; use opentelemetry::{ propagation::TextMapPropagator, sdk::{ - export::trace::{SpanData, SpanExporter}, + export::trace::{ExportResult, SpanData, SpanExporter}, propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, trace::{Tracer, TracerProvider}, }, @@ -158,15 +158,14 @@ fn build_sampled_context() -> (Context, impl Collect, TestExporter, TracerProvid #[derive(Clone, Default, Debug)] struct TestExporter(Arc>>); -#[async_trait] impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - if let Ok(mut inner) = self.0.lock() { - inner.append(&mut batch); - } - Ok(()) + fn export(&mut self, mut batch: Vec) -> BoxFuture<'static, ExportResult> { + let spans = self.0.clone(); + Box::pin(async move { + if let Ok(mut inner) = spans.lock() { + inner.append(&mut batch); + } + Ok(()) + }) } }