Skip to content

Commit

Permalink
feat(common): add schema_url to resource. (#775)
Browse files Browse the repository at this point in the history
* feat(common): add schema_url to resource.

* doc(common): update links in doc.

* fix(common): typo

* fix(common): return Option for schema_url

* fix(common): use &str for better performance.

* make clippy happy
  • Loading branch information
TommyCpp committed Apr 17, 2022
1 parent 74d0c2d commit fdf6401
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 10 deletions.
17 changes: 12 additions & 5 deletions opentelemetry-otlp/src/transform/metrics.rs
Expand Up @@ -14,6 +14,7 @@ pub(crate) mod tonic {
ArrayAggregator, HistogramAggregator, LastValueAggregator, MinMaxSumCountAggregator,
SumAggregator,
};
use opentelemetry::sdk::InstrumentationLibrary;
use opentelemetry_proto::tonic::metrics::v1::DataPointFlags;
use opentelemetry_proto::tonic::FromNumber;
use opentelemetry_proto::tonic::{
Expand All @@ -28,7 +29,6 @@ pub(crate) mod tonic {

use crate::to_nanos;
use crate::transform::{CheckpointedMetrics, ResourceWrapper};
use opentelemetry::sdk::InstrumentationLibrary;
use std::collections::{BTreeMap, HashMap};

pub(crate) fn record_to_metric(
Expand Down Expand Up @@ -212,14 +212,21 @@ pub(crate) mod tonic {
resource_metrics: sink_map
.into_iter()
.map(|(resource, metric_map)| ResourceMetrics {
schema_url: resource
.schema_url()
.map(|s| s.to_string())
.unwrap_or_default(),
resource: Some(resource.into()),
schema_url: "".to_string(), // todo: replace with actual schema url.
instrumentation_library_metrics: metric_map
.into_iter()
.map(
|(instrumentation_library, metrics)| InstrumentationLibraryMetrics {
schema_url: instrumentation_library
.schema_url
.clone()
.unwrap_or_default()
.to_string(),
instrumentation_library: Some(instrumentation_library.into()),
schema_url: "".to_string(), // todo: replace with actual schema url.
metrics: metrics
.into_iter()
.map(|(_k, v)| v)
Expand Down Expand Up @@ -385,7 +392,7 @@ mod tests {
version: instrumentation_version.unwrap_or("").to_string(),
},
),
schema_url: "".to_string(), // todo: replace with actual schema url.
schema_url: "".to_string(),
metrics: metrics
.into_iter()
.map(|(name, data_points)| get_metric_with_name(name, data_points))
Expand All @@ -394,7 +401,7 @@ mod tests {
}
ResourceMetrics {
resource: Some(resource),
schema_url: "".to_string(), // todo: replace with actual schema url.
schema_url: "".to_string(),
instrumentation_library_metrics,
}
}
Expand Down
8 changes: 8 additions & 0 deletions opentelemetry-otlp/src/transform/resource.rs
Expand Up @@ -25,6 +25,14 @@ impl PartialOrd for ResourceWrapper {
}
}

impl ResourceWrapper {
#[cfg(all(feature = "grpc-tonic", feature = "metrics"))]
// it's currently only used by metrics. Trace set this in opentelemtry-proto
pub(crate) fn schema_url(&self) -> Option<&str> {
self.0.schema_url()
}
}

#[cfg(feature = "grpc-tonic")]
impl From<ResourceWrapper> for Resource {
fn from(resource: ResourceWrapper) -> Self {
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-proto/src/transform/traces.rs
Expand Up @@ -57,7 +57,10 @@ pub mod tonic {
.0,
dropped_attributes_count: 0,
}),
schema_url: "".to_string(), // todo: replace with actual schema url.
schema_url: source_span
.resource
.and_then(|resource| resource.schema_url().map(|url| url.to_string()))
.unwrap_or_default(),
instrumentation_library_spans: vec![InstrumentationLibrarySpans {
schema_url: source_span
.instrumentation_lib
Expand Down
99 changes: 95 additions & 4 deletions opentelemetry-sdk/src/resource/mod.rs
Expand Up @@ -30,6 +30,7 @@ pub use process::ProcessResourceDetector;
#[cfg(feature = "metrics")]
use opentelemetry_api::attributes;
use opentelemetry_api::{Key, KeyValue, Value};
use std::borrow::Cow;
use std::collections::{btree_map, BTreeMap};
use std::ops::Deref;
use std::time::Duration;
Expand All @@ -38,6 +39,7 @@ use std::time::Duration;
#[derive(Clone, Debug, PartialEq)]
pub struct Resource {
attrs: BTreeMap<Key, Value>,
schema_url: Option<Cow<'static, str>>,
}

impl Default for Resource {
Expand All @@ -54,6 +56,7 @@ impl Resource {
pub fn empty() -> Self {
Self {
attrs: Default::default(),
schema_url: None,
}
}

Expand All @@ -71,6 +74,24 @@ impl Resource {
resource
}

/// Create a new `Resource` from a key value pairs and [schema url].
///
/// Values are de-duplicated by key, and the first key-value pair with a non-empty string value
/// will be retained.
///
/// schema_url must be a valid URL using HTTP or HTTPS protocol.
///
/// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn from_schema_url<KV, S>(kvs: KV, schema_url: S) -> Self
where
KV: IntoIterator<Item = KeyValue>,
S: Into<Cow<'static, str>>,
{
let mut resource = Self::new(kvs);
resource.schema_url = Some(schema_url.into());
resource
}

/// Create a new `Resource` from resource detectors.
///
/// timeout will be applied to each detector.
Expand All @@ -89,8 +110,19 @@ impl Resource {

/// Create a new `Resource` by combining two resources.
///
/// ### Key value pairs
/// Keys from the `other` resource have priority over keys from this resource, even if the
/// updated value is empty.
///
/// ### [Schema url]
/// If both of the resource are not empty. Schema url is determined by the following rules, in order:
/// 1. If this resource has a schema url, it will be used.
/// 2. If this resource does not have a schema url, and the other resource has a schema url, it will be used.
/// 3. If both resources have a schema url and it's the same, it will be used.
/// 4. If both resources have a schema url and it's different, the schema url will be empty.
/// 5. If both resources do not have a schema url, the schema url will be empty.
///
/// [Schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn merge<T: Deref<Target = Self>>(&self, other: T) -> Self {
if self.attrs.is_empty() {
return other.clone();
Expand All @@ -109,9 +141,31 @@ impl Resource {
resource.attrs.insert(k.clone(), v.clone());
}

if self.schema_url == other.schema_url {
resource.schema_url = self.schema_url.clone();
} else if self.schema_url.is_none() {
// if the other resource has schema url, use it.
if other.schema_url.is_some() {
resource.schema_url = other.schema_url.clone();
}
// else empty schema url.
} else {
// if self has schema url, use it.
if other.schema_url.is_none() {
resource.schema_url = self.schema_url.clone();
}
}

resource
}

/// Return the [schema url] of the resource. If the resource does not have a schema url, return `None`.
///
/// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn schema_url(&self) -> Option<&str> {
self.schema_url.as_ref().map(|s| s.as_ref())
}

/// Returns the number of attributes for this resource
pub fn len(&self) -> usize {
self.attrs.len()
Expand Down Expand Up @@ -215,13 +269,14 @@ mod tests {
assert_eq!(
Resource::new(args_with_dupe_keys),
Resource {
attrs: expected_attrs
attrs: expected_attrs,
schema_url: None,
}
);
}

#[test]
fn merge_resource() {
fn merge_resource_key_value_pairs() {
let resource_a = Resource::new(vec![
KeyValue::new("a", ""),
KeyValue::new("b", "b-value"),
Expand All @@ -243,11 +298,47 @@ mod tests {
assert_eq!(
resource_a.merge(&resource_b),
Resource {
attrs: expected_attrs
attrs: expected_attrs,
schema_url: None,
}
);
}

#[test]
fn merge_resource_schema_url() {
// if both resources contains key value pairs
let test_cases = vec![
(Some("http://schema/a"), None, Some("http://schema/a")),
(Some("http://schema/a"), Some("http://schema/b"), None),
(None, Some("http://schema/b"), Some("http://schema/b")),
(
Some("http://schema/a"),
Some("http://schema/a"),
Some("http://schema/a"),
),
(None, None, None),
];

for (schema_url, other_schema_url, expect_schema_url) in test_cases.into_iter() {
let mut resource = Resource::new(vec![KeyValue::new("key", "")]);
resource.schema_url = schema_url.map(Into::into);

let mut other_resource = Resource::new(vec![KeyValue::new("key", "")]);
other_resource.schema_url = other_schema_url.map(Into::into);

assert_eq!(
resource.merge(&other_resource).schema_url,
expect_schema_url.map(Into::into)
);
}

// if only one resource contains key value pairs
let resource = Resource::from_schema_url(vec![], "http://schema/a");
let other_resource = Resource::new(vec![KeyValue::new("key", "")]);

assert_eq!(resource.merge(&other_resource).schema_url, None);
}

#[test]
fn detect_resource() {
env::set_var("OTEL_RESOURCE_ATTRIBUTES", "key=value, k = v , a= x, a=z");
Expand All @@ -262,7 +353,7 @@ mod tests {
KeyValue::new("key", "value"),
KeyValue::new("k", "v"),
KeyValue::new("a", "x"),
KeyValue::new("a", "z")
KeyValue::new("a", "z"),
])
)
}
Expand Down

0 comments on commit fdf6401

Please sign in to comment.