Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(common): add schema_url to resource. #775

Merged
merged 7 commits into from Apr 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 12 additions & 5 deletions opentelemetry-otlp/src/transform/metrics.rs
Expand Up @@ -14,6 +14,7 @@ pub(crate) mod tonic {
ArrayAggregator, HistogramAggregator, LastValueAggregator, MinMaxSumCountAggregator,
SumAggregator,
};
use opentelemetry::sdk::InstrumentationLibrary;
use opentelemetry_proto::tonic::metrics::v1::DataPointFlags;
use opentelemetry_proto::tonic::FromNumber;
use opentelemetry_proto::tonic::{
Expand All @@ -28,7 +29,6 @@ pub(crate) mod tonic {

use crate::to_nanos;
use crate::transform::{CheckpointedMetrics, ResourceWrapper};
use opentelemetry::sdk::InstrumentationLibrary;
use std::collections::{BTreeMap, HashMap};

pub(crate) fn record_to_metric(
Expand Down Expand Up @@ -212,14 +212,21 @@ pub(crate) mod tonic {
resource_metrics: sink_map
.into_iter()
.map(|(resource, metric_map)| ResourceMetrics {
schema_url: resource
.schema_url()
.map(|s| s.to_string())
.unwrap_or_default(),
resource: Some(resource.into()),
schema_url: "".to_string(), // todo: replace with actual schema url.
instrumentation_library_metrics: metric_map
.into_iter()
.map(
|(instrumentation_library, metrics)| InstrumentationLibraryMetrics {
schema_url: instrumentation_library
.schema_url
.clone()
.unwrap_or_default()
.to_string(),
instrumentation_library: Some(instrumentation_library.into()),
schema_url: "".to_string(), // todo: replace with actual schema url.
metrics: metrics
.into_iter()
.map(|(_k, v)| v)
Expand Down Expand Up @@ -385,7 +392,7 @@ mod tests {
version: instrumentation_version.unwrap_or("").to_string(),
},
),
schema_url: "".to_string(), // todo: replace with actual schema url.
schema_url: "".to_string(),
metrics: metrics
.into_iter()
.map(|(name, data_points)| get_metric_with_name(name, data_points))
Expand All @@ -394,7 +401,7 @@ mod tests {
}
ResourceMetrics {
resource: Some(resource),
schema_url: "".to_string(), // todo: replace with actual schema url.
schema_url: "".to_string(),
instrumentation_library_metrics,
}
}
Expand Down
8 changes: 8 additions & 0 deletions opentelemetry-otlp/src/transform/resource.rs
Expand Up @@ -25,6 +25,14 @@ impl PartialOrd for ResourceWrapper {
}
}

impl ResourceWrapper {
#[cfg(all(feature = "grpc-tonic", feature = "metrics"))]
// it's currently only used by metrics. Trace set this in opentelemtry-proto
pub(crate) fn schema_url(&self) -> Option<&str> {
self.0.schema_url()
}
}

#[cfg(feature = "grpc-tonic")]
impl From<ResourceWrapper> for Resource {
fn from(resource: ResourceWrapper) -> Self {
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-proto/src/transform/traces.rs
Expand Up @@ -57,7 +57,10 @@ pub mod tonic {
.0,
dropped_attributes_count: 0,
}),
schema_url: "".to_string(), // todo: replace with actual schema url.
schema_url: source_span
.resource
.and_then(|resource| resource.schema_url().map(|url| url.to_string()))
.unwrap_or_default(),
instrumentation_library_spans: vec![InstrumentationLibrarySpans {
schema_url: source_span
.instrumentation_lib
Expand Down
99 changes: 95 additions & 4 deletions opentelemetry-sdk/src/resource/mod.rs
Expand Up @@ -30,6 +30,7 @@ pub use process::ProcessResourceDetector;
#[cfg(feature = "metrics")]
use opentelemetry_api::attributes;
use opentelemetry_api::{Key, KeyValue, Value};
use std::borrow::Cow;
use std::collections::{btree_map, BTreeMap};
use std::ops::Deref;
use std::time::Duration;
Expand All @@ -38,6 +39,7 @@ use std::time::Duration;
#[derive(Clone, Debug, PartialEq)]
pub struct Resource {
attrs: BTreeMap<Key, Value>,
schema_url: Option<Cow<'static, str>>,
}

impl Default for Resource {
Expand All @@ -54,6 +56,7 @@ impl Resource {
pub fn empty() -> Self {
Self {
attrs: Default::default(),
schema_url: None,
}
}

Expand All @@ -71,6 +74,24 @@ impl Resource {
resource
}

/// Create a new `Resource` from a key value pairs and [schema url].
///
/// Values are de-duplicated by key, and the first key-value pair with a non-empty string value
/// will be retained.
///
/// schema_url must be a valid URL using HTTP or HTTPS protocol.
///
/// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn from_schema_url<KV, S>(kvs: KV, schema_url: S) -> Self
where
KV: IntoIterator<Item = KeyValue>,
S: Into<Cow<'static, str>>,
{
let mut resource = Self::new(kvs);
resource.schema_url = Some(schema_url.into());
resource
}

/// Create a new `Resource` from resource detectors.
///
/// timeout will be applied to each detector.
Expand All @@ -89,8 +110,19 @@ impl Resource {

/// Create a new `Resource` by combining two resources.
///
/// ### Key value pairs
/// Keys from the `other` resource have priority over keys from this resource, even if the
/// updated value is empty.
///
/// ### [Schema url]
/// If both of the resource are not empty. Schema url is determined by the following rules, in order:
/// 1. If this resource has a schema url, it will be used.
/// 2. If this resource does not have a schema url, and the other resource has a schema url, it will be used.
/// 3. If both resources have a schema url and it's the same, it will be used.
/// 4. If both resources have a schema url and it's different, the schema url will be empty.
/// 5. If both resources do not have a schema url, the schema url will be empty.
///
/// [Schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn merge<T: Deref<Target = Self>>(&self, other: T) -> Self {
if self.attrs.is_empty() {
return other.clone();
Expand All @@ -109,9 +141,31 @@ impl Resource {
resource.attrs.insert(k.clone(), v.clone());
}

if self.schema_url == other.schema_url {
resource.schema_url = self.schema_url.clone();
} else if self.schema_url.is_none() {
// if the other resource has schema url, use it.
if other.schema_url.is_some() {
resource.schema_url = other.schema_url.clone();
}
// else empty schema url.
} else {
// if self has schema url, use it.
if other.schema_url.is_none() {
resource.schema_url = self.schema_url.clone();
}
}

resource
}

/// Return the [schema url] of the resource. If the resource does not have a schema url, return `None`.
///
/// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn schema_url(&self) -> Option<&str> {
self.schema_url.as_ref().map(|s| s.as_ref())
}

/// Returns the number of attributes for this resource
pub fn len(&self) -> usize {
self.attrs.len()
Expand Down Expand Up @@ -215,13 +269,14 @@ mod tests {
assert_eq!(
Resource::new(args_with_dupe_keys),
Resource {
attrs: expected_attrs
attrs: expected_attrs,
schema_url: None,
}
);
}

#[test]
fn merge_resource() {
fn merge_resource_key_value_pairs() {
let resource_a = Resource::new(vec![
KeyValue::new("a", ""),
KeyValue::new("b", "b-value"),
Expand All @@ -243,11 +298,47 @@ mod tests {
assert_eq!(
resource_a.merge(&resource_b),
Resource {
attrs: expected_attrs
attrs: expected_attrs,
schema_url: None,
}
);
}

#[test]
fn merge_resource_schema_url() {
// if both resources contains key value pairs
let test_cases = vec![
(Some("http://schema/a"), None, Some("http://schema/a")),
(Some("http://schema/a"), Some("http://schema/b"), None),
(None, Some("http://schema/b"), Some("http://schema/b")),
(
Some("http://schema/a"),
Some("http://schema/a"),
Some("http://schema/a"),
),
(None, None, None),
];

for (schema_url, other_schema_url, expect_schema_url) in test_cases.into_iter() {
let mut resource = Resource::new(vec![KeyValue::new("key", "")]);
resource.schema_url = schema_url.map(Into::into);

let mut other_resource = Resource::new(vec![KeyValue::new("key", "")]);
other_resource.schema_url = other_schema_url.map(Into::into);

assert_eq!(
resource.merge(&other_resource).schema_url,
expect_schema_url.map(Into::into)
);
}

// if only one resource contains key value pairs
let resource = Resource::from_schema_url(vec![], "http://schema/a");
let other_resource = Resource::new(vec![KeyValue::new("key", "")]);

assert_eq!(resource.merge(&other_resource).schema_url, None);
}

#[test]
fn detect_resource() {
env::set_var("OTEL_RESOURCE_ATTRIBUTES", "key=value, k = v , a= x, a=z");
Expand All @@ -262,7 +353,7 @@ mod tests {
KeyValue::new("key", "value"),
KeyValue::new("k", "v"),
KeyValue::new("a", "x"),
KeyValue::new("a", "z")
KeyValue::new("a", "z"),
])
)
}
Expand Down