diff --git a/Cargo.toml b/Cargo.toml index a595369927..b7eaef93f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ "opentelemetry-datadog", "opentelemetry-dynatrace", "opentelemetry-http", -# "opentelemetry-jaeger", + "opentelemetry-jaeger", "opentelemetry-otlp", "opentelemetry-prometheus", "opentelemetry-proto", @@ -16,24 +16,24 @@ members = [ "opentelemetry-stackdriver", "opentelemetry-zipkin", "opentelemetry-zpages", -# "examples/actix-http", -# "examples/actix-http-tracing", -# "examples/actix-udp", -# "examples/async", + "examples/actix-http", + "examples/actix-http-tracing", + "examples/actix-udp", + "examples/async", "examples/aws-xray", -# "examples/basic", + "examples/basic", "examples/basic-otlp", "examples/basic-otlp-with-selector", "examples/basic-otlp-http", "examples/datadog", "examples/dynatrace", "examples/external-otlp-tonic-tokio", -# "examples/grpc", + "examples/grpc", "examples/http", "examples/hyper-prometheus", -# "examples/tracing-grpc", + "examples/tracing-grpc", "examples/zipkin", -# "examples/multiple-span-processors", + "examples/multiple-span-processors", "examples/zpages" ] exclude = ["examples/external-otlp-grpcio-async-std"] diff --git a/examples/actix-udp/src/main.rs b/examples/actix-udp/src/main.rs index 2a741b92a1..cac9801fb0 100644 --- a/examples/actix-udp/src/main.rs +++ b/examples/actix-udp/src/main.rs @@ -1,14 +1,16 @@ use actix_service::Service; use actix_web::middleware::Logger; use actix_web::{web, App, HttpServer}; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{global, sdk::trace as sdktrace}; use opentelemetry::{ trace::{FutureExt, TraceContextExt, Tracer}, Key, }; +use opentelemetry_jaeger::JaegerTraceRuntime; -fn init_tracer() -> Result { +fn init_tracer(runtime: R) -> Result { opentelemetry_jaeger::new_agent_pipeline() .with_endpoint("localhost:6831") .with_service_name("trace-udp-demo") @@ -19,7 +21,7 @@ fn init_tracer() -> Result { opentelemetry::KeyValue::new("exporter", "jaeger"), ]), )) - .install_simple() + .install_simple(runtime) } async fn index() -> &'static str { @@ -34,7 +36,7 @@ async fn index() -> &'static str { async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "debug"); env_logger::init(); - let _tracer = init_tracer().expect("Failed to initialise tracer."); + let _tracer = init_tracer(Tokio).expect("Failed to initialise tracer."); HttpServer::new(|| { App::new() diff --git a/examples/grpc/src/client.rs b/examples/grpc/src/client.rs index df2e65ab14..51668cd9b8 100644 --- a/examples/grpc/src/client.rs +++ b/examples/grpc/src/client.rs @@ -3,6 +3,7 @@ use hello_world::HelloRequest; use opentelemetry::global; use opentelemetry::global::shutdown_tracer_provider; use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceResult; use opentelemetry::{ propagation::Injector, @@ -10,6 +11,7 @@ use opentelemetry::{ trace::{TraceContextExt, Tracer as _}, Context, KeyValue, }; +use opentelemetry_jaeger::JaegerTraceRuntime; struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap); @@ -28,16 +30,16 @@ pub mod hello_world { tonic::include_proto!("helloworld"); } -fn tracing_init() -> TraceResult { +fn tracing_init(runtime: R) -> TraceResult { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-client") - .install_simple() + .install_simple(runtime) } #[tokio::main] async fn main() -> Result<(), Box> { - let tracer = tracing_init()?; + let tracer = tracing_init(Tokio)?; let mut client = GreeterClient::connect("http://[::1]:50051").await?; let span = tracer.start("client-request"); let cx = Context::current_with_span(span); diff --git a/examples/grpc/src/server.rs b/examples/grpc/src/server.rs index b4f02b5e2e..a454e5f7d8 100644 --- a/examples/grpc/src/server.rs +++ b/examples/grpc/src/server.rs @@ -4,12 +4,14 @@ use hello_world::greeter_server::{Greeter, GreeterServer}; use hello_world::{HelloReply, HelloRequest}; use opentelemetry::global; use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{ propagation::Extractor, trace::{Span, Tracer}, KeyValue, }; +use opentelemetry_jaeger::JaegerTraceRuntime; use std::error::Error; pub mod hello_world { @@ -59,16 +61,16 @@ impl Greeter for MyGreeter { } } -fn tracing_init() -> Result { +fn tracing_init(runtime: R) -> Result { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-server") - .install_simple() + .install_simple(runtime) } #[tokio::main] async fn main() -> Result<(), Box> { - let _tracer = tracing_init()?; + let _tracer = tracing_init(Tokio)?; let addr = "[::1]:50051".parse()?; let greeter = MyGreeter::default(); diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index f020cb87ca..f37308ac35 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -22,6 +22,8 @@ rustdoc-args = ["--cfg", "docsrs"] async-std = { version = "1.6", optional = true } async-trait = "0.1" base64 = { version = "0.13", optional = true } +futures = "0.3" +futures-channel = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true } headers = { version = "0.3.2", optional = true } http = { version = "0.2", optional = true } diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs b/opentelemetry-jaeger/src/exporter/config/agent.rs index efefc17a9b..03c81803c7 100644 --- a/opentelemetry-jaeger/src/exporter/config/agent.rs +++ b/opentelemetry-jaeger/src/exporter/config/agent.rs @@ -228,7 +228,10 @@ impl AgentPipeline { /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline. /// /// The exporter will send each span to the agent upon the span ends. - pub fn build_simple(mut self) -> Result { + pub fn build_simple( + mut self, + runtime: R, + ) -> Result { let mut builder = sdk::trace::TracerProvider::builder(); let (config, process) = build_config_and_process( @@ -236,12 +239,13 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let exporter = Exporter::new( + let (exporter, task) = Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, ); + runtime.spawn(Box::pin(task)); builder = builder.with_simple_exporter(exporter); builder = builder.with_config(config); @@ -275,7 +279,9 @@ impl AgentPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_async_agent_uploader(runtime.clone())?; - let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); @@ -287,8 +293,11 @@ impl AgentPipeline { /// tracer provider. /// /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate. - pub fn install_simple(self) -> Result { - let tracer_provider = self.build_simple()?; + pub fn install_simple( + self, + runtime: R, + ) -> Result { + let tracer_provider = self.build_simple(runtime)?; install_tracer_provider_and_get_tracer(tracer_provider) } @@ -321,28 +330,27 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let uploader = self.build_async_agent_uploader(runtime)?; - Ok(Exporter::new( - process.into(), - export_instrument_library, - uploader, - )) + let uploader = self.build_async_agent_uploader(runtime.clone())?; + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); + Ok(exporter) } /// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime. - pub fn build_sync_agent_exporter(mut self) -> Result { - let builder = sdk::trace::TracerProvider::builder(); - let (_, process) = build_config_and_process( - builder.sdk_provided_resource(), - self.trace_config.take(), - self.transformation_config.service_name.take(), - ); - Ok(Exporter::new( - process.into(), - self.transformation_config.export_instrument_library, - self.build_sync_agent_uploader()?, - )) - } + // pub fn build_sync_agent_exporter(mut self) -> Result { + // let builder = sdk::trace::TracerProvider::builder(); + // let (_, process) = build_config_and_process( + // builder.sdk_provided_resource(), + // self.trace_config.take(), + // self.transformation_config.service_name.take(), + // ); + // Ok(Exporter::new( + // process.into(), + // self.transformation_config.export_instrument_library, + // self.build_sync_agent_uploader()?, + // )) + // } fn build_async_agent_uploader(self, runtime: R) -> Result, TraceError> where @@ -358,6 +366,7 @@ impl AgentPipeline { Ok(Box::new(AsyncUploader::Agent(agent))) } + #[allow(dead_code)] // TODO jwilm fn build_sync_agent_uploader(self) -> Result, TraceError> { let agent = AgentSyncClientUdp::new( self.agent_endpoint?.as_slice(), diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs index 205c142695..df9ff23891 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs @@ -412,8 +412,9 @@ impl CollectorPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_uploader::()?; - let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); diff --git a/opentelemetry-jaeger/src/exporter/config/mod.rs b/opentelemetry-jaeger/src/exporter/config/mod.rs index a363c0270a..039dcc1a07 100644 --- a/opentelemetry-jaeger/src/exporter/config/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/mod.rs @@ -153,6 +153,7 @@ mod tests { // OTEL_SERVICE_NAME env var also works env::set_var("OTEL_SERVICE_NAME", "test service"); let builder = new_agent_pipeline(); + // TODO jwilm let exporter = builder.build_sync_agent_exporter().unwrap(); assert_eq!(exporter.process.service_name, "test service"); env::set_var("OTEL_SERVICE_NAME", "") diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 0d64a739ea..5500505df6 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -18,8 +18,11 @@ use std::convert::TryFrom; use self::runtime::JaegerTraceRuntime; use self::thrift::jaeger; -use async_trait::async_trait; +use futures::channel::{mpsc, oneshot}; +use futures::future::BoxFuture; +use futures::StreamExt; use std::convert::TryInto; +use std::future::Future; #[cfg(feature = "isahc_collector_client")] #[allow(unused_imports)] // this is actually used to configure authentication @@ -45,10 +48,7 @@ const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; /// Jaeger span exporter #[derive(Debug)] pub struct Exporter { - process: jaeger::Process, - /// Whether or not to export instrumentation information. - export_instrumentation_lib: bool, - uploader: Box, + tx: mpsc::Sender<(Vec, oneshot::Sender)>, } impl Exporter { @@ -56,11 +56,51 @@ impl Exporter { process: jaeger::Process, export_instrumentation_lib: bool, uploader: Box, - ) -> Exporter { - Exporter { - process, - export_instrumentation_lib, - uploader, + ) -> (Exporter, impl Future) { + let (tx, rx) = futures::channel::mpsc::channel(64); + ( + Exporter { tx }, + ExporterTask { + rx, + process, + export_instrumentation_lib, + uploader, + } + .run(), + ) + } +} + +struct ExporterTask { + rx: mpsc::Receiver<(Vec, oneshot::Sender)>, + process: jaeger::Process, + /// Whether or not to export instrumentation information. + export_instrumentation_lib: bool, + uploader: Box, +} + +impl ExporterTask { + async fn run(mut self) { + // TODO jwilm: this might benefit from a ExporterMessage so that we can + // send Shutdown and break the loop. + while let Some((batch, tx)) = self.rx.next().await { + let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); + let process = self.process.clone(); + + for span in batch.into_iter() { + jaeger_spans.push(convert_otel_span_into_jaeger_span( + span, + self.export_instrumentation_lib, + )); + } + + let res = self + .uploader + .upload(jaeger::Batch::new(process, jaeger_spans)) + .await; + + // TODO jwilm: is ignoring the err (fail to send) correct here? + let _ = tx.send(res); } } } @@ -74,23 +114,16 @@ pub struct Process { pub tags: Vec, } -#[async_trait] impl trace::SpanExporter for Exporter { /// Export spans to Jaeger - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); - let process = self.process.clone(); - - for span in batch.into_iter() { - jaeger_spans.push(convert_otel_span_into_jaeger_span( - span, - self.export_instrumentation_lib, - )); + fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { + let (tx, rx) = oneshot::channel(); + + if let Err(err) = self.tx.try_send((batch, tx)) { + return Box::pin(futures::future::ready(Err(Into::into(err)))); } - self.uploader - .upload(jaeger::Batch::new(process, jaeger_spans)) - .await + Box::pin(async move { rx.await? }) } }