diff --git a/opentelemetry-otlp/src/transform/metrics.rs b/opentelemetry-otlp/src/transform/metrics.rs index e0aaa418b0..1befe5a520 100644 --- a/opentelemetry-otlp/src/transform/metrics.rs +++ b/opentelemetry-otlp/src/transform/metrics.rs @@ -14,6 +14,7 @@ pub(crate) mod tonic { ArrayAggregator, HistogramAggregator, LastValueAggregator, MinMaxSumCountAggregator, SumAggregator, }; + use opentelemetry::sdk::InstrumentationLibrary; use opentelemetry_proto::tonic::metrics::v1::DataPointFlags; use opentelemetry_proto::tonic::FromNumber; use opentelemetry_proto::tonic::{ @@ -28,7 +29,6 @@ pub(crate) mod tonic { use crate::to_nanos; use crate::transform::{CheckpointedMetrics, ResourceWrapper}; - use opentelemetry::sdk::InstrumentationLibrary; use std::collections::{BTreeMap, HashMap}; pub(crate) fn record_to_metric( @@ -212,14 +212,21 @@ pub(crate) mod tonic { resource_metrics: sink_map .into_iter() .map(|(resource, metric_map)| ResourceMetrics { + schema_url: resource + .schema_url() + .map(|s| s.to_string()) + .unwrap_or_default(), resource: Some(resource.into()), - schema_url: "".to_string(), // todo: replace with actual schema url. instrumentation_library_metrics: metric_map .into_iter() .map( |(instrumentation_library, metrics)| InstrumentationLibraryMetrics { + schema_url: instrumentation_library + .schema_url + .clone() + .unwrap_or_default() + .to_string(), instrumentation_library: Some(instrumentation_library.into()), - schema_url: "".to_string(), // todo: replace with actual schema url. metrics: metrics .into_iter() .map(|(_k, v)| v) @@ -385,7 +392,7 @@ mod tests { version: instrumentation_version.unwrap_or("").to_string(), }, ), - schema_url: "".to_string(), // todo: replace with actual schema url. + schema_url: "".to_string(), metrics: metrics .into_iter() .map(|(name, data_points)| get_metric_with_name(name, data_points)) @@ -394,7 +401,7 @@ mod tests { } ResourceMetrics { resource: Some(resource), - schema_url: "".to_string(), // todo: replace with actual schema url. + schema_url: "".to_string(), instrumentation_library_metrics, } } diff --git a/opentelemetry-otlp/src/transform/resource.rs b/opentelemetry-otlp/src/transform/resource.rs index 013ee840e0..d83816eb2e 100644 --- a/opentelemetry-otlp/src/transform/resource.rs +++ b/opentelemetry-otlp/src/transform/resource.rs @@ -25,6 +25,14 @@ impl PartialOrd for ResourceWrapper { } } +impl ResourceWrapper { + #[cfg(all(feature = "grpc-tonic", feature = "metrics"))] + // it's currently only used by metrics. Trace set this in opentelemtry-proto + pub(crate) fn schema_url(&self) -> Option<&str> { + self.0.schema_url() + } +} + #[cfg(feature = "grpc-tonic")] impl From for Resource { fn from(resource: ResourceWrapper) -> Self { diff --git a/opentelemetry-proto/src/transform/traces.rs b/opentelemetry-proto/src/transform/traces.rs index 57a9c034bf..1a4634ffb2 100644 --- a/opentelemetry-proto/src/transform/traces.rs +++ b/opentelemetry-proto/src/transform/traces.rs @@ -57,7 +57,10 @@ pub mod tonic { .0, dropped_attributes_count: 0, }), - schema_url: "".to_string(), // todo: replace with actual schema url. + schema_url: source_span + .resource + .and_then(|resource| resource.schema_url().map(|url| url.to_string())) + .unwrap_or_default(), instrumentation_library_spans: vec![InstrumentationLibrarySpans { schema_url: source_span .instrumentation_lib diff --git a/opentelemetry-sdk/src/resource/mod.rs b/opentelemetry-sdk/src/resource/mod.rs index 48e1b0ee50..ccbf4dbca5 100644 --- a/opentelemetry-sdk/src/resource/mod.rs +++ b/opentelemetry-sdk/src/resource/mod.rs @@ -30,6 +30,7 @@ pub use process::ProcessResourceDetector; #[cfg(feature = "metrics")] use opentelemetry_api::attributes; use opentelemetry_api::{Key, KeyValue, Value}; +use std::borrow::Cow; use std::collections::{btree_map, BTreeMap}; use std::ops::Deref; use std::time::Duration; @@ -38,6 +39,7 @@ use std::time::Duration; #[derive(Clone, Debug, PartialEq)] pub struct Resource { attrs: BTreeMap, + schema_url: Option>, } impl Default for Resource { @@ -54,6 +56,7 @@ impl Resource { pub fn empty() -> Self { Self { attrs: Default::default(), + schema_url: None, } } @@ -71,6 +74,24 @@ impl Resource { resource } + /// Create a new `Resource` from a key value pairs and [schema url]. + /// + /// Values are de-duplicated by key, and the first key-value pair with a non-empty string value + /// will be retained. + /// + /// schema_url must be a valid URL using HTTP or HTTPS protocol. + /// + /// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url + pub fn from_schema_url(kvs: KV, schema_url: S) -> Self + where + KV: IntoIterator, + S: Into>, + { + let mut resource = Self::new(kvs); + resource.schema_url = Some(schema_url.into()); + resource + } + /// Create a new `Resource` from resource detectors. /// /// timeout will be applied to each detector. @@ -89,8 +110,19 @@ impl Resource { /// Create a new `Resource` by combining two resources. /// + /// ### Key value pairs /// Keys from the `other` resource have priority over keys from this resource, even if the /// updated value is empty. + /// + /// ### [Schema url] + /// If both of the resource are not empty. Schema url is determined by the following rules, in order: + /// 1. If this resource has a schema url, it will be used. + /// 2. If this resource does not have a schema url, and the other resource has a schema url, it will be used. + /// 3. If both resources have a schema url and it's the same, it will be used. + /// 4. If both resources have a schema url and it's different, the schema url will be empty. + /// 5. If both resources do not have a schema url, the schema url will be empty. + /// + /// [Schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url pub fn merge>(&self, other: T) -> Self { if self.attrs.is_empty() { return other.clone(); @@ -109,9 +141,31 @@ impl Resource { resource.attrs.insert(k.clone(), v.clone()); } + if self.schema_url == other.schema_url { + resource.schema_url = self.schema_url.clone(); + } else if self.schema_url.is_none() { + // if the other resource has schema url, use it. + if other.schema_url.is_some() { + resource.schema_url = other.schema_url.clone(); + } + // else empty schema url. + } else { + // if self has schema url, use it. + if other.schema_url.is_none() { + resource.schema_url = self.schema_url.clone(); + } + } + resource } + /// Return the [schema url] of the resource. If the resource does not have a schema url, return `None`. + /// + /// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url + pub fn schema_url(&self) -> Option<&str> { + self.schema_url.as_ref().map(|s| s.as_ref()) + } + /// Returns the number of attributes for this resource pub fn len(&self) -> usize { self.attrs.len() @@ -215,13 +269,14 @@ mod tests { assert_eq!( Resource::new(args_with_dupe_keys), Resource { - attrs: expected_attrs + attrs: expected_attrs, + schema_url: None, } ); } #[test] - fn merge_resource() { + fn merge_resource_key_value_pairs() { let resource_a = Resource::new(vec![ KeyValue::new("a", ""), KeyValue::new("b", "b-value"), @@ -243,11 +298,47 @@ mod tests { assert_eq!( resource_a.merge(&resource_b), Resource { - attrs: expected_attrs + attrs: expected_attrs, + schema_url: None, } ); } + #[test] + fn merge_resource_schema_url() { + // if both resources contains key value pairs + let test_cases = vec![ + (Some("http://schema/a"), None, Some("http://schema/a")), + (Some("http://schema/a"), Some("http://schema/b"), None), + (None, Some("http://schema/b"), Some("http://schema/b")), + ( + Some("http://schema/a"), + Some("http://schema/a"), + Some("http://schema/a"), + ), + (None, None, None), + ]; + + for (schema_url, other_schema_url, expect_schema_url) in test_cases.into_iter() { + let mut resource = Resource::new(vec![KeyValue::new("key", "")]); + resource.schema_url = schema_url.map(Into::into); + + let mut other_resource = Resource::new(vec![KeyValue::new("key", "")]); + other_resource.schema_url = other_schema_url.map(Into::into); + + assert_eq!( + resource.merge(&other_resource).schema_url, + expect_schema_url.map(Into::into) + ); + } + + // if only one resource contains key value pairs + let resource = Resource::from_schema_url(vec![], "http://schema/a"); + let other_resource = Resource::new(vec![KeyValue::new("key", "")]); + + assert_eq!(resource.merge(&other_resource).schema_url, None); + } + #[test] fn detect_resource() { env::set_var("OTEL_RESOURCE_ATTRIBUTES", "key=value, k = v , a= x, a=z"); @@ -262,7 +353,7 @@ mod tests { KeyValue::new("key", "value"), KeyValue::new("k", "v"), KeyValue::new("a", "x"), - KeyValue::new("a", "z") + KeyValue::new("a", "z"), ]) ) }