From ed3ceaed0900cea9732c567c8b33836e045366e6 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Sat, 5 Dec 2020 15:18:01 -0500 Subject: [PATCH] feat: move jaeger and zipkin propagators into their own crate and rename them. --- CONTRIBUTING.md | 4 + opentelemetry-contrib/Cargo.toml | 2 - .../src/trace/propagator/mod.rs | 8 - opentelemetry-jaeger/Cargo.toml | 4 + opentelemetry-jaeger/README.md | 8 +- .../src/{ => exporter}/agent.rs | 4 +- .../src/{ => exporter}/collector.rs | 4 +- .../src/{ => exporter}/env.rs | 0 opentelemetry-jaeger/src/exporter/mod.rs | 519 +++++++++++++++++ .../src/{ => exporter}/thrift/agent.rs | 0 .../src/{ => exporter}/thrift/jaeger.rs | 0 .../src/{ => exporter}/thrift/mod.rs | 0 .../src/{ => exporter}/thrift/zipkincore.rs | 0 .../src/{ => exporter}/transport/buffer.rs | 0 .../src/{ => exporter}/transport/mod.rs | 0 .../src/{ => exporter}/transport/noop.rs | 0 .../src/{ => exporter}/uploader.rs | 4 +- opentelemetry-jaeger/src/lib.rs | 526 +----------------- .../src/propagator/mod.rs | 24 +- opentelemetry-zipkin/Cargo.toml | 1 + opentelemetry-zipkin/README.md | 4 + opentelemetry-zipkin/src/exporter/mod.rs | 189 +++++++ .../src/{ => exporter}/model/annotation.rs | 2 +- .../src/{ => exporter}/model/endpoint.rs | 2 +- .../src/{ => exporter}/model/mod.rs | 0 .../src/{ => exporter}/model/span.rs | 8 +- .../src/{ => exporter}/uploader.rs | 4 +- opentelemetry-zipkin/src/lib.rs | 196 +------ .../src/propagator/mod.rs | 36 +- opentelemetry/Cargo.toml | 2 +- 30 files changed, 791 insertions(+), 760 deletions(-) rename opentelemetry-jaeger/src/{ => exporter}/agent.rs (97%) rename opentelemetry-jaeger/src/{ => exporter}/collector.rs (99%) rename opentelemetry-jaeger/src/{ => exporter}/env.rs (100%) create mode 100644 opentelemetry-jaeger/src/exporter/mod.rs rename opentelemetry-jaeger/src/{ => exporter}/thrift/agent.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/thrift/jaeger.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/thrift/mod.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/thrift/zipkincore.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/transport/buffer.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/transport/mod.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/transport/noop.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/uploader.rs (94%) rename opentelemetry-contrib/src/trace/propagator/jaeger.rs => opentelemetry-jaeger/src/propagator/mod.rs (95%) create mode 100644 opentelemetry-zipkin/src/exporter/mod.rs rename opentelemetry-zipkin/src/{ => exporter}/model/annotation.rs (94%) rename opentelemetry-zipkin/src/{ => exporter}/model/endpoint.rs (97%) rename opentelemetry-zipkin/src/{ => exporter}/model/mod.rs (100%) rename opentelemetry-zipkin/src/{ => exporter}/model/span.rs (94%) rename opentelemetry-zipkin/src/{ => exporter}/uploader.rs (94%) rename opentelemetry-contrib/src/trace/propagator/b3.rs => opentelemetry-zipkin/src/propagator/mod.rs (96%) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 5864fc4989..59a17c8dc1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -129,3 +129,7 @@ See the [code owners](CODEOWNERS) file. See the [community membership document in OpenTelemetry community repo](https://github.com/open-telemetry/community/blob/master/community-membership.md). + +## FAQ +### Where should I put third party propagators/exporters, contrib or standalone crates? +As of now, the specification classify the propagators into three categories: Fully opened standards, platform-specific standards, proprietary headers. The conclusion is only the fully opened standards should live in SDK packages/repos. So here, only fully opened standards should live as independent crate. For more detail and discussion, see [this pr](https://github.com/open-telemetry/opentelemetry-specification/pull/1144). \ No newline at end of file diff --git a/opentelemetry-contrib/Cargo.toml b/opentelemetry-contrib/Cargo.toml index d67d085a61..68598bfa60 100644 --- a/opentelemetry-contrib/Cargo.toml +++ b/opentelemetry-contrib/Cargo.toml @@ -26,8 +26,6 @@ datadog = ["indexmap", "rmp", "async-trait", "thiserror"] reqwest-blocking-client = ["reqwest/blocking", "opentelemetry/reqwest"] reqwest-client = ["reqwest", "opentelemetry/reqwest"] surf-client = ["surf", "opentelemetry/surf"] -jaeger = [] -zipkin = [] aws-xray = [] [dependencies] diff --git a/opentelemetry-contrib/src/trace/propagator/mod.rs b/opentelemetry-contrib/src/trace/propagator/mod.rs index ef2a8072c7..43d038ecba 100644 --- a/opentelemetry-contrib/src/trace/propagator/mod.rs +++ b/opentelemetry-contrib/src/trace/propagator/mod.rs @@ -13,15 +13,7 @@ //! This module also provides relative types for those propagators. #[cfg(feature = "aws-xray")] mod aws; -#[cfg(feature = "zipkin")] -mod b3; pub mod binary; -#[cfg(feature = "jaeger")] -mod jaeger; #[cfg(feature = "aws-xray")] pub use aws::XrayPropagator; -#[cfg(feature = "zipkin")] -pub use b3::{B3Encoding, B3Propagator}; -#[cfg(feature = "jaeger")] -pub use jaeger::JaegerPropagator; diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index 6db9117e7c..1db2a3a9a5 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -34,6 +34,10 @@ tokio = { version = "0.2", features = ["udp", "sync"], optional = true } wasm-bindgen = { version = "0.2", optional = true } wasm-bindgen-futures = { version = "0.4.18", optional = true } thiserror = "1.0" +lazy_static = "1.4" + +[dev-dependencies] +opentelemetry = { version = "0.10", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" } [dependencies.web-sys] version = "0.3.4" diff --git a/opentelemetry-jaeger/README.md b/opentelemetry-jaeger/README.md index f570d0c320..0044645d4f 100644 --- a/opentelemetry-jaeger/README.md +++ b/opentelemetry-jaeger/README.md @@ -4,7 +4,7 @@ # OpenTelemetry Jaeger -[`Jaeger`] integration for applications instrumented with [`OpenTelemetry`]. +[`Jaeger`] integration for applications instrumented with [`OpenTelemetry`]. This includes a jaeger exporter and a jaeger propagator. [![Crates.io: opentelemetry-jaeger](https://img.shields.io/crates/v/opentelemetry-jaeger.svg)](https://crates.io/crates/opentelemetry-jaeger) [![Documentation](https://docs.rs/opentelemetry-jaeger/badge.svg)](https://docs.rs/opentelemetry-jaeger) @@ -43,8 +43,10 @@ exporting telemetry: ```rust use opentelemetry::tracer; +use opentelemetry::global; fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().install()?; tracer.in_span("doing_work", |cx| { @@ -84,8 +86,10 @@ in the [jaeger variables spec]. ```rust use opentelemetry::tracer; +use opentelemetry::global; fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); // export OTEL_SERVICE_NAME=my-service-name let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().from_env().install()?; @@ -142,8 +146,10 @@ Example showing how to override all configuration options. See the ```rust use opentelemetry::{KeyValue, Tracer}; use opentelemetry::sdk::{trace, IdGenerator, Resource, Sampler}; +use opentelemetry::global; fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() .from_env() .with_agent_endpoint("localhost:6831") diff --git a/opentelemetry-jaeger/src/agent.rs b/opentelemetry-jaeger/src/exporter/agent.rs similarity index 97% rename from opentelemetry-jaeger/src/agent.rs rename to opentelemetry-jaeger/src/exporter/agent.rs index bd6b72485b..3e734ac634 100644 --- a/opentelemetry-jaeger/src/agent.rs +++ b/opentelemetry-jaeger/src/exporter/agent.rs @@ -1,9 +1,9 @@ //! # UDP Jaeger Agent Client -use crate::thrift::{ +use crate::exporter::thrift::{ agent::{self, TAgentSyncClient}, jaeger, }; -use crate::transport::{TBufferChannel, TNoopChannel}; +use crate::exporter::transport::{TBufferChannel, TNoopChannel}; use std::fmt; use std::net::{ToSocketAddrs, UdpSocket}; use thrift::{ diff --git a/opentelemetry-jaeger/src/collector.rs b/opentelemetry-jaeger/src/exporter/collector.rs similarity index 99% rename from opentelemetry-jaeger/src/collector.rs rename to opentelemetry-jaeger/src/exporter/collector.rs index d502fb2683..0b26b2e16a 100644 --- a/opentelemetry-jaeger/src/collector.rs +++ b/opentelemetry-jaeger/src/exporter/collector.rs @@ -23,7 +23,7 @@ struct WasmHttpClient { #[cfg(feature = "collector_client")] mod collector_client { use super::*; - use crate::thrift::jaeger; + use crate::exporter::thrift::jaeger; use http::{Request, Uri}; use isahc::{ auth::{Authentication, Credentials}, @@ -106,7 +106,7 @@ mod collector_client { #[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))] mod wasm_collector_client { use super::*; - use crate::thrift::jaeger; + use crate::exporter::thrift::jaeger; use futures_util::future; use http::Uri; use js_sys::Uint8Array; diff --git a/opentelemetry-jaeger/src/env.rs b/opentelemetry-jaeger/src/exporter/env.rs similarity index 100% rename from opentelemetry-jaeger/src/env.rs rename to opentelemetry-jaeger/src/exporter/env.rs diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs new file mode 100644 index 0000000000..62f38484ca --- /dev/null +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -0,0 +1,519 @@ +//! # Jaeger Exporter +//! +mod agent; +#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] +mod collector; +#[allow(clippy::all, unreachable_pub, dead_code)] +#[rustfmt::skip] +mod thrift; +mod env; +pub(crate) mod transport; +mod uploader; + +use self::thrift::jaeger; +use agent::AgentAsyncClientUDP; +use async_trait::async_trait; +#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] +use collector::CollectorAsyncClientHttp; +use opentelemetry::sdk::export::ExportError; +use opentelemetry::trace::TraceError; +use opentelemetry::{ + global, sdk, + sdk::export::trace, + trace::{Event, Link, SpanKind, StatusCode, TracerProvider}, + Key, KeyValue, Value, +}; +use std::{ + net, + time::{Duration, SystemTime}, +}; +use uploader::BatchUploader; + +/// Default service name if no service is configured. +const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry"; + +/// Default agent endpoint if none is provided +const DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831"; + +/// Instrument Library name MUST be reported in Jaeger Span tags with the following key +const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name"; + +/// Instrument Library version MUST be reported in Jaeger Span tags with the following key +const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; + +/// Create a new Jaeger exporter pipeline builder. +pub fn new_pipeline() -> PipelineBuilder { + PipelineBuilder::default() +} + +/// Guard that uninstalls the Jaeger trace pipeline when dropped +#[must_use] +#[derive(Debug)] +pub struct Uninstall(global::TracerProviderGuard); + +/// Jaeger span exporter +#[derive(Debug)] +pub struct Exporter { + process: jaeger::Process, + /// Whether or not to export instrumentation information. + export_instrumentation_lib: bool, + uploader: uploader::BatchUploader, +} + +/// Jaeger process configuration +#[derive(Debug, Default)] +pub struct Process { + /// Jaeger service name + pub service_name: String, + /// Jaeger tags + pub tags: Vec, +} + +impl Into for Process { + fn into(self) -> jaeger::Process { + jaeger::Process::new( + self.service_name, + Some(self.tags.into_iter().map(Into::into).collect()), + ) + } +} + +#[async_trait] +impl trace::SpanExporter for Exporter { + /// Export spans to Jaeger + async fn export(&mut self, batch: Vec) -> trace::ExportResult { + let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); + let mut process = self.process.clone(); + + for (idx, span) in batch.into_iter().enumerate() { + if idx == 0 { + if let Some(span_process_tags) = build_process_tags(&span) { + if let Some(process_tags) = &mut process.tags { + process_tags.extend(span_process_tags); + } else { + process.tags = Some(span_process_tags.collect()) + } + } + } + jaeger_spans.push(convert_otel_span_into_jaeger_span( + span, + self.export_instrumentation_lib, + )); + } + + self.uploader + .upload(jaeger::Batch::new(process, jaeger_spans)) + .await + } +} + +/// Jaeger exporter builder +#[derive(Debug)] +pub struct PipelineBuilder { + agent_endpoint: Vec, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_endpoint: Option, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_username: Option, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_password: Option, + export_instrument_library: bool, + process: Process, + config: Option, +} + +impl Default for PipelineBuilder { + /// Return the default Exporter Builder. + fn default() -> Self { + PipelineBuilder { + agent_endpoint: vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()], + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_endpoint: None, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_username: None, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_password: None, + export_instrument_library: true, + process: Process { + service_name: DEFAULT_SERVICE_NAME.to_string(), + tags: Vec::new(), + }, + config: None, + } + } +} + +impl PipelineBuilder { + /// Assign builder attributes from environment variables. + /// + /// See the [jaeger variable spec] for full list. + /// + /// [jaeger variable spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter + #[allow(clippy::wrong_self_convention)] + pub fn from_env(self) -> Self { + env::assign_attrs(self) + } + + /// Assign the agent endpoint. + pub fn with_agent_endpoint(self, agent_endpoint: T) -> Self { + PipelineBuilder { + agent_endpoint: agent_endpoint + .to_socket_addrs() + .map(|addrs| addrs.collect()) + .unwrap_or_default(), + + ..self + } + } + + /// Config whether to export information of instrumentation library. + pub fn with_instrumentation_library_tags(self, export: bool) -> Self { + PipelineBuilder { + export_instrument_library: export, + ..self + } + } + + /// Assign the collector endpoint. + /// + /// E.g. "http://localhost:14268/api/traces" + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + #[cfg_attr( + docsrs, + doc(cfg(any(feature = "collector_client", feature = "wasm_collector_client"))) + )] + pub fn with_collector_endpoint(self, collector_endpoint: T) -> Self + where + http::Uri: core::convert::TryFrom, + { + PipelineBuilder { + collector_endpoint: core::convert::TryFrom::try_from(collector_endpoint).ok(), + ..self + } + } + + /// Assign the collector username + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + #[cfg_attr( + docsrs, + doc(any(feature = "collector_client", feature = "wasm_collector_client")) + )] + pub fn with_collector_username>(self, collector_username: S) -> Self { + PipelineBuilder { + collector_username: Some(collector_username.into()), + ..self + } + } + + /// Assign the collector password + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + #[cfg_attr( + docsrs, + doc(any(feature = "collector_client", feature = "wasm_collector_client")) + )] + pub fn with_collector_password>(self, collector_password: S) -> Self { + PipelineBuilder { + collector_password: Some(collector_password.into()), + ..self + } + } + + /// Assign the process service name. + pub fn with_service_name>(mut self, service_name: T) -> Self { + self.process.service_name = service_name.into(); + self + } + + /// Assign the process service tags. + pub fn with_tags>(mut self, tags: T) -> Self { + self.process.tags = tags.into_iter().collect(); + self + } + + /// Assign the SDK config for the exporter pipeline. + pub fn with_trace_config(self, config: sdk::trace::Config) -> Self { + PipelineBuilder { + config: Some(config), + ..self + } + } + + /// Install a Jaeger pipeline with the recommended defaults. + pub fn install(self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> { + let tracer_provider = self.build()?; + let tracer = + tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION"))); + + let provider_guard = global::set_tracer_provider(tracer_provider); + + Ok((tracer, Uninstall(provider_guard))) + } + + /// Build a configured `sdk::trace::TracerProvider` with the recommended defaults. + pub fn build(mut self) -> Result { + let config = self.config.take(); + let exporter = self.init_exporter()?; + + let mut builder = sdk::trace::TracerProvider::builder().with_exporter(exporter); + + if let Some(config) = config { + builder = builder.with_config(config) + } + + Ok(builder.build()) + } + + /// Initialize a new exporter. + /// + /// This is useful if you are manually constructing a pipeline. + pub fn init_exporter(self) -> Result { + let export_instrumentation_lib = self.export_instrument_library; + let (process, uploader) = self.init_uploader()?; + + Ok(Exporter { + process: process.into(), + export_instrumentation_lib, + uploader, + }) + } + + #[cfg(not(any(feature = "collector_client", feature = "wasm_collector_client")))] + fn init_uploader(self) -> Result<(Process, BatchUploader), TraceError> { + let agent = AgentAsyncClientUDP::new(self.agent_endpoint.as_slice()) + .map_err::(Into::into)?; + Ok((self.process, BatchUploader::Agent(agent))) + } + + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + fn init_uploader(self) -> Result<(Process, uploader::BatchUploader), TraceError> { + if let Some(collector_endpoint) = self.collector_endpoint { + let collector = CollectorAsyncClientHttp::new( + collector_endpoint, + self.collector_username, + self.collector_password, + ) + .map_err::(Into::into)?; + Ok((self.process, uploader::BatchUploader::Collector(collector))) + } else { + let endpoint = self.agent_endpoint.as_slice(); + let agent = AgentAsyncClientUDP::new(endpoint).map_err::(Into::into)?; + Ok((self.process, BatchUploader::Agent(agent))) + } + } +} + +#[rustfmt::skip] +impl Into for KeyValue { + fn into(self) -> jaeger::Tag { + let KeyValue { key, value } = self; + match value { + Value::String(s) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(s.into()), None, None, None, None), + Value::F64(f) => jaeger::Tag::new(key.into(), jaeger::TagType::Double, None, Some(f.into()), None, None, None), + Value::Bool(b) => jaeger::Tag::new(key.into(), jaeger::TagType::Bool, None, None, Some(b), None, None), + Value::I64(i) => jaeger::Tag::new(key.into(), jaeger::TagType::Long, None, None, None, Some(i), None), + // TODO: better Array handling, jaeger thrift doesn't support arrays + v @ Value::Array(_) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(v.to_string()), None, None, None, None), + } + } +} + +impl Into for Event { + fn into(self) -> jaeger::Log { + let timestamp = self + .timestamp + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_micros() as i64; + let mut event_set_via_attribute = false; + let mut fields = self + .attributes + .into_iter() + .map(|attr| { + if attr.key.as_str() == "event" { + event_set_via_attribute = true; + }; + attr.into() + }) + .collect::>(); + + if !event_set_via_attribute { + fields.push(Key::new("event").string(self.name).into()); + } + + jaeger::Log::new(timestamp, fields) + } +} + +fn links_to_references(links: sdk::trace::EvictedQueue) -> Option> { + if !links.is_empty() { + let refs = links + .iter() + .map(|link| { + let span_context = link.span_context(); + let trace_id = span_context.trace_id().to_u128(); + let trace_id_high = (trace_id >> 64) as i64; + let trace_id_low = trace_id as i64; + + // TODO: properly set the reference type when specs are defined + // see https://github.com/open-telemetry/opentelemetry-specification/issues/65 + jaeger::SpanRef::new( + jaeger::SpanRefType::ChildOf, + trace_id_low, + trace_id_high, + span_context.span_id().to_u64() as i64, + ) + }) + .collect(); + Some(refs) + } else { + None + } +} + +/// Convert spans to jaeger thrift span for exporting. +fn convert_otel_span_into_jaeger_span( + span: trace::SpanData, + export_instrument_lib: bool, +) -> jaeger::Span { + let trace_id = span.span_context.trace_id().to_u128(); + let trace_id_high = (trace_id >> 64) as i64; + let trace_id_low = trace_id as i64; + jaeger::Span { + trace_id_low, + trace_id_high, + span_id: span.span_context.span_id().to_u64() as i64, + parent_span_id: span.parent_span_id.to_u64() as i64, + operation_name: span.name, + references: links_to_references(span.links), + flags: span.span_context.trace_flags() as i32, + start_time: span + .start_time + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_micros() as i64, + duration: span + .end_time + .duration_since(span.start_time) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_micros() as i64, + tags: build_span_tags( + span.attributes, + if export_instrument_lib { + Some(span.instrumentation_lib) + } else { + None + }, + span.status_code, + span.status_message, + span.span_kind, + ), + logs: events_to_logs(span.message_events), + } +} + +fn build_process_tags( + span_data: &trace::SpanData, +) -> Option + '_> { + if span_data.resource.is_empty() { + None + } else { + Some( + span_data + .resource + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone()).into()), + ) + } +} + +fn build_span_tags( + attrs: sdk::trace::EvictedHashMap, + instrumentation_lib: Option, + status_code: StatusCode, + status_message: String, + kind: SpanKind, +) -> Option> { + let mut user_overrides = UserOverrides::default(); + // TODO determine if namespacing is required to avoid collisions with set attributes + let mut tags = attrs + .into_iter() + .map(|(k, v)| { + user_overrides.record_attr(k.as_str()); + KeyValue::new(k, v).into() + }) + .collect::>(); + + if let Some(instrumentation_lib) = instrumentation_lib { + // Set instrument library tags + tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_NAME, instrumentation_lib.name).into()); + if let Some(version) = instrumentation_lib.version { + tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_VERSION, version).into()) + } + } + + // Ensure error status is set + if status_code == StatusCode::Error && !user_overrides.error { + tags.push(Key::new(ERROR).bool(true).into()) + } + + if !user_overrides.span_kind { + tags.push(Key::new(SPAN_KIND).string(kind.to_string()).into()); + } + + if !user_overrides.status_code { + tags.push(KeyValue::new(STATUS_CODE, status_code as i64).into()); + } + + if !user_overrides.status_message { + tags.push(Key::new(STATUS_MESSAGE).string(status_message).into()); + } + + Some(tags) +} + +const ERROR: &str = "error"; +const SPAN_KIND: &str = "span.kind"; +const STATUS_CODE: &str = "status.code"; +const STATUS_MESSAGE: &str = "status.message"; + +#[derive(Default)] +struct UserOverrides { + error: bool, + span_kind: bool, + status_code: bool, + status_message: bool, +} + +impl UserOverrides { + fn record_attr(&mut self, attr: &str) { + match attr { + ERROR => self.error = true, + SPAN_KIND => self.span_kind = true, + STATUS_CODE => self.status_code = true, + STATUS_MESSAGE => self.status_message = true, + _ => (), + } + } +} + +fn events_to_logs(events: sdk::trace::EvictedQueue) -> Option> { + if events.is_empty() { + None + } else { + Some(events.into_iter().map(Into::into).collect()) + } +} + +/// Wrap type for errors from opentelemetry jaeger +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Error from thrift agents. + #[error("thrift agent failed with {0}")] + ThriftAgentError(#[from] ::thrift::Error), +} + +impl ExportError for Error { + fn exporter_name(&self) -> &'static str { + "jaeger" + } +} diff --git a/opentelemetry-jaeger/src/thrift/agent.rs b/opentelemetry-jaeger/src/exporter/thrift/agent.rs similarity index 100% rename from opentelemetry-jaeger/src/thrift/agent.rs rename to opentelemetry-jaeger/src/exporter/thrift/agent.rs diff --git a/opentelemetry-jaeger/src/thrift/jaeger.rs b/opentelemetry-jaeger/src/exporter/thrift/jaeger.rs similarity index 100% rename from opentelemetry-jaeger/src/thrift/jaeger.rs rename to opentelemetry-jaeger/src/exporter/thrift/jaeger.rs diff --git a/opentelemetry-jaeger/src/thrift/mod.rs b/opentelemetry-jaeger/src/exporter/thrift/mod.rs similarity index 100% rename from opentelemetry-jaeger/src/thrift/mod.rs rename to opentelemetry-jaeger/src/exporter/thrift/mod.rs diff --git a/opentelemetry-jaeger/src/thrift/zipkincore.rs b/opentelemetry-jaeger/src/exporter/thrift/zipkincore.rs similarity index 100% rename from opentelemetry-jaeger/src/thrift/zipkincore.rs rename to opentelemetry-jaeger/src/exporter/thrift/zipkincore.rs diff --git a/opentelemetry-jaeger/src/transport/buffer.rs b/opentelemetry-jaeger/src/exporter/transport/buffer.rs similarity index 100% rename from opentelemetry-jaeger/src/transport/buffer.rs rename to opentelemetry-jaeger/src/exporter/transport/buffer.rs diff --git a/opentelemetry-jaeger/src/transport/mod.rs b/opentelemetry-jaeger/src/exporter/transport/mod.rs similarity index 100% rename from opentelemetry-jaeger/src/transport/mod.rs rename to opentelemetry-jaeger/src/exporter/transport/mod.rs diff --git a/opentelemetry-jaeger/src/transport/noop.rs b/opentelemetry-jaeger/src/exporter/transport/noop.rs similarity index 100% rename from opentelemetry-jaeger/src/transport/noop.rs rename to opentelemetry-jaeger/src/exporter/transport/noop.rs diff --git a/opentelemetry-jaeger/src/uploader.rs b/opentelemetry-jaeger/src/exporter/uploader.rs similarity index 94% rename from opentelemetry-jaeger/src/uploader.rs rename to opentelemetry-jaeger/src/exporter/uploader.rs index 7f6c5743aa..91681d0b28 100644 --- a/opentelemetry-jaeger/src/uploader.rs +++ b/opentelemetry-jaeger/src/exporter/uploader.rs @@ -1,7 +1,7 @@ //! # Jaeger Span Uploader #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] -use crate::collector; -use crate::{agent, jaeger}; +use crate::exporter::collector; +use crate::exporter::{agent, jaeger}; use opentelemetry::sdk::export::trace; /// Uploads a batch of spans to Jaeger diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index 89f24ecc27..17d55cc045 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -21,8 +21,10 @@ //! //! ```no_run //! use opentelemetry::trace::Tracer; +//! use opentelemetry::global; //! //! fn main() -> Result<(), opentelemetry::trace::TraceError> { +//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); //! let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().install()?; //! //! tracer.in_span("doing_work", |cx| { @@ -60,8 +62,10 @@ //! //! ```no_run //! use opentelemetry::trace::{Tracer, TraceError}; +//! use opentelemetry::global; //! //! fn main() -> Result<(), TraceError> { +//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); //! // export OTEL_SERVICE_NAME=my-service-name //! let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().from_env().install()?; //! @@ -118,8 +122,10 @@ //! ```no_run //! use opentelemetry::{KeyValue, trace::{Tracer, TraceError}}; //! use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource}; +//! use opentelemetry::global; //! //! fn main() -> Result<(), TraceError> { +//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); //! let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() //! .from_env() //! .with_agent_endpoint("localhost:6831") @@ -179,520 +185,8 @@ )] #![cfg_attr(test, deny(warnings))] -mod agent; -#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] -mod collector; -#[allow(clippy::all, unreachable_pub, dead_code)] -#[rustfmt::skip] -mod thrift; -mod env; -pub(crate) mod transport; -mod uploader; +mod exporter; +mod propagator; -use self::thrift::jaeger; -use agent::AgentAsyncClientUDP; -use async_trait::async_trait; -#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] -use collector::CollectorAsyncClientHttp; -use opentelemetry::sdk::export::ExportError; -use opentelemetry::trace::TraceError; -use opentelemetry::{ - global, sdk, - sdk::export::trace, - trace::{Event, Link, SpanKind, StatusCode, TracerProvider}, - Key, KeyValue, Value, -}; -use std::{ - net, - time::{Duration, SystemTime}, -}; -use uploader::BatchUploader; - -/// Default service name if no service is configured. -const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry"; - -/// Default agent endpoint if none is provided -const DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831"; - -/// Instrument Library name MUST be reported in Jaeger Span tags with the following key -const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name"; - -/// Instrument Library version MUST be reported in Jaeger Span tags with the following key -const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; - -/// Create a new Jaeger exporter pipeline builder. -pub fn new_pipeline() -> PipelineBuilder { - PipelineBuilder::default() -} - -/// Guard that uninstalls the Jaeger trace pipeline when dropped -#[must_use] -#[derive(Debug)] -pub struct Uninstall(global::TracerProviderGuard); - -/// Jaeger span exporter -#[derive(Debug)] -pub struct Exporter { - process: jaeger::Process, - /// Whether or not to export instrumentation information. - export_instrumentation_lib: bool, - uploader: uploader::BatchUploader, -} - -/// Jaeger process configuration -#[derive(Debug, Default)] -pub struct Process { - /// Jaeger service name - pub service_name: String, - /// Jaeger tags - pub tags: Vec, -} - -impl Into for Process { - fn into(self) -> jaeger::Process { - jaeger::Process::new( - self.service_name, - Some(self.tags.into_iter().map(Into::into).collect()), - ) - } -} - -#[async_trait] -impl trace::SpanExporter for Exporter { - /// Export spans to Jaeger - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); - let mut process = self.process.clone(); - - for (idx, span) in batch.into_iter().enumerate() { - if idx == 0 { - if let Some(span_process_tags) = build_process_tags(&span) { - if let Some(process_tags) = &mut process.tags { - process_tags.extend(span_process_tags); - } else { - process.tags = Some(span_process_tags.collect()) - } - } - } - jaeger_spans.push(convert_otel_span_into_jaeger_span( - span, - self.export_instrumentation_lib, - )); - } - - self.uploader - .upload(jaeger::Batch::new(process, jaeger_spans)) - .await - } -} - -/// Jaeger exporter builder -#[derive(Debug)] -pub struct PipelineBuilder { - agent_endpoint: Vec, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_endpoint: Option, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_username: Option, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_password: Option, - export_instrument_library: bool, - process: Process, - config: Option, -} - -impl Default for PipelineBuilder { - /// Return the default Exporter Builder. - fn default() -> Self { - PipelineBuilder { - agent_endpoint: vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()], - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_endpoint: None, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_username: None, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_password: None, - export_instrument_library: true, - process: Process { - service_name: DEFAULT_SERVICE_NAME.to_string(), - tags: Vec::new(), - }, - config: None, - } - } -} - -impl PipelineBuilder { - /// Assign builder attributes from environment variables. - /// - /// See the [jaeger variable spec] for full list. - /// - /// [jaeger variable spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter - #[allow(clippy::wrong_self_convention)] - pub fn from_env(self) -> Self { - env::assign_attrs(self) - } - - /// Assign the agent endpoint. - pub fn with_agent_endpoint(self, agent_endpoint: T) -> Self { - PipelineBuilder { - agent_endpoint: agent_endpoint - .to_socket_addrs() - .map(|addrs| addrs.collect()) - .unwrap_or_default(), - - ..self - } - } - - /// Config whether to export information of instrumentation library. - pub fn with_instrumentation_library_tags(self, export: bool) -> Self { - PipelineBuilder { - export_instrument_library: export, - ..self - } - } - - /// Assign the collector endpoint. - /// - /// E.g. "http://localhost:14268/api/traces" - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(cfg(any(feature = "collector_client", feature = "wasm_collector_client"))) - )] - pub fn with_collector_endpoint(self, collector_endpoint: T) -> Self - where - http::Uri: core::convert::TryFrom, - { - PipelineBuilder { - collector_endpoint: core::convert::TryFrom::try_from(collector_endpoint).ok(), - ..self - } - } - - /// Assign the collector username - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(any(feature = "collector_client", feature = "wasm_collector_client")) - )] - pub fn with_collector_username>(self, collector_username: S) -> Self { - PipelineBuilder { - collector_username: Some(collector_username.into()), - ..self - } - } - - /// Assign the collector password - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(any(feature = "collector_client", feature = "wasm_collector_client")) - )] - pub fn with_collector_password>(self, collector_password: S) -> Self { - PipelineBuilder { - collector_password: Some(collector_password.into()), - ..self - } - } - - /// Assign the process service name. - pub fn with_service_name>(mut self, service_name: T) -> Self { - self.process.service_name = service_name.into(); - self - } - - /// Assign the process service tags. - pub fn with_tags>(mut self, tags: T) -> Self { - self.process.tags = tags.into_iter().collect(); - self - } - - /// Assign the SDK config for the exporter pipeline. - pub fn with_trace_config(self, config: sdk::trace::Config) -> Self { - PipelineBuilder { - config: Some(config), - ..self - } - } - - /// Install a Jaeger pipeline with the recommended defaults. - pub fn install(self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> { - let tracer_provider = self.build()?; - let tracer = - tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION"))); - - let provider_guard = global::set_tracer_provider(tracer_provider); - - Ok((tracer, Uninstall(provider_guard))) - } - - /// Build a configured `sdk::trace::TracerProvider` with the recommended defaults. - pub fn build(mut self) -> Result { - let config = self.config.take(); - let exporter = self.init_exporter()?; - - let mut builder = sdk::trace::TracerProvider::builder().with_exporter(exporter); - - if let Some(config) = config { - builder = builder.with_config(config) - } - - Ok(builder.build()) - } - - /// Initialize a new exporter. - /// - /// This is useful if you are manually constructing a pipeline. - pub fn init_exporter(self) -> Result { - let export_instrumentation_lib = self.export_instrument_library; - let (process, uploader) = self.init_uploader()?; - - Ok(Exporter { - process: process.into(), - export_instrumentation_lib, - uploader, - }) - } - - #[cfg(not(any(feature = "collector_client", feature = "wasm_collector_client")))] - fn init_uploader(self) -> Result<(Process, BatchUploader), TraceError> { - let agent = AgentAsyncClientUDP::new(self.agent_endpoint.as_slice()) - .map_err::(Into::into)?; - Ok((self.process, BatchUploader::Agent(agent))) - } - - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - fn init_uploader(self) -> Result<(Process, uploader::BatchUploader), TraceError> { - if let Some(collector_endpoint) = self.collector_endpoint { - let collector = CollectorAsyncClientHttp::new( - collector_endpoint, - self.collector_username, - self.collector_password, - ) - .map_err::(Into::into)?; - Ok((self.process, uploader::BatchUploader::Collector(collector))) - } else { - let endpoint = self.agent_endpoint.as_slice(); - let agent = AgentAsyncClientUDP::new(endpoint).map_err::(Into::into)?; - Ok((self.process, BatchUploader::Agent(agent))) - } - } -} - -#[rustfmt::skip] -impl Into for KeyValue { - fn into(self) -> jaeger::Tag { - let KeyValue { key, value } = self; - match value { - Value::String(s) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(s.into()), None, None, None, None), - Value::F64(f) => jaeger::Tag::new(key.into(), jaeger::TagType::Double, None, Some(f.into()), None, None, None), - Value::Bool(b) => jaeger::Tag::new(key.into(), jaeger::TagType::Bool, None, None, Some(b), None, None), - Value::I64(i) => jaeger::Tag::new(key.into(), jaeger::TagType::Long, None, None, None, Some(i), None), - // TODO: better Array handling, jaeger thrift doesn't support arrays - v @ Value::Array(_) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(v.to_string()), None, None, None, None), - } - } -} - -impl Into for Event { - fn into(self) -> jaeger::Log { - let timestamp = self - .timestamp - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_micros() as i64; - let mut event_set_via_attribute = false; - let mut fields = self - .attributes - .into_iter() - .map(|attr| { - if attr.key.as_str() == "event" { - event_set_via_attribute = true; - }; - attr.into() - }) - .collect::>(); - - if !event_set_via_attribute { - fields.push(Key::new("event").string(self.name).into()); - } - - jaeger::Log::new(timestamp, fields) - } -} - -fn links_to_references(links: sdk::trace::EvictedQueue) -> Option> { - if !links.is_empty() { - let refs = links - .iter() - .map(|link| { - let span_context = link.span_context(); - let trace_id = span_context.trace_id().to_u128(); - let trace_id_high = (trace_id >> 64) as i64; - let trace_id_low = trace_id as i64; - - // TODO: properly set the reference type when specs are defined - // see https://github.com/open-telemetry/opentelemetry-specification/issues/65 - jaeger::SpanRef::new( - jaeger::SpanRefType::ChildOf, - trace_id_low, - trace_id_high, - span_context.span_id().to_u64() as i64, - ) - }) - .collect(); - Some(refs) - } else { - None - } -} - -/// Convert spans to jaeger thrift span for exporting. -fn convert_otel_span_into_jaeger_span( - span: trace::SpanData, - export_instrument_lib: bool, -) -> jaeger::Span { - let trace_id = span.span_context.trace_id().to_u128(); - let trace_id_high = (trace_id >> 64) as i64; - let trace_id_low = trace_id as i64; - jaeger::Span { - trace_id_low, - trace_id_high, - span_id: span.span_context.span_id().to_u64() as i64, - parent_span_id: span.parent_span_id.to_u64() as i64, - operation_name: span.name, - references: links_to_references(span.links), - flags: span.span_context.trace_flags() as i32, - start_time: span - .start_time - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_micros() as i64, - duration: span - .end_time - .duration_since(span.start_time) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_micros() as i64, - tags: build_span_tags( - span.attributes, - if export_instrument_lib { - Some(span.instrumentation_lib) - } else { - None - }, - span.status_code, - span.status_message, - span.span_kind, - ), - logs: events_to_logs(span.message_events), - } -} - -fn build_process_tags( - span_data: &trace::SpanData, -) -> Option + '_> { - if span_data.resource.is_empty() { - None - } else { - Some( - span_data - .resource - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone()).into()), - ) - } -} - -fn build_span_tags( - attrs: sdk::trace::EvictedHashMap, - instrumentation_lib: Option, - status_code: StatusCode, - status_message: String, - kind: SpanKind, -) -> Option> { - let mut user_overrides = UserOverrides::default(); - // TODO determine if namespacing is required to avoid collisions with set attributes - let mut tags = attrs - .into_iter() - .map(|(k, v)| { - user_overrides.record_attr(k.as_str()); - KeyValue::new(k, v).into() - }) - .collect::>(); - - if let Some(instrumentation_lib) = instrumentation_lib { - // Set instrument library tags - tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_NAME, instrumentation_lib.name).into()); - if let Some(version) = instrumentation_lib.version { - tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_VERSION, version).into()) - } - } - - // Ensure error status is set - if status_code == StatusCode::Error && !user_overrides.error { - tags.push(Key::new(ERROR).bool(true).into()) - } - - if !user_overrides.span_kind { - tags.push(Key::new(SPAN_KIND).string(kind.to_string()).into()); - } - - if !user_overrides.status_code { - tags.push(KeyValue::new(STATUS_CODE, status_code as i64).into()); - } - - if !user_overrides.status_message { - tags.push(Key::new(STATUS_MESSAGE).string(status_message).into()); - } - - Some(tags) -} - -const ERROR: &str = "error"; -const SPAN_KIND: &str = "span.kind"; -const STATUS_CODE: &str = "status.code"; -const STATUS_MESSAGE: &str = "status.message"; - -#[derive(Default)] -struct UserOverrides { - error: bool, - span_kind: bool, - status_code: bool, - status_message: bool, -} - -impl UserOverrides { - fn record_attr(&mut self, attr: &str) { - match attr { - ERROR => self.error = true, - SPAN_KIND => self.span_kind = true, - STATUS_CODE => self.status_code = true, - STATUS_MESSAGE => self.status_message = true, - _ => (), - } - } -} - -fn events_to_logs(events: sdk::trace::EvictedQueue) -> Option> { - if events.is_empty() { - None - } else { - Some(events.into_iter().map(Into::into).collect()) - } -} - -/// Wrap type for errors from opentelemetry jaeger -#[derive(thiserror::Error, Debug)] -pub enum Error { - /// Error from thrift agents. - #[error("thrift agent failed with {0}")] - ThriftAgentError(#[from] ::thrift::Error), -} - -impl ExportError for Error { - fn exporter_name(&self) -> &'static str { - "jaeger" - } -} +pub use exporter::{new_pipeline, Error, Exporter, PipelineBuilder, Process, Uninstall}; +pub use propagator::Propagator; diff --git a/opentelemetry-contrib/src/trace/propagator/jaeger.rs b/opentelemetry-jaeger/src/propagator/mod.rs similarity index 95% rename from opentelemetry-contrib/src/trace/propagator/jaeger.rs rename to opentelemetry-jaeger/src/propagator/mod.rs index 86eb2e7dcb..bb46b47d1b 100644 --- a/opentelemetry-contrib/src/trace/propagator/jaeger.rs +++ b/opentelemetry-jaeger/src/propagator/mod.rs @@ -32,20 +32,20 @@ lazy_static::lazy_static! { /// /// [`Jaeger documentation`]: https://www.jaegertracing.io/docs/1.18/client-libraries/#propagation-format #[derive(Clone, Debug)] -pub struct JaegerPropagator { +pub struct Propagator { _private: (), } -impl Default for JaegerPropagator { +impl Default for Propagator { fn default() -> Self { - JaegerPropagator { _private: () } + Propagator { _private: () } } } -impl JaegerPropagator { +impl Propagator { /// Create a Jaeger propagator pub fn new() -> Self { - JaegerPropagator::default() + Propagator::default() } /// Extract span context from header value @@ -135,7 +135,7 @@ impl JaegerPropagator { } } -impl TextMapPropagator for JaegerPropagator { +impl TextMapPropagator for Propagator { fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) { let span_context = cx.span().span_context(); if span_context.is_valid() { @@ -300,7 +300,7 @@ mod tests { #[test] fn test_extract_empty() { let map: HashMap = HashMap::new(); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!( context.remote_span_context(), @@ -316,7 +316,7 @@ mod tests { JAEGER_HEADER, format!("{}:{}:0:{}", trace_id, span_id, flag), ); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!(context.remote_span_context(), Some(&expected)); } @@ -329,7 +329,7 @@ mod tests { JAEGER_HEADER, format!("{}:{}:0:1:aa", LONG_TRACE_ID_STR, SPAN_ID_STR), ); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!( context.remote_span_context(), @@ -344,7 +344,7 @@ mod tests { JAEGER_HEADER, format!("{}:{}:0:aa", LONG_TRACE_ID_STR, SPAN_ID_STR), ); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!( context.remote_span_context(), @@ -359,7 +359,7 @@ mod tests { JAEGER_HEADER, format!("{}%3A{}%3A0%3A1", LONG_TRACE_ID_STR, SPAN_ID_STR), ); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!( context.remote_span_context(), @@ -374,7 +374,7 @@ mod tests { } #[test] fn test_inject() { - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); for (span_context, header_value) in get_inject_data() { let mut injector = HashMap::new(); propagator.inject_context( diff --git a/opentelemetry-zipkin/Cargo.toml b/opentelemetry-zipkin/Cargo.toml index 08ed750ca3..483698ab16 100644 --- a/opentelemetry-zipkin/Cargo.toml +++ b/opentelemetry-zipkin/Cargo.toml @@ -39,3 +39,4 @@ thiserror = { version = "1.0"} [dev-dependencies] isahc = "=0.9.6" +opentelemetry = { version = "0.10", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" } diff --git a/opentelemetry-zipkin/README.md b/opentelemetry-zipkin/README.md index 6b9aa416d7..933f3aec0b 100644 --- a/opentelemetry-zipkin/README.md +++ b/opentelemetry-zipkin/README.md @@ -43,8 +43,10 @@ telemetry: ```rust use opentelemetry::trace::Tracer; +use opentelemetry::global; fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); let (tracer, _uninstall) = opentelemetry_zipkin::new_pipeline().install()?; tracer.in_span("doing_work", |cx| { @@ -98,6 +100,7 @@ Example showing how to override all configuration options. See the use opentelemetry::{KeyValue, trace::Tracer}; use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource}; use opentelemetry::sdk::export::trace::{ExportResult, HttpClient}; +use opentelemetry::global; use async_trait::async_trait; use std::error::Error; @@ -121,6 +124,7 @@ impl HttpClient for IsahcClient { } fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); let (tracer, _uninstall) = opentelemetry_zipkin::new_pipeline() .with_http_client(IsahcClient(isahc::HttpClient::new()?)) .with_service_name("my_app") diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs new file mode 100644 index 0000000000..e8fc6e23d2 --- /dev/null +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -0,0 +1,189 @@ +mod model; +mod uploader; + +use async_trait::async_trait; +use http::Uri; +use model::endpoint::Endpoint; +use opentelemetry::{ + global, sdk, + sdk::export::{ + trace::{self, HttpClient}, + ExportError, + }, + trace::{TraceError, TracerProvider}, +}; +use std::net::SocketAddr; + +/// Default Zipkin collector endpoint +const DEFAULT_COLLECTOR_ENDPOINT: &str = "http://127.0.0.1:9411/api/v2/spans"; + +/// Default service name if no service is configured. +const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry"; + +/// Zipkin span exporter +#[derive(Debug)] +pub struct Exporter { + local_endpoint: Endpoint, + uploader: uploader::Uploader, +} + +impl Exporter { + fn new(local_endpoint: Endpoint, client: Box, collector_endpoint: Uri) -> Self { + Exporter { + local_endpoint, + uploader: uploader::Uploader::new(client, collector_endpoint), + } + } +} + +/// Create a new Zipkin exporter pipeline builder. +pub fn new_pipeline() -> ZipkinPipelineBuilder { + ZipkinPipelineBuilder::default() +} + +/// Builder for `ExporterConfig` struct. +#[derive(Debug)] +pub struct ZipkinPipelineBuilder { + service_name: String, + service_addr: Option, + collector_endpoint: String, + trace_config: Option, + client: Option>, +} + +impl Default for ZipkinPipelineBuilder { + fn default() -> Self { + ZipkinPipelineBuilder { + #[cfg(feature = "reqwest-blocking-client")] + client: Some(Box::new(reqwest::blocking::Client::new())), + #[cfg(all( + not(feature = "reqwest-blocking-client"), + not(feature = "surf-client"), + feature = "reqwest-client" + ))] + client: Some(Box::new(reqwest::Client::new())), + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + feature = "surf-client" + ))] + client: Some(Box::new(surf::Client::new())), + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "surf-client"), + not(feature = "reqwest-blocking-client") + ))] + client: None, + + service_name: DEFAULT_SERVICE_NAME.to_string(), + service_addr: None, + collector_endpoint: DEFAULT_COLLECTOR_ENDPOINT.to_string(), + trace_config: None, + } + } +} + +impl ZipkinPipelineBuilder { + /// Create `ExporterConfig` struct from current `ExporterConfigBuilder` + pub fn install(mut self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> { + if let Some(client) = self.client { + let endpoint = Endpoint::new(self.service_name, self.service_addr); + let exporter = Exporter::new( + endpoint, + client, + self.collector_endpoint + .parse() + .map_err::(Into::into)?, + ); + + let mut provider_builder = + sdk::trace::TracerProvider::builder().with_exporter(exporter); + if let Some(config) = self.trace_config.take() { + provider_builder = provider_builder.with_config(config); + } + let provider = provider_builder.build(); + let tracer = + provider.get_tracer("opentelemetry-zipkin", Some(env!("CARGO_PKG_VERSION"))); + let provider_guard = global::set_tracer_provider(provider); + + Ok((tracer, Uninstall(provider_guard))) + } else { + Err(Error::NoHttpClient.into()) + } + } + + /// Assign the service name under which to group traces. + pub fn with_service_name>(mut self, name: T) -> Self { + self.service_name = name.into(); + self + } + + /// Assign client implementation + pub fn with_http_client(mut self, client: T) -> Self { + self.client = Some(Box::new(client)); + self + } + + /// Assign the service name under which to group traces. + pub fn with_service_address(mut self, addr: SocketAddr) -> Self { + self.service_addr = Some(addr); + self + } + + /// Assign the Zipkin collector endpoint + pub fn with_collector_endpoint>(mut self, endpoint: T) -> Self { + self.collector_endpoint = endpoint.into(); + self + } + + /// Assign the SDK trace configuration. + pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { + self.trace_config = Some(config); + self + } +} + +#[async_trait] +impl trace::SpanExporter for Exporter { + /// Export spans to Zipkin collector. + async fn export(&mut self, batch: Vec) -> trace::ExportResult { + let zipkin_spans = batch + .into_iter() + .map(|span| model::into_zipkin_span(self.local_endpoint.clone(), span)) + .collect(); + + self.uploader.upload(zipkin_spans).await + } +} + +/// Uninstalls the Zipkin pipeline on drop. +#[must_use] +#[derive(Debug)] +pub struct Uninstall(global::TracerProviderGuard); + +/// Wrap type for errors from opentelemetry zipkin +#[derive(thiserror::Error, Debug)] +#[non_exhaustive] +pub enum Error { + /// No http client implementation found. User should provide one or enable features. + #[error("http client must be set, users can enable reqwest or surf feature to use http client implementation within create")] + NoHttpClient, + + /// Http requests failed + #[error("http request failed with {0}")] + RequestFailed(#[from] http::Error), + + /// The uri provided is invalid + #[error("invalid uri")] + InvalidUri(#[from] http::uri::InvalidUri), + + /// Other errors + #[error("export error: {0}")] + Other(String), +} + +impl ExportError for Error { + fn exporter_name(&self) -> &'static str { + "zipkin" + } +} diff --git a/opentelemetry-zipkin/src/model/annotation.rs b/opentelemetry-zipkin/src/exporter/model/annotation.rs similarity index 94% rename from opentelemetry-zipkin/src/model/annotation.rs rename to opentelemetry-zipkin/src/exporter/model/annotation.rs index 451aa03057..2f7bbb2f4c 100644 --- a/opentelemetry-zipkin/src/model/annotation.rs +++ b/opentelemetry-zipkin/src/exporter/model/annotation.rs @@ -13,7 +13,7 @@ pub struct Annotation { #[cfg(test)] mod tests { - use crate::model::annotation::Annotation; + use crate::exporter::model::annotation::Annotation; #[test] fn test_empty() { diff --git a/opentelemetry-zipkin/src/model/endpoint.rs b/opentelemetry-zipkin/src/exporter/model/endpoint.rs similarity index 97% rename from opentelemetry-zipkin/src/model/endpoint.rs rename to opentelemetry-zipkin/src/exporter/model/endpoint.rs index 66612057b2..a70e4feb3f 100644 --- a/opentelemetry-zipkin/src/model/endpoint.rs +++ b/opentelemetry-zipkin/src/exporter/model/endpoint.rs @@ -38,7 +38,7 @@ impl Endpoint { #[cfg(test)] mod tests { - use crate::model::endpoint::Endpoint; + use crate::exporter::model::endpoint::Endpoint; use std::net::Ipv4Addr; #[test] diff --git a/opentelemetry-zipkin/src/model/mod.rs b/opentelemetry-zipkin/src/exporter/model/mod.rs similarity index 100% rename from opentelemetry-zipkin/src/model/mod.rs rename to opentelemetry-zipkin/src/exporter/model/mod.rs diff --git a/opentelemetry-zipkin/src/model/span.rs b/opentelemetry-zipkin/src/exporter/model/span.rs similarity index 94% rename from opentelemetry-zipkin/src/model/span.rs rename to opentelemetry-zipkin/src/exporter/model/span.rs index 3f34efe03e..55e9825b46 100644 --- a/opentelemetry-zipkin/src/model/span.rs +++ b/opentelemetry-zipkin/src/exporter/model/span.rs @@ -1,4 +1,4 @@ -use crate::model::{annotation::Annotation, endpoint::Endpoint}; +use crate::exporter::model::{annotation::Annotation, endpoint::Endpoint}; use serde::Serialize; use std::collections::HashMap; @@ -55,9 +55,9 @@ pub(crate) struct Span { #[cfg(test)] mod tests { - use crate::model::annotation::Annotation; - use crate::model::endpoint::Endpoint; - use crate::model::span::{Kind, Span}; + use crate::exporter::model::annotation::Annotation; + use crate::exporter::model::endpoint::Endpoint; + use crate::exporter::model::span::{Kind, Span}; use std::collections::HashMap; use std::net::Ipv4Addr; diff --git a/opentelemetry-zipkin/src/uploader.rs b/opentelemetry-zipkin/src/exporter/uploader.rs similarity index 94% rename from opentelemetry-zipkin/src/uploader.rs rename to opentelemetry-zipkin/src/exporter/uploader.rs index c89392ab80..27751e3a12 100644 --- a/opentelemetry-zipkin/src/uploader.rs +++ b/opentelemetry-zipkin/src/exporter/uploader.rs @@ -1,6 +1,6 @@ //! # Zipkin Span Exporter -use crate::model::span::Span; -use crate::Error; +use crate::exporter::model::span::Span; +use crate::exporter::Error; use http::{header::CONTENT_TYPE, Method, Request, Uri}; use opentelemetry::sdk::export::trace::{ExportResult, HttpClient}; use std::fmt::Debug; diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index e41dec640e..737359f786 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -22,8 +22,10 @@ //! //! ```no_run //! use opentelemetry::trace::{Tracer, TraceError}; +//! use opentelemetry::global; //! //! fn main() -> Result<(), TraceError> { +//! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); //! let (tracer, _uninstall) = opentelemetry_zipkin::new_pipeline().install()?; //! //! tracer.in_span("doing_work", |cx| { @@ -78,6 +80,7 @@ //! use opentelemetry::{KeyValue, trace::Tracer}; //! use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource}; //! use opentelemetry::sdk::export::trace::{ExportResult, HttpClient}; +//! use opentelemetry::global; //! use async_trait::async_trait; //! use std::error::Error; //! @@ -101,6 +104,7 @@ //! } //! //! fn main() -> Result<(), Box> { +//! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); //! let (tracer, _uninstall) = opentelemetry_zipkin::new_pipeline() //! .with_http_client(IsahcClient(isahc::HttpClient::new()?)) //! .with_service_name("my_app") @@ -165,192 +169,8 @@ #[macro_use] extern crate typed_builder; -mod model; -mod uploader; +mod exporter; +mod propagator; -use async_trait::async_trait; -use http::Uri; -use model::endpoint::Endpoint; -use opentelemetry::{ - global, sdk, - sdk::export::{ - trace::{self, HttpClient}, - ExportError, - }, - trace::{TraceError, TracerProvider}, -}; -use std::net::SocketAddr; - -/// Default Zipkin collector endpoint -const DEFAULT_COLLECTOR_ENDPOINT: &str = "http://127.0.0.1:9411/api/v2/spans"; - -/// Default service name if no service is configured. -const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry"; - -/// Zipkin span exporter -#[derive(Debug)] -pub struct Exporter { - local_endpoint: Endpoint, - uploader: uploader::Uploader, -} - -impl Exporter { - fn new(local_endpoint: Endpoint, client: Box, collector_endpoint: Uri) -> Self { - Exporter { - local_endpoint, - uploader: uploader::Uploader::new(client, collector_endpoint), - } - } -} - -/// Create a new Zipkin exporter pipeline builder. -pub fn new_pipeline() -> ZipkinPipelineBuilder { - ZipkinPipelineBuilder::default() -} - -/// Builder for `ExporterConfig` struct. -#[derive(Debug)] -pub struct ZipkinPipelineBuilder { - service_name: String, - service_addr: Option, - collector_endpoint: String, - trace_config: Option, - client: Option>, -} - -impl Default for ZipkinPipelineBuilder { - fn default() -> Self { - ZipkinPipelineBuilder { - #[cfg(feature = "reqwest-blocking-client")] - client: Some(Box::new(reqwest::blocking::Client::new())), - #[cfg(all( - not(feature = "reqwest-blocking-client"), - not(feature = "surf-client"), - feature = "reqwest-client" - ))] - client: Some(Box::new(reqwest::Client::new())), - #[cfg(all( - not(feature = "reqwest-client"), - not(feature = "reqwest-blocking-client"), - feature = "surf-client" - ))] - client: Some(Box::new(surf::Client::new())), - #[cfg(all( - not(feature = "reqwest-client"), - not(feature = "surf-client"), - not(feature = "reqwest-blocking-client") - ))] - client: None, - - service_name: DEFAULT_SERVICE_NAME.to_string(), - service_addr: None, - collector_endpoint: DEFAULT_COLLECTOR_ENDPOINT.to_string(), - trace_config: None, - } - } -} - -impl ZipkinPipelineBuilder { - /// Create `ExporterConfig` struct from current `ExporterConfigBuilder` - pub fn install(mut self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> { - if let Some(client) = self.client { - let endpoint = Endpoint::new(self.service_name, self.service_addr); - let exporter = Exporter::new( - endpoint, - client, - self.collector_endpoint - .parse() - .map_err::(Into::into)?, - ); - - let mut provider_builder = - sdk::trace::TracerProvider::builder().with_exporter(exporter); - if let Some(config) = self.trace_config.take() { - provider_builder = provider_builder.with_config(config); - } - let provider = provider_builder.build(); - let tracer = - provider.get_tracer("opentelemetry-zipkin", Some(env!("CARGO_PKG_VERSION"))); - let provider_guard = global::set_tracer_provider(provider); - - Ok((tracer, Uninstall(provider_guard))) - } else { - Err(Error::NoHttpClient.into()) - } - } - - /// Assign the service name under which to group traces. - pub fn with_service_name>(mut self, name: T) -> Self { - self.service_name = name.into(); - self - } - - /// Assign client implementation - pub fn with_http_client(mut self, client: T) -> Self { - self.client = Some(Box::new(client)); - self - } - - /// Assign the service name under which to group traces. - pub fn with_service_address(mut self, addr: SocketAddr) -> Self { - self.service_addr = Some(addr); - self - } - - /// Assign the Zipkin collector endpoint - pub fn with_collector_endpoint>(mut self, endpoint: T) -> Self { - self.collector_endpoint = endpoint.into(); - self - } - - /// Assign the SDK trace configuration. - pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { - self.trace_config = Some(config); - self - } -} - -#[async_trait] -impl trace::SpanExporter for Exporter { - /// Export spans to Zipkin collector. - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let zipkin_spans = batch - .into_iter() - .map(|span| model::into_zipkin_span(self.local_endpoint.clone(), span)) - .collect(); - - self.uploader.upload(zipkin_spans).await - } -} - -/// Uninstalls the Zipkin pipeline on drop. -#[must_use] -#[derive(Debug)] -pub struct Uninstall(global::TracerProviderGuard); - -/// Wrap type for errors from opentelemetry zipkin -#[derive(thiserror::Error, Debug)] -#[non_exhaustive] -pub enum Error { - /// No http client implementation found. User should provide one or enable features. - #[error("http client must be set, users can enable reqwest or surf feature to use http client implementation within create")] - NoHttpClient, - - /// Http requests failed - #[error("http request failed with {0}")] - RequestFailed(#[from] http::Error), - - /// The uri provided is invalid - #[error("invalid uri")] - InvalidUri(#[from] http::uri::InvalidUri), - - /// Other errors - #[error("export error: {0}")] - Other(String), -} - -impl ExportError for Error { - fn exporter_name(&self) -> &'static str { - "zipkin" - } -} +pub use exporter::{new_pipeline, Error, Exporter, Uninstall, ZipkinPipelineBuilder}; +pub use propagator::{B3Encoding, Propagator}; diff --git a/opentelemetry-contrib/src/trace/propagator/b3.rs b/opentelemetry-zipkin/src/propagator/mod.rs similarity index 96% rename from opentelemetry-contrib/src/trace/propagator/b3.rs rename to opentelemetry-zipkin/src/propagator/mod.rs index f52868bb01..9a3f2158c3 100644 --- a/opentelemetry-contrib/src/trace/propagator/b3.rs +++ b/opentelemetry-zipkin/src/propagator/mod.rs @@ -65,27 +65,27 @@ impl B3Encoding { /// Extracts and injects `SpanContext`s into `Extractor`s or `Injector`s using B3 header format. #[derive(Clone, Debug)] -pub struct B3Propagator { +pub struct Propagator { inject_encoding: B3Encoding, } -impl Default for B3Propagator { +impl Default for Propagator { fn default() -> Self { - B3Propagator { + Propagator { inject_encoding: B3Encoding::MultipleHeader, } } } -impl B3Propagator { +impl Propagator { /// Create a new `HttpB3Propagator` that uses multiple headers. pub fn new() -> Self { - B3Propagator::default() + Propagator::default() } /// Create a new `HttpB3Propagator` that uses `encoding` as encoding method pub fn with_encoding(encoding: B3Encoding) -> Self { - B3Propagator { + Propagator { inject_encoding: encoding, } } @@ -207,7 +207,7 @@ impl B3Propagator { } } -impl TextMapPropagator for B3Propagator { +impl TextMapPropagator for Propagator { /// Properly encodes the values of the `Context`'s `SpanContext` and injects /// them into the `Injector`. fn inject_context(&self, context: &Context, injector: &mut dyn Injector) { @@ -466,10 +466,10 @@ mod tests { #[test] fn extract_b3() { - let single_header_propagator = B3Propagator::with_encoding(B3Encoding::SingleHeader); - let multi_header_propagator = B3Propagator::with_encoding(B3Encoding::MultipleHeader); - let single_multi_propagator = B3Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); - let unspecific_header_propagator = B3Propagator::with_encoding(B3Encoding::UnSpecified); + let single_header_propagator = Propagator::with_encoding(B3Encoding::SingleHeader); + let multi_header_propagator = Propagator::with_encoding(B3Encoding::MultipleHeader); + let single_multi_propagator = Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); + let unspecific_header_propagator = Propagator::with_encoding(B3Encoding::UnSpecified); for (header, expected_context) in single_header_extract_data() { let mut extractor: HashMap = HashMap::new(); @@ -552,11 +552,11 @@ mod tests { #[test] fn inject_b3() { - let single_header_propagator = B3Propagator::with_encoding(B3Encoding::SingleHeader); - let multi_header_propagator = B3Propagator::with_encoding(B3Encoding::MultipleHeader); + let single_header_propagator = Propagator::with_encoding(B3Encoding::SingleHeader); + let multi_header_propagator = Propagator::with_encoding(B3Encoding::MultipleHeader); let single_multi_header_propagator = - B3Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); - let unspecified_header_propagator = B3Propagator::with_encoding(B3Encoding::UnSpecified); + Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); + let unspecified_header_propagator = Propagator::with_encoding(B3Encoding::UnSpecified); for (expected_header, context) in single_header_inject_data() { let mut injector = HashMap::new(); @@ -645,10 +645,10 @@ mod tests { #[test] fn test_get_fields() { - let single_header_propagator = B3Propagator::with_encoding(B3Encoding::SingleHeader); - let multi_header_propagator = B3Propagator::with_encoding(B3Encoding::MultipleHeader); + let single_header_propagator = Propagator::with_encoding(B3Encoding::SingleHeader); + let multi_header_propagator = Propagator::with_encoding(B3Encoding::MultipleHeader); let single_multi_header_propagator = - B3Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); + Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); assert_eq!( single_header_propagator.fields().collect::>(), diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index 9e6569dc40..dc4f8a93f6 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -53,7 +53,7 @@ default = ["trace"] trace = ["rand", "pin-project", "async-trait", "regex", "percent-encoding", "thiserror"] metrics = ["thiserror", "dashmap", "fnv"] serialize = ["serde", "bincode"] -testing = ["trace", "metrics", "tokio" ] +testing = ["trace", "metrics", "tokio/full" ] [[bench]] name = "trace"