From 5192fba4a978be48b56708eed991babe4e0d8186 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 6 Dec 2020 19:32:52 -0500 Subject: [PATCH] Move 3rd party propagators and merge exporter into sdk::export (#375) --- .github/workflows/ci.yml | 2 +- CONTRIBUTING.md | 4 + README.md | 2 +- examples/aws-xray/Cargo.toml | 2 +- examples/aws-xray/src/client.rs | 5 +- examples/aws-xray/src/server.rs | 5 +- examples/basic-otlp/src/main.rs | 3 +- examples/basic/src/main.rs | 3 +- examples/http/src/client.rs | 2 +- examples/http/src/server.rs | 2 +- examples/stdout.rs | 2 +- opentelemetry-contrib/Cargo.toml | 2 + .../src/trace/exporter/datadog/mod.rs | 8 +- .../src/trace/exporter/datadog/model/mod.rs | 2 +- .../src/trace/exporter/datadog/model/v03.rs | 2 +- .../src/trace/exporter/datadog/model/v05.rs | 4 +- .../src/trace/propagator}/aws.rs | 8 +- .../src/trace/propagator/mod.rs | 8 + opentelemetry-jaeger/Cargo.toml | 4 + opentelemetry-jaeger/README.md | 8 +- .../src/{ => exporter}/agent.rs | 4 +- .../src/{ => exporter}/collector.rs | 4 +- .../src/{ => exporter}/env.rs | 0 opentelemetry-jaeger/src/exporter/mod.rs | 519 +++++++++++++++++ .../src/{ => exporter}/thrift/agent.rs | 0 .../src/{ => exporter}/thrift/jaeger.rs | 0 .../src/{ => exporter}/thrift/mod.rs | 0 .../src/{ => exporter}/thrift/zipkincore.rs | 0 .../src/{ => exporter}/transport/buffer.rs | 0 .../src/{ => exporter}/transport/mod.rs | 0 .../src/{ => exporter}/transport/noop.rs | 0 .../src/{ => exporter}/uploader.rs | 6 +- opentelemetry-jaeger/src/lib.rs | 526 +----------------- .../src/propagator/mod.rs | 30 +- opentelemetry-otlp/src/lib.rs | 2 +- opentelemetry-otlp/src/proto/grpcio/common.rs | 4 +- .../src/proto/grpcio/metrics.rs | 4 +- .../src/proto/grpcio/metrics_service.rs | 4 +- .../src/proto/grpcio/resource.rs | 4 +- opentelemetry-otlp/src/proto/grpcio/trace.rs | 4 +- .../src/proto/grpcio/trace_config.rs | 4 +- .../src/proto/grpcio/trace_service.rs | 4 +- .../src/proto/opentelemetry-proto | 2 +- opentelemetry-otlp/src/span.rs | 3 +- opentelemetry-otlp/src/transform/traces.rs | 2 +- .../src/resource.rs | 2 +- opentelemetry-zipkin/Cargo.toml | 1 + opentelemetry-zipkin/README.md | 6 +- opentelemetry-zipkin/src/exporter/mod.rs | 189 +++++++ .../src/{ => exporter}/model/annotation.rs | 2 +- .../src/{ => exporter}/model/endpoint.rs | 2 +- .../src/{ => exporter}/model/mod.rs | 2 +- .../src/{ => exporter}/model/span.rs | 8 +- .../src/{ => exporter}/uploader.rs | 6 +- opentelemetry-zipkin/src/lib.rs | 198 +------ .../src/propagator/mod.rs | 42 +- opentelemetry/Cargo.toml | 1 + opentelemetry/README.md | 2 +- opentelemetry/src/api/metrics/mod.rs | 2 +- opentelemetry/src/api/trace/mod.rs | 2 +- opentelemetry/src/api/trace/noop.rs | 2 +- opentelemetry/src/exporter/metrics/mod.rs | 4 - opentelemetry/src/exporter/mod.rs | 22 - opentelemetry/src/global/mod.rs | 4 +- opentelemetry/src/lib.rs | 8 +- opentelemetry/src/sdk/export/metrics/mod.rs | 2 + .../export}/metrics/stdout.rs | 0 opentelemetry/src/sdk/export/mod.rs | 9 + .../src/{exporter => sdk/export}/trace/mod.rs | 2 +- .../{exporter => sdk/export}/trace/stdout.rs | 6 +- opentelemetry/src/sdk/propagation/mod.rs | 6 - opentelemetry/src/sdk/trace/provider.rs | 2 +- opentelemetry/src/sdk/trace/span.rs | 6 +- opentelemetry/src/sdk/trace/span_processor.rs | 4 +- opentelemetry/src/testing/trace.rs | 18 +- 75 files changed, 897 insertions(+), 867 deletions(-) rename {opentelemetry/src/sdk/propagation => opentelemetry-contrib/src/trace/propagator}/aws.rs (98%) rename opentelemetry-jaeger/src/{ => exporter}/agent.rs (97%) rename opentelemetry-jaeger/src/{ => exporter}/collector.rs (99%) rename opentelemetry-jaeger/src/{ => exporter}/env.rs (100%) create mode 100644 opentelemetry-jaeger/src/exporter/mod.rs rename opentelemetry-jaeger/src/{ => exporter}/thrift/agent.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/thrift/jaeger.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/thrift/mod.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/thrift/zipkincore.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/transport/buffer.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/transport/mod.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/transport/noop.rs (100%) rename opentelemetry-jaeger/src/{ => exporter}/uploader.rs (92%) rename opentelemetry/src/sdk/propagation/jaeger.rs => opentelemetry-jaeger/src/propagator/mod.rs (95%) create mode 100644 opentelemetry-zipkin/src/exporter/mod.rs rename opentelemetry-zipkin/src/{ => exporter}/model/annotation.rs (94%) rename opentelemetry-zipkin/src/{ => exporter}/model/endpoint.rs (97%) rename opentelemetry-zipkin/src/{ => exporter}/model/mod.rs (99%) rename opentelemetry-zipkin/src/{ => exporter}/model/span.rs (94%) rename opentelemetry-zipkin/src/{ => exporter}/uploader.rs (89%) rename opentelemetry/src/sdk/propagation/b3.rs => opentelemetry-zipkin/src/propagator/mod.rs (96%) delete mode 100644 opentelemetry/src/exporter/metrics/mod.rs delete mode 100644 opentelemetry/src/exporter/mod.rs rename opentelemetry/src/{exporter => sdk/export}/metrics/stdout.rs (100%) rename opentelemetry/src/{exporter => sdk/export}/trace/mod.rs (99%) rename opentelemetry/src/{exporter => sdk/export}/trace/stdout.rs (98%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9c697a688..a887ff9bf6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,7 +52,7 @@ jobs: override: true - name: Run tests run: cargo --version && - cargo test --verbose --manifest-path=opentelemetry/Cargo.toml --features trace,metrics,serialize,tokio,serde,http,tonic,reqwest && + cargo test --verbose --manifest-path=opentelemetry/Cargo.toml --features trace,metrics,serialize,tokio,serde,http,tonic,reqwest,testing && cargo test --manifest-path=opentelemetry-jaeger/Cargo.toml && cargo test --manifest-path=opentelemetry-zipkin/Cargo.toml meta: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 5864fc4989..59a17c8dc1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -129,3 +129,7 @@ See the [code owners](CODEOWNERS) file. See the [community membership document in OpenTelemetry community repo](https://github.com/open-telemetry/community/blob/master/community-membership.md). + +## FAQ +### Where should I put third party propagators/exporters, contrib or standalone crates? +As of now, the specification classify the propagators into three categories: Fully opened standards, platform-specific standards, proprietary headers. The conclusion is only the fully opened standards should live in SDK packages/repos. So here, only fully opened standards should live as independent crate. For more detail and discussion, see [this pr](https://github.com/open-telemetry/opentelemetry-specification/pull/1144). \ No newline at end of file diff --git a/README.md b/README.md index 02eda2cf05..508abe7072 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ observability tools. ## Getting Started ```rust -use opentelemetry::{exporter::trace::stdout, trace::Tracer}; +use opentelemetry::{sdk::export::trace::stdout, trace::Tracer}; fn main() -> Result<(), Box> { // Create a new instrumentation pipeline diff --git a/examples/aws-xray/Cargo.toml b/examples/aws-xray/Cargo.toml index 0aeda2916c..5b7d2302c7 100644 --- a/examples/aws-xray/Cargo.toml +++ b/examples/aws-xray/Cargo.toml @@ -17,4 +17,4 @@ path = "src/client.rs" hyper = "0.13" tokio = { version = "0.2", features = ["full"] } opentelemetry = { path = "../../opentelemetry", features = ["http"] } -opentelemetry-contrib = { path = "../../opentelemetry-contrib" } +opentelemetry-contrib = { path = "../../opentelemetry-contrib", features = ["aws-xray"] } diff --git a/examples/aws-xray/src/client.rs b/examples/aws-xray/src/client.rs index 91d55b669b..1023162ffa 100644 --- a/examples/aws-xray/src/client.rs +++ b/examples/aws-xray/src/client.rs @@ -1,11 +1,12 @@ use hyper::{body::Body, Client}; use opentelemetry::{ - exporter::trace::stdout, global, - sdk::{propagation::XrayPropagator, trace as sdktrace}, + sdk::export::trace::stdout, + sdk::trace as sdktrace, trace::{TraceContextExt, Tracer}, Context, KeyValue, }; +use opentelemetry_contrib::trace::propagator::XrayPropagator; fn init_tracer() -> (sdktrace::Tracer, stdout::Uninstall) { global::set_text_map_propagator(XrayPropagator::new()); diff --git a/examples/aws-xray/src/server.rs b/examples/aws-xray/src/server.rs index 49a9463c04..1631084534 100644 --- a/examples/aws-xray/src/server.rs +++ b/examples/aws-xray/src/server.rs @@ -1,11 +1,12 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use opentelemetry::{ - exporter::trace::stdout, global, - sdk::{propagation::XrayPropagator, trace as sdktrace}, + sdk::export::trace::stdout, + sdk::trace as sdktrace, trace::{Span, Tracer}, }; +use opentelemetry_contrib::trace::propagator::XrayPropagator; use std::{convert::Infallible, net::SocketAddr}; async fn handle(req: Request) -> Result, Infallible> { diff --git a/examples/basic-otlp/src/main.rs b/examples/basic-otlp/src/main.rs index 74c140ade9..5e05b70497 100644 --- a/examples/basic-otlp/src/main.rs +++ b/examples/basic-otlp/src/main.rs @@ -1,5 +1,4 @@ use futures::stream::{Stream, StreamExt}; -use opentelemetry::exporter; use opentelemetry::sdk::metrics::PushController; use opentelemetry::trace::TraceError; use opentelemetry::{ @@ -22,7 +21,7 @@ fn delayed_interval(duration: Duration) -> impl Stream metrics::Result { - exporter::metrics::stdout(tokio::spawn, delayed_interval) + opentelemetry::sdk::export::metrics::stdout(tokio::spawn, delayed_interval) .with_quantiles(vec![0.5, 0.9, 0.99]) .with_formatter(|batch| { serde_json::to_value(batch) diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index fee5280057..092b0e3433 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -1,5 +1,4 @@ use futures::stream::{Stream, StreamExt}; -use opentelemetry::exporter; use opentelemetry::global; use opentelemetry::sdk::{metrics::PushController, trace as sdktrace}; use opentelemetry::trace::TraceError; @@ -28,7 +27,7 @@ fn delayed_interval(duration: Duration) -> impl Stream metrics::Result { - exporter::metrics::stdout(tokio::spawn, delayed_interval) + opentelemetry::sdk::export::metrics::stdout(tokio::spawn, delayed_interval) .with_quantiles(vec![0.5, 0.9, 0.99]) .with_formatter(|batch| { serde_json::to_value(batch) diff --git a/examples/http/src/client.rs b/examples/http/src/client.rs index bfd2b455b0..aa80392a84 100644 --- a/examples/http/src/client.rs +++ b/examples/http/src/client.rs @@ -1,6 +1,6 @@ use hyper::{body::Body, Client}; -use opentelemetry::exporter::trace::stdout; use opentelemetry::global; +use opentelemetry::sdk::export::trace::stdout; use opentelemetry::sdk::{ propagation::TraceContextPropagator, trace::{Config, Sampler}, diff --git a/examples/http/src/server.rs b/examples/http/src/server.rs index 89528bf3d6..5e116824f7 100644 --- a/examples/http/src/server.rs +++ b/examples/http/src/server.rs @@ -1,8 +1,8 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use opentelemetry::{ - exporter::trace::stdout, global, + sdk::export::trace::stdout, sdk::{ propagation::TraceContextPropagator, trace::{Config, Sampler}, diff --git a/examples/stdout.rs b/examples/stdout.rs index cdbb814056..7b7b3c50b3 100644 --- a/examples/stdout.rs +++ b/examples/stdout.rs @@ -1,5 +1,5 @@ use opentelemetry::{ - exporter::trace::stdout, + sdk::export::trace::stdout, sdk::trace::{self, Sampler}, trace::Tracer, }; diff --git a/opentelemetry-contrib/Cargo.toml b/opentelemetry-contrib/Cargo.toml index d6c5b81cbe..68598bfa60 100644 --- a/opentelemetry-contrib/Cargo.toml +++ b/opentelemetry-contrib/Cargo.toml @@ -26,6 +26,7 @@ datadog = ["indexmap", "rmp", "async-trait", "thiserror"] reqwest-blocking-client = ["reqwest/blocking", "opentelemetry/reqwest"] reqwest-client = ["reqwest", "opentelemetry/reqwest"] surf-client = ["surf", "opentelemetry/surf"] +aws-xray = [] [dependencies] async-trait = { version = "0.1", optional = true } @@ -42,3 +43,4 @@ thiserror = { version = "1.0", optional = true } [dev-dependencies] base64 = "0.13" isahc = "0.9" +opentelemetry = { path = "../opentelemetry", features = ["trace", "http", "testing"] } \ No newline at end of file diff --git a/opentelemetry-contrib/src/trace/exporter/datadog/mod.rs b/opentelemetry-contrib/src/trace/exporter/datadog/mod.rs index 727d6f5440..e7ee4b4b9a 100644 --- a/opentelemetry-contrib/src/trace/exporter/datadog/mod.rs +++ b/opentelemetry-contrib/src/trace/exporter/datadog/mod.rs @@ -72,8 +72,8 @@ //! ```no_run //! use opentelemetry::{KeyValue, trace::Tracer}; //! use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource}; -//! use opentelemetry::exporter::trace::ExportResult; -//! use opentelemetry::exporter::trace::HttpClient; +//! use opentelemetry::sdk::export::trace::ExportResult; +//! use opentelemetry::sdk::export::trace::HttpClient; //! use opentelemetry_contrib::trace::exporter::datadog::{new_pipeline, ApiVersion}; //! use async_trait::async_trait; //! use opentelemetry_contrib::trace::exporter::datadog::Error; @@ -127,8 +127,8 @@ pub use model::Error; use async_trait::async_trait; use http::{Method, Request, Uri}; -use opentelemetry::exporter::trace; -use opentelemetry::exporter::trace::{HttpClient, SpanData}; +use opentelemetry::sdk::export::trace; +use opentelemetry::sdk::export::trace::{HttpClient, SpanData}; use opentelemetry::trace::TraceError; use opentelemetry::{global, sdk, trace::TracerProvider}; diff --git a/opentelemetry-contrib/src/trace/exporter/datadog/model/mod.rs b/opentelemetry-contrib/src/trace/exporter/datadog/model/mod.rs index a24a98c740..39e80ca1a2 100644 --- a/opentelemetry-contrib/src/trace/exporter/datadog/model/mod.rs +++ b/opentelemetry-contrib/src/trace/exporter/datadog/model/mod.rs @@ -1,4 +1,4 @@ -use opentelemetry::exporter::{trace, ExportError}; +use opentelemetry::sdk::export::{trace, ExportError}; mod v03; mod v05; diff --git a/opentelemetry-contrib/src/trace/exporter/datadog/model/v03.rs b/opentelemetry-contrib/src/trace/exporter/datadog/model/v03.rs index 2542f4c200..f7f94d6fc5 100644 --- a/opentelemetry-contrib/src/trace/exporter/datadog/model/v03.rs +++ b/opentelemetry-contrib/src/trace/exporter/datadog/model/v03.rs @@ -1,5 +1,5 @@ use crate::trace::exporter::datadog::model::Error; -use opentelemetry::exporter::trace; +use opentelemetry::sdk::export::trace; use opentelemetry::{Key, Value}; use std::time::SystemTime; diff --git a/opentelemetry-contrib/src/trace/exporter/datadog/model/v05.rs b/opentelemetry-contrib/src/trace/exporter/datadog/model/v05.rs index ecd15d899d..310b70e003 100644 --- a/opentelemetry-contrib/src/trace/exporter/datadog/model/v05.rs +++ b/opentelemetry-contrib/src/trace/exporter/datadog/model/v05.rs @@ -1,6 +1,6 @@ use crate::trace::exporter::datadog::intern::StringInterner; -use crate::trace::exporter::datadog::model::Error; -use opentelemetry::exporter::trace; +use crate::trace::exporter::datadog::Error; +use opentelemetry::sdk::export::trace; use opentelemetry::{Key, Value}; use std::time::SystemTime; diff --git a/opentelemetry/src/sdk/propagation/aws.rs b/opentelemetry-contrib/src/trace/propagator/aws.rs similarity index 98% rename from opentelemetry/src/sdk/propagation/aws.rs rename to opentelemetry-contrib/src/trace/propagator/aws.rs index e04936a54b..1bbbde2117 100644 --- a/opentelemetry/src/sdk/propagation/aws.rs +++ b/opentelemetry-contrib/src/trace/propagator/aws.rs @@ -1,4 +1,4 @@ -use crate::{ +use opentelemetry::{ propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator}, trace::{ SpanContext, SpanId, TraceContextExt, TraceId, TraceState, TRACE_FLAG_DEFERRED, @@ -32,7 +32,7 @@ lazy_static::lazy_static! { /// /// ``` /// use opentelemetry::global; -/// use opentelemetry::sdk::propagation::XrayPropagator; +/// use opentelemetry_contrib::trace::propagator::XrayPropagator; /// /// global::set_text_map_propagator(XrayPropagator::default()); /// ``` @@ -236,8 +236,8 @@ fn title_case(s: &str) -> String { #[cfg(test)] mod tests { use super::*; - use crate::testing::trace::TestSpan; - use crate::trace::TraceState; + use opentelemetry::testing::trace::TestSpan; + use opentelemetry::trace::TraceState; use std::collections::HashMap; use std::str::FromStr; diff --git a/opentelemetry-contrib/src/trace/propagator/mod.rs b/opentelemetry-contrib/src/trace/propagator/mod.rs index aed4750cbf..69e26c8796 100644 --- a/opentelemetry-contrib/src/trace/propagator/mod.rs +++ b/opentelemetry-contrib/src/trace/propagator/mod.rs @@ -6,4 +6,12 @@ //! Currently, the following propagators are supported: //! //! * `binary_propagator`, propagating trace context in the binary format. +//! * `XrayPropagator`, propagating via AWS XRay protocol. +//! +//! This module also provides relative types for those propagators. +#[cfg(feature = "aws-xray")] +mod aws; pub mod binary; + +#[cfg(feature = "aws-xray")] +pub use aws::XrayPropagator; diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index 6db9117e7c..1db2a3a9a5 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -34,6 +34,10 @@ tokio = { version = "0.2", features = ["udp", "sync"], optional = true } wasm-bindgen = { version = "0.2", optional = true } wasm-bindgen-futures = { version = "0.4.18", optional = true } thiserror = "1.0" +lazy_static = "1.4" + +[dev-dependencies] +opentelemetry = { version = "0.10", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" } [dependencies.web-sys] version = "0.3.4" diff --git a/opentelemetry-jaeger/README.md b/opentelemetry-jaeger/README.md index f570d0c320..0044645d4f 100644 --- a/opentelemetry-jaeger/README.md +++ b/opentelemetry-jaeger/README.md @@ -4,7 +4,7 @@ # OpenTelemetry Jaeger -[`Jaeger`] integration for applications instrumented with [`OpenTelemetry`]. +[`Jaeger`] integration for applications instrumented with [`OpenTelemetry`]. This includes a jaeger exporter and a jaeger propagator. [![Crates.io: opentelemetry-jaeger](https://img.shields.io/crates/v/opentelemetry-jaeger.svg)](https://crates.io/crates/opentelemetry-jaeger) [![Documentation](https://docs.rs/opentelemetry-jaeger/badge.svg)](https://docs.rs/opentelemetry-jaeger) @@ -43,8 +43,10 @@ exporting telemetry: ```rust use opentelemetry::tracer; +use opentelemetry::global; fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().install()?; tracer.in_span("doing_work", |cx| { @@ -84,8 +86,10 @@ in the [jaeger variables spec]. ```rust use opentelemetry::tracer; +use opentelemetry::global; fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); // export OTEL_SERVICE_NAME=my-service-name let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().from_env().install()?; @@ -142,8 +146,10 @@ Example showing how to override all configuration options. See the ```rust use opentelemetry::{KeyValue, Tracer}; use opentelemetry::sdk::{trace, IdGenerator, Resource, Sampler}; +use opentelemetry::global; fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() .from_env() .with_agent_endpoint("localhost:6831") diff --git a/opentelemetry-jaeger/src/agent.rs b/opentelemetry-jaeger/src/exporter/agent.rs similarity index 97% rename from opentelemetry-jaeger/src/agent.rs rename to opentelemetry-jaeger/src/exporter/agent.rs index bd6b72485b..3e734ac634 100644 --- a/opentelemetry-jaeger/src/agent.rs +++ b/opentelemetry-jaeger/src/exporter/agent.rs @@ -1,9 +1,9 @@ //! # UDP Jaeger Agent Client -use crate::thrift::{ +use crate::exporter::thrift::{ agent::{self, TAgentSyncClient}, jaeger, }; -use crate::transport::{TBufferChannel, TNoopChannel}; +use crate::exporter::transport::{TBufferChannel, TNoopChannel}; use std::fmt; use std::net::{ToSocketAddrs, UdpSocket}; use thrift::{ diff --git a/opentelemetry-jaeger/src/collector.rs b/opentelemetry-jaeger/src/exporter/collector.rs similarity index 99% rename from opentelemetry-jaeger/src/collector.rs rename to opentelemetry-jaeger/src/exporter/collector.rs index d502fb2683..0b26b2e16a 100644 --- a/opentelemetry-jaeger/src/collector.rs +++ b/opentelemetry-jaeger/src/exporter/collector.rs @@ -23,7 +23,7 @@ struct WasmHttpClient { #[cfg(feature = "collector_client")] mod collector_client { use super::*; - use crate::thrift::jaeger; + use crate::exporter::thrift::jaeger; use http::{Request, Uri}; use isahc::{ auth::{Authentication, Credentials}, @@ -106,7 +106,7 @@ mod collector_client { #[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))] mod wasm_collector_client { use super::*; - use crate::thrift::jaeger; + use crate::exporter::thrift::jaeger; use futures_util::future; use http::Uri; use js_sys::Uint8Array; diff --git a/opentelemetry-jaeger/src/env.rs b/opentelemetry-jaeger/src/exporter/env.rs similarity index 100% rename from opentelemetry-jaeger/src/env.rs rename to opentelemetry-jaeger/src/exporter/env.rs diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs new file mode 100644 index 0000000000..62f38484ca --- /dev/null +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -0,0 +1,519 @@ +//! # Jaeger Exporter +//! +mod agent; +#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] +mod collector; +#[allow(clippy::all, unreachable_pub, dead_code)] +#[rustfmt::skip] +mod thrift; +mod env; +pub(crate) mod transport; +mod uploader; + +use self::thrift::jaeger; +use agent::AgentAsyncClientUDP; +use async_trait::async_trait; +#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] +use collector::CollectorAsyncClientHttp; +use opentelemetry::sdk::export::ExportError; +use opentelemetry::trace::TraceError; +use opentelemetry::{ + global, sdk, + sdk::export::trace, + trace::{Event, Link, SpanKind, StatusCode, TracerProvider}, + Key, KeyValue, Value, +}; +use std::{ + net, + time::{Duration, SystemTime}, +}; +use uploader::BatchUploader; + +/// Default service name if no service is configured. +const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry"; + +/// Default agent endpoint if none is provided +const DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831"; + +/// Instrument Library name MUST be reported in Jaeger Span tags with the following key +const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name"; + +/// Instrument Library version MUST be reported in Jaeger Span tags with the following key +const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; + +/// Create a new Jaeger exporter pipeline builder. +pub fn new_pipeline() -> PipelineBuilder { + PipelineBuilder::default() +} + +/// Guard that uninstalls the Jaeger trace pipeline when dropped +#[must_use] +#[derive(Debug)] +pub struct Uninstall(global::TracerProviderGuard); + +/// Jaeger span exporter +#[derive(Debug)] +pub struct Exporter { + process: jaeger::Process, + /// Whether or not to export instrumentation information. + export_instrumentation_lib: bool, + uploader: uploader::BatchUploader, +} + +/// Jaeger process configuration +#[derive(Debug, Default)] +pub struct Process { + /// Jaeger service name + pub service_name: String, + /// Jaeger tags + pub tags: Vec, +} + +impl Into for Process { + fn into(self) -> jaeger::Process { + jaeger::Process::new( + self.service_name, + Some(self.tags.into_iter().map(Into::into).collect()), + ) + } +} + +#[async_trait] +impl trace::SpanExporter for Exporter { + /// Export spans to Jaeger + async fn export(&mut self, batch: Vec) -> trace::ExportResult { + let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); + let mut process = self.process.clone(); + + for (idx, span) in batch.into_iter().enumerate() { + if idx == 0 { + if let Some(span_process_tags) = build_process_tags(&span) { + if let Some(process_tags) = &mut process.tags { + process_tags.extend(span_process_tags); + } else { + process.tags = Some(span_process_tags.collect()) + } + } + } + jaeger_spans.push(convert_otel_span_into_jaeger_span( + span, + self.export_instrumentation_lib, + )); + } + + self.uploader + .upload(jaeger::Batch::new(process, jaeger_spans)) + .await + } +} + +/// Jaeger exporter builder +#[derive(Debug)] +pub struct PipelineBuilder { + agent_endpoint: Vec, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_endpoint: Option, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_username: Option, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_password: Option, + export_instrument_library: bool, + process: Process, + config: Option, +} + +impl Default for PipelineBuilder { + /// Return the default Exporter Builder. + fn default() -> Self { + PipelineBuilder { + agent_endpoint: vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()], + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_endpoint: None, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_username: None, + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + collector_password: None, + export_instrument_library: true, + process: Process { + service_name: DEFAULT_SERVICE_NAME.to_string(), + tags: Vec::new(), + }, + config: None, + } + } +} + +impl PipelineBuilder { + /// Assign builder attributes from environment variables. + /// + /// See the [jaeger variable spec] for full list. + /// + /// [jaeger variable spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter + #[allow(clippy::wrong_self_convention)] + pub fn from_env(self) -> Self { + env::assign_attrs(self) + } + + /// Assign the agent endpoint. + pub fn with_agent_endpoint(self, agent_endpoint: T) -> Self { + PipelineBuilder { + agent_endpoint: agent_endpoint + .to_socket_addrs() + .map(|addrs| addrs.collect()) + .unwrap_or_default(), + + ..self + } + } + + /// Config whether to export information of instrumentation library. + pub fn with_instrumentation_library_tags(self, export: bool) -> Self { + PipelineBuilder { + export_instrument_library: export, + ..self + } + } + + /// Assign the collector endpoint. + /// + /// E.g. "http://localhost:14268/api/traces" + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + #[cfg_attr( + docsrs, + doc(cfg(any(feature = "collector_client", feature = "wasm_collector_client"))) + )] + pub fn with_collector_endpoint(self, collector_endpoint: T) -> Self + where + http::Uri: core::convert::TryFrom, + { + PipelineBuilder { + collector_endpoint: core::convert::TryFrom::try_from(collector_endpoint).ok(), + ..self + } + } + + /// Assign the collector username + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + #[cfg_attr( + docsrs, + doc(any(feature = "collector_client", feature = "wasm_collector_client")) + )] + pub fn with_collector_username>(self, collector_username: S) -> Self { + PipelineBuilder { + collector_username: Some(collector_username.into()), + ..self + } + } + + /// Assign the collector password + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + #[cfg_attr( + docsrs, + doc(any(feature = "collector_client", feature = "wasm_collector_client")) + )] + pub fn with_collector_password>(self, collector_password: S) -> Self { + PipelineBuilder { + collector_password: Some(collector_password.into()), + ..self + } + } + + /// Assign the process service name. + pub fn with_service_name>(mut self, service_name: T) -> Self { + self.process.service_name = service_name.into(); + self + } + + /// Assign the process service tags. + pub fn with_tags>(mut self, tags: T) -> Self { + self.process.tags = tags.into_iter().collect(); + self + } + + /// Assign the SDK config for the exporter pipeline. + pub fn with_trace_config(self, config: sdk::trace::Config) -> Self { + PipelineBuilder { + config: Some(config), + ..self + } + } + + /// Install a Jaeger pipeline with the recommended defaults. + pub fn install(self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> { + let tracer_provider = self.build()?; + let tracer = + tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION"))); + + let provider_guard = global::set_tracer_provider(tracer_provider); + + Ok((tracer, Uninstall(provider_guard))) + } + + /// Build a configured `sdk::trace::TracerProvider` with the recommended defaults. + pub fn build(mut self) -> Result { + let config = self.config.take(); + let exporter = self.init_exporter()?; + + let mut builder = sdk::trace::TracerProvider::builder().with_exporter(exporter); + + if let Some(config) = config { + builder = builder.with_config(config) + } + + Ok(builder.build()) + } + + /// Initialize a new exporter. + /// + /// This is useful if you are manually constructing a pipeline. + pub fn init_exporter(self) -> Result { + let export_instrumentation_lib = self.export_instrument_library; + let (process, uploader) = self.init_uploader()?; + + Ok(Exporter { + process: process.into(), + export_instrumentation_lib, + uploader, + }) + } + + #[cfg(not(any(feature = "collector_client", feature = "wasm_collector_client")))] + fn init_uploader(self) -> Result<(Process, BatchUploader), TraceError> { + let agent = AgentAsyncClientUDP::new(self.agent_endpoint.as_slice()) + .map_err::(Into::into)?; + Ok((self.process, BatchUploader::Agent(agent))) + } + + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + fn init_uploader(self) -> Result<(Process, uploader::BatchUploader), TraceError> { + if let Some(collector_endpoint) = self.collector_endpoint { + let collector = CollectorAsyncClientHttp::new( + collector_endpoint, + self.collector_username, + self.collector_password, + ) + .map_err::(Into::into)?; + Ok((self.process, uploader::BatchUploader::Collector(collector))) + } else { + let endpoint = self.agent_endpoint.as_slice(); + let agent = AgentAsyncClientUDP::new(endpoint).map_err::(Into::into)?; + Ok((self.process, BatchUploader::Agent(agent))) + } + } +} + +#[rustfmt::skip] +impl Into for KeyValue { + fn into(self) -> jaeger::Tag { + let KeyValue { key, value } = self; + match value { + Value::String(s) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(s.into()), None, None, None, None), + Value::F64(f) => jaeger::Tag::new(key.into(), jaeger::TagType::Double, None, Some(f.into()), None, None, None), + Value::Bool(b) => jaeger::Tag::new(key.into(), jaeger::TagType::Bool, None, None, Some(b), None, None), + Value::I64(i) => jaeger::Tag::new(key.into(), jaeger::TagType::Long, None, None, None, Some(i), None), + // TODO: better Array handling, jaeger thrift doesn't support arrays + v @ Value::Array(_) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(v.to_string()), None, None, None, None), + } + } +} + +impl Into for Event { + fn into(self) -> jaeger::Log { + let timestamp = self + .timestamp + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_micros() as i64; + let mut event_set_via_attribute = false; + let mut fields = self + .attributes + .into_iter() + .map(|attr| { + if attr.key.as_str() == "event" { + event_set_via_attribute = true; + }; + attr.into() + }) + .collect::>(); + + if !event_set_via_attribute { + fields.push(Key::new("event").string(self.name).into()); + } + + jaeger::Log::new(timestamp, fields) + } +} + +fn links_to_references(links: sdk::trace::EvictedQueue) -> Option> { + if !links.is_empty() { + let refs = links + .iter() + .map(|link| { + let span_context = link.span_context(); + let trace_id = span_context.trace_id().to_u128(); + let trace_id_high = (trace_id >> 64) as i64; + let trace_id_low = trace_id as i64; + + // TODO: properly set the reference type when specs are defined + // see https://github.com/open-telemetry/opentelemetry-specification/issues/65 + jaeger::SpanRef::new( + jaeger::SpanRefType::ChildOf, + trace_id_low, + trace_id_high, + span_context.span_id().to_u64() as i64, + ) + }) + .collect(); + Some(refs) + } else { + None + } +} + +/// Convert spans to jaeger thrift span for exporting. +fn convert_otel_span_into_jaeger_span( + span: trace::SpanData, + export_instrument_lib: bool, +) -> jaeger::Span { + let trace_id = span.span_context.trace_id().to_u128(); + let trace_id_high = (trace_id >> 64) as i64; + let trace_id_low = trace_id as i64; + jaeger::Span { + trace_id_low, + trace_id_high, + span_id: span.span_context.span_id().to_u64() as i64, + parent_span_id: span.parent_span_id.to_u64() as i64, + operation_name: span.name, + references: links_to_references(span.links), + flags: span.span_context.trace_flags() as i32, + start_time: span + .start_time + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_micros() as i64, + duration: span + .end_time + .duration_since(span.start_time) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_micros() as i64, + tags: build_span_tags( + span.attributes, + if export_instrument_lib { + Some(span.instrumentation_lib) + } else { + None + }, + span.status_code, + span.status_message, + span.span_kind, + ), + logs: events_to_logs(span.message_events), + } +} + +fn build_process_tags( + span_data: &trace::SpanData, +) -> Option + '_> { + if span_data.resource.is_empty() { + None + } else { + Some( + span_data + .resource + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone()).into()), + ) + } +} + +fn build_span_tags( + attrs: sdk::trace::EvictedHashMap, + instrumentation_lib: Option, + status_code: StatusCode, + status_message: String, + kind: SpanKind, +) -> Option> { + let mut user_overrides = UserOverrides::default(); + // TODO determine if namespacing is required to avoid collisions with set attributes + let mut tags = attrs + .into_iter() + .map(|(k, v)| { + user_overrides.record_attr(k.as_str()); + KeyValue::new(k, v).into() + }) + .collect::>(); + + if let Some(instrumentation_lib) = instrumentation_lib { + // Set instrument library tags + tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_NAME, instrumentation_lib.name).into()); + if let Some(version) = instrumentation_lib.version { + tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_VERSION, version).into()) + } + } + + // Ensure error status is set + if status_code == StatusCode::Error && !user_overrides.error { + tags.push(Key::new(ERROR).bool(true).into()) + } + + if !user_overrides.span_kind { + tags.push(Key::new(SPAN_KIND).string(kind.to_string()).into()); + } + + if !user_overrides.status_code { + tags.push(KeyValue::new(STATUS_CODE, status_code as i64).into()); + } + + if !user_overrides.status_message { + tags.push(Key::new(STATUS_MESSAGE).string(status_message).into()); + } + + Some(tags) +} + +const ERROR: &str = "error"; +const SPAN_KIND: &str = "span.kind"; +const STATUS_CODE: &str = "status.code"; +const STATUS_MESSAGE: &str = "status.message"; + +#[derive(Default)] +struct UserOverrides { + error: bool, + span_kind: bool, + status_code: bool, + status_message: bool, +} + +impl UserOverrides { + fn record_attr(&mut self, attr: &str) { + match attr { + ERROR => self.error = true, + SPAN_KIND => self.span_kind = true, + STATUS_CODE => self.status_code = true, + STATUS_MESSAGE => self.status_message = true, + _ => (), + } + } +} + +fn events_to_logs(events: sdk::trace::EvictedQueue) -> Option> { + if events.is_empty() { + None + } else { + Some(events.into_iter().map(Into::into).collect()) + } +} + +/// Wrap type for errors from opentelemetry jaeger +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Error from thrift agents. + #[error("thrift agent failed with {0}")] + ThriftAgentError(#[from] ::thrift::Error), +} + +impl ExportError for Error { + fn exporter_name(&self) -> &'static str { + "jaeger" + } +} diff --git a/opentelemetry-jaeger/src/thrift/agent.rs b/opentelemetry-jaeger/src/exporter/thrift/agent.rs similarity index 100% rename from opentelemetry-jaeger/src/thrift/agent.rs rename to opentelemetry-jaeger/src/exporter/thrift/agent.rs diff --git a/opentelemetry-jaeger/src/thrift/jaeger.rs b/opentelemetry-jaeger/src/exporter/thrift/jaeger.rs similarity index 100% rename from opentelemetry-jaeger/src/thrift/jaeger.rs rename to opentelemetry-jaeger/src/exporter/thrift/jaeger.rs diff --git a/opentelemetry-jaeger/src/thrift/mod.rs b/opentelemetry-jaeger/src/exporter/thrift/mod.rs similarity index 100% rename from opentelemetry-jaeger/src/thrift/mod.rs rename to opentelemetry-jaeger/src/exporter/thrift/mod.rs diff --git a/opentelemetry-jaeger/src/thrift/zipkincore.rs b/opentelemetry-jaeger/src/exporter/thrift/zipkincore.rs similarity index 100% rename from opentelemetry-jaeger/src/thrift/zipkincore.rs rename to opentelemetry-jaeger/src/exporter/thrift/zipkincore.rs diff --git a/opentelemetry-jaeger/src/transport/buffer.rs b/opentelemetry-jaeger/src/exporter/transport/buffer.rs similarity index 100% rename from opentelemetry-jaeger/src/transport/buffer.rs rename to opentelemetry-jaeger/src/exporter/transport/buffer.rs diff --git a/opentelemetry-jaeger/src/transport/mod.rs b/opentelemetry-jaeger/src/exporter/transport/mod.rs similarity index 100% rename from opentelemetry-jaeger/src/transport/mod.rs rename to opentelemetry-jaeger/src/exporter/transport/mod.rs diff --git a/opentelemetry-jaeger/src/transport/noop.rs b/opentelemetry-jaeger/src/exporter/transport/noop.rs similarity index 100% rename from opentelemetry-jaeger/src/transport/noop.rs rename to opentelemetry-jaeger/src/exporter/transport/noop.rs diff --git a/opentelemetry-jaeger/src/uploader.rs b/opentelemetry-jaeger/src/exporter/uploader.rs similarity index 92% rename from opentelemetry-jaeger/src/uploader.rs rename to opentelemetry-jaeger/src/exporter/uploader.rs index ded8ede466..91681d0b28 100644 --- a/opentelemetry-jaeger/src/uploader.rs +++ b/opentelemetry-jaeger/src/exporter/uploader.rs @@ -1,8 +1,8 @@ //! # Jaeger Span Uploader #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] -use crate::collector; -use crate::{agent, jaeger}; -use opentelemetry::exporter::trace; +use crate::exporter::collector; +use crate::exporter::{agent, jaeger}; +use opentelemetry::sdk::export::trace; /// Uploads a batch of spans to Jaeger #[derive(Debug)] diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index cefe00a081..17d55cc045 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -21,8 +21,10 @@ //! //! ```no_run //! use opentelemetry::trace::Tracer; +//! use opentelemetry::global; //! //! fn main() -> Result<(), opentelemetry::trace::TraceError> { +//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); //! let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().install()?; //! //! tracer.in_span("doing_work", |cx| { @@ -60,8 +62,10 @@ //! //! ```no_run //! use opentelemetry::trace::{Tracer, TraceError}; +//! use opentelemetry::global; //! //! fn main() -> Result<(), TraceError> { +//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); //! // export OTEL_SERVICE_NAME=my-service-name //! let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().from_env().install()?; //! @@ -118,8 +122,10 @@ //! ```no_run //! use opentelemetry::{KeyValue, trace::{Tracer, TraceError}}; //! use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource}; +//! use opentelemetry::global; //! //! fn main() -> Result<(), TraceError> { +//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); //! let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() //! .from_env() //! .with_agent_endpoint("localhost:6831") @@ -179,520 +185,8 @@ )] #![cfg_attr(test, deny(warnings))] -mod agent; -#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] -mod collector; -#[allow(clippy::all, unreachable_pub, dead_code)] -#[rustfmt::skip] -mod thrift; -mod env; -pub(crate) mod transport; -mod uploader; +mod exporter; +mod propagator; -use self::thrift::jaeger; -use agent::AgentAsyncClientUDP; -use async_trait::async_trait; -#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] -use collector::CollectorAsyncClientHttp; -use opentelemetry::exporter::ExportError; -use opentelemetry::trace::TraceError; -use opentelemetry::{ - exporter::trace, - global, sdk, - trace::{Event, Link, SpanKind, StatusCode, TracerProvider}, - Key, KeyValue, Value, -}; -use std::{ - net, - time::{Duration, SystemTime}, -}; -use uploader::BatchUploader; - -/// Default service name if no service is configured. -const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry"; - -/// Default agent endpoint if none is provided -const DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831"; - -/// Instrument Library name MUST be reported in Jaeger Span tags with the following key -const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name"; - -/// Instrument Library version MUST be reported in Jaeger Span tags with the following key -const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; - -/// Create a new Jaeger exporter pipeline builder. -pub fn new_pipeline() -> PipelineBuilder { - PipelineBuilder::default() -} - -/// Guard that uninstalls the Jaeger trace pipeline when dropped -#[must_use] -#[derive(Debug)] -pub struct Uninstall(global::TracerProviderGuard); - -/// Jaeger span exporter -#[derive(Debug)] -pub struct Exporter { - process: jaeger::Process, - /// Whether or not to export instrumentation information. - export_instrumentation_lib: bool, - uploader: uploader::BatchUploader, -} - -/// Jaeger process configuration -#[derive(Debug, Default)] -pub struct Process { - /// Jaeger service name - pub service_name: String, - /// Jaeger tags - pub tags: Vec, -} - -impl Into for Process { - fn into(self) -> jaeger::Process { - jaeger::Process::new( - self.service_name, - Some(self.tags.into_iter().map(Into::into).collect()), - ) - } -} - -#[async_trait] -impl trace::SpanExporter for Exporter { - /// Export spans to Jaeger - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); - let mut process = self.process.clone(); - - for (idx, span) in batch.into_iter().enumerate() { - if idx == 0 { - if let Some(span_process_tags) = build_process_tags(&span) { - if let Some(process_tags) = &mut process.tags { - process_tags.extend(span_process_tags); - } else { - process.tags = Some(span_process_tags.collect()) - } - } - } - jaeger_spans.push(convert_otel_span_into_jaeger_span( - span, - self.export_instrumentation_lib, - )); - } - - self.uploader - .upload(jaeger::Batch::new(process, jaeger_spans)) - .await - } -} - -/// Jaeger exporter builder -#[derive(Debug)] -pub struct PipelineBuilder { - agent_endpoint: Vec, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_endpoint: Option, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_username: Option, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_password: Option, - export_instrument_library: bool, - process: Process, - config: Option, -} - -impl Default for PipelineBuilder { - /// Return the default Exporter Builder. - fn default() -> Self { - PipelineBuilder { - agent_endpoint: vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()], - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_endpoint: None, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_username: None, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_password: None, - export_instrument_library: true, - process: Process { - service_name: DEFAULT_SERVICE_NAME.to_string(), - tags: Vec::new(), - }, - config: None, - } - } -} - -impl PipelineBuilder { - /// Assign builder attributes from environment variables. - /// - /// See the [jaeger variable spec] for full list. - /// - /// [jaeger variable spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter - #[allow(clippy::wrong_self_convention)] - pub fn from_env(self) -> Self { - env::assign_attrs(self) - } - - /// Assign the agent endpoint. - pub fn with_agent_endpoint(self, agent_endpoint: T) -> Self { - PipelineBuilder { - agent_endpoint: agent_endpoint - .to_socket_addrs() - .map(|addrs| addrs.collect()) - .unwrap_or_default(), - - ..self - } - } - - /// Config whether to export information of instrumentation library. - pub fn with_instrumentation_library_tags(self, export: bool) -> Self { - PipelineBuilder { - export_instrument_library: export, - ..self - } - } - - /// Assign the collector endpoint. - /// - /// E.g. "http://localhost:14268/api/traces" - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(cfg(any(feature = "collector_client", feature = "wasm_collector_client"))) - )] - pub fn with_collector_endpoint(self, collector_endpoint: T) -> Self - where - http::Uri: core::convert::TryFrom, - { - PipelineBuilder { - collector_endpoint: core::convert::TryFrom::try_from(collector_endpoint).ok(), - ..self - } - } - - /// Assign the collector username - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(any(feature = "collector_client", feature = "wasm_collector_client")) - )] - pub fn with_collector_username>(self, collector_username: S) -> Self { - PipelineBuilder { - collector_username: Some(collector_username.into()), - ..self - } - } - - /// Assign the collector password - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(any(feature = "collector_client", feature = "wasm_collector_client")) - )] - pub fn with_collector_password>(self, collector_password: S) -> Self { - PipelineBuilder { - collector_password: Some(collector_password.into()), - ..self - } - } - - /// Assign the process service name. - pub fn with_service_name>(mut self, service_name: T) -> Self { - self.process.service_name = service_name.into(); - self - } - - /// Assign the process service tags. - pub fn with_tags>(mut self, tags: T) -> Self { - self.process.tags = tags.into_iter().collect(); - self - } - - /// Assign the SDK config for the exporter pipeline. - pub fn with_trace_config(self, config: sdk::trace::Config) -> Self { - PipelineBuilder { - config: Some(config), - ..self - } - } - - /// Install a Jaeger pipeline with the recommended defaults. - pub fn install(self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> { - let tracer_provider = self.build()?; - let tracer = - tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION"))); - - let provider_guard = global::set_tracer_provider(tracer_provider); - - Ok((tracer, Uninstall(provider_guard))) - } - - /// Build a configured `sdk::trace::TracerProvider` with the recommended defaults. - pub fn build(mut self) -> Result { - let config = self.config.take(); - let exporter = self.init_exporter()?; - - let mut builder = sdk::trace::TracerProvider::builder().with_exporter(exporter); - - if let Some(config) = config { - builder = builder.with_config(config) - } - - Ok(builder.build()) - } - - /// Initialize a new exporter. - /// - /// This is useful if you are manually constructing a pipeline. - pub fn init_exporter(self) -> Result { - let export_instrumentation_lib = self.export_instrument_library; - let (process, uploader) = self.init_uploader()?; - - Ok(Exporter { - process: process.into(), - export_instrumentation_lib, - uploader, - }) - } - - #[cfg(not(any(feature = "collector_client", feature = "wasm_collector_client")))] - fn init_uploader(self) -> Result<(Process, BatchUploader), TraceError> { - let agent = AgentAsyncClientUDP::new(self.agent_endpoint.as_slice()) - .map_err::(Into::into)?; - Ok((self.process, BatchUploader::Agent(agent))) - } - - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - fn init_uploader(self) -> Result<(Process, uploader::BatchUploader), TraceError> { - if let Some(collector_endpoint) = self.collector_endpoint { - let collector = CollectorAsyncClientHttp::new( - collector_endpoint, - self.collector_username, - self.collector_password, - ) - .map_err::(Into::into)?; - Ok((self.process, uploader::BatchUploader::Collector(collector))) - } else { - let endpoint = self.agent_endpoint.as_slice(); - let agent = AgentAsyncClientUDP::new(endpoint).map_err::(Into::into)?; - Ok((self.process, BatchUploader::Agent(agent))) - } - } -} - -#[rustfmt::skip] -impl Into for KeyValue { - fn into(self) -> jaeger::Tag { - let KeyValue { key, value } = self; - match value { - Value::String(s) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(s.into()), None, None, None, None), - Value::F64(f) => jaeger::Tag::new(key.into(), jaeger::TagType::Double, None, Some(f.into()), None, None, None), - Value::Bool(b) => jaeger::Tag::new(key.into(), jaeger::TagType::Bool, None, None, Some(b), None, None), - Value::I64(i) => jaeger::Tag::new(key.into(), jaeger::TagType::Long, None, None, None, Some(i), None), - // TODO: better Array handling, jaeger thrift doesn't support arrays - v @ Value::Array(_) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(v.to_string()), None, None, None, None), - } - } -} - -impl Into for Event { - fn into(self) -> jaeger::Log { - let timestamp = self - .timestamp - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_micros() as i64; - let mut event_set_via_attribute = false; - let mut fields = self - .attributes - .into_iter() - .map(|attr| { - if attr.key.as_str() == "event" { - event_set_via_attribute = true; - }; - attr.into() - }) - .collect::>(); - - if !event_set_via_attribute { - fields.push(Key::new("event").string(self.name).into()); - } - - jaeger::Log::new(timestamp, fields) - } -} - -fn links_to_references(links: sdk::trace::EvictedQueue) -> Option> { - if !links.is_empty() { - let refs = links - .iter() - .map(|link| { - let span_context = link.span_context(); - let trace_id = span_context.trace_id().to_u128(); - let trace_id_high = (trace_id >> 64) as i64; - let trace_id_low = trace_id as i64; - - // TODO: properly set the reference type when specs are defined - // see https://github.com/open-telemetry/opentelemetry-specification/issues/65 - jaeger::SpanRef::new( - jaeger::SpanRefType::ChildOf, - trace_id_low, - trace_id_high, - span_context.span_id().to_u64() as i64, - ) - }) - .collect(); - Some(refs) - } else { - None - } -} - -/// Convert spans to jaeger thrift span for exporting. -fn convert_otel_span_into_jaeger_span( - span: trace::SpanData, - export_instrument_lib: bool, -) -> jaeger::Span { - let trace_id = span.span_context.trace_id().to_u128(); - let trace_id_high = (trace_id >> 64) as i64; - let trace_id_low = trace_id as i64; - jaeger::Span { - trace_id_low, - trace_id_high, - span_id: span.span_context.span_id().to_u64() as i64, - parent_span_id: span.parent_span_id.to_u64() as i64, - operation_name: span.name, - references: links_to_references(span.links), - flags: span.span_context.trace_flags() as i32, - start_time: span - .start_time - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_micros() as i64, - duration: span - .end_time - .duration_since(span.start_time) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_micros() as i64, - tags: build_span_tags( - span.attributes, - if export_instrument_lib { - Some(span.instrumentation_lib) - } else { - None - }, - span.status_code, - span.status_message, - span.span_kind, - ), - logs: events_to_logs(span.message_events), - } -} - -fn build_process_tags( - span_data: &trace::SpanData, -) -> Option + '_> { - if span_data.resource.is_empty() { - None - } else { - Some( - span_data - .resource - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone()).into()), - ) - } -} - -fn build_span_tags( - attrs: sdk::trace::EvictedHashMap, - instrumentation_lib: Option, - status_code: StatusCode, - status_message: String, - kind: SpanKind, -) -> Option> { - let mut user_overrides = UserOverrides::default(); - // TODO determine if namespacing is required to avoid collisions with set attributes - let mut tags = attrs - .into_iter() - .map(|(k, v)| { - user_overrides.record_attr(k.as_str()); - KeyValue::new(k, v).into() - }) - .collect::>(); - - if let Some(instrumentation_lib) = instrumentation_lib { - // Set instrument library tags - tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_NAME, instrumentation_lib.name).into()); - if let Some(version) = instrumentation_lib.version { - tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_VERSION, version).into()) - } - } - - // Ensure error status is set - if status_code == StatusCode::Error && !user_overrides.error { - tags.push(Key::new(ERROR).bool(true).into()) - } - - if !user_overrides.span_kind { - tags.push(Key::new(SPAN_KIND).string(kind.to_string()).into()); - } - - if !user_overrides.status_code { - tags.push(KeyValue::new(STATUS_CODE, status_code as i64).into()); - } - - if !user_overrides.status_message { - tags.push(Key::new(STATUS_MESSAGE).string(status_message).into()); - } - - Some(tags) -} - -const ERROR: &str = "error"; -const SPAN_KIND: &str = "span.kind"; -const STATUS_CODE: &str = "status.code"; -const STATUS_MESSAGE: &str = "status.message"; - -#[derive(Default)] -struct UserOverrides { - error: bool, - span_kind: bool, - status_code: bool, - status_message: bool, -} - -impl UserOverrides { - fn record_attr(&mut self, attr: &str) { - match attr { - ERROR => self.error = true, - SPAN_KIND => self.span_kind = true, - STATUS_CODE => self.status_code = true, - STATUS_MESSAGE => self.status_message = true, - _ => (), - } - } -} - -fn events_to_logs(events: sdk::trace::EvictedQueue) -> Option> { - if events.is_empty() { - None - } else { - Some(events.into_iter().map(Into::into).collect()) - } -} - -/// Wrap type for errors from opentelemetry jaeger -#[derive(thiserror::Error, Debug)] -pub enum Error { - /// Error from thrift agents. - #[error("thrift agent failed with {0}")] - ThriftAgentError(#[from] ::thrift::Error), -} - -impl ExportError for Error { - fn exporter_name(&self) -> &'static str { - "jaeger" - } -} +pub use exporter::{new_pipeline, Error, Exporter, PipelineBuilder, Process, Uninstall}; +pub use propagator::Propagator; diff --git a/opentelemetry/src/sdk/propagation/jaeger.rs b/opentelemetry-jaeger/src/propagator/mod.rs similarity index 95% rename from opentelemetry/src/sdk/propagation/jaeger.rs rename to opentelemetry-jaeger/src/propagator/mod.rs index 63554c9c13..bb46b47d1b 100644 --- a/opentelemetry/src/sdk/propagation/jaeger.rs +++ b/opentelemetry-jaeger/src/propagator/mod.rs @@ -5,7 +5,7 @@ //! See [`Jaeger documentation`] for detail of Jaeger propagation format. //! //! [`Jaeger documentation`]: https://www.jaegertracing.io/docs/1.18/client-libraries/#propagation-format -use crate::{ +use opentelemetry::{ propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator}, trace::{ SpanContext, SpanId, TraceContextExt, TraceId, TraceState, TRACE_FLAG_DEBUG, @@ -32,20 +32,20 @@ lazy_static::lazy_static! { /// /// [`Jaeger documentation`]: https://www.jaegertracing.io/docs/1.18/client-libraries/#propagation-format #[derive(Clone, Debug)] -pub struct JaegerPropagator { +pub struct Propagator { _private: (), } -impl Default for JaegerPropagator { +impl Default for Propagator { fn default() -> Self { - JaegerPropagator { _private: () } + Propagator { _private: () } } } -impl JaegerPropagator { +impl Propagator { /// Create a Jaeger propagator pub fn new() -> Self { - JaegerPropagator::default() + Propagator::default() } /// Extract span context from header value @@ -135,7 +135,7 @@ impl JaegerPropagator { } } -impl TextMapPropagator for JaegerPropagator { +impl TextMapPropagator for Propagator { fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) { let span_context = cx.span().span_context(); if span_context.is_valid() { @@ -174,9 +174,9 @@ impl TextMapPropagator for JaegerPropagator { #[cfg(test)] mod tests { use super::*; - use crate::testing::trace::TestSpan; - use crate::{ + use opentelemetry::{ propagation::{Injector, TextMapPropagator}, + testing::trace::TestSpan, trace::{ SpanContext, SpanId, TraceContextExt, TraceId, TraceState, TRACE_FLAG_DEBUG, TRACE_FLAG_NOT_SAMPLED, TRACE_FLAG_SAMPLED, @@ -300,7 +300,7 @@ mod tests { #[test] fn test_extract_empty() { let map: HashMap = HashMap::new(); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!( context.remote_span_context(), @@ -316,7 +316,7 @@ mod tests { JAEGER_HEADER, format!("{}:{}:0:{}", trace_id, span_id, flag), ); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!(context.remote_span_context(), Some(&expected)); } @@ -329,7 +329,7 @@ mod tests { JAEGER_HEADER, format!("{}:{}:0:1:aa", LONG_TRACE_ID_STR, SPAN_ID_STR), ); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!( context.remote_span_context(), @@ -344,7 +344,7 @@ mod tests { JAEGER_HEADER, format!("{}:{}:0:aa", LONG_TRACE_ID_STR, SPAN_ID_STR), ); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!( context.remote_span_context(), @@ -359,7 +359,7 @@ mod tests { JAEGER_HEADER, format!("{}%3A{}%3A0%3A1", LONG_TRACE_ID_STR, SPAN_ID_STR), ); - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); let context = propagator.extract(&map); assert_eq!( context.remote_span_context(), @@ -374,7 +374,7 @@ mod tests { } #[test] fn test_inject() { - let propagator = JaegerPropagator::new(); + let propagator = Propagator::new(); for (span_context, header_value) in get_inject_data() { let mut injector = HashMap::new(); propagator.inject_context( diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index f80e235922..a98246ea80 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -148,7 +148,7 @@ pub use crate::span::{Exporter, ExporterConfig, Protocol}; #[cfg(all(feature = "grpc-sys", not(feature = "tonic")))] pub use crate::span::{Compression, Credentials}; -use opentelemetry::exporter::ExportError; +use opentelemetry::sdk::export::ExportError; use opentelemetry::trace::TraceError; /// Create a new pipeline builder with the recommended configuration. diff --git a/opentelemetry-otlp/src/proto/grpcio/common.rs b/opentelemetry-otlp/src/proto/grpcio/common.rs index 8ce33f65f2..0b5074e6f1 100644 --- a/opentelemetry-otlp/src/proto/grpcio/common.rs +++ b/opentelemetry-otlp/src/proto/grpcio/common.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.18.0. Do not edit +// This file is generated by rust-protobuf 2.18.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/metrics.rs b/opentelemetry-otlp/src/proto/grpcio/metrics.rs index 039cb4793b..ec06445e3e 100644 --- a/opentelemetry-otlp/src/proto/grpcio/metrics.rs +++ b/opentelemetry-otlp/src/proto/grpcio/metrics.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.18.0. Do not edit +// This file is generated by rust-protobuf 2.18.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/metrics_service.rs b/opentelemetry-otlp/src/proto/grpcio/metrics_service.rs index 0be8ef0831..933e85df41 100644 --- a/opentelemetry-otlp/src/proto/grpcio/metrics_service.rs +++ b/opentelemetry-otlp/src/proto/grpcio/metrics_service.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.18.0. Do not edit +// This file is generated by rust-protobuf 2.18.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/resource.rs b/opentelemetry-otlp/src/proto/grpcio/resource.rs index c59c389023..c07a63d1f6 100644 --- a/opentelemetry-otlp/src/proto/grpcio/resource.rs +++ b/opentelemetry-otlp/src/proto/grpcio/resource.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.18.0. Do not edit +// This file is generated by rust-protobuf 2.18.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/trace.rs b/opentelemetry-otlp/src/proto/grpcio/trace.rs index 68325c3e04..0b0577df4e 100644 --- a/opentelemetry-otlp/src/proto/grpcio/trace.rs +++ b/opentelemetry-otlp/src/proto/grpcio/trace.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.18.0. Do not edit +// This file is generated by rust-protobuf 2.18.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/trace_config.rs b/opentelemetry-otlp/src/proto/grpcio/trace_config.rs index 0d6f2df5f4..c5a8c4ab16 100644 --- a/opentelemetry-otlp/src/proto/grpcio/trace_config.rs +++ b/opentelemetry-otlp/src/proto/grpcio/trace_config.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.18.0. Do not edit +// This file is generated by rust-protobuf 2.18.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/trace_service.rs b/opentelemetry-otlp/src/proto/grpcio/trace_service.rs index 39bffa2040..e9757a38d2 100644 --- a/opentelemetry-otlp/src/proto/grpcio/trace_service.rs +++ b/opentelemetry-otlp/src/proto/grpcio/trace_service.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.18.0. Do not edit +// This file is generated by rust-protobuf 2.18.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_18_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/opentelemetry-proto b/opentelemetry-otlp/src/proto/opentelemetry-proto index 18a2a35e9d..8ab21e9da6 160000 --- a/opentelemetry-otlp/src/proto/opentelemetry-proto +++ b/opentelemetry-otlp/src/proto/opentelemetry-proto @@ -1 +1 @@ -Subproject commit 18a2a35e9d19ba48d577112accdf58a9b83e815b +Subproject commit 8ab21e9da6246e465cd9d50d405561aedef31a1e diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 5dc5507ade..2d9db17248 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -42,10 +42,9 @@ use std::fmt::Debug; #[cfg(all(feature = "grpc-sys", not(feature = "tonic")))] use std::sync::Arc; +use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter}; use std::time::Duration; -use opentelemetry::exporter::trace::{ExportResult, SpanData, SpanExporter}; - /// Exporter that sends data in OTLP format. pub struct Exporter { #[cfg(feature = "tonic")] diff --git a/opentelemetry-otlp/src/transform/traces.rs b/opentelemetry-otlp/src/transform/traces.rs index 562564979d..81907099d4 100644 --- a/opentelemetry-otlp/src/transform/traces.rs +++ b/opentelemetry-otlp/src/transform/traces.rs @@ -16,7 +16,7 @@ use crate::proto::grpcio::trace::{ }; use crate::transform::common::{to_nanos, Attributes}; -use opentelemetry::exporter::trace::SpanData; +use opentelemetry::sdk::export::trace::SpanData; use opentelemetry::trace::{Link, SpanKind, StatusCode}; #[cfg(all(feature = "grpc-sys", not(feature = "tonic")))] diff --git a/opentelemetry-semantic-conventions/src/resource.rs b/opentelemetry-semantic-conventions/src/resource.rs index 243c600f6d..be8abcd6a4 100644 --- a/opentelemetry-semantic-conventions/src/resource.rs +++ b/opentelemetry-semantic-conventions/src/resource.rs @@ -12,7 +12,7 @@ //! use opentelemetry_semantic_conventions as semcov; //! use std::sync::Arc; //! -//! let _tracer = opentelemetry::exporter::trace::stdout::new_pipeline() +//! let _tracer = opentelemetry::sdk::export::trace::stdout::new_pipeline() //! .with_trace_config(sdk::trace::Config { //! resource: Arc::new(sdk::Resource::new(vec![ //! semcov::resource::SERVICE_NAME.string("my-service"), diff --git a/opentelemetry-zipkin/Cargo.toml b/opentelemetry-zipkin/Cargo.toml index 08ed750ca3..483698ab16 100644 --- a/opentelemetry-zipkin/Cargo.toml +++ b/opentelemetry-zipkin/Cargo.toml @@ -39,3 +39,4 @@ thiserror = { version = "1.0"} [dev-dependencies] isahc = "=0.9.6" +opentelemetry = { version = "0.10", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" } diff --git a/opentelemetry-zipkin/README.md b/opentelemetry-zipkin/README.md index a50052ec3e..933f3aec0b 100644 --- a/opentelemetry-zipkin/README.md +++ b/opentelemetry-zipkin/README.md @@ -43,8 +43,10 @@ telemetry: ```rust use opentelemetry::trace::Tracer; +use opentelemetry::global; fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); let (tracer, _uninstall) = opentelemetry_zipkin::new_pipeline().install()?; tracer.in_span("doing_work", |cx| { @@ -97,7 +99,8 @@ Example showing how to override all configuration options. See the ```rust use opentelemetry::{KeyValue, trace::Tracer}; use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource}; -use opentelemetry::exporter::trace::{ExportResult, HttpClient}; +use opentelemetry::sdk::export::trace::{ExportResult, HttpClient}; +use opentelemetry::global; use async_trait::async_trait; use std::error::Error; @@ -121,6 +124,7 @@ impl HttpClient for IsahcClient { } fn main() -> Result<(), Box> { + global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); let (tracer, _uninstall) = opentelemetry_zipkin::new_pipeline() .with_http_client(IsahcClient(isahc::HttpClient::new()?)) .with_service_name("my_app") diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs new file mode 100644 index 0000000000..e8fc6e23d2 --- /dev/null +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -0,0 +1,189 @@ +mod model; +mod uploader; + +use async_trait::async_trait; +use http::Uri; +use model::endpoint::Endpoint; +use opentelemetry::{ + global, sdk, + sdk::export::{ + trace::{self, HttpClient}, + ExportError, + }, + trace::{TraceError, TracerProvider}, +}; +use std::net::SocketAddr; + +/// Default Zipkin collector endpoint +const DEFAULT_COLLECTOR_ENDPOINT: &str = "http://127.0.0.1:9411/api/v2/spans"; + +/// Default service name if no service is configured. +const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry"; + +/// Zipkin span exporter +#[derive(Debug)] +pub struct Exporter { + local_endpoint: Endpoint, + uploader: uploader::Uploader, +} + +impl Exporter { + fn new(local_endpoint: Endpoint, client: Box, collector_endpoint: Uri) -> Self { + Exporter { + local_endpoint, + uploader: uploader::Uploader::new(client, collector_endpoint), + } + } +} + +/// Create a new Zipkin exporter pipeline builder. +pub fn new_pipeline() -> ZipkinPipelineBuilder { + ZipkinPipelineBuilder::default() +} + +/// Builder for `ExporterConfig` struct. +#[derive(Debug)] +pub struct ZipkinPipelineBuilder { + service_name: String, + service_addr: Option, + collector_endpoint: String, + trace_config: Option, + client: Option>, +} + +impl Default for ZipkinPipelineBuilder { + fn default() -> Self { + ZipkinPipelineBuilder { + #[cfg(feature = "reqwest-blocking-client")] + client: Some(Box::new(reqwest::blocking::Client::new())), + #[cfg(all( + not(feature = "reqwest-blocking-client"), + not(feature = "surf-client"), + feature = "reqwest-client" + ))] + client: Some(Box::new(reqwest::Client::new())), + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + feature = "surf-client" + ))] + client: Some(Box::new(surf::Client::new())), + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "surf-client"), + not(feature = "reqwest-blocking-client") + ))] + client: None, + + service_name: DEFAULT_SERVICE_NAME.to_string(), + service_addr: None, + collector_endpoint: DEFAULT_COLLECTOR_ENDPOINT.to_string(), + trace_config: None, + } + } +} + +impl ZipkinPipelineBuilder { + /// Create `ExporterConfig` struct from current `ExporterConfigBuilder` + pub fn install(mut self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> { + if let Some(client) = self.client { + let endpoint = Endpoint::new(self.service_name, self.service_addr); + let exporter = Exporter::new( + endpoint, + client, + self.collector_endpoint + .parse() + .map_err::(Into::into)?, + ); + + let mut provider_builder = + sdk::trace::TracerProvider::builder().with_exporter(exporter); + if let Some(config) = self.trace_config.take() { + provider_builder = provider_builder.with_config(config); + } + let provider = provider_builder.build(); + let tracer = + provider.get_tracer("opentelemetry-zipkin", Some(env!("CARGO_PKG_VERSION"))); + let provider_guard = global::set_tracer_provider(provider); + + Ok((tracer, Uninstall(provider_guard))) + } else { + Err(Error::NoHttpClient.into()) + } + } + + /// Assign the service name under which to group traces. + pub fn with_service_name>(mut self, name: T) -> Self { + self.service_name = name.into(); + self + } + + /// Assign client implementation + pub fn with_http_client(mut self, client: T) -> Self { + self.client = Some(Box::new(client)); + self + } + + /// Assign the service name under which to group traces. + pub fn with_service_address(mut self, addr: SocketAddr) -> Self { + self.service_addr = Some(addr); + self + } + + /// Assign the Zipkin collector endpoint + pub fn with_collector_endpoint>(mut self, endpoint: T) -> Self { + self.collector_endpoint = endpoint.into(); + self + } + + /// Assign the SDK trace configuration. + pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { + self.trace_config = Some(config); + self + } +} + +#[async_trait] +impl trace::SpanExporter for Exporter { + /// Export spans to Zipkin collector. + async fn export(&mut self, batch: Vec) -> trace::ExportResult { + let zipkin_spans = batch + .into_iter() + .map(|span| model::into_zipkin_span(self.local_endpoint.clone(), span)) + .collect(); + + self.uploader.upload(zipkin_spans).await + } +} + +/// Uninstalls the Zipkin pipeline on drop. +#[must_use] +#[derive(Debug)] +pub struct Uninstall(global::TracerProviderGuard); + +/// Wrap type for errors from opentelemetry zipkin +#[derive(thiserror::Error, Debug)] +#[non_exhaustive] +pub enum Error { + /// No http client implementation found. User should provide one or enable features. + #[error("http client must be set, users can enable reqwest or surf feature to use http client implementation within create")] + NoHttpClient, + + /// Http requests failed + #[error("http request failed with {0}")] + RequestFailed(#[from] http::Error), + + /// The uri provided is invalid + #[error("invalid uri")] + InvalidUri(#[from] http::uri::InvalidUri), + + /// Other errors + #[error("export error: {0}")] + Other(String), +} + +impl ExportError for Error { + fn exporter_name(&self) -> &'static str { + "zipkin" + } +} diff --git a/opentelemetry-zipkin/src/model/annotation.rs b/opentelemetry-zipkin/src/exporter/model/annotation.rs similarity index 94% rename from opentelemetry-zipkin/src/model/annotation.rs rename to opentelemetry-zipkin/src/exporter/model/annotation.rs index 451aa03057..2f7bbb2f4c 100644 --- a/opentelemetry-zipkin/src/model/annotation.rs +++ b/opentelemetry-zipkin/src/exporter/model/annotation.rs @@ -13,7 +13,7 @@ pub struct Annotation { #[cfg(test)] mod tests { - use crate::model::annotation::Annotation; + use crate::exporter::model::annotation::Annotation; #[test] fn test_empty() { diff --git a/opentelemetry-zipkin/src/model/endpoint.rs b/opentelemetry-zipkin/src/exporter/model/endpoint.rs similarity index 97% rename from opentelemetry-zipkin/src/model/endpoint.rs rename to opentelemetry-zipkin/src/exporter/model/endpoint.rs index 66612057b2..a70e4feb3f 100644 --- a/opentelemetry-zipkin/src/model/endpoint.rs +++ b/opentelemetry-zipkin/src/exporter/model/endpoint.rs @@ -38,7 +38,7 @@ impl Endpoint { #[cfg(test)] mod tests { - use crate::model::endpoint::Endpoint; + use crate::exporter::model::endpoint::Endpoint; use std::net::Ipv4Addr; #[test] diff --git a/opentelemetry-zipkin/src/model/mod.rs b/opentelemetry-zipkin/src/exporter/model/mod.rs similarity index 99% rename from opentelemetry-zipkin/src/model/mod.rs rename to opentelemetry-zipkin/src/exporter/model/mod.rs index bae383276d..66d2221e6b 100644 --- a/opentelemetry-zipkin/src/model/mod.rs +++ b/opentelemetry-zipkin/src/exporter/model/mod.rs @@ -1,5 +1,5 @@ use opentelemetry::{ - exporter::trace, + sdk::export::trace, trace::{Event, SpanKind, StatusCode}, Key, KeyValue, }; diff --git a/opentelemetry-zipkin/src/model/span.rs b/opentelemetry-zipkin/src/exporter/model/span.rs similarity index 94% rename from opentelemetry-zipkin/src/model/span.rs rename to opentelemetry-zipkin/src/exporter/model/span.rs index 3f34efe03e..55e9825b46 100644 --- a/opentelemetry-zipkin/src/model/span.rs +++ b/opentelemetry-zipkin/src/exporter/model/span.rs @@ -1,4 +1,4 @@ -use crate::model::{annotation::Annotation, endpoint::Endpoint}; +use crate::exporter::model::{annotation::Annotation, endpoint::Endpoint}; use serde::Serialize; use std::collections::HashMap; @@ -55,9 +55,9 @@ pub(crate) struct Span { #[cfg(test)] mod tests { - use crate::model::annotation::Annotation; - use crate::model::endpoint::Endpoint; - use crate::model::span::{Kind, Span}; + use crate::exporter::model::annotation::Annotation; + use crate::exporter::model::endpoint::Endpoint; + use crate::exporter::model::span::{Kind, Span}; use std::collections::HashMap; use std::net::Ipv4Addr; diff --git a/opentelemetry-zipkin/src/uploader.rs b/opentelemetry-zipkin/src/exporter/uploader.rs similarity index 89% rename from opentelemetry-zipkin/src/uploader.rs rename to opentelemetry-zipkin/src/exporter/uploader.rs index d952f595cc..27751e3a12 100644 --- a/opentelemetry-zipkin/src/uploader.rs +++ b/opentelemetry-zipkin/src/exporter/uploader.rs @@ -1,8 +1,8 @@ //! # Zipkin Span Exporter -use crate::model::span::Span; -use crate::Error; +use crate::exporter::model::span::Span; +use crate::exporter::Error; use http::{header::CONTENT_TYPE, Method, Request, Uri}; -use opentelemetry::exporter::trace::{ExportResult, HttpClient}; +use opentelemetry::sdk::export::trace::{ExportResult, HttpClient}; use std::fmt::Debug; #[derive(Debug)] diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index 17ede8ed00..737359f786 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -22,8 +22,10 @@ //! //! ```no_run //! use opentelemetry::trace::{Tracer, TraceError}; +//! use opentelemetry::global; //! //! fn main() -> Result<(), TraceError> { +//! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); //! let (tracer, _uninstall) = opentelemetry_zipkin::new_pipeline().install()?; //! //! tracer.in_span("doing_work", |cx| { @@ -77,7 +79,8 @@ //! ```no_run //! use opentelemetry::{KeyValue, trace::Tracer}; //! use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource}; -//! use opentelemetry::exporter::trace::{ExportResult, HttpClient}; +//! use opentelemetry::sdk::export::trace::{ExportResult, HttpClient}; +//! use opentelemetry::global; //! use async_trait::async_trait; //! use std::error::Error; //! @@ -101,6 +104,7 @@ //! } //! //! fn main() -> Result<(), Box> { +//! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); //! let (tracer, _uninstall) = opentelemetry_zipkin::new_pipeline() //! .with_http_client(IsahcClient(isahc::HttpClient::new()?)) //! .with_service_name("my_app") @@ -165,192 +169,8 @@ #[macro_use] extern crate typed_builder; -mod model; -mod uploader; +mod exporter; +mod propagator; -use async_trait::async_trait; -use http::Uri; -use model::endpoint::Endpoint; -use opentelemetry::{ - exporter::{ - trace::{self, HttpClient}, - ExportError, - }, - global, sdk, - trace::{TraceError, TracerProvider}, -}; -use std::net::SocketAddr; - -/// Default Zipkin collector endpoint -const DEFAULT_COLLECTOR_ENDPOINT: &str = "http://127.0.0.1:9411/api/v2/spans"; - -/// Default service name if no service is configured. -const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry"; - -/// Zipkin span exporter -#[derive(Debug)] -pub struct Exporter { - local_endpoint: Endpoint, - uploader: uploader::Uploader, -} - -impl Exporter { - fn new(local_endpoint: Endpoint, client: Box, collector_endpoint: Uri) -> Self { - Exporter { - local_endpoint, - uploader: uploader::Uploader::new(client, collector_endpoint), - } - } -} - -/// Create a new Zipkin exporter pipeline builder. -pub fn new_pipeline() -> ZipkinPipelineBuilder { - ZipkinPipelineBuilder::default() -} - -/// Builder for `ExporterConfig` struct. -#[derive(Debug)] -pub struct ZipkinPipelineBuilder { - service_name: String, - service_addr: Option, - collector_endpoint: String, - trace_config: Option, - client: Option>, -} - -impl Default for ZipkinPipelineBuilder { - fn default() -> Self { - ZipkinPipelineBuilder { - #[cfg(feature = "reqwest-blocking-client")] - client: Some(Box::new(reqwest::blocking::Client::new())), - #[cfg(all( - not(feature = "reqwest-blocking-client"), - not(feature = "surf-client"), - feature = "reqwest-client" - ))] - client: Some(Box::new(reqwest::Client::new())), - #[cfg(all( - not(feature = "reqwest-client"), - not(feature = "reqwest-blocking-client"), - feature = "surf-client" - ))] - client: Some(Box::new(surf::Client::new())), - #[cfg(all( - not(feature = "reqwest-client"), - not(feature = "surf-client"), - not(feature = "reqwest-blocking-client") - ))] - client: None, - - service_name: DEFAULT_SERVICE_NAME.to_string(), - service_addr: None, - collector_endpoint: DEFAULT_COLLECTOR_ENDPOINT.to_string(), - trace_config: None, - } - } -} - -impl ZipkinPipelineBuilder { - /// Create `ExporterConfig` struct from current `ExporterConfigBuilder` - pub fn install(mut self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> { - if let Some(client) = self.client { - let endpoint = Endpoint::new(self.service_name, self.service_addr); - let exporter = Exporter::new( - endpoint, - client, - self.collector_endpoint - .parse() - .map_err::(Into::into)?, - ); - - let mut provider_builder = - sdk::trace::TracerProvider::builder().with_exporter(exporter); - if let Some(config) = self.trace_config.take() { - provider_builder = provider_builder.with_config(config); - } - let provider = provider_builder.build(); - let tracer = - provider.get_tracer("opentelemetry-zipkin", Some(env!("CARGO_PKG_VERSION"))); - let provider_guard = global::set_tracer_provider(provider); - - Ok((tracer, Uninstall(provider_guard))) - } else { - Err(Error::NoHttpClient.into()) - } - } - - /// Assign the service name under which to group traces. - pub fn with_service_name>(mut self, name: T) -> Self { - self.service_name = name.into(); - self - } - - /// Assign client implementation - pub fn with_http_client(mut self, client: T) -> Self { - self.client = Some(Box::new(client)); - self - } - - /// Assign the service name under which to group traces. - pub fn with_service_address(mut self, addr: SocketAddr) -> Self { - self.service_addr = Some(addr); - self - } - - /// Assign the Zipkin collector endpoint - pub fn with_collector_endpoint>(mut self, endpoint: T) -> Self { - self.collector_endpoint = endpoint.into(); - self - } - - /// Assign the SDK trace configuration. - pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { - self.trace_config = Some(config); - self - } -} - -#[async_trait] -impl trace::SpanExporter for Exporter { - /// Export spans to Zipkin collector. - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let zipkin_spans = batch - .into_iter() - .map(|span| model::into_zipkin_span(self.local_endpoint.clone(), span)) - .collect(); - - self.uploader.upload(zipkin_spans).await - } -} - -/// Uninstalls the Zipkin pipeline on drop. -#[must_use] -#[derive(Debug)] -pub struct Uninstall(global::TracerProviderGuard); - -/// Wrap type for errors from opentelemetry zipkin -#[derive(thiserror::Error, Debug)] -#[non_exhaustive] -pub enum Error { - /// No http client implementation found. User should provide one or enable features. - #[error("http client must be set, users can enable reqwest or surf feature to use http client implementation within create")] - NoHttpClient, - - /// Http requests failed - #[error("http request failed with {0}")] - RequestFailed(#[from] http::Error), - - /// The uri provided is invalid - #[error("invalid uri")] - InvalidUri(#[from] http::uri::InvalidUri), - - /// Other errors - #[error("export error: {0}")] - Other(String), -} - -impl ExportError for Error { - fn exporter_name(&self) -> &'static str { - "zipkin" - } -} +pub use exporter::{new_pipeline, Error, Exporter, Uninstall, ZipkinPipelineBuilder}; +pub use propagator::{B3Encoding, Propagator}; diff --git a/opentelemetry/src/sdk/propagation/b3.rs b/opentelemetry-zipkin/src/propagator/mod.rs similarity index 96% rename from opentelemetry/src/sdk/propagation/b3.rs rename to opentelemetry-zipkin/src/propagator/mod.rs index d5224a8ef4..9a3f2158c3 100644 --- a/opentelemetry/src/sdk/propagation/b3.rs +++ b/opentelemetry-zipkin/src/propagator/mod.rs @@ -13,7 +13,7 @@ //! //! If `inject_encoding` is set to `B3Encoding::SingleHeader` then `b3` header is used to inject //! and extract. Otherwise, separate headers are used to inject and extract. -use crate::{ +use opentelemetry::{ propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator}, trace::{ SpanContext, SpanId, TraceContextExt, TraceId, TraceState, TRACE_FLAG_DEBUG, @@ -65,27 +65,27 @@ impl B3Encoding { /// Extracts and injects `SpanContext`s into `Extractor`s or `Injector`s using B3 header format. #[derive(Clone, Debug)] -pub struct B3Propagator { +pub struct Propagator { inject_encoding: B3Encoding, } -impl Default for B3Propagator { +impl Default for Propagator { fn default() -> Self { - B3Propagator { + Propagator { inject_encoding: B3Encoding::MultipleHeader, } } } -impl B3Propagator { +impl Propagator { /// Create a new `HttpB3Propagator` that uses multiple headers. pub fn new() -> Self { - B3Propagator::default() + Propagator::default() } /// Create a new `HttpB3Propagator` that uses `encoding` as encoding method pub fn with_encoding(encoding: B3Encoding) -> Self { - B3Propagator { + Propagator { inject_encoding: encoding, } } @@ -207,7 +207,7 @@ impl B3Propagator { } } -impl TextMapPropagator for B3Propagator { +impl TextMapPropagator for Propagator { /// Properly encodes the values of the `Context`'s `SpanContext` and injects /// them into the `Injector`. fn inject_context(&self, context: &Context, injector: &mut dyn Injector) { @@ -302,9 +302,9 @@ impl TextMapPropagator for B3Propagator { #[cfg(test)] mod tests { use super::*; - use crate::testing::trace::TestSpan; - use crate::{ + use opentelemetry::{ propagation::TextMapPropagator, + testing::trace::TestSpan, trace::{ SpanContext, SpanId, TraceId, TRACE_FLAG_DEBUG, TRACE_FLAG_DEFERRED, TRACE_FLAG_NOT_SAMPLED, TRACE_FLAG_SAMPLED, @@ -466,10 +466,10 @@ mod tests { #[test] fn extract_b3() { - let single_header_propagator = B3Propagator::with_encoding(B3Encoding::SingleHeader); - let multi_header_propagator = B3Propagator::with_encoding(B3Encoding::MultipleHeader); - let single_multi_propagator = B3Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); - let unspecific_header_propagator = B3Propagator::with_encoding(B3Encoding::UnSpecified); + let single_header_propagator = Propagator::with_encoding(B3Encoding::SingleHeader); + let multi_header_propagator = Propagator::with_encoding(B3Encoding::MultipleHeader); + let single_multi_propagator = Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); + let unspecific_header_propagator = Propagator::with_encoding(B3Encoding::UnSpecified); for (header, expected_context) in single_header_extract_data() { let mut extractor: HashMap = HashMap::new(); @@ -552,11 +552,11 @@ mod tests { #[test] fn inject_b3() { - let single_header_propagator = B3Propagator::with_encoding(B3Encoding::SingleHeader); - let multi_header_propagator = B3Propagator::with_encoding(B3Encoding::MultipleHeader); + let single_header_propagator = Propagator::with_encoding(B3Encoding::SingleHeader); + let multi_header_propagator = Propagator::with_encoding(B3Encoding::MultipleHeader); let single_multi_header_propagator = - B3Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); - let unspecified_header_propagator = B3Propagator::with_encoding(B3Encoding::UnSpecified); + Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); + let unspecified_header_propagator = Propagator::with_encoding(B3Encoding::UnSpecified); for (expected_header, context) in single_header_inject_data() { let mut injector = HashMap::new(); @@ -645,10 +645,10 @@ mod tests { #[test] fn test_get_fields() { - let single_header_propagator = B3Propagator::with_encoding(B3Encoding::SingleHeader); - let multi_header_propagator = B3Propagator::with_encoding(B3Encoding::MultipleHeader); + let single_header_propagator = Propagator::with_encoding(B3Encoding::SingleHeader); + let multi_header_propagator = Propagator::with_encoding(B3Encoding::MultipleHeader); let single_multi_header_propagator = - B3Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); + Propagator::with_encoding(B3Encoding::SingleAndMultiHeader); assert_eq!( single_header_propagator.fields().collect::>(), diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index 82338977d2..dc4f8a93f6 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -53,6 +53,7 @@ default = ["trace"] trace = ["rand", "pin-project", "async-trait", "regex", "percent-encoding", "thiserror"] metrics = ["thiserror", "dashmap", "fnv"] serialize = ["serde", "bincode"] +testing = ["trace", "metrics", "tokio/full" ] [[bench]] name = "trace" diff --git a/opentelemetry/README.md b/opentelemetry/README.md index 50dfdc15c6..17d08ee9f5 100644 --- a/opentelemetry/README.md +++ b/opentelemetry/README.md @@ -33,7 +33,7 @@ observability tools. ## Getting Started ```rust -use opentelemetry::{exporter::trace::stdout, trace::Tracer}; +use opentelemetry::{sdk::export::trace::stdout, trace::Tracer}; fn main() -> Result<(), Box> { // Create a new instrumentation pipeline diff --git a/opentelemetry/src/api/metrics/mod.rs b/opentelemetry/src/api/metrics/mod.rs index 7e3dc5927a..c72d731fb1 100644 --- a/opentelemetry/src/api/metrics/mod.rs +++ b/opentelemetry/src/api/metrics/mod.rs @@ -19,7 +19,7 @@ mod sync_instrument; mod up_down_counter; mod value_recorder; -use crate::exporter::ExportError; +use crate::sdk::export::ExportError; pub use async_instrument::{AsyncRunner, BatchObserverCallback, Observation, ObserverResult}; pub use config::InstrumentConfig; pub use counter::{BoundCounter, Counter, CounterBuilder}; diff --git a/opentelemetry/src/api/trace/mod.rs b/opentelemetry/src/api/trace/mod.rs index 161c85fea1..46610f51ea 100644 --- a/opentelemetry/src/api/trace/mod.rs +++ b/opentelemetry/src/api/trace/mod.rs @@ -139,7 +139,7 @@ pub use self::{ }, tracer::{SpanBuilder, Tracer}, }; -use crate::exporter::ExportError; +use crate::sdk::export::ExportError; use std::time; /// Describe the result of operations in tracing API. diff --git a/opentelemetry/src/api/trace/noop.rs b/opentelemetry/src/api/trace/noop.rs index 8350dabdf2..66812c9c91 100644 --- a/opentelemetry/src/api/trace/noop.rs +++ b/opentelemetry/src/api/trace/noop.rs @@ -4,7 +4,7 @@ //! has been set. It is also useful for testing purposes as it is intended //! to have minimal resource utilization and runtime impact. use crate::{ - exporter::trace::{ExportResult, SpanData, SpanExporter}, + sdk::export::trace::{ExportResult, SpanData, SpanExporter}, trace, trace::{TraceContextExt, TraceState}, Context, KeyValue, diff --git a/opentelemetry/src/exporter/metrics/mod.rs b/opentelemetry/src/exporter/metrics/mod.rs deleted file mode 100644 index c2be1777f3..0000000000 --- a/opentelemetry/src/exporter/metrics/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -//! Metric exporters - -pub mod stdout; -pub use stdout::stdout; diff --git a/opentelemetry/src/exporter/mod.rs b/opentelemetry/src/exporter/mod.rs deleted file mode 100644 index 7ded6fa9be..0000000000 --- a/opentelemetry/src/exporter/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -//! # OpenTelemetry data exporters -//! -//! There are two main types of exports: -//! -//! - Span exporters, which export instances of `Span`. -//! - Measurement exporters, which export data collected by `Meter` instances. -//! -//! Exporters define the interface that protocol-specific exporters must -//! implement so that they can be plugged into OpenTelemetry SDK and support -//! sending of telemetry data. -#[cfg(feature = "metrics")] -#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))] -pub mod metrics; -#[cfg(feature = "trace")] -#[cfg_attr(docsrs, doc(cfg(feature = "trace")))] -pub mod trace; - -/// Marker trait for errors returned by exporters -pub trait ExportError: std::error::Error + Send + Sync + 'static { - /// The name of exporter that returned this error - fn exporter_name(&self) -> &'static str; -} diff --git a/opentelemetry/src/global/mod.rs b/opentelemetry/src/global/mod.rs index 7318b656f2..b54ead2a85 100644 --- a/opentelemetry/src/global/mod.rs +++ b/opentelemetry/src/global/mod.rs @@ -58,7 +58,7 @@ //! } //! ``` //! -//! [installing a trace pipeline]: crate::exporter::trace::stdout::PipelineBuilder::install +//! [installing a trace pipeline]: crate::sdk::export::trace::stdout::PipelineBuilder::install //! [`TracerProvider`]: crate::trace::TracerProvider //! [`Span`]: crate::trace::Span //! @@ -126,7 +126,7 @@ //! # } //! ``` //! -//! [installing a metrics pipeline]: crate::exporter::metrics::stdout::StdoutExporterBuilder::try_init +//! [installing a metrics pipeline]: crate::sdk::export::metrics::stdout::StdoutExporterBuilder::try_init //! [`MeterProvider`]: crate::metrics::MeterProvider mod error_handler; diff --git a/opentelemetry/src/lib.rs b/opentelemetry/src/lib.rs index 19ea431502..32fef323ad 100644 --- a/opentelemetry/src/lib.rs +++ b/opentelemetry/src/lib.rs @@ -13,7 +13,7 @@ //! ## Getting Started //! //! ```no_run -//! use opentelemetry::{exporter::trace::stdout, trace::Tracer}; +//! use opentelemetry::{sdk::export::trace::stdout, trace::Tracer}; //! //! fn main() -> Result<(), Box> { //! // Create a new instrumentation pipeline @@ -60,7 +60,7 @@ //! [serde]: https://crates.io/crates/serde //! [http]: https://crates.io/crates/http //! [tonic]: https://crates.io/crates/tonic -//! [`HttpClient`]: crate::exporter::trace::HttpClient +//! [`HttpClient`]: crate::sdk::export::trace::HttpClient //! [reqwest]: https://crates.io/crates/reqwest //! [surf]: https://crates.io/crates/surf //! @@ -158,11 +158,11 @@ #![cfg_attr(test, deny(warnings))] mod api; -pub mod exporter; pub mod global; pub mod sdk; -#[cfg(test)] +#[cfg(feature = "testing")] +#[allow(missing_docs)] pub mod testing; #[cfg(feature = "metrics")] diff --git a/opentelemetry/src/sdk/export/metrics/mod.rs b/opentelemetry/src/sdk/export/metrics/mod.rs index 145ed3ce42..544cf9cfc4 100644 --- a/opentelemetry/src/sdk/export/metrics/mod.rs +++ b/opentelemetry/src/sdk/export/metrics/mod.rs @@ -10,11 +10,13 @@ use std::sync::Arc; use std::time::SystemTime; mod aggregation; +pub mod stdout; pub use aggregation::{ Buckets, Count, Distribution, Histogram, LastValue, Max, Min, MinMaxSumCount, Points, Quantile, Sum, }; +pub use stdout::stdout; /// Processor is responsible for deciding which kind of aggregation to use (via /// `aggregation_selector`), gathering exported results from the SDK during diff --git a/opentelemetry/src/exporter/metrics/stdout.rs b/opentelemetry/src/sdk/export/metrics/stdout.rs similarity index 100% rename from opentelemetry/src/exporter/metrics/stdout.rs rename to opentelemetry/src/sdk/export/metrics/stdout.rs diff --git a/opentelemetry/src/sdk/export/mod.rs b/opentelemetry/src/sdk/export/mod.rs index 465e532238..a74f30cb9f 100644 --- a/opentelemetry/src/sdk/export/mod.rs +++ b/opentelemetry/src/sdk/export/mod.rs @@ -2,3 +2,12 @@ #[cfg(feature = "metrics")] #[cfg_attr(docsrs, doc(cfg(feature = "metrics")))] pub mod metrics; +#[cfg(feature = "trace")] +#[cfg_attr(docsrs, doc(cfg(feature = "trace")))] +pub mod trace; + +/// Marker trait for errors returned by exporters +pub trait ExportError: std::error::Error + Send + Sync + 'static { + /// The name of exporter that returned this error + fn exporter_name(&self) -> &'static str; +} diff --git a/opentelemetry/src/exporter/trace/mod.rs b/opentelemetry/src/sdk/export/trace/mod.rs similarity index 99% rename from opentelemetry/src/exporter/trace/mod.rs rename to opentelemetry/src/sdk/export/trace/mod.rs index 5e16062e17..097185f6f2 100644 --- a/opentelemetry/src/exporter/trace/mod.rs +++ b/opentelemetry/src/sdk/export/trace/mod.rs @@ -1,7 +1,7 @@ //! Trace exporters use crate::api::trace::TraceError; #[cfg(feature = "http")] -use crate::exporter::ExportError; +use crate::sdk::export::ExportError; use crate::{ sdk, trace::{Event, Link, SpanContext, SpanId, SpanKind, StatusCode}, diff --git a/opentelemetry/src/exporter/trace/stdout.rs b/opentelemetry/src/sdk/export/trace/stdout.rs similarity index 98% rename from opentelemetry/src/exporter/trace/stdout.rs rename to opentelemetry/src/sdk/export/trace/stdout.rs index 7fcdb2719e..f6f281abe6 100644 --- a/opentelemetry/src/exporter/trace/stdout.rs +++ b/opentelemetry/src/sdk/export/trace/stdout.rs @@ -12,7 +12,7 @@ //! //! ```no_run //! use opentelemetry::trace::Tracer; -//! use opentelemetry::exporter::trace::stdout; +//! use opentelemetry::sdk::export::trace::stdout; //! //! fn main() { //! let (tracer, _uninstall) = stdout::new_pipeline() @@ -25,11 +25,11 @@ //! } //! ``` use crate::{ - exporter::{ + global, sdk, + sdk::export::{ trace::{ExportResult, SpanData, SpanExporter}, ExportError, }, - global, sdk, trace::TracerProvider, }; use async_trait::async_trait; diff --git a/opentelemetry/src/sdk/propagation/mod.rs b/opentelemetry/src/sdk/propagation/mod.rs index 53a137aff6..d0b88076d4 100644 --- a/opentelemetry/src/sdk/propagation/mod.rs +++ b/opentelemetry/src/sdk/propagation/mod.rs @@ -1,14 +1,8 @@ //! OpenTelemetry Propagators -mod aws; -mod b3; mod baggage; mod composite; -mod jaeger; mod trace_context; -pub use aws::XrayPropagator; -pub use b3::{B3Encoding, B3Propagator}; pub use baggage::BaggagePropagator; pub use composite::TextMapCompositePropagator; -pub use jaeger::JaegerPropagator; pub use trace_context::TraceContextPropagator; diff --git a/opentelemetry/src/sdk/trace/provider.rs b/opentelemetry/src/sdk/trace/provider.rs index 146316e78c..f57d5cc510 100644 --- a/opentelemetry/src/sdk/trace/provider.rs +++ b/opentelemetry/src/sdk/trace/provider.rs @@ -8,7 +8,7 @@ //! propagators) are provided by the `TracerProvider`. `Tracer` instances do //! not duplicate this data to avoid that different `Tracer` instances //! of the `TracerProvider` have different versions of these data. -use crate::exporter::trace::SpanExporter; +use crate::sdk::export::trace::SpanExporter; use crate::{sdk, sdk::trace::SpanProcessor}; use std::sync::Arc; diff --git a/opentelemetry/src/sdk/trace/span.rs b/opentelemetry/src/sdk/trace/span.rs index 7625062986..8fc8e042f4 100644 --- a/opentelemetry/src/sdk/trace/span.rs +++ b/opentelemetry/src/sdk/trace/span.rs @@ -9,7 +9,7 @@ //! is possible to change its name, set its `Attributes`, and add `Links` and `Events`. //! These cannot be changed after the `Span`'s end time has been set. use crate::trace::{Event, SpanContext, SpanId, SpanKind, StatusCode}; -use crate::{api, exporter, sdk, KeyValue}; +use crate::{api, sdk, KeyValue}; use std::sync::{Arc, Mutex}; use std::time::SystemTime; @@ -200,8 +200,8 @@ fn build_export_data( data: SpanData, span_context: SpanContext, tracer: &sdk::trace::Tracer, -) -> exporter::trace::SpanData { - exporter::trace::SpanData { +) -> sdk::export::trace::SpanData { + sdk::export::trace::SpanData { span_context, parent_span_id: data.parent_span_id, span_kind: data.span_kind, diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 43dbe41606..c69b63b072 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -44,7 +44,7 @@ use crate::api::trace::{TraceError, TraceResult}; use crate::global; use crate::sdk::trace::Span; use crate::{ - exporter::trace::{ExportResult, SpanData, SpanExporter}, + sdk::export::trace::{ExportResult, SpanData, SpanExporter}, Context, }; @@ -569,7 +569,7 @@ mod tests { use async_trait::async_trait; - use crate::exporter::trace::{stdout, ExportResult, SpanData, SpanExporter}; + use crate::sdk::export::trace::{stdout, ExportResult, SpanData, SpanExporter}; use crate::sdk::trace::span_processor::OTEL_BSP_EXPORT_TIMEOUT_MILLIS; use crate::sdk::trace::BatchConfig; use crate::testing::trace::{ diff --git a/opentelemetry/src/testing/trace.rs b/opentelemetry/src/testing/trace.rs index ea74b2e019..b6f4caa83a 100644 --- a/opentelemetry/src/testing/trace.rs +++ b/opentelemetry/src/testing/trace.rs @@ -1,6 +1,6 @@ use crate::{ - exporter::{ - trace::{self as exporter, ExportResult, SpanData, SpanExporter}, + sdk::export::{ + trace::{ExportResult, SpanData, SpanExporter}, ExportError, }, sdk::{ @@ -37,9 +37,9 @@ impl Span for TestSpan { fn end_with_timestamp(&self, _timestamp: std::time::SystemTime) {} } -pub fn new_test_export_span_data() -> exporter::SpanData { +pub fn new_test_export_span_data() -> SpanData { let config = Config::default(); - exporter::SpanData { + SpanData { span_context: SpanContext::empty_context(), parent_span_id: SpanId::from_u64(0), span_kind: SpanKind::Internal, @@ -58,13 +58,13 @@ pub fn new_test_export_span_data() -> exporter::SpanData { #[derive(Debug)] pub struct TestSpanExporter { - tx_export: Sender, + tx_export: Sender, tx_shutdown: Sender<()>, } #[async_trait] impl SpanExporter for TestSpanExporter { - async fn export(&mut self, batch: Vec) -> ExportResult { + async fn export(&mut self, batch: Vec) -> ExportResult { for span_data in batch { self.tx_export .send(span_data) @@ -78,7 +78,7 @@ impl SpanExporter for TestSpanExporter { } } -pub fn new_test_exporter() -> (TestSpanExporter, Receiver, Receiver<()>) { +pub fn new_test_exporter() -> (TestSpanExporter, Receiver, Receiver<()>) { let (tx_export, rx_export) = channel(); let (tx_shutdown, rx_shutdown) = channel(); let exporter = TestSpanExporter { @@ -90,7 +90,7 @@ pub fn new_test_exporter() -> (TestSpanExporter, Receiver, R #[derive(Debug)] pub struct TokioSpanExporter { - tx_export: tokio::sync::mpsc::UnboundedSender, + tx_export: tokio::sync::mpsc::UnboundedSender, tx_shutdown: tokio::sync::mpsc::UnboundedSender<()>, } @@ -112,7 +112,7 @@ impl SpanExporter for TokioSpanExporter { pub fn new_tokio_test_exporter() -> ( TokioSpanExporter, - tokio::sync::mpsc::UnboundedReceiver, + tokio::sync::mpsc::UnboundedReceiver, tokio::sync::mpsc::UnboundedReceiver<()>, ) { let (tx_export, rx_export) = tokio::sync::mpsc::unbounded_channel();