Skip to content

Commit

Permalink
Switch to static resource references
Browse files Browse the repository at this point in the history
The spec suggests all spans should have an associated `Resource`. This
change switches trace config and span data from `Option<Arc<Resource>>`
to `Cow<'static, Resource>` and removes `Config::with_no_resource` to
accommodate this requirement.
  • Loading branch information
jtescher committed May 1, 2022
1 parent 90a70ee commit 90c28b0
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 185 deletions.
15 changes: 7 additions & 8 deletions opentelemetry-datadog/src/exporter/mod.rs
Expand Up @@ -5,6 +5,7 @@ pub use model::ApiVersion;
pub use model::Error;
pub use model::FieldMappingFn;

use std::borrow::Cow;
use std::fmt::{Debug, Formatter};

use crate::exporter::model::FieldMapping;
Expand Down Expand Up @@ -165,18 +166,16 @@ impl DatadogPipelineBuilder {
let service_name = self.service_name.take();
if let Some(service_name) = service_name {
let config = if let Some(mut cfg) = self.trace_config.take() {
cfg.resource = cfg.resource.map(|r| {
let without_service_name = r
cfg.resource = Cow::Owned(Resource::new(
cfg.resource
.iter()
.filter(|(k, _v)| **k != semcov::resource::SERVICE_NAME)
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<KeyValue>>();
Arc::new(Resource::new(without_service_name))
});
.map(|(k, v)| KeyValue::new(k.clone(), v.clone())),
));
cfg
} else {
Config {
resource: Some(Arc::new(Resource::empty())),
resource: Cow::Owned(Resource::empty()),
..Default::default()
}
};
Expand All @@ -190,7 +189,7 @@ impl DatadogPipelineBuilder {
(
Config {
// use a empty resource to prevent TracerProvider to assign a service name.
resource: Some(Arc::new(Resource::empty())),
resource: Cow::Owned(Resource::empty()),
..Default::default()
},
service_name,
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-datadog/src/exporter/model/mod.rs
Expand Up @@ -179,12 +179,13 @@ impl ApiVersion {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use opentelemetry::sdk;
use opentelemetry::sdk::InstrumentationLibrary;
use opentelemetry::sdk::{self, Resource};
use opentelemetry::{
trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
Key,
};
use std::borrow::Cow;
use std::time::{Duration, SystemTime};

fn get_traces() -> Vec<Vec<trace::SpanData>> {
Expand Down Expand Up @@ -221,7 +222,7 @@ pub(crate) mod tests {
events,
links,
status: Status::Ok,
resource: None,
resource: Cow::Owned(Resource::empty()),
instrumentation_lib: InstrumentationLibrary::new("component", None, None),
}
}
Expand Down
6 changes: 0 additions & 6 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Expand Up @@ -232,7 +232,6 @@ impl AgentPipeline {
let mut builder = sdk::trace::TracerProvider::builder();

let (config, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand Down Expand Up @@ -270,7 +269,6 @@ impl AgentPipeline {
// build sdk trace config and jaeger process.
// some attributes like service name has attributes like service name
let (config, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand Down Expand Up @@ -312,12 +310,10 @@ impl AgentPipeline {
where
R: JaegerTraceRuntime,
{
let builder = sdk::trace::TracerProvider::builder();
let export_instrument_library = self.transformation_config.export_instrument_library;
// build sdk trace config and jaeger process.
// some attributes like service name has attributes like service name
let (_, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand All @@ -331,9 +327,7 @@ impl AgentPipeline {

/// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime.
pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
let builder = sdk::trace::TracerProvider::builder();
let (_, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand Down
Expand Up @@ -179,18 +179,14 @@ mod collector_client_tests {
use crate::exporter::thrift::jaeger::Batch;
use crate::new_collector_pipeline;
use opentelemetry::runtime::Tokio;
use opentelemetry::sdk::Resource;
use opentelemetry::trace::TraceError;
use opentelemetry::KeyValue;

#[test]
fn test_bring_your_own_client() -> Result<(), TraceError> {
let invalid_uri_builder = new_collector_pipeline()
.with_endpoint("localhost:6831")
.with_http_client(test_http_client::TestHttpClient);
let sdk_provided_resource =
Resource::new(vec![KeyValue::new("service.name", "unknown_service")]);
let (_, process) = build_config_and_process(sdk_provided_resource, None, None);
let (_, process) = build_config_and_process(None, None);
let mut uploader = invalid_uri_builder.build_uploader::<Tokio>()?;
let res = futures_executor::block_on(async {
uploader
Expand Down
1 change: 0 additions & 1 deletion opentelemetry-jaeger/src/exporter/config/collector/mod.rs
Expand Up @@ -407,7 +407,6 @@ impl CollectorPipeline {
// some attributes like service name has attributes like service name
let export_instrument_library = self.transformation_config.export_instrument_library;
let (config, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand Down
66 changes: 11 additions & 55 deletions opentelemetry-jaeger/src/exporter/config/mod.rs
Expand Up @@ -10,12 +10,9 @@
//! [jaeger deployment guide]: https://www.jaegertracing.io/docs/1.31/deployment

use crate::Process;
use opentelemetry::sdk::trace::Config;
use opentelemetry::sdk::Resource;
use opentelemetry::trace::{TraceError, TracerProvider};
use opentelemetry::{global, sdk, KeyValue};
use opentelemetry_semantic_conventions as semcov;
use std::sync::Arc;

/// Config a exporter that sends the spans to a [jaeger agent](https://www.jaegertracing.io/docs/1.31/deployment/#agent).
pub mod agent;
Expand Down Expand Up @@ -54,35 +51,25 @@ trait HasRequiredConfig {
// There are multiple ways to set the service name. A `service.name` tag will be always added
// to the process tags.
fn build_config_and_process(
sdk_resource: sdk::Resource,
mut config: Option<sdk::trace::Config>,
config: Option<sdk::trace::Config>,
service_name_opt: Option<String>,
) -> (sdk::trace::Config, Process) {
let (config, resource) = if let Some(mut config) = config.take() {
let resource = if let Some(resource) = config.resource.replace(Arc::new(Resource::empty()))
{
sdk_resource.merge(resource)
} else {
sdk_resource
};

(config, resource)
} else {
(Config::default(), sdk_resource)
};
let config = config.unwrap_or_default();

let service_name = service_name_opt.unwrap_or_else(|| {
resource
config
.resource
.get(semcov::resource::SERVICE_NAME)
.map(|v| v.to_string())
.unwrap_or_else(|| "unknown_service".to_string())
});

// merge the tags and resource. Resources take priority.
let mut tags = resource
.into_iter()
.filter(|(key, _)| *key != semcov::resource::SERVICE_NAME)
.map(|(key, value)| KeyValue::new(key, value))
let mut tags = config
.resource
.iter()
.filter(|(key, _)| **key != semcov::resource::SERVICE_NAME)
.map(|(key, value)| KeyValue::new(key.clone(), value.clone()))
.collect::<Vec<KeyValue>>();

tags.push(KeyValue::new(
Expand All @@ -101,51 +88,20 @@ mod tests {
use opentelemetry::sdk::Resource;
use opentelemetry::KeyValue;
use std::env;
use std::sync::Arc;

#[test]
fn test_set_service_name() {
let service_name = "halloween_service".to_string();

// set via builder's service name, it has highest priority
let (_, process) =
build_config_and_process(Resource::empty(), None, Some(service_name.clone()));
let (_, process) = build_config_and_process(None, Some(service_name.clone()));
assert_eq!(process.service_name, service_name);

// make sure the tags in resource are moved to process
let trace_config = Config::default()
.with_resource(Resource::new(vec![KeyValue::new("test-key", "test-value")]));
let (config, process) =
build_config_and_process(Resource::empty(), Some(trace_config), Some(service_name));
assert_eq!(config.resource, Some(Arc::new(Resource::empty())));
let (_, process) = build_config_and_process(Some(trace_config), Some(service_name));
assert_eq!(process.tags.len(), 2);

// sdk provided resource can override service name if users didn't provided service name to builder
let (_, process) = build_config_and_process(
Resource::new(vec![KeyValue::new("service.name", "halloween_service")]),
None,
None,
);
assert_eq!(process.service_name, "halloween_service");

// users can also provided service.name from config's resource, in this case, it will override the
// sdk provided service name
let trace_config = Config::default().with_resource(Resource::new(vec![KeyValue::new(
"service.name",
"override_service",
)]));
let (_, process) = build_config_and_process(
Resource::new(vec![KeyValue::new("service.name", "halloween_service")]),
Some(trace_config),
None,
);

assert_eq!(process.service_name, "override_service");
assert_eq!(process.tags.len(), 1);
assert_eq!(
process.tags[0],
KeyValue::new("service.name", "override_service")
);
}

#[test]
Expand Down
11 changes: 10 additions & 1 deletion opentelemetry-otlp/src/transform/metrics.rs
Expand Up @@ -417,7 +417,16 @@ mod tests {
// If we changed the sink function to process the input in parallel, we will have to sort other vectors
// like data points in Metrics.
fn assert_resource_metrics(mut expect: ResourceMetrics, mut actual: ResourceMetrics) {
assert_eq!(expect.resource, actual.resource);
assert_eq!(
expect
.resource
.as_mut()
.map(|r| r.attributes.sort_by_key(|kv| kv.key.to_string())),
actual
.resource
.as_mut()
.map(|r| r.attributes.sort_by_key(|kv| kv.key.to_string()))
);
assert_eq!(
expect.instrumentation_library_metrics.len(),
actual.instrumentation_library_metrics.len()
Expand Down
36 changes: 12 additions & 24 deletions opentelemetry-proto/src/transform/traces.rs
Expand Up @@ -51,15 +51,13 @@ pub mod tonic {
let span_kind: span::SpanKind = source_span.span_kind.into();
ResourceSpans {
resource: Some(Resource {
attributes: resource_attributes(
source_span.resource.as_ref().map(AsRef::as_ref),
)
.0,
attributes: resource_attributes(&source_span.resource).0,
dropped_attributes_count: 0,
}),
schema_url: source_span
.resource
.and_then(|resource| resource.schema_url().map(|url| url.to_string()))
.schema_url()
.map(|url| url.to_string())
.unwrap_or_default(),
instrumentation_library_spans: vec![InstrumentationLibrarySpans {
schema_url: source_span
Expand Down Expand Up @@ -112,14 +110,11 @@ pub mod tonic {
}
}

fn resource_attributes(resource: Option<&sdk::Resource>) -> Attributes {
fn resource_attributes(resource: &sdk::Resource) -> Attributes {
resource
.map(|res| {
res.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
})
.unwrap_or_default()
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
.into()
}
}
Expand Down Expand Up @@ -175,10 +170,7 @@ pub mod grpcio {
fn from(source_span: SpanData) -> Self {
ResourceSpans {
resource: SingularPtrField::from(Some(Resource {
attributes: resource_attributes(
source_span.resource.as_ref().map(AsRef::as_ref),
)
.0,
attributes: resource_attributes(&source_span.resource).0,
dropped_attributes_count: 0,
..Default::default()
})),
Expand Down Expand Up @@ -246,15 +238,11 @@ pub mod grpcio {
}
}

fn resource_attributes(resource: Option<&sdk::Resource>) -> Attributes {
fn resource_attributes(resource: &sdk::Resource) -> Attributes {
resource
.map(|resource| {
resource
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
})
.unwrap_or_default()
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
.into()
}
}
5 changes: 3 additions & 2 deletions opentelemetry-sdk/Cargo.toml
Expand Up @@ -4,23 +4,24 @@ version = "0.1.0"
edition = "2018"

[dependencies]
opentelemetry-api = { version = "0.1", path = "../opentelemetry-api/" }
async-std = { version = "1.6", features = ["unstable"], optional = true }
async-trait = { version = "0.1", optional = true }
crossbeam-channel = { version = "0.5", optional = true }
dashmap = { version = "4.0.1", optional = true }
fnv = { version = "1.0", optional = true }
futures-channel = "0.3"
futures-executor = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
lazy_static = "1.4"
once_cell = "1.10"
opentelemetry-api = { version = "0.1", path = "../opentelemetry-api/" }
percent-encoding = { version = "2.0", optional = true }
pin-project = { version = "1.0.2", optional = true }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
serde = { version = "1.0", features = ["derive", "rc"], optional = true }
thiserror = "1"
tokio = { version = "1.0", default-features = false, features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", optional = true }
crossbeam-channel = { version = "0.5", optional = true }

[package.metadata.docs.rs]
all-features = true
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/export/trace/mod.rs
@@ -1,9 +1,9 @@
//! Trace exporters
use crate::Resource;
use async_trait::async_trait;
use opentelemetry_api::trace::{Event, Link, SpanContext, SpanId, SpanKind, Status, TraceError};
use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;

pub mod stdout;
Expand Down Expand Up @@ -73,7 +73,7 @@ pub struct SpanData {
/// Span status
pub status: Status,
/// Resource contains attributes representing an entity that produced this span.
pub resource: Option<Arc<crate::Resource>>,
pub resource: Cow<'static, Resource>,
/// Instrumentation library that produced this span
pub instrumentation_lib: crate::InstrumentationLibrary,
}

0 comments on commit 90c28b0

Please sign in to comment.