Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to static resource references #790

Merged
merged 1 commit into from May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 7 additions & 8 deletions opentelemetry-datadog/src/exporter/mod.rs
Expand Up @@ -5,6 +5,7 @@ pub use model::ApiVersion;
pub use model::Error;
pub use model::FieldMappingFn;

use std::borrow::Cow;
use std::fmt::{Debug, Formatter};

use crate::exporter::model::FieldMapping;
Expand Down Expand Up @@ -165,18 +166,16 @@ impl DatadogPipelineBuilder {
let service_name = self.service_name.take();
if let Some(service_name) = service_name {
let config = if let Some(mut cfg) = self.trace_config.take() {
cfg.resource = cfg.resource.map(|r| {
let without_service_name = r
cfg.resource = Cow::Owned(Resource::new(
cfg.resource
.iter()
.filter(|(k, _v)| **k != semcov::resource::SERVICE_NAME)
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<KeyValue>>();
Arc::new(Resource::new(without_service_name))
});
.map(|(k, v)| KeyValue::new(k.clone(), v.clone())),
));
cfg
} else {
Config {
resource: Some(Arc::new(Resource::empty())),
resource: Cow::Owned(Resource::empty()),
..Default::default()
}
};
Expand All @@ -190,7 +189,7 @@ impl DatadogPipelineBuilder {
(
Config {
// use a empty resource to prevent TracerProvider to assign a service name.
resource: Some(Arc::new(Resource::empty())),
resource: Cow::Owned(Resource::empty()),
..Default::default()
},
service_name,
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-datadog/src/exporter/model/mod.rs
Expand Up @@ -179,12 +179,13 @@ impl ApiVersion {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use opentelemetry::sdk;
use opentelemetry::sdk::InstrumentationLibrary;
use opentelemetry::sdk::{self, Resource};
use opentelemetry::{
trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
Key,
};
use std::borrow::Cow;
use std::time::{Duration, SystemTime};

fn get_traces() -> Vec<Vec<trace::SpanData>> {
Expand Down Expand Up @@ -221,7 +222,7 @@ pub(crate) mod tests {
events,
links,
status: Status::Ok,
resource: None,
resource: Cow::Owned(Resource::empty()),
instrumentation_lib: InstrumentationLibrary::new("component", None, None),
}
}
Expand Down
6 changes: 0 additions & 6 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Expand Up @@ -232,7 +232,6 @@ impl AgentPipeline {
let mut builder = sdk::trace::TracerProvider::builder();

let (config, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand Down Expand Up @@ -270,7 +269,6 @@ impl AgentPipeline {
// build sdk trace config and jaeger process.
// some attributes like service name has attributes like service name
let (config, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand Down Expand Up @@ -312,12 +310,10 @@ impl AgentPipeline {
where
R: JaegerTraceRuntime,
{
let builder = sdk::trace::TracerProvider::builder();
let export_instrument_library = self.transformation_config.export_instrument_library;
// build sdk trace config and jaeger process.
// some attributes like service name has attributes like service name
let (_, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand All @@ -331,9 +327,7 @@ impl AgentPipeline {

/// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime.
pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
let builder = sdk::trace::TracerProvider::builder();
let (_, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand Down
Expand Up @@ -179,18 +179,14 @@ mod collector_client_tests {
use crate::exporter::thrift::jaeger::Batch;
use crate::new_collector_pipeline;
use opentelemetry::runtime::Tokio;
use opentelemetry::sdk::Resource;
use opentelemetry::trace::TraceError;
use opentelemetry::KeyValue;

#[test]
fn test_bring_your_own_client() -> Result<(), TraceError> {
let invalid_uri_builder = new_collector_pipeline()
.with_endpoint("localhost:6831")
.with_http_client(test_http_client::TestHttpClient);
let sdk_provided_resource =
Resource::new(vec![KeyValue::new("service.name", "unknown_service")]);
let (_, process) = build_config_and_process(sdk_provided_resource, None, None);
let (_, process) = build_config_and_process(None, None);
let mut uploader = invalid_uri_builder.build_uploader::<Tokio>()?;
let res = futures_executor::block_on(async {
uploader
Expand Down
Expand Up @@ -407,7 +407,6 @@ impl CollectorPipeline {
// some attributes like service name has attributes like service name
let export_instrument_library = self.transformation_config.export_instrument_library;
let (config, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Expand Down
66 changes: 11 additions & 55 deletions opentelemetry-jaeger/src/exporter/config/mod.rs
Expand Up @@ -10,12 +10,9 @@
//! [jaeger deployment guide]: https://www.jaegertracing.io/docs/1.31/deployment

use crate::Process;
use opentelemetry::sdk::trace::Config;
use opentelemetry::sdk::Resource;
use opentelemetry::trace::{TraceError, TracerProvider};
use opentelemetry::{global, sdk, KeyValue};
use opentelemetry_semantic_conventions as semcov;
use std::sync::Arc;

/// Config a exporter that sends the spans to a [jaeger agent](https://www.jaegertracing.io/docs/1.31/deployment/#agent).
pub mod agent;
Expand Down Expand Up @@ -54,35 +51,25 @@ trait HasRequiredConfig {
// There are multiple ways to set the service name. A `service.name` tag will be always added
// to the process tags.
fn build_config_and_process(
sdk_resource: sdk::Resource,
mut config: Option<sdk::trace::Config>,
config: Option<sdk::trace::Config>,
service_name_opt: Option<String>,
) -> (sdk::trace::Config, Process) {
let (config, resource) = if let Some(mut config) = config.take() {
let resource = if let Some(resource) = config.resource.replace(Arc::new(Resource::empty()))
{
sdk_resource.merge(resource)
} else {
sdk_resource
};

(config, resource)
} else {
(Config::default(), sdk_resource)
};
let config = config.unwrap_or_default();

let service_name = service_name_opt.unwrap_or_else(|| {
resource
config
.resource
.get(semcov::resource::SERVICE_NAME)
.map(|v| v.to_string())
.unwrap_or_else(|| "unknown_service".to_string())
});

// merge the tags and resource. Resources take priority.
let mut tags = resource
.into_iter()
.filter(|(key, _)| *key != semcov::resource::SERVICE_NAME)
.map(|(key, value)| KeyValue::new(key, value))
let mut tags = config
.resource
.iter()
.filter(|(key, _)| **key != semcov::resource::SERVICE_NAME)
.map(|(key, value)| KeyValue::new(key.clone(), value.clone()))
.collect::<Vec<KeyValue>>();

tags.push(KeyValue::new(
Expand All @@ -101,51 +88,20 @@ mod tests {
use opentelemetry::sdk::Resource;
use opentelemetry::KeyValue;
use std::env;
use std::sync::Arc;

#[test]
fn test_set_service_name() {
let service_name = "halloween_service".to_string();

// set via builder's service name, it has highest priority
let (_, process) =
build_config_and_process(Resource::empty(), None, Some(service_name.clone()));
let (_, process) = build_config_and_process(None, Some(service_name.clone()));
assert_eq!(process.service_name, service_name);

// make sure the tags in resource are moved to process
let trace_config = Config::default()
.with_resource(Resource::new(vec![KeyValue::new("test-key", "test-value")]));
let (config, process) =
build_config_and_process(Resource::empty(), Some(trace_config), Some(service_name));
assert_eq!(config.resource, Some(Arc::new(Resource::empty())));
let (_, process) = build_config_and_process(Some(trace_config), Some(service_name));
assert_eq!(process.tags.len(), 2);

// sdk provided resource can override service name if users didn't provided service name to builder
let (_, process) = build_config_and_process(
Resource::new(vec![KeyValue::new("service.name", "halloween_service")]),
None,
None,
);
assert_eq!(process.service_name, "halloween_service");

// users can also provided service.name from config's resource, in this case, it will override the
// sdk provided service name
let trace_config = Config::default().with_resource(Resource::new(vec![KeyValue::new(
"service.name",
"override_service",
)]));
let (_, process) = build_config_and_process(
Resource::new(vec![KeyValue::new("service.name", "halloween_service")]),
Some(trace_config),
None,
);

assert_eq!(process.service_name, "override_service");
assert_eq!(process.tags.len(), 1);
assert_eq!(
process.tags[0],
KeyValue::new("service.name", "override_service")
);
}

#[test]
Expand Down
11 changes: 10 additions & 1 deletion opentelemetry-otlp/src/transform/metrics.rs
Expand Up @@ -417,7 +417,16 @@ mod tests {
// If we changed the sink function to process the input in parallel, we will have to sort other vectors
// like data points in Metrics.
fn assert_resource_metrics(mut expect: ResourceMetrics, mut actual: ResourceMetrics) {
assert_eq!(expect.resource, actual.resource);
assert_eq!(
expect
.resource
.as_mut()
.map(|r| r.attributes.sort_by_key(|kv| kv.key.to_string())),
actual
.resource
.as_mut()
.map(|r| r.attributes.sort_by_key(|kv| kv.key.to_string()))
);
assert_eq!(
expect.instrumentation_library_metrics.len(),
actual.instrumentation_library_metrics.len()
Expand Down
36 changes: 12 additions & 24 deletions opentelemetry-proto/src/transform/traces.rs
Expand Up @@ -51,15 +51,13 @@ pub mod tonic {
let span_kind: span::SpanKind = source_span.span_kind.into();
ResourceSpans {
resource: Some(Resource {
attributes: resource_attributes(
source_span.resource.as_ref().map(AsRef::as_ref),
)
.0,
attributes: resource_attributes(&source_span.resource).0,
dropped_attributes_count: 0,
}),
schema_url: source_span
.resource
.and_then(|resource| resource.schema_url().map(|url| url.to_string()))
.schema_url()
.map(|url| url.to_string())
.unwrap_or_default(),
instrumentation_library_spans: vec![InstrumentationLibrarySpans {
schema_url: source_span
Expand Down Expand Up @@ -112,14 +110,11 @@ pub mod tonic {
}
}

fn resource_attributes(resource: Option<&sdk::Resource>) -> Attributes {
fn resource_attributes(resource: &sdk::Resource) -> Attributes {
resource
.map(|res| {
res.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
})
.unwrap_or_default()
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
.into()
}
}
Expand Down Expand Up @@ -175,10 +170,7 @@ pub mod grpcio {
fn from(source_span: SpanData) -> Self {
ResourceSpans {
resource: SingularPtrField::from(Some(Resource {
attributes: resource_attributes(
source_span.resource.as_ref().map(AsRef::as_ref),
)
.0,
attributes: resource_attributes(&source_span.resource).0,
dropped_attributes_count: 0,
..Default::default()
})),
Expand Down Expand Up @@ -246,15 +238,11 @@ pub mod grpcio {
}
}

fn resource_attributes(resource: Option<&sdk::Resource>) -> Attributes {
fn resource_attributes(resource: &sdk::Resource) -> Attributes {
resource
.map(|resource| {
resource
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
})
.unwrap_or_default()
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
.into()
}
}
5 changes: 3 additions & 2 deletions opentelemetry-sdk/Cargo.toml
Expand Up @@ -4,23 +4,24 @@ version = "0.1.0"
edition = "2018"

[dependencies]
opentelemetry-api = { version = "0.1", path = "../opentelemetry-api/" }
async-std = { version = "1.6", features = ["unstable"], optional = true }
async-trait = { version = "0.1", optional = true }
crossbeam-channel = { version = "0.5", optional = true }
dashmap = { version = "4.0.1", optional = true }
fnv = { version = "1.0", optional = true }
futures-channel = "0.3"
futures-executor = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
lazy_static = "1.4"
once_cell = "1.10"
opentelemetry-api = { version = "0.1", path = "../opentelemetry-api/" }
percent-encoding = { version = "2.0", optional = true }
pin-project = { version = "1.0.2", optional = true }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
serde = { version = "1.0", features = ["derive", "rc"], optional = true }
thiserror = "1"
tokio = { version = "1.0", default-features = false, features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", optional = true }
crossbeam-channel = { version = "0.5", optional = true }

[package.metadata.docs.rs]
all-features = true
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-sdk/benches/batch_span_processor.rs
Expand Up @@ -6,6 +6,8 @@ use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::{BatchSpanProcessor, EvictedHashMap, EvictedQueue, SpanProcessor};
use opentelemetry_sdk::Resource;
use std::borrow::Cow;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::runtime::Runtime;
Expand All @@ -30,7 +32,7 @@ fn get_span_data() -> Vec<SpanData> {
events: EvictedQueue::new(12),
links: EvictedQueue::new(12),
status: Status::Unset,
resource: None,
resource: Cow::Owned(Resource::empty()),
instrumentation_lib: Default::default(),
})
.collect::<Vec<SpanData>>()
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/export/trace/mod.rs
@@ -1,9 +1,9 @@
//! Trace exporters
use crate::Resource;
use async_trait::async_trait;
use opentelemetry_api::trace::{Event, Link, SpanContext, SpanId, SpanKind, Status, TraceError};
use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;

pub mod stdout;
Expand Down Expand Up @@ -73,7 +73,7 @@ pub struct SpanData {
/// Span status
pub status: Status,
/// Resource contains attributes representing an entity that produced this span.
pub resource: Option<Arc<crate::Resource>>,
pub resource: Cow<'static, Resource>,
/// Instrumentation library that produced this span
pub instrumentation_lib: crate::InstrumentationLibrary,
}